[Haskell-cafe] Non-strict evaluation and concurrency (STM) : conflict?

Romain Demeyer romain.demeyer at gmail.com
Tue Sep 28 09:06:53 EDT 2010


Hi,

I'm working with Concurrent Haskell, especially the STM monad. I don't
fully understand the way my program is executed. I think lazy evaluation
leads to a loss of performance if we don't pay attention to it. A
short example will make this clearer:

Imagine this scenario: we have a set of threads (the workers) that each
have a result to compute (purely). When finished, each worker tries to save
its result in a shared inbox, using STM. If the inbox is full, the thread
waits until the inbox is empty.
A dedicated thread watches the inbox: when it finds a value in the
inbox, it prints the value on the screen (for example; it could be any
processing based on the value), then empties the inbox and waits for one of
the remaining threads to add a new value.


Let "do_job" be the function executed by the threads (the
workers):

do_job :: (b -> a) -> b -> Inbox a -> IO ()
do_job f input inbox = do { value <- return (f input)
                          ; atomically ( writeMsg inbox value ) }

The idea is: (f input) is the computation to perform. Once it is computed,
we want to save the result atomically.
The problem is that, because of lazy evaluation, "value" is computed in
the atomic section, and not before, resulting in a loss of efficiency.
Indeed, to be fast, a concurrent program has to keep its atomic sections as
"small" as possible, because they limit parallelism.

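To make the laziness concrete, here is a minimal sketch (lazyBind,
strictBind, succeeded and demo are names made up for this example, they are
not part of my program). It only shows that "value <- return x" binds an
unevaluated thunk, and that nothing is computed until something forces it:

```haskell
import Control.Exception (ErrorCall, try)

-- Hypothetical helper: bind a value with `return`, as do_job does.
-- `return` does not evaluate its argument, so the thunk is never touched.
lazyBind :: a -> IO ()
lazyBind x = do { _value <- return x
                ; return () }

-- Hypothetical helper: same binding, but `seq` forces the value
-- (to weak head normal form) before the next action runs.
strictBind :: a -> IO ()
strictBind x = do { value <- return x
                  ; value `seq` return () }

-- True if the action finished without raising an ErrorCall.
succeeded :: Either ErrorCall a -> Bool
succeeded = either (const False) (const True)

-- Pass an error thunk to both helpers: lazyBind finishes normally,
-- strictBind raises the error because it forces the thunk.
demo :: IO (Bool, Bool)
demo = do { r1 <- try (lazyBind (error "forced" :: Int))
          ; r2 <- try (strictBind (error "forced" :: Int))
          ; return (succeeded r1, succeeded r2) }
```

Here demo returns (True, False): the first binding never looks at its
argument, the second one does.
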
To illustrate this, consider the following source code:


module Main where

import Control.Concurrent
import Control.Concurrent.STM
import Data.Maybe
import System.Random
import System.IO

{-- Inbox --}
type Inbox a = TVar (Maybe a)

createInbox ::  STM (Inbox a)
createInbox = newTVar Nothing

readMsg ::  Inbox a -> STM a
readMsg inbox = do { inboxContent <- readTVar inbox
                   ; if (isNothing inboxContent)
                     then retry
                     else do { writeTVar inbox Nothing
                             ; return (fromJust inboxContent) } }

writeMsg ::  Inbox a -> a -> STM ()
writeMsg inbox value = do { inboxContent <- readTVar inbox
                          ; if (isNothing inboxContent)
                            then writeTVar inbox (Just value)
                            else retry }

{-- Workers --}
do_job :: (b -> a) -> b -> Inbox a -> IO ()
do_job f input inbox = do { value <- return (f input)
                          ; atomically ( writeMsg inbox value ) }

do_jobs_in_threads ::  [((b->a),b)] -> Inbox a -> TVar Int -> IO ()
do_jobs_in_threads [] _ _ = return ()
do_jobs_in_threads ((f,input):xs) inbox flag =
    do { forkIO_and_notify flag (do_job f input inbox)
       ; do_jobs_in_threads xs inbox flag }

{-- Caller --}
caller ::  Inbox a -> (a -> IO ()) -> Int -> IO ()
caller _ _ 0 = return ()
caller inbox process n = do { msg <- atomically (readMsg inbox)
                            ; process msg
                            ; caller inbox process (n-1) }

caller_in_thread flag inbox process n =
    forkIO_and_notify flag (caller inbox process n)

{-- forkIO with notification --}

create_flag = atomically ( newTVar 0 )

forkIO_and_notify :: TVar Int -> IO () -> IO ()
forkIO_and_notify tvar action =
    do { atomically ( do { oldValue <- readTVar tvar
                         ; writeTVar tvar (oldValue + 1) } )
       ; forkIO (do { action
                    ; atomically ( do { oldValue <- readTVar tvar
                                      ; writeTVar tvar (oldValue - 1) } ) } )
       --; putStrLn "Thread started" }
       ; return () }

waitFlag flag = atomically ( do { valueflag <- readTVar flag
                                ; if valueflag > 0 then retry else return () } )

{-- main --}

main :: IO ()
main = do { flag <- create_flag
          ; inbox <- atomically (createInbox)
          ; caller_in_thread flag inbox (\x -> putStrLn ("Caller : "++ (show (x)))) 3
          ; do_jobs_in_threads inputs inbox flag
          ; waitFlag flag
          ; return () }
  where
    -- f counts the permutations of its input (the values printed below)
    f = length . perm
    -- perm builds all permutations by inserting the head at every position
    perm [] = [[]]
    perm (l:ls) = injectett l (perm ls)
    injectett _ [] = []
    injectett x (l:ls) = injecte x l ++ injectett x ls
    injecte x [] = [[x]]
    injecte x (l:ls) = [x:l:ls]++map (l:) (injecte x ls)
    inputs = zip (replicate 3 f) [[1..11],[1..8],[1..3]]

As you can see, we ask 3 threads to compute the permutations of [1..11],
[1..8] and [1..3]. The "Caller" writes a message each time a thread finishes.
What we expect is that the second and third threads finish their work before
the first one. But the output of this program is:

Caller : 39916800
Caller : 40320
Caller : 6

... which means that threads 2 and 3 have to wait for the first thread before
being able to save (and probably compute) their own results.

If I force the evaluation of "value" before the atomic section, by defining:

do_job :: (b -> a) -> b -> Inbox a -> IO ()
do_job f input inbox = do { value <- return (f input)
                          ; value `seq` atomically ( writeMsg inbox value ) }

Then I obtain a more efficient program, as the output confirms :

Caller : 6
Caller : 40320
Caller : 39916800
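
Another way to write the same fix, as a sketch only: evaluate from
Control.Exception forces its argument to weak head normal form when the IO
action runs, so the work happens in the worker thread before the transaction
starts. The names do_job' and demo are mine, for illustration; Inbox and
writeMsg are repeated from the program above so that the snippet compiles on
its own.

```haskell
import Control.Concurrent.STM
import Control.Exception (evaluate)
import Data.Maybe (isNothing)

type Inbox a = TVar (Maybe a)

-- Same writeMsg as in the program above: retry while the inbox is full.
writeMsg :: Inbox a -> a -> STM ()
writeMsg inbox value = do { inboxContent <- readTVar inbox
                          ; if isNothing inboxContent
                            then writeTVar inbox (Just value)
                            else retry }

-- Variant of do_job: `evaluate` forces (f input) to weak head normal form
-- in the calling thread, before the atomic section is entered.
do_job' :: (b -> a) -> b -> Inbox a -> IO ()
do_job' f input inbox = do { value <- evaluate (f input)
                           ; atomically ( writeMsg inbox value ) }

-- Small usage example: run one job and look at the inbox afterwards.
demo :: IO (Maybe Int)
demo = do { inbox <- newTVarIO Nothing
          ; do_job' sum [1..100] inbox
          ; readTVarIO inbox }
```

For a number, weak head normal form is the whole value, which is enough
here. For a result that is itself a lazy structure (a list, say), seq and
evaluate only reach the first constructor; I assume something like force
from the deepseq package would be needed in that case, but I have not tried
it.
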

That's what we want, but what is the explanation for this behavior? STM is
designed to be optimistic, not blocking. So, does it mean that "value"
is evaluated at "commit-time"?
Do you know of related problems, or of any work that could be useful on
this subject?
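
One small experiment that might help to settle the question (a sketch;
probe and succeeded are names I made up): write an error thunk into a TVar
inside a transaction, then force it only after reading it back. If the
commit evaluated the value, the first step would raise the error.

```haskell
import Control.Concurrent.STM
import Control.Exception (ErrorCall, evaluate, try)

-- True if the action finished without raising an ErrorCall.
succeeded :: Either ErrorCall a -> Bool
succeeded = either (const False) (const True)

-- First component: did the transaction commit without forcing the thunk?
-- Second component: did forcing the value read back from the TVar succeed?
probe :: IO (Bool, Bool)
probe = do { tvar <- newTVarIO (0 :: Int)
           ; committed <- try (atomically (writeTVar tvar (error "forced")))
           ; forced <- try (readTVarIO tvar >>= evaluate)
           ; return (succeeded committed, succeeded forced) }
```

As far as I understand writeTVar, it stores the unevaluated thunk, so probe
should return (True, False): the commit goes through and the error only
shows up when the reader forces the value. If that is right, the thunk is
evaluated by whichever thread first forces it after reading it, which may
be a different thread from the one that wrote it.
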

Thanks for your help,

rde