Quick Links:

bal | bbl | bdl | bsl

Namespaces | Typedefs

Component bdlmt_threadpool
[Package bdlmt]

Provide portable implementation for a dynamic pool of threads. More...

Namespaces

namespace  bdlmt

Typedefs

typedef void(* bcep_ThreadPoolJobFunc )(void *)

Detailed Description

Outline
Purpose:
Provide portable implementation for a dynamic pool of threads.
Classes:
bdlmt::ThreadPool portable dynamic thread pool
See also:
Description:
This component defines a portable and efficient implementation of a thread pool that can be used to distribute various user-defined functions ("jobs") to separate threads and execute the jobs concurrently. The thread pool manages a dynamic set of processing threads, adding or removing threads to manage load, based upon user-defined parameters.
The pool uses a queue mechanism to distribute work among the threads. Jobs are queued for execution as they arrive, and each queued job is processed by the next available thread. If no threads are available, new threads are created dynamically (up to the application defined maximum number). If the maximum number of concurrent threads has been reached, new jobs will remain enqueued until a thread becomes available. If the threads become idle for longer than a user-defined maximum idle time, they are automatically destroyed, releasing unused resources. A client-defined minimum number of threads is always maintained even when there is no work to be done.
The thread pool provides two interfaces for specifying jobs: the commonly used "void function/void pointer" interface and the more versatile functor based interface. The void function/void pointer interface allows callers to use a C-style function to be executed as a job. The application need only specify the address of the function, and a single void pointer argument, to be passed to the function. The specified function will be invoked with the specified argument by the processing thread. The functor based interface allows for flexible job execution such as the invocation of member functions or the passing of multiple user-defined arguments. See the bdef package documentation for more on functors and their usage.
An application can tune the thread pool by adjusting the minimum and maximum number of threads in the pool, and the maximum amount of time that dynamically created threads can idle before being destroyed. To avoid unnecessary and inefficient thread creation/destruction, an application should select a value for the minimum number of threads that reflects the expected average load. A higher value for the maximum number of threads can be used to handle periodic bursts. An application can also specify the attributes of the threads in the pool (e.g., thread priority or stack size), by providing a bslmt::ThreadAttributes object with the desired values set. See bslmt_threadutil package documentation for a description of bslmt::ThreadAttributes.
Thread pools are ideal for developing multi-threaded server applications. A server need only package client requests to execute as jobs, and bdlmt::ThreadPool will handle the queue management, thread management, and request dispatching. Thread pools are also well suited for parallelizing certain types of application logic. Without any complex or redundant thread management code, an application can easily create a thread pool, enqueue a series of jobs to be executed, and wait until all the jobs have executed.
Thread Safety:
The bdlmt::ThreadPool class is both fully thread-safe (i.e., all non-creator methods can correctly execute concurrently), and is thread-enabled (i.e., the class does not function correctly in a non-multi-threading environment). See bsldoc_glossary for complete definitions of fully thread-safe and thread-enabled.
Synchronous Signals on Unix:
A thread pool ensures that, on unix platforms, all the threads in the pool block all asynchronous signals. Specifically all the signals, except the following synchronous signals are blocked.
SIGBUS SIGFPE SIGILL SIGSEGV SIGSYS SIGABRT SIGTRAP SIGIOT
Usage:
This example demonstrates the use of a bdlmt::ThreadPool to parallelize a segment of program logic. The example implements a multi-threaded file search utility. The utility searches multiple files for a string, similar to the Unix command fgrep; the use of a bdlmt::ThreadPool allows the utility to search multiple files concurrently.
The example program will take as input a string and a list of files to search. The program creates a bdlmt::ThreadPool, and then enqueues a single "job" for each file to be searched. Each thread in the pool will take a job from the queue, open the file, and search for the string. If a match is found, the job adds the filename to an array of matching filenames. Because this array of filenames is shared across multiple jobs and across multiple threads, access to the array is controlled via a bslmt::Mutex.
Setting ThreadPool Attributes:
To get started, we declare thread attributes, to be used in constructing the thread pool. In this example, our choices for minimum search threads and maximum idle time are arbitrary; we don't expect the thread pool to become idle, so the thread pool should not begin to delete unused threads before the program terminates.
However, a maximum number of 50 threads is meaningful, and may affect overall performance. The maximum should cover the expected peak, in this case, the maximum number of files to search. However, if the maximum is too large for a given platform, it may cause a bottleneck as the operating system spends significant resources switching context among multiple threads. Also we use a very short idle time since new jobs will arrive only at startup.
  const int                MIN_SEARCH_THREADS = 10;
  const int                MAX_SEARCH_THREADS = 50;
  const bsls::TimeInterval MAX_SEARCH_THREAD_IDLE(0, 100000000);
Below is the structure that will be used to pass arguments to the file search function. Since each job will be searching a separate file, a distinct instance of the structure will be used for each job.
   struct my_FastSearchJobInfo {
       const bsl::string        *d_word;    // word to search for
       const bsl::string        *d_path;    // path of the file to search
       bslmt::Mutex             *d_mutex;   // mutex to control access to the
                                            // result file list
       bsl::vector<bsl::string> *d_outList; // list of matching files
   };
The "void function/void pointer" Interface:
myFastSearchJob is the search function to be executed as a job by threads in the thread pool, matching the "void function/void pointer" interface. The single void * argument is received and cast to point to a 'struct my_FastSearchJobInfo', which then points to the search string and a single file to be searched. Note that different my_FastSearchJobInfo structures for the same search request will differ only in the attribute d_path, which points to a specific filename among the set of files to be searched; other fields will be identical across all structures for a given Fast Search.
See the following section for an illustration of the functor interface.
   static void myFastSearchJob(void *arg)
   {
       my_FastSearchJobInfo *job =  (my_FastSearchJobInfo*)arg;
       FILE *file;

       file = fopen(job->d_path->c_str(), "r");

       if (file) {
           char  buffer[1024];
           size_t nread;
           size_t wordLen = job->d_word->length();
           const char *word = job->d_word->c_str();

           nread = fread(buffer, 1, sizeof(buffer) - 1, file);
           while(nread >= wordLen) {
               buffer[nread] = 0;
               if (strstr(buffer, word)) {
If we find a match, we add the file to the result list and return. Since the result list is shared among multiple processing threads, we use a mutex lock to regulate access to the list. We use a bslmt::LockGuard to manage access to the mutex lock. This template object acquires a mutex lock on job->d_mutex at construction, releases that lock on destruction. Thus, the mutex will be locked within the scope of the if block, and released when the program exits that scope.
See bslmt_threadutil for information about the bslmt::Mutex class, and component bslmt_lockguard for information about the bslmt::LockGuard template class.
                bslmt::LockGuard<bslmt::Mutex> lock(job->d_mutex);
                job->d_outList->push_back(*job->d_path);
                break;  // bslmt::LockGuard destructor unlocks mutex.
            }
            memcpy(buffer, &buffer[nread - wordLen - 1], wordLen - 1);
            nread = fread(buffer + wordLen - 1, 1, sizeof(buffer) - wordLen,
                          file);
        }
        fclose(file);
       }
   }
Routine myFastSearch is the main driving routine, taking three arguments: a single string to search for (word), a list of files to search, and an output list of files. When the function completes, the file list will contain the names of files where a match was found.
   void  myFastSearch(const bsl::string&              word,
                      const bsl::vector<bsl::string>& fileList,
                      bsl::vector<bsl::string>&       outFileList)
   {
       bslmt::Mutex            mutex;
       bslmt::ThreadAttributes defaultAttributes;
We initialize the thread pool using default thread attributes. We then start the pool so that the threads can begin while we prepare the jobs.
       bdlmt::ThreadPool       pool(defaultAttributes,
                                    MIN_SEARCH_THREADS,
                                    MAX_SEARCH_THREADS,
                                    MAX_SEARCH_THREAD_IDLE);

       if (0 != pool.start()) {
           bsl::cerr << "Failed to start minimum number of threads.\n";
           exit(1);
       }
For each file to be searched, we create the job info structure that will be passed to the search function and add the job to the pool.
As noted above, all jobs will share a single mutex to guard the output file list. Function myFastSearchJob uses a bslmt::LockGuard on this mutex to serialize access to the list.
       int count = fileList.size();
       my_FastSearchJobInfo *jobInfoArray = new my_FastSearchJobInfo[count];

       for (int i = 0; i < count; ++i) {
           my_FastSearchJobInfo &job = jobInfoArray[i];
           job.d_word    = &word;
           job.d_path    = &fileList[i];
           job.d_mutex   = &mutex;
           job.d_outList = &outFileList;
           pool.enqueueJob(myFastSearchJob, &job);
       }
Now we simply wait for all the jobs in the queue to complete. Any matched files should have been added to the output file list.
       pool.drain();
       delete[] jobInfoArray;
   }
The Functor Interface:
The "void function/void pointer" convention is idiomatic for C programs. The void pointer argument provides a generic way of passing in user data, without regard to the data type. Clients who prefer better or more explicit type safety may wish to use the Functor Interface instead. This interface uses the bsl::function component to provide type-safe wrappers that can match argument number and type for a C++ free function or member function.
To illustrate the Functor Interface, we will make two small changes to the usage example above. First, we change the signature of the function that executes a single job, so that it uses a my_FastSearchJobInfo pointer rather than a void pointer. With this change, we can remove the first executable statement, which casts the void * pointer to my_FastSearchJobInfo *.
  static void my_FastFunctorSearchJob(my_FastSearchJobInfo *job)
  {
      FILE *file;

      file = fopen(job->d_path->c_str(), "r");

      // The rest of the function is unchanged.
      if (file) {
          char  buffer[1024];
          size_t nread;
          size_t wordLen = job->d_word->length();
          const char *word = job->d_word->c_str();

          nread = fread(buffer, 1, sizeof(buffer) - 1, file);
          while(nread >= wordLen) {
              buffer[nread] = 0;
              if (strstr(buffer, word)) {
                  bslmt::LockGuard<bslmt::Mutex> lock(job->d_mutex);
                  job->d_outList->push_back(*job->d_path);
                  break;  // bslmt::LockGuard destructor unlocks mutex.
              }
          }
          bsl::memcpy(buffer, &buffer[nread - wordLen - 1], wordLen - 1);
          nread = fread(buffer + wordLen - 1, 1, sizeof(buffer) - wordLen,
                        file);
      }
      fclose(file);
  }
Next, we make a change to the loop that enqueues the jobs in myFastSearch. The function starts exactly as in the previous example:
  static void myFastFunctorSearch(const string&         word,
                                  const vector<string>& fileList,
                                  vector<string>&       outFileList)
  {
      bslmt::Mutex            mutex;
      bslmt::ThreadAttributes defaultAttributes;
      bdlmt::ThreadPool       pool(defaultAttributes,
                                   MIN_SEARCH_THREADS,
                                   MAX_SEARCH_THREADS,
                                   MAX_SEARCH_THREAD_IDLE);

      if (0 != pool.start()) {
          bsl::cerr << "Failed to start minimum number of threads.  "
                    << "Thread quota exceeded?\n";
          assert(false);
          return; // things are SNAFU
      }

      int count = fileList.size();
      my_FastSearchJobInfo  *jobInfoArray = new my_FastSearchJobInfo[count];
We create a functor - a C++ object that acts as a function. The thread pool will "execute" this functor (by calling its operator() member function) on a thread when one becomes available.
      for (int i = 0; i < count; ++i) {
          my_FastSearchJobInfo &job = jobInfoArray[i];
          job.d_word    = &word;
          job.d_path    = &fileList[i];
          job.d_mutex   = &mutex;
          job.d_outList = &outFileList;

          bsl::function<void()> jobHandle =
                        bdlf::BindUtil::bind(&my_FastFunctorSearchJob, &job);
          pool.enqueueJob(jobHandle);
      }
Note that the functor is created locally and handed to the thread pool. The thread pool copies the functor onto its internal queue, and takes responsibility for the copied functor until execution is complete.
The function is completed exactly as it was in the previous example.
      pool.drain();
      delete[] jobInfoArray;
  }

Typedef Documentation

typedef void(* bcep_ThreadPoolJobFunc)(void *)

This type declares the prototype for functions that are suitable to be specified bdlmt::FixedThreadPool::enqueueJob.