Thomas Horstmeyer horstmey at Mathematik.Uni-Marburg.de
Thu Feb 4 01:42:14 UTC 2016

Hi Mark,

your question made me take a look at the TChan implementation, which I
had always wanted to do (but never had the time). To test my
understanding, I sketched a TChan variation that should solve the
problem. (A test with one sender and two receivers showed the expected
output, but I did not measure memory usage.)

The sender replaces older messages with a marker. This should make the
content available to the garbage collector (if it is not referenced by a
receiver). A receiver that reads the marker is redirected
directly to the next valid message.

On the downside, the sender keeps a reference to the last n messages, so
they will not be garbage collected even if every receiver has read them.

Thomas

{-# LANGUAGE CPP, DeriveDataTypeable #-}

newSender, newSenderIO, writeBC,
#endif
) where

import GHC.Conc
import Data.Typeable (Typeable)

data TSender a = Sender {-# UNPACK #-} !(TVar Int)
                        {-# UNPACK #-} !(TVar (TVarList a))
                        {-# UNPACK #-} !(TVar (TVarList a))
  deriving (Eq, Typeable)

type TVarList a = TVar (TList a)
data TList a = TNil
             | TCons a {-# UNPACK #-} !(TVarList a)
             | Outdated {-# UNPACK #-} !(TVar (TVarList a))

newSender :: Int -> STM (TSender a)
newSender n | n <= 0    = error "window size must be > 0"
            | otherwise = do
                hole <- newTVar TNil
                first <- newTVar hole
                end <- newTVar hole
                count <- newTVar n
                return (Sender count first end)

newSenderIO :: Int -> IO (TSender a)
newSenderIO n | n <= 0    = error "window size must be > 0"
              | otherwise = do
                  hole <- newTVarIO TNil
                  first <- newTVarIO hole
                  end <- newTVarIO hole
                  count <- newTVarIO n
                  return (Sender count first end)

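writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
  listend <- readTVar end                -- assumed: not in the archived copy
  new_listend <- newTVar TNil
  writeTVar listend (TCons a new_listend)
  writeTVar end new_listend
  n <- readTVar count                    -- assumed: not in the archived copy
  case n of
    0 -> do
      oldest <- readTVar first           -- assumed: cell of the oldest message
      hd <- readTVar oldest              -- assumed
      case hd of                         -- assumed
        TCons _ tl -> writeTVar first tl
      writeTVar oldest (Outdated first)  -- assumed: leave the marker behind
    _ -> writeTVar count $! (n-1)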

newReceiver (Sender _ _ end) = do
  first <- newTVar hole

    TNil -> retry
    TCons a tl -> do
      writeTVar first tl
      return a
    Outdated next -> do
      writeTVar first next'

#endif
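
Since parts of the listing above are incomplete, here is a minimal,
self-contained sketch of the same idea (the sender overwrites the oldest
cell with a marker, and a receiver that reads the marker jumps to the
oldest message still kept). It is a reconstruction under assumptions, not
the original module: the names Sender, Receiver, write, receive and demo
are hypothetical, the strictness and UNPACK annotations are left out, and
only GHC.Conc from base is used.

```haskell
import GHC.Conc (STM, TVar, atomically, newTVar, readTVar, retry, writeTVar)

type TVarList a = TVar (TList a)

data TList a
  = TNil                          -- the hole at the write end
  | TCons a (TVarList a)          -- a message and the rest of the list
  | Outdated (TVar (TVarList a))  -- marker: follow this to the oldest kept cell

-- Fields: free slots left in the window, oldest kept cell, write end.
data Sender a = Sender (TVar Int) (TVar (TVarList a)) (TVar (TVarList a))

-- Hypothetical receiver handle: just the receiver's read position.
newtype Receiver a = Receiver (TVar (TVarList a))

newSender :: Int -> STM (Sender a)
newSender n
  | n <= 0 = error "window size must be > 0"
  | otherwise = do
      hole  <- newTVar TNil
      first <- newTVar hole
      end   <- newTVar hole
      count <- newTVar n
      return (Sender count first end)

-- Append a message; once the window is full, outdate the oldest one.
write :: Sender a -> a -> STM ()
write (Sender count first end) a = do
  listend <- readTVar end
  newEnd  <- newTVar TNil
  writeTVar listend (TCons a newEnd)
  writeTVar end newEnd
  n <- readTVar count
  if n > 0
    then writeTVar count $! n - 1
    else do
      oldest <- readTVar first
      cell   <- readTVar oldest
      case cell of
        TCons _ tl -> do
          writeTVar oldest (Outdated first)  -- drop the payload, leave a marker
          writeTVar first tl
        _ -> return ()                       -- not expected for a full window

-- A new receiver starts at the current write end.
newReceiver :: Sender a -> STM (Receiver a)
newReceiver (Sender _ _ end) = do
  hole <- readTVar end
  Receiver <$> newTVar hole

-- Block on an empty channel; skip ahead when the position was outdated.
receive :: Receiver a -> STM a
receive r@(Receiver pos) = do
  cell <- readTVar pos >>= readTVar
  case cell of
    TNil          -> retry
    TCons a tl    -> writeTVar pos tl >> return a
    Outdated next -> do
      readTVar next >>= writeTVar pos
      receive r

-- A slow receiver behind a window of 2 only sees the last two messages.
demo :: IO [Int]
demo = do
  (s, r) <- atomically $ do
    s <- newSender 2
    r <- newReceiver s
    return (s, r)
  mapM_ (atomically . write s) [1, 2, 3, 4 :: Int]
  a <- atomically (receive r)
  b <- atomically (receive r)
  return [a, b]
```

In this sketch every marker points at the sender's own "oldest kept"
pointer rather than at a fixed cell, so a receiver that fell far behind
needs a single jump. The demo can be run from GHCi or called from a main
of your own.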

On 28.01.2016 at 20:30, Mark Fine wrote:
> We're currently using a TMChan to broadcast from a single producer
> thread to many consumer threads. This works well! However, we're seeing
> issues with a fast producer and/or a slow consumer, with the channel
> growing unbounded. Fortunately, our producer-consumer communication is
> time-sensitive and tolerant of loss: we're ok with the producer always
> writing at the expense of dropping communication to a slow consumer.
>
> A TMBChan provides a bounded channel (but no means to dupe/broadcast)
> where a writer will block once the channel fills up. In our use case,
> we'd like to continue writing to the channel but dropping off the end of
> the channel. Clojure's core-async module has some related concepts, in
> particular the notion of a sliding buffer
> <https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer> that
> drops the oldest elements once full. Has anyone encountered something
> similar in working with channels and/or have any solutions? Thanks!
>
> Mark