[Haskell-cafe] A Sliding TChan?
Mark Fine
mark.fine at gmail.com
Tue Feb 9 17:23:30 UTC 2016
Hey Thomas,
Thanks for the implementation ideas! It's worked out great for us and
introduced a lot of stability in our system! Thanks again for your help!
Mark
On Wed, Feb 3, 2016 at 5:42 PM, Thomas Horstmeyer <
horstmey at mathematik.uni-marburg.de> wrote:
> Hi Mark,
>
> your question made me take a look at the TChan implementation, which I
> always had wanted to do (but never had the time). To test my understanding,
> I sketched a TChan variation that should solve the problem. (A test with
> one sender and two receivers showed expected output but I did not measure
> memory usage.)
>
> The sender replaces older messages with a marker. This should make the
> content available to the garbage collector (if it is not referenced by a
> receiver who has read it). On reading a marker, a receiver skips directly
> to the next valid message.
>
> On the downside, the sender keeps a reference to the last n messages, so
> they will not be garbage collected even if every receiver has read them.
>
> Thomas
>
>
> {-# LANGUAGE CPP, DeriveDataTypeable #-}
>
> module Control.Concurrent.STM.TBBroadcast(
> #ifdef __GLASGOW_HASKELL__
> TSender, TReceiver,
> newSender, newSenderIO, writeBC,
> newReceiver, readBC
> #endif
> ) where
>
> #ifdef __GLASGOW_HASKELL__
>
> import GHC.Conc
> import Data.Typeable (Typeable)
>
>
> data TSender a = Sender {-# UNPACK #-} !(TVar Int)
> {-# UNPACK #-} !(TVar (TVarList a))
> {-# UNPACK #-} !(TVar (TVarList a))
> deriving (Eq, Typeable)
>
>
> type TVarList a = TVar (TList a)
> data TList a = TNil
> | TCons a {-# UNPACK #-} !(TVarList a)
> | Outdated {-# UNPACK #-} !(TVar (TVarList a))
>
>
> newSender :: Int -> STM (TSender a)
> newSender n | n <= 0 = error "windows size must be >=0"
> | otherwise = do
> hole <- newTVar TNil
> first <- newTVar hole
> end <- newTVar hole
> count <- newTVar n
> return (Sender count first end)
>
> newSenderIO :: Int -> IO (TSender a)
> newSenderIO n | n <= 0 = error "windows size must be >=0"
> | otherwise = do
> hole <- newTVarIO TNil
> first <- newTVarIO hole
> end <- newTVarIO hole
> count <- newTVarIO n
> return (Sender count first end)
>
>
>
> writeBC :: TSender a -> a -> STM ()
> writeBC (Sender count first end) a = do
> listend <- readTVar end
> new_listend <- newTVar TNil
> writeTVar listend (TCons a new_listend)
> writeTVar end new_listend
> n <- readTVar count
> case n of
> 0 -> do
> listhead <- readTVar first
> head <- readTVar listhead
> case head of
> TCons _ tl -> writeTVar first tl
> writeTVar listhead (Outdated first)
> _ -> writeTVar count $! (n-1)
>
>
> data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))
>
> newReceiver :: TSender a -> STM (TReceiver a)
> newReceiver (Sender _ _ end) = do
> hole <- readTVar end
> first <-newTVar hole
> return (Receiver first)
>
>
> readBC :: TReceiver a -> STM a
> readBC (Receiver first) = do
> listhead <- readTVar first
> head <- readTVar listhead
> case head of
> TNil -> retry
> TCons a tl -> do
> writeTVar first tl
> return a
> Outdated next -> do
> next' <- readTVar next
> writeTVar first next'
> readBC (Receiver first)
>
> #endif
>
>
> Am 28.01.2016 um 20:30 schrieb Mark Fine:
>
>> We're currently using a TMChan to broadcast from a single producer
>> thread to many consumer threads. This works well! However, we're seeing
>> issues with a fast producer and/or a slow consumer, with the channel
>> growing unbounded. Fortunately, our producer-consumer communication is
>> time-sensitive and tolerant of loss: we're ok with the producer always
>> writing at the expense of dropping communication to a slow consumer.
>>
>> A TMBChan provides a bounded channel (but no means to dupe/broadcast)
>> where a writer will block once the channel fills up. In our use case,
>> we'd like to continue writing to the channel but dropping off the end of
>> the channel. Clojure's core-async module has some related concepts, in
>> particular the notion of a sliding buffer
>> <https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer>
>> that
>> drops the oldest elements once full. Has anyone encountered something
>> similar in working with channels and/or have any solutions? Thanks!
>>
>> Mark
>>
>>
>> _______________________________________________
>> Haskell-Cafe mailing list
>> Haskell-Cafe at haskell.org
>> http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
>>
>>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/haskell-cafe/attachments/20160209/cd10348b/attachment-0001.html>
More information about the Haskell-Cafe
mailing list