black hole detection and concurrency

Bertram Felgenhauer bertram.felgenhauer at
Sat Dec 27 09:02:46 EST 2008

Sterling Clover wrote:
> I have a good theory on the latter symptom (the "thread killed" message). 
> Sticking in some traces, as in my appended code, helped me to see what's 
> going on. It seems to be exactly what you describe -- the variable v is 
> permanently bound to the exception it "evaluates" to.  Since the right hand 
> True portion of the unamb evaluates more quickly, the spawned threads are 
> killed and the left hand (the v) "evaluates" to "thread killed". This 
> remains the value of its thunk when you access it later.

Thank you for that analysis. It's intriguing. Here's a cut down example:

> import Control.Concurrent
> import Control.Concurrent.MVar
> import Control.Exception
> import System.IO.Unsafe
> main :: IO ()
> main = do
>     v <- newEmptyMVar
>     let y = print "a" >> takeMVar v >> print "b"
>         x = unsafePerformIO $ y `finally` return ()
>     tid <- forkIO $ evaluate x
>     yield
>     print "killing"
>     killThread tid
>     putMVar v ()
>     yield
>     print "finally"
>     print x

  test: thread killed

Interestingly, the program works without the `finally` part, suspending
the IO action y in the middle:

Output without `finally`:

The `unamb` operator needs both behaviours: it has to kill its worker
threads if it turns out that its value is not needed yet, but the
computation also has to stay suspended so that it can be restarted later.

*after some digging in the ghc sources*

It may be possible to do the restarting manually:

>    let y = catchJust threadKilled
>                      (print "a" >> takeMVar v >> print "b")
>                      (\_ -> myThreadId >>= killThread >> y)
>        threadKilled ThreadKilled = Just ()
>        threadKilled _            = Nothing

(For GHC 6.8, use 'threadKilled (AsyncException ThreadKilled) = Just ()'.)


The key part here is 'myThreadId >>= killThread' which throws an
asynchronous exception to the thread itself, causing the update
frames to be saved on the heap.

Note that 'myThreadId >>= killThread' is not equivalent to
'throw ThreadKilled'. The latter raises a synchronous exception, which
replaces the thunks pointed to by the update frames with another call to
the raise primitive, so the exception gets rethrown whenever such a thunk
is evaluated. This is what happens with 'finally' and 'bracket': they use
'throw' to re-throw the exception.

See rts/RaiseAsync.c (raiseAsync() in particular) for the gory details
for the first case, and rts/Schedule.c, raiseExceptionHelper() for the
second case.
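
The distinction can be probed with a small sketch. The helper names
mkThunk and evalTwice are made up for illustration, and the exact
behaviour depends on the GHC runtime, so treat this as an experiment to
run rather than a specification. It evaluates the same unsafePerformIO
thunk twice, once with a synchronous throwIO and once with a
self-directed killThread, and prints what each evaluation produced.

```haskell
import Control.Concurrent (killThread, myThreadId)
import Control.Exception
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)

-- Hypothetical helper: build a lazy thunk whose evaluation bumps a
-- counter, runs the supplied "die" action, and then reads the counter.
mkThunk :: IORef Int -> IO () -> IO Int
mkThunk runs die = return $ unsafePerformIO $ do
    modifyIORef runs (+1)
    die
    readIORef runs

-- Hypothetical helper: force the same thunk twice, catching any
-- AsyncException (ThreadKilled is one) raised by each evaluation.
evalTwice :: IO () -> IO (Either AsyncException Int, Either AsyncException Int)
evalTwice die = do
    runs <- newIORef 0
    x    <- mkThunk runs die
    r1   <- try (evaluate x)
    r2   <- try (evaluate x)
    return (r1, r2)

main :: IO ()
main = do
    -- synchronous: the thunk is overwritten with the exception
    evalTwice (throwIO ThreadKilled) >>= print
    -- asynchronous, aimed at ourselves: the thunk should be suspended
    evalTwice (myThreadId >>= killThread) >>= print
```

In the synchronous case both evaluations yield Left ThreadKilled. In the
self-kill case the first evaluation is also caught as ThreadKilled, and
if the thunk really is suspended, the second evaluation should pick up
after the killThread call instead of rethrowing.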

In the above code, though, there is a small window between catching the
ThreadKilled exception and throwing it again, in which other exceptions
may creep in. The only way I see to fix that is to use 'block' and
'unblock' directly.

Here is an attempt at the 'race' function, using block, catch and
unblock. I'm not sure that it's correct. But it seems to work with
Sterling's example at least, which triggers a restart.

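> race :: IO a -> IO a -> IO a
> race a b = block $ do
>     v <- newEmptyMVar
>     -- each worker runs its action and stores the result in v
>     let t x = x >>= putMVar v
>     ta <- forkIO (t a)
>     tb <- forkIO (t b)
>     let cleanup = killThread ta >> killThread tb
>     -- wait for the first result, then kill both workers
>     unblock (do r <- takeMVar v; cleanup; return r)
>         `catch` \e -> case e of
>             ThreadKilled -> do
>                 cleanup
>                 -- rethrow asynchronously, so that the thunk is suspended
>                 myThreadId >>= killThread
>                 -- a later evaluation resumes here and restarts the race
>                 unblock (race a b)
>             _ -> throwIO e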
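
Later versions of base replace 'block' and 'unblock' with 'mask'. The
following is a minimal sketch of the same idea in that style, assuming
'mask' and 'forkIOWithUnmask' are available. The name raceMask is
hypothetical, and this is not a verified drop-in replacement for the
code above; it only keeps the same structure and the same
'myThreadId >>= killThread' restart trick.

```haskell
import Control.Concurrent
import Control.Exception

-- Hypothetical mask-based variant of the race function above.
raceMask :: IO a -> IO a -> IO a
raceMask a b = mask $ \restore -> do
    v <- newEmptyMVar
    -- workers run their action unmasked, so killThread can reach them
    let t x = forkIOWithUnmask $ \unmask -> unmask x >>= putMVar v
    ta <- t a
    tb <- t b
    let cleanup = killThread ta >> killThread tb
    -- wait for the first result with async exceptions enabled again
    restore (do r <- takeMVar v; cleanup; return r)
        `catch` \e -> case e of
            ThreadKilled -> do
                cleanup
                -- self-directed async exception, as in the code above
                myThreadId >>= killThread
                restore (raceMask a b)
            _ -> throwIO e

main :: IO ()
main = do
    -- the second action finishes at once, so it should win the race
    r <- raceMask (threadDelay 2000000 >> return "slow") (return "fast")
    putStrLn r
```

As in the original, an exception raised inside a worker is not passed on
to the caller; the worker simply dies and the other one can still win.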


More information about the Glasgow-haskell-users mailing list