Pragmatic concurrency Re: [Haskell-cafe] multiple computations, same input

Tomasz Zielonka tomasz.zielonka at gmail.com
Fri Mar 31 00:38:15 EST 2006


On Thu, Mar 30, 2006 at 05:05:30PM +0200, Tomasz Zielonka wrote:
> Actually, it may require no effort from compiler implementors.
> I just managed to get the desired effect in current GHC! :-)

More specifically: in uniprocessor GHC 6.4.1.

> I implemented your idea of stepper by writing the function stepper that
> rewrites the list invoking "yield" every 500 processed elements. This
> way I can concurrently consume the list without the space leak - when a
> thread evaluates too many list elements, it gets preempted. I think it
> suffices if RTS employs a round-robin scheduler. I am not sure it's
> important.

I just realised that this technique will only work on uniprocessors! :-(
I relies on only one thread running at any moment. If there are multiple
CPUs, yielding won't stop the current thread from consuming the list.

> The code isn't as beautiful as the naive wc implementation. That's
> because I haven't yet thought how to hide newEmptyMVar, forkIO, putMVar
> i takeMVar. Perhaps someone will come up with a solution to this.

Here is my attempt to make the code more pure. The "concurrently"
combinator uses CPS, because otherwise it was a bit difficult to split
evaluation into two phases - first forking the thread, second taking the
result from an MVar. I also tried using additional data constructor
wrapper for the result, so first phase occured when forcing the
constructor, and the second when forcing it's parameter, but it was
tricky to use it properly considering that "let" and "where" bindings
use irrefutable patterns.

    import Control.Concurrent
    import Control.Monad
    import System.IO.Unsafe

    stepper :: Int -> [a] -> [a]
    stepper n l = s n l
      where
        s 0 (x:xs) = unsafePerformIO $ do
            yield
            return (x : s n xs)
        s i (x:xs) = x : s (i-1) xs
        s _ []     = []

    concurrently :: a -> (a -> b) -> b
    concurrently e f = unsafePerformIO $ do
        var <- newEmptyMVar
        forkIO $ putMVar var $! e
        return (f (unsafePerformIO (takeMVar var)))

    wc :: String -> (Int, Int, Int)
    wc cs0 =
        let cs = stepper 500 cs0 in
        concurrently (length (lines cs)) $ \ll ->
        concurrently (length (words cs)) $ \ww ->
        concurrently (length cs) $ \cc ->
        (ll, ww, cc)

    main = do
        cs <- getContents
        print (wc cs)

It's probably worth noting that (in this case) when I remove "yield", so
I only use concurrency with no stepper, the space-leak is also reduced,
but not completely.

Best regards
Tomasz


More information about the Haskell-Cafe mailing list