// bdlmt_threadmultiplexor.h                                          -*-C++-*-

// ----------------------------------------------------------------------------
//                                   NOTICE
//
// This component is not up to date with current BDE coding standards, and
// should not be used as an example for new development.
// ----------------------------------------------------------------------------

#ifndef INCLUDED_BDLMT_THREADMULTIPLEXOR
#define INCLUDED_BDLMT_THREADMULTIPLEXOR

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a mechanism for partitioning a collection of threads.
//
//@CLASSES:
// bdlmt::ThreadMultiplexor: mechanism to partition multi-threaded processing
//
//@SEE_ALSO: bdlmt_threadpool, bdlmt_fixedthreadpool
//
//@DESCRIPTION: This component provides a mechanism for partitioning a
// collection of threads, so that a single collection of threads (e.g., a
// thread pool) can be used to process various types of user-defined functions
// ("jobs") while sharing the thread resources equitably between them.  A
// typical example where this type of partitioning is desired is an application
// that performs both I/O and CPU-intensive processing.  The traditional
// approach is to create two thread pools--one for I/O, and one for
// processing--and pass control (in the form of a callback) from one thread
// pool to the other.  However, there are several problems with this approach.
// Firstly, the process incurs the overhead of context switching between
// threads, which must necessarily occur because there are two different thread
// pools.  Secondly, the process may not be able to adapt well to imbalances
// between one type of processing versus the other if the number of threads in
// each thread pool is bounded.  In this case, a large number of jobs may be
// enqueued while some portion of threads allocated to the process go unused.
// On the other hand, simply sharing a single thread pool without a provision
// for partitioning the use of threads may result in one type of processing
// starving the other.
//
// The 'bdlmt::ThreadMultiplexor' provides an API, 'processJob', to process
// user-specified jobs.  A multiplexor instance is configured with a maximum
// number of "processors", i.e., the maximum number of threads that may process
// jobs at any particular time.  Additional threads enqueue jobs to a pending
// job queue, which is processed by the next available processing thread.
//
// Typically, a 'bdlmt::ThreadMultiplexor' instance is used in conjunction with
// a thread pool (e.g., 'bdlmt::FixedThreadPool'), where each thread pool
// thread calls the multiplexor 'processJob' method to perform some work.  The
// multiplexor guarantees that no more than the configured number of threads
// will process jobs concurrently.  This guarantee allows a single thread pool
// to be used in a variety of situations that require partitioning thread
// resources.
//
///Thread Safety
///-------------
// The 'bdlmt::ThreadMultiplexor' class is both *fully thread-safe* (i.e., all
// non-creator methods can correctly execute concurrently), and is
// *thread-enabled* (i.e., the class does not function correctly in a
// non-multi-threading environment).  See 'bsldoc_glossary' for complete
// definitions of *fully thread-safe* and *thread-enabled*.
//
///Usage
///-----
// This section illustrates intended use of this component.
//
///Example 1: Multiple Work Queues
///- - - - - - - - - - - - - - - -
// The following usage example illustrates how the 'bdlmt::ThreadMultiplexor'
// can be used to share thread resources between three separate work queues.
// Assume that there are three classes of jobs: jobs that are important, jobs
// that are urgent, and jobs that are critical.  We would like to execute each
// class of jobs in a single thread pool, but we want to ensure that all types
// of jobs can be executed at any time.
//
// We begin by defining a class that encapsulates the notion of a job queue.
// Our 'JobQueue' class holds a reference to a 'bdlmt::FixedThreadPool', used
// to instantiate the job queue, and owns an instance of
// 'bdlmt::ThreadMultiplexor', used to process jobs.
//..
//  class JobQueue {
//      // This class defines a generic processor for user-defined functions
//      // ("jobs").  Jobs specified to the 'processJob' method are executed
//      // in the thread pool specified at construction.
//
//    public:
//      // PUBLIC TYPES
//      typedef bdlmt::ThreadMultiplexor::Job Job;
//          // A callback of this type may be specified to the 'processJob'
//          // method.
//
//    private:
//      // DATA
//      bdlmt::FixedThreadPool   *d_threadPool_p;  // (held, not owned)
//      bdlmt::ThreadMultiplexor  d_multiplexor;   // used to partition threads
//
//    private:
//      // NOT IMPLEMENTED
//      JobQueue(const JobQueue&);
//      JobQueue& operator=(const JobQueue&);
//
//    public:
//      // CREATORS
//      JobQueue(int                     maxProcessors,
//               bdlmt::FixedThreadPool *threadPool,
//               bslma::Allocator       *basicAllocator = 0);
//          // Create a job queue that executes jobs in the specified
//          // 'threadPool' using no more than the specified 'maxProcessors'.
//          // Optionally specify a 'basicAllocator' used to supply memory.
//          // If 'basicAllocator' is 0, the currently installed default
//          // allocator is used.
//
//      ~JobQueue();
//          // Destroy this object.
//
//      // MANIPULATORS
//      int processJob(const Job& job);
//          // Process the specified 'job' in the thread pool specified at
//          // construction.  Return 0 on success, and a non-zero value
//          // otherwise.
//  };
//..
// The maximum number of processors for the multiplexor instance owned by each
// 'JobQueue' is configured using the following formula, for
// T = number of threads and M = number of multiplexors > 1:
//..
//   maxProc = ceil(T / (M-1))-1
//..
// This allows multiple 'JobQueue' instances to share the same thread pool
// without starving each other when the thread pool has more than one thread.
// For this usage example, we assume M (number of multiplexors) = 3, and T
// (number of threads) = 5, so maxProc = 2.  It is important to note that every
// call to 'processJob' enqueues a job to the thread pool, so the length of the
// thread pool queue determines the maximum number of jobs that can be accepted
// by the JobQueue.  (Multiple JobQueues share the same maximum *together*, so
// not all will be able to reach their individual maximum at the same time).
//..
//  JobQueue::JobQueue(int                     maxProcessors,
//                     bdlmt::FixedThreadPool *threadPool,
//                     bslma::Allocator       *basicAllocator)
//  : d_threadPool_p(threadPool)
//  , d_multiplexor(maxProcessors,
//                  threadPool->queueCapacity(),
//                  basicAllocator)
//  {
//  }
//
//  JobQueue::~JobQueue()
//  {
//  }
//..
// The 'processJob' method enqueues a secondary callback into the thread pool
// that executes the user-specified 'job' through the multiplexor.
//..
//  int JobQueue::processJob(const JobQueue::Job& job)
//  {
//      return d_threadPool_p->tryEnqueueJob(bdlf::BindUtil::bind(
//                                  &bdlmt::ThreadMultiplexor::processJob<Job>,
//                                  &d_multiplexor,
//                                  job));
//  }
//..
// The following program uses three instances of 'JobQueue' to process
// important, urgent, and critical jobs using a single collection of threads.
//..
//  int main(void)
//  {
//      enum {
//          NUM_THREADS   = 5,   // total number of threads
//          NUM_QUEUES    = 3,   // total number of JobQueue objects
//          MAX_QUEUESIZE = 20   // total number of pending jobs
//      };
//
//      const double threadsPerQueue = double(NUM_THREADS) / (NUM_QUEUES - 1);
//      const int    maxProc =
//                   bsl::max(1, static_cast<int>(ceil(threadsPerQueue)) - 1);
//
//      bdlmt::FixedThreadPool tp(NUM_THREADS, MAX_QUEUESIZE);
//      JobQueue             importantQueue(maxProc, &tp);
//      JobQueue             urgentQueue(maxProc, &tp);
//      JobQueue             criticalQueue(maxProc, &tp);
//
//      if (0 != tp.start()) {
//         ASSERT(!"Could not start thread pool!");
//         return -1;
//      }
//
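//      bsls::AtomicInt iCheck(0);  // number of important jobs executed
//      bsls::AtomicInt uCheck(0);  // number of urgent jobs executed
//      bsls::AtomicInt cCheck(0);  // number of critical jobs executed
//
//      JobQueue::Job ijob =
//         bdlf::BindUtil::bind(&bsls::AtomicInt::add, &iCheck, 1);
//
//      JobQueue::Job ujob =
//         bdlf::BindUtil::bind(&bsls::AtomicInt::add, &uCheck, 1);
//
//      JobQueue::Job cjob =
//         bdlf::BindUtil::bind(&bsls::AtomicInt::add, &cCheck, 1);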
//
//      importantQueue.processJob(ijob);
//      importantQueue.processJob(ijob);
//      importantQueue.processJob(ijob);
//      importantQueue.processJob(ijob);
//      importantQueue.processJob(ijob);
//      importantQueue.processJob(ijob);
//
//      urgentQueue.processJob(ujob);
//      urgentQueue.processJob(ujob);
//      urgentQueue.processJob(ujob);
//      urgentQueue.processJob(ujob);
//
//      criticalQueue.processJob(cjob);
//      criticalQueue.processJob(cjob);
//
//      tp.stop();
//      ASSERT(6 == iCheck);
//      ASSERT(4 == uCheck);
//      ASSERT(2 == cCheck);
//      return 0;
//  }
//..

#include <bdlscm_version.h>

#include <bdlcc_fixedqueue.h>

#include <bsls_atomic.h>

#include <bslma_allocator.h>
#include <bslma_usesbslmaallocator.h>

#include <bslmf_nestedtraitdeclaration.h>

#include <bsl_functional.h>

#ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
#include <bslalg_typetraits.h>
#endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES

namespace BloombergLP {

namespace bdlmt {
                          // =======================
                          // class ThreadMultiplexor
                          // =======================

class ThreadMultiplexor {
    // This class provides a mechanism for facilitating the use of multiple
    // threads to perform various user-defined functions ("jobs") when some
    // degree of collaboration between threads is required.  The thread
    // multiplexor is configured with a total number of "processors",
    // representing the maximum number of threads that may process jobs
    // through this object at any particular time.  Additional threads enqueue
    // their jobs to a bounded pending job queue, which is drained by threads
    // as they complete their calls to 'processJob'.

  public:
    // PUBLIC TYPES
    typedef bsl::function<void()> Job;
        // A callback of this type may be passed to the 'processJob' method.
        // Jobs that cannot be executed immediately are converted to this
        // type and stored in the pending job queue.

  private:
    // DATA
    bslma::Allocator     *d_allocator_p;      // memory allocator (held)
    bdlcc::FixedQueue<Job> *d_jobQueue_p;       // pending job queue (owned)
    bsls::AtomicInt        d_numProcessors;    // current number of processors
    int                   d_maxProcessors;    // maximum number of processors

    // Note that 'processJob' claims a processor by atomically incrementing
    // 'd_numProcessors' (compare-and-swap), and only when the observed value
    // is below 'd_maxProcessors'.

  private:
    // PRIVATE MANIPULATORS
    int processJobQueue();
        // Process the pending job queue.  Execute each functor obtained from
        // the queue in the calling thread if the current number of processors
        // is less than the maximum number of processors.  Otherwise, enqueue
        // the job back to the pending queue.  Return 0 on success, and a
        // non-zero value otherwise.

    // NOT IMPLEMENTED
    ThreadMultiplexor(const ThreadMultiplexor&);
    ThreadMultiplexor& operator=(const ThreadMultiplexor&);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(ThreadMultiplexor,
                                   bslma::UsesBslmaAllocator);

    // CREATORS
    ThreadMultiplexor(int               maxProcessors,
                      int               maxQueueSize,
                      bslma::Allocator *basicAllocator = 0);
        // Create a thread multiplexor which uses, at most, the specified
        // 'maxProcessors' number of threads to process user-specified jobs,
        // identified as callbacks of type 'Job'.  Jobs that cannot be
        // processed immediately are placed on a queue having the specified
        // 'maxQueueSize' to be processed by the next free thread.  Optionally
        // specify a 'basicAllocator' used to supply memory.  If
        // 'basicAllocator' is 0, the currently installed default allocator is
        // used.  The behavior is undefined unless '0 < maxProcessors' and
        // '0 < maxQueueSize'.

    ~ThreadMultiplexor();
        // Destroy this thread multiplexor object.

    // MANIPULATORS
    template <class t_JOBTYPE>
    int processJob(const t_JOBTYPE& job);
        // Process the specified 'job' functor in the calling thread if the
        // current number of processors is less than the maximum number of
        // processors.  Otherwise, enqueue 'job' to the pending job queue.  In
        // either case, the calling thread then services the pending job queue
        // before returning, and so may also execute jobs previously enqueued
        // by other threads.  Return 0 on success, and a non-zero value
        // otherwise.  If 'job' cannot be run immediately and the pending job
        // queue does not accept it (e.g., because it is full), 'job' is not
        // processed and the non-zero status from the queue is returned.  Note
        // that 't_JOBTYPE' must be invocable with no arguments and must be
        // copy-constructible, because 'job' may be converted to 'Job' and
        // stored in the pending job queue.

    // ACCESSORS
    int maxProcessors() const;
        // Return the maximum number of active processors.

    int numProcessors() const;
        // Return the current number of active processors.  Note that other
        // threads may start or finish jobs concurrently, so the returned
        // value may be out of date by the time it is examined.
};

// ============================================================================
//                            INLINE DEFINITIONS
// ============================================================================

// MANIPULATORS
template <class t_JOBTYPE>
inline
int ThreadMultiplexor::processJob(const t_JOBTYPE& job)
{
    // Execute 'job' in the calling thread if the current number of processors
    // is less than the maximum number of processors.  Otherwise, enqueue 'job'
    // to the pending job queue.  In all cases, check the pending job queue at
    // the end of the loop, as the number of processors may have changed,
    // allowing the execution of a job in the current thread.

    int previousNumProcessors = d_numProcessors;
    if (previousNumProcessors < d_maxProcessors &&
        previousNumProcessors ==
                      d_numProcessors.testAndSwap(previousNumProcessors,
                                                  previousNumProcessors + 1)) {
        // Process the job
        job();
        --d_numProcessors;
    }
    else {
        int rc = d_jobQueue_p->tryPushBack(job);
        if (0 != rc) {
            return rc;                                                // RETURN
        }
    }

    return processJobQueue();
}

// ACCESSORS
inline
int ThreadMultiplexor::maxProcessors() const
{
    return d_maxProcessors;
}

inline
int ThreadMultiplexor::numProcessors() const
{
    // Take an explicit atomic snapshot of the processor count.  Other threads
    // may change the count immediately after it is read.

    const int snapshot = d_numProcessors.load();
    return snapshot;
}

}  // close package namespace
}  // close enterprise namespace
#endif

// ----------------------------------------------------------------------------
// Copyright 2015 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------