[Haskell-cafe] Large JSON File Processing

erik eraker at gmail.com
Wed Jan 24 21:52:17 UTC 2018


With Michael Snoyman's help, I rewrote my Conduit version of the
application (without using stm-conduit). This was a large improvement: my
first Conduit version built the complete list of files before processing
any of them, and I hadn't realized this.

I also increased the nursery size.

My revised function ended up looking like this:

module Search where

import           Conduit                      ((.|))
import qualified Conduit                      as C
import           Control.Monad
import           Control.Monad.IO.Class       (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (MonadResource)
import qualified Data.ByteString              as B
import           Data.List                    (isPrefixOf)
import           Data.Maybe                   (fromJust, isJust)
import           System.Path.NameManip        (guess_dotdot, absolute_path)
import           System.FilePath              (addTrailingPathSeparator, normalise)
import           System.Directory             (getHomeDirectory)

import           Filters


sourceFilesFilter
  :: (MonadResource m, MonadIO m)
  => ProjectFilter -> FilePath -> C.ConduitM () String m ()
sourceFilesFilter projFilter dirname' =
    C.sourceDirectoryDeep False dirname'
    .| parseProject projFilter

parseProject
  :: (MonadResource m, MonadIO m)
  => ProjectFilter -> C.ConduitM FilePath String m ()
parseProject (ProjectFilter filterFunc) = do
  C.awaitForever go
  where
    go path' = do
      bytes <- liftIO $ B.readFile path'
      let isProj = validProject bytes
      when (isJust isProj) $ do
        let proj' = fromJust isProj
        when (filterFunc proj') $ C.yield path'

My main just runs the conduit and prints those that pass the filter:

mainStreamingConduit :: IO ()
mainStreamingConduit = do
  options <- getRecord "Search JSON Files"
  let filterFunc = makeProjectFilter options
  searchDir <- absolutize (searchPath options)
  itExists <- doesDirectoryExist searchDir
  case itExists of
    False -> do
      putStrLn "Search Directory does not exist"
      exitWith (ExitFailure 1)
    True ->
      C.runConduitRes $
        sourceFilesFilter filterFunc searchDir .| C.mapM_ (liftIO . putStrLn)

I run it like this (without the stats, typically):

stack exec search-json -- --searchPath $FILES --name NAME +RTS -s -A32m -n4m

Without increasing nursery size, I get a productivity around 30%. With the
above, however, it looks like this:

  72,308,248,744 bytes allocated in the heap
     733,911,752 bytes copied during GC
       7,410,520 bytes maximum residency (8 sample(s))
         863,480 bytes maximum slop
             187 MB total memory in use (27 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       580 colls,   580 par    2.731s   0.772s     0.0013s    0.0105s
  Gen  1         8 colls,     7 par    0.163s   0.044s     0.0055s    0.0109s

  Parallel GC work balance: 35.12% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.001s  (  0.006s elapsed)
  MUT     time   26.155s  ( 31.602s elapsed)
  GC      time    2.894s  (  0.816s elapsed)
  EXIT    time   -0.003s  (  0.008s elapsed)
  Total   time   29.048s  ( 32.432s elapsed)

  Alloc rate    2,764,643,665 bytes per MUT second

  Productivity  90.0% of total user, 97.5% of total elapsed

gc_alloc_block_sync: 3494
whitehole_spin: 0
gen[0].sync: 15527
gen[1].sync: 177
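
As a rough sanity check on the report above, the following sketch redoes
two pieces of arithmetic from the printed figures: the average number of
bytes allocated between collections, and productivity as MUT time over
total time. It assumes that -A32m gives each of the 4 capabilities (-N4)
its own 32 MiB allocation area, and that a collection starts once that
combined area is used up; both are my reading of the RTS flags, not
something the report states. The constant names are made up for this
example.

```haskell
-- Figures copied from the "+RTS -s" report above.
bytesAllocated :: Double
bytesAllocated = 72308248744

gen0Collections, gen1Collections :: Double
gen0Collections = 580
gen1Collections = 8

mutSeconds, totalSeconds :: Double
mutSeconds   = 26.155
totalSeconds = 29.048

-- Assumption: with -N4 and -A32m, each of the 4 capabilities has its own
-- 32 MiB allocation area, so the combined nursery is 4 * 32 MiB.
nurseryBytes :: Double
nurseryBytes = 4 * 32 * 1024 * 1024

-- Average number of bytes allocated between two collections.
bytesPerCollection :: Double
bytesPerCollection = bytesAllocated / (gen0Collections + gen1Collections)

-- Productivity as a percentage: mutator time over total CPU time.
productivity :: Double
productivity = 100 * mutSeconds / totalSeconds

main :: IO ()
main = do
  putStrLn ("bytes allocated per collection: "
            ++ show (round bytesPerCollection :: Integer))
  putStrLn ("combined nursery size in bytes: "
            ++ show (round nurseryBytes :: Integer))
  putStrLn ("productivity (MUT / Total), %:  " ++ show productivity)
```

If those assumptions hold, the first two numbers should land close to each
other (roughly 123 million versus 134 million bytes), which would fit the
idea that the larger nursery mainly helps by making collections less
frequent.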

I'd still like to figure out how to parallelize the filterProj . parseJson
. readFile part, but for now I'm satisfied with what I have.
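
One possible shape for that parallel step, sketched with nothing but base:
fork a lightweight thread per input and use a semaphore to cap how many
run at once. The names boundedMapIO, spawn and matchesProject are made up
for this example, and matchesProject is only a stand-in for the real
readFile, parse and filter step. This is not streaming: it collects every
result in a list, so in the conduit pipeline it would have to be applied
to bounded chunks of paths rather than to the whole directory listing.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Exception (SomeException, bracket_, evaluate, throwIO, try)

-- Fork one thread for x. The semaphore limits how many threads run f at
-- the same time; evaluate forces the result (to WHNF) inside the worker
-- so the work is not deferred to whoever reads the MVar.
spawn :: QSem -> (a -> IO b) -> a -> IO (MVar (Either SomeException b))
spawn sem f x = do
  var <- newEmptyMVar
  _ <- forkIO $ do
    r <- try (bracket_ (waitQSem sem) (signalQSem sem) (f x >>= evaluate))
    putMVar var r
  pure var

-- Run f over every input with at most `limit` calls in flight, returning
-- results in input order and rethrowing the first failure encountered.
boundedMapIO :: Int -> (a -> IO b) -> [a] -> IO [b]
boundedMapIO limit f xs = do
  sem  <- newQSem limit
  vars <- mapM (spawn sem f) xs
  mapM (\var -> takeMVar var >>= either throwIO pure) vars

-- Hypothetical stand-in for: read the file, decode the JSON, apply the
-- project filter. Here it only looks at the length of the path.
matchesProject :: FilePath -> IO Bool
matchesProject path = pure (even (length path))

main :: IO ()
main = do
  let paths = ["a.json", "bb.json", "ccc.json", "dddd.json"]
  flags <- boundedMapIO 2 matchesProject paths
  mapM_ putStrLn [p | (p, True) <- zip paths flags]
```

For the threads to actually run on several cores, the program would still
need to be built with -threaded and run with +RTS -N, as in the command
line above.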

(I also traced my crashes to a different process launched from the same
terminal window.)

On Sun, Jan 21, 2018 at 10:12 PM, Michael Snoyman <michael at snoyman.com>
wrote:

> I just wanted to comment on the conduit aspect of this in particular.
> Looking at your first version:
>
> conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
> conduitFilesFilter projFilter dirname' = do
>   (_, allFiles) <- listDirRecur dirname'
>   C.runConduit $
>     C.yieldMany allFiles
>     .| C.filterMC (filterMatchingFile projFilter)
>     .| C.sinkList
>
>
> This isn't taking full advantage of conduit: you're reading in a list of
> the files in the file system, instead of streaming those values. And the
> output is a list of `String`, instead of streaming out those `String`s.
> More idiomatic would look something like:
>
> sourceFilesFilter projFilter dirname' =
>   sourceDirectoryDeep False dirname'
>     .| filterMC (filterMatchingFile projFilter)
>
> And then, wherever you're consuming the output, to do so in a streaming
> fashion, e.g.:
>
> runConduitRes $ sourceFilesFilter projFilter dirname' .| mapM_C print
>
> This should help with the increasing memory usage, though it will do
> nothing about the runtime overhead of parsing the JSON itself.
>
> On Mon, Jan 22, 2018 at 1:38 AM, erik <eraker at gmail.com> wrote:
>
>> Hello Haskell Cafe,
>>
>> I have written a small, pretty simple program but I am finding it hard to
>> reason about its behavior (and also about the best way to do what I want),
>> so I would like to ask you all for some suggestions.
>>
>> For reference, here's a Stack Overflow question
>> <https://stackoverflow.com/questions/48330690/haskell-conduit-aeson-parsing-large-jsons-and-filter-matching-key-values/48348153#48348153>
>> where I described what's going on, but I'll also describe it below.
>>
>> My program does the following:
>>
>>    1. Recursively list a directory,
>>    2. Parse the JSON files from the directory list into identifiable
>>    objects/records,
>>    3. Look for matching key-value pairs, and
>>    4. Return filenames where matches have been found.
>>
>> A few details for more context:
>>
>>    - I have to filter between 500,000 and 1 million files (I'm typically
>>    trying to reduce down to between 1,000 and 40,000 that represent a
>>    particular project). I usually just need the filenames.
>>    - Each file is quite large, some of them 5 MB or 10 MB, and it's not
>>    uncommon for them to have deeply nested keys (40,000 keys or so).
>>
>> My first version of this program was simple, synchronous, and as
>> straightforward as I could come up with. However, the memory usage
>> increased monotonically. Profiling, I found that most of the time was spent
>> in JSON-parsing into Objects before my code could turn the objects into
>> records (also, as you might imagine, tons of time in garbage collection).
>>
>> For my second version, I switched to conduit and it seemed to solve the
>> increasing memory issue. My core function now looked like this:
>>
>> conduitFilesFilter :: ProjectFilter -> Path Abs Dir -> IO [Path Abs File]
>> conduitFilesFilter projFilter dirname' = do
>>   (_, allFiles) <- listDirRecur dirname'
>>   C.runConduit $
>>     C.yieldMany allFiles
>>     .| C.filterMC (filterMatchingFile projFilter)
>>     .| C.sinkList
>>
>>
>> This was still slow and certainly still synchronous. What I really wanted
>> was to run that "filterMatchingFile..." part in parallel across a number of
>> CPUs. As an aside, my filtering function looks like this:
>>
>> filterMatchingFile :: ProjectFilter -> Path Abs File -> IO Bool
>> filterMatchingFile (ProjectFilter filterFunc) fpath = do
>>   let fp = toFilePath fpath
>>   bs <- B.readFile fp
>>   case validImplProject bs of  -- this is pretty much just `decodeStrict`
>>     Nothing -> pure False
>>     (Just proj') -> pure $ filterFunc proj'
>>
>> Here are the stats from running this:
>>
>> 115,961,554,600 bytes allocated in the heap
>>   35,870,639,768 bytes copied during GC
>>       56,467,720 bytes maximum residency (681 sample(s))
>>        1,283,008 bytes maximum slop
>>              145 MB total memory in use (0 MB lost due to fragmentation)
>>
>>                                      Tot time (elapsed)  Avg pause  Max pause
>>   Gen  0     108716 colls, 108716 par   76.915s  20.571s     0.0002s    0.0266s
>>   Gen  1       681 colls,   680 par    0.530s   0.147s     0.0002s    0.0009s
>>
>>   Parallel GC work balance: 14.99% (serial 0%, perfect 100%)
>>
>>   TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>>
>>   SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>>
>>   INIT    time    0.001s  (  0.007s elapsed)
>>   MUT     time   34.813s  ( 42.938s elapsed)
>>   GC      time   77.445s  ( 20.718s elapsed)
>>   EXIT    time    0.000s  (  0.010s elapsed)
>>   Total   time  112.260s  ( 63.672s elapsed)
>>
>>   Alloc rate    3,330,960,996 bytes per MUT second
>>
>>   Productivity  31.0% of total user, 67.5% of total elapsed
>>
>> gc_alloc_block_sync: 188614
>> whitehole_spin: 0
>> gen[0].sync: 33
>> gen[1].sync: 811204
>>
>>
>> I thought about writing a plainer (non-conduit) parallel version but I
>> was afraid of the memory issue. I tried to write a Conduit-plus-channels
>> version but it didn't work.
>>
>> Finally, I wrote a version using stm-conduit, which I thought might be a
>> bit more efficient. It seems to be slightly better, but it's not really the
>> kind of parallelization I was imagining:
>>
>> conduitAsyncFilterFiles :: ProjectFilter -> Path Abs Dir -> IO [String]
>> conduitAsyncFilterFiles projFilter dirname' = do
>>   (_, allFiles) <- listDirRecur dirname'
>>   buffer 10
>>     (C.yieldMany allFiles
>>     .| (C.mapMC (readFileWithPath . toFilePath)))
>>     (C.mapC (filterProjForFilename projFilter)
>>          .| C.filterC isJust
>>          .| C.mapC fromJust
>>          .| C.sinkList)
>>
>> The first conduit passed to `buffer` does something like the following:
>> parseStrict . B.readFile.
>>
>> This still wasn't too great, but after reading about handling garbage
>> collection in smarter ways, I found that I could run my application like
>> this:
>>
>> stack exec search-json -- --searchPath $FILES --name hello +RTS -s -A32m -n4m
>>
>> And the "productivity" would shoot up quite a lot presumably because I'm
>> doing less frequent garbage collection. My program also got a bit faster:
>>
>>  36,379,265,096 bytes allocated in the heap
>>    1,238,438,160 bytes copied during GC
>>       22,996,264 bytes maximum residency (85 sample(s))
>>        3,834,152 bytes maximum slop
>>              207 MB total memory in use (14 MB lost due to fragmentation)
>>
>>                                      Tot time (elapsed)  Avg pause  Max pause
>>   Gen  0       211 colls,   211 par    1.433s   0.393s     0.0019s    0.0077s
>>   Gen  1        85 colls,    84 par    0.927s   0.256s     0.0030s    0.0067s
>>
>>   Parallel GC work balance: 67.93% (serial 0%, perfect 100%)
>>
>>   TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)
>>
>>   SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)
>>
>>   INIT    time    0.001s  (  0.004s elapsed)
>>   MUT     time   12.636s  ( 12.697s elapsed)
>>   GC      time    2.359s  (  0.650s elapsed)
>>   EXIT    time   -0.015s  (  0.003s elapsed)
>>   Total   time   14.982s  ( 13.354s elapsed)
>>
>>   Alloc rate    2,878,972,840 bytes per MUT second
>>
>>   Productivity  84.2% of total user, 95.1% of total elapsed
>>
>> gc_alloc_block_sync: 9612
>> whitehole_spin: 0
>> gen[0].sync: 2044
>> gen[1].sync: 47704
>>
>>
>> Thanks for reading thus far. I now have three questions.
>>
>> 1. I understand that my program necessarily creates tons of garbage
>> because it parses and then throws away 5 MB of JSON 500,000 times. However,
>> I don't really understand why "+RTS -A32m -n4m" helps, and I'm
>> always reluctant to sprinkle in magic I don't fully understand. Can anyone
>> help me understand what this means?
>>
>> 2. It seems that the allocation area size is really something I should be
>> setting, but I can't figure out how to add it to my package.yml alongside
>> the other options so that I don't have to pass it every time I run the
>> program. From the documentation for GHC 8.2, I thought it needed to look
>> like the following, but it never works: I am usually told that -A32m and
>> -n4m are not recognizable flags.
>>
>> ghc-options:
>>     - -threaded
>>     - -rtsopts
>>     - "-with-rtsopts=-N4 -A32m -n4m"
>>
>> 3. Finally, the most important question I have is this. When I run this
>> program on OSX, it runs successfully through to completion. However, *a
>> few minutes after terminating*, my terminal becomes unresponsive. I use
>> emacs for my editor, typically launched from a terminal window and that too
>> becomes unresponsive. This is not a typical outcome for any programs I
>> write and it happens *every time* I run this particular application, so
>> I know that this application is to blame.
>>
>> The crazy thing is that force quitting the terminal or logging out
>> doesn't help: I have to actually restart my computer to use the terminal
>> application again.  Other details that may help:
>>
>>    - This crash happens after the process id for my program has
>>    terminated.
>>    - Watching its progress in HTOP, it never comes close to running out
>>    of memory: the value hovers in the same place.
>>
>> I can't really deploy an application that has this potential-crashing
>> problem, but I don't know how to debug this issue. My total stab-in-the-dark
>> idea is that heap allocations somehow are unrecoverable even after the
>> process has terminated? Can anyone offer suggestions on things to look for
>> or ways to debug and/or fix this issue?
>>
>> Finally, if anyone has suggestions on better ways to structure my
>> application or parallelize the slow parts, I'll happily take those.
>>
>> Thanks again for reading. I appreciate any suggestions you may have.
>>
>> Best,
>>
>> --
>> Erik Aker
>>


-- 
Erik Aker