[Haskell-cafe] Large JSON File Processing

Michael Snoyman michael at snoyman.com
Mon Jan 22 06:12:47 UTC 2018


I just wanted to comment on the conduit aspect of this in particular.
Looking at your first version:

conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
conduitFilesFilter projFilter dirname' = do
  (_, allFiles) <- listDirRecur dirname'
  C.runConduit $
    C.yieldMany allFiles
    .| C.filterMC (filterMatchingFile projFilter)
    .| C.sinkList


This isn't taking full advantage of conduit: you're reading the complete
list of files from the file system into memory, instead of streaming those
values. And the output is a list of paths, instead of a stream of paths.
Something more idiomatic would look like:

sourceFilesFilter projFilter dirname' =
  sourceDirectoryDeep False dirname'
    .| filterMC (filterMatchingFile projFilter)

Then, wherever you consume the output, do so in a streaming fashion,
e.g.:

runConduitRes $ sourceFilesFilter projFilter dirname' .| mapM_C print

This should help with the increasing memory usage, though it will do
nothing about the runtime overhead of parsing the JSON itself.
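
For reference, the snippets above can be put together into a small
self-contained sketch. This is only an illustration under some assumptions:
it uses plain `FilePath` rather than the `Path` types from the original
code, it assumes conduit 1.3 or later (for the `ConduitT` name) plus aeson,
and `isJsonBytes`, `matchesFile` and `sourceMatching` are hypothetical
stand-ins for the real `ProjectFilter` logic, which isn't shown in full.

```haskell
import Conduit
import Data.Aeson (Value, decodeStrict)
import qualified Data.ByteString as B
import Data.Maybe (isJust)

-- Hypothetical stand-in for the real project filter: accept any bytes
-- that decode as a JSON value.
isJsonBytes :: B.ByteString -> Bool
isJsonBytes bs = isJust (decodeStrict bs :: Maybe Value)

-- Hypothetical stand-in for filterMatchingFile, using plain FilePath.
matchesFile :: FilePath -> IO Bool
matchesFile fp = isJsonBytes <$> B.readFile fp

-- Stream the files under 'dir' (without following directory symlinks)
-- and keep only those that match. No list of all files is ever built.
sourceMatching :: MonadResource m => FilePath -> ConduitT i FilePath m ()
sourceMatching dir =
  sourceDirectoryDeep False dir .| filterMC (liftIO . matchesFile)

-- Consume the stream one path at a time instead of collecting a list.
main :: IO ()
main = runConduitRes $ sourceMatching "." .| mapM_C (liftIO . putStrLn)
```

Each file is still read and parsed in full, so only the list of paths is
streamed, not the JSON contents.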

On Mon, Jan 22, 2018 at 1:38 AM, erik <eraker at gmail.com> wrote:

> Hello Haskell Cafe,
>
> I have written a small, pretty simple program but I am finding it hard to
> reason about its behavior (and also about the best way to do what I want),
> so I would like to ask you all for some suggestions.
>
> For reference, here's a Stack Overflow question
> <https://stackoverflow.com/questions/48330690/haskell-conduit-aeson-parsing-large-jsons-and-filter-matching-key-values/48348153#48348153>
> where I described what's going on, but I'll also describe it below.
>
> My program does the following:
>
>    1. Recursively list a directory,
>    2. Parse the JSON files from the directory list into identifiable
>    objects/records,
>    3. Look for matching key-value pairs, and
>    4. Return filenames where matches have been found.
>
> A few details for more context:
>
>    - I have to filter between 500,000 and 1 million files (I'm typically
>    trying to reduce down to between 1,000 and 40,000 that represent a
>    particular project). I usually just need the filenames.
>    - Each file is quite large, some of them 5mb or 10mb, and it's not
>    uncommon for them to have deeply nested keys (40,000 keys or so).
>
> My first version of this program was simple, synchronous, and as
> straightforward as I could come up with. However, the memory usage
> increased monotonically. Profiling, I found that most of the time was spent
> in JSON-parsing into Objects before my code could turn the objects into
> records (also, as you might imagine, tons of time in garbage collection).
>
> For my second version, I switched to conduit and it seemed to solve the
> increasing memory issue. My core function now looked like this:
>
> conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
> conduitFilesFilter projFilter dirname' = do
>   (_, allFiles) <- listDirRecur dirname'
>   C.runConduit $
>     C.yieldMany allFiles
>     .| C.filterMC (filterMatchingFile projFilter)
>     .| C.sinkList
>
>
> This was still slow and certainly still synchronous. What I really wanted
> was to run that "filterMatchingFile..." part in parallel across a number of
> CPUs. As an aside, my filtering function looks like this:
>
> filterMatchingFile :: ProjectFilter -> Path Abs File -> IO Bool
> filterMatchingFile (ProjectFilter filterFunc) fpath = do
>   let fp = toFilePath fpath
>   bs <- B.readFile fp
>   case validImplProject bs of  -- this is pretty much just `decodeStrict`
>     Nothing -> pure False
>     (Just proj') -> pure $ filterFunc proj'
>
> Here are the stats from running this:
>
> 115,961,554,600 bytes allocated in the heap
>   35,870,639,768 bytes copied during GC
>       56,467,720 bytes maximum residency (681 sample(s))
>        1,283,008 bytes maximum slop
>              145 MB total memory in use (0 MB lost due to fragmentation)
>
>                                      Tot time (elapsed)  Avg pause  Max pause
>   Gen  0     108716 colls, 108716 par   76.915s  20.571s     0.0002s    0.0266s
>   Gen  1       681 colls,   680 par    0.530s   0.147s     0.0002s    0.0009s
>
>   Parallel GC work balance: 14.99% (serial 0%, perfect 100%)
>
>   TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>
>   SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>
>   INIT    time    0.001s  (  0.007s elapsed)
>   MUT     time   34.813s  ( 42.938s elapsed)
>   GC      time   77.445s  ( 20.718s elapsed)
>   EXIT    time    0.000s  (  0.010s elapsed)
>   Total   time  112.260s  ( 63.672s elapsed)
>
>   Alloc rate    3,330,960,996 bytes per MUT second
>
>   Productivity  31.0% of total user, 67.5% of total elapsed
>
> gc_alloc_block_sync: 188614
> whitehole_spin: 0
> gen[0].sync: 33
> gen[1].sync: 811204
>
>
> I thought about writing a plainer (non-conduit) parallel version but I was
> afraid of the memory issue. I tried to write a Conduit-plus-channels
> version but it didn't work.
>
> Finally, I wrote a version using stm-conduit, which I thought might be a
> bit more efficient. It seems to be slightly better, but it's not really the
> kind of parallelization I was imagining:
>
> conduitAsyncFilterFiles :: ProjectFilter -> Path Abs Dir -> IO [String]
> conduitAsyncFilterFiles projFilter dirname' = do
>   (_, allFiles) <- listDirRecur dirname'
>   buffer 10
>     (C.yieldMany allFiles
>     .| (C.mapMC (readFileWithPath . toFilePath)))
>     (C.mapC (filterProjForFilename projFilter)
>          .| C.filterC isJust
>          .| C.mapC fromJust
>          .| C.sinkList)
>
> The first conduit passed to `buffer` does something like the following:
> `parseStrict . B.readFile`.
>
> This still wasn't too great, but after reading about handling garbage
> collection in smarter ways, I found that I could run my application like
> this:
>
> stack exec search-json -- --searchPath $FILES --name hello +RTS -s -A32m -n4m
>
> And the "productivity" would shoot up quite a lot, presumably because I'm
> doing less frequent garbage collection. My program also got a bit faster:
>
>  36,379,265,096 bytes allocated in the heap
>    1,238,438,160 bytes copied during GC
>       22,996,264 bytes maximum residency (85 sample(s))
>        3,834,152 bytes maximum slop
>              207 MB total memory in use (14 MB lost due to fragmentation)
>
>                                      Tot time (elapsed)  Avg pause  Max pause
>   Gen  0       211 colls,   211 par    1.433s   0.393s     0.0019s    0.0077s
>   Gen  1        85 colls,    84 par    0.927s   0.256s     0.0030s    0.0067s
>
>   Parallel GC work balance: 67.93% (serial 0%, perfect 100%)
>
>   TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>
>   SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>
>   INIT    time    0.001s  (  0.004s elapsed)
>   MUT     time   12.636s  ( 12.697s elapsed)
>   GC      time    2.359s  (  0.650s elapsed)
>   EXIT    time   -0.015s  (  0.003s elapsed)
>   Total   time   14.982s  ( 13.354s elapsed)
>
>   Alloc rate    2,878,972,840 bytes per MUT second
>
>   Productivity  84.2% of total user, 95.1% of total elapsed
>
> gc_alloc_block_sync: 9612
> whitehole_spin: 0
> gen[0].sync: 2044
> gen[1].sync: 47704
>
>
> Thanks for reading thus far. I now have three questions.
>
> 1. I understand that my program necessarily creates tons of garbage
> because it parses and then throws away 5 MB of JSON 500,000 times. However,
> I don't really understand why "+RTS -A32m -n4m" helps, and I'm always
> reluctant to sprinkle in magic I don't fully understand. Can anyone help me
> understand what this means?
>
> 2. It seems that the allocation limit is really something I should be
> using, but I can't figure out how to successfully add it to my package.yml
> with the other options. From the documentation for GHC 8.2, I thought it
> needed to look like this but it never works, usually telling me that -A32m
> and -n4m are not recognizable flags (how do I add them in to my package.yml
> so I don't have to pass them when running the program?):
>
> ghc-options:
>     - -threaded
>     - -rtsopts
>     - "-with-rtsopts=-N4 -A32m -n4m"
>
> 3. Finally, the most important question I have is this. When I run this
> program on OSX, it runs successfully through to completion. However, *a
> few minutes after terminating*, my terminal becomes unresponsive. I use
> emacs for my editor, typically launched from a terminal window and that too
> becomes unresponsive. This is not a typical outcome for any programs I
> write and it happens *every time* I run this particular application, so I
> know that this application is to blame.
>
> The crazy thing is that force quitting the terminal or logging out doesn't
> help: I have to actually restart my computer to use the terminal
> application again.  Other details that may help:
>
>    - This crash happens after the process id for my program has
>    terminated.
>    - Watching its progress in HTOP, it never comes close to running out
>    of memory: the value hovers in the same place.
>
> I can't really deploy an application that has this potential-crashing
> problem, but I don't know how to debug this issue. My total stab-in-the-dark
> idea is that heap allocations somehow are unrecoverable even after the
> process has terminated? Can anyone offer suggestions on things to look for
> or ways to debug and/or fix this issue?
>
> Finally, if anyone has suggestions on better ways to structure my
> application or parallelize the slow parts, I'll happily take those.
>
> Thanks again for reading. I appreciate any suggestions you may have.
>
> Best,
>
> --
> Erik Aker