[Haskell-cafe] question about conduit source

Clark Gaebel cgaebel at csclub.uwaterloo.ca
Tue Feb 28 21:08:07 CET 2012


Finally, I've uploaded a new version of stm-conduit [1] with these
combinators included. You should "cabal update" and then "cabal install
stm-conduit" to get the latest version, and now you can vertically compose
your sources!

Regards,
  - clark

[1] http://hackage.haskell.org/package/stm-conduit-0.2.3.0

On Tue, Feb 28, 2012 at 2:58 PM, Clark Gaebel
<cgaebel at csclub.uwaterloo.ca> wrote:

> First of all, I'd probably name that operator >=<, since >=> is Kleisli
> composition in Control.Monad.
>
> Second, you're going to need new threads for this, since you'll be reading
> from two sources concurrently. This isn't as big a problem as you might
> think, because Haskell threads are dirt cheap, orders of magnitude cheaper
> than pthread threads. If you're using multiple threads with conduits, I
> just wrote a library to help you out with that! As Michael already
> mentioned, stm-conduit could do this synchronization for you. This turns
> your >=< function into:
>
> infixl 5 >=<
> (>=<) :: ResourceIO m
>       => Source m a
>       -> Source m a
>       -> ResourceT m (Source m a)
> sa >=< sb = do c <- liftIO . atomically $ newTMChan
>                _ <- resourceForkIO $ sa $$ sinkTMChan c
>                _ <- resourceForkIO $ sb $$ sinkTMChan c
>                return $ sourceTMChan c
>
> which returns a new source, combining two sources.
>
> This can further be generalized to combining any number of sources:
>
> mergeSources :: ResourceIO m
>              => [Source m a]
>              -> ResourceT m (Source m a)
> mergeSources sx = do c <- liftIO . atomically $ newTMChan
>                      mapM_ (\s -> resourceForkIO $ s $$ sinkTMChan c) sx
>                      return $ sourceTMChan c
>
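The fan-in idea behind mergeSources can also be sketched without conduit at all, using only base. This is a minimal illustration, not the stm-conduit implementation: Producer, fromList and mergeProducers are hypothetical names made up for this sketch, and a plain Chan carrying Maybe values stands in for the TMChan. Each producer runs on its own thread and writes Nothing when it finishes, so the consumer knows the merged stream has ended once it has seen one Nothing per producer.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_)
import Data.List (sort)

-- | Hypothetical stand-in for a conduit Source: an action that feeds
-- every element it produces to the supplied callback.
type Producer a = (a -> IO ()) -> IO ()

-- | Build a Producer from a list (illustration only).
fromList :: [a] -> Producer a
fromList xs yield = mapM_ yield xs

-- | Run every producer on its own thread and collect all of their output.
-- Each thread writes 'Just x' per element and one 'Nothing' when it is done;
-- the consumer stops after it has seen one 'Nothing' per producer.
mergeProducers :: [Producer a] -> IO [a]
mergeProducers ps = do
  chan <- newChan
  forM_ ps $ \p -> forkIO $ do
    p (writeChan chan . Just)
    writeChan chan Nothing
  drain chan (length ps)
  where
    drain _    0 = return []
    drain chan n = do
      m <- readChan chan
      case m of
        Nothing -> drain chan (n - 1)
        Just x  -> (x :) <$> drain chan n

main :: IO ()
main = do
  merged <- mergeProducers [fromList [1, 2, 3], fromList [10, 20], fromList []]
  -- Interleaving between producers is nondeterministic, so sort before printing.
  print (sort (merged :: [Int]))
```

Elements from a single producer keep their relative order, but the order across producers depends on scheduling. The sketch also assumes that no producer throws; a producer that dies before writing its Nothing would leave the consumer waiting.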
> Hope this helps somewhat,
>   - clark
>
>
> On Tue, Feb 28, 2012 at 11:04 AM, Alexander V Vershilov <
> alexander.vershilov at gmail.com> wrote:
> >
> > Hello, cafe.
> >
> > Is it possible to read data from different concurrent sources,
> > i.e. read data from each source as soon as it becomes available, e.g.
> >
> >  runResourceT $ (source1 stdin $= CL.map Left)
> >                   >=> (source2 handle $= CL.map Right)
> >              $= application
> >              $$ sink
> >    where >=> stands for concurrent combination of the sources
> >
> > It would be good if the sources could be of different types (a handle,
> > an STM channel, etc.).
> >
> > So far I've found no good way to handle this situation other than
> > using STM channels to collect the data:
> >
> >   source1 ---+            |
> >              |   sink     |                       output sink
> >              +---] Channel [-------> application----->]
> >              |          source
> >   source2 ---+            |
> >
> > From this point of view the application receives concurrent data, but
> > this implementation requires an additional thread for processing the
> > data. Also, in many cases it requires an additional runResourceT (see
> > the example below).
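The channel layout in the diagram above can be sketched with base alone. This is only an illustration under stated assumptions: tagAndMerge is a hypothetical name, the two lists stand in for stdin and a network handle, and a plain Chan replaces the STM channel. The point it shows is the Left/Right tagging, which lets two inputs of different types share one channel while the consumer can still tell them apart.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (replicateM)
import Data.Either (partitionEithers)

-- | Feed two differently typed inputs into one channel, tagging the first
-- with 'Left' and the second with 'Right', then read everything back.
-- The lists are hypothetical stand-ins for stdin and a network handle.
tagAndMerge :: [a] -> [b] -> IO [Either a b]
tagAndMerge as bs = do
  chan <- newChan
  _ <- forkIO $ mapM_ (writeChan chan . Left) as
  _ <- forkIO $ mapM_ (writeChan chan . Right) bs
  -- The element count is known here, so no end-of-stream marker is needed.
  replicateM (length as + length bs) (readChan chan)

main :: IO ()
main = do
  events <- tagAndMerge ["quit", "status"] [1 :: Int, 2, 3]
  -- Interleaving of Left and Right values varies between runs, but each
  -- side keeps its own order, so the partitioned lists are stable.
  let (commands, packets) = partitionEithers events
  print commands
  print packets
```

Because the element count is known up front, the reader can stop without a close signal; real sources of unknown length need the channel to be closed (or an end marker sent) once every writer is finished.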
> >
> > So are there any possible simplifications? Or ideas on how to write
> > the (>=>) operator?
> >
> > Example:
> >
> > I have the following code in my network-conduit based application:
> >
> >   main :: IO ()
> >   main = do
> >     pool <- createDBPool "..." 10
> >     let r = ServerInit pool
> >     forkIO $ forever clientConsole --read channel list and send "Left"
> >     flip runReaderT r $
> >       runTCPServer (ServerSettings 3500 Nothing) (protoServer)
> >
> >   myServer src sink = do
> >    ch <- liftIO $ atomically $ newTBMChan 16
> >    initState <- lift $ ask
> >    _  <- liftIO $ fork . (flip runReaderT initState) $
> >                   runResourceT $ src $= C.sequence decode
> >                                      $= CL.map Right $$ sinkTBMChan ch
> >    sourceTBMChan ch
> >                 $= process $= C.sequence encode $$ sinkHandle stdout
> >
> > But in this situation I don't know whether all resources are guaranteed
> > to be freed, because I'm running an additional ResourceT inside the main
> > ResourceT scope.
> >
> > So can you advise whether concurrent sources are possible with the
> > currently implemented library?
> > If it's not possible but worth implementing, could I write those
> > functions?
> > Is it correct to call runResourceT inside another ResourceT?
> >
> > --
> > Best regards,
> >   Alexander V Vershilov
> >