{-# OPTIONS_GHC -fglasgow-exts #-}
--This code is distributed under the General Public Licence version 2 (GPLv2) or higher
module ErlActors (
    Message(..),
    Process(..), --is this export required? or could we get away with returning a function through runProcess...
    runProcess,
    receive,
    must_receive,
    maybe_receive
) where

import Control.Monad
import Control.Exception (bracket)
import Control.Concurrent (
    ThreadId,
    forkIO,
    myThreadId,
    yield
    )
import Control.Concurrent.Chan (
    Chan,
    newChan,
    writeChan,
    readChan,
    unGetChan,
    isEmptyChan,
    getChanContents,
    writeList2Chan
    )
import Control.Concurrent.MVar (
    MVar,
    newEmptyMVar,
    putMVar
    )

---------------------------------------------------------------------------------------------------
-- A simplistic implementation for erlang style actors/concurrency
--
-- TODO: add control groups, something similar to plt scheme's custodians, would be best
--       ref: http://download.plt-scheme.org/doc/371/html/mzscheme/mzscheme-Z-H-9.html#node_sec_9.2
--       security guards will be an overkill, thread groups are the scheduler's job, and
--       I don't have an intention of writing a scheduler
--
-- TODO: improve receive/forward combo, at the moment it has bad performance
--
-- TODO: how to prevent a process passing (receive self) and/or Process to a third party?
--       that might prove to have disastrous consequences
--
-- TODO: spawnProcess - wrapping the process bootstrapping code, see erlactorstest.hs
--       should handle both forkIO and forkOS
--
-- TODO: would be cool to have transparent distributed computation (forkNet?)
--
-- TODO: simplify - I have a feeling that this code could be simpler, but obviously
--       my haskell knowledge is too little to spot that stuff yet
---------------------------------------------------------------------------------------------------

-- A message together with a reply function. The reply function isolates the
-- communication: contact only those who know of you.
data Message a = Message
    { reply   :: Message a -> IO ()  -- action that delivers an answer; messages built by
                                     -- runProcess's send function write into that
                                     -- process's own mailbox
    , message :: a                   -- the payload
    }

-- Process as a functor?
-- fmap f m ->

-- a process is uniquely identified by its mbox, the back channel is for temporaries
data Process a = Process
    { mbox :: Chan (Message a)  -- incoming messages
    , back :: Chan (Message a)  -- messages deferred by 'receive' until they are restored
    , fini :: MVar ()           -- filled with () by runProcess once the process body is over
    }

--start a new 'process'
--would it be good to parametrise the channels to use STM or not?
--
-- feedback is invoked once, before the body starts, with the termination
--          signal (fini) and a function that posts a payload to the new
--          process's mailbox; its result is discarded
-- the second argument (supplied to the bracket) is the process body; when it
-- returns or throws, fini is filled with ()
runProcess :: ((MVar (), a -> IO ()) -> IO t) -> (Process a -> IO c) -> IO c
runProcess feedback = bracket start stop
  where
    --init
    start = do
        fini' <- newEmptyMVar
        mbox' <- newChan
        back' <- newChan
        -- yield first so that other threads get a chance to run between posts
        let send x = yield >> writeChan mbox' (Message (writeChan mbox') x)
        _ <- feedback (fini', send)
        return (Process mbox' back' fini')
    --clean up
    stop self = putMVar (fini self) ()

--conditional receive function - may return Nothing if the queue does not contain a message
-- is_blocking selects blocking behaviour
-- self is a process descriptor
-- check accepts or delays a message reception
-- the last argument (supplied to the bracket) handles the outcome
--
-- Messages are taken from the mailbox one at a time. A message rejected by
-- check is parked on the back channel and the scan goes on; it is never handed
-- to the handler. The scan ends with the first accepted message or, in
-- non-blocking mode, with Nothing once the mailbox is empty. In blocking mode
-- the call waits until an acceptable message arrives.
--
-- After the handler has finished (normally or with an exception) the parked
-- messages are put back at the front of the mailbox in their original order.
receive :: Bool -> Process a -> (Message a -> Bool) -> (Maybe (Message a) -> IO c) -> IO c
receive is_blocking self check = bracket scan restore
  where
    get
        | is_blocking = getb
        | otherwise   = getnb
    scan = do
        m <- get self
        case m of
            Nothing -> return Nothing
            Just msg
                | check msg -> return (Just msg)
                | otherwise -> do
                    -- delay the message and look at the next one
                    writeChan (back self) msg
                    scan
    restore _ = forward (back self) (mbox self)

-- and some candy floss
-- blocking reception
must_receive :: Process a -> (Message a -> Bool) -> (Maybe (Message a) -> IO c) -> IO c
must_receive = receive True

--non-blocking reception
maybe_receive :: Process a -> (Message a -> Bool) -> (Maybe (Message a) -> IO c) -> IO c
maybe_receive = receive False

--------------------------------------------------------------------------------------------
-- private definitions
--------------------------------------------------------------------------------------------

--forward all messages from one queue to the front of another, used to restore the mbox
--
-- The source queue is drained into a list first. getChanContents must not be
-- used for that: it yields a lazy list that never ends, so reversing it would
-- block forever. The drained messages are accumulated newest-first and pushed
-- back one by one with unGetChan, which leaves the oldest message at the very
-- front of the target, i.e. the original order is kept.
forward :: Chan a -> Chan a -> IO ()
forward from to = drain [] >>= mapM_ (unGetChan to)
  where
    drain acc = do
        done <- isEmptyChan from
        if done
            then return acc
            else do
                m <- readChan from
                drain (m : acc)

-- a non-blocking get, returns Nothing on an empty channel
getnb :: Process a -> IO (Maybe (Message a))
getnb self = do
    yield
    done <- isEmptyChan (mbox self)
    if done
        then return Nothing
        else fmap Just (readChan (mbox self))

-- a blocking get, blocks on an empty channel
getb :: Process a -> IO (Maybe (Message a))
getb self = yield >> fmap Just (readChan (mbox self))