[Git][ghc/ghc][wip/io_uring] 2 commits: Refactor Event Manager Backend to allow for arbitrty asynchronous IO
David Eichmann
gitlab at gitlab.haskell.org
Tue Jun 2 14:52:28 UTC 2020
David Eichmann pushed to branch wip/io_uring at Glasgow Haskell Compiler / GHC
Commits:
6dcdfa25 by David Eichmann at 2020-06-01T15:12:47+01:00
Refactor Event Manager Backend to allow for arbitrty asynchronous IO
- - - - -
2aef7332 by David Eichmann at 2020-06-02T15:46:33+01:00
WIP
- - - - -
6 changed files:
- libraries/base/GHC/Event.hs
- libraries/base/GHC/Event/EPoll.hsc
- libraries/base/GHC/Event/Internal.hs
- libraries/base/GHC/Event/Manager.hs
- libraries/base/GHC/Event/Poll.hsc
- libraries/base/GHC/Event/TimerManager.hs
Changes:
=====================================
libraries/base/GHC/Event.hs
=====================================
@@ -23,7 +23,7 @@ module GHC.Event
, Event
, evtRead
, evtWrite
- , IOCallback
+ , EventCallback
, FdKey(keyFd)
, Lifetime(..)
, registerFd
=====================================
libraries/base/GHC/Event/EPoll.hsc
=====================================
@@ -71,7 +71,13 @@ new :: IO E.Backend
new = do
epfd <- epollCreate
evts <- A.new 64
- let !be = E.backend poll modifyFd modifyFdOnce delete (EPoll epfd evts)
+ let !be = E.backend
+ poll
+ modifyFd
+ modifyFdOnce
+ (\_ _ _ -> Nothing)
+ delete
+ (EPoll epfd evts)
return be
delete :: EPoll -> IO ()
@@ -109,7 +115,7 @@ modifyFdOnce ep fd evt =
-- events that are ready.
poll :: EPoll -- ^ state
-> Maybe Timeout -- ^ timeout in milliseconds
- -> (Fd -> E.Event -> IO ()) -- ^ I/O callback
+ -> (E.IOResult -> IO ()) -- ^ I/O callback
-> IO Int
poll ep mtimeout f = do
let events = epollEvents ep
@@ -122,7 +128,7 @@ poll ep mtimeout f = do
Nothing -> epollWaitNonBlock fd es cap
when (n > 0) $ do
- A.forM_ events $ \e -> f (eventFd e) (toEvent (eventTypes e))
+ A.forM_ events $ \e -> f (E.IOResult_Event (eventFd e) (toEvent (eventTypes e)))
cap <- A.capacity events
when (cap == n) $ A.ensureCapacity events (2 * cap)
return n
=====================================
libraries/base/GHC/Event/Internal.hs
=====================================
@@ -6,7 +6,10 @@ module GHC.Event.Internal
-- * Event back end
Backend
, backend
+ , IOOperation(..)
+ , IOResult(..)
, delete
+ , doIOOperation
, poll
, modifyFd
, modifyFdOnce
@@ -33,11 +36,23 @@ import Data.OldList (foldl', filter, intercalate, null)
import Foreign.C.Error (eINTR, getErrno, throwErrno)
import System.Posix.Types (Fd)
import GHC.Base
+import GHC.Event.Unique (Unique)
import GHC.Word (Word64)
import GHC.Num (Num(..))
import GHC.Show (Show(..))
import Data.Semigroup.Internal (stimesMonoid)
+data IOOperation
+ = IOOperation_Read
+ -- IOOperation_Write IOOperationID ...
+ -- ...
+
+data IOResult
+ -- | An event has occurred for a file handle.
+ = IOResult_Event Fd Event
+ -- | An IOOperation has completed.
+ | IOResult_IOComplete Unique
+
-- | An I\/O event.
newtype Event = Event Int
deriving Eq -- ^ @since 4.4.0.0
@@ -161,10 +176,11 @@ data Backend = forall a. Backend {
-- | Poll backend for new events. The provided callback is called
-- once per file descriptor with new events.
- , _bePoll :: a -- backend state
- -> Maybe Timeout -- timeout in milliseconds ('Nothing' for non-blocking poll)
- -> (Fd -> Event -> IO ()) -- I/O callback
- -> IO Int
+ , _bePoll
+ :: a -- backend state
+ -> Maybe Timeout -- timeout in milliseconds ('Nothing' for non-blocking poll)
+ -> (IOResult -> IO ()) -- I/O callback
+ -> IO Int -- ???? negative is error, 0 is success but no IOResults found, positive is success with IO Results. ???
-- | Register, modify, or unregister interest in the given events
-- on the given file descriptor.
@@ -172,48 +188,64 @@ data Backend = forall a. Backend {
-> Fd -- file descriptor
-> Event -- old events to watch for ('mempty' for new)
-> Event -- new events to watch for ('mempty' to delete)
- -> IO Bool
+ -> IO Bool -- The Bool indicates True for success,
+ -- False for a known failure, else this may throw
+ -- with `throwErrno`.
-- | Register interest in new events on a given file descriptor, set
-- to be deactivated after the first event.
, _beModifyFdOnce :: a
-> Fd -- file descriptor
-> Event -- new events to watch
- -> IO Bool
+ -> IO Bool -- Bool indicates success (see _beModifyFd)
+
+ -- | Perform some IO action (non-blocking).
+ , _beDoIOOperation
+ :: a
+ -> Unique -- Operation id.
+ -> IOOperation -- action to perform
+ -> Maybe (IO Bool) -- Nothing if the io action is not supported, and
+ -- the caller should use Fd Events instead. Else
+ -- Just the action to do the (non-blocking) IO
+ -- action. Bool indicates success (see _beModifyFd).
, _beDelete :: a -> IO ()
}
-backend :: (a -> Maybe Timeout -> (Fd -> Event -> IO ()) -> IO Int)
+backend :: (a -> Maybe Timeout -> (IOResult -> IO ()) -> IO Int)
-> (a -> Fd -> Event -> Event -> IO Bool)
-> (a -> Fd -> Event -> IO Bool)
+ -> (a -> Unique -> IOOperation -> Maybe (IO Bool))
-> (a -> IO ())
-> a
-> Backend
-backend bPoll bModifyFd bModifyFdOnce bDelete state =
- Backend state bPoll bModifyFd bModifyFdOnce bDelete
+backend bPoll bModifyFd bModifyFdOnce bDoIOOperation bDelete state =
+ Backend state bPoll bModifyFd bModifyFdOnce bDoIOOperation bDelete
{-# INLINE backend #-}
-poll :: Backend -> Maybe Timeout -> (Fd -> Event -> IO ()) -> IO Int
-poll (Backend bState bPoll _ _ _) = bPoll bState
+poll :: Backend -> Maybe Timeout -> (IOResult -> IO ()) -> IO Int
+poll (Backend bState bPoll _ _ _ _) = bPoll bState
{-# INLINE poll #-}
-- | Returns 'True' if the modification succeeded.
-- Returns 'False' if this backend does not support
-- event notifications on this type of file.
modifyFd :: Backend -> Fd -> Event -> Event -> IO Bool
-modifyFd (Backend bState _ bModifyFd _ _) = bModifyFd bState
+modifyFd (Backend bState _ bModifyFd _ _ _) = bModifyFd bState
{-# INLINE modifyFd #-}
-- | Returns 'True' if the modification succeeded.
-- Returns 'False' if this backend does not support
-- event notifications on this type of file.
modifyFdOnce :: Backend -> Fd -> Event -> IO Bool
-modifyFdOnce (Backend bState _ _ bModifyFdOnce _) = bModifyFdOnce bState
+modifyFdOnce (Backend bState _ _ bModifyFdOnce _ _) = bModifyFdOnce bState
{-# INLINE modifyFdOnce #-}
+doIOOperation :: Backend -> Unique -> IOOperation -> Maybe (IO Bool)
+doIOOperation (Backend bState _ _ _ bDoIOOperation _) = bDoIOOperation bState
+
delete :: Backend -> IO ()
-delete (Backend bState _ _ _ bDelete) = bDelete bState
+delete (Backend bState _ _ _ _ bDelete) = bDelete bState
{-# INLINE delete #-}
-- | Throw an 'Prelude.IOError' corresponding to the current value of
=====================================
libraries/base/GHC/Event/Manager.hs
=====================================
@@ -45,6 +45,7 @@ module GHC.Event.Manager
, Event
, evtRead
, evtWrite
+ , EventCallback
, IOCallback
, FdKey(keyFd)
, FdData
@@ -81,7 +82,7 @@ import GHC.Event.Control
import GHC.Event.IntTable (IntTable)
import GHC.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
Lifetime(..), EventLifetime, Timeout(..))
-import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
+import GHC.Event.Unique (Unique, UniqueSource, asInt, newSource, newUnique)
import System.Posix.Types (Fd)
import qualified GHC.Event.IntTable as IT
@@ -103,7 +104,7 @@ import qualified GHC.Event.Poll as Poll
data FdData = FdData {
fdKey :: {-# UNPACK #-} !FdKey
, fdEvents :: {-# UNPACK #-} !EventLifetime
- , _fdCallback :: !IOCallback
+ , _fdCallback :: !EventCallback
}
-- | A file descriptor registration cookie.
@@ -115,7 +116,10 @@ data FdKey = FdKey {
)
-- | Callback invoked on I/O events.
-type IOCallback = FdKey -> Event -> IO ()
+type EventCallback = FdKey -> Event -> IO ()
+
+-- | Callback invoked on completion of I/O operations.
+type IOCallback = IO ()
data State = Created
| Running
@@ -130,6 +134,22 @@ data State = Created
data EventManager = EventManager
{ emBackend :: !Backend
, emFds :: {-# UNPACK #-} !(Array Int (MVar (IntTable [FdData])))
+ -- ^ The FdData for events. Array index is the Fd hash. IntTable index
+ -- is the Fd. To get all FdDatas for an Fd:
+ --
+ -- lookup (fromIntegral fd) (emFds ! hashFd someFd)
+ --
+ -- The reason for the Array is to reduce contention between threads. See
+ -- "stripping" from the ??????? paper.
+ , emOps :: {-# UNPACK #-} !(Array Int (MVar (IntTable IOCallback)))
+ -- ^ The callbackd for IO operations. Array index is the operation's
+ -- Unique hash. IntTable index is the operations Unique. To get the call
+ -- back for an operation:
+ --
+ -- lookup (asInt opUnique) (emOps ! hashUnique opUnique)
+ --
+ -- The reason for the Array is to reduce contention between threads. See
+ -- "stripping" from the ??????? paper.
, emState :: {-# UNPACK #-} !(IORef State)
, emUniqueSource :: {-# UNPACK #-} !UniqueSource
, emControl :: {-# UNPACK #-} !Control
@@ -141,13 +161,25 @@ callbackArraySize :: Int
callbackArraySize = 32
hashFd :: Fd -> Int
-hashFd fd = fromIntegral fd .&. (callbackArraySize - 1)
+hashFd fd = hashInt (fromIntegral fd)
{-# INLINE hashFd #-}
+hashUnique :: Unique -> Int
+hashUnique u = hashInt (asInt u)
+{-# INLINE hashUnique #-}
+
+hashInt :: Int -> Int
+hashInt int = int .&. (callbackArraySize - 1)
+{-# INLINE hashInt #-}
+
callbackTableVar :: EventManager -> Fd -> MVar (IntTable [FdData])
callbackTableVar mgr fd = emFds mgr ! hashFd fd
{-# INLINE callbackTableVar #-}
+opCallbackTableVar :: EventManager -> Unique -> MVar (IntTable IOCallback)
+opCallbackTableVar mgr unique = emOps mgr ! hashUnique unique
+{-# INLINE opCallbackTableVar #-}
+
haveOneShot :: Bool
{-# INLINE haveOneShot #-}
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
@@ -184,8 +216,11 @@ new = newWith =<< newDefaultBackend
-- | Create a new 'EventManager' with the given polling backend.
newWith :: Backend -> IO EventManager
newWith be = do
+ let intTableInitSize = 8
iofds <- fmap (listArray (0, callbackArraySize-1)) $
- replicateM callbackArraySize (newMVar =<< IT.new 8)
+ replicateM callbackArraySize (newMVar =<< IT.new intTableInitSize)
+ ioops <- fmap (listArray (0, callbackArraySize-1)) $
+ replicateM callbackArraySize (newMVar =<< IT.new intTableInitSize)
ctrl <- newControl False
state <- newIORef Created
us <- newSource
@@ -197,6 +232,7 @@ newWith be = do
lockVar <- newMVar ()
let mgr = EventManager { emBackend = be
, emFds = iofds
+ , emOps = ioops
, emState = state
, emUniqueSource = us
, emControl = ctrl
@@ -293,12 +329,12 @@ step mgr at EventManager{..} = do
state `seq` return state
where
waitForIO = do
- n1 <- I.poll emBackend Nothing (onFdEvent mgr)
+ n1 <- I.poll emBackend Nothing (onIOResult mgr)
when (n1 <= 0) $ do
yield
- n2 <- I.poll emBackend Nothing (onFdEvent mgr)
+ n2 <- I.poll emBackend Nothing (onIOResult mgr)
when (n2 <= 0) $ do
- _ <- I.poll emBackend (Just Forever) (onFdEvent mgr)
+ _ <- I.poll emBackend (Just Forever) (onIOResult mgr)
return ()
------------------------------------------------------------------------
@@ -312,7 +348,7 @@ step mgr at EventManager{..} = do
-- platform's @select@ or @epoll@ system call, which tend to vary in
-- what sort of fds are permitted. For instance, waiting on regular files
-- is not allowed on many platforms.
-registerFd_ :: EventManager -> IOCallback -> Fd -> Event -> Lifetime
+registerFd_ :: EventManager -> EventCallback -> Fd -> Event -> Lifetime
-> IO (FdKey, Bool)
registerFd_ mgr@(EventManager{..}) cb fd evs lt = do
u <- newUnique emUniqueSource
@@ -356,13 +392,62 @@ registerFd_ mgr@(EventManager{..}) cb fd evs lt = do
-- on the file descriptor @fd@ for lifetime @lt at . @cb@ is called for
-- each event that occurs. Returns a cookie that can be handed to
-- 'unregisterFd'.
-registerFd :: EventManager -> IOCallback -> Fd -> Event -> Lifetime -> IO FdKey
+registerFd :: EventManager -> EventCallback -> Fd -> Event -> Lifetime -> IO FdKey
registerFd mgr cb fd evs lt = do
(r, wake) <- registerFd_ mgr cb fd evs lt
when wake $ wakeManager mgr
return r
{-# INLINE registerFd #-}
+-- | @registerOp mgr cb op@ registers the IO operation @op at . This returns
+-- immediately and the operation is done asynchronously. @cb@ is called when the
+-- operation completes.
+--
+-- TODO
+-- * what if the operation fails?
+-- * Should we support cancellation (return some cookie as `registerFd` does)?
+registerOp :: EventManager -> IOCallback -> I.IOOperation -> IO ()
+registerOp mgr@(EventManager{..}) cb op = do
+ -- registerFd_ mgr cb fd evs lt
+ u <- newUnique emUniqueSource
+ ok <- withMVar (opCallbackTableVar mgr u) $ \tbl -> do
+
+ ok <- case I.doIOOperation emBackend u op of
+ Nothing -> defaultRegisterOp_ mgr cb op u tbl
+ Just register -> register
+
+ if ok
+ then do
+ _nothing <- IT.insertWith
+ (error "Impossible! IO Operation Unique already exists")
+ (asInt u) cb tbl
+ return True
+ else return False
+
+ if ok
+ -- We've added an operation and need to wake the manager to check if the
+ -- operation is completed
+ then wakeManager mgr
+ -- Adding the operation failed. As with `registerFd`, we immediately call
+ -- the callback.
+ else cb
+ return ()
+{-# INLINE registerOp #-}
+
+-- | Provides a default implementation for registering an operation implemented
+-- in terms of Events. It is assumed that the caller has obtained the
+-- `opCallbackTableVar` MVar.
+defaultRegisterOp_
+ :: EventManager
+ -> IOCallback
+ -> I.IOOperation
+ -> Unique
+ -> IntTable IOCallback
+ -> IO Bool
+defaultRegisterOp_ mgr@(EventManager{..}) cb op u tbl = case op of
+ IOOperation_Read -> _
+{-# INLINE defaultRegisterOp_ #-}
+
-- | Wake up the event manager.
wakeManager :: EventManager -> IO ()
#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
@@ -444,6 +529,17 @@ closeFd_ mgr tbl fd = do
------------------------------------------------------------------------
-- Utilities
+-- | Call the callbacks corresponding ot the given IOResult.
+onIOResult :: EventManager -> I.IOResult -> IO ()
+onIOResult em ioResult = case ioResult of
+ I.IOResult_Event fd events -> onFdEvent em fd events
+ I.IOResult_IOComplete unique -> do
+ withMVar (opCallbackTableVar em unique) $ \tbl -> do
+ callbackMay <- IT.delete (asInt unique) tbl
+ case callbackMay of
+ Nothing -> return ()
+ Just callback -> callback
+
-- | Call the callbacks corresponding to the given file descriptor.
onFdEvent :: EventManager -> Fd -> Event -> IO ()
onFdEvent mgr fd evs
=====================================
libraries/base/GHC/Event/Poll.hsc
=====================================
@@ -51,7 +51,7 @@ data Poll = Poll {
}
new :: IO E.Backend
-new = E.backend poll modifyFd modifyFdOnce (\_ -> return ()) `liftM`
+new = E.backend poll modifyFd modifyFdOnce (\_ _ _ -> Nothing) (\_ -> return ()) `liftM`
liftM2 Poll (newMVar =<< A.empty) A.empty
modifyFd :: Poll -> Fd -> E.Event -> E.Event -> IO Bool
@@ -78,7 +78,7 @@ reworkFd p (PollFd fd npevt opevt) = do
poll :: Poll
-> Maybe E.Timeout
- -> (Fd -> E.Event -> IO ())
+ -> (E.IOResult -> IO ())
-> IO Int
poll p mtout f = do
let a = pollFd p
@@ -95,7 +95,7 @@ poll p mtout f = do
A.loop a 0 $ \i e -> do
let r = pfdRevents e
if r /= 0
- then do f (pfdFd e) (toEvent r)
+ then do f (E.IOResult_Event (pfdFd e) (toEvent r))
let i' = i + 1
return (i', i' == n)
else return (i, True)
=====================================
libraries/base/GHC/Event/TimerManager.hs
=====================================
@@ -50,9 +50,8 @@ import GHC.Num (Num(..))
import GHC.Real (quot, fromIntegral)
import GHC.Show (Show(..))
import GHC.Event.Control
-import GHC.Event.Internal (Backend, Event, evtRead, Timeout(..))
+import GHC.Event.Internal (Backend, evtRead, Timeout(..))
import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
-import System.Posix.Types (Fd)
import qualified GHC.Event.Internal as I
import qualified GHC.Event.PSQ as Q
@@ -99,13 +98,15 @@ data TimerManager = TimerManager
------------------------------------------------------------------------
-- Creation
-handleControlEvent :: TimerManager -> Fd -> Event -> IO ()
-handleControlEvent mgr fd _evt = do
+handleControlEvent :: TimerManager -> I.IOResult -> IO ()
+handleControlEvent mgr (I.IOResult_Event fd _evt) = do
msg <- readControlMessage (emControl mgr) fd
case msg of
CMsgWakeup -> return ()
CMsgDie -> writeIORef (emState mgr) Finished
CMsgSignal fp s -> runHandlers fp s
+-- TimerManager should only use the event api of the backend to wait on timers.
+handleControlEvent _ _ = errorWithoutStackTrace "unexpected non-event IO result"
newDefaultBackend :: IO Backend
#if defined(HAVE_POLL)
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/576e092b4a506a9b8bbc7702b03dbf80adb478fe...2aef7332df56e88c0f0555f8ca54bc945fb833f1
--
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/576e092b4a506a9b8bbc7702b03dbf80adb478fe...2aef7332df56e88c0f0555f8ca54bc945fb833f1
You're receiving this email because of your account on gitlab.haskell.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20200602/6a29b047/attachment-0001.html>
More information about the ghc-commits
mailing list