// Copyright 2014-2023 Bloomberg Finance L.P. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // bmqt_sessionoptions.h -*-C++-*- #ifndef INCLUDED_BMQT_SESSIONOPTIONS #define INCLUDED_BMQT_SESSIONOPTIONS //@PURPOSE: Provide a value-semantic type to configure session with the broker. // //@CLASSES: bmqt::SessionOptions: options to configure a session with a // BlazingMQ broker. // //@DESCRIPTION: 'bmqt::SessionOptions' provides a value-semantic type, // 'SessionOptions', which is used to specify session-level configuration // parameters. // // Most applications should not need to change the parameters for creating a // 'Session'; the default parameters are fine. // // The following parameters are supported: //: o !brokerUri!: //: Address of the broker to connect to. Default is to connect to the //: broker on the local host on the default port (30114). The format is //: 'tcp://<host>:port'. Host can be: //: o an explicit hostname or 'localhost' //: o an ip, example 10.11.12.13 //: o a DNS entry. In this case, the client will resolve the list of //: addresses from that entry, and try to connect to one of them. //: When the connection with the host goes down, it will automatically //: immediately failover and reconnects to another entry from the //: address list. //: If the environment variable 'BMQ_BROKER_URI' is set, then instances of //: 'bmqa::Session' will ignore the 'brokerUri' field from the provided //: 'SessionOptions' and use the value from this environment variable //: instead. //: //: o !processNameOverride!: //: If not empty, use this value for the processName in the identity //: message (useful for scripted language bindings). This field is used //: in the broker's logs to more easily identify the client's process. //: //: o !numProcessingThreads!: //: Number of threads to use for processing events. Default is 1. Note //: that this setting has an effect only if providing a //: 'SessionEventHandler' to the session. //: //: o !blobBufferSize!: //: Size (in bytes) of the blob buffers to use. Default value is 4k. //: //: o !channelHighWatermark!: //: Size (in bytes) to use for write cache high watermark on the //: channel. Default value is 128MB. This value is set on the //: 'writeCacheHiWatermark' of the 'btemt_ChannelPoolConfiguration' object //: used by the session with the broker. Note that BlazingMQ reserves 4MB //: of this value for control message, so the actual watermark for data //: published is 'channelHighWatermark - 4MB'. //: //: o !statsDumpInterval!: //: Interval (in seconds) at which to dump stats in the logs. Set to 0 to //: disable recurring dump of stats (final stats are always dumped at end //: of session). Default is 5min. The value must be a multiple of 30s, in //: the range [0s - 60min]. //: //: o !connectTimeout!, //: o !disconnetTimeout!, //: o !openQueueTimeout!, //: o !configureQueueTimeout!, //: o !closeQueueTimeout!, //: Default timeout to use for various operations. //: //: o !eventQueueLowWatermark!, //: o !eventQueueHighWatermark!, //: Parameters to configure the EventQueue notification watermarks //: thresholds. The EventQueue is the buffer of all incoming events from //: the broker (PUSH and ACK messages as well as session and queue //: operations result) pending being processed by the application code. A //: warning ('bmqt::SessionEventType::e_SLOWCONSUMER_HIGHWATERMARK') is //: emitted when the buffer reaches the 'highWatermark' value, and a //: notification ('bmqt::SessionEventType::e_SLOWCONSUMER_NORMAL') is //: sent when the buffer is back to the 'lowWatermark'. The //: 'highWatermark' typically would be reached in case of either a very //: slow consumer, causing events to accumulate in the buffer, or a huge //: burst of data. Setting the 'highWatermark' to a high value should be //: done cautiously because it will potentially hide slowness of the //: consumer because of the enqueuing of PUSH events for a consumer, ACK //: events for a producer as well as all system events to the buffer //: (meaning that the messages may have a huge latency). Note, it is also //: recommended to have a reasonable distance between 'highWatermark' and //: 'lowWatermark' values to avoid a constant back and forth toggling of //: state resulting from push pop of events. //: //: o !hostHealthMonitor!: //: Optional instance of a class derived from 'bmqpi::HostHealthMonitor', //: responsible for notifying the 'Session' when the health of the host //: machine has changed. A 'hostHealthMonitor' must be specified, in order //: for queues opened through the session to suspend on unhealthy hosts. //: //: o !traceOptions!: //: Provides the `bmqpi::DTContext` and `bmqpi::DTTracer` objects required //: for integration with a Distributed Trace framework. If these objects //: are provided, then the session will use them to create "spans" to //: represent requests made to the BlazingMQ broker on behalf of //: operations initiated by the client. This includes session-level //: operations (e.g., Session-Start, Session-Stop) as well as queue-level //: operations (e.g., Queue-Open, Queue-Configure, Queue-Close). // // BMQ #include <bmqscm_version.h> // BDE #include <bdlt_timeunitratio.h> #include <bsl_iosfwd.h> #include <bsl_memory.h> #include <bsl_string.h> #include <bslma_allocator.h> #include <bslma_usesbslmaallocator.h> #include <bslmf_nestedtraitdeclaration.h> #include <bsls_annotation.h> #include <bsls_assert.h> #include <bsls_timeinterval.h> #include <bsls_types.h> namespace BloombergLP { // FORWARD DECLARATIONS namespace bmqpi { class DTContext; } namespace bmqpi { class DTTracer; } namespace bmqpi { class HostHealthMonitor; } namespace bmqt { // ==================== // class SessionOptions // ==================== class SessionOptions { // value-semantic type for options to configure a session with a BlazingMQ // broker public: // CONSTANTS static const char k_BROKER_DEFAULT_URI[]; // Default URI of the 'bmqbrkr' to connect to. static const int k_BROKER_DEFAULT_PORT = 30114; // Default port the 'bmqbrkr' is listening to for client // to connect. static const int k_QUEUE_OPERATION_DEFAULT_TIMEOUT = 5 * bdlt::TimeUnitRatio::k_SECONDS_PER_MINUTE; // The default, and minimum recommended, value for queue // operations (open, configure, close). private: // DATA bsl::string d_brokerUri; // URI of the broker to connect to (ex: // 'tcp://localhost:30114'). Default // is to connect to the local broker. bsl::string d_processNameOverride; // If not empty, use this value for the // processName in the identity message // (useful for scripted language // bindings). int d_numProcessingThreads; // Number of processing threads. // Default is 1 thread. int d_blobBufferSize; // Size of the blobs buffer. bsls::Types::Int64 d_channelHighWatermark; // Write cache high watermark to use on // the channel bsls::TimeInterval d_statsDumpInterval; // Interval at which to dump stats to // log file (0 to disable dump) bsls::TimeInterval d_connectTimeout; bsls::TimeInterval d_disconnectTimeout; bsls::TimeInterval d_openQueueTimeout; bsls::TimeInterval d_configureQueueTimeout; bsls::TimeInterval d_closeQueueTimeout; // Timeout for operations of the // corresponding type. int d_eventQueueLowWatermark; int d_eventQueueHighWatermark; // Parameters to configure the // EventQueue. int d_eventQueueSize; // DEPRECATED: This parameter is no // longer relevant and will be removed // in future release of libbmq. bsl::shared_ptr<bmqpi::HostHealthMonitor> d_hostHealthMonitor_sp; bsl::shared_ptr<bmqpi::DTContext> d_dtContext_sp; bsl::shared_ptr<bmqpi::DTTracer> d_dtTracer_sp; public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(SessionOptions, bslma::UsesBslmaAllocator) // CREATORS explicit SessionOptions(bslma::Allocator *allocator = 0); // Create a new SessionOptions using the optionally specified // 'allocator'. SessionOptions(const SessionOptions& other, bslma::Allocator *allocator = 0); // Create a new SessionOptions by copying values from the specified // 'other', using the optionally specified 'allocator'. // MANIPULATORS SessionOptions& setBrokerUri(const bslstl::StringRef& value); // Set the broker URI to the specified 'value'. SessionOptions& setProcessNameOverride(const bslstl::StringRef& value); // Set an override of the proces name to the specified 'value'. SessionOptions& setNumProcessingThreads(int value); // Set the number of processing threads to the specified 'value'. SessionOptions& setBlobBufferSize(int value); // Set the specified 'value' for the size of blobs buffers. SessionOptions& setChannelHighWatermark(bsls::Types::Int64 value); // Set the specified 'value' (in bytes) for the channel high // watermark. The behavior is undefined unless // '8 * 1024 * 1024 < value'. SessionOptions& setStatsDumpInterval(const bsls::TimeInterval& value); // Set the statsDumpInterval to the specified 'value'. The behavior is // undefined unless 'value' is a multiple of 30s and less than 60 // minutes. SessionOptions& setConnectTimeout(const bsls::TimeInterval& value); SessionOptions& setDisconnectTimeout(const bsls::TimeInterval& value); SessionOptions& setOpenQueueTimeout(const bsls::TimeInterval& value); SessionOptions& setConfigureQueueTimeout(const bsls::TimeInterval& value); SessionOptions& setCloseQueueTimeout(const bsls::TimeInterval& value); // Set the timeout for operations of the corresponding type to the // specified 'value'. SessionOptions& setHostHealthMonitor( const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor); // Set a 'HostHealthMonitor' object that will notify the session when // the health of the host has changed. SessionOptions& setTraceOptions(const bsl::shared_ptr<bmqpi::DTContext>& dtContext, const bsl::shared_ptr<bmqpi::DTTracer>& dtTracer); // Set the 'DTContext' and 'DTTracer' objects needed to integrate with // Distributed Trace frameworks. Either both arguments must point to // valid instances, or both must be empty shared_ptr's; if either is an // empty shared_ptr and the other is not, then behavior is undefined. SessionOptions& configureEventQueue(int queueSize, int lowWatermark, int highWatermark); // DEPRECATED: Use 'configureEventQueue(int lowWatermark, // int highWatermark)' // instead. This method will be marked as // 'BSLS_ANNOTATION_DEPRECATED' in future release of // libbmq. SessionOptions& configureEventQueue(int lowWatermark, int highWatermark); // Configure the EventQueue notification watermarks thresholds with the // specified 'lowWatermark' and 'highWatermark' value. Refer to the // component level documentation for explanation of those watermarks. // The behavior is undefined unless 'lowWatermark < highWatermark'. // ACCESSORS const bsl::string& brokerUri() const; // Get the broker URI. const bsl::string& processNameOverride() const; // Return the process name override. int numProcessingThreads() const; // Get the number of processing threads. int blobBufferSize() const; // Get the size of the blobs buffer. bsls::Types::Int64 channelHighWatermark() const; // Get the channel high watermark. const bsls::TimeInterval& statsDumpInterval() const; // Get the stats dump interval. const bsls::TimeInterval& connectTimeout() const; const bsls::TimeInterval& disconnectTimeout() const; const bsls::TimeInterval& openQueueTimeout() const; const bsls::TimeInterval& configureQueueTimeout() const; const bsls::TimeInterval& closeQueueTimeout() const; // Get the timeout for the operations of the corresponding type. const bsl::shared_ptr<bmqpi::HostHealthMonitor>& hostHealthMonitor() const; const bsl::shared_ptr<bmqpi::DTContext>& traceContext() const; // Get the Distributed Trace context object. const bsl::shared_ptr<bmqpi::DTTracer>& tracer() const; // Get the Distributed Trace tracer object. int eventQueueLowWatermark() const; int eventQueueHighWatermark() const; int eventQueueSize() const; // DEPRECATED: This parameter is no longer relevant and will be removed // in future release of libbmq. bsl::ostream& print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const; // Format this object to the specified output 'stream' at the (absolute // value of) the optionally specified indentation 'level' and return a // reference to 'stream'. If 'level' is specified, optionally specify // 'spacesPerLevel', the number of spaces per indentation level for // this and all of its nested objects. If 'level' is negative, // suppress indentation of the first line. If 'spacesPerLevel' is // negative format the entire output on one line, suppressing all but // the initial indentation (as governed by 'level'). If 'stream' is // not valid on entry, this operation has no effect. }; // FREE OPERATORS bool operator==(const SessionOptions& lhs, const SessionOptions& rhs); // Return 'true' if the specified 'rhs' object contains the value of the // same type as contained in the specified 'lhs' object and the value // itself is the same in both objects, return false otherwise. bool operator!=(const SessionOptions& lhs, const SessionOptions& rhs); // Return 'false' if the specified 'rhs' object contains the value of the // same type as contained in the specified 'lhs' object and the value // itself is the same in both objects, return 'true' otherwise. bsl::ostream& operator<<(bsl::ostream& stream, const SessionOptions& rhs); // Format the specified 'rhs' to the specified output 'stream' and return a // reference to the modifiable 'stream'. // ============================================================================ // INLINE DEFINITIONS // ============================================================================ // -------------------- // class SessionOptions // -------------------- // MANIPULATORS inline SessionOptions& SessionOptions::setBrokerUri(const bslstl::StringRef& value) { d_brokerUri.assign(value.data(), value.length()); return *this; } inline SessionOptions& SessionOptions::setProcessNameOverride(const bslstl::StringRef& value) { d_processNameOverride.assign(value.data(), value.length()); return *this; } inline SessionOptions& SessionOptions::setNumProcessingThreads(int value) { d_numProcessingThreads = value; return *this; } inline SessionOptions& SessionOptions::setBlobBufferSize(int value) { d_blobBufferSize = value; return *this; } inline SessionOptions& SessionOptions::setChannelHighWatermark(bsls::Types::Int64 value) { // PRECONDITIONS BSLS_ASSERT_OPT(8 * 1024 * 1024 < value); // We reserve 4MB for control message (see // 'bmqimp::BrokerSession::k_CONTROL_DATA_WATERMARK_EXTRA') so make // sure the provided value is greater than it. d_channelHighWatermark = value; return *this; } inline SessionOptions& SessionOptions::setStatsDumpInterval(const bsls::TimeInterval& value) { // PRECONDITIONS BSLS_ASSERT_OPT( value.seconds() % 30 == 0 && "value must be a multiple of 30s"); BSLS_ASSERT_OPT( value.seconds() < bdlt::TimeUnitRatio::k_SECONDS_PER_HOUR && "value must be < 10 min"); d_statsDumpInterval = value; return *this; } inline SessionOptions& SessionOptions::setConnectTimeout(const bsls::TimeInterval& value) { d_connectTimeout = value; return *this; } inline SessionOptions& SessionOptions::setDisconnectTimeout(const bsls::TimeInterval& value) { d_disconnectTimeout = value; return *this; } inline SessionOptions& SessionOptions::setOpenQueueTimeout(const bsls::TimeInterval& value) { d_openQueueTimeout = value; return *this; } inline SessionOptions& SessionOptions::setConfigureQueueTimeout(const bsls::TimeInterval& value) { d_configureQueueTimeout = value; return *this; } inline SessionOptions& SessionOptions::setCloseQueueTimeout(const bsls::TimeInterval& value) { d_closeQueueTimeout = value; return *this; } inline SessionOptions& SessionOptions::setHostHealthMonitor( const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor) { d_hostHealthMonitor_sp = monitor; return *this; } inline SessionOptions& SessionOptions::setTraceOptions( const bsl::shared_ptr<bmqpi::DTContext>& context, const bsl::shared_ptr<bmqpi::DTTracer>& tracer) { BSLS_ASSERT_OPT((context && tracer) || (!context && !tracer)); d_dtContext_sp = context; d_dtTracer_sp = tracer; return *this; } inline SessionOptions& SessionOptions::configureEventQueue(int queueSize, int lowWatermark, int highWatermark) { d_eventQueueSize = queueSize; return configureEventQueue(lowWatermark, highWatermark); } inline SessionOptions& SessionOptions::configureEventQueue(int lowWatermark, int highWatermark) { // PRECONDITIONS BSLS_ASSERT_OPT(lowWatermark < highWatermark); d_eventQueueLowWatermark = lowWatermark; d_eventQueueHighWatermark = highWatermark; return *this; } // ACCESSORS inline const bsl::string& SessionOptions::brokerUri() const { return d_brokerUri; } inline const bsl::string& SessionOptions::processNameOverride() const { return d_processNameOverride; } inline int SessionOptions::numProcessingThreads() const { return d_numProcessingThreads; } inline int SessionOptions::blobBufferSize() const { return d_blobBufferSize; } inline bsls::Types::Int64 SessionOptions::channelHighWatermark() const { return d_channelHighWatermark; } inline const bsls::TimeInterval& SessionOptions::statsDumpInterval() const { return d_statsDumpInterval; } inline const bsls::TimeInterval& SessionOptions::connectTimeout() const { return d_connectTimeout; } inline const bsls::TimeInterval& SessionOptions::disconnectTimeout() const { return d_disconnectTimeout; } inline const bsls::TimeInterval& SessionOptions::openQueueTimeout() const { return d_openQueueTimeout; } inline const bsls::TimeInterval& SessionOptions::configureQueueTimeout() const { return d_configureQueueTimeout; } inline const bsls::TimeInterval& SessionOptions::closeQueueTimeout() const { return d_closeQueueTimeout; } inline const bsl::shared_ptr<bmqpi::HostHealthMonitor>& SessionOptions::hostHealthMonitor() const { return d_hostHealthMonitor_sp; } inline const bsl::shared_ptr<bmqpi::DTContext>& SessionOptions::traceContext() const { return d_dtContext_sp; } inline const bsl::shared_ptr<bmqpi::DTTracer>& SessionOptions::tracer() const { return d_dtTracer_sp; } inline int SessionOptions::eventQueueLowWatermark() const { return d_eventQueueLowWatermark; } inline int SessionOptions::eventQueueHighWatermark() const { return d_eventQueueHighWatermark; } inline int SessionOptions::eventQueueSize() const { return d_eventQueueSize; } } // close package namespace // -------------------- // class SessionOptions // -------------------- inline bool bmqt::operator==(const bmqt::SessionOptions& lhs, const bmqt::SessionOptions& rhs) { return lhs.brokerUri() == rhs.brokerUri() && lhs.numProcessingThreads() == rhs.numProcessingThreads() && lhs.blobBufferSize() == rhs.blobBufferSize() && lhs.channelHighWatermark() == rhs.channelHighWatermark() && lhs.statsDumpInterval() == rhs.statsDumpInterval() && lhs.connectTimeout() == rhs.connectTimeout() && lhs.openQueueTimeout() == rhs.openQueueTimeout() && lhs.configureQueueTimeout() == rhs.configureQueueTimeout() && lhs.closeQueueTimeout() == rhs.closeQueueTimeout() && lhs.eventQueueLowWatermark() == rhs.eventQueueLowWatermark() && lhs.eventQueueHighWatermark() == rhs.eventQueueHighWatermark() && lhs.hostHealthMonitor() == rhs.hostHealthMonitor() && lhs.traceContext() == rhs.traceContext() && lhs.tracer() == rhs.tracer(); } inline bool bmqt::operator!=(const bmqt::SessionOptions& lhs, const bmqt::SessionOptions& rhs) { return lhs.brokerUri() != rhs.brokerUri() || lhs.numProcessingThreads() != rhs.numProcessingThreads() || lhs.blobBufferSize() != rhs.blobBufferSize() || lhs.channelHighWatermark() != rhs.channelHighWatermark() || lhs.statsDumpInterval() != rhs.statsDumpInterval() || lhs.connectTimeout() != rhs.connectTimeout() || lhs.openQueueTimeout() != rhs.openQueueTimeout() || lhs.configureQueueTimeout() != rhs.configureQueueTimeout() || lhs.closeQueueTimeout() != rhs.closeQueueTimeout() || lhs.eventQueueLowWatermark() != rhs.eventQueueLowWatermark() || lhs.eventQueueHighWatermark() != rhs.eventQueueHighWatermark() || lhs.hostHealthMonitor() != rhs.hostHealthMonitor() || lhs.traceContext() != rhs.traceContext() || lhs.tracer() != rhs.tracer(); } inline bsl::ostream& bmqt::operator<<(bsl::ostream& stream, const bmqt::SessionOptions& rhs) { return rhs.print(stream, 0, -1); } } // close enterprise namespace #endif