// bdlmt_multiqueuethreadpool.h                                       -*-C++-*-

#ifndef INCLUDED_BDLMT_MULTIQUEUETHREADPOOL
#define INCLUDED_BDLMT_MULTIQUEUETHREADPOOL

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a pool of queues, each processed serially by a thread pool.
//
//@CLASSES:
// bdlmt::MultiQueueThreadPool: multi-threaded, serial processing of queues
//
//@SEE_ALSO: bdlmt_threadpool
//
//@DESCRIPTION: This component defines a dynamic, configurable pool of queues,
// each of which is processed by a thread in a thread pool, such that elements
// on a given queue are processed serially, regardless of which thread is
// processing the queue at a given time.
//
// A 'bdlmt::MultiQueueThreadPool' allows clients to create and delete queues,
// and to enqueue "jobs" (represented as client-specified functors) to specific
// queues.  Queue processing is implemented on top of a 'bdlmt::ThreadPool' by
// enqueuing a per-queue functor to the thread pool.  Each functor dequeues the
// next item from its associated queue, processes it, and re-enqueues itself to
// the thread pool.  Since there is at most one representative functor per
// queue, each queue is guaranteed to be processed serially by the thread pool.
//
// In addition to the ability to create, delete, pause, and resume queues,
// clients are able to tune the underlying thread pool in accordance with the
// 'bdlmt::ThreadPool' documentation.
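//
// For example, the following minimal sketch (in which 'myJobFunction' is a
// hypothetical client-supplied function and the thread-pool parameters are
// arbitrary) creates a pool, enqueues a job to a newly created queue, and
// then drains and stops the pool:
//..
//  void myJobFunction();
//      // A client-supplied job; any functor convertible to
//      // 'bsl::function<void()>' may be enqueued.
//
//  void example(bslma::Allocator *allocator)
//  {
//      bslmt::ThreadAttributes     attributes;
//      bdlmt::MultiQueueThreadPool pool(attributes, 2, 4, 30000, allocator);
//
//      pool.start();                      // enable enqueuing and processing
//
//      int queueId = pool.createQueue();  // returns a non-zero queue id
//      pool.enqueueJob(queueId, &myJobFunction);
//
//      pool.drain();                      // wait until all queues are empty
//      pool.stop();                       // disable enqueuing and stop
//  }
//..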
//
///Disabled Queues
///---------------
// 'bdlmt::MultiQueueThreadPool' allows clients to disable and re-enable
// queues.  A disabled queue will allow no further jobs to be enqueued, but
// will continue to process the jobs that were enqueued prior to the call to
// 'disableQueue'.  Note that calling 'disableQueue' does not block the
// calling thread.
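//
// For illustration, a minimal sketch (assuming 'pool' is a started
// 'bdlmt::MultiQueueThreadPool', 'queueId' identifies one of its queues, and
// 'job' is a 'bsl::function<void()>' functor):
//..
//  int rc = pool.disableQueue(queueId);  // no new jobs will be accepted
//  assert(0 == rc);
//  assert(false == pool.isEnabled(queueId));
//
//  rc = pool.enqueueJob(queueId, job);   // fails: enqueuing is disabled
//  assert(0 != rc);
//
//  rc = pool.enableQueue(queueId);       // accept new jobs again
//  assert(0 == rc);
//..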
//
///Paused Queues
///-------------
// 'bdlmt::MultiQueueThreadPool' also allows clients to pause and resume
// queues.  Pausing a queue suspends the processing of jobs from a queue --
// i.e., after 'pauseQueue' returns no further jobs will be processed on that
// queue until the queue is resumed.  Note that (unlike 'disableQueue')
// calling 'pauseQueue' will block the calling thread until the currently
// executing job (if any) on that queue completes.
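//
// Again for illustration, a minimal sketch (assuming 'pool', 'queueId', and
// 'job' as above):
//..
//  int rc = pool.pauseQueue(queueId);   // blocks until the currently
//  assert(0 == rc);                     // executing job (if any) completes
//  assert(pool.isPaused(queueId));
//
//  rc = pool.enqueueJob(queueId, job);  // still accepted, but not processed
//  assert(0 == rc);                     // until the queue is resumed
//
//  rc = pool.resumeQueue(queueId);      // processing resumes
//  assert(0 == rc);
//..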
//
///Thread Safety
///-------------
// The 'bdlmt::MultiQueueThreadPool' class is *fully thread-safe* (i.e., all
// public methods of a particular instance may safely execute concurrently).
// This class is also *thread-enabled* (i.e., the class does not function
// correctly in a non-multi-threading environment).  See 'bsldoc_glossary' for
// complete definitions of *fully thread-safe* and *thread-enabled*.
//
///Job Execution Batch Size
///------------------------
// 'bdlmt::MultiQueueThreadPool' allows clients to configure the maximum size
// of a group of jobs that a queue will execute "atomically".  "Atomically", in
// this context, means that no state changes to the queue will be observed by
// that queue during the processing of the collection of jobs (e.g., a call to
// 'pause' will only pause the queue after the currently executing group of
// jobs completes execution).  By default a queue's batch size is 1.
// Configuring a larger batch size may improve throughput by reducing the
// synchronization overhead needed to execute a job.  However, for many
// use-cases the overall throughput is limited by the time it takes to process
// a job (rather than synchronization overhead), so users are strongly
// encouraged to use benchmarks to guide their decision when setting this
// option.
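//
// For example, a minimal sketch (assuming 'pool' and 'queueId' as above; the
// batch size of 8 is arbitrary and chosen only for illustration):
//..
//  int rc = pool.setBatchSize(queueId, 8);  // execute up to 8 jobs per batch
//  assert(0 == rc);
//  assert(8 == pool.batchSize(queueId));
//..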
//
///Thread Names for Sub-Threads
///----------------------------
// To facilitate debugging, users can provide a thread name as the 'threadName'
// attribute of the 'bslmt::ThreadAttributes' argument passed to the
// constructor, which will be used for all the sub-threads.  The thread name
// should not be used programmatically, but will appear in debugging tools on
// platforms that support naming threads to help users identify the source and
// purpose of a thread.  If no 'ThreadAttributes' object is passed, or if the
// 'threadName' attribute is not set, the default value "bdl.MultiQuePl" will
// be used.  Note that this only applies to a 'bdlmt::ThreadPool' automatically
// created by a 'bdlmt::MultiQueueThreadPool'.  If a thread pool already exists
// and is passed to the multi-queue thread pool at construction, the
// sub-threads will be named whatever was specified when that thread pool was
// created.
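//
// For example, a minimal sketch (the name "myGroup.pool" and the thread-pool
// parameters are arbitrary and chosen only for illustration):
//..
//  bslmt::ThreadAttributes attributes;
//  attributes.setThreadName("myGroup.pool");
//
//  bdlmt::MultiQueueThreadPool pool(attributes, 4, 16, 30000);
//..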
//
///Usage
///-----
// This section illustrates intended use of this component.
//
///Example 1: A Word Search Application
/// - - - - - - - - - - - - - - - - - -
// This example illustrates the use of a 'bdlmt::MultiQueueThreadPool' in a
// word search application called 'fastSearch'.  'fastSearch' searches a list
// of files for a list of words, and returns the set of files which contain all
// of the specified words.  'bdlmt::MultiQueueThreadPool' is used to provide
// concurrent processing of files, and to simplify the collection of results by
// serializing access to result sets which are maintained for each word.
//
// First, we present a class used to manage a word, and the set of files which
// contain that word:
//..
//  class my_SearchProfile {
//      // This class defines a search profile consisting of a word and a set
//      // of files (given by name) that contain the word.  Here, "word" is
//      // defined as any string of characters.
//
//      bsl::string           d_word;     // word to search for
//      bsl::set<bsl::string> d_fileSet;  // set of matching files
//
//    private:
//      // not implemented
//      my_SearchProfile(const my_SearchProfile&);
//      my_SearchProfile& operator=(const my_SearchProfile&);
//
//    public:
//      // CREATORS
//      my_SearchProfile(const char       *word,
//                       bslma::Allocator *basicAllocator = 0);
//          // Create a 'my_SearchProfile' with the specified 'word'.
//          // Optionally specify a 'basicAllocator' used to supply memory.  If
//          // 'basicAllocator' is 0, the default memory allocator is used.
//
//      ~my_SearchProfile();
//          // Destroy this search profile.
//
//      // MANIPULATORS
//      void insert(const char *file);
//          // Insert the specified 'file' into the file set maintained by this
//          // search profile.
//
//      // ACCESSORS
//      bool isMatch(const char *file) const;
//          // Return 'true' if the specified 'file' matches this search
//          // profile.
//
//      const bsl::set<bsl::string>& fileSet() const;
//          // Return a reference to the non-modifiable file set maintained by
//          // this search profile.
//
//      const bsl::string& word() const;
//          // Return a reference to the non-modifiable word maintained by this
//          // search profile.
//  };
//..
// And the implementation:
//..
//  // CREATORS
//  my_SearchProfile::my_SearchProfile(const char       *word,
//                                     bslma::Allocator *basicAllocator)
//  : d_word(basicAllocator)
//  , d_fileSet(bsl::less<bsl::string>(), basicAllocator)
//  {
//      assert(word);
//
//      d_word.assign(word);
//  }
//
//  my_SearchProfile::~my_SearchProfile()
//  {
//  }
//
//  // MANIPULATORS
//  inline
//  void my_SearchProfile::insert(const char *file)
//  {
//      assert(file);
//
//      d_fileSet.insert(file);
//  }
//
//  // ACCESSORS
//  bool my_SearchProfile::isMatch(const char *file) const
//  {
//      assert(file);
//
//      bool          found = false;
//      bsl::ifstream ifs(file);
//      bsl::string   line;
//      while (bsl::getline(ifs, line)) {
//          if (bsl::string::npos != line.find(d_word)) {
//              found = true;
//              break;
//          }
//      }
//      ifs.close();
//      return found;
//  }
//
//  inline
//  const bsl::set<bsl::string>& my_SearchProfile::fileSet() const
//  {
//      return d_fileSet;
//  }
//
//  inline
//  const bsl::string& my_SearchProfile::word() const
//  {
//      return d_word;
//  }
//..
// Next, we define a helper function to perform a search of a word in a
// particular file.  The function is parameterized by a search profile and a
// file name.  If the specified file name matches the profile, it is inserted
// into the profile's file list.
//..
//  void my_SearchCb(my_SearchProfile* profile, const char *file)
//  {
//      // Insert the specified 'file' to the file set of the specified search
//      // 'profile' if 'file' matches the 'profile'.
//
//      assert(profile);
//      assert(file);
//
//      if (profile->isMatch(file)) {
//          profile->insert(file);
//      }
//  }
//..
// Lastly, we present the front end to the search application: 'fastSearch'.
// 'fastSearch' is parameterized by a list of words to search for, a list of
// files to search in, and a set which is populated with the search results.
// 'fastSearch' instantiates a 'bdlmt::MultiQueueThreadPool', and creates a
// queue for each word.  It then associates each queue with a search profile
// based on a word in the word list.  Then, it enqueues a job to each queue for
// each file in the file list that tries to match the file to each search
// profile.  Lastly, 'fastSearch' collects the results, which is the set
// intersection of each file set maintained by the individual search profiles.
//..
//  void fastSearch(const bsl::vector<bsl::string>&  wordList,
//                  const bsl::vector<bsl::string>&  fileList,
//                  bsl::set<bsl::string>&           resultSet,
//                  int                              repetitions = 1,
//                  bslma::Allocator                *basicAllocator = 0)
//  {
//      // Return the set of files, specified by 'fileList', containing every
//      // word in the specified 'wordList', in the specified 'resultSet'.
//      // Optionally specify 'repetitions', the number of repetitions to run
//      // the search jobs (it is used to increase the load for performance
//      // testing).  Optionally specify a 'basicAllocator' used to supply
//      // memory.  If 'basicAllocator' is 0, the default memory allocator is
//      // used.
//
//      typedef bsl::vector<bsl::string> ListType;
//          // This type is defined for notational convenience when iterating
//          // over 'wordList' or 'fileList'.
//
//      typedef bsl::pair<int, my_SearchProfile*> RegistryValue;
//          // This type is defined for notational convenience.  The first
//          // parameter specifies a queue ID.  The second parameter specifies
//          // an associated search profile.
//
//      typedef bsl::map<bsl::string, RegistryValue> RegistryType;
//          // This type is defined for notational convenience.  The first
//          // parameter specifies a word.  The second parameter specifies a
//          // tuple containing a queue ID, and an associated search profile
//          // containing the specified word.
//
//      enum {
//          // thread pool configuration
//          k_MIN_THREADS = 4,
//          k_MAX_THREADS = 20,
//          k_MAX_IDLE    = 100  // use a very short idle time since new jobs
//                               // arrive only at startup
//      };
//      bslmt::ThreadAttributes     defaultAttrs;
//      bdlmt::MultiQueueThreadPool pool(defaultAttrs,
//                                       k_MIN_THREADS,
//                                       k_MAX_THREADS,
//                                       k_MAX_IDLE,
//                                       basicAllocator);
//      RegistryType profileRegistry(bsl::less<bsl::string>(), basicAllocator);
//
//      // Create a queue and a search profile associated with each word in
//      // 'wordList'.
//
//      for (ListType::const_iterator it = wordList.begin();
//           it != wordList.end();
//           ++it) {
//          bslma::Allocator *allocator =
//                                   bslma::Default::allocator(basicAllocator);
//
//          const bsl::string& word = *it;
//          int                id = pool.createQueue();
//          LOOP_ASSERT(word, 0 != id);
//          my_SearchProfile *profile = new (*allocator)
//                                               my_SearchProfile(word.c_str(),
//                                                                allocator);
//
//          bslma::RawDeleterProctor<my_SearchProfile, bslma::Allocator>
//                                                 deleter(profile, allocator);
//
//          profileRegistry[word] = bsl::make_pair(id, profile);
//          deleter.release();
//      }
//
//      // Start the pool, enabling enqueuing and queue processing.
//      pool.start();
//
//      // Enqueue a job which tries to match each file in 'fileList' with each
//      // search profile.
//
//      for (ListType::const_iterator it = fileList.begin();
//           it != fileList.end();
//           ++it) {
//          for (ListType::const_iterator jt = wordList.begin();
//               jt != wordList.end();
//               ++jt) {
//              const bsl::string& file = *it;
//              const bsl::string& word = *jt;
//              RegistryValue&     rv   = profileRegistry[word];
//              Func               job;
//              makeFunc(&job, my_SearchCb, rv.second, file.c_str());
//              for (int i = 0; i < repetitions; ++i) {
//                  int rc = pool.enqueueJob(rv.first, job);
//                  LOOP_ASSERT(word, 0 == rc);
//              }
//          }
//      }
//
//      // Stop the pool, and wait while enqueued jobs are processed.
//      pool.stop();
//
//      // Construct the 'resultSet' as the intersection of file sets collected
//      // in each search profile.
//
//      resultSet.insert(fileList.begin(), fileList.end());
//      for (RegistryType::iterator it = profileRegistry.begin();
//           it != profileRegistry.end();
//           ++it) {
//          my_SearchProfile *profile = it->second.second;
//          const bsl::set<bsl::string>& fileSet = profile->fileSet();
//          bsl::set<bsl::string> tmpSet;
//          bsl::set_intersection(fileSet.begin(),
//                                fileSet.end(),
//                                resultSet.begin(),
//                                resultSet.end(),
//                                bsl::inserter(tmpSet, tmpSet.begin()));
//          resultSet = tmpSet;
//          bslma::Default::allocator(basicAllocator)->deleteObjectRaw(
//                                                                    profile);
//      }
//  }
//..

#include <bdlscm_version.h>

#include <bdlcc_objectpool.h>

#include <bdlmt_threadpool.h>

#include <bslma_allocator.h>
#include <bslma_usesbslmaallocator.h>

#include <bslmf_nestedtraitdeclaration.h>

#include <bslmt_condition.h>
#include <bslmt_latch.h>
#include <bslmt_lockguard.h>
#include <bslmt_mutex.h>
#include <bslmt_mutexassert.h>
#include <bslmt_readerwritermutex.h>
#include <bslmt_readlockguard.h>
#include <bslmt_writelockguard.h>

#include <bsls_assert.h>
#include <bsls_atomic.h>

#include <bsl_deque.h>
#include <bsl_functional.h>
#include <bsl_map.h>

namespace BloombergLP {
namespace bdlmt {

class MultiQueueThreadPool;

                     // ================================
                     // class MultiQueueThreadPool_Queue
                     // ================================

class MultiQueueThreadPool_Queue {
    // This private class provides a thread-safe, lightweight job queue.

  public:
    // PUBLIC TYPES
    typedef bsl::function<void()> Job;

  private:
    // PRIVATE TYPES
    enum EnqueueState {
        // enqueue states
        e_ENQUEUING_ENABLED,   // enqueuing is enabled
        e_ENQUEUING_DISABLED,  // enqueuing is disabled
        e_DELETING             // deleting
    };

    enum RunState {
        e_NOT_SCHEDULED,       // running but not scheduled
        e_SCHEDULED,           // running and scheduled
        e_PAUSING,             // pause requested but not completed yet
        e_PAUSED               // paused
    };

    // DATA
    MultiQueueThreadPool      *d_multiQueueThreadPool_p;
                                                 // the 'MultiQueueThreadPool'
                                                 // that owns this object

    bsl::deque<Job>            d_list;           // queue of jobs to be
                                                 // executed

    EnqueueState               d_enqueueState;   // maintains enqueue state

    RunState                   d_runState;       // maintains run state

    int                        d_batchSize;      // execution batch size

    mutable bslmt::Mutex       d_lock;           // protect queue and
                                                 // informational members

    bslmt::Condition           d_pauseCondition; // used to notify thread
                                                 // awaiting pause state

    int                        d_pauseCount;     // number of threads waiting
                                                 // for the pause to complete

    Job                        d_processingCb;   // bound processing callback
                                                 // for pool

    bslmt::ThreadUtil::Handle  d_processor;      // current worker thread, or
                                                 // ThreadUtil::invalidHandle()

    // NOT IMPLEMENTED
    MultiQueueThreadPool_Queue();
    MultiQueueThreadPool_Queue(const MultiQueueThreadPool_Queue&);
    MultiQueueThreadPool_Queue &operator=(const MultiQueueThreadPool_Queue &);

    // PRIVATE MANIPULATORS
    void setPaused();
        // Mark this queue as paused, notify any threads blocked on
        // 'd_pauseCondition', and schedule the deletion job if this queue is
        // to be deleted.  The behavior is undefined unless this queue's lock
        // is in a locked state and 'e_PAUSING == d_runState'.

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(MultiQueueThreadPool_Queue,
                                   bslma::UsesBslmaAllocator);

    // CREATORS
    explicit
    MultiQueueThreadPool_Queue(MultiQueueThreadPool *multiQueueThreadPool,
                               bslma::Allocator     *basicAllocator = 0);
        // Create a 'MultiQueueThreadPool_Queue' with an initial capacity of 0
        // and initialized to use the specified 'multiQueueThreadPool' to track
        // aggregate values (e.g., the number of active queues) and to obtain
        // the thread pool used to execute jobs that are appended to this
        // queue.  Optionally specify a 'basicAllocator' used to supply memory.
        // If 'basicAllocator' is 0, the default memory allocator is used.

    ~MultiQueueThreadPool_Queue();
        // Destroy this queue.

    // MANIPULATORS
    int enable();
        // Enable enqueuing to this queue.  Return 0 on success, and a non-zero
        // value otherwise.  This method will fail (with an error) if
        // 'prepareForDeletion' has already been called on this object.

    int disable();
        // Disable enqueuing to this queue.  Return 0 on success, and a
        // non-zero value otherwise.  This method will fail (with an error) if
        // 'prepareForDeletion' has already been called on this object.

    void drainWaitWhilePausing();
        // Block until all threads waiting for this queue to pause are
        // released.

    void executeFront();
        // Execute the 'Job' at the front of this queue, dequeue the 'Job', and
        // if the queue is not paused schedule a callback from the associated
        // thread pool.  The behavior is undefined if this queue is empty.

    bool enqueueDeletion(const Job&    cleanupFunctor   = Job(),
                         bslmt::Latch *completionSignal = 0);
        // Permanently disable enqueueing from this queue, and enqueue a job
        // that will delete this queue.  Optionally specify 'cleanupFunctor',
        // which, if supplied, will be invoked immediately prior to this
        // queue's deletion.  Optionally specify 'completionSignal', on which
        // (if the calling thread is not processing a job - or batch of jobs -
        // for this queue) to invoke 'arrive' when the queue is deleted.
        // Return 'true' if the current thread is the thread processing a job
        // (or batch of jobs), and 'false' otherwise.  Note that if
        // 'completionSignal' is supplied, a return status of 'false' typically
        // indicates that 'completionSignal->wait()' should be invoked from the
        // calling function, while a return status of 'true' indicates this is
        // an attempt to delete the queue from within a job being processed on
        // the queue (so waiting on the queue's deletion would result in a
        // dead-lock).

    int initiatePause();
        // Initiate the pausing of this queue, prevent jobs from being executed
        // on this queue (excluding the currently-executing job - or batch of
        // jobs - if there is one), and prevent the queue from being deleted.
        // Return 0 on success, and a non-zero value if the queue is already
        // paused or is being paused or deleted by another thread.  The
        // behavior is undefined unless, after a successful invocation of
        // 'initiatePause', 'waitWhilePausing' is invoked (to complete the
        // pause operation and allow the queue to, potentially, be deleted).

    int pushBack(const Job& functor);
        // Enqueue the specified 'functor' at the end of this queue.  Return 0
        // on success, and a non-zero value if enqueuing is disabled.

    int pushFront(const Job& functor);
        // Add the specified 'functor' at the front of this queue.  Return 0 on
        // success, and a non-zero value if enqueuing is disabled.

    void reset();
        // Reset this queue to its initial state.  The behavior is undefined
        // unless this queue's lock is in an unlocked state.  After this method
        // returns, the object is ready for use as though it were a new object.
        // Note that this method is not thread-safe and is used by the object
        // pool contained within '*d_multiQueueThreadPool_p'.

    int resume();
        // Allow jobs on the queue to begin executing.  Return 0 on success,
        // and a non-zero value if the queue is not paused or '!d_list.empty()'
        // and the associated thread pool fails to enqueue a job.

    void setBatchSize(int batchSize);
        // Configure this queue to process jobs in groups of the specified
        // 'batchSize' (see {'Job Execution Batch Size'}).  When a thread is
        // selecting jobs for processing, if fewer than 'batchSize' jobs are
        // available then only the available jobs will be processed in the
        // current batch.  The behavior is undefined unless '1 <= batchSize'.
        // Note that the initial value for the execution batch size is 1 for
        // all queues.

    void waitWhilePausing();
        // Wait until any currently-executing job on the queue completes and
        // the queue is paused.  Note that pausing differs from 'disable' in
        // that (1) 'pause' stops processing for a queue, and (2) does *not*
        // prevent additional jobs from being enqueued.  The behavior of this
        // method is undefined unless it is invoked after a successful
        // 'initiatePause' invocation.

    // ACCESSORS
    int batchSize() const;
        // Return an instantaneous snapshot of the execution batch size (see
        // {'Job Execution Batch Size'}).  When a thread is selecting jobs for
        // processing, if fewer than 'batchSize' jobs are available then only
        // the available jobs will be processed in the current batch.

    bool isDrained() const;
        // Report whether all jobs in this queue are finished.

    bool isEnabled() const;
        // Report whether enqueuing to this object is enabled.  This object is
        // constructed with enqueuing enabled.

    bool isPaused() const;
        // Report whether this object is paused.

    int length() const;
        // Return an instantaneous snapshot of the length of this queue.
};

                        // ==========================
                        // class MultiQueueThreadPool
                        // ==========================

class MultiQueueThreadPool {
    // This class implements a dynamic, configurable pool of queues, each of
    // which is processed serially by a thread pool.

    // FRIENDS
    friend class MultiQueueThreadPool_Queue;

    // PRIVATE TYPES
    enum State {
        // Internal running states.
        e_STATE_RUNNING,
        e_STATE_STOPPING,
        e_STATE_STOPPED
    };

  public:
    // PUBLIC TYPES
    typedef bsl::function<void()>                       Job;
    typedef bsl::function<void()>                       CleanupFunctor;
    typedef bsl::map<int, MultiQueueThreadPool_Queue *> QueueRegistry;

  private:
    // PRIVATE CLASS DATA
    static const char       s_defaultThreadName[16];  // Thread name to use
                                                      // when none is
                                                      // specified.

    // PRIVATE DATA
    bslma::Allocator *d_allocator_p;        // memory allocator (held)

    ThreadPool       *d_threadPool_p;       // threads for queue processing

    bool              d_threadPoolIsOwned;  // 'true' if thread pool is owned

    bdlcc::ObjectPool<
          MultiQueueThreadPool_Queue,
          bdlcc::ObjectPoolFunctors::DefaultCreator,
          bdlcc::ObjectPoolFunctors::Reset<MultiQueueThreadPool_Queue>
    >                 d_queuePool;          // pool of queues

    QueueRegistry     d_queueRegistry;      // registry of queues

    int               d_nextId;             // next id to provide from
                                            // 'createQueue'

    State             d_state;              // maintains internal state

    mutable bslmt::ReaderWriterMutex
                      d_lock;               // locked for write when deleting
                                            // queues or changing pool state

    bsls::AtomicInt   d_numActiveQueues;    // number of non-empty queues

    bsls::AtomicInt   d_numExecuted;        // the total number of requests
                                            // processed by this pool since the
                                            // last time this value was reset

    bsls::AtomicInt   d_numEnqueued;        // the total number of requests
                                            // enqueued into this pool since
                                            // the last time this value was
                                            // reset

    bsls::AtomicInt   d_numDeleted;         // the total number of requests
                                            // deleted from this pool since the
                                            // last time this value was reset

  private:
    // NOT IMPLEMENTED
    MultiQueueThreadPool(const MultiQueueThreadPool&);
    MultiQueueThreadPool& operator=(const MultiQueueThreadPool&);

    // PRIVATE MANIPULATORS
    void deleteQueueCb(MultiQueueThreadPool_Queue *queue,
                       const CleanupFunctor&       cleanup,
                       bslmt::Latch               *completionSignal);
        // Delete the specified 'queue', if the specified 'cleanup' is valid
        // invoke 'cleanup', if the specified 'completionSignal' is not 0, call
        // 'completionSignal->arrive'.  'completionSignal' may be 0.  Note that
        // this callback provides a mechanism for proper lifetime management of
        // the 'queue' by scheduling the deletion with the associated thread
        // pool since the 'MultiQueueThreadPool' does not know *when* to delete
        // the queue and a 'MultiQueueThreadPool_Queue' cannot delete itself at
        // the appropriate time.

    int findIfUsable(int id, MultiQueueThreadPool_Queue **queue);
        // Load into the specified '*queue' a pointer to the queue referenced
        // by the specified 'id' if this 'MultiQueueThreadPool' is in a state
        // where the 'queue' can be used.  Return 0 on success, and a non-zero
        // value if the 'id' is not contained in 'd_queueRegistry', this
        // 'MultiQueueThreadPool' is not in the running state, or
        // '0 == d_threadPool_p->enabled()'.  The behavior is undefined unless
        // the invoking thread has a lock, read or write, on 'd_lock'.

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(MultiQueueThreadPool,
                                   bslma::UsesBslmaAllocator);

    // CREATORS
    MultiQueueThreadPool(const bslmt::ThreadAttributes&  threadAttributes,
                         int                             minThreads,
                         int                             maxThreads,
                         int                             maxIdleTime,
                         bslma::Allocator               *basicAllocator = 0);
        // Construct a 'MultiQueueThreadPool' with the specified
        // 'threadAttributes', the specified 'minThreads' minimum number of
        // threads, the specified 'maxThreads' maximum number of threads, and
        // the specified 'maxIdleTime' idle time (in milliseconds) after which
        // a thread may be considered for destruction.  Optionally specify a
        // 'basicAllocator' used to supply memory.  If 'basicAllocator' is 0,
        // the currently installed default allocator is used.  The behavior is
        // undefined unless '0 <= minThreads', 'minThreads <= maxThreads', and
        // '0 <= maxIdleTime'.  Note that the 'MultiQueueThreadPool' is created
        // without any queues.  Although queues may be created, 'start' must be
        // called before enqueuing jobs.

    explicit
    MultiQueueThreadPool(ThreadPool       *threadPool,
                         bslma::Allocator *basicAllocator = 0);
        // Construct a 'MultiQueueThreadPool' with the specified 'threadPool'.
        // Optionally specify a 'basicAllocator' used to supply memory.  If
        // 'basicAllocator' is 0, the default memory allocator is used.  The
        // behavior is undefined if 'threadPool' is 0.  Note that the
        // 'MultiQueueThreadPool' is created without any queues.  Although
        // queues may be created, 'start' must be called before enqueuing jobs.

    ~MultiQueueThreadPool();
        // Destroy this multi-queue thread pool.  Disable queuing on all
        // queues, and wait until all queues are empty.  Then, delete all
        // queues, and shut down the thread pool if the thread pool is owned by
        // this object.  This method will block if any thread is executing
        // 'start' or 'stop' at the time of the call.

    // MANIPULATORS
    int addJobAtFront(int id, const Job& functor);
        // Add the specified 'functor' at the front of the queue specified by
        // 'id'.  Return 0 if added successfully, and a non-zero value if
        // queuing is disabled.  The behavior is undefined unless 'functor' is
        // bound.  Note that the position of 'functor' relative to any
        // currently queued jobs is unspecified unless the queue is currently
        // paused.

    int createQueue();
        // Create a queue with unlimited capacity and a default number of
        // initial elements.  Return a non-zero queue ID.  The queue ID can be
        // used to enqueue jobs to the queue, or to control or delete the
        // queue.

    int deleteQueue(int id, const CleanupFunctor& cleanupFunctor);
        // Disable enqueuing to the queue associated with the specified 'id',
        // and enqueue the specified 'cleanupFunctor' to the *front* of the
        // queue.  The 'cleanupFunctor' is guaranteed to be the last queue
        // element processed, after which the queue is destroyed.  This
        // function does not wait for the 'cleanupFunctor' to be executed
        // (instead the caller is notified asynchronously through the execution
        // of the supplied 'cleanupFunctor').  Return 0 on success, and a
        // non-zero value otherwise.  Note that this function will fail if this
        // pool is stopped.

    int deleteQueue(int id);
        // Disable enqueuing to the queue associated with the specified 'id',
        // and when the currently executing job (or batch of jobs) of that
        // queue, if any, is complete, then destroy the queue.  Return 0 on
        // success, and a non-zero value otherwise.  This function will fail if
        // the pool is stopped.  Any other (non-executing) jobs on the queue
        // are deleted asynchronously.  The calling thread blocks until
        // completion of the currently executing job (or batch of jobs), except
        // when 'deleteQueue' is called from a job in the queue being deleted.
        // In that latter case, no block takes place, the queue is deleted (no
        // longer observable from the 'MultiQueueThreadPool'), and the job
        // completes.

    int disableQueue(int id);
        // Disable enqueuing to the queue associated with the specified 'id'.
        // Return 0 on success, and a non-zero value otherwise.  Note that this
        // method differs from 'pauseQueue' in that (1) 'disableQueue' does
        // *not* stop processing for a queue, and (2) prevents additional jobs
        // from being enqueued.

    void drain();
        // Wait until all queues are empty.  This method waits until all
        // non-paused queues are empty without disabling the queues (and may
        // thus wait indefinitely).  The queues and/or the thread pool may be
        // either enabled or disabled when this method is called.  This method
        // may be called on a stopped or started thread pool.  Note that
        // 'drain' does not attempt to delete queues directly.  However, as a
        // side-effect of emptying all queues, any queue for which
        // 'deleteQueue' was called previously will be deleted before 'drain'
        // returns.  Note also that this method waits by repeatedly yielding.

    int drainQueue(int id);
        // Wait until all jobs in the queue indicated by the specified 'id' are
        // finished.  This method simply waits until that queue is empty,
        // without disabling the queue; it may thus wait indefinitely if more
        // jobs are being added.  The queue may be enabled or disabled when
        // this method is called.  Return 0 on success, and a non-zero value if
        // the specified queue does not exist or is deleted while this method
        // is waiting.  Note that this method waits by repeatedly yielding.

    int enqueueJob(int id, const Job& functor);
        // Enqueue the specified 'functor' to the queue specified by 'id'.
        // Return 0 if enqueued successfully, and a non-zero value if queuing
        // is disabled.  The behavior is undefined unless 'functor' is bound.

    int enableQueue(int id);
        // Enable enqueuing to the queue associated with the specified 'id'.
        // Return 0 on success, and a non-zero value otherwise.  It is an error
        // to call 'enableQueue' if a previous call to 'stop' is being
        // executed.

    void numProcessedReset(int *numExecuted,
                           int *numEnqueued,
                           int *numDeleted = 0);
        // Load into the specified 'numExecuted' and 'numEnqueued' the number
        // of items dequeued / enqueued (respectively) since the last time
        // these values were reset, and reset these values.  Optionally
        // specify 'numDeleted' into which to load the number of items deleted
        // since the last time this value was reset.  Reset the count of
        // deleted items.

    int pauseQueue(int id);
        // Wait until any currently-executing job (or batch of jobs) on the
        // queue with the specified 'id' completes, then prevent any more jobs
        // from being executed on that queue.  Return 0 on success, and a
        // non-zero value if the queue is already paused or is being paused or
        // deleted by another thread.  Note that this method may be invoked
        // from a job executing on the given queue, in which case this method
        // does not wait.  Note also that this method differs from
        // 'disableQueue' in that (1) 'pauseQueue' stops processing for a
        // queue, and (2) does *not* prevent additional jobs from being
        // enqueued.

    int resumeQueue(int id);
        // Allow jobs on the queue with the specified 'id' to begin executing.
        // Return 0 on success, and a non-zero value if the queue does not
        // exist or is not paused.

    int setBatchSize(int id, int batchSize);
        // Configure the queue specified by 'id' to process jobs in groups of
        // the specified 'batchSize' (see {'Job Execution Batch Size'}).  When
        // a thread is selecting jobs for processing, if fewer than 'batchSize'
        // jobs are available then only the available jobs will be processed in
        // the current batch.  Return 0 on success, and a non-zero value
        // otherwise.  The behavior is undefined unless '1 <= batchSize'.  Note
        // that the initial value for the execution batch size is 1 for all
        // queues.

    void shutdown();
        // Disable queuing on all queues, and wait until all non-paused queues
        // are empty.  Then, delete all queues, and shut down the thread pool
        // if the thread pool is owned by this object.

    int start();
        // Enable queuing on all queues, start the thread pool if the thread
        // pool is owned by this object, and ensure that at least the minimum
        // number of processing threads are started.  Return 0 on success, and
        // a non-zero value otherwise.  This method will block if any thread is
        // executing 'stop' or 'shutdown' at the time of the call.  This method
        // has no effect if this thread pool has already been started.  Note
        // that any paused queues remain paused.

    void stop();
        // Disable queuing on all queues and wait until all non-paused queues
        // are empty.  Then, stop the thread pool if the thread pool is owned
        // by this object.  Note that 'stop' does not attempt to delete queues
        // directly.  However, as a side-effect of emptying all queues, any
        // queue for which 'deleteQueue' was called previously will be deleted
        // before 'stop' unblocks.

    // ACCESSORS
    int batchSize(int id) const;
        // Return an instantaneous snapshot of the execution batch size (see
        // {'Job Execution Batch Size'}) of the queue associated with the
        // specified 'id', or -1 if 'id' is not a valid queue id.  When a
        // thread is selecting jobs for processing, if fewer than 'batchSize'
        // jobs are available then only the available jobs will be processed in
        // the current batch.

    bool isPaused(int id) const;
        // Return 'true' if the queue associated with the specified 'id' is
        // currently paused, or 'false' otherwise (including if 'id' is not a
        // valid queue id).

    bool isEnabled(int id) const;
        // Return 'true' if the queue associated with the specified 'id' is
        // currently enabled, or 'false' otherwise (including if 'id' is not a
        // valid queue id).

    int numQueues() const;
        // Return an instantaneous snapshot of the number of queues managed by
        // this object.

    int numElements() const;
        // Return an instantaneous snapshot of the total number of elements
        // enqueued.

    int numElements(int id) const;
        // Return an instantaneous snapshot of the number of elements enqueued
        // in the queue associated with the specified 'id' as a non-negative
        // integer, or -1 if 'id' does not specify a valid queue.

    void numProcessed(int *numExecuted,
                      int *numEnqueued,
                      int *numDeleted = 0) const;
        // Load into the specified 'numExecuted' and 'numEnqueued' the number
        // of items dequeued / enqueued (respectively) since the last time
        // these values were reset.  Optionally specify 'numDeleted' into
        // which to load the number of items deleted since the last time this
        // value was reset.

    const ThreadPool& threadPool() const;
        // Return a reference to the non-modifiable thread pool owned by this
        // object.
};

// ============================================================================
//                             INLINE DEFINITIONS
// ============================================================================

                     // --------------------------------
                     // class MultiQueueThreadPool_Queue
                     // --------------------------------

// ACCESSORS
inline
int MultiQueueThreadPool_Queue::batchSize() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return d_batchSize;
}

inline
bool MultiQueueThreadPool_Queue::isDrained() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return 0 == d_list.size() && (   e_NOT_SCHEDULED == d_runState
                                  || e_PAUSED        == d_runState);
}

inline
bool MultiQueueThreadPool_Queue::isEnabled() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return e_ENQUEUING_ENABLED == d_enqueueState;
}

inline
bool MultiQueueThreadPool_Queue::isPaused() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return e_PAUSED == d_runState;
}

inline
int MultiQueueThreadPool_Queue::length() const
{
    bslmt::LockGuard<bslmt::Mutex> guard(&d_lock);

    return static_cast<int>(d_list.size());
}

                        // --------------------------
                        // class MultiQueueThreadPool
                        // --------------------------

// PRIVATE MANIPULATORS
inline
int MultiQueueThreadPool::findIfUsable(int                          id,
                                       MultiQueueThreadPool_Queue **queue)
{
    if (   e_STATE_RUNNING != d_state
        ||               0 == d_threadPool_p->enabled()) {
        return 1;                                                     // RETURN
    }

    QueueRegistry::iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() == iter) {
        return 1;                                                     // RETURN
    }

    *queue = iter->second;

    return 0;
}

// MANIPULATORS
inline
int MultiQueueThreadPool::addJobAtFront(int id, const Job& functor)
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    MultiQueueThreadPool_Queue *queue;

    if (findIfUsable(id, &queue)) {
        return 1;                                                     // RETURN
    }

    if (0 == queue->pushFront(functor)) {
        ++d_numEnqueued;
        return 0;                                                     // RETURN
    }

    return 1;
}

inline
int MultiQueueThreadPool::enqueueJob(int id, const Job& functor)
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    MultiQueueThreadPool_Queue *queue;

    if (findIfUsable(id, &queue)) {
        return 1;                                                     // RETURN
    }

    if (0 == queue->pushBack(functor)) {
        ++d_numEnqueued;
        return 0;                                                     // RETURN
    }

    return 1;
}

inline
void MultiQueueThreadPool::numProcessedReset(int *numExecuted,
                                             int *numEnqueued,
                                             int *numDeleted)
{
    bslmt::WriteLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    // To maintain consistency, all three must be zeroed atomically.

    *numExecuted = d_numExecuted.swap(0);
    if (numDeleted) {
        *numDeleted = d_numDeleted.swap(0);
    }
    else {
        d_numDeleted = 0;
    }
    *numEnqueued = d_numEnqueued.swap(0);
}

inline
int MultiQueueThreadPool::setBatchSize(int id, int batchSize)
{
    BSLS_ASSERT_SAFE(1 <= batchSize);

    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    MultiQueueThreadPool_Queue *queue;

    if (findIfUsable(id, &queue)) {
        return 1;                                                     // RETURN
    }

    queue->setBatchSize(batchSize);

    return 0;
}

// ACCESSORS
inline
int MultiQueueThreadPool::batchSize(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->batchSize();                             // RETURN
    }

    return -1;
}

inline
bool MultiQueueThreadPool::isEnabled(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->isEnabled();                             // RETURN
    }

    return false;
}

inline
bool MultiQueueThreadPool::isPaused(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->isPaused();                              // RETURN
    }

    return false;
}

inline
int MultiQueueThreadPool::numElements() const
{
    // Access 'd_numEnqueued' last to ensure the result is non-negative.

    return -(d_numExecuted + d_numDeleted) + d_numEnqueued;
}

inline
int MultiQueueThreadPool::numElements(int id) const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    QueueRegistry::const_iterator iter = d_queueRegistry.find(id);

    if (d_queueRegistry.end() != iter) {
        return iter->second->length();                                // RETURN
    }

    return -1;
}

inline
void MultiQueueThreadPool::numProcessed(int *numExecuted,
                                        int *numEnqueued,
                                        int *numDeleted) const
{
    // Access 'd_numEnqueued' last to ensure
    // 'numEnqueued >= numExecuted + numDeleted'.

    *numExecuted = d_numExecuted;
    if (numDeleted) {
        *numDeleted = d_numDeleted;
    }
    *numEnqueued = d_numEnqueued;
}

inline
int MultiQueueThreadPool::numQueues() const
{
    bslmt::ReadLockGuard<bslmt::ReaderWriterMutex> guard(&d_lock);

    return static_cast<int>(d_queueRegistry.size());
}

inline
const ThreadPool& MultiQueueThreadPool::threadPool() const
{
    return *d_threadPool_p;
}

}  // close package namespace
}  // close enterprise namespace

#endif

// ----------------------------------------------------------------------------
// Copyright 2020 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------