9#ifndef INCLUDED_BDLCC_SINGLEPRODUCERQUEUEIMPL
10#define INCLUDED_BDLCC_SINGLEPRODUCERQUEUEIMPL
116#include <bdlscm_version.h>
134#include <bsl_cstddef.h>
// Template head at original line 187.  NOTE(review): presumably the head of
// 'SingleProducerQueueImpl_PopCompleteGuard' (instantiated in 'popFrontRaw'
// with the queue type as 'TYPE' and the node type as 'NODE'); the class body
// is absent from this excerpt -- confirm against the full header.
187template <
class TYPE,
class NODE>
// Declaration fragment of the queue implementation class template (the class
// head, enumerators, and several members are absent from this excerpt).
// 'TYPE' is the element type; 'ATOMIC_OP', 'MUTEX', and 'CONDITION' supply
// the atomic operations, mutex, and condition-variable types used below.
229template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// NOTE(review): 'k_POP_YIELD_SPIN' is not referenced by any visible line;
// presumably a spin/yield count used on the pop path -- confirm.
245 static const int k_POP_YIELD_SPIN = 10;
// Shift that separates the two fields packed into a 64-bit state value: the
// predicates in this file read the "available" field as
// 'state >> k_AVAILABLE_SHIFT' and the "blocked" field as
// 'state & k_BLOCKED_MASK'.
256 static const int k_AVAILABLE_SHIFT = 24;
// Aliases for the atomic storage types provided by 'ATOMIC_OP'.
259 typedef typename ATOMIC_OP::AtomicTypes::Int AtomicInt;
260 typedef typename ATOMIC_OP::AtomicTypes::Uint AtomicUint;
261 typedef typename ATOMIC_OP::AtomicTypes::Int64 AtomicInt64;
262 typedef typename ATOMIC_OP::AtomicTypes::Pointer AtomicPointer;
// Link to the following node (the constructors chain nodes through
// 'd_next').  NOTE(review): belongs to the node type whose other members are
// not visible here.
269 AtomicPointer d_next;
272 typedef QueueNode<TYPE> Node;
// Node examined first by 'pushBack'; 'pushBack' stores the following node
// here after marking the filled node 'e_READABLE'.
275 AtomicPointer d_nextWrite;
// Node loaded first by 'popFrontRaw' and 'removeAll'.
277 AtomicPointer d_nextRead;
// Waited on in 'popFront'; signaled by the push/pop methods and broadcast by
// 'disablePopFront'.
283 CONDITION d_readCondition;
// Mutex passed to 'd_emptyCondition.wait' in 'waitUntilEmpty'.
286 mutable MUTEX d_emptyMutex;
// Waited on in 'waitUntilEmpty'; broadcast from 'popComplete', 'removeAll',
// and 'disablePopFront'.
289 mutable CONDITION d_emptyCondition;
// Generation counters: the lowest-order bit is tested ('& 1') to decide
// whether the corresponding operation is disabled, and the counters are
// advanced through 'incrementUntil'.
296 AtomicUint d_popFrontDisabled;
300 AtomicUint d_pushBackDisabled;
// Static predicates over a packed state value; each takes the state by value
// and reads only the "available" and "blocked" fields.
328 static bool allElementsReserved(bsls::Types::Int64 state);
332 static bool canSupplyBlockedThread(bsls::Types::Int64 state);
337 static bool canSupplyOneBlockedThread(bsls::Types::Int64 state);
344 static bool
isEmpty(bsls::Types::Int64 state);
348 static bool willHaveBlockedThread(bsls::Types::Int64 state);
// Advance '*value' until its lowest-order bit matches 'bitValue'.
357 void incrementUntil(AtomicUint *value, unsigned int bitValue);
// Destroy the element held in 'node', mark the node writable again, and
// notify 'd_emptyCondition'.
364 void popComplete(Node *node, bool isEmpty);
// Claim the next readable node, assign its element to '*value', and complete
// the pop through a guard object.
370 void popFrontRaw(TYPE* value, bool isEmpty);
// Called from the release proctor's destructor.  NOTE(review): presumably
// destroys remaining elements and deallocates all nodes -- its definition
// has no visible name line in this excerpt.
375 void releaseAllRaw();
// Final argument of a trait declaration whose opening line is absent from
// this excerpt.
384 bslma::UsesBslmaAllocator);
// Body line of '~SingleProducerQueueImpl_ReleaseAllRawProctor' (defined at
// original line 544): hands cleanup to the managed queue's 'releaseAllRaw'.
// NOTE(review): any guard on 'd_queue_p' preceding this call is not visible.
547 d_queue_p->releaseAllRaw();
// Template head at original line 563 for a member of a two-parameter
// ('TYPE', 'NODE') class template.  NOTE(review): presumably the constructor
// of 'SingleProducerQueueImpl_PopCompleteGuard', which precedes that class's
// destructor; the declarator and body are absent from this excerpt.
563template <
class TYPE,
class NODE>
// Destructor of 'SingleProducerQueueImpl_PopCompleteGuard' (original line
// 576): forwards the stored node and emptiness flag to the queue's
// 'popComplete', so completion of the pop runs when the guard is destroyed.
574template <
class TYPE,
class NODE>
578 d_queue_p->popComplete(d_node_p, d_isEmpty);
// First static state predicate (original line 586).  NOTE(review): its name
// line is absent; by declaration order this is presumably
// 'allElementsReserved' -- confirm.  True when the "available" field of
// 'state' (the bits above 'k_AVAILABLE_SHIFT') is less than or equal to the
// "blocked" field ('state & k_BLOCKED_MASK').
586template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
591 return (state >> k_AVAILABLE_SHIFT) <= (state & k_BLOCKED_MASK);
// Second static state predicate (original line 594).  NOTE(review): name
// line absent; presumably 'canSupplyBlockedThread' by declaration order --
// confirm.  True when 'state' is at least one 'k_AVAILABLE_INC' and the
// "blocked" field is non-zero.
594template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
596bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
599 return k_AVAILABLE_INC <= state && (state & k_BLOCKED_MASK);
// Third static state predicate (original line 602).  NOTE(review): name line
// absent; presumably 'canSupplyOneBlockedThread' by declaration order --
// confirm.  True when the "available" field equals exactly one
// 'k_AVAILABLE_INC' and the "blocked" field is non-zero.
602template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
604bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
607 return k_AVAILABLE_INC == (state & k_AVAILABLE_MASK)
608 && (state & k_BLOCKED_MASK);
// Static helper at original line 611: extracts the "available" field by
// shifting 'state' right by 'k_AVAILABLE_SHIFT'.  NOTE(review): the
// function's name and return type are not visible in this excerpt.
611template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
616 return state >> k_AVAILABLE_SHIFT;
// Static state predicate at original line 619.  NOTE(review): name line
// absent; presumably the static 'isEmpty(state)' by declaration order --
// confirm.  True when 'state' is smaller than a single 'k_AVAILABLE_INC'.
619template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
624 return k_AVAILABLE_INC > state;
// Last static state predicate (original line 627).  NOTE(review): name line
// absent; presumably 'willHaveBlockedThread' by declaration order --
// confirm.  True when the "available" field is strictly less than the
// "blocked" field.
627template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
629bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
632 return (state >> k_AVAILABLE_SHIFT) < (state & k_BLOCKED_MASK);
636template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
637void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
638 ::incrementUntil(AtomicUint *value,
unsigned int bitValue)
640 unsigned int state = ATOMIC_OP::getUintAcquire(value);
641 if (bitValue != (state & 1)) {
642 unsigned int expState;
645 state = ATOMIC_OP::testAndSwapUintAcqRel(value,
648 }
while (state != expState && (bitValue == (state & 1)));
// Finish a pop of the specified 'node': destroy the element it holds, make
// the node available to the producer again, and notify threads waiting for
// the queue to become empty.
652template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
653void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
654 popComplete(Node *node,
bool isEmpty)
// Run the element's destructor in place; the node's storage is reused.
656 node->d_value.object().~TYPE();
// Release-store so the element's destruction is visible before the node is
// seen as writable.
658 ATOMIC_OP::setIntRelease(&node->d_state, e_WRITABLE);
// NOTE(review): the lines between the state update and this broadcast are
// absent; presumably the broadcast is conditional on 'isEmpty' -- confirm.
664 d_emptyCondition.broadcast();
// Claim the next node to read, copy (or move) its element into the specified
// 'value', and let a guard object complete the pop.  Several lines of this
// definition (including the second parameter and the loop head) are absent
// from this excerpt.
669template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
670void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
671 popFrontRaw(TYPE *value,
// Start from the current read position.
675 static_cast<Node *
>(ATOMIC_OP::getPtrAcquire(&d_nextRead));
// Successor of the candidate node.
680 static_cast<Node *
>(ATOMIC_OP::getPtrAcquire(&readFrom->d_next));
// Compare-and-swap whose arguments are not visible here; the loop repeats
// until the value it returns equals the expected one ('exp').
683 readFrom =
static_cast<Node *
>(ATOMIC_OP::testAndSwapPtrAcqRel(
687 }
while (readFrom != exp);
// The guard's destructor calls 'popComplete(readFrom, isEmpty)', so the node
// is completed even if the assignment below exits early.
689 SingleProducerQueueImpl_PopCompleteGuard<
690 SingleProducerQueueImpl <TYPE,
694 Node> guard(
this, readFrom, isEmpty);
// NOTE(review): the branch taken when this macro is defined is not visible;
// presumably it move-assigns instead of copy-assigning -- confirm.
696#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
699 *value = readFrom->d_value.object();
// Member function at original line 704.  NOTE(review): the name line is
// absent; presumably 'releaseAllRaw' -- confirm.  The visible statements
// visit nodes starting after 'd_nextWrite', destroy the element of every
// node whose state is not 'e_WRITABLE', and return each node to
// 'd_allocator_p'.
703template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
704void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
// 'end' is the current write position.
707 Node *
end =
static_cast<Node *
>(ATOMIC_OP::getPtrAcquire(&d_nextWrite));
// Begin at the node following 'end'.
710 Node *at =
static_cast<Node *
>(ATOMIC_OP::getPtrAcquire(&
end->d_next));
// Read the successor before 'at' is deallocated.
714 static_cast<Node *
>(ATOMIC_OP::getPtrAcquire(&at->d_next));
// A node that is not writable still holds a constructed element.
716 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&at->d_state)) {
717 at->d_value.object().~TYPE();
720 d_allocator_p->deallocate(at);
// NOTE(review): the same destroy/deallocate sequence appears a second time;
// presumably it handles the last node after the loop ends -- confirm.
725 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&at->d_state)) {
726 at->d_value.object().~TYPE();
729 d_allocator_p->deallocate(at);
// Constructor taking only 'basicAllocator' (original line 736).  Builds an
// initial ring of two writable nodes linked to each other, with both the
// read and write positions at the first node.
734template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// Fall back to the default allocator when 'basicAllocator' is 0.
741, d_allocator_p(
bslma::Default::allocator(basicAllocator))
743 ATOMIC_OP::initInt64(&d_state, 0);
// Both counters start even, i.e. with the "disabled" bit clear.
748 ATOMIC_OP::initUint(&d_popFrontDisabled, 0);
749 ATOMIC_OP::initUint(&d_pushBackDisabled, 0);
// Null write position before any node exists.
751 ATOMIC_OP::initPointer(&d_nextWrite, 0);
// NOTE(review): the proctor's type head is absent; presumably
// 'SingleProducerQueueImpl_ReleaseAllRawProctor', guarding the allocations
// below -- confirm.
757 CONDITION> > proctor(
this);
759 Node *n1 =
static_cast<Node *
>(d_allocator_p->
allocate(
sizeof(Node)));
760 ATOMIC_OP::initInt(&n1->d_state, e_WRITABLE);
// A single node initially links to itself.
761 ATOMIC_OP::initPointer(&n1->d_next, n1);
763 ATOMIC_OP::setPtrRelease(&d_nextWrite, n1);
765 ATOMIC_OP::initPointer(&d_nextRead, n1);
767 Node *n2 =
static_cast<Node *
>(d_allocator_p->
allocate(
sizeof(Node)));
768 ATOMIC_OP::initInt(&n2->d_state, e_WRITABLE);
// Splice 'n2' after 'n1': 'n2 -> n1' and then 'n1 -> n2'.
769 ATOMIC_OP::initPointer(&n2->d_next, n1);
770 ATOMIC_OP::setPtrRelease(&n1->d_next, n2);
// Constructor taking 'capacity' and 'basicAllocator' (original line 777).
// Builds the same two-node ring as the allocator-only constructor, then adds
// nodes until the ring holds 'capacity' nodes (at least two).
775template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// Fall back to the default allocator when 'basicAllocator' is 0.
783, d_allocator_p(
bslma::Default::allocator(basicAllocator))
785 ATOMIC_OP::initInt64(&d_state, 0);
// Both counters start even, i.e. with the "disabled" bit clear.
790 ATOMIC_OP::initUint(&d_popFrontDisabled, 0);
791 ATOMIC_OP::initUint(&d_pushBackDisabled, 0);
// Null write position before any node exists.
793 ATOMIC_OP::initPointer(&d_nextWrite, 0);
// NOTE(review): the proctor's type head is absent; presumably
// 'SingleProducerQueueImpl_ReleaseAllRawProctor', guarding the allocations
// below -- confirm.
799 CONDITION> > proctor(
this);
801 Node *n1 =
static_cast<Node *
>(d_allocator_p->
allocate(
sizeof(Node)));
802 ATOMIC_OP::initInt(&n1->d_state, e_WRITABLE);
// A single node initially links to itself.
803 ATOMIC_OP::initPointer(&n1->d_next, n1);
805 ATOMIC_OP::setPtrRelease(&d_nextWrite, n1);
807 ATOMIC_OP::initPointer(&d_nextRead, n1);
809 Node *n2 =
static_cast<Node *
>(d_allocator_p->
allocate(
sizeof(Node)));
810 ATOMIC_OP::initInt(&n2->d_state, e_WRITABLE);
// Splice 'n2' after 'n1': 'n2 -> n1' and then 'n1 -> n2'.
811 ATOMIC_OP::initPointer(&n2->d_next, n1);
812 ATOMIC_OP::setPtrRelease(&n1->d_next, n2);
// Two nodes already exist, so smaller requests are raised to 2.
814 capacity = (2 <= capacity ? capacity : 2);
// Each additional node is inserted directly after 'n2'.
816 for (bsl::size_t i = 2; i < capacity; ++i) {
817 Node *n =
static_cast<Node *
>(d_allocator_p->
allocate(
sizeof(Node)));
818 ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
// New node points at whatever currently follows 'n2' ...
819 ATOMIC_OP::initPointer(&n->d_next,
820 ATOMIC_OP::getPtrAcquire(&n2->d_next));
// ... and 'n2' is then redirected to the new node.
822 ATOMIC_OP::setPtrRelease(&n2->d_next, n);
// Template head of the destructor '~SingleProducerQueueImpl' (defined at
// original line 830); its body is absent from this excerpt.
828template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// 'popFront(TYPE *value)' (original line 837; the signature line and several
// statements are absent from this excerpt).  Removes the front element into
// 'value', waiting on 'd_readCondition' while the queue is empty.
836template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// Snapshot the generation; an odd value means 'popFront' is disabled.
840 unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
841 if (1 == (generation & 1)) {
// Slow path: this pop would leave more blocked threads than available
// elements.
848 if (willHaveBlockedThread(state)) {
// Re-read and re-check the state.  NOTE(review): presumably done after
// locking 'd_readMutex'; the lock is not visible here -- confirm.
850 state = ATOMIC_OP::getInt64Acquire(&d_state);
851 if (willHaveBlockedThread(state)) {
// Adjust 'd_state' by one available increment plus one blocked increment.
855 state = ATOMIC_OP::addInt64NvAcqRel(
857 k_AVAILABLE_INC + k_BLOCKED_INC);
859 while (isEmpty(state)) {
// A generation change while waiting ends the wait: the blocked count is
// decremented again.
861 ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) {
862 ATOMIC_OP::addInt64AcqRel(&d_state, -k_BLOCKED_INC);
865 d_readCondition.wait(&d_readMutex);
866 state = ATOMIC_OP::getInt64Acquire(&d_state);
// Reverse the earlier adjustment once an element is available.
869 state = ATOMIC_OP::addInt64NvAcqRel(
871 -(k_AVAILABLE_INC + k_BLOCKED_INC));
// Pass the wake-up along if another blocked thread can also be served.
873 if (canSupplyBlockedThread(state)) {
874 d_readCondition.signal();
879 popFrontRaw(value, isEmpty(state));
// 'pushBack(const TYPE& value)' (original line 885; the signature line and
// the element-construction statements are absent from this excerpt).
884template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// An odd counter value means 'pushBack' is disabled.
888 if (1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1)) {
892 Node *nextWrite =
static_cast<Node *
>(
893 ATOMIC_OP::getPtrAcquire(&d_nextWrite));
895 Node *next =
static_cast<Node *
>(
896 ATOMIC_OP::getPtrAcquire(&nextWrite->d_next));
// If the following node is not writable, grow the ring: allocate a writable
// node and link it between 'nextWrite' and 'next'.
898 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&next->d_state)) {
899 Node *n =
static_cast<Node *
>(d_allocator_p->allocate(
sizeof(Node)));
901 ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
902 ATOMIC_OP::initPointer(&n->d_next, next);
904 ATOMIC_OP::setPtrRelease(&nextWrite->d_next, n);
// Publish the filled node to readers, then advance the write position.
913 ATOMIC_OP::setIntRelease(&nextWrite->d_state, e_READABLE);
914 ATOMIC_OP::setPtrRelease(&d_nextWrite, next);
// Wake one waiting reader when exactly one element is available and some
// thread is blocked.
919 if (canSupplyOneBlockedThread(state)) {
923 d_readCondition.signal();
// 'pushBack(bslmf::MovableRef<TYPE> value)' (original line 930; the
// signature line and the element-construction statements are absent from
// this excerpt).  The visible statements mirror the 'const TYPE&' overload.
929template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// An odd counter value means 'pushBack' is disabled.
933 if (1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1)) {
937 Node *nextWrite =
static_cast<Node *
>(
938 ATOMIC_OP::getPtrAcquire(&d_nextWrite));
940 Node *next =
static_cast<Node *
>(
941 ATOMIC_OP::getPtrAcquire(&nextWrite->d_next));
// If the following node is not writable, grow the ring: allocate a writable
// node and link it between 'nextWrite' and 'next'.
943 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&next->d_state)) {
944 Node *n =
static_cast<Node *
>(d_allocator_p->allocate(
sizeof(Node)));
946 ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
947 ATOMIC_OP::initPointer(&n->d_next, next);
949 ATOMIC_OP::setPtrRelease(&nextWrite->d_next, n);
// Publish the filled node to readers, then advance the write position.
959 ATOMIC_OP::setIntRelease(&nextWrite->d_state, e_READABLE);
960 ATOMIC_OP::setPtrRelease(&d_nextWrite, next);
// Wake one waiting reader when exactly one element is available and some
// thread is blocked.
965 if (canSupplyOneBlockedThread(state)) {
969 d_readCondition.signal();
// 'tryPopFront(TYPE *value)' (original line 976; the signature line and
// several statements are absent from this excerpt).  Non-waiting counterpart
// of 'popFront': the visible code never waits on 'd_readCondition'.
975template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// An odd generation means 'popFront' is disabled.
979 unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
980 if (1 == (generation & 1)) {
// While taking an element would leave a blocked thread unserved, try to give
// the element back with a compare-and-swap adding one 'k_AVAILABLE_INC'.
989 while (willHaveBlockedThread(state)) {
996 state = ATOMIC_OP::testAndSwapInt64AcqRel(&d_state,
998 state + k_AVAILABLE_INC);
// The swap succeeded: mirror the update in the local copy of the state.
999 if (expState == state) {
1002 state += k_AVAILABLE_INC;
1003 if (canSupplyBlockedThread(state)) {
1007 d_readCondition.signal();
1013 popFrontRaw(value, isEmpty(state));
// 'tryPushBack(const TYPE& value)' (original line 1019): forwards to
// 'pushBack' and returns its result unchanged.
1018template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1022 return pushBack(value);
// Template head of 'tryPushBack(bslmf::MovableRef<TYPE> value)' (defined at
// original line 1026); its body is absent from this excerpt.
1025template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// 'removeAll()' (original line 1033; the signature line and loop heads are
// absent from this excerpt).  Shrinks the available count to the blocked
// count, then destroys the surplus elements and recycles their nodes.
1032template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// NOTE(review): the body of this branch is absent; presumably an early
// return, since no surplus exists when available <= blocked -- confirm.
1039 if (allElementsReserved(state)) {
// Compare-and-swap replacing the "available" field with the blocked count;
// retried until the value read back matches the expected state.
1043 state = ATOMIC_OP::testAndSwapInt64AcqRel(
1046 (state & ~k_AVAILABLE_MASK)
1047 | ( (state & k_BLOCKED_MASK)
1048 << k_AVAILABLE_SHIFT));
1049 }
while (state != expState);
// Number of elements to discard: available count minus blocked count of the
// state that was replaced.
1051 state = (state >> k_AVAILABLE_SHIFT) - (state & k_BLOCKED_MASK);
// Same claim sequence as in 'popFrontRaw': load the read position and its
// successor, then compare-and-swap until the expected value is returned.
1055 static_cast<Node *
>(ATOMIC_OP::getPtrAcquire(&d_nextRead));
1059 static_cast<Node *
>(ATOMIC_OP::getPtrAcquire(&readFrom->d_next));
1062 readFrom =
static_cast<Node *
>(ATOMIC_OP::testAndSwapPtrAcqRel(
1066 }
while (readFrom != exp);
// Destroy the claimed element and hand the node back to the producer.
1068 readFrom->d_value.object().~TYPE();
1070 ATOMIC_OP::setIntRelease(&readFrom->d_state, e_WRITABLE);
// Notify threads in 'waitUntilEmpty'.
1076 d_emptyCondition.broadcast();
// 'disablePopFront()' (original line 1083): make the low bit of
// 'd_popFrontDisabled' 1, then wake every thread waiting on either condition
// so that waiters in 'popFront' and 'waitUntilEmpty' can re-read the
// generation counter.
1081template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1085 incrementUntil(&d_popFrontDisabled, 1);
1090 d_readCondition.broadcast();
1095 d_emptyCondition.broadcast();
// 'disablePushBack()' (original line 1100): make the low bit of
// 'd_pushBackDisabled' 1, the value 'pushBack' tests before enqueuing.
1098template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1102 incrementUntil(&d_pushBackDisabled, 1);
// 'enablePopFront()' (original line 1107): make the low bit of
// 'd_popFrontDisabled' 0, re-enabling the pop operations.
1105template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1109 incrementUntil(&d_popFrontDisabled, 0);
// 'enablePushBack()' (original line 1114): make the low bit of
// 'd_pushBackDisabled' 0, re-enabling 'pushBack'.
1112template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1116 incrementUntil(&d_pushBackDisabled, 0);
// 'isEmpty() const' (original line 1122): applies the static state predicate
// to 'state'.  NOTE(review): the statement that loads 'state' is absent from
// this excerpt; presumably an acquire-load of 'd_state' -- confirm.
1120template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1125 return isEmpty(state);
// Template head of 'isFull() const' (defined at original line 1129); its
// body is absent from this excerpt.
1128template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// 'isPopFrontDisabled() const' (original line 1136): true when the
// lowest-order bit of 'd_popFrontDisabled' is set.
1134template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1138 return 1 == (ATOMIC_OP::getUintAcquire(&d_popFrontDisabled) & 1);
// 'isPushBackDisabled() const' (original line 1143): true when the
// lowest-order bit of 'd_pushBackDisabled' is set.
1141template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1145 return 1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1);
// 'numElements() const' (original line 1150): returns 'avail' converted to
// 'bsl::size_t', reporting 0 when 'avail' is negative.  NOTE(review): the
// computation of 'avail' is absent from this excerpt.
1148template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1155 return avail >= 0 ?
static_cast<bsl::size_t
>(avail) : 0;
// 'waitUntilEmpty() const' (original line 1160; the signature line and the
// return statements are absent from this excerpt).  Waits on
// 'd_emptyCondition' until the state reports empty or the pop-disabled
// generation changes.
1158template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
// Snapshot the generation; an odd value means 'popFront' is already
// disabled.
1162 unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
1163 if (1 == (generation & 1)) {
1170 while (!isEmpty(state)) {
// Stop waiting if the generation changed since the snapshot was taken.
1171 if (generation != ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) {
1174 d_emptyCondition.wait(&d_emptyMutex);
// Re-read the state after every wake-up before testing it again.
1175 state = ATOMIC_OP::getInt64Acquire(&d_state);
// 'allocator() const' (original line 1185): returns the allocator pointer
// stored at construction.
1183template <
class TYPE,
class ATOMIC_OP,
class MUTEX,
class CONDITION>
1187 return d_allocator_p;
#define BSLMF_NESTED_TRAIT_DECLARATION(t_TYPE, t_TRAIT)
Definition bslmf_nestedtraitdeclaration.h:231
Definition bdlcc_singleproducerqueueimpl.h:188
~SingleProducerQueueImpl_PopCompleteGuard()
Definition bdlcc_singleproducerqueueimpl.h:576
Definition bdlcc_singleproducerqueueimpl.h:149
~SingleProducerQueueImpl_ReleaseAllRawProctor()
Definition bdlcc_singleproducerqueueimpl.h:544
void release()
Definition bdlcc_singleproducerqueueimpl.h:553
Definition bdlcc_singleproducerqueueimpl.h:230
void disablePopFront()
Definition bdlcc_singleproducerqueueimpl.h:1083
bool isFull() const
Definition bdlcc_singleproducerqueueimpl.h:1129
SingleProducerQueueImpl(bslma::Allocator *basicAllocator=0)
Definition bdlcc_singleproducerqueueimpl.h:736
void enablePushBack()
Definition bdlcc_singleproducerqueueimpl.h:1114
SingleProducerQueueImpl(bsl::size_t capacity, bslma::Allocator *basicAllocator=0)
Definition bdlcc_singleproducerqueueimpl.h:777
bool isEmpty() const
Definition bdlcc_singleproducerqueueimpl.h:1122
int waitUntilEmpty() const
Definition bdlcc_singleproducerqueueimpl.h:1160
void disablePushBack()
Definition bdlcc_singleproducerqueueimpl.h:1100
int tryPushBack(bslmf::MovableRef< TYPE > value)
Definition bdlcc_singleproducerqueueimpl.h:1026
void enablePopFront()
Definition bdlcc_singleproducerqueueimpl.h:1107
int popFront(TYPE *value)
Definition bdlcc_singleproducerqueueimpl.h:837
int tryPopFront(TYPE *value)
Definition bdlcc_singleproducerqueueimpl.h:976
int tryPushBack(const TYPE &value)
Definition bdlcc_singleproducerqueueimpl.h:1019
TYPE value_type
Definition bdlcc_singleproducerqueueimpl.h:387
bslma::Allocator * allocator() const
Return the allocator used by this object to supply memory.
Definition bdlcc_singleproducerqueueimpl.h:1185
int pushBack(const TYPE &value)
Definition bdlcc_singleproducerqueueimpl.h:885
bsl::size_t numElements() const
Return the number of elements currently in this queue.
Definition bdlcc_singleproducerqueueimpl.h:1150
bool isPopFrontDisabled() const
Definition bdlcc_singleproducerqueueimpl.h:1136
int pushBack(bslmf::MovableRef< TYPE > value)
Definition bdlcc_singleproducerqueueimpl.h:930
void removeAll()
Definition bdlcc_singleproducerqueueimpl.h:1033
~SingleProducerQueueImpl()
Destroy this object.
Definition bdlcc_singleproducerqueueimpl.h:830
bool isPushBackDisabled() const
Definition bdlcc_singleproducerqueueimpl.h:1143
Definition bslma_allocator.h:457
virtual void * allocate(size_type size)=0
Definition bslmf_movableref.h:751
Definition bslmt_lockguard.h:234
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bdlcc_boundedqueue.h:270
T::iterator end(T &container)
Definition bslstl_iterator.h:1523
Definition balxml_encoderoptions.h:68
static void moveConstruct(TARGET_TYPE *address, TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1642
static void copyConstruct(TARGET_TYPE *address, const TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1599
static MovableRef< t_TYPE > move(t_TYPE &reference) BSLS_KEYWORD_NOEXCEPT
Definition bslmf_movableref.h:1060
static void yield()
Definition bslmt_threadutil.h:1013
long long Int64
Definition bsls_types.h:132
Definition bsls_objectbuffer.h:276