// Copyright 2014-2023 Bloomberg Finance L.P. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // bmqa_message.h -*-C++-*- #ifndef INCLUDED_BMQA_MESSAGE #define INCLUDED_BMQA_MESSAGE //@PURPOSE: Provide the application with a message data object. // //@CLASSES: // bmqa::Message: message received from/sent to a queue // bmqa::MessageConfirmationCookie: cookie for async message confirmation // //@SEE_ALSO: // bmqa::MessageEvent : Mechanism for data event notification. // bmqa::MessageEventBuilder: Builder for 'bmqa::MessageEvent' // //@DESCRIPTION: A 'bmqa::Message' represents the data message to put on a // queue, or retrieved from a queue. It is composed of the following fields: // //: o A message GUID, which is a printable string generated by the broker to //: uniquely identify this message. //: //: o A correlation Id, which is a user-provided identifier for the message. //: //: o A queue Id, to map with the queue this message is associated with. //: //: o The payload, which is opaque to the framework. At some point, framework //: may provide utilities to encode and decode schema messages using various //: CODECs. For example, a JavaScript publisher may publish a message into //: a queue using a JSON object as payload, and the consumer may be receiving //: that payload as a BER-encoded schema object. //: //: o At some point, system properties such as version, encoding, timestamp, //: priority, message group and "reply-to" will be supported. System //: properties will be used by the broker; for example to deliver //: high-priority messages first or to filter based on a minimum version. //: //: o At some point, application-defined message properties will also be //: supported, where properties are a list of name-value pairs. // // A 'bmqa::MessageConfirmationCookie' is a small object which allows to // confirm a 'bmqa::Message' asynchronously without having to hold on to the // entire message. This can be useful when, for example, the message is // decoded in the event handler, and the resulting object is enqueued for // asynchronous processing, along with that small cookie object for confirming // the message once successfully processed. // BMQ #include <bmqscm_version.h> #include <bmqa_queueid.h> #include <bmqt_compressionalgorithmtype.h> #include <bmqt_correlationid.h> #include <bmqt_messageguid.h> // BDE #include <bdlbb_blob.h> #include <bsl_cstddef.h> // for 'size_t' #include <bsl_iosfwd.h> #include <bsl_memory.h> #include <bsl_string.h> #include <bslma_allocator.h> #include <bsls_annotation.h> namespace BloombergLP { // FORWARD DECLARATION namespace bmqimp { class Event; } namespace bmqa { // FORWARD DECLARATION class MessageProperties; // ================== // struct MessageImpl // ================== struct MessageImpl { // Struct containing the internal (private) members of Message (That is so // that we can access private members of Message to initialize it, without // having to expose them publicly). // // IMPLEMENTATION NOTE: If adding new data members to this struct that are // lazily populated (such as 'queueId' or // 'correlationId'), then they should be reset in // 'bmqa::MessageIterator.nextMessage()'. // PUBLIC DATA bmqimp::Event *d_event_p; // Pointer to the Event this message is // associated with bsl::shared_ptr<bmqimp::Event> d_clonedEvent_sp; // May point to a bmqimp::Event (in case // this Message is a clone) bmqa::QueueId d_queueId; // QueueId this message is associated // with bmqt::CorrelationId d_correlationId; // CorrelationId this message is // associated with #ifdef BMQ_ENABLE_MSG_GROUPID bsl::string d_groupId; // Optional Group Id this message is // associated with #endif }; // =============================== // class MessageConfirmationCookie // =============================== class MessageConfirmationCookie { // Cookie for async message confirmation. private: // DATA bmqa::QueueId d_queueId; // QueueID associated to this cookie bmqt::MessageGUID d_guid; // GUID associated to this cookie public: // CREATORS MessageConfirmationCookie(); // Create an unset instance of this class. MessageConfirmationCookie(const QueueId& queueId, const bmqt::MessageGUID& messageGUID); // Create an instance with the specified 'queueId' and 'messageGUID'. // Users should not use that constructor directly, but rather load the // message cookie from an existing 'bmqa::Message' with the // 'bmqa::Message::confirmationCookie' accessor. // ACCESSORS const QueueId& queueId() const; // Return the queue ID of the message with which this confirmation // cookie is associated. const bmqt::MessageGUID& messageGUID() const; // Return message GUID of the message with which this confirmation // cookie is associated. }; // ============= // class Message // ============= class Message { // A message sent/received to/from the BlazingMQ broker. #ifdef BMQ_ENABLE_MSG_GROUPID public: // CONSTANTS static const int k_GROUP_ID_MAX_LENGTH = 31; // Constant representing the maximum length of a Group Id string. #endif private: // DATA mutable MessageImpl d_impl; // pimpl private: // PRIVATE ACCESSORS bool isInitialized() const; // Return true if the message has been initialized with an underlying // event (and thus is valid), and false otherwise. Any operation // except assignment or destruction on an uninitialized message is an // error. public: // CREATORS explicit Message(); // Create an invalid message having no content. Only valid operations // on an invalid message instance are assignment and destruction. //! Message(const Message& other) = default; // Create a message from the specified 'other' instance. //! ~Message() = default; // Destructor // MANIPULATORS //! Message& operator=(const Message& other) = default; // Assign to this object the value of the specified 'other' instance // and return a reference to this object. Message& setData(const bdlbb::Blob *data) BSLS_ANNOTATION_DEPRECATED; // Set the payload of this message to the blob pointed to by the // specified 'data'. Behavior is undefined unless 'data' is non-null // and payload's length is greater than zero. Note that payload // pointed to by 'data' is *not* copied right away, and should not be // destroyed or modified until this message has been packed (see // 'bmqa::MessageEventBuilder' component level documentation for // correct usage). // // This method is deprecated, please use 'setDataRef()' instead. Message& setData(const char *data,size_t length) BSLS_ANNOTATION_DEPRECATED; // Set the payload of this message to the specified 'length' bytes // starting at the specified 'data' address. The behavior is undefined // unless 'data' is non-null and 'length' is greater than zero. Note // that payload pointed to by 'data' is *not* copied right away, and // should not be destroyed or modified until this message has been // packed (see 'bmqa::MessageEventBuilder' component level // documentation for correct usage). // // This method is deprecated, please use 'setDataRef()' instead. Message& setDataRef(const bdlbb::Blob *data); // Set the payload of this message to the blob pointed to by the // specified 'data'. Behavior is undefined unless 'data' is non-null // and payload's length is greater than zero. Note that payload // pointed to by 'data' is *not* copied right away, and should not be // destroyed or modified until this message has been packed (see // 'bmqa::MessageEventBuilder' component level documentation for // correct usage). Message& setDataRef(const char *data, size_t length); // Set the payload of this message to the specified 'length' bytes // starting at the specified 'data' address. The behavior is undefined // unless 'data' is non-null and 'length' is greater than zero. Note // that payload pointed to by 'data' is *not* copied right away, and // should not be destroyed or modified until this message has been // packed (see 'bmqa::MessageEventBuilder' component level // documentation for correct usage). Message& setPropertiesRef(const MessageProperties *properties); // Set the properties of this message to the 'MessageProperties' // instance pointed by the specified 'properties'. Behavior is // undefined unless 'properties' is non-null. Note that properties are // *not* copied right away, and should not be destroyed or modified // until this message has been packed (see 'bmqa::MessageEventBuilder' // component level documentation for correct usage). Message& clearPropertiesRef(); // Clear out and properties associated with this message. Note that if // there are no properties associated with this message, this method // has no effect. Also note that the associated 'MessageProperties' // instance, if any, is not modified; it's simply dissociated from this // message. Message& setCorrelationId(const bmqt::CorrelationId& correlationId); // Set correlation ID of this message to the specified 'correlationId'. Message& setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::Enum value); // Set the Compression algorithm type of the current message to the // specified 'value' and return a reference offering modifiable access // to this object. #ifdef BMQ_ENABLE_MSG_GROUPID Message& setGroupId(const bsl::string& groupId); // Set Group Id of this message to the specified 'groupId'. The // 'groupId' must be a null-terminated string with up to // 'Message::k_GROUP_ID_MAX_LENGTH' characters (excluding the // terminating '\0'). The behavior is undefined if the message is not // a started, 'PUT' message or if 'groupId' is empty. Message& clearGroupId(); // Clear the Group Id of this message. The behavior is undefined if // the message is not a started, 'PUT' message. #endif // ACCESSORS bool isValid() const; // TBD:BSLS_ANNOTATION_DEPRECATED // Return true if the message is valid, and false otherwise. Any // operation except assignment or destruction on an invalid message is // an error. // // This method is deprecated. operator bool() const; // TBD:BSLS_ANNOTATION_DEPRECATED // Same as 'isValid'. // // This method is deprecated. Message clone(bslma::Allocator *basicAllocator = 0) const; // Return a copy of this message, using the optionally specified // 'basicAllocator' with the copy holding all the data of this instance // and not backed by any 'MessageEvent'. Note that this operation // does *not* copy underlying data. const bmqa::QueueId& queueId() const; // Return the queue ID indicating the queue for which this message has // been received from. The behavior is undefined unless this instance // represents a 'PUT', 'PUSH' or 'ACK' message. const bmqt::CorrelationId& correlationId() const; // Return the correlation Id associated with this message. The // behavior is undefined unless this instance represents a 'PUT' or an // 'ACK', or a 'PUSH' message. Note that in case of failure to accept // a 'PUT' message, BlazingMQ sends an 'ACK' message with failed // status, even if an 'ACK' message was not requested by the // application (i.e., no correlationId was specified when posting the // message). In such cases, correlationId associated with the 'ACK' // message will be unset, and as such, application *must* check for // that by invoking 'isUnset' on the returned correlationId object. In // the case of a 'PUSH' message, the return value is the one specified // as the correlation id of the corresponding Subscription. Invoking // 'thePointer', 'theNumeric' or 'theSharedPtr' on an unset // correlationId instance will lead to undefined behavior. bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType() const; // Return Compression algorithm type of the current message. Behavior // is undefined unless this instance represents a 'PUT' or 'PUSH' // message. #ifdef BMQ_ENABLE_MSG_GROUPID const bsl::string& groupId() const; // Return the Group Id associated with this message. The behavior is // undefined unless this instance represents a 'PUT' or a 'PUSH' // message and 'hasGroupId()' returns 'true". #endif const bmqt::MessageGUID& messageGUID() const; // Return the unique message Id generated by the SDK for this message. // The behavior is undefined unless this instance represents a 'PUT', // a 'PUSH' or an 'ACK' message. MessageConfirmationCookie confirmationCookie() const; // Return a cookie which can be used to confirm this message. The // behavior is undefined unless this instance represents a 'PUSH' // message. int ackStatus() const; // Return the status of the 'ACK' message. The behavior is undefined // unless this instance represents an 'ACK' message. This value // correspond to the 'bmqt::AckResult::Enum' enum. int getData(bdlbb::Blob *blob) const; // Load into the specified 'blob' the payload of the message, if any. // Return zero if the message has a payload and non-zero value // otherwise. The behaviour is undefined unless this instance // represents a 'PUT' or 'PUSH' message. Note that for efficiency, // application should fetch payload once and cache the value, instead // of invoking this method multiple times on a message. int dataSize() const; // Return the number of bytes in the payload. The behaviour is // undefined unless this instance represents a 'PUT' or a 'PUSH' // message. Note that for efficiency, application should fetch payload // size once and cache the value, instead of invoking this method // multiple times on a message. bool hasProperties() const; // Return 'true' if this instance has at least one message property // associated with it, 'false' otherwise. Behavior is undefined unless // this instance represents a 'PUT' or a 'PUSH' message. #ifdef BMQ_ENABLE_MSG_GROUPID bool hasGroupId() const; // Return whether this message has an associated GroupId set. The // behavior is undefined unless this instance represents a 'PUT' or a // 'PUSH' message. #endif int loadProperties(MessageProperties *buffer) const; // Load into the specified 'buffer' the properties associated with this // message. Return zero on success, and a non-zero value otherwise. // Behavior is undefined unless this instance represents a 'PUT' or a // 'PUSH' message, and unless 'buffer' is non-null. Note that if there // are no properties associated with this message, zero will be // returned and the 'MessageProperties' instance pointed by 'buffer' // will be cleared. Also note that for efficiency, application should // fetch properties once and cache the value, instead of invoking this // method multiple times on a message. bsl::ostream& print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const; // Format this object to the specified output 'stream' at the (absolute // value of) the optionally specified indentation 'level' and return a // reference to 'stream'. If 'level' is specified, optionally specify // 'spacesPerLevel', the number of spaces per indentation level for // this and all of its nested objects. If 'level' is negative, // suppress indentation of the first line. If 'spacesPerLevel' is // negative format the entire output on one line, suppressing all but // the initial indentation (as governed by 'level'). If 'stream' is // not valid on entry, this operation has no effect. }; // FREE OPERATORS bsl::ostream& operator<<(bsl::ostream& stream, const Message& rhs); // Format the specified 'rhs' to the specified output 'stream' and return a // reference to the modifiable 'stream'. // ============================================================================ // INLINE DEFINITIONS // ============================================================================ // ------------- // class Message // ------------- inline Message& Message::setData(const bdlbb::Blob *data) { return setDataRef(data); } inline Message& Message::setData(const char *data, size_t length) { return setDataRef(data, length); } inline Message::operator bool() const { return isInitialized(); } // ------------------------------- // class MessageConfirmationCookie // ------------------------------- inline MessageConfirmationCookie::MessageConfirmationCookie() : d_queueId() , d_guid() { // NOTHING } inline MessageConfirmationCookie::MessageConfirmationCookie( const bmqa::QueueId& queueId, const bmqt::MessageGUID& messageGUID) : d_queueId(queueId) , d_guid(messageGUID) { // NOTHING } // ACCESSORS inline const QueueId& MessageConfirmationCookie::queueId() const { return d_queueId; } inline const bmqt::MessageGUID& MessageConfirmationCookie::messageGUID() const { return d_guid; } } // close package namespace // ------------- // class Message // ------------- // FREE OPERATORS inline bsl::ostream& bmqa::operator<<(bsl::ostream& stream, const bmqa::Message& rhs) { return rhs.print(stream, 0, -1); } } // close enterprise namespace #endif