[Haskell-cafe] STM, Concurrent Haskell and network clients (long, code)

Joel Reymont joelr1 at gmail.com
Fri Dec 2 10:20:20 EST 2005


Folks,

After two months with Haskell I think I'm starting to get the proper  
spirit. Inspired by Simon PJ's recent comments about lots of threads  
not hurting Haskell I introduced STM into my network client. I would  
like to run my architecture by you to see if there are any  
improvements to be made. I would like the code to be as bullet-proof  
as possible.

I'm also jumping through some hoops to make sure none of the 3  
threads (main, socket reader and socket writer) block indefinitely.  
I'm getting a "thread blocked indefinitely" exception now and I think  
this is because the main thread is not getting notified when the  
socket threads die.

My network client has two threads. #1 reads network packets, converts  
them to events and posts them to a TChan using a function passed to  
it by the main thread. #2 blocks on a second TChan and waits for  
events to be sent out through the socket.

The main thread creates a TChan to read events from, passes a closure  
that writes to the TChan to the network client and uses a network  
client function to send events to the network client.

All in all it works out very nicely as there's no networking, FFI,  
etc. in the network client logic. This is an improvement from what I  
had before.

The function that starts the poker client has this signature:

startPokerClient :: (Event -> IO ()) -- fun to send events to the  
main thread
                  -> HostInfo -- (host, port)
                  -> AuthInfo -- (user, pass, etc.)
                  -> IO PokerClient

where

data PokerClient = PC
     !MaybeThread
     !MaybeThread
     !(TChan Command)

The main network client thread calls this fun to deliver events to  
the network client:

writePokerClient :: PokerClient -> Command -> IO ()
writePokerClient (PC _ _ chan) cmd =
     atomically $ writeTChan chan cmd

Main thread can stop the network client using

stopPokerClient :: PokerClient -> IO ()
stopPokerClient (PC tmv2 tmv1 _) =
     do putStrLn $ "stopPokerClient invoked"
        maybeKillThread tmv2
        maybeKillThread tmv1

The abbreviated startPokerClient looks like this..

{-# NOINLINE lock #-}

lock :: MVar ()
lock = unsafePerformIO $ newMVar ()

startPokerClient post (host, port) auth =
     do h <- withMVar lock -- getHostByName is not thread-safe
             $ \_ -> connectTo host (PortNumber $ fromIntegral port)
                     `catchError`
                     (\e -> fail $ "Failed to connect to server: "
                            ++ show e)
        -- kick off the SSL handshake
        ssl <- startSSL
        doSSLHandshake h ssl
        ...
        -- we need one thread to always kill the other
        (tmv1 :: MaybeThread) <- atomically $ newEmptyTMVar
        (tmv2 :: MaybeThread) <- atomically $ newEmptyTMVar
        (tmvssl :: MaybeSSL) <- atomically $ newTMVar (Just ssl)
        -- thread to read from the socket and post commands
        tid1 <- forkIO $ writeLoop post h ssl
                `finally` finalize tmv1 tmv2 h tmvssl post
        -- thread to write to grab commands generated
        -- by the main thread and write them to the socket
        chan <- atomically $ newTChan
        tid2 <- forkIO $ readLoop h chan
                `finally` finalize tmv2 tmv1 h tmvssl post
        -- we have both threads now
        atomically $! do putTMVar tmv1 (Just tid1)
                         putTMVar tmv2 (Just tid2)
        return $! PC tmv2 tmv1 chan

Interestingly enough, I had to use $! otherwise startPokerClient was  
sitting there for a while, due to due to lazy evaluation I assume.

I'm using the following finalizer for my socket reader/writer threads  
to make sure that if one thread dies the other one is taken with it  
and that the main thread is notified. I don't have a Maybe around the  
socket itself since I don't think closing a socket twice has any bad  
side effects. I put it in a try, though, for good measure.

finalize :: MaybeThread
          -> MaybeThread
          -> Handle
          -> MaybeSSL
          -> (Event -> IO ())
          -> IO ()
finalize me buddy h ssl post =
     do -- make sure we are not killed again
        -- killing a dead thread is apparently
        -- not free of side effects in GHC 6.4.1
        atomically $ putTMVar me Nothing
        -- kill the other thread
        maybeKillThread buddy
        maybeFreeSSL ssl
        try $ hClose h
        post $ StopScript -- notify main thread
        return ()

The rest of the relevant code is below. Please let me know if it can  
be simplified or made more robust.

maybeFreeSSL :: MaybeSSL -> IO ()
maybeFreeSSL tmv =
     do putStrLn $ "maybeFreeSSL invoked"
        mssl <- atomically $ swapTMVar tmv Nothing
        case mssl of
          Nothing -> return ()
          Just (ssl, _, _) -> do sslFree ssl
                                 sslFree ssl

maybeKillThread :: MaybeThread -> IO ()
maybeKillThread tmv =
     do putStrLn $ "maybeKillThread invoked "
        mtid <- atomically $ swapTMVar tmv Nothing
        case mtid of
          Nothing -> return ()
          Just tid -> killThread tid

--- Socket reader

writeLoop :: (Event -> IO ()) -> Handle -> (SSL, BIO, BIO) -> IO ()
writeLoop post h ssl =
     do cmd <- read h ssl
        post $ Cmd cmd
        writeLoop post h ssl

--- Socket writer

readLoop :: Handle -> TChan Command -> IO ()
readLoop h chan =
     do cmd <- atomically $ readTChan chan
        write h cmd
        readLoop h chan

--- SSL, etc. not used now but will be used to decrypt later

read :: Handle -> (SSL, BIO, BIO) -> IO Command
read h _ =
     do bytes <- hGet h 4
        let (size', _) = unpickle endian32 bytes
            size = fromIntegral $ size' - 4
        if size > 0
           then do packet <- hGet h size
                   when (size /= length packet) $ fail "Packet size  
mismatch"
                   unstuff packet
           else do fail $ "read: size is 0: bytes: " ++ show bytes

unstuff :: [Word8] -> IO Command
unstuff packet =
     do let (kind, packet') = unpickle puCmdType packet
            (cmd', _) = unpickle (puCommand kind) packet'
        case cmd' of
          InvalidCommand -> do fail $ "unstuff: Cannot parse: " ++  
show packet
          SrvCompressedCommands sz data' ->
              do data'' <- uncompress data' $ fromIntegral sz
                 unstuff $ drop 4 data''
          _ -> return $! cmd'

write :: Handle -> Command -> IO ()
write h cmd =
     do let kind = cmdType cmd
            bytes' = pickle puCmdType kind
            bytes = bytes' ++ pickle (puCommand kind) cmd
        when (length bytes <= 0) $
             fail $ "write: Could not pack: " ++ show cmd
        let size = 4 + length bytes
            size' = pickle endian32 $ fromIntegral size
        when (length size' <= 0) $
             fail $ "write: Could not pack: " ++ show size
        hPut h (size' ++ bytes)
        hFlush h


--
http://wagerlabs.com/







More information about the Haskell-Cafe mailing list