BDE 4.14.0 Production release
Loading...
Searching...
No Matches
bdlmt_fixedthreadpool.h
Go to the documentation of this file.
1/// @file bdlmt_fixedthreadpool.h
2///
3/// The content of this file has been pre-processed for Doxygen.
4///
5
6
7// bdlmt_fixedthreadpool.h -*-C++-*-
8#ifndef INCLUDED_BDLMT_FIXEDTHREADPOOL
9#define INCLUDED_BDLMT_FIXEDTHREADPOOL
10
11#include <bsls_ident.h>
12BSLS_IDENT("$Id: $")
13
14/// @defgroup bdlmt_fixedthreadpool bdlmt_fixedthreadpool
15/// @brief Provide portable implementation for a fixed-size pool of threads.
16/// @addtogroup bdl
17/// @{
18/// @addtogroup bdlmt
19/// @{
20/// @addtogroup bdlmt_fixedthreadpool
21/// @{
22///
23/// <h1> Outline </h1>
24/// * <a href="#bdlmt_fixedthreadpool-purpose"> Purpose</a>
25/// * <a href="#bdlmt_fixedthreadpool-classes"> Classes </a>
26/// * <a href="#bdlmt_fixedthreadpool-description"> Description </a>
27/// * <a href="#bdlmt_fixedthreadpool-thread-safety"> Thread Safety </a>
28/// * <a href="#bdlmt_fixedthreadpool-synchronous-signals-on-unix"> Synchronous Signals on Unix </a>
29/// * <a href="#bdlmt_fixedthreadpool-thread-names-for-sub-threads"> Thread Names for Sub-Threads </a>
30/// * <a href="#bdlmt_fixedthreadpool-usage"> Usage </a>
31/// * <a href="#bdlmt_fixedthreadpool-setting-fixedthreadpool-attributes"> Setting FixedThreadPool Attributes </a>
32/// * <a href="#bdlmt_fixedthreadpool-the-void-functionvoid-pointer-interface"> The "void function/void pointer" Interface </a>
33/// * <a href="#bdlmt_fixedthreadpool-the-functor-interface"> The Functor Interface </a>
34///
35/// # Purpose {#bdlmt_fixedthreadpool-purpose}
36/// Provide portable implementation for a fixed-size pool of threads.
37///
38/// # Classes {#bdlmt_fixedthreadpool-classes}
39///
40/// - bdlmt::FixedThreadPool: portable fixed-size thread pool
41///
42/// @see bdlmt_threadpool
43///
44/// # Description {#bdlmt_fixedthreadpool-description}
45/// This component defines a portable and efficient implementation
46/// of a thread pool, `bdlmt::FixedThreadPool`, that can be used to distribute
47/// various user-defined functions ("jobs") to separate threads to execute the
48/// jobs concurrently. Each thread pool object manages a fixed number of
49/// processing threads and can hold up to a fixed maximum number of pending
50/// jobs.
51///
52/// `bdlmt::FixedThreadPool` implements a queuing mechanism that distributes
53/// work among the threads. Jobs are queued for execution as they arrive, and
54/// each queued job is processed by the next available thread. If each of the
55/// concurrent threads is busy processing a job, new jobs will remain enqueued
56/// until a thread becomes available. If the queue capacity is reached,
57/// enqueuing jobs will block until threads consume more jobs from the queue,
58/// causing its length to drop below its capacity. Both the queue's capacity
59/// and number of threads are specified at construction and cannot be changed.
60///
61/// The thread pool provides two interfaces for specifying jobs: the commonly
62/// used "void function/void pointer" interface and the more versatile functor
63/// based interface. The void function/void pointer interface allows callers to
64/// use a C-style function to be executed as a job. The application need only
65/// specify the address of the function, and a single void pointer argument, to
66/// be passed to the function. The specified function will be invoked with the
67/// specified argument by the processing thread. The functor based interface
68/// allows for more flexible job execution such as the invocation of member
69/// functions or the passing of multiple user-defined arguments. See the `bdef`
70/// package-level documentation for more on functors and their usage.
71///
72/// Unlike a `bdlmt::ThreadPool`, an application cannot tune a
73/// `bdlmt::FixedThreadPool` once it is created with a specified number of
74/// threads and queue capacity, hence the name "fixed" thread pool. An
75/// application can, however, specify the attributes of the threads in the pool
76/// (e.g., thread priority or stack size), by providing a
77/// `bslmt::ThreadAttributes` object with the desired values set. See
78/// @ref bslmt_threadutil package documentation for a description of
79/// `bslmt::ThreadAttributes`.
80///
81/// Thread pools are ideal for developing multi-threaded server applications. A
82/// server need only package client requests to execute as jobs, and
83/// `bdlmt::FixedThreadPool` will handle the queue management, thread
84/// management, and request dispatching. Thread pools are also well suited for
85/// parallelizing certain types of application logic. Without any complex or
86/// redundant thread management code, an application can easily create a thread
87/// pool, enqueue a series of jobs to be executed, and wait until all the jobs
88/// have executed.
89///
90/// ## Thread Safety {#bdlmt_fixedthreadpool-thread-safety}
91///
92///
93/// The `bdlmt::FixedThreadPool` class is both *fully thread-safe* (i.e., all
94/// non-creator methods can correctly execute concurrently), and is
95/// *thread-enabled* (i.e., the class does not function correctly in a
96/// non-multi-threading environment). See @ref bsldoc_glossary for complete
97/// definitions of *fully thread-safe* and *thread-enabled*.
98///
99/// ## Synchronous Signals on Unix {#bdlmt_fixedthreadpool-synchronous-signals-on-unix}
100///
101///
102/// A thread pool ensures that, on unix platforms, all the threads in the pool
103/// block all asynchronous signals. Specifically, all signals except the
104/// following synchronous signals are blocked:
105/// @code
106/// SIGBUS
107/// SIGFPE
108/// SIGILL
109/// SIGSEGV
110/// SIGSYS
111/// SIGABRT
112/// SIGTRAP
113/// SIGIOT
114/// @endcode
115///
116/// ## Thread Names for Sub-Threads {#bdlmt_fixedthreadpool-thread-names-for-sub-threads}
117///
118///
119/// To facilitate debugging, users can provide a thread name as the `threadName`
120/// attribute of the `bslmt::ThreadAttributes` argument passed to the
121/// constructor, that will be used for all the sub-threads. The thread name
122/// should not be used programmatically, but will appear in debugging tools on
123/// platforms that support naming threads to help users identify the source and
124/// purpose of a thread. If no `ThreadAttributes` object is passed, or if the
125/// `threadName` attribute is not set, the default value "bdl.FixedPool" will be
126/// used.
127///
128/// ## Usage {#bdlmt_fixedthreadpool-usage}
129///
130///
131/// This example demonstrates the use of a `bdlmt::FixedThreadPool` to
132/// parallelize a segment of program logic. The example implements a
133/// multi-threaded file search utility. The utility searches multiple files for
134/// a string, similar to the Unix command `fgrep`; the use of a
135/// `bdlmt::FixedThreadPool` allows the utility to search multiple files
136/// concurrently.
137///
138/// The example program will take as input a string and a list of files to
139/// search. The program creates a `bdlmt::FixedThreadPool`, and then enqueues a
140/// single "job" for each file to be searched. Each thread in the pool will
141/// take a job from the queue, open the file, and search for the string. If a
142/// match is found, the job adds the filename to an array of matching filenames.
143/// Because this array of filenames is shared across multiple jobs and across
144/// multiple threads, access to the array is controlled via a `bslmt::Mutex`.
145///
146/// ### Setting FixedThreadPool Attributes {#bdlmt_fixedthreadpool-setting-fixedthreadpool-attributes}
147///
148///
149/// To get started, we declare thread attributes, to be used in constructing the
150/// thread pool. In this example, our choices for number of threads and queue
151/// capacity are arbitrary.
152/// @code
153/// #define SEARCH_THREADS 10
154/// #define SEARCH_QUEUE_CAPACITY 50
155/// @endcode
156/// Below is the structure that will be used to pass arguments to the file
157/// search function. Since each job will be searching a separate file, a
158/// distinct instance of the structure will be used for each job.
159/// @code
160/// struct my_FastSearchJobInfo {
161/// const bsl::string *d_word; // word to search for
162/// const bsl::string *d_path; // path of the file to search
163/// bslmt::Mutex *d_mutex; // mutex to control access to the
164/// // result file list
165/// bsl::vector<bsl::string> *d_outList; // list of matching files
166/// };
167/// @endcode
168///
169/// ### The "void function/void pointer" Interface {#bdlmt_fixedthreadpool-the-void-functionvoid-pointer-interface}
170///
171///
172/// `myFastSearchJob` is the search function to be executed as a job by threads
173/// in the thread pool, matching the "void function/void pointer" interface.
174/// The single `void *` argument is received and cast to point to a
175/// `struct my_FastSearchJobInfo`, which then points to the search string and a
176/// single file to be searched. Note that different `my_FastSearchJobInfo`
177/// structures for the same search request will differ only in the attribute
178/// `d_path`, which points to a specific filename among the set of files to be
179/// searched; other fields will be identical across all structures for a given
180/// Fast Search.
181///
182/// See the following section for an illustration of the functor interface.
183/// @code
184/// static void myFastSearchJob(void *arg)
185/// {
186/// my_FastSearchJobInfo *job = (my_FastSearchJobInfo*)arg;
187/// FILE *file;
188///
189/// file = fopen(job->d_path->c_str(), "r");
190///
191/// if (file) {
192/// char buffer[1024];
193/// size_t nread;
194/// int wordLen = job->d_word->length();
195/// const char *word = job->d_word->c_str();
196///
197/// nread = fread(buffer, 1, sizeof(buffer) - 1, file);
198/// while (nread >= wordLen) {
199/// buffer[nread] = 0;
200/// if (strstr(buffer, word)) {
201/// @endcode
202/// If we find a match, we add the file to the result list and return. Since
203/// the result list is shared among multiple processing threads, we use a mutex
204/// lock to regulate access to the list. We use a `bslmt::LockGuard` to manage
205/// access to the mutex lock. This template object acquires a mutex lock on
206/// `job->d_mutex` at construction, and releases that lock on destruction. Thus,
207/// the mutex will be locked within the scope of the `if` block, and released
208/// when the program exits that scope.
209///
210/// See @ref bslmt_threadutil for information about the `bslmt::Mutex` class, and
211/// component @ref bslmt_lockguard for information about the `bslmt::LockGuard`
212/// template class.
213/// @code
214/// bslmt::LockGuard<bslmt::Mutex> lock(job->d_mutex);
215/// job->d_outList->push_back(*job->d_path);
216/// break; // bslmt::LockGuard destructor unlocks mutex.
217/// }
218/// memcpy(buffer, &buffer[nread - wordLen - 1], wordLen - 1);
219/// nread = fread(buffer + wordLen - 1, 1, sizeof(buffer) - wordLen,
220/// file);
221/// }
222/// fclose(file);
223/// }
224/// }
225/// @endcode
226/// Routine `myFastSearch` is the main driving routine, taking three arguments:
227/// a single string to search for (`word`), a list of files to search, and an
228/// output list of files. When the function completes, the file list will
229/// contain the names of files where a match was found.
230/// @code
231/// void myFastSearch(const bsl::string& word,
232/// const bsl::vector<bsl::string>& fileList,
233/// bsl::vector<bsl::string>& outFileList)
234/// {
235/// bslmt::Mutex mutex;
236/// bslmt::ThreadAttributes defaultAttributes;
237/// @endcode
238/// We initialize the thread pool using default thread attributes. We then
239/// start the pool so that the threads can begin while we prepare the jobs.
240/// @code
241/// bdlmt::FixedThreadPool pool(defaultAttributes,
242/// SEARCH_THREADS,
243/// SEARCH_QUEUE_CAPACITY);
244///
245/// if (0 != pool.start()) {
246/// bsl::cerr << "Thread start() failed. Thread quota exceeded?"
247/// << bsl::endl;
248/// exit(1);
249/// }
250/// @endcode
251/// For each file to be searched, we create the job info structure that will be
252/// passed to the search function and add the job to the pool.
253///
254/// As noted above, all jobs will share a single mutex to guard the output file
255/// list. Function `myFastSearchJob` uses a `bslmt::LockGuard` on this mutex to
256/// serialize access to the list.
257/// @code
258/// int count = fileList.size();
259/// my_FastSearchJobInfo *jobInfoArray = new my_FastSearchJobInfo[count];
260///
261/// for (int i = 0; i < count; ++i) {
262/// my_FastSearchJobInfo &job = jobInfoArray[i];
263/// job.d_word = &word;
264/// job.d_path = &fileList[i];
265/// job.d_mutex = &mutex;
266/// job.d_outList = &outFileList;
267/// pool.enqueueJob(myFastSearchJob, &job);
268/// }
269/// @endcode
270/// Now we simply wait for all the jobs in the queue to complete. Any matched
271/// files should have been added to the output file list.
272/// @code
273/// pool.drain();
274/// delete[] jobInfoArray;
275/// }
276/// @endcode
277///
278/// ### The Functor Interface {#bdlmt_fixedthreadpool-the-functor-interface}
279///
280///
281/// The "void function/void pointer" convention is idiomatic for C programs.
282/// The `void` pointer argument provides a generic way of passing in user data,
283/// without regard to the data type. Clients who prefer better or more explicit
284/// type safety may wish to use the Functor Interface instead. This interface
285/// uses `bsl::function` to provide type-safe wrappers that can match argument
286/// number and type for a C++ free function or member function.
287///
288/// To illustrate the Functor Interface, we will make two small changes to the
289/// usage example above. First, we change the signature of the function that
290/// executes a single job, so that it uses a `my_FastSearchJobInfo` pointer
291/// rather than a `void` pointer. With this change, we can remove the first
292/// executable statement, which casts the `void *` pointer to
293/// `my_FastSearchJobInfo *`.
294/// @code
295/// static void myFastFunctorSearchJob(my_FastSearchJobInfo *job)
296/// {
297/// FILE *file;
298///
299/// file = fopen(job->d_path->c_str(), "r");
300/// // the rest of the function is unchanged.
301/// @endcode
302/// Next, we make a change to the loop that enqueues the jobs in `myFastSearch`.
303/// We create a functor - a C++ object that acts as a function. The thread pool
304/// will "execute" this functor (by calling its `operator()` member function) on
305/// a thread when one becomes available.
306/// @code
307/// for (int i = 0; i < count; ++i) {
308/// my_FastSearchJobInfo &job = jobInfoArray[i];
309/// job.d_word = &word;
310/// job.d_path = &fileList[i];
311/// job.d_mutex = &mutex;
312/// job.d_outList = &outFileList;
313///
314/// bsl::function<void()> jobHandle =
315/// bdlf::BindUtil::bind(&myFastFunctorSearchJob, &job);
316/// pool.enqueueJob(jobHandle);
317/// }
318/// @endcode
319/// Use of `bsl::function` and `bdlf::BindUtil` is described in the `bdef`
320/// package documentation. For this example, it is important to note that
321/// `jobHandle` is a functor object, and that `bdlf::BindUtil::bind` populates
322/// that functor object with a function pointer (to the `void` function
323/// `myFastFunctorSearchJob`) and user data (`&job`). When the functor is
324/// executed via `operator()`, it will in turn execute the
325/// `myFastFunctorSearchJob` function with the supplied data as its argument.
326///
327/// Note also that the functor is created locally and handed to the thread pool.
328/// The thread pool copies the functor onto its internal queue, and takes
329/// responsibility for the copied functor until execution is complete.
330///
331/// The function is completed exactly as it was in the previous example.
332/// @code
333/// pool.drain();
334/// delete[] jobInfoArray;
335/// }
336/// @endcode
337/// @}
338/** @} */
339/** @} */
340
341/** @addtogroup bdl
342 * @{
343 */
344/** @addtogroup bdlmt
345 * @{
346 */
347/** @addtogroup bdlmt_fixedthreadpool
348 * @{
349 */
350
351#include <bdlscm_version.h>
352
353#include <bdlcc_boundedqueue.h>
354
355#include <bdlf_bind.h>
356
357#include <bdlm_metricsregistry.h>
358
359#include <bslma_allocator.h>
360
361#include <bslmf_movableref.h>
362
363#include <bslmt_barrier.h>
364#include <bslmt_lockguard.h>
365#include <bslmt_mutex.h>
367#include <bslmt_threadutil.h>
368#include <bslmt_threadgroup.h>
369
370#include <bsls_assert.h>
371#include <bsls_atomic.h>
372#include <bsls_platform.h>
373
374#include <bsl_cstdlib.h>
375#include <bsl_functional.h>
376#include <bsl_string.h>
377
378#ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
379
380#include <bdlcc_fixedqueue.h>
381
382#include <bslmt_condition.h>
383#include <bslmt_semaphore.h>
384
385#include <bsl_algorithm.h>
386
387#endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
388
389
390
391#ifndef BDE_OMIT_INTERNAL_DEPRECATED
392
393extern "C" typedef void (*bcep_FixedThreadPoolJobFunc)(void *);
394 // This type declares the prototype for functions that are suitable to
395 // be specified to 'bdlmt::FixedThreadPool::enqueueJob'.
396
397#endif // BDE_OMIT_INTERNAL_DEPRECATED
398
399namespace bdlmt {
400
401/// This type declares the prototype for functions that are suitable to be
402/// specified to `bdlmt::FixedThreadPool::enqueueJob`.
403extern "C" typedef void (*FixedThreadPoolJobFunc)(void *);
404
405 // =====================
406 // class FixedThreadPool
407 // =====================
408
409/// This class implements a thread pool used for concurrently executing
410/// multiple user-defined functions ("jobs").
411///
412/// See @ref bdlmt_fixedthreadpool
414
415 public:
416 // TYPES
417 typedef bsl::function<void()> Job;
419
420 // PUBLIC CONSTANTS
421 enum {
426 };
427
428 enum {
429 e_STOP
433#ifndef BDE_OMIT_INTERNAL_DEPRECATED
438#endif // BDE_OMIT_INTERNAL_DEPRECATED
439 };
440
441 private:
442 // PRIVATE CLASS DATA
443 static const char s_defaultThreadName[16]; // Thread name to use
444 // when none is
445 // specified.
446
447 // PRIVATE DATA
448 Queue d_queue; // underlying queue
449
450 bsls::AtomicInt d_numActiveThreads; // number of threads
451 // processing jobs
452
453 bsls::AtomicBool d_drainFlag; // set when draining
454
455 bslmt::Barrier d_barrier; // barrier to sync threads
456 // during `start` and `drain`
457
458 bslmt::Mutex d_metaMutex; // mutex to ensure that there
459 // is only one controlling
460 // thread at any time
461
462 bslmt::ThreadGroup d_threadGroup; // threads used by this pool
463
464 bslmt::ThreadAttributes d_threadAttributes; // thread attributes to be
465 // used when constructing
466 // processing threads
467
468 const int d_numThreads; // number of configured
469 // processing threads.
470
471#if defined(BSLS_PLATFORM_OS_UNIX)
472 sigset_t d_blockSet; // set of signals to be
473 // blocked in managed threads
474#endif
475
477 d_backlogHandle; // backlog metric handle
478
479
481 d_usedCapacityHandle; // used capacity metric
482 // handle
483
484 // PRIVATE MANIPULATORS
485
486 /// Initialize this thread pool using the stored attributes and the
487 /// specified `metricsRegistry` and `threadPoolName`. If
488 /// `metricsRegistry` is 0, `bdlm::MetricsRegistry::singleton()` is
489 /// used.
490 void initialize(bdlm::MetricsRegistry *metricsRegistry,
491 const bsl::string_view& threadPoolName);
492
493 /// The main function executed by each worker thread.
494 void workerThread();
495
496 /// Internal method to spawn a new processing thread and increment the
497 /// current count. Note that this method must be called with
498 /// `d_metaMutex` locked.
499 int startNewThread();
500
501 // NOT IMPLEMENTED
503 FixedThreadPool& operator=(const FixedThreadPool&);
504
505 public:
506 // CREATORS
507
508 /// Construct a thread pool with the specified `numThreads` number of
509 /// threads and a job queue of capacity sufficient to enqueue the
510 /// specified `maxNumPendingJobs` without blocking. Optionally specify
511 /// a `basicAllocator` used to supply memory. If `basicAllocator` is 0,
512 /// the currently installed default allocator is used. The name used for
513 /// created threads is "bdl.FixedPool". The behavior is undefined unless
514 /// `1 <= numThreads`.
516 int maxNumPendingJobs,
517 bslma::Allocator *basicAllocator = 0);
518
519 /// Construct a thread pool with the specified `numThreads` number of
520 /// threads, a job queue of capacity sufficient to enqueue the specified
521 /// `maxNumPendingJobs` without blocking, the specified
522 /// `threadPoolName` to be used to identify this thread pool, and the
523 /// specified `metricsRegistry` to be used for reporting metrics. If
524 /// `metricsRegistry` is 0, `bdlm::MetricsRegistry::singleton()` is
525 /// used. Optionally specify a `basicAllocator` used to supply memory.
526 /// If `basicAllocator` is 0, the currently installed default allocator
527 /// is used. The name used for created threads is `threadPoolName` if not
528 /// empty, otherwise "bdl.FixedPool". The behavior is undefined unless
529 /// `1 <= numThreads`.
531 int maxNumPendingJobs,
532 const bsl::string_view& threadPoolName,
533 bdlm::MetricsRegistry *metricsRegistry,
534 bslma::Allocator *basicAllocator = 0);
535
536 /// Construct a thread pool with the specified `threadAttributes`,
537 /// `numThreads` number of threads, and a job queue with capacity
538 /// sufficient to enqueue the specified `maxNumPendingJobs` without
539 /// blocking. Optionally specify a `basicAllocator` used to supply
540 /// memory. If `basicAllocator` is 0, the currently installed default
541 /// allocator is used. The name used for created threads is
542 /// `threadAttributes.threadName()` if not empty, otherwise
543 /// "bdl.FixedPool". The behavior is undefined unless `1 <= numThreads`.
545 int numThreads,
546 int maxNumPendingJobs,
547 bslma::Allocator *basicAllocator = 0);
548
549 /// Construct a thread pool with the specified `threadAttributes`,
550 /// `numThreads` number of threads, a job queue with capacity sufficient
551 /// to enqueue the specified `maxNumPendingJobs` without blocking, the
552 /// specified `threadPoolName` to be used to identify this thread
553 /// pool, and the specified `metricsRegistry` to be used for reporting
554 /// metrics. If `metricsRegistry` is 0,
555 /// `bdlm::MetricsRegistry::singleton()` is used. Optionally specify a
556 /// `basicAllocator` used to supply memory. If `basicAllocator` is 0,
557 /// the currently installed default allocator is used. The name used for
558 /// created threads is `threadAttributes.threadName()` if not empty,
559 /// otherwise `threadPoolName` if not empty, otherwise "bdl.FixedPool".
560 /// The behavior is undefined unless `1 <= numThreads`.
562 int numThreads,
563 int maxNumPendingJobs,
564 const bsl::string_view& threadPoolName,
565 bdlm::MetricsRegistry *metricsRegistry,
566 bslma::Allocator *basicAllocator = 0);
567
568 /// Remove all pending jobs from the queue without executing them, block
569 /// until all currently running jobs complete, and then destroy this
570 /// thread pool.
572
573 // MANIPULATORS
574
575 /// Disable enqueueing into this pool. All subsequent invocations of
576 /// `enqueueJob` or `tryEnqueueJob` will fail immediately. All blocked
577 /// invocations of `enqueueJob` will fail immediately. If the pool is
578 /// already enqueue disabled, this method has no effect. Note that this
579 /// method has no effect on jobs currently in the pool.
580 void disable();
581
582 /// Enable queuing into this pool. If the queue is not enqueue
583 /// disabled, this call has no effect.
584 void enable();
585
586 int enqueueJob(const Job& functor);
587 /// Enqueue the specified `functor` to be executed by the next available
588 /// thread. Return 0 on success, and a non-zero value otherwise.
589 /// Specifically, return `e_SUCCESS` on success, `e_DISABLED` if
590 /// `!isEnabled()`, and `e_FAILED` if an error occurs. This operation
591 /// will block if there is not sufficient capacity in the underlying
592 /// queue until there is free capacity to successfully enqueue this job.
593 /// Threads blocked (on enqueue methods) due to the underlying queue
594 /// being full will unblock and return `e_DISABLED` if `disable` is
595 /// invoked (on another thread). The behavior is undefined unless
596 /// `functor` is not null.
598
599 /// Enqueue the specified `function` to be executed by the next
600 /// available thread. The specified `userData` pointer will be passed
601 /// to the function by the processing thread. Return 0 on success, and
602 /// a non-zero value otherwise. Specifically, return `e_SUCCESS` on
603 /// success, `e_DISABLED` if `!isEnabled()`, and `e_FAILED` if an error
604 /// occurs. This operation will block if there is not sufficient
605 /// capacity in the underlying queue until there is free capacity to
606 /// successfully enqueue this job. Threads blocked (on enqueue methods)
607 /// due to the underlying queue being full will unblock and return
608 /// `e_DISABLED` if `disable` is invoked (on another thread). The
609 /// behavior is undefined unless `function` is not null.
610 int enqueueJob(FixedThreadPoolJobFunc function, void *userData);
611
612 int tryEnqueueJob(const Job& functor);
613 /// Enqueue the specified `functor` to be executed by the next available
614 /// thread. Return 0 on success, and a non-zero value otherwise.
615 /// Specifically, return `e_SUCCESS` on success, `e_DISABLED` if
616 /// `!isEnabled()`, `e_FULL` if `isEnabled()` and the underlying queue
617 /// was full, and `e_FAILED` if an error occurs. The behavior is
618 /// undefined unless `functor` is not null.
620
621 /// Enqueue the specified `function` to be executed by the next
622 /// available thread. The specified `userData` pointer will be passed
623 /// to the function by the processing thread. Return 0 on success, and
624 /// a non-zero value otherwise. Specifically, return `e_SUCCESS` on
625 /// success, `e_DISABLED` if `!isEnabled()`, `e_FULL` if `isEnabled()`
626 /// and the underlying queue was full, and `e_FAILED` if an error
627 /// occurs. The behavior is undefined unless `function` is not null.
628 int tryEnqueueJob(FixedThreadPoolJobFunc function, void *userData);
629
630 /// Wait until the underlying queue is empty without disabling this pool
631 /// (and may thus wait indefinitely), and then wait until all executing
632 /// jobs complete. If the thread pool was not already started
633 /// (`isStarted()` is `false`), this method has no effect. Note that if
634 /// any jobs are submitted concurrently with this method, this method
635 /// may or may not wait until they have also completed.
636 void drain();
637
638 /// Disable enqueuing jobs on this thread pool, cancel all pending jobs,
639 /// wait until all active jobs complete, and join all processing
640 /// threads. If the thread pool was not already started (`isStarted()`
641 /// is `false`), this method has no effect. At the completion of this
642 /// method, `false == isStarted()`.
643 void shutdown();
644
645 /// Spawn threads until there are `numThreads()` processing threads. On
646 /// success, enable enqueuing and return 0. Otherwise, join all threads
647 /// (ensuring `false == isStarted()`) and return -1. If the thread pool
648 /// was already started (`isStarted()` is `true`), this method has no
649 /// effect.
650 int start();
651
652 /// Disable enqueuing jobs on this thread pool, wait until all active
653 /// and pending jobs complete, and join all processing threads. If the
654 /// thread pool was not already started (`isStarted()` is `false`), this
655 /// method has no effect. At the completion of this method,
656 /// `false == isStarted()`.
657 void stop();
658
659 // ACCESSORS
660
661 /// Return `true` if enqueuing jobs is enabled on this thread pool, and
662 /// `false` otherwise.
663 bool isEnabled() const;
664
665 /// Return `true` if `numThreads()` threads are started on this thread
666 /// pool, and `false` otherwise (indicating that 0 threads are started
667 /// on this thread pool).
668 bool isStarted() const;
669
670 /// Return a snapshot of the number of threads that are currently
671 /// processing a job for this threadpool.
672 int numActiveThreads() const;
673
674 /// Return a snapshot of the number of jobs currently enqueued to be
675 /// processed by this thread pool.
676 int numPendingJobs() const;
677
678 /// Return the number of threads passed to this thread pool at
679 /// construction.
680 int numThreads() const;
681
682 /// Return a snapshot of the number of threads currently started by this
683 /// thread pool.
684 int numThreadsStarted() const;
685
686 /// Return the capacity of the queue used to enqueue jobs by this thread
687 /// pool.
688 int queueCapacity() const;
689};
690
691// ============================================================================
692// INLINE DEFINITIONS
693// ============================================================================
694
695 // ---------------------
696 // class FixedThreadPool
697 // ---------------------
698
699// MANIPULATORS
// `FixedThreadPool::disable` (the definition index places `disable()` at
// line 701; that signature line is missing from this listing).  Disables the
// push side of the underlying job queue; the pop side is not touched here.
700inline
702{
703 d_queue.disablePushBack();
704}
705
// NOTE(review): presumably `FixedThreadPool::enable` -- the signature line
// (707) is missing from this listing; inferred from the declaration order
// and the body.  Re-enables the push side of the underlying job queue.
706inline
708{
709 d_queue.enablePushBack();
710}
711
// NOTE(review): presumably the `const Job&` overload of
// `FixedThreadPool::enqueueJob` -- the signature line (713) is missing from
// this listing.  Asserts that `functor` tests true, then returns the status
// produced by the queue's `pushBack`.
712inline
714{
715 BSLS_ASSERT(functor);
716
717 return d_queue.pushBack(functor);
718}
719
720inline
727
// NOTE(review): presumably `FixedThreadPool::enqueueJob(function, userData)`
// -- the first signature line (729) is missing from this listing.  Asserts
// that `function` is non-null, binds it together with `userData` into a
// nullary callable, and forwards to the functor-taking `enqueueJob`.
728inline
730 void *userData)
731{
732 BSLS_ASSERT(0 != function);
733
734 return enqueueJob(bdlf::BindUtil::bindR<void>(function, userData));
735}
736
// `FixedThreadPool::tryEnqueueJob(const Job&)` (the definition index places
// it at line 738; that signature line is missing from this listing).
// Asserts that `functor` tests true, then returns the status produced by the
// queue's `tryPushBack`.
737inline
739{
740 BSLS_ASSERT(functor);
741
742 return d_queue.tryPushBack(functor);
743}
744
745inline
752
// NOTE(review): presumably
// `FixedThreadPool::tryEnqueueJob(function, userData)` -- the first
// signature line (754) is missing from this listing.  Asserts that
// `function` is non-null, binds it together with `userData` into a nullary
// callable, and forwards to the functor-taking `tryEnqueueJob`.
753inline
755 void *userData)
756{
757 BSLS_ASSERT(0 != function);
758
759 return tryEnqueueJob(bdlf::BindUtil::bindR<void>(function, userData));
760}
761
// NOTE(review): presumably `FixedThreadPool::drain` -- the signature line
// (763) is missing from this listing; inferred from the declaration order
// and the use of `d_drainFlag`.  Holds `d_metaMutex` for the whole call and
// does nothing unless `isStarted()`.  The statement order below matters; do
// not reorder.
762inline
764{
765 bslmt::LockGuard<bslmt::Mutex> lock(&d_metaMutex);
766
767 if (isStarted()) {
// Wait for the pending-job queue to become empty.
768 d_queue.waitUntilEmpty();
769
// Raise the drain flag, disable popping, and wait on the barrier.
// NOTE(review): the worker-thread side of this rendezvous is not
// visible in this listing -- confirm against `workerThread`.
770 d_drainFlag = true;
771 d_queue.disablePopFront();
772 d_barrier.wait();
773
// Clear the flag, re-enable popping, and wait on the barrier a second
// time before returning.
774 d_drainFlag = false;
775 d_queue.enablePopFront();
776 d_barrier.wait();
777 }
778}
779
// `FixedThreadPool::shutdown` (the definition index places `shutdown()` at
// line 781; that signature line is missing from this listing).  Holds
// `d_metaMutex` for the whole call and does nothing unless `isStarted()`.
780inline
782{
783 bslmt::LockGuard<bslmt::Mutex> lock(&d_metaMutex);
784
785 if (isStarted()) {
// Disable both ends of the queue, join every thread in the group, and
// only then discard whatever is still queued.
786 d_queue.disablePushBack();
787 d_queue.disablePopFront();
788 d_threadGroup.joinAll();
789 d_queue.removeAll();
790 }
791}
792
// NOTE(review): presumably `FixedThreadPool::stop` -- the signature line
// (794) is missing from this listing; inferred from the declaration order
// and the body.  Holds `d_metaMutex` for the whole call and does nothing
// unless `isStarted()`.
793inline
795{
796 bslmt::LockGuard<bslmt::Mutex> lock(&d_metaMutex);
797
798 if (isStarted()) {
// Unlike the body that calls `removeAll`, this one waits for the queue
// to empty before disabling the pop side and joining the threads.
799 d_queue.disablePushBack();
800 d_queue.waitUntilEmpty();
801 d_queue.disablePopFront();
802 d_threadGroup.joinAll();
803 }
804}
805
806// ACCESSORS
// NOTE(review): presumably `FixedThreadPool::isEnabled` -- the signature
// line (808) is missing from this listing.  Returns `true` exactly when the
// queue's push side is not disabled.
807inline
809{
810 return !d_queue.isPushBackDisabled();
811}
812
// NOTE(review): presumably `FixedThreadPool::isStarted` -- the signature
// line (814) is missing from this listing.  Returns `true` exactly when the
// thread group's thread count equals the configured `d_numThreads`.
813inline
815{
816 return d_numThreads == d_threadGroup.numThreads();
817}
818
// `FixedThreadPool::numActiveThreads` (the definition index places it at
// line 820; that signature line is missing from this listing).  Returns the
// value of the atomic `d_numActiveThreads` counter, read with `loadAcquire`.
819inline
821{
822 return d_numActiveThreads.loadAcquire();
823}
824
// NOTE(review): presumably `FixedThreadPool::numPendingJobs` -- the
// signature line (826) is missing from this listing.  Returns the queue's
// current element count, narrowed to `int`.
825inline
827{
828 return static_cast<int>(d_queue.numElements());
829}
830
// NOTE(review): presumably `FixedThreadPool::numThreads` -- the signature
// line (832) is missing from this listing.  Returns the constant
// `d_numThreads` member.
831inline
833{
834 return d_numThreads;
835}
836
// NOTE(review): presumably `FixedThreadPool::numThreadsStarted` -- the
// signature line (838) is missing from this listing.  Returns the thread
// group's current thread count.
837inline
839{
840 return d_threadGroup.numThreads();
841}
842
// NOTE(review): presumably `FixedThreadPool::queueCapacity` -- the signature
// line (844) is missing from this listing.  Returns the queue's capacity,
// narrowed to `int`.
843inline
845{
846 return static_cast<int>(d_queue.capacity());
847}
848
849} // close package namespace
850
851
852#endif
853
854// ----------------------------------------------------------------------------
855// Copyright 2024 Bloomberg Finance L.P.
856//
857// Licensed under the Apache License, Version 2.0 (the "License");
858// you may not use this file except in compliance with the License.
859// You may obtain a copy of the License at
860//
861// http://www.apache.org/licenses/LICENSE-2.0
862//
863// Unless required by applicable law or agreed to in writing, software
864// distributed under the License is distributed on an "AS IS" BASIS,
865// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
866// See the License for the specific language governing permissions and
867// limitations under the License.
868// ----------------------------- END-OF-FILE ----------------------------------
869
870/** @} */
871/** @} */
872/** @} */
Definition bdlcc_boundedqueue.h:415
bsl::size_t capacity() const
Definition bdlcc_boundedqueue.h:1381
int waitUntilEmpty() const
Definition bdlcc_boundedqueue.h:1422
void enablePushBack()
Definition bdlcc_boundedqueue.h:1373
void removeAll()
Definition bdlcc_boundedqueue.h:1179
void enablePopFront()
Definition bdlcc_boundedqueue.h:1366
@ e_FAILED
Definition bdlcc_boundedqueue.h:619
@ e_DISABLED
Definition bdlcc_boundedqueue.h:618
@ e_SUCCESS
Definition bdlcc_boundedqueue.h:615
@ e_FULL
Definition bdlcc_boundedqueue.h:617
void disablePopFront()
Definition bdlcc_boundedqueue.h:1347
int pushBack(const TYPE &value)
Definition bdlcc_boundedqueue.h:1106
bsl::size_t numElements() const
Definition bdlcc_boundedqueue.h:1416
bool isPushBackDisabled() const
Definition bdlcc_boundedqueue.h:1409
int tryPushBack(const TYPE &value)
Definition bdlcc_boundedqueue.h:1265
void disablePushBack()
Definition bdlcc_boundedqueue.h:1359
Definition bdlm_metricsregistry.h:287
Definition bdlm_metricsregistry.h:199
Definition bdlmt_fixedthreadpool.h:413
int numActiveThreads() const
Definition bdlmt_fixedthreadpool.h:820
void shutdown()
Definition bdlmt_fixedthreadpool.h:781
int tryEnqueueJob(const Job &functor)
Definition bdlmt_fixedthreadpool.h:738
bdlcc::BoundedQueue< Job > Queue
Definition bdlmt_fixedthreadpool.h:418
void disable()
Definition bdlmt_fixedthreadpool.h:701
@ e_SUSPEND
Definition bdlmt_fixedthreadpool.h:431
@ e_DRAIN
Definition bdlmt_fixedthreadpool.h:432
@ BCEP_RUN
Definition bdlmt_fixedthreadpool.h:435
@ e_STOP
Definition bdlmt_fixedthreadpool.h:429
@ e_RUN
Definition bdlmt_fixedthreadpool.h:430
@ BCEP_SUSPEND
Definition bdlmt_fixedthreadpool.h:436
@ BCEP_DRAIN
Definition bdlmt_fixedthreadpool.h:437
@ BCEP_STOP
Definition bdlmt_fixedthreadpool.h:434
bsl::function< void()> Job
Definition bdlmt_fixedthreadpool.h:417
FixedThreadPool(const bslmt::ThreadAttributes &threadAttributes, int numThreads, int maxNumPendingJobs, const bsl::string_view &threadPoolName, bdlm::MetricsRegistry *metricsRegistry, bslma::Allocator *basicAllocator=0)
FixedThreadPool(int numThreads, int maxNumPendingJobs, const bsl::string_view &threadPoolName, bdlm::MetricsRegistry *metricsRegistry, bslma::Allocator *basicAllocator=0)
int numThreads() const
Definition bdlmt_fixedthreadpool.h:832
int queueCapacity() const
Definition bdlmt_fixedthreadpool.h:844
bool isStarted() const
Definition bdlmt_fixedthreadpool.h:814
void enable()
Definition bdlmt_fixedthreadpool.h:707
bool isEnabled() const
Definition bdlmt_fixedthreadpool.h:808
int numThreadsStarted() const
Definition bdlmt_fixedthreadpool.h:838
@ e_FULL
Definition bdlmt_fixedthreadpool.h:423
@ e_FAILED
Definition bdlmt_fixedthreadpool.h:425
@ e_SUCCESS
Definition bdlmt_fixedthreadpool.h:422
@ e_DISABLED
Definition bdlmt_fixedthreadpool.h:424
void drain()
Definition bdlmt_fixedthreadpool.h:763
int enqueueJob(const Job &functor)
Definition bdlmt_fixedthreadpool.h:713
int numPendingJobs() const
Definition bdlmt_fixedthreadpool.h:826
void stop()
Definition bdlmt_fixedthreadpool.h:794
FixedThreadPool(int numThreads, int maxNumPendingJobs, bslma::Allocator *basicAllocator=0)
FixedThreadPool(const bslmt::ThreadAttributes &threadAttributes, int numThreads, int maxNumPendingJobs, bslma::Allocator *basicAllocator=0)
Definition bslstl_stringview.h:441
Forward declaration.
Definition bslstl_function.h:934
Definition bslma_allocator.h:457
Definition bslmf_movableref.h:751
Definition bslmt_barrier.h:353
Definition bslmt_lockguard.h:234
Definition bslmt_mutex.h:315
Definition bslmt_threadattributes.h:356
Definition bslmt_threadgroup.h:156
int numThreads() const
Definition bslmt_threadgroup.h:299
Definition bsls_atomic.h:1472
Definition bsls_atomic.h:743
int loadAcquire() const
Definition bsls_atomic.h:1732
void(* bcep_FixedThreadPoolJobFunc)(void *)
Definition bdlmt_fixedthreadpool.h:393
#define BSLS_ASSERT(X)
Definition bsls_assert.h:1804
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bdlmt_eventscheduler.h:522
void(* FixedThreadPoolJobFunc)(void *)
Definition bdlmt_fixedthreadpool.h:403
static MovableRef< t_TYPE > move(t_TYPE &reference) BSLS_KEYWORD_NOEXCEPT
Definition bslmf_movableref.h:1060
static t_TYPE & access(t_TYPE &ref) BSLS_KEYWORD_NOEXCEPT
Definition bslmf_movableref.h:1032