[Haskell-cafe] STM, Concurrent Haskell and network clients (long, code)
Joel Reymont
joelr1 at gmail.com
Fri Dec 2 10:20:20 EST 2005
Folks,
After two months with Haskell I think I'm starting to get the proper
spirit. Inspired by Simon PJ's recent comments about lots of threads
not hurting Haskell I introduced STM into my network client. I would
like to run my architecture by you to see if there are any
improvements to be made. I would like the code to be as bullet-proof
as possible.
I'm also jumping through some hoops to make sure none of the 3
threads (main, socket reader and socket writer) block indefinitely.
I'm getting a "thread blocked indefinitely" exception now and I think
this is because the main thread is not getting notified when the
socket threads die.
My network client has two threads. #1 reads network packets, converts
them to events and posts them to a TChan using a function passed to
it by the main thread. #2 blocks on a second TChan and waits for
events to be sent out through the socket.
The main thread creates a TChan to read events from, passes a closure
that writes to the TChan to the network client and uses a network
client function to send events to the network client.
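The wiring described above can be sketched in a few lines. This is only an illustration under stated assumptions: `Event` and `Command` are toy stand-ins for the real types, `startToyClient` and `roundTrip` are hypothetical names, and the "socket" is replaced by an echo so the example runs without a network.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forever)

-- Hypothetical stand-ins for the real Event and Command types.
newtype Event   = Event String   deriving (Eq, Show)
newtype Command = Command String deriving (Eq, Show)

-- Start a toy client. 'post' is the closure supplied by the main thread.
-- The returned action queues a command for the writer thread.
startToyClient :: (Event -> IO ()) -> IO (Command -> IO ())
startToyClient post = do
  outbox <- newTChanIO
  -- Writer side: block on the outbox. A real client would write to the
  -- socket here; this sketch echoes each command back as an event.
  _ <- forkIO $ forever $ do
    Command s <- atomically (readTChan outbox)
    post (Event ("echo: " ++ s))
  return (atomically . writeTChan outbox)

-- Main-thread side: create the inbox, hand a closure that writes to it
-- to the client, send one command and wait for the resulting event.
roundTrip :: String -> IO Event
roundTrip s = do
  inbox <- newTChanIO
  send  <- startToyClient (atomically . writeTChan inbox)
  send (Command s)
  atomically (readTChan inbox)
```

The client never sees the main thread's TChan directly, only the closure, which is what keeps networking concerns out of the client logic.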
All in all it works out very nicely as there's no networking, FFI,
etc. in the network client logic. This is an improvement from what I
had before.
The function that starts the poker client has this signature:
    startPokerClient :: (Event -> IO ()) -- fun to send events to the main thread
                     -> HostInfo         -- (host, port)
                     -> AuthInfo         -- (user, pass, etc.)
                     -> IO PokerClient
where
    data PokerClient = PC
        !MaybeThread
        !MaybeThread
        !(TChan Command)
The main thread calls this function to deliver commands to the
network client:
    writePokerClient :: PokerClient -> Command -> IO ()
    writePokerClient (PC _ _ chan) cmd =
        atomically $ writeTChan chan cmd
The main thread can stop the network client using
    stopPokerClient :: PokerClient -> IO ()
    stopPokerClient (PC tmv2 tmv1 _) =
        do putStrLn $ "stopPokerClient invoked"
           maybeKillThread tmv2
           maybeKillThread tmv1
The abbreviated startPokerClient looks like this:
    {-# NOINLINE lock #-}
    lock :: MVar ()
    lock = unsafePerformIO $ newMVar ()
    startPokerClient post (host, port) auth =
        do h <- withMVar lock -- getHostByName is not thread-safe
                $ \_ -> connectTo host (PortNumber $ fromIntegral port)
                `catchError`
                (\e -> fail $ "Failed to connect to server: "
                              ++ show e)
           -- kick off the SSL handshake
           ssl <- startSSL
           doSSLHandshake h ssl
           ...
           -- we need one thread to always kill the other
           (tmv1 :: MaybeThread) <- atomically $ newEmptyTMVar
           (tmv2 :: MaybeThread) <- atomically $ newEmptyTMVar
           (tmvssl :: MaybeSSL) <- atomically $ newTMVar (Just ssl)
           -- thread to read from the socket and post commands
           tid1 <- forkIO $ writeLoop post h ssl
                   `finally` finalize tmv1 tmv2 h tmvssl post
           -- thread to grab commands generated by the main thread
           -- and write them to the socket
           chan <- atomically $ newTChan
           tid2 <- forkIO $ readLoop h chan
                   `finally` finalize tmv2 tmv1 h tmvssl post
           -- we have both threads now
           atomically $! do putTMVar tmv1 (Just tid1)
                            putTMVar tmv2 (Just tid2)
           return $! PC tmv2 tmv1 chan
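The global lock at the top of this listing is a common idiom, and it can be shown in isolation. The following is a minimal sketch, assuming only `base`; the names `resolverLock` and `serialized` are hypothetical and not part of the client.

```haskell
import Control.Concurrent.MVar (MVar, newMVar, withMVar)
import System.IO.Unsafe (unsafePerformIO)

-- One process-wide lock. NOINLINE keeps GHC from inlining the
-- unsafePerformIO call, which could otherwise create more than one MVar.
{-# NOINLINE resolverLock #-}
resolverLock :: MVar ()
resolverLock = unsafePerformIO (newMVar ())

-- Run an action while holding the lock. withMVar puts the value back
-- even if the action throws, so a failed call does not leave the lock held.
serialized :: IO a -> IO a
serialized action = withMVar resolverLock (\_ -> action)
```

Wrapping only the non-thread-safe call (here, the connect and name lookup) keeps the critical section short, so other threads are not held up longer than necessary.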
Interestingly enough, I had to use $!; otherwise startPokerClient was
sitting there for a while, due to lazy evaluation I assume.
I'm using the following finalizer for my socket reader/writer threads
to make sure that if one thread dies the other one is taken with it
and that the main thread is notified. I don't have a Maybe around the
socket itself since I don't think closing a socket twice has any bad
side effects. I put it in a try, though, for good measure.
    finalize :: MaybeThread
             -> MaybeThread
             -> Handle
             -> MaybeSSL
             -> (Event -> IO ())
             -> IO ()
    finalize me buddy h ssl post =
        do -- make sure we are not killed again
           -- killing a dead thread is apparently
           -- not free of side effects in GHC 6.4.1
           atomically $ putTMVar me Nothing
           -- kill the other thread
           maybeKillThread buddy
           maybeFreeSSL ssl
           try $ hClose h
           post $ StopScript -- notify main thread
           return ()
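For comparison, here is a minimal sketch of the same "one thread always takes the other down" idea. It is not the code above: it assumes a current `stm` and `base` (`forkFinally` did not exist in GHC 6.4.1), it stores the thread ids in plain TVars because `putTMVar` blocks while a TMVar is already full, and `Slot`, `startPair` and `demo` are hypothetical names. SSL and socket cleanup are left out.

```haskell
import Control.Concurrent (ThreadId, forkFinally, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forever)

-- Hypothetical replacement for MaybeThread: writing a TVar never blocks.
type Slot = TVar (Maybe ThreadId)

-- Fork two workers. When either one ends (normally or by exception) the
-- other is killed and onStop runs once.
startPair :: IO () -> IO () -> IO () -> IO (Slot, Slot)
startPair onStop workA workB = do
  slotA <- newTVarIO Nothing
  slotB <- newTVarIO Nothing
  ready <- newEmptyTMVarIO
  let -- Workers wait here until both thread ids have been published.
      waitReady = atomically (readTMVar ready)
      finalizer me buddy _result = do
        -- Clear both slots in one transaction so that only one
        -- finalizer ever obtains a thread id to kill.
        (mine, victim) <- atomically $
          (,) <$> swapTVar me Nothing <*> swapTVar buddy Nothing
        case mine of
          Nothing -> return ()   -- someone else already cleaned up
          Just _  -> do
            mapM_ killThread victim
            onStop
  tidA <- forkFinally (waitReady >> workA) (finalizer slotA slotB)
  tidB <- forkFinally (waitReady >> workB) (finalizer slotB slotA)
  atomically $ do
    writeTVar slotA (Just tidA)
    writeTVar slotB (Just tidB)
    putTMVar ready ()
  return (slotA, slotB)

-- Usage: the first worker returns at once, the second would loop forever.
-- Blocks until the stop callback has fired, then reports how often it ran.
demo :: IO Int
demo = do
  stops <- newTVarIO 0
  _ <- startPair (atomically (modifyTVar' stops (+ 1)))
                 (return ())
                 (forever (threadDelay 1000))
  atomically $ do
    n <- readTVar stops
    check (n > 0)
    return n
```

Holding the workers back until both ids are stored avoids the startup window in which a thread could die before its buddy's id is known. Notifying only from the finalizer that still finds its own slot occupied means the main thread hears about the shutdown once rather than twice.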
The rest of the relevant code is below. Please let me know if it can
be simplified or made more robust.
    maybeFreeSSL :: MaybeSSL -> IO ()
    maybeFreeSSL tmv =
        do putStrLn $ "maybeFreeSSL invoked"
           mssl <- atomically $ swapTMVar tmv Nothing
           case mssl of
             Nothing -> return ()
             Just (ssl, _, _) -> sslFree ssl
    maybeKillThread :: MaybeThread -> IO ()
    maybeKillThread tmv =
        do putStrLn $ "maybeKillThread invoked "
           mtid <- atomically $ swapTMVar tmv Nothing
           case mtid of
             Nothing -> return ()
             Just tid -> killThread tid
    --- Socket reader
    writeLoop :: (Event -> IO ()) -> Handle -> (SSL, BIO, BIO) -> IO ()
    writeLoop post h ssl =
        do cmd <- read h ssl
           post $ Cmd cmd
           writeLoop post h ssl
    --- Socket writer
    readLoop :: Handle -> TChan Command -> IO ()
    readLoop h chan =
        do cmd <- atomically $ readTChan chan
           write h cmd
           readLoop h chan
    --- SSL, etc. not used now but will be used to decrypt later
    read :: Handle -> (SSL, BIO, BIO) -> IO Command
    read h _ =
        do bytes <- hGet h 4
           let (size', _) = unpickle endian32 bytes
               size = fromIntegral $ size' - 4
           if size > 0
              then do packet <- hGet h size
                      when (size /= length packet) $ fail "Packet size mismatch"
                      unstuff packet
              else do fail $ "read: size is 0: bytes: " ++ show bytes
    unstuff :: [Word8] -> IO Command
    unstuff packet =
        do let (kind, packet') = unpickle puCmdType packet
               (cmd', _) = unpickle (puCommand kind) packet'
           case cmd' of
             InvalidCommand -> do fail $ "unstuff: Cannot parse: " ++
                                         show packet
             SrvCompressedCommands sz data' ->
                 do data'' <- uncompress data' $ fromIntegral sz
                    unstuff $ drop 4 data''
             _ -> return $! cmd'
    write :: Handle -> Command -> IO ()
    write h cmd =
        do let kind = cmdType cmd
               bytes' = pickle puCmdType kind
               bytes = bytes' ++ pickle (puCommand kind) cmd
           when (length bytes <= 0) $
                fail $ "write: Could not pack: " ++ show cmd
           let size = 4 + length bytes
               size' = pickle endian32 $ fromIntegral size
           when (length size' <= 0) $
                fail $ "write: Could not pack: " ++ show size
           hPut h (size' ++ bytes)
           hFlush h
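The framing used by read and write (a 4-byte size that counts itself, followed by the payload) can be sketched as pure functions, which makes it easy to test apart from the socket. This is an illustration only: it assumes the size is big-endian, which the code above does not state (`endian32` could be either), and `encodeSize`, `decodeSize`, `frame` and `unframe` are hypothetical names rather than part of the pickling library.

```haskell
import Data.Bits (shiftL, shiftR, (.|.))
import Data.Word (Word8, Word32)

-- Assumption: the 32-bit size is sent most significant byte first.
encodeSize :: Word32 -> [Word8]
encodeSize n = [ fromIntegral (n `shiftR` s) | s <- [24, 16, 8, 0] ]

decodeSize :: [Word8] -> Word32
decodeSize = foldl (\acc b -> (acc `shiftL` 8) .|. fromIntegral b) 0

-- Prefix a payload with its total length (4 header bytes + payload).
frame :: [Word8] -> [Word8]
frame payload = encodeSize (fromIntegral (4 + length payload)) ++ payload

-- Split one frame off the front of a buffer. Returns the payload and the
-- remaining bytes, or Nothing if the frame is incomplete or has no payload.
unframe :: [Word8] -> Maybe ([Word8], [Word8])
unframe buf
  | length header < 4        = Nothing
  | total <= 4               = Nothing  -- mirrors the "size is 0" failure
  | length payload < bodyLen = Nothing  -- mirrors "Packet size mismatch"
  | otherwise                = Just (payload, rest')
  where
    (header, rest)   = splitAt 4 buf
    total            = fromIntegral (decodeSize header) :: Int
    bodyLen          = total - 4
    (payload, rest') = splitAt bodyLen rest
```

Returning Maybe instead of calling fail keeps the parsing step free of IO, so the socket loop decides what a malformed frame means for the connection.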
--
http://wagerlabs.com/