[Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files
Maciej Piechotka
uzytkownik2 at gmail.com
Sun Jan 3 11:34:52 EST 2010
I have the following problem: I'd like to operate on big files, so I'd
prefer to operate on a 'stream' instead of the whole file at once, to avoid
keeping too much in memory. I need to calculate the MD5 of a file and compress it.
I tried to use something like the code below, but I'm afraid that I'd need to
patch the zlib package, as it results in a deadlock:
> {-# LANGUAGE GADTs #-}
> import Codec.Compression.GZip
> import Control.Applicative
> import Control.Concurrent.CHP
> import qualified Control.Concurrent.CHP.Common as CHP
> import Control.Concurrent.CHP.Enroll
> import Control.Concurrent.CHP.Utils
> import Control.Monad.State.Strict
> import Data.Digest.Pure.MD5
> import Data.Maybe
> import qualified Data.ByteString.Char8 as BS
> import qualified Data.ByteString.Lazy.Char8 as LBS
> import qualified Data.ByteString.Lazy.Internal as LBS
> import System.Environment
> import System.IO
> import System.IO.Unsafe
>
>
> -- | Process that hashes a stream of strict chunks read from @in_@.
> --
> -- Each @Just chunk@ is folded into the running MD5 context. A @Nothing@
> -- marks the end of a stream: the finished digest is written to @out@ and
> -- the context is reset so the next stream starts from scratch. The loop
> -- never ends on its own; the 'onPoisonRethrow' handler poisons both
> -- channel ends.
> calculateMD5 :: (ReadableChannel r,
>                  Poisonable (r (Maybe BS.ByteString)),
>                  WriteableChannel w,
>                  Poisonable (w MD5Digest))
>              => r (Maybe BS.ByteString)
>              -> w MD5Digest
>              -> CHP ()
> calculateMD5 in_ out =
>     evalStateT (forever step) md5InitialContext
>       `onPoisonRethrow` (poison in_ >> poison out)
>   where
>     -- One iteration: read a message and either extend or finish the hash.
>     step = do
>       next <- liftCHP (readChannel in_)
>       case next of
>         Nothing -> do
>           digest <- gets md5Finalize
>           liftCHP (writeChannel out digest)
>           put md5InitialContext
>         Just b -> modify (\ctx -> md5Update ctx (LBS.fromChunks [b]))
Calculate MD5 hash of input stream. Nothing indicates EOF.
> -- | Defer a 'CHP' action until its result is demanded, in the manner of
> -- 'unsafeInterleaveIO'.
> --
> -- The action is converted to an 'IO' computation with 'embedCHP', wrapped
> -- in 'unsafeInterleaveIO', and lifted back into 'CHP'.
> --
> -- NOTE(review): 'fromJust' is partial. If the embedded action yields
> -- 'Nothing' (presumably poison or failure -- confirm against the CHP
> -- documentation) an error is raised when the result is forced. It cannot
> -- simply be replaced by a monadic check, because inspecting the 'Maybe'
> -- here would force the deferred action immediately.
> unsafeInterleaveCHP :: CHP a -> CHP a
> unsafeInterleaveCHP action = do
>     embedded <- embedCHP action
>     deferred <- liftIO (unsafeInterleaveIO embedded)
>     return (fromJust deferred)
Helper function. It is supposed to defer the execution in time - just as
unsafeInterleaveIO does. I believe that the main problem lives here,
especially in that Maybe.fromJust: a Nothing there is an error.
> -- | Expose the values arriving on a channel as a lazily produced list.
> --
> -- Each cons cell is wrapped in 'unsafeInterleaveCHP', so the read for an
> -- element is deferred until that part of the list is demanded. Poison is
> -- trapped with 'onPoisonTrap' and ends the list with @[]@.
> chan2List :: (ReadableChannel r, Poisonable (r a))
>           => r a -> CHP [a]
> chan2List in_ = unsafeInterleaveCHP (readRest `onPoisonTrap` return [])
>   where
>     -- Read one element, then (lazily) the remainder of the channel.
>     readRest = do
>       x  <- readChannel in_
>       xs <- chan2List in_
>       return (x : xs)
Turns a channel into a lazily read list.
> -- | Lazily read a channel of 'Maybe' values as a list of groups, where
> -- each 'Nothing' terminates the current group.
> --
> -- For example, the message sequence @Just a, Just b, Nothing, Just c@ is
> -- presented as @[[a, b], [c]]@; a trailing group need not be closed by a
> -- 'Nothing'.
> --
> -- Fixes relative to the original posting:
> --
> -- * 'chan2List' is now actually applied to @in_@ (the argument was
> --   missing, which does not type-check).
> -- * The grouping helper is total: it no longer relies on an irrefutable
> --   @(y:ys)@ pattern, and it no longer inspects the tail of the input
> --   before emitting an element, so no extra channel read is forced.
> chanMaybe2List :: (ReadableChannel r,
>                    Poisonable (r (Maybe a)))
>                => r (Maybe a)
>                -> CHP [[a]]
> chanMaybe2List in_ = splitByMaybe <$> chan2List in_
>   where
>     splitByMaybe []             = []
>     splitByMaybe (Nothing : xs) = [] : splitByMaybe xs
>     splitByMaybe (Just v : xs)  = (v : y) : ys
>       where
>         -- Lazy pattern binding: the recursive call is only evaluated
>         -- when the rest of the current group is demanded.
>         (y, ys) = case splitByMaybe xs of
>                     []       -> ([], [])
>                     (g : gs) -> (g, gs)
Reads lazily from the channel into a list of lists.
> -- | Process that gzip-compresses each @Nothing@-terminated stream read
> -- from @in_@ and writes the compressed chunks to @out@, again followed by
> -- a @Nothing@ per stream.
> --
> -- The input is consumed through 'chanMaybe2List', i.e. as a lazily read
> -- list of chunk groups, each of which is assembled into a lazy ByteString
> -- and passed to 'compress'.
> --
> -- Poison handling (added for consistency with the other processes in this
> -- file): the lazy input list simply ends when the input is poisoned, so
> -- once all output has been sent @out@ is poisoned explicitly; otherwise
> -- downstream readers would wait forever. Poison raised while writing
> -- poisons both channel ends via 'onPoisonRethrow'.
> compressCHP :: (ReadableChannel r,
>                 Poisonable (r (Maybe BS.ByteString)),
>                 WriteableChannel w,
>                 Poisonable (w (Maybe BS.ByteString)))
>             => r (Maybe BS.ByteString)
>             -> w (Maybe BS.ByteString)
>             -> CHP ()
> compressCHP in_ out =
>     ((toOut >>= mapM_ sendBS) >> poison out)
>       `onPoisonRethrow` (poison in_ >> poison out)
>   where
>     -- One lazy ByteString per input stream.
>     in_' :: CHP [LBS.ByteString]
>     in_' = fmap LBS.fromChunks <$> chanMaybe2List in_
>     -- The same streams, compressed.
>     toOut :: CHP [LBS.ByteString]
>     toOut = fmap compress <$> in_'
>     -- Send a lazy ByteString chunk by chunk, then the end marker.
>     sendBS :: LBS.ByteString -> CHP ()
>     sendBS LBS.Empty       = writeChannel out Nothing
>     sendBS (LBS.Chunk c r) = writeChannel out (Just c) >> sendBS r
Compress process
> -- | Process that, for every path received on @file@, streams the file's
> -- contents to @data_@ as @Just chunk@ messages followed by one @Nothing@.
> --
> -- Fixes relative to the original posting:
> --
> -- * The file is read in a loop until 'BS.hGet' returns an empty chunk
> --   (end of file). Previously only a single chunk of at most
> --   'LBS.defaultChunkSize' bytes was sent, truncating larger files.
> -- * The handle is closed via 'onPoisonFinally' around both the copy and
> --   the terminating @Nothing@, so it is not leaked if that last write is
> --   poisoned.
> --
> -- Note that an empty file now produces just @Nothing@ rather than
> -- @Just ""@ followed by @Nothing@.
> readFromFile :: (ReadableChannel r,
>                  Poisonable (r String),
>                  WriteableChannel w,
>                  Poisonable (w (Maybe BS.ByteString)))
>              => r String
>              -> w (Maybe BS.ByteString)
>              -> CHP ()
> readFromFile file data_ =
>     forever serveOne `onPoisonRethrow` (poison file >> poison data_)
>   where
>     -- Handle one path: open, stream, send the end marker, close.
>     serveOne = do
>       path <- readChannel file
>       hnd  <- liftIO $ openFile path ReadMode
>       (copy hnd >> writeChannel data_ Nothing)
>         `onPoisonFinally` liftIO (hClose hnd)
>     -- Forward chunks until hGet signals EOF with an empty ByteString.
>     copy hnd = do
>       chunk <- liftIO (BS.hGet hnd LBS.defaultChunkSize)
>       if BS.null chunk
>         then return ()
>         else writeChannel data_ (Just chunk) >> copy hnd
Process reading from file
> -- | Process that, for every path received on @file@, opens that file for
> -- writing and stores the chunks arriving on @data_@ until a @Nothing@ is
> -- received.
> --
> -- The handle is closed through 'onPoisonFinally', i.e. both after a normal
> -- end of stream and when poison interrupts the transfer. The outer
> -- 'onPoisonRethrow' handler poisons both channels.
> writeToFile :: (ReadableChannel r,
>                 Poisonable (r String),
>                 ReadableChannel r',
>                 Poisonable (r' (Maybe BS.ByteString)))
>             => r String
>             -> r' (Maybe BS.ByteString)
>             -> CHP ()
> writeToFile file data_ =
>     forever serveOne `onPoisonRethrow` (poison file >> poison data_)
>   where
>     -- Handle one output file.
>     serveOne = do
>       path <- readChannel file
>       hnd  <- liftIO $ openFile path WriteMode
>       drain hnd `onPoisonFinally` liftIO (hClose hnd)
>     -- Write chunks to the handle until the end marker arrives.
>     drain hnd = do
>       next <- readChannel data_
>       case next of
>         Nothing -> return ()
>         Just v  -> liftIO (BS.hPutStr hnd v) >> drain hnd
Process writing to file
> -- | Source process: writes the two sample file names to @out@ and then
> -- poisons the channel.
> getFiles :: (WriteableChannel w, Poisonable (w String))
>          => w String -> CHP ()
> getFiles out = do
>     mapM_ (writeChannel out) ["test1", "test2"]
>     poison out
Sample files. Each contains "Test1\n"
> -- | First pipeline: file names -> file contents -> MD5 digests, with a
> -- final process printing every digest.
> pipeline1 :: CHP ()
> pipeline1 = do
>     md5sum <- oneToOneChannel' $ chanLabel "MD5"
>     let -- getFiles feeding readFromFile over a channel labelled "File".
>         source  = getFiles ->|^ ("File", readFromFile)
>         -- ... feeding calculateMD5 over a channel labelled "Data".
>         hasher  = source ->|^ ("Data", calculateMD5)
>         -- Print each digest as it arrives.
>         printer = forever (readChannel (reader md5sum) >>= liftIO . print)
>     runParallel_ [hasher (writer md5sum), printer]
First pipeline. Output:
fa029a7f2a3ca5a03fe682d3b77c7f0d
fa029a7f2a3ca5a03fe682d3b77c7f0d
< File."test1", Data.Just "Test1\n", Data.Nothing,
MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File."test2", Data.Just "Test1\n",
Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d >
> -- | Second pipeline: for every input file, compute its MD5 digest and
> -- write the digest's textual form to @<name>.md5@.
> --
> -- The file-name channel is a one-to-many channel with two enrolled
> -- readers: one derives the output file name, the other drives
> -- 'readFromFile'.
> --
> -- Fix relative to the original posting: the argument of 'show' had been
> -- wrapped onto a line without a bird track, so it was not part of the
> -- code; it is rejoined here.
> pipeline2 :: CHP ()
> pipeline2 = enrolling $ do
>     file     <- oneToManyChannel' $ chanLabel "File"
>     fileMD5  <- oneToOneChannel' $ chanLabel "File MD5"
>     data_    <- oneToOneChannel' $ chanLabel "Data"
>     md5      <- oneToOneChannel' $ chanLabel "MD5"
>     md5BS    <- oneToOneChannel' $ chanLabel "MD5 ByteString"
>     fileMD5' <- Enroll (reader file)
>     fileData <- Enroll (reader file)
>     liftCHP $ runParallel_
>       [ getFiles (writer file)
>         -- Derive the name of the digest file.
>       , (forever $ readChannel fileMD5' >>=
>                    writeChannel (writer fileMD5) . (++ ".md5"))
>           `onPoisonRethrow`
>           (poison fileMD5' >> poison (writer fileMD5))
>       , readFromFile fileData (writer data_)
>       , calculateMD5 (reader data_) (writer md5)
>         -- Turn each digest into a one-chunk, Nothing-terminated stream.
>       , (forever $ do
>             v <- readChannel (reader md5)
>             let v' = Just $ BS.pack $ show v
>             writeChannel (writer md5BS) v'
>             writeChannel (writer md5BS) Nothing)
>           `onPoisonRethrow`
>           (poison (writer md5BS) >> poison (reader md5))
>       , writeToFile (reader fileMD5) (reader md5BS)
>       ]
Correct pipeline (testing EnrollingT):
< _b4, File MD5."test1.md5", Data.Just "Test1\n", Data.Nothing,
MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, _b4, MD5 ByteString.Just
"fa029a7f2a3ca5a03fe682d3b77c7f0d", Data.Just "Test1\n", Data.Nothing,
MD5 ByteString.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File
MD5."test2.md5", MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d",
MD5 ByteString.Nothing >
% cat test1.md5
fa029a7f2a3ca5a03fe682d3b77c7f0d%
> -- | Third pipeline: for every input file, write a gzip-compressed copy to
> -- @<name>.gz@ and print the MD5 digests of both the original and the
> -- compressed data.
> --
> -- Fix relative to the original posting: the printing process read from
> -- @dataMD5@ and @compressedMD5@ -- the enrolled /data/ ends that already
> -- belong to the two 'calculateMD5' processes -- instead of from the digest
> -- channels @md5@ and @md5Compressed@. Nobody ever read the digests, so
> -- both hashers blocked forever on their first write.
> --
> -- NOTE(review): this removes one definite cause of the reported deadlock;
> -- the 'unsafeInterleaveCHP'-based lazy reading inside 'compressCHP' may
> -- still need attention.
> pipeline3 :: CHP ()
> pipeline3 = enrolling $ do
>     file           <- oneToManyChannel' $ chanLabel "File"
>     fileGZ         <- oneToOneChannel' $ chanLabel "File GZ"
>     data_          <- oneToManyChannel' $ chanLabel "Data"
>     compressed     <- oneToManyChannel' $ chanLabel "Data Compressed"
>     md5            <- oneToOneChannel' $ chanLabel "MD5"
>     md5Compressed  <- oneToOneChannel' $ chanLabel "MD5 Compressed"
>     fileGZ'        <- Enroll (reader file)
>     fileData       <- Enroll (reader file)
>     dataMD5        <- Enroll (reader data_)
>     dataCompress   <- Enroll (reader data_)
>     compressedFile <- Enroll (reader compressed)
>     compressedMD5  <- Enroll (reader compressed)
>     liftCHP $ runParallel_
>       [ getFiles (writer file)
>         -- Derive the name of the compressed file.
>       , (forever $ readChannel fileGZ' >>=
>                    writeChannel (writer fileGZ) . (++ ".gz"))
>           `onPoisonRethrow`
>           (poison fileGZ' >> poison (writer fileGZ))
>       , readFromFile fileData (writer data_)
>       , calculateMD5 dataMD5 (writer md5)
>       , compressCHP dataCompress (writer compressed)
>       , writeToFile (reader fileGZ) compressedFile
>       , calculateMD5 compressedMD5 (writer md5Compressed)
>         -- Print the digest of the plain data, then of the compressed data.
>       , forever $ readChannel (reader md5) >>=
>                   liftIO . print >>
>                   readChannel (reader md5Compressed) >>=
>                   liftIO . print
>       ]
Problems:
(CHP) Thread terminated with: thread blocked indefinitely in an STM
transaction
< _b3, _b4, File GZ."test1.gz" >
> -- | Run @m@ and then the clean-up action @b@.
> --
> -- On normal completion @b@ runs after @m@ and the result of @m@ is
> -- returned; if @m@ is interrupted by poison, @b@ is run by
> -- 'onPoisonRethrow' instead.
> onPoisonFinally :: CHP a -> CHP () -> CHP a
> onPoisonFinally m b = do
>     result <- m `onPoisonRethrow` b
>     b
>     return result
>
Utility function (used for closing handles)
> -- | Compose a pure function after a function that returns its result in
> -- a functor: @(f \<.\> g) x == fmap f (g x)@.
> (<.>) :: Functor f => (b -> c) -> (a -> f b) -> a -> f c
> (f <.> g) x = fmap f (g x)
<.> is to <$> as . is to $.
> -- | Lift 'CHP' actions through a 'StateT' layer by lifting them into the
> -- underlying monad first.
> instance MonadCHP m => MonadCHP (StateT s m) where
>     liftCHP action = lift (liftCHP action)
Missing MonadCHP instance for the strict StateT monad.
> -- | Connect a producer to a two-ended consumer through a fresh
> -- one-to-one channel labelled with the given string, and run both in
> -- parallel. The remaining argument is passed on to the consumer, so the
> -- result can be chained with a further '->|^'.
> (->|^) :: Show b
>        => (Chanout b -> CHP ()) -> (String, Chanin b -> c -> CHP ())
>        -> (c -> CHP ())
> (->|^) producer (label, consumer) sink =
>     oneToOneChannel' (chanLabel label) >>= \c ->
>       runParallel_ [producer (writer c), consumer (reader c) sink]
'Missing' helper function
> -- | A small monad for writing a sequence of channel enrollments in
> -- do-notation instead of nesting calls to 'enroll'.
> data EnrollingT a where
>     -- | Embed an ordinary 'CHP' action.
>     Lift   :: CHP a -> EnrollingT a
>     -- | Enroll on the given item; the result is the enrolled end.
>     Enroll :: (Enrollable b z) => b z -> EnrollingT (Enrolled b z)
>
> -- | Run an 'EnrollingT' computation in 'CHP'. A bare 'Enroll' is passed
> -- to 'enroll' with 'return' as its body.
> enrolling :: EnrollingT a -> CHP a
> enrolling e = case e of
>     Lift v   -> v
>     Enroll b -> enroll b return
>
> -- 'Functor' and 'Applicative' are superclasses of 'Monad' from GHC 7.10
> -- onwards; both instances are defined via the 'Monad' instance below and
> -- are harmless on older compilers.
> instance Functor EnrollingT where
>     fmap f m = m >>= return . f
>
> instance Applicative EnrollingT where
>     pure = Lift . return
>     mf <*> mx = mf >>= \f -> mx >>= \x -> return (f x)
>
> -- | Binding after an 'Enroll' runs the whole continuation inside the
> -- call to 'enroll', so the continuation receives the enrolled end.
> instance Monad EnrollingT where
>     Lift m   >>= f = Lift (m >>= enrolling . f)
>     Enroll b >>= f = Lift (enroll b (enrolling . f))
>     return = Lift . return
> -- | 'IO' actions are lifted into 'CHP' and then wrapped with 'Lift'.
> instance MonadIO EnrollingT where
>     liftIO io = Lift (liftIO io)
> -- | 'CHP' actions are embedded directly with 'Lift'.
> instance MonadCHP EnrollingT where
>     liftCHP chp = Lift chp
Helper monad for enrolling (I know the T should stand for "transformer", but
when I tried to make it one I ran into problems).
Thanks in advance
More information about the Haskell-Cafe
mailing list