Changes between Version 6 and Version 7 of ErlangInHaskell


Timestamp:
Jun 8, 2011 9:14:00 PM (3 years ago)
Author:
jepst
Comment:


  • ErlangInHaskell

    v6 v7  
    1 == Implementation == 
     1 == Cloud Haskell == 
    22 
    3 I'm currently working on an implementation of an Erlang-like distributed computing framework for Haskell. The implementation is a work in progress and its interface differs somewhat from the following sketch. The actual interface is described here: http://www.cl.cam.ac.uk/~jee36/remote/ 
     3 Cloud Haskell is a distributed computing framework for Haskell, implemented in Haskell. It's a tool for writing applications that coordinate their work on a cluster of commodity computers or virtual machines. This is useful for providing highly reliable, redundant, long-running services, as well as for building compute-intensive applications that can benefit from lots of hardware. It has two interfaces: 
    44 
    5 Here's a brief, high-level introduction to my implementation: 
     5  * The ''process layer'' (aka ErlangInHaskell): an interface based on message-passing between distributed processes. 
     6  * The ''task layer'' (aka SkywritingInHaskell): a fault-tolerant data-centric interface. 
    67 
    7 = Introduction = 
     8 Here are some resources relevant to this project: 
    89 
    9 Many programming languages expose concurrent programming as a shared memory model, wherein multiple, concurrently executing programs, or threads, can examine and manipulate variables common to them all. Coordination between threads is achieved with locks, mutexes, and other synchronization mechanisms. In Haskell, these facilities are available as MVars. 
     10  * [http://www.cl.cam.ac.uk/~jee36/remote.pdf A paper on the process layer] 
     11  * [http://www.cl.cam.ac.uk/~jee36/remote/ Haddock documentation] 
     12  * [http://github.com/jepst/CloudHaskell The source repository] 
    1013 
    11 In contrast, languages like Erlang eschew shared data and require that concurrent threads communicate only by message-passing. The key insight of Erlang and languages like it is that reasoning about concurrency is much easier without shared memory. Under a message-passing scheme, a thread provides a  recipient, given as a thread identifier, and a unit of data; that data will be transferred to the recipient's address space and placed in a queue, where it can be retrieved by the recipient. Because data is never shared implicitly, this is a particularly good model for distributed systems. 
    12  
    13 This framework presents a combined approach to distributed computing: while it provides an Erlang-style message-passing system, it lets the programmer use existing paradigms from Concurrent Haskell. 
    14  
    15 = Terminology = 
    16  
    17 Location is represented by a ''node''. Usually, a node corresponds to an instance of the Haskell runtime system; that is, each independently executed Haskell program exists in its own node. Multiple nodes may run concurrently on a single physical host system, but the intention is that nodes run on separate hosts, to take advantage of more hardware. 
    18  
    19 The basic unit of concurrency is the ''process'' (as distinct from the same term as used at the OS level, applied to an instance of an executing program). A process can be considered a thread with a message queue, and is implemented as a lightweight GHC forkIO thread. There is little overhead involved in starting and executing processes, so programmers can start as many as they need. Processes can send messages to other processes and receive messages from them. 
    20  
    21 The state associated with process management is wrapped up in the Haskell monad ProcessM. All framework functions for managing and communicating with processes run in this monad, and most distributed user code will, as well. 
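    As a minimal sketch (not part of the original text), ordinary do-notation and liftIO are available inside ProcessM, so a process body can mix framework calls such as the logging function ''say'' (used later in this page) with plain IO. The use of getCurrentTime here is purely illustrative. 
{{{
 -- Minimal sketch: 'say' is the framework's logging function; liftIO
 -- embeds an ordinary IO action (here, Data.Time.Clock.getCurrentTime)
 -- into the ProcessM monad.
 logStartup :: ProcessM ()
 logStartup = do now <- liftIO getCurrentTime
                 say ("process started at " ++ show now)
}}}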
    22  
    23 = Process management = 
    24  
    25 Processes are created with the 'spawnRemote' and 'forkProcess' functions. Their type signatures help explain their operation: 
    26 {{{ 
    27  forkProcess :: ProcessM () -> ProcessM ProcessId 
    28  spawnRemote :: NodeId -> Closure (ProcessM ()) -> ProcessM ProcessId 
    29 }}} 
    30 'forkProcess' takes a function in the ProcessM monad, starts it concurrently as a process on the same node as the caller, and gives a ProcessId that can be used to send messages to it. 'spawnRemote' works analogously, but also takes a NodeId, indicating where to run the process. This lets the programmer start arbitrary functions on other nodes, which may be running on other hosts. Actual code is not transmitted to the other node; instead, a function identifier is sent. This works on the assumption that all connected nodes are running identical copies of the compiled Haskell binary (unlike Erlang, which allows new code to be sent to remote nodes at runtime). 
    31  
    32 We encode the function identifier used to start remote processes as a Closure. Closures may identify only top-level functions, without free variables. Since 'spawnRemote' is the only way to run a process on a remote node, functions run remotely cannot capture local mutable variables. This is the other key distinction between 'spawnRemote' and 'forkProcess': processes run locally with forkProcess share memory with each other, but processes started with 'spawnRemote' cannot (even if the target node is in fact the local node). 
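    As an illustration only (the exact closure-generation mechanism is described in the Haddock documentation linked above; the Template Haskell helper ''remotable'' and the generated ''greet__closure'' are assumptions here), spawning a top-level function on another node looks roughly like this: 
{{{
 -- Sketch only: we assume Template Haskell support that turns the
 -- top-level function 'greet' into a serializable 'greet__closure'.
 greet :: String -> ProcessM ()
 greet name = say ("Hello, " ++ name)

 $( remotable ['greet] )

 startGreeter :: NodeId -> ProcessM ProcessId
 startGreeter nid = spawnRemote nid (greet__closure "world")
}}}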
    33  
    34 The following code shows how local variable capture works with 'forkProcess'. There is no analogous code for 'spawnRemote'. 
    35 {{{ 
    36  do m <- liftIO $ newEmptyMVar 
    37     forkProcess (liftIO $ putMVar m ()) 
    38     liftIO $ takeMVar m 
    39 }}} 
    40 Whether a process is running locally or remotely, and whether or not it can share memory, sending messages to it works the same way, via the 'send' function, which corresponds to Erlang's ! operator. 
    41 {{{ 
    42  send :: (Binary a) => ProcessId -> a -> ProcessM () 
    43 }}} 
    44 Given a ProcessId (from 'forkProcess' or 'spawnRemote') and a chunk of serializable data (implementing Haskell's 'Data.Binary.Binary' type class), we can send a message to the given process. The message will be transmitted across the network if necessary and placed in the process's message queue. Note that 'send' will accept any type of data, as long as it implements Binary. All basic Haskell types implement Binary out of the box, including tuples and arrays, and it's easy to implement Binary for user-defined types. How, then, does the receiving process know the type of message to extract from its queue? A process can receive messages by distinguishing their type using the 'receiveWait' function, which corresponds to Erlang's receive clause. The process can provide a distinct handler for each type of message that it knows how to deal with; unmatched messages remain on the queue, where they may be retrieved by later invocations of 'receiveWait'. 
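    For example (a sketch, assuming a ''match'' combinator along the lines of the documentation linked above, which selects messages by their type): 
{{{
 -- Sketch only: 'match' dispatches on the type of the next matching
 -- message; unmatched messages stay in the queue for later calls.
 -- (The pattern type annotations need ScopedTypeVariables.)
 listener :: ProcessM ()
 listener = receiveWait
    [ match (\(s :: String) -> say ("got a string: " ++ s)),
      match (\(n :: Int)    -> say ("got an int: " ++ show n)) ]
}}}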
    45  
    46 = Channels = 
    47  
    48 A ''channel'' provides an alternative to message transmission with ''send'' and ''receiveWait''. While ''send'' and ''receiveWait'' allow sending messages of any type, channels require messages to be of uniform type. Channels must be explicitly created with a call to ''makeChannel'': 
    49 {{{ 
    50  makeChannel :: (Binary a) => ProcessM (SendChannel a, ReceiveChannel a) 
    51 }}} 
    52 The resulting ''SendChannel'' can be used with the ''sendChannel'' function to insert messages into the channel, and the ''ReceiveChannel'' can be used with ''receiveChannel''. The SendChannel can be serialized and sent as part of messages to other processes, which can then write to it; the ReceiveChannel, though, cannot be serialized, although it can be read from multiple threads on the same node by variable capture. 
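    A small usage sketch (the argument and result types of ''sendChannel'' and ''receiveChannel'' are assumed from the description above, not quoted from the actual interface): 
{{{
 -- Sketch only: create a typed channel, let a locally forked process
 -- write into the SendChannel, and read the result back here.
 channelDemo :: ProcessM ()
 channelDemo =
   do (sendEnd, recvEnd) <- makeChannel
      _ <- forkProcess (sendChannel sendEnd (42 :: Int))
      n <- receiveChannel recvEnd
      say ("received over channel: " ++ show (n :: Int))
}}}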
    53  
    54 = Setup and walkthrough = 
    55  
    56 Here I'll provide a basic example of how to get started with your first project on this framework.  
    57  
    58 Here's the overall strategy: We'll be running a program that will estimate pi, making use of available computing resources potentially on remote systems. There will be an arbitrary number of nodes, one of which will be designated the master, and the remaining nodes will be slaves. The slaves will estimate pi in such a way that their results can be combined by the master, and an approximation will be output. The more nodes, and the longer they run, the more precise the output. 
    59  
    60 In more detail: the master will assign each slave a region of the Halton sequence, the slaves will use elements of the sequence to estimate the ratio of points in a unit square that fall within a unit circle, and the master will sum these ratios.  
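    To make that concrete, here is a sketch of the per-slave computation (this is not the actual `Pi6.hs`, just an illustration of the idea): each slave counts how many 2-3 Halton points in its assigned index range fall inside the quarter circle, and the master computes 4 * hits / total. 
{{{
 -- Sketch only; this is not the actual Pi6.hs.
 -- vdc b i is the i-th element of the van der Corput sequence in base b;
 -- pairing bases 2 and 3 gives the 2-dimensional Halton sequence.
 vdc :: Integer -> Integer -> Double
 vdc b = go 0 (1 / fromIntegral b)
   where go acc _ 0 = acc
         go acc f i = let (q, r) = i `divMod` b
                      in go (acc + f * fromIntegral r) (f / fromIntegral b) q

 -- Count how many Halton points with indices in [lo, hi) fall inside
 -- the quarter circle; a slave would send (hits, hi - lo) back to the
 -- master, which sums them and reports 4 * hits / total.
 hitsInRange :: Integer -> Integer -> Integer
 hitsInRange lo hi = fromIntegral $ length
   [ () | i <- [lo .. hi - 1]
        , let x = vdc 2 i
              y = vdc 3 i
        , x*x + y*y <= 1 ]
}}}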
    61  
    62 Here's the procedure, step by step. 
    63  
    64 1. Compile `Pi6.hs`. If you have the framework installed correctly, it should be sufficient to run: 
    65 {{{ 
    66  ghc --make Pi6 
    67 }}} 
    68 2. Select the machines you want to run the program on, and select one of them to be the master. All hosts must be connected on a local area network. For the purposes of this explanation, we'll assume that you will run your master node on a machine named `masterhost` and you will run two slave nodes each on machines named `slavehost1` and `slavehost2`. 
    69  
    70 3. Copy the compiled executable `Pi6` to some location on each of the three hosts. 
    71  
    72 4. For each node, we need to create a configuration file. This is a plain text file, usually named `config`, which is expected to be in the current directory. There are many possible settings that can be set in the configuration file, but only a few are necessary for this example; the rest have sensible defaults. On `masterhost`, create a file named `config` with the following content: 
    73 {{{ 
    74 cfgRole MASTER 
    75 cfgHostName masterhost 
    76 cfgKnownHosts masterhost slavehost1 slavehost2 
    77 }}} 
    78 On `slavehost1`, create a file named `config` with the following content:  
    79 {{{ 
    80 cfgRole SLAVE 
    81 cfgHostName slavehost1 
    82 cfgKnownHosts masterhost slavehost1 slavehost2 
    83 }}} 
    84 On `slavehost2`, create a file named `config` with the following content:  
    85 {{{ 
    86 cfgRole SLAVE 
    87 cfgHostName slavehost2 
    88 cfgKnownHosts masterhost slavehost1 slavehost2 
    89 }}} 
    90 A brief discussion of these settings and what they mean: 
    91  
    92 The `cfgRole` setting determines the node's initial behavior. This is a string which is used to differentiate the two kinds of nodes in this example. More complex distributed systems might have several different kinds of roles. In this case, SLAVE nodes do nothing on startup, but just wait for a command from the master, whereas MASTER nodes seek out slave nodes and issue them commands. 
    93  
    94 The `cfgHostName` setting indicates to each node the name of the host it's running on. If blank or unspecified, this value will be determined automatically, but to play it safe, we specify it explicitly here. 
    95  
    96 The `cfgKnownHosts` setting provides a list of hosts that form part of this distributed execution. This is necessary so that the master node can find its subservient slave nodes. Depending on your network configuration, it may be possible for the master to discover other hosts automatically. 
    97  
    98 5. Now, run the `Pi6` program twice on each of the slave hosts. There should now be four slave nodes awaiting instructions. 
    99  
    100 6. To start the execution, run `Pi6` on the master node. You should see output like this: 
    101 {{{ 
    102  2011-02-10 11:14:38.373856 UTC 0 pid://masterhost:48079/6/    SAY Starting... 
    103  2011-02-10 11:14:38.374345 UTC 0 pid://masterhost:48079/6/    SAY Telling slave nid://slavehost1:33716/ to look at range 0..1000000 
    104  2011-02-10 11:14:38.376479 UTC 0 pid://masterhost:48079/6/    SAY Telling slave nid://slavehost1:45343/ to look at range 1000000..2000000 
    105  2011-02-10 11:14:38.382236 UTC 0 pid://masterhost:48079/6/    SAY Telling slave nid://slavehost2:51739/ to look at range 2000000..3000000 
    106  2011-02-10 11:14:38.384613 UTC 0 pid://masterhost:48079/6/    SAY Telling slave nid://slavehost2:44756/ to look at range 3000000..4000000 
    107  2011-02-10 11:14:56.720435 UTC 0 pid://masterhost:48079/6/    SAY Done: 3141606141606141606141606141606141606141606141606141606141606141606141606141606141606141606141606141 
    108 }}} 
    109 Let's talk about what's going on here. 
    110  
    111 This output is generated by the framework's logging facility. Each line of output has the following fields, left-to-right: the date and time that the log entry was generated; the importance of the message (in this case 0); the process ID of the generating process; the subsystem or component that generated this message (in this case, SAY indicates that these messages were output by a call to the ''say'' function); and the body of the message. From these messages, we can see that the master node discovered four nodes running on two remote hosts; for each of them, the master emits a "Telling slave..." message. Note that although we had to specify the host names where the nodes were running in the config file, the master found all nodes running on each of those hosts. The log output also tells us which range of indices of the Halton sequence was assigned to each node. Each slave, having performed its calculation, sends its results back to the master, and when the master has received responses from all slaves, it prints out its estimate of pi and ends. The slave nodes continue running, waiting for another request. At this point, we could run the master again, or we can terminate the slaves manually with Ctrl-C or the kill command. 
    112  
    113  
    114  
    115  
    116  
    117  
    118  
    119  
    120 = Old = 
    121  
    122 This is an older, more abstract discussion of the implementation of this framework, and does not reflect the current state. 
    123  
    124 == Distributed Haskell Processes (or Erlang in Haskell) == 
    125  
    126 Haskell is great at shared-memory concurrency, but we do not yet  
    127 have a good story for distributed systems that need: 
    128  * Disjoint address spaces 
    129  * Message passing rather than shared memory 
    130  * Relatively long latencies on messages 
    131  * Possibility of failure (computers or links going down) 
    132 The current fashion for "cloud computing" also needs a similar 
    133 computational model. 
    134  
    135 I admire Erlang, which was designed for exactly this combination 
    136 of circumstances.  Which leads to the following question: could 
    137 we take the best of Erlang and embed it as a DSL into Haskell? 
    138 This page summarises a possible design. 
    139  
    140  
    141 == Processes == 
    142  
    143 I call the underlying monad `Erlang` in honour of Erlang, although 
    144 many details will differ. 
    145 {{{ 
    146 newtype Erlang a 
    147 instance Monad Erlang 
    148 liftIO :: IO a -> Erlang a    -- You can do IO in the Erlang monad 
    149 }}} 
    150  
    151 Creating a new process: 
    152 {{{ 
    153 spawn   :: Closure (Erlang ()) -> Erlang Pid 
    154 spawnAt :: Location -> Closure (Erlang ()) -> Erlang Pid 
    155 -- Problem: must not capture any free variables 
    156  
    157 self :: Erlang Pid 
    158 }}} 
    159 Here `Closure` is some kind of specification of what you want the 
    160 remote process to do.  More on that anon. 
    161  
    162 Processes communicate (only) over typed channels. 
    163 {{{ 
    164 newChan :: Erlang (Send a, Recv a) 
    165 send    :: Serialisable a => Send a -> a -> Erlang () 
    166 receive :: Serialisable a => Recv a -> Erlang a 
    167  
    168 class Serialisable a where 
    169   serialise :: a -> ByteString 
    170 }}} 
    171 Using typed channels is different to Erlang, where messages are sent 
    172 to the process. Note that while Send elements can be serialized 
    173 and transmitted elsewhere, Recv elements cannot. Thus the message 
    174 queue represented by the channel lives on the node on which newChan 
    175 is called. 
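    A usage sketch of this proposed API (illustrative only): 
{{{
 -- Illustrative only: the Recv end stays with the process that called
 -- newChan; in real code the Send end would be shipped to another
 -- process inside a Closure.
 echoOnce :: Erlang ()
 echoOnce = do (s, r) <- newChan
               send s "hello"
               msg <- receive r
               liftIO (putStrLn msg)
}}}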
    176  
    177 == Monitoring processes == 
    178  
    179 One of Erlang's most distinctive strengths is that one process A can 
    180 "monitor" another B, and be told if B dies. 
    181  
    182 A process can exit, specifying a reason for doing so. Moreover, 
    183 a process can shoot down another process, again specifying a reason: 
    184 {{{ 
    185 exit :: ExitReason -> Erlang () 
    186   -- Exit this process 
    187   -- Like an exception... can be caught 
    188 catch :: Erlang a -> (ExitReason -> Erlang a) -> Erlang a 
    189  
    190 sendExit :: Pid -> ExitReason -> Erlang () 
    191   -- Send another process an exit signal with specified reason 
    192   -- In Erlang the recipient cannot catch this 
    193  
    194 data ExitReason  
    195   = Normal  
    196   | Kill  
    197   | Killed  
    198   | Exception Exception    -- Extensible? 
    199 }}} 
    200 This "shooting down" is done by a special kind of message, 
    201 called an '''exit signal'''.  An exit signal carries an `ExitReason`. 
    202  
    203 A process can "link" to another process: 
    204 {{{ 
    205 link :: Pid -> Erlang () 
    206   -- Link this process to another one 
    207   -- Exception if the process is dead 
    208   -- If the other process dies or terminates normally,  
    209   --   you get sent an exit signal 
    210   -- Linking is symmetric 
    211  
    212 monitor :: Pid -> Erlang () 
    213   -- If the other process dies or terminates normally,  
    214   --   you get sent a message 
    215   -- Monitoring is not symmetric 
    216   -- Can be implemented in terms of link (by spawning another process) 
    217  
    218 isProcessAlive :: Pid -> Erlang Bool 
    219 }}} 
    220 Note that if you link to a process and it dies, ''you'' get an exit 
    221 signal. 
    222 A process has a "trap-exit status", which controls what happens 
    223 if you receive an exit signal. 
    224 {{{ 
    225 trapExit :: Maybe (Send ExitReason) -> Erlang () 
    226 }}} 
    227 When a process receives an exit signal, it is processed as follows 
    228  
    229  * If reason = `Kill` => process dies, sending an exit signal with reason `Killed` to all linked processes 
    230  
    231  * Otherwise, if the process has `TrapExit` = `Just ch`, then the exit signal is turned into a message, and sent to `ch` 
    232  
    233  * Otherwise, if reason = `Normal`, do nothing 
    234  
    235  * Otherwise, the process dies sending `ExitSignal(same-reason)` to linked processes 
    236  
    237 Typically "worker processes" will have `TrapExit` off  
    238 and "system processes" will have them on, so that they can 
    239 respond to the death of their linked processes. 
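    For example, a minimal "system process" in this proposed API might look like the following sketch (illustrative only; a Show instance for ExitReason is assumed): 
{{{
 -- Illustrative only: trap exit signals as messages, link to a worker,
 -- and report why it terminated.
 supervise :: Closure (Erlang ()) -> Erlang ()
 supervise worker =
   do (s, r) <- newChan
      trapExit (Just s)       -- exit signals now arrive as messages on s
      pid <- spawn worker
      link pid
      reason <- receive r     -- blocks until the linked worker terminates
      liftIO (putStrLn ("worker exited: " ++ show reason))
}}}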
    240  
    241 == Process registry == 
    242  
    243 The process registry lets you register processes so that others can find them. 
    244 There is one process registry per node, and one global registry. The Erlang 
    245 API is something like this: 
    246 {{{ 
    247 getRegistry :: Erlang Pid 
    248  
    249 register :: String -> Pid -> Erlang () 
    250   -- Exception if name [or Pid] is already registered 
    251  
    252 unregister :: String -> Erlang () 
    253   -- Does not require that you are the guy! 
    254   -- Exception if not registered 
    255   -- A process that terminates is unregistered automatically 
    256  
    257 whereIs :: String -> Erlang (Maybe Pid) 
    258  
    259 registered :: Erlang [String] 
    260   -- All registered names 
    261 }}} 
    262 My sense is that it'd be better to implement the registry in Haskell. Then all  
    263 we'd need is a way to find the local registry process. 
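    A usage sketch of this API (illustrative only): 
{{{
 -- Illustrative only: publish the current process under a well-known
 -- name, and look it up later (possibly from another process).
 announceLogger :: Erlang ()
 announceLogger = do me <- self
                     register "logger" me

 findLogger :: Erlang (Maybe Pid)
 findLogger = whereIs "logger"
}}}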
    264  
    265 == Closures == 
    266  
    267 We need some way to specify a thunk to send to another node. 
    268 Basically just a pair of a code pointer and some free variables: 
    269 {{{ 
    270 data Closure a where 
    271   Clo :: Serialisable v => (v -#> a) -> v -> Closure a 
    272  
    273 eval :: Closure a -> a 
    274 eval (Clo fun args) = fun ## args 
    275 }}} 
    276  
    277 The type `(v -#> a)` is intended to be the type of "pure functions"; that is, 
    278 a function that 
    279  * Has no free variables; is pure code 
    280  * Can be represented simply as a code pointer 
    281  
    282 One possible implementation is: 
    283 {{{ 
    284 type (v -#> a) = String 
    285 (##) :: (v -#> a) -> v -> a 
    286 fun ## arg = ...lookup the function, do dynamic type check.... 
    287 }}} 
    288  
    289 A more interesting possibility is to provide direct language support. 
    290  * A type `(v -#> a)` 
    291  * An intro form `(\# v. a)` 
    292  * An elim form `(f ## v)` 
    293  
    294 Plus perhaps some support for building values of type `Closure a` from 
    295 a term of type `a`. 
    296 {{{ 
    297    spawn <e>  
    298 means 
    299    spawn (Clo (\#(a,b,c) -> e) (a,b,c)) 
    300      -- where a,b,c are the free vars of e 
    301 }}} 
    302  
    303 Random notes  
    304 {{{ 
    305 f x = let g = <\y = x+y> in 
    306         ...<Clo (\#g -> ...) g >... 
    307  
    308  
    309 dmap :: Clo (a->b) -> [(a,Place)] -> Erlang [b] 
    310 dmap (| f |) = mapM (\(x,p) -> do { (s,r) <- newChan 
    311                             ; spawnAt p (| send s (f x) |) 
    312                             ; receive r }) 
    313  
    314   \(| f |) -> e 
    315   \c -> #define f = (c ##) in e 
    316   \(Clo f' x) -> #define f (f' # x) in e 
    317  
    318 [#| e |#] 
    319 <e>  ==   $(cloify [|e|])  -- Good try 
    320  
    321 G|vtop |-  
    322 -------------------- 
    323 G |- \#x -> e : (a -#> b) 
    324 }}} 
    325  
     14 This is currently being worked on by [http://www.cl.cam.ac.uk/~jee36/jee36.html me]. Feel free to drop me a line.