[Haskell-cafe] Blocking IO & FIFOs

Jason Dusek jason.dusek at gmail.com
Sat Oct 20 11:19:36 CEST 2012


Hi all,

I am developing a coroutine-like interface to Bash.

  http://hpaste.org/76523

The idea is that one can send multiple "queries" to an
interpreter and then collect the results of each query. How do
we know when Bash is done with each query? Waiting for "no more
output" seems ambiguous; so the way CoBash works is:

  * Each query gets a tmp dir with two named pipes in it.
  * The query is wrapped in redirections to the pipes.
  * The pipes are removed when the query completes.

This does work, sort of:

  +Prelude> :load CoBash.hs
  [1 of 1] Compiling CoBash           ( CoBash.hs, interpreted )
  Ok, modules loaded: CoBash.
  *CoBash> tuple@(i,o,e,p) <- start
  e :: Handle
  i :: Handle
  o :: Handle
  p :: ProcessHandle
  tuple :: (Handle, Handle, Handle, ProcessHandle)
  *CoBash> query tuple "for n in {1..4}; do sleep 1; echo $n; done"
  ("1\n2\n3\n4\n","")
  it :: (ByteString, ByteString)

I say sort of because it is quite brittle. Many commands do not
return at all, for example:

  *CoBash> query tuple "uname -a"

The way I retrieve the output from the FIFOs seems dangerous:

  (,) <$> Bytes.hGetContents oh <*> Bytes.hGetContents eh

Surely, the FIFO for STDERR can not be read from until the FIFO
for STDOUT is finished; but if there is a great deal of error
output then the process will fill the FIFO's buffer and get
stuck. If we switch the order of the reads, the "for n in ..."
example above blocks:

  (,) <$> Bytes.hGetContents eh <*> Bytes.hGetContents oh

I have tried a few different ways to read from the two handles
concurrently; for example, by giving each thread an MVar
to put the contents in, or by using hGetNonBlocking on a list of
handles in a loop. Using the latter method, I never get EOF; it
just collects empty strings forever.

For comparison's sake, the expect behaviour with FIFOs is:

  In the first terminal:

   :; mkfifo fifo
   :; cat > fifo
    a
    b
    c
    d
    ^D
   :;

  In the second terminal:

   :; cat < fifo
    a
    b
    c
    d
   :;

Here I "open" the FIFO for reading with < while opening it for
writing with >. As long as the writer writes, the reader reads;
when the writer closes the write end of the pipe, the reader
receives EOF. Trying to duplicate the read behaviour in Haskell,
using hGetContents from GHCi while using cat to write to the
FIFO, doesn't work; which seems a little bogus.

There have a been a few past threads about FIFOs and their
troublesome interaction with Haskell's async-by-default IO
style. To switch to System.Posix for IO -- and deal with Ptr
Word8, in order to handle binary data -- seems like an awful
step down.

--
Jason Dusek
pgp // solidsnack // C1EBC57DC55144F35460C8DF1FD4C6C1FED18A2B




{-# LANGUAGE OverloadedStrings
           , ScopedTypeVariables
           , ParallelListComp
           , TupleSections #-}

module CoBash where

import           Control.Applicative
import           Control.Concurrent
import           Control.Concurrent.MVar
import           Control.Exception
import           Control.Monad
import           Data.Bits
import           Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as Bytes
import           Data.Maybe
import           Data.Monoid
import qualified GHC.IO.Handle.FD
import           System.IO
import           System.IO.Error
import           System.Process
import           System.Posix.ByteString

import           System.IO.Temp

import qualified Text.ShellEscape as Esc


start :: IO (Handle, Handle, Handle, ProcessHandle)
start = runInteractiveProcess "bash" [] Nothing (Just [])

query :: (Handle, Handle, Handle, ProcessHandle) -> ByteString
      -> IO (ByteString, ByteString)
query (i, _, _, _) query = withFIFOs query'
 where query' ofo efo = do
         Bytes.hPut i cmd
         hFlush i
         [oh, eh] <- mapM openFIFO [ofo, efo]
         (,) <$> Bytes.hGetContents oh <*> Bytes.hGetContents eh -- Works.
--       (,) <$> Bytes.hGetContents eh <*> Bytes.hGetContents oh -- Blocks.
        where cmd = Bytes.unlines ["{", query, "} 1>" <> ofo <> " 2>" <> efo]

shutdown :: (Handle, Handle, Handle, ProcessHandle) -> IO ()
shutdown (i, _, _, p) = () <$ hClose i <* waitForProcess p


openFIFO path = GHC.IO.Handle.FD.openFileBlocking (Bytes.unpack path) ReadMode

-- | Run an IO action with two FIFOs in scope, which will removed after it
--   completes.
withFIFOs :: (RawFilePath -> RawFilePath -> IO a) -> IO a
withFIFOs m = withSystemTempDirectory "cobash." m'
 where m'   = (uncurry m =<<) . mk . Bytes.pack
       mk d = (o, e) <$ (createNamedPipe o mode >> createNamedPipe e mode)
        where (o, e) = (d <> "/o", d <> "/e")
              mode   = ownerReadMode .|. ownerWriteMode .|. namedPipeMode



More information about the Haskell-Cafe mailing list