<div dir="ltr">Hey Thomas,<div><br></div><div>Thanks for the implementation ideas! It's worked out great for us and introduced a lot of stability in our system! Thanks again for your help!</div><div><br></div><div>Mark</div></div><div class="gmail_extra"><br><div class="gmail_quote">On Wed, Feb 3, 2016 at 5:42 PM, Thomas Horstmeyer <span dir="ltr"><<a href="mailto:horstmey@mathematik.uni-marburg.de" target="_blank">horstmey@mathematik.uni-marburg.de</a>></span> wrote:<br><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">Hi Mark,<br>
<br>
your question made me take a look at the TChan implementation, which I always had wanted to do (but never had the time). To test my understanding, I sketched a TChan variation that should solve the problem. (A test with one sender and two receivers showed expected output but I did not measure memory usage.)<br>
<br>
The sender replaces older messages with a marker. This should make the content available to the garbage collector (if it is not referenced by a receiver who has read it). On reading a marker, a receiver skips directly to the next valid message.<br>
<br>
On the downside, the sender keeps a reference to the last n messages, so they will not be garbage collected even if every receiver has read them.<br>
<br>
Thomas<br>
<br>
<br>
{-# LANGUAGE CPP, DeriveDataTypeable #-}<br>
<br>
module Control.Concurrent.STM.TBBroadcast(<br>
#ifdef __GLASGOW_HASKELL__<br>
  TSender, TReceiver,<br>
  newSender, newSenderIO, writeBC,<br>
  newReceiver, readBC<br>
#endif<br>
) where<br>
<br>
#ifdef __GLASGOW_HASKELL__<br>
<br>
import GHC.Conc<br>
import Data.Typeable (Typeable)<br>
<br>
<br>
data TSender a = Sender {-# UNPACK #-} !(TVar Int)<br>
                        {-# UNPACK #-} !(TVar (TVarList a))<br>
                        {-# UNPACK #-} !(TVar (TVarList a))<br>
  deriving (Eq, Typeable)<br>
<br>
<br>
type TVarList a = TVar (TList a)<br>
data TList a = TNil<br>
             | TCons a {-# UNPACK #-} !(TVarList a)<br>
             | Outdated {-# UNPACK #-} !(TVar (TVarList a))<br>
<br>
<br>
newSender :: Int -> STM (TSender a)<br>
newSender n | n <= 0    = error "windows size must be >=0"<br>
            | otherwise = do<br>
  hole <- newTVar TNil<br>
  first <- newTVar hole<br>
  end <- newTVar hole<br>
  count <- newTVar n<br>
  return (Sender count first end)<br>
<br>
newSenderIO :: Int -> IO (TSender a)<br>
newSenderIO n | n <= 0    = error "windows size must be >=0"<br>
            | otherwise = do<br>
  hole <- newTVarIO TNil<br>
  first <- newTVarIO hole<br>
  end <- newTVarIO hole<br>
  count <- newTVarIO n<br>
  return (Sender count first end)<br>
<br>
<br>
<br>
writeBC :: TSender a -> a -> STM ()<br>
writeBC (Sender count first end) a = do<br>
  listend <- readTVar end<br>
  new_listend <- newTVar TNil<br>
  writeTVar listend (TCons a new_listend)<br>
  writeTVar end new_listend<br>
  n <- readTVar count<br>
  case n of<br>
     0 -> do<br>
             listhead <- readTVar first<br>
             head <- readTVar listhead<br>
             case head of<br>
               TCons _ tl -> writeTVar first tl<br>
             writeTVar listhead (Outdated first)<br>
     _ -> writeTVar count $! (n-1)<br>
<br>
<br>
data TReceiver a = Receiver {-# UNPACK #-} !(TVar (TVarList a))<br>
<br>
newReceiver :: TSender a -> STM (TReceiver a)<br>
newReceiver (Sender _ _ end) = do<br>
  hole <- readTVar end<br>
  first <-newTVar hole<br>
  return (Receiver first)<br>
<br>
<br>
readBC :: TReceiver a -> STM a<br>
readBC (Receiver first) = do<br>
  listhead <- readTVar first<br>
  head <- readTVar listhead<br>
  case head of<br>
    TNil -> retry<br>
    TCons a tl -> do<br>
      writeTVar first tl<br>
      return a<br>
    Outdated next -> do<br>
      next' <- readTVar next<br>
      writeTVar first next'<br>
      readBC (Receiver first)<br>
<br>
#endif<span class=""><br>
<br>
<br>
Am 28.01.2016 um 20:30 schrieb Mark Fine:<br>
</span><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><span class="">
We're currently using a TMChan to broadcast from a single producer<br>
thread to many consumer threads. This works well! However, we're seeing<br>
issues with a fast producer and/or a slow consumer, with the channel<br>
growing unbounded. Fortunately, our producer-consumer communication is<br>
time-sensitive and tolerant of loss: we're ok with the producer always<br>
writing at the expense of dropping communication to a slow consumer.<br>
<br>
A TMBChan provides a bounded channel (but no means to dupe/broadcast)<br>
where a writer will block once the channel fills up. In our use case,<br>
we'd like to continue writing to the channel but dropping off the end of<br>
the channel. Clojure's core-async module has some related concepts, in<br>
particular the notion of a sliding buffer<br></span>
<<a href="https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer" rel="noreferrer" target="_blank">https://clojure.github.io/core.async/#clojure.core.async/sliding-buffer</a>> that<span class=""><br>
drops the oldest elements once full. Has anyone encountered something<br>
similar in working with channels and/or have any solutions? Thanks!<br>
<br>
Mark<br>
<br>
<br></span><span class="">
_______________________________________________<br>
Haskell-Cafe mailing list<br>
<a href="mailto:Haskell-Cafe@haskell.org" target="_blank">Haskell-Cafe@haskell.org</a><br>
<a href="http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe" rel="noreferrer" target="_blank">http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe</a><br>
<br>
</span></blockquote>
</blockquote></div><br></div>