libbmq  a5f8a06ba1d16cb5a65643e1fa7f1a1d6aadef40
bmqt_subscription.h
Go to the documentation of this file.
1 // Copyright 2022-2023 Bloomberg Finance L.P.
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 
16 // bmqt_subscription.h -*-C++-*-
17 #ifndef INCLUDED_BMQT_SUBSCRIPTION
18 #define INCLUDED_BMQT_SUBSCRIPTION
19 
28 
29 // BMQ
30 
31 #include <bmqt_correlationid.h>
32 
33 // BDE
34 #include <bsl_optional.h>
35 #include <bsl_ostream.h>
36 #include <bsl_string.h>
37 #include <bslim_printer.h>
38 #include <bslma_allocator.h>
39 #include <bsls_types.h>
40 
41 namespace BloombergLP {
42 
43 // FORWARD DECLARATION
namespace bmqa {
// Only the names are needed here: both types are named in 'friend'
// declarations inside this header.
struct MessageImpl;
class MessageIterator;
}  // close namespace bmqa
50 
51 namespace bmqt {
52 
53 class QueueOptions;
54 // ========================
55 // class SubscriptionHandle
56 // ========================
57 
60  friend struct bmqa::MessageImpl;
61  friend class bmqa::MessageIterator;
62 
63  public:
65  static const unsigned int k_INVALID_HANDLE_ID = 0;
66 
67  private:
68  // PRIVATE DATA
69 
71  unsigned int d_id;
72 
74  bmqt::CorrelationId d_correlationId;
75 
76  private:
77  // PRIVATE CLASS METHODS
78  static unsigned int nextId();
79 
80  // PRIVATE CREATORS
82  SubscriptionHandle(unsigned int id, const bmqt::CorrelationId& cid);
83 
84  public:
85  // CREATORS
87 
88  // ACCESSORS
89  const bmqt::CorrelationId& correlationId() const;
90 
93  unsigned int id() const;
94 
104  bsl::ostream&
105  print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
106 
107  // FRIENDS
108  friend bool operator<(const SubscriptionHandle& lhs,
109  const SubscriptionHandle& rhs);
110  friend bool operator==(const SubscriptionHandle& lhs,
111  const SubscriptionHandle& rhs);
112  friend bool operator!=(const SubscriptionHandle& lhs,
113  const SubscriptionHandle& rhs);
114 
115  template <class HASH_ALGORITHM>
116  friend void hashAppend(HASH_ALGORITHM& hashAlgo,
117  const SubscriptionHandle& id);
118 
119  // Will only consider 'd_id' field when comparing 'lhs' and 'rhs'
120 };
121 
124  public:
125  // TYPES
126 
128  enum Enum {
130  e_NONE = 0
131 
132  ,
134  e_VERSION_1 = 1
135  };
136 
137  private:
138  bsl::string d_expression; // e.g., "firmId == foo"
139 
140  // Required to support newer style of expressions in future
141  Enum d_version;
142 
143  public:
144  // CREATORS
146  SubscriptionExpression(const bsl::string& expression, Enum version);
147 
148  // ACCESSORS
149  const bsl::string& text() const;
150 
151  Enum version() const;
152 
153  bool isValid() const;
154 
164  bsl::ostream&
165  print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
166 };
167 
170  public:
171  // PUBLIC CONSTANTS
172 
174  static const int k_CONSUMER_PRIORITY_MIN;
175 
177  static const int k_CONSUMER_PRIORITY_MAX;
178 
181  static const int k_DEFAULT_CONSUMER_PRIORITY;
182 
183  private:
184  // PRIVATE DATA
185 
188  bsl::optional<int> d_maxUnconfirmedMessages;
189 
192  bsl::optional<int> d_maxUnconfirmedBytes;
193 
195  bsl::optional<int> d_consumerPriority;
196 
197  SubscriptionExpression d_expression;
198 
199  public:
200  // CREATORS
201 
203  Subscription();
204 
212 
216 
220  Subscription& setConsumerPriority(int value);
221 
223 
224  // ACCESSORS
225 
227  int maxUnconfirmedMessages() const;
228 
230  int maxUnconfirmedBytes() const;
231 
233  int consumerPriority() const;
234 
235  const SubscriptionExpression& expression() const;
236 
240  bool hasMaxUnconfirmedMessages() const;
241 
244  bool hasMaxUnconfirmedBytes() const;
245 
248  bool hasConsumerPriority() const;
249 
259  bsl::ostream&
260  print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
261 };
262 
263 // FREE OPERATORS
264 
267 bsl::ostream& operator<<(bsl::ostream& stream, const SubscriptionHandle& rhs);
268 bsl::ostream& operator<<(bsl::ostream& stream, const Subscription& rhs);
269 
270 // ============================================================================
271 // INLINE DEFINITIONS
272 // ============================================================================
273 
274 // ------------------------
275 // class SubscriptionHandle
276 // ------------------------
277 
278 inline SubscriptionHandle::SubscriptionHandle()
279 : d_id(k_INVALID_HANDLE_ID)
280 , d_correlationId()
281 {
282  // NOTHING
283 }
284 
285 inline SubscriptionHandle::SubscriptionHandle(unsigned int id,
286  const bmqt::CorrelationId& cid)
287 : d_id(id)
288 , d_correlationId(cid)
289 {
290  // NOTHING
291 }
292 
293 inline SubscriptionHandle::SubscriptionHandle(const bmqt::CorrelationId& cid)
294 : d_id(nextId())
295 , d_correlationId(cid)
296 {
297  // NOTHING
298 }
299 
300 // ACCESSORS
302 {
303  return d_correlationId;
304 }
305 
306 inline unsigned int SubscriptionHandle::id() const
307 {
308  return d_id;
309 }
310 
311 inline bsl::ostream& SubscriptionHandle::print(bsl::ostream& stream,
312  int level,
313  int spacesPerLevel) const
314 {
315  if (stream.bad()) {
316  return stream; // RETURN
317  }
318 
319  bslim::Printer printer(&stream, level, spacesPerLevel);
320  printer.start();
321 
322  printer.printAttribute("id", d_id);
323  printer.printAttribute("CorrelationId", d_correlationId);
324 
325  printer.end();
326 
327  return stream;
328 }
329 // ----------------------------
330 // class SubscriptionExpression
331 // ----------------------------
333 : d_expression()
334 , d_version(e_NONE)
335 {
336  // NOTHING
337 }
338 
340  const bsl::string& expression,
341  Enum version)
342 : d_expression(expression)
343 , d_version(version)
344 {
345  // NOTHING
346 }
347 
348 inline const bsl::string& SubscriptionExpression::text() const
349 {
350  return d_expression;
351 }
352 
354 {
355  return d_version;
356 }
357 
359 {
360  return d_expression.length() == 0 ? d_version == e_NONE
361  : d_version > e_NONE;
362 }
363 
364 inline bsl::ostream& SubscriptionExpression::print(bsl::ostream& stream,
365  int level,
366  int spacesPerLevel) const
367 {
368  if (stream.bad()) {
369  return stream; // RETURN
370  }
371 
372  bslim::Printer printer(&stream, level, spacesPerLevel);
373  printer.start();
374 
375  printer.printAttribute("Expression", d_expression);
376  printer.printAttribute("Version", d_version);
377 
378  printer.end();
379 
380  return stream;
381 }
382 
383 // ------------------
384 // class Subscription
385 // ------------------
386 
388 : d_maxUnconfirmedMessages()
389 , d_maxUnconfirmedBytes()
390 , d_consumerPriority()
391 , d_expression()
392 {
393  // NOTHING
394 }
395 
396 inline bsl::ostream&
397 Subscription::print(bsl::ostream& stream, int level, int spacesPerLevel) const
398 {
399  if (stream.bad()) {
400  return stream; // RETURN
401  }
402 
403  bslim::Printer printer(&stream, level, spacesPerLevel);
404  printer.start();
405  printer.printAttribute("maxUnconfirmedMessages", maxUnconfirmedMessages());
406  printer.printAttribute("maxUnconfirmedBytes", maxUnconfirmedBytes());
407  printer.printAttribute("consumerPriority", consumerPriority());
408 
409  d_expression.print(stream, level, spacesPerLevel);
410 
411  printer.end();
412 
413  return stream;
414 }
415 
416 // MANIPULATORS
418 {
419  d_maxUnconfirmedMessages.emplace(value);
420  return *this;
421 }
422 
424 {
425  d_maxUnconfirmedBytes.emplace(value);
426  return *this;
427 }
428 
430 {
431  d_consumerPriority.emplace(value);
432  return *this;
433 }
434 
435 inline Subscription&
437 {
438  d_expression = value;
439  return *this;
440 }
441 
442 // ACCESSORS
444 {
445  return d_maxUnconfirmedMessages.value_or(
447 }
448 
450 {
451  return d_maxUnconfirmedBytes.value_or(k_DEFAULT_MAX_UNCONFIRMED_BYTES);
452 }
453 
455 {
456  return d_consumerPriority.value_or(k_DEFAULT_CONSUMER_PRIORITY);
457 }
458 
460 {
461  return d_expression;
462 }
463 
465 {
466  return d_maxUnconfirmedMessages.has_value();
467 }
468 
470 {
471  return d_maxUnconfirmedBytes.has_value();
472 }
473 
475 {
476  return d_consumerPriority.has_value();
477 }
478 
479 // FREE FUNCTIONS
480 
482 template <class HASH_ALGORITHM>
483 void hashAppend(HASH_ALGORITHM& hashAlgo, const SubscriptionHandle& id)
484 {
485  using bslh::hashAppend; // for ADL
486  hashAppend(hashAlgo, id.d_id);
487 }
488 
489 inline bool operator<(const SubscriptionHandle& lhs,
490  const SubscriptionHandle& rhs)
491 {
492  return lhs.d_id < rhs.d_id;
493 }
494 
495 inline bool operator==(const SubscriptionHandle& lhs,
496  const SubscriptionHandle& rhs)
497 {
498  return lhs.d_id == rhs.d_id;
499 }
500 
501 inline bool operator!=(const SubscriptionHandle& lhs,
502  const SubscriptionHandle& rhs)
503 {
504  return lhs.d_id != rhs.d_id;
505 }
506 
507 inline bsl::ostream& operator<<(bsl::ostream& stream,
508  const SubscriptionHandle& rhs)
509 {
510  return rhs.print(stream, 0, -1);
511 }
512 
513 inline bsl::ostream& operator<<(bsl::ostream& stream, const Subscription& rhs)
514 {
515  return rhs.print(stream, 0, -1);
516 }
517 
518 } // close package namespace
519 
520 } // close enterprise namespace
521 
522 #endif
bmqt_correlationid.h
Provide a value-semantic type usable as an efficient identifier.
bmqa::MessageIterator
Definition: bmqa_messageiterator.h:87
bmqt::CorrelationId
Definition: bmqt_correlationid.h:193
bmqt::SubscriptionExpression
Value-semantic type to carry Subscription criteria.
Definition: bmqt_subscription.h:123
const bsl::string & text() const
Definition: bmqt_subscription.h:348
SubscriptionExpression()
Definition: bmqt_subscription.h:332
bsl::ostream & print(bsl::ostream &stream, int level=0, int spacesPerLevel=4) const
Definition: bmqt_subscription.h:364
Enum
Enum representing criteria format.
Definition: bmqt_subscription.h:128
@ e_NONE
EMPTY.
Definition: bmqt_subscription.h:130
@ e_VERSION_1
Simple Evaluator.
Definition: bmqt_subscription.h:134
Enum version() const
Definition: bmqt_subscription.h:353
bool isValid() const
Definition: bmqt_subscription.h:358
bmqt::SubscriptionHandle
Value-semantic type for unique Subscription id.
Definition: bmqt_subscription.h:59
bsl::ostream & print(bsl::ostream &stream, int level=0, int spacesPerLevel=4) const
Definition: bmqt_subscription.h:311
friend struct bmqa::MessageImpl
Definition: bmqt_subscription.h:60
const bmqt::CorrelationId & correlationId() const
Definition: bmqt_subscription.h:301
friend bool operator!=(const SubscriptionHandle &lhs, const SubscriptionHandle &rhs)
Definition: bmqt_subscription.h:501
unsigned int id() const
Definition: bmqt_subscription.h:306
friend bool operator<(const SubscriptionHandle &lhs, const SubscriptionHandle &rhs)
Definition: bmqt_subscription.h:489
static const unsigned int k_INVALID_HANDLE_ID
Initial (invalid) value for bmqt::SubscriptionHandle::d_id
Definition: bmqt_subscription.h:65
friend bool operator==(const SubscriptionHandle &lhs, const SubscriptionHandle &rhs)
Definition: bmqt_subscription.h:495
friend void hashAppend(HASH_ALGORITHM &hashAlgo, const SubscriptionHandle &id)
Apply the specified hashAlgo to the specified id.
Definition: bmqt_subscription.h:483
bmqt::Subscription
Value-semantic type to carry Subscription parameters.
Definition: bmqt_subscription.h:169
bool hasMaxUnconfirmedBytes() const
Definition: bmqt_subscription.h:469
bool hasMaxUnconfirmedMessages() const
Definition: bmqt_subscription.h:464
Subscription & setConsumerPriority(int value)
Definition: bmqt_subscription.h:429
static const int k_DEFAULT_MAX_UNCONFIRMED_MESSAGES
Definition: bmqt_subscription.h:179
int consumerPriority() const
Get the number for the consumerPriority parameter.
Definition: bmqt_subscription.h:454
static const int k_DEFAULT_CONSUMER_PRIORITY
Definition: bmqt_subscription.h:181
Subscription()
Create a new Subscription.
Definition: bmqt_subscription.h:387
Subscription & setMaxUnconfirmedBytes(int value)
Definition: bmqt_subscription.h:423
static const int k_CONSUMER_PRIORITY_MIN
Constant representing the minimum valid consumer priority.
Definition: bmqt_subscription.h:174
Subscription & setMaxUnconfirmedMessages(int value)
Definition: bmqt_subscription.h:417
static const int k_DEFAULT_MAX_UNCONFIRMED_BYTES
Definition: bmqt_subscription.h:180
const SubscriptionExpression & expression() const
Definition: bmqt_subscription.h:459
int maxUnconfirmedMessages() const
Get the number for the maxUnconfirmedMessages parameter.
Definition: bmqt_subscription.h:443
Subscription & setExpression(const SubscriptionExpression &value)
Definition: bmqt_subscription.h:436
static const int k_CONSUMER_PRIORITY_MAX
Constant representing the maximum valid consumer priority.
Definition: bmqt_subscription.h:177
int maxUnconfirmedBytes() const
Get the number for the maxUnconfirmedBytes parameter.
Definition: bmqt_subscription.h:449
bsl::ostream & print(bsl::ostream &stream, int level=0, int spacesPerLevel=4) const
Definition: bmqt_subscription.h:397
bool hasConsumerPriority() const
Definition: bmqt_subscription.h:474
void hashAppend(HASH_ALGORITHM &hashAlgo, const SubscriptionHandle &id)
Apply the specified hashAlgo to the specified id.
Definition: bmqt_subscription.h:483
void hashAppend(HASH_ALGORITHM &hashAlgo, const CorrelationId &value)
Definition: bmqt_correlationid.h:504
bsl::ostream & operator<<(bsl::ostream &stream, CompressionAlgorithmType::Enum value)
Definition: bmqt_compressionalgorithmtype.h:141
bool operator!=(const CorrelationId &lhs, const CorrelationId &rhs)
Definition: bmqt_correlationid.h:582
bool operator==(const CorrelationId &lhs, const CorrelationId &rhs)
Definition: bmqt_correlationid.h:576
bool operator<(const CorrelationId &lhs, const CorrelationId &rhs)
Definition: bmqt_correlationid.h:588
Definition: bmqa_abstractsession.h:42