[Haskell-cafe] Re: [Very long] (CHP?) Compressing, MD5 and big files

Maciej Piechotka uzytkownik2 at gmail.com
Mon Jan 4 04:30:51 EST 2010


On Sun, 2010-01-03 at 17:34 +0100, Maciej Piechotka wrote:
> I have following problem: I'd like to operate on big files so I'd
> prefere to operate on 'stream' instead of whole file at a time to avoid
> keeping too much in memory. I need to calculate MD5 and compress file. 
> 
> I tried to use something like that but I'm afraid that I'd need to patch
> zlib package as it results in deadlock:
> 
If I add:

> pipeline4 = do file <- oneToOneChannel' $ chanLabel "File"
>                data_ <- oneToOneChannel' $ chanLabel "Data"
>                compressed <- oneToOneChannel' $ chanLabel "Compressed"
>                runParallel_ [getFiles (writer file),
>                              readFromFile (reader file)
>                                           (writer data_),
>                            compressCHP (reader data_)
>                                        (writer compressed),
>                            CHP.consume (reader compressed)]

And change compress to(I'm not tested without change but here I omit
explicit interleave):

> stateM :: Monad m => a -> (a -> m a) -> m b
> stateM i f = f i >>= flip stateM f

Like forever but with state

> chanMaybe2List :: (ReadableChannel r, Poisonable (r (Maybe a)),
>                    WriteableChannel w, Poisonable (w [a]))
>                => r (Maybe a)
>                -> w [a]
>                -> CHP ()
> chanMaybe2List in_ out = do
>   chan <- liftIO $ newChan
>   list <- liftIO ((Just Nothing :) <$> getChanContents chan)
>   runParallel_ [forever (readChannel in_ >>=
>                          liftIO . writeChan chan . Just)
>                 `onPoisonRethrow` (liftIO (writeChan chan Nothing) >>
>                                    poison in_),
>                forever $ stateM list process]
>   where process (Nothing     :_)  = poison out >> throwPoison
>         process (Just Nothing:Nothing:_) = poison out >> throwPoison
>         process (Just Nothing:xs) =
>           let (this, that) = span isJust xs
>               isJust = maybe False (maybe False (const True))
>               this' = map fromJust (map fromJust this)
>           in writeChannel out (map fromJust $ map fromJust this) >>
>              process that

Writes to output lazy list of all elements in input

> compressCHP' :: (ReadableChannel r, Poisonable (r [BS.ByteString]),
>                  WriteableChannel w, Poisonable (w [BS.ByteString]))
>              => r [BS.ByteString]
>              -> w [BS.ByteString]
>              -> CHP ()
> compressCHP' in_ out = forever (writeChannel out .
>                                 LBS.toChunks . compress . 
>                                 LBS.fromChunks =<<
>                                 readChannel in_)
>                        `onPoisonRethrow` (poison in_ >> poison out)

Compresses the lists of chunks

> toMaybeList :: (ReadableChannel r, Poisonable (r [a]),
>                 WriteableChannel w, Poisonable (w (Maybe a)))
>             => r [a]
>             -> w (Maybe a)
>             -> CHP ()
> toMaybeList in_ out = forever (readChannel in_ >>=
>                                mapM_ (writeChannel out . Just) >>
>                                writeChannel out Nothing)
>                       `onPoisonRethrow` (poison in_ >> poison out)

Converts back to list

> compressCHP :: (ReadableChannel r,
>                 Poisonable (r (Maybe BS.ByteString)),
>                 WriteableChannel w,
>                 Poisonable (w (Maybe BS.ByteString)))
>             => r (Maybe BS.ByteString)
>             -> w (Maybe BS.ByteString)
>             -> CHP ()
> compressCHP = chanMaybe2List |->| compressCHP' |->| toMaybeList

Combines all 3 operations.


However pipeline3 still results in deadlock:
Just "Test1\n"
(CHP) Thread terminated with: thread blocked indefinitely in an STM
transaction
< _b3, _b4, _b4, File GZ."test1.gz", _c5, _b6, _c7, _b3 >

Well - at least I know where there is no problem.

Regardss




More information about the Haskell-Cafe mailing list