Component bdlcc_deque
[Package bdlcc]

Provide a fully thread-safe deque container.

Namespaces

namespace  bdlcc

Detailed Description

Outline
Purpose:
Provide a fully thread-safe deque container.
Classes:
bdlcc::Deque thread-safe bsl::deque wrapper
See also:
bsl::deque
Description:
This component provides bdlcc::Deque<TYPE>, a fully thread-safe implementation of an efficient, double-ended queue of (template parameter) TYPE values. bdlcc::Deque is effectively a thread-safe wrapper for bsl::deque, whose interface is also made available through proctor types that are nested classes.
Thread Safety:
bdlcc::Deque is fully thread-safe, meaning that all non-creator operations on an object can be safely invoked simultaneously from multiple threads.
Exception Safety:
Provided the template parameter TYPE provides the following exception safety guarantees:
  1. The destructor provides the no-throw guarantee.
  2. Copy construction and assignment provide the strong guarantee and do not modify the source.
  3. Move construction and assignment where the allocators of the source and destination match, or if the type is non-allocating, provide the no-throw guarantee.
  4. Move construction and assignment where the allocators of source and destination do not match behave like non-moving copy construction and assignment.
All operations on bdlcc::Deque provide the strong exception guarantee, both for the bdlcc::Deque's own salient state and the salient state of the vector, if any, passed to manipulators. However, the non-salient capacity of the underlying bsl::deque and of the passed vector may be modified.
Design Rationale for bdlcc::Deque:
The fully thread-safe bdlcc::Deque is similar to bsl::deque in many regards, but there are several differences in method behavior and signature that arise due to the thread-aware nature of the container and its anticipated usage pattern.
A user of bsl::deque is expected to consult the size or empty accessors before reading or popping to determine whether elements are available in the container to be read or popped. This won't work in a multithreaded context, since reading the accessor is a separate operation from the read or pop, and another thread may have altered the state of the container in between.
So we have eliminated the front, back and random-access methods. Reading is done from the ends of the container via the popFront and popBack methods, which return a TYPE object by value, rather than returning void, as pop_front and pop_back in bsl::deque do. Moreover, if a bdlcc::Deque object is empty, popFront and popBack will block indefinitely until an item is added to the container.
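The rationale above can be illustrated with a minimal standard-library sketch. BlockingPopSketch and popAfterPush are hypothetical names invented for this illustration, and the code is not the implementation of bdlcc::Deque. It only shows why a pop that returns the element by value, and that checks for emptiness under the same lock that removes the element, avoids the check-then-act race described above.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical illustration type (not part of BDE).
template <class TYPE>
class BlockingPopSketch {
    std::mutex              d_mutex;
    std::condition_variable d_notEmpty;
    std::deque<TYPE>        d_items;

  public:
    void pushBack(const TYPE& value)
        // Append the specified 'value' and wake one waiting consumer.
    {
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            d_items.push_back(value);
        }
        d_notEmpty.notify_one();
    }

    TYPE popFront()
        // Block until an item is available, then remove and return it.  The
        // emptiness check and the removal happen under one lock, so no other
        // thread can drain the container between the two steps.
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        d_notEmpty.wait(lock, [this] { return !d_items.empty(); });
        TYPE value = std::move(d_items.front());
        d_items.pop_front();
        return value;
    }
};

int popAfterPush()
    // Start a consumer that may block in 'popFront', push one value from
    // this thread, and return the value the consumer received.
{
    BlockingPopSketch<int> deque;
    int                    result = 0;

    std::thread consumer([&deque, &result] { result = deque.popFront(); });
    deque.pushBack(42);
    consumer.join();

    assert(42 == result);
    return result;
}
```

With this shape there is no separate empty or front call for a caller to misuse: the only way to read an element is the single operation that also removes it.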
High-Water Mark Feature:
The behaviors of the push methods differ from those of bsl::deque in that they can block under certain circumstances. bdlcc::Deque supports the notion of a suggested maximum capacity known as the high-water mark. The high-water mark is supplied at construction, cannot be changed afterward, and affects some of the push* methods. The container is considered to be full if it contains at least the high-water mark number of items, and to have space available if it is not full. If no high-water mark is specified, the high-water mark of the container is effectively infinite. Some of the variants of push operations (described below) may fail, and the return status of those operations indicates whether the operation succeeded, failed, or partially succeeded (which may happen, for example, when pushing a range of values).
bdlcc::Deque supports four variants of the two push methods, whose behaviors differ when the container is full (i.e. when the push would raise the length of the container above the high-water mark).
  1. blocking: (pushBack, pushFront): If the container is full, block until space is available, then push, otherwise push immediately.
  2. try: (tryPushBack, tryPushFront): If the container is full, fail immediately. If space is available, succeed immediately. Note that partial success is possible in the case of a range try push.
  3. timed blocking: (timedPushBack, timedPushFront): If the container is full, block until either space is available or the specified timeout has been reached. If space was, or became, available, push and succeed, otherwise fail.
  4. force: (forcePushBack, forcePushFront): If the container is full, push anyway, increasing the container's size above its high-water mark, always succeeding immediately.
Note that the availability of force pushes means that the high-water mark is a suggestion and not an invariant.
The purpose of a high-water mark is to enable the client to use the container as a fixed-length container, where pushes that will grow it above a certain size will block. The purpose of the force pushes is to allow high-priority items to be pushed regardless of whether the container is full.
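The four push variants can be sketched with the standard library as follows. HighWaterMarkSketch and demoPushVariants are hypothetical names. The method names mirror the ones documented above, but the signatures, the use of 0 for success, and the std::chrono deadline parameter are assumptions made for this illustration rather than the bdlcc::Deque interface.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical illustration type (not part of BDE).
template <class TYPE>
class HighWaterMarkSketch {
    mutable std::mutex      d_mutex;
    std::condition_variable d_notFull;
    std::deque<TYPE>        d_items;
    const std::size_t       d_highWaterMark;

    bool isFull() const { return d_items.size() >= d_highWaterMark; }

  public:
    explicit HighWaterMarkSketch(std::size_t highWaterMark)
    : d_highWaterMark(highWaterMark)
    {}

    void pushBack(const TYPE& value)
        // Blocking: wait until space is available, then push.
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        d_notFull.wait(lock, [this] { return !isFull(); });
        d_items.push_back(value);
    }

    int tryPushBack(const TYPE& value)
        // Try: return 0 on success, or non-zero immediately if full.
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        if (isFull()) {
            return -1;
        }
        d_items.push_back(value);
        return 0;
    }

    int timedPushBack(const TYPE&                           value,
                      std::chrono::steady_clock::time_point deadline)
        // Timed blocking: return 0 if space was, or became, available before
        // the specified 'deadline', and non-zero otherwise.
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        if (!d_notFull.wait_until(lock, deadline,
                                  [this] { return !isFull(); })) {
            return -1;
        }
        d_items.push_back(value);
        return 0;
    }

    void forcePushBack(const TYPE& value)
        // Force: push even if that exceeds the high-water mark.
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        d_items.push_back(value);
    }

    int tryPopFront(TYPE *item)
        // Load the front element into 'item' and return 0, or return
        // non-zero if the container is empty.  Wake one blocked pusher.
    {
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            if (d_items.empty()) {
                return -1;
            }
            *item = d_items.front();
            d_items.pop_front();
        }
        d_notFull.notify_one();
        return 0;
    }

    std::size_t length() const
    {
        std::lock_guard<std::mutex> guard(d_mutex);
        return d_items.size();
    }
};

std::size_t demoPushVariants()
    // Exercise each variant on a container whose high-water mark is 2 and
    // return the final length.
{
    HighWaterMarkSketch<int> deque(2);

    deque.pushBack(1);                   // space available: no blocking
    int rc = deque.tryPushBack(2);       // reaches the high-water mark
    assert(0 == rc);
    rc = deque.tryPushBack(3);           // full: fails immediately
    assert(0 != rc);

    rc = deque.timedPushBack(4,
                             std::chrono::steady_clock::now() +
                                 std::chrono::milliseconds(10));
    assert(0 != rc);                     // still full: times out

    deque.forcePushBack(5);              // exceeds the high-water mark
    (void) rc;
    return deque.length();
}
```

The final length is 3 even though the high-water mark is 2, which is the sense in which the mark is a suggestion rather than an invariant.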
Proctor Access:
There are public nested classes bdlcc::Deque::Proctor and bdlcc::Deque::ConstProctor through which the client can directly access the underlying bsl::deque contained in the bdlcc::Deque. When a proctor object is created, it acquires the container's mutex, and allows the client to use the overloaded -> and * operators on the proctor object to access the underlying bsl::deque. operator[] is also provided for direct random access to that deque. Because the mutex is locked, manipulators of bdlcc::Deque called by other threads will block, thus allowing safe access to the underlying thread-unsafe container. When the proctor is destroyed (or released via the release method), the proctor signals the thread-aware container's condition variables to inform manipulators in other threads of new items being available for pops or new space becoming available for pushes.
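The proctor idea can be sketched as an RAII guard. ProctoredDequeSketch and demoProctor are hypothetical names, and std::deque stands in for bsl::deque. The sketch assumes only what the paragraph above states: the proctor holds the container's mutex for its lifetime, forwards ->, * and [] to the underlying deque, and signals waiting threads when it is destroyed. It omits the release method and the ConstProctor counterpart.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical illustration type (not part of BDE).
template <class TYPE>
class ProctoredDequeSketch {
    std::mutex              d_mutex;
    std::condition_variable d_notEmpty;
    std::deque<TYPE>        d_items;

  public:
    class Proctor {
        ProctoredDequeSketch         *d_container_p;
        std::unique_lock<std::mutex>  d_lock;

      public:
        explicit Proctor(ProctoredDequeSketch *container)
            // Acquire the mutex of the specified 'container'.
        : d_container_p(container)
        , d_lock(container->d_mutex)
        {}

        Proctor(const Proctor&)            = delete;
        Proctor& operator=(const Proctor&) = delete;

        ~Proctor()
            // Release the mutex, then wake consumers if items are present.
        {
            const bool hasItems = !d_container_p->d_items.empty();
            d_lock.unlock();
            if (hasItems) {
                d_container_p->d_notEmpty.notify_all();
            }
        }

        std::deque<TYPE> *operator->() { return &d_container_p->d_items; }
        std::deque<TYPE>& operator*()  { return d_container_p->d_items; }
        TYPE& operator[](std::size_t index)
        {
            return d_container_p->d_items[index];
        }
    };

    TYPE popFront()
        // Block until an item is available, then remove and return it.
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        d_notEmpty.wait(lock, [this] { return !d_items.empty(); });
        TYPE value = std::move(d_items.front());
        d_items.pop_front();
        return value;
    }
};

int demoProctor()
    // Fill the container through a proctor, then pop through the
    // thread-safe interface once the proctor has gone out of scope.
{
    ProctoredDequeSketch<int> deque;
    {
        ProctoredDequeSketch<int>::Proctor proctor(&deque);
        proctor->push_back(10);
        proctor->push_back(20);
        proctor[0] += 1;                 // direct random access
        assert(2u == (*proctor).size());
    }                                    // unlock and signal here
    return deque.popFront();
}
```

Keeping the proctor in a narrow scope matters: while it is alive, every other thread calling a manipulator on the container is blocked.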
Supported Clock-Types:
The component bsls::SystemClockType supplies the enumeration indicating the system clock on which the timedPush* and timedPop* methods should be based. If the clock type indicated at construction is bsls::SystemClockType::e_REALTIME, time should be expressed as an absolute offset since 00:00:00 UTC, January 1, 1970 (which matches the epoch used in bsls::SystemTime::now(bsls::SystemClockType::e_REALTIME)). If the clock type indicated at construction is bsls::SystemClockType::e_MONOTONIC, time should be expressed as an absolute offset since the epoch of this clock (which matches the epoch used in bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC)).
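Because the timed methods take an absolute time rather than a relative duration, the caller has to add the desired wait to the current time of the matching clock. The sketch below shows that arithmetic using std::chrono as a stand-in: std::chrono::system_clock plays the role of the realtime clock and std::chrono::steady_clock the role of the monotonic clock. The helper absoluteTimeout and the function demoAbsoluteTimeouts are hypothetical names, and nothing here is BDE code.

```cpp
#include <cassert>
#include <chrono>

template <class CLOCK>
std::chrono::nanoseconds absoluteTimeout(std::chrono::milliseconds relative)
    // Return an absolute time, expressed as an offset from the epoch of the
    // (template parameter) 'CLOCK', that lies the specified 'relative'
    // duration in the future.
{
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
               CLOCK::now().time_since_epoch()) +
           relative;
}

bool demoAbsoluteTimeouts()
    // Compute a 250 ms timeout against each clock and return 'true' if the
    // monotonic value lies at least 250 ms after the time sampled first.
{
    using std::chrono::milliseconds;

    const std::chrono::steady_clock::duration before =
        std::chrono::steady_clock::now().time_since_epoch();

    // Offset from the realtime clock's epoch; suitable only for a container
    // constructed with the realtime clock type.
    const std::chrono::nanoseconds realtime =
        absoluteTimeout<std::chrono::system_clock>(milliseconds(250));

    // Offset from the monotonic clock's epoch; the two values are not
    // interchangeable because the clocks have unrelated epochs.
    const std::chrono::nanoseconds monotonic =
        absoluteTimeout<std::chrono::steady_clock>(milliseconds(250));

    (void) realtime;
    return monotonic >= before + milliseconds(250);
}
```

Passing a value computed from one clock to a container constructed with the other clock type would produce a timeout that is either already expired or far in the future, which is why the clock chosen at construction and the clock used to compute the timeout must agree.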
WARNING: Synchronization Required on Destruction:
The behavior for the destructor is undefined unless all access or modification of the object is completed prior to its destruction. Some form of synchronization, external to the component, is required to ensure this precondition on the destructor is met. For example, if two (or more) threads are manipulating a queue, it is not safe to anticipate the number of elements added to the queue, and destroy that queue immediately after the last element is popped (without additional synchronization) because one of the corresponding push functions may not have completed (push may, for instance, signal waiting threads after the element is considered added to the queue).
Tips For Migrating From bcec_Queue:
  • InitialCapacity has been eliminated. Instead, construct your bdlcc::Deque object and then use proctor access to call reserve on the contained bsl::deque to reserve the desired initial capacity. (Note that deque::reserve is not part of the C++ standard, though bsl::deque does implement it).
  • The mutex and condition variables are no longer directly exposed, in favor of the new proctor access, which gives direct access to the underlying bsl::deque, automatically locking the mutex and updating the condition variables as necessary.
  • A new, thread-safe length accessor is provided, eliminating the need to access the underlying thread-unsafe container to obtain its length.
Usage:
This section illustrates intended use of this component.
Example 1: A Queue of Work Requests:
First, declare the struct WorkData. Imagine it contains some data one wants to process:
  struct WorkData {
      // work data...
  };
Then, create the function that will produce a WorkData object:
  bool getWorkData(WorkData *)
      // Dummy implementation of 'getWorkData' function required by the usage
      // example.
  {
      static bsls::AtomicInt i(1);
      return ++i < 1000;
  }
Next, declare WorkRequest, the type of object that will be stored in the container:
  struct WorkRequest {
      // PUBLIC TYPES
      enum RequestType {
          e_WORK = 1,
          e_STOP = 2
      };

      // PUBLIC DATA
      RequestType d_type;
      WorkData d_data;
  };
Then, create the function that will do work on a WorkData object:
  void doWork(WorkData *workData)
      // Function that pretends to do work on the specified 'workData'.
  {
      // do some stuff with '*workData' ...

      (void) workData;
  }
Next, create the functor that will be run in the consumer threads:
  struct ConsumerFunctor {
      // DATA
      bdlcc::Deque<WorkRequest> *d_deque_p;

      // CREATORS
      explicit
      ConsumerFunctor(bdlcc::Deque<WorkRequest> *container)
          // Create a 'ConsumerFunctor' object that will consume work
          // requests from the specified 'container'.
      : d_deque_p(container)
      {}

      // MANIPULATORS
      void operator()()
          // Pop work requests off the deque and process them until an
          // 'e_STOP' request is encountered.
      {
          WorkRequest item;

          do {
              item = d_deque_p->popFront();
              if (WorkRequest::e_WORK == item.d_type) {
                  doWork(&item.d_data);
              }
          } while (WorkRequest::e_STOP != item.d_type);
      }
  };
Then, create the functor that will be run in the producer threads:
  struct ProducerFunctor {
      // DATA
      bdlcc::Deque<WorkRequest> *d_deque_p;

      // CREATORS
      explicit
      ProducerFunctor(bdlcc::Deque<WorkRequest> *container)
          // Create a 'ProducerFunctor' object that will enqueue work
          // requests into the specified 'container'.
      : d_deque_p(container)
      {}

      // MANIPULATORS
      void operator()()
          // Enqueue work requests to the container until 'getWorkData'
          // returns 'false', then enqueue an 'e_STOP' request.
      {
          WorkRequest item;
          WorkData    workData;

          while (getWorkData(&workData)) {
              item.d_type = WorkRequest::e_WORK;
              item.d_data = workData;
              d_deque_p->pushBack(item);
          }

          item.d_type = WorkRequest::e_STOP;
          d_deque_p->pushBack(item);
      }
  };
Next, in main, define the number of consumer and producer threads (these numbers must be equal):
  enum { k_NUM_CONSUMER_THREADS = 10,
         k_NUM_PRODUCER_THREADS = k_NUM_CONSUMER_THREADS };
Then, create our container:
  bdlcc::Deque<WorkRequest> deque;
Next, create the array of thread handles for the threads we will spawn:
  bslmt::ThreadUtil::Handle handles[k_NUM_CONSUMER_THREADS +
                                    k_NUM_PRODUCER_THREADS];
Now, spawn all the consumers and producers:
  int ti = 0, rc;
  while (ti < k_NUM_CONSUMER_THREADS) {
      rc = bslmt::ThreadUtil::create(&handles[ti++],
                                     ConsumerFunctor(&deque));
      assert(0 == rc);
  }
  while (ti < k_NUM_CONSUMER_THREADS + k_NUM_PRODUCER_THREADS) {
      rc = bslmt::ThreadUtil::create(&handles[ti++],
                                     ProducerFunctor(&deque));
      assert(0 == rc);
  }
Finally, join all the threads after they finish and confirm the container is empty afterward:
  while (ti > 0) {
      rc = bslmt::ThreadUtil::join(handles[--ti]);
      assert(0 == rc);
  }
  assert(0 == deque.length());
Example 2: A Queue of Events:
First, we declare the Event type, which will be contained in our bdlcc::Deque object:
  struct Event {
      enum EventType {
          e_IN_PROGRESS   = 1,
          e_TASK_COMPLETE = 2 };

      EventType   d_type;
      int         d_workerId;
      int         d_eventNumber;
      const char *d_eventText_p;
  };
Then, we define the number of events each thread will push:
  const int k_NUM_TO_PUSH = 5;
Next, we declare our WorkerFunctor type, which will push k_NUM_TO_PUSH events into the deque:
  struct WorkerFunctor {
      int                  d_workerId;
      bdlcc::Deque<Event> *d_deque_p;
      bslmt::Barrier      *d_barrier_p;

      void operator()()
          // All the threads will block on the same barrier so they all start
          // at once to maximize concurrency.
      {
          d_barrier_p->wait();

          // Loop to push 'k_NUM_TO_PUSH - 1' events onto the deque.

          int evnum = 1;
          while (evnum < k_NUM_TO_PUSH) {
              // Yield every loop to maximize concurrency.

              bslmt::ThreadUtil::yield();

              // Create the event object.

              Event ev = {
                  Event::e_IN_PROGRESS,
                  d_workerId,
                  evnum++,
                  "In-Progress Event"
              };

              // Push the event object.

              d_deque_p->pushBack(ev);
          }

          // Create the completing event object.

          Event ev = {
              Event::e_TASK_COMPLETE,
              d_workerId,
              evnum,
              "Task Complete"
          };

          // Push the completing event object.

          d_deque_p->pushBack(ev);
      }
  };
Next, in main, define the number of threads:
  const int k_NUM_THREADS = 10;
Then, declare our bdlcc::Deque object, the set of handles of the subthreads, and our barrier object:
  bdlcc::Deque<Event>       myDeque;
  bslmt::ThreadUtil::Handle handles[k_NUM_THREADS];
  bslmt::Barrier            barrier(k_NUM_THREADS + 1);
Next, spawn the worker threads:
  for (int ti = 0; ti < k_NUM_THREADS; ++ti) {
      WorkerFunctor functor = { ti, &myDeque, &barrier };

      bslmt::ThreadUtil::create(&handles[ti], functor);
  }
Then, wait on the barrier, which will set all the subthreads running:
  barrier.wait();
Now, loop to pop the events off the deque, and keep track of how many e_TASK_COMPLETE events have been popped. When this equals the number of subthreads, we are done.
  int numCompleted = 0, numEvents = 0;
  while (numCompleted < k_NUM_THREADS) {
      Event ev = myDeque.popFront();
      ++numEvents;
      if (verbose) {
          cout << "[" << ev.d_workerId << "] "
               << ev.d_eventNumber << ". "
               << ev.d_eventText_p << endl;
      }
      if (Event::e_TASK_COMPLETE == ev.d_type) {
          ++numCompleted;
          int rc = bslmt::ThreadUtil::join(handles[ev.d_workerId]);
          assert(!rc);
      }
  }
Finally, perform some sanity checks:
  assert(k_NUM_THREADS * k_NUM_TO_PUSH == numEvents);
  assert(0 == myDeque.length());