libbmq
e19ff338c707b114e9f84d83ea866a97518afb37
Provide access to the BlazingMQ broker.
#include <bmqa_abstractsession.h>
#include <bmqa_closequeuestatus.h>
#include <bmqa_configurequeuestatus.h>
#include <bmqa_confirmeventbuilder.h>
#include <bmqa_messageeventbuilder.h>
#include <bmqa_openqueuestatus.h>
#include <bmqt_queueoptions.h>
#include <bmqt_sessionoptions.h>
#include <bmqt_uri.h>
#include <ball_log.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_keyword.h>
#include <bsls_timeinterval.h>
#include <bsls_types.h>
Classes
class BloombergLP::bmqa::SessionEventHandler
class BloombergLP::bmqa::Session: A session with a BlazingMQ broker.
Namespaces
BloombergLP
BloombergLP::bmqa
This component provides a mechanism, bmqa::Session, that provides access to a message queue broker, and an interface, bmqa::SessionEventHandler, for asynchronous notification of events. The broker manages named, persistent queues of messages. It allows a client to open queues, post messages to them, or retrieve messages from them. All of these operations take place within the context of the session opened by the client application.
Messages received from a broker are communicated to the application by the session associated with that broker in the form of events (see bmqa_event.h). Events can be of two different types: (1) Messages and message status events (bmqa::MessageEvent), or (2) Session or queue status events (bmqa::SessionEvent).
A bmqa::Session can dispatch events to the application in either a synchronous or an asynchronous mode. In synchronous mode, the application must call the nextEvent method in order to obtain events from the bmqa::Session. In asynchronous mode, the application must supply a concrete bmqa::SessionEventHandler object at construction time. The concrete bmqa::SessionEventHandler provided by the application must implement the onSessionEvent and onMessageEvent methods, which will be called by the bmqa::Session every time a session event or a message event is received. Note that by default, a session created in asynchronous mode creates only one internal thread to dispatch events, but a different number of threads can be specified in bmqt::SessionOptions.
A bmqa::Session is created in either synchronous or asynchronous mode, and it remains in that mode until destruction. Allowing the two modes to be mixed would complicate the SDK. The only exceptions are the "start" and "open" operations, which are available in both synchronous and asynchronous versions for the convenience of the programmer.
By default a bmqa::Session connects to the local broker, which in turn may connect to a remote cluster based on configuration.
After a bmqa::Session is started, the application has to open one or several queues in read and/or write mode.
A bmqa::Session object is a heavy object representing the negotiated TCP session with the broker, and the entire associated state (opened queues, statistics, ...). Therefore, sessions should always be reused if possible, preferably with only one session per lifetime of a component/library/task. Note that at the time of this writing, multiplexing of different logical sessions over the same physical connection is not supported, so in certain circumstances reuse of the same session across the whole of a single application will not be possible. For example, if an application uses two unrelated libraries that both use BlazingMQ under the hood, they cannot currently share a session. An example of extreme inefficiency and abuse of resources is creating an ad-hoc session every time the same component needs to post a message.
This session object is thread enabled, meaning that two threads can safely call any methods on the same instance without external synchronization.
A bmqa::Session communicates with a broker service over TCP/IP. Each bmqa::Session object must be constructed with a bmqt::SessionOptions object, which provides the necessary information to connect to the broker. In particular, the bmqt::SessionOptions object must specify the IP address and port needed to connect to the broker. The bmqt::SessionOptions object may also provide extra parameters for tuning the TCP connection behavior (see bmqt_sessionoptions.h for details).
Note that in most cases the user does not need to explicitly construct a bmqt::SessionOptions object: the default constructor for bmqt::SessionOptions creates an instance that will connect to the broker service on the local machine using the standard port.
Some options can also be provided using environment variables.
A bmqa::Session object is created in an unconnected state. The start or startAsync method must be called to connect to the broker. Note that the start method is blocking, and returns either after the connection to the broker has been established (success) or after the specified timeout has expired (failure). The startAsync method, as the name suggests, connects to the broker asynchronously (i.e., it returns immediately), and the result of the operation is delivered via a bmqt::SessionEventType::e_CONNECTED session event.
When the bmqa::Session is no longer needed, the application should call the stop (blocking) or stopAsync (non-blocking) method to shut down the bmqa::Session and disconnect from the broker. Note that destroying a Session automatically stops it. The session can be restarted with a call to start or startAsync once it has been fully stopped.
If the connection between the application and the broker is lost, the bmqa::Session will automatically try to reconnect periodically. The bmqa::Session will also notify the application of the connection loss via a bmqt::SessionEventType::e_CONNECTION_LOST session event.
Once the connection has been re-established with the broker (as a result of one of the periodic reconnection attempts), the bmqa::Session will notify the application via a bmqt::SessionEventType::e_RECONNECTED session event. After the connection is re-established, the bmqa::Session will attempt to reopen the queues that were in the OPEN state prior to the connection loss. The bmqa::Session will notify the application of the result of each reopen operation via a bmqt::SessionEventType::e_QUEUE_REOPEN_RESULT event for each queue. Note that a reopen operation on a queue may fail (due to a broker issue, machine issue, etc.), so the application must keep track of these session events, and stop posting on a queue that failed to reopen.
After all reopen operations are complete and the application has been notified with all bmqt::SessionEventType::e_QUEUE_REOPEN_RESULT events, the bmqa::Session delivers a bmqt::SessionEventType::e_STATE_RESTORED session event to the application.
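The bookkeeping that the reconnection sequence asks of the application could be sketched roughly as follows. This is a minimal, SDK-independent illustration: the EventType enumeration is a hypothetical stand-in that only mirrors the enumerator names of bmqt::SessionEventType, and ReopenTracker is a hypothetical helper, not part of libbmq.

```cpp
#include <set>
#include <string>

// Hypothetical stand-in mirroring the session event types named above; the
// real enumerators live in 'bmqt::SessionEventType'.
enum class EventType {
    e_CONNECTION_LOST,
    e_RECONNECTED,
    e_QUEUE_REOPEN_RESULT,
    e_STATE_RESTORED
};

// Hypothetical helper that remembers which queues failed to reopen, so that
// the application can stop posting to them.
class ReopenTracker {
    std::set<std::string> d_failedQueues;     // URIs that failed to reopen
    bool                  d_restored = true;  // no restoration in progress

  public:
    // Record the specified event 'type'.  'uri' and 'success' are only
    // meaningful for 'e_QUEUE_REOPEN_RESULT'.
    void onEvent(EventType          type,
                 const std::string& uri     = std::string(),
                 bool               success = true)
    {
        switch (type) {
          case EventType::e_CONNECTION_LOST:
            d_restored = false;  // wait for 'e_STATE_RESTORED'
            break;
          case EventType::e_RECONNECTED:
            break;  // per-queue reopen results are expected next
          case EventType::e_QUEUE_REOPEN_RESULT:
            if (success) {
                d_failedQueues.erase(uri);
            }
            else {
                d_failedQueues.insert(uri);
            }
            break;
          case EventType::e_STATE_RESTORED:
            d_restored = true;  // all reopen results have been delivered
            break;
        }
    }

    // Return 'false' if the queue at 'uri' failed its last reopen attempt.
    bool mayPost(const std::string& uri) const
    {
        return d_failedQueues.count(uri) == 0;
    }

    bool stateRestored() const { return d_restored; }
};
```

In a real application the same decisions would be driven from the session events delivered by the bmqa::Session, and the queue would be identified by whatever handle the application already keeps for it rather than by a URI string.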
The following example illustrates how to create a bmqa::Session in synchronous mode, start it, and stop it.
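A minimal sketch of that start/stop lifecycle is given here. To keep it compilable without the SDK, the function is a template over the session type; it assumes only that the type offers start (returning 0 on success) and stop, as described for bmqa::Session. FakeSession is a hypothetical stand-in used to exercise the sketch without a broker.

```cpp
#include <iostream>

// Run one start/stop cycle on the specified 'session'.  'SESSION' is assumed
// to provide 'int start()' (0 on success) and 'void stop()', which is how
// 'bmqa::Session' is described in this component.
template <class SESSION>
bool runSession(SESSION& session)
{
    const int rc = session.start();  // blocks until connected or timed out
    if (rc != 0) {
        std::cerr << "Failed to start session (rc: " << rc << ")\n";
        return false;
    }

    // Open queues in read and/or write mode here, then post or consume
    // messages.

    session.stop();  // blocks until the session is shut down
    return true;
}

// Hypothetical stand-in, not an SDK type: lets the sketch run without a
// broker by returning a preset result from 'start'.
struct FakeSession {
    int d_startRc;    // value that 'start' will return
    int d_stopCalls;  // number of times 'stop' was called

    int  start() { return d_startRc; }
    void stop() { ++d_stopCalls; }
};
```

With libbmq available, the same function body would operate on a bmqa::Session constructed from a bmqt::SessionOptions object.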
This example can be simplified because the constructor for bmqa::Session uses a default bmqt::SessionOptions object that will connect to the local broker service. The example may be rewritten as follows:
If the bmqa::Session is created in synchronous mode, the application needs to call the nextEvent method on a regular basis in order to receive events. This method takes an optional wait timeout as a parameter. It returns the next available bmqa::Event from the session's internal event queue, or blocks the calling thread until a new bmqa::Event arrives or the specified timeout expires. It is safe to call the nextEvent method from different threads simultaneously: the bmqa::Session class provides proper synchronization logic to protect the internal event queue from corruption in this scenario.
The following example demonstrates how to write a function that queries and processes events synchronously. In this example, a switch statement checks the type of the bmqa::Event and performs the necessary actions.
We first define two functions to process bmqa::SessionEvent and bmqa::MessageEvent. These functions return true if we should keep processing events and false otherwise (i.e., no more events are expected from the bmqa::Session).
Next, we define a function that handles events synchronously using the processSessionEvent and processMessageEvent functions.
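The shape of such a polling loop might look like the sketch below. All types here are hypothetical stand-ins (Event, SessionEventType and FakeSession are not the bmqa classes), and the fake's nextEvent takes no timeout; the point is only the control flow: poll, dispatch on the kind of event, and stop when a handler returns false.

```cpp
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-ins for the SDK types; these are NOT the bmqa classes.
enum class SessionEventType { e_CONNECTED, e_DISCONNECTED };

struct Event {
    bool             d_isSessionEvent;
    SessionEventType d_type;     // meaningful for session events only
    std::string      d_payload;  // meaningful for message events only
};

class FakeSession {
    std::deque<Event> d_script;  // events handed out by 'nextEvent', in order

  public:
    explicit FakeSession(std::deque<Event> script)
    : d_script(std::move(script))
    {
    }

    // Return the next scripted event.  Once the script is exhausted, report
    // a disconnection so that the polling loop terminates.
    Event nextEvent()
    {
        if (d_script.empty()) {
            return Event{true, SessionEventType::e_DISCONNECTED, ""};
        }
        Event event = d_script.front();
        d_script.pop_front();
        return event;
    }
};

// Return 'true' to keep processing, 'false' once no more events are expected.
bool processSessionEvent(const Event& event)
{
    switch (event.d_type) {
      case SessionEventType::e_CONNECTED:
        return true;
      case SessionEventType::e_DISCONNECTED:
        return false;
    }
    return true;
}

// Count the message carried by 'event' and keep processing.
bool processMessageEvent(const Event& event, int *messageCount)
{
    (void)event;  // a real handler would inspect the payload here
    ++*messageCount;
    return true;
}

// Poll 'session' until a handler asks to stop; return the number of message
// events that were processed.
int handleEventsSynchronously(FakeSession& session)
{
    int  messageCount = 0;
    bool more         = true;
    while (more) {
        const Event event = session.nextEvent();
        more = event.d_isSessionEvent
                   ? processSessionEvent(event)
                   : processMessageEvent(event, &messageCount);
    }
    return messageCount;
}
```

Against the real SDK, the loop would call nextEvent on a started bmqa::Session with a wait timeout and dispatch on the bmqa::Event it returns.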
If the application wishes to use bmqa::Session in asynchronous mode, it must pass a managed pointer to an event handler implementing the bmqa::SessionEventHandler interface. In this case, when the bmqa::Session is started, a thread pool owned by the bmqa::Session is also started for processing events asynchronously. The bmqa::Session will call the event handler's onSessionEvent or onMessageEvent method every time a session event or a message event is available.
Note that after the bmqa::Session is associated with some event handler, this association cannot be changed or canceled. The event handler will be used for processing events until the bmqa::Session object is destroyed.
The following example demonstrates how to implement an event handler and how to make the bmqa::Session use an instance of this event handler for processing events.
First, we define a concrete implementation of bmqa::SessionEventHandler.
Next, we define a function that creates a bmqa::Session using our implementation of bmqa::SessionEventHandler.
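The handler pattern can be sketched without the SDK as shown below. Every type is a hypothetical stand-in: SessionEventHandler here only mirrors the two callbacks of bmqa::SessionEventHandler, std::unique_ptr stands in for the managed pointer, and FakeAsyncSession delivers events on the caller's thread instead of a thread pool. The mutex reflects the assumption that a session configured with more than one dispatch thread may invoke the handler concurrently.

```cpp
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-ins for the SDK event types.
struct SessionEvent { int d_type; };
struct MessageEvent { int d_messageCount; };

// Hypothetical interface mirroring the two callbacks described above.
class SessionEventHandler {
  public:
    virtual ~SessionEventHandler() = default;
    virtual void onSessionEvent(const SessionEvent& event) = 0;
    virtual void onMessageEvent(const MessageEvent& event) = 0;
};

// State shared with the application, so that results remain observable after
// ownership of the handler has been transferred to the session.
struct Counters {
    std::mutex d_mutex;
    int        d_sessionEvents = 0;
    int        d_messages      = 0;
};

// Concrete handler that counts what it receives.
class CountingHandler : public SessionEventHandler {
    std::shared_ptr<Counters> d_counters;

  public:
    explicit CountingHandler(std::shared_ptr<Counters> counters)
    : d_counters(std::move(counters))
    {
    }

    void onSessionEvent(const SessionEvent&) override
    {
        std::lock_guard<std::mutex> guard(d_counters->d_mutex);
        ++d_counters->d_sessionEvents;
    }

    void onMessageEvent(const MessageEvent& event) override
    {
        std::lock_guard<std::mutex> guard(d_counters->d_mutex);
        d_counters->d_messages += event.d_messageCount;
    }
};

// Hypothetical session that owns its handler for its whole lifetime; the
// association cannot be changed after construction.
class FakeAsyncSession {
    std::unique_ptr<SessionEventHandler> d_handler;

  public:
    explicit FakeAsyncSession(std::unique_ptr<SessionEventHandler> handler)
    : d_handler(std::move(handler))
    {
    }

    void deliver(const SessionEvent& event) { d_handler->onSessionEvent(event); }
    void deliver(const MessageEvent& event) { d_handler->onMessageEvent(event); }
};
```

Keeping the counters in a separately owned object is one way to let the application observe the handler's work without retaining a pointer to an object it no longer owns.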
Once the bmqa::Session has been created and started, the application can use it to open queues for producing and/or consuming messages. A queue is associated with a domain. Domain metadata must be deployed in the BlazingMQ infrastructure prior to opening queues under that domain, because opening a queue actually loads the metadata deployed for the associated domain.
The metadata associated with a domain defines various parameters such as maximum queue size and capacity, persistence policy, routing policy, etc.
Queues are identified by URIs (Uniform Resource Identifiers) that must follow the BlazingMQ syntax, manipulated as bmqt::Uri objects. A queue URI is typically formatted as follows:
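The basic form is bmq://<domain>/<queue>, for example bmq://my.domain/my.queue (the domain and queue names are placeholders). The sketch below splits such a string into its two parts using only the standard library. It is an illustration of the format, not the SDK's parser: QueueUri and parseQueueUri are hypothetical names, any optional URI components are not handled, and real code should rely on bmqt::Uri.

```cpp
#include <string>

// Hypothetical result type for this sketch; real code would use 'bmqt::Uri'.
struct QueueUri {
    std::string d_domain;
    std::string d_queue;
};

// Parse a URI of the basic form "bmq://<domain>/<queue>" into 'result'.
// Return 'true' on success and 'false' (leaving 'result' untouched) if the
// scheme is wrong or either component is empty.
bool parseQueueUri(QueueUri *result, const std::string& uri)
{
    const std::string scheme = "bmq://";
    if (uri.compare(0, scheme.size(), scheme) != 0) {
        return false;  // does not start with the expected scheme
    }

    const std::string::size_type slash = uri.find('/', scheme.size());
    if (slash == std::string::npos      // no separator after the domain
        || slash == scheme.size()       // empty domain
        || slash + 1 == uri.size()) {   // empty queue name
        return false;
    }

    result->d_domain = uri.substr(scheme.size(), slash - scheme.size());
    result->d_queue  = uri.substr(slash + 1);
    return true;
}
```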
Note that domain names are unique in the BlazingMQ infrastructure, which makes a fully qualified queue URI unique too.
Queues in the BlazingMQ infrastructure are created by applications on demand. The broker creates a queue when it receives an open-queue request from an application for a queue that does not currently exist.
An application can open a queue by calling the openQueue or openQueueAsync method on a started session. The application must pass appropriate flags to indicate whether it wants to post messages to the queue, consume messages from the queue, or both.
Note that openQueue is a blocking method, and returns after the specified queue has been successfully opened (success) or after the specified timeout has expired (failure). The openQueueAsync method, as the name suggests, is non-blocking, and the result of the operation is delivered via a bmqt::SessionEventType::e_QUEUE_OPEN_RESULT session event.
The following example demonstrates how to open a queue for posting messages. The code first opens the queue with appropriate flags, and then uses bmqa::MessageEventBuilder to build a message event and post to the queue.
Note that apart from the WRITE flag, the ACK flag has been passed to the openQueue method above. This indicates that the application is interested in receiving an ACK notification for each message it posts to the queue, irrespective of whether or not the message was successfully received by the broker and posted to the queue.
Once the queue has been successfully opened for writing, messages can be posted to the queue for consumption by interested applications. We will use bmqa::MessageEventBuilder to build a message event.
Once an application no longer needs to produce or consume messages from a queue, it can close the queue with the closeQueue or closeQueueAsync method. Note that closing a queue closes an application's "view" on the queue, and may not lead to queue deletion in the broker. A bmqa::Session does not expose any method to explicitly delete a queue.
Note that closeQueue is a blocking method and returns after the specified queue has been successfully closed (success) or after the specified timeout has expired (failure). closeQueueAsync, as the name suggests, is a non-blocking method, and the result of the operation is delivered via a bmqt::SessionEventType::e_QUEUE_CLOSE_RESULT session event.
The open, configure, and close queue operations each come in three flavors, which behave differently with regard to thread blocking and callback execution:
| | openQueue, configureQueue, closeQueue (deprecated Sync) | openQueueSync, configureQueueSync, closeQueueSync (Synchronous) | openQueueAsync, configureQueueAsync, closeQueueAsync (Asynchronous) |
|---|---|---|---|
| event handler | unblocks in internal thread | unblocks in event handler thread (*) | executes callback in event handler thread |
| nextEvent | unblocks in internal thread | unblocks in internal thread | executes callback in nextEvent thread |
(*) - guarantees unblocking after all previously enqueued events have been emitted to the eventHandler, allowing the user to have proper serialization of events for the given queue (for example, no more PUSH messages will be delivered through the eventHandler for the queue after configureQueueSync(maxUnconfirmed = 0) returns).