[commit: ghc] master: rts: Add simple resource pool (a3a8ce6)

git at git.haskell.org git at git.haskell.org
Mon Nov 23 16:56:07 UTC 2015


Repository : ssh://git@git.haskell.org/ghc

On branch  : master
Link       : http://ghc.haskell.org/trac/ghc/changeset/a3a8ce6e60466cb3742506c7d7bfa1a5b1012728/ghc

>---------------------------------------------------------------

commit a3a8ce6e60466cb3742506c7d7bfa1a5b1012728
Author: Ben Gamari <ben at smart-cactus.org>
Date:   Thu Oct 22 22:16:46 2015 +0200

    rts: Add simple resource pool


>---------------------------------------------------------------

a3a8ce6e60466cb3742506c7d7bfa1a5b1012728
 rts/Pool.c | 180 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 rts/Pool.h |  54 +++++++++++++++++++
 2 files changed, 234 insertions(+)

diff --git a/rts/Pool.c b/rts/Pool.c
new file mode 100644
index 0000000..6c23807
--- /dev/null
+++ b/rts/Pool.c
@@ -0,0 +1,180 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2014-2015
+ *
+ * A pool of lazily allocated things
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "PosixSource.h"
+#include "Rts.h"
+#include "RtsUtils.h"
+#include "Pool.h"
+
+/* used to mark an entry as needing to be freed when released */
+#define FLAG_SHOULD_FREE (1 << 0)
+
+typedef struct PoolEntry_ {
+    struct PoolEntry_ *next;
+    void *thing;
+    StgWord flags;
+} PoolEntry;
+
+struct Pool_ {
+    /* the maximum number of allocated resources in the pool */
+    nat max_size;
+    /* the number of allocated resources to keep in the pool when idle */
+    nat desired_size;
+    /* how many things are currently allocated? (sum of lengths of available and
+     * taken lists) */
+    nat current_size;
+#ifdef THREADED_RTS
+    /* signaled when a thing is released */
+    Condition cond;
+#endif
+    alloc_thing_fn alloc_fn;
+    free_thing_fn free_fn;
+
+    PoolEntry *available;
+    PoolEntry *taken;
+#ifdef THREADED_RTS
+    /* protects entire data structure */
+    Mutex mutex;
+#endif
+};
+
+Pool *poolInit(nat max_size, nat desired_size,
+               alloc_thing_fn alloc_fn, free_thing_fn free_fn) {
+    Pool *pool = stgMallocBytes(sizeof(Pool), "pool_init");
+    pool->max_size = max_size == 0 ? (nat) -1 : max_size;
+    pool->desired_size = desired_size;
+    pool->current_size = 0;
+    pool->alloc_fn = alloc_fn;
+    pool->free_fn = free_fn;
+    pool->available = NULL;
+    pool->taken = NULL;
+#ifdef THREADED_RTS
+    initMutex(&pool->mutex);
+    initCondition(&pool->cond);
+#endif
+    return pool;
+}
+
+int poolFree(Pool *pool) {
+    if (pool->taken != NULL)
+        return 1;
+
+    poolSetMaxSize(pool, 0);
+#ifdef THREADED_RTS
+    closeCondition(&pool->cond);
+    closeMutex(&pool->mutex);
+#endif
+    free(pool);
+    return 0;
+}
+
+/* free available entries such that current_size <= size */
+static void free_available(Pool *pool, nat size) {
+    while (pool->current_size > size && pool->available != NULL) {
+        PoolEntry *ent = pool->available;
+        pool->free_fn(ent->thing);
+        pool->available = ent->next;
+        free(ent);
+        pool->current_size--;
+    }
+}
+
+void poolSetDesiredSize(Pool *pool, nat size) {
+    ACQUIRE_LOCK(&pool->mutex);
+    pool->desired_size = size;
+    free_available(pool, size);
+    RELEASE_LOCK(&pool->mutex);
+}
+
+void poolSetMaxSize(Pool *pool, nat size) {
+    ACQUIRE_LOCK(&pool->mutex);
+    if (size == 0)
+        size = (nat) -1;
+    pool->max_size = size;
+    if (pool->desired_size > pool->max_size) {
+        pool->desired_size = size;
+        free_available(pool, size);
+    }
+    RELEASE_LOCK(&pool->mutex);
+}
+
+nat poolGetMaxSize(Pool *pool) {
+    return pool->max_size;
+}
+
+nat poolGetDesiredSize(Pool *pool) {
+    return pool->desired_size;
+}
+
+void *poolTake(Pool *pool) {
+    PoolEntry *ent = NULL;
+    ACQUIRE_LOCK(&pool->mutex);
+    while (ent == NULL) {
+        if (pool->available != NULL) {
+            ent = pool->available;
+            pool->available = ent->next;
+        } else if (pool->current_size < pool->max_size) {
+            ent = stgMallocBytes(sizeof(PoolEntry), "pool_take");
+            ent->flags = 0;
+            ent->thing = pool->alloc_fn();
+            pool->current_size++;
+        } else {
+#ifdef THREADED_RTS
+            waitCondition(&pool->cond, &pool->mutex);
+#else
+            barf("Tried to take from an empty pool");
+#endif
+        }
+    }
+
+    ent->next = pool->taken;
+    pool->taken = ent;
+    RELEASE_LOCK(&pool->mutex);
+    return ent->thing;
+}
+
+void poolRelease(Pool *pool, void *thing) {
+    ACQUIRE_LOCK(&pool->mutex);
+    PoolEntry **last = &pool->taken;
+    PoolEntry *ent = pool->taken;
+    while (ent != NULL) {
+        if (ent->thing == thing) {
+            *last = ent->next;
+            if (pool->current_size > pool->desired_size
+                || ent->flags & FLAG_SHOULD_FREE) {
+                pool->free_fn(ent->thing);
+                free(ent);
+            } else {
+                ent->next = pool->available;
+                pool->available = ent;
+#ifdef THREADED_RTS
+                signalCondition(&pool->cond);
+#endif
+            }
+
+            RELEASE_LOCK(&pool->mutex);
+            return;
+        }
+
+        last = &ent->next;
+        ent = ent->next;
+    }
+
+    barf("pool_release: trying to release resource which doesn't belong to pool.");
+}
+
+void poolFlush(Pool *pool) {
+    ACQUIRE_LOCK(&pool->mutex);
+    free_available(pool, 0);
+    PoolEntry *ent = pool->taken;
+    while (ent != NULL) {
+        ent->flags |= FLAG_SHOULD_FREE;
+        ent = ent->next;
+    }
+    RELEASE_LOCK(&pool->mutex);
+}
diff --git a/rts/Pool.h b/rts/Pool.h
new file mode 100644
index 0000000..d1aeab5
--- /dev/null
+++ b/rts/Pool.h
@@ -0,0 +1,54 @@
+#include "Rts.h"
+
+/*
+ * Resource pools
+ *
+ * This module provides an implementation of a simple thread-safe resource pool.
+ * A pool is a shared set of resources, the size of which is bounded by a
+ * maximum size (0 indicates unbounded). Consumers can request a resource from
+ * the pool with pool_take and, when finished can return it to the pool with
+ * pool_release. Resources will be lazily allocated with alloc_fn as necessary.
+ * If the pool is already at its maximum size when a request is made, pool_take
+ * will block until a resource is freed.
+ *
+ * The pool will free resources such that there are at most desired_size
+ * resources in the pool when all resources have been released.
+ *
+ * invariant: desired_size <= max_size
+ *
+ */
+
+typedef void *(*alloc_thing_fn)(void);
+typedef void (*free_thing_fn)(void *);
+typedef struct Pool_ Pool;
+
+/* Create a pool of things. */
+Pool *poolInit(nat max_size, nat desired_size,
+               alloc_thing_fn alloc_fn, free_thing_fn free_fn);
+
+/* Free a pool. Returns 0 on success or 1 on failure due to things
+ * belonging to the pool currently being claimed. */
+int poolFree(Pool *pool);
+
+/* Set the maximum size of a pool (0 indicates unbounded). desired_size will be
+ * lowered if necessary. */
+void poolSetMaxSize(Pool *pool, nat size);
+
+/* Get the maximum size of a pool */
+nat poolGetMaxSize(Pool *pool);
+
+/* Set the desired size of a pool */
+void poolSetDesiredSize(Pool *pool, nat size);
+
+/* Get the desired size of a pool */
+nat poolGetDesiredSize(Pool *pool);
+
+/* Grab an available thing from a pool */
+void *poolTake(Pool *pool);
+
+/* Release a thing back to the pool from which it was taken */
+void poolRelease(Pool *pool, void *thing);
+
+/* Invalidate all currently allocated resources. Things which are currently
+ * taken will be freed upon release instead of being returned to the pool. */
+void poolFlush(Pool *pool);



More information about the ghc-commits mailing list