[Haskell-cafe] question about conduit source

Clark Gaebel cgaebel at csclub.uwaterloo.ca
Tue Feb 28 20:58:46 CET 2012


First of all, I'd probably name that operator >=<, since >=> is Kleisli
composition in Control.Monad.

Second, you're going to need new threads for this, since you'll be reading
from two sources concurrently. This isn't as big a problem as you might
think, because Haskell threads are dirt cheap, orders of magnitude cheaper
than pthreads. As Michael already mentioned, stm-conduit (a library I just
wrote to help with using multiple threads with conduits) can do this
synchronization for you. This turns your >=< function into:

infixl 5 >=<
(>=<) :: ResourceIO m
      => Source m a
      -> Source m a
      -> ResourceT m (Source m a)
sa >=< sb = do c <- liftIO . atomically $ newTMChan
               _ <- resourceForkIO $ sa $$ sinkTMChan c
               _ <- resourceForkIO $ sb $$ sinkTMChan c
               return $ sourceTMChan c

which returns a new source, combining two sources.

This can further be generalized to combining any number of sources:

mergeSources :: ResourceIO m
             => [Source m a]
             -> ResourceT m (Source m a)
mergeSources sx = do c <- liftIO . atomically $ newTMChan
                     mapM_ (\s -> resourceForkIO $ s $$ sinkTMChan c) sx
                     return $ sourceTMChan c
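
In case it helps to see the shape of this without conduit, here is a minimal
sketch of the same fan-in idea using only base (a Chan instead of a TMChan).
The names mergeToList and emit are made up for illustration and are not part
of stm-conduit, and producers are modelled as plain callbacks rather than
Sources. It assumes each producer marks its own end with Nothing so the
consumer knows when every thread has finished:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (finally)
import Control.Monad (forM_)

-- | Run every producer in its own thread and collect everything they emit.
-- A producer is an action that is handed an "emit" callback (hypothetical
-- stand-in for a conduit Source).
mergeToList :: [(a -> IO ()) -> IO ()] -> IO [a]
mergeToList producers = do
  chan <- newChan
  -- One cheap Haskell thread per producer; each writes Nothing when done,
  -- even if it stops because of an exception.
  forM_ producers $ \produce ->
    forkIO $ produce (writeChan chan . Just) `finally` writeChan chan Nothing
  -- Keep reading until one end marker per producer has been seen.
  let loop 0 acc = return (reverse acc)
      loop n acc = do
        msg <- readChan chan
        case msg of
          Nothing -> loop (n - 1) acc
          Just x  -> loop n (x : acc)
  loop (length producers) []

main :: IO ()
main = do
  xs <- mergeToList
          [ \emit -> mapM_ (emit . Left)  [1, 2, 3 :: Int]
          , \emit -> mapM_ (emit . Right) "ab"
          ]
  -- Interleaving between producers is not deterministic, so only the
  -- total count is printed here.
  print (length xs)
```

Values from different producers may interleave in any order, but each
producer's own values stay in order. Counting end markers means the consumer
stops only after every producer has finished, not when the first one does.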

Hope this helps somewhat,
  - clark

On Tue, Feb 28, 2012 at 11:04 AM, Alexander V Vershilov <
alexander.vershilov at gmail.com> wrote:
>
> Hello, cafe.
>
> Is it possible to read data from different concurrent sources,
> i.e. read data from each source as soon as it becomes available, e.g.
>
>  runResourceT $ (source1 stdin $= CL.map Left)
>                   >=> (source2 handle $= CL.map Right)
>              $= application
>              $$ sink
>    where >=> - stands for concurrent combining of sources
>
> It would be good if the sources could be of different types (handle or
> STM channel, etc.).
>
> Currently I've found no good way to handle this situation,
> except using STM channels to collect the data
>
>   source1 ---+            |
>              |   sink     |                       output sink
>              +---] Channel [-------> application----->]
>              |          source
>   source2 ---+            |
>
> From this point of view the application receives concurrent data, but this
> implementation requires an additional thread for data processing. Also,
> in many cases it requires running an additional runResourceT (see the
> example below).
>
> So are there any possible simplifications? Or ideas on how to implement
> the (>=>) operator?
>
> Example:
>
> I have the following code in my network-conduit based application:
>
>   main :: IO ()
>   main = do
>     pool <- createDBPool "..." 10
>     let r = ServerInit pool
>     forkIO $ forever clientConsole --read channel list and send "Left"
>     flip runReaderT r $
>       runTCPServer (ServerSettings 3500 Nothing) (protoServer)
>
>   myServer src sink = do
>    ch <- liftIO $ atomically $ newTBMChan 16
>    initState <- lift $ ask
>    _  <- liftIO $ fork . (flip runReaderT initState) $
>                   runResourceT $ src $= C.sequence decode
>                                      $= CL.map Right $$ sinkTBMChan ch
>    sourceTBMChan ch
>                 $= process $= C.sequence encode $$ sinkHandle stdout
>
> But in this situation I don't know whether all resources are guaranteed
> to be freed, because I'm running an additional ResourceT inside the main
> ResourceT scope.
>
> So can you advise whether it is possible to build concurrent sources with
> the library as currently implemented?
> If it's not possible but worth implementing, could I write those functions?
> Is it correct to call runResourceT inside another ResourceT?
>
> --
> Best regards,
>   Alexander V Vershilov