[Haskell-cafe] A Sliding TChan?

Mark Fine mark.fine at gmail.com
Tue Feb 9 17:23:30 UTC 2016


Hey Thomas,

Thanks for the implementation ideas! It's worked out great for us and
introduced a lot of stability in our system! Thanks again for your help!

Mark

On Wed, Feb 3, 2016 at 5:42 PM, Thomas Horstmeyer <
horstmey at mathematik.uni-marburg.de> wrote:

> Hi Mark,
>
> your question made me take a look at the TChan implementation, which I
> always had wanted to do (but never had the time). To test my understanding,
> I sketched a TChan variation that should solve the problem. (A test with
> one sender and two receivers showed expected output but I did not measure
> memory usage.)
>
> The sender replaces older messages with a marker. This should make the
> content available to the garbage collector (if it is not referenced by a
> receiver who has read it). On reading a marker, a receiver skips directly
> to the next valid message.
>
> On the downside, the sender keeps a reference to the last n messages, so
> they will not be garbage collected even if every receiver has read them.
>
> Thomas
>
>
> {-# LANGUAGE CPP, DeriveDataTypeable #-}
>
> module Control.Concurrent.STM.TBBroadcast(
> #ifdef __GLASGOW_HASKELL__
>   TSender, TReceiver,
>   newSender, newSenderIO, writeBC,
>   newReceiver, readBC
> #endif
> ) where
>
> #ifdef __GLASGOW_HASKELL__
>
> import GHC.Conc
> import Data.Typeable (Typeable)
>
>
> data TSender a = Sender {-# UNPACK #-} !(TVar Int)
>                         {-# UNPACK #-} !(TVar (TVarList a))
>                         {-# UNPACK #-} !(TVar (TVarList a))
>   deriving (Eq, Typeable)
>
>
> type TVarList a = TVar (TList a)
> data TList a = TNil
>              | TCons a {-# UNPACK #-} !(TVarList a)
>              | Outdated {-# UNPACK #-} !(TVar (TVarList a))
>
>
> newSender :: Int -> STM (TSender a)
> newSender n | n <= 0    = error "windows size must be >=0"
>             | otherwise = do
>   hole <- newTVar TNil
>   first <- newTVar hole
>   end <- newTVar hole
>   count <- newTVar n
>   return (Sender count first end)
>
> newSenderIO :: Int -> IO (TSender a)
> newSenderIO n | n <= 0    = error "windows size must be >=0"
>             | otherwise = do
>   hole <- newTVarIO TNil
>   first <- newTVarIO hole
>   end <- newTVarIO hole
>   count <- newTVarIO n
>   return (Sender count first end)
>
>
>
> writeBC :: TSender a -> a -> STM ()
> writeBC (Sender count first end) a = do
>   listend <- readTVar end
>   new_listend <- newTVar TNil
>   writeTVar listend (TCons a new_listend)
>   writeTVar end new_listend
>   n <- readTVar count
>   case n of
>      0 -> do
>              listhead <- readTVar first
>              head <- readTVar listhead
>              case head of
>                TCons _ tl -> writeTVar first tl
>              writeTVar listhead (Outdated first)
>      _ -> writeTVar count $! (n-1)
>
>
> data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))
>
> newReceiver :: TSender a -> STM (TReceiver a)
> newReceiver (Sender _ _ end) = do
>   hole <- readTVar end
>   first <-newTVar hole
>   return (Receiver first)
>
>
> readBC :: TReceiver a -> STM a
> readBC (Receiver first) = do
>   listhead <- readTVar first
>   head <- readTVar listhead
>   case head of
>     TNil -> retry
>     TCons a tl -> do
>       writeTVar first tl
>       return a
>     Outdated next -> do
>       next' <- readTVar next
>       writeTVar first next'
>       readBC (Receiver first)
>
> #endif
>
>
> Am 28.01.2016 um 20:30 schrieb Mark Fine:
>
>> We're currently using a TMChan to broadcast from a single producer
>> thread to many consumer threads. This works well! However, we're seeing
>> issues with a fast producer and/or a slow consumer, with the channel
>> growing unbounded. Fortunately, our producer-consumer communication is
>> time-sensitive and tolerant of loss: we're ok with the producer always
>> writing at the expense of dropping communication to a slow consumer.
>>
>> A TMBChan provides a bounded channel (but no means to dupe/broadcast)
>> where a writer will block once the channel fills up. In our use case,
>> we'd like to continue writing to the channel but dropping off the end of
>> the channel. Clojure's core-async module has some related concepts, in
>> particular the notion of a sliding buffer
>> <https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer>
>> that
>> drops the oldest elements once full. Has anyone encountered something
>> similar in working with channels and/or have any solutions? Thanks!
>>
>> Mark
>>
>>
>> _______________________________________________
>> Haskell-Cafe mailing list
>> Haskell-Cafe at haskell.org
>> http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
>>
>>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/haskell-cafe/attachments/20160209/cd10348b/attachment-0001.html>


More information about the Haskell-Cafe mailing list