[Haskell-cafe] CPS Streams

jeff p mutjida at gmail.com
Wed Oct 10 09:27:36 CEST 2012


{-

I've always thought that the essence of iteratees is just CPS (I have
no desire to argue the merits of this statement, and I preemptively
concede to anyone who disagrees with this), so I decided to experiment
with a direct CPS version of streams. The resulting code turned out
nicer than I'd expected, so I'm writing about it in case anyone else
finds it interesting or instructive (it seems to me that Haskellers
tend to shy away from CPS). I'm pretty sure this technique is not new,
but a cursory search didn't turn up quite the same thing. I'd be
interested in any pointers to this elsewhere, as well as any general
feedback about this.

-}

{-# LANGUAGE ScopedTypeVariables, TupleSections #-}

import Control.Applicative hiding (empty)
import Control.Arrow
import Control.Exception
import Control.Monad (unless)
import Data.Attoparsec.Text
import Data.Text
import Data.Text.IO
import Prelude hiding (catch, getLine, lines, length, null, putStrLn,
readFile, splitAt)
import qualified Prelude as P
import System.IO (IOMode(..), hClose, hIsClosed, withFile)
import System.IO.Error hiding (catch)

{-

A stream is a function which takes a stream continuation and gives a
result. A stream continuation takes an input (the next element of the
stream) and the rest of the stream and returns a result. If the
continuation is given Nothing, then the stream it was given to is
done. This intuition can be realized with the following type which
will let us do compositional incremental input processing in a direct
functional style.

-}
-- | The continuation handed to a 'Stream0'. It receives either the next
-- element together with the rest of the stream, or 'Nothing' once the
-- stream is exhausted, and produces the final result.
newtype Stream0CPS a r =
  Stream0CPS { stream0CPS :: Maybe (a, Stream0 a r) -> r }

-- | A stream of @a@s in continuation-passing style: given a continuation,
-- it produces a result of type @r@.
type Stream0 a r = Stream0CPS a r -> r


-- | Stream the lines of a file.
--
-- The 'finally' clause is not necessary ('withFile' already closes the
-- handle), but it lets us observe when files are closed.
fileStream0 :: FilePath -> Stream0 Text (IO r)
fileStream0 fileName k = withFile fileName ReadMode $ \h -> do
    P.putStrLn $ "___opening "++fileName
    finally (go h k)
            (do isClosed <- hIsClosed h
                unless isClosed $
                  hClose h >> P.putStrLn ("___finally closing "++fileName)
            )
  where
    -- Hand the next line (or 'Nothing' at EOF) to the continuation; the
    -- rest of the stream keeps reading from the same handle.
    go h k' = stream0CPS k' . fmap (, go h) =<< safeNextLine h
    -- Read one line. EOF closes the handle and yields 'Nothing'; any other
    -- I/O error is rethrown.
    safeNextLine h = catch (Just <$> hGetLine h) $ \e ->
        if isEOFError e
            then do hClose h
                    P.putStrLn ("___EOF closing "++fileName)
                    return Nothing
            else ioError e

-- | Print each element of a stream on its own line, then "NOTHING" once
-- the stream is exhausted.
printStream0 :: Show a => Stream0 a (IO ()) -> IO ()
printStream0 xs = xs $ Stream0CPS $ maybe (P.putStrLn "NOTHING") step
  where
    -- Print the current element, then continue with the rest.
    step (x, xs') = P.putStrLn (show x) >> printStream0 xs'

-- | Append two streams: every element of the first, then every element of
-- the second. The second stream is only run once the first is exhausted.
appendStream0 :: Stream0 a r -> Stream0 a r -> Stream0 a r
appendStream0 front back k = front (Stream0CPS onFront)
  where
    onFront next = case next of
        Nothing -> back k
        Just (v, front') -> stream0CPS k (Just (v, appendStream0 front' back))

-- | Create a stream with only the first @n@ elements of a given stream.
-- When @n <= 0@ the given stream is not run at all; the continuation is
-- immediately handed 'Nothing'.
takeStream0 :: Int -> Stream0 a r -> Stream0 a r
takeStream0 n xs k
    | n > 0 =
        xs $ Stream0CPS $ stream0CPS k . fmap (id *** takeStream0 (n - 1))
    | otherwise = stream0CPS k Nothing

{-

Suppose we have a file named testWords with the words "one" through
"five" on separate lines, and a file called testNums with the numbers 1
through 5 on separate lines. Then we can do the following:

    *Main> printStream0 $ appendStream0 (fileStream0 "testWords") (fileStream0 "testNums")
    ___opening testWords
    "one"
    "two"
    "three"
    "four"
    "five"
    ___EOF closing testWords
    ___opening testNums
    "1"
    "2"
    "3"
    "4"
    "5"
    ___EOF closing testNums
    NOTHING

which is as expected. However, we also see:

    *Main> printStream0 $ appendStream0 (takeStream0 3 $ fileStream0 "testWords") (fileStream0 "testNums")
    ___opening testWords
    "one"
    "two"
    "three"
    ___opening testNums
    "1"
    "2"
    "3"
    "4"
    "5"
    ___EOF closing testNums
    NOTHING
    ___finally closing testWords

which is not ideal since we'd like testWords to be closed as soon as
we're done with it. In order to accomplish this, we have to add a way
to tell a stream to close down. This is easily accomplished by giving
an extra argument to streams, which tells them what to do. While below
we have only allowed for streams to continue and stop, there is no
reason the StreamAction type, and the stream functions, couldn't be
expanded to allow for dynamically changing stream behavior, e.g. a
variable sized chunker, or a tunable filter.

-}
-- | The continuation handed to a 'Stream'. It receives either the next
-- element together with the rest of the stream, or 'Nothing' once the
-- stream is finished, and produces the final result.
newtype StreamCPS a r = StreamCPS { streamCPS :: Maybe (a, Stream a r) -> r }

-- | A stream that is told, on each step, whether to 'Continue' producing
-- elements or to 'Stop' (e.g. 'fileStream' closes its handle on 'Stop').
type Stream a r = StreamAction -> StreamCPS a r -> r

-- | What a stream is asked to do next.
data StreamAction = Continue | Stop
    deriving (Eq, Show)

{-

Now we just have to modify the previous functions to explain what to
do when asked to stop.

-}
-- | Stream the lines of a file.
--
-- If asked to 'Stop' before it has started, the stream finishes at once
-- without opening the file. This avoids opening (or failing to open) a file
-- only to close it again. Once running, 'Stop' closes the handle before
-- ending the stream, and EOF closes it as soon as it is reached. The
-- 'finally' clause is a safety net that also makes it observable when a
-- file is closed by neither of those paths.
fileStream :: FilePath -> Stream Text (IO r)
fileStream _ Stop k = streamCPS k Nothing
fileStream fileName Continue k = withFile fileName ReadMode $ \h -> do
    P.putStrLn $ "___opening "++fileName
    finally (go h Continue k)
            (do isClosed <- hIsClosed h
                unless isClosed $
                  hClose h >> P.putStrLn ("___finally closing "++fileName)
            )
  where
    -- Read the next line and hand it to the continuation; the rest of the
    -- stream keeps reading from the same handle.
    go h Continue k' = streamCPS k' . fmap (, go h) =<< safeNextLine h
    -- Asked to stop mid-stream: close the handle now, then end the stream.
    go h Stop k' = do
        hClose h
        P.putStrLn ("___Stop closing "++fileName)
        streamCPS k' Nothing
    -- Read one line. EOF closes the handle and yields 'Nothing'; any other
    -- I/O error is rethrown.
    safeNextLine h = catch (Just <$> hGetLine h) $ \e ->
        if isEOFError e
            then do hClose h
                    P.putStrLn ("___EOF closing "++fileName)
                    return Nothing
            else ioError e

-- | Print each element of a stream on its own line, then "NOTHING" once
-- the stream is finished.
printStream :: Show a => Stream a (IO ()) -> IO ()
printStream xs = xs Continue $ StreamCPS $ maybe (P.putStrLn "NOTHING") step
  where
    -- Print the current element, then continue with the rest.
    step (x, xs') = P.putStrLn (show x) >> printStream xs'

-- | Append two streams: all elements of the first, then all elements of the
-- second. Stopping the combined stream stops the first stream and then the
-- second; whatever the first stream passes to its continuation while
-- stopping is ignored.
appendStream :: Stream a r -> Stream a r -> Stream a r
appendStream front back action k = case action of
    Stop -> front Stop (StreamCPS (const (back Stop k)))
    Continue -> front Continue (StreamCPS onFront)
  where
    -- First stream exhausted: switch over to the second one.
    onFront Nothing = back Continue k
    onFront (Just (v, front')) = streamCPS k (Just (v, appendStream front' back))

-- | Keep only the first @n@ elements of a stream.
--
-- Once @n@ elements have been produced (or immediately, if @n <= 0@), the
-- underlying stream is told to 'Stop' rather than simply being abandoned,
-- so it gets the chance to release what it holds.
takeStream :: Int -> Stream a r -> Stream a r
takeStream _ xs Stop k = xs Stop k
takeStream n xs Continue k
    | n > 0 =
        xs Continue $ StreamCPS $ streamCPS k . fmap (id *** takeStream (n - 1))
    | otherwise = xs Stop k

{-

Now we get the desired behavior:

    *Main> printStream $ appendStream (takeStream 3 $ fileStream "testWords") (fileStream "testNums")
    ___opening testWords
    "one"
    "two"
    "three"
    ___Stop closing testWords
    ___opening testNums
    "1"
    "2"
    "3"
    "4"
    "5"
    ___EOF closing testNums
    NOTHING

Here are a few more stream functions to give a feel for what sorts of
things can easily be done.
-}

-- | The stream with no elements: whatever it is asked to do, it reports
-- straight away that it is finished.
emptyStream :: Stream a r
emptyStream _action (StreamCPS finish) = finish Nothing

-- | Stream lines read from standard input.
--
-- The stream ends when asked to 'Stop' or when standard input reaches end
-- of file. EOF is handled like it is in 'fileStream', ending the stream
-- rather than throwing; any other I/O error is rethrown.
getLineStream :: Stream Text (IO r)
getLineStream Stop k = streamCPS k Nothing
getLineStream Continue k = streamCPS k . fmap (, getLineStream) =<< safeGetLine
  where
    safeGetLine = catch (Just <$> getLine) $ \e ->
        if isEOFError e then return Nothing else ioError e

-- | Turn a list into a stream yielding its elements in order. Asking it to
-- 'Stop' ends it immediately; otherwise it ends when the list runs out.
listToStream :: [a] -> Stream a r
listToStream xs action k = case (action, xs) of
    (Continue, y : ys) -> streamCPS k (Just (y, listToStream ys))
    _                  -> streamCPS k Nothing


-- | Count the elements of a stream.
--
-- Stream consumers should have a general enough return type to work with
-- streams in a monad.
countStream :: Monad m => Stream a (m Int) -> m Int
countStream xs = xs Continue $ StreamCPS $ go 0
  where
    -- The running count is forced at every step, so a long stream does not
    -- accumulate a chain of unevaluated (+ 1) thunks.
    go n (Just (_, xs')) =
        let n' = n + 1
        in n' `seq` xs' Continue (StreamCPS (go n'))
    go n Nothing = return n


-- | Prepend an element to a stream. Stopping the result stops the tail
-- directly, without producing the prepended element.
consStream :: a -> Stream a r -> Stream a r
consStream x rest action k = case action of
    Stop -> rest Stop k
    Continue -> streamCPS k (Just (x, rest))

-- | Apply a function to every element of a stream. The action ('Continue'
-- or 'Stop') is passed straight through to the underlying stream.
mapStream :: (a -> b) -> Stream a r -> Stream b r
mapStream f xs action k =
    xs action $ StreamCPS $ streamCPS k . fmap (f *** mapStream f)

-- | Keep only the elements of a stream that satisfy a predicate. A rejected
-- element is skipped by asking the underlying stream to 'Continue' with the
-- same continuation.
filterStream :: (a -> Bool) -> Stream a r -> Stream a r
filterStream p xs action k =
    xs action $ StreamCPS $ maybe (streamCPS k Nothing) step
  where
    step (x, xs')
        | p x = streamCPS k $ Just (x, filterStream p xs')
        | otherwise = filterStream p xs' Continue k

-- | Skip the first @n@ elements of a stream. 'Stop' is passed straight
-- through, and for @n <= 0@ the original continuation is used unchanged.
dropStream :: Int -> Stream a r -> Stream a r
dropStream _ xs Stop k = xs Stop k
dropStream n xs Continue k = xs Continue newK
  where
    newK
        | n > 0 = StreamCPS $ maybe (streamCPS k Nothing) skip
        | otherwise = k
    -- Discard this element and drop one fewer from the rest.
    skip (_, xs') = dropStream (n - 1) xs' Continue k

-- | Round-robin interleaving of a list of streams.
--
-- One element is taken from the first stream, and that stream's remainder
-- is moved to the back of the list. A stream that finishes is dropped.
-- Stopping the merged stream stops every stream in the list, in order.
mergeStreams :: [Stream a r] -> Stream a r
mergeStreams [] _ k = streamCPS k Nothing
mergeStreams (s:ss) Stop k = s Stop $ StreamCPS $ \_ -> mergeStreams ss Stop k
mergeStreams (s:ss) Continue k = s Continue $ StreamCPS $
    maybe (mergeStreams ss Continue k) step
  where
    -- Emit this element, then rotate the remainder of @s@ to the back.
    step (v, s') = streamCPS k $ Just (v, mergeStreams $ ss ++ [s'])

-- | Run an attoparsec parser over the given stream, producing a stream of
-- parsed values.
--
-- The parser can consume partial and/or multiple chunks (lines) of the
-- stream to generate each result. Input left over after a result is pushed
-- back onto the stream for the next parse. Line-based sources such as
-- 'fileStream' strip line terminators, so the parser sees consecutive lines
-- concatenated without newlines.
--
-- Empty chunks are skipped: attoparsec treats feeding an empty 'Text' to a
-- 'Partial' result as end of input, so passing blank lines through would
-- end a parse prematurely. When the underlying stream ends while a parse is
-- still in progress, end of input is signalled so that the final result (or
-- error) is not lost. On a parse failure the error is printed and the
-- underlying stream is stopped.
attoStream :: forall a r . Parser a -> Stream Text (IO r) -> Stream a (IO r)
attoStream p = go Nothing
  where
    -- The first argument is the parse in progress, if any; 'Nothing' means
    -- the next non-empty chunk starts a fresh parse.
    go :: Maybe (Result a) -> Stream Text (IO r) -> Stream a (IO r)
    go _ xs Stop k = xs Stop $ StreamCPS $ \_ -> streamCPS k Nothing
    go pending xs Continue k = xs Continue $ StreamCPS next
      where
        -- Input exhausted: let an in-progress parse see end of input.
        next Nothing =
            maybe (streamCPS k Nothing)
                  (\partial -> onResult (feed partial mempty) emptyStream)
                  pending
        next (Just (chunk, xs'))
            | chunk == mempty = go pending xs' Continue k
            | otherwise = onResult (maybe (parse p) feed pending chunk) xs'

        -- Act on the parser's latest result; @rest@ is the remaining input.
        onResult res rest = case res of
            Done leftover v ->
                streamCPS k $ Just (v, attoStream p $ consStream leftover rest)
            Partial _ -> go (Just res) rest Continue k
            Fail _ _ctxs err ->
                P.putStrLn ("___Atto Error: "++err) >> go Nothing rest Stop k



More information about the Haskell-Cafe mailing list