// bdlcc_singleproducerqueueimpl.h                                    -*-C++-*-
#ifndef INCLUDED_BDLCC_SINGLEPRODUCERQUEUEIMPL
#define INCLUDED_BDLCC_SINGLEPRODUCERQUEUEIMPL

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a testable thread-aware single producer queue of values.
//
//@CLASSES:
//  bdlcc::SingleProducerQueueImpl: thread-aware single producer 'TYPE' queue
//
//@DESCRIPTION: This component defines a type,
// 'bdlcc::SingleProducerQueueImpl', that provides an efficient, thread-aware
// queue of values assuming a single producer (the use of 'pushBack' and
// 'tryPushBack' is done by one thread or a group of threads using external
// synchronization).  The behavior of the methods 'pushBack' and 'tryPushBack'
// is undefined unless the use is by a single producer.  This class is ideal
// for synchronization and communication between threads in a producer-consumer
// model when there is only one producer thread.
//
// The queue provides 'pushBack' and 'popFront' methods for pushing data into
// the queue and popping data from the queue.  The queue will allocate memory
// as necessary to accommodate 'pushBack' invocations ('pushBack' will never
// block and is provided for consistency with other containers).  When the
// queue is empty, the 'popFront' methods block until data appears in the
// queue.  Non-blocking methods 'tryPushBack' and 'tryPopFront' are also
// provided.  The 'tryPopFront' method fails immediately, returning a non-zero
// value, if the queue is empty.
//
// The queue may be placed into an "enqueue disabled" state using the
// 'disablePushBack' method.  When disabled, 'pushBack' and 'tryPushBack' fail
// immediately and return an error code.  The queue may be restored to normal
// operation with the 'enablePushBack' method.
//
// The queue may be placed into a "dequeue disabled" state using the
// 'disablePopFront' method.  When dequeue disabled, 'popFront' and
// 'tryPopFront' fail immediately and return an error code.  Any threads
// blocked in 'popFront' when the queue is dequeue disabled return from
// 'popFront' immediately and return an error code.
//
///Exception safety
///----------------
// A 'bdlcc::SingleProducerQueueImpl' is exception neutral, and all of the
// methods of 'bdlcc::SingleProducerQueueImpl' provide the basic exception
// safety guarantee (see 'bsldoc_glossary').
//
///Move Semantics in C++03
///-----------------------
// Move-only types are supported by 'bdlcc::SingleProducerQueueImpl' on C++11
// platforms only (where 'BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES' is defined),
// and are not supported on C++03 platforms.  Unfortunately, in C++03, there
// are user types where a 'bslmf::MovableRef' will not safely degrade to an
// lvalue reference when a move constructor is not available (types providing a
// constructor template taking any type), so 'bslmf::MovableRefUtil::move'
// cannot be used directly on a user supplied template type.  See internal bug
// report 99039150 for more information.
//
///Memory Usage
///------------
// 'bdlcc::SingleProducerQueueImpl' is most efficient when dealing with small
// objects or fundamental types (as a thread-safe container, its methods pass
// objects *by* *value*).  We recommend large objects be stored as
// shared-pointers (or possibly raw pointers).
//
///Usage
///-----
// There is no usage example for this component since it is not meant for
// direct client use.

#include <bdlscm_version.h>

#include <bslalg_scalarprimitives.h>

#include <bslma_deallocatorproctor.h>
#include <bslma_default.h>
#include <bslma_usesbslmaallocator.h>

#include <bslmf_movableref.h>
#include <bslmf_nestedtraitdeclaration.h>

#include <bslmt_lockguard.h>
#include <bslmt_threadutil.h>

#include <bsls_assert.h>
#include <bsls_objectbuffer.h>
#include <bsls_types.h>

#include <bsl_cstddef.h>

namespace BloombergLP {
namespace bdlcc {

             // ==================================================
             // class SingleProducerQueueImpl_ReleaseAllRawProctor
             // ==================================================

template <class TYPE>
class SingleProducerQueueImpl_ReleaseAllRawProctor {
    // This class implements a proctor that, unless its 'release' method has
    // previously been invoked, automatically invokes 'releaseAllRaw' on a
    // 'TYPE' upon destruction.  It is used by the queue's constructors so
    // that nodes already allocated are returned to the allocator if a later
    // allocation throws.

    // DATA
    TYPE *d_queue_p;  // managed queue (0 once released)

    // NOT IMPLEMENTED
    SingleProducerQueueImpl_ReleaseAllRawProctor();
    SingleProducerQueueImpl_ReleaseAllRawProctor(
                          const SingleProducerQueueImpl_ReleaseAllRawProctor&);
    SingleProducerQueueImpl_ReleaseAllRawProctor& operator=(
                          const SingleProducerQueueImpl_ReleaseAllRawProctor&);

  public:
    // CREATORS
    SingleProducerQueueImpl_ReleaseAllRawProctor(TYPE *queue);
        // Create a 'releaseAllRaw' proctor that conditionally manages the
        // specified 'queue' (if non-zero).

    ~SingleProducerQueueImpl_ReleaseAllRawProctor();
        // Destroy this object and, if 'release' has not been invoked, invoke
        // the managed queue's 'releaseAllRaw' method.

    // MANIPULATORS
    void release();
        // Release from management the queue currently managed by this proctor.
        // If no queue, this method has no effect.
};

               // ==============================================
               // class SingleProducerQueueImpl_PopCompleteGuard
               // ==============================================

template <class TYPE, class NODE>
class SingleProducerQueueImpl_PopCompleteGuard {
    // This class implements a guard that automatically invokes 'popComplete'
    // on a 'NODE' upon destruction.  Unlike a proctor, this guard cannot be
    // released: 'popComplete' is always invoked.

    // DATA
    TYPE *d_queue_p;  // managed queue owning the managed node
    NODE *d_node_p;   // managed node
    bool  d_isEmpty;  // if true, the empty condition will be signalled

    // NOT IMPLEMENTED
    SingleProducerQueueImpl_PopCompleteGuard();
    SingleProducerQueueImpl_PopCompleteGuard(
                              const SingleProducerQueueImpl_PopCompleteGuard&);
    SingleProducerQueueImpl_PopCompleteGuard& operator=(
                              const SingleProducerQueueImpl_PopCompleteGuard&);

  public:
    // CREATORS
    SingleProducerQueueImpl_PopCompleteGuard(TYPE *queue,
                                             NODE *node,
                                             bool  isEmpty);
        // Create a 'popComplete' guard managing the specified 'queue' and
        // 'node' that will cause the empty condition to be signalled if the
        // specified 'isEmpty' is 'true'.

    ~SingleProducerQueueImpl_PopCompleteGuard();
        // Destroy this object and invoke the 'TYPE::popComplete' method with
        // the managed 'node'.
};

                      // =============================
                      // class SingleProducerQueueImpl
                      // =============================

template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
class SingleProducerQueueImpl {
    // This class provides a thread-safe unbounded queue of values that assumes
    // a single producer thread.
    //
    // The types 'ATOMIC_OP', 'MUTEX', and 'CONDITION' are exposed for testing.
    // Typical usage is with 'bsls::AtomicOperations' for 'ATOMIC_OP',
    // 'bslmt::Mutex' for 'MUTEX', and 'bslmt::Condition' for 'CONDITION'.

    // PRIVATE CONSTANTS
    enum {
        // These values are used as values for 'd_state' in 'Node'.  A node is
        // writable at creation and after a read completes (when the single
        // producer can write to the node).  A node is readable after it is
        // written (when the node can be read by a consumer).  The states
        // in-between these two states (e.g., writing) are not needed by this
        // implementation of the queue.

        e_READABLE,  // node can be read
        e_WRITABLE   // node can be written
    };

    static const int k_POP_YIELD_SPIN = 10;  // number of yield-spins to
                                             // attempt before acquiring
                                             // 'd_readMutex'
                                             // NOTE(review): not referenced
                                             // by the definitions visible in
                                             // this header; confirm whether
                                             // it is still needed

    // The following constants are used to maintain the queue's 'd_state'
    // value.  See *Implementation* *Note* for details.  As used by the
    // private class methods below, the low 'k_AVAILABLE_SHIFT' (24) bits of
    // 'd_state' hold the count of threads blocked in 'popFront', and the
    // remaining high-order bits hold the (signed) count of available
    // elements.

    static const bsls::Types::Int64 k_BLOCKED_MASK    = 0x0000000000ffffffLL;
    static const bsls::Types::Int64 k_BLOCKED_INC     = 0x0000000000000001LL;
    static const bsls::Types::Int64 k_AVAILABLE_INC   = 0x0000000001000000LL;
    static const bsls::Types::Int64 k_AVAILABLE_MASK  = 0xffffffffff000000LL;
    static const int                k_AVAILABLE_SHIFT = 24;

    // PRIVATE TYPES
    typedef typename ATOMIC_OP::AtomicTypes::Int     AtomicInt;
    typedef typename ATOMIC_OP::AtomicTypes::Uint    AtomicUint;
    typedef typename ATOMIC_OP::AtomicTypes::Int64   AtomicInt64;
    typedef typename ATOMIC_OP::AtomicTypes::Pointer AtomicPointer;

    template <class T>
    struct QueueNode {
        // This 'struct' is one element of the circular, singly-linked list of
        // nodes that stores the queue's values.

        // PUBLIC DATA
        bsls::ObjectBuffer<T> d_value;  // stored value
        AtomicInt             d_state;  // 'e_READABLE' or 'e_WRITABLE'
        AtomicPointer         d_next;   // pointer to next node
    };

    typedef QueueNode<TYPE> Node;

    // DATA
    AtomicPointer     d_nextWrite;         // pointer to next write to node

    AtomicPointer     d_nextRead;          // pointer to next read from node

    MUTEX             d_readMutex;         // used with 'd_readCondition' to
                                           // block until an element is
                                           // available for popping

    CONDITION         d_readCondition;     // condition variable for popping
                                           // threads

    mutable MUTEX     d_emptyMutex;        // blocking point for producer
                                           // during 'waitUntilEmpty'

    mutable CONDITION d_emptyCondition;    // condition variable for producer
                                           // during 'waitUntilEmpty'

    AtomicInt64       d_state;             // bit pattern representing the
                                           // state of the queue (see
                                           // implementation notes)

    AtomicUint        d_popFrontDisabled;  // is queue pop disabled and
                                           // generation count; see
                                           // *Implementation* *Note*

    AtomicUint        d_pushBackDisabled;  // is queue push disabled and
                                           // generation count; see
                                           // *Implementation* *Note*

    bslma::Allocator *d_allocator_p;       // allocator, held not owned

    // FRIENDS
    friend class SingleProducerQueueImpl_ReleaseAllRawProctor<
                  SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> >;

    friend class SingleProducerQueueImpl_PopCompleteGuard<
                    SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>,
                    typename SingleProducerQueueImpl<TYPE,
                                                     ATOMIC_OP,
                                                     MUTEX,
                                                     CONDITION>::Node >;

    // PRIVATE CLASS METHODS
    static bool allElementsReserved(bsls::Types::Int64 state);
        // Return 'true' if the specified 'state' implies all elements in the
        // associated queue will be used to complete currently active dequeue
        // operations.

    static bool canSupplyBlockedThread(bsls::Types::Int64 state);
        // Return 'true' if the specified 'state' implies the associated queue
        // can supply an element to a thread blocked in a dequeue operation.

    static bool canSupplyOneBlockedThread(bsls::Types::Int64 state);
        // Return 'true' if the specified 'state' implies the associated queue
        // can supply exactly one element to a thread blocked in a dequeue
        // operation.

    static bsls::Types::Int64 getAvailable(bsls::Types::Int64 state);
        // Return the available attribute from the specified 'state'.

    static bool isEmpty(bsls::Types::Int64 state);
        // Return 'true' if the specified 'state' implies the associated queue
        // is empty, and 'false' otherwise.

    static bool willHaveBlockedThread(bsls::Types::Int64 state);
        // Return 'true' if the specified 'state' implies the associated queue
        // will have one or more threads blocked in a dequeue operation.

    // PRIVATE MANIPULATORS
    void incrementUntil(AtomicUint *value, unsigned int bitValue);
        // If the specified 'value' does not have its lowest-order bit set to
        // the value of the specified 'bitValue', increment 'value' until it
        // does.  Note that this method is used to modify the generation counts
        // stored in 'd_popFrontDisabled' and 'd_pushBackDisabled'.  See
        // *Implementation* *Note* for further details.

    void popComplete(Node *node, bool isEmpty);
        // Destruct the value stored in the specified 'node', mark the 'node'
        // writable, and if the specified 'isEmpty' is 'true' then signal the
        // queue empty condition.  This method is used within 'popFrontRaw' by
        // a guard to complete the reclamation of a node in the presence of an
        // exception.

    void popFrontRaw(TYPE* value, bool isEmpty);
        // Remove the element, without verifying the availability of the
        // element, from the front of this queue, load that element into the
        // specified 'value', and if the specified 'isEmpty' is 'true' then
        // signal the queue empty condition.

    void releaseAllRaw();
        // Return all memory to the allocator.  This method is intended to be
        // used by the destructor and to avoid a memory leak when there is an
        // exception during construction.

    // NOT IMPLEMENTED
    SingleProducerQueueImpl(const SingleProducerQueueImpl&);
    SingleProducerQueueImpl& operator=(const SingleProducerQueueImpl&);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(SingleProducerQueueImpl,
                                   bslma::UsesBslmaAllocator);

    // PUBLIC TYPES
    typedef TYPE value_type;  // The type for elements.

    // PUBLIC CONSTANTS
    enum {
        e_SUCCESS  =  0,  // must be 0
        e_EMPTY    = -1,
        e_DISABLED = -2
    };

    // CREATORS
    SingleProducerQueueImpl(bslma::Allocator *basicAllocator = 0);
        // Create a thread-aware queue.  Optionally specify a 'basicAllocator'
        // used to supply memory.  If 'basicAllocator' is 0, the currently
        // installed default allocator is used.

    SingleProducerQueueImpl(bsl::size_t       capacity,
                            bslma::Allocator *basicAllocator = 0);
        // Create a thread-aware queue with, at least, the specified
        // 'capacity'.  Optionally specify a 'basicAllocator' used to supply
        // memory.  If 'basicAllocator' is 0, the currently installed default
        // allocator is used.

    ~SingleProducerQueueImpl();
        // Destroy this object.

    // MANIPULATORS
    int popFront(TYPE *value);
        // Remove the element from the front of this queue and load that
        // element into the specified 'value'.  If the queue is empty, block
        // until it is not empty.  Return 0 on success, and a non-zero value
        // otherwise.  Specifically, return 'e_DISABLED' if
        // 'isPopFrontDisabled()'.  On failure, 'value' is not changed.
        // Threads blocked due to the queue being empty will return
        // 'e_DISABLED' if 'disablePopFront' is invoked.

    int pushBack(const TYPE& value);
        // Append the specified 'value' to the back of this queue.  Return 0 on
        // success, and a non-zero value otherwise.  Specifically, return
        // 'e_DISABLED' if 'isPushBackDisabled()'.  The behavior is undefined
        // unless the invoker of this method is the single producer.

    int pushBack(bslmf::MovableRef<TYPE> value);
        // Append the specified move-insertable 'value' to the back of this
        // queue.  'value' is left in a valid but unspecified state.  Return 0
        // on success, and a non-zero value otherwise.  Specifically, return
        // 'e_DISABLED' if 'isPushBackDisabled()'.  On failure, 'value' is not
        // changed.  The behavior is undefined unless the invoker of this
        // method is the single producer.

    void removeAll();
        // Remove all items currently in this queue.  Note that this operation
        // is not atomic; if other threads are concurrently pushing items into
        // the queue the result of 'numElements()' after this function returns
        // is not guaranteed to be 0.

    int tryPopFront(TYPE *value);
        // Attempt to remove the element from the front of this queue without
        // blocking, and, if successful, load the specified 'value' with the
        // removed element.  Return 0 on success, and a non-zero value
        // otherwise.  Specifically, return 'e_DISABLED' if
        // 'isPopFrontDisabled()', and 'e_EMPTY' if '!isPopFrontDisabled()' and
        // the queue was empty.  On failure, 'value' is not changed.

    int tryPushBack(const TYPE& value);
        // Append the specified 'value' to the back of this queue.  Return 0 on
        // success, and a non-zero value otherwise.  Specifically, return
        // 'e_DISABLED' if 'isPushBackDisabled()'.  The behavior is undefined
        // unless the invoker of this method is the single producer.

    int tryPushBack(bslmf::MovableRef<TYPE> value);
        // Append the specified move-insertable 'value' to the back of this
        // queue.  'value' is left in a valid but unspecified state.  Return 0
        // on success, and a non-zero value otherwise.  Specifically, return
        // 'e_DISABLED' if 'isPushBackDisabled()'.  On failure, 'value' is not
        // changed.  The behavior is undefined unless the invoker of this
        // method is the single producer.

                       // Enqueue/Dequeue State

    void disablePopFront();
        // Disable dequeueing from this queue.  All subsequent invocations of
        // 'popFront' or 'tryPopFront' will fail immediately.  All blocked
        // invocations of 'popFront' and 'waitUntilEmpty' will fail
        // immediately.  If the queue is already dequeue disabled, this method
        // has no effect.

    void disablePushBack();
        // Disable enqueueing into this queue.  All subsequent invocations of
        // 'pushBack' or 'tryPushBack' will fail immediately.  All blocked
        // invocations of 'pushBack' will fail immediately.  If the queue is
        // already enqueue disabled, this method has no effect.

    void enablePopFront();
        // Enable dequeueing.  If the queue is not dequeue disabled, this call
        // has no effect.

    void enablePushBack();
        // Enable queuing.  If the queue is not enqueue disabled, this call has
        // no effect.

    // ACCESSORS
    bool isEmpty() const;
        // Return 'true' if this queue is empty (has no elements), or 'false'
        // otherwise.

    bool isFull() const;
        // Return 'true' if this queue is full (has no available capacity), or
        // 'false' otherwise.  Note that for unbounded queues, this method
        // always returns 'false'.

    bool isPopFrontDisabled() const;
        // Return 'true' if this queue is dequeue disabled, and 'false'
        // otherwise.  Note that the queue is created in the "dequeue enabled"
        // state.

    bool isPushBackDisabled() const;
        // Return 'true' if this queue is enqueue disabled, and 'false'
        // otherwise.  Note that the queue is created in the "enqueue enabled"
        // state.

    bsl::size_t numElements() const;
        // Return the number of elements currently in this queue.

    int waitUntilEmpty() const;
        // Block until all the elements in this queue are removed.  Return 0 on
        // success, and a non-zero value otherwise.  Specifically, return
        // 'e_DISABLED' if '!isEmpty() && isPopFrontDisabled()'.  A blocked
        // thread waiting for the queue to empty will return 'e_DISABLED' if
        // 'disablePopFront' is invoked.

                                  // Aspects

    bslma::Allocator *allocator() const;
        // Return the allocator used by this object to supply memory.
};

// ============================================================================
//                             INLINE DEFINITIONS
// ============================================================================

             // --------------------------------------------------
             // class SingleProducerQueueImpl_ReleaseAllRawProctor
             // --------------------------------------------------

// CREATORS
template <class TYPE>
SingleProducerQueueImpl_ReleaseAllRawProctor<TYPE>::
                        SingleProducerQueueImpl_ReleaseAllRawProctor(TYPE *queue)
: d_queue_p(queue)
{
}

template <class TYPE>
SingleProducerQueueImpl_ReleaseAllRawProctor<TYPE>::
                                ~SingleProducerQueueImpl_ReleaseAllRawProctor()
{
    // A null 'd_queue_p' means 'release' was called (or no queue was
    // supplied), so there is nothing to clean up.

    if (d_queue_p) {
        d_queue_p->releaseAllRaw();
    }
}

// MANIPULATORS
template <class TYPE>
void SingleProducerQueueImpl_ReleaseAllRawProctor<TYPE>::release()
{
    d_queue_p = 0;
}

               // ----------------------------------------------
               // class SingleProducerQueueImpl_PopCompleteGuard
               // ----------------------------------------------

// CREATORS
template <class TYPE, class NODE>
SingleProducerQueueImpl_PopCompleteGuard<TYPE, NODE>::
                     SingleProducerQueueImpl_PopCompleteGuard(TYPE *queue,
                                                              NODE *node,
                                                              bool  isEmpty)
: d_queue_p(queue)
, d_node_p(node)
, d_isEmpty(isEmpty)
{
}

template <class TYPE, class NODE>
SingleProducerQueueImpl_PopCompleteGuard<TYPE, NODE>::
                                    ~SingleProducerQueueImpl_PopCompleteGuard()
{
    d_queue_p->popComplete(d_node_p, d_isEmpty);
}

----------------------------- // class SingleProducerQueueImpl // ----------------------------- // PRIVATE CLASS METHODS template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> inline bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: allElementsReserved(bsls::Types::Int64 state) { return (state >> k_AVAILABLE_SHIFT) <= (state & k_BLOCKED_MASK); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> inline bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: canSupplyBlockedThread(bsls::Types::Int64 state) { return k_AVAILABLE_INC <= state && (state & k_BLOCKED_MASK); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> inline bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: canSupplyOneBlockedThread(bsls::Types::Int64 state) { return k_AVAILABLE_INC == (state & k_AVAILABLE_MASK) && (state & k_BLOCKED_MASK); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> inline bsls::Types::Int64 SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: getAvailable(bsls::Types::Int64 state) { return state >> k_AVAILABLE_SHIFT; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> inline bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: isEmpty(bsls::Types::Int64 state) { return k_AVAILABLE_INC > state; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> inline bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: willHaveBlockedThread(bsls::Types::Int64 state) { return (state >> k_AVAILABLE_SHIFT) < (state & k_BLOCKED_MASK); } // PRIVATE MANIPULATORS template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> ::incrementUntil(AtomicUint *value, unsigned int bitValue) { unsigned int state = ATOMIC_OP::getUintAcquire(value); if (bitValue != (state & 1)) { unsigned int expState; do { expState = state; state = 
ATOMIC_OP::testAndSwapUintAcqRel(value, state, state + 1); } while (state != expState && (bitValue == (state & 1))); } } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: popComplete(Node *node, bool isEmpty) { node->d_value.object().~TYPE(); ATOMIC_OP::setIntRelease(&node->d_state, e_WRITABLE); if (isEmpty) { { bslmt::LockGuard<MUTEX> guard(&d_emptyMutex); } d_emptyCondition.broadcast(); } } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: popFrontRaw(TYPE *value, bool isEmpty) { Node *readFrom = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&d_nextRead)); Node *exp; do { Node *next = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&readFrom->d_next)); exp = readFrom; readFrom = static_cast<Node *>(ATOMIC_OP::testAndSwapPtrAcqRel( &d_nextRead, readFrom, next)); } while (readFrom != exp); SingleProducerQueueImpl_PopCompleteGuard< SingleProducerQueueImpl <TYPE, ATOMIC_OP, MUTEX, CONDITION>, Node> guard(this, readFrom, isEmpty); #if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES) *value = bslmf::MovableRefUtil::move(readFrom->d_value.object()); #else *value = readFrom->d_value.object(); #endif } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: releaseAllRaw() { Node *end = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&d_nextWrite)); if (end) { Node *at = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&end->d_next)); while (at != end) { Node *next = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&at->d_next)); if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&at->d_state)) { at->d_value.object().~TYPE(); } d_allocator_p->deallocate(at); at = next; } if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&at->d_state)) { at->d_value.object().~TYPE(); } d_allocator_p->deallocate(at); } } // CREATORS template <class TYPE, class ATOMIC_OP, 
class MUTEX, class CONDITION> SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: SingleProducerQueueImpl(bslma::Allocator *basicAllocator) : d_readMutex() , d_readCondition() , d_emptyMutex() , d_emptyCondition() , d_allocator_p(bslma::Default::allocator(basicAllocator)) { ATOMIC_OP::initInt64(&d_state, 0); // there are no available elements, the // enable/disable generation is // initialized to zero, and there are // no threads blocked in 'popFront' ATOMIC_OP::initUint(&d_popFrontDisabled, 0); ATOMIC_OP::initUint(&d_pushBackDisabled, 0); ATOMIC_OP::initPointer(&d_nextWrite, 0); SingleProducerQueueImpl_ReleaseAllRawProctor<SingleProducerQueueImpl < TYPE, ATOMIC_OP, MUTEX, CONDITION> > proctor(this); Node *n1 = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node))); ATOMIC_OP::initInt(&n1->d_state, e_WRITABLE); ATOMIC_OP::initPointer(&n1->d_next, n1); ATOMIC_OP::setPtrRelease(&d_nextWrite, n1); ATOMIC_OP::initPointer(&d_nextRead, n1); Node *n2 = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node))); ATOMIC_OP::initInt(&n2->d_state, e_WRITABLE); ATOMIC_OP::initPointer(&n2->d_next, n1); ATOMIC_OP::setPtrRelease(&n1->d_next, n2); proctor.release(); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: SingleProducerQueueImpl(bsl::size_t capacity, bslma::Allocator *basicAllocator) : d_readMutex() , d_readCondition() , d_emptyMutex() , d_emptyCondition() , d_allocator_p(bslma::Default::allocator(basicAllocator)) { ATOMIC_OP::initInt64(&d_state, 0); // there are no available elements, the // enable/disable generation is // initialized to zero, and there are // no threads blocked in 'popFront' ATOMIC_OP::initUint(&d_popFrontDisabled, 0); ATOMIC_OP::initUint(&d_pushBackDisabled, 0); ATOMIC_OP::initPointer(&d_nextWrite, 0); SingleProducerQueueImpl_ReleaseAllRawProctor<SingleProducerQueueImpl < TYPE, ATOMIC_OP, MUTEX, CONDITION> > proctor(this); Node *n1 = static_cast<Node 
*>(d_allocator_p->allocate(sizeof(Node))); ATOMIC_OP::initInt(&n1->d_state, e_WRITABLE); ATOMIC_OP::initPointer(&n1->d_next, n1); ATOMIC_OP::setPtrRelease(&d_nextWrite, n1); ATOMIC_OP::initPointer(&d_nextRead, n1); Node *n2 = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node))); ATOMIC_OP::initInt(&n2->d_state, e_WRITABLE); ATOMIC_OP::initPointer(&n2->d_next, n1); ATOMIC_OP::setPtrRelease(&n1->d_next, n2); capacity = (2 <= capacity ? capacity : 2); for (bsl::size_t i = 2; i < capacity; ++i) { Node *n = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node))); ATOMIC_OP::initInt(&n->d_state, e_WRITABLE); ATOMIC_OP::initPointer(&n->d_next, ATOMIC_OP::getPtrAcquire(&n2->d_next)); ATOMIC_OP::setPtrRelease(&n2->d_next, n); } proctor.release(); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: ~SingleProducerQueueImpl() { releaseAllRaw(); } // MANIPULATORS template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> int SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::popFront( TYPE *value) { unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled); if (1 == (generation & 1)) { return e_DISABLED; // RETURN } bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state, -k_AVAILABLE_INC); if (willHaveBlockedThread(state)) { bslmt::ThreadUtil::yield(); state = ATOMIC_OP::getInt64Acquire(&d_state); if (willHaveBlockedThread(state)) { { bslmt::LockGuard<MUTEX> guard(&d_readMutex); state = ATOMIC_OP::addInt64NvAcqRel( &d_state, k_AVAILABLE_INC + k_BLOCKED_INC); while (isEmpty(state)) { if (generation != ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) { ATOMIC_OP::addInt64AcqRel(&d_state, -k_BLOCKED_INC); return e_DISABLED; // RETURN } d_readCondition.wait(&d_readMutex); state = ATOMIC_OP::getInt64Acquire(&d_state); } state = ATOMIC_OP::addInt64NvAcqRel( &d_state, -(k_AVAILABLE_INC + k_BLOCKED_INC)); } if (canSupplyBlockedThread(state)) { 
d_readCondition.signal(); } } } popFrontRaw(value, isEmpty(state)); return 0; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> int SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::pushBack( const TYPE& value) { if (1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1)) { return e_DISABLED; // RETURN } Node *nextWrite = static_cast<Node *>( ATOMIC_OP::getPtrAcquire(&d_nextWrite)); Node *next = static_cast<Node *>( ATOMIC_OP::getPtrAcquire(&nextWrite->d_next)); if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&next->d_state)) { Node *n = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node))); ATOMIC_OP::initInt(&n->d_state, e_WRITABLE); ATOMIC_OP::initPointer(&n->d_next, next); ATOMIC_OP::setPtrRelease(&nextWrite->d_next, n); next = n; } bslalg::ScalarPrimitives::copyConstruct(nextWrite->d_value.address(), value, d_allocator_p); ATOMIC_OP::setIntRelease(&nextWrite->d_state, e_READABLE); ATOMIC_OP::setPtrRelease(&d_nextWrite, next); bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state, k_AVAILABLE_INC); if (canSupplyOneBlockedThread(state)) { { bslmt::LockGuard<MUTEX> guard(&d_readMutex); } d_readCondition.signal(); } return 0; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> int SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::pushBack( bslmf::MovableRef<TYPE> value) { if (1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1)) { return e_DISABLED; // RETURN } Node *nextWrite = static_cast<Node *>( ATOMIC_OP::getPtrAcquire(&d_nextWrite)); Node *next = static_cast<Node *>( ATOMIC_OP::getPtrAcquire(&nextWrite->d_next)); if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&next->d_state)) { Node *n = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node))); ATOMIC_OP::initInt(&n->d_state, e_WRITABLE); ATOMIC_OP::initPointer(&n->d_next, next); ATOMIC_OP::setPtrRelease(&nextWrite->d_next, n); next = n; } TYPE& dummy = value; bslalg::ScalarPrimitives::moveConstruct(nextWrite->d_value.address(), 
dummy, d_allocator_p); ATOMIC_OP::setIntRelease(&nextWrite->d_state, e_READABLE); ATOMIC_OP::setPtrRelease(&d_nextWrite, next); bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state, k_AVAILABLE_INC); if (canSupplyOneBlockedThread(state)) { { bslmt::LockGuard<MUTEX> guard(&d_readMutex); } d_readCondition.signal(); } return 0; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> int SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::tryPopFront( TYPE *value) { unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled); if (1 == (generation & 1)) { return e_DISABLED; // RETURN } // optimistically attempt to acquire resource (representing an element) bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state, -k_AVAILABLE_INC); while (willHaveBlockedThread(state)) { // failed to acquire resource, must revert the change to 'd_state' or // acquire the resource (due to actions of other threads) const bsls::Types::Int64 expState = state; state = ATOMIC_OP::testAndSwapInt64AcqRel(&d_state, state, state + k_AVAILABLE_INC); if (expState == state) { // reverted the change to 'd_state' state += k_AVAILABLE_INC; if (canSupplyBlockedThread(state)) { { bslmt::LockGuard<MUTEX> guard(&d_readMutex); } d_readCondition.signal(); } return e_EMPTY; // RETURN } } popFrontRaw(value, isEmpty(state)); return 0; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> int SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::tryPushBack( const TYPE& value) { return pushBack(value); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> int SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::tryPushBack( bslmf::MovableRef<TYPE> value) { return pushBack(bslmf::MovableRefUtil::move(value)); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::removeAll() { bsls::Types::Int64 state = 
ATOMIC_OP::getInt64Acquire(&d_state); bsls::Types::Int64 expState; do { if (allElementsReserved(state)) { return; // RETURN } expState = state; state = ATOMIC_OP::testAndSwapInt64AcqRel( &d_state, state, (state & ~k_AVAILABLE_MASK) | ( (state & k_BLOCKED_MASK) << k_AVAILABLE_SHIFT)); } while (state != expState); state = (state >> k_AVAILABLE_SHIFT) - (state & k_BLOCKED_MASK); for (bsls::Types::Int64 i = 0; i < state; ++i) { Node *readFrom = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&d_nextRead)); Node *exp; do { Node *next = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&readFrom->d_next)); exp = readFrom; readFrom = static_cast<Node *>(ATOMIC_OP::testAndSwapPtrAcqRel( &d_nextRead, readFrom, next)); } while (readFrom != exp); readFrom->d_value.object().~TYPE(); ATOMIC_OP::setIntRelease(&readFrom->d_state, e_WRITABLE); } { bslmt::LockGuard<MUTEX> guard(&d_emptyMutex); } d_emptyCondition.broadcast(); } // Enqueue/Dequeue State template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> ::disablePopFront() { incrementUntil(&d_popFrontDisabled, 1); { bslmt::LockGuard<MUTEX> guard(&d_readMutex); } d_readCondition.broadcast(); { bslmt::LockGuard<MUTEX> guard(&d_emptyMutex); } d_emptyCondition.broadcast(); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> ::disablePushBack() { incrementUntil(&d_pushBackDisabled, 1); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> ::enablePopFront() { incrementUntil(&d_popFrontDisabled, 0); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> ::enablePushBack() { incrementUntil(&d_pushBackDisabled, 0); } // ACCESSORS template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> bool 
SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: isEmpty() const { bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state); return isEmpty(state); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::isFull() const { return false; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> ::isPopFrontDisabled() const { return 1 == (ATOMIC_OP::getUintAcquire(&d_popFrontDisabled) & 1); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION> ::isPushBackDisabled() const { return 1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1); } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> bsl::size_t SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: numElements() const { bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state); bsls::Types::Int64 avail = getAvailable(state); return avail >= 0 ? 
static_cast<bsl::size_t>(avail) : 0; } template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> int SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: waitUntilEmpty() const { unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled); if (1 == (generation & 1)) { return e_DISABLED; // RETURN } bslmt::LockGuard<MUTEX> guard(&d_emptyMutex); bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state); while (!isEmpty(state)) { if (generation != ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) { return e_DISABLED; // RETURN } d_emptyCondition.wait(&d_emptyMutex); state = ATOMIC_OP::getInt64Acquire(&d_state); } return 0; } // Aspects template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION> bslma::Allocator *SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>:: allocator() const { return d_allocator_p; } } // close package namespace } // close enterprise namespace #endif // ---------------------------------------------------------------------------- // Copyright 2019 Bloomberg Finance L.P. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ----------------------------- END-OF-FILE ----------------------------------