[Haskell-cafe] Large JSON File Processing
Michael Snoyman
michael at snoyman.com
Thu Jan 25 04:27:37 UTC 2018
Parallelization in conduit can usually be achieved with the stm-conduit
library, which I believe provides the functionality you're looking for.
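
As a rough illustration only (an untested sketch, not code from this
thread): stm-conduit's Data.Conduit.Async.buffer runs the upstream and
downstream conduits on separate threads, connected by a bounded STM queue.
It is the same `buffer` that appears in the quoted code further down;
`checkFile` below is a placeholder for the JSON parse-and-filter step, not
a real function.

import Conduit ((.|))
import qualified Conduit as C
import Data.Conduit.Async (buffer)

-- Producer (yielding paths) and consumer (parsing/filtering) run
-- concurrently, with at most 64 items queued between them.
filterBuffered :: (FilePath -> IO Bool) -> [FilePath] -> IO [FilePath]
filterBuffered checkFile paths =
  buffer 64
    (C.yieldMany paths)
    (C.filterMC checkFile .| C.sinkList)

Note that this only overlaps the producer with a single consumer thread;
the parsing itself still runs on one thread, which matches the observation
in the quoted messages below that it isn't the kind of parallelization
originally hoped for.
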
On Wed, Jan 24, 2018, 11:52 PM erik <eraker at gmail.com> wrote:
> With Michael Snoyman's help, I rewrote my Conduit version of the
> application (without using stm-conduit). This was a large improvement: my
> first Conduit version was operating over all of the data at once rather
> than streaming it, and I didn't realize this.
>
> I also increased the nursery size.
>
> My revised function ended up looking like this:
>
> module Search where
> import Conduit ((.|))
> import qualified Conduit as C
> import Control.Monad
> import Control.Monad.IO.Class (MonadIO, liftIO)
> import Control.Monad.Trans.Resource (MonadResource)
> import qualified Data.ByteString as B
> import Data.List (isPrefixOf)
> import Data.Maybe (fromJust, isJust)
> import System.Path.NameManip (guess_dotdot, absolute_path)
> import System.FilePath (addTrailingPathSeparator, normalise)
> import System.Directory (getHomeDirectory)
> import Filters
>
>
> sourceFilesFilter :: (MonadResource m, MonadIO m) => ProjectFilter -> FilePath -> C.ConduitM () String m ()
> sourceFilesFilter projFilter dirname' =
>   C.sourceDirectoryDeep False dirname'
>     .| parseProject projFilter
>
> parseProject :: (MonadResource m, MonadIO m) => ProjectFilter -> C.ConduitM FilePath String m ()
> parseProject (ProjectFilter filterFunc) = do
>     C.awaitForever go
>   where
>     go path' = do
>       bytes <- liftIO $ B.readFile path'
>       let isProj = validProject bytes
>       when (isJust isProj) $ do
>         let proj' = fromJust isProj
>         when (filterFunc proj') $ C.yield path'
>
> My main just runs the conduit and prints those that pass the filter:
>
> mainStreamingConduit :: IO ()
> mainStreamingConduit = do
>   options <- getRecord "Search JSON Files"
>   let filterFunc = makeProjectFilter options
>   searchDir <- absolutize (searchPath options)
>   itExists <- doesDirectoryExist searchDir
>   case itExists of
>     False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
>     True -> C.runConduitRes $ sourceFilesFilter filterFunc searchDir .| C.mapM_ (liftIO . putStrLn)
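> 
> For reference, absolutize (not shown) is roughly the standard
> path-normalising helper that those System.Path.NameManip imports suggest;
> the definition below is a reconstruction for illustration, not necessarily
> the exact code:
> 
> absolutize :: String -> IO String
> absolutize aPath
>   | "~" `isPrefixOf` aPath = do
>       -- expand a leading "~" to the user's home directory
>       homePath <- getHomeDirectory
>       pure $ normalise $ addTrailingPathSeparator homePath ++ tail aPath
>   | otherwise = do
>       -- make the path absolute and collapse any ".." segments
>       pathMaybeWithDots <- absolute_path aPath
>       pure $ fromJust $ guess_dotdot pathMaybeWithDots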
>
> I run it like this (without the stats, typically):
>
> stack exec search-json -- --searchPath $FILES --name NAME +RTS -s -A32m -n4m
>
> Without increasing the nursery size, I get a productivity of around 30%. With the
> settings above, however, it looks like this:
>
> 72,308,248,744 bytes allocated in the heap
> 733,911,752 bytes copied during GC
> 7,410,520 bytes maximum residency (8 sample(s))
> 863,480 bytes maximum slop
> 187 MB total memory in use (27 MB lost due to fragmentation)
>
> Tot time (elapsed) Avg pause Max pause
> Gen 0 580 colls, 580 par 2.731s 0.772s 0.0013s 0.0105s
> Gen 1 8 colls, 7 par 0.163s 0.044s 0.0055s 0.0109s
>
> Parallel GC work balance: 35.12% (serial 0%, perfect 100%)
>
>
>
> TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>
> SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>
>
>
> INIT time 0.001s ( 0.006s elapsed)
> MUT time 26.155s ( 31.602s elapsed)
> GC time 2.894s ( 0.816s elapsed)
> EXIT time -0.003s ( 0.008s elapsed)
> Total time 29.048s ( 32.432s elapsed)
>
> Alloc rate 2,764,643,665 bytes per MUT second
>
> Productivity 90.0% of total user, 97.5% of total elapsed
>
> gc_alloc_block_sync: 3494
> whitehole_spin: 0
> gen[0].sync: 15527
> gen[1].sync: 177
>
> I'd still like to figure out how to parallelize the filterProj .
> parseJson . readFile part, but for now I'm satisfied with what I have.
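> 
> One shape that might work for that (an untested sketch only: ProjectFilter
> and validProject are from the code above, chunksOf comes from the split
> package, mapConcurrently from async, and the remaining names are made up
> for illustration) is to chunk the file list and fan the
> readFile/parse/filter work out over a few threads:
> 
> import Control.Concurrent.Async (mapConcurrently)
> import Control.Monad (filterM)
> import qualified Data.ByteString as B
> import Data.List.Split (chunksOf)
> 
> filterFilesParallel :: Int -> ProjectFilter -> [FilePath] -> IO [FilePath]
> filterFilesParallel nWorkers (ProjectFilter filterFunc) paths = do
>     -- one worker thread per chunk; needs -threaded and +RTS -N to actually
>     -- run on multiple cores
>     let chunkSize = max 1 (length paths `div` nWorkers + 1)
>     results <- mapConcurrently (filterM matches) (chunksOf chunkSize paths)
>     pure (concat results)
>   where
>     matches path' = do
>       bytes <- B.readFile path'
>       pure $ maybe False filterFunc (validProject bytes)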
>
> (I also traced the crashing I described in my original message to another
> process launched from the same terminal window.)
>
> On Sun, Jan 21, 2018 at 10:12 PM, Michael Snoyman <michael at snoyman.com>
> wrote:
>
>> I just wanted to comment on the conduit aspect of this in particular.
>> Looking at your first version:
>>
>> conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
>> conduitFilesFilter projFilter dirname' = do
>>   (_, allFiles) <- listDirRecur dirname'
>>   C.runConduit $
>>     C.yieldMany allFiles
>>       .| C.filterMC (filterMatchingFile projFilter)
>>       .| C.sinkList
>>
>>
>> This isn't taking full advantage of conduit: you're reading in a list of
>> the files in the file system, instead of streaming those values. And the
>> output is a list of `String`, instead of streaming out those `String`s.
>> More idiomatic would look something like:
>>
>> sourceFilesFilter projFilter dirname' =
>>   sourceDirectoryDeep False dirname'
>>     .| filterMC (filterMatchingFile projFilter)
>>
>> And then, wherever you're consuming the output, to do so in a streaming
>> fashion, e.g.:
>>
>> runConduitRes $ sourceFilesFilter projFilter dirname' .| mapM_C print
>>
>> This should help with the increasing memory usage, though it will do
>> nothing about the runtime overhead of parsing the JSON itself.
>>
>> On Mon, Jan 22, 2018 at 1:38 AM, erik <eraker at gmail.com> wrote:
>>
>>> Hello Haskell Cafe,
>>>
>>> I have written a small, pretty simple program but I am finding it hard
>>> to reason about its behavior (and also about the best way to do what I
>>> want), so I would like to ask you all for some suggestions.
>>>
>>> For reference, here's a Stack Overflow question
>>> <https://stackoverflow.com/questions/48330690/haskell-conduit-aeson-parsing-large-jsons-and-filter-matching-key-values/48348153#48348153>
>>> where I described what's going on, but I'll also describe it below.
>>>
>>> My program does the following:
>>>
>>> 1. Recursively list a directory,
>>> 2. Parse the JSON files from the directory list into identifiable
>>> objects/records,
>>> 3. Look for matching key-value pairs, and
>>> 4. Return filenames where matches have been found.
>>>
>>> A few details for more context:
>>>
>>> - I have to filter between 500,000 and 1 million files (I'm
>>> typically trying to reduce down to between 1,000 and 40,000 that represent
>>> a particular project). I usually just need the filenames.
>>> - Each file is quite large, some of them 5 MB or 10 MB, and it's not
>>> uncommon for them to have deeply nested keys (40,000 keys or so).
>>>
>>> My first version of this program was simple, synchronous, and as
>>> straightforward as I could come up with. However, the memory usage
>>> increased monotonically. Profiling showed that most of the time was spent
>>> parsing the JSON into Objects before my code could turn those objects into
>>> records (also, as you might imagine, tons of time in garbage collection).
>>>
>>> For my second version, I switched to conduit and it seemed to solve the
>>> increasing memory issue. My core function now looked like this:
>>>
>>> conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
>>> conduitFilesFilter projFilter dirname' = do
>>>   (_, allFiles) <- listDirRecur dirname'
>>>   C.runConduit $
>>>     C.yieldMany allFiles
>>>       .| C.filterMC (filterMatchingFile projFilter)
>>>       .| C.sinkList
>>>
>>>
>>> This was still slow and certainly still synchronous. What I really
>>> wanted was to run that "filterMatchingFile..." part in parallel across a
>>> number of CPUs. As an aside, my filtering function looks like this:
>>>
>>> filterMatchingFile :: ProjectFilter -> Path Abs File -> IO Bool
>>> filterMatchingFile (ProjectFilter filterFunc) fpath = do
>>>   let fp = toFilePath fpath
>>>   bs <- B.readFile fp
>>>   case validImplProject bs of -- this is pretty much just `decodeStrict`
>>>     Nothing -> pure False
>>>     (Just proj') -> pure $ filterFunc proj'
>>>
>>> Here are the stats from running this:
>>>
>>> 115,961,554,600 bytes allocated in the heap
>>> 35,870,639,768 bytes copied during GC
>>> 56,467,720 bytes maximum residency (681 sample(s))
>>> 1,283,008 bytes maximum slop
>>> 145 MB total memory in use (0 MB lost due to fragmentation)
>>>
>>> Tot time (elapsed) Avg pause Max pause
>>> Gen 0 108716 colls, 108716 par 76.915s 20.571s 0.0002s 0.0266s
>>> Gen 1 681 colls, 680 par 0.530s 0.147s 0.0002s 0.0009s
>>>
>>> Parallel GC work balance: 14.99% (serial 0%, perfect 100%)
>>>
>>> TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>>>
>>> SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>>>
>>> INIT time 0.001s ( 0.007s elapsed)
>>> MUT time 34.813s ( 42.938s elapsed)
>>> GC time 77.445s ( 20.718s elapsed)
>>> EXIT time 0.000s ( 0.010s elapsed)
>>> Total time 112.260s ( 63.672s elapsed)
>>>
>>> Alloc rate 3,330,960,996 bytes per MUT second
>>>
>>> Productivity 31.0% of total user, 67.5% of total elapsed
>>>
>>> gc_alloc_block_sync: 188614
>>> whitehole_spin: 0
>>> gen[0].sync: 33
>>> gen[1].sync: 811204
>>>
>>>
>>> I thought about writing a plainer (non-conduit) parallel version but I
>>> was afraid of the memory issue. I tried to write a Conduit-plus-channels
>>> version but it didn't work.
>>>
>>> Finally, I wrote a version using stm-conduit, which I thought might be a
>>> bit more efficient. It seems to be slightly better, but it's not really the
>>> kind of parallelization I was imagining:
>>>
>>> conduitAsyncFilterFiles :: ProjectFilter -> Path Abs Dir -> IO [String]
>>> conduitAsyncFilterFiles projFilter dirname' = do
>>>   (_, allFiles) <- listDirRecur dirname'
>>>   buffer 10
>>>     (C.yieldMany allFiles
>>>       .| C.mapMC (readFileWithPath . toFilePath))
>>>     (C.mapC (filterProjForFilename projFilter)
>>>       .| C.filterC isJust
>>>       .| C.mapC fromJust
>>>       .| C.sinkList)
>>>
>>> The first conduit passed to `buffer` does something like the following:
>>> `parseStrict . B.readFile`.
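>>>
>>> Roughly, those two helpers have the following shape (an illustrative
>>> reconstruction, not the exact definitions; `Project` stands in for the
>>> parsed record type):
>>>
>>> -- read a file and parse it, keeping the path alongside the result
>>> readFileWithPath :: FilePath -> IO (FilePath, Maybe Project)
>>> readFileWithPath fp = do
>>>   bytes <- B.readFile fp
>>>   pure (fp, validProject bytes)
>>>
>>> -- keep the filename only when the parsed project passes the filter
>>> filterProjForFilename :: ProjectFilter -> (FilePath, Maybe Project) -> Maybe String
>>> filterProjForFilename (ProjectFilter filterFunc) (fp, mproj) =
>>>   case mproj of
>>>     Just proj' | filterFunc proj' -> Just fp
>>>     _ -> Nothing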
>>>
>>> This still wasn't too great, but after reading about handling garbage
>>> collection in smarter ways, I found that I could run my application like
>>> this:
>>>
>>> stack exec search-json -- --searchPath $FILES --name hello +RTS -s -A32m -n4m
>>>
>>> And the "productivity" would shoot up quite a lot presumably because I'm
>>> doing less frequent garbage collection. My program also got a bit faster:
>>>
>>> 36,379,265,096 bytes allocated in the heap
>>> 1,238,438,160 bytes copied during GC
>>> 22,996,264 bytes maximum residency (85 sample(s))
>>> 3,834,152 bytes maximum slop
>>> 207 MB total memory in use (14 MB lost due to fragmentation)
>>>
>>> Tot time (elapsed) Avg pause Max pause
>>> Gen 0 211 colls, 211 par 1.433s 0.393s 0.0019s 0.0077s
>>> Gen 1 85 colls, 84 par 0.927s 0.256s 0.0030s 0.0067s
>>>
>>> Parallel GC work balance: 67.93% (serial 0%, perfect 100%)
>>>
>>> TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>>>
>>> SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>>>
>>> INIT time 0.001s ( 0.004s elapsed)
>>> MUT time 12.636s ( 12.697s elapsed)
>>> GC time 2.359s ( 0.650s elapsed)
>>> EXIT time -0.015s ( 0.003s elapsed)
>>> Total time 14.982s ( 13.354s elapsed)
>>>
>>> Alloc rate 2,878,972,840 bytes per MUT second
>>>
>>> Productivity 84.2% of total user, 95.1% of total elapsed
>>>
>>> gc_alloc_block_sync: 9612
>>> whitehole_spin: 0
>>> gen[0].sync: 2044
>>> gen[1].sync: 47704
>>>
>>>
>>> Thanks for reading thus far. I now have three questions.
>>>
>>> 1. I understand that my program necessarily creates tons of garbage
>>> because it parses and then throws away 5 MB of JSON 500,000 times. However,
>>> I don't really understand why "+RTS -A32m -n4m" helps, and I'm always
>>> reluctant to sprinkle in magic I don't fully understand. Can anyone
>>> help me understand what these flags mean?
>>>
>>> 2. It seems that the allocation area size (-A) is really something I should be
>>> using, but I can't figure out how to successfully add it to my package.yml
>>> with the other options. From the documentation for GHC 8.2, I thought it
>>> needed to look like this, but it never works, usually telling me that -A32m
>>> and -n4m are not recognizable flags (how do I add them to my package.yml
>>> so I don't have to pass them when running the program?):
>>>
>>> ghc-options:
>>> - -threaded
>>> - -rtsopts
>>> - "-with-rtsopts=-N4 -A32m -n4m"
>>>
>>> 3. Finally, the most important question I have is this. When I run this
>>> program on OSX, it runs successfully through to completion. However, *a
>>> few minutes after terminating*, my terminal becomes unresponsive. I use
>>> emacs for my editor, typically launched from a terminal window, and that too
>>> becomes unresponsive. This is not a typical outcome for any programs I
>>> write and it happens *every time* I run this particular application, so
>>> I know that this application is to blame.
>>>
>>> The crazy thing is that force quitting the terminal or logging out
>>> doesn't help: I have to actually restart my computer to use the terminal
>>> application again. Other details that may help:
>>>
>>> - This crash happens after the process id for my program has
>>> terminated.
>>> - Watching its progress in HTOP, it never comes close to running out
>>> of memory: the value hovers in the same place.
>>>
>>> I can't really deploy an application that has this potential-crashing
>>> problem, but I don't know how to debug this issue. My total stab-in-the-dark
>>> idea is that heap allocations somehow are unrecoverable even after the
>>> process has terminated? Can anyone offer suggestions on things to look for
>>> or ways to debug and/or fix this issue?
>>>
>>> Finally, if anyone has suggestions on better ways to structure my
>>> application or parallelize the slow parts, I'll happily take those.
>>>
>>> Thanks again for reading. I appreciate any suggestions you may have.
>>>
>>> Best,
>>>
>>> --
>>> Erik Aker
>>>
>>
>>
>
>
> --
> Erik Aker
>