// Copyright 2015-2023 Bloomberg Finance L.P. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // bmqt_queueoptions.h -*-C++-*- #ifndef INCLUDED_BMQT_QUEUEOPTIONS #define INCLUDED_BMQT_QUEUEOPTIONS //@PURPOSE: Provide a value-semantic type for options related to a queue. // //@CLASSES: // bmqt::QueueOptions: options related to a queue. // //@DESCRIPTION: 'bmqt::QueueOptions' provides a value-semantic type, // 'QueueOptions', which is used to specify parameters for a queue. // // // The following parameters are supported: //: o !maxUnconfirmedMessages!: //: Maximum number of outstanding messages that can be sent by the broker //: without being confirmed. //: //: o !maxUnconfirmedBytes!: //: Maximum accumulated bytes of all outstanding messages that can be sent //: by the broker without being confirmed. //: //: o !consumerPriority!: //: Priority of a consumer with respect to delivery of messages. //: //: o !suspendsOnBadHostHealth!: //: Sets whether the queue should suspend operation when the host machine //: is unhealthy. // BMQ #include <bmqscm_version.h> #include <bmqt_subscription.h> //BDE #include <bsl_iosfwd.h> #include <bsl_optional.h> #include <bsl_vector.h> #include <bsl_unordered_map.h> #include <bslma_allocator.h> #include <bslma_usesbslmaallocator.h> #include <bslmf_nestedtraitdeclaration.h> namespace BloombergLP { namespace bmqt { // ================== // class QueueOptions // ================== class QueueOptions { // Value-semantic type for options related to a queue. public: // PUBLIC CONSTANTS static const int k_CONSUMER_PRIORITY_MIN; // Constant representing the minimum // valid consumer priority static const int k_CONSUMER_PRIORITY_MAX; // Constant representing the maximum // valid consumer priority static const int k_DEFAULT_MAX_UNCONFIRMED_MESSAGES; static const int k_DEFAULT_MAX_UNCONFIRMED_BYTES; static const int k_DEFAULT_CONSUMER_PRIORITY; static const bool k_DEFAULT_SUSPENDS_ON_BAD_HOST_HEALTH; private: // PRIVATE TYPES typedef bsl::unordered_map<SubscriptionHandle, Subscription> Subscriptions; private: // DATA Subscription d_info; bsl::optional<bool> d_suspendsOnBadHostHealth; // Whether the queue suspends operation // while the host is unhealthy. Subscriptions d_subscriptions; bool d_hadSubscriptions; // 'true' if 'd_subscriptions' had a value, 'false' // otherwise. Emulates 'bsl::optional' for // 'd_subscriptions'. bslma::Allocator *d_allocator_p; // Allocator public: // PUBLIC TYPES typedef bsl::pair<SubscriptionHandle, Subscription> HandleAndSubscription; typedef bsl::vector<HandleAndSubscription> SubscriptionsSnapshot; // 'loadSubscriptions' return types // // EXPERIMENTAL. Do not use until this feature is announced. public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(QueueOptions, bslma::UsesBslmaAllocator) // CREATORS explicit QueueOptions(bslma::Allocator *allocator = 0); // Create a new QueueOptions using the optionally specified // 'allocator'. QueueOptions(const QueueOptions& other, bslma::Allocator *allocator = 0); // Create a new QueueOptions by copying values from the specified // 'other', using the optionally specified 'allocator'. // MANIPULATORS QueueOptions& setMaxUnconfirmedMessages(int value); // Set the maxUnconfirmedMessages to the specified 'value'. The // behavior is undefined unless 'value >= 0'. If the specified 'value' // is set to 0, it means that the consumer does not receive any // messages. This might be useful when the consumer is shutting down // and wants to process only pending messages, or when it is unable to // process new messages because of transient issues. QueueOptions& setMaxUnconfirmedBytes(int value); // Set the maxUnconfirmedBytes to the specified 'value'. The behavior // is undefined unless 'value >= 0'. QueueOptions& setConsumerPriority(int value); // Set the consumerPriority to the specified 'value'. The behavior is // undefined unless 'k_CONSUMER_PRIORITY_MIN <= value <= // k_CONSUMER_PRIORITY_MAX' QueueOptions& setSuspendsOnBadHostHealth(bool value); // Set whether the queue suspends operation while host is unhealthy. QueueOptions& merge(const QueueOptions& other); // "Merges" another 'QueueOptions' into this one, by invoking // setF(other.F()) // for all fields 'F' for which 'other.hasF()' is true. Returns the // instance on which the method was invoked. bool addOrUpdateSubscription(bsl::string *errorDescription, const SubscriptionHandle& handle, const Subscription& subscription); // Add, or update if it exists, the specified 'subscription' for the // specified 'handle'. Return true on success, otherwise return false // and load the specified 'errorDescription' with a description of the // error. Note that 'errorDescription' may be null if the caller does // not care about getting error messages, but users are strongly // encouraged to log error string if this API returns failure. // // EXPERIMENTAL. Do not use until this feature is announced. bool removeSubscription(const SubscriptionHandle& handle); // Return false if subscription does not exist. // // EXPERIMENTAL. Do not use until this feature is announced. void removeAllSubscriptions(); // Remove all subscriptions. // // EXPERIMENTAL. Do not use until this feature is announced. // ACCESSORS int maxUnconfirmedMessages() const; // Get the number for the maxUnconfirmedMessages parameter. int maxUnconfirmedBytes() const; // Get the number for the maxUnconfirmedBytes parameter. int consumerPriority() const; // Get the number for the consumerPriority parameter. bool suspendsOnBadHostHealth() const; // Get whether the queue suspends operation while host is unhealthy. bool hasMaxUnconfirmedMessages() const; // Returns whether 'maxUnconfirmedMessages' has been set for this // object, or whether it implicitly holds // 'k_DEFAULT_MAX_UNCONFIRMED_MESSAGES'. bool hasMaxUnconfirmedBytes() const; // Returns whether 'maxUnconfirmedBytes' has been set for this object, // or whether it implicitly holds 'k_DEFAULT_MAX_UNCONFIRMED_BYTES'. bool hasConsumerPriority() const; // Returns whether 'consumerPriority' has been set for this object, or // whether it implicitly holds 'k_DEFAULT_CONSUMER_PRIORITY'. bool hasSuspendsOnBadHostHealth() const; // Returns whether 'suspendsOnBadHostHealth' has been set for this // object, or whether it implicitly holds // 'k_DEFAULT_SUSPENDS_ON_BAD_HOST_HEALTH'. bool loadSubscription(Subscription *subscription, const SubscriptionHandle& handle) const; // Return false if subscription does not exist. // // EXPERIMENTAL. Do not use until this feature is announced. void loadSubscriptions(SubscriptionsSnapshot *snapshot) const; // Load all handles and subscriptions into the specified 'snapshot'. // // EXPERIMENTAL. Do not use until this feature is announced. bsl::ostream& print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const; // Format this object to the specified output 'stream' at the (absolute // value of) the optionally specified indentation 'level' and return a // reference to 'stream'. If 'level' is specified, optionally specify // 'spacesPerLevel', the number of spaces per indentation level for // this and all of its nested objects. If 'level' is negative, // suppress indentation of the first line. If 'spacesPerLevel' is // negative format the entire output on one line, suppressing all but // the initial indentation (as governed by 'level'). If 'stream' is // not valid on entry, this operation has no effect. }; // FREE OPERATORS bool operator==(const QueueOptions& lhs, const QueueOptions& rhs); // Return 'true' if the specified 'rhs' object contains the value of the // same type as contained in the specified 'lhs' object and the value // itself is the same in both objects, return false otherwise. bool operator!=(const QueueOptions& lhs, const QueueOptions& rhs); // Return 'false' if the specified 'rhs' object contains the value of the // same type as contained in the specified 'lhs' object and the value // itself is the same in both objects, return 'true' otherwise. bsl::ostream& operator<<(bsl::ostream& stream, const QueueOptions& rhs); // Format the specified 'rhs' to the specified output 'stream' and return a // reference to the modifiable 'stream'. // ============================================================================ // INLINE DEFINITIONS // ============================================================================ // ------------------ // class QueueOptions // ------------------ // MANIPULATORS inline QueueOptions& QueueOptions::setMaxUnconfirmedMessages(int value) { d_info.setMaxUnconfirmedMessages(value); return *this; } inline QueueOptions& QueueOptions::setMaxUnconfirmedBytes(int value) { d_info.setMaxUnconfirmedBytes(value); return *this; } inline QueueOptions& QueueOptions::setConsumerPriority(int value) { d_info.setConsumerPriority(value); return *this; } inline QueueOptions& QueueOptions::setSuspendsOnBadHostHealth(bool value) { d_suspendsOnBadHostHealth.emplace(value); return *this; } // ACCESSORS inline int QueueOptions::maxUnconfirmedMessages() const { return d_info.maxUnconfirmedMessages(); } inline int QueueOptions::maxUnconfirmedBytes() const { return d_info.maxUnconfirmedBytes(); } inline int QueueOptions::consumerPriority() const { return d_info.consumerPriority(); } inline bool QueueOptions::suspendsOnBadHostHealth() const { return d_suspendsOnBadHostHealth.value_or( k_DEFAULT_SUSPENDS_ON_BAD_HOST_HEALTH); } inline bool QueueOptions::hasMaxUnconfirmedMessages() const { return d_info.hasMaxUnconfirmedMessages(); } inline bool QueueOptions::hasMaxUnconfirmedBytes() const { return d_info.hasMaxUnconfirmedBytes(); } inline bool QueueOptions::hasConsumerPriority() const { return d_info.hasConsumerPriority(); } inline bool QueueOptions::hasSuspendsOnBadHostHealth() const { return d_suspendsOnBadHostHealth.has_value(); } } // close package namespace // ------------------ // class QueueOptions // ------------------ inline bool bmqt::operator==(const bmqt::QueueOptions& lhs, const bmqt::QueueOptions& rhs) { return lhs.maxUnconfirmedMessages() == rhs.maxUnconfirmedMessages() && lhs.maxUnconfirmedBytes() == rhs.maxUnconfirmedBytes() && lhs.consumerPriority() == rhs.consumerPriority() && lhs.suspendsOnBadHostHealth() == rhs.suspendsOnBadHostHealth(); } inline bool bmqt::operator!=(const bmqt::QueueOptions& lhs, const bmqt::QueueOptions& rhs) { return lhs.maxUnconfirmedMessages() != rhs.maxUnconfirmedMessages() || lhs.maxUnconfirmedBytes() != rhs.maxUnconfirmedBytes() || lhs.consumerPriority() != rhs.consumerPriority() || lhs.suspendsOnBadHostHealth() != rhs.suspendsOnBadHostHealth(); } inline bsl::ostream& bmqt::operator<<(bsl::ostream& stream, const bmqt::QueueOptions& rhs) { return rhs.print(stream, 0, -1); } } // close enterprise namespace #endif