[Haskell-cafe] "branching" conduits

Michael Snoyman michael at snoyman.com
Thu Jan 31 13:50:04 CET 2013


On Thu, Jan 31, 2013 at 11:48 AM, Simon Marechal <simon at banquise.net> wrote:

> Hello,
>
>         I have found the Conduit abstraction to be very well suited to a
> set of problems I am facing. I am however wondering how to implement
> "branching" conduits, and even conduit pools.
>
>         I am currently in the process of rewriting parts (the simple
> parts) of the Logstash tool. There is a sample program that I use here:
>
>
> https://github.com/bartavelle/hslogstash/blob/deprecateUtils/examples/RedisToElasticsearch.hs
>
>         As can be seen, it uses a "Redis" source, a conduit that
> decodes the JSON ByteString into a LogstashMessage, a conduit that
> stores it into Elasticsearch and outputs the result of that action as
> an Either, and finally a sink that prints the errors.
>
>         My problem is that I would like more complex behaviour. For
> example, I would like to route messages to another server instead of
> putting them into Elasticsearch when the LogstashMessage has some tag
> set. But this is just an example, and I will probably want much more
> complex behaviour soon.
>
>         I am not sure how to proceed from here, but have the following
> ideas:
>
>  * investigate how the Conduits are made internally to see if I can
> create an operator similar to $$, but that would have a signature like:
>         Source m (Either a b) -> Sink a m r -> Sink b m r
> and would do the branching in a binary fashion. I am not sure this is
> even possible.
>
>  * create a "mvars" connectors constructor, which might have a signature
> like this:
>
>  Int                         -- ^ branch count
>  -> (LogstashMessage -> Int) -- ^ branching function
>  -> (Sink LogstashMessage m (), [Source m LogstashMessage])
>  -- ^ a suitable sink, several sources for the other conduits
>
>  It would internally create an MVar (Maybe LogstashMessage) for each
> branch, and putMVar according to the branching function. When the
> Conduit is destroyed, it will putMVar Nothing in all MVars.
>  The sources would takeMVar, stop if the value is Nothing, and otherwise
> proceed as expected.
>
>  The MVar should guarantee the constant space property, but there is the
> risk of inter-branch blocking when one of the branches is significantly
> slower than the others. It doesn't really matter to me anyway. And all
> the branch Sinks would need some synchronization mechanism so that the
> main thread waits for them (as they are going to be launched by a
> forkIO).
>
>
>
>   This is the simplest scheme I have thought of, and it is probably not
> a very good one. I am very interested in suggestions here.
>


Hi Simon,

For your first approach, I think what you're looking to do is combine two
Sinks together, something like:

combine :: Monad m
        => Sink i1 m r1
        -> Sink i2 m r2
        -> Sink (Either i1 i2) m (r1, r2)

Then you'd be able to use the standard $$ and =$ operators on it. I've put
up an example implementation here[1]. The majority of the code is simple
pattern matching on the different possible combinations, but some things to
point out:

* To simplify, we start off with a call to injectLeftovers. This means that
we can entirely ignore the Leftover constructor in the main function.
* Since a Sink will never yield values, we can also ignore the HaveOutput
constructor.
* As soon as either of the Sinks terminates, we terminate the other one as
well and return the results.
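
The points above can be illustrated without the library. What follows is a
minimal sketch, not the code from the gist[1] and not conduit's real types:
the Sink type here is a toy model with only the constructors that remain
once Leftover and HaveOutput are out of the picture, and the names SinkM,
close, feed, collect and takeN are all made up for the illustration.

```haskell
module Main where

import Data.Functor.Identity (runIdentity)

-- Toy stand-in for a conduit Sink (an assumption, not the library type).
data Sink i m r
  = NeedInput (i -> Sink i m r) (Sink i m r) -- on input / on end of input
  | Done r                                   -- finished with a result
  | SinkM (m (Sink i m r))                   -- run a monadic action first

-- Signal end of input and run the sink to completion.
close :: Monad m => Sink i m r -> m r
close (Done r)            = return r
close (SinkM m)           = m >>= close
close (NeedInput _ onEnd) = close onEnd

-- Route Left values to the first sink and Right values to the second.
-- As soon as one sink is Done, the other is closed and both results return.
combine :: Monad m
        => Sink i1 m r1
        -> Sink i2 m r2
        -> Sink (Either i1 i2) m (r1, r2)
combine (SinkM ml) r = SinkM (fmap (`combine` r) ml)
combine l (SinkM mr) = SinkM (fmap (combine l) mr)
combine (Done r1) r  = SinkM (fmap (\r2 -> Done (r1, r2)) (close r))
combine l (Done r2)  = SinkM (fmap (\r1 -> Done (r1, r2)) (close l))
combine l@(NeedInput pl _) r@(NeedInput pr _) = NeedInput step end
  where
    step (Left a)  = combine (pl a) r
    step (Right b) = combine l (pr b)
    end = SinkM $ do
      r1 <- close l
      r2 <- close r
      return (Done (r1, r2))

-- Feed a list to a sink; a stand-in for connecting a source with $$.
feed :: Monad m => [i] -> Sink i m r -> m r
feed _      (Done r)           = return r
feed xs     (SinkM m)          = m >>= feed xs
feed []     (NeedInput _ end)  = close end
feed (x:xs) (NeedInput onIn _) = feed xs (onIn x)

-- Collect every input into a list.
collect :: Sink i m [i]
collect = go []
  where go acc = NeedInput (\x -> go (x : acc)) (Done (reverse acc))

-- Collect at most n inputs, then terminate early.
takeN :: Int -> Sink i m [i]
takeN = go []
  where
    go acc 0 = Done (reverse acc)
    go acc n = NeedInput (\x -> go (x : acc) (n - 1)) (Done (reverse acc))

main :: IO ()
main = do
  let inputs = [Left 1, Right "a", Left 2, Right "b", Left 3] :: [Either Int String]
  print (runIdentity (feed inputs (combine collect collect)))   -- ([1,2,3],["a","b"])
  print (runIdentity (feed inputs (combine (takeN 1) collect))) -- ([1],[])
```

The second line of output shows the early-termination rule: the left sink
finishes after one element, so the right sink is closed before it ever sees
"a" or "b".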

You can also consider going the mutable container route if you like.
Instead of creating a lot of stuff from scratch with MVars, you could use
stm-conduit[2]. In fact, that package already contains some kind of merging
behavior for sources, so it might make sense to ask the author about
including unmerging behavior for Sinks.
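
For comparison, here is roughly what the from-scratch MVar version looks
like. This is only a sketch of the scheme described in the quoted message,
using base alone; it does not use conduit or stm-conduit, and startBranches,
send, finish and runDemo are hypothetical names. Each branch gets a one-slot
MVar (Maybe a) mailbox, Nothing marks the end of the stream, and a second
MVar per branch lets the main thread wait for the forked workers.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM, forM_)

-- Fork one worker per branch. Returns a 'send' action that routes a value
-- to the branch chosen by the routing function (which must return a valid
-- index), and a 'finish' action that stops every branch and waits for it.
-- Caveat: a worker that throws never signals 'done', so 'finish' would block.
startBranches :: [a -> IO ()] -> (a -> Int) -> IO (a -> IO (), IO ())
startBranches workers route = do
  branches <- forM workers $ \worker -> do
    box  <- newEmptyMVar -- one-slot mailbox holding Maybe a
    done <- newEmptyMVar -- filled once the worker has exited
    let loop = takeMVar box >>= maybe (putMVar done ()) (\x -> worker x >> loop)
    _ <- forkIO loop
    return (box, done)
  let send x = putMVar (fst (branches !! route x)) (Just x)
      finish = do
        forM_ branches $ \(box, _)  -> putMVar box Nothing
        forM_ branches $ \(_, done) -> takeMVar done
  return (send, finish)

-- Route even numbers to branch 0 and odd numbers to branch 1.
runDemo :: IO ([Int], [Int])
runDemo = do
  evens <- newMVar []
  odds  <- newMVar []
  let record var x = modifyMVar_ var (return . (x :))
  (send, finish) <- startBranches [record evens, record odds] (`mod` 2)
  mapM_ send [1 .. 10]
  finish
  (,) <$> (reverse <$> readMVar evens) <*> (reverse <$> readMVar odds)

main :: IO ()
main = runDemo >>= print -- ([2,4,6,8,10],[1,3,5,7,9])
```

Because each mailbox holds a single value, send blocks whenever the target
branch is still busy, which is the inter-branch blocking mentioned in the
quoted message. A bounded STM channel would loosen that coupling.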

Michael

[1] https://gist.github.com/4682609
[2] http://hackage.haskell.org/package/stm-conduit