[Haskell-cafe] map/reduce example

Bulat Ziganshin bulat.ziganshin at gmail.com
Mon Nov 26 06:33:12 EST 2007

Hello Haskell-Cafe,

I've written a small program that demonstrates how map/reduce may be
implemented in Haskell. It counts the number of words in a file, splitting
it into 64 KB blocks that are processed by two threads. Their results are
combined by another two threads. How could it be improved? In
particular, is it strict in the values sent over the channels?

{-# LANGUAGE BangPatterns #-}

import Control.Concurrent
import Control.Concurrent.Chan
import Control.Monad
import Data.IORef
import qualified Data.ByteString.Char8 as B
import System.Environment
import System.IO

main = do
  (file:_) <- getArgs
  h <- openBinaryFile file ReadMode

  map    <- newChan
  reduce <- newChan
  result <- newChan

  replicateM_ 2 (forkIO$ mapThread map reduce)      
  replicateM_ 2 (forkIO$ reduceThread reduce result)

  jobs <- new 0
  untilM (hIsEOF h) $ do
    str <- B.hGet h 65536
    writeChan map str
    jobs += 1

  jobs' <- val jobs
  writeChan reduce (0,-jobs')

  res <- readChan result
  print res

mapThread map reduce =
  forever $ do
    str <- readChan map
    let !sum = length (B.words str)
    writeChan reduce (sum,1)

reduceThread reduce result =
  forever $ do
    (sum1,n1) <- readChan reduce
    (sum2,n2) <- readChan reduce
    let (!sum,!n) = (sum1+sum2,n1+n2)
    case n of
      0 -> writeChan result sum
      _ -> writeChan reduce (sum,n)

-- run the action repeatedly until the condition becomes True
untilM cond action = do
  deny <- cond
  unless deny $ do
    action
    untilM cond action

-- forever is imported from Control.Monad

infixl 0 =:, +=
new = newIORef
val = readIORef
a=:b = writeIORef a b
a+=b = modifyIORef a (\a->a+b)
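
On the strictness question, here is a minimal sketch of one way to make
it explicit. As far as I can tell, writeChan itself does not evaluate
the value it is given, so the program above relies on the bang patterns
(and on the case over n) to force the sums before they are sent. The
names writeChan', Partial and combine below are hypothetical helpers for
illustration, not part of Control.Concurrent.Chan; only evaluate (from
Control.Exception) and the Chan functions are library calls.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Exception (evaluate)

-- Hypothetical helper: evaluate the value to weak head normal form
-- in the sending thread, then put it on the channel.
writeChan' :: Chan a -> a -> IO ()
writeChan' ch x = evaluate x >>= writeChan ch

-- A pair with strict fields: forcing the constructor to WHNF
-- also forces both the partial sum and the job counter.
data Partial = Partial !Int !Int
  deriving (Eq, Show)

-- Combine two partial results, as reduceThread does with tuples.
combine :: Partial -> Partial -> Partial
combine (Partial s1 n1) (Partial s2 n2) = Partial (s1 + s2) (n1 + n2)
```

With this, a call such as writeChan' reduce (combine p1 p2) would do
the additions in the sending thread, without bang patterns at each
call site. An ordinary tuple would not be enough here, since forcing
a tuple to WHNF leaves its components unevaluated.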

Best regards,
 Bulat                          mailto:Bulat.Ziganshin at gmail.com
