[Git][ghc/ghc][wip/tsan/fix-races] 8 commits: rts: Fix synchronization on thread blocking state
Ben Gamari (@bgamari)
gitlab at gitlab.haskell.org
Sat Dec 16 13:36:20 UTC 2023
Ben Gamari pushed to branch wip/tsan/fix-races at Glasgow Haskell Compiler / GHC
Commits:
3a675865 by Ben Gamari at 2023-12-16T08:36:00-05:00
rts: Fix synchronization on thread blocking state
We now use a release barrier whenever we update a thread's blocking
state. This required widening StgTSO.why_blocked as AArch64 does not
support atomic writes on 16-bit values.
- - - - -
bd07c396 by Ben Gamari at 2023-12-16T08:36:00-05:00
rts: Use relaxed ordering on dirty/clean info tables updates
When changing the dirty/clean state of a mutable object we needn't have
any particular ordering.
- - - - -
f46a5ffd by Ben Gamari at 2023-12-16T08:36:00-05:00
codeGen: Use relaxed-read in closureInfoPtr
- - - - -
46e7bcc7 by Ben Gamari at 2023-12-16T08:36:00-05:00
STM: Use acquire loads when possible
Full sequential consistency is not needed here.
- - - - -
92d1a8af by Ben Gamari at 2023-12-16T08:36:00-05:00
rts/Messages: Fix data race
- - - - -
a2ffeafa by Ben Gamari at 2023-12-16T08:36:00-05:00
rts/Prof: Fix data race
- - - - -
299ac1db by Ben Gamari at 2023-12-16T08:36:00-05:00
rts: Use fence rather than redundant load
Previously we would use an atomic load to ensure acquire ordering.
However, we now have `ACQUIRE_FENCE_ON`, which allows us to express this
more directly.
- - - - -
d6e45c01 by Ben Gamari at 2023-12-16T08:36:00-05:00
rts: Fix data races in profiling timer
- - - - -
22 changed files:
- compiler/GHC/Cmm/Info.hs
- rts/Exception.cmm
- rts/Messages.c
- rts/PrimOps.cmm
- rts/Proftimer.c
- rts/RaiseAsync.c
- rts/STM.c
- rts/Schedule.c
- rts/StgMiscClosures.cmm
- rts/Threads.c
- rts/TraverseHeap.c
- rts/include/rts/storage/ClosureMacros.h
- rts/include/rts/storage/TSO.h
- rts/include/stg/SMP.h
- rts/posix/Select.c
- rts/sm/Compact.c
- rts/sm/GC.c
- rts/sm/GCAux.c
- rts/sm/NonMovingMark.c
- rts/sm/Scav.c
- rts/sm/Storage.c
- rts/win32/AsyncMIO.c
Changes:
=====================================
compiler/GHC/Cmm/Info.hs
=====================================
@@ -449,7 +449,7 @@ wordAligned platform align_check e
-- | Takes a closure pointer and returns the info table pointer
closureInfoPtr :: Platform -> DoAlignSanitisation -> CmmExpr -> CmmExpr
closureInfoPtr platform align_check e =
- cmmLoadBWord platform (wordAligned platform align_check e)
+ CmmMachOp (MO_RelaxedRead (wordWidth platform)) [wordAligned platform align_check e]
-- | Takes an info pointer (the first word of a closure) and returns its entry
-- code
=====================================
rts/Exception.cmm
=====================================
@@ -351,9 +351,9 @@ stg_killThreadzh (P_ target, P_ exception)
if (msg == NULL) {
return ();
} else {
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
updateRemembSetPushPtr(StgTSO_block_info(CurrentTSO));
StgTSO_block_info(CurrentTSO) = msg;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
// we must block, and unlock the message before returning
jump stg_block_throwto (target, exception);
}
=====================================
rts/Messages.c
=====================================
@@ -205,7 +205,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
StgTSO *owner = (StgTSO*)p;
#if defined(THREADED_RTS)
- if (owner->cap != cap) {
+ if (RELAXED_LOAD(&owner->cap) != cap) {
sendMessage(cap, owner->cap, (Message*)msg);
debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
owner->cap->no);
@@ -275,7 +275,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
ASSERT(owner != END_TSO_QUEUE);
#if defined(THREADED_RTS)
- if (owner->cap != cap) {
+ if (RELAXED_LOAD(&owner->cap) != cap) {
sendMessage(cap, owner->cap, (Message*)msg);
debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
owner->cap->no);
=====================================
rts/PrimOps.cmm
=====================================
@@ -827,7 +827,9 @@ stg_atomicModifyMutVarzuzh ( gcptr mv, gcptr f )
StgMutVar_var(mv) = z;
#endif
- if (GET_INFO(mv) == stg_MUT_VAR_CLEAN_info) {
+ W_ info;
+ info = %relaxed GET_INFO(mv);
+ if (info == stg_MUT_VAR_CLEAN_info) {
ccall dirty_MUT_VAR(BaseReg "ptr", mv "ptr", x "ptr");
}
@@ -1715,21 +1717,17 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
- // Write barrier before we make the new MVAR_TSO_QUEUE
- // visible to other cores.
- // See Note [Heap memory barriers]
- RELEASE_FENCE;
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_head(mvar) = q;
+ %release StgMVar_head(mvar) = q;
} else {
- StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+ %release StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
ccall recordClosureMutated(MyCapability() "ptr",
StgMVar_tail(mvar));
}
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I32;
StgMVar_tail(mvar) = q;
jump stg_block_takemvar(mvar);
@@ -1770,7 +1768,7 @@ loop:
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
- ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I32);
ASSERT(StgTSO_block_info(tso) == mvar);
// actually perform the putMVar for the thread that we just woke up
@@ -1838,7 +1836,7 @@ loop:
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
- ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+ ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I32);
ASSERT(StgTSO_block_info(tso) == mvar);
// actually perform the putMVar for the thread that we just woke up
@@ -1884,19 +1882,17 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
- // See Note [Heap memory barriers]
- RELEASE_FENCE;
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
- StgMVar_head(mvar) = q;
+ %release StgMVar_head(mvar) = q;
} else {
- StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+ %release StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
ccall recordClosureMutated(MyCapability() "ptr",
StgMVar_tail(mvar));
}
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I32;
StgMVar_tail(mvar) = q;
jump stg_block_putmvar(mvar,val);
@@ -2033,11 +2029,11 @@ loop:
}
}
- ASSERT(StgTSO_block_info(tso) == mvar);
// save why_blocked here, because waking up the thread destroys
// this information
W_ why_blocked;
- why_blocked = TO_W_(StgTSO_why_blocked(tso));
+ why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing barrier
+ ASSERT(StgTSO_block_info(tso) == mvar);
// actually perform the takeMVar
W_ stack;
@@ -2093,13 +2089,11 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
- // See Note [Heap memory barriers]
- RELEASE_FENCE;
+ %release StgMVar_head(mvar) = q;
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = mvar;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
- StgMVar_head(mvar) = q;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_tail(mvar) = q;
@@ -2226,17 +2220,16 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
StgMVarTSOQueue_tso(q) = CurrentTSO;
SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
- // See Note [Heap memory barriers]
- RELEASE_FENCE;
- StgMVar_head(ioport) = q;
+ %release StgMVar_head(ioport) = q;
StgTSO__link(CurrentTSO) = q;
StgTSO_block_info(CurrentTSO) = ioport;
- StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+
+ // See Note [Heap memory barriers]
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I32;
//Unlocks the closure as well
jump stg_block_readmvar(ioport);
-
}
//This way we can check of there has been a read already.
@@ -2314,11 +2307,11 @@ loop:
// next element in the waiting list here, as there can only ever
// be one thread blocked on a port.
- ASSERT(StgTSO_block_info(tso) == ioport);
// save why_blocked here, because waking up the thread destroys
// this information
W_ why_blocked;
- why_blocked = TO_W_(StgTSO_why_blocked(tso));
+ why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO Missing acquire
+ ASSERT(StgTSO_block_info(tso) == ioport);
// actually perform the takeMVar
W_ stack;
@@ -2559,9 +2552,9 @@ stg_waitReadzh ( W_ fd )
ccall barf("waitRead# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
StgTSO_block_info(CurrentTSO) = fd;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I32;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2575,9 +2568,9 @@ stg_waitWritezh ( W_ fd )
ccall barf("waitWrite# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
StgTSO_block_info(CurrentTSO) = fd;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I32;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2598,8 +2591,7 @@ stg_delayzh ( W_ us_delay )
ccall barf("delay# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
#if defined(mingw32_HOST_OS)
@@ -2616,12 +2608,13 @@ stg_delayzh ( W_ us_delay )
* simplifies matters, so change the status to OnDoProc put the
* delayed thread on the blocked_queue.
*/
- StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I32;
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async_void();
#else
+ %relaxed StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I32;
(target) = ccall getDelayTarget(us_delay);
StgTSO_block_info(CurrentTSO) = target;
@@ -2643,9 +2636,6 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
ccall barf("asyncRead# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
-
/* could probably allocate this on the heap instead */
("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
"stg_asyncReadzh");
@@ -2654,6 +2644,10 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
+
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I32;
+
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
@@ -2668,9 +2662,6 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
ccall barf("asyncWrite# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
-
("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
"stg_asyncWritezh");
(reqID) = ccall addIORequest(fd, 1/*TRUE*/,is_sock,len,buf "ptr");
@@ -2679,6 +2670,10 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
+
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I32;
+
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
@@ -2693,9 +2688,6 @@ stg_asyncDoProczh ( W_ proc, W_ param )
ccall barf("asyncDoProc# on threaded RTS") never returns;
#else
- ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
- StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
-
/* could probably allocate this on the heap instead */
("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
"stg_asyncDoProczh");
@@ -2704,6 +2696,10 @@ stg_asyncDoProczh ( W_ proc, W_ param )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
+
+ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I32);
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I32;
+
ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
=====================================
rts/Proftimer.c
=====================================
@@ -101,7 +101,7 @@ requestHeapCensus( void ){
void
initProfTimer( void )
{
- performHeapProfile = false;
+ RELAXED_STORE_ALWAYS(&performHeapProfile, false);
ticks_to_heap_profile = RtsFlags.ProfFlags.heapProfileIntervalTicks;
@@ -124,7 +124,8 @@ handleProfTick(void)
uint32_t n;
for (n=0; n < getNumCapabilities(); n++) {
Capability *cap = getCapability(n);
- cap->r.rCCCS->time_ticks++;
+ CostCentreStack *ccs = RELAXED_LOAD(&cap->r.rCCCS);
+ ccs->time_ticks++;
traceProfSampleCostCentre(cap, cap->r.rCCCS, total_ticks);
}
}
@@ -135,7 +136,7 @@ handleProfTick(void)
ticks_to_ticky_sample--;
if (ticks_to_ticky_sample <= 0) {
ticks_to_ticky_sample = RtsFlags.ProfFlags.heapProfileIntervalTicks;
- performTickySample = true;
+ RELAXED_STORE_ALWAYS(&performTickySample, true);
}
}
#endif
@@ -144,7 +145,7 @@ handleProfTick(void)
ticks_to_heap_profile--;
if (ticks_to_heap_profile <= 0) {
ticks_to_heap_profile = RtsFlags.ProfFlags.heapProfileIntervalTicks;
- performHeapProfile = true;
+ RELAXED_STORE_ALWAYS(&performHeapProfile, true);
}
}
}
=====================================
rts/RaiseAsync.c
=====================================
@@ -266,7 +266,7 @@ check_target:
return THROWTO_BLOCKED;
}
- status = target->why_blocked;
+ status = ACQUIRE_LOAD(&target->why_blocked);
switch (status) {
case NotBlocked:
@@ -728,7 +728,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
}
done:
- tso->why_blocked = NotBlocked;
+ RELAXED_STORE(&tso->why_blocked, NotBlocked);
appendToRunQueue(cap, tso);
}
=====================================
rts/STM.c
=====================================
@@ -187,7 +187,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
StgTVar *s STG_UNUSED) {
StgClosure *result;
TRACE("%p : lock_tvar(%p)", trec, s);
- result = SEQ_CST_LOAD(&s->current_value);
+ result = ACQUIRE_LOAD(&s->current_value);
return result;
}
@@ -198,7 +198,7 @@ static void unlock_tvar(Capability *cap,
StgBool force_update) {
TRACE("%p : unlock_tvar(%p)", trec, s);
if (force_update) {
- StgClosure *old_value = SEQ_CST_LOAD(&s->current_value);
+ StgClosure *old_value = ACQUIRE_LOAD(&s->current_value);
RELEASE_STORE(&s->current_value, c);
dirty_TVAR(cap, s, old_value);
}
@@ -210,7 +210,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
StgClosure *expected) {
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
- result = SEQ_CST_LOAD(&s->current_value);
+ result = ACQUIRE_LOAD(&s->current_value);
TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
return (result == expected);
}
@@ -231,7 +231,7 @@ static void lock_stm(StgTRecHeader *trec) {
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
TRACE("%p : unlock_stm()", trec);
ASSERT(smp_locked == trec);
- SEQ_CST_STORE(&smp_locked, 0);
+ RELEASE_STORE(&smp_locked, 0);
}
static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
@@ -240,7 +240,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
StgClosure *result;
TRACE("%p : lock_tvar(%p)", trec, s);
ASSERT(smp_locked == trec);
- result = SEQ_CST_LOAD(&s->current_value);
+ result = ACQUIRE_LOAD(&s->current_value);
return result;
}
@@ -252,7 +252,7 @@ static void *unlock_tvar(Capability *cap,
TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
ASSERT(smp_locked == trec);
if (force_update) {
- StgClosure *old_value = SEQ_CST_LOAD(&s->current_value);
+ StgClosure *old_value = ACQUIRE_LOAD(&s->current_value);
RELEASE_STORE(&s->current_value, c);
dirty_TVAR(cap, s, old_value);
}
@@ -265,7 +265,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
ASSERT(smp_locked == trec);
- result = SEQ_CST_LOAD(&s->current_value);
+ result = ACQUIRE_LOAD(&s->current_value);
TRACE("%p : %d", result ? "success" : "failure");
return (result == expected);
}
@@ -291,9 +291,11 @@ static StgClosure *lock_tvar(Capability *cap,
StgClosure *result;
TRACE("%p : lock_tvar(%p)", trec, s);
do {
+ const StgInfoTable *info;
do {
- result = SEQ_CST_LOAD(&s->current_value);
- } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
+ result = ACQUIRE_LOAD(&s->current_value);
+ info = GET_INFO(UNTAG_CLOSURE(result));
+ } while (info == &stg_TREC_HEADER_info);
} while (cas((void *) &s->current_value,
(StgWord)result, (StgWord)trec) != (StgWord)result);
@@ -311,7 +313,7 @@ static void unlock_tvar(Capability *cap,
StgClosure *c,
StgBool force_update STG_UNUSED) {
TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
- ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
+ ASSERT(ACQUIRE_LOAD(&s->current_value) == (StgClosure *)trec);
RELEASE_STORE(&s->current_value, c);
dirty_TVAR(cap, s, (StgClosure *) trec);
}
@@ -340,8 +342,8 @@ static StgBool cond_lock_tvar(Capability *cap,
static void park_tso(StgTSO *tso) {
ASSERT(tso -> why_blocked == NotBlocked);
- tso -> why_blocked = BlockedOnSTM;
tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
+ RELEASE_STORE(&tso -> why_blocked, BlockedOnSTM);
TRACE("park_tso on tso=%p", tso);
}
@@ -375,7 +377,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
StgTVarWatchQueue *trail;
TRACE("unpark_waiters_on tvar=%p", s);
// unblock TSOs in reverse order, to be a bit fairer (#2319)
- for (q = SEQ_CST_LOAD(&s->first_watch_queue_entry), trail = q;
+ for (q = ACQUIRE_LOAD(&s->first_watch_queue_entry), trail = q;
q != END_STM_WATCH_QUEUE;
q = q -> next_queue_entry) {
trail = q;
@@ -532,16 +534,16 @@ static void build_watch_queue_entries_for_trec(Capability *cap,
StgTVarWatchQueue *fq;
s = e -> tvar;
TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
- ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
- NACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == e -> expected_value);
- fq = SEQ_CST_LOAD(&s->first_watch_queue_entry);
+ ACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) == (StgClosure *)trec);
+ NACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) == e -> expected_value);
+ fq = ACQUIRE_LOAD(&s->first_watch_queue_entry);
q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
q -> next_queue_entry = fq;
q -> prev_queue_entry = END_STM_WATCH_QUEUE;
if (fq != END_STM_WATCH_QUEUE) {
fq -> prev_queue_entry = q;
}
- SEQ_CST_STORE(&s->first_watch_queue_entry, q);
+ RELEASE_STORE(&s->first_watch_queue_entry, q);
e -> new_value = (StgClosure *) q;
dirty_TVAR(cap, s, (StgClosure *) fq); // we modified first_watch_queue_entry
});
@@ -569,7 +571,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
trec,
q -> closure,
s);
- ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
+ ACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) == (StgClosure *)trec);
nq = q -> next_queue_entry;
pq = q -> prev_queue_entry;
if (nq != END_STM_WATCH_QUEUE) {
@@ -578,8 +580,8 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
if (pq != END_STM_WATCH_QUEUE) {
pq -> next_queue_entry = nq;
} else {
- ASSERT(SEQ_CST_LOAD(&s->first_watch_queue_entry) == q);
- SEQ_CST_STORE(&s->first_watch_queue_entry, nq);
+ ASSERT(ACQUIRE_LOAD(&s->first_watch_queue_entry) == q);
+ RELEASE_STORE(&s->first_watch_queue_entry, nq);
dirty_TVAR(cap, s, (StgClosure *) q); // we modified first_watch_queue_entry
}
free_stg_tvar_watch_queue(cap, q);
@@ -727,7 +729,7 @@ static StgBool entry_is_read_only(TRecEntry *e) {
static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
StgClosure *c;
StgBool result;
- c = SEQ_CST_LOAD(&s->current_value);
+ c = ACQUIRE_LOAD(&s->current_value);
result = (c == (StgClosure *) h);
return result;
}
@@ -803,13 +805,13 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
// The memory ordering here must ensure that we have two distinct
// reads to current_value, with the read from num_updates between
// them.
- if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
+ if (ACQUIRE_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match", trec);
result = false;
BREAK_FOR_EACH;
}
e->num_updates = SEQ_CST_LOAD(&s->num_updates);
- if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
+ if (ACQUIRE_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match (race)", trec);
result = false;
BREAK_FOR_EACH;
@@ -852,7 +854,7 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
// We must first load current_value then num_updates; this is inverse of
// the order of the stores in stmCommitTransaction.
- StgClosure *current_value = SEQ_CST_LOAD(&s->current_value);
+ StgClosure *current_value = ACQUIRE_LOAD(&s->current_value);
StgInt num_updates = SEQ_CST_LOAD(&s->num_updates);
// Note we need both checks and in this order as the TVar could be
@@ -1186,7 +1188,7 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
unlock_tvar(cap, trec, s, e -> expected_value, false);
}
merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
- ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
+ ACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) != (StgClosure *)trec);
});
} else {
revert_ownership(cap, trec, false);
=====================================
rts/Schedule.c
=====================================
@@ -512,7 +512,8 @@ run_thread:
#endif
if (ret == ThreadBlocked) {
- if (t->why_blocked == BlockedOnBlackHole) {
+ uint16_t why_blocked = ACQUIRE_LOAD(&t->why_blocked);
+ if (why_blocked == BlockedOnBlackHole) {
StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
traceEventStopThread(cap, t, t->why_blocked + 6,
owner != NULL ? owner->id : 0);
@@ -1385,7 +1386,7 @@ scheduleNeedHeapProfile( bool ready_to_gc )
{
// When we have +RTS -i0 and we're heap profiling, do a census at
// every GC. This lets us get repeatable runs for debugging.
- if (performHeapProfile ||
+ if (RELAXED_LOAD(&performHeapProfile) ||
(RtsFlags.ProfFlags.heapProfileInterval==0 &&
RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
return true;
@@ -1946,7 +1947,7 @@ delete_threads_and_gc:
// The heap census itself is done during GarbageCollect().
if (heap_census) {
- performHeapProfile = false;
+ RELAXED_STORE(&performHeapProfile, false);
}
#if defined(THREADED_RTS)
=====================================
rts/StgMiscClosures.cmm
=====================================
@@ -606,8 +606,8 @@ retry:
if (r == 0) {
goto retry;
} else {
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
StgTSO_block_info(CurrentTSO) = msg;
+ %release StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
jump stg_block_blackhole(node);
}
}
=====================================
rts/Threads.c
=====================================
@@ -94,8 +94,8 @@ createThread(Capability *cap, W_ size)
// Always start with the compiled code evaluator
tso->what_next = ThreadRunGHC;
- tso->why_blocked = NotBlocked;
tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+ tso->why_blocked = NotBlocked;
tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
tso->flags = 0;
@@ -286,7 +286,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
}
#endif
- switch (tso->why_blocked)
+ switch (ACQUIRE_LOAD(&tso->why_blocked))
{
case BlockedOnMVar:
case BlockedOnMVarRead:
@@ -826,10 +826,11 @@ loop:
}
}
- ASSERT(tso->block_info.closure == (StgClosure*)mvar);
// save why_blocked here, because waking up the thread destroys
// this information
- StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked);
+ StgWord why_blocked = ACQUIRE_LOAD(&tso->why_blocked);
+ ASSERT(why_blocked == BlockedOnMVarRead || why_blocked == BlockedOnMVar);
+ ASSERT(tso->block_info.closure == (StgClosure*)mvar);
// actually perform the takeMVar
StgStack* stack = tso->stackobj;
@@ -903,7 +904,7 @@ StgMutArrPtrs *listThreads(Capability *cap)
void
printThreadBlockage(StgTSO *tso)
{
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);
=====================================
rts/TraverseHeap.c
=====================================
@@ -1239,7 +1239,7 @@ inner_loop:
traversePushClosure(ts, (StgClosure *) tso->blocked_exceptions, c, sep, child_data);
traversePushClosure(ts, (StgClosure *) tso->bq, c, sep, child_data);
traversePushClosure(ts, (StgClosure *) tso->trec, c, sep, child_data);
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnMVar:
case BlockedOnMVarRead:
case BlockedOnBlackHole:
=====================================
rts/include/rts/storage/ClosureMacros.h
=====================================
@@ -184,7 +184,7 @@ EXTERN_INLINE StgHalfWord GET_TAG(const StgClosure *con)
// Use when changing a closure from one kind to another
#define OVERWRITE_INFO(c, new_info) \
OVERWRITING_CLOSURE((StgClosure *)(c)); \
- SET_INFO((StgClosure *)(c), (new_info)); \
+ SET_INFO_RELAXED((StgClosure *)(c), (new_info)); \
LDV_RECORD_CREATE(c);
/* -----------------------------------------------------------------------------
=====================================
rts/include/rts/storage/TSO.h
=====================================
@@ -126,9 +126,14 @@ typedef struct StgTSO_ {
*/
StgWord16 what_next; // Values defined in Constants.h
- StgWord16 why_blocked; // Values defined in Constants.h
StgWord32 flags; // Values defined in Constants.h
- StgTSOBlockInfo block_info;
+
+ /*
+ * N.B. why_blocked only has a handful of values but must be atomically
+ * updated; the smallest width which AArch64 supports for is 32-bits.
+ */
+ StgWord32 why_blocked; // Values defined in Constants.h
+ StgTSOBlockInfo block_info; // Barrier provided by why_blocked
StgThreadID id;
StgWord32 saved_errno;
StgWord32 dirty; /* non-zero => dirty */
=====================================
rts/include/stg/SMP.h
=====================================
@@ -218,6 +218,7 @@ EXTERN_INLINE void busy_wait_nop(void);
* - StgSmallMutArrPtrs: payload
* - StgThunk although this is a somewhat special case; see below
* - StgInd: indirectee
+ * - StgTSO: block_info
*
* Finally, non-pointer fields can be safely mutated without barriers as
* they do not refer to other memory locations. Technically, concurrent
@@ -346,6 +347,14 @@ EXTERN_INLINE void busy_wait_nop(void);
* the capability-local mut_list. Consequently this does not require any memory
* barrier.
*
+ * Barriers in thread blocking
+ * ---------------------------
+ * When a thread blocks (e.g. on an MVar) it will typically allocate a heap object
+ * to record its blocked-ness (e.g. a StgMVarTSOQueue), expose this via
+ * StgTSO.block_info, and update StgTSO.why_blocked to record the reason for
+ * its blocking. The visibility of the block_info is guaranteed by the ordering
+ * of the why_blocked update.
+ *
* Barriers in thread migration
* ----------------------------
* When a thread is migrated from one capability to another we must take care
=====================================
rts/posix/Select.c
=====================================
@@ -105,7 +105,7 @@ static bool wakeUpSleepingThreads (Capability *cap, LowResTime now)
break;
}
iomgr->sleeping_queue = tso->_link;
- tso->why_blocked = NotBlocked;
+ RELAXED_STORE(&tso->why_blocked, NotBlocked);
tso->_link = END_TSO_QUEUE;
IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %"
FMT_StgThreadID "\n", tso->id));
@@ -268,7 +268,7 @@ awaitEvent(Capability *cap, bool wait)
* So the (int) cast should be removed across the code base once
* GHC requires a version of FreeBSD that has that change in it.
*/
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnRead:
{
int fd = tso->block_info.fd;
=====================================
rts/sm/Compact.c
=====================================
@@ -463,7 +463,7 @@ thread_TSO (StgTSO *tso)
thread_(&tso->_link);
thread_(&tso->global_link);
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnMVar:
case BlockedOnMVarRead:
case BlockedOnBlackHole:
=====================================
rts/sm/GC.c
=====================================
@@ -340,8 +340,8 @@ GarbageCollect (struct GcConfig config,
// attribute any costs to CCS_GC
#if defined(PROFILING)
for (n = 0; n < getNumCapabilities(); n++) {
- save_CCS[n] = getCapability(n)->r.rCCCS;
- getCapability(n)->r.rCCCS = CCS_GC;
+ save_CCS[n] = RELAXED_LOAD(&getCapability(n)->r.rCCCS);
+ RELAXED_STORE(&getCapability(n)->r.rCCCS, CCS_GC);
}
#endif
@@ -979,9 +979,9 @@ GarbageCollect (struct GcConfig config,
// Post ticky counter sample.
// We do this at the end of execution since tickers are registered in the
// course of program execution.
- if (performTickySample) {
+ if (RELAXED_LOAD(&performTickySample)) {
emitTickyCounterSamples();
- performTickySample = false;
+ RELAXED_STORE(&performTickySample, false);
}
#endif
=====================================
rts/sm/GCAux.c
=====================================
@@ -91,7 +91,7 @@ isAlive(StgClosure *p)
return TAG_CLOSURE(tag,(StgClosure*)UN_FORWARDING_PTR(info));
}
- info = ACQUIRE_LOAD(&q->header.info);
+ ACQUIRE_FENCE_ON(&q->header.info);
info = INFO_PTR_TO_STRUCT(info);
switch (info->type) {
=====================================
rts/sm/NonMovingMark.c
=====================================
@@ -1052,7 +1052,7 @@ trace_tso (MarkQueue *queue, StgTSO *tso)
if (tso->label != NULL) {
markQueuePushClosure_(queue, (StgClosure *) tso->label);
}
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnMVar:
case BlockedOnMVarRead:
case BlockedOnBlackHole:
=====================================
rts/sm/Scav.c
=====================================
@@ -137,7 +137,7 @@ scavengeTSO (StgTSO *tso)
evacuate((StgClosure **)&tso->label);
}
- switch (tso->why_blocked) {
+ switch (ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnMVar:
case BlockedOnMVarRead:
case BlockedOnBlackHole:
=====================================
rts/sm/Storage.c
=====================================
@@ -1440,7 +1440,7 @@ dirty_MUT_VAR(StgRegTable *reg, StgMutVar *mvar, StgClosure *old)
Capability *cap = regTableToCapability(reg);
// No barrier required here as no other heap object fields are read. See
// Note [Heap memory barriers] in SMP.h.
- SET_INFO((StgClosure*) mvar, &stg_MUT_VAR_DIRTY_info);
+ SET_INFO_RELAXED((StgClosure*) mvar, &stg_MUT_VAR_DIRTY_info);
recordClosureMutated(cap, (StgClosure *) mvar);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
// See Note [Dirty flags in the non-moving collector] in NonMoving.c
@@ -1462,7 +1462,7 @@ dirty_TVAR(Capability *cap, StgTVar *p,
// No barrier required here as no other heap object fields are read. See
// Note [Heap memory barriers] in SMP.h.
if (RELAXED_LOAD(&p->header.info) == &stg_TVAR_CLEAN_info) {
- SET_INFO((StgClosure*) p, &stg_TVAR_DIRTY_info);
+ SET_INFO_RELAXED((StgClosure*) p, &stg_TVAR_DIRTY_info);
recordClosureMutated(cap,(StgClosure*)p);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
// See Note [Dirty flags in the non-moving collector] in NonMoving.c
=====================================
rts/win32/AsyncMIO.c
=====================================
@@ -294,7 +294,7 @@ start:
for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE;
tso = tso->_link) {
- switch(tso->why_blocked) {
+ switch(ACQUIRE_LOAD(&tso->why_blocked)) {
case BlockedOnRead:
case BlockedOnWrite:
case BlockedOnDoProc:
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/bc8eba8e67bb9b1134e0a4a210e3bf79acbf7ac5...d6e45c01d61bf4cb300f4192794b059e4a2697b6
--
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/bc8eba8e67bb9b1134e0a4a210e3bf79acbf7ac5...d6e45c01d61bf4cb300f4192794b059e4a2697b6
You're receiving this email because of your account on gitlab.haskell.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20231216/96ab49c2/attachment-0001.html>
More information about the ghc-commits
mailing list