Component bmqa_session
[Package bmqa]

Provide access to the BlazingMQ broker.

Namespaces

namespace  bmqimp
namespace  bmqp
namespace  bslmt
namespace  bmqa

Detailed Description

Outline
Purpose:
Provide access to the BlazingMQ broker.
Classes:
bmqa::SessionEventHandler: interface for receiving events asynchronously.
bmqa::Session: mechanism for access to the BlazingMQ broker.
Description:
This component provides a mechanism, bmqa::Session, for accessing a message queue broker, and an interface, bmqa::SessionEventHandler, for asynchronous notification of events. The broker manages named, persistent queues of messages, and allows a client to open queues, post messages to them, or retrieve messages from them. All of these operations take place within the context of the session opened by the client application.
Messages received from a broker are communicated to the application by the session associated with that broker in the form of events (see bmqa_event). Events can be of two different types: (1) Messages and message status events (bmqa::MessageEvent), or (2) Session or queue status events (bmqa::SessionEvent).
A Session can dispatch events to the application in either a synchronous or asynchronous mode. In synchronous mode, the application must call the nextEvent method in order to obtain events from the Session. In asynchronous mode, the application must supply a concrete SessionEventHandler object at construction time. The concrete SessionEventHandler provided by the application must implement the onSessionEvent and onMessageEvent methods, which will be called by the Session every time a session event or a message event is received. Note that by default, a session created in asynchronous mode creates only one internal thread to dispatch events, but a different value for number of threads can be specified in bmqt::SessionOptions.
A Session is created in either synchronous or asynchronous mode, and it remains in that mode until destruction; allowing a mix of the two would complicate the SDK. The only exceptions are the "start" and "open" operations, which are available in both synchronous and asynchronous versions for the convenience of the programmer.
By default a Session connects to the local broker, which in turn may connect to a remote cluster based on configuration.
After a bmqa::Session is started, the application has to open one or several queues in read and/or write mode.
Disclaimer:
A Session object is a heavy object representing the negotiated TCP session with the broker, and the entire associated state (opened queues, statistics, ...). Therefore, sessions should always be reused if possible, preferably with only one session per lifetime of a component/library/task. Note that at the time of this writing multiplexing of different logical sessions over the same physical connection is not supported, so in certain circumstances reuse of the same session across the whole of a single application will not be possible. For example, if an application uses two unrelated libraries both of which use BlazingMQ under the hood, they won't be able to share a session as it stands. An example of extreme inefficiency and abuse of resources is creating a session ad hoc every time a message needs to be posted by the same component.
Thread-safety:
This session object is thread enabled, meaning that two threads can safely call any methods on the same instance without external synchronization.
Connecting to the Broker:
A Session communicates with a broker service using TCP/IP. Each Session object must be constructed with a bmqt::SessionOptions object, which provides the necessary information to connect to the broker. In particular, the SessionOptions object must specify the IP address and port needed to connect to the broker. The SessionOptions object may also provide extra parameters for tuning the TCP connection behavior (see bmqt_sessionoptions for details).
Note that in most cases the user does not need to explicitly construct a SessionOptions object: the default constructor for SessionOptions creates an instance that will connect to the broker service on the local machine using the standard port.
Some options can also be provided using environment variables.
  • BMQ_BROKER_URI: Corresponds to SessionOptions::brokerUri. If this environment variable is set, its value will override the one specified in the SessionOptions.
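The override rule for BMQ_BROKER_URI can be pictured with a few lines of standard C++. This is only a sketch of the rule as stated above, not the SDK's code: resolveBrokerUri is a hypothetical helper, and the two-argument form takes the environment value explicitly so the rule can be exercised without touching the process environment.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical helper (not part of the BlazingMQ SDK): return the broker URI
// that takes effect given the value configured in the options and the value
// of the 'BMQ_BROKER_URI' environment variable.  'envValue' is null when the
// variable is not set.  This sketch treats an empty value as "set"; how the
// SDK treats that case is not covered here.
std::string resolveBrokerUri(const std::string&  configuredUri,
                             const char         *envValue)
{
    // A set environment variable overrides the configured value.
    return envValue ? std::string(envValue) : configuredUri;
}

// Convenience overload that reads the real process environment.
std::string resolveBrokerUri(const std::string& configuredUri)
{
    return resolveBrokerUri(configuredUri, std::getenv("BMQ_BROKER_URI"));
}
```

With the variable unset, the configured value is returned unchanged; with it set, the environment value wins regardless of what the options contain.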
A Session object is created in an unconnected state. The start or startAsync method must be called to connect to the broker. Note that the start method is blocking, and returns either after the connection to the broker has been established (success), or after the specified timeout has expired (failure). The startAsync method, as the name suggests, connects to the broker asynchronously (i.e., it returns immediately), and the result of the operation is reported via a bmqt::SessionEventType::e_CONNECTED session event.
When the Session is no longer needed, the application should call the stop (blocking) or stopAsync (non-blocking) method to shut down the Session and disconnect from the broker. Note that destroying a Session automatically stops it. The session can be restarted with a call to start or startAsync once it has been fully stopped.
Connection loss and reconnection:
If the connection between the application and the broker is lost, the Session will automatically try to reconnect periodically. The Session will also notify the application that the connection was lost via a bmqt::SessionEventType::e_CONNECTION_LOST session event.
Once the connection has been re-established with the broker (as a result of one of the periodic reconnection attempts), the Session will notify the application via a bmqt::SessionEventType::e_RECONNECTED session event. After the connection is re-established, the Session will attempt to reopen the queues that were in the OPEN state prior to the connection loss. The Session will notify the application of the result of each reopen operation via a bmqt::SessionEventType::e_QUEUE_REOPEN_RESULT event, one per queue. Note that a reopen operation on a queue may fail (due to a broker issue, machine issue, etc.), so the application must keep track of these session events, and stop posting on a queue that failed to reopen.
After all reopen operations are complete and the application has been notified with all bmqt::SessionEventType::e_QUEUE_REOPEN_RESULT events, the Session delivers a bmqt::SessionEventType::e_STATE_RESTORED session event to the application.
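The bookkeeping an application needs during reconnection can be sketched as a small state tracker. The code below uses only the standard library: the EventType enum is a stand-in for the session event types named above, and PostingGate, onEvent, and canPost are hypothetical names introduced for illustration. It assumes that a queue which failed to reopen stays unusable until the application deals with it.

```cpp
#include <cassert>
#include <set>
#include <string>

// Stand-in for the session event types discussed above; the real
// enumerators live in 'bmqt::SessionEventType'.
enum class EventType {
    e_CONNECTION_LOST,
    e_RECONNECTED,
    e_QUEUE_REOPEN_RESULT,
    e_STATE_RESTORED
};

// Hypothetical bookkeeping class: decides whether posting to a queue is
// currently allowed, based on the session events seen so far.
class PostingGate {
    bool                  d_restored = true;  // false from loss until restore
    std::set<std::string> d_failedQueues;     // queues that failed to reopen

  public:
    // Record one event.  'queueUri' and 'success' are only meaningful for
    // 'e_QUEUE_REOPEN_RESULT'.
    void onEvent(EventType          type,
                 const std::string& queueUri = "",
                 bool               success  = true)
    {
        switch (type) {
          case EventType::e_CONNECTION_LOST:
            d_restored = false;  // stop posting everywhere
            break;
          case EventType::e_RECONNECTED:
            // Connected again, but queues are not reopened yet.
            break;
          case EventType::e_QUEUE_REOPEN_RESULT:
            assert(!queueUri.empty());
            if (!success) {
                d_failedQueues.insert(queueUri);
            }
            break;
          case EventType::e_STATE_RESTORED:
            d_restored = true;   // all reopen results have been delivered
            break;
        }
    }

    // Return true if posting to the specified queue is currently allowed.
    bool canPost(const std::string& queueUri) const
    {
        return d_restored && d_failedQueues.count(queueUri) == 0;
    }
};
```

Posting resumes only once the state-restored event arrives, and never for a queue whose reopen result reported a failure.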
Example 1:
The following example illustrates how to create a Session in synchronous mode, start it, and stop it.
  void runSession()
  {
      bmqt::SessionOptions options;
      options.setBrokerUri("tcp://localhost:30114");

      bmqa::Session session(options);
      int res = session.start();
      if (0 != res) {
          bsl::cout << "Failed to start session (" << res << ")"
                    << bsl::endl;
          return;
      }
      bsl::cout << "Session started." << bsl::endl;

      // Open queue in READ or WRITE or READ/WRITE mode, and receive or
      // post messages, etc.
      // ...

      session.stop();
  }
This example can be simplified because the constructor for Session uses a default SessionOptions object that will connect to the local broker service. The example may be rewritten as follows:
  void runSession()
  {
      bmqa::Session session;     // using default 'SessionOptions'
      int res = session.start();
      if (0 != res) {
          bsl::cout << "Failed to start session (" << res << ")"
                    << bsl::endl;
          return;
      }
      bsl::cout << "Session started." << bsl::endl;

      // Open queue in READ or WRITE or READ/WRITE mode, and receive or
      // post messages, etc.
      // ...

      session.stop();
  }
Processing session events - synchronous mode:
If the Session is created in synchronous mode, the application needs to call the nextEvent method on a regular basis in order to receive events. This method takes an optional wait timeout as a parameter. It returns the next available Event from the session's internal event queue, or blocks the calling thread until a new Event arrives or the specified timeout expires. It is safe to call the nextEvent method from different threads simultaneously: the Session class provides proper synchronization logic to protect the internal event queue from corruption in this scenario.
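The combination of "block until an event arrives or the timeout expires" and "safe to call from several threads" is commonly built from a mutex and a condition variable. The sketch below shows that general technique with the standard library only; it is not the SDK's internal implementation. EventQueue, push, and next are hypothetical names, events are plain strings, and an expired timeout is reported by returning false (how the real nextEvent reports a timeout is not covered here).

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical thread-safe event queue with a timed, blocking 'next'.
class EventQueue {
    std::mutex              d_mutex;   // protects 'd_events'
    std::condition_variable d_cond;    // signaled when an event is pushed
    std::deque<std::string> d_events;  // pending events, oldest first

  public:
    // Enqueue the specified 'event' and wake up one waiting consumer.
    void push(const std::string& event)
    {
        {
            std::lock_guard<std::mutex> guard(d_mutex);
            d_events.push_back(event);
        }
        d_cond.notify_one();
    }

    // Load the oldest pending event into 'result' and return true, waiting
    // up to 'timeout' for one to arrive; return false if none arrived.
    bool next(std::string *result, std::chrono::milliseconds timeout)
    {
        assert(result);
        std::unique_lock<std::mutex> lock(d_mutex);
        const bool ready = d_cond.wait_for(
            lock, timeout, [this] { return !d_events.empty(); });
        if (!ready) {
            return false;  // timed out with an empty queue
        }
        *result = d_events.front();
        d_events.pop_front();
        return true;
    }
};
```

Because the queue is inspected and modified only while the mutex is held, several threads can call next concurrently and each event is handed to exactly one caller.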
Example 2:
The following example demonstrates how to write a function that queries and processes events synchronously. In this example, a switch statement checks the type of the Event and performs the necessary actions.
We first define two functions to process SessionEvent and MessageEvent. These functions return true if we should keep processing events and false otherwise (i.e., no more events are expected from the Session).
  bool processSessionEvent(const bmqa::SessionEvent& event)
  {
      bool result = true;
      switch (event.type()) {

        case bmqt::SessionEventType::e_CONNECTED:
          // The connection to the broker is established (as a result
          // of a call to the 'start' method).
          openQueues();
          startPostingToQueues();
          break;

        case bmqt::SessionEventType::e_DISCONNECTED:
          // The connection to the broker is terminated (as a result
          // of a call to the 'stop' method).
          result = false;
          break;

        case bmqt::SessionEventType::e_CONNECTION_LOST:
          // The connection to the broker dropped. Stop posting to the queue.
          stopPostingToQueues();
          break;

        case bmqt::SessionEventType::e_STATE_RESTORED:
          // The connection to the broker has been restored (i.e., all queues
          // have been re-opened). Resume posting to the queue.
          resumePostingToQueues();
          break;

        case bmqt::SessionEventType::e_CONNECTION_TIMEOUT:
          // The connection to the broker has timed out.
          result = false;
          break;

        case bmqt::SessionEventType::e_ERROR:
          // Internal error
          bsl::cout << "Unexpected session error: "
                    << event.errorDescription() << bsl::endl;
          break;

      } // end switch

      return result;
  }

  bool processMessageEvent(const bmqa::MessageEvent& event)
  {
      bool result = true;
      switch (event.type()) {

        case bmqt::MessageEventType::e_PUSH: {
          // Received a 'PUSH' event from the broker.
          bmqa::MessageIterator msgIter = event.messageIterator();
          while (msgIter.nextMessage()) {
              const bmqa::Message& msg = msgIter.message();
              // Process 'PUSH' msg here (omitted for brevity)
              // ...
          }
        } break;

        case bmqt::MessageEventType::e_ACK: {
          // Received an 'ACK' event from the broker.
          bmqa::MessageIterator msgIter = event.messageIterator();
          while (msgIter.nextMessage()) {
              const bmqa::Message& msg = msgIter.message();
              // Process 'ACK' msg here (omitted for brevity)
              // ...
          }
        } break;

      } // end switch

      return result;
  }
Next, we define a function that handles events synchronously using the processSessionEvent and processMessageEvent functions.
  void handleEventsSynchronously(bmqa::Session *startedSession)
  {
      bool more = true;
      while (more) {
          bmqa::Event event =
                  startedSession->nextEvent(bsls::TimeInterval(2.0));
          if (event.isSessionEvent()) {
              more = processSessionEvent(event.sessionEvent());
          }
          else {
              more = processMessageEvent(event.messageEvent());
          }
      }
  }
Processing session events - asynchronous mode:
If the application wishes to use a Session in asynchronous mode, it must pass a managed pointer to an event handler implementing the SessionEventHandler interface. In this case, when the Session is started, a thread pool owned by the Session is also started for processing events asynchronously. The Session will call the event handler's onSessionEvent or onMessageEvent method every time a session event or a message event is available.
Note that after the Session is associated with some event handler, this association cannot be changed or canceled. The event handler will be used for processing events until the Session object is destroyed.
Example 3:
The following example demonstrates how to implement an event handler and how to make the Session use an instance of this event handler for processing events.
First, we define a concrete implementation of SessionEventHandler.
  class MyHandler: public bmqa::SessionEventHandler {
  public:
      MyHandler() { }
      virtual ~MyHandler() { }
      virtual void onSessionEvent(const bmqa::SessionEvent& event);
      virtual void onMessageEvent(const bmqa::MessageEvent& event);
  };

  void MyHandler::onSessionEvent(const bmqa::SessionEvent& event)
  {
      // The implementation is similar to our 'processSessionEvent' function
      // defined in the previous example.
      processSessionEvent(event);
  }

  void MyHandler::onMessageEvent(const bmqa::MessageEvent& event)
  {
      // The implementation is similar to our 'processMessageEvent' function
      // defined in the previous example.
      processMessageEvent(event);
  }
Next, we define a function that creates a Session using our implementation of SessionEventHandler.
  void runAsyncSession()
  {
      bslma::ManagedPtr<bmqa::SessionEventHandler> handlerMp(new MyHandler());

      bmqa::Session session(handlerMp);   // using default 'SessionOptions'
      int res = session.start();
      if (0 != res) {
          bsl::cout << "Failed to start session (" << res << ")"
                    << bsl::endl;
          return;
      }

      // ...

      session.stop();
  }
Opening queues:
Once the Session has been created and started, the application can use it to open queues for producing and/or consuming messages. A queue is associated with a domain. Domain metadata must be deployed in the BlazingMQ infrastructure prior to opening queues under that domain, because opening a queue actually loads the metadata deployed for the associated domain.
The metadata associated with a domain defines various parameters like maximum queue size and capacity, persistent policy, routing policy, etc.
Queues are identified by URIs (Uniform Resource Identifiers) that must follow the BlazingMQ syntax, manipulated as bmqt::Uri objects. A queue URI is typically formatted as follows:
  bmq://my.domain/my.queue
Note that domain names are unique in BlazingMQ infrastructure, which makes a fully qualified queue URI unique too.
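To make the URI layout concrete, the sketch below splits a queue URI of the simple form shown above into its domain and queue name. It is an illustration written against the standard library only: QueueUriParts and splitQueueUri are hypothetical names, and the function handles nothing beyond the bmq://<domain>/<queue> shape. Real code would work with bmqt::Uri objects instead.

```cpp
#include <cassert>
#include <string>

// Hypothetical result type: the two components of a simple queue URI.
struct QueueUriParts {
    std::string domain;
    std::string queue;
};

// Hypothetical helper: split a URI of the form 'bmq://<domain>/<queue>'
// into the specified 'parts'.  Return true on success, and false if the
// scheme is not 'bmq://' or if either component is missing.
bool splitQueueUri(QueueUriParts *parts, const std::string& uri)
{
    assert(parts);

    const std::string scheme = "bmq://";
    if (uri.compare(0, scheme.size(), scheme) != 0) {
        return false;  // wrong or missing scheme
    }

    // The first '/' after the scheme separates the domain from the queue.
    const std::string::size_type slash = uri.find('/', scheme.size());
    if (slash == std::string::npos) {
        return false;  // no queue component
    }

    parts->domain = uri.substr(scheme.size(), slash - scheme.size());
    parts->queue  = uri.substr(slash + 1);
    return !parts->domain.empty() && !parts->queue.empty();
}
```

For "bmq://my.domain/my.queue" this yields the domain "my.domain" and the queue "my.queue".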
Queues in the BlazingMQ infrastructure are created by applications on demand. The broker creates a queue when it receives an open-queue request from an application for a queue that does not currently exist.
An application can open a queue by calling the openQueue or openQueueAsync method on a started session. The application must pass appropriate flags to indicate whether it wants to post messages to the queue, consume messages from the queue, or both.
Note that openQueue is a blocking method, and returns after the specified queue has been successfully opened (success) or after the specified timeout has expired (failure). The openQueueAsync method, as the name suggests, is non-blocking, and the result of the operation is reported via a bmqt::SessionEventType::e_QUEUE_OPEN_RESULT session event.
Example 4:
The following example demonstrates how to open a queue for posting messages. The code first opens the queue with appropriate flags, and then uses bmqa::MessageEventBuilder to build a message event and post to the queue.
  // Session creation and startup logic elided for brevity
  const char *queueUri = "bmq://my.domain/my.queue";
  bmqa::QueueId myQueueId(1);       // ID for the queue
  int rc = session.openQueue(
                      &myQueueId,
                      queueUri,
                      bmqt::QueueFlags::e_WRITE | bmqt::QueueFlags::e_ACK,
                      bsls::TimeInterval(30, 0));

  if (rc != 0) {
      bsl::cerr << "Failed to open queue, rc: "
                << bmqt::OpenQueueResult::Enum(rc)
                << bsl::endl;
      return;
  }
Note that, apart from the WRITE flag, the ACK flag has been passed to the openQueue method above. This indicates that the application is interested in receiving an ACK notification for each message it posts to the queue, irrespective of whether or not the message was successfully received by the broker and posted to the queue.
Once the queue has been successfully opened for writing, messages can be posted to the queue for consumption by interested applications. We will use bmqa::MessageEventBuilder to build a message event.
  // Create a message event builder
  bmqa::MessageEventBuilder builder;
  session.loadMessageEventBuilder(&builder);

  // Create and post a message event containing 1 message
  bmqa::Message& msg = builder.startMessage();

  msg.setCorrelationId(myCorrelationId);
  msg.setDataRef(&myPayload);  // where 'myPayload' is of type 'bdlbb::Blob'
  rc = builder.packMessage(myQueueId);
  if (rc != 0) {
      bsl::cerr << "Failed to pack message, rc: "
                << bmqt::EventBuilderResult::Enum(rc)
                << bsl::endl;
      return;
  }

  // Post message event
  rc = session.post(builder.messageEvent());
  if (rc != 0) {
      bsl::cerr << "Failed to post message event to the queue, rc: "
                << bmqt::PostResult::Enum(rc)
                << bsl::endl;
      return;
  }

  // ... post more messages
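When the ACK flag is used, a producer typically needs to match each ACK back to the message it posted, which is what the correlation ID set on the message is for. The following sketch shows one possible way to do that bookkeeping with the standard library only. PendingAcks and its methods are hypothetical names, and the correlation ID is simplified to an int and the ACK status to a bool; it is not how the SDK itself tracks messages.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical bookkeeping class: remembers posted messages until the
// matching ACK arrives, and counts negative ACKs.
class PendingAcks {
    std::map<int, std::string> d_pending;       // correlation id -> note
    int                        d_failures = 0;  // number of failed ACKs

  public:
    // Remember that a message with the specified 'correlationId' was posted.
    void onPosted(int correlationId, const std::string& note)
    {
        const bool inserted = d_pending.emplace(correlationId, note).second;
        assert(inserted && "correlation ids must be unique while pending");
        (void)inserted;
    }

    // Process an ACK for 'correlationId'.  Return false if no message with
    // that id is pending, and true otherwise.
    bool onAck(int correlationId, bool success)
    {
        std::map<int, std::string>::iterator it =
                                             d_pending.find(correlationId);
        if (it == d_pending.end()) {
            return false;
        }
        if (!success) {
            ++d_failures;  // e.g., schedule a retry or raise an alarm here
        }
        d_pending.erase(it);
        return true;
    }

    std::size_t pendingCount() const { return d_pending.size(); }
    int         failureCount() const { return d_failures; }
};
```

In such a scheme, onPosted would be called after a successful post, and onAck from the code that iterates over the messages of an ACK message event.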
Closing queues:
When an application no longer needs to produce or consume messages from a queue, it can close the queue with the closeQueue or closeQueueAsync method. Note that closing a queue closes the application's "view" on the queue, and may not lead to queue deletion in the broker. A Session does not expose any method to explicitly delete a queue.
Note that closeQueue is a blocking method, and returns after the specified queue has been successfully closed (success) or after the specified timeout has expired (failure). The closeQueueAsync method, as the name suggests, is non-blocking, and the result of the operation is reported via a bmqt::SessionEventType::e_QUEUE_CLOSE_RESULT session event.
There are 3 flavors which behave differently with regard to thread blocking and callback execution:
  |               | openQueue         | openQueueSync      | openQueueAsync       |
  |               | configureQueue    | configureQueueSync | configureQueueAsync  |
  |               | closeQueue        | closeQueueSync     | closeQueueAsync      |
  |               | (deprecated Sync) | (Synchronous)      | (Asynchronous)       |
  |---------------|-------------------|--------------------|----------------------|
  | event handler | unblocks in       | unblocks in event  | executes callback in |
  |               | internal thread   | handler thread (*) | event handler thread |
  |               |                   |                    |                      |
  | nextEvent     | unblocks in       | unblocks in        | executes callback    |
  |               | internal thread   | internal thread    | in nextEvent thread  |
(*) - guarantees unblocking after all previously enqueued events have been emitted to the eventHandler, allowing the user to have proper serialization of events for the given queue (for example no more PUSH messages will be delivered through the eventHandler for the queue after configureQueueSync(maxUnconfirmed = 0) returns).