[Haskell-cafe] MVar considered harmful

Viktor Dukhovni ietf-dane at dukhovni.org
Fri Dec 28 22:25:04 UTC 2018


> On Dec 28, 2018, at 12:44 PM, Bertram Felgenhauer via Haskell-Cafe <haskell-cafe at haskell.org> wrote:
> 
> This is awkward to fix. Basically, when abandoning the lock before it
> has been released by the previous owner, we need a new thread to wait
> for the 'current' IVar and notify the 'next' one, since the current
> thread is being interrupted.

I think that work can be delegated to the waiting thread, by making
locks (really barriers) optionally chain to a parent barrier that
also needs to be waited for (recursively).  This is cheap, because
unless threads are actually interrupted, the chain is always one
deep.  When a thread is interrupted, the next thread will wait
for 2 barriers, ...

-- 
	Viktor.

module Main (main) where
import Control.Concurrent.MVar -- should be an IVar
import Control.Concurrent
import Control.Exception (bracket)
import Data.IORef

-- Really a recursive barrier: an 'MVar' that is filled when the thread owning
-- it leaves 'withMutex' (the initial one made by 'newMutex' is created
-- already filled).  The value it is filled with is either 'Nothing' (nothing
-- further to wait for) or 'Just' another barrier that the reader must wait on
-- as well.
newtype Lock = Lock (MVar (Maybe Lock))
-- The shared cell holding the barrier of the most recent thread to request
-- the mutex; the next requester swaps in its own barrier and waits on the
-- one it displaced.
type Mutex = IORef Lock
-- Per-acquisition record of the barrier still being waited on, or 'Nothing'
-- once every barrier in the chain has been passed.
type Chain = IORef (Maybe Lock)

-- | Create a mutex that is initially free: its barrier is already filled
-- with 'Nothing', so the first caller to wait on it does not block.
newMutex :: IO Mutex
newMutex = do
    released <- newMVar Nothing
    newIORef (Lock released)

-- | Run an action while holding the mutex.  The mutex is released when the
-- action returns, when it throws, and also when the calling thread is
-- interrupted while still waiting for its turn.
--
-- Each caller atomically swaps a fresh, empty barrier into the mutex and
-- waits on the barrier it displaced.  On exit the caller fills its own
-- barrier with whatever it was still waiting on ('Nothing' after a normal
-- acquisition), so a caller that is interrupted while waiting hands its
-- unfinished wait over to its successor rather than needing a helper thread.
--
-- Not re-entrant: a nested call on the same mutex would wait on the
-- enclosing call's barrier, which is only filled when that call exits.
withMutex :: Mutex -> IO a -> IO a
withMutex m act =
    bracket swapShared signalPrivate $ \(_, chain) -> waitChain chain >> act
  where
    -- Atomically install a fresh, empty barrier in the mutex.  Returns that
    -- new barrier together with a new IORef holding the barrier it replaced,
    -- which is the first thing this caller has to wait on.
    swapShared :: IO (Lock, Chain)
    swapShared = do
        mine <- Lock <$> newEmptyMVar
        -- Strict variant on purpose: the lazy 'atomicModifyIORef' would
        -- leave an unevaluated thunk (retaining the previous lock) in the
        -- shared IORef until some later caller happened to force it.
        prev <- atomicModifyIORef' m (\old -> (mine, old))
        chain <- newIORef (Just prev)
        return (mine, chain)

    -- Release: fill our own barrier with whatever is left in our chain, so
    -- that the next caller continues any wait we did not finish.
    signalPrivate :: (Lock, Chain) -> IO ()
    signalPrivate (Lock mine, chain) = readIORef chain >>= putMVar mine

    -- The last barrier that we were waiting on (if we're interrupted)
    -- will be left in our chain as a "continuation" for whoever
    -- next gets the mutex.  It may be already signalled by the time they
    -- see it, and that's OK.  On normal return it will be 'Nothing'.
    waitChain :: Chain -> IO ()
    waitChain chain = readIORef chain >>= go
      where
        go Nothing = return ()
        go (Just (Lock barrier)) = do
            -- Blocks until the barrier's owner has exited; the value read
            -- is that owner's leftover chain, which we adopt as our own.
            next <- readMVar barrier
            writeIORef chain next
            go next

-- | Fork a demo thread that takes the mutex, prints that it is running,
-- holds the mutex for 200000 microseconds, and prints that it is stopping.
-- Returns the new thread's id.
--
-- After forking, the caller yields so the new thread gets a chance to run
-- (presumably so it reaches the mutex before the next thread is forked;
-- 'yield' does not guarantee this).
mkThread :: Mutex -> String -> IO ThreadId
mkThread m name = forkIO worker <* yield
  where
    say :: String -> IO ()
    say what = putStrLn $ unwords ["thread", name, what]

    worker :: IO ()
    worker = withMutex m $ do
        say "running"
        threadDelay 200000
        say "stopping"

-- | Demo driver: start threads A, B and C contending for one mutex, kill B,
-- then sleep for one second (presumably long enough for A and C to finish
-- their 200 ms critical sections before the program exits).
main :: IO ()
main = do
    mutex <- newMutex
    let spawn = mkThread mutex
    _ <- spawn "A"
    victim <- spawn "B"
    _ <- spawn "C"
    -- Interrupt B; with A started first, B is expected to still be waiting.
    killThread victim
    threadDelay 1000000


More information about the Haskell-Cafe mailing list