[Haskell-cafe] broadcasting stateful computations

YueCompl compl.yue at icloud.com
Fri Sep 3 08:26:10 UTC 2021


It's a bit sad that I'm not so mathematically minded to understand you in that abstract level. But I have a more imperative solution in my mind, wrt the question:

> "server, tell me if there is a value of x newer than t." 


and do further mutate-or-giveup, like this:


data ValueNode a = ValueNode
  { node'value :: a,
    node'timestamp :: Timestamp,
    node'next :: ValueSink a
  }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
seekTail sink = go sink Nothing
  where
    go ref ancestor =
      tryReadTMVar ref >>= \case
        Nothing -> return (ref, ancestor)
        Just self@(ValueNode _ _ nxt) -> go nxt $ Just self

updateValue ::
  forall a m.
  MonadIO m =>
  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
  ValueSink a ->
  m ()
updateValue f sink = do
  (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
  case tailNode of
    Nothing -> do
      (myVal, myTs) <- f Nothing
      liftIO $
        atomically $ do
          nxt <- newEmptyTMVar
          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
    Just (ValueNode seenVal seenTs seenNxt) -> do
      (myVal, myTs) <- f $ Just (seenVal, seenTs)
      newNxt <- liftIO newEmptyTMVarIO
      let newTail = ValueNode myVal myTs newNxt

          putAsNewTailOrDiscard :: ValueSink a -> STM ()
          putAsNewTailOrDiscard nodeRef =
            putTMVar nodeRef newTail `orElse` yetOther'sTail
            where
              yetOther'sTail = do
                (ValueNode _other'sVal other'sTs other'sNxt) <-
                  readTMVar nodeRef
                if other'sTs >= myTs
                  then return ()
                  else putAsNewTailOrDiscard other'sNxt

      liftIO $ atomically $ putAsNewTailOrDiscard seenNxt

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.



> On 2021-09-03, at 01:42, Olaf Klinke <olf at aatal-apotheke.de> wrote:
> 
> On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
>> Um, I'm not sure I understand your case right, but if the "mutation" instead of the "mutated result" can be (might non-trivially) computed from a possibly outdated state, and the "mutation" can be trivially applied, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
> 
> To be concrete, my state is a collection of time stamped values, where
> the monoid operation overwrites old values with new ones. 
> But I need to know the current state (x,t) to determine the "mutation",
> because I'll be asking questions like "server, tell me if there is a
> value of x newer than t." 
> Any observer whose initial state is synchronized with the worker thread
> can in principle re-construct the worker's internal state by observing
> the stream of emitted "mutations". 
> 
> The most general abstraction would be that of a monoid action on a
> type, but in my case the monoid (mutations) and the mutated type are
> identical. 
> 
> act :: m -> a -> a
> act memtpy = id
> act (x <> y) = act x . act y -- monoid homomorphism
> act (x <> x) = act x         -- idempotent
> 
> Olaf
> 

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/haskell-cafe/attachments/20210903/20844dfa/attachment.html>


More information about the Haskell-Cafe mailing list