[Haskell-cafe] Large JSON File Processing
erik
eraker at gmail.com
Sun Jan 21 23:38:42 UTC 2018
Hello Haskell Cafe,
I have written a small, pretty simple program but I am finding it hard to
reason about its behavior (and also about the best way to do what I want),
so I would like to ask you all for some suggestions.
For reference, here's a Stack Overflow question
<https://stackoverflow.com/questions/48330690/haskell-conduit-aeson-parsing-large-jsons-and-filter-matching-key-values/48348153#48348153>
where I described what's going on, but I'll also describe it below.
My program does the following:
1. Recursively list a directory,
2. Parse the JSON files from the directory list into identifiable
objects/records,
3. Look for matching key-value pairs, and
4. Return filenames where matches have been found.
A few details for more context:
- I have to filter between 500,000 and 1 million files (I'm typically
trying to reduce down to between 1,000 and 40,000 that represent a
particular project). I usually just need the filenames.
- Each file is quite large, some of them 5 MB or 10 MB, and it's not
uncommon for them to have deeply nested keys (40,000 keys or so).
My first version of this program was simple, synchronous, and as
straightforward as I could come up with. However, the memory usage
increased monotonically. Profiling, I found that most of the time was spent
in JSON-parsing into Objects before my code could turn the objects into
records (also, as you might imagine, tons of time in garbage collection).
For my second version, I switched to conduit and it seemed to solve the
increasing memory issue. My core function now looked like this:
    conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
    conduitFilesFilter projFilter dirname' = do
      (_, allFiles) <- listDirRecur dirname'
      C.runConduit $
        C.yieldMany allFiles
        .| C.filterMC (filterMatchingFile projFilter)
        .| C.sinkList
This was still slow and certainly still synchronous. What I really wanted
was to run that "filterMatchingFile..." part in parallel across a number of
CPUs. As an aside, my filtering function looks like this:
    filterMatchingFile :: ProjectFilter -> Path Abs File -> IO Bool
    filterMatchingFile (ProjectFilter filterFunc) fpath = do
      let fp = toFilePath fpath
      bs <- B.readFile fp
      case validImplProject bs of  -- this is pretty much just `decodeStrict`
        Nothing      -> pure False
        (Just proj') -> pure $ filterFunc proj'
Here are the stats from running this:
    115,961,554,600 bytes allocated in the heap
     35,870,639,768 bytes copied during GC
         56,467,720 bytes maximum residency (681 sample(s))
          1,283,008 bytes maximum slop
                145 MB total memory in use (0 MB lost due to fragmentation)
                                     Tot time (elapsed)  Avg pause  Max pause
    Gen 0    108716 colls, 108716 par   76.915s  20.571s    0.0002s    0.0266s
    Gen 1       681 colls,    680 par    0.530s   0.147s    0.0002s    0.0009s
    Parallel GC work balance: 14.99% (serial 0%, perfect 100%)
    TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
    SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
    INIT    time    0.001s  (  0.007s elapsed)
    MUT     time   34.813s  ( 42.938s elapsed)
    GC      time   77.445s  ( 20.718s elapsed)
    EXIT    time    0.000s  (  0.010s elapsed)
    Total   time  112.260s  ( 63.672s elapsed)
    Alloc rate    3,330,960,996 bytes per MUT second
    Productivity  31.0% of total user, 67.5% of total elapsed
    gc_alloc_block_sync: 188614
    whitehole_spin: 0
    gen[0].sync: 33
    gen[1].sync: 811204
I thought about writing a plainer (non-conduit) parallel version but I was
afraid of the memory issue. I tried to write a Conduit-plus-channels
version but it didn't work.
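
For reference, a plain (non-conduit) parallel version does not have to hold every file in memory at once. The following is only a minimal sketch using nothing but base: a fixed number of worker threads pull inputs from a shared queue, so at most that many predicate calls are in flight at any time. The name filterConcurrentlyN is hypothetical (it is not a library function), result order is not preserved, and treating a throwing predicate as "no match" is an assumption made to keep the sketch short.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, evaluate, try)
import Control.Monad (replicateM)
import Data.IORef (atomicModifyIORef', newIORef)

-- | Keep the inputs for which the predicate returns True, using a fixed
-- number of worker threads that pull from a shared queue.
-- Hypothetical helper: result order is NOT preserved, and an input whose
-- predicate throws is silently dropped (an assumption of this sketch).
filterConcurrentlyN :: Int -> (a -> IO Bool) -> [a] -> IO [a]
filterConcurrentlyN workers p xs = do
  queue <- newIORef xs
  -- Start the workers; each one reports its matches through its own MVar.
  dones <- replicateM (max 1 workers) $ do
    done <- newEmptyMVar
    _ <- forkIO (worker queue [] >>= putMVar done)
    pure done
  -- Wait for every worker to finish and merge their matches.
  concat <$> mapM takeMVar dones
  where
    -- Atomically take the next input off the queue, if there is one.
    pop []       = ([], Nothing)
    pop (y : ys) = (ys, Just y)

    worker queue acc = do
      next <- atomicModifyIORef' queue pop
      case next of
        Nothing -> pure acc
        Just x  -> do
          -- 'evaluate' forces the Bool inside the worker thread, so the
          -- filtering work is not deferred to whoever reads the result.
          r <- try (p x >>= evaluate) :: IO (Either SomeException Bool)
          case r of
            Right True -> worker queue (x : acc)
            _          -> worker queue acc
```

With the functions above it would presumably be called as `filterConcurrentlyN 4 (filterMatchingFile projFilter) allFiles`, in a program built with -threaded and run with -N4. Because each worker handles one file at a time, the number of files being parsed simultaneously is bounded by the worker count.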
Finally, I wrote a version using stm-conduit, which I thought might be a
bit more efficient. It seems to be slightly better, but it's not really the
kind of parallelization I was imagining:
    conduitAsyncFilterFiles :: ProjectFilter -> Path Abs Dir -> IO [String]
    conduitAsyncFilterFiles projFilter dirname' = do
      (_, allFiles) <- listDirRecur dirname'
      buffer 10
        (C.yieldMany allFiles
          .| (C.mapMC (readFileWithPath . toFilePath)))
        (C.mapC (filterProjForFilename projFilter)
          .| C.filterC isJust
          .| C.mapC fromJust
          .| C.sinkList)
The first conduit passed to `buffer` does something like the following:
`parseStrict . B.readFile`.
This still wasn't too great, but after reading about handling garbage
collection in smarter ways, I found that I could run my application like
this:
    stack exec search-json -- --searchPath $FILES --name hello +RTS -s -A32m -n4m
With these flags, the "productivity" shoots up quite a lot, presumably
because I'm doing less frequent garbage collection. My program also got a
bit faster:
     36,379,265,096 bytes allocated in the heap
      1,238,438,160 bytes copied during GC
         22,996,264 bytes maximum residency (85 sample(s))
          3,834,152 bytes maximum slop
                207 MB total memory in use (14 MB lost due to fragmentation)
                                     Tot time (elapsed)  Avg pause  Max pause
    Gen 0       211 colls,    211 par    1.433s   0.393s    0.0019s    0.0077s
    Gen 1        85 colls,     84 par    0.927s   0.256s    0.0030s    0.0067s
    Parallel GC work balance: 67.93% (serial 0%, perfect 100%)
    TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
    SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
    INIT    time    0.001s  (  0.004s elapsed)
    MUT     time   12.636s  ( 12.697s elapsed)
    GC      time    2.359s  (  0.650s elapsed)
    EXIT    time   -0.015s  (  0.003s elapsed)
    Total   time   14.982s  ( 13.354s elapsed)
    Alloc rate    2,878,972,840 bytes per MUT second
    Productivity  84.2% of total user, 95.1% of total elapsed
    gc_alloc_block_sync: 9612
    whitehole_spin: 0
    gen[0].sync: 2044
    gen[1].sync: 47704
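
As a rough cross-check of the two reports, dividing "bytes allocated in the heap" by the number of Gen 0 collections gives the average amount allocated between minor collections. The -A flag sets the size of the allocation area that new objects are allocated into, and -n sets the chunk size it is handed out in, so a larger -A should mean fewer minor collections. The sketch below only does that division on the figures printed above; the helper name bytesPerMinorGC is made up for illustration, and since the two runs allocate different totals this is not a controlled comparison.

```haskell
-- | Average number of bytes allocated between minor (Gen 0) collections.
-- Hypothetical helper, used only to compare the two reports.
bytesPerMinorGC :: Integer -> Integer -> Integer
bytesPerMinorGC allocated gen0Collections = allocated `div` gen0Collections

-- Figures copied from the two "+RTS -s" reports.
firstRun, secondRun :: Integer
firstRun  = bytesPerMinorGC 115961554600 108716  -- run with no -A flag mentioned
secondRun = bytesPerMinorGC 36379265096 211      -- run with -A32m -n4m
```

The first run comes out at roughly 1 MB allocated per minor collection and the second at roughly 172 MB, which fits the drop from 108,716 to 211 Gen 0 collections.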
Thanks for reading thus far. I now have three questions.
1. I understand that my program necessarily creates tons of garbage because
it parses and then throws away 5 MB of JSON 500,000 times. However, I don't
really understand why "+RTS -A32m -n4m" helps, and I'm always reluctant
to sprinkle in magic I don't fully understand. Can anyone help me
understand what these flags mean?
2. It seems that the allocation area size (-A) is really something I should
be setting, but I can't figure out how to add it to my package.yml alongside
the other options so that I don't have to pass it every time I run the
program. From the documentation for GHC 8.2, I thought it needed to look
like the following, but it never works: I am usually told that -A32m and
-n4m are not recognizable flags.
    ghc-options:
    - -threaded
    - -rtsopts
    - "-with-rtsopts=-N4 -A32m -n4m"
3. Finally, the most important question I have is this. When I run this
program on OSX, it runs successfully through to completion. However, *a few
minutes after terminating*, my terminal becomes unresponsive. I use emacs
for my editor, typically launched from a terminal window and that too
becomes unresponsive. This is not a typical outcome for any programs I
write and it happens *every time* I run this particular application, so I
know that this application is to blame.
The crazy thing is that force quitting the terminal or logging out doesn't
help: I have to actually restart my computer to use the terminal
application again. Other details that may help:
- This crash happens after the process id for my program has terminated.
- Watching its progress in HTOP, it never comes close to running out of
memory: the value hovers in the same place.
I can't really deploy an application that has this potential crashing
problem, but I don't know how to debug this issue. My total stab-in-the-dark
idea is that heap allocations somehow are unrecoverable even after the
process has terminated? Can anyone offer suggestions on things to look for
or ways to debug and/or fix this issue?
Finally, if anyone has suggestions on better ways to structure my
application or parallelize the slow parts, I'll happily take those.
Thanks again for reading. I appreciate any suggestions you may have.
Best,
--
Erik Aker