[Haskell-cafe] conduit/pipes/streaming and prompt cleanup

Michael Snoyman michael at snoyman.com
Tue Jun 4 13:45:11 UTC 2019


I think you're seeing the fact that conduit no longer has finalizers, see:

https://www.snoyman.com/blog/2018/01/drop-conduits-finalizers

Though I don't think your example proves the point. You know that the code
following `takeC 3` will never `await`, but there's no way for the conduit
library to know that. Instead, if you wanted to demonstrate the limitation
of removing finalizers you'd need to rewrite your code to this:

pipe :: C.ConduitM a c (C.ResourceT IO) ()
pipe = ((fileLines "TODO" .| C.takeC 3) >> C.yield "***")
    .| C.mapM_C (Trans.liftIO . putStrLn)

Here, we can immediately see the `fileLines` is not going to be `await`ed
from again once `takeC 3` is complete, and yet without finalizers the close
is still delayed. However, now that I've done that slight rewrite, we can
make a further rewrite to leverage the bracket pattern and get back prompt
finalization:

pipe :: C.ConduitM a c (C.ResourceT IO) ()
pipe = ((withFileLines "TODO" (C.takeC 3)) >> C.yield "***")
    .| C.mapM_C (Trans.liftIO . putStrLn)

withFileLines
  :: C.MonadResource m
  => FilePath
  -> C.ConduitT String o m r
  -> C.ConduitT i o m r
withFileLines fname inner =
  C.bracketP
    (IO.openFile fname IO.ReadMode)
    close
    (\h -> handleLines h .| inner)

On Tue, Jun 4, 2019 at 12:17 AM Evan Laforge <qdunkan at gmail.com> wrote:

> I'm using the 'streaming' library and realized it doesn't close files in a
> timely way for the way I'm using it, and in fact can't, due to how the
> library
> works.  I know conduit has put a lot of thought into closing resources in a
> timely way, so I did an experiment to see what it does, but as far as I can
> tell, conduit has the same problem.  Maybe I'm doing it wrong?
>
> The situation is that I'm opening multiple files and mixing their output.
> I want to close the inputs as soon as I'm done with them.  But I can get
> done with them earlier than the end of the file.  Since all of these
> libraries
> are based on pulling from downstream, if you don't pull all the way to the
> end,
> the close at the end doesn't happen, and has to wait until runResourceT
> returns, which is too late.  I remember long ago reading Oleg's original
> iteratee paper, and it seems like he called out this problem with
> pull-based
> iterators, that the iterator doesn't know when its caller is done with it,
> so
> it can't close files on time.
>
> Here's a conduit version that I think illustrates the situation:
>
>     import qualified Conduit as C
>     import           Conduit ((.|))
>     import qualified Control.Monad.Trans as Trans
>     import qualified System.IO as IO
>
>     main :: IO ()
>     main = C.runResourceT $ C.runConduit pipe
>
>     pipe :: C.ConduitM a c (C.ResourceT IO) ()
>     pipe = fileLines "TODO" .| (C.takeC 3 >> C.yield "***")
>         .| C.mapM_C (Trans.liftIO . putStrLn)
>
>     fileLines :: C.MonadResource m => FilePath -> C.ConduitT i String m ()
>     fileLines fname = C.bracketP (IO.openFile fname IO.ReadMode) close
> handleLines
>
>     handleLines :: Trans.MonadIO m => IO.Handle -> C.ConduitT i String m ()
>     handleLines hdl = loop
>         where
>         loop = do
>             eof <- Trans.liftIO $ IO.hIsEOF hdl
>             if eof then return () else do
>                 line <- Trans.liftIO $ IO.hGetLine hdl
>                 C.yield line
>                 loop
>
>     close :: IO.Handle -> IO ()
>     close hdl = IO.hClose hdl >> putStrLn "=== close"
>
> This prints the first three lines of TOOD, then ***, and then "=== close",
> where the close should go before the ***s.
>
> As far as I can see, conduit can't do this any more than 'streaming' can,
> because 'C.takeC' is just some awaits and yields, with no indication that
> the final await is more special than any other await.
>
> I think what would be necessary to solve this is that combinators like
> 'take'
> have to be able to tell the stream to close, and that has to propagate
> back up
> to each producer that has registered a cleanup.  Of course this renders the
> stream invalid, so it's not safe to have any other references to the stream
> around, but I guess streams are stateful in general so that's always true.
> Maybe I could accumulate the finalizers in the stream data type and have
> combinators like 'take' call it as soon as they've taken their last.  What
> I actually wound up doing was make a 'takeClose' that also takes a 'close'
> action to run when its done with the stream.  It's not exactly general but
> I'm
> not writing a library so I don't need general.
>
> Is there some kind of standard or built-in solution for this situation?
> I know others have given a lot more thought to streaming than I have,
> so surely this issue has come up.
>
> I know there is a lot of talk about "prompt finalisation" and streams vs.
> pipes vs. conduit, and talk about brackets and whatnot, but despite reading
> various documents (https://hackage.haskell.org/package/streaming-with,
>
> http://www.haskellforall.com/2013/01/pipes-safe-10-resource-management-and.html
> ,
> etc.) I still don't really understand what they're talking about.
> It seems like they're really about reliable cleanup when there are
> exceptions, not really about prompt cleanup.  Certainly pipes-safe doesn't
> do
> prompt cleanup, at least not the kind I'm talking about.
> _______________________________________________
> Haskell-Cafe mailing list
> To (un)subscribe, modify options or view archives go to:
> http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
> Only members subscribed via the mailman list are allowed to post.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.haskell.org/pipermail/haskell-cafe/attachments/20190604/5fcd371f/attachment.html>


More information about the Haskell-Cafe mailing list