Provide access to the BlazingMQ broker.
Detailed Description
- Purpose:
- Provide access to the BlazingMQ broker.
-
- Description:
- This component provides a mechanism, bmqa::Session, that provides access to a message queue broker, and an interface, bmqa::SessionEventHandler, for asynchronous notification of events. The broker manages named, persistent queues of messages and allows a client to open queues, post messages to them, or retrieve messages from them. All of these operations take place within the context of the session opened by the client application.
- Messages received from a broker are communicated to the application by the session associated with that broker in the form of events (see
bmqa_event
). Events can be of two different types: (1) Messages and message status events (bmqa::MessageEvent
), or (2) Session or queue status events (bmqa::SessionEvent
).
- A
Session
can dispatch events to the application in either a synchronous or asynchronous mode. In synchronous mode, the application must call the nextEvent
method in order to obtain events from the Session
. In asynchronous mode, the application must supply a concrete SessionEventHandler
object at construction time. The concrete SessionEventHandler
provided by the application must implement the onSessionEvent
and onMessageEvent
methods, which will be called by the Session
every time a session event or a message event is received. Note that by default, a session created in asynchronous mode creates only one internal thread to dispatch events, but a different number of threads can be specified in bmqt::SessionOptions.
- A Session is created in either synchronous or asynchronous mode, and it remains in that mode until destruction. Allowing a mix of synchronous and asynchronous modes would complicate the SDK. The only exceptions are the "start" and "open" operations, which are available in both synchronous and asynchronous versions for the convenience of the programmer.
- By default a
Session
connects to the local broker, which in turn may connect to a remote cluster based on configuration.
- After a
bmqa::Session
is started, the application has to open one or several queues in read and/or write mode.
-
- Disclaimer:
- A Session object is a heavy object representing the negotiated TCP session with the broker, and the entire associated state (opened queues, statistics, ...). Therefore, sessions should always be reused if possible, preferably with only one session per lifetime of a component/library/task. Note that at the time of this writing, multiplexing of different logical sessions over the same physical connection is not supported, so in certain circumstances reusing the same session across the whole of a single application is not possible. For example, if an application uses two unrelated libraries that both use BlazingMQ under the hood, they cannot share a session as it stands. An example of extreme inefficiency and abuse of resources is creating a session ad hoc every time the same component needs to post a message.
-
- Thread-safety:
- This session object is thread enabled, meaning that two threads can safely call any methods on the same instance without external synchronization.
-
- Connecting to the Broker:
- A Session establishes communication with a broker service using TCP/IP. Each Session object must be constructed with a bmqt::SessionOptions object, which provides the necessary information to connect to the broker. In particular, the SessionOptions object must specify the IP address and port needed to connect to the broker. The SessionOptions object may also provide extra parameters for tuning the TCP connection behavior (see bmqt_sessionoptions for details).
- Note that in most cases the user does not need to explicitly construct a
SessionOptions
object: the default constructor for SessionOptions
creates an instance that will connect to the broker service on the local machine using the standard port.
- Some options can also be provided using environment variables.
- BMQ_BROKER_URI: Corresponds to SessionOptions::brokerUri. If this environment variable is set, its value will override the one specified in the SessionOptions.
- A Session object is created in an unconnected state. The start or startAsync method must be called to connect to the broker. Note that the start method is blocking, and returns either after the connection to the broker has been established (success), or after the specified timeout has expired (failure). The startAsync method, as the name suggests, connects to the broker asynchronously (i.e., it returns immediately), and the result of the operation is reported via a bmqt::SessionEventType::CONNECTED session event.
- When the
Session
is no longer needed, the application should call the stop
(blocking) or stopAsync
(non-blocking) method to shut down the Session
and disconnect from the broker. Note that destroying a Session automatically stops it. The session can be restarted with a call to start
or startAsync
once it has been fully stopped.
-
- Connection loss and reconnection:
- If the connection between the application and the broker is lost, the
Session
will automatically try to reconnect periodically. The Session will also notify the application of the connection loss via a bmqt::SessionEventType::CONNECTION_LOST session event.
- Once the connection has been re-established with the broker (as a result of one of the periodic reconnection attempts), the
Session
will notify the application via bmqt::SessionEventType::RECONNECTED
session event. After the connection re-establishment, the Session
will attempt to reopen the queues that were in OPEN
state prior to connection loss. The Session
will notify the application of the result of reopen operation via bmqt::SessionEventType::QUEUE_REOPEN_RESULT
for each queue. Note that a reopen operation on a queue may fail (due to a broker issue, a machine issue, etc.), so the application must keep track of these session events and stop posting to a queue that failed to reopen.
- After all reopen operations are complete and the application has been notified with all
bmqt::SessionEventType::QUEUE_REOPEN_RESULT
events, the Session
delivers a bmqt::SessionEventType::STATE_RESTORED
session event to the application.
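- The reopen sequence described above can be tracked with a small state holder. The following is a minimal sketch that does not use the BlazingMQ SDK: the EventType enum and the ReopenTracker class are hypothetical stand-ins that only mirror the session event names used in this section, and queues are identified by plain URI strings. Treating every queue as unusable while the connection is down is a conservative policy chosen for this sketch, not documented SDK behavior.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical stand-in for the session event types named in the text.
// This is NOT the SDK's 'bmqt::SessionEventType' enumeration.
enum class EventType {
    CONNECTION_LOST,
    RECONNECTED,
    QUEUE_REOPEN_RESULT,
    STATE_RESTORED
};

// Hypothetical helper that tracks which queues are safe to post to.
class ReopenTracker {
    std::set<std::string> d_openQueues;    // queues opened by the application
    std::set<std::string> d_usableQueues;  // queues safe to post to right now
    bool                  d_connected;

  public:
    ReopenTracker() : d_connected(true) {}

    // Record that the queue 'uri' was opened successfully.
    void queueOpened(const std::string& uri)
    {
        d_openQueues.insert(uri);
        d_usableQueues.insert(uri);
    }

    // Process one session event.  'uri' and 'success' are only meaningful
    // for 'QUEUE_REOPEN_RESULT'.
    void onEvent(EventType          type,
                 const std::string& uri     = "",
                 bool               success = false)
    {
        switch (type) {
          case EventType::CONNECTION_LOST:
            d_connected = false;
            d_usableQueues.clear();  // sketch policy: nothing usable offline
            break;
          case EventType::RECONNECTED:
            d_connected = true;      // queues stay unusable until reopened
            break;
          case EventType::QUEUE_REOPEN_RESULT:
            if (success && d_openQueues.count(uri) > 0) {
                d_usableQueues.insert(uri);
            }
            break;
          case EventType::STATE_RESTORED:
            break;                   // all reopen results have been seen
        }
    }

    // Return 'true' if the application may post to the queue 'uri'.
    bool canPost(const std::string& uri) const
    {
        return d_connected && d_usableQueues.count(uri) > 0;
    }
};
```

In a real application, the handler for session events would feed such a tracker, and the posting code would consult canPost before building a message for a given queue.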
-
- Example 1:
- The following example illustrates how to create a Session in synchronous mode, start it, and stop it.
    void runSession()
    {
        // Explicitly point the session at the broker on the local machine.
        bmqt::SessionOptions options;
        options.setBrokerUri("tcp://localhost:30114");

        bmqa::Session session(options);

        // 'start' blocks until the session is connected or the attempt fails.
        int res = session.start();
        if (0 != res) {
            bsl::cout << "Failed to start session (" << res << ")"
                      << bsl::endl;
            return;
        }
        bsl::cout << "Session started." << bsl::endl;

        session.stop();
    }
This example can be simplified because the constructor for Session uses a default SessionOptions object that will connect to the local broker service. The example may be rewritten as follows:
    void runSession()
    {
        // Default options connect to the broker on the local machine.
        bmqa::Session session;

        int res = session.start();
        if (0 != res) {
            bsl::cout << "Failed to start session (" << res << ")"
                      << bsl::endl;
            return;
        }
        bsl::cout << "Session started." << bsl::endl;

        session.stop();
    }
-
- Processing session events - synchronous mode:
- If the Session is created in synchronous mode, the application needs to call the nextEvent method on a regular basis in order to receive events. This method takes an optional wait timeout as a parameter. It returns the next available Event from the session's internal event queue, or blocks the calling thread until a new Event arrives or the specified timeout expires. It is safe to call the nextEvent method from different threads simultaneously: the Session class provides proper synchronization logic to protect the internal event queue from corruption in this scenario.
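- The blocking behavior described above can be illustrated with a small thread-safe queue. This is a minimal model written with the C++ standard library only. SketchEvent and SketchEventQueue are hypothetical names, and the model says nothing about how the Session is actually implemented. It only shows the "return the next event, or block until one arrives or the timeout expires" contract, with a mutex protecting the queue so that several threads may call nextEvent concurrently.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical stand-in for an event.  'd_isTimeout == true' models the
// case where the wait expired before any event became available.
struct SketchEvent {
    bool        d_isTimeout;
    std::string d_payload;
};

// Hypothetical model of a session's internal event queue.
class SketchEventQueue {
    std::mutex              d_mutex;   // protects 'd_events'
    std::condition_variable d_cond;    // signaled when an event is pushed
    std::deque<SketchEvent> d_events;

  public:
    // Enqueue an event carrying 'payload' and wake up one waiting thread.
    void push(const std::string& payload)
    {
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            d_events.push_back(SketchEvent{false, payload});
        }
        d_cond.notify_one();
    }

    // Return the next event, blocking for at most 'timeout'.  If no event
    // arrives in time, return an event with 'd_isTimeout' set.
    SketchEvent nextEvent(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(d_mutex);
        const bool ready = d_cond.wait_for(
            lock, timeout, [this] { return !d_events.empty(); });
        if (!ready) {
            return SketchEvent{true, ""};
        }
        SketchEvent event = d_events.front();
        d_events.pop_front();
        return event;
    }
};
```

Because the predicate is re-checked under the lock, spurious wakeups are harmless, and two consumer threads can never pop the same event.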
-
- Example 2:
- The following example demonstrates how to write a function that queries and processes events synchronously. In this example, the code checks the type of each Event and performs the necessary actions.
- The example relies on two functions, processSessionEvent and processMessageEvent, that process a SessionEvent and a MessageEvent respectively. These functions return true if we should keep processing events and false otherwise (i.e., no more events are expected from the Session).
- Next, we define a function that handles events synchronously using the processSessionEvent and processMessageEvent functions.
    void handleEventsSynchronously(bmqa::Session *startedSession)
    {
        bool more = true;
        while (more) {
            // Wait up to two seconds for the next event.
            bmqa::Event event =
                        startedSession->nextEvent(bsls::TimeInterval(2.0));

            // Dispatch on the kind of event received.
            if (event.isSessionEvent()) {
                more = processSessionEvent(event.sessionEvent());
            }
            else {
                more = processMessageEvent(event.messageEvent());
            }
        }
    }
-
- Processing session events - asynchronous mode:
- If the application wishes to use a Session in asynchronous mode, it must pass a managed pointer to an event handler implementing the SessionEventHandler interface. In this case, when the Session is started, a thread pool owned by the Session is also started for processing events asynchronously. The Session will call the event handler's onSessionEvent or onMessageEvent method every time a session event or a message event is available.
- Note that after the
Session
is associated with some event handler, this association cannot be changed or canceled. The event handler will be used for processing events until the Session
object is destroyed.
-
- Example 3:
- The following example demonstrates how to implement an event handler and how to make the
Session
use an instance of this event handler for processing events.
- First, we define a concrete implementation of
SessionEventHandler
.
    class MyHandler: public bmqa::SessionEventHandler {
      public:
        MyHandler() { }
        virtual ~MyHandler() { }
        virtual void onSessionEvent(const bmqa::SessionEvent& event);
        virtual void onMessageEvent(const bmqa::MessageEvent& event);
    };

    void MyHandler::onSessionEvent(const bmqa::SessionEvent& event)
    {
        processSessionEvent(event);
    }

    void MyHandler::onMessageEvent(const bmqa::MessageEvent& event)
    {
        processMessageEvent(event);
    }
Next, we define a function that creates a Session using our implementation of SessionEventHandler.
    void runAsyncSession()
    {
        // The session takes ownership of the handler via the managed pointer.
        bslma::ManagedPtr<bmqa::SessionEventHandler> handlerMp(
                                                            new MyHandler());
        bmqa::Session session(handlerMp);

        int res = session.start();
        if (0 != res) {
            bsl::cout << "Failed to start session (" << res << ")"
                      << bsl::endl;
            return;
        }

        session.stop();
    }
-
- Opening queues:
- Once the
Session
has been created and started, the application can use it to open queues for producing and/or consuming messages. A queue is associated with a domain. Domain metadata must be deployed in the BlazingMQ infrastructure prior to opening queues under that domain, because opening a queue actually loads the metadata deployed for the associated domain.
- The metadata associated with a domain defines various parameters like maximum queue size and capacity, persistent policy, routing policy, etc.
- Queues are identified by URIs (Uniform Resource Identifiers) that must follow the BlazingMQ syntax, manipulated as bmqt::Uri objects. A queue URI is typically formatted as bmq://my.domain/my.queue. Note that domain names are unique in the BlazingMQ infrastructure, which makes a fully qualified queue URI unique too.
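- To make the structure of a queue URI concrete, the following sketch splits a URI into its domain and queue name. It assumes the simple bmq://<domain>/<queue> shape and ignores any optional components. QueueUriParts and splitQueueUri are hypothetical helpers written for illustration and are not part of the SDK. Real code should rely on bmqt::Uri instead.

```cpp
#include <cassert>
#include <string>

// Hypothetical result type, not part of the BlazingMQ SDK.
struct QueueUriParts {
    std::string d_domain;
    std::string d_queue;
};

// Split 'uri', assumed to have the form 'bmq://<domain>/<queue>', into the
// specified 'result'.  Return 'true' on success and 'false' (leaving
// 'result' unchanged) if 'uri' does not have that form.
bool splitQueueUri(QueueUriParts *result, const std::string& uri)
{
    const std::string scheme = "bmq://";
    if (uri.compare(0, scheme.size(), scheme) != 0) {
        return false;                   // wrong or missing scheme
    }

    const std::string::size_type slash = uri.find('/', scheme.size());
    if (slash == std::string::npos      // no domain/queue separator
        || slash == scheme.size()       // empty domain
        || slash + 1 == uri.size()) {   // empty queue name
        return false;
    }

    result->d_domain = uri.substr(scheme.size(), slash - scheme.size());
    result->d_queue  = uri.substr(slash + 1);
    return true;
}
```

For example, calling splitQueueUri on "bmq://my.domain/my.queue" yields the domain "my.domain" and the queue name "my.queue", while a broker address such as "tcp://localhost:30114" is rejected.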
- Queues in the BlazingMQ infrastructure are created by applications on demand. The broker creates a queue when it receives an open-queue request from an application for a queue that does not currently exist.
- An application can open a queue by calling the openQueue or openQueueAsync method on a started session. The application must pass appropriate flags to indicate whether it wants to post messages to the queue, consume messages from the queue, or both.
- Note that openQueue is a blocking method, and returns after the specified queue has been successfully opened (success) or after the specified timeout has expired (failure). The openQueueAsync method, as the name suggests, is non-blocking, and the result of the operation is reported via a bmqt::SessionEventType::QUEUE_OPEN_RESULT session event.
-
- Example 4:
- The following example demonstrates how to open a queue for posting messages. The code first opens the queue with the appropriate flags, and then uses bmqa::MessageEventBuilder to build a message event and post it to the queue. Note that in addition to the WRITE flag, the ACK flag is passed to the openQueue method. This indicates that the application is interested in receiving an ACK notification for each message it posts to the queue, irrespective of whether or not the message was successfully received by the broker and posted to the queue.
- Once the queue has been successfully opened for writing, messages can be posted to the queue for consumption by interested applications. We will use
bmqa::MessageEventBuilder
to build a message event.
-
- Closing queues:
- When an application no longer needs to produce or consume messages from a queue, it can close the queue by calling the closeQueue or closeQueueAsync method. Note that closing a queue closes the application's "view" of the queue, and may not lead to queue deletion in the broker. A Session does not expose any method to explicitly delete a queue.
- Note that closeQueue is a blocking method and returns after the specified queue has been successfully closed (success) or after the specified timeout has expired (failure). closeQueueAsync, as the name suggests, is a non-blocking method, and the result of the operation is reported via a bmqt::SessionEventType::e_QUEUE_CLOSE_RESULT session event.
- There are 3 flavors which behave differently with regard to thread blocking and callback execution:
               | openQueue         | openQueueSync        | openQueueAsync
               | configureQueue    | configureQueueSync   | configureQueueAsync
               | closeQueue        | closeQueueSync       | closeQueueAsync
               | (deprecated Sync) | (Synchronous)        | (Asynchronous)
    -----------|-------------------|----------------------|----------------------
     event     | unblocks in       | unblocks in event    | executes callback in
     handler   | internal thread   | handler thread (*)   | event handler thread
               |                   |                      |
     nextEvent | unblocks in       | unblocks in          | executes callback
               | internal thread   | internal thread      | in nextEvent thread
    -----------------------------------------------------------------------------
- (*) - guarantees unblocking after all previously enqueued events have been emitted to the eventHandler, allowing the user to have proper serialization of events for the given queue (for example no more PUSH messages will be delivered through the eventHandler for the queue after configureQueueSync(maxUnconfirmed = 0) returns).