[Haskell-cafe] monoid fold concurrently

Viktor Dukhovni ietf-dane at dukhovni.org
Sat Nov 16 07:40:04 UTC 2019

On Fri, Nov 15, 2019 at 05:06:16PM -0500, Viktor Dukhovni wrote:

> I've not used this module myself, please post a summary of your
> experience.

I was curious, so I decided to try a simple case:

    {-# LANGUAGE BlockArguments #-}
    {-# LANGUAGE BangPatterns #-}

    module Main (main) where

    import Control.Concurrent.Async.Pool
    import Control.Concurrent.STM
    import Control.Monad
    import Data.List
    import Data.Monoid
    import System.Environment

    defCount, batchSz :: Int
    defCount = 10000
    batchSz = 256

    batchList :: Int -> [a] -> [[a]]
    batchList sz as = case splitAt sz as of
        ([], _) -> []
        (t, []) -> [t]
        (h, t)  -> h : batchList sz t

    main :: IO ()
    main = do
        n <- maybe defCount read <$> (fmap fst . uncons) <$> getArgs
        let bs = batchList batchSz $ map Sum [1..n]
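        s <- foldM mergeReduce mempty bs
        print $ getSum s
      where
        -- Reduce one batch concurrently, then fold it into the accumulator.
        mergeReduce :: Sum Int -> [Sum Int] -> IO (Sum Int)
        mergeReduce !acc ms = (acc <>) <$> reduceBatch (return <$> ms)

        -- Run the batch's actions in a task group of size 8 and wait
        -- for the combined result.
        reduceBatch :: Monoid a => [IO a] -> IO a
        reduceBatch ms =
            withTaskGroup 8 $ (>>= wait) . atomically . flip mapReduce ms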

Without batching, the whole list of actions is brought into memory
at once (to create the task dependency graph) before the outputs
are folded concurrently, so memory use grows with the length of the
list rather than staying constant.

In the above, the list of actions is chunked (256 at a time).  Each
chunk is merged concurrently, but the results from the chunks are
then merged sequentially.
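
Chunking does not change the answer because the monoid operation is
associative.  As a rough sanity check, here is a pure sketch that
uses only base (no async-pool): it reuses batchList, and the name
batchedFold is my own stand-in for the foldM/mergeReduce pipeline
above, with mconcat in place of the concurrent per-batch reduction.
It only illustrates the shape of the computation, not its
concurrency or memory behaviour.

```haskell
module Main (main) where

import Data.List (foldl')
import Data.Monoid (Sum (..))

-- Same chunking helper as in the program above.
batchList :: Int -> [a] -> [[a]]
batchList sz as = case splitAt sz as of
    ([], _) -> []
    (t, []) -> [t]
    (h, t)  -> h : batchList sz t

-- Hypothetical pure stand-in for the batched program: reduce each
-- chunk with mconcat, then fold the per-chunk results strictly from
-- left to right.
batchedFold :: Monoid a => Int -> [a] -> a
batchedFold sz = foldl' (\acc b -> acc <> mconcat b) mempty . batchList sz

main :: IO ()
main = do
    let xs = map Sum [1 .. 10000 :: Int]
    -- Batched and unbatched folds agree, by associativity.
    print $ getSum (batchedFold 256 xs)
    print $ batchedFold 256 xs == mconcat xs
    -- Left-to-right order is preserved, so a non-commutative monoid
    -- (here, list concatenation) also comes out unchanged.
    putStrLn $ batchedFold 3 (map (: []) "haskell-cafe")
```

Whether the concurrent mapReduce also preserves order for a
non-commutative monoid is a separate question about async-pool that
this sketch does not answer.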

If the cost of storing the entire task list in memory is negligible,
a single mapReduce may perform better:

    {-# LANGUAGE BlockArguments #-}

    module Main (main) where

    import Control.Concurrent.Async.Pool
    import Control.Concurrent.STM
    import Data.List
    import Data.Monoid
    import System.Environment

    defCount :: Int
    defCount = 100

    main :: IO ()
    main = do
        n <- maybe defCount read <$> (fmap fst . uncons) <$> getArgs
        withTaskGroup 8 \tg -> do
            reduction <- atomically $ mapReduce tg $ map (return . Sum) [1..n]
            wait reduction >>= print . getSum


