[Git][ghc/ghc][wip/jsem] parent ad612f555821a44260e5d9654f940b71f5180817
Matthew Pickering (@mpickering)
gitlab at gitlab.haskell.org
Sun Dec 18 13:30:14 UTC 2022
Matthew Pickering pushed to branch wip/jsem at Glasgow Haskell Compiler / GHC
Commits:
aabed582 by sheaf at 2022-12-18T13:29:58+00:00
parent ad612f555821a44260e5d9654f940b71f5180817
author sheaf <sam.derbyshire at gmail.com> 1662553354 +0200
committer Matthew Pickering <matthewtpickering at gmail.com> 1671366685 +0000
WIP: jsem, using POSIX/Win32 semaphores
Updates submodule
- - - - -
15 changed files:
- .gitmodules
- cabal.project-reinstall
- compiler/GHC/Driver/Config.hs
- compiler/GHC/Driver/Make.hs
- + compiler/GHC/Driver/MakeSem.hs
- compiler/GHC/Driver/Pipeline/LogQueue.hs
- compiler/GHC/Driver/Session.hs
- compiler/ghc.cabal.in
- docs/users_guide/using.rst
- hadrian/src/Packages.hs
- hadrian/src/Settings/Default.hs
- libraries/Win32
- + libraries/semaphore-compat
- libraries/unix
- packages
Changes:
=====================================
.gitmodules
=====================================
@@ -83,6 +83,10 @@
url = https://gitlab.haskell.org/ghc/packages/unix.git
ignore = untracked
branch = 2.7
+[submodule "libraries/semaphore-compat"]
+ path = libraries/semaphore-compat
+ url = git at gitlab.haskell.org:ghc/semaphore-compat.git
+ ignore = untracked
[submodule "libraries/stm"]
path = libraries/stm
url = https://gitlab.haskell.org/ghc/packages/stm.git
=====================================
cabal.project-reinstall
=====================================
@@ -29,6 +29,7 @@ packages: ./compiler
./libraries/parsec/
-- ./libraries/pretty/
./libraries/process/
+ ./libraries/semaphore-compat
./libraries/stm
-- ./libraries/template-haskell/
./libraries/terminfo/
=====================================
compiler/GHC/Driver/Config.hs
=====================================
@@ -38,8 +38,10 @@ initBCOOpts dflags = do
-- Serializing ResolvedBCO is expensive, so if we're in parallel mode
-- (-j<n>) parallelise the serialization.
n_jobs <- case parMakeCount dflags of
- Nothing -> liftIO getNumProcessors
- Just n -> return n
+ Nothing -> pure 1
+ Just (ParMakeThisMany n) -> pure n
+ Just ParMakeNumProcessors -> liftIO getNumProcessors
+ -- jsem TODO
return $ BCOOpts n_jobs
-- | Extract GHCi options from DynFlags and step
=====================================
compiler/GHC/Driver/Make.hs
=====================================
@@ -75,6 +75,7 @@ import GHC.Driver.Env
import GHC.Driver.Errors
import GHC.Driver.Errors.Types
import GHC.Driver.Main
+import GHC.Driver.MakeSem
import GHC.Parser.Header
@@ -150,9 +151,9 @@ import GHC.Runtime.Loader
import GHC.Rename.Names
import GHC.Utils.Constants
import GHC.Types.Unique.DFM (udfmRestrictKeysSet)
-import qualified Data.IntSet as I
import GHC.Types.Unique
+import qualified Data.IntSet as I
-- -----------------------------------------------------------------------------
-- Loading the program
@@ -655,6 +656,34 @@ createBuildPlan mod_graph maybe_top_mod =
(vcat [text "Build plan missing nodes:", (text "PLAN:" <+> ppr (sum (map countMods build_plan))), (text "GRAPH:" <+> ppr (length (mgModSummaries' mod_graph )))])
build_plan
+-- data ParMakeMode = ParallelMake | SequentialMake
+-- deriving (Eq, Show)
+
+mkWorkerLimit :: DynFlags -> IO WorkerLimit
+mkWorkerLimit dflags =
+ case jsemHandle dflags of
+ Just h -> pure (JSemLimit $ SemaphoreName h)
+ Nothing -> case parMakeCount dflags of
+ Nothing -> pure $ num_procs 1
+ Just ParMakeNumProcessors -> num_procs <$> getNumProcessors
+ Just (ParMakeThisMany n) -> pure $ num_procs n
+ where
+ num_procs x = NumProcessorsLimit (max 1 x)
+
+isWorkerLimitSequential :: WorkerLimit -> Bool
+isWorkerLimitSequential (NumProcessorsLimit x) = x <= 1
+isWorkerLimitSequential (JSemLimit {}) = False
+
+-- | This describes what we use to limit the number of jobs: either we limit it
+-- ourselves to a specific number, or we have an external parallelism semaphore
+-- limit it for us.
+data WorkerLimit
+ = NumProcessorsLimit Int
+ | JSemLimit
+ SemaphoreName
+ -- ^ Semaphore name to use
+ deriving Eq
+
-- | Generalized version of 'load' which also supports a custom
-- 'Messager' (for reporting progress) and 'ModuleGraph' (generally
-- produced by calling 'depanal'.
@@ -735,14 +764,12 @@ load' mhmi_cache how_much mHscMessage mod_graph = do
liftIO $ debugTraceMsg logger 2 (hang (text "Ready for upsweep")
2 (ppr build_plan))
- n_jobs <- case parMakeCount (hsc_dflags hsc_env) of
- Nothing -> liftIO getNumProcessors
- Just n -> return n
+ worker_limit <- liftIO $ mkWorkerLimit dflags
setSession $ hscUpdateHUG (unitEnv_map pruneHomeUnitEnv) hsc_env
(upsweep_ok, hsc_env1) <- withDeferredDiagnostics $ do
hsc_env <- getSession
- liftIO $ upsweep n_jobs hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
+ liftIO $ upsweep worker_limit hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
setSession hsc_env1
case upsweep_ok of
Failed -> loadFinish upsweep_ok
@@ -1023,13 +1050,7 @@ getDependencies direct_deps build_map =
type BuildM a = StateT BuildLoopState IO a
--- | Abstraction over the operations of a semaphore which allows usage with the
--- -j1 case
-data AbstractSem = AbstractSem { acquireSem :: IO ()
- , releaseSem :: IO () }
-withAbstractSem :: AbstractSem -> IO b -> IO b
-withAbstractSem sem = MC.bracket_ (acquireSem sem) (releaseSem sem)
-- | Environment used when compiling a module
data MakeEnv = MakeEnv { hsc_env :: !HscEnv -- The basic HscEnv which will be augmented for each module
@@ -1214,7 +1235,7 @@ withCurrentUnit uid = do
local (\env -> env { hsc_env = hscSetActiveUnitId uid (hsc_env env)})
upsweep
- :: Int -- ^ The number of workers we wish to run in parallel
+ :: WorkerLimit -- ^ The number of workers we wish to run in parallel
-> HscEnv -- ^ The base HscEnv, which is augmented for each module
-> Maybe ModIfaceCache -- ^ A cache to incrementally write final interface files to
-> Maybe Messager
@@ -2818,7 +2839,7 @@ label_self thread_name = do
CC.labelThread self_tid thread_name
-runPipelines :: Int -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
+runPipelines :: WorkerLimit -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
-- Don't even initialise plugins if there are no pipelines
runPipelines _ _ _ [] = return ()
runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
@@ -2826,7 +2847,7 @@ runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
plugins_hsc_env <- initializePlugins orig_hsc_env
case n_job of
- 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
+ NumProcessorsLimit n | n <= 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
_n -> runParPipelines n_job plugins_hsc_env mHscMessager all_pipelines
runSeqPipelines :: HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
@@ -2836,16 +2857,40 @@ runSeqPipelines plugin_hsc_env mHscMessager all_pipelines =
, compile_sem = AbstractSem (return ()) (return ())
, env_messager = mHscMessager
}
- in runAllPipelines 1 env all_pipelines
+ in runAllPipelines (NumProcessorsLimit 1) env all_pipelines
+
+ -- TODO remove this capabilities management, it will be handled by the semaphore
+runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a
+runNjobsAbstractSem n_jobs action = do
+ compile_sem <- newQSem n_jobs
+ n_capabilities <- getNumCapabilities
+ n_cpus <- getNumProcessors
+ let
+ asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+ set_num_caps n = unless (n_capabilities /= 1) $ setNumCapabilities n
+ updNumCapabilities = do
+ -- Setting number of capabilities more than
+ -- CPU count usually leads to high userspace
+ -- lock contention. #9221
+ set_num_caps $ min n_jobs n_cpus
+ resetNumCapabilities = set_num_caps n_capabilities
+ MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem
+
+runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
+runWorkerLimit worker_limit action = case worker_limit of
+ NumProcessorsLimit n_jobs ->
+ runNjobsAbstractSem n_jobs action
+ JSemLimit sem ->
+ runJSemAbstractSem sem action
-- | Build and run a pipeline
-runParPipelines :: Int -- ^ How many capabilities to use
- -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module
+runParPipelines :: WorkerLimit -- ^ How to limit work parallelism
+ -> HscEnv -- ^ The basic HscEnv which is augmented with specific info for each module
-> Maybe Messager -- ^ Optional custom messager to use to report progress
-> [MakeAction] -- ^ The build plan for all the module nodes
-> IO ()
-runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
+runParPipelines worker_limit plugin_hsc_env mHscMessager all_pipelines = do
-- A variable which we write to when an error has happened and we have to tell the
@@ -2855,39 +2900,23 @@ runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
-- will add it's LogQueue into this queue.
log_queue_queue_var <- newTVarIO newLogQueueQueue
-- Thread which coordinates the printing of logs
- wait_log_thread <- logThread n_jobs (length all_pipelines) (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
+ wait_log_thread <- logThread (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
-- Make the logger thread-safe, in case there is some output which isn't sent via the LogQueue.
thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env)
let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger }
- let updNumCapabilities = liftIO $ do
- n_capabilities <- getNumCapabilities
- n_cpus <- getNumProcessors
- -- Setting number of capabilities more than
- -- CPU count usually leads to high userspace
- -- lock contention. #9221
- let n_caps = min n_jobs n_cpus
- unless (n_capabilities /= 1) $ setNumCapabilities n_caps
- return n_capabilities
-
- let resetNumCapabilities orig_n = do
- liftIO $ setNumCapabilities orig_n
- atomically $ writeTVar stopped_var True
- wait_log_thread
-
- compile_sem <- newQSem n_jobs
- let abstract_sem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+ runWorkerLimit worker_limit $ \abstract_sem -> do
+ let env = MakeEnv { hsc_env = thread_safe_hsc_env
+ , withLogger = withParLog log_queue_queue_var
+ , compile_sem = abstract_sem
+ , env_messager = mHscMessager
+ }
-- Reset the number of capabilities once the upsweep ends.
- let env = MakeEnv { hsc_env = thread_safe_hsc_env
- , withLogger = withParLog log_queue_queue_var
- , compile_sem = abstract_sem
- , env_messager = mHscMessager
- }
-
- MC.bracket updNumCapabilities resetNumCapabilities $ \_ ->
- runAllPipelines n_jobs env all_pipelines
+ runAllPipelines worker_limit env all_pipelines
+ atomically $ writeTVar stopped_var True
+ wait_log_thread
withLocalTmpFS :: RunMakeM a -> RunMakeM a
withLocalTmpFS act = do
@@ -2904,10 +2933,11 @@ withLocalTmpFS act = do
MC.bracket initialiser finaliser $ \lcl_hsc_env -> local (\env -> env { hsc_env = lcl_hsc_env}) act
-- | Run the given actions and then wait for them all to finish.
-runAllPipelines :: Int -> MakeEnv -> [MakeAction] -> IO ()
-runAllPipelines n_jobs env acts = do
- let spawn_actions :: IO [ThreadId]
- spawn_actions = if n_jobs == 1
+runAllPipelines :: WorkerLimit -> MakeEnv -> [MakeAction] -> IO ()
+runAllPipelines worker_limit env acts = do
+ let single_worker = isWorkerLimitSequential worker_limit
+ spawn_actions :: IO [ThreadId]
+ spawn_actions = if single_worker
then (:[]) <$> (forkIOWithUnmask $ \unmask -> void $ runLoop (\io -> io unmask) env acts)
else runLoop forkIOWithUnmask env acts
=====================================
compiler/GHC/Driver/MakeSem.hs
=====================================
@@ -0,0 +1,548 @@
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE NumericUnderscores #-}
+
+-- | Implementation of a jobserver using system semaphores.
+--
+--
+module GHC.Driver.MakeSem
+ ( -- * JSem: parallelism semaphore backed
+ -- by a system semaphore (Posix/Windows)
+ runJSemAbstractSem
+
+ -- * System semaphores
+ , Semaphore, SemaphoreName(..)
+
+ -- * Abstract semaphores
+ , AbstractSem(..)
+ , withAbstractSem
+ )
+ where
+
+import GHC.Prelude
+import GHC.Conc
+import GHC.Data.OrdList
+import GHC.IO.Exception
+import GHC.Utils.Outputable
+import GHC.Utils.Panic
+import GHC.Utils.Json
+
+import System.Semaphore
+
+import Control.Monad
+import qualified Control.Monad.Catch as MC
+import Control.Concurrent.MVar
+import Control.Concurrent.STM
+import Data.Foldable
+import Data.Functor
+import GHC.Stack
+import Debug.Trace
+
+---------------------------------------
+-- Semaphore jobserver
+
+-- | A jobserver based off a system 'Semaphore'.
+--
+-- Keeps track of the pending jobs and resources
+-- available from the semaphore.
+data Jobserver
+ = Jobserver
+ { jSemaphore :: !Semaphore
+ -- ^ The semaphore which controls available resources
+ , jobs :: !(TVar JobResources)
+ -- ^ The currently pending jobs, and the resources
+ -- obtained from the semaphore
+ }
+
+data JobserverOptions
+ = JobserverOptions
+ { releaseDebounce :: !Int
+ -- ^ Minimum delay, in milliseconds, between acquiring a token
+ -- and releasing a token.
+ , setNumCapsDebounce :: !Int
+ -- ^ Minimum delay, in milliseconds, between two consecutive
+ -- calls of 'setNumCapabilities'.
+ }
+
+defaultJobserverOptions :: JobserverOptions
+defaultJobserverOptions =
+ JobserverOptions
+ { releaseDebounce = 1000 -- 1 second
+ , setNumCapsDebounce = 1000 -- 1 second
+ }
+
+-- | Resources available for running jobs, i.e.
+-- tokens obtained from the parallelism semaphore.
+data JobResources
+ = Jobs
+ { tokensOwned :: !Int
+ -- ^ How many tokens have been claimed from the semaphore
+ , tokensFree :: !Int
+ -- ^ How many tokens are not currently being used
+ , jobsWaiting :: !(OrdList (TMVar ()))
+ -- ^ Pending jobs waiting on a token
+ }
+
+instance Outputable JobResources where
+ ppr Jobs{..}
+ = text "JobResources" <+>
+ ( braces $ hsep
+ [ text "owned=" <> ppr tokensOwned
+ , text "free=" <> ppr tokensFree
+ , text "num_waiting=" <> ppr (length jobsWaiting)
+ ] )
+
+-- | Add one new token.
+addToken :: JobResources -> JobResources
+addToken jobs@( Jobs { tokensOwned = owned, tokensFree = free })
+ = --if owned > 6 then pprPanic "addToken" (ppr jobs)
+ -- else
+ jobs { tokensOwned = owned + 1, tokensFree = free + 1 }
+
+-- | Free one token.
+addFreeToken :: JobResources -> JobResources
+addFreeToken jobs@( Jobs { tokensFree = free })
+ = assertPpr (tokensOwned jobs > free)
+ (text "addFreeToken:" <+> ppr (tokensOwned jobs) <+> ppr free)
+ $ jobs { tokensFree = free + 1 }
+
+-- | Use up one token.
+removeFreeToken :: JobResources -> JobResources
+removeFreeToken jobs@( Jobs { tokensFree = free })
+ = assertPpr (free > 0)
+ (text "removeFreeToken:" <+> ppr free)
+ $ jobs { tokensFree = free - 1 }
+
+-- | Return one owned token.
+removeOwnedToken :: JobResources -> JobResources
+removeOwnedToken jobs@( Jobs { tokensOwned = owned })
+ = assertPpr (owned > 1)
+ (text "removeOwnedToken:" <+> ppr owned)
+ $ jobs { tokensOwned = owned - 1 }
+
+-- | Add one new job to the end of the list of pending jobs.
+addJob :: TMVar () -> JobResources -> JobResources
+addJob job jobs@( Jobs { jobsWaiting = wait })
+ = jobs { jobsWaiting = wait `SnocOL` job }
+
+-- | The state of the semaphore job server.
+data JobserverState
+ = JobserverState
+ { jobserverAction :: !JobserverAction
+ -- ^ The current action being performed by the
+ -- job server.
+ , canChangeNumCaps :: !(TVar Bool)
+ -- ^ A TVar that signals whether it has been long
+ -- enough since we last changed 'numCapabilities'.
+ , canReleaseToken :: !(TVar Bool)
+ -- ^ A TVar that signals whether we last acquired
+ -- a token long enough ago that we can now release
+ -- a token.
+ }
+data JobserverAction
+ -- | The jobserver is idle: no thread is currently
+ -- interacting with the semaphore.
+ = Idle
+ -- | A thread is waiting for a token on the semaphore.
+ | Acquiring
+ { activeWaitId :: WaitId
+ , threadFinished :: TMVar (Maybe MC.SomeException) }
+
+-- | Retrieve the 'TMVar' that signals if the current thread has finished,
+-- if any thread is currently active in the jobserver.
+activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
+activeThread_maybe Idle = Nothing
+activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar
+
+-- | Whether we should try to acquire a new token from the semaphore:
+-- there is a pending job and no free tokens.
+guardAcquire :: JobResources -> Bool
+guardAcquire ( Jobs { tokensFree, jobsWaiting } )
+ = tokensFree == 0 && not (null jobsWaiting)
+
+-- | Whether we should release a token from the semaphore:
+-- there are no pending jobs and we can release a token.
+guardRelease :: JobResources -> Bool
+guardRelease ( Jobs { tokensFree, tokensOwned, jobsWaiting } )
+ = null jobsWaiting && tokensFree > 0 && tokensOwned > 1
+
+---------------------------------------
+-- Semaphore jobserver implementation
+
+-- | Add one pending job to the jobserver.
+--
+-- Blocks, waiting on the jobserver to supply a free token.
+acquireJob :: TVar JobResources -> IO ()
+acquireJob jobs_tvar = do
+ (job_tmvar, _jobs0) <- tracedAtomically "acquire" $
+ modifyJobResources jobs_tvar \ jobs -> do
+ job_tmvar <- newEmptyTMVar
+ return ((job_tmvar, jobs), addJob job_tmvar jobs)
+ atomically $ takeTMVar job_tmvar
+
+-- | Signal to the job server that one job has completed,
+-- releasing its corresponding token.
+releaseJob :: TVar JobResources -> IO ()
+releaseJob jobs_tvar = do
+ tracedAtomically "release" do
+ modifyJobResources jobs_tvar \ jobs -> do
+ massertPpr (tokensFree jobs < tokensOwned jobs)
+ (text "releaseJob: more free jobs than owned jobs!")
+ return ((), addFreeToken jobs)
+
+
+-- | Release all tokens owned from the semaphore (to clean up
+-- the jobserver at the end).
+cleanupJobserver :: Jobserver -> IO ()
+cleanupJobserver (Jobserver { jSemaphore = sem
+ , jobs = jobs_tvar })
+ = do
+ Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
+ let toks_to_release = owned - 1
+ -- Subtract off the implicit token: whoever spawned the ghc process
+ -- in the first place is responsible for that token.
+ releaseSemaphore sem toks_to_release
+
+-- | Dispatch the available tokens acquired from the semaphore
+-- to the pending jobs in the job server.
+dispatchTokens :: JobResources -> STM JobResources
+dispatchTokens jobs@( Jobs { tokensFree = toks_free, jobsWaiting = wait } )
+ | toks_free > 0
+ , next `ConsOL` rest <- wait
+ -- There's a pending job and a free token:
+ -- pass on the token to that job, and recur.
+ = do
+ putTMVar next ()
+ let jobs' = jobs { tokensFree = toks_free - 1, jobsWaiting = rest }
+ dispatchTokens jobs'
+ | otherwise
+ = return jobs
+
+-- | Update the available resources used from a semaphore, dispatching
+-- any newly acquired resources.
+--
+-- Invariant: if the number of available resources decreases, there
+-- must be no pending jobs.
+--
+-- All modifications should go through this function to ensure the contents
+-- of the 'TVar' remains in normal form.
+modifyJobResources :: HasCallStack => TVar JobResources
+ -> (JobResources -> STM (a, JobResources))
+ -> STM (a, Maybe JobResources)
+modifyJobResources jobs_tvar action = do
+ old_jobs <- readTVar jobs_tvar
+ (a, jobs) <- action old_jobs
+
+ -- Check the invariant: if the number of free tokens has decreased,
+ -- there must be no pending jobs.
+ massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
+ vcat [ text "modiyJobResources: pending jobs but fewer free tokens" ]
+ dispatched_jobs <- dispatchTokens jobs
+ writeTVar jobs_tvar dispatched_jobs
+ return (a, Just dispatched_jobs)
+
+
+tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
+tracedAtomically_ s act = tracedAtomically s (((),) <$> act)
+
+tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
+tracedAtomically origin act = do
+ (a, mjr) <- atomically act
+ forM_ mjr $ \ jr -> do
+ -- Use the "jsem:" prefix so that these trace events can be identified in the eventlog
+ traceEventIO ("jsem:" ++ renderJobResources origin jr)
+ return a
+
+renderJobResources :: String -> JobResources -> String
+renderJobResources origin (Jobs own free pending) = showSDocUnsafe $ renderJSON $
+ JSObject [ ("name", JSString origin)
+ , ("owned", JSInt own)
+ , ("free", JSInt free)
+ , ("pending", JSInt (length pending) )
+ ]
+
+
+-- | Spawn a new thread that waits on the semaphore in order to acquire
+-- an additional token.
+acquireThread :: Jobserver -> IO JobserverAction
+acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+ threadFinished_tmvar <- newEmptyTMVarIO
+ let
+ wait_result_action :: Either MC.SomeException Bool -> IO ()
+ wait_result_action wait_res =
+ tracedAtomically_ "acquire_thread" do
+ (r, jb) <- case wait_res of
+ Left (e :: MC.SomeException) -> do
+ return $ (Just e, Nothing)
+ Right success -> do
+ if success
+ then do
+ modifyJobResources jobs_tvar \ jobs ->
+ return (Nothing, addToken jobs)
+ else
+ return (Nothing, Nothing)
+ putTMVar threadFinished_tmvar r
+ return jb
+ wait_id <- forkWaitOnSemaphoreInterruptible sem wait_result_action
+ labelThread (waitingThreadId wait_id) "acquire_thread"
+ return $ Acquiring { activeWaitId = wait_id
+ , threadFinished = threadFinished_tmvar }
+
+-- | Spawn a thread to release ownership of one resource from the semaphore,
+-- provided we have spare resources and no pending jobs.
+releaseThread :: Jobserver -> IO JobserverAction
+releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+ threadFinished_tmvar <- newEmptyTMVarIO
+ MC.mask_ do
+ -- Pre-release the resource so that another thread doesn't take control of it
+ -- just as we release the lock on the semaphore.
+ still_ok_to_release
+ <- tracedAtomically "pre_release" $
+ modifyJobResources jobs_tvar \ jobs ->
+ if guardRelease jobs
+ -- TODO: should this also debounce?
+ then return (True , removeOwnedToken $ removeFreeToken jobs)
+ else return (False, jobs)
+ if not still_ok_to_release
+ then return Idle
+ else do
+ tid <- forkIO $ do
+ x <- MC.try $ releaseSemaphore sem 1
+ tracedAtomically_ "post-release" $ do
+ (r, jobs) <- case x of
+ Left (e :: MC.SomeException) -> do
+ modifyJobResources jobs_tvar \ jobs ->
+ return (Just e, addToken jobs)
+ Right _ -> do
+ return (Nothing, Nothing)
+ putTMVar threadFinished_tmvar r
+ return jobs
+ labelThread tid "release_thread"
+ return Idle
+
+-- | When there are pending jobs but no free tokens,
+-- spawn a thread to acquire a new token from the semaphore.
+--
+-- See 'acquireThread'.
+tryAcquire :: JobserverOptions
+ -> Jobserver
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
+ st@( JobserverState { jobserverAction = Idle } )
+ = do
+ jobs <- readTVar jobs_tvar
+ guard $ guardAcquire jobs
+ return do
+ action <- acquireThread js
+ -- Set a debounce after acquiring a token.
+ can_release_tvar <- registerDelay $ (releaseDebounce opts * 1000)
+ return $ st { jobserverAction = action
+ , canReleaseToken = can_release_tvar }
+tryAcquire _ _ _ = retry
+
+-- | When there are free tokens and no pending jobs,
+-- spawn a thread to release a token from the semaphore.
+--
+-- See 'releaseThread'.
+tryRelease :: Jobserver
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
+ st@( JobserverState
+ { jobserverAction = Idle
+ , canReleaseToken = can_release_tvar } )
+ = do
+ jobs <- readTVar jobs_tvar
+ guard $ guardRelease jobs
+ can_release <- readTVar can_release_tvar
+ guard can_release
+ return do
+ action <- releaseThread sjs
+ return $ st { jobserverAction = action }
+tryRelease _ _ = retry
+
+-- | Wait for an active thread to finish. Once it finishes:
+--
+-- - set the 'JobserverAction' to 'Idle',
+-- - update the number of capabilities to reflect the number
+-- of owned tokens from the semaphore.
+tryNoticeIdle :: JobserverOptions
+ -> TVar JobResources
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryNoticeIdle opts jobs_tvar jobserver_state
+ | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
+ = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
+ | otherwise
+ = retry -- no active thread: wait until jobserver isn't idle
+ where
+ sync_num_caps :: TVar Bool
+ -> TMVar (Maybe MC.SomeException)
+ -> STM (IO JobserverState)
+ sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
+ mb_ex <- takeTMVar threadFinished_tmvar
+ for_ mb_ex MC.throwM
+ Jobs { tokensOwned } <- readTVar jobs_tvar
+ can_change_numcaps <- readTVar can_change_numcaps_tvar
+ guard can_change_numcaps
+ return do
+ x <- getNumCapabilities
+ can_change_numcaps_tvar_2 <-
+ if x == tokensOwned
+ then return can_change_numcaps_tvar
+ else do
+ setNumCapabilities tokensOwned
+ registerDelay $ (setNumCapsDebounce opts * 1000)
+ return $
+ jobserver_state
+ { jobserverAction = Idle
+ , canChangeNumCaps = can_change_numcaps_tvar_2 }
+
+-- | Try to stop the current thread which is acquiring/releasing resources
+-- if that operation is no longer relevant.
+tryStopThread :: TVar JobResources
+ -> JobserverState
+ -> STM (IO JobserverState)
+tryStopThread jobs_tvar jsj = do
+ case jobserverAction jsj of
+ Acquiring { activeWaitId = wait_id } -> do
+ jobs <- readTVar jobs_tvar
+ guard $ null (jobsWaiting jobs)
+ return do
+ interruptWaitOnSemaphore wait_id
+ return $ jsj { jobserverAction = Idle }
+ _ -> retry
+
+-- | Main jobserver loop: acquire/release resources as
+-- needed for the pending jobs and available semaphore tokens.
+jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
+jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
+ = do
+ true_tvar <- newTVarIO True
+ let init_state :: JobserverState
+ init_state =
+ JobserverState
+ { jobserverAction = Idle
+ , canChangeNumCaps = true_tvar
+ , canReleaseToken = true_tvar }
+ loop init_state
+ where
+ loop s = do
+ action <- atomically $ asum $ (\x -> x s) <$>
+ [ tryRelease sjs
+ , tryAcquire opts sjs
+ , tryNoticeIdle opts jobs_tvar
+ , tryStopThread jobs_tvar
+ ]
+ s <- action
+ loop s
+
+-- | Create a new jobserver using the given semaphore handle.
+makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
+makeJobserver sem_name = do
+ semaphore <- openSemaphore sem_name
+ let
+ init_jobs =
+ Jobs { tokensOwned = 1
+ , tokensFree = 1
+ , jobsWaiting = NilOL
+ }
+ jobs_tvar <- newTVarIO init_jobs
+ let
+ opts = defaultJobserverOptions -- TODO: allow this to be configured
+ sjs = Jobserver { jSemaphore = semaphore
+ , jobs = jobs_tvar }
+ loop_finished_mvar <- newEmptyMVar
+ loop_tid <- forkIOWithUnmask \ unmask -> do
+ r <- try $ unmask $ jobserverLoop opts sjs
+ putMVar loop_finished_mvar $
+ case r of
+ Left e
+ | Just ThreadKilled <- fromException e
+ -> Nothing
+ | otherwise
+ -> Just e
+ Right () -> Nothing
+ labelThread loop_tid "job_server"
+ let
+ acquireSem = acquireJob jobs_tvar
+ releaseSem = releaseJob jobs_tvar
+ cleanupSem = do
+ -- this is interruptible
+ cleanupJobserver sjs
+ killThread loop_tid
+ mb_ex <- takeMVar loop_finished_mvar
+ for_ mb_ex MC.throwM
+
+ return (AbstractSem{..}, cleanupSem)
+
+-- | Implement an abstract semaphore using a semaphore 'Jobserver'
+-- which queries the system semaphore of the given name for resources.
+runJSemAbstractSem :: SemaphoreName -- ^ the system semaphore to use
+ -> (AbstractSem -> IO a) -- ^ the operation to run
+ -- which requires a semaphore
+ -> IO a
+runJSemAbstractSem sem action = MC.mask \ unmask -> do
+ (abs, cleanup) <- makeJobserver sem
+ r <- try $ unmask $ action abs
+ case r of
+ Left (e1 :: MC.SomeException) -> do
+ (_ :: Either MC.SomeException ()) <- MC.try cleanup
+ MC.throwM e1
+ Right x -> cleanup $> x
+
+{-
+Note [Architecture of the Job Server]
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In `-jsem` mode the amount of parallelism that GHC can use is controlled by a
+system semaphore. We take resources from it when we need them and give them back
+if we don't have enough to do.
+
+A naive implementation would just take and release the semaphore around performing
+the action but this leads to two issues.
+
+* When taking a slot in the semaphore we must call `setNumCapabilities` in order
+ to adjust how many capabilities are available for parallel garbage collection. This
+ causes a synchronisation.
+* We want to implement a debounce so that whilst there is pending work in the current
+ process we prefer to keep hold of resources from the semaphore. This reduces
+ overall memory usage as there are fewer live GHC processes at once.
+
+Therefore the acquisition of semaphore resources is separated from the
+request for the resource in the driver.
+
+A slot from the semaphore is requested using `acquireJob`. This creates a pending
+job, represented by a `TMVar` which is filled in to signal that the requested slot is ready.
+
+When the job is finished, the slot is released by calling `releaseJob`, which just
+increases the number of `free` jobs. If there are more pending jobs when the free count
+is increased the slot is immediately reused (see `modifyJobResources`).
+
+The `jobserverLoop` interacts with the system semaphore: when there are still pending
+jobs, the thread spawned by `acquireThread` blocks waiting for a slot in the semaphore
+and increases the owned count when the slot is obtained.
+
+When there are free slots, no pending jobs and the debounce has expired
+then `releaseThread` will release slots back to the global semaphore.
+
+`tryStopThread` attempts to kill threads which are waiting to acquire a resource
+when we no longer need it. For example, consider that we attempt to acquire two
+slots of the semaphore but the first job finishes before we acquire the second resource:
+the second slot is no longer needed so we should cancel the wait (as it would not be used to
+do any work and not returned until the debounce). We only need to kill in the acquiring
+state because the releasing state can't block.
+
+Note [Eventlog Messages for jsem]
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+It can be tricky to verify that the work is shared adequately across different
+processes. To help debug this, whenever the global state changes, the values of
+`JobResources` are output to the eventlog. There are some scripts which can be used
+to analyse this output and report statistics about core saturation in this
+GitHub repo (https://github.com/mpickering/ghc-jsem-analyse).
+
+-}
=====================================
compiler/GHC/Driver/Pipeline/LogQueue.hs
=====================================
@@ -100,10 +100,10 @@ dequeueLogQueueQueue (LogQueueQueue n lqq) = case IM.minViewWithKey lqq of
Just ((k, v), lqq') | k == n -> Just (v, LogQueueQueue (n + 1) lqq')
_ -> Nothing
-logThread :: Int -> Int -> Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
+logThread :: Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
-> TVar LogQueueQueue -- Queue for logs
-> IO (IO ())
-logThread _ _ logger stopped lqq_var = do
+logThread logger stopped lqq_var = do
finished_var <- newEmptyMVar
_ <- forkIO $ print_logs *> putMVar finished_var ()
return (takeMVar finished_var)
=====================================
compiler/GHC/Driver/Session.hs
=====================================
@@ -43,6 +43,7 @@ module GHC.Driver.Session (
needSourceNotes,
OnOff(..),
DynFlags(..),
+ ParMakeCount(..),
outputFile, objectSuf, ways,
FlagSpec(..),
HasDynFlags(..), ContainsDynFlags(..),
@@ -461,9 +462,13 @@ data DynFlags = DynFlags {
ruleCheck :: Maybe String,
strictnessBefore :: [Int], -- ^ Additional demand analysis
- parMakeCount :: Maybe Int, -- ^ The number of modules to compile in parallel
- -- in --make mode, where Nothing ==> compile as
- -- many in parallel as there are CPUs.
+ parMakeCount :: Maybe ParMakeCount,
+ -- ^ The number of modules to compile in parallel
+ -- in --make mode, ignored with a warning if 'jsemHandle' is specified.
+ -- If unspecified, compile with a single job.
+
+ jsemHandle :: Maybe FilePath,
+ -- ^ A handle to a parallelism semaphore
enableTimeStats :: Bool, -- ^ Enable RTS timing statistics?
ghcHeapSize :: Maybe Int, -- ^ The heap size to set.
@@ -775,6 +780,12 @@ instance (Monad m, HasDynFlags m) => HasDynFlags (ExceptT e m) where
class ContainsDynFlags t where
extractDynFlags :: t -> DynFlags
+-- | The argument of the @-j@ flag: either an explicit job count (@-jN@), or
+-- @-j@ on its own, which means using the number of machine processors.
+data ParMakeCount
+ = ParMakeThisMany Int
+ | ParMakeNumProcessors
+
-----------------------------------------------------------------------------
-- Accessors from 'DynFlags'
@@ -1138,7 +1149,8 @@ defaultDynFlags mySettings =
historySize = 20,
strictnessBefore = [],
- parMakeCount = Just 1,
+ parMakeCount = Nothing,
+ jsemHandle = Nothing,
enableTimeStats = False,
ghcHeapSize = Nothing,
@@ -1902,19 +1914,25 @@ parseDynamicFlagsFull activeFlags cmdline dflags0 args = do
throwGhcExceptionIO (CmdLineError ("combination not supported: " ++
intercalate "/" (map wayDesc (Set.toAscList theWays))))
- let (dflags3, consistency_warnings) = makeDynFlagsConsistent dflags2
+ (dflags3, io_warnings) <- liftIO $ dynFlagsIOCheck dflags2
+
+ let (dflags4, consistency_warnings) = makeDynFlagsConsistent dflags3
-- Set timer stats & heap size
- when (enableTimeStats dflags3) $ liftIO enableTimingStats
- case (ghcHeapSize dflags3) of
+ when (enableTimeStats dflags4) $ liftIO enableTimingStats
+ case (ghcHeapSize dflags4) of
Just x -> liftIO (setHeapSize x)
_ -> return ()
- liftIO $ setUnsafeGlobalDynFlags dflags3
+ liftIO $ setUnsafeGlobalDynFlags dflags4
- let warns' = map (Warn WarningWithoutFlag) (consistency_warnings ++ sh_warns)
+ let warns' = map (Warn WarningWithoutFlag) (consistency_warnings ++ sh_warns ++ io_warnings)
- return (dflags3, leftover, warns' ++ warns)
+ return (dflags4, leftover, warns' ++ warns)
+
+-- | Perform checks and fixes on DynFlags which require IO
+dynFlagsIOCheck :: DynFlags -> IO (DynFlags, [Located String])
+dynFlagsIOCheck dflags = pure (dflags, [])
-- | Check (and potentially disable) any extensions that aren't allowed
-- in safe mode.
@@ -2054,14 +2072,16 @@ dynamic_flags_deps = [
, make_ord_flag defGhcFlag "j" (OptIntSuffix
(\n -> case n of
Just n
- | n > 0 -> upd (\d -> d { parMakeCount = Just n })
+ | n > 0 -> upd (\d -> d { parMakeCount = Just (ParMakeThisMany n) })
| otherwise -> addErr "Syntax: -j[n] where n > 0"
- Nothing -> upd (\d -> d { parMakeCount = Nothing })))
+ Nothing -> upd (\d -> d { parMakeCount = Just ParMakeNumProcessors })))
-- When the number of parallel builds
-- is omitted, it is the same
-- as specifying that the number of
-- parallel builds is equal to the
-- result of getNumProcessors
+ , make_ord_flag defGhcFlag "jsem" $ hasArg $ \f d -> d { jsemHandle = Just f }
+
, make_ord_flag defFlag "instantiated-with" (sepArg setUnitInstantiations)
, make_ord_flag defFlag "this-component-id" (sepArg setUnitInstanceOf)
@@ -4860,6 +4880,11 @@ makeDynFlagsConsistent dflags
, Nothing <- outputFile dflags
= pgmError "--output must be specified when using --merge-objs"
+ | Just _ <- jsemHandle dflags
+ , Just _ <- parMakeCount dflags
+ = loop dflags{parMakeCount = Nothing}
+ "`-j` argument is ignored when using `-jsem`"
+
| otherwise = (dflags, [])
where loc = mkGeneralSrcSpan (fsLit "when making flags consistent")
loop updated_dflags warning
=====================================
compiler/ghc.cabal.in
=====================================
@@ -90,6 +90,7 @@ Library
hpc == 0.6.*,
transformers == 0.5.*,
exceptions == 0.10.*,
+ semaphore-compat,
stm,
ghc-boot == @ProjectVersionMunged@,
ghc-heap == @ProjectVersionMunged@,
@@ -442,6 +443,7 @@ Library
GHC.Driver.GenerateCgIPEStub
GHC.Driver.Hooks
GHC.Driver.LlvmConfigCache
+ GHC.Driver.MakeSem
GHC.Driver.Main
GHC.Driver.Make
GHC.Driver.MakeFile
=====================================
docs/users_guide/using.rst
=====================================
@@ -751,6 +751,14 @@ search path (see :ref:`search-path`).
number of processors. Note that compilation of a module may not begin
until its dependencies have been built.
+.. ghc-flag:: -jsem
+ :shortdesc: stub
+ :type: dynamic
+ :category: misc
+
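+ Takes the handle of a parallelism semaphore as its argument.
+ When this flag is given, any ``-j`` argument is ignored with a warning.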
+
+
.. _multi-home-units:
Multiple Home Units
=====================================
hadrian/src/Packages.hs
=====================================
@@ -8,7 +8,7 @@ module Packages (
ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline,
hsc2hs, hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, iservProxy,
libffi, libiserv, mtl, parsec, pretty, primitive, process, remoteIserv, rts,
- runGhc, stm, templateHaskell, terminfo, text, time, timeout, touchy,
+ runGhc, semaphoreCompat, stm, templateHaskell, terminfo, text, time, timeout, touchy,
transformers, unlit, unix, win32, xhtml,
lintersCommon, lintNotes, lintCommitMsg, lintSubmoduleRefs, lintWhitespace,
ghcPackages, isGhcPackage,
@@ -39,7 +39,7 @@ ghcPackages =
, exceptions, filepath, genapply, genprimopcode, ghc, ghcBignum, ghcBoot, ghcBootTh
, ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline, hsc2hs
, hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, libffi, libiserv, mtl
- , parsec, pretty, process, rts, runGhc, stm, templateHaskell
+ , parsec, pretty, process, rts, runGhc, semaphoreCompat, stm, templateHaskell
, terminfo, text, time, touchy, transformers, unlit, unix, win32, xhtml
, timeout
, lintersCommon
@@ -55,7 +55,7 @@ array, base, binary, bytestring, cabalSyntax, cabal, checkPpr, checkExact, count
exceptions, filepath, genapply, genprimopcode, ghc, ghcBignum, ghcBoot, ghcBootTh,
ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline, hsc2hs,
hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, iservProxy, remoteIserv, libffi, libiserv, mtl,
- parsec, pretty, primitive, process, rts, runGhc, stm, templateHaskell,
+ parsec, pretty, primitive, process, rts, runGhc, semaphoreCompat, stm, templateHaskell,
terminfo, text, time, touchy, transformers, unlit, unix, win32, xhtml,
timeout,
lintersCommon, lintNotes, lintCommitMsg, lintSubmoduleRefs, lintWhitespace
@@ -111,6 +111,7 @@ process = lib "process"
remoteIserv = util "remote-iserv"
rts = top "rts"
runGhc = util "runghc"
+semaphoreCompat = lib "semaphore-compat"
stm = lib "stm"
templateHaskell = lib "template-haskell"
terminfo = lib "terminfo"
=====================================
hadrian/src/Settings/Default.hs
=====================================
@@ -95,6 +95,7 @@ stage0Packages = do
, hpcBin
, mtl
, parsec
+ , semaphoreCompat
, time
, templateHaskell
, text
@@ -134,6 +135,7 @@ stage1Packages = do
, integerGmp
, pretty
, rts
+ , semaphoreCompat
, stm
, unlit
, xhtml
=====================================
libraries/Win32
=====================================
@@ -1 +1 @@
-Subproject commit 931497f7052f63cb5cfd4494a94e572c5c570642
+Subproject commit efab7f1146da9741dc54fb35476d4aaabeff8d6d
=====================================
libraries/semaphore-compat
=====================================
@@ -0,0 +1 @@
+Subproject commit 663ef75467995acf41c51d3e21d03347e85b844e
=====================================
libraries/unix
=====================================
@@ -1 +1 @@
-Subproject commit 3be0223cee7395410915a127eba3acae5ff0b2f2
+Subproject commit 98adc732bfbfca4fef945d546ecbaae13952a950
=====================================
packages
=====================================
@@ -66,5 +66,6 @@ libraries/Win32 - - https:/
libraries/xhtml - - https://github.com/haskell/xhtml.git
libraries/exceptions - - https://github.com/ekmett/exceptions.git
nofib nofib - -
+libraries/semaphore-compat - - -
libraries/stm - - ssh://git@github.com/haskell/stm.git
. - ghc.git -
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/aabed5822eaaeef1ca22b59a65e4b5b1a8d639a6
--
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/aabed5822eaaeef1ca22b59a65e4b5b1a8d639a6
You're receiving this email because of your account on gitlab.haskell.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20221218/6001b3f4/attachment-0001.html>
More information about the ghc-commits
mailing list