// Copyright 2022-2023 Bloomberg Finance L.P. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // bmqt_subscription.h -*-C++-*- #ifndef INCLUDED_BMQT_SUBSCRIPTION #define INCLUDED_BMQT_SUBSCRIPTION //@PURPOSE: Provide a value-semantic types for subscription related API. // //@CLASSES: // bmqt::Subscription_Handle: uniquely identifies Subscription // bmqt::Subscription_Expression: Subscription criteria // bmqt::Subscription: Subscription parameters // //@DESCRIPTION: 'bmqt::Subscription' provides a value-semantic type carried // by 'bmqt::QueueOptions', when opening and configuring a queue. // //@NOTE: Experimental. Do not use until this feature is announced. // // BMQ #include <bmqscm_version.h> #include <bmqt_correlationid.h> //BDE #include <bsl_optional.h> #include <bsl_ostream.h> #include <bsl_string.h> #include <bslim_printer.h> #include <bslma_allocator.h> #include <bsls_types.h> namespace BloombergLP { namespace bmqt { class QueueOptions; // ========================= // class Subscription_Handle // ========================= class SubscriptionHandle { // Value-semantic type for unique Subscription id. private: // PRIVATE DATA unsigned int d_id; // Internal, unique key const bmqt::CorrelationId d_correlationId; // User-specified, not required to be // unique private: // PRIVATE CLASS METHODS static unsigned int nextId(); public: // CREATORS SubscriptionHandle(const bmqt::CorrelationId& cid); SubscriptionHandle(const SubscriptionHandle& other); // ACCESSORS const bmqt::CorrelationId& correlationId() const; unsigned int id() const; bsl::ostream& print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const; // Format this object to the specified output 'stream' at the (absolute // value of) the optionally specified indentation 'level' and return a // reference to 'stream'. If 'level' is specified, optionally specify // 'spacesPerLevel', the number of spaces per indentation level for // this and all of its nested objects. If 'level' is negative, // suppress indentation of the first line. If 'spacesPerLevel' is // negative format the entire output on one line, suppressing all but // the initial indentation (as governed by 'level'). If 'stream' is // not valid on entry, this operation has no effect. // FRIENDS friend bool operator<(const SubscriptionHandle& lhs, const SubscriptionHandle& rhs); friend bool operator==(const SubscriptionHandle& lhs, const SubscriptionHandle& rhs); friend bool operator!=(const SubscriptionHandle& lhs, const SubscriptionHandle& rhs); template <class HASH_ALGORITHM> friend void hashAppend(HASH_ALGORITHM& hashAlgo, const SubscriptionHandle& id); // Will only consider 'd_id' field when comparing 'lhs' and 'rhs' }; class SubscriptionExpression { // Value-semantic type to carry Subscription criteria. public: // TYPES enum Enum { // Enum representing criteria format e_NONE = 0 // EMPTY , e_VERSION_1 = 1 // Simple Evaluator }; private: bsl::string d_expression; // e.g., "firmId == foo" Enum d_version; // Required to support newer style // of expressions in future public: // CREATORS SubscriptionExpression(); SubscriptionExpression(const bsl::string& expression, Enum version); // ACCESSORS const bsl::string& text() const; Enum version() const; bool isValid() const; bsl::ostream& print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const; // Format this object to the specified output 'stream' at the (absolute // value of) the optionally specified indentation 'level' and return a // reference to 'stream'. If 'level' is specified, optionally specify // 'spacesPerLevel', the number of spaces per indentation level for // this and all of its nested objects. If 'level' is negative, // suppress indentation of the first line. If 'spacesPerLevel' is // negative format the entire output on one line, suppressing all but // the initial indentation (as governed by 'level'). If 'stream' is // not valid on entry, this operation has no effect. }; class Subscription { // Value-semantic type to carry Subscription parameters. public: // PUBLIC CONSTANTS static const int k_CONSUMER_PRIORITY_MIN; // Constant representing the minimum // valid consumer priority static const int k_CONSUMER_PRIORITY_MAX; // Constant representing the maximum // valid consumer priority static const int k_DEFAULT_MAX_UNCONFIRMED_MESSAGES; static const int k_DEFAULT_MAX_UNCONFIRMED_BYTES; static const int k_DEFAULT_CONSUMER_PRIORITY; private: // PRIVATE DATA bsl::optional<int> d_maxUnconfirmedMessages; // Maximum number of outstanding // messages that can be sent by the // broker without being confirmed. bsl::optional<int> d_maxUnconfirmedBytes; // Maximum accumulated bytes of all // outstanding messages that can be // sent by the broker without being // confirmed. bsl::optional<int> d_consumerPriority; // Priority of a consumer with respect // to delivery of messages SubscriptionExpression d_expression; public: // CREATORS Subscription(); // Create a new Subscription Subscription(const Subscription& other); // Create a new Subscription by copying values from the specified // 'other'. // MANIPULATORS Subscription& setMaxUnconfirmedMessages(int value); // Set the maxUnconfirmedMessages to the specified 'value'. The // behavior is undefined unless 'value >= 0'. If the specified 'value' // is set to 0, it means that the consumer does not receive any // messages. This might be useful when the consumer is shutting down // and wants to process only pending messages, or when it is unable to // process new messages because of transient issues. Subscription& setMaxUnconfirmedBytes(int value); // Set the maxUnconfirmedBytes to the specified 'value'. The behavior // is undefined unless 'value >= 0'. Subscription& setConsumerPriority(int value); // Set the consumerPriority to the specified 'value'. The behavior is // undefined unless 'k_CONSUMER_PRIORITY_MIN <= value <= // k_CONSUMER_PRIORITY_MAX' Subscription& setExpression(const SubscriptionExpression& value); // ACCESSORS int maxUnconfirmedMessages() const; // Get the number for the maxUnconfirmedMessages parameter. int maxUnconfirmedBytes() const; // Get the number for the maxUnconfirmedBytes parameter. int consumerPriority() const; // Get the number for the consumerPriority parameter. const SubscriptionExpression& expression() const; bool hasMaxUnconfirmedMessages() const; // Returns whether 'maxUnconfirmedMessages' has been set for this // object, or whether it implicitly holds // 'k_DEFAULT_MAX_UNCONFIRMED_MESSAGES'. bool hasMaxUnconfirmedBytes() const; // Returns whether 'maxUnconfirmedBytes' has been set for this object, // or whether it implicitly holds 'k_DEFAULT_MAX_UNCONFIRMED_BYTES'. bool hasConsumerPriority() const; // Returns whether 'consumerPriority' has been set for this object, or // whether it implicitly holds 'k_DEFAULT_CONSUMER_PRIORITY'. bsl::ostream& print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const; // Format this object to the specified output 'stream' at the (absolute // value of) the optionally specified indentation 'level' and return a // reference to 'stream'. If 'level' is specified, optionally specify // 'spacesPerLevel', the number of spaces per indentation level for // this and all of its nested objects. If 'level' is negative, // suppress indentation of the first line. If 'spacesPerLevel' is // negative format the entire output on one line, suppressing all but // the initial indentation (as governed by 'level'). If 'stream' is // not valid on entry, this operation has no effect. }; // FREE OPERATORS bsl::ostream& operator<<(bsl::ostream& stream, const Subscription& rhs); // Format the specified 'rhs' to the specified output 'stream' and return a // reference to the modifiable 'stream'. // ============================================================================ // INLINE DEFINITIONS // ============================================================================ // ---------------------- // class Subscription_Handle // ---------------------- inline SubscriptionHandle::SubscriptionHandle(const bmqt::CorrelationId& cid) : d_id(nextId()) , d_correlationId(cid) { // NOTHING } inline SubscriptionHandle::SubscriptionHandle(const SubscriptionHandle& other) : d_id(other.d_id) , d_correlationId(other.d_correlationId) { // NOTHING } // ACCESSORS inline const bmqt::CorrelationId& SubscriptionHandle::correlationId() const { return d_correlationId; } inline unsigned int SubscriptionHandle::id() const { return d_id; } inline bsl::ostream& SubscriptionHandle::print(bsl::ostream& stream, int level, int spacesPerLevel) const { if (stream.bad()) { return stream; // RETURN } bslim::Printer printer(&stream, level, spacesPerLevel); printer.start(); printer.printAttribute("id", d_id); printer.printAttribute("CorrelationId", d_correlationId); printer.end(); return stream; } // ---------------------------- // class SubscriptionExpression // ---------------------------- inline SubscriptionExpression::SubscriptionExpression() : d_expression() , d_version(e_NONE) { // NOTHING } inline SubscriptionExpression::SubscriptionExpression(const bsl::string& expression, Enum version) : d_expression(expression) , d_version(version) { // NOTHING } inline const bsl::string& SubscriptionExpression::text() const { return d_expression; } inline SubscriptionExpression::Enum SubscriptionExpression::version() const { return d_version; } inline bool SubscriptionExpression::isValid() const { return d_expression.length() == 0 ? d_version == e_NONE : d_version > e_NONE; } inline bsl::ostream& SubscriptionExpression::print(bsl::ostream& stream, int level, int spacesPerLevel) const { if (stream.bad()) { return stream; // RETURN } bslim::Printer printer(&stream, level, spacesPerLevel); printer.start(); printer.printAttribute("Expression", d_expression); printer.printAttribute("Version", d_version); printer.end(); return stream; } // ---------------------- // class Subscription // ---------------------- inline Subscription::Subscription() : d_maxUnconfirmedMessages() , d_maxUnconfirmedBytes() , d_consumerPriority() , d_expression() { // NOTHING } inline Subscription::Subscription(const Subscription& other) : d_maxUnconfirmedMessages(other.d_maxUnconfirmedMessages) , d_maxUnconfirmedBytes(other.d_maxUnconfirmedBytes) , d_consumerPriority(other.d_consumerPriority) , d_expression(other.d_expression) { // NOTHING } inline bsl::ostream& Subscription::print(bsl::ostream& stream, int level, int spacesPerLevel) const { if (stream.bad()) { return stream; // RETURN } bslim::Printer printer(&stream, level, spacesPerLevel); printer.start(); printer.printAttribute("maxUnconfirmedMessages", maxUnconfirmedMessages()); printer.printAttribute("maxUnconfirmedBytes", maxUnconfirmedBytes()); printer.printAttribute("consumerPriority", consumerPriority()); d_expression.print(stream, level, spacesPerLevel); printer.end(); return stream; } // MANIPULATORS inline Subscription& Subscription::setMaxUnconfirmedMessages(int value) { d_maxUnconfirmedMessages.emplace(value); return *this; } inline Subscription& Subscription::setMaxUnconfirmedBytes(int value) { d_maxUnconfirmedBytes.emplace(value); return *this; } inline Subscription& Subscription::setConsumerPriority(int value) { d_consumerPriority.emplace(value); return *this; } inline Subscription& Subscription::setExpression(const SubscriptionExpression& value) { d_expression = value; return *this; } // ACCESSORS inline int Subscription::maxUnconfirmedMessages() const { return d_maxUnconfirmedMessages.value_or( k_DEFAULT_MAX_UNCONFIRMED_MESSAGES); } inline int Subscription::maxUnconfirmedBytes() const { return d_maxUnconfirmedBytes.value_or(k_DEFAULT_MAX_UNCONFIRMED_BYTES); } inline int Subscription::consumerPriority() const { return d_consumerPriority.value_or(k_DEFAULT_CONSUMER_PRIORITY); } inline const SubscriptionExpression& Subscription::expression() const { return d_expression; } inline bool Subscription::hasMaxUnconfirmedMessages() const { return d_maxUnconfirmedMessages.has_value(); } inline bool Subscription::hasMaxUnconfirmedBytes() const { return d_maxUnconfirmedBytes.has_value(); } inline bool Subscription::hasConsumerPriority() const { return d_consumerPriority.has_value(); } // FREE FUNCTIONS template <class HASH_ALGORITHM> void hashAppend(HASH_ALGORITHM& hashAlgo, const SubscriptionHandle& id) // Apply the specified 'hashAlgo' to the specified 'id'. { using bslh::hashAppend; // for ADL hashAppend(hashAlgo, id.d_id); } inline bool operator<(const SubscriptionHandle& lhs, const SubscriptionHandle& rhs) { return lhs.d_id < rhs.d_id; } inline bool operator==(const SubscriptionHandle& lhs, const SubscriptionHandle& rhs) { return lhs.d_id == rhs.d_id; } inline bool operator!=(const SubscriptionHandle& lhs, const SubscriptionHandle& rhs) { return lhs.d_id != rhs.d_id; } inline bsl::ostream& operator<<(bsl::ostream&stream, const Subscription& rhs) { return rhs.print(stream, 0, -1); } } // close package namespace } // close enterprise namespace #endif