[Haskell-cafe] Re: map/reduce example

Mirko Rahn rahn at ira.uka.de
Tue Nov 27 06:12:11 EST 2007

> i've written small program which demonstrates how map/reduce may be
> implemented in Haskell. it counts amount of words in file, splitting
> it into 64kb blocks processed by two threads. their results are
> combined by another two threads. how it may be made better?

Hello, I do not know anything about

> {-# LANGUAGE BangPatterns #-}

But your program logic certainly needs some polishing. Your idea is to 
keep track of the number of outstanding jobs (chunks)

>   jobs <- new 0
>   untilM (hIsEOF h) $ do
>     str <- B.hGet h 65536
>     writeChan map str
>     jobs += 1

>   jobs' <- val jobs
>   writeChan reduce (0,-jobs')

and to collect partial sums, until all jobs are done.

> reduceThread reduce result =
>   forever $ do
>     (sum1,n1) <- readChan reduce     -- (*)
>     (sum2,n2) <- readChan reduce
>     let (!sum,!n) = (sum1+sum2,n1+n2)
>     case n of
>       0 -> writeChan result sum
>       _ -> writeChan reduce (sum,n)

I don't like the 'forever' here, especially since it prevents the 
code from being reused.

The real problem is the possibility of deadlock. Suppose all jobs are 
done, reducer A is just before (*) and reducer B is just after (*). Now 
main sends the message (0,-jobs) and reducer A receives it. Then both 
reducers are just after (*), each holding one message and waiting for a 
second one, but no more messages will be generated: a deadlock.

The classical solution is to write an explicit END message into the 
channel and to keep track of the number of running mappers/reducers and 
not of the number of jobs.

A simple attempt is

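import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (replicateM_, unless)
import qualified Data.ByteString.Char8 as B
import System.Environment (getArgs)
import System.IO (IOMode(ReadMode), hIsEOF, openBinaryFile)

-- number of mapper and reducer threads
mappers, reducers :: Int
mappers = 5
reducers = 3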

main = do
   (file:_) <- getArgs
   h <- openBinaryFile file ReadMode

   map <- newChan
   red <- newChan
   res <- newChan

   replicateM_ mappers (forkIO $ mapThread map red)
   replicateM_ reducers (forkIO $ reduceThreadL red res 0)

   untilM (hIsEOF h) $ B.hGet h 65536 >>= writeChan map . Left

   writeChan map (Right mappers) -- explicit end-of-input message

   readChan res >>= print

mapThread map red = readChan map >>= \ msg ->
   case msg of
     Left str              -> writeChan red (Left $ length $ B.words str)
                           >> mapThread map red
     Right 1               -> writeChan red (Right reducers)
                           -- ^ explicit end-of-reduce message
     Right mappers_running -> writeChan map (Right $ mappers_running-1)

reduceThreadL red result sum = readChan red >>= \ msg ->
   case msg of
     Left sum1              -> reduceThreadL red result (sum+sum1)
     -- ^ a new partial sum: reduce locally
     Right 1                -> writeChan result sum
     -- ^ I'm the last running reducer: produce the result now
     Right reducers_running -> writeChan red (Left sum)
                            >> writeChan red (Right $ reducers_running-1)
     -- ^ reduce phase has ended, I'm not the last reducer:
     --   send partial reduction result and go home

untilM cond action = cond >>= flip unless (action >> untilM cond action)

It is quite easy to see that the program always terminates and produces 
the correct result. There is no 'forever', so the program is re-usable. 
However, it is not exactly the same as the original program, since the 
local reduction results are kept locally and not sent to the channel. 
If you want to send them, the problem becomes more complex: you have to 
solve the problem of distributed termination detection. There are 
algorithms for this, for example simple token-based ones that need O(P) 
steps, where P is the number of processes involved. There are also some 
O(log P) algorithms that count activation messages and sum these counts 
in two wave fronts. These algorithms are formulated for message 
passing environments with the capability to send a message to a specific 
receiver. I am not quite sure how to adapt them to the channel-based 
environment (except by simulating the message passing environment).
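
To make the termination argument for the END-message protocol easier to 
check, here is a minimal sketch of the same scheme that reads its chunks 
from an in-memory list instead of a file. The names countWords, mapper, 
reducer, MapMsg and RedMsg are made up for this illustration and do not 
appear in the program above; both thread counts are assumed to be at 
least 1.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (replicateM_)
import qualified Data.ByteString.Char8 as B

-- Left carries a payload; Right carries the END token together with the
-- number of workers that have not yet seen it.
type MapMsg = Either B.ByteString Int
type RedMsg = Either Int Int

-- Hypothetical helper: count the words in a list of chunks using
-- m mapper threads and r reducer threads (assumption: m >= 1, r >= 1).
countWords :: Int -> Int -> [B.ByteString] -> IO Int
countWords m r chunks = do
  mapCh <- newChan
  redCh <- newChan
  resCh <- newChan
  replicateM_ m (forkIO (mapper r mapCh redCh))
  replicateM_ r (forkIO (reducer redCh resCh 0))
  mapM_ (writeChan mapCh . Left) chunks
  writeChan mapCh (Right m)            -- END token for the mappers
  readChan resCh                       -- blocks until the last reducer answers

mapper :: Int -> Chan MapMsg -> Chan RedMsg -> IO ()
mapper r mapCh redCh = loop
  where
    loop = do
      msg <- readChan mapCh
      case msg of
        Left str -> do
          writeChan redCh (Left (length (B.words str)))
          loop
        -- last running mapper: start the END token for the reducers
        Right 1 -> writeChan redCh (Right r)
        -- not the last mapper: pass the token on and stop
        Right n -> writeChan mapCh (Right (n - 1))

reducer :: Chan RedMsg -> Chan Int -> Int -> IO ()
reducer redCh resCh acc = do
  msg <- readChan redCh
  case msg of
    -- a new partial sum: add it to the local accumulator (strictly)
    Left s  -> reducer redCh resCh $! acc + s
    -- last running reducer: publish the result
    Right 1 -> writeChan resCh acc
    -- not the last reducer: hand over the local sum, pass the token on
    Right n -> do
      writeChan redCh (Left acc)
      writeChan redCh (Right (n - 1))
```

Each worker stops as soon as it has read the END token, and it reads the 
token only after it has written the result of its previous message, so 
the last worker to see the token knows that all earlier output is 
already in the next channel. For example, 
countWords 3 2 (map B.pack ["a b c", "d e"]) should yield 5. As in the 
program above, a word that straddles a chunk boundary is counted twice.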

/BR, Mirko Rahn
