[Haskell-cafe] Re[2]: [Haskell] ANNOUNCE: Process library (for
dataflow-oriented programming?)
Bulat Ziganshin
bulatz at HotPOP.com
Tue Dec 13 10:56:43 EST 2005
Hello Joel,
Tuesday, December 13, 2005, 3:04:11 PM, you wrote:
JR> How is your library licensed?
it costs many megabucks because it's a very complex proprietary design
where some functions reach a whole 12 lines! :)
of course, you can do what you want with this library. maybe the
better way is to write your own, stealing one or two ideas from mine
you can find other interesting work in:
http://www-i2.informatik.rwth-aachen.de/~stolz/Haskell/CA.hs
http://www-i2.informatik.rwth-aachen.de/Research/distributedHaskell/pbdhs-2001-09-20.tar.gz
(this one seems to be especially interesting for you, providing
"ports" - i think, in Erlang style)
http://www-i2.informatik.rwth-aachen.de/Research/distributedHaskell/network.tar.gz
http://quux.org/devel/missingh/missingh_0.12.0.tar.gz
(see Logging, Network, Threads directories)
JR> How can a process maintain internal state?
a process in my lib is just an ordinary Haskell function, so state is
kept the same way as in any other Haskell function :)
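
For illustration, here is a minimal sketch of one way an ordinary function can carry state: the state is an argument of a recursive loop. It does not use the Process library itself. It assumes a plain `Chan` as the input, and the names `summer` and `runSummer` as well as the `Nothing` end-of-input marker are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Hypothetical process: the running total is the loop argument.
-- 'Nothing' is an assumed end-of-input marker, not part of the library.
summer :: Chan (Maybe Int) -> IO Int
summer input = loop 0
  where
    loop total = do
      msg <- readChan input
      case msg of
        Just n  -> loop (total + n)  -- the new state goes to the next iteration
        Nothing -> return total      -- end of input: hand back the final state

-- Run 'summer' in its own thread, feed it the numbers, wait for the result.
runSummer :: [Int] -> IO Int
runSummer xs = do
  input  <- newChan
  result <- newEmptyMVar
  _ <- forkIO (summer input >>= putMVar result)
  mapM_ (writeChan input . Just) xs
  writeChan input Nothing
  takeMVar result

main :: IO ()
main = runSummer [1, 2, 3] >>= print  -- prints 6
```

An `IORef` inside the process would work as well; the loop argument simply keeps the state local to the function.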
my examples are simplified - to avoid bothering with EOF i just organized in
each process a loop which sends and/or receives just 10 messages. in
my real program the data sent between processes is defined by structures
like this:
data Message = FileStarted String
             | FileData String
             | DataEnd
so a typical communication scenario is:
sendP h (FileStarted "1")
sendP h (FileData "abc")
sendP h (FileData "def")
sendP h (FileData "ghi")
sendP h (FileStarted "2")
sendP h (FileData "qwer")
sendP h (FileStarted "3")
sendP h (FileData "123")
sendP h DataEnd
and each function implementing a process finishes only when that process is
done. the sender process runs a loop which reads files and sends their
data to the channel. the receiver process loops until `DataEnd`
is received
in one phrase, it's just the same organization as in your own program
:)))
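
The sender and receiver loops described above can be sketched roughly as follows. This is only an illustration under assumptions: it uses a plain `Chan` in place of the library's `sendP`, and the names `sendFiles`, `receiveAll` and `demo` are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

data Message = FileStarted String
             | FileData String
             | DataEnd
  deriving (Eq, Show)

-- Sender: one FileStarted per file, then its chunks, and one DataEnd at the end.
sendFiles :: Chan Message -> [(String, [String])] -> IO ()
sendFiles ch files = do
  mapM_ sendOne files
  writeChan ch DataEnd
  where
    sendOne (name, chunks) = do
      writeChan ch (FileStarted name)
      mapM_ (writeChan ch . FileData) chunks

-- Receiver: keep reading until DataEnd arrives, collecting what came before it.
receiveAll :: Chan Message -> IO [Message]
receiveAll ch = do
  msg <- readChan ch
  case msg of
    DataEnd -> return []
    _       -> fmap (msg :) (receiveAll ch)

-- Run the sender in its own thread and the receiver in the calling thread.
demo :: IO [Message]
demo = do
  ch <- newChan
  _ <- forkIO (sendFiles ch [("1", ["abc", "def", "ghi"]), ("2", ["qwer"]), ("3", ["123"])])
  receiveAll ch

main :: IO ()
main = demo >>= mapM_ print
```

Because `DataEnd` is sent exactly once, after the last file, the receiver needs no other way to detect the end of the stream.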
JR> How would I use your library to code a socket reader/writer that
JR> writes received events to the socket and propagates back anything
JR> that is received?
JR> The producer/consumer in front of this network client would be
JR> another process that analyzes the events sent back to it and produces
JR> events based on the analysis.
i don't understand your questions
JR> How would I use it to launch a few network clients that seat there
JR> and process events until they decided to quit? The whole program
JR> needs to stay up until the last network client has exited.
JR> The pipeline to me looks like this:
JR> <-> Bot <-> Socket client ... Server
JR> /
JR> Bot launcher --- <-> Bot <-> Socket client ... Server
JR> \
JR> <-> Bot <-> Socket client ... Server
JR> Where bot launcher starts a predefined # of bots and collects results
JR> sent back by each one.
my lib is not appropriate for your task, because it is oriented toward
simplifying the creation of processes which have only one input. but your main
thread must receive data from all bots, and each bot must receive data from
two sources. the solution depends on the strategy for mixing these
inputs - will it be a fair FIFO or a more advanced scheme?
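
As a rough illustration of the FIFO option, two sources can be merged into one inbox by tagging each message with its origin, so the bot still reads from a single input. This is a sketch under assumptions, not part of the library: the type `BotInput` and the names `forwardTagged` and `demo` are hypothetical, and plain `Chan`s stand in for the real sources.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (replicateM)

-- Hypothetical tag telling the bot where a message came from.
data BotInput = FromLauncher String
              | FromSocket String
  deriving (Eq, Show)

-- Copy every message from one source into the shared inbox, tagged.
forwardTagged :: (a -> BotInput) -> Chan a -> Chan BotInput -> IO ()
forwardTagged tag source inbox = loop
  where
    loop = do
      x <- readChan source
      writeChan inbox (tag x)
      loop

-- Merge two sources and read three messages in arrival order.
demo :: IO [BotInput]
demo = do
  fromLauncher <- newChan
  fromSocket   <- newChan
  inbox        <- newChan
  _ <- forkIO (forwardTagged FromLauncher fromLauncher inbox)
  _ <- forkIO (forwardTagged FromSocket fromSocket inbox)
  writeChan fromLauncher "Wake up, Neo!"
  writeChan fromSocket "event 1"
  writeChan fromSocket "event 2"
  replicateM 3 (readChan inbox)

main :: IO ()
main = demo >>= mapM_ print
```

Messages from one source keep their relative order, while the interleaving between the two sources depends on thread scheduling.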
if it's a FIFO then something like this (i skipped only exception
handling and creating the socket-reader process inside each bot -
writing to the socket must be performed by the bot itself):
{-# OPTIONS_GHC -cpp #-}
import Control.Concurrent   -- forkIO, killThread, yield, MVar, Chan
import Control.Exception    -- finally
-----------------------------------------------------------------
--- Bot launcher implementation ---------------------------------
-----------------------------------------------------------------
main = do
  (sendToMain, receiveFromBots) <- createChannel
  bots <- foreach [1..10] $ createProcess . bot sendToMain
  mapM_ (`sendToProcess` "Wake up, Neo!") bots
  while receiveFromBots (/="I want to stop the Matrix!") print
  -- ....
  mapM_ killProcess bots
  -- or, if you are more humane - "mapM_ waitProcessDie bots" :)
-----------------------------------------------------------------
--- Bot implementation ------------------------------------------
-----------------------------------------------------------------
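bot sendToMain n receiveMessagesForMe = do
  forever $ do
    x <- receiveMessagesForMe
    case x of
      "Wake up, Neo!"         -> sendToMain$ show n++": I'm not sleeping!"
      "Are you wanna coffee?" -> sendToMain$ show n++": Yes, it is!"
      _                       -> return ()  -- ignore unknown messages instead of failing
    yield
  -- not reached as written: `forever` never returns, a real bot must leave the loop first
  sendToMain "I want to stop the Matrix!"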
-----------------------------------------------------------------
--- Process implementation details ------------------------------
-----------------------------------------------------------------
-- |Abstract type for all of our processes
data Process a = Process { pid      :: ThreadId
                         , sender   :: a -> IO ()  -- action which must be used to send data to this process
                         , finished :: MVar ()     -- filled when this process is finished
                         }
-- Operations on processes
-- |Create a process and return the Process structure used to communicate with it.
-- The given `process` function is called with one parameter: the action
-- which the process must use to receive its data
createProcess process = do
  finished <- newEmptyMVar
  (sendToMe, receiveMessagesForMe) <- createChannel
  pid <- forkIO $ do
           process receiveMessagesForMe
             `finally`
               putMVar finished ()
  return Process { pid      = pid
                 , sender   = sendToMe
                 , finished = finished
                 }
killProcess = killThread.pid
waitProcessDie = readMVar.finished
sendToProcess = sender
-----------------------------------------------------------------
--- Channel implementation details ------------------------------
-----------------------------------------------------------------
#if 1
-- Channels implemented via MVar
createChannel = do c <- newEmptyMVar
                   return (putMVar c, takeMVar c)
#else
-- Channels implemented via Chan
createChannel = do c <- newChan
                   return (writeChan c, readChan c)
#endif
-----------------------------------------------------------------
--- Random utility functions ------------------------------------
-----------------------------------------------------------------
while inp cond out = do
  x <- inp
  if (cond x)
    then do out x
            while inp cond out
    else return x
forever action = do
  action
  forever action
foreach = flip mapM
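
The `#if 1` switch in `createChannel` above selects between two different buffering behaviours. An `MVar` holds at most one message, so a second `putMVar` blocks until the reader has taken the first one, while a `Chan` is an unbounded queue and the writer never waits. A small sketch of the difference (the names `mvarDemo` and `chanDemo` are hypothetical; `tryPutMVar` is used only so the full slot is reported instead of blocking the demo):

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, tryPutMVar)
import Control.Monad (replicateM)

-- MVar: a one-slot channel. The second non-blocking put fails
-- because the slot is still occupied by the first message.
mvarDemo :: IO (Bool, Bool)
mvarDemo = do
  slot   <- newEmptyMVar
  first  <- tryPutMVar slot "a"
  second <- tryPutMVar slot "b"
  return (first, second)

-- Chan: an unbounded queue. All writes succeed before anything is read,
-- and the messages come out in the order they were written.
chanDemo :: IO [String]
chanDemo = do
  ch <- newChan
  mapM_ (writeChan ch) ["a", "b", "c"]
  replicateM 3 (readChan ch)

main :: IO ()
main = do
  mvarDemo >>= print  -- prints (True,False)
  chanDemo >>= print  -- prints ["a","b","c"]
```

With the `MVar` variant a fast bot is throttled to the speed of the launcher; with the `Chan` variant messages queue up without limit.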
JR> I think your library looks a bit like Yampa in that your processes
JR> are somewhat like signal functions.
yes, they are closely related - data-driven execution and signal
processing
--
Best regards,
Bulat mailto:bulatz at HotPOP.com