Component bdlmt_multiqueuethreadpool
[Package bdlmt]

Provide a pool of queues, each processed serially by a thread pool.

Namespaces

namespace  bdlmt

Detailed Description

Outline
Purpose:
Provide a pool of queues, each processed serially by a thread pool.
Classes:
bdlmt::MultiQueueThreadPool: multi-threaded, serial processing of queues
See also:
Component bdlmt_threadpool
Description:
This component defines a dynamic, configurable pool of queues, each of which is processed by a thread in a thread pool, such that elements on a given queue are processed serially, regardless of which thread is processing the queue at a given time.
A bdlmt::MultiQueueThreadPool allows clients to create and delete queues, and to enqueue "jobs" (represented as client-specified functors) to specific queues. Queue processing is implemented on top of a bdlmt::ThreadPool by enqueuing a per-queue functor to the thread pool. Each functor dequeues the next item from its associated queue, processes it, and re-enqueues itself to the thread pool. Since there is at most one representative functor per queue, each queue is guaranteed to be processed serially by the thread pool.
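The mechanism described above can be illustrated with a minimal sketch in standard C++. This is not the implementation of bdlmt::MultiQueueThreadPool: TinyPool, SerialQueue, Job, and runInOrder are hypothetical names introduced only for this illustration, and features such as pausing, disabling, and batching are omitted. The sketch shows a single representative functor per queue that runs one job and then re-enqueues itself to the pool while jobs remain:

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

typedef std::function<void()> Job;

// Hypothetical stand-in for a thread pool: workers run submitted tasks.
class TinyPool {
    std::mutex               d_mutex;
    std::condition_variable  d_cv;
    std::deque<Job>          d_tasks;
    bool                     d_done;
    std::vector<std::thread> d_workers;

    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(d_mutex);
            d_cv.wait(lock, [this] { return d_done || !d_tasks.empty(); });
            if (d_tasks.empty()) return;  // shutting down, nothing left
            Job task = std::move(d_tasks.front());
            d_tasks.pop_front();
            lock.unlock();
            task();
        }
    }

  public:
    explicit TinyPool(int numThreads) : d_done(false) {
        for (int i = 0; i < numThreads; ++i) {
            d_workers.emplace_back([this] { run(); });
        }
    }
    ~TinyPool() {  // run all remaining tasks, then join the workers
        { std::lock_guard<std::mutex> guard(d_mutex); d_done = true; }
        d_cv.notify_all();
        for (std::thread& worker : d_workers) worker.join();
    }
    void submit(Job task) {
        { std::lock_guard<std::mutex> guard(d_mutex);
          d_tasks.push_back(std::move(task)); }
        d_cv.notify_one();
    }
};

// Hypothetical serial queue layered on 'TinyPool'.
class SerialQueue {
    TinyPool&       d_pool;
    std::mutex      d_mutex;
    std::deque<Job> d_jobs;
    bool            d_scheduled;  // is the representative functor pending?

    void processOne() {
        Job job;
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            job = std::move(d_jobs.front());
            d_jobs.pop_front();
        }
        job();
        bool more;
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            more = d_scheduled = !d_jobs.empty();
        }
        if (more) d_pool.submit([this] { processOne(); });  // re-enqueue
    }

  public:
    explicit SerialQueue(TinyPool& pool) : d_pool(pool), d_scheduled(false) {}

    void enqueue(Job job) {
        bool schedule;
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            d_jobs.push_back(std::move(job));
            schedule    = !d_scheduled;
            d_scheduled = true;
        }
        if (schedule) d_pool.submit([this] { processOne(); });
    }
};

// Enqueue 'numJobs' jobs on one queue; return the order in which they ran.
std::vector<int> runInOrder(int numJobs) {
    std::vector<int>          order;  // touched by one job at a time
    std::unique_ptr<TinyPool> pool(new TinyPool(4));
    SerialQueue               queue(*pool);
    for (int i = 0; i < numJobs; ++i) {
        queue.enqueue([&order, i] { order.push_back(i); });
    }
    pool.reset();  // drain and join before 'queue' and 'order' go away
    return order;
}
```

Because at most one processOne task per queue is ever pending or running, the jobs in this sketch may append to order without a lock of their own, even though successive jobs may run on different worker threads.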
In addition to the ability to create, delete, pause, and resume queues, clients are able to tune the underlying thread pool in accordance with the bdlmt::ThreadPool documentation.
Disabled Queues:
bdlmt::MultiQueueThreadPool allows clients to disable and re-enable queues. A disabled queue will allow no further jobs to be enqueued, but will continue to process the jobs that were enqueued prior to the call to disableQueue. Note that calling disableQueue will block the calling thread until the currently executing job (if any) on that queue completes.
Paused Queues:
bdlmt::MultiQueueThreadPool also allows clients to pause and resume queues. Pausing a queue suspends the processing of jobs from that queue -- i.e., after pauseQueue returns no further jobs will be processed on that queue until the queue is resumed. Note that calling pauseQueue will block the calling thread until the currently executing job (if any) on that queue completes.
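The difference between a disabled queue and a paused queue can be illustrated with a small single-threaded model. This is only a sketch: ModelQueue and the two demonstration functions are hypothetical names, the model treats the two states as independent flags (an assumption made for this illustration), and the blocking behavior described above is not modeled:

```cpp
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded model of one queue's state.
class ModelQueue {
    std::deque<std::function<void()> > d_jobs;
    bool                               d_enabled;  // may jobs be enqueued?
    bool                               d_paused;   // is processing suspended?

  public:
    ModelQueue() : d_enabled(true), d_paused(false) {}

    void disable() { d_enabled = false; }
    void enable()  { d_enabled = true; }
    void pause()   { d_paused = true; }
    void resume()  { d_paused = false; }

    // Return 0 on success, and a non-zero value if the queue is disabled.
    int enqueue(std::function<void()> job) {
        if (!d_enabled) {
            return 1;
        }
        d_jobs.push_back(std::move(job));
        return 0;
    }

    // Run pending jobs until the queue is empty or paused.
    void process() {
        while (!d_paused && !d_jobs.empty()) {
            std::function<void()> job = std::move(d_jobs.front());
            d_jobs.pop_front();
            job();
        }
    }
};

// Enqueue two jobs, disable the queue, and try to enqueue a third.  Return
// the number of jobs that ran, or -1 if the third enqueue was accepted.
int jobsRunAfterDisable() {
    ModelQueue q;
    int        ran = 0;
    q.enqueue([&ran] { ++ran; });
    q.enqueue([&ran] { ++ran; });
    q.disable();
    int rc = q.enqueue([&ran] { ++ran; });  // rejected: queue is disabled
    q.process();                            // earlier jobs still run
    return 0 != rc ? ran : -1;
}

// Enqueue one job and pause the queue.  Return the number of jobs that had
// run while paused ('first') and after resuming ('second').
std::pair<int, int> jobsRunAroundPause() {
    ModelQueue q;
    int        ran = 0;
    q.enqueue([&ran] { ++ran; });
    q.pause();
    q.process();                            // nothing runs while paused
    int whilePaused = ran;
    q.resume();
    q.process();
    return std::make_pair(whilePaused, ran);
}
```

In this model, disabling affects only enqueue and pausing affects only process, which mirrors the distinction drawn in the two sections above.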
Thread Safety:
The bdlmt::MultiQueueThreadPool class is fully thread-safe (i.e., all public methods of a particular instance may safely execute concurrently). This class is also thread-enabled (i.e., the class does not function correctly in a non-multi-threading environment). See bsldoc_glossary for complete definitions of fully thread-safe and thread-enabled.
Job Execution Batch Size:
bdlmt::MultiQueueThreadPool allows clients to configure the maximum size of a group of jobs that a queue will execute "atomically". "Atomically", in this context, means that no state changes to the queue will be observed by that queue during the processing of the collection of jobs (e.g., a call to pauseQueue will only pause the queue after the currently executing group of jobs completes execution). By default a queue's batch size is 1. Configuring a larger batch size may improve throughput by reducing the synchronization overhead needed to execute a job. However, for many use-cases the overall throughput is limited by the time it takes to process a job (rather than synchronization overhead), so users are strongly encouraged to use benchmarks to guide their decision when setting this option.
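The effect of the batch size on when a pause is observed can be sketched with a single-threaded model. BatchedQueue and jobsRunBeforePause are hypothetical names used only for this illustration. In a real program the pause request would come from another thread; here the first job sets the flag itself so that the result is deterministic:

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded model of a queue with a batch size.
struct BatchedQueue {
    std::deque<std::function<void()> > jobs;
    bool                               paused;
    std::size_t                        batchSize;

    explicit BatchedQueue(std::size_t size) : paused(false), batchSize(size) {}

    // Run at most 'batchSize' jobs; return the number of jobs executed.
    std::size_t runBatch() {
        if (paused) {
            return 0;  // queue state is observed only between batches
        }
        std::size_t n = std::min(batchSize, jobs.size());
        for (std::size_t i = 0; i < n; ++i) {
            std::function<void()> job = std::move(jobs.front());
            jobs.pop_front();
            job();     // may set 'paused'; not observed until the next batch
        }
        return n;
    }
};

// Enqueue six jobs, the first of which requests a pause, and return the
// number of jobs that execute before the pause takes effect.
std::size_t jobsRunBeforePause(std::size_t batchSize) {
    BatchedQueue q(batchSize);
    std::size_t  executed = 0;
    for (int i = 0; i < 6; ++i) {
        q.jobs.push_back([&q, &executed, i] {
            ++executed;
            if (0 == i) {
                q.paused = true;  // stand-in for a concurrent pause request
            }
        });
    }
    while (q.runBatch() > 0) {
    }
    return executed;
}
```

With a batch size of 1 only the first job runs before the pause takes effect; with a batch size of 4 the remaining three jobs of the first batch also run.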
Thread Names for Sub-Threads:
To facilitate debugging, users can provide a thread name as the threadName attribute of the bslmt::ThreadAttributes argument passed to the constructor; that name will be used for all the sub-threads. The thread name should not be used programmatically, but will appear in debugging tools on platforms that support naming threads to help users identify the source and purpose of a thread. If no ThreadAttributes object is passed, or if the threadName attribute is not set, the default value "bdl.MultiQuePl" will be used. Note that this only applies to a bdlmt::ThreadPool automatically created by a bdlmt::MultiQueueThreadPool. If a thread pool already exists and is passed to the multi-queue thread pool at construction, the sub-threads will keep whatever names were specified when that thread pool was created.
Usage:
This section illustrates intended use of this component.
Example 1: A Word Search Application:
This example illustrates the use of a bdlmt::MultiQueueThreadPool in a word search application called fastSearch. fastSearch searches a list of files for a list of words, and returns the set of files which contain all of the specified words. bdlmt::MultiQueueThreadPool is used to provide concurrent processing of files, and to simplify the collection of results by serializing access to result sets which are maintained for each word.
First, we present a class used to manage a word, and the set of files which contain that word:
  class my_SearchProfile {
      // This class defines a search profile consisting of a word and a set
      // of files (given by name) that contain the word.  Here, "word" is
      // defined as any string of characters.

      bsl::string           d_word;     // word to search for
      bsl::set<bsl::string> d_fileSet;  // set of matching files

    private:
      // not implemented
      my_SearchProfile(const my_SearchProfile&);
      my_SearchProfile& operator=(const my_SearchProfile&);

    public:
      // CREATORS
      my_SearchProfile(const char       *word,
                       bslma::Allocator *basicAllocator = 0);
          // Create a 'my_SearchProfile' with the specified 'word'.
          // Optionally specify a 'basicAllocator' used to supply memory.  If
          // 'basicAllocator' is 0, the default memory allocator is used.

      ~my_SearchProfile();
          // Destroy this search profile.

      // MANIPULATORS
      void insert(const char *file);
          // Insert the specified 'file' into the file set maintained by this
          // search profile.

      // ACCESSORS
      bool isMatch(const char *file) const;
          // Return 'true' if the specified 'file' matches this search
          // profile.

      const bsl::set<bsl::string>& fileSet() const;
          // Return a reference to the non-modifiable file set maintained by
          // this search profile.

      const bsl::string& word() const;
          // Return a reference to the non-modifiable word maintained by this
          // search profile.
  };
And the implementation:
  // CREATORS
  my_SearchProfile::my_SearchProfile(const char       *word,
                                     bslma::Allocator *basicAllocator)
  : d_word(basicAllocator)
  , d_fileSet(bsl::less<bsl::string>(), basicAllocator)
  {
      assert(word);

      d_word.assign(word);
  }

  my_SearchProfile::~my_SearchProfile()
  {
  }

  // MANIPULATORS
  inline
  void my_SearchProfile::insert(const char *file)
  {
      assert(file);

      d_fileSet.insert(file);
  }

  // ACCESSORS
  bool my_SearchProfile::isMatch(const char *file) const
  {
      assert(file);

      bool          found = false;
      bsl::ifstream ifs(file);
      bsl::string   line;
      while (bsl::getline(ifs, line)) {
          if (bsl::string::npos != line.find(d_word)) {
              found = true;
              break;
          }
      }
      ifs.close();
      return found;
  }

  inline
  const bsl::set<bsl::string>& my_SearchProfile::fileSet() const
  {
      return d_fileSet;
  }

  inline
  const bsl::string& my_SearchProfile::word() const
  {
      return d_word;
  }
Next, we define a helper function to perform a search of a word in a particular file. The function is parameterized by a search profile and a file name. If the specified file name matches the profile, it is inserted into the profile's file set.
  void my_SearchCb(my_SearchProfile *profile, const char *file)
  {
      // Insert the specified 'file' to the file set of the specified search
      // 'profile' if 'file' matches the 'profile'.

      assert(profile);
      assert(file);

      if (profile->isMatch(file)) {
          profile->insert(file);
      }
  }
Lastly, we present the front end to the search application: fastSearch. fastSearch is parameterized by a list of words to search for, a list of files to search in, and a set which is populated with the search results. fastSearch instantiates a bdlmt::MultiQueueThreadPool, and creates a queue for each word. It then associates each queue with a search profile based on a word in the word list. Then, for each file in the file list, it enqueues a job to each queue that tries to match the file against that queue's search profile. Lastly, fastSearch collects the results, computed as the intersection of the file sets maintained by the individual search profiles.
  void fastSearch(const bsl::vector<bsl::string>&  wordList,
                  const bsl::vector<bsl::string>&  fileList,
                  bsl::set<bsl::string>&           resultSet,
                  int                              repetitions = 1,
                  bslma::Allocator                *basicAllocator = 0)
  {
      // Return the set of files, specified by 'fileList', containing every
      // word in the specified 'wordList', in the specified 'resultSet'.
      // Optionally specify 'repetitions', the number of repetitions to run
      // the search jobs (it is used to increase the load for performance
      // testing).  Optionally specify a 'basicAllocator' used to supply
      // memory.  If 'basicAllocator' is 0, the default memory allocator is
      // used.

      typedef bsl::vector<bsl::string> ListType;
          // This type is defined for notational convenience when iterating
          // over 'wordList' or 'fileList'.

      typedef bsl::pair<int, my_SearchProfile*> RegistryValue;
          // This type is defined for notational convenience.  The first
          // parameter specifies a queue ID.  The second parameter specifies
          // an associated search profile.

      typedef bsl::map<bsl::string, RegistryValue> RegistryType;
          // This type is defined for notational convenience.  The first
          // parameter specifies a word.  The second parameter specifies a
          // tuple containing a queue ID, and an associated search profile
          // containing the specified word.

      enum {
          // thread pool configuration
          k_MIN_THREADS = 4,
          k_MAX_THREADS = 20,
          k_MAX_IDLE    = 100  // in milliseconds; use a very short idle time
                               // since new jobs arrive only at startup
      };
      bslmt::ThreadAttributes     defaultAttrs;
      bdlmt::MultiQueueThreadPool pool(defaultAttrs,
                                       k_MIN_THREADS,
                                       k_MAX_THREADS,
                                       k_MAX_IDLE,
                                       basicAllocator);
      RegistryType profileRegistry(bsl::less<bsl::string>(), basicAllocator);

      // Create a queue and a search profile associated with each word in
      // 'wordList'.

      for (ListType::const_iterator it = wordList.begin();
           it != wordList.end();
           ++it) {
          bslma::Allocator *allocator =
                                   bslma::Default::allocator(basicAllocator);

          const bsl::string& word = *it;
          int                id = pool.createQueue();
          LOOP_ASSERT(word, 0 != id);
          my_SearchProfile *profile = new (*allocator)
                                               my_SearchProfile(word.c_str(),
                                                                allocator);

          bslma::RawDeleterProctor<my_SearchProfile, bslma::Allocator>
                                                 deleter(profile, allocator);

          profileRegistry[word] = bsl::make_pair(id, profile);
          deleter.release();
      }

      // Start the pool, enabling enqueuing and queue processing.
      pool.start();

      // Enqueue a job which tries to match each file in 'fileList' with each
      // search profile.

      for (ListType::const_iterator it = fileList.begin();
           it != fileList.end();
           ++it) {
          for (ListType::const_iterator jt = wordList.begin();
               jt != wordList.end();
               ++jt) {
              const bsl::string& file = *it;
              const bsl::string& word = *jt;
              RegistryValue&     rv   = profileRegistry[word];
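              // 'Func' and 'makeFunc' are not defined in this example; they
              // are presumably a nullary functor type (e.g.,
              // 'bsl::function<void()>') and a helper that binds
              // 'my_SearchCb' to the supplied arguments.
              Func               job;
              makeFunc(&job, my_SearchCb, rv.second, file.c_str());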
              for (int i = 0; i < repetitions; ++i) {
                  int rc = pool.enqueueJob(rv.first, job);
                  LOOP_ASSERT(word, 0 == rc);
              }
          }
      }

      // Stop the pool, and wait while enqueued jobs are processed.
      pool.stop();

      // Construct the 'resultSet' as the intersection of file sets collected
      // in each search profile.

      resultSet.insert(fileList.begin(), fileList.end());
      for (RegistryType::iterator it = profileRegistry.begin();
           it != profileRegistry.end();
           ++it) {
          my_SearchProfile *profile = it->second.second;
          const bsl::set<bsl::string>& fileSet = profile->fileSet();
          bsl::set<bsl::string> tmpSet;
          bsl::set_intersection(fileSet.begin(),
                                fileSet.end(),
                                resultSet.begin(),
                                resultSet.end(),
                                bsl::inserter(tmpSet, tmpSet.begin()));
          resultSet = tmpSet;
          bslma::Default::allocator(basicAllocator)->deleteObjectRaw(
                                                                    profile);
      }
  }
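The final step of fastSearch, folding the per-word file sets into a single result, can be isolated in a sketch that uses only the standard library. intersectAll and demoIntersection are hypothetical names introduced for this illustration, and the file names are made up:

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <set>
#include <string>
#include <vector>

// Return the elements of the specified 'universe' that appear in every set
// in the specified 'fileSets'.
std::set<std::string> intersectAll(
                      const std::set<std::string>&               universe,
                      const std::vector<std::set<std::string> >& fileSets)
{
    std::set<std::string> result = universe;
    for (std::size_t i = 0; i < fileSets.size(); ++i) {
        std::set<std::string> tmp;
        std::set_intersection(fileSets[i].begin(),
                              fileSets[i].end(),
                              result.begin(),
                              result.end(),
                              std::inserter(tmp, tmp.begin()));
        result.swap(tmp);  // keep only the files seen for every word so far
    }
    return result;
}

// Two words over three files: only "b.txt" contains both words.
std::set<std::string> demoIntersection()
{
    std::set<std::string> all;
    all.insert("a.txt");
    all.insert("b.txt");
    all.insert("c.txt");

    std::vector<std::set<std::string> > perWord(2);
    perWord[0].insert("a.txt");
    perWord[0].insert("b.txt");
    perWord[1].insert("b.txt");
    perWord[1].insert("c.txt");

    return intersectAll(all, perWord);
}
```

As in fastSearch, the result starts as the full file list, so an empty word list yields every file.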