libbmq  a5f8a06ba1d16cb5a65643e1fa7f1a1d6aadef40
bmqa_session.h File Reference

Provide access to the BlazingMQ broker.

#include <bmqa_abstractsession.h>
#include <bmqa_closequeuestatus.h>
#include <bmqa_configurequeuestatus.h>
#include <bmqa_confirmeventbuilder.h>
#include <bmqa_messageeventbuilder.h>
#include <bmqa_openqueuestatus.h>
#include <bmqt_queueoptions.h>
#include <bmqt_sessionoptions.h>
#include <bmqt_uri.h>
#include <ball_log.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_keyword.h>
#include <bsls_timeinterval.h>
#include <bsls_types.h>

Classes

class  BloombergLP::bmqa::SessionEventHandler
 
class  BloombergLP::bmqa::Session
 A session with a BlazingMQ broker.
 

Namespaces

 BloombergLP
 
 BloombergLP::bmqa
 

Detailed Description

This component provides a mechanism, bmqa::Session, that gives access to a message queue broker, and an interface, bmqa::SessionEventHandler, for asynchronous notification of events. The broker manages named, persistent queues of messages. It allows a client to open queues, post messages to them, or retrieve messages from them. All of these operations take place within the context of the session opened by the client application.

Messages received from a broker are communicated to the application by the session associated with that broker in the form of events (see bmqa_event.h). Events can be of two different types: (1) Messages and message status events (bmqa::MessageEvent), or (2) Session or queue status events (bmqa::SessionEvent).

A bmqa::Session can dispatch events to the application in either a synchronous or an asynchronous mode. In synchronous mode, the application must call the nextEvent method in order to obtain events from the bmqa::Session. In asynchronous mode, the application must supply a concrete bmqa::SessionEventHandler object at construction time. The concrete bmqa::SessionEventHandler provided by the application must implement the onSessionEvent and onMessageEvent methods, which will be called by the bmqa::Session every time a session event or a message event is received. Note that by default, a session created in asynchronous mode creates only one internal thread to dispatch events, but a different number of threads can be specified in bmqt::SessionOptions.

A bmqa::Session is created in either synchronous or asynchronous mode, and it remains in that mode until destruction; allowing a mix of the two would complicate the SDK. The only exceptions are the "start" and "open" operations, which are available in both synchronous and asynchronous versions for the convenience of the programmer.

By default a bmqa::Session connects to the local broker, which in turn may connect to a remote cluster based on configuration.

After a bmqa::Session is started, the application has to open one or several queues in read and/or write mode.

Disclaimer

A bmqa::Session object is a heavy object representing the negotiated TCP session with the broker, and the entire associated state (opened queues, statistics, ...). Therefore, sessions should always be reused if possible, preferably with only one session for the lifetime of a component, library, or task. Note that at the time of this writing multiplexing of different logical sessions over the same physical connection is not supported, so in certain circumstances reuse of the same session across the whole of a single application will not be possible. For example, if an application uses two unrelated libraries both of which use BlazingMQ under the hood, they won't be able to share a session as it stands. An example of extreme inefficiency and abuse of resources is creating a session ad hoc every time the same component needs to post a message.

Thread-safety

This session object is thread enabled, meaning that two threads can safely call any methods on the same instance without external synchronization.

Connecting to the Broker

A bmqa::Session communicates with a broker service over TCP/IP. Each bmqa::Session object must be constructed with a bmqt::SessionOptions object, which provides the necessary information to connect to the broker. In particular, the bmqt::SessionOptions object must specify the IP address and port needed to connect to the broker. The bmqt::SessionOptions object may also provide extra parameters for tuning the TCP connection behavior (see bmqt_sessionoptions.h for details).

Note that in most cases the user does not need to explicitly construct a bmqt::SessionOptions object: the default constructor for bmqt::SessionOptions creates an instance that will connect to the broker service on the local machine using the standard port.

Some options can also be provided using environment variables.

A bmqa::Session object is created in an unconnected state. The start or startAsync method must be called to connect to the broker. Note that the start method is blocking, and returns either after the connection to the broker has been established (success), or after the specified timeout has expired (failure). The startAsync method, as the name suggests, connects to the broker asynchronously (i.e., it returns immediately), and the result of the operation is reported via a bmqt::SessionEventType::e_CONNECTED session event.

When the bmqa::Session is no longer needed, the application should call the stop (blocking) or stopAsync (non-blocking) method to shut down the bmqa::Session and disconnect from the broker. Note that destroying a Session automatically stops it. The session can be restarted with a call to start or startAsync once it has been fully stopped.

Connection loss and reconnection

If the connection between the application and the broker is lost, the bmqa::Session will automatically try to reconnect periodically. The bmqa::Session will also notify the application of the connection loss via a bmqt::SessionEventType::e_CONNECTION_LOST session event.

Once the connection has been re-established with the broker (as a result of one of the periodic reconnection attempts), the bmqa::Session will notify the application via a bmqt::SessionEventType::e_RECONNECTED session event. After the connection is re-established, the bmqa::Session will attempt to reopen the queues that were in OPEN state prior to the connection loss. The bmqa::Session will notify the application of the result of each reopen operation via a bmqt::SessionEventType::e_QUEUE_REOPEN_RESULT event for each queue. Note that a reopen operation on a queue may fail (due to a broker issue, a machine issue, etc.), so the application must keep track of these session events, and stop posting on a queue that failed to reopen.

After all reopen operations are complete and the application has been notified with all bmqt::SessionEventType::e_QUEUE_REOPEN_RESULT events, the bmqa::Session delivers a bmqt::SessionEventType::e_STATE_RESTORED session event to the application.
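
The event sequence described above implies some bookkeeping on the application side: posting should pause when the connection is lost, resume only after the state is restored, and stay off for any queue whose reopen failed. The following is a minimal sketch of that bookkeeping using only the C++ standard library. It does not use libbmq: the ReconnectEvent enumeration and the PostingGate class are hypothetical names that mirror the session event types named above, and a real handler would take the queue and the reopen result from the bmqa::SessionEvent it receives.

```cpp
#include <set>
#include <string>

// Hypothetical stand-ins for the session event types named above; a real
// application would switch on 'bmqt::SessionEventType::Enum' instead.
enum class ReconnectEvent {
    e_CONNECTION_LOST,
    e_RECONNECTED,
    e_QUEUE_REOPEN_RESULT,
    e_STATE_RESTORED
};

// Tracks, per queue URI, whether the application may currently post.
class PostingGate {
    bool                  d_stateRestored = true;  // false while reconnecting
    std::set<std::string> d_failedQueues;          // queues that failed to reopen

  public:
    // Record 'event'.  For 'e_QUEUE_REOPEN_RESULT', 'queueUri' and
    // 'reopenSucceeded' describe the queue the result applies to; both are
    // ignored for the other event types.
    void onEvent(ReconnectEvent     event,
                 const std::string& queueUri        = "",
                 bool               reopenSucceeded = true)
    {
        switch (event) {
          case ReconnectEvent::e_CONNECTION_LOST:
            d_stateRestored = false;  // stop posting on every queue
            break;
          case ReconnectEvent::e_RECONNECTED:
            // Connected again, but the queues have not been reopened yet.
            break;
          case ReconnectEvent::e_QUEUE_REOPEN_RESULT:
            if (!reopenSucceeded) {
                d_failedQueues.insert(queueUri);
            }
            break;
          case ReconnectEvent::e_STATE_RESTORED:
            d_stateRestored = true;   // all reopen results were delivered
            break;
        }
    }

    // Return 'true' if the application may post to 'queueUri'.
    bool canPost(const std::string& queueUri) const
    {
        return d_stateRestored && d_failedQueues.count(queueUri) == 0;
    }
};
```

In this sketch a queue that failed to reopen stays blocked until the application decides what to do with it (for example, opening it again); that policy is an assumption of the example, not something the session prescribes.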

Example 1

The following example illustrates how to create a bmqa::Session in synchronous mode, start it, and stop it.

void runSession()
{
    bmqt::SessionOptions options;
    options.setBrokerUri("tcp://localhost:30114");
    bmqa::Session session(options);
    int res = session.start();
    if (0 != res) {
        bsl::cout << "Failed to start session (" << res << ")"
                  << bsl::endl;
        return;
    }
    bsl::cout << "Session started." << bsl::endl;
    // Open queue in READ or WRITE or READ/WRITE mode, and receive or
    // post messages, etc.
    // ...
    session.stop();
}

This example can be simplified because the constructor for bmqa::Session uses a default bmqt::SessionOptions object that will connect to the local broker service. The example may be rewritten as follows:

void runSession()
{
    bmqa::Session session; // using default 'SessionOptions'
    int res = session.start();
    if (0 != res) {
        bsl::cout << "Failed to start session (" << res << ")"
                  << bsl::endl;
        return;
    }
    bsl::cout << "Session started." << bsl::endl;
    // Open queue in READ or WRITE or READ/WRITE mode, and receive or
    // post messages, etc.
    // ...
    session.stop();
}

Processing session events - synchronous mode

If the bmqa::Session is created in synchronous mode, the application needs to call the nextEvent method on a regular basis in order to receive events. This method takes an optional wait timeout as a parameter. It returns the next available bmqa::Event from the session's internal event queue, or blocks the calling thread until a new bmqa::Event arrives or the specified timeout expires. It is safe to call the nextEvent method from different threads simultaneously: the bmqa::Session class provides proper synchronization logic to protect the internal event queue from corruption in this scenario.
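
To make the blocking behavior described above concrete, here is a minimal sketch of a thread-safe queue whose pop operation either returns the next item or gives up once a timeout expires. It uses only the C++17 standard library and illustrates the semantics, not the session's actual implementation; TimedQueue and its method names are hypothetical, and the way this sketch reports a timeout (an empty optional) is a choice of the example rather than a description of what nextEvent returns.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// A queue that several threads may push to and pop from concurrently.
template <class T>
class TimedQueue {
    std::mutex              d_mutex;     // protects 'd_items'
    std::condition_variable d_notEmpty;  // signaled after each push
    std::deque<T>           d_items;

  public:
    // Append 'item' and wake up one waiting consumer, if any.
    void push(T item)
    {
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            d_items.push_back(std::move(item));
        }
        d_notEmpty.notify_one();
    }

    // Return the oldest item, blocking for up to 'timeout' if the queue is
    // empty.  Return an empty optional if nothing arrived in time.
    std::optional<T> popFront(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        if (!d_notEmpty.wait_for(lock, timeout,
                                 [this] { return !d_items.empty(); })) {
            return std::nullopt;  // timed out with nothing to return
        }
        T item = std::move(d_items.front());
        d_items.pop_front();
        return item;
    }
};
```

Because the mutex guards every access to the underlying container, calling popFront from several threads at once is safe in the same sense as described for nextEvent: each item is handed to exactly one caller.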

Example 2

The following example demonstrates how to write a function that queries and processes events synchronously. In this example, a switch statement checks the type of the bmqa::Event and performs the necessary actions.

We first define two functions to process bmqa::SessionEvent and bmqa::MessageEvent. These functions return true if we should keep processing events and false otherwise (i.e., no more events are expected from the bmqa::Session).

bool processSessionEvent(const bmqa::SessionEvent& event)
{
    bool result = true;
    switch (event.type()) {
      case bmqt::SessionEventType::e_CONNECTED:
        // The connection to the broker is established (as a result
        // of a call to the 'start' method).
        openQueues();
        startPostingToQueues();
        break;
      case bmqt::SessionEventType::e_DISCONNECTED:
        // The connection to the broker is terminated (as a result
        // of a call to the 'stop' method).
        result = false;
        break;
      case bmqt::SessionEventType::e_CONNECTION_LOST:
        // The connection to the broker dropped. Stop posting to the queue.
        stopPostingToQueues();
        break;
      case bmqt::SessionEventType::e_STATE_RESTORED:
        // The connection to the broker has been restored (i.e., all queues
        // have been re-opened). Resume posting to the queue.
        resumePostingToQueues();
        break;
      case bmqt::SessionEventType::e_CONNECTION_TIMEOUT:
        // The connection to the broker has timed out.
        result = false;
        break;
      case bmqt::SessionEventType::e_ERROR:
        // Internal error
        bsl::cout << "Unexpected session error: "
                  << event.errorDescription() << bsl::endl;
        break;
      default:
        // Other session event types are ignored in this example.
        break;
    } // end switch
    return result;
}

bool processMessageEvent(const bmqa::MessageEvent& event)
{
    bool result = true;
    switch (event.type()) {
      case bmqt::MessageEventType::e_PUSH: {
        // Received a 'PUSH' event from the broker.
        bmqa::MessageIterator msgIter = event.messageIterator();
        while (msgIter.nextMessage()) {
            const bmqa::Message& msg = msgIter.message();
            // Process 'PUSH' msg here (omitted for brevity)
            // ...
        }
      } break;
      case bmqt::MessageEventType::e_ACK: {
        // Received an 'ACK' event from the broker.
        bmqa::MessageIterator msgIter = event.messageIterator();
        while (msgIter.nextMessage()) {
            const bmqa::Message& msg = msgIter.message();
            // Process 'ACK' msg here (omitted for brevity)
            // ...
        }
      } break;
      default:
        // Other message event types are ignored in this example.
        break;
    } // end switch
    return result;
}

Next, we define a function that handles events synchronously using the processSessionEvent and processMessageEvent functions.

void handleEventsSynchronously(bmqa::Session *startedSession)
{
    bool more = true;
    while (more) {
        bmqa::Event event =
            startedSession->nextEvent(bsls::TimeInterval(2.0));
        if (event.isSessionEvent()) {
            more = processSessionEvent(event.sessionEvent());
        }
        else {
            more = processMessageEvent(event.messageEvent());
        }
    }
}

Processing session events - asynchronous mode

If the application wishes to use bmqa::Session in asynchronous mode, it must pass a managed pointer to an event handler implementing the bmqa::SessionEventHandler interface. In this case, when the bmqa::Session is started, a thread pool owned by the bmqa::Session is also started for processing events asynchronously. The bmqa::Session will call the event handler's onSessionEvent or onMessageEvent method every time a session event or a message event is available.

Note that after the bmqa::Session is associated with some event handler, this association cannot be changed or canceled. The event handler will be used for processing events until the bmqa::Session object is destroyed.

Example 3

The following example demonstrates how to implement an event handler and how to make the bmqa::Session use an instance of this event handler for processing events.

First, we define a concrete implementation of bmqa::SessionEventHandler.

class MyHandler: public bmqa::SessionEventHandler {
  public:
    MyHandler() { }
    virtual ~MyHandler() { }
    virtual void onSessionEvent(const bmqa::SessionEvent& event);
    virtual void onMessageEvent(const bmqa::MessageEvent& event);
};

void MyHandler::onSessionEvent(const bmqa::SessionEvent& event)
{
    // The implementation is similar to our 'processSessionEvent' function
    // defined in the previous example.
    processSessionEvent(event);
}

void MyHandler::onMessageEvent(const bmqa::MessageEvent& event)
{
    // The implementation is similar to our 'processMessageEvent' function
    // defined in the previous example.
    processMessageEvent(event);
}

Next, we define a function that creates a bmqa::Session using our implementation of bmqa::SessionEventHandler.

void runAsyncSession()
{
    bslma::ManagedPtr<bmqa::SessionEventHandler> handlerMp(new MyHandler());
    bmqa::Session session(handlerMp); // using default 'SessionOptions'
    int res = session.start();
    if (0 != res) {
        bsl::cout << "Failed to start session (" << res << ")"
                  << bsl::endl;
        return;
    }
    // ...
    session.stop();
}

Opening queues

Once the bmqa::Session has been created and started, the application can use it to open queues for producing and/or consuming messages. A queue is associated with a domain. Domain metadata must be deployed in the BlazingMQ infrastructure prior to opening queues under that domain, because opening a queue actually loads the metadata deployed for the associated domain.

The metadata associated with a domain defines various parameters such as maximum queue size and capacity, persistence policy, routing policy, etc.

Queues are identified by URIs (Uniform Resource Identifiers) that must follow the BlazingMQ syntax and are manipulated as bmqt::Uri objects. A queue URI is typically formatted as follows:

bmq://my.domain/my.queue

Note that domain names are unique in BlazingMQ infrastructure, which makes a fully qualified queue URI unique too.
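
To show how the two parts of such a URI relate, the sketch below splits a string of the form shown above into its domain and queue name. It is a hand-rolled illustration using only the C++17 standard library; it is not the bmqt::Uri parser, it handles only the simple shape shown above, and splitQueueUri is a hypothetical helper name.

```cpp
#include <optional>
#include <string>
#include <utility>

// Split 'uri' of the form "bmq://<domain>/<queue>" into its domain and
// queue name.  Return an empty optional if 'uri' does not have that shape.
std::optional<std::pair<std::string, std::string>>
splitQueueUri(const std::string& uri)
{
    const std::string scheme = "bmq://";
    if (uri.compare(0, scheme.size(), scheme) != 0) {
        return std::nullopt;  // wrong or missing scheme
    }
    const std::string::size_type slash = uri.find('/', scheme.size());
    if (slash == std::string::npos) {
        return std::nullopt;  // no queue component
    }
    std::string domain = uri.substr(scheme.size(), slash - scheme.size());
    std::string queue  = uri.substr(slash + 1);
    if (domain.empty() || queue.empty()) {
        return std::nullopt;  // both components are required
    }
    return std::make_pair(std::move(domain), std::move(queue));
}
```

For the URI shown above, this yields the domain "my.domain" and the queue name "my.queue". Application code would normally construct a bmqt::Uri instead of parsing the string itself.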

Queues in the BlazingMQ infrastructure are created by applications on demand. The broker creates a queue when it receives an open-queue request from an application for a queue that does not currently exist.

An application can open a queue by calling the openQueue or openQueueAsync method on a started session. The application must pass appropriate flags to indicate whether it wants to post messages to the queue, consume messages from the queue, or both.

Note that openQueue is a blocking method, and returns after the specified queue has been successfully opened (success) or after the specified timeout has expired (failure). The openQueueAsync method, as the name suggests, is non-blocking, and the result of the operation is reported via a bmqt::SessionEventType::e_QUEUE_OPEN_RESULT session event.

Example 4

The following example demonstrates how to open a queue for posting messages. The code first opens the queue with appropriate flags, and then uses bmqa::MessageEventBuilder to build a message event and post to the queue.

// Session creation and startup logic elided for brevity
const char *queueUri = "bmq://my.domain/my.queue";
bmqa::QueueId myQueueId(1); // ID for the queue
int rc = session.openQueue(
    &myQueueId,
    queueUri,
    bmqt::QueueFlags::e_WRITE | bmqt::QueueFlags::e_ACK,
    bsls::TimeInterval(30, 0));
if (rc != 0) {
    bsl::cerr << "Failed to open queue, rc: "
              << bmqt::OpenQueueResult::Enum(rc)
              << bsl::endl;
    return;
}

Note that, apart from the WRITE flag, the ACK flag has also been passed to the openQueue method above. This indicates that the application is interested in receiving an ACK notification for each message it posts to the queue, irrespective of whether or not the message was successfully received by the broker and posted to the queue.

Once the queue has been successfully opened for writing, messages can be posted to the queue for consumption by interested applications. We will use bmqa::MessageEventBuilder to build a message event.

// Create a message event builder
bmqa::MessageEventBuilder builder;
session.loadMessageEventBuilder(&builder);

// Create and post a message event containing 1 message
bmqa::Message& msg = builder.startMessage();
msg.setCorrelationId(myCorrelationId);
msg.setDataRef(&myPayload); // where 'myPayload' is of type 'bdlbb::Blob'
rc = builder.packMessage(myQueueId);
if (rc != 0) {
    bsl::cerr << "Failed to pack message, rc: "
              << bmqt::EventBuilderResult::Enum(rc)
              << bsl::endl;
    return;
}

// Post message event
rc = session.post(builder.messageEvent());
if (rc != 0) {
    bsl::cerr << "Failed to post message event to the queue, rc: "
              << bmqt::PostResult::Enum(rc)
              << bsl::endl;
    return;
}
// ... post more messages
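
Because the queue in Example 4 is opened with the ACK flag and each message carries a correlation id, an application will typically want to remember what it has posted until the matching ACK arrives. The sketch below shows one possible way to do that with only the C++ standard library. It does not use libbmq: PendingAcks is a hypothetical class, the integer key stands in for whatever correlation id the application sets on the message, and the string value stands in for whatever the application needs in order to log or retry the message.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

// Remembers posted messages until their ACK arrives.
class PendingAcks {
    // Correlation id -> application-defined description of the message.
    std::unordered_map<std::uint64_t, std::string> d_pending;

  public:
    // Record that the message identified by 'correlationId' was posted.
    void onPosted(std::uint64_t correlationId, std::string description)
    {
        d_pending[correlationId] = std::move(description);
    }

    // Forget the message identified by 'correlationId'.  Return 'true' if
    // it was pending, and 'false' if the id is unknown (for example, a
    // duplicate ACK).
    bool onAck(std::uint64_t correlationId)
    {
        return d_pending.erase(correlationId) == 1;
    }

    // Return the number of posted messages still waiting for an ACK.
    std::size_t outstanding() const { return d_pending.size(); }
};
```

In a real application, onPosted would be called after a successful post and onAck while iterating over the messages of an ACK event, as in the processMessageEvent function of Example 2.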

Closing queues

Once an application no longer needs to produce messages to or consume messages from a queue, it can close the queue with the closeQueue or closeQueueAsync method. Note that closing a queue closes the application's "view" of the queue, and may not lead to queue deletion in the broker. A bmqa::Session does not expose any method to explicitly delete a queue.

Note that closeQueue is a blocking method and returns after the specified queue has been successfully closed (success) or after the specified timeout has expired (failure). closeQueueAsync, as the name suggests, is a non-blocking method, and the result of the operation is reported via a bmqt::SessionEventType::e_QUEUE_CLOSE_RESULT session event.

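There are three flavors of the open, configure, and close queue operations. They behave differently with regard to thread blocking and callback execution, depending on whether the session uses an event handler or nextEvent: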

openQueue, configureQueue, closeQueue (deprecated Sync)
    event handler: unblocks in internal thread
    nextEvent:     unblocks in internal thread

openQueueSync, configureQueueSync, closeQueueSync (Synchronous)
    event handler: unblocks in event handler thread (*)
    nextEvent:     unblocks in internal thread

openQueueAsync, configureQueueAsync, closeQueueAsync (Asynchronous)
    event handler: executes callback in event handler thread
    nextEvent:     executes callback in nextEvent thread

(*) - guarantees unblocking after all previously enqueued events have been emitted to the eventHandler, allowing the user to have proper serialization of events for the given queue (for example no more PUSH messages will be delivered through the eventHandler for the queue after configureQueueSync(maxUnconfirmed = 0) returns).