              // -----------------------------------------------
              // class SingleConsumerQueueImpl_MarkReclaimProctor
              // -----------------------------------------------

// CREATORS
template <class TYPE, class NODE>
SingleConsumerQueueImpl_MarkReclaimProctor<TYPE, NODE>::
                 SingleConsumerQueueImpl_MarkReclaimProctor(TYPE *queue,
                                                            NODE *node)
: d_queue_p(queue)
, d_node_p(node)
{
}

template <class TYPE, class NODE>
SingleConsumerQueueImpl_MarkReclaimProctor<TYPE, NODE>::
                                 ~SingleConsumerQueueImpl_MarkReclaimProctor()
{
    if (d_queue_p) {
        d_queue_p->markReclaim(d_node_p);
    }
}

// MANIPULATORS
template <class TYPE, class NODE>
void SingleConsumerQueueImpl_MarkReclaimProctor<TYPE, NODE>::release()
{
    d_queue_p = 0;
}

               // ----------------------------------------------
               // class SingleConsumerQueueImpl_PopCompleteGuard
               // ----------------------------------------------

// CREATORS
template <class TYPE>
SingleConsumerQueueImpl_PopCompleteGuard<TYPE>::
                         SingleConsumerQueueImpl_PopCompleteGuard(TYPE *queue)
: d_queue_p(queue)
{
}

template <class TYPE>
SingleConsumerQueueImpl_PopCompleteGuard<TYPE>::
                                  ~SingleConsumerQueueImpl_PopCompleteGuard()
{
    d_queue_p->popComplete(true);
}

               // -----------------------------------------------
               // class SingleConsumerQueueImpl_AllocateLockGuard
               // -----------------------------------------------

// CREATORS
template <class TYPE>
SingleConsumerQueueImpl_AllocateLockGuard<TYPE>::
                        SingleConsumerQueueImpl_AllocateLockGuard(TYPE *queue)
: d_queue_p(queue)
{
}

template <class TYPE>
SingleConsumerQueueImpl_AllocateLockGuard<TYPE>::
                                 ~SingleConsumerQueueImpl_AllocateLockGuard()
{
    d_queue_p->releaseAllocateLock();
}
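// The three helper classes above are RAII guards; the following summary is
// inferred from their usage later in this section rather than from separate
// documentation.  'MarkReclaimProctor' converts a node whose element
// construction threw into an 'e_RECLAIM' node so the consumer can skip it,
// 'PopCompleteGuard' finishes a pop (destroying the element and recycling the
// node) even if the copy or move into the caller's 'value' throws, and
// 'AllocateLockGuard' releases the allocation bit held in 'd_state'.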
                      // -----------------------------
                      // class SingleConsumerQueueImpl
                      // -----------------------------

// PRIVATE CLASS METHODS
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bsls::Types::Int64 SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
                                       ::available(bsls::Types::Int64 state)
{
    return state >> k_AVAILABLE_SHIFT;
}
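// 'd_state' packs three fields, delimited by constants defined in the class
// definition (their exact values are not shown in this section):
//..
//  [ available count : high bits, starting at 'k_AVAILABLE_SHIFT' ]
//  [ allocate lock   : the 'k_ALLOCATE_MASK' bits                 ]
//  [ use count       : the 'k_USE_MASK' low bits                  ]
//..
// Since the available count occupies the sign-carrying high bits, a negative
// 'd_state' indicates a producer optimistically claimed a node that was not
// actually available.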
// PRIVATE MANIPULATORS
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
                                      ::incrementUntil(AtomicUint   *value,
                                                       unsigned int  bitValue)
{
    unsigned int state = ATOMIC_OP::getUintAcquire(value);
    if (bitValue != (state & 1)) {
        unsigned int expState;
        do {
            expState = state;
            state    = ATOMIC_OP::testAndSwapUintAcqRel(value,
                                                        state,
                                                        state + 1);

            // Retry until this thread's increment succeeds, or another
            // thread has already brought the low bit to 'bitValue'.

        } while (state != expState && (bitValue != (state & 1)));
    }
}
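// 'd_popFrontDisabled' and 'd_pushBackDisabled' are generation counts: an
// odd value means the operation is disabled, and every enable or disable
// transition increments the count, so a thread blocked in 'popFront' or
// 'waitUntilEmpty' can detect a disablement that occurred while it waited.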
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
                                                   ::markReclaim(Node *node)
{
    // The reclaimed node is not usable capacity until the consumer skips
    // past it.

    ATOMIC_OP::addInt64AcqRel(&d_capacity, -1);

    int nodeState = ATOMIC_OP::swapIntAcqRel(&node->d_state, e_RECLAIM);
    if (e_WRITABLE_AND_BLOCKED == nodeState) {
        {
            bslmt::LockGuard<MUTEX> guard(&d_readMutex);
        }
        d_readCondition.signal();
    }
}
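// The empty lock/unlock of 'd_readMutex' before the signal ensures that a
// consumer between its check of the node state and its call to 'wait' cannot
// miss the wakeup; the same idiom recurs throughout this section.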
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
                                               ::popComplete(bool destruct)
{
    Node *nextRead = static_cast<Node *>(
                                       ATOMIC_OP::getPtrAcquire(&d_nextRead));

    if (destruct) {
        nextRead->d_value.object().~TYPE();
    }

    ATOMIC_OP::setIntRelease(&nextRead->d_state, e_WRITABLE);

    ATOMIC_OP::setPtrRelease(&d_nextRead,
                             ATOMIC_OP::getPtrAcquire(&nextRead->d_next));

    bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state,
                                                           k_AVAILABLE_INC);

    if (ATOMIC_OP::getInt64Acquire(&d_capacity) == available(state)) {
        {
            bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);
        }
        d_emptyCondition.broadcast();
    }
}
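// The queue is empty exactly when every node counted in 'd_capacity' has
// become available again, i.e., 'd_capacity == available(d_state)'; this is
// the same condition 'isEmpty' and 'waitUntilEmpty' test.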
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
typename SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::Node *
                  SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
                                                           ::pushBackHelper()
{
    if (1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1)) {
        return 0;                                                     // RETURN
    }

    // Optimistically claim an available node; since the available count
    // occupies the high bits, 'state' is negative if no node was available.

    bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(
                                                 &d_state,
                                                 k_USE_INC - k_AVAILABLE_INC);

    if (state < 0 || 0 != (state & k_ALLOCATE_MASK)) {
        // No node was available, or a thread is allocating nodes; revert the
        // claim and rendezvous on 'd_state'.

        state = ATOMIC_OP::addInt64NvAcqRel(&d_state,
                                            k_AVAILABLE_INC - k_USE_INC);

        bsls::Types::Int64 expState;
        do {
            expState = state;

            if (state >= k_AVAILABLE_INC && 0 == (state & k_ALLOCATE_MASK)) {
                // A node is available and no thread is allocating; attempt
                // to claim the node.

                state = ATOMIC_OP::testAndSwapInt64AcqRel(
                                          &d_state,
                                          state,
                                          state + k_USE_INC - k_AVAILABLE_INC);
            }
            else if (   0 == (  (state >> k_AVAILABLE_SHIFT)
                              + (state & k_USE_MASK))
                     && 0 == (state & k_ALLOCATE_MASK)) {
                // No nodes are available, no push is in progress, and no
                // thread is allocating; attempt to acquire the right to
                // allocate a batch of nodes.

                state = ATOMIC_OP::testAndSwapInt64AcqRel(
                                                      &d_state,
                                                      state,
                                                      state + k_ALLOCATE_INC);
                if (expState == state) {
                    // This thread owns the allocation lock; splice a batch
                    // of new nodes between 'a', the writable node at
                    // 'd_nextWrite' that is not counted in 'd_capacity', and
                    // the following node 'b'.

                    SingleConsumerQueueImpl_AllocateLockGuard<
                                         SingleConsumerQueueImpl<TYPE,
                                                                 ATOMIC_OP,
                                                                 MUTEX,
                                                                 CONDITION> >
                                                                  guard(this);

                    Node *a = static_cast<Node *>(
                                      ATOMIC_OP::getPtrAcquire(&d_nextWrite));
                    Node *b = static_cast<Node *>(
                                        ATOMIC_OP::getPtrAcquire(&a->d_next));

                    Node *nodes = static_cast<Node *>(d_allocator.allocate(
                                     sizeof(Node) * k_ALLOCATION_BATCH_SIZE));
                    for (bsl::size_t i = 0;
                                     i < k_ALLOCATION_BATCH_SIZE - 1;
                                     ++i) {
                        Node *n = nodes + i;
                        ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
                        ATOMIC_OP::initPointer(&n->d_next, n + 1);
                    }

                    Node *n = nodes + k_ALLOCATION_BATCH_SIZE - 1;
                    ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
                    ATOMIC_OP::initPointer(&n->d_next, b);

                    ATOMIC_OP::setPtrRelease(&a->d_next, nodes);
                    ATOMIC_OP::setPtrRelease(&d_nextWrite, nodes);

                    ATOMIC_OP::addInt64AcqRel(&d_capacity,
                                              k_ALLOCATION_BATCH_SIZE);

                    // This thread will write into 'a', so one fewer than the
                    // batch size is published as available.

                    ATOMIC_OP::addInt64AcqRel(
                             &d_state,
                             k_AVAILABLE_INC * (k_ALLOCATION_BATCH_SIZE - 1));

                    return a;                                         // RETURN
                }
            }
            else {
                // A push or an allocation is in progress; wait for the state
                // to change before retrying.

                do {
                    state = ATOMIC_OP::getInt64Acquire(&d_state);
                } while (state == expState);
            }
        } while (state != expState);
    }

    // Take the node at 'd_nextWrite' and advance 'd_nextWrite'.

    Node *nextWrite = static_cast<Node *>(
                                      ATOMIC_OP::getPtrAcquire(&d_nextWrite));

    Node *expNextWrite;
    do {
        expNextWrite = nextWrite;

        Node *next = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(
                                                         &nextWrite->d_next));

        nextWrite = static_cast<Node *>(ATOMIC_OP::testAndSwapPtrAcqRel(
                                                                 &d_nextWrite,
                                                                 nextWrite,
                                                                 next));
    } while (nextWrite != expNextWrite);

    ATOMIC_OP::addInt64AcqRel(&d_state, -k_USE_INC);

    return nextWrite;
}
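// 'pushBackHelper' in summary (inferred from the code above): the fast path
// claims an available node by moving one unit from the available count to
// the use count.  Failing that, producers rendezvous on 'd_state' until
// either an available node appears or, once no pushes are in flight, exactly
// one producer takes the allocate bit and splices 'k_ALLOCATION_BATCH_SIZE'
// fresh nodes after the uncounted writable node at 'd_nextWrite'.  The
// allocating producer writes into that uncounted node itself, preserving
// FIFO order across the splice.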
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
                                                     ::releaseAllocateLock()
{
    ATOMIC_OP::addInt64AcqRel(&d_state, -k_ALLOCATE_INC);
}
// CREATORS
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                    SingleConsumerQueueImpl(bslma::Allocator *basicAllocator)
: d_allocator(basicAllocator)
{
    ATOMIC_OP::initInt64(&d_capacity, 0);
    ATOMIC_OP::initInt64(&d_state, 0);

    ATOMIC_OP::initUint(&d_popFrontDisabled, 0);
    ATOMIC_OP::initUint(&d_pushBackDisabled, 0);

    ATOMIC_OP::initPointer(&d_nextWrite, 0);
    ATOMIC_OP::initPointer(&d_nextRead,  0);

    Node *n = static_cast<Node *>(d_allocator.allocate(sizeof(Node)));
    ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
    ATOMIC_OP::initPointer(&n->d_next, n);

    ATOMIC_OP::setPtrRelease(&d_nextWrite, n);
    ATOMIC_OP::setPtrRelease(&d_nextRead, n);
}
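// The default-constructed queue holds a single self-linked node that is not
// counted in 'd_capacity'; the first push therefore takes the allocation
// path in 'pushBackHelper' and writes its element into this node.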
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                    SingleConsumerQueueImpl(bsl::size_t       capacity,
                                            bslma::Allocator *basicAllocator)
: d_allocator(basicAllocator)
{
    ATOMIC_OP::initInt64(&d_capacity, 0);
    ATOMIC_OP::initInt64(&d_state, 0);

    ATOMIC_OP::initUint(&d_popFrontDisabled, 0);
    ATOMIC_OP::initUint(&d_pushBackDisabled, 0);

    ATOMIC_OP::initPointer(&d_nextWrite, 0);
    ATOMIC_OP::initPointer(&d_nextRead,  0);

    Node *nodes = static_cast<Node *>(
                         d_allocator.allocate(sizeof(Node) * (capacity + 1)));
    for (bsl::size_t i = 0; i < capacity; ++i) {
        Node *n = nodes + i;
        ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
        ATOMIC_OP::initPointer(&n->d_next, n + 1);
    }

    Node *n = nodes + capacity;
    ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
    ATOMIC_OP::initPointer(&n->d_next, nodes);

    ATOMIC_OP::setPtrRelease(&d_nextWrite, nodes);
    ATOMIC_OP::setPtrRelease(&d_nextRead, nodes);

    ATOMIC_OP::addInt64AcqRel(&d_capacity, capacity);
    ATOMIC_OP::addInt64AcqRel(&d_state, k_AVAILABLE_INC * capacity);
}
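// 'capacity + 1' nodes are created so that, as in the default constructor,
// one uncounted writable node always sits in the ring; only 'capacity' nodes
// are reflected in 'd_capacity' and in the available count.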
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                    ~SingleConsumerQueueImpl()
{
    Node *end = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&d_nextWrite));
    Node *at  = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&end->d_next));
    while (at != end) {
        Node *next = static_cast<Node *>(
                                       ATOMIC_OP::getPtrAcquire(&at->d_next));
        if (e_READABLE == ATOMIC_OP::getIntAcquire(&at->d_state)) {
            at->d_value.object().~TYPE();
        }
        at = next;
    }
    if (e_READABLE == ATOMIC_OP::getIntAcquire(&at->d_state)) {
        at->d_value.object().~TYPE();
    }
}
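// Only the elements are destroyed here; the node memory itself is presumed
// to be owned by 'd_allocator' (a pool-like member that releases all of its
// outstanding blocks when it is destroyed), since no explicit deallocation
// appears in this section.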
// MANIPULATORS
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
int SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::popFront(
                                                                  TYPE *value)
{
    unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
    if (1 == (generation & 1)) {
        return e_DISABLED;                                            // RETURN
    }

    Node *nextRead = static_cast<Node *>(
                                       ATOMIC_OP::getPtrAcquire(&d_nextRead));
    int nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);

    // Block until the node at 'd_nextRead' is readable, skipping any nodes
    // reclaimed from failed pushes.

    do {
        if (e_WRITABLE == nodeState) {
            bslmt::LockGuard<MUTEX> guard(&d_readMutex);

            nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);
            if (e_WRITABLE == nodeState) {
                nodeState = ATOMIC_OP::swapIntAcqRel(&nextRead->d_state,
                                                     e_WRITABLE_AND_BLOCKED);
                while (e_READABLE != nodeState && e_RECLAIM != nodeState) {
                    if (generation !=
                             ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) {
                        return e_DISABLED;                            // RETURN
                    }
                    d_readCondition.wait(&d_readMutex);
                    nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);
                }
            }
        }
        if (e_RECLAIM == nodeState) {
            ATOMIC_OP::addInt64AcqRel(&d_capacity, 1);
            popComplete(false);

            nextRead  = static_cast<Node *>(
                                       ATOMIC_OP::getPtrAcquire(&d_nextRead));
            nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);
        }
    } while (e_READABLE != nodeState);

    SingleConsumerQueueImpl_PopCompleteGuard<
                                      SingleConsumerQueueImpl<TYPE,
                                                              ATOMIC_OP,
                                                              MUTEX,
                                                              CONDITION> >
                                                                  guard(this);

#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
    *value = bslmf::MovableRefUtil::move(nextRead->d_value.object());
#else
    *value = nextRead->d_value.object();
#endif

    return 0;
}
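// The consumer blocks by swapping the node to 'e_WRITABLE_AND_BLOCKED' while
// holding 'd_readMutex'; a producer that observes this state during its
// release-swap to 'e_READABLE' signals 'd_readCondition'.  Re-checking the
// 'd_popFrontDisabled' generation inside the wait loop lets
// 'disablePopFront' wake and cancel a blocked consumer.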
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
int SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::pushBack(
                                                            const TYPE& value)
{
    Node *target = pushBackHelper();
    if (0 == target) {
        return e_DISABLED;                                            // RETURN
    }

    SingleConsumerQueueImpl_MarkReclaimProctor<
                                     SingleConsumerQueueImpl<TYPE,
                                                             ATOMIC_OP,
                                                             MUTEX,
                                                             CONDITION>,
                                     Node> proctor(this, target);

    bslma::ConstructionUtil::construct(target->d_value.address(),
                                       d_allocator.allocator(),
                                       value);

    proctor.release();

    int nodeState = ATOMIC_OP::swapIntAcqRel(&target->d_state, e_READABLE);
    if (e_WRITABLE_AND_BLOCKED == nodeState) {
        {
            bslmt::LockGuard<MUTEX> guard(&d_readMutex);
        }
        d_readCondition.signal();
    }

    return 0;
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
int SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::pushBack(
                                                 bslmf::MovableRef<TYPE> value)
{
    Node *target = pushBackHelper();
    if (0 == target) {
        return e_DISABLED;                                            // RETURN
    }

    SingleConsumerQueueImpl_MarkReclaimProctor<
                                     SingleConsumerQueueImpl<TYPE,
                                                             ATOMIC_OP,
                                                             MUTEX,
                                                             CONDITION>,
                                     Node> proctor(this, target);

    TYPE& dummy = value;
    bslma::ConstructionUtil::construct(target->d_value.address(),
                                       d_allocator.allocator(),
                                       bslmf::MovableRefUtil::move(dummy));

    proctor.release();

    int nodeState = ATOMIC_OP::swapIntAcqRel(&target->d_state, e_READABLE);
    if (e_WRITABLE_AND_BLOCKED == nodeState) {
        {
            bslmt::LockGuard<MUTEX> guard(&d_readMutex);
        }
        d_readCondition.signal();
    }

    return 0;
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bsl::size_t SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                                   removeAll()
{
    bsl::size_t count   = 0;
    bsl::size_t reclaim = 0;

    Node *nextRead = static_cast<Node *>(
                                       ATOMIC_OP::getPtrAcquire(&d_nextRead));
    int nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);
    while (e_READABLE == nodeState || e_RECLAIM == nodeState) {
        if (e_READABLE == nodeState) {
            nextRead->d_value.object().~TYPE();
        }
        else {
            ++reclaim;
        }
        ATOMIC_OP::setIntRelease(&nextRead->d_state, e_WRITABLE);
        ++count;
        nextRead = static_cast<Node *>(
                                 ATOMIC_OP::getPtrAcquire(&nextRead->d_next));
        ATOMIC_OP::setPtrRelease(&d_nextRead, nextRead);
        nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);
    }

    ATOMIC_OP::addInt64AcqRel(&d_capacity, reclaim);
    ATOMIC_OP::addInt64AcqRel(&d_state, k_AVAILABLE_INC * count);

    {
        bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);
    }
    d_emptyCondition.broadcast();

    return count - reclaim;
}
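// 'count' tallies every node returned to the writable state, including
// reclaimed ones, because each such node must be re-published in the
// available count; the returned value excludes the reclaimed nodes, which
// held no element.  This accounting is inferred from the state updates
// above.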
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
int SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::tryPopFront(
                                                                  TYPE *value)
{
    unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
    if (1 == (generation & 1)) {
        return e_DISABLED;                                            // RETURN
    }

    Node *nextRead = static_cast<Node *>(
                                       ATOMIC_OP::getPtrAcquire(&d_nextRead));
    int nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);

    while (e_RECLAIM == nodeState) {
        ATOMIC_OP::addInt64AcqRel(&d_capacity, 1);
        popComplete(false);

        nextRead = static_cast<Node *>(
                                       ATOMIC_OP::getPtrAcquire(&d_nextRead));
        nodeState = ATOMIC_OP::getIntAcquire(&nextRead->d_state);
    }

    if (e_READABLE != nodeState) {
        return e_EMPTY;                                               // RETURN
    }

    SingleConsumerQueueImpl_PopCompleteGuard<
                                      SingleConsumerQueueImpl<TYPE,
                                                              ATOMIC_OP,
                                                              MUTEX,
                                                              CONDITION> >
                                                                  guard(this);

#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
    *value = bslmf::MovableRefUtil::move(nextRead->d_value.object());
#else
    *value = nextRead->d_value.object();
#endif

    return 0;
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
int SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::tryPushBack(
                                                            const TYPE& value)
{
    return pushBack(value);
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
int SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::tryPushBack(
                                                 bslmf::MovableRef<TYPE> value)
{
    return pushBack(bslmf::MovableRefUtil::move(value));
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                             disablePopFront()
{
    incrementUntil(&d_popFrontDisabled, 1);

    {
        bslmt::LockGuard<MUTEX> guard(&d_readMutex);
    }
    d_readCondition.signal();

    {
        bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);
    }
    d_emptyCondition.broadcast();
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                             disablePushBack()
{
    incrementUntil(&d_pushBackDisabled, 1);
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                              enablePopFront()
{
    incrementUntil(&d_popFrontDisabled, 0);
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
void SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                              enablePushBack()
{
    incrementUntil(&d_pushBackDisabled, 0);
}
// ACCESSORS
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bool SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                               isEmpty() const
{
    return ATOMIC_OP::getInt64Acquire(&d_capacity) ==
                              available(ATOMIC_OP::getInt64Acquire(&d_state));
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bool SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                                isFull() const
{
    // The queue grows on demand, so it is never full.

    return false;
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bool SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                    isPopFrontDisabled() const
{
    return 1 == (ATOMIC_OP::getUintAcquire(&d_popFrontDisabled) & 1);
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bool SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                    isPushBackDisabled() const
{
    return 1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1);
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bsl::size_t SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                           numElements() const
{
    // The available count may be transiently negative while a producer is
    // executing the optimistic claim in 'pushBackHelper'.

    bsls::Types::Int64 avail = available(
                                        ATOMIC_OP::getInt64Acquire(&d_state));

    return static_cast<bsl::size_t>(
                            0 <= avail
                            ? ATOMIC_OP::getInt64Acquire(&d_capacity) - avail
                            : ATOMIC_OP::getInt64Acquire(&d_capacity));
}
template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
int SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                        waitUntilEmpty() const
{
    unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
    if (1 == (generation & 1)) {
        return e_DISABLED;                                            // RETURN
    }

    bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);

    bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state);
    while (ATOMIC_OP::getInt64Acquire(&d_capacity) != available(state)) {
        if (generation != ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) {
            return e_DISABLED;                                        // RETURN
        }
        d_emptyCondition.wait(&d_emptyMutex);
        state = ATOMIC_OP::getInt64Acquire(&d_state);
    }

    return 0;
}
                                  // Aspects

template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
bslma::Allocator *SingleConsumerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
                                                             allocator() const
{
    return d_allocator.allocator();
}
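// A minimal usage sketch (the concrete parameters shown here,
// 'bsls::AtomicOperations', 'bslmt::Mutex', and 'bslmt::Condition', are an
// assumption for illustration: they are the kind a non-template wrapper
// would supply, but nothing in this section mandates them):
//..
//  typedef bdlcc::SingleConsumerQueueImpl<int,
//                                         bsls::AtomicOperations,
//                                         bslmt::Mutex,
//                                         bslmt::Condition> Queue;
//
//  Queue queue(8);                    // capacity hint of 8 elements
//
//  queue.pushBack(42);                // any thread may push
//
//  int result;
//  int rc = queue.popFront(&result);  // only one consumer thread may pop
//  assert(0 == rc && 42 == result);
//..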