public final class Session extends Object implements AbstractSession
This component provides access to a message queue broker. The broker manages named, persistent queues of messages. This broker allows a client to open queues, post messages to them, or retrieve messages from them. All of these operations take place within the context of the session opened by the client application.
Messages received from a broker are communicated to the application by the session associated
with that broker in the form of events (see Event). Events can be of
two different types: (1) Messages and queue status events (QueueEvent),
or (2) Session status events (SessionEvent).
A Session dispatches events to the application asynchronously. The application
must supply a concrete SessionEventHandler object at construction time.
The concrete SessionEventHandler provided by the application must implement the SessionEventHandler.handleSessionEvent() method, which will be called by the Session
every time a session event is received.
Note that the concrete SessionEventHandler may implement a handler for the specific
session event type and it will be called by the Session for that event instead of the
default handleSessionEvent.
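The fallback from a type-specific handler to the default one can be pictured with Java default methods. The following is only a minimal sketch of that idea: `ToyEventHandler`, `Kind`, and `dispatch` are hypothetical names invented for illustration and are not part of the SDK, whose actual dispatch logic is not shown on this page.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the SDK's types.
final class HandlerDispatchSketch {

    /** Toy event kinds. */
    enum Kind { START_STATUS, CONNECTION_LOST }

    interface ToyEventHandler {
        // Catch-all callback that every handler must implement.
        void handleEvent(Kind kind);

        // Type-specific callback. By default it forwards to the catch-all,
        // so overriding it is optional.
        default void handleStartStatus() {
            handleEvent(Kind.START_STATUS);
        }
    }

    /** Routes an event to the most specific callback available. */
    static void dispatch(ToyEventHandler handler, Kind kind) {
        if (kind == Kind.START_STATUS) {
            handler.handleStartStatus();
        } else {
            handler.handleEvent(kind);
        }
    }

    /** Dispatches both kinds to a handler that overrides only the start callback. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyEventHandler handler = new ToyEventHandler() {
            @Override
            public void handleEvent(Kind kind) {
                log.add("default:" + kind);
            }

            @Override
            public void handleStartStatus() {
                log.add("specific:START_STATUS");
            }
        };
        dispatch(handler, Kind.START_STATUS);
        dispatch(handler, Kind.CONNECTION_LOST);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch a handler that does not override `handleStartStatus()` would receive the start event through `handleEvent` instead, which mirrors the behavior described above.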
A Session provides both synchronous (blocking) and asynchronous (non-blocking)
versions of the start and stop operations for the convenience of the programmer.
By default a Session connects to the local broker, which in turn may connect to a
remote cluster based on configuration.
After a Session has started, the application has to open one or several queues in read
and/or write mode (see AbstractSession.getQueue()).
A Session object is a heavyweight object representing the negotiated TCP session with the
broker and the entire associated state (opened queues, etc.). Therefore, there should be only
one session per task, created at task startup and closed at task shutdown. Creating a session
ad hoc every time a message needs to be posted is extremely inefficient and an abuse of
resources.
A Session establishes communication with a broker service using TCP/IP. Each Session object must be constructed with a SessionOptions object, which
provides the information necessary to connect to the broker. In particular, the SessionOptions object must specify the IP address and port needed to connect to the broker. The
SessionOptions object may also provide extra parameters for tuning the TCP connection
behavior.
Note that in most cases the user does not need to explicitly construct a SessionOptions object: the default factory method SessionOptions.createDefault() creates an instance that will connect to the
broker service on the local machine using the standard port.
A Session object is created in an unconnected state. The start or startAsync method must be called to connect to the broker. Note that the start method is
blocking and returns either after the connection to the broker has been established (success) or after
the specified timeout has expired (failure). The startAsync method, as the name suggests, connects to the
broker asynchronously (i.e., it returns immediately), and the result of the operation is reported
via a SessionEvent.StartStatus session event.
When the Session is no longer needed, the application should call the stop
(blocking) or stopAsync (non-blocking) method to disconnect from the broker. Note that
the session can be restarted with a call to start or startAsync once it has been
fully stopped. To shut the session down completely, linger should be called after stop;
after that, the session cannot be restarted.
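The lifecycle rules above (start, stop, optional restart, final linger) can be summarized as a small state machine. The sketch below is a toy model under stated assumptions: the `State` names and the choice to throw `IllegalStateException` on an invalid transition are hypothetical and are not taken from the SDK, which may report such errors differently.

```java
// Hypothetical model for illustration only; this is not the SDK's implementation.
final class SessionLifecycleSketch {

    enum State { CREATED, STARTED, STOPPED, LINGERED }

    private State state = State.CREATED;

    State state() {
        return state;
    }

    /** Connect: allowed for a new session or a fully stopped one (restart). */
    void start() {
        require(state == State.CREATED || state == State.STOPPED, "start");
        state = State.STARTED;
    }

    /** Disconnect: allowed only while started. */
    void stop() {
        require(state == State.STARTED, "stop");
        state = State.STOPPED;
    }

    /** Final shutdown: allowed only when not started; nothing is allowed afterwards. */
    void linger() {
        require(state == State.CREATED || state == State.STOPPED, "linger");
        state = State.LINGERED;
    }

    private void require(boolean allowed, String operation) {
        if (!allowed) {
            throw new IllegalStateException(operation + " is not allowed in state " + state);
        }
    }

    /** Runs start, stop, restart, stop, linger, then checks that a further start is rejected. */
    static boolean restartAfterLingerIsRejected() {
        SessionLifecycleSketch session = new SessionLifecycleSketch();
        session.start();
        session.stop();
        session.start();   // restarting after a full stop is fine
        session.stop();
        session.linger();
        try {
            session.start();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("restart rejected after linger: " + restartAfterLingerIsRejected());
    }
}
```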
If the connection to the broker is lost, the Session will
automatically try to reconnect periodically. The Session will also notify the application
of the connection loss via a SessionEvent.ConnectionLost
session event.
Once the connection with the broker has been re-established (as a result of one of the
periodic reconnection attempts), the Session will notify the application via a SessionEvent.Reconnected session event. After the connection is re-established,
the Session will attempt to reopen the queues that were in the OPEN state prior to
the connection loss. The Session will notify the application of the result of the reopen
operation via a QueueControlEvent.ReopenQueueResult for each queue. Note
that a reopen operation on a queue may fail (due to a broker issue, machine issue, etc.), so the
application must keep track of these events and stop posting to a queue that failed to
reopen.
After all reopen operations are complete and the application has been notified with all ReopenQueueResult events, the Session delivers a SessionEvent.StateRestored session event to the application.
Note that if there were no opened queues prior to the connection loss, the Session
will send the StateRestored event immediately after the Reconnected session event.
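One way an application might act on this event sequence is to keep a small "posting gate" that records which queues are currently safe to post to. The sketch below is self-contained and uses plain strings and booleans in place of the SDK's event types; the class and method names are hypothetical, and wiring them to real SessionEvent and QueueControlEvent callbacks is left to the application.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the SDK.
final class ReconnectGateSketch {

    // Queues the application currently believes are open for posting.
    private final Set<String> writable = new HashSet<>();

    // False between a connection loss and the matching state-restored event.
    private boolean stateRestored = true;

    void onQueueOpened(String queueUri) {
        writable.add(queueUri);
    }

    void onConnectionLost() {
        stateRestored = false;
    }

    void onReconnected() {
        // Nothing to do: the queues may not have been reopened yet.
    }

    void onReopenQueueResult(String queueUri, boolean success) {
        if (!success) {
            writable.remove(queueUri);   // stop posting to a queue that failed to reopen
        }
    }

    void onStateRestored() {
        stateRestored = true;
    }

    boolean canPost(String queueUri) {
        return stateRestored && writable.contains(queueUri);
    }

    public static void main(String[] args) {
        ReconnectGateSketch gate = new ReconnectGateSketch();
        gate.onQueueOpened("bmq://my.domain/queue.a");
        gate.onQueueOpened("bmq://my.domain/queue.b");
        gate.onConnectionLost();
        gate.onReconnected();
        gate.onReopenQueueResult("bmq://my.domain/queue.a", true);
        gate.onReopenQueueResult("bmq://my.domain/queue.b", false);
        gate.onStateRestored();
        System.out.println("queue.a: " + gate.canPost("bmq://my.domain/queue.a"));
        System.out.println("queue.b: " + gate.canPost("bmq://my.domain/queue.b"));
    }
}
```

The gate deliberately ignores the reconnected event, since posting should resume only after the state-restored event arrives.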
The following example illustrates how to create a Session, start it, stop it, and shut it down.

    void runSession() {
        SessionOptions sessionOptions = SessionOptions.builder()
                .setBrokerUri(URI.create("tcp://localhost:30114"))
                .build();
        AbstractSession session = new Session(sessionOptions, null);
        try {
            session.start(Duration.ofSeconds(10));
            System.out.println("Session started");
            // Open queue in READ or WRITE or READ/WRITE mode, and receive or
            // post messages, etc.
            session.stop(Duration.ofSeconds(10));
        } catch (BMQException e) {
            System.out.println("Operation error: " + e);
        } finally {
            session.linger();
        }
    }

This example can be simplified because the single-argument constructor for Session uses a default SessionOptions object that will connect to the local broker service. The example may be
rewritten as follows:

    void runSession() {
        AbstractSession session = new Session(null);
        try {
            session.start(Duration.ofSeconds(10));
            System.out.println("Session started");
            // Open queue in READ or WRITE or READ/WRITE mode, and receive or
            // post messages, etc.
            session.stop(Duration.ofSeconds(10));
        } catch (BMQException e) {
            System.out.println("Operation error: " + e);
        } finally {
            session.linger();
        }
    }

If the Session is created with a non-null SessionEventHandler,
the handler will be used to notify the application about session events such as the results of Session.startAsync() and Session.stopAsync() operations, and connection statuses such as
SessionEvent.ConnectionLost, SessionEvent.Reconnected, and SessionEvent.StateRestored. Each event may be handled in its own callback if the application
implements the related method, e.g. SessionEventHandler.handleStartStatusSessionEvent(), or the application can rely on the default implementation,
which calls SessionEventHandler.handleSessionEvent() for each session event without a user-defined handler.
The following example handles all session events in a single SessionEventHandler.handleSessionEvent() callback:

    Session session = new Session(new SessionEventHandler() {
        public void handleSessionEvent(SessionEvent event) {
            switch (event.type()) {
                // Braces give each case its own scope for the 'result' variable.
                case START_STATUS_SESSION_EVENT: {
                    GenericResult result = ((SessionEvent.StartStatus) event).result();
                    if (result.isSuccess()) {
                        System.out.println("The connection to the broker is established");
                        openQueues();
                        startPostingToQueues();
                    } else if (result.isTimeout()) {
                        System.out.println("The connection to the broker has timed out");
                    }
                    break;
                }
                case STOP_STATUS_SESSION_EVENT: {
                    GenericResult result = ((SessionEvent.StopStatus) event).result();
                    if (result.isSuccess()) {
                        System.out.println("The connection to the broker is closed gracefully");
                    }
                    break;
                }
                case CONNECTION_LOST_SESSION_EVENT:
                    // The connection to the broker dropped. Stop posting to the queue.
                    stopPostingToQueues();
                    break;
                case RECONNECTED_SESSION_EVENT:
                    // The connection to the broker has been restored, but the queue may not be ready yet.
                    break;
                case STATE_RESTORED_SESSION_EVENT:
                    // The connection to the broker has been restored and all queues
                    // have been re-opened. Resume posting to the queue.
                    resumePostingToQueues();
                    break;
                case UNKNOWN_SESSION_EVENT:
                default:
                    System.out.println("Unexpected session event: " + event);
            }
        }
    });
    session.startAsync(Duration.ofSeconds(10));

In the following version, the session start status event is handled in a separate callback,
while all other session events are passed to handleSessionEvent:

    Session session = new Session(new SessionEventHandler() {
        public void handleSessionEvent(SessionEvent event) {
            System.out.println("Other session event: " + event);
        }

        public void handleStartStatusSessionEvent(SessionEvent.StartStatus event) {
            System.out.println("Session start status: " + event.result());
            if (event.result().isSuccess()) {
                System.out.println("The connection to the broker is established");
            }
        }
    });
    session.startAsync(Duration.ofSeconds(10));

Note that once the Session is associated with a session event handler, this
association cannot be changed or canceled. The event handler will be used for processing events
until Session.linger() is called.
Once the Session has been created and started, the application can use it to open queues
for producing and/or consuming messages. A queue is associated with a domain. Domain metadata
must be deployed in the BlazingMQ infrastructure prior to opening queues under that domain,
because opening a queue actually loads the metadata deployed for the associated domain.
The metadata associated with a domain defines various parameters such as maximum queue size and capacity, persistent policy, routing policy, etc.
Queues are identified by URIs (Uniform Resource Identifiers) that must follow the BlazingMQ
syntax and are manipulated as Uri objects. A queue URI is typically formatted
as follows:

    bmq://my.domain/my.queue

Note that domain names are unique in the BlazingMQ infrastructure, which makes a fully qualified queue URI unique too.
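To make the shape of a queue URI concrete, the sketch below splits the example URI into its domain and queue name using the standard `java.net.URI` class. This is only an illustration of the typical format shown above: the helper names are hypothetical, and the SDK's own Uri class performs its own parsing and validation, which this sketch does not attempt to reproduce.

```java
import java.net.URI;

// Hypothetical helper for illustration only; the SDK's Uri class is not used here.
final class QueueUriSketch {

    /** Returns the domain part, e.g. "my.domain" for "bmq://my.domain/my.queue". */
    static String domainOf(String queueUri) {
        URI uri = parse(queueUri);
        return uri.getHost();
    }

    /** Returns the queue name, e.g. "my.queue" for "bmq://my.domain/my.queue". */
    static String queueNameOf(String queueUri) {
        URI uri = parse(queueUri);
        return uri.getPath().substring(1);   // drop the leading '/'
    }

    private static URI parse(String queueUri) {
        URI uri = URI.create(queueUri);
        String path = uri.getPath();
        if (!"bmq".equals(uri.getScheme()) || uri.getHost() == null
                || path == null || path.length() < 2) {
            throw new IllegalArgumentException("Not a bmq://domain/queue URI: " + queueUri);
        }
        return uri;
    }

    public static void main(String[] args) {
        String queueUri = "bmq://my.domain/my.queue";
        System.out.println("domain = " + domainOf(queueUri));
        System.out.println("queue  = " + queueNameOf(queueUri));
    }
}
```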
Queues in the BlazingMQ infrastructure are created by applications on demand. The broker creates a queue when it receives an open-queue request from an application for a queue that does not currently exist.
An application can open a queue by calling the AbstractSession.getQueue() method on a started session. The application must pass the appropriate flags
to indicate whether it wants to post messages to the queue, consume messages from the queue, or both. It
also has to provide a concrete QueueEventHandler to receive queue-related
events. Depending on the queue flags, it also needs to supply a concrete AckMessageHandler if the queue will be used for posting messages. This handler
will be called each time an ACK event arrives from the broker. If the queue will be
used for consuming messages, then a PushMessageHandler should be
supplied. It will be called for each incoming PushMessage. Once the
application obtains the Queue interface, it can call Queue.open() or Queue.openAsync().
Note that open is a blocking method and returns after the queue has been successfully
opened (success) or after the specified timeout has expired (failure). The openAsync method, as
the name suggests, is non-blocking, and the result of the operation is reported via a QueueControlEvent.OpenQueueResult event.
After the queue is opened for writing, the application can use Queue.createPutMessage(java.nio.ByteBuffer...)
to create a message and post it to the queue.

    // Session creation, startup logic and exception handling elided for brevity
    Uri queueUri = new Uri("bmq://my.domain/my.queue");

    // Application can implement concrete QueueEventHandler and AckMessageHandler,
    // or e.g. use a functional interface feature with a standard collection.
    ArrayList<QueueControlEvent> queueEvents = new ArrayList<>();
    ArrayList<AckMessage> ackMessages = new ArrayList<>();

    long flags = QueueFlags.setWriter(0);
    flags = QueueFlags.setAck(flags);

    Queue myQueue = session.getQueue(
            queueUri,
            flags,             // Setup queue flag for producer
            queueEvents::add,  // QueueEventHandler is a functional interface
            ackMessages::add,  // AckMessageHandler is a functional interface
            null);             // PushMessageHandler is null since we don't want
                               // to receive messages
    myQueue.open(QueueOptions.createDefault(), Duration.ofSeconds(5));

Note that, apart from the QueueFlags.WRITE flag, the QueueFlags.ACK flag has been passed to the getQueue method above. This
indicates that the application is interested in receiving an ACK notification for each message
it posts to the queue, irrespective of whether or not the message was successfully received by
the broker and posted to the queue. Also note that if the queue has the QueueFlags.ACK flag,
then every PUT message that is going to be posted via this queue should have a valid
CorrelationId; see Queue.createPutMessage(java.nio.ByteBuffer...).
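The reason each PUT message needs a correlation ID is that the application has to match an incoming ACK with the message it refers to. A minimal sketch of such bookkeeping is shown below. It uses a plain `long` as the ID and a `String` as the payload purely for illustration; the SDK has its own CorrelationId type, and the class and method names here are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical bookkeeping helper for illustration only; not part of the SDK.
final class AckCorrelationSketch {

    private final AtomicLong nextId = new AtomicLong(1);

    // Messages that have been posted but not yet acknowledged, keyed by ID.
    private final Map<Long, String> pending = new ConcurrentHashMap<>();

    /** Registers a payload as in flight and returns the ID to attach to it. */
    long register(String payload) {
        long id = nextId.getAndIncrement();
        pending.put(id, payload);
        return id;
    }

    /** Called when an ACK arrives; returns the matching payload, or null if unknown. */
    String onAck(long correlationId) {
        return pending.remove(correlationId);
    }

    int inFlight() {
        return pending.size();
    }

    public static void main(String[] args) {
        AckCorrelationSketch tracker = new AckCorrelationSketch();
        long first = tracker.register("Hello!");
        long second = tracker.register("World!");
        System.out.println("ACK for " + second + " matches: " + tracker.onAck(second));
        System.out.println("still in flight: " + tracker.inFlight() + " (ID " + first + ")");
    }
}
```

A thread-safe map is used here on the assumption that posting and ACK handling may run on different threads.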
Once the queue has been successfully opened for writing, messages can be posted to the queue for consumption by interested applications.

    // Prepare user payload
    String userMessage = "Hello!";
    ByteBuffer byteBuffer = ByteBuffer.wrap(userMessage.getBytes());

    // Get PUT message interface from the opened queue
    PutMessage msg = myQueue.createPutMessage(byteBuffer);

    // Add unique correlation ID. The broker will send ACK in response to this message.
    // The ID can be used to bind this message with the corresponding ACK.
    msg.setAutoCorrelationId();

    // Add some optional properties
    MessageProperties mp = msg.messageProperties();
    mp.setPropertyAsString("myId", "abcd-efgh-ijkl");
    mp.setPropertyAsInt64("timestamp", 123456789L);

    // Post this message to the queue
    myQueue.post(msg);

To post multiple messages at once the application may use Queue.pack() and Queue.flush() methods:

    for (int i = 0; i < 10; i++) {
        PutMessage msg = myQueue.createPutMessage(payload);
        // ... Set CorrelationId and message properties
        myQueue.pack(msg);
    }
    // Now post the messages all together
    myQueue.flush();

A queue that is no longer needed can be closed with the Queue.close() or Queue.closeAsync() method. Note that closing a queue closes
an application's "view" on the queue, and may not lead to queue deletion in the broker. A Session does not expose any method to explicitly delete a queue.
Note that close is a blocking method and returns after the queue has been successfully
closed (success) or after the specified timeout has expired (failure). closeAsync, as the
name suggests, is a non-blocking method, and the result of the operation is reported via a QueueControlEvent.CloseQueueResult queue event.
| Constructor | Description |
|---|---|
| Session(SessionEventHandler eh) | Creates a new session object with default session options. |
| Session(SessionOptions so, SessionEventHandler eh) | Creates a new session object. |
| Modifier and Type | Method | Description |
|---|---|---|
| Queue | getQueue(Uri uri, long flags, QueueEventHandler handler, AckMessageHandler ackHandler, PushMessageHandler pushHandler) | Creates a queue representation. |
| boolean | isStarted() | Checks if the session is started. |
| void | linger() | Shuts down all connection and event handling threads. |
| void | start(Duration timeout) | Connects to the BlazingMQ broker and starts the message processing. |
| void | startAsync(Duration timeout) | Connects to the BlazingMQ broker and starts the message processing. |
| void | stop(Duration timeout) | Gracefully disconnects from the BlazingMQ broker and stops the operation of this session. |
| void | stopAsync(Duration timeout) | Gracefully disconnects from the BlazingMQ broker and stops the operation of this session. |
public Session(SessionEventHandler eh)
Parameters:
eh - session event handler callback interface
See Also:
SessionOptions, SessionEventHandler

public Session(SessionOptions so, SessionEventHandler eh)
Parameters:
so - specified options
eh - session event handler callback interface
Throws:
IllegalArgumentException - if the specified session options or event handler is null
See Also:
SessionOptions, SessionEventHandler

public void start(Duration timeout)
This method blocks until either the session is connected to the broker, fails to connect, or the operation times out.
Specified by:
start in interface AbstractSession
Parameters:
timeout - start timeout value
Throws:
BMQException - if start attempt failed

public void startAsync(Duration timeout)
This method returns without blocking. The result of the operation is communicated with a session event. If the optionally specified 'timeout' is not populated, use the one defined in the session options.
Specified by:
startAsync in interface AbstractSession
Parameters:
timeout - start timeout value
Throws:
BMQException - in case of failure

public boolean isStarted()
Specified by:
isStarted in interface AbstractSession

public void stop(Duration timeout)
This method blocks waiting for all already invoked event handlers to exit and all session-related operations to be finished.
Specified by:
stop in interface AbstractSession
Parameters:
timeout - stop timeout value
Throws:
BMQException - if stop attempt failed

public void stopAsync(Duration timeout)
This method returns without blocking. The result of the operation is communicated with a session event. If the optionally specified timeout is not populated, use the one defined in the session options.
Specified by:
stopAsync in interface AbstractSession
Parameters:
timeout - stop timeout value
Throws:
BMQException - in case of failure

public void linger()
This method must be called when the session is stopped. No other method may be used after this method returns.
Specified by:
linger in interface AbstractSession

public Queue getQueue(Uri uri, long flags, QueueEventHandler handler, AckMessageHandler ackHandler, PushMessageHandler pushHandler)
Returned Queue may be in any state.
Specified by:
getQueue in interface AbstractSession
Parameters:
uri - URI of this queue, immutable
flags - a combination of the values defined in QueueFlags
handler - queue event handler callback interface, must be non-null for async operations and health sensitive queues
ackHandler - callback handler for incoming ACK events, can be null if only consumer
pushHandler - callback handler for incoming PUSH events, can be null if only producer
Throws:
IllegalArgumentException - in case of a wrong combination of the queue flags and the handlers
BMQException - in case of other failures
See Also:
Uri, QueueFlags, QueueEventHandler, AckMessageHandler, PushMessageHandler

Copyright © 2023 Bloomberg L.P. All rights reserved.