[Haskell-cafe]
Re: [Very long] (CHP?) Compressing, MD5 and big files
Maciej Piechotka
uzytkownik2 at gmail.com
Mon Jan 4 04:30:51 EST 2010
On Sun, 2010-01-03 at 17:34 +0100, Maciej Piechotka wrote:
> I have following problem: I'd like to operate on big files so I'd
> prefere to operate on 'stream' instead of whole file at a time to avoid
> keeping too much in memory. I need to calculate MD5 and compress file.
>
> I tried to use something like that but I'm afraid that I'd need to patch
> zlib package as it results in deadlock:
>
If I add:
> pipeline4 = do file <- oneToOneChannel' $ chanLabel "File"
> data_ <- oneToOneChannel' $ chanLabel "Data"
> compressed <- oneToOneChannel' $ chanLabel "Compressed"
> runParallel_ [getFiles (writer file),
> readFromFile (reader file)
> (writer data_),
> compressCHP (reader data_)
> (writer compressed),
> CHP.consume (reader compressed)]
And change compress to(I'm not tested without change but here I omit
explicit interleave):
> stateM :: Monad m => a -> (a -> m a) -> m b
> stateM i f = f i >>= flip stateM f
Like forever but with state
> chanMaybe2List :: (ReadableChannel r, Poisonable (r (Maybe a)),
> WriteableChannel w, Poisonable (w [a]))
> => r (Maybe a)
> -> w [a]
> -> CHP ()
> chanMaybe2List in_ out = do
> chan <- liftIO $ newChan
> list <- liftIO ((Just Nothing :) <$> getChanContents chan)
> runParallel_ [forever (readChannel in_ >>=
> liftIO . writeChan chan . Just)
> `onPoisonRethrow` (liftIO (writeChan chan Nothing) >>
> poison in_),
> forever $ stateM list process]
> where process (Nothing :_) = poison out >> throwPoison
> process (Just Nothing:Nothing:_) = poison out >> throwPoison
> process (Just Nothing:xs) =
> let (this, that) = span isJust xs
> isJust = maybe False (maybe False (const True))
> this' = map fromJust (map fromJust this)
> in writeChannel out (map fromJust $ map fromJust this) >>
> process that
Writes to output lazy list of all elements in input
> compressCHP' :: (ReadableChannel r, Poisonable (r [BS.ByteString]),
> WriteableChannel w, Poisonable (w [BS.ByteString]))
> => r [BS.ByteString]
> -> w [BS.ByteString]
> -> CHP ()
> compressCHP' in_ out = forever (writeChannel out .
> LBS.toChunks . compress .
> LBS.fromChunks =<<
> readChannel in_)
> `onPoisonRethrow` (poison in_ >> poison out)
Compresses the lists of chunks
> toMaybeList :: (ReadableChannel r, Poisonable (r [a]),
> WriteableChannel w, Poisonable (w (Maybe a)))
> => r [a]
> -> w (Maybe a)
> -> CHP ()
> toMaybeList in_ out = forever (readChannel in_ >>=
> mapM_ (writeChannel out . Just) >>
> writeChannel out Nothing)
> `onPoisonRethrow` (poison in_ >> poison out)
Converts back to list
> compressCHP :: (ReadableChannel r,
> Poisonable (r (Maybe BS.ByteString)),
> WriteableChannel w,
> Poisonable (w (Maybe BS.ByteString)))
> => r (Maybe BS.ByteString)
> -> w (Maybe BS.ByteString)
> -> CHP ()
> compressCHP = chanMaybe2List |->| compressCHP' |->| toMaybeList
Combines all 3 operations.
However pipeline3 still results in deadlock:
Just "Test1\n"
(CHP) Thread terminated with: thread blocked indefinitely in an STM
transaction
< _b3, _b4, _b4, File GZ."test1.gz", _c5, _b6, _c7, _b3 >
Well - at least I know where there is no problem.
Regardss
More information about the Haskell-Cafe
mailing list