BDE 4.14.0 Production release
Loading...
Searching...
No Matches
bdlmt_threadpool.h
Go to the documentation of this file.
1/// @file bdlmt_threadpool.h
2///
3/// The content of this file has been pre-processed for Doxygen.
4///
5
6
7// bdlmt_threadpool.h -*-C++-*-
8#ifndef INCLUDED_BDLMT_THREADPOOL
9#define INCLUDED_BDLMT_THREADPOOL
10
11#include <bsls_ident.h>
12BSLS_IDENT("$Id: $")
13
14/// @defgroup bdlmt_threadpool bdlmt_threadpool
15/// @brief Provide portable implementation for a dynamic pool of threads.
16/// @addtogroup bdl
17/// @{
18/// @addtogroup bdlmt
19/// @{
20/// @addtogroup bdlmt_threadpool
21/// @{
22///
23/// <h1> Outline </h1>
24/// * <a href="#bdlmt_threadpool-purpose"> Purpose</a>
25/// * <a href="#bdlmt_threadpool-classes"> Classes </a>
26/// * <a href="#bdlmt_threadpool-description"> Description </a>
27/// * <a href="#bdlmt_threadpool-thread-safety"> Thread Safety </a>
28/// * <a href="#bdlmt_threadpool-synchronous-signals-on-unix"> Synchronous Signals on Unix </a>
29/// * <a href="#bdlmt_threadpool-usage"> Usage </a>
30/// * <a href="#bdlmt_threadpool-setting-threadpool-attributes"> Setting ThreadPool Attributes </a>
31/// * <a href="#bdlmt_threadpool-the-void-functionvoid-pointer-interface"> The "void functionvoid pointer" Interface </a>
32/// * <a href="#bdlmt_threadpool-the-functor-interface"> The Functor Interface </a>
33///
34/// # Purpose {#bdlmt_threadpool-purpose}
35/// Provide portable implementation for a dynamic pool of threads.
36///
37/// # Classes {#bdlmt_threadpool-classes}
38///
39/// - bdlmt::ThreadPool: portable dynamic thread pool
40///
41/// @see
42///
43/// # Description {#bdlmt_threadpool-description}
44/// This component defines a portable and efficient implementation
45/// of a thread pool that can be used to distribute various user-defined
46/// functions ("jobs") to separate threads and execute the jobs concurrently.
47/// The thread pool manages a dynamic set of processing threads, adding or
48/// removing threads to manage load, based upon user-defined parameters.
49///
50/// The pool uses a queue mechanism to distribute work among the threads. Jobs
51/// are queued for execution as they arrive, and each queued job is processed by
52/// the next available thread. If no threads are available, new threads are
53/// created dynamically (up to the application defined maximum number). If the
54/// maximum number of concurrent threads has been reached, new jobs will remain
55/// enqueued until a thread becomes available. If the threads become idle for
56/// longer than a user-defined maximum idle time, they are automatically
57/// destroyed, releasing unused resources. A client-defined minimum number of
58/// threads is always maintained even when there is no work to be done.
59///
60/// The thread pool provides two interfaces for specifying jobs: the commonly
61/// used "void function/void pointer" interface and the more versatile functor
62/// based interface. The void function/void pointer interface allows callers to
63/// use a C-style function to be executed as a job. The application need only
64/// specify the address of the function, and a single void pointer argument, to
65/// be passed to the function. The specified function will be invoked with the
66/// specified argument by the processing thread. The functor based interface
67/// allows for flexible job execution such as the invocation of member functions
68/// or the passing of multiple user-defined arguments. See the `bdef` package
69/// documentation for more on functors and their usage.
70///
71/// An application can tune the thread pool by adjusting the minimum and maximum
72/// number of threads in the pool, and the maximum amount of time that
73/// dynamically created threads can idle before being destroyed. To avoid
74/// unnecessary and inefficient thread creation/destruction, an application
75/// should select a value for the minimum number of threads that reflects the
76/// expected average load. A higher value for the maximum number of threads can
77/// be used to handle periodic bursts. An application can also specify the
78/// attributes of the threads in the pool (e.g., thread priority or stack size),
79/// by providing a `bslmt::ThreadAttributes` object with the desired values set.
80/// See @ref bslmt_threadutil package documentation for a description of
81/// `bslmt::ThreadAttributes`.
82///
83/// Thread pools are ideal for developing multi-threaded server applications. A
84/// server need only package client requests to execute as jobs, and
85/// `bdlmt::ThreadPool` will handle the queue management, thread management, and
86/// request dispatching. Thread pools are also well suited for parallelizing
87/// certain types of application logic. Without any complex or redundant thread
88/// management code, an application can easily create a thread pool, enqueue a
89/// series of jobs to be executed, and wait until all the jobs have executed.
90///
91/// ## Thread Safety {#bdlmt_threadpool-thread-safety}
92///
93///
94/// The `bdlmt::ThreadPool` class is both **fully thread-safe** (i.e., all
95/// non-creator methods can correctly execute concurrently), and is
96/// **thread-enabled** (i.e., the class does not function correctly in a
97/// non-multi-threading environment). See @ref bsldoc_glossary for complete
98/// definitions of **fully thread-safe** and **thread-enabled**.
99///
100/// ## Synchronous Signals on Unix {#bdlmt_threadpool-synchronous-signals-on-unix}
101///
102///
103/// A thread pool ensures that, on unix platforms, all the threads in the pool
104/// block all asynchronous signals. Specifically all the signals, except the
105/// following synchronous signals are blocked.
106///
107/// SIGBUS
108/// SIGFPE
109/// SIGILL
110/// SIGSEGV
111/// SIGSYS
112/// SIGABRT
113/// SIGTRAP
114/// SIGIOT
115///
116/// ## Usage {#bdlmt_threadpool-usage}
117///
118///
119/// This example demonstrates the use of a `bdlmt::ThreadPool` to parallelize a
120/// segment of program logic. The example implements a multi-threaded file
121/// search utility. The utility searches multiple files for a string, similar
122/// to the Unix command `fgrep`; the use of a `bdlmt::ThreadPool` allows the
123/// utility to search multiple files concurrently.
124///
125/// The example program will take as input a string and a list of files to
126/// search. The program creates a `bdlmt::ThreadPool`, and then enqueues a
127/// single "job" for each file to be searched. Each thread in the pool will
128/// take a job from the queue, open the file, and search for the string. If a
129/// match is found, the job adds the filename to an array of matching filenames.
130/// Because this array of filenames is shared across multiple jobs and across
131/// multiple threads, access to the array is controlled via a `bslmt::Mutex`.
132///
133/// ### Setting ThreadPool Attributes {#bdlmt_threadpool-setting-threadpool-attributes}
134///
135///
136/// To get started, we declare thread attributes, to be used in constructing the
137/// thread pool. In this example, our choices for minimum search threads and
138/// maximum idle time are arbitrary; we don't expect the thread pool to become
139/// idle, so the thread pool should not begin to delete unused threads before
140/// the program terminates.
141///
142/// However, a maximum number of 50 threads is meaningful, and may affect
143/// overall performance. The maximum should cover the expected peak, in this
144/// case, the maximum number of files to search. However, if the maximum is too
145/// large for a given platform, it may cause a bottleneck as the operating
146/// system spends significant resources switching context among multiple
147/// threads. Also we use a very short idle time since new jobs will arrive only
148/// at startup.
149/// @code
150/// const int MIN_SEARCH_THREADS = 10;
151/// const int MAX_SEARCH_THREADS = 50;
152/// const bsls::TimeInterval MAX_SEARCH_THREAD_IDLE(0, 100000000);
153/// @endcode
154/// Below is the structure that will be used to pass arguments to the file
155/// search function. Since each job will be searching a separate file, a
156/// distinct instance of the structure will be used for each job.
157/// @code
158/// struct my_FastSearchJobInfo {
159/// const bsl::string *d_word; // word to search for
160/// const bsl::string *d_path; // path of the file to search
161/// bslmt::Mutex *d_mutex; // mutex to control access to the
162/// // result file list
163/// bsl::vector<bsl::string> *d_outList; // list of matching files
164/// };
165/// @endcode
166///
167/// ### The "void functionvoid pointer" Interface {#bdlmt_threadpool-the-void-functionvoid-pointer-interface}
168///
169///
170/// `myFastSearchJob` is the search function to be executed as a job by threads
171/// in the thread pool, matching the "void function/void pointer" interface.
172/// The single `void *` argument is received and cast to point to a 'struct
173/// my_FastSearchJobInfo', which then points to the search string and a single
174/// file to be searched. Note that different `my_FastSearchJobInfo` structures
175/// for the same search request will differ only in the attribute `d_path`,
176/// which points to a specific filename among the set of files to be searched;
177/// other fields will be identical across all structures for a given Fast
178/// Search.
179///
180/// See the following section for an illustration of the functor interface.
181/// @code
182/// static void myFastSearchJob(void *arg)
183/// {
184/// my_FastSearchJobInfo *job = (my_FastSearchJobInfo*)arg;
185/// FILE *file;
186///
187/// file = fopen(job->d_path->c_str(), "r");
188///
189/// if (file) {
190/// char buffer[1024];
191/// size_t nread;
192/// size_t wordLen = job->d_word->length();
193/// const char *word = job->d_word->c_str();
194///
195/// nread = fread(buffer, 1, sizeof(buffer) - 1, file);
196/// while(nread >= wordLen) {
197/// buffer[nread] = 0;
198/// if (strstr(buffer, word)) {
199/// @endcode
200/// If we find a match, we add the file to the result list and return. Since
201/// the result list is shared among multiple processing threads, we use a mutex
202/// lock to regulate access to the list. We use a `bslmt::LockGuard` to manage
203/// access to the mutex lock. This template object acquires a mutex lock on
204/// `job->d_mutex` at construction, releases that lock on destruction. Thus,
205/// the mutex will be locked within the scope of the `if` block, and released
206/// when the program exits that scope.
207///
208/// See @ref bslmt_threadutil for information about the `bslmt::Mutex` class, and
209/// component @ref bslmt_lockguard for information about the `bslmt::LockGuard`
210/// template class.
211/// @code
212/// bslmt::LockGuard<bslmt::Mutex> lock(job->d_mutex);
213/// job->d_outList->push_back(*job->d_path);
214/// break; // bslmt::LockGuard destructor unlocks mutex.
215/// }
216/// memcpy(buffer, &buffer[nread - wordLen - 1], wordLen - 1);
217/// nread = fread(buffer + wordLen - 1, 1, sizeof(buffer) - wordLen,
218/// file);
219/// }
220/// fclose(file);
221/// }
222/// }
223/// @endcode
224/// Routine `myFastSearch` is the main driving routine, taking three arguments:
225/// a single string to search for (`word`), a list of files to search, and an
226/// output list of files. When the function completes, the file list will
227/// contain the names of files where a match was found.
228/// @code
229/// void myFastSearch(const bsl::string& word,
230/// const bsl::vector<bsl::string>& fileList,
231/// bsl::vector<bsl::string>& outFileList)
232/// {
233/// bslmt::Mutex mutex;
234/// bslmt::ThreadAttributes defaultAttributes;
235/// @endcode
236/// We initialize the thread pool using default thread attributes. We then
237/// start the pool so that the threads can begin while we prepare the jobs.
238/// @code
239/// bdlmt::ThreadPool pool(defaultAttributes,
240/// MIN_SEARCH_THREADS,
241/// MAX_SEARCH_THREADS,
242/// MAX_SEARCH_THREAD_IDLE);
243///
244/// if (0 != pool.start()) {
245/// bsl::cerr << "Failed to start minimum number of threads.\n";
246/// exit(1);
247/// }
248/// @endcode
249/// For each file to be searched, we create the job info structure that will be
250/// passed to the search function and add the job to the pool.
251///
252/// As noted above, all jobs will share a single mutex to guard the output file
253/// list. Function `myFastSearchJob` uses a `bslmt::LockGuard` on this mutex to
254/// serialize access to the list.
255/// @code
256/// int count = fileList.size();
257/// my_FastSearchJobInfo *jobInfoArray = new my_FastSearchJobInfo[count];
258///
259/// for (int i = 0; i < count; ++i) {
260/// my_FastSearchJobInfo &job = jobInfoArray[i];
261/// job.d_word = &word;
262/// job.d_path = &fileList[i];
263/// job.d_mutex = &mutex;
264/// job.d_outList = &outFileList;
265/// pool.enqueueJob(myFastSearchJob, &job);
266/// }
267/// @endcode
268/// Now we simply wait for all the jobs in the queue to complete. Any matched
269/// files should have been added to the output file list.
270/// @code
271/// pool.drain();
272/// delete[] jobInfoArray;
273/// }
274/// @endcode
275///
276/// ### The Functor Interface {#bdlmt_threadpool-the-functor-interface}
277///
278///
279/// The "void function/void pointer" convention is idiomatic for C programs.
280/// The `void` pointer argument provides a generic way of passing in user data,
281/// without regard to the data type. Clients who prefer better or more explicit
282/// type safety may wish to use the Functor Interface instead. This interface
283/// uses the `bsl::function` component to provide type-safe wrappers that can
284/// match argument number and type for a C++ free function or member function.
285///
286/// To illustrate the Functor Interface, we will make two small changes to the
287/// usage example above. First, we change the signature of the function that
288/// executes a single job, so that it uses a `my_FastSearchJobInfo` pointer
289/// rather than a `void` pointer. With this change, we can remove the first
290/// executable statement, which casts the `void *` pointer to
291/// `my_FastSearchJobInfo *`.
292/// @code
293/// static void my_FastFunctorSearchJob(my_FastSearchJobInfo *job)
294/// {
295/// FILE *file;
296///
297/// file = fopen(job->d_path->c_str(), "r");
298///
299/// // The rest of the function is unchanged.
300/// if (file) {
301/// char buffer[1024];
302/// size_t nread;
303/// size_t wordLen = job->d_word->length();
304/// const char *word = job->d_word->c_str();
305///
306/// nread = fread(buffer, 1, sizeof(buffer) - 1, file);
307/// while(nread >= wordLen) {
308/// buffer[nread] = 0;
309/// if (strstr(buffer, word)) {
310/// bslmt::LockGuard<bslmt::Mutex> lock(job->d_mutex);
311/// job->d_outList->push_back(*job->d_path);
312/// break; // bslmt::LockGuard destructor unlocks mutex.
313/// }
314/// }
315/// bsl::memcpy(buffer, &buffer[nread - wordLen - 1], wordLen - 1);
316/// nread = fread(buffer + wordLen - 1, 1, sizeof(buffer) - wordLen,
317/// file);
318/// }
319/// fclose(file);
320/// }
321/// @endcode
322/// Next, we make a change to the loop that enqueues the jobs in `myFastSearch`.
323/// The function starts exactly as in the previous example:
324/// @code
325/// static void myFastFunctorSearch(const string& word,
326/// const vector<string>& fileList,
327/// vector<string>& outFileList)
328/// {
329/// bslmt::Mutex mutex;
330/// bslmt::ThreadAttributes defaultAttributes;
331/// bdlmt::ThreadPool pool(defaultAttributes,
332/// MIN_SEARCH_THREADS,
333/// MAX_SEARCH_THREADS,
334/// MAX_SEARCH_THREAD_IDLE);
335///
336/// if (0 != pool.start()) {
337/// bsl::cerr << "Failed to start minimum number of threads. "
338/// << "Thread quota exceeded?\n";
339/// assert(false);
340/// return; // things are SNAFU
341/// }
342///
343/// int count = fileList.size();
344/// my_FastSearchJobInfo *jobInfoArray = new my_FastSearchJobInfo[count];
345/// @endcode
346/// We create a functor - a C++ object that acts as a function. The thread pool
347/// will "execute" this functor (by calling its `operator()` member function) on
348/// a thread when one becomes available.
349/// @code
350/// for (int i = 0; i < count; ++i) {
351/// my_FastSearchJobInfo &job = jobInfoArray[i];
352/// job.d_word = &word;
353/// job.d_path = &fileList[i];
354/// job.d_mutex = &mutex;
355/// job.d_outList = &outFileList;
356///
357/// bsl::function<void()> jobHandle =
358/// bdlf::BindUtil::bind(&my_FastFunctorSearchJob, &job);
359/// pool.enqueueJob(jobHandle);
360/// }
361/// @endcode
362/// Note that the functor is created locally and handed to the thread pool. The
363/// thread pool copies the functor onto its internal queue, and takes
364/// responsibility for the copied functor until execution is complete.
365///
366/// The function is completed exactly as it was in the previous example.
367/// @code
368/// pool.drain();
369/// delete[] jobInfoArray;
370/// }
371/// @endcode
372/// @}
373/** @} */
374/** @} */
375
376/** @addtogroup bdl
377 * @{
378 */
379/** @addtogroup bdlmt
380 * @{
381 */
382/** @addtogroup bdlmt_threadpool
383 * @{
384 */
385
386#include <bdlf_bind.h>
387
388#include <bdlscm_version.h>
389
390#include <bdlm_metricsregistry.h>
391
392#include <bslma_allocator.h>
394
396#include <bslmf_movableref.h>
398
400#include <bslmt_condition.h>
401#include <bslmt_mutex.h>
402#include <bslmt_threadutil.h>
403
404#include <bsls_atomic.h>
406#include <bsls_platform.h>
407#include <bsls_timeinterval.h>
408
409#include <bsl_deque.h>
410#if defined(BSLS_PLATFORM_OS_UNIX)
411 #include <bsl_csignal.h> // sigfillset
412#endif
413#include <bsl_functional.h>
414#include <bsl_string.h>
415
416#ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
417#include <bslalg_typetraits.h>
418#endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
419
420
421
422#ifndef BDE_OMIT_INTERNAL_DEPRECATED
423
424/// This type declares the prototype for functions that are suitable to
425/// be specified `bdlmt::FixedThreadPool::enqueueJob`.
426extern "C" typedef void (*bcep_ThreadPoolJobFunc)(void *);
427
428#endif // BDE_OMIT_INTERNAL_DEPRECATED
429
430namespace bdlmt {
431
432struct ThreadPoolWaitNode;
433
434/// Entry point for processing threads.
435extern "C" void *ThreadPoolEntry(void *);
436
437/// This type declares the prototype for functions that are suitable to be
438/// specified `bdlmt::FixedThreadPool::enqueueJob`.
439extern "C" typedef void (*ThreadPoolJobFunc)(void *);
440
441 // ================
442 // class ThreadPool
443 // ================
444
445/// This class implements a thread pool used for concurrently executing
446/// multiple user-defined functions ("jobs").
447///
448/// See @ref bdlmt_threadpool
450
451 public:
452 // TYPES
453 typedef bsl::function<void()> Job;
454
455 private:
456 // PRIVATE DATA
457 bsl::deque<Job> d_queue; // queue of pending jobs
458
459 mutable bslmt::Mutex d_mutex; // mutex used to control access to
460 // this thread pool
461
462 bslmt::Condition d_drainCond; // condition variable used to signal
463 // that the queue is fully drained
464 // and that all active jobs have
465 // completed
466
468 d_threadAttributes;
469 // thread attributes to be used when
470 // constructing processing threads
471
472 const int d_maxThreads; // maximum number of processing
473 // threads that can be started at
474 // any given time by this thread
475 // pool
476
477 const int d_minThreads; // minimum number of processing
478 // threads that must running at any
479 // given time
480
481 int d_threadCount; // current number of processing
482 // threads started by this thread
483 // pool
484
485 bsls::AtomicInt d_createFailures; // number of thread create failures
486
487
488 bsls::TimeInterval d_maxIdleTime; // time that threads (in excess of
489 // the minimum number of threads)
490 // remain idle before being shut
491 // down
492
493 int d_numActiveThreads;
494 // current number of threads that
495 // are actively processing a job
496
497 bsls::AtomicInt d_enabled; // indicates the enabled state of
498 // queue; queuing is disabled when
499 // 0, enabled otherwise
500
502 d_waitHead; // pointer to the 'WaitNode' control
503 // structure of the first thread
504 // that is waiting for a request
505
506 bsls::AtomicInt64 d_lastResetTime; // last reset time of percent-busy
507 // metric in nanoseconds from some
508 // arbitrary but fixed point in time
509
510 bsls::AtomicInt64 d_callbackTime; // the total time spent running jobs
511 // (callbacks) across all threads,
512 // in nanoseconds
513
514#if defined(BSLS_PLATFORM_OS_UNIX)
515 sigset_t d_blockSet; // set of signals to be blocked in
516 // managed threads
517#endif
518
520 d_backlogHandle; // backlog metric handle
521
522 // CLASS DATA
523 static const char s_defaultThreadName[16]; // default name of threads
524 // if supported and
525 // attributes doesn't
526 // specify another name
527
528 // FRIENDS
529 friend void* ThreadPoolEntry(void *);
530
531 // PRIVATE MANIPULATORS
532
533 void doEnqueueJob(const Job& job);
534 /// Internal method used to push the specified `job` onto `d_queue` and
535 /// signal the next waiting thread if any. Note that this method must
536 /// be called with `d_mutex` locked.
537 void doEnqueueJob(bslmf::MovableRef<Job> job);
538
539 /// Initialize this thread pool using the stored attributes and the
540 /// specified `metricsRegistry` and `threadPoolName`. If
541 /// `metricsRegistry` is 0, `bdlm::MetricsRegistry::singleton()` is
542 /// used.
543 void initialize(bdlm::MetricsRegistry *metricsRegistry,
544 const bsl::string_view& threadPoolName);
545
546 /// Signal this thread and pop the current thread from the wait list.
547 void wakeThreadIfNeeded();
548
549 /// Start a new thread if needed and the maximum number of threads are
550 /// not yet running. This method must be called with `d_mutex` locked.
551 /// Return 0 if at least one thread is running, and a non-zero value
552 /// otherwise.
553 int startThreadIfNeeded();
554
555#if defined(BSLS_PLATFORM_OS_UNIX)
556 /// Initialize the set of signals to be blocked in the managed threads.
557 void initBlockSet();
558#endif
559
560 /// Internal method to spawn a new processing thread and increment the
561 /// current count. This method must be called with `d_mutex` locked.
562 int startNewThread();
563
564 /// Processing thread function.
565 void workerThread();
566
567 private:
568 // NOT IMPLEMENTED
569 ThreadPool(const ThreadPool&);
570 ThreadPool& operator=(const ThreadPool&);
571
572 public:
573 // TRAITS
575
576 // CREATORS
577
578 /// Construct a thread pool with the specified `threadAttributes`, the
579 /// specified `minThreads` minimum number of threads, the specified
580 /// `maxThreads` maximum number of threads, and the specified
581 /// `maxIdleTime` idle time (in milliseconds) after which a thread may
582 /// be considered for destruction. Optionally specify a
583 /// `basicAllocator` used to supply memory. If `basicAllocator` is 0,
584 /// the currently installed default allocator is used. The name used for
585 /// created threads is `threadAttributes.threadName()` if not empty,
586 /// otherwise "bdl.ThreadPool". The behavior is undefined unless
587 /// `0 <= minThreads`, `minThreads <= maxThreads`, and `0 <= maxIdleTime`.
588 ThreadPool(const bslmt::ThreadAttributes& threadAttributes,
589 int minThreads,
590 int maxThreads,
591 int maxIdleTime,
592 bslma::Allocator *basicAllocator = 0);
593
594 /// Construct a thread pool with the specified `threadAttributes`, the
595 /// specified `minThreads` minimum number of threads, the specified
596 /// `maxThreads` maximum number of threads, the specified `maxIdleTime`
597 /// idle time (in milliseconds) after which a thread may be considered
598 /// for destruction, the specified `threadPoolName` to be used to
599 /// identify this thread pool, and the specified `metricsRegistry` to
600 /// be used for reporting metrics. If `metricsRegistry` is 0,
601 /// `bdlm::MetricsRegistry::singleton()` is used. Optionally specify a
602 /// `basicAllocator` used to supply memory. If `basicAllocator` is 0,
603 /// the currently installed default allocator is used. The name used for
604 /// created threads is `threadAttributes.threadName()` if not empty,
605 /// otherwise `threadPoolName` if not empty, otherwise "bdl.ThreadPool".
606 /// The behavior is undefined unless `0 <= minThreads`,
607 /// `minThreads <= maxThreads`, and `0 <= maxIdleTime`.
608 ThreadPool(const bslmt::ThreadAttributes& threadAttributes,
609 int minThreads,
610 int maxThreads,
611 int maxIdleTime,
612 const bsl::string_view& threadPoolName,
613 bdlm::MetricsRegistry *metricsRegistry,
614 bslma::Allocator *basicAllocator = 0);
615
616 /// Construct a thread pool with the specified `threadAttributes`, the
617 /// specified `minThreads` minimum number of threads, the specified
618 /// `maxThreads` maximum number of threads, and the specified
619 /// `maxIdleTime` idle time after which a thread may be considered for
620 /// destruction. Optionally specify a `basicAllocator` used to supply
621 /// memory. If `basicAllocator` is 0, the currently installed default
622 /// allocator is used. The name used for created threads is
623 /// `threadAttributes.threadName()` if not empty, otherwise
624 /// "bdl.ThreadPool". The behavior is undefined unless `0 <= minThreads`,
625 /// `minThreads <= maxThreads`, `0 <= maxIdleTime`, and the `maxIdleTime`
626 /// has a value less than or equal to `INT_MAX` milliseconds.
627 ThreadPool(const bslmt::ThreadAttributes& threadAttributes,
628 int minThreads,
629 int maxThreads,
631 bslma::Allocator *basicAllocator = 0);
632
633 /// Construct a thread pool with the specified `threadAttributes`, the
634 /// specified `minThreads` minimum number of threads, the specified
635 /// `maxThreads` maximum number of threads, the specified `maxIdleTime`
636 /// idle time after which a thread may be considered for destruction,
637 /// the specified `threadPoolName` to be used to identify this thread
638 /// pool, and the specified `metricsRegistry` to be used for reporting
639 /// metrics. If `metricsRegistry` is 0,
640 /// `bdlm::MetricsRegistry::singleton()` is used. Optionally specify a
641 /// `basicAllocator` used to supply memory. If `basicAllocator` is 0,
642 /// the currently installed default allocator is used. The name used for
643 /// created threads is `threadAttributes.threadName()` if not empty,
644 /// otherwise `threadPoolName` if not empty, otherwise "bdl.ThreadPool".
645 /// The behavior is undefined unless `0 <= minThreads`,
646 /// `minThreads <= maxThreads`, `0 <= maxIdleTime`, and the `maxIdleTime`
647 /// has a value less than or equal to `INT_MAX` milliseconds.
648 ThreadPool(const bslmt::ThreadAttributes& threadAttributes,
649 int minThreads,
650 int maxThreads,
652 const bsl::string_view& threadPoolName,
653 bdlm::MetricsRegistry *metricsRegistry,
654 bslma::Allocator *basicAllocator = 0);
655
656 /// Call `shutdown()` and destroy this thread pool.
658
659 // MANIPULATORS
660
661 /// Disable queuing on this thread pool and wait until all pending jobs
662 /// complete. Use `start` to re-enable queuing.
663 void drain();
664
665 int enqueueJob(const Job& functor);
666 /// Enqueue the specified `functor` to be executed by the next available
667 /// thread. Return 0 if enqueued successfully, and a non-zero value if
668 /// queuing is currently disabled. The behavior is undefined unless
669 /// `functor` is not "unset". See `bsl::function` for more information
670 /// on functors.
672
673 /// Enqueue the specified `function` to be executed by the next
674 /// available thread. The specified `userData` pointer will be passed
675 /// to the function by the processing thread. Return 0 if enqueued
676 /// successfully, and a non-zero value if queuing is currently disabled.
677 int enqueueJob(ThreadPoolJobFunc function, void *userData);
678
679 /// Atomically report the percentage of wall time spent by each thread of
680 /// this thread pool executing jobs since the last reset time, and set the
681 /// reset time to now. The creation of the thread pool is considered a
682 /// first reset time. This value is calculated as:
683 /// @code
684 /// sum(jobExecutionTime) 100%
685 /// P_busy = -------------------- x ----------
686 /// timeSinceLastReset maxThreads
687 /// @endcode
688 /// Note that this percentage reflects the wall time spent per thread, and
689 /// not CPU time per thread, or not even CPU time per processor. Also note
690 /// that there is no guarantee that all threads are processed concurrently
691 /// (e.g., the number of threads could be larger than the number of
692 /// processors).
694
695 /// Disable queuing on this thread pool, cancel all queued jobs, and shut
696 /// down all processing threads (after all active jobs complete).
697 void shutdown();
698
699 /// Enable queuing on this thread pool and spawn `minThreads()` processing
700 /// threads. Return 0 on success, and a non-zero value otherwise. If
701 /// `minThreads()` threads were not successfully started, all threads are
702 /// stopped.
703 int start();
704
705 /// Disable queuing on this thread pool and wait until all pending jobs
706 /// complete, then shut down all processing threads.
707 void stop();
708
709 // ACCESSORS
710
711 /// Return the state (enabled or not) of the thread pool.
712 int enabled() const;
713
714 /// Return the maximum number of threads that are allowed to be running at
715 /// given time.
716 int maxThreads() const;
717
718 /// Return the amount of time (in milliseconds) a thread remains idle
719 /// before being shut down when there are more than min threads started.
720 int maxIdleTime() const;
721
722 /// Return the amount of time a thread remains idle before being shut down
723 /// when there are more than min threads started.
725
726 /// Return the minimum number of threads that must be started at any given
727 /// time.
728 int minThreads() const;
729
730 /// Return the number of threads that are currently processing a job.
731 int numActiveThreads() const;
732
733 /// Return the number of jobs that are currently queued, but not yet being
734 /// processed.
735 int numPendingJobs() const;
736
737 /// Return the number of threads that are currently waiting for a job.
738 int numWaitingThreads() const;
739
740 /// Return the percentage of wall time spent by each thread of this thread
741 /// pool executing jobs since the last reset time. The creation of the
742 /// thread pool is considered a first reset time. This value is calculated
743 /// as:
744 /// @code
745 /// sum(jobExecutionTime) 100%
746 /// P_busy = -------------------- x ----------
747 /// timeSinceLastReset maxThreads
748 /// @endcode
749 /// Note that this percentage reflects the wall time spent per thread, and
750 /// not CPU time per thread, or not even CPU time per processor. Also note
751 /// that there is no guarantee that all threads are processed concurrently
752 /// (e.g., the number of threads could be larger than the number of
753 /// processors).
754 double percentBusy() const;
755
756 /// Return the number of times that thread creation failed.
757 int threadFailures() const;
758};
759
760// ============================================================================
761// INLINE DEFINITIONS
762// ============================================================================
763
764// MANIPULATORS
765
766inline
767int ThreadPool::enqueueJob(ThreadPoolJobFunc function, void *userData)
768{
769 return enqueueJob(bdlf::BindUtil::bindR<void>(function, userData));
770}
771
772// ACCESSORS
773
774inline
776{
777 return d_enabled;
778}
779
780inline
782{
783 return d_minThreads;
784}
785
786inline
788{
789 return d_maxThreads;
790}
791
792inline
794{
795 return d_createFailures;
796}
797
798inline
800{
801 return static_cast<int>(d_maxIdleTime.totalMilliseconds());
802}
803
804inline
806{
807 return d_maxIdleTime;
808}
809
810} // close package namespace
811
812#endif
813
814// ----------------------------------------------------------------------------
815// Copyright 2024 Bloomberg Finance L.P.
816//
817// Licensed under the Apache License, Version 2.0 (the "License");
818// you may not use this file except in compliance with the License.
819// You may obtain a copy of the License at
820//
821// http://www.apache.org/licenses/LICENSE-2.0
822//
823// Unless required by applicable law or agreed to in writing, software
824// distributed under the License is distributed on an "AS IS" BASIS,
825// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
826// See the License for the specific language governing permissions and
827// limitations under the License.
828// ----------------------------- END-OF-FILE ----------------------------------
829
830/** @} */
831/** @} */
832/** @} */
Definition bdlm_metricsregistry.h:287
Definition bdlm_metricsregistry.h:199
Definition bdlmt_threadpool.h:449
double percentBusy() const
bsl::function< void()> Job
Definition bdlmt_threadpool.h:453
int threadFailures() const
Return the number of times that thread creation failed.
Definition bdlmt_threadpool.h:793
int enqueueJob(const Job &functor)
ThreadPool(const bslmt::ThreadAttributes &threadAttributes, int minThreads, int maxThreads, int maxIdleTime, bslma::Allocator *basicAllocator=0)
int maxThreads() const
Definition bdlmt_threadpool.h:787
int enqueueJob(bslmf::MovableRef< Job > functor)
ThreadPool(const bslmt::ThreadAttributes &threadAttributes, int minThreads, int maxThreads, bsls::TimeInterval maxIdleTime, const bsl::string_view &threadPoolName, bdlm::MetricsRegistry *metricsRegistry, bslma::Allocator *basicAllocator=0)
int minThreads() const
Definition bdlmt_threadpool.h:781
int numPendingJobs() const
BSLMF_NESTED_TRAIT_DECLARATION(ThreadPool, bslma::UsesBslmaAllocator)
ThreadPool(const bslmt::ThreadAttributes &threadAttributes, int minThreads, int maxThreads, bsls::TimeInterval maxIdleTime, bslma::Allocator *basicAllocator=0)
bsls::TimeInterval maxIdleTimeInterval() const
Definition bdlmt_threadpool.h:805
int maxIdleTime() const
Definition bdlmt_threadpool.h:799
int numWaitingThreads() const
Return the number of threads that are currently waiting for a job.
int numActiveThreads() const
Return the number of threads that are currently processing a job.
friend void * ThreadPoolEntry(void *)
Entry point for processing threads.
int enabled() const
Return the state (enabled or not) of the thread pool.
Definition bdlmt_threadpool.h:775
double resetPercentBusy()
ThreadPool(const bslmt::ThreadAttributes &threadAttributes, int minThreads, int maxThreads, int maxIdleTime, const bsl::string_view &threadPoolName, bdlm::MetricsRegistry *metricsRegistry, bslma::Allocator *basicAllocator=0)
~ThreadPool()
Call shutdown() and destroy this thread pool.
Definition bslstl_stringview.h:441
Definition bslstl_deque.h:772
Forward declaration.
Definition bslstl_function.h:934
Definition bslma_allocator.h:457
Definition bslmf_movableref.h:751
Definition bslmt_condition.h:220
Definition bslmt_mutex.h:315
Definition bslmt_threadattributes.h:356
Definition bsls_atomic.h:892
Definition bsls_atomic.h:743
Definition bsls_atomic.h:1349
Definition bsls_timeinterval.h:301
BSLS_KEYWORD_CONSTEXPR_CPP14 bsls::Types::Int64 totalMilliseconds() const
Definition bsls_timeinterval.h:1384
void(* bcep_ThreadPoolJobFunc)(void *)
Definition bdlmt_threadpool.h:426
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bdlmt_eventscheduler.h:522
void(* ThreadPoolJobFunc)(void *)
Definition bdlmt_threadpool.h:439
void * ThreadPoolEntry(void *)
Entry point for processing threads.
Definition bslma_usesbslmaallocator.h:343