[commit: ghc] ghc-lwc2: scheduleThreadOnFreeCap is cognizant of the fact that capabilities can be grabbed but still be married to other LWT schedulers. This is achieved through a new capability field picked_up_by_ULS. (f937b9d)
Sivaramakrishnan Krishnamoorthy Chandrasekaran
t-sichan at microsoft.com
Sat Mar 2 06:36:38 CET 2013
Repository : http://darcs.haskell.org/ghc.git/
On branch : ghc-lwc2
http://hackage.haskell.org/trac/ghc/changeset/f937b9d8b9d5092505c1c6e95359ca2e48b44d77
>---------------------------------------------------------------
commit f937b9d8b9d5092505c1c6e95359ca2e48b44d77
Author: KC Sivaramakrishnan <chandras at cs.purdue.edu>
Date: Fri Mar 1 18:32:17 2013 -0500
scheduleThreadOnFreeCap is cognizant of the fact that capabilities can be grabbed but still be married to other LWT schedulers. This is achieved through a new capability field picked_up_by_ULS.
>---------------------------------------------------------------
rts/Capability.c | 1 +
rts/Capability.h | 6 ++++++
rts/Schedule.c | 37 +++++++++++++++++++++----------------
rts/Threads.c | 2 +-
4 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/rts/Capability.c b/rts/Capability.c
index 688f56d..f92a3af 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -250,6 +250,7 @@ initCapability( Capability *cap, nat i )
cap->spark_stats.converted = 0;
cap->spark_stats.gcd = 0;
cap->spark_stats.fizzled = 0;
+ cap->picked_up_by_ULS = rtsFalse;
#endif
cap->total_allocated = 0;
diff --git a/rts/Capability.h b/rts/Capability.h
index df8c669..18c3265 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -139,6 +139,12 @@ struct Capability_ {
// Stats on spark creation/conversion
SparkCounters spark_stats;
+
+ // Indicates that a scheduler has picked up this capability in order to
+ // initiate parallel execution. This is used by the
+ // scheduleThreadOnFreeCap function. It may be modified only after grabbing
+ // hold of the capability!
+ rtsBool picked_up_by_ULS;
#endif
// Total words allocated by this cap since rts start
W_ total_allocated;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index e95d470..84de8b8 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -757,7 +757,7 @@ scheduleYield (Capability **pcap, Task *task)
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
- static void
+static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
{
@@ -906,7 +906,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
* ------------------------------------------------------------------------- */
#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
- static void
+static void
scheduleStartSignalHandlers(Capability *cap)
{
if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
@@ -915,7 +915,7 @@ scheduleStartSignalHandlers(Capability *cap)
}
}
#else
- static void
+static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
@@ -925,7 +925,7 @@ scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
* Check for blocked threads that can be woken up.
* ------------------------------------------------------------------------- */
- static void
+static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
@@ -1073,7 +1073,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
* ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
- static void
+static void
scheduleSendPendingMessages(void)
{
@@ -1095,7 +1095,7 @@ scheduleSendPendingMessages(void)
* Process message in the current Capability's inbox
* ------------------------------------------------------------------------- */
- static void
+static void
scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
@@ -1142,7 +1142,7 @@ scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
* ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
- static void
+static void
scheduleActivateSpark(Capability *cap)
{
if (anySparks() && !cap->disabled)
@@ -1157,7 +1157,7 @@ scheduleActivateSpark(Capability *cap)
* After running a thread...
* ------------------------------------------------------------------------- */
- static void
+static void
schedulePostRunThread (Capability *cap, StgTSO *t)
{
// We have to be able to catch transactions that are in an infinite loop as a
@@ -1450,7 +1450,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
* Perform a heap census
* -------------------------------------------------------------------------- */
- static rtsBool
+static rtsBool
scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
// When we have +RTS -i0 and we're heap profiling, do a census at
@@ -2066,7 +2066,7 @@ forkProcess(HsStablePtr *entry
*
* ------------------------------------------------------------------------- */
- void
+void
setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
{
#if !defined(THREADED_RTS)
@@ -2211,7 +2211,7 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
* Delete all the threads in the system
* ------------------------------------------------------------------------- */
- static void
+static void
deleteAllThreads ( Capability *cap )
{
// NOTE: only safe to call if we own all capabilities.
@@ -2282,7 +2282,7 @@ suspendTask (Capability *cap, Task *task, rtsBool append_to_head)
task, incall);
}
- STATIC_INLINE void
+STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
InCall *incall;
@@ -2335,7 +2335,7 @@ relegateTask (Capability *cap, Task* task) {
* unbound worker thread.
* ------------------------------------------------------------------------- */
- void *
+void *
suspendThread (StgRegTable *reg, rtsBool interruptible)
{
Capability *cap;
@@ -2490,7 +2490,7 @@ resumeThread (void *task_)
* on this thread's stack before the scheduler is invoked.
* ------------------------------------------------------------------------ */
- void
+void
scheduleThread(Capability *cap, StgTSO *tso)
{
// The thread goes at the *end* of the run-queue, to avoid possible
@@ -2498,7 +2498,7 @@ scheduleThread(Capability *cap, StgTSO *tso)
appendToRunQueue(cap,tso);
}
- void
+void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
@@ -2524,6 +2524,7 @@ void scheduleThreadOnFreeCap (Capability* cap USED_IF_THREADS,
nat i;
rtsBool done = rtsFalse;
+ cap->picked_up_by_ULS = rtsTrue;
tso->flags |= TSO_LOCKED; //request affinity
retry:
@@ -2532,7 +2533,8 @@ retry:
if (cap != cap0 && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
|| cap->returning_tasks_hd != NULL
- || cap->inbox != (Message*)END_TSO_QUEUE) {
+ || cap->inbox != (Message*)END_TSO_QUEUE
+ || cap0->picked_up_by_ULS) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
@@ -2544,7 +2546,10 @@ retry:
}
if (!done) goto retry;
+ debugTrace (DEBUG_sched, "scheduleThreadOnFreeCap: thread %d scheduled on cap %d",
+ (int)tso->id, (int)cap0->no);
+ cap0->picked_up_by_ULS = rtsTrue;
tso->cap = cap0;
tso->why_blocked = NotBlocked;
diff --git a/rts/Threads.c b/rts/Threads.c
index 90be44d..546b3be 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -908,7 +908,7 @@ printThreadStatus(StgTSO *t)
debugBelch("\n");
}
- void
+void
printAllThreads(void)
{
StgTSO *t, *next;
More information about the ghc-commits
mailing list