[Haskell-cafe] map/reduce example
Bulat Ziganshin
bulat.ziganshin at gmail.com
Mon Nov 26 06:33:12 EST 2007
Hello Haskell-Cafe,

I've written a small program that demonstrates how map/reduce may be
implemented in Haskell. It counts the number of words in a file,
splitting it into 64 KB blocks that are processed by two threads.
Their results are combined by another two threads. How could it be
made better? In particular, is it strict in the values sent over
channels?
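
On the strictness question: writeChan itself does not evaluate what it
is given, and forcing a pair with seq only evaluates the pair
constructor, not its components. A minimal sketch of one way to make
sure both components are evaluated in the writing thread, separate from
the program itself. The helper name writePairStrict is hypothetical, and
it assumes the components are simple values such as Int, where weak
head normal form is full evaluation.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Exception (SomeException, evaluate, try)

-- Hypothetical helper (not part of the posted program): force both
-- components of a pair to weak head normal form, then write the pair.
writePairStrict :: Chan (a, b) -> (a, b) -> IO ()
writePairStrict ch (a, b) = do
  a' <- evaluate a
  b' <- evaluate b
  writeChan ch (a', b')

main :: IO ()
main = do
  ch <- newChan :: IO (Chan (Int, Int))
  -- Plain writeChan accepts an unevaluated component; the error would
  -- only surface in whichever thread later demands that component.
  writeChan ch (error "lazy component", 1)
  (_, n) <- readChan ch
  print n
  -- The strict helper evaluates in the writer, so the failure shows up here.
  r <- try (writePairStrict ch (error "forced in writer", 1))
  case r of
    Left e   -> putStrLn ("writer saw: " ++ show (e :: SomeException))
    Right () -> putStrLn "written"
  -- Fully evaluated pairs pass through unchanged.
  writePairStrict ch (2 + 3, 1)
  readChan ch >>= print
```

Bang patterns on the values before writeChan aim at the same effect;
the helper only moves the forcing into one place.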

{-# LANGUAGE BangPatterns #-}
import Control.Concurrent
import Control.Concurrent.Chan
-- forever is defined locally below, so hide the Control.Monad version
import Control.Monad hiding (forever)
import Data.IORef
import qualified Data.ByteString.Char8 as B
import System.Environment
import System.IO
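
main = do
  (file:_) <- getArgs
  h <- openBinaryFile file ReadMode
  map <- newChan       -- blocks waiting to be counted
  reduce <- newChan    -- partial results: (word count, job counter)
  result <- newChan    -- final total
  replicateM_ 2 (forkIO $ mapThread map reduce)
  replicateM_ 2 (forkIO $ reduceThread reduce result)
  jobs <- new 0
  -- read the file in 64kb blocks, counting how many were queued
  untilM (hIsEOF h) $ do
    str <- B.hGet h 65536
    writeChan map str
    jobs += 1
  jobs' <- val jobs
  -- sentinel: its negative counter cancels the +1 of every block
  writeChan reduce (0,-jobs')
  res <- readChan result
  print res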

mapThread map reduce =
  forever $ do
    str <- readChan map
    let !sum = length (B.words str)
    writeChan reduce (sum,1)

reduceThread reduce result =
  forever $ do
    (sum1,n1) <- readChan reduce
    (sum2,n2) <- readChan reduce
    let (!sum,!n) = (sum1+sum2,n1+n2)
    case n of
      0 -> writeChan result sum
      _ -> writeChan reduce (sum,n)

untilM cond action = do
  deny <- cond
  unless deny $ do
    action
    untilM cond action

forever action = action >> forever action

infixl 0 =:, +=
new = newIORef
val = readIORef
a =: b = writeIORef a b
a += b = modifyIORef a (\x -> x + b)
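
One point that may be worth checking in the reduce stage: because each
of the two reducer threads reads two items before combining them, each
thread can end up holding one of the last two partial results while
waiting for a second one, and then neither finishes. A minimal sketch
of one alternative, assuming a single reducer with a running total is
acceptable here (the reduce step is only integer addition). The names
mapWorker, reduceWorker and countWords are hypothetical, and the blocks
come from an in-memory list instead of a file to keep the example
self-contained.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_, forever, replicateM_)
import qualified Data.ByteString.Char8 as B

-- Map worker: same job as mapThread in the program above.
mapWorker :: Chan B.ByteString -> Chan (Int, Int) -> IO ()
mapWorker blocks partials = forever $ do
  str <- readChan blocks
  let !count = length (B.words str)
  writeChan partials (count, 1)

-- Hypothetical single reducer: folds partial results into a running
-- total and reports once the job counter returns to zero.
reduceWorker :: Chan (Int, Int) -> Chan Int -> IO ()
reduceWorker partials result = go 0 0
  where
    go !total !pending = do
      (s, n) <- readChan partials
      let !total'   = total + s
          !pending' = pending + n
      if pending' == 0
        then writeChan result total'
        else go total' pending'

-- Count words in a list of blocks with two map workers and one reducer.
countWords :: [B.ByteString] -> IO Int
countWords chunks = do
  blocks   <- newChan
  partials <- newChan
  result   <- newChan
  replicateM_ 2 (forkIO (mapWorker blocks partials))
  _ <- forkIO (reduceWorker partials result)
  forM_ chunks (writeChan blocks)
  -- Sentinel: cancels the +1 contributed by each block.
  writeChan partials (0, negate (length chunks))
  readChan result

main :: IO ()
main = countWords (map B.pack ["one two", "three", "four five six"]) >>= print
```

The sentinel trick is unchanged: the counter reaches zero only after
the sentinel and every block result have been added, so an empty list
of blocks yields 0 instead of waiting for a second item.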
--
Best regards,
Bulat mailto:Bulat.Ziganshin at gmail.com