[Git][ghc/ghc][wip/io_uring] 2 commits: Refactor Event Manager Backend to allow for arbitrary asynchronous IO

David Eichmann gitlab at gitlab.haskell.org
Tue Jun 2 14:52:28 UTC 2020



David Eichmann pushed to branch wip/io_uring at Glasgow Haskell Compiler / GHC


Commits:
6dcdfa25 by David Eichmann at 2020-06-01T15:12:47+01:00
Refactor Event Manager Backend to allow for arbitrary asynchronous IO

- - - - -
2aef7332 by David Eichmann at 2020-06-02T15:46:33+01:00
WIP

- - - - -


6 changed files:

- libraries/base/GHC/Event.hs
- libraries/base/GHC/Event/EPoll.hsc
- libraries/base/GHC/Event/Internal.hs
- libraries/base/GHC/Event/Manager.hs
- libraries/base/GHC/Event/Poll.hsc
- libraries/base/GHC/Event/TimerManager.hs


Changes:

=====================================
libraries/base/GHC/Event.hs
=====================================
@@ -23,7 +23,7 @@ module GHC.Event
     , Event
     , evtRead
     , evtWrite
-    , IOCallback
+    , EventCallback
     , FdKey(keyFd)
     , Lifetime(..)
     , registerFd


=====================================
libraries/base/GHC/Event/EPoll.hsc
=====================================
@@ -71,7 +71,13 @@ new :: IO E.Backend
 new = do
   epfd <- epollCreate
   evts <- A.new 64
-  let !be = E.backend poll modifyFd modifyFdOnce delete (EPoll epfd evts)
+  let !be = E.backend
+                poll
+                modifyFd
+                modifyFdOnce
+                (\_ _ _ -> Nothing)
+                delete
+                (EPoll epfd evts)
   return be
 
 delete :: EPoll -> IO ()
@@ -109,7 +115,7 @@ modifyFdOnce ep fd evt =
 -- events that are ready.
 poll :: EPoll                     -- ^ state
      -> Maybe Timeout             -- ^ timeout in milliseconds
-     -> (Fd -> E.Event -> IO ())  -- ^ I/O callback
+     -> (E.IOResult -> IO ())     -- ^ I/O callback
      -> IO Int
 poll ep mtimeout f = do
   let events = epollEvents ep
@@ -122,7 +128,7 @@ poll ep mtimeout f = do
     Nothing      -> epollWaitNonBlock fd es cap
 
   when (n > 0) $ do
-    A.forM_ events $ \e -> f (eventFd e) (toEvent (eventTypes e))
+    A.forM_ events $ \e -> f (E.IOResult_Event (eventFd e) (toEvent (eventTypes e)))
     cap <- A.capacity events
     when (cap == n) $ A.ensureCapacity events (2 * cap)
   return n


=====================================
libraries/base/GHC/Event/Internal.hs
=====================================
@@ -6,7 +6,10 @@ module GHC.Event.Internal
     -- * Event back end
       Backend
     , backend
+    , IOOperation(..)
+    , IOResult(..)
     , delete
+    , doIOOperation
     , poll
     , modifyFd
     , modifyFdOnce
@@ -33,11 +36,23 @@ import Data.OldList (foldl', filter, intercalate, null)
 import Foreign.C.Error (eINTR, getErrno, throwErrno)
 import System.Posix.Types (Fd)
 import GHC.Base
+import GHC.Event.Unique (Unique)
 import GHC.Word (Word64)
 import GHC.Num (Num(..))
 import GHC.Show (Show(..))
 import Data.Semigroup.Internal (stimesMonoid)
 
+data IOOperation
+    = IOOperation_Read
+    -- IOOperation_Write IOOperationID ...
+    -- ...
+
+data IOResult
+    -- | An event has occurred for a file handle.
+    = IOResult_Event Fd Event
+    -- | An IOOperation has completed.
+    | IOResult_IOComplete Unique
+
 -- | An I\/O event.
 newtype Event = Event Int
     deriving Eq -- ^ @since 4.4.0.0
@@ -161,10 +176,11 @@ data Backend = forall a. Backend {
 
     -- | Poll backend for new events.  The provided callback is called
     -- once per file descriptor with new events.
-    , _bePoll :: a                          -- backend state
-              -> Maybe Timeout              -- timeout in milliseconds ('Nothing' for non-blocking poll)
-              -> (Fd -> Event -> IO ())     -- I/O callback
-              -> IO Int
+    , _bePoll
+        :: a                    -- backend state
+        -> Maybe Timeout        -- timeout in milliseconds ('Nothing' for non-blocking poll)
+        -> (IOResult -> IO ())  -- I/O callback
+        -> IO Int               -- ???? negative is error, 0 is success but no IOResults found, positive is success with IO Results. ???
 
     -- | Register, modify, or unregister interest in the given events
     -- on the given file descriptor.
@@ -172,48 +188,64 @@ data Backend = forall a. Backend {
                   -> Fd       -- file descriptor
                   -> Event    -- old events to watch for ('mempty' for new)
                   -> Event    -- new events to watch for ('mempty' to delete)
-                  -> IO Bool
+                  -> IO Bool  -- The Bool indicates True for success,
+                              -- False for a known failure, else this may throw
+                              -- with `throwErrno`.
 
     -- | Register interest in new events on a given file descriptor, set
     -- to be deactivated after the first event.
     , _beModifyFdOnce :: a
                          -> Fd    -- file descriptor
                          -> Event -- new events to watch
-                         -> IO Bool
+                         -> IO Bool -- Bool indicates success (see _beModifyFd)
+
+    -- | Perform some IO action (non-blocking).
+    , _beDoIOOperation
+        :: a
+        -> Unique            -- Operation id.
+        -> IOOperation       -- action to perform
+        -> Maybe (IO Bool)   -- Nothing if the io action is not supported, and
+                             -- the caller should use Fd Events instead. Else
+                             -- Just the action to do the (non-blocking) IO
+                             -- action. Bool indicates success (see _beModifyFd).
 
     , _beDelete :: a -> IO ()
     }
 
-backend :: (a -> Maybe Timeout -> (Fd -> Event -> IO ()) -> IO Int)
+backend :: (a -> Maybe Timeout -> (IOResult -> IO ()) -> IO Int)
         -> (a -> Fd -> Event -> Event -> IO Bool)
         -> (a -> Fd -> Event -> IO Bool)
+        -> (a -> Unique -> IOOperation -> Maybe (IO Bool))
         -> (a -> IO ())
         -> a
         -> Backend
-backend bPoll bModifyFd bModifyFdOnce bDelete state =
-  Backend state bPoll bModifyFd bModifyFdOnce bDelete
+backend bPoll bModifyFd bModifyFdOnce bDoIOOperation bDelete state =
+  Backend state bPoll bModifyFd bModifyFdOnce bDoIOOperation bDelete
 {-# INLINE backend #-}
 
-poll :: Backend -> Maybe Timeout -> (Fd -> Event -> IO ()) -> IO Int
-poll (Backend bState bPoll _ _ _) = bPoll bState
+poll :: Backend -> Maybe Timeout -> (IOResult -> IO ()) -> IO Int
+poll (Backend bState bPoll _ _ _ _) = bPoll bState
 {-# INLINE poll #-}
 
 -- | Returns 'True' if the modification succeeded.
 -- Returns 'False' if this backend does not support
 -- event notifications on this type of file.
 modifyFd :: Backend -> Fd -> Event -> Event -> IO Bool
-modifyFd (Backend bState _ bModifyFd _ _) = bModifyFd bState
+modifyFd (Backend bState _ bModifyFd _ _ _) = bModifyFd bState
 {-# INLINE modifyFd #-}
 
 -- | Returns 'True' if the modification succeeded.
 -- Returns 'False' if this backend does not support
 -- event notifications on this type of file.
 modifyFdOnce :: Backend -> Fd -> Event -> IO Bool
-modifyFdOnce (Backend bState _ _ bModifyFdOnce _) = bModifyFdOnce bState
+modifyFdOnce (Backend bState _ _ bModifyFdOnce _ _) = bModifyFdOnce bState
 {-# INLINE modifyFdOnce #-}
 
+doIOOperation :: Backend -> Unique -> IOOperation -> Maybe (IO Bool)
+doIOOperation (Backend bState _ _ _ bDoIOOperation _) = bDoIOOperation bState
+
 delete :: Backend -> IO ()
-delete (Backend bState _ _ _ bDelete) = bDelete bState
+delete (Backend bState _ _ _ _ bDelete) = bDelete bState
 {-# INLINE delete #-}
 
 -- | Throw an 'Prelude.IOError' corresponding to the current value of


=====================================
libraries/base/GHC/Event/Manager.hs
=====================================
@@ -45,6 +45,7 @@ module GHC.Event.Manager
     , Event
     , evtRead
     , evtWrite
+    , EventCallback
     , IOCallback
     , FdKey(keyFd)
     , FdData
@@ -81,7 +82,7 @@ import GHC.Event.Control
 import GHC.Event.IntTable (IntTable)
 import GHC.Event.Internal (Backend, Event, evtClose, evtRead, evtWrite,
                            Lifetime(..), EventLifetime, Timeout(..))
-import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
+import GHC.Event.Unique (Unique, UniqueSource, asInt, newSource, newUnique)
 import System.Posix.Types (Fd)
 
 import qualified GHC.Event.IntTable as IT
@@ -103,7 +104,7 @@ import qualified GHC.Event.Poll   as Poll
 data FdData = FdData {
       fdKey       :: {-# UNPACK #-} !FdKey
     , fdEvents    :: {-# UNPACK #-} !EventLifetime
-    , _fdCallback :: !IOCallback
+    , _fdCallback :: !EventCallback
     }
 
 -- | A file descriptor registration cookie.
@@ -115,7 +116,10 @@ data FdKey = FdKey {
                )
 
 -- | Callback invoked on I/O events.
-type IOCallback = FdKey -> Event -> IO ()
+type EventCallback = FdKey -> Event -> IO ()
+
+-- | Callback invoked on completion of I/O operations.
+type IOCallback = IO ()
 
 data State = Created
            | Running
@@ -130,6 +134,22 @@ data State = Created
 data EventManager = EventManager
     { emBackend      :: !Backend
     , emFds          :: {-# UNPACK #-} !(Array Int (MVar (IntTable [FdData])))
+        -- ^ The FdData for events. Array index is the Fd hash. IntTable index
+        -- is the Fd. To get all FdDatas for an Fd:
+        --
+        --     lookup (fromIntegral fd) (emFds ! hashFd someFd)
+        --
+        -- The reason for the Array is to reduce contention between threads. See
+        -- "striping" from the ??????? paper.
+    , emOps          :: {-# UNPACK #-} !(Array Int (MVar (IntTable IOCallback)))
+        -- ^ The callbacks for IO operations. Array index is the operation's
+        -- Unique hash. IntTable index is the operation's Unique. To get the
+        -- callback for an operation:
+        --
+        --     lookup (asInt opUnique) (emOps ! hashUnique opUnique)
+        --
+        -- The reason for the Array is to reduce contention between threads. See
+        -- "striping" from the ??????? paper.
     , emState        :: {-# UNPACK #-} !(IORef State)
     , emUniqueSource :: {-# UNPACK #-} !UniqueSource
     , emControl      :: {-# UNPACK #-} !Control
@@ -141,13 +161,25 @@ callbackArraySize :: Int
 callbackArraySize = 32
 
 hashFd :: Fd -> Int
-hashFd fd = fromIntegral fd .&. (callbackArraySize - 1)
+hashFd fd = hashInt (fromIntegral fd)
 {-# INLINE hashFd #-}
 
+hashUnique :: Unique -> Int
+hashUnique u = hashInt (asInt u)
+{-# INLINE hashUnique #-}
+
+hashInt :: Int -> Int
+hashInt int = int .&. (callbackArraySize - 1)
+{-# INLINE hashInt #-}
+
 callbackTableVar :: EventManager -> Fd -> MVar (IntTable [FdData])
 callbackTableVar mgr fd = emFds mgr ! hashFd fd
 {-# INLINE callbackTableVar #-}
 
+opCallbackTableVar :: EventManager -> Unique -> MVar (IntTable IOCallback)
+opCallbackTableVar mgr unique = emOps mgr ! hashUnique unique
+{-# INLINE opCallbackTableVar #-}
+
 haveOneShot :: Bool
 {-# INLINE haveOneShot #-}
 #if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
@@ -184,8 +216,11 @@ new = newWith =<< newDefaultBackend
 -- | Create a new 'EventManager' with the given polling backend.
 newWith :: Backend -> IO EventManager
 newWith be = do
+  let intTableInitSize = 8
   iofds <- fmap (listArray (0, callbackArraySize-1)) $
-           replicateM callbackArraySize (newMVar =<< IT.new 8)
+           replicateM callbackArraySize (newMVar =<< IT.new intTableInitSize)
+  ioops <- fmap (listArray (0, callbackArraySize-1)) $
+           replicateM callbackArraySize (newMVar =<< IT.new intTableInitSize)
   ctrl <- newControl False
   state <- newIORef Created
   us <- newSource
@@ -197,6 +232,7 @@ newWith be = do
   lockVar <- newMVar ()
   let mgr = EventManager { emBackend = be
                          , emFds = iofds
+                         , emOps = ioops
                          , emState = state
                          , emUniqueSource = us
                          , emControl = ctrl
@@ -293,12 +329,12 @@ step mgr@EventManager{..} = do
   state `seq` return state
   where
     waitForIO = do
-      n1 <- I.poll emBackend Nothing (onFdEvent mgr)
+      n1 <- I.poll emBackend Nothing (onIOResult mgr)
       when (n1 <= 0) $ do
         yield
-        n2 <- I.poll emBackend Nothing (onFdEvent mgr)
+        n2 <- I.poll emBackend Nothing (onIOResult mgr)
         when (n2 <= 0) $ do
-          _ <- I.poll emBackend (Just Forever) (onFdEvent mgr)
+          _ <- I.poll emBackend (Just Forever) (onIOResult mgr)
           return ()
 
 ------------------------------------------------------------------------
@@ -312,7 +348,7 @@ step mgr@EventManager{..} = do
 -- platform's @select@ or @epoll@ system call, which tend to vary in
 -- what sort of fds are permitted. For instance, waiting on regular files
 -- is not allowed on many platforms.
-registerFd_ :: EventManager -> IOCallback -> Fd -> Event -> Lifetime
+registerFd_ :: EventManager -> EventCallback -> Fd -> Event -> Lifetime
             -> IO (FdKey, Bool)
 registerFd_ mgr@(EventManager{..}) cb fd evs lt = do
   u <- newUnique emUniqueSource
@@ -356,13 +392,62 @@ registerFd_ mgr@(EventManager{..}) cb fd evs lt = do
 -- on the file descriptor @fd@ for lifetime @lt@. @cb@ is called for
 -- each event that occurs.  Returns a cookie that can be handed to
 -- 'unregisterFd'.
-registerFd :: EventManager -> IOCallback -> Fd -> Event -> Lifetime -> IO FdKey
+registerFd :: EventManager -> EventCallback -> Fd -> Event -> Lifetime -> IO FdKey
 registerFd mgr cb fd evs lt = do
   (r, wake) <- registerFd_ mgr cb fd evs lt
   when wake $ wakeManager mgr
   return r
 {-# INLINE registerFd #-}
 
+-- | @registerOp mgr cb op@ registers the IO operation @op@. This returns
+-- immediately and the operation is done asynchronously. @cb@ is called when the
+-- operation completes.
+--
+-- TODO
+--  * what if the operation fails?
+--  * Should we support cancellation (return some cookie as `registerFd` does)?
+registerOp :: EventManager -> IOCallback -> I.IOOperation -> IO ()
+registerOp mgr@(EventManager{..}) cb op = do
+  -- registerFd_ mgr cb fd evs lt
+  u <- newUnique emUniqueSource
+  ok <- withMVar (opCallbackTableVar mgr u) $ \tbl -> do
+
+    ok <- case I.doIOOperation emBackend u op of
+      Nothing -> defaultRegisterOp_ mgr cb op u tbl
+      Just register -> register
+
+    if ok
+      then do
+        _nothing <- IT.insertWith
+          (error "Impossible! IO Operation Unique already exists")
+          (asInt u) cb tbl
+        return True
+      else return False
+
+  if ok
+    -- We've added an operation and need to wake the manager to check if the
+    -- operation is completed
+    then wakeManager mgr
+    -- Adding the operation failed. As with `registerFd`, we immediately call
+    -- the callback.
+    else cb
+  return ()
+{-# INLINE registerOp #-}
+
+-- | Provides a default implementation for registering an operation implemented
+-- in terms of Events. It is assumed that the caller has obtained the
+-- `opCallbackTableVar` MVar.
+defaultRegisterOp_
+  :: EventManager
+  -> IOCallback
+  -> I.IOOperation
+  -> Unique
+  -> IntTable IOCallback
+  -> IO Bool
+defaultRegisterOp_ mgr@(EventManager{..}) cb op u tbl = case op of
+  IOOperation_Read -> _
+{-# INLINE defaultRegisterOp_ #-}
+
 -- | Wake up the event manager.
 wakeManager :: EventManager -> IO ()
 #if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE)
@@ -444,6 +529,17 @@ closeFd_ mgr tbl fd = do
 ------------------------------------------------------------------------
 -- Utilities
 
+-- | Call the callbacks corresponding to the given IOResult.
+onIOResult :: EventManager -> I.IOResult -> IO ()
+onIOResult em ioResult = case ioResult of
+  I.IOResult_Event fd events -> onFdEvent em fd events
+  I.IOResult_IOComplete unique -> do
+    withMVar (opCallbackTableVar em unique) $ \tbl -> do
+      callbackMay <- IT.delete (asInt unique) tbl
+      case callbackMay of
+        Nothing -> return ()
+        Just callback -> callback
+
 -- | Call the callbacks corresponding to the given file descriptor.
 onFdEvent :: EventManager -> Fd -> Event -> IO ()
 onFdEvent mgr fd evs


=====================================
libraries/base/GHC/Event/Poll.hsc
=====================================
@@ -51,7 +51,7 @@ data Poll = Poll {
     }
 
 new :: IO E.Backend
-new = E.backend poll modifyFd modifyFdOnce (\_ -> return ()) `liftM`
+new = E.backend poll modifyFd modifyFdOnce (\_ _ _ -> Nothing) (\_ -> return ()) `liftM`
       liftM2 Poll (newMVar =<< A.empty) A.empty
 
 modifyFd :: Poll -> Fd -> E.Event -> E.Event -> IO Bool
@@ -78,7 +78,7 @@ reworkFd p (PollFd fd npevt opevt) = do
 
 poll :: Poll
      -> Maybe E.Timeout
-     -> (Fd -> E.Event -> IO ())
+     -> (E.IOResult -> IO ())
      -> IO Int
 poll p mtout f = do
   let a = pollFd p
@@ -95,7 +95,7 @@ poll p mtout f = do
     A.loop a 0 $ \i e -> do
       let r = pfdRevents e
       if r /= 0
-        then do f (pfdFd e) (toEvent r)
+        then do f (E.IOResult_Event (pfdFd e) (toEvent r))
                 let i' = i + 1
                 return (i', i' == n)
         else return (i, True)


=====================================
libraries/base/GHC/Event/TimerManager.hs
=====================================
@@ -50,9 +50,8 @@ import GHC.Num (Num(..))
 import GHC.Real (quot, fromIntegral)
 import GHC.Show (Show(..))
 import GHC.Event.Control
-import GHC.Event.Internal (Backend, Event, evtRead, Timeout(..))
+import GHC.Event.Internal (Backend, evtRead, Timeout(..))
 import GHC.Event.Unique (Unique, UniqueSource, newSource, newUnique)
-import System.Posix.Types (Fd)
 
 import qualified GHC.Event.Internal as I
 import qualified GHC.Event.PSQ as Q
@@ -99,13 +98,15 @@ data TimerManager = TimerManager
 ------------------------------------------------------------------------
 -- Creation
 
-handleControlEvent :: TimerManager -> Fd -> Event -> IO ()
-handleControlEvent mgr fd _evt = do
+handleControlEvent :: TimerManager -> I.IOResult -> IO ()
+handleControlEvent mgr (I.IOResult_Event fd _evt) = do
   msg <- readControlMessage (emControl mgr) fd
   case msg of
     CMsgWakeup      -> return ()
     CMsgDie         -> writeIORef (emState mgr) Finished
     CMsgSignal fp s -> runHandlers fp s
+-- TimerManager should only use the event api of the backend to wait on timers.
+handleControlEvent _ _ = errorWithoutStackTrace "unexpected non-event IO result"
 
 newDefaultBackend :: IO Backend
 #if defined(HAVE_POLL)



View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/576e092b4a506a9b8bbc7702b03dbf80adb478fe...2aef7332df56e88c0f0555f8ca54bc945fb833f1

-- 
View it on GitLab: https://gitlab.haskell.org/ghc/ghc/-/compare/576e092b4a506a9b8bbc7702b03dbf80adb478fe...2aef7332df56e88c0f0555f8ca54bc945fb833f1
You're receiving this email because of your account on gitlab.haskell.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/ghc-commits/attachments/20200602/6a29b047/attachment-0001.html>


More information about the ghc-commits mailing list