[commit: ghc] ghc-lwc2: scheduleThreadOnFreeCap is cognitive of the fact that capabilities can be grabbed but still be married to other LWT schedulers. This is achieved through a new capability field picked_up_by_ULS. (f937b9d)

Sivaramakrishnan Krishnamoorthy Chandrasekaran t-sichan at microsoft.com
Sat Mar 2 06:36:38 CET 2013


Repository : http://darcs.haskell.org/ghc.git/

On branch  : ghc-lwc2

http://hackage.haskell.org/trac/ghc/changeset/f937b9d8b9d5092505c1c6e95359ca2e48b44d77

>---------------------------------------------------------------

commit f937b9d8b9d5092505c1c6e95359ca2e48b44d77
Author: KC Sivaramakrishnan <chandras at cs.purdue.edu>
Date:   Fri Mar 1 18:32:17 2013 -0500

    scheduleThreadOnFreeCap is cognitive of the fact that capabilities can be grabbed but still be married to other LWT schedulers. This is achieved through a new capability field picked_up_by_ULS.

>---------------------------------------------------------------

 rts/Capability.c |  1 +
 rts/Capability.h |  6 ++++++
 rts/Schedule.c   | 37 +++++++++++++++++++++----------------
 rts/Threads.c    |  2 +-
 4 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/rts/Capability.c b/rts/Capability.c
index 688f56d..f92a3af 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -250,6 +250,7 @@ initCapability( Capability *cap, nat i )
   cap->spark_stats.converted  = 0;
   cap->spark_stats.gcd        = 0;
   cap->spark_stats.fizzled    = 0;
+  cap->picked_up_by_ULS       = rtsFalse;
 #endif
   cap->total_allocated        = 0;
 
diff --git a/rts/Capability.h b/rts/Capability.h
index df8c669..18c3265 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -139,6 +139,12 @@ struct Capability_ {
 
     // Stats on spark creation/conversion
     SparkCounters spark_stats;
+
+    // Indicates that a scheduler has picked up this capability in order to
+    // initiate parallel execution. This is utilized by
+    // schedulerThreadOnFreeCap function. Can modify only after grabbing hold
+    // of the capability!
+    rtsBool picked_up_by_ULS;
 #endif
     // Total words allocated by this cap since rts start
     W_ total_allocated;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index e95d470..84de8b8 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -757,7 +757,7 @@ scheduleYield (Capability **pcap, Task *task)
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-    static void
+static void
 schedulePushWork(Capability *cap USED_IF_THREADS,
                  Task *task      USED_IF_THREADS)
 {
@@ -906,7 +906,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
  * ------------------------------------------------------------------------- */
 
 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
-    static void
+static void
 scheduleStartSignalHandlers(Capability *cap)
 {
     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
@@ -915,7 +915,7 @@ scheduleStartSignalHandlers(Capability *cap)
     }
 }
 #else
-    static void
+static void
 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
 {
 }
@@ -925,7 +925,7 @@ scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
  * Check for blocked threads that can be woken up.
  * ------------------------------------------------------------------------- */
 
-    static void
+static void
 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
 {
 #if !defined(THREADED_RTS)
@@ -1073,7 +1073,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-    static void
+static void
 scheduleSendPendingMessages(void)
 {
 
@@ -1095,7 +1095,7 @@ scheduleSendPendingMessages(void)
  * Process message in the current Capability's inbox
  * ------------------------------------------------------------------------- */
 
-    static void
+static void
 scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
@@ -1142,7 +1142,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
  * ------------------------------------------------------------------------- */
 
 #if defined(THREADED_RTS)
-    static void
+static void
 scheduleActivateSpark(Capability *cap)
 {
     if (anySparks() && !cap->disabled)
@@ -1157,7 +1157,7 @@ scheduleActivateSpark(Capability *cap)
  * After running a thread...
  * ------------------------------------------------------------------------- */
 
-    static void
+static void
 schedulePostRunThread (Capability *cap, StgTSO *t)
 {
     // We have to be able to catch transactions that are in an infinite loop as a
@@ -1450,7 +1450,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
  * Perform a heap census
  * -------------------------------------------------------------------------- */
 
-    static rtsBool
+static rtsBool
 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
 {
     // When we have +RTS -i0 and we're heap profiling, do a census at
@@ -2066,7 +2066,7 @@ forkProcess(HsStablePtr *entry
  *
  * ------------------------------------------------------------------------- */
 
-    void
+void
 setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
 {
 #if !defined(THREADED_RTS)
@@ -2211,7 +2211,7 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
  * Delete all the threads in the system
  * ------------------------------------------------------------------------- */
 
-    static void
+static void
 deleteAllThreads ( Capability *cap )
 {
     // NOTE: only safe to call if we own all capabilities.
@@ -2282,7 +2282,7 @@ suspendTask (Capability *cap, Task *task, rtsBool append_to_head)
                 task, incall);
 }
 
-    STATIC_INLINE void
+STATIC_INLINE void
 recoverSuspendedTask (Capability *cap, Task *task)
 {
     InCall *incall;
@@ -2335,7 +2335,7 @@ relegateTask (Capability *cap, Task* task) {
  * unbound worker thread.
  * ------------------------------------------------------------------------- */
 
-    void *
+void *
 suspendThread (StgRegTable *reg, rtsBool interruptible)
 {
     Capability *cap;
@@ -2490,7 +2490,7 @@ resumeThread (void *task_)
  * on this thread's stack before the scheduler is invoked.
  * ------------------------------------------------------------------------ */
 
-    void
+void
 scheduleThread(Capability *cap, StgTSO *tso)
 {
     // The thread goes at the *end* of the run-queue, to avoid possible
@@ -2498,7 +2498,7 @@ scheduleThread(Capability *cap, StgTSO *tso)
     appendToRunQueue(cap,tso);
 }
 
-    void
+void
 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
 {
     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
@@ -2524,6 +2524,7 @@ void scheduleThreadOnFreeCap (Capability* cap USED_IF_THREADS,
     nat i;
     rtsBool done = rtsFalse;
 
+    cap->picked_up_by_ULS = rtsTrue;
     tso->flags |= TSO_LOCKED; //request affinity
 
 retry:
@@ -2532,7 +2533,8 @@ retry:
         if (cap != cap0 && tryGrabCapability(cap0,task)) {
             if (!emptyRunQueue(cap0)
                 || cap->returning_tasks_hd != NULL
-                || cap->inbox != (Message*)END_TSO_QUEUE) {
+                || cap->inbox != (Message*)END_TSO_QUEUE
+                || cap0->picked_up_by_ULS) {
                 // it already has some work, we just grabbed it at
                 // the wrong moment.  Or maybe it's deadlocked!
                 releaseCapability(cap0);
@@ -2544,7 +2546,10 @@ retry:
     }
 
     if (!done) goto retry;
+    debugTrace (DEBUG_sched, "scheduleThreadOnFreeCap: thread %d scheduled on cap %d",
+                (int)tso->id, (int)cap0->no);
 
+    cap0->picked_up_by_ULS = rtsTrue;
     tso->cap = cap0;
     tso->why_blocked = NotBlocked;
 
diff --git a/rts/Threads.c b/rts/Threads.c
index 90be44d..546b3be 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -908,7 +908,7 @@ printThreadStatus(StgTSO *t)
   debugBelch("\n");
 }
 
-  void
+void
 printAllThreads(void)
 {
   StgTSO *t, *next;





More information about the ghc-commits mailing list