BDE 4.14.0 Production release
Loading...
Searching...
No Matches
bdlcc_multipriorityqueue.h
Go to the documentation of this file.
1/// @file bdlcc_multipriorityqueue.h
2///
3/// The content of this file has been pre-processed for Doxygen.
4///
5
6
7// bdlcc_multipriorityqueue.h -*-C++-*-
8#ifndef INCLUDED_BDLCC_MULTIPRIORITYQUEUE
9#define INCLUDED_BDLCC_MULTIPRIORITYQUEUE
10
11#include <bsls_ident.h>
12BSLS_IDENT("$Id: $")
13
14/// @defgroup bdlcc_multipriorityqueue bdlcc_multipriorityqueue
15/// @brief Provide a thread-enabled parameterized multi-priority queue.
16/// @addtogroup bdl
17/// @{
18/// @addtogroup bdlcc
19/// @{
20/// @addtogroup bdlcc_multipriorityqueue
21/// @{
22///
23/// <h1> Outline </h1>
24/// * <a href="#bdlcc_multipriorityqueue-purpose"> Purpose</a>
25/// * <a href="#bdlcc_multipriorityqueue-classes"> Classes </a>
26/// * <a href="#bdlcc_multipriorityqueue-description"> Description </a>
27/// * <a href="#bdlcc_multipriorityqueue-thread-enabled-idioms-in-the-bdlcc-multipriorityqueue-interface"> Thread-Enabled Idioms in the bdlcc::MultipriorityQueue Interface </a>
28/// * <a href="#bdlcc_multipriorityqueue-possible-future-enhancements"> Possible Future Enhancements </a>
29/// * <a href="#bdlcc_multipriorityqueue-warning-synchronization-required-on-destruction"> WARNING: Synchronization Required on Destruction </a>
30/// * <a href="#bdlcc_multipriorityqueue-usage"> Usage </a>
31/// * <a href="#bdlcc_multipriorityqueue-example-1-simple-thread-pool"> Example 1: Simple Thread Pool </a>
32/// * <a href="#bdlcc_multipriorityqueue-example-2-multi-threaded-observer"> Example 2: Multi-Threaded Observer </a>
33///
34/// # Purpose {#bdlcc_multipriorityqueue-purpose}
35/// Provide a thread-enabled parameterized multi-priority queue.
36///
37/// # Classes {#bdlcc_multipriorityqueue-classes}
38///
39/// - bdlcc::MultipriorityQueue: thread-enabled, multi-priority queue
40///
41/// @see
42///
43/// # Description {#bdlcc_multipriorityqueue-description}
44/// This component provides a thread-enabled mechanism,
45/// `bdlcc::MultipriorityQueue`, implementing a special-purpose priority queue
46/// container of items of parameterized `TYPE`. Each item has a priority which,
47/// for efficiency of implementation, is limited to a relatively small number
48/// `N` of contiguous integers `[ 0 .. N - 1 ]`, with `N` indicated at
49/// construction, and 0 being the most urgent priority. This queue also takes
50/// an optional allocator, supplied at construction. Once configured, these
51/// instance parameters remain unchanged for the life of each multi-priority
52/// queue.
53///
54/// ## Thread-Enabled Idioms in the bdlcc::MultipriorityQueue Interface {#bdlcc_multipriorityqueue-thread-enabled-idioms-in-the-bdlcc-multipriorityqueue-interface}
55///
56///
57/// The thread-enabled `bdlcc::MultipriorityQueue` is, in many regards, similar
58/// to a value-semantic type in that there is an obvious abstract notion of
59/// "value" that can be described in terms of salient attributes, which for this
60/// type is a sequence of priority/element pairs, constrained to be in
61/// increasing order of priority. There are, however, several differences in
62/// method behavior and signature that arise due to the thread-enabled nature of
63/// the queue and its anticipated usage pattern.
64///
65/// For example, if a queue object is empty, `popFront` will block indefinitely
66/// until an element is added to the queue. Also, since dynamic instance
67/// information, such as the number of elements currently in a queue, can be
68/// out-of-date by the time it is returned, some manipulators (e.g.,
69/// `tryPopFront`) are deliberately combined with an accessor operation (e.g.,
70/// `isEmpty`) in order to guarantee proper behavior.
71///
72/// Finally, note that although the parameterized `TYPE` is expected to at least
73/// support copy construction and assignment, the
74/// `bdlcc::MultipriorityQueue<TYPE>` type currently does not support any
75/// value-semantic operations, since different queues could have different
76/// numbers of priorities, making comparison, assignment and copy construction
77/// awkward.
78///
79/// ## Possible Future Enhancements {#bdlcc_multipriorityqueue-possible-future-enhancements}
80///
81///
82/// In addition to `popFront` and `tryPopFront`, a `bdlcc::MultipriorityQueue`
83/// may some day also provide a `timedPopFront` method. This method would block
84/// until it is able to complete successfully or until the specified time limit
85/// expires.
86///
87/// ## WARNING: Synchronization Required on Destruction {#bdlcc_multipriorityqueue-warning-synchronization-required-on-destruction}
88///
89///
90/// The behavior for the destructor is undefined unless all access or
91/// modification of the object is completed prior to its destruction. Some form
92/// of synchronization, external to the component, is required to ensure the
93/// precondition on the destructor is met. For example, if two (or more)
94/// threads are manipulating a queue, it is *not* safe to anticipate the number
95/// of elements added to the queue, and destroy that queue immediately after the
96/// last element is popped (without additional synchronization) because one of
97/// the corresponding push functions may not have completed (push may, for
98/// instance, signal waiting threads after the element is considered added to
99/// the queue).
100///
101/// ## Usage {#bdlcc_multipriorityqueue-usage}
102///
103///
104/// This section illustrates intended use of this component.
105///
106/// ### Example 1: Simple Thread Pool {#bdlcc_multipriorityqueue-example-1-simple-thread-pool}
107///
108///
109/// This example demonstrates how we might use a `bdlcc::MultipriorityQueue` to
110/// communicate between a single "producer" thread and multiple "consumer"
111/// threads. The "producer" pushes work requests of varying priority onto the
112/// queue, and each "consumer" iteratively takes the highest priority work
113/// request from the queue and services it.
114///
115/// We begin our example with some utility classes that define a simple "work
116/// item":
117/// @code
118/// enum {
119/// k_MAX_CONSUMER_THREADS = 10
120/// };
121///
122/// struct MyWorkData {
123/// int d_i; // input to work to be done
124///
125/// // Work data...
126/// };
127///
128/// struct MyWorkRequest {
129/// enum RequestType {
130/// e_WORK = 1,
131/// e_STOP = 2
132/// };
133///
134/// RequestType d_type;
135/// MyWorkData d_data;
136///
137/// // Work data...
138/// };
139/// @endcode
140/// Next, we provide a simple function to service an individual work item, and a
141/// function to get a work item. The details are unimportant for this example:
142/// @code
143/// void myDoWork(MyWorkData& data)
144/// {
145/// // Do work...
146/// (void)data;
147/// }
148///
149/// int getWorkData(MyWorkData *result)
150/// {
151/// static int count = 0;
152/// result->d_i = rand(); // Only one thread runs this routine, so it
153/// // does not matter that 'rand()' is not
154/// // thread-safe, or that 'count' is 'static'.
155///
156/// return ++count >= 100;
157/// }
158/// @endcode
159/// The `myConsumer` function (below) will pop elements off the queue in
160/// priority order and process them. As discussed above, note that the call to
161/// `queue->popFront(&item)` will block until there is an element available on
162/// the queue. This function will be executed in multiple threads, so that each
163/// thread waits in `queue->popFront()`; `bdlcc::MultipriorityQueue` guarantees
164/// that each thread gets a unique element from the queue:
165/// @code
166/// void myConsumer(bdlcc::MultipriorityQueue<MyWorkRequest> *queue)
167/// {
168/// MyWorkRequest item;
169/// while (1) {
170///
171/// // The 'popFront' function will wait for a 'MyWorkRequest' until
172/// // one is available.
173///
174/// queue->popFront(&item);
175///
176/// if (MyWorkRequest::e_STOP == item.d_type) {
177/// break;
178/// }
179///
180/// myDoWork(item.d_data);
181/// }
182/// }
183/// @endcode
184/// The `myConsumerThread` function below is a callback for `bslmt::ThreadUtil`,
185/// which requires a "C" signature. `bslmt::ThreadUtil::create()` expects a
186/// pointer to this function, and provides that function pointer to the
187/// newly-created thread. The new thread then executes this function.
188///
189/// Since `bslmt::ThreadUtil::create()` uses the familiar "C" convention of
190/// passing a `void` pointer, our function simply casts that pointer to our
191/// required type (`bdlcc::MultipriorityQueue<MyWorkRequest> *`), and then
192/// delegates to the queue-specific function `myConsumer` (above):
193/// @code
194/// extern "C" void *myConsumerThread(void *queuePtr)
195/// {
196/// myConsumer ((bdlcc::MultipriorityQueue<MyWorkRequest>*) queuePtr);
197/// return queuePtr;
198/// }
199/// @endcode
200/// In this simple example, the `myProducer` function (below) serves multiple
201/// roles: it creates the `bdlcc::MultipriorityQueue`, starts the consumer
202/// threads, and then produces and queues work items. When work requests are
203/// exhausted, this function queues one `e_STOP` item for each consumer thread.
204///
205/// When each consumer thread reads a `e_STOP`, it terminates its
206/// thread-handling function. Note that, although the producer cannot control
207/// which thread pops a particular work item, it can rely on the knowledge that
208/// each consumer thread will read a single `e_STOP` item and then terminate.
209///
210/// Finally, the `myProducer` function "joins" each consumer thread, which
211/// ensures that the thread itself will terminate correctly (see the
212/// @ref bslmt_threadutil component-level documentation for details):
213/// @code
214/// void myProducer()
215/// {
216/// enum {
217/// k_NUM_PRIORITIES = 8,
218/// k_NUM_THREADS = 8
219/// };
220///
221/// MyWorkRequest item;
222/// MyWorkData workData;
223///
224/// // Create multi-priority queue with specified number of priorities.
225///
226/// bdlcc::MultipriorityQueue<MyWorkRequest> queue(k_NUM_PRIORITIES);
227///
228/// // Start the specified number of threads.
229///
230/// assert(0 < k_NUM_THREADS
231/// && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
232/// bslmt::ThreadUtil::Handle consumerHandles[k_MAX_CONSUMER_THREADS];
233///
234/// for (int i = 0; i < k_NUM_THREADS; ++i) {
235/// bslmt::ThreadUtil::create(&consumerHandles[i],
236/// myConsumerThread,
237/// &queue);
238/// }
239///
240/// // Load work data into work requests and push them onto the queue with
241/// // varying priority until all work data has been exhausted.
242///
243/// int count = 0; // used to generate priorities
244///
245/// while (!getWorkData(&workData)) { // see declaration (above)
246/// item.d_type = MyWorkRequest::e_WORK;
247/// item.d_data = workData;
248/// queue.pushBack(item, count % k_NUM_PRIORITIES); // mixed
249/// // priorities
250/// ++count;
251/// }
252///
253/// // Load as many stop requests as there are active consumer threads.
254///
255/// for (int i = 0; i < k_NUM_THREADS; ++i) {
256/// item.d_type = MyWorkRequest::e_STOP;
257/// queue.pushBack(item, k_NUM_PRIORITIES - 1); // lowest priority
258/// }
259///
260/// // Join all of the consumer threads back with the main thread.
261///
262/// for (int i = 0; i < k_NUM_THREADS; ++i) {
263/// bslmt::ThreadUtil::join(consumerHandles[i]);
264/// }
265/// }
266/// @endcode
267///
268/// ### Example 2: Multi-Threaded Observer {#bdlcc_multipriorityqueue-example-2-multi-threaded-observer}
269///
270///
271/// The previous example shows a simple mechanism for distributing work requests
272/// over multiple threads. This approach works well for large tasks that can be
273/// decomposed into discrete, independent tasks that can benefit from parallel
274/// execution. Note also that the various threads are synchronized only at the
275/// end of execution, when the producer "joins" the various consumer threads.
276///
277/// The simple strategy used in the first example works well for tasks that
278/// share no state, and are completely independent of one another. For
279/// instance, a web server might use a similar strategy to distribute `http`
280/// requests across multiple worker threads.
281///
282/// In more complicated examples, it is often necessary or desirable to
283/// synchronize the separate tasks during execution. The second example below
284/// shows a single "Observer" mechanism that receives event notification from
285/// the various worker threads.
286///
287/// We first create a simple `MyEvent` data type. Worker threads will use this
288/// type to report information about their work. In our example, we will report
289/// the "worker Id", the event number, and some arbitrary text.
290///
291/// As with the previous example, class `MyEvent` also contains an `EventType`,
292/// an enumeration that indicates whether the worker has completed all work.
293/// The "Observer" will use this enumerated value to note when a worker thread
294/// has completed its work:
295/// @code
296/// enum {
297/// k_MAX_CONSUMER_THREADS = 10,
298/// k_MAX_EVENT_TEXT = 80
299/// };
300///
301/// struct MyEvent {
302/// enum EventType {
303/// e_IN_PROGRESS = 1,
304/// e_TASK_COMPLETE = 2
305/// };
306///
307/// EventType d_type;
308/// int d_workerId;
309/// int d_eventNumber;
310/// char d_eventText[k_MAX_EVENT_TEXT];
311/// };
312/// @endcode
313/// As noted in the previous example, `bslmt::ThreadUtil::create()` spawns a new
314/// thread, which invokes a simple "C" function taking a `void` pointer. In the
315/// previous example, we simply converted that `void` pointer into a pointer to
316/// `bdlcc::MultipriorityQueue<MyWorkRequest>`.
317///
318/// In this example, however, we want to pass an additional data item. Each
319/// worker thread is initialized with a unique integer value ("worker Id"),
320/// which identifies that thread. We therefore create a simple `struct` that
321/// contains both of these values:
322/// @code
323/// struct MyWorkerData {
324/// int d_workerId;
325/// bdlcc::MultipriorityQueue<MyEvent> *d_queue;
326/// };
327/// @endcode
328/// Function `myWorker` (below) simulates a working thread by enqueuing multiple
329/// `MyEvent` events during execution. In a realistic application, each
330/// `MyEvent` structure would likely contain different textual information. For
331/// the sake of simplicity, however, our loop uses a constant value for the text
332/// field. Note that various priorities are generated to illustrate the
333/// multi-priority aspect of this particular queue:
334/// @code
335/// void myWorker(int workerId, bdlcc::MultipriorityQueue<MyEvent> *queue)
336/// {
337/// const int N = queue->numPriorities();
338/// const int NUM_EVENTS = 5;
339/// int eventNumber; // used also to generate mixed priorities
340///
341/// // First push 'NUM_EVENTS' events onto 'queue' with mixed priorities.
342///
343/// for (eventNumber = 0; eventNumber < NUM_EVENTS; ++eventNumber) {
344/// MyEvent ev = {
345/// MyEvent::e_IN_PROGRESS,
346/// workerId,
347/// eventNumber,
348/// "In-Progress Event" // constant (for simplicity)
349/// };
350/// queue->pushBack(ev, eventNumber % N); // mixed priorities
351/// }
352///
353/// // Now push an event to end this task.
354///
355/// MyEvent ev = {
356/// MyEvent::e_TASK_COMPLETE,
357/// workerId,
358/// eventNumber,
359/// "Task Complete"
360/// };
361/// queue->pushBack(ev, N - 1); // lowest priority
362/// }
363/// @endcode
364/// The callback function `myWorkerThread` (below) invoked by
365/// `bslmt::ThreadUtil::create` takes the traditional `void` pointer. The
366/// expected data is the composite structure `MyWorkerData`. The callback
367/// function casts the `void` pointer to the application-specific data type and
368/// then uses the referenced object to construct a call to the `myWorker`
369/// function:
370/// @code
371/// extern "C" void *myWorkerThread(void *vWorkerPtr)
372/// {
373/// MyWorkerData *workerPtr = (MyWorkerData *)vWorkerPtr;
374/// myWorker(workerPtr->d_workerId, workerPtr->d_queue);
375/// return vWorkerPtr;
376/// }
377/// @endcode
378/// For the sake of simplicity, we will implement the Observer behavior (below)
379/// in the main thread. The `void` function `myObserver` starts multiple
380/// threads running the `myWorker` function, reads `MyEvent` values from the
381/// queue, and logs all messages in the order of arrival.
382///
383/// As each `myWorker` thread terminates, it sends a `e_TASK_COMPLETE` event.
384/// Upon receiving this event, the `myObserver` function uses the `d_workerId`
385/// to find the relevant thread, and then "joins" that thread.
386///
387/// The `myObserver` function determines when all tasks have completed simply by
388/// counting the number of `e_TASK_COMPLETE` messages received:
389/// @code
390/// void myObserver()
391/// {
392/// const int k_NUM_THREADS = 10;
393/// const int k_NUM_PRIORITIES = 4;
394///
395/// bdlcc::MultipriorityQueue<MyEvent> queue(k_NUM_PRIORITIES);
396///
397/// assert(0 < k_NUM_THREADS
398/// && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
399/// bslmt::ThreadUtil::Handle workerHandles[k_MAX_CONSUMER_THREADS];
400///
401/// // Create `k_NUM_THREADS` threads, each having a unique "worker id".
402///
403/// MyWorkerData workerData[k_NUM_THREADS];
404/// for (int i = 0; i < k_NUM_THREADS; ++i) {
405/// workerData[i].d_queue = &queue;
406/// workerData[i].d_workerId = i;
407/// bslmt::ThreadUtil::create(&workerHandles[i],
408/// myWorkerThread,
409/// &workerData[i]);
410/// }
411///
412/// // Now print out each of the `MyEvent` values as the threads complete.
413/// // This function ends after a total of `k_NUM_THREADS`
414/// // `MyEvent::e_TASK_COMPLETE` events have been printed.
415///
416/// int nStop = 0;
417/// while (nStop < k_NUM_THREADS) {
418/// MyEvent ev;
419/// queue.popFront(&ev);
420/// bsl::cout << "[" << ev.d_workerId << "] "
421/// << ev.d_eventNumber << ". "
422/// << ev.d_eventText << bsl::endl;
423/// if (MyEvent::e_TASK_COMPLETE == ev.d_type) {
424/// ++nStop;
425/// bslmt::ThreadUtil::join(workerHandles[ev.d_workerId]);
426/// }
427/// }
428/// }
429/// @endcode
430/// @}
431/** @} */
432/** @} */
433
434/** @addtogroup bdl
435 * @{
436 */
437/** @addtogroup bdlcc
438 * @{
439 */
440/** @addtogroup bdlcc_multipriorityqueue
441 * @{
442 */
443
444#include <bdlscm_version.h>
445
446#include <bdlb_bitutil.h>
447
448#include <bdlma_concurrentpool.h>
449
451
452#include <bslma_allocator.h>
454#include <bslma_default.h>
455#include <bslma_managedptr.h>
457
458#include <bslmf_movableref.h>
460
461#include <bslmt_condition.h>
462#include <bslmt_lockguard.h>
463#include <bslmt_mutex.h>
464#include <bslmt_threadutil.h>
465
466#include <bsls_assert.h>
467#include <bsls_atomic.h>
468
469#include <bsl_climits.h>
470#include <bsl_cstdint.h>
471#include <bsl_new.h>
472#include <bsl_vector.h>
473
474#ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
475#include <bslalg_typetraits.h>
476#endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
477
478
479namespace bdlcc {
480
481 // =========================================
482 // local class MultipriorityQueue_Node<TYPE>
483 // =========================================
484
485/// This class handles storage of one item of parameterized `TYPE` as a node
486/// in a linked list of items stored in a multipriority queue for a given
487/// priority. This class is not to be used from outside this component.
488///
489/// See @ref bdlcc_multipriorityqueue
490template <class TYPE>
492
493 // DATA
494 bslalg::ConstructorProxy<TYPE> d_item; // object stored in node
495 MultipriorityQueue_Node<TYPE> *d_next_p; // next node on linked list
496
497 private:
498 // NOT IMPLEMENTED
501
502 public:
503 // TRAITS
506
507 // CREATORS
508
509 /// Create a node containing a copy of the specified `item` and having
510 /// the specified `next` pointer. Use the specified `basicAllocator` to
511 /// supply memory. The behavior is undefined unless `basicAllocator` is
512 /// non-null. Note that `item` must be copyable and assignable.
513 MultipriorityQueue_Node(const TYPE& item,
514 bslma::Allocator *basicAllocator);
515
516 /// Create a node containing the value of the specified `item` and
517 /// having the specified `next` pointer. `item` is left in a valid but
518 /// unspecified state. Use the specified `basicAllocator` to supply
519 /// memory. The behavior is undefined unless `basicAllocator` is
520 /// non-null.
522 bslma::Allocator *basicAllocator);
523
524 /// Destroy this node and free all memory that was allocated on its
525 /// behalf, if any.
527
528 // MANIPULATORS
529
530 /// Return a reference to the non-modifiable item stored in this node.
531 TYPE& item();
532
533 /// Return a reference to the modifiable pointer to the node following
534 /// this node on the linked list.
536
537 // ACCESSORS
538
539 /// Return a pointer to the non-modifiable node following this node on
540 /// the linked list, or 0 if this node has no successor.
541 const MultipriorityQueue_Node *nextPtr() const;
542};
543
544 // ==============================
545 // class MultipriorityQueue<TYPE>
546 // ==============================
547
548/// This class implements a thread-enabled multipriority queue whose
549/// priorities are restricted to a (small) set of contiguous `N` integer
550/// values, `[ 0 .. N - 1 ]`, with 0 being the most urgent.
551///
552/// This class does have a notion of value, namely the sequence of
553/// priority/element pairs, constrained to be in decreasing order of urgency
554/// (i.e., monotonically increasing priority values). However, no
555/// value-semantic operations are implemented. Note that elements having
556/// the same priority are maintained in First-In-First-Out (FIFO) order.
557/// Note that the current implementation supports up to a maximum of
558/// `sizeof(int) * CHAR_BIT` priorities.
559///
560/// This class is implemented as a set of linked lists, one for each
561/// priority. Two vectors are used to maintain head and tail pointers for
562/// the lists.
563///
564/// See @ref bdlcc_multipriorityqueue
565template <class TYPE>
567
568 // PRIVATE CONSTANTS
569 enum {
570 k_BITS_PER_INT = sizeof(int) * CHAR_BIT,
571 k_DEFAULT_NUM_PRIORITIES = k_BITS_PER_INT,
572 k_MAX_NUM_PRIORITIES = k_BITS_PER_INT
573 };
574
575 // PRIVATE TYPES
576
577 /// The type of the elements on the linked lists of items that are
578 /// maintained for the `N` priorities handled by this multipriority
579 /// queue.
581
582 /// The type of the vectors of list head and tail pointers.
584
585 // DATA
586 mutable bslmt::Mutex d_mutex; // used to synchronize access
587 // (including 'const' access)
588
589 bslmt::Condition d_notEmptyCondition;
590 // signaled on each push
591
592 NodePtrVector d_heads; // pointers to heads of linked lists
593 // -- one for each priority
594
595 NodePtrVector d_tails; // pointers to tails of linked lists
596 // -- one for each priority
597
598 int d_notEmptyFlags; // bit mask indicating priorities
599 // for which there is data, where
600 // bit 0 is the lowest order bit,
601 // representing most urgent priority
602
603 bdlma::ConcurrentPool d_pool; // memory pool used for node storage
604
605 bsls::AtomicInt d_length; // total number of items in this
606 // multipriority queue
607
608 bool d_enabledFlag; // enabled/disabled state of pushes
609 // to the multipriority queue (does
610 // not affect pops)
611
612 bslma::Allocator *d_allocator_p; // memory allocator (held)
613
614 private:
615 // NOT IMPLEMENTED
617 MultipriorityQueue& operator=(const MultipriorityQueue&);
618
619 private:
620 // PRIVATE MANIPULATORS
621
622 /// Attempt to remove (immediately) the least-recently added item having
623 /// the most urgent priority (lowest value) from this multipriority
624 /// queue. If the specified `blockFlag` is `true`, this method blocks
625 /// the calling thread until an item becomes available. On success,
626 /// load the value of the popped item into the specified `item`; if the
627 /// specified `itemPriority` is non-null, load the priority of the
628 /// popped item into `itemPriority`; and return 0. Otherwise, leave
629 /// `item` and `itemPriority` unmodified, and return a non-zero value
630 /// indicating that this multipriority queue was empty. The behavior is
631 /// undefined unless `item` is non-null. Note that a non-zero value can
632 /// be returned only if `blockFlag` is `false`.
633 int tryPopFrontImpl(TYPE *item, int *itemPriority, bool blockFlag);
634
635 public:
636 // TRAITS
639
640 // CREATORS
641
642 explicit MultipriorityQueue(bslma::Allocator *basicAllocator = 0);
643 /// Create a multipriority queue. Optionally specify `numPriorities`,
644 /// the number of distinct priorities supported by the multipriority
645 /// queue. If `numPriorities` is not specified, the
646 /// (implementation-imposed maximum) number 32 is used. Optionally
647 /// specify a `basicAllocator` used to supply memory. If
648 /// `basicAllocator` is 0, the currently installed default allocator is
649 /// used. The behavior is undefined unless `1 <= numPriorities <= 32`
650 /// (if specified).
652 bslma::Allocator *basicAllocator = 0);
653
654 /// Destroy this container. The behavior is undefined unless all access
655 /// or modification of the container has completed prior to this call.
657
658 // MANIPULATORS
659
660 /// Remove the least-recently added item having the most urgent priority
661 /// (lowest value) from this multi-priority queue and load its value
662 /// into the specified `item`. If this queue is empty, this method
663 /// blocks the calling thread until an item becomes available. If the
664 /// optionally specified `itemPriority` is non-null, load the priority
665 /// of the popped item into `itemPriority`. The behavior is undefined
666 /// unless `item` is non-null. Note this is unaffected by the enabled /
667 /// disabled state of the queue.
668 void popFront(TYPE *item, int *itemPriority = 0);
669
670 /// Insert the value of the specified `item` with the specified
671 /// `itemPriority` into this multipriority queue before any queued items
672 /// having a less urgent priority (higher value) than `itemPriority`,
673 /// and after any items having the same or more urgent priority (lower
674 /// value) than `itemPriority`. If the multipriority queue is enabled,
675 /// the push succeeds and `0` is returned, otherwise the push fails, the
676 /// queue remains unchanged, and a nonzero value is returned. The
677 /// behavior is undefined unless `0 <= itemPriority < numPriorities()`.
678 int pushBack(const TYPE& item, int itemPriority);
679
680 /// Insert the value of the specified `item` with the specified
681 /// `itemPriority` into this multipriority queue before any queued items
682 /// having a less urgent priority (higher value) than `itemPriority`,
683 /// and after any items having the same or more urgent priority (lower
684 /// value) than `itemPriority`. `item` is left in a valid but
685 /// unspecified state. If the multipriority queue is enabled, the push
686 /// succeeds and `0` is returned, otherwise the push fails, the queue
687 /// remains unchanged, and a nonzero value is returned. The behavior is
688 /// undefined unless `0 <= itemPriority < numPriorities()`.
689 int pushBack(bslmf::MovableRef<TYPE> item, int itemPriority);
690
691 /// Insert the value of the specified `item` with the specified
692 /// `itemPriority` onto the back of this multipriority queue before any
693 /// queued items having a less urgent priority (higher value) than
694 /// `itemPriority`, and after any items having the same or more urgent
695 /// priority (lower value) than `itemPriority`. All of the specified
696 /// `numItems` items are pushed as a single atomic action, unless the
697 /// copy constructor for one of them throws an exception, in which case
698 /// a possibly empty subset of the pushes will have completed and no
699 /// memory will be leaked. `Raw` means that the push will succeed even
700 /// if the multipriority queue is disabled. Note that this method is
701 /// targeted for specific use by the class
702 /// `bdlmt::MultipriorityThreadPool`. The behavior is undefined unless
703 /// `0 <= itemPriority < numPriorities()`.
704 void pushBackMultipleRaw(const TYPE& item, int itemPriority, int numItems);
705
706 /// Insert the value of the specified `item` with the specified
707 /// `itemPriority` into the front of this multipriority queue the
708 /// specified `numItems` times, before any queued items having the same
709 /// or less urgent priority (higher value) than `itemPriority`, and
710 /// after any items having more urgent priority (lower value) than
711 /// `itemPriority`. All `numItems` items are pushed as a single atomic
712 /// action, unless the copy constructor throws while creating one of
713 /// them, in which case a possibly empty subset of the pushes will have
714 /// completed and no memory will be leaked. `Raw` means that the push
715 /// will succeed even if the multipriority queue is disabled. The
716 /// behavior is undefined unless `0 <= itemPriority < numPriorities()`.
717 /// Note that this method is targeted at specific uses by the class
718 /// `bdlmt::MultipriorityThreadPool`.
719 void pushFrontMultipleRaw(const TYPE& item,
720 int itemPriority,
721 int numItems);
722
723 /// Attempt to remove (immediately) the least-recently added item having
724 /// the most urgent priority (lowest value) from this multi-priority
725 /// queue. On success, load the value of the popped item into the
726 /// specified `item`; if the optionally specified `itemPriority` is
727 /// non-null, load the priority of the popped item into `itemPriority`;
728 /// and return 0. Otherwise, leave `item` and `itemPriority`
729 /// unmodified, and return a non-zero value indicating that this queue
730 /// was empty. The behavior is undefined unless `item` is non-null.
731 /// Note this is unaffected by the enabled / disabled state of the
732 /// queue.
733 int tryPopFront(TYPE *item, int *itemPriority = 0);
734
735 /// Remove and destroy all items from this multi-priority queue.
736 void removeAll();
737
738 /// Enable pushes to this multipriority queue. This method has no
739 /// effect unless the queue was disabled.
740 void enable();
741
742 /// Disable pushes to this multipriority queue. This method has no
743 /// effect unless the queue was enabled.
744 void disable();
745
746 // ACCESSORS
747
748 /// Return the number of distinct priorities (indicated at construction)
749 /// that are supported by this multi-priority queue.
750 int numPriorities() const;
751
752 /// Return the total number of items in this multi-priority queue.
753 int length() const;
754
755 /// Return `true` if there are no items in this multi-priority queue,
756 /// and `false` otherwise.
757 bool isEmpty() const;
758
759 /// Return `true` if this multipriority queue is enable and `false`
760 /// otherwise.
761 bool isEnabled() const;
762};
763
764// ============================================================================
765// INLINE DEFINITIONS
766// ============================================================================
767
768 // -----------------------------------------
769 // local class MultipriorityQueue_Node<TYPE>
770 // -----------------------------------------
771
772// CREATORS
773template <class TYPE>
774inline
776 const TYPE& item,
777 bslma::Allocator *basicAllocator)
778: d_item(item, basicAllocator)
779, d_next_p(0)
780{}
781
782template <class TYPE>
783inline
786 bslma::Allocator *basicAllocator)
787: d_item(bslmf::MovableRefUtil::move(item), basicAllocator)
788, d_next_p(0)
789{}
790
791template <class TYPE>
792inline
795
796// MANIPULATORS
797template <class TYPE>
798inline
800{
801 return d_item.object();
802}
803
804template <class TYPE>
805inline
810
811// ACCESSORS
812template <class TYPE>
813inline
816{
817 return d_next_p;
818}
819
820 // ------------------------------
821 // class MultipriorityQueue<TYPE>
822 // ------------------------------
823
824// PRIVATE MANIPULATORS
825template <class TYPE>
827 int *itemPriority,
828 bool blockFlag)
829{
830 enum { e_SUCCESS = 0, e_FAILURE = -1 };
831
832 Node *condemned;
833 int priority;
834
835 BSLS_ASSERT(item);
836
837 {
838 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
839
840 while (0 == d_length) {
841 // Note that if we get a spurious signal, we will check the
842 // 'blockFlag' unnecessarily, but that will typically be a rare
843 // occurrence. This arrangement minimizes the time taken in the
844 // case where '0 != d_length', which will typically be a frequent
845 // occurrence.
846
847 if (blockFlag) {
848 d_notEmptyCondition.wait(&d_mutex);
849 }
850 else {
851 return e_FAILURE; // RETURN
852 }
853 }
854
856 (bsl::uint32_t)d_notEmptyFlags);
857 BSLS_ASSERT(priority < k_MAX_NUM_PRIORITIES);
858 // verifies there is at least one priority bit set. Note that
859 // 'numTrailingUnsetBits' cannot return a negative value.
860
861 Node *& head = d_heads[priority];
862 condemned = head;
863
864 *item = bslmf::MovableRefUtil::move(condemned->item()); // might throw
865
866 head = head->nextPtr();
867 if (0 == head) {
868 // The last item with this priority was just popped.
869
870 BSLS_ASSERT(d_tails[priority] == condemned);
871 d_notEmptyFlags &= ~(1 << priority);
872 }
873
874 --d_length;
875 }
876
877 if (itemPriority) {
878 *itemPriority = priority;
879 }
880
881 condemned->~Node();
882 d_pool.deallocate(condemned);
883
884 return e_SUCCESS;
885}
886
887// CREATORS
888template <class TYPE>
890: d_heads((typename NodePtrVector::size_type)k_DEFAULT_NUM_PRIORITIES, 0,
891 basicAllocator)
892, d_tails((typename NodePtrVector::size_type)k_DEFAULT_NUM_PRIORITIES, 0,
893 basicAllocator)
894, d_notEmptyFlags(0)
895, d_pool(sizeof(Node), bslma::Default::allocator(basicAllocator))
896, d_length(0)
897, d_enabledFlag(true)
898, d_allocator_p(bslma::Default::allocator(basicAllocator))
899{
900}
901
902template <class TYPE>
904 bslma::Allocator *basicAllocator)
905: d_heads((typename NodePtrVector::size_type)numPriorities, 0, basicAllocator)
906, d_tails((typename NodePtrVector::size_type)numPriorities, 0, basicAllocator)
907, d_notEmptyFlags(0)
908, d_pool(sizeof(Node), bslma::Default::allocator(basicAllocator))
909, d_length(0)
910, d_enabledFlag(true)
911, d_allocator_p(bslma::Default::allocator(basicAllocator))
912{
914 BSLS_ASSERT(k_MAX_NUM_PRIORITIES >= numPriorities);
915}
916
917template <class TYPE>
919{
920 removeAll();
921
922 typename NodePtrVector::iterator it;
923 typename NodePtrVector::iterator endIt;
924
925 for (it = d_heads.begin(), endIt = d_heads.end(); endIt != it; ++it) {
926 BSLS_ASSERT(!*it);
927 }
928
929 // Tails do not get set to null by 'removeAll', so are indeterminate.
930
931 BSLS_ASSERT(isEmpty());
932 BSLS_ASSERT(0 == d_notEmptyFlags);
933}
934
935// MANIPULATORS
936template <class TYPE>
937inline
938void MultipriorityQueue<TYPE>::popFront(TYPE *item, int *itemPriority)
939{
940 tryPopFrontImpl(item, itemPriority, true);
941}
942
943template <class TYPE>
944int MultipriorityQueue<TYPE>::pushBack(const TYPE& item, int itemPriority)
945{
946 enum { e_SUCCESS = 0, e_FAILURE = -1 };
947
948 BSLS_ASSERT((unsigned)itemPriority < d_heads.size());
949
950 // Allocate and copy construct. Note we are doing this work outside the
951 // mutex, which is advantageous in that no one is waiting on us, but it has
952 // the disadvantage that we haven't checked whether this multipriority
953 // queue is disabled, in which case we'll throw the new node away.
954
955 // Note the queue being disabled is not the usual case. Note a race
956 // condition occurs if we check d_enabledFlag outside the mutex.
957
958 Node *newNode = (Node *)d_pool.allocate();
960 &d_pool);
961
962 ::new (newNode) Node(item, d_allocator_p); // might throw
963 deallocator.release();
964 bslma::ManagedPtr<Node> deleter(newNode, &d_pool);
965
966 {
967 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
968
969 if (!d_enabledFlag) {
970 return e_FAILURE; // RETURN
971 }
972
973 deleter.release();
974
975 const int mask = 1 << itemPriority;
976 if (d_notEmptyFlags & mask) {
977 d_tails[itemPriority]->nextPtr() = newNode;
978 }
979 else {
980 d_heads[itemPriority] = newNode;
981 d_notEmptyFlags |= mask;
982 }
983 d_tails[itemPriority] = newNode;
984
985 ++d_length;
986 }
987
988 d_notEmptyCondition.signal();
989
990 return e_SUCCESS;
991}
992
993template <class TYPE>
995 int itemPriority)
996{
997 enum { e_SUCCESS = 0, e_FAILURE = -1 };
998
999 BSLS_ASSERT((unsigned)itemPriority < d_heads.size());
1000
1001 // Allocate and copy construct. Note we are doing this work outside the
1002 // mutex, which is advantageous in that no one is waiting on us, but it has
1003 // the disadvantage that we haven't checked whether this multipriority
1004 // queue is disabled, in which case we'll throw the new node away.
1005 //
1006 // Note the queue being disabled is not the usual case. Note a race
1007 // condition occurs if we check d_enabledFlag outside the mutex.
1008
1009 Node *newNode = static_cast<Node *>(d_pool.allocate());
1011 &d_pool);
1012
1013 {
1014 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
1015
1016 // Do the enable check before the move, since if it is a move and not a
1017 // copy, there's no backing out after that.
1018
1019 if (!d_enabledFlag) {
1020 return e_FAILURE; // RETURN
1021 }
1022
1023 ::new (newNode) Node(bslmf::MovableRefUtil::move(item), // might throw
1024 d_allocator_p);
1025 deallocator.release();
1026
1027 const int mask = 1 << itemPriority;
1028 if (d_notEmptyFlags & mask) {
1029 d_tails[itemPriority]->nextPtr() = newNode;
1030 }
1031 else {
1032 d_heads[itemPriority] = newNode;
1033 d_notEmptyFlags |= mask;
1034 }
1035 d_tails[itemPriority] = newNode;
1036
1037 ++d_length;
1038 }
1039
1040 d_notEmptyCondition.signal();
1041
1042 return e_SUCCESS;
1043}
1044
1045template <class TYPE>
1047 int itemPriority,
1048 int numItems)
1049{
1050 BSLS_ASSERT((unsigned)itemPriority < d_heads.size());
1051
1052 const int mask = 1 << itemPriority;
1053
1054 {
1055 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
1056
1057 for (int ii = 0; ii < numItems; ++ii) {
1058 Node *newNode = (Node *)d_pool.allocate();
1060 newNode, &d_pool);
1061
1062 ::new (newNode) Node(item, d_allocator_p); // might throw
1063 deallocator.release();
1064
1065 if (d_notEmptyFlags & mask) {
1066 d_tails[itemPriority]->nextPtr() = newNode;
1067 }
1068 else {
1069 d_heads[itemPriority] = newNode;
1070 d_notEmptyFlags |= mask;
1071 }
1072 d_tails[itemPriority] = newNode;
1073
1074 ++d_length;
1075 }
1076 }
1077
1078 for (int ii = 0; ii < numItems; ++ii) {
1079 d_notEmptyCondition.signal();
1080 }
1081}
1082
1083template <class TYPE>
1085 int itemPriority,
1086 int numItems)
1087{
1088 BSLS_ASSERT((unsigned)itemPriority < d_heads.size());
1089
1090 const int mask = 1 << itemPriority;
1091
1092 {
1093 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
1094
1095 for (int ii = 0; ii < numItems; ++ii) {
1096 Node *newNode = (Node *)d_pool.allocate();
1098 newNode, &d_pool);
1099
1100 ::new (newNode) Node(item, d_allocator_p); // might throw
1101 deallocator.release();
1102
1103 Node *& head = d_heads[itemPriority];
1104 if (!head) {
1105 d_tails[itemPriority] = newNode;
1106 d_notEmptyFlags |= mask;
1107 }
1108 newNode->nextPtr() = head;
1109 head = newNode;
1110
1111 ++d_length;
1112 }
1113 }
1114
1115 for (int ii = 0; ii < numItems; ++ii) {
1116 d_notEmptyCondition.signal();
1117 }
1118}
1119
1120template <class TYPE>
1121inline
1122int MultipriorityQueue<TYPE>::tryPopFront(TYPE *item, int *itemPriority)
1123{
1124 return tryPopFrontImpl(item, itemPriority, false);
1125}
1126
1127template <class TYPE>
1129{
1130 Node *condemnedList = 0;
1131
1132 {
1133 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
1134
1135 while (d_notEmptyFlags) {
1136 const int priority = bdlb::BitUtil::numTrailingUnsetBits(
1137 static_cast<bsl::uint32_t>(d_notEmptyFlags));
1138
1139 Node *& head = d_heads[priority];
1140 BSLS_ASSERT(head);
1141
1142 d_tails[priority]->nextPtr() = condemnedList;
1143 condemnedList = head;
1144
1145 head = 0;
1146
1147 d_notEmptyFlags &= ~(1 << priority);
1148 }
1149
1150 BSLS_ASSERT(0 == d_notEmptyFlags);
1151
1152 d_length = 0;
1153 }
1154
1155 Node *node = condemnedList;
1156 while (node) {
1157 Node *condemnedNode = node;
1158 node = node->nextPtr();
1159
1160 condemnedNode->~Node();
1161 d_pool.deallocate(condemnedNode);
1162 }
1163}
1164
1165template <class TYPE>
1166inline
1168{
1169 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
1170
1171 d_enabledFlag = true;
1172}
1173
1174template <class TYPE>
1175inline
1177{
1178 bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);
1179
1180 d_enabledFlag = false;
1181}
1182
1183// ACCESSORS
1184template <class TYPE>
1185inline
1187{
1188 return static_cast<int>(d_heads.size());
1189}
1190
1191template <class TYPE>
1192inline
1194{
1195 return d_length;
1196}
1197
1198template <class TYPE>
1199inline
1201{
1202 return 0 == d_length;
1203}
1204
1205template <class TYPE>
1206inline
1208{
1209 return d_enabledFlag;
1210}
1211
1212} // close package namespace
1213
1214
1215#endif
1216
1217// ----------------------------------------------------------------------------
1218// Copyright 2015 Bloomberg Finance L.P.
1219//
1220// Licensed under the Apache License, Version 2.0 (the "License");
1221// you may not use this file except in compliance with the License.
1222// You may obtain a copy of the License at
1223//
1224// http://www.apache.org/licenses/LICENSE-2.0
1225//
1226// Unless required by applicable law or agreed to in writing, software
1227// distributed under the License is distributed on an "AS IS" BASIS,
1228// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1229// See the License for the specific language governing permissions and
1230// limitations under the License.
1231// ----------------------------- END-OF-FILE ----------------------------------
1232
1233/** @} */
1234/** @} */
1235/** @} */
Definition bdlcc_multipriorityqueue.h:491
MultipriorityQueue_Node *& nextPtr()
Definition bdlcc_multipriorityqueue.h:806
~MultipriorityQueue_Node()
Definition bdlcc_multipriorityqueue.h:793
BSLMF_NESTED_TRAIT_DECLARATION(MultipriorityQueue_Node, bslma::UsesBslmaAllocator)
TYPE & item()
Return a reference to the non-modifiable item stored in this node.
Definition bdlcc_multipriorityqueue.h:799
Definition bdlcc_multipriorityqueue.h:566
void pushBackMultipleRaw(const TYPE &item, int itemPriority, int numItems)
Definition bdlcc_multipriorityqueue.h:1046
int pushBack(bslmf::MovableRef< TYPE > item, int itemPriority)
Definition bdlcc_multipriorityqueue.h:994
int pushBack(const TYPE &item, int itemPriority)
Definition bdlcc_multipriorityqueue.h:944
void enable()
Definition bdlcc_multipriorityqueue.h:1167
void disable()
Definition bdlcc_multipriorityqueue.h:1176
MultipriorityQueue(int numPriorities, bslma::Allocator *basicAllocator=0)
Definition bdlcc_multipriorityqueue.h:903
void popFront(TYPE *item, int *itemPriority=0)
Definition bdlcc_multipriorityqueue.h:938
void pushFrontMultipleRaw(const TYPE &item, int itemPriority, int numItems)
Definition bdlcc_multipriorityqueue.h:1084
bool isEmpty() const
Definition bdlcc_multipriorityqueue.h:1200
int length() const
Return the total number of items in this multi-priority queue.
Definition bdlcc_multipriorityqueue.h:1193
int tryPopFront(TYPE *item, int *itemPriority=0)
Definition bdlcc_multipriorityqueue.h:1122
void removeAll()
Remove and destroy all items from this multi-priority queue.
Definition bdlcc_multipriorityqueue.h:1128
int numPriorities() const
Definition bdlcc_multipriorityqueue.h:1186
bool isEnabled() const
Definition bdlcc_multipriorityqueue.h:1207
BSLMF_NESTED_TRAIT_DECLARATION(MultipriorityQueue, bslma::UsesBslmaAllocator)
MultipriorityQueue(bslma::Allocator *basicAllocator=0)
Definition bdlcc_multipriorityqueue.h:889
~MultipriorityQueue()
Definition bdlcc_multipriorityqueue.h:918
Definition bdlma_concurrentpool.h:332
Definition bslstl_vector.h:1025
Node * * iterator
Definition bslstl_vector.h:1057
Definition bslalg_constructorproxy.h:368
Definition bslma_allocator.h:457
Definition bslma_deallocatorproctor.h:312
void release()
Definition bslma_deallocatorproctor.h:384
Definition bslma_managedptr.h:1182
ManagedPtr_PairProxy< TARGET_TYPE, ManagedPtrDeleter > release()
Definition bslma_managedptr.h:2454
Definition bslmf_movableref.h:751
Definition bslmt_condition.h:220
Definition bslmt_lockguard.h:234
Definition bslmt_mutex.h:315
Definition bsls_atomic.h:743
#define BSLS_ASSERT(X)
Definition bsls_assert.h:1804
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bdlcc_boundedqueue.h:270
Definition balxml_encoderoptions.h:68
Definition bdlbb_blob.h:576
static int numTrailingUnsetBits(unsigned int value)
Definition bdlb_bitutil.h:462
Definition bslma_usesbslmaallocator.h:343
static MovableRef< t_TYPE > move(t_TYPE &reference) BSLS_KEYWORD_NOEXCEPT
Definition bslmf_movableref.h:1060