[Git][ghc/ghc][wip/tsan/fix-races] 8 commits: rts: Fix synchronization on thread blocking state

Ben Gamari (@bgamari) gitlab at gitlab.haskell.org
Sat Dec 16 01:25:47 UTC 2023



Ben Gamari pushed to branch wip/tsan/fix-races at Glasgow Haskell Compiler / GHC


Commits:
3f207212 by Ben Gamari at 2023-12-15T20:24:40-05:00
rts: Fix synchronization on thread blocking state

We now use a release barrier whenever we update a thread's blocking
state. This required widening StgTSO.why_blocked as AArch64 does not
support atomic writes on 16-bit values.
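
For illustration, a minimal self-contained C11 analogue of the pattern this
commit establishes (the struct and names below are hypothetical stand-ins for
the StgTSO fields, not the RTS definitions):

    #include <stdatomic.h>
    #include <stddef.h>
    #include <stdint.h>

    /* Hypothetical stand-ins for the relevant StgTSO fields. */
    struct tso {
        void            *block_info;   /* plain field, published by why_blocked */
        _Atomic uint32_t why_blocked;  /* 32 bits wide, as in this commit */
    };

    enum { NotBlocked = 0, BlockedOnMVar = 1 };  /* illustrative values only */

    /* Blocking side: write block_info first, then release-store why_blocked. */
    static void block_on(struct tso *t, void *mvar) {
        t->block_info = mvar;
        atomic_store_explicit(&t->why_blocked, BlockedOnMVar,
                              memory_order_release);
    }

    /* Waking side: acquire-load why_blocked before reading block_info. */
    static void *blocked_on_mvar(struct tso *t) {
        if (atomic_load_explicit(&t->why_blocked, memory_order_acquire)
                == BlockedOnMVar)
            return t->block_info;  /* visible thanks to release/acquire pairing */
        return NULL;
    }

This is why the Cmm sites in the diff below move the StgTSO_block_info write
before the %release store of StgTSO_why_blocked.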

- - - - -
03f58de6 by Ben Gamari at 2023-12-15T20:25:21-05:00
rts: Use relaxed ordering on dirty/clean info tables updates

When changing the dirty/clean state of a mutable object we need not
enforce any particular memory ordering.
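
A minimal C11 sketch of why relaxed ordering suffices (the flag below is
hypothetical; the point is that a dirty/clean marker publishes no other
memory, so readers never dereference anything guarded by it):

    #include <stdatomic.h>

    static _Atomic int dirty;  /* hypothetical clean/dirty marker */

    void on_mutate(void) {
        /* Atomicity is all that is needed; no ordering, since no other
         * location is published via this flag. */
        if (atomic_load_explicit(&dirty, memory_order_relaxed) == 0)
            atomic_store_explicit(&dirty, 1, memory_order_relaxed);
    }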

- - - - -
23e628ff by Ben Gamari at 2023-12-15T20:25:21-05:00
codeGen: Use relaxed-read in closureInfoPtr

- - - - -
cd53715b by Ben Gamari at 2023-12-15T20:25:21-05:00
STM: Use acquire loads when possible

Full sequential consistency is not needed here.
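
As a sketch of the distinction, in C11 terms (hypothetical names, not the STM
code itself): an acquire load already synchronizes with the release store that
published the value; sequential consistency would add only a global total
order over unrelated atomics, which this locking protocol never relies on.

    #include <stdatomic.h>

    static _Atomic(void *) current_value;  /* hypothetical TVar-like slot */

    /* Writer (cf. unlock_tvar): release-store the new value. */
    void publish(void *v) {
        atomic_store_explicit(&current_value, v, memory_order_release);
    }

    /* Reader (cf. lock_tvar): acquire is enough to see everything that
     * happened before the matching release store. */
    void *observe(void) {
        return atomic_load_explicit(&current_value, memory_order_acquire);
    }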

- - - - -
4ce8a431 by Ben Gamari at 2023-12-15T20:25:22-05:00
rts/Messages: Fix data race

- - - - -
ae21b34e by Ben Gamari at 2023-12-15T20:25:22-05:00
rts/Prof: Fix data race

- - - - -
fafa9b34 by Ben Gamari at 2023-12-15T20:25:22-05:00
rts: Use fence rather than redundant load

Previously we would use an atomic load to ensure acquire ordering.
However, we now have `ACQUIRE_FENCE_ON`, which allows us to express this
more directly.
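
A C11 analogue of the rewrite (assuming, per the commit message, that
ACQUIRE_FENCE_ON amounts to an acquire fence for that location; the names
here are hypothetical):

    #include <stdatomic.h>

    extern _Atomic(const void *) info_slot;  /* hypothetical header word */

    const void *read_info(void) {
        /* One relaxed load of the value we actually want... */
        const void *info =
            atomic_load_explicit(&info_slot, memory_order_relaxed);
        /* ...then a fence supplies the acquire ordering that the old code
         * obtained from a second, redundant acquire load. */
        atomic_thread_fence(memory_order_acquire);
        return info;
    }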

- - - - -
bc8eba8e by Ben Gamari at 2023-12-15T20:25:22-05:00
rts: Fix data races in profiling timer
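
A hedged sketch of the flag handling this fixes, as a C11 analogue (the names
are stand-ins for performHeapProfile and friends, not the RTS declarations):
the flags are set from the timer tick and polled elsewhere, so they need
atomic accesses to be race-free under TSan, but relaxed ordering suffices
because they publish no other memory.

    #include <stdatomic.h>
    #include <stdbool.h>

    static _Atomic bool perform_heap_profile;  /* stand-in flag */

    /* Timer tick, possibly running on another thread. */
    void prof_tick(void) {
        atomic_store_explicit(&perform_heap_profile, true,
                              memory_order_relaxed);
    }

    /* Scheduler poll: relaxed is enough; the flag guards no other data. */
    bool heap_census_due(void) {
        return atomic_load_explicit(&perform_heap_profile,
                                    memory_order_relaxed);
    }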

- - - - -


22 changed files:

- compiler/GHC/Cmm/Info.hs
- rts/Exception.cmm
- rts/Messages.c
- rts/PrimOps.cmm
- rts/Proftimer.c
- rts/RaiseAsync.c
- rts/STM.c
- rts/Schedule.c
- rts/StgMiscClosures.cmm
- rts/Threads.c
- rts/TraverseHeap.c
- rts/include/rts/storage/ClosureMacros.h
- rts/include/rts/storage/TSO.h
- rts/include/stg/SMP.h
- rts/posix/Select.c
- rts/sm/Compact.c
- rts/sm/GC.c
- rts/sm/GCAux.c
- rts/sm/NonMovingMark.c
- rts/sm/Scav.c
- rts/sm/Storage.c
- rts/win32/AsyncMIO.c


Changes:

=====================================
compiler/GHC/Cmm/Info.hs
=====================================
@@ -449,7 +449,7 @@ wordAligned platform align_check e
 -- | Takes a closure pointer and returns the info table pointer
 closureInfoPtr :: Platform -> DoAlignSanitisation -> CmmExpr -> CmmExpr
 closureInfoPtr platform align_check e =
-    cmmLoadBWord platform (wordAligned platform align_check e)
+    CmmMachOp (MO_RelaxedRead (wordWidth platform)) [wordAligned platform align_check e]
 
 -- | Takes an info pointer (the first word of a closure) and returns its entry
 -- code


=====================================
rts/Exception.cmm
=====================================
@@ -351,9 +351,9 @@ stg_killThreadzh (P_ target, P_ exception)
         if (msg == NULL) {
             return ();
         } else {
-            StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
             updateRemembSetPushPtr(StgTSO_block_info(CurrentTSO));
             StgTSO_block_info(CurrentTSO) = msg;
+            %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
             // we must block, and unlock the message before returning
             jump stg_block_throwto (target, exception);
         }


=====================================
rts/Messages.c
=====================================
@@ -205,7 +205,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
         StgTSO *owner = (StgTSO*)p;
 
 #if defined(THREADED_RTS)
-        if (owner->cap != cap) {
+        if (RELAXED_LOAD(&owner->cap) != cap) {
             sendMessage(cap, owner->cap, (Message*)msg);
             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                           owner->cap->no);
@@ -275,7 +275,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
         ASSERT(owner != END_TSO_QUEUE);
 
 #if defined(THREADED_RTS)
-        if (owner->cap != cap) {
+        if (RELAXED_LOAD(&owner->cap) != cap) {
             sendMessage(cap, owner->cap, (Message*)msg);
             debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                           owner->cap->no);


=====================================
rts/PrimOps.cmm
=====================================
@@ -827,7 +827,9 @@ stg_atomicModifyMutVarzuzh ( gcptr mv, gcptr f )
     StgMutVar_var(mv) = z;
 #endif
 
-    if (GET_INFO(mv) == stg_MUT_VAR_CLEAN_info) {
+    W_ info;
+    info = %relaxed GET_INFO(mv);
+    if (info == stg_MUT_VAR_CLEAN_info) {
         ccall dirty_MUT_VAR(BaseReg "ptr", mv "ptr", x "ptr");
     }
 
@@ -1715,21 +1717,17 @@ stg_takeMVarzh ( P_ mvar /* :: MVar a */ )
         StgMVarTSOQueue_link(q) = END_TSO_QUEUE;
         StgMVarTSOQueue_tso(q)  = CurrentTSO;
         SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
-        // Write barrier before we make the new MVAR_TSO_QUEUE
-        // visible to other cores.
-        // See Note [Heap memory barriers]
-        RELEASE_FENCE;
 
         if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-            StgMVar_head(mvar) = q;
+            %release StgMVar_head(mvar) = q;
         } else {
-            StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+            %release StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
             ccall recordClosureMutated(MyCapability() "ptr",
                                              StgMVar_tail(mvar));
         }
         StgTSO__link(CurrentTSO)       = q;
         StgTSO_block_info(CurrentTSO)  = mvar;
-        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+        %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
         StgMVar_tail(mvar)             = q;
 
         jump stg_block_takemvar(mvar);
@@ -1884,19 +1882,17 @@ stg_putMVarzh ( P_ mvar, /* :: MVar a */
         StgMVarTSOQueue_tso(q)  = CurrentTSO;
 
         SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
-        // See Note [Heap memory barriers]
-        RELEASE_FENCE;
 
         if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-            StgMVar_head(mvar) = q;
+            %release StgMVar_head(mvar) = q;
         } else {
-            StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
+            %release StgMVarTSOQueue_link(StgMVar_tail(mvar)) = q;
             ccall recordClosureMutated(MyCapability() "ptr",
                                              StgMVar_tail(mvar));
         }
         StgTSO__link(CurrentTSO)       = q;
         StgTSO_block_info(CurrentTSO)  = mvar;
-        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+        %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
         StgMVar_tail(mvar)             = q;
 
         jump stg_block_putmvar(mvar,val);
@@ -2033,11 +2029,11 @@ loop:
         }
     }
 
-    ASSERT(StgTSO_block_info(tso) == mvar);
     // save why_blocked here, because waking up the thread destroys
     // this information
     W_ why_blocked;
-    why_blocked = TO_W_(StgTSO_why_blocked(tso));
+    why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing barrier
+    ASSERT(StgTSO_block_info(tso) == mvar);
 
     // actually perform the takeMVar
     W_ stack;
@@ -2093,13 +2089,11 @@ stg_readMVarzh ( P_ mvar, /* :: MVar a */ )
         StgMVarTSOQueue_tso(q)  = CurrentTSO;
 
         SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
-        // See Note [Heap memory barriers]
-        RELEASE_FENCE;
 
+        %release StgMVar_head(mvar) = q;
         StgTSO__link(CurrentTSO)       = q;
         StgTSO_block_info(CurrentTSO)  = mvar;
-        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
-        StgMVar_head(mvar) = q;
+        %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVarRead::I16;
 
         if (StgMVar_tail(mvar) == stg_END_TSO_QUEUE_closure) {
             StgMVar_tail(mvar) = q;
@@ -2226,17 +2220,16 @@ stg_readIOPortzh ( P_ ioport /* :: IOPort a */ )
         StgMVarTSOQueue_tso(q)  = CurrentTSO;
 
         SET_HDR(q, stg_MVAR_TSO_QUEUE_info, CCS_SYSTEM);
-        // See Note [Heap memory barriers]
-        RELEASE_FENCE;
 
-        StgMVar_head(ioport) = q;
+        %release StgMVar_head(ioport) = q;
         StgTSO__link(CurrentTSO)       = q;
         StgTSO_block_info(CurrentTSO)  = ioport;
-        StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
+
+        // See Note [Heap memory barriers]
+        %release StgTSO_why_blocked(CurrentTSO) = BlockedOnMVar::I16;
 
         //Unlocks the closure as well
         jump stg_block_readmvar(ioport);
-
     }
 
     //This way we can check of there has been a read already.
@@ -2314,11 +2307,11 @@ loop:
     // next element in the waiting list here, as there can only ever
     // be one thread blocked on a port.
 
-    ASSERT(StgTSO_block_info(tso) == ioport);
     // save why_blocked here, because waking up the thread destroys
     // this information
     W_ why_blocked;
-    why_blocked = TO_W_(StgTSO_why_blocked(tso));
+    why_blocked = TO_W_(StgTSO_why_blocked(tso)); // TODO: Missing acquire barrier
+    ASSERT(StgTSO_block_info(tso) == ioport);
 
     // actually perform the takeMVar
     W_ stack;
@@ -2560,8 +2553,8 @@ stg_waitReadzh ( W_ fd )
 #else
 
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
     StgTSO_block_info(CurrentTSO) = fd;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
     // No locking - we're not going to use this interface in the
     // threaded RTS anyway.
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2576,8 +2569,8 @@ stg_waitWritezh ( W_ fd )
 #else
 
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
     StgTSO_block_info(CurrentTSO) = fd;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
     // No locking - we're not going to use this interface in the
     // threaded RTS anyway.
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
@@ -2599,7 +2592,6 @@ stg_delayzh ( W_ us_delay )
 #else
 
     ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
 
 #if defined(mingw32_HOST_OS)
 
@@ -2616,12 +2608,13 @@ stg_delayzh ( W_ us_delay )
      * simplifies matters, so change the status to OnDoProc put the
      * delayed thread on the blocked_queue.
      */
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
     jump stg_block_async_void();
 
 #else
 
+    %relaxed StgTSO_why_blocked(CurrentTSO) = BlockedOnDelay::I16;
     (target) = ccall getDelayTarget(us_delay);
 
     StgTSO_block_info(CurrentTSO) = target;
@@ -2643,9 +2636,6 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
     ccall barf("asyncRead# on threaded RTS") never returns;
 #else
 
-    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
-
     /* could probably allocate this on the heap instead */
     ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
                                         "stg_asyncReadzh");
@@ -2654,6 +2644,10 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
     StgAsyncIOResult_len(ares)     = 0;
     StgAsyncIOResult_errCode(ares) = 0;
     StgTSO_block_info(CurrentTSO)  = ares;
+
+    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16;
+
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
     jump stg_block_async();
 #endif
@@ -2668,9 +2662,6 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
     ccall barf("asyncWrite# on threaded RTS") never returns;
 #else
 
-    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
-
     ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
                                         "stg_asyncWritezh");
     (reqID) = ccall addIORequest(fd, 1/*TRUE*/,is_sock,len,buf "ptr");
@@ -2679,6 +2670,10 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
     StgAsyncIOResult_len(ares)     = 0;
     StgAsyncIOResult_errCode(ares) = 0;
     StgTSO_block_info(CurrentTSO)  = ares;
+
+    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16;
+
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
     jump stg_block_async();
 #endif
@@ -2693,9 +2688,6 @@ stg_asyncDoProczh ( W_ proc, W_ param )
     ccall barf("asyncDoProc# on threaded RTS") never returns;
 #else
 
-    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
-
     /* could probably allocate this on the heap instead */
     ("ptr" ares) = ccall stgMallocBytes(SIZEOF_StgAsyncIOResult,
                                         "stg_asyncDoProczh");
@@ -2704,6 +2696,10 @@ stg_asyncDoProczh ( W_ proc, W_ param )
     StgAsyncIOResult_len(ares)     = 0;
     StgAsyncIOResult_errCode(ares) = 0;
     StgTSO_block_info(CurrentTSO) = ares;
+
+    ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16);
+    %release StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
+
     ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
     jump stg_block_async();
 #endif


=====================================
rts/Proftimer.c
=====================================
@@ -101,7 +101,7 @@ requestHeapCensus( void ){
 void
 initProfTimer( void )
 {
-    performHeapProfile = false;
+    RELAXED_STORE_ALWAYS(&performHeapProfile, false);
 
     ticks_to_heap_profile = RtsFlags.ProfFlags.heapProfileIntervalTicks;
 
@@ -124,7 +124,8 @@ handleProfTick(void)
         uint32_t n;
         for (n=0; n < getNumCapabilities(); n++) {
             Capability *cap = getCapability(n);
-            cap->r.rCCCS->time_ticks++;
+            CostCentreStack *ccs = RELAXED_LOAD(&cap->r.rCCCS);
+            ccs->time_ticks++;
             traceProfSampleCostCentre(cap, cap->r.rCCCS, total_ticks);
         }
     }
@@ -135,7 +136,7 @@ handleProfTick(void)
         ticks_to_ticky_sample--;
         if (ticks_to_ticky_sample <= 0) {
             ticks_to_ticky_sample = RtsFlags.ProfFlags.heapProfileIntervalTicks;
-            performTickySample = true;
+            RELAXED_STORE_ALWAYS(&performTickySample, true);
         }
     }
 #endif
@@ -144,7 +145,7 @@ handleProfTick(void)
         ticks_to_heap_profile--;
         if (ticks_to_heap_profile <= 0) {
             ticks_to_heap_profile = RtsFlags.ProfFlags.heapProfileIntervalTicks;
-            performHeapProfile = true;
+            RELAXED_STORE_ALWAYS(&performHeapProfile, true);
         }
     }
 }


=====================================
rts/RaiseAsync.c
=====================================
@@ -266,7 +266,7 @@ check_target:
         return THROWTO_BLOCKED;
     }
 
-    status = target->why_blocked;
+    status = ACQUIRE_LOAD(&target->why_blocked);
 
     switch (status) {
     case NotBlocked:
@@ -728,7 +728,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
   }
 
  done:
-  tso->why_blocked = NotBlocked;
+  RELAXED_STORE(&tso->why_blocked, NotBlocked);
   appendToRunQueue(cap, tso);
 }
 


=====================================
rts/STM.c
=====================================
@@ -187,7 +187,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
                              StgTVar *s STG_UNUSED) {
   StgClosure *result;
   TRACE("%p : lock_tvar(%p)", trec, s);
-  result = SEQ_CST_LOAD(&s->current_value);
+  result = ACQUIRE_LOAD(&s->current_value);
   return result;
 }
 
@@ -198,7 +198,7 @@ static void unlock_tvar(Capability *cap,
                         StgBool force_update) {
   TRACE("%p : unlock_tvar(%p)", trec, s);
   if (force_update) {
-    StgClosure *old_value = SEQ_CST_LOAD(&s->current_value);
+    StgClosure *old_value = ACQUIRE_LOAD(&s->current_value);
     RELEASE_STORE(&s->current_value, c);
     dirty_TVAR(cap, s, old_value);
   }
@@ -210,7 +210,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
                               StgClosure *expected) {
   StgClosure *result;
   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
-  result = SEQ_CST_LOAD(&s->current_value);
+  result = ACQUIRE_LOAD(&s->current_value);
   TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
   return (result == expected);
 }
@@ -231,7 +231,7 @@ static void lock_stm(StgTRecHeader *trec) {
 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
   TRACE("%p : unlock_stm()", trec);
   ASSERT(smp_locked == trec);
-  SEQ_CST_STORE(&smp_locked, 0);
+  RELEASE_STORE(&smp_locked, 0);
 }
 
 static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
@@ -240,7 +240,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
   StgClosure *result;
   TRACE("%p : lock_tvar(%p)", trec, s);
   ASSERT(smp_locked == trec);
-  result = SEQ_CST_LOAD(&s->current_value);
+  result = ACQUIRE_LOAD(&s->current_value);
   return result;
 }
 
@@ -252,7 +252,7 @@ static void *unlock_tvar(Capability *cap,
   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
   ASSERT(smp_locked == trec);
   if (force_update) {
-    StgClosure *old_value = SEQ_CST_LOAD(&s->current_value);
+    StgClosure *old_value = ACQUIRE_LOAD(&s->current_value);
     RELEASE_STORE(&s->current_value, c);
     dirty_TVAR(cap, s, old_value);
   }
@@ -265,7 +265,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
   StgClosure *result;
   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
   ASSERT(smp_locked == trec);
-  result = SEQ_CST_LOAD(&s->current_value);
+  result = ACQUIRE_LOAD(&s->current_value);
   TRACE("%p : %d", result ? "success" : "failure");
   return (result == expected);
 }
@@ -291,9 +291,11 @@ static StgClosure *lock_tvar(Capability *cap,
   StgClosure *result;
   TRACE("%p : lock_tvar(%p)", trec, s);
   do {
+    const StgInfoTable *info;
     do {
-      result = SEQ_CST_LOAD(&s->current_value);
-    } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
+      result = ACQUIRE_LOAD(&s->current_value);
+      info = GET_INFO(UNTAG_CLOSURE(result));
+    } while (info == &stg_TREC_HEADER_info);
   } while (cas((void *) &s->current_value,
                (StgWord)result, (StgWord)trec) != (StgWord)result);
 
@@ -311,7 +313,7 @@ static void unlock_tvar(Capability *cap,
                         StgClosure *c,
                         StgBool force_update STG_UNUSED) {
   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
-  ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
+  ASSERT(ACQUIRE_LOAD(&s->current_value) == (StgClosure *)trec);
   RELEASE_STORE(&s->current_value, c);
   dirty_TVAR(cap, s, (StgClosure *) trec);
 }
@@ -340,8 +342,8 @@ static StgBool cond_lock_tvar(Capability *cap,
 
 static void park_tso(StgTSO *tso) {
   ASSERT(tso -> why_blocked == NotBlocked);
-  tso -> why_blocked = BlockedOnSTM;
   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
+  RELEASE_STORE(&tso -> why_blocked, BlockedOnSTM);
   TRACE("park_tso on tso=%p", tso);
 }
 
@@ -375,7 +377,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
   StgTVarWatchQueue *trail;
   TRACE("unpark_waiters_on tvar=%p", s);
   // unblock TSOs in reverse order, to be a bit fairer (#2319)
-  for (q = SEQ_CST_LOAD(&s->first_watch_queue_entry), trail = q;
+  for (q = ACQUIRE_LOAD(&s->first_watch_queue_entry), trail = q;
        q != END_STM_WATCH_QUEUE;
        q = q -> next_queue_entry) {
     trail = q;
@@ -532,16 +534,16 @@ static void build_watch_queue_entries_for_trec(Capability *cap,
     StgTVarWatchQueue *fq;
     s = e -> tvar;
     TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
-    ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
-    NACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == e -> expected_value);
-    fq = SEQ_CST_LOAD(&s->first_watch_queue_entry);
+    ACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) == (StgClosure *)trec);
+    NACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) == e -> expected_value);
+    fq = ACQUIRE_LOAD(&s->first_watch_queue_entry);
     q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
     q -> next_queue_entry = fq;
     q -> prev_queue_entry = END_STM_WATCH_QUEUE;
     if (fq != END_STM_WATCH_QUEUE) {
       fq -> prev_queue_entry = q;
     }
-    SEQ_CST_STORE(&s->first_watch_queue_entry, q);
+    RELEASE_STORE(&s->first_watch_queue_entry, q);
     e -> new_value = (StgClosure *) q;
     dirty_TVAR(cap, s, (StgClosure *) fq); // we modified first_watch_queue_entry
   });
@@ -569,7 +571,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
           trec,
           q -> closure,
           s);
-    ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
+    ACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) == (StgClosure *)trec);
     nq = q -> next_queue_entry;
     pq = q -> prev_queue_entry;
     if (nq != END_STM_WATCH_QUEUE) {
@@ -578,8 +580,8 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
     if (pq != END_STM_WATCH_QUEUE) {
       pq -> next_queue_entry = nq;
     } else {
-      ASSERT(SEQ_CST_LOAD(&s->first_watch_queue_entry) == q);
-      SEQ_CST_STORE(&s->first_watch_queue_entry, nq);
+      ASSERT(ACQUIRE_LOAD(&s->first_watch_queue_entry) == q);
+      RELEASE_STORE(&s->first_watch_queue_entry, nq);
       dirty_TVAR(cap, s, (StgClosure *) q); // we modified first_watch_queue_entry
     }
     free_stg_tvar_watch_queue(cap, q);
@@ -727,7 +729,7 @@ static StgBool entry_is_read_only(TRecEntry *e) {
 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
   StgClosure *c;
   StgBool result;
-  c = SEQ_CST_LOAD(&s->current_value);
+  c = ACQUIRE_LOAD(&s->current_value);
   result = (c == (StgClosure *) h);
   return result;
 }
@@ -803,13 +805,13 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
           // The memory ordering here must ensure that we have two distinct
           // reads to current_value, with the read from num_updates between
           // them.
-          if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
+          if (ACQUIRE_LOAD(&s->current_value) != e -> expected_value) {
             TRACE("%p : doesn't match", trec);
             result = false;
             BREAK_FOR_EACH;
           }
           e->num_updates = SEQ_CST_LOAD(&s->num_updates);
-          if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
+          if (ACQUIRE_LOAD(&s->current_value) != e -> expected_value) {
             TRACE("%p : doesn't match (race)", trec);
             result = false;
             BREAK_FOR_EACH;
@@ -852,7 +854,7 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
 
         // We must first load current_value then num_updates; this is inverse of
         // the order of the stores in stmCommitTransaction.
-        StgClosure *current_value = SEQ_CST_LOAD(&s->current_value);
+        StgClosure *current_value = ACQUIRE_LOAD(&s->current_value);
         StgInt num_updates = SEQ_CST_LOAD(&s->num_updates);
 
         // Note we need both checks and in this order as the TVar could be
@@ -1186,7 +1188,7 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
             unlock_tvar(cap, trec, s, e -> expected_value, false);
         }
         merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
-        ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
+        ACQ_ASSERT(ACQUIRE_LOAD(&s->current_value) != (StgClosure *)trec);
       });
     } else {
         revert_ownership(cap, trec, false);


=====================================
rts/Schedule.c
=====================================
@@ -512,7 +512,8 @@ run_thread:
 #endif
 
     if (ret == ThreadBlocked) {
-        if (t->why_blocked == BlockedOnBlackHole) {
+        uint16_t why_blocked = ACQUIRE_LOAD(&t->why_blocked);
+        if (why_blocked == BlockedOnBlackHole) {
             StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
             traceEventStopThread(cap, t, t->why_blocked + 6,
                                  owner != NULL ? owner->id : 0);
@@ -1385,7 +1386,7 @@ scheduleNeedHeapProfile( bool ready_to_gc )
 {
     // When we have +RTS -i0 and we're heap profiling, do a census at
     // every GC.  This lets us get repeatable runs for debugging.
-    if (performHeapProfile ||
+    if (RELAXED_LOAD(&performHeapProfile) ||
         (RtsFlags.ProfFlags.heapProfileInterval==0 &&
          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
         return true;
@@ -1946,7 +1947,7 @@ delete_threads_and_gc:
 
     // The heap census itself is done during GarbageCollect().
     if (heap_census) {
-        performHeapProfile = false;
+        RELAXED_STORE(&performHeapProfile, false);
     }
 
 #if defined(THREADED_RTS)


=====================================
rts/StgMiscClosures.cmm
=====================================
@@ -606,8 +606,8 @@ retry:
         if (r == 0) {
             goto retry;
         } else {
-            StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
             StgTSO_block_info(CurrentTSO) = msg;
+            %release StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
             jump stg_block_blackhole(node);
         }
     }


=====================================
rts/Threads.c
=====================================
@@ -94,8 +94,8 @@ createThread(Capability *cap, W_ size)
 
     // Always start with the compiled code evaluator
     tso->what_next = ThreadRunGHC;
-    tso->why_blocked  = NotBlocked;
     tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+    tso->why_blocked  = NotBlocked;
     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
     tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
     tso->flags = 0;
@@ -286,7 +286,7 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
     }
 #endif
 
-    switch (tso->why_blocked)
+    switch (ACQUIRE_LOAD(&tso->why_blocked))
     {
     case BlockedOnMVar:
     case BlockedOnMVarRead:
@@ -826,10 +826,11 @@ loop:
         }
     }
 
-    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
     // save why_blocked here, because waking up the thread destroys
     // this information
-    StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked);
+    StgWord why_blocked = ACQUIRE_LOAD(&tso->why_blocked);
+    ASSERT(why_blocked == BlockedOnMVarRead || why_blocked == BlockedOnMVar);
+    ASSERT(tso->block_info.closure == (StgClosure*)mvar);
 
     // actually perform the takeMVar
     StgStack* stack = tso->stackobj;
@@ -903,7 +904,7 @@ StgMutArrPtrs *listThreads(Capability *cap)
 void
 printThreadBlockage(StgTSO *tso)
 {
-  switch (tso->why_blocked) {
+  switch (ACQUIRE_LOAD(&tso->why_blocked)) {
 #if defined(mingw32_HOST_OS)
     case BlockedOnDoProc:
     debugBelch("is blocked on proc (request: %u)", tso->block_info.async_result->reqID);


=====================================
rts/TraverseHeap.c
=====================================
@@ -1239,7 +1239,7 @@ inner_loop:
         traversePushClosure(ts, (StgClosure *) tso->blocked_exceptions, c, sep, child_data);
         traversePushClosure(ts, (StgClosure *) tso->bq, c, sep, child_data);
         traversePushClosure(ts, (StgClosure *) tso->trec, c, sep, child_data);
-        switch (tso->why_blocked) {
+        switch (ACQUIRE_LOAD(&tso->why_blocked)) {
         case BlockedOnMVar:
         case BlockedOnMVarRead:
         case BlockedOnBlackHole:


=====================================
rts/include/rts/storage/ClosureMacros.h
=====================================
@@ -184,7 +184,7 @@ EXTERN_INLINE StgHalfWord GET_TAG(const StgClosure *con)
 // Use when changing a closure from one kind to another
 #define OVERWRITE_INFO(c, new_info)                             \
     OVERWRITING_CLOSURE((StgClosure *)(c));                     \
-    SET_INFO((StgClosure *)(c), (new_info));                    \
+    SET_INFO_RELAXED((StgClosure *)(c), (new_info));                    \
     LDV_RECORD_CREATE(c);
 
 /* -----------------------------------------------------------------------------


=====================================
rts/include/rts/storage/TSO.h
=====================================
@@ -126,9 +126,14 @@ typedef struct StgTSO_ {
      */
 
     StgWord16               what_next;      // Values defined in Constants.h
-    StgWord16               why_blocked;    // Values defined in Constants.h
     StgWord32               flags;          // Values defined in Constants.h
-    StgTSOBlockInfo         block_info;
+
+    /*
+     * N.B. why_blocked only has a handful of values but must be atomically
+     * updated; the smallest width which AArch64 supports is 32 bits.
+     */
+    StgWord32               why_blocked;    // Values defined in Constants.h
+    StgTSOBlockInfo         block_info;     // Barrier provided by why_blocked
     StgThreadID             id;
     StgWord32               saved_errno;
     StgWord32               dirty;          /* non-zero => dirty */


=====================================
rts/include/stg/SMP.h
=====================================
@@ -218,6 +218,7 @@ EXTERN_INLINE void busy_wait_nop(void);
  *   - StgSmallMutArrPtrs: payload
  *   - StgThunk although this is a somewhat special case; see below
  *   - StgInd: indirectee
+ *   - StgTSO: block_info
  *
  * Finally, non-pointer fields can be safely mutated without barriers as
  * they do not refer to other memory locations. Technically, concurrent
@@ -346,6 +347,14 @@ EXTERN_INLINE void busy_wait_nop(void);
  * the capability-local mut_list. Consequently this does not require any memory
  * barrier.
  *
+ * Barriers in thread blocking
+ * ---------------------------
+ * When a thread blocks (e.g. on an MVar) it will typically allocate a heap
+ * object to record its blocked state (e.g. a StgMVarTSOQueue), expose this
+ * via StgTSO.block_info, and update StgTSO.why_blocked to record the reason
+ * for blocking. The release ordering of the why_blocked update guarantees
+ * that block_info is visible to any reader that acquire-loads why_blocked.
+ *
  * Barriers in thread migration
  * ----------------------------
  * When a thread is migrated from one capability to another we must take care


=====================================
rts/posix/Select.c
=====================================
@@ -105,7 +105,7 @@ static bool wakeUpSleepingThreads (Capability *cap, LowResTime now)
             break;
         }
         iomgr->sleeping_queue = tso->_link;
-        tso->why_blocked = NotBlocked;
+        RELAXED_STORE(&tso->why_blocked, NotBlocked);
         tso->_link = END_TSO_QUEUE;
         IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %"
                                        FMT_StgThreadID "\n", tso->id));
@@ -268,7 +268,7 @@ awaitEvent(Capability *cap, bool wait)
        * So the (int) cast should be removed across the code base once
        * GHC requires a version of FreeBSD that has that change in it.
        */
-        switch (tso->why_blocked) {
+        switch (ACQUIRE_LOAD(&tso->why_blocked)) {
         case BlockedOnRead:
           {
             int fd = tso->block_info.fd;


=====================================
rts/sm/Compact.c
=====================================
@@ -463,7 +463,7 @@ thread_TSO (StgTSO *tso)
     thread_(&tso->_link);
     thread_(&tso->global_link);
 
-    switch (tso->why_blocked) {
+    switch (ACQUIRE_LOAD(&tso->why_blocked)) {
     case BlockedOnMVar:
     case BlockedOnMVarRead:
     case BlockedOnBlackHole:


=====================================
rts/sm/GC.c
=====================================
@@ -340,8 +340,8 @@ GarbageCollect (struct GcConfig config,
   // attribute any costs to CCS_GC
 #if defined(PROFILING)
   for (n = 0; n < getNumCapabilities(); n++) {
-      save_CCS[n] = getCapability(n)->r.rCCCS;
-      getCapability(n)->r.rCCCS = CCS_GC;
+      save_CCS[n] = RELAXED_LOAD(&getCapability(n)->r.rCCCS);
+      RELAXED_STORE(&getCapability(n)->r.rCCCS, CCS_GC);
   }
 #endif
 
@@ -979,9 +979,9 @@ GarbageCollect (struct GcConfig config,
   // Post ticky counter sample.
   // We do this at the end of execution since tickers are registered in the
   // course of program execution.
-  if (performTickySample) {
+  if (RELAXED_LOAD(&performTickySample)) {
       emitTickyCounterSamples();
-      performTickySample = false;
+      RELAXED_STORE(&performTickySample, false);
   }
 #endif
 


=====================================
rts/sm/GCAux.c
=====================================
@@ -91,7 +91,7 @@ isAlive(StgClosure *p)
         return TAG_CLOSURE(tag,(StgClosure*)UN_FORWARDING_PTR(info));
     }
 
-    info = ACQUIRE_LOAD(&q->header.info);
+    ACQUIRE_FENCE_ON(&q->header.info);
     info = INFO_PTR_TO_STRUCT(info);
 
     switch (info->type) {


=====================================
rts/sm/NonMovingMark.c
=====================================
@@ -1052,7 +1052,7 @@ trace_tso (MarkQueue *queue, StgTSO *tso)
     if (tso->label != NULL) {
         markQueuePushClosure_(queue, (StgClosure *) tso->label);
     }
-    switch (tso->why_blocked) {
+    switch (ACQUIRE_LOAD(&tso->why_blocked)) {
     case BlockedOnMVar:
     case BlockedOnMVarRead:
     case BlockedOnBlackHole:


=====================================
rts/sm/Scav.c
=====================================
@@ -137,7 +137,7 @@ scavengeTSO (StgTSO *tso)
         evacuate((StgClosure **)&tso->label);
     }
 
-    switch (tso->why_blocked) {
+    switch (ACQUIRE_LOAD(&tso->why_blocked)) {
     case BlockedOnMVar:
     case BlockedOnMVarRead:
     case BlockedOnBlackHole:


=====================================
rts/sm/Storage.c
=====================================
@@ -1440,7 +1440,7 @@ dirty_MUT_VAR(StgRegTable *reg, StgMutVar *mvar, StgClosure *old)
     Capability *cap = regTableToCapability(reg);
     // No barrier required here as no other heap object fields are read. See
     // Note [Heap memory barriers] in SMP.h.
-    SET_INFO((StgClosure*) mvar, &stg_MUT_VAR_DIRTY_info);
+    SET_INFO_RELAXED((StgClosure*) mvar, &stg_MUT_VAR_DIRTY_info);
     recordClosureMutated(cap, (StgClosure *) mvar);
     IF_NONMOVING_WRITE_BARRIER_ENABLED {
         // See Note [Dirty flags in the non-moving collector] in NonMoving.c
@@ -1462,7 +1462,7 @@ dirty_TVAR(Capability *cap, StgTVar *p,
     // No barrier required here as no other heap object fields are read. See
     // Note [Heap memory barriers] in SMP.h.
     if (RELAXED_LOAD(&p->header.info) == &stg_TVAR_CLEAN_info) {
-        SET_INFO((StgClosure*) p, &stg_TVAR_DIRTY_info);
+        SET_INFO_RELAXED((StgClosure*) p, &stg_TVAR_DIRTY_info);
         recordClosureMutated(cap,(StgClosure*)p);
         IF_NONMOVING_WRITE_BARRIER_ENABLED {
             // See Note [Dirty flags in the non-moving collector] in NonMoving.c


=====================================
rts/win32/AsyncMIO.c
=====================================
@@ -294,7 +294,7 @@ start:
             for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE;
                   tso = tso->_link) {
 
-                switch(tso->why_blocked) {
+                switch(ACQUIRE_LOAD(&tso->why_blocked)) {
                 case BlockedOnRead:
                 case BlockedOnWrite:
                 case BlockedOnDoProc:



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/bf2b3041accc5df3f3c5a980a1f8e4e10a34a2e1...bc8eba8e67bb9b1134e0a4a210e3bf79acbf7ac5
