libbmq b6028b29b733bc7541593d2905a5f79a9f0192fc
Loading...
Searching...
No Matches
bmqt_sessionoptions.h
Go to the documentation of this file.
1// Copyright 2014-2023 Bloomberg Finance L.P.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16// bmqt_sessionoptions.h -*-C++-*-
17#ifndef INCLUDED_BMQT_SESSIONOPTIONS
18#define INCLUDED_BMQT_SESSIONOPTIONS
19
124
125// BMQ
126
127// BDE
128#include <bdlt_timeunitratio.h>
129#include <bsl_iosfwd.h>
130#include <bsl_memory.h>
131#include <bsl_string.h>
132#include <bsla_annotations.h>
133#include <bslma_allocator.h>
134#include <bslma_usesbslmaallocator.h>
135#include <bslmf_nestedtraitdeclaration.h>
136#include <bsls_assert.h>
137#include <bsls_timeinterval.h>
138#include <bsls_types.h>
139
140namespace BloombergLP {
141
142// FORWARD DECLARATIONS
143namespace bmqpi {
144class DTContext;
145}
146namespace bmqpi {
147class DTTracer;
148}
149namespace bmqpi {
150class HostHealthMonitor;
151}
152
153namespace bmqt {
154
155// ====================
156// class SessionOptions
157// ====================
158
162 public:
163 // CONSTANTS
164
166 static const char k_BROKER_DEFAULT_URI[];
167
169 static const int k_BROKER_DEFAULT_PORT = 30114;
170
174 5 * bdlt::TimeUnitRatio::k_SECONDS_PER_MINUTE;
175
176 private:
177 // DATA
178
181 bsl::string d_brokerUri;
182
185 bsl::string d_processNameOverride;
186
188 int d_numProcessingThreads;
189
191 int d_blobBufferSize;
192
194 bsls::Types::Int64 d_channelHighWatermark;
195
197 bsls::TimeInterval d_statsDumpInterval;
198
199 bsls::TimeInterval d_connectTimeout;
200
201 bsls::TimeInterval d_disconnectTimeout;
202
203 bsls::TimeInterval d_openQueueTimeout;
204
205 bsls::TimeInterval d_configureQueueTimeout;
206
208 bsls::TimeInterval d_closeQueueTimeout;
209
210 int d_eventQueueLowWatermark;
211
213 int d_eventQueueHighWatermark;
214
217 int d_eventQueueSize;
218
219 bsl::shared_ptr<bmqpi::HostHealthMonitor> d_hostHealthMonitor_sp;
220
221 bsl::shared_ptr<bmqpi::DTContext> d_dtContext_sp;
222 bsl::shared_ptr<bmqpi::DTTracer> d_dtTracer_sp;
223
224 public:
225 // TRAITS
226 BSLMF_NESTED_TRAIT_DECLARATION(SessionOptions, bslma::UsesBslmaAllocator)
227
228 // CREATORS
229
230
232 explicit SessionOptions(bslma::Allocator* allocator = 0);
233
237 bslma::Allocator* allocator = 0);
238
239 // MANIPULATORS
240
242 SessionOptions& setBrokerUri(const bslstl::StringRef& value);
243
245 SessionOptions& setProcessNameOverride(const bslstl::StringRef& value);
246
249
252
256 SessionOptions& setChannelHighWatermark(bsls::Types::Int64 value);
257
261 SessionOptions& setStatsDumpInterval(const bsls::TimeInterval& value);
262
264 SessionOptions& setConnectTimeout(const bsls::TimeInterval& value);
265
268 SessionOptions& setDisconnectTimeout(const bsls::TimeInterval& value);
269
271 SessionOptions& setOpenQueueTimeout(const bsls::TimeInterval& value);
272
275 SessionOptions& setConfigureQueueTimeout(const bsls::TimeInterval& value);
276
278 SessionOptions& setCloseQueueTimeout(const bsls::TimeInterval& value);
279
283 const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor);
284
290 setTraceOptions(const bsl::shared_ptr<bmqpi::DTContext>& dtContext,
291 const bsl::shared_ptr<bmqpi::DTTracer>& dtTracer);
292
298 configureEventQueue(int queueSize, int lowWatermark, int highWatermark);
299
304 SessionOptions& configureEventQueue(int lowWatermark, int highWatermark);
305
306 // ACCESSORS
307
309 const bsl::string& brokerUri() const;
310
312 const bsl::string& processNameOverride() const;
313
315 int numProcessingThreads() const;
316
318 int blobBufferSize() const;
319
321 bsls::Types::Int64 channelHighWatermark() const;
322
324 const bsls::TimeInterval& statsDumpInterval() const;
325
327 const bsls::TimeInterval& connectTimeout() const;
328
330 const bsls::TimeInterval& disconnectTimeout() const;
331
333 const bsls::TimeInterval& openQueueTimeout() const;
334
336 const bsls::TimeInterval& configureQueueTimeout() const;
337
339 const bsls::TimeInterval& closeQueueTimeout() const;
340
341 const bsl::shared_ptr<bmqpi::HostHealthMonitor>& hostHealthMonitor() const;
342
344 const bsl::shared_ptr<bmqpi::DTContext>& traceContext() const;
345
347 const bsl::shared_ptr<bmqpi::DTTracer>& tracer() const;
348
349 int eventQueueLowWatermark() const;
350 int eventQueueHighWatermark() const;
351
354 int eventQueueSize() const;
355
365 bsl::ostream&
366 print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
367};
368
369// FREE OPERATORS
370
374bool operator==(const SessionOptions& lhs, const SessionOptions& rhs);
375
379bool operator!=(const SessionOptions& lhs, const SessionOptions& rhs);
380
383bsl::ostream& operator<<(bsl::ostream& stream, const SessionOptions& rhs);
384
385// ============================================================================
386// INLINE DEFINITIONS
387// ============================================================================
388
389// --------------------
390// class SessionOptions
391// --------------------
392
393// MANIPULATORS
394inline SessionOptions&
395SessionOptions::setBrokerUri(const bslstl::StringRef& value)
396{
397 d_brokerUri.assign(value.data(), value.length());
398 return *this;
399}
400
401inline SessionOptions&
402SessionOptions::setProcessNameOverride(const bslstl::StringRef& value)
403{
404 d_processNameOverride.assign(value.data(), value.length());
405 return *this;
406}
407
409{
410 d_numProcessingThreads = value;
411 return *this;
412}
413
415{
416 d_blobBufferSize = value;
417 return *this;
418}
419
420inline SessionOptions&
422{
423 // PRECONDITIONS
424 BSLS_ASSERT_OPT(8 * 1024 * 1024 < value);
425 // We reserve 4MB for control message (see
426 // 'bmqimp::BrokerSession::k_CONTROL_DATA_WATERMARK_EXTRA') so make
427 // sure the provided value is greater than it.
428
429 d_channelHighWatermark = value;
430 return *this;
431}
432
433inline SessionOptions&
434SessionOptions::setStatsDumpInterval(const bsls::TimeInterval& value)
435{
436 // PRECONDITIONS
437 BSLS_ASSERT_OPT(value.seconds() % 30 == 0 &&
438 "value must be a multiple of 30s");
439 BSLS_ASSERT_OPT(value.seconds() <
440 bdlt::TimeUnitRatio::k_SECONDS_PER_HOUR &&
441 "value must be < 10 min");
442
443 d_statsDumpInterval = value;
444 return *this;
445}
446
447inline SessionOptions&
448SessionOptions::setConnectTimeout(const bsls::TimeInterval& value)
449{
450 d_connectTimeout = value;
451 return *this;
452}
453
454inline SessionOptions&
455SessionOptions::setDisconnectTimeout(const bsls::TimeInterval& value)
456{
457 d_disconnectTimeout = value;
458 return *this;
459}
460
461inline SessionOptions&
462SessionOptions::setOpenQueueTimeout(const bsls::TimeInterval& value)
463{
464 d_openQueueTimeout = value;
465 return *this;
466}
467
468inline SessionOptions&
469SessionOptions::setConfigureQueueTimeout(const bsls::TimeInterval& value)
470{
471 d_configureQueueTimeout = value;
472 return *this;
473}
474
475inline SessionOptions&
476SessionOptions::setCloseQueueTimeout(const bsls::TimeInterval& value)
477{
478 d_closeQueueTimeout = value;
479 return *this;
480}
481
483 const bsl::shared_ptr<bmqpi::HostHealthMonitor>& monitor)
484{
485 d_hostHealthMonitor_sp = monitor;
486 return *this;
487}
488
490 const bsl::shared_ptr<bmqpi::DTContext>& context,
491 const bsl::shared_ptr<bmqpi::DTTracer>& tracer)
492{
493 BSLS_ASSERT_OPT((context && tracer) || (!context && !tracer));
494
495 d_dtContext_sp = context;
496 d_dtTracer_sp = tracer;
497 return *this;
498}
499
501 int lowWatermark,
502 int highWatermark)
503{
504 d_eventQueueSize = queueSize;
505 return configureEventQueue(lowWatermark, highWatermark);
506}
507
509 int highWatermark)
510{
511 // PRECONDITIONS
512 BSLS_ASSERT_OPT(lowWatermark < highWatermark);
513
514 d_eventQueueLowWatermark = lowWatermark;
515 d_eventQueueHighWatermark = highWatermark;
516
517 return *this;
518}
519
520// ACCESSORS
521inline const bsl::string& SessionOptions::brokerUri() const
522{
523 return d_brokerUri;
524}
525
526inline const bsl::string& SessionOptions::processNameOverride() const
527{
528 return d_processNameOverride;
529}
530
532{
533 return d_numProcessingThreads;
534}
535
537{
538 return d_blobBufferSize;
539}
540
541inline bsls::Types::Int64 SessionOptions::channelHighWatermark() const
542{
543 return d_channelHighWatermark;
544}
545
546inline const bsls::TimeInterval& SessionOptions::statsDumpInterval() const
547{
548 return d_statsDumpInterval;
549}
550
551inline const bsls::TimeInterval& SessionOptions::connectTimeout() const
552{
553 return d_connectTimeout;
554}
555
556inline const bsls::TimeInterval& SessionOptions::disconnectTimeout() const
557{
558 return d_disconnectTimeout;
559}
560
561inline const bsls::TimeInterval& SessionOptions::openQueueTimeout() const
562{
563 return d_openQueueTimeout;
564}
565
566inline const bsls::TimeInterval& SessionOptions::configureQueueTimeout() const
567{
568 return d_configureQueueTimeout;
569}
570
571inline const bsls::TimeInterval& SessionOptions::closeQueueTimeout() const
572{
573 return d_closeQueueTimeout;
574}
575
576inline const bsl::shared_ptr<bmqpi::HostHealthMonitor>&
578{
579 return d_hostHealthMonitor_sp;
580}
581
582inline const bsl::shared_ptr<bmqpi::DTContext>&
584{
585 return d_dtContext_sp;
586}
587
588inline const bsl::shared_ptr<bmqpi::DTTracer>& SessionOptions::tracer() const
589{
590 return d_dtTracer_sp;
591}
592
594{
595 return d_eventQueueLowWatermark;
596}
597
599{
600 return d_eventQueueHighWatermark;
601}
602
604{
605 return d_eventQueueSize;
606}
607
608} // close package namespace
609
610// --------------------
611// class SessionOptions
612// --------------------
613
615 const bmqt::SessionOptions& rhs)
616{
617 return lhs.brokerUri() == rhs.brokerUri() &&
619 lhs.blobBufferSize() == rhs.blobBufferSize() &&
621 lhs.statsDumpInterval() == rhs.statsDumpInterval() &&
622 lhs.connectTimeout() == rhs.connectTimeout() &&
623 lhs.openQueueTimeout() == rhs.openQueueTimeout() &&
625 lhs.closeQueueTimeout() == rhs.closeQueueTimeout() &&
628 lhs.hostHealthMonitor() == rhs.hostHealthMonitor() &&
629 lhs.traceContext() == rhs.traceContext() &&
630 lhs.tracer() == rhs.tracer();
631}
632
634 const bmqt::SessionOptions& rhs)
635{
636 return lhs.brokerUri() != rhs.brokerUri() ||
638 lhs.blobBufferSize() != rhs.blobBufferSize() ||
640 lhs.statsDumpInterval() != rhs.statsDumpInterval() ||
641 lhs.connectTimeout() != rhs.connectTimeout() ||
642 lhs.openQueueTimeout() != rhs.openQueueTimeout() ||
644 lhs.closeQueueTimeout() != rhs.closeQueueTimeout() ||
647 lhs.hostHealthMonitor() != rhs.hostHealthMonitor() ||
648 lhs.traceContext() != rhs.traceContext() ||
649 lhs.tracer() != rhs.tracer();
650}
651
652inline bsl::ostream& bmqt::operator<<(bsl::ostream& stream,
653 const bmqt::SessionOptions& rhs)
654{
655 return rhs.print(stream, 0, -1);
656}
657
658} // close enterprise namespace
659
660#endif
A pure interface for a context with a notion of a current span.
Definition bmqpi_dtcontext.h:55
Definition bmqt_sessionoptions.h:161
SessionOptions & setChannelHighWatermark(bsls::Types::Int64 value)
Definition bmqt_sessionoptions.h:421
bsls::Types::Int64 channelHighWatermark() const
Get the channel high watermark.
Definition bmqt_sessionoptions.h:541
SessionOptions & setCloseQueueTimeout(const bsls::TimeInterval &value)
Set the timeout for closing a queue to the specified value.
Definition bmqt_sessionoptions.h:476
int eventQueueSize() const
Definition bmqt_sessionoptions.h:603
const bsls::TimeInterval & closeQueueTimeout() const
Get the timeout for closing a queue.
Definition bmqt_sessionoptions.h:571
SessionOptions & setConfigureQueueTimeout(const bsls::TimeInterval &value)
Definition bmqt_sessionoptions.h:469
int eventQueueHighWatermark() const
Definition bmqt_sessionoptions.h:598
int numProcessingThreads() const
Get the number of processing threads.
Definition bmqt_sessionoptions.h:531
SessionOptions & setProcessNameOverride(const bslstl::StringRef &value)
Set an override of the proces name to the specified value.
Definition bmqt_sessionoptions.h:402
const bsls::TimeInterval & disconnectTimeout() const
Get the timeout for disconnecting from the broker.
Definition bmqt_sessionoptions.h:556
static const char k_BROKER_DEFAULT_URI[]
Default URI of the bmqbrkr to connect to.
Definition bmqt_sessionoptions.h:166
bsl::ostream & print(bsl::ostream &stream, int level=0, int spacesPerLevel=4) const
int blobBufferSize() const
Get the size of the blobs buffer.
Definition bmqt_sessionoptions.h:536
SessionOptions & setOpenQueueTimeout(const bsls::TimeInterval &value)
Set the timeout for opening a queue to the specified value.
Definition bmqt_sessionoptions.h:462
SessionOptions & setBrokerUri(const bslstl::StringRef &value)
Set the broker URI to the specified value.
Definition bmqt_sessionoptions.h:395
SessionOptions & setTraceOptions(const bsl::shared_ptr< bmqpi::DTContext > &dtContext, const bsl::shared_ptr< bmqpi::DTTracer > &dtTracer)
Definition bmqt_sessionoptions.h:489
const bsl::string & processNameOverride() const
Return the process name override.
Definition bmqt_sessionoptions.h:526
SessionOptions & setHostHealthMonitor(const bsl::shared_ptr< bmqpi::HostHealthMonitor > &monitor)
Definition bmqt_sessionoptions.h:482
const bsl::shared_ptr< bmqpi::HostHealthMonitor > & hostHealthMonitor() const
Definition bmqt_sessionoptions.h:577
static const int k_QUEUE_OPERATION_DEFAULT_TIMEOUT
Definition bmqt_sessionoptions.h:173
SessionOptions & configureEventQueue(int queueSize, int lowWatermark, int highWatermark)
Definition bmqt_sessionoptions.h:500
const bsls::TimeInterval & statsDumpInterval() const
Get the stats dump interval.
Definition bmqt_sessionoptions.h:546
const bsls::TimeInterval & configureQueueTimeout() const
Get the timeout for configuring or disconfiguring a queue.
Definition bmqt_sessionoptions.h:566
const bsl::string & brokerUri() const
Get the broker URI.
Definition bmqt_sessionoptions.h:521
static const int k_BROKER_DEFAULT_PORT
Default port the bmqbrkr is listening to for client to connect.
Definition bmqt_sessionoptions.h:169
SessionOptions & setConnectTimeout(const bsls::TimeInterval &value)
Set the timeout for connecting to the broker to the specified value.
Definition bmqt_sessionoptions.h:448
const bsl::shared_ptr< bmqpi::DTTracer > & tracer() const
Get the Distributed Trace tracer object.
Definition bmqt_sessionoptions.h:588
const bsls::TimeInterval & openQueueTimeout() const
Get the timeout for opening a queue.
Definition bmqt_sessionoptions.h:561
SessionOptions & setStatsDumpInterval(const bsls::TimeInterval &value)
Definition bmqt_sessionoptions.h:434
const bsl::shared_ptr< bmqpi::DTContext > & traceContext() const
Get the Distributed Trace context object.
Definition bmqt_sessionoptions.h:583
int eventQueueLowWatermark() const
Definition bmqt_sessionoptions.h:593
SessionOptions & setNumProcessingThreads(int value)
Set the number of processing threads to the specified value.
Definition bmqt_sessionoptions.h:408
const bsls::TimeInterval & connectTimeout() const
Get the timeout for connecting to the broker.
Definition bmqt_sessionoptions.h:551
SessionOptions & setDisconnectTimeout(const bsls::TimeInterval &value)
Definition bmqt_sessionoptions.h:455
SessionOptions & setBlobBufferSize(int value)
Set the specified value for the size of blobs buffers.
Definition bmqt_sessionoptions.h:414
bsl::ostream & operator<<(bsl::ostream &stream, CompressionAlgorithmType::Enum value)
Definition bmqt_compressionalgorithmtype.h:141
bool operator!=(const CorrelationId &lhs, const CorrelationId &rhs)
Definition bmqt_correlationid.h:582
bool operator==(const CorrelationId &lhs, const CorrelationId &rhs)
Definition bmqt_correlationid.h:576
Definition bmqa_abstractsession.h:42