[Haskell-cafe] streaming package: How to demux a stream properly?

Tikhon Jelvis tikhon at jelv.is
Wed Apr 15 17:04:32 UTC 2020


I'm not sure it's possible to do that efficiently.

Here's a hypothetical situation: you have a stream with elements tagged as
1 and 2, but none tagged as 0. If somebody applies an operation to the
stream at index 0 (say stdoutLn), we have to process every single element
of the input stream, and perform every single effect, before we know that
the stream at index 0 is empty. In general, if we apply an operation to an
element of one of the output streams, we have to process at minimum all
the input elements up to and including that particular element. The
important thing is that, semantically, the output of your demux operation
is not n independent streams, but n views into a single stream.
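
To make the "views" point concrete, here is a small sketch. The helper
viewAt is a hypothetical name of mine, not something from the streaming
package; it just filters the tagged stream down to one index. The IORef
counter stands in for whatever effects the real input stream performs.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical helper: the "view" of a tagged stream at a single index.
viewAt :: Monad m => Int -> Stream (Of (Int, a)) m r -> Stream (Of a) m r
viewAt i = S.map snd . S.filter ((== i) . fst)

main :: IO ()
main = do
  effects <- newIORef (0 :: Int)
  -- Each input element bumps the counter when it is produced.
  let input =
        S.chain (\_ -> modifyIORef' effects (+ 1))
                (S.each [(1, "a"), (2, "b"), (1, "c"), (2, "d")])
  -- Nothing is tagged 0, so this prints nothing...
  S.stdoutLn (viewAt 0 input)
  -- ...but all 4 input effects had to run to find that out.
  n <- readIORef effects
  putStrLn ("input effects performed: " ++ show n)
```

Note that running viewAt 1 on the same input afterwards would perform all
the effects a second time, which is exactly why these are not independent
streams.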

It's probably possible to implement a version of this function that doesn't
process the *entire* input stream up-front (it would just process as much of
the input as it needed when you look at any given element in the output), but
it probably needs a different type than just Vector to make it work
correctly, and I'm not sure how to do that. More importantly, the behavior
of this function would still be confusing: it might *look* like you have
several distinct streams, but you'd have to perform the effects and store
the results of every input step you pass over, even if you only used one
of your demuxed streams.
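
For what it's worth, here is a rough, untuned sketch of what such an
on-demand version might look like. It rests on assumptions that are mine
and not part of the original question: the monad is fixed to IO so the
views can share mutable state, the name demuxBuffered is made up, and the
code is not thread-safe. It also shows the cost described above: items for
indices nobody is reading pile up in the buffers, so memory is not
constant.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
import qualified Data.IntMap.Strict as IM
import Data.Sequence (ViewL (..), viewl)
import qualified Data.Sequence as Seq
import Data.Vector (Vector)
import qualified Data.Vector as V
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Hypothetical on-demand demux: each output pulls from the shared input
-- only when its own buffer is empty, buffering items meant for others.
demuxBuffered
  :: Int -> Stream (Of (Int, a)) IO () -> IO (Vector (Stream (Of a) IO ()))
demuxBuffered n input = do
  rest <- newIORef input      -- the not-yet-consumed part of the input
  bufs <- newIORef IM.empty   -- per-index queues of pending items
  let pull i = do
        m <- readIORef bufs
        case viewl (IM.findWithDefault Seq.empty i m) of
          x :< xs -> do
            writeIORef bufs (IM.insert i xs m)
            pure (Just x)
          EmptyL -> do
            s <- readIORef rest
            step <- S.next s
            case step of
              Left () -> do
                -- Input exhausted; make sure it is never re-run.
                writeIORef rest (pure ())
                pure Nothing
              Right ((j, y), s') -> do
                writeIORef rest s'
                -- Append y to the queue for index j, then retry.
                modifyIORef' bufs
                  (IM.insertWith (flip (<>)) j (Seq.singleton y))
                pull i
      view i = do
        mx <- lift (pull i)
        case mx of
          Nothing -> pure ()
          Just x -> S.yield x >> view i
  pure (V.generate n view)

main :: IO ()
main = do
  outs <- demuxBuffered 2 (S.each [(1, "a"), (0, "b"), (1, "c")])
  -- Reading one item from index 0 consumes the input only up to (0, "b");
  -- the earlier (1, "a") is held in the buffer for index 1.
  S.stdoutLn (S.take 1 (outs V.! 0))
  S.stdoutLn (outs V.! 1)
```

Items tagged with an index outside 0..n-1 are buffered and never read in
this sketch, and each output can only be consumed once, so it really is n
views sharing one input rather than n independent streams.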

Does that explanation make sense? I haven't done much streaming stuff in a
while, so I'm struggling a bit with how to express my intuitions about it.

On Wed, Apr 15, 2020 at 9:42 AM ☂Josh Chia (謝任中) <joshchia at gmail.com> wrote:

> I have a streaming package question.
>
> Suppose I have a:
> Stream (Of (Int, a)) m ()
>
> I would like to demultiplex it into Vector (Stream (Of a) m ()) using the
> Int as the index of an item into the Vector of output streams.
>
> How can I do this efficiently (constant memory and linear time)?
>
> Does the following work?
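> {-# LANGUAGE ScopedTypeVariables #-}
> import Control.Monad.IO.Class (MonadIO)
> import Data.Vector (Vector)
> import qualified Data.Vector as V
> import qualified Data.Vector.Mutable as VM
> import Streaming (Of, Stream)
> import qualified Streaming.Prelude as SP
>
> type StreamOf a m r = Stream (Of a) m r
>
> demuxStream :: forall a m. MonadIO m
>   => Int -> StreamOf (Int, a) m () -> m (Vector (StreamOf a m ()))
> demuxStream numSyms stream =
>   let emptyStreams = V.replicate numSyms (pure ())
>       -- Append x to the output stream stored at index iD.
>       processItem v (iD, x) =
>         V.modify (\vm -> VM.modify vm (>> SP.yield x) iD) v
>   in SP.fold_ processItem emptyStreams id stream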
>
> My guess is that it takes more than constant memory as it goes through the
> entire input stream before returning.
>
> Josh