This component implements a mechanism, bmqa::MessageEventBuilder, for creating message events that contain one or more messages. The resulting bmqa::MessageEvent can be sent to the BlazingMQ broker using bmqa::Session (refer to bmqa_session.h for details).
The builder holds a bmqa::MessageEvent under construction and provides methods to obtain a reference to the current message (in order to set its various members) and to append a new message. Once the application is done, the builder provides a method to retrieve the message event that has been constructed. See the Usage section for more details.
Note that for applications that send a large volume of small messages, publishing events that contain multiple messages may be more efficient than publishing events limited to a single message.
Usage
- An instance of bmqa::MessageEventBuilder (the builder) can be used to create multiple bmqa::MessageEvent objects, as long as the instance is reset in between. It is preferable to reset the builder right after sending the event, to guarantee that all user resources bound to the bmqa::MessageEvent (e.g. CorrelationIds holding a user's shared_ptr) are kept no longer than expected (e.g. until the related ACK event is received). The recommended approach is to create one instance of the builder and use it throughout the lifetime of the task (if the task is multi-threaded, one instance per thread must be created and maintained). See Example 1 for an illustration.
- The lifetime of an instance of the builder is bound by the bmqa::Session from which it was created. In other words, the bmqa::Session instance must outlive the builder instance.
- To post the same bmqa::Message to different queues, bmqa::MessageEventBuilder::packMessage can be called multiple times in a row with different queue IDs. Each call appends the previously packed message, with the new queue ID, to the underlying message event. Note that after calling bmqa::MessageEventBuilder::packMessage, the message keeps all the attributes (payload, properties, etc.) that were previously set, except for the correlationId, which must be set explicitly for each individual message. If desired, any attribute can be changed before packing the message again. Refer to Example 2 for an illustration.
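The retention rule in the last bullet can be pictured with a small self-contained model. The sketch below is not BlazingMQ code: ToyMessage, PackedEntry, ToyBuilder, and packForTwoQueues are hypothetical stand-ins, and the way the correlation id is cleared on each pack is only an assumed model of the behavior described above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are not BlazingMQ types.
struct ToyMessage {
    std::string correlationId;  // empty means "not set"
    std::string payload;
    std::string properties;
};

struct PackedEntry {
    int        queueId;
    ToyMessage message;  // snapshot taken at pack time
};

class ToyBuilder {
    ToyMessage               d_current;  // message under construction
    std::vector<PackedEntry> d_event;    // messages packed so far

  public:
    // Start from a blank message and return it so its fields can be set.
    ToyMessage& startMessage()
    {
        d_current = ToyMessage();
        return d_current;
    }

    // Append a snapshot of the current message for 'queueId'.  Payload and
    // properties stay in place for the next pack; the correlation id is
    // cleared (assumed model of the rule described in the text).
    void packMessage(int queueId)
    {
        d_event.push_back(PackedEntry{queueId, d_current});
        d_current.correlationId.clear();
    }

    const std::vector<PackedEntry>& messageEvent() const { return d_event; }

    // Forget the packed messages and the current message.
    void reset()
    {
        d_event.clear();
        d_current = ToyMessage();
    }
};

// Pack the same payload for queues 1 and 2 and return the event contents.
std::vector<PackedEntry> packForTwoQueues()
{
    ToyBuilder  builder;
    ToyMessage& msg = builder.startMessage();

    msg.correlationId = "cid-1";
    msg.payload       = "hello";
    msg.properties    = "encoding=BER";
    builder.packMessage(1);
    assert(msg.correlationId.empty());  // the id was not carried over

    // Payload and properties are still set; only the id is supplied again.
    msg.correlationId = "cid-2";
    builder.packMessage(2);

    return builder.messageEvent();
}
```

In this model, the second entry carries the same payload and properties as the first without either being set again, which mirrors the pattern used for the second message in Example 2.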
Example 1 - Basic Usage
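// 'myQueueId', 'myPayload', 'myCorrelationId', and their 'Another'
// counterparts are assumed to be defined by the application.  Return codes
// are assigned to 'rc' but not checked, for brevity.
bmqa::Session session;

// Create a properties object and set the properties common to all messages.
bmqa::MessageProperties properties;
session.loadMessageProperties(&properties);
int rc = properties.setPropertyAsChar(
    "encoding",
    static_cast<char>(MyEncodingEnum::e_BER));
rc = properties.setPropertyAsString("producerId", "MyUniqueId");

// Create the builder once; it is reused for every event below.
bmqa::MessageEventBuilder builder;
session.loadMessageEventBuilder(&builder);

// Build and post the first event, which contains a single message.
bmqa::Message& msg = builder.startMessage();
msg.setCorrelationId(myCorrelationId);
msg.setDataRef(&myPayload);
rc = properties.setPropertyAsInt64(
    "timestamp",
    bdlt::EpochUtil::convertToTimeT64(bdlt::CurrentTime::now()));
msg.setPropertiesRef(&properties);
rc = builder.packMessage(myQueueId);
rc = session.post(builder.messageEvent());

// Reset right after posting so that the builder can be reused.
builder.reset();

// Build and post a second event with the same builder.  A distinct name is
// used because 'msg' is already declared in this scope.
bmqa::Message& anotherMsg = builder.startMessage();
anotherMsg.setCorrelationId(myAnotherCorrelationId);
anotherMsg.setDataRef(&myAnotherPayload);
rc = properties.setPropertyAsInt64(
    "timestamp",
    bdlt::EpochUtil::convertToTimeT64(bdlt::CurrentTime::now()));
anotherMsg.setPropertiesRef(&properties);
rc = builder.packMessage(myAnotherQueueId);
rc = session.post(builder.messageEvent());
builder.reset();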
Example 2 - Packing multiple messages in a message event
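// The queue ids, payloads, and correlation ids are assumed to be defined by
// the application.  Return codes are assigned to 'rc' but not checked, for
// brevity.
bmqa::Session session;

// Create a properties object and set the properties common to all messages.
bmqa::MessageProperties properties;
session.loadMessageProperties(&properties);
int rc = properties.setPropertyAsChar(
    "encoding",
    static_cast<char>(MyEncodingEnum::e_BER));
rc = properties.setPropertyAsString("producerId", "MyUniqueId");

bmqa::MessageEventBuilder builder;
session.loadMessageEventBuilder(&builder);

// Build one event containing four messages.
bmqa::Message& msg = builder.startMessage();

// Message 1: 'payload1' with properties, for 'queueId1'.
msg.setCorrelationId(correlationId1);
msg.setDataRef(&payload1);
rc = properties.setPropertyAsInt64(
    "timestamp",
    bdlt::EpochUtil::convertToTimeT64(bdlt::CurrentTime::now()));
msg.setPropertiesRef(&properties);
rc = builder.packMessage(queueId1);

// Message 2: same payload and properties (with a refreshed timestamp), for
// 'queueId2'.  Only the correlation id has to be set again.
msg.setCorrelationId(correlationId2);
rc = properties.setPropertyAsInt64(
    "timestamp",
    bdlt::EpochUtil::convertToTimeT64(bdlt::CurrentTime::now()));
rc = builder.packMessage(queueId2);

// Message 3: a different payload and no properties, for 'queueId1'.
msg.setCorrelationId(correlationId3);
msg.setDataRef(&payload2);
msg.clearProperties();
rc = builder.packMessage(queueId1);

// Message 4: a different payload with properties again, for 'queueId3'.
msg.setCorrelationId(correlationId4);
msg.setDataRef(&payload3);
rc = properties.setPropertyAsInt64(
    "timestamp",
    bdlt::EpochUtil::convertToTimeT64(bdlt::CurrentTime::now()));
msg.setPropertiesRef(&properties);
rc = builder.packMessage(queueId3);

// Post the event with all four messages, then reset the builder for reuse.
rc = session.post(builder.messageEvent());
builder.reset();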
Thread Safety
This component is NOT thread safe.
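Because the builder is not thread safe, the Usage section recommends one instance per thread. A minimal sketch of that ownership pattern follows, using standard C++ threads. ToyBuilder and packPerThread are hypothetical stand-ins, not BlazingMQ types; in a real application each thread would presumably obtain its own builder from the session, as in Example 1.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a builder that is not thread safe.
class ToyBuilder {
    std::vector<std::string> d_messages;

  public:
    void pack(const std::string& payload) { d_messages.push_back(payload); }
    std::size_t messageCount() const { return d_messages.size(); }
    void reset() { d_messages.clear(); }
};

// Run 'numThreads' workers.  Each worker packs 'perThread' messages into a
// builder that it alone owns, so no locking is needed around the builder.
// Return the number of messages packed by each worker.
std::vector<std::size_t> packPerThread(std::size_t numThreads,
                                       std::size_t perThread)
{
    std::vector<std::size_t> counts(numThreads, 0);
    std::vector<std::thread> workers;

    for (std::size_t t = 0; t < numThreads; ++t) {
        workers.emplace_back([t, perThread, &counts]() {
            ToyBuilder builder;  // created and used by this thread only
            for (std::size_t i = 0; i < perThread; ++i) {
                builder.pack("msg-" + std::to_string(i));
            }
            assert(builder.messageCount() == perThread);
            counts[t] = builder.messageCount();  // each thread writes its own slot
            builder.reset();
        });
    }
    for (std::size_t t = 0; t < workers.size(); ++t) {
        workers[t].join();
    }
    return counts;
}
```

Scoping the builder inside the worker also keeps its lifetime shorter than that of anything created before the threads start, which fits the rule that the session must outlive the builder.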