RMQ - RabbitMQ C++ Library
rmqa_producerimpl.h
1// Copyright 2020-2023 Bloomberg Finance L.P.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#ifndef INCLUDED_RMQA_PRODUCERIMPL
17#define INCLUDED_RMQA_PRODUCERIMPL
18
19#include <rmqp_messagetransformer.h>
20#include <rmqp_producer.h>
21#include <rmqt_endpoint.h>
22#include <rmqt_exchange.h>
23#include <rmqt_future.h>
24#include <rmqt_message.h>
25#include <rmqt_queue.h>
26#include <rmqt_result.h>
27
28#include <rmqamqp_sendchannel.h>
29
30#include <bdlb_guid.h>
31#include <bdlmt_threadpool.h>
32#include <bslma_managedptr.h>
33#include <bslmt_mutex.h>
34#include <bslmt_timedsemaphore.h>
35#include <bsls_keyword.h>
36#include <bsls_timeinterval.h>
37
38#include <bsl_memory.h>
39#include <bsl_string.h>
40#include <bsl_unordered_map.h>
41#include <bsl_vector.h>
42
43//@PURPOSE: Implements the rmqa::Producer interface
44//
45//@CLASSES:
46// rmqa::ProducerImpl: Manages interaction between rmqa <-> rmq internals
47
48namespace BloombergLP {
49namespace rmqio {
50class EventLoop;
51}
52namespace rmqa {
53
55 public:
// Factory for ProducerImpl objects. Both the destructor and 'create' are
// virtual, so a derived factory can be supplied in place of this one.
// NOTE(review): presumably this exists so tests can inject a different
// ProducerImpl -- confirm against the callers.
56 class Factory {
57 public:
58 virtual ~Factory();
// Return a shared pointer to a ProducerImpl. The arguments are the same
// 'maxOutstandingConfirms', 'channel', 'threadPool' and 'eventLoop' that
// the ProducerImpl constructor takes, plus the 'exchange' handle.
// NOTE(review): how 'exchange' is used is not visible in this header.
59 virtual bsl::shared_ptr<ProducerImpl>
60 create(uint16_t maxOutstandingConfirms,
61 const rmqt::ExchangeHandle& exchange,
62 const bsl::shared_ptr<rmqamqp::SendChannel>& channel,
63 bdlmt::ThreadPool& threadPool,
64 rmqio::EventLoop& eventLoop) const;
65 };
66
67 // CREATORS
// Create a producer implementation using the specified 'channel',
// 'threadPool' and 'eventLoop'. 'threadPool' and 'eventLoop' are taken by
// non-const reference.
// NOTE(review): presumably both must outlive this object, and
// 'maxOutstandingConfirms' is forwarded to SharedState to size its
// 'outstandingMessagesCap' semaphore -- confirm in the .cpp.
68 ProducerImpl(uint16_t maxOutstandingConfirms,
69 const bsl::shared_ptr<rmqamqp::SendChannel>& channel,
70 bdlmt::ThreadPool& threadPool,
71 rmqio::EventLoop& eventLoop);
72
// Destroy this object. Marked as an override of the base-class destructor.
73 ~ProducerImpl() BSLS_KEYWORD_OVERRIDE;
74
// Register the specified message 'transformer' with this producer.
// NOTE(review): presumably it is appended to 'd_transformers' and applied
// to later sends via 'applyTransformations' -- confirm in the .cpp.
75 void
76 addTransformer(const bsl::shared_ptr<rmqp::MessageTransformer>& transformer)
77 BSLS_KEYWORD_OVERRIDE;
78
// Send the specified 'message' with the specified 'routingKey' to the
// exchange targeted by the producer. 'confirmCallback' is invoked on
// receipt of the message confirmation. Returns a 'SendStatus'.
// NOTE(review): 'timeout' presumably bounds how long the call may wait
// when the outstanding-confirm limit is reached -- confirm in the .cpp.
79 SendStatus send(const rmqt::Message& message,
80 const bsl::string& routingKey,
81 const rmqp::Producer::ConfirmationCallback& confirmCallback,
82 const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;
83
// Overload of 'send' that also takes 'mandatoryFlag'. The other
// parameters are the same as in the four-argument overload.
// NOTE(review): 'mandatoryFlag' presumably maps to the AMQP 'mandatory'
// publish flag -- confirm.
84 SendStatus send(const rmqt::Message& message,
85 const bsl::string& routingKey,
86 rmqt::Mandatory::Value mandatoryFlag,
87 const rmqp::Producer::ConfirmationCallback& confirmCallback,
88 const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE;
89
// Send the specified 'message' with the specified 'routingKey' to the
// exchange targeted by the producer, returning a 'SendStatus'. Unlike
// 'send', this method takes no timeout argument.
// NOTE(review): presumably it returns immediately instead of waiting when
// the outstanding-confirm limit is reached -- confirm in the .cpp.
91 trySend(const rmqt::Message& message,
92 const bsl::string& routingKey,
93 const rmqp::Producer::ConfirmationCallback& confirmCallback)
94 BSLS_KEYWORD_OVERRIDE;
95
// Update topology using the specified 'topologyUpdate'. Returns an
// 'rmqt::Future<>'.
// NOTE(review): the future presumably resolves once the update has been
// applied -- confirm.
96 rmqt::Future<> updateTopologyAsync(
97 const rmqt::TopologyUpdate& topologyUpdate) BSLS_KEYWORD_OVERRIDE;
98
// Wait for all outstanding publisher confirms to arrive. The optional
// 'timeout' defaults to 'bsls::TimeInterval(0)'. Returns an
// 'rmqt::Result<>'.
// NOTE(review): what a zero timeout means (e.g. wait indefinitely) is not
// visible in this header -- confirm in the .cpp.
99 rmqt::Result<>
100 waitForConfirms(const bsls::TimeInterval& timeout = bsls::TimeInterval(0))
101 BSLS_KEYWORD_OVERRIDE;
102
// Map from a 'bdlb::Guid' to a confirmation callback.
// NOTE(review): the guid presumably identifies one outstanding message --
// confirm.
103 typedef bsl::unordered_map<bdlb::Guid, rmqp::Producer::ConfirmationCallback>
104 CallbackMap;
105
106 // State shared with event loop thread
// Plain aggregate of the producer state; ProducerImpl holds it through a
// 'bsl::shared_ptr' ('d_sharedState').
107 struct SharedState {
// Initialize 'isValid' and 'threadPool' from '_isValid' and '_threadPool',
// and construct 'outstandingMessagesCap' from 'maxOutstandingConfirms'.
// 'callbackMap', 'mutex' and 'waitForConfirmsFuture' are
// default-constructed.
108 SharedState(bool _isValid,
109 bdlmt::ThreadPool& _threadPool,
110 uint16_t maxOutstandingConfirms)
111 : callbackMap()
112 , mutex()
113 , isValid(_isValid)
114 , threadPool(_threadPool)
115 , outstandingMessagesCap(maxOutstandingConfirms)
116 , waitForConfirmsFuture()
117 {
118 }
119
120 // Can only be accessed when mutex is held
121 CallbackMap callbackMap;
122
// NOTE(review): only 'callbackMap' is documented as guarded by 'mutex';
// whether the members below are guarded too is not stated -- confirm.
123 bslmt::Mutex mutex;
124 bool isValid;
// Held by reference, not owned by this struct.
125 bdlmt::ThreadPool& threadPool;
// Timed semaphore constructed from 'maxOutstandingConfirms'.
// NOTE(review): presumably limits the number of unconfirmed messages.
126 bslmt::TimedSemaphore outstandingMessagesCap;
// Empty after construction.
// NOTE(review): presumably set while a 'waitForConfirms' call is pending.
127 bsl::optional<rmqt::Future<>::Pair> waitForConfirmsFuture;
128 };
129
130 private:
// NOT IMPLEMENTED: copy construction and copy assignment are declared with
// BSLS_KEYWORD_DELETED, so objects of this class cannot be copied.
131 ProducerImpl(const ProducerImpl&) BSLS_KEYWORD_DELETED;
132 ProducerImpl& operator=(const ProducerImpl&) BSLS_KEYWORD_DELETED;
133
// Private helper taking a 'guid' and a 'confirmCallback'.
// NOTE(review): presumably inserts the pair into
// 'SharedState::callbackMap' and returns 'false' if 'guid' is already
// registered -- confirm in the .cpp.
134 bool registerUniqueCallback(
135 const bdlb::Guid& guid,
136 const rmqp::Producer::ConfirmationCallback& confirmCallback);
137
// Private helper taking the same 'message', 'routingKey', 'mandatoryFlag'
// and 'confirmCallback' as the public 'send' overload, without a timeout.
// NOTE(review): presumably performs the publish once capacity has been
// acquired -- confirm in the .cpp.
139 doSend(const rmqt::Message& message,
140 const bsl::string& routingKey,
141 rmqt::Mandatory::Value mandatoryFlag,
142 const rmqp::Producer::ConfirmationCallback& confirmCallback);
143
// Private helper with the same parameter list as the five-argument public
// 'send' overload.
// NOTE(review): presumably the shared implementation behind both 'send'
// overloads -- confirm in the .cpp.
145 sendImpl(const rmqt::Message& message,
146 const bsl::string& routingKey,
147 rmqt::Mandatory::Value mandatoryFlag,
148 const rmqp::Producer::ConfirmationCallback& confirmCallback,
149 const bsls::TimeInterval& timeout);
150
// Private helper taking a modifiable 'dstMessage' and a read-only
// 'srcMessage'.
// NOTE(review): presumably runs the registered 'd_transformers' over
// 'srcMessage', writes the result to 'dstMessage' and returns whether
// that succeeded -- confirm in the .cpp.
151 bool applyTransformations(rmqt::Message& dstMessage,
152 const rmqt::Message& srcMessage);
153
// Event loop, held by reference (not owned).
154 rmqio::EventLoop& d_eventLoop;
155
// AMQP send channel, held through a shared pointer.
156 bsl::shared_ptr<rmqamqp::SendChannel> d_channel;
157
// Shared 'SharedState' instance (see the struct declared in this class).
158 bsl::shared_ptr<SharedState> d_sharedState;
159
// Message transformers held by this producer ('addTransformer' accepts the
// same element type).
160 bsl::vector<bsl::shared_ptr<rmqp::MessageTransformer> > d_transformers;
161
162}; // class ProducerImpl
163
164} // namespace rmqa
165} // namespace BloombergLP
166
167#endif // ! INCLUDED_RMQA_PRODUCERIMPL
Definition: rmqa_producerimpl.h:56
Definition: rmqa_producerimpl.h:54
SendStatus send(const rmqt::Message &message, const bsl::string &routingKey, const rmqp::Producer::ConfirmationCallback &confirmCallback, const bsls::TimeInterval &timeout) BSLS_KEYWORD_OVERRIDE
Send a message with the given routingKey to the exchange targeted by the producer.
Definition: rmqa_producerimpl.cpp:175
SendStatus trySend(const rmqt::Message &message, const bsl::string &routingKey, const rmqp::Producer::ConfirmationCallback &confirmCallback) BSLS_KEYWORD_OVERRIDE
Send a message with the given routingKey to the exchange targeted by the producer.
Definition: rmqa_producerimpl.cpp:275
rmqt::Future updateTopologyAsync(const rmqt::TopologyUpdate &topologyUpdate) BSLS_KEYWORD_OVERRIDE
Updates topology.
Definition: rmqa_producerimpl.cpp:293
rmqt::Result waitForConfirms(const bsls::TimeInterval &timeout=bsls::TimeInterval(0)) BSLS_KEYWORD_OVERRIDE
Wait for all outstanding publisher confirms to arrive.
Definition: rmqa_producerimpl.cpp:321
Provide a RabbitMQ Producer API for publishing to a specific Exchange.
Definition: rmqa_producer.h:43
Definition: rmqa_topologyupdate.h:27
RabbitMQ Producer API for publishing to a specific Exchange.
Definition: rmqp_producer.h:46
bsl::function< void(const rmqt::Message &, const bsl::string &routingKey, const rmqt::ConfirmResponse &)> ConfirmationCallback
Invoked on receipt of message confirmation.
Definition: rmqp_producer.h:71
SendStatus
Possible results of rmqp::Producer::send.
Definition: rmqp_producer.h:50
An AMQP content message.
Definition: rmqt_message.h:43
Definition: rmqa_producerimpl.h:107