[Haskell-cafe] "branching" conduits

Simon Marechal simon at banquise.net
Thu Jan 31 10:48:16 CET 2013


Hello,

	I have found the Conduit abstraction to be very well suited to a set of
problems I am facing. I am however wondering how to implement
"branching" conduits, and even conduit pools.

	I am currently in the process of rewriting parts (the simple parts) of
the Logstash tool. There is a sample program that I use here:

https://github.com/bartavelle/hslogstash/blob/deprecateUtils/examples/RedisToElasticsearch.hs

	As can be seen, it uses a "Redis" source, a conduit that decodes the
JSON ByteString into a LogstashMessage, a conduit that stores it into
Elasticsearch and outputs the result of that action as an Either, and
finally a sink that prints the errors.

	My problem is that I would like more complex behavior. For example, I
would like to route messages to another server instead of putting them
into Elasticsearch when the LogstashMessage has some tag set. But this
is just an example, and I will probably want much more complex
behavior soon.

	I am not sure how to proceed from here, but have the following ideas:

 * investigate how the Conduits are made internally to see if I can
create an operator similar to $$, but with a signature like:
	Source m (Either a b) -> Sink a m r -> Sink b m r
that would do the branching in a binary fashion. I am not sure this is
even possible.

 * create an MVar-based "connector" constructor, which might have a
signature like this:

 Int                          -- ^ branch count
 -> (LogstashMessage -> Int)  -- ^ branching function
 -> IO (Sink LogstashMessage m (), [Source m LogstashMessage])
 -- ^ a suitable sink, several sources for the other conduits

 It would internally create an MVar (Maybe LogstashMessage) for each
branch, and putMVar each message into the MVar selected by the branching
function. When the Conduit is destroyed, it would putMVar Nothing into
all the MVars.
 Each source would takeMVar, stop if it gets Nothing, and otherwise
proceed as expected.

 Since each MVar holds at most one message, this should preserve the
constant-space property, but there is a risk of inter-branch blocking
when one of the branches is significantly slower than the others. That
doesn't really matter to me anyway. All the branch Sinks would also need
some synchronization mechanism so that the main thread waits for them
(as they are going to be launched with forkIO).
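
 To make the MVar idea more concrete, here is a rough sketch of the
mechanism using only base. It is not tied to the conduit API: plain IO
actions stand in for the Sink and the Sources, a list stands in for the
upstream source, and the end of input is signalled explicitly rather
than on Conduit destruction. The names mkBranches, drain and runBranched
are hypothetical, and the branch count is assumed to be at least 1.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forM_, replicateM)

-- | Hypothetical constructor: one single-slot MVar per branch.
-- Returns a "sink" action (Just x routes x, Nothing ends every branch)
-- and one blocking "source" action per branch.
-- Assumption: n >= 1; the branch index is taken modulo n.
mkBranches :: Int -> (a -> Int) -> IO (Maybe a -> IO (), [IO (Maybe a)])
mkBranches n route = do
  slots <- replicateM n newEmptyMVar
  let push (Just x) = putMVar (slots !! (route x `mod` n)) (Just x)
      push Nothing  = forM_ slots (\s -> putMVar s Nothing)
  return (push, map takeMVar slots)

-- | Stand-in for a branch sink: read until Nothing, keep arrival order.
drain :: IO (Maybe a) -> IO [a]
drain pull = go []
  where
    go acc = pull >>= maybe (return (reverse acc)) (\x -> go (x : acc))

-- | Fork one consumer per branch, feed the messages through the router,
-- then wait for every branch to finish via a per-branch "done" MVar.
runBranched :: Int -> (a -> Int) -> [a] -> IO [[a]]
runBranched n route msgs = do
  (push, pulls) <- mkBranches n route
  dones <- forM pulls $ \pull -> do
    done <- newEmptyMVar
    _ <- forkIO (drain pull >>= putMVar done)
    return done
  mapM_ (push . Just) msgs   -- blocks whenever the target branch is busy
  push Nothing               -- tell every branch that input is over
  mapM takeMVar dones        -- main thread waits for all branches
```

 With this sketch, runBranched 2 (\x -> if even x then 0 else 1) [1..6]
should return [[2,4,6],[1,3,5]]. The producer blocks in putMVar as soon
as the selected branch has not yet consumed its previous message, which
is exactly the inter-branch blocking mentioned above.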



  This is the simplest scheme I have thought of, and it is probably not
a very good one. I am very interested in suggestions here.


