BDE 4.14.0 Production release
Loading...
Searching...
No Matches
bdlcc_singleproducerqueueimpl.h
Go to the documentation of this file.
1/// @file bdlcc_singleproducerqueueimpl.h
2///
3/// The content of this file has been pre-processed for Doxygen.
4///
5
6
7// bdlcc_singleproducerqueueimpl.h -*-C++-*-
8
9#ifndef INCLUDED_BDLCC_SINGLEPRODUCERQUEUEIMPL
10#define INCLUDED_BDLCC_SINGLEPRODUCERQUEUEIMPL
11
12#include <bsls_ident.h>
13BSLS_IDENT("$Id: $")
14
15/// @defgroup bdlcc_singleproducerqueueimpl bdlcc_singleproducerqueueimpl
16/// @brief Provide a testable thread-aware single producer queue of values.
17/// @addtogroup bdl
18/// @{
19/// @addtogroup bdlcc
20/// @{
21/// @addtogroup bdlcc_singleproducerqueueimpl
22/// @{
23///
24/// <h1> Outline </h1>
25/// * <a href="#bdlcc_singleproducerqueueimpl-purpose"> Purpose</a>
26/// * <a href="#bdlcc_singleproducerqueueimpl-classes"> Classes </a>
27/// * <a href="#bdlcc_singleproducerqueueimpl-description"> Description </a>
28/// * <a href="#bdlcc_singleproducerqueueimpl-exception-safety"> Exception safety </a>
29/// * <a href="#bdlcc_singleproducerqueueimpl-move-semantics-in-c-03"> Move Semantics in C++03 </a>
30/// * <a href="#bdlcc_singleproducerqueueimpl-memory-usage"> Memory Usage </a>
31/// * <a href="#bdlcc_singleproducerqueueimpl-usage"> Usage </a>
32///
33/// # Purpose {#bdlcc_singleproducerqueueimpl-purpose}
34/// Provide a testable thread-aware single producer queue of values.
35///
36/// # Classes {#bdlcc_singleproducerqueueimpl-classes}
37///
38/// - bdlcc::SingleProducerQueueImpl: thread-aware single producer `TYPE` queue
39///
40/// # Description {#bdlcc_singleproducerqueueimpl-description}
41/// This component defines a type,
42/// `bdlcc::SingleProducerQueueImpl`, that provides an efficient, thread-aware
43/// queue of values assuming a single producer (the use of `pushBack` and
44/// `tryPushBack` is done by one thread or a group of threads using external
45/// synchronization). The behavior of the methods `pushBack` and `tryPushBack`
46/// is undefined unless the use is by a single producer. This class is ideal
47/// for synchronization and communication between threads in a producer-consumer
48/// model when there is only one producer thread.
49///
50/// The queue provides `pushBack` and `popFront` methods for pushing data into
51/// the queue and popping data from the queue. The queue will allocate memory
52/// as necessary to accommodate `pushBack` invocations (`pushBack` will never
53/// block and is provided for consistency with other containers). When the
54/// queue is empty, the `popFront` methods block until data appears in the
55/// queue. Non-blocking methods `tryPushBack` and `tryPopFront` are also
56/// provided. The `tryPopFront` method fails immediately, returning a non-zero
57/// value, if the queue is empty.
58///
59/// The queue may be placed into a "enqueue disabled" state using the
60/// `disablePushBack` method. When disabled, `pushBack` and `tryPushBack` fail
61/// immediately and return an error code. The queue may be restored to normal
62/// operation with the `enablePushBack` method.
63///
64/// The queue may be placed into a "dequeue disabled" state using the
65/// `disablePopFront` method. When dequeue disabled, `popFront` and
66/// `tryPopFront` fail immediately and return an error code. Any threads
67/// blocked in `popFront` when the queue is dequeue disabled return from
68/// `popFront` immediately and return an error code.
69///
70/// ## Exception safety {#bdlcc_singleproducerqueueimpl-exception-safety}
71///
72///
73/// A `bdlcc::SingleProducerQueueImpl` is exception neutral, and all of the
74/// methods of `bdlcc::SingleProducerQueueImpl` provide the basic exception
75/// safety guarantee (see @ref bsldoc_glossary ).
76///
77/// ## Move Semantics in C++03 {#bdlcc_singleproducerqueueimpl-move-semantics-in-c-03}
78///
79///
80/// Move-only types are supported by `bdlcc::SingleProducerQueueImpl` on C++11
81/// platforms only (where `BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES` is defined),
82/// and are not supported on C++03 platforms. Unfortunately, in C++03, there
83/// are user types where a `bslmf::MovableRef` will not safely degrade to a
84/// lvalue reference when a move constructor is not available (types providing a
85/// constructor template taking any type), so `bslmf::MovableRefUtil::move`
86/// cannot be used directly on a user supplied template type. See internal bug
87/// report 99039150 for more information.
88///
89/// ## Memory Usage {#bdlcc_singleproducerqueueimpl-memory-usage}
90///
91///
92/// `bdlcc::SingleProducerQueueImpl` is most efficient when dealing with small
93/// objects or fundamental types (as a thread-safe container, its methods pass
94/// objects *by* *value*). We recommend large objects be stored as
95/// shared-pointers (or possibly raw pointers).
96///
97/// ## Usage {#bdlcc_singleproducerqueueimpl-usage}
98///
99///
100/// There is no usage example for this component since it is not meant for
101/// direct client use.
102/// @}
103/** @} */
104/** @} */
105
106/** @addtogroup bdl
107 * @{
108 */
109/** @addtogroup bdlcc
110 * @{
111 */
112/** @addtogroup bdlcc_singleproducerqueueimpl
113 * @{
114 */
115
116#include <bdlscm_version.h>
117
119
121#include <bslma_default.h>
123
124#include <bslmf_movableref.h>
126
127#include <bslmt_lockguard.h>
128#include <bslmt_threadutil.h>
129
130#include <bsls_assert.h>
131#include <bsls_objectbuffer.h>
132#include <bsls_types.h>
133
134#include <bsl_cstddef.h>
135
136
137namespace bdlcc {
138
139 // ==================================================
140 // class SingleProducerQueueImpl_ReleaseAllRawProctor
141 // ==================================================
142
143/// This class implements a proctor that, unless its `release` method has
144/// previously been invoked, automatically invokes `releaseAllRaw` on a
145/// `TYPE` upon destruction.
146///
147/// See @ref bdlcc_singleproducerqueueimpl
148template <class TYPE>
150
151 // DATA
152 TYPE *d_queue_p; // managed queue
153
154 // NOT IMPLEMENTED
160
161 public:
162 // CREATORS
163
164 /// Create a `removeAll` proctor that conditionally manages the
165 /// specified `queue` (if non-zero).
167
168 /// Destroy this object and, if `release` has not been invoked, invoke
169 /// the managed queue's `releaseAllRaw` method.
171
172 // MANIPULATORS
173
174 /// Release from management the queue currently managed by this proctor.
175 /// If no queue, this method has no effect.
176 void release();
177};
178
179 // ==============================================
180 // class SingleProducerQueueImpl_PopCompleteGuard
181 // ==============================================
182
183/// This class implements a guard automatically invokes `popComplete` on a
184/// `NODE` upon destruction.
185///
186/// See @ref bdlcc_singleproducerqueueimpl
187template <class TYPE, class NODE>
189
190 // DATA
191 TYPE *d_queue_p; // managed queue owning the managed node
192 NODE *d_node_p; // managed node
193 bool d_isEmpty; // if true, the empty condition will be signalled
194
195 // NOT IMPLEMENTED
201
202 public:
203 // CREATORS
204
205 /// Create a `popComplete` guard managing the specified `queue` and
206 /// `node` that will cause the empty condition to be signalled if the
207 /// specified `isEmpty` is `true`.
209 NODE *node,
210 bool isEmpty);
211
212 /// Destroy this object and invoke the `TYPE::popComplete` method with
213 /// the managed `node`.
215};
216
217 // =============================
218 // class SingleProducerQueueImpl
219 // =============================
220
221/// This class provides a thread-safe unbounded queue of values that assumes
222/// a single producer thread.
223///
224/// The types `ATOMIC_OP`, `MUTEX`, and `CONDITION` are exposed for testing.
225/// Typical usage is with `bsls::AtomicOperations` for `ATOMIC_OP`,
226/// `bslmt::Mutex` for `MUTEX`, and `bslmt::Condition` for `CONDITION`.
227///
228/// See @ref bdlcc_singleproducerqueueimpl
229template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
231
232 // PRIVATE CONSTANTS
233 enum {
234 // These value are used as values for `d_state` in `Node`. A node is
235 // writable at creation and after a read completes (when the single
236 // producer can write to the node). A node is readable after it is
237 // written (when the node can be read by a consumer). The states
238 // in-between these two states (e.g., writing) are not needed by this
239 // implementation of the queue.
240
241 e_READABLE, // node can be read
242 e_WRITABLE // node can be written
243 };
244
245 static const int k_POP_YIELD_SPIN = 10; // number of yield-spins to
246 // attempt before acquiring
247 // `d_readMutex`
248
249 // The following constants are used to maintain the queue's `d_state`
250 // value. See *Implementation* *Note* for details.
251
252 static const bsls::Types::Int64 k_BLOCKED_MASK = 0x0000000000ffffffLL;
253 static const bsls::Types::Int64 k_BLOCKED_INC = 0x0000000000000001LL;
254 static const bsls::Types::Int64 k_AVAILABLE_INC = 0x0000000001000000LL;
255 static const bsls::Types::Int64 k_AVAILABLE_MASK = 0xffffffffff000000LL;
256 static const int k_AVAILABLE_SHIFT = 24;
257
258 // PRIVATE TYPES
259 typedef typename ATOMIC_OP::AtomicTypes::Int AtomicInt;
260 typedef typename ATOMIC_OP::AtomicTypes::Uint AtomicUint;
261 typedef typename ATOMIC_OP::AtomicTypes::Int64 AtomicInt64;
262 typedef typename ATOMIC_OP::AtomicTypes::Pointer AtomicPointer;
263
264 template <class T>
265 struct QueueNode {
266 // PUBLIC DATA
267 bsls::ObjectBuffer<T> d_value; // stored value
268 AtomicInt d_state; // `e_READABLE` or `e_WRITABLE`
269 AtomicPointer d_next; // pointer to next node
270 };
271
272 typedef QueueNode<TYPE> Node;
273
274 // DATA
275 AtomicPointer d_nextWrite; // pointer to next write to node
276
277 AtomicPointer d_nextRead; // pointer to next read from node
278
279 MUTEX d_readMutex; // used with `d_readCondition` to
280 // block until an element is
281 // available for popping
282
283 CONDITION d_readCondition; // condition variable for popping
284 // threads
285
286 mutable MUTEX d_emptyMutex; // blocking point for producer
287 // during `waitUntilEmpty`
288
289 mutable CONDITION d_emptyCondition; // condition variable for producer
290 // during `waitUntilEmpty`
291
292 AtomicInt64 d_state; // bit pattern representing the
293 // state of the queue (see
294 // implementation notes)
295
296 AtomicUint d_popFrontDisabled; // is queue pop disabled and
297 // generation count; see
298 // *Implementation* *Note*
299
300 AtomicUint d_pushBackDisabled; // is queue push disabled and
301 // generation count; see
302 // *Implementation* *Note*
303
304 bslma::Allocator *d_allocator_p; // allocator, held not owned
305
306 // FRIENDS
309 ATOMIC_OP,
310 MUTEX,
311 CONDITION> >;
312
315 ATOMIC_OP,
316 MUTEX,
317 CONDITION>,
318 typename SingleProducerQueueImpl<TYPE,
319 ATOMIC_OP,
320 MUTEX,
321 CONDITION>::Node >;
322
323 // PRIVATE CLASS METHODS
324
325 /// Return `true` if the specified `state` implies all elements in the
326 /// associated queue will be used to complete currently active dequeue
327 /// operations.
328 static bool allElementsReserved(bsls::Types::Int64 state);
329
330 /// Return `true` if the specified `state` implies the associated queue
331 /// can supply an element to a thread blocked in a dequeue operation.
332 static bool canSupplyBlockedThread(bsls::Types::Int64 state);
333
334 /// Return `true` if the specified `state` implies the associated queue
335 /// can supply exactly one element to a thread blocked in a dequeue
336 /// operation.
337 static bool canSupplyOneBlockedThread(bsls::Types::Int64 state);
338
339 /// Return the available attribute from the specified `state`.
340 static bsls::Types::Int64 getAvailable(bsls::Types::Int64 state);
341
342 /// Return `true` if the specified `state` implies the associated queue
343 /// is empty, and `false` otherwise.
344 static bool isEmpty(bsls::Types::Int64 state);
345
346 /// Return `true` if the specified `state` implies the associated queue
347 /// will have one or more threads blocked in a dequeue operation.
348 static bool willHaveBlockedThread(bsls::Types::Int64 state);
349
350 // PRIVATE MANIPULATORS
351
352 /// If the specified `value` does not have its lowest-order bit set to
353 /// the value of the specified `bitValue`, increment `value` until it
354 /// does. Note that this method is used to modify the generation counts
355 /// stored in `d_popFrontDisabled` and `d_pushBackDisabled`. See
356 /// *Implementation* *Note* for further details.
357 void incrementUntil(AtomicUint *value, unsigned int bitValue);
358
359 /// Destruct the value stored in the specified `node`, mark the `node`
360 /// writable, and if the specified `isEmpty` is `true` then signal the
361 /// queue empty condition. This method is used within `popFrontRaw` by
362 /// a guard to complete the reclamation of a node in the presence of an
363 /// exception.
364 void popComplete(Node *node, bool isEmpty);
365
366 /// Remove the element, without verifying the availability of the
367 /// element, from the front of this queue, load that element into the
368 /// specified `value`, and if the specified `isEmpty` is `true` then
369 /// signal the queue empty condition.
370 void popFrontRaw(TYPE* value, bool isEmpty);
371
372 /// Return all memory to the allocator. This method is intended to be
373 /// used by the destructor and to avoid a memory leak when there is an
374 /// exception during construction.
375 void releaseAllRaw();
376
377 // NOT IMPLEMENTED
378 SingleProducerQueueImpl(const SingleProducerQueueImpl&);
379 SingleProducerQueueImpl& operator=(const SingleProducerQueueImpl&);
380
381 public:
382 // TRAITS
383 BSLMF_NESTED_TRAIT_DECLARATION(SingleProducerQueueImpl,
384 bslma::UsesBslmaAllocator);
385
386 // PUBLIC TYPES
387 typedef TYPE value_type; // The type for elements.
388
389 // PUBLIC CONSTANTS
390 enum {
391 e_SUCCESS = 0, // must be 0
392 e_EMPTY = -1,
393 e_DISABLED = -2
394 };
395
396 // CREATORS
397
398 /// Create a thread-aware queue. Optionally specify a `basicAllocator`
399 /// used to supply memory. If `basicAllocator` is 0, the currently
400 /// installed default allocator is used.
401 explicit SingleProducerQueueImpl(bslma::Allocator *basicAllocator = 0);
402
403 /// Create a thread-aware queue with, at least, the specified
404 /// `capacity`. Optionally specify a `basicAllocator` used to supply
405 /// memory. If `basicAllocator` is 0, the currently installed default
406 /// allocator is used.
407 SingleProducerQueueImpl(bsl::size_t capacity,
408 bslma::Allocator *basicAllocator = 0);
409
410 /// Destroy this object.
412
413 // MANIPULATORS
414
415 /// Remove the element from the front of this queue and load that
416 /// element into the specified `value`. If the queue is empty, block
417 /// until it is not empty. Return 0 on success, and a non-zero value
418 /// otherwise. Specifically, return `e_DISABLED` if
419 /// `isPopFrontDisabled()`. On failure, `value` is not changed.
420 /// Threads blocked due to the queue being empty will return
421 /// `e_DISABLED` if `disablePopFront` is invoked.
422 int popFront(TYPE *value);
423
424 /// Append the specified `value` to the back of this queue. Return 0 on
425 /// success, and a non-zero value otherwise. Specifically, return
426 /// `e_DISABLED` if `isPushBackDisabled()`. The behavior is undefined
427 /// unless the invoker of this method is the single producer.
428 int pushBack(const TYPE& value);
429
430 /// Append the specified move-insertable `value` to the back of this
431 /// queue. `value` is left in a valid but unspecified state. Return 0
432 /// on success, and a non-zero value otherwise. Specifically, return
433 /// `e_DISABLED` if `isPushBackDisabled()`. On failure, `value` is not
434 /// changed. The behavior is undefined unless the invoker of this
435 /// method is the single producer.
437
438 /// Remove all items currently in this queue. Note that this operation
439 /// is not atomic; if other threads are concurrently pushing items into
440 /// the queue the result of `numElements()` after this function returns
441 /// is not guaranteed to be 0.
442 void removeAll();
443
444 /// Attempt to remove the element from the front of this queue without
445 /// blocking, and, if successful, load the specified `value` with the
446 /// removed element. Return 0 on success, and a non-zero value
447 /// otherwise. Specifically, return `e_DISABLED` if
448 /// `isPopFrontDisabled()`, and `e_EMPTY` if `!isPopFrontDisabled()` and
449 /// the queue was empty. On failure, `value` is not changed.
450 int tryPopFront(TYPE *value);
451
452 /// Append the specified `value` to the back of this queue. Return 0 on
453 /// success, and a non-zero value otherwise. Specifically, return
454 /// `e_DISABLED` if `isPushBackDisabled()`. The behavior is undefined
455 /// unless the invoker of this method is the single producer.
456 int tryPushBack(const TYPE& value);
457
458 /// Append the specified move-insertable `value` to the back of this
459 /// queue. `value` is left in a valid but unspecified state. Return 0
460 /// on success, and a non-zero value otherwise. Specifically, return
461 /// `e_DISABLED" if `isPushBackDisabled()`. On failure, `value' is not
462 /// changed. The behavior is undefined unless the invoker of this
463 /// method is the single producer.
465
466 // Enqueue/Dequeue State
467
468 /// Disable dequeueing from this queue. All subsequent invocations of
469 /// `popFront` or `tryPopFront` will fail immediately. All blocked
470 /// invocations of `popFront` and `waitUntilEmpty` will fail
471 /// immediately. If the queue is already dequeue disabled, this method
472 /// has no effect.
474
475 /// Disable enqueueing into this queue. All subsequent invocations of
476 /// `pushBack` or `tryPushBack` will fail immediately. All blocked
477 /// invocations of `pushBack` will fail immediately. If the queue is
478 /// already enqueue disabled, this method has no effect.
480
481 /// Enable dequeueing. If the queue is not dequeue disabled, this call
482 /// has no effect.
484
485 /// Enable queuing. If the queue is not enqueue disabled, this call has
486 /// no effect.
488
489 // ACCESSORS
490
491 /// Return `true` if this queue is empty (has no elements), or `false`
492 /// otherwise.
493 bool isEmpty() const;
494
495 /// Return `true` if this queue is full (has no available capacity), or
496 /// `false` otherwise. Note that for unbounded queues, this method
497 /// always returns `false`.
498 bool isFull() const;
499
500 /// Return `true` if this queue is dequeue disabled, and `false`
501 /// otherwise. Note that the queue is created in the "dequeue enabled"
502 /// state.
503 bool isPopFrontDisabled() const;
504
505 /// Return `true` if this queue is enqueue disabled, and `false`
506 /// otherwise. Note that the queue is created in the "enqueue enabled"
507 /// state.
508 bool isPushBackDisabled() const;
509
510 /// Returns the number of elements currently in this queue.
511 bsl::size_t numElements() const;
512
513 /// Block until all the elements in this queue are removed. Return 0 on
514 /// success, and a non-zero value otherwise. Specifically, return
515 /// `e_DISABLED` if `!isEmpty() && isPopFrontDisabled()`. A blocked
516 /// thread waiting for the queue to empty will return `e_DISABLED` if
517 /// `disablePopFront` is invoked.
518 int waitUntilEmpty() const;
519
520 // Aspects
521
522 /// Return the allocator used by this object to supply memory.
524};
525
526// ============================================================================
527// INLINE DEFINITIONS
528// ============================================================================
529
530 // --------------------------------------------------
531 // class SingleProducerQueueImpl_ReleaseAllRawProctor
532 // --------------------------------------------------
533
534// CREATORS
535template <class TYPE>
541
542template <class TYPE>
545{
546 if (d_queue_p) {
547 d_queue_p->releaseAllRaw();
548 }
549}
550
551// MANIPULATORS
552template <class TYPE>
557
558 // ----------------------------------------------
559 // class SingleProducerQueueImpl_PopCompleteGuard
560 // ----------------------------------------------
561
562// CREATORS
563template <class TYPE, class NODE>
566 NODE *node,
567 bool isEmpty)
568: d_queue_p(queue)
569, d_node_p(node)
570, d_isEmpty(isEmpty)
571{
572}
573
574template <class TYPE, class NODE>
580
581 // -----------------------------
582 // class SingleProducerQueueImpl
583 // -----------------------------
584
585// PRIVATE CLASS METHODS
586template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
587inline
590{
591 return (state >> k_AVAILABLE_SHIFT) <= (state & k_BLOCKED_MASK);
592}
593
594template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
595inline
596bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
597 canSupplyBlockedThread(bsls::Types::Int64 state)
598{
599 return k_AVAILABLE_INC <= state && (state & k_BLOCKED_MASK);
600}
601
602template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
603inline
604bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
605 canSupplyOneBlockedThread(bsls::Types::Int64 state)
606{
607 return k_AVAILABLE_INC == (state & k_AVAILABLE_MASK)
608 && (state & k_BLOCKED_MASK);
609}
610
611template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
612inline
613bsls::Types::Int64 SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
614 getAvailable(bsls::Types::Int64 state)
615{
616 return state >> k_AVAILABLE_SHIFT;
617}
618
619template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
620inline
623{
624 return k_AVAILABLE_INC > state;
625}
626
627template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
628inline
629bool SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
630 willHaveBlockedThread(bsls::Types::Int64 state)
631{
632 return (state >> k_AVAILABLE_SHIFT) < (state & k_BLOCKED_MASK);
633}
634
635// PRIVATE MANIPULATORS
636template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
637void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>
638 ::incrementUntil(AtomicUint *value, unsigned int bitValue)
639{
640 unsigned int state = ATOMIC_OP::getUintAcquire(value);
641 if (bitValue != (state & 1)) {
642 unsigned int expState;
643 do {
644 expState = state;
645 state = ATOMIC_OP::testAndSwapUintAcqRel(value,
646 state,
647 state + 1);
648 } while (state != expState && (bitValue == (state & 1)));
649 }
650}
651
652template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
653void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
654 popComplete(Node *node, bool isEmpty)
655{
656 node->d_value.object().~TYPE();
657
658 ATOMIC_OP::setIntRelease(&node->d_state, e_WRITABLE);
659
660 if (isEmpty) {
661 {
662 bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);
663 }
664 d_emptyCondition.broadcast();
665 }
666
667}
668
669template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
670void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
671 popFrontRaw(TYPE *value,
672 bool isEmpty)
673{
674 Node *readFrom =
675 static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&d_nextRead));
676
677 Node *exp;
678 do {
679 Node *next =
680 static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&readFrom->d_next));
681
682 exp = readFrom;
683 readFrom = static_cast<Node *>(ATOMIC_OP::testAndSwapPtrAcqRel(
684 &d_nextRead,
685 readFrom,
686 next));
687 } while (readFrom != exp);
688
689 SingleProducerQueueImpl_PopCompleteGuard<
690 SingleProducerQueueImpl <TYPE,
691 ATOMIC_OP,
692 MUTEX,
693 CONDITION>,
694 Node> guard(this, readFrom, isEmpty);
695
696#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
697 *value = bslmf::MovableRefUtil::move(readFrom->d_value.object());
698#else
699 *value = readFrom->d_value.object();
700#endif
701}
702
703template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
704void SingleProducerQueueImpl<TYPE, ATOMIC_OP, MUTEX, CONDITION>::
705 releaseAllRaw()
706{
707 Node *end = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&d_nextWrite));
708
709 if (end) {
710 Node *at = static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&end->d_next));
711
712 while (at != end) {
713 Node *next =
714 static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&at->d_next));
715
716 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&at->d_state)) {
717 at->d_value.object().~TYPE();
718 }
719
720 d_allocator_p->deallocate(at);
721
722 at = next;
723 }
724
725 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&at->d_state)) {
726 at->d_value.object().~TYPE();
727 }
728
729 d_allocator_p->deallocate(at);
730 }
731}
732
733// CREATORS
734template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
737: d_readMutex()
738, d_readCondition()
739, d_emptyMutex()
740, d_emptyCondition()
741, d_allocator_p(bslma::Default::allocator(basicAllocator))
742{
743 ATOMIC_OP::initInt64(&d_state, 0); // there are no available elements, the
744 // enable/disable generation is
745 // initialized to zero, and there are
746 // no threads blocked in 'popFront'
747
748 ATOMIC_OP::initUint(&d_popFrontDisabled, 0);
749 ATOMIC_OP::initUint(&d_pushBackDisabled, 0);
750
751 ATOMIC_OP::initPointer(&d_nextWrite, 0);
752
754 TYPE,
755 ATOMIC_OP,
756 MUTEX,
757 CONDITION> > proctor(this);
758
759 Node *n1 = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node)));
760 ATOMIC_OP::initInt(&n1->d_state, e_WRITABLE);
761 ATOMIC_OP::initPointer(&n1->d_next, n1);
762
763 ATOMIC_OP::setPtrRelease(&d_nextWrite, n1);
764
765 ATOMIC_OP::initPointer(&d_nextRead, n1);
766
767 Node *n2 = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node)));
768 ATOMIC_OP::initInt(&n2->d_state, e_WRITABLE);
769 ATOMIC_OP::initPointer(&n2->d_next, n1);
770 ATOMIC_OP::setPtrRelease(&n1->d_next, n2);
771
772 proctor.release();
773}
774
775template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
777 SingleProducerQueueImpl(bsl::size_t capacity,
778 bslma::Allocator *basicAllocator)
779: d_readMutex()
780, d_readCondition()
781, d_emptyMutex()
782, d_emptyCondition()
783, d_allocator_p(bslma::Default::allocator(basicAllocator))
784{
785 ATOMIC_OP::initInt64(&d_state, 0); // there are no available elements, the
786 // enable/disable generation is
787 // initialized to zero, and there are
788 // no threads blocked in 'popFront'
789
790 ATOMIC_OP::initUint(&d_popFrontDisabled, 0);
791 ATOMIC_OP::initUint(&d_pushBackDisabled, 0);
792
793 ATOMIC_OP::initPointer(&d_nextWrite, 0);
794
796 TYPE,
797 ATOMIC_OP,
798 MUTEX,
799 CONDITION> > proctor(this);
800
801 Node *n1 = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node)));
802 ATOMIC_OP::initInt(&n1->d_state, e_WRITABLE);
803 ATOMIC_OP::initPointer(&n1->d_next, n1);
804
805 ATOMIC_OP::setPtrRelease(&d_nextWrite, n1);
806
807 ATOMIC_OP::initPointer(&d_nextRead, n1);
808
809 Node *n2 = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node)));
810 ATOMIC_OP::initInt(&n2->d_state, e_WRITABLE);
811 ATOMIC_OP::initPointer(&n2->d_next, n1);
812 ATOMIC_OP::setPtrRelease(&n1->d_next, n2);
813
814 capacity = (2 <= capacity ? capacity : 2);
815
816 for (bsl::size_t i = 2; i < capacity; ++i) {
817 Node *n = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node)));
818 ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
819 ATOMIC_OP::initPointer(&n->d_next,
820 ATOMIC_OP::getPtrAcquire(&n2->d_next));
821
822 ATOMIC_OP::setPtrRelease(&n2->d_next, n);
823 }
824
825 proctor.release();
826}
827
828template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
834
835// MANIPULATORS
836template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
838 TYPE *value)
839{
840 unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
841 if (1 == (generation & 1)) {
842 return e_DISABLED; // RETURN
843 }
844
845 bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state,
846 -k_AVAILABLE_INC);
847
848 if (willHaveBlockedThread(state)) {
850 state = ATOMIC_OP::getInt64Acquire(&d_state);
851 if (willHaveBlockedThread(state)) {
852 {
853 bslmt::LockGuard<MUTEX> guard(&d_readMutex);
854
855 state = ATOMIC_OP::addInt64NvAcqRel(
856 &d_state,
857 k_AVAILABLE_INC + k_BLOCKED_INC);
858
859 while (isEmpty(state)) {
860 if (generation !=
861 ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) {
862 ATOMIC_OP::addInt64AcqRel(&d_state, -k_BLOCKED_INC);
863 return e_DISABLED; // RETURN
864 }
865 d_readCondition.wait(&d_readMutex);
866 state = ATOMIC_OP::getInt64Acquire(&d_state);
867 }
868
869 state = ATOMIC_OP::addInt64NvAcqRel(
870 &d_state,
871 -(k_AVAILABLE_INC + k_BLOCKED_INC));
872 }
873 if (canSupplyBlockedThread(state)) {
874 d_readCondition.signal();
875 }
876 }
877 }
878
879 popFrontRaw(value, isEmpty(state));
880
881 return 0;
882}
883
884template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
886 const TYPE& value)
887{
888 if (1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1)) {
889 return e_DISABLED; // RETURN
890 }
891
892 Node *nextWrite = static_cast<Node *>(
893 ATOMIC_OP::getPtrAcquire(&d_nextWrite));
894
895 Node *next = static_cast<Node *>(
896 ATOMIC_OP::getPtrAcquire(&nextWrite->d_next));
897
898 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&next->d_state)) {
899 Node *n = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node)));
900
901 ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
902 ATOMIC_OP::initPointer(&n->d_next, next);
903
904 ATOMIC_OP::setPtrRelease(&nextWrite->d_next, n);
905
906 next = n;
907 }
908
909 bslalg::ScalarPrimitives::copyConstruct(nextWrite->d_value.address(),
910 value,
911 d_allocator_p);
912
913 ATOMIC_OP::setIntRelease(&nextWrite->d_state, e_READABLE);
914 ATOMIC_OP::setPtrRelease(&d_nextWrite, next);
915
916 bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state,
917 k_AVAILABLE_INC);
918
919 if (canSupplyOneBlockedThread(state)) {
920 {
921 bslmt::LockGuard<MUTEX> guard(&d_readMutex);
922 }
923 d_readCondition.signal();
924 }
925
926 return 0;
927}
928
929template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
932{
933 if (1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1)) {
934 return e_DISABLED; // RETURN
935 }
936
937 Node *nextWrite = static_cast<Node *>(
938 ATOMIC_OP::getPtrAcquire(&d_nextWrite));
939
940 Node *next = static_cast<Node *>(
941 ATOMIC_OP::getPtrAcquire(&nextWrite->d_next));
942
943 if (e_WRITABLE != ATOMIC_OP::getIntAcquire(&next->d_state)) {
944 Node *n = static_cast<Node *>(d_allocator_p->allocate(sizeof(Node)));
945
946 ATOMIC_OP::initInt(&n->d_state, e_WRITABLE);
947 ATOMIC_OP::initPointer(&n->d_next, next);
948
949 ATOMIC_OP::setPtrRelease(&nextWrite->d_next, n);
950
951 next = n;
952 }
953
954 TYPE& dummy = value;
955 bslalg::ScalarPrimitives::moveConstruct(nextWrite->d_value.address(),
956 dummy,
957 d_allocator_p);
958
959 ATOMIC_OP::setIntRelease(&nextWrite->d_state, e_READABLE);
960 ATOMIC_OP::setPtrRelease(&d_nextWrite, next);
961
962 bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state,
963 k_AVAILABLE_INC);
964
965 if (canSupplyOneBlockedThread(state)) {
966 {
967 bslmt::LockGuard<MUTEX> guard(&d_readMutex);
968 }
969 d_readCondition.signal();
970 }
971
972 return 0;
973}
974
975template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
977 TYPE *value)
978{
979 unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
980 if (1 == (generation & 1)) {
981 return e_DISABLED; // RETURN
982 }
983
984 // optimistically attempt to acquire resource (representing an element)
985
986 bsls::Types::Int64 state = ATOMIC_OP::addInt64NvAcqRel(&d_state,
987 -k_AVAILABLE_INC);
988
989 while (willHaveBlockedThread(state)) {
990
991 // failed to acquire resource, must revert the change to 'd_state' or
992 // acquire the resource (due to actions of other threads)
993
994 const bsls::Types::Int64 expState = state;
995
996 state = ATOMIC_OP::testAndSwapInt64AcqRel(&d_state,
997 state,
998 state + k_AVAILABLE_INC);
999 if (expState == state) {
1000 // reverted the change to 'd_state'
1001
1002 state += k_AVAILABLE_INC;
1003 if (canSupplyBlockedThread(state)) {
1004 {
1005 bslmt::LockGuard<MUTEX> guard(&d_readMutex);
1006 }
1007 d_readCondition.signal();
1008 }
1009 return e_EMPTY; // RETURN
1010 }
1011 }
1012
1013 popFrontRaw(value, isEmpty(state));
1014
1015 return 0;
1016}
1017
1018template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1020 const TYPE& value)
1021{
1022 return pushBack(value);
1023}
1024
1025template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1031
1032template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1034{
1035 bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state);
1036 bsls::Types::Int64 expState;
1037
1038 do {
1039 if (allElementsReserved(state)) {
1040 return; // RETURN
1041 }
1042 expState = state;
1043 state = ATOMIC_OP::testAndSwapInt64AcqRel(
1044 &d_state,
1045 state,
1046 (state & ~k_AVAILABLE_MASK)
1047 | ( (state & k_BLOCKED_MASK)
1048 << k_AVAILABLE_SHIFT));
1049 } while (state != expState);
1050
1051 state = (state >> k_AVAILABLE_SHIFT) - (state & k_BLOCKED_MASK);
1052
1053 for (bsls::Types::Int64 i = 0; i < state; ++i) {
1054 Node *readFrom =
1055 static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&d_nextRead));
1056 Node *exp;
1057 do {
1058 Node *next =
1059 static_cast<Node *>(ATOMIC_OP::getPtrAcquire(&readFrom->d_next));
1060
1061 exp = readFrom;
1062 readFrom = static_cast<Node *>(ATOMIC_OP::testAndSwapPtrAcqRel(
1063 &d_nextRead,
1064 readFrom,
1065 next));
1066 } while (readFrom != exp);
1067
1068 readFrom->d_value.object().~TYPE();
1069
1070 ATOMIC_OP::setIntRelease(&readFrom->d_state, e_WRITABLE);
1071 }
1072
1073 {
1074 bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);
1075 }
1076 d_emptyCondition.broadcast();
1077}
1078
1079 // Enqueue/Dequeue State
1080
1081template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1084{
1085 incrementUntil(&d_popFrontDisabled, 1);
1086
1087 {
1088 bslmt::LockGuard<MUTEX> guard(&d_readMutex);
1089 }
1090 d_readCondition.broadcast();
1091
1092 {
1093 bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);
1094 }
1095 d_emptyCondition.broadcast();
1096}
1097
1098template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1101{
1102 incrementUntil(&d_pushBackDisabled, 1);
1103}
1104
1105template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1108{
1109 incrementUntil(&d_popFrontDisabled, 0);
1110}
1111
1112template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1115{
1116 incrementUntil(&d_pushBackDisabled, 0);
1117}
1118
1119// ACCESSORS
1120template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1122 isEmpty() const
1123{
1124 bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state);
1125 return isEmpty(state);
1126}
1127
1128template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1133
1134template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1137{
1138 return 1 == (ATOMIC_OP::getUintAcquire(&d_popFrontDisabled) & 1);
1139}
1140
1141template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1144{
1145 return 1 == (ATOMIC_OP::getUintAcquire(&d_pushBackDisabled) & 1);
1146}
1147
1148template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1150 numElements() const
1151{
1152 bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state);
1153 bsls::Types::Int64 avail = getAvailable(state);
1154
1155 return avail >= 0 ? static_cast<bsl::size_t>(avail) : 0;
1156}
1157
1158template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1160 waitUntilEmpty() const
1161{
1162 unsigned int generation = ATOMIC_OP::getUintAcquire(&d_popFrontDisabled);
1163 if (1 == (generation & 1)) {
1164 return e_DISABLED; // RETURN
1165 }
1166
1167 bslmt::LockGuard<MUTEX> guard(&d_emptyMutex);
1168
1169 bsls::Types::Int64 state = ATOMIC_OP::getInt64Acquire(&d_state);
1170 while (!isEmpty(state)) {
1171 if (generation != ATOMIC_OP::getUintAcquire(&d_popFrontDisabled)) {
1172 return e_DISABLED; // RETURN
1173 }
1174 d_emptyCondition.wait(&d_emptyMutex);
1175 state = ATOMIC_OP::getInt64Acquire(&d_state);
1176 }
1177
1178 return 0;
1179}
1180
1181 // Aspects
1182
1183template <class TYPE, class ATOMIC_OP, class MUTEX, class CONDITION>
1189
1190} // close package namespace
1191
1192
1193#endif
1194
1195// ----------------------------------------------------------------------------
1196// Copyright 2019 Bloomberg Finance L.P.
1197//
1198// Licensed under the Apache License, Version 2.0 (the "License");
1199// you may not use this file except in compliance with the License.
1200// You may obtain a copy of the License at
1201//
1202// http://www.apache.org/licenses/LICENSE-2.0
1203//
1204// Unless required by applicable law or agreed to in writing, software
1205// distributed under the License is distributed on an "AS IS" BASIS,
1206// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1207// See the License for the specific language governing permissions and
1208// limitations under the License.
1209// ----------------------------- END-OF-FILE ----------------------------------
1210
1211/** @} */
1212/** @} */
1213/** @} */
#define BSLMF_NESTED_TRAIT_DECLARATION(t_TYPE, t_TRAIT)
Definition bslmf_nestedtraitdeclaration.h:231
Definition bdlcc_singleproducerqueueimpl.h:188
~SingleProducerQueueImpl_PopCompleteGuard()
Definition bdlcc_singleproducerqueueimpl.h:576
Definition bdlcc_singleproducerqueueimpl.h:149
~SingleProducerQueueImpl_ReleaseAllRawProctor()
Definition bdlcc_singleproducerqueueimpl.h:544
void release()
Definition bdlcc_singleproducerqueueimpl.h:553
Definition bdlcc_singleproducerqueueimpl.h:230
void disablePopFront()
Definition bdlcc_singleproducerqueueimpl.h:1083
bool isFull() const
Definition bdlcc_singleproducerqueueimpl.h:1129
SingleProducerQueueImpl(bslma::Allocator *basicAllocator=0)
Definition bdlcc_singleproducerqueueimpl.h:736
void enablePushBack()
Definition bdlcc_singleproducerqueueimpl.h:1114
SingleProducerQueueImpl(bsl::size_t capacity, bslma::Allocator *basicAllocator=0)
Definition bdlcc_singleproducerqueueimpl.h:777
bool isEmpty() const
Definition bdlcc_singleproducerqueueimpl.h:1122
int waitUntilEmpty() const
Definition bdlcc_singleproducerqueueimpl.h:1160
void disablePushBack()
Definition bdlcc_singleproducerqueueimpl.h:1100
int tryPushBack(bslmf::MovableRef< TYPE > value)
Definition bdlcc_singleproducerqueueimpl.h:1026
void enablePopFront()
Definition bdlcc_singleproducerqueueimpl.h:1107
int popFront(TYPE *value)
Definition bdlcc_singleproducerqueueimpl.h:837
int tryPopFront(TYPE *value)
Definition bdlcc_singleproducerqueueimpl.h:976
int tryPushBack(const TYPE &value)
Definition bdlcc_singleproducerqueueimpl.h:1019
TYPE value_type
Definition bdlcc_singleproducerqueueimpl.h:387
bslma::Allocator * allocator() const
Return the allocator used by this object to supply memory.
Definition bdlcc_singleproducerqueueimpl.h:1185
int pushBack(const TYPE &value)
Definition bdlcc_singleproducerqueueimpl.h:885
bsl::size_t numElements() const
Returns the number of elements currently in this queue.
Definition bdlcc_singleproducerqueueimpl.h:1150
bool isPopFrontDisabled() const
Definition bdlcc_singleproducerqueueimpl.h:1136
int pushBack(bslmf::MovableRef< TYPE > value)
Definition bdlcc_singleproducerqueueimpl.h:930
void removeAll()
Definition bdlcc_singleproducerqueueimpl.h:1033
~SingleProducerQueueImpl()
Destroy this object.
Definition bdlcc_singleproducerqueueimpl.h:830
bool isPushBackDisabled() const
Definition bdlcc_singleproducerqueueimpl.h:1143
Definition bslma_allocator.h:457
virtual void * allocate(size_type size)=0
Definition bslmf_movableref.h:751
Definition bslmt_lockguard.h:234
#define BSLS_IDENT(str)
Definition bsls_ident.h:195
Definition bdlcc_boundedqueue.h:270
T::iterator end(T &container)
Definition bslstl_iterator.h:1523
Definition balxml_encoderoptions.h:68
static void moveConstruct(TARGET_TYPE *address, TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1642
static void copyConstruct(TARGET_TYPE *address, const TARGET_TYPE &original, bslma::Allocator *allocator)
Definition bslalg_scalarprimitives.h:1599
static MovableRef< t_TYPE > move(t_TYPE &reference) BSLS_KEYWORD_NOEXCEPT
Definition bslmf_movableref.h:1060
static void yield()
Definition bslmt_threadutil.h:1013
long long Int64
Definition bsls_types.h:132
Definition bsls_objectbuffer.h:276