[Haskell-cafe] Large JSON File Processing

erik eraker at gmail.com
Thu Jan 25 04:32:34 UTC 2018


> Parallelization in conduit can usually be achieved with the stm-conduit
> library, which I believe provides the functionality you're looking for.

That's what I thought, but I couldn't figure out how one typically would
fan-out work across one of the middle conduits. I'd like to run one or two
functions across a lot of inputs in parallel, not each section of the whole
pipeline in parallel.

My attempt at using channels with stm-conduit dumped all the filepaths
into a channel; I then tried to spawn some async conduit workers that
would read from that channel and yield their results into a shared
downstream channel.

Hmm. It still sounds like it should work, actually. I wonder what went
wrong. Maybe I should try again.
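
To make the shape concrete, here is a minimal sketch of the fan-out I have
in mind. It deliberately does not use conduit or stm-conduit, only base's
Control.Concurrent, so it is an illustration of the idea rather than what
my program does. The name filterConcurrently and the Nothing-as-stop-signal
protocol are made up for this example; in the real program the predicate
would be something like the readFile-and-decode step.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (replicateM_, when)
import Data.List (sort)

-- | Hypothetical helper: run an IO predicate over the inputs with a fixed
-- number of worker threads and return the inputs that pass. The order of
-- the results is not specified.
filterConcurrently :: Int -> (a -> IO Bool) -> [a] -> IO [a]
filterConcurrently nWorkers keep inputs = do
  let n = max 1 nWorkers          -- always run at least one worker
  jobs    <- newChan              -- Just x = work item, Nothing = stop
  results <- newChan              -- Just x = a match, Nothing = worker done
  let worker = do
        job <- readChan jobs
        case job of
          Nothing -> writeChan results Nothing
          Just x  -> do
            ok <- keep x
            when ok (writeChan results (Just x))
            worker
  replicateM_ n (forkIO worker)
  -- Feed all inputs, then one stop signal per worker.
  _ <- forkIO $ do
    mapM_ (writeChan jobs . Just) inputs
    replicateM_ n (writeChan jobs Nothing)
  -- Collect matches until every worker has reported that it is done.
  let collect 0 acc = pure acc
      collect k acc = do
        r <- readChan results
        case r of
          Nothing -> collect (k - 1) acc
          Just x  -> collect k (x : acc)
  collect n []

main :: IO ()
main = do
  evens <- filterConcurrently 4 (pure . even) [1 .. 10 :: Int]
  print (sort evens)  -- prints [2,4,6,8,10]
```

Caveats: Chan is unbounded, so every input is queued up front (fine for
file paths, not for file contents); an exception inside the predicate
kills that worker and the collector then waits forever; and the workers
only run on several cores if the program is built with -threaded and run
with +RTS -N. A bounded queue would be the next thing to try.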

On Wed, Jan 24, 2018 at 8:27 PM, Michael Snoyman <michael at snoyman.com>
wrote:

> Parallelization in conduit can usually be achieved with the stm-conduit
> library, which I believe provides the functionality you're looking for.
>
> On Wed, Jan 24, 2018, 11:52 PM erik <eraker at gmail.com> wrote:
>
>> With Michael Snoyman's help, I rewrote my Conduit version of the
>> application (without using stm-conduit). This was a large improvement: my
>> first Conduit version was operating over all data and I didn't realize
>> this.
>>
>> I also increased the nursery size.
>>
>> My revised function ended up looking like this:
>>
>> module Search where
>> import           Conduit               ((.|))
>> import qualified Conduit               as C
>> import           Control.Monad
>> import           Control.Monad.IO.Class   (MonadIO, liftIO)
>> import           Control.Monad.Trans.Resource (MonadResource)
>> import qualified Data.ByteString       as B
>> import           Data.List             (isPrefixOf)
>> import           Data.Maybe            (fromJust, isJust)
>> import           System.Path.NameManip (guess_dotdot, absolute_path)
>> import           System.FilePath       (addTrailingPathSeparator, normalise)
>> import           System.Directory      (getHomeDirectory)
>> import           Filters
>>
>>
>> sourceFilesFilter :: (MonadResource m, MonadIO m) => ProjectFilter -> FilePath -> C.ConduitM () String m ()
>> sourceFilesFilter projFilter dirname' =
>>     C.sourceDirectoryDeep False dirname'
>>     .| parseProject projFilter
>>
>> parseProject :: (MonadResource m, MonadIO m) => ProjectFilter -> C.ConduitM FilePath String m ()
>> parseProject (ProjectFilter filterFunc) = do
>>   C.awaitForever go
>>   where
>>     go path' = do
>>       bytes <- liftIO $ B.readFile path'
>>       let isProj = validProject bytes
>>       when (isJust isProj) $ do
>>         let proj' = fromJust isProj
>>         when (filterFunc proj') $ C.yield path'
>>
>> My main just runs the conduit and prints those that pass the filter:
>>
>> mainStreamingConduit :: IO ()
>> mainStreamingConduit = do
>>   options <- getRecord "Search JSON Files"
>>   let filterFunc = makeProjectFilter options
>>   searchDir <- absolutize (searchPath options)
>>   itExists <- doesDirectoryExist searchDir
>>   case itExists of
>>     False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
>>     True -> C.runConduitRes $ sourceFilesFilter filterFunc searchDir .| C.mapM_ (liftIO . putStrLn)
>>
>> I run it like this (without the stats, typically):
>>
>> stack exec search-json -- --searchPath $FILES --name NAME +RTS -s -A32m -n4m
>>
>> Without increasing nursery size, I get a productivity around 30%. With
>> the above, however, it looks like this:
>>
>>   72,308,248,744 bytes allocated in the heap
>>      733,911,752 bytes copied during GC
>>        7,410,520 bytes maximum residency (8 sample(s))
>>          863,480 bytes maximum slop
>>              187 MB total memory in use (27 MB lost due to fragmentation)
>>
>>                                      Tot time (elapsed)  Avg pause  Max pause
>>   Gen  0       580 colls,   580 par    2.731s   0.772s     0.0013s    0.0105s
>>   Gen  1         8 colls,     7 par    0.163s   0.044s     0.0055s    0.0109s
>>
>>   Parallel GC work balance: 35.12% (serial 0%, perfect 100%)
>>
>>   TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>>
>>   SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>>
>>   INIT    time    0.001s  (  0.006s elapsed)
>>   MUT     time   26.155s  ( 31.602s elapsed)
>>   GC      time    2.894s  (  0.816s elapsed)
>>   EXIT    time   -0.003s  (  0.008s elapsed)
>>   Total   time   29.048s  ( 32.432s elapsed)
>>
>>   Alloc rate    2,764,643,665 bytes per MUT second
>>
>>   Productivity  90.0% of total user, 97.5% of total elapsed
>>
>> gc_alloc_block_sync: 3494
>> whitehole_spin: 0
>> gen[0].sync: 15527
>> gen[1].sync: 177
>>
>> I'd still like to figure out how to parallelize the filterProj .
>> parseJson . readFile part, but for now I'm satisfied with what I have.
>>
>> (I also isolated my crashing to another process launched from the same
>> terminal window.)
>>
>> On Sun, Jan 21, 2018 at 10:12 PM, Michael Snoyman <michael at snoyman.com>
>> wrote:
>>
>>> I just wanted to comment on the conduit aspect of this in particular.
>>> Looking at your first version:
>>>
>>> conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
>>> conduitFilesFilter projFilter dirname' = do
>>>   (_, allFiles) <- listDirRecur dirname'
>>>   C.runConduit $
>>>     C.yieldMany allFiles
>>>     .| C.filterMC (filterMatchingFile projFilter)
>>>     .| C.sinkList
>>>
>>>
>>> This isn't taking full advantage of conduit: you're reading in a list of
>>> the files in the file system, instead of streaming those values. And the
>>> output is a list of `String`, instead of streaming out those `String`s.
>>> More idiomatic would look something like:
>>>
>>> sourceFilesFilter projFilter dirname' =
>>>   sourceDirectoryDeep False dirname' .| filterMC (filterMatchingFile projFilter)
>>>
>>> And then, wherever you're consuming the output, to do so in a streaming
>>> fashion, e.g.:
>>>
>>> runConduitRes $ sourceFilesFilter projFilter dirname' .| mapM_C print
>>>
>>> This should help with the increasing memory usage, though it will do
>>> nothing about the runtime overhead of parsing the JSON itself.
>>>
>>> On Mon, Jan 22, 2018 at 1:38 AM, erik <eraker at gmail.com> wrote:
>>>
>>>> Hello Haskell Cafe,
>>>>
>>>> I have written a small, pretty simple program but I am finding it hard
>>>> to reason about its behavior (and also about the best way to do what I
>>>> want), so I would like to ask you all for some suggestions.
>>>>
>>>> For reference, here's a Stack Overflow question
>>>> <https://stackoverflow.com/questions/48330690/haskell-conduit-aeson-parsing-large-jsons-and-filter-matching-key-values/48348153#48348153>
>>>> where I described what's going on, but I'll also describe it below.
>>>>
>>>> My program does the following:
>>>>
>>>>    1. Recursively list a directory,
>>>>    2. Parse the JSON files from the directory list into identifiable
>>>>    objects/records,
>>>>    3. Look for matching key-value pairs, and
>>>>    4. Return filenames where matches have been found.
>>>>
>>>> A few details for more context:
>>>>
>>>>    - I have to filter between 500,000 and 1 million files (I'm
>>>>    typically trying to reduce down to between 1,000 and 40,000 that represent
>>>>    a particular project). I usually just need the filenames.
>>>>    - Each file is quite large, some of them 5mb or 10mb, and it's not
>>>>    uncommon for them to have deeply nested keys (40,000 keys or so).
>>>>
>>>> My first version of this program was simple, synchronous, and as
>>>> straightforward as I could come up with. However, the memory usage
>>>> increased monotonically. Profiling, I found that most of the time was spent
>>>> in JSON-parsing into Objects before my code could turn the objects into
>>>> records (also, as you might imagine, tons of time in garbage collection).
>>>>
>>>> For my second version, I switched to conduit and it seemed to solve the
>>>> increasing memory issue. My core function now looked like this:
>>>>
>>>> conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
>>>> conduitFilesFilter projFilter dirname' = do
>>>>   (_, allFiles) <- listDirRecur dirname'
>>>>   C.runConduit $
>>>>     C.yieldMany allFiles
>>>>     .| C.filterMC (filterMatchingFile projFilter)
>>>>     .| C.sinkList
>>>>
>>>>
>>>> This was still slow and certainly still synchronous. What I really
>>>> wanted was to run that "filterMatchingFile..." part in parallel across a
>>>> number of CPUs. As an aside, my filtering function looks like this:
>>>>
>>>> filterMatchingFile :: ProjectFilter -> Path Abs File -> IO Bool
>>>> filterMatchingFile (ProjectFilter filterFunc) fpath = do
>>>>   let fp = toFilePath fpath
>>>>   bs <- B.readFile fp
>>>>   case validImplProject bs of  -- this is pretty much just `decodeStrict`
>>>>     Nothing -> pure False
>>>>     (Just proj') -> pure $ filterFunc proj'
>>>>
>>>> Here are the stats from running this:
>>>>
>>>> 115,961,554,600 bytes allocated in the heap
>>>>   35,870,639,768 bytes copied during GC
>>>>       56,467,720 bytes maximum residency (681 sample(s))
>>>>        1,283,008 bytes maximum slop
>>>>              145 MB total memory in use (0 MB lost due to fragmentation)
>>>>
>>>>                                      Tot time (elapsed)  Avg pause  Max pause
>>>>   Gen  0     108716 colls, 108716 par   76.915s  20.571s     0.0002s    0.0266s
>>>>   Gen  1       681 colls,   680 par    0.530s   0.147s     0.0002s    0.0009s
>>>>
>>>>   Parallel GC work balance: 14.99% (serial 0%, perfect 100%)
>>>>
>>>>   TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>>>>
>>>>   SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>>>>
>>>>   INIT    time    0.001s  (  0.007s elapsed)
>>>>   MUT     time   34.813s  ( 42.938s elapsed)
>>>>   GC      time   77.445s  ( 20.718s elapsed)
>>>>   EXIT    time    0.000s  (  0.010s elapsed)
>>>>   Total   time  112.260s  ( 63.672s elapsed)
>>>>
>>>>   Alloc rate    3,330,960,996 bytes per MUT second
>>>>
>>>>   Productivity  31.0% of total user, 67.5% of total elapsed
>>>>
>>>> gc_alloc_block_sync: 188614
>>>> whitehole_spin: 0
>>>> gen[0].sync: 33
>>>> gen[1].sync: 811204
>>>>
>>>>
>>>> I thought about writing a plainer (non-conduit) parallel version but I
>>>> was afraid of the memory issue. I tried to write a Conduit-plus-channels
>>>> version but it didn't work.
>>>>
>>>> Finally, I wrote a version using stm-conduit, which I thought might be
>>>> a bit more efficient. It seems to be slightly better, but it's not really
>>>> the kind of parallelization I was imagining:
>>>>
>>>> conduitAsyncFilterFiles :: ProjectFilter -> Path Abs Dir -> IO [String]
>>>> conduitAsyncFilterFiles projFilter dirname' = do
>>>>   (_, allFiles) <- listDirRecur dirname'
>>>>   buffer 10
>>>>     (C.yieldMany allFiles
>>>>     .| (C.mapMC (readFileWithPath . toFilePath)))
>>>>     (C.mapC (filterProjForFilename projFilter)
>>>>          .| C.filterC isJust
>>>>          .| C.mapC fromJust
>>>>          .| C.sinkList)
>>>>
>>>> The first conduit passed to `buffer` does something like the following:
>>>> parseStrict . B.readFile.
>>>>
>>>> This still wasn't too great, but after reading about handling garbage
>>>> collection in smarter ways, I found that I could run my application like
>>>> this:
>>>>
>>>> stack exec search-json -- --searchPath $FILES --name hello +RTS -s -A32m -n4m
>>>>
>>>> And the "productivity" would shoot up quite a lot, presumably because
>>>> I'm doing less frequent garbage collection. My program also got a bit
>>>> faster:
>>>>
>>>>  36,379,265,096 bytes allocated in the heap
>>>>    1,238,438,160 bytes copied during GC
>>>>       22,996,264 bytes maximum residency (85 sample(s))
>>>>        3,834,152 bytes maximum slop
>>>>              207 MB total memory in use (14 MB lost due to fragmentation)
>>>>
>>>>                                      Tot time (elapsed)  Avg pause  Max pause
>>>>   Gen  0       211 colls,   211 par    1.433s   0.393s     0.0019s    0.0077s
>>>>   Gen  1        85 colls,    84 par    0.927s   0.256s     0.0030s    0.0067s
>>>>
>>>>   Parallel GC work balance: 67.93% (serial 0%, perfect 100%)
>>>>
>>>>   TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>>>>
>>>>   SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>>>>
>>>>   INIT    time    0.001s  (  0.004s elapsed)
>>>>   MUT     time   12.636s  ( 12.697s elapsed)
>>>>   GC      time    2.359s  (  0.650s elapsed)
>>>>   EXIT    time   -0.015s  (  0.003s elapsed)
>>>>   Total   time   14.982s  ( 13.354s elapsed)
>>>>
>>>>   Alloc rate    2,878,972,840 bytes per MUT second
>>>>
>>>>   Productivity  84.2% of total user, 95.1% of total elapsed
>>>>
>>>> gc_alloc_block_sync: 9612
>>>> whitehole_spin: 0
>>>> gen[0].sync: 2044
>>>> gen[1].sync: 47704
>>>>
>>>>
>>>> Thanks for reading thus far. I now have three questions.
>>>>
>>>> 1. I understand that my program necessarily creates tons of garbage
>>>> because it parses and then throws away 5mb of JSON 500,000 times. However,
>>>> I don't really understand why "+RTS -A32m -n4m" helps and I'm
>>>> always reluctant to sprinkle in magic I don't fully understand. Can anyone
>>>> help me understand what this means?
>>>>
>>>> 2. It seems that the allocation limit is really something I should be
>>>> using, but I can't figure out how to successfully add it to my package.yml
>>>> with the other options. From the documentation for GHC 8.2, I thought it
>>>> needed to look like this but it never works, usually telling me that -A32m
>>>> and -n4m are not recognizable flags (how do I add them in to my package.yml
>>>> so I don't have to pass them when running the program?):
>>>>
>>>> ghc-options:
>>>>     - -threaded
>>>>     - -rtsopts
>>>>     - "-with-rtsopts=-N4 -A32m -n4m"
>>>>
>>>> 3. Finally, the most important question I have is this. When I run this
>>>> program on OSX, it runs successfully through to completion. However, *a
>>>> few minutes after terminating*, my terminal becomes unresponsive. I
>>>> use emacs for my editor, typically launched from a terminal window and that
>>>> too becomes unresponsive. This is not a typical outcome for any programs I
>>>> write and it happens *every time* I run this particular application,
>>>> so I know that this application is to blame.
>>>>
>>>> The crazy thing is that force quitting the terminal or logging out
>>>> doesn't help: I have to actually restart my computer to use the terminal
>>>> application again.  Other details that may help:
>>>>
>>>>    - This crash happens after the process id for my program has
>>>>    terminated.
>>>>    - Watching its progress in HTOP, it never comes close to running
>>>>    out of memory: the value hovers in the same place.
>>>>
>>>> I can't really deploy an application that has this potential-crashing
>>>> problem, but I don't know how to debug this issue. My total stab-in-the-dark
>>>> idea is that heap allocations somehow are unrecoverable even after the
>>>> process has terminated? Can anyone offer suggestions on things to look for
>>>> or ways to debug and/or fix this issue?
>>>>
>>>> Finally, if anyone has suggestions on better ways to structure my
>>>> application or parallelize the slow parts, I'll happily take those.
>>>>
>>>> Thanks again for reading. I appreciate any suggestions you may have.
>>>>
>>>> Best,
>>>>
>>>> --
>>>> Erik Aker
>>>>
>>>> _______________________________________________
>>>> Haskell-Cafe mailing list
>>>> To (un)subscribe, modify options or view archives go to:
>>>> http://mail.haskell.org/cgi-bin/mailman/listinfo/haskell-cafe
>>>> Only members subscribed via the mailman list are allowed to post.
>>>>
>>>
>>>
>>
>>
>> --
>> Erik Aker
>>
>


-- 
Erik Aker