BDE 4.14.0 Production release
bdlmt_threadmultiplexor

Detailed Description

Purpose

Provide a mechanism for partitioning a collection of threads.

Classes

bdlmt::ThreadMultiplexor: mechanism for partitioning a collection of threads

See also
bdlmt_threadpool, bdlmt_fixedthreadpool

Description

This component provides a mechanism for partitioning a collection of threads, so that a single collection of threads (e.g., a thread pool) can be used to process various types of user-defined functions ("jobs") while sharing the thread resources equitably between them. A typical example where this type of partitioning is desired is an application that performs both I/O and CPU-intensive processing.

The traditional approach is to create two thread pools, one for I/O and one for processing, and to pass control (in the form of a callback) from one thread pool to the other. However, there are several problems with this approach. Firstly, the process incurs the overhead of context switching between threads, which must necessarily occur because there are two different thread pools. Secondly, the process may not be able to adapt well to imbalances between one type of processing and the other if the number of threads in each thread pool is bounded. In this case, a large number of jobs may be enqueued while some portion of the threads allocated to the process go unused. On the other hand, simply sharing a single thread pool without a provision for partitioning the use of threads may result in one type of processing starving the other.

The bdlmt::ThreadMultiplexor provides an API, processJob, to process user-specified jobs. A multiplexor instance is configured with a maximum number of "processors", i.e., the maximum number of threads that may process jobs at any particular time. Additional threads enqueue jobs to a pending job queue, which is processed by the next available processing thread.

Typically, a bdlmt::ThreadMultiplexor instance is used in conjunction with a thread pool (e.g., bdlmt::FixedThreadPool), where each thread pool thread calls the multiplexor processJob method to perform some work. The multiplexor guarantees that no more than the configured number of threads will process jobs concurrently. This guarantee allows a single thread pool to be used in a variety of situations that require partitioning thread resources.
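
For illustration only, the following sketch (which is not part of this component's documentation) shows the pattern in its simplest form: several thread pool threads funnel their work through one shared multiplexor, so that no more than the configured number of jobs execute concurrently. The sketch assumes only the constructor arguments and the processJob template method used in the usage example below; submitWork, doWork, and the k_* constants are hypothetical names introduced here.

#include <bdlf_bind.h>
#include <bdlmt_fixedthreadpool.h>
#include <bdlmt_threadmultiplexor.h>

/// A hypothetical user job (e.g., an I/O or CPU-intensive task).
void doWork()
{
}

void submitWork()
{
    enum { k_NUM_THREADS = 4, k_MAX_PROCESSORS = 2, k_QUEUE_SIZE = 32 };

    bdlmt::FixedThreadPool   pool(k_NUM_THREADS, k_QUEUE_SIZE);
    bdlmt::ThreadMultiplexor mux(k_MAX_PROCESSORS, k_QUEUE_SIZE);

    if (0 != pool.start()) {
        return;                                                       // RETURN
    }

    // Every thread pool thread invokes 'processJob' on the shared
    // multiplexor, so at most 'k_MAX_PROCESSORS' of the pool's threads run
    // 'doWork' at any one time; the remaining jobs wait in the multiplexor's
    // pending queue.
    bdlmt::ThreadMultiplexor::Job work = bdlf::BindUtil::bind(&doWork);
    for (int i = 0; i < 10; ++i) {
        pool.enqueueJob(bdlf::BindUtil::bind(
                  &bdlmt::ThreadMultiplexor::processJob<
                                              bdlmt::ThreadMultiplexor::Job>,
                  &mux,
                  work));
    }

    pool.stop();
}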

Thread Safety

The bdlmt::ThreadMultiplexor class is both fully thread-safe (i.e., all non-creator methods can correctly execute concurrently) and thread-enabled (i.e., the class does not function correctly in a non-multi-threading environment). See bsldoc_glossary for complete definitions of fully thread-safe and thread-enabled.

Usage

This section illustrates intended use of this component.

Example 1: Multiple Work Queues

The following usage example illustrates how the bdlmt::ThreadMultiplexor can be used to share thread resources between three separate work queues. Assume that there are three classes of jobs: jobs that are important, jobs that are urgent, and jobs that are critical. We would like to execute each class of jobs in a single thread pool, but we want to ensure that all types of jobs can be executed at any time.

We begin by defining a class that encapsulates the notion of a job queue. Our JobQueue class holds a pointer to a bdlmt::FixedThreadPool, supplied at construction, and owns an instance of bdlmt::ThreadMultiplexor, used to process jobs.

/// This class defines a generic processor for user-defined functions
/// ("jobs").  Jobs specified to the `processJob` method are executed
/// in the thread pool specified at construction.
class JobQueue {

  public:
    // PUBLIC TYPES

    /// A callback of this type may be specified to the `processJob`
    /// method.
    typedef bdlmt::ThreadMultiplexor::Job Job;

  private:
    // DATA
    bdlmt::FixedThreadPool   *d_threadPool_p;  // (held, not owned)
    bdlmt::ThreadMultiplexor  d_multiplexor;   // used to partition threads

  private:
    // NOT IMPLEMENTED
    JobQueue(const JobQueue&);
    JobQueue& operator=(const JobQueue&);

  public:
    // CREATORS

    /// Create a job queue that executes jobs in the specified
    /// `threadPool` using no more than the specified `maxProcessors`.
    /// Optionally specify a `basicAllocator` used to supply memory.  If
    /// `basicAllocator` is 0, the currently installed default allocator
    /// is used.
    JobQueue(int                     maxProcessors,
             bdlmt::FixedThreadPool *threadPool,
             bslma::Allocator       *basicAllocator = 0);

    /// Destroy this object.
    ~JobQueue();

    // MANIPULATORS

    /// Process the specified `job` in the thread pool specified at
    /// construction.  Return 0 on success, and a non-zero value
    /// otherwise.
    int processJob(const Job& job);
};

The maximum number of processors for the multiplexor instance owned by each JobQueue is configured using the following formula, for T = number of threads and M = number of multiplexors > 1:

maxProc = ceil(T / (M - 1)) - 1

This allows multiple JobQueue instances to share the same thread pool without starving each other when the thread pool has more than one thread. For this usage example, we assume M (the number of multiplexors) = 3 and T (the number of threads) = 5, so maxProc = 2. It is important to note that every call to processJob enqueues a job to the thread pool, so the length of the thread pool queue determines the maximum number of jobs that can be accepted by the JobQueue. (Because multiple JobQueue instances share the same thread pool queue, not all of them will be able to reach their individual maximum at the same time.)
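
As a quick sanity check of this formula, the following illustrative helper (maxProcessors is a hypothetical function introduced here, not part of any BDE component) evaluates it the same way that main does below, clamping the result at a minimum of one processor:

#include <bsl_algorithm.h>
#include <bsl_cmath.h>

/// Return the maximum number of processors to configure for each of the
/// specified `numMultiplexors` multiplexors (M) sharing a thread pool that
/// owns the specified `numThreads` threads (T).  The behavior is undefined
/// unless `numMultiplexors > 1`.
int maxProcessors(int numThreads, int numMultiplexors)
{
    return bsl::max(1, static_cast<int>(
               bsl::ceil(double(numThreads) / (numMultiplexors - 1))) - 1);
}

// For example:
//   maxProcessors(5, 3) == 2   // the configuration used in 'main' below
//   maxProcessors(8, 3) == 3
//   maxProcessors(4, 2) == 3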

JobQueue::JobQueue(int                     maxProcessors,
                   bdlmt::FixedThreadPool *threadPool,
                   bslma::Allocator       *basicAllocator)
: d_threadPool_p(threadPool)
, d_multiplexor (maxProcessors,
                 threadPool->queueCapacity(),
                 basicAllocator)
{
}

JobQueue::~JobQueue()
{
}

The processJob method enqueues a secondary callback into the thread pool that executes the user-specified job through the multiplexor.

int JobQueue::processJob(const JobQueue::Job& job)
{
    return d_threadPool_p->tryEnqueueJob(bdlf::BindUtil::bind(
                                 &bdlmt::ThreadMultiplexor::processJob<Job>,
                                 &d_multiplexor,
                                 job));
}
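
Note that tryEnqueueJob fails, rather than blocks, when the thread pool's queue is full, and its return value is passed directly back to the caller of processJob. This is how the length of the thread pool queue bounds the number of jobs a JobQueue can accept, as described above.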

The following program uses three instances of JobQueue to process important, urgent, and critical jobs using a single collection of threads.

int main(void)
{
    enum {
        NUM_THREADS   = 5,   // total number of threads
        NUM_QUEUES    = 3,   // total number of JobQueue objects
        MAX_QUEUESIZE = 20   // total number of pending jobs
    };

    // bsl::ceil returns a double; convert back to int before clamping at a
    // minimum of 1.
    int maxProc = bsl::max(1,
                           static_cast<int>(bsl::ceil(double(NUM_THREADS) /
                                                      (NUM_QUEUES - 1))) - 1);

    bdlmt::FixedThreadPool tp(NUM_THREADS, MAX_QUEUESIZE);

    JobQueue importantQueue(maxProc, &tp);
    JobQueue urgentQueue   (maxProc, &tp);
    JobQueue criticalQueue (maxProc, &tp);

    if (0 != tp.start()) {
        ASSERT(0 == "Could not start thread pool!");
        return -1;
    }
    // Each job increments an atomic counter for its class, so the counters
    // record how many jobs of each class were processed.
    bsls::AtomicInt iCheck(0), uCheck(0), cCheck(0);

    JobQueue::Job ijob =
        bdlf::BindUtil::bind(&bsls::AtomicInt::add, &iCheck, 1);

    JobQueue::Job ujob =
        bdlf::BindUtil::bind(&bsls::AtomicInt::add, &uCheck, 1);

    JobQueue::Job cjob =
        bdlf::BindUtil::bind(&bsls::AtomicInt::add, &cCheck, 1);
    importantQueue.processJob(ijob);
    importantQueue.processJob(ijob);
    importantQueue.processJob(ijob);
    importantQueue.processJob(ijob);
    importantQueue.processJob(ijob);
    importantQueue.processJob(ijob);

    urgentQueue.processJob(ujob);
    urgentQueue.processJob(ujob);
    urgentQueue.processJob(ujob);
    urgentQueue.processJob(ujob);

    criticalQueue.processJob(cjob);
    criticalQueue.processJob(cjob);

    tp.stop();

    ASSERT(6 == iCheck);
    ASSERT(4 == uCheck);
    ASSERT(2 == cCheck);

    return 0;
}