7 #if !defined(MQTT_BROKER_BROKER_HPP)
8 #define MQTT_BROKER_BROKER_HPP
14 #include <boost/lexical_cast.hpp>
30 namespace mi = boost::multi_index;
34 #if defined(MQTT_STD_STRING_VIEW)
35 #define MQTT_STRING_VIEW_CONSTEXPR constexpr
37 #define MQTT_STRING_VIEW_CONSTEXPR
56 if (topic_filter.empty() || (topic_filter.size() > std::numeric_limits<std::uint16_t>::max())) {
60 for (string_view::size_type idx = topic_filter.find_first_of(
string_view(
"\0+#", 3));
61 string_view::npos != idx;
62 idx = topic_filter.find_first_of(
string_view(
"\0+#", 3), idx+1)) {
64 (
'\0' == topic_filter[idx])
65 || (
'+' == topic_filter[idx])
66 || (
'#' == topic_filter[idx])
68 if (
'\0' == topic_filter[idx]) {
72 else if (
'+' == topic_filter[idx]) {
77 if ((0 != idx) && (
'/' != topic_filter[idx-1])) {
85 if ((topic_filter.size()-1 != idx) && (
'/' != topic_filter[idx+1])) {
90 else if (
'#' == topic_filter[idx]) {
95 if (idx != topic_filter.size()-1) {
104 if ((0 != idx) && (
'/' != topic_filter[idx-1])) {
115 #if defined(MQTT_STD_STRING_VIEW)
117 static_assert( !
validate_topic_filter(
""),
"All Topic Names and Topic Filters MUST be at least one character long");
118 static_assert(
validate_topic_filter(
"/"),
"A Topic Name or Topic Filter consisting only of the ‘/’ character is valid");
120 static_assert(
validate_topic_filter(
" "),
"Topic Names and Topic Filters can include the space character");
121 static_assert(
validate_topic_filter(
"/////"),
"Topic level separators can appear anywhere in a Topic Filter or Topic Name. Adjacent Topic level separators indicate a zero-length topic level");
122 static_assert(
validate_topic_filter(
"#"),
"The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
123 static_assert(
validate_topic_filter(
"/#"),
"The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
124 static_assert(
validate_topic_filter(
"+/#"),
"The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
125 static_assert( !
validate_topic_filter(
"+#"),
"The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
126 static_assert( !
validate_topic_filter(
"++"),
"The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
127 static_assert( !
validate_topic_filter(
"f#"),
"The multi-level wildcard character MUST be specified either on its own or following a topic level separator");
128 static_assert( !
validate_topic_filter(
"#/"),
"In either case the multi-level wildcard character MUST be the last character specified in the Topic Filter");
130 static_assert(
validate_topic_filter(
"+"),
"The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
131 static_assert(
validate_topic_filter(
"+/bob/alice/sue"),
"The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
132 static_assert(
validate_topic_filter(
"bob/alice/sue/+"),
"The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
133 static_assert(
validate_topic_filter(
"+/bob/alice/sue/+"),
"The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
134 static_assert(
validate_topic_filter(
"+/bob/+/sue/+"),
"The single-level wildcard can be used at any level in the Topic Filter, including first and last levels");
135 static_assert(
validate_topic_filter(
"+/bob/+/sue/#"),
"The single-level wildcard can be used at more than one level in the Topic Filter and can be used in conjunction with the multi-level wildcard");
136 static_assert( !
validate_topic_filter(
"+a"),
"Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
137 static_assert( !
validate_topic_filter(
"a+"),
"Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
138 static_assert( !
validate_topic_filter(
"/a+"),
"Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
139 static_assert( !
validate_topic_filter(
"a+/"),
"Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
140 static_assert( !
validate_topic_filter(
"/a+/"),
"Where it is used, the single-level wildcard MUST occupy an entire level of the filter.");
156 && (topic_name.size() <= std::numeric_limits<std::uint16_t>::max())
157 && (string_view::npos == topic_name.find_first_of(
string_view(
"\0+#", 3)));
160 #if defined(MQTT_STD_STRING_VIEW)
162 static_assert( !
validate_topic_name(
""),
"All Topic Names and Topic Filters MUST be at least one character long");
163 static_assert(
validate_topic_name(
"/"),
"A Topic Name or Topic Filter consisting only of the ‘/’ character is valid");
164 static_assert( !
validate_topic_name(
string_view(
"\0", 1)),
"Topic Names and Topic Filters MUST NOT include the null character (Unicode U+0000)");
165 static_assert(
validate_topic_name(
" "),
"Topic Names and Topic Filters can include the space character");
166 static_assert(
validate_topic_name(
"/////"),
"Topic level separators can appear anywhere in a Topic Filter or Topic Name. Adjacent Topic level separators indicate a zero-length topic level");
167 static_assert( !
validate_topic_name(
"#"),
"The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
168 static_assert( !
validate_topic_name(
"+"),
"The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
169 static_assert( !
validate_topic_name(
"/#"),
"The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
170 static_assert( !
validate_topic_name(
"+/#"),
"The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
171 static_assert( !
validate_topic_name(
"f#"),
"The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
172 static_assert( !
validate_topic_name(
"#/"),
"The wildcard characters can be used in Topic Filters, but MUST NOT be used within a Topic Name");
188 for (string_view::size_type idx = topic_filter.find_first_of(
"+#");
189 string_view::npos != idx;
190 idx = topic_filter.find_first_of(
"+#")) {
192 (
'+' == topic_filter[idx])
193 || (
'#' == topic_filter[idx])
196 if (
'+' == topic_filter[idx]) {
198 if (topic_filter.substr(0, idx) == topic_name.substr(0, idx)) {
209 topic_filter.remove_prefix(idx+1);
214 topic_name.remove_prefix(topic_name.find(
'/', idx));
226 return topic_filter.substr(0, idx) == topic_name.substr(0, idx);
231 return topic_filter == topic_name;
234 #if defined(MQTT_STD_STRING_VIEW)
235 static_assert(
compare_topic_filter(
"bob",
"bob"),
"Topic Names and Topic Filters are case sensitive");
236 static_assert( !
compare_topic_filter(
"Bob",
"bob"),
"Topic Names and Topic Filters are case sensitive");
237 static_assert( !
compare_topic_filter(
"bob",
"boB"),
"Topic Names and Topic Filters are case sensitive");
238 static_assert( !
compare_topic_filter(
"/bob",
"bob"),
"A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
239 static_assert( !
compare_topic_filter(
"bob/",
"bob"),
"A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
240 static_assert( !
compare_topic_filter(
"bob",
"/bob"),
"A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
241 static_assert( !
compare_topic_filter(
"bob",
"bob/"),
"A leading or trailing ‘/’ creates a distinct Topic Name or Topic Filter");
242 static_assert(
compare_topic_filter(
"bob/alice",
"bob/alice"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
243 static_assert(
compare_topic_filter(
"bob/alice/sue",
"bob/alice/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
244 static_assert(
compare_topic_filter(
"bob//////sue",
"bob//////sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
245 static_assert(
compare_topic_filter(
"bob/#",
"bob//////sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
246 static_assert( !
compare_topic_filter(
"bob///#",
"bob/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
247 static_assert(
compare_topic_filter(
"bob/+/sue",
"bob/alice/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
248 static_assert( !
compare_topic_filter(
"bob/+/sue",
"bob/alice/mary/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
249 static_assert(
compare_topic_filter(
"#",
"bob/alice/mary/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
250 static_assert(
compare_topic_filter(
"bob/#",
"bob/alice/mary/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
251 static_assert(
compare_topic_filter(
"bob/alice/#",
"bob/alice/mary/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
252 static_assert(
compare_topic_filter(
"bob/alice/mary/#",
"bob/alice/mary/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
253 static_assert( !
compare_topic_filter(
"bob/alice/mary/sue/#",
"bob/alice/mary/sue"),
"Each non-wildcarded level in the Topic Filter has to match the corresponding level in the Topic Name character for character for the match to succeed");
260 tim_disconnect_(ioc_)
308 ep.socket().lowest_layer().set_option(as::ip::tcp::no_delay(
true));
309 ep.set_auto_pub_response(
false);
310 ep.set_topic_alias_maximum(MQTT_NS::topic_alias_max);
314 ep.start_session(spep);
317 ep.set_close_handler(
324 ep.set_error_handler(
329 auto ver = sp->get_protocol_version();
332 <<
" error_handler is called. ec:" << ec.message() <<
" protocol_version:" << ver;
336 if (sp->connected()) {
338 [&] () -> MQTT_NS::optional<v5::disconnect_reason_code> {
339 if (ec == boost::system::errc::protocol_error) {
342 else if (ec == boost::system::errc::bad_message) {
345 return MQTT_NS::nullopt;
350 <<
"send DISCONNECT reason_code:" << rc.value();
351 sp->disconnect(rc.value());
354 else if (sp->underlying_connected()){
358 [&] () -> MQTT_NS::optional<v5::connect_reason_code> {
359 if (ec ==boost::system::errc::protocol_error) {
362 else if (ec == boost::system::errc::bad_message) {
365 return MQTT_NS::nullopt;
370 <<
"send CONNACK reason_code:" << rc.value();
371 sp->connack(
false, rc.value());
407 ep.set_connect_handler(
410 optional<buffer> username,
411 optional<buffer> password,
414 std::uint16_t keep_alive) {
417 return connect_handler(
429 ep.set_v5_connect_handler(
432 optional<buffer> username,
433 optional<buffer> password,
436 std::uint16_t keep_alive,
440 return connect_handler(
452 ep.set_disconnect_handler(
460 ep.set_v5_disconnect_handler(
463 if (h_disconnect_props_) h_disconnect_props_(
force_move(props));
469 ep.set_puback_handler(
474 return puback_handler(
477 v5::puback_reason_code::success,
482 ep.set_v5_puback_handler(
489 return puback_handler(
497 ep.set_pubrec_handler(
502 return pubrec_handler(
505 v5::pubrec_reason_code::success,
510 ep.set_v5_pubrec_handler(
517 return pubrec_handler(
525 ep.set_pubrel_handler(
530 return pubrel_handler(
533 v5::pubrel_reason_code::success,
538 ep.set_v5_pubrel_handler(
545 return pubrel_handler(
553 ep.set_pubcomp_handler(
558 return pubcomp_handler(
561 v5::pubcomp_reason_code::success,
566 ep.set_v5_pubcomp_handler(
573 return pubcomp_handler(
581 ep.set_publish_handler(
583 (optional<packet_id_t> packet_id,
584 publish_options pubopts,
589 return publish_handler(
599 ep.set_v5_publish_handler(
601 (optional<packet_id_t> packet_id,
602 publish_options pubopts,
607 if (h_publish_props_) h_publish_props_(props);
610 return publish_handler(
620 ep.set_subscribe_handler(
623 std::vector<subscribe_entry> entries) {
626 return subscribe_handler(
634 ep.set_v5_subscribe_handler(
637 std::vector<subscribe_entry> entries,
642 return subscribe_handler(
650 ep.set_unsubscribe_handler(
653 std::vector<unsubscribe_entry> entries) {
656 return unsubscribe_handler(
664 ep.set_v5_unsubscribe_handler(
667 std::vector<unsubscribe_entry> entries,
672 return unsubscribe_handler(
680 ep.set_pingreq_handler(
684 if (pingresp_) sp->pingresp();
688 ep.set_v5_auth_handler(
693 if (h_auth_props_) h_auth_props_(
force_move(props));
793 bool connect_handler(
805 optional<std::chrono::steady_clock::duration> session_expiry_interval;
806 optional<std::chrono::steady_clock::duration> will_expiry_interval;
808 if (ep.get_protocol_version() == protocol_version::v5) {
809 auto v = get_property<v5::property::session_expiry_interval>(props);
810 if (v && v.value().val() != 0) {
811 session_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
815 auto v = get_property<v5::property::message_expiry_interval>(will.value().props());
817 will_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
821 if (h_connect_props_) {
822 h_connect_props_(props);
829 switch (ep.get_protocol_version()) {
830 case protocol_version::v3_1_1:
832 ep.connack(
false, connect_return_code::identifier_rejected);
836 case protocol_version::v5:
838 ep.connack(
false, v5::connect_reason_code::client_identifier_not_valid);
848 [&](
bool session_present) {
850 switch (ep.get_protocol_version()) {
851 case protocol_version::v3_1_1:
854 connect_return_code::accepted
857 case protocol_version::v5:
858 if (connack_props_.empty()) {
861 v5::connect_reason_code::success,
863 v5::property::topic_alias_maximum{topic_alias_max}
870 v5::connect_reason_code::success,
891 auto it = idx.lower_bound(client_id);
892 if (it == idx.end() || it->client_id() != client_id) {
896 <<
"cid:" << client_id
897 <<
" new connection inserted.";
898 it = idx.emplace_hint(
912 else if (it->online()) {
914 if (close_proc(it->con(),
true)) {
920 <<
"cid:" << client_id
921 <<
"online connection exists, discard old one due to new one's clean_start and renew";
927 e.update_will(ioc_,
force_move(will), will_expiry_interval);
929 e.renew_session_expiry(
force_move(session_expiry_interval));
931 [](
auto&) { BOOST_ASSERT(
false); }
938 <<
"cid:" << client_id
939 <<
"online connection exists, inherit old one and renew";
945 e.update_will(ioc_,
force_move(will), will_expiry_interval);
947 e.renew_session_expiry(
force_move(session_expiry_interval));
948 e.send_inflight_messages();
949 e.send_all_offline_messages();
951 [](
auto&) { BOOST_ASSERT(
false); }
960 <<
"cid:" << client_id
961 <<
"online connection exists, discard old one due to session_expiry and renew";
963 std::tie(it, inserted) = idx.emplace(
973 BOOST_ASSERT(inserted);
983 <<
"cid:" << client_id
984 <<
"offline connection exists, discard old one due to new one's clean_start and renew";
991 e.update_will(ioc_,
force_move(will), will_expiry_interval);
993 e.renew_session_expiry(
force_move(session_expiry_interval));
995 [](
auto&) { BOOST_ASSERT(
false); }
1002 <<
"cid:" << client_id
1003 <<
"offline connection exists, inherit old one and renew";
1009 e.update_will(ioc_,
force_move(will), will_expiry_interval);
1011 e.renew_session_expiry(
force_move(session_expiry_interval));
1012 e.send_inflight_messages();
1013 e.send_all_offline_messages();
1015 [](
auto&) { BOOST_ASSERT(
false); }
1023 void disconnect_handler(
1026 if (delay_disconnect_) {
1027 tim_disconnect_.expires_after(delay_disconnect_.value());
1028 tim_disconnect_.wait();
1041 bool close_proc(
con_sp_t spep,
bool send_will) {
1045 auto it = idx.find(spep);
1051 if (it == idx.end())
return false;
1053 bool session_clear =
1055 if (ep.get_protocol_version() == protocol_version::v3_1_1) {
1056 return ep.clean_session();
1059 BOOST_ASSERT(ep.get_protocol_version() == protocol_version::v5);
1060 auto const& sei_opt = it->session_expiry_interval();
1061 return !sei_opt || sei_opt.value() == std::chrono::steady_clock::duration::zero();
1067 if (session.will()) {
1071 auto props =
force_move(session.will().value().props());
1073 if (session.get_tim_will_expiry()) {
1075 std::chrono::duration_cast<std::chrono::seconds>(
1076 session.get_tim_will_expiry()->expiry() - std::chrono::steady_clock::now()
1079 set_property<v5::property::message_expiry_interval>(
1081 v5::property::message_expiry_interval(
1082 static_cast<uint32_t
>(d)
1090 force_move(session.will().value().message()),
1091 session.will().value().get_qos() | session.will().value().get_retain(),
1096 session.reset_will();
1101 if (session_clear) {
1106 e.
con()->force_disconnect();
1108 [](
auto&) { BOOST_ASSERT(
false); }
1119 e.
con()->force_disconnect();
1122 (std::shared_ptr<as::steady_timer>
const& sp_tim) {
1127 [](
auto&) { BOOST_ASSERT(
false); }
1135 bool publish_handler(
1137 optional<packet_id_t> packet_id,
1138 publish_options pubopts,
1146 auto it = idx.find(spep);
1147 BOOST_ASSERT(it != idx.end());
1151 switch (pubopts.get_qos()) {
1152 case qos::at_least_once:
1153 ep.puback(packet_id.value(), v5::puback_reason_code::success, puback_props_);
1155 case qos::exactly_once: {
1162 ep.pubrec(packet_id.value(), v5::pubrec_reason_code::success, pubrec_props_);
1170 if (pubopts.get_qos() == qos::exactly_once &&
1171 it->exactly_once_processing(packet_id.value())) {
1174 <<
"receive already processed publish pid:" << packet_id.value();
1182 for (
auto&& p : props) {
1185 [](v5::property::topic_alias&&) {
1191 [&spep](v5::property::subscription_identifier&& p) {
1194 <<
"Subscription Identifier from client not forwarded sid:" << p.val();
1196 [&forward_props](
auto&& p) {
1208 pubopts.get_qos() | pubopts.get_retain(),
1212 switch (ep.get_protocol_version()) {
1213 case protocol_version::v3_1_1:
1214 switch (pubopts.get_qos()) {
1215 case qos::at_least_once:
1216 ep.puback(packet_id.value());
1218 case qos::exactly_once:
1225 case protocol_version::v5:
1226 switch (pubopts.get_qos()) {
1227 case qos::at_least_once:
1228 ep.puback(packet_id.value(), v5::puback_reason_code::success, puback_props_);
1230 case qos::exactly_once:
1238 BOOST_ASSERT(
false);
1245 bool puback_handler(
1251 auto it = idx.find(spep);
1252 BOOST_ASSERT(it != idx.end());
1263 bool pubrec_handler(
1269 auto it = idx.find(spep);
1270 BOOST_ASSERT(it != idx.end());
1280 switch (ep.get_protocol_version()) {
1281 case protocol_version::v3_1_1:
1282 ep.pubrel(packet_id);
1284 case protocol_version::v5:
1285 ep.pubrel(packet_id, v5::pubrel_reason_code::success, pubrel_props_);
1288 BOOST_ASSERT(
false);
1294 bool pubrel_handler(
1300 auto it = idx.find(spep);
1301 BOOST_ASSERT(it != idx.end());
1311 switch (ep.get_protocol_version()) {
1312 case protocol_version::v3_1_1:
1313 ep.pubcomp(packet_id);
1315 case protocol_version::v5:
1316 ep.pubcomp(packet_id, v5::pubcomp_reason_code::success, pubcomp_props_);
1319 BOOST_ASSERT(
false);
1325 bool pubcomp_handler(
1331 auto it = idx.find(spep);
1332 BOOST_ASSERT(it != idx.end());
1343 bool subscribe_handler(
1346 std::vector<subscribe_entry> entries,
1352 auto it = idx.find(spep);
1353 BOOST_ASSERT(it != idx.end());
1358 optional<session_state_ref> ssr_opt;
1364 [](
auto&) { BOOST_ASSERT(
false); }
1367 BOOST_ASSERT(ssr_opt);
1371 [
this, &ssr](
retain_t const& r,
qos qos_value, optional<std::size_t> sid) {
1372 auto props = r.
props;
1374 props.push_back(v5::property::subscription_identifier(*sid));
1378 std::chrono::duration_cast<std::chrono::seconds>(
1381 set_property<v5::property::message_expiry_interval>(
1383 v5::property::message_expiry_interval(
1384 static_cast<uint32_t
>(d)
1397 std::vector<std::function<void()>> retain_deliver;
1398 retain_deliver.reserve(entries.size());
1401 optional<std::size_t> sid;
1408 switch (ep.get_protocol_version()) {
1409 case protocol_version::v3_1_1: {
1410 std::vector<suback_return_code> res;
1411 res.reserve(entries.size());
1412 for (
auto& e : entries) {
1414 ssr.get().subscribe(
1421 [&](retain_t const& r) {
1422 retain_deliver.emplace_back(
1423 [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
1424 publish_proc(r, qos_value, sid);
1435 case protocol_version::v5: {
1437 auto v = get_property<v5::property::subscription_identifier>(props);
1438 if (v && v.value().val() != 0) {
1439 sid.emplace(v.value().val());
1442 std::vector<v5::suback_reason_code> res;
1443 res.reserve(entries.size());
1444 for (
auto& e : entries) {
1446 ssr.get().subscribe(
1453 [&](retain_t const& r) {
1454 retain_deliver.emplace_back(
1455 [&publish_proc, &r, qos_value = e.subopts.get_qos(), sid] {
1456 publish_proc(r, qos_value, sid);
1465 if (h_subscribe_props_) h_subscribe_props_(props);
1467 ep.suback(packet_id,
force_move(res), suback_props_);
1470 BOOST_ASSERT(
false);
1474 for (
auto const& f : retain_deliver) {
1480 bool unsubscribe_handler(
1483 std::vector<unsubscribe_entry> entries,
1488 auto& idx = sessions_.get<
tag_con>();
1489 auto it = idx.find(spep);
1490 BOOST_ASSERT(it != idx.end());
1496 optional<session_state_ref> ssr_opt;
1502 [](
auto&) { BOOST_ASSERT(
false); }
1505 BOOST_ASSERT(ssr_opt);
1511 for (
auto const& e : entries) {
1512 ssr.get().unsubscribe(e.share_name, e.topic_filter);
1515 switch (ep.get_protocol_version()) {
1516 case protocol_version::v3_1_1:
1517 ep.unsuback(packet_id);
1519 case protocol_version::v5:
1520 if (h_unsubscribe_props_) h_unsubscribe_props_(props);
1523 std::vector<v5::unsuback_reason_code>(
1525 v5::unsuback_reason_code::success
1531 BOOST_ASSERT(
false);
1552 publish_options pubopts,
1560 publish_options new_pubopts = std::min(pubopts.get_qos(), sub.subopts.get_qos());
1566 props.push_back(v5::property::subscription_identifier(sub.sid.value()));
1588 std::set<std::tuple<string_view, string_view>> sent;
1598 if (sub.subopts.get_nl() == nl::yes &&
1599 sub.ss.get().con().get() == &ep) return;
1600 deliver(sub.ss.get(), sub);
1605 std::tie(std::ignore, inserted) = sent.emplace(sub.share_name, sub.topic_filter);
1607 if (auto ssr_opt = shared_targets_.get_target(sub.share_name, sub.topic_filter)) {
1608 deliver(ssr_opt.value().get(), sub);
1616 if (ep.get_protocol_version() == protocol_version::v5) {
1617 auto v = get_property<v5::property::message_expiry_interval>(props);
1646 if (contents.empty()) {
1647 retains_.erase(topic);
1650 std::shared_ptr<as::steady_timer> tim_message_expiry;
1651 if (message_expiry_interval) {
1653 tim_message_expiry->async_wait(
1654 [
this, topic = topic, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]
1656 if (
auto sp = wp.lock()) {
1658 retains_.erase(topic);
1665 retains_.insert_or_assign(
1680 as::io_context& ioc_;
1681 as::steady_timer tim_disconnect_;
1682 optional<std::chrono::steady_clock::duration> delay_disconnect_;
1713 bool pingresp_ =
true;
MQTT_STRING_VIEW_CONSTEXPR bool validate_topic_filter(string_view topic_filter)
Definition: broker.hpp:48
MQTT_STRING_VIEW_CONSTEXPR bool compare_topic_filter(string_view topic_filter, string_view topic_name)
Definition: broker.hpp:176
#define MQTT_STRING_VIEW_CONSTEXPR
Definition: broker.hpp:37
MQTT_STRING_VIEW_CONSTEXPR bool validate_topic_name(string_view topic_name)
Definition: broker.hpp:144
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: broker.hpp:256
void set_unsuback_props(v5::properties props)
Definition: broker.hpp:707
void set_publish_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:735
void set_pubrec_props(v5::properties props)
Definition: broker.hpp:715
void set_pubrel_props(v5::properties props)
Definition: broker.hpp:719
void set_pingresp(bool b)
set pingresp send operaton
Definition: broker.hpp:283
void set_unsubscribe_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:759
void set_connack_props(v5::properties props)
Definition: broker.hpp:699
void set_disconnect_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:731
void set_suback_props(v5::properties props)
Definition: broker.hpp:703
void set_pubcomp_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:751
void set_pubrel_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:747
void clear_all_sessions()
Definition: broker.hpp:767
void set_connect_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:727
broker_t(as::io_context &ioc)
Definition: broker.hpp:258
void handle_accept(con_sp_t spep)
handle_accept
Definition: broker.hpp:304
void clear_all_retained_topics()
Definition: broker.hpp:771
void set_disconnect_delay(std::chrono::steady_clock::duration delay)
set_disconnect_delay adds a delay to disconnect operations.
Definition: broker.hpp:273
void set_puback_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:739
void set_puback_props(v5::properties props)
Definition: broker.hpp:711
void set_auth_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:763
void set_pubrec_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:743
void set_subscribe_props_handler(std::function< void(v5::properties const &)> h)
Definition: broker.hpp:755
void set_pubcomp_props(v5::properties props)
Definition: broker.hpp:723
void clear()
Definition: retained_topic_map.hpp:351
Definition: session_state.hpp:433
decltype(auto) get()
Definition: session_state.hpp:436
void clear()
Definition: session_state.hpp:445
Definition: shared_target.hpp:32
std::weak_ptr< endpoint_t > con_wp_t
Definition: common_type.hpp:19
server<>::endpoint_t endpoint_t
Definition: common_type.hpp:17
endpoint_t::packet_id_t packet_id_t
Definition: common_type.hpp:20
std::shared_ptr< endpoint_t > con_sp_t
Definition: common_type.hpp:18
#define MQTT_LOG(chan, sev)
Definition: log.hpp:135
#define MQTT_ADD_VALUE(name, val)
Definition: log.hpp:136
constexpr char const clean_start
Definition: connect_flags.hpp:19
constexpr char const clean_session
Definition: connect_flags.hpp:18
@ message_expiry_interval
auth_reason_code
Definition: reason_code.hpp:385
pubrel_reason_code
Definition: reason_code.hpp:341
puback_reason_code
Definition: reason_code.hpp:269
pubrec_reason_code
Definition: reason_code.hpp:305
constexpr suback_reason_code qos_to_suback_reason_code(qos q)
Definition: reason_code.hpp:233
pubcomp_reason_code
Definition: reason_code.hpp:363
std::vector< property_variant > properties
Definition: property_variant.hpp:51
disconnect_reason_code
Definition: reason_code.hpp:114
boost::string_ref string_view
Definition: string_view.hpp:64
constexpr decltype(auto) visit(Visitor &&vis, Variants &&... vars)
Definition: variant.hpp:60
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
constexpr suback_return_code qos_to_suback_return_code(qos q)
Definition: reason_code.hpp:44
retain
Definition: publish.hpp:42
lambda_visitor< Lambdas... > make_lambda_visitor(Lambdas &&... lambdas)
Definition: visitor_util.hpp:37
qos
Definition: subscribe_options.hpp:34
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
std::reference_wrapper< session_state > session_state_ref
Definition: session_state_fwd.hpp:20
Definition: retain_t.hpp:25
buffer contents
Definition: retain_t.hpp:40
qos qos_value
Definition: retain_t.hpp:42
buffer topic
Definition: retain_t.hpp:39
v5::properties props
Definition: retain_t.hpp:41
std::shared_ptr< as::steady_timer > tim_message_expiry
Definition: retain_t.hpp:43
Definition: session_state.hpp:53
void erase_inflight_message_by_packet_id(packet_id_t packet_id)
Definition: session_state.hpp:369
void exactly_once_start(packet_id_t packet_id)
Definition: session_state.hpp:242
void deliver(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: session_state.hpp:207
con_sp_t const & con() const
Definition: session_state.hpp:396
void become_offline(SessionExpireHandler &&h)
Definition: session_state.hpp:88
void exactly_once_finish(packet_id_t packet_id)
Definition: session_state.hpp:250
void send_offline_messages_by_packet_id_release()
Definition: session_state.hpp:379
Definition: subscription.hpp:20
buffer share_name
Definition: subscription.hpp:35