BDE 4.14.0 Production release
bdlma_concurrentpoolallocator

Detailed Description

Purpose

Provide thread-safe memory-pooling allocator of fixed-size blocks.

Classes

bdlma::ConcurrentPoolAllocator: thread-safe memory-pooling allocator of fixed-size blocks

Description

This component defines a class, bdlma::ConcurrentPoolAllocator, that implements the bslma::Allocator protocol and provides a thread-safe allocator of pooled memory blocks of uniform size (the "pooled size"). The pooled size is either (1) configured at construction, or (2) equal to the size of the first block allocated through the allocator. Allocation requests of sizes up to the pooled size are satisfied with blocks from the underlying pool; requests of larger sizes are satisfied through the external allocator (the allocator supplied at construction, or the default allocator if none was supplied).
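As a quick illustration (a minimal sketch of our own, not part of this component's usage examples, assuming the constructor that takes the pooled block size per case (1) above), a pooled size of 64 means requests of at most 64 bytes are served from the pool, while larger requests are forwarded to the supplied allocator:

#include <bdlma_concurrentpoolallocator.h>
#include <bslma_testallocator.h>

void pooledSizeSketch()
{
    bslma::TestAllocator           external;             // external allocator
    bdlma::ConcurrentPoolAllocator pool(64, &external);  // pooled size is 64

    void *small = pool.allocate(48);    // <= 64: served from the pool
    void *large = pool.allocate(256);   // >  64: forwarded to 'external'

    pool.deallocate(small);
    pool.deallocate(large);
}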

Protocol Hierarchy

The interface hierarchy (defined by direct public inheritance) of bdlma::ConcurrentPoolAllocator is as follows:

 ,------------------------------.
( bdlma::ConcurrentPoolAllocator )
 `------------------------------'
               |        ctor/dtor
               |        objectSize
               V
        ,----------------.
       ( bslma::Allocator )
        `----------------'
                        allocate
                        deallocate

bdlma::ConcurrentPoolAllocator provides a concrete, thread-safe implementation of the bslma::Allocator protocol.
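For instance (a minimal sketch of our own, not from this component's documentation), an object of this type can be supplied anywhere a bslma::Allocator pointer is expected, such as to a bsl container:

#include <bdlma_concurrentpoolallocator.h>
#include <bsl_vector.h>

void protocolSketch()
{
    bdlma::ConcurrentPoolAllocator poolAlloc;  // default constructed

    // The container holds the pool allocator through the 'bslma::Allocator'
    // protocol and draws all of its memory from it.

    bsl::vector<int> values(&poolAlloc);
    values.push_back(42);
}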

Usage

The bdlma::ConcurrentPoolAllocator is intended to be used in either of the following two cases.

The first case is where frequent allocation and deallocation of memory occurs through the bslma::Allocator protocol and all of the allocated blocks have the same size. In this case, the size of blocks to pool is determined the first time allocate is called and need not be specified at construction.

The second case is where frequent allocation and deallocation of memory occurs through the bslma::Allocator protocol, most of the allocations have similar sizes, and a likely maximum for the largest allocation is known at the time of construction.

Example 1: Uniform Sized Allocations

The following example illustrates the use of bdlma::ConcurrentPoolAllocator when all allocations are of uniform size. A bdlma::ConcurrentPoolAllocator is used in the implementation of a "work queue" where each "item" enqueued by a producer thread is of identical size. Concurrently, a consumer dequeues each work item when it becomes available, verifies the content (a sequence number in ASCII), and deallocates the work item. The concurrent allocations and deallocations are valid because bdlma::ConcurrentPoolAllocator is thread-safe.

First, an abstract of the example will be given with focus and commentary on the relevant details of bdlma::ConcurrentPoolAllocator. Details pertaining to queue management, thread creation, thread synchronization, etc., can be seen in the full listing at the end of this example.

The parent thread creates the bdlma::ConcurrentPoolAllocator and work queue by the statements:

bdlma::ConcurrentPoolAllocator poolAlloc;
my1_WorkQueue queue(&poolAlloc);

Note that since the default constructor is used to create poolAlloc, the pooled size has not yet been fixed.

The work queue is defined by the following data structures.

struct my1_WorkItem {
    // DATA
    char *d_item;  // represents work to perform
};

struct my1_WorkQueue {
    // DATA
    bsl::list<my1_WorkItem>  d_queue;    // queue of work requests
    bslmt::Mutex             d_mx;       // protects the shared queue
    bslmt::Condition         d_cv;       // signals existence of new work
    bslma::Allocator        *d_alloc_p;  // pooled allocator

    // CREATORS
    explicit my1_WorkQueue(bslma::Allocator *basicAllocator = 0)
    : d_alloc_p(bslma::Default::allocator(basicAllocator))
    {
    }
};

The producer and consumer threads are given the address of the work queue as their sole argument. Here, the producer allocates a work item, initializes it with a sequence number in ASCII, enqueues it, and signals its presence to the consumer thread. This action is done 50 times, and then a 51st, empty work item is added to inform the consumer of the end of the queue. The first allocation of a work item (100 bytes) fixes the pooled size. Each subsequent allocation is that same size (100 bytes). The producer's actions are shown below:

extern "C"
void *my1_producer(void *arg)
{
my1_WorkQueue *queue = (my1_WorkQueue *)arg;
for (int i = 0; i < 50; ++i) {
char b[100];
bsl::sprintf(b, "%d", i);
int len = static_cast<int>(bsl::strlen(b));
my1_WorkItem request;
// Fixed allocation size sufficient for content.
request.d_item = (char *)queue->d_alloc_p->allocate(100);
bsl::memcpy(request.d_item, b, len+1);
// Enqueue item and signal any waiting threads.
// ...
}
// Add empty item.
// ...
return queue;
}

When the consumer thread finds that the queue is not empty it dequeues the item, verifies its content (a sequence number in ASCII), returns the work item to the pool, and checks for the next item. If the queue is empty, the consumer blocks until signaled by the producer. An empty work item indicates that the producer will send no more items, so the consumer exits. The consumer's actions are shown below:

extern "C"
void *my1_consumer(void *arg)
{
my1_WorkQueue *queue = (my1_WorkQueue *)arg;
for (int i = 0; ; ++i) {
// Block until work item on queue.
// ...
// Dequeue item.
// ...
// Break when end-of-work item received.
// ...
char b[100];
bsl::sprintf(b, "%d", i);
assert(bsl::strcmp(b, item.d_item) == 0); // check content
queue->d_alloc_p->deallocate(item.d_item); // deallocate
}
return 0;
}

A complete listing of the example's structures and functions follows:

struct my1_WorkItem {
    // DATA
    char *d_item;  // represents work to perform
};

struct my1_WorkQueue {
    // DATA
    bsl::list<my1_WorkItem>  d_queue;    // queue of work requests
    bslmt::Mutex             d_mx;       // protects the shared queue
    bslmt::Condition         d_cv;       // signals existence of new work
    bslma::Allocator        *d_alloc_p;  // pooled allocator

  private:
    // Not implemented:
    my1_WorkQueue(const my1_WorkQueue&);

  public:
    // CREATORS
    explicit my1_WorkQueue(bslma::Allocator *basicAllocator = 0)
    : d_alloc_p(bslma::Default::allocator(basicAllocator))
    {
    }
};
extern "C" void *my1_producer(void *arg)
{
my1_WorkQueue *queue = (my1_WorkQueue *)arg;
for (int i = 0; i < 50; ++i) {
char b[100];
bsl::sprintf(b, "%d", i);
int len = static_cast<int>(bsl::strlen(b));
my1_WorkItem request;
request.d_item = (char *)queue->d_alloc_p->allocate(100);
bsl::memcpy(request.d_item, b, len+1);
if (veryVerbose) {
// Assume thread-safe implementations of 'cout' and 'endl'
// exist (named 'MTCOUT' and 'MTENDL', respectively).
MTCOUT << "Enqueuing " << request.d_item << MTENDL;
}
queue->d_mx.lock();
queue->d_queue.push_back(request);
queue->d_mx.unlock();
queue->d_cv.signal();
}
my1_WorkItem request;
request.d_item = 0;
queue->d_mx.lock();
queue->d_queue.push_back(request);
queue->d_mx.unlock();
queue->d_cv.signal();
return queue;
}
extern "C" void *my1_consumer(void *arg)
{
my1_WorkQueue *queue = (my1_WorkQueue *)arg;
for (int i = 0; ; ++i) {
queue->d_mx.lock();
while (0 == queue->d_queue.size()) {
queue->d_cv.wait(&queue->d_mx);
}
my1_WorkItem item = queue->d_queue.front();
queue->d_queue.pop_front();
queue->d_mx.unlock();
if (0 == item.d_item) {
break;
}
// Process the work requests.
if (veryVerbose) {
// Assume thread-safe implementations of 'cout' and 'endl'
// exist (named 'MTCOUT' and 'MTENDL', respectively).
MTCOUT << "Processing " << item.d_item << MTENDL;
}
char b[100];
bsl::sprintf(b, "%d", i);
assert(bsl::strcmp(b, item.d_item) == 0);
queue->d_alloc_p->deallocate(item.d_item);
}
return 0;
}

In the application's main routine:

{
    bdlma::ConcurrentPoolAllocator poolAlloc;   // pooled size not yet fixed
    bslmt::ThreadAttributes        attributes;  // default thread attributes

    my1_WorkQueue queue(&poolAlloc);

    bslmt::ThreadUtil::Handle producerHandle;
    int status = bslmt::ThreadUtil::create(&producerHandle,
                                           attributes,
                                           &my1_producer,
                                           &queue);
    assert(0 == status);

    bslmt::ThreadUtil::Handle consumerHandle;
    status = bslmt::ThreadUtil::create(&consumerHandle,
                                       attributes,
                                       &my1_consumer,
                                       &queue);
    assert(0 == status);

    status = bslmt::ThreadUtil::join(consumerHandle);
    assert(0 == status);

    status = bslmt::ThreadUtil::join(producerHandle);
    assert(0 == status);
}

Example 2: Variable Allocation Size

The following example illustrates the use of bdlma::ConcurrentPoolAllocator when allocations are of varying size. A bdlma::ConcurrentPoolAllocator is used in the implementation of a "work queue" where each "item" enqueued by a producer thread varies in size, but all items are smaller than a known maximum. Concurrently, a consumer thread dequeues each work item when it is available, verifies its content (a sequence number in ASCII), and deallocates the work item. The concurrent allocations and deallocations are valid because bdlma::ConcurrentPoolAllocator is thread-safe.

First, an abstract of the example will be given with focus and commentary on the relevant details of bdlma::ConcurrentPoolAllocator. Details pertaining to queue management, thread creation, thread synchronization, etc., can be seen in the full listing at the end of this example.

The parent thread creates the bdlma::ConcurrentPoolAllocator and work queue by the statements:

bdlma::ConcurrentPoolAllocator poolAlloc(100);
my2_WorkQueue queue(&poolAlloc);

Note that the pooled size (100) is specified in the construction of poolAlloc. Any requests in excess of that size will be satisfied by implicit calls to the default allocator, not from the underlying pool.
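To make that fallback observable (a minimal sketch of our own, using bslma::TestAllocator and bslma::DefaultAllocatorGuard, which are not part of this example), one can install a test allocator as the default allocator and watch an oversized request bypass the pool:

#include <bdlma_concurrentpoolallocator.h>
#include <bslma_defaultallocatorguard.h>
#include <bslma_testallocator.h>
#include <bsls_types.h>

#include <assert.h>

void overflowSketch()
{
    bslma::TestAllocator         defaultAlloc;
    bslma::DefaultAllocatorGuard guard(&defaultAlloc);  // install as default

    bdlma::ConcurrentPoolAllocator poolAlloc(100);      // pooled size is 100

    void *small = poolAlloc.allocate(64);               // served from the pool

    bsls::Types::Int64 before = defaultAlloc.numBlocksInUse();
    void *large = poolAlloc.allocate(500);              // exceeds 100: satisfied
                                                        // by the default allocator
    assert(defaultAlloc.numBlocksInUse() > before);

    poolAlloc.deallocate(large);
    poolAlloc.deallocate(small);
}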

The work queue is defined by the following data structures.

struct my2_WorkItem {
    // DATA
    char *d_item;  // represents work to perform
};

struct my2_WorkQueue {
    // DATA
    bsl::list<my2_WorkItem>  d_queue;    // queue of work requests
    bslmt::Mutex             d_mx;       // protects the shared queue
    bslmt::Condition         d_cv;       // signals existence of new work
    bslma::Allocator        *d_alloc_p;  // pooled allocator

    // CREATORS
    explicit my2_WorkQueue(bslma::Allocator *basicAllocator = 0)
    : d_queue(basicAllocator)
    , d_alloc_p(bslma::Default::allocator(basicAllocator))
    {
    }
};

In this example (unlike Example 1), the given allocator is used not only for the work items, but is also passed to the constructor of d_queue so that it also supplies memory for the internal operations of bsl::list<my2_WorkItem>.

The producer and consumer threads are given the address of the work queue as their sole argument. Here, the producer allocates a work item, initializes it with a sequence number in ASCII, enqueues it, and signals its presence to the consumer thread. The action is done 50 times, and then a 51st, empty work item is added to inform the consumer of the end of the queue. In this example, each work item is sized to match the length of its contents, the sequence number in ASCII. The producer's actions are shown below:

extern "C" void *my2_producer(void *arg)
{
my2_WorkQueue *queue = (my2_WorkQueue *)arg;
for (int i = 0; i < 50; ++i) {
char b[100];
bsl::sprintf(b, "%d", i);
int len = static_cast<int>(bsl::strlen(b));
my2_WorkItem request;
// Allocate item to exactly match space needed for content.
request.d_item = (char *)queue->d_alloc_p->allocate(len+1);
bsl::memcpy(request.d_item, b, len+1);
// Enqueue item and signal any waiting threads.
// ...
}
// Add empty item.
// ...
return queue;
}

The actions of this consumer thread are essentially the same as those of the consumer thread in Example 1.

When the consumer thread finds that the queue is not empty, it dequeues the item, verifies its content (a sequence number in ASCII), returns the work item to the pool, and checks for the next item. If the queue is empty, the consumer blocks until signaled by the producer. An empty work item indicates that the producer will send no more items, so the consumer exits. The consumer's actions are shown below.

extern "C" void *my2_consumer(void *arg)
{
my2_WorkQueue *queue = (my2_WorkQueue *)arg;
while (int i = 0; ; ++i) {
// Block until work item on queue.
// ...
// Deque item.
// ...
// Break when end-of-work item received.
// ...
char b[100];
bsl::sprintf(b, "%d", i);
assert(bsl::strcmp(b, item.d_item) == 0); // verify content
queue->d_alloc_p->deallocate(item.d_item); // deallocate
}
return 0;
}

A complete listing of the example's structures and functions follows:

struct my2_WorkItem {
    // DATA
    char *d_item;  // represents work to perform
};

struct my2_WorkQueue {
    // DATA
    bsl::list<my2_WorkItem>  d_queue;    // queue of work requests
    bslmt::Mutex             d_mx;       // protects the shared queue
    bslmt::Condition         d_cv;       // signals existence of new work
    bslma::Allocator        *d_alloc_p;  // pooled allocator

  private:
    // Not implemented:
    my2_WorkQueue(const my2_WorkQueue&);

  public:
    // CREATORS
    explicit my2_WorkQueue(bslma::Allocator *basicAllocator = 0)
    : d_queue(basicAllocator)
    , d_alloc_p(bslma::Default::allocator(basicAllocator))
    {
    }
};
extern "C" void *my2_producer(void *arg)
{
my2_WorkQueue *queue = (my2_WorkQueue *)arg;
for (int i = 0; i < 50; ++i) {
char b[100];
bsl::sprintf(b, "%d", i);
int len = static_cast<int>(bsl::strlen(b));
my2_WorkItem request;
request.d_item = (char *)queue->d_alloc_p->allocate(len+1);
bsl::memcpy(request.d_item, b, len+1);
if (veryVerbose) {
// Assume thread-safe implementations of 'cout' and 'endl'
// exist (named 'MTCOUT' and 'MTENDL', respectively).
MTCOUT << "Enqueuing " << request.d_item << MTENDL;
}
queue->d_mx.lock();
queue->d_queue.push_back(request);
queue->d_mx.unlock();
queue->d_cv.signal();
}
my2_WorkItem request;
request.d_item = 0;
queue->d_mx.lock();
queue->d_queue.push_back(request);
queue->d_mx.unlock();
queue->d_cv.signal();
return queue;
}
extern "C" void *my2_consumer(void *arg)
{
my2_WorkQueue *queue = (my2_WorkQueue *)arg;
for (int i = 0; ; ++i) {
queue->d_mx.lock();
while (0 == queue->d_queue.size()) {
queue->d_cv.wait(&queue->d_mx);
}
my2_WorkItem item = queue->d_queue.front();
queue->d_queue.pop_front();
queue->d_mx.unlock();
if (0 == item.d_item) {
break;
}
// Process the work requests.
if (veryVerbose) {
// Assume thread-safe implementations of 'cout' and 'endl'
// exist (named 'MTCOUT' and 'MTENDL', respectively).
MTCOUT << "Processing " << item.d_item << MTENDL;
}
char b[100];
bsl::sprintf(b, "%d", i);
assert(bsl::strcmp(b, item.d_item) == 0);
queue->d_alloc_p->deallocate(item.d_item);
}
return 0;
}

In the application's main routine:

{
    bdlma::ConcurrentPoolAllocator poolAlloc(100);  // pooled size fixed at 100
    bslmt::ThreadAttributes        attributes;      // default thread attributes

    my2_WorkQueue queue(&poolAlloc);

    bslmt::ThreadUtil::Handle producerHandle;
    int status = bslmt::ThreadUtil::create(&producerHandle,
                                           attributes,
                                           &my2_producer,
                                           &queue);
    assert(0 == status);

    bslmt::ThreadUtil::Handle consumerHandle;
    status = bslmt::ThreadUtil::create(&consumerHandle,
                                       attributes,
                                       &my2_consumer,
                                       &queue);
    assert(0 == status);

    status = bslmt::ThreadUtil::join(consumerHandle);
    assert(0 == status);

    status = bslmt::ThreadUtil::join(producerHandle);
    assert(0 == status);
}