[Haskell-cafe] How to write a polymorphic serializer?

Viktor Dukhovni ietf-dane at dukhovni.org
Sat Sep 16 05:55:49 UTC 2017


On Fri, Sep 15, 2017 at 10:12:43AM -0400, Li-yao Xia wrote:

> This example is odd because it doesn't seem like the lock is doing anything.
> Probably, the details that would make it more interesting have just been
> abstracted away, and I would guess that you do want a way to work with a
> single global lock.

In a bit more detail I have a small number of locks (currently two)
that are used to serialize access to the stdout file handle and an
SQLite database respectively.  Though I know about "unsafePerformIO",
and understand that it is safe to use to create a global MVar (),
my locks are created on the fly in "main".
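
For reference, the global-lock idiom mentioned above usually looks something like the sketch below.  The names stdoutLock and withStdoutLock are made up for illustration, and this is not what the program in this message does (its locks are created in "main").  The NOINLINE pragma is what keeps the MVar from being duplicated by inlining.

```haskell
module Main (main) where

import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import System.IO.Unsafe (unsafePerformIO)

-- | A single process-wide lock (hypothetical name).  NOINLINE ensures the
--   unsafePerformIO action is evaluated at most once, so every caller
--   shares the same MVar.
{-# NOINLINE stdoutLock #-}
stdoutLock :: MVar ()
stdoutLock = unsafePerformIO (newMVar ())

-- | Run an action while holding the global lock (hypothetical helper).
withStdoutLock :: IO a -> IO a
withStdoutLock = withMVar stdoutLock . const

main :: IO ()
main = withStdoutLock (putStrLn "only one thread at a time gets here")
```

The cost of this approach is that every caller depends on a top-level name, which is roughly the kind of coupling the on-the-fly locks avoid.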

I was trying to avoid sprinkling the internal APIs and/or context
structures with explicit MVar-related types.

While implementing the typeclass idea that you helped me flesh out
(it works), I stumbled into a simpler alternative that meets my
needs and that I thought would not work, but does, and it helps me
to better understand and appreciate the semantic power of lazy
evaluation.  Full program below.  When compiled and run as follows:

  $ ./locktest $(seq 100 199) | uniq -c | awk '{print $1}' | fmt

Correct output:

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
  28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
  52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
  77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100

shows the numbers from 1 to 100 (not necessarily in that order),
which means that the $n^{th}$ thread managed to output $n$ long
lines without other threads getting in the way.  Without locking
I get radically different results.
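
To spell out why the pipeline above is a valid check: "uniq -c" collapses each run of adjacent identical lines into a count, so an uninterrupted block of n identical lines shows up as the number n.  A rough Haskell equivalent of the "uniq -c | awk '{print $1}'" step, assuming (as with the seq arguments here) that neighbouring blocks differ, might be the following; runLengths is an illustrative name, not part of the program.

```haskell
module Main (main) where

import Data.List (group)

-- | Length of each run of adjacent equal lines, like the count column
--   of @uniq -c@ (hypothetical helper).
runLengths :: [String] -> [Int]
runLengths = map length . group

main :: IO ()
main = print (runLengths ["a", "a", "b", "b", "b", "c"])
-- prints [2,3,1]
```

If two threads interleave their writes, the runs are broken up and the counts no longer form the set 1..100.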

-- 
	Viktor.

---------------- Cut below ---------------- 

{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE Rank2Types #-}

module Main (main) where

import           Control.Concurrent (forkFinally)
import           Control.Concurrent.MVar (newMVar, withMVar)
import           Control.Concurrent.STM ( TVar
                                        , newTVar
                                        , readTVar
                                        , writeTVar
                                        , atomically
                                        , retry
                                        )
import           Control.Monad (mapM_, void, when)
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (runResourceT)
import           Control.Monad.Trans.State.Strict
import           Data.Conduit (await, ($$))
import           Data.Conduit.List (sourceList)
import           Data.List (concat, replicate)
import           System.Environment (getArgs)
import           System.IO (hFlush, hPutStrLn, stdout, stderr)

-- | Hide the polymorphic lock closure inside a fixed wrapper type whose
--   field has a rank-2 (universally quantified) type.  The magic of lazy
--   evaluation lets Haskell defer the type resolution of the @serially@
--   method inside the Lockbox until it is used, and so its polymorphism
--   is retained.
--
type Serializer = forall a m. MonadIO m => IO a -> m a
newtype Lockbox = Lockbox { serially :: Serializer }

locksmith :: IO Lockbox
locksmith = (\ lock -> Lockbox (liftIO . withMVar lock . const)) <$> newMVar ()

-- | Stutter each input string enough times to cause unlocked writes to split.
--   Only by locking do we reliably get the long lines to be written in their
--   entirety, otherwise the output is typically a mess of interleaved partial
--   outputs.
--
amplification :: Int
amplification = 8000

main :: IO ()
main = do
    args <- getArgs
    tc <- atomically $ newTVar 0
    lockbox <- locksmith
    spawn args tc 99 (\ n s -> evalStateT (worker n s) lockbox)

  where

    -- | We could equally have used ReaderT here, our state is read-only
    worker :: Int -> String -> StateT Lockbox IO ()
    worker num s = do
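        -- Fetch the whole Lockbox and apply its field at the use site, so the
        -- rank-2 type is instantiated here (also accepted by GHC >= 9.0).
        box <- get
        serially box $ do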
            let bigs = concat $ replicate amplification s
            mapM_ (\_ -> putStrLn bigs) $ [1..num]
            hFlush stdout

type Worker = Int -> String -> IO ()

-- | Spawn worker threads up to the designated thread count limit.  When the
--   limit is reached, wait for an existing thread to finish.  Once all the work
--   has been dispatched, wait for the final threads to finish.
--
spawn :: [String] -- ^ Strings to amplify
      -> TVar Int -- ^ Active thread count
      -> Int      -- ^ Thread count limit
      -> Worker   -- ^ Per-thread worker function
      -> IO ()
spawn args threadCount threadMax worker =
    runResourceT $ sourceList args $$ sink 1
  where
    sink num = do
        next <- await
        case next of
            Nothing -> liftIO waitDone
            Just  a -> liftIO (waitTurn num a) >> sink (num + 1)

    -- | Wait for remaining threads to finish
    waitDone = atomically $ do
        tc <- readTVar threadCount
        when (tc > 0) retry

    -- | Increment busy thread-count if not maxed-out, else wait and retry
    waitTurn n s = do
        atomically $ do
            count <- readTVar threadCount
            if (count < threadMax)
                then writeTVar threadCount (count + 1)
                else retry
        void $ forkFinally (worker n s) finalize

    -- | Decrement busy thread-count and warn of any exceptions
    finalize res = do
        atomically $
          readTVar threadCount >>= writeTVar threadCount . pred
        either warn (\_ -> return ()) res
      where
        warn err = hPutStrLn stderr $ "Exception: " ++ show err
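
The comment on "worker" notes that ReaderT would do equally well, since the Lockbox is never modified.  A minimal sketch of that variant, reusing the Serializer, Lockbox and locksmith definitions from the program and leaving out the thread spawning, could look like this.  The arguments passed to worker in main are arbitrary sample values.

```haskell
{-# LANGUAGE RankNTypes #-}

module Main (main) where

import Control.Concurrent.MVar (newMVar, withMVar)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)
import System.IO (hFlush, stdout)

type Serializer = forall a m. MonadIO m => IO a -> m a
newtype Lockbox = Lockbox { serially :: Serializer }

locksmith :: IO Lockbox
locksmith = (\lock -> Lockbox (liftIO . withMVar lock . const)) <$> newMVar ()

-- | Same job as the StateT worker, with the Lockbox as a read-only
--   environment.  The field is applied where it is used, so its type
--   is instantiated at that point.
worker :: Int -> String -> ReaderT Lockbox IO ()
worker num s = do
    box <- ask
    serially box $ do
        mapM_ (\_ -> putStrLn s) [1 .. num]
        hFlush stdout

main :: IO ()
main = do
    lockbox <- locksmith
    runReaderT (worker 3 "hello") lockbox
```

In the full program only the lambda passed to "spawn" would change, from evalStateT to runReaderT.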

