[Git][ghc/ghc][wip/jsem] jsem: add debouncing

sheaf (@sheaf) gitlab at gitlab.haskell.org
Mon Sep 12 15:30:07 UTC 2022



sheaf pushed to branch wip/jsem at Glasgow Haskell Compiler / GHC


Commits:
c5cc0bc3 by sheaf at 2022-09-12T17:29:55+02:00
jsem: add debouncing

- - - - -


1 changed file:

- compiler/GHC/Driver/MakeSem.hs


Changes:

=====================================
compiler/GHC/Driver/MakeSem.hs
=====================================
@@ -134,6 +134,23 @@ data Jobserver
     -- obtained from the semaphore
   }
 
+data JobserverOptions -- Debounce settings taken by 'jobserverLoop'.
+  = JobserverOptions
+  { releaseDebounce    :: !Int
+     -- ^ Minimum delay, in milliseconds, between acquiring a token and
+     -- releasing one. NOTE(review): 'registerDelay' counts microseconds.
+  , setNumCapsDebounce :: !Int
+    -- ^ Minimum delay, in milliseconds, between two consecutive calls of
+    -- 'setNumCapabilities'. NOTE(review): 'registerDelay' counts microseconds.
+  }
+
+defaultJobserverOptions :: JobserverOptions -- Options used by 'makeJobserver'.
+defaultJobserverOptions =
+  JobserverOptions
+    { releaseDebounce    = 100 -- field is documented in milliseconds
+    , setNumCapsDebounce = 100 -- field is documented in milliseconds
+    }
+
 -- | Resources available for running jobs, i.e.
 -- tokens obtained from the parallelism semaphore.
 data JobResources
@@ -180,6 +197,19 @@ addJob job jobs@( Jobs { jobsWaiting = wait })
 
 -- | The state of the semaphore job server.
 data JobserverState
+  = JobserverState
+    { jobserverAction  :: !JobserverAction
+      -- ^ The action currently in flight: 'Idle', 'Acquiring'
+      -- or 'Releasing'.
+    , canChangeNumCaps :: !(TVar Bool)
+      -- ^ Initially 'True'; replaced by a 'registerDelay' TVar after each
+      -- 'setNumCapabilities' call. 'tryNoticeIdle' waits for it to be 'True'.
+    , canReleaseToken  :: !(TVar Bool)
+      -- ^ Initially 'True'; replaced by a 'registerDelay' TVar each
+      -- time 'tryAcquire' spawns an acquire thread. 'tryRelease'
+      -- waits for it to be 'True'.
+    }
+data JobserverAction
   -- | The jobserver is idle: no thread is currently
   -- interacting with the semaphore.
   = Idle
@@ -194,7 +224,7 @@ data JobserverState
 
 -- | Return the 'threadFinished' 'TMVar' of the in-flight 'Acquiring' or
 -- 'Releasing' action, or 'Nothing' when the jobserver is 'Idle'.
-activeThread_maybe :: JobserverState -> Maybe (TMVar (Maybe MC.SomeException))
+activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
 activeThread_maybe Idle                                   = Nothing
 activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar
 activeThread_maybe (Releasing { threadFinished = tmvar }) = Just tmvar
@@ -289,7 +319,7 @@ modifyJobResources jobs_tvar action = do
 
 -- | Spawn a new thread that waits on the semaphore in order to acquire
 -- an additional token.
-acquireThread :: Jobserver -> IO JobserverState
+acquireThread :: Jobserver -> IO JobserverAction
 acquireThread (Jobserver { semaphore = sem, jobs = jobs_tvar })
   = do
     threadFinished_tmvar <- newEmptyTMVarIO
@@ -310,13 +340,14 @@ acquireThread (Jobserver { semaphore = sem, jobs = jobs_tvar })
 
 -- | Spawn a thread to release ownership of one resource from the semaphore,
 -- provided we have spare resources and no pending jobs.
-releaseThread :: Jobserver -> IO JobserverState
+releaseThread :: Jobserver -> IO JobserverAction
 releaseThread (Jobserver { semaphore = sem, jobs = jobs_tvar }) = do
   threadFinished_tmvar <- newEmptyTMVarIO
   MC.mask_ do
     still_ok_to_release
       <- atomically $ modifyJobResources jobs_tvar \ jobs ->
         if guardRelease jobs
+            -- TODO: should this also debounce?
         then return (True , removeFreeToken jobs)
         else return (False, jobs)
     if not still_ok_to_release
@@ -352,15 +383,22 @@ releaseThread (Jobserver { semaphore = sem, jobs = jobs_tvar }) = do
 -- spawn a thread to acquire a new token from the semaphore.
 --
 -- See 'acquireThread'.
-tryAcquire :: Jobserver
+tryAcquire :: JobserverOptions -- ^ supplies 'releaseDebounce' (milliseconds)
+           -> Jobserver
            -> JobserverState
            -> STM (IO JobserverState)
-tryAcquire sjs@( Jobserver { jobs = jobs_tvar }) Idle
+tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
+  st@( JobserverState { jobserverAction = Idle } )
   = do
     jobs <- readTVar jobs_tvar
     guard $ guardAcquire jobs
-    return $ acquireThread sjs
-tryAcquire _ _ = retry
+    return do
+      action           <- acquireThread js
+      -- Debounce releases; 'registerDelay' wants microseconds, the option is in ms.
+      can_release_tvar <- registerDelay $ 1000 * releaseDebounce opts
+      return $ st { jobserverAction = action
+                  , canReleaseToken = can_release_tvar }
+tryAcquire _ _ _ = retry -- an action is already in flight
 
 -- | When there are free tokens and no pending jobs,
 -- spawn a thread to release a token from the semaphore.
@@ -369,43 +407,63 @@ tryAcquire _ _ = retry
 tryRelease :: Jobserver
            -> JobserverState
            -> STM (IO JobserverState)
-tryRelease sjs@( Jobserver { jobs = jobs_tvar }) Idle
+tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
+  st@( JobserverState
+      { jobserverAction = Idle -- only start a release when no action is in flight
+      , canReleaseToken = can_release_tvar } )
   = do
     jobs <- readTVar jobs_tvar
     guard  $ guardRelease jobs
-    return $ releaseThread sjs
+    can_release <- readTVar can_release_tvar
+    guard can_release -- retry until the release debounce TVar reads 'True'
+    return do
+      action <- releaseThread sjs
+      return $ st { jobserverAction = action } -- other fields are kept as-is
 tryRelease _ _ = retry
 
 -- | Wait for an active thread to finish. Once it finishes:
 --
---  - set the 'JobserverState' to 'Idle',
+--  - set the 'JobserverAction' to 'Idle',
 --  - update the number of capabilities to reflect the number
 --    of owned tokens from the semaphore.
-tryNoticeIdle :: TVar JobResources
+tryNoticeIdle :: JobserverOptions -- ^ supplies 'setNumCapsDebounce' (milliseconds)
+              -> TVar JobResources
               -> JobserverState
               -> STM (IO JobserverState)
-tryNoticeIdle jobs_tvar jobserver_state
-  | Just threadFinished_tmvar <- activeThread_maybe jobserver_state
-  = sync_num_caps threadFinished_tmvar
+tryNoticeIdle opts jobs_tvar jobserver_state
+  | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
+  = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
   | otherwise
   = retry -- no active thread: wait until jobserver isn't idle
   where
-    sync_num_caps :: TMVar (Maybe MC.SomeException) -> STM (IO JobserverState)
-    sync_num_caps threadFinished_tmvar = do
+    sync_num_caps :: TVar Bool
+                  -> TMVar (Maybe MC.SomeException)
+                  -> STM (IO JobserverState)
+    sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
       mb_ex <- takeTMVar threadFinished_tmvar
       for_ mb_ex MC.throwM
       Jobs { tokensOwned } <- readTVar jobs_tvar
+      can_change_numcaps <- readTVar can_change_numcaps_tvar
+      guard can_change_numcaps -- retry the whole transaction until the debounce elapses
       return do
         x <- getNumCapabilities
-        when (x /= tokensOwned) $ setNumCapabilities tokensOwned
-        return Idle
+        can_change_numcaps_tvar_2 <-
+          if x == tokensOwned
+          then return can_change_numcaps_tvar -- nothing changed: keep the current TVar
+          else do
+            setNumCapabilities tokensOwned
+            registerDelay $ 1000 * setNumCapsDebounce opts -- ms -> microseconds
+        return $
+          jobserver_state
+            { jobserverAction  = Idle
+            , canChangeNumCaps = can_change_numcaps_tvar_2 }
 
 -- | Try to stop the current thread which is acquiring/releasing resources
 -- if that operation is no longer relevant.
 tryStopThread :: TVar JobResources
               -> JobserverState
               -> STM (IO JobserverState)
-tryStopThread jobs_tvar ls = case ls of
+tryStopThread jobs_tvar jsj = case jobserverAction jsj of
   Acquiring { activeThread = tid } -> do
     jobs <- readTVar jobs_tvar
     guard  $ null (jobsWaiting jobs)
@@ -416,19 +474,28 @@ tryStopThread jobs_tvar ls = case ls of
     return $ kill_thread_and_idle tid
   Idle -> retry
   where
-    kill_thread_and_idle tid = killThread tid $> Idle
+    kill_thread_and_idle tid =
+      killThread tid $> jsj { jobserverAction = Idle }
 
 -- | Main jobserver loop: acquire/release resources as
 -- needed for the pending jobs and available semaphore tokens.
-jobserverLoop :: Jobserver -> IO ()
-jobserverLoop sjs@(Jobserver { jobs = jobs_tvar })
-  = loop Idle
+jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
+jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
+  = do
+      true_tvar <- newTVarIO True
+      let init_state :: JobserverState
+          init_state =
+            JobserverState
+              { jobserverAction  = Idle
+              , canChangeNumCaps = true_tvar
+              , canReleaseToken  = true_tvar }
+      loop init_state
   where
     loop s = do
       action <- atomically $ asum $ (\x -> x s) <$>
         [ tryRelease    sjs
-        , tryAcquire    sjs
-        , tryNoticeIdle jobs_tvar
+        , tryAcquire    opts sjs
+        , tryNoticeIdle opts jobs_tvar
         , tryStopThread jobs_tvar
         ]
       s <- action
@@ -446,10 +513,11 @@ makeJobserver (SemaphoreName sem_path) = do
            }
   jobs_tvar <- newTVarIO init_jobs
   let
+    opts = defaultJobserverOptions -- TODO: allow this to be configured
     sjs = Jobserver { semaphore, jobs = jobs_tvar }
   loop_finished_mvar <- newEmptyMVar
   loop_tid <- forkIOWithUnmask \ unmask -> do
-    r <- try $ unmask $ jobserverLoop sjs
+    r <- try $ unmask $ jobserverLoop opts sjs
     putMVar loop_finished_mvar $
       case r of
         Left e



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/c5cc0bc320a34fee0dd27ad965bf929c59425df7

-- 
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/commit/c5cc0bc320a34fee0dd27ad965bf929c59425df7
You're receiving this email because of your account on gitlab.haskell.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20220912/bbf27162/attachment-0001.html>


More information about the ghc-commits mailing list