[Haskell-cafe] question about conduit source
Michael Snoyman
michael at snoyman.com
Tue Feb 28 18:51:29 CET 2012
On Tue, Feb 28, 2012 at 6:04 PM, Alexander V Vershilov
<alexander.vershilov at gmail.com> wrote:
> Hello, cafe.
>
> Is it possible to read data from different concurrent sources,
> i.e. read data from a source as soon as it becomes available, e.g.
>
> runResourceT $ (source1 stdin $= CL.map Left)
>            >=> (source2 handle $= CL.map Right)
>             $= application
>             $$ sink
>   where >=> stands for concurrent combination of the sources
>
> It would be good if the sources could be of different types (handle,
> STM channel, etc.).
>
> So far I have found no good way to handle this situation
> other than using STM channels to collect the data:
>
> source1 ---+             |
>            | sink        |             output sink
>            +---] Channel [-------> application----->]
>            |             | source
> source2 ---+             |
>
> From this point of view the application receives concurrent data, but
> this implementation requires an additional thread per data source. In
> many cases it also requires running an additional runResourceT (see
> the example later).
There's not really any way to do what you're looking to do *without*
spawning a separate thread (or using some evented system directly, but
I'm assuming that's not the case). If what you're looking to do is
block until data is available from source1, and block until data is
available from source2, you're going to have to use separate threads
and some kind of synchronization. STM Channels seem like a good fit,
and normal Chans would probably work as well.
Clark Gaebel has already put together stm-conduit[1], maybe he would
be interested in adding some additional functions for this use case.
[1] http://hackage.haskell.org/packages/archive/stm-conduit/0.2.2.1/doc/html/Data-Conduit-TMChan.html
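The thread-plus-channel approach described above can be sketched without conduit at all. The following is only an illustration, assuming the stm package; mergeTwo is a hypothetical helper name (not part of stm-conduit), plain lists stand in for the two sources, and a Nothing value is used here as an end-of-input marker for each producer.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Monad (forM_)
import Data.Either (lefts, rights)

-- | Hypothetical helper: two producer threads write into one bounded queue,
-- tagging their items Left/Right. Each producer sends Nothing when it is done.
mergeTwo :: [a] -> [b] -> IO [Either a b]
mergeTwo xs ys = do
  queue <- newTBQueueIO 16
  let produce items = do
        forM_ items (atomically . writeTBQueue queue . Just)
        atomically (writeTBQueue queue Nothing)
  -- One thread per source, as discussed above.
  _ <- forkIO (produce (map Left xs))
  _ <- forkIO (produce (map Right ys))
  -- Read until both producers have sent their end marker.
  let drain 0 acc = return (reverse acc)
      drain n acc = do
        item <- atomically (readTBQueue queue)
        case item of
          Nothing -> drain (n - 1) acc
          Just x  -> drain n (x : acc)
  drain (2 :: Int) []

main :: IO ()
main = do
  merged <- mergeTwo [1, 2, 3 :: Int] "ab"
  -- The interleaving of Left and Right items is not deterministic,
  -- but each source's own order is preserved.
  print (lefts merged)
  print (rights merged)
```

In a conduit setting the two producers would be the forked "source $$ sink-into-channel" pipelines and the draining loop would be the channel-backed source feeding the application.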
> So are there any possible simplifications? Or ideas on how to write
> the (>=>) operator?
>
> Example:
>
> I have the following code in my network-conduit based application:
>
> main :: IO ()
> main = do
>     pool <- createDBPool "..." 10
>     let r = ServerInit pool
>     forkIO $ forever clientConsole --read channel list and send "Left"
>     flip runReaderT r $
>         runTCPServer (ServerSettings 3500 Nothing) (protoServer)
>
> myServer src sink = do
>     ch <- liftIO $ atomically $ newTBMChan 16
>     initState <- lift $ ask
>     _ <- liftIO $ fork . (flip runReaderT initState) $
>              runResourceT $ src $= C.sequence decode
>                                 $= CL.map Right $$ sinkTBMChan ch
>     sourceTBMChan ch
>         $= process $= C.sequence encode $$ sinkHandle stdout
>
> But in this situation I don't know whether all resources are guaranteed
> to be freed, because I'm running an additional ResourceT inside the
> main ResourceT scope.
You can nest ResourceT as much as you want. Each time you call
runResourceT, the resources allocated in that block will be freed. I
haven't analyzed your code in detail, but it seems fine to me. The
only real way you can stop ResourceT from freeing resources is by
never triggering the final release, which can be done by either:
1. Having your entire application live inside ResourceT. In such a
case, your resources will still be freed, it will just happen at the
very end of your application.
2. Using resourceForkIO and letting the child threads live indefinitely.
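To illustrate the nesting behaviour, here is a minimal sketch, assuming a recent version of the resourcet package (register and runResourceT from Control.Monad.Trans.Resource). The name nestedDemo and the IORef event log are made up for the example; they only record the order in which things happen.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (register, runResourceT)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- | Hypothetical demo: record the order in which the cleanup actions of a
-- nested and an enclosing runResourceT block are run.
nestedDemo :: IO [String]
nestedDemo = do
  logRef <- newIORef []
  let note msg = modifyIORef logRef (msg :)
  runResourceT $ do
    _ <- register (note "outer released")
    -- The inner block has its own cleanup scope.
    liftIO $ runResourceT $ do
      _ <- register (note "inner released")
      liftIO (note "inner running")
    liftIO (note "back in outer")
  reverse <$> readIORef logRef

main :: IO ()
main = nestedDemo >>= mapM_ putStrLn
```

The inner cleanup runs as soon as the inner runResourceT returns, before control is back in the outer block; the outer cleanup runs only when the outer runResourceT finishes.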
>
> So can you advise whether it is possible to make concurrent sources with
> the library as currently implemented?
> If it's not possible but worth implementing, can I write those functions?
> Is it correct to call runResourceT inside another ResourceT?
>
> --
> Best regards,
> Alexander V Vershilov