RMQ - RabbitMQ C++ Library
rmqt_future.h
1// Copyright 2020-2023 Bloomberg Finance L.P.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#ifndef INCLUDED_RMQT_FUTURE
17#define INCLUDED_RMQT_FUTURE
18
19#include <rmqt_result.h>
20
21#include <ball_log.h>
22#include <bdlf_bind.h>
23#include <bslmt_condition.h>
24#include <bslmt_lockguard.h>
25#include <bslmt_mutex.h>
26#include <bsls_assert.h>
27#include <bsls_systemtime.h>
28
29#include <bsl_exception.h>
30#include <bsl_functional.h>
31#include <bsl_list.h>
32#include <bsl_memory.h>
33#include <bsl_utility.h>
34
35//@PURPOSE: An async-style Future/Promise object
36//
37//@CLASSES:
38// rmqt::Future: Represents an action which may or may not have completed yet.
39//
40// rmqt::Future<T>::Impl: The thread-safe control block for Future<T>. These
41// are shared between copies of `Future` objects.
42//
43// rmqt::FutureUtil: 'helper' functionality for working with Futures which is
44// used in multiple places
45
46namespace BloombergLP {
47namespace rmqt {
48
103
104template <typename T>
105class Future;
106
107template <typename T = void>
108class Future {
109 public:
111 typedef bsl::function<void(const Result<T>& result)> Maker;
112
114 typedef bsl::pair<typename Future<T>::Maker, Future<T> > Pair;
115
120 static Pair make();
121
129 static Pair make(const bsl::function<void()>& cancelFunction);
130
132 ~Future();
133
141
146
153 Result<T> timedWaitResult(const bsls::TimeInterval& absoluteTime);
154
161 Result<T> waitResult(const bsls::TimeInterval& relativeTimeout);
162
164 explicit Future(const Result<T>& result);
165
170 Future(const Future& future);
171
176 Future& operator=(const Future& future);
177
178 template <typename newT>
180 then(const bsl::function<Result<newT>(const Result<T>&)>& converter);
181
182 template <typename newT>
184 thenFuture(const bsl::function<Future<newT>(const Result<T>&)>& nextFuture);
185
186 private:
187 class Impl;
188
189 private:
190#ifdef __xlC__
191 // IBM compilers don't feel the above friend declaration is good enough
192 // to allow permission to a future of a different T's
193 // impl, so this is necessary. But also doesn't sit
194 // well with pretty much any other compiler
195 template <typename newT>
196 friend class typename Future<newT>::Impl;
197#else
198 // Required for Future<T>::Impl::addChain
199 template <typename newT>
200 friend class Future;
201#endif
202
203 Future();
204
205 bsl::shared_ptr<Impl> d_impl;
206}; // class Future
207
209
210 public:
213 template <typename A, typename B>
214 static Result<B> convertViaManagedPtr(const Result<A>& a);
215
216 template <typename A, typename B>
217 static bsl::function<Future<B>(const Result<A>&)> propagateError(
218 const bsl::function<Future<B>(const bsl::shared_ptr<A>&)>& fn)
219 {
220 return bdlf::BindUtil::bind(
221 &propagateErrorImplWithItem<A, B>, fn, bdlf::PlaceHolders::_1);
222 }
223
224 template <typename T>
225 static bsl::function<Future<T>(const Result<void>&)>
226 propagateError(const bsl::function<Future<T>()>& fn)
227 {
228
229 return bdlf::BindUtil::bind(
230 &propagateErrorImpl<T>, fn, bdlf::PlaceHolders::_1);
231 }
232
233 template <typename T>
234 static bsl::function<rmqt::Result<T>()>
235 resultWrapper(const bsl::function<T()>& tProducer)
236 {
237 return bdlf::BindUtil::bind(&resultWrapperImpl<T>, tProducer);
238 }
239
240 template <typename T>
241 static void processResult(const typename Future<T>::Maker& maker,
242 const bsl::function<Result<T>()>& resultProducer)
243 {
244 maker(resultProducer());
245 }
246
247 template <typename T>
248 static bsl::function<Result<T>(const Result<T>&)>
249 makerWrapper(const typename Future<T>::Maker& maker)
250 {
251 return bdlf::BindUtil::bind(
252 &makerWrapperImpl<T>, maker, bdlf::PlaceHolders::_1);
253 }
254
255 template <typename T>
256 static bsl::function<Future<T>(const Result<Future<T> >&)> unravelFuture()
257 {
258 bsl::function<Future<T>(const bsl::shared_ptr<Future<T> >&)> convert =
259 bdlf::BindUtil::bind(&unravelImpl<T>, bdlf::PlaceHolders::_1);
260 return propagateError<Future<T> >(convert);
261 }
262
263 template <typename T>
264 static Future<T> flatten(Future<Future<T> > futurefuture)
265 {
266 return futurefuture.template thenFuture<T>(
267 FutureUtil::unravelFuture<T>());
268 }
269
270 private:
271 template <typename T>
272 static Future<T> propagateErrorImpl(const bsl::function<Future<T>()>& t,
273 const Result<void>& r)
274 {
275 if (!r) {
276 typename Future<T>::Pair fail = Future<T>::make();
277 fail.first(Result<T>(r.error(), r.returnCode()));
278 return fail.second;
279 }
280 return t();
281 }
282
283 template <typename A, typename B>
284 static Future<B> propagateErrorImplWithItem(
285 const bsl::function<Future<B>(const bsl::shared_ptr<A>&)>& b,
286 const Result<A>& a)
287 {
288 if (!a) {
289 typename Future<B>::Pair fail = Future<B>::make();
290 fail.first(Result<B>(a.error(), a.returnCode()));
291 return fail.second;
292 }
293 return b(a.value());
294 }
295
296 template <typename T>
297 static Result<T> makerWrapperImpl(const typename Future<T>::Maker& maker,
298 const Result<T>& result)
299 {
300 maker(result);
301 return result;
302 }
303
304 template <typename T>
305 static Future<T> unravelImpl(const bsl::shared_ptr<Future<T> >& t)
306 {
307 return *t;
308 }
309
310 template <typename T>
311 static Result<T> resultWrapperImpl(const bsl::function<T()>& tProducer);
312};
313
314template <typename T>
315bsl::pair<typename Future<T>::Maker, Future<T> > Future<T>::make()
316{
317 Future<T> f;
318 f.d_impl = bsl::make_shared<Impl>(bsl::function<void()>());
319
320 return bsl::make_pair(f.d_impl->generateMaker(), f);
321}
322
323template <typename T>
324bsl::pair<typename Future<T>::Maker, Future<T> >
325Future<T>::make(const bsl::function<void()>& cancelFunction)
326{
327 Future<T> f;
328 f.d_impl = bsl::make_shared<Impl>(cancelFunction);
329
330 return bsl::make_pair(f.d_impl->generateMaker(), f);
331}
332
333template <typename T>
335{
336}
337
338template <typename T>
340{
341 return waitResult(bsls::TimeInterval(0));
342}
343
344template <typename T>
346{
347 d_impl->blockUntilMade();
348 return d_impl->result();
349}
350
351template <typename T>
352Result<T> Future<T>::waitResult(const bsls::TimeInterval& relativeTimeout)
353{
354 bsls::TimeInterval timeoutTime = bsls::SystemTime::nowRealtimeClock();
355 timeoutTime += relativeTimeout;
356 return timedWaitResult(timeoutTime);
357}
358
359template <typename T>
360Result<T> Future<T>::timedWaitResult(const bsls::TimeInterval& absoluteTime)
361{
362 if (d_impl->timedWaitUntilMade(absoluteTime)) {
363 return d_impl->result();
364 }
365 return Result<T>("TIMED OUT", TIMEOUT);
366}
367
368template <typename T>
370: d_impl()
371{
372 Pair p = make();
373 d_impl = p.second.d_impl;
374 p.first(result);
375}
376
377template <typename T>
379: d_impl()
380{
381}
382
383template <typename T>
385: d_impl(future.d_impl)
386{
387}
388
389template <typename T>
391{
392 this->d_impl = future.d_impl;
393 return *this;
394}
395
396template <typename T>
397template <typename newT>
399Future<T>::then(const bsl::function<Result<newT>(const Result<T>&)>& converter)
400{
401 return d_impl->addChain(converter);
402}
403
404template <typename T>
405template <typename newT>
406Future<newT> Future<T>::thenFuture(
407 const bsl::function<Future<newT>(const Result<T>&)>& nextFutureMaker)
408{
409 return d_impl->addChain(nextFutureMaker);
410}
411
412template <typename T>
413Result<T> FutureUtil::resultWrapperImpl(const bsl::function<T()>& tProducer)
414{
415 return rmqt::Result<T>(bsl::make_shared<T>(tProducer()));
416}
417
418template <>
419Result<void>
420FutureUtil::resultWrapperImpl<void>(const bsl::function<void()>& tProducer);
421
422template <typename A, typename B>
424{
425 if (a) {
426 bslma::ManagedPtr<A> am(a.value().managedPtr());
427 return Result<B>(bsl::shared_ptr<B>(new B(am)));
428 }
429 return Result<B>(a.error(), a.returnCode());
430}
431
432template <typename T>
433class Future<T>::Impl
434: public bsl::enable_shared_from_this<typename Future<T>::Impl> {
435 private:
436 static void made(const bsl::weak_ptr<typename Future<T>::Impl>& weakSelf,
437 const Result<T>& item)
438 {
439 bsl::shared_ptr<Future<T>::Impl> self = weakSelf.lock();
440
441 if (!self) {
442 BALL_LOG_SET_CATEGORY("RMQT.FUTURE.IMPL");
443 BALL_LOG_DEBUG << "Resolved " << bsl::string(!item ? "un" : "")
444 << "successful future<" << typeid(T).name()
445 << "> after cancel";
446 // This is a safe race condition - e.g. if a user stops caring about
447 // a future but the future resolves anyway in the background, we
448 // just drop the result.
449 return;
450 }
451
452 bslmt::LockGuard<bslmt::Mutex> guard(&self->d_mutex);
453 self->d_result = item;
454 self->d_done = true;
455 self->notifyChain(&guard);
456 self->d_condition.broadcast(); // there could be more than one waiter
457 }
458
459 template <typename newT>
460 void static converter(
461 const typename Future<newT>::Maker& maker,
462 const bsl::function<Result<newT>(const Result<T>&)>& newTConverter,
463 const Result<T>& result)
464 {
465 maker(newTConverter(result));
466 }
467
475 template <typename newT>
476 static void futureConverter(
477 const bsl::function<Future<newT>(const Result<T>&)>& cGenerator,
478 const typename Future<newT>::Maker& bMaker,
479 const bsl::weak_ptr<typename Future<newT>::Impl>& wpBImpl,
480 const Result<T>& aResult)
481 {
482 bsl::shared_ptr<typename Future<newT>::Impl> bImpl = wpBImpl.lock();
483 if (bImpl) {
484 Future<newT> cFut = cGenerator(aResult);
485
486 Future<newT> dFut =
487 cFut.then(FutureUtil::makerWrapper<newT>(bMaker));
488
489 bImpl->updateCancel(dFut.d_impl->getLifetimeExtension());
490 }
491 }
492
493 static void
494 extendChainParentLifetime(const bsl::shared_ptr<typename Future<T>::Impl>&)
495 {
496 // This function exists only as a bind target so that a Future
497 // created in .then() can hold onto the parent Future to keep it
498 // alive as long as itself.
499
500 // E.g.
501
502 // rmqt::Future<bsl::string> asyncGetMultiplyAsString(int a, int b)
503 // {
504 // rmqt::Future<int> x = asyncMultiply(a, b);
505 // rmqt::Future<bsl::string> y =
506 // x.then<bsl::string>(&convertIntToString);
507 //
508 // return y;
509 // }
510
511 // In the above example, `x` will go out of scope at the end of the
512 // function, but it's successor `y` is still alive.
513
514 // Since the type of `y` does not know about `x`, but it needs to
515 // extend it's lifetime (call the `x` destructor when `y`
516 // destructs), we need to hold a type-erased form of `x`
517
518 // Note: if `y` then goes out of scope, the whole future chain
519 // should be destructed
520 }
521
522 public:
523 Impl(const bsl::function<void()>& cancelFunc)
524 : d_mutex()
525 , d_condition()
526 , d_result("Null")
527 , d_chain()
528 , d_done(false)
529 , d_cancelFunc(cancelFunc)
530 {
531 }
532
533 ~Impl()
534 {
535 BALL_LOG_SET_CATEGORY("RMQT.FUTURE.IMPL");
536 try {
537 bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
538 if (!d_done && d_cancelFunc) {
539 d_cancelFunc();
540 }
541 }
542 catch (bsl::exception& e) {
543 BALL_LOG_ERROR << "Caught exception in future<" << typeid(T).name()
544 << "> dtor: " << e.what();
545 }
546 catch (...) {
547 BALL_LOG_ERROR << "Caught unknown exception in future<"
548 << typeid(T).name() << "> dtor";
549 }
550 }
551
552 void blockUntilMade()
553 {
554 bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
555 while (!d_done) {
556 d_condition.wait(&d_mutex);
557 }
558 }
559
560 // returns true if there was no timeout
561 bool timedWaitUntilMade(const bsls::TimeInterval& absoluteTime)
562 {
563 bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
564 while (!d_done) {
565 if (bslmt::Condition::e_TIMED_OUT ==
566 d_condition.timedWait(&d_mutex, absoluteTime)) {
567 return false;
568 }
569 }
570 return true;
571 }
572
573 typename Future<T>::Maker generateMaker()
574 {
575 return bdlf::BindUtil::bind(&Future<T>::Impl::made,
577 bdlf::PlaceHolders::_1);
578 }
579
580 const Result<T>& result(bslmt::LockGuard<bslmt::Mutex>* haveGuard = 0)
581 {
582 if (!haveGuard) {
583 bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
584 return resultNoLock();
585 }
586 BSLS_ASSERT(haveGuard->ptr() == &d_mutex);
587 return resultNoLock();
588 }
589
590 template <typename newT>
592 addChain(const bsl::function<Result<newT>(const Result<T>&)>& newTConverter)
593 {
594 typename rmqt::Future<newT>::Pair futurePair(
595 rmqt::Future<newT>::make(getLifetimeExtension()));
596
597 bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
598 if (d_done) {
599 converter(futurePair.first, newTConverter, result(&guard));
600 }
601 else {
602 d_chain.push_back(
603 bdlf::BindUtil::bind(&Future<T>::Impl::converter<newT>,
604 futurePair.first,
605 newTConverter,
606 bdlf::PlaceHolders::_1));
607 }
608
609 return futurePair.second;
610 }
611
612 template <typename newT>
614 addChain(const bsl::function<Future<newT>(const Result<T>&)>& futureMaker)
615 {
616 typename rmqt::Future<newT>::Pair futurePair(
617 rmqt::Future<newT>::make(getLifetimeExtension()));
618
619 bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
620 if (d_done) {
621 futureConverter<newT>(
622 futureMaker,
623 futurePair.first,
624 bsl::weak_ptr<typename rmqt::Future<newT>::Impl>(
625 futurePair.second.d_impl),
626 result(&guard));
627 }
628 else {
629 d_chain.push_back(bdlf::BindUtil::bind(
631 futureMaker,
632 futurePair.first,
633 bsl::weak_ptr<typename rmqt::Future<newT>::Impl>(
634 futurePair.second.d_impl),
635 bdlf::PlaceHolders::_1));
636 }
637 return futurePair.second;
638 }
639
640 bsl::function<void()> getLifetimeExtension()
641 {
642 bsl::function<void(const bsl::shared_ptr<typename Future<T>::Impl>&)>
643 func;
645 return bdlf::BindUtil::bind(func, Future<T>::Impl::shared_from_this());
646 }
647
648 void updateCancel(const bsl::function<void()>& newCanc)
649 {
650 bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);
651 d_cancelFunc = newCanc;
652 }
653
654 private:
655 const Result<T>& resultNoLock()
656 {
657 BSLS_ASSERT(d_done);
658 return d_result;
659 }
660
661 void notifyChain(bslmt::LockGuard<bslmt::Mutex>* haveGuard)
662 {
663 BSLS_ASSERT(haveGuard);
664 typename bsl::list<bsl::function<void(const Result<T>&)> >::iterator it;
665 for (it = d_chain.begin(); it != d_chain.end(); ++it) {
666 (*it)(result(haveGuard));
667 }
668 d_chain.clear();
669 }
670
671 private:
672 bslmt::Mutex d_mutex;
673 bslmt::Condition d_condition;
674 Result<T> d_result;
675 bsl::list<typename rmqt::Future<T>::Maker> d_chain;
676 bool d_done;
677
678 // Called if the future is not completed when Future<T>::Impl destructs
679 // This function is used to extend the lifetime of chained futures.
680 // See extendChainParentLifetime
681 bsl::function<void()> d_cancelFunc;
682};
683
684} // namespace rmqt
685} // namespace BloombergLP
686
687#endif
Definition: rmqt_future.h:208
static Result< B > convertViaManagedPtr(const Result< A > &a)
Definition: rmqt_future.h:423
Definition: rmqt_future.h:434
An async-style Future/Promise object.
Definition: rmqt_future.h:108
Future & operator=(const Future &future)
Assignment (copy) Future.
Definition: rmqt_future.h:390
bsl::function< void(const Result< T > &result)> Maker
Used to resolve a Future<T>. Somewhat equivalent to std::promise.
Definition: rmqt_future.h:111
Result< T > waitResult(const bsls::TimeInterval &relativeTimeout)
Fetch the result, waiting up to relativeTimeout period (from now) if it isn't ready.
Definition: rmqt_future.h:352
static Pair make()
Creates a pair of (Promise, Future).
Definition: rmqt_future.h:315
bsl::pair< typename Future< T >::Maker, Future< T > > Pair
A (Promise, Future) pair. Used to create Futures.
Definition: rmqt_future.h:114
Result< T > blockResult()
Fetch the result, and block if it isn't ready yet.
Definition: rmqt_future.h:345
Result< T > timedWaitResult(const bsls::TimeInterval &absoluteTime)
Fetch the result, waiting up to absoluteTime if it isn't ready.
Definition: rmqt_future.h:360
~Future()
Destructor.
Definition: rmqt_future.h:334
Result< T > tryResult()
Attempt to fetch the result, do not block.
Definition: rmqt_future.h:339
A result of an operation.
Definition: rmqt_result.h:37
const bsl::shared_ptr< T > & value() const
Definition: rmqt_result.h:71
const bsl::string & error() const
Definition: rmqt_result.h:75
int returnCode() const
Definition: rmqt_result.h:79