// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//     http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// bmqa_session.h                                                     -*-C++-*-

//@PURPOSE: Provide access to the BlazingMQ broker.
//  bmqa::SessionEventHandler: interface for receiving events asynchronously.
//  bmqa::Session            : mechanism for access to the BlazingMQ broker.
//@DESCRIPTION: This component provides a mechanism, 'bmqa::Session', that
// provides access to a message queue broker and an interface,
// 'bmqa::SessionEventHandler' for asynchronous notification of events.  The
// broker manages named, persistent queues of messages.  This broker allows a
// client to open queues, post messages to them, or retrieve messages from
// them.  All of these operations take place within the context of the session
// opened by the client application.
// Messages received from a broker are communicated to the application by the
// session associated with that broker in the form of events (see
// 'bmqa_event').  Events can be of two different types: (1) Messages and
// message status events ('bmqa::MessageEvent'), or (2) Session or queue
// status events ('bmqa::SessionEvent').
// A 'Session' can dispatch events to the application in either a synchronous
// or asynchronous mode.  In synchronous mode, the application must call the
// 'nextEvent' method in order to obtain events from the 'Session'.  In
// asynchronous mode, the application must supply a concrete
// 'SessionEventHandler' object at construction time.  The concrete
// 'SessionEventHandler' provided by the application must implement the
// 'onSessionEvent' and 'onMessageEvent' methods, which will be called by the
// 'Session' every time a session event or a message event is received.  Note
// that by default, a session created in asynchronous mode creates only one
// internal thread to dispatch events, but a different value for number of
// threads can be specified in 'bmqt::SessionOptions'.
// A 'Session' is created either in synchronous or in asynchronous mode, and it
// will remain in that mode until destruction.  Allowing a mix between
// synchronous or asynchronous would make the SDK complicated.  The only
// exceptions are the "start" and "open" operations that must be available in
// synchronous or asynchronous version for the convenience of the programmer.
// By default a 'Session' connects to the local broker, which in turn may
// connect to a remote cluster based on configuration.
// After a 'bmqa::Session' is started, the application has to open one or
// several queues in read and/or write mode.
// A 'Session' object is a heavy object representing the negotiated TCP session
// with the broker, and the entire associated state (opened queues, statistics,
// ...).  Therefore, sessions should be always reused if possible, preferably
// with only *one* session per lifetime of a component/library/task.
// Note that at the time of this writing multiplexing of different logical
// sessions over the same physical connection is not supported, so in certain
// circumstances reuse of the same session across the whole of a single
// application will not be possible. For example, if an application uses two
// unrelated libraries both of which use BlazingMQ under the hood, they won't
// be able to share a session as it stands.
// An example of an extreme inefficiency and an abuse of resources is to
// create a session ad-hoc every time a message needs to be posted by the same
// component.
// This session object is *thread* *enabled*, meaning that two threads can
// safely call any methods on the *same* *instance* without external
// synchronization.
///Connecting to the Broker
// A 'Session' establishes a communication with a broker service using TCP/IP.
// Each 'Session' object must be constructed with a 'bmqa::SessionOptions'
// object, which provides the necessary information to connect to the broker.
// In particular, the 'SessionOptions' object must specify the IP address and
// port needed to connect to the broker.  The 'SessionOptions' object may also
// provide extra parameters for tuning the TCP connection behavior (see
// 'bmqa_sessionoptions' for details).
// Note that in most cases the user does not need to explicitly construct a
// 'SessionOptions' object: the default constructor for 'SessionOptions'
// creates an instance that will connect to the broker service on the local
// machine using the standard port.
// Some options can also be provided using environment variables.
//: o !BMQ_BROKER_URI!: Corresponds to 'SessionOptions::brokerUri'.
//:   If this environment variable is set, its value will override the one
//:   specified in the 'SessionOptions'.
// A 'Session' object is created in an unconnected state.  The 'start' or
// 'startAsync' method must be called to connect to the broker.  Note that
// 'start' method is blocking, and returns either after connection to broker
// has been established (success), or after specified timeout (failure).
// 'startAsync' method, as the name suggests, connects to the broker
// asynchronously (i.e., it returns immediately), and the result of the
// operation is notified via 'bmqt::SessionEventType::CONNECTED' session event.
// When the 'Session' is no longer needed, the application should call the
// 'stop' (blocking) or 'stopAsync' (non-blocking) method to shut down the
// 'Session' and disconnect from the broker.  Note that destroying a Session
// automatically stops it.  The session can be restarted with a call to 'start'
// or 'startAsync' once it has been fully stopped.
///Connection loss and reconnection
// If the connection between the application and the broker is lost, the
// 'Session' will automatically try to reconnect periodically.  The 'Session'
// will also notify the application of the event of losing the connection via
// 'bmqt::SessionEventType::CONNECTION_LOST' session event.
// Once the connection has been re-established with the broker (as a result of
// one of the periodic reconnection attempts), the 'Session' will notify the
// application via 'bmqt::SessionEventType::RECONNECTED' session event.  After
// the connection re-establishment, the 'Session' will attempt to reopen the
// queues that were in 'OPEN' state prior to connection loss.  The 'Session'
// will notify the application of the result of reopen operation via
// 'bmqt::SessionEventType::QUEUE_REOPEN_RESULT' for each queue.  Note that a
// reopen operation on a queue may fail (due to broker issue, machine issue,
// etc), so the application must keep track on these session events, and stop
// posting on a queue that failed to reopen.
// After all reopen operations are complete and application has been notified
// with all 'bmqt::SessionEventType::QUEUE_REOPEN_RESULT' events, the 'Session'
// delivers a 'bmqt::SessionEventType::STATE_RESTORED' session event to the
// application.
///Example 1
///- - - - -
// The following example illustrates how to create a 'Session' in synchronous
// mode, start it, and stop it.
//  void runSession()
//  {
//      bmqt::SessionOptions options;
//      options.setBrokerUri("tcp://localhost:30114");
//      bmqa::Session session(options);
//      int res = session.start();
//      if (0 != res) {
//          bsl::cout << "Failed to start session (" << res << ")"
//                    << bsl::endl;
//          return;
//      }
//      bsl::cout << "Session started." << bsl::endl;
//      // Open queue in READ or WRITE or READ/WRITE mode, and receive or
//      // post messages, etc.
//      // ...
//      session.stop();
//  }
// This example can be simplified because the constructor for 'Session' uses a
// default 'SessionOptions' object that will connect to the local broker
// service.  The example may be rewritten as follow:
//  void runSession()
//  {
//      bmqa::Session session;     // using default 'SessionOptions'
//      int res = session.start();
//      if (0 != res) {
//          bsl::cout << "Failed to start session (" << res << ")"
//                    << bsl::endl;
//          return;
//      }
//      bsl::cout << "Session started." << bsl::endl;
//      // Open queue in READ or WRITE or READ/WRITE mode, and receive or
//      // post messages, etc.
//      // ...
//      session.stop();
//  }
///Processing session events - synchronous mode
// If the 'Session' is created in synchronous mode, the application needs to
// call the 'nextEvent' method on a regular basis in order to receive events.
// This method takes an optional wait timeout as a parameter, and it will
// return the next available 'Event' from the session's internal event queue or
// it will block the calling thread execution until new 'Event' arrives or
// until the specified timeout expires.  It is safe to call the 'nextEvent'
// method from different threads simultaneously: the 'Session' class provides
// proper synchronization logic to protect the internal event queue from
// corruption in this scenario.
///Example 2
///- - - - -
// The following example demonstrates how to write a function that queries and
// processes events synchronously.  In this example the switch form checks the
// type of the 'Event' and performs the necessary actions.
// We first define two functions to process 'SessionEvent' and 'MessageEvent'.
// These functions return 'true' if we should keep processing events and
// 'false' otherwise (i.e., no more events are expected from the 'Session').
//  bool processSessionEvent(const bmqa::SessionEvent& event)
//  {
//      bool result = true;
//      switch (event.type()) {
//        case bmqt::SessionEventType::e_CONNECTED:
//          // The connection to the broker is established (as a result
//          // of a call to the 'start' method).
//          openQueues();
//          startPostingToQueues();
//          break;
//        case bmqt::SessionEventType::e_DISCONNECTED:
//          // The connection to the broker is terminated (as a result
//          // of a call to the 'stop' method).
//          result = false;
//          break;
//        case bmqt::SessionEventType::e_CONNECTION_LOST:
//          // The connection to the broker dropped. Stop posting to the queue.
//          stopPostingToQueues();
//          break;
//        case bmqt::SessionEventType::e_STATE_RESTORED:
//          // The connection to the broker has been restored (i.e., all queues
//          // have been re-opened. Resume posting to the queue.
//          resumePostingToQueues();
//          break;
//        case bmqt::SessionEventType::e_CONNECTION_TIMEOUT:
//          // The connection to the broker has timed out.
//          result = false;
//          break;
//        case bmqt::SessionEventType::e_ERROR:
//          // Internal error
//          bsl::cout << "Unexpected session error: "
//                    << event.errorDescription() << bsl::endl;
//          break;
//      } // end switch
//      return result;
//  }
//  bool processMessageEvent(const bmqa::MessageEvent& event)
//  {
//      bool result = true;
//      switch (event.type()) {
//        case bmqt::MessageEventType::e_PUSH: {
//          // Received a 'PUSH' event from the broker.
//          bmqa::MessageIterator msgIter = event.messageIterator();
//          while (msgIter.nextMessage()) {
//              const bmqa::Message& msg = msgIter.message();
//              // Process 'PUSH' msg here (omitted for brevity)
//              // ...
//          }
//      } break;
//        case bmqt::MessageEventType::e_ACK: {
//          // Received an 'ACK' event from the broker.
//          bmqa::MessageIterator msgIter = event.messageIterator();
//          while (msgIter.nextMessage()) {
//              const bmqa::Message& msg = msgIter.message();
//              // Process 'ACK' msg here (omitted for brevity)
//              // ...
//          }
//      } break;
//      } // end switch
//      return result;
//  }
// Next, we define a function that handles events synchronously using the
// 'processSessionEvent' and 'processMessageEvent' functions.
//  void handleEventsSynchronously(bmqa::Session *startedSession)
//  {
//      bool more = true;
//      while (more) {
//          bmqa::Event event =
//                  startedSession->nextEvent(bsls::TimeInterval(2.0));
//          if (event.isSessionEvent()) {
//              more = processSessionEvent(event.sessionEvent());
//          }
//          else {
//              more = processMessageEvent(event.messageEvent());
//          }
//      }
//  }
///Processing session events - asynchronous mode
// If application wishes to use 'Session' in asynchronous mode, it must pass a
// managed pointer to an event handler implementing the 'SessionEventHandler'.
// In this case, when 'Session' is started, a thread pool owned by the
// 'Session' is also started for processing events asynchronously.  The
// 'Session' will call event handler's 'onSessionEvent' or 'onMessageEvent'
// method every time a session event or a message event is available.
// Note that after the 'Session' is associated with some event handler, this
// association cannot be changed or canceled.  The event handler will be used
// for processing events until the 'Session' object is destroyed.
///Example 3
///- - - - -
// The following example demonstrates how to implement an event handler and how
// to make the 'Session' use an instance of this event handler for processing
// events.
// First, we define a concrete implementation of 'SessionEventHandler'.
//  class MyHandler: public bmqa::SessionEventHandler {
//  public:
//      MyHandler() { }
//      virtual ~MyHandler() { }
//      virtual void onSessionEvent(const bmqa::SessionEvent& event);
//      virtual void onMessageEvent(const bmqa::MessageEvent& event);
//  };
//  void MyHandler::onSessionEvent(const bmqa::SessionEvent& event)
//  {
//      // The implementation is similar to our 'processSessionEvent' function
//      // defined in the previous example.
//      processSessionEvent(event);
//  }
//  void MyHandler::onMessageEvent(const bmqa::MessageEvent& event)
//  {
//      // The implementation is similar to our 'processMessageEvent' function
//      // defined in the previous example.
//      processMessageEvent(event);
//  }
// Next, we define a function that creates a 'Session' using our implementation
// of 'SessionEventHandler'.
//  void runAsyncSession()
//  {
//      bslma::ManagedPtr<SessionEventHandler> handlerMp(new MyHandler());
//      bmqa::Session session(handlerMp);   // using default 'SessionOptions'
//      int res = session.start();
//      if (0 != res) {
//          bsl::cout << "Failed to start session (" << res << ")"
//                    << bsl::endl;
//          return;
//      }
//      // ...
//      session.stop();
//  }
///Opening queues
// Once the 'Session' has been created and started, the application can use it
// to open queues for producing and/or consuming messages.  A queue is
// associated with a domain.  Domain metadata must be deployed in the BlazingMQ
// infrastructure prior to opening queues under that domain, because opening a
// queue actually loads the metadata deployed for the associated domain.
// The metadata associated with a domain defines various parameters like
// maximum queue size and capacity, persistent policy, routing policy, etc.
// Queue are identified by URIs (Unified Resource Identifiers) that must
// follow the BlazingMQsyntax, manipulated as 'bmqt::Uri' objects.  A queue URI
// is typically formatted as follows:
//  bmq://my.domain/my.queue
// Note that domain names are unique in BlazingMQ infrastructure, which makes a
// fully qualified queue URI unique too.
// Queues in BlazingMQ infrastructure are created by applications on demand.
// Broke creates a queue when it receives an open-queue request from an
// application for a queue that does not exist currently.
// Application can open a queue by calling 'openQueue' or 'openQueueAsync'
// method on a started session.  Application must pass appropriate flags to
// indicate if it wants to post messages to queue, consume messages from the
// queue, or both.
// Note that 'openQueue' is a blocking method, and returns after specified
// queue has been successfully opened (success) or after specified timeout has
// expired (failure).  'openQueueAsync' method, as the name suggests, is non
// blocking, and the result of the operation is notified via
// 'bmqt::SessionEventType::QUEUE_OPEN_RESULT' session event.
///Example 4
///- - - - -
// The following example demonstrates how to open a queue for posting messages.
// The code first opens the queue with appropriate flags, and then uses
// 'bmqa::MessageEventBuilder' to build a message event and post to the queue.
//  // Session creation and startup logic elided for brevity
//  const char *queueUri = "bmq://my.domain/my.queue";
//  bmqa::QueueId myQueueId(1);       // ID for the queue
//  int rc = session.openQueue(
//                      &myQueueId,
//                      queueUri,
//                      bmqt::QueueFlags::e_WRITE | bmqt::QueueFlags::e_ACK,
//                      bsls::TimeInterval(30, 0));
//  if (rc != 0) {
//      bsl::cerr << "Failed to open queue, rc: "
//                << bmqt::OpenQueueResult::Enum(rc)
//                << bsl::endl;
//      return;
//  }
// Note that apart from 'WRITE' flag, 'ACK' flag has been passed to
// 'openQueue' method above.  This indicates that application is interested in
// receiving 'ACK' notification for each message it posts to the queue,
// irrespective of whether or not the message was successfully received by the
// broker and posted to the queue.
// Once the queue has been successfully opened for writing, messages can be
// posted to the queue for consumption by interested applications.  We will use
// 'bmqa::MessageEventBuilder' to build a message event.
//  // Create a message event builder
//  bmqa::MessageEventBuilder builder;
//  session.loadMessageEventBuilder(&builder);
//  // Create and post a message event containing 1 message
//  bmqa::Message& msg = builder.startMessage();
//  msg.setCorrelationId(myCorrelationId);
//  msg.setDataRef(&myPayload);  // where 'myPayload' is of type 'bdlbb::Blob'
//  rc = builder.packMessage(myQueueId);
//  if (rc != 0) {
//      bsl::cerr << "Failed to pack message, rc: "
//                << bmqt::EventBuilderResult::Enum(rc)
//                << bsl::endl;
//      return;
//  }
//  // Post message event
//  rc = session.post(builder.messageEvent());
//  if (rc != 0) {
//      bsl::cerr << "Failed to post message event to the queue, rc: "
//                << bmqt::PostResult::Enum(rc)
//                << bsl::endl;
//      return;
//  }
//  // ... post more messages
///Closing queues
// After an application no longer needs to produce or consume messages from a
// queue, it can be closed by 'closeQueue' or 'closeQueueAsync' method.  Note
// that closing a queue closes an application's "view" on the queue, and may
// not lead to queue deletion in the broker.  A 'Session' does not expose any
// method to explicitly delete a queue.
// Note that 'closeQueue' is a blocking method and returns after the specified
// queue has been successfully closed (success) or after specified timeout has
// expired (failure).  'closeQueueAsync', as the name suggests, is a
// non-blocking method, and result of the operation is notified via
// 'bmqt::SessionEventType::e_QUEUE_CLOSE_RESULT' session event.
// There are 3 flavors which behave differently with regard to thread blocking
// and callback execution:
//           |  openQueue        |  openQueueSync       |  openQueueAsync
//           |  configureQueue   |  configureQueueSync  |  configureQueueAsync
//           |  closeQueue       |  closeQueueSync      |  closeQueueAsync
//           | (deprecated Sync) | (Synchronous)        | (Asynchronous)
//   event   | unblocks in       | unblocks in event    | executes callback in
//  handler  | internal thread   | handler thread  (*)  | event handler thread
//           |                   |                      |
// nextEvent | unblocks in       | unblocks in          | executes callback
//           | internal thread   | internal thread      | in nextEvent thread
// (*) - guarantees unblocking after all previously enqueued events have been
// emitted to the eventHandler, allowing the user to have proper serialization
// of events for the given queue (for example no more PUSH messages will be
// delivered through the eventHandler for the queue after
// configureQueueSync(maxUnconfirmed = 0) returns).

// BMQ
#include <bmqscm_version.h>
#include <bmqa_abstractsession.h>
#include <bmqa_closequeuestatus.h>
#include <bmqa_configurequeuestatus.h>
#include <bmqa_confirmeventbuilder.h>
#include <bmqa_messageeventbuilder.h>
#include <bmqa_openqueuestatus.h>
#include <bmqt_queueoptions.h>
#include <bmqt_sessionoptions.h>
#include <bmqt_uri.h>

// BDE
#include <ball_log.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_keyword.h>
#include <bsls_timeinterval.h>
#include <bsls_types.h>

namespace BloombergLP {

namespace bmqimp { class Application; }
namespace bmqimp { class Event; }
namespace bmqp   { class MessageGUIDGenerator; }
namespace bslmt  { class Semaphore; }

namespace bmqa {

class Event;
class Message;
class MessageEvent;
class MessageProperties;
class QueueId;
class SessionEvent;

                         // =========================
                         // class SessionEventHandler
                         // =========================

class SessionEventHandler {
    // Pure protocol for an asynchronous event handler.  The implementation
    // must be thread safe if the 'Session' is configured to use multiple
    // threads.

        // Destroy this object.

    void onSessionEvent(const SessionEvent& event) = 0;
        // Process the specified session 'event' (connected, disconnected,
        // queue opened, queue closed, etc.).

    void onMessageEvent(const MessageEvent& event) = 0;
        // Process the specified message 'event' containing one or more
        // messages.

                             // ==================
                             // struct SessionImpl
                             // ==================

struct SessionImpl {
    // Impl structure for the session data members, so that special task such
    // as 'bmqadm' can access them by reinterpret casting a 'Session' object.
    // Care should be taken though since 'Session' is a polymorphic class.

    bslma::Allocator                       *d_allocator_p;
                                             // The allocator to use

    bmqt::SessionOptions                    d_sessionOptions;
                                             // Session options as provided by
                                             // the application.

    bslma::ManagedPtr<SessionEventHandler>  d_eventHandler_mp;
                                             // Event handler, if any, to use
                                             // for notifying application of
                                             // events.

                                             // GUID generator object.

    bslma::ManagedPtr<bmqimp::Application>  d_application_mp;
                                             // The application object.

    SessionImpl(const SessionImpl&)            BSLS_KEYWORD_DELETED;
    SessionImpl& operator=(const SessionImpl&) BSLS_KEYWORD_DELETED;

    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(SessionImpl, bslma::UsesBslmaAllocator)

    SessionImpl(const bmqt::SessionOptions&             options,
                bslma::ManagedPtr<SessionEventHandler>  eventHandler,
                bslma::Allocator                       *allocator = 0);
        // Create a new object having the specified 'options' and
        // 'eventHandler' and using the optionally specified 'allocator'.

                               // =============
                               // class Session
                               // =============

class Session: public AbstractSession {
    // A session with a BlazingMQ broker.

    // TYPES
    typedef AbstractSession::OpenQueueCallback      OpenQueueCallback;
        // Invoked as a response to an asynchronous open queue operation,
        // 'OpenQueueCallback' is an alias for a callback function object
        // (functor) that takes as an argument the specified 'result',
        // providing the result and context of the requested operation.

    typedef AbstractSession::ConfigureQueueCallback ConfigureQueueCallback;
        // Invoked as a response to an asynchronous configure queue operation,
        // 'ConfigureQueueCallback' is an alias for a callback function object
        // (functor) that takes as an argument the specified 'result',
        // providing the result and context of the requested operation.

    typedef AbstractSession::CloseQueueCallback     CloseQueueCallback;
        // Invoked as a response to an asynchronous close queue operation,
        // 'CloseQueueCallback' is an alias for a callback function object
        // (functor) that takes as an argument the specified 'result',
        // providing the result and context of the requested operation.


    // DATA
    SessionImpl d_impl; // Sole data member of this object.

    Session(const Session&)            BSLS_KEYWORD_DELETED;
    Session& operator=(const Session&) BSLS_KEYWORD_DELETED;
        // Copy constructor and assignment operator are not implemented

    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(Session, bslma::UsesBslmaAllocator)

    Session(const bmqt::SessionOptions&  options   = bmqt::SessionOptions(),
            bslma::Allocator            *allocator = 0);
        // Create a new 'Session' in *synchronous* mode using the optionally
        // specified 'options'.  In such mode, events have to be fetched by the
        // application using the 'nextEvent()' method.  Optionally specify an
        // 'allocator' used to supply memory.  If 'allocator' is 0, the
        // currently installed default allocator is used.

    Session(bslma::ManagedPtr<SessionEventHandler>  eventHandler,
            const bmqt::SessionOptions&             options   =
            bslma::Allocator                       *allocator = 0);
        // Create a 'Session' in *asynchronous* mode using the specified
        // 'eventHandler' as callback for event processing and the optionally
        // specified 'options'.  Optionally specify an 'allocator' used to
        // supply memory.  If the optionally specified 'allocator' is 0, the
        // currently installed default allocator is used.

        // Stop the 'Session' and destroy this object.

    //   (virtual bmqa::AbstractSession)

    ///Session management
    int start(const bsls::TimeInterval& timeout = bsls::TimeInterval())
        // Connect to the BlazingMQ broker and start the message processing for
        // this 'Session'.  This method blocks until either the 'Session' is
        // connected to the broker, fails to connect, or the operation times
        // out.  If the optionally specified 'timeout' is not populated, use
        // the one defined in the session options.  Return 0 on success, or a
        // non-zero value corresponding to the 'bmqt::GenericResult::Enum' enum
        // values otherwise.  The behavior is undefined if this method is
        // called on an already started 'Session'.

    int startAsync(const bsls::TimeInterval& timeout = bsls::TimeInterval())
        // Connect to the BlazingMQ broker and start the message processing for
        // this 'Session'.  This method returns without blocking.  The result
        // of the operation is communicated with a session event.  If the
        // optionally specified 'timeout' is not populated, use the one defined
        // in the session options.  Return 0 on success (this doesn't imply the
        // session is connected !), or a non-zero value corresponding to the
        // 'bmqt::GenericResult::Enum' enum values otherwise.  The behavior is
        // undefined if this method is called on an already started 'Session'.

    void stop() BSLS_KEYWORD_OVERRIDE;
        // Gracefully disconnect from the BlazingMQ broker and stop the
        // operation of this 'Session'.  This method blocks waiting for all
        // already invoked event handlers to exit and all session-related
        // operations to be finished.  No other method but 'start()' may be
        // used after this method returns.  This method must *NOT* be called if
        // the session is in synchronous mode (i.e., not using the
        // EventHandler), 'stopAsync()' should be called in this case.

    void stopAsync() BSLS_KEYWORD_OVERRIDE;
        // Disconnect from the BlazingMQ broker and stop the operation of this
        // 'Session'.  This method returns without blocking and neither enforce
        // nor waits for any already started session-related operation to be
        // finished.  No method may be used after this method returns.

    void finalizeStop() BSLS_KEYWORD_OVERRIDE;
        // **DEPRECATED**
        // This method is only to be used if the session is in synchronous mode
        // (i.e., not using the EventHandler): it must be called once all
        // threads getting events with 'nextEvent()' have been joined.

    MessageEventBuilder createMessageEventBuilder();
        // Return a MessageEventBuilder that can be used to build message event
        // for posting on this session.  The behavior is undefined unless the
        // session has been successfully started.  Note that lifetime of the
        // returned object is bound by the lifetime of this session instance
        // (i.e., returned instance cannot outlive this session instance).
        // Also note that the 'MessageEventBuilder' objects are pooled, so this
        // operation is cheap, and 'MessageEventBuilder' can be obtained on
        // demand and kept on the stack.
        // DEPRECATED: Use the 'loadMessageEventBuilder instead.  This
        //             method will be marked as 'BSLS_ANNOTATION_DEPRECATED' in
        //             future release of libbmq.

    void loadMessageEventBuilder(MessageEventBuilder *builder)
        // Load into the specified 'builder' an instance of
        // 'bmqa::MessageEventBuilder' that can be used to build message event
        // for posting on this session.  The behavior is undefined unless the
        // session has been successfully started and 'builder' is non-null.
        // Note that lifetime of the loaded object is bound by the lifetime of
        // this session instance (i.e., loaded instance cannot outlive this
        // session instance).  Also note that the 'MessageEventBuilder' objects
        // are pooled, so this operation is cheap, and 'MessageEventBuilder'
        // can be obtained on demand and kept on the stack.

    void loadConfirmEventBuilder(ConfirmEventBuilder *builder)
        // Load into the specified 'builder' an instance of
        // 'bmqa::ConfirmEventBuilder' that can be used to build a batch of
        // CONFIRM messages for sending to the broker.  The behavior is
        // undefined unless the session has been successfully started and
        // 'builder' is non-null.  Note that the lifetime of the loaded object
        // is bound by the lifetime of this session instance (i.e., loaded
        // instance cannot outlive this session instance).

    void loadMessageProperties(MessageProperties *buffer)
        // Load into the specified 'buffer' an instance of 'MessageProperties'
        // that can be used to specify and associate properties while building
        // a 'bmqa::Message'.  The behavior is undefined unless the session has
        // been successfully started and 'buffer' is non-null.  Note that
        // lifetime of the loaded object is bound by the lifetime of this
        // session instance (i.e., loaded instance cannot outlive this session
        // instance).

    ///Queue management
    int getQueueId(QueueId          *queueId,
                   const bmqt::Uri&  uri) const BSLS_KEYWORD_OVERRIDE;
        // Load in the specified 'queueId' the queue corresponding to the
        // specified 'uri' and return 0 if such a queue was found, or leave
        // 'queueId' untouched and return a non-zero value if no queue
        // corresponding to 'uri' is currently open.

    int getQueueId(QueueId                    *queueId,
                   const bmqt::CorrelationId&  correlationId) const
        // Load in the specified 'queueId' the queue corresponding to the
        // specified 'correlationId' and return 0 if such a queue was found, or
        // leave 'queueId' untouched and return a non-zero value if no queue
        // corresponding to 'correlationId' is currently open.

    int openQueue(QueueId                   *queueId,
                  const bmqt::Uri&           uri,
                  bsls::Types::Uint64        flags,
                  const bmqt::QueueOptions&  options = bmqt::QueueOptions(),
                  const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // DEPRECATED: Use the 'openQueueSync(QueueId *queueId...)' instead.
        //             This method will be marked as
        //             'BSLS_ANNOTATION_DEPRECATED' in future release of
        //             libbmq.

    openQueueSync(QueueId                   *queueId,
                  const bmqt::Uri&           uri,
                  bsls::Types::Uint64        flags,
                  const bmqt::QueueOptions&  options = bmqt::QueueOptions(),
                  const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // Open the queue having the specified 'uri' with the specified 'flags'
        // (a combination of the values defined in 'bmqt::QueueFlags::Enum'),
        // using the specified 'queueId' to correlate events related to that
        // queue.  The object 'queueId' referring to is modified, so the
        // 'queueId' represents the actual queue uri, flags and options.
        // Return a result providing the status and context of the operation.
        // Use the optionally specified 'options' to configure some advanced
        // settings.  Note that this operation fails if 'queueId' is
        // non-unique.  If the optionally specified 'timeout' is not populated,
        // use the one defined in the session options.  This operation will
        // block until either success, failure, or timing out happens.
        // THREAD: Note that calling this method from the event processing
        //         thread(s) (i.e., from the EventHandler callback, if
        //         provided) *WILL* lead to a *DEADLOCK*.

    int openQueue(const QueueId&            queueId,
                  const bmqt::Uri&          uri,
                  bsls::Types::Uint64       flags,
                  const bmqt::QueueOptions& options = bmqt::QueueOptions(),
                  const bsls::TimeInterval& timeout = bsls::TimeInterval());
        // DEPRECATED: Use the 'openQueue(QueueId *queueId...)' instead.  This
        //             method will be marked as 'BSLS_ANNOTATION_DEPRECATED' in
        //             future release of libbmq.

    openQueueAsync(QueueId                   *queueId,
                   const bmqt::Uri&           uri,
                   bsls::Types::Uint64        flags,
                   const bmqt::QueueOptions&  options = bmqt::QueueOptions(),
                   const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // DEPRECATED: Use the 'openQueueAsync(...)' with callback flavor
        //             instead.  This method will be marked as
        //             'BSLS_ANNOTATION_DEPRECATED' in future release of
        //             libbmq.

    openQueueAsync(QueueId                   *queueId,
                   const bmqt::Uri&           uri,
                   bsls::Types::Uint64        flags,
                   const OpenQueueCallback&   callback,
                   const bmqt::QueueOptions&  options  = bmqt::QueueOptions(),
                   const bsls::TimeInterval&  timeout  = bsls::TimeInterval())
        // Asynchronously open the queue having the specified 'uri' with the
        // specified 'flags' (a combination of the values defined in
        // 'bmqt::QueueFlags::Enum'), using the specified 'queueId' to
        // correlate events related to that queue and the optionally specified
        // 'options' to configure some advanced settings.  The object 'queueId'
        // referring to is modified, so the 'queueId' represents the actual
        // queue uri, flags and options.  The result of the operation is
        // communicated to the specified 'callback' via a
        // 'bmqa::OpenQueueStatus', providing the status and context of the
        // requested operation.  Note that this operation fails if 'queueId' is
        // non-unique.  If the optionally specified 'timeout' is not populated,
        // use the one defined in the session options.
        // THREAD: The 'callback' will *ALWAYS* be invoked from the
        //         EventHandler thread(s) (or if a SessionEventHandler was not
        //         specified, from the thread invoking 'nextEvent').

    configureQueue(QueueId                   *queueId,
                   const bmqt::QueueOptions&  options,
                   const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // DEPRECATED: Use the 'configureQueueSync(QueueId *queueId...)
        //             instead.  This method will be marked as
        //             'BSLS_ANNOTATION_DEPRECATED' in future release of
        //             libbmq.

                    const QueueId             *queueId,
                    const bmqt::QueueOptions&  options,
                    const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // Configure the queue identified by the specified 'queueId' using the
        // specified 'options' and return a result providing the status and
        // context of the operation.  If the optionally specified 'timeout' is
        // not populated, use the one defined in the session options.  This
        // operation returns error if there is a pending configure for the same
        // queue.  This operation will block until either success, failure, or
        // timing out happens.
        // Note that the following 'bmqt::QueueOptions' fields cannot be
        // reconfigured after the queue has been opened:
        //   - suspendsOnBadHostHealth
        // Attempts to reconfigure these fields will yield an 'e_NOT_SUPPORTED'
        // error code.
        // THREAD: Note that calling this method from the event processing
        //         thread(s) (i.e., from the EventHandler callback, if
        //         provided) *WILL* lead to a *DEADLOCK*.

    int configureQueueAsync(
                    QueueId                   *queueId,
                    const bmqt::QueueOptions&  options,
                    const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // DEPRECATED: Use the 'configureQueueAsync(...)' with callback flavor
        //             instead.  This method will be marked as
        //             'BSLS_ANNOTATION_DEPRECATED' in future release of
        //             libbmq.

    void configureQueueAsync(
               const QueueId                 *queueId,
               const bmqt::QueueOptions&      options,
               const ConfigureQueueCallback&  callback,
               const bsls::TimeInterval&      timeout  = bsls::TimeInterval())
        // Asynchronously configure the queue identified by the specified
        // 'queueId' using the specified 'options' to configure some advanced
        // settings.  The result of the operation is communicated to the
        // specified 'callback' via a 'bmqa::ConfigureQueueStatus', providing
        // the status and context of the requested operation.  If the
        // optionally specified 'timeout' is not populated, use the one defined
        // in the session options.
        // Note that the following 'bmqt::QueueOptions' fields cannot be
        // reconfigured after the queue has been opened:
        //   - suspendsOnBadHostHealth
        // Attempts to reconfigure these fields will yield an 'e_NOT_SUPPORTED'
        // error code.
        // THREAD: The 'callback' will *ALWAYS* be invoked from the
        //         EventHandler thread(s) (or if a SessionEventHandler was not
        //         specified, from the thread invoking 'nextEvent').

    int closeQueue(QueueId                   *queueId,
                   const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // DEPRECATED: Use the 'closeQueueSync(QueueId *queueId...) instead.
        //             This method will be marked as
        //             'BSLS_ANNOTATION_DEPRECATED' in future release of
        //             libbmq.

    closeQueueSync(const QueueId             *queueId,
                   const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // Close the queue identified by the specified 'queueId' and return a
        // result providing the status and context of the operation.  If the
        // optionally specified 'timeout' is not populated, use the one defined
        // in the session options.  Any outstanding configureQueue request for
        // this 'queueId' will be canceled.  This operation will block until
        // either success, failure, or timing out happens.  Once this method
        // returns, there is guarantee that no more messages and events for
        // this 'queueId' will be received.  Note that successful processing of
        // this request in the broker closes this session's view of the queue;
        // the underlying queue may not be deleted in the broker.  When this
        // method returns, the correlationId associated to the queue is
        // cleared.
        // THREAD: Note that calling this method from the event processing
        //         thread(s) (i.e., from the EventHandler callback, if
        //         provided) *WILL* lead to a *DEADLOCK*.

    int closeQueue(const QueueId&            queueId,
                   const bsls::TimeInterval& timeout = bsls::TimeInterval());
        // DEPRECATED: Use the 'closeQueue(QueueId *queueId...) instead.  This
        //             method will be marked as 'BSLS_ANNOTATION_DEPRECATED' in
        //             future release of libbmq.

    closeQueueAsync(QueueId                   *queueId,
                    const bsls::TimeInterval&  timeout = bsls::TimeInterval())
        // DEPRECATED: Use the 'closeQueueAsync(...)' with callback flavor
        //             instead.  This method will be marked as
        //             'BSLS_ANNOTATION_DEPRECATED' in future release of
        //             libbmq.

    void closeQueueAsync(
                   const QueueId             *queueId,
                   const CloseQueueCallback&  callback,
                   const bsls::TimeInterval&  timeout  = bsls::TimeInterval())
        // Asynchronously close the queue identified by the specified
        // 'queueId'.  Any outstanding configureQueue requests will be
        // canceled.  The result of the operation is communicated to the
        // specified 'callback' via a 'bmqa::CloseQueueStatus', providing the
        // status and context of the operation.  If the optionally specified
        // 'timeout' is not populated, use the one defined in the session
        // options.  Note that successful processing of this request in the
        // broker closes this session's view of the queue; the underlying queue
        // may not be deleted in the broker.  The correlationId associated to
        // the queue remains valid until the 'bmqa::CloseQueueStatus' result
        // has been received and processed by the 'callback', after which it
        // will be cleared and no more messages and events for this 'queueId'
        // will be received.
        // THREAD: The 'callback' will *ALWAYS* be invoked from the
        //         EventHandler thread(s) (or if a SessionEventHandler was not
        //         specified, from the thread invoking 'nextEvent').

    closeQueueAsync(const QueueId&            queueId,
                    const bsls::TimeInterval& timeout = bsls::TimeInterval());
        // DEPRECATED: Use the 'closeQueueAsync(...)' with callback flavor
        //             instead.  This method will be marked as
        //             'BSLS_ANNOTATION_DEPRECATED' in future release of
        //             libbmq.

    ///Queue manipulation
    Event nextEvent(const bsls::TimeInterval& timeout = bsls::TimeInterval())
        // Return the next available event received for this session.  If there
        // is no event available, this method blocks for up to the optionally
        // specified 'timeout' time interval for an event to arrive.  An empty
        // time interval for 'timeout' (the default) indicates that the method
        // should not timeout (the method will not return until the next event
        // is available).  Return a 'bmqa::SessionEvent' of type
        // 'bmqt::SessionEventType::e_TIMEOUT' if a timeout was specified and
        // that timeout expired before any event was received.  Note that this
        // method can only be used if the session is in synchronous mode (ie
        // not using the EventHandler).  The behavior is undefined unless the
        // session was started.

    int post(const MessageEvent& event) BSLS_KEYWORD_OVERRIDE;
        // Asynchronously post the specified 'event' that must contain one or
        // more 'Messages'.  The return value is one of the values defined in
        // the 'bmqt::PostResult::Enum' enum.  Return zero on success and a
        // non-zero value otherwise.  Note that success implies that SDK has
        // accepted the 'event' and will eventually deliver it to the broker.
        // The behavior is undefined unless the session was started.

    int confirmMessage(const Message& message) BSLS_KEYWORD_OVERRIDE;
        // Asynchronously confirm the receipt of the specified 'message'.  This
        // indicates that the application is done processing the message and
        // that the broker can safely discard it from the queue according to
        // the retention policy set up for that queue.  Return 0 on success,
        // and a non-zero value otherwise.  Note that success implies that SDK
        // has accepted the 'message' and will eventually send confirmation
        // notification to the broker.

    int confirmMessage(const MessageConfirmationCookie& cookie)
        // Asynchronously confirm the receipt of the message with which the
        // specified 'cookie' is associated.  This indicates that the
        // application is done processing the message and that the broker can
        // safely discard it from the queue according to the retention policy
        // set up for that queue.  Return 0 on success, and a non-zero value
        // otherwise.  Note that success implies that SDK has accepted the
        // 'message' and will eventually send confirmation notification to the
        // broker.

    int confirmMessages(ConfirmEventBuilder *builder) BSLS_KEYWORD_OVERRIDE;
        // Asynchronously confirm the receipt of the batch of CONFIRM messages
        // contained in the specified 'builder'.  This indicates that the
        // application is done processing all of the messages and that the
        // broker can safely discard them from the queue according to the
        // retention policy set up for that queue.  The return value is one of
        // the values defined in the 'bmqt::GenericResult::Enum' enum.  Note
        // that in case of success, the instance pointed by the 'builder' will
        // be reset.  Also note that success implies that SDK has accepted all
        // of the messages in 'builder' and will eventually send confirmation
        // notification to the broker, whereas failure implies that none of the
        // messages in 'builder' were accepted.  Behavior is undefined unless
        // 'builder' is non-null.

    ///Debugging related
    int configureMessageDumping(const bslstl::StringRef& command)
        // Configure this session instance to dump messages to the installed
        // logger at 'ball::Severity::INFO' level according to the specified
        // 'command' that should adhere to the following pattern:
        //   IN|OUT|PUSH|ACK|PUT|CONFIRM ON|OFF|100|10s
        // where each token has a specific meaning:
        //: o !IN!      : incoming ('PUSH' and 'ACK') events
        //: o !OUT!     : outgoing ('PUT' and 'CONFIRM') events
        //: o !PUSH!    : incoming 'PUSH' events
        //: o !ACK!     : incoming 'ACK' events
        //: o !PUT!     : outgoing 'PUT' events
        //: o !CONFIRM! : outgoing 'CONFIRM' events
        //: o !ON!      : turn on message dumping until explicitly turned off
        //: o !OFF!     : turn off message dumping
        //: o !100!     : turn on message dumping for the next 100 messages
        //: o !10s!     : turn on message dumping for the next 10 seconds
        // Above, the numerical values '100' and '10' are just for illustration
        // purposes, and application can choose an appropriate positive numeric
        // value for them.  Also, pattern is case-insensitive.  Return zero if
        // 'command' is valid and message dumping has been configured, non-zero
        // value otherwise.  The behavior is undefined unless the session has
        // been started.

    void *impl();
        // Do *NOT* use.  Internal function, reserved for BlazingMQ internal
        // usage.

// ============================================================================
//                             INLINE DEFINITIONS
// ============================================================================

                               // -------------
                               // class Session
                               // -------------

Session::confirmMessage(const Message& message)
    return confirmMessage(message.confirmationCookie());

    return &d_impl;

}  // close package namespace
}  // close enterprise namespace
