[Haskell-cafe] Conduit and pipelined protocol processing using a threadpool

Nicolas Trangez nicolas at incubaid.com
Tue Nov 27 11:57:09 CET 2012


I've written a library to implement servers for some protocol using
Conduit (I'll announce more details later).

The protocol supports pipelining, i.e. a client can send a 'command'
which contains some opaque 'handle' chosen by the client, the server
processes this command, then returns some reply which contains this
handle. The client is free to send other commands before receiving a
reply for any previous request, and the server can process these
commands in any order, sequential or concurrently.

The library is based on network-conduit's "Application" style [1], as
such now I write code like (OTOH)

> application :: AppData IO -> IO ()
> application client = appSource client $= handler $$ appSink client
>   where
>     handler = do
>         negotiateResult <- MyLib.negotiate
>         liftIO $ validateNegotiateResult negotiateResult
>         MyLib.sendInformation 123
>         loop
>    loop = do
>        command <- MyLib.getCommand
>        case command of
>            CommandA handle arg -> do
>                result <- liftIO $ doComplexProcessingA arg
>                MyLib.sendReply handle result
>                loop
>            Disconnect -> return ()

This approach handles commands in-order, sequentially. Since command
processing can involve quite some IO operations to disk or network, I've
been trying to support pipelining on the server-side, but as of now I
was unable to get things working.

The idea would be to have a pool of worker threads, which receive work
items from some channel, then return any result on some other channel,
which should then be returned to the client.

This means inside "loop" I would have 2 sources: commands coming from
the client (using 'MyLib.getCommand :: MonadIO m => Pipe ByteString
ByteString o u m Command'), as well as command results coming from the
worker threads through the result channel. Whenever the first source
produces something, it should be pushed onto the work queue, and
whenever the second on yields some result it should be sent to the
client using 'MyLib.sendReply :: Monad m => Handle -> Result -> Pipe l i
ByteString u m ()'

I've been fighting this for a while and haven't managed to get something
sensible working. Maybe the design of my library is flawed, or maybe I'm
approaching the problem incorrectly, or ...

Has this ever been done before, or would anyone have some pointers how
to tackle this?




More information about the Haskell-Cafe mailing list