mqtt_cpp
inflight_message.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2020
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_BROKER_INFLIGHT_MESSAGE_HPP)
8 #define MQTT_BROKER_INFLIGHT_MESSAGE_HPP
9 
10 #include <mqtt/config.hpp>
11 
12 #include <chrono>
13 
14 #include <boost/asio/steady_timer.hpp>
15 
17 #include <mqtt/message_variant.hpp>
18 #include <mqtt/any.hpp>
19 #include <mqtt/visitor_util.hpp>
20 
22 #include <mqtt/broker/tags.hpp>
24 
26 
27 class inflight_messages;
28 
30 public:
33  any life_keeper,
34  std::shared_ptr<as::steady_timer> tim_message_expiry)
35  :msg_ { force_move(msg) },
36  life_keeper_ { force_move(life_keeper) },
37  tim_message_expiry_ { force_move(tim_message_expiry) }
38  {}
39 
41  return
44  [](auto const& m) {
45  return m.packet_id();
46  }
47  ),
48  msg_
49  );
50  }
51 
52  void send(endpoint_t& ep) const { // Sends this stored message through `ep`; when an expiry timer is attached, a v5 publish is sent with a refreshed message expiry interval.
53  optional<store_message_variant> msg_opt; // Filled only when an updated copy of the message is produced below.
54  if (tim_message_expiry_) { // NOTE(review): listing lines 55-56 are absent from this extract; presumably they open the visit(make_lambda_visitor(...)) call that lines 73-75 close. Confirm against the original header.
57  [&](v5::basic_publish_message<sizeof(packet_id_t)> const& m) { // Handler for v5 publish messages.
58  auto updated_msg = m; // Work on a copy: `m` is const, so the stored msg_ is left untouched.
59  auto d = // Whole seconds from now until the timer's expiry.
60  std::chrono::duration_cast<std::chrono::seconds>(
61  tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
62  ).count();
63  if (d < 0) d = 0; // Expiry time already passed: clamp to zero before the unsigned cast.
64  updated_msg.update_prop( // Hands the remaining lifetime to the copy as a message_expiry_interval property.
65  v5::property::message_expiry_interval(
66  static_cast<uint32_t>(d)
67  )
68  );
69  msg_opt.emplace(force_move(updated_msg));
70  },
71  [](auto const&) { // Every other message type: nothing to update.
72  }
73  ),
74  msg_
75  );
76  }
77  // packet_id_exhausted never happens here because the inflight message already
78  // had its packet_id allocated during the previous connection.
79  // The packet_id is registered inside send_store_message().
80  ep.send_store_message(msg_opt ? msg_opt.value() : msg_, life_keeper_); // Sends the updated copy when one was made, otherwise the stored message.
81  }
82 
83 private:
84  friend class inflight_messages;
85 
87  any life_keeper_;
88  std::shared_ptr<as::steady_timer> tim_message_expiry_;
89 };
90 
92 public:
93  void insert( // Stores a new in-flight message built from the given parts. Listing line 94 is absent from this extract; the signature index of this page gives the first parameter as `store_message_variant msg`.
95  any life_keeper,
96  std::shared_ptr<as::steady_timer> tim_message_expiry
97  ) {
98  messages_.emplace_back( // Constructs the element in place; each argument is handed over via force_move.
99  force_move(msg),
100  force_move(life_keeper),
101  force_move(tim_message_expiry)
102  );
103  } // NOTE(review): the result of emplace_back is not inspected; with the unique tag_pid index a duplicate packet_id would presumably be rejected silently. Confirm callers never insert one.
104 
106  for (auto const& ifm : messages_) {
107  ifm.send(ep);
108  }
109  }
110 
111  void clear() { // Removes every stored in-flight message.
112  messages_.clear();
113  }
114 
115  template <typename Tag> // Tag selects one of the container's indices; the tags named in this listing are tag_seq, tag_pid and tag_tim.
116  decltype(auto) get() { // Returns whatever messages_.get<Tag>() yields; decltype(auto) keeps its exact type, including reference-ness.
117  return messages_.get<Tag>();
118  }
119 
120  template <typename Tag> // Tag selects one of the container's indices; the tags named in this listing are tag_seq, tag_pid and tag_tim.
121  decltype(auto) get() const { // Const overload: forwards to messages_.get<Tag>() on the const container.
122  return messages_.get<Tag>();
123  }
124 
125 private:
126  using mi_inflight_message = mi::multi_index_container< // Container type for the in-flight messages. NOTE(review): listing line 127 (the element type argument) is absent from this extract; presumably inflight_message, the class named by the key extractors below.
128  mi::indexed_by<
129  mi::sequenced< // Index tagged tag_seq: sequenced access.
130  mi::tag<tag_seq>
131  >,
132  mi::ordered_unique< // Index tagged tag_pid: ordered and unique, keyed by the result of packet_id().
133  mi::tag<tag_pid>,
134  BOOST_MULTI_INDEX_CONST_MEM_FUN(inflight_message, packet_id_t, packet_id)
135  >,
136  mi::ordered_non_unique< // Index tagged tag_tim: ordered, duplicates allowed, keyed by the tim_message_expiry_ shared_ptr member.
137  mi::tag<tag_tim>,
138  BOOST_MULTI_INDEX_MEMBER(inflight_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
139  >
140  >
141  >;
142 
143  mi_inflight_message messages_; // The stored in-flight messages, reachable through the three indices above.
144 };
145 
147 
148 #endif // MQTT_BROKER_INFLIGHT_MESSAGE_HPP
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: inflight_message.hpp:29
void send(endpoint_t &ep) const
Definition: inflight_message.hpp:52
inflight_message(store_message_variant msg, any life_keeper, std::shared_ptr< as::steady_timer > tim_message_expiry)
Definition: inflight_message.hpp:31
packet_id_t packet_id() const
Definition: inflight_message.hpp:40
Definition: inflight_message.hpp:91
void insert(store_message_variant msg, any life_keeper, std::shared_ptr< as::steady_timer > tim_message_expiry)
Definition: inflight_message.hpp:93
void clear()
Definition: inflight_message.hpp:111
decltype(auto) get()
Definition: inflight_message.hpp:116
void send_all_messages(endpoint_t &ep)
Definition: inflight_message.hpp:105
server<>::endpoint_t endpoint_t
Definition: common_type.hpp:17
endpoint_t::packet_id_t packet_id_t
Definition: common_type.hpp:20
constexpr decltype(auto) visit(Visitor &&vis, Variants &&... vars)
Definition: variant.hpp:60
basic_store_message_variant< 2 > store_message_variant
Definition: message_variant.hpp:119
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
lambda_visitor< Lambdas... > make_lambda_visitor(Lambdas &&... lambdas)
Definition: visitor_util.hpp:37