mqtt_cpp
offline_message.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2020
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_BROKER_OFFLINE_MESSAGE_HPP)
8 #define MQTT_BROKER_OFFLINE_MESSAGE_HPP
9 
10 #include <mqtt/config.hpp>
11 
12 #include <boost/asio/steady_timer.hpp>
13 #include <boost/multi_index_container.hpp>
14 #include <boost/multi_index/ordered_index.hpp>
15 #include <boost/multi_index/sequenced_index.hpp>
16 #include <boost/multi_index/member.hpp>
17 
18 #include <mqtt/buffer.hpp>
20 #include <mqtt/publish.hpp>
21 
24 #include <mqtt/broker/tags.hpp>
26 
28 
29 namespace mi = boost::multi_index;
30 
31 class offline_messages;
32 
33 // The offline_message structure holds messages that have been published on a
34 // topic that a not-currently-connected client is subscribed to.
35 // When a new connection is made with the client id for this saved data,
36 // these messages will be published to that client, and only that client.
38 public:
40  buffer topic,
41  buffer contents,
42  publish_options pubopts,
43  v5::properties props,
44  std::shared_ptr<as::steady_timer> tim_message_expiry)
45  : topic_(force_move(topic)),
46  contents_(force_move(contents)),
47  pubopts_(pubopts),
48  props_(force_move(props)),
49  tim_message_expiry_(force_move(tim_message_expiry))
50  { }
51 
52  bool send(endpoint_t& ep) const {
53  auto props = props_;
54  if (tim_message_expiry_) {
55  auto d =
56  std::chrono::duration_cast<std::chrono::seconds>(
57  tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
58  ).count();
59  if (d < 0) d = 0;
60  set_property<v5::property::message_expiry_interval>(
61  props,
62  v5::property::message_expiry_interval(
63  static_cast<uint32_t>(d)
64  )
65  );
66  }
67  auto qos_value = pubopts_.get_qos();
68  if (qos_value == qos::at_least_once ||
69  qos_value == qos::exactly_once) {
70  if (auto pid = ep.acquire_unique_packet_id_no_except()) {
71  ep.publish(pid.value(), topic_, contents_, pubopts_, force_move(props));
72  return true;
73  }
74  }
75  else {
76  ep.publish(topic_, contents_, pubopts_, force_move(props));
77  return true;
78  }
79  return false;
80  }
81 
82 private:
83  friend class offline_messages;
84 
85  buffer topic_;
86  buffer contents_;
87  publish_options pubopts_;
88  v5::properties props_;
89  std::shared_ptr<as::steady_timer> tim_message_expiry_;
90 };
91 
93 public:
94  void send_all(endpoint_t& ep) {
95  auto& idx = messages_.get<tag_seq>();
96  while (!idx.empty()) {
97  if (idx.front().send(ep)) {
98  idx.pop_front();
99  }
100  else {
101  break;
102  }
103  }
104  }
105 
107  auto& idx = messages_.get<tag_seq>();
108  while (!idx.empty()) {
109  if (idx.front().send(ep)) {
110  // if packet_id is consumed, then finish
111  idx.pop_front();
112  }
113  else {
114  break;
115  }
116  }
117  }
118 
119  void clear() {
120  messages_.clear();
121  }
122 
123  bool empty() const {
124  return messages_.empty();
125  }
126 
127  void push_back(
128  as::io_context& ioc,
129  buffer pub_topic,
130  buffer contents,
131  publish_options pubopts,
132  v5::properties props) {
133  optional<std::chrono::steady_clock::duration> message_expiry_interval;
134 
135  auto v = get_property<v5::property::message_expiry_interval>(props);
136  if (v) {
137  message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
138  }
139 
140  std::shared_ptr<as::steady_timer> tim_message_expiry;
141  if (message_expiry_interval) {
142  tim_message_expiry = std::make_shared<as::steady_timer>(ioc, message_expiry_interval.value());
143  tim_message_expiry->async_wait(
144  [this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](error_code ec) mutable {
145  if (auto sp = wp.lock()) {
146  if (!ec) {
147  messages_.get<tag_tim>().erase(sp);
148  }
149  }
150  }
151  );
152  }
153 
154  auto& seq_idx = messages_.get<tag_seq>();
155  seq_idx.emplace_back(
156  force_move(pub_topic),
157  force_move(contents),
158  pubopts,
159  force_move(props),
160  force_move(tim_message_expiry)
161  );
162  }
163 
164 private:
165  using mi_offline_message = mi::multi_index_container<
167  mi::indexed_by<
168  mi::sequenced<
169  mi::tag<tag_seq>
170  >,
171  mi::ordered_non_unique<
172  mi::tag<tag_tim>,
173  BOOST_MULTI_INDEX_MEMBER(offline_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
174  >
175  >
176  >;
177 
178  mi_offline_message messages_;
179 };
180 
182 
183 #endif // MQTT_BROKER_OFFLINE_MESSAGE_HPP
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: offline_message.hpp:37
offline_message(buffer topic, buffer contents, publish_options pubopts, v5::properties props, std::shared_ptr< as::steady_timer > tim_message_expiry)
Definition: offline_message.hpp:39
bool send(endpoint_t &ep) const
Definition: offline_message.hpp:52
Definition: offline_message.hpp:92
void clear()
Definition: offline_message.hpp:119
void send_by_packet_id_release(endpoint_t &ep)
Definition: offline_message.hpp:106
void send_all(endpoint_t &ep)
Definition: offline_message.hpp:94
void push_back(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: offline_message.hpp:127
bool empty() const
Definition: offline_message.hpp:123
server<>::endpoint_t endpoint_t
Definition: common_type.hpp:17
std::vector< property_variant > properties
Definition: property_variant.hpp:51
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
Definition: tags.hpp:16