mqtt_cpp
session_state.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2020
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_BROKER_SESSION_STATE_HPP)
8 #define MQTT_BROKER_SESSION_STATE_HPP
9 
10 #include <mqtt/config.hpp>
11 
12 #include <chrono>
13 
14 #include <boost/asio/io_context.hpp>
15 #include <boost/multi_index_container.hpp>
16 #include <boost/multi_index/ordered_index.hpp>
17 #include <boost/multi_index/member.hpp>
18 
20 
25 #include <mqtt/broker/tags.hpp>
28 
30 
31 namespace as = boost::asio;
32 namespace mi = boost::multi_index;
33 
34 class session_states;
35 
53 struct session_state {
54  // TODO: Currently not fully implemented...
56  as::io_context& ioc,
57  sub_con_map& subs_map,
58  shared_target& shared_targets,
59  con_sp_t con,
61  optional<will> will,
62  optional<std::chrono::steady_clock::duration> will_expiry_interval,
63  optional<std::chrono::steady_clock::duration> session_expiry_interval = nullopt)
64  :ioc_(ioc),
65  subs_map_(subs_map),
66  shared_targets_(shared_targets),
67  con_(force_move(con)),
68  client_id_(force_move(client_id)),
69  session_expiry_interval_(force_move(session_expiry_interval))
70  {
71  update_will(ioc, will, will_expiry_interval);
72  }
73 
75 
77  MQTT_LOG("mqtt_broker", trace)
78  << MQTT_ADD_VALUE(address, this)
79  << "session destroy";
80  clean();
81  }
82 
83  bool online() const {
84  return bool(con_);
85  }
86 
87  template <typename SessionExpireHandler>
88  void become_offline(SessionExpireHandler&& h) {
89  BOOST_ASSERT(con_);
90  con_->for_each_store_with_life_keeper(
91  [this] (store_message_variant msg, any life_keeper) {
92  MQTT_LOG("mqtt_broker", trace)
93  << MQTT_ADD_VALUE(address, this)
94  << "store inflight message";
95 
96  std::shared_ptr<as::steady_timer> tim_message_expiry;
97 
100  [&](v5::basic_publish_message<sizeof(packet_id_t)> const& m) {
101  auto v = get_property<v5::property::message_expiry_interval>(m.props());
102  if (v) {
103  tim_message_expiry =
104  std::make_shared<as::steady_timer>(ioc_, std::chrono::seconds(v.value().val()));
105  tim_message_expiry->async_wait(
106  [this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]
107  (error_code ec) {
108  if (auto sp = wp.lock()) {
109  if (!ec) {
110  erase_inflight_message_by_expiry(sp);
111  }
112  }
113  }
114  );
115  }
116  },
117  [&](auto const&) {}
118  ),
119  msg
120  );
121 
123  force_move(msg),
124  force_move(life_keeper),
125  force_move(tim_message_expiry)
126  );
127  }
128  );
129 
130  reset_con();
131 
132  if (session_expiry_interval_ &&
133  session_expiry_interval_.value() != std::chrono::seconds(session_never_expire)) {
134 
135  MQTT_LOG("mqtt_broker", trace)
136  << MQTT_ADD_VALUE(address, this)
137  << "session expiry interval timer set";
138 
139  tim_session_expiry_ = std::make_shared<as::steady_timer>(ioc_, session_expiry_interval_.value());
140  tim_session_expiry_->async_wait(
141  [this, wp = std::weak_ptr<as::steady_timer>(tim_session_expiry_), h = std::forward<SessionExpireHandler>(h)]
142  (error_code ec) {
143  if (auto sp = wp.lock()) {
144  if (!ec) {
145  MQTT_LOG("mqtt_broker", info)
146  << MQTT_ADD_VALUE(address, this)
147  << "session expired";
148  h(sp);
149  }
150  }
151  }
152  );
153  }
154  }
155 
156  void renew_session_expiry(optional<std::chrono::steady_clock::duration> v) {
157  MQTT_LOG("mqtt_broker", trace)
158  << MQTT_ADD_VALUE(address, this)
159  << "renew_session expiry";
160  session_expiry_interval_ = force_move(v);
161  tim_session_expiry_.reset();
162  }
163 
164  std::shared_ptr<as::steady_timer> const& tim_session_expiry() const {
165  return tim_session_expiry_;
166  }
167 
168  void publish(
169  as::io_context& ioc,
170  buffer pub_topic,
171  buffer contents,
172  publish_options pubopts,
173  v5::properties props) {
174 
175  BOOST_ASSERT(online());
176 
177  if (offline_messages_.empty()) {
178  auto qos_value = pubopts.get_qos();
179  if (qos_value == qos::at_least_once ||
180  qos_value == qos::exactly_once) {
181  if (auto pid = con_->acquire_unique_packet_id_no_except()) {
182  // TODO: Probably this should be switched to async_publish?
183  // Given the async_client / sync_client seperation
184  // and the way they have different function names,
185  // it wouldn't be possible for broker.hpp to be
186  // used with some hypothetical "async_server" in the future.
187  con_->publish(pid.value(), pub_topic, contents, pubopts, props);
188  return;
189  }
190  }
191  else {
192  con_->publish(pub_topic, contents, pubopts, props);
193  return;
194  }
195  }
196 
197  // offline_messages_ is not empty or packet_id_exhausted
198  offline_messages_.push_back(
199  ioc,
200  force_move(pub_topic),
201  force_move(contents),
202  pubopts,
203  force_move(props)
204  );
205  }
206 
207  void deliver(
208  as::io_context& ioc,
209  buffer pub_topic,
210  buffer contents,
211  publish_options pubopts,
212  v5::properties props) {
213 
214  if (online()) {
215  publish(
216  ioc,
217  force_move(pub_topic),
218  force_move(contents),
219  pubopts,
220  force_move(props)
221  );
222  }
223  else {
224  offline_messages_.push_back(
225  ioc,
226  force_move(pub_topic),
227  force_move(contents),
228  pubopts,
229  force_move(props)
230  );
231  }
232  }
233 
234  void clean() {
235  inflight_messages_.clear();
236  offline_messages_.clear();
237  qos2_publish_processed_.clear();
238  shared_targets_.erase(*this);
239  unsubscribe_all();
240  }
241 
242  void exactly_once_start(packet_id_t packet_id) {
243  qos2_publish_processed_.insert(packet_id);
244  }
245 
246  bool exactly_once_processing(packet_id_t packet_id) const {
247  return qos2_publish_processed_.find(packet_id) != qos2_publish_processed_.end();
248  }
249 
251  qos2_publish_processed_.erase(packet_id);
252  }
253 
254  template <typename PublishRetainHandler>
255  void subscribe(
256  buffer share_name,
257  buffer topic_filter,
258  subscribe_options subopts,
259  PublishRetainHandler&& h,
260  optional<std::size_t> sid = nullopt
261  ) {
262  if (!share_name.empty()) {
263  shared_targets_.insert(share_name, topic_filter, *this);
264  }
265  MQTT_LOG("mqtt_broker", trace)
266  << MQTT_ADD_VALUE(address, this)
267  << "subscribe"
268  << " share_name:" << share_name
269  << " topic_filter:" << topic_filter
270  << " qos:" << subopts.get_qos();
271 
272  subscription sub {*this, force_move(share_name), topic_filter, subopts, sid };
273  auto handle_ret = subs_map_.insert_or_assign(
274  force_move(topic_filter),
275  client_id_,
276  force_move(sub)
277  );
278 
279  auto rh = subopts.get_retain_handling();
280 
281  if (handle_ret.second) { // insert
282  MQTT_LOG("mqtt_broker", trace)
283  << MQTT_ADD_VALUE(address, this)
284  << "subscription inserted";
285 
286  handles_.insert(handle_ret.first);
287  if (rh == retain_handling::send ||
288  rh == retain_handling::send_only_new_subscription) {
289  std::forward<PublishRetainHandler>(h)();
290  }
291  }
292  else { // update
293  MQTT_LOG("mqtt_broker", trace)
294  << MQTT_ADD_VALUE(address, this)
295  << "subscription updated";
296 
297  if (rh == retain_handling::send) {
298  std::forward<PublishRetainHandler>(h)();
299  }
300  }
301  }
302 
303  void unsubscribe(buffer const& share_name, buffer const& topic_filter) {
304  if (!share_name.empty()) {
305  shared_targets_.erase(share_name, topic_filter, *this);
306  }
307  auto handle = subs_map_.lookup(topic_filter);
308  if (handle) {
309  handles_.erase(handle.value());
310  subs_map_.erase(handle.value(), client_id_);
311  }
312  }
313 
315  for (auto const& h : handles_) {
316  subs_map_.erase(h, client_id_);
317  }
318  handles_.clear();
319  }
320 
322  as::io_context& ioc,
323  optional<MQTT_NS::will> will,
324  optional<std::chrono::steady_clock::duration> will_expiry_interval) {
325  tim_will_expiry_.reset();
326  will_value_ = force_move(will);
327 
328  if (will_value_ && will_expiry_interval) {
329  tim_will_expiry_ = std::make_shared<as::steady_timer>(ioc, will_expiry_interval.value());
330  tim_will_expiry_->async_wait(
331  [this, wp = std::weak_ptr<as::steady_timer>(tim_will_expiry_)]
332  (error_code ec) {
333  if (auto sp = wp.lock()) {
334  if (!ec) {
335  reset_will();
336  }
337  }
338  }
339  );
340  }
341  }
342 
343  void reset_will() {
344  tim_will_expiry_.reset();
345  will_value_ = nullopt;
346  }
347 
350  any life_keeper,
351  std::shared_ptr<as::steady_timer> tim_message_expiry
352  ) {
353  inflight_messages_.insert(
354  force_move(msg),
355  force_move(life_keeper),
356  force_move(tim_message_expiry)
357  );
358  }
359 
361  BOOST_ASSERT(con_);
362  inflight_messages_.send_all_messages(*con_);
363  }
364 
365  void erase_inflight_message_by_expiry(std::shared_ptr<as::steady_timer> const& sp) {
366  inflight_messages_.get<tag_tim>().erase(sp);
367  }
368 
370  auto& idx = inflight_messages_.get<tag_pid>();
371  idx.erase(packet_id);
372  }
373 
375  BOOST_ASSERT(con_);
376  offline_messages_.send_all(*con_);
377  }
378 
380  BOOST_ASSERT(con_);
381  offline_messages_.send_by_packet_id_release(*con_);
382  }
383 
384  buffer const& client_id() const {
385  return client_id_;
386  }
387 
388  void reset_con() {
389  con_.reset();
390  }
391 
392  void reset_con(con_sp_t con) {
393  con_ = force_move(con);
394  }
395 
396  con_sp_t const& con() const {
397  return con_;
398  }
399 
400  optional<std::chrono::steady_clock::duration> session_expiry_interval() const {
401  return session_expiry_interval_;
402  }
403 
404  optional<MQTT_NS::will>& will() { return will_value_; }
405  optional<MQTT_NS::will> const& will() const { return will_value_; }
406 
407  std::shared_ptr<as::steady_timer>& get_tim_will_expiry() { return tim_will_expiry_; }
408 
409 private:
410  friend class session_states;
411 
412  as::io_context& ioc_;
413  std::shared_ptr<as::steady_timer> tim_will_expiry_;
414  optional<MQTT_NS::will> will_value_;
415 
416  sub_con_map& subs_map_;
417  shared_target& shared_targets_;
418  con_sp_t con_;
419  buffer client_id_;
420 
421  optional<std::chrono::steady_clock::duration> will_delay_;
422  optional<std::chrono::steady_clock::duration> session_expiry_interval_;
423  std::shared_ptr<as::steady_timer> tim_session_expiry_;
424 
425  inflight_messages inflight_messages_;
426  std::set<packet_id_t> qos2_publish_processed_;
427 
428  offline_messages offline_messages_;
429 
430  std::set<sub_con_map::handle> handles_; // to efficient remove
431 };
432 
434 public:
435  template <typename Tag>
436  decltype(auto) get() {
437  return entries_.get<Tag>();
438  }
439 
440  template <typename Tag>
441  decltype(auto) get() const {
442  return entries_.get<Tag>();
443  }
444 
445  void clear() {
446  entries_.clear();
447  }
448 
449 private:
450  // The mi_session_online container holds the relevant data about an active connection with the broker.
451  // It can be queried either with the clientid, or with the shared pointer to the mqtt endpoint object
452  using mi_session_state = mi::multi_index_container<
454  mi::indexed_by<
455  // non is nullable
456  mi::ordered_non_unique<
457  mi::tag<tag_con>,
458  BOOST_MULTI_INDEX_MEMBER(session_state, con_sp_t, con_)
459  >,
460  mi::ordered_unique<
461  mi::tag<tag_cid>,
462  BOOST_MULTI_INDEX_MEMBER(session_state, buffer, client_id_)
463  >,
464  mi::ordered_non_unique<
465  mi::tag<tag_tim>,
466  BOOST_MULTI_INDEX_MEMBER(session_state, std::shared_ptr<as::steady_timer>, tim_session_expiry_)
467  >
468  >
469 >;
470 
471  mi_session_state entries_;
472 };
473 
475 
476 #endif // MQTT_BROKER_SESSION_STATE_HPP
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: inflight_message.hpp:91
Definition: offline_message.hpp:92
Definition: session_state.hpp:433
decltype(auto) get()
Definition: session_state.hpp:436
void clear()
Definition: session_state.hpp:445
Definition: shared_target.hpp:32
endpoint_t::packet_id_t packet_id_t
Definition: common_type.hpp:20
std::shared_ptr< endpoint_t > con_sp_t
Definition: common_type.hpp:18
#define MQTT_LOG(chan, sev)
Definition: log.hpp:135
#define MQTT_ADD_VALUE(name, val)
Definition: log.hpp:136
std::vector< property_variant > properties
Definition: property_variant.hpp:51
constexpr decltype(auto) visit(Visitor &&vis, Variants &&... vars)
Definition: variant.hpp:60
basic_store_message_variant< 2 > store_message_variant
Definition: message_variant.hpp:119
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
lambda_visitor< Lambdas... > make_lambda_visitor(Lambdas &&... lambdas)
Definition: visitor_util.hpp:37
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
Definition: session_state.hpp:53
void erase_inflight_message_by_packet_id(packet_id_t packet_id)
Definition: session_state.hpp:369
void unsubscribe_all()
Definition: session_state.hpp:314
void update_will(as::io_context &ioc, optional< MQTT_NS::will > will, optional< std::chrono::steady_clock::duration > will_expiry_interval)
Definition: session_state.hpp:321
void exactly_once_start(packet_id_t packet_id)
Definition: session_state.hpp:242
void reset_con(con_sp_t con)
Definition: session_state.hpp:392
void deliver(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: session_state.hpp:207
optional< MQTT_NS::will > const & will() const
Definition: session_state.hpp:405
void unsubscribe(buffer const &share_name, buffer const &topic_filter)
Definition: session_state.hpp:303
std::shared_ptr< as::steady_timer > & get_tim_will_expiry()
Definition: session_state.hpp:407
void publish(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: session_state.hpp:168
con_sp_t const & con() const
Definition: session_state.hpp:396
bool exactly_once_processing(packet_id_t packet_id) const
Definition: session_state.hpp:246
void clean()
Definition: session_state.hpp:234
~session_state()
Definition: session_state.hpp:76
void subscribe(buffer share_name, buffer topic_filter, subscribe_options subopts, PublishRetainHandler &&h, optional< std::size_t > sid=nullopt)
Definition: session_state.hpp:255
void become_offline(SessionExpireHandler &&h)
Definition: session_state.hpp:88
optional< std::chrono::steady_clock::duration > session_expiry_interval() const
Definition: session_state.hpp:400
session_state(session_state &&)=default
void send_inflight_messages()
Definition: session_state.hpp:360
void renew_session_expiry(optional< std::chrono::steady_clock::duration > v)
Definition: session_state.hpp:156
void reset_con()
Definition: session_state.hpp:388
void send_all_offline_messages()
Definition: session_state.hpp:374
bool online() const
Definition: session_state.hpp:83
void exactly_once_finish(packet_id_t packet_id)
Definition: session_state.hpp:250
void insert_inflight_message(store_message_variant msg, any life_keeper, std::shared_ptr< as::steady_timer > tim_message_expiry)
Definition: session_state.hpp:348
void erase_inflight_message_by_expiry(std::shared_ptr< as::steady_timer > const &sp)
Definition: session_state.hpp:365
buffer const & client_id() const
Definition: session_state.hpp:384
session_state(as::io_context &ioc, sub_con_map &subs_map, shared_target &shared_targets, con_sp_t con, buffer client_id, optional< will > will, optional< std::chrono::steady_clock::duration > will_expiry_interval, optional< std::chrono::steady_clock::duration > session_expiry_interval=nullopt)
Definition: session_state.hpp:55
void send_offline_messages_by_packet_id_release()
Definition: session_state.hpp:379
optional< MQTT_NS::will > & will()
Definition: session_state.hpp:404
std::shared_ptr< as::steady_timer > const & tim_session_expiry() const
Definition: session_state.hpp:164
void reset_will()
Definition: session_state.hpp:343
Definition: subscription.hpp:20
Definition: tags.hpp:24
Definition: tags.hpp:23