[Git][ghc/ghc][wip/jsem] parent ad612f555821a44260e5d9654f940b71f5180817

Matthew Pickering (@mpickering) gitlab at gitlab.haskell.org
Tue Dec 20 09:40:58 UTC 2022



Matthew Pickering pushed to branch wip/jsem at Glasgow Haskell Compiler / GHC


Commits:
ae811dfe by sheaf at 2022-12-20T09:40:42+00:00
parent ad612f555821a44260e5d9654f940b71f5180817
author sheaf <sam.derbyshire at gmail.com> 1662553354 +0200
committer Matthew Pickering <matthewtpickering at gmail.com> 1671366685 +0000

WIP: jsem, using POSIX/Win32 semaphores

Updates submodule

- - - - -


15 changed files:

- .gitmodules
- cabal.project-reinstall
- compiler/GHC/Driver/Config.hs
- compiler/GHC/Driver/Make.hs
- + compiler/GHC/Driver/MakeSem.hs
- compiler/GHC/Driver/Pipeline/LogQueue.hs
- compiler/GHC/Driver/Session.hs
- compiler/ghc.cabal.in
- docs/users_guide/using.rst
- hadrian/src/Packages.hs
- hadrian/src/Settings/Default.hs
- libraries/Win32
- + libraries/semaphore-compat
- libraries/unix
- packages


Changes:

=====================================
.gitmodules
=====================================
@@ -83,6 +83,10 @@
 	url = https://gitlab.haskell.org/ghc/packages/unix.git
 	ignore = untracked
 	branch = 2.7
+[submodule "libraries/semaphore-compat"]
+	path = libraries/semaphore-compat
+	url = https://gitlab.haskell.org/ghc/semaphore-compat.git
+	ignore = untracked
 [submodule "libraries/stm"]
 	path = libraries/stm
 	url = https://gitlab.haskell.org/ghc/packages/stm.git


=====================================
cabal.project-reinstall
=====================================
@@ -29,6 +29,7 @@ packages: ./compiler
           ./libraries/parsec/
           -- ./libraries/pretty/
           ./libraries/process/
+          ./libraries/semaphore-compat
           ./libraries/stm
           -- ./libraries/template-haskell/
           ./libraries/terminfo/


=====================================
compiler/GHC/Driver/Config.hs
=====================================
@@ -38,8 +38,10 @@ initBCOOpts dflags = do
   -- Serializing ResolvedBCO is expensive, so if we're in parallel mode
   -- (-j<n>) parallelise the serialization.
   n_jobs <- case parMakeCount dflags of
-              Nothing -> liftIO getNumProcessors
-              Just n  -> return n
+              Nothing -> pure 1
+              Just (ParMakeThisMany n) -> pure n
+              Just ParMakeNumProcessors -> liftIO getNumProcessors
+              -- jsem TODO
   return $ BCOOpts n_jobs
 
 -- | Extract GHCi options from DynFlags and step


=====================================
compiler/GHC/Driver/Make.hs
=====================================
@@ -75,6 +75,7 @@ import GHC.Driver.Env
 import GHC.Driver.Errors
 import GHC.Driver.Errors.Types
 import GHC.Driver.Main
+import GHC.Driver.MakeSem
 
 import GHC.Parser.Header
 
@@ -150,9 +151,9 @@ import GHC.Runtime.Loader
 import GHC.Rename.Names
 import GHC.Utils.Constants
 import GHC.Types.Unique.DFM (udfmRestrictKeysSet)
-import qualified Data.IntSet as I
 import GHC.Types.Unique
 
+import qualified Data.IntSet as I
 
 -- -----------------------------------------------------------------------------
 -- Loading the program
@@ -655,6 +656,34 @@ createBuildPlan mod_graph maybe_top_mod =
               (vcat [text "Build plan missing nodes:", (text "PLAN:" <+> ppr (sum (map countMods build_plan))), (text "GRAPH:" <+> ppr (length (mgModSummaries' mod_graph )))])
               build_plan
 
+-- data ParMakeMode = ParallelMake | SequentialMake
+--   deriving (Eq, Show)
+
+mkWorkerLimit :: DynFlags -> IO WorkerLimit
+mkWorkerLimit dflags =
+  case jsemHandle dflags of
+    Just h -> pure (JSemLimit $ SemaphoreName h)
+    Nothing -> case parMakeCount dflags of
+      Nothing -> pure $ num_procs 1
+      Just ParMakeNumProcessors -> num_procs <$> getNumProcessors
+      Just (ParMakeThisMany n) -> pure $ num_procs n
+  where
+    num_procs x = NumProcessorsLimit (max 1 x)
+
+isWorkerLimitSequential :: WorkerLimit -> Bool
+isWorkerLimitSequential (NumProcessorsLimit x) = x <= 1
+isWorkerLimitSequential (JSemLimit {})         = False
+
+-- | This describes what we use to limit the number of jobs: either we limit it
+-- ourselves to a specific number, or we have an external parallelism semaphore
+-- limit it for us.
+data WorkerLimit
+  = NumProcessorsLimit Int
+  | JSemLimit
+    SemaphoreName
+      -- ^ Semaphore name to use
+  deriving Eq
+
 -- | Generalized version of 'load' which also supports a custom
 -- 'Messager' (for reporting progress) and 'ModuleGraph' (generally
 -- produced by calling 'depanal'.
@@ -735,14 +764,12 @@ load' mhmi_cache how_much mHscMessage mod_graph = do
     liftIO $ debugTraceMsg logger 2 (hang (text "Ready for upsweep")
                                     2 (ppr build_plan))
 
-    n_jobs <- case parMakeCount (hsc_dflags hsc_env) of
-                    Nothing -> liftIO getNumProcessors
-                    Just n  -> return n
+    worker_limit <- liftIO $ mkWorkerLimit dflags
 
     setSession $ hscUpdateHUG (unitEnv_map pruneHomeUnitEnv) hsc_env
     (upsweep_ok, hsc_env1) <- withDeferredDiagnostics $ do
       hsc_env <- getSession
-      liftIO $ upsweep n_jobs hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
+      liftIO $ upsweep worker_limit hsc_env mhmi_cache mHscMessage (toCache pruned_cache) build_plan
     setSession hsc_env1
     case upsweep_ok of
       Failed -> loadFinish upsweep_ok
@@ -1023,13 +1050,7 @@ getDependencies direct_deps build_map =
 type BuildM a = StateT BuildLoopState IO a
 
 
--- | Abstraction over the operations of a semaphore which allows usage with the
---  -j1 case
-data AbstractSem = AbstractSem { acquireSem :: IO ()
-                               , releaseSem :: IO () }
 
-withAbstractSem :: AbstractSem -> IO b -> IO b
-withAbstractSem sem = MC.bracket_ (acquireSem sem) (releaseSem sem)
 
 -- | Environment used when compiling a module
 data MakeEnv = MakeEnv { hsc_env :: !HscEnv -- The basic HscEnv which will be augmented for each module
@@ -1214,7 +1235,7 @@ withCurrentUnit uid = do
   local (\env -> env { hsc_env = hscSetActiveUnitId uid (hsc_env env)})
 
 upsweep
-    :: Int -- ^ The number of workers we wish to run in parallel
+    :: WorkerLimit -- ^ How to limit the number of workers we run in parallel
     -> HscEnv -- ^ The base HscEnv, which is augmented for each module
     -> Maybe ModIfaceCache -- ^ A cache to incrementally write final interface files to
     -> Maybe Messager
@@ -2818,7 +2839,7 @@ label_self thread_name = do
     CC.labelThread self_tid thread_name
 
 
-runPipelines :: Int -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
+runPipelines :: WorkerLimit -> HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
 -- Don't even initialise plugins if there are no pipelines
 runPipelines _ _ _ [] = return ()
 runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
@@ -2826,7 +2847,7 @@ runPipelines n_job orig_hsc_env mHscMessager all_pipelines = do
 
   plugins_hsc_env <- initializePlugins orig_hsc_env
   case n_job of
-    1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
+    NumProcessorsLimit n | n <= 1 -> runSeqPipelines plugins_hsc_env mHscMessager all_pipelines
     _n -> runParPipelines n_job plugins_hsc_env mHscMessager all_pipelines
 
 runSeqPipelines :: HscEnv -> Maybe Messager -> [MakeAction] -> IO ()
@@ -2836,16 +2857,40 @@ runSeqPipelines plugin_hsc_env mHscMessager all_pipelines =
                     , compile_sem = AbstractSem (return ()) (return ())
                     , env_messager = mHscMessager
                     }
-  in runAllPipelines 1 env all_pipelines
+  in runAllPipelines (NumProcessorsLimit 1) env all_pipelines
+
+  -- TODO remove this capabilities management, it will be handled by the semaphore
 
+runNjobsAbstractSem :: Int -> (AbstractSem -> IO a) -> IO a
+runNjobsAbstractSem n_jobs action = do
+  compile_sem <- newQSem n_jobs
+  n_capabilities <- getNumCapabilities
+  n_cpus <- getNumProcessors
+  let
+    asem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+    set_num_caps n = unless (n_capabilities /= 1) $ setNumCapabilities n
+    updNumCapabilities =  do
+      -- Setting number of capabilities more than
+      -- CPU count usually leads to high userspace
+      -- lock contention. #9221
+      set_num_caps $ min n_jobs n_cpus
+    resetNumCapabilities = set_num_caps n_capabilities
+  MC.bracket_ updNumCapabilities resetNumCapabilities $ action asem
+
+runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
+runWorkerLimit worker_limit action = case worker_limit of
+    NumProcessorsLimit n_jobs ->
+      runNjobsAbstractSem n_jobs action
+    JSemLimit sem ->
+      runJSemAbstractSem sem action
 
 -- | Build and run a pipeline
-runParPipelines :: Int              -- ^ How many capabilities to use
-             -> HscEnv           -- ^ The basic HscEnv which is augmented with specific info for each module
+runParPipelines :: WorkerLimit -- ^ How to limit work parallelism
+             -> HscEnv         -- ^ The basic HscEnv which is augmented with specific info for each module
              -> Maybe Messager   -- ^ Optional custom messager to use to report progress
              -> [MakeAction]  -- ^ The build plan for all the module nodes
              -> IO ()
-runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
+runParPipelines worker_limit plugin_hsc_env mHscMessager all_pipelines = do
 
 
   -- A variable which we write to when an error has happened and we have to tell the
@@ -2855,39 +2900,23 @@ runParPipelines n_jobs plugin_hsc_env mHscMessager all_pipelines = do
   -- will add it's LogQueue into this queue.
   log_queue_queue_var <- newTVarIO newLogQueueQueue
   -- Thread which coordinates the printing of logs
-  wait_log_thread <- logThread n_jobs (length all_pipelines) (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
+  wait_log_thread <- logThread (hsc_logger plugin_hsc_env) stopped_var log_queue_queue_var
 
 
   -- Make the logger thread-safe, in case there is some output which isn't sent via the LogQueue.
   thread_safe_logger <- liftIO $ makeThreadSafe (hsc_logger plugin_hsc_env)
   let thread_safe_hsc_env = plugin_hsc_env { hsc_logger = thread_safe_logger }
 
-  let updNumCapabilities = liftIO $ do
-          n_capabilities <- getNumCapabilities
-          n_cpus <- getNumProcessors
-          -- Setting number of capabilities more than
-          -- CPU count usually leads to high userspace
-          -- lock contention. #9221
-          let n_caps = min n_jobs n_cpus
-          unless (n_capabilities /= 1) $ setNumCapabilities n_caps
-          return n_capabilities
-
-  let resetNumCapabilities orig_n = do
-          liftIO $ setNumCapabilities orig_n
-          atomically $ writeTVar stopped_var True
-          wait_log_thread
-
-  compile_sem <- newQSem n_jobs
-  let abstract_sem = AbstractSem (waitQSem compile_sem) (signalQSem compile_sem)
+  runWorkerLimit worker_limit $ \abstract_sem -> do
+    let env = MakeEnv { hsc_env = thread_safe_hsc_env
+                      , withLogger = withParLog log_queue_queue_var
+                      , compile_sem = abstract_sem
+                      , env_messager = mHscMessager
+                      }
     -- Reset the number of capabilities once the upsweep ends.
-  let env = MakeEnv { hsc_env = thread_safe_hsc_env
-                    , withLogger = withParLog log_queue_queue_var
-                    , compile_sem = abstract_sem
-                    , env_messager = mHscMessager
-                    }
-
-  MC.bracket updNumCapabilities resetNumCapabilities $ \_ ->
-    runAllPipelines n_jobs env all_pipelines
+    runAllPipelines worker_limit env all_pipelines
+    atomically $ writeTVar stopped_var True
+    wait_log_thread
 
 withLocalTmpFS :: RunMakeM a -> RunMakeM a
 withLocalTmpFS act = do
@@ -2904,10 +2933,11 @@ withLocalTmpFS act = do
   MC.bracket initialiser finaliser $ \lcl_hsc_env -> local (\env -> env { hsc_env = lcl_hsc_env}) act
 
 -- | Run the given actions and then wait for them all to finish.
-runAllPipelines :: Int -> MakeEnv -> [MakeAction] -> IO ()
-runAllPipelines n_jobs env acts = do
-  let spawn_actions :: IO [ThreadId]
-      spawn_actions = if n_jobs == 1
+runAllPipelines :: WorkerLimit -> MakeEnv -> [MakeAction] -> IO ()
+runAllPipelines worker_limit env acts = do
+  let single_worker = isWorkerLimitSequential worker_limit
+      spawn_actions :: IO [ThreadId]
+      spawn_actions = if single_worker
         then (:[]) <$> (forkIOWithUnmask $ \unmask -> void $ runLoop (\io -> io unmask) env acts)
         else runLoop forkIOWithUnmask env acts
 


=====================================
compiler/GHC/Driver/MakeSem.hs
=====================================
@@ -0,0 +1,548 @@
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE NumericUnderscores #-}
+
+-- | Implementation of a jobserver using system semaphores.
+--
+--
+module GHC.Driver.MakeSem
+  ( -- * JSem: parallelism semaphore backed
+    -- by a system semaphore (Posix/Windows)
+    runJSemAbstractSem
+
+  -- * System semaphores
+  , Semaphore, SemaphoreName(..)
+
+  -- * Abstract semaphores
+  , AbstractSem(..)
+  , withAbstractSem
+  )
+  where
+
+import GHC.Prelude
+import GHC.Conc
+import GHC.Data.OrdList
+import GHC.IO.Exception
+import GHC.Utils.Outputable
+import GHC.Utils.Panic
+import GHC.Utils.Json
+
+import System.Semaphore
+
+import Control.Monad
+import qualified Control.Monad.Catch as MC
+import Control.Concurrent.MVar
+import Control.Concurrent.STM
+import Data.Foldable
+import Data.Functor
+import GHC.Stack
+import Debug.Trace
+
+---------------------------------------
+-- Semaphore jobserver
+
+-- | A jobserver based off a system 'Semaphore'.
+--
+-- Keeps track of the pending jobs and resources
+-- available from the semaphore.
+data Jobserver
+  = Jobserver
+  { jSemaphore :: !Semaphore
+    -- ^ The semaphore which controls available resources
+  , jobs :: !(TVar JobResources)
+    -- ^ The currently pending jobs, and the resources
+    -- obtained from the semaphore
+  }
+
+data JobserverOptions
+  = JobserverOptions
+  { releaseDebounce    :: !Int
+     -- ^ Minimum delay, in milliseconds, between acquiring a token
+     -- and releasing a token.
+  , setNumCapsDebounce :: !Int
+    -- ^ Minimum delay, in milliseconds, between two consecutive
+    -- calls of 'setNumCapabilities'.
+  }
+
+defaultJobserverOptions :: JobserverOptions
+defaultJobserverOptions =
+  JobserverOptions
+    { releaseDebounce    = 1000 -- 1 second
+    , setNumCapsDebounce = 1000 -- 1 second
+    }
+
+-- | Resources available for running jobs, i.e.
+-- tokens obtained from the parallelism semaphore.
+data JobResources
+  = Jobs
+  { tokensOwned :: !Int
+    -- ^ How many tokens have been claimed from the semaphore
+  , tokensFree  :: !Int
+    -- ^ How many tokens are not currently being used
+  , jobsWaiting :: !(OrdList (TMVar ()))
+    -- ^ Pending jobs waiting on a token
+  }
+
+instance Outputable JobResources where
+  ppr Jobs{..}
+    = text "JobResources" <+>
+        ( braces $ hsep
+          [ text "owned=" <> ppr tokensOwned
+          , text "free=" <> ppr tokensFree
+          , text "num_waiting=" <> ppr (length jobsWaiting)
+          ] )
+
+-- | Add one new token.
+addToken :: JobResources -> JobResources
+addToken jobs@( Jobs { tokensOwned = owned, tokensFree = free })
+  = --if owned > 6 then pprPanic "addToken" (ppr jobs)
+    --             else
+    jobs { tokensOwned = owned + 1, tokensFree = free + 1 }
+
+-- | Free one token.
+addFreeToken :: JobResources -> JobResources
+addFreeToken jobs@( Jobs { tokensFree = free })
+  = assertPpr (tokensOwned jobs > free)
+      (text "addFreeToken:" <+> ppr (tokensOwned jobs) <+> ppr free)
+  $ jobs { tokensFree = free + 1 }
+
+-- | Use up one token.
+removeFreeToken :: JobResources -> JobResources
+removeFreeToken jobs@( Jobs { tokensFree = free })
+  = assertPpr (free > 0)
+      (text "removeFreeToken:" <+> ppr free)
+  $ jobs { tokensFree = free - 1 }
+
+-- | Return one owned token.
+removeOwnedToken :: JobResources -> JobResources
+removeOwnedToken jobs@( Jobs { tokensOwned = owned })
+  = assertPpr (owned > 1)
+      (text "removeOwnedToken:" <+> ppr owned)
+  $ jobs { tokensOwned = owned - 1 }
+
+-- | Add one new job to the end of the list of pending jobs.
+addJob :: TMVar () -> JobResources -> JobResources
+addJob job jobs@( Jobs { jobsWaiting = wait })
+  = jobs { jobsWaiting = wait `SnocOL` job }
+
+-- | The state of the semaphore job server.
+data JobserverState
+  = JobserverState
+    { jobserverAction  :: !JobserverAction
+      -- ^ The current action being performed by the
+      -- job server.
+    , canChangeNumCaps :: !(TVar Bool)
+      -- ^ A TVar that signals whether it has been long
+      -- enough since we last changed 'numCapabilities'.
+    , canReleaseToken  :: !(TVar Bool)
+      -- ^ A TVar that signals whether we last acquired
+      -- a token long enough ago that we can now release
+      -- a token.
+    }
+data JobserverAction
+  -- | The jobserver is idle: no thread is currently
+  -- interacting with the semaphore.
+  = Idle
+  -- | A thread is waiting for a token on the semaphore.
+  | Acquiring
+    { activeWaitId   :: WaitId
+    , threadFinished :: TMVar (Maybe MC.SomeException) }
+
+-- | Retrieve the 'TMVar' that signals if the current thread has finished,
+-- if any thread is currently active in the jobserver.
+activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
+activeThread_maybe Idle                                   = Nothing
+activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar
+
+-- | Whether we should try to acquire a new token from the semaphore:
+-- there is a pending job and no free tokens.
+guardAcquire :: JobResources -> Bool
+guardAcquire ( Jobs { tokensFree, jobsWaiting } )
+  = tokensFree == 0 && not (null jobsWaiting)
+
+-- | Whether we should release a token from the semaphore:
+-- there are no pending jobs and we can release a token.
+guardRelease :: JobResources -> Bool
+guardRelease ( Jobs { tokensFree, tokensOwned, jobsWaiting } )
+  = null jobsWaiting && tokensFree > 0 && tokensOwned > 1
+
+---------------------------------------
+-- Semaphore jobserver implementation
+
+-- | Add one pending job to the jobserver.
+--
+-- Blocks, waiting on the jobserver to supply a free token.
+acquireJob :: TVar JobResources -> IO ()
+acquireJob jobs_tvar = do
+  (job_tmvar, _jobs0) <- tracedAtomically "acquire" $
+    modifyJobResources jobs_tvar \ jobs -> do
+      job_tmvar <- newEmptyTMVar
+      return ((job_tmvar, jobs), addJob job_tmvar jobs)
+  atomically $ takeTMVar job_tmvar
+
+-- | Signal to the job server that one job has completed,
+-- releasing its corresponding token.
+releaseJob :: TVar JobResources -> IO ()
+releaseJob jobs_tvar = do
+  tracedAtomically "release" do
+    modifyJobResources jobs_tvar \ jobs -> do
+      massertPpr (tokensFree jobs < tokensOwned jobs)
+        (text "releaseJob: more free jobs than owned jobs!")
+      return ((), addFreeToken jobs)
+
+
+-- | Release all tokens owned from the semaphore (to clean up
+-- the jobserver at the end).
+cleanupJobserver :: Jobserver -> IO ()
+cleanupJobserver (Jobserver { jSemaphore = sem
+                            , jobs       = jobs_tvar })
+  = do
+    Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
+    let toks_to_release = owned - 1
+      -- Subtract off the implicit token: whoever spawned the ghc process
+      -- in the first place is responsible for that token.
+    releaseSemaphore sem toks_to_release
+
+-- | Dispatch the available tokens acquired from the semaphore
+-- to the pending jobs in the job server.
+dispatchTokens :: JobResources -> STM JobResources
+dispatchTokens jobs@( Jobs { tokensFree = toks_free, jobsWaiting = wait } )
+  | toks_free > 0
+  , next `ConsOL` rest <- wait
+  -- There's a pending job and a free token:
+  -- pass on the token to that job, and recur.
+  = do
+      putTMVar next ()
+      let jobs' = jobs { tokensFree = toks_free - 1, jobsWaiting = rest }
+      dispatchTokens jobs'
+  | otherwise
+  = return jobs
+
+-- | Update the resources obtained from the semaphore: run the action on the
+-- current 'JobResources', dispatch free tokens to pending jobs, write back.
+-- Returns the action's result with the resources written (always 'Just').
+--
+-- Invariant (asserted): if the number of free tokens decreases, there must
+-- be no pending jobs. All modifications should go through this function so
+-- that the contents of the 'TVar' remain in normal form (no job is left
+-- pending while a token is free).
+modifyJobResources :: HasCallStack => TVar JobResources
+                   -> (JobResources -> STM (a, JobResources))
+                   -> STM (a, Maybe JobResources)
+modifyJobResources jobs_tvar action = do
+  old_jobs  <- readTVar jobs_tvar
+  (a, jobs) <- action old_jobs
+
+  -- Check the invariant: if the number of free tokens has decreased,
+  -- there must be no pending jobs.
+  massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
+    vcat [ text "modifyJobResources: pending jobs but fewer free tokens" ]
+  dispatched_jobs <- dispatchTokens jobs
+  writeTVar jobs_tvar dispatched_jobs
+  return (a, Just dispatched_jobs)
+
+
+tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
+tracedAtomically_ s act = tracedAtomically s (((),) <$> act)
+
+tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
+tracedAtomically origin act = do
+  (a, mjr) <- atomically act
+  forM_ mjr $ \ jr -> do
+    -- Use the "jsem:" prefix to identify where the write traces are
+    traceEventIO ("jsem:" ++ renderJobResources origin jr)
+  return a
+
+renderJobResources :: String -> JobResources -> String
+renderJobResources origin (Jobs own free pending) = showSDocUnsafe $ renderJSON $
+  JSObject [ ("name", JSString origin)
+           , ("owned", JSInt own)
+           , ("free", JSInt free)
+           , ("pending", JSInt (length pending) )
+           ]
+
+
+-- | Spawn a new thread that waits on the semaphore in order to acquire
+-- an additional token.
+acquireThread :: Jobserver -> IO JobserverAction
+acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+    threadFinished_tmvar <- newEmptyTMVarIO
+    let
+      wait_result_action :: Either MC.SomeException Bool -> IO ()
+      wait_result_action wait_res =
+        tracedAtomically_ "acquire_thread" do
+          (r, jb) <- case wait_res of
+            Left (e :: MC.SomeException) -> do
+              return $ (Just e, Nothing)
+            Right success -> do
+              if success
+                then do
+                  modifyJobResources jobs_tvar \ jobs ->
+                    return (Nothing, addToken jobs)
+                else
+                  return (Nothing, Nothing)
+          putTMVar threadFinished_tmvar r
+          return jb
+    wait_id <- forkWaitOnSemaphoreInterruptible sem wait_result_action
+    labelThread (waitingThreadId wait_id) "acquire_thread"
+    return $ Acquiring { activeWaitId   = wait_id
+                       , threadFinished = threadFinished_tmvar }
+
+-- | Spawn a thread to release ownership of one resource from the semaphore,
+-- provided we have spare resources and no pending jobs.
+releaseThread :: Jobserver -> IO JobserverAction
+releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
+  threadFinished_tmvar <- newEmptyTMVarIO
+  MC.mask_ do
+    -- Pre-release the resource so that another thread doesn't take control of it
+    -- just as we release the lock on the semaphore.
+    still_ok_to_release
+      <- tracedAtomically "pre_release" $
+         modifyJobResources jobs_tvar \ jobs ->
+           if guardRelease jobs
+               -- TODO: should this also debounce?
+           then return (True , removeOwnedToken $ removeFreeToken jobs)
+           else return (False, jobs)
+    if not still_ok_to_release
+    then return Idle
+    else do
+      tid <- forkIO $ do
+        x <- MC.try $ releaseSemaphore sem 1
+        tracedAtomically_ "post-release" $ do
+          (r, jobs) <- case x of
+            Left (e :: MC.SomeException) -> do
+              modifyJobResources jobs_tvar \ jobs ->
+                return (Just e, addToken jobs)
+            Right _ -> do
+              return (Nothing, Nothing)
+          putTMVar threadFinished_tmvar r
+          return jobs
+      labelThread tid "release_thread"
+      return Idle
+
+-- | When there are pending jobs but no free tokens,
+-- spawn a thread to acquire a new token from the semaphore.
+--
+-- See 'acquireThread'.
+tryAcquire :: JobserverOptions
+           -> Jobserver
+           -> JobserverState
+           -> STM (IO JobserverState)
+tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
+  st@( JobserverState { jobserverAction = Idle } )
+  = do
+    jobs <- readTVar jobs_tvar
+    guard $ guardAcquire jobs
+    return do
+      action           <- acquireThread js
+      -- Set a debounce after acquiring a token.
+      can_release_tvar <- registerDelay $ (releaseDebounce opts * 1000)
+      return $ st { jobserverAction = action
+                  , canReleaseToken = can_release_tvar }
+tryAcquire _ _ _ = retry
+
+-- | When there are free tokens and no pending jobs,
+-- spawn a thread to release a token from the semaphore.
+--
+-- See 'releaseThread'.
+tryRelease :: Jobserver
+           -> JobserverState
+           -> STM (IO JobserverState)
+tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
+  st@( JobserverState
+      { jobserverAction = Idle
+      , canReleaseToken = can_release_tvar } )
+  = do
+    jobs <- readTVar jobs_tvar
+    guard  $ guardRelease jobs
+    can_release <- readTVar can_release_tvar
+    guard can_release
+    return do
+      action <- releaseThread sjs
+      return $ st { jobserverAction = action }
+tryRelease _ _ = retry
+
+-- | Wait for an active thread to finish. Once it finishes:
+--
+--  - set the 'JobserverAction' to 'Idle',
+--  - update the number of capabilities to reflect the number
+--    of owned tokens from the semaphore.
+tryNoticeIdle :: JobserverOptions
+              -> TVar JobResources
+              -> JobserverState
+              -> STM (IO JobserverState)
+tryNoticeIdle opts jobs_tvar jobserver_state
+  | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
+  = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
+  | otherwise
+  = retry -- no active thread: wait until jobserver isn't idle
+  where
+    sync_num_caps :: TVar Bool
+                  -> TMVar (Maybe MC.SomeException)
+                  -> STM (IO JobserverState)
+    sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
+      mb_ex <- takeTMVar threadFinished_tmvar
+      for_ mb_ex MC.throwM
+      Jobs { tokensOwned } <- readTVar jobs_tvar
+      can_change_numcaps <- readTVar can_change_numcaps_tvar
+      guard can_change_numcaps
+      return do
+        x <- getNumCapabilities
+        can_change_numcaps_tvar_2 <-
+          if x == tokensOwned
+          then return can_change_numcaps_tvar
+          else do
+            setNumCapabilities tokensOwned
+            registerDelay $ (setNumCapsDebounce opts * 1000)
+        return $
+          jobserver_state
+            { jobserverAction  = Idle
+            , canChangeNumCaps = can_change_numcaps_tvar_2 }
+
+-- | Try to stop the current thread which is acquiring/releasing resources
+-- if that operation is no longer relevant.
+tryStopThread :: TVar JobResources
+              -> JobserverState
+              -> STM (IO JobserverState)
+tryStopThread jobs_tvar jsj = do
+  case jobserverAction jsj of
+    Acquiring { activeWaitId = wait_id } -> do
+     jobs <- readTVar jobs_tvar
+     guard $ null (jobsWaiting jobs)
+     return do
+       interruptWaitOnSemaphore wait_id
+       return $ jsj { jobserverAction = Idle }
+    _ -> retry
+
+-- | Main jobserver loop: acquire/release resources as
+-- needed for the pending jobs and available semaphore tokens.
+jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
+jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
+  = do
+      true_tvar <- newTVarIO True
+      let init_state :: JobserverState
+          init_state =
+            JobserverState
+              { jobserverAction  = Idle
+              , canChangeNumCaps = true_tvar
+              , canReleaseToken  = true_tvar }
+      loop init_state
+  where
+    loop s = do
+      action <- atomically $ asum $ (\x -> x s) <$>
+        [ tryRelease    sjs
+        , tryAcquire    opts sjs
+        , tryNoticeIdle opts jobs_tvar
+        , tryStopThread jobs_tvar
+        ]
+      s <- action
+      loop s
+
+-- | Create a new jobserver using the given semaphore handle.
+makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
+makeJobserver sem_name = do
+  semaphore <- openSemaphore sem_name
+  let
+    init_jobs =
+      Jobs { tokensOwned = 1
+           , tokensFree  = 1
+           , jobsWaiting = NilOL
+           }
+  jobs_tvar <- newTVarIO init_jobs
+  let
+    opts = defaultJobserverOptions -- TODO: allow this to be configured
+    sjs = Jobserver { jSemaphore = semaphore
+                    , jobs       = jobs_tvar }
+  loop_finished_mvar <- newEmptyMVar
+  loop_tid <- forkIOWithUnmask \ unmask -> do
+    r <- try $ unmask $ jobserverLoop opts sjs
+    putMVar loop_finished_mvar $
+      case r of
+        Left e
+          | Just ThreadKilled <- fromException e
+          -> Nothing
+          | otherwise
+          -> Just e
+        Right () -> Nothing
+  labelThread loop_tid "job_server"
+  let
+    acquireSem = acquireJob jobs_tvar
+    releaseSem = releaseJob jobs_tvar
+    cleanupSem = do
+      -- this is interruptible
+      cleanupJobserver sjs
+      killThread loop_tid
+      mb_ex <- takeMVar loop_finished_mvar
+      for_ mb_ex MC.throwM
+
+  return (AbstractSem{..}, cleanupSem)
+
+-- | Implement an abstract semaphore using a semaphore 'Jobserver'
+-- which queries the system semaphore of the given name for resources.
+runJSemAbstractSem :: SemaphoreName         -- ^ the system semaphore to use
+                   -> (AbstractSem -> IO a) -- ^ the operation to run
+                                            -- which requires a semaphore
+                   -> IO a
+runJSemAbstractSem sem action = MC.mask \ unmask -> do
+  (abs, cleanup) <- makeJobserver sem
+  r <- try $ unmask $ action abs
+  case r of
+    Left (e1 :: MC.SomeException) -> do
+      (_ :: Either MC.SomeException ()) <- MC.try cleanup
+      MC.throwM e1
+    Right x -> cleanup $> x
+
+{-
+Note [Architecture of the Job Server]
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In `-jsem` mode the amount of parallelism that GHC can use is controlled by a
+system semaphore. We take resources from it when we need them and give them back
+if we don't have enough to do.
+
+A naive implementation would just take and release the semaphore around performing
+the action but this leads to two issues.
+
+* When taking a slot in the semaphore we must call `setNumCapabilities` in order
+  to adjust how many capabilities are available for parallel garbage collection. This
+  causes a synchronisation.
+* We want to implement a debounce so that whilst there is pending work in the current
+  process we prefer to keep hold of resources from the semaphore. This reduces
+  overall memory usage as there are fewer live GHC processes at once.
+
+Therefore the acquisition of semaphore resources is separated from the
+request for the resource in the driver.
+
+A slot from the semaphore is requested using `acquireJob`. This creates a pending
+job, which is a `TMVar` that is filled in to signal that the requested slot is ready.
+
+When the job is finished, the slot is released by calling `releaseJob`, which just
+increases the number of `free` jobs. If there are more pending jobs when the free count
+is increased the slot is immediately reused (see `modifyJobResources`).
+
+The `jobserverLoop` interacts with the system semaphore: when there are still pending
+jobs, `acquireThread` blocks waiting for a slot in the semaphore and increases
+the owned count when the slot is obtained.
+
+When there are free slots, no pending jobs and the debounce has expired
+then `releaseThread` will release slots back to the global semaphore.
+
+`tryStopThread` attempts to kill threads which are waiting to acquire a resource
+when we no longer need it. For example, suppose we attempt to acquire two slots
+of the semaphore but the first job finishes before we acquire the second resource:
+the second slot is no longer needed, so we should cancel the wait (as it would not be used to
+do any work and would not be returned until the debounce expires). We only need to kill in
+the acquiring state because the releasing state can't block.
+
+Note [Eventlog Messages for jsem]
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+It can be tricky to verify that the work is shared adequately across different
+processes. To help debug this whenever the global state changes the values of
+`JobResources` are output to the eventlog. There are some scripts which can be used
+to analyse this output and report statistics about core saturation in this
+github repo (https://github.com/mpickering/ghc-jsem-analyse).
+
+-}


=====================================
compiler/GHC/Driver/Pipeline/LogQueue.hs
=====================================
@@ -100,10 +100,10 @@ dequeueLogQueueQueue (LogQueueQueue n lqq) = case IM.minViewWithKey lqq of
                                                 Just ((k, v), lqq') | k == n -> Just (v, LogQueueQueue (n + 1) lqq')
                                                 _ -> Nothing
 
-logThread :: Int -> Int -> Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
+logThread :: Logger -> TVar Bool -- Signal that no more new logs will be added, clear the queue and exit
                     -> TVar LogQueueQueue -- Queue for logs
                     -> IO (IO ())
-logThread _ _ logger stopped lqq_var = do
+logThread logger stopped lqq_var = do
   finished_var <- newEmptyMVar
   _ <- forkIO $ print_logs *> putMVar finished_var ()
   return (takeMVar finished_var)


=====================================
compiler/GHC/Driver/Session.hs
=====================================
@@ -43,6 +43,7 @@ module GHC.Driver.Session (
         needSourceNotes,
         OnOff(..),
         DynFlags(..),
+        ParMakeCount(..),
         outputFile, objectSuf, ways,
         FlagSpec(..),
         HasDynFlags(..), ContainsDynFlags(..),
@@ -461,9 +462,13 @@ data DynFlags = DynFlags {
   ruleCheck             :: Maybe String,
   strictnessBefore      :: [Int],       -- ^ Additional demand analysis
 
-  parMakeCount          :: Maybe Int,   -- ^ The number of modules to compile in parallel
-                                        --   in --make mode, where Nothing ==> compile as
-                                        --   many in parallel as there are CPUs.
+  parMakeCount          :: Maybe ParMakeCount,
+    -- ^ The number of modules to compile in parallel
+    --   in --make mode, ignored with a warning if jsemHandle is specified.
+    --   If unspecified, compile with a single job.
+
+  jsemHandle            :: Maybe FilePath,
+    -- ^ A handle to a parallelism semaphore
 
   enableTimeStats       :: Bool,        -- ^ Enable RTS timing statistics?
   ghcHeapSize           :: Maybe Int,   -- ^ The heap size to set.
@@ -775,6 +780,12 @@ instance (Monad m, HasDynFlags m) => HasDynFlags (ExceptT e m) where
 class ContainsDynFlags t where
     extractDynFlags :: t -> DynFlags
 
+-- | The type for the -jN argument, specifying that -j on its own represents
+-- using the number of machine processors.
+data ParMakeCount
+  = ParMakeThisMany Int
+  | ParMakeNumProcessors
+
 -----------------------------------------------------------------------------
 -- Accessors from 'DynFlags'
 
@@ -1138,7 +1149,8 @@ defaultDynFlags mySettings =
         historySize             = 20,
         strictnessBefore        = [],
 
-        parMakeCount            = Just 1,
+        parMakeCount            = Nothing,
+        jsemHandle              = Nothing,
 
         enableTimeStats         = False,
         ghcHeapSize             = Nothing,
@@ -1902,19 +1914,25 @@ parseDynamicFlagsFull activeFlags cmdline dflags0 args = do
       throwGhcExceptionIO (CmdLineError ("combination not supported: " ++
                                intercalate "/" (map wayDesc (Set.toAscList theWays))))
 
-  let (dflags3, consistency_warnings) = makeDynFlagsConsistent dflags2
+  (dflags3, io_warnings) <- liftIO $ dynFlagsIOCheck dflags2
+
+  let (dflags4, consistency_warnings) = makeDynFlagsConsistent dflags3
 
   -- Set timer stats & heap size
-  when (enableTimeStats dflags3) $ liftIO enableTimingStats
-  case (ghcHeapSize dflags3) of
+  when (enableTimeStats dflags4) $ liftIO enableTimingStats
+  case (ghcHeapSize dflags4) of
     Just x -> liftIO (setHeapSize x)
     _      -> return ()
 
-  liftIO $ setUnsafeGlobalDynFlags dflags3
+  liftIO $ setUnsafeGlobalDynFlags dflags4
 
-  let warns' = map (Warn WarningWithoutFlag) (consistency_warnings ++ sh_warns)
+  let warns' = map (Warn WarningWithoutFlag) (consistency_warnings ++ sh_warns ++ io_warnings)
 
-  return (dflags3, leftover, warns' ++ warns)
+  return (dflags4, leftover, warns' ++ warns)
+
+-- | Perform checks and fixes on DynFlags which require IO
+dynFlagsIOCheck :: DynFlags -> IO (DynFlags, [Located String])
+dynFlagsIOCheck dflags = pure (dflags, [])
 
 -- | Check (and potentially disable) any extensions that aren't allowed
 -- in safe mode.
@@ -2054,14 +2072,16 @@ dynamic_flags_deps = [
   , make_ord_flag defGhcFlag "j"     (OptIntSuffix
         (\n -> case n of
                  Just n
-                     | n > 0     -> upd (\d -> d { parMakeCount = Just n })
+                     | n > 0     -> upd (\d -> d { parMakeCount = Just (ParMakeThisMany n) })
                      | otherwise -> addErr "Syntax: -j[n] where n > 0"
-                 Nothing -> upd (\d -> d { parMakeCount = Nothing })))
+                 Nothing -> upd (\d -> d { parMakeCount = Just ParMakeNumProcessors })))
                  -- When the number of parallel builds
                  -- is omitted, it is the same
                  -- as specifying that the number of
                  -- parallel builds is equal to the
                  -- result of getNumProcessors
+  , make_ord_flag defGhcFlag "jsem" $ hasArg $ \f d -> d { jsemHandle = Just f }
+
   , make_ord_flag defFlag "instantiated-with"   (sepArg setUnitInstantiations)
   , make_ord_flag defFlag "this-component-id"   (sepArg setUnitInstanceOf)
 
@@ -4860,6 +4880,11 @@ makeDynFlagsConsistent dflags
  , Nothing <- outputFile dflags
  = pgmError "--output must be specified when using --merge-objs"
 
+ | Just _ <- jsemHandle dflags
+ , Just _ <- parMakeCount dflags
+   = loop dflags{parMakeCount = Nothing}
+         "`-j` argument is ignored when using `-jsem`"
+
  | otherwise = (dflags, [])
     where loc = mkGeneralSrcSpan (fsLit "when making flags consistent")
           loop updated_dflags warning


=====================================
compiler/ghc.cabal.in
=====================================
@@ -90,6 +90,7 @@ Library
                    hpc        == 0.6.*,
                    transformers == 0.5.*,
                    exceptions == 0.10.*,
+                   semaphore-compat,
                    stm,
                    ghc-boot   == @ProjectVersionMunged@,
                    ghc-heap   == @ProjectVersionMunged@,
@@ -442,6 +443,7 @@ Library
         GHC.Driver.GenerateCgIPEStub
         GHC.Driver.Hooks
         GHC.Driver.LlvmConfigCache
+        GHC.Driver.MakeSem
         GHC.Driver.Main
         GHC.Driver.Make
         GHC.Driver.MakeFile


=====================================
docs/users_guide/using.rst
=====================================
@@ -751,6 +751,14 @@ search path (see :ref:`search-path`).
     number of processors. Note that compilation of a module may not begin
     until its dependencies have been built.
 
+.. ghc-flag:: -jsem
+    :shortdesc:  stub
+    :type: dynamic
+    :category: misc
+
+    Stub
+
+
 .. _multi-home-units:
 
 Multiple Home Units


=====================================
hadrian/src/Packages.hs
=====================================
@@ -8,7 +8,7 @@ module Packages (
     ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline,
     hsc2hs, hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, iservProxy,
     libffi, libiserv, mtl, parsec, pretty, primitive, process, remoteIserv, rts,
-    runGhc, stm, templateHaskell, terminfo, text, time, timeout, touchy,
+    runGhc, semaphoreCompat, stm, templateHaskell, terminfo, text, time, timeout, touchy,
     transformers, unlit, unix, win32, xhtml,
     lintersCommon, lintNotes, lintCommitMsg, lintSubmoduleRefs, lintWhitespace,
     ghcPackages, isGhcPackage,
@@ -39,7 +39,7 @@ ghcPackages =
     , exceptions, filepath, genapply, genprimopcode, ghc, ghcBignum, ghcBoot, ghcBootTh
     , ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline, hsc2hs
     , hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, libffi, libiserv, mtl
-    , parsec, pretty, process, rts, runGhc, stm, templateHaskell
+    , parsec, pretty, process, rts, runGhc, semaphoreCompat, stm, templateHaskell
     , terminfo, text, time, touchy, transformers, unlit, unix, win32, xhtml
     , timeout
     , lintersCommon
@@ -55,7 +55,7 @@ array, base, binary, bytestring, cabalSyntax, cabal, checkPpr, checkExact, count
   exceptions, filepath, genapply, genprimopcode, ghc, ghcBignum, ghcBoot, ghcBootTh,
   ghcCompact, ghcConfig, ghcHeap, ghci, ghciWrapper, ghcPkg, ghcPrim, haddock, haskeline, hsc2hs,
   hp2ps, hpc, hpcBin, integerGmp, integerSimple, iserv, iservProxy, remoteIserv, libffi, libiserv, mtl,
-  parsec, pretty, primitive, process, rts, runGhc, stm, templateHaskell,
+  parsec, pretty, primitive, process, rts, runGhc, semaphoreCompat, stm, templateHaskell,
   terminfo, text, time, touchy, transformers, unlit, unix, win32, xhtml,
   timeout,
   lintersCommon, lintNotes, lintCommitMsg, lintSubmoduleRefs, lintWhitespace
@@ -111,6 +111,7 @@ process             = lib  "process"
 remoteIserv         = util "remote-iserv"
 rts                 = top  "rts"
 runGhc              = util "runghc"
+semaphoreCompat     = lib  "semaphore-compat"
 stm                 = lib  "stm"
 templateHaskell     = lib  "template-haskell"
 terminfo            = lib  "terminfo"


=====================================
hadrian/src/Settings/Default.hs
=====================================
@@ -95,6 +95,7 @@ stage0Packages = do
              , hpcBin
              , mtl
              , parsec
+             , semaphoreCompat
              , time
              , templateHaskell
              , text
@@ -134,6 +135,7 @@ stage1Packages = do
         , integerGmp
         , pretty
         , rts
+        , semaphoreCompat
         , stm
         , unlit
         , xhtml


=====================================
libraries/Win32
=====================================
@@ -1 +1 @@
-Subproject commit 931497f7052f63cb5cfd4494a94e572c5c570642
+Subproject commit efab7f1146da9741dc54fb35476d4aaabeff8d6d


=====================================
libraries/semaphore-compat
=====================================
@@ -0,0 +1 @@
+Subproject commit 663ef75467995acf41c51d3e21d03347e85b844e


=====================================
libraries/unix
=====================================
@@ -1 +1 @@
-Subproject commit 3be0223cee7395410915a127eba3acae5ff0b2f2
+Subproject commit 98adc732bfbfca4fef945d546ecbaae13952a950


=====================================
packages
=====================================
@@ -66,5 +66,6 @@ libraries/Win32              -           -                               https:/
 libraries/xhtml              -           -                               https://github.com/haskell/xhtml.git
 libraries/exceptions         -           -                               https://github.com/ekmett/exceptions.git
 nofib                        nofib       -                               -
+libraries/semaphore-compat   -           -                               -
 libraries/stm                -           -                               ssh://git@github.com/haskell/stm.git
 .                            -           ghc.git                         -



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/ae811dfeb8587111a26f0e2675f3e51640970ec0

-- 
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/ae811dfeb8587111a26f0e2675f3e51640970ec0
You're receiving this email because of your account on gitlab.haskell.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20221220/4a43dc7a/attachment-0001.html>


More information about the ghc-commits mailing list