[Haskell-cafe] A Sliding TChan?
Thomas Horstmeyer
horstmey at Mathematik.Uni-Marburg.de
Thu Feb 4 01:42:14 UTC 2016
Hi Mark,
Your question made me take a look at the TChan implementation, which I
had always wanted to do (but never had the time). To test my
understanding, I sketched a TChan variation that should solve the
problem. (A test with one sender and two receivers showed the expected
output, but I did not measure memory usage.)
Once the window of n messages is full, each write makes the sender replace
the oldest message with a marker. This should make the content available to
the garbage collector (unless a receiver that has already read it still
holds a reference). On reading a marker, a receiver skips directly to the
next valid message, i.e. the oldest one the sender still retains.
On the downside, the sender keeps a reference to the last n messages, so
they will not be garbage collected even if every receiver has read them.
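
To make the intended behaviour concrete, here is a rough, pure model of it.
This is only a sketch for illustration and not part of the module: the names
Model, emptyModel, send, and receive are made up, messages are numbered from
0, and a receiver is represented by the index of the next message it wants.

```haskell
-- Pure model of the intended behaviour; it uses plain lists, not TVars.
-- All names here are hypothetical and exist only for this illustration.
data Model a = Model
  { window   :: Int  -- maximum number of retained messages
  , written  :: Int  -- total number of messages sent so far
  , retained :: [a]  -- the last (at most 'window') messages, oldest first
  } deriving (Eq, Show)

emptyModel :: Int -> Model a
emptyModel n = Model n 0 []

-- Append a message; drop the oldest one if the window overflows.
send :: a -> Model a -> Model a
send x (Model n w xs) = Model n (w + 1) (drop (length xs + 1 - n) (xs ++ [x]))

-- A receiver is the index of the next message it wants.  A cursor that
-- points at a dropped message jumps forward to the oldest retained one.
-- Nothing means the receiver would block.
receive :: Int -> Model a -> Maybe (a, Int)
receive cursor (Model _ w xs)
  | pos >= w  = Nothing
  | otherwise = Just (xs !! (pos - oldest), pos + 1)
  where
    oldest = w - length xs
    pos    = max cursor oldest

main :: IO ()
main = do
  let m = foldl (flip send) (emptyModel 3) "abcdef"
  print (retained m)    -- the last three messages
  print (receive 0 m)   -- message 0 was dropped, so the cursor skips ahead
  print (receive 6 m)   -- nothing new yet
```

With a window of 3 and the six messages 'a' to 'f' sent, this prints "def",
then Just ('d',4), then Nothing. The retained field also shows the downside
mentioned above: the last n messages stay alive no matter who has read them.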
Thomas

{-# LANGUAGE CPP, DeriveDataTypeable #-}
module Control.Concurrent.STM.TBBroadcast(
#ifdef __GLASGOW_HASKELL__
        TSender, TReceiver,
        newSender, newSenderIO, writeBC,
        newReceiver, readBC
#endif
  ) where

#ifdef __GLASGOW_HASKELL__
import GHC.Conc
import Data.Typeable (Typeable)

data TSender a = Sender {-# UNPACK #-} !(TVar Int)            -- free slots left in the window
                        {-# UNPACK #-} !(TVar (TVarList a))   -- oldest retained cell
                        {-# UNPACK #-} !(TVar (TVarList a))   -- write end (an empty hole)
  deriving (Eq, Typeable)

type TVarList a = TVar (TList a)

data TList a = TNil
             | TCons a {-# UNPACK #-} !(TVarList a)
             | Outdated {-# UNPACK #-} !(TVar (TVarList a))   -- marker left in a dropped cell

newSender :: Int -> STM (TSender a)
newSender n | n <= 0    = error "window size must be > 0"
            | otherwise = do
    hole  <- newTVar TNil
    first <- newTVar hole
    end   <- newTVar hole
    count <- newTVar n
    return (Sender count first end)

newSenderIO :: Int -> IO (TSender a)
newSenderIO n | n <= 0    = error "window size must be > 0"
              | otherwise = do
    hole  <- newTVarIO TNil
    first <- newTVarIO hole
    end   <- newTVarIO hole
    count <- newTVarIO n
    return (Sender count first end)

writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
    -- Append the new message at the write end.
    listend <- readTVar end
    new_listend <- newTVar TNil
    writeTVar listend (TCons a new_listend)
    writeTVar end new_listend
    n <- readTVar count
    case n of
      -- Window full: drop the oldest message and leave a marker in its cell.
      0 -> do
        listhead <- readTVar first
        head <- readTVar listhead
        case head of
          TCons _ tl -> writeTVar first tl
        writeTVar listhead (Outdated first)
      -- Window not yet full: just use up one free slot.
      _ -> writeTVar count $! (n-1)

data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))

-- A new receiver starts at the write end, so it only sees later messages.
newReceiver :: TSender a -> STM (TReceiver a)
newReceiver (Sender _ _ end) = do
    hole <- readTVar end
    first <- newTVar hole
    return (Receiver first)

readBC :: TReceiver a -> STM a
readBC (Receiver first) = do
    listhead <- readTVar first
    head <- readTVar listhead
    case head of
      TNil -> retry
      TCons a tl -> do
        writeTVar first tl
        return a
      -- The cell was dropped by the sender: jump to the oldest retained cell.
      Outdated next -> do
        next' <- readTVar next
        writeTVar first next'
        readBC (Receiver first)
#endif
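
For trying this out without setting up the module, the following single
file might help. It is only a sketch: it contains a condensed copy of the
definitions above (without the CPP and UNPACK details, and with the receiver
as a newtype) plus a small single-threaded demo. The names demo, fast, and
slow are made up for the example, and the window size of 3 is arbitrary.

```haskell
-- Single-file sketch: condensed copy of the module above plus a demo.
import GHC.Conc

type TVarList a = TVar (TList a)
data TList a = TNil
             | TCons a (TVarList a)
             | Outdated (TVar (TVarList a))

data TSender a = Sender (TVar Int) (TVar (TVarList a)) (TVar (TVarList a))
newtype TReceiver a = Receiver (TVar (TVarList a))

newSender :: Int -> STM (TSender a)
newSender n
  | n <= 0    = error "window size must be > 0"
  | otherwise = do
      hole <- newTVar TNil
      Sender <$> newTVar n <*> newTVar hole <*> newTVar hole

writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
  listend <- readTVar end
  newEnd  <- newTVar TNil
  writeTVar listend (TCons a newEnd)
  writeTVar end newEnd
  n <- readTVar count
  if n > 0
    then writeTVar count $! n - 1
    else do
      -- Window full: drop the oldest cell and leave a marker behind.
      listhead <- readTVar first
      cell     <- readTVar listhead
      case cell of
        TCons _ tl -> do
          writeTVar first tl
          writeTVar listhead (Outdated first)
        _ -> return ()  -- not expected: a full window always starts with a TCons

newReceiver :: TSender a -> STM (TReceiver a)
newReceiver (Sender _ _ end) = Receiver <$> (readTVar end >>= newTVar)

readBC :: TReceiver a -> STM a
readBC r@(Receiver first) = do
  cell <- readTVar first >>= readTVar
  case cell of
    TNil          -> retry
    TCons a tl    -> writeTVar first tl >> return a
    Outdated next -> readTVar next >>= writeTVar first >> readBC r

-- Hypothetical demo: one sender, a receiver that keeps up, and one that lags.
demo :: IO ([Int], [Int])
demo = do
  sender <- atomically (newSender 3)
  fast   <- atomically (newReceiver sender)
  slow   <- atomically (newReceiver sender)
  -- The fast receiver reads each message right after it is written.
  seenFast <- mapM (\x -> atomically (writeBC sender x >> readBC fast)) [1 .. 10]
  -- The slow receiver reads nothing until all ten messages are written,
  -- so it finds only the retained window.
  seenSlow <- mapM (const (atomically (readBC slow))) [1 :: Int .. 3]
  return (seenFast, seenSlow)

main :: IO ()
main = demo >>= print
```

Compiled with GHC and run, this should print
([1,2,3,4,5,6,7,8,9,10],[8,9,10]): the fast receiver sees every message,
while the slow one skips the seven dropped messages and continues with the
last three. A fourth read on the slow receiver would block until the next
write.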

On 28.01.2016 at 20:30, Mark Fine wrote:
> We're currently using a TMChan to broadcast from a single producer
> thread to many consumer threads. This works well! However, we're seeing
> issues with a fast producer and/or a slow consumer, with the channel
> growing unbounded. Fortunately, our producer-consumer communication is
> time-sensitive and tolerant of loss: we're ok with the producer always
> writing at the expense of dropping communication to a slow consumer.
>
> A TMBChan provides a bounded channel (but no means to dupe/broadcast)
> where a writer will block once the channel fills up. In our use case,
> we'd like to continue writing to the channel but dropping off the end of
> the channel. Clojure's core-async module has some related concepts, in
> particular the notion of a sliding buffer
> <https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer> that
> drops the oldest elements once full. Has anyone encountered something
> similar in working with channels and/or have any solutions? Thanks!
>
> Mark