[Haskell-cafe] A Sliding TChan?

Thomas Horstmeyer horstmey at Mathematik.Uni-Marburg.de
Thu Feb 4 01:42:14 UTC 2016


Hi Mark,

your question made me take a look at the TChan implementation, which I 
always had wanted to do (but never had the time). To test my 
understanding, I sketched a TChan variation that should solve the 
problem. (A test with one sender and two receivers showed expected 
output but I did not measure memory usage.)

The sender replaces older messages with a marker. This should make the 
content available to the garbage collector (if it is not referenced by a 
receiver who has read it). On reading a marker, a receiver skips 
directly to the next valid message.

On the downside, the sender keeps a reference to the last n messages, so 
they will not be garbage collected even if every receiver has read them.

Thomas


{-# LANGUAGE CPP, DeriveDataTypeable #-}

module Control.Concurrent.STM.TBBroadcast(
#ifdef __GLASGOW_HASKELL__
   TSender, TReceiver,
   newSender, newSenderIO, writeBC,
   newReceiver, readBC
#endif
) where

#ifdef __GLASGOW_HASKELL__

import GHC.Conc
import Data.Typeable (Typeable)


data TSender a = Sender {-# UNPACK #-} !(TVar Int)
                         {-# UNPACK #-} !(TVar (TVarList a))
                         {-# UNPACK #-} !(TVar (TVarList a))
   deriving (Eq, Typeable)


type TVarList a = TVar (TList a)
data TList a = TNil
              | TCons a {-# UNPACK #-} !(TVarList a)
              | Outdated {-# UNPACK #-} !(TVar (TVarList a))


newSender :: Int -> STM (TSender a)
newSender n | n <= 0    = error "windows size must be >=0"
             | otherwise = do
   hole <- newTVar TNil
   first <- newTVar hole
   end <- newTVar hole
   count <- newTVar n
   return (Sender count first end)

newSenderIO :: Int -> IO (TSender a)
newSenderIO n | n <= 0    = error "windows size must be >=0"
             | otherwise = do
   hole <- newTVarIO TNil
   first <- newTVarIO hole
   end <- newTVarIO hole
   count <- newTVarIO n
   return (Sender count first end)



writeBC :: TSender a -> a -> STM ()
writeBC (Sender count first end) a = do
   listend <- readTVar end
   new_listend <- newTVar TNil
   writeTVar listend (TCons a new_listend)
   writeTVar end new_listend
   n <- readTVar count
   case n of
      0 -> do
              listhead <- readTVar first
              head <- readTVar listhead
              case head of
                TCons _ tl -> writeTVar first tl
              writeTVar listhead (Outdated first)
      _ -> writeTVar count $! (n-1)


data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))

newReceiver :: TSender a -> STM (TReceiver a)
newReceiver (Sender _ _ end) = do
   hole <- readTVar end
   first <-newTVar hole
   return (Receiver first)


readBC :: TReceiver a -> STM a
readBC (Receiver first) = do
   listhead <- readTVar first
   head <- readTVar listhead
   case head of
     TNil -> retry
     TCons a tl -> do
       writeTVar first tl
       return a
     Outdated next -> do
       next' <- readTVar next
       writeTVar first next'
       readBC (Receiver first)

#endif


Am 28.01.2016 um 20:30 schrieb Mark Fine:
> We're currently using a TMChan to broadcast from a single producer
> thread to many consumer threads. This works well! However, we're seeing
> issues with a fast producer and/or a slow consumer, with the channel
> growing unbounded. Fortunately, our producer-consumer communication is
> time-sensitive and tolerant of loss: we're ok with the producer always
> writing at the expense of dropping communication to a slow consumer.
>
> A TMBChan provides a bounded channel (but no means to dupe/broadcast)
> where a writer will block once the channel fills up. In our use case,
> we'd like to continue writing to the channel but dropping off the end of
> the channel. Clojure's core-async module has some related concepts, in
> particular the notion of a sliding buffer
> <https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer> that
> drops the oldest elements once full. Has anyone encountered something
> similar in working with channels and/or have any solutions? Thanks!
>
> Mark
>
>
> _______________________________________________
> Haskell-Cafe mailing list
> Haskell-Cafe at haskell.org
> http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
>


More information about the Haskell-Cafe mailing list