9#ifndef INCLUDED_BDLCC_SINGLEPRODUCERSINGLECONSUMERBOUNDEDQUEUE
10#define INCLUDED_BDLCC_SINGLEPRODUCERSINGLECONSUMERBOUNDEDQUEUE
224#include <bdlscm_version.h>
257template <
class TYPE,
class NODE>
293#if defined(BSLS_COMPILERFEATURES_SUPPORT_ALIGNAS)
294class alignas(bslmt::Platform::e_CACHE_LINE_SIZE)
302 typedef unsigned int Uint;
304 typedef typename bsls::AtomicOperations::AtomicTypes::Uint AtomicUint;
305 typedef typename bsls::AtomicOperations::AtomicTypes::Uint64 AtomicUint64;
319 e_READABLE_AND_BLOCKED,
323 e_WRITABLE_AND_EMPTY,
326 e_WRITABLE_AND_BLOCKED
333 template <
class DATA>
340 typedef QueueNode<TYPE> Node;
343 AtomicUint64 d_popIndex;
346 Node *d_popElement_p;
351 const bsl::size_t d_popCapacity;
355 AtomicUint d_popDisabledGeneration;
359 mutable AtomicUint d_emptyCount;
362 AtomicUint d_emptyGeneration;
368 -
sizeof(AtomicUint64)
370 -
sizeof(bsl::size_t)
373 -
sizeof(AtomicUint)];
379 AtomicUint64 d_pushIndex;
382 Node *d_pushElement_p;
387 const bsl::size_t d_pushCapacity;
391 AtomicUint d_pushDisabledGeneration;
396 -
sizeof(AtomicUint64)
398 -
sizeof(bsl::size_t)
399 -
sizeof(AtomicUint)];
440 static void incrementUntil(AtomicUint *value, unsigned int bitValue);
450 void popComplete(Node *node, Uint64 index);
465 int popFrontImp(TYPE *value, bool isTry);
476 int pushBackImp(const TYPE& value, bool isTry);
488 int pushBackImp(bslmf::MovableRef<TYPE> value, bool isTry);
494 void pushComplete(Node *node, Uint64 index);
498 const SingleProducerSingleConsumerBoundedQueue&);
500 const SingleProducerSingleConsumerBoundedQueue&);
505 bslma::UsesBslmaAllocator);
676template <
class TYPE,
class NODE>
678SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard<TYPE, NODE>
679 ::SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard(
689template <
class TYPE,
class NODE>
694 d_queue_p->popComplete(d_node_p, d_index);
706 unsigned int state = AtomicOp::getUintAcquire(value);
707 if (bitValue != (state & 1)) {
708 unsigned int expState;
711 state = AtomicOp::testAndSwapUintAcqRel(value,
714 }
while (state != expState && (bitValue == (state & 1)));
721void SingleProducerSingleConsumerBoundedQueue<TYPE>::popComplete(Node *node,
725 if (index == d_popCapacity) {
728 AtomicOp::setUint64Release(&d_popIndex, index);
730 node->d_value.object().~TYPE();
732 Uint nodeState = AtomicOp::swapUintAcqRel(&node->d_state, e_WRITABLE);
733 if (e_READABLE_AND_BLOCKED == nodeState) {
737 d_pushCondition.signal();
743 nodeState = AtomicOp::testAndSwapUintAcqRel(&d_popElement_p[index].d_state,
745 e_WRITABLE_AND_EMPTY);
746 if (e_WRITABLE == nodeState) {
749 AtomicOp::addUintAcqRel(&d_emptyGeneration, 1);
750 if (0 < AtomicOp::getUintAcquire(&d_emptyCount)) {
754 d_emptyCondition.broadcast();
760int SingleProducerSingleConsumerBoundedQueue<TYPE>::popFrontImp(TYPE *value,
763 Uint64 index = AtomicOp::getUint64Acquire(&d_popIndex);
764 const Uint disabledGen =
765 AtomicOp::getUintAcquire(&d_popDisabledGeneration);
767 if (disabledGen & 1) {
771 Node& node = d_popElement_p[index];
773 Uint nodeState = AtomicOp::getUintAcquire(&node.d_state);
781 if (e_WRITABLE_AND_EMPTY == nodeState) {
787 nodeState = AtomicOp::getUintAcquire(&node.d_state);
788 if (e_WRITABLE_AND_EMPTY == nodeState) {
791 nodeState = AtomicOp::testAndSwapUintAcqRel(
794 e_WRITABLE_AND_BLOCKED);
796 while (( e_WRITABLE_AND_EMPTY == nodeState
797 || e_WRITABLE_AND_BLOCKED == nodeState)
799 AtomicOp::getUintAcquire(&d_popDisabledGeneration)) {
800 int rv = d_popCondition.wait(&d_popMutex);
802 AtomicOp::testAndSwapUintAcqRel(&node.d_state,
803 e_WRITABLE_AND_BLOCKED,
804 e_WRITABLE_AND_EMPTY);
807 nodeState = AtomicOp::getUint(&node.d_state);
813 if ( e_WRITABLE_AND_EMPTY == nodeState
814 || e_WRITABLE_AND_BLOCKED == nodeState) {
815 AtomicOp::testAndSwapUintAcqRel(&node.d_state,
816 e_WRITABLE_AND_BLOCKED,
817 e_WRITABLE_AND_EMPTY);
823 SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard<
824 SingleProducerSingleConsumerBoundedQueue<TYPE>, Node>
825 guard(
this, &node, index);
827#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
830 *value = node.d_value.object();
837int SingleProducerSingleConsumerBoundedQueue<TYPE>::pushBackImp(
841 Uint64 index = AtomicOp::getUint64Acquire(&d_pushIndex);
842 const Uint disabledGen =
843 AtomicOp::getUintAcquire(&d_pushDisabledGeneration);
845 if (disabledGen & 1) {
849 Node& node = d_pushElement_p[index];
851 Uint nodeState = AtomicOp::getUintAcquire(&node.d_state);
859 if (e_READABLE == nodeState) {
865 nodeState = AtomicOp::getUintAcquire(&node.d_state);
866 if (e_READABLE == nodeState) {
869 nodeState = AtomicOp::testAndSwapUintAcqRel(
872 e_READABLE_AND_BLOCKED);
874 while (( e_READABLE == nodeState
875 || e_READABLE_AND_BLOCKED == nodeState)
877 AtomicOp::getUintAcquire(&d_pushDisabledGeneration)) {
878 int rv = d_pushCondition.wait(&d_pushMutex);
880 AtomicOp::testAndSwapUintAcqRel(&node.d_state,
881 e_READABLE_AND_BLOCKED,
885 nodeState = AtomicOp::getUint(&node.d_state);
891 if ( e_READABLE == nodeState
892 || e_READABLE_AND_BLOCKED == nodeState) {
893 AtomicOp::testAndSwapUintAcqRel(&node.d_state,
894 e_READABLE_AND_BLOCKED,
905 pushComplete(&node, index);
911int SingleProducerSingleConsumerBoundedQueue<TYPE>::pushBackImp(
915 Uint64 index = AtomicOp::getUint64Acquire(&d_pushIndex);
916 const Uint disabledGen =
917 AtomicOp::getUintAcquire(&d_pushDisabledGeneration);
919 if (disabledGen & 1) {
923 Node& node = d_pushElement_p[index];
925 Uint nodeState = AtomicOp::getUintAcquire(&node.d_state);
927 if (e_READABLE == nodeState) {
933 nodeState = AtomicOp::getUintAcquire(&node.d_state);
934 if (e_READABLE == nodeState) {
937 nodeState = AtomicOp::testAndSwapUintAcqRel(
940 e_READABLE_AND_BLOCKED);
942 while (( e_READABLE == nodeState
943 || e_READABLE_AND_BLOCKED == nodeState)
945 AtomicOp::getUintAcquire(&d_pushDisabledGeneration)) {
946 int rv = d_pushCondition.wait(&d_pushMutex);
948 AtomicOp::testAndSwapUintAcqRel(&node.d_state,
949 e_READABLE_AND_BLOCKED,
953 nodeState = AtomicOp::getUint(&node.d_state);
956 if ( e_READABLE == nodeState
957 || e_READABLE_AND_BLOCKED == nodeState) {
958 AtomicOp::testAndSwapUintAcqRel(&node.d_state,
959 e_READABLE_AND_BLOCKED,
971 pushComplete(&node, index);
978void SingleProducerSingleConsumerBoundedQueue<TYPE>::pushComplete(
983 Uint nodeState = AtomicOp::swapUintAcqRel(&node->d_state, e_READABLE);
984 if (e_WRITABLE_AND_BLOCKED == nodeState) {
987 AtomicOp::addUintAcqRel(&d_emptyGeneration, 1);
991 d_popCondition.signal();
993 else if (e_WRITABLE_AND_EMPTY == nodeState) {
996 AtomicOp::addUintAcqRel(&d_emptyGeneration, 1);
1000 if (index == d_pushCapacity) {
1003 AtomicOp::setUint64Release(&d_pushIndex, index);
1007template <
class TYPE>
1012, d_popCapacity(capacity > 0 ? capacity : 1)
1014, d_pushCapacity(capacity > 0 ? capacity : 1)
1022, d_allocator_p(
bslma::Default::allocator(basicAllocator))
1032 d_popElement_p =
static_cast<Node *
>(
1033 d_allocator_p->
allocate(d_popCapacity *
sizeof(Node)));
1035 d_pushElement_p = d_popElement_p;
1038 for (bsl::size_t i = 1; i < d_popCapacity; ++i) {
1043template <
class TYPE>
1047 if (d_popElement_p) {
1049 d_allocator_p->deallocate(d_popElement_p);
1054template <
class TYPE>
1058 return popFrontImp(value,
false);
1061template <
class TYPE>
1065 return pushBackImp(value,
false);
1068template <
class TYPE>
1076template <
class TYPE>
1079 Uint64 index = AtomicOp::getUint64Acquire(&d_popIndex);
1080 Uint nodeState = AtomicOp::getUintAcquire(
1081 &d_popElement_p[index].d_state);
1083 while (e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) {
1084 d_popElement_p[index].d_value.object().~TYPE();
1086 AtomicOp::swapUintAcqRel(&d_popElement_p[index].d_state, e_WRITABLE);
1089 if (index == d_popCapacity) {
1093 nodeState = AtomicOp::getUintAcquire(&d_popElement_p[index].d_state);
1100 nodeState = AtomicOp::testAndSwapUintAcqRel(&d_popElement_p[index].d_state,
1102 e_WRITABLE_AND_EMPTY);
1104 if (e_WRITABLE == nodeState) {
1107 AtomicOp::addUintAcqRel(&d_emptyGeneration, 1);
1116 AtomicOp::addUintAcqRel(&d_emptyGeneration, 2);
1119 AtomicOp::setUint64Release(&d_popIndex, index);
1124 d_pushCondition.signal();
1126 if (0 < AtomicOp::getUintAcquire(&d_emptyCount)) {
1130 d_emptyCondition.broadcast();
1134template <
class TYPE>
1138 return popFrontImp(value,
true);
1141template <
class TYPE>
1146 return pushBackImp(value,
true);
1149template <
class TYPE>
1159template <
class TYPE>
1163 incrementUntil(&d_popDisabledGeneration, 1);
1168 d_popCondition.broadcast();
1170 if (0 < AtomicOp::getUintAcquire(&d_emptyCount)) {
1174 d_emptyCondition.broadcast();
1178template <
class TYPE>
1182 incrementUntil(&d_pushDisabledGeneration, 1);
1187 d_pushCondition.broadcast();
1190template <
class TYPE>
1194 incrementUntil(&d_popDisabledGeneration, 0);
1197template <
class TYPE>
1201 incrementUntil(&d_pushDisabledGeneration, 0);
1205template <
class TYPE>
1209 return d_popCapacity;
1212template <
class TYPE>
1216 return 0 == (AtomicOp::getUintAcquire(&d_emptyGeneration) & 1);
1219template <
class TYPE>
1223 Node& node = d_pushElement_p[AtomicOp::getUint64Acquire(
1225 Uint nodeState = AtomicOp::getUintAcquire(&node.d_state);
1227 return e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState;
1230template <
class TYPE>
1234 return 1 == (AtomicOp::getUintAcquire(&d_popDisabledGeneration) & 1);
1237template <
class TYPE>
1241 return 1 == (AtomicOp::getUintAcquire(&d_pushDisabledGeneration) & 1);
1244template <
class TYPE>
1248 Uint64 popIndex = AtomicOp::getUint64Acquire(&d_popIndex);
1249 Uint64 pushIndex = AtomicOp::getUint64Acquire(&d_pushIndex);
1250 Node& node = d_pushElement_p[pushIndex];
1251 Uint nodeState = AtomicOp::getUintAcquire(&node.d_state);
1253 if (e_READABLE == nodeState || e_READABLE_AND_BLOCKED == nodeState) {
1254 return d_popCapacity;
1257 return static_cast<bsl::size_t
>( pushIndex >= popIndex
1258 ? pushIndex - popIndex
1259 : pushIndex + d_popCapacity - popIndex);
1262template <
class TYPE>
1265 AtomicOp::addUintAcqRel(&d_emptyCount, 1);
1267 const Uint initEmptyGen = AtomicOp::getUintAcquire(&d_emptyGeneration);
1269 const Uint disabledGen =
1270 AtomicOp::getUintAcquire(&d_popDisabledGeneration);
1272 if (disabledGen & 1) {
1273 AtomicOp::addUintAcqRel(&d_emptyCount, -1);
1277 if (0 == (initEmptyGen & 1)) {
1278 AtomicOp::addUintAcqRel(&d_emptyCount, -1);
1284 Uint emptyGen = AtomicOp::getUintAcquire(&d_emptyGeneration);
1286 while ( initEmptyGen == emptyGen
1288 AtomicOp::getUintAcquire(&d_popDisabledGeneration)) {
1289 int rv = d_emptyCondition.wait(&d_emptyMutex);
1291 AtomicOp::addUintAcqRel(&d_emptyCount, -1);
1294 emptyGen = AtomicOp::getUintAcquire(&d_emptyGeneration);
1297 AtomicOp::addUintAcqRel(&d_emptyCount, -1);
1299 if (initEmptyGen == emptyGen) {
1308template <
class TYPE>
1313 return d_allocator_p;
#define BSLMF_NESTED_TRAIT_DECLARATION(t_TYPE, t_TRAIT)
Definition bslmf_nestedtraitdeclaration.h:231
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:258
~SingleProducerSingleConsumerBoundedQueue_PopCompleteGuard()
Destroy this object and invoke the TYPE::popComplete.
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:692
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:297
void removeAll()
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1077
int waitUntilEmpty() const
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1263
bool isPushBackDisabled() const
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1239
bsl::size_t capacity() const
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1207
void disablePopFront()
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1161
int tryPushBack(const TYPE &value)
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1143
bslma::Allocator * allocator() const
Return the allocator used by this object to supply memory.
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1310
int pushBack(const TYPE &value)
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1063
bool isPopFrontDisabled() const
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1232
TYPE value_type
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:508
bool isEmpty() const
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1214
~SingleProducerSingleConsumerBoundedQueue()
Destroy this object.
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1045
void enablePushBack()
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1199
bool isFull() const
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1221
void disablePushBack()
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1180
bsl::size_t numElements() const
Returns the number of elements currently in this queue.
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1246
int tryPopFront(TYPE *value)
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1136
int popFront(TYPE *value)
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1056
void enablePopFront()
Definition bdlcc_singleproducersingleconsumerboundedqueue.h:1192
Definition bslma_allocator.h:457
virtual void * allocate(size_type size)=0
Definition bslmf_movableref.h:751
Definition bslmt_condition.h:220
Definition bslmt_lockguard.h:234
Definition bslmt_mutex.h:315
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bdlcc_boundedqueue.h:270
Definition balxml_encoderoptions.h:68
static void moveConstruct(TARGET_TYPE *address, TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1642
static void copyConstruct(TARGET_TYPE *address, const TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1599
static MovableRef< t_TYPE > move(t_TYPE &reference) BSLS_KEYWORD_NOEXCEPT
Definition bslmf_movableref.h:1060
static void yield()
Definition bslmt_threadutil.h:1013
Definition bsls_atomicoperations.h:834
static void initUint64(AtomicTypes::Uint64 *atomicUint, Types::Uint64 initialValue=0)
Definition bsls_atomicoperations.h:2121
static void initUint(AtomicTypes::Uint *atomicUint, unsigned int initialValue=0)
Definition bsls_atomicoperations.h:1922
unsigned long long Uint64
Definition bsls_types.h:137
Definition bsls_objectbuffer.h:276