BDE 4.14.0 Production release
bdlmt_multiqueuethreadpool.h
1/// @file bdlmt_multiqueuethreadpool.h
2///
3/// The content of this file has been pre-processed for Doxygen.
4///
5
6
7// bdlmt_multiqueuethreadpool.h -*-C++-*-
8
9#ifndef INCLUDED_BDLMT_MULTIQUEUETHREADPOOL
10#define INCLUDED_BDLMT_MULTIQUEUETHREADPOOL
11
12#include <bsls_ident.h>
13BSLS_IDENT("$Id: $")
14
15/// @defgroup bdlmt_multiqueuethreadpool bdlmt_multiqueuethreadpool
16/// @brief Provide a pool of queues, each processed serially by a thread pool.
17/// @addtogroup bdl
18/// @{
19/// @addtogroup bdlmt
20/// @{
21/// @addtogroup bdlmt_multiqueuethreadpool
22/// @{
23///
24/// <h1> Outline </h1>
25/// * <a href="#bdlmt_multiqueuethreadpool-purpose"> Purpose</a>
26/// * <a href="#bdlmt_multiqueuethreadpool-classes"> Classes </a>
27/// * <a href="#bdlmt_multiqueuethreadpool-description"> Description </a>
28/// * <a href="#bdlmt_multiqueuethreadpool-disabled-queues"> Disabled Queues </a>
29/// * <a href="#bdlmt_multiqueuethreadpool-paused-queues"> Paused Queues </a>
30/// * <a href="#bdlmt_multiqueuethreadpool-thread-safety"> Thread Safety </a>
31/// * <a href="#bdlmt_multiqueuethreadpool-job-execution-batch-size"> Job Execution Batch Size </a>
32/// * <a href="#bdlmt_multiqueuethreadpool-thread-names-for-sub-threads"> Thread Names for Sub-Threads </a>
33/// * <a href="#bdlmt_multiqueuethreadpool-usage"> Usage </a>
34/// * <a href="#bdlmt_multiqueuethreadpool-example-1-a-word-search-application"> Example 1: A Word Search Application </a>
35///
36/// # Purpose {#bdlmt_multiqueuethreadpool-purpose}
37/// Provide a pool of queues, each processed serially by a thread pool.
38///
39/// # Classes {#bdlmt_multiqueuethreadpool-classes}
40///
41/// - bdlmt::MultiQueueThreadPool: multi-threaded, serial processing of queues
42///
43/// @see bdlmt_threadpool
44///
45/// # Description {#bdlmt_multiqueuethreadpool-description}
46/// This component defines a dynamic, configurable pool of queues,
47/// each of which is processed by a thread in a thread pool, such that elements
48/// on a given queue are processed serially, regardless of which thread is
49/// processing the queue at a given time.
50///
51/// A `bdlmt::MultiQueueThreadPool` allows clients to create and delete queues,
52/// and to enqueue "jobs" (represented as client-specified functors) to specific
53/// queues. Queue processing is implemented on top of a `bdlmt::ThreadPool` by
54/// enqueuing a per-queue functor to the thread pool. Each functor dequeues the
55/// next item from its associated queue, processes it, and re-enqueues itself to
56/// the thread pool. Since there is at most one representative functor per
57/// queue, each queue is guaranteed to be processed serially by the thread pool.
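///
/// The re-enqueuing scheme described above can be sketched without any
/// threads. The following is a minimal, single-threaded model, not the
/// component's implementation: `ToyPool`, `ToyQueue`, and `runToyExample` are
/// hypothetical names, and the "pool" is assumed to be a plain FIFO of functors
/// so that the interleaving is deterministic. It shows why having at most one
/// pending processing functor per queue keeps each queue's jobs in order.
///
```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <vector>

typedef std::function<void()> Job;

// Hypothetical stand-in for a thread pool: a FIFO of functors drained by a
// single loop (an assumption made to keep the example deterministic).
struct ToyPool {
    std::deque<Job> d_tasks;

    void enqueue(const Job& task) { d_tasks.push_back(task); }

    void runAll()
    {
        while (!d_tasks.empty()) {
            Job task = d_tasks.front();
            d_tasks.pop_front();
            task();  // may enqueue further tasks
        }
    }
};

// Hypothetical serial queue: at most one processing functor for this queue
// is pending in the pool at any time.
struct ToyQueue {
    ToyPool         *d_pool_p;     // pool that runs the processing functor
    std::deque<Job>  d_jobs;       // jobs waiting on this queue
    bool             d_scheduled;  // 'true' if a processing functor is pending

    explicit ToyQueue(ToyPool *pool) : d_pool_p(pool), d_scheduled(false) {}

    void pushBack(const Job& job)
    {
        d_jobs.push_back(job);
        if (!d_scheduled) {
            d_scheduled = true;
            d_pool_p->enqueue([this] { processOne(); });
        }
    }

    // Dequeue and run one job, then re-enqueue this functor if jobs remain.
    void processOne()
    {
        Job job = d_jobs.front();
        d_jobs.pop_front();
        job();
        if (d_jobs.empty()) {
            d_scheduled = false;
        }
        else {
            d_pool_p->enqueue([this] { processOne(); });
        }
    }
};

// Enqueue three jobs on queue 'a' and two on queue 'b', run the pool, and
// return the order in which the jobs executed.
std::vector<std::string> runToyExample()
{
    ToyPool                  pool;
    ToyQueue                 a(&pool), b(&pool);
    std::vector<std::string> log;

    a.pushBack([&log] { log.push_back("a1"); });
    b.pushBack([&log] { log.push_back("b1"); });
    a.pushBack([&log] { log.push_back("a2"); });
    b.pushBack([&log] { log.push_back("b2"); });
    a.pushBack([&log] { log.push_back("a3"); });

    pool.runAll();
    return log;
}
```
///
/// In this model the two queues take turns, while the jobs of each queue
/// always run in the order they were enqueued. With real worker threads the
/// interleaving between queues would vary, but the per-queue order would not.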
58///
59/// In addition to the ability to create, delete, pause, and resume queues,
60/// clients are able to tune the underlying thread pool in accordance with the
61/// `bdlmt::ThreadPool` documentation.
62///
63/// ## Disabled Queues {#bdlmt_multiqueuethreadpool-disabled-queues}
64///
65///
66/// `bdlmt::MultiQueueThreadPool` allows clients to disable and re-enable
67/// queues. A disabled queue will allow no further jobs to be enqueued, but
68/// will continue to process the jobs that were enqueued prior to the call to
69/// `disableQueue`. Note that calling `disableQueue` will block the calling
70/// thread until the currently executing job (if any) on that queue completes.
71///
72/// ## Paused Queues {#bdlmt_multiqueuethreadpool-paused-queues}
73///
74///
75/// `bdlmt::MultiQueueThreadPool` also allows clients to pause and resume
76/// queues. Pausing a queue suspends the processing of jobs from a queue --
77/// i.e., after `pauseQueue` returns, no further jobs will be processed on that queue
78/// until the queue is resumed. Note that calling `pauseQueue` will block the
79/// calling thread until the currently executing job (if any) on that queue
80/// completes.
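///
/// The difference between disabling and pausing can be sketched as two
/// independent flags on a queue. The following is a minimal, single-threaded
/// model under stated assumptions: `ToyQueueState` and its methods are
/// hypothetical names, jobs are represented by integer identifiers, and the
/// blocking behavior of `disableQueue` and `pauseQueue` is deliberately left
/// out.
///
```cpp
#include <cassert>
#include <deque>

// Hypothetical model of one queue: 'disable' affects only enqueuing, and
// 'pause' affects only processing.
class ToyQueueState {
    std::deque<int> d_jobs;            // identifiers of pending jobs
    bool            d_enqueueEnabled;  // cleared by 'disable'
    bool            d_paused;          // set by 'pause'

  public:
    ToyQueueState() : d_enqueueEnabled(true), d_paused(false) {}

    void disable() { d_enqueueEnabled = false; }
    void enable()  { d_enqueueEnabled = true; }
    void pause()   { d_paused = true; }
    void resume()  { d_paused = false; }

    // Append 'jobId' to this queue.  Return 0 on success, and a non-zero
    // value if enqueuing is disabled.
    int enqueue(int jobId)
    {
        if (!d_enqueueEnabled) {
            return -1;
        }
        d_jobs.push_back(jobId);
        return 0;
    }

    // Remove and return the identifier of the next job, or return -1 if
    // this queue is paused or empty.
    int processNext()
    {
        if (d_paused || d_jobs.empty()) {
            return -1;
        }
        int jobId = d_jobs.front();
        d_jobs.pop_front();
        return jobId;
    }

    // Return the number of pending jobs.
    int length() const { return static_cast<int>(d_jobs.size()); }
};
```
///
/// In this model a disabled queue rejects new jobs but still drains the jobs it
/// already holds, whereas a paused queue keeps accepting jobs and holds them
/// until it is resumed.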
81///
82/// ## Thread Safety {#bdlmt_multiqueuethreadpool-thread-safety}
83///
84///
85/// The `bdlmt::MultiQueueThreadPool` class is **fully thread-safe** (i.e., all
86/// public methods of a particular instance may safely execute concurrently).
87/// This class is also **thread-enabled** (i.e., the class does not function
88/// correctly in a non-multi-threading environment). See @ref bsldoc_glossary for
89/// complete definitions of **fully thread-safe** and **thread-enabled**.
90///
91/// ## Job Execution Batch Size {#bdlmt_multiqueuethreadpool-job-execution-batch-size}
92///
93///
94/// `bdlmt::MultiQueueThreadPool` allows clients to configure the maximum size
95/// of a group of jobs that a queue will execute "atomically". "Atomically", in
96/// this context, means that no state changes to the queue will be observed by
97/// that queue during the processing of the collection of jobs (e.g., a call to
98/// `pauseQueue` will only pause the queue after the currently executing group of
99/// jobs completes execution). By default a queue's batch size is 1.
100/// Configuring a larger batch size may improve throughput by reducing the
101/// synchronization overhead needed to execute a job. However, for many
102/// use-cases the overall throughput is limited by the time it takes to process
103/// a job (rather than synchronization overhead), so users are strongly
104/// encouraged to use benchmarks to guide their decision when setting this
105/// option.
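///
/// How a batch is selected can be sketched as follows. This is a minimal
/// illustration, not the component's implementation: `takeBatch` and
/// `batchSizes` are hypothetical helpers, and jobs are represented by integer
/// identifiers. It covers only batch selection (at most `batchSize` jobs, or
/// fewer if fewer are available); in the real component, queue state changes
/// such as pausing would be observed only between batches.
///
```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Remove up to the specified 'batchSize' jobs from the front of the
// specified 'pending' queue and return them in order.  If fewer than
// 'batchSize' jobs are available, return only the available jobs.
std::vector<int> takeBatch(std::deque<int> *pending, int batchSize)
{
    assert(pending);
    assert(1 <= batchSize);

    const std::size_t count =
        std::min<std::size_t>(pending->size(),
                              static_cast<std::size_t>(batchSize));
    const std::ptrdiff_t n = static_cast<std::ptrdiff_t>(count);

    std::vector<int> batch(pending->begin(), pending->begin() + n);
    pending->erase(pending->begin(), pending->begin() + n);
    return batch;
}

// Return the sizes of the successive batches taken from a queue that starts
// with the specified 'numJobs' jobs, using the specified 'batchSize'.
std::vector<int> batchSizes(int numJobs, int batchSize)
{
    std::deque<int> pending;
    for (int i = 0; i < numJobs; ++i) {
        pending.push_back(i);
    }

    std::vector<int> sizes;
    while (!pending.empty()) {
        std::vector<int> batch = takeBatch(&pending, batchSize);
        sizes.push_back(static_cast<int>(batch.size()));
    }
    return sizes;
}
```
///
/// For example, five pending jobs with a batch size of 2 are taken in batches
/// of 2, 2, and 1, so the queue synchronizes three times instead of five.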
106///
107/// ## Thread Names for Sub-Threads {#bdlmt_multiqueuethreadpool-thread-names-for-sub-threads}
108///
109///
110/// To facilitate debugging, users can provide a thread name as the `threadName`
111/// attribute of the `bslmt::ThreadAttributes` argument passed to the
112/// constructor; that name will be used for all the sub-threads. The thread name
113/// should not be used programmatically, but will appear in debugging tools on
114/// platforms that support naming threads to help users identify the source and
115/// purpose of a thread. If no `ThreadAttributes` object is passed, or if the
116/// `threadName` attribute is not set, the default value "bdl.MultiQuePl" will
117/// be used. Note that this only applies to a `bdlmt::ThreadPool` automatically
118/// created by a `bdlmt::MultiQueueThreadPool`. If a thread pool already exists
119/// and is passed to the multi-queue thread pool at construction, the sub-threads
120/// keep whatever name was specified when that thread pool was created.
121///
122/// ## Usage {#bdlmt_multiqueuethreadpool-usage}
123///
124///
125/// This section illustrates intended use of this component.
126///
127/// ### Example 1: A Word Search Application {#bdlmt_multiqueuethreadpool-example-1-a-word-search-application}
128///
129///
130/// This example illustrates the use of a `bdlmt::MultiQueueThreadPool` in a
131/// word search application called `fastSearch`. `fastSearch` searches a list
132/// of files for a list of words, and returns the set of files which contain all
133/// of the specified words. `bdlmt::MultiQueueThreadPool` is used to provide
134/// concurrent processing of files, and to simplify the collection of results by
135/// serializing access to result sets which are maintained for each word.
136///
137/// First, we present a class used to manage a word, and the set of files which
138/// contain that word:
139/// @code
140/// /// This class defines a search profile consisting of a word and a set
141/// /// of files (given by name) that contain the word. Here, "word" is
142/// /// defined as any string of characters.
143/// class my_SearchProfile {
144///
145/// bsl::string d_word; // word to search for
146/// bsl::set<bsl::string> d_fileSet; // set of matching files
147///
148/// private:
149/// // not implemented
150/// my_SearchProfile(const my_SearchProfile&);
151/// my_SearchProfile& operator=(const my_SearchProfile&);
152///
153/// public:
154/// // CREATORS
155///
156/// /// Create a `my_SearchProfile` with the specified `word`.
157/// /// Optionally specify a `basicAllocator` used to supply memory. If
158/// /// `basicAllocator` is 0, the default memory allocator is used.
159/// my_SearchProfile(const char *word,
160/// bslma::Allocator *basicAllocator = 0);
161///
162/// /// Destroy this search profile.
163/// ~my_SearchProfile();
164///
165/// // MANIPULATORS
166///
167/// /// Insert the specified `file` into the file set maintained by this
168/// /// search profile.
169/// void insert(const char *file);
170///
171/// // ACCESSORS
172///
173/// /// Return `true` if the specified `file` matches this search profile.
174/// bool isMatch(const char *file) const;
175///
176/// /// Return a reference to the non-modifiable file set maintained by
177/// /// this search profile.
178/// const bsl::set<bsl::string>& fileSet() const;
179///
180/// /// Return a reference to the non-modifiable word maintained by this
181/// /// search profile.
182/// const bsl::string& word() const;
183/// };
184/// @endcode
185/// And the implementation:
186/// @code
187/// // CREATORS
188/// my_SearchProfile::my_SearchProfile(const char *word,
189/// bslma::Allocator *basicAllocator)
190/// : d_word(basicAllocator)
191/// , d_fileSet(bsl::less<bsl::string>(), basicAllocator)
192/// {
193/// assert(word);
194///
195/// d_word.assign(word);
196/// }
197///
198/// my_SearchProfile::~my_SearchProfile()
199/// {
200/// }
201///
202/// // MANIPULATORS
203/// inline
204/// void my_SearchProfile::insert(const char *file)
205/// {
206/// assert(file);
207///
208/// d_fileSet.insert(file);
209/// }
210///
211/// // ACCESSORS
212/// bool my_SearchProfile::isMatch(const char *file) const
213/// {
214/// assert(file);
215///
216/// bool found = false;
217/// bsl::ifstream ifs(file);
218/// bsl::string line;
219/// while (bsl::getline(ifs, line)) {
220/// if (bsl::string::npos != line.find(d_word)) {
221/// found = true;
222/// break;
223/// }
224/// }
225/// ifs.close();
226/// return found;
227/// }
228///
229/// inline
230/// const bsl::set<bsl::string>& my_SearchProfile::fileSet() const
231/// {
232/// return d_fileSet;
233/// }
234///
235/// inline
236/// const bsl::string& my_SearchProfile::word() const
237/// {
238/// return d_word;
239/// }
240/// @endcode
241/// Next, we define a helper function to perform a search of a word in a
242/// particular file. The function is parameterized by a search profile and a
243/// file name. If the specified file name matches the profile, it is inserted
244/// into the profile's file set.
245/// @code
246/// /// Insert the specified `file` into the file set of the specified search
247/// /// `profile` if `file` matches the `profile`.
248/// void my_SearchCb(my_SearchProfile* profile, const char *file)
249/// {
250///
251/// assert(profile);
252/// assert(file);
253///
254/// if (profile->isMatch(file)) {
255/// profile->insert(file);
256/// }
257/// }
258/// @endcode
259/// Lastly, we present the front end to the search application: `fastSearch`.
260/// `fastSearch` is parameterized by a list of words to search for, a list of
261/// files to search in, and a set which is populated with the search results.
262/// `fastSearch` instantiates a `bdlmt::MultiQueueThreadPool`, and creates a
263/// queue for each word. It then associates each queue with a search profile
264/// based on a word in the word list. Then, it enqueues a job to each queue for
265/// each file in the file list that tries to match the file to each search
266/// profile. Lastly, `fastSearch` collects the results, which is the set
267/// intersection of each file set maintained by the individual search profiles.
268/// @code
269/// /// Return the set of files, specified by `fileList`, containing every
270/// /// word in the specified `wordList`, in the specified `resultSet`.
271/// /// Optionally specify `repetitions`, the number of repetitions to run
272/// /// the search jobs (it is used to increase the load for performance
273/// /// testing). Optionally specify a `basicAllocator` used to supply
274/// /// memory. If `basicAllocator` is 0, the default memory allocator is
275/// /// used.
276/// void fastSearch(const bsl::vector<bsl::string>& wordList,
277/// const bsl::vector<bsl::string>& fileList,
278/// bsl::set<bsl::string>& resultSet,
279/// int repetitions = 1,
280/// bslma::Allocator *basicAllocator = 0)
281/// {
282///
283/// typedef bsl::vector<bsl::string> ListType;
284/// // This type is defined for notational convenience when iterating
285/// // over 'wordList' or 'fileList'.
286///
287/// typedef bsl::pair<int, my_SearchProfile*> RegistryValue;
288/// // This type is defined for notational convenience. The first
289/// // parameter specifies a queue ID. The second parameter specifies
290/// // an associated search profile.
291///
292/// typedef bsl::map<bsl::string, RegistryValue> RegistryType;
293/// // This type is defined for notational convenience. The first
294/// // parameter specifies a word. The second parameter specifies a
295/// // tuple containing a queue ID, and an associated search profile
296/// // containing the specified word.
297///
298/// enum {
299/// // thread pool configuration
300/// k_MIN_THREADS = 4,
301/// k_MAX_THREADS = 20,
302/// k_MAX_IDLE = 100 // use a very short idle time since new jobs
303/// // arrive only at startup
304/// };
305/// bslmt::ThreadAttributes defaultAttrs;
306/// bdlmt::MultiQueueThreadPool pool(defaultAttrs,
307/// k_MIN_THREADS,
308/// k_MAX_THREADS,
309/// k_MAX_IDLE,
310/// basicAllocator);
311/// RegistryType profileRegistry(bsl::less<bsl::string>(), basicAllocator);
312///
313/// // Create a queue and a search profile associated with each word in
314/// // 'wordList'.
315///
316/// for (ListType::const_iterator it = wordList.begin();
317/// it != wordList.end();
318/// ++it) {
319/// bslma::Allocator *allocator =
320/// bslma::Default::allocator(basicAllocator);
321///
322/// const bsl::string& word = *it;
323/// int id = pool.createQueue();
324/// LOOP_ASSERT(word, 0 != id);
325/// my_SearchProfile *profile = new (*allocator)
326/// my_SearchProfile(word.c_str(),
327/// allocator);
328///
329/// bslma::RawDeleterProctor<my_SearchProfile, bslma::Allocator>
330/// deleter(profile, allocator);
331///
332/// profileRegistry[word] = bsl::make_pair(id, profile);
333/// deleter.release();
334/// }
335///
336/// // Start the pool, enabling enqueuing and queue processing.
337/// pool.start();
338///
339/// // Enqueue a job which tries to match each file in 'fileList' with each
340/// // search profile.
341///
342/// for (ListType::const_iterator it = fileList.begin();
343/// it != fileList.end();
344/// ++it) {
345/// for (ListType::const_iterator jt = wordList.begin();
346/// jt != wordList.end();
347/// ++jt) {
348/// const bsl::string& file = *it;
349/// const bsl::string& word = *jt;
350/// RegistryValue& rv = profileRegistry[word];
351/// bdlmt::MultiQueueThreadPool::Job job =
352/// bdlf::BindUtil::bind(&my_SearchCb, rv.second, file.c_str());
353/// for (int i = 0; i < repetitions; ++i) {
354/// int rc = pool.enqueueJob(rv.first, job);
355/// LOOP_ASSERT(word, 0 == rc);
356/// }
357/// }
358/// }
359///
360/// // Stop the pool, and wait while enqueued jobs are processed.
361/// pool.stop();
362///
363/// // Construct the 'resultSet' as the intersection of file sets collected
364/// // in each search profile.
365///
366/// resultSet.insert(fileList.begin(), fileList.end());
367/// for (RegistryType::iterator it = profileRegistry.begin();
368/// it != profileRegistry.end();
369/// ++it) {
370/// my_SearchProfile *profile = it->second.second;
371/// const bsl::set<bsl::string>& fileSet = profile->fileSet();
372/// bsl::set<bsl::string> tmpSet;
373/// bsl::set_intersection(fileSet.begin(),
374/// fileSet.end(),
375/// resultSet.begin(),
376/// resultSet.end(),
377/// bsl::inserter(tmpSet, tmpSet.begin()));
378/// resultSet = tmpSet;
379/// bslma::Default::allocator(basicAllocator)->deleteObjectRaw(
380/// profile);
381/// }
382/// }
383/// @endcode
384/// @}
385/** @} */
386/** @} */
387
388/** @addtogroup bdl
389 * @{
390 */
391/** @addtogroup bdlmt
392 * @{
393 */
394/** @addtogroup bdlmt_multiqueuethreadpool
395 * @{
396 */
397
398#include <bdlscm_version.h>
399
400#include <bslmt_lockguard.h>
401#include <bdlmt_threadpool.h>
402
403#include <bdlcc_objectpool.h>
404
405#include <bslma_allocator.h>
407
409
410#include <bslmt_condition.h>
411#include <bslmt_mutex.h>
412#include <bslmt_mutexassert.h>
414#include <bslmt_readlockguard.h>
415#include <bslmt_writelockguard.h>
416
417#include <bsls_assert.h>
418#include <bsls_atomic.h>
419
420#include <bsl_deque.h>
421#include <bsl_functional.h>
422#include <bsl_map.h>
423
424#include <bslmt_latch.h>
425
426
427namespace bdlmt {
428
429class MultiQueueThreadPool;
430
431 // ================================
432 // class MultiQueueThreadPool_Queue
433 // ================================
434
435/// This private class provides a thread-safe, lightweight job queue.
436///
437/// See @ref bdlmt_multiqueuethreadpool
438class MultiQueueThreadPool_Queue {
439
440 public:
441 // PUBLIC TYPES
442 typedef bsl::function<void()> Job;
443
444 private:
445 // PRIVATE TYPES
446 enum EnqueueState {
447 // enqueue states
448 e_ENQUEUING_ENABLED, // enqueuing is enabled
449 e_ENQUEUING_DISABLED, // enqueuing is disabled
450 e_DELETING // deleting
451 };
452
453 enum RunState {
454 e_NOT_SCHEDULED, // running but not scheduled
455 e_SCHEDULED, // running and scheduled
456 e_PAUSING, // pause requested but not completed yet
457 e_PAUSED // paused
458 };
459
460 // DATA
461 MultiQueueThreadPool *d_multiQueueThreadPool_p;
462 // the `MultiQueueThreadPool`
463 // that owns this object
464
465 bsl::deque<Job> d_list; // queue of jobs to be
466 // executed
467
468 EnqueueState d_enqueueState; // maintains enqueue state
469
470 RunState d_runState; // maintains run state
471
472 int d_batchSize; // execution batch size
473
474 mutable bslmt::Mutex d_lock; // protect queue and
475 // informational members
476
477 bslmt::Condition d_pauseCondition; // used to notify threads
478 // awaiting pause state
479
480 int d_pauseCount; // number of threads waiting
481 // for the pause to complete
482
483 Job d_processingCb; // bound processing callback
484 // for pool
485
486 bslmt::ThreadUtil::Handle d_processor; // current worker thread, or
487 // ThreadUtil::invalidHandle()
488
489 // NOT IMPLEMENTED
490 MultiQueueThreadPool_Queue();
491 MultiQueueThreadPool_Queue(const MultiQueueThreadPool_Queue&);
492 MultiQueueThreadPool_Queue &operator=(const MultiQueueThreadPool_Queue &);
493
494 // PRIVATE MANIPULATORS
495
496 /// Mark this queue as paused, notify any threads blocked on
497 /// `d_pauseCondition`, and schedule the deletion job if this queue is
498 /// to be deleted. The behavior is undefined unless this queue's lock
499 /// is in a locked state and `e_PAUSING == d_runState`.
500 void setPaused();
501
502 public:
503 // TRAITS
506
507 // CREATORS
508
509 /// Create a `MultiQueueThreadPool_Queue` with an initial capacity of 0
510 /// and initialized to use the specified `multiQueueThreadPool` to track
511 /// aggregate values (e.g., the number of active queues) and to obtain
512 /// the thread pool used to execute jobs that are appended to this
513 /// queue. Optionally specify a `basicAllocator` used to supply memory.
514 /// If `basicAllocator` is 0, the default memory allocator is used.
515 explicit
516 MultiQueueThreadPool_Queue(MultiQueueThreadPool *multiQueueThreadPool,
517 bslma::Allocator *basicAllocator = 0);
518
519 /// Destroy this queue.
520 ~MultiQueueThreadPool_Queue();
521
522 // MANIPULATORS
523
524 /// Enable enqueuing to this queue. Return 0 on success, and a non-zero
525 /// value otherwise. This method will fail (with an error) if
526 /// `prepareForDeletion` has already been called on this object.
527 int enable();
528
529 /// Disable enqueuing to this queue. Return 0 on success, and a non-zero
530 /// value otherwise. This method will fail (with an error) if
531 /// `prepareForDeletion` has already been called on this object.
532 int disable();
533
534 /// Block until all threads waiting for this queue to pause are released.
536
537 /// Execute the `Job` at the front of this queue, dequeue the `Job`, and
538 /// if the queue is not paused schedule a callback from the associated
539 /// thread pool. The behavior is undefined if this queue is empty.
541
542 /// Permanently disable enqueuing to this queue, and enqueue a job
543 /// that will delete this queue. Optionally specify `cleanupFunctor`,
544 /// which, if supplied, will be invoked immediately prior to this
545 /// queue's deletion. Optionally specify `completionSignal`, on which
546 /// (if the calling thread is not processing a job - or batch of jobs -
547 /// for this queue) to invoke `arrive` when the queue is deleted.
548 /// Return `true` if the current thread is the thread processing a job
549 /// (or batch of jobs), and `false` otherwise. Note that if
550 /// `completionSignal` is supplied, a return status of `false` typically
551 /// indicates that `completionSignal->wait()` should be invoked from the
552 /// calling function, while a return status of `true` indicates this is
553 /// an attempt to delete the queue from within a job being processed on
554 /// the queue (so waiting on the queue's deletion would result in a
555 /// dead-lock).
556 bool enqueueDeletion(const Job& cleanupFunctor = Job(),
557 bslmt::Latch *completionSignal = 0);
558
559 /// Initiate the pausing of this queue, prevent jobs from being executed
560 /// on this queue (excluding the currently-executing job - or batch of
561 /// jobs - if there is one), and prevent the queue from being deleted.
562 /// Return 0 on success, and a non-zero value if the queue is already
563 /// paused or is being paused or deleted by another thread. The
564 /// behavior is undefined unless, after a successful invocation of
565 /// `initiatePause`, `waitWhilePausing` is invoked (to complete the
566 /// pause operation and allow the queue to, potentially, be deleted).
567 int initiatePause();
568
569 /// Enqueue the specified `functor` at the end of this queue. Return 0
570 /// on success, and a non-zero value if enqueuing is disabled.
571 int pushBack(const Job& functor);
572
573 /// Add the specified `functor` at the front of this queue. Return 0 on
574 /// success, and a non-zero value if enqueuing is disabled.
575 int pushFront(const Job& functor);
576
577 /// Reset this queue to its initial state. The behavior is undefined
578 /// unless this queue's lock is in an unlocked state. After this method
579 /// returns, the object is ready for use as though it were a new object.
580 /// Note that this method is not thread-safe and is used by the object
581 /// pool contained within `*d_multiQueueThreadPool_p`.
582 void reset();
583
584 /// Allow jobs on the queue to begin executing. Return 0 on success,
585 /// and a non-zero value if the queue is not paused or `!d_list.empty()`
586 /// and the associated thread pool fails to enqueue a job.
587 int resume();
588
589 /// Configure this queue to process jobs in groups of the specified
590 /// `batchSize` (see {`Job Execution Batch Size`}). When a thread is
591 /// selecting jobs for processing, if fewer than `batchSize` jobs are
592 /// available then only the available jobs will be processed in the
593 /// current batch. The behavior is undefined unless `1 <= batchSize`.
594 /// Note that the initial value for the execution batch size is 1 for
595 /// all queues.
597
598 /// Wait until any currently-executing job on the queue completes and
599 /// the queue is paused. Note that pausing differs from `disable` in
600 /// that (1) `pause` stops processing for a queue, and (2) does *not*
601 /// prevent additional jobs from being enqueued. The behavior of this
602 /// method is undefined unless it is invoked after a successful
603 /// `initiatePause` invocation.
604 void waitWhilePausing();
605
606 // ACCESSORS
607
608 /// Return an instantaneous snapshot of the execution batch size (see
609 /// {`Job Execution Batch Size`}). When a thread is selecting jobs for
610 /// processing, if fewer than `batchSize` jobs are available then only
611 /// the available jobs will be processed in the current batch.
612 int batchSize() const;
613
614 /// Report whether all jobs in this queue are finished.
615 bool isDrained() const;
616
617 /// Report whether enqueuing to this object is enabled. This object is
618 /// constructed with enqueuing enabled.
619 bool isEnabled() const;
620
621 /// Report whether this object is paused.
622 bool isPaused() const;
623
624 /// Return an instantaneous snapshot of the length of this queue.
625 int length() const;
626};
627
628 // ==========================
629 // class MultiQueueThreadPool
630 // ==========================
631
632/// This class implements a dynamic, configurable pool of queues, each of
633/// which is processed serially by a thread pool.
634///
635/// See @ref bdlmt_multiqueuethreadpool
636class MultiQueueThreadPool {
637
638 // FRIENDS
640
641 // PRIVATE TYPES
642 enum State {
643 // Internal running states.
644 e_STATE_RUNNING,
645 e_STATE_STOPPING,
646 e_STATE_STOPPED
647 };
648
649 public:
650 // PUBLIC TYPES
651 typedef bsl::function<void()> Job;
654
655 private:
656 // PRIVATE CLASS DATA
657 static const char s_defaultThreadName[16]; // Thread name to use
658 // when none is
659 // specified.
660
661 // PRIVATE DATA
662 bslma::Allocator *d_allocator_p; // memory allocator (held)
663
664 ThreadPool *d_threadPool_p; // threads for queue processing
665
666 bool d_threadPoolIsOwned; // `true` if thread pool is owned
667
672 > d_queuePool; // pool of queues
673
674 QueueRegistry d_queueRegistry; // registry of queues
675
676 int d_nextId; // next id to provide from
677 // `createQueue`
678
679 State d_state; // maintains internal state
680
682 d_lock; // locked for write when deleting
683 // queues or changing pool state
684
685 bsls::AtomicInt d_numActiveQueues; // number of non-empty queues
686
687 bsls::AtomicInt d_numExecuted; // the total number of requests
688 // processed by this pool since the
689 // last time this value was reset
690
691 bsls::AtomicInt d_numEnqueued; // the total number of requests
692 // enqueued into this pool since
693 // the last time this value was
694 // reset
695
696 bsls::AtomicInt d_numDeleted; // the total number of requests
697 // deleted from this pool since the
698 // last time this value was reset
699 private:
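700 // NOT IMPLEMENTED
701 MultiQueueThreadPool(const MultiQueueThreadPool&);
702 MultiQueueThreadPool& operator=(const MultiQueueThreadPool&);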
703
704 // PRIVATE MANIPULATORS
705
706 /// Delete the specified `queue`. If the specified `cleanup` is valid,
707 /// invoke `cleanup`; if the specified `completionSignal` is not 0, call
708 /// `completionSignal->arrive`. `completionSignal` may be 0. Note that
709 /// this callback provides a mechanism for proper lifetime management of
710 /// the `queue` by scheduling the deletion with the associated thread
711 /// pool since the `MultiQueueThreadPool` does not know *when* to delete
712 /// the queue and a `MultiQueueThreadPool_Queue` cannot delete itself at
713 /// the appropriate time.
714 void deleteQueueCb(MultiQueueThreadPool_Queue *queue,
715 const CleanupFunctor& cleanup,
716 bslmt::Latch *completionSignal);
717
718 /// Load into the specified `*queue` a pointer to the queue referenced
719 /// by the specified `id` if this `MultiQueueThreadPool` is in a state
720 /// where the `queue` can be used. Return 0 on success, and a non-zero
721 /// value if the `id` is not contained in `d_queueRegistry`, this
722 /// `MultiQueueThreadPool` is not in the running state, or
723 /// `0 == d_threadPool_p->enabled()`. The behavior is undefined unless
724 /// the invoking thread has a lock, read or write, on `d_lock`.
725 int findIfUsable(int id, MultiQueueThreadPool_Queue **queue);
726
727 public:
728 // TRAITS
731
732 // CREATORS
733
734 /// Construct a `MultiQueueThreadPool` with the specified
735 /// `threadAttributes`, the specified `minThreads` minimum number of
736 /// threads, the specified `maxThreads` maximum number of threads, and
737 /// the specified `maxIdleTime` idle time (in milliseconds) after which
738 /// a thread may be considered for destruction. Optionally specify a
739 /// `basicAllocator` used to supply memory. If `basicAllocator` is 0,
740 /// the currently installed default allocator is used. The behavior is
741 /// undefined unless `0 <= minThreads`, `minThreads <= maxThreads`, and
742 /// `0 <= maxIdleTime`. Note that the `MultiQueueThreadPool` is created
743 /// without any queues. Although queues may be created, `start` must be
744 /// called before enqueuing jobs.
745 MultiQueueThreadPool(const bslmt::ThreadAttributes& threadAttributes,
746 int minThreads,
747 int maxThreads,
748 int maxIdleTime,
749 bslma::Allocator *basicAllocator = 0);
750
751 /// Construct a `MultiQueueThreadPool` with the specified `threadPool`.
752 /// Optionally specify a `basicAllocator` used to supply memory. If
753 /// `basicAllocator` is 0, the default memory allocator is used. The
754 /// behavior is undefined if `threadPool` is 0. Note that the
755 /// `MultiQueueThreadPool` is created without any queues. Although
756 /// queues may be created, `start` must be called before enqueuing jobs.
757 explicit
758 MultiQueueThreadPool(ThreadPool *threadPool,
759 bslma::Allocator *basicAllocator = 0);
760
761 /// Destroy this multi-queue thread pool. Disable queuing on all
762 /// queues, and wait until all queues are empty. Then, delete all
763 /// queues, and shut down the thread pool if the thread pool is owned by
764 /// this object. This method will block if any thread is executing
765 /// `start` or `stop` at the time of the call.
766 ~MultiQueueThreadPool();
767
768 // MANIPULATORS
769
770 /// Add the specified `functor` at the front of the queue specified by
771 /// `id`. Return 0 if added successfully, and a non-zero value if
772 /// queuing is disabled. The behavior is undefined unless `functor` is
773 /// bound. Note that the position of `functor` relative to any
774 /// currently queued jobs is unspecified unless the queue is currently
775 /// paused.
776 int addJobAtFront(int id, const Job& functor);
777
778 /// Create a queue with unlimited capacity and a default number of
779 /// initial elements. Return a non-zero queue ID. The queue ID can be
780 /// used to enqueue jobs to the queue, or to control or delete the
781 /// queue.
782 int createQueue();
783
784 /// Disable enqueuing to the queue associated with the specified `id`,
785 /// and enqueue the specified `cleanupFunctor` to the *front* of the
786 /// queue. The `cleanupFunctor` is guaranteed to be the last queue
787 /// element processed, after which the queue is destroyed. This
788 /// function does not wait for the `cleanupFunctor` to be executed
789 /// (instead the caller is notified asynchronously through the execution
790 /// of the supplied `cleanupFunctor`). Return 0 on success, and a
791 /// non-zero value otherwise. Note that this function will fail if this
792 /// pool is stopped.
793 int deleteQueue(int id, const CleanupFunctor& cleanupFunctor);
794
795 /// Disable enqueuing to the queue associated with the specified `id`,
796 /// and when the currently executing job (or batch of jobs) of that
797 /// queue, if any, is complete, then destroy the queue. Return 0 on
798 /// success, and a non-zero value otherwise. This function will fail if
799 /// the pool is stopped. Any other (non-executing) jobs on the queue
800 /// are deleted asynchronously. The calling thread blocks until
801 /// completion of the currently executing job (or batch of jobs), except
802 /// when `deleteQueue` is called from a job in the queue being deleted.
803 /// In that latter case, no block takes place, the queue is deleted (no
804 /// longer observable from the `MultiQueueThreadPool`), and the job
805 /// completes.
806 int deleteQueue(int id);
807
808 /// Disable enqueuing to the queue associated with the specified `id`.
809 /// Return 0 on success, and a non-zero value otherwise. Note that this
810 /// method differs from `pauseQueue` in that (1) `disableQueue` does
811 /// *not* stop processing for a queue, and (2) prevents additional jobs
812 /// from being enqueued.
813 int disableQueue(int id);
814
815 /// Wait until all queues are empty. This method waits until all
816 /// non-paused queues are empty without disabling the queues (and may
817 /// thus wait indefinitely). The queues and/or the thread pool may be
818 /// either enabled or disabled when this method is called. This method
819 /// may be called on a stopped or started thread pool. Note that
820 /// `drain` does not attempt to delete queues directly. However, as a
821 /// side-effect of emptying all queues, any queue for which
822 /// `deleteQueue` was called previously will be deleted before `drain`
823 /// returns. Note also that this method waits by repeatedly yielding.
824 void drain();
825
826 /// Wait until all jobs in the queue indicated by the specified `id` are
827 /// finished. This method simply waits until that queue is empty,
828 /// without disabling the queue; it may thus wait indefinitely if more
829 /// jobs are being added. The queue may be enabled or disabled when
830 /// this method is called. Return 0 on success, and a non-zero value if
831 /// the specified queue does not exist or is deleted while this method
832 /// is waiting. Note that this method waits by repeatedly yielding.
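    ///
    /// The following is an illustrative sketch only, not part of this
    /// method's contract. It assumes a started pool named `pool` and a
    /// non-paused queue id `id` previously obtained from `createQueue`
    /// (both names are hypothetical), and shows one way to bound the wait
    /// by disabling the queue before draining it:
    /// ```
    /// int rc = pool.disableQueue(id);  // reject further `enqueueJob` calls
    /// assert(0 == rc);
    ///
    /// rc = pool.drainQueue(id);        // wait for the already-queued jobs
    /// assert(0 == rc);
    /// ```
    /// Disabling first means no new jobs can arrive while the caller
    /// waits, so the wait ends once the existing jobs have run.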
    int drainQueue(int id);

    /// Enqueue the specified `functor` to the queue specified by `id`.
    /// Return 0 if enqueued successfully, and a non-zero value if queuing
    /// is disabled. The behavior is undefined unless `functor` is bound.
    int enqueueJob(int id, const Job& functor);

    /// Enable enqueuing to the queue associated with the specified `id`.
    /// Return 0 on success, and a non-zero value otherwise. It is an error
    /// to call `enableQueue` if a previous call to `stop` is being
    /// executed.
    int enableQueue(int id);

    /// Load into the specified `numExecuted` and `numEnqueued` the number
    /// of items dequeued / enqueued (respectively) since the last time
    /// these values were reset, and reset these values. Optionally specify
    /// a `numDeleted` into which to load the number of items deleted since
    /// the last time this value was reset. Reset the count of deleted
    /// items whether or not `numDeleted` is supplied.
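    ///
    /// A minimal sketch, assuming a pool named `pool` (a hypothetical
    /// name), of sampling the counters once per reporting interval:
    /// ```
    /// int numExecuted = 0;
    /// int numEnqueued = 0;
    /// int numDeleted  = 0;
    /// pool.numProcessedReset(&numExecuted, &numEnqueued, &numDeleted);
    /// // Each value now covers only the interval since the previous reset.
    /// ```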
    void numProcessedReset(int *numExecuted,
                           int *numEnqueued,
                           int *numDeleted = 0);

    /// Wait until any currently-executing job (or batch of jobs) on the
    /// queue with the specified `id` completes, then prevent any more jobs
    /// from being executed on that queue. Return 0 on success, and a
    /// non-zero value if the queue is already paused or is being paused or
    /// deleted by another thread. Note that this method may be invoked
    /// from a job executing on the given queue, in which case this method
    /// does not wait. Note also that this method differs from
    /// `disableQueue` in that (1) `pauseQueue` stops processing for a
    /// queue, and (2) does *not* prevent additional jobs from being
    /// enqueued.
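    ///
    /// As an illustrative sketch only, assuming a started pool named
    /// `pool`, an enabled queue id `id`, and a job functor `job` (all
    /// hypothetical names), pausing defers execution without rejecting
    /// new jobs:
    /// ```
    /// int rc = pool.pauseQueue(id);   // waits for any running batch
    /// assert(0 == rc);
    ///
    /// rc = pool.enqueueJob(id, job);  // accepted, but not executed yet
    /// assert(0 == rc);
    /// assert(pool.isPaused(id));
    ///
    /// rc = pool.resumeQueue(id);      // `job` may now be executed
    /// assert(0 == rc);
    /// ```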
    int pauseQueue(int id);

    /// Allow jobs on the queue with the specified `id` to begin executing.
    /// Return 0 on success, and a non-zero value if the queue does not
    /// exist or is not paused.
    int resumeQueue(int id);

    /// Configure the queue specified by `id` to process jobs in groups of
    /// the specified `batchSize` (see {`Job Execution Batch Size`}). When
    /// a thread is selecting jobs for processing, if fewer than `batchSize`
    /// jobs are available then only the available jobs will be processed in
    /// the current batch. Return 0 on success, and a non-zero value
    /// otherwise. The behavior is undefined unless `1 <= batchSize`. Note
    /// that the initial value for the execution batch size is 1 for all
    /// queues.
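    ///
    /// A minimal sketch, assuming a started pool named `pool`, a valid
    /// queue id `id` (both hypothetical names), and no concurrent calls to
    /// `setBatchSize` for that queue:
    /// ```
    /// int rc = pool.setBatchSize(id, 8);  // up to 8 jobs per batch
    /// assert(0 == rc);
    /// assert(8 == pool.batchSize(id));
    /// ```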
    int setBatchSize(int id, int batchSize);

    /// Disable queuing on all queues, and wait until all non-paused queues
    /// are empty. Then, delete all queues, and shut down the thread pool
    /// if the thread pool is owned by this object.
    void shutdown();

    /// Enable queuing on all queues, start the thread pool if the thread
    /// pool is owned by this object, and ensure that at least the minimum
    /// number of processing threads are started. Return 0 on success, and
    /// a non-zero value otherwise. This method will block if any thread is
    /// executing `stop` or `shutdown` at the time of the call. This method
    /// has no effect if this thread pool has already been started. Note
    /// that any paused queues remain paused.
    int start();

    /// Disable queuing on all queues and wait until all non-paused queues
    /// are empty. Then, stop the thread pool if the thread pool is owned
    /// by this object. Note that `stop` does not attempt to delete queues
    /// directly. However, as a side-effect of emptying all queues, any
    /// queue for which `deleteQueue` was called previously will be deleted
    /// before `stop` unblocks.
    void stop();

    // ACCESSORS

    /// Return an instantaneous snapshot of the execution batch size (see
    /// @ref bdlmt_multiqueuethreadpool-job-execution-batch-size) of the
    /// queue associated with the specified `id`, or -1 if `id` is not a
    /// valid queue id. When a thread is selecting jobs for processing, if
    /// fewer than `batchSize` jobs are available then only the available
    /// jobs will be processed in the current batch.
    int batchSize(int id) const;

    /// Return `true` if the queue associated with the specified `id` is
    /// currently paused, or `false` otherwise (including if `id` is not a
    /// valid queue id).
    bool isPaused(int id) const;

    /// Return `true` if the queue associated with the specified `id` is
    /// currently enabled, or `false` otherwise (including if `id` is not a
    /// valid queue id).
    bool isEnabled(int id) const;

    /// Return an instantaneous snapshot of the number of queues managed by
    /// this object.
    int numQueues() const;

    /// Return an instantaneous snapshot of the total number of elements
    /// enqueued.
    int numElements() const;

    /// Return an instantaneous snapshot of the number of elements enqueued in
    /// the queue associated with the specified `id` as a non-negative integer,
    /// or -1 if `id` does not specify a valid queue.
    int numElements(int id) const;

    /// Load into the specified `numExecuted` and `numEnqueued` the number of
    /// items dequeued / enqueued (respectively) since the last time these
    /// values were reset. Optionally specify a `numDeleted` into which to
    /// load the number of items deleted since the last time this value was
    /// reset.
    void numProcessed(int *numExecuted,
                      int *numEnqueued,
                      int *numDeleted = 0) const;

    /// Return a reference to the non-modifiable thread pool owned by this
    /// object.
    const ThreadPool& threadPool() const;
};

// ============================================================================
//                             INLINE DEFINITIONS
// ============================================================================

                      // --------------------------------
                      // class MultiQueueThreadPool_Queue
                      // --------------------------------

// ACCESSORS
inline
int MultiQueueThreadPool_Queue::batchSize() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return d_batchSize;
}

inline
bool MultiQueueThreadPool_Queue::isDrained() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return 0 == d_list.size() && (   e_NOT_SCHEDULED == d_runState
                                  || e_PAUSED        == d_runState);
}

inline
bool MultiQueueThreadPool_Queue::isEnabled() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return e_ENQUEUING_ENABLED == d_enqueueState;
}

inline
bool MultiQueueThreadPool_Queue::isPaused() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return e_PAUSED == d_runState;
}

inline
int MultiQueueThreadPool_Queue::length() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return static_cast<int>(d_list.size());
}

                         // --------------------------
                         // class MultiQueueThreadPool
                         // --------------------------

// PRIVATE MANIPULATORS
inline
int MultiQueueThreadPool::findIfUsable(int                          id,
                                       MultiQueueThreadPool_Queue **queue)
{
    if (   e_STATE_RUNNING != d_state
        || 0 == d_threadPool_p->enabled()) {
        return 1;                                                     // RETURN
    }

    QueueRegistry::iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() == iter) {
        return 1;                                                     // RETURN
    }

    *queue = iter->second;

    return 0;
}

// MANIPULATORS
inline
int MultiQueueThreadPool::addJobAtFront(int id, const Job& functor)
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    MultiQueueThreadPool_Queue *queue;

    if (findIfUsable(id, &queue)) {
        return 1;                                                     // RETURN
    }

    if (0 == queue->pushFront(functor)) {
        ++d_numEnqueued;
        return 0;                                                     // RETURN
    }

    return 1;
}

inline
int MultiQueueThreadPool::enqueueJob(int id, const Job& functor)
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    MultiQueueThreadPool_Queue *queue;

    if (findIfUsable(id, &queue)) {
        return 1;                                                     // RETURN
    }

    if (0 == queue->pushBack(functor)) {
        ++d_numEnqueued;
        return 0;                                                     // RETURN
    }

    return 1;
}

inline
void MultiQueueThreadPool::numProcessedReset(int *numExecuted,
                                             int *numEnqueued,
                                             int *numDeleted)
{
    bslmt::WriteLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    // To maintain consistency, all three must be zeroed atomically.

    *numExecuted = d_numExecuted.swap(0);
    if (numDeleted) {
        *numDeleted = d_numDeleted.swap(0);
    }
    else {
        d_numDeleted = 0;
    }
    *numEnqueued = d_numEnqueued.swap(0);
}

inline
int MultiQueueThreadPool::setBatchSize(int id, int batchSize)
{
    BSLS_ASSERT_SAFE(1 <= batchSize);

    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    MultiQueueThreadPool_Queue *queue;

    if (findIfUsable(id, &queue)) {
        return 1;                                                     // RETURN
    }

    queue->setBatchSize(batchSize);

    return 0;
}

// ACCESSORS
inline
int MultiQueueThreadPool::batchSize(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->batchSize();                             // RETURN
    }

    return -1;
}

inline
bool MultiQueueThreadPool::isEnabled(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->isEnabled();                             // RETURN
    }

    return false;
}

inline
bool MultiQueueThreadPool::isPaused(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->isPaused();                              // RETURN
    }

    return false;
}

inline
int MultiQueueThreadPool::numElements() const
{
    // Access 'd_numEnqueued' last to ensure the result is non-negative.

    return -(d_numExecuted + d_numDeleted) + d_numEnqueued;
}

inline
int MultiQueueThreadPool::numElements(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->length();                                // RETURN
    }

    return -1;
}

inline
void MultiQueueThreadPool::numProcessed(int *numExecuted,
                                        int *numEnqueued,
                                        int *numDeleted) const
{
    // Access 'd_numEnqueued' last to ensure
    // 'numEnqueued >= numExecuted + numDeleted'.

    *numExecuted = d_numExecuted;
    if (numDeleted) {
        *numDeleted = d_numDeleted;
    }
    *numEnqueued = d_numEnqueued;
}

inline
int MultiQueueThreadPool::numQueues() const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    return static_cast<int>(d_queueRegistry.size());
}

inline
const ThreadPool& MultiQueueThreadPool::threadPool() const
{
    return *d_threadPool_p;
}
}  // close package namespace


#endif

// ----------------------------------------------------------------------------
// Copyright 2020 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------

/** @} */
/** @} */
/** @} */