[Haskell-cafe] map/reduce example
Don Stewart
dons at galois.com
Sun Jan 6 23:46:16 EST 2008
Just going back to this: the channel issue may be solved by the
strict-concurrency package (strict Chans and MVars). The general
problem of distributing arrays seems to be solved more thoroughly
by the data parallel array library, which offers map, fold, scanl,
filter, zip et al., not just map and reduce.
It takes care of forking gang threads and distributing work,
and does so behind a pure interface.
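
On the strictness question specifically, here is a minimal sketch that
uses only base rather than the strict-concurrency API. The names
writeChanStrict, wordCount and countAll are hypothetical, made up for
illustration. The bang pattern forces each count to weak head normal
form (which is full evaluation for an Int) in the worker thread, before
the value enters the channel, so the reader never receives a thunk.

```haskell
{-# LANGUAGE BangPatterns #-}
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)
import qualified Data.ByteString.Char8 as B
import Data.List (foldl')

-- | Hypothetical helper: evaluate the value to weak head normal form
-- before writing it, so the work is done by the writing thread.
writeChanStrict :: Chan a -> a -> IO ()
writeChanStrict ch !x = writeChan ch x

-- | Pure per-block work: count the words in one block.
wordCount :: B.ByteString -> Int
wordCount = length . B.words

-- | "Map" each block on its own thread, then "reduce" by summing.
countAll :: [B.ByteString] -> IO Int
countAll blocks = do
  results <- newChan
  forM_ blocks $ \blk ->
    forkIO (writeChanStrict results (wordCount blk))
  -- One result arrives per block; reading them all also waits
  -- for every worker to finish.
  counts <- replicateM (length blocks) (readChan results)
  return (foldl' (+) 0 counts)

main :: IO ()
main = countAll (map B.pack ["a b c", "d e", ""]) >>= print
```

For values with deeper structure than an Int, weak head normal form is
not enough and the value would need to be fully evaluated before the
write.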
-- Don
bulat.ziganshin:
> Hello Haskell-Cafe,
>
> I've written a small program which demonstrates how map/reduce may be
> implemented in Haskell. It counts the number of words in a file, splitting
> it into 64 KB blocks processed by two threads. Their results are
> combined by another two threads. How could it be made better? In
> particular, is it strict in the values sent over channels?
>
> {-# LANGUAGE BangPatterns #-}
>
> import Control.Concurrent
> import Control.Concurrent.Chan
> import Control.Monad hiding (forever)  -- forever is defined locally below
> import Data.IORef
> import Data.ByteString.Char8 as B hiding (length)
> import System.Environment
> import System.IO
>
> main = do
>   (file:_) <- getArgs
>   h <- openBinaryFile file ReadMode
>
>   map <- newChan
>   reduce <- newChan
>   result <- newChan
>
>   replicateM_ 2 (forkIO $ mapThread map reduce)
>   replicateM_ 2 (forkIO $ reduceThread reduce result)
>
>   jobs <- new 0
>   untilM (hIsEOF h) $ do
>     str <- B.hGet h 65536
>     writeChan map str
>     jobs += 1
>
>   jobs' <- val jobs
>   writeChan reduce (0,-jobs')
>
>   res <- readChan result
>   print res
>
>
> mapThread map reduce =
>   forever $ do
>     str <- readChan map
>     let !sum = length (B.words str)
>     writeChan reduce (sum,1)
>
> reduceThread reduce result =
>   forever $ do
>     (sum1,n1) <- readChan reduce
>     (sum2,n2) <- readChan reduce
>     let (!sum,!n) = (sum1+sum2,n1+n2)
>     case n of
>       0 -> writeChan result sum
>       _ -> writeChan reduce (sum,n)
>
>
> untilM cond action = do
>   deny <- cond
>   unless deny $ do
>     action
>     untilM cond action
>
> forever action = action >> forever action
>
> infixl 0 =:, +=
> new = newIORef
> val = readIORef
> a=:b = writeIORef a b
> a+=b = modifyIORef a (\a->a+b)
>
>
> --
> Best regards,
> Bulat mailto:Bulat.Ziganshin at gmail.com
>
> _______________________________________________
> Haskell-Cafe mailing list
> Haskell-Cafe at haskell.org
> http://www.haskell.org/mailman/listinfo/haskell-cafe