[Haskell-cafe] How to write a polymorphic serializer?
Viktor Dukhovni
ietf-dane at dukhovni.org
Sat Sep 16 05:55:49 UTC 2017
On Fri, Sep 15, 2017 at 10:12:43AM -0400, Li-yao Xia wrote:
> This example is odd because it doesn't seem like the lock is doing anything.
> Probably, the details that would make it more interesting have just been
> abstracted away, and I would guess that you do want a way to work with a
> single global lock.
In a bit more detail I have a small number of locks (currently two)
that are used to serialize access to the stdout file handle and an
SQLite database respectively. Though I know about "unsafePerformIO",
and understand that it is safe to use to create a global MVar (),
my locks are created on the fly in "main".
I was trying to avoid sprinkling the internal APIs and/or context
structures with explicit MVar-related types.
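
For comparison, the global-lock idiom mentioned above is usually written along the following lines. This is only a sketch of the alternative that the program below avoids; the names stdoutLock and withStdoutLock are hypothetical and do not appear in the original program.

```haskell
module Main (main) where

import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import System.IO.Unsafe (unsafePerformIO)

-- Hypothetical process-wide lock, created once on first use.
-- NOINLINE keeps GHC from inlining (and so duplicating) the newMVar call.
{-# NOINLINE stdoutLock #-}
stdoutLock :: MVar ()
stdoutLock = unsafePerformIO (newMVar ())

-- Run an IO action while holding the global lock.
withStdoutLock :: IO a -> IO a
withStdoutLock = withMVar stdoutLock . const

main :: IO ()
main = do
    n <- withStdoutLock (putStrLn "locked write" >> return (42 :: Int))
    print n
```

The drawback for a program whose locks are created on the fly in "main" is that every such lock needs its own top-level definition.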
While implementing the typeclass idea that you helped me flesh out
(it works), I stumbled into a simpler alternative that meets my
needs and that I thought would not work, but does, and it helps me
to better understand and appreciate the semantic power of lazy
evaluation. Full program below. When compiled and run as follows:
    $ ./locktest $(seq 100 199) | uniq -c | awk '{print $1}' | fmt
Correct output:
    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
    28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
    52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
    77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
shows the numbers from 1 to 100 (not necessarily in that order),
which means that the $n^{th}$ thread managed to output $n$ long
lines without other threads getting in the way. Without locking
I get radically different results.
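
A minimal sketch of the central idea, assuming only base: the lock is hidden behind a newtype whose field is polymorphic, so the same wrapped lock can serialize actions of different result types. It mirrors the Lockbox type from the full program but is cut down for illustration; the demo function is hypothetical.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main (main) where

import Control.Concurrent.MVar (newMVar, withMVar)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- The MVar never appears in the type; only the polymorphic closure does.
newtype Lockbox = Lockbox
    { serially :: forall a m. MonadIO m => IO a -> m a }

-- Create a fresh lock and capture it inside the closure.
locksmith :: IO Lockbox
locksmith = do
    lock <- newMVar ()
    return (Lockbox (\act -> liftIO (withMVar lock (const act))))

-- Hypothetical demo: one Lockbox used at two different result types.
demo :: IO (Int, String)
demo = do
    box <- locksmith
    n <- serially box (return 42)
    s <- serially box (return "done")
    return (n, s)

main :: IO ()
main = demo >>= print
```

Since withMVar holds the MVar for the duration of the action, nesting one "serially box" call inside another on the same Lockbox would block forever.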
--
Viktor.
---------------- Cut below ----------------
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE Rank2Types #-}
module Main (main) where
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar (newMVar, withMVar)
import Control.Concurrent.STM ( TVar
                              , newTVar
                              , readTVar
                              , writeTVar
                              , atomically
                              , retry
                              )
import Control.Monad (mapM_, void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Trans.State.Strict
import Data.Conduit (await, ($$))
import Data.Conduit.List (sourceList)
import Data.List (concat, replicate)
import System.Environment (getArgs)
import System.IO (hFlush, hPutStrLn, stdout, stderr)
-- | Hide polymorphic lock closure inside a fixed existentially quantified
-- wrapper type. The magic of lazy evaluation lets Haskell defer the
-- type resolution of the @serially@ method inside the Lockbox until
-- it is used, and so its polymorphism is retained.
--
type Serializer = forall a m. MonadIO m => IO a -> m a
newtype Lockbox = Lockbox { serially :: Serializer }
locksmith :: IO Lockbox
locksmith = (\ lock -> Lockbox (liftIO . withMVar lock . const)) <$> newMVar ()
-- | Stutter each input string enough times to cause unlocked writes to split.
-- Only by locking do we reliably get the long lines to be written in their
-- entirety, otherwise the output is typically a mess of interleaved partial
-- outputs.
--
amplification :: Int
amplification = 8000
main :: IO ()
main = do
    args <- getArgs
    tc <- atomically $ newTVar 0
    lockbox <- locksmith
    spawn args tc 99 (\ n s -> evalStateT (worker n s) lockbox)
  where
    -- | We could equally have used ReaderT here, our state is read-only
    worker :: Int -> String -> StateT Lockbox IO ()
    worker num s = do
        dolocked <- gets serially
        dolocked $ do
            let bigs = concat $ replicate amplification s
            mapM_ (\_ -> putStrLn bigs) $ [1..num]
            hFlush stdout
type Worker = Int -> String -> IO ()
-- | Spawn worker threads up to the designated thread count limit. When the
-- limit is reached, wait for an existing thread to finish. Once all the work
-- has been dispatched, wait for the final threads to finish.
--
spawn :: [String] -- ^ Strings to amplify
      -> TVar Int -- ^ Active thread count
      -> Int      -- ^ Thread count limit
      -> Worker   -- ^ Per-thread worker function
      -> IO ()
spawn args threadCount threadMax worker =
    runResourceT $ sourceList args $$ sink 1
  where
    sink num = do
        next <- await
        case next of
            Nothing -> liftIO waitDone
            Just a  -> liftIO (waitTurn num a) >> sink (num + 1)

    -- | Wait for remaining threads to finish
    waitDone = atomically $ do
        tc <- readTVar threadCount
        when (tc > 0) retry

    -- | Increment busy thread-count if not maxed-out, else wait and retry
    waitTurn n s = do
        atomically $ do
            count <- readTVar threadCount
            if (count < threadMax)
                then writeTVar threadCount (count + 1)
                else retry
        void $ forkFinally (worker n s) finalize

    -- | Decrement busy thread-count and warn of any exceptions
    finalize res = do
        atomically $
            readTVar threadCount >>= writeTVar threadCount . pred
        either warn (\_ -> return ()) res
      where
        warn err = hPutStrLn stderr $ "Exception: " ++ show err
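
The comment on the worker notes that ReaderT would do equally well, since the Lockbox is never modified. A minimal sketch of that variant follows, assuming the transformers package; it is cut down from the program above, and the worker here returns the number of lines written purely for illustration. It applies "serially box" directly rather than binding the selector's result with "gets", which sidesteps any dependence on how a given GHC version instantiates the polymorphic field.

```haskell
{-# LANGUAGE RankNTypes #-}
module Main (main) where

import Control.Concurrent.MVar (newMVar, withMVar)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)
import System.IO (hFlush, stdout)

newtype Lockbox = Lockbox
    { serially :: forall a m. MonadIO m => IO a -> m a }

locksmith :: IO Lockbox
locksmith = do
    lock <- newMVar ()
    return (Lockbox (\act -> liftIO (withMVar lock (const act))))

-- Read-only environment instead of StateT; writes num copies of s
-- under the lock and returns num (an illustrative addition).
worker :: Int -> String -> ReaderT Lockbox IO Int
worker num s = do
    box <- ask
    serially box $ do
        mapM_ (\_ -> putStrLn s) [1 .. num]
        hFlush stdout
        return num

main :: IO ()
main = do
    box <- locksmith
    n <- runReaderT (worker 3 "hello") box
    print n
```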