[Haskell] ANNOUNCE: Process library (for dataflow-oriented programming?)

Bulat Ziganshin bulatz at HotPOP.com
Wed Dec 7 17:36:05 EST 2005


Hello haskell,

Joel's program (discussed in cafe), which now uses MVars instead of
Channels to send data between threads, may be a good example of a
dataflow-driven program: it consists of many hundreds of threads, and
when one thread sends data to another through an MVar, the sender in
most cases goes to sleep until the receiving thread has processed the
previous value of this MVar. so threads wake up and go to sleep
according to the values passed between them, and the whole program
executes in the order defined by these data dependencies, as opposed
to the order of program statements
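
a minimal sketch of this blocking hand-off, using only
Control.Concurrent from base (it is not taken from Joel's program or
from the library, and the name relay is made up for illustration):

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM_, replicateM)

-- Send the numbers 1..n through a single MVar and collect them again.
-- putMVar blocks while the slot is full and takeMVar blocks while it is
-- empty, so the two threads take turns as the values flow between them.
relay :: Int -> IO [Int]
relay n = do
  slot <- newEmptyMVar
  _ <- forkIO $ forM_ [1 .. n] (putMVar slot)  -- sender thread
  replicateM n (takeMVar slot)                 -- receiver (calling thread)
```

relay 5 returns [1,2,3,4,5]; neither thread ever runs more than one
value ahead of the other.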

one year ago i developed a small library which can be helpful if you
want to use this style of programming. its ideas are modelled after
Unix pipes, which are widely used to assemble complex data processing
"engines" from simple parts. really, this library is a very thin
layer over direct use of forkOS, channels and MVars; nevertheless,
it is very convenient and beautiful

you can download the library from http://freearc.narod.ru/Process.tar.gz
this page also contains the sources of my program, where you can find
examples of using the library in a real toy :)  below is a guide to
library usage


to create a pipe which contains 3 processes, "producer", "transformer"
and "consumer":

runP ( producer |> transformer |> consumer )

each process in a pipe runs in a separate Haskell thread. a process is
represented by an ordinary Haskell function which gets an additional
parameter, a handle, which can be used to receive data from the
previous process in the pipe (using receiveP) and to send data to the
next process (using sendP). for example, the above-mentioned processes
can be implemented as:

producer handle = mapM_ (sendP handle) [1..10]

transformer handle = replicateM_ 10 $
                       do x <- receiveP handle
                          sendP handle (x*2)

consumer handle = replicateM_ 10 $
                    do x <- receiveP handle
                       print x
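
to make the mechanics concrete, here is a rough model of such a pipe
built directly on forkIO and MVars. it is only a sketch under
assumptions, not the library's code: Ends, Stage, recv, send and spawn
are hypothetical names, all stages share one item type, and a receive
on the first stage simply blocks instead of raising an exception:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (foldM, replicateM, replicateM_)

-- Hypothetical stand-in for a pipe handle: one slot to read, one to write.
data Ends a = Ends { inSlot :: MVar a, outSlot :: MVar a }

type Stage a = Ends a -> IO ()

recv :: Ends a -> IO a
recv = takeMVar . inSlot

send :: Ends a -> a -> IO ()
send = putMVar . outSlot

-- Fork one thread per stage and link neighbours with fresh MVars.
-- The returned Ends lets the caller read what the last stage sends
-- and write what the first stage receives.
spawn :: [Stage a] -> IO (Ends a)
spawn stages = do
  first <- newEmptyMVar
  final <- foldM link first stages
  return (Ends final first)
  where
    link i stage = do
      o <- newEmptyMVar
      _ <- forkIO (stage (Ends i o))
      return o

producer, transformer :: Stage Int
producer h = mapM_ (send h) [1 .. 10]
transformer h = replicateM_ 10 $ do
  x <- recv h
  send h (x * 2)

-- The calling thread plays the consumer and collects the results.
demo :: IO [Int]
demo = do
  ends <- spawn [producer, transformer]
  replicateM 10 (recv ends)
```

demo returns the even numbers from 2 to 20, in order.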


if the first process in a pipe tries to use receiveP, or the last
process tries to use sendP, a run-time exception is raised. the number
of processes in a pipe can be arbitrary. because each process is just
an ordinary Haskell function, you can pass additional parameters to
processes when constructing pipes:

runP ( producer |> multiple 2 |> multiple 3 |> consumer )

multiple n handle = replicateM_ 10 $
                      do x <- receiveP handle
                         sendP handle (x*n)


moreover, you can construct a pipe, or a part of it, as an ordinary
data value, which can then be run by runP:

let pipe = case multipliers of
             [x]     -> multiple x
             [x,y]   -> multiple x |> multiple y
             [x,y,z] -> multiple x |> multiple y |> multiple z
             _       -> \handle -> fail "Zero or too many multipliers"
runP ( producer |> pipe |> consumer )


there is also a "back channel", which can be used to "return" data to
the previous process in the pipe; its operations are send_backP and
receive_backP. it can be used to return acknowledgments, synchronize
processes or hand resources back. a brief example of its usage:

producer: sendP pipe (buf,len)      consumer: ;
          ;                                   (buf,len) <- receiveP pipe
          ;                                   hPutBuf file buf len
          ;                                   send_backP pipe ()
          receive_backP pipe                  ;
          --now we know that buf is free      ;

(i arranged the lines to show the execution order)
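
the same acknowledgment pattern can be tried with plain base
primitives. this is a sketch under assumptions, not the library's
send_backP/receive_backP: an IORef stands in for the (buf,len) pair,
and the name demoAck is made up for illustration:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Monad (forM_, replicateM_)
import Data.IORef

-- The producer reuses one buffer and overwrites it only after the
-- consumer has acknowledged the previous contents on the back channel.
demoAck :: IO [Int]
demoAck = do
  forward <- newEmptyMVar   -- one-element forward channel
  back    <- newChan        -- multi-element back channel
  buf     <- newIORef 0     -- the shared, reusable buffer
  seen    <- newIORef []    -- what the consumer has read so far
  done    <- newEmptyMVar
  _ <- forkIO $ do          -- consumer
    replicateM_ 3 $ do
      ref <- takeMVar forward
      x <- readIORef ref
      modifyIORef seen (x :)
      writeChan back ()     -- tell the producer the buffer is free
    putMVar done ()
  forM_ [10, 20, 30] $ \x -> do  -- producer
    writeIORef buf x
    putMVar forward buf
    readChan back           -- now we know that buf is free
  takeMVar done
  reverse <$> readIORef seen
```

demoAck returns [10,20,30]: no value is lost even though all three
travel through the same buffer.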


if processes are joined in a pipe with "|>", the channel between them
uses an MVar, so at any moment it can contain no more than 1 element.
if the channel between two processes is created with "|>>>", a Chan is
used, which can contain an arbitrary number of data items. be careful
with such channels, because they can grow without bound. "|>" and
"|>>>" can be arbitrarily combined in one pipe:

runP ( producer |>>> multiple 2 |> multiple 3 |>>> consumer )

the back channel (used by send_backP and receive_backP) is always
multi-element (it uses a Chan)
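
the difference in capacity between the two underlying primitives can be
seen directly in base. this is only an illustration of MVar and Chan
themselves, not of "|>" and "|>>>"; capacityDemo is a made-up name:

```haskell
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Monad (replicateM)

-- An MVar holds at most one value; a Chan accepts writes without limit.
capacityDemo :: IO (Bool, Bool, [Int])
capacityDemo = do
  slot   <- newEmptyMVar
  first  <- tryPutMVar slot (1 :: Int)  -- slot was empty: succeeds
  second <- tryPutMVar slot 2           -- slot is full: putMVar would block
  queue  <- newChan
  mapM_ (writeChan queue) [1 .. 1000]   -- no reader yet, the queue just grows
  xs <- replicateM 1000 (readChan queue)
  return (first, second, take 3 xs)
```

capacityDemo returns (True,False,[1,2,3]). the thousand unread items
sitting in the Chan are exactly the kind of growth to watch out for.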


runP returns when all processes in the pipe have finished. if any
process in the pipe raises an uncaught exception, all processes in the
pipe are killed and the exception is re-raised in the thread that
called runP
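
one way such behaviour could be obtained with base alone is sketched
below. this is an assumption about the general technique, not the
library's implementation; runAll is a hypothetical name, and the sketch
ignores asynchronous exceptions delivered to the waiting thread:

```haskell
import Control.Concurrent
import Control.Exception
import Control.Monad (forM)

-- Run every action in its own thread and wait for all of them.
-- On the first failure, kill the remaining threads and re-raise
-- the exception in the calling thread.
runAll :: [IO ()] -> IO ()
runAll actions = do
  results <- newChan
  tids <- forM actions $ \act ->
    forkFinally act (writeChan results)  -- report Left e or Right ()
  let wait :: Int -> IO ()
      wait 0 = return ()
      wait n = do
        r <- readChan results
        case r of
          Right () -> wait (n - 1)
          Left e   -> mapM_ killThread tids >> throwIO e
  wait (length actions)
```

for example, runAll [threadDelay 10000000, throwIO (userError "boom")]
fails at once with the user error instead of waiting ten seconds for
the first action.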

a pipe or a single process can also be run in the background using
runAsyncP:

handle <- runAsyncP (multiple 2)

the handle returned here can be used to interact with the first and
last processes in the pipe, in contrast to runP:

handle <- runAsyncP (multiple 2)
sendP handle 1
res <- receiveP handle

of course, a pipe run asynchronously may skip input, output, or
both:

handle <- runAsyncP ( producer |> transformer )
handle <- runAsyncP (             transformer |> consumer )
handle <- runAsyncP ( producer |> transformer |> consumer )

currently the channels to the "open ends" of a background pipe are
always one-element (they use MVars). the types of channels inside the
pipe are determined, as usual, by the use of the "|>" or "|>>>"
operator

you can wait for an asynchronous pipe to finish with "joinP handle"


you can "replace" a process running in a pipe with another process or
pipe by using runFuncP:

transformer handle =
  runFuncP ( multiple 2 |> multiple 3 )
    (receiveP handle)
    (send_backP handle)
    (sendP handle)
    (receive_backP handle)

actually, runFuncP just executes its pipe using the 4 supplied
functions for interaction with the first and last processes of the
pipe. to make this obvious, here is how runP can be implemented in
terms of runFuncP:

runP p = runFuncP p
          (error "First process in runP tried to receive")
          (error "First process in runP tried to send_back")
          (error "Last process in runP tried to send")
          (error "Last process in runP tried to receive_back")

runFuncP returns when all processes in its pipe have finished. using
it, you can create an unlimited number of scenarios: run several
runFuncP calls sequentially to deal with different parts of your data,
process part of the data yourself and part with runFuncP, or
"redirect" your input to a process run by runFuncP but consume the
output of runFuncP in another way:

consumer handle =
  runFuncP ( multiple 4 )
    (receiveP handle)
    undefined
    print             -- output action
    undefined

          
runFuncP can also be used in other cases where we need to run a single
process or a whole pipe, "imitating" its interaction with the
"external world" by some functions:

  runFuncP transformer
    readLn        -- input action
    undefined
    print         -- output action
    undefined
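
the idea of driving a process through plain functions can be tried
without the library. in this sketch (all names are hypothetical, and
only the two forward-direction actions are modelled) the process takes
its receive and send actions as arguments, and a test harness supplies
them from a list:

```haskell
import Control.Monad (replicateM_)
import Data.IORef

-- A process written against two plain actions instead of a pipe handle.
doubler :: Int -> IO Int -> (Int -> IO ()) -> IO ()
doubler n receive send = replicateM_ n $ do
  x <- receive
  send (x * 2)

-- Imitate the "external world": input comes from a list and
-- everything the process sends is collected into a list.
feedFromList :: [Int] -> IO [Int]
feedFromList xs = do
  input  <- newIORef xs
  output <- newIORef []
  let receive = do
        ys <- readIORef input
        case ys of
          (y : rest) -> writeIORef input rest >> return y
          []         -> ioError (userError "no more input")
      send y = modifyIORef output (y :)
  doubler (length xs) receive send
  reverse <$> readIORef output
```

feedFromList [1,2,3] returns [2,4,6]; passing readLn and print instead
would connect the same process to the console.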

    
it would also be interesting to have some functions which just insert
a "filtering" process or pipe on the input or output side of the
current process:

transformer channel =
  do channel <- insertInputFilterP (multiple 2) channel
     channel <- insertOutputFilterP (multiple 4) channel
     x <- receiveP channel
     sendP channel (x+1)

... but i don't need them, so this is currently not implemented

the library also doesn't contain routines to check that input is ready
or that output can be done (because i don't need them and it is bad
programming style), nor routines to check for EOF (i prefer to encode
this explicitly in the structure of the data sent across channels; you
can also wrap your data in the Maybe type and use Nothing to encode
EOF)
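
a minimal sketch of the Maybe encoding, written against a bare MVar
rather than the library's handles (produce, drain and eofDemo are
made-up names):

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- The producer wraps each item in Just and ends with Nothing as EOF.
produce :: MVar (Maybe a) -> [a] -> IO ()
produce slot xs = do
  mapM_ (putMVar slot . Just) xs
  putMVar slot Nothing

-- The consumer reads until it sees Nothing, so it needs no item count.
drain :: MVar (Maybe a) -> IO [a]
drain slot = do
  item <- takeMVar slot
  case item of
    Nothing -> return []
    Just x  -> (x :) <$> drain slot

eofDemo :: IO String
eofDemo = do
  slot <- newEmptyMVar
  _ <- forkIO (produce slot "abc")
  drain slot
```

eofDemo returns "abc". compared with the replicateM_ 10 loops in the
examples above, neither side has to know the number of items in
advance.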


-- 
Best regards,
 Bulat                          mailto:bulatz at HotPOP.com




