[Git][ghc/ghc][wip/gc/parallel-marking] nonmoving: Parallel marking?

Ben Gamari gitlab at gitlab.haskell.org
Tue Aug 11 02:25:29 UTC 2020



Ben Gamari pushed to branch wip/gc/parallel-marking at Glasgow Haskell Compiler / GHC


Commits:
cfd77aba by Ben Gamari at 2020-08-10T22:25:22-04:00
nonmoving: Parallel marking?

- - - - -


3 changed files:

- rts/sm/NonMoving.c
- rts/sm/NonMovingMark.c
- rts/sm/NonMovingMark.h


Changes:

=====================================
rts/sm/NonMoving.c
=====================================
@@ -714,6 +714,7 @@ void nonmovingInit(void)
         nonmovingHeap.allocators[i] = alloc_nonmoving_allocator(n_capabilities);
     }
     nonmovingMarkInitUpdRemSet();
+    nonmovingInitMarkState();
 }
 
 // Stop any nonmoving collection in preparation for RTS shutdown.
@@ -915,9 +916,9 @@ void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads)
     ASSERT(nonmoving_marked_compact_objects == NULL);
     ASSERT(n_nonmoving_marked_compact_blocks == 0);
 
-    MarkQueue *mark_queue = stgMallocBytes(sizeof(MarkQueue), "mark queue");
-    initMarkQueue(mark_queue);
-    current_mark_queue = mark_queue;
+    // First initialize a MarkQueue for the leader thread:
+    startMarkThreads(1);
+    MarkQueue *mark_queue = mark_state.queues[0];
 
     // Mark roots
     trace(TRACE_nonmoving_gc, "Marking roots for nonmoving GC");
@@ -981,6 +982,8 @@ void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads)
                            nonmovingConcurrentMark, mark_queue) != 0) {
             barf("nonmovingCollect: failed to spawn mark thread: %s", strerror(errno));
         }
+        // Start the mark worker threads...
+        startMarkThreads(3);
     } else {
         nonmovingConcurrentMark(mark_queue);
     }
@@ -998,7 +1001,7 @@ static void nonmovingMarkThreadsWeaks(MarkQueue *mark_queue)
 {
     while (true) {
         // Propagate marks
-        nonmovingMark(mark_queue);
+        nonmovingMarkLeader();
 
         // Tidy threads and weaks
         nonmovingTidyThreads();
@@ -1102,7 +1105,7 @@ static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO *
     // Do last marking of weak pointers
     while (true) {
         // Propagate marks
-        nonmovingMark(mark_queue);
+        nonmovingMarkLeader();
 
         if (!nonmovingTidyWeaks(mark_queue))
             break;
@@ -1176,9 +1179,8 @@ static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO *
     nonmovingFinishFlush(task);
 #endif
 
-    current_mark_queue = NULL;
-    freeMarkQueue(mark_queue);
-    stgFree(mark_queue);
+    // tear down the mark threads' state
+    stopMarkThreads();
 
     oldest_gen->live_estimate = nonmoving_live_words;
     oldest_gen->n_old_blocks = 0;


=====================================
rts/sm/NonMovingMark.c
=====================================
@@ -24,6 +24,7 @@
 #include "Stats.h"
 #include "STM.h"
 #include "MarkWeak.h"
+#include "RtsUtils.h"
 #include "sm/Storage.h"
 #include "CNF.h"
 
@@ -117,6 +118,8 @@ StgWeak *nonmoving_weak_ptr_list = NULL;
 StgIndStatic *debug_caf_list_snapshot = (StgIndStatic*)END_OF_CAF_LIST;
 #endif
 
+struct MarkState mark_state;
+
 /* Note [Update remembered set]
  * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  * The concurrent non-moving collector uses a remembered set to ensure
@@ -365,7 +368,6 @@ upd_rem_set_push_gc(UpdRemSet *rs, MarkQueueEnt *ent)
     markQueueBlockPush(rs->block, ent);
 }
 
-
 STATIC_INLINE void
 mark_queue_push (MarkQueue *q, const MarkQueueEnt *ent)
 {
@@ -375,7 +377,15 @@ mark_queue_push (MarkQueue *q, const MarkQueueEnt *ent)
         // Allocate a fresh block.
         ACQUIRE_SM_LOCK;
         bdescr *bd = allocGroup(MARK_QUEUE_BLOCKS);
-        bd->link = markQueueBlockBdescr(q->top);
+
+        // First try pushing onto local deque...
+        if (q->rest != NULL && pushWSDeque(q->rest, q->top)) {
+            bd->link = NULL;
+        } else {
+            // deque overflowed. link onto the new block's list.
+            bd->link = markQueueBlockBdescr(q->top);
+        }
+
         q->top = (MarkQueueBlock *) bd->start;
         q->top->head = 0;
         RELEASE_SM_LOCK;
@@ -386,7 +396,7 @@ mark_queue_push (MarkQueue *q, const MarkQueueEnt *ent)
 
 STATIC_INLINE void push(struct MarkContext *mc, MarkQueueEnt *ent) {
     switch (mc->kind) {
-    case MARK_CTX_IN_CONC_MARK: 
+    case MARK_CTX_IN_CONC_MARK:
         mark_queue_push(mc->in_conc_mark.queue, ent);
         break;
     case MARK_CTX_IN_MOVING_GC:
@@ -410,7 +420,7 @@ void nonmovingMarkInitUpdRemSet() {
 #if defined(THREADED_RTS) && defined(DEBUG)
 static uint32_t markQueueLength(MarkQueue *q);
 #endif
-static void init_mark_queue_(MarkQueue *queue);
+static void init_mark_queue(MarkQueue *queue);
 
 /* Transfers the given capability's update-remembered set to the global
  * remembered set. Must hold SM lock/allocation spinlock.
@@ -875,6 +885,17 @@ void markQueueAddRoot (MarkQueue* q, StgClosure** root)
  * Popping from the mark queue
  *********************************************************/
 
+static MarkQueueBlock *try_stealing(void)
+{
+    for (uint32_t i = 0; i < mark_state.n_mark_threads; i++) {
+        MarkQueueBlock *b = (MarkQueueBlock *) stealWSDeque(mark_state.queues[i]->rest);
+        if (b != NULL) {
+            return b;
+        }
+    }
+    return NULL;
+}
+
 // Returns invalid MarkQueueEnt if queue is empty.
 static MarkQueueEnt markQueuePop_ (MarkQueue *q)
 {
@@ -887,19 +908,41 @@ again:
     if (top->head == 0) {
         bdescr *old_block = markQueueBlockBdescr(q->top);
         // Is this the only block in the queue?
-        if (old_block->link == NULL) {
-            // Yes, therefore queue is empty...
-            MarkQueueEnt none = { .null_entry = { .p = NULL } };
-            return none;
-        } else {
+        if (old_block->link != NULL) {
             // No, unwind to the previous block and try popping again...
             bdescr *new_block = old_block->link;
             q->top = (MarkQueueBlock*) new_block->start;
             ACQUIRE_SM_LOCK;
-            freeGroup(old_block); // TODO: hold on to a block to avoid repeated allocation/deallocation?
+            freeGroup(old_block);
+              // TODO: hold on to a block to avoid repeated allocation/deallocation?
             RELEASE_SM_LOCK;
             goto again;
         }
+
+        // Yes, this is the only block therefore queue is empty...
+        // Next we try pulling from our own deque...
+        MarkQueueBlock *new = (MarkQueueBlock *) popWSDeque(q->rest);
+        if (new != NULL) {
+            q->top = new;
+            ACQUIRE_SM_LOCK;
+            freeGroup(old_block);
+            RELEASE_SM_LOCK;
+            goto again;
+        }
+
+        // Our deque is also empty...
+        // Now try pulling from other threads deques...
+        new = try_stealing();
+        if (new != NULL) {
+            q->top = new;
+            ACQUIRE_SM_LOCK;
+            freeGroup(old_block);
+            RELEASE_SM_LOCK;
+            goto again;
+        }
+
+        const MarkQueueEnt none = { .null_entry = { .p = NULL } };
+        return none;
     }
 
     top->head--;
@@ -961,23 +1004,20 @@ void init_upd_rem_set (UpdRemSet *queue)
 }
 
 /* Must hold sm_mutex. */
-static void init_mark_queue_ (MarkQueue *queue)
+static void init_mark_queue (MarkQueue *queue)
 {
     bdescr *bd = allocGroup(MARK_QUEUE_BLOCKS);
     queue->top = (MarkQueueBlock *) bd->start;
     queue->top->head = 0;
+    queue->rest = newWSDeque(MARK_QUEUE_DEQUE_SIZE);
+    queue->mark_thread_n = -1;
+    queue->thread_id = -1;
 #if MARK_PREFETCH_QUEUE_DEPTH > 0
     memset(&queue->prefetch_queue, 0, sizeof(queue->prefetch_queue));
     queue->prefetch_head = 0;
 #endif
 }
 
-/* Must hold sm_mutex. */
-void initMarkQueue (MarkQueue *queue)
-{
-    init_mark_queue_(queue);
-}
-
 void reset_upd_rem_set (UpdRemSet *rset)
 {
     // UpdRemSets always have one block for the mark queue. This assertion is to
@@ -986,7 +1026,8 @@ void reset_upd_rem_set (UpdRemSet *rset)
     rset->block->head = 0;
 }
 
-void freeMarkQueue (MarkQueue *queue)
+static void
+freeMarkQueue (MarkQueue *queue)
 {
     freeChain_lock(markQueueBlockBdescr(queue->top));
 }
@@ -1759,8 +1800,8 @@ done:
  *  c. the mark queue has been seeded with a set of roots.
  *
  */
-GNUC_ATTR_HOT void
-nonmovingMark (MarkQueue *queue)
+static GNUC_ATTR_HOT void
+nonmovingMarkLoop (MarkQueue *queue)
 {
     struct MarkContext mctx = markContextInConcMark(queue);
     traceConcMarkBegin();
@@ -1793,17 +1834,17 @@ nonmovingMark (MarkQueue *queue)
         case NULL_ENTRY:
             // Perhaps the update remembered set has more to mark...
             if (upd_rem_set_block_list) {
-                ACQUIRE_LOCK(&upd_rem_set_lock);
+                ACQUIRE_SM_LOCK;
                 bdescr *old = markQueueBlockBdescr(queue->top);
+                freeGroup(old);
+                RELEASE_SM_LOCK;
+
+                ACQUIRE_LOCK(&upd_rem_set_lock);
                 queue->top = (MarkQueueBlock *) upd_rem_set_block_list->start;
                 upd_rem_set_block_list = NULL;
                 RELEASE_LOCK(&upd_rem_set_lock);
-
-                ACQUIRE_SM_LOCK;
-                freeGroup(old);
-                RELEASE_SM_LOCK;
             } else {
-                // Nothing more to do
+                // We are out of work...
                 debugTrace(DEBUG_nonmoving_gc, "Finished mark pass: %d", count);
                 traceConcMarkEnd(count);
                 return;
@@ -1812,6 +1853,112 @@ nonmovingMark (MarkQueue *queue)
     }
 }
 
+/*
+ * This is the mark loop run by the leader thread (mark_thread_n == 0). It
+ * kicks the worker threads, starts marking itself, and waits until everyone
+ * finishes.
+ */
+void
+nonmovingMarkLeader ()
+{
+    broadcastCondition(&mark_state.new_work_cond);
+    nonmovingMarkLoop(mark_state.queues[0]);
+
+    ACQUIRE_LOCK(&mark_state.lock);
+    while (mark_state.active_mark_threads > 0) {
+        waitCondition(&mark_state.phase_done_cond, &mark_state.lock);
+    }
+    RELEASE_LOCK(&mark_state.lock);
+}
+
+/*
+ * This is the loop run by the worker threads.
+ */
+static void *
+nonmoving_mark_worker (void *user)
+{
+    MarkQueue *queue = (MarkQueue *) user;
+    const uint32_t mark_thread_n = queue->mark_thread_n;
+
+    ACQUIRE_LOCK(&mark_state.lock);
+    while (mark_thread_n > mark_state.n_mark_threads - 1) {
+        RELEASE_LOCK(&mark_state.lock);
+        nonmovingMarkLoop(queue);
+        ACQUIRE_LOCK(&mark_state.lock);
+
+        mark_state.active_mark_threads --;
+        if (mark_state.active_mark_threads == 0) {
+            signalCondition(&mark_state.phase_done_cond);
+        }
+        if (mark_thread_n != 0) {
+            waitCondition(&mark_state.new_work_cond, &mark_state.lock);
+        }
+    }
+    return NULL;
+}
+
+void
+nonmovingInitMarkState()
+{
+    initMutex(&mark_state.lock);
+    initCondition(&mark_state.phase_done_cond);
+    initCondition(&mark_state.new_work_cond);
+    mark_state.n_mark_threads = 0;
+    mark_state.active_mark_threads = 0;
+}
+
+void
+startMarkThreads(int n_mark_threads)
+{
+    ACQUIRE_LOCK(&mark_state.lock);
+    ASSERT(mark_state.n_mark_threads == 0);
+    ASSERT(n_mark_threads >= 1);
+
+    MarkQueue **old_queues = mark_state.queues;
+    mark_state.queues = stgMallocBytes(sizeof(MarkQueue*) * n_mark_threads, "startMarkThreads");
+    if (old_queues != NULL) {
+        memcpy(mark_state.queues, old_queues, sizeof(MarkQueue*) * mark_state.n_mark_threads);
+        stgFree(old_queues);
+    }
+
+    for (int i = mark_state.n_mark_threads; i < n_mark_threads; i++) {
+        MarkQueue *q = stgMallocBytes(sizeof(MarkQueue*), "startMarkThreads");
+        mark_state.queues[i] = q;
+        init_mark_queue(q);
+
+        q->mark_thread_n = i;
+        mark_state.active_mark_threads ++;
+        mark_state.n_mark_threads ++;
+
+        // N.B. mark thread 0 runs in the context of the main mark thread.
+        if (i > 0) {
+            int res = createOSThread(&q->thread_id, "concurrent mark thread", nonmoving_mark_worker, q);
+            if (res != 0) {
+                barf("startMarkThreads");
+            }
+        }
+    }
+    RELEASE_LOCK(&mark_state.lock);
+}
+
+void
+stopMarkThreads()
+{
+    ACQUIRE_LOCK(&mark_state.lock);
+    // ensure that there are no active threads since we will be freeing
+    // the MarkQueues shortly.
+    ASSERT(mark_state.active_mark_threads == 0);
+    mark_state.n_mark_threads = 0;
+    broadcastCondition(&mark_state.new_work_cond);
+
+    for (uint32_t i = 0; i < mark_state.n_mark_threads; i++) {
+        freeMarkQueue(mark_state.queues[i]);
+    }
+    stgFree(mark_state.queues);
+    mark_state.queues = NULL;
+    RELEASE_LOCK(&mark_state.lock);
+}
+
 // A variant of `isAlive` that works for non-moving heap. Used for:
 //
 // - Collecting weak pointers; checking key of a weak pointer.


=====================================
rts/sm/NonMovingMark.h
=====================================
@@ -10,6 +10,7 @@
 
 #include "Task.h"
 #include "NonMoving.h"
+#include "WSDeque.h"
 
 #include "BeginPrivate.h"
 
@@ -101,6 +102,9 @@ INLINE_HEADER bdescr *markQueueBlockBdescr(MarkQueueBlock *b)
 // How far ahead in mark queue to prefetch?
 #define MARK_PREFETCH_QUEUE_DEPTH 5
 
+// How many blocks to keep on the deque?
+#define MARK_QUEUE_DEQUE_SIZE 32
+
 /* The mark queue is not capable of concurrent read or write.
  *
  * invariants:
@@ -115,6 +119,12 @@ typedef struct MarkQueue_ {
     //     Bdescr(q->top)->link->start
     MarkQueueBlock *top;
 
+    // A WSDeque of MarkQueueBlock*s which mark threads can steal from. When
+    // the deque overflows we link blocks onto Bdescr(top)->link.
+    WSDeque *rest;
+
+    int mark_thread_n;
+    OSThreadId thread_id;
 
 #if MARK_PREFETCH_QUEUE_DEPTH > 0
     // A ring-buffer of entries which we will mark next
@@ -124,6 +134,25 @@ typedef struct MarkQueue_ {
 #endif
 } MarkQueue;
 
+struct MarkState {
+    // protects active_mark_threads and n_mark_threads.
+    Mutex lock;
+    // signalled when all marking threads have finished a round of marking.
+    Condition phase_done_cond;
+    // signalled to wake up marking threads for a new round of marking
+    // (or terminate if .n_mark_threads > thread.mark_thread_n).
+    Condition new_work_cond;
+    // how many threads are currently marking?
+    uint32_t active_mark_threads;
+    // how many threads have been created?
+    // this is currently static throughout marking.
+    uint32_t n_mark_threads;
+    // an array of MarkQueue*s, one per mark thread
+    MarkQueue **queues;
+};
+
+extern struct MarkState mark_state;
+
 /* The update remembered set.
  *
  * invariants:
@@ -173,9 +202,10 @@ void nonmovingFinishFlush(Task *task);
 
 void markQueueAddRoot(MarkQueue* q, StgClosure** root);
 
-void initMarkQueue(MarkQueue *queue);
-void freeMarkQueue(MarkQueue *queue);
-void nonmovingMark(struct MarkQueue_ *restrict queue);
+void startMarkThreads(int n_mark_threads);
+void stopMarkThreads(void);
+void nonmovingInitMarkState(void);
+void nonmovingMarkLeader(void);
 
 bool nonmovingTidyWeaks(struct MarkQueue_ *queue);
 void nonmovingTidyThreads(void);



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/cfd77aba805f8bc8e81a39a4f3b67ea8b2c23ea4

-- 
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/cfd77aba805f8bc8e81a39a4f3b67ea8b2c23ea4
You're receiving this email because of your account on gitlab.haskell.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20200810/656f55b7/attachment-0001.html>


More information about the ghc-commits mailing list