I'm on a journey to learn Haskell. But reading someone else's code and thoughts, while educational, is not that enlightening. Anyway, to cut a long story short:
- I (still) want to see what this Haskell thing is all about
- I need a moderately complex problem to get a feel for how the Haskell type system works and to face the dreaded IO, while not wasting too much time
- I need a decent actor-style abstraction for some social/population-based methods experiments I'm into
So this is how I ended up with: why not try my hand at Erlang-style concurrency? The best thing about it is that it aligns nicely with an individualistic view of the world. I can write sequential code to do something, which from time to time communicates with the rest of the processes, and the rest of the time doesn't really care.
Erlang implements lightweight processes with asynchronous messaging. The cool twist it puts on it is having reception guards - if a message is not matched by the guard conditions it stays in the mailbox (channel), and the rest of the messages are scanned. The first matching message is removed from the mailbox, the remaining messages are left in the mailbox in arrival order.
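To make the guard semantics concrete, here is a minimal pure sketch that models the mailbox as a plain list, oldest message first. The name selectFirst is hypothetical and is not part of the attached code; it only illustrates the rule described above.

```haskell
-- Pure model of selective receive over a mailbox kept as a list
-- (oldest message first). selectFirst is an illustrative name only.

-- | Take the first message accepted by the guard. The remaining messages
-- keep their arrival order. Nothing means no message matched.
selectFirst :: (a -> Bool) -> [a] -> Maybe (a, [a])
selectFirst accept mailbox =
  case break accept mailbox of
    (skipped, m : rest) -> Just (m, skipped ++ rest)
    (_, [])             -> Nothing
```

The skipped messages are glued back in front of the rest, which is the ordering the back channel and forward are meant to restore in the real implementation.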
OK, let's roll up our sleeves and start with some datatypes (all the code is attached at the bottom of this post):
-- a reply function, isolates the communication, contact only those who know of you
-- and a message
data Message a = Message {
    reply   :: Message a -> IO (),
    message :: a }
--
-- a process is uniquely identified by its mbox,
-- the back channel is for temporaries
-- fini is an "I'm finished" flag MVar
data Process a = Process {
    mbox :: Chan (Message a),
    back :: Chan (Message a),
    fini :: MVar () }
The Message type defines the format of a message. message is obviously the payload, of some type a. reply is a reply function: if you want to reply to this message, call reply. It's a nice idiom I was reminded of by Cale@#haskell.
Process is a process descriptor type. It contains the mbox channel - it's never exposed directly, but always wrapped in a reply function. The channel can be used only with an instance of a Message type. back is a utility back channel, used for restoring the message arrival order in mbox.
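As a rough illustration of the reply idiom, the sketch below stamps each outgoing message with a closure over the sender's own mailbox, so the receiver can answer without ever seeing a Chan. The helpers sendFrom and answerOnce are hypothetical names for this example; the attached code wires things up through runProcess instead.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Same shape as the post's Message type: a payload plus a way to answer.
data Message a = Message
  { reply   :: Message a -> IO ()
  , message :: a
  }

-- Send a payload to `target`, stamped with a reply closure that delivers
-- to `mine`. The receiver only ever gets the closure, not the channel.
sendFrom :: Chan (Message a) -> Chan (Message a) -> a -> IO ()
sendFrom mine target x = writeChan target (Message (writeChan mine) x)

-- Handle one message: read it, transform the payload, reply to the sender.
answerOnce :: Chan (Message a) -> (a -> a) -> IO ()
answerOnce mbox f = do
  Message replyTo x <- readChan mbox
  replyTo (Message (writeChan mbox) (f x))
```

Because the channel is hidden inside the closure, the only thing a peer can do with it is send a Message back.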
How to initialise or kick-start a process?
--start a new 'process'
--would it be good to parametrise the channels to use STM or not?
runProcess :: ((MVar (), a -> IO ()) -> IO t) -> (Process a -> IO c) -> IO c
runProcess feedback = bracket
    (do --init
        fini' <- newEmptyMVar
        mbox' <- newChan
        back' <- newChan
        feedback ( fini', (\x -> do { yield >> writeChan mbox' (Message (writeChan mbox') x) }) )
        return (Process mbox' back' fini'))
    (\ self -> do --clean up
        putMVar (fini self) ()
        return () )
I use fini as an "it's all over" flag. The rest is obvious: create the channels, give some feedback to the creator, return a process. The interesting discovery for me was the bracket function, from Control.Exception. It is the equivalent of Lisp's unwind-protect (Scheme's closest relative is dynamic-wind). The first argument is the initialise function. The second is the wrapping-up function: regardless of the reason for exiting, normal exit or some exception, you are always guaranteed to have this function executed. The third, left unapplied here, is the body that runs in between, which is why runProcess still expects a Process a -> IO c. And this is where fini is used to signal the unknown custodian, usually the creator, that this thread is no more.
feedback is a callback that reports the essential information back to the caller: fini and a send function, a lambda that wraps putting a message into the mbox channel. yield is there to give other threads a chance to run on every send. It is probably the worst performance hog in this code. Still, depending on usage, the cost could be amortised.
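The bracket guarantee described above can be checked with a small experiment. This is only a sketch: tracedBracket and demo are hypothetical names, and the log in an IORef stands in for the fini MVar.

```haskell
import Control.Exception (ErrorCall (..), bracket, throwIO, try)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Run a body inside bracket and log each phase, so we can see that the
-- release action runs even when the body throws.
tracedBracket :: IORef [String] -> IO a -> IO a
tracedBracket logRef body =
  bracket
    (modifyIORef logRef ("acquire" :))        -- first argument: initialise
    (\_ -> modifyIORef logRef ("release" :))  -- second argument: always runs
    (\_ -> body)                              -- third argument: the work

-- Returns the logged phases in the order they happened.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  r <- try (tracedBracket logRef (throwIO (ErrorCall "boom")))
         :: IO (Either ErrorCall ())
  modifyIORef logRef (either (const "exception seen") (const "no exception") r :)
  reverse <$> readIORef logRef
```

The release step is logged before the exception reaches the caller, which is the same moment at which runProcess fills fini.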
--conditional receive function - may return Nothing if the queue does not contain a message
-- is_blocking selects blocking behaviour
-- self is a process descriptor
-- check accepts or delays a message reception
receive :: Bool -> Process a -> (Message a -> Bool) -> (Maybe (Message a) -> IO c) -> IO c
receive is_blocking self check =
    let get = (if is_blocking then getb else getnb)
        recv = do
            m <- (get self)
            case m of
                Just message -> case (check message) of
                    True -> do return (Just message)
                    False -> do
                        -- set the rejected message aside and keep scanning;
                        -- the result is whatever the rest of the scan finds
                        writeChan (back self) message
                        recv
                Nothing -> do return Nothing
    in bracket
        (recv)
        (\x -> do
            forward (back self) (mbox self)
            return x)
Message reception. Again, wrapped in a bracket. is_blocking is a flag for blocking/non-blocking reading of mbox. self is a process descriptor. check is a message filter - to get or to skip. In the unwind phase (the second argument to bracket), all backed up messages in the back channel are forwarded back to the mbox.
The rest of the code is routine and requires little explanation. It is in the attachment.
So a simple producer, consumer example, just to check if it works. Oops, compiles, more likely.
Sample client code
producer :: Int -> (Int -> IO t2) -> (Int -> IO t2) -> Process Int -> IO ()
producer 0 send1 send2 self = do return ()
producer acc send1 send2 self = do
    send1 acc
    send2 acc
    producer (acc - 1) send1 send2 self
--
consumer :: Int -> Int -> (Process Int) -> IO ()
consumer cid 0 self = do return ()
consumer cid acc self =
    let
        --m_print:: Maybe (Message Int) -> IO ()
        m_print (Just (Message _ message )) =
            print $ "consumer" ++ (show cid) ++ ": "
                ++ (show acc)
                ++ " message: "
                ++ show message
        m_print Nothing =
            print $ "consumer:" ++ (show cid) ++ ": "
                ++ (show acc) ++ show( Nothing::Maybe Int )
    in do
        must_receive self (\_-> True) m_print
        consumer cid (acc - 1) self
--
feedback :: Chan a -> a -> IO ()
feedback = writeChan
--
--wait for all children to terminate, they should write to their fini MVar
wait_for_children [c] = do takeMVar c >> return ()
wait_for_children (c:cs) = do takeMVar c >> wait_for_children cs
Simple consumer: count down and print stuff, including a message. Similarly simple producer. Wait for children is lifted (nearly) verbatim from the Control.Concurrent documentation, just without unsafePerformIO. The rest is sugar.
main = do
    fbChan <- newChan
    fb1 <- newChan
    --
    forkIO $ runProcess (feedback fbChan) (consumer 1 10000)
    ( endp, reply ) <- (readChan fbChan)::IO (MVar (), Int -> IO())
    --
    forkIO $ runProcess (feedback fbChan) (consumer 2 1000)
    ( endp1, reply1 ) <- (readChan fbChan)::IO (MVar (), Int -> IO())
    --
    forkIO $ runProcess (feedback fbChan) (producer 2000 reply reply1)
    ( endp2, reply2 ) <- (readChan fbChan)::IO (MVar (), Int -> IO())
    --
    wait_for_children [endp, endp1, endp2]
    return ()
And finally mama main: boilerplate forkIO $ runProcess .... It looks simple. But it took some hair pulling to figure out what the type of xyz actually is. Fair enough, it is not really Haskell's fault: this indirection for channels presents similar problems to typing a pointer value. How do you guarantee that the value is of the type it says, with so many indirections? Thank god for -fglasgow-exts, otherwise this particular code won't compile.
I think the boilerplate forkIO.... stuff should be wrapped. Possibly this can be improved by using plt-scheme-like custodians for managing groups of threads.
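One way the wrapping could look is sketched below. This is an assumption about a possible direction, not the attached code: spawn and waitAll are hypothetical names, forkFinally from Control.Concurrent takes over the role of bracket plus fini, and nothing here implements custodians.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)

-- Fork an action and return an MVar that is filled when the thread ends,
-- whether it finished normally or died with an exception.
spawn :: IO () -> IO (MVar ())
spawn action = do
  done <- newEmptyMVar
  _ <- forkFinally action (\_ -> putMVar done ())
  return done

-- Block until every spawned thread has signalled completion.
waitAll :: [MVar ()] -> IO ()
waitAll = mapM_ takeMVar
```

With something like this, main would shrink to a list of spawn calls followed by a single waitAll, at the cost of hiding any exception that killed a child.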
bugs and other monsters
After feedback on IRC (#haskell), thanks Saizan, I corrected and updated the attached files. All of it is related to the forward function, which looks ugly and inefficient now. Sorry. I'll work on that.
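forward :: Chan a -> Chan a -> IO ()
forward from to =
    let -- empty 'from' into a list; the newest message ends up at the head
        drain acc = do
            g <- isEmptyChan from
            if g
                then return acc
                else do
                    m <- readChan from
                    drain (m : acc)
    in do
        -- getChanContents is a lazy, never-ending list, so reversing it
        -- would block; drain the channel explicitly instead
        ms <- drain []
        -- push back newest first, so the oldest ends up at the front of 'to'
        mapM_ (unGetChan to) ms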
Anyway, that needs more work. What is more worrying is that the mbox, or readChan mbox, can be leaked outside runProcess. That can cause all kinds of havoc, and it would be good, and quite interesting, to eliminate this possibility. It requires some head scratching on my part. I'll try to keep the blog updated on where this is leading, together with some applications I have in mind.
| Attachment | Size |
|---|---|
| ErlActors.hs_.txt | 5.48 KB |
| erlactorstest.hs_.txt | 1.71 KB |
Erlang-style concurrency in Haskell
Here's an older, recently resurrected, take on Erlang-in-Haskell:
http://nowonder.foldr.org:8080/roller/page/vs?entry=erlang_style_distributed_haskell
nice
I was looking for this code after I stumbled upon the dhs paper. It goes further than my modest goals for this thing, which are to use this setting to see how to handle a few delicate implementation details, and en route to get a better understanding of Haskell.