Provide a thread-enabled parameterized multi-priority queue.
This component provides a thread-enabled mechanism, bdlcc::MultipriorityQueue, implementing a special-purpose priority queue container of items of parameterized TYPE. Each item has a priority which, for efficiency of implementation, is limited to a relatively small number N of contiguous integers [0 .. N - 1], with N indicated at construction, and 0 being the most urgent priority. This queue also takes an optional allocator, supplied at construction. Once configured, these instance parameters remain unchanged for the life of each multi-priority queue.
The thread-enabled bdlcc::MultipriorityQueue is, in many regards, similar to a value-semantic type in that there is an obvious abstract notion of "value" that can be described in terms of salient attributes, which for this type is a sequence of priority/element pairs, constrained to be in increasing order of priority. There are, however, several differences in method behavior and signature that arise due to the thread-enabled nature of the queue and its anticipated usage pattern.
For example, if a queue object is empty, popFront will block indefinitely until an element is added to the queue. Also, since dynamic instance information, such as the number of elements currently in a queue, can be out-of-date by the time it is returned, some manipulators (e.g., tryPopFront) are deliberately combined with an accessor operation (e.g., isEmpty) in order to guarantee proper behavior.
Finally, note that although the parameterized TYPE is expected to at least support copy construction and assignment, the bdlcc::MultipriorityQueue<TYPE> type currently does not support any value-semantic operations, since different queues could have different numbers of priorities, making comparison, assignment, and copy construction awkward.
In addition to popFront and tryPopFront, a bdlcc::MultipriorityQueue may some day also provide a timedPopFront method. This method would block until it is able to complete successfully or until the specified time limit expires.
The behavior for the destructor is undefined unless all access or modification of the object is completed prior to its destruction. Some form of synchronization, external to the component, is required to ensure the precondition on the destructor is met. For example, if two (or more) threads are manipulating a queue, it is not safe to anticipate the number of elements added to the queue, and destroy that queue immediately after the last element is popped (without additional synchronization) because one of the corresponding push functions may not have completed (push may, for instance, signal waiting threads after the element is considered added to the queue).
This section illustrates intended use of this component.
This example demonstrates how we might use a bdlcc::MultipriorityQueue to communicate between a single "producer" thread and multiple "consumer" threads. The "producer" pushes work requests of varying priority onto the queue, and each "consumer" iteratively takes the highest priority work request from the queue and services it.
We begin our example with some utility classes that define a simple "work item":
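The following is one plausible sketch of such types; the specific members (e.g., d_value) and the e_WORK request kind are illustrative assumptions, not requirements of the component:

```cpp
// Headers used throughout this example.
#include <bdlcc_multipriorityqueue.h>
#include <bslmt_threadutil.h>
#include <bsl_cstdlib.h>     // 'bsl::rand'

struct MyWorkData {
    int d_value;  // placeholder payload describing one unit of work
};

struct MyWorkRequest {
    enum RequestType {
        e_WORK = 1,  // the request carries work to be processed
        e_STOP = 2   // the request tells the consumer thread to terminate
    };

    RequestType d_type;  // kind of request
    MyWorkData  d_data;  // payload; meaningful only when 'd_type' is 'e_WORK'
};
```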
Next, we provide a simple function to service an individual work item, and a function to get a work item. The details are unimportant for this example:
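A minimal sketch of these two functions, assuming the MyWorkData type above (the names myDoWork and getWorkData, and the fixed item count, are illustrative):

```cpp
// Service a single work item; the body is intentionally a placeholder.
void myDoWork(const MyWorkData& data)
{
    (void)data;  // e.g., perform a computation, write a record, etc.
}

// Load the next unit of work into '*result', returning 'true' once roughly
// 100 items have been handed out, signaling that production should stop.
// Only the single producer thread calls this function, so the 'static'
// counter and 'bsl::rand()' need no synchronization.
bool getWorkData(MyWorkData *result)
{
    static int count = 0;
    result->d_value = bsl::rand();
    return ++count >= 100;
}
```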
The myConsumer function (below) will pop elements off the queue in priority order and process them. As discussed above, note that the call to queue->popFront(&item) will block until there is an element available on the queue. This function will be executed in multiple threads, so that each thread waits in queue->popFront(); bdlcc::MultipriorityQueue guarantees that each thread gets a unique element from the queue:
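A sketch of myConsumer, assuming the MyWorkRequest and myDoWork definitions above:

```cpp
void myConsumer(bdlcc::MultipriorityQueue<MyWorkRequest> *queue)
{
    MyWorkRequest item;
    while (true) {
        // Block until an element is available; the most urgent
        // (lowest-numbered priority) element is returned first.
        queue->popFront(&item);

        if (MyWorkRequest::e_STOP == item.d_type) {
            break;  // the producer has no more work for this thread
        }

        myDoWork(item.d_data);
    }
}
```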
The myConsumerThread function below is a callback for bslmt::ThreadUtil, which requires a "C" signature. bslmt::ThreadUtil::create() expects a pointer to this function, and provides that function pointer to the newly-created thread. The new thread then executes this function.
Since bslmt::ThreadUtil::create() uses the familiar "C" convention of passing a void pointer, our function simply casts that pointer to our required type (bdlcc::MultipriorityQueue<MyWorkRequest> *), and then delegates to the queue-specific function myConsumer (above):
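A corresponding sketch of the "C"-linkage callback:

```cpp
extern "C" void *myConsumerThread(void *queuePtr)
{
    // Recover the typed queue address from the opaque 'void *' and delegate.
    myConsumer(static_cast<bdlcc::MultipriorityQueue<MyWorkRequest> *>(
                                                                   queuePtr));
    return queuePtr;
}
```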
In this simple example, the myProducer function (below) serves multiple roles: it creates the bdlcc::MultipriorityQueue, starts the consumer threads, and then produces and queues work items. When work requests are exhausted, this function queues one e_STOP item for each consumer thread.
When each consumer thread reads an e_STOP item, it terminates its thread-handling function. Note that, although the producer cannot control which thread pops a particular work item, it can rely on the knowledge that each consumer thread will read a single e_STOP item and then terminate.
Finally, the myProducer function "joins" each consumer thread, which ensures that the thread itself will terminate correctly (see the bslmt_threadutil component-level documentation for details):
The previous example shows a simple mechanism for distributing work requests over multiple threads. This approach works well for large jobs that can be decomposed into discrete, independent tasks that benefit from parallel execution. Note also that the various threads are synchronized only at the end of execution, when the producer "joins" the various consumer threads.
The simple strategy used in the first example works well for tasks that share no state, and are completely independent of one another. For instance, a web server might use a similar strategy to distribute http requests across multiple worker threads.
In more complicated examples, it is often necessary or desirable to synchronize the separate tasks during execution. The second example below shows a single "Observer" mechanism that receives event notification from the various worker threads.
We first create a simple MyEvent data type. Worker threads will use this type to report information about their work. In our example, we will report the "worker Id", the event number, and some arbitrary text.
As with the previous example, class MyEvent also contains an EventType, an enumeration that indicates whether the worker has completed all work. The "Observer" will use this enumerated value to note when a worker thread has completed its work:
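A plausible sketch of MyEvent; the member names and the e_IN_PROGRESS value are illustrative assumptions (e_TASK_COMPLETE is the value referred to later in this example):

```cpp
struct MyEvent {
    enum EventType {
        e_IN_PROGRESS   = 1,  // an ordinary progress report
        e_TASK_COMPLETE = 2   // the reporting worker has finished its work
    };

    EventType   d_type;         // kind of event
    int         d_workerId;     // id of the worker that produced the event
    int         d_eventNumber;  // sequence number within that worker
    const char *d_eventText_p;  // arbitrary descriptive text (not owned)
};
```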
As noted in the previous example, bslmt::ThreadUtil::create() spawns a new thread, which invokes a simple "C" function taking a void pointer. In the previous example, we simply converted that void pointer into a pointer to bdlcc::MultipriorityQueue<MyWorkRequest>.
In this example, however, we want to pass an additional data item. Each worker thread is initialized with a unique integer value ("worker Id"), which identifies that thread. We therefore create a simple struct that contains both of these values:
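A minimal sketch of that struct (member names are illustrative):

```cpp
struct MyWorkerData {
    int                                 d_workerId;  // unique id of this worker
    bdlcc::MultipriorityQueue<MyEvent> *d_queue_p;   // shared event queue (held, not owned)
};
```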
Function myWorker (below) simulates a working thread by enqueuing multiple MyEvent events during execution. In a realistic application, each MyEvent structure would likely contain different textual information. For the sake of simplicity, however, our loop uses a constant value for the text field. Note that various priorities are generated to illustrate the multi-priority aspect of this particular queue:
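A sketch of myWorker; the number of events per worker and the priority formula are arbitrary illustrations, and the queue's numPriorities accessor is used to stay within the configured priority range:

```cpp
void myWorker(int workerId, bdlcc::MultipriorityQueue<MyEvent> *queue)
{
    const int k_NUM_EVENTS  = 5;
    const int numPriorities = queue->numPriorities();

    // Report several in-progress events, spreading them over the available
    // priorities to exercise the multi-priority behavior.
    for (int i = 0; i < k_NUM_EVENTS; ++i) {
        MyEvent event = { MyEvent::e_IN_PROGRESS,
                          workerId,
                          i,
                          "In-progress event" };
        queue->pushBack(event, (workerId + i) % numPriorities);
    }

    // Report completion at the least urgent priority, so it is popped only
    // after all of this worker's earlier events.
    MyEvent event = { MyEvent::e_TASK_COMPLETE,
                      workerId,
                      k_NUM_EVENTS,
                      "Task complete" };
    queue->pushBack(event, numPriorities - 1);
}
```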
The callback function myWorkerThread (below), invoked by bslmt::ThreadUtil::create, takes the traditional void pointer. The expected data is the composite structure MyWorkerData. The callback function casts the void pointer to the application-specific data type and then uses the referenced object to construct a call to the myWorker function:
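A corresponding sketch of the callback:

```cpp
extern "C" void *myWorkerThread(void *workerDataPtr)
{
    // Recover the typed worker data and delegate to 'myWorker'.
    MyWorkerData *data = static_cast<MyWorkerData *>(workerDataPtr);
    myWorker(data->d_workerId, data->d_queue_p);
    return workerDataPtr;
}
```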
For the sake of simplicity, we will implement the Observer behavior (below) in the main thread. The void function myObserver starts multiple threads running the myWorker function, reads MyEvent values from the queue, and logs all messages in the order of arrival.
As each myWorker thread terminates, it sends an e_TASK_COMPLETE event. Upon receiving this event, the myObserver function uses the d_workerId to find the relevant thread, and then "joins" that thread.
The myObserver function determines when all tasks have completed simply by counting the number of e_TASK_COMPLETE messages received:
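A sketch of myObserver, assuming the definitions above; the thread and priority counts are again arbitrary choices for illustration:

```cpp
#include <bsl_iostream.h>

void myObserver()
{
    enum { k_NUM_THREADS = 10, k_NUM_PRIORITIES = 4 };

    bdlcc::MultipriorityQueue<MyEvent> queue(k_NUM_PRIORITIES);

    bslmt::ThreadUtil::Handle handles[k_NUM_THREADS];
    MyWorkerData              workerData[k_NUM_THREADS];

    // Start the worker threads, giving each a unique id and the shared queue.
    for (int i = 0; i < k_NUM_THREADS; ++i) {
        workerData[i].d_workerId = i;
        workerData[i].d_queue_p  = &queue;
        bslmt::ThreadUtil::create(&handles[i],
                                  &myWorkerThread,
                                  &workerData[i]);
    }

    // Pop and log events as they become available; join each worker when its
    // 'e_TASK_COMPLETE' event is received, and stop once every worker has
    // reported completion.
    int numCompleted = 0;
    while (numCompleted < k_NUM_THREADS) {
        MyEvent event;
        queue.popFront(&event);

        bsl::cout << "[" << event.d_workerId << "] "
                  << event.d_eventNumber << ": "
                  << event.d_eventText_p << bsl::endl;

        if (MyEvent::e_TASK_COMPLETE == event.d_type) {
            ++numCompleted;
            bslmt::ThreadUtil::join(handles[event.d_workerId]);
        }
    }
}
```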