mqtt_cpp
broker.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2017
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_BROKER_BROKER_HPP)
8 #define MQTT_BROKER_BROKER_HPP
9 
10 #include <mqtt/config.hpp>
11 
12 #include <set>
13 
14 #include <boost/lexical_cast.hpp>
15 
17 #include <mqtt/optional.hpp>
18 #include <mqtt/property.hpp>
19 #include <mqtt/visitor_util.hpp>
20 
24 
27 
29 
30 namespace mi = boost::multi_index;
31 namespace as = boost::asio;
32 
33 
34 #if defined(MQTT_STD_STRING_VIEW)
35 #define MQTT_STRING_VIEW_CONSTEXPR constexpr
36 #else // defined(MQTT_STD_STRING_VIEW)
37 #define MQTT_STRING_VIEW_CONSTEXPR
38 #endif // defined(MQTT_STD_STRING_VIEW)
39 
40 
41 // TODO: Technically this function is simply wrong, since it's treating the
42 // topic pattern as if it were an ASCII sequence.
43 // To make this function correct per the standard, it would be necessary
44 // to conduct the search for the wildcard characters using a proper
45 // UTF-8 API to avoid problems of interpreting parts of multi-byte characters
46 // as if they were individual ASCII characters
/**
 * Check whether topic_filter is a syntactically valid MQTT Topic Filter.
 *
 * Rules applied (see
 * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106):
 *  - the filter is at least one character and at most 65,535 bytes long;
 *  - it contains no null character (U+0000);
 *  - a single-level wildcard '+' occupies a whole level, i.e. it is the
 *    first character or follows '/', and it is the last character or is
 *    followed by '/';
 *  - a multi-level wildcard '#' is the very last character and is either
 *    the first character or follows '/'.
 *
 * The filter is examined one byte at a time.
 *
 * @param topic_filter  the candidate Topic Filter
 * @return true if every rule above holds, false otherwise
 */
bool validate_topic_filter(string_view topic_filter) {
    auto const length = topic_filter.size();

    // All Topic Names and Topic Filters MUST be at least one character long
    // and MUST NOT encode to more than 65,535 bytes.
    if ((0 == length) || (length > std::numeric_limits<std::uint16_t>::max())) {
        return false;
    }

    auto const last = length - 1;
    for (string_view::size_type pos = 0; pos != length; ++pos) {
        switch (topic_filter[pos]) {
        case '\0':
            // Topic Names and Topic Filters MUST NOT include the null character.
            return false;

        case '+': {
            // Single-level wildcard: must be delimited by a topic level
            // separator (or the string boundary) on both sides.
            bool const starts_level = (0 == pos) || ('/' == topic_filter[pos - 1]);
            bool const ends_level = (last == pos) || ('/' == topic_filter[pos + 1]);
            if ( ! (starts_level && ends_level)) {
                return false;
            }
        } break;

        case '#': {
            // Multi-level wildcard: must be the absolute last character
            // (which also guarantees there is only one of them) ...
            if (last != pos) {
                return false;
            }
            // ... and, unless it is the whole filter, must directly follow
            // a topic level separator.
            if ((0 != pos) && ('/' != topic_filter[pos - 1])) {
                return false;
            }
        } break;

        default:
            // Any other byte is acceptable at any position.
            break;
        }
    }
    return true;
}
114 
#if defined(MQTT_STD_STRING_VIEW)
// Compile-time checks of validate_topic_filter.
// The following rules come from https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901247
static_assert( ! validate_topic_filter(""), "All Topic Names and Topic Filters MUST be at least one character long");
static_assert(validate_topic_filter("/"), "A Topic Name or Topic Filter consisting only of the ‘/’ character is valid");
static_assert( ! validate_topic_filter(string_view("\0", 1)), "Topic Names and Topic Filters MUST NOT include the null character (Unicode U+0000)");
static_assert(validate_topic_filter(" "), "Topic Names and Topic Filters can include the space character");
static_assert(validate_topic_filter("/////"), "Topic level separators can appear anywhere in a Topic Filter or Topic Name. Adjacent Topic level separators indicate a zero-length topic level");
static_assert(validate_topic_filter("#"), "The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
static_assert(validate_topic_filter("/#"), "The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
static_assert(validate_topic_filter("+/#"), "The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
static_assert( ! validate_topic_filter("+#"), "The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
// "++" contains no multi-level wildcard; it is rejected because neither '+' occupies a whole level.
static_assert( ! validate_topic_filter("++"), "Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
static_assert( ! validate_topic_filter("f#"), "The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
static_assert( ! validate_topic_filter("#/"), "In either case the multi-level wildcard character MUST be the last character specified in the Topic Filter");

static_assert(validate_topic_filter("+"), "The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
static_assert(validate_topic_filter("+/bob/alice/sue"), "The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
static_assert(validate_topic_filter("bob/alice/sue/+"), "The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
static_assert(validate_topic_filter("+/bob/alice/sue/+"), "The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
static_assert(validate_topic_filter("+/bob/+/sue/+"), "The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
static_assert(validate_topic_filter("+/bob/+/sue/#"), "The single-level wildcard can be used at more than one level in the Topic Filter and can be used in conjunction with the multi-level wildcard");
static_assert( ! validate_topic_filter("+a"), "Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
static_assert( ! validate_topic_filter("a+"), "Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
static_assert( ! validate_topic_filter("/a+"), "Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
static_assert( ! validate_topic_filter("a+/"), "Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
static_assert( ! validate_topic_filter("/a+/"), "Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
#endif // defined(MQTT_STD_STRING_VIEW)
142 
145  /*
146  * Confirm the topic name is valid
147  * Use rules from https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901247
148  */
149 
150  // All Topic Names and Topic Filters MUST be at least one character long
151  // Topic Names and Topic Filters are UTF-8 Encoded Strings; they MUST NOT encode to more than 65,535 bytes
152  // The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name
153  // Topic Names and Topic Filters MUST NOT include the null character (Unicode U+0000)
154  return
155  ! topic_name.empty()
156  && (topic_name.size() <= std::numeric_limits<std::uint16_t>::max())
157  && (string_view::npos == topic_name.find_first_of(string_view("\0+#", 3)));
158 }
159 
160 #if defined(MQTT_STD_STRING_VIEW)
161 // The following rules come from https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901247
162 static_assert( ! validate_topic_name(""), "All Topic Names and Topic Filters MUST be at least one character long");
163 static_assert(validate_topic_name("/"), "A Topic Name or Topic Filter consisting only of the ‘/’ character is valid");
164 static_assert( ! validate_topic_name(string_view("\0", 1)), "Topic Names and Topic Filters MUST NOT include the null character (Unicode U+0000)");
165 static_assert(validate_topic_name(" "), "Topic Names and Topic Filters can include the space character");
166 static_assert(validate_topic_name("/////"), "Topic level separators can appear anywhere in a Topic Filter or Topic Name. Adjacent Topic level separators indicate a zero-length topic level");
167 static_assert( ! validate_topic_name("#"), "The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
168 static_assert( ! validate_topic_name("+"), "The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
169 static_assert( ! validate_topic_name("/#"), "The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
170 static_assert( ! validate_topic_name("+/#"), "The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
171 static_assert( ! validate_topic_name("f#"), "The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
172 static_assert( ! validate_topic_name("#/"), "The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
173 #endif // defined(MQTT_STD_STRING_VIEW)
174 
176 bool compare_topic_filter(string_view topic_filter, string_view topic_name) {
177  if ( ! validate_topic_filter(topic_filter)) {
178  BOOST_ASSERT(validate_topic_filter(topic_filter));
179  return false;
180  }
181 
182  if ( ! validate_topic_name(topic_name)) {
183  BOOST_ASSERT(validate_topic_name(topic_name));
184  return false;
185  }
186 
187  // TODO: The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character
188  for (string_view::size_type idx = topic_filter.find_first_of("+#");
189  string_view::npos != idx;
190  idx = topic_filter.find_first_of("+#")) {
191  BOOST_ASSERT(
192  ('+' == topic_filter[idx])
193  || ('#' == topic_filter[idx])
194  );
195 
196  if ('+' == topic_filter[idx]) {
197  // Compare everything up to the first +
198  if (topic_filter.substr(0, idx) == topic_name.substr(0, idx)) {
199  /*
200  * We already know thanks to the topic filter being validated
201  * that the + symbol is directly touching '/'s on both sides
202  * (if not the first or last character), so we don't need to
203  * double check that.
204  *
205  * By simply removing the prefix that we've compared and letting
206  * the loop continue, we get the proper comparison of the '/'s
207  * automatically when the loop continues.
208  */
209  topic_filter.remove_prefix(idx+1);
210  /*
211  * It's a bit more complicated for the incoming topic though
212  * as we need to remove everything up to the next seperator.
213  */
214  topic_name.remove_prefix(topic_name.find('/', idx));
215  }
216  else {
217  return false;
218  }
219  }
220  // multilevel wildcard
221  else {
222  /*
223  * Compare up to where the multilevel wild card is found
224  * and then anything after that matches the wildcard.
225  */
226  return topic_filter.substr(0, idx) == topic_name.substr(0, idx);
227  }
228  }
229 
230  // No + or # found in the remaining topic filter. Just do a string compare.
231  return topic_filter == topic_name;
232 }
233 
234 #if defined(MQTT_STD_STRING_VIEW)
235 static_assert(compare_topic_filter("bob", "bob"), "Topic Names and Topic Filters are case sensitive");
236 static_assert( ! compare_topic_filter("Bob", "bob"), "Topic Names and Topic Filters are case sensitive");
237 static_assert( ! compare_topic_filter("bob", "boB"), "Topic Names and Topic Filters are case sensitive");
238 static_assert( ! compare_topic_filter("/bob", "bob"), "A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
239 static_assert( ! compare_topic_filter("bob/", "bob"), "A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
240 static_assert( ! compare_topic_filter("bob", "/bob"), "A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
241 static_assert( ! compare_topic_filter("bob", "bob/"), "A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
242 static_assert(compare_topic_filter("bob/alice", "bob/alice"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
243 static_assert(compare_topic_filter("bob/alice/sue", "bob/alice/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
244 static_assert(compare_topic_filter("bob//////sue", "bob//////sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
245 static_assert(compare_topic_filter("bob/#", "bob//////sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
246 static_assert( ! compare_topic_filter("bob///#", "bob/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
247 static_assert(compare_topic_filter("bob/+/sue", "bob/alice/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
248 static_assert( ! compare_topic_filter("bob/+/sue", "bob/alice/mary/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
249 static_assert(compare_topic_filter("#", "bob/alice/mary/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
250 static_assert(compare_topic_filter("bob/#", "bob/alice/mary/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
251 static_assert(compare_topic_filter("bob/alice/#", "bob/alice/mary/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
252 static_assert(compare_topic_filter("bob/alice/mary/#", "bob/alice/mary/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
253 static_assert( ! compare_topic_filter("bob/alice/mary/sue/#", "bob/alice/mary/sue"), "Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
254 #endif // defined(MQTT_STD_STRING_VIEW)
255 
256 class broker_t {
257 public:
258  broker_t(as::io_context& ioc)
259  :ioc_(ioc),
260  tim_disconnect_(ioc_)
261  {}
262 
263  // [begin] for test setting
273  void set_disconnect_delay(std::chrono::steady_clock::duration delay) {
274  delay_disconnect_ = force_move(delay);
275  }
276 
283  void set_pingresp(bool b) {
284  pingresp_ = b;
285  }
286  // [end] for test setting
287 
304  void handle_accept(con_sp_t spep) {
305  con_wp_t wp(spep);
306  endpoint_t& ep = *spep;
307 
308  ep.socket().lowest_layer().set_option(as::ip::tcp::no_delay(true));
309  ep.set_auto_pub_response(false);
310  ep.set_topic_alias_maximum(MQTT_NS::topic_alias_max);
311  // Pass spep to keep lifetime.
312  // It makes sure wp.lock() never return nullptr in the handlers below
313  // including close_handler and error_handler.
314  ep.start_session(spep);
315 
316  // set connection (lower than MQTT) level handlers
317  ep.set_close_handler(
318  [this, wp]
319  (){
320  con_sp_t sp = wp.lock();
321  BOOST_ASSERT(sp);
322  close_proc(force_move(sp), true);
323  });
324  ep.set_error_handler(
325  [this, wp]
326  (error_code ec){
327  con_sp_t sp = wp.lock();
328  BOOST_ASSERT(sp);
329  auto ver = sp->get_protocol_version();
330  MQTT_LOG("mqtt_broker", info)
331  << MQTT_ADD_VALUE(address, this)
332  << " error_handler is called. ec:" << ec.message() << " protocol_version:" << ver;
333 
334  auto send_response =
335  [&](auto ec) {
336  if (sp->connected()) {
337  auto rc =
338  [&] () -> MQTT_NS::optional<v5::disconnect_reason_code> {
339  if (ec == boost::system::errc::protocol_error) {
341  }
342  else if (ec == boost::system::errc::bad_message) {
344  }
345  return MQTT_NS::nullopt;
346  }();
347  if (rc) {
348  MQTT_LOG("mqtt_broker", trace)
349  << MQTT_ADD_VALUE(address, this)
350  << "send DISCONNECT reason_code:" << rc.value();
351  sp->disconnect(rc.value());
352  }
353  }
354  else if (sp->underlying_connected()){
355  // underlying layer connected, mqtt connecting
356  // and protocol_version has already been determind as v5
357  auto rc =
358  [&] () -> MQTT_NS::optional<v5::connect_reason_code> {
359  if (ec ==boost::system::errc::protocol_error) {
361  }
362  else if (ec == boost::system::errc::bad_message) {
364  }
365  return MQTT_NS::nullopt;
366  }();
367  if (rc) {
368  MQTT_LOG("mqtt_broker", trace)
369  << MQTT_ADD_VALUE(address, this)
370  << "send CONNACK reason_code:" << rc.value();
371  sp->connack(false, rc.value());
372  }
373  }
374  };
375 
376  switch (ver) {
378  // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#S4_13_Errors
379 
380  // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901205
381  //
382  // The DISCONNECT packet is the final MQTT Control Packet sent from the Client or
383  // the Server.
384  send_response(ec);
385  break;
387  // DISCONNECT can't be sent by broker on v3.1.1
388  //
389  // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718090
390  //
391  // The DISCONNECT Packet is the final Control Packet sent from the Client to the Server.
392  // It indicates that the Client is disconnecting cleanly.
393  //
394  // At the MQTT connecting, there is no appropriate Connect Return Code on v3.1.1
395  // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718035
396  break;
397  default:
398  // The protocol_version is in the CONNECT packet.
399  // Protocol error could happen before the protocol_version is parsed.
400  break;
401  }
402  close_proc(force_move(sp), true);
403  }
404  );
405 
406  // set MQTT level handlers
407  ep.set_connect_handler(
408  [this, wp]
409  (buffer client_id,
410  optional<buffer> username,
411  optional<buffer> password,
412  optional<will> will,
413  bool clean_session,
414  std::uint16_t keep_alive) {
415  con_sp_t sp = wp.lock();
416  BOOST_ASSERT(sp);
417  return connect_handler(
418  force_move(sp),
419  force_move(client_id),
420  force_move(username),
421  force_move(password),
422  force_move(will),
424  keep_alive,
426  );
427  }
428  );
429  ep.set_v5_connect_handler(
430  [this, wp]
431  (buffer client_id,
432  optional<buffer> username,
433  optional<buffer> password,
434  optional<will> will,
435  bool clean_start,
436  std::uint16_t keep_alive,
437  v5::properties props) {
438  con_sp_t sp = wp.lock();
439  BOOST_ASSERT(sp);
440  return connect_handler(
441  force_move(sp),
442  force_move(client_id),
443  force_move(username),
444  force_move(password),
445  force_move(will),
446  clean_start,
447  keep_alive,
448  force_move(props)
449  );
450  }
451  );
452  ep.set_disconnect_handler(
453  [this, wp]
454  (){
455  con_sp_t sp = wp.lock();
456  BOOST_ASSERT(sp);
457  return disconnect_handler(force_move(sp));
458  }
459  );
460  ep.set_v5_disconnect_handler(
461  [this, wp]
462  (v5::disconnect_reason_code /*reason_code*/, v5::properties props) {
463  if (h_disconnect_props_) h_disconnect_props_(force_move(props));
464  con_sp_t sp = wp.lock();
465  BOOST_ASSERT(sp);
466  return disconnect_handler(force_move(sp));
467  }
468  );
469  ep.set_puback_handler(
470  [this, wp]
471  (packet_id_t packet_id){
472  con_sp_t sp = wp.lock();
473  BOOST_ASSERT(sp);
474  return puback_handler(
475  force_move(sp),
476  packet_id,
477  v5::puback_reason_code::success,
479  );
480  }
481  );
482  ep.set_v5_puback_handler(
483  [this, wp]
484  (packet_id_t packet_id,
485  v5::puback_reason_code reason_code,
486  v5::properties props){
487  con_sp_t sp = wp.lock();
488  BOOST_ASSERT(sp);
489  return puback_handler(
490  force_move(sp),
491  packet_id,
492  reason_code,
493  force_move(props)
494  );
495  }
496  );
497  ep.set_pubrec_handler(
498  [this, wp]
499  (packet_id_t packet_id){
500  con_sp_t sp = wp.lock();
501  BOOST_ASSERT(sp);
502  return pubrec_handler(
503  force_move(sp),
504  packet_id,
505  v5::pubrec_reason_code::success,
507  );
508  }
509  );
510  ep.set_v5_pubrec_handler(
511  [this, wp]
512  (packet_id_t packet_id,
513  v5::pubrec_reason_code reason_code,
514  v5::properties props){
515  con_sp_t sp = wp.lock();
516  BOOST_ASSERT(sp);
517  return pubrec_handler(
518  force_move(sp),
519  packet_id,
520  reason_code,
521  force_move(props)
522  );
523  }
524  );
525  ep.set_pubrel_handler(
526  [this, wp]
527  (packet_id_t packet_id){
528  con_sp_t sp = wp.lock();
529  BOOST_ASSERT(sp);
530  return pubrel_handler(
531  force_move(sp),
532  packet_id,
533  v5::pubrel_reason_code::success,
535  );
536  }
537  );
538  ep.set_v5_pubrel_handler(
539  [this, wp]
540  (packet_id_t packet_id,
541  v5::pubrel_reason_code reason_code,
542  v5::properties props){
543  con_sp_t sp = wp.lock();
544  BOOST_ASSERT(sp);
545  return pubrel_handler(
546  force_move(sp),
547  packet_id,
548  reason_code,
549  force_move(props)
550  );
551  }
552  );
553  ep.set_pubcomp_handler(
554  [this, wp]
555  (packet_id_t packet_id){
556  con_sp_t sp = wp.lock();
557  BOOST_ASSERT(sp);
558  return pubcomp_handler(
559  force_move(sp),
560  packet_id,
561  v5::pubcomp_reason_code::success,
563  );
564  }
565  );
566  ep.set_v5_pubcomp_handler(
567  [this, wp]
568  (packet_id_t packet_id,
569  v5::pubcomp_reason_code reason_code,
570  v5::properties props){
571  con_sp_t sp = wp.lock();
572  BOOST_ASSERT(sp);
573  return pubcomp_handler(
574  force_move(sp),
575  packet_id,
576  reason_code,
577  force_move(props)
578  );
579  }
580  );
581  ep.set_publish_handler(
582  [this, wp]
583  (optional<packet_id_t> packet_id,
584  publish_options pubopts,
585  buffer topic_name,
586  buffer contents){
587  con_sp_t sp = wp.lock();
588  BOOST_ASSERT(sp);
589  return publish_handler(
590  force_move(sp),
591  packet_id,
592  pubopts,
593  force_move(topic_name),
594  force_move(contents),
596  );
597  }
598  );
599  ep.set_v5_publish_handler(
600  [this, wp]
601  (optional<packet_id_t> packet_id,
602  publish_options pubopts,
603  buffer topic_name,
604  buffer contents,
605  v5::properties props
606  ) {
607  if (h_publish_props_) h_publish_props_(props);
608  con_sp_t sp = wp.lock();
609  BOOST_ASSERT(sp);
610  return publish_handler(
611  force_move(sp),
612  packet_id,
613  pubopts,
614  force_move(topic_name),
615  force_move(contents),
616  force_move(props)
617  );
618  }
619  );
620  ep.set_subscribe_handler(
621  [this, wp]
622  (packet_id_t packet_id,
623  std::vector<subscribe_entry> entries) {
624  con_sp_t sp = wp.lock();
625  BOOST_ASSERT(sp);
626  return subscribe_handler(
627  force_move(sp),
628  packet_id,
629  force_move(entries),
631  );
632  }
633  );
634  ep.set_v5_subscribe_handler(
635  [this, wp]
636  (packet_id_t packet_id,
637  std::vector<subscribe_entry> entries,
638  v5::properties props
639  ) {
640  con_sp_t sp = wp.lock();
641  BOOST_ASSERT(sp);
642  return subscribe_handler(
643  force_move(sp),
644  packet_id,
645  force_move(entries),
646  force_move(props)
647  );
648  }
649  );
650  ep.set_unsubscribe_handler(
651  [this, wp]
652  (packet_id_t packet_id,
653  std::vector<unsubscribe_entry> entries) {
654  con_sp_t sp = wp.lock();
655  BOOST_ASSERT(sp);
656  return unsubscribe_handler(
657  force_move(sp),
658  packet_id,
659  force_move(entries),
661  );
662  }
663  );
664  ep.set_v5_unsubscribe_handler(
665  [this, wp]
666  (packet_id_t packet_id,
667  std::vector<unsubscribe_entry> entries,
668  v5::properties props
669  ) {
670  con_sp_t sp = wp.lock();
671  BOOST_ASSERT(sp);
672  return unsubscribe_handler(
673  force_move(sp),
674  packet_id,
675  force_move(entries),
676  force_move(props)
677  );
678  }
679  );
680  ep.set_pingreq_handler(
681  [this, wp] {
682  con_sp_t sp = wp.lock();
683  BOOST_ASSERT(sp);
684  if (pingresp_) sp->pingresp();
685  return true;
686  }
687  );
688  ep.set_v5_auth_handler(
689  [this]
690  (v5::auth_reason_code /*reason_code*/,
691  v5::properties props
692  ) {
693  if (h_auth_props_) h_auth_props_(force_move(props));
694  return true;
695  }
696  );
697  }
698 
700  connack_props_ = force_move(props);
701  }
702 
704  suback_props_ = force_move(props);
705  }
706 
708  unsuback_props_ = force_move(props);
709  }
710 
712  puback_props_ = force_move(props);
713  }
714 
716  pubrec_props_ = force_move(props);
717  }
718 
720  pubrel_props_ = force_move(props);
721  }
722 
724  pubcomp_props_ = force_move(props);
725  }
726 
727  void set_connect_props_handler(std::function<void(v5::properties const&)> h) {
728  h_connect_props_ = force_move(h);
729  }
730 
731  void set_disconnect_props_handler(std::function<void(v5::properties const&)> h) {
732  h_disconnect_props_ = force_move(h);
733  }
734 
735  void set_publish_props_handler(std::function<void(v5::properties const&)> h) {
736  h_publish_props_ = force_move(h);
737  }
738 
739  void set_puback_props_handler(std::function<void(v5::properties const&)> h) {
740  h_puback_props_ = force_move(h);
741  }
742 
743  void set_pubrec_props_handler(std::function<void(v5::properties const&)> h) {
744  h_pubrec_props_ = force_move(h);
745  }
746 
747  void set_pubrel_props_handler(std::function<void(v5::properties const&)> h) {
748  h_pubrel_props_ = force_move(h);
749  }
750 
751  void set_pubcomp_props_handler(std::function<void(v5::properties const&)> h) {
752  h_pubcomp_props_ = force_move(h);
753  }
754 
755  void set_subscribe_props_handler(std::function<void(v5::properties const&)> h) {
756  h_subscribe_props_ = force_move(h);
757  }
758 
759  void set_unsubscribe_props_handler(std::function<void(v5::properties const&)> h) {
760  h_unsubscribe_props_ = force_move(h);
761  }
762 
763  void set_auth_props_handler(std::function<void(v5::properties const&)> h) {
764  h_auth_props_ = force_move(h);
765  }
766 
768  sessions_.clear();
769  }
770 
772  retains_.clear();
773  }
774 
775 private:
793  bool connect_handler(
794  con_sp_t spep,
795  buffer client_id,
796  optional<buffer> /*username*/,
797  optional<buffer> /*password*/,
798  optional<will> will,
799  bool clean_start,
800  std::uint16_t /*keep_alive*/,
801  v5::properties props
802  ) {
803  auto& ep = *spep;
804 
805  optional<std::chrono::steady_clock::duration> session_expiry_interval;
806  optional<std::chrono::steady_clock::duration> will_expiry_interval;
807 
808  if (ep.get_protocol_version() == protocol_version::v5) {
809  auto v = get_property<v5::property::session_expiry_interval>(props);
810  if (v && v.value().val() != 0) {
811  session_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
812  }
813 
814  if (will) {
815  auto v = get_property<v5::property::message_expiry_interval>(will.value().props());
816  if (v) {
817  will_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
818  }
819  }
820 
821  if (h_connect_props_) {
822  h_connect_props_(props);
823  }
824  }
825 
826  // If the Client supplies a zero-byte ClientId, the Client MUST also set CleanSession to 1 [MQTT-3.1.3-7].
827  // If it's a not a clean session, but no client id is provided, we would have no way to map this
828  // connection's session to a new connection later. So the connection must be rejected.
829  switch (ep.get_protocol_version()) {
830  case protocol_version::v3_1_1:
831  if (client_id.empty() && !clean_start) {
832  ep.connack(false, connect_return_code::identifier_rejected);
833  return false;
834  }
835  break;
836  case protocol_version::v5:
837  if (client_id.empty() && !clean_start) {
838  ep.connack(false, v5::connect_reason_code::client_identifier_not_valid);
839  return false;
840  }
841  break;
842  default:
843  BOOST_ASSERT(false);
844  break;
845  }
846 
847  auto send_connack =
848  [&](bool session_present) {
849  // Reply to the connect message.
850  switch (ep.get_protocol_version()) {
851  case protocol_version::v3_1_1:
852  ep.connack(
853  session_present,
854  connect_return_code::accepted
855  );
856  break;
857  case protocol_version::v5:
858  if (connack_props_.empty()) {
859  ep.connack(
860  session_present,
861  v5::connect_reason_code::success,
863  v5::property::topic_alias_maximum{topic_alias_max}
864  }
865  );
866  }
867  else {
868  ep.connack(
869  session_present,
870  v5::connect_reason_code::success,
871  connack_props_
872  );
873  }
874  break;
875  default:
876  BOOST_ASSERT(false);
877  break;
878  }
879  };
880 
889  // Find any sessions that have the same client_id
890  auto& idx = sessions_.get<tag_cid>();
891  auto it = idx.lower_bound(client_id);
892  if (it == idx.end() || it->client_id() != client_id) {
893  // new connection
894  MQTT_LOG("mqtt_broker", trace)
895  << MQTT_ADD_VALUE(address, this)
896  << "cid:" << client_id
897  << " new connection inserted.";
898  it = idx.emplace_hint(
899  it,
900  ioc_,
901  subs_map_,
902  shared_targets_,
903  spep,
904  client_id,
905  force_move(will),
906  force_move(will_expiry_interval),
907  force_move(session_expiry_interval)
908  );
909 
910  send_connack(false);
911  }
912  else if (it->online()) {
913  // online overwrite
914  if (close_proc(it->con(), true)) {
915  // remain offline
916  if (clean_start) {
917  // discard offline session
918  MQTT_LOG("mqtt_broker", trace)
919  << MQTT_ADD_VALUE(address, this)
920  << "cid:" << client_id
921  << "online connection exists, discard old one due to new one's clean_start and renew";
922  send_connack(false);
923  idx.modify(
924  it,
925  [&](auto& e) {
926  e.clean();
927  e.update_will(ioc_, force_move(will), will_expiry_interval);
928  // TODO: e.will_delay = force_move(will_delay);
929  e.renew_session_expiry(force_move(session_expiry_interval));
930  },
931  [](auto&) { BOOST_ASSERT(false); }
932  );
933  }
934  else {
935  // inherit offline session
936  MQTT_LOG("mqtt_broker", trace)
937  << MQTT_ADD_VALUE(address, this)
938  << "cid:" << client_id
939  << "online connection exists, inherit old one and renew";
940  send_connack(true);
941  idx.modify(
942  it,
943  [&](auto& e) {
944  e.reset_con(spep);
945  e.update_will(ioc_, force_move(will), will_expiry_interval);
946  // TODO: e.will_delay = force_move(will_delay);
947  e.renew_session_expiry(force_move(session_expiry_interval));
948  e.send_inflight_messages();
949  e.send_all_offline_messages();
950  },
951  [](auto&) { BOOST_ASSERT(false); }
952  );
953  // send offline messages
954  }
955  }
956  else {
957  // new connection
958  MQTT_LOG("mqtt_broker", trace)
959  << MQTT_ADD_VALUE(address, this)
960  << "cid:" << client_id
961  << "online connection exists, discard old one due to session_expiry and renew";
962  bool inserted;
963  std::tie(it, inserted) = idx.emplace(
964  ioc_,
965  subs_map_,
966  shared_targets_,
967  spep,
968  client_id,
969  force_move(will),
970  force_move(will_expiry_interval),
971  force_move(session_expiry_interval)
972  );
973  BOOST_ASSERT(inserted);
974  send_connack(false);
975  }
976  }
977  else {
978  // offline -> online
979  if (clean_start) {
980  // discard offline session
981  MQTT_LOG("mqtt_broker", trace)
982  << MQTT_ADD_VALUE(address, this)
983  << "cid:" << client_id
984  << "offline connection exists, discard old one due to new one's clean_start and renew";
985  send_connack(false);
986  idx.modify(
987  it,
988  [&](auto& e) {
989  e.clean();
990  e.reset_con(spep);
991  e.update_will(ioc_, force_move(will), will_expiry_interval);
992  // TODO: e.will_delay = force_move(will_delay);
993  e.renew_session_expiry(force_move(session_expiry_interval));
994  },
995  [](auto&) { BOOST_ASSERT(false); }
996  );
997  }
998  else {
999  // inherit offline session
1000  MQTT_LOG("mqtt_broker", trace)
1001  << MQTT_ADD_VALUE(address, this)
1002  << "cid:" << client_id
1003  << "offline connection exists, inherit old one and renew";
1004  send_connack(true);
1005  idx.modify(
1006  it,
1007  [&](auto& e) {
1008  e.reset_con(spep);
1009  e.update_will(ioc_, force_move(will), will_expiry_interval);
1010  // TODO: e.will_delay = force_move(will_delay);
1011  e.renew_session_expiry(force_move(session_expiry_interval));
1012  e.send_inflight_messages();
1013  e.send_all_offline_messages();
1014  },
1015  [](auto&) { BOOST_ASSERT(false); }
1016  );
1017  }
1018  }
1019 
1020  return true;
1021  }
1022 
1023  void disconnect_handler(
1024  con_sp_t spep
1025  ) {
1026  if (delay_disconnect_) {
1027  tim_disconnect_.expires_after(delay_disconnect_.value());
1028  tim_disconnect_.wait();
1029  }
1030  close_proc(force_move(spep), false);
1031  }
1032 
1040  // TODO: Maybe change the name of this function.
1041  bool close_proc(con_sp_t spep, bool send_will) {
1042  endpoint_t& ep = *spep;
1043 
1044  auto& idx = sessions_.get<tag_con>();
1045  auto it = idx.find(spep);
1046 
1047  // act_sess_it == act_sess_idx.end() could happen if broker accepts
1048  // the session from client but the client closes the session before sending
1049  // MQTT `CONNECT` message.
1050  // In this case, do nothing is correct behavior.
1051  if (it == idx.end()) return false;
1052 
1053  bool session_clear =
1054  [&] {
1055  if (ep.get_protocol_version() == protocol_version::v3_1_1) {
1056  return ep.clean_session();
1057  }
1058  else {
1059  BOOST_ASSERT(ep.get_protocol_version() == protocol_version::v5);
1060  auto const& sei_opt = it->session_expiry_interval();
1061  return !sei_opt || sei_opt.value() == std::chrono::steady_clock::duration::zero();
1062  }
1063  } ();
1064 
1065  auto do_send_will =
1066  [&](session_state& session) {
1067  if (session.will()) {
1068  if (send_will) {
1069  // TODO: This should be triggered by the will delay
1070  // Not sent immediately.
1071  auto props = force_move(session.will().value().props());
1072 
1073  if (session.get_tim_will_expiry()) {
1074  auto d =
1075  std::chrono::duration_cast<std::chrono::seconds>(
1076  session.get_tim_will_expiry()->expiry() - std::chrono::steady_clock::now()
1077  ).count();
1078  if (d < 0) d = 0;
1079  set_property<v5::property::message_expiry_interval>(
1080  props,
1081  v5::property::message_expiry_interval(
1082  static_cast<uint32_t>(d)
1083  )
1084  );
1085  }
1086 
1087  do_publish(
1088  ep,
1089  force_move(session.will().value().topic()),
1090  force_move(session.will().value().message()),
1091  session.will().value().get_qos() | session.will().value().get_retain(),
1092  props
1093  );
1094  }
1095  else {
1096  session.reset_will();
1097  }
1098  }
1099  };
1100 
1101  if (session_clear) {
1102  idx.modify(
1103  it,
1104  [&](session_state& e) {
1105  do_send_will(e);
1106  e.con()->force_disconnect();
1107  },
1108  [](auto&) { BOOST_ASSERT(false); }
1109  );
1110  idx.erase(it);
1111  BOOST_ASSERT(sessions_.get<tag_con>().find(spep) == sessions_.get<tag_con>().end());
1112  return false;
1113  }
1114  else {
1115  idx.modify(
1116  it,
1117  [&](session_state& e) {
1118  do_send_will(e);
1119  e.con()->force_disconnect();
1120  e.become_offline(
1121  [this]
1122  (std::shared_ptr<as::steady_timer> const& sp_tim) {
1123  sessions_.get<tag_tim>().erase(sp_tim);
1124  }
1125  );
1126  },
1127  [](auto&) { BOOST_ASSERT(false); }
1128  );
1129 
1130  return true;
1131  }
1132 
1133  }
1134 
1135  bool publish_handler(
1136  con_sp_t spep,
1137  optional<packet_id_t> packet_id,
1138  publish_options pubopts,
1139  buffer topic_name,
1140  buffer contents,
1141  v5::properties props) {
1142 
1143  auto& ep = *spep;
1144 
1145  auto& idx = sessions_.get<tag_con>();
1146  auto it = idx.find(spep);
1147  BOOST_ASSERT(it != idx.end());
1148 
1149  auto send_pubrec =
1150  [&] {
1151  switch (pubopts.get_qos()) {
1152  case qos::at_least_once:
1153  ep.puback(packet_id.value(), v5::puback_reason_code::success, puback_props_);
1154  break;
1155  case qos::exactly_once: {
1156  idx.modify(
1157  it,
1158  [&](auto& e) {
1159  e.exactly_once_start(packet_id.value());
1160  }
1161  );
1162  ep.pubrec(packet_id.value(), v5::pubrec_reason_code::success, pubrec_props_);
1163  } break;
1164  default:
1165  break;
1166  }
1167  };
1168 
1169  if (packet_id) {
1170  if (pubopts.get_qos() == qos::exactly_once &&
1171  it->exactly_once_processing(packet_id.value())) {
1172  MQTT_LOG("mqtt_broker", info)
1173  << MQTT_ADD_VALUE(address, spep.get())
1174  << "receive already processed publish pid:" << packet_id.value();
1175  send_pubrec();
1176  return true;
1177  }
1178  }
1179 
1180  v5::properties forward_props;
1181 
1182  for (auto&& p : props) {
1185  [](v5::property::topic_alias&&) {
1186  // TopicAlias is not forwarded
1187  // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901113
1188  // A receiver MUST NOT carry forward any Topic Alias mappings from
1189  // one Network Connection to another [MQTT-3.3.2-7].
1190  },
1191  [&spep](v5::property::subscription_identifier&& p) {
1192  MQTT_LOG("mqtt_broker", warning)
1193  << MQTT_ADD_VALUE(address, spep.get())
1194  << "Subscription Identifier from client not forwarded sid:" << p.val();
1195  },
1196  [&forward_props](auto&& p) {
1197  forward_props.push_back(force_move(p));
1198  }
1199  ),
1200  force_move(p)
1201  );
1202  }
1203 
1204  do_publish(
1205  ep,
1206  force_move(topic_name),
1207  force_move(contents),
1208  pubopts.get_qos() | pubopts.get_retain(), // remove dup flag
1209  force_move(forward_props)
1210  );
1211 
1212  switch (ep.get_protocol_version()) {
1213  case protocol_version::v3_1_1:
1214  switch (pubopts.get_qos()) {
1215  case qos::at_least_once:
1216  ep.puback(packet_id.value());
1217  break;
1218  case qos::exactly_once:
1219  send_pubrec();
1220  break;
1221  default:
1222  break;
1223  }
1224  break;
1225  case protocol_version::v5:
1226  switch (pubopts.get_qos()) {
1227  case qos::at_least_once:
1228  ep.puback(packet_id.value(), v5::puback_reason_code::success, puback_props_);
1229  break;
1230  case qos::exactly_once:
1231  send_pubrec();
1232  break;
1233  default:
1234  break;
1235  }
1236  break;
1237  default:
1238  BOOST_ASSERT(false);
1239  break;
1240  }
1241 
1242  return true;
1243  }
1244 
1245  bool puback_handler(
1246  con_sp_t spep,
1247  packet_id_t packet_id,
1248  v5::puback_reason_code /*reason_code*/,
1249  v5::properties /*props*/) {
1250  auto& idx = sessions_.get<tag_con>();
1251  auto it = idx.find(spep);
1252  BOOST_ASSERT(it != idx.end());
1253  idx.modify(
1254  it,
1255  [&](auto& e) {
1258  }
1259  );
1260  return true;
1261  }
1262 
1263  bool pubrec_handler(
1264  con_sp_t spep,
1265  packet_id_t packet_id,
1266  v5::pubrec_reason_code /*reason_code*/,
1267  v5::properties /*props*/) {
1268  auto& idx = sessions_.get<tag_con>();
1269  auto it = idx.find(spep);
1270  BOOST_ASSERT(it != idx.end());
1271  idx.modify(
1272  it,
1273  [&](auto& e) {
1275  }
1276  );
1277 
1278  auto& ep = *spep;
1279 
1280  switch (ep.get_protocol_version()) {
1281  case protocol_version::v3_1_1:
1282  ep.pubrel(packet_id);
1283  break;
1284  case protocol_version::v5:
1285  ep.pubrel(packet_id, v5::pubrel_reason_code::success, pubrel_props_);
1286  break;
1287  default:
1288  BOOST_ASSERT(false);
1289  break;
1290  }
1291  return true;
1292  }
1293 
1294  bool pubrel_handler(
1295  con_sp_t spep,
1296  packet_id_t packet_id,
1297  v5::pubrel_reason_code /*reason_code*/,
1298  v5::properties /*props*/) {
1299  auto& idx = sessions_.get<tag_con>();
1300  auto it = idx.find(spep);
1301  BOOST_ASSERT(it != idx.end());
1302  idx.modify(
1303  it,
1304  [&](auto& e) {
1305  e.exactly_once_finish(packet_id);
1306  }
1307  );
1308 
1309  auto& ep = *spep;
1310 
1311  switch (ep.get_protocol_version()) {
1312  case protocol_version::v3_1_1:
1313  ep.pubcomp(packet_id);
1314  break;
1315  case protocol_version::v5:
1316  ep.pubcomp(packet_id, v5::pubcomp_reason_code::success, pubcomp_props_);
1317  break;
1318  default:
1319  BOOST_ASSERT(false);
1320  break;
1321  }
1322  return true;
1323  }
1324 
1325  bool pubcomp_handler(
1326  con_sp_t spep,
1327  packet_id_t packet_id,
1328  v5::pubcomp_reason_code /*reason_code*/,
1329  v5::properties /*props*/){
1330  auto& idx = sessions_.get<tag_con>();
1331  auto it = idx.find(spep);
1332  BOOST_ASSERT(it != idx.end());
1333  idx.modify(
1334  it,
1335  [&](auto& e) {
1338  }
1339  );
1340  return true;
1341  }
1342 
1343  bool subscribe_handler(
1344  con_sp_t spep,
1345  packet_id_t packet_id,
1346  std::vector<subscribe_entry> entries,
1347  v5::properties props) {
1348 
1349  auto& ep = *spep;
1350 
1351  auto& idx = sessions_.get<tag_con>();
1352  auto it = idx.find(spep);
1353  BOOST_ASSERT(it != idx.end());
1354 
1355  // The element of sessions_ must have longer lifetime
1356  // than corresponding subscription.
1357  // Because the subscription store the reference of the element.
1358  optional<session_state_ref> ssr_opt;
1359  idx.modify(
1360  it,
1361  [&](session_state& e) {
1362  ssr_opt.emplace(e);
1363  },
1364  [](auto&) { BOOST_ASSERT(false); }
1365  );
1366 
1367  BOOST_ASSERT(ssr_opt);
1368  session_state_ref ssr {ssr_opt.value()};
1369 
1370  auto publish_proc =
1371  [this, &ssr](retain_t const& r, qos qos_value, optional<std::size_t> sid) {
1372  auto props = r.props;
1373  if (sid) {
1374  props.push_back(v5::property::subscription_identifier(*sid));
1375  }
1376  if (r.tim_message_expiry) {
1377  auto d =
1378  std::chrono::duration_cast<std::chrono::seconds>(
1379  r.tim_message_expiry->expiry() - std::chrono::steady_clock::now()
1380  ).count();
1381  set_property<v5::property::message_expiry_interval>(
1382  props,
1383  v5::property::message_expiry_interval(
1384  static_cast<uint32_t>(d)
1385  )
1386  );
1387  }
1388  ssr.get().publish(
1389  ioc_,
1390  r.topic,
1391  r.contents,
1392  std::min(r.qos_value, qos_value) | MQTT_NS::retain::yes,
1393  props
1394  );
1395  };
1396 
1397  std::vector<std::function<void()>> retain_deliver;
1398  retain_deliver.reserve(entries.size());
1399 
1400  // subscription identifier
1401  optional<std::size_t> sid;
1402 
1403  // An in-order list of qos settings, used to send the reply.
1404  // The MQTT protocol 3.1.1 - 3.8.4 Response - paragraph 6
1405  // allows the server to grant a lower QOS than requested
1406  // So we reply with the QOS setting that was granted
1407  // not the one requested.
1408  switch (ep.get_protocol_version()) {
1409  case protocol_version::v3_1_1: {
1410  std::vector<suback_return_code> res;
1411  res.reserve(entries.size());
1412  for (auto& e : entries) {
1413  res.emplace_back(qos_to_suback_return_code(e.subopts.get_qos())); // converts to granted_qos_x
1414  ssr.get().subscribe(
1415  force_move(e.share_name),
1416  e.topic_filter,
1417  e.subopts,
1418  [&] {
1419  retains_.find(
1420  e.topic_filter,
1421  [&](retain_t const& r) {
1422  retain_deliver.emplace_back(
1423  [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
1424  publish_proc(r, qos_value, sid);
1425  }
1426  );
1427  }
1428  );
1429  }
1430  );
1431  }
1432  // Acknowledge the subscriptions, and the registered QOS settings
1433  ep.suback(packet_id, force_move(res));
1434  } break;
1435  case protocol_version::v5: {
1436  // Get subscription identifier
1437  auto v = get_property<v5::property::subscription_identifier>(props);
1438  if (v && v.value().val() != 0) {
1439  sid.emplace(v.value().val());
1440  }
1441 
1442  std::vector<v5::suback_reason_code> res;
1443  res.reserve(entries.size());
1444  for (auto& e : entries) {
1445  res.emplace_back(v5::qos_to_suback_reason_code(e.subopts.get_qos())); // converts to granted_qos_x
1446  ssr.get().subscribe(
1447  force_move(e.share_name),
1448  e.topic_filter,
1449  e.subopts,
1450  [&] {
1451  retains_.find(
1452  e.topic_filter,
1453  [&](retain_t const& r) {
1454  retain_deliver.emplace_back(
1455  [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
1456  publish_proc(r, qos_value, sid);
1457  }
1458  );
1459  }
1460  );
1461  },
1462  sid
1463  );
1464  }
1465  if (h_subscribe_props_) h_subscribe_props_(props);
1466  // Acknowledge the subscriptions, and the registered QOS settings
1467  ep.suback(packet_id, force_move(res), suback_props_);
1468  } break;
1469  default:
1470  BOOST_ASSERT(false);
1471  break;
1472  }
1473 
1474  for (auto const& f : retain_deliver) {
1475  f();
1476  }
1477  return true;
1478  }
1479 
1480  bool unsubscribe_handler(
1481  con_sp_t spep,
1482  packet_id_t packet_id,
1483  std::vector<unsubscribe_entry> entries,
1484  v5::properties props) {
1485 
1486  auto& ep = *spep;
1487 
1488  auto& idx = sessions_.get<tag_con>();
1489  auto it = idx.find(spep);
1490  BOOST_ASSERT(it != idx.end());
1491 
1492 
1493  // The element of sessions_ must have longer lifetime
1494  // than corresponding subscription.
1495  // Because the subscription store the reference of the element.
1496  optional<session_state_ref> ssr_opt;
1497  idx.modify(
1498  it,
1499  [&](session_state& e) {
1500  ssr_opt.emplace(e);
1501  },
1502  [](auto&) { BOOST_ASSERT(false); }
1503  );
1504 
1505  BOOST_ASSERT(ssr_opt);
1506  session_state_ref ssr {ssr_opt.value()};
1507 
1508  // For each subscription that this connection has
1509  // Compare against the list of topic filters, and remove
1510  // the subscription if the topic filter is in the list.
1511  for (auto const& e : entries) {
1512  ssr.get().unsubscribe(e.share_name, e.topic_filter);
1513  }
1514 
1515  switch (ep.get_protocol_version()) {
1516  case protocol_version::v3_1_1:
1517  ep.unsuback(packet_id);
1518  break;
1519  case protocol_version::v5:
1520  if (h_unsubscribe_props_) h_unsubscribe_props_(props);
1521  ep.unsuback(
1522  packet_id,
1523  std::vector<v5::unsuback_reason_code>(
1524  entries.size(),
1525  v5::unsuback_reason_code::success
1526  ),
1527  unsuback_props_
1528  );
1529  break;
1530  default:
1531  BOOST_ASSERT(false);
1532  break;
1533  }
1534 
1535  return true;
1536  }
1537 
1548  void do_publish(
1549  endpoint_t& ep,
1550  buffer topic,
1551  buffer contents,
1552  publish_options pubopts,
1553  v5::properties props) {
1554 
1555  // publish the message to subscribers.
1556  // retain is delivered as the original only if rap_value is rap::retain.
1557  // On MQTT v3.1.1, rap_value is always rap::dont.
1558  auto deliver =
1559  [&] (session_state& ss, subscription& sub) {
1560  publish_options new_pubopts = std::min(pubopts.get_qos(), sub.subopts.get_qos());
1561  if (sub.subopts.get_rap() == rap::retain && pubopts.get_retain() == MQTT_NS::retain::yes) {
1562  new_pubopts |= MQTT_NS::retain::yes;
1563  }
1564 
1565  if (sub.sid) {
1566  props.push_back(v5::property::subscription_identifier(sub.sid.value()));
1567  ss.deliver(
1568  ioc_,
1569  topic,
1570  contents,
1571  new_pubopts,
1572  props
1573  );
1574  props.pop_back();
1575  }
1576  else {
1577  ss.deliver(
1578  ioc_,
1579  topic,
1580  contents,
1581  new_pubopts,
1582  props
1583  );
1584  }
1585  };
1586 
1587  // share_name topic_filter
1588  std::set<std::tuple<string_view, string_view>> sent;
1589 
1590  subs_map_.modify(
1591  topic,
1592  [&](buffer const& /*key*/, subscription& sub) {
1593  if (sub.share_name.empty()) {
1594  // Non shared subscriptions
1595 
1596  // If NL (no local) subscription option is set and
1597  // publisher is the same as subscriber, then skip it.
1598  if (sub.subopts.get_nl() == nl::yes &&
1599  sub.ss.get().con().get() == &ep) return;
1600  deliver(sub.ss.get(), sub);
1601  }
1602  else {
1603  // Shared subscriptions
1604  bool inserted;
1605  std::tie(std::ignore, inserted) = sent.emplace(sub.share_name, sub.topic_filter);
1606  if (inserted) {
1607  if (auto ssr_opt = shared_targets_.get_target(sub.share_name, sub.topic_filter)) {
1608  deliver(ssr_opt.value().get(), sub);
1609  }
1610  }
1611  }
1612  }
1613  );
1614 
1615  optional<std::chrono::steady_clock::duration> message_expiry_interval;
1616  if (ep.get_protocol_version() == protocol_version::v5) {
1617  auto v = get_property<v5::property::message_expiry_interval>(props);
1618  if (v) {
1619  message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
1620  }
1621 
1622  }
1623 
1624  /*
1625  * If the message is marked as being retained, then we
1626  * keep it in case a new subscription is added that matches
1627  * this topic.
1628  *
1629  * @note: The MQTT standard 3.3.1.3 RETAIN makes it clear that
1630  * retained messages are global based on the topic, and
1631  * are not scoped by the client id. So any client may
1632  * publish a retained message on any topic, and the most
1633  * recently published retained message on a particular
1634  * topic is the message that is stored on the server.
1635  *
1636  * @note: The standard doesn't make it clear that publishing
1637  * a message with zero length, but the retain flag not
1638  * set, does not result in any existing retained message
1639  * being removed. However, internet searching indicates
1640  * that most brokers have opted to keep retained messages
1641  * when receiving contents of zero bytes, unless the so
1642  * received message has the retain flag set, in which case
1643  * the retained message is removed.
1644  */
1645  if (pubopts.get_retain() == MQTT_NS::retain::yes) {
1646  if (contents.empty()) {
1647  retains_.erase(topic);
1648  }
1649  else {
1650  std::shared_ptr<as::steady_timer> tim_message_expiry;
1651  if (message_expiry_interval) {
1652  tim_message_expiry = std::make_shared<as::steady_timer>(ioc_, message_expiry_interval.value());
1653  tim_message_expiry->async_wait(
1654  [this, topic = topic, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]
1655  (boost::system::error_code const& ec) {
1656  if (auto sp = wp.lock()) {
1657  if (!ec) {
1658  retains_.erase(topic);
1659  }
1660  }
1661  }
1662  );
1663  }
1664 
1665  retains_.insert_or_assign(
1666  topic,
1667  retain_t {
1668  force_move(topic),
1669  force_move(contents),
1670  force_move(props),
1671  pubopts.get_qos(),
1672  tim_message_expiry
1673  }
1674  );
1675  }
1676  }
1677  }
1678 
1679 private:
1680  as::io_context& ioc_;
1681  as::steady_timer tim_disconnect_;
1682  optional<std::chrono::steady_clock::duration> delay_disconnect_;
1683 
1684  sub_con_map subs_map_;
1685  shared_target shared_targets_;
1686 
1690  session_states sessions_;
1691 
1692 
1693  retained_messages retains_;
1694 
1695  // MQTTv5 members
1696  v5::properties connack_props_;
1697  v5::properties suback_props_;
1698  v5::properties unsuback_props_;
1699  v5::properties puback_props_;
1700  v5::properties pubrec_props_;
1701  v5::properties pubrel_props_;
1702  v5::properties pubcomp_props_;
1703  std::function<void(v5::properties const&)> h_connect_props_;
1704  std::function<void(v5::properties const&)> h_disconnect_props_;
1705  std::function<void(v5::properties const&)> h_publish_props_;
1706  std::function<void(v5::properties const&)> h_puback_props_;
1707  std::function<void(v5::properties const&)> h_pubrec_props_;
1708  std::function<void(v5::properties const&)> h_pubrel_props_;
1709  std::function<void(v5::properties const&)> h_pubcomp_props_;
1710  std::function<void(v5::properties const&)> h_subscribe_props_;
1711  std::function<void(v5::properties const&)> h_unsubscribe_props_;
1712  std::function<void(v5::properties const&)> h_auth_props_;
1713  bool pingresp_ = true;
1714 };
1715 
1717 
1718 #endif // MQTT_BROKER_BROKER_HPP
MQTT_STRING_VIEW_CONSTEXPR bool validate_topic_filter(string_view topic_filter)
Definition: broker.hpp:48
MQTT_STRING_VIEW_CONSTEXPR bool compare_topic_filter(string_view topic_filter, string_view topic_name)
Definition: broker.hpp:176
#define MQTT_STRING_VIEW_CONSTEXPR
Definition: broker.hpp:37
MQTT_STRING_VIEW_CONSTEXPR bool validate_topic_name(string_view topic_name)
Definition: broker.hpp:144
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: broker.hpp:256
void set_unsuback_props(v5::properties props)
Definition: broker.hpp:707
void set_publish_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:735
void set_pubrec_props(v5::properties props)
Definition: broker.hpp:715
void set_pubrel_props(v5::properties props)
Definition: broker.hpp:719
void set_pingresp(bool b)
set pingresp send operation
Definition: broker.hpp:283
void set_unsubscribe_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:759
void set_connack_props(v5::properties props)
Definition: broker.hpp:699
void set_disconnect_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:731
void set_suback_props(v5::properties props)
Definition: broker.hpp:703
void set_pubcomp_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:751
void set_pubrel_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:747
void clear_all_sessions()
Definition: broker.hpp:767
void set_connect_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:727
broker_t(as::io_context &ioc)
Definition: broker.hpp:258
void handle_accept(con_sp_t spep)
handle_accept
Definition: broker.hpp:304
void clear_all_retained_topics()
Definition: broker.hpp:771
void set_disconnect_delay(std::chrono::steady_clock::duration delay)
set_disconnect_delay adds a delay to disconnect operations.
Definition: broker.hpp:273
void set_puback_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:739
void set_puback_props(v5::properties props)
Definition: broker.hpp:711
void set_auth_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:763
void set_pubrec_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:743
void set_subscribe_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:755
void set_pubcomp_props(v5::properties props)
Definition: broker.hpp:723
void clear()
Definition: retained_topic_map.hpp:351
Definition: session_state.hpp:433
decltype(auto) get()
Definition: session_state.hpp:436
void clear()
Definition: session_state.hpp:445
Definition: shared_target.hpp:32
std::weak_ptr< endpoint_t > con_wp_t
Definition: common_type.hpp:19
server<>::endpoint_t endpoint_t
Definition: common_type.hpp:17
endpoint_t::packet_id_t packet_id_t
Definition: common_type.hpp:20
std::shared_ptr< endpoint_t > con_sp_t
Definition: common_type.hpp:18
#define MQTT_LOG(chan, sev)
Definition: log.hpp:135
#define MQTT_ADD_VALUE(name, val)
Definition: log.hpp:136
constexpr char const clean_start
Definition: connect_flags.hpp:19
constexpr char const clean_session
Definition: connect_flags.hpp:18
auth_reason_code
Definition: reason_code.hpp:385
pubrel_reason_code
Definition: reason_code.hpp:341
puback_reason_code
Definition: reason_code.hpp:269
pubrec_reason_code
Definition: reason_code.hpp:305
constexpr suback_reason_code qos_to_suback_reason_code(qos q)
Definition: reason_code.hpp:233
pubcomp_reason_code
Definition: reason_code.hpp:363
std::vector< property_variant > properties
Definition: property_variant.hpp:51
disconnect_reason_code
Definition: reason_code.hpp:114
boost::string_ref string_view
Definition: string_view.hpp:64
constexpr decltype(auto) visit(Visitor &&vis, Variants &&... vars)
Definition: variant.hpp:60
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
constexpr suback_return_code qos_to_suback_return_code(qos q)
Definition: reason_code.hpp:44
retain
Definition: publish.hpp:42
lambda_visitor< Lambdas... > make_lambda_visitor(Lambdas &&... lambdas)
Definition: visitor_util.hpp:37
qos
Definition: subscribe_options.hpp:34
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
std::reference_wrapper< session_state > session_state_ref
Definition: session_state_fwd.hpp:20
Definition: retain_t.hpp:25
buffer contents
Definition: retain_t.hpp:40
qos qos_value
Definition: retain_t.hpp:42
buffer topic
Definition: retain_t.hpp:39
v5::properties props
Definition: retain_t.hpp:41
std::shared_ptr< as::steady_timer > tim_message_expiry
Definition: retain_t.hpp:43
Definition: session_state.hpp:53
void erase_inflight_message_by_packet_id(packet_id_t packet_id)
Definition: session_state.hpp:369
void exactly_once_start(packet_id_t packet_id)
Definition: session_state.hpp:242
void deliver(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: session_state.hpp:207
con_sp_t const & con() const
Definition: session_state.hpp:396
void become_offline(SessionExpireHandler &&h)
Definition: session_state.hpp:88
void exactly_once_finish(packet_id_t packet_id)
Definition: session_state.hpp:250
void send_offline_messages_by_packet_id_release()
Definition: session_state.hpp:379
Definition: subscription.hpp:20
buffer share_name
Definition: subscription.hpp:35
Definition: tags.hpp:21
Definition: tags.hpp:17
Definition: tags.hpp:23