// bdlcc_multipriorityqueue.h                                         -*-C++-*-

// ----------------------------------------------------------------------------
//                                   NOTICE
//
// This component is not up to date with current BDE coding standards, and
// should not be used as an example for new development.
// ----------------------------------------------------------------------------

#ifndef INCLUDED_BDLCC_MULTIPRIORITYQUEUE
#define INCLUDED_BDLCC_MULTIPRIORITYQUEUE

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a thread-enabled parameterized multi-priority queue.
//
//@CLASSES:
//  bdlcc::MultipriorityQueue: thread-enabled, multi-priority queue
//
//@SEE_ALSO:
//
//@DESCRIPTION: This component provides a thread-enabled mechanism,
// 'bdlcc::MultipriorityQueue', implementing a special-purpose priority queue
// container of items of parameterized 'TYPE'.  Each item has a priority which,
// for efficiency of implementation, is limited to a relatively small number
// 'N' of contiguous integers '[ 0 .. N - 1 ]', with 'N' indicated at
// construction, and 0 being the most urgent priority.  This queue also takes
// an optional allocator, supplied at construction.  Once configured, these
// instance parameters remain unchanged for the life of each multi-priority
// queue.
//
///Thread-Enabled Idioms in the 'bdlcc::MultipriorityQueue' Interface
///------------------------------------------------------------------
// The thread-enabled 'bdlcc::MultipriorityQueue' is, in many regards, similar
// to a value-semantic type in that there is an obvious abstract notion of
// "value" that can be described in terms of salient attributes, which for this
// type is a sequence of priority/element pairs, constrained to be in
// increasing order of priority.  There are, however, several differences in
// method behavior and signature that arise due to the thread-enabled nature of
// the queue and its anticipated usage pattern.
//
// For example, if a queue object is empty, 'popFront' will block indefinitely
// until an element is added to the queue.  Also, since dynamic instance
// information, such as the number of elements currently in a queue, can be
// out-of-date by the time it is returned, some manipulators (e.g.,
// 'tryPopFront') are deliberately combined with an accessor operation (e.g.,
// 'isEmpty') in order to guarantee proper behavior.
//
// Finally, note that although the parameterized 'TYPE' is expected to at least
// support copy construction and assignment, the
// 'bdec::MultipriorityQueue<TYPE>' type currently does not support any
// value-semantic operations, since different queues could have different
// numbers of priorities, making comparison, assignment and copy construction
// awkward.
//
///Possible Future Enhancements
///----------------------------
// In addition to 'popFront' and 'tryPopFront', a 'bdlcc::MultipriorityQueue'
// may some day also provide a 'timedPopFront' method.  This method would block
// until it is able to complete successfully or until the specified time limit
// expires.
//
///WARNING: Synchronization Required on Destruction
///------------------------------------------------
// The behavior for the destructor is undefined unless all access or
// modification of the object is completed prior to its destruction.  Some form
// of synchronization, external to the component, is required to ensure the
// precondition on the destructor is met.  For example, if two (or more)
// threads are manipulating a queue, it is *not* safe to anticipate the number
// of elements added to the queue, and destroy that queue immediately after the
// last element is popped (without additional synchronization) because one of
// the corresponding push functions may not have completed (push may, for
// instance, signal waiting threads after the element is considered added to
// the queue).
//
///Usage
///-----
// This section illustrates intended use of this component.
//
///Example 1: Simple Thread Pool
///- - - - - - - - - - - - - - -
// This example demonstrates how we might use a 'bdlcc::MultipriorityQueue' to
// communicate between a single "producer" thread and multiple "consumer"
// threads.  The "producer" pushes work requests of varying priority onto the
// queue, and each "consumer" iteratively takes the highest priority work
// request from the queue and services it.
//
// We begin our example with some utility classes that define a simple "work
// item":
//..
//  enum {
//      k_MAX_CONSUMER_THREADS = 10
//  };
//
//  struct MyWorkData {
//      int d_i;        // input to work to be done
//
//      // Work data...
//  };
//
//  struct MyWorkRequest {
//      enum RequestType {
//          e_WORK = 1,
//          e_STOP = 2
//      };
//
//      RequestType d_type;
//      MyWorkData  d_data;
//
//      // Work data...
//  };
//..
// Next, we provide a simple function to service an individual work item, and a
// function to get a work item.  The details are unimportant for this example:
//..
//  void myDoWork(MyWorkData& data)
//  {
//      // Do work...
//      (void)data;
//  }
//
//  int getWorkData(MyWorkData *result)
//  {
//      static int count = 0;
//      result->d_i = rand();   // Only one thread runs this routine, so it
//                              // does not matter that 'rand()' is not
//                              // thread-safe, or that 'count' is 'static'.
//
//      return ++count >= 100;
//  }
//..
// The 'myConsumer' function (below) will pop elements off the queue in
// priority order and process them.  As discussed above, note that the call to
// 'queue->popFront(&item)' will block until there is an element available on
// the queue.  This function will be executed in multiple threads, so that each
// thread waits in 'queue->popFront()'; 'bdlcc::MultipriorityQueue' guarantees
// that each thread gets a unique element from the queue:
//..
//  void myConsumer(bdlcc::MultipriorityQueue<MyWorkRequest> *queue)
//  {
//      MyWorkRequest item;
//      while (1) {
//
//          // The 'popFront' function will wait for a 'MyWorkRequest' until
//          // one is available.
//
//          queue->popFront(&item);
//
//          if (MyWorkRequest::e_STOP == item.d_type) {
//              break;
//          }
//
//          myDoWork(item.d_data);
//      }
//  }
//..
// The 'myConsumerThread' function below is a callback for 'bslmt::ThreadUtil',
// which requires a "C" signature.  'bslmt::ThreadUtil::create()' expects a
// pointer to this function, and provides that function pointer to the
// newly-created thread.  The new thread then executes this function.
//
// Since 'bslmt::ThreadUtil::create()' uses the familiar "C" convention of
// passing a 'void' pointer, our function simply casts that pointer to our
// required type ('bdlcc::MultipriorityQueue<MyWorkRequest> *'), and then
// delegates to the queue-specific function 'myConsumer' (above):
//..
//  extern "C" void *myConsumerThread(void *queuePtr)
//  {
//      myConsumer ((bdlcc::MultipriorityQueue<MyWorkRequest>*) queuePtr);
//      return queuePtr;
//  }
//..
// In this simple example, the 'myProducer' function (below) serves multiple
// roles: it creates the 'bdlcc::MultipriorityQueue', starts the consumer
// threads, and then produces and queues work items.  When work requests are
// exhausted, this function queues one 'e_STOP' item for each consumer thread.
//
// When each consumer thread reads a 'e_STOP', it terminates its
// thread-handling function.  Note that, although the producer cannot control
// which thread pops a particular work item, it can rely on the knowledge that
// each consumer thread will read a single 'e_STOP' item and then terminate.
//
// Finally, the 'myProducer' function "joins" each consumer thread, which
// ensures that the thread itself will terminate correctly (see the
// 'bslmt_threadutil' component-level documentation for details):
//..
//  void myProducer()
//  {
//      enum {
//          k_NUM_PRIORITIES = 8,
//          k_NUM_THREADS    = 8
//      };
//
//      MyWorkRequest item;
//      MyWorkData    workData;
//
//      // Create multi-priority queue with specified number of priorities.
//
//      bdlcc::MultipriorityQueue<MyWorkRequest> queue(k_NUM_PRIORITIES);
//
//      // Start the specified number of threads.
//
//      assert(0 < k_NUM_THREADS
//          && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
//      bslmt::ThreadUtil::Handle consumerHandles[k_MAX_CONSUMER_THREADS];
//
//      for (int i = 0; i < k_NUM_THREADS; ++i) {
//          bslmt::ThreadUtil::create(&consumerHandles[i],
//                                   myConsumerThread,
//                                   &queue);
//      }
//
//      // Load work data into work requests and push them onto the queue with
//      // varying priority until all work data has been exhausted.
//
//      int count = 0;                          // used to generate priorities
//
//      while (!getWorkData(&workData)) {       // see declaration (above)
//          item.d_type = MyWorkRequest::e_WORK;
//          item.d_data = workData;
//          queue.pushBack(item, count % k_NUM_PRIORITIES);  // mixed
//                                                           // priorities
//          ++count;
//      }
//
//      // Load as many stop requests as there are active consumer threads.
//
//      for (int i = 0; i < k_NUM_THREADS; ++i) {
//          item.d_type = MyWorkRequest::e_STOP;
//          queue.pushBack(item, k_NUM_PRIORITIES - 1);  // lowest priority
//      }
//
//      // Join all of the consumer threads back with the main thread.
//
//      for (int i = 0; i < k_NUM_THREADS; ++i) {
//          bslmt::ThreadUtil::join(consumerHandles[i]);
//      }
//  }
//..
//
///Example 2: Multi-Threaded Observer
/// - - - - - - - - - - - - - - - - -
// The previous example shows a simple mechanism for distributing work requests
// over multiple threads.  This approach works well for large tasks that can be
// decomposed into discrete, independent tasks that can benefit from parallel
// execution.  Note also that the various threads are synchronized only at the
// end of execution, when the producer "joins" the various consumer threads.
//
// The simple strategy used in the first example works well for tasks that
// share no state, and are completely independent of one another.  For
// instance, a web server might use a similar strategy to distribute 'http'
// requests across multiple worker threads.
//
// In more complicated examples, it is often necessary or desirable to
// synchronize the separate tasks during execution.  The second example below
// shows a single "Observer" mechanism that receives event notification from
// the various worker threads.
//
// We first create a simple 'MyEvent' data type.  Worker threads will use this
// type to report information about their work.  In our example, we will report
// the "worker Id", the event number, and some arbitrary text.
//
// As with the previous example, class 'MyEvent' also contains an 'EventType',
// an enumeration that indicates whether the worker has completed all work.
// The "Observer" will use this enumerated value to note when a worker thread
// has completed its work:
//..
//  enum {
//      k_MAX_CONSUMER_THREADS = 10,
//      k_MAX_EVENT_TEXT       = 80
//  };
//
//  struct MyEvent {
//      enum EventType {
//          e_IN_PROGRESS   = 1,
//          e_TASK_COMPLETE = 2
//      };
//
//      EventType d_type;
//      int       d_workerId;
//      int       d_eventNumber;
//      char      d_eventText[k_MAX_EVENT_TEXT];
//  };
//..
// As noted in the previous example, 'bslmt::ThreadUtil::create()' spawns a new
// thread, which invokes a simple "C" function taking a 'void' pointer.  In the
// previous example, we simply converted that 'void' pointer into a pointer to
// 'bdlcc::MultipriorityQueue<MyWorkRequest>'.
//
// In this example, however, we want to pass an additional data item.  Each
// worker thread is initialized with a unique integer value ("worker Id"),
// which identifies that thread.  We therefore create a simple 'struct' that
// contains both of these values:
//..
//  struct MyWorkerData {
//      int                               d_workerId;
//      bdlcc::MultipriorityQueue<MyEvent> *d_queue;
//  };
//..
// Function 'myWorker' (below) simulates a working thread by enqueuing multiple
// 'MyEvent' events during execution.  In a realistic application, each
// 'MyEvent' structure would likely contain different textual information.  For
// the sake of simplicity, however, our loop uses a constant value for the text
// field.  Note that various priorities are generated to illustrate the
// multi-priority aspect of this particular queue:
//..
//  void myWorker(int workerId, bdlcc::MultipriorityQueue<MyEvent> *queue)
//  {
//      const int N = queue->numPriorities();
//      const int NUM_EVENTS = 5;
//      int eventNumber;    // used also to generate mixed priorities
//
//      // First push 'NUM_EVENTS' events onto 'queue' with mixed priorities.
//
//      for (eventNumber = 0; eventNumber < NUM_EVENTS; ++eventNumber) {
//          MyEvent ev = {
//              MyEvent::e_IN_PROGRESS,
//              workerId,
//              eventNumber,
//              "In-Progress Event"         // constant (for simplicity)
//          };
//          queue->pushBack(ev, eventNumber % N);       // mixed priorities
//      }
//
//      // Now push an event to end this task.
//
//      MyEvent ev = {
//          MyEvent::e_TASK_COMPLETE,
//          workerId,
//          eventNumber,
//          "Task Complete"
//      };
//      queue->pushBack(ev, N - 1);                     // lowest priority
//  }
//..
// The callback function 'myWorkerThread' (below) invoked by
// 'bslmt::ThreadUtil::create' takes the traditional 'void' pointer.  The
// expected data is the composite structure 'MyWorkerData'.  The callback
// function casts the 'void' pointer to the application-specific data type and
// then uses the referenced object to construct a call to the 'myWorker'
// function:
//..
//  extern "C" void *myWorkerThread(void *vWorkerPtr)
//  {
//      MyWorkerData *workerPtr = (MyWorkerData *)vWorkerPtr;
//      myWorker(workerPtr->d_workerId, workerPtr->d_queue);
//      return vWorkerPtr;
//  }
//..
// For the sake of simplicity, we will implement the Observer behavior (below)
// in the main thread.  The 'void' function 'myObserver' starts multiple
// threads running the 'myWorker' function, reads 'MyEvent' values from the
// queue, and logs all messages in the order of arrival.
//
// As each 'myWorker' thread terminates, it sends a 'e_TASK_COMPLETE' event.
// Upon receiving this event, the 'myObserver' function uses the 'd_workerId'
// to find the relevant thread, and then "joins" that thread.
//
// The 'myObserver' function determines when all tasks have completed simply by
// counting the number of 'e_TASK_COMPLETE' messages received:
//..
//  void myObserver()
//  {
//      const int k_NUM_THREADS    = 10;
//      const int k_NUM_PRIORITIES = 4;
//
//      bdlcc::MultipriorityQueue<MyEvent> queue(k_NUM_PRIORITIES);
//
//      assert(0 < k_NUM_THREADS
//          && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
//      bslmt::ThreadUtil::Handle workerHandles[k_MAX_CONSUMER_THREADS];
//
//      // Create 'k_NUM_THREADS' threads, each having a unique "worker id".
//
//      MyWorkerData workerData[k_NUM_THREADS];
//      for (int i = 0; i < k_NUM_THREADS; ++i) {
//          workerData[i].d_queue = &queue;
//          workerData[i].d_workerId = i;
//          bslmt::ThreadUtil::create(&workerHandles[i],
//                                   myWorkerThread,
//                                   &workerData[i]);
//      }
//
//      // Now print out each of the 'MyEvent' values as the threads complete.
//      // This function ends after a total of 'k_NUM_THREADS'
//      // 'MyEvent::e_TASK_COMPLETE' events have been printed.
//
//      int nStop = 0;
//      while (nStop < k_NUM_THREADS) {
//          MyEvent ev;
//          queue.popFront(&ev);
//          bsl::cout << "[" << ev.d_workerId << "] "
//                    << ev.d_eventNumber << ". "
//                    << ev.d_eventText << bsl::endl;
//          if (MyEvent::e_TASK_COMPLETE == ev.d_type) {
//              ++nStop;
//              bslmt::ThreadUtil::join(workerHandles[ev.d_workerId]);
//          }
//      }
//  }
//..

#include <bdlscm_version.h>

#include <bdlb_bitutil.h>

#include <bdlma_concurrentpool.h>

#include <bslalg_constructorproxy.h>

#include <bslma_allocator.h>
#include <bslma_deallocatorproctor.h>
#include <bslma_default.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>

#include <bslmf_movableref.h>
#include <bslmf_nestedtraitdeclaration.h>

#include <bslmt_condition.h>
#include <bslmt_lockguard.h>
#include <bslmt_mutex.h>
#include <bslmt_threadutil.h>

#include <bsls_assert.h>
#include <bsls_atomic.h>

#include <bsl_climits.h>
#include <bsl_cstdint.h>
#include <bsl_new.h>
#include <bsl_vector.h>

#ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
#include <bslalg_typetraits.h>
#endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES

namespace BloombergLP {
namespace bdlcc {

                 // =========================================
                 // local class MultipriorityQueue_Node<TYPE>
                 // =========================================

template <class TYPE>
class MultipriorityQueue_Node {
    // This class handles storage of one item of parameterized 'TYPE' as a node
    // in a linked list of items stored in a multipriority queue for a given
    // priority.  This class is not to be used from outside this component.

    // DATA
    bslalg::ConstructorProxy<TYPE>  d_item;    // object stored in node
    MultipriorityQueue_Node<TYPE>  *d_next_p;  // next node on linked list

  private:
    // NOT IMPLEMENTED
    MultipriorityQueue_Node(const MultipriorityQueue_Node&);
    MultipriorityQueue_Node& operator=(const MultipriorityQueue_Node&);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(MultipriorityQueue_Node,
                                   bslma::UsesBslmaAllocator);

    // CREATORS
    MultipriorityQueue_Node(const TYPE&              item,
                            bslma::Allocator        *basicAllocator);
        // Create a node containing a copy of the specified 'item' and having
        // the specified 'next' pointer.  Use the specified 'basicAllocator' to
        // supply memory.  The behavior is undefined unless 'basicAllocator' is
        // non-null.  Note that 'item' must be copyable and assignable.

    MultipriorityQueue_Node(bslmf::MovableRef<TYPE>  item,
                            bslma::Allocator        *basicAllocator);
        // Create a node containing the value of the specified 'item' and
        // having the specified 'next' pointer.  'item' is left in a valid but
        // unspecified state.  Use the specified 'basicAllocator' to supply
        // memory.  The behavior is undefined unless 'basicAllocator' is
        // non-null.

    ~MultipriorityQueue_Node();
        // Destroy this node and free all memory that was allocated on its
        // behalf, if any.

    // MANIPULATORS
    TYPE& item();
        // Return a reference to the non-modifiable item stored in this node.

    MultipriorityQueue_Node*& nextPtr();
        // Return a reference to the modifiable pointer to the node following
        // this node on the linked list.

    // ACCESSORS
    const MultipriorityQueue_Node *nextPtr() const;
        // Return a pointer to the non-modifiable node following this node on
        // the linked list, or 0 if this node has no successor.
};

                       // ==============================
                       // class MultipriorityQueue<TYPE>
                       // ==============================

template <class TYPE>
class MultipriorityQueue {
    // This class implements a thread-enabled multipriority queue whose
    // priorities are restricted to a (small) set of contiguous 'N' integer
    // values, '[ 0 .. N - 1 ]', with 0 being the most urgent.
    //
    // This class does have a notion of value, namely the sequence of
    // priority/element pairs, constrained to be in decreasing order of urgency
    // (i.e., monotonically increasing priority values).  However, no
    // value-semantic operations are implemented.  Note that elements having
    // the same priority are maintained in First-In-First-Out (FIFO) order.
    // Note that the current implementation supports up to a maximum of
    // 'sizeof(int) * CHAR_BIT' priorities.
    //
    // This class is implemented as a set of linked lists, one for each
    // priority.  Two vectors are used to maintain head and tail pointers for
    // the lists.

    // PRIVATE CONSTANTS
    enum {
        k_BITS_PER_INT           = sizeof(int) * CHAR_BIT,
        k_DEFAULT_NUM_PRIORITIES = k_BITS_PER_INT,
        k_MAX_NUM_PRIORITIES     = k_BITS_PER_INT
    };

    // PRIVATE TYPES
    typedef MultipriorityQueue_Node<TYPE> Node;
        // The type of the elements on the linked lists of items that are
        // maintained for the 'N' priorities handled by this multipriority
        // queue.

    typedef bsl::vector<Node *>                NodePtrVector;
        // The type of the vectors of list head and tail pointers.

    // DATA
    mutable bslmt::Mutex  d_mutex;         // used to synchronize access
                                           // (including 'const' access)

    bslmt::Condition      d_notEmptyCondition;
                                           // signaled on each push

    NodePtrVector         d_heads;         // pointers to heads of linked lists
                                           // -- one for each priority

    NodePtrVector         d_tails;         // pointers to tails of linked lists
                                           // -- one for each priority

    int                   d_notEmptyFlags; // bit mask indicating priorities
                                           // for which there is data, where
                                           // bit 0 is the lowest order bit,
                                           // representing most urgent priority

    bdlma::ConcurrentPool d_pool;          // memory pool used for node storage

    bsls::AtomicInt       d_length;        // total number of items in this
                                           // multipriority queue

    bool                  d_enabledFlag;   // enabled/disabled state of pushes
                                           // to the multipriority queue (does
                                           // not affect pops)

    bslma::Allocator     *d_allocator_p;   // memory allocator (held)

  private:
    // NOT IMPLEMENTED
    MultipriorityQueue(const MultipriorityQueue&);
    MultipriorityQueue& operator=(const MultipriorityQueue&);

  private:
    // PRIVATE MANIPULATORS
    int tryPopFrontImpl(TYPE *item, int *itemPriority, bool blockFlag);
        // Attempt to remove (immediately) the least-recently added item having
        // the most urgent priority (lowest value) from this multipriority
        // queue.  If the specified 'blockFlag' is 'true', this method blocks
        // the calling thread until an item becomes available.  On success,
        // load the value of the popped item into the specified 'item'; if the
        // specified 'itemPriority' is non-null, load the priority of the
        // popped item into 'itemPriority'; and return 0.  Otherwise, leave
        // 'item' and 'itemPriority' unmodified, and return a non-zero value
        // indicating that this multipriority queue was empty.  The behavior is
        // undefined unless 'item' is non-null.  Note that a non-zero value can
        // be returned only if 'blockFlag' is 'false'.

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(MultipriorityQueue,
                                   bslma::UsesBslmaAllocator);

    // CREATORS
    explicit MultipriorityQueue(bslma::Allocator *basicAllocator = 0);
    explicit MultipriorityQueue(int               numPriorities,
                                bslma::Allocator *basicAllocator = 0);
        // Create a multipriority queue.  Optionally specify 'numPriorities',
        // the number of distinct priorities supported by the multipriority
        // queue.  If 'numPriorities' is not specified, the
        // (implementation-imposed maximum) number 32 is used.  Optionally
        // specify a 'basicAllocator' used to supply memory.  If
        // 'basicAllocator' is 0, the currently installed default allocator is
        // used.  The behavior is undefined unless '1 <= numPriorities <= 32'
        // (if specified).

    ~MultipriorityQueue();
        // Destroy this container.  The behavior is undefined unless all access
        // or modification of the container has completed prior to this call.

    // MANIPULATORS
    void popFront(TYPE *item, int *itemPriority = 0);
        // Remove the least-recently added item having the most urgent priority
        // (lowest value) from this multi-priority queue and load its value
        // into the specified 'item'.  If this queue is empty, this method
        // blocks the calling thread until an item becomes available.  If the
        // optionally specified 'itemPriority' is non-null, load the priority
        // of the popped item into 'itemPriority'.  The behavior is undefined
        // unless 'item' is non-null.  Note this is unaffected by the enabled /
        // disabled state of the queue.

    int pushBack(const TYPE& item, int itemPriority);
        // Insert the value of the specified 'item' with the specified
        // 'itemPriority' into this multipriority queue before any queued items
        // having a less urgent priority (higher value) than 'itemPriority',
        // and after any items having the same or more urgent priority (lower
        // value) than 'itemPriority'.  If the multipriority queue is enabled,
        // the push succeeds and '0' is returned, otherwise the push fails, the
        // queue remains unchanged, and a nonzero value is returned.  The
        // behavior is undefined unless '0 <= itemPriority < numPriorities()'.

    int pushBack(bslmf::MovableRef<TYPE> item, int itemPriority);
        // Insert the value of the specified 'item' with the specified
        // 'itemPriority' into this multipriority queue before any queued items
        // having a less urgent priority (higher value) than 'itemPriority',
        // and after any items having the same or more urgent priority (lower
        // value) than 'itemPriority'.  'item' is left in a valid but
        // unspecified state.  If the multipriority queue is enabled, the push
        // succeeds and '0' is returned, otherwise the push fails, the queue
        // remains unchanged, and a nonzero value is returned.  The behavior is
        // undefined unless '0 <= itemPriority < numPriorities()'.

    void pushBackMultipleRaw(const TYPE& item, int itemPriority, int numItems);
        // Insert the value of the specified 'item' with the specified
        // 'itemPriority' onto the back of this multipriority queue before any
        // queued items having a less urgent priority (higher value) than
        // 'itemPriority', and after any items having the same or more urgent
        // priority (lower value) than 'itemPriority'.  All of the specified
        // 'numItems' items are pushed as a single atomic action, unless the
        // copy constructor for one of them throws an exception, in which case
        // a possibly empty subset of the pushes will have completed and no
        // memory will be leaked.  'Raw' means that the push will succeed even
        // if the multipriority queue is disabled.  Note that this method is
        // targeted for specific use by the class
        // 'bdlmt::MultipriorityThreadPool'.  The behavior is undefined unless
        // '0 <= itemPriority < numPriorities()'.

    void pushFrontMultipleRaw(const TYPE& item,
                              int         itemPriority,
                              int         numItems);
        // Insert the value of the specified 'item' with the specified
        // 'itemPriority' into the front of this multipriority queue the
        // specified 'numItems' times, before any queued items having the same
        // or less urgent priority (higher value) than 'itemPriority', and
        // after any items having more urgent priority (lower value) than
        // 'itemPriority'.  All 'numItems' items are pushed as a single atomic
        // action, unless the copy constructor throws while creating one of
        // them, in which case a possibly empty subset of the pushes will have
        // completed and no memory will be leaked.  'Raw' means that the push
        // will succeed even if the multipriority queue is disabled.  The
        // behavior is undefined unless '0 <= itemPriority < numPriorities()'.
        // Note that this method is targeted at specific uses by the class
        // 'bdlmt::MultipriorityThreadPool'.

    int tryPopFront(TYPE *item, int *itemPriority = 0);
        // Attempt to remove (immediately) the least-recently added item having
        // the most urgent priority (lowest value) from this multi-priority
        // queue.  On success, load the value of the popped item into the
        // specified 'item'; if the optionally specified 'itemPriority' is
        // non-null, load the priority of the popped item into 'itemPriority';
        // and return 0.  Otherwise, leave 'item' and 'itemPriority'
        // unmodified, and return a non-zero value indicating that this queue
        // was empty.  The behavior is undefined unless 'item' is non-null.
        // Note this is unaffected by the enabled / disabled state of the
        // queue.

    void removeAll();
        // Remove and destroy all items from this multi-priority queue.

    void enable();
        // Enable pushes to this multipriority queue.  This method has no
        // effect unless the queue was disabled.

    void disable();
        // Disable pushes to this multipriority queue.  This method has no
        // effect unless the queue was enabled.

    // ACCESSORS
    int numPriorities() const;
        // Return the number of distinct priorities (indicated at construction)
        // that are supported by this multi-priority queue.

    int length() const;
        // Return the total number of items in this multi-priority queue.

    bool isEmpty() const;
        // Return 'true' if there are no items in this multi-priority queue,
        // and 'false' otherwise.

    bool isEnabled() const;
        // Return 'true' if this multipriority queue is enable and 'false'
        // otherwise.
};

// ============================================================================
//                            INLINE DEFINITIONS
// ============================================================================

                 // -----------------------------------------
                 // local class MultipriorityQueue_Node<TYPE>
                 // -----------------------------------------

// CREATORS
template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>::MultipriorityQueue_Node(
                                              const TYPE&       item,
                                              bslma::Allocator *basicAllocator)
: d_item(item, basicAllocator)
, d_next_p(0)
{}

template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>::MultipriorityQueue_Node(
                                       bslmf::MovableRef<TYPE>  item,
                                       bslma::Allocator        *basicAllocator)
: d_item(bslmf::MovableRefUtil::move(item), basicAllocator)
, d_next_p(0)
{}

template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>::~MultipriorityQueue_Node()
{}

// MANIPULATORS
template <class TYPE>
inline
TYPE& MultipriorityQueue_Node<TYPE>::item()
{
    return d_item.object();
}

template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>*& MultipriorityQueue_Node<TYPE>::nextPtr()
{
    return d_next_p;
}

// ACCESSORS
template <class TYPE>
inline
const MultipriorityQueue_Node<TYPE> *
                            MultipriorityQueue_Node<TYPE>::nextPtr() const
{
    return d_next_p;
}

                       // ------------------------------
                       // class MultipriorityQueue<TYPE>
                       // ------------------------------

// PRIVATE MANIPULATORS
template <class TYPE>
int MultipriorityQueue<TYPE>::tryPopFrontImpl(TYPE *item,
                                              int  *itemPriority,
                                              bool  blockFlag)
{
    enum { e_SUCCESS = 0, e_FAILURE = -1 };

    Node *condemned;
    int priority;

    BSLS_ASSERT(item);

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        while (0 == d_length) {
            // Note that if we get a spurious signal, we will check the
            // 'blockFlag' unnecessarily, but that will typically be a rare
            // occurrence.  This arrangement minimizes the time taken in the
            // case where '0 != d_length', which will typically be a frequent
            // occurrence.

            if (blockFlag) {
                d_notEmptyCondition.wait(&d_mutex);
            }
            else {
                return e_FAILURE;                                     // RETURN
            }
        }

        priority = bdlb::BitUtil::numTrailingUnsetBits(
                                               (bsl::uint32_t)d_notEmptyFlags);
        BSLS_ASSERT(priority < k_MAX_NUM_PRIORITIES);
            // verifies there is at least one priority bit set.  Note that
            // 'numTrailingUnsetBits' cannot return a negative value.

        Node *& head = d_heads[priority];
        condemned = head;

        *item = bslmf::MovableRefUtil::move(condemned->item());  // might throw

        head = head->nextPtr();
        if (0 == head) {
            // The last item with this priority was just popped.

            BSLS_ASSERT(d_tails[priority] == condemned);
            d_notEmptyFlags &= ~(1 << priority);
        }

        --d_length;
    }

    if (itemPriority) {
        *itemPriority = priority;
    }

    condemned->~Node();
    d_pool.deallocate(condemned);

    return e_SUCCESS;
}

// CREATORS
template <class TYPE>
MultipriorityQueue<TYPE>::MultipriorityQueue(bslma::Allocator *basicAllocator)
: d_heads((typename NodePtrVector::size_type)k_DEFAULT_NUM_PRIORITIES, 0,
          basicAllocator)
, d_tails((typename NodePtrVector::size_type)k_DEFAULT_NUM_PRIORITIES, 0,
          basicAllocator)
, d_notEmptyFlags(0)
, d_pool(sizeof(Node), bslma::Default::allocator(basicAllocator))
, d_length(0)
, d_enabledFlag(true)
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
}

template <class TYPE>
MultipriorityQueue<TYPE>::MultipriorityQueue(int               numPriorities,
                                             bslma::Allocator *basicAllocator)
: d_heads((typename NodePtrVector::size_type)numPriorities, 0, basicAllocator)
, d_tails((typename NodePtrVector::size_type)numPriorities, 0, basicAllocator)
, d_notEmptyFlags(0)
, d_pool(sizeof(Node), bslma::Default::allocator(basicAllocator))
, d_length(0)
, d_enabledFlag(true)
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
    BSLS_ASSERT(1                       <= numPriorities);
    BSLS_ASSERT(k_MAX_NUM_PRIORITIES >= numPriorities);
}

template <class TYPE>
MultipriorityQueue<TYPE>::~MultipriorityQueue()
{
    removeAll();

    typename NodePtrVector::iterator it;
    typename NodePtrVector::iterator endIt;

    for (it = d_heads.begin(), endIt = d_heads.end(); endIt != it; ++it) {
        BSLS_ASSERT(!*it);
    }

    // Tails do not get set to null by 'removeAll', so are indeterminate.

    BSLS_ASSERT(isEmpty());
    BSLS_ASSERT(0 == d_notEmptyFlags);
}

// MANIPULATORS
template <class TYPE>
inline
void MultipriorityQueue<TYPE>::popFront(TYPE *item, int *itemPriority)
{
    tryPopFrontImpl(item, itemPriority, true);
}

template <class TYPE>
int MultipriorityQueue<TYPE>::pushBack(const TYPE& item, int itemPriority)
{
    enum { e_SUCCESS = 0, e_FAILURE = -1 };

    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    // Allocate and copy construct.  Note we are doing this work outside the
    // mutex, which is advantageous in that no one is waiting on us, but it has
    // the disadvantage that we haven't checked whether this multipriority
    // queue is disabled, in which case we'll throw the new node away.

    // Note the queue being disabled is not the usual case.  Note a race
    // condition occurs if we check d_enabledFlag outside the mutex.

    Node *newNode = (Node *)d_pool.allocate();
    bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(newNode,
                                                                 &d_pool);

    ::new (newNode) Node(item, d_allocator_p);                   // might throw
    deallocator.release();
    bslma::ManagedPtr<Node> deleter(newNode, &d_pool);

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        if (!d_enabledFlag) {
            return e_FAILURE;                                         // RETURN
        }

        deleter.release();

        const int mask = 1 << itemPriority;
        if (d_notEmptyFlags & mask) {
            d_tails[itemPriority]->nextPtr() = newNode;
        }
        else {
            d_heads[itemPriority] = newNode;
            d_notEmptyFlags |= mask;
        }
        d_tails[itemPriority] = newNode;

        ++d_length;
    }

    d_notEmptyCondition.signal();

    return e_SUCCESS;
}

template <class TYPE>
int MultipriorityQueue<TYPE>::pushBack(bslmf::MovableRef<TYPE> item,
                                       int                     itemPriority)
{
    enum { e_SUCCESS = 0, e_FAILURE = -1 };

    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    // Allocate and copy construct.  Note we are doing this work outside the
    // mutex, which is advantageous in that no one is waiting on us, but it has
    // the disadvantage that we haven't checked whether this multipriority
    // queue is disabled, in which case we'll throw the new node away.
    //
    // Note the queue being disabled is not the usual case.  Note a race
    // condition occurs if we check d_enabledFlag outside the mutex.

    Node *newNode = static_cast<Node *>(d_pool.allocate());
    bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(newNode,
                                                                 &d_pool);

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        // Do the enable check before the move, since if it is a move and not a
        // copy, there's no backing out after that.

        if (!d_enabledFlag) {
            return e_FAILURE;                                         // RETURN
        }

        ::new (newNode) Node(bslmf::MovableRefUtil::move(item),  // might throw
                             d_allocator_p);
        deallocator.release();

        const int mask = 1 << itemPriority;
        if (d_notEmptyFlags & mask) {
            d_tails[itemPriority]->nextPtr() = newNode;
        }
        else {
            d_heads[itemPriority] = newNode;
            d_notEmptyFlags |= mask;
        }
        d_tails[itemPriority] = newNode;

        ++d_length;
    }

    d_notEmptyCondition.signal();

    return e_SUCCESS;
}

template <class TYPE>
void MultipriorityQueue<TYPE>::pushBackMultipleRaw(const TYPE& item,
                                                   int         itemPriority,
                                                   int         numItems)
{
    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    const int mask = 1 << itemPriority;

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        for (int ii = 0; ii < numItems; ++ii) {
            Node *newNode = (Node *)d_pool.allocate();
            bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(
                                                             newNode, &d_pool);

            ::new (newNode) Node(item, d_allocator_p);           // might throw
            deallocator.release();

            if (d_notEmptyFlags & mask) {
                d_tails[itemPriority]->nextPtr() = newNode;
            }
            else {
                d_heads[itemPriority] = newNode;
                d_notEmptyFlags |= mask;
            }
            d_tails[itemPriority] = newNode;

            ++d_length;
        }
    }

    for (int ii = 0; ii < numItems; ++ii) {
        d_notEmptyCondition.signal();
    }
}

template <class TYPE>
void MultipriorityQueue<TYPE>::pushFrontMultipleRaw(const TYPE& item,
                                                    int         itemPriority,
                                                    int         numItems)
{
    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    const int mask = 1 << itemPriority;

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        for (int ii = 0; ii < numItems; ++ii) {
            Node *newNode = (Node *)d_pool.allocate();
            bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(
                                                             newNode, &d_pool);

            ::new (newNode) Node(item, d_allocator_p);           // might throw
            deallocator.release();

            Node *& head = d_heads[itemPriority];
            if (!head) {
                d_tails[itemPriority] = newNode;
                d_notEmptyFlags |= mask;
            }
            newNode->nextPtr() = head;
            head = newNode;

            ++d_length;
        }
    }

    for (int ii = 0; ii < numItems; ++ii) {
        d_notEmptyCondition.signal();
    }
}

template <class TYPE>
inline
int MultipriorityQueue<TYPE>::tryPopFront(TYPE *item, int *itemPriority)
{
    return tryPopFrontImpl(item, itemPriority, false);
}

template <class TYPE>
void MultipriorityQueue<TYPE>::removeAll()
{
    Node *condemnedList = 0;

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        while (d_notEmptyFlags) {
            const int priority = bdlb::BitUtil::numTrailingUnsetBits(
                                  static_cast<bsl::uint32_t>(d_notEmptyFlags));

            Node *& head = d_heads[priority];
            BSLS_ASSERT(head);

            d_tails[priority]->nextPtr() = condemnedList;
            condemnedList = head;

            head = 0;

            d_notEmptyFlags &= ~(1 << priority);
        }

        BSLS_ASSERT(0 == d_notEmptyFlags);

        d_length = 0;
    }

    Node *node = condemnedList;
    while (node) {
        Node *condemnedNode = node;
        node = node->nextPtr();

        condemnedNode->~Node();
        d_pool.deallocate(condemnedNode);
    }
}

template <class TYPE>
inline
void MultipriorityQueue<TYPE>::enable()
{
    bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

    d_enabledFlag = true;
}

template <class TYPE>
inline
void MultipriorityQueue<TYPE>::disable()
{
    bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

    d_enabledFlag = false;
}

// ACCESSORS
template <class TYPE>
inline
int MultipriorityQueue<TYPE>::numPriorities() const
{
    return static_cast<int>(d_heads.size());
}

template <class TYPE>
inline
int MultipriorityQueue<TYPE>::length() const
{
    return d_length;
}

template <class TYPE>
inline
bool MultipriorityQueue<TYPE>::isEmpty() const
{
    return 0 == d_length;
}

template <class TYPE>
inline
bool MultipriorityQueue<TYPE>::isEnabled() const
{
    return d_enabledFlag;
}

}  // close package namespace
}  // close enterprise namespace

#endif

// ----------------------------------------------------------------------------
// Copyright 2015 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------