[Haskell-cafe] Takusen and large PostgreSQL blobs [was: Handling custom types in Takusen]

oleg at pobox.com oleg at pobox.com
Fri Jul 27 23:34:29 EDT 2007


I have been using Takusen with PostgreSQL to store and retrieve
hundreds of multi-megabyte binary objects. A client may request
literally hundred of such objects in one request; the Haskell
(FastCGI) application server will send these objects in one multi-part
message. The handling of the entire request is done in *constant* and
small memory, at low latency and at the rate that is limited by
client's network connection. The server handles hundreds of such
requests without allocating memory: the Haskell server uses only one
16KB buffer for all of its I/O. Incidentally, with the exception of
occasional existential and extended pattern guards, all the server
code is in Haskell98.

I have been using LO objects of PostgreSQL. That is not the only
design choice: an alternative is to create a table where each row
holds a chunk (e.g., 16K) of data as a byte array. The row will have
two more columns: the blob id and the chunk ordinal counter. This
design lets one incrementally write binary data (using COPYIN
technique) and read data (using COPYOUT or the regular SQL
queries). The COPYIN feature lets us write to the database in
user-defined chunks. Alas, the converse, COPYOUT, can read only whole
rows, which precludes storing all of the data in one row. If we
segment the data in chunks spread across several rows, we regain
incrementality. I have a hunch this method may be preferable, although
I have not tried it. The drawback of LO objects is the need for
frequent vacuuming, which may take really a while if many large
objects are being created and deleted.

The blob interface is designed to permit incremental reading and
writing blobs. In fact, the server never stores the whole blob in
memory. 

Enclosed are the implementations of store_lo and consume_lo
functions. They rely on the notions of `generalized' input and output
ports and the generalized copier. I've been meaning to describe them
properly but don't seem to get around to it. I could refer to the
comments in the file
	http://okmij.org/ftp/Haskell/NewerCGI.hs
Frequently mentioned EMonadIO is a class of monad that permit both i/o
operations and throwing _and_ catching of arbitrary errors. Most of
the transformations of IO are in that class. EMonadIO lets me write
gthrow, gcatch, ghandle, gbracket, etc. without even thinking in which
monad I currently am.



-- Read data from a LO (a kind of PostgreSQL blobs). A blob is identified
-- by its Oid. We determine the size of the blob, create a generalized
-- input port for reading from the blob, and pass the size and the
-- port to the user function. The function will probably use BCopy
-- to copy data from the blob to somewhere else. The function should
-- not store the generalized input port anywhere as the port can't be
-- used after the function returns. We could have enforced that with a
-- marker and Typeable, as we do in Takusen.
-- This function must be invoked in a transaction (it would cause a
-- database error otherwise).
-- We don't bracket the call to the user function as any exceptions
-- are fatal anyway.
consume_lo :: EMonadIO m => Connection -> Oid ->
	      (Int -> Input -> m a) -> m a
consume_lo (Connection db) oid f = 
   do
    lofd <- liftIO $ check_pos "lo_open" $ flo_open db oid eINV_READ
	  -- get the size of LO by seeking to the end, and coming back
    size <- liftIO $ check_pos "lo_lseek" $ flo_lseek db lofd 0 eSEEK_END
    liftIO $ check_pos "lo_lseek" $ flo_lseek db lofd 0 eSEEK_SET
    let inp = Input (\ptr len -> 
         liftIO . liftM fromIntegral . check_pos "lo_read" $ 
		     flo_read db lofd ptr (fromIntegral len))
    r <- f (fromIntegral size) inp
    liftIO $ check_pos "lo_close" $ flo_close db lofd
    return r
  `gcatch` \e -> print_exc e >> liftIO (closeDb db) >> gthrow e
 where
 check_pos str a = a >>= \r -> if r >= 0 then return r else throwPG r str


-- Write data to a LO (a kind of PostgreSQL blobs). A blob is identified
-- by its Oid. We create a generalized output port for writing to the blob,
-- and pass it to the user function. The function will probably use BCopy
-- to copy data to the blob from somewhere else. The function should
-- not store the generalized output port anywhere as the port can't be
-- used after the function returns. We could have enforced that with a
-- marker and Typeable, as we do in Takusen.
-- This function must be invoked in a transaction (it would cause a
-- database error otherwise).
-- We don't bracket the call to the user function as any exceptions
-- are fatal anyway.
store_lo :: EMonadIO m => Connection -> Oid -> (Output -> m a) -> m a
store_lo (Connection db) oid f = 
   do
    lofd <- liftIO $ check_pos "lo_open" $ flo_open db oid eINV_WRITE
    let outp = Output (\ptr len -> 
         liftIO (check_size len =<< check_pos "lo_write"
		     (flo_write db lofd ptr (fromIntegral len))))
    r <- f outp
    liftIO $ check_pos "lo_close" $ flo_close db lofd
    return r
  `gcatch` \e -> print_exc e >> liftIO (closeDb db) >> gthrow e
 where
 check_pos str a = a >>= \r -> if r >= 0 then return r else throwPG r str
 check_size len lwritten = 
     if len == fromIntegral lwritten then return ()
	else throwPG (-1) "lo_write wrote less than expected"



More information about the Haskell-Cafe mailing list