[Haskell-cafe] How to increase performance using concurrency for sequential producer-consumer problem

John Lato jwlato at gmail.com
Tue Feb 14 13:05:55 CET 2012


I would use bounded STM channels (from the stm-chans package) for
communication; this would keep the producer from getting too far ahead
of the converters.  You'd also need to tag items as they're produced
(an Integer should be fine) and keep track of the tags.  A TVar
should suffice for that.  The basic outline is that the producer
writes to a channel.  Each converter thread reads from that channel,
and when it's finished, checks the output index TVar.  If the
converter's item index is equal to the current output index, the
converter puts its value into an output channel and increments the
output index; otherwise it waits (an STM retry would do) until its
turn comes.  A final consumer reads from the output channel and
processes each item in turn.
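
As a rough sketch of that outline (not tested against your real
workload): the code below uses TBQueue from the stm package as a
stand-in for the bounded channels in stm-chans, so it needs nothing
beyond stm.  The names orderedDriver, nWorkers, inQ, outQ, nextTag and
nextOut are made up for illustration, as are the queue bound of 16 and
the use of Nothing as an end-of-input marker.  It assumes nWorkers >= 1
and a converter that does not throw.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Exception (evaluate)
import Control.Monad (replicateM_)

type Producer a    = (a -> IO ()) -> IO ()
type Converter a b = a -> b
type Consumer b    = b -> IO ()

-- Hypothetical driver: nWorkers converter threads, results delivered
-- to the consumer in production order.
orderedDriver :: Int -> Producer a -> Converter a b -> Consumer b -> IO ()
orderedDriver nWorkers producer converter consumer = do
  inQ     <- newTBQueueIO 16             -- bounded: producer blocks when full
  outQ    <- newTBQueueIO 16             -- bounded output queue
  nextTag <- newTVarIO (0 :: Integer)    -- tag for the next produced item
  nextOut <- newTVarIO (0 :: Integer)    -- tag that may be output next
  done    <- newEmptyMVar

  let worker = do
        job <- atomically (readTBQueue inQ)
        case job of
          Nothing       -> return ()     -- end-of-input marker
          Just (tag, x) -> do
            -- Forces the result to WHNF only; use deepseq for deep data.
            y <- evaluate (converter x)
            atomically $ do
              cur <- readTVar nextOut
              check (cur == tag)         -- retry until it is our turn
              writeTBQueue outQ (Just y)
              writeTVar nextOut (tag + 1)
            worker

      drain = do
        r <- atomically (readTBQueue outQ)
        case r of
          Nothing -> putMVar done ()
          Just y  -> consumer y >> drain

  replicateM_ nWorkers (forkIO worker)
  _ <- forkIO drain

  -- Tag each item and enqueue it in one transaction.
  producer $ \x -> atomically $ do
    tag <- readTVar nextTag
    writeTVar nextTag (tag + 1)
    writeTBQueue inQ (Just (tag, x))

  -- One end marker per worker, then wait for all output to be queued.
  replicateM_ nWorkers (atomically (writeTBQueue inQ Nothing))
  atomically $ do
    total <- readTVar nextTag
    cur   <- readTVar nextOut
    check (cur == total)
    writeTBQueue outQ Nothing
  takeMVar done

-- The dummy producer, converter and consumer from the original post.
demo :: IO ()
demo = orderedDriver 4 (\cb -> mapM_ cb [1 .. 10 :: Int]) (* 10) print
```

To get any real parallelism you'd have to build with -threaded and run
with +RTS -N.  Because tags are handed out in queue order, the worker
holding the lowest outstanding tag can always make progress, so the
other workers only wait on it and not on each other.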

Or instead of a bounded input channel, the producer could write to a
TMVar.  Which is better probably depends on the details of your
production pattern.
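
For comparison, a minimal sketch of the TMVar hand-off on its own,
with a single worker and no tagging.  A TMVar behaves like a channel
with room for one item: putTMVar blocks while the slot is full and
takeTMVar blocks while it is empty.  The names handOff, slot and
finished are hypothetical, and Nothing is again just an assumed
end-of-input marker.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM

-- Hypothetical helper: pass items to one worker thread through a
-- single-slot TMVar, and return once the worker has seen them all.
handOff :: [a] -> (a -> IO ()) -> IO ()
handOff items work = do
  slot     <- newEmptyTMVarIO
  finished <- newEmptyTMVarIO
  let loop = do
        m <- atomically (takeTMVar slot)      -- blocks while slot is empty
        case m of
          Nothing -> atomically (putTMVar finished ())
          Just x  -> work x >> loop
  _ <- forkIO loop
  -- Each put blocks until the worker has taken the previous item.
  mapM_ (atomically . putTMVar slot . Just) items
  atomically (putTMVar slot Nothing)          -- end-of-input marker
  atomically (takeTMVar finished)
```

With this the producer can never be more than one item ahead, which is
fine if production is cheap and steady but gives the workers less
slack than a bounded channel when production comes in bursts.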

You certainly could use something like iteratee-stm or the conduits
variant, but they wouldn't directly help with concurrency of
converters, nor with synchronization.  What they would give you is
concurrency between the producer, converter, and consumer.  Of course
you could build your own converter step to work within that framework.

John L.

> Date: Mon, 13 Feb 2012 16:12:22 +0100
> From: Roel van Dijk <vandijk.roel at gmail.com>
> Subject: [Haskell-cafe] How to increase performance using concurrency
>        for sequential producer-consumer problem
> To: Haskell Café <haskell-cafe at haskell.org>
> Message-ID:
>        <CABw4ky7Pu_RJSj5hQ_7GS_VTqv-dA4iYV0VhEY_J1YxxYBLyvg at mail.gmail.com>
> Content-Type: text/plain; charset=UTF-8
>
> Hello,
>
> I have a program which I believe can benefit from concurrency. But I
> am wondering if the mechanisms I need already exist somewhere on
> Hackage.
>
> Here is a sketch of my program, in literate Haskell:
>
>> module Problem where
>> import Control.Monad ( forM_ )
>
> The producer produces values. It blocks until there are no more
> values to produce. Each value is given to a callback function.
>
>> type Producer a = (a -> IO ()) -> IO ()
>
> The converter does some work with a value. This work is purely CPU and
> it is the bottleneck of the program. The amount of work it has to do
> is variable.
>
>> type Converter a b = a -> b
>
> The consumer does something with the value calculated by the
> converter. It is very important that the consumer consumes the values
> in the same order as they are produced.
>
>> type Consumer b = b -> IO ()
>
> Dummy producer, converter and consumer:
>
>> producer :: Producer Int
>> producer callback = forM_ [1..10] callback
>
>> converter :: Converter Int Int
>> converter = (*10)
>
>> consumer :: Consumer Int
>> consumer = print
>
> A simple driver. Does not exploit concurrency.
>
>> simpleDriver :: Producer a -> Converter a b -> Consumer b -> IO ()
>> simpleDriver producer converter consumer = producer (consumer . converter)
>
>> current_situation :: IO ()
>> current_situation = simpleDriver producer converter consumer
>
> Ideally I would like a driver that spawns a worker thread for each
> core in my system. But the trick is in ensuring that the consumer is
> offered results in the same order as they are generated by the
> producer.
>
> I can envision that some kind of storage is necessary to keep track of
> results which can not yet be offered to the consumer because it is
> still waiting for an earlier result.
>
> Is there any package on Hackage that can help me with this problem? Or
> do I have to implement it using lower level concurrency primitives?
>
> Regards,
> Roel
>
>