[Git][ghc/ghc][wip/jsem] Implement -jsem: parallelism controlled by semaphores
Matthew Pickering (@mpickering)
gitlab at gitlab.haskell.org
Wed Mar 22 11:36:33 UTC 2023
Matthew Pickering pushed to branch wip/jsem at Glasgow Haskell Compiler / GHC
1755da7f by sheaf at 2023-03-22T11:36:12+00:00
Implement -jsem: parallelism controlled by semaphores
See https://github.com/ghc-proposals/ghc-proposals/pull/540/ for a
complete description of the motivation for this feature.
The `-jsem` option allows a build tool to pass a semaphore to GHC which
GHC can use in order to control how much parallelism it requests.
GHC itself acts as a client in the GHC jobserver protocol.
GHC Jobserver Protocol
This proposal introduces the GHC Jobserver Protocol. This protocol allows
a server to dynamically invoke many instances of a client process,
while restricting all of those instances to use no more than <n> capabilities.
This is achieved by coordination over a system semaphore (either a POSIX
semaphore [6]_ in the case of Linux and Darwin, or a Win32 semaphore [7]_
in the case of Windows platforms).
There are two kinds of participants in the GHC Jobserver protocol:
- The *jobserver* creates a system semaphore with a certain number of
available tokens.
Each time the jobserver wants to spawn a new jobclient subprocess, it **must**
first acquire a single token from the semaphore, before spawning
the subprocess. This token **must** be released once the subprocess terminates.
Once work is finished, the jobserver **must** destroy the semaphore it created.
- A *jobclient* is a subprocess spawned by the jobserver or another jobclient.
Each jobclient starts with one available token (its *implicit token*,
which was acquired by the parent which spawned it), and can request more
tokens through the Jobserver Protocol by waiting on the semaphore.
Each time a jobclient wants to spawn a new jobclient subprocess, it **must**
pass on a single token to the child jobclient. This token can either be the
jobclient's implicit token, or another token which the jobclient acquired
from the semaphore.
Each jobclient **must** release exactly as many tokens as it has acquired from
the semaphore (this does not include the implicit tokens).
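The token accounting above can be sketched as a small pure model. This is only an illustration and not part of the patch: the semaphore is modelled as a plain counter, and the names `Pool`, `Client`, `spawnClient`, `acquireExtra`, `clientExit`, `reapClient` and `scenario` are hypothetical. A real jobserver would block on a system semaphore where this model returns `Nothing`.

```haskell
-- Hypothetical pure model of the protocol's token accounting; not GHC code.

-- | Number of tokens currently available in the (modelled) semaphore.
newtype Pool = Pool Int deriving (Eq, Show)

-- | A jobclient records only the tokens it acquired itself.
-- Its implicit token belongs to whoever spawned it.
newtype Client = Client Int deriving (Eq, Show)

-- | The jobserver acquires one token before spawning a client.
spawnClient :: Pool -> Maybe (Pool, Client)
spawnClient (Pool n)
  | n > 0     = Just (Pool (n - 1), Client 0)
  | otherwise = Nothing  -- a real jobserver would block here

-- | A client requests one additional token from the semaphore.
acquireExtra :: Pool -> Client -> Maybe (Pool, Client)
acquireExtra (Pool n) (Client k)
  | n > 0     = Just (Pool (n - 1), Client (k + 1))
  | otherwise = Nothing  -- a real client would wait on the semaphore

-- | On exit, a client releases exactly the tokens it acquired
-- (the implicit token is not included).
clientExit :: Pool -> Client -> Pool
clientExit (Pool n) (Client k) = Pool (n + k)

-- | Once the client has terminated, the jobserver releases the
-- token it acquired before spawning it.
reapClient :: Pool -> Pool
reapClient (Pool n) = Pool (n + 1)

-- | Spawn one client, let it take two extra tokens, then shut down.
scenario :: Pool -> Maybe Pool
scenario pool0 = do
  (pool1, c0) <- spawnClient pool0
  (pool2, c1) <- acquireExtra pool1 c0
  (pool3, c2) <- acquireExtra pool2 c1
  pure (reapClient (clientExit pool3 c2))

main :: IO ()
main = print (scenario (Pool 4) == Just (Pool 4))
```

If every participant follows the rules, the pool ends with as many tokens as it started with, which is what `main` checks for a pool of four tokens.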
Build tools such as cabal act as jobservers in the protocol and are
responsible for correctly creating, cleaning up and managing the
Fixes #19349
- - - - -
13 changed files:
- .gitmodules
- cabal.project-reinstall
- compiler/GHC/Driver/Make.hs
- + compiler/GHC/Driver/MakeSem.hs
- compiler/GHC/Driver/Pipeline/LogQueue.hs
- compiler/GHC/Driver/Session.hs
- compiler/ghc.cabal.in
- docs/users_guide/using.rst
- hadrian/src/Packages.hs
- hadrian/src/Rules/ToolArgs.hs
- hadrian/src/Settings/Default.hs
- + libraries/semaphore-compat
- packages
@@ -83,6 +83,10 @@
url = https://gitlab.haskell.org/ghc/packages/unix.git
ignore = untracked
branch = 2.7
+[submodule "libraries/semaphore-compat"]
+ path = libraries/semaphore-compat
+ url = https://gitlab.haskell.org/ghc/semaphore-compat.git
+ ignore = untracked
[submodule "libraries/stm"]
path = libraries/stm
url = https://gitlab.haskell.org/ghc/packages/stm.git
@@ -28,6 +28,7 @@ packages: ./compiler
-- ./libraries/pretty/
+ ./libraries/semaphore-compat
-- ./libraries/template-haskell/
@@ -75,6 +75,7 @@ import GHC.Driver.Env
import GHC.Driver.Errors
import GHC.Driver.Errors.Types
import GHC.Driver.Main
+import GHC.Driver.MakeSem
import GHC.Parser.Header
@@ -149,9 +150,9 @@ import GHC.Runtime.Loader
import GHC.Rename.Names
import GHC.Utils.Constants
import GHC.Types.Unique.DFM (udfmRestrictKeysSet)
-import qualified Data.IntSet as I
import GHC.Types.Unique
+import qualified Data.IntSet as I
-- -----------------------------------------------------------------------------
-- Loading the program
@@ -657,6 +658,30 @@ createBuildPlan mod_graph maybe_top_mod =
(vcat [text "Build plan missing nodes:", (text "PLAN:" <+> ppr (sum (map countMods build_plan))), (text "GRAPH:" <+> ppr (length (mgModSummaries' mod_graph )))])
+mkWorkerLimit :: DynFlags -> IO WorkerLimit
+mkWorkerLimit dflags =
+ case parMakeCount dflags of
+ Nothing -> pure $ num_procs 1
+ Just (ParMakeSemaphore h) -> pure (JSemLimit (SemaphoreName h))
+ Just ParMakeNumProcessors -> num_procs <$> getNumProcessors
+ Just (ParMakeThisMany n) -> pure $ num_procs n
+ where
+ num_procs x = NumProcessorsLimit (max 1 x)
+isWorkerLimitSequential :: WorkerLimit -> Bool
+isWorkerLimitSequential (NumProcessorsLimit x) = x <= 1
+isWorkerLimitSequential (JSemLimit {}) = False
+-- | This describes what we use to limit the number of jobs, either we limit it
+-- ourselves to a specific number or we have an external parallelism semaphore
+-- limit it for us.
+data WorkerLimit
+ = NumProcessorsLimit Int
+ | JSemLimit
+ SemaphoreName
+ -- ^ Semaphore name to use
+ deriving Eq
-- | Generalized version of 'load' which also supports a custom
-- 'Messager' (for reporting progress) and 'ModuleGraph' (generally
-- produced by calling 'depanal').
@@ -737,14 +762,12 @@ load' mhmi_cache how_much mHscMessage mod_graph = do
liftIO $ debugTraceMsg logger 2 (hang (text "Ready for upsweep")
2 (ppr build_plan))
- n_jobs <- case parMakeCount (hsc_dflags hsc_env) of
- Nothing -> liftIO getNumProcessors
- Just n -> return n
+ worker_limit <- liftIO $ mkWorkerLimit dflags
setSession $ hscUpdateHUG (unitEnv_map pruneHomeUnitEnv) hsc_env
(upsweep_ok, hsc_env1) <- withDeferredDiagnostics $ do
hsc_env <- getSession
- liftIO $ upsweep n_jobs hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
+ liftIO $ upsweep worker_limit hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
setSession hsc_env1
case upsweep_ok of
Failed -> loadFinish upsweep_ok
@@ -1029,13 +1052,7 @@ getDependencies direct_deps build_map =
type BuildM a = StateT BuildLoopState IO a
--- | Abstraction over the operations of a semaphore which allows usage with the
--- -j1 case
-data AbstractSem = AbstractSem { acquireSem :: IO ()
- , releaseSem :: IO () }
-withAbstractSem :: AbstractSem -> IO b -> IO b
-withAbstractSem sem = MC.bracket_ (acquireSem sem) (releaseSem sem)
-- | Environment used when compiling a module
data MakeEnv = MakeEnv { hsc_env :: !HscEnv -- The basic HscEnv which will be augmented for each module
@@ -1220,7 +1237,7 @@ withCurrentUnit uid = do
local (\env -> env { hsc_env = hscSetActiveUnitId uid (hsc_env env)})
- :: Int -- ^ The number of workers we wish to run in parallel
+ :: WorkerLimit -- ^ The number of workers we wish to run in parallel
-> HscEnv -- ^ The base HscEnv, which is augmented for each module
-> Maybe ModIfaceCache -- ^ A cache to incrementally write final interface files to
-> Maybe Messager
@@ -2825,7 +2842,7 @@ label_self thread_name = do
CC.labelThread self_tid thread_name
-runPipelines :: Int -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
+runPipelines :: WorkerLimit -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
-- Don't even initialise plugins if there are no pipelines
runPipelines _ _ _ [] = return ()
runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
@@ -2833,7 +2850,7 @@ runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
plugins_hsc_env <- initializePlugins orig_hsc_env
case n_job of
- 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
+ NumProcessorsLimit n | n <= 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
_n -> runParPipelines n_job plugins_hsc_env mHscMessager all_pipelines
runSeqPipelines :: HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
@@ -2843,16 +2860,38 @@ runSeqPipelines plugin_hsc_env mHscMessager all_pipelines =
, compile_sem = AbstractSem (return ()) (return ())
, env_messager = mHscMessager
- in runAllPipelines 1 env all_pipelines
+ in runAllPipelines (NumProcessorsLimit 1) env all_pipelines
+runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a
+runNjobsAbstractSem n_jobs action = do
+ compile_sem <- newQSem n_jobs
+ n_capabilities <- getNumCapabilities
+ n_cpus <- getNumProcessors
+ let
+ asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+ set_num_caps n = unless (n_capabilities /= 1) $ setNumCapabilities n
+ updNumCapabilities = do
+ -- Setting number of capabilities more than
+ -- CPU count usually leads to high userspace
+ -- lock contention. #9221
+ set_num_caps $ min n_jobs n_cpus
+ resetNumCapabilities = set_num_caps n_capabilities
+ MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem
+runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
+runWorkerLimit worker_limit action = case worker_limit of
+ NumProcessorsLimit n_jobs ->
+ runNjobsAbstractSem n_jobs action
+ JSemLimit sem ->
+ runJSemAbstractSem sem action
-- | Build and run a pipeline
-runParPipelines :: Int -- ^ How many capabilities to use
- -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module
+runParPipelines :: WorkerLimit -- ^ How to limit work parallelism
+ -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module
-> Maybe Messager -- ^ Optional custom messager to use to report progress
-> [MakeAction] -- ^ The build plan for all the module nodes
-> IO ()
-runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
+runParPipelines worker_limit plugin_hsc_env mHscMessager all_pipelines = do
-- A variable which we write to when an error has happened and we have to tell the
@@ -2862,39 +2901,23 @@ runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
-- will add its LogQueue into this queue.
log_queue_queue_var <- newTVarIO newLogQueueQueue
-- Thread which coordinates the printing of logs
- wait_log_thread <- logThread n_jobs (length all_pipelines) (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
+ wait_log_thread <- logThread (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
-- Make the logger thread-safe, in case there is some output which isn't sent via the LogQueue.
thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env)
let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger }
- let updNumCapabilities = liftIO $ do
- n_capabilities <- getNumCapabilities
- n_cpus <- getNumProcessors
- -- Setting number of capabilities more than
- -- CPU count usually leads to high userspace
- -- lock contention. #9221
- let n_caps = min n_jobs n_cpus
- unless (n_capabilities /= 1) $ setNumCapabilities n_caps
- return n_capabilities
- let resetNumCapabilities orig_n = do
- liftIO $ setNumCapabilities orig_n
- atomically $ writeTVar stopped_var True
- wait_log_thread
- compile_sem <- newQSem n_jobs
- let abstract_sem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+ runWorkerLimit worker_limit $ \abstract_sem -> do
+ let env = MakeEnv { hsc_env = thread_safe_hsc_env
+ , withLogger = withParLog log_queue_queue_var
+ , compile_sem = abstract_sem
+ , env_messager = mHscMessager
+ }
-- Reset the number of capabilities once the upsweep ends.
- let env = MakeEnv { hsc_env = thread_safe_hsc_env
- , withLogger = withParLog log_queue_queue_var
- , compile_sem = abstract_sem
- , env_messager = mHscMessager
- }
- MC.bracket updNumCapabilities resetNumCapabilities $ \_ ->
- runAllPipelines n_jobs env all_pipelines
+ runAllPipelines worker_limit env all_pipelines
+ atomically $ writeTVar stopped_var True
+ wait_log_thread
withLocalTmpFS :: RunMakeM a -> RunMakeM a
withLocalTmpFS act = do
@@ -2911,10 +2934,11 @@ withLocalTmpFS act = do
MC.bracket initialiser finaliser $ \lcl_hsc_env -> local (\env -> env { hsc_env = lcl_hsc_env}) act
-- | Run the given actions and then wait for them all to finish.
-runAllPipelines :: Int -> MakeEnv -> [MakeAction] -> IO ()
-runAllPipelines n_jobs env acts = do
- let spawn_actions :: IO [ThreadId]
- spawn_actions = if n_jobs == 1
+runAllPipelines :: WorkerLimit -> MakeEnv -> [MakeAction] -> IO ()
+runAllPipelines worker_limit env acts = do
+ let single_worker = isWorkerLimitSequential worker_limit
+ spawn_actions :: IO [ThreadId]
+ spawn_actions = if single_worker
then (:[]) <$> (forkIOWithUnmask $ \unmask -> void $ runLoop (\io -> io unmask) env acts)
else runLoop forkIOWithUnmask env acts
@@ -0,0 +1,548 @@
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE NumericUnderscores #-}
+-- | Implementation of a jobserver using system semaphores.
+module GHC.Driver.MakeSem
+ ( -- * JSem: parallelism semaphore backed
+ -- by a system semaphore (Posix/Windows)
+ runJSemAbstractSem
+ -- * System semaphores
+ , Semaphore, SemaphoreName(..)
+ -- * Abstract semaphores
+ , AbstractSem(..)
+ , withAbstractSem
+ )
+ where
+import GHC.Prelude
+import GHC.Conc
+import GHC.Data.OrdList
+import GHC.IO.Exception
+import GHC.Utils.Outputable
+import GHC.Utils.Panic
+import GHC.Utils.Json
+import System.Semaphore
+import Control.Monad
+import qualified Control.Monad.Catch as MC
+import Control.Concurrent.MVar
+import Control.Concurrent.STM
+import Data.Foldable
+import Data.Functor
+import GHC.Stack
+import Debug.Trace
+-- Semaphore jobserver
+-- | A jobserver based off a system 'Semaphore'.
+-- Keeps track of the pending jobs and resources
+-- available from the semaphore.
+data Jobserver
+ = Jobserver
+ { jSemaphore :: !Semaphore
+ -- ^ The semaphore which controls available resources
+ , jobs :: !(TVar JobResources)
+ -- ^ The currently pending jobs, and the resources
+ -- obtained from the semaphore
+ }
+data JobserverOptions
+ = JobserverOptions
+ { releaseDebounce :: !Int
+ -- ^ Minimum delay, in milliseconds, between acquiring a token
+ -- and releasing a token.
+ , setNumCapsDebounce :: !Int
+ -- ^ Minimum delay, in milliseconds, between two consecutive
+ -- calls of 'setNumCapabilities'.
+ }
+defaultJobserverOptions :: JobserverOptions
+defaultJobserverOptions =
+ JobserverOptions
+ { releaseDebounce = 1000 -- 1 second
+ , setNumCapsDebounce = 1000 -- 1 second
+ }
+-- | Resources available for running jobs, i.e.
+-- tokens obtained from the parallelism semaphore.
+data JobResources
+ = Jobs
+ { tokensOwned :: !Int
+ -- ^ How many tokens have been claimed from the semaphore
+ , tokensFree :: !Int
+ -- ^ How many tokens are not currently being used
+ , jobsWaiting :: !(OrdList (TMVar ()))
+ -- ^ Pending jobs waiting on a token, the job will be blocked on the TMVar so putting into
+ -- the TMVar will allow the job to continue.
+ }
+instance Outputable JobResources where
+ ppr Jobs{..}
+ = text "JobResources" <+>
+ ( braces $ hsep
+ [ text "owned=" <> ppr tokensOwned
+ , text "free=" <> ppr tokensFree
+ , text "num_waiting=" <> ppr (length jobsWaiting)
+ ] )
+-- | Add one new token.
+addToken :: JobResources -> JobResources
+addToken jobs@( Jobs { tokensOwned = owned, tokensFree = free })
+ = jobs { tokensOwned = owned + 1, tokensFree = free + 1 }
+-- | Free one token.
+addFreeToken :: JobResources -> JobResources
+addFreeToken jobs@( Jobs { tokensFree = free })
+ = assertPpr (tokensOwned jobs > free)
+ (text "addFreeToken:" <+> ppr (tokensOwned jobs) <+> ppr free)
+ $ jobs { tokensFree = free + 1 }
+-- | Use up one token.
+removeFreeToken :: JobResources -> JobResources
+removeFreeToken jobs@( Jobs { tokensFree = free })
+ = assertPpr (free > 0)
+ (text "removeFreeToken:" <+> ppr free)
+ $ jobs { tokensFree = free - 1 }
+-- | Return one owned token.
+removeOwnedToken :: JobResources -> JobResources
+removeOwnedToken jobs@( Jobs { tokensOwned = owned })
+ = assertPpr (owned > 1)
+ (text "removeOwnedToken:" <+> ppr owned)
+ $ jobs { tokensOwned = owned - 1 }
+-- | Add one new job to the end of the list of pending jobs.
+addJob :: TMVar () -> JobResources -> JobResources
+addJob job jobs@( Jobs { jobsWaiting = wait })
+ = jobs { jobsWaiting = wait `SnocOL` job }
+-- | The state of the semaphore job server.
+data JobserverState
+ = JobserverState
+ { jobserverAction :: !JobserverAction
+ -- ^ The current action being performed by the
+ -- job server.
+ , canChangeNumCaps :: !(TVar Bool)
+ -- ^ A TVar that signals whether it has been long
+ -- enough since we last changed 'numCapabilities'.
+ , canReleaseToken :: !(TVar Bool)
+ -- ^ A TVar that signals whether we last acquired
+ -- a token long enough ago that we can now release
+ -- a token.
+ }
+data JobserverAction
+ -- | The jobserver is idle: no thread is currently
+ -- interacting with the semaphore.
+ = Idle
+ -- | A thread is waiting for a token on the semaphore.
+ | Acquiring
+ { activeWaitId :: WaitId
+ , threadFinished :: TMVar (Maybe MC.SomeException) }
+-- | Retrieve the 'TMVar' that signals if the current thread has finished,
+-- if any thread is currently active in the jobserver.
+activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
+activeThread_maybe Idle = Nothing
+activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar
+-- | Whether we should try to acquire a new token from the semaphore:
+-- there is a pending job and no free tokens.
+guardAcquire :: JobResources -> Bool
+guardAcquire ( Jobs { tokensFree, jobsWaiting } )
+ = tokensFree == 0 && not (null jobsWaiting)
+-- | Whether we should release a token from the semaphore:
+-- there are no pending jobs and we can release a token.
+guardRelease :: JobResources -> Bool
+guardRelease ( Jobs { tokensFree, tokensOwned, jobsWaiting } )
+ = null jobsWaiting && tokensFree > 0 && tokensOwned > 1
+-- Semaphore jobserver implementation
+-- | Add one pending job to the jobserver.
+-- Blocks, waiting on the jobserver to supply a free token.
+acquireJob :: TVar JobResources -> IO ()
+acquireJob jobs_tvar = do
+ (job_tmvar, _jobs0) <- tracedAtomically "acquire" $
+ modifyJobResources jobs_tvar \ jobs -> do
+ job_tmvar <- newEmptyTMVar
+ return ((job_tmvar, jobs), addJob job_tmvar jobs)
+ atomically $ takeTMVar job_tmvar
+-- | Signal to the job server that one job has completed,
+-- releasing its corresponding token.
+releaseJob :: TVar JobResources -> IO ()
+releaseJob jobs_tvar = do
+ tracedAtomically "release" do
+ modifyJobResources jobs_tvar \ jobs -> do
+ massertPpr (tokensFree jobs < tokensOwned jobs)
+ (text "releaseJob: more free jobs than owned jobs!")
+ return ((), addFreeToken jobs)
+-- | Release all tokens owned from the semaphore (to clean up
+-- the jobserver at the end).
+cleanupJobserver :: Jobserver -> IO ()
+cleanupJobserver (Jobserver { jSemaphore = sem
+ , jobs = jobs_tvar })
+ = do
+ Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
+ let toks_to_release = owned - 1
+ -- Subtract off the implicit token: whoever spawned the ghc process
+ -- in the first place is responsible for that token.
+ releaseSemaphore sem toks_to_release
+-- | Dispatch the available tokens acquired from the semaphore
+-- to the pending jobs in the job server.
+dispatchTokens :: JobResources -> STM JobResources
+dispatchTokens jobs@( Jobs { tokensFree = toks_free, jobsWaiting = wait } )
+ | toks_free > 0
+ , next `ConsOL` rest <- wait
+ -- There's a pending job and a free token:
+ -- pass on the token to that job, and recur.
+ = do
+ putTMVar next ()
+ let jobs' = jobs { tokensFree = toks_free - 1, jobsWaiting = rest }
+ dispatchTokens jobs'
+ | otherwise
+ = return jobs
+-- | Update the available resources used from a semaphore, dispatching
+-- any newly acquired resources.
+-- Invariant: if the number of available resources decreases, there
+-- must be no pending jobs.
+-- All modifications should go through this function to ensure the contents
+-- of the 'TVar' remains in normal form.
+modifyJobResources :: HasCallStack => TVar JobResources
+ -> (JobResources -> STM (a, JobResources))
+ -> STM (a, Maybe JobResources)
+modifyJobResources jobs_tvar action = do
+ old_jobs <- readTVar jobs_tvar
+ (a, jobs) <- action old_jobs
+ -- Check the invariant: if the number of free tokens has decreased,
+ -- there must be no pending jobs.
+ massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
+ vcat [ text "modifyJobResources: pending jobs but fewer free tokens" ]
+ dispatched_jobs <- dispatchTokens jobs
+ writeTVar jobs_tvar dispatched_jobs
+ return (a, Just dispatched_jobs)
+tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
+tracedAtomically_ s act = tracedAtomically s (((),) <$> act)
+tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
+tracedAtomically origin act = do
+ (a, mjr) <- atomically act
+ forM_ mjr $ \ jr -> do
+ -- Use the "jsem:" prefix to identify where the write traces are
+ traceEventIO ("jsem:" ++ renderJobResources origin jr)
+ return a
+renderJobResources :: String -> JobResources -> String
+renderJobResources origin (Jobs own free pending) = showSDocUnsafe $ renderJSON $
+ JSObject [ ("name", JSString origin)
+ , ("owned", JSInt own)
+ , ("free", JSInt free)
+ , ("pending", JSInt (length pending) )
+ ]
+-- | Spawn a new thread that waits on the semaphore in order to acquire
+-- an additional token.
+acquireThread :: Jobserver -> IO JobserverAction
+acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+ threadFinished_tmvar <- newEmptyTMVarIO
+ let
+ wait_result_action :: Either MC.SomeException Bool -> IO ()
+ wait_result_action wait_res =
+ tracedAtomically_ "acquire_thread" do
+ (r, jb) <- case wait_res of
+ Left (e :: MC.SomeException) -> do
+ return $ (Just e, Nothing)
+ Right success -> do
+ if success
+ then do
+ modifyJobResources jobs_tvar \ jobs ->
+ return (Nothing, addToken jobs)
+ else
+ return (Nothing, Nothing)
+ putTMVar threadFinished_tmvar r
+ return jb
+ wait_id <- forkWaitOnSemaphoreInterruptible sem wait_result_action
+ labelThread (waitingThreadId wait_id) "acquire_thread"
+ return $ Acquiring { activeWaitId = wait_id
+ , threadFinished = threadFinished_tmvar }
+-- | Spawn a thread to release ownership of one resource from the semaphore,
+-- provided we have spare resources and no pending jobs.
+releaseThread :: Jobserver -> IO JobserverAction
+releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+ threadFinished_tmvar <- newEmptyTMVarIO
+ MC.mask_ do
+ -- Pre-release the resource so that another thread doesn't take control of it
+ -- just as we release the lock on the semaphore.
+ still_ok_to_release
+ <- tracedAtomically "pre_release" $
+ modifyJobResources jobs_tvar \ jobs ->
+ if guardRelease jobs
+ -- TODO: should this also debounce?
+ then return (True , removeOwnedToken $ removeFreeToken jobs)
+ else return (False, jobs)
+ if not still_ok_to_release
+ then return Idle
+ else do
+ tid <- forkIO $ do
+ x <- MC.try $ releaseSemaphore sem 1
+ tracedAtomically_ "post-release" $ do
+ (r, jobs) <- case x of
+ Left (e :: MC.SomeException) -> do
+ modifyJobResources jobs_tvar \ jobs ->
+ return (Just e, addToken jobs)
+ Right _ -> do
+ return (Nothing, Nothing)
+ putTMVar threadFinished_tmvar r
+ return jobs
+ labelThread tid "release_thread"
+ return Idle
+-- | When there are pending jobs but no free tokens,
+-- spawn a thread to acquire a new token from the semaphore.
+-- See 'acquireThread'.
+tryAcquire :: JobserverOptions
+ -> Jobserver
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
+ st@( JobserverState { jobserverAction = Idle } )
+ = do
+ jobs <- readTVar jobs_tvar
+ guard $ guardAcquire jobs
+ return do
+ action <- acquireThread js
+ -- Set a debounce after acquiring a token.
+ can_release_tvar <- registerDelay $ (releaseDebounce opts * 1000)
+ return $ st { jobserverAction = action
+ , canReleaseToken = can_release_tvar }
+tryAcquire _ _ _ = retry
+-- | When there are free tokens and no pending jobs,
+-- spawn a thread to release a token from the semaphore.
+-- See 'releaseThread'.
+tryRelease :: Jobserver
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
+ st@( JobserverState
+ { jobserverAction = Idle
+ , canReleaseToken = can_release_tvar } )
+ = do
+ jobs <- readTVar jobs_tvar
+ guard $ guardRelease jobs
+ can_release <- readTVar can_release_tvar
+ guard can_release
+ return do
+ action <- releaseThread sjs
+ return $ st { jobserverAction = action }
+tryRelease _ _ = retry
+-- | Wait for an active thread to finish. Once it finishes:
+-- - set the 'JobserverAction' to 'Idle',
+-- - update the number of capabilities to reflect the number
+-- of owned tokens from the semaphore.
+tryNoticeIdle :: JobserverOptions
+ -> TVar JobResources
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryNoticeIdle opts jobs_tvar jobserver_state
+ | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
+ = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
+ | otherwise
+ = retry -- no active thread: wait until jobserver isn't idle
+ where
+ sync_num_caps :: TVar Bool
+ -> TMVar (Maybe MC.SomeException)
+ -> STM (IO JobserverState)
+ sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
+ mb_ex <- takeTMVar threadFinished_tmvar
+ for_ mb_ex MC.throwM
+ Jobs { tokensOwned } <- readTVar jobs_tvar
+ can_change_numcaps <- readTVar can_change_numcaps_tvar
+ guard can_change_numcaps
+ return do
+ x <- getNumCapabilities
+ can_change_numcaps_tvar_2 <-
+ if x == tokensOwned
+ then return can_change_numcaps_tvar
+ else do
+ setNumCapabilities tokensOwned
+ registerDelay $ (setNumCapsDebounce opts * 1000)
+ return $
+ jobserver_state
+ { jobserverAction = Idle
+ , canChangeNumCaps = can_change_numcaps_tvar_2 }
+-- | Try to stop the current thread which is acquiring/releasing resources
+-- if that operation is no longer relevant.
+tryStopThread :: TVar JobResources
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryStopThread jobs_tvar jsj = do
+ case jobserverAction jsj of
+ Acquiring { activeWaitId = wait_id } -> do
+ jobs <- readTVar jobs_tvar
+ guard $ null (jobsWaiting jobs)
+ return do
+ interruptWaitOnSemaphore wait_id
+ return $ jsj { jobserverAction = Idle }
+ _ -> retry
+-- | Main jobserver loop: acquire/release resources as
+-- needed for the pending jobs and available semaphore tokens.
+jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
+jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
+ = do
+ true_tvar <- newTVarIO True
+ let init_state :: JobserverState
+ init_state =
+ JobserverState
+ { jobserverAction = Idle
+ , canChangeNumCaps = true_tvar
+ , canReleaseToken = true_tvar }
+ loop init_state
+ where
+ loop s = do
+ action <- atomically $ asum $ (\x -> x s) <$>
+ [ tryRelease sjs
+ , tryAcquire opts sjs
+ , tryNoticeIdle opts jobs_tvar
+ , tryStopThread jobs_tvar
+ ]
+ s <- action
+ loop s
+-- | Create a new jobserver using the given semaphore handle.
+makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
+makeJobserver sem_name = do
+ semaphore <- openSemaphore sem_name
+ let
+ init_jobs =
+ Jobs { tokensOwned = 1
+ , tokensFree = 1
+ , jobsWaiting = NilOL
+ }
+ jobs_tvar <- newTVarIO init_jobs
+ let
+ opts = defaultJobserverOptions -- TODO: allow this to be configured
+ sjs = Jobserver { jSemaphore = semaphore
+ , jobs = jobs_tvar }
+ loop_finished_mvar <- newEmptyMVar
+ loop_tid <- forkIOWithUnmask \ unmask -> do
+ r <- try $ unmask $ jobserverLoop opts sjs
+ putMVar loop_finished_mvar $
+ case r of
+ Left e
+ | Just ThreadKilled <- fromException e
+ -> Nothing
+ | otherwise
+ -> Just e
+ Right () -> Nothing
+ labelThread loop_tid "job_server"
+ let
+ acquireSem = acquireJob jobs_tvar
+ releaseSem = releaseJob jobs_tvar
+ cleanupSem = do
+ -- this is interruptible
+ cleanupJobserver sjs
+ killThread loop_tid
+ mb_ex <- takeMVar loop_finished_mvar
+ for_ mb_ex MC.throwM
+ return (AbstractSem{..}, cleanupSem)
+-- | Implement an abstract semaphore using a semaphore 'Jobserver'
+-- which queries the system semaphore of the given name for resources.
+runJSemAbstractSem :: SemaphoreName -- ^ the system semaphore to use
+ -> (AbstractSem -> IO a) -- ^ the operation to run
+ -- which requires a semaphore
+ -> IO a
+runJSemAbstractSem sem action = MC.mask \ unmask -> do
+ (abs, cleanup) <- makeJobserver sem
+ r <- try $ unmask $ action abs
+ case r of
+ Left (e1 :: MC.SomeException) -> do
+ (_ :: Either MC.SomeException ()) <- MC.try cleanup
+ MC.throwM e1
+ Right x -> cleanup $> x
+Note [Architecture of the Job Server]
+In `-jsem` mode, the amount of parallelism that GHC can use is controlled by a
+system semaphore. We take resources from the semaphore when we need them, and
+give them back if we don't have enough to do.
+A naive implementation would just take and release the semaphore around performing
+the action, but this leads to two issues:
+* When taking a token in the semaphore, we must call `setNumCapabilities` in order
+ to adjust how many capabilities are available for parallel garbage collection.
+ This causes unnecessary synchronisations.
+* We want to implement a debounce, so that whilst there is pending work in the
+ current process we prefer to keep hold of resources from the semaphore.
+ This reduces overall memory usage, as there are fewer live GHC processes at once.
Therefore, the acquisition of semaphore resources is separated from the
+request for the resource in the driver.
+A token from the semaphore is requested using `acquireJob`. This creates a pending
job, which is a `TMVar` that can be filled in to signal that the requested token is ready.
+When the job is finished, the token is released by calling `releaseJob`, which just
+increases the number of `free` jobs. If there are more pending jobs when the free count
+is increased, the token is immediately reused (see `modifyJobResources`).
The `jobserverLoop` interacts with the system semaphore: when there are pending
+jobs, `acquireThread` blocks, waiting for a token from the semaphore. Once a
+token is obtained, it increases the owned count.
+When GHC has free tokens (tokens from the semaphore that it is not using),
+no pending jobs, and the debounce has expired, then `releaseThread` will
+release tokens back to the global semaphore.
+`tryStopThread` attempts to kill threads which are waiting to acquire a resource
+when we no longer need it. For example, consider that we attempt to acquire two
+tokens, but the first job finishes before we acquire the second token.
+This second token is no longer needed, so we should cancel the wait
+(as it would not be used to do any work, and not be returned until the debounce).
+We only need to kill `acquireJob`, because `releaseJob` never blocks.
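As a rough illustration of the bookkeeping described in this Note (not part of the patch), the sketch below models the job resources as a pure record. Integer job ids stand in for the `TMVar`s, and the names `Res`, `dispatch`, `request`, `finish`, `shouldAcquire` and `shouldRelease` are hypothetical simplifications of `JobResources`, `dispatchTokens`, `acquireJob`, `releaseJob`, `guardAcquire` and `guardRelease`. Debouncing and the semaphore itself are left out.

```haskell
-- Simplified pure model of the Note's bookkeeping; not the GHC implementation.

data Res = Res
  { owned   :: Int    -- tokens claimed from the semaphore (incl. the implicit one)
  , free    :: Int    -- owned tokens not currently used by a job
  , waiting :: [Int]  -- ids of jobs waiting for a token, oldest first
  } deriving (Eq, Show)

-- | Hand free tokens to waiting jobs, oldest first.
-- Returns the ids of the jobs that were woken up.
dispatch :: Res -> ([Int], Res)
dispatch r@(Res _ f (j:js))
  | f > 0 =
      let (woken, r') = dispatch r { free = f - 1, waiting = js }
      in (j : woken, r')
dispatch r = ([], r)

-- | A job asks for a token: queue it, then dispatch.
request :: Int -> Res -> ([Int], Res)
request j r = dispatch r { waiting = waiting r ++ [j] }

-- | A job finished: its token becomes free and may be reused immediately.
finish :: Res -> ([Int], Res)
finish r = dispatch r { free = free r + 1 }

-- | Ask the semaphore for another token: pending work, nothing free.
shouldAcquire :: Res -> Bool
shouldAcquire r = free r == 0 && not (null (waiting r))

-- | Give a token back: nothing pending, a spare token beyond the implicit one.
shouldRelease :: Res -> Bool
shouldRelease r = null (waiting r) && free r > 0 && owned r > 1

main :: IO ()
main = do
  let r0       = Res { owned = 1, free = 1, waiting = [] }
      (w1, r1) = request 1 r0
      (w2, r2) = request 2 r1
      (w3, r3) = finish r2
  print (w1, r1)
  print (w2, r2, shouldAcquire r2)
  print (w3, r3)
```

In this run, job 1 is woken at once using the implicit token, job 2 has to wait (so `shouldAcquire` holds), and when job 1 finishes its token is passed straight to job 2 without touching the semaphore.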
+Note [Eventlog Messages for jsem]
+It can be tricky to verify that the work is shared adequately across different
processes. To help debug this, we output the values of `JobResources` to the
+eventlog whenever the global state changes. There are some scripts which can be used
+to analyse this output and report statistics about core saturation in the
+GitHub repo (https://github.com/mpickering/ghc-jsem-analyse).
@@ -100,10 +100,10 @@ dequeueLogQueueQueue (LogQueueQueue n lqq) = case IM.minViewWithKey lqq of
Just ((k, v), lqq') | k == n -> Just (v, LogQueueQueue (n + 1) lqq')
_ -> Nothing
-logThread :: Int -> Int -> Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
+logThread :: Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
-> TVar LogQueueQueue -- Queue for logs
-> IO (IO ())
-logThread _ _ logger stopped lqq_var = do
+logThread logger stopped lqq_var = do
finished_var <- newEmptyMVar
_ <- forkIO $ print_logs *> putMVar finished_var ()
return (takeMVar finished_var)
@@ -43,6 +43,7 @@ module GHC.Driver.Session (
+ ParMakeCount(..),
outputFile, objectSuf, ways,
HasDynFlags(..), ContainsDynFlags(..),
@@ -461,9 +462,9 @@ data DynFlags = DynFlags {
ruleCheck :: Maybe String,
strictnessBefore :: [Int], -- ^ Additional demand analysis
- parMakeCount :: Maybe Int, -- ^ The number of modules to compile in parallel
- -- in --make mode, where Nothing ==> compile as
- -- many in parallel as there are CPUs.
+ parMakeCount :: Maybe ParMakeCount,
+ -- ^ The number of modules to compile in parallel
+ -- If unspecified, compile with a single job.
enableTimeStats :: Bool, -- ^ Enable RTS timing statistics?
ghcHeapSize :: Maybe Int, -- ^ The heap size to set.
@@ -783,6 +784,13 @@ instance (Monad m, HasDynFlags m) => HasDynFlags (ExceptT e m) where
class ContainsDynFlags t where
extractDynFlags :: t -> DynFlags
+-- | The type for the -jN argument, specifying that -j on its own represents
+-- using the number of machine processors.
+data ParMakeCount
+ = ParMakeThisMany Int -- -j <n>
+ | ParMakeSemaphore FilePath -- -jsem <semaphore>
+ | ParMakeNumProcessors -- -j
-- Accessors from 'DynFlags'
@@ -1146,7 +1154,7 @@ defaultDynFlags mySettings =
historySize = 20,
strictnessBefore = [],
- parMakeCount = Just 1,
+ parMakeCount = Nothing,
enableTimeStats = False,
ghcHeapSize = Nothing,
@@ -2066,14 +2074,16 @@ dynamic_flags_deps = [
, make_ord_flag defGhcFlag "j" (OptIntSuffix
(\n -> case n of
Just n
- | n > 0 -> upd (\d -> d { parMakeCount = Just n })
+ | n > 0 -> upd (\d -> d { parMakeCount = Just (ParMakeThisMany n) })
| otherwise -> addErr "Syntax: -j[n] where n > 0"
- Nothing -> upd (\d -> d { parMakeCount = Nothing })))
+ Nothing -> upd (\d -> d { parMakeCount = Just ParMakeNumProcessors })))
-- When the number of parallel builds
-- is omitted, it is the same
-- as specifying that the number of
-- parallel builds is equal to the
-- result of getNumProcessors
+ , make_ord_flag defGhcFlag "jsem" $ hasArg $ \f d -> d { parMakeCount = Just (ParMakeSemaphore f) }
, make_ord_flag defFlag "instantiated-with" (sepArg setUnitInstantiations)
, make_ord_flag defFlag "this-component-id" (sepArg setUnitInstanceOf)
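To make the new representation concrete, the following standalone sketch mirrors the `-j` handler above and shows one way a consumer could interpret the `parMakeCount` field. The `ParMakeCount` type is copied from the diff; `parseJ`, `JobPlan` and `planJobs` are hypothetical names introduced for illustration and are not part of GHC.

```haskell
import GHC.Conc (getNumProcessors)

-- | Copied from the diff above.
data ParMakeCount
  = ParMakeThisMany Int        -- -j <n>
  | ParMakeSemaphore FilePath  -- -jsem <semaphore>
  | ParMakeNumProcessors       -- -j
  deriving (Eq, Show)

-- | Hypothetical mirror of the @-j[n]@ handler: a positive number selects
-- that many jobs, a non-positive number is rejected, and a missing number
-- selects the number of processors.
parseJ :: Maybe Int -> Either String ParMakeCount
parseJ (Just n)
  | n > 0     = Right (ParMakeThisMany n)
  | otherwise = Left "Syntax: -j[n] where n > 0"
parseJ Nothing = Right ParMakeNumProcessors

-- | Hypothetical result type: either a fixed number of jobs, or a named
-- semaphore that decides the amount of parallelism at run time.
data JobPlan
  = FixedJobs Int
  | SemaphoreJobs FilePath
  deriving (Eq, Show)

-- | Interpret the field. 'Nothing' (no flag given) means a single job,
-- matching the new default in 'defaultDynFlags'.
planJobs :: Maybe ParMakeCount -> IO JobPlan
planJobs Nothing                        = pure (FixedJobs 1)
planJobs (Just (ParMakeThisMany n))     = pure (FixedJobs n)
planJobs (Just ParMakeNumProcessors)    = FixedJobs <$> getNumProcessors
planJobs (Just (ParMakeSemaphore name)) = pure (SemaphoreJobs name)
```

Keeping `ParMakeNumProcessors` as its own constructor frees `Nothing` to mean "no flag given", which is why the default can change from `Just 1` to `Nothing` without changing behaviour.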
@@ -85,6 +85,7 @@ Library
hpc == 0.6.*,
transformers >= 0.5 && < 0.7,
exceptions == 0.10.*,
+ semaphore-compat,
ghc-boot == @ProjectVersionMunged@,
ghc-heap == @ProjectVersionMunged@,
@@ -436,6 +437,7 @@ Library
+ GHC.Driver.MakeSem
@@ -751,6 +751,59 @@ search path (see :ref:`search-path`).
number of processors. Note that compilation of a module may not begin
until its dependencies have been built.
+GHC Jobserver Protocol
+This protocol allows
+a server to dynamically invoke many instances of a client process,
+while restricting all of those instances to use no more than <n> capabilities.
+This is achieved by coordination over a system semaphore (either a POSIX
+semaphore in the case of Linux and Darwin, or a Win32 semaphore
+in the case of Windows platforms).
+There are two kinds of participants in the GHC Jobserver protocol:
+- The *jobserver* creates a system semaphore with a certain number of
+ available tokens.
+ Each time the jobserver wants to spawn a new jobclient subprocess, it **must**
+ first acquire a single token from the semaphore, before spawning
+ the subprocess. This token **must** be released once the subprocess terminates.
+ Once work is finished, the jobserver **must** destroy the semaphore it created.
+- A *jobclient* is a subprocess spawned by the jobserver or another jobclient.
+ Each jobclient starts with one available token (its *implicit token*,
+ which was acquired by the parent which spawned it), and can request more
+ tokens through the Jobserver Protocol by waiting on the semaphore.
+ Each time a jobclient wants to spawn a new jobclient subprocess, it **must**
+ pass on a single token to the child jobclient. This token can either be the
+ jobclient's implicit token, or another token which the jobclient acquired
+ from the semaphore.
+ Each jobclient **must** release exactly as many tokens as it has acquired from
+ the semaphore (this does not include the implicit tokens).
+ GHC itself acts as a jobclient which can be enabled by using the flag ``-jsem``.
+.. ghc-flag:: -jsem
+ :shortdesc: When compiling with :ghc-flag:`--make`, coordinate with
+ other processes through the semaphore ⟨sem⟩ to compile
+ modules in parallel.
+ :type: dynamic
+ :category: misc
+ Perform compilation in parallel when possible, coordinating with other
+ processes through the semaphore ⟨sem⟩.
+ It is an error if the semaphore does not exist.
+ Use of ``-jsem`` will override use of `-j[⟨n⟩]`:ghc-flag:.
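The token accounting required of a jobclient can be sketched as follows. This is an illustration under stated assumptions rather than GHC's implementation: an in-process `QSem` from `base` stands in for the system semaphore, threads stand in for units of work, and `withToken` and `runJobs` are hypothetical names.

```haskell
import Control.Concurrent  -- forkIO (and threadDelay, handy when experimenting)
import Control.Concurrent.MVar
import Control.Concurrent.QSem
import Control.Exception (bracket_, finally)

-- | Run an action while holding one token acquired from the semaphore.
-- The token is released even if the action throws, so the client releases
-- exactly as many tokens as it acquired.
withToken :: QSem -> IO a -> IO a
withToken sem = bracket_ (waitQSem sem) (signalQSem sem)

-- | Run a batch of jobs as a jobclient would.
-- The first job runs on the implicit token, which is never acquired from
-- or released to the semaphore. Every other job waits for its own token.
runJobs :: QSem -> [IO ()] -> IO ()
runJobs _ [] = pure ()
runJobs sem (firstJob : otherJobs) = do
  dones <- mapM (forkJob . withToken sem) otherJobs
  -- Wait for the forked jobs even if the first job throws.
  firstJob `finally` mapM_ takeMVar dones
  where
    forkJob job = do
      done <- newEmptyMVar
      _ <- forkIO (job `finally` putMVar done ())
      pure done
```

With a semaphore created by `newQSem 1`, `runJobs` runs at most two jobs at once: one on the implicit token and one on the single token available from the semaphore.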
.. _multi-home-units:
Multiple Home Units
@@ -8,7 +8,7 @@ module Packages (
ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline,
hsc2hs, hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, iservProxy,
libffi, mtl, parsec, pretty, primitive, process, remoteIserv, rts,
- runGhc, stm, templateHaskell, terminfo, text, time, timeout, touchy,
+ runGhc, semaphoreCompat, stm, templateHaskell, terminfo, text, time, timeout, touchy,
transformers, unlit, unix, win32, xhtml,
lintersCommon, lintNotes, lintCommitMsg, lintSubmoduleRefs, lintWhitespace,
ghcPackages, isGhcPackage,
@@ -39,7 +39,7 @@ ghcPackages =
, exceptions, filepath, genapply, genprimopcode, ghc, ghcBignum, ghcBoot, ghcBootTh
, ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline, hsc2hs
, hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, libffi, mtl
- , parsec, pretty, process, rts, runGhc, stm, templateHaskell
+ , parsec, pretty, process, rts, runGhc, stm, semaphoreCompat, templateHaskell
, terminfo, text, time, touchy, transformers, unlit, unix, win32, xhtml
, timeout
, lintersCommon
@@ -55,7 +55,7 @@ array, base, binary, bytestring, cabalSyntax, cabal, checkPpr, checkExact, count
exceptions, filepath, genapply, genprimopcode, ghc, ghcBignum, ghcBoot, ghcBootTh,
ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline, hsc2hs,
hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, iservProxy, remoteIserv, libffi, mtl,
- parsec, pretty, primitive, process, rts, runGhc, stm, templateHaskell,
+ parsec, pretty, primitive, process, rts, runGhc, semaphoreCompat, stm, templateHaskell,
terminfo, text, time, touchy, transformers, unlit, unix, win32, xhtml,
lintersCommon, lintNotes, lintCommitMsg, lintSubmoduleRefs, lintWhitespace
@@ -110,6 +110,7 @@ process = lib "process"
remoteIserv = util "remote-iserv"
rts = top "rts"
runGhc = util "runghc"
+semaphoreCompat = lib "semaphore-compat"
stm = lib "stm"
templateHaskell = lib "template-haskell"
terminfo = lib "terminfo"
@@ -171,6 +171,7 @@ toolTargets = [ binary
, templateHaskell
, text
, transformers
+ , semaphoreCompat
, unlit -- # executable
] ++ if windowsHost then [ win32 ] else [ unix ]
@@ -95,6 +95,7 @@ stage0Packages = do
, hpcBin
, mtl
, parsec
+ , semaphoreCompat
, time
, templateHaskell
, text
@@ -142,6 +143,7 @@ stage1Packages = do
, integerGmp
, pretty
, rts
+ , semaphoreCompat
, stm
, unlit
, xhtml
@@ -0,0 +1 @@
+Subproject commit 663ef75467995acf41c51d3e21d03347e85b844e
@@ -65,5 +65,6 @@ libraries/Win32 - - https:/
libraries/xhtml - - https://github.com/haskell/xhtml.git
libraries/exceptions - - https://github.com/ekmett/exceptions.git
nofib nofib - -
+libraries/semaphore-compat - - -
libraries/stm - - ssh://git@github.com/haskell/stm.git
. - ghc.git -
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/1755da7f439bbedd9dd3bdfd3ef2366f50c8fea1