[Haskell-cafe] Problems with iteratees

Maciej Wos maciej.wos at gmail.com
Thu Feb 3 05:25:40 CET 2011


I think the problem is that the iteratee you give to I.convStream
always returns Just [something] while you should return Nothing on
EOF.

Suppose you want to have an enumeratee that adds 1 to each integer in
the stream and then use stream2list to get an iteratee that consumes
the result stream and returns it as a list:

> let iter = joinI $ (convStream (head >>= return . Just . (:[]) . (+1))) stream2list :: IterateeG [] Int IO [Int]
> run iter
*** Exception: control message: Just (Err "EOF")

Note that run simply passes EOF to iter and extracts the result.
Instead of throwing an error the code above should produce an [] (i.e.
no stream to consume, no elements in the list). This can be fixed by
checking whether the stream is empty:

> let iter = joinI $ (convStream (isFinished >>= maybe (head >>= return . Just . (:[]) . (+1)) (\_ -> return Nothing))) stream2list :: IterateeG [] Int IO [Int]
> run iter
[]

I think you should do the same in your code:

runGetEnumeratee get =
    I.convStream $ isFinished >>= maybe convIter (\_ -> return Nothing)
    where
        convIter = (Just . return) `liftM` (runGetIteratee get)

When the stream is not empty, it runs (runGetIteratee get) and returns
its result wrapped in Just . (:[]). When the stream is empty, it
returns Nothing so convStream knows it is done.

-- Maciej

On Thu, Feb 3, 2011 at 10:06 AM, wren ng thornton <wren at freegeek.org> wrote:
> I'm working on a project that's using John Lato's old implementation of
> iteratees (iteratee >= 0.3.5 && < 0.4; I'm hoping to migrate to 0.7 soon,
> but that's a ways off yet) and I'm running into some issues I haven't been
> able to untangle. Maybe a new set of eyes can help...
>
> The overarching program brings three things together for doing some
> interprocess communication: the medium is Posix FIFOs, the messages
> themselves are encoded with Google's Protocol Buffers[1], and the control
> flow for getting and processing the messages is handled by iteratees. The
> error message indicates iteratees are at fault, though it could be an error
> elsewhere instead.
>
> First, some boilerplate.
>
>    -- For messageWithLengthEnumeratee only
>    {-# LANGUAGE ScopedTypeVariables #-}
>
>    import qualified Text.ProtocolBuffers.Reflections as R
>    import qualified Text.ProtocolBuffers.WireMessage as W
>    import qualified Text.ProtocolBuffers.Get         as G
>    import qualified Data.ByteString                  as S
>    import qualified Data.ByteString.Lazy             as L
>    import qualified Data.Iteratee                    as I
>    import           Data.Iteratee.WrappedByteString
>    import           Data.Word                        (Word8)
>    import           Control.Monad                    (liftM)
>
>
>    -- | Return a final value, and the remainder of the stream.
>    idone :: a -> c el -> I.IterGV c el m a
>    idone a xs = I.Done a (I.Chunk xs)
>    {-# INLINE idone #-}
>
>
>    -- | Convert a continuation into 'I.IterGV'.
>    icontinue
>        :: (I.StreamG c el -> m (I.IterGV c el m a))
>        -> I.IterGV c el m a
>    icontinue k = I.Cont (I.IterateeG k) Nothing
>    {-# INLINE icontinue #-}
>
>
>    -- | Throw an error message.
>    ifail :: (Monad m) => String -> I.IterGV c el m a
>    ifail msg = ierror (I.Err msg)
>    {-# INLINE ifail #-}
>
>
>    -- | An 'I.IterGV' variant of 'I.throwErr'.
>    ierror :: (Monad m) => I.ErrMsg -> I.IterGV c el m a
>    ierror err = I.Cont (I.throwErr err) (Just err)
>    {-# INLINE ierror #-}
>
>
>    toLazyBS :: S.ByteString -> L.ByteString
>    toLazyBS = L.fromChunks . (:[])
>    {-# INLINE toLazyBS #-}
>
>
>    toStrictBS :: L.ByteString -> S.ByteString
>    toStrictBS = S.concat . L.toChunks
>    {-# INLINE toStrictBS #-}
>
> Now we have the code for converting the Get monad used by protocol buffers
> into an iteratee. This should be correct, and it's pretty straightforward.
>
>    -- | Convert a 'G.Result' iteratee state into a 'I.IterGV'
>    -- iteratee state.
>    result2iterv
>        :: (Monad m)
>        => G.Result a
>        -> I.IterGV WrappedByteString Word8 m a
>    result2iterv (G.Finished rest _ a) = idone a (WrapBS $ toStrictBS rest)
>    result2iterv (G.Failed _ msg)      = ifail msg
>    result2iterv (G.Partial k)         = I.Cont (iterify k) Nothing
>
>
>    -- | Convert a protobuf-style continuation into an
>    -- iteratee-style continuation.
>    iterify
>        :: (Monad m)
>        => (Maybe L.ByteString -> G.Result a)
>        -> I.IterateeG WrappedByteString Word8 m a
>    iterify k =
>        I.IterateeG $ \s -> return $!
>            case s of
>            I.Chunk (WrapBS xs) -> result2iterv $ k (Just $ toLazyBS xs)
>            I.EOF Nothing       -> result2iterv $ k Nothing
>            I.EOF (Just err)    -> ierror err
>
>
>    -- | A variant of 'G.runGet' as an iteratee.
>    runGetIteratee
>        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
>        => G.Get a
>        -> I.IterateeG WrappedByteString Word8 m a
>    runGetIteratee g =
>        I.IterateeG $ \s -> return $!
>            case s of
>            I.Chunk (WrapBS xs) -> result2iterv $ G.runGet g (toLazyBS xs)
>            I.EOF Nothing       -> result2iterv $ G.runGet g L.empty
>            I.EOF (Just err)    -> ierror err
>
> Okay, now we have an iteratee which consumes a stream of bytestrings and
> will render a protocol buffer message. But what we really want is an
> enumeratee to do this repeatedly so we can use an iteratee to consume the
> stream of messages. I have the following definition which typechecks, but
> doesn't seem to work. The call to convStream seems like it always hangs:
>
>    -- | A variant of 'G.runGet' as an enumeratee.
>    runGetEnumeratee
>        :: (Monad m, R.ReflectDescriptor a, W.Wire a)
>        => G.Get a
>        -> I.EnumeratorN WrappedByteString Word8 [] a m b
>    runGetEnumeratee =
>        I.convStream . liftM (Just . (:[])) . runGetIteratee
>
> Once we have a working definition of runGetEnumeratee, then we can define
> the specific enumeratee we need:
>
>    -- | An enumeratee for converting bytestrings into protocol
>    -- buffer messages.
>    messageWithLengthEnumeratee
>        :: forall m msg a
>        .  (Monad m, R.ReflectDescriptor msg, W.Wire msg)
>        => I.EnumeratorN WrappedByteString Word8 [] msg m a
>    messageWithLengthEnumeratee =
>       runGetEnumeratee (W.messageWithLengthGetM :: G.Get msg)
>
> And then at the use site we have the following:
>
>    let processRequest = ... :: msg -> IO ()
>    I.run                             -- run the [()], sending EOF
>        . I.joinIM                    -- push monadic effects inside
>        . I.enumFdFollow fifo_in      -- read from the Fd forever
>        . I.joinI                     -- when EOF bytestrings, EOF msgs
>        . messageWithLengthEnumeratee -- ByteStrings -> messages
>        $ I.mapM_ processRequest      -- process messages
>
> I think this part is correct too, but just to be sure... the goal is that we
> should read bytestrings from the FIFO forever (or until the other process
> closes their end), and then we read off the messages one by one, handing
> them off to processRequest to interpret them and respond accordingly.
>
> When I put this all together, the process is killed with:
>
>    control message: Just (Err "endOfInput")
>
> Data.Iteratee.Base.run is the origin of the "control message:" part of the
> error, but I don't know where (Err "endOfInput") is coming from since
> Data.Iteratee.Base only uses (Err "EOF") or (Err "Divergent Iteratee"). I
> believe runGetEnumeratee is where the problem is, though it could also be
> the use site or something in one of the libraries. Any help would be
> appreciated.
>
>
> [1] http://hackage.haskell.org/package/protocol-buffers
>    http://hackage.haskell.org/package/hprotoc
>
> --
> Live well,
> ~wren
>
> _______________________________________________
> Haskell-Cafe mailing list
> Haskell-Cafe at haskell.org
> http://www.haskell.org/mailman/listinfo/haskell-cafe
>



More information about the Haskell-Cafe mailing list