Component bdlcc_queue
[Package bdlcc]

Provide a thread-enabled queue of items of parameterized TYPE.

Detailed Description

Outline
Purpose:
Provide a thread-enabled queue of items of parameterized TYPE.
Classes:
bdlcc::Queue: thread-enabled bdlc::Queue wrapper
See also:
Component bdlc_queue
Deprecated:
use bdlcc::Deque instead.
Description:
This component provides a thread-enabled implementation of an efficient, in-place, indexable, double-ended queue of parameterized TYPE values, namely the bdlcc::Queue<TYPE> container. bdlcc::Queue is effectively a thread-enabled handle for bdlc::Queue, whose interface is also made available through bdlcc::Queue.
Thread-Enabled Idioms in the bdlcc::Queue Interface:
The thread-enabled bdlcc::Queue is similar to bdlc::Queue in many regards, but there are several differences in method behavior and signature that arise due to the thread-enabled nature of the queue and its anticipated usage pattern. Most notably, the popFront and popBack methods return a TYPE object by value, rather than returning void, as bdlc::Queue does. Moreover, if a queue object is empty, popFront and popBack will block indefinitely until an item is added to the queue.
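The blocking, return-by-value pop described above can be illustrated with a minimal sketch written in standard C++ rather than BDE. The class SketchBlockingQueue and the helper sketchPopWhileAnotherThreadPushes are hypothetical names introduced only for this illustration; they are not part of bdlcc and do not reflect how bdlcc::Queue is actually implemented.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for illustration only; NOT the 'bdlcc::Queue'
// implementation.
template <class TYPE>
class SketchBlockingQueue {
    std::deque<TYPE>        d_items;     // queued values
    std::mutex              d_mutex;     // protects 'd_items'
    std::condition_variable d_notEmpty;  // signaled after each push

  public:
    void pushBack(const TYPE& item)
        // Append the specified 'item' and wake one waiting consumer.
    {
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            d_items.push_back(item);
        }
        d_notEmpty.notify_one();
    }

    TYPE popFront()
        // Block until an item is available, then remove and return it by
        // value.
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        d_notEmpty.wait(lock, [this] { return !d_items.empty(); });
        assert(!d_items.empty());
        TYPE item = d_items.front();
        d_items.pop_front();
        return item;
    }
};

// Pop in the calling thread while a second thread pushes 'value'.
inline int sketchPopWhileAnotherThreadPushes(int value)
{
    SketchBlockingQueue<int> queue;
    std::thread producer([&queue, value] { queue.pushBack(value); });
    int result = queue.popFront();  // waits until the producer has pushed
    producer.join();                // finish all access before destruction
    return result;
}
```

Returning the item by value lets the removal and the retrieval happen under a single lock acquisition, so no other thread can pop the same item between a separate "front" and "pop" call.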
As a corollary to this behavior choice, bdlcc::Queue also provides timedPopFront and timedPopBack methods. These methods wait until a specified timeout expires if the queue is empty, returning an item if one becomes available before the specified timeout; otherwise, they return a non-zero value to indicate that the specified timeout expired before an item was available. Note that all timeouts are expressed as values of type bsls::TimeInterval that represent ABSOLUTE times from 00:00:00 UTC, January 1, 1970.
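A timed pop with an absolute deadline can be sketched as follows, again in standard C++ rather than BDE. The names SketchTimedQueue, SketchAbsoluteTime, and sketchDeadlineFromNow are hypothetical, and std::chrono::system_clock::time_point is used here as an assumed stand-in for an absolute bsls::TimeInterval; the return convention (0 on success, non-zero on timeout) follows the description above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

// Assumed stand-in for an ABSOLUTE time expressed as 'bsls::TimeInterval'.
typedef std::chrono::system_clock::time_point SketchAbsoluteTime;

// Hypothetical illustration only; NOT the 'bdlcc::Queue' implementation.
template <class TYPE>
class SketchTimedQueue {
    std::deque<TYPE>        d_items;
    std::mutex              d_mutex;
    std::condition_variable d_notEmpty;

  public:
    void pushBack(const TYPE& item)
    {
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            d_items.push_back(item);
        }
        d_notEmpty.notify_one();
    }

    int timedPopFront(TYPE *buffer, const SketchAbsoluteTime& timeout)
        // Wait until an item is available or the absolute 'timeout' is
        // reached.  Return 0 and load the item into 'buffer' on success,
        // and a non-zero value (leaving 'buffer' unchanged) on timeout.
    {
        assert(buffer);
        std::unique_lock<std::mutex> lock(d_mutex);
        if (!d_notEmpty.wait_until(lock, timeout,
                                   [this] { return !d_items.empty(); })) {
            return 1;  // timed out while the queue was still empty
        }
        *buffer = d_items.front();
        d_items.pop_front();
        return 0;
    }
};

// Convert a relative delay into the absolute deadline the queue expects.
inline SketchAbsoluteTime sketchDeadlineFromNow(int milliseconds)
{
    return std::chrono::system_clock::now()
         + std::chrono::milliseconds(milliseconds);
}
```

Because the timeout is absolute, a caller that wants to wait "at most 20 milliseconds" must add that delay to the current time first, as sketchDeadlineFromNow does; passing a small relative value directly would name a moment shortly after the epoch and time out immediately.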
The behavior of the push methods differs in a similar manner. bdlcc::Queue supports the notion of a suggested maximum queue size, called the "high-water mark", a value supplied at construction. The pushFront and pushBack methods will block indefinitely if the queue contains (at least) the high-water mark number of items, until the number of items falls below the high-water mark. The timedPushFront and timedPushBack are provided to limit the duration of blocking; note, however, that these methods can fail to add an item to the queue. For this reason, bdlcc::Queue also provides a forcePushFront method that will override the high-water mark, if needed, in order to succeed without blocking. Note that this design decision makes the high-water mark concept a suggestion and not an invariant.
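The interaction between the high-water mark, a timed push, and a forced push can be sketched in standard C++. SketchBoundedQueue and its removeFront and length methods are hypothetical names used only for this illustration; the sketch assumes nothing about how bdlcc::Queue is implemented and only mirrors the behavior described above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical illustration only; NOT the 'bdlcc::Queue' implementation.
template <class TYPE>
class SketchBoundedQueue {
    std::deque<TYPE>        d_items;
    std::size_t             d_highWaterMark;  // suggested maximum length
    std::mutex              d_mutex;
    std::condition_variable d_notFull;

  public:
    explicit SketchBoundedQueue(std::size_t highWaterMark)
    : d_highWaterMark(highWaterMark)
    {
    }

    int timedPushBack(const TYPE&                                  item,
                      const std::chrono::system_clock::time_point& timeout)
        // Wait until the length is below the high-water mark or the
        // absolute 'timeout' is reached.  Return 0 if 'item' was added,
        // and a non-zero value otherwise.
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        if (!d_notFull.wait_until(lock, timeout, [this] {
                return d_items.size() < d_highWaterMark;
            })) {
            return 1;  // still full at the deadline; nothing was added
        }
        d_items.push_back(item);
        return 0;
    }

    void forcePushFront(const TYPE& item)
        // Add 'item' without blocking, even above the high-water mark.
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        d_items.push_front(item);
    }

    TYPE removeFront()
        // Remove and return the first item.  The queue must not be empty.
    {
        TYPE item;
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            assert(!d_items.empty());
            item = d_items.front();
            d_items.pop_front();
        }
        d_notFull.notify_one();  // a blocked pusher may now proceed
        return item;
    }

    std::size_t length()
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        return d_items.size();
    }
};
```

With a high-water mark of 2, a third timed push fails at its deadline, while a forced push still succeeds and leaves the queue holding three items. This is why the high-water mark is a suggestion rather than an invariant.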
Use of the bdlc::Queue Interface:
Class bdlcc::Queue provides access to an underlying bdlc::Queue, so clients of bdlcc::Queue have full access to the interface behavior of bdlc::Queue to inspect and modify the bdlcc::Queue.
Member function bdlcc::Queue::queue() provides direct modifiable access to the bdlc::Queue object used in the implementation. Member functions bdlcc::Queue::mutex(), bdlcc::Queue::notEmptyCondition(), and bdlcc::Queue::notFullCondition() provide direct modifiable access to the underlying bslmt::Mutex and bslmt::Condition objects. These underlying objects are used within bdlcc::Queue to manage concurrent access to the queue. Clients may use these objects together if needed.
Whenever accessing the underlying bdlc::Queue directly, clients must be sure to lock and unlock the mutex, and to signal or broadcast on the condition variables, as appropriate. For example, a client might use the underlying queue and mutex as follows:
     bdlcc::Queue<myData>  myWorkQueue;
     bdlc::Queue<myData>& rawQueue = myWorkQueue.queue();
     bslmt::Mutex&        queueMutex = myWorkQueue.mutex();
         // other code omitted...

     myData  data1;
     myData  data2;
     bool pairFoundFlag = 0;
     // Take two items from the queue atomically, if available.

     queueMutex.lock();
     if (rawQueue.length() >= 2) {
         data1 = rawQueue.front();
         rawQueue.popFront();
         data2 = rawQueue.front();
         rawQueue.popFront();
         pairFoundFlag = true;
     }
     queueMutex.unlock();

     if (pairFoundFlag) {
         // Process the pair
     }
Note that a future version of this component will provide access to a thread-safe "smart pointer" that will manage the bdlc::Queue with respect to locking and signaling. At that time, direct access to the bdlc::Queue will be deprecated. In the meantime, the user should be careful to use the bdlc::Queue and the synchronization objects properly.
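Until such a facility exists, one way to reduce the risk of a missed unlock when working with the raw queue is a scoped guard. The sketch below uses std::deque and std::mutex as assumed stand-ins for bdlc::Queue and bslmt::Mutex, and sketchTakePair is a hypothetical helper name; in BDE code, bslmt::LockGuard plays a similar role to std::lock_guard.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Take two items from 'rawQueue' atomically, if available.  Return 'true'
// and load the items into 'first' and 'second' on success; otherwise return
// 'false' and leave the queue unchanged.  'std::deque' and 'std::mutex' are
// stand-ins for 'bdlc::Queue' and 'bslmt::Mutex' (an assumption made so the
// sketch is self-contained).
inline bool sketchTakePair(std::deque<int>&  rawQueue,
                           std::mutex&       queueMutex,
                           int              *first,
                           int              *second)
{
    assert(first);
    assert(second);

    // The guard unlocks 'queueMutex' on every path out of this function.
    std::lock_guard<std::mutex> guard(queueMutex);

    if (rawQueue.size() < 2) {
        return false;
    }
    *first = rawQueue.front();
    rawQueue.pop_front();
    *second = rawQueue.front();
    rawQueue.pop_front();
    return true;
}
```

Only removals happen here, so no "not empty" signal is needed. A caller that adds items through the raw queue would also have to signal the corresponding condition variable, as noted above.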
WARNING: Synchronization Required on Destruction:
The behavior for the destructor is undefined unless all access or modification of the object is completed prior to its destruction. Some form of synchronization, external to the component, is required to ensure the precondition on the destructor is met. For example, if two (or more) threads are manipulating a queue, it is not safe to anticipate the number of elements added to the queue, and destroy that queue immediately after the last element is popped (without additional synchronization) because one of the corresponding push functions may not have completed (push may, for instance, signal waiting threads after the element is considered added to the queue).
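The hazard can be made concrete with a small standard C++ sketch, in which a std::deque, std::mutex, and std::condition_variable stand in for the queue and its synchronization objects. The function name sketchDrainThenJoin is hypothetical. Each producer signals after its element is already visible, so the consumer may pop the last element while a producer is still inside the signaling call; joining the producers is the external synchronization that makes destruction safe.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Start 'numProducers' threads that each push one value, pop exactly that
// many values, and return their sum.  Illustration only; the local objects
// are stand-ins for a thread-enabled queue.
inline int sketchDrainThenJoin(int numProducers)
{
    assert(0 <= numProducers);

    std::deque<int>          items;
    std::mutex               mutex;
    std::condition_variable  notEmpty;
    std::vector<std::thread> producers;

    for (int i = 0; i < numProducers; ++i) {
        producers.emplace_back([&items, &mutex, &notEmpty, i] {
            {
                std::lock_guard<std::mutex> guard(mutex);
                items.push_back(i);
            }
            notEmpty.notify_one();  // runs AFTER the item is visible
        });
    }

    int sum = 0;
    for (int n = 0; n < numProducers; ++n) {
        std::unique_lock<std::mutex> lock(mutex);
        notEmpty.wait(lock, [&items] { return !items.empty(); });
        sum += items.front();
        items.pop_front();
    }

    // Popping the last element does NOT prove that every producer has
    // returned from 'notify_one'.  Join first; only then may 'notEmpty',
    // 'mutex', and 'items' be destroyed at the end of this scope.
    for (std::thread& producer : producers) {
        producer.join();
    }
    return sum;
}
```

Returning immediately after the pop loop, without the joins, would destroy the condition variable while a producer might still be using it, which is the undefined behavior the warning describes.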
Usage:
This section illustrates intended use of this component.
Example 1: Simple Thread Pool:
The following example demonstrates a typical usage of a bdlcc::Queue.
This bdlcc::Queue is used to communicate between a single "producer" thread and multiple "consumer" threads. The "producer" will push work requests onto the queue, and each "consumer" will iteratively take a work request from the queue and service the request. This example shows a partial, simplified implementation of the bdlmt::ThreadPool class. See component bdlmt_threadpool for more information.
We begin our example with some utility classes that define a simple "work item":
  enum {
      k_MAX_CONSUMER_THREADS = 10
  };

  struct my_WorkData {
      // Work data...
  };

  struct my_WorkRequest {
      enum RequestType {
          e_WORK = 1,
          e_STOP = 2
      };

      RequestType d_type;
      my_WorkData d_data;
      // Work data...
  };
Next, we provide a simple function to service an individual work item. The details are unimportant for this example.
  void myDoWork(my_WorkData& data)
  {
      // do some stuff...
      (void)data;
  }
The myConsumer function will pop items off the queue and process them. As discussed above, note that the call to queue->popFront() will block until there is an item available on the queue. This function will be executed in multiple threads, so that each thread waits in queue->popFront(), and bdlcc::Queue guarantees that each thread gets a unique item from the queue.
  void myConsumer(bdlcc::Queue<my_WorkRequest> *queue)
  {
      while (1) {
          // 'popFront()' will wait for a 'my_WorkRequest' until available.

          my_WorkRequest item = queue->popFront();
          if (item.d_type == my_WorkRequest::e_STOP) break;
          myDoWork(item.d_data);
      }
  }
The function below is a callback for bslmt::ThreadUtil, which requires a "C" signature. bslmt::ThreadUtil::create() expects a pointer to this function, and provides that function pointer to the newly created thread. The new thread then executes this function.
Since bslmt::ThreadUtil::create() uses the familiar "C" convention of passing a void pointer, our function simply casts that pointer to our required type (bdlcc::Queue<my_WorkRequest> *), and then delegates to the queue-specific function myConsumer, above.
  extern "C" void *myConsumerThread(void *queuePtr)
  {
      myConsumer ((bdlcc::Queue<my_WorkRequest> *)queuePtr);
      return queuePtr;
  }
In this simple example, the myProducer function serves multiple roles: it creates the bdlcc::Queue, starts the consumer threads, and then produces and queues work items. When work requests are exhausted, this function queues one STOP item for each consumer thread.
When each Consumer thread reads a STOP, it terminates its thread-handling function. Note that, although the producer cannot control which thread pops a particular work item, it can rely on the knowledge that each Consumer thread will read a single STOP item and then terminate.
Finally, the myProducer function "joins" each Consumer thread, which ensures that the thread itself will terminate correctly; see the bslmt_threadutil component for details.
  void myProducer(int numThreads)
  {
      my_WorkRequest item;
      my_WorkData    workData;

      bdlcc::Queue<my_WorkRequest> queue;

      assert(0 < numThreads && numThreads <= k_MAX_CONSUMER_THREADS);
      bslmt::ThreadUtil::Handle consumerHandles[k_MAX_CONSUMER_THREADS];

      for (int i = 0; i < numThreads; ++i) {
          bslmt::ThreadUtil::create(&consumerHandles[i],
                                    myConsumerThread,
                                    &queue);
      }

      while (!getWorkData(&workData)) {
          item.d_type = my_WorkRequest::e_WORK;
          item.d_data = workData;
          queue.pushBack(item);
      }

      for (int i = 0; i < numThreads; ++i) {
          item.d_type = my_WorkRequest::e_STOP;
          queue.pushBack(item);
      }

      for (int i = 0; i < numThreads; ++i) {
          bslmt::ThreadUtil::join(consumerHandles[i]);
      }
  }
Example 2: Multi-Threaded Observer:
The previous example shows a simple mechanism for distributing work requests over multiple threads. This approach works well for large tasks that can be decomposed into discrete, independent tasks that can benefit from parallel execution. Note also that the various threads are synchronized only at the end of execution, when the Producer "joins" the various consumer threads.
The simple strategy used in the first example works well for tasks that share no state and are completely independent of one another. For instance, a web server might use a similar strategy to distribute HTTP requests across multiple worker threads.
In more complicated examples, it is often necessary or desirable to synchronize the separate tasks during execution. The second example below shows a single "Observer" mechanism that receives event notification from the various worker threads.
We first create a simple my_Event data type. Worker threads will use this data type to report information about their work. In our example, we will report the "worker Id", the event number, and some arbitrary text.
As with the previous example, struct my_Event also contains an EventType, an enumeration that indicates whether the worker has completed all work. The "Observer" will use this enumerated value to note when a Worker thread has completed its work.
  enum {
      k_MAX_CONSUMER_THREADS = 10,
      k_MAX_EVENT_TEXT       = 80
  };

  struct my_Event {
      enum EventType {
          e_IN_PROGRESS   = 1,
          e_TASK_COMPLETE = 2
      };

      EventType d_type;
      int       d_workerId;
      int       d_eventNumber;
      char      d_eventText[k_MAX_EVENT_TEXT];
  };
As noted in the previous example, bslmt::ThreadUtil::create() spawns a new thread, which invokes a simple "C" function taking a void pointer. In the previous example, we simply converted that void pointer into a pointer to the parameterized bdlcc::Queue<TYPE> object.
In this example, we want to pass an additional data item. Each worker thread is initialized with a unique integer value ("worker Id") that identifies that thread. We create a simple data structure that contains both of these values:
  struct my_WorkerData {
      int                     d_workerId;
      bdlcc::Queue<my_Event> *d_queue_p;
  };
Function myWorker simulates a working thread by enqueuing multiple my_Event events during execution. In a normal application, each my_Event structure would likely contain different textual information; for the sake of simplicity, our loop uses a constant value for the text field.
  void myWorker(int workerId, bdlcc::Queue<my_Event> *queue)
  {
      const int NEVENTS = 5;
      int evnum;

      for (evnum = 0; evnum < NEVENTS; ++evnum) {
          my_Event ev = {
              my_Event::e_IN_PROGRESS,
              workerId,
              evnum,
              "In-Progress Event"
          };
          queue->pushBack(ev);
      }

      my_Event ev = {
          my_Event::e_TASK_COMPLETE,
          workerId,
          evnum,
          "Task Complete"
      };
      queue->pushBack(ev);
  }
The callback function invoked by bslmt::ThreadUtil::create() takes the traditional void pointer. The expected data is the composite structure my_WorkerData. The callback function casts the void pointer to the application-specific data type and then uses the referenced object to construct a call to the myWorker function.
  extern "C" void *myWorkerThread(void *v_worker_p)
  {
      my_WorkerData *worker_p = (my_WorkerData *) v_worker_p;
      myWorker(worker_p->d_workerId, worker_p->d_queue_p);
      return v_worker_p;
  }
For the sake of simplicity, we will implement the Observer behavior in the main thread. The void function myObserver starts multiple threads running the myWorker function, reads my_Events from the queue, and logs all messages in the order of arrival.
As each myWorker thread terminates, it sends an e_TASK_COMPLETE event. Upon receiving this event, the myObserver function uses the d_workerId to find the relevant thread, and then "joins" that thread.
The myObserver function determines when all tasks have completed simply by counting the number of e_TASK_COMPLETE messages received.
  void myObserver()
  {
      const int NTHREADS = 10;
      bdlcc::Queue<my_Event> queue;

      assert(NTHREADS > 0 && NTHREADS <= k_MAX_CONSUMER_THREADS);
      bslmt::ThreadUtil::Handle workerHandles[k_MAX_CONSUMER_THREADS];

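      // Give each thread its own 'my_WorkerData', so that no thread can
      // observe a 'd_workerId' value intended for another thread.

      my_WorkerData workerData[k_MAX_CONSUMER_THREADS];
      for (int i = 0; i < NTHREADS; ++i) {
          workerData[i].d_workerId = i;
          workerData[i].d_queue_p  = &queue;
          bslmt::ThreadUtil::create(&workerHandles[i],
                                    myWorkerThread,
                                    &workerData[i]);
      }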
      int nStop = 0;
      while (nStop < NTHREADS) {
          my_Event ev = queue.popFront();
          bsl::cout << "[" << ev.d_workerId    << "] "
                           << ev.d_eventNumber << ". "
                           << ev.d_eventText   << bsl::endl;
          if (my_Event::e_TASK_COMPLETE == ev.d_type) {
              ++nStop;
              bslmt::ThreadUtil::join(workerHandles[ev.d_workerId]);
          }
      }
  }