// bdlcc_boundedqueue.h                                               -*-C++-*-

#ifndef INCLUDED_BDLCC_BOUNDEDQUEUE
#define INCLUDED_BDLCC_BOUNDEDQUEUE

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a thread-aware bounded queue of values.
//
//@CLASSES:
//  bdlcc::BoundedQueue: thread-aware bounded queue of 'TYPE'
//
//@SEE_ALSO: bdlcc_fixedqueue
//
//@DESCRIPTION: This component defines a type, 'bdlcc::BoundedQueue', that
// provides an efficient, thread-aware bounded (capacity fixed at construction)
// queue of values.  This class is ideal for synchronization and communication
// between threads in a producer-consumer model when a bounded queue is
// appropriate.   Under most cicrumstances developers should prefer
// this component to the older {bdlcc_fixedqueue} (see {Comparison to
// FixedQueue}).
//
// The queue provides 'pushBack' and 'popFront' methods for pushing data into
// the queue and popping data from the queue.  When the queue is full, the
// 'pushBack' methods block until data is removed from the queue.  When the
// queue is empty, the 'popFront' methods block until data appears in the
// queue.  Non-blocking methods 'tryPushBack' and 'tryPopFront' are also
// provided.  The 'tryPushBack' method fails immediately, returning a non-zero
// value, if the queue is full.  The 'tryPopFront' method fails immediately,
// returning a non-zero value, if the queue is empty.
//
// The queue may be placed into a "enqueue disabled" state using the
// 'disablePushBack' method.  When disabled, 'pushBack' and 'tryPushBack' fail
// immediately and return an error code.  Any threads blocked in 'pushBack'
// when the queue is enqueue disabled return from 'pushBack' immediately and
// return an error code.  The queue may be restored to normal operation with
// the 'enablePushBack' method.
//
// The queue may be placed into a "dequeue disabled" state using the
// 'disablePopFront' method.  When dequeue disabled, 'popFront' and
// 'tryPopFront' fail immediately and return an error code.  Any threads
// blocked in 'popFront' when the queue is dequeue disabled return from
// 'popFront' immediately and return an error code.  The queue may be restored
// to normal operation with the 'enablePopFront' method.
//
///Comparison To FixedQueue
///------------------------
// Both 'bdlcc::FixedQueue' and 'bdlcc::BoundedQueue' provide thread-aware
// bounded queues.  Under most circumstances developers should prefer
// {bdlcc_boundedqueue}: it is newer, has additional features, and provides
// better performance under most circumstances.  'bdlcc::BoundedQueue' is not
// quite a drop in replacement for 'bdlcc::FixedQueue' so both types are
// currently maintained.  There is additional information about
// performance of various queues in the article Concurrent Queue Evaluation
// (https://tinyurl.com/mr2un9f7).
//
///Template Requirements
///---------------------
// 'bdlcc::BoundedQueue' is a template that is parameterized on the type of
// element contained within the queue.  The supplied template argument, 'TYPE',
// must provide both a default constructor and a copy constructor, as well as
// an assignment operator.  If the default constructor accepts a
// 'bslma::Allocator *', 'TYPE' must declare the uses 'bslma::Allocator' trait
// (see 'bslma_usesbslmaallocator') so that the allocator of the queue is
// propagated to the elements contained in the queue.
//
///Exception Safety
///----------------
// A 'bdlcc::BoundedQueue' is exception neutral, and all of the methods of
// 'bdlcc::BoundedQueue' provide the basic exception safety guarantee (see
// 'bsldoc_glossary').  If an exception occurs while writing to an element, the
// element is marked unusable until after a read attempt from the element (at
// which point the element is "reclaimed").  This failure to write does not
// increment the result returned by 'numElements'.  Hence,
// 'numElements() == capacity()' is not a valid replacement for 'isFull()'.
//
///Move Semantics in C++03
///-----------------------
// Move-only types are supported by 'bdlcc::BoundedQueue' on C++11 platforms
// only (where 'BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES' is defined), and are
// not supported on C++03 platforms.  Unfortunately, in C++03, there are user
// types where a 'bslmf::MovableRef' will not safely degrade to a lvalue
// reference when a move constructor is not available (types providing a
// constructor template taking any type), so 'bslmf::MovableRefUtil::move'
// cannot be used directly on a user supplied template type.  See internal bug
// report 99039150 for more information.
//
///Usage
///-----
// This section illustrates intended use of this component.
//
///Example 1: A Simple Thread Pool
///- - - - - - - - - - - - - - - -
// In the following example a 'bdlcc::BoundedQueue' is used to communicate
// between a single "producer" thread and multiple "consumer" threads.  The
// "producer" will push work requests onto the queue, and each "consumer" will
// iteratively take a work request from the queue and service the request.
// This example shows a partial, simplified implementation of the
// 'bdlmt::FixedThreadPool' class.  See component 'bdlmt_fixedthreadpool' for
// more information.
//
// First, we define a utility classes that handles a simple "work item":
//..
//  struct my_WorkData {
//      // Work data...
//  };
//
//  struct my_WorkRequest {
//      enum RequestType {
//          e_WORK = 1,
//          e_STOP = 2
//      };
//
//      RequestType d_type;
//      my_WorkData d_data;
//      // Work data...
//  };
//..
// Next, we provide a simple function to service an individual work item.  The
// details are unimportant for this example:
//..
//  void myDoWork(const my_WorkData& data)
//      // Do some work based upon the specified 'data'.
//  {
//      // do some stuff...
//      (void)data;
//  }
//..
// Then, we define a 'myConsumer' function that will pop elements off the queue
// and process them.  Note that the call to 'queue->popFront()' will block
// until there is an element available on the queue.  This function will be
// executed in multiple threads, so that each thread waits in
// 'queue->popFront()', and 'bdlcc::BoundedQueue' guarantees that each thread
// gets a unique element from the queue:
//..
//  void myConsumer(bdlcc::BoundedQueue<my_WorkRequest> *queue)
//      // Pop elements from the specified 'queue'.
//  {
//      while (1) {
//          // 'popFront()' will wait for a 'my_WorkRequest' until available.
//
//          my_WorkRequest item;
//          item.d_type = my_WorkRequest::e_WORK;
//
//          assert(0 == queue->popFront(&item));
//
//          if (item.d_type == my_WorkRequest::e_STOP) { break; }
//          myDoWork(item.d_data);
//      }
//  }
//..
// Finally, we define a 'myProducer' function that serves multiple roles: it
// creates the 'bdlcc::BoundedQueue', starts the consumer threads, and then
// produces and enqueues work items.  When work requests are exhausted, this
// function enqueues one 'e_STOP' item for each consumer queue.  This 'e_STOP'
// item indicates to the consumer thread to terminate its thread-handling
// function.
//
// Note that, although the producer cannot control which thread 'pop's a
// particular work item, it can rely on the knowledge that each consumer thread
// will read a single 'e_STOP' item and then terminate.
//..
//  void myProducer(int numThreads)
//      // Create a queue, start the specified 'numThreads' consumer threads,
//      // produce and enqueue work.
//  {
//      enum {
//          k_MAX_QUEUE_LENGTH = 100,
//          k_NUM_WORK_ITEMS   = 1000
//      };
//
//      bdlcc::BoundedQueue<my_WorkRequest> queue(k_MAX_QUEUE_LENGTH);
//
//      bslmt::ThreadGroup consumerThreads;
//      consumerThreads.addThreads(bdlf::BindUtil::bind(&myConsumer, &queue),
//                                 numThreads);
//
//      for (int i = 0; i < k_NUM_WORK_ITEMS; ++i) {
//          my_WorkRequest item;
//          item.d_type = my_WorkRequest::e_WORK;
//          item.d_data = my_WorkData(); // some stuff to do
//          queue.pushBack(item);
//      }
//
//      for (int i = 0; i < numThreads; ++i) {
//          my_WorkRequest item;
//          item.d_type = my_WorkRequest::e_STOP;
//          queue.pushBack(item);
//      }
//
//      consumerThreads.joinAll();
//  }
//..

#include <bdlscm_version.h>

#include <bdlb_bitutil.h>

#include <bslalg_scalarprimitives.h>

#include <bslma_usesbslmaallocator.h>

#include <bslmf_istriviallycopyable.h>
#include <bslmf_movableref.h>
#include <bslmf_nestedtraitdeclaration.h>

#include <bslmt_condition.h>
#include <bslmt_fastpostsemaphore.h>
#include <bslmt_lockguard.h>
#include <bslmt_mutex.h>

#include <bsls_assert.h>
#include <bsls_atomicoperations.h>
#include <bsls_objectbuffer.h>
#include <bsls_types.h>

#include <bsl_climits.h>
#include <bsl_cstdint.h>

namespace BloombergLP {
namespace bdlcc {

                   // ===================================
                   // class BoundedQueue_PopCompleteGuard
                   // ===================================

template <class TYPE, class NODE>
class BoundedQueue_PopCompleteGuard {
    // This class implements a guard that invokes 'TYPE::popComplete' on a
    // 'NODE' upon destruction.

    // DATA
    TYPE *d_queue_p;  // managed queue owning the managed node
    NODE *d_node_p;   // managed node

    // NOT IMPLEMENTED
    BoundedQueue_PopCompleteGuard();
    BoundedQueue_PopCompleteGuard(const BoundedQueue_PopCompleteGuard&);
    BoundedQueue_PopCompleteGuard& operator=(
                                         const BoundedQueue_PopCompleteGuard&);

  public:
    // CREATORS
    BoundedQueue_PopCompleteGuard(TYPE *queue, NODE *node);
        // Create a 'popComplete' guard managing the specified 'queue' and
        // 'node'.

    ~BoundedQueue_PopCompleteGuard();
        // Destroy this object and invoke the 'TYPE::popComplete' method with
        // the managed 'node'.
};

             // ===============================================
             // class BoundedQueue_PushExceptionCompleteProctor
             // ===============================================

template <class TYPE>
class BoundedQueue_PushExceptionCompleteProctor {
    // This class implements a proctor that invokes
    // 'TYPE::pushExceptionComplete' upon destruction unless 'release' has been
    // called.

    // DATA
    TYPE *d_queue_p;  // managed queue

    // NOT IMPLEMENTED
    BoundedQueue_PushExceptionCompleteProctor();
    BoundedQueue_PushExceptionCompleteProctor(
                             const BoundedQueue_PushExceptionCompleteProctor&);
    BoundedQueue_PushExceptionCompleteProctor& operator=(
                             const BoundedQueue_PushExceptionCompleteProctor&);

  public:
    // CREATORS
    explicit
    BoundedQueue_PushExceptionCompleteProctor(TYPE *queue);
        // Create a 'pushExceptionComplete' proctor that conditionally manages
        // the specified 'queue' (if non-zero).

    ~BoundedQueue_PushExceptionCompleteProctor();
        // Destroy this object and, if 'release' has not been invoked', invoke
        // the managed queue's 'pushExceptionComplete' method.

    // MANIPULATORS
    void release();
        // Release from management the queue currently managed by this proctor.
        // If no queue is currently managed, this method has no effect.
};

                         // ========================
                         // struct BoundedQueue_Node
                         // ========================

template <class TYPE, bool RECLAIMABLE>
struct BoundedQueue_Node;
    // This class implements the queue's node.  A node stores an instance of
    // the specified (template parameter) 'TYPE', and provides an accessor
    // 'isUnconstructed' that indicates whether the value of the node was
    // correctly constructed.  If 'isUnconstructed' is 'false', then the value
    // ('d_value') refers to a valid object.  If 'isUnconstructed' is 'true'
    // then 'd_value' does not refer to a valid object, it does not represent a
    // value in this queue, and the destructor of 'd_value' should not be
    // called.  The specified (template parameter) type 'RECLAIMABLE' is used
    // to provide a compile time optimization for the footprint of this
    // template when the value of 'isUnconstructed' is known at compile-time.
    // If 'RECLAIMABLE' is 'false' then it can be determined at compile time
    // that the construction of 'TYPE' will uncoditionally succeed (e.g., it
    // 'is_trivially_copyable'), and the 'isUnconstructed' property does not
    // require a data member to be accessed at run-time.

template <class TYPE>
struct BoundedQueue_Node<TYPE, true> {
  private:
    // DATA
    bool                     d_isUnconstructedFlag;  // node suffered exception

  public:
    // PUBLIC DATA
    bsls::ObjectBuffer<TYPE> d_value;                // stored value

    // MANIPULATORS
    void setIsUnconstructed(bool isUnconstructedFlag);
        // If the specified 'isUnconstructedFlag' is 'false', then 'd_value'
        // refers to a valid object of (the template parameter) 'TYPE',
        // otherwise (if 'isUnconstrucedFlag' is 'true') 'd_value' does not
        // refer to a valid object (because the attempt to construct 'TYPE'
        // resulted in an expection).  Note that this method is normally
        // invoked after each write to 'd_value'.

    // ACCESSORS
    bool isUnconstructed() const;
        // Return whether an exception occurred when last writing to 'd_value'.
};

template <class TYPE>
struct BoundedQueue_Node<TYPE, false> {
    // PUBLIC DATA
    bsls::ObjectBuffer<TYPE> d_value;    // stored value

    // MANIPULATORS
    void setIsUnconstructed(bool /* value */);
        // Do nothing.

    // ACCESSORS
    bool isUnconstructed() const;
        // Return 'false'.
};

                            // ==================
                            // class BoundedQueue
                            // ==================

template <class TYPE>
class BoundedQueue {
    // This class provides a thread-safe bounded queue of values.

    // PRIVATE CONSTANTS

    // The following constants are used to maintain the queue's 'd_popCount'
    // and 'd_pushCount' values.  See *Implementation* *Note* for details.

    static const bsls::Types::Uint64 k_STARTED_MASK   = 0x00000000ffffffffLL;
    static const bsls::Types::Uint64 k_STARTED_INC    = 0x0000000000000001LL;
    static const bsls::Types::Uint64 k_STARTED_DEC    = 0xffffffffffffffffLL;
    static const bsls::Types::Uint64 k_FINISHED_INC   = 0x0000000100000000LL;
    static const unsigned int        k_FINISHED_SHIFT = 32;

    static const unsigned int k_MAXIMUM_CIRCULAR_DIFFERENCE =
         static_cast<unsigned int>(1) << (sizeof(unsigned int) * CHAR_BIT - 1);

    // PRIVATE TYPES
    typedef unsigned int                                         Uint;
    typedef typename bsls::Types::Uint64                         Uint64;
    typedef typename bsls::AtomicOperations::AtomicTypes::Uint   AtomicUint;
    typedef typename bsls::AtomicOperations::AtomicTypes::Uint64 AtomicUint64;

    typedef typename bsls::AtomicOperations AtomicOp;

    typedef BoundedQueue_Node<TYPE,
                              !bsl::is_trivially_copyable<TYPE>::value> Node;

    // DATA
    bslmt::FastPostSemaphore  d_pushSemaphore;     // synchronization primitive
                                                   // restricting access to
                                                   // empty elements and
                                                   // providing
                                                   // enablement/disablement of
                                                   // "push" operations

    AtomicUint64              d_pushCount;         // count of "push"
                                                   // operations started and
                                                   // completed, used to detect
                                                   // a quiescent state for
                                                   // safely incrementing
                                                   // number of available
                                                   // elements

    AtomicUint64              d_pushIndex;         // index of next enqueue
                                                   // element location

    bslmt::FastPostSemaphore  d_popSemaphore;      // synchronization primitive
                                                   // restricting access to
                                                   // available elements and
                                                   // providing
                                                   // enablement/disablement of
                                                   // "pop" operations

    AtomicUint64              d_popCount;          // count of "pop" operations
                                                   // started and completed,
                                                   // used to detect a
                                                   // quiescent state for
                                                   // safely incrementing
                                                   // number of empty elements

    AtomicUint64              d_popIndex;          // index of next dequeue
                                                   // element location

    mutable AtomicUint        d_emptyWaiterCount;  // circular count of
                                                   // 'waitUntilEmpty'
                                                   // invocations

    AtomicUint                d_emptyCountSeen;    // maximum
                                                   // 'd_emptyWaiterCount' seen
                                                   // prior to the queue being
                                                   // observed as empty; used
                                                   // to detect most very short
                                                   // lived transitions to the
                                                   // queue being empty

    mutable bslmt::Mutex      d_emptyMutex;        // blocking point for
                                                   // 'waitUntilEmpty'

    mutable bslmt::Condition  d_emptyCondition;    // condition variable for
                                                   // 'waitUntilEmpty'

    Node                     *d_element_p;         // array of elements that
                                                   // comprise the bounded
                                                   // queue

    Uint64                    d_capacity;          // capacity of the queue

    bslma::Allocator         *d_allocator_p;       // allocator, held not owned

    // FRIENDS
    friend class BoundedQueue_PopCompleteGuard<
                                            BoundedQueue<TYPE>,
                                            typename BoundedQueue<TYPE>::Node>;

    friend class BoundedQueue_PushExceptionCompleteProctor<
                                                          BoundedQueue<TYPE> >;

    // PRIVATE CLASS METHODS
    static bool circularlyGreater(Uint lhs, Uint rhs);
        // Return 'true' if the specified 'lhs' is circularly greater than the
        // specified 'rhs', and 'false' otherwise.  'lhs' is cicularly greater
        // than 'rhs' if 'lhs' is equal to a value obtained by adding a value
        // in '[1 .. 2^31]' to 'rhs'.

    static bool isQuiescentState(bsls::Types::Uint64 count);
        // Return 'true' if the specified 'count' implies a quiescent state
        // (see *Implementation* *Note*), and 'false' otherwise.  A quiescent
        // state indicates there is a (possibly zero length) contiguous set of
        // elements that can safely be made available to the operation
        // complementary to the operation 'count' tracks (pop and push are
        // complementary operations).

    static Uint64 markFinishedOperation(AtomicUint64 *count);
    static Uint64 markFinishedOperation(AtomicUint64 *count, int num);
        // Update the specified 'count' to indicate that an operation (or the
        // optionally specified 'num' operations) has completed, and return the
        // updated 'count' value.  Marking an operation finished means that
        // 'isQuiscentState' *may* now be true.  The behavior is undefined
        // unless 'markStartedOperation' was previously called on this 'count',
        // and a corresponding 'markFinishedOperation' or
        // 'unmarkStartOperation' has not already been called.  Note that the
        // "operation" that has finished refers to either a push or pop
        // (depending on whether this is applied to 'd_pushCount' or
        // 'd_popCount').

    static void markReclaimed(AtomicUint64 *count);
        // Update the specified 'count' to indicate that a node that suffered
        // an exception has been reclaimed.  Marking a node reclaimed can *not*
        // alter the value of 'isQuiescentState', but does increase the size of
        // the contiguous set of elements that will eventially be made
        // available to the operation complementary to the operation 'count'
        // tracks (pop and push are complementary operations).  Note that this
        // method does not require a previous call to 'markStartedOperation'
        // and does not meet the requirements for 'markFinishedOperation' or
        // 'unmarkStartOperation' to be invoked.

    static void markStartedOperation(AtomicUint64 *count);
    static void markStartedOperation(AtomicUint64 *count, int num);
        // Update the specified 'count' to indicate that an operation (or the
        // optionally specified 'num' operations) has started, and return the
        // updated 'count' value.  Marking an operation started means that
        // 'isQuiescentState' is not true and will not be true until
        // 'markFinishedOperation' or 'unmarkStartedOperation' is invoked.
        // Note that the "operation" that has started refers to either a push
        // or pop (depending on whether this is applied to 'd_pushCount' or
        // 'd_popCount').

    static Uint64 unmarkStartedOperation(AtomicUint64 *count);
        // Update the specified 'count' to indicate that an operation has
        // aborted without finishing, and return the updated 'count' value.
        // Marking a started operation as having aborted means that
        // 'isQuiscentState' *may* now be true.  The behavior is undefined
        // unless 'markStartedOperation' was previously called on this 'count',
        // and a corresponding 'markFinishedOperation' or
        // 'unmarkStartOperation' has not already been called.  Note that the
        // "operation" that has aborted refers to either a push or pop
        // (depending on whether this is applied to 'd_pushCount' or
        // 'd_popCount').

    // PRIVATE MANIPULATORS
    void popComplete(Node *node);
        // Destruct the value stored in the specified 'node', and mark the
        // 'node' writable.  This method is used within 'popFrontHelper' by a
        // guard to complete the reclamation of a node in the presence of an
        // exception.

    void popFrontHelper(TYPE *value);
        // Remove the element from the front of this queue and load that
        // element into the specified 'value'.  This method is invoked by
        // 'popFront' and 'tryPopFront' once an element is available.

    void pushComplete();
        // Mark a "push" operation as complete, and 'post' to the
        // 'd_popSemaphore' if appropriate.

    void pushExceptionComplete();
        // Remove the indicator for a started push operation, and 'post' to the
        // 'd_popSemaphore' if appropriate.  This method is used within
        // 'pushFront' by a proctor to complete the marking of a node to
        // reclaim in the presence of an exception.

    bool updateEmptyCountSeen(Uint emptyCount);
        // If the specified 'emptyCount' is (circularly) greater than
        // 'd_emptyCountSeen', assign 'd_emptyCountSeen' the value of
        // 'emptyCount' and return 'true'.  Otherwise, return 'false'.  A
        // return value of 'true' indicates this thread *must* signal any
        // waiting threads.  Note that a return value of 'false' indicates
        // another thread has (or will) signal the queue is empty and this
        // thread does not need to signal.

    // NOT IMPLEMENTED
    BoundedQueue(const BoundedQueue&);
    BoundedQueue& operator=(const BoundedQueue&);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(BoundedQueue, bslma::UsesBslmaAllocator);

    // PUBLIC TYPES
    typedef TYPE value_type;  // The type for elements.

    // PUBLIC CONSTANTS
    enum {
        e_SUCCESS  =  0,  // must be 0
        e_EMPTY    = -1,
        e_FULL     = -2,
        e_DISABLED = -3,
        e_FAILED   = -4
    };

    // CREATORS
    explicit
    BoundedQueue(bsl::size_t capacity, bslma::Allocator *basicAllocator = 0);
        // Create a thread-aware queue with at least the specified 'capacity'.
        // Optionally specify a 'basicAllocator' used to supply memory.  If
        // 'basicAllocator' is 0, the currently installed default allocator is
        // used.

    ~BoundedQueue();
        // Destroy this object.

    // MANIPULATORS
    int popFront(TYPE *value);
        // Remove the element from the front of this queue and load that
        // element into the specified 'value'.  If the queue is empty, block
        // until it is not empty.  Return 0 on success, and a non-zero value
        // otherwise.  Specifically, return 'e_SUCCESS' on success,
        // 'e_DISABLED' if 'isPopFrontDisabled()' and 'e_FAILED' if an error
        // occurs.  On failure, 'value' is not changed.  Threads blocked due to
        // the queue being empty will return 'e_DISABLED' if 'disablePopFront'
        // is invoked.

    int pushBack(const TYPE& value);
        // Append the specified 'value' to the back of this queue.  If the
        // queue is full, block until it is not full.  Return 0 on success, and
        // a non-zero value otherwise.  Specifically, return 'e_SUCCESS' on
        // success, 'e_DISABLED' if 'isPushBackDisabled()' and 'e_FAILED' if an
        // error occurs.  Threads blocked due to the queue being full will
        // return 'e_DISABLED' if 'disablePushBack' is invoked.

    int pushBack(bslmf::MovableRef<TYPE> value);
        // Append the specified move-insertable 'value' to the back of this
        // queue.  If the queue is full, block until it is not full.  'value'
        // is left in a valid but unspecified state.  Return 0 on success, and
        // a non-zero value otherwise.  Specifically, return 'e_SUCCESS' on
        // success, 'e_DISABLED' if 'isPushBackDisabled()' and 'e_FAILED' if an
        // error occurs.  On failure, 'value' is not changed.  Threads blocked
        // due to the queue being full will return 'e_DISABLED' if
        // 'disablePushBack' is invoked.

    void removeAll();
        // Remove all items currently in this queue.  Note that this operation
        // is not atomic; if other threads are concurrently pushing items into
        // the queue the result of 'numElements()' after this function returns
        // is not guaranteed to be 0.

    int tryPopFront(TYPE *value);
        // Attempt to remove the element from the front of this queue without
        // blocking, and, if successful, load the specified 'value' with the
        // removed element.  Return 0 on success, and a non-zero value
        // otherwise.  Specifically, return 'e_SUCCESS' on success,
        // 'e_DISABLED' if 'isPopFrontDisabled()', 'e_EMPTY' if
        // '!isPopFrontDisabled()' and the queue was empty, and 'e_FAILED' if
        // an error occurs.  On failure, 'value' is not changed.

    int tryPushBack(const TYPE& value);
        // Append the specified 'value' to the back of this queue.  Return 0 on
        // success, and a non-zero value otherwise.  Specifically, return
        // 'e_SUCCESS' on success, 'e_DISABLED' if 'isPushBackDisabled()',
        // 'e_FULL' if '!isPushBackDisabled()' and the queue was full, and
        // 'e_FAILED' if an error occurs.

    int tryPushBack(bslmf::MovableRef<TYPE> value);
        // Append the specified move-insertable 'value' to the back of this
        // queue.  'value' is left in a valid but unspecified state.  Return 0
        // on success, and a non-zero value otherwise.  Specifically, return
        // 'e_SUCCESS' on success, 'e_DISABLED' if 'isPushBackDisabled()',
        // 'e_FULL' if '!isPushBackDisabled()' and the queue was full, and
        // 'e_FAILED' if an error occurs.  On failure, 'value' is not changed.

                       // Enqueue/Dequeue State

    void disablePopFront();
        // Disable dequeueing from this queue.  All subsequent invocations of
        // 'popFront' or 'tryPopFront' will fail immediately.  All blocked
        // invocations of 'popFront' and 'waitUntilEmpty' will fail
        // immediately.  If the queue is already dequeue disabled, this method
        // has no effect.

    void disablePushBack();
        // Disable enqueueing into this queue.  All subsequent invocations of
        // 'pushBack' or 'tryPushBack' will fail immediately.  All blocked
        // invocations of 'pushBack' will fail immediately.  If the queue is
        // already enqueue disabled, this method has no effect.

    void enablePopFront();
        // Enable dequeueing.  If the queue is not dequeue disabled, this call
        // has no effect.

    void enablePushBack();
        // Enable queuing.  If the queue is not enqueue disabled, this call has
        // no effect.

    // ACCESSORS
    bsl::size_t capacity() const;
        // Return the maximum number of elements that may be stored in this
        // queue.  Note that the value returned may be greater than that
        // supplied at construction.

    bool isEmpty() const;
        // Return 'true' if this queue is empty (has no elements), or 'false'
        // otherwise.

    bool isFull() const;
        // Return 'true' if this queue is full (has no available capacity), or
        // 'false' otherwise.  Note that for unbounded queues, this method
        // always returns 'false'.

    bool isPopFrontDisabled() const;
        // Return 'true' if this queue is dequeue disabled, and 'false'
        // otherwise.  Note that the queue is created in the "dequeue enabled"
        // state.

    bool isPushBackDisabled() const;
        // Return 'true' if this queue is enqueue disabled, and 'false'
        // otherwise.  Note that the queue is created in the "enqueue enabled"
        // state.

    bsl::size_t numElements() const;
        // Returns the number of elements currently in this queue.  Note that
        // 'numElements() == capacity()' is not a valid replacement for
        // 'isFull' (see {Exception Safety} for details).

    int waitUntilEmpty() const;
        // Block until all the elements in this queue are removed.  Return 0 on
        // success, and a non-zero value otherwise.  Specifically, return
        // 'e_SUCCESS' on success, 'e_DISABLED' if
        // '!isEmpty() && isPopFrontDisabled()'.  A blocked thread waiting for
        // the queue to empty will return 'e_DISABLED' if 'disablePopFront' is
        // invoked.

                                  // Aspects

    bslma::Allocator *allocator() const;
        // Return the allocator used by this object to supply memory.
};

// ============================================================================
//                             INLINE DEFINITIONS
// ============================================================================

                   // -----------------------------------
                   // class BoundedQueue_PopCompleteGuard
                   // -----------------------------------

// CREATORS
template <class TYPE, class NODE>
inline
BoundedQueue_PopCompleteGuard<TYPE, NODE>::
                         BoundedQueue_PopCompleteGuard(TYPE *queue, NODE *node)
: d_queue_p(queue)
, d_node_p(node)
{
}

template <class TYPE, class NODE>
inline
BoundedQueue_PopCompleteGuard<TYPE, NODE>::~BoundedQueue_PopCompleteGuard()
{
    d_queue_p->popComplete(d_node_p);
}

             // -----------------------------------------------
             // class BoundedQueue_PushExceptionCompleteProctor
             // -----------------------------------------------

// CREATORS
template <class TYPE>
inline
BoundedQueue_PushExceptionCompleteProctor<TYPE>::
                         BoundedQueue_PushExceptionCompleteProctor(TYPE *queue)
: d_queue_p(queue)
{
}

template <class TYPE>
inline
BoundedQueue_PushExceptionCompleteProctor<TYPE>::
                                   ~BoundedQueue_PushExceptionCompleteProctor()
{
    if (d_queue_p) {
        d_queue_p->pushExceptionComplete();
    }
}

// MANIPULATORS
template <class TYPE>
inline
void BoundedQueue_PushExceptionCompleteProctor<TYPE>::release()
{
    d_queue_p = 0;
}

                         // ------------------------
                         // struct BoundedQueue_Node
                         // ------------------------

// MANIPULATORS
template <class TYPE>
inline
void BoundedQueue_Node<TYPE, true>::setIsUnconstructed(
                                                      bool isUnconstructedFlag)
{
    d_isUnconstructedFlag = isUnconstructedFlag;
}

template <class TYPE>
inline
void BoundedQueue_Node<TYPE, false>::setIsUnconstructed(
                                                 bool /* isUnconstrutedFlag */)
{
}

// ACCESSORS
template <class TYPE>
inline
bool BoundedQueue_Node<TYPE, true>::isUnconstructed() const
{
    return d_isUnconstructedFlag;
}

template <class TYPE>
inline
bool BoundedQueue_Node<TYPE, false>::isUnconstructed() const
{
    return false;
}

                            // ------------------
                            // class BoundedQueue
                            // ------------------

// PRIVATE CLASS METHODS
template <class TYPE>
inline
bool BoundedQueue<TYPE>::circularlyGreater(Uint lhs, Uint rhs)
{
    return lhs > rhs ? (lhs - rhs) <= k_MAXIMUM_CIRCULAR_DIFFERENCE
                     : (rhs - lhs) >  k_MAXIMUM_CIRCULAR_DIFFERENCE;
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isQuiescentState(bsls::Types::Uint64 count)
{
    return (count >> k_FINISHED_SHIFT) == (count & k_STARTED_MASK);
}

template <class TYPE>
inline
bsls::Types::Uint64 BoundedQueue<TYPE>::markFinishedOperation(
                                                           AtomicUint64 *count)
{
    return AtomicOp::addUint64NvAcqRel(count, k_FINISHED_INC);
}

template <class TYPE>
inline
bsls::Types::Uint64 BoundedQueue<TYPE>::markFinishedOperation(
                                                           AtomicUint64 *count,
                                                           int           num)
{
    return AtomicOp::addUint64NvAcqRel(count, num * k_FINISHED_INC);
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::markReclaimed(AtomicUint64 *count)
{
    AtomicOp::addUint64AcqRel(count, k_STARTED_INC + k_FINISHED_INC);
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::markStartedOperation(AtomicUint64 *count)
{
    AtomicOp::addUint64AcqRel(count, k_STARTED_INC);
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::markStartedOperation(AtomicUint64 *count, int num)
{
    AtomicOp::addUint64AcqRel(count, num * k_STARTED_INC);
}

template <class TYPE>
inline
bsls::Types::Uint64 BoundedQueue<TYPE>::unmarkStartedOperation(
                                                           AtomicUint64 *count)
{
    return AtomicOp::addUint64NvAcqRel(count, k_STARTED_DEC);
}

// PRIVATE MANIPULATORS
template <class TYPE>
inline
void BoundedQueue<TYPE>::popComplete(Node *node)
{
    node->d_value.object().~TYPE();

    Uint64 count = markFinishedOperation(&d_popCount);
    if (isQuiescentState(count)) {

        // The total number of popped elements is 'count & k_STARTED_MASK'.
        // Attempt, once, to zero the count and, if successful, post to the
        // push semaphore.

        if (AtomicOp::testAndSwapUint64AcqRel(&d_popCount,
                                              count,
                                              0) == count) {
            d_pushSemaphore.post(static_cast<int>(count & k_STARTED_MASK));

            Uint emptyCount = AtomicOp::getUintAcquire(&d_emptyWaiterCount);

            if (isEmpty() && updateEmptyCountSeen(emptyCount)) {
                {
                    bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);
                }
                d_emptyCondition.broadcast();
            }
        }
    }
}

template <class TYPE>
void BoundedQueue<TYPE>::popFrontHelper(TYPE *value)
{
    markStartedOperation(&d_popCount);

    // 'd_popIndex' stores the next location to use (want the original value)

    Uint64  index = (AtomicOp::addUint64NvAcqRel(&d_popIndex, 1) - 1)
                                                                  % d_capacity;
    Node   *node  = &d_element_p[index];

    // Nodes marked for reclamation are not counted in 'd_popSemaphore' and are
    // to be skipped; 'd_isUnconstructed' does not need to be modified here
    // since it will be updated in a "push" operation.  However, the node does
    // need to be counted in the 'd_pushSemaphore' as an empty node.  This is
    // accomplished by incrementing the started and finished attributes in
    // 'd_popCount'.

    while (node->isUnconstructed()) {
        markReclaimed(&d_popCount);

        index = (AtomicOp::addUint64NvAcqRel(&d_popIndex, 1) - 1) % d_capacity;
        node  = &d_element_p[index];
    }

    BoundedQueue_PopCompleteGuard<BoundedQueue<TYPE>, Node> guard(this, node);

#if defined(BSLMF_MOVABLEREF_USES_RVALUE_REFERENCES)
    *value = bslmf::MovableRefUtil::move(node->d_value.object());
#else
    *value = node->d_value.object();
#endif
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::pushComplete()
{
    Uint64 count = markFinishedOperation(&d_pushCount);
    if (isQuiescentState(count)) {

        // The total number of pushed elements is 'count & k_STARTED_MASK'.
        // Attempt, once, to zero the count and, if successful, post to the pop
        // semaphore.

        if (AtomicOp::testAndSwapUint64AcqRel(&d_pushCount,
                                               count,
                                               0) == count) {
            d_popSemaphore.post(static_cast<int>(count & k_STARTED_MASK));
        }
    }
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::pushExceptionComplete()
{
    Uint64 count = unmarkStartedOperation(&d_pushCount);

    int numToPost = static_cast<int>(count & k_STARTED_MASK);

    if (0 != numToPost && isQuiescentState(count)) {

        // The total number of pushed elements is 'count & k_STARTED_MASK'.
        // Attempt, once, to zero the count and, if successful, post to the pop
        // semaphore.

        if (AtomicOp::testAndSwapUint64AcqRel(&d_pushCount,
                                               count,
                                               0) == count) {
            d_popSemaphore.post(numToPost);
        }
    }
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::updateEmptyCountSeen(Uint emptyCount)
{
    Uint emptyCountSeen = AtomicOp::getUintAcquire(&d_emptyCountSeen);
    while (circularlyGreater(emptyCount, emptyCountSeen)) {
        const Uint origEmptyCountSeen = emptyCountSeen;

        emptyCountSeen = AtomicOp::testAndSwapUintAcqRel(&d_emptyCountSeen,
                                                         emptyCountSeen,
                                                         emptyCount);

        if (origEmptyCountSeen == emptyCountSeen) {
            return true;                                              // RETURN
        }
    }
    return false;
}

// CREATORS
template <class TYPE>
BoundedQueue<TYPE>::BoundedQueue(bsl::size_t       capacity,
                                 bslma::Allocator *basicAllocator)
: d_pushSemaphore()
, d_popSemaphore()
, d_emptyMutex()
, d_emptyCondition()
, d_element_p(0)
, d_capacity(capacity > 2 ? capacity : 2)
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
    AtomicOp::initUint64(&d_pushCount, 0);
    AtomicOp::initUint64(&d_pushIndex, 0);
    AtomicOp::initUint64(&d_popCount,  0);
    AtomicOp::initUint64(&d_popIndex,  0);

    AtomicOp::initUint(&d_emptyWaiterCount, 0);
    AtomicOp::initUint(&d_emptyCountSeen,   0);

    d_element_p = static_cast<Node *>(
                              d_allocator_p->allocate(static_cast<bsl::size_t>(
                                                  d_capacity * sizeof(Node))));

    for (bsl::size_t i = 0; i < d_capacity; ++i) {
        d_element_p[i].setIsUnconstructed(false);
    }

    d_pushSemaphore.post(static_cast<int>(d_capacity));
}

template <class TYPE>
BoundedQueue<TYPE>::~BoundedQueue()
{
    if (d_element_p) {
        removeAll();
        d_allocator_p->deallocate(d_element_p);
    }
}

// MANIPULATORS
template <class TYPE>
inline
int BoundedQueue<TYPE>::popFront(TYPE *value)
{
    int rv = d_popSemaphore.wait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    popFrontHelper(value);

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::pushBack(const TYPE& value)
{
    int rv = d_pushSemaphore.wait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    bslalg::ScalarPrimitives::copyConstruct(node.d_value.address(),
                                            value,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::pushBack(bslmf::MovableRef<TYPE> value)
{
    int rv = d_pushSemaphore.wait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    TYPE& dummy = value;
    bslalg::ScalarPrimitives::moveConstruct(node.d_value.address(),
                                            dummy,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

template <class TYPE>
void BoundedQueue<TYPE>::removeAll()
{
    int reclaim = d_popSemaphore.takeAll();

    if (reclaim) {
        while (reclaim) {
            int count = reclaim;

            reclaim = 0;

            // For quiescent state detection (see *Implementation* *Note*) and
            // eventual 'post' to the 'd_pushSemaphore' to indicate node
            // availability, indicate 'count' remove operations have begin.

            markStartedOperation(&d_popCount, count);

            // 'd_popIndex' stores the next location to use (want the original
            // value)

            Uint64 index = AtomicOp::addUint64NvAcqRel(&d_popIndex, count)
                                                                       - count;

            for (int i = 0; i < count; ++i, ++index) {
                Node& node = d_element_p[index % d_capacity];

                if (!node.isUnconstructed()) {
                    node.d_value.object().~TYPE();
                }
                else {
                    ++reclaim;
                }
            }

            // For quiescent state detection (see *Implementation* *Note*) and
            // eventual 'post' to the 'd_pushSemaphore' to indicate node
            // availability, indicate 'count' remove operations have finished.

            Uint64 popCount = markFinishedOperation(&d_popCount, count);

            if (isQuiescentState(popCount)) {
                // The total number of popped elements is
                // 'popCount & k_STARTED_MASK'.  Attempt, once, to zero the
                // count and, if successful, post to the push semaphore.

                if (AtomicOp::testAndSwapUint64AcqRel(&d_popCount,
                                                      popCount,
                                                      0) == popCount) {
                    d_pushSemaphore.post(static_cast<int>(
                                                   popCount & k_STARTED_MASK));
                }

                Uint emptyCount = AtomicOp::getUintAcquire(
                                                          &d_emptyWaiterCount);

                if (isEmpty() && updateEmptyCountSeen(emptyCount)) {
                    {
                        bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);
                    }
                    d_emptyCondition.broadcast();
                }
            }
        }
    }
}

template <class TYPE>
inline
int BoundedQueue<TYPE>::tryPopFront(TYPE *value)
{
    int rv = d_popSemaphore.tryWait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        if (bslmt::FastPostSemaphore::e_WOULD_BLOCK == rv) {
            return e_EMPTY;                                           // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    popFrontHelper(value);

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::tryPushBack(const TYPE& value)
{
    int rv = d_pushSemaphore.tryWait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        if (bslmt::FastPostSemaphore::e_WOULD_BLOCK == rv) {
            return e_FULL;                                            // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    bslalg::ScalarPrimitives::copyConstruct(node.d_value.address(),
                                            value,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

template <class TYPE>
int BoundedQueue<TYPE>::tryPushBack(bslmf::MovableRef<TYPE> value)
{
    int rv = d_pushSemaphore.tryWait();
    if (rv) {
        if (bslmt::FastPostSemaphore::e_DISABLED == rv) {
            return e_DISABLED;                                        // RETURN
        }
        if (bslmt::FastPostSemaphore::e_WOULD_BLOCK == rv) {
            return e_FULL;                                            // RETURN
        }
        return e_FAILED;                                              // RETURN
    }

    markStartedOperation(&d_pushCount);

    // 'd_pushIndex' stores the next location to use (want the original value)

    Uint64 index = (AtomicOp::addUint64NvAcqRel(&d_pushIndex, 1) - 1)
                                                                  % d_capacity;
    Node&  node  = d_element_p[index];

    node.setIsUnconstructed(true);

    BoundedQueue_PushExceptionCompleteProctor<BoundedQueue<TYPE> > guard(this);

    TYPE& dummy = value;
    bslalg::ScalarPrimitives::moveConstruct(node.d_value.address(),
                                            dummy,
                                            d_allocator_p);

    guard.release();

    node.setIsUnconstructed(false);

    pushComplete();

    return e_SUCCESS;
}

                       // Enqueue/Dequeue State

template <class TYPE>
inline
void BoundedQueue<TYPE>::disablePopFront()
{
    d_popSemaphore.disable();

    {
        bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);
    }
    d_emptyCondition.broadcast();
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::disablePushBack()
{
    d_pushSemaphore.disable();
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::enablePopFront()
{
    d_popSemaphore.enable();
}

template <class TYPE>
inline
void BoundedQueue<TYPE>::enablePushBack()
{
    d_pushSemaphore.enable();
}

// ACCESSORS
template <class TYPE>
inline
bsl::size_t BoundedQueue<TYPE>::capacity() const
{
    return static_cast<bsl::size_t>(d_capacity);
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isEmpty() const
{
    return d_capacity == static_cast<Uint64>(d_pushSemaphore.getValue());
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isFull() const
{
    return 0 == d_pushSemaphore.getValue();
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isPopFrontDisabled() const
{
    return d_popSemaphore.isDisabled();
}

template <class TYPE>
inline
bool BoundedQueue<TYPE>::isPushBackDisabled() const
{
    return d_pushSemaphore.isDisabled();
}

template <class TYPE>
inline
bsl::size_t BoundedQueue<TYPE>::numElements() const
{
    return d_popSemaphore.getValue();
}

template <class TYPE>
int BoundedQueue<TYPE>::waitUntilEmpty() const
{
    Uint emptyCount = AtomicOp::addUintNvAcqRel(&d_emptyWaiterCount, 1) - 1;

    int state = d_popSemaphore.getDisabledState();
    if (1 == (state & 1)) {
        return e_DISABLED;                                            // RETURN
    }

    if (isEmpty()) {
        return e_SUCCESS;                                             // RETURN
    }

    bslmt::LockGuard<bslmt::Mutex> guard(&d_emptyMutex);

    // Return successfully when this queue is empty ('isEmpty()') or this queue
    // was empty at some point since this method was invoked (the condition
    // tested by 'circularlyGreater' in the below).

    bool empty = isEmpty()
              || circularlyGreater(AtomicOp::getUintAcquire(&d_emptyCountSeen),
                                   emptyCount);

    while (!empty && state == d_popSemaphore.getDisabledState()) {
        int rv = d_emptyCondition.wait(&d_emptyMutex);
        if (rv) {
            return e_FAILED;                                          // RETURN
        }
        empty = isEmpty()
             || circularlyGreater(AtomicOp::getUintAcquire(&d_emptyCountSeen),
                                  emptyCount);
    }

    if (!empty) {
        return e_DISABLED;                                            // RETURN
    }

    return e_SUCCESS;
}

                                  // Aspects

template <class TYPE>
inline
bslma::Allocator *BoundedQueue<TYPE>::allocator() const
{
    return d_allocator_p;
}

}  // close package namespace
}  // close enterprise namespace

#endif

// ----------------------------------------------------------------------------
// Copyright 2019 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------