BDE 4.14.0 Production release
bdlmt_threadmultiplexor.h
/// @file bdlmt_threadmultiplexor.h
///
/// The content of this file has been pre-processed for Doxygen.
///


// bdlmt_threadmultiplexor.h                                          -*-C++-*-
#ifndef INCLUDED_BDLMT_THREADMULTIPLEXOR
#define INCLUDED_BDLMT_THREADMULTIPLEXOR

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

/// @defgroup bdlmt_threadmultiplexor bdlmt_threadmultiplexor
/// @brief Provide a mechanism for partitioning a collection of threads.
/// @addtogroup bdl
/// @{
/// @addtogroup bdlmt
/// @{
/// @addtogroup bdlmt_threadmultiplexor
/// @{
///
/// <h1> Outline </h1>
/// * <a href="#bdlmt_threadmultiplexor-purpose"> Purpose</a>
/// * <a href="#bdlmt_threadmultiplexor-classes"> Classes </a>
/// * <a href="#bdlmt_threadmultiplexor-description"> Description </a>
/// * <a href="#bdlmt_threadmultiplexor-thread-safety"> Thread Safety </a>
/// * <a href="#bdlmt_threadmultiplexor-usage"> Usage </a>
/// * <a href="#bdlmt_threadmultiplexor-example-1-multiple-work-queues"> Example 1: Multiple Work Queues </a>
///
/// # Purpose {#bdlmt_threadmultiplexor-purpose}
/// Provide a mechanism for partitioning a collection of threads.
///
/// # Classes {#bdlmt_threadmultiplexor-classes}
///
/// - bdlmt::ThreadMultiplexor: mechanism to partition multi-threaded processing
///
/// @see bdlmt_threadpool, bdlmt_fixedthreadpool
///
/// # Description {#bdlmt_threadmultiplexor-description}
/// This component provides a mechanism for partitioning a
/// collection of threads, so that a single collection of threads (e.g., a
/// thread pool) can be used to process various types of user-defined functions
/// ("jobs") while sharing the thread resources equitably between them. A
/// typical example where this type of partitioning is desired is an application
/// that performs both I/O and CPU-intensive processing. The traditional
/// approach is to create two thread pools--one for I/O, and one for
/// processing--and pass control (in the form of a callback) from one thread
/// pool to the other. However, there are several problems with this approach.
/// Firstly, the process incurs the overhead of context switching between
/// threads, which must necessarily occur because there are two different thread
/// pools. Secondly, the process may not be able to adapt well to imbalances
/// between one type of processing and the other if the number of threads in
/// each thread pool is bounded. In this case, a large number of jobs may be
/// enqueued while some portion of the threads allocated to the process goes
/// unused. On the other hand, simply sharing a single thread pool without a
/// provision for partitioning the use of threads may result in one type of
/// processing starving the other.
///
/// The `bdlmt::ThreadMultiplexor` provides an API, `processJob`, to process
/// user-specified jobs. A multiplexor instance is configured with a maximum
/// number of "processors", i.e., the maximum number of threads that may process
/// jobs at any particular time. Additional threads enqueue jobs to a pending
/// job queue, which is processed by the next available processing thread.
///
/// Typically, a `bdlmt::ThreadMultiplexor` instance is used in conjunction with
/// a thread pool (e.g., `bdlmt::FixedThreadPool`), where each thread pool
/// thread calls the multiplexor `processJob` method to perform some work. The
/// multiplexor guarantees that no more than the configured number of threads
/// will process jobs concurrently. This guarantee allows a single thread pool
/// to be used in a variety of situations that require partitioning thread
/// resources.
///
/// ## Thread Safety {#bdlmt_threadmultiplexor-thread-safety}
///
///
/// The `bdlmt::ThreadMultiplexor` class is both **fully thread-safe** (i.e.,
/// all non-creator methods can correctly execute concurrently), and is
/// **thread-enabled** (i.e., the class does not function correctly in a
/// non-multi-threading environment). See @ref bsldoc_glossary for complete
/// definitions of **fully thread-safe** and **thread-enabled**.
///
/// ## Usage {#bdlmt_threadmultiplexor-usage}
///
///
/// This section illustrates intended use of this component.
///
/// ### Example 1: Multiple Work Queues {#bdlmt_threadmultiplexor-example-1-multiple-work-queues}
///
///
/// The following usage example illustrates how the `bdlmt::ThreadMultiplexor`
/// can be used to share thread resources between three separate work queues.
/// Assume that there are three classes of jobs: jobs that are important, jobs
/// that are urgent, and jobs that are critical. We would like to execute each
/// class of jobs in a single thread pool, but we want to ensure that all types
/// of jobs can be executed at any time.
///
/// We begin by defining a class that encapsulates the notion of a job queue.
/// Our `JobQueue` class holds a reference to a `bdlmt::FixedThreadPool`, used
/// to instantiate the job queue, and owns an instance of
/// `bdlmt::ThreadMultiplexor`, used to process jobs.
/// @code
/// /// This class defines a generic processor for user-defined functions
/// /// ("jobs"). Jobs specified to the `processJob` method are executed
/// /// in the thread pool specified at construction.
/// class JobQueue {
///
///   public:
///     // PUBLIC TYPES
///
///     /// A callback of this type may be specified to the `processJob` method.
///     typedef bdlmt::ThreadMultiplexor::Job Job;
///
///   private:
///     // DATA
///     bdlmt::FixedThreadPool   *d_threadPool_p;  // (held, not owned)
///     bdlmt::ThreadMultiplexor  d_multiplexor;   // used to partition threads
///
///   private:
///     // NOT IMPLEMENTED
///     JobQueue(const JobQueue&);
///     JobQueue& operator=(const JobQueue&);
///
///   public:
///     // CREATORS
///
///     /// Create a job queue that executes jobs in the specified
///     /// `threadPool` using no more than the specified `maxProcessors`.
///     /// Optionally specify a `basicAllocator` used to supply memory. If
///     /// `basicAllocator` is 0, the currently installed default allocator
///     /// is used.
///     JobQueue(int                     maxProcessors,
///              bdlmt::FixedThreadPool *threadPool,
///              bslma::Allocator       *basicAllocator = 0);
///
///     /// Destroy this object.
///     ~JobQueue();
///
///     // MANIPULATORS
///
///     /// Process the specified `job` in the thread pool specified at
///     /// construction. Return 0 on success, and a non-zero value otherwise.
///     int processJob(const Job& job);
/// };
/// @endcode
/// The maximum number of processors for the multiplexor instance owned by each
/// `JobQueue` is configured using the following formula, for
/// T = number of threads and M = number of multiplexors > 1:
/// @code
/// maxProc = ceil(T / (M-1))-1
/// @endcode
/// This allows multiple `JobQueue` instances to share the same thread pool
/// without starving each other when the thread pool has more than one thread.
/// For this usage example, we assume M (number of multiplexors) = 3, and T
/// (number of threads) = 5, so maxProc = ceil(5 / 2) - 1 = 2. It is important
/// to note that every call to `processJob` enqueues a job to the thread pool,
/// so the length of the thread pool queue determines the maximum number of jobs
/// that can be accepted by the `JobQueue`. (Multiple `JobQueue` objects share
/// the same maximum *together*, so not all of them can reach their individual
/// maximum at the same time.)
/// @code
///
/// JobQueue::JobQueue(int                     maxProcessors,
///                    bdlmt::FixedThreadPool *threadPool,
///                    bslma::Allocator       *basicAllocator)
/// : d_threadPool_p(threadPool)
/// , d_multiplexor(maxProcessors,
///                 threadPool->queueCapacity(),
///                 basicAllocator)
/// {
/// }
///
/// JobQueue::~JobQueue()
/// {
/// }
/// @endcode
/// The `processJob` method enqueues a secondary callback into the thread pool
/// that executes the user-specified `job` through the multiplexor.
/// @code
/// int JobQueue::processJob(const JobQueue::Job& job)
/// {
///     return d_threadPool_p->tryEnqueueJob(bdlf::BindUtil::bind(
///                              &bdlmt::ThreadMultiplexor::processJob<Job>,
///                              &d_multiplexor,
///                              job));
/// }
/// @endcode
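/// The bind expression above names `processJob<Job>` explicitly because the
/// address of a member function template can be taken only once its template
/// argument is fixed. The following minimal sketch illustrates the same
/// binding pattern using only the standard library. `Runner`, `run`, and
/// `deferRun` are hypothetical names introduced for illustration, and
/// `std::bind` is used here as a stand-in for `bdlf::BindUtil::bind`:

```cpp
#include <cassert>
#include <functional>

typedef std::function<void()> Job;

// Hypothetical stand-in for an object exposing a templated 'processJob'-like
// method; it is not part of BDE.
struct Runner {
    int d_calls = 0;  // number of jobs run so far

    // Invoke the specified 'job' and return the updated call count.
    template <class JOB>
    int run(const JOB& job)
    {
        job();
        return ++d_calls;
    }
};

// Return a functor that, when invoked, forwards a copy of the specified 'job'
// to 'runner->run'.  The template argument must be spelled out ('run<Job>')
// because '&Runner::run' alone does not name a single function.
inline std::function<int()> deferRun(Runner *runner, const Job& job)
{
    assert(runner);
    return std::bind(&Runner::run<Job>, runner, job);
}
```

/// Nothing runs when the functor is created; the wrapped job executes only
/// when the functor is later invoked, which is what allows a thread pool
/// thread to perform the call at a later time.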
/// The following program uses three instances of `JobQueue` to process
/// important, urgent, and critical jobs using a single collection of threads.
/// @code
/// int main(void)
/// {
///     enum {
///         NUM_THREADS   = 5,   // total number of threads
///         NUM_QUEUES    = 3,   // total number of JobQueue objects
///         MAX_QUEUESIZE = 20   // total number of pending jobs
///     };
///
///     // Convert the ceiling to 'int' so that both arguments of 'bsl::max'
///     // have the same type.
///     int maxProc = bsl::max(
///         1,
///         static_cast<int>(ceil(double(NUM_THREADS) / (NUM_QUEUES - 1))) - 1);
///
///     // Counters incremented by the jobs; each starts at 0.
///     bsls::AtomicInt iCheck;
///     bsls::AtomicInt uCheck;
///     bsls::AtomicInt cCheck;
///
///     bdlmt::FixedThreadPool tp(NUM_THREADS, MAX_QUEUESIZE);
///     JobQueue importantQueue(maxProc, &tp);
///     JobQueue urgentQueue(maxProc, &tp);
///     JobQueue criticalQueue(maxProc, &tp);
///
///     if (0 != tp.start()) {
///         ASSERT(0 == "Could not start thread pool!");
///         return -1;
///     }
///
///     JobQueue::Job ijob =
///         bdlf::BindUtil::bind(&bsls::AtomicInt::add, &iCheck, 1);
///
///     JobQueue::Job ujob =
///         bdlf::BindUtil::bind(&bsls::AtomicInt::add, &uCheck, 1);
///
///     JobQueue::Job cjob =
///         bdlf::BindUtil::bind(&bsls::AtomicInt::add, &cCheck, 1);
///
///     importantQueue.processJob(ijob);
///     importantQueue.processJob(ijob);
///     importantQueue.processJob(ijob);
///     importantQueue.processJob(ijob);
///     importantQueue.processJob(ijob);
///     importantQueue.processJob(ijob);
///
///     urgentQueue.processJob(ujob);
///     urgentQueue.processJob(ujob);
///     urgentQueue.processJob(ujob);
///     urgentQueue.processJob(ujob);
///
///     criticalQueue.processJob(cjob);
///     criticalQueue.processJob(cjob);
///
///     tp.stop();
///     ASSERT(6 == iCheck);
///     ASSERT(4 == uCheck);
///     ASSERT(2 == cCheck);
///     return 0;
/// }
/// @endcode
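/// As a worked check of the `maxProc` formula used in this example, the
/// following sketch computes the same value with integer arithmetic only.
/// `maxProcessorsFor` is a hypothetical helper name (it is not part of this
/// component), and the clamp to a minimum of 1 mirrors the `bsl::max(1, ...)`
/// call in `main`:

```cpp
#include <cassert>

// Return the per-multiplexor processor limit for a pool of 'threads' threads
// shared by 'multiplexors' multiplexors, i.e., 'ceil(T / (M - 1)) - 1',
// clamped to a minimum of 1.  This helper is illustrative only.
inline int maxProcessorsFor(int threads, int multiplexors)
{
    assert(0 < threads);
    assert(1 < multiplexors);  // the formula is defined only for M > 1

    const int divisor = multiplexors - 1;

    // Integer ceiling of 'threads / divisor', avoiding floating point.
    const int ceiling = (threads + divisor - 1) / divisor;

    const int limit = ceiling - 1;
    return limit < 1 ? 1 : limit;
}
```

/// With T = 5 and M = 3 the divisor is 2, the ceiling is 3, and the result is
/// 2, matching the value stated above. With a single thread the unclamped
/// formula yields 0, which is why the example clamps the result to 1.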
/// @}
/** @} */
/** @} */

/** @addtogroup bdl
 * @{
 */
/** @addtogroup bdlmt
 * @{
 */
/** @addtogroup bdlmt_threadmultiplexor
 * @{
 */

#include <bdlscm_version.h>

#include <bdlcc_fixedqueue.h>

#include <bsls_atomic.h>

#include <bslma_allocator.h>
#include <bslma_usesbslmaallocator.h>

#include <bslmf_nestedtraitdeclaration.h>

#include <bsl_functional.h>

#ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
#include <bslalg_typetraits.h>
#endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES



namespace bdlmt {
                         // =======================
                         // class ThreadMultiplexor
                         // =======================

/// This class provides a mechanism for facilitating the use of multiple
/// threads to perform various user-defined functions ("jobs") when some
/// degree of collaboration between threads is required. The thread
/// multiplexor is configured with a total number of "processors",
/// representing the number of threads that may process jobs at any
/// particular time. Additional threads enqueue jobs to a pending job
/// queue, which is processed by the next available processing thread.
///
/// See @ref bdlmt_threadmultiplexor
class ThreadMultiplexor {

  public:
    // PUBLIC TYPES

    /// A callback of this type may be passed to the `processJob` method.
    typedef bsl::function<void()> Job;

  private:
    // DATA
    bslma::Allocator       *d_allocator_p;    // memory allocator (held)
    bdlcc::FixedQueue<Job> *d_jobQueue_p;     // pending job queue (owned)
    bsls::AtomicInt         d_numProcessors;  // current number of processors
    int                     d_maxProcessors;  // maximum number of processors

  private:
    // PRIVATE MANIPULATORS

    /// Process the pending job queue. Execute each functor obtained from
    /// the queue in the calling thread if the current number of processors
    /// is less than the maximum number of processors. Otherwise, enqueue
    /// the job back to the pending queue. Return 0 on success, and a
    /// non-zero value otherwise.
    int processJobQueue();

    // NOT IMPLEMENTED
    ThreadMultiplexor(const ThreadMultiplexor&);
    ThreadMultiplexor& operator=(const ThreadMultiplexor&);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(ThreadMultiplexor,
                                   bslma::UsesBslmaAllocator);

    // CREATORS

    /// Create a thread multiplexor which uses, at most, the specified
    /// `maxProcessors` number of threads to process user-specified jobs,
    /// identified as callbacks of type `Job`. Jobs that cannot be processed
    /// immediately are placed on a queue having the specified `maxQueueSize`
    /// to be processed by the next free thread. Optionally specify a
    /// `basicAllocator` used to supply memory. If `basicAllocator` is 0, the
    /// currently installed default allocator is used. The behavior is
    /// undefined unless `0 < maxProcessors` and `0 < maxQueueSize`.
    ThreadMultiplexor(int               maxProcessors,
                      int               maxQueueSize,
                      bslma::Allocator *basicAllocator = 0);

    /// Destroy this thread multiplexor object.
    ~ThreadMultiplexor();

    // MANIPULATORS

    /// Process the specified `job` functor in the calling thread if the
    /// current number of processors is less than the maximum number of
    /// processors. Otherwise, enqueue `job` to the pending job queue. Return
    /// 0 on success, and a non-zero value otherwise. Note that the only
    /// requirements on `t_JOBTYPE` are that it defines `operator()`, having a
    /// `void` return type, and that it defines a copy constructor.
    template <class t_JOBTYPE>
    int processJob(const t_JOBTYPE& job);

    // ACCESSORS

    /// Return the maximum number of active processors.
    int maxProcessors() const;

    /// Return the current number of active processors.
    int numProcessors() const;
};

// ============================================================================
//                            INLINE DEFINITIONS
// ============================================================================

// MANIPULATORS
template <class t_JOBTYPE>
inline
int ThreadMultiplexor::processJob(const t_JOBTYPE& job)
{
    // Execute 'job' in the calling thread if the current number of processors
    // is less than the maximum number of processors. Otherwise, enqueue 'job'
    // to the pending job queue. In all cases, check the pending job queue
    // before returning, as the number of processors may have changed,
    // allowing the execution of a job in the current thread.

    int previousNumProcessors = d_numProcessors;
    if (previousNumProcessors < d_maxProcessors &&
        previousNumProcessors ==
                       d_numProcessors.testAndSwap(previousNumProcessors,
                                                   previousNumProcessors + 1)) {
        // Process the job
        job();
        --d_numProcessors;
    }
    else {
        int rc = d_jobQueue_p->tryPushBack(job);
        if (0 != rc) {
            return rc;                                                // RETURN
        }
    }

    return processJobQueue();
}
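
// The admission logic of 'processJob' above (claim a processor slot with one
// compare-and-swap, otherwise park the job on a bounded queue, then check the
// queue) can be sketched with standard-library types only.  The sketch below
// is a simplified model, not the BDE implementation: 'MiniMultiplexor' and
// its members are hypothetical names, 'std::atomic' stands in for
// 'bsls::AtomicInt', a mutex-guarded 'std::deque' stands in for
// 'bdlcc::FixedQueue', and 'drainPending' is an assumed policy, since the
// body of 'processJobQueue' does not appear in this header.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

class MiniMultiplexor {
  public:
    typedef std::function<void()> Job;

    MiniMultiplexor(int maxProcessors, std::size_t maxQueueSize)
    : d_numProcessors(0)
    , d_maxProcessors(maxProcessors)
    , d_maxQueueSize(maxQueueSize)
    {
        assert(0 < maxProcessors);
        assert(0 < maxQueueSize);
    }

    // Run 'job' in the calling thread if a processor slot can be claimed;
    // otherwise park it on the pending queue.  Return 0 on success, and a
    // non-zero value if the pending queue is full.
    int processJob(const Job& job)
    {
        if (tryAcquireSlot()) {
            job();
            --d_numProcessors;
        }
        else if (!tryPush(job)) {
            return -1;
        }
        drainPending();
        return 0;
    }

    int numProcessors() const { return d_numProcessors.load(); }

  private:
    // Attempt, once, to move the processor count from its observed value to
    // that value plus one; fail if the limit is reached or the race is lost.
    bool tryAcquireSlot()
    {
        int observed = d_numProcessors.load();
        return observed < d_maxProcessors
            && d_numProcessors.compare_exchange_strong(observed, observed + 1);
    }

    bool tryPush(const Job& job)
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        if (d_pending.size() >= d_maxQueueSize) {
            return false;
        }
        d_pending.push_back(job);
        return true;
    }

    bool tryPop(Job *job)
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        if (d_pending.empty()) {
            return false;
        }
        *job = d_pending.front();
        d_pending.pop_front();
        return true;
    }

    // Assumed drain policy: run pending jobs for as long as a slot can be
    // claimed and the queue is not empty.
    void drainPending()
    {
        while (tryAcquireSlot()) {
            Job  job;
            bool popped = tryPop(&job);
            if (popped) {
                job();
            }
            --d_numProcessors;
            if (!popped) {
                return;
            }
        }
    }

    std::atomic<int> d_numProcessors;  // current number of processors
    int              d_maxProcessors;  // maximum number of processors
    std::size_t      d_maxQueueSize;   // capacity of the pending queue
    std::mutex       d_mutex;          // guards 'd_pending'
    std::deque<Job>  d_pending;        // pending jobs
};
```

// With 'maxProcessors' of 1, a job submitted while another job is running is
// parked rather than executed; in this model it runs later, when a thread
// that has finished its own job checks the pending queue.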

// ACCESSORS
inline
int ThreadMultiplexor::maxProcessors() const
{
    return d_maxProcessors;
}

inline
int ThreadMultiplexor::numProcessors() const
{
    return d_numProcessors;
}

}  // close package namespace

#endif

// ----------------------------------------------------------------------------
// Copyright 2015 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------


/** @} */
/** @} */
/** @} */