[Git][ghc/ghc][wip/marge_bot_batch_merge_job] 14 commits: Introduce CapIOManager as the per-cap I/O manager state
Marge Bot (@marge-bot)
gitlab at gitlab.haskell.org
Tue Nov 22 00:26:07 UTC 2022
Marge Bot pushed to branch wip/marge_bot_batch_merge_job at Glasgow Haskell Compiler / GHC
Commits:
bc7e0ffd by Duncan Coutts at 2022-11-21T19:25:50-05:00
Introduce CapIOManager as the per-cap I/O manager state
Rather than each I/O manager adding things to the Capability structure
ad hoc, we should have a common CapIOManager iomgr member of the
Capability structure, with a common interface to initialise it, etc.
The content of the CapIOManager struct will be defined differently for
each I/O manager implementation. Eventually we should be able to have
the CapIOManager be opaque to the rest of the RTS, and known just to the
I/O manager implementation. We plan for that by making the Capability
contain a pointer to the CapIOManager rather than containing the
structure directly.
Initially just move the Unix threaded I/O manager's control FD.
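For orientation, here is a minimal self-contained sketch of the intended
shape, using simplified stand-in types rather than the real RTS headers;
the field and helper names mirror the diff below, but the surrounding
details are illustrative only:

    #include <stdlib.h>

    /* Simplified stand-ins for the types in rts/IOManager.h and
     * rts/Capability.h. Initially only the Unix threaded I/O manager's
     * control FD lives here. */
    typedef struct {
        int control_fd;
    } CapIOManager;

    typedef struct {
        CapIOManager *iomgr;   /* a pointer, so the struct can later be made
                                * opaque to the rest of the RTS */
        /* ... other per-capability state ... */
    } Capability;

    /* Allocate and initialise the per-capability I/O manager state. */
    void initCapabilityIOManager(CapIOManager **piomgr)
    {
        CapIOManager *iomgr = malloc(sizeof(CapIOManager));
        if (iomgr == NULL) abort();
        iomgr->control_fd = -1;   /* no I/O manager attached yet */
        *piomgr = iomgr;
    }

    void initCapability(Capability *cap)
    {
        initCapabilityIOManager(&cap->iomgr);
        /* ... rest of capability initialisation ... */
    }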
- - - - -
a6a4d218 by Duncan Coutts at 2022-11-21T19:25:50-05:00
Add hook markCapabilityIOManager
To allow I/O managers to have GC roots in the Capability, within the
CapIOManager structure.
Not yet used in this patch.
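A hedged sketch of what the hook looks like once it has roots to report
(the queues shown here are only moved into CapIOManager by a later commit
in this series; evac_fn is the GC's root-visiting callback, simplified to
a plain function pointer over an opaque closure type):

    /* Simplified stand-ins: in the RTS, evac_fn comes from rts/sm/GC.h and
     * the queues hold StgTSO closures. */
    typedef struct StgClosure_ StgClosure;
    typedef void (*evac_fn)(void *user, StgClosure **root);

    typedef struct {
        StgClosure *blocked_queue_hd;
        StgClosure *blocked_queue_tl;
        StgClosure *sleeping_queue;
    } CapIOManager;

    /* GC hook: report every closure the I/O manager keeps alive as a root. */
    void markCapabilityIOManager(evac_fn evac, void *user, CapIOManager *iomgr)
    {
        evac(user, &iomgr->blocked_queue_hd);
        evac(user, &iomgr->blocked_queue_tl);
        evac(user, &iomgr->sleeping_queue);
    }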
- - - - -
b11cf279 by Duncan Coutts at 2022-11-21T19:25:50-05:00
Move APPEND_TO_BLOCKED_QUEUE from cmm to C
The I/O and delay blocking primitives for the non-threaded way
currently access the blocked_queue and sleeping_queue directly.
We want to move where those queues are to make their ownership clearer:
to have them clearly belong to the I/O manager impls rather than to the
scheduler. Ultimately we will want to change their representation too.
It's inconvenient to do that if these queues are accessed directly from
cmm code. So as a first step, replace the APPEND_TO_BLOCKED_QUEUE macro
with a C function appendToIOBlockedQueue(), and replace the open-coded
sleeping_queue insertion with insertIntoSleepingQueue().
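A self-contained sketch of the queue logic the C function carries over
from the cmm macro. It is simplified: the real code links StgTSOs with
setTSOLink, uses END_TSO_QUEUE rather than NULL as the sentinel, asserts
the link is already clear rather than assigning it, and at this point in
the series the queue still lives in scheduler globals; the IOBlockedQueue
holder type below is purely illustrative.

    #include <stddef.h>

    /* Stand-in for StgTSO: an intrusively linked thread descriptor. */
    typedef struct TSO_ {
        struct TSO_ *link;          /* the _link field of the real StgTSO */
    } TSO;

    typedef struct {
        TSO *blocked_queue_hd;      /* head of the I/O-blocked queue */
        TSO *blocked_queue_tl;      /* tail, kept for O(1) append */
    } IOBlockedQueue;

    /* Append tso at the tail, as APPEND_TO_BLOCKED_QUEUE did in cmm. */
    void appendToIOBlockedQueue(IOBlockedQueue *q, TSO *tso)
    {
        tso->link = NULL;
        if (q->blocked_queue_hd == NULL) {
            q->blocked_queue_hd = tso;
        } else {
            q->blocked_queue_tl->link = tso;
        }
        q->blocked_queue_tl = tso;
    }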
- - - - -
33459873 by Duncan Coutts at 2022-11-21T19:25:51-05:00
Move {blocked,sleeping}_queue from scheduler global vars to CapIOManager
The blocked_queue_{hd,tl} and the sleeping_queue are currently
cooperatively managed between the scheduler and (some but not all of)
the non-threaded I/O manager implementations.
They live as global vars owned by the scheduler, but are poked by I/O
primops and the I/O manager backends.
This patch is a step on the path towards making the management of I/O or
timer blocking belong to the I/O managers and not the scheduler.
Specifically, this patch moves the {blocked,sleeping}_queue from being
global vars in the scheduler to being members of the CapIOManager struct
within each Capability. They are not yet exclusively used by the I/O
managers: they are still poked from a couple of other places, notably in
the scheduler before calling awaitEvent.
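For orientation, the per-capability struct now looks roughly like this
(condensed from the rts/IOManager.h hunk in the diff below; StgTSO is
forward-declared here only to keep the sketch self-contained):

    typedef struct StgTSO_ StgTSO;

    typedef struct {
    #if defined(THREADED_RTS)
    #if !defined(mingw32_HOST_OS)
        int control_fd;            /* MIO control FD, threaded Unix only */
    #endif
    #else
        StgTSO *blocked_queue_hd;  /* threads blocked on I/O completion */
        StgTSO *blocked_queue_tl;
        StgTSO *sleeping_queue;    /* threads blocked on timeouts (select()) */
    #endif
    } CapIOManager;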
- - - - -
dac811e1 by Duncan Coutts at 2022-11-21T19:25:51-05:00
Remove the now-unused markScheduler
The global vars {blocked,sleeping}_queue are now in the Capability and
so get marked there via markCapabilityIOManager.
- - - - -
fcca3a83 by Duncan Coutts at 2022-11-21T19:25:51-05:00
Move macros for checking for pending I/O or timers
from Schedule.h to Schedule.c and IOManager.h.
This is just a move; the next step will be to rejig them slightly.
For the non-threaded RTS the scheduler needs to be able to test for
there being pending I/O operations or pending timers. The implementation
of these tests should really be considered to be part of the I/O
managers and not part of the scheduler.
- - - - -
6d52b9cd by Duncan Coutts at 2022-11-21T19:25:51-05:00
Replace EMPTY_{BLOCKED,SLEEPING}_QUEUE macros by function
These are the macros originally from Schedule.h, previously moved to
IOManager.h, and now replaced with a single inline function
anyPendingTimeoutsOrIO(). We can use a single function since the two
macros were always checked together.
Note that since anyPendingTimeoutsOrIO is defined for all I/O manager
cases, including threaded, we do not need to guard its use with
#if !defined(THREADED_RTS).
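A simplified, self-contained sketch of the combined test. The real inline
function in rts/IOManager.h is cpp-conditional on the I/O manager, uses
END_TSO_QUEUE rather than NULL as the empty-queue sentinel, and simply
returns false for the threaded RTS.

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct StgTSO_ StgTSO;      /* opaque stand-in */

    typedef struct {
        StgTSO *blocked_queue_hd;       /* threads blocked on I/O */
        StgTSO *sleeping_queue;         /* threads blocked on timeouts */
    } CapIOManager;

    /* Replaces EMPTY_BLOCKED_QUEUE() and EMPTY_SLEEPING_QUEUE(), which were
     * only ever consulted together. */
    static inline bool anyPendingTimeoutsOrIO(CapIOManager *iomgr)
    {
        return iomgr->blocked_queue_hd != NULL
            || iomgr->sleeping_queue   != NULL;
    }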
- - - - -
d8fed59e by Duncan Coutts at 2022-11-21T19:25:51-05:00
Expand emptyThreadQueues inline for clarity
It was not really adding anything. The name no longer meant anything
since those I/O and timeout queues do not belong to the scheduler.
In one of the two places it was used, the comments already had to
explain what it did, whereas now the code matches the comment nicely.
- - - - -
12d0291f by Duncan Coutts at 2022-11-21T19:25:51-05:00
Move the awaitEvent declaration into IOManager.h
And add or adjust comments at the use sites of awaitEvent.
- - - - -
8d22839e by Duncan Coutts at 2022-11-21T19:25:51-05:00
Pass the Capability *cap explicitly to awaitEvent
It is currently only used in the non-threaded RTS, so it works to use
MainCapability, but it's a bit nicer to pass the cap anyway. It's
certainly shorter.
- - - - -
01440674 by Duncan Coutts at 2022-11-21T19:25:51-05:00
Pass the Capability *cap explicitly to appendToIOBlockedQueue
And to insertIntoSleepingQueue. Again, it's a bit cleaner and simpler
though not strictly necessary given that these primops are currently
only used in the non-threaded RTS.
- - - - -
60de539f by Duncan Coutts at 2022-11-21T19:25:51-05:00
Review feedback: improve one of the TODO comments
The one about the nonsensical (const False) test on WinIO for there being any
I/O or timers pending, which leads to unnecessary complication later in the
scheduler.
- - - - -
d6f4019c by Andreas Klebinger at 2022-11-21T19:25:51-05:00
Optimize getLevity.
Avoid the intermediate data structures allocated by splitTyConApp.
This avoids ~0.5% of allocations for a build using -O2.
Fixes #22254
- - - - -
06f37deb by Andreas Klebinger at 2022-11-21T19:25:51-05:00
hadrian: Set TNTC when running the testsuite.
- - - - -
18 changed files:
- compiler/GHC/Core/Type.hs
- hadrian/src/Settings/Builders/RunTest.hs
- − rts/AwaitEvent.h
- rts/Capability.c
- rts/Capability.h
- rts/IOManager.c
- rts/IOManager.h
- rts/PrimOps.cmm
- rts/RaiseAsync.c
- rts/Schedule.c
- rts/Schedule.h
- rts/posix/Select.c
- rts/posix/Signals.c
- rts/sm/Compact.c
- rts/sm/GC.c
- rts/sm/NonMoving.c
- rts/win32/AsyncMIO.c
- rts/win32/AwaitEvent.c
Changes:
=====================================
compiler/GHC/Core/Type.hs
=====================================
@@ -260,7 +260,7 @@ import GHC.Builtin.Types.Prim
import {-# SOURCE #-} GHC.Builtin.Types
( charTy, naturalTy
, typeSymbolKind, liftedTypeKind, unliftedTypeKind
- , boxedRepDataConTyCon, constraintKind, zeroBitTypeKind
+ , constraintKind, zeroBitTypeKind
, manyDataConTy, oneDataConTy
, liftedRepTy, unliftedRepTy, zeroBitRepTy )
@@ -596,6 +596,8 @@ interfaces. Notably this plays a role in tcTySigs in GHC.Tc.Gen.Bind.
--
-- @isTyConKeyApp_maybe key ty@ returns @Just tys@ iff
-- the type @ty = T tys@, where T's unique = key
+-- key must not be `fUNTyConKey`; to test for functions, use `splitFunTy_maybe`.
+-- Thanks to this fact, we don't have to pattern match on `FunTy` here.
isTyConKeyApp_maybe :: Unique -> Type -> Maybe [Type]
isTyConKeyApp_maybe key ty
| TyConApp tc args <- coreFullView ty
@@ -2313,8 +2315,11 @@ getRuntimeRep ty
getLevity_maybe :: HasDebugCallStack => Type -> Maybe Type
getLevity_maybe ty
| Just rep <- getRuntimeRep_maybe ty
- , Just (tc, [lev]) <- splitTyConApp_maybe rep
- , tc == boxedRepDataConTyCon
+ -- Directly matching on TyConApp after expanding type synonyms
+ -- saves allocations compared to `splitTyConApp_maybe`. See #22254.
+ -- Given that this is a pretty hot function we make use of the fact
+ -- and use isTyConKeyApp_maybe instead.
+ , Just [lev] <- isTyConKeyApp_maybe boxedRepDataConKey rep
= Just lev
| otherwise
= Nothing
=====================================
hadrian/src/Settings/Builders/RunTest.hs
=====================================
@@ -60,7 +60,6 @@ runTestGhcFlags = do
, pure "-dno-debug-output"
]
-
data TestCompilerArgs = TestCompilerArgs{
hasDynamicRts, hasThreadedRts :: Bool
, hasDynamic :: Bool
@@ -68,6 +67,7 @@ data TestCompilerArgs = TestCompilerArgs{
, withNativeCodeGen :: Bool
, withInterpreter :: Bool
, unregisterised :: Bool
+ , tables_next_to_code :: Bool
, withSMP :: Bool
, debugAssertions :: Bool
-- ^ Whether the compiler has debug assertions enabled,
@@ -99,6 +99,7 @@ inTreeCompilerArgs stg = do
leadingUnderscore <- flag LeadingUnderscore
withInterpreter <- ghcWithInterpreter
unregisterised <- flag GhcUnregisterised
+ tables_next_to_code <- flag TablesNextToCode
withSMP <- targetSupportsSMP
debugAssertions <- ($ stg) . ghcDebugAssertions <$> flavour
profiled <- ghcProfiled <$> flavour <*> pure stg
@@ -144,6 +145,7 @@ outOfTreeCompilerArgs = do
withNativeCodeGen <- getBooleanSetting TestGhcWithNativeCodeGen
withInterpreter <- getBooleanSetting TestGhcWithInterpreter
unregisterised <- getBooleanSetting TestGhcUnregisterised
+ tables_next_to_code <- getBooleanSetting TestGhcUnregisterised
withSMP <- getBooleanSetting TestGhcWithSMP
debugAssertions <- getBooleanSetting TestGhcDebugged
@@ -254,6 +256,7 @@ runTestBuilderArgs = builder Testsuite ? do
, arg "-e", arg $ "config.have_interp=" ++ show withInterpreter
, arg "-e", arg $ "config.unregisterised=" ++ show unregisterised
+ , arg "-e", arg $ "config.tables_next_to_code=" ++ show tables_next_to_code
, arg "-e", arg $ "ghc_compiler_always_flags=" ++ quote ghcFlags
, arg "-e", arg $ asBool "ghc_with_dynamic_rts=" (hasDynamicRts)
=====================================
rts/AwaitEvent.h deleted
=====================================
@@ -1,21 +0,0 @@
-/* -----------------------------------------------------------------------------
- *
- * (c) The GHC Team 1998-2005
- *
- * The awaitEvent() interface, for the non-threaded RTS
- *
- * -------------------------------------------------------------------------*/
-
-#pragma once
-
-#if !defined(THREADED_RTS)
-/* awaitEvent(bool wait)
- *
- * Checks for blocked threads that need to be woken.
- *
- * Called from STG : NO
- * Locks assumed : sched_mutex
- */
-RTS_PRIVATE void awaitEvent(bool wait); /* In posix/Select.c or
- * win32/AwaitEvent.c */
-#endif
=====================================
rts/Capability.c
=====================================
@@ -278,12 +278,11 @@ initCapability (Capability *cap, uint32_t i)
cap->spark_stats.converted = 0;
cap->spark_stats.gcd = 0;
cap->spark_stats.fizzled = 0;
-#if !defined(mingw32_HOST_OS)
- cap->io_manager_control_wr_fd = -1;
-#endif
#endif
cap->total_allocated = 0;
+ initCapabilityIOManager(&cap->iomgr);
+
cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
@@ -1323,6 +1322,8 @@ markCapability (evac_fn evac, void *user, Capability *cap,
}
#endif
+ markCapabilityIOManager(evac, user, cap->iomgr);
+
// Free STM structures for this Capability
stmPreGCHook(cap);
}
=====================================
rts/Capability.h
=====================================
@@ -24,6 +24,7 @@
#include "Task.h"
#include "Sparks.h"
#include "sm/NonMovingMark.h" // for MarkQueue
+#include "IOManager.h" // for CapIOManager
#include "BeginPrivate.h"
@@ -157,12 +158,11 @@ struct Capability_ {
// Stats on spark creation/conversion
SparkCounters spark_stats;
-#if !defined(mingw32_HOST_OS)
- // IO manager for this cap
- int io_manager_control_wr_fd;
-#endif
#endif
+ // I/O manager data structures for this capability
+ CapIOManager *iomgr;
+
// Per-capability STM-related data
StgTVarWatchQueue *free_tvar_watch_queues;
StgTRecChunk *free_trec_chunks;
=====================================
rts/IOManager.c
=====================================
@@ -19,7 +19,9 @@
#include "rts/IOInterface.h" // exported
#include "IOManager.h" // RTS internal
#include "Capability.h"
+#include "Schedule.h"
#include "RtsFlags.h"
+#include "RtsUtils.h"
#if !defined(mingw32_HOST_OS) && defined(HAVE_SIGNAL_H)
#include "posix/Signals.h"
@@ -32,7 +34,30 @@
#endif
-/* Called in the RTS initialisation
+/* Allocate and initialise the per-capability CapIOManager that lives in each
+ * Capability. Called early in the RTS initialisation.
+ */
+void initCapabilityIOManager(CapIOManager **piomgr)
+{
+ CapIOManager *iomgr =
+ (CapIOManager *) stgMallocBytes(sizeof(CapIOManager),
+ "initCapabilityIOManager");
+
+#if defined(THREADED_RTS)
+#if !defined(mingw32_HOST_OS)
+ iomgr->control_fd = -1;
+#endif
+#else // !defined(THREADED_RTS)
+ iomgr->blocked_queue_hd = END_TSO_QUEUE;
+ iomgr->blocked_queue_tl = END_TSO_QUEUE;
+ iomgr->sleeping_queue = END_TSO_QUEUE;
+#endif
+
+ *piomgr = iomgr;
+}
+
+
+/* Called late in the RTS initialisation
*/
void
initIOManager(void)
@@ -131,6 +156,19 @@ void wakeupIOManager(void)
#endif
}
+void markCapabilityIOManager(evac_fn evac USED_IF_NOT_THREADS,
+ void *user USED_IF_NOT_THREADS,
+ CapIOManager *iomgr USED_IF_NOT_THREADS)
+{
+
+#if !defined(THREADED_RTS)
+ evac(user, (StgClosure **)(void *)&iomgr->blocked_queue_hd);
+ evac(user, (StgClosure **)(void *)&iomgr->blocked_queue_tl);
+ evac(user, (StgClosure **)(void *)&iomgr->sleeping_queue);
+#endif
+
+}
+
/* Declared in rts/IOInterface.h. Used only by the MIO threaded I/O manager on
* Unix platforms.
@@ -140,10 +178,42 @@ void
setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
#if defined(THREADED_RTS)
if (cap_no < n_capabilities) {
- RELAXED_STORE(&capabilities[cap_no]->io_manager_control_wr_fd, fd);
+ RELAXED_STORE(&capabilities[cap_no]->iomgr->control_fd, fd);
} else {
errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
}
#endif
}
#endif
+
+#if !defined(THREADED_RTS)
+void appendToIOBlockedQueue(Capability *cap, StgTSO *tso)
+{
+ CapIOManager *iomgr = cap->iomgr;
+ ASSERT(tso->_link == END_TSO_QUEUE);
+ if (iomgr->blocked_queue_hd == END_TSO_QUEUE) {
+ iomgr->blocked_queue_hd = tso;
+ } else {
+ setTSOLink(cap, iomgr->blocked_queue_tl, tso);
+ }
+ iomgr->blocked_queue_tl = tso;
+}
+
+void insertIntoSleepingQueue(Capability *cap, StgTSO *tso, LowResTime target)
+{
+ CapIOManager *iomgr = cap->iomgr;
+ StgTSO *prev = NULL;
+ StgTSO *t = iomgr->sleeping_queue;
+ while (t != END_TSO_QUEUE && t->block_info.target < target) {
+ prev = t;
+ t = t->_link;
+ }
+
+ tso->_link = t;
+ if (prev == NULL) {
+ iomgr->sleeping_queue = tso;
+ } else {
+ setTSOLink(cap, prev, tso);
+ }
+}
+#endif
=====================================
rts/IOManager.h
=====================================
@@ -21,7 +21,58 @@
#include "BeginPrivate.h"
-/* Init hook: called from hs_init_ghc.
+#include "sm/GC.h" // for evac_fn
+#include "posix/Select.h" // for LowResTime TODO: switch to normal Time
+
+
+/* The per-capability data structures belonging to the I/O manager.
+ *
+ * It can be accessed as cap->iomgr.
+ *
+ * The content of the structure is defined conditionally so it is different for
+ * each I/O manager implementation.
+ *
+ * TODO: once the content of this struct is genuinely private, and not shared
+ * with other parts of the RTS, then it can be made opaque, so the content is
+ * known only to the I/O manager and not the rest of the RTS.
+ */
+typedef struct {
+
+#if defined(THREADED_RTS)
+#if !defined(mingw32_HOST_OS)
+ /* Control FD for the MIO manager for this capability */
+ int control_fd;
+#endif
+#else // !defined(THREADED_RTS)
+ /* Thread queue for threads blocked on I/O completion.
+ * Used by the select() and Win32 MIO I/O managers. It is not used by
+ * the WinIO I/O manager, though it remains defined in this case.
+ */
+ StgTSO *blocked_queue_hd;
+ StgTSO *blocked_queue_tl;
+
+ /* Thread queue for threads blocked on timeouts.
+ * Used by the select() I/O manager only. It is grossly inefficient, like
+ * everything else to do with the select() I/O manager.
+ *
+ * TODO: It is not used by any of the Windows I/O managers, though it
+ * remains defined for them. This is an oddity that should be resolved.
+ */
+ StgTSO *sleeping_queue;
+#endif
+
+} CapIOManager;
+
+
+/* Allocate and initialise the per-capability CapIOManager that lives in each
+ * Capability. It is called from initCapability, via initScheduler,
+ * via hs_init_ghc.
+ */
+void initCapabilityIOManager(CapIOManager **iomgr);
+
+
+/* Init hook: called from hs_init_ghc, very late in the startup after almost
+ * everything else is done.
*/
void initIOManager(void);
@@ -66,6 +117,52 @@ void exitIOManager(bool wait_threads);
void wakeupIOManager(void);
+/* GC hook: mark any per-capability GC roots the I/O manager uses.
+ */
+void markCapabilityIOManager(evac_fn evac, void *user, CapIOManager *iomgr);
+
+
+#if !defined(THREADED_RTS)
+/* Add a thread to the end of the queue of threads blocked on I/O.
+ *
+ * This is used by the select() and the Windows MIO non-threaded I/O manager
+ * implementation.
+ */
+void appendToIOBlockedQueue(Capability *cap, StgTSO *tso);
+
+/* Insert a thread into the queue of threads blocked on timers.
+ *
+ * This is used by the select() I/O manager implementation only.
+ *
+ * The sleeping queue is defined for other non-threaded I/O managers but not
+ * used. This is a wart that should be excised.
+ */
+void insertIntoSleepingQueue(Capability *cap, StgTSO *tso, LowResTime target);
+#endif
+
+/* Check to see if there are any pending timeouts or I/O operations
+ * in progress with the I/O manager.
+ *
+ * This is used by the scheduler as part of deadlock-detection, and the
+ * "context switch as often as possible" test.
+ */
+INLINE_HEADER bool anyPendingTimeoutsOrIO(CapIOManager *iomgr);
+
+
+#if !defined(THREADED_RTS)
+/* Check whether there is any completed I/O or expired timers. If so,
+ * process the competions as appropriate, which will typically cause some
+ * waiting threads to be woken up.
+ *
+ * Called from schedule() both *before* and *after* scheduleDetectDeadlock().
+ *
+ * Defined in posix/Select.c
+ * or win32/AwaitEvent.c
+ */
+void awaitEvent(Capability *cap, bool wait);
+#endif
+
+
/* Pedantic warning cleanliness
*/
#if !defined(THREADED_RTS) && defined(mingw32_HOST_OS)
@@ -80,5 +177,40 @@ void wakeupIOManager(void);
#define USED_IF_THREADS_AND_NOT_MINGW32 STG_UNUSED
#endif
+/* -----------------------------------------------------------------------------
+ * INLINE functions... private from here on down.
+ *
+ * Some of these hooks are performance sensitive so parts of them are
+ * implemented here so they can be inlined.
+ * -----------------------------------------------------------------------------
+ */
+
+INLINE_HEADER bool anyPendingTimeoutsOrIO(CapIOManager *iomgr USED_IF_NOT_THREADS)
+{
+#if defined(THREADED_RTS)
+ /* For the purpose of the scheduler, the threaded I/O managers never have
+ pending I/O or timers. Of course in reality they do, but they're
+ managed via other primitives that the scheduler can see into (threads,
+ MVars and foreign blocking calls).
+ */
+ return false;
+#else
+#if defined(mingw32_HOST_OS)
+ /* The MIO I/O manager uses the blocked_queue, while the WinIO does not.
+ Note: the latter fact makes this test useless for the WinIO I/O manager,
+ and is the probable cause of the complication in the scheduler with
+ having to call awaitEvent in multiple places.
+
+ None of the Windows I/O managers use the sleeping_queue
+ */
+ return (iomgr->blocked_queue_hd != END_TSO_QUEUE);
+#else
+ /* The select() I/O manager uses the blocked_queue and the sleeping_queue.
+ */
+ return (iomgr->blocked_queue_hd != END_TSO_QUEUE)
+ || (iomgr->sleeping_queue != END_TSO_QUEUE);
+#endif
+#endif
+}
#include "EndPrivate.h"
=====================================
rts/PrimOps.cmm
=====================================
@@ -2571,18 +2571,6 @@ stg_whereFromzh (P_ clos)
Thread I/O blocking primitives
-------------------------------------------------------------------------- */
-/* Add a thread to the end of the blocked queue. (C-- version of the C
- * macro in Schedule.h).
- */
-#define APPEND_TO_BLOCKED_QUEUE(tso) \
- ASSERT(StgTSO__link(tso) == END_TSO_QUEUE); \
- if (W_[blocked_queue_hd] == END_TSO_QUEUE) { \
- W_[blocked_queue_hd] = tso; \
- } else { \
- ccall setTSOLink(MyCapability() "ptr", W_[blocked_queue_tl] "ptr", tso); \
- } \
- W_[blocked_queue_tl] = tso;
-
stg_waitReadzh ( W_ fd )
{
#if defined(THREADED_RTS)
@@ -2594,7 +2582,7 @@ stg_waitReadzh ( W_ fd )
StgTSO_block_info(CurrentTSO) = fd;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_noregs();
#endif
}
@@ -2610,7 +2598,7 @@ stg_waitWritezh ( W_ fd )
StgTSO_block_info(CurrentTSO) = fd;
// No locking - we're not going to use this interface in the
// threaded RTS anyway.
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_noregs();
#endif
}
@@ -2647,32 +2635,16 @@ stg_delayzh ( W_ us_delay )
* delayed thread on the blocked_queue.
*/
StgTSO_why_blocked(CurrentTSO) = BlockedOnDoProc::I16;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async_void();
#else
-
(target) = ccall getDelayTarget(us_delay);
StgTSO_block_info(CurrentTSO) = target;
- /* Insert the new thread in the sleeping queue. */
- prev = NULL;
- t = W_[sleeping_queue];
-while:
- if (t != END_TSO_QUEUE && StgTSO_block_info(t) < target) {
- prev = t;
- t = StgTSO__link(t);
- goto while;
- }
-
- StgTSO__link(CurrentTSO) = t;
- if (prev == NULL) {
- W_[sleeping_queue] = CurrentTSO;
- } else {
- ccall setTSOLink(MyCapability() "ptr", prev "ptr", CurrentTSO);
- }
+ ccall insertIntoSleepingQueue(MyCapability() "ptr", CurrentTSO "ptr", target);
jump stg_block_noregs();
#endif
#endif /* !THREADED_RTS */
@@ -2700,7 +2672,7 @@ stg_asyncReadzh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
}
@@ -2725,7 +2697,7 @@ stg_asyncWritezh ( W_ fd, W_ is_sock, W_ len, W_ buf )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
}
@@ -2750,7 +2722,7 @@ stg_asyncDoProczh ( W_ proc, W_ param )
StgAsyncIOResult_len(ares) = 0;
StgAsyncIOResult_errCode(ares) = 0;
StgTSO_block_info(CurrentTSO) = ares;
- APPEND_TO_BLOCKED_QUEUE(CurrentTSO);
+ ccall appendToIOBlockedQueue(MyCapability() "ptr", CurrentTSO "ptr");
jump stg_block_async();
#endif
}
=====================================
rts/RaiseAsync.c
=====================================
@@ -708,7 +708,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
#endif
- removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
+ removeThreadFromDeQueue(cap, &cap->iomgr->blocked_queue_hd,
+ &cap->iomgr->blocked_queue_tl, tso);
#if defined(mingw32_HOST_OS)
/* (Cooperatively) signal that the worker thread should abort
* the request.
@@ -718,7 +719,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
goto done;
case BlockedOnDelay:
- removeThreadFromQueue(cap, &sleeping_queue, tso);
+ removeThreadFromQueue(cap, &cap->iomgr->sleeping_queue, tso);
goto done;
#endif
=====================================
rts/Schedule.c
=====================================
@@ -30,7 +30,6 @@
#include "Sparks.h"
#include "Capability.h"
#include "Task.h"
-#include "AwaitEvent.h"
#include "IOManager.h"
#if defined(mingw32_HOST_OS)
#include "win32/MIOManager.h"
@@ -70,13 +69,6 @@
* Global variables
* -------------------------------------------------------------------------- */
-#if !defined(THREADED_RTS)
-// Blocked/sleeping threads
-StgTSO *blocked_queue_hd = NULL;
-StgTSO *blocked_queue_tl = NULL;
-StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
-#endif
-
// Bytes allocated since the last time a HeapOverflow exception was thrown by
// the RTS
uint64_t allocated_bytes_at_heapoverflow = 0;
@@ -174,6 +166,7 @@ static void deleteAllThreads (void);
static void deleteThread_(StgTSO *tso);
#endif
+
/* ---------------------------------------------------------------------------
Main scheduling loop.
@@ -323,7 +316,10 @@ schedule (Capability *initialCapability, Task *task)
/* Notify the I/O manager that we have nothing to do. If there are
any outstanding I/O requests we'll block here. If there are not
then this is a user error and we will abort soon. */
- awaitEvent (emptyRunQueue(cap));
+ /* TODO: see if we can rationalise these two awaitEvent calls before
+ * and after scheduleDetectDeadlock().
+ */
+ awaitEvent (cap, emptyRunQueue(cap));
#else
ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
@@ -404,8 +400,9 @@ schedule (Capability *initialCapability, Task *task)
* the user specified "context switch as often as possible", with
* +RTS -C0
*/
- if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
- && !emptyThreadQueues(cap)) {
+ if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 &&
+ (!emptyRunQueue(cap) ||
+ anyPendingTimeoutsOrIO(cap->iomgr))) {
RELAXED_STORE(&cap->context_switch, 1);
}
@@ -905,14 +902,34 @@ static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
- //
- // Check whether any waiting threads need to be woken up. If the
- // run queue is empty, and there are no other tasks running, we
- // can wait indefinitely for something to happen.
- //
- if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
+ /* Check whether there is any completed I/O or expired timers. If so,
+ * process the competions as appropriate, which will typically cause some
+ * waiting threads to be woken up.
+ *
+ * If the run queue is empty, and there are no other threads running, we
+ * can wait indefinitely for something to happen.
+ *
+ * TODO: see if we can rationalise these two awaitEvent calls before
+ * and after scheduleDetectDeadlock()
+ *
+ * TODO: this test anyPendingTimeoutsOrIO does not have a proper
+ * implementation the WinIO I/O manager!
+ *
+ * The select() I/O manager uses the sleeping_queue and the blocked_queue,
+ * and the test checks both. The legacy win32 I/O manager only consults
+ * the blocked_queue, but then it puts threads waiting on delay# on the
+ * blocked_queue too, so that's ok.
+ *
+ * The WinIO I/O manager does not use either the sleeping_queue or the
+ * blocked_queue, but it's implementation of anyPendingTimeoutsOrIO still
+ * checks both! Since both queues will _always_ be empty then it will
+ * _always_ return false and so awaitEvent will _never_ be called here for
+ * WinIO. This may explain why there is a second call to awaitEvent below
+ * for the case of !defined(THREADED_RTS) && defined(mingw32_HOST_OS).
+ */
+ if (anyPendingTimeoutsOrIO(cap->iomgr))
{
- awaitEvent (emptyRunQueue(cap));
+ awaitEvent (cap, emptyRunQueue(cap));
}
#endif
}
@@ -931,7 +948,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
* other tasks are waiting for work, we must have a deadlock of
* some description.
*/
- if ( emptyThreadQueues(cap) )
+ if ( emptyRunQueue(cap) && !anyPendingTimeoutsOrIO(cap->iomgr) )
{
#if defined(THREADED_RTS)
/*
@@ -2365,11 +2382,6 @@ deleteAllThreads ()
// somewhere, and the main scheduler loop has to deal with it.
// Also, the run queue is the only thing keeping these threads from
// being GC'd, and we don't want the "main thread has been GC'd" panic.
-
-#if !defined(THREADED_RTS)
- ASSERT(blocked_queue_hd == END_TSO_QUEUE);
- ASSERT(sleeping_queue == END_TSO_QUEUE);
-#endif
}
/* -----------------------------------------------------------------------------
@@ -2703,12 +2715,6 @@ startWorkerTasks (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
void
initScheduler(void)
{
-#if !defined(THREADED_RTS)
- blocked_queue_hd = END_TSO_QUEUE;
- blocked_queue_tl = END_TSO_QUEUE;
- sleeping_queue = END_TSO_QUEUE;
-#endif
-
sched_state = SCHED_RUNNING;
SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
@@ -2793,16 +2799,6 @@ freeScheduler( void )
#endif
}
-void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
- void *user USED_IF_NOT_THREADS)
-{
-#if !defined(THREADED_RTS)
- evac(user, (StgClosure **)(void *)&blocked_queue_hd);
- evac(user, (StgClosure **)(void *)&blocked_queue_tl);
- evac(user, (StgClosure **)(void *)&sleeping_queue);
-#endif
-}
-
/* -----------------------------------------------------------------------------
performGC
=====================================
rts/Schedule.h
=====================================
@@ -22,7 +22,6 @@
void initScheduler (void);
void exitScheduler (bool wait_foreign);
void freeScheduler (void);
-void markScheduler (evac_fn evac, void *user);
// Place a new thread on the run queue of the current Capability
void scheduleThread (Capability *cap, StgTSO *tso);
@@ -105,14 +104,6 @@ extern volatile StgWord sched_state;
*/
extern volatile StgWord recent_activity;
-/* Thread queues.
- * Locks required : sched_mutex
- */
-#if !defined(THREADED_RTS)
-extern StgTSO *blocked_queue_hd, *blocked_queue_tl;
-extern StgTSO *sleeping_queue;
-#endif
-
extern bool heap_overflow;
#if defined(THREADED_RTS)
@@ -155,30 +146,6 @@ peekRunQueue (Capability *cap)
void promoteInRunQueue (Capability *cap, StgTSO *tso);
-/* Add a thread to the end of the blocked queue.
- */
-#if !defined(THREADED_RTS)
-INLINE_HEADER void
-appendToBlockedQueue(StgTSO *tso)
-{
- ASSERT(tso->_link == END_TSO_QUEUE);
- if (blocked_queue_hd == END_TSO_QUEUE) {
- blocked_queue_hd = tso;
- } else {
- setTSOLink(&MainCapability, blocked_queue_tl, tso);
- }
- blocked_queue_tl = tso;
-}
-#endif
-
-/* Check whether various thread queues are empty
- */
-INLINE_HEADER bool
-emptyQueue (StgTSO *q)
-{
- return (q == END_TSO_QUEUE);
-}
-
INLINE_HEADER bool
emptyRunQueue(Capability *cap)
{
@@ -199,21 +166,6 @@ truncateRunQueue(Capability *cap)
cap->n_run_queue = 0;
}
-#if !defined(THREADED_RTS)
-#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd))
-#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
-#endif
-
-INLINE_HEADER bool
-emptyThreadQueues(Capability *cap)
-{
- return emptyRunQueue(cap)
-#if !defined(THREADED_RTS)
- && EMPTY_BLOCKED_QUEUE() && EMPTY_SLEEPING_QUEUE()
-#endif
- ;
-}
-
#endif /* !IN_STG_CODE */
#include "EndPrivate.h"
=====================================
rts/posix/Select.c
=====================================
@@ -19,7 +19,7 @@
#include "RtsUtils.h"
#include "Capability.h"
#include "Select.h"
-#include "AwaitEvent.h"
+#include "IOManager.h"
#include "Stats.h"
#include "GetTime.h"
@@ -93,23 +93,23 @@ LowResTime getDelayTarget (HsInt us)
* if this is true, then our time has expired.
* (idea due to Andy Gill).
*/
-static bool wakeUpSleepingThreads (LowResTime now)
+static bool wakeUpSleepingThreads (Capability *cap, LowResTime now)
{
+ CapIOManager *iomgr = cap->iomgr;
StgTSO *tso;
bool flag = false;
- while (sleeping_queue != END_TSO_QUEUE) {
- tso = sleeping_queue;
+ while (iomgr->sleeping_queue != END_TSO_QUEUE) {
+ tso = iomgr->sleeping_queue;
if (((long)now - (long)tso->block_info.target) < 0) {
break;
}
- sleeping_queue = tso->_link;
+ iomgr->sleeping_queue = tso->_link;
tso->why_blocked = NotBlocked;
tso->_link = END_TSO_QUEUE;
IF_DEBUG(scheduler, debugBelch("Waking up sleeping thread %"
FMT_StgThreadID "\n", tso->id));
- // MainCapability: this code is !THREADED_RTS
- pushOnRunQueue(&MainCapability,tso);
+ pushOnRunQueue(cap,tso);
flag = true;
}
return flag;
@@ -217,8 +217,9 @@ static enum FdState fdPollWriteState (int fd)
*
*/
void
-awaitEvent(bool wait)
+awaitEvent(Capability *cap, bool wait)
{
+ CapIOManager *iomgr = cap->iomgr;
StgTSO *tso, *prev, *next;
fd_set rfd,wfd;
int numFound;
@@ -243,7 +244,7 @@ awaitEvent(bool wait)
do {
now = getLowResTimeOfDay();
- if (wakeUpSleepingThreads(now)) {
+ if (wakeUpSleepingThreads(cap, now)) {
return;
}
@@ -253,7 +254,9 @@ awaitEvent(bool wait)
FD_ZERO(&rfd);
FD_ZERO(&wfd);
- for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
+ for(tso = iomgr->blocked_queue_hd;
+ tso != END_TSO_QUEUE;
+ tso = next) {
next = tso->_link;
/* On older FreeBSDs, FD_SETSIZE is unsigned. Cast it to signed int
@@ -298,7 +301,7 @@ awaitEvent(bool wait)
tv.tv_sec = 0;
tv.tv_usec = 0;
ptv = &tv;
- } else if (sleeping_queue != END_TSO_QUEUE) {
+ } else if (iomgr->sleeping_queue != END_TSO_QUEUE) {
/* SUSv2 allows implementations to have an implementation defined
* maximum timeout for select(2). The standard requires
* implementations to silently truncate values exceeding this maximum
@@ -317,7 +320,9 @@ awaitEvent(bool wait)
*/
const time_t max_seconds = 2678400; // 31 * 24 * 60 * 60
- Time min = LowResTimeToTime(sleeping_queue->block_info.target - now);
+ Time min = LowResTimeToTime(
+ iomgr->sleeping_queue->block_info.target - now
+ );
tv.tv_sec = TimeToSeconds(min);
if (tv.tv_sec < max_seconds) {
tv.tv_usec = TimeToUS(min) % 1000000;
@@ -350,7 +355,7 @@ awaitEvent(bool wait)
*/
#if defined(RTS_USER_SIGNALS)
if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
- startSignalHandlers(&MainCapability);
+ startSignalHandlers(cap);
return; /* still hold the lock */
}
#endif
@@ -363,12 +368,12 @@ awaitEvent(bool wait)
/* check for threads that need waking up
*/
- wakeUpSleepingThreads(getLowResTimeOfDay());
+ wakeUpSleepingThreads(cap, getLowResTimeOfDay());
/* If new runnable threads have arrived, stop waiting for
* I/O and run them.
*/
- if (!emptyRunQueue(&MainCapability)) {
+ if (!emptyRunQueue(cap)) {
return; /* still hold the lock */
}
}
@@ -385,7 +390,9 @@ awaitEvent(bool wait)
* traversed blocked TSOs. As a result you
* can't use functions accessing 'blocked_queue_hd'.
*/
- for(tso = blocked_queue_hd; tso != END_TSO_QUEUE; tso = next) {
+ for(tso = iomgr->blocked_queue_hd;
+ tso != END_TSO_QUEUE;
+ tso = next) {
next = tso->_link;
int fd;
enum FdState fd_state = RTS_FD_IS_BLOCKING;
@@ -422,7 +429,7 @@ awaitEvent(bool wait)
IF_DEBUG(scheduler,
debugBelch("Killing blocked thread %" FMT_StgThreadID
" on bad fd=%i\n", tso->id, fd));
- raiseAsync(&MainCapability, tso,
+ raiseAsync(cap, tso,
(StgClosure *)blockedOnBadFD_closure, false, NULL);
break;
case RTS_FD_IS_READY:
@@ -431,28 +438,29 @@ awaitEvent(bool wait)
tso->id));
tso->why_blocked = NotBlocked;
tso->_link = END_TSO_QUEUE;
- pushOnRunQueue(&MainCapability,tso);
+ pushOnRunQueue(cap,tso);
break;
case RTS_FD_IS_BLOCKING:
if (prev == NULL)
- blocked_queue_hd = tso;
+ iomgr->blocked_queue_hd = tso;
else
- setTSOLink(&MainCapability, prev, tso);
+ setTSOLink(cap, prev, tso);
prev = tso;
break;
}
}
if (prev == NULL)
- blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
+ iomgr->blocked_queue_hd =
+ iomgr->blocked_queue_tl = END_TSO_QUEUE;
else {
prev->_link = END_TSO_QUEUE;
- blocked_queue_tl = prev;
+ iomgr->blocked_queue_tl = prev;
}
}
} while (wait && sched_state == SCHED_RUNNING
- && emptyRunQueue(&MainCapability));
+ && emptyRunQueue(cap));
}
#endif /* THREADED_RTS */
=====================================
rts/posix/Signals.c
=====================================
@@ -203,11 +203,11 @@ ioManagerDie (void)
{
// Shut down IO managers
for (i=0; i < n_capabilities; i++) {
- const int fd = RELAXED_LOAD(&capabilities[i]->io_manager_control_wr_fd);
+ const int fd = RELAXED_LOAD(&capabilities[i]->iomgr->control_fd);
if (0 <= fd) {
r = write(fd, &byte, 1);
if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
- RELAXED_STORE(&capabilities[i]->io_manager_control_wr_fd, -1);
+ RELAXED_STORE(&capabilities[i]->iomgr->control_fd, -1);
}
}
}
=====================================
rts/sm/Compact.c
=====================================
@@ -20,7 +20,6 @@
#include "BlockAlloc.h"
#include "GC.h"
#include "Compact.h"
-#include "Schedule.h"
#include "Apply.h"
#include "Trace.h"
#include "Weak.h"
@@ -981,8 +980,6 @@ compact(StgClosure *static_objects,
// 1. thread the roots
markCapabilities((evac_fn)thread_root, NULL);
- markScheduler((evac_fn)thread_root, NULL);
-
// the weak pointer lists...
for (W_ g = 0; g < RtsFlags.GcFlags.generations; g++) {
if (generations[g].weak_ptr_list != NULL) {
=====================================
rts/sm/GC.c
=====================================
@@ -537,8 +537,6 @@ GarbageCollect (uint32_t collect_gen,
markCapability(mark_root, gct, cap, true/*don't mark sparks*/);
}
- markScheduler(mark_root, gct);
-
// Mark the weak pointer list, and prepare to detect dead weak pointers.
markWeakPtrList();
initWeakForGC();
=====================================
rts/sm/NonMoving.c
=====================================
@@ -25,7 +25,6 @@
#include "NonMovingSweep.h"
#include "NonMovingCensus.h"
#include "StablePtr.h" // markStablePtrTable
-#include "Schedule.h" // markScheduler
#include "Weak.h" // dead_weak_ptr_list
struct NonmovingHeap nonmovingHeap;
@@ -949,7 +948,6 @@ void nonmovingCollect(StgWeak **dead_weaks, StgTSO **resurrected_threads)
markCapability((evac_fn)markQueueAddRoot, mark_queue,
capabilities[n], true/*don't mark sparks*/);
}
- markScheduler((evac_fn)markQueueAddRoot, mark_queue);
nonmovingMarkWeakPtrList(mark_queue, *dead_weaks);
markStablePtrTable((evac_fn)markQueueAddRoot, mark_queue);
=====================================
rts/win32/AsyncMIO.c
=====================================
@@ -224,6 +224,8 @@ awaitRequests(bool wait)
#if !defined(THREADED_RTS)
// none of this is actually used in the threaded RTS
+ CapIOManager *iomgr = MainCapability.iomgr;
+
start:
#if 0
fprintf(stderr, "awaitRequests(): %d %d %d\n",
@@ -289,7 +291,7 @@ start:
unsigned int rID = completedTable[i].reqID;
prev = NULL;
- for(tso = blocked_queue_hd; tso != END_TSO_QUEUE;
+ for(tso = iomgr->blocked_queue_hd; tso != END_TSO_QUEUE;
tso = tso->_link) {
switch(tso->why_blocked) {
@@ -309,10 +311,11 @@ start:
if (prev) {
setTSOLink(&MainCapability, prev, tso->_link);
} else {
- blocked_queue_hd = tso->_link;
+ iomgr->blocked_queue_hd = tso->_link;
}
- if (blocked_queue_tl == tso) {
- blocked_queue_tl = prev ? prev : END_TSO_QUEUE;
+ if (iomgr->blocked_queue_tl == tso) {
+ iomgr->blocked_queue_tl = prev ? prev
+ : END_TSO_QUEUE;
}
// Terminates the run queue + this inner for-loop.
=====================================
rts/win32/AwaitEvent.c
=====================================
@@ -16,7 +16,7 @@
#include "Rts.h"
#include "RtsFlags.h"
#include "Schedule.h"
-#include "AwaitEvent.h"
+#include "IOManager.h"
#include <windows.h>
#include "win32/AsyncMIO.h"
#include "win32/AsyncWinIO.h"
@@ -28,7 +28,7 @@
static bool workerWaitingForRequests = false;
void
-awaitEvent(bool wait)
+awaitEvent(Capability *cap, bool wait)
{
do {
/* Try to de-queue completed IO requests
@@ -45,7 +45,7 @@ awaitEvent(bool wait)
// startSignalHandlers(), but this is the way that posix/Select.c
// does it and I'm feeling too paranoid to refactor it today --SDM
if (stg_pending_events != 0) {
- startSignalHandlers(&MainCapability);
+ startSignalHandlers(cap);
return;
}
@@ -57,7 +57,7 @@ awaitEvent(bool wait)
} while (wait
&& sched_state == SCHED_RUNNING
- && emptyRunQueue(&MainCapability)
+ && emptyRunQueue(cap)
);
}
#endif
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/319b0f6b31cd47d79968be754dbe193c181aea3b...06f37debbc8b70c3e4cdc90c48586b53af58a1cf