BDE 4.14.0 Production release
bdlcc_timequeue

Detailed Description

Purpose

Provide an efficient queue for time events.

Classes

See also

Description

This component provides a thread-safe and efficient templatized time queue. The queue stores an ordered list of time values and associated DATA. Each item added to the queue is assigned a unique identifier that can be used to efficiently remove the item, making this queue suitable for situations where time items are added and removed very frequently.

Class bdlcc::TimeQueue<DATA> provides a public interface which is similar in structure and intent to bdlcc::Queue<DATA>, with the exception that each item stored in the bdlcc::TimeQueue is of type bdlcc::TimeQueueItem<DATA>. This structure contains a single bsls::TimeInterval value along with the DATA value.

Idiomatic usage of bdlcc::TimeQueue includes the member function popLE, which finds all items on the queue whose bsls::TimeInterval values are less than or equal to a specified value, and transfers those items to a provided vector of items. Through the use of this member function, clients can retrieve and process multiple elements that have expired, that is, whose bsls::TimeInterval values are in the past.
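
The effect of popLE can be illustrated with a minimal standard-library sketch. Everything here is an assumption made for illustration: the names SketchQueue and popLessOrEqual are hypothetical, times are plain integers rather than bsls::TimeInterval, and the sketch is single-threaded. It is not how bdlcc::TimeQueue is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a time queue: keys are expiry times (in
// arbitrary integer units) and values are the associated data.
typedef std::multimap<long long, std::string> SketchQueue;

// Move every item of 'queue' whose time is less than or equal to 'now' to
// the end of 'expired', in time order.  Return the number of items that
// remain in 'queue'.
std::size_t popLessOrEqual(
                    SketchQueue                                     *queue,
                    long long                                        now,
                    std::vector<std::pair<long long, std::string> > *expired)
{
    assert(queue);
    assert(expired);

    // 'upper_bound' yields the first item strictly later than 'now', so the
    // range [begin, end) holds exactly the items with time <= now.
    SketchQueue::iterator end = queue->upper_bound(now);
    expired->insert(expired->end(), queue->begin(), end);
    queue->erase(queue->begin(), end);
    return queue->size();
}
```

A caller would typically invoke such a function with the current time and then process each element of the resulting vector.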

bdlcc::TimeQueue also makes use of an opaque data type bdlcc::TimeQueue::Handle which serves to identify an individual element on the Time Queue. A value of type Handle is returned from the add member function, and can then be used to remove or modify the corresponding element on the queue. In this way, the update member function can update the time value for a specific bdlcc::TimeQueueItem without removing it from the queue.
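
The role of a handle can be sketched with a small single-threaded model. The class SketchTimeQueue and its members are hypothetical; the sketch assumes integer times and string data, and it re-keys an item internally on update, which may differ from what bdlcc::TimeQueue does. What it shares with the description above is that the handle returned by add stays valid across update.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical miniature of a handle-based time queue (not thread-safe).
class SketchTimeQueue {
    typedef std::multimap<long long, std::string> Items;
    typedef std::map<int, Items::iterator>        Index;

    Items d_items;       // items ordered by time
    Index d_index;       // handle -> position in 'd_items'
    int   d_nextHandle;  // next handle value to hand out

  public:
    SketchTimeQueue() : d_nextHandle(1) {}

    // Add 'data' with the specified 'time'; return a handle for the item.
    int add(long long time, const std::string& data)
    {
        const int handle = d_nextHandle++;
        d_index[handle] = d_items.insert(Items::value_type(time, data));
        return handle;
    }

    // Set the time of the item identified by 'handle' to 'newTime'.  Return
    // 0 on success, and a non-zero value if 'handle' is not in the queue.
    int update(int handle, long long newTime)
    {
        Index::iterator pos = d_index.find(handle);
        if (pos == d_index.end()) {
            return -1;
        }
        const std::string data = pos->second->second;
        d_items.erase(pos->second);
        pos->second = d_items.insert(Items::value_type(newTime, data));
        return 0;
    }

    // Remove the item identified by 'handle'.  Return 0 on success, and a
    // non-zero value if 'handle' is not in the queue.
    int remove(int handle)
    {
        Index::iterator pos = d_index.find(handle);
        if (pos == d_index.end()) {
            return -1;
        }
        d_items.erase(pos->second);
        d_index.erase(pos);
        return 0;
    }

    // Return the earliest time in the queue, which must not be empty.
    long long minTime() const
    {
        assert(!d_items.empty());
        return d_items.begin()->first;
    }

    std::size_t length() const { return d_items.size(); }
};
```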

bdlcc::TimeQueue::Handle Uniqueness, Reuse and numIndexBits

bdlcc::TimeQueue::Handle is an alias for a 32-bit int type. A handle consists of two parts, the "index section" and the "iteration section". The index section, which is the low-order numIndexBits bits (numIndexBits defaults to 17), uniquely identifies the node. Once a node is added, it never ceases to exist: it may be freed, but it will be kept on a free list to be eventually recycled, and the same index section will always identify that node. The iteration section, the high-order 32 - numIndexBits bits, is changed every time a node is freed, so that an out-of-date handle can be identified as out-of-date. But since the iteration section has only a finite number of bits, if a node is freed and re-added enough times, old handle values will eventually be reused.

Up to 2 ** numIndexBits - 1 nodes can exist in a given time queue. A given handle won't be reused for a node until that node has been freed and reused 2 ** (32 - numIndexBits) - 1 times.

numIndexBits is an optional parameter to the time queue constructors. If unspecified, it has a value of 17. The behavior is undefined unless the specified numIndexBits is in the range 8 <= numIndexBits <= 24.
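
The arithmetic above can be checked with a short sketch. The helper functions below are hypothetical and are not part of the bdlcc::TimeQueue interface; they assume only the layout described in this section (index section in the low-order bits, iteration section in the remaining high-order bits) and use unsigned values to keep the shifts well defined.

```cpp
#include <cassert>

// Return the index section (the low-order 'numIndexBits' bits) of 'handle'.
unsigned indexSection(unsigned handle, int numIndexBits)
{
    assert(8 <= numIndexBits && numIndexBits <= 24);
    return handle & ((1u << numIndexBits) - 1);
}

// Return the iteration section (the high-order '32 - numIndexBits' bits)
// of 'handle'.
unsigned iterationSection(unsigned handle, int numIndexBits)
{
    assert(8 <= numIndexBits && numIndexBits <= 24);
    return handle >> numIndexBits;
}

// Return the maximum number of nodes, '2 ** numIndexBits - 1'.
unsigned maxNodes(int numIndexBits)
{
    assert(8 <= numIndexBits && numIndexBits <= 24);
    return (1u << numIndexBits) - 1;
}

// Return the number of times a node must be freed and reused before one of
// its handle values repeats, '2 ** (32 - numIndexBits) - 1'.
unsigned reusesBeforeRepeat(int numIndexBits)
{
    assert(8 <= numIndexBits && numIndexBits <= 24);
    return (1u << (32 - numIndexBits)) - 1;
}
```

With the default of 17 index bits, these formulas give 131071 nodes and 32767 reuses; raising numIndexBits trades a shorter reuse cycle for a larger maximum queue length.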

Thread Safety

It is safe to access or modify two distinct bdlcc::TimeQueue objects simultaneously, each from a separate thread. It is safe to access or modify a single bdlcc::TimeQueue object simultaneously from two or more separate threads.

It is safe to enqueue objects in a bdlcc::TimeQueue object whose destructor may access or even modify the same bdlcc::TimeQueue object. However, there is no guarantee regarding the safety of enqueuing objects whose copy constructors or assignment operators may modify or even merely access the same bdlcc::TimeQueue object (except length). Such attempts generally lead to a deadlock.

Ordering

For a given bsls::TimeInterval value, the order of item removal (via popFront, popLE, removeAll, etc.) is guaranteed to match the order of item insertion (via add) for a particular insertion thread or group of externally synchronized insertion threads.
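
The same first-in, first-out behavior among equal time values can be observed in a single-threaded standard-library model. The function drainInOrder is hypothetical, and std::multimap stands in for the time queue; the sketch relies on the C++11 guarantee that std::multimap::insert places a new element after existing elements with an equivalent key.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Insert each (time, data) pair of 'insertions', in order, into a
// time-ordered container, then append the data values to 'result' in the
// order in which they are removed from the front.
void drainInOrder(
        const std::vector<std::pair<long long, std::string> >&  insertions,
        std::vector<std::string>                               *result)
{
    assert(result);

    std::multimap<long long, std::string> queue;
    for (std::size_t i = 0; i < insertions.size(); ++i) {
        // Elements with equal times keep their insertion order (C++11).
        queue.insert(insertions[i]);
    }
    while (!queue.empty()) {
        result->push_back(queue.begin()->second);
        queue.erase(queue.begin());
    }
}
```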

Usage

The following shows a typical usage of the bdlcc::TimeQueue class, implementing a simple threaded server my_Server that manages individual Connections (my_Connection) on behalf of multiple Sessions (my_Session). Each Connection is timed, such that input requests on that Connection will "time out" after a user-specified time interval. When a specific Connection times out, that Connection is removed from the bdlcc::TimeQueue and the corresponding my_Session is informed.

In this simplified example, class my_Session will terminate when its Connection times out. A more sophisticated implementation of my_Session would attempt recovery, perhaps by closing and reopening the physical Connection.

Forward Declarations

Class my_Server will spawn two service threads to monitor connections for available data and to manage time-outs, respectively. Two forward-declared "C" functions are invoked as the threads are spawned. The signature of each function follows the "C" standard "void *" interface for spawning threads. Each function will be called on a new thread when the start method is invoked for a given my_Server object. Each function then delegates processing for the thread back to the my_Server object that spawned it.

extern "C" {
    void *my_connectionMonitorThreadEntry(void *server);
    void *my_timerMonitorThreadEntry(void *server);
}

struct my_Connection

The my_Connection structure is used by my_Server to manage a single physical connection on behalf of a my_Session.

class my_Session;
struct my_Connection {
    int         d_timerId;
    my_Session *d_session_p;
};

Protocol Classes

Protocol class my_Session provides a pure abstract protocol to manage a single "session" to be associated with a specific connection on a server.

/// Pure protocol class to process a data buffer of arbitrary size.
/// Concrete implementations in the "real world" would typically manage
/// an external connection like a socket.
class my_Session {
  public:
    my_Session();
    virtual int processData(void *data, int length) = 0;
    virtual int handleTimeout(my_Connection *connection) = 0;
    virtual ~my_Session();
};

The constructor and destructor do nothing:

my_Session::my_Session()
{
}
my_Session::~my_Session()
{
}

Protocol class my_Server provides a partial implementation of a simple server that supports and monitors an arbitrary number of connections and handles incoming data for those connections. Clients must provide a concrete implementation that binds connections to concrete my_Session objects and monitors all open connections for incoming requests. The concrete implementation calls my_Server::newConnection() when a new connection is required, and implements the virtual function monitorConnections to monitor all open connections.

/// Simple server supporting multiple Connections.
class my_Server {
    bsl::vector<my_Connection*>      d_connections;
    bdlcc::TimeQueue<my_Connection*> d_timeQueue;
    int                              d_ioTimeout;
    bslmt::Mutex                     d_timerMonitorMutex;
    bslmt::Condition                 d_timerChangedCond;
    bslmt::ThreadUtil::Handle        d_connectionThreadHandle;
    bslmt::ThreadUtil::Handle        d_timerThreadHandle;
    volatile bool                    d_done;

  protected:
    /// Add the specified `connection` to the current `my_Server`,
    /// setting the new timeout value to the current time plus the
    /// timeout value provided at construction of this `my_Server`
    /// instance.  If the added connection is the new "top" of the
    /// queue, signal that the minimum time on the queue has changed.
    /// Upon seeing this signal, the TimerMonitor thread will wake up
    /// and look for expired timers.
    ///
    /// Behavior is undefined if `connection` has already been added to
    /// any `my_Server` and has not been removed via member function
    /// `closeConnection`.
    void newConnection(my_Connection *connection);

    /// Remove the specified `connection` from the current `my_Server`,
    /// so that it will no longer be monitored for available data.
    void removeConnection(my_Connection *connection);

    /// Provide a mechanism for a concrete implementation to close a
    /// specified `connection`.
    virtual void closeConnection(my_Connection *connection) = 0;

    /// Receive in the specified `buffer_p` a pointer to a data buffer
    /// of the specified `length` bytes, and pass this to the specified
    /// `connection` to be processed.  Behavior is undefined if
    /// `connection` is not currently added to this `my_Server` object,
    /// or if `length` <= 0.
    void dataAvailable(my_Connection *connection,
                       void          *buffer_p,
                       int            length);

  protected:
    /// Monitor all connections in the current `my_Server`.  When data
    /// becomes available for a given connection, pass the data to that
    /// connection for processing.
    virtual void monitorConnections() = 0;

    /// Monitor all timers in the current `my_Server`, and handle each
    /// timer as it expires.
    void monitorTimers();

    friend void *my_connectionMonitorThreadEntry(void *server);
    friend void *my_timerMonitorThreadEntry(void *server);

  private:
    // Not implemented:
    my_Server(const my_Server&);

  public:
    // CREATORS

    /// Construct a `my_Server` object with a timeout value of the
    /// specified `ioTimeout` seconds.  Use the optionally specified
    /// `basicAllocator` for all memory allocation for data members of
    /// `my_Server`.
    explicit
    my_Server(int ioTimeout, bslma::Allocator *basicAllocator = 0);

    virtual ~my_Server();

    // MANIPULATORS

    /// Begin monitoring timers and connections.
    int start();
};

The constructor is simple: it initializes the internal bdlcc::TimeQueue and sets the I/O timeout value. The virtual destructor sets a shared completion flag to indicate completion, wakes up all waiting threads, and waits for them to join.

my_Server::my_Server(int ioTimeout, bslma::Allocator *basicAllocator)
: d_connections(basicAllocator)
, d_timeQueue(basicAllocator)
, d_ioTimeout(ioTimeout)
, d_connectionThreadHandle(bslmt::ThreadUtil::invalidHandle())
, d_timerThreadHandle(bslmt::ThreadUtil::invalidHandle())
, d_done(false)  // must be initialized before the monitor threads read it
{
}

my_Server::~my_Server()
{
    d_done = true;
    d_timerChangedCond.broadcast();
    if (bslmt::ThreadUtil::invalidHandle() != d_connectionThreadHandle) {
        bslmt::ThreadUtil::join(d_connectionThreadHandle);
    }
    if (bslmt::ThreadUtil::invalidHandle() != d_timerThreadHandle) {
        bslmt::ThreadUtil::join(d_timerThreadHandle);
    }
}

Member function newConnection adds the connection to the current set of connections to be monitored. This is done in two steps. First, the connection is added to the internal array, and then a timer is set for the connection by creating a corresponding entry in the internal bdlcc::TimeQueue.

void my_Server::newConnection(my_Connection *connection)
{
    d_connections.push_back(connection);
    int isNewTop = 0;
    connection->d_timerId = d_timeQueue.add(bdlt::CurrentTime::now() +
                                                               d_ioTimeout,
                                            connection,
                                            &isNewTop);
    if (isNewTop) {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_timerMonitorMutex);
        d_timerChangedCond.signal();
    }
}

Member function monitorConnections, provided by the concrete implementation class, can use the internal array to determine the set of connections to be monitored.

Member function removeConnection removes the connection from the current set of connections to be monitored. This is done in two steps, in reversed order from newConnection. First, the connection is removed from the internal bdlcc::TimeQueue, and then the connection is removed from the internal array.

The concrete implementation class must provide an implementation of virtual function closeConnection; this implementation must call removeConnection when the actual connection is to be removed from the my_Server object.

Function closeConnection is in turn called by function monitorTimers, which manages the overall timer monitor thread. Because monitorTimers takes responsibility for notifying other threads when the queue status changes, function removeConnection does not address these concerns.

void my_Server::removeConnection(my_Connection *connection)
{
    // Remove from d_timeQueue
    d_timeQueue.remove(connection->d_timerId);

    // Remove from d_connections
    bsl::vector<my_Connection*>::iterator begin = d_connections.begin(),
                                          end   = d_connections.end(),
                                          it    = begin;
    for (; it != end; ++it) {
        if (connection == *it) {
            d_connections.erase(it);
            break;  // 'erase' invalidates 'it' and 'end'; stop iterating
        }
    }
}

The dataAvailable function will be called when data becomes available for a specific connection. It removes the connection from the timer queue while the connection is busy, processes the available data, and returns the connection to the queue with a new time value.

void my_Server::dataAvailable(my_Connection *connection,
                              void          *buffer_p,
                              int            length)
{
    if (connection->d_timerId) {
        if (d_timeQueue.remove(connection->d_timerId)) return;    // RETURN
        connection->d_timerId = 0;
    }
    connection->d_session_p->processData(buffer_p, length);

    int isNewTop = 0;
    connection->d_timerId = d_timeQueue.add(bdlt::CurrentTime::now() +
                                                               d_ioTimeout,
                                            connection,
                                            &isNewTop);
    if (isNewTop) {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_timerMonitorMutex);
        d_timerChangedCond.signal();
    }
}

Function monitorTimers manages the timer monitor thread; it is called when the thread is spawned, and checks repeatedly for expired timers; after each check, it does a timed wait based upon the minimum time value seen in the queue after all expired timers have been removed.

void my_Server::monitorTimers()
{
    while (!d_done) {
        bsl::vector<bdlcc::TimeQueueItem<my_Connection*> > expiredTimers;
        {
            bslmt::LockGuard<bslmt::Mutex> lock(&d_timerMonitorMutex);
            bsls::TimeInterval minTime;
            int newLength;

            d_timeQueue.popLE(bdlt::CurrentTime::now(),
                              &expiredTimers,
                              &newLength,
                              &minTime);

            if (!expiredTimers.size()) {
                if (newLength) {
                    // no expired timers, but unexpired timers remain.
                    d_timerChangedCond.timedWait(&d_timerMonitorMutex,
                                                 minTime);
                }
                else {
                    // no expired timers, and timer queue is empty.
                    d_timerChangedCond.wait(&d_timerMonitorMutex);
                }
                continue;
            }
        }

        int length = static_cast<int>(expiredTimers.size());
        if (length) {
            bdlcc::TimeQueueItem<my_Connection*> *data =
                                                    &expiredTimers.front();
            for (int i = 0; i < length; ++i) {
                closeConnection(data[i].data());
            }
        }
    }
}
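
The decision made on each pass of the monitor loop can be modeled without threads. The sketch below is an assumption-laden illustration: NextStep, monitorPass, the integer time values, and the use of std::multimap are all hypothetical stand-ins. It shows only which of the three outcomes a pass selects: process expired timers, wait until the earliest remaining time, or wait indefinitely for a signal.

```cpp
#include <cassert>
#include <map>
#include <vector>

// The three possible outcomes of one pass of the (hypothetical) loop.
enum NextStep { e_PROCESS_EXPIRED, e_TIMED_WAIT, e_WAIT_FOR_SIGNAL };

// Move the values of all timers in 'timers' whose time is less than or
// equal to 'now' into 'expired'.  If nothing expired but timers remain,
// load the earliest remaining time into 'wakeTime'.  Return the step the
// monitor loop should take next.
NextStep monitorPass(std::multimap<long long, int> *timers,
                     long long                      now,
                     std::vector<int>              *expired,
                     long long                     *wakeTime)
{
    assert(timers);
    assert(expired);
    assert(wakeTime);

    typedef std::multimap<long long, int>::iterator Iter;

    Iter end = timers->upper_bound(now);
    for (Iter it = timers->begin(); it != end; ++it) {
        expired->push_back(it->second);
    }
    timers->erase(timers->begin(), end);

    if (!expired->empty()) {
        return e_PROCESS_EXPIRED;  // handle the expired timers right away
    }
    if (!timers->empty()) {
        *wakeTime = timers->begin()->first;
        return e_TIMED_WAIT;       // sleep until the earliest timer is due
    }
    return e_WAIT_FOR_SIGNAL;      // queue is empty: wait for an 'add'
}
```

In a threaded version, the two waiting outcomes would correspond to a timed wait and an untimed wait on a condition variable, both of which can be cut short when a newly added timer becomes the earliest one.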

Function start spawns two separate threads. The first thread will monitor connections and handle any data received on them. The second monitors the internal timer queue and removes connections that have timed out. Function start calls bslmt::ThreadUtil::create, which expects a function pointer to a function with the standard "C" callback signature void *fn(void *data). This non-member function will call back into the my_Server object immediately.

int my_Server::start()
{
    bslmt::ThreadAttributes attr;

    if (bslmt::ThreadUtil::create(&d_connectionThreadHandle, attr,
                                  &my_connectionMonitorThreadEntry,
                                  this)) {
        return -1;                                                // RETURN
    }

    if (bslmt::ThreadUtil::create(&d_timerThreadHandle, attr,
                                  &my_timerMonitorThreadEntry,
                                  this)) {
        return -1;                                                // RETURN
    }
    return 0;
}

Finally, we are now in a position to implement the two thread dispatchers:

extern "C" {

void *my_connectionMonitorThreadEntry(void *server)
{
    ((my_Server*)server)->monitorConnections();
    return server;
}

void *my_timerMonitorThreadEntry(void *server)
{
    ((my_Server*)server)->monitorTimers();
    return server;
}

}
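
The dispatch pattern used by these two functions can be shown in isolation. In the sketch below, SketchWorker and sketchWorkerEntry are hypothetical names; the entry function is called directly rather than through a thread-creation routine, so that the example depends on nothing beyond the standard library.

```cpp
#include <cassert>

// Hypothetical object whose member function is meant to run on a thread.
class SketchWorker {
    int d_runCount;  // number of times 'run' has been invoked

  public:
    SketchWorker() : d_runCount(0) {}

    void run() { ++d_runCount; }

    int runCount() const { return d_runCount; }
};

// Entry point with the "C" callback signature 'void *fn(void *)'.  The
// specified 'worker' must be the address of a 'SketchWorker'; the call is
// forwarded to its 'run' member function.
extern "C" void *sketchWorkerEntry(void *worker)
{
    assert(worker);
    static_cast<SketchWorker *>(worker)->run();
    return worker;
}
```

A thread-creation function that accepts a callback of this shape would be given sketchWorkerEntry together with the address of the worker object as its user data.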

In order to test our server, we provide concrete implementations of a test session and of a test server as follows.

// myTestSession.h             -*-C++-*-

/// Concrete implementation of my_Session, providing simple test
/// semantics.  In particular, implement the virtual function
/// processData() to record all incoming data for the controlling
/// connection, and virtual function handleTimeout() for handling
/// timeouts.
class my_TestSession : public my_Session {
    int d_verbose;

  public:
    // CREATORS
    explicit
    my_TestSession(int verbose) : my_Session(), d_verbose(verbose) { }

    // MANIPULATORS
    virtual int handleTimeout(my_Connection *connection)
    {
        // Do something to handle timeout.
        if (d_verbose) {
            bsl::cout << bdlt::CurrentTime::utc() << ": ";
            bsl::cout << "Connection " << connection << " timed out.\n";
        }
        return 0;
    }

    virtual int processData(void *data, int length)
    {
        // Do something with the data...
        if (d_verbose) {
            bsl::cout << bdlt::CurrentTime::utc() << ": ";
            bsl::cout << "Processing data at address " << data
                      << " and length " << length << ".\n";
        }
        return 0;
    }
};

// myTestServer.h             -*-C++-*-

/// Concrete implementation of my_Server, providing connection logic.
class my_TestServer : public my_Server {
    int d_verbose;

  protected:
    /// Close the specified external `connection` and call
    /// `removeConnection` when done.
    virtual void closeConnection(my_Connection *connection);

    /// Monitor all connections in the current `my_Server`.  When data
    /// becomes available for a given connection, pass the data to that
    /// connection for processing.
    virtual void monitorConnections();

  private:
    // NOT IMPLEMENTED
    my_TestServer(const my_TestServer&);

  public:
    // CREATORS
    explicit
    my_TestServer(int               ioTimeout,
                  int               verbose = 0,
                  bslma::Allocator *basicAllocator = 0)
    : my_Server(ioTimeout, basicAllocator)
    , d_verbose(verbose)
    {
    }

    virtual ~my_TestServer();
};

// myTestServer.cpp             -*-C++-*-

my_TestServer::~my_TestServer()
{
}

void my_TestServer::closeConnection(my_Connection *connection)
{
    if (d_verbose) {
        bsl::cout << bdlt::CurrentTime::utc() << ": ";
        bsl::cout << "Closing connection " << connection << bsl::endl;
    }
    delete connection;
}

void my_TestServer::monitorConnections()
{
    my_Session *session = new my_TestSession(d_verbose);

    // Simulate connection monitor logic...
    my_Connection *connection1 = new my_Connection;
    connection1->d_session_p = session;
    newConnection(connection1);
    if (d_verbose) {
        bsl::cout << bdlt::CurrentTime::utc() << ": ";
        bsl::cout << "Opening connection " << connection1 << bsl::endl;
    }

    my_Connection *connection2 = new my_Connection;
    connection2->d_session_p = session;
    newConnection(connection2);
    if (d_verbose) {
        bsl::cout << bdlt::CurrentTime::utc() << ": ";
        bsl::cout << "Opening connection " << connection2 << bsl::endl;
    }

    // Simulate transmission...
    const int length = 1024;
    char      buffer[length] = {};
    if (d_verbose) {
        bsl::cout << bdlt::CurrentTime::utc() << ": ";
        bsl::cout << "Connection " << connection1
                  << " receives " << length << " bytes " << bsl::endl;
    }
    dataAvailable(connection1, buffer, length);

    // Wait for timeout to occur, otherwise session gets destroyed from
    // stack too early.
    bslmt::ThreadUtil::sleep(bsls::TimeInterval(8));  // 8s
}

The program that would exercise this test server would simply consist of:

int usageExample(int verbose)
{
    my_TestServer mX(5, verbose);  // timeout for connections: 5s
    mX.start();

    // Wait sufficiently long to observe all events.
    bslmt::ThreadUtil::sleep(bsls::TimeInterval(10));  // 10s

    return 0;
}

The output of this program would look something like the following:

17:10:35.000: Opening connection 0x00161880
17:10:35.000: Opening connection 0x001618b0
17:10:37.000: Connection 0x00161880 receives 1024 bytes
17:10:37.000: Processing data at address 0xfeefaf04 and length 1024.
17:10:40.000: Closing connection 0x001618b0
17:10:42.000: Closing connection 0x00161880