RFC: termination detection for STM

Simon Marlow simonmarhaskell at gmail.com
Wed Feb 14 05:04:32 EST 2007


Perhaps I'm missing something, but doesn't GHC already detect the kind of 
deadlock you're talking about here?  When a thread is blocked and cannot be 
woken up, it is sent the BlockedOnDeadMVar exception (or BlockedIndefinitely if
it is blocked in an STM transaction).  It's more precise than 
the extension you propose, because the GC is used to check which threads are 
unreachable and therefore cannot be woken up, so it can detect mutual-deadlock 
between two threads in a system that contains other running threads.

If I've misunderstood, please let me know.  Maybe you could knock up a quick 
example program of the kind of deadlock you want to detect, and see what GHC 
currently does?

Cheers,
	Simon

Michael Stahl wrote:
> 
> last week i have implemented termination detection for the ghc runtime.
> my motivation was a parallel interpreter for a concurrent constraint
> language which i have implemented using STM.  this interpreter has reached
> its successful final state iff none of the stm transactions it has spawned
> make progress anymore, i.e. all threads are in state BlockedOnSTM.  well,
> it turned out that this state is rather difficult to detect.  it could be
> possible to use unsafePerformIO from within an atomically block with an
> idempotent io action such that there is some thread which counts the
> number of threads which are blocked on stm, but i could not come up with
> something that is free of race conditions.  mainly this is because with
> stm, it is impossible to know (in haskell code) which threads are woken up
> by a committed stm transaction.
> 
> well, there is something that would work: a wave-based distributed
> termination algorithm.  but that would be very inefficient: every thread
> needs to be woken up, and there is still the issue of finding the right
> time to send out a detection wave: too late, and the user gets bored; too
> early, and there is even more overhead.
> 
> of course, the ghc runtime knows which threads are woken up, so i thought
> it should be possible to implement the termination detection there.
> and also, this should be more reliable than the ugly hack i had before (a
> master thread with a timeout that is reset on every stm commit, by
> throwing an exception).
> 
> so how can the ghc runtime detect termination?
> it is quite simple: we need to add a counter somewhere that is incremented
> every time a thread that is BlockedOnSTM is woken up, and decremented
> every time a thread goes into the BlockedOnSTM state (via retry).
> but just having a single counter has a drawback: it might become a
> scalability bottleneck.
> so i have extended the StgTSO struct with two fields: a counter, and a
> parent pointer.
> the parent pointer points to the TSO that spawned the thread.
> the counter field of a TSO counts the thread itself (while it is runnable)
> plus the threads which are children of this thread (non-transitively!)
> and whose subtrees have not terminated yet.
> invariant: the counter is always >= 0, and == 0 iff the subtree rooted at
> this thread has terminated.
> 
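
For comparison, the single-counter variant mentioned above might look roughly
like the following. This is only a sketch using C11 atomics, not code from the
patch: the names active_threads, thread_became_active and
thread_became_inactive are made up for illustration, and it assumes the
counter is also bumped on fork and dropped on exit, so that it counts every
thread that is not blocked on STM.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical global: number of threads that are neither finished nor
 * blocked on STM.  Static storage, so it starts at zero. */
static atomic_uint active_threads;

/* Assumed to be called on fork and on wakeup from BlockedOnSTM. */
static void thread_became_active(void)
{
    atomic_fetch_add(&active_threads, 1);
}

/* Assumed to be called on exit and on retry.  Returns true when the
 * caller was the last active thread, i.e. termination is detected. */
static bool thread_became_inactive(void)
{
    /* atomic_fetch_sub returns the value held before the subtraction. */
    return atomic_fetch_sub(&active_threads, 1) == 1;
}
```

Every thread on every capability would update the same word here, which is
presumably the scalability concern that motivates the per-thread counters.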
> conceptually, we have to modify the counter at the following events:
> - fork: the forking thread's counter is incremented
>         the forked thread's counter is initialized to 1
> - exit: the exiting thread's counter is decremented
> - retry: the retrying thread's counter is decremented
> - wakeup (stm): the counter of the woken thread is incremented
> increment means: add 1; if new value is 1: recursively increment parent
> decrement means: sub 1; if new value is 0: recursively decrement parent
>                         if there is no parent, signal termination
> 
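
The event rules above can be modelled in a few lines of plain C. This is a
single-threaded sketch under stated assumptions, not the patch itself: Node
stands in for the two new StgTSO fields, the terminated flag stands in for
throwing Deadlock to the root, and there is no locking (the patch takes
lockTSO around each counter update).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the fields the patch adds to StgTSO. */
typedef struct Node {
    unsigned     counter;    /* 1 for the thread itself while runnable,
                                plus 1 per child whose subtree is active */
    struct Node *parent;     /* spawning thread, NULL for a root */
    bool         terminated; /* set where the patch would throw Deadlock */
} Node;

/* add 1; if the new value is 1, continue with the parent */
static void increment(Node *n)
{
    for (;;) {
        n->counter++;
        if (n->counter != 1 || n->parent == NULL)
            return;
        n = n->parent;
    }
}

/* sub 1; if the new value is 0, continue with the parent,
 * or signal termination when there is no parent */
static void decrement(Node *n)
{
    for (;;) {
        assert(n->counter > 0);
        n->counter--;
        if (n->counter != 0)
            return;
        if (n->parent == NULL) {
            n->terminated = true;
            return;
        }
        n = n->parent;
    }
}

/* fork: the child starts at 1, the forking thread is incremented */
static void spawn(Node *parent, Node *child)
{
    child->counter = 1;
    child->parent = parent;
    child->terminated = false;
    if (parent != NULL)
        increment(parent);
}
```

With a root that forks two children, the root's counter is 3. If the root
then exits and one child retries, it drops to 1. Waking that child brings it
back to 2, and only once both children have exited or retried does it reach 0
and set the flag on the root.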
> of course, it has to be guaranteed that increments always arrive at the
> root before the corresponding decrements, otherwise termination may be
> detected prematurely.
> note that termination can only be signalled for a thread which has already
> exited, or which has called GHC.Conc.awaitTermination (described below).
> 
> there are two added primitive operations:
> - counterDec# decrements the calling thread's counter
>   and also sets the parent pointer to NULL (so the calling thread becomes
>   the root of a thread tree for which termination will be detected)
> - counterInc# just increments the calling thread's counter
>   (cancels the effect of counterDec#)
> 
> these primitives are meant to be called from a single place:
> awaitTermination, which is in GHC.Conc.
> it calls counterDec#, then waits for the exception with a delay.
> afterwards, it just calls counterInc#.
> 
> the termination is signalled by throwing a Deadlock exception to the root
> of the thread tree.  actually, i believe the termination condition is a
> livelock, but there is no constructor for that.
> 
> note that there may be several independent subcomputations within a single
> haskell process for which termination can be detected with this approach.
> 
> the main drawbacks of this code:
> - the locking overhead (counter updates)
> - because of the parent pointers, non-leaf threads will never be garbage
>   collected if any child thread is still alive.
> this could be alleviated by only enabling the termination detection if the
> program specifically requests it, e.g. some global flag variable which is
> set by another primitive operation.
> 
> i would welcome a review of the code; this is the first time i have hacked
> on ghc (and also the first non-trivial C code i have written in years), so
> there may be issues and interactions with other parts of the runtime that
> i have not anticipated. so far, i have tested it only with -threaded
> -debug, but it seems to work with -N32 (on a single processor, that is all
> i have).
> do you think that others might be interested in this functionality?
> could it be included in ghc?
> 
> also, something that i really do not understand: in the counterDec#
> primitive, the stack_size field of the StgTSO struct is corrupted for
> no apparent reason, so i save it onto the stack and then restore it.
> none of the other primitive ops seem to do anything similar.
> what am i doing wrong?
> 
> patch is against ghc 6.6 (actually the debian package 6.6-3, but i hope
> those weird arm floating point endianness patches don't matter much :) ).
> 
>         michael stahl
> 
> 
> 
> 
> ------------------------------------------------------------------------
> 
> diff -ru /tmp/ghc-6.6/compiler/prelude/primops.txt.pp ghc-6.6/compiler/prelude/primops.txt.pp
> --- /tmp/ghc-6.6/compiler/prelude/primops.txt.pp	2006-10-10 21:03:47.000000000 +0200
> +++ ghc-6.6/compiler/prelude/primops.txt.pp	2007-02-06 15:08:52.000000000 +0100
> @@ -1477,6 +1477,18 @@
>     with
>     out_of_line = True
>  
> +primop  CounterIncOp "counterInc#" GenPrimOp
> +   State# RealWorld -> State# RealWorld
> +   with
> +   has_side_effects = True
> +   out_of_line      = True
> +
> +primop  CounterDecOp "counterDec#" GenPrimOp
> +   State# RealWorld -> State# RealWorld
> +   with
> +   has_side_effects = True
> +   out_of_line      = True
> +
>  ------------------------------------------------------------------------
>  section "Weak pointers"
>  ------------------------------------------------------------------------
> diff -ru /tmp/ghc-6.6/includes/mkDerivedConstants.c ghc-6.6/includes/mkDerivedConstants.c
> --- /tmp/ghc-6.6/includes/mkDerivedConstants.c	2006-10-10 21:03:51.000000000 +0200
> +++ ghc-6.6/includes/mkDerivedConstants.c	2007-02-06 20:37:17.000000000 +0100
> @@ -283,6 +283,8 @@
>      closure_field(StgTSO, saved_errno);
>      closure_field(StgTSO, trec);
>      closure_field(StgTSO, flags);
> +    closure_field(StgTSO, parent);
> +    closure_field(StgTSO, counter);
>      closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
>      tso_field(StgTSO, sp);
>      tso_field_offset(StgTSO, stack);
> diff -ru /tmp/ghc-6.6/includes/StgMiscClosures.h ghc-6.6/includes/StgMiscClosures.h
> --- /tmp/ghc-6.6/includes/StgMiscClosures.h	2006-10-10 21:03:49.000000000 +0200
> +++ ghc-6.6/includes/StgMiscClosures.h	2007-02-06 14:55:16.000000000 +0100
> @@ -590,6 +590,8 @@
>  RTS_FUN(myThreadIdzh_fast);
>  RTS_FUN(labelThreadzh_fast);
>  RTS_FUN(isCurrentThreadBoundzh_fast);
> +RTS_FUN(counterInczh_fast);
> +RTS_FUN(counterDeczh_fast);
>  
>  RTS_FUN(mkWeakzh_fast);
>  RTS_FUN(finalizzeWeakzh_fast);
> diff -ru /tmp/ghc-6.6/includes/TSO.h ghc-6.6/includes/TSO.h
> --- /tmp/ghc-6.6/includes/TSO.h	2006-10-10 21:03:50.000000000 +0200
> +++ ghc-6.6/includes/TSO.h	2007-02-13 02:07:44.000000000 +0100
> @@ -160,6 +160,9 @@
>      StgTSODistInfo dist;
>  #endif
>  
> +    StgWord32          counter;
> +    struct StgTSO_*    parent;
> +
>      /* The thread stack... */
>      StgWord32	       stack_size;     /* stack size in *words* */
>      StgWord32          max_stack_size; /* maximum stack size in *words* */
> diff -ru /tmp/ghc-6.6/libraries/base/GHC/Conc.lhs ghc-6.6/libraries/base/GHC/Conc.lhs
> --- /tmp/ghc-6.6/libraries/base/GHC/Conc.lhs	2006-10-10 21:08:04.000000000 +0200
> +++ ghc-6.6/libraries/base/GHC/Conc.lhs	2007-02-12 20:37:00.000000000 +0100
> @@ -36,6 +36,7 @@
>  	, pseq 		-- :: a -> b -> b
>  	, yield         -- :: IO ()
>  	, labelThread	-- :: ThreadId -> String -> IO ()
> +	, awaitTermination -- :: IO ()
>  
>  	-- * Waiting
>  	, threadDelay	  	-- :: Int -> IO ()
> @@ -98,7 +99,8 @@
>  import GHC.Num		( Num(..) )
>  import GHC.Real		( fromIntegral, quot )
>  import GHC.Base		( Int(..) )
> -import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
> +import GHC.Exception    ( catchException, Exception(..), AsyncException(..),
> +                          unblock )
>  import GHC.Pack		( packCString# )
>  import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
>  import GHC.STRef
> @@ -246,6 +248,16 @@
>  yield = IO $ \s -> 
>     case (yield# s) of s1 -> (# s1, () #)
>  
> +awaitTermination :: IO ()
> +awaitTermination =
> +        catchException 
> +                (counterDec >> unblock (threadDelay 2147483647))
> +                (\e -> counterInc >> case e of Deadlock -> return ()
> +                                               _        -> throw e)
> +    where
> +        counterDec = IO (\s -> case counterDec# s of s' -> (# s', () #) )
> +        counterInc = IO (\s -> case counterInc# s of s' -> (# s', () #) )
> +
>  {- | 'labelThread' stores a string as identifier for this thread if
>  you built a RTS with debugging support. This identifier will be used in
>  the debugging output to make distinction of different threads easier
> diff -ru /tmp/ghc-6.6/rts/GC.c ghc-6.6/rts/GC.c
> --- /tmp/ghc-6.6/rts/GC.c	2006-10-10 21:03:51.000000000 +0200
> +++ ghc-6.6/rts/GC.c	2007-02-12 17:25:48.000000000 +0100
> @@ -2642,7 +2642,11 @@
>  
>      // scavange current transaction record
>      tso->trec = (StgTRecHeader *)evacuate((StgClosure *)tso->trec);
> -    
> +
> +    if (tso->parent) {
> +            tso->parent = (StgTSO *)evacuate((StgClosure *)tso->parent);
> +    }
> +
>      // scavenge this thread's stack 
>      scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
>  }
> @@ -4175,6 +4179,7 @@
>     * that starts with an activation record. 
>     */
>  
> +  ASSERT (p <= stack_end);
>    while (p < stack_end) {
>      info  = get_ret_itbl((StgClosure *)p);
>        
> diff -ru /tmp/ghc-6.6/rts/HSprel.def ghc-6.6/rts/HSprel.def
> --- /tmp/ghc-6.6/rts/HSprel.def	2006-10-10 21:03:47.000000000 +0200
> +++ ghc-6.6/rts/HSprel.def	2007-02-06 20:08:28.000000000 +0100
> @@ -24,5 +24,6 @@
>  PrelIOBase_BlockedOnDeadMVar_closure
>  PrelIOBase_BlockedIndefinitely_closure
>  PrelIOBase_NonTermination_closure
> +PrelIOBase_Deadlock_closure
>  PrelWeak_runFinalizzerBatch_closure
>  __stginit_Prelude
> diff -ru /tmp/ghc-6.6/rts/Linker.c ghc-6.6/rts/Linker.c
> --- /tmp/ghc-6.6/rts/Linker.c	2006-10-10 21:03:52.000000000 +0200
> +++ ghc-6.6/rts/Linker.c	2007-02-06 15:05:29.000000000 +0100
> @@ -493,6 +493,8 @@
>        SymX(cmpIntegerzh_fast)	        	\
>        SymX(cmpIntegerIntzh_fast)	      	\
>        SymX(complementIntegerzh_fast)		\
> +      SymX(counterDeczh_fast)		        \
> +      SymX(counterInczh_fast)		        \
>        SymX(createAdjustor)			\
>        SymX(decodeDoublezh_fast)			\
>        SymX(decodeFloatzh_fast)			\
> diff -ru /tmp/ghc-6.6/rts/Prelude.h ghc-6.6/rts/Prelude.h
> --- /tmp/ghc-6.6/rts/Prelude.h	2006-10-10 21:03:50.000000000 +0200
> +++ ghc-6.6/rts/Prelude.h	2007-02-06 20:06:00.000000000 +0100
> @@ -41,6 +41,7 @@
>  PRELUDE_CLOSURE(base_GHCziIOBase_BlockedIndefinitely_closure);
>  PRELUDE_CLOSURE(base_GHCziIOBase_NonTermination_closure);
>  PRELUDE_CLOSURE(base_GHCziIOBase_NestedAtomically_closure);
> +PRELUDE_CLOSURE(base_GHCziIOBase_Deadlock_closure);
>  
>  #if !defined(mingw32_HOST_OS)
>  PRELUDE_CLOSURE(base_GHCziConc_ensureIOManagerIsRunning_closure);
> @@ -93,6 +94,7 @@
>  #define BlockedIndefinitely_closure (&base_GHCziIOBase_BlockedIndefinitely_closure)
>  #define NonTermination_closure    (&base_GHCziIOBase_NonTermination_closure)
>  #define NestedAtomically_closure  (&base_GHCziIOBase_NestedAtomically_closure)
> +#define Deadlock_closure          (&base_GHCziIOBase_Deadlock_closure)
>  
>  #define Czh_static_info           (&base_GHCziBase_Czh_static_info)
>  #define Fzh_static_info           (&base_GHCziFloat_Fzh_static_info)
> diff -ru /tmp/ghc-6.6/rts/PrimOps.cmm ghc-6.6/rts/PrimOps.cmm
> --- /tmp/ghc-6.6/rts/PrimOps.cmm	2006-10-10 21:03:51.000000000 +0200
> +++ ghc-6.6/rts/PrimOps.cmm	2007-02-13 02:08:55.000000000 +0100
> @@ -947,6 +947,23 @@
>    RET_N(r);
>  }
>  
> +counterInczh_fast
> +{
> +  foreign "C" incrementCounter(CurrentTSO "ptr", 0/*FALSE*/) [];
> +  jump %ENTRY_CODE(Sp(0));
> +}
> +
> +counterDeczh_fast
> +{
> +  W_ sz;
> +  sz = StgTSO_stack_size(CurrentTSO);
> +  StgTSO_parent(CurrentTSO) = NULL;
> +  foreign "C" decrementCounter(MyCapability() "ptr", CurrentTSO "ptr") [];
> +  /* FIXME: why does stack_size get corrupted? */
> +  StgTSO_stack_size(CurrentTSO) = sz;
> +  jump %ENTRY_CODE(Sp(0));
> +}
> +
>  
>  /* -----------------------------------------------------------------------------
>   * TVar primitives
> diff -ru /tmp/ghc-6.6/rts/RaiseAsync.c ghc-6.6/rts/RaiseAsync.c
> --- /tmp/ghc-6.6/rts/RaiseAsync.c	2006-10-10 21:03:51.000000000 +0200
> +++ ghc-6.6/rts/RaiseAsync.c	2007-02-12 18:24:18.000000000 +0100
> @@ -730,6 +730,7 @@
>      // perhaps have a debugging test to make sure that this really
>      // happens and that the 'zombie' transaction does not get
>      // committed.
> +    incrementCounter(tso, rtsFalse);
>      goto done;
>  
>    case BlockedOnMVar:
> diff -ru /tmp/ghc-6.6/rts/Schedule.c ghc-6.6/rts/Schedule.c
> --- /tmp/ghc-6.6/rts/Schedule.c	2006-10-10 21:03:51.000000000 +0200
> +++ ghc-6.6/rts/Schedule.c	2007-02-13 02:02:07.000000000 +0100
> @@ -1798,9 +1798,91 @@
>   * Handle a thread that returned to the scheduler with ThreadFinished
>   * -------------------------------------------------------------------------- */
>  
> +void decrementCounter(Capability *cap, StgTSO *t) {
> +    StgTSO *tso = t;
> +    StgTSO *par;
> +    StgWord32 ctr;
> +    int tmp;
> +    lockTSO(t);
> +    ASSERT (t->counter > 0);
> +    ctr = --t->counter;
> +    unlockTSO(t);
> +    debugTrace(DEBUG_sched,
> +                        "Decrementing counter for thread %lu (%lu)",
> +                        (unsigned long)t->id, (unsigned long)ctr);
> +    /* recursively decrement counters */
> +    while (ctr == 0) {
> +            par = tso->parent;
> +            if (par) { /* not root */
> +                    while (par->what_next == ThreadRelocated) {
> +                            par = par->link;
> +                            tso->parent = par;
> +                    }
> +                    tso = par;
> +                    lockTSO(tso);
> +                    ASSERT (tso->counter > 0);
> +                    ctr = --tso->counter;
> +                    unlockTSO(tso);
> +                    debugTrace(DEBUG_sched,
> +                        "Decrementing counter for thread %lu (%lu)",
> +                        (unsigned long)tso->id, (unsigned long)ctr);
> +            } else { /* root: termination */
> +                    /* NB: if tso == t, the root thread has exited
> +                     *     throwTo seems to handle that case, so we do not */
> +                    debugTrace(DEBUG_sched,
> +                        "Detected termination for thread %lu (%s)",
> +                        (unsigned long)tso->id, whatNext_strs[tso->what_next]);
> +                    tmp = throwTo(cap, t, tso,
> +                                (StgClosure *)Deadlock_closure, NULL);
> +                    /* we assume that the calling thread will delay */
> +                    ASSERT (tmp == THROWTO_SUCCESS); //FIXME wrong wrong wrong
> +                    break;
> +            }
> +    }
> +}
> +
> +void incrementCounter (StgTSO *t, rtsBool t_is_locked) {
> +    StgTSO *tso = t;
> +    StgTSO *par;
> +    StgWord32 ctr;
> +    if (!t_is_locked) lockTSO(t);
> +    ASSERT (t->counter >= 0);
> +    ctr = ++t->counter;
> +    if (!t_is_locked) unlockTSO(t);
> +    debugTrace(DEBUG_sched,
> +                        "Incrementing counter for thread %lu (%lu)",
> +                        (unsigned long)t->id, (unsigned long)ctr);
> +    while (ctr == 1) {
> +            par = tso->parent;
> +            if (par) { /* not root */
> +                    while (par->what_next == ThreadRelocated) {
> +                            par = par->link;
> +                            tso->parent = par;
> +                    }
> +                    tso = par;
> +                    lockTSO(tso);
> +                    ASSERT (tso->counter >= 0);
> +                    ctr = ++tso->counter;
> +                    unlockTSO(tso);
> +                    debugTrace(DEBUG_sched,
> +                        "Incrementing counter for thread %lu (%lu)",
> +                        (unsigned long)tso->id, (unsigned long)ctr);
> +            } else {
> +                    /* this can be caused by the root thread after it
> +                     * received the Deadlock exception */
> +                    debugTrace(DEBUG_sched,
> +                        "Deadlock broken for thread %lu (%s)",
> +                        (unsigned long)tso->id, whatNext_strs[tso->what_next]);
> +                    break;
> +            }
> +    }
> +}
> +
>  static rtsBool
>  scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
>  {
> +    decrementCounter(cap, t);
> +
>      /* Need to check whether this was a main thread, and if so,
>       * return with the return value.
>       *
> diff -ru /tmp/ghc-6.6/rts/Schedule.h ghc-6.6/rts/Schedule.h
> --- /tmp/ghc-6.6/rts/Schedule.h	2006-10-10 21:03:49.000000000 +0200
> +++ ghc-6.6/rts/Schedule.h	2007-02-12 18:22:04.000000000 +0100
> @@ -13,6 +13,10 @@
>  #include "OSThreads.h"
>  #include "Capability.h"
>  
> +/* termination detection counters */
> +extern void decrementCounter(Capability *cap, StgTSO *t);
> +extern void incrementCounter(StgTSO *t, rtsBool t_is_locked);
> +
>  /* initScheduler(), exitScheduler()
>   * Called from STG :  no
>   * Locks assumed   :  none
> diff -ru /tmp/ghc-6.6/rts/STM.c ghc-6.6/rts/STM.c
> --- /tmp/ghc-6.6/rts/STM.c	2006-10-10 21:03:49.000000000 +0200
> +++ ghc-6.6/rts/STM.c	2007-02-12 17:34:30.000000000 +0100
> @@ -320,10 +320,11 @@
>  
>  // Helper functions for thread blocking and unblocking
>  
> -static void park_tso(StgTSO *tso) {
> +static void park_tso(Capability *cap, StgTSO *tso) {
>    ASSERT(tso -> why_blocked == NotBlocked);
>    tso -> why_blocked = BlockedOnSTM;
>    tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
> +  decrementCounter(cap, tso);
>    TRACE("park_tso on tso=%p\n", tso);
>  }
>  
> @@ -1063,7 +1064,7 @@
>      // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
>      // in the TSO, (c) TREC_WAITING in the Trec.  
>      build_wait_queue_entries_for_trec(cap, tso, trec);
> -    park_tso(tso);
> +    park_tso(cap, tso);
>      trec -> state = TREC_WAITING;
>  
>      // We haven't released ownership of the transaction yet.  The TSO
> @@ -1109,7 +1110,7 @@
>      // The transaction remains valid -- do nothing because it is already on
>      // the wait queues
>      ASSERT (trec -> state == TREC_WAITING);
> -    park_tso(tso);
> +    park_tso(cap, tso);
>      revert_ownership(trec, TRUE);
>    } else {
>      // The transcation has become invalid.  We can now remove it from the wait
> diff -ru /tmp/ghc-6.6/rts/Threads.c ghc-6.6/rts/Threads.c
> --- /tmp/ghc-6.6/rts/Threads.c	2006-10-10 21:03:50.000000000 +0200
> +++ ghc-6.6/rts/Threads.c	2007-02-12 20:46:41.000000000 +0100
> @@ -104,6 +104,16 @@
>      tso->saved_errno = 0;
>      tso->bound = NULL;
>      tso->cap = cap;
> +
> +    /* the main thread has no parent; hope this will detect that */
> +    if (cap->in_haskell == rtsTrue) {
> +            tso->counter = 1; /* thread itself is runnable */
> +            tso->parent = cap->r.rCurrentTSO;
> +            incrementCounter(tso->parent, rtsFalse);
> +    } else {
> +            tso->counter = 1;
> +            tso->parent = NULL;
> +    }
>      
>      tso->stack_size     = stack_size;
>      tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
> @@ -488,6 +498,11 @@
>    // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
>    ASSERT(tso->why_blocked != NotBlocked);
>  
> +  if (tso->why_blocked == BlockedOnSTM) {
> +        /* NB: tso _must_ be locked at this point */
> +        incrementCounter(tso, rtsTrue);
> +  }
> +
>    tso->why_blocked = NotBlocked;
>    next = tso->link;
>    tso->link = END_TSO_QUEUE;
> 
> 
> ------------------------------------------------------------------------
> 