Thu Feb 3 05:25:40 CET 2011
I think the problem is that the iteratee you give to I.convStream
always returns Just [something] while you should return Nothing on
Suppose you want to have an enumeratee that adds 1 to each integer in
the stream and then use stream2list to get an iteratee that consumes
the result stream and returns it as a list:
> let iter = joinI $ (convStream (head >>= return . Just . (:[]) . (+1))) stream2list :: IterateeG [] Int IO [Int]
> run iter
*** Exception: control message: Just (Err "EOF")
Note that run simply passes EOF to iter and extracts the result.
Instead of throwing an error the code above should produce an [] (i.e.
no stream to consume, no elements in the list). This can be fixed by
checking whether the stream is empty:
> let iter = joinI $ (convStream (isFinished >>= maybe (head >>= return . Just . (:[]) . (+1)) (\_ -> return Nothing))) stream2list :: IterateeG [] Int IO [Int]
> run iter
I think you should do the same in your code:
runGetEnumeratee get =
I.convStream $ isFinished >>= maybe convIter (\_ -> return Nothing)
convIter = (Just . return) `liftM` (runGetIteratee get)
When the stream is not empty, it runs (runGetIteratee get) and returns
its result wrapped in Just . (:[]). When the stream is empty, it
returns Nothing so convStream knows it is done.
-- Maciej
On Thu, Feb 3, 2011 at 10:06 AM, wren ng thornton <wren at freegeek.org> wrote:
> I'm working on a project that's using John Lato's old implementation of
> iteratees (iteratee >= 0.3.5 && < 0.4; I'm hoping to migrate to 0.7 soon,
> but that's a ways off yet) and I'm running into some issues I haven't been
> able to untangle. Maybe a new set of eyes can help...
> The overarching program brings three things together for doing some
> interprocess communication: the medium is Posix FIFOs, the messages
> themselves are encoded with Google's Protocol Buffers[1], and the control
> flow for getting and processing the messages is handled by iteratees. The
> error message indicates iteratees are at fault, though it could be an error
> elsewhere instead.
> First, some boilerplate.
> -- For messageWithLengthEnumeratee only
> {-# LANGUAGE ScopedTypeVariables #-}
> import qualified Text.ProtocolBuffers.Reflections as R
> import qualified Text.ProtocolBuffers.WireMessage as W
> import qualified Text.ProtocolBuffers.Get as G
> import qualified Data.ByteString as S
> import qualified Data.ByteString.Lazy as L
> import qualified Data.Iteratee as I
> import Data.Iteratee.WrappedByteString
> import Data.Word (Word8)
> import Control.Monad (liftM)
> -- | Return a final value, and the remainder of the stream.
> idone :: a -> c el -> I.IterGV c el m a
> idone a xs = I.Done a (I.Chunk xs)
> {-# INLINE idone #-}
> -- | Convert a continuation into 'I.IterGV'.
> icontinue
> :: (I.StreamG c el -> m (I.IterGV c el m a))
> -> I.IterGV c el m a
> icontinue k = I.Cont (I.IterateeG k) Nothing
> {-# INLINE icontinue #-}
> -- | Throw an error message.
> ifail :: (Monad m) => String -> I.IterGV c el m a
> ifail msg = ierror (I.Err msg)
> {-# INLINE ifail #-}
> -- | An 'I.IterGV' variant of 'I.throwErr'.
> ierror :: (Monad m) => I.ErrMsg -> I.IterGV c el m a
> ierror err = I.Cont (I.throwErr err) (Just err)
> {-# INLINE ierror #-}
> toLazyBS :: S.ByteString -> L.ByteString
> toLazyBS = L.fromChunks . (:[])
> {-# INLINE toLazyBS #-}
> toStrictBS :: L.ByteString -> S.ByteString
> toStrictBS = S.concat . L.toChunks
> {-# INLINE toStrictBS #-}
> Now we have the code for converting the Get monad used by protocol buffers
> into an iteratee. This should be correct, and it's pretty straightforward.
> -- | Convert a 'G.Result' iteratee state into a 'I.IterGV'
> -- iteratee state.
> result2iterv
> :: (Monad m)
> => G.Result a
> -> I.IterGV WrappedByteString Word8 m a
> result2iterv (G.Finished rest _ a) = idone a (WrapBS $ toStrictBS rest)
> result2iterv (G.Failed _ msg) = ifail msg
> result2iterv (G.Partial k) = I.Cont (iterify k) Nothing
> -- | Convert a protobuf-style continuation into an
> -- iteratee-style continuation.
> iterify
> :: (Monad m)
> => (Maybe L.ByteString -> G.Result a)
> -> I.IterateeG WrappedByteString Word8 m a
> iterify k =
> I.IterateeG $ \s -> return $!
> case s of
> I.Chunk (WrapBS xs) -> result2iterv $ k (Just $ toLazyBS xs)
> I.EOF Nothing -> result2iterv $ k Nothing
> I.EOF (Just err) -> ierror err
> -- | A variant of 'G.runGet' as an iteratee.
> runGetIteratee
> :: (Monad m, R.ReflectDescriptor a, W.Wire a)
> => G.Get a
> -> I.IterateeG WrappedByteString Word8 m a
> runGetIteratee g =
> I.IterateeG $ \s -> return $!
> case s of
> I.Chunk (WrapBS xs) -> result2iterv $ G.runGet g (toLazyBS xs)
> I.EOF Nothing -> result2iterv $ G.runGet g L.empty
> I.EOF (Just err) -> ierror err
> Okay, now we have an iteratee which consumes a stream of bytestrings and
> will render a protocol buffer message. But what we really want is an
> enumeratee to do this repeatedly so we can use an iteratee to consume the
> stream of messages. I have the following definition which typechecks, but
> doesn't seem to work. The call to convStream seems like it always hangs:
> -- | A variant of 'G.runGet' as an enumeratee.
> runGetEnumeratee
> :: (Monad m, R.ReflectDescriptor a, W.Wire a)
> => G.Get a
> -> I.EnumeratorN WrappedByteString Word8 [] a m b
> runGetEnumeratee =
> I.convStream . liftM (Just . (:[])) . runGetIteratee
> Once we have a working definition of runGetEnumeratee, then we can define
> the specific enumeratee we need:
> -- | An enumeratee for converting bytestrings into protocol
> -- buffer messages.
> messageWithLengthEnumeratee
> :: forall m msg a
> . (Monad m, R.ReflectDescriptor msg, W.Wire msg)
> => I.EnumeratorN WrappedByteString Word8 [] msg m a
> messageWithLengthEnumeratee =
> runGetEnumeratee (W.messageWithLengthGetM :: G.Get msg)
> And then at the use site we have the following:
> let processRequest = ... :: msg -> IO ()
> I.run -- run the [()], sending EOF
> . I.joinIM -- push monadic effects inside
> . I.enumFdFollow fifo_in -- read from the Fd forever
> . I.joinI -- when EOF bytestrings, EOF msgs
> . messageWithLengthEnumeratee -- ByteStrings -> messages
> $ I.mapM_ processRequest -- process messages
> I think this part is correct too, but just to be sure... the goal is that we
> should read bytestrings from the FIFO forever (or until the other process
> closes their end), and then we read off the messages one by one, handing
> them off to processRequest to interpret them and respond accordingly.
> When I put this all together, the process is killed with:
> control message: Just (Err "endOfInput")
> Data.Iteratee.Base.run is the origin of the "control message:" part of the
> error, but I don't know where (Err "endOfInput") is coming from since
> Data.Iteratee.Base only uses (Err "EOF") or (Err "Divergent Iteratee"). I
> believe runGetEnumeratee is where the problem is, though it could also be
> the use site or something in one of the libraries. Any help would be
> appreciated.
> [1] http://hackage.haskell.org/package/protocol-buffers
> http://hackage.haskell.org/package/hprotoc
> --
> Live well,
> ~wren
