// bdlcc_singleproducersingleconsumerboundedqueue.h -*-C++-*- #ifndef INCLUDED_BDLCC_SINGLEPRODUCERSINGLECONSUMERBOUNDEDQUEUE #define INCLUDED_BDLCC_SINGLEPRODUCERSINGLECONSUMERBOUNDEDQUEUE #include <bsls_ident.h> BSLS_IDENT("$Id: $") //@PURPOSE: Provide a thread-aware SPSC bounded queue of values. // //@CLASSES: // bdlcc::SingleProducerSingleConsumerBoundedQueue: SPSCB concurrent queue // //@DESCRIPTION: This component defines a type, // 'bdlcc::SingleProducerSingleConsumerBoundedQueue', that provides an // efficient, thread-aware bounded (capacity fixed at construction) queue of // values assuming a single producer and a single consumer. The behavior of // the methods 'pushBack' and 'tryPushBack' is undefined unless the use is by a // single producer (one thread or a group of threads using external // synchronization). Also, the behavior of the methods 'popFront', // 'tryPopFront', and 'removeAll' is undefined unless the use is by a single // consumer. This class is ideal for synchronization and communication between // threads in a producer-consumer model when a bounded queue is appropriate and // there is only one producer thread and one consumer thread. // // The queue provides 'pushBack' and 'popFront' methods for pushing data into // the queue and popping data from the queue. When the queue is full, the // 'pushBack' methods block until data is removed from the queue. When the // queue is empty, the 'popFront' methods block until data appears in the // queue. Non-blocking methods 'tryPushBack' and 'tryPopFront' are also // provided. The 'tryPushBack' method fails immediately, returning a non-zero // value, if the queue is full. The 'tryPopFront' method fails immediately, // returning a non-zero value, if the queue is empty. // // The queue may be placed into a "enqueue disabled" state using the // 'disablePushBack' method. When disabled, 'pushBack' and 'tryPushBack' fail // immediately and return an error code. Any threads blocked in 'pushBack' // when the queue is enqueue disabled return from 'pushBack' immediately and // return an error code. The queue may be restored to normal operation with // the 'enablePushBack' method. // // The queue may be placed into a "dequeue disabled" state using the // 'disablePopFront' method. When dequeue disabled, 'popFront', 'tryPopFront', // and 'waitUntilEmpty' fail immediately and return an error code. Any threads // blocked in 'popFront' and 'waitUntilEmpty' when the queue is dequeue // disabled return immediately and return an error code. The queue may be // restored to normal operation with the 'enablePopFront' method. // ///Template Requirements ///--------------------- // 'bdlcc::SingleProducerSingleConsumerBoundedQueue' is a template that is // parameterized on the type of element contained within the queue. The // supplied template argument, 'TYPE', must provide both a default constructor // and a copy constructor, as well as an assignment operator. If the default // constructor accepts a 'bslma::Allocator *', 'TYPE' must declare the uses // 'bslma::Allocator' trait (see 'bslma_usesbslmaallocator') so that the // allocator of the queue is propagated to the elements contained in the queue. // ///Exception Safety ///---------------- // A 'bdlcc::SingleProducerSingleConsumerBoundedQueue' is exception neutral, // and all of the methods of 'bdlcc::SingleProducerSingleConsumerBoundedQueue' // provide the strong exception safety guarantee (see 'bsldoc_glossary'). // ///Move Semantics in C++03 ///----------------------- // Move-only types are supported by // 'bdlcc::SingleProducerSingleConsumerBoundedQueue' on C++11 platforms only // (where 'BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES' is defined), and are not // supported on C++03 platforms. Unfortunately, in C++03, there are user types // where a 'bslmf::MovableRef' will not safely degrade to a lvalue reference // when a move constructor is not available (types providing a constructor // template taking any type), so 'bslmf::MovableRefUtil::move' cannot be used // directly on a user supplied template type. See internal bug report 99039150 // for more information. // ///Usage ///----- // This section illustrates intended use of this component. // ///Example 1: A Simple Thread Pool ///- - - - - - - - - - - - - - - - // In the following example a 'bdlcc::SingleProducerSingleConsumerBoundedQueue' // is used to communicate between a single "producer" thread and a single // "consumer" thread. The "producer" will push work requests onto the queue, // and the "consumer" will iteratively take a work request from the queue and // service the request. This example shows a partial, simplified // implementation of the 'bdlmt::FixedThreadPool' class. See component // 'bdlmt_fixedthreadpool' for more information. // // First, we define a utility classes that handles a simple "work item": //.. // struct my_WorkData { // // Work data... // }; // // struct my_WorkRequest { // enum RequestType { // e_WORK = 1, // e_STOP = 2 // }; // // RequestType d_type; // my_WorkData d_data; // // Work data... // }; //.. // Next, we provide a simple function to service an individual work item. The // details are unimportant for this example: //.. // void myDoWork(const my_WorkData& data) // // Do some work based upon the specified 'data'. // { // // do some stuff... // (void)data; // } //.. // Then, we define a 'myConsumer' function that will pop elements off the queue // and process them. Note that the call to 'queue->popFront()' will block // until there is an element available on the queue: //.. // void myConsumer( // bdlcc::SingleProducerSingleConsumerBoundedQueue<my_WorkRequest> *queue) // // Pop elements from the specified 'queue'. // { // while (1) { // // 'popFront()' will wait for a 'my_WorkRequest' until available. // // my_WorkRequest item; // item.d_type = my_WorkRequest::e_WORK; // // assert(0 == queue->popFront(&item)); // // if (item.d_type == my_WorkRequest::e_STOP) { break; } // myDoWork(item.d_data); // } // } //.. // Finally, we define a 'myProducer' function that serves multiple roles: it // creates the 'bdlcc::SingleProducerSingleConsumerBoundedQueue', starts the // consumer thread, and then produces and enqueues work items. When work // requests are exhausted, this function enqueues one 'e_STOP' item for the // consumer queue. This 'e_STOP' item indicates to the consumer thread to // terminate its thread-handling function. //.. // void myProducer() // // Create a queue, start consumer thread, produce and enqueue work. // { // enum { // k_MAX_QUEUE_LENGTH = 100, // k_NUM_WORK_ITEMS = 1000 // }; // // bdlcc::SingleProducerSingleConsumerBoundedQueue<my_WorkRequest> // queue(k_MAX_QUEUE_LENGTH); // // bslmt::ThreadGroup consumerThreads; // consumerThreads.addThreads(bdlf::BindUtil::bind(&myConsumer, &queue), // 1); // // for (int i = 0; i < k_NUM_WORK_ITEMS; ++i) { // my_WorkRequest item; // item.d_type = my_WorkRequest::e_WORK; // item.d_data = my_WorkData(); // some stuff to do // queue.pushBack(item); // } // // { // my_WorkRequest item; // item.d_type = my_WorkRequest::e_STOP; // queue.pushBack(item); // } // // consumerThreads.joinAll(); // } //.. #include <bdlscm_version.h> #include <bslalg_scalarprimitives.h> #include <bslma_default.h> #include <bslma_usesbslmaallocator.h> #include <bslmf_movableref.h> #include <bslmf_nestedtraitdeclaration.h> #include <bslmt_condition.h> #include <bslmt_lockguard.h> #include <bslmt_mutex.h> #include <bslmt_platform.h> #include <bslmt_threadutil.h> #include <bsls_assert.h> #include <bsls_atomicoperations.h> #include <bsls_compilerfeatures.h> #include <bsls_objectbuffer.h> #include <bsls_types.h> namespace BloombergLP { namespace bdlcc { // =============================================================== // class SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard // =============================================================== template <class TYPE, class NODE> class SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard { // This class implements a guard that invokes 'TYPE::popComplete' on a // 'NODE' upon destruction. // PRIVATE TYPES typedef typename bsls::Types::Uint64 Uint64; // DATA TYPE *d_queue_p; // managed queue owning the managed node NODE *d_node_p; // managed node Uint64 d_index; // value of 'd_queue_p->d_popIndex' // NOT IMPLEMENTED SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard(); SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard( const SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard&); SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard& operator=( const SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard&); public: // CREATORS SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard(TYPE *queue, NODE *node, Uint64 index); // Create a guard managing the specified 'queue' and will invoke // 'popComplete' with the specified 'node' and 'index'. ~SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard(); // Destroy this object and invoke the 'TYPE::popComplete'. }; // ============================================== // class SingleProducerSingleConsumerBoundedQueue // ============================================== template <class TYPE> #if defined(BSLS_COMPILERFEATURES_SUPPORT_ALIGNAS) class alignas(bslmt::Platform::e_CACHE_LINE_SIZE) SingleProducerSingleConsumerBoundedQueue { #else class SingleProducerSingleConsumerBoundedQueue { #endif // This class provides a thread-safe bounded queue of values. // PRIVATE TYPES typedef unsigned int Uint; typedef typename bsls::Types::Uint64 Uint64; typedef typename bsls::AtomicOperations::AtomicTypes::Uint AtomicUint; typedef typename bsls::AtomicOperations::AtomicTypes::Uint64 AtomicUint64; typedef typename bsls::AtomicOperations AtomicOp; // PRIVATE CONSTANTS enum { // These value are used as values for 'd_state' in 'Node'. A node is // writable at creation and after a read completes (when the single // producer can write to the node). A node is readable after it is // written (when the node can be read by the single consumer). The // states in-between these two states (e.g., writing) are not needed by // this implementation of the queue. e_READABLE, // node can be read e_READABLE_AND_BLOCKED, // node can be read and has blocked producer e_WRITABLE, // node can be written e_WRITABLE_AND_EMPTY, // node can be written, queue is empty and // this is the *first* writable node e_WRITABLE_AND_BLOCKED // node can be written, queue is empty, this // is the *first* writable node, and the // consumer is blocked waiting for this node // to be readable }; // PRIVATE TYPES template <class DATA> struct QueueNode { // PUBLIC DATA bsls::ObjectBuffer<DATA> d_value; // stored value AtomicUint d_state; // 'e_READABLE', 'e_WRITABLE', etc. }; typedef QueueNode<TYPE> Node; // DATA AtomicUint64 d_popIndex; // index of next element to // pop Node *d_popElement_p; // array of elements that // comprise the bounded queue; // identical to // 'd_pushElement_p' const bsl::size_t d_popCapacity; // the capacity of the queue; // identical to // 'd_pushCapacity' AtomicUint d_popDisabledGeneration; // generation count of pop // disablements mutable AtomicUint d_emptyCount; // count of threads in // 'waitUntilEmpty' AtomicUint d_emptyGeneration; // generation count for the // empty queue state, queue is // empty whenever least // significant bit is zero const char d_popPad[ bslmt::Platform::e_CACHE_LINE_SIZE - sizeof(AtomicUint64) - sizeof(Node *) - sizeof(bsl::size_t) - sizeof(AtomicUint) - sizeof(AtomicUint) - sizeof(AtomicUint)]; // padding to prevent // subsequent data from being // in the same cache line as // the prior data AtomicUint64 d_pushIndex; // index of next target // element for a push Node *d_pushElement_p; // array of elements that // comprise the bounded queue; // identical to // 'd_popElement_p' const bsl::size_t d_pushCapacity; // the capacity of the queue; // identical to // 'd_popCapacity' AtomicUint d_pushDisabledGeneration; // generation count of push // disablements const char d_pushPad[ bslmt::Platform::e_CACHE_LINE_SIZE - sizeof(AtomicUint64) - sizeof(Node *) - sizeof(bsl::size_t) - sizeof(AtomicUint)]; // padding to prevent // subsequent data from being // in the same cache line as // the prior data bslmt::Mutex d_popMutex; // used with 'd_popCondition' // to block the consumer when // the queue is empty bslmt::Condition d_popCondition; // condition for blocking the // consumer when the queue is // empty bslmt::Mutex d_pushMutex; // used with 'd_pushCondition' // to block the producer when // the queue is full bslmt::Condition d_pushCondition; // condition for blocking the // producer when the queue is // full mutable bslmt::Mutex d_emptyMutex; // blocking point for // 'waitUntilEmpty' mutable bslmt::Condition d_emptyCondition; // condition variable for // 'waitUntilEmpty' bslma::Allocator *d_allocator_p; // allocator, held not owned // FRIENDS friend class SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard< SingleProducerSingleConsumerBoundedQueue<TYPE>, typename SingleProducerSingleConsumerBoundedQueue<TYPE>::Node>; // PRIVATE CLASS METHODS static void incrementUntil(AtomicUint *value, unsigned int bitValue); // If the specified 'value' does not have its lowest-order bit set to // the value of the specified 'bitValue', increment 'value' until it // does. Note that this method is used to modify the generation counts // stored in 'd_popDisabledGeneration' and 'd_pushDisabledGeneration'. // PRIVATE MANIPULATORS void popComplete(Node *node, Uint64 index); // Destruct the value stored in the specified 'node', use the specified // 'index' in calculations to mark the 'node' writable, unblock any // blocked "push" threads, and if the queue is empty update the empty // generation and signal the queue empty condition. This method is // used within 'popFrontImp' by a guard to complete the reclamation of // a node in the presence of an exception. int popFrontImp(TYPE *value, bool isTry); // If the specified 'isTry' is 'false', remove the element from the // front of this queue and load that element into the specified // 'value'; otherwise, attempt to remove the element from the front of // this queue without blocking, and, if successful, load the 'value' // with the removed element. If 'false == isTry' and the queue is // empty, block until it is not empty. Return 0 on success, and a // non-zero value otherwise. Specifically, return 'e_SUCCESS' on // success, 'e_DISABLED' if 'isPopFrontDisabled()', 'e_EMPTY' if // 'true == isTry', '!isPopFrontDisabled()', and the queue is empty, // and 'e_FAILED' if an underlying mechanism returns an error. On // failure, 'value' is not changed. Threads blocked due to the queue // being empty will return 'e_DISABLED' if 'disablePopFront' is // invoked. int pushBackImp(const TYPE& value, bool isTry); // If the specified 'isTry' is 'false', append the specified 'value' to // the back of this queue; otherwise, attempt to append the 'value' to // the back of this queue without blocking. Return 0 on success, and a // non-zero value otherwise. Specifically, return 'e_SUCCESS' on // success, 'e_DISABLED' if 'isPushBackDisabled()', 'e_FULL' if // 'true == isTry', '!isPushBackDisabled()', and the queue is full, and // 'e_FAILED' if an underlying mechanism returns an error. Threads // blocked due to the queue being full will return 'e_DISABLED' if // 'disablePushFront' is invoked. int pushBackImp(bslmf::MovableRef<TYPE> value, bool isTry); // If the specified 'isTry' is 'false', append the specified // move-insertable 'value' to the back of this queue; otherwise, // attempt to append the 'value' to the back of this queue without // blocking. 'value' is left in a valid but unspecified state. Return // 0 on success, and a non-zero value otherwise. Specifically, return // 'e_DISABLED' if 'isPushBackDisabled()', 'e_FULL' if 'true == isTry', // '!isPushBackDisabled()', and the queue is full, and 'e_FAILED' if an // underlying mechanism returns an error. On failure, 'value' is not // changed. Threads blocked due to the queue being full will return // 'e_DISABLED' if 'disablePushFront' is invoked. void pushComplete(Node *node, Uint64 index); // Mark the specified 'node' readable, signal 'd_popCondition' if // necessary, and update 'd_popIndex' to be the index value of the // location to be used after specified 'index' location. This method // is invoked from 'pushBackImp'. // NOT IMPLEMENTED SingleProducerSingleConsumerBoundedQueue( const SingleProducerSingleConsumerBoundedQueue&); SingleProducerSingleConsumerBoundedQueue& operator=( const SingleProducerSingleConsumerBoundedQueue&); public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(SingleProducerSingleConsumerBoundedQueue, bslma::UsesBslmaAllocator); // PUBLIC TYPES typedef TYPE value_type; // The type for elements. // PUBLIC CONSTANTS enum { e_SUCCESS = 0, e_EMPTY = -1, e_FULL = -2, e_DISABLED = -3, e_FAILED = -4 }; // CREATORS explicit SingleProducerSingleConsumerBoundedQueue( bsl::size_t capacity, bslma::Allocator *basicAllocator = 0); // Create a thread-aware queue with at least the specified 'capacity'. // Optionally specify a 'basicAllocator' used to supply memory. If // 'basicAllocator' is 0, the currently installed default allocator is // used. ~SingleProducerSingleConsumerBoundedQueue(); // Destroy this object. // MANIPULATORS int popFront(TYPE *value); // Remove the element from the front of this queue and load that // element into the specified 'value'. If the queue is empty, block // until it is not empty. Return 0 on success, and a non-zero value // otherwise. Specifically, return 'e_SUCCESS' on success, // 'e_DISABLED' if 'isPopFrontDisabled()' and 'e_FAILED' if an // underlying mechanism returns an error. On failure, 'value' is not // changed. Threads blocked due to the queue being empty will return // 'e_DISABLED' if 'disablePopFront' is invoked. The behavior is // undefined unless the invoker of this method is the single consumer. int pushBack(const TYPE& value); // Append the specified 'value' to the back of this queue. Return 0 on // success, and a non-zero value otherwise. Specifically, return // 'e_SUCCESS' on success, 'e_DISABLED' if 'isPushBackDisabled()' and // 'e_FAILED' if an underlying mechanism returns an error. Threads // blocked due to the queue being full will return 'e_DISABLED' if // 'disablePushFront' is invoked. The behavior is undefined unless the // invoker of this method is the single producer. int pushBack(bslmf::MovableRef<TYPE> value); // Append the specified move-insertable 'value' to the back of this // queue. 'value' is left in a valid but unspecified state. Return 0 // on success, and a non-zero value otherwise. Specifically, return // 'e_SUCCESS' on success, 'e_DISABLED' if 'isPushBackDisabled()' and // 'e_FAILED' if an underlying mechanism returns an error. On failure, // 'value' is not changed. Threads blocked due to the queue being full // will return 'e_DISABLED' if 'disablePushFront' is invoked. The // behavior is undefined unless the invoker of this method is the // single producer. void removeAll(); // Remove all items currently in this queue. Note that this operation // is not atomic; if other threads are concurrently pushing items into // the queue the result of 'numElements()' after this function returns // is not guaranteed to be 0. The behavior is undefined unless the // invoker of this method is the single consumer. int tryPopFront(TYPE *value); // Attempt to remove the element from the front of this queue without // blocking, and, if successful, load the specified 'value' with the // removed element. Return 0 on success, and a non-zero value // otherwise. Specifically, return 'e_SUCCESS' on success, // 'e_DISABLED' if 'isPopFrontDisabled()', and 'e_EMPTY' if // '!isPopFrontDisabled()' and the queue was empty. On failure, // 'value' is not changed. The behavior is undefined unless the // invoker of this method is the single consumer. int tryPushBack(const TYPE& value); // Append the specified 'value' to the back of this queue. Return 0 on // success, and a non-zero value otherwise. Specifically, return // 'e_SUCCESS' on success, 'e_DISABLED' if 'isPushBackDisabled()', and // 'e_FULL' if '!isPushBackDisabled()' and the queue was full. The // behavior is undefined unless the invoker of this method is the // single producer. int tryPushBack(bslmf::MovableRef<TYPE> value); // Append the specified move-insertable 'value' to the back of this // queue. 'value' is left in a valid but unspecified state. Return 0 // on success, and a non-zero value otherwise. Specifically, return // 'e_SUCCESS' on success, 'e_DISABLED' if 'isPushBackDisabled()', and // 'e_FULL' if '!isPushBackDisabled()' and the queue was full. On // failure, 'value' is not changed. The behavior is undefined unless // the invoker of this method is the single producer. // Enqueue/Dequeue State void disablePopFront(); // Disable dequeueing from this queue. All subsequent invocations of // 'popFront' or 'tryPopFront' will fail immediately. If the single // consumer is blocked in 'popFront', the invocation of 'popFront' will // fail immediately. Any blocked invocations of 'waitUntilEmpty' will // fail immediately. If the queue is already dequeue disabled, this // method has no effect. void disablePushBack(); // Disable enqueueing into this queue. All subsequent invocations of // 'pushBack' or 'tryPushBack' will fail immediately. If the single // producer is blocked in 'pushBack', the invocation of 'pushBack' will // fail immediately. If the queue is already enqueue disabled, this // method has no effect. void enablePushBack(); // Enable queuing. If the queue is not enqueue disabled, this call has // no effect. void enablePopFront(); // Enable dequeueing. If the queue is not dequeue disabled, this call // has no effect. // ACCESSORS bsl::size_t capacity() const; // Return the maximum number of elements that may be stored in this // queue. Note that the value returned may be greater than that // supplied at construction. bool isEmpty() const; // Return 'true' if this queue is empty (has no elements), or 'false' // otherwise. bool isFull() const; // Return 'true' if this queue is full (has no available capacity), or // 'false' otherwise. bool isPopFrontDisabled() const; // Return 'true' if this queue is dequeue disabled, and 'false' // otherwise. Note that the queue is created in the "dequeue enabled" // state. bool isPushBackDisabled() const; // Return 'true' if this queue is enqueue disabled, and 'false' // otherwise. Note that the queue is created in the "enqueue enabled" // state. bsl::size_t numElements() const; // Returns the number of elements currently in this queue. int waitUntilEmpty() const; // Block until all the elements in this queue are removed. Return 0 on // success, and a non-zero value otherwise. Specifically, return // 'e_SUCCESS' on success, 'e_DISABLED' if 'isPopFrontDisabled()' and // 'e_FAILED' if an underlying mechanism returns an error. A blocked // thread waiting for the queue to empty will return a non-zero value // if 'disablePopFront' is invoked. // Aspects bslma::Allocator *allocator() const; // Return the allocator used by this object to supply memory. }; // ============================================================================ // INLINE DEFINITIONS // ============================================================================ // --------------------------------------------------------------- // class SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard // --------------------------------------------------------------- // CREATORS template <class TYPE, class NODE> inline SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard<TYPE, NODE> ::SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard( TYPE *queue, NODE *node, Uint64 index) : d_queue_p(queue) , d_node_p(node) , d_index(index) { } template <class TYPE, class NODE> inline SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard<TYPE, NODE> ::~SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard() { d_queue_p->popComplete(d_node_p, d_index); } // ---------------------------------------------- // class SingleProducerSingleConsumerBoundedQueue // ---------------------------------------------- // PRIVATE CLASS METHODS template <class TYPE> void SingleProducerSingleConsumerBoundedQueue<TYPE> ::incrementUntil(AtomicUint *value, unsigned int bitValue) { unsigned int state = AtomicOp::getUintAcquire(value); if (bitValue != (state & 1)) { unsigned int expState; do { expState = state; state = AtomicOp::testAndSwapUintAcqRel(value, state, state + 1); } while (state != expState && (bitValue == (state & 1))); } } // PRIVATE MANIPULATORS template <class TYPE> inline void SingleProducerSingleConsumerBoundedQueue<TYPE>::popComplete(Node *node, Uint64 index) { ++index; if (index == d_popCapacity) { index = 0; } AtomicOp::setUint64Release(&d_popIndex, index); node->d_value.object().~TYPE(); Uint nodeState = AtomicOp::swapUintAcqRel(&node->d_state, e_WRITABLE); if (e_READABLE_AND_BLOCKED == nodeState) { { bslmt::LockGuard<bslmt::Mutex> guard(&d_pushMutex); } d_pushCondition.signal(); } // If the node subsequent to 'node' is writable, the queue is empty and the // node subsequent to 'node' must be marked as 'e_WRITABLE_AND_EMPTY'. nodeState = AtomicOp::testAndSwapUintAcqRel(&d_popElement_p[index].d_state, e_WRITABLE, e_WRITABLE_AND_EMPTY); if (e_WRITABLE == nodeState) { // The queue is empty, increment the empty generation count. AtomicOp::addUintAcqRel(&d_emptyGeneration, 1); if (0 < AtomicOp::getUintAcquire(&d_emptyCount)) { { bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex); } d_emptyCondition.broadcast(); } } } template <class TYPE> int SingleProducerSingleConsumerBoundedQueue<TYPE>::popFrontImp(TYPE *value, bool isTry) { Uint64 index = AtomicOp::getUint64Acquire(&d_popIndex); const Uint disabledGen = AtomicOp::getUintAcquire(&d_popDisabledGeneration); if (disabledGen & 1) { return e_DISABLED; // RETURN } Node& node = d_popElement_p[index]; Uint nodeState = AtomicOp::getUintAcquire(&node.d_state); // If the node is not available for reading: // * if this is a "try" invocation, return // * otherwise, yield and check again, then block // Note that 'e_WRITABLE_AND_BLOCKED != nodeState' since this is the one // consumer. if (e_WRITABLE_AND_EMPTY == nodeState) { if (isTry) { return e_EMPTY; // RETURN } bslmt::ThreadUtil::yield(); nodeState = AtomicOp::getUintAcquire(&node.d_state); if (e_WRITABLE_AND_EMPTY == nodeState) { bslmt::LockGuard<bslmt::Mutex> guard(&d_popMutex); nodeState = AtomicOp::testAndSwapUintAcqRel( &node.d_state, nodeState, e_WRITABLE_AND_BLOCKED); while (( e_WRITABLE_AND_EMPTY == nodeState || e_WRITABLE_AND_BLOCKED == nodeState) && disabledGen == AtomicOp::getUintAcquire(&d_popDisabledGeneration)) { int rv = d_popCondition.wait(&d_popMutex); if (rv) { AtomicOp::testAndSwapUintAcqRel(&node.d_state, e_WRITABLE_AND_BLOCKED, e_WRITABLE_AND_EMPTY); return e_FAILED; // RETURN } nodeState = AtomicOp::getUint(&node.d_state); } // The following checks for disablement being the cause of exiting // the 'while' loop. if ( e_WRITABLE_AND_EMPTY == nodeState || e_WRITABLE_AND_BLOCKED == nodeState) { AtomicOp::testAndSwapUintAcqRel(&node.d_state, e_WRITABLE_AND_BLOCKED, e_WRITABLE_AND_EMPTY); return e_DISABLED; // RETURN } } } SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard< SingleProducerSingleConsumerBoundedQueue<TYPE>, Node> guard(this, &node, index); #if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES) *value = bslmf::MovableRefUtil::move(node.d_value.object()); #else *value = node.d_value.object(); #endif return e_SUCCESS; } template <class TYPE> int SingleProducerSingleConsumerBoundedQueue<TYPE>::pushBackImp( const TYPE& value, bool isTry) { Uint64 index = AtomicOp::getUint64Acquire(&d_pushIndex); const Uint disabledGen = AtomicOp::getUintAcquire(&d_pushDisabledGeneration); if (disabledGen & 1) { return e_DISABLED; // RETURN } Node& node = d_pushElement_p[index]; Uint nodeState = AtomicOp::getUintAcquire(&node.d_state); // If the node is not available for writing: // * if this is a "try" invocation, return // * otherwise, yield and check again, then block // Note that 'e_READABLE_AND_BLOCKED != nodeState' since this is the one // producer. if (e_READABLE == nodeState) { if (isTry) { return e_FULL; // RETURN } bslmt::ThreadUtil::yield(); nodeState = AtomicOp::getUintAcquire(&node.d_state); if (e_READABLE == nodeState) { bslmt::LockGuard<bslmt::Mutex> guard(&d_pushMutex); nodeState = AtomicOp::testAndSwapUintAcqRel( &node.d_state, e_READABLE, e_READABLE_AND_BLOCKED); while (( e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) && disabledGen == AtomicOp::getUintAcquire(&d_pushDisabledGeneration)) { int rv = d_pushCondition.wait(&d_pushMutex); if (rv) { AtomicOp::testAndSwapUintAcqRel(&node.d_state, e_READABLE_AND_BLOCKED, e_READABLE); return e_FAILED; // RETURN } nodeState = AtomicOp::getUint(&node.d_state); } // The following checks for disablement being the cause of exiting // the 'while' loop. if ( e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) { AtomicOp::testAndSwapUintAcqRel(&node.d_state, e_READABLE_AND_BLOCKED, e_READABLE); return e_DISABLED; // RETURN } } } bslalg::ScalarPrimitives::copyConstruct(node.d_value.address(), value, d_allocator_p); pushComplete(&node, index); return e_SUCCESS; } template <class TYPE> int SingleProducerSingleConsumerBoundedQueue<TYPE>::pushBackImp( bslmf::MovableRef<TYPE> value, bool isTry) { Uint64 index = AtomicOp::getUint64Acquire(&d_pushIndex); const Uint disabledGen = AtomicOp::getUintAcquire(&d_pushDisabledGeneration); if (disabledGen & 1) { return e_DISABLED; // RETURN } Node& node = d_pushElement_p[index]; Uint nodeState = AtomicOp::getUintAcquire(&node.d_state); if (e_READABLE == nodeState) { if (isTry) { return e_FULL; // RETURN } bslmt::ThreadUtil::yield(); nodeState = AtomicOp::getUintAcquire(&node.d_state); if (e_READABLE == nodeState) { bslmt::LockGuard<bslmt::Mutex> guard(&d_pushMutex); nodeState = AtomicOp::testAndSwapUintAcqRel( &node.d_state, e_READABLE, e_READABLE_AND_BLOCKED); while (( e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) && disabledGen == AtomicOp::getUintAcquire(&d_pushDisabledGeneration)) { int rv = d_pushCondition.wait(&d_pushMutex); if (rv) { AtomicOp::testAndSwapUintAcqRel(&node.d_state, e_READABLE_AND_BLOCKED, e_READABLE); return e_FAILED; // RETURN } nodeState = AtomicOp::getUint(&node.d_state); } if ( e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) { AtomicOp::testAndSwapUintAcqRel(&node.d_state, e_READABLE_AND_BLOCKED, e_READABLE); return e_DISABLED; // RETURN } } } TYPE& dummy = value; bslalg::ScalarPrimitives::moveConstruct(node.d_value.address(), dummy, d_allocator_p); pushComplete(&node, index); return e_SUCCESS; } template <class TYPE> inline void SingleProducerSingleConsumerBoundedQueue<TYPE>::pushComplete( Node *node, Uint64 index) { Uint nodeState = AtomicOp::swapUintAcqRel(&node->d_state, e_READABLE); if (e_WRITABLE_AND_BLOCKED == nodeState) { // Queue is no longer empty and the consumer is blocked. AtomicOp::addUintAcqRel(&d_emptyGeneration, 1); { bslmt::LockGuard<bslmt::Mutex> guard(&d_popMutex); } d_popCondition.signal(); } else if (e_WRITABLE_AND_EMPTY == nodeState) { // Queue is no longer empty. AtomicOp::addUintAcqRel(&d_emptyGeneration, 1); } ++index; if (index == d_pushCapacity) { index = 0; } AtomicOp::setUint64Release(&d_pushIndex, index); } // CREATORS template <class TYPE> SingleProducerSingleConsumerBoundedQueue<TYPE>:: SingleProducerSingleConsumerBoundedQueue(bsl::size_t capacity, bslma::Allocator *basicAllocator) : d_popElement_p(0) , d_popCapacity(capacity > 0 ? capacity : 1) , d_popPad() , d_pushCapacity(capacity > 0 ? capacity : 1) , d_pushPad() , d_popMutex() , d_popCondition() , d_pushMutex() , d_pushCondition() , d_emptyMutex() , d_emptyCondition() , d_allocator_p(bslma::Default::allocator(basicAllocator)) { AtomicOp::initUint64(&d_popIndex, 0); AtomicOp::initUint64(&d_pushIndex, 0); AtomicOp::initUint(&d_popDisabledGeneration, 0); AtomicOp::initUint(&d_emptyCount, 0); AtomicOp::initUint(&d_emptyGeneration, 0); AtomicOp::initUint(&d_pushDisabledGeneration, 0); d_popElement_p = static_cast<Node *>( d_allocator_p->allocate(d_popCapacity * sizeof(Node))); d_pushElement_p = d_popElement_p; AtomicOp::initUint(&d_popElement_p[0].d_state, e_WRITABLE_AND_EMPTY); for (bsl::size_t i = 1; i < d_popCapacity; ++i) { AtomicOp::initUint(&d_popElement_p[i].d_state, e_WRITABLE); } } template <class TYPE> SingleProducerSingleConsumerBoundedQueue<TYPE> ::~SingleProducerSingleConsumerBoundedQueue() { if (d_popElement_p) { removeAll(); d_allocator_p->deallocate(d_popElement_p); } } // MANIPULATORS template <class TYPE> inline int SingleProducerSingleConsumerBoundedQueue<TYPE>::popFront(TYPE *value) { return popFrontImp(value, false); } template <class TYPE> inline int SingleProducerSingleConsumerBoundedQueue<TYPE>::pushBack(const TYPE& value) { return pushBackImp(value, false); } template <class TYPE> inline int SingleProducerSingleConsumerBoundedQueue<TYPE>::pushBack( bslmf::MovableRef<TYPE> value) { return pushBackImp(bslmf::MovableRefUtil::move(value), false); } template <class TYPE> void SingleProducerSingleConsumerBoundedQueue<TYPE>::removeAll() { Uint64 index = AtomicOp::getUint64Acquire(&d_popIndex); Uint nodeState = AtomicOp::getUintAcquire( &d_popElement_p[index].d_state); while (e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) { d_popElement_p[index].d_value.object().~TYPE(); AtomicOp::swapUintAcqRel(&d_popElement_p[index].d_state, e_WRITABLE); ++index; if (index == d_popCapacity) { index = 0; } nodeState = AtomicOp::getUintAcquire(&d_popElement_p[index].d_state); } // If the node subsequent to the last removed element is writable, the // queue is empty and the node subsequent to the last removed element' must // be marked as 'e_WRITABLE_AND_EMPTY'. nodeState = AtomicOp::testAndSwapUintAcqRel(&d_popElement_p[index].d_state, e_WRITABLE, e_WRITABLE_AND_EMPTY); if (e_WRITABLE == nodeState) { // The queue is empty, increment the empty generation count. AtomicOp::addUintAcqRel(&d_emptyGeneration, 1); } else { // A 'removeAll' makes the queue empty. Since there has been a // 'pushBack' before the queue could be marked empty ('e_WRITABLE != // nodeState'), increase the empty generation by 2 to note the queue // was empty at some point during this method call but is no longer // empty (1 for becoming empty, 1 for leaving the empty state). AtomicOp::addUintAcqRel(&d_emptyGeneration, 2); } AtomicOp::setUint64Release(&d_popIndex, index); { bslmt::LockGuard<bslmt::Mutex> guard(&d_pushMutex); } d_pushCondition.signal(); if (0 < AtomicOp::getUintAcquire(&d_emptyCount)) { { bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex); } d_emptyCondition.broadcast(); } } template <class TYPE> inline int SingleProducerSingleConsumerBoundedQueue<TYPE>::tryPopFront(TYPE *value) { return popFrontImp(value, true); } template <class TYPE> inline int SingleProducerSingleConsumerBoundedQueue<TYPE>::tryPushBack( const TYPE& value) { return pushBackImp(value, true); } template <class TYPE> inline int SingleProducerSingleConsumerBoundedQueue<TYPE>::tryPushBack( bslmf::MovableRef<TYPE> value) { return pushBackImp(bslmf::MovableRefUtil::move(value), true); } // Enqueue/Dequeue State template <class TYPE> inline void SingleProducerSingleConsumerBoundedQueue<TYPE>::disablePopFront() { incrementUntil(&d_popDisabledGeneration, 1); { bslmt::LockGuard<bslmt::Mutex> guard(&d_popMutex); } d_popCondition.broadcast(); if (0 < AtomicOp::getUintAcquire(&d_emptyCount)) { { bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex); } d_emptyCondition.broadcast(); } } template <class TYPE> inline void SingleProducerSingleConsumerBoundedQueue<TYPE>::disablePushBack() { incrementUntil(&d_pushDisabledGeneration, 1); { bslmt::LockGuard<bslmt::Mutex> guard(&d_pushMutex); } d_pushCondition.broadcast(); } template <class TYPE> inline void SingleProducerSingleConsumerBoundedQueue<TYPE>::enablePopFront() { incrementUntil(&d_popDisabledGeneration, 0); } template <class TYPE> inline void SingleProducerSingleConsumerBoundedQueue<TYPE>::enablePushBack() { incrementUntil(&d_pushDisabledGeneration, 0); } // ACCESSORS template <class TYPE> inline bsl::size_t SingleProducerSingleConsumerBoundedQueue<TYPE>::capacity() const { return d_popCapacity; } template <class TYPE> inline bool SingleProducerSingleConsumerBoundedQueue<TYPE>::isEmpty() const { return 0 == (AtomicOp::getUintAcquire(&d_emptyGeneration) & 1); } template <class TYPE> inline bool SingleProducerSingleConsumerBoundedQueue<TYPE>::isFull() const { Node& node = d_pushElement_p[AtomicOp::getUint64Acquire( &d_pushIndex)]; Uint nodeState = AtomicOp::getUintAcquire(&node.d_state); return e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState; } template <class TYPE> inline bool SingleProducerSingleConsumerBoundedQueue<TYPE>::isPopFrontDisabled() const { return 1 == (AtomicOp::getUintAcquire(&d_popDisabledGeneration) & 1); } template <class TYPE> inline bool SingleProducerSingleConsumerBoundedQueue<TYPE>::isPushBackDisabled() const { return 1 == (AtomicOp::getUintAcquire(&d_pushDisabledGeneration) & 1); } template <class TYPE> inline bsl::size_t SingleProducerSingleConsumerBoundedQueue<TYPE>::numElements() const { Uint64 popIndex = AtomicOp::getUint64Acquire(&d_popIndex); Uint64 pushIndex = AtomicOp::getUint64Acquire(&d_pushIndex); Node& node = d_pushElement_p[pushIndex]; Uint nodeState = AtomicOp::getUintAcquire(&node.d_state); if (e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) { return d_popCapacity; // RETURN } return pushIndex >= popIndex ? pushIndex - popIndex : pushIndex + d_popCapacity - popIndex; } template <class TYPE> int SingleProducerSingleConsumerBoundedQueue<TYPE>::waitUntilEmpty() const { AtomicOp::addUintAcqRel(&d_emptyCount, 1); const Uint initEmptyGen = AtomicOp::getUintAcquire(&d_emptyGeneration); const Uint disabledGen = AtomicOp::getUintAcquire(&d_popDisabledGeneration); if (disabledGen & 1) { AtomicOp::addUintAcqRel(&d_emptyCount, -1); return e_DISABLED; // RETURN } if (0 == (initEmptyGen & 1)) { AtomicOp::addUintAcqRel(&d_emptyCount, -1); return e_SUCCESS; // RETURN } bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex); Uint emptyGen = AtomicOp::getUintAcquire(&d_emptyGeneration); while ( initEmptyGen == emptyGen && disabledGen == AtomicOp::getUintAcquire(&d_popDisabledGeneration)) { int rv = d_emptyCondition.wait(&d_emptyMutex); if (rv) { AtomicOp::addUintAcqRel(&d_emptyCount, -1); return e_FAILED; // RETURN } emptyGen = AtomicOp::getUintAcquire(&d_emptyGeneration); } AtomicOp::addUintAcqRel(&d_emptyCount, -1); if (initEmptyGen == emptyGen) { return e_DISABLED; // RETURN } return e_SUCCESS; } // Aspects template <class TYPE> inline bslma::Allocator *SingleProducerSingleConsumerBoundedQueue<TYPE>::allocator() const { return d_allocator_p; } } // close package namespace } // close enterprise namespace #endif // ---------------------------------------------------------------------------- // Copyright 2019 Bloomberg Finance L.P. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ----------------------------- END-OF-FILE ----------------------------------