Provide a pool of queues, each processed serially by a thread pool.
Namespaces
namespace | bdlmt
bdlmt::MultiQueueThreadPool | multi-threaded, serial processing of queues
bdlmt::MultiQueueThreadPool allows clients to create and delete queues, and to enqueue "jobs" (represented as client-specified functors) to specific queues. Queue processing is implemented on top of a bdlmt::ThreadPool by enqueuing a per-queue functor to the thread pool. Each functor dequeues the next item from its associated queue, processes it, and re-enqueues itself to the thread pool. Since there is at most one representative functor per queue, each queue is guaranteed to be processed serially by the thread pool. See the bdlmt::ThreadPool documentation for details on the underlying thread pool.

bdlmt::MultiQueueThreadPool allows clients to disable and re-enable queues. A disabled queue accepts no further jobs, but continues to process the jobs that were enqueued before the call to disableQueue. Note that calling disableQueue blocks the calling thread until the currently executing job (if any) on that queue completes.

bdlmt::MultiQueueThreadPool also allows clients to pause and resume queues. Pausing a queue suspends the processing of jobs from that queue: after pauseQueue returns, no further jobs are processed on that queue until the queue is resumed. Note that calling pauseQueue blocks the calling thread until the currently executing job (if any) on that queue completes.

The bdlmt::MultiQueueThreadPool class is fully thread-safe (i.e., all public methods of a particular instance may safely execute concurrently). This class is also thread-enabled (i.e., the class does not function correctly in a non-multi-threading environment). See bsldoc_glossary for complete definitions of fully thread-safe and thread-enabled.

bdlmt::MultiQueueThreadPool allows clients to configure the maximum size of a group of jobs that a queue will execute "atomically". "Atomically", in this context, means that no state changes to the queue will be observed by that queue during the processing of the group of jobs (e.g., a call to pauseQueue will only pause the queue after the currently executing group of jobs completes). By default, a queue's batch size is 1. Configuring a larger batch size may improve throughput by reducing the synchronization overhead needed to execute a job. However, for many use cases the overall throughput is limited by the time it takes to process a job (rather than by synchronization overhead), so users are strongly encouraged to use benchmarks to guide their decision when setting this option.

Clients can provide a thread name through the threadName attribute of the bslmt::ThreadAttributes argument passed to the constructor; that name will be used for all the sub-threads. The thread name should not be used programmatically, but will appear in debugging tools on platforms that support naming threads, to help users identify the source and purpose of a thread. If no ThreadAttributes object is passed, or if the threadName attribute is not set, the default value "bdl.MultiQuePl" will be used. Note that this only applies to a bdlmt::ThreadPool automatically created by a bdlmt::MultiQueueThreadPool. If a thread pool already exists and is passed to the multi-queue thread pool at construction, the sub-threads keep whatever names were specified when that thread pool was created.

This example uses a bdlmt::MultiQueueThreadPool in a word search application called fastSearch. fastSearch searches a list of files for a list of words, and returns the set of files which contain all of the specified words. bdlmt::MultiQueueThreadPool is used to provide concurrent processing of files, and to simplify the collection of results by serializing access to result sets which are maintained for each word.

    class my_SearchProfile {
        // This class defines a search profile consisting of a word and a set
        // of files (given by name) that contain the word.  Here, "word" is
        // defined as any string of characters.

        bsl::string           d_word;     // word to search for
        bsl::set<bsl::string> d_fileSet;  // set of matching files

      private:
        // not implemented
        my_SearchProfile(const my_SearchProfile&);
        my_SearchProfile& operator=(const my_SearchProfile&);

      public:
        // CREATORS
        my_SearchProfile(const char       *word,
                         bslma::Allocator *basicAllocator = 0);
            // Create a 'my_SearchProfile' with the specified 'word'.
            // Optionally specify a 'basicAllocator' used to supply memory.  If
            // 'basicAllocator' is 0, the default memory allocator is used.

        ~my_SearchProfile();
            // Destroy this search profile.

        // MANIPULATORS
        void insert(const char *file);
            // Insert the specified 'file' into the file set maintained by this
            // search profile.

        // ACCESSORS
        bool isMatch(const char *file) const;
            // Return 'true' if the specified 'file' matches this search
            // profile.

        const bsl::set<bsl::string>& fileSet() const;
            // Return a reference to the non-modifiable file set maintained by
            // this search profile.

        const bsl::string& word() const;
            // Return a reference to the non-modifiable word maintained by this
            // search profile.
    };
    // CREATORS
    my_SearchProfile::my_SearchProfile(const char       *word,
                                       bslma::Allocator *basicAllocator)
    : d_word(basicAllocator)
    , d_fileSet(bsl::less<bsl::string>(), basicAllocator)
    {
        assert(word);

        d_word.assign(word);
    }

    my_SearchProfile::~my_SearchProfile()
    {
    }

    // MANIPULATORS
    inline
    void my_SearchProfile::insert(const char *file)
    {
        assert(file);

        d_fileSet.insert(file);
    }

    // ACCESSORS
    bool my_SearchProfile::isMatch(const char *file) const
    {
        assert(file);

        bool          found = false;
        bsl::ifstream ifs(file);
        bsl::string   line;
        while (bsl::getline(ifs, line)) {
            if (bsl::string::npos != line.find(d_word)) {
                found = true;
                break;
            }
        }
        ifs.close();
        return found;
    }

    inline
    const bsl::set<bsl::string>& my_SearchProfile::fileSet() const
    {
        return d_fileSet;
    }

    inline
    const bsl::string& my_SearchProfile::word() const
    {
        return d_word;
    }
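The line-by-line scan in isMatch can be tried without touching the file system. The following is a minimal sketch using only the standard library; containsWord and textContainsWord are hypothetical helpers introduced for illustration, not part of BDE. It shows that, because a "word" is any string of characters, the match is a plain substring test on each line.

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <string>

// Hypothetical helper (not part of BDE): return 'true' if any line read from
// 'input' contains 'word' as a substring, stopping at the first match.
bool containsWord(std::istream& input, const std::string& word)
{
    std::string line;
    while (std::getline(input, line)) {
        if (std::string::npos != line.find(word)) {
            return true;
        }
    }
    return false;
}

// Hypothetical convenience wrapper that scans in-memory text instead of a
// file.
bool textContainsWord(const std::string& text, const std::string& word)
{
    std::istringstream input(text);
    return containsWord(input, word);
}
```

Under these assumptions, searching for "cat" matches a line containing "concatenate", while a word split across two lines is not found.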
    void my_SearchCb(my_SearchProfile* profile, const char *file)
    {
        // Insert the specified 'file' to the file set of the specified search
        // 'profile' if 'file' matches the 'profile'.

        assert(profile);
        assert(file);

        if (profile->isMatch(file)) {
            profile->insert(file);
        }
    }
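my_SearchCb modifies its profile without any locking, which is safe only because all jobs for one profile run serially on one queue. The following is a minimal sketch, using only the standard library, of the representative-functor scheme described in the overview: each queue keeps at most one functor in the pool, and that functor dequeues one job, runs it, and re-enqueues itself. WorkerPool, SerialQueue, and runsSerially are hypothetical names for illustration; this is not how bdlmt::MultiQueueThreadPool or bdlmt::ThreadPool is actually implemented.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

typedef std::function<void()> Job;

// Hypothetical stand-in for a thread pool (NOT 'bdlmt::ThreadPool').
class WorkerPool {
    std::mutex               d_mutex;
    std::condition_variable  d_ready;
    std::deque<Job>          d_tasks;
    bool                     d_stopping = false;
    std::vector<std::thread> d_threads;
    void run() {
        for (;;) {
            Job task;
            {
                std::unique_lock<std::mutex> lock(d_mutex);
                d_ready.wait(lock, [this] { return d_stopping || !d_tasks.empty(); });
                if (d_tasks.empty()) return;  // stopping, nothing left to run
                task = std::move(d_tasks.front());
                d_tasks.pop_front();
            }
            task();  // run the task without holding the pool lock
        }
    }
  public:
    explicit WorkerPool(int numThreads) {
        for (int i = 0; i < numThreads; ++i) d_threads.emplace_back([this] { run(); });
    }
    ~WorkerPool() {
        { std::lock_guard<std::mutex> lock(d_mutex); d_stopping = true; }
        d_ready.notify_all();
        for (std::thread& t : d_threads) t.join();
    }
    void submit(Job task) {
        { std::lock_guard<std::mutex> lock(d_mutex); d_tasks.push_back(std::move(task)); }
        d_ready.notify_one();
    }
};

// Hypothetical serial queue: at most one representative functor is in the pool.
class SerialQueue {
    WorkerPool&             d_pool;
    std::mutex              d_mutex;
    std::condition_variable d_idle;
    std::deque<Job>         d_jobs;
    bool                    d_scheduled = false;  // representative in the pool?
    void processOne() {
        Job job;
        {
            std::lock_guard<std::mutex> lock(d_mutex);
            job = std::move(d_jobs.front());  // dequeue the next job
            d_jobs.pop_front();
        }
        job();                                // process it outside the lock
        std::lock_guard<std::mutex> lock(d_mutex);
        d_scheduled = !d_jobs.empty();
        if (d_scheduled) d_pool.submit([this] { processOne(); });  // re-enqueue itself
        else             d_idle.notify_all();
    }
  public:
    explicit SerialQueue(WorkerPool& pool) : d_pool(pool) {}
    void enqueue(Job job) {
        std::lock_guard<std::mutex> lock(d_mutex);
        d_jobs.push_back(std::move(job));
        if (!d_scheduled) {
            d_scheduled = true;
            d_pool.submit([this] { processOne(); });
        }
    }
    void drain() {  // block until every enqueued job has run
        std::unique_lock<std::mutex> lock(d_mutex);
        d_idle.wait(lock, [this] { return !d_scheduled; });
    }
};

// Enqueue 'numJobs' jobs on each of two queues; every job appends its index to
// its queue's vector WITHOUT locking.  Return 'true' if both ran in order.
bool runsSerially(int numJobs) {
    WorkerPool       pool(4);
    SerialQueue      queueA(pool), queueB(pool);
    std::vector<int> seenA, seenB, expected;
    for (int i = 0; i < numJobs; ++i) {
        expected.push_back(i);
        queueA.enqueue([&seenA, i] { seenA.push_back(i); });
        queueB.enqueue([&seenB, i] { seenB.push_back(i); });
    }
    queueA.drain();
    queueB.drain();
    return seenA == expected && seenB == expected;
}
```

In this sketch the two queues may run concurrently with each other on different workers, but the jobs within one queue never overlap, so each vector can be appended to without its own lock.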
fastSearch is parameterized by a list of words to search for, a list of files to search in, and a set which is populated with the search results. fastSearch instantiates a bdlmt::MultiQueueThreadPool, and creates a queue for each word. It then associates each queue with a search profile based on a word in the word list. Then, for each file in the file list, it enqueues a job to each queue that tries to match the file against that queue's search profile. Lastly, fastSearch collects the result, which is the intersection of the file sets maintained by the individual search profiles.

    void fastSearch(const bsl::vector<bsl::string>&  wordList,
                    const bsl::vector<bsl::string>&  fileList,
                    bsl::set<bsl::string>&           resultSet,
                    int                              repetitions = 1,
                    bslma::Allocator                *basicAllocator = 0)
    {
        // Return the set of files, specified by 'fileList', containing every
        // word in the specified 'wordList', in the specified 'resultSet'.
        // Optionally specify 'repetitions', the number of repetitions to run
        // the search jobs (it is used to increase the load for performance
        // testing).  Optionally specify a 'basicAllocator' used to supply
        // memory.  If 'basicAllocator' is 0, the default memory allocator is
        // used.

        typedef bsl::vector<bsl::string> ListType;
            // This type is defined for notational convenience when iterating
            // over 'wordList' or 'fileList'.

        typedef bsl::pair<int, my_SearchProfile*> RegistryValue;
            // This type is defined for notational convenience.  The first
            // parameter specifies a queue ID.  The second parameter specifies
            // an associated search profile.

        typedef bsl::map<bsl::string, RegistryValue> RegistryType;
            // This type is defined for notational convenience.  The first
            // parameter specifies a word.  The second parameter specifies a
            // tuple containing a queue ID, and an associated search profile
            // containing the specified word.

        enum {
            // thread pool configuration
            k_MIN_THREADS = 4,
            k_MAX_THREADS = 20,
            k_MAX_IDLE    = 100  // use a very short idle time since new jobs
                                 // arrive only at startup
        };
        bslmt::ThreadAttributes     defaultAttrs;
        bdlmt::MultiQueueThreadPool pool(defaultAttrs,
                                         k_MIN_THREADS,
                                         k_MAX_THREADS,
                                         k_MAX_IDLE,
                                         basicAllocator);
        RegistryType profileRegistry(bsl::less<bsl::string>(), basicAllocator);

        // Create a queue and a search profile associated with each word in
        // 'wordList'.

        for (ListType::const_iterator it = wordList.begin();
             it != wordList.end();
             ++it) {
            bslma::Allocator *allocator =
                                     bslma::Default::allocator(basicAllocator);

            const bsl::string& word = *it;
            int                id = pool.createQueue();
            LOOP_ASSERT(word, 0 != id);
            my_SearchProfile *profile = new (*allocator)
                                     my_SearchProfile(word.c_str(), allocator);

            bslma::RawDeleterProctor<my_SearchProfile, bslma::Allocator>
                                                   deleter(profile, allocator);

            profileRegistry[word] = bsl::make_pair(id, profile);
            deleter.release();
        }

        // Start the pool, enabling enqueuing and queue processing.

        pool.start();

        // Enqueue a job which tries to match each file in 'fileList' with each
        // search profile.  Note that 'Func', 'makeFunc', and 'LOOP_ASSERT' are
        // not defined in this excerpt; 'Func' is presumably a nullary functor
        // type and 'makeFunc' a helper that binds 'my_SearchCb' to its
        // arguments.

        for (ListType::const_iterator it = fileList.begin();
             it != fileList.end();
             ++it) {
            for (ListType::const_iterator jt = wordList.begin();
                 jt != wordList.end();
                 ++jt) {
                const bsl::string& file = *it;
                const bsl::string& word = *jt;
                RegistryValue&     rv   = profileRegistry[word];
                Func               job;
                makeFunc(&job, my_SearchCb, rv.second, file.c_str());
                for (int i = 0; i < repetitions; ++i) {
                    int rc = pool.enqueueJob(rv.first, job);
                    LOOP_ASSERT(word, 0 == rc);
                }
            }
        }

        // Stop the pool, and wait while enqueued jobs are processed.

        pool.stop();

        // Construct the 'resultSet' as the intersection of file sets collected
        // in each search profile.

        resultSet.insert(fileList.begin(), fileList.end());
        for (RegistryType::iterator it = profileRegistry.begin();
             it != profileRegistry.end();
             ++it) {
            my_SearchProfile             *profile = it->second.second;
            const bsl::set<bsl::string>&  fileSet = profile->fileSet();
            bsl::set<bsl::string>         tmpSet;
            bsl::set_intersection(fileSet.begin(),
                                  fileSet.end(),
                                  resultSet.begin(),
                                  resultSet.end(),
                                  bsl::inserter(tmpSet, tmpSet.begin()));
            resultSet = tmpSet;
            bslma::Default::allocator(basicAllocator)->deleteObjectRaw(
                                                                      profile);
        }
    }
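The result-collection step at the end of fastSearch can be looked at in isolation. The following is a minimal sketch using only the standard library; FileSet and intersectAll are hypothetical names introduced for illustration, not part of BDE. As in fastSearch, the result is seeded with the full file list and then narrowed by intersecting it with each per-word file set in turn.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <string>
#include <vector>

typedef std::set<std::string> FileSet;

// Hypothetical helper (not part of BDE): return the files in 'allFiles' that
// appear in every set of 'perWordMatches'.
FileSet intersectAll(const FileSet&              allFiles,
                     const std::vector<FileSet>& perWordMatches)
{
    FileSet result(allFiles);  // seed with every candidate file
    for (const FileSet& matches : perWordMatches) {
        FileSet tmp;
        // 'std::set' iterates in sorted order, as 'set_intersection' requires.
        std::set_intersection(matches.begin(), matches.end(),
                              result.begin(), result.end(),
                              std::inserter(tmp, tmp.begin()));
        result.swap(tmp);      // keep only files matched so far
    }
    return result;
}
```

One consequence of seeding with the full file list is that an empty word list leaves every file in the result, since there is no per-word set to narrow it.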