[Git][ghc/ghc][wip/thread-status] Add primop to list threads

Ben Gamari gitlab at gitlab.haskell.org
Sun Dec 6 19:12:15 UTC 2020



Ben Gamari pushed to branch wip/thread-status at Glasgow Haskell Compiler / GHC


Commits:
c15d33a6 by Ben Gamari at 2020-12-06T14:11:41-05:00
Add primop to list threads

A user came to #ghc yesterday wondering how best to check whether they
were leaking threads. We ended up using the eventlog but it seems to me
like it would be generally useful if Haskell programs could query their
own threads.

- - - - -


13 changed files:

- compiler/GHC/Builtin/primops.txt.pp
- compiler/GHC/StgToCmm/Prim.hs
- includes/rts/Threads.h
- includes/rts/storage/Closures.h
- includes/stg/MiscClosures.h
- libraries/base/GHC/Conc.hs
- libraries/base/GHC/Conc/Sync.hs
- libraries/base/tests/all.T
- + libraries/base/tests/listThreads.hs
- + libraries/base/tests/listThreads.stdout
- rts/PrimOps.cmm
- rts/RtsSymbols.c
- rts/Threads.c


Changes:

=====================================
compiler/GHC/Builtin/primops.txt.pp
=====================================
@@ -2684,6 +2684,13 @@ primop  ThreadStatusOp "threadStatus#" GenPrimOp
    out_of_line = True
    has_side_effects = True
 
+primop ListThreadsOp "listThreads#" GenPrimOp
+   State# RealWorld -> (# State# RealWorld, MutableArray# RealWorld a #)
+   { Returns an array of {\tt ThreadId#}s. }
+   with
+   out_of_line = True
+   has_side_effects = True
+
 ------------------------------------------------------------------------
 section "Weak pointers"
 ------------------------------------------------------------------------


=====================================
compiler/GHC/StgToCmm/Prim.hs
=====================================
@@ -1543,6 +1543,7 @@ emitPrimOp dflags primop = case primop of
   MkApUpd0_Op -> alwaysExternal
   NewBCOOp -> alwaysExternal
   UnpackClosureOp -> alwaysExternal
+  ListThreadsOp -> alwaysExternal
   ClosureSizeOp -> alwaysExternal
   GetApStackValOp -> alwaysExternal
   ClearCCSOp -> alwaysExternal


=====================================
includes/rts/Threads.h
=====================================
@@ -51,6 +51,10 @@ long    rts_getThreadId                  (StgPtr tso);
 void    rts_enableThreadAllocationLimit  (StgPtr tso);
 void    rts_disableThreadAllocationLimit (StgPtr tso);
 
+// Forward declarations, defined in Closures.h
+struct _StgMutArrPtrs;
+struct _StgMutArrPtrs *listThreads               (Capability *cap);
+
 #if !defined(mingw32_HOST_OS)
 pid_t  forkProcess     (HsStablePtr *entry);
 #else


=====================================
includes/rts/storage/Closures.h
=====================================
@@ -169,7 +169,7 @@ typedef struct {
     StgWord    payload[];
 } StgArrBytes;
 
-typedef struct {
+typedef struct _StgMutArrPtrs {
     StgHeader   header;
     StgWord     ptrs;
     StgWord     size; // ptrs plus card table


=====================================
includes/stg/MiscClosures.h
=====================================
@@ -455,6 +455,7 @@ RTS_FUN_DECL(stg_myThreadIdzh);
 RTS_FUN_DECL(stg_labelThreadzh);
 RTS_FUN_DECL(stg_isCurrentThreadBoundzh);
 RTS_FUN_DECL(stg_threadStatuszh);
+RTS_FUN_DECL(stg_listThreadszh);
 
 RTS_FUN_DECL(stg_mkWeakzh);
 RTS_FUN_DECL(stg_mkWeakNoFinalizzerzh);


=====================================
libraries/base/GHC/Conc.hs
=====================================
@@ -45,6 +45,7 @@ module GHC.Conc
         , yield
         , labelThread
         , mkWeakThreadId
+        , listThreads
 
         , ThreadStatus(..), BlockReason(..)
         , threadStatus


=====================================
libraries/base/GHC/Conc/Sync.hs
=====================================
@@ -52,6 +52,7 @@ module GHC.Conc.Sync
         , yield
         , labelThread
         , mkWeakThreadId
+        , listThreads
 
         , ThreadStatus(..), BlockReason(..)
         , threadStatus
@@ -524,6 +525,27 @@ runSparks = IO loop
                       then (# s', () #)
                       else p `seq` loop s'
 
+-- | List the Haskell threads of the current process.
+listThreads :: IO [ThreadId]
+listThreads = IO $ \s ->
+    case listThreads# s of
+      (# s', marr #) ->
+        case unsafeFreezeArray# marr s' of
+          (# s'', arr #) -> (# s'', mapListArray toThreadId arr #)
+  where
+    -- Ideally we would use UnliftedArray# but sadly this doesn't exist.
+    -- Instead listThreads# returns a polymorphic `Array# a` and we coerce.
+    toThreadId :: a -> ThreadId
+    toThreadId tid = ThreadId (unsafeCoerce# tid)
+
+mapListArray :: (a -> b) -> Array# a -> [b]
+mapListArray f arr = go 0#
+  where
+    go i#
+      | isTrue# (i# ==# sizeofArray# arr) = []
+      | otherwise = case indexArray# arr i# of
+                      (# x #) -> f x : go (i# +# 1#)
+
 data BlockReason
   = BlockedOnMVar
         -- ^blocked on 'MVar'


=====================================
libraries/base/tests/all.T
=====================================
@@ -261,3 +261,4 @@ test('T17499', [collect_stats('bytes allocated',5)], compile_and_run, ['-O -w'])
 test('T16643', normal, compile_and_run, [''])
 test('clamp', normal, compile_and_run, [''])
 test('T18642', extra_run_opts('+RTS -T -RTS'), compile_and_run, ['-O2'])
+test('listThreads', normal, compile_and_run, [''])


=====================================
libraries/base/tests/listThreads.hs
=====================================
@@ -0,0 +1,22 @@
+import Control.Concurrent
+import GHC.Conc.Sync
+
+dummyThread :: MVar () -> Int -> IO ()
+dummyThread mvar n = do
+  tid <- myThreadId
+  labelThread tid ("thread-"++show n)
+  readMVar mvar
+
+main :: IO ()
+main = do
+  mvar <- newEmptyMVar
+  let mkThread n = do
+        tid <- forkIO $ readMVar mvar
+        labelThread tid ("thread-"++show n)
+
+  mapM_ mkThread [0..100]
+  threads <- listThreads
+  -- TODO: Check labels
+  print $ length threads
+  putMVar mvar ()
+


=====================================
libraries/base/tests/listThreads.stdout
=====================================
@@ -0,0 +1 @@
+102


=====================================
rts/PrimOps.cmm
=====================================
@@ -2865,3 +2865,11 @@ stg_setThreadAllocationCounterzh ( I64 counter )
     StgTSO_alloc_limit(CurrentTSO) = counter + TO_I64(offset);
     return ();
 }
+
+stg_listThreadszh ()
+{
+  P_ arr;
+
+  ("ptr" arr) = ccall listThreads(MyCapability() "ptr");
+  return (arr);
+}


=====================================
rts/RtsSymbols.c
=====================================
@@ -685,6 +685,7 @@
       SymI_HasProto(stg_isCurrentThreadBoundzh)                         \
       SymI_HasProto(stg_isEmptyMVarzh)                                  \
       SymI_HasProto(stg_killThreadzh)                                   \
+      SymI_HasProto(stg_listThreadszh)                                  \
       SymI_HasProto(loadArchive)                                        \
       SymI_HasProto(loadObj)                                            \
       SymI_HasProto(purgeObj)                                           \


=====================================
rts/Threads.c
=====================================
@@ -844,6 +844,44 @@ loop:
     return true;
 }
 
+StgMutArrPtrs *listThreads(Capability *cap)
+{
+    // First count how many threads we have...
+    StgWord n_threads = 0;
+    for (unsigned g = 0; g < RtsFlags.GcFlags.generations; g++) {
+        for (StgTSO *t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+            n_threads++;
+        }
+    }
+
+    // Allocate a suitably-sized array...
+    const StgWord size = n_threads + mutArrPtrsCardTableSize(n_threads);
+    StgMutArrPtrs *arr =
+        (StgMutArrPtrs *)allocate(cap, sizeofW(StgMutArrPtrs) + size);
+    TICK_ALLOC_PRIM(sizeofW(StgMutArrPtrs), n, 0);
+    arr->ptrs = n_threads;
+    arr->size = size;
+
+    // Populate it...
+    // N.B. we are guaranteed to see at least n_threads during this traversal
+    // as the only way that threads can be removed from the generations' thread lists
+    // is via garbage collection yet we are in an unsafe foreign call,
+    // precluding GC (as well as the sync phase of the non-moving collector).
+    StgWord i = 0;
+    for (unsigned g = 0; g < RtsFlags.GcFlags.generations; g++) {
+        for (StgTSO *t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+            // It's possible that new threads have been created since we counted.
+            // Ignore them.
+            if (i == n_threads)
+                break;
+            arr->payload[i] = (StgClosure *) t;
+            i++;
+        }
+    }
+    CHECKM(i == n_threads, "listThreads: Found too few threads");
+    return arr;
+}
+
 /* ----------------------------------------------------------------------------
  * Debugging: why is a thread blocked
  * ------------------------------------------------------------------------- */



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/c15d33a6fe35f032812a877583b2e3e4a8850ae3

-- 
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/c15d33a6fe35f032812a877583b2e3e4a8850ae3
You're receiving this email because of your account on gitlab.haskell.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20201206/8b235a15/attachment-0001.html>


More information about the ghc-commits mailing list