[Haskell-cafe] streaming package: How to demux a stream properly?

Josh Chia (謝任中) joshchia at gmail.com
Wed Apr 15 16:40:19 UTC 2020

I have a streaming package question.

Suppose I have a stream of type:
Stream (Of (Int, a)) m ()

I would like to demultiplex it into Vector (Stream (Of a) m ()) using the
Int as the index of an item into the Vector of output streams.
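
To pin down the behaviour I am after, here is a list-based model of the
demultiplexing. It is only a specification sketch: demuxList is a
hypothetical name, it uses plain lists instead of streams, and it makes no
attempt to be efficient.

```haskell
-- Specification sketch only: lists stand in for streams.
-- Output k holds, in input order, every item tagged with index k.
-- Items whose index falls outside [0, n-1] are silently dropped here;
-- that is a choice made for this sketch, not a requirement.
demuxList :: Int -> [(Int, a)] -> [[a]]
demuxList n xs = [ [x | (i, x) <- xs, i == k] | k <- [0 .. n - 1] ]
```

For example, demuxList 3 [(0,'a'),(2,'b'),(0,'c')] evaluates to
["ac","","b"].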

How can I do this efficiently (constant memory and linear time)?

Does the following work?
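{-# LANGUAGE ScopedTypeVariables #-}
import Control.Monad.IO.Class (MonadIO)
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import Streaming (Of, Stream)
import qualified Streaming.Prelude as SP

type StreamOf a m r = Stream (Of a) m r

-- Fold over the input, appending each item to the output stream at its index.
demuxStream :: forall a m. MonadIO m
  => Int -> StreamOf (Int, a) m () -> m (Vector (StreamOf a m ()))
demuxStream numSyms stream =
  let emptyStreams = V.replicate numSyms (pure ())
      -- Append a yield of x to the stream stored at position iD.
      processItem v (iD, x) = V.modify (\vm -> VM.modify vm (>> SP.yield x) iD) v
  in SP.fold_ processItem emptyStreams id stream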

My guess is that it takes more than constant memory, since SP.fold_ consumes
the entire input stream before returning, so by then every item has already
been accumulated in the output streams.
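
For comparison, here is a sketch of a different shape that should stay in
constant memory, under the assumption that each output can be consumed by a
callback instead of being returned as a stream. It is not the Vector of
streams asked for above. demuxInto and example are hypothetical names;
SP.mapM_ and SP.each come from Streaming.Prelude.

```haskell
import           Data.IORef        (modifyIORef', newIORef, readIORef)
import           Data.Vector       (Vector)
import qualified Data.Vector       as V
import           Streaming         (Of, Stream)
import qualified Streaming.Prelude as SP

-- Hypothetical helper: push each item to the sink stored at its index.
-- Items are handled one at a time, so nothing is buffered by this function.
-- An out-of-range index makes V.! throw; bounds handling is left out.
demuxInto :: Monad m => Vector (a -> m ()) -> Stream (Of (Int, a)) m r -> m r
demuxInto sinks = SP.mapM_ (\(i, x) -> (sinks V.! i) x)

-- Usage sketch: three sinks that each keep a running sum in an IORef.
example :: IO [Int]
example = do
  refs <- V.replicateM 3 (newIORef 0)
  let sinks = V.map (\ref x -> modifyIORef' ref (+ x)) refs
  demuxInto sinks (SP.each [(0, 1), (2, 10), (0, 5)])
  mapM readIORef (V.toList refs)
```

The trade-off is that the consumers have to be known before the input is
run, whereas a Vector of streams would let them be chosen afterwards.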
