[Git][ghc/ghc][master] 2 commits: testsuite: 21651 add test for closeFdWith + setNumCapabilities
Marge Bot (@marge-bot)
gitlab at gitlab.haskell.org
Wed Aug 10 10:02:08 UTC 2022
Marge Bot pushed to branch master at Glasgow Haskell Compiler / GHC
Commits:
76b52cf0 by Douglas Wilson at 2022-08-10T06:01:53-04:00
testsuite: 21651 add test for closeFdWith + setNumCapabilities
This bug does not affect Windows, which does not use the
base module GHC.Event.Thread.
- - - - -
7589ee72 by Douglas Wilson at 2022-08-10T06:01:53-04:00
base: Fix races in IOManager (setNumCapabilities,closeFdWith)
Fix for #21651
Fixes three bugs:
- Writes to 'eventManager' should be atomic: 'ioManagerCapabilitiesChanged' writes it while 'getSystemEventManager' and 'closeFdWith' read it concurrently without synchronization.
- The race in closeFdWith described in the ticket.
- A race in getSystemEventManager where it accesses the 'IOArray' in
'eventManager' before 'ioManagerCapabilitiesChanged' has written to
'eventManager', causing an Array Index exception. The fix here is to
'yield' and retry.
- - - - -
4 changed files:
- libraries/base/GHC/Event/Thread.hs
- + testsuite/tests/concurrent/should_run/T21651.hs
- + testsuite/tests/concurrent/should_run/T21651.stdout
- testsuite/tests/concurrent/should_run/all.T
Changes:
=====================================
libraries/base/GHC/Event/Thread.hs
=====================================
@@ -18,7 +18,7 @@ module GHC.Event.Thread
-- TODO: Use new Windows I/O manager
import Control.Exception (finally, SomeException, toException)
import Data.Foldable (forM_, mapM_, sequence_)
-import Data.IORef (IORef, newIORef, readIORef, writeIORef)
+import Data.IORef (IORef, newIORef, readIORef, writeIORef, atomicWriteIORef)
import Data.Maybe (fromMaybe)
import Data.Tuple (snd)
import Foreign.C.Error (eBADF, errnoToIOError)
@@ -29,7 +29,8 @@ import GHC.List (zipWith, zipWith3)
import GHC.Conc.Sync (TVar, ThreadId, ThreadStatus(..), atomically, forkIO,
labelThread, modifyMVar_, withMVar, newTVar, sharedCAF,
getNumCapabilities, threadCapability, myThreadId, forkOn,
- threadStatus, writeTVar, newTVarIO, readTVar, retry,throwSTM,STM)
+ threadStatus, writeTVar, newTVarIO, readTVar, retry,
+ throwSTM, STM, yield)
import GHC.IO (mask_, uninterruptibleMask_, onException)
import GHC.IO.Exception (ioError)
import GHC.IOArray (IOArray, newIOArray, readIOArray, writeIOArray,
@@ -41,6 +42,7 @@ import GHC.Event.Manager (Event, EventManager, evtRead, evtWrite, loop,
new, registerFd, unregisterFd_)
import qualified GHC.Event.Manager as M
import qualified GHC.Event.TimerManager as TM
+import GHC.Ix (inRange)
import GHC.Num ((-), (+))
import GHC.Real (fromIntegral)
import GHC.Show (showSignedInt)
@@ -98,22 +100,44 @@ threadWaitWrite = threadWait evtWrite
closeFdWith :: (Fd -> IO ()) -- ^ Action that performs the close.
-> Fd -- ^ File descriptor to close.
-> IO ()
-closeFdWith close fd = do
- eventManagerArray <- readIORef eventManager
- let (low, high) = boundsIOArray eventManagerArray
- mgrs <- flip mapM [low..high] $ \i -> do
- Just (_,!mgr) <- readIOArray eventManagerArray i
- return mgr
- -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time.
- -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have
- -- to use uninterruptible mask.
- uninterruptibleMask_ $ do
- tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
- cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
- close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
+closeFdWith close fd = close_loop
where
finish mgr table cbApp = putMVar (M.callbackTableVar mgr fd) table >> cbApp
zipWithM f xs ys = sequence (zipWith f xs ys)
+ -- The array inside 'eventManager' can be swapped out at any time, see
+ -- 'ioManagerCapabilitiesChanged'. See #21651. We detect this case by
+ -- checking the array bounds before and after taking the MVars. When such
+ -- a swap has happened we put the MVars back and try again.
+ close_loop = do
+ eventManagerArray <- readIORef eventManager
+ let ema_bounds@(low, high) = boundsIOArray eventManagerArray
+ mgrs <- flip mapM [low..high] $ \i -> do
+ Just (_,!mgr) <- readIOArray eventManagerArray i
+ return mgr
+
+ -- 'takeMVar', and 'M.closeFd_' might block, although for a very short time.
+ -- To make 'closeFdWith' safe in presence of asynchronous exceptions we have
+ -- to use uninterruptible mask.
+ join $ uninterruptibleMask_ $ do
+ tables <- flip mapM mgrs $ \mgr -> takeMVar $ M.callbackTableVar mgr fd
+ new_ema_bounds <- boundsIOArray `fmap` readIORef eventManager
+ -- Here we exploit Note [The eventManager Array]
+ if new_ema_bounds /= ema_bounds
+ then do
+ -- The array has been swapped out.
+ -- mgrs still holds the right EventManagers, by the Note.
+ -- new_ema_bounds must be larger than ema_bounds, by the Note.
+ -- Return the MVars we took and try again.
+ sequence_ $ zipWith (\mgr table -> finish mgr table (pure ())) mgrs tables
+ pure close_loop
+ else do
+ -- We surely have taken all the appropriate MVars. Even if the array
+ -- has been swapped, our mgrs is still correct.
+ -- Remove the Fd from all callback tables, close the Fd, and run all
+ -- callbacks.
+ cbApps <- zipWithM (\mgr table -> M.closeFd_ mgr table fd) mgrs tables
+ close fd `finally` sequence_ (zipWith3 finish mgrs tables cbApps)
+ pure (pure ())
threadWait :: Event -> Fd -> IO ()
threadWait evt fd = mask_ $ do
@@ -177,10 +201,24 @@ threadWaitWriteSTM = threadWaitSTM evtWrite
getSystemEventManager :: IO (Maybe EventManager)
getSystemEventManager = do
t <- myThreadId
- (cap, _) <- threadCapability t
eventManagerArray <- readIORef eventManager
- mmgr <- readIOArray eventManagerArray cap
- return $ fmap snd mmgr
+ let r = boundsIOArray eventManagerArray
+ (cap, _) <- threadCapability t
+ -- It is possible that we've just increased the number of capabilities and the
+ -- new EventManager has not yet been constructed by
+ -- 'ioManagerCapabilitiesChanged'. We expect this to happen very rarely.
+ -- T21651 exercises this.
+ -- Two options to proceed:
+ -- 1) return the EventManager for capability 0. This is guaranteed to exist,
+ -- and "shouldn't" cause any correctness issues.
+ -- 2) Busy wait, with or without a call to 'yield'. This can't deadlock,
+ -- because we must be on a brand new capability and there must be a call to
+ -- 'ioManagerCapabilitiesChanged' pending.
+ --
+ -- We take the second option, with the yield, judging it the most robust.
+ if not (inRange r cap)
+ then yield >> getSystemEventManager
+ else fmap snd `fmap` readIOArray eventManagerArray cap
getSystemEventManager_ :: IO EventManager
getSystemEventManager_ = do
@@ -191,6 +229,22 @@ getSystemEventManager_ = do
foreign import ccall unsafe "getOrSetSystemEventThreadEventManagerStore"
getOrSetSystemEventThreadEventManagerStore :: Ptr a -> IO (Ptr a)
+-- Note [The eventManager Array]
+-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-- A mutable array holding the current EventManager for each capability.
+-- An entry is Nothing only while the EventManagers are initialised, see
+-- 'startIOManagerThread' and 'ioManagerCapabilitiesChanged'.
+-- The 'ThreadId' at array position 'cap' will have been 'forkOn'ed on
+-- capability 'cap'.
+-- The array will be swapped with newer arrays when the number of capabilities
+-- changes (via 'setNumCapabilities'). However:
+-- * the size of the arrays will never decrease; and
+-- * the 'EventManager's in the array are not replaced with other
+-- 'EventManager' values.
+--
+-- This is a similar strategy to the one the rts uses for its
+-- capabilities array ('n_capabilities' is the size of the array,
+-- 'enabled_capabilities' is the number of active capabilities).
eventManager :: IORef (IOArray Int (Maybe (ThreadId, EventManager)))
eventManager = unsafePerformIO $ do
numCaps <- getNumCapabilities
@@ -351,7 +405,9 @@ ioManagerCapabilitiesChanged =
startIOManagerThread new_eventManagerArray
-- update the event manager array reference:
- writeIORef eventManager new_eventManagerArray
+ atomicWriteIORef eventManager new_eventManagerArray
+ -- We need an atomic write here because 'eventManager' is read without
+ -- synchronization in 'getSystemEventManager' and 'closeFdWith'.
else when (new_n_caps > numEnabled) $
forM_ [numEnabled..new_n_caps-1] $ \i -> do
Just (_,mgr) <- readIOArray eventManagerArray i
=====================================
testsuite/tests/concurrent/should_run/T21651.hs
=====================================
@@ -0,0 +1,124 @@
+{-# LANGUAGE MagicHash, UnboxedTuples #-}
+
+-- This test is adapted from setnumcapabilities001.
+
+import GHC.Conc hiding (threadWaitRead, threadWaitWrite)
+import GHC.Exts
+import GHC.IO.Encoding
+import System.Environment
+import System.IO
+import Control.Monad
+import Text.Printf
+import Data.Time.Clock
+import Control.DeepSeq
+
+import System.Posix.IO
+import System.Posix.Types
+import Control.Concurrent
+import Control.Exception
+
+passTheParcel :: Int -> IO (IO ())
+passTheParcel n = do
+ pipes@(p1 : rest) <- forM [0..n-1] $ \_ -> createPipe
+ rs@((_,tid1) : _) <- forM (pipes `zip` (rest ++ [p1])) $ \((readfd, _), (_, writefd)) -> do
+ let
+ read = fdRead readfd $ fromIntegral 1
+ write = fdWrite writefd
+ mv <- newEmptyMVar
+ tid <- forkIO $ let
+ loop = flip catch (\(x :: IOException) -> pure ()) $ forever $ do
+ threadWaitRead readfd
+ (s, _) <- read
+ threadWaitWrite writefd
+ write s
+ cleanup = do
+ closeFdWith closeFd readfd
+ closeFdWith closeFd writefd
+ putMVar mv ()
+ in loop `finally` cleanup
+ pure (mv, tid)
+
+ let
+ cleanup = do
+ killThread tid1
+ forM_ rs $ \(mv, _) -> takeMVar mv
+
+ fdWrite (snd p1) "a"
+ pure cleanup
+
+
+main = do
+ setLocaleEncoding latin1 -- fdRead and fdWrite depend on the current locale
+ [n,q,t,z] <- fmap (fmap read) getArgs
+ cleanup_ptp <- passTheParcel z
+ t <- forkIO $ do
+ forM_ (cycle ([n,n-1..1] ++ [2..n-1])) $ \m -> do
+ setNumCapabilities m
+ threadDelay t
+ printf "%d\n" (nqueens q)
+ cleanup_ptp
+ killThread t
+ -- If we don't kill the child thread, it might be about to
+ -- call setNumCapabilities() in C when the main thread exits,
+ -- and chaos can ensue. See #12038
+
+nqueens :: Int -> Int
+nqueens nq = length (pargen 0 [])
+ where
+ safe :: Int -> Int -> [Int] -> Bool
+ safe x d [] = True
+ safe x d (q:l) = x /= q && x /= q+d && x /= q-d && safe x (d+1) l
+
+ gen :: [[Int]] -> [[Int]]
+ gen bs = [ (q:b) | b <- bs, q <- [1..nq], safe q 1 b ]
+
+ pargen :: Int -> [Int] -> [[Int]]
+ pargen n b
+ | n >= threshold = iterate gen [b] !! (nq - n)
+ | otherwise = concat bs
+ where bs = map (pargen (n+1)) (gen [b]) `using` parList rdeepseq
+
+ threshold = 3
+
+using :: a -> Strategy a -> a
+x `using` strat = runEval (strat x)
+
+type Strategy a = a -> Eval a
+
+newtype Eval a = Eval (State# RealWorld -> (# State# RealWorld, a #))
+
+runEval :: Eval a -> a
+runEval (Eval x) = case x realWorld# of (# _, a #) -> a
+
+instance Functor Eval where
+ fmap = liftM
+
+instance Applicative Eval where
+ pure x = Eval $ \s -> (# s, x #)
+ (<*>) = ap
+
+instance Monad Eval where
+ return = pure
+ Eval x >>= k = Eval $ \s -> case x s of
+ (# s', a #) -> case k a of
+ Eval f -> f s'
+
+parList :: Strategy a -> Strategy [a]
+parList strat = traverse (rparWith strat)
+
+rpar :: Strategy a
+rpar x = Eval $ \s -> spark# x s
+
+rseq :: Strategy a
+rseq x = Eval $ \s -> seq# x s
+
+rparWith :: Strategy a -> Strategy a
+rparWith s a = do l <- rpar r; return (case l of Lift x -> x)
+ where r = case s a of
+ Eval f -> case f realWorld# of
+ (# _, a' #) -> Lift a'
+
+data Lift a = Lift a
+
+rdeepseq :: NFData a => Strategy a
+rdeepseq x = do rseq (rnf x); return x
=====================================
testsuite/tests/concurrent/should_run/T21651.stdout
=====================================
@@ -0,0 +1 @@
+14200
=====================================
testsuite/tests/concurrent/should_run/all.T
=====================================
@@ -218,12 +218,20 @@ test('conc067', ignore_stdout, compile_and_run, [''])
test('conc068', [ omit_ways(concurrent_ways), exit_code(1) ], compile_and_run, [''])
test('setnumcapabilities001',
- [ only_ways(['threaded1','threaded2', 'nonmoving_thr']),
+ [ only_ways(['threaded1','threaded2', 'nonmoving_thr', 'profthreaded']),
extra_run_opts('8 12 2000'),
when(have_thread_sanitizer(), expect_broken(18808)),
req_smp ],
compile_and_run, [''])
+test('T21651',
+ [ only_ways(['threaded1','threaded2', 'nonmoving_thr', 'profthreaded']),
+ when(opsys('mingw32'),skip), # uses POSIX pipes
+ when(opsys('darwin'),extra_run_opts('8 12 2000 100')), # fewer pipes: darwin runners complain of too many open files
+ unless(opsys('darwin'),extra_run_opts('8 12 2000 200')),
+ req_smp ],
+ compile_and_run, [''])
+
test('hs_try_putmvar001',
[
when(opsys('mingw32'),skip), # uses pthread APIs in the C code
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/d71a20514546e0befe6e238d0658cbaad5a13996...7589ee7241d46b393979d98d4ded17a15ee974fb
--
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/d71a20514546e0befe6e238d0658cbaad5a13996...7589ee7241d46b393979d98d4ded17a15ee974fb
You're receiving this email because of your account on gitlab.haskell.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20220810/1a3a2d8f/attachment-0001.html>
More information about the ghc-commits
mailing list