[Haskell-cafe] synchronous channels in STM

Andrea Vezzosi sanzhiyan at gmail.com
Wed Oct 8 21:41:12 EDT 2008


2008/10/9 Claus Reinke <claus.reinke at talk21.com>

>> I was wondering if it was possible to implement synchronous channels
>> within STM. In particular, I'd like to have CSP-like send and recv
>> primitives on a channel that each block until the other side arrives
>> to complete the transaction.
>
> Assuming that retry blocks until something changes, you could associate
> a channel with a thread that encapsulates the transaction. Somewhat like
> this?


You don't need an additional channel thread:

module SyncChan (SyncChan, send, recv, newSyncChan) where

import Control.Concurrent.STM
import Control.Monad
import Control.Concurrent

newtype SyncChan a = SC { unSC :: TVar (State a) }

data State a = Ready | Sent a | Received

newSyncChan :: STM (SyncChan a)
newSyncChan = SC `fmap` newTVar Ready

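-- send needs two transactions (publish the value, then wait for the
-- receiver to take it), so it lives in IO rather than STM.
send :: SyncChan a -> a -> IO ()
send (SC chan) x = do
    atomically $ unsafeSend chan x
    atomically $ waitReceiver chan

recv :: SyncChan a -> STM a
recv (SC chan) = do
  s <- readTVar chan
  case s of
    Sent x -> writeTVar chan Received >> return x
    _      -> retry

-- Publish a value once the channel is free; does not wait for a receiver.
unsafeSend :: TVar (State a) -> a -> STM ()
unsafeSend chan x = do
  s <- readTVar chan
  case s of
    Ready -> writeTVar chan (Sent x)
    _     -> retry

-- Wait until the receiver has taken the value, then free the channel.
waitReceiver :: TVar (State a) -> STM ()
waitReceiver chan = do
  s <- readTVar chan
  case s of
    Received -> writeTVar chan Ready
    _        -> retry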

-- flipped fmap
x |> f = fmap f x

test :: Bool -> IO ()
test b = do
  (x,y) <- atomically $ liftM2 (,) newSyncChan newSyncChan
  -- since recv is in STM you can wait on multiple channels at the same time
  forkIO $ join $ atomically $
             (recv x |> print)
             `mplus`
             (recv y |> print)
  if b
     then send x 'a'
     else send y 1
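
To make the rendezvous behaviour concrete, here is a minimal self-contained sketch (not part of the module above; `demo` and the `done` MVar are names made up for illustration). It repeats the channel definitions in compact form and checks that a sender stays blocked until a receiver has actually taken the value:

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar, isEmptyMVar)
import Control.Concurrent.STM

data State a = Ready | Sent a | Received

newtype SyncChan a = SC (TVar (State a))

newSyncChan :: STM (SyncChan a)
newSyncChan = SC `fmap` newTVar Ready

-- Two transactions: publish the value, then wait for the receiver.
send :: SyncChan a -> a -> IO ()
send (SC chan) x = do
  atomically $ do
    s <- readTVar chan
    case s of
      Ready -> writeTVar chan (Sent x)
      _     -> retry
  atomically $ do
    s <- readTVar chan
    case s of
      Received -> writeTVar chan Ready
      _        -> retry

recv :: SyncChan a -> STM a
recv (SC chan) = do
  s <- readTVar chan
  case s of
    Sent x -> writeTVar chan Received >> return x
    _      -> retry

-- Hypothetical demo: returns whether the sender was still blocked
-- before the receive, together with the value received.
demo :: IO (Bool, Char)
demo = do
  ch   <- atomically newSyncChan
  done <- newEmptyMVar
  -- The sender signals 'done' only after send has returned.
  _ <- forkIO $ send ch 'a' >> putMVar done ()
  threadDelay 100000                 -- give the sender time to publish
  stillBlocked <- isEmptyMVar done   -- no receiver yet, so send cannot return
  v <- atomically (recv ch)
  takeMVar done                      -- now the sender is able to finish
  return (stillBlocked, v)
```

Loading this in GHCi and running `demo` should show that the sender only completes once `recv` has run.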

As a bonus, you can also try to send on the first available of several
channels, i.e. the first one in the list that no other sender currently
occupies. (This formulation uses ExistentialQuantification, but that is
just a convenience.)

data Sending a = forall b. Sending (SyncChan b) b a

sendMulti :: [Sending a] -> IO a
sendMulti [] = fail "empty"
sendMulti xs = do (m,r) <- atomically $ msum $ map sending xs
                  atomically m
                  return r

sending :: Sending t -> STM (STM (), t)
sending (Sending (SC chan) x k) = do
  unsafeSend chan x
  return (waitReceiver chan,k)
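
A possible usage sketch for sendMulti, assuming the definitions above are repeated in one file with the ExistentialQuantification pragma. `waitBusy` and `demo` are hypothetical helpers added only for this illustration. Channel `a` is kept occupied by another sender, so sendMulti should fall through to channel `b`:

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (msum)

data State a = Ready | Sent a | Received
newtype SyncChan a = SC (TVar (State a))

newSyncChan :: STM (SyncChan a)
newSyncChan = SC `fmap` newTVar Ready

unsafeSend :: TVar (State a) -> a -> STM ()
unsafeSend chan x = readTVar chan >>= \s -> case s of
  Ready -> writeTVar chan (Sent x)
  _     -> retry

waitReceiver :: TVar (State a) -> STM ()
waitReceiver chan = readTVar chan >>= \s -> case s of
  Received -> writeTVar chan Ready
  _        -> retry

send :: SyncChan a -> a -> IO ()
send (SC chan) x = atomically (unsafeSend chan x) >> atomically (waitReceiver chan)

recv :: SyncChan a -> STM a
recv (SC chan) = readTVar chan >>= \s -> case s of
  Sent x -> writeTVar chan Received >> return x
  _      -> retry

data Sending a = forall b. Sending (SyncChan b) b a

sendMulti :: [Sending a] -> IO a
sendMulti [] = fail "empty"
sendMulti xs = do
  (m, r) <- atomically $ msum $ map sending xs
  atomically m
  return r

sending :: Sending t -> STM (STM (), t)
sending (Sending (SC chan) x k) = unsafeSend chan x >> return (waitReceiver chan, k)

-- Hypothetical helper: block until some sender occupies the channel.
waitBusy :: SyncChan a -> STM ()
waitBusy (SC chan) = readTVar chan >>= \s -> case s of
  Ready -> retry
  _     -> return ()

-- Hypothetical demo: returns the tag chosen by sendMulti, the value
-- received on b, and the value left waiting on a.
demo :: IO (String, String, Int)
demo = do
  a <- atomically newSyncChan :: IO (SyncChan Int)
  b <- atomically newSyncChan :: IO (SyncChan String)
  _ <- forkIO $ send a 0            -- occupies a; nobody receives yet
  atomically $ waitBusy a           -- make sure a is really taken
  got <- newEmptyMVar
  _ <- forkIO $ atomically (recv b) >>= putMVar got
  r <- sendMulti [Sending a 1 "via a", Sending b "hello" "via b"]
  v <- takeMVar got
  leftover <- atomically (recv a)   -- release the sender blocked on a
  return (r, v, leftover)
```

Note that the choice is made at the moment the value is published, not when a receiver shows up: if both channels are free, the first one in the list wins even if only the second has a receiver waiting.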