BDE 4.14.0 Production release
Loading...
Searching...
No Matches
bdlmt_fixedthreadpool

Typedefs

typedef void(* bcep_FixedThreadPoolJobFunc) (void *)
 

Detailed Description

Outline

Purpose

Provide portable implementation for a fixed-size pool of threads.

Classes

See also
bdlmt_threadpool

Description

This component defines a portable and efficient implementation of a thread pool, bdlmt::FixedThreadPool, that can be used to distribute various user-defined functions ("jobs") to a separate threads to execute the jobs concurrently. Each thread pool object manages a fixed number of processing threads and can hold up to a fixed maximum number of pending jobs.

bdlmt::FixedThreadPool implements a queuing mechanism that distributes work among the threads. Jobs are queued for execution as they arrive, and each queued job is processed by the next available thread. If each of the concurrent threads is busy processing a job, new jobs will remain enqueued until a thread becomes available. If the queue capacity is reached, enqueuing jobs will block until threads consume more jobs from the queue, causing its length to drop below its capacity. Both the queue's capacity and number of threads are specified at construction and cannot be changed.

The thread pool provides two interfaces for specifying jobs: the commonly used "void function/void pointer" interface and the more versatile functor based interface. The void function/void pointer interface allows callers to use a C-style function to be executed as a job. The application need only specify the address of the function, and a single void pointer argument, to be passed to the function. The specified function will be invoked with the specified argument by the processing thread. The functor based interface allows for more flexible job execution such as the invocation of member functions or the passing of multiple user-defined arguments. See the bdef package-level documentation for more on functors and their usage.

Unlike a bdlmt::ThreadPool, an application can not tune a bdlmt::FixedThreadPool once it is created with a specified number of threads and queue capacity, hence the name "fixed" thread pool. An application can, however, specify the attributes of the threads in the pool (e.g., thread priority or stack size), by providing a bslmt::ThreadAttributes object with the desired values set. See bslmt_threadutil package documentation for a description of bslmt::ThreadAttributes.

Thread pools are ideal for developing multi-threaded server applications. A server need only package client requests to execute as jobs, and bdlmt::FixedThreadPool will handle the queue management, thread management, and request dispatching. Thread pools are also well suited for parallelizing certain types of application logic. Without any complex or redundant thread management code, an application can easily create a thread pool, enqueue a series of jobs to be executed, and wait until all the jobs have executed.

Thread Safety

The bdlmt::FixedThreadPool class is both fully thread-safe (i.e., all non-creator methods can correctly execute concurrently), and is thread-enabled (i.e., the classes does not function correctly in a non-multi-threading environment). See bsldoc_glossary for complete definitions of fully thread-safe and thread-enabled.

Synchronous Signals on Unix

A thread pool ensures that, on unix platforms, all the threads in the pool block all asynchronous signals. Specifically all the signals, except the following synchronous signals are blocked:

SIGBUS
SIGFPE
SIGILL
SIGSEGV
SIGSYS
SIGABRT
SIGTRAP
SIGIOT

Thread Names for Sub-Threads

To facilitate debugging, users can provide a thread name as the threadName attribute of the bslmt::ThreadAttributes argument passed to the constructor, that will be used for all the sub-threads. The thread name should not be used programmatically, but will appear in debugging tools on platforms that support naming threads to help users identify the source and purpose of a thread. If no ThreadAttributes object is passed, or if the threadName attribute is not set, the default value "bdl.FixedPool" will be used.

Usage

This example demonstrates the use of a bdlmt::FixedThreadPool to parallelize a segment of program logic. The example implements a multi-threaded file search utility. The utility searches multiple files for a string, similar to the Unix command fgrep; the use of a bdlmt::FixedThreadPool allows the utility to search multiple files concurrently.

The example program will take as input a string and a list of files to search. The program creates a bdlmt::FixedThreadPool, and then enqueues a single "job" for each file to be searched. Each thread in the pool will take a job from the queue, open the file, and search for the string. If a match is found, the job adds the filename to an array of matching filenames. Because this array of filenames is shared across multiple jobs and across multiple threads, access to the array is controlled via a bslmt::Mutex.

Setting FixedThreadPool Attributes

To get started, we declare thread attributes, to be used in constructing the thread pool. In this example, our choices for number of threads and queue capacity are arbitrary.

#define SEARCH_THREADS 10
#define SEARCH_QUEUE_CAPACITY 50

Below is the structure that will be used to pass arguments to the file search function. Since each job will be searching a separate file, a distinct instance of the structure will be used for each job.

struct my_FastSearchJobInfo {
const bsl::string *d_word; // word to search for
const bsl::string *d_path; // path of the file to search
bslmt::Mutex *d_mutex; // mutex to control access to the
// result file list
bsl::vector<bsl::string> *d_outList; // list of matching files
};
Definition bslstl_string.h:1281
Definition bslstl_vector.h:1025
Definition bslmt_mutex.h:315

The "void functionvoid pointer" Interface

myFastSearchJob is the search function to be executed as a job by threads in the thread pool, matching the "void function/void pointer" interface. The single void * argument is received and cast to point to a struct my_FastSearchJobInfo, which then points to the search string and a single file to be searched. Note that different my_FastSearchJobInfo structures for the same search request will differ only in the attribute d_path, which points to a specific filename among the set of files to be searched; other fields will be identical across all structures for a given Fast Search.

See the following section for an illustration of the functor interface.

static void myFastSearchJob(void *arg)
{
my_FastSearchJobInfo *job = (my_FastSearchJobInfo*)arg;
FILE *file;
file = fopen(job->d_path->c_str(), "r");
if (file) {
char buffer[1024];
size_t nread;
int wordLen = job->d_word->length();
const char *word = job->d_word->c_str();
nread = fread(buffer, 1, sizeof(buffer) - 1, file);
while (nread >= wordLen) {
buffer[nread] = 0;
if (strstr(buffer, word)) {

If we find a match, we add the file to the result list and return. Since the result list is shared among multiple processing threads, we use a mutex lock to regulate access to the list. We use a bslmt::LockGuard to manage access to the mutex lock. This template object acquires a mutex lock on job->d_mutex at construction, releases that lock on destruction. Thus, the mutex will be locked within the scope of the if block, and released when the program exits that scope.

See bslmt_threadutil for information about the bslmt::Mutex class, and component bslmt_lockguard for information about the bslmt::LockGuard template class.

bslmt::LockGuard<bslmt::Mutex> lock(job->d_mutex);
job->d_outList->push_back(*job->d_path);
break; // bslmt::LockGuard destructor unlocks mutex.
}
memcpy(buffer, &buffer[nread - wordLen - 1], wordLen - 1);
nread = fread(buffer + wordLen - 1, 1, sizeof(buffer) - wordLen,
file);
}
fclose(file);
}
}
Definition bslmt_lockguard.h:234

Routine myFastSearch is the main driving routine, taking three arguments: a single string to search for (word), a list of files to search, and an output list of files. When the function completes, the file list will contain the names of files where a match was found.

void myFastSearch(const bsl::string& word,
const bsl::vector<bsl::string>& fileList,
{
bslmt::Mutex mutex;
bslmt::ThreadAttributes defaultAttributes;
Definition bslmt_threadattributes.h:356

We initialize the thread pool using default thread attributes. We then start the pool so that the threads can begin while we prepare the jobs.

bdlmt::FixedThreadPool pool(defaultAttributes,
SEARCH_THREADS,
SEARCH_QUEUE_CAPACITY);
if (0 != pool.start()) {
bsl::cerr << "Thread start() failed. Thread quota exceeded?"
<< bsl::endl;
exit(1);
}
Definition bdlmt_fixedthreadpool.h:413

For each file to be searched, we create the job info structure that will be passed to the search function and add the job to the pool.

As noted above, all jobs will share a single mutex to guard the output file list. Function myFastSearchJob uses a bslmt::LockGuard on this mutex to serialize access to the list.

int count = fileList.size();
my_FastSearchJobInfo *jobInfoArray = new my_FastSearchJobInfo[count];
for (int i = 0; i < count; ++i) {
my_FastSearchJobInfo &job = jobInfoArray[i];
job.d_word = &word;
job.d_path = &fileList[i];
job.d_mutex = &mutex;
job.d_outList = &outFileList;
pool.enqueueJob(myFastSearchJob, &job);
}
size_type size() const BSLS_KEYWORD_NOEXCEPT
Return the number of elements in this vector.
Definition bslstl_vector.h:2664

Now we simply wait for all the jobs in the queue to complete. Any matched files should have been added to the output file list.

pool.drain();
delete[] jobInfoArray;
}

The Functor Interface

The "void function/void pointer" convention is idiomatic for C programs. The void pointer argument provides a generic way of passing in user data, without regard to the data type. Clients who prefer better or more explicit type safety may wish to use the Functor Interface instead. This interface uses bsl::function to provide type-safe wrappers that can match argument number and type for a C++ free function or member function.

To illustrate the Functor Interface, we will make two small changes to the usage example above. First, we change the signature of the function that executes a single job, so that it uses a my_FastSearchJobInfo pointer rather than a void pointer. With this change, we can remove the first executable statement, which casts the void * pointer to my_FastSearchJobInfo *.

static void myFastFunctorSearchJob(my_FastSearchJobInfo *job)
{
FILE *file;
file = fopen(job->d_path->c_str(), "r");
// the rest of the function is unchanged.

Next, we make a change to the loop that enqueues the jobs in myFastSearch. We create a functor - a C++ object that acts as a function. The thread pool will "execute" this functor (by calling its operator() member function) on a thread when one becomes available.

for (int i = 0; i < count; ++i) {
my_FastSearchJobInfo &job = jobInfoArray[i];
job.d_word = &word;
job.d_path = &fileList[i];
job.d_mutex = &mutex;
job.d_outList = &outFileList;
bsl::function<void()> jobHandle =
bdlf::BindUtil::bind(&myFastFunctorSearchJob, &job);
pool.enqueueJob(jobHandle);
}
static Bind< bslmf::Nil, t_FUNC, Bind_BoundTuple0 > bind(t_FUNC func)
Definition bdlf_bind.h:1830
Forward declaration.
Definition bslstl_function.h:934

Use of bsl::function and bdlf::BindUtil is described in the bdef package documentation. For this example, it is important to note that jobHandle is a functor object, and that bdlf::BindUtil::bind populates that functor object with a function pointer (to the void function myFastFunctorSearchJob) and user data (&job). When the functor is executed via operator(), it will in turn execute the myFastFunctorSearchJob function with the supplied data as its argument.

Note also that the functor is created locally and handed to the thread pool. The thread pool copies the functor onto its internal queue, and takes responsibility for the copied functor until execution is complete.

The function is completed exactly as it was in the previous example.

pool.drain();
delete[] jobInfoArray;
}

Typedef Documentation

◆ bcep_FixedThreadPoolJobFunc

typedef void(* bcep_FixedThreadPoolJobFunc) (void *)