[Git][ghc/ghc][wip/jsem] 2 commits: remove -jsemcreate: semaphores should be created externally
sheaf (@sheaf)
gitlab at gitlab.haskell.org
Mon Sep 12 15:01:47 UTC 2022
sheaf pushed to branch wip/jsem at Glasgow Haskell Compiler / GHC
Commits:
701cbecd by sheaf at 2022-09-12T17:00:45+02:00
remove -jsemcreate: semaphores should be created externally
- - - - -
21bed9d5 by sheaf at 2022-09-12T17:01:15+02:00
jsem: add debouncing
- - - - -
3 changed files:
- compiler/GHC/Driver/Make.hs
- compiler/GHC/Driver/MakeSem.hs
- compiler/GHC/Driver/Session.hs
Changes:
=====================================
compiler/GHC/Driver/Make.hs
=====================================
@@ -662,7 +662,7 @@ createBuildPlan mod_graph maybe_top_mod =
mkWorkerLimit :: DynFlags -> IO WorkerLimit
mkWorkerLimit dflags =
case jsemHandle dflags of
- Just h -> pure (JSemLimit (SemaphoreName h) (jsemCreate dflags))
+ Just h -> pure (JSemLimit (SemaphoreName h))
Nothing -> case parMakeCount dflags of
Nothing -> pure $ num_procs 1
Just ParMakeNumProcessors -> num_procs <$> getNumProcessors
@@ -682,9 +682,6 @@ data WorkerLimit
| JSemLimit
SemaphoreName
-- ^ Semaphore name to use
- (Maybe Int)
- -- ^ Create a semaphore with this many tokens,
- -- or re-use an existing semaphore?
deriving Eq
-- | Generalized version of 'load' which also supports a custom
@@ -2812,8 +2809,8 @@ runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
runWorkerLimit worker_limit action = case worker_limit of
NumProcessorsLimit n_jobs ->
runNjobsAbstractSem n_jobs action
- JSemLimit sem mb_create ->
- runJSemAbstractSem sem mb_create action
+ JSemLimit sem ->
+ runJSemAbstractSem sem action
-- | Build and run a pipeline
runParPipelines :: WorkerLimit -- ^ How to limit work parallelism
=====================================
compiler/GHC/Driver/MakeSem.hs
=====================================
@@ -34,7 +34,6 @@ import Control.Concurrent.MVar
import Control.Concurrent.STM
import Data.Foldable
import Data.Functor
-import Data.Maybe
#if defined(mingw32_HOST_OS)
import qualified System.Win32.Event as Win32
@@ -42,10 +41,8 @@ import qualified System.Win32.Event as Win32
import qualified System.Win32.Process as Win32
( iNFINITE )
import qualified System.Win32.Semaphore as Win32
- ( Semaphore(..)
- , createSemaphore, releaseSemaphore )
-import qualified System.Win32.Types as Win32
- ( errorWin )
+ ( Semaphore(..), sEMAPHORE_ALL_ACCESS
+ , openSemaphore, releaseSemaphore )
#else
import qualified System.Posix.Semaphore as Posix
( Semaphore, OpenSemFlags(..)
@@ -82,35 +79,17 @@ newtype SemaphoreName = SemaphoreName FilePath
-- | Open a semaphore with the given name.
--
--- If no such semaphore exists:
---
--- - create one with @toks@ tokens,
--- if the second argument is @Just toks@,
--- - error, if the second argument is @Nothing at .
+-- If no such semaphore exists, throws an error.
openSemaphore :: String -- ^ semaphore name
- -> Maybe Int -- ^ create a new semaphore with the given number of tokens,
- -- or only open an existing semaphore?
-> IO Semaphore
-openSemaphore sem_name mb_create =
+openSemaphore sem_name =
#if defined(mingw32_HOST_OS)
- do
- (sem, exists) <- Win32.createSemaphore Nothing init_toks init_toks (Just sem_name)
- unless (isJust mb_create || exists) $
- Win32.errorWin ("jsem: no semaphore " ++ sem_name ++ ", but not allowed to create one")
- pprTraceM "openSemaphore" $
- vcat [ text "sem_name:" <+> text sem_name
- , text "exists:" <+> ppr exists ]
- return sem
- where
- init_toks = maybe 0 fromIntegral mb_create
+ Win32.openSemaphore Win32.sEMAPHORE_ALL_ACCESS True sem_name
#else
- Posix.semOpen sem_name flags Posix.stdFileMode init_toks
+ Posix.semOpen sem_name flags Posix.stdFileMode 0
where
- init_toks = fromMaybe 0 mb_create
flags = Posix.OpenSemFlags
- { Posix.semCreate = isJust mb_create
- -- NB: this ensures we throw an error if no semaphore
- -- is found and we're not creating one
+ { Posix.semCreate = False
, Posix.semExclusive = False }
#endif
@@ -155,6 +134,23 @@ data Jobserver
-- obtained from the semaphore
}
+data JobserverOptions
+ = JobserverOptions
+ { releaseDebounce :: !Int
+ -- ^ Minimum delay, in milliseconds, between acquiring a token
+ -- and releasing a token.
+ , setNumCapsDebounce :: !Int
+ -- ^ Minimum delay, in milliseconds, between two consecutive
+ -- calls of 'setNumCapabilities'.
+ }
+
+defaultJobserverOptions :: JobserverOptions
+defaultJobserverOptions =
+ JobserverOptions
+ { releaseDebounce = 100
+ , setNumCapsDebounce = 100
+ }
+
-- | Resources available for running jobs, i.e.
-- tokens obtained from the parallelism semaphore.
data JobResources
@@ -167,6 +163,7 @@ data JobResources
-- ^ Pending jobs waiting on a token
}
+
-- | Add one new token.
addToken :: JobResources -> JobResources
addToken jobs@( Jobs { tokensOwned = owned, tokensFree = free })
@@ -200,6 +197,19 @@ addJob job jobs@( Jobs { jobsWaiting = wait })
-- | The state of the semaphore job server.
data JobserverState
+ = JobserverState
+ { jobserverAction :: !JobserverAction
+ -- ^ The current action being performed by the
+ -- job server.
+ , canChangeNumCaps :: !(TVar Bool)
+ -- ^ A TVar that signals whether it has been long
+ -- enough since we last changed 'numCapabilities'.
+ , canReleaseToken :: !(TVar Bool)
+ -- ^ A TVar that signals whether we last acquired
+ -- a token long enough ago that we can now release
+ -- a token.
+ }
+data JobserverAction
-- | The jobserver is idle: no thread is currently
-- interacting with the semaphore.
= Idle
@@ -214,7 +224,7 @@ data JobserverState
-- | Retrieve the 'TMVar' that signals if the current thread has finished,
-- if any thread is currently active in the jobserver.
-activeThread_maybe :: JobserverState -> Maybe (TMVar (Maybe MC.SomeException))
+activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
activeThread_maybe Idle = Nothing
activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar
activeThread_maybe (Releasing { threadFinished = tmvar }) = Just tmvar
@@ -309,7 +319,7 @@ modifyJobResources jobs_tvar action = do
-- | Spawn a new thread that waits on the semaphore in order to acquire
-- an additional token.
-acquireThread :: Jobserver -> IO JobserverState
+acquireThread :: Jobserver -> IO JobserverAction
acquireThread (Jobserver { semaphore = sem, jobs = jobs_tvar })
= do
threadFinished_tmvar <- newEmptyTMVarIO
@@ -330,13 +340,14 @@ acquireThread (Jobserver { semaphore = sem, jobs = jobs_tvar })
-- | Spawn a thread to release ownership of one resource from the semaphore,
-- provided we have spare resources and no pending jobs.
-releaseThread :: Jobserver -> IO JobserverState
+releaseThread :: Jobserver -> IO JobserverAction
releaseThread (Jobserver { semaphore = sem, jobs = jobs_tvar }) = do
threadFinished_tmvar <- newEmptyTMVarIO
MC.mask_ do
still_ok_to_release
<- atomically $ modifyJobResources jobs_tvar \ jobs ->
if guardRelease jobs
+ -- TODO: should this also debounce?
then return (True , removeFreeToken jobs)
else return (False, jobs)
if not still_ok_to_release
@@ -372,15 +383,22 @@ releaseThread (Jobserver { semaphore = sem, jobs = jobs_tvar }) = do
-- spawn a thread to acquire a new token from the semaphore.
--
-- See 'acquireThread'.
-tryAcquire :: Jobserver
+tryAcquire :: JobserverOptions
+ -> Jobserver
-> JobserverState
-> STM (IO JobserverState)
-tryAcquire sjs@( Jobserver { jobs = jobs_tvar }) Idle
+tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
+ st@( JobserverState { jobserverAction = Idle } )
= do
jobs <- readTVar jobs_tvar
guard $ guardAcquire jobs
- return $ acquireThread sjs
-tryAcquire _ _ = retry
+ return do
+ action <- acquireThread js
+ -- Set a debounce after acquiring a token.
+ can_release_tvar <- registerDelay $ releaseDebounce opts
+ return $ st { jobserverAction = action
+ , canReleaseToken = can_release_tvar }
+tryAcquire _ _ _ = retry
-- | When there are free tokens and no pending jobs,
-- spawn a thread to release a token from the semamphore.
@@ -389,43 +407,64 @@ tryAcquire _ _ = retry
tryRelease :: Jobserver
-> JobserverState
-> STM (IO JobserverState)
-tryRelease sjs@( Jobserver { jobs = jobs_tvar }) Idle
+tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
+ st@( JobserverState
+ { jobserverAction = Idle
+ , canReleaseToken = can_release_tvar } )
= do
jobs <- readTVar jobs_tvar
guard $ guardRelease jobs
- return $ releaseThread sjs
+ can_release <- readTVar can_release_tvar
+ guard can_release
+ return do
+ action <- releaseThread sjs
+ return $ st { jobserverAction = action }
tryRelease _ _ = retry
-- | Wait for an active thread to finish. Once it finishes:
--
--- - set the 'JobserverState' to 'Idle',
+-- - set the 'JobserverAction' to 'Idle',
-- - update the number of capabilities to reflect the number
-- of owned tokens from the semaphore.
-tryNoticeIdle :: TVar JobResources
+tryNoticeIdle :: JobserverOptions
+ -> TVar JobResources
-> JobserverState
-> STM (IO JobserverState)
-tryNoticeIdle jobs_tvar jobserver_state
- | Just threadFinished_tmvar <- activeThread_maybe jobserver_state
- = sync_num_caps threadFinished_tmvar
+tryNoticeIdle opts jobs_tvar jobserver_state
+ | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
+ = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
| otherwise
= retry -- no active thread: wait until jobserver isn't idle
where
- sync_num_caps :: TMVar (Maybe MC.SomeException) -> STM (IO JobserverState)
- sync_num_caps threadFinished_tmvar = do
+ sync_num_caps :: TVar Bool
+ -> TMVar (Maybe MC.SomeException)
+ -> STM (IO JobserverState)
+ sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
mb_ex <- takeTMVar threadFinished_tmvar
for_ mb_ex MC.throwM
Jobs { tokensOwned } <- readTVar jobs_tvar
+ can_change_numcaps <- readTVar can_change_numcaps_tvar
+ guard can_change_numcaps
return do
x <- getNumCapabilities
+ can_change_numcaps_tvar_2 <-
+ if x == tokensOwned
+ then return can_change_numcaps_tvar
+ else do
+ setNumCapabilities tokensOwned
+ registerDelay $ setNumCapsDebounce opts
when (x /= tokensOwned) $ setNumCapabilities tokensOwned
- return Idle
+ return $
+ jobserver_state
+ { jobserverAction = Idle
+ , canChangeNumCaps = can_change_numcaps_tvar_2 }
-- | Try to stop the current thread which is acquiring/releasing resources
-- if that operation is no longer relevant.
tryStopThread :: TVar JobResources
-> JobserverState
-> STM (IO JobserverState)
-tryStopThread jobs_tvar ls = case ls of
+tryStopThread jobs_tvar jsj = case jobserverAction jsj of
Acquiring { activeThread = tid } -> do
jobs <- readTVar jobs_tvar
guard $ null (jobsWaiting jobs)
@@ -436,36 +475,50 @@ tryStopThread jobs_tvar ls = case ls of
return $ kill_thread_and_idle tid
Idle -> retry
where
- kill_thread_and_idle tid = killThread tid $> Idle
+ kill_thread_and_idle tid =
+ killThread tid $> jsj { jobserverAction = Idle }
-- | Main jobserver loop: acquire/release resources as
-- needed for the pending jobs and available semaphore tokens.
-jobserverLoop :: Jobserver -> IO ()
-jobserverLoop sjs@(Jobserver { jobs = jobs_tvar })
- = loop Idle
+jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
+jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
+ = do
+ true_tvar <- newTVarIO True
+ let init_state :: JobserverState
+ init_state =
+ JobserverState
+ { jobserverAction = Idle
+ , canChangeNumCaps = true_tvar
+ , canReleaseToken = true_tvar }
+ loop init_state
where
loop s = do
action <- atomically $ asum $ (\x -> x s) <$>
[ tryRelease sjs
- , tryAcquire sjs
- , tryNoticeIdle jobs_tvar
+ , tryAcquire opts sjs
+ , tryNoticeIdle opts jobs_tvar
, tryStopThread jobs_tvar
]
s <- action
loop s
-- | Create a new jobserver using the given semaphore handle.
-makeJobserver :: SemaphoreName -> Maybe Int -> IO (AbstractSem, IO ())
-makeJobserver (SemaphoreName sem_path) mb_create = do
- semaphore <- openSemaphore sem_path mb_create
+makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
+makeJobserver (SemaphoreName sem_path) = do
+ semaphore <- openSemaphore sem_path
let
- init_jobs = Jobs { tokensOwned = 1, tokensFree = 1, jobsWaiting = NilOL }
+ init_jobs =
+ Jobs { tokensOwned = 1
+ , tokensFree = 1
+ , jobsWaiting = NilOL
+ }
jobs_tvar <- newTVarIO init_jobs
let
+ opts = defaultJobserverOptions -- TODO: allow this to be configure
sjs = Jobserver { semaphore, jobs = jobs_tvar }
loop_finished_mvar <- newEmptyMVar
loop_tid <- forkIOWithUnmask \ unmask -> do
- r <- try $ unmask $ jobserverLoop sjs
+ r <- try $ unmask $ jobserverLoop opts sjs
putMVar loop_finished_mvar $
case r of
Left e
@@ -489,12 +542,11 @@ makeJobserver (SemaphoreName sem_path) mb_create = do
-- | Implement an abstract semaphore using a semaphore 'Jobserver'
-- which queries the system semaphore of the given name for resources.
runJSemAbstractSem :: SemaphoreName -- ^ the system semaphore to use
- -> Maybe Int -- ^ create a semaphore if none exists?
-> (AbstractSem -> IO a) -- ^ the operation to run
-- which requires a semaphore
-> IO a
-runJSemAbstractSem sem mb_create action = MC.mask \ unmask -> do
- (abs, cleanup) <- makeJobserver sem mb_create
+runJSemAbstractSem sem action = MC.mask \ unmask -> do
+ (abs, cleanup) <- makeJobserver sem
r <- try $ unmask $ action abs
case r of
Left (e1 :: MC.SomeException) -> do
=====================================
compiler/GHC/Driver/Session.hs
=====================================
@@ -466,9 +466,6 @@ data DynFlags = DynFlags {
jsemHandle :: Maybe FilePath,
-- ^ A handle to a parallelism semaphore
- jsemCreate :: Maybe Int,
- -- ^ Create a semaphore with the given number of tokens?
- -- If 'Nothing', then use an existing semaphore.
enableTimeStats :: Bool, -- ^ Enable RTS timing statistics?
ghcHeapSize :: Maybe Int, -- ^ The heap size to set.
@@ -1155,7 +1152,6 @@ defaultDynFlags mySettings =
parMakeCount = Nothing,
jsemHandle = Nothing,
- jsemCreate = Nothing,
enableTimeStats = False,
ghcHeapSize = Nothing,
@@ -2087,8 +2083,7 @@ dynamic_flags_deps = [
-- as specifying that the number of
-- parallel builds is equal to the
-- result of getNumProcessors
- , make_ord_flag defGhcFlag "jsem" $ hasArg $ \f d -> d { jsemHandle = Just f }
- , make_ord_flag defGhcFlag "jsem-create" $ intSuffix $ \i d -> d { jsemCreate = Just i }
+ , make_ord_flag defGhcFlag "jsem" $ hasArg $ \f d -> d { jsemHandle = Just f }
, make_ord_flag defFlag "instantiated-with" (sepArg setUnitInstantiations)
, make_ord_flag defFlag "this-component-id" (sepArg setUnitInstanceOf)
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/1d40400534078b9151ceee6d55bf1a89d0f4dff7...21bed9d573ec6fb79c6776e9c27d64732235ebad
--
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/1d40400534078b9151ceee6d55bf1a89d0f4dff7...21bed9d573ec6fb79c6776e9c27d64732235ebad
You're receiving this email because of your account on gitlab.haskell.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20220912/891cb93d/attachment-0001.html>
More information about the ghc-commits
mailing list