[Haskell-cafe] CURL and threads
Neil Brown
nccb2 at kent.ac.uk
Thu Feb 18 11:33:19 EST 2010
Hi,
Your code forks off N threads to do HTTP response checking, then waits
for the reply (invokeThreads). Each thread (runHTTPThread) calls
curlGetResponse and *immediately* sends the answer back down the channel
to invokeThreads (checkAuthResponse) -- then waits for half a second
before terminating. As soon as the original process (invokeThreads) has
all N responses, it forks off N threads again.
So if your code manages to process the N requests such that it can do
them all in, say, 0.05 seconds, you'll have about ten times as many
threads in your system as you intended (because they all hang around for
0.5 seconds after completing their work). I suspect what you intended
to do was put that threadDelay call *before* sending back the response,
which would prevent this leaking of threads.
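A minimal sketch of that reordering, using only base so it runs without a server. Note that fakeCheck, worker and runRound are hypothetical names invented for this illustration, and fakeCheck merely stands in for the curlGetResponse_ call; none of them come from the original code.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Hypothetical stand-in for the curl request, so the sketch needs no network.
fakeCheck :: Int -> IO (Either Bool String)
fakeCheck i
  | even i    = return (Left True)
  | otherwise = return (Right ("user" ++ show i ++ " => rejected"))

-- Delay first, then report. Once the parent has read this result,
-- the thread has no work left and ends immediately.
worker :: Int -> Chan (Either Bool String) -> Int -> IO ()
worker delayMicros chan i = do
  result <- fakeCheck i
  threadDelay delayMicros
  writeChan chan result

-- One round: fork a worker per id, then collect exactly that many results.
runRound :: Int -> [Int] -> IO [Either Bool String]
runRound delayMicros ids = do
  chan <- newChan
  forM_ ids (forkIO . worker delayMicros chan)
  replicateM (length ids) (readChan chan)
```

With the delay placed before writeChan, a round cannot finish faster than the delay, so a new batch of threads is only forked once the previous batch is about to exit.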
Some quick style suggestions: your recursion pattern in dumpChannel is
easily replaced with replicateM_, and your infinite recursion in
invokeThreads could become a call to the function "forever". Never
recurse directly if a combinator can remove the need :-)
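As a rough sketch of those two substitutions against a plain Chan (base only): the helper name report and the oneRound parameter are assumptions made for this illustration, where oneRound stands for "fork the workers, then drain the channel".

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever, replicateM_)

type ResponseState = Either Bool String

-- Print only failures; successes are ignored, as in the original dumpChannel.
report :: ResponseState -> IO ()
report = either (const (return ())) putStrLn

-- Read and report exactly n results; replaces the hand-written countdown.
dumpChannel :: Chan ResponseState -> Int -> IO ()
dumpChannel chan n = replicateM_ n (readChan chan >>= report)

-- Repeat one round indefinitely; replaces the explicit self-call.
-- 'oneRound' is a hypothetical placeholder for forking the workers
-- and then draining the channel.
invokeThreads :: IO () -> IO a
invokeThreads oneRound = forever oneRound
```

In the original main this would be used roughly as invokeThreads (forkWorkers >> dumpChannel chan (length credentials)), where forkWorkers is again only a placeholder name.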
Your code could also be written quite compactly with CHP
(http://hackage.haskell.org/package/chp). runParMapM fits your
problem well; you could replace your code with:
====
module NTLMTest where

import Control.Monad.Trans (liftIO)
import Control.Applicative ((<$>))
import System.IO
import Network.Curl
import Control.Concurrent.CHP

type ResponseState = Either Bool String

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response = case respCurlCode response of
    CurlOK -> Left True
    _      -> Right $ username ++ " => " ++
                respStatusLine response ++ " :: " ++ (show . respStatus $ response)

-- Note: I re-ordered the parameters to this function
checkAuthResponse :: String -> String -> String -> IO ResponseState
checkAuthResponse url user passwd
    = isResponseOk user <$> curlGetResponse_ url
        [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]

url = "http://localhost:8082/"
credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = runCHP_ $ runParMapM (liftIO . uncurry (checkAuthResponse url)) credentials
         >>= mapM (liftIO . either (const $ return ()) putStrLn)
====
The version above gets all the responses in parallel and prints them
once they are all done, and it is quite short. This isn't what your
original code did though -- that read the responses from a channel and
printed them as they arrived. The version below is probably the closest
CHP equivalent to your original code:
====
module NTLMTest where

import Control.Monad (replicateM_, (<=<))
import Control.Monad.Trans (liftIO)
import Control.Applicative ((<$>))
import System.IO
import Network.Curl
import Control.Concurrent.CHP

type ResponseState = Either Bool String

isResponseOk :: String -> CurlResponse -> ResponseState
isResponseOk username response = case respCurlCode response of
    CurlOK -> Left True
    _      -> Right $ username ++ " => " ++
                respStatusLine response ++ " :: " ++ (show . respStatus $ response)

-- Note: I re-ordered the parameters to this function
checkAuthResponse :: String -> String -> String -> IO ResponseState
checkAuthResponse url user passwd
    = isResponseOk user <$> curlGetResponse_ url
        [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]

url = "http://localhost:8082/"
credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]

main = runCHP_ $ do
    chan <- anyToOneChannel
    runParallel_ $
      dumpChannel (reader chan)
        : map (claim (writer chan) . writeValue <=< liftIO . uncurry (checkAuthResponse url))
              credentials
  where
    dumpChannel :: Chanin ResponseState -> CHP ()
    dumpChannel c = replicateM_ (length credentials)
      (readChannel c >>= liftIO . either (const $ return ()) putStrLn)
====
This version runs the dumpChannel procedure in parallel with a thread
for each credential that writes the result to a shared channel (claiming
it as it does so).
Neither of my versions checks the credentials repeatedly like yours
does, but you can easily add that in. If you're not a point-free fan (I
find it irresistible these days), I can break those solutions down a bit
into more functions.
Hope that helps,
Neil.
Eugeny N Dzhurinsky wrote:
> On Wed, Feb 17, 2010 at 07:34:07PM +0200, Eugene Dzhurinsky wrote:
>
>> Hopefully, someone could help me in overcoming my ignorance :)
>>
>
> I realized that I can share the same Chan instance over all invocations in
> main, and wrap the internal function in withCurlDo to ensure only one IO action
> gets executed with this library. Finally I came up with the following code,
> which however still has some memory leaks. Maybe someone will get an idea
> of what's wrong below?
>
> =============================================================================================
>
> module NTLMTest where
>
> import System.IO
> import Network.Curl
> import Control.Concurrent
> import Control.Concurrent.Chan
>
> type ResponseState = Either Bool String
>
> type RespChannel = Chan ResponseState
>
> delay = 500 * 1000
>
> isResponseOk :: String -> CurlResponse -> ResponseState
> isResponseOk username response = case respCurlCode response of
>     CurlOK -> Left True
>     _      -> Right $ username ++ " => " ++ respStatusLine response ++ " :: " ++ (show . respStatus $ response)
>
>
> checkAuthResponse :: RespChannel -> String -> String -> String -> IO ()
> checkAuthResponse state user passwd url = do
>     response <- curlGetResponse_ url [CurlHttpAuth [HttpAuthAny], CurlUserPwd $ user ++ ":" ++ passwd]
>     writeChan state $ isResponseOk user response
>     threadDelay $ delay
>
> runHTTPThread :: RespChannel -> (String,String) -> IO ()
> runHTTPThread state (user,passwd) = checkAuthResponse state user passwd url
>
>
> url = "http://localhost:8082/"
> credentials = map (\i -> ("user" ++ show i,"123456")) [1..21]
>
> main = do
>     chan <- newChan :: IO (RespChannel)
>     withCurlDo $ invokeThreads chan
>     where
>         invokeThreads chan = do
>             mapM_ ( \cred -> forkIO $ runHTTPThread chan cred ) credentials
>             dumpChannel chan $ length credentials
>             invokeThreads chan
>         dumpChannel :: RespChannel -> Int -> IO ()
>         dumpChannel _chan n | n == 0    = return ()
>                             | otherwise = do state <- readChan _chan
>                                              case state of
>                                                  (Left _) -> return () --putStrLn "OK"
>                                                  (Right err) -> putStrLn err
>                                              dumpChannel _chan $ n-1
>
>
> =============================================================================================
>
> Thank you in advance!