[Haskell-cafe] broadcasting stateful computations

Olaf Klinke olf at aatal-apotheke.de
Mon Sep 6 09:04:52 UTC 2021


Dear Compl, 

thanks for putting so much thought into this. Regarding your
suggestion, I'm afraid I either misled you or don't understand the
purpose of your code. I don't need the entire history of each value,
only the most recent one. As an aside, ValueNode looks like a ListT
(as in the list-t package) over the STM monad.

We're developing a gateway between two data protocols. We have a Python
implementation, but it is not parallel enough and message parsing is
too slow. Therefore we hired MLabs to implement the source protocol in
Haskell [1]. The gateway does the following. 

1. The parent thread reads a config and forks several concurrent child
threads, each talking to a different server. The state of the children
is read through TVars.
2. Each child thread polls data from its assigned server, decodes the
message and executes pre-defined callbacks [2], which have the
following type:
    type Callback m = Value -> UTCTime -> m ()
3. Each data packet can contain several data items, and I want the
callbacks for these items to be executed in parallel, too, because we
expect network IO to be the slowest part of the entire process. I use
the parallel-io package for that.
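The three steps above could be sketched roughly as follows using only base and stm. Everything here is hypothetical: pollServer stands in for the real protocol client (no network IO), ChildStatus for whatever the children actually report, data items are plain Ints, and the callbacks of a packet run sequentially for brevity.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM, forM_)

-- Hypothetical status a child thread publishes for the parent to read.
data ChildStatus = Starting | Polled Int | Finished
  deriving (Eq, Show)

-- Hypothetical stand-in for polling and decoding packet number n from a
-- server: returns the data items of that packet (no real network IO).
pollServer :: String -> Int -> IO [Int]
pollServer _server n = return [n, n * 10]

-- Step 2: a child polls its server a fixed number of times, runs the
-- callback on every decoded item and publishes progress in its own TVar.
child :: (Int -> IO ()) -> String -> TVar ChildStatus -> IO ()
child callback server status = do
  forM_ [1 .. 3] $ \n -> do
    items <- pollServer server n
    mapM_ callback items -- step 3 would run these concurrently instead
    atomically (writeTVar status (Polled n))
  atomically (writeTVar status Finished)

-- Step 1: the parent forks one child per configured server and keeps
-- the children's status TVars so it can observe them.
spawnChildren :: (Int -> IO ()) -> [String] -> IO [TVar ChildStatus]
spawnChildren callback servers =
  forM servers $ \server -> do
    status <- newTVarIO Starting
    _ <- forkIO (child callback server status)
    return status

-- Block (via STM retry) until every child has reported 'Finished'.
waitAll :: [TVar ChildStatus] -> IO ()
waitAll vars = atomically $ forM_ vars $ \v -> do
  s <- readTVar v
  check (s == Finished)
```

The parent can read any child's status at any time with readTVarIO; waitAll uses check so the transaction simply retries until all children are done.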

{-# LANGUAGE RankNTypes #-}
import Control.Monad.Reader
import Control.Monad.IO.Unlift (MonadUnliftIO(..))
import Control.Concurrent.ParallelIO.Local (Pool,parallel_)
import qualified Data.Foldable

-- | How to run the callbacks for the items of one data packet.
type CallbackStrategy m n = forall f. Foldable f =>
   f (m ()) -> n ()

-- | Just sequence the callbacks.
sequentialStrategy :: Applicative m => CallbackStrategy m m
sequentialStrategy = Data.Foldable.sequenceA_

-- | Execute the callbacks concurrently on a pool of worker threads.
concurrentStrategy :: MonadUnliftIO m =>
   CallbackStrategy m (ReaderT Pool m)
concurrentStrategy = execConcurrently .
  Data.Foldable.toList where
    execConcurrently actionlist = ReaderT (\pool ->
     withRunInIO (\asIO -> parallel_ pool (map asIO actionlist)))
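In case parallel-io is not at hand, here is a rough sketch of how a strategy gets applied to the callbacks of one packet. forkStrategy is a hypothetical, pool-less stand-in for concurrentStrategy: it forks one thread per callback and waits for all of them with MVars, and it does not rethrow exceptions from the callbacks. The callbacks are simplified to Int -> IO (), dropping the timestamp argument.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception (finally)
import qualified Data.Foldable

-- Same shape as CallbackStrategy above.
type CallbackStrategy m n = forall f. Foldable f => f (m ()) -> n ()

sequentialStrategy :: Applicative m => CallbackStrategy m m
sequentialStrategy = Data.Foldable.sequenceA_

-- Hypothetical pool-less variant: fork one thread per callback, then block
-- until every thread has signalled completion (exceptions are swallowed).
forkStrategy :: CallbackStrategy IO IO
forkStrategy actions = do
  dones <- mapM spawn (Data.Foldable.toList actions)
  mapM_ takeMVar dones
  where
    spawn act = do
      done <- newEmptyMVar
      _ <- forkIO (act `finally` putMVar done ())
      return done

-- Apply a strategy to the callbacks for all data items of one packet.
runPacket :: CallbackStrategy IO IO -> (Int -> IO ()) -> [Int] -> IO ()
runPacket strategy callback items = strategy (map callback items)
```

Unlike parallel_ on a Pool, this forks as many threads as there are items, so the pool-based version remains the better fit when packets carry many items.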

I'd be quite happy with m = StateT s IO where s holds the most recent
time stamp and value for each data address. The simplest way to
broadcast state in this monad would be: 

import Control.Concurrent.STM
import Control.Monad.State (StateT(..))

broadcast :: Monoid s => TVar s -> StateT s IO () -> StateT s IO ()
broadcast var (StateT f) = StateT (\s -> do
  ((), s') <- f s
  atomically (modifyTVar var (\old -> old <> s'))
  return ((), s'))
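To make the state s concrete, here is a sketch of a monoid holding the most recent timestamp and value for each data address, where merging keeps the newer entry, together with broadcast. The names Latest, Address and record are hypothetical, the value is simplified to String and timestamps to Int instead of UTCTime; it uses transformers and containers rather than mtl.

```haskell
import Control.Concurrent.STM
import Control.Monad.Trans.State.Strict (StateT(..), execStateT, modify')
import qualified Data.Map.Strict as Map

type Address = String
type Timestamp = Int

-- For each data address, the most recent (timestamp, value) pair.
newtype Latest = Latest (Map.Map Address (Timestamp, String))
  deriving (Eq, Show)

-- Merging keeps, per address, the entry with the larger timestamp (the
-- right-hand one on ties). The operation is associative and idempotent.
instance Semigroup Latest where
  Latest a <> Latest b = Latest (Map.unionWith newer a b)
    where newer x y = if fst y >= fst x then y else x

instance Monoid Latest where
  mempty = Latest Map.empty

-- Hypothetical helper: record one observation in the local state.
record :: Monad m => Address -> Timestamp -> String -> StateT Latest m ()
record addr ts v = modify' (<> Latest (Map.singleton addr (ts, v)))

-- Run the StateT action, then merge its final state into the shared TVar.
broadcast :: Monoid s => TVar s -> StateT s IO () -> StateT s IO ()
broadcast var (StateT f) = StateT $ \s -> do
  ((), s') <- f s
  atomically (modifyTVar' var (<> s'))
  return ((), s')
```

Because this merge is idempotent and newest-wins, pushing a callback's whole final state into the TVar has the same effect as pushing only its changes: stale entries in the local copy never overwrite newer ones in the shared state.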

That is, we run the StateT action and send the changes to the TVar
afterwards. But there are two more problems with this:

(1) The concurrent callbacks all want to atomically modify the same
TVar. This is a concurrency bottleneck and effectively implements Isaac
Elliott's Locks. Hence it seems that giving each data address its own
value TVar could allow more parallelism. 

(2) StateT is inherently sequential, which defeats concurrentStrategy.
In fact the unliftio-core package states:
> Note that, in order to meet the laws given below, the intuition is that a monad must have no monadic state, but may have monadic context. This essentially limits MonadUnliftIO to ReaderT and IdentityT transformers on top of IO.
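Intuitively, there is no safe way to modify the same state
concurrently: each unlifted action would run on its own copy of the
state, and there is no general way to merge the resulting copies back.
This is a nice example of how the type system guides you towards doing
the right thing.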
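The second problem can be seen in a small experiment, sketched here with plain forkIO rather than a real unlifting strategy: each concurrently running copy of a StateT action starts from the same initial state, so afterwards we hold several final states and no general way to combine them. runForkedStateT and experiment are hypothetical names.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')

-- Run the same StateT action n times concurrently, as an unlifting
-- strategy would have to: every thread gets its own copy of the state s0.
runForkedStateT :: Int -> StateT s IO () -> s -> IO [s]
runForkedStateT n action s0 = do
  vars <- mapM (const newEmptyMVar) [1 .. n]
  mapM_ (\v -> forkIO (execStateT action s0 >>= putMVar v)) vars
  mapM takeMVar vars

-- A counter incremented once by each of 3 concurrent runs.
experiment :: IO [Int]
experiment = runForkedStateT 3 (modify' (+ 1)) 0
```

Each of the three copies ends with 1, whereas running the increments in sequence gives 3. Combining the copies afterwards only works for special states, for instance a collection of timestamped values in which newer entries overwrite older ones.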

Therefore I will go back to what Chris Smith and Bryan Richter
suggested, and use ReaderT (TVar s) individually for each value. Your
suggestion of TMVars also seems good at this finer granularity. In the
following, m can have a MonadUnliftIO instance.

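import Control.Concurrent.STM
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.State (StateT, execStateT)

unStateT :: MonadIO m => TMVar s ->
  Callback (StateT s m) -> Callback m
unStateT var cb = \value time -> do
    before <- (liftIO.atomically) (takeTMVar var)
    -- TMVar is now empty, no other thread can access it
    after <- execStateT (cb value time) before
    (liftIO.atomically) (putTMVar var after)
    -- TMVar now full again. Caveat: if cb throws, the TMVar stays
    -- empty and later callbacks on this value block forever.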
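A sketch of an exception-safe variant, specialised to m = IO so that it only needs base, stm and transformers: the previous state is put back if the callback throws. unStateTIO and concurrentIncrements are hypothetical names, and Callback is generalised here so that Value and UTCTime stay abstract. For a general MonadUnliftIO m, one could presumably wrap this with withRunInIO.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (mask, onException)
import Control.Monad (forM_)
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')

-- Same shape as Callback above, with the value and time types abstract.
type Callback m v t = v -> t -> m ()

-- Exception-safe unStateT for IO: if the callback throws, the previous
-- state is restored, so the TMVar is never left empty.
unStateTIO :: TMVar s -> Callback (StateT s IO) v t -> Callback IO v t
unStateTIO var cb = \value time -> mask $ \restore -> do
  before <- atomically (takeTMVar var)
  -- TMVar is now empty: other callbacks on this value wait here
  after <- restore (execStateT (cb value time) before)
             `onException` atomically (putTMVar var before)
  atomically (putTMVar var after)

-- Fork n callbacks that each add 1 to the same per-value state, wait
-- until all have finished and return the final state.
concurrentIncrements :: Int -> IO Int
concurrentIncrements n = do
  var <- newTMVarIO 0
  done <- newTVarIO 0
  let bump = unStateTIO var (\k () -> modify' (+ k))
  forM_ [1 .. n] $ \_ ->
    forkIO (bump 1 () >> atomically (modifyTVar' done (+ 1)))
  atomically (readTVar done >>= \d -> check (d == n))
  atomically (readTMVar var)
```

In GHCi, concurrentIncrements 50 should return 50: holding the TMVar for the whole callback serialises callbacks on the same value, while callbacks on different values, each with its own TMVar, can still run in parallel.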

Thanks everyone for helping!
Olaf 


[1] https://github.com/mlabs-haskell/opc-xml-da-client
[2] I suppose in FP-land we'd name these continuations, not callbacks.
Not sure whether the continuation monad abstraction buys me any
advantage here, though. 

On Sat, 2021-09-04 at 18:50 +0800, YueCompl wrote:
> Oh, a bugfix: the new tail reference should be returned after the update, so that a thread-local stream reference can be cached.
> 
> And I realize this is more than you originally need, never mind if it's not so useful to you.
> 
> 
> data ValueNode a = ValueNode
>   { node'value :: a,
>     node'timestamp :: Timestamp,
>     node'next :: ValueSink a
>   }
> 
> type ValueSink a = TMVar (ValueNode a)
> 
> type Timestamp = Int
> 
> seekTail ::
>   forall a.
>   (a -> Timestamp -> a -> Timestamp -> a) ->
>   ValueSink a ->
>   STM (ValueSink a, Maybe (ValueNode a))
> seekTail f sink = go sink Nothing
>   where
>     go ::
>       ValueSink a ->
>       Maybe (ValueNode a) ->
>       STM (ValueSink a, Maybe (ValueNode a))
>     go ref prevNode =
>       tryReadTMVar ref >>= \case
>         Nothing -> return (ref, prevNode)
>         Just self@(ValueNode spotVal spotTs nxt) ->
>           go nxt $
>             Just
>               self
>                 { node'value = case prevNode of
>                     Nothing -> spotVal
>                     Just (ValueNode prevVal prevTs _prevNxt) ->
>                       f prevVal prevTs spotVal spotTs
>                 }
> 
> updateValue ::
>   forall a m.
>   MonadIO m =>
>   (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
>   ValueSink a ->
>   m (ValueSink a)
> updateValue f sink = do
>   (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
>   case tailNode of
>     Nothing -> do
>       (myVal, myTs) <- f Nothing
>       liftIO $
>         atomically $ do
>           nxt <- newEmptyTMVar
>           void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
>           return nxt
>     Just (ValueNode spotVal spotTs spotNxt) -> do
>       (myVal, myTs) <- f $ Just (spotVal, spotTs)
>       newNxt <- liftIO newEmptyTMVarIO
>       let newTail = ValueNode myVal myTs newNxt
> 
>           putAsNewTailOrDiscard :: ValueSink a -> STM ()
>           putAsNewTailOrDiscard nodeRef =
>             putTMVar nodeRef newTail `orElse` yetOther'sTail
>             where
>               yetOther'sTail = do
>                 (ValueNode _other'sVal other'sTs other'sNxt) <-
>                   readTMVar nodeRef
>                 if other'sTs >= myTs
>                   then return ()
>                   else putAsNewTailOrDiscard other'sNxt
> 
>       liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
>       return spotNxt
>   where
>     justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
>     justLatest _prevVal _prevTs spotVal _spotTs = spotVal
> 
> -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> -- "cached" over time, but keep in mind that for any such thread who is slow
> -- in unfolding the value stream, the historical values will pile up in heap.
> 
> 
> > On 2021-09-04, at 18:42, YueCompl <compl.yue at icloud.com> wrote:
> > 
> > I'd like to add a new feature: you can fold the historic value stream when deriving the new state value. Then it becomes:
> > 
> > 
> > data ValueNode a = ValueNode
> >   { node'value :: a,
> >     node'timestamp :: Timestamp,
> >     node'next :: ValueSink a
> >   }
> > 
> > type ValueSink a = TMVar (ValueNode a)
> > 
> > type Timestamp = Int
> > 
> > seekTail ::
> >   forall a.
> >   (a -> Timestamp -> a -> Timestamp -> a) ->
> >   ValueSink a ->
> >   STM (ValueSink a, Maybe (ValueNode a))
> > seekTail f sink = go sink Nothing
> >   where
> >     go ::
> >       ValueSink a ->
> >       Maybe (ValueNode a) ->
> >       STM (ValueSink a, Maybe (ValueNode a))
> >     go ref prevNode =
> >       tryReadTMVar ref >>= \case
> >         Nothing -> return (ref, prevNode)
> >         Just self@(ValueNode spotVal spotTs nxt) ->
> >           go nxt $
> >             Just
> >               self
> >                 { node'value = case prevNode of
> >                     Nothing -> spotVal
> >                     Just (ValueNode prevVal prevTs _prevNxt) ->
> >                       f prevVal prevTs spotVal spotTs
> >                 }
> > 
> > updateValue ::
> >   forall a m.
> >   MonadIO m =>
> >   (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
> >   ValueSink a ->
> >   m ()
> > updateValue f sink = do
> >   (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
> >   case tailNode of
> >     Nothing -> do
> >       (myVal, myTs) <- f Nothing
> >       liftIO $
> >         atomically $ do
> >           nxt <- newEmptyTMVar
> >           void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
> >     Just (ValueNode spotVal spotTs spotNxt) -> do
> >       (myVal, myTs) <- f $ Just (spotVal, spotTs)
> >       newNxt <- liftIO newEmptyTMVarIO
> >       let newTail = ValueNode myVal myTs newNxt
> > 
> >           putAsNewTailOrDiscard :: ValueSink a -> STM ()
> >           putAsNewTailOrDiscard nodeRef =
> >             putTMVar nodeRef newTail `orElse` yetOther'sTail
> >             where
> >               yetOther'sTail = do
> >                 (ValueNode _other'sVal other'sTs other'sNxt) <-
> >                   readTMVar nodeRef
> >                 if other'sTs >= myTs
> >                   then return ()
> >                   else putAsNewTailOrDiscard other'sNxt
> > 
> >       liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
> >   where
> >     justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
> >     justLatest _prevVal _prevTs spotVal _spotTs = spotVal
> > 
> > -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> > -- "cached" over time, but keep in mind that for any such thread who is slow
> > -- in unfolding the value stream, the historical values will pile up in heap.
> > 
> > 
> > 
> > > On 2021-09-03, at 16:26, YueCompl <compl.yue at icloud.com> wrote:
> > > 
> > > It's a bit sad that I'm not mathematically minded enough to understand you at that abstract level. But I have a more imperative solution in mind: ask the question
> > > 
> > > > "server, tell me if there is a value of x newer than t." 
> > > 
> > > and then either mutate or give up, like this:
> > > 
> > > 
> > > data ValueNode a = ValueNode
> > >   { node'value :: a,
> > >     node'timestamp :: Timestamp,
> > >     node'next :: ValueSink a
> > >   }
> > > 
> > > type ValueSink a = TMVar (ValueNode a)
> > > 
> > > type Timestamp = Int
> > > 
> > > seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
> > > seekTail sink = go sink Nothing
> > >   where
> > >     go ref ancestor =
> > >       tryReadTMVar ref >>= \case
> > >         Nothing -> return (ref, ancestor)
> > >         Just self@(ValueNode _ _ nxt) -> go nxt $ Just self
> > > 
> > > updateValue ::
> > >   forall a m.
> > >   MonadIO m =>
> > >   (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
> > >   ValueSink a ->
> > >   m ()
> > > updateValue f sink = do
> > >   (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
> > >   case tailNode of
> > >     Nothing -> do
> > >       (myVal, myTs) <- f Nothing
> > >       liftIO $
> > >         atomically $ do
> > >           nxt <- newEmptyTMVar
> > >           void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
> > >     Just (ValueNode seenVal seenTs seenNxt) -> do
> > >       (myVal, myTs) <- f $ Just (seenVal, seenTs)
> > >       newNxt <- liftIO newEmptyTMVarIO
> > >       let newTail = ValueNode myVal myTs newNxt
> > > 
> > >           putAsNewTailOrDiscard :: ValueSink a -> STM ()
> > >           putAsNewTailOrDiscard nodeRef =
> > >             putTMVar nodeRef newTail `orElse` yetOther'sTail
> > >             where
> > >               yetOther'sTail = do
> > >                 (ValueNode _other'sVal other'sTs other'sNxt) <-
> > >                   readTMVar nodeRef
> > >                 if other'sTs >= myTs
> > >                   then return ()
> > >                   else putAsNewTailOrDiscard other'sNxt
> > > 
> > >       liftIO $ atomically $ putAsNewTailOrDiscard seenNxt
> > > 
> > > -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> > > -- "cached" over time, but keep in mind that for any such thread who is slow
> > > -- in unfolding the value stream, the historical values will pile up in heap.
> > > 
> > > 
> > > 
> > > > On 2021-09-03, at 01:42, Olaf Klinke <olf at aatal-apotheke.de> wrote:
> > > > 
> > > > On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
> > > > > Um, I'm not sure I understand your case correctly, but if the "mutation" (rather than the "mutated result") can be computed, possibly non-trivially, from a possibly outdated state, and the "mutation" can be applied trivially, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
> > > > 
> > > > To be concrete, my state is a collection of time stamped values, where
> > > > the monoid operation overwrites old values with new ones. 
> > > > But I need to know the current state (x,t) to determine the "mutation",
> > > > because I'll be asking questions like "server, tell me if there is a
> > > > value of x newer than t." 
> > > > Any observer whose initial state is synchronized with the worker thread
> > > > can in principle re-construct the worker's internal state by observing
> > > > the stream of emitted "mutations". 
> > > > 
> > > > The most general abstraction would be that of a monoid action on a
> > > > type, but in my case the monoid (mutations) and the mutated type are
> > > > identical. 
> > > > 
> > > > act :: m -> a -> a
> > > > act mempty = id
> > > > act (x <> y) = act x . act y -- monoid homomorphism
> > > > act (x <> x) = act x         -- idempotent
> > > > 
> > > > Olaf
> > > > 



