[Git][ghc/ghc][master] 2 commits: testsuite: 21651 add test for closeFdWith + setNumCapabilities

Marge Bot (@marge-bot) gitlab at gitlab.haskell.org
Wed Aug 10 10:02:08 UTC 2022



Marge Bot pushed to branch master at Glasgow Haskell Compiler / GHC


Commits:
76b52cf0 by Douglas Wilson at 2022-08-10T06:01:53-04:00
testsuite: 21651 add test for closeFdWith + setNumCapabilities

This bug does not affect windows, which does not use the
base module GHC.Event.Thread.

- - - - -
7589ee72 by Douglas Wilson at 2022-08-10T06:01:53-04:00
base: Fix races in IOManager (setNumCapabilities,closeFdWith)

Fix for #21651

Fixes three bugs:

- writes to eventManager should be atomic. It is accessed concurrently by ioManagerCapabilitiesChanged and closeFdWith.
- The race in closeFdWith described in the ticket.
- A race in getSystemEventManager where it accesses the 'IOArray' in
  'eventManager' before 'ioManagerCapabilitiesChanged' has written to
  'eventManager', causing an Array Index exception. The fix here is to
  'yield' and retry.

- - - - -


4 changed files:

- libraries/base/GHC/Event/Thread.hs
- + testsuite/tests/concurrent/should_run/T21651.hs
- + testsuite/tests/concurrent/should_run/T21651.stdout
- testsuite/tests/concurrent/should_run/all.T


Changes:

=====================================
libraries/base/GHC/Event/Thread.hs
=====================================
@@ -18,7 +18,7 @@ module GHC.Event.Thread
 -- TODO: Use new Windows I/O manager
 import Control.Exception (finally, SomeException, toException)
 import Data.Foldable (forM_, mapM_, sequence_)
-import Data.IORef (IORef, newIORef, readIORef, writeIORef)
+import Data.IORef (IORef, newIORef, readIORef, writeIORef, atomicWriteIORef)
 import Data.Maybe (fromMaybe)
 import Data.Tuple (snd)
 import Foreign.C.Error (eBADF, errnoToIOError)
@@ -29,7 +29,8 @@ import GHC.List (zipWith, zipWith3)
 import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
                       labelThread, modifyMVar_, withMVar, newTVar, sharedCAF,
                       getNumCapabilities, threadCapability, myThreadId, forkOn,
-                      threadStatus, writeTVar, newTVarIO, readTVar, retry,throwSTM,STM)
+                      threadStatus, writeTVar, newTVarIO, readTVar, retry,
+                      throwSTM, STM, yield)
 import GHC.IO (mask_, uninterruptibleMask_, onException)
 import GHC.IO.Exception (ioError)
 import GHC.IOArray (IOArray, newIOArray, readIOArray, writeIOArray,
@@ -41,6 +42,7 @@ import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
                              new, registerFd, unregisterFd_)
 import qualified GHC.Event.Manager as M
 import qualified GHC.Event.TimerManager as TM
+import GHC.Ix (inRange)
 import GHC.Num ((-), (+))
 import GHC.Real (fromIntegral)
 import GHC.Show (showSignedInt)
@@ -98,22 +100,44 @@ threadWaitWrite = threadWait evtWrite
 closeFdWith :: (Fd -> IO ())        -- ^ Action that performs the close.
             -> Fd                   -- ^ File descriptor to close.
             -> IO ()
-closeFdWith close fd = do
-  eventManagerArray <- readIORef eventManager
-  let (low, high) = boundsIOArray eventManagerArray
-  mgrs <- flip mapM [low..high] $ \i -> do
-    Just (_,!mgr) <- readIOArray eventManagerArray i
-    return mgr
-  -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time.
-  -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have
-  -- to use uninterruptible mask.
-  uninterruptibleMask_ $ do
-    tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
-    cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
-    close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
+closeFdWith close fd = close_loop
   where
     finish mgr table cbApp = putMVar (M.callbackTableVar mgr fd) table >> cbApp
     zipWithM f xs ys = sequence (zipWith f xs ys)
+      -- The array inside 'eventManager' can be swapped out at any time, see
+      -- 'ioManagerCapabilitiesChanged'. See #21651. We detect this case by
+      -- checking the array bounds before and after. When such a swap has
+      -- happened we clean up and try again.
+    close_loop = do
+      eventManagerArray <- readIORef eventManager
+      let ema_bounds@(low, high) = boundsIOArray eventManagerArray
+      mgrs <- flip mapM [low..high] $ \i -> do
+        Just (_,!mgr) <- readIOArray eventManagerArray i
+        return mgr
+
+      -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time.
+      -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have
+      -- to use uninterruptible mask.
+      join $ uninterruptibleMask_ $ do
+        tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
+        new_ema_bounds <- boundsIOArray `fmap` readIORef eventManager
+        -- Here we exploit Note [The eventManager Array]
+        if new_ema_bounds /= ema_bounds
+          then do
+            -- the array has been modified.
+            -- mgrs still holds the right EventManagers, by the Note.
+            -- new_ema_bounds must be larger than ema_bounds, by the note.
+            -- return the MVars we took and try again
+            sequence_ $ zipWith (\mgr table -> finish mgr table (pure ())) mgrs tables
+            pure close_loop
+          else do
+            -- We surely have taken all the appropriate MVars. Even if the array
+            -- has been swapped, our mgrs is still correct.
+            -- Remove the Fd from all callback tables, close the Fd, and run all
+            -- callbacks.
+            cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
+            close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
+            pure (pure ())
 
 threadWait :: Event -> Fd -> IO ()
 threadWait evt fd = mask_ $ do
@@ -177,10 +201,24 @@ threadWaitWriteSTM = threadWaitSTM evtWrite
 getSystemEventManager :: IO (Maybe EventManager)
 getSystemEventManager = do
   t <- myThreadId
-  (cap, _) <- threadCapability t
   eventManagerArray <- readIORef eventManager
-  mmgr <- readIOArray eventManagerArray cap
-  return $ fmap snd mmgr
+  let r = boundsIOArray eventManagerArray
+  (cap, _) <- threadCapability t
+  -- It is possible that we've just increased the number of capabilities and the
+  -- new EventManager has not yet been constructed by
+  -- 'ioManagerCapabilitiesChanged'. We expect this to happen very rarely.
+  -- T21651 exercises this.
+  -- Two options to proceed:
+  --  1) return the EventManager for capability 0. This is guaranteed to exist,
+  --     and "shouldn't" cause any correctness issues.
+  --  2) Busy wait, with or without a call to 'yield'. This can't deadlock,
+  --     because we must be on a brand new capability and there must be a call to
+  --     'ioManagerCapabilitiesChanged' pending.
+  --
+  -- We take the second option, with the yield, judging it the most robust.
+  if not (inRange r cap)
+    then yield >> getSystemEventManager
+    else fmap snd `fmap` readIOArray eventManagerArray cap
 
 getSystemEventManager_ :: IO EventManager
 getSystemEventManager_ = do
@@ -191,6 +229,22 @@ getSystemEventManager_ = do
 foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
     getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)
 
+-- Note [The eventManager Array]
+-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-- A mutable array holding the current EventManager for each capability.
+-- An entry is Nothing only while the eventmanagers are initialised, see
+-- 'startIOManagerThread' and 'ioManagerCapabilitiesChanged'.
+-- The 'ThreadId' at array position 'cap' will have been 'forkOn'ed to capability
+-- 'cap'.
+-- The array will be swapped with newer arrays when the number of capabilities
+-- changes (via 'setNumCapabilities'). However:
+--   * the size of the arrays will never decrease; and
+--   * The 'EventManager's in the array are not replaced with other
+--     'EventManager' constructors.
+--
+-- This is a similar strategy to the one the RTS uses for its
+-- capabilities array ('n_capabilities' is the size of the array,
+-- 'enabled_capabilities' is the number of active capabilities).
 eventManager :: IORef (IOArray Int (Maybe (ThreadId, EventManager)))
 eventManager = unsafePerformIO $ do
     numCaps <- getNumCapabilities
@@ -351,7 +405,9 @@ ioManagerCapabilitiesChanged =
                 startIOManagerThread new_eventManagerArray
 
               -- update the event manager array reference:
-              writeIORef eventManager new_eventManagerArray
+              atomicWriteIORef eventManager new_eventManagerArray
+              -- We need an atomic write here because 'eventManager' is accessed
+              -- unsynchronized in 'getSystemEventManager' and 'closeFdWith'
       else when (new_n_caps > numEnabled) $
             forM_ [numEnabled..new_n_caps-1] $ \i -> do
               Just (_,mgr) <- readIOArray eventManagerArray i


=====================================
testsuite/tests/concurrent/should_run/T21651.hs
=====================================
@@ -0,0 +1,124 @@
+{-# LANGUAGE MagicHash, UnboxedTuples #-}
+
+-- This test is adapted from setnumcapabilities001.
+
+import GHC.Conc hiding (threadWaitRead, threadWaitWrite)
+import GHC.Exts
+import GHC.IO.Encoding
+import System.Environment
+import System.IO
+import Control.Monad
+import Text.Printf
+import Data.Time.Clock
+import Control.DeepSeq
+
+import System.Posix.IO
+import System.Posix.Types
+import Control.Concurrent
+import Control.Exception
+
+passTheParcel :: Int -> IO (IO ())
+passTheParcel n = do
+  pipes@(p1 : rest) <- forM [0..n-1] $ \_ -> createPipe
+  rs@((_,tid1) : _) <- forM (pipes `zip` (rest ++ [p1])) $ \((readfd, _), (_, writefd)) -> do
+    let
+      read = fdRead readfd $ fromIntegral 1
+      write = fdWrite writefd
+    mv <- newEmptyMVar
+    tid <- forkIO $ let
+      loop = flip catch (\(x :: IOException) -> pure ()) $ forever $ do
+        threadWaitRead readfd
+        (s, _) <- read
+        threadWaitWrite writefd
+        write s
+      cleanup = do
+        closeFdWith closeFd readfd
+        closeFdWith closeFd writefd
+        putMVar mv ()
+      in loop `finally` cleanup
+    pure (mv, tid)
+
+  let
+    cleanup = do
+      killThread tid1
+      forM_ rs $ \(mv, _) -> takeMVar mv
+
+  fdWrite (snd p1) "a"
+  pure cleanup
+
+
+main = do
+  setLocaleEncoding latin1 -- fdRead and fdWrite depend on the current locale
+  [n,q,t,z] <- fmap (fmap read) getArgs
+  cleanup_ptp <- passTheParcel z
+  t <- forkIO $ do
+    forM_ (cycle ([n,n-1..1] ++ [2..n-1])) $ \m -> do
+      setNumCapabilities m
+      threadDelay t
+  printf "%d\n" (nqueens q)
+  cleanup_ptp
+  killThread t
+      -- If we don't kill the child thread, it might be about to
+      -- call setNumCapabilities() in C when the main thread exits,
+      -- and chaos can ensue.  See #12038
+
+nqueens :: Int -> Int
+nqueens nq = length (pargen 0 [])
+ where
+    safe :: Int -> Int -> [Int] -> Bool
+    safe x d []    = True
+    safe x d (q:l) = x /= q && x /= q+d && x /= q-d && safe x (d+1) l
+
+    gen :: [[Int]] -> [[Int]]
+    gen bs = [ (q:b) | b <- bs, q <- [1..nq], safe q 1 b ]
+
+    pargen :: Int -> [Int] -> [[Int]]
+    pargen n b
+       | n >= threshold = iterate gen [b] !! (nq - n)
+       | otherwise      = concat bs
+       where bs = map (pargen (n+1)) (gen [b]) `using` parList rdeepseq
+
+    threshold = 3
+
+using :: a -> Strategy a -> a
+x `using` strat = runEval (strat x)
+
+type Strategy a = a -> Eval a
+
+newtype Eval a = Eval (State# RealWorld -> (# State# RealWorld, a #))
+
+runEval :: Eval a -> a
+runEval (Eval x) = case x realWorld# of (# _, a #) -> a
+
+instance Functor Eval where
+  fmap = liftM
+
+instance Applicative Eval where
+  pure x = Eval $ \s -> (# s, x #)
+  (<*>)  = ap
+
+instance Monad Eval where
+  return = pure
+  Eval x >>= k = Eval $ \s -> case x s of
+                                (# s', a #) -> case k a of
+                                                      Eval f -> f s'
+
+parList :: Strategy a -> Strategy [a]
+parList strat = traverse (rparWith strat)
+
+rpar :: Strategy a
+rpar  x = Eval $ \s -> spark# x s
+
+rseq :: Strategy a
+rseq x = Eval $ \s -> seq# x s
+
+rparWith :: Strategy a -> Strategy a
+rparWith s a = do l <- rpar r; return (case l of Lift x -> x)
+  where r = case s a of
+              Eval f -> case f realWorld# of
+                          (# _, a' #) -> Lift a'
+
+data Lift a = Lift a
+
+rdeepseq :: NFData a => Strategy a
+rdeepseq x = do rseq (rnf x); return x


=====================================
testsuite/tests/concurrent/should_run/T21651.stdout
=====================================
@@ -0,0 +1 @@
+14200


=====================================
testsuite/tests/concurrent/should_run/all.T
=====================================
@@ -218,12 +218,20 @@ test('conc067', ignore_stdout, compile_and_run, [''])
 test('conc068', [ omit_ways(concurrent_ways), exit_code(1) ], compile_and_run, [''])
 
 test('setnumcapabilities001',
-     [ only_ways(['threaded1','threaded2', 'nonmoving_thr']),
+     [ only_ways(['threaded1','threaded2', 'nonmoving_thr', 'profthreaded']),
        extra_run_opts('8 12 2000'),
        when(have_thread_sanitizer(), expect_broken(18808)),
        req_smp ],
      compile_and_run, [''])
 
+test('T21651',
+     [ only_ways(['threaded1','threaded2', 'nonmoving_thr', 'profthreaded']),
+       when(opsys('mingw32'),skip), # uses POSIX pipes
+       when(opsys('darwin'),extra_run_opts('8 12 2000 100')),
+       unless(opsys('darwin'),extra_run_opts('8 12 2000 200')), # darwin runners complain of too many open files
+       req_smp ],
+     compile_and_run, [''])
+
 test('hs_try_putmvar001',
      [
      when(opsys('mingw32'),skip), # uses pthread APIs in the C code



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/d71a20514546e0befe6e238d0658cbaad5a13996...7589ee7241d46b393979d98d4ded17a15ee974fb

-- 
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/d71a20514546e0befe6e238d0658cbaad5a13996...7589ee7241d46b393979d98d4ded17a15ee974fb
You're receiving this email because of your account on gitlab.haskell.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20220810/1a3a2d8f/attachment-0001.html>


More information about the ghc-commits mailing list