[Haskell-cafe] broadcasting stateful computations

YueCompl compl.yue at icloud.com
Sat Sep 4 10:50:47 UTC 2021


Oh, a bugfix: the new tail reference should be returned after update, so a thread local stream reference can technically be cached.

And I realize this is more than you originally need, never mind if it's not so useful to you.


data ValueNode a = ValueNode
  { node'value :: a,
    node'timestamp :: Timestamp,
    node'next :: ValueSink a
  }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail ::
  forall a.
  (a -> Timestamp -> a -> Timestamp -> a) ->
  ValueSink a ->
  STM (ValueSink a, Maybe (ValueNode a))
seekTail f sink = go sink Nothing
  where
    go ::
      ValueSink a ->
      Maybe (ValueNode a) ->
      STM (ValueSink a, Maybe (ValueNode a))
    go ref prevNode =
      tryReadTMVar ref >>= \case
        Nothing -> return (ref, prevNode)
        Just self@(ValueNode spotVal spotTs nxt) ->
          go nxt $
            Just
              self
                { node'value = case prevNode of
                    Nothing -> spotVal
                    Just (ValueNode prevVal prevTs _prevNxt) ->
                      f prevVal prevTs spotVal spotTs
                }

updateValue ::
  forall a m.
  MonadIO m =>
  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
  ValueSink a ->
  m (ValueSink a)
updateValue f sink = do
  (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
  case tailNode of
    Nothing -> do
      (myVal, myTs) <- f Nothing
      liftIO $
        atomically $ do
          nxt <- newEmptyTMVar
          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
          return nxt
    Just (ValueNode spotVal spotTs spotNxt) -> do
      (myVal, myTs) <- f $ Just (spotVal, spotTs)
      newNxt <- liftIO newEmptyTMVarIO
      let newTail = ValueNode myVal myTs newNxt

          putAsNewTailOrDiscard :: ValueSink a -> STM ()
          putAsNewTailOrDiscard nodeRef =
            putTMVar nodeRef newTail `orElse` yetOther'sTail
            where
              yetOther'sTail = do
                (ValueNode _other'sVal other'sTs other'sNxt) <-
                  readTMVar nodeRef
                if other'sTs >= myTs
                  then return ()
                  else putAsNewTailOrDiscard other'sNxt

      liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
      return spotNxt
  where
    justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
    justLatest _prevVal _prevTs spotVal _spotTs = spotVal

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.


> On 2021-09-04, at 18:42, YueCompl <compl.yue at icloud.com> wrote:
> 
> I'd like to add a new feature that you can fold the historic value stream in deriving the new state value, then it becomes:
> 
> 
> data ValueNode a = ValueNode
>   { node'value :: a,
>     node'timestamp :: Timestamp,
>     node'next :: ValueSink a
>   }
> 
> type ValueSink a = TMVar (ValueNode a)
> 
> type Timestamp = Int
> 
> seekTail ::
>   forall a.
>   (a -> Timestamp -> a -> Timestamp -> a) ->
>   ValueSink a ->
>   STM (ValueSink a, Maybe (ValueNode a))
> seekTail f sink = go sink Nothing
>   where
>     go ::
>       ValueSink a ->
>       Maybe (ValueNode a) ->
>       STM (ValueSink a, Maybe (ValueNode a))
>     go ref prevNode =
>       tryReadTMVar ref >>= \case
>         Nothing -> return (ref, prevNode)
>         Just self@(ValueNode spotVal spotTs nxt) ->
>           go nxt $
>             Just
>               self
>                 { node'value = case prevNode of
>                     Nothing -> spotVal
>                     Just (ValueNode prevVal prevTs _prevNxt) ->
>                       f prevVal prevTs spotVal spotTs
>                 }
> 
> updateValue ::
>   forall a m.
>   MonadIO m =>
>   (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
>   ValueSink a ->
>   m ()
> updateValue f sink = do
>   (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
>   case tailNode of
>     Nothing -> do
>       (myVal, myTs) <- f Nothing
>       liftIO $
>         atomically $ do
>           nxt <- newEmptyTMVar
>           void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
>     Just (ValueNode spotVal spotTs spotNxt) -> do
>       (myVal, myTs) <- f $ Just (spotVal, spotTs)
>       newNxt <- liftIO newEmptyTMVarIO
>       let newTail = ValueNode myVal myTs newNxt
> 
>           putAsNewTailOrDiscard :: ValueSink a -> STM ()
>           putAsNewTailOrDiscard nodeRef =
>             putTMVar nodeRef newTail `orElse` yetOther'sTail
>             where
>               yetOther'sTail = do
>                 (ValueNode _other'sVal other'sTs other'sNxt) <-
>                   readTMVar nodeRef
>                 if other'sTs >= myTs
>                   then return ()
>                   else putAsNewTailOrDiscard other'sNxt
> 
>       liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
>   where
>     justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
>     justLatest _prevVal _prevTs spotVal _spotTs = spotVal
> 
> -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> -- "cached" over time, but keep in mind that for any such thread who is slow
> -- in unfolding the value stream, the historical values will pile up in heap.
> 
> 
> 
>> On 2021-09-03, at 16:26, YueCompl <compl.yue at icloud.com <mailto:compl.yue at icloud.com>> wrote:
>> 
>> It's a bit sad that I'm not so mathematically minded to understand you in that abstract level. But I have a more imperative solution in my mind, wrt the question:
>> 
>>> "server, tell me if there is a value of x newer than t." 
>> 
>> 
>> and do further mutate-or-giveup, like this:
>> 
>> 
>> data ValueNode a = ValueNode
>>   { node'value :: a,
>>     node'timestamp :: Timestamp,
>>     node'next :: ValueSink a
>>   }
>> 
>> type ValueSink a = TMVar (ValueNode a)
>> 
>> type Timestamp = Int
>> 
>> seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
>> seekTail sink = go sink Nothing
>>   where
>>     go ref ancestor =
>>       tryReadTMVar ref >>= \case
>>         Nothing -> return (ref, ancestor)
>>         Just self@(ValueNode _ _ nxt) -> go nxt $ Just self
>> 
>> updateValue ::
>>   forall a m.
>>   MonadIO m =>
>>   (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
>>   ValueSink a ->
>>   m ()
>> updateValue f sink = do
>>   (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
>>   case tailNode of
>>     Nothing -> do
>>       (myVal, myTs) <- f Nothing
>>       liftIO $
>>         atomically $ do
>>           nxt <- newEmptyTMVar
>>           void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
>>     Just (ValueNode seenVal seenTs seenNxt) -> do
>>       (myVal, myTs) <- f $ Just (seenVal, seenTs)
>>       newNxt <- liftIO newEmptyTMVarIO
>>       let newTail = ValueNode myVal myTs newNxt
>> 
>>           putAsNewTailOrDiscard :: ValueSink a -> STM ()
>>           putAsNewTailOrDiscard nodeRef =
>>             putTMVar nodeRef newTail `orElse` yetOther'sTail
>>             where
>>               yetOther'sTail = do
>>                 (ValueNode _other'sVal other'sTs other'sNxt) <-
>>                   readTMVar nodeRef
>>                 if other'sTs >= myTs
>>                   then return ()
>>                   else putAsNewTailOrDiscard other'sNxt
>> 
>>       liftIO $ atomically $ putAsNewTailOrDiscard seenNxt
>> 
>> -- Each concurrent thread is supposed to have its local 'ValueSink' reference
>> -- "cached" over time, but keep in mind that for any such thread who is slow
>> -- in unfolding the value stream, the historical values will pile up in heap.
>> 
>> 
>> 
>>> On 2021-09-03, at 01:42, Olaf Klinke <olf at aatal-apotheke.de <mailto:olf at aatal-apotheke.de>> wrote:
>>> 
>>> On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
>>>> Um, I'm not sure I understand your case right, but if the "mutation" instead of the "mutated result" can be (might non-trivially) computed from a possibly outdated state, and the "mutation" can be trivially applied, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
>>> 
>>> To be concrete, my state is a collection of time stamped values, where
>>> the monoid operation overwrites old values with new ones. 
>>> But I need to know the current state (x,t) to determine the "mutation",
>>> because I'll be asking questions like "server, tell me if there is a
>>> value of x newer than t." 
>>> Any observer whose initial state is synchronized with the worker thread
>>> can in principle re-construct the worker's internal state by observing
>>> the stream of emitted "mutations". 
>>> 
>>> The most general abstraction would be that of a monoid action on a
>>> type, but in my case the monoid (mutations) and the mutated type are
>>> identical. 
>>> 
>>> act :: m -> a -> a
>>> act memtpy = id
>>> act (x <> y) = act x . act y -- monoid homomorphism
>>> act (x <> x) = act x         -- idempotent
>>> 
>>> Olaf
>>> 
>> 
> 

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/haskell-cafe/attachments/20210904/d16b1259/attachment-0001.html>


More information about the Haskell-Cafe mailing list