[Haskell-cafe] broadcasting stateful computations

Olaf Klinke olf at aatal-apotheke.de
Mon Sep 6 16:10:27 UTC 2021


On Mon, 2021-09-06 at 18:27 +0800, YueCompl wrote:
> I happen to be pondering ideas about mutable yet shared data/state. Unlike the actor model, a Turing machine has no notion of external change notification. I wonder whether this limitation propagates to lambda calculus, so that there seem to be no approved ways in the pure functional paradigm to handle concurrent mutation of shared state, since it is not modeled in the first place.
> 

Concurrency is hard. All we can do is provide shorthand notation for
commonly used idioms. 
Consider another widely used shared state modifier: Version control
systems like SVN or git. For true concurrency you need a way of
conflict resolution, which your state may or may not support. Databases
(and STM) have rollbacks in case of conflicts as a simple kind of
conflict resolution. 
My conflict resolution now comprises fragmentation of the state into
small data address components, akin to row locks in a database. 
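
A minimal sketch of that fragmentation, assuming data addresses are
plain strings and the set of addresses is known once the config is
read. The names Address, Store, newStore, writeCell and snapshot are
made up for illustration and do not come from any library.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Hypothetical names, for illustration only.
type Address = String

-- One TVar per data address: writers to different addresses never conflict.
type Store t v = Map.Map Address (TVar (Maybe (t, v)))

-- Allocate an empty cell for every configured address.
newStore :: [Address] -> IO (Store t v)
newStore addrs = Map.fromList <$> mapM (\a -> (,) a <$> newTVarIO Nothing) addrs

-- Overwrite the cell of one address; returns False for an unknown address.
writeCell :: Store t v -> Address -> t -> v -> STM Bool
writeCell store addr t v = case Map.lookup addr store of
  Nothing   -> return False
  Just cell -> writeTVar cell (Just (t, v)) >> return True

-- Consistent snapshot of all cells that currently hold a value.
snapshot :: Store t v -> STM (Map.Map Address (t, v))
snapshot store = Map.mapMaybe id <$> traverse readTVar store
```

The map itself is immutable here, so only the cells are contended. A
transaction that writes one address touches exactly one TVar.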

> From your updated description, I guess `atomicModifyIORef` (or maybe the strict version `atomicModifyIORef'`, which is probably more desirable) might work for you as well, and it performs better than a 'TVar'-based solution.
> 

I am a newbie to STM. I was hoping that STM provides some of the
aforementioned commonly used idioms and is therefore safer to use than
naked IORef. I do not want to worry about race conditions and
deadlocks, the library should do that for me. 
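
As far as I understand it, the difference shows up once more than one
cell is involved. A small sketch under that assumption; bumpIORef and
transfer are hypothetical names and the Int payload is only a
placeholder.

```haskell
import Control.Concurrent.STM
import Data.IORef

-- Single cell: atomicModifyIORef' is enough and needs no transaction.
bumpIORef :: IORef Int -> IO Int
bumpIORef ref = atomicModifyIORef' ref (\n -> (n + 1, n + 1))

-- Two cells updated together: inside one STM transaction both writes
-- become visible at once, so no other thread can observe the amount
-- removed from one cell but not yet added to the other.
transfer :: TVar Int -> TVar Int -> Int -> STM ()
transfer from to n = do
  modifyTVar' from (subtract n)
  modifyTVar' to (+ n)
```

With two IORefs the same update would need an explicit lock around
both modifications, which is exactly the kind of bookkeeping I would
like to avoid.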

> Btw, it just came to my mind about the timestamps: if they are of high precision and come from different server nodes, you should not trust their total ordering, since small drifts are inevitable even if the clocks are actively synchronized.

Indeed I do not trust the timestamps and I never compare timestamps
obtained from different servers. However, I do trust that if I obtain
a timestamp t0 from server A and later ask server A for a timestamp t1,
then t1 >= t0 holds. 
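
In code, this assumption amounts to comparing timestamps only within a
cell that is fed by a single server. A sketch; putIfNewer is a name I
made up and the timestamp type is left abstract.

```haskell
import Control.Concurrent.STM

-- Hypothetical helper: store (t, v) only if t is strictly newer than the
-- timestamp already in the cell. Returns True when the cell was updated.
putIfNewer :: Ord t => TVar (Maybe (t, v)) -> t -> v -> STM Bool
putIfNewer cell t v = do
  current <- readTVar cell
  case current of
    Just (t0, _) | t0 >= t -> return False   -- stale or duplicate: keep old
    _ -> writeTVar cell (Just (t, v)) >> return True
```

The comparison is only meaningful if every write to a given cell
carries a timestamp from the same server.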

Olaf


> 
> > On 2021-09-06, at 17:04, Olaf Klinke <olf at aatal-apotheke.de> wrote:
> > 
> > Dear Compl, 
> > 
> > thanks for putting so much thought into this. Regarding your
> > suggestion, I'm afraid I misled you somehow or I don't understand the
> > purpose of your code. I don't need the entire history of each value,
> > only the most recent one. Remark: The ValueNode looks like a ListT
> > (like in list-t package) over the STM monad.
> > 
> > We're developing a gateway between two data protocols. We have a Python
> > implementation, but it is not parallel enough and message parsing is
> > too slow. Therefore we hired MLabs to implement the source protocol in
> > Haskell [1]. The gateway does the following. 
> > 
> > 1. Parent thread reads a config and forks several concurrent child
> > threads, each talking to a different server. The state of the children
> > is read through TVars.
> > 2. The child thread polls data from its assigned server, decodes the
> > message and executes pre-defined callbacks [2], which are functions
> >    type Callback m = Value -> UTCTime -> m ().
> > 3. Each data packet can contain several data items and I want these
> > callbacks to be executed in parallel, too. This is because we expect
> > the network IO to be the slowest part in the entire process. I use the
> > parallel-io package for that. 
> > 
> > {-# LANGUAGE RankNTypes #-}
> > import Control.Monad.Reader
> > import Control.Monad.IO.Unlift (MonadUnliftIO(..))
> > import Control.Concurrent.ParallelIO.Local (Pool,parallel_)
> > import qualified Data.Foldable
> > 
> > type CallbackStrategy m n = forall f. Foldable f => 
> >   f (m ()) -> n ()
> > sequentialStrategy :: Applicative m => CallbackStrategy m m
> > sequentialStrategy = Data.Foldable.sequenceA_    
> > -- ^ just sequence the callbacks
> > 
> > concurrentStrategy :: MonadUnliftIO m => 
> >   CallbackStrategy m (ReaderT Pool m)
> > concurrentStrategy = execConcurrently . 
> >  Data.Foldable.toList where
> >    execConcurrently actionlist = ReaderT (\pool -> 
> >     withRunInIO (\asIO -> parallel_ pool (map asIO actionlist)))
> > -- ^ execute the callbacks concurrently on a pool of worker threads
> > 
> > I'd be quite happy with m = StateT s IO where s holds the most recent
> > time stamp and value for each data address. The simplest way to
> > broadcast state in this monad would be: 
> > 
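> > broadcast :: Monoid s => TVar s -> StateT s IO () -> StateT s IO ()
> > broadcast var (StateT f) = StateT (\s -> do
> >  ((), s') <- f s
> >  -- publish the new local state, then return it as the result state
> >  atomically (modifyTVar var (\old -> old <> s'))
> >  return ((), s'))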
> > 
> > That is, we run the StateT action and send the changes to the TVar
> > afterwards. But there are two more problems with this:
> > 
> > (1) The concurrent callbacks all want to atomically modify the same
> > TVar. This is a concurrency bottleneck and effectively implements Isaac
> > Elliott's Locks. Hence it seems that giving each data address its own
> > value TVar could allow more parallelism. 
> > 
> > (2) StateT is inherently sequential, which defeats concurrentStrategy.
> > In fact the unliftio-core package states:
> > > Note that, in order to meet the laws given below, the intuition is that a monad must have no monadic state, but may have monadic context. This essentially limits MonadUnliftIO to ReaderT and IdentityT transformers on top of IO.
> > Intuitively, there is no way to safely and concurrently modify the same
> > state. Nice example of how the type system guides you towards doing the
> > right thing. 
> > 
> > Therefore I will go back to what Chris Smith and Bryan Richter
> > suggested, and use ReaderT (TVar s) individually for each value. And
> > your suggestion of TMVars is also good in finer granularity, I think.
> > In the following, m can have a MonadUnliftIO instance. 
> > 
> > unStateT :: MonadIO m => TMVar s -> 
> >  Callback (StateT s m) -> Callback m
> > unStateT var cb = \value time -> do
> >    before <- (liftIO.atomically) (takeTMVar var)
> >    -- TMVar is now empty, no other thread can access it
> >    after <- execStateT (cb value time) before
> >    (liftIO.atomically) (putTMVar var after)
> >    -- TMVar now full again
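
Rereading this: if the callback throws, the TMVar stays empty and every
other thread waiting on it blocks forever. A sketch of an
exception-safe variant, specialised to IO so that it only needs
Control.Exception. The name unStateTSafe and the demo are hypothetical,
and the value and time types are left abstract.

```haskell
import Control.Concurrent.STM
import Control.Exception (SomeException, mask, onException, try)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')

-- Same idea as unStateT, but the old state is put back if the callback
-- throws, following the pattern used by modifyMVar.
unStateTSafe :: TMVar s -> (v -> t -> StateT s IO ()) -> v -> t -> IO ()
unStateTSafe var cb value time = mask $ \restore -> do
  before <- atomically (takeTMVar var)
  -- TMVar is now empty; async exceptions are masked except inside restore
  after <- restore (execStateT (cb value time) before)
             `onException` atomically (putTMVar var before)
  atomically (putTMVar var after)

-- Usage: the second callback fails, yet the TMVar still holds 5 afterwards.
demo :: IO Int
demo = do
  var <- newTMVarIO 0
  unStateTSafe var (\v _ -> modify' (+ v)) (5 :: Int) ()
  _ <- try (unStateTSafe var (\_ _ -> lift (ioError (userError "boom"))) (0 :: Int) ())
         :: IO (Either SomeException ())
  atomically (readTMVar var)
```

For a general MonadUnliftIO m the same shape should work with the
masking functions lifted accordingly, but I have not tried that.
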
> > 
> > Thanks everyone for helping!
> > Olaf 
> > 
> > 
> > [1] https://github.com/mlabs-haskell/opc-xml-da-client
> > [2] I suppose in FP-land we'd name these continuations, not callbacks.
> > Not sure whether the continuation monad abstraction buys me any
> > advantage here, though. 
> > 
> > On Sat, 2021-09-04 at 18:50 +0800, YueCompl wrote:
> > > Oh, a bugfix: the new tail reference should be returned after update, so a thread local stream reference can technically be cached.
> > > 
> > > And I realize this is more than you originally need, never mind if it's not so useful to you.
> > > 
> > > 
> > > data ValueNode a = ValueNode
> > >  { node'value :: a,
> > >    node'timestamp :: Timestamp,
> > >    node'next :: ValueSink a
> > >  }
> > > 
> > > type ValueSink a = TMVar (ValueNode a)
> > > 
> > > type Timestamp = Int
> > > 
> > > seekTail ::
> > >  forall a.
> > >  (a -> Timestamp -> a -> Timestamp -> a) ->
> > >  ValueSink a ->
> > >  STM (ValueSink a, Maybe (ValueNode a))
> > > seekTail f sink = go sink Nothing
> > >  where
> > >    go ::
> > >      ValueSink a ->
> > >      Maybe (ValueNode a) ->
> > >      STM (ValueSink a, Maybe (ValueNode a))
> > >    go ref prevNode =
> > >      tryReadTMVar ref >>= \case
> > >        Nothing -> return (ref, prevNode)
> > >        Just self@(ValueNode spotVal spotTs nxt) ->
> > >          go nxt $
> > >            Just
> > >              self
> > >                { node'value = case prevNode of
> > >                    Nothing -> spotVal
> > >                    Just (ValueNode prevVal prevTs _prevNxt) ->
> > >                      f prevVal prevTs spotVal spotTs
> > >                }
> > > 
> > > updateValue ::
> > >  forall a m.
> > >  MonadIO m =>
> > >  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
> > >  ValueSink a ->
> > >  m (ValueSink a)
> > > updateValue f sink = do
> > >  (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
> > >  case tailNode of
> > >    Nothing -> do
> > >      (myVal, myTs) <- f Nothing
> > >      liftIO $
> > >        atomically $ do
> > >          nxt <- newEmptyTMVar
> > >          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
> > >          return nxt
> > >    Just (ValueNode spotVal spotTs spotNxt) -> do
> > >      (myVal, myTs) <- f $ Just (spotVal, spotTs)
> > >      newNxt <- liftIO newEmptyTMVarIO
> > >      let newTail = ValueNode myVal myTs newNxt
> > > 
> > >          putAsNewTailOrDiscard :: ValueSink a -> STM ()
> > >          putAsNewTailOrDiscard nodeRef =
> > >            putTMVar nodeRef newTail `orElse` yetOther'sTail
> > >            where
> > >              yetOther'sTail = do
> > >                (ValueNode _other'sVal other'sTs other'sNxt) <-
> > >                  readTMVar nodeRef
> > >                if other'sTs >= myTs
> > >                  then return ()
> > >                  else putAsNewTailOrDiscard other'sNxt
> > > 
> > >      liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
> > >      return spotNxt
> > >  where
> > >    justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
> > >    justLatest _prevVal _prevTs spotVal _spotTs = spotVal
> > > 
> > > -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> > > -- "cached" over time, but keep in mind that for any such thread who is slow
> > > -- in unfolding the value stream, the historical values will pile up in heap.
> > > 
> > > 
> > > > On 2021-09-04, at 18:42, YueCompl <compl.yue at icloud.com> wrote:
> > > > 
> > > > I'd like to add a new feature that you can fold the historic value stream in deriving the new state value, then it becomes:
> > > > 
> > > > 
> > > > data ValueNode a = ValueNode
> > > >  { node'value :: a,
> > > >    node'timestamp :: Timestamp,
> > > >    node'next :: ValueSink a
> > > >  }
> > > > 
> > > > type ValueSink a = TMVar (ValueNode a)
> > > > 
> > > > type Timestamp = Int
> > > > 
> > > > seekTail ::
> > > >  forall a.
> > > >  (a -> Timestamp -> a -> Timestamp -> a) ->
> > > >  ValueSink a ->
> > > >  STM (ValueSink a, Maybe (ValueNode a))
> > > > seekTail f sink = go sink Nothing
> > > >  where
> > > >    go ::
> > > >      ValueSink a ->
> > > >      Maybe (ValueNode a) ->
> > > >      STM (ValueSink a, Maybe (ValueNode a))
> > > >    go ref prevNode =
> > > >      tryReadTMVar ref >>= \case
> > > >        Nothing -> return (ref, prevNode)
> > > >        Just self@(ValueNode spotVal spotTs nxt) ->
> > > >          go nxt $
> > > >            Just
> > > >              self
> > > >                { node'value = case prevNode of
> > > >                    Nothing -> spotVal
> > > >                    Just (ValueNode prevVal prevTs _prevNxt) ->
> > > >                      f prevVal prevTs spotVal spotTs
> > > >                }
> > > > 
> > > > updateValue ::
> > > >  forall a m.
> > > >  MonadIO m =>
> > > >  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
> > > >  ValueSink a ->
> > > >  m ()
> > > > updateValue f sink = do
> > > >  (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
> > > >  case tailNode of
> > > >    Nothing -> do
> > > >      (myVal, myTs) <- f Nothing
> > > >      liftIO $
> > > >        atomically $ do
> > > >          nxt <- newEmptyTMVar
> > > >          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
> > > >    Just (ValueNode spotVal spotTs spotNxt) -> do
> > > >      (myVal, myTs) <- f $ Just (spotVal, spotTs)
> > > >      newNxt <- liftIO newEmptyTMVarIO
> > > >      let newTail = ValueNode myVal myTs newNxt
> > > > 
> > > >          putAsNewTailOrDiscard :: ValueSink a -> STM ()
> > > >          putAsNewTailOrDiscard nodeRef =
> > > >            putTMVar nodeRef newTail `orElse` yetOther'sTail
> > > >            where
> > > >              yetOther'sTail = do
> > > >                (ValueNode _other'sVal other'sTs other'sNxt) <-
> > > >                  readTMVar nodeRef
> > > >                if other'sTs >= myTs
> > > >                  then return ()
> > > >                  else putAsNewTailOrDiscard other'sNxt
> > > > 
> > > >      liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
> > > >  where
> > > >    justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
> > > >    justLatest _prevVal _prevTs spotVal _spotTs = spotVal
> > > > 
> > > > -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> > > > -- "cached" over time, but keep in mind that for any such thread who is slow
> > > > -- in unfolding the value stream, the historical values will pile up in heap.
> > > > 
> > > > 
> > > > 
> > > > > On 2021-09-03, at 16:26, YueCompl <compl.yue at icloud.com> wrote:
> > > > > 
> > > > > It's a bit sad that I'm not mathematically minded enough to understand you at that abstract level. But I have a more imperative solution in mind, wrt the question:
> > > > > 
> > > > > > "server, tell me if there is a value of x newer than t." 
> > > > > 
> > > > > and do further mutate-or-giveup, like this:
> > > > > 
> > > > > 
> > > > > data ValueNode a = ValueNode
> > > > >  { node'value :: a,
> > > > >    node'timestamp :: Timestamp,
> > > > >    node'next :: ValueSink a
> > > > >  }
> > > > > 
> > > > > type ValueSink a = TMVar (ValueNode a)
> > > > > 
> > > > > type Timestamp = Int
> > > > > 
> > > > > seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
> > > > > seekTail sink = go sink Nothing
> > > > >  where
> > > > >    go ref ancestor =
> > > > >      tryReadTMVar ref >>= \case
> > > > >        Nothing -> return (ref, ancestor)
> > > > >        Just self@(ValueNode _ _ nxt) -> go nxt $ Just self
> > > > > 
> > > > > updateValue ::
> > > > >  forall a m.
> > > > >  MonadIO m =>
> > > > >  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
> > > > >  ValueSink a ->
> > > > >  m ()
> > > > > updateValue f sink = do
> > > > >  (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
> > > > >  case tailNode of
> > > > >    Nothing -> do
> > > > >      (myVal, myTs) <- f Nothing
> > > > >      liftIO $
> > > > >        atomically $ do
> > > > >          nxt <- newEmptyTMVar
> > > > >          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
> > > > >    Just (ValueNode seenVal seenTs seenNxt) -> do
> > > > >      (myVal, myTs) <- f $ Just (seenVal, seenTs)
> > > > >      newNxt <- liftIO newEmptyTMVarIO
> > > > >      let newTail = ValueNode myVal myTs newNxt
> > > > > 
> > > > >          putAsNewTailOrDiscard :: ValueSink a -> STM ()
> > > > >          putAsNewTailOrDiscard nodeRef =
> > > > >            putTMVar nodeRef newTail `orElse` yetOther'sTail
> > > > >            where
> > > > >              yetOther'sTail = do
> > > > >                (ValueNode _other'sVal other'sTs other'sNxt) <-
> > > > >                  readTMVar nodeRef
> > > > >                if other'sTs >= myTs
> > > > >                  then return ()
> > > > >                  else putAsNewTailOrDiscard other'sNxt
> > > > > 
> > > > >      liftIO $ atomically $ putAsNewTailOrDiscard seenNxt
> > > > > 
> > > > > -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> > > > > -- "cached" over time, but keep in mind that for any such thread who is slow
> > > > > -- in unfolding the value stream, the historical values will pile up in heap.
> > > > > 
> > > > > 
> > > > > 
> > > > > > On 2021-09-03, at 01:42, Olaf Klinke <olf at aatal-apotheke.de> wrote:
> > > > > > 
> > > > > > On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
> > > > > > > Um, I'm not sure I understand your case right, but if the "mutation" instead of the "mutated result" can be (might non-trivially) computed from a possibly outdated state, and the "mutation" can be trivially applied, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
> > > > > > 
> > > > > > To be concrete, my state is a collection of time stamped values, where
> > > > > > the monoid operation overwrites old values with new ones. 
> > > > > > But I need to know the current state (x,t) to determine the "mutation",
> > > > > > because I'll be asking questions like "server, tell me if there is a
> > > > > > value of x newer than t." 
> > > > > > Any observer whose initial state is synchronized with the worker thread
> > > > > > can in principle re-construct the worker's internal state by observing
> > > > > > the stream of emitted "mutations". 
> > > > > > 
> > > > > > The most general abstraction would be that of a monoid action on a
> > > > > > type, but in my case the monoid (mutations) and the mutated type are
> > > > > > identical. 
> > > > > > 
> > > > > > act :: m -> a -> a
> > > > > > > act mempty = id
> > > > > > act (x <> y) = act x . act y -- monoid homomorphism
> > > > > > act (x <> x) = act x         -- idempotent
> > > > > > 
> > > > > > Olaf
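
For the record, a sketch of such a monoid action for time stamped
values, assuming the state is a map from data address to (timestamp,
value) and that newer entries overwrite older ones. The type Latest is
a hypothetical name; the key, timestamp and value types are left
abstract.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical state type: per address, the most recent (timestamp, value).
newtype Latest k t v = Latest (Map.Map k (t, v)) deriving (Eq, Show)

-- Combine two states address by address, keeping the entry with the
-- strictly newer timestamp; on equal timestamps the left entry wins.
instance (Ord k, Ord t) => Semigroup (Latest k t v) where
  Latest a <> Latest b = Latest (Map.unionWith newer a b)
    where newer x y = if fst y > fst x then y else x

instance (Ord k, Ord t) => Monoid (Latest k t v) where
  mempty = Latest Map.empty

-- Mutations and mutated state have the same type, so the action is (<>).
act :: (Ord k, Ord t) => Latest k t v -> Latest k t v -> Latest k t v
act = (<>)
```

The homomorphism law is then just associativity of (<>), and
act x x = x holds because an entry never replaces itself.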



