BDE 4.14.0 Production release
Loading...
Searching...
No Matches
bslmt_barrier.h
Go to the documentation of this file.
1/// @file bslmt_barrier.h
2///
3/// The content of this file has been pre-processed for Doxygen.
4///
5
6
7// bslmt_barrier.h -*-C++-*-
8#ifndef INCLUDED_BSLMT_BARRIER
9#define INCLUDED_BSLMT_BARRIER
10
11#include <bsls_ident.h>
12BSLS_IDENT("$Id: $")
13
14/// @defgroup bslmt_barrier bslmt_barrier
15/// @brief Provide a thread barrier component.
16/// @addtogroup bsl
17/// @{
18/// @addtogroup bslmt
19/// @{
20/// @addtogroup bslmt_barrier
21/// @{
22///
23/// <h1> Outline </h1>
24/// * <a href="#bslmt_barrier-purpose"> Purpose</a>
25/// * <a href="#bslmt_barrier-classes"> Classes </a>
26/// * <a href="#bslmt_barrier-description"> Description </a>
27/// * <a href="#bslmt_barrier-supported-clock-types"> Supported Clock-Types </a>
28/// * <a href="#bslmt_barrier-usage"> Usage </a>
29/// * <a href="#bslmt_barrier-example-1-basic-usage"> Example 1: Basic Usage </a>
30///
31/// # Purpose {#bslmt_barrier-purpose}
32/// Provide a thread barrier component.
33///
34/// # Classes {#bslmt_barrier-classes}
35///
36/// - bslmt::Barrier: thread barrier class
37///
38/// @see bslmt_latch
39///
40/// # Description {#bslmt_barrier-description}
41/// This component defines a thread barrier named `bslmt::Barrier`.
42/// Barriers provide a simple mechanism for synchronizing a series of threads at
43/// a given point in a program. A barrier is constructed with a number
44/// `numArrivals` which is the number of arrivals (invocations of `arrive`,
45/// `wait`, and `timedWait`) required to reach the synchronization point for the
46/// barrier to be unblocked. As each thread reaches the synchronization point,
47/// it calls either the `arrive` method to indicate reaching the synchronization
48/// point or a `wait` method to indicate reaching the synchronization point and
49/// blocking until the barrier has the required number of arrivals. An
50/// invariant is that the number of threads blocking on a barrier is always less
51/// than `numArrivals`. Once the required `numArrivals` has occurred, the
52/// invariant is restored by unblocking all the waiting threads and resetting
53/// the barrier to its initial state. In particular, the barrier can be reused
54/// several times in succession.
55///
56/// Note that, if `arrive` will not be used, the number of threads sharing the
57/// use of the barrier should be exactly `numArrivals`, as only exactly
58/// `numArrivals` threads calling `wait` will be unblocked. In particular,
59/// extra threads calling `wait` will block, perhaps unwittingly participating
60/// in the next round of reuse of the barrier together with the unblocked
61/// `numArrivals` threads (leading to potential race conditions).
62///
63/// Note also that the behavior is undefined if a barrier is destroyed while one
64/// or more threads are waiting on it.
65///
66/// ## Supported Clock-Types {#bslmt_barrier-supported-clock-types}
67///
68///
69/// `bsls::SystemClockType` supplies the enumeration indicating the system clock
70/// on which timeouts supplied to other methods should be based. If the clock
71/// type indicated at construction is `bsls::SystemClockType::e_REALTIME`, the
72/// `absTime` argument passed to the `timedWait` method should be expressed as
73/// an *absolute* offset since 00:00:00 UTC, January 1, 1970 (which matches the
74/// epoch used in `bsls::SystemTime::now(bsls::SystemClockType::e_REALTIME)`.
75/// If the clock type indicated at construction is
76/// `bsls::SystemClockType::e_MONOTONIC`, the `absTime` argument passed to the
77/// `timedWait` method should be expressed as an *absolute* offset since the
78/// epoch of this clock (which matches the epoch used in
79/// `bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC)`.
80///
81/// ## Usage {#bslmt_barrier-usage}
82///
83///
84/// This section illustrates intended use of this component.
85///
86/// ### Example 1: Basic Usage {#bslmt_barrier-example-1-basic-usage}
87///
88///
89/// The following example demonstrates the use of a `bslmt::Barrier` to create
90/// "checkpoints" in a threaded "basket trade" processing logic. In this
91/// example, a "basket" is a series of trades submitted as one logical trade.
92/// If any given trade fails to process for any reason, then all the trades must
93/// be canceled.
94///
95/// The example is driven through function `processBasketTrade`, which takes as
96/// its argument a reference to a `BasketTrade` structure. The `BasketTrade`
97/// structure contains a collection of `Trade` objects; the `processBasketTrade`
98/// function creates a separate thread to manage each `Trade` object.
99///
100/// The `Trade` threads proceed independently, except that they synchronize with
101/// one another at various stages of the trade processing: each thread waits for
102/// all trades to complete a given step before any individual trade thread
103/// proceeds to the next step.
104///
105/// The `bslmt::Barrier` is used repeatedly at each processing stage to wait for
106/// all trades to complete the given stage before continuing to the next stage.
107///
108/// To begin, we define the fundamental structures `Trade` and `BasketTrade`.
109/// @code
110/// enum {
111/// k_MAX_BASKET_TRADES = 10
112/// };
113///
114/// /// Trade stuff...
115/// struct Trade {
116/// };
117///
118/// struct BasketTrade {
119/// bsl::vector<Trade> d_trades; // array of trade that comprise the
120/// // basket
121/// };
122/// @endcode
123/// Functions `validateTrade`, `insertToDatabase`, and `submitToExchange` define
124/// functionality for the three stages of trade processing. Again, the
125/// `bslmt::Barrier` will be used so that no individual trade moves forward to
126/// the next stage before all trades have completed the given stage. So, for
127/// instance, no individual trade can call the `insertToDatabase` function until
128/// all trades have successfully completed the `validateTrade` function.
129///
130/// Functions `deleteFromDatabase` and `cancelAtExchange` are used for rolling
131/// back all trades in the event that any one trade fails to move forward.
132///
133/// The implementation of these functions is left incomplete for our example.
134/// @code
135/// int validateTrade(Trade &trade)
136/// {
137/// (void)trade;
138/// int result = 0;
139/// // Do some checking here...
140///
141/// return result;
142/// }
143///
144/// int insertToDatabase(Trade &trade)
145/// {
146/// (void)trade;
147/// int result = 0;
148/// // Insert the record here...
149///
150/// return result;
151/// }
152///
153/// int submitToExchange(Trade &trade)
154/// {
155/// (void)trade;
156/// int result = 0;
157/// // Do submission here...
158///
159/// return result;
160/// }
161///
162/// int deleteFromDatabase(Trade &trade)
163/// {
164/// (void)trade;
165/// int result = 0;
166/// // Delete record here...
167///
168/// return result;
169/// }
170///
171/// int cancelAtExchange(Trade &trade)
172/// {
173/// (void)trade;
174/// int result = 0;
175/// // Cancel trade here...
176///
177/// return result;
178/// }
179/// @endcode
180/// The `processTrade` function handles a single trade within a Trade Basket.
181/// Because this function is called within a `bslmt::Thread` callback (see the
182/// `tradeProcessingThread` function, below), its arguments are passed in a
183/// single structure. The `processTrade` function validates a trade, stores the
184/// trade into a database, and registers that trade with an exchange. At each
185/// step, the `processTrade` function synchronizes with other trades in the
186/// Trade Basket.
187/// @code
188/// struct TradeThreadArgument {
189/// bsl::vector<Trade> *d_trades_p;
190/// bslmt::Barrier *d_barrier_p;
191/// volatile bool *d_errorFlag_p;
192/// int d_tradeNum;
193/// };
194///
195/// TradeThreadArgument *processTrade(TradeThreadArgument *arguments)
196/// {
197/// int retval;
198/// Trade &trade = (*arguments->d_trades_p)[arguments->d_tradeNum];
199///
200/// retval = validateTrade(trade);
201/// @endcode
202/// If this trade failed validation, then indicate that an error has occurred.
203/// Note that, even when an error occurs, we must still block on the barrier
204/// object; otherwise, other threads which did not fail would remain blocked
205/// indefinitely.
206/// @code
207/// if (retval) *arguments->d_errorFlag_p = true;
208/// arguments->d_barrier_p->wait();
209/// @endcode
210/// Once all threads have completed the validation phase, check to see if any
211/// errors occurred; if so, exit. Otherwise continue to the next step.
212/// @code
213/// if (*arguments->d_errorFlag_p) return arguments; // RETURN
214///
215/// retval = insertToDatabase(trade);
216/// if (retval) *arguments->d_errorFlag_p = true;
217/// arguments->d_barrier_p->wait();
218/// @endcode
219/// As before, if an error occurs on this thread, we must still block on the
220/// barrier object. This time, if an error has occurred, we need to check to
221/// see whether this trade had an error. If not, then the trade has been
222/// inserted into the database, so we need to remove it before we exit.
223/// @code
224/// if (*arguments->d_errorFlag_p) {
225/// if (!retval) deleteFromDatabase(trade);
226/// return arguments; // RETURN
227/// }
228/// @endcode
229/// The final synchronization point is at the exchange. As before, if there is
230/// an error in the basket, we may need to cancel the individual trade.
231/// @code
232/// retval = submitToExchange(trade);
233/// if (retval) *arguments->d_errorFlag_p = true;
234/// arguments->d_barrier_p->wait();
235/// if (*arguments->d_errorFlag_p) {
236/// if (!retval) cancelAtExchange(trade);
237/// deleteFromDatabase(trade);
238/// return arguments; // RETURN
239/// }
240/// @endcode
241/// All synchronized steps have completed for all trades in this basket. The
242/// basket trade is placed.
243/// @code
244/// return arguments;
245/// }
246/// @endcode
247/// Function `tradeProcessingThread` is a callback for `bslmt::ThreadUtil`,
248/// which requires `void` pointers for argument and return type and `extern "C"`
249/// linkage. `bslmt::ThreadUtil::create()` expects a pointer to this function,
250/// and provides that function pointer to the newly created thread. The new
251/// thread then executes this function.
252///
253/// The `tradeProcessingThread` function receives the `void` pointer, casts it
254/// to our required type (`TradeThreadArgument *`), and then calls the
255/// type-specific function, `processTrade`. On return, the specific type is
256/// cast back to `void*`.
257/// @code
258/// extern "C" void *tradeProcessingThread(void *argumentsIn)
259/// {
260/// return (void *) processTrade ((TradeThreadArgument *)argumentsIn);
261/// }
262/// @endcode
263/// Function `processBasketTrade` drives the example. Given a `BasketTrade`,
264/// the function spawns a separate thread for each individual trade in the
265/// basket, supplying the function `tradeProcessingThread` to be executed on
266/// each thread.
267/// @code
268/// /// Return `true` if the specified basket `trade` was processed
269/// /// successfully, and `false` otherwise. The `trade` is processed
270/// /// atomically, i.e., all the trades succeed, or none of the trades are
271/// /// executed.
272/// bool processBasketTrade(BasketTrade& trade)
273/// {
274/// TradeThreadArgument arguments[k_MAX_BASKET_TRADES];
275/// bslmt::ThreadAttributes attributes;
276/// bslmt::ThreadUtil::Handle threadHandles[k_MAX_BASKET_TRADES];
277///
278/// int numTrades = static_cast<int>(trade.d_trades.size());
279/// assert(0 < numTrades && k_MAX_BASKET_TRADES >= numTrades);
280/// @endcode
281/// Construct the barrier that will be used by the processing threads. Since a
282/// thread will be created for each trade in the basket, use the number of
283/// trades as the barrier count. When `bslmt::Barrier::wait()` is called, the
284/// barrier will require `numTrades` objects to wait before all are released.
285/// @code
286/// bslmt::Barrier barrier(numTrades);
287/// bool errorFlag = false;
288/// @endcode
289/// Create a thread to process each trade.
290/// @code
291/// for (int i = 0; i < numTrades; ++i) {
292/// arguments[i].d_trades_p = &trade.d_trades;
293/// arguments[i].d_barrier_p = &barrier;
294/// arguments[i].d_errorFlag_p = &errorFlag;
295/// arguments[i].d_tradeNum = i;
296/// bslmt::ThreadUtil::create(&threadHandles[i],
297/// attributes,
298/// tradeProcessingThread,
299/// &arguments[i]);
300/// }
301/// @endcode
302/// Wait for all threads to complete.
303/// @code
304/// for (int i = 0; i < numTrades; ++i) {
305/// bslmt::ThreadUtil::join(threadHandles[i]);
306/// }
307/// @endcode
308/// Check if any error occurred.
309/// @code
310/// return false == errorFlag;
311/// }
312/// @endcode
313/// @}
314/** @} */
315/** @} */
316
317/** @addtogroup bsl
318 * @{
319 */
320/** @addtogroup bslmt
321 * @{
322 */
323/** @addtogroup bslmt_barrier
324 * @{
325 */
326
327#include <bslscm_version.h>
328
329#include <bslmt_condition.h>
330#include <bslmt_mutex.h>
331
332#include <bsls_assert.h>
333#include <bsls_libraryfeatures.h>
334#include <bsls_systemclocktype.h>
335#include <bsls_timeinterval.h>
336
337#ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY
338#include <bslmt_chronoutil.h>
339
340#include <bsl_chrono.h>
341#endif
342
343
344namespace bslmt {
345
346 // =============
347 // class Barrier
348 // =============
349
350/// This class defines a thread barrier.
351///
352/// See @ref bslmt_barrier
353class Barrier {
354
355 // DATA
356 Mutex d_mutex; // mutex used to control access to this
357 // barrier.
358
359 Condition d_cond; // condition variable used for signaling
360 // blocked threads.
361
362 const int d_numArrivals; // number of arrivals before this barrier
363 // can be signaled.
364
365 int d_numArrived; // number of arrivals at this barrier.
366
367 int d_numWaiting; // number of threads currently waiting for
368 // this barrier to be signaled.
369
370 int d_sigCount; // count of number of times this barrier has
371 // been signaled.
372
373 int d_numPending; // Number of threads that have been signaled
374 // but have not yet awakened.
375
376 // NOT IMPLEMENTED
377 Barrier(const Barrier&);
378 Barrier& operator=(const Barrier&);
379
380 public:
381 // TYPES
382
383 /// The value `timedWait` returns when a timeout occurs.
384 enum { e_TIMED_OUT = -1 };
385
386 // CREATORS
387
388 /// Create a barrier that requires the specified `numArrivals` to
389 /// unblock. Optionally specify a `clockType` indicating the type of
390 /// the system clock against which the `bsls::TimeInterval` `absTime`
391 /// timeouts passed to the `timedWait` method are to be interpreted (see
392 /// {Supported Clock-Types} in the component-level documentation). If
393 /// `clockType` is not specified then the realtime system clock is used.
394 /// The behavior is undefined unless `0 < numArrivals`.
395 explicit Barrier(
396 int numArrivals,
398
399#ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY
400 /// Create a barrier that requires the specified `numArrivals` to
401 /// unblock. Use the realtime system clock as the clock against which
402 /// the `absTime` timeouts passed to the `timedWait` methods are
403 /// interpreted (see {Supported Clock-Types} in the component-level
404 /// documentation). The behavior is undefined unless `0 < numArrivals`.
405 Barrier(int numArrivals, const bsl::chrono::system_clock&);
406
407 /// Create a barrier that requires the specified `numArrivals` to
408 /// unblock. Use the monotonic system clock as the clock against which
409 /// the `absTime` timeouts passed to the `timedWait` methods are
410 /// interpreted (see {Supported Clock-Types} in the component-level
411 /// documentation). The behavior is undefined unless `0 < numArrivals`.
412 Barrier(int numArrivals, const bsl::chrono::steady_clock&);
413#endif
414
415 /// Wait for all *signaled* threads to unblock and destroy this barrier.
416 /// (See `wait` and `timedWait` below for the meaning of *signaled*.)
417 /// Note that the behavior is undefined if a barrier is destroyed while
418 /// one or more threads are waiting on it.
420
421 // MANIPULATORS
422
423 /// Arrive on this barrier. If this is the last required arrival,
424 /// *signal* all the threads that are currently waiting on this barrier
425 /// to unblock and reset the state of this barrier to its initial state.
426 void arrive();
427
428 /// Arrive and block until the required number of arrivals have
429 /// occurred, or until the specified `absTime` timeout expires. In the
430 /// former case, *signal* all the threads that are currently waiting on
431 /// this barrier to unblock, reset the state of this barrier to its
432 /// initial state, and return 0. If this method times out before the
433 /// required number of arrivals, the thread is released to proceed and
434 /// ceases to contribute to the number of arrivals, and `e_TIMED_OUT` is
435 /// returned. Any other return value indicates that an error has
436 /// occurred. Errors are unrecoverable. After an error, the barrier
437 /// may be destroyed, but any other use has undefined behavior.
438 /// `absTime` is an *absolute* time represented as an interval from some
439 /// epoch, which is determined by the clock indicated at construction
440 /// (see {Supported Clock-Types} in the component-level documentation).
441 /// Note that `timedWait` and `wait` should not generally be used
442 /// together; if one or more threads called `wait` while others called
443 /// `timedWait`, then if the thread(s) that called `timedWait` were to
444 /// time out and not retry, the threads that called `wait` would never
445 /// unblock.
446 int timedWait(const bsls::TimeInterval& absTime);
447
448#ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY
449 /// Arrive and block until the required number of arrivals have
450 /// occurred, or until the specified `absTime` timeout expires. In the
451 /// former case, *signal* all the threads that are currently waiting on
452 /// this barrier to unblock, reset the state of this barrier to its
453 /// initial state, and return 0. If this method times out before the
454 /// required number of arrivals, the thread is released to proceed and
455 /// ceases to contribute to the number of arrivals, and `e_TIMED_OUT` is
456 /// returned. Any other return value indicates that an error has
457 /// occurred. Errors are unrecoverable. After an error, the barrier
458 /// may be destroyed, but any other use has undefined behavior.
459 /// `absTime` is an *absolute* time represented as an interval from some
460 /// epoch, which is determined by the clock associated with the time
461 /// point. Note that `timedWait` and `wait` should not generally be
462 /// used together; if one or more threads called `wait` while others
463 /// called `timedWait`, then if the thread(s) that called `timedWait`
464 /// were to time out and not retry, the threads that called `wait` would
465 /// never unblock.
466 template <class CLOCK, class DURATION>
467 int timedWait(const bsl::chrono::time_point<CLOCK, DURATION>& absTime);
468#endif
469
470 /// Arrive and block until the required number of arrivals have
471 /// occurred. Then *signal* all the threads that are currently waiting
472 /// on this barrier to unblock and reset the state of this barrier to
473 /// its initial state. Note that generally `wait` and `timedWait`
474 /// should not be used together, for reasons explained in the
475 /// documentation of `timedWait`.
476 void wait();
477
478 // ACCESSORS
479
480 /// Return the clock type used for timeouts.
482
483 /// Return the required number of arrivals before all waiting threads
484 /// will unblock.
485 int numArrivals() const;
486
487 // DEPRECATED METHODS
488
489 /// Return the required number of arrivals before all waiting threads
490 /// will unblock.
491 ///
492 /// @deprecated Use @ref numArrivals instead.
493 int numThreads() const;
494};
495
496// ============================================================================
497// INLINE DEFINITIONS
498// ============================================================================
499
500// CREATORS
501inline
502Barrier::Barrier(int numArrivals, bsls::SystemClockType::Enum clockType)
503: d_mutex()
504, d_cond(clockType)
505, d_numArrivals(numArrivals)
506, d_numArrived(0)
507, d_numWaiting(0)
508, d_sigCount(0)
509, d_numPending(0)
510{
512}
513
514#ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY
515inline
516Barrier::Barrier(int numArrivals, const bsl::chrono::system_clock&)
517: d_mutex()
518, d_cond(bsls::SystemClockType::e_REALTIME)
519, d_numArrivals(numArrivals)
520, d_numArrived(0)
521, d_numWaiting(0)
522, d_sigCount(0)
523, d_numPending(0)
524{
526}
527
528inline
529Barrier::Barrier(int numArrivals, const bsl::chrono::steady_clock&)
530: d_mutex()
531, d_cond(bsls::SystemClockType::e_MONOTONIC)
532, d_numArrivals(numArrivals)
533, d_numArrived(0)
534, d_numWaiting(0)
535, d_sigCount(0)
536, d_numPending(0)
537{
539}
540
541// MANIPULATORS
542template <class CLOCK, class DURATION>
543inline
544int Barrier::timedWait(const bsl::chrono::time_point<CLOCK, DURATION>& absTime)
545{
546 return bslmt::ChronoUtil::timedWait(this, absTime);
547}
548#endif
549
550// ACCESSORS
551inline
553{
554 return d_cond.clockType();
555}
556
557inline
559{
560 return d_numArrivals;
561}
562
563// DEPRECATED METHODS
564inline
566{
567 return d_numArrivals;
568}
569
570} // close package namespace
571
572
573#endif
574
575// ----------------------------------------------------------------------------
576// Copyright 2015 Bloomberg Finance L.P.
577//
578// Licensed under the Apache License, Version 2.0 (the "License");
579// you may not use this file except in compliance with the License.
580// You may obtain a copy of the License at
581//
582// http://www.apache.org/licenses/LICENSE-2.0
583//
584// Unless required by applicable law or agreed to in writing, software
585// distributed under the License is distributed on an "AS IS" BASIS,
586// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
587// See the License for the specific language governing permissions and
588// limitations under the License.
589// ----------------------------- END-OF-FILE ----------------------------------
590
591/** @} */
592/** @} */
593/** @} */
Definition bslmt_barrier.h:353
int timedWait(const bsls::TimeInterval &absTime)
int numArrivals() const
Definition bslmt_barrier.h:558
@ e_TIMED_OUT
Definition bslmt_barrier.h:384
int numThreads() const
Definition bslmt_barrier.h:565
bsls::SystemClockType::Enum clockType() const
Return the clock type used for timeouts.
Definition bslmt_barrier.h:552
Definition bslmt_condition.h:220
bsls::SystemClockType::Enum clockType() const
Return the clock type used for timeouts.
Definition bslmt_condition.h:432
Definition bslmt_mutex.h:315
Definition bsls_timeinterval.h:301
#define BSLS_ASSERT_SAFE(X)
Definition bsls_assert.h:1762
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bslmt_barrier.h:344
Definition bdlt_iso8601util.h:691
static int timedWait(PRIMITIVE *primitive, const bsl::chrono::time_point< CLOCK, DURATION > &absTime)
Definition bslmt_chronoutil.h:345
Enum
Definition bsls_systemclocktype.h:117
@ e_REALTIME
Definition bsls_systemclocktype.h:120