[Git][ghc/ghc][wip/gc/parallel-marking] nonmoving: Parallel marking?
Ben Gamari
gitlab at gitlab.haskell.org
Tue Aug 11 02:25:29 UTC 2020
Ben Gamari pushed to branch wip/gc/parallel-marking at Glasgow Haskell Compiler / GHC
Commits:
cfd77aba by Ben Gamari at 2020-08-10T22:25:22-04:00
nonmoving: Parallel marking?
- - - - -
3 changed files:
- rts/sm/NonMoving.c
- rts/sm/NonMovingMark.c
- rts/sm/NonMovingMark.h
Changes:
=====================================
rts/sm/NonMoving.c
=====================================
@@ -714,6 +714,7 @@ void nonmovingInit(void)
nonmovingHeap.allocators[i] = alloc_nonmoving_allocator(n_capabilities);
}
nonmovingMarkInitUpdRemSet();
+ nonmovingInitMarkState();
}
// Stop any nonmoving collection in preparation for RTS shutdown.
@@ -915,9 +916,9 @@ void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads)
ASSERT(nonmoving_marked_compact_objects == NULL);
ASSERT(n_nonmoving_marked_compact_blocks == 0);
- MarkQueue *mark_queue = stgMallocBytes(sizeof(MarkQueue), "mark queue");
- initMarkQueue(mark_queue);
- current_mark_queue = mark_queue;
+ // First initialize a MarkQueue for the leader thread:
+ startMarkThreads(1);
+ MarkQueue *mark_queue = mark_state.queues[0];
// Mark roots
trace(TRACE_nonmoving_gc, "Marking roots for nonmoving GC");
@@ -981,6 +982,8 @@ void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads)
nonmovingConcurrentMark, mark_queue) != 0) {
barf("nonmovingCollect: failed to spawn mark thread: %s", strerror(errno));
}
+ // Start the mark worker threads...
+ startMarkThreads(3);
} else {
nonmovingConcurrentMark(mark_queue);
}
@@ -998,7 +1001,7 @@ static void nonmovingMarkThreadsWeaks(MarkQueue *mark_queue)
{
while (true) {
// Propagate marks
- nonmovingMark(mark_queue);
+ nonmovingMarkLeader();
// Tidy threads and weaks
nonmovingTidyThreads();
@@ -1102,7 +1105,7 @@ static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO *
// Do last marking of weak pointers
while (true) {
// Propagate marks
- nonmovingMark(mark_queue);
+ nonmovingMarkLeader();
if (!nonmovingTidyWeaks(mark_queue))
break;
@@ -1176,9 +1179,8 @@ static void nonmovingMark_(MarkQueue *mark_queue, StgWeak **dead_weaks, StgTSO *
nonmovingFinishFlush(task);
#endif
- current_mark_queue = NULL;
- freeMarkQueue(mark_queue);
- stgFree(mark_queue);
+ // tear down the mark threads' state
+ stopMarkThreads();
oldest_gen->live_estimate = nonmoving_live_words;
oldest_gen->n_old_blocks = 0;
=====================================
rts/sm/NonMovingMark.c
=====================================
@@ -24,6 +24,7 @@
#include "Stats.h"
#include "STM.h"
#include "MarkWeak.h"
+#include "RtsUtils.h"
#include "sm/Storage.h"
#include "CNF.h"
@@ -117,6 +118,8 @@ StgWeak *nonmoving_weak_ptr_list = NULL;
StgIndStatic *debug_caf_list_snapshot = (StgIndStatic*)END_OF_CAF_LIST;
#endif
+struct MarkState mark_state;
+
/* Note [Update remembered set]
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
* The concurrent non-moving collector uses a remembered set to ensure
@@ -365,7 +368,6 @@ upd_rem_set_push_gc(UpdRemSet *rs, MarkQueueEnt *ent)
markQueueBlockPush(rs->block, ent);
}
-
STATIC_INLINE void
mark_queue_push (MarkQueue *q, const MarkQueueEnt *ent)
{
@@ -375,7 +377,15 @@ mark_queue_push (MarkQueue *q, const MarkQueueEnt *ent)
// Allocate a fresh block.
ACQUIRE_SM_LOCK;
bdescr *bd = allocGroup(MARK_QUEUE_BLOCKS);
- bd->link = markQueueBlockBdescr(q->top);
+
+ // First try pushing onto local deque...
+ if (q->rest != NULL && pushWSDeque(q->rest, q->top)) {
+ bd->link = NULL;
+ } else {
+ // The deque overflowed; chain the old block onto the new block's link instead.
+ bd->link = markQueueBlockBdescr(q->top);
+ }
+
q->top = (MarkQueueBlock *) bd->start;
q->top->head = 0;
RELEASE_SM_LOCK;
@@ -386,7 +396,7 @@ mark_queue_push (MarkQueue *q, const MarkQueueEnt *ent)
STATIC_INLINE void push(struct MarkContext *mc, MarkQueueEnt *ent) {
switch (mc->kind) {
- case MARK_CTX_IN_CONC_MARK:
+ case MARK_CTX_IN_CONC_MARK:
mark_queue_push(mc->in_conc_mark.queue, ent);
break;
case MARK_CTX_IN_MOVING_GC:
@@ -410,7 +420,7 @@ void nonmovingMarkInitUpdRemSet() {
#if defined(THREADED_RTS) && defined(DEBUG)
static uint32_t markQueueLength(MarkQueue *q);
#endif
-static void init_mark_queue_(MarkQueue *queue);
+static void init_mark_queue(MarkQueue *queue);
/* Transfers the given capability's update-remembered set to the global
* remembered set. Must hold SM lock/allocation spinlock.
@@ -875,6 +885,17 @@ void markQueueAddRoot (MarkQueue* q, StgClosure** root)
* Popping from the mark queue
*********************************************************/
+static MarkQueueBlock *try_stealing(void)
+{
+ for (uint32_t i = 0; i < mark_state.n_mark_threads; i++) {
+ MarkQueueBlock *b = (MarkQueueBlock *) stealWSDeque(mark_state.queues[i]->rest);
+ if (b != NULL) {
+ return b;
+ }
+ }
+ return NULL;
+}
+
// Returns invalid MarkQueueEnt if queue is empty.
static MarkQueueEnt markQueuePop_ (MarkQueue *q)
{
@@ -887,19 +908,41 @@ again:
if (top->head == 0) {
bdescr *old_block = markQueueBlockBdescr(q->top);
// Is this the only block in the queue?
- if (old_block->link == NULL) {
- // Yes, therefore queue is empty...
- MarkQueueEnt none = { .null_entry = { .p = NULL } };
- return none;
- } else {
+ if (old_block->link != NULL) {
// No, unwind to the previous block and try popping again...
bdescr *new_block = old_block->link;
q->top = (MarkQueueBlock*) new_block->start;
ACQUIRE_SM_LOCK;
- freeGroup(old_block); // TODO: hold on to a block to avoid repeated allocation/deallocation?
+ freeGroup(old_block);
+ // TODO: hold on to a block to avoid repeated allocation/deallocation?
RELEASE_SM_LOCK;
goto again;
}
+
+ // Yes, this is the only block on the chain, so the chain has run dry.
+ // Next, try pulling a block from our own deque...
+ MarkQueueBlock *new = (MarkQueueBlock *) popWSDeque(q->rest);
+ if (new != NULL) {
+ q->top = new;
+ ACQUIRE_SM_LOCK;
+ freeGroup(old_block);
+ RELEASE_SM_LOCK;
+ goto again;
+ }
+
+ // Our deque is also empty...
+ // Now try stealing from other threads' deques...
+ new = try_stealing();
+ if (new != NULL) {
+ q->top = new;
+ ACQUIRE_SM_LOCK;
+ freeGroup(old_block);
+ RELEASE_SM_LOCK;
+ goto again;
+ }
+
+ const MarkQueueEnt none = { .null_entry = { .p = NULL } };
+ return none;
}
top->head--;
@@ -961,23 +1004,20 @@ void init_upd_rem_set (UpdRemSet *queue)
}
/* Must hold sm_mutex. */
-static void init_mark_queue_ (MarkQueue *queue)
+static void init_mark_queue (MarkQueue *queue)
{
bdescr *bd = allocGroup(MARK_QUEUE_BLOCKS);
queue->top = (MarkQueueBlock *) bd->start;
queue->top->head = 0;
+ queue->rest = newWSDeque(MARK_QUEUE_DEQUE_SIZE);
+ queue->mark_thread_n = -1;
+ queue->thread_id = -1;
#if MARK_PREFETCH_QUEUE_DEPTH > 0
memset(&queue->prefetch_queue, 0, sizeof(queue->prefetch_queue));
queue->prefetch_head = 0;
#endif
}
-/* Must hold sm_mutex. */
-void initMarkQueue (MarkQueue *queue)
-{
- init_mark_queue_(queue);
-}
-
void reset_upd_rem_set (UpdRemSet *rset)
{
// UpdRemSets always have one block for the mark queue. This assertion is to
@@ -986,7 +1026,8 @@ void reset_upd_rem_set (UpdRemSet *rset)
rset->block->head = 0;
}
-void freeMarkQueue (MarkQueue *queue)
+static void
+freeMarkQueue (MarkQueue *queue)
{
freeChain_lock(markQueueBlockBdescr(queue->top));
}
@@ -1759,8 +1800,8 @@ done:
* c. the mark queue has been seeded with a set of roots.
*
*/
-GNUC_ATTR_HOT void
-nonmovingMark (MarkQueue *queue)
+static GNUC_ATTR_HOT void
+nonmovingMarkLoop (MarkQueue *queue)
{
struct MarkContext mctx = markContextInConcMark(queue);
traceConcMarkBegin();
@@ -1793,17 +1834,17 @@ nonmovingMark (MarkQueue *queue)
case NULL_ENTRY:
// Perhaps the update remembered set has more to mark...
if (upd_rem_set_block_list) {
- ACQUIRE_LOCK(&upd_rem_set_lock);
+ ACQUIRE_SM_LOCK;
bdescr *old = markQueueBlockBdescr(queue->top);
+ freeGroup(old);
+ RELEASE_SM_LOCK;
+
+ ACQUIRE_LOCK(&upd_rem_set_lock);
queue->top = (MarkQueueBlock *) upd_rem_set_block_list->start;
upd_rem_set_block_list = NULL;
RELEASE_LOCK(&upd_rem_set_lock);
-
- ACQUIRE_SM_LOCK;
- freeGroup(old);
- RELEASE_SM_LOCK;
} else {
- // Nothing more to do
+ // We are out of work...
debugTrace(DEBUG_nonmoving_gc, "Finished mark pass: %d", count);
traceConcMarkEnd(count);
return;
@@ -1812,6 +1853,112 @@ nonmovingMark (MarkQueue *queue)
}
}
+/*
+ * This is the mark loop run by the leader thread (mark_thread_n == 0). It
+ * kicks the worker threads, starts marking itself, and waits until everyone
+ * finishes.
+ */
+void
+nonmovingMarkLeader ()
+{
+ broadcastCondition(&mark_state.new_work_cond);
+ nonmovingMarkLoop(mark_state.queues[0]);
+
+ ACQUIRE_LOCK(&mark_state.lock);
+ while (mark_state.active_mark_threads > 0) {
+ waitCondition(&mark_state.phase_done_cond, &mark_state.lock);
+ }
+ RELEASE_LOCK(&mark_state.lock);
+}
+
+/*
+ * This is the loop run by the worker threads.
+ */
+static void *
+nonmoving_mark_worker (void *user)
+{
+ MarkQueue *queue = (MarkQueue *) user;
+ const uint32_t mark_thread_n = queue->mark_thread_n;
+
+ ACQUIRE_LOCK(&mark_state.lock);
+ while (mark_thread_n < mark_state.n_mark_threads) {
+ RELEASE_LOCK(&mark_state.lock);
+ nonmovingMarkLoop(queue);
+ ACQUIRE_LOCK(&mark_state.lock);
+
+ mark_state.active_mark_threads --;
+ if (mark_state.active_mark_threads == 0) {
+ signalCondition(&mark_state.phase_done_cond);
+ }
+ if (mark_thread_n != 0) {
+ waitCondition(&mark_state.new_work_cond, &mark_state.lock);
+ }
+ }
+ RELEASE_LOCK(&mark_state.lock);
+ return NULL;
+}
+
+void
+nonmovingInitMarkState()
+{
+ initMutex(&mark_state.lock);
+ initCondition(&mark_state.phase_done_cond);
+ initCondition(&mark_state.new_work_cond);
+ mark_state.n_mark_threads = 0;
+ mark_state.active_mark_threads = 0;
+}
+
+void
+startMarkThreads(int n_mark_threads)
+{
+ ACQUIRE_LOCK(&mark_state.lock);
+ ASSERT((int) mark_state.n_mark_threads <= n_mark_threads);
+ ASSERT(n_mark_threads >= 1);
+
+ MarkQueue **old_queues = mark_state.queues;
+ mark_state.queues = stgMallocBytes(sizeof(MarkQueue*) * n_mark_threads, "startMarkThreads");
+ if (old_queues != NULL) {
+ memcpy(mark_state.queues, old_queues, sizeof(MarkQueue*) * mark_state.n_mark_threads);
+ stgFree(old_queues);
+ }
+
+ for (int i = mark_state.n_mark_threads; i < n_mark_threads; i++) {
+ MarkQueue *q = stgMallocBytes(sizeof(MarkQueue), "startMarkThreads");
+ mark_state.queues[i] = q;
+ init_mark_queue(q);
+
+ q->mark_thread_n = i;
+ mark_state.active_mark_threads ++;
+ mark_state.n_mark_threads ++;
+
+ // N.B. mark thread 0 runs in the context of the main mark thread.
+ if (i > 0) {
+ int res = createOSThread(&q->thread_id, "concurrent mark thread", nonmoving_mark_worker, q);
+ if (res != 0) {
+ barf("startMarkThreads");
+ }
+ }
+ }
+ RELEASE_LOCK(&mark_state.lock);
+}
+
+void
+stopMarkThreads()
+{
+ ACQUIRE_LOCK(&mark_state.lock);
+ // ensure that there are no active threads since we will be freeing
+ // the MarkQueues shortly.
+ ASSERT(mark_state.active_mark_threads == 0);
+
+ // Free the mark queues first, while n_mark_threads is still valid...
+ for (uint32_t i = 0; i < mark_state.n_mark_threads; i++) {
+ freeMarkQueue(mark_state.queues[i]);
+ stgFree(mark_state.queues[i]);
+ }
+
+ // ...then drop the thread count and wake the workers so that they
+ // notice the shutdown and exit.
+ mark_state.n_mark_threads = 0;
+ broadcastCondition(&mark_state.new_work_cond);
+ stgFree(mark_state.queues);
+ mark_state.queues = NULL;
+ RELEASE_LOCK(&mark_state.lock);
+}
+
// A variant of `isAlive` that works for non-moving heap. Used for:
//
// - Collecting weak pointers; checking key of a weak pointer.
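Pieced together, the push and pop changes above give each mark thread three places to look for work once its current block is exhausted. The sketch below is an editorial condensation of markQueuePop_, not code from the patch: the SM-lock bookkeeping and the freeing of the exhausted block are elided, and next_block_sketch is a hypothetical helper name. It only uses the WSDeque operations and mark_state fields introduced in this commit.

    // Where markQueuePop_ looks for a fresh block, in order of preference:
    static MarkQueueBlock *next_block_sketch(MarkQueue *q)
    {
        bdescr *old = markQueueBlockBdescr(q->top);

        // 1. Blocks that overflowed the deque sit on the private bd->link chain.
        if (old->link != NULL)
            return (MarkQueueBlock *) old->link->start;

        // 2. Otherwise take a block back from our own work-stealing deque.
        MarkQueueBlock *b = (MarkQueueBlock *) popWSDeque(q->rest);
        if (b != NULL)
            return b;

        // 3. Failing that, try to steal a block from another mark thread's deque.
        for (uint32_t i = 0; i < mark_state.n_mark_threads; i++) {
            b = (MarkQueueBlock *) stealWSDeque(mark_state.queues[i]->rest);
            if (b != NULL)
                return b;
        }

        // 4. No work anywhere: report the queue as empty for now.
        return NULL;
    }

The producer side is mark_queue_push: a full top block is first offered to the owner's deque, where idle threads can steal it, and only spills onto the private bd->link chain when pushWSDeque reports the deque as full.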
=====================================
rts/sm/NonMovingMark.h
=====================================
@@ -10,6 +10,7 @@
#include "Task.h"
#include "NonMoving.h"
+#include "WSDeque.h"
#include "BeginPrivate.h"
@@ -101,6 +102,9 @@ INLINE_HEADER bdescr *markQueueBlockBdescr(MarkQueueBlock *b)
// How far ahead in mark queue to prefetch?
#define MARK_PREFETCH_QUEUE_DEPTH 5
+// How many blocks to keep on the deque?
+#define MARK_QUEUE_DEQUE_SIZE 32
+
/* The mark queue is not capable of concurrent read or write.
*
* invariants:
@@ -115,6 +119,12 @@ typedef struct MarkQueue_ {
// Bdescr(q->top)->link->start
MarkQueueBlock *top;
+ // A WSDeque of MarkQueueBlock*s which mark threads can steal from. When
+ // the deque overflows we link blocks onto Bdescr(top)->link.
+ WSDeque *rest;
+
+ // Index of the mark thread owning this queue (0 is the leader).
+ int mark_thread_n;
+ // OS thread running this queue's worker (unused for the leader's queue).
+ OSThreadId thread_id;
#if MARK_PREFETCH_QUEUE_DEPTH > 0
// A ring-buffer of entries which we will mark next
@@ -124,6 +134,25 @@ typedef struct MarkQueue_ {
#endif
} MarkQueue;
+struct MarkState {
+ // protects active_mark_threads and n_mark_threads.
+ Mutex lock;
+ // signalled when all marking threads have finished a round of marking.
+ Condition phase_done_cond;
+ // signalled to wake up marking threads for a new round of marking
+ // (or terminate, once thread.mark_thread_n >= .n_mark_threads).
+ Condition new_work_cond;
+ // how many threads are currently marking?
+ uint32_t active_mark_threads;
+ // how many threads have been created?
+ // this is currently static throughout marking.
+ uint32_t n_mark_threads;
+ // an array of MarkQueue*s, one per mark thread
+ MarkQueue **queues;
+};
+
+extern struct MarkState mark_state;
+
/* The update remembered set.
*
* invariants:
@@ -173,9 +202,10 @@ void nonmovingFinishFlush(Task *task);
void markQueueAddRoot(MarkQueue* q, StgClosure** root);
-void initMarkQueue(MarkQueue *queue);
-void freeMarkQueue(MarkQueue *queue);
-void nonmovingMark(struct MarkQueue_ *restrict queue);
+void startMarkThreads(int n_mark_threads);
+void stopMarkThreads(void);
+void nonmovingInitMarkState(void);
+void nonmovingMarkLeader(void);
bool nonmovingTidyWeaks(struct MarkQueue_ *queue);
void nonmovingTidyThreads(void);
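Taken together with the NonMoving.c changes, the new entry points follow a fairly simple lifecycle. The sketch below is an editorial summary of the call sequence as it appears in this patch, not code from the commit: mark_state_lifecycle_sketch is a hypothetical name, error handling and tracing are elided, and the total of three mark threads is simply what the patch hard-codes for now.

    static void mark_state_lifecycle_sketch(void)
    {
        // RTS start-up (nonmovingInit): set up the mutex and condition variables.
        nonmovingInitMarkState();

        // Start of a major GC (nonmovingCollect): create the leader's queue
        // (mark_state.queues[0]) and seed it with roots...
        startMarkThreads(1);
        // ...then, on the concurrent path, grow to the hard-coded total of
        // three mark threads after spawning the concurrent mark thread.
        startMarkThreads(3);

        // Each marking round (nonmovingMarkThreadsWeaks and the final weak
        // pass): the leader wakes the workers, drains its own queue, and
        // waits until active_mark_threads drops back to zero.
        nonmovingMarkLeader();

        // End of the cycle (nonmovingMark_): free every queue and let the
        // worker threads terminate.
        stopMarkThreads();
    }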
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/cfd77aba805f8bc8e81a39a4f3b67ea8b2c23ea4