9#ifndef INCLUDED_BDLCC_BOUNDEDQUEUE
10#define INCLUDED_BDLCC_BOUNDEDQUEUE
244#include <bdlscm_version.h>
266#include <bsl_climits.h>
267#include <bsl_cstdint.h>
280template <
class TYPE,
class NODE>
362template <
class TYPE,
bool RECLAIMABLE>
369 bool d_isUnconstructedFlag;
383 void setIsUnconstructed(
bool isUnconstructedFlag);
388 bool isUnconstructed()
const;
399 void setIsUnconstructed(
bool );
404 bool isUnconstructed()
const;
426 static const unsigned int k_FINISHED_SHIFT = 32;
428 static const unsigned int k_MAXIMUM_CIRCULAR_DIFFERENCE =
429 static_cast<unsigned int>(1) << (
sizeof(
unsigned int) * CHAR_BIT - 1);
432 typedef unsigned int Uint;
434 typedef typename bsls::AtomicOperations::AtomicTypes::Uint AtomicUint;
435 typedef typename bsls::AtomicOperations::AtomicTypes::Uint64 AtomicUint64;
450 AtomicUint64 d_pushCount;
458 AtomicUint64 d_pushIndex;
468 AtomicUint64 d_popCount;
475 AtomicUint64 d_popIndex;
478 mutable AtomicUint d_emptyWaiterCount;
482 AtomicUint d_emptyCountSeen;
500 const Uint64 d_capacity;
510 BoundedQueue<TYPE> >;
518 static bool circularlyGreater(Uint lhs, Uint rhs);
525 static bool isQuiescentState(bsls::Types::Uint64 count);
527 static Uint64 markFinishedOperation(AtomicUint64 *count);
537 static Uint64 markFinishedOperation(AtomicUint64 *count, int num);
548 static void markReclaimed(AtomicUint64 *count);
550 static void markStartedOperation(AtomicUint64 *count);
559 static void markStartedOperation(AtomicUint64 *count, int num);
570 static Uint64 unmarkStartedOperation(AtomicUint64 *count);
577 void popComplete(Node *node);
582 void popFrontHelper(TYPE *value);
592 void pushExceptionComplete();
600 bool updateEmptyCountSeen(Uint emptyCount);
768template <
class TYPE,
class NODE>
777template <
class TYPE,
class NODE>
781 d_queue_p->popComplete(d_node_p);
803 d_queue_p->pushExceptionComplete();
823 bool isUnconstructedFlag)
825 d_isUnconstructedFlag = isUnconstructedFlag;
840 return d_isUnconstructedFlag;
859 return lhs > rhs ? (lhs - rhs) <= k_MAXIMUM_CIRCULAR_DIFFERENCE
860 : (rhs - lhs) > k_MAXIMUM_CIRCULAR_DIFFERENCE;
867 return (count >> k_FINISHED_SHIFT) == (count & k_STARTED_MASK);
875 return AtomicOp::addUint64NvAcqRel(count, k_FINISHED_INC);
884 return AtomicOp::addUint64NvAcqRel(count, num * k_FINISHED_INC);
889void BoundedQueue<TYPE>::markReclaimed(AtomicUint64 *count)
891 AtomicOp::addUint64AcqRel(count, k_STARTED_INC + k_FINISHED_INC);
896void BoundedQueue<TYPE>::markStartedOperation(AtomicUint64 *count)
898 AtomicOp::addUint64AcqRel(count, k_STARTED_INC);
903void BoundedQueue<TYPE>::markStartedOperation(AtomicUint64 *count,
int num)
905 AtomicOp::addUint64AcqRel(count, num * k_STARTED_INC);
913 return AtomicOp::addUint64NvAcqRel(count, k_STARTED_DEC);
919void BoundedQueue<TYPE>::popComplete(Node *node)
921 node->d_value.object().~TYPE();
923 Uint64 count = markFinishedOperation(&d_popCount);
924 if (isQuiescentState(count)) {
930 if (AtomicOp::testAndSwapUint64AcqRel(&d_popCount,
933 d_pushSemaphore.postWithRedundantSignal(
934 static_cast<int>(count & k_STARTED_MASK),
935 static_cast<int>(d_capacity),
938 Uint emptyCount = AtomicOp::getUintAcquire(&d_emptyWaiterCount);
940 if (isEmpty() && updateEmptyCountSeen(emptyCount)) {
944 d_emptyCondition.broadcast();
951void BoundedQueue<TYPE>::popFrontHelper(TYPE *value)
953 markStartedOperation(&d_popCount);
957 Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_popIndex, 1) - 1)
959 Node *node = &d_element_p[index];
968 while (node->isUnconstructed()) {
969 markReclaimed(&d_popCount);
971 index = (AtomicOp::addUint64NvAcqRel(&d_popIndex, 1) - 1) % d_capacity;
972 node = &d_element_p[index];
975 BoundedQueue_PopCompleteGuard<BoundedQueue<TYPE>, Node> guard(
this, node);
977#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
980 *value = node->d_value.object();
986void BoundedQueue<TYPE>::pushComplete()
988 Uint64 count = markFinishedOperation(&d_pushCount);
989 if (isQuiescentState(count)) {
995 if (AtomicOp::testAndSwapUint64AcqRel(&d_pushCount,
998 d_popSemaphore.postWithRedundantSignal(
999 static_cast<int>(count & k_STARTED_MASK),
1000 static_cast<int>(d_capacity),
1006template <
class TYPE>
1008void BoundedQueue<TYPE>::pushExceptionComplete()
1010 Uint64 count = unmarkStartedOperation(&d_pushCount);
1012 int numToPost =
static_cast<int>(count & k_STARTED_MASK);
1014 if (0 != numToPost && isQuiescentState(count)) {
1020 if (AtomicOp::testAndSwapUint64AcqRel(&d_pushCount,
1023 d_popSemaphore.post(numToPost);
1028template <
class TYPE>
1030bool BoundedQueue<TYPE>::updateEmptyCountSeen(Uint emptyCount)
1032 Uint emptyCountSeen = AtomicOp::getUintAcquire(&d_emptyCountSeen);
1033 while (circularlyGreater(emptyCount, emptyCountSeen)) {
1034 const Uint origEmptyCountSeen = emptyCountSeen;
1036 emptyCountSeen = AtomicOp::testAndSwapUintAcqRel(&d_emptyCountSeen,
1040 if (origEmptyCountSeen == emptyCountSeen) {
1048template <
class TYPE>
1056, d_capacity(capacity > 2 ? capacity : 2)
1057, d_allocator_p(
bslma::Default::allocator(basicAllocator))
1067 d_element_p =
static_cast<Node *
>(
1068 d_allocator_p->
allocate(
static_cast<bsl::size_t
>(
1069 d_capacity *
sizeof(
Node))));
1071 for (bsl::size_t i = 0; i < d_capacity; ++i) {
1072 d_element_p[i].setIsUnconstructed(
false);
1075 d_pushSemaphore.
post(
static_cast<int>(d_capacity));
1078template <
class TYPE>
1083 d_allocator_p->deallocate(d_element_p);
1088template <
class TYPE>
1092 int rv = d_popSemaphore.wait();
1100 popFrontHelper(value);
1105template <
class TYPE>
1108 int rv = d_pushSemaphore.wait();
1116 markStartedOperation(&d_pushCount);
1120 Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
1122 Node& node = d_element_p[index];
1124 node.setIsUnconstructed(
true);
1134 node.setIsUnconstructed(
false);
1141template <
class TYPE>
1144 int rv = d_pushSemaphore.wait();
1152 markStartedOperation(&d_pushCount);
1156 Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
1158 Node& node = d_element_p[index];
1160 node.setIsUnconstructed(
true);
1164 TYPE& dummy = value;
1171 node.setIsUnconstructed(
false);
1178template <
class TYPE>
1181 int reclaim = d_popSemaphore.takeAll();
1185 int count = reclaim;
1193 markStartedOperation(&d_popCount, count);
1198 Uint64 index = AtomicOp::addUint64NvAcqRel(&d_popIndex, count)
1201 for (
int i = 0; i < count; ++i, ++index) {
1202 Node& node = d_element_p[index % d_capacity];
1204 if (!node.isUnconstructed()) {
1205 node.d_value.object().~TYPE();
1216 Uint64 popCount = markFinishedOperation(&d_popCount, count);
1218 if (isQuiescentState(popCount)) {
1223 if (AtomicOp::testAndSwapUint64AcqRel(&d_popCount,
1226 d_pushSemaphore.post(
static_cast<int>(
1227 popCount & k_STARTED_MASK));
1230 Uint emptyCount = AtomicOp::getUintAcquire(
1231 &d_emptyWaiterCount);
1233 if (isEmpty() && updateEmptyCountSeen(emptyCount)) {
1237 d_emptyCondition.broadcast();
1244template <
class TYPE>
1248 int rv = d_popSemaphore.tryWait();
1259 popFrontHelper(value);
1264template <
class TYPE>
1267 int rv = d_pushSemaphore.tryWait();
1278 markStartedOperation(&d_pushCount);
1282 Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
1284 Node& node = d_element_p[index];
1286 node.setIsUnconstructed(
true);
1296 node.setIsUnconstructed(
false);
1303template <
class TYPE>
1306 int rv = d_pushSemaphore.tryWait();
1317 markStartedOperation(&d_pushCount);
1321 Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
1323 Node& node = d_element_p[index];
1325 node.setIsUnconstructed(
true);
1329 TYPE& dummy = value;
1336 node.setIsUnconstructed(
false);
1345template <
class TYPE>
1349 d_popSemaphore.disable();
1354 d_emptyCondition.broadcast();
1357template <
class TYPE>
1361 d_pushSemaphore.disable();
1364template <
class TYPE>
1368 d_popSemaphore.enable();
1371template <
class TYPE>
1375 d_pushSemaphore.enable();
1379template <
class TYPE>
1383 return static_cast<bsl::size_t
>(d_capacity);
1386template <
class TYPE>
1390 return d_capacity ==
static_cast<Uint64
>(d_pushSemaphore.getValue());
1393template <
class TYPE>
1397 return 0 == d_pushSemaphore.getValue();
1400template <
class TYPE>
1404 return d_popSemaphore.isDisabled();
1407template <
class TYPE>
1411 return d_pushSemaphore.isDisabled();
1414template <
class TYPE>
1418 return d_popSemaphore.getValue();
1421template <
class TYPE>
1424 Uint emptyCount = AtomicOp::addUintNvAcqRel(&d_emptyWaiterCount, 1) - 1;
1426 int state = d_popSemaphore.getDisabledState();
1427 if (1 == (state & 1)) {
1441 bool empty = isEmpty()
1442 || circularlyGreater(AtomicOp::getUintAcquire(&d_emptyCountSeen),
1445 while (!empty && state == d_popSemaphore.getDisabledState()) {
1446 int rv = d_emptyCondition.wait(&d_emptyMutex);
1451 || circularlyGreater(AtomicOp::getUintAcquire(&d_emptyCountSeen),
1464template <
class TYPE>
1468 return d_allocator_p;
#define BSLMF_NESTED_TRAIT_DECLARATION(t_TYPE, t_TRAIT)
Definition bslmf_nestedtraitdeclaration.h:231
Definition bdlcc_boundedqueue.h:281
~BoundedQueue_PopCompleteGuard()
Definition bdlcc_boundedqueue.h:779
Definition bdlcc_boundedqueue.h:313
void release()
Definition bdlcc_boundedqueue.h:810
~BoundedQueue_PushExceptionCompleteProctor()
Definition bdlcc_boundedqueue.h:800
Definition bdlcc_boundedqueue.h:415
bsl::size_t capacity() const
Definition bdlcc_boundedqueue.h:1381
bool isPopFrontDisabled() const
Definition bdlcc_boundedqueue.h:1402
int pushBack(bslmf::MovableRef< TYPE > value)
Definition bdlcc_boundedqueue.h:1142
int waitUntilEmpty() const
Definition bdlcc_boundedqueue.h:1422
void enablePushBack()
Definition bdlcc_boundedqueue.h:1373
BoundedQueue(bsl::size_t capacity, bslma::Allocator *basicAllocator=0)
Definition bdlcc_boundedqueue.h:1049
void removeAll()
Definition bdlcc_boundedqueue.h:1179
void enablePopFront()
Definition bdlcc_boundedqueue.h:1366
int tryPopFront(TYPE *value)
Definition bdlcc_boundedqueue.h:1246
int popFront(TYPE *value)
Definition bdlcc_boundedqueue.h:1090
bool isEmpty() const
Definition bdlcc_boundedqueue.h:1388
void disablePopFront()
Definition bdlcc_boundedqueue.h:1347
int pushBack(const TYPE &value)
Definition bdlcc_boundedqueue.h:1106
bsl::size_t numElements() const
Definition bdlcc_boundedqueue.h:1416
bool isFull() const
Definition bdlcc_boundedqueue.h:1395
bool isPushBackDisabled() const
Definition bdlcc_boundedqueue.h:1409
~BoundedQueue()
Destroy this object.
Definition bdlcc_boundedqueue.h:1079
bslma::Allocator * allocator() const
Return the allocator used by this object to supply memory.
Definition bdlcc_boundedqueue.h:1466
int tryPushBack(const TYPE &value)
Definition bdlcc_boundedqueue.h:1265
void disablePushBack()
Definition bdlcc_boundedqueue.h:1359
TYPE value_type
Definition bdlcc_boundedqueue.h:611
int tryPushBack(bslmf::MovableRef< TYPE > value)
Definition bdlcc_boundedqueue.h:1304
Definition bslma_allocator.h:457
virtual void * allocate(size_type size)=0
Definition bslmf_movableref.h:751
Definition bslmt_condition.h:220
Definition bslmt_fastpostsemaphore.h:327
void post()
Atomically increment the count of this semaphore.
Definition bslmt_fastpostsemaphore.h:604
@ e_DISABLED
Definition bslmt_fastpostsemaphore.h:347
@ e_WOULD_BLOCK
Definition bslmt_fastpostsemaphore.h:352
Definition bslmt_lockguard.h:234
Definition bslmt_mutex.h:315
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bdlcc_boundedqueue.h:270
Definition balxml_encoderoptions.h:68
bsls::ObjectBuffer< TYPE > d_value
Definition bdlcc_boundedqueue.h:394
bsls::ObjectBuffer< TYPE > d_value
Definition bdlcc_boundedqueue.h:373
Definition bdlcc_boundedqueue.h:363
static void moveConstruct(TARGET_TYPE *address, TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1642
static void copyConstruct(TARGET_TYPE *address, const TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1599
Definition bslmf_isbitwisecopyable.h:298
static MovableRef< t_TYPE > move(t_TYPE &reference) BSLS_KEYWORD_NOEXCEPT
Definition bslmf_movableref.h:1060
Definition bsls_atomicoperations.h:834
static void initUint64(AtomicTypes::Uint64 *atomicUint, Types::Uint64 initialValue=0)
Definition bsls_atomicoperations.h:2121
static void initUint(AtomicTypes::Uint *atomicUint, unsigned int initialValue=0)
Definition bsls_atomicoperations.h:1922
unsigned long long Uint64
Definition bsls_types.h:137
Definition bsls_objectbuffer.h:276