[Haskell-cafe] CURL and threads

Neil Brown nccb2 at kent.ac.uk
Thu Feb 18 11:33:19 EST 2010


Hi,

Your code forks off N threads to do HTTP response checking, then waits
for the replies (invokeThreads).  Each thread (runHTTPThread) calls
curlGetResponse_ and *immediately* sends the answer back down the
channel to invokeThreads (this happens in checkAuthResponse) -- then
waits for half a second before terminating.  As soon as the original
thread (invokeThreads) has all N responses, it forks off N threads again.

So if a round of N requests completes in, say, 0.05 seconds, you'll
have about ten times as many threads in your system as you intended:
each thread hangs around for 0.5 seconds after completing its work,
and in that time ten more rounds get started.  I suspect what you
intended to do was put that threadDelay call *before* sending back the
response, which would prevent this leaking of threads.
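
To make the ordering concrete, here is a minimal sketch of a worker that
delays before replying, using only base.  fakeCheck is a hypothetical
stand-in for the curlGetResponse_ call and runRound is a name invented
for this sketch; neither comes from your code.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

type ResponseState = Either Bool String

-- Hypothetical stand-in for the real HTTP check (assumption: the real
-- worker would call curlGetResponse_ here instead).
fakeCheck :: Int -> IO ResponseState
fakeCheck i
  | even i    = return (Left True)
  | otherwise = return (Right ("user" ++ show i ++ " => rejected"))

-- Delay first, reply last: once the collector has this worker's answer,
-- the worker has nothing left to do and exits straight away.
worker :: Chan ResponseState -> Int -> IO ()
worker chan i = do
  result <- fakeCheck i
  threadDelay (500 * 1000)   -- half a second, as in the original
  writeChan chan result

-- One round: fork n workers, then block until all n have replied.
runRound :: Int -> IO [ResponseState]
runRound n = do
  chan <- newChan
  forM_ [1 .. n] (forkIO . worker chan)
  replicateM n (readChan chan)

main :: IO ()
main = do
  results <- runRound 4
  mapM_ (either (const (return ())) putStrLn) results
```

With this ordering a new round cannot start until the previous round's
workers have sat out their delay, so roughly N workers are alive at any
one time rather than ten rounds' worth.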

Some quick style suggestions: your recursion pattern in dumpChannel is
easily replaced with replicateM_, and your infinite recursion in
invokeThreads could easily become a call to "forever".  Never recurse
directly if a combinator can remove the need :-)
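
As a rough illustration of those two suggestions, the sketch below
rewrites dumpChannel with replicateM_ and shows the shape invokeThreads
would take with forever.  It uses only base; the oneRound parameter is
hypothetical, and main feeds the channel by hand instead of forking
workers.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever, replicateM_)

type ResponseState = Either Bool String

-- Read exactly n results, printing only the failures; replicateM_
-- replaces the hand-written countdown.
dumpChannel :: Chan ResponseState -> Int -> IO ()
dumpChannel chan n =
  replicateM_ n (readChan chan >>= either (const (return ())) putStrLn)

-- Repeat one round indefinitely; forever replaces the explicit
-- self-call.  oneRound is a hypothetical parameter for this sketch.
-- It is not called from main below because it never returns.
invokeThreads :: IO () -> IO a
invokeThreads oneRound = forever oneRound

main :: IO ()
main = do
  chan <- newChan
  -- Hand-written sample results standing in for the worker threads.
  mapM_ (writeChan chan) [Left True, Right "user2 => failed", Left True]
  dumpChannel chan 3
```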

Your code could also be written quite simply using CHP
(http://hackage.haskell.org/package/chp).  runParMapM solves your
exact problem; you could replace your code with:

====
module NTLMTest where

import Control.Monad.Trans (liftIO)
import Control.Applicative ((<$>))
import System.IO
import Network.Curl
import Control.Concurrent.CHP

type ResponseState = Either Bool String

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response = case respCurlCode response of
    CurlOK -> Left True
    _      -> Right $ username ++ " => " ++ respStatusLine response
                  ++ " :: " ++ (show . respStatus $ response)
                       

-- Note: I re-ordered the parameters to this function
checkAuthResponse :: String -> String -> String -> IO ResponseState
checkAuthResponse url user passwd
  = isResponseOk user <$> curlGetResponse_ url
      [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]

url = "http://localhost:8082/"
credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = runCHP_ $
  runParMapM (liftIO . uncurry (checkAuthResponse url)) credentials
    >>= mapM (liftIO . either (const $ return ()) putStrLn)

====

The version above gets all the responses in parallel and prints them
out once they are all done, and is quite short.  This isn't what your
original code did though -- that read the responses from a channel and
printed them as they arrived.  The version below is probably the closest
CHP version to your original code:

====
module NTLMTest where

import Control.Monad (replicateM_, (<=<))
import Control.Monad.Trans (liftIO)
import Control.Applicative ((<$>))
import System.IO
import Network.Curl
import Control.Concurrent.CHP

type ResponseState = Either Bool String

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response = case respCurlCode response of
    CurlOK -> Left True
    _      -> Right $ username ++ " => " ++ respStatusLine response
                  ++ " :: " ++ (show . respStatus $ response)
                       

-- Note: I re-ordered the parameters to this function
checkAuthResponse :: String -> String -> String -> IO ResponseState
checkAuthResponse url user passwd
  = isResponseOk user <$> curlGetResponse_ url
      [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]

url = "http://localhost:8082/"
credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = runCHP_ $ do
  chan <- anyToOneChannel
  runParallel_ $ dumpChannel (reader chan)
    : map (claim (writer chan) . writeValue
             <=< liftIO . uncurry (checkAuthResponse url))
          credentials
  where
    dumpChannel :: Chanin ResponseState -> CHP ()
    dumpChannel c = replicateM_ (length credentials)
      (readChannel c >>= liftIO . either (const $ return ()) putStrLn)
====

This version runs the dumpChannel procedure in parallel with a thread 
for each credential that writes the result to a shared channel (claiming 
it as it does so).

Neither of my versions checks the credentials repeatedly like yours 
does, but you can easily add that in.  If you're not a point-free fan (I 
find it irresistible these days), I can break those solutions down a bit 
into more functions.
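
As a starting point, the two point-free pieces used in both versions
unpack roughly as follows.  This is only a sketch: report and checkPair
are names made up here, and fakeCheck is a hypothetical stand-in for
checkAuthResponse so that the example runs without curl or CHP.

```haskell
type ResponseState = Either Bool String

-- Pointed form of: either (const $ return ()) putStrLn
report :: ResponseState -> IO ()
report (Left _)    = return ()
report (Right err) = putStrLn err

-- Pointed form of: uncurry (checkAuthResponse url)
-- The checker is passed in as an argument so the sketch needs no curl.
checkPair :: (String -> String -> String -> IO ResponseState)
          -> String -> (String, String) -> IO ResponseState
checkPair check url (user, passwd) = check url user passwd

-- Hypothetical stand-in for checkAuthResponse (assumption: no network
-- access, and only the password "123456" is accepted).
fakeCheck :: String -> String -> String -> IO ResponseState
fakeCheck _url user passwd
  | passwd == "123456" = return (Left True)
  | otherwise          = return (Right (user ++ " => rejected"))

main :: IO ()
main = do
  results <- mapM (checkPair fakeCheck "http://localhost:8082/")
                  [("user1", "123456"), ("user2", "wrong")]
  mapM_ report results
```

Running it prints a single line for the rejected user2 and stays
silent for user1, just as the Left case is silently dropped in the
code above.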

Hope that helps,

Neil.


Eugeny N Dzhurinsky wrote:
> On Wed, Feb 17, 2010 at 07:34:07PM +0200, Eugene Dzhurinsky wrote:
>   
>> Hopefully, someone could help me in overcoming my ignorance :)
>>     
>
> I realized that I can share the same Chan instance over all invocations in
> main, and wrap the internal function in withCurlDo to ensure only one IO
> action gets executed with this library. Finally I've come up with the
> following code, which however still has some memory leaks. Maybe someone
> will get an idea what's wrong below?
>
> =============================================================================================
>
> module NTLMTest where
>
> import System.IO
> import Network.Curl
> import Control.Concurrent
> import Control.Concurrent.Chan
>
> type ResponseState = Either Bool String
>
> type RespChannel = Chan ResponseState
>
> delay = 500 * 1000
>
> isResponseOk :: String -> CurlResponse -> ResponseState
> isResponseOk username response = case respCurlCode response of
>                             CurlOK  -> Left True
>                             _       -> Right $ username ++ " => " ++ respStatusLine response ++ " :: " ++ (show . respStatus $ response)
>                         
>
> checkAuthResponse :: RespChannel -> String -> String -> String -> IO ()
> checkAuthResponse state user passwd url = do 
>                                     response <- curlGetResponse_ url [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]
>                                     writeChan state $ isResponseOk user response
>                                     threadDelay $ delay
>
> runHTTPThread :: RespChannel -> (String,String) -> IO ()
> runHTTPThread state (user,passwd) = checkAuthResponse state user passwd url
>         
>
> url = "http://localhost:8082/"
> credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]
>
> main = do
>     chan <- newChan :: IO (RespChannel)
>     withCurlDo $ invokeThreads chan
>     where 
>         invokeThreads chan = do
>             mapM_ ( \cred -> forkIO $ runHTTPThread chan cred ) credentials
>             dumpChannel chan $ length credentials
>             invokeThreads chan
>         dumpChannel :: RespChannel -> Int -> IO ()
>         dumpChannel _chan n | n == 0    = return ()
>                             | otherwise = do    state <- readChan _chan
>                                                 case state of
>                                                     (Left _) -> return () --putStrLn "OK"
>                                                     (Right err) -> putStrLn err
>                                                 dumpChannel _chan $ n-1
>
>
> =============================================================================================
>
> Thank you in advance!


