Quick Links:

bal | bbl | bdl | bsl

Namespaces

Component bdlcc_timequeue
[Package bdlcc]

Provide an efficient queue for time events. More...

Namespaces

namespace  bdlcc

Detailed Description

Outline
Purpose:
Provide an efficient queue for time events.
Classes:
bdlcc::TimeQueue Templatized time event queue
bdlcc::TimeQueueItem (struct) Templatized item in the time event queue
See also:
Description:
This component provides a thread-safe and efficient templatized time queue. The queue stores an ordered list of time values and associated DATA. Each item added to the queue is assigned a unique identifier that can be used to efficiently remove the item making this queue suitable for conditions where time items are added and removed very frequently.
Class bdlcc::TimeQueue<DATA> provides a public interface which is similar in structure and intent to bdlcc::Queue<DATA>, with the exception that each item stored in the bdlcc::TimeQueue is of type bdlcc::TimeQueueItem<DATA>. This structure contains a single bsls::TimeInterval value along with the DATA value.
Idiomatic usage of bdlcc::TimeQueue includes the member function popLE, which finds all items on the queue whose bsls::TimeInterval are less than a specified value, and transfers those items to a provided vector of items. Through the use of this member function, clients can retrieve and process multiple elements that have expired, that is, whose bsls::TimeInterval values are in the past.
bdlcc::TimeQueue also makes use of an opaque data type bdlcc::TimeQueue::Handle which serves to identify an individual element on the Time Queue. A value of type Handle is returned from the add member function, and can then be used to remove or modify the corresponding element on the queue. In this way, the update member function can update the time value for a specific bdlcc::TimeQueueItem without removing it from the queue.
bdlcc::TimeQueue::Handle Uniqueness, Reuse and numIndexBits:
bdlcc::TimeQueue::Handle is an alias for a 32-bit int type. A handle consists of two parts, the "index section" and the "iteration section". The index section, which is the low-order numIndexBits (which defaults to numIndexBits == 17), uniquely identifies the node. Once a node is added, it never ceases to exist - it may be freed, but it will be kept on a free list to be eventually recycled, and the same index section will always identify that node. The iteration section, the high-order 32 - numIndexBits, is changed every time a node is freed, so that an out-of-date handle can be identified as out-of-date. But since the iteration section has only a finite number of bits, if a node is freed and re-added enough times, old handle values will eventually be reused.
Up to 2 ** numIndexBits - 1 nodes can exist in a given time queue. A given handle won't be reused for a node until that node has been freed and reused 2 ** (32 - numIndexBits) - 1 times.
numIndexBits is an optional parameter to the time queue constructors. If unspecified, it has a value of 17. The behavior is undefined unless the specified numIndexBits is in the range 8 <= numIndexBits <= 24.
Thread Safety:
It is safe to access or modify two distinct bdlcc::TimeQueue objects simultaneously, each from a separate thread. It is safe to access or modify a single bdlcc::TimeQueue object simultaneously from two or more separate threads.
It is safe to enqueue objects in a bdlcc::TimeQueue object whose destructor may access or even modify the same bdlcc::TimeQueue object. However, there is no guarantee regarding the safety of enqueuing objects whose copy constructors or assignment operators may modify or even merely access the same bdlcc::TimeQueue object (except length). Such attempts generally lead to a deadlock.
Ordering:
For a given bsls::TimeInterval value, the order of item removal (via popFront, popLE, removeAll, etc.) is guaranteed to match the order of item insertion (via add) for a particular insertion thread or group of externally synchronized insertion threads.
Usage:
The following shows a typical usage of the bdlcc::TimeQueue class, implementing a simple threaded server my_Server that manages individual Connections (my_Connection) on behalf of multiple Sessions (my_Session). Each Connection is timed, such that input requests on that Connection will "time out" after a user-specified time interval. When a specific Connection times out, that Connection is removed from the bdlcc::TimeQueue and the corresponding my_Session is informed.
In this simplified example, class my_Session will terminate when its Connection times out. A more sophisticated implementation of my_Session would attempt recovery, perhaps by closing and reopening the physical Connection.
Forward Declarations:
Class my_Server will spawn two service threads to monitor connections for available data and to manage time-outs, respectively. Two forward-declared "C" functions are invoked as the threads are spawned. The signature of each function follows the "C" standard "'void *'" interface for spawning threads. Each function will be called on a new thread when the start method is invoked for a given my_Server object. Each function then delegates processing for the thread back to the my_Server object that spawned it.
  extern "C" {

  void *my_connectionMonitorThreadEntry(void *server);

  void *my_timerMonitorThreadEntry(void *server);

  }
struct my_Connection:
The my_Connection structure is used by my_Server to manage a single physical connection on behalf of a my_Session.
  class my_Session;
  struct my_Connection {
      int         d_timerId;
      my_Session *d_session_p;
  };
Protocol Classes:
Protocol class my_Session provides a pure abstract protocol to manage a single "session" to be associated with a specific connection on a server.
  class my_Session {
      // Pure protocol class to process a data buffer of arbitrary size.
      // Concrete implementations in the "real world" would typically manage
      // an external connection like a socket.

    public:
      my_Session();
      virtual int processData(void *data, int length) = 0;
      virtual int handleTimeout(my_Connection *connection) = 0;
      virtual ~my_Session();
  };
The constructor and destructor do nothing:
  my_Session::my_Session()
  {
  }

  my_Session::~my_Session()
  {
  }
Protocol class my_Server provides a partial implementation of a simple server that supports and monitors an arbitrary number of connections and handles incoming data for those connections. Clients must provide a concrete implementation that binds connections to concrete my_Session objects and monitors all open connections for incoming requests. The concrete implementation calls my_Server::newConnection() when a new connections is required, and implements the virtual function monitorConnections to monitor all open connections.
  class my_Server {
      // Simple server supporting multiple Connections.

      bsl::vector<my_Connection*>      d_connections;
      bdlcc::TimeQueue<my_Connection*> d_timeQueue;
      int                              d_ioTimeout;
      bslmt::Mutex                     d_timerMonitorMutex;
      bslmt::Condition                 d_timerChangedCond;
      bslmt::ThreadUtil::Handle        d_connectionThreadHandle;
      bslmt::ThreadUtil::Handle        d_timerThreadHandle;
      volatile bool                    d_done;

    protected:
      void newConnection(my_Connection *connection);
          // Add the specified 'connection' to the current 'my_Server',
          // setting the new timeout value to the current time plus the
          // timeout value provided at construction of this 'my_Server'
          // instance.  If the added connection is the new "top" of the
          // queue, signal that the minimum time on the queue has changed.
          // Upon seeing this signal, the TimerMonitor thread will wake up
          // and look for expired timers.
          //
          // Behavior is undefined if 'connection' has already been added to
          // any 'my_Server' and has not been removed via member function
          // 'closeConnection'.

      void removeConnection(my_Connection *connection);
          // Remove the specified 'connection' from the current 'my_Server',
          // so that it will no longer be monitored for available data.

      virtual void closeConnection(my_Connection *connection)=0;
          // Provide a mechanism for a concrete implementation to close a
          // specified 'connection'.

      void dataAvailable(my_Connection *connection,
                         void          *buffer_p,
                         int            length);
          // Receive in the specified 'buffer_p' a pointer to a data buffer
          // of the specified 'length' bytes, and pass this to the specified
          // 'connection' to be processed.  Behavior is undefined if
          // 'connection' is not currently added to this 'my_Server' object,
          // or if 'length' <= 0.

    protected:
      virtual void monitorConnections()=0;
          // Monitor all connections in the current 'my_Server'.  When data
          // becomes available for a given connection, pass the data to that
          // connection for processing.

      void monitorTimers();
          // Monitor all timers in the current 'my_Server', and handle each
          // timer as it expires.

      friend void *my_connectionMonitorThreadEntry(void *server);
      friend void *my_timerMonitorThreadEntry(void *server);

    private:
      // Not implemented:
      my_Server(const my_Server&);

    public:
      // CREATORS
      explicit
      my_Server(int ioTimeout, bslma::Allocator *basicAllocator = 0);
          // Construct a 'my_Server' object with a timeout value of the
          // specified 'ioTimeout' seconds.  Use the optionally specified
          // 'basicAllocator' for all memory allocation for data members of
          // 'my_Server'.

      virtual ~my_Server();

      // MANIPULATORS
      int start();
          // Begin monitoring timers and connections.
  };
The constructor is simple: it initializes the internal bdlcc::TimeQueue and sets the I/O timeout value. The virtual destructor sets a shared completion flag to indicate completion, wakes up all waiting threads, and waits for them to join.
  my_Server::my_Server(int ioTimeout, bslma::Allocator *basicAllocator)
  : d_timeQueue(basicAllocator)
  , d_ioTimeout(ioTimeout)
  , d_connectionThreadHandle(bslmt::ThreadUtil::invalidHandle())
  , d_timerThreadHandle(bslmt::ThreadUtil::invalidHandle())
  {
  }

  my_Server::~my_Server()
  {
      d_done = true;
      d_timerChangedCond.broadcast();
      if (bslmt::ThreadUtil::invalidHandle() != d_connectionThreadHandle) {
          bslmt::ThreadUtil::join(d_connectionThreadHandle);
      }
      if (bslmt::ThreadUtil::invalidHandle()!= d_timerThreadHandle) {
          bslmt::ThreadUtil::join(d_timerThreadHandle);
      }
  }
Member function newConnection adds the connection to the current set of connections to be monitored. This is done in two steps. First, the connection is added to the internal array, and then a timer is set for the connection by creating a corresponding entry in the internal bdlcc::TimeQueue.
  void my_Server::newConnection(my_Connection *connection)
  {
      d_connections.push_back(connection);
      int isNewTop = 0;
      connection->d_timerId = d_timeQueue.add(bdlt::CurrentTime::now() +
                                                                 d_ioTimeout,
                                              connection,
                                              &isNewTop);
      if (isNewTop) {
          bslmt::LockGuard<bslmt::Mutex> lock(&d_timerMonitorMutex);
          d_timerChangedCond.signal();
      }
  }
Member function monitorConnections, provided by the concrete implementation class, can use the internal array to determine the set of connections to be monitored.
Member function removeConnection removes the connection from the current set of connections to be monitored. This is done in two steps, in reversed order from newConnection. First, the connection is removed from the internal bdlcc::TimeQueue, and then the connection is removed from the internal array.
The concrete implementation class must provide an implementation of virtual function closeConnection; this implementation must call removeConnection when the actual connection is to be removed from the my_Server object.
Function closeConnection is in turn called by function monitorTimers, which manages the overall timer monitor thread. Because monitorTimers takes responsibility for notifying other threads when the queue status changes, function removeConnection does not address these concerns.
  void my_Server::removeConnection(my_Connection *connection)
  {
      // Remove from d_timeQueue
      d_timeQueue.remove(connection->d_timerId);
      // Remove from d_connections
      bsl::vector<my_Connection*>::iterator begin = d_connections.begin(),
          end = d_connections.end(),
          it = begin;
      for (; it != end; ++it) {
          if (connection == *it) {
              d_connections.erase(it);
          }
      }
  }
The dataAvailable function will be called when data becomes available for a specific connection. It removes the connection from the timer queue while the connection is busy, processes the available data, and returns the connection to the queue with a new time value.
  void my_Server::dataAvailable(my_Connection *connection,
                                void          *buffer_p,
                                int            length)
  {
      if (connection->d_timerId) {
          if (d_timeQueue.remove(connection->d_timerId))  return;   // RETURN
          connection->d_timerId = 0;
      }
      connection->d_session_p->processData(buffer_p, length);

      int isNewTop = 0;

      connection->d_timerId = d_timeQueue.add(bdlt::CurrentTime::now() +
                                                                 d_ioTimeout,
                                              connection,
                                              &isNewTop);
      if (isNewTop) {
          bslmt::LockGuard<bslmt::Mutex> lock(&d_timerMonitorMutex);
          d_timerChangedCond.signal();
      }
  }
Function monitorTimers manages the timer monitor thread; it is called when the thread is spawned, and checks repeatedly for expired timers; after each check, it does a timed wait based upon the minimum time value seen in the queue after all expired timers have been removed.
  void my_Server::monitorTimers()
  {
      while (!d_done) {
          bsl::vector<bdlcc::TimeQueueItem<my_Connection*> > expiredTimers;
          {
              bslmt::LockGuard<bslmt::Mutex> lock(&d_timerMonitorMutex);
              bsls::TimeInterval minTime;
              int newLength;

              d_timeQueue.popLE(bdlt::CurrentTime::now(),
                                &expiredTimers,
                                &newLength,
                                &minTime );

              if (!expiredTimers.size()) {
                  if (newLength) {
                      // no expired timers, but unexpired timers remain.
                      d_timerChangedCond.timedWait(&d_timerMonitorMutex,
                                                   minTime);
                  }
                  else {
                      // no expired timers, and timer queue is empty.
                      d_timerChangedCond.wait(&d_timerMonitorMutex);
                  }
                  continue;
              }
          }

          int length = static_cast<int>(expiredTimers.size());
          if (length) {
              bdlcc::TimeQueueItem<my_Connection*> *data =
                                                      &expiredTimers.front();
              for (int i = 0; i < length; ++i) {
                  closeConnection(data[i].data());
              }
          }
      }
  }
Function start spawns two separate threads. The first thread will monitor connections and handle any data received on them. The second monitors the internal timer queue and removes connections that have timed out. Function start calls bslmt::ThreadUtil::create, which expects a function pointer to a function with the standard "C" callback signature void *fn(void *data). This non-member function will call back into the my_Server object immediately.
  int my_Server::start()
  {
      bslmt::ThreadAttributes attr;

      if (bslmt::ThreadUtil::create(&d_connectionThreadHandle, attr,
                                   &my_connectionMonitorThreadEntry,
                                   this)) {
          return -1;                                                // RETURN
      }

      if (bslmt::ThreadUtil::create(&d_timerThreadHandle, attr,
                                   &my_timerMonitorThreadEntry,
                                   this)) {
          return -1;                                                // RETURN
      }
      return 0;
  }
Finally, we are now in a position to implement the two thread dispatchers:
  extern "C" {

  void *my_connectionMonitorThreadEntry(void *server)
  {
      ((my_Server*)server)->monitorConnections();
      return server;
  }

  void *my_timerMonitorThreadEntry(void *server)
  {
      ((my_Server*)server)->monitorTimers();
      return server;
  }

  }
In order to test our server, we provide two concrete implementations of a test session and of a test server as follows.
  // myTestSession.h             -*-C++-*-

  class my_TestSession : public my_Session {
      // Concrete implementation of my_Session, providing simple test
      // semantics In particular, implement the virtual function
      // processData() to record all incoming data for the controlling
      // connection, and virtual function handleTimeout() for handling
      // timeouts.

      int d_verbose;

    public:
      // CREATORS
      explicit
      my_TestSession(int verbose) : my_Session(), d_verbose(verbose) { }

      // MANIPULATORS
      virtual int handleTimeout(my_Connection *connection)
      {
          // Do something to handle timeout.
          if (d_verbose) {
              bsl::cout << bdlt::CurrentTime::utc() << ": ";
              bsl::cout << "Connection " << connection << "timed out.\n";
          }
          return 0;
      }

      virtual int processData(void *data, int length)
      {
          // Do something with the data...
          if (d_verbose) {
              bsl::cout << bdlt::CurrentTime::utc() << ": ";
              bsl::cout << "Processing data at address " << data
                        << " and length " << length << ".\n";
          }
          return 0;
      }
  };

  // myTestSession.h             -*-C++-*-

  class my_TestServer :  public my_Server {
      // Concrete implementation of my_Server, providing connection logic.

      int d_verbose;

    protected:
      virtual void closeConnection(my_Connection *connection);
          // Close the specified external 'connection' and call
          // 'removeConnection' when done.

      virtual void monitorConnections();
          // Monitor all connections in the current 'my_Server'.  When data
          // becomes available for a given connection, pass the data to that
          // connection for processing.

    private:
      // Not implemented:
      my_TestServer(const my_TestServer&);

    public:
      // CREATORS
      explicit
      my_TestServer(int               ioTimeout,
                    int               verbose = 0,
                    bslma::Allocator *basicAllocator = 0)
      : my_Server(ioTimeout, basicAllocator)
      , d_verbose(verbose)
      {
      }

      virtual ~my_TestServer();
  };

  // myTestSession.cpp             -*-C++-*-

  my_TestServer::~my_TestServer()
  {
  }

  void my_TestServer::closeConnection(my_Connection *connection)
  {
      if (d_verbose) {
          bsl::cout << bdlt::CurrentTime::utc() << ": ";
          bsl::cout << "Closing connection " << connection << bsl::endl;
      }
      delete connection;
  }

  void my_TestServer::monitorConnections()
  {
      my_Session *session = new my_TestSession(d_verbose);

      // Simulate connection monitor logic...
      my_Connection *connection1 = new my_Connection;
      connection1->d_session_p = session;
      newConnection(connection1);
      if (d_verbose) {
          bsl::cout << bdlt::CurrentTime::utc() << ": ";
          bsl::cout << "Opening connection " << connection1 << endl;
      }

      my_Connection *connection2 = new my_Connection;
      connection2->d_session_p = session;
      newConnection(connection2);
      if (d_verbose) {
          bsl::cout << bdlt::CurrentTime::utc() << ": ";
          bsl::cout << "Opening connection " << connection2 << endl;
      }

      bslmt::ThreadUtil::sleep(bsls::TimeInterval(2)); // 2s

      // Simulate transmission...
      const int  length = 1024;
      const char*buffer[length];
      if (d_verbose) {
          bsl::cout << bdlt::CurrentTime::utc() << ": ";
          bsl::cout << "Connection " << connection1
                    << " receives " << length << " bytes " << endl;
      }
      dataAvailable(connection1, buffer, length);

      // Wait for timeout to occur, otherwise session gets destroyed from
      // stack too early.

      bslmt::ThreadUtil::sleep(bsls::TimeInterval(8)); // 8s
  }
The program that would exercise this test server would simply consist of:
  int usageExample(int verbose)
  {
      my_TestServer mX(5, verbose); // timeout for connections: 5s
      mX.start();

      // Wait sufficiently long to observe all events.
      bslmt::ThreadUtil::sleep(bsls::TimeInterval(10)); // 10s

      return 0;
  }
The output of this program would look something as follows:
  17:10:35.000: Opening connection 0x00161880
  17:10:35.000: Opening connection 0x001618b0
  17:10:37.000: Connection 0x00161880 receives 1024 bytes
  17:10:37.000: Processing data at address 0xfeefaf04 and length 1024.
  17:10:40.000: Closing connection 0x001618b0
  17:10:42.000: Closing connection 0x00161880