[Haskell-cafe] Re[2]: [Haskell] ANNOUNCE: Process library (for dataflow-oriented programming?)

Bulat Ziganshin bulatz at HotPOP.com
Tue Dec 13 10:56:43 EST 2005


Hello Joel,

Tuesday, December 13, 2005, 3:04:11 PM, you wrote:
JR> How is your library licensed?

it costs many megabucks because it's a very complex proprietary design
where some functions reach a whole 12 lines! :)

of course, you can do what you want with this library. maybe the
better way is to write your own, stealing one or two ideas from mine

you can find other interesting work in:

http://www-i2.informatik.rwth-aachen.de/~stolz/Haskell/CA.hs
http://www-i2.informatik.rwth-aachen.de/Research/distributedHaskell/pbdhs-2001-09-20.tar.gz
(this one seems to be especially interesting for you, providing
"ports" - i think, in Erlang style)
http://www-i2.informatik.rwth-aachen.de/Research/distributedHaskell/network.tar.gz
http://quux.org/devel/missingh/missingh_0.12.0.tar.gz
(see Logging, Network, Threads directories)

JR> How can a process maintain internal state?

a process in my lib is just an ordinary Haskell function, so it keeps
state the same way as any other Haskell function :)
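
To make that concrete, here is a minimal sketch (not taken from the library) of a process body whose internal state is just the argument of its recursive loop. The name `summer` and the use of `Nothing` as an end-of-input marker are assumptions made for this illustration; the receive action is an ordinary `IO` action, as in the library's process functions.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)

-- | Hypothetical process body: keeps a running total as its state.
-- The state lives in the argument of `loop`; no mutable variable is needed.
-- `Nothing` is an assumed end-of-input marker.
summer :: IO (Maybe Int) -> IO Int
summer receive = loop 0
  where
    loop total = do
      msg <- receive
      case msg of
        Just n  -> loop (total + n)   -- carry the new state into the next step
        Nothing -> return total       -- input finished, report the final state

main :: IO ()
main = do
  c <- newChan
  -- A Chan is unbounded, so all messages can be queued before reading.
  mapM_ (writeChan c) [Just 1, Just 2, Just 3, Nothing]
  total <- summer (readChan c)
  print total   -- prints 6
```

The same pattern works with any state type: a record, a `Data.Map`, or an `IORef` if mutation is preferred.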

my examples are simplified - to avoid bothering with EOF, each process
just runs a loop which sends and/or receives exactly 10 messages. in
my real program the data sent between processes is defined by
structures like this:

data Message = FileStart String
             | FileData String
             | DataEnd

so a typical communication scenario is:

do sendP h (FileStart "1")
   sendP h (FileData "abc")
   sendP h (FileData "def")
   sendP h (FileData "ghi")
   sendP h (FileStart "2")
   sendP h (FileData "qwer")
   sendP h (FileStart "3")
   sendP h (FileData "123")
   sendP h DataEnd

and each function implementing a process returns only when that
process is done. the sender process runs a loop which reads files and
sends their data to the channel. the receiver process loops until
`DataEnd` is received
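
The sender and receiver loops described above can be sketched as follows. This is an illustration under assumptions, not the real program: the constructor names follow the communication scenario above, a plain `Chan` stands in for the library's channel, and `sender`, `sendFile` and `receiver` are hypothetical names. The receiver just counts files and data bytes.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

data Message = FileStart String
             | FileData String
             | DataEnd

-- | Receiver: loops until DataEnd, counting files and data bytes.
receiver :: IO Message -> IO (Int, Int)
receiver receive = loop 0 0
  where
    loop files bytes = do
      msg <- receive
      case msg of
        FileStart _ -> loop (files + 1) bytes
        FileData s  -> loop files (bytes + length s)
        DataEnd     -> return (files, bytes)

-- | Send one file: its start marker followed by its data chunks.
sendFile :: (Message -> IO ()) -> (String, [String]) -> IO ()
sendFile send (name, chunks) = do
  send (FileStart name)
  mapM_ (send . FileData) chunks

-- | Sender: walks over (name, chunks) pairs, then signals the end.
sender :: (Message -> IO ()) -> [(String, [String])] -> IO ()
sender send files = do
  mapM_ (sendFile send) files
  send DataEnd

main :: IO ()
main = do
  c    <- newChan
  done <- newEmptyMVar
  -- The receiver runs in its own thread and reports its result via `done`.
  _ <- forkIO (receiver (readChan c) >>= putMVar done)
  sender (writeChan c)
         [("1", ["abc", "def", "ghi"]), ("2", ["qwer"]), ("3", ["123"])]
  takeMVar done >>= print   -- prints (3,16)
```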

in one phrase, it's just the same organization as in your own program
:)))


JR> How would I use your library to code a socket reader/writer that  
JR> writes received events to the socket and propagates back anything  
JR> that is received?

JR> The producer/consumer in front of this network client would be
JR> another process that analyzes the events sent back to it and produces  
JR> events based on the analysis.

i don't understand your questions

JR> How would I use it to launch a few network clients that seat there  
JR> and process events until they decided to quit? The whole program  
JR> needs to stay up until the last network client has exited.

JR> The pipeline to me looks like this:

JR>                     <-> Bot <-> Socket client ... Server
JR>                   /
JR> Bot launcher ---   <-> Bot <-> Socket client ... Server
JR>                   \
JR>                     <-> Bot <-> Socket client ... Server

JR> Where bot launcher starts a predefined # of bots and collects results  
JR> sent back by each one.

my lib is not appropriate for your task, because it is oriented to
simplify creation of processes which have only one input. but your main
thread must receive data from all bots, and each bot must receive data
from two sources. the solution depends on the strategy for mixing these
inputs - will it be a fair FIFO or some more advanced scheme?
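
For a process with two inputs, one simple way to get a FIFO mix is to tag each source and forward both into a single shared queue. The sketch below is only an illustration of that idea using `Chan` from the base library; `BotInput`, `forward` and the message strings are hypothetical and not part of the library.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- | Hypothetical event type for a bot that listens to two sources.
data BotInput = FromLauncher String
              | FromSocket String
              deriving (Eq, Show)

-- | Forward everything read from one source into the shared inbox,
-- wrapping each item with the given tag. Runs until its thread is killed.
forward :: IO a -> (a -> BotInput) -> Chan BotInput -> IO ()
forward receive tag inbox = do
  x <- receive
  writeChan inbox (tag x)
  forward receive tag inbox

main :: IO ()
main = do
  launcher <- newChan
  socket   <- newChan
  inbox    <- newChan
  _ <- forkIO (forward (readChan launcher) FromLauncher inbox)
  _ <- forkIO (forward (readChan socket)   FromSocket   inbox)
  writeChan launcher "Wake up, Neo!"
  writeChan socket   "packet from server"
  -- Arrival order between the two sources is not deterministic.
  a <- readChan inbox
  b <- readChan inbox
  print [a, b]
```

The bot then reads only from `inbox` and pattern-matches on the tag, so it stays a one-input process.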

if it's a FIFO then something like this (i've skipped only exception
handling and the creation of a socket-reader process inside each bot -
writing to the socket must be performed by the bot itself):

{-# OPTIONS_GHC -cpp #-}
-- Only these two modules are actually used by the code below
import Control.Concurrent   -- forkIO, killThread, yield, MVar and Chan operations
import Control.Exception    -- finally

-----------------------------------------------------------------
--- Bot launcher implementation ---------------------------------
-----------------------------------------------------------------

main = do
  (sendToMain, receiveFromBots) <- createChannel
  bots <- foreach [1..10] $ createProcess . bot sendToMain
  mapM_ (`sendToProcess` "Wake up, Neo!") bots
  while receiveFromBots (/="I want to stop the Matrix!") print
  -- ....
  mapM_ killProcess bots
  -- or, if you are more humane - "mapM_ waitProcessDie bots" :)


-----------------------------------------------------------------
--- Bot implementation ------------------------------------------
-----------------------------------------------------------------

bot sendToMain n receiveMessagesForMe = do
  forever $ do
    x <- receiveMessagesForMe
    case x of
      "Wake up, Neo!" -> sendToMain $ show n ++ ": I'm not sleeping!"
      "Are you wanna coffee?" -> sendToMain $ show n ++ ": Yes, it is!"
      _ -> return ()   -- ignore unknown messages instead of crashing
    yield
    sendToMain "I want to stop the Matrix!"


-----------------------------------------------------------------
--- Process implementation details ------------------------------
-----------------------------------------------------------------

-- |Abstract type for all of our processes
data Process a = Process { pid    :: ThreadId
                         , sender :: a -> IO ()    -- action used to send data to this process
                         , finished :: MVar ()     -- filled when this process has finished
                         }

-- Operations on processes

-- |Create a process and return the Process structure used to communicate
-- with it. The given `process` function is called with one parameter:
-- the action which the process must use to receive its data
createProcess process = do
  finished <- newEmptyMVar
  (sendToMe, receiveMessagesForMe) <- createChannel
  pid <- forkIO $ do
           process receiveMessagesForMe
          `finally`
           putMVar finished ()
  return Process { pid      = pid
                 , sender   = sendToMe
                 , finished = finished
                 }

killProcess = killThread.pid
waitProcessDie = readMVar.finished
sendToProcess = sender


-----------------------------------------------------------------
--- Channel implementation details ------------------------------
-----------------------------------------------------------------

#if 1
-- Channels implemented via MVar
createChannel = do c <- newEmptyMVar
                   return (putMVar c, takeMVar c)
#else
-- Channels implemented via Chan
createChannel = do c <- newChan
                   return (writeChan c, readChan c)
#endif


-----------------------------------------------------------------
--- Random utility functions ------------------------------------
-----------------------------------------------------------------

while inp cond out = do
  x <- inp
  if (cond x)
    then do out x
            while inp cond out
    else return x

forever action = do
  action
  forever action

foreach = flip mapM
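
A note on the `#if` switch in the channel section: the two variants behave differently. An `MVar` holds at most one message, so `putMVar` blocks the sender until the receiver has taken the previous one, while a `Chan` is an unbounded queue. The following minimal sketch shows the difference with no receiver running; `acceptedByMVar` and `bufferedByChan` are hypothetical helper names, and `tryPutMVar` is used only so that the demonstration does not block.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, tryPutMVar)

-- | How many of the given messages an MVar accepts while nobody reads it.
acceptedByMVar :: [String] -> IO Int
acceptedByMVar msgs = do
  slot <- newEmptyMVar
  oks  <- mapM (tryPutMVar slot) msgs   -- False once the single slot is full
  return (length (filter id oks))

-- | A Chan queues every message; they come back out in FIFO order.
bufferedByChan :: [String] -> IO [String]
bufferedByChan msgs = do
  c <- newChan
  mapM_ (writeChan c) msgs
  mapM (const (readChan c)) msgs

main :: IO ()
main = do
  acceptedByMVar ["a", "b", "c"] >>= print   -- prints 1
  bufferedByChan ["a", "b", "c"] >>= print   -- prints ["a","b","c"]
```

So the MVar variant keeps sender and receiver in lock-step, while the Chan variant lets a fast sender run ahead at the cost of unbounded memory.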




JR> I think your library looks a bit like Yampa in that your processes  
JR> are somewhat like signal functions.

yes, they are closely related - both are about data-driven execution
and signal processing







-- 
Best regards,
 Bulat                            mailto:bulatz at HotPOP.com




