[Haskell-cafe] Problems with iteratees
wren ng thornton
wren at freegeek.org
Thu Feb 3 02:06:35 CET 2011
I'm working on a project that's using John Lato's old implementation of
iteratees (iteratee >= 0.3.5 && < 0.4; I'm hoping to migrate to 0.7
soon, but that's a ways off yet) and I'm running into some issues I
haven't been able to untangle. Maybe a new set of eyes can help...

The overarching program brings three things together for doing some
interprocess communication: the medium is Posix FIFOs, the messages
themselves are encoded with Google's Protocol Buffers[1], and the
control flow for getting and processing the messages is handled by
iteratees. The error message indicates iteratees are at fault, though it
could be an error elsewhere instead.

First, some boilerplate.

    -- For messageWithLengthEnumeratee only
    {-# LANGUAGE ScopedTypeVariables #-}

    import qualified Text.ProtocolBuffers.Reflections as R
    import qualified Text.ProtocolBuffers.WireMessage as W
    import qualified Text.ProtocolBuffers.Get         as G
    import qualified Data.ByteString                  as S
    import qualified Data.ByteString.Lazy             as L
    import qualified Data.Iteratee                    as I
    import Data.Iteratee.WrappedByteString
    import Data.Word     (Word8)
    import Control.Monad (liftM)

    -- | Return a final value, and the remainder of the stream.
    idone :: a -> c el -> I.IterGV c el m a
    idone a xs = I.Done a (I.Chunk xs)
    {-# INLINE idone #-}

    -- | Convert a continuation into 'I.IterGV'.
    icontinue
        :: (I.StreamG c el -> m (I.IterGV c el m a))
        -> I.IterGV c el m a
    icontinue k = I.Cont (I.IterateeG k) Nothing
    {-# INLINE icontinue #-}

    -- | Throw an error message.
    ifail :: (Monad m) => String -> I.IterGV c el m a
    ifail msg = ierror (I.Err msg)
    {-# INLINE ifail #-}

    -- | An 'I.IterGV' variant of 'I.throwErr'.
    ierror :: (Monad m) => I.ErrMsg -> I.IterGV c el m a
    ierror err = I.Cont (I.throwErr err) (Just err)
    {-# INLINE ierror #-}

    toLazyBS :: S.ByteString -> L.ByteString
    toLazyBS = L.fromChunks . (:[])
    {-# INLINE toLazyBS #-}

    toStrictBS :: L.ByteString -> S.ByteString
    toStrictBS = S.concat . L.toChunks
    {-# INLINE toStrictBS #-}

Now we have the code for converting the Get monad used by protocol
buffers into an iteratee. This should be correct, and it's pretty
straightforward.

    -- | Convert a 'G.Result' iteratee state into a 'I.IterGV'
    -- iteratee state.
    result2iterv
        :: (Monad m)
        => G.Result a
        -> I.IterGV WrappedByteString Word8 m a
    result2iterv (G.Finished rest _ a) = idone a (WrapBS $ toStrictBS rest)
    result2iterv (G.Failed _ msg)      = ifail msg
    result2iterv (G.Partial k)         = I.Cont (iterify k) Nothing

    -- | Convert a protobuf-style continuation into an
    -- iteratee-style continuation.
    iterify
        :: (Monad m)
        => (Maybe L.ByteString -> G.Result a)
        -> I.IterateeG WrappedByteString Word8 m a
    iterify k =
        I.IterateeG $ \s -> return $!
            case s of
            I.Chunk (WrapBS xs) -> result2iterv $ k (Just $ toLazyBS xs)
            I.EOF Nothing       -> result2iterv $ k Nothing
            I.EOF (Just err)    -> ierror err

    -- | A variant of 'G.runGet' as an iteratee.
    runGetIteratee
        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
        => G.Get a
        -> I.IterateeG WrappedByteString Word8 m a
    runGetIteratee g =
        I.IterateeG $ \s -> return $!
            case s of
            I.Chunk (WrapBS xs) -> result2iterv $ G.runGet g (toLazyBS xs)
            I.EOF Nothing       -> result2iterv $ G.runGet g L.empty
            I.EOF (Just err)    -> ierror err

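To make the three-way case analysis above concrete, here is a minimal toy model of the same conversion that depends only on base. Every name in it (Result, Iter, Stream, fromResult, takeN, feed) is a hypothetical stand-in, not the protocol-buffers or iteratee API, and the toy keeps errors in their own constructor rather than in a Cont with an error message attached. It is a sketch of the shape of the translation, not a replacement for the code above.

```haskell
module Main where

-- Toy stand-in for an incremental parser result (hypothetical, not G.Result).
data Result a
  = Finished String a                   -- leftover input and the parsed value
  | Failed String                       -- error message
  | Partial (Maybe String -> Result a)  -- wants more input; Nothing means EOF

-- Toy stand-ins for the stream and the iteratee state (hypothetical).
data Stream = Chunk String | EOF

data Iter a
  = Done a Stream
  | Cont (Stream -> Iter a)
  | Error String

-- Translate a parser result into an iteratee state, one case per constructor.
fromResult :: Result a -> Iter a
fromResult (Finished rest a) = Done a (Chunk rest)
fromResult (Failed msg)      = Error msg
fromResult (Partial k)       = Cont step
  where
    step (Chunk xs) = fromResult (k (Just xs))
    step EOF        = fromResult (k Nothing)

-- A toy parser that wants exactly n characters and asks for more when short.
takeN :: Int -> String -> Result String
takeN n input
  | length input >= n = let (a, rest) = splitAt n input in Finished rest a
  | otherwise         = Partial next
  where
    next Nothing   = Failed "toy: ran out of input"
    next (Just xs) = takeN n (input ++ xs)

-- Feed a list of chunks, then EOF, and report how the iteratee ended.
feed :: Iter a -> [String] -> Either String a
feed (Done a _)  _      = Right a
feed (Error msg) _      = Left msg
feed (Cont k)    (c:cs) = feed (k (Chunk c)) cs
feed (Cont k)    []     = case k EOF of
  Done a _  -> Right a
  Error msg -> Left msg
  Cont _    -> Left "toy: still wants input after EOF"

main :: IO ()
main = do
  print (feed (fromResult (takeN 4 "")) ["ab", "cde"])  -- enough input
  print (feed (fromResult (takeN 4 "")) ["ab"])         -- EOF arrives early
```

In this toy, a parser that is still Partial when EOF arrives turns its own failure message into the iteratee's error, which is the path worth tracing when an unfamiliar error string shows up at the top level.
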
Okay, now we have an iteratee which consumes a stream of bytestrings and
will render a protocol buffer message. But what we really want is an
enumeratee to do this repeatedly so we can use an iteratee to consume
the stream of messages. I have the following definition which
typechecks, but doesn't seem to work. The call to convStream seems like
it always hangs:

    -- | A variant of 'G.runGet' as an enumeratee.
    runGetEnumeratee
        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
        => G.Get a
        -> I.EnumeratorN WrappedByteString Word8 [] a m b
    runGetEnumeratee =
        I.convStream . liftM (Just . (:[])) . runGetIteratee

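As a point of comparison, the intent of "run the parser repeatedly and hand each result downstream" can be written as a pure, list-based sketch. This is not how I.convStream is implemented; parseMany and pair are hypothetical names, and the sketch assumes the one-shot parser always consumes input when it succeeds.

```haskell
module Main where

-- Repeatedly run a one-shot parser over the input, collecting each result.
-- Stops when the input is empty or the parser returns Nothing.
-- Assumes the parser consumes at least one element whenever it succeeds.
parseMany :: ([t] -> Maybe (a, [t])) -> [t] -> ([a], [t])
parseMany p = go
  where
    go [] = ([], [])
    go xs = case p xs of
      Nothing        -> ([], xs)
      Just (a, rest) -> let (rs, left) = go rest in (a : rs, left)

-- Hypothetical one-shot parser: two adjacent characters form one item.
pair :: String -> Maybe ((Char, Char), String)
pair (a:b:rest) = Just ((a, b), rest)
pair _          = Nothing

main :: IO ()
main = do
  let (items, leftover) = parseMany pair "abcde"
  mapM_ print items   -- the downstream consumer, in the role of I.mapM_
  putStrLn leftover   -- input that did not form a complete item
```

Note that the sketch checks for empty input before invoking the parser, so the parser is never started on nothing at all. Whether the real enumeratee gives the same guarantee is exactly the kind of thing I'd want to confirm.
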
Once we have a working definition of runGetEnumeratee, then we can
define the specific enumeratee we need:

    -- | An enumeratee for converting bytestrings into protocol
    -- buffer messages.
    messageWithLengthEnumeratee
        :: forall m msg a
        .  (Monad m, R.ReflectDescriptor msg, W.Wire msg)
        => I.EnumeratorN WrappedByteString Word8 [] msg m a
    messageWithLengthEnumeratee =
        runGetEnumeratee (W.messageWithLengthGetM :: G.Get msg)

And then at the use site we have the following:

    let processRequest = ... :: msg -> IO ()

    I.run                             -- run the [()], sending EOF
        . I.joinIM                    -- push monadic effects inside
        . I.enumFdFollow fifo_in      -- read from the Fd forever
        . I.joinI                     -- when EOF bytestrings, EOF msgs
        . messageWithLengthEnumeratee -- ByteStrings -> messages
        $ I.mapM_ processRequest      -- process messages

I think this part is correct too, but just to be sure... the goal is
that we should read bytestrings from the FIFO forever (or until the
other process closes their end), and then we read off the messages one
by one, handing them off to processRequest to interpret them and respond
accordingly.
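For anyone unfamiliar with the framing, here is a small base-only sketch of what "reading off one message" means on the wire, assuming the usual Protocol Buffers length-delimited layout: a base-128 varint length followed by that many payload bytes. getVarint and getFrame are hypothetical helpers written for illustration; they are not what W.messageWithLengthGetM does internally, and they work on plain lists of bytes rather than bytestrings.

```haskell
module Main where

import Data.Bits (shiftL, testBit, (.&.), (.|.))
import Data.Word (Word8)

-- Decode a base-128 varint, least significant 7-bit group first.
-- Returns the value and the remaining bytes, or Nothing if input runs out.
getVarint :: [Word8] -> Maybe (Int, [Word8])
getVarint = go 0 0
  where
    go _ _ [] = Nothing
    go sh acc (b:bs)
      | testBit b 7 = go (sh + 7) acc' bs   -- continuation bit set
      | otherwise   = Just (acc', bs)       -- last group of the varint
      where
        acc' = acc .|. ((fromIntegral (b .&. 0x7f)) `shiftL` sh)

-- Split one length-prefixed frame off the front of the input.
-- Returns Nothing when the length or the payload is incomplete.
getFrame :: [Word8] -> Maybe ([Word8], [Word8])
getFrame bytes = do
  (len, rest) <- getVarint bytes
  if length rest >= len
    then Just (splitAt len rest)
    else Nothing

main :: IO ()
main = do
  print (getVarint [0xAC, 0x02])         -- two-byte varint
  print (getFrame [2, 10, 20, 1, 30])    -- one complete frame, then leftovers
  print (getFrame [3, 40])               -- payload shorter than its length
```

The last case is the interesting one for a FIFO: a short read is not an error, only a request for more bytes, so a reader has to distinguish "incomplete so far" from "the stream ended in the middle of a message".
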
When I put this all together, the process is killed with:

    control message: Just (Err "endOfInput")

Data.Iteratee.Base.run is the origin of the "control message:" part of
the error, but I don't know where (Err "endOfInput") is coming from
since Data.Iteratee.Base only uses (Err "EOF") or (Err "Divergent
Iteratee"). I believe runGetEnumeratee is where the problem is, though
it could also be the use site or something in one of the libraries. Any
help would be appreciated.

[1] http://hackage.haskell.org/package/protocol-buffers
    http://hackage.haskell.org/package/hprotoc

--
Live well,
~wren