// bslmt_barrier.h -*-C++-*- // ---------------------------------------------------------------------------- // NOTICE // // This component is not up to date with current BDE coding standards, and // should not be used as an example for new development. // ---------------------------------------------------------------------------- #ifndef INCLUDED_BSLMT_BARRIER #define INCLUDED_BSLMT_BARRIER #include <bsls_ident.h> BSLS_IDENT("$Id: $") //@PURPOSE: Provide a thread barrier component. // //@CLASSES: // bslmt::Barrier: thread barrier class // //@SEE_ALSO: bslmt_latch // //@DESCRIPTION: This component defines a thread barrier named 'bslmt::Barrier'. // Barriers provide a simple mechanism for synchronizing a series of threads at // a given point in a program. A barrier is constructed with a number // 'numArrivals' which is the number of arrivals (invocations of 'arrive', // 'wait', and 'timedWait') required to reach the synchronization point for the // barrier to be unblocked. As each thread reaches the synchronization point, // it calls either the 'arrive' method to indicate reaching the synchronization // point or a 'wait' method to indicate reaching the synchronization point and // blocking until the barrier has the required number of arrivals. An // invariant is that the number of threads blocking on a barrier is always less // than 'numArrivals'. Once the required 'numArrivals' has occurred, the // invariant is restored by unblocking all the waiting threads and resetting // the barrier to its initial state. In particular, the barrier can be reused // several times in succession. // // Note that, if 'arrive' will not be used, the number of threads sharing the // use of the barrier should be exactly 'numArrivals', as only exactly // 'numArrivals' threads calling 'wait' will be unblocked. In particular, // extra threads calling 'wait' will block, perhaps unwittingly participating // in the next round of reuse of the barrier together with the unblocked // 'numArrivals' threads (leading to potential race conditions). // // Note also that the behavior is undefined if a barrier is destroyed while one // or more threads are waiting on it. // ///Supported Clock-Types ///--------------------- // 'bsls::SystemClockType' supplies the enumeration indicating the system clock // on which timeouts supplied to other methods should be based. If the clock // type indicated at construction is 'bsls::SystemClockType::e_REALTIME', the // 'absTime' argument passed to the 'timedWait' method should be expressed as // an *absolute* offset since 00:00:00 UTC, January 1, 1970 (which matches the // epoch used in 'bsls::SystemTime::now(bsls::SystemClockType::e_REALTIME)'. // If the clock type indicated at construction is // 'bsls::SystemClockType::e_MONOTONIC', the 'absTime' argument passed to the // 'timedWait' method should be expressed as an *absolute* offset since the // epoch of this clock (which matches the epoch used in // 'bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC)'. // ///Usage ///----- // The following example demonstrates the use of a 'bslmt::Barrier' to create // "checkpoints" in a threaded "basket trade" processing logic. In this // example, a "basket" is a series of trades submitted as one logical trade. // If any given trade fails to process for any reason, then all the trades must // be canceled. // // The example is driven through function 'processBasketTrade', which takes as // its argument a reference to a 'BasketTrade' structure. The 'BasketTrade' // structure contains a collection of 'Trade' objects; the 'processBasketTrade' // function creates a separate thread to manage each 'Trade' object. // // The 'Trade' threads proceed independently, except that they synchronize with // one another at various stages of the trade processing: each thread waits for // all trades to complete a given step before any individual trade thread // proceeds to the next step. // // The 'bslmt::Barrier' is used repeatedly at each processing stage to wait for // all trades to complete the given stage before continuing to the next stage. // // To begin, we define the fundamental structures 'Trade' and 'BasketTrade'. //.. // enum { // k_MAX_BASKET_TRADES = 10 // }; // // struct Trade { // // Trade stuff... // }; // // struct BasketTrade { // bsl::vector<Trade> d_trades; // array of trade that comprise the // // basket // }; //.. // Functions 'validateTrade', 'insertToDatabase', and 'submitToExchange' define // functionality for the three stages of trade processing. Again, the // 'bslmt::Barrier' will be used so that no individual trade moves forward to // the next stage before all trades have completed the given stage. So, for // instance, no individual trade can call the 'insertToDatabase' function until // all trades have successfully completed the 'validateTrade' function. // // Functions 'deleteFromDatabase' and 'cancelAtExchange' are used for rolling // back all trades in the event that any one trade fails to move forward. // // The implementation of these functions is left incomplete for our example. //.. // int validateTrade(Trade &trade) // { // (void)trade; // int result = 0; // // Do some checking here... // // return result; // } // // int insertToDatabase(Trade &trade) // { // (void)trade; // int result = 0; // // Insert the record here... // // return result; // } // // int submitToExchange(Trade &trade) // { // (void)trade; // int result = 0; // // Do submission here... // // return result; // } // // int deleteFromDatabase(Trade &trade) // { // (void)trade; // int result = 0; // // Delete record here... // // return result; // } // // int cancelAtExchange(Trade &trade) // { // (void)trade; // int result = 0; // // Cancel trade here... // // return result; // } //.. // The 'processTrade' function handles a single trade within a Trade Basket. // Because this function is called within a 'bslmt::Thread' callback (see the // 'tradeProcessingThread' function, below), its arguments are passed in a // single structure. The 'processTrade' function validates a trade, stores the // trade into a database, and registers that trade with an exchange. At each // step, the 'processTrade' function synchronizes with other trades in the // Trade Basket. //.. // struct TradeThreadArgument { // bsl::vector<Trade> *d_trades_p; // bslmt::Barrier *d_barrier_p; // volatile bool *d_errorFlag_p; // int d_tradeNum; // }; // // TradeThreadArgument *processTrade(TradeThreadArgument *arguments) // { // int retval; // Trade &trade = (*arguments->d_trades_p)[arguments->d_tradeNum]; // // retval = validateTrade(trade); //.. // If this trade failed validation, then indicate that an error has occurred. // Note that, even when an error occurs, we must still block on the barrier // object; otherwise, other threads which did not fail would remain blocked // indefinitely. //.. // if (retval) *arguments->d_errorFlag_p = true; // arguments->d_barrier_p->wait(); //.. // Once all threads have completed the validation phase, check to see if any // errors occurred; if so, exit. Otherwise continue to the next step. //.. // if (*arguments->d_errorFlag_p) return arguments; // RETURN // // retval = insertToDatabase(trade); // if (retval) *arguments->d_errorFlag_p = true; // arguments->d_barrier_p->wait(); //.. // As before, if an error occurs on this thread, we must still block on the // barrier object. This time, if an error has occurred, we need to check to // see whether this trade had an error. If not, then the trade has been // inserted into the database, so we need to remove it before we exit. //.. // if (*arguments->d_errorFlag_p) { // if (!retval) deleteFromDatabase(trade); // return arguments; // RETURN // } //.. // The final synchronization point is at the exchange. As before, if there is // an error in the basket, we may need to cancel the individual trade. //.. // retval = submitToExchange(trade); // if (retval) *arguments->d_errorFlag_p = true; // arguments->d_barrier_p->wait(); // if (*arguments->d_errorFlag_p) { // if (!retval) cancelAtExchange(trade); // deleteFromDatabase(trade); // return arguments; // RETURN // } //.. // All synchronized steps have completed for all trades in this basket. The // basket trade is placed. //.. // return arguments; // } //.. // Function 'tradeProcessingThread' is a callback for 'bslmt::ThreadUtil', // which requires 'void' pointers for argument and return type and 'extern "C"' // linkage. 'bslmt::ThreadUtil::create()' expects a pointer to this function, // and provides that function pointer to the newly created thread. The new // thread then executes this function. // // The 'tradeProcessingThread' function receives the 'void' pointer, casts it // to our required type ('TradeThreadArgument *'), and then calls the // type-specific function, 'processTrade'. On return, the specific type is // cast back to 'void*'. //.. // extern "C" void *tradeProcessingThread(void *argumentsIn) // { // return (void *) processTrade ((TradeThreadArgument *)argumentsIn); // } //.. // Function 'processBasketTrade' drives the example. Given a 'BasketTrade', // the function spawns a separate thread for each individual trade in the // basket, supplying the function 'tradeProcessingThread' to be executed on // each thread. //.. // bool processBasketTrade(BasketTrade& trade) // // Return 'true' if the specified basket 'trade' was processed // // successfully, and 'false' otherwise. The 'trade' is processed // // atomically, i.e., all the trades succeed, or none of the trades are // // executed. // { // TradeThreadArgument arguments[k_MAX_BASKET_TRADES]; // bslmt::ThreadAttributes attributes; // bslmt::ThreadUtil::Handle threadHandles[k_MAX_BASKET_TRADES]; // // int numTrades = static_cast<int>(trade.d_trades.size()); // assert(0 < numTrades && k_MAX_BASKET_TRADES >= numTrades); //.. // Construct the barrier that will be used by the processing threads. Since a // thread will be created for each trade in the basket, use the number of // trades as the barrier count. When 'bslmt::Barrier::wait()' is called, the // barrier will require 'numTrades' objects to wait before all are released. //.. // bslmt::Barrier barrier(numTrades); // bool errorFlag = false; //.. // Create a thread to process each trade. //.. // for (int i = 0; i < numTrades; ++i) { // arguments[i].d_trades_p = &trade.d_trades; // arguments[i].d_barrier_p = &barrier; // arguments[i].d_errorFlag_p = &errorFlag; // arguments[i].d_tradeNum = i; // bslmt::ThreadUtil::create(&threadHandles[i], // attributes, // tradeProcessingThread, // &arguments[i]); // } //.. // Wait for all threads to complete. //.. // for (int i = 0; i < numTrades; ++i) { // bslmt::ThreadUtil::join(threadHandles[i]); // } //.. // Check if any error occurred. //.. // return false == errorFlag; // } //.. #include <bslscm_version.h> #include <bslmt_condition.h> #include <bslmt_mutex.h> #include <bsls_assert.h> #include <bsls_libraryfeatures.h> #include <bsls_systemclocktype.h> #include <bsls_timeinterval.h> #ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY #include <bslmt_chronoutil.h> #include <bsl_chrono.h> #endif namespace BloombergLP { namespace bslmt { // ============= // class Barrier // ============= class Barrier { // This class defines a thread barrier. // DATA Mutex d_mutex; // mutex used to control access to this // barrier. Condition d_cond; // condition variable used for signaling // blocked threads. const int d_numArrivals; // number of arrivals before this barrier // can be signaled. int d_numArrived; // number of arrivals at this barrier. int d_numWaiting; // number of threads currently waiting for // this barrier to be signaled. int d_sigCount; // count of number of times this barrier has // been signaled. int d_numPending; // Number of threads that have been signaled // but have not yet awakened. // NOT IMPLEMENTED Barrier(const Barrier&); Barrier& operator=(const Barrier&); public: // TYPES enum { e_TIMED_OUT = -1 }; // The value 'timedWait' returns when a timeout occurs. // CREATORS explicit Barrier( int numArrivals, bsls::SystemClockType::Enum clockType = bsls::SystemClockType::e_REALTIME); // Create a barrier that requires the specified 'numArrivals' to // unblock. Optionally specify a 'clockType' indicating the type of // the system clock against which the 'bsls::TimeInterval' 'absTime' // timeouts passed to the 'timedWait' method are to be interpreted (see // {Supported Clock-Types} in the component-level documentation). If // 'clockType' is not specified then the realtime system clock is used. // The behavior is undefined unless '0 < numArrivals'. #ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY Barrier(int numArrivals, const bsl::chrono::system_clock&); // Create a barrier that requires the specified 'numArrivals' to // unblock. Use the realtime system clock as the clock against which // the 'absTime' timeouts passed to the 'timedWait' methods are // interpreted (see {Supported Clock-Types} in the component-level // documentation). The behavior is undefined unless '0 < numArrivals'. Barrier(int numArrivals, const bsl::chrono::steady_clock&); // Create a barrier that requires the specified 'numArrivals' to // unblock. Use the monotonic system clock as the clock against which // the 'absTime' timeouts passed to the 'timedWait' methods are // interpreted (see {Supported Clock-Types} in the component-level // documentation). The behavior is undefined unless '0 < numArrivals'. #endif ~Barrier(); // Wait for all *signaled* threads to unblock and destroy this barrier. // (See 'wait' and 'timedWait' below for the meaning of *signaled*.) // Note that the behavior is undefined if a barrier is destroyed while // one or more threads are waiting on it. // MANIPULATORS void arrive(); // Arrive on this barrier. If this is the last required arrival, // *signal* all the threads that are currently waiting on this barrier // to unblock and reset the state of this barrier to its initial state. int timedWait(const bsls::TimeInterval& absTime); // Arrive and block until the required number of arrivals have // occurred, or until the specified 'absTime' timeout expires. In the // former case, *signal* all the threads that are currently waiting on // this barrier to unblock, reset the state of this barrier to its // initial state, and return 0. If this method times out before the // required number of arrivals, the thread is released to proceed and // ceases to contribute to the number of arrivals, and 'e_TIMED_OUT' is // returned. Any other return value indicates that an error has // occurred. Errors are unrecoverable. After an error, the barrier // may be destroyed, but any other use has undefined behavior. // 'absTime' is an *absolute* time represented as an interval from some // epoch, which is determined by the clock indicated at construction // (see {Supported Clock-Types} in the component-level documentation). // Note that 'timedWait' and 'wait' should not generally be used // together; if one or more threads called 'wait' while others called // 'timedWait', then if the thread(s) that called 'timedWait' were to // time out and not retry, the threads that called 'wait' would never // unblock. #ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY template <class CLOCK, class DURATION> int timedWait(const bsl::chrono::time_point<CLOCK, DURATION>& absTime); // Arrive and block until the required number of arrivals have // occurred, or until the specified 'absTime' timeout expires. In the // former case, *signal* all the threads that are currently waiting on // this barrier to unblock, reset the state of this barrier to its // initial state, and return 0. If this method times out before the // required number of arrivals, the thread is released to proceed and // ceases to contribute to the number of arrivals, and 'e_TIMED_OUT' is // returned. Any other return value indicates that an error has // occurred. Errors are unrecoverable. After an error, the barrier // may be destroyed, but any other use has undefined behavior. // 'absTime' is an *absolute* time represented as an interval from some // epoch, which is determined by the clock associated with the time // point. Note that 'timedWait' and 'wait' should not generally be // used together; if one or more threads called 'wait' while others // called 'timedWait', then if the thread(s) that called 'timedWait' // were to time out and not retry, the threads that called 'wait' would // never unblock. #endif void wait(); // Arrive and block until the required number of arrivals have // occurred. Then *signal* all the threads that are currently waiting // on this barrier to unblock and reset the state of this barrier to // its initial state. Note that generally 'wait' and 'timedWait' // should not be used together, for reasons explained in the // documentation of 'timedWait'. // ACCESSORS bsls::SystemClockType::Enum clockType() const; // Return the clock type used for timeouts. int numArrivals() const; // Return the required number of arrivals before all waiting threads // will unblock. // DEPRECATED METHODS int numThreads() const; // !DEPRECATED!: Use 'numArrivals' instead. // // Return the required number of arrivals before all waiting threads // will unblock. }; } // close package namespace // ============================================================================ // INLINE DEFINITIONS // ============================================================================ // CREATORS inline bslmt::Barrier::Barrier(int numArrivals, bsls::SystemClockType::Enum clockType) : d_mutex() , d_cond(clockType) , d_numArrivals(numArrivals) , d_numArrived(0) , d_numWaiting(0) , d_sigCount(0) , d_numPending(0) { BSLS_ASSERT_SAFE(0 < numArrivals); } #ifdef BSLS_LIBRARYFEATURES_HAS_CPP11_BASELINE_LIBRARY inline bslmt::Barrier::Barrier(int numArrivals, const bsl::chrono::system_clock&) : d_mutex() , d_cond(bsls::SystemClockType::e_REALTIME) , d_numArrivals(numArrivals) , d_numArrived(0) , d_numWaiting(0) , d_sigCount(0) , d_numPending(0) { BSLS_ASSERT_SAFE(0 < numArrivals); } inline bslmt::Barrier::Barrier(int numArrivals, const bsl::chrono::steady_clock&) : d_mutex() , d_cond(bsls::SystemClockType::e_MONOTONIC) , d_numArrivals(numArrivals) , d_numArrived(0) , d_numWaiting(0) , d_sigCount(0) , d_numPending(0) { BSLS_ASSERT_SAFE(0 < numArrivals); } // MANIPULATORS template <class CLOCK, class DURATION> inline int bslmt::Barrier::timedWait( const bsl::chrono::time_point<CLOCK, DURATION>& absTime) { return bslmt::ChronoUtil::timedWait(this, absTime); } #endif // ACCESSORS inline bsls::SystemClockType::Enum bslmt::Barrier::clockType() const { return d_cond.clockType(); } inline int bslmt::Barrier::numArrivals() const { return d_numArrivals; } // DEPRECATED METHODS inline int bslmt::Barrier::numThreads() const { return d_numArrivals; } } // close enterprise namespace #endif // ---------------------------------------------------------------------------- // Copyright 2015 Bloomberg Finance L.P. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ----------------------------- END-OF-FILE ----------------------------------