[Haskell-cafe] Conduit Best Practices for leftover data

Michael Snoyman michael at snoyman.com
Mon Apr 16 06:08:21 CEST 2012

I'm not really certain about your previous example, but what you're
describing here is data loss. AFAIK, this is inherent to any kind of
streaming approach. Have a look at the chapter on conduit in the Yesod
book; its description of data loss is still accurate.
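
The data loss described here can be reproduced in a few lines. This is a
minimal sketch, assuming a later conduit release (1.3 or newer) that
provides ConduitT, await, yield, (.|) and runConduitPure; these are not
the 0.4 Pipe constructors used in the quoted code below. The names
regroup and lossy are hypothetical and exist only for this illustration.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Hypothetical helper: regroup a stream of chunks into lists of exactly
-- n elements. Unyielded input is held in an internal buffer.
regroup :: Monad m => Int -> ConduitT [a] [a] m ()
regroup n = go []
  where
    go buf
      | length buf >= n = do
          let (out, rest) = splitAt n buf
          yield out
          go rest
      | otherwise = do
          mchunk <- await
          case mchunk of
            Nothing    -> return ()          -- trailing partial group is dropped in this sketch
            Just chunk -> go (buf ++ chunk)

-- The downstream sink stops after one group, so the fused conduit is
-- terminated while [6,7,8] still sits in regroup's buffer. That buffer
-- is never pushed back upstream, so the second sink sees nothing.
lossy :: ([[Int]], [[Int]])
lossy = runConduitPure $ CL.sourceList [[1 .. 8]] .| do
  firstGroup <- regroup 5 .| CL.take 1
  remaining  <- CL.consume
  return (firstGroup, remaining)
```

Evaluating lossy gives ([[1,2,3,4,5]],[]): the whole chunk [1..8] was
taken from the source, and the part that regroup had not yet emitted is
gone once the fused pipeline finishes.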

On Mon, Apr 16, 2012 at 3:46 AM, Myles C. Maxfield
<myles.maxfield at gmail.com> wrote:
> Sorry for the spam.
> A similar issue arises in the following program, where something downstream
> reaches EOF right after a conduit outputs a HaveOutput. Because the type of
> the early-closed function is just 'r' or 'm r', there is no way for the
> conduit to return any partial output. This means that any extra values in
> the chunk the conduit read are lost. Is there some way around this?
> -- takeConduit as in previous email
> -- partial2 outputs ([[1,2,3,4,5]],[]) instead of ([[1,2,3,4,5]],[6,7,8])
> monadSink :: Monad m => CI.Sink [a1] m ([[a1]], [[a1]])
> monadSink = do
>   output <- takeConduit 5 C.=$ CL.take 1
>   output' <- CL.consume
>   return (output, output')
> partial2 :: (Num t, Monad m, Enum t) => m ([[t]], [[t]])
> partial2 = CL.sourceList [[1..8]] C.$$ monadSink
> Thanks,
> Myles
> On Sun, Apr 15, 2012 at 4:53 PM, Myles C. Maxfield
> <myles.maxfield at gmail.com> wrote:
>>>> 2. If you use connect-and-resume ($$+), the leftovers are returned as
>>>> part of the `Source`, and provided downstream.
>> I'm trying to figure out how to use this, but I'm getting a little bit
>> confused. In particular, here is a conduit that produces an output for
>> every 'i' inputs. I'm returning partial data when the input stream
>> hits an EOF (And I verified that the partial data is correct with
>> Debug.Trace), yet the output of 'partial' is ([[1,2,3,4,5]],[])
>> instead of ([[1,2,3,4,5]],[6,7,8]). Can you help me understand what's
>> going on?
>> Thanks,
>> Myles
>> import qualified Data.Conduit as C
>> import qualified Data.Conduit.List as CL
>> -- functionally the same as concatenating all the inputs, then
>> -- repeatedly running splitAt on the concatenation.
>> -- (Eq a is needed for x == 0 on GHC 7.4 and later, where Num no longer implies Eq.)
>> takeConduit :: (Eq a, Num a, Monad m) => a -> C.Pipe [a1] [a1] m ()
>> takeConduit i = takeConduitHelper i [] []
>>  where takeConduitHelper x lout lin
>>          | x == 0 = C.HaveOutput (takeConduitHelper i [] lin) (return ()) $ reverse lout
>>          | null lin = C.NeedInput (takeConduitHelper x lout) (C.Done (Just $ reverse lout) ())
>>          | otherwise = takeConduitHelper (x - 1) (head lin : lout) $ tail lin
>> partial :: (Num t, Monad m, Enum t) => m ([[t]], [[t]])
>> partial = do
>>  (source, output) <- CL.sourceList [[1..8]] C.$$+ (takeConduit 5 C.=$ CL.consume)
>>  output' <- source C.$$ CL.consume
>>  return (output, output')
>> On Sun, Apr 15, 2012 at 2:12 PM, Myles C. Maxfield
>> <myles.maxfield at gmail.com> wrote:
>>> Thanks for responding to this. Some responses are inline.
>>> On Sat, Apr 14, 2012 at 8:30 PM, Michael Snoyman <michael at snoyman.com>
>>> wrote:
>>>> On Thu, Apr 12, 2012 at 9:25 AM, Myles C. Maxfield
>>>> <myles.maxfield at gmail.com> wrote:
>>>>> Hello,
>>>>> I am interested in the argument to Done, namely, leftover data. More
>>>>> specifically, when implementing a conduit/sink, what should the
>>>>> conduit specify for the (Maybe i) argument to Done in the following
>>>>> scenarios (Please note that these scenarios only make sense if the
>>>>> type of 'i' is something in Monoid):
>>>>> 1) The conduit outputted the last thing that it felt like outputting,
>>>>> and exited willfully. There seem to be two options here - a) the
>>>>> conduit/sink should greedily gather up all the remaining input in the
>>>>> stream and mconcat them, or b) Return the part of the last thing that
>>>>> never got represented in any part of anything outputted. Option b
>>>>> seems to make the most sense here.
>>>> Yes, option (b) is definitely what's intended.
>>>>> 2) Something upstream produced Done, so the second argument to
>>>>> NeedInput gets run. This is guaranteed to be run at the boundary of an
>>>>> item, so should it always return Nothing? Instead, should it remember
>>>>> all the input it has consumed for the current (yet-to-be-outputted)
>>>>> element, so it can let Data.Conduit know that, even though the conduit
>>>>> appeared to consume the past few items, it actually didn't (because it
>>>>> needs more input items to make an output)? Remembering this sequence
>>>>> could potentially have disastrous memory usage. On the other hand, It
>>>>> could also greedily gather everything remaining in the stream.
>>>> No, nothing so complicated is intended. Most likely you'll never
>>>> return any leftovers from the second field of NeedInput. One other
>>>> minor point: it's also possible that the second field will be used if
>>>> the *downstream* pipe returns Done.
>>> Just to help me understand, what is a case when you want to specify
>>> something in this field? I can't think of a case when a Conduit would
>>> specify anything in this case.
>>>>> 3) The conduit/sink encountered an error mid-item. In general, is
>>>>> there a commonly-accepted way to deal with this? If a conduit fails in
>>>>> the middle of an item, it might not be clear where it should pick up
>>>>> processing, so the conduit probably shouldn't even attempt to
>>>>> continue. It would probably be good to return some notion of where it
>>>>> was in the input when it failed. It could return (Done (???) (Left
>>>>> errcode)) but this requires that everything downstream in the pipeline
>>>>> be aware of Errcode, which is not ideal. I could use MonadError along
>>>>> with PipeM, but this approach completely abandons the part of the
>>>>> stream that has been processed successfully. I'd like to avoid using
>>>>> Exceptions if at all possible.
>>>> Why avoid Exceptions? It's the right fit for the job. You can still
>>>> keep your conduit pure by setting up an `ExceptionT Identity` stack,
>>>> which is exactly how you can use the Data.Conduit.Text functions from
>>>> pure code. Really, what you need to be asking is "is there any logical
>>>> way to recover from an exception here?"
>>> I suppose this is a little off-topic, but do you prefer ExceptionT or
>>> ErrorT? Any exception/error that I'd be throwing is just a container
>>> around a String, so both of them will work fine for my purposes.
>>>>> It doesn't seem that a user application even has any way to access
>>>>> leftover data anyway, so perhaps this discussion will only be relevant
>>>>> in a future version of Conduit. At any rate, any feedback you could
>>>>> give me on this issue would be greatly appreciated.
>>>> Leftover data is definitely used:
>>>> 1. If you compose together two `Sink` with monadic bind, the leftovers
>>>> from the first will be passed to the second.
>>> You can do that???? That's so cool! I never realized that Pipes are
>>> members of Monad.
>>>> 2. If you use connect-and-resume ($$+), the leftovers are returned as
>>>> part of the `Source`, and provided downstream.
>>> This too is really neat :] I didn't realize how this worked.
>>>> Michael
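
Point 1 in the quoted list (leftovers passed from one sink to the next
through monadic bind) can be sketched as follows. This assumes a later
conduit release (1.3 or newer) with ConduitT, await, leftover, (.|) and
runConduitPure, rather than the Done (Maybe i) constructor discussed in
this thread. The names takeElems and demo are hypothetical.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Hypothetical sink: take n elements from a stream of chunks and push
-- the unused tail of the last chunk back with 'leftover'.
takeElems :: Monad m => Int -> ConduitT [a] o m [a]
takeElems n
  | n <= 0    = return []
  | otherwise = do
      mchunk <- await
      case mchunk of
        Nothing    -> return []
        Just chunk -> do
          let (now, rest) = splitAt n chunk
          if null rest
            then (now ++) <$> takeElems (n - length now)
            else do
              leftover rest    -- hand the unconsumed part to whatever runs next
              return now

-- Two sinks sequenced with monadic bind: the second one receives the
-- leftover pushed back by the first.
demo :: ([Int], [[Int]])
demo = runConduitPure $ CL.sourceList [[1 .. 8]] .| do
  firstFive <- takeElems 5
  remaining <- CL.consume
  return (firstFive, remaining)
```

Evaluating demo gives ([1,2,3,4,5],[[6,7,8]]). The difference from a
fused conduit is that takeElems runs directly as a sink, so its leftover
goes back to the shared input instead of being discarded when the fused
pipeline finishes.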
