[Haskell-cafe] broadcasting stateful computations
YueCompl
compl.yue at icloud.com
Sat Sep 4 10:50:47 UTC 2021
Oh, a bugfix: the new tail reference should be returned after the update, so that a thread-local stream reference can technically be cached.
And I realize this is more than you originally needed; never mind if it's not so useful to you.
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Control.Concurrent.STM
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)

data ValueNode a = ValueNode
  { node'value :: a,
    node'timestamp :: Timestamp,
    node'next :: ValueSink a
  }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

seekTail ::
  forall a.
  (a -> Timestamp -> a -> Timestamp -> a) ->
  ValueSink a ->
  STM (ValueSink a, Maybe (ValueNode a))
seekTail f sink = go sink Nothing
  where
    go ::
      ValueSink a ->
      Maybe (ValueNode a) ->
      STM (ValueSink a, Maybe (ValueNode a))
    go ref prevNode =
      tryReadTMVar ref >>= \case
        Nothing -> return (ref, prevNode)
        Just self@(ValueNode spotVal spotTs nxt) ->
          go nxt $
            Just
              self
                { node'value = case prevNode of
                    Nothing -> spotVal
                    Just (ValueNode prevVal prevTs _prevNxt) ->
                      f prevVal prevTs spotVal spotTs
                }

updateValue ::
  forall a m.
  MonadIO m =>
  (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
  ValueSink a ->
  m (ValueSink a)
updateValue f sink = do
  (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
  case tailNode of
    Nothing -> do
      (myVal, myTs) <- f Nothing
      liftIO $
        atomically $ do
          nxt <- newEmptyTMVar
          -- If another thread has filled 'tailRef' in the meantime, this put
          -- is a no-op and 'nxt' stays unlinked from the stream.
          void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
          return nxt
    Just (ValueNode spotVal spotTs spotNxt) -> do
      (myVal, myTs) <- f $ Just (spotVal, spotTs)
      newNxt <- liftIO newEmptyTMVarIO
      let newTail = ValueNode myVal myTs newNxt

          putAsNewTailOrDiscard :: ValueSink a -> STM ()
          putAsNewTailOrDiscard nodeRef =
            putTMVar nodeRef newTail `orElse` yetOther'sTail
            where
              yetOther'sTail = do
                (ValueNode _other'sVal other'sTs other'sNxt) <-
                  readTMVar nodeRef
                if other'sTs >= myTs
                  then return ()
                  else putAsNewTailOrDiscard other'sNxt

      liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
      return spotNxt
  where
    justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
    justLatest _prevVal _prevTs spotVal _spotTs = spotVal

-- Each concurrent thread is supposed to have its local 'ValueSink' reference
-- "cached" over time, but keep in mind that for any such thread who is slow
-- in unfolding the value stream, the historical values will pile up in heap.
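
To illustrate the caching remark, a reader thread might unfold the stream roughly as below. This is only a sketch: the type definitions are repeated so that the block compiles on its own, and `drainFrom` and `appendNode` are hypothetical helper names that do not appear in the code above.

```haskell
{-# LANGUAGE LambdaCase #-}

import Control.Concurrent.STM

data ValueNode a = ValueNode
  { node'value :: a,
    node'timestamp :: Timestamp,
    node'next :: ValueSink a
  }

type ValueSink a = TMVar (ValueNode a)

type Timestamp = Int

-- | Hypothetical helper: collect every node currently reachable from the
-- cached reference, oldest first, and return the still-empty tail reference
-- that the thread should cache for its next round.
drainFrom :: ValueSink a -> STM ([(a, Timestamp)], ValueSink a)
drainFrom = go []
  where
    go acc ref =
      tryReadTMVar ref >>= \case
        Nothing -> return (reverse acc, ref)
        Just (ValueNode val ts nxt) -> go ((val, ts) : acc) nxt

-- | Hypothetical helper for trying things out: append to an empty tail
-- reference and return the new (empty) tail reference.
appendNode :: ValueSink a -> a -> Timestamp -> STM (ValueSink a)
appendNode tailRef val ts = do
  nxt <- newEmptyTMVar
  putTMVar tailRef (ValueNode val ts nxt)
  return nxt
```

A thread that keeps replacing its cached reference with the one returned by `drainFrom` no longer holds on to the nodes it has already seen, so they can be garbage collected once no other thread references them. A thread that never advances keeps the whole history reachable, which is the pile-up mentioned in the comment.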
> On 2021-09-04, at 18:42, YueCompl <compl.yue at icloud.com> wrote:
>
> I'd like to add a new feature: you can fold the historic value stream while deriving the new state value. It then becomes:
>
>
> data ValueNode a = ValueNode
>   { node'value :: a,
>     node'timestamp :: Timestamp,
>     node'next :: ValueSink a
>   }
>
> type ValueSink a = TMVar (ValueNode a)
>
> type Timestamp = Int
>
> seekTail ::
>   forall a.
>   (a -> Timestamp -> a -> Timestamp -> a) ->
>   ValueSink a ->
>   STM (ValueSink a, Maybe (ValueNode a))
> seekTail f sink = go sink Nothing
>   where
>     go ::
>       ValueSink a ->
>       Maybe (ValueNode a) ->
>       STM (ValueSink a, Maybe (ValueNode a))
>     go ref prevNode =
>       tryReadTMVar ref >>= \case
>         Nothing -> return (ref, prevNode)
>         Just self@(ValueNode spotVal spotTs nxt) ->
>           go nxt $
>             Just
>               self
>                 { node'value = case prevNode of
>                     Nothing -> spotVal
>                     Just (ValueNode prevVal prevTs _prevNxt) ->
>                       f prevVal prevTs spotVal spotTs
>                 }
>
> updateValue ::
>   forall a m.
>   MonadIO m =>
>   (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
>   ValueSink a ->
>   m ()
> updateValue f sink = do
>   (tailRef, tailNode) <- liftIO $ atomically $ seekTail justLatest sink
>   case tailNode of
>     Nothing -> do
>       (myVal, myTs) <- f Nothing
>       liftIO $
>         atomically $ do
>           nxt <- newEmptyTMVar
>           void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
>     Just (ValueNode spotVal spotTs spotNxt) -> do
>       (myVal, myTs) <- f $ Just (spotVal, spotTs)
>       newNxt <- liftIO newEmptyTMVarIO
>       let newTail = ValueNode myVal myTs newNxt
>
>           putAsNewTailOrDiscard :: ValueSink a -> STM ()
>           putAsNewTailOrDiscard nodeRef =
>             putTMVar nodeRef newTail `orElse` yetOther'sTail
>             where
>               yetOther'sTail = do
>                 (ValueNode _other'sVal other'sTs other'sNxt) <-
>                   readTMVar nodeRef
>                 if other'sTs >= myTs
>                   then return ()
>                   else putAsNewTailOrDiscard other'sNxt
>
>       liftIO $ atomically $ putAsNewTailOrDiscard spotNxt
>   where
>     justLatest :: (a -> Timestamp -> a -> Timestamp -> a)
>     justLatest _prevVal _prevTs spotVal _spotTs = spotVal
>
> -- Each concurrent thread is supposed to have its local 'ValueSink' reference
> -- "cached" over time, but keep in mind that for any such thread who is slow
> -- in unfolding the value stream, the historical values will pile up in heap.
>
>
>
>> On 2021-09-03, at 16:26, YueCompl <compl.yue at icloud.com> wrote:
>>
>> It's a bit sad that I'm not mathematically minded enough to understand you at that level of abstraction. But I have a more imperative solution in mind, wrt the question:
>>
>>> "server, tell me if there is a value of x newer than t."
>>
>>
>> and then do a further mutate-or-give-up, like this:
>>
>>
>> data ValueNode a = ValueNode
>>   { node'value :: a,
>>     node'timestamp :: Timestamp,
>>     node'next :: ValueSink a
>>   }
>>
>> type ValueSink a = TMVar (ValueNode a)
>>
>> type Timestamp = Int
>>
>> seekTail :: ValueSink a -> STM (ValueSink a, Maybe (ValueNode a))
>> seekTail sink = go sink Nothing
>>   where
>>     go ref ancestor =
>>       tryReadTMVar ref >>= \case
>>         Nothing -> return (ref, ancestor)
>>         Just self@(ValueNode _ _ nxt) -> go nxt $ Just self
>>
>> updateValue ::
>>   forall a m.
>>   MonadIO m =>
>>   (Maybe (a, Timestamp) -> m (a, Timestamp)) ->
>>   ValueSink a ->
>>   m ()
>> updateValue f sink = do
>>   (tailRef, tailNode) <- liftIO $ atomically $ seekTail sink
>>   case tailNode of
>>     Nothing -> do
>>       (myVal, myTs) <- f Nothing
>>       liftIO $
>>         atomically $ do
>>           nxt <- newEmptyTMVar
>>           void $ tryPutTMVar tailRef $ ValueNode myVal myTs nxt
>>     Just (ValueNode seenVal seenTs seenNxt) -> do
>>       (myVal, myTs) <- f $ Just (seenVal, seenTs)
>>       newNxt <- liftIO newEmptyTMVarIO
>>       let newTail = ValueNode myVal myTs newNxt
>>
>>           putAsNewTailOrDiscard :: ValueSink a -> STM ()
>>           putAsNewTailOrDiscard nodeRef =
>>             putTMVar nodeRef newTail `orElse` yetOther'sTail
>>             where
>>               yetOther'sTail = do
>>                 (ValueNode _other'sVal other'sTs other'sNxt) <-
>>                   readTMVar nodeRef
>>                 if other'sTs >= myTs
>>                   then return ()
>>                   else putAsNewTailOrDiscard other'sNxt
>>
>>       liftIO $ atomically $ putAsNewTailOrDiscard seenNxt
>>
>> -- Each concurrent thread is supposed to have its local 'ValueSink' reference
>> -- "cached" over time, but keep in mind that for any such thread who is slow
>> -- in unfolding the value stream, the historical values will pile up in heap.
>>
>>
>>
>>> On 2021-09-03, at 01:42, Olaf Klinke <olf at aatal-apotheke.de> wrote:
>>>
>>> On Fri, 2021-09-03 at 00:00 +0800, YueCompl wrote:
>>>> Um, I'm not sure I understand your case right, but if the "mutation" instead of the "mutated result" can be computed (possibly non-trivially) from a possibly outdated state, and the "mutation" can be trivially applied, I think `modifyTVar'` is the way to go. `readTVar` can be used to obtain an almost up-to-date state on demand, at low frequency.
>>>
>>> To be concrete, my state is a collection of time stamped values, where
>>> the monoid operation overwrites old values with new ones.
>>> But I need to know the current state (x,t) to determine the "mutation",
>>> because I'll be asking questions like "server, tell me if there is a
>>> value of x newer than t."
>>> Any observer whose initial state is synchronized with the worker thread
>>> can in principle re-construct the worker's internal state by observing
>>> the stream of emitted "mutations".
>>>
>>> The most general abstraction would be that of a monoid action on a
>>> type, but in my case the monoid (mutations) and the mutated type are
>>> identical.
>>>
>>> act :: m -> a -> a
>>> act mempty = id
>>> act (x <> y) = act x . act y -- monoid homomorphism
>>> act (x <> x) = act x -- idempotent
>>>
>>> Olaf
>>>
>>
>
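
For reference, the monoid described in the quoted message (a collection of time-stamped values where newer values overwrite older ones, acting on itself) could be modelled roughly as follows. This is a sketch under assumptions: the `Stamped` type, its `Map`-based representation, and the choice that the left operand wins on equal timestamps are all hypothetical and not taken from the thread.

```haskell
import qualified Data.Map.Strict as Map

type Timestamp = Int

-- | Hypothetical model: a collection of time-stamped values, keyed by name.
newtype Stamped k v = Stamped (Map.Map k (Timestamp, v))
  deriving (Eq, Show)

-- | Combining keeps, per key, the entry with the newer timestamp;
-- on equal timestamps the left operand wins (an assumption of this sketch).
instance Ord k => Semigroup (Stamped k v) where
  Stamped x <> Stamped y = Stamped (Map.unionWith newer x y)
    where
      newer l@(tl, _) r@(tr, _) = if tr > tl then r else l

instance Ord k => Monoid (Stamped k v) where
  mempty = Stamped Map.empty

-- | The monoid acting on itself, as in the quoted laws.
act :: Ord k => Stamped k v -> Stamped k v -> Stamped k v
act = (<>)
```

With this definition `act mempty` is the identity, `act (x <> y)` equals `act x . act y` by associativity, and `x <> x` equals `x`, so the idempotence law holds as well.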