public final class Session extends Object implements AbstractSession
This component provides access to a message queue broker. The broker manages named, persistent queues of messages. This broker allows a client to open queues, post messages to them, or retrieve messages from them. All of these operations take place within the context of the session opened by the client application.
Messages received from a broker are communicated to the application by the session associated
with that broker in the form of events (see Event
). Events can be of
two different types: (1) Messages and queue status events (QueueEvent
),
or (2) Session status events (SessionEvent
).
A Session
dispatches events to the application in an asynchronous mode. The applicaton
must supply a concrete SessionEventHandler
object at construction time.
The concrete SessionEventHandler
provided by the application must implement the SessionEventHandler.handleSessionEvent()
method, which will be called by the Session
every time a session event is received.
Note that the concrete SessionEventHandler
may implement a handler for the specific
session event type and it will be called by the Session
for that event instead of the
default handleSessionEvent
.
A Session
allows start
and stop
operations in synchronous or
asynchronous version for the convenience of the programmer.
By default a Session
connects to the local broker, which in turn may connect to a
remote cluster based on configuration.
After a Session
started, the application has to open one or several queues in read
and/or write mode (see AbstractSession.getQueue()
).
Session
object is a heavy object representing the negotiated TCP session with the
broker, and the entire associated state (opened queues, etc.) Therefore, there should only be
one session per task, created at task startup and closed at task shutdown. It is extremely
inefficient, and an abuse of resource to create a session ad-hoc every time a message needs to be
posted.
Session
establishes a communication with a broker service using TCP/IP. Each Session
object must be constructed with a SessionOptions
object, which
provides the necessary information to connect to the broker. In particular, the SessionOptions
object must specify the IP address and port needed to connect to the broker. The
SessionOptions
object may also provide extra parameters for tuning the TCP connection
behavior.
Note that in most cases the user does not need to explicitly construct a SessionOptions
object: the default factory method SessionOptions.createDefault()
creates an instance that will connect to the
broker service on the local machine using the standard port.
A Session
object is created in an unconnected state. The start
or startAsync
method must be called to connect to the broker. Note that start
method is
blocking, and returns either after connection to broker has been established (success), or after
specified timeout (failure). startAsync
method, as the name suggests, connects to the
broker asynchronously (i.e., it returns immediately), and the result of the operation is notified
via SessionEvent.StartStatus
session event.
When the Session
is no longer needed, the application should call the stop
(blocking) or stopAsync
(non-blocking) method to disconnect from the broker. Note that
the session can be restarted with a call to start
or startAsync
once it has been
fully stopped. To fully shut down the session linger
should be called after stop, and
after that the session restart is not possible any more.
Session
will
automatically try to reconnect periodically. The Session
will also notify the application
of the event of losing the connection via SessionEvent.ConnectionLost
session event.
Once the connection has been re-established with the broker (as a result of one of the
periodic reconnection attempts), the Session
will notify the application via SessionEvent.Reconnected
session event. After the connection re-establishment,
the Session
will attempt to reopen the queues that were in OPEN
state prior to
connection loss. The Session
will notify the application of the result of reopen
operation via QueueControlEvent.ReopenQueueResult
for each queue. Note
that a reopen operation on a queue may fail (due to broker issue, machine issue, etc), so the
application must keep track on these session events, and stop posting on a queue that failed to
reopen.
After all reopen operations are complete and application has been notified with all ReopenQueueResult
events, the Session
delivers a SessionEvent.StateRestored
session event to the application.
Note that if there were no opened queues prior to connection loss then the Session
will send StateRestored
event immediately after Reconnected
session event.
Session
start it, stop and shutdown it.
void runSession() { SessionOptions sessionOptions = SessionOptions.builder() .setBrokerUri(URI.create(tcp://localhost:30114") .build(); AbstractSession session = new Session(sessionOptions, null); try { session.start(Duration.ofSeconds(10)); System.out.println("Session started"); // Open queue in READ or WRITE or READ/WRITE mode, and receive or // post messages, etc. session.stop(Duration.ofSeconds(10)); } catch (BMQException e) { System.out.println("Operation error: " + e); } finally { session.linger(); } }This example can be simplified because the constructor for
Session
uses a default SessionOptions
object that will connect to the local broker service. The example may be
rewritten as follow:
void runSession() { AbstractSession session = new Session(null); try { session.start(Duration.ofSeconds(10)); System.out.println("Session started"); // Open queue in READ or WRITE or READ/WRITE mode, and receive or // post messages, etc. session.stop(Duration.ofSeconds(10)); } catch (BMQException e) { System.out.println("Operation error: " + e); } finally { session.linger(); } }
Session
is created with non null SessionEventHandler
then it will be used to notify the application about session events like results of Session.startAsync()
and Session.stopAsync()
operations, and connection statuses like
SessionEvent.ConnectionLost
, SessionEvent.Reconnected
, SessionEvent.StateRestored
. Each event may be handled in its special callback if the application
implements related method, e.g. SessionEventHandler.handleStartStatusSessionEvent()
, or it can use the default implementation
that calls SessionEventHandler.handleSessionEvent()
, for each session event without user defined handler.
SessionEventHandler
Session session = new Session(new SessionEventHandler() { public void handleSessionEvent(SessionEvent event) { switch (event.type()) { case START_STATUS_SESSION_EVENT: GenericResult result = ((SessionEvent.StartStatus)event).result(); if (result.isSuccess()) { System.out.println("The connection to the broker is established"); openQueues(); startPostingToQueues(); } else if (result.isTimeout()) { System.out.println("The connection to the broker has timed out"); } break; case STOP_STATUS_SESSION_EVENT: GenericResult result = ((SessionEvent.StopStatus)event).result(); if (result.isSuccess()) { System.out.println("The connection to the broker is closed gracefully"); } break; case CONNECTION_LOST_SESSION_EVENT: // The connection to the broker dropped. Stop posting to the queue. stopPostingToQueues(); break; case RECONNECTED_SESSION_EVENT: // The connection to the broker has been restored, but the queue may not be ready yet. break; case STATE_RESTORED_SESSION_EVENT: // The connection to the broker has been restored and all queues // have been re-opened. Resume posting to the queue. resumePostingToQueues(); break; cae UNKNOWN_SESSION_EVENT: default: System.out.println("Unexpected session event: " + event); } } }); session.startAsync(Duration.ofSeconds(10));And in the following version session start status event will be handled in a separate callback while all the other session events will be passed to
handleSessionEvent
Session session = new Session( new SessionEventHandler() { public void handleSessionEvent(SessionEvent event) { System.out.println("Other session event: " + event); } public void handleStartStatusSessionEvent(SessionEvent.StartStatus event) { System.out.println("Session start status: " + event.result()); if (event.result().isSuccess()) { System.out.println("The connection to the broker is established"); } } }); session.startAsync(Duration.ofSeconds(10));Note that after the
Session
is associated with some session event handler, this
association cannot be changed or canceled. The event handler will be used for processing events
until the Session.linger()
is called.
Session
has been created and started, the application can use it to open queues
for producing and/or consuming messages. A queue is associated with a domain. Domain metadata
must be deployed in the BlazingMQ infrastructure prior to opening queues under that domain,
because opening a queue actually loads the metadata deployed for the associated domain.
The metadata associated with a domain defines various parameters like maximum queue size and capacity, persistent policy, routing policy, etc.
Queue are identified by URIs (Unified Resource Identifiers) that must follow the BlazingMQ
syntax, manipulated as Uri
objects. A queue URI is typically formatted
as follows:
bmq://my.domain/my.queueNote that domain names are unique in BlazingMQ infrastructure, which makes a fully qualified queue URI unique too.
Queues in BlazingMQ infrastructure are created by applications on demand. Broker creates a queue when it receives an open-queue request from an application for a queue that does not exist currently.
Application can open a queue by calling AbstractSession.getQueue()
method on a started session. Application must pass appropriate flags
to indicate if it wants to post messages to queue, consume messages from the queue, or both. Also
it has to provide concrete QueueEventHandler
to recieve queue related
events. Depending on the queue flags it also needs to supply concrete AckMessageHandler
it the queue will be used for message posting. This handler
will be called each time when ACK
event comes from the broker. And if the queue will be
used for comsuming the messages then PushMessageHandler
should be
supplied. It will be called on each incoming PushMessage
. Once the
application obtainns the Queue
interface it can call Queue.open()
or Queue.openAsync()
.
Note that open
is a blocking method, and returns after the queue has been successfully
opened (success) or after specified timeout has expired (failure). openAsync
method, as
the name suggests, is non blocking, and the result of the operation is notified via QueueControlEvent.OpenQueueResult
session event.
Queue.createPutMessage(java.nio.ByteBuffer...)
to create a message and post to the queue.
// Session creation, startup logic and exception handling elided for brevity
Uri queueUri = new Uri("bmq://my.domain/my.queue");
// Application can implement concrete QueueMessageHandler and AckMessageHandler,
// or e.g. use a functional interface feature with a standard collection.
ArrayList<QueueControlEvent> queueEvents = new ArrayList();
ArrayList<AckMessage> ackMessages = new ArrayList();
long flags = QueueFlags.setWriter(0);
flags = QueueFlags.setAck(flags);
Queue myQueue = session.getQueue(
queueUri,
flags, // Setup queue flag for producer
queueEvents::add, // QueueEventHandler is a functional interface
ackMessages::add, // AckMessageHandler is a functional interface
null); // PushMessageHandler is null since we don't want
// to receive messages
myQueue.open(QueueOptions.createDefault(), Duration.ofSeconds(5));
Note that apart from QueueFlags.WRITE
flag, QueueFlags.ACK
flag has been passed to getQueue
method above. This
indicates that application is interested in receiving ACK
notification for each message
it posts to the queue, irrespective of whether or not the message was successfully received by
the broker and posted to the queue. Also note that if the queue has QueueFlags.ACK
} flag
then every PUT
message that is going to be posted via this queue should have a valid
CorrelationId
, see Queue.createPutMessage(java.nio.ByteBuffer...)
.
Once the queue has been successfully opened for writing, messages can be posted to the queue for consumption by interested applications.
// Prepare user payload String userMessage = "Hello!"; ByteBuffer byteBuffer = ByteBuffer.wrap(userMessage.getBytes()); // Get PUT message interface from the opened queue PutMessage msg = myQueue.createPutMessage(byteBuffer); // Add unique correlation ID. The broker will send ACK in response to this message. // The ID can be used to bind this message with the corresponding ACK. msg.setAutoCorrelationId(); // Add some optional properties MessageProperties mp = msg.messageProperties(); mp.setPropertyAsString("myId", "abcd-efgh-ijkl"); mp.setPropertyAsInt64("timestamp", 123456789L); // Post this message to the queue myQueue.post(msg);To post multiple messages at once the application may use
Queue.pack()
and Queue.flush()
methods:
for (int i = 0; i < 10; i++) {
PutMessage msg = myQueue.createPutMessage(payload);
// ... Set CorrelationID message properties
myQueue.pack(msg);
}
// Now post the messages all together
myQueue.flush();
Queue.close()
or Queue.closeAsync()
method. Note that closing a queue closes
an application's "view" on the queue, and may not lead to queue deletion in the broker. A Session
does not expose any method to explicitly delete a queue.
Note that close
is a blocking method and returns after the queue has been successfully
closed (success) or after specified timeout has expired (failure). closeAsync
, as the
name suggests, is a non-blocking method, and result of the operation is notified via QueueControlEvent.CloseQueueResult
queue event.
Constructor and Description |
---|
Session(SessionEventHandler eh)
Creates a new session object with default session options.
|
Session(SessionOptions so,
SessionEventHandler eh)
Creates a new session object.
|
Modifier and Type | Method and Description |
---|---|
Queue |
getQueue(Uri uri,
long flags,
QueueEventHandler handler,
AckMessageHandler ackHandler,
PushMessageHandler pushHandler)
Creates a queue representation.
|
boolean |
isStarted()
Checks if the session is started.
|
void |
linger()
Shutdowns all connection and event handling threads.
|
void |
start(Duration timeout)
Connects to the BlazingMQ broker and start the message processing.
|
void |
startAsync(Duration timeout)
Connects to the BlazingMQ broker and start the message processing
|
void |
stop(Duration timeout)
Gracefully disconnects from the BlazingMQ broker and stop the operation of this session.
|
void |
stopAsync(Duration timeout)
Gracefully disconnects from the BlazingMQ broker and stop the operation of this session.
|
public Session(SessionEventHandler eh)
eh
- session event handler callback interfaceSessionOptions
,
SessionEventHandler
public Session(SessionOptions so, SessionEventHandler eh)
so
- specified optionseh
- session event handler callback interfaceIllegalArgumentException
- if the specified session options or event handler is nullSessionOptions
,
SessionEventHandler
public void start(Duration timeout)
This method blocks until either the session is connected to the broker, fails to connect, or the operation times out.
start
in interface AbstractSession
timeout
- start timeout valueBMQException
- if start attempt failedpublic void startAsync(Duration timeout)
This method returns without blocking. The result of the operation is communicated with a session event. If the optionally specified 'timeout' is not populated, use the one defined in the session options.
startAsync
in interface AbstractSession
timeout
- start timeout valueBMQException
- in case of failurepublic boolean isStarted()
isStarted
in interface AbstractSession
public void stop(Duration timeout)
This method blocks waiting for all already invoked event handlers to exit and all session-related operations to be finished.
stop
in interface AbstractSession
timeout
- stop timeout valueBMQException
- if stop attempt failedpublic void stopAsync(Duration timeout)
This method returns without blocking. The result of the operation is communicated with a
session event. If the optionally specified timeout
is not populated, use the one
defined in the session options.
stopAsync
in interface AbstractSession
timeout
- stop timeout valueBMQException
- in case of failurepublic void linger()
This method must be called when the session is stopped. No other method may be used after this method returns.
linger
in interface AbstractSession
public Queue getQueue(Uri uri, long flags, QueueEventHandler handler, AckMessageHandler ackHandler, PushMessageHandler pushHandler)
Returned Queue
may be in any state.
getQueue
in interface AbstractSession
uri
- URI of this queue, immutableflags
- a combination of the values defined in QueueFlags
handler
- queue event handler callback interface, must be non-null for async operations
and health sensitive queuesackHandler
- callback handler for incoming ACK events, can be null if only consumerpushHandler
- callback handler for incoming PUSH events, can be null if only producerIllegalArgumentException
- in case of a wrong combination of the queue flags and the
handlersBMQException
- in case of other failuresUri
,
QueueFlags
,
QueueEventHandler
,
AckMessageHandler
,
PushMessageHandler
Copyright © 2023 Bloomberg L.P.. All rights reserved.