[commit: ghc] ghc-8.0: Another try to get thread migration right (13ff342)

git at git.haskell.org git at git.haskell.org
Thu Aug 25 16:37:26 UTC 2016


Repository : ssh://git@git.haskell.org/ghc

On branch  : ghc-8.0
Link       : http://ghc.haskell.org/trac/ghc/changeset/13ff3423e058a409b035acce5c1448237885ac84/ghc

>---------------------------------------------------------------

commit 13ff3423e058a409b035acce5c1448237885ac84
Author: Simon Marlow <marlowsd at gmail.com>
Date:   Thu Aug 4 15:59:43 2016 +0100

    Another try to get thread migration right
    
    Summary:
    This is surprisingly tricky.  There were linked list bugs in the
    previous version (D2430) that showed up as a test failure in
    setnumcapabilities001 (that's a great stress test!).
    
    This new version uses a different strategy that doesn't suffer from
    the problem that @ezyang pointed out in D2430.  We now pre-calculate
    how many threads to keep for this capability, and then migrate any
    surplus threads off the front of the queue, taking care to account for
    threads that can't be migrated.
    
    Test Plan:
    1. setnumcapabilities001 stress test with sanity checking (+RTS -DS) turned on:
    
    ```
    cd testsuite/tests/concurrent/should_run
    make TEST=setnumcapabilities001 WAY=threaded1 EXTRA_HC_OPTS=-with-rtsopts=-DS CLEANUP=0
    while true; do ./setnumcapabilities001.run/setnumcapabilities001 4 9 2000 || break; done
    ```
    
    2. The test case from #12419
    
    Reviewers: niteria, ezyang, rwbarton, austin, bgamari, erikd
    
    Subscribers: thomie, ezyang
    
    Differential Revision: https://phabricator.haskell.org/D2441
    
    GHC Trac Issues: #12419
    
    (cherry picked from commit 89fa4e968f47cfb42d0dc33fc3bfffdce31d850e)


>---------------------------------------------------------------

13ff3423e058a409b035acce5c1448237885ac84
 rts/Schedule.c | 161 ++++++++++++++++++++++-----------------------------------
 1 file changed, 62 insertions(+), 99 deletions(-)

diff --git a/rts/Schedule.c b/rts/Schedule.c
index 1f3fa36..74859af 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -736,12 +736,6 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     //  - threads that have TSO_LOCKED cannot migrate
     //  - a thread that is bound to the current Task cannot be migrated
     //
-    // So we walk through the run queue, migrating threads to
-    // free_caps[] round-robin, skipping over immovable threads.  Each
-    // time through free_caps[] we keep one thread for ourselves,
-    // provided we haven't encountered one or more immovable threads
-    // in this pass.
-    //
     // This is about the simplest thing we could do; improvements we
     // might want to do include:
     //
@@ -753,112 +747,81 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     if (n_free_caps > 0) {
         StgTSO *prev, *t, *next;
-#ifdef SPARK_PUSHING
-        rtsBool pushed_to_all;
-#endif
 
         debugTrace(DEBUG_sched,
                    "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
                    cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
                    n_free_caps);
 
-        i = 0;
-#ifdef SPARK_PUSHING
-        pushed_to_all = rtsFalse;
-#endif
-
-        // We want to share threads equally amongst free_caps[] and the
-        // current capability, but sometimes we encounter immovable
-        // threads.  This counter tracks the number of threads we have kept
-        // for the current capability minus the number of passes over
-        // free_caps[]. If it is great than zero (due to immovable
-        // threads), we should try to bring it back to zero again by not
-        // keeping any threads for the current capability.
-        uint32_t imbalance = 0;
-
-        // n_free_caps may be larger than the number of spare threads we have,
-        // if there were sparks in the spark pool. To avoid giving away all our
-        // threads in this case, we limit the number of caps that we give
-        // threads to, to the number of spare threads (n_run_queue-1).
-        uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
-
-        if (thread_recipients > 0) {
-            prev = END_TSO_QUEUE;
-            t = cap->run_queue_hd;
-            for (; t != END_TSO_QUEUE; t = next) {
-                next = t->_link;
-                t->_link = END_TSO_QUEUE;
-                if (t->bound == task->incall // don't move my bound thread
-                    || tsoLocked(t)) {  // don't move a locked thread
-                    if (prev == END_TSO_QUEUE) {
-                        cap->run_queue_hd = t;
-                    } else {
-                        setTSOLink(cap, prev, t);
-                    }
-                    setTSOPrev(cap, t, prev);
-                    prev = t;
-                    imbalance++;
-                } else if (i == thread_recipients) {
-#ifdef SPARK_PUSHING
-                    pushed_to_all = rtsTrue;
-#endif
-                    // If we have not already kept any threads for this
-                    // capability during the current pass over free_caps[],
-                    // keep one now.
-                    if (imbalance == 0) {
-                        if (prev == END_TSO_QUEUE) {
-                            cap->run_queue_hd = t;
-                        } else {
-                            setTSOLink(cap, prev, t);
-                        }
-                        setTSOPrev(cap, t, prev);
-                        prev = t;
-                    } else {
-                        imbalance--;
-                    }
-                    i = 0;
+        // There are n_free_caps+1 caps in total.  We will share the threads
+        // evently between them, *except* that if the run queue does not divide
+        // evenly by n_free_caps+1 then we bias towards the current capability.
+        // e.g. with n_run_queue=4, n_free_caps=2, we will keep 2.
+        uint32_t keep_threads =
+            (cap->n_run_queue + n_free_caps) / (n_free_caps + 1);
+
+        // This also ensures that we don't give away all our threads, since
+        // (x + y) / (y + 1) >= 1 when x >= 1.
+
+        // The number of threads we have left.
+        uint32_t n = cap->n_run_queue;
+
+        // prev = the previous thread on this cap's run queue
+        prev = END_TSO_QUEUE;
+
+        // We're going to walk through the run queue, migrating threads to other
+        // capabilities until we have only keep_threads left.  We might
+        // encounter a thread that cannot be migrated, in which case we add it
+        // to the current run queue and decrement keep_threads.
+        for (t = cap->run_queue_hd, i = 0;
+             t != END_TSO_QUEUE && n > keep_threads;
+             t = next)
+        {
+            next = t->_link;
+            t->_link = END_TSO_QUEUE;
+
+            // Should we keep this thread?
+            if (t->bound == task->incall // don't move my bound thread
+                || tsoLocked(t) // don't move a locked thread
+                ) {
+                if (prev == END_TSO_QUEUE) {
+                    cap->run_queue_hd = t;
                 } else {
-                    appendToRunQueue(free_caps[i],t);
-                    cap->n_run_queue--;
-
-                    traceEventMigrateThread (cap, t, free_caps[i]->no);
-
-                    if (t->bound) { t->bound->task->cap = free_caps[i]; }
-                    t->cap = free_caps[i];
-                    i++;
+                    setTSOLink(cap, prev, t);
                 }
+                setTSOPrev(cap, t, prev);
+                prev = t;
+                if (keep_threads > 0) keep_threads--;
             }
-            cap->run_queue_tl = prev;
 
-            IF_DEBUG(sanity, checkRunQueue(cap));
-        }
+            // Or migrate it?
+            else {
+                appendToRunQueue(free_caps[i],t);
+                traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-#ifdef SPARK_PUSHING
-        /* JB I left this code in place, it would work but is not necessary */
-
-        // If there are some free capabilities that we didn't push any
-        // threads to, then try to push a spark to each one.
-        if (!pushed_to_all) {
-            StgClosure *spark;
-            // i is the next free capability to push to
-            for (; i < n_free_caps; i++) {
-                if (emptySparkPoolCap(free_caps[i])) {
-                    spark = tryStealSpark(cap->sparks);
-                    if (spark != NULL) {
-                        /* TODO: if anyone wants to re-enable this code then
-                         * they must consider the fizzledSpark(spark) case
-                         * and update the per-cap spark statistics.
-                         */
-                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
-
-            traceEventStealSpark(free_caps[i], t, cap->no);
-
-                        newSpark(&(free_caps[i]->r), spark);
-                    }
-                }
+                if (t->bound) { t->bound->task->cap = free_caps[i]; }
+                t->cap = free_caps[i];
+                n--; // we have one fewer threads now
+                i++; // move on to the next free_cap
+                if (i == n_free_caps) i = 0;
             }
         }
-#endif /* SPARK_PUSHING */
+
+        // Join up the beginning of the queue (prev)
+        // with the rest of the queue (t)
+        if (t == END_TSO_QUEUE) {
+            cap->run_queue_tl = prev;
+        } else {
+            setTSOPrev(cap, t, prev);
+        }
+        if (prev == END_TSO_QUEUE) {
+            cap->run_queue_hd = t;
+        } else {
+            setTSOLink(cap, prev, t);
+        }
+        cap->n_run_queue = n;
+
+        IF_DEBUG(sanity, checkRunQueue(cap));
 
         // release the capabilities
         for (i = 0; i < n_free_caps; i++) {



More information about the ghc-commits mailing list