Component bdlmt_multiprioritythreadpool
[Package bdlmt]

Provide a mechanism to parallelize a prioritized sequence of jobs.

Namespaces

namespace  bdlmt

Detailed Description

Outline
Purpose:
Provide a mechanism to parallelize a prioritized sequence of jobs.
Classes:
bdlmt::MultipriorityThreadPool mechanism to parallelize prioritized jobs
See also:
Component bslmt_threadutil
Description:
This component defines an implementation of a thread pool in which work items ("jobs") are associated with a limited number of integer priorities that determine the sequence in which enqueued jobs are executed. (See the package-level documentation for general information on thread pools.)
This flavor of our generalized thread pool model associates an integral priority with each work item. For efficiency of implementation, these priorities are limited, as indicated at construction, to a relatively small number N of contiguous integers [ 0 .. N - 1 ], 0 being the most urgent. For this implementation, the maximum number of priorities N is 32. A fixed number of worker threads is also specified at construction. Finally, this thread pool takes an optional allocator supplied at construction. Once configured, these instance parameters remain unchanged for the lifetime of each multi-priority thread pool object.
The associated priority of a job is relevant only while that job is pending; once a job has begun executing, it will not be interrupted or suspended to make way for another job, regardless of their relative priorities. While processing jobs, worker threads will always choose a more urgent job (lower integer value for priority) over a less urgent one. Given two jobs having the same priority value, the one that has been in the thread pool's queue the longest is selected (FIFO order). Note that the number of active worker threads does not increase or decrease depending on load. If no jobs remain to be executed, surplus threads will block until work arrives. If there are more jobs than threads, excess jobs wait in the queue until previous jobs finish.
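The selection rule described above (most urgent priority first, FIFO within a priority) can be modeled with one FIFO per priority. The sketch below is a single-threaded illustration in standard C++ and is not this component's implementation: the class name PriorityFifoModel and its members are hypothetical, and the use of a 32-bit mask to track non-empty priorities is only an assumption about why a limit of 32 priorities is convenient.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical model of the job-selection rule only: no threads, no locking.
class PriorityFifoModel {
    std::vector<std::deque<int> > d_queues;    // one FIFO of job ids per priority
    std::uint32_t                 d_nonEmpty;  // bit 'p' is set if queue 'p' has jobs

  public:
    explicit PriorityFifoModel(int numPriorities)
    : d_queues(numPriorities)
    , d_nonEmpty(0)
    {
        // Assumption: at most 32 priorities, so that one bit per priority
        // fits in 'd_nonEmpty'.
        assert(1 <= numPriorities && numPriorities <= 32);
    }

    // Append the specified 'jobId' to the FIFO of the specified 'priority'.
    void push(int jobId, int priority)
    {
        assert(0 <= priority && priority < static_cast<int>(d_queues.size()));
        d_queues[priority].push_back(jobId);
        d_nonEmpty |= (std::uint32_t(1) << priority);
    }

    // Remove and return the oldest job of the most urgent (lowest-numbered)
    // non-empty priority, or -1 if no job is pending.
    int pop()
    {
        if (0 == d_nonEmpty) {
            return -1;
        }
        int p = 0;
        while (0 == (d_nonEmpty & (std::uint32_t(1) << p))) {
            ++p;
        }
        int jobId = d_queues[p].front();
        d_queues[p].pop_front();
        if (d_queues[p].empty()) {
            d_nonEmpty &= ~(std::uint32_t(1) << p);
        }
        return jobId;
    }
};
```

In this model, a job pushed at priority 0 is always popped before any pending job at priority 1 or 2, and two jobs pushed at the same priority are popped in the order they were pushed. A real thread pool additionally needs a lock and a condition variable around 'push' and 'pop', which this sketch omits.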
bdlmt::MultipriorityThreadPool provides two interfaces for specifying jobs: the traditional void function/void pointer interface and the more versatile functor-based interface. The void function/void pointer interface allows callers to use a C-style function to be executed as a job. The application need specify only the address of the function, and a single void * argument to be passed to the function. The specified function will be invoked with the specified argument by the processing (worker) thread. The functor-based interface allows for flexible job execution by copying the passed functor and executing its (invokable) operator() method. Note that the functor gets copied twice before it is executed, once when pushed into the queue, and once when popped out of it, something to keep in mind if the object is going to be expensive to copy. (See the bdef package-level documentation for more information on functors and their use.)
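To make the cost of the two copies concrete, here is a minimal sketch in standard C++ that models a queue which copies a functor in on push and out again on pop. It does not use this component; the names CountingJob, SharedStateJob, and runThroughQueue are hypothetical. It also shows one common way to keep the copies cheap, which is to hold expensive state behind a shared pointer so that only the pointer is copied.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <vector>

// Hypothetical functor that counts how many times it has been copied.
struct CountingJob {
    static int s_copies;

    CountingJob() {}
    CountingJob(const CountingJob&) { ++s_copies; }

    void operator()() const {}
};
int CountingJob::s_copies = 0;

// Hypothetical functor whose large state lives behind a shared pointer, so
// copying the functor copies only the pointer, not the vector.
struct SharedStateJob {
    std::shared_ptr<std::vector<int> > d_data;

    explicit SharedStateJob(int numElements)
    : d_data(std::make_shared<std::vector<int> >(numElements, 1))
    {
    }

    // Record one execution in the shared state.
    void operator()() const { d_data->push_back(1); }
};

// Model of a by-value job queue: the job is copied once when pushed and once
// more when popped, and the popped copy is the one that gets executed.
template <class JOB>
void runThroughQueue(const JOB& job)
{
    std::deque<JOB> queue;
    queue.push_back(job);           // copy 1: into the queue
    JOB popped(queue.front());      // copy 2: out of the queue
    queue.pop_front();
    popped();                       // execute the popped copy
}
```

Passing a CountingJob through runThroughQueue increments CountingJob::s_copies by exactly two. Passing a SharedStateJob through it leaves the caller's object pointing at the same vector that the executed copy modified, so the vector itself is never duplicated.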
Note that except in the case where numThreads() == 1, we cannot guarantee the exact order of the execution of the jobs in the queue.
Finally, an application can specify the attributes of the worker threads in a thread pool (e.g., guard size or stack size) by optionally supplying an appropriately configured bslmt::ThreadAttributes object. (See the bslmt_threadutil component-level documentation for a description of the bslmt::ThreadAttributes class.) Note that the field pertaining to whether the worker threads should be detached or joinable is ignored.
Thread Safety:
The bdlmt::MultipriorityThreadPool class is both fully thread-safe (i.e., all non-creator methods can correctly execute concurrently) and thread-enabled (i.e., the class does not function correctly in a non-multi-threading environment). See bsldoc_glossary for complete definitions of fully thread-safe and thread-enabled.
Be aware that the behavior is undefined if any of the following methods is called on a thread pool from any of the threads belonging to that thread pool.
  • stopThreads
  • suspendProcessing
  • drainJobs
Note that, in these cases, such undefined behavior may include deadlock.
Thread Names for Sub-Threads:
To facilitate debugging, users can provide a thread name as the threadName attribute of the bslmt::ThreadAttributes argument passed to the constructor, that will be used for all the sub-threads. The thread name should not be used programmatically, but will appear in debugging tools on platforms that support naming threads to help users identify the source and purpose of a thread. If no ThreadAttributes object is passed, or if the threadName attribute is not set, the default value "bdl.MultiPriPl" will be used.
Usage:
The following two examples illustrate use of the multi-priority thread pool provided in this component.
Example 1: The void function/void pointer Interface:
It is possible to enqueue a job to a multi-priority thread pool as a pointer to a function that takes a single void * argument. This first usage example will demonstrate that high-priority traffic through a thread pool is unimpeded by a much greater quantity of low-priority traffic.
The idea here is we have a large number of jobs submitted in too little time for all of them to be completed. All jobs take the same amount of time to complete, but there are two different priorities of jobs. There are 100 times more jobs of less urgent priority than of the more urgent priority, and there is more than enough time for the jobs of more urgent priority to be completed. We want to verify that all the jobs of more urgent priority get completed while most of the jobs of less urgent priority do not. This will demonstrate that we can construct an arrangement where traffic of low priority, while massively more numerous, does not impede the progress of higher-priority jobs:
   bsls::AtomicInt     urgentJobsDone;
   bsls::AtomicInt lessUrgentJobsDone;

   extern "C" void *urgentJob(void *)
   {
       bslmt::ThreadUtil::microSleep(10000);          // 10 mSec

       ++urgentJobsDone;

       return 0;
   }

   extern "C" void *lessUrgentJob(void *)
   {
       bslmt::ThreadUtil::microSleep(10000);          // 10 mSec

       ++lessUrgentJobsDone;

       return 0;
   }
The main program (below) enqueues 100 times as many low-priority jobs as high-priority ones. 10,100 jobs are submitted, each taking at least 0.01 seconds, for a total of at least 101 seconds of work. Spread across 20 threads, that would take about 5 seconds. But we shut down the run after only 0.5 seconds, in which each thread can finish at most 50 jobs, so at most 1,000 jobs can complete and at least 90% of the jobs will not. When run, typical output of this program is:
  Jobs done: urgent: 100, less urgent: 507
meaning all of the urgent jobs completed, while approximately 95% of the less urgent jobs did not:
       bdlmt::MultipriorityThreadPool pool(20,  // # of threads
                                         2);  // # of priorities

       bsls::TimeInterval finishTime =
                                  bsls::SystemTime::nowRealtimeClock() + 0.5;
       pool.startThreads();
We use 1 as our less urgent priority, leaving 0 as our urgent priority:
       for (int i = 0; i < 100; ++i) {
           for (int j = 0; j < 100; ++j) {
               pool.enqueueJob(&lessUrgentJob, (void *) 0, 1); // less urgent
           }
           pool.enqueueJob(&urgentJob, (void *) 0, 0);         // urgent
       }

       bslmt::ThreadUtil::sleepUntil(finishTime);
       pool.shutdown();

       bsl::cout << "Jobs done: urgent: " << urgentJobsDone <<
                    ", less urgent: "     << lessUrgentJobsDone << bsl::endl;
Example 2: The Functor-Based Interface:
In this second example we present a multi-threaded algorithm for calculating prime numbers. This is just to serve as an illustration; although it works, it is not really any faster than doing it with a single thread.
For every prime number P, we have to mark all multiples of it in two ranges, [ P .. P ** 2 ] and [ P ** 2 .. TOP_NUMBER ], as non-prime, where we use 2000 for TOP_NUMBER in this example. For any P ** 2, if we can determine that all primes below P have marked all their multiples up to P ** 2, then we can scan that range and any unmarked value in it will be a new prime. Then we can start out with our first prime, 2, and mark all of its multiples between it and 2 ** 2 == 4, thus discovering 3 is prime. Once we have marked all multiples of 2 and 3 below 3 * 3 == 9, we can then scan that range and discover 5 and 7 are primes, and repeat the process to discover bigger and bigger primes until we have covered the entire range (in this example all ints below TOP_NUMBER == 2000):
   enum {
       TOP_NUMBER     = 2000,
       NUM_PRIORITIES = 32
   };

   bool isStillPrime[TOP_NUMBER];
   bsls::AtomicInt scannedTo[TOP_NUMBER];  // if 'P' is a prime, what is the
                                           // highest multiple of 'P' that
                                           // we have marked
                                           // 'isStillPrime[P] = false'

   bsls::AtomicInt maxPrimeFound;          // maximum prime identified so far
   int primeNumbers[TOP_NUMBER];           // elements in the range
                                           // '0 .. numPrimeNumbers - 1' are
                                           // the prime numbers we have found
                                           // so far
   bsls::AtomicInt numPrimeNumbers;

   bdlmt::MultipriorityThreadPool *threadPool;

   bool          doneFlag;                 // set this flag to signal
                                           // other jobs that we're done
   bslmt::Barrier doneBarrier(2);           // we wait on this barrier
                                           // to signal the main thread
                                           // that we're done

   struct Functor {
       static bslmt::Mutex s_mutex;
       int                d_numToScan;
       int                d_priority;
       int                d_limit;

       Functor(int numToScan)
       : d_numToScan(numToScan)
       , d_priority(0)
       {
           d_limit = bsl::min((double) numToScan * numToScan,
                              (double) TOP_NUMBER);
       }

       void setNewPrime(int newPrime) {
           maxPrimeFound = newPrime;
           primeNumbers[numPrimeNumbers] = newPrime;
           ++numPrimeNumbers;

           if (2 * newPrime < TOP_NUMBER) {
               Functor f(newPrime);

               threadPool->enqueueJob(f, 0);
           }
       }

       void evaluateCandidatesForPrime() {
           if (maxPrimeFound > d_limit) {
               return;
           }

           int numToScanI;
           for (numToScanI = numPrimeNumbers - 1; numToScanI > 0;
                                                           --numToScanI) {
               if (primeNumbers[numToScanI] == d_numToScan) {
                   break;
               }
           }
           for (int i = numToScanI - 1; i >= 0; --i) {
               if (TOP_NUMBER < scannedTo[primeNumbers[i]]) {
                   for (int j = i + 1; j < numPrimeNumbers; ++j) {
                       if (TOP_NUMBER == scannedTo[primeNumbers[j]]) {
                           scannedTo[primeNumbers[j]] = TOP_NUMBER + 1;
                       }
                       else {
                           break;
                       }
                   }
                   break;
               }

               if (scannedTo[primeNumbers[i]] < d_limit) {
                   // Not all multiples of all prime numbers below
                   // us have been adequately marked as non-prime.  We
                   // cannot yet draw any new conclusions about what
                   // is and what is not prime in this range.

                   // Resubmit ourselves to the back of the priority queue
                   // so that we'll get re-evaluated when previous prime
                   // numbers are done scanning.  Note that we could get
                   // reenqueued several times.

                   // Note that jobs marking the 'isStillPrime' array are
                   // at priority 0, while later incarnations that can
                   // only set new primes are at priority 1 and keep
                   // getting resubmitted at less and less urgent
                   // priorities until all their prerequisites (which
                   // are at priority 0) are done.

                   d_priority = bsl::min(NUM_PRIORITIES - 2,
                                         d_priority + 1);
                   threadPool->enqueueJob(*this, d_priority);

                   return;
               }
           }

           // Everything up to 'd_limit' that has not been marked
           // non-prime is prime.

           bslmt::LockGuard<bslmt::Mutex> guard(&s_mutex);

           for (int i = maxPrimeFound + 1; d_limit > i; ++i) {
               if (isStillPrime[i]) {
                   setNewPrime(i);
               }
           }

           if (TOP_NUMBER == d_limit && !doneFlag) {
               // We have successfully listed all primes below 'TOP_NUMBER'.
               // Touch the done barrier and our caller will then know that
               // we are done and shut down the queue.

               doneFlag = true;
               doneBarrier.wait();
           }
       }

       void operator()() {
           if (0 == d_priority) {
               bsls::AtomicInt& rScannedTo = scannedTo[d_numToScan];

               for (int i = d_numToScan; i < d_limit; i += d_numToScan) {
                   isStillPrime[i] = false;
                   rScannedTo = i;
               }

               d_priority = 1;
               threadPool->enqueueJob(*this, d_priority);

               for (int i = d_limit; i < TOP_NUMBER; i += d_numToScan) {
                   isStillPrime[i] = false;
                   rScannedTo = i;
               }
               rScannedTo = TOP_NUMBER;
           }
           else {
               evaluateCandidatesForPrime();
           }
       }
   };
   bslmt::Mutex Functor::s_mutex;
and in the main program:
      bsls::TimeInterval startTime = bsls::SystemTime::nowRealtimeClock();

      for (int i = 0; TOP_NUMBER > i; ++i) {
          isStillPrime[i] = true;
          scannedTo[i] = 0;
      }

      scannedTo[0] = TOP_NUMBER + 1;
      scannedTo[1] = TOP_NUMBER + 1;

      maxPrimeFound = 2;
      primeNumbers[0] = 2;
      numPrimeNumbers = 1;
      doneFlag = false;

      threadPool =
          new (ta) bdlmt::MultipriorityThreadPool(20, NUM_PRIORITIES, &ta);
      threadPool->startThreads();

      bsls::TimeInterval startJobs = bsls::SystemTime::nowRealtimeClock();

      Functor f(2);
      threadPool->enqueueJob(f, 0);

      doneBarrier.wait();

      bsls::TimeInterval finish = bsls::SystemTime::nowRealtimeClock();

      threadPool->shutdown();
      ta.deleteObjectRaw(threadPool);

      if (verbose) {
          bsls::TimeInterval endTime = bsls::SystemTime::nowRealtimeClock();

          double elapsed      = (endTime - startTime).totalSecondsAsDouble();
          double elapsedNoInit = (finish - startJobs).totalSecondsAsDouble();

          printf("Runtime: %g seconds, %g seconds w/o init & cleanup\n",
                                                     elapsed, elapsedNoInit);

          printf("%d prime numbers below %d:", (int) numPrimeNumbers,
                                                            TOP_NUMBER);

          for (int i = 0; numPrimeNumbers > i; ++i) {
              printf("%s%4d", 0 == i % 10 ? "\n    " : ", ",
                     primeNumbers[i]);
          }
          printf("\n");
      }
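Because the jobs in Example 2 run concurrently, it is useful to check the resulting list of primes against a simple single-threaded reference. The following is a minimal sketch of a plain sieve of Eratosthenes in standard C++; the function name primesBelow is hypothetical and is not part of this component.

```cpp
#include <cassert>
#include <vector>

// Return, in increasing order, all primes strictly below the specified
// 'top', using a plain single-threaded sieve of Eratosthenes.
std::vector<int> primesBelow(int top)
{
    std::vector<int> primes;
    if (top < 3) {
        return primes;                      // no primes below 2
    }

    std::vector<bool> isStillPrime(top, true);
    isStillPrime[0] = false;
    isStillPrime[1] = false;

    for (int p = 2; p < top; ++p) {
        if (!isStillPrime[p]) {
            continue;
        }
        primes.push_back(p);

        // Smaller multiples of 'p' were already marked by smaller primes,
        // so marking can start at 'p * p'.
        for (long long m = static_cast<long long>(p) * p; m < top; m += p) {
            isStillPrime[static_cast<int>(m)] = false;
        }
    }
    return primes;
}
```

With 'top' equal to TOP_NUMBER (2000), this reference returns 303 primes, the largest being 1999, which can be compared element by element against the first numPrimeNumbers entries of primeNumbers once doneBarrier has been passed.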