[Haskell-cafe] Problems with iteratees

wren ng thornton wren at freegeek.org
Thu Feb 3 02:06:35 CET 2011


I'm working on a project that's using John Lato's old implementation of 
iteratees (iteratee >= 0.3.5 && < 0.4; I'm hoping to migrate to 0.7 
soon, but that's a ways off yet) and I'm running into some issues I 
haven't been able to untangle. Maybe a new set of eyes can help...

The overarching program brings three things together for doing some
interprocess communication: the medium is POSIX FIFOs, the messages
themselves are encoded with Google's Protocol Buffers[1], and the
control flow for getting and processing the messages is handled by
iteratees. The error message suggests the iteratees are at fault, though
the actual bug could be elsewhere.

First, some boilerplate.

     -- For messageWithLengthEnumeratee only
     {-# LANGUAGE ScopedTypeVariables #-}

     import qualified Text.ProtocolBuffers.Reflections as R
     import qualified Text.ProtocolBuffers.WireMessage as W
     import qualified Text.ProtocolBuffers.Get         as G
     import qualified Data.ByteString                  as S
     import qualified Data.ByteString.Lazy             as L
     import qualified Data.Iteratee                    as I
     import           Data.Iteratee.WrappedByteString
     import           Data.Word                        (Word8)
     import           Control.Monad                    (liftM)


     -- | Return a final value, and the remainder of the stream.
     idone :: a -> c el -> I.IterGV c el m a
     idone a xs = I.Done a (I.Chunk xs)
     {-# INLINE idone #-}


     -- | Convert a continuation into 'I.IterGV'.
     icontinue
         :: (I.StreamG c el -> m (I.IterGV c el m a))
         -> I.IterGV c el m a
     icontinue k = I.Cont (I.IterateeG k) Nothing
     {-# INLINE icontinue #-}


     -- | Throw an error message.
     ifail :: (Monad m) => String -> I.IterGV c el m a
     ifail msg = ierror (I.Err msg)
     {-# INLINE ifail #-}


     -- | An 'I.IterGV' variant of 'I.throwErr'.
     ierror :: (Monad m) => I.ErrMsg -> I.IterGV c el m a
     ierror err = I.Cont (I.throwErr err) (Just err)
     {-# INLINE ierror #-}


     toLazyBS :: S.ByteString -> L.ByteString
     toLazyBS = L.fromChunks . (:[])
     {-# INLINE toLazyBS #-}


     toStrictBS :: L.ByteString -> S.ByteString
     toStrictBS = S.concat . L.toChunks
     {-# INLINE toStrictBS #-}

Now we have the code for converting the Get monad used by protocol 
buffers into an iteratee. This should be correct, and it's pretty 
straightforward.

     -- | Convert a 'G.Result' iteratee state into a 'I.IterGV'
     -- iteratee state.
     result2iterv
         :: (Monad m)
         => G.Result a
         -> I.IterGV WrappedByteString Word8 m a
     result2iterv (G.Finished rest _ a) = idone a (WrapBS $ toStrictBS rest)
     result2iterv (G.Failed _ msg)      = ifail msg
     result2iterv (G.Partial k)         = I.Cont (iterify k) Nothing


     -- | Convert a protobuf-style continuation into an
     -- iteratee-style continuation.
     iterify
         :: (Monad m)
         => (Maybe L.ByteString -> G.Result a)
         -> I.IterateeG WrappedByteString Word8 m a
     iterify k =
         I.IterateeG $ \s -> return $!
             case s of
             I.Chunk (WrapBS xs) -> result2iterv $ k (Just $ toLazyBS xs)
             I.EOF Nothing       -> result2iterv $ k Nothing
             I.EOF (Just err)    -> ierror err


     -- | A variant of 'G.runGet' as an iteratee.
     runGetIteratee
         :: (Monad m, R.ReflectDescriptor a, W.Wire a)
         => G.Get a
         -> I.IterateeG WrappedByteString Word8 m a
     runGetIteratee g =
         I.IterateeG $ \s -> return $!
             case s of
             I.Chunk (WrapBS xs) -> result2iterv $ G.runGet g (toLazyBS xs)
             I.EOF Nothing       -> result2iterv $ G.runGet g L.empty
             I.EOF (Just err)    -> ierror err

Okay, now we have an iteratee which consumes a stream of bytestrings and 
will render a protocol buffer message. But what we really want is an 
enumeratee to do this repeatedly so we can use an iteratee to consume 
the stream of messages. I have the following definition which 
typechecks, but doesn't seem to work. The call to convStream seems like 
it always hangs:

     -- | A variant of 'G.runGet' as an enumeratee.
     runGetEnumeratee
         :: (Monad m, R.ReflectDescriptor a, W.Wire a)
         => G.Get a
         -> I.EnumeratorN WrappedByteString Word8 [] a m b
     runGetEnumeratee =
         I.convStream . liftM (Just . (:[])) . runGetIteratee

Once we have a working definition of runGetEnumeratee, then we can 
define the specific enumeratee we need:

     -- | An enumeratee for converting bytestrings into protocol
     -- buffer messages.
     messageWithLengthEnumeratee
         :: forall m msg a
         .  (Monad m, R.ReflectDescriptor msg, W.Wire msg)
         => I.EnumeratorN WrappedByteString Word8 [] msg m a
     messageWithLengthEnumeratee =
         runGetEnumeratee (W.messageWithLengthGetM :: G.Get msg)

And then at the use site we have the following:

     let processRequest = ... :: msg -> IO ()
     I.run                             -- run the [()], sending EOF
         . I.joinIM                    -- push monadic effects inside
         . I.enumFdFollow fifo_in      -- read from the Fd forever
         . I.joinI                     -- on bytestring EOF, send EOF to msgs
         . messageWithLengthEnumeratee -- ByteStrings -> messages
         $ I.mapM_ processRequest      -- process messages

I think this part is correct too, but just to be sure... the goal is 
that we should read bytestrings from the FIFO forever (or until the 
other process closes their end), and then we read off the messages one 
by one, handing them off to processRequest to interpret them and respond 
accordingly.
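
To make the intended behaviour concrete, here is a rough pure sketch of "read off the messages one by one" from a buffer of length-prefixed frames. It is a simplification under stated assumptions: splitFrames is a hypothetical helper, and it uses a single length byte per frame, whereas (as far as I understand it) W.messageWithLengthGetM expects a varint length prefix. It is not how the enumeratee itself is implemented.

```haskell
import qualified Data.ByteString as S

-- | Hypothetical helper: split off every complete frame from the
-- buffer, returning the frames and the unconsumed remainder.
-- Assumes a one-byte length prefix (a simplification; the real
-- protobuf framing uses a varint).
splitFrames :: S.ByteString -> ([S.ByteString], S.ByteString)
splitFrames buf =
    case S.uncons buf of
    Just (n, rest)
        | S.length rest >= fromIntegral n ->
            let (frame, rest')     = S.splitAt (fromIntegral n) rest
                (frames, leftover) = splitFrames rest'
            in  (frame : frames, leftover)
    -- Empty buffer, or an incomplete frame: keep the bytes for later.
    _ -> ([], buf)
```

An incomplete trailing frame is returned untouched as the remainder, which is the part a streaming version would have to carry over to the next chunk read from the FIFO.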

When I put this all together, the process is killed with:

     control message: Just (Err "endOfInput")

Data.Iteratee.Base.run is the origin of the "control message:" part of 
the error, but I don't know where (Err "endOfInput") is coming from 
since Data.Iteratee.Base only uses (Err "EOF") or (Err "Divergent 
Iteratee"). I believe runGetEnumeratee is where the problem is, though 
it could also be the use site or something in one of the libraries. Any 
help would be appreciated.


[1] http://hackage.haskell.org/package/protocol-buffers
     http://hackage.haskell.org/package/hprotoc

-- 
Live well,
~wren


