libbmq  e19ff338c707b114e9f84d83ea866a97518afb37
bmqt_sessionoptions.h
Go to the documentation of this file.
1 // Copyright 2014-2023 Bloomberg Finance L.P.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 
16 // bmqt_sessionoptions.h -*-C++-*-
17 #ifndef INCLUDED_BMQT_SESSIONOPTIONS
18 #define INCLUDED_BMQT_SESSIONOPTIONS
19 
124 
125 // BMQ
126 
127 // BDE
128 #include <bdlt_timeunitratio.h>
129 #include <bsl_iosfwd.h>
130 #include <bsl_memory.h>
131 #include <bsl_string.h>
132 #include <bslma_allocator.h>
133 #include <bslma_usesbslmaallocator.h>
134 #include <bslmf_nestedtraitdeclaration.h>
135 #include <bsls_annotation.h>
136 #include <bsls_assert.h>
137 #include <bsls_timeinterval.h>
138 #include <bsls_types.h>
139 
140 namespace BloombergLP {
141 
142 // FORWARD DECLARATIONS
143 namespace bmqpi {
144 class DTContext;
145 }
146 namespace bmqpi {
147 class DTTracer;
148 }
149 namespace bmqpi {
150 class HostHealthMonitor;
151 }
152 
153 namespace bmqt {
154 
155 // ====================
156 // class SessionOptions
157 // ====================
158 
162  public:
163  // CONSTANTS
164 
166  static const char k_BROKER_DEFAULT_URI[];
167 
169  static const int k_BROKER_DEFAULT_PORT = 30114;
170 
174  5 * bdlt::TimeUnitRatio::k_SECONDS_PER_MINUTE;
175 
176  private:
177  // DATA
178 
181  bsl::string d_brokerUri;
182 
185  bsl::string d_processNameOverride;
186 
188  int d_numProcessingThreads;
189 
191  int d_blobBufferSize;
192 
194  bsls::Types::Int64 d_channelHighWatermark;
195 
197  bsls::TimeInterval d_statsDumpInterval;
198 
199  bsls::TimeInterval d_connectTimeout;
200 
201  bsls::TimeInterval d_disconnectTimeout;
202 
203  bsls::TimeInterval d_openQueueTimeout;
204 
205  bsls::TimeInterval d_configureQueueTimeout;
206 
208  bsls::TimeInterval d_closeQueueTimeout;
209 
210  int d_eventQueueLowWatermark;
211 
213  int d_eventQueueHighWatermark;
214 
217  int d_eventQueueSize;
218 
219  bsl::shared_ptr<bmqpi::HostHealthMonitor> d_hostHealthMonitor_sp;
220 
221  bsl::shared_ptr<bmqpi::DTContext> d_dtContext_sp;
222  bsl::shared_ptr<bmqpi::DTTracer> d_dtTracer_sp;
223 
224  public:
225  // TRAITS
226  BSLMF_NESTED_TRAIT_DECLARATION(SessionOptions, bslma::UsesBslmaAllocator)
227 
228  // CREATORS
229 
230 
232  explicit SessionOptions(bslma::Allocator* allocator = 0);
233 
237  bslma::Allocator* allocator = 0);
238 
239  // MANIPULATORS
240 
242  SessionOptions& setBrokerUri(const bslstl::StringRef& value);
243 
245  SessionOptions& setProcessNameOverride(const bslstl::StringRef& value);
246 
249 
251  SessionOptions& setBlobBufferSize(int value);
252 
256  SessionOptions& setChannelHighWatermark(bsls::Types::Int64 value);
257 
261  SessionOptions& setStatsDumpInterval(const bsls::TimeInterval& value);
262 
264  SessionOptions& setConnectTimeout(const bsls::TimeInterval& value);
265 
268  SessionOptions& setDisconnectTimeout(const bsls::TimeInterval& value);
269 
271  SessionOptions& setOpenQueueTimeout(const bsls::TimeInterval& value);
272 
275  SessionOptions& setConfigureQueueTimeout(const bsls::TimeInterval& value);
276 
278  SessionOptions& setCloseQueueTimeout(const bsls::TimeInterval& value);
279 
283  const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor);
284 
290  setTraceOptions(const bsl::shared_ptr<bmqpi::DTContext>& dtContext,
291  const bsl::shared_ptr<bmqpi::DTTracer>& dtTracer);
292 
299  configureEventQueue(int queueSize, int lowWatermark, int highWatermark);
300 
305  SessionOptions& configureEventQueue(int lowWatermark, int highWatermark);
306 
307  // ACCESSORS
308 
310  const bsl::string& brokerUri() const;
311 
313  const bsl::string& processNameOverride() const;
314 
316  int numProcessingThreads() const;
317 
319  int blobBufferSize() const;
320 
322  bsls::Types::Int64 channelHighWatermark() const;
323 
325  const bsls::TimeInterval& statsDumpInterval() const;
326 
328  const bsls::TimeInterval& connectTimeout() const;
329 
331  const bsls::TimeInterval& disconnectTimeout() const;
332 
334  const bsls::TimeInterval& openQueueTimeout() const;
335 
337  const bsls::TimeInterval& configureQueueTimeout() const;
338 
340  const bsls::TimeInterval& closeQueueTimeout() const;
341 
342  const bsl::shared_ptr<bmqpi::HostHealthMonitor>& hostHealthMonitor() const;
343 
345  const bsl::shared_ptr<bmqpi::DTContext>& traceContext() const;
346 
348  const bsl::shared_ptr<bmqpi::DTTracer>& tracer() const;
349 
350  int eventQueueLowWatermark() const;
351  int eventQueueHighWatermark() const;
352 
355  int eventQueueSize() const;
356 
366  bsl::ostream&
367  print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
368 };
369 
370 // FREE OPERATORS
371 
375 bool operator==(const SessionOptions& lhs, const SessionOptions& rhs);
376 
380 bool operator!=(const SessionOptions& lhs, const SessionOptions& rhs);
381 
384 bsl::ostream& operator<<(bsl::ostream& stream, const SessionOptions& rhs);
385 
386 // ============================================================================
387 // INLINE DEFINITIONS
388 // ============================================================================
389 
390 // --------------------
391 // class SessionOptions
392 // --------------------
393 
394 // MANIPULATORS
395 inline SessionOptions&
396 SessionOptions::setBrokerUri(const bslstl::StringRef& value)
397 {
398  d_brokerUri.assign(value.data(), value.length());
399  return *this;
400 }
401 
402 inline SessionOptions&
403 SessionOptions::setProcessNameOverride(const bslstl::StringRef& value)
404 {
405  d_processNameOverride.assign(value.data(), value.length());
406  return *this;
407 }
408 
410 {
411  d_numProcessingThreads = value;
412  return *this;
413 }
414 
416 {
417  d_blobBufferSize = value;
418  return *this;
419 }
420 
421 inline SessionOptions&
423 {
424  // PRECONDITIONS
425  BSLS_ASSERT_OPT(8 * 1024 * 1024 < value);
426  // We reserve 4MB for control message (see
427  // 'bmqimp::BrokerSession::k_CONTROL_DATA_WATERMARK_EXTRA') so make
428  // sure the provided value is greater than it.
429 
430  d_channelHighWatermark = value;
431  return *this;
432 }
433 
434 inline SessionOptions&
435 SessionOptions::setStatsDumpInterval(const bsls::TimeInterval& value)
436 {
437  // PRECONDITIONS
438  BSLS_ASSERT_OPT(value.seconds() % 30 == 0 &&
439  "value must be a multiple of 30s");
440  BSLS_ASSERT_OPT(value.seconds() <
441  bdlt::TimeUnitRatio::k_SECONDS_PER_HOUR &&
442  "value must be < 10 min");
443 
444  d_statsDumpInterval = value;
445  return *this;
446 }
447 
448 inline SessionOptions&
449 SessionOptions::setConnectTimeout(const bsls::TimeInterval& value)
450 {
451  d_connectTimeout = value;
452  return *this;
453 }
454 
455 inline SessionOptions&
456 SessionOptions::setDisconnectTimeout(const bsls::TimeInterval& value)
457 {
458  d_disconnectTimeout = value;
459  return *this;
460 }
461 
462 inline SessionOptions&
463 SessionOptions::setOpenQueueTimeout(const bsls::TimeInterval& value)
464 {
465  d_openQueueTimeout = value;
466  return *this;
467 }
468 
469 inline SessionOptions&
470 SessionOptions::setConfigureQueueTimeout(const bsls::TimeInterval& value)
471 {
472  d_configureQueueTimeout = value;
473  return *this;
474 }
475 
476 inline SessionOptions&
477 SessionOptions::setCloseQueueTimeout(const bsls::TimeInterval& value)
478 {
479  d_closeQueueTimeout = value;
480  return *this;
481 }
482 
484  const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor)
485 {
486  d_hostHealthMonitor_sp = monitor;
487  return *this;
488 }
489 
491  const bsl::shared_ptr<bmqpi::DTContext>& context,
492  const bsl::shared_ptr<bmqpi::DTTracer>& tracer)
493 {
494  BSLS_ASSERT_OPT((context && tracer) || (!context && !tracer));
495 
496  d_dtContext_sp = context;
497  d_dtTracer_sp = tracer;
498  return *this;
499 }
500 
502  int lowWatermark,
503  int highWatermark)
504 {
505  d_eventQueueSize = queueSize;
506  return configureEventQueue(lowWatermark, highWatermark);
507 }
508 
510  int highWatermark)
511 {
512  // PRECONDITIONS
513  BSLS_ASSERT_OPT(lowWatermark < highWatermark);
514 
515  d_eventQueueLowWatermark = lowWatermark;
516  d_eventQueueHighWatermark = highWatermark;
517 
518  return *this;
519 }
520 
521 // ACCESSORS
522 inline const bsl::string& SessionOptions::brokerUri() const
523 {
524  return d_brokerUri;
525 }
526 
527 inline const bsl::string& SessionOptions::processNameOverride() const
528 {
529  return d_processNameOverride;
530 }
531 
533 {
534  return d_numProcessingThreads;
535 }
536 
538 {
539  return d_blobBufferSize;
540 }
541 
542 inline bsls::Types::Int64 SessionOptions::channelHighWatermark() const
543 {
544  return d_channelHighWatermark;
545 }
546 
547 inline const bsls::TimeInterval& SessionOptions::statsDumpInterval() const
548 {
549  return d_statsDumpInterval;
550 }
551 
552 inline const bsls::TimeInterval& SessionOptions::connectTimeout() const
553 {
554  return d_connectTimeout;
555 }
556 
557 inline const bsls::TimeInterval& SessionOptions::disconnectTimeout() const
558 {
559  return d_disconnectTimeout;
560 }
561 
562 inline const bsls::TimeInterval& SessionOptions::openQueueTimeout() const
563 {
564  return d_openQueueTimeout;
565 }
566 
567 inline const bsls::TimeInterval& SessionOptions::configureQueueTimeout() const
568 {
569  return d_configureQueueTimeout;
570 }
571 
572 inline const bsls::TimeInterval& SessionOptions::closeQueueTimeout() const
573 {
574  return d_closeQueueTimeout;
575 }
576 
577 inline const bsl::shared_ptr<bmqpi::HostHealthMonitor>&
579 {
580  return d_hostHealthMonitor_sp;
581 }
582 
583 inline const bsl::shared_ptr<bmqpi::DTContext>&
585 {
586  return d_dtContext_sp;
587 }
588 
589 inline const bsl::shared_ptr<bmqpi::DTTracer>& SessionOptions::tracer() const
590 {
591  return d_dtTracer_sp;
592 }
593 
595 {
596  return d_eventQueueLowWatermark;
597 }
598 
600 {
601  return d_eventQueueHighWatermark;
602 }
603 
605 {
606  return d_eventQueueSize;
607 }
608 
609 } // close package namespace
610 
611 // --------------------
612 // class SessionOptions
613 // --------------------
614 
615 inline bool bmqt::operator==(const bmqt::SessionOptions& lhs,
616  const bmqt::SessionOptions& rhs)
617 {
618  return lhs.brokerUri() == rhs.brokerUri() &&
620  lhs.blobBufferSize() == rhs.blobBufferSize() &&
622  lhs.statsDumpInterval() == rhs.statsDumpInterval() &&
623  lhs.connectTimeout() == rhs.connectTimeout() &&
624  lhs.openQueueTimeout() == rhs.openQueueTimeout() &&
626  lhs.closeQueueTimeout() == rhs.closeQueueTimeout() &&
629  lhs.hostHealthMonitor() == rhs.hostHealthMonitor() &&
630  lhs.traceContext() == rhs.traceContext() &&
631  lhs.tracer() == rhs.tracer();
632 }
633 
634 inline bool bmqt::operator!=(const bmqt::SessionOptions& lhs,
635  const bmqt::SessionOptions& rhs)
636 {
637  return lhs.brokerUri() != rhs.brokerUri() ||
639  lhs.blobBufferSize() != rhs.blobBufferSize() ||
641  lhs.statsDumpInterval() != rhs.statsDumpInterval() ||
642  lhs.connectTimeout() != rhs.connectTimeout() ||
643  lhs.openQueueTimeout() != rhs.openQueueTimeout() ||
645  lhs.closeQueueTimeout() != rhs.closeQueueTimeout() ||
648  lhs.hostHealthMonitor() != rhs.hostHealthMonitor() ||
649  lhs.traceContext() != rhs.traceContext() ||
650  lhs.tracer() != rhs.tracer();
651 }
652 
653 inline bsl::ostream& bmqt::operator<<(bsl::ostream& stream,
654  const bmqt::SessionOptions& rhs)
655 {
656  return rhs.print(stream, 0, -1);
657 }
658 
659 } // close enterprise namespace
660 
661 #endif
A pure interface for a context with a notion of a current span.
Definition: bmqpi_dtcontext.h:55
Definition: bmqt_sessionoptions.h:161
SessionOptions & setChannelHighWatermark(bsls::Types::Int64 value)
Definition: bmqt_sessionoptions.h:422
bsls::Types::Int64 channelHighWatermark() const
Get the channel high watermark.
Definition: bmqt_sessionoptions.h:542
SessionOptions & setCloseQueueTimeout(const bsls::TimeInterval &value)
Set the timeout for closing a queue to the specified value.
Definition: bmqt_sessionoptions.h:477
int eventQueueSize() const
Definition: bmqt_sessionoptions.h:604
const bsls::TimeInterval & closeQueueTimeout() const
Get the timeout for closing a queue.
Definition: bmqt_sessionoptions.h:572
SessionOptions & setConfigureQueueTimeout(const bsls::TimeInterval &value)
Definition: bmqt_sessionoptions.h:470
int eventQueueHighWatermark() const
Definition: bmqt_sessionoptions.h:599
int numProcessingThreads() const
Get the number of processing threads.
Definition: bmqt_sessionoptions.h:532
SessionOptions & setProcessNameOverride(const bslstl::StringRef &value)
Set an override of the process name to the specified value.
Definition: bmqt_sessionoptions.h:403
const bsls::TimeInterval & disconnectTimeout() const
Get the timeout for disconnecting from the broker.
Definition: bmqt_sessionoptions.h:557
static const char k_BROKER_DEFAULT_URI[]
Default URI of the bmqbrkr to connect to.
Definition: bmqt_sessionoptions.h:166
int blobBufferSize() const
Get the size of the blobs buffer.
Definition: bmqt_sessionoptions.h:537
SessionOptions & setOpenQueueTimeout(const bsls::TimeInterval &value)
Set the timeout for opening a queue to the specified value.
Definition: bmqt_sessionoptions.h:463
SessionOptions & setBrokerUri(const bslstl::StringRef &value)
Set the broker URI to the specified value.
Definition: bmqt_sessionoptions.h:396
SessionOptions & setTraceOptions(const bsl::shared_ptr< bmqpi::DTContext > &dtContext, const bsl::shared_ptr< bmqpi::DTTracer > &dtTracer)
Definition: bmqt_sessionoptions.h:490
bsl::ostream & print(bsl::ostream &stream, int level=0, int spacesPerLevel=4) const
const bsl::string & processNameOverride() const
Return the process name override.
Definition: bmqt_sessionoptions.h:527
SessionOptions & setHostHealthMonitor(const bsl::shared_ptr< bmqpi::HostHealthMonitor > &monitor)
Definition: bmqt_sessionoptions.h:483
const bsl::shared_ptr< bmqpi::HostHealthMonitor > & hostHealthMonitor() const
Definition: bmqt_sessionoptions.h:578
static const int k_QUEUE_OPERATION_DEFAULT_TIMEOUT
Definition: bmqt_sessionoptions.h:173
SessionOptions & configureEventQueue(int queueSize, int lowWatermark, int highWatermark)
Definition: bmqt_sessionoptions.h:501
const bsls::TimeInterval & statsDumpInterval() const
Get the stats dump interval.
Definition: bmqt_sessionoptions.h:547
const bsls::TimeInterval & configureQueueTimeout() const
Get the timeout for configuring or disconfiguring a queue.
Definition: bmqt_sessionoptions.h:567
const bsl::string & brokerUri() const
Get the broker URI.
Definition: bmqt_sessionoptions.h:522
static const int k_BROKER_DEFAULT_PORT
Default port on which the bmqbrkr listens for client connections.
Definition: bmqt_sessionoptions.h:169
SessionOptions & setConnectTimeout(const bsls::TimeInterval &value)
Set the timeout for connecting to the broker to the specified value.
Definition: bmqt_sessionoptions.h:449
const bsl::shared_ptr< bmqpi::DTTracer > & tracer() const
Get the Distributed Trace tracer object.
Definition: bmqt_sessionoptions.h:589
const bsls::TimeInterval & openQueueTimeout() const
Get the timeout for opening a queue.
Definition: bmqt_sessionoptions.h:562
SessionOptions & setStatsDumpInterval(const bsls::TimeInterval &value)
Definition: bmqt_sessionoptions.h:435
const bsl::shared_ptr< bmqpi::DTContext > & traceContext() const
Get the Distributed Trace context object.
Definition: bmqt_sessionoptions.h:584
int eventQueueLowWatermark() const
Definition: bmqt_sessionoptions.h:594
SessionOptions & setNumProcessingThreads(int value)
Set the number of processing threads to the specified value.
Definition: bmqt_sessionoptions.h:409
const bsls::TimeInterval & connectTimeout() const
Get the timeout for connecting to the broker.
Definition: bmqt_sessionoptions.h:552
SessionOptions & setDisconnectTimeout(const bsls::TimeInterval &value)
Definition: bmqt_sessionoptions.h:456
SessionOptions & setBlobBufferSize(int value)
Set the specified value for the size of blobs buffers.
Definition: bmqt_sessionoptions.h:415
bsl::ostream & operator<<(bsl::ostream &stream, CompressionAlgorithmType::Enum value)
Definition: bmqt_compressionalgorithmtype.h:141
bool operator!=(const CorrelationId &lhs, const CorrelationId &rhs)
Definition: bmqt_correlationid.h:582
bool operator==(const CorrelationId &lhs, const CorrelationId &rhs)
Definition: bmqt_correlationid.h:576
Definition: bmqa_abstractsession.h:42