Quick Links:

bal | bbl | bdl | bsl

Namespaces

Component bdlcc_multipriorityqueue
[Package bdlcc]

Provide a thread-enabled parameterized multi-priority queue. More...

Namespaces

namespace  bdlcc

Detailed Description

Outline
Purpose:
Provide a thread-enabled parameterized multi-priority queue.
Classes:
bdlcc::MultipriorityQueue thread-enabled, multi-priority queue
See also:
Description:
This component provides a thread-enabled mechanism, bdlcc::MultipriorityQueue, implementing a special-purpose priority queue container of items of parameterized TYPE. Each item has a priority which, for efficiency of implementation, is limited to a relatively small number N of contiguous integers [ 0 .. N - 1 ], with N indicated at construction, and 0 being the most urgent priority. This queue also takes an optional allocator, supplied at construction. Once configured, these instance parameters remain unchanged for the life of each multi-priority queue.
Thread-Enabled Idioms in the bdlcc::MultipriorityQueue Interface:
The thread-enabled bdlcc::MultipriorityQueue is, in many regards, similar to a value-semantic type in that there is an obvious abstract notion of "value" that can be described in terms of salient attributes, which for this type is a sequence of priority/element pairs, constrained to be in increasing order of priority. There are, however, several differences in method behavior and signature that arise due to the thread-enabled nature of the queue and its anticipated usage pattern.
For example, if a queue object is empty, popFront will block indefinitely until an element is added to the queue. Also, since dynamic instance information, such as the number of elements currently in a queue, can be out-of-date by the time it is returned, some manipulators (e.g., tryPopFront) are deliberately combined with an accessor operation (e.g., isEmpty) in order to guarantee proper behavior.
Finally, note that although the parameterized TYPE is expected to at least support copy construction and assignment, the bdec::MultipriorityQueue<TYPE> type currently does not support any value-semantic operations, since different queues could have different numbers of priorities, making comparison, assignment and copy construction awkward.
Possible Future Enhancements:
In addition to popFront and tryPopFront, a bdlcc::MultipriorityQueue may some day also provide a timedPopFront method. This method would block until it is able to complete successfully or until the specified time limit expires.
WARNING: Synchronization Required on Destruction:
The behavior for the destructor is undefined unless all access or modification of the object is completed prior to its destruction. Some form of synchronization, external to the component, is required to ensure the precondition on the destructor is met. For example, if two (or more) threads are manipulating a queue, it is not safe to anticipate the number of elements added to the queue, and destroy that queue immediately after the last element is popped (without additional synchronization) because one of the corresponding push functions may not have completed (push may, for instance, signal waiting threads after the element is considered added to the queue).
Usage:
This section illustrates intended use of this component.
Example 1: Simple Thread Pool:
This example demonstrates how we might use a bdlcc::MultipriorityQueue to communicate between a single "producer" thread and multiple "consumer" threads. The "producer" pushes work requests of varying priority onto the queue, and each "consumer" iteratively takes the highest priority work request from the queue and services it.
We begin our example with some utility classes that define a simple "work item":
  enum {
      k_MAX_CONSUMER_THREADS = 10
  };

  struct MyWorkData {
      int d_i;        // input to work to be done

      // Work data...
  };

  struct MyWorkRequest {
      enum RequestType {
          e_WORK = 1,
          e_STOP = 2
      };

      RequestType d_type;
      MyWorkData  d_data;

      // Work data...
  };
Next, we provide a simple function to service an individual work item, and a function to get a work item. The details are unimportant for this example:
  void myDoWork(MyWorkData& data)
  {
      // Do work...
      (void)data;
  }

  int getWorkData(MyWorkData *result)
  {
      static int count = 0;
      result->d_i = rand();   // Only one thread runs this routine, so it
                              // does not matter that 'rand()' is not
                              // thread-safe, or that 'count' is 'static'.

      return ++count >= 100;
  }
The myConsumer function (below) will pop elements off the queue in priority order and process them. As discussed above, note that the call to queue->popFront(&item) will block until there is an element available on the queue. This function will be executed in multiple threads, so that each thread waits in queue->popFront(); bdlcc::MultipriorityQueue guarantees that each thread gets a unique element from the queue:
  void myConsumer(bdlcc::MultipriorityQueue<MyWorkRequest> *queue)
  {
      MyWorkRequest item;
      while (1) {

          // The 'popFront' function will wait for a 'MyWorkRequest' until
          // one is available.

          queue->popFront(&item);

          if (MyWorkRequest::e_STOP == item.d_type) {
              break;
          }

          myDoWork(item.d_data);
      }
  }
The myConsumerThread function below is a callback for bslmt::ThreadUtil, which requires a "C" signature. bslmt::ThreadUtil::create() expects a pointer to this function, and provides that function pointer to the newly-created thread. The new thread then executes this function.
Since bslmt::ThreadUtil::create() uses the familiar "C" convention of passing a void pointer, our function simply casts that pointer to our required type (bdlcc::MultipriorityQueue<MyWorkRequest> *), and then delegates to the queue-specific function myConsumer (above):
  extern "C" void *myConsumerThread(void *queuePtr)
  {
      myConsumer ((bdlcc::MultipriorityQueue<MyWorkRequest>*) queuePtr);
      return queuePtr;
  }
In this simple example, the myProducer function (below) serves multiple roles: it creates the bdlcc::MultipriorityQueue, starts the consumer threads, and then produces and queues work items. When work requests are exhausted, this function queues one e_STOP item for each consumer thread.
When each consumer thread reads a e_STOP, it terminates its thread-handling function. Note that, although the producer cannot control which thread pops a particular work item, it can rely on the knowledge that each consumer thread will read a single e_STOP item and then terminate.
Finally, the myProducer function "joins" each consumer thread, which ensures that the thread itself will terminate correctly (see the bslmt_threadutil component-level documentation for details):
  void myProducer()
  {
      enum {
          k_NUM_PRIORITIES = 8,
          k_NUM_THREADS    = 8
      };

      MyWorkRequest item;
      MyWorkData    workData;

      // Create multi-priority queue with specified number of priorities.

      bdlcc::MultipriorityQueue<MyWorkRequest> queue(k_NUM_PRIORITIES);

      // Start the specified number of threads.

      assert(0 < k_NUM_THREADS
          && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
      bslmt::ThreadUtil::Handle consumerHandles[k_MAX_CONSUMER_THREADS];

      for (int i = 0; i < k_NUM_THREADS; ++i) {
          bslmt::ThreadUtil::create(&consumerHandles[i],
                                   myConsumerThread,
                                   &queue);
      }

      // Load work data into work requests and push them onto the queue with
      // varying priority until all work data has been exhausted.

      int count = 0;                          // used to generate priorities

      while (!getWorkData(&workData)) {       // see declaration (above)
          item.d_type = MyWorkRequest::e_WORK;
          item.d_data = workData;
          queue.pushBack(item, count % k_NUM_PRIORITIES);  // mixed
                                                           // priorities
          ++count;
      }

      // Load as many stop requests as there are active consumer threads.

      for (int i = 0; i < k_NUM_THREADS; ++i) {
          item.d_type = MyWorkRequest::e_STOP;
          queue.pushBack(item, k_NUM_PRIORITIES - 1);  // lowest priority
      }

      // Join all of the consumer threads back with the main thread.

      for (int i = 0; i < k_NUM_THREADS; ++i) {
          bslmt::ThreadUtil::join(consumerHandles[i]);
      }
  }
Example 2: Multi-Threaded Observer:
The previous example shows a simple mechanism for distributing work requests over multiple threads. This approach works well for large tasks that can be decomposed into discrete, independent tasks that can benefit from parallel execution. Note also that the various threads are synchronized only at the end of execution, when the producer "joins" the various consumer threads.
The simple strategy used in the first example works well for tasks that share no state, and are completely independent of one another. For instance, a web server might use a similar strategy to distribute http requests across multiple worker threads.
In more complicated examples, it is often necessary or desirable to synchronize the separate tasks during execution. The second example below shows a single "Observer" mechanism that receives event notification from the various worker threads.
We first create a simple MyEvent data type. Worker threads will use this type to report information about their work. In our example, we will report the "worker Id", the event number, and some arbitrary text.
As with the previous example, class MyEvent also contains an EventType, an enumeration that indicates whether the worker has completed all work. The "Observer" will use this enumerated value to note when a worker thread has completed its work:
  enum {
      k_MAX_CONSUMER_THREADS = 10,
      k_MAX_EVENT_TEXT       = 80
  };

  struct MyEvent {
      enum EventType {
          e_IN_PROGRESS   = 1,
          e_TASK_COMPLETE = 2
      };

      EventType d_type;
      int       d_workerId;
      int       d_eventNumber;
      char      d_eventText[k_MAX_EVENT_TEXT];
  };
As noted in the previous example, bslmt::ThreadUtil::create() spawns a new thread, which invokes a simple "C" function taking a void pointer. In the previous example, we simply converted that void pointer into a pointer to bdlcc::MultipriorityQueue<MyWorkRequest>.
In this example, however, we want to pass an additional data item. Each worker thread is initialized with a unique integer value ("worker Id"), which identifies that thread. We therefore create a simple struct that contains both of these values:
  struct MyWorkerData {
      int                               d_workerId;
      bdlcc::MultipriorityQueue<MyEvent> *d_queue;
  };
Function myWorker (below) simulates a working thread by enqueuing multiple MyEvent events during execution. In a realistic application, each MyEvent structure would likely contain different textual information. For the sake of simplicity, however, our loop uses a constant value for the text field. Note that various priorities are generated to illustrate the multi-priority aspect of this particular queue:
  void myWorker(int workerId, bdlcc::MultipriorityQueue<MyEvent> *queue)
  {
      const int N = queue->numPriorities();
      const int NUM_EVENTS = 5;
      int eventNumber;    // used also to generate mixed priorities

      // First push 'NUM_EVENTS' events onto 'queue' with mixed priorities.

      for (eventNumber = 0; eventNumber < NUM_EVENTS; ++eventNumber) {
          MyEvent ev = {
              MyEvent::e_IN_PROGRESS,
              workerId,
              eventNumber,
              "In-Progress Event"         // constant (for simplicity)
          };
          queue->pushBack(ev, eventNumber % N);       // mixed priorities
      }

      // Now push an event to end this task.

      MyEvent ev = {
          MyEvent::e_TASK_COMPLETE,
          workerId,
          eventNumber,
          "Task Complete"
      };
      queue->pushBack(ev, N - 1);                     // lowest priority
  }
The callback function myWorkerThread (below) invoked by bslmt::ThreadUtil::create takes the traditional void pointer. The expected data is the composite structure MyWorkerData. The callback function casts the void pointer to the application-specific data type and then uses the referenced object to construct a call to the myWorker function:
  extern "C" void *myWorkerThread(void *vWorkerPtr)
  {
      MyWorkerData *workerPtr = (MyWorkerData *)vWorkerPtr;
      myWorker(workerPtr->d_workerId, workerPtr->d_queue);
      return vWorkerPtr;
  }
For the sake of simplicity, we will implement the Observer behavior (below) in the main thread. The void function myObserver starts multiple threads running the myWorker function, reads MyEvent values from the queue, and logs all messages in the order of arrival.
As each myWorker thread terminates, it sends a e_TASK_COMPLETE event. Upon receiving this event, the myObserver function uses the d_workerId to find the relevant thread, and then "joins" that thread.
The myObserver function determines when all tasks have completed simply by counting the number of e_TASK_COMPLETE messages received:
  void myObserver()
  {
      const int k_NUM_THREADS    = 10;
      const int k_NUM_PRIORITIES = 4;

      bdlcc::MultipriorityQueue<MyEvent> queue(k_NUM_PRIORITIES);

      assert(0 < k_NUM_THREADS
          && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
      bslmt::ThreadUtil::Handle workerHandles[k_MAX_CONSUMER_THREADS];

      // Create 'k_NUM_THREADS' threads, each having a unique "worker id".

      MyWorkerData workerData[k_NUM_THREADS];
      for (int i = 0; i < k_NUM_THREADS; ++i) {
          workerData[i].d_queue = &queue;
          workerData[i].d_workerId = i;
          bslmt::ThreadUtil::create(&workerHandles[i],
                                   myWorkerThread,
                                   &workerData[i]);
      }

      // Now print out each of the 'MyEvent' values as the threads complete.
      // This function ends after a total of 'k_NUM_THREADS'
      // 'MyEvent::e_TASK_COMPLETE' events have been printed.

      int nStop = 0;
      while (nStop < k_NUM_THREADS) {
          MyEvent ev;
          queue.popFront(&ev);
          bsl::cout << "[" << ev.d_workerId << "] "
                    << ev.d_eventNumber << ". "
                    << ev.d_eventText << bsl::endl;
          if (MyEvent::e_TASK_COMPLETE == ev.d_type) {
              ++nStop;
              bslmt::ThreadUtil::join(workerHandles[ev.d_workerId]);
          }
      }
  }