libbmq 1576e52e850d5ec711093d9c6d671e80491fd316
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
bmqt_sessionoptions.h
Go to the documentation of this file.
1// Copyright 2014-2023 Bloomberg Finance L.P.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16// bmqt_sessionoptions.h -*-C++-*-
17#ifndef INCLUDED_BMQT_SESSIONOPTIONS
18#define INCLUDED_BMQT_SESSIONOPTIONS
19
124
125// BMQ
126
127// BDE
128#include <bdlt_timeunitratio.h>
129#include <bsl_iosfwd.h>
130#include <bsl_memory.h>
131#include <bsl_string.h>
132#include <bslma_allocator.h>
133#include <bslma_usesbslmaallocator.h>
134#include <bslmf_nestedtraitdeclaration.h>
135#include <bsls_annotation.h>
136#include <bsls_assert.h>
137#include <bsls_timeinterval.h>
138#include <bsls_types.h>
139
140namespace BloombergLP {
141
142// FORWARD DECLARATIONS
143namespace bmqpi {
144class DTContext;
145}
146namespace bmqpi {
147class DTTracer;
148}
149namespace bmqpi {
150class HostHealthMonitor;
151}
152
153namespace bmqt {
154
155// ====================
156// class SessionOptions
157// ====================
158
162 public:
163 // CONSTANTS
164
166 static const char k_BROKER_DEFAULT_URI[];
167
169 static const int k_BROKER_DEFAULT_PORT = 30114;
170
174 5 * bdlt::TimeUnitRatio::k_SECONDS_PER_MINUTE;
175
176 private:
177 // DATA
178
181 bsl::string d_brokerUri;
182
185 bsl::string d_processNameOverride;
186
188 int d_numProcessingThreads;
189
191 int d_blobBufferSize;
192
194 bsls::Types::Int64 d_channelHighWatermark;
195
197 bsls::TimeInterval d_statsDumpInterval;
198
199 bsls::TimeInterval d_connectTimeout;
200
201 bsls::TimeInterval d_disconnectTimeout;
202
203 bsls::TimeInterval d_openQueueTimeout;
204
205 bsls::TimeInterval d_configureQueueTimeout;
206
208 bsls::TimeInterval d_closeQueueTimeout;
209
210 int d_eventQueueLowWatermark;
211
213 int d_eventQueueHighWatermark;
214
217 int d_eventQueueSize;
218
219 bsl::shared_ptr<bmqpi::HostHealthMonitor> d_hostHealthMonitor_sp;
220
221 bsl::shared_ptr<bmqpi::DTContext> d_dtContext_sp;
222 bsl::shared_ptr<bmqpi::DTTracer> d_dtTracer_sp;
223
224 public:
225 // TRAITS
226 BSLMF_NESTED_TRAIT_DECLARATION(SessionOptions, bslma::UsesBslmaAllocator)
227
228 // CREATORS
229
230
232 explicit SessionOptions(bslma::Allocator* allocator = 0);
233
237 bslma::Allocator* allocator = 0);
238
239 // MANIPULATORS
240
242 SessionOptions& setBrokerUri(const bslstl::StringRef& value);
243
245 SessionOptions& setProcessNameOverride(const bslstl::StringRef& value);
246
249
252
256 SessionOptions& setChannelHighWatermark(bsls::Types::Int64 value);
257
261 SessionOptions& setStatsDumpInterval(const bsls::TimeInterval& value);
262
264 SessionOptions& setConnectTimeout(const bsls::TimeInterval& value);
265
268 SessionOptions& setDisconnectTimeout(const bsls::TimeInterval& value);
269
271 SessionOptions& setOpenQueueTimeout(const bsls::TimeInterval& value);
272
275 SessionOptions& setConfigureQueueTimeout(const bsls::TimeInterval& value);
276
278 SessionOptions& setCloseQueueTimeout(const bsls::TimeInterval& value);
279
283 const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor);
284
290 setTraceOptions(const bsl::shared_ptr<bmqpi::DTContext>& dtContext,
291 const bsl::shared_ptr<bmqpi::DTTracer>& dtTracer);
292
299 configureEventQueue(int queueSize, int lowWatermark, int highWatermark);
300
305 SessionOptions& configureEventQueue(int lowWatermark, int highWatermark);
306
307 // ACCESSORS
308
310 const bsl::string& brokerUri() const;
311
313 const bsl::string& processNameOverride() const;
314
316 int numProcessingThreads() const;
317
319 int blobBufferSize() const;
320
322 bsls::Types::Int64 channelHighWatermark() const;
323
325 const bsls::TimeInterval& statsDumpInterval() const;
326
328 const bsls::TimeInterval& connectTimeout() const;
329
331 const bsls::TimeInterval& disconnectTimeout() const;
332
334 const bsls::TimeInterval& openQueueTimeout() const;
335
337 const bsls::TimeInterval& configureQueueTimeout() const;
338
340 const bsls::TimeInterval& closeQueueTimeout() const;
341
342 const bsl::shared_ptr<bmqpi::HostHealthMonitor>& hostHealthMonitor() const;
343
345 const bsl::shared_ptr<bmqpi::DTContext>& traceContext() const;
346
348 const bsl::shared_ptr<bmqpi::DTTracer>& tracer() const;
349
350 int eventQueueLowWatermark() const;
351 int eventQueueHighWatermark() const;
352
355 int eventQueueSize() const;
356
366 bsl::ostream&
367 print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
368};
369
370// FREE OPERATORS
371
375bool operator==(const SessionOptions& lhs, const SessionOptions& rhs);
376
380bool operator!=(const SessionOptions& lhs, const SessionOptions& rhs);
381
384bsl::ostream& operator<<(bsl::ostream& stream, const SessionOptions& rhs);
385
386// ============================================================================
387// INLINE DEFINITIONS
388// ============================================================================
389
390// --------------------
391// class SessionOptions
392// --------------------
393
394// MANIPULATORS
395inline SessionOptions&
396SessionOptions::setBrokerUri(const bslstl::StringRef& value)
397{
398 d_brokerUri.assign(value.data(), value.length());
399 return *this;
400}
401
402inline SessionOptions&
403SessionOptions::setProcessNameOverride(const bslstl::StringRef& value)
404{
405 d_processNameOverride.assign(value.data(), value.length());
406 return *this;
407}
408
410{
411 d_numProcessingThreads = value;
412 return *this;
413}
414
416{
417 d_blobBufferSize = value;
418 return *this;
419}
420
421inline SessionOptions&
423{
424 // PRECONDITIONS
425 BSLS_ASSERT_OPT(8 * 1024 * 1024 < value);
426 // We reserve 4MB for control message (see
427 // 'bmqimp::BrokerSession::k_CONTROL_DATA_WATERMARK_EXTRA') so make
428 // sure the provided value is greater than it.
429
430 d_channelHighWatermark = value;
431 return *this;
432}
433
434inline SessionOptions&
435SessionOptions::setStatsDumpInterval(const bsls::TimeInterval& value)
436{
437 // PRECONDITIONS
438 BSLS_ASSERT_OPT(value.seconds() % 30 == 0 &&
439 "value must be a multiple of 30s");
440 BSLS_ASSERT_OPT(value.seconds() <
441 bdlt::TimeUnitRatio::k_SECONDS_PER_HOUR &&
442 "value must be < 10 min");
443
444 d_statsDumpInterval = value;
445 return *this;
446}
447
448inline SessionOptions&
449SessionOptions::setConnectTimeout(const bsls::TimeInterval& value)
450{
451 d_connectTimeout = value;
452 return *this;
453}
454
455inline SessionOptions&
456SessionOptions::setDisconnectTimeout(const bsls::TimeInterval& value)
457{
458 d_disconnectTimeout = value;
459 return *this;
460}
461
462inline SessionOptions&
463SessionOptions::setOpenQueueTimeout(const bsls::TimeInterval& value)
464{
465 d_openQueueTimeout = value;
466 return *this;
467}
468
469inline SessionOptions&
470SessionOptions::setConfigureQueueTimeout(const bsls::TimeInterval& value)
471{
472 d_configureQueueTimeout = value;
473 return *this;
474}
475
476inline SessionOptions&
477SessionOptions::setCloseQueueTimeout(const bsls::TimeInterval& value)
478{
479 d_closeQueueTimeout = value;
480 return *this;
481}
482
484 const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor)
485{
486 d_hostHealthMonitor_sp = monitor;
487 return *this;
488}
489
491 const bsl::shared_ptr<bmqpi::DTContext>& context,
492 const bsl::shared_ptr<bmqpi::DTTracer>& tracer)
493{
494 BSLS_ASSERT_OPT((context && tracer) || (!context && !tracer));
495
496 d_dtContext_sp = context;
497 d_dtTracer_sp = tracer;
498 return *this;
499}
500
502 int lowWatermark,
503 int highWatermark)
504{
505 d_eventQueueSize = queueSize;
506 return configureEventQueue(lowWatermark, highWatermark);
507}
508
510 int highWatermark)
511{
512 // PRECONDITIONS
513 BSLS_ASSERT_OPT(lowWatermark < highWatermark);
514
515 d_eventQueueLowWatermark = lowWatermark;
516 d_eventQueueHighWatermark = highWatermark;
517
518 return *this;
519}
520
521// ACCESSORS
522inline const bsl::string& SessionOptions::brokerUri() const
523{
524 return d_brokerUri;
525}
526
527inline const bsl::string& SessionOptions::processNameOverride() const
528{
529 return d_processNameOverride;
530}
531
533{
534 return d_numProcessingThreads;
535}
536
538{
539 return d_blobBufferSize;
540}
541
542inline bsls::Types::Int64 SessionOptions::channelHighWatermark() const
543{
544 return d_channelHighWatermark;
545}
546
547inline const bsls::TimeInterval& SessionOptions::statsDumpInterval() const
548{
549 return d_statsDumpInterval;
550}
551
552inline const bsls::TimeInterval& SessionOptions::connectTimeout() const
553{
554 return d_connectTimeout;
555}
556
557inline const bsls::TimeInterval& SessionOptions::disconnectTimeout() const
558{
559 return d_disconnectTimeout;
560}
561
562inline const bsls::TimeInterval& SessionOptions::openQueueTimeout() const
563{
564 return d_openQueueTimeout;
565}
566
567inline const bsls::TimeInterval& SessionOptions::configureQueueTimeout() const
568{
569 return d_configureQueueTimeout;
570}
571
572inline const bsls::TimeInterval& SessionOptions::closeQueueTimeout() const
573{
574 return d_closeQueueTimeout;
575}
576
577inline const bsl::shared_ptr<bmqpi::HostHealthMonitor>&
579{
580 return d_hostHealthMonitor_sp;
581}
582
583inline const bsl::shared_ptr<bmqpi::DTContext>&
585{
586 return d_dtContext_sp;
587}
588
589inline const bsl::shared_ptr<bmqpi::DTTracer>& SessionOptions::tracer() const
590{
591 return d_dtTracer_sp;
592}
593
595{
596 return d_eventQueueLowWatermark;
597}
598
600{
601 return d_eventQueueHighWatermark;
602}
603
605{
606 return d_eventQueueSize;
607}
608
609} // close package namespace
610
611// --------------------
612// class SessionOptions
613// --------------------
614
616 const bmqt::SessionOptions& rhs)
617{
618 return lhs.brokerUri() == rhs.brokerUri() &&
620 lhs.blobBufferSize() == rhs.blobBufferSize() &&
622 lhs.statsDumpInterval() == rhs.statsDumpInterval() &&
623 lhs.connectTimeout() == rhs.connectTimeout() &&
624 lhs.openQueueTimeout() == rhs.openQueueTimeout() &&
626 lhs.closeQueueTimeout() == rhs.closeQueueTimeout() &&
629 lhs.hostHealthMonitor() == rhs.hostHealthMonitor() &&
630 lhs.traceContext() == rhs.traceContext() &&
631 lhs.tracer() == rhs.tracer();
632}
633
635 const bmqt::SessionOptions& rhs)
636{
637 return lhs.brokerUri() != rhs.brokerUri() ||
639 lhs.blobBufferSize() != rhs.blobBufferSize() ||
641 lhs.statsDumpInterval() != rhs.statsDumpInterval() ||
642 lhs.connectTimeout() != rhs.connectTimeout() ||
643 lhs.openQueueTimeout() != rhs.openQueueTimeout() ||
645 lhs.closeQueueTimeout() != rhs.closeQueueTimeout() ||
648 lhs.hostHealthMonitor() != rhs.hostHealthMonitor() ||
649 lhs.traceContext() != rhs.traceContext() ||
650 lhs.tracer() != rhs.tracer();
651}
652
653inline bsl::ostream& bmqt::operator<<(bsl::ostream& stream,
654 const bmqt::SessionOptions& rhs)
655{
656 return rhs.print(stream, 0, -1);
657}
658
659} // close enterprise namespace
660
661#endif
A pure interface for a context with a notion of a current span.
Definition bmqpi_dtcontext.h:55
Definition bmqt_sessionoptions.h:161
SessionOptions & setChannelHighWatermark(bsls::Types::Int64 value)
Definition bmqt_sessionoptions.h:422
bsls::Types::Int64 channelHighWatermark() const
Get the channel high watermark.
Definition bmqt_sessionoptions.h:542
SessionOptions & setCloseQueueTimeout(const bsls::TimeInterval &value)
Set the timeout for closing a queue to the specified value.
Definition bmqt_sessionoptions.h:477
int eventQueueSize() const
Definition bmqt_sessionoptions.h:604
const bsls::TimeInterval & closeQueueTimeout() const
Get the timeout for closing a queue.
Definition bmqt_sessionoptions.h:572
SessionOptions & setConfigureQueueTimeout(const bsls::TimeInterval &value)
Definition bmqt_sessionoptions.h:470
int eventQueueHighWatermark() const
Definition bmqt_sessionoptions.h:599
int numProcessingThreads() const
Get the number of processing threads.
Definition bmqt_sessionoptions.h:532
SessionOptions & setProcessNameOverride(const bslstl::StringRef &value)
Set an override of the process name to the specified value.
Definition bmqt_sessionoptions.h:403
const bsls::TimeInterval & disconnectTimeout() const
Get the timeout for disconnecting from the broker.
Definition bmqt_sessionoptions.h:557
static const char k_BROKER_DEFAULT_URI[]
Default URI of the bmqbrkr to connect to.
Definition bmqt_sessionoptions.h:166
bsl::ostream & print(bsl::ostream &stream, int level=0, int spacesPerLevel=4) const
int blobBufferSize() const
Get the size of the blobs buffer.
Definition bmqt_sessionoptions.h:537
SessionOptions & setOpenQueueTimeout(const bsls::TimeInterval &value)
Set the timeout for opening a queue to the specified value.
Definition bmqt_sessionoptions.h:463
SessionOptions & setBrokerUri(const bslstl::StringRef &value)
Set the broker URI to the specified value.
Definition bmqt_sessionoptions.h:396
SessionOptions & setTraceOptions(const bsl::shared_ptr< bmqpi::DTContext > &dtContext, const bsl::shared_ptr< bmqpi::DTTracer > &dtTracer)
Definition bmqt_sessionoptions.h:490
const bsl::string & processNameOverride() const
Return the process name override.
Definition bmqt_sessionoptions.h:527
SessionOptions & setHostHealthMonitor(const bsl::shared_ptr< bmqpi::HostHealthMonitor > &monitor)
Definition bmqt_sessionoptions.h:483
const bsl::shared_ptr< bmqpi::HostHealthMonitor > & hostHealthMonitor() const
Definition bmqt_sessionoptions.h:578
static const int k_QUEUE_OPERATION_DEFAULT_TIMEOUT
Definition bmqt_sessionoptions.h:173
SessionOptions & configureEventQueue(int queueSize, int lowWatermark, int highWatermark)
Definition bmqt_sessionoptions.h:501
const bsls::TimeInterval & statsDumpInterval() const
Get the stats dump interval.
Definition bmqt_sessionoptions.h:547
const bsls::TimeInterval & configureQueueTimeout() const
Get the timeout for configuring or disconfiguring a queue.
Definition bmqt_sessionoptions.h:567
const bsl::string & brokerUri() const
Get the broker URI.
Definition bmqt_sessionoptions.h:522
static const int k_BROKER_DEFAULT_PORT
Default port on which the bmqbrkr listens for client connections.
Definition bmqt_sessionoptions.h:169
SessionOptions & setConnectTimeout(const bsls::TimeInterval &value)
Set the timeout for connecting to the broker to the specified value.
Definition bmqt_sessionoptions.h:449
const bsl::shared_ptr< bmqpi::DTTracer > & tracer() const
Get the Distributed Trace tracer object.
Definition bmqt_sessionoptions.h:589
const bsls::TimeInterval & openQueueTimeout() const
Get the timeout for opening a queue.
Definition bmqt_sessionoptions.h:562
SessionOptions & setStatsDumpInterval(const bsls::TimeInterval &value)
Definition bmqt_sessionoptions.h:435
const bsl::shared_ptr< bmqpi::DTContext > & traceContext() const
Get the Distributed Trace context object.
Definition bmqt_sessionoptions.h:584
int eventQueueLowWatermark() const
Definition bmqt_sessionoptions.h:594
SessionOptions & setNumProcessingThreads(int value)
Set the number of processing threads to the specified value.
Definition bmqt_sessionoptions.h:409
const bsls::TimeInterval & connectTimeout() const
Get the timeout for connecting to the broker.
Definition bmqt_sessionoptions.h:552
SessionOptions & setDisconnectTimeout(const bsls::TimeInterval &value)
Definition bmqt_sessionoptions.h:456
SessionOptions & setBlobBufferSize(int value)
Set the specified value for the size of blobs buffers.
Definition bmqt_sessionoptions.h:415
bsl::ostream & operator<<(bsl::ostream &stream, CompressionAlgorithmType::Enum value)
Definition bmqt_compressionalgorithmtype.h:141
bool operator!=(const CorrelationId &lhs, const CorrelationId &rhs)
Definition bmqt_correlationid.h:582
bool operator==(const CorrelationId &lhs, const CorrelationId &rhs)
Definition bmqt_correlationid.h:576
Definition bmqa_abstractsession.h:42