[Haskell-cafe] How to increase performance using concurrency for sequential producer-consumer problem
blamario at acanac.net
Tue Feb 14 06:36:30 CET 2012
On 12-02-13 10:12 AM, Roel van Dijk wrote:
> I have a program which I believe can benefit from concurrency. But I
> am wondering if the mechanisms I need already exist somewhere on
You can try monad-coroutine. Here is an incomplete transcription of
type Producer a = Coroutine (Yield a) IO ()
type Converter a b = Coroutine (EitherFunctor (Await a) (Yield b)) IO ()
type Consumer b = Coroutine (Await b) IO ()
producer :: Producer Int
producer = forM_ [1..10] yield
converter :: Converter Int Int
converter = forever $ do
  a <- mapSuspension LeftF await
  mapSuspension RightF $ yield (10 * a)
consumer :: Consumer Int
consumer = forever (await >>= lift . print)
main = seesaw parallelBinder awaitYieldResolver consumer producer
The main function is incorrect because it's not using the
converter. I didn't add that because there's no ready function for
invoking it in the package so it wouldn't be a one-liner, and also
because I'm not sure if it should be a monadic coroutine of its own or a
The producer and consumer coroutines' steps should be running in
parallel. I say "should" because I never got any decent parallel speedup
with this scheme, but I'm not sure if the problem is in the library code
or in the earlier versions of GHC.
One thing that's missing here is a way for the producer to run
ahead of the consumer and produce more results that would be buffered.
At the moment the producer blocks until the consumer is ready. I don't
have a ready solution for this, but I'll give it some thought.
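For comparison, here is a minimal sketch of the run-ahead buffering described above. It does not use monad-coroutine at all; it swaps in plain primitives from base (Chan, QSem, MVar). The name bufferedPipe and the capacity parameter are made up for illustration, and the producer is written in the callback style from your message rather than as a coroutine.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Monad (forM_)

-- Hypothetical helper, not part of any package: lets the producer run at
-- most `capacity` items ahead of the consumer. Assumes capacity >= 1.
bufferedPipe :: Int -> ((a -> IO ()) -> IO ()) -> (a -> IO ()) -> IO ()
bufferedPipe capacity produce consume = do
  queue <- newChan              -- FIFO buffer; Nothing marks end of input
  free  <- newQSem capacity     -- counts the free slots in the buffer
  done  <- newEmptyMVar         -- filled once the consumer has finished
  _ <- forkIO $ do
    let loop = do
          item <- readChan queue
          case item of
            Nothing -> putMVar done ()
            Just a  -> signalQSem free >> consume a >> loop
    loop
  -- The callback blocks only when all `capacity` slots are occupied.
  produce $ \a -> waitQSem free >> writeChan queue (Just a)
  writeChan queue Nothing
  takeMVar done

main :: IO ()
main = bufferedPipe 4 (\callback -> forM_ [1 .. 10 :: Int] callback) print
```

With a capacity of 4 the producer can be up to four values ahead before it blocks. The Chan is first-in first-out, so the consumer still sees the values in production order.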
> Here is a sketch of my program, in literate Haskell:
>> module Problem where
>> import Control.Monad ( forM_ )
> The producer produces values. It blocks until there are no more
> values to produce. Each value is given to a callback function.
>> type Producer a = (a -> IO ()) -> IO ()
> The converter does some work with a value. This work is purely CPU and
> it is the bottleneck of the program. The amount of work it has to do
> is variable.
>> type Converter a b = a -> b
> The consumer does something with the value calculated by the
> converter. It is very important that the consumer consumes the values
> in the same order as they are produced.
>> type Consumer b = b -> IO ()
> Dummy producer, converter and consumer:
>> producer :: Producer Int
>> producer callback = forM_ [1..10] callback
>> converter :: Converter Int Int
>> converter = (*10)
>> consumer :: Consumer Int
>> consumer = print
> A simple driver. Does not exploit concurrency.
>> simpleDriver :: Producer a -> Converter a b -> Consumer b -> IO ()
>> simpleDriver producer converter consumer = producer (consumer . converter)
>> current_situation :: IO ()
>> current_situation = simpleDriver producer converter consumer
> Ideally I would like a driver that spawns a worker thread for each
> core in my system. But the trick is in ensuring that the consumer is
> offered results in the same order as they are generated by the
> I can envision that some kind of storage is necessary to keep track of
> results which can not yet be offered to the consumer because it is
> still waiting for an earlier result.
> Is there any package on Haskell that can help me with this problem? Or
> do I have to implement it using lower level concurrency primitives?
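If you do end up using the lower-level primitives, the storage you describe can be as simple as a queue of result slots. The following is only a sketch under some assumptions: orderedDriver is a made-up name, it forks one lightweight thread per value rather than one worker per core, it forces results only to weak head normal form (enough for Int), and it does nothing about exceptions thrown by the converter.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception (evaluate)
import Control.Monad (forM_)

type Producer a = (a -> IO ()) -> IO ()
type Converter a b = a -> b
type Consumer b = b -> IO ()

-- Hypothetical driver: conversions run concurrently, but the consumer
-- reads the result slots in the order in which the values were produced.
orderedDriver :: Producer a -> Converter a b -> Consumer b -> IO ()
orderedDriver producer converter consumer = do
  slots <- newChan          -- one MVar per value, queued in production order
  done  <- newEmptyMVar
  _ <- forkIO $ do
    let loop = do
          next <- readChan slots
          case next of
            Nothing   -> putMVar done ()
            Just slot -> takeMVar slot >>= consumer >> loop
    loop
  producer $ \a -> do
    slot <- newEmptyMVar
    writeChan slots (Just slot)   -- reserve this value's place in line
    -- evaluate forces only WHNF; a converter that throws would leave the
    -- slot empty and block the consumer forever.
    _ <- forkIO (evaluate (converter a) >>= putMVar slot)
    return ()
  writeChan slots Nothing
  takeMVar done

main :: IO ()
main = orderedDriver (\callback -> forM_ [1 .. 10 :: Int] callback) (* 10) print
```

The program needs to be built with -threaded and run with +RTS -N for the conversions to actually use several cores. Because the number of in-flight conversions is unbounded here, a fast producer could flood the system with pending work, so a real version would probably want to cap it.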