// bdlmt_fixedthreadpool.h -*-C++-*- // ---------------------------------------------------------------------------- // NOTICE // // This component is not up to date with current BDE coding standards, and // should not be used as an example for new development. // ---------------------------------------------------------------------------- #ifndef INCLUDED_BDLMT_FIXEDTHREADPOOL #define INCLUDED_BDLMT_FIXEDTHREADPOOL #include <bsls_ident.h> BSLS_IDENT("$Id: $") //@PURPOSE: Provide portable implementation for a fixed-size pool of threads. // //@CLASSES: // bdlmt::FixedThreadPool: portable fixed-size thread pool // //@SEE_ALSO: bdlmt_threadpool // //@DESCRIPTION: This component defines a portable and efficient implementation // of a thread pool, 'bdlmt::FixedThreadPool', that can be used to distribute // various user-defined functions ("jobs") to a separate threads to execute the // jobs concurrently. Each thread pool object manages a fixed number of // processing threads and can hold up to a fixed maximum number of pending // jobs. // // 'bdlmt::FixedThreadPool' implements a queuing mechanism that distributes // work among the threads. Jobs are queued for execution as they arrive, and // each queued job is processed by the next available thread. If each of the // concurrent threads is busy processing a job, new jobs will remain enqueued // until a thread becomes available. If the queue capacity is reached, // enqueuing jobs will block until threads consume more jobs from the queue, // causing its length to drop below its capacity. Both the queue's capacity // and number of threads are specified at construction and cannot be changed. // // The thread pool provides two interfaces for specifying jobs: the commonly // used "void function/void pointer" interface and the more versatile functor // based interface. The void function/void pointer interface allows callers to // use a C-style function to be executed as a job. The application need only // specify the address of the function, and a single void pointer argument, to // be passed to the function. The specified function will be invoked with the // specified argument by the processing thread. The functor based interface // allows for more flexible job execution such as the invocation of member // functions or the passing of multiple user-defined arguments. See the 'bdef' // package-level documentation for more on functors and their usage. // // Unlike a 'bdlmt::ThreadPool', an application can not tune a // 'bdlmt::FixedThreadPool' once it is created with a specified number of // threads and queue capacity, hence the name "fixed" thread pool. An // application can, however, specify the attributes of the threads in the pool // (e.g., thread priority or stack size), by providing a // 'bslmt::ThreadAttributes' object with the desired values set. See // 'bslmt_threadutil' package documentation for a description of // 'bslmt::ThreadAttributes'. // // Thread pools are ideal for developing multi-threaded server applications. A // server need only package client requests to execute as jobs, and // 'bdlmt::FixedThreadPool' will handle the queue management, thread // management, and request dispatching. Thread pools are also well suited for // parallelizing certain types of application logic. Without any complex or // redundant thread management code, an application can easily create a thread // pool, enqueue a series of jobs to be executed, and wait until all the jobs // have executed. // ///Thread Safety ///------------- // The 'bdlmt::FixedThreadPool' class is both *fully thread-safe* (i.e., all // non-creator methods can correctly execute concurrently), and is // *thread-enabled* (i.e., the classes does not function correctly in a // non-multi-threading environment). See 'bsldoc_glossary' for complete // definitions of *fully thread-safe* and *thread-enabled*. // ///Synchronous Signals on Unix ///--------------------------- // A thread pool ensures that, on unix platforms, all the threads in the pool // block all asynchronous signals. Specifically all the signals, except the // following synchronous signals are blocked: //.. // SIGBUS // SIGFPE // SIGILL // SIGSEGV // SIGSYS // SIGABRT // SIGTRAP // SIGIOT //.. // ///Thread Names for Sub-Threads ///---------------------------- // To facilitate debugging, users can provide a thread name as the 'threadName' // attribute of the 'bslmt::ThreadAttributes' argument passed to the // constructor, that will be used for all the sub-threads. The thread name // should not be used programmatically, but will appear in debugging tools on // platforms that support naming threads to help users identify the source and // purpose of a thread. If no 'ThreadAttributes' object is passed, or if the // 'threadName' attribute is not set, the default value "bdl.FixedPool" will be // used. // ///Usage ///----- // This example demonstrates the use of a 'bdlmt::FixedThreadPool' to // parallelize a segment of program logic. The example implements a // multi-threaded file search utility. The utility searches multiple files for // a string, similar to the Unix command 'fgrep'; the use of a // 'bdlmt::FixedThreadPool' allows the utility to search multiple files // concurrently. // // The example program will take as input a string and a list of files to // search. The program creates a 'bdlmt::FixedThreadPool', and then enqueues a // single "job" for each file to be searched. Each thread in the pool will // take a job from the queue, open the file, and search for the string. If a // match is found, the job adds the filename to an array of matching filenames. // Because this array of filenames is shared across multiple jobs and across // multiple threads, access to the array is controlled via a 'bslmt::Mutex'. // ///Setting FixedThreadPool Attributes /// - - - - - - - - - - - - - - - - - // To get started, we declare thread attributes, to be used in constructing the // thread pool. In this example, our choices for number of threads and queue // capacity are arbitrary. //.. // #define SEARCH_THREADS 10 // #define SEARCH_QUEUE_CAPACITY 50 //.. // Below is the structure that will be used to pass arguments to the file // search function. Since each job will be searching a separate file, a // distinct instance of the structure will be used for each job. //.. // struct my_FastSearchJobInfo { // const bsl::string *d_word; // word to search for // const bsl::string *d_path; // path of the file to search // bslmt::Mutex *d_mutex; // mutex to control access to the // // result file list // bsl::vector<bsl::string> *d_outList; // list of matching files // }; //.. // ///The "void function/void pointer" Interface /// - - - - - - - - - - - - - - - - - - - - - // 'myFastSearchJob' is the search function to be executed as a job by threads // in the thread pool, matching the "void function/void pointer" interface. // The single 'void *' argument is received and cast to point to a // 'struct my_FastSearchJobInfo', which then points to the search string and a // single file to be searched. Note that different 'my_FastSearchJobInfo' // structures for the same search request will differ only in the attribute // 'd_path', which points to a specific filename among the set of files to be // searched; other fields will be identical across all structures for a given // Fast Search. // // See the following section for an illustration of the functor interface. //.. // static void myFastSearchJob(void *arg) // { // my_FastSearchJobInfo *job = (my_FastSearchJobInfo*)arg; // FILE *file; // // file = fopen(job->d_path->c_str(), "r"); // // if (file) { // char buffer[1024]; // size_t nread; // int wordLen = job->d_word->length(); // const char *word = job->d_word->c_str(); // // nread = fread(buffer, 1, sizeof(buffer) - 1, file); // while (nread >= wordLen) { // buffer[nread] = 0; // if (strstr(buffer, word)) { //.. // If we find a match, we add the file to the result list and return. Since // the result list is shared among multiple processing threads, we use a mutex // lock to regulate access to the list. We use a 'bslmt::LockGuard' to manage // access to the mutex lock. This template object acquires a mutex lock on // 'job->d_mutex' at construction, releases that lock on destruction. Thus, // the mutex will be locked within the scope of the 'if' block, and released // when the program exits that scope. // // See 'bslmt_threadutil' for information about the 'bslmt::Mutex' class, and // component 'bslmt_lockguard' for information about the 'bslmt::LockGuard' // template class. //.. // bslmt::LockGuard<bslmt::Mutex> lock(job->d_mutex); // job->d_outList->push_back(*job->d_path); // break; // bslmt::LockGuard destructor unlocks mutex. // } // memcpy(buffer, &buffer[nread - wordLen - 1], wordLen - 1); // nread = fread(buffer + wordLen - 1, 1, sizeof(buffer) - wordLen, // file); // } // fclose(file); // } // } //.. // Routine 'myFastSearch' is the main driving routine, taking three arguments: // a single string to search for ('word'), a list of files to search, and an // output list of files. When the function completes, the file list will // contain the names of files where a match was found. //.. // void myFastSearch(const bsl::string& word, // const bsl::vector<bsl::string>& fileList, // bsl::vector<bsl::string>& outFileList) // { // bslmt::Mutex mutex; // bslmt::ThreadAttributes defaultAttributes; //.. // We initialize the thread pool using default thread attributes. We then // start the pool so that the threads can begin while we prepare the jobs. //.. // bdlmt::FixedThreadPool pool(defaultAttributes, // SEARCH_THREADS, // SEARCH_QUEUE_CAPACITY); // // if (0 != pool.start()) { // bsl::cerr << "Thread start() failed. Thread quota exceeded?" // << bsl::endl; // exit(1); // } //.. // For each file to be searched, we create the job info structure that will be // passed to the search function and add the job to the pool. // // As noted above, all jobs will share a single mutex to guard the output file // list. Function 'myFastSearchJob' uses a 'bslmt::LockGuard' on this mutex to // serialize access to the list. //.. // int count = fileList.size(); // my_FastSearchJobInfo *jobInfoArray = new my_FastSearchJobInfo[count]; // // for (int i = 0; i < count; ++i) { // my_FastSearchJobInfo &job = jobInfoArray[i]; // job.d_word = &word; // job.d_path = &fileList[i]; // job.d_mutex = &mutex; // job.d_outList = &outFileList; // pool.enqueueJob(myFastSearchJob, &job); // } //.. // Now we simply wait for all the jobs in the queue to complete. Any matched // files should have been added to the output file list. //.. // pool.drain(); // delete[] jobInfoArray; // } //.. // ///The Functor Interface ///- - - - - - - - - - - // The "void function/void pointer" convention is idiomatic for C programs. // The 'void' pointer argument provides a generic way of passing in user data, // without regard to the data type. Clients who prefer better or more explicit // type safety may wish to use the Functor Interface instead. This interface // uses 'bsl::function' to provide type-safe wrappers that can match argument // number and type for a C++ free function or member function. // // To illustrate the Functor Interface, we will make two small changes to the // usage example above. First, we change the signature of the function that // executes a single job, so that it uses a 'my_FastSearchJobInfo' pointer // rather than a 'void' pointer. With this change, we can remove the first // executable statement, which casts the 'void *' pointer to // 'my_FastSearchJobInfo *'. //.. // static void myFastFunctorSearchJob(my_FastSearchJobInfo *job) // { // FILE *file; // // file = fopen(job->d_path->c_str(), "r"); // // the rest of the function is unchanged. //.. // Next, we make a change to the loop that enqueues the jobs in 'myFastSearch'. // We create a functor - a C++ object that acts as a function. The thread pool // will "execute" this functor (by calling its 'operator()' member function) on // a thread when one becomes available. //.. // for (int i = 0; i < count; ++i) { // my_FastSearchJobInfo &job = jobInfoArray[i]; // job.d_word = &word; // job.d_path = &fileList[i]; // job.d_mutex = &mutex; // job.d_outList = &outFileList; // // bsl::function<void()> jobHandle = // bdlf::BindUtil::bind(&myFastFunctorSearchJob, &job); // pool.enqueueJob(jobHandle); // } //.. // Use of 'bsl::function' and 'bdlf::BindUtil' is described in the 'bdef' // package documentation. For this example, it is important to note that // 'jobHandle' is a functor object, and that 'bdlf::BindUtil::bind' populates // that functor object with a function pointer (to the 'void' function // 'myFastFunctorSearchJob') and user data ('&job'). When the functor is // executed via 'operator()', it will in turn execute the // 'myFastFunctorSearchJob' function with the supplied data as its argument. // // Note also that the functor is created locally and handed to the thread pool. // The thread pool copies the functor onto its internal queue, and takes // responsibility for the copied functor until execution is complete. // // The function is completed exactly as it was in the previous example. //.. // pool.drain(); // delete[] jobInfoArray; // } //.. #include <bdlscm_version.h> #include <bdlcc_boundedqueue.h> #include <bslmf_movableref.h> #include <bslmt_barrier.h> #include <bslmt_lockguard.h> #include <bslmt_mutex.h> #include <bslmt_threadattributes.h> #include <bslmt_threadutil.h> #include <bslmt_threadgroup.h> #include <bsls_assert.h> #include <bsls_atomic.h> #include <bsls_platform.h> #include <bdlf_bind.h> #include <bslma_allocator.h> #include <bsl_cstdlib.h> #include <bsl_functional.h> #ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES #include <bdlcc_fixedqueue.h> #include <bslmt_condition.h> #include <bslmt_semaphore.h> #include <bsl_algorithm.h> #endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES namespace BloombergLP { #ifndef BDE_OMIT_INTERNAL_DEPRECATED extern "C" typedef void (*bcep_FixedThreadPoolJobFunc)(void *); // This type declares the prototype for functions that are suitable to // be specified 'bdlmt::FixedThreadPool::enqueueJob'. #endif // BDE_OMIT_INTERNAL_DEPRECATED namespace bdlmt { extern "C" typedef void (*FixedThreadPoolJobFunc)(void *); // This type declares the prototype for functions that are suitable to be // specified 'bdlmt::FixedThreadPool::enqueueJob'. // ===================== // class FixedThreadPool // ===================== class FixedThreadPool { // This class implements a thread pool used for concurrently executing // multiple user-defined functions ("jobs"). public: // TYPES typedef bsl::function<void()> Job; typedef bdlcc::BoundedQueue<Job> Queue; // PUBLIC CONSTANTS enum { e_SUCCESS = Queue::e_SUCCESS, e_FULL = Queue::e_FULL, e_DISABLED = Queue::e_DISABLED, e_FAILED = Queue::e_FAILED }; enum { e_STOP , e_RUN , e_SUSPEND , e_DRAIN #ifndef BDE_OMIT_INTERNAL_DEPRECATED , BCEP_STOP = e_STOP , BCEP_RUN = e_RUN , BCEP_SUSPEND = e_SUSPEND , BCEP_DRAIN = e_DRAIN , TP_STOP = e_STOP , TP_RUN = e_RUN , TP_SUSPEND = e_SUSPEND , TP_DRAIN = e_DRAIN #endif // BDE_OMIT_INTERNAL_DEPRECATED }; private: // PRIVATE CLASS DATA static const char s_defaultThreadName[16]; // Thread name to use // when none is // specified. // PRIVATE DATA Queue d_queue; // underlying queue bsls::AtomicInt d_numActiveThreads; // number of threads // processing jobs bsls::AtomicBool d_drainFlag; // set when draining bslmt::Barrier d_barrier; // barrier to sync threads // during 'start' and 'drain' bslmt::Mutex d_metaMutex; // mutex to ensure that there // is only one controlling // thread at any time bslmt::ThreadGroup d_threadGroup; // threads used by this pool bslmt::ThreadAttributes d_threadAttributes; // thread attributes to be // used when constructing // processing threads const int d_numThreads; // number of configured // processing threads. #if defined(BSLS_PLATFORM_OS_UNIX) sigset_t d_blockSet; // set of signals to be // blocked in managed threads #endif // PRIVATE MANIPULATORS void workerThread(); // The main function executed by each worker thread. int startNewThread(); // Internal method to spawn a new processing thread and increment the // current count. Note that this method must be called with // 'd_metaMutex' locked. // NOT IMPLEMENTED FixedThreadPool(const FixedThreadPool&); FixedThreadPool& operator=(const FixedThreadPool&); public: // CREATORS FixedThreadPool(int numThreads, int maxNumPendingJobs, bslma::Allocator *basicAllocator = 0); // Construct a thread pool with the specified 'numThreads' number of // threads and a job queue of capacity sufficient to enqueue the // specified 'maxNumPendingJobs' without blocking. Optionally specify // a 'basicAllocator' used to supply memory. If 'basicAllocator' is 0, // the currently installed default allocator is used. The behavior is // undefined unless '1 <= numThreads'. FixedThreadPool(const bslmt::ThreadAttributes& threadAttributes, int numThreads, int maxNumPendingJobs, bslma::Allocator *basicAllocator = 0); // Construct a thread pool with the specified 'threadAttributes', // 'numThreads' number of threads, and a job queue with capacity // sufficient to enqueue the specified 'maxNumPendingJobs' without // blocking. Optionally specify a 'basicAllocator' used to supply // memory. If 'basicAllocator' is 0, the currently installed default // allocator is used. The behavior is undefined unless // '1 <= numThreads'. ~FixedThreadPool(); // Remove all pending jobs from the queue without executing them, block // until all currently running jobs complete, and then destroy this // thread pool. // MANIPULATORS void disable(); // Disable enqueueing into this pool. All subsequent invocations of // 'enqueueJob' or 'tryEnqueueJob' will fail immediately. All blocked // invocations of 'enqueueJob' will fail immediately. If the pool is // already enqueue disabled, this method has no effect. Note that this // method has no effect on jobs currently in the pool. void enable(); // Enable queuing into this pool. If the queue is not enqueue // disabled, this call has no effect. int enqueueJob(const Job& functor); int enqueueJob(bslmf::MovableRef<Job> functor); // Enqueue the specified 'functor' to be executed by the next available // thread. Return 0 on success, and a non-zero value otherwise. // Specifically, return 'e_SUCCESS' on success, 'e_DISABLED' if // '!isEnabled()', and 'e_FAILED' if an error occurs. This operation // will block if there is not sufficient capacity in the underlying // queue until there is free capacity to successfully enqueue this job. // Threads blocked (on enqueue methods) due to the underlying queue // being full will unblock and return 'e_DISABLED' if 'disable' is // invoked (on another thread). The behavior is undefined unless // 'functor' is not null. int enqueueJob(FixedThreadPoolJobFunc function, void *userData); // Enqueue the specified 'function' to be executed by the next // available thread. The specified 'userData' pointer will be passed // to the function by the processing thread. Return 0 on success, and // a non-zero value otherwise. Specifically, return 'e_SUCCESS' on // success, 'e_DISABLED' if '!isEnabled()', and 'e_FAILED' if an error // occurs. This operation will block if there is not sufficient // capacity in the underlying queue until there is free capacity to // successfully enqueue this job. Threads blocked (on enqueue methods) // due to the underlying queue being full will unblock and return // 'e_DISABLED' if 'disable' is invoked (on another thread). The // behavior is undefined unless 'function' is not null. int tryEnqueueJob(const Job& functor); int tryEnqueueJob(bslmf::MovableRef<Job> functor); // Enqueue the specified 'functor' to be executed by the next available // thread. Return 0 on success, and a non-zero value otherwise. // Specifically, return 'e_SUCCESS' on success, 'e_DISABLED' if // '!isEnabled()', 'e_FULL' if 'isEnabled()' and the underlying queue // was full, and 'e_FAILED' if an error occurs. The behavior is // undefined unless 'functor' is not null. int tryEnqueueJob(FixedThreadPoolJobFunc function, void *userData); // Enqueue the specified 'function' to be executed by the next // available thread. The specified 'userData' pointer will be passed // to the function by the processing thread. Return 0 on success, and // a non-zero value otherwise. Specifically, return 'e_SUCCESS' on // success, 'e_DISABLED' if '!isEnabled()', 'e_FULL' if 'isEnabled()' // and the underlying queue was full, and 'e_FAILED' if an error // occurs. The behavior is undefined unless 'function' is not null. void drain(); // Wait until the underlying queue is empty without disabling this pool // (and may thus wait indefinitely), and then wait until all executing // jobs complete. If the thread pool was not already started // ('isStarted()' is 'false'), this method has no effect. Note that if // any jobs are submitted concurrently with this method, this method // may or may not wait until they have also completed. void shutdown(); // Disable enqueuing jobs on this thread pool, cancel all pending jobs, // wait until all active jobs complete, and join all processing // threads. If the thread pool was not already started ('isStarted()' // is 'false'), this method has no effect. At the completion of this // method, 'false == isStarted()'. int start(); // Spawn threads until there are 'numThreads()' processing threads. On // success, enable enqueuing and return 0. Otherwise, join all threads // (ensuring 'false == isStarted()') and return -1. If the thread pool // was already started ('isStarted()' is 'true'), this method has no // effect. void stop(); // Disable enqueuing jobs on this thread pool, wait until all active // and pending jobs complete, and join all processing threads. If the // thread pool was not already started ('isStarted()' is 'false'), this // method has no effect. At the completion of this method, // 'false == isStarted()'. // ACCESSORS bool isEnabled() const; // Return 'true' if enqueuing jobs is enabled on this thread pool, and // 'false' otherwise. bool isStarted() const; // Return 'true' if 'numThreads()' are started on this threadpool and // 'false' otherwise (indicating that 0 threads are started on this // thread pool.) int numActiveThreads() const; // Return a snapshot of the number of threads that are currently // processing a job for this threadpool. int numPendingJobs() const; // Return a snapshot of the number of jobs currently enqueued to be // processed by thread pool. int numThreads() const; // Return the number of threads passed to this thread pool at // construction. int numThreadsStarted() const; // Return a snapshot of the number of threads currently started by this // thread pool. int queueCapacity() const; // Return the capacity of the queue used to enqueue jobs by this thread // pool. }; // ============================================================================ // INLINE DEFINITIONS // ============================================================================ // --------------------- // class FixedThreadPool // --------------------- // MANIPULATORS inline void FixedThreadPool::disable() { d_queue.disablePushBack(); } inline void FixedThreadPool::enable() { d_queue.enablePushBack(); } inline int FixedThreadPool::enqueueJob(const Job& functor) { BSLS_ASSERT(functor); return d_queue.pushBack(functor); } inline int FixedThreadPool::enqueueJob(bslmf::MovableRef<Job> functor) { BSLS_ASSERT(bslmf::MovableRefUtil::access(functor)); return d_queue.pushBack(bslmf::MovableRefUtil::move(functor)); } inline int FixedThreadPool::enqueueJob(FixedThreadPoolJobFunc function, void *userData) { BSLS_ASSERT(0 != function); return enqueueJob(bdlf::BindUtil::bindR<void>(function, userData)); } inline int FixedThreadPool::tryEnqueueJob(const Job& functor) { BSLS_ASSERT(functor); return d_queue.tryPushBack(functor); } inline int FixedThreadPool::tryEnqueueJob(bslmf::MovableRef<Job> functor) { BSLS_ASSERT(bslmf::MovableRefUtil::access(functor)); return d_queue.tryPushBack(bslmf::MovableRefUtil::move(functor)); } inline int FixedThreadPool::tryEnqueueJob(FixedThreadPoolJobFunc function, void *userData) { BSLS_ASSERT(0 != function); return tryEnqueueJob(bdlf::BindUtil::bindR<void>(function, userData)); } inline void FixedThreadPool::drain() { bslmt::LockGuard<bslmt::Mutex> lock(&d_metaMutex); if (isStarted()) { d_queue.waitUntilEmpty(); d_drainFlag = true; d_queue.disablePopFront(); d_barrier.wait(); d_drainFlag = false; d_queue.enablePopFront(); d_barrier.wait(); } } inline void FixedThreadPool::shutdown() { bslmt::LockGuard<bslmt::Mutex> lock(&d_metaMutex); if (isStarted()) { d_queue.disablePushBack(); d_queue.disablePopFront(); d_threadGroup.joinAll(); d_queue.removeAll(); } } inline void FixedThreadPool::stop() { bslmt::LockGuard<bslmt::Mutex> lock(&d_metaMutex); if (isStarted()) { d_queue.disablePushBack(); d_queue.waitUntilEmpty(); d_queue.disablePopFront(); d_threadGroup.joinAll(); } } // ACCESSORS inline bool FixedThreadPool::isEnabled() const { return !d_queue.isPushBackDisabled(); } inline bool FixedThreadPool::isStarted() const { return d_numThreads == d_threadGroup.numThreads(); } inline int FixedThreadPool::numActiveThreads() const { return d_numActiveThreads.loadAcquire(); } inline int FixedThreadPool::numPendingJobs() const { return static_cast<int>(d_queue.numElements()); } inline int FixedThreadPool::numThreads() const { return d_numThreads; } inline int FixedThreadPool::numThreadsStarted() const { return d_threadGroup.numThreads(); } inline int FixedThreadPool::queueCapacity() const { return static_cast<int>(d_queue.capacity()); } } // close package namespace } // close enterprise namespace #endif // ---------------------------------------------------------------------------- // Copyright 2021 Bloomberg Finance L.P. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // ----------------------------- END-OF-FILE ----------------------------------