Different QSem(N) (was Re: IORef vs TVar performance: 6 seconds
versus 4 minutes)
ChrisK
haskell at list.mightyreason.com
Mon Dec 29 09:24:00 EST 2008
I think I can improve on your code.
Bertram Felgenhauer wrote:
> But why does it manually manage the waiters at all? MVars are fair, in
> ghc at least. So this should work:
>
> data Sem = Sem (MVar Int) (MVar Int)
I changed the above to be a data declaration.
>
> newSem :: Int -> IO Sem
> newSem initial = liftM2 Sem (newMVar initial) newEmptyMVar
>
> -- | Wait for a unit to become available
> waitSem :: Sem -> IO ()
> waitSem (Sem sem wakeup) = do
>     avail' <- modifyMVar sem (\avail -> return (avail-1, avail-1))
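Threads can get out of order at this point: a thread that has already
decremented the count can be overtaken before it blocks on "wakeup". This
"order bug" may be undesirable. Also, killing the thread while it waits for
"wakeup" below would be bad, because the count has already been decremented on
its behalf. You need an exception handler and some kind of cleanup.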
>     when (avail' < 0) $ takeMVar wakeup >>= putMVar sem
> -- | Signal that a unit of the 'Sem' is available
> signalSem :: Sem -> IO ()
> signalSem (Sem sem wakeup) = do
>     avail <- takeMVar sem
>     if avail < 0 then putMVar wakeup (avail+1)
>                  else putMVar sem (avail+1)
You should change this from "= do" to "= block $ do".
>
> (I should turn this into a library proposal.)
>
> Bertram
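As a rough sketch of the "= block $ do" change, here is the quoted two-MVar
design with signalSem masked. It assumes a newer GHC, where mask_ from
Control.Exception takes the place of block; that substitution is an assumption
of this sketch, not part of the quoted code. The point is that no asynchronous
exception can arrive between taking "sem" and putting a value back, which would
otherwise leave "sem" empty forever. waitSem is left as quoted, so the cleanup
problem for killed waiters is not addressed here.

```haskell
import Control.Concurrent.MVar
import Control.Exception (mask_)
import Control.Monad (when)

-- First MVar: the count (negative while threads are waiting).
-- Second MVar: used to hand the count to a woken waiter.
data Sem = Sem (MVar Int) (MVar Int)

newSem :: Int -> IO Sem
newSem initial = Sem <$> newMVar initial <*> newEmptyMVar

-- Unchanged from the quoted code: not safe against killed waiters.
waitSem :: Sem -> IO ()
waitSem (Sem sem wakeup) = do
  avail' <- modifyMVar sem (\avail -> return (avail - 1, avail - 1))
  when (avail' < 0) $ takeMVar wakeup >>= putMVar sem

-- Masked (assumption: mask_ standing in for block), so the take and the
-- following put cannot be separated by an asynchronous exception.
signalSem :: Sem -> IO ()
signalSem (Sem sem wakeup) = mask_ $ do
  avail <- takeMVar sem
  if avail < 0 then putMVar wakeup (avail + 1)
               else putMVar sem (avail + 1)
```

The takeMVar itself can still be interrupted while it is blocked, but at that
point nothing has been removed from "sem", so no state is lost.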
If you do not need to take N at a time then the untested code below has no
"order bug" and is fair.
> module Sem where
>
> import Control.Concurrent.MVar
> import Control.Exception (block)
> import Control.Monad(when,liftM2)
>
> data Sem = Sem { avail :: MVar Int  -- ^ provides fast path and fair queue
>                , lock  :: MVar () } -- ^ Held while signalling the queue
>
> -- It makes no sense here to initialize with a negative number, so
> -- this is treated the same as initializing with 0.
> newSem :: Int -> IO Sem
> newSem init | init < 1  = liftM2 Sem newEmptyMVar (newMVar ())
>             | otherwise = liftM2 Sem (newMVar init) (newMVar ())
>
> -- Take one unit; if more than one was stored, put the remainder back.
> waitSem :: Sem -> IO ()
> waitSem s@(Sem sem _) = block $ do
>   avail <- takeMVar sem
>   when (avail > 1) (signalSemN (pred avail) s)
>
> signalSem :: Sem -> IO ()
> signalSem = signalSemN 1
>
> -- Add i units; signallers are serialized by the lock.
> signalSemN :: Int -> Sem -> IO ()
> signalSemN i (Sem sem lock) | i < 1     = return ()
>                             | otherwise =
>   withMVar lock $ \ _ -> block $ do
>     old <- tryTakeMVar sem
>     case old of
>       Nothing -> putMVar sem i
>       Just v  -> putMVar sem $! v + i
All waitSem calls block in arrival order on the takeMVar in waitSem. The
signalSemN calls avoid conflicting by serializing on the "MVar ()" lock. The
above is quite fast so long as the semaphore holds no more than the value 1.
Once it holds more than 1, the waiter must take time to add back the remaining
value.
Note that once threads are woken up in order, they may still go out of order
blocking for the () lock when adding back the remaining value (in the presence
of other signalers).
The above is also exception safe. The only place it can die is during the
takeMVar, and this merely removes a blocked waiter.
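For readers on a newer GHC, where block has been replaced by mask_, the simple
semaphore described above might be sketched as follows. The mask_ substitution
and the withSem helper (a hypothetical name, built on bracket_) are assumptions
added for illustration and are not part of the original code. signalSemN here
adds the new units to whatever value is already stored.

```haskell
import Control.Concurrent.MVar
import Control.Exception (bracket_, mask_)
import Control.Monad (when)

-- Count (empty when zero, so waiters queue on it) and a signaller lock.
data Sem = Sem (MVar Int) (MVar ())

newSem :: Int -> IO Sem
newSem n
  | n < 1     = Sem <$> newEmptyMVar <*> newMVar ()
  | otherwise = Sem <$> newMVar n <*> newMVar ()

-- Waiters queue fairly on takeMVar; any surplus is put straight back.
waitSem :: Sem -> IO ()
waitSem s@(Sem sem _) = mask_ $ do
  n <- takeMVar sem
  when (n > 1) (signalSemN (n - 1) s)

signalSem :: Sem -> IO ()
signalSem = signalSemN 1

-- All puts happen under the lock, so the putMVar below never blocks.
signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem sem lock)
  | i < 1     = return ()
  | otherwise = withMVar lock $ \_ -> mask_ $ do
      old <- tryTakeMVar sem
      case old of
        Nothing -> putMVar sem i
        Just v  -> putMVar sem $! v + i

-- Hypothetical helper: hold one unit for the duration of an action,
-- releasing it even if the action throws.
withSem :: Sem -> IO a -> IO a
withSem s = bracket_ (waitSem s) (signalSem s)
```

A typical use would be "withSem s job" in each worker thread, so that at most
as many jobs run at once as the semaphore was created with.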
I see no way to add a fair waitSemN without changing Sem. But if I change Sem
then I can make a fair waitSemN. The untested code is below:
> module Sem where
>
> import Control.Concurrent.MVar
> import Control.Monad(when,liftM3)
> import Control.Exception.Base
>
> data Sem = Sem { semWait   :: MVar ()  -- for serializing waiting threads
>                , semAvail  :: MVar Int -- positive quantity available
>                , semSignal :: MVar ()  -- for serializing signaling threads
>                }
>
> newSem :: Int -> IO Sem
> newSem i | i<=0      = liftM3 Sem (newMVar ()) newEmptyMVar (newMVar ())
>          | otherwise = liftM3 Sem (newMVar ()) (newMVar i) (newMVar ())
>
> waitSem :: Sem -> IO ()
> waitSem = waitSemN 1
>
> waitSemN :: Int -> Sem -> IO ()
> waitSemN i sem@(Sem w a s) | i<=0      = return ()
>                            | otherwise = withMVar w $ \ _ -> block $ do
>   let go n = do
>         -- If interrupted while blocked, give back the (i-n) units taken so far.
>         avail <- onException (takeMVar a) (signalSemN (i-n) sem)
>         case compare avail n of
>           LT -> go $! n-avail             -- not enough yet, keep nibbling
>           EQ -> return ()
>           GT -> signalSemN (avail-n) sem  -- return the excess
>   go i
>
> signalSem :: Sem -> IO ()
> signalSem = signalSemN 1
>
> signalSemN :: Int -> Sem -> IO ()
> signalSemN i (Sem _ a s) | i<=0      = return ()
>                          | otherwise = withMVar s $ \ _ -> block $ do
>   ma <- tryTakeMVar a
>   case ma of Nothing -> putMVar a i
>              Just v  -> putMVar a $! v+i
Trying for exception safety makes the above slightly tricky.
It works by allowing only a single thread to get the semWait lock. This keeps
all the arriving threads in the fair blocking queue for the semWait lock. The
holder of the semWait lock then nibbles at semAvail's positive value until it is
satisfied. Excess value is added back safely with signalSemN.
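To make the nibbling concrete, here is a small sketch of the three-MVar design
for a newer GHC, again assuming mask_ in place of block (my substitution, not
the original code). The demo function is hypothetical: it starts with 2 units,
has one thread ask for 3, and then signals 1 more. Whichever way the two
threads interleave, the waiter ends up with exactly 3 and semAvail is left
empty.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception (mask_, onException)

-- Waiter lock, available quantity (empty when zero), signaller lock.
data Sem = Sem (MVar ()) (MVar Int) (MVar ())

newSem :: Int -> IO Sem
newSem i
  | i <= 0    = Sem <$> newMVar () <*> newEmptyMVar <*> newMVar ()
  | otherwise = Sem <$> newMVar () <*> newMVar i <*> newMVar ()

-- Only the holder of the waiter lock takes from the available quantity.
waitSemN :: Int -> Sem -> IO ()
waitSemN i sem@(Sem w a _)
  | i <= 0    = return ()
  | otherwise = withMVar w $ \_ -> mask_ (go i)
  where
    go n = do
      -- If interrupted while blocked, return the (i - n) units taken so far.
      avail <- takeMVar a `onException` signalSemN (i - n) sem
      case compare avail n of
        LT -> go $! n - avail             -- still short: wait for more
        EQ -> return ()
        GT -> signalSemN (avail - n) sem  -- hand back the excess

signalSemN :: Int -> Sem -> IO ()
signalSemN i (Sem _ a s)
  | i <= 0    = return ()
  | otherwise = withMVar s $ \_ -> mask_ $ do
      ma <- tryTakeMVar a
      case ma of
        Nothing -> putMVar a i
        Just v  -> putMVar a $! v + i

-- Hypothetical demo: returns what is left in the semaphore afterwards.
demo :: IO (Maybe Int)
demo = do
  s@(Sem _ a _) <- newSem 2
  done <- newEmptyMVar
  _ <- forkIO (waitSemN 3 s >> putMVar done ())
  signalSemN 1 s      -- supplies the missing unit
  takeMVar done       -- wait until the waiter is satisfied
  tryTakeMVar a       -- nothing should remain
```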
Cheers,
Chris Kuklewicz