[Git][ghc/ghc][wip/jsem] 2 commits: remove -jsemcreate: semaphores should be created externally

sheaf (@sheaf) gitlab at gitlab.haskell.org
Mon Sep 12 15:01:47 UTC 2022



sheaf pushed to branch wip/jsem at Glasgow Haskell Compiler / GHC


Commits:
701cbecd by sheaf at 2022-09-12T17:00:45+02:00
remove -jsemcreate: semaphores should be created externally

- - - - -
21bed9d5 by sheaf at 2022-09-12T17:01:15+02:00
jsem: add debouncing

- - - - -


3 changed files:

- compiler/GHC/Driver/Make.hs
- compiler/GHC/Driver/MakeSem.hs
- compiler/GHC/Driver/Session.hs


Changes:

=====================================
compiler/GHC/Driver/Make.hs
=====================================
@@ -662,7 +662,7 @@ createBuildPlan mod_graph maybe_top_mod =
 mkWorkerLimit :: DynFlags -> IO WorkerLimit
 mkWorkerLimit dflags =
   case jsemHandle dflags of
-    Just h -> pure (JSemLimit (SemaphoreName h) (jsemCreate dflags))
+    Just h -> pure (JSemLimit (SemaphoreName h))
     Nothing -> case parMakeCount dflags of
       Nothing -> pure $ num_procs 1
       Just ParMakeNumProcessors -> num_procs <$> getNumProcessors
@@ -682,9 +682,6 @@ data WorkerLimit
   | JSemLimit
     SemaphoreName
       -- ^ Semaphore name to use
-    (Maybe Int)
-      -- ^ Create a semaphore with this many tokens,
-      -- or re-use an existing semaphore?
   deriving Eq
 
 -- | Generalized version of 'load' which also supports a custom
@@ -2812,8 +2809,8 @@ runWorkerLimit :: WorkerLimit -> (AbstractSem -> IO a) -> IO a
 runWorkerLimit worker_limit action = case worker_limit of
     NumProcessorsLimit n_jobs ->
       runNjobsAbstractSem n_jobs action
-    JSemLimit sem mb_create ->
-      runJSemAbstractSem sem mb_create action
+    JSemLimit sem ->
+      runJSemAbstractSem sem action
 
 -- | Build and run a pipeline
 runParPipelines :: WorkerLimit -- ^ How to limit work parallelism


=====================================
compiler/GHC/Driver/MakeSem.hs
=====================================
@@ -34,7 +34,6 @@ import Control.Concurrent.MVar
 import Control.Concurrent.STM
 import Data.Foldable
 import Data.Functor
-import Data.Maybe
 
 #if defined(mingw32_HOST_OS)
 import qualified System.Win32.Event     as Win32
@@ -42,10 +41,8 @@ import qualified System.Win32.Event     as Win32
 import qualified System.Win32.Process   as Win32
   ( iNFINITE )
 import qualified System.Win32.Semaphore as Win32
-  ( Semaphore(..)
-  , createSemaphore, releaseSemaphore )
-import qualified System.Win32.Types     as Win32
-  ( errorWin )
+  ( Semaphore(..), sEMAPHORE_ALL_ACCESS
+  , openSemaphore, releaseSemaphore )
 #else
 import qualified System.Posix.Semaphore as Posix
   ( Semaphore, OpenSemFlags(..)
@@ -82,35 +79,17 @@ newtype SemaphoreName = SemaphoreName FilePath
 
 -- | Open a semaphore with the given name.
 --
--- If no such semaphore exists:
---
---  - create one with @toks@ tokens,
---    if the second argument is @Just toks@,
---  - error, if the second argument is @Nothing@.
+-- If no such semaphore exists, throws an error.
 openSemaphore :: String    -- ^ semaphore name
-              -> Maybe Int -- ^ create a new semaphore with the given number of tokens,
-                           -- or only open an existing semaphore?
               -> IO Semaphore
-openSemaphore sem_name mb_create =
+openSemaphore sem_name =
 #if defined(mingw32_HOST_OS)
-  do
-    (sem, exists) <- Win32.createSemaphore Nothing init_toks init_toks (Just sem_name)
-    unless (isJust mb_create || exists) $
-      Win32.errorWin ("jsem: no semaphore " ++ sem_name ++ ", but not allowed to create one")
-    pprTraceM "openSemaphore" $
-      vcat [ text "sem_name:" <+> text sem_name
-           , text "exists:" <+> ppr exists ]
-    return sem
-  where
-    init_toks = maybe 0 fromIntegral mb_create
+  Win32.openSemaphore Win32.sEMAPHORE_ALL_ACCESS True sem_name
 #else
-  Posix.semOpen sem_name flags Posix.stdFileMode init_toks
+  Posix.semOpen sem_name flags Posix.stdFileMode 0
   where
-    init_toks = fromMaybe 0 mb_create
     flags = Posix.OpenSemFlags
-          { Posix.semCreate    = isJust mb_create
-              -- NB: this ensures we throw an error if no semaphore
-              -- is found and we're not creating one
+          { Posix.semCreate    = False
           , Posix.semExclusive = False }
 #endif
 
@@ -155,6 +134,23 @@ data Jobserver
     -- obtained from the semaphore
   }
 
+data JobserverOptions
+  = JobserverOptions
+  { releaseDebounce    :: !Int
+     -- ^ Minimum delay, in milliseconds, between acquiring a token
+     -- and releasing a token.
+  , setNumCapsDebounce :: !Int
+    -- ^ Minimum delay, in milliseconds, between two consecutive
+    -- calls of 'setNumCapabilities'.
+  }
+
+defaultJobserverOptions :: JobserverOptions
+defaultJobserverOptions =
+  JobserverOptions
+    { releaseDebounce    = 100
+    , setNumCapsDebounce = 100
+    }
+
 -- | Resources available for running jobs, i.e.
 -- tokens obtained from the parallelism semaphore.
 data JobResources
@@ -167,6 +163,7 @@ data JobResources
     -- ^ Pending jobs waiting on a token
   }
 
+
 -- | Add one new token.
 addToken :: JobResources -> JobResources
 addToken jobs@( Jobs { tokensOwned = owned, tokensFree = free })
@@ -200,6 +197,19 @@ addJob job jobs@( Jobs { jobsWaiting = wait })
 
 -- | The state of the semaphore job server.
 data JobserverState
+  = JobserverState
+    { jobserverAction  :: !JobserverAction
+      -- ^ The current action being performed by the
+      -- job server.
+    , canChangeNumCaps :: !(TVar Bool)
+      -- ^ A TVar that signals whether it has been long
+      -- enough since we last changed 'numCapabilities'.
+    , canReleaseToken  :: !(TVar Bool)
+      -- ^ A TVar that signals whether we last acquired
+      -- a token long enough ago that we can now release
+      -- a token.
+    }
+data JobserverAction
   -- | The jobserver is idle: no thread is currently
   -- interacting with the semaphore.
   = Idle
@@ -214,7 +224,7 @@ data JobserverState
 
 -- | Retrieve the 'TMVar' that signals if the current thread has finished,
 -- if any thread is currently active in the jobserver.
-activeThread_maybe :: JobserverState -> Maybe (TMVar (Maybe MC.SomeException))
+activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
 activeThread_maybe Idle                                   = Nothing
 activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar
 activeThread_maybe (Releasing { threadFinished = tmvar }) = Just tmvar
@@ -309,7 +319,7 @@ modifyJobResources jobs_tvar action = do
 
 -- | Spawn a new thread that waits on the semaphore in order to acquire
 -- an additional token.
-acquireThread :: Jobserver -> IO JobserverState
+acquireThread :: Jobserver -> IO JobserverAction
 acquireThread (Jobserver { semaphore = sem, jobs = jobs_tvar })
   = do
     threadFinished_tmvar <- newEmptyTMVarIO
@@ -330,13 +340,14 @@ acquireThread (Jobserver { semaphore = sem, jobs = jobs_tvar })
 
 -- | Spawn a thread to release ownership of one resource from the semaphore,
 -- provided we have spare resources and no pending jobs.
-releaseThread :: Jobserver -> IO JobserverState
+releaseThread :: Jobserver -> IO JobserverAction
 releaseThread (Jobserver { semaphore = sem, jobs = jobs_tvar }) = do
   threadFinished_tmvar <- newEmptyTMVarIO
   MC.mask_ do
     still_ok_to_release
       <- atomically $ modifyJobResources jobs_tvar \ jobs ->
         if guardRelease jobs
+            -- TODO: should this also debounce?
         then return (True , removeFreeToken jobs)
         else return (False, jobs)
     if not still_ok_to_release
@@ -372,15 +383,22 @@ releaseThread (Jobserver { semaphore = sem, jobs = jobs_tvar }) = do
 -- spawn a thread to acquire a new token from the semaphore.
 --
 -- See 'acquireThread'.
-tryAcquire :: Jobserver
+tryAcquire :: JobserverOptions
+           -> Jobserver
            -> JobserverState
            -> STM (IO JobserverState)
-tryAcquire sjs@( Jobserver { jobs = jobs_tvar }) Idle
+tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
+  st@( JobserverState { jobserverAction = Idle } )
   = do
     jobs <- readTVar jobs_tvar
     guard $ guardAcquire jobs
-    return $ acquireThread sjs
-tryAcquire _ _ = retry
+    return do
+      action           <- acquireThread js
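+      -- Set a debounce after acquiring a token. NOTE(review): 'registerDelay' takes microseconds, but 'releaseDebounce' is documented in milliseconds -- confirm the unit.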
+      can_release_tvar <- registerDelay $ releaseDebounce opts
+      return $ st { jobserverAction = action
+                  , canReleaseToken = can_release_tvar }
+tryAcquire _ _ _ = retry
 
 -- | When there are free tokens and no pending jobs,
 -- spawn a thread to release a token from the semaphore.
@@ -389,43 +407,64 @@ tryAcquire _ _ = retry
 tryRelease :: Jobserver
            -> JobserverState
            -> STM (IO JobserverState)
-tryRelease sjs@( Jobserver { jobs = jobs_tvar }) Idle
+tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
+  st@( JobserverState
+      { jobserverAction = Idle
+      , canReleaseToken = can_release_tvar } )
   = do
     jobs <- readTVar jobs_tvar
     guard  $ guardRelease jobs
-    return $ releaseThread sjs
+    can_release <- readTVar can_release_tvar
+    guard can_release
+    return do
+      action <- releaseThread sjs
+      return $ st { jobserverAction = action }
 tryRelease _ _ = retry
 
 -- | Wait for an active thread to finish. Once it finishes:
 --
---  - set the 'JobserverState' to 'Idle',
+--  - set the 'JobserverAction' to 'Idle',
 --  - update the number of capabilities to reflect the number
 --    of owned tokens from the semaphore.
-tryNoticeIdle :: TVar JobResources
+tryNoticeIdle :: JobserverOptions
+              -> TVar JobResources
               -> JobserverState
               -> STM (IO JobserverState)
-tryNoticeIdle jobs_tvar jobserver_state
-  | Just threadFinished_tmvar <- activeThread_maybe jobserver_state
-  = sync_num_caps threadFinished_tmvar
+tryNoticeIdle opts jobs_tvar jobserver_state
+  | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
+  = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
   | otherwise
   = retry -- no active thread: wait until jobserver isn't idle
   where
-    sync_num_caps :: TMVar (Maybe MC.SomeException) -> STM (IO JobserverState)
-    sync_num_caps threadFinished_tmvar = do
+    sync_num_caps :: TVar Bool
+                  -> TMVar (Maybe MC.SomeException)
+                  -> STM (IO JobserverState)
+    sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
       mb_ex <- takeTMVar threadFinished_tmvar
       for_ mb_ex MC.throwM
       Jobs { tokensOwned } <- readTVar jobs_tvar
+      can_change_numcaps <- readTVar can_change_numcaps_tvar
+      guard can_change_numcaps
       return do
         x <- getNumCapabilities
+        can_change_numcaps_tvar_2 <-
+          if x == tokensOwned
+          then return can_change_numcaps_tvar
+          else do
+            setNumCapabilities tokensOwned
+            registerDelay $ setNumCapsDebounce opts
         when (x /= tokensOwned) $ setNumCapabilities tokensOwned
-        return Idle
+        return $
+          jobserver_state
+            { jobserverAction  = Idle
+            , canChangeNumCaps = can_change_numcaps_tvar_2 }
 
 -- | Try to stop the current thread which is acquiring/releasing resources
 -- if that operation is no longer relevant.
 tryStopThread :: TVar JobResources
               -> JobserverState
               -> STM (IO JobserverState)
-tryStopThread jobs_tvar ls = case ls of
+tryStopThread jobs_tvar jsj = case jobserverAction jsj of
   Acquiring { activeThread = tid } -> do
     jobs <- readTVar jobs_tvar
     guard  $ null (jobsWaiting jobs)
@@ -436,36 +475,50 @@ tryStopThread jobs_tvar ls = case ls of
     return $ kill_thread_and_idle tid
   Idle -> retry
   where
-    kill_thread_and_idle tid = killThread tid $> Idle
+    kill_thread_and_idle tid =
+      killThread tid $> jsj { jobserverAction = Idle }
 
 -- | Main jobserver loop: acquire/release resources as
 -- needed for the pending jobs and available semaphore tokens.
-jobserverLoop :: Jobserver -> IO ()
-jobserverLoop sjs@(Jobserver { jobs = jobs_tvar })
-  = loop Idle
+jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
+jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
+  = do
+      true_tvar <- newTVarIO True
+      let init_state :: JobserverState
+          init_state =
+            JobserverState
+              { jobserverAction  = Idle
+              , canChangeNumCaps = true_tvar
+              , canReleaseToken  = true_tvar }
+      loop init_state
   where
     loop s = do
       action <- atomically $ asum $ (\x -> x s) <$>
         [ tryRelease    sjs
-        , tryAcquire    sjs
-        , tryNoticeIdle jobs_tvar
+        , tryAcquire    opts sjs
+        , tryNoticeIdle opts jobs_tvar
         , tryStopThread jobs_tvar
         ]
       s <- action
       loop s
 
 -- | Create a new jobserver using the given semaphore handle.
-makeJobserver :: SemaphoreName -> Maybe Int -> IO (AbstractSem, IO ())
-makeJobserver (SemaphoreName sem_path) mb_create = do
-  semaphore <- openSemaphore sem_path mb_create
+makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
+makeJobserver (SemaphoreName sem_path) = do
+  semaphore <- openSemaphore sem_path
   let
-    init_jobs = Jobs { tokensOwned = 1, tokensFree = 1, jobsWaiting = NilOL }
+    init_jobs =
+      Jobs { tokensOwned = 1
+           , tokensFree  = 1
+           , jobsWaiting = NilOL
+           }
   jobs_tvar <- newTVarIO init_jobs
   let
+    opts = defaultJobserverOptions -- TODO: allow this to be configured
     sjs = Jobserver { semaphore, jobs = jobs_tvar }
   loop_finished_mvar <- newEmptyMVar
   loop_tid <- forkIOWithUnmask \ unmask -> do
-    r <- try $ unmask $ jobserverLoop sjs
+    r <- try $ unmask $ jobserverLoop opts sjs
     putMVar loop_finished_mvar $
       case r of
         Left e
@@ -489,12 +542,11 @@ makeJobserver (SemaphoreName sem_path) mb_create = do
 -- | Implement an abstract semaphore using a semaphore 'Jobserver'
 -- which queries the system semaphore of the given name for resources.
 runJSemAbstractSem :: SemaphoreName         -- ^ the system semaphore to use
-                   -> Maybe Int             -- ^ create a semaphore if none exists?
                    -> (AbstractSem -> IO a) -- ^ the operation to run
                                             -- which requires a semaphore
                    -> IO a
-runJSemAbstractSem sem mb_create action = MC.mask \ unmask -> do
-  (abs, cleanup) <- makeJobserver sem mb_create
+runJSemAbstractSem sem action = MC.mask \ unmask -> do
+  (abs, cleanup) <- makeJobserver sem
   r <- try $ unmask $ action abs
   case r of
     Left (e1 :: MC.SomeException) -> do


=====================================
compiler/GHC/Driver/Session.hs
=====================================
@@ -466,9 +466,6 @@ data DynFlags = DynFlags {
 
   jsemHandle            :: Maybe FilePath,
     -- ^ A handle to a parallelism semaphore
-  jsemCreate            :: Maybe Int,
-    -- ^ Create a semaphore with the given number of tokens?
-    -- If 'Nothing', then use an existing semaphore.
 
   enableTimeStats       :: Bool,        -- ^ Enable RTS timing statistics?
   ghcHeapSize           :: Maybe Int,   -- ^ The heap size to set.
@@ -1155,7 +1152,6 @@ defaultDynFlags mySettings =
 
         parMakeCount            = Nothing,
         jsemHandle              = Nothing,
-        jsemCreate              = Nothing,
 
         enableTimeStats         = False,
         ghcHeapSize             = Nothing,
@@ -2087,8 +2083,7 @@ dynamic_flags_deps = [
                  -- as specifying that the number of
                  -- parallel builds is equal to the
                  -- result of getNumProcessors
-  , make_ord_flag defGhcFlag "jsem"        $ hasArg    $ \f d -> d { jsemHandle = Just f }
-  , make_ord_flag defGhcFlag "jsem-create" $ intSuffix $ \i d -> d { jsemCreate = Just i }
+  , make_ord_flag defGhcFlag "jsem" $ hasArg $ \f d -> d { jsemHandle = Just f }
 
   , make_ord_flag defFlag "instantiated-with"   (sepArg setUnitInstantiations)
   , make_ord_flag defFlag "this-component-id"   (sepArg setUnitInstanceOf)



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/1d40400534078b9151ceee6d55bf1a89d0f4dff7...21bed9d573ec6fb79c6776e9c27d64732235ebad

-- 
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/1d40400534078b9151ceee6d55bf1a89d0f4dff7...21bed9d573ec6fb79c6776e9c27d64732235ebad
You're receiving this email because of your account on gitlab.haskell.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20220912/891cb93d/attachment-0001.html>


More information about the ghc-commits mailing list