BDE 4.14.0 Production release
bdlmt_multiqueuethreadpool

Detailed Description

Outline

Purpose

Provide a pool of queues, each processed serially by a thread pool.

Classes

See also
bdlmt_threadpool

Description

This component defines a dynamic, configurable pool of queues, each of which is processed by a thread in a thread pool, such that elements on a given queue are processed serially, regardless of which thread is processing the queue at a given time.

A bdlmt::MultiQueueThreadPool allows clients to create and delete queues, and to enqueue "jobs" (represented as client-specified functors) to specific queues. Queue processing is implemented on top of a bdlmt::ThreadPool by enqueuing a per-queue functor to the thread pool. Each functor dequeues the next item from its associated queue, processes it, and re-enqueues itself to the thread pool. Since there is at most one representative functor per queue, each queue is guaranteed to be processed serially by the thread pool.
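The representative-functor scheme described above can be sketched with the standard library alone. This is a minimal model and not the BDE implementation: `MiniMultiQueuePool`, its members, and `runInOrder` are hypothetical names, the workers are plain `std::thread` objects rather than a `bdlmt::ThreadPool`, and queues are created on demand from caller-chosen integer IDs. The representative of a queue is modelled as the queue's ID sitting in a ready list.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified model (not the BDE implementation).  A queue's
// "representative" is its ID: the ID is either waiting in 'd_ready' or held
// by exactly one worker, so two jobs of one queue never run concurrently.
class MiniMultiQueuePool {
  public:
    typedef std::function<void()> Job;

    explicit MiniMultiQueuePool(int numThreads) : d_stopping(false)
    {
        for (int i = 0; i < numThreads; ++i) {
            d_workers.emplace_back(&MiniMultiQueuePool::work, this);
        }
    }

    // Let the workers drain every queue, then join them.
    ~MiniMultiQueuePool()
    {
        { std::lock_guard<std::mutex> guard(d_mutex); d_stopping = true; }
        d_wakeup.notify_all();
        for (std::thread& worker : d_workers) { worker.join(); }
    }

    // Append 'job' to the queue 'id', creating that queue if necessary.
    void enqueueJob(int id, Job job)
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        Queue& queue = d_queues[id];
        queue.d_jobs.push_back(std::move(job));
        if (!queue.d_scheduled) {
            queue.d_scheduled = true;
            d_ready.push_back(id);           // submit the representative
            d_wakeup.notify_one();
        }
    }

  private:
    struct Queue {
        std::deque<Job> d_jobs;
        bool            d_scheduled = false;  // representative outstanding?
    };

    std::mutex               d_mutex;
    std::condition_variable  d_wakeup;
    std::map<int, Queue>     d_queues;
    std::deque<int>          d_ready;     // representatives awaiting a worker
    bool                     d_stopping;
    std::vector<std::thread> d_workers;

    void work()
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        for (;;) {
            d_wakeup.wait(lock,
                          [this] { return d_stopping || !d_ready.empty(); });
            if (d_ready.empty()) {
                return;                      // stopping, nothing left to run
            }
            const int id = d_ready.front();
            d_ready.pop_front();
            Queue& queue = d_queues[id];
            Job    job   = std::move(queue.d_jobs.front());
            queue.d_jobs.pop_front();
            lock.unlock();
            job();                           // run the job outside the lock
            lock.lock();
            if (queue.d_jobs.empty()) {
                queue.d_scheduled = false;
            } else {
                d_ready.push_back(id);       // re-enqueue the representative
                d_wakeup.notify_one();
            }
        }
    }
};

// Run 'numJobs' numbered jobs through one queue on four workers and return
// the order in which they executed.
std::vector<int> runInOrder(int numJobs)
{
    std::vector<int> order;  // touched only by jobs of one queue: no lock
    {
        MiniMultiQueuePool pool(4);
        for (int i = 0; i < numJobs; ++i) {
            pool.enqueueJob(1, [&order, i] { order.push_back(i); });
        }
    }                        // the destructor drains the queue and joins
    return order;
}
```

Although any of the four workers may pick up the representative, `runInOrder` returns the job numbers in enqueue order, because a queue's ID is handed to at most one worker at a time.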

In addition to the ability to create, delete, pause, and resume queues, clients are able to tune the underlying thread pool in accordance with the bdlmt::ThreadPool documentation.

Disabled Queues

bdlmt::MultiQueueThreadPool allows clients to disable and re-enable queues. A disabled queue will allow no further jobs to be enqueued, but will continue to process the jobs that were enqueued prior to the call to disableQueue. Note that calling disableQueue will block the calling thread until the currently executing job (if any) on that queue completes.

Paused Queues

bdlmt::MultiQueueThreadPool also allows clients to pause and resume queues. Pausing a queue suspends the processing of jobs from that queue; i.e., after pauseQueue returns, no further jobs will be processed on that queue until the queue is resumed. Note that calling pauseQueue will block the calling thread until the currently executing job (if any) on that queue completes.
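The difference between a disabled queue and a paused queue can be illustrated with a small single-threaded model. `ModelQueue` and its methods are hypothetical names, not part of the component, and the model deliberately ignores threads and the blocking behavior noted above. It also assumes that a paused queue still accepts new jobs, which the text above does not state explicitly.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded model of one queue's state; it is not the
// BDE implementation and does not model threads or blocking.
class ModelQueue {
    std::deque<std::function<void()> > d_jobs;
    bool                               d_enabled;  // may jobs be enqueued?
    bool                               d_paused;   // is processing suspended?

  public:
    ModelQueue() : d_enabled(true), d_paused(false) {}

    // Return 0 on success, and a non-zero value if the queue is disabled.
    int enqueueJob(std::function<void()> job)
    {
        if (!d_enabled) {
            return -1;
        }
        d_jobs.push_back(std::move(job));
        return 0;
    }

    void disable() { d_enabled = false; }
    void enable()  { d_enabled = true;  }
    void pause()   { d_paused  = true;  }
    void resume()  { d_paused  = false; }

    // Run the next pending job and return 'true'; return 'false' without
    // running anything if the queue is paused or empty.
    bool processNext()
    {
        if (d_paused || d_jobs.empty()) {
            return false;
        }
        std::function<void()> job = std::move(d_jobs.front());
        d_jobs.pop_front();
        job();
        return true;
    }
};
```

In this model, disabling affects only `enqueueJob` (jobs already queued still run), whereas pausing affects only `processNext` (jobs stay queued until `resume`).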

Thread Safety

The bdlmt::MultiQueueThreadPool class is fully thread-safe (i.e., all public methods of a particular instance may safely execute concurrently). This class is also thread-enabled (i.e., the class does not function correctly in a non-multi-threading environment). See bsldoc_glossary for complete definitions of fully thread-safe and thread-enabled.

Job Execution Batch Size

bdlmt::MultiQueueThreadPool allows clients to configure the maximum size of a group of jobs that a queue will execute "atomically". "Atomically", in this context, means that no state changes to the queue will be observed by that queue during the processing of the group of jobs (e.g., a call to pauseQueue will only pause the queue after the currently executing group of jobs completes execution). By default, a queue's batch size is 1. Configuring a larger batch size may improve throughput by reducing the synchronization overhead needed to execute a job. However, for many use cases the overall throughput is limited by the time it takes to process a job (rather than by synchronization overhead), so users are strongly encouraged to use benchmarks to guide their decision when setting this option.
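The effect of the batch size on when a pause takes effect can be sketched with a single-threaded model. The names `DrainResult`, `drain`, and `runExample` are hypothetical, and the counter `d_syncPoints` merely stands in for the synchronization that a real queue performs between groups of jobs; this is an illustration of the idea, not the component's implementation.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

typedef std::function<void()> Job;

struct DrainResult {
    int d_jobsRun;     // jobs executed before the pause took effect
    int d_syncPoints;  // times the queue state was examined
};

// Run jobs from 'jobs' in groups of at most 'batchSize'.  The 'paused' flag
// is examined only between groups, never in the middle of one.
DrainResult drain(std::deque<Job>& jobs, int batchSize, const bool& paused)
{
    DrainResult result = { 0, 0 };
    while (!jobs.empty()) {
        ++result.d_syncPoints;          // stands in for taking the queue lock
        if (paused) {
            break;
        }
        for (int i = 0; i < batchSize && !jobs.empty(); ++i) {
            Job job = std::move(jobs.front());
            jobs.pop_front();
            job();
            ++result.d_jobsRun;
        }
    }
    return result;
}

// Enqueue six jobs, the second of which requests a pause, and drain them
// using the specified 'batchSize'.
DrainResult runExample(int batchSize)
{
    bool            paused = false;
    std::deque<Job> jobs;
    for (int i = 0; i < 6; ++i) {
        if (1 == i) {
            jobs.push_back([&paused] { paused = true; });
        } else {
            jobs.push_back([] {});
        }
    }
    return drain(jobs, batchSize, paused);
}
```

With a batch size of 1, `runExample` runs two jobs and examines the queue state three times; with a batch size of 4, it runs four jobs (the pause is noticed only after the whole group finishes) and examines the state twice.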

Thread Names for Sub-Threads

To facilitate debugging, users can provide a thread name as the threadName attribute of the bslmt::ThreadAttributes argument passed to the constructor; that name will be used for all the sub-threads. The thread name should not be used programmatically, but will appear in debugging tools on platforms that support naming threads, helping users identify the source and purpose of a thread. If no ThreadAttributes object is passed, or if the threadName attribute is not set, the default value "bdl.MultiQuePl" will be used. Note that this only applies to a bdlmt::ThreadPool automatically created by a bdlmt::MultiQueueThreadPool. If a thread pool already exists and is passed to the multi-queue thread pool at construction, the sub-threads will be named as specified when that thread pool was created.

Usage

This section illustrates intended use of this component.

Example 1: A Word Search Application

This example illustrates the use of a bdlmt::MultiQueueThreadPool in a word search application called fastSearch. fastSearch searches a list of files for a list of words, and returns the set of files which contain all of the specified words. bdlmt::MultiQueueThreadPool is used to provide concurrent processing of files, and to simplify the collection of results by serializing access to result sets which are maintained for each word.

First, we present a class used to manage a word, and the set of files which contain that word:

/// This class defines a search profile consisting of a word and a set
/// of files (given by name) that contain the word.  Here, "word" is
/// defined as any string of characters.
class my_SearchProfile {

    bsl::string           d_word;     // word to search for
    bsl::set<bsl::string> d_fileSet;  // set of matching files

  private:
    // not implemented
    my_SearchProfile(const my_SearchProfile&);
    my_SearchProfile& operator=(const my_SearchProfile&);

  public:
    // CREATORS

    /// Create a `my_SearchProfile` with the specified `word`.
    /// Optionally specify a `basicAllocator` used to supply memory.  If
    /// `basicAllocator` is 0, the default memory allocator is used.
    my_SearchProfile(const char       *word,
                     bslma::Allocator *basicAllocator = 0);

    /// Destroy this search profile.
    ~my_SearchProfile();

    // MANIPULATORS

    /// Insert the specified `file` into the file set maintained by this
    /// search profile.
    void insert(const char *file);

    // ACCESSORS

    /// Return `true` if the specified `file` matches this search profile.
    bool isMatch(const char *file) const;

    /// Return a reference to the non-modifiable file set maintained by
    /// this search profile.
    const bsl::set<bsl::string>& fileSet() const;

    /// Return a reference to the non-modifiable word maintained by this
    /// search profile.
    const bsl::string& word() const;
};

And the implementation:

// CREATORS
my_SearchProfile::my_SearchProfile(const char       *word,
                                   bslma::Allocator *basicAllocator)
: d_word(basicAllocator)
, d_fileSet(bsl::less<bsl::string>(), basicAllocator)
{
    assert(word);

    d_word.assign(word);
}

my_SearchProfile::~my_SearchProfile()
{
}

// MANIPULATORS
inline
void my_SearchProfile::insert(const char *file)
{
    assert(file);

    d_fileSet.insert(file);
}

// ACCESSORS
bool my_SearchProfile::isMatch(const char *file) const
{
    assert(file);

    bool          found = false;
    bsl::ifstream ifs(file);
    bsl::string   line;  // current line read from 'file'

    // Scan the file line by line, stopping at the first line that contains
    // the word.
    while (bsl::getline(ifs, line)) {
        if (bsl::string::npos != line.find(d_word)) {
            found = true;
            break;
        }
    }
    ifs.close();
    return found;
}

inline
const bsl::set<bsl::string>& my_SearchProfile::fileSet() const
{
    return d_fileSet;
}

inline
const bsl::string& my_SearchProfile::word() const
{
    return d_word;
}

Next, we define a helper function to perform a search of a word in a particular file. The function is parameterized by a search profile and a file name. If the specified file name matches the profile, it is inserted into the profile's file list.

/// Insert the specified `file` to the file set of the specified search
/// `profile` if `file` matches the `profile`.
void my_SearchCb(my_SearchProfile* profile, const char *file)
{
    assert(profile);
    assert(file);

    if (profile->isMatch(file)) {
        profile->insert(file);
    }
}

Lastly, we present the front end to the search application: fastSearch. fastSearch is parameterized by a list of words to search for, a list of files to search in, and a set which is populated with the search results. fastSearch instantiates a bdlmt::MultiQueueThreadPool, and creates a queue for each word. It then associates each queue with a search profile based on a word in the word list. Then, for each file in the file list, it enqueues a job to each queue that tries to match the file against that queue's search profile. Lastly, fastSearch collects the results, which are the set intersection of the file sets maintained by the individual search profiles.

/// Return the set of files, specified by `fileList`, containing every
/// word in the specified `wordList`, in the specified `resultSet`.
/// Optionally specify `repetitions`, the number of repetitions to run
/// the search jobs (it is used to increase the load for performance
/// testing).  Optionally specify a `basicAllocator` used to supply
/// memory.  If `basicAllocator` is 0, the default memory allocator is
/// used.
void fastSearch(const bsl::vector<bsl::string>&  wordList,
                const bsl::vector<bsl::string>&  fileList,
                bsl::set<bsl::string>&           resultSet,
                int                              repetitions = 1,
                bslma::Allocator                *basicAllocator = 0)
{
    typedef bsl::vector<bsl::string> ListType;
        // This type is defined for notational convenience when iterating
        // over 'wordList' or 'fileList'.

    typedef bsl::pair<int, my_SearchProfile*> RegistryValue;
        // This type is defined for notational convenience.  The first
        // parameter specifies a queue ID.  The second parameter specifies
        // an associated search profile.

    typedef bsl::map<bsl::string, RegistryValue> RegistryType;
        // This type is defined for notational convenience.  The first
        // parameter specifies a word.  The second parameter specifies a
        // tuple containing a queue ID, and an associated search profile
        // containing the specified word.

    enum {
        // thread pool configuration
        k_MIN_THREADS = 4,
        k_MAX_THREADS = 20,
        k_MAX_IDLE    = 100  // use a very short idle time since new jobs
                             // arrive only at startup
    };
    bslmt::ThreadAttributes     defaultAttrs;
    bdlmt::MultiQueueThreadPool pool(defaultAttrs,
                                     k_MIN_THREADS,
                                     k_MAX_THREADS,
                                     k_MAX_IDLE,
                                     basicAllocator);
    RegistryType profileRegistry(bsl::less<bsl::string>(), basicAllocator);

    // Create a queue and a search profile associated with each word in
    // 'wordList'.

    for (ListType::const_iterator it = wordList.begin();
         it != wordList.end();
         ++it) {
        bslma::Allocator *allocator =
                                 bslma::Default::allocator(basicAllocator);

        const bsl::string& word = *it;
        int                id   = pool.createQueue();
        LOOP_ASSERT(word, 0 != id);
        my_SearchProfile *profile = new (*allocator)
                                             my_SearchProfile(word.c_str(),
                                                              allocator);

        // Guard 'profile' until it has been stored in the registry.
        bslma::RawDeleterProctor<my_SearchProfile, bslma::Allocator>
                                               deleter(profile, allocator);

        profileRegistry[word] = bsl::make_pair(id, profile);
        deleter.release();
    }

    // Start the pool, enabling enqueuing and queue processing.
    pool.start();

    // Enqueue a job which tries to match each file in 'fileList' with each
    // search profile.

    for (ListType::const_iterator it = fileList.begin();
         it != fileList.end();
         ++it) {
        for (ListType::const_iterator jt = wordList.begin();
             jt != wordList.end();
             ++jt) {
            const bsl::string& file = *it;
            const bsl::string& word = *jt;
            RegistryValue&     rv   = profileRegistry[word];

            // Note that 'Func' and 'makeFunc' are not defined in this
            // example; 'Func' is assumed to be a 'void()' functor type, and
            // 'makeFunc' is assumed to bind 'my_SearchCb' to its arguments.
            Func               job;
            makeFunc(&job, my_SearchCb, rv.second, file.c_str());
            for (int i = 0; i < repetitions; ++i) {
                int rc = pool.enqueueJob(rv.first, job);
                LOOP_ASSERT(word, 0 == rc);
            }
        }
    }

    // Stop the pool, and wait while enqueued jobs are processed.
    pool.stop();

    // Construct the 'resultSet' as the intersection of file sets collected
    // in each search profile.

    resultSet.insert(fileList.begin(), fileList.end());
    for (RegistryType::iterator it = profileRegistry.begin();
         it != profileRegistry.end();
         ++it) {
        my_SearchProfile *profile = it->second.second;
        const bsl::set<bsl::string>& fileSet = profile->fileSet();
        bsl::set<bsl::string> tmpSet;
        bsl::set_intersection(fileSet.begin(),
                              fileSet.end(),
                              resultSet.begin(),
                              resultSet.end(),
                              bsl::inserter(tmpSet, tmpSet.begin()));
        resultSet = tmpSet;
        bslma::Default::allocator(basicAllocator)->deleteObjectRaw(
                                                                  profile);
    }
}
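The final result-collection step can be tried in isolation. The following is a minimal sketch using the standard library in place of the `bsl` containers; the function name `intersectAll` and its parameters are hypothetical and do not appear in the example above. It starts from the full file list and repeatedly intersects with each per-word set of matching files, as the loop at the end of `fastSearch` does.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <set>
#include <string>
#include <vector>

// Return the files from 'allFiles' that appear in every set of
// 'perWordMatches' (one set of matching files per searched word).
std::set<std::string> intersectAll(
                    const std::vector<std::string>&            allFiles,
                    const std::vector<std::set<std::string> >& perWordMatches)
{
    // Start with every file, then narrow the result one word at a time.
    std::set<std::string> result(allFiles.begin(), allFiles.end());

    for (const std::set<std::string>& matches : perWordMatches) {
        std::set<std::string> tmp;
        std::set_intersection(matches.begin(),
                              matches.end(),
                              result.begin(),
                              result.end(),
                              std::inserter(tmp, tmp.begin()));
        result.swap(tmp);
    }
    return result;
}
```

For instance, with files `a.txt`, `b.txt`, and `c.txt`, where one word matches `a.txt` and `b.txt` and another matches `b.txt` and `c.txt`, only `b.txt` remains in the result.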