70class basic_endpoint :
public std::enable_shared_from_this<basic_endpoint<Role, PacketIdBytes, Strand, NextLayer>>{
72 enum class connection_status {
// Compile-time role test: bitwise-ANDs the integer value of `r` with the
// integer value of role::client; the int result is converted to bool on return.
80 static constexpr bool can_send_as_client(
role r) {
81 return static_cast<int>(
r) &
static_cast<int>(role::client);
// Compile-time role test: bitwise-ANDs the integer value of `r` with the
// integer value of role::server; the int result is converted to bool on return.
84 static constexpr bool can_send_as_server(
role r) {
85 return static_cast<int>(
r) &
static_cast<int>(role::server);
90 for (
auto const&
prop : props) {
106 using this_type_sp = std::shared_ptr<this_type>;
107 using this_type_wp = std::weak_ptr<this_type>;
114 template <
typename T>
115 friend class make_shared_helper;
// Static factory returning std::shared_ptr<this_type>. Delegates to
// make_shared_helper<this_type>::make_shared, passing `ver` followed by the
// perfectly-forwarded `args`. The parameter list (listing lines 143-145) is
// not present in this extract.
141 template <
typename...
Args>
142 static std::shared_ptr<this_type>
create(
146 return make_shared_helper<this_type>::make_shared(
ver, std::forward<Args>(
args)...);
150 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
151 << ASYNC_MQTT_ADD_VALUE(
address,
this)
155 basic_endpoint(this_type
const&) =
delete;
156 basic_endpoint(this_type&&) =
delete;
157 this_type& operator=(this_type
const&) =
delete;
158 this_type& operator=(this_type&&) =
delete;
165 return stream_->strand();
173 return stream_->strand();
181 return stream_->in_strand();
189 return stream_->next_layer();
196 return stream_->next_layer();
204 return stream_->lowest_layer();
211 return stream_->lowest_layer();
// Setter body (signature line not present in this extract): logs the new
// value on the "mqtt_api" channel at info severity, tagged with this object's
// address, then stores `val` in auto_pub_response_.
220 ASYNC_MQTT_LOG(
"mqtt_api", info)
221 << ASYNC_MQTT_ADD_VALUE(
address,
this)
222 <<
"set_auto_pub_response val:" << val;
223 auto_pub_response_ = val;
// Setter body (signature line not present in this extract): logs the new
// value on the "mqtt_api" channel at info severity, then stores `val` in
// auto_ping_response_.
232 ASYNC_MQTT_LOG(
"mqtt_api", info)
233 << ASYNC_MQTT_ADD_VALUE(
address,
this)
234 <<
"set_auto_ping_response val:" << val;
235 auto_ping_response_ = val;
// Setter body (signature line not present in this extract): logs the new
// value on the "mqtt_api" channel at info severity, then stores `val` in
// auto_map_topic_alias_send_.
246 ASYNC_MQTT_LOG(
"mqtt_api", info)
247 << ASYNC_MQTT_ADD_VALUE(
address,
this)
248 <<
"set_auto_map_topic_alias_send val:" << val;
249 auto_map_topic_alias_send_ = val;
// Setter body (signature line not present in this extract): logs the new
// value on the "mqtt_api" channel at info severity, then stores `val` in
// auto_replace_topic_alias_send_.
260 ASYNC_MQTT_LOG(
"mqtt_api", info)
261 << ASYNC_MQTT_ADD_VALUE(
address,
this)
262 <<
"set_auto_replace_topic_alias_send val:" << val;
263 auto_replace_topic_alias_send_ = val;
277 pingresp_recv_timeout_ms_ = nullopt;
280 pingresp_recv_timeout_ms_.emplace(
ms);
294 stream_->set_bulk_write(val);
// Completion-token API fragment (signature and initiation call are not
// present in this extract): logs the call on the "mqtt_api" channel and, at
// listing line 318, begins brace-constructing an acquire_unique_packet_id_impl
// operation object.
305 template <
typename CompletionToken>
310 ASYNC_MQTT_LOG(
"mqtt_api", info)
311 << ASYNC_MQTT_ADD_VALUE(
address,
this)
312 <<
"acquire_unique_packet_id";
318 acquire_unique_packet_id_impl{
// Completion-token API fragment (signature and initiation call are not
// present in this extract): logs the call and, at listing line 344, begins
// brace-constructing an acquire_unique_packet_id_wait_until_impl operation.
331 template <
typename CompletionToken>
336 ASYNC_MQTT_LOG(
"mqtt_api", info)
337 << ASYNC_MQTT_ADD_VALUE(
address,
this)
338 <<
"acquire_unique_packet_id_wait_until";
344 acquire_unique_packet_id_wait_until_impl{
// Completion-token API fragment (signature and initiation call are not
// present in this extract): logs `packet_id` and, at listing line 371, begins
// brace-constructing a register_packet_id_impl operation.
357 template <
typename CompletionToken>
363 ASYNC_MQTT_LOG(
"mqtt_api", info)
364 << ASYNC_MQTT_ADD_VALUE(
address,
this)
365 <<
"register_packet_id pid:" << packet_id;
371 register_packet_id_impl{
// Completion-token API fragment (signature and initiation call are not
// present in this extract): logs `packet_id` and, at listing line 399, begins
// brace-constructing a release_packet_id_impl operation.
385 template <
typename CompletionToken>
391 ASYNC_MQTT_LOG(
"mqtt_api", info)
392 << ASYNC_MQTT_ADD_VALUE(
address,
this)
393 <<
"release_packet_id pid:" << packet_id;
399 release_packet_id_impl{
// Completion-token send API fragment (signature not present in this extract).
// Logs the packet on the "mqtt_api" channel. For any Packet type other than
// basic_packet_variant<PacketIdBytes> a compile-time branch is entered; the
// string at listing line 427 belongs to that branch (presumably a
// static_assert message - the enclosing statement is not visible). The
// completion token is perfectly forwarded at listing line 435.
414 template <
typename Packet,
typename CompletionToken>
420 ASYNC_MQTT_LOG(
"mqtt_api", info)
421 << ASYNC_MQTT_ADD_VALUE(
address,
this)
422 <<
"send:" << packet;
423 if constexpr(!std::is_same_v<Packet, basic_packet_variant<PacketIdBytes>>) {
427 "Packet cannot be send by MQTT protocol"
435 std::forward<CompletionToken>(
token)
// Three completion-token API fragments whose signatures are not present in
// this extract (presumably the recv overloads - confirm against the full
// source). Each one logs the call on the "mqtt_api" channel and then sets
// recv_processing_ to true.
//
// First fragment: no additional parameters are visible.
445 template <
typename CompletionToken>
450 ASYNC_MQTT_LOG(
"mqtt_api", info)
451 << ASYNC_MQTT_ADD_VALUE(
address,
this)
454 recv_processing_ =
true;
// Second fragment: takes a by-value std::set<control_packet_type> `types`.
476 template <
typename CompletionToken>
479 std::set<control_packet_type> types,
482 ASYNC_MQTT_LOG(
"mqtt_api", info)
483 << ASYNC_MQTT_ADD_VALUE(
address,
this)
486 recv_processing_ =
true;
// Third fragment: also takes `types`; any preceding parameter (listing lines
// 512-514) is not visible.
511 template <
typename CompletionToken>
515 std::set<control_packet_type> types,
518 ASYNC_MQTT_LOG(
"mqtt_api", info)
519 << ASYNC_MQTT_ADD_VALUE(
address,
this)
522 recv_processing_ =
true;
542 template<
typename CompletionToken>
545 ASYNC_MQTT_LOG(
"mqtt_api", info)
546 << ASYNC_MQTT_ADD_VALUE(
address,
this)
// Completion-token API fragment (signature and initiation call are not
// present in this extract): logs the call and, at listing line 581, begins
// brace-constructing a restore_packets_impl operation.
567 template <
typename CompletionToken>
573 ASYNC_MQTT_LOG(
"mqtt_api", info)
574 << ASYNC_MQTT_ADD_VALUE(
address,
this)
575 <<
"restore_packets";
581 restore_packets_impl{
// Completion-token API fragment (signature and initiation call are not
// present in this extract): logs the call and, at listing line 611, begins
// brace-constructing a get_stored_packets_impl operation.
598 template <
typename CompletionToken>
603 ASYNC_MQTT_LOG(
"mqtt_api", info)
604 << ASYNC_MQTT_ADD_VALUE(
address,
this)
605 <<
"get_stored_packets";
611 get_stored_packets_impl{
// Completion-token API fragment (function name line not present in this
// extract): takes a v5 publish packet by value, logs it, and begins
// brace-constructing a regulate_for_store_impl operation. Listing line 630
// shows the completion signature: the handler receives a
// v5::basic_publish_packet<PacketIdBytes>.
618 template <
typename CompletionToken>
621 v5::basic_publish_packet<PacketIdBytes> packet,
624 ASYNC_MQTT_LOG(
"mqtt_api", info)
625 << ASYNC_MQTT_ADD_VALUE(
address,
this)
626 <<
"regulate_for_store:" << packet;
630 void(v5::basic_publish_packet<PacketIdBytes>)
632 regulate_for_store_impl{
// Synchronous variant (signature not present in this extract): asks pid_man_
// for a unique id. Two log statements are visible: one streams the
// dereferenced id, the other reports "full". The condition selecting between
// them and the return statement are not visible (presumably the first runs
// when `pid` holds a value - confirm).
649 auto pid = pid_man_.acquire_unique_id();
651 ASYNC_MQTT_LOG(
"mqtt_api", info)
652 << ASYNC_MQTT_ADD_VALUE(
address,
this)
653 <<
"acquire_unique_packet_id:" << *
pid;
656 ASYNC_MQTT_LOG(
"mqtt_api", info)
657 << ASYNC_MQTT_ADD_VALUE(
address,
this)
658 <<
"acquire_unique_packet_id:full";
// Synchronous variant (signature and return statement not present in this
// extract): registers `pid` with pid_man_ and logs both the id and the
// result of the registration.
671 auto ret = pid_man_.register_id(
pid);
672 ASYNC_MQTT_LOG(
"mqtt_api", info)
673 << ASYNC_MQTT_ADD_VALUE(
address,
this)
674 <<
"register_packet_id:" <<
pid <<
" result:" <<
ret;
// Synchronous variant (signature not present in this extract): only the log
// statement is visible here; the statement that performs the release is on a
// line that is not part of this extract.
685 ASYNC_MQTT_LOG(
"mqtt_api", info)
686 << ASYNC_MQTT_ADD_VALUE(
address,
this)
687 <<
"release_packet_id:" <<
pid;
// Getter body (signature not present in this extract): logs the call and
// returns the qos2_publish_handled_ member.
699 ASYNC_MQTT_LOG(
"mqtt_api", info)
700 << ASYNC_MQTT_ADD_VALUE(
address,
this)
701 <<
"get_qos2_publish_handled_pids";
702 return qos2_publish_handled_;
// Setter body (signature not present in this extract): logs the call and
// replaces qos2_publish_handled_ with `pids`, passed through force_move.
713 ASYNC_MQTT_LOG(
"mqtt_api", info)
714 << ASYNC_MQTT_ADD_VALUE(
address,
this)
715 <<
"restore_qos2_publish_handled_pids";
716 qos2_publish_handled_ = force_move(
pids);
// Synchronous variant (signature not present in this extract): iterates over
// `pvs`. For a packet `p` (obtained from each `pv` on lines that are not
// visible) whose packet id can be registered with pid_man_, the packet is
// moved into store_. The error log below reports an id that is already in
// use and says the packet is skipped (its enclosing else-branch is not
// visible).
729 ASYNC_MQTT_LOG(
"mqtt_api", info)
730 << ASYNC_MQTT_ADD_VALUE(
address,
this)
731 <<
"restore_packets";
732 for (
auto&
pv : pvs) {
735 if (pid_man_.register_id(p.packet_id())) {
736 store_.add(force_move(p));
739 ASYNC_MQTT_LOG(
"mqtt_impl", error)
740 << ASYNC_MQTT_ADD_VALUE(
address,
this)
741 <<
"packet_id:" << p.packet_id()
742 <<
" has already been used. Skip it";
// Synchronous variant (signature not present in this extract): logs the call
// and returns whatever store_.get_stored() yields.
760 ASYNC_MQTT_LOG(
"mqtt_api", info)
761 << ASYNC_MQTT_ADD_VALUE(
address,
this)
762 <<
"get_stored_packets";
763 return store_.get_stored();
// Getter body (signature not present in this extract): logs and returns
// protocol_version_.
773 ASYNC_MQTT_LOG(
"mqtt_api", info)
774 << ASYNC_MQTT_ADD_VALUE(
address,
this)
775 <<
"get_protocol_version:" << protocol_version_;
776 return protocol_version_;
// Query body (signature not present in this extract): logs `pid` and returns
// true when `pid` is found in qos2_publish_processing_.
787 ASYNC_MQTT_LOG(
"mqtt_api", info)
788 << ASYNC_MQTT_ADD_VALUE(
address,
this)
789 <<
"is_publish_processing:" <<
pid;
790 return qos2_publish_processing_.find(
pid) != qos2_publish_processing_.end();
// Synchronous variant (signature not present in this extract). Only acts when
// the packet's topic is empty: if the packet carries a topic alias property,
// the alias is looked up in topic_alias_send_ via find_without_touch and,
// when a non-empty topic is found, the packet's alias is replaced by that
// topic. Listing line 814 removes the topic alias without adding a topic; the
// branch that contains it is not visible in this extract.
802 ASYNC_MQTT_LOG(
"mqtt_api", info)
803 << ASYNC_MQTT_ADD_VALUE(
address,
this)
804 <<
"regulate_for_store:" << packet;
805 if (packet.topic().empty()) {
806 if (
auto ta_opt = get_topic_alias(packet.props())) {
807 auto topic = topic_alias_send_->find_without_touch(*
ta_opt);
808 if (!topic.empty()) {
809 packet.remove_topic_alias_add_topic(allocate_buffer(topic));
814 packet.remove_topic_alias();
// Test hook: cancels the three ping-related timers (pingreq send, pingreq
// receive, pingresp receive). Listing line 819 is not present in this extract.
818 void cancel_all_timers_for_test() {
820 tim_pingreq_send_->cancel();
821 tim_pingreq_recv_->cancel();
822 tim_pingresp_recv_->cancel();
// Test hook: overwrites pingreq_send_interval_ms_ with `ms`. Listing line 826
// is not present in this extract.
825 void set_pingreq_send_interval_ms_for_test(std::size_t ms) {
827 pingreq_send_interval_ms_ = ms;
// Constructor fragment (the name line is not present in this extract):
// initialises protocol_version_ from `ver` and stream_ from
// stream_type::create with the forwarded `args`. Listing line 849 is one term
// of a condition relating Role and `ver`; the statement it belongs to is not
// visible (presumably an assertion on valid role/version combinations -
// confirm against the full source).
841 template <
typename... Args>
843 protocol_version ver,
845 ): protocol_version_{ver},
846 stream_{stream_type::
create(std::forward<Args>(args)...)}
849 (Role == role::client && ver != protocol_version::undetermined) ||
// Operation object behind the completion-token acquire_unique_packet_id API.
// `state` walks dispatch -> acquire -> complete (the switch itself is not
// visible in this extract). Visible steps: work is bound to the stream's
// raw_strand(); in-strand execution is asserted before asking pid_man_ for an
// id; the operation is re-dispatched via bind_dispatch; finally the optional
// id (nullopt if none was obtained) is passed to self.complete.
855 struct acquire_unique_packet_id_impl {
857 optional<packet_id_t> pid_opt = nullopt;
858 enum { dispatch, acquire, complete } state = dispatch;
860 template <
typename Self>
870 a_ep.stream_->raw_strand(),
876 BOOST_ASSERT(ep.in_strand());
877 pid_opt = ep.pid_man_.acquire_unique_id();
879 bind_dispatch(force_move(self));
882 self.complete(pid_opt);
// Operation object behind acquire_unique_packet_id_wait_until. Unlike the
// non-waiting variant it completes with a dereferenced (non-optional) id, so
// an id must have been obtained by the time the complete state runs (asserted
// at listing line 951).
888 struct acquire_unique_packet_id_wait_until_impl {
// Weak reference to the endpoint; the call operator returns immediately when
// it has expired (listing line 899).
890 this_type_wp retry_wp = ep.weak_from_this();
891 optional<packet_id_t> pid_opt = nullopt;
892 enum { dispatch, acquire, complete } state = dispatch;
894 template <
typename Self>
897 error_code
const& ec = error_code{}
899 if (retry_wp.expired())
return;
906 a_ep.stream_->raw_strand(),
912 BOOST_ASSERT(ep.in_strand());
915 pid_opt = ep.pid_man_.acquire_unique_id();
918 bind_dispatch(force_move(self));
921 ASYNC_MQTT_LOG(
"mqtt_impl", warning)
922 << ASYNC_MQTT_ADD_VALUE(address, &ep)
923 <<
"packet_id is fully allocated. waiting release";
// When the wait is cancelled, one pending retry is completed via
// ep.complete_retry_one().
932 if (ec == errc::operation_canceled) {
933 ep.complete_retry_one();
// Otherwise, if other waiters already exist, the log below says this request
// is appended to the end of the waiter queue (the queuing code itself is not
// visible in this extract).
936 else if (ep.has_retry()) {
937 ASYNC_MQTT_LOG(
"mqtt_impl", warning)
938 << ASYNC_MQTT_ADD_VALUE(address, &ep)
939 <<
"packet_id waiter exists. add the end of waiter queue";
951 BOOST_ASSERT(pid_opt);
952 self.complete(*pid_opt);
// Operation object behind the completion-token register_packet_id API.
// `state` walks dispatch -> regi -> complete. Visible steps: work is bound to
// the stream's raw_strand(); in-strand execution is asserted before
// registering `packet_id` with pid_man_; the operation is re-dispatched; the
// registration result is passed to self.complete. The `packet_id` and
// `result` members are declared on lines not present in this extract.
958 struct register_packet_id_impl {
962 enum { dispatch, regi, complete } state = dispatch;
964 template <
typename Self>
974 a_ep.stream_->raw_strand(),
980 BOOST_ASSERT(ep.in_strand());
981 result = ep.pid_man_.register_id(packet_id);
983 bind_dispatch(force_move(self));
986 self.complete(result);
// Operation object behind the completion-token release_packet_id API.
// `state` walks dispatch -> rel -> complete. Visible steps: work is bound to
// the stream's raw_strand(); in-strand execution is asserted before calling
// ep.release_pid(packet_id); the operation is then re-dispatched. The
// completion call is on a line not present in this extract.
992 struct release_packet_id_impl {
995 enum { dispatch, rel, complete } state = dispatch;
997 template <
typename Self>
1007 a_ep.stream_->raw_strand(),
1013 BOOST_ASSERT(ep.in_strand());
1014 ep.release_pid(packet_id);
1016 bind_dispatch(force_move(self));
// Send operation object, templated on the packet type being sent (the struct
// header line is not present in this extract). `state` walks
// dispatch -> write -> bind -> complete and the operation completes with
// `last_ec`.
1026 template <
typename Packet>
1030 bool from_queue =
false;
1031 error_code last_ec = error_code{};
1032 enum { dispatch, write, bind, complete } state = dispatch;
1034 template <
typename Self>
1037 error_code
const& ec = error_code{},
// Error path: the failure is logged at info severity and the operation is
// re-dispatched (how `ec` reaches last_ec is on lines not visible here).
1041 ASYNC_MQTT_LOG(
"mqtt_impl", info)
1042 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1043 <<
"send error:" << ec.message();
1046 bind_dispatch(force_move(self));
1056 a_ep.stream_->raw_strand(),
1062 BOOST_ASSERT(ep.in_strand());
// Variant packets (packet variant or store-packet variant) are visited so
// that process_send_packet sees the concrete packet type; the packet is
// written to the stream only when process_send_packet returns true.
1065 std::is_same_v<std::decay_t<Packet>, basic_packet_variant<PacketIdBytes>> ||
1066 std::is_same_v<std::decay_t<Packet>, basic_store_packet_variant<PacketIdBytes>>
1070 [&](
auto& actual_packet) {
1071 if (process_send_packet(self, actual_packet)) {
1073 a_ep.stream_->write_packet(
1074 force_move(actual_packet),
1077 if constexpr(is_connack<std::remove_reference_t<
decltype(actual_packet)>>()) {
// For client roles the pingreq send timer is reset (enclosing condition only
// partly visible).
1081 if constexpr(Role == role::client) {
1082 a_ep.reset_pingreq_send_timer();
// A system_error alternative inside the variant is ignored.
1086 [&](system_error&) {}
// Non-variant packets take the same path without visitation.
1091 if (process_send_packet(self, packet)) {
1093 auto& a_packet{packet};
1094 a_ep.stream_->write_packet(
1095 force_move(a_packet),
1098 if constexpr(is_connack<Packet>()) {
1102 if constexpr(Role == role::client) {
1103 a_ep.reset_pingreq_send_timer();
1109 BOOST_ASSERT(ep.in_strand());
1112 bind_dispatch(force_move(self));
1116 self.complete(last_ec);
// Validates a packet that is about to be sent and updates endpoint state to
// reflect the send. The caller writes the packet to the stream only when this
// returns true.
//
// `self`          - the composed-operation handle; passed on to the
//                   validate_* helpers.
// `actual_packet` - the concrete packet (never a variant: variant callers
//                   visit first). May be modified, e.g. topic-alias mapping.
//
// Visible phases: (1) role/sendability check, (2) connection-status and
// protocol-version check per packet kind, (3) per-packet-kind state updates
// (connect/connack properties, publish storing and topic-alias handling,
// packet-id bookkeeping for acks), (4) maximum-packet-size check.
// Many lines (error completions, returns, closing braces) are not present in
// this extract.
1121 template <
typename Self,
typename ActualPacket>
1122 bool process_send_packet(Self& self, ActualPacket& actual_packet) {
// Phase 1: the packet type must be sendable by at least one role this
// endpoint can act as.
1126 (can_send_as_client(Role) && is_client_sendable<std::decay_t<ActualPacket>>()) ||
1127 (can_send_as_server(Role) && is_server_sendable<std::decay_t<ActualPacket>>())
1132 errc::protocol_error,
1133 "packet cannot be send by MQTT protocol"
// Helper used below: the packet's protocol version must match the endpoint's.
1139 auto version_check =
1141 if (ep.protocol_version_ == protocol_version::v3_1_1 && is_v3_1_1<ActualPacket>()) {
1144 if (ep.protocol_version_ == protocol_version::v5 && is_v5<ActualPacket>()) {
// Phase 2: connection-status rules per packet kind.
1151 if constexpr(is_connect<ActualPacket>()) {
1152 if (ep.status_ != connection_status::closed) {
1155 errc::protocol_error,
1156 "connect_packet can only be send on connection_status::closed"
1161 if (!version_check()) {
1164 errc::protocol_error,
1165 "protocol version mismatch"
1171 else if constexpr(is_connack<ActualPacket>()) {
1172 if (ep.status_ != connection_status::connecting) {
1175 errc::protocol_error,
1176 "connack_packet can only be send on connection_status::connecting"
1181 if (!version_check()) {
1184 errc::protocol_error,
1185 "protocol version mismatch"
// The auth test must look at the concrete packet type (ActualPacket), not the
// enclosing operation's Packet: when the operation was started with a packet
// variant, Packet is the variant type and would never equal v5::auth_packet,
// sending auth packets into the generic "must be connected" branch below.
1191 else if constexpr(std::is_same_v<v5::auth_packet, std::decay_t<ActualPacket>>) {
1192 if (ep.status_ != connection_status::connected &&
1193 ep.status_ != connection_status::connecting) {
1196 errc::protocol_error,
1197 "auth packet can only be send on connection_status::connecting or status::connected"
1202 if (!version_check()) {
1205 errc::protocol_error,
1206 "protocol version mismatch"
// All other packets: a non-publish packet is rejected unless connected.
// Publish packets are exempt from that rejection here (see the
// "stored but not sent" handling at the end).
1213 if (ep.status_ != connection_status::connected) {
1214 if constexpr(!is_publish<std::decay_t<ActualPacket>>()) {
1217 errc::protocol_error,
1218 "packet can only be send on connection_status::connected"
1224 if (!version_check()) {
1227 errc::protocol_error,
1228 "protocol version mismatch"
// Phase 3: state updates. Set when the storing path below has already
// validated the topic alias, so the later alias handling can skip it.
1236 bool topic_alias_validated =
false;
// Sending v3.1.1 CONNECT: enter connecting state; derive the pingreq send
// interval from keep_alive (seconds -> ms) unless one was already configured;
// choose need_store_ from clean_session; topic aliases do not apply.
1238 if constexpr(std::is_same_v<v3_1_1::connect_packet, std::decay_t<ActualPacket>>) {
1240 ep.status_ = connection_status::connecting;
1241 auto keep_alive = actual_packet.keep_alive();
1242 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1243 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1245 if (actual_packet.clean_session()) {
1248 ep.need_store_ =
false;
1251 ep.need_store_ =
true;
1253 ep.topic_alias_send_ = nullopt;
// Sending v5 CONNECT: same keep_alive handling; the properties we send
// describe what we accept, so they configure the receive-side limits.
1256 if constexpr(std::is_same_v<v5::connect_packet, std::decay_t<ActualPacket>>) {
1258 ep.status_ = connection_status::connecting;
1259 auto keep_alive = actual_packet.keep_alive();
1260 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1261 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1263 if (actual_packet.clean_start()) {
1267 for (
auto const& prop : actual_packet.props()) {
1270 [&](property::topic_alias_maximum
const& p) {
1272 ep.topic_alias_recv_.emplace(p.val());
1275 [&](property::receive_maximum
const& p) {
1276 BOOST_ASSERT(p.val() != 0);
1277 ep.publish_recv_max_ = p.val();
1279 [&](property::maximum_packet_size
const& p) {
1280 BOOST_ASSERT(p.val() != 0);
1281 ep.maximum_packet_size_recv_ = p.val();
1283 [&](property::session_expiry_interval
const& p) {
1285 ep.need_store_ =
true;
// Sending CONNACK: accepted/success moves to connected, anything else to
// disconnecting. For v5 success the sent properties again configure the
// receive-side limits.
1294 if constexpr(std::is_same_v<v3_1_1::connack_packet, std::decay_t<ActualPacket>>) {
1295 if (actual_packet.code() == connect_return_code::accepted) {
1296 ep.status_ = connection_status::connected;
1299 ep.status_ = connection_status::disconnecting;
1303 if constexpr(std::is_same_v<v5::connack_packet, std::decay_t<ActualPacket>>) {
1304 if (actual_packet.code() == connect_reason_code::success) {
1305 ep.status_ = connection_status::connected;
1306 for (
auto const& prop : actual_packet.props()) {
1309 [&](property::topic_alias_maximum
const& p) {
1311 ep.topic_alias_recv_.emplace(p.val());
1314 [&](property::receive_maximum
const& p) {
1315 BOOST_ASSERT(p.val() != 0);
1316 ep.publish_recv_max_ = p.val();
1318 [&](property::maximum_packet_size
const& p) {
1319 BOOST_ASSERT(p.val() != 0);
1320 ep.maximum_packet_size_recv_ = p.val();
1328 ep.status_ = connection_status::disconnecting;
// Sending PUBLISH with QoS 1 or 2: the packet id must already be allocated.
// When storing is enabled a copy with the dup flag set is added to store_;
// for v5 the stored copy is rebuilt without the topic_alias property (and
// with the resolved topic when the original topic was empty). On validation
// failure the packet id is released.
1333 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1334 if (actual_packet.opts().get_qos() == qos::at_least_once ||
1335 actual_packet.opts().get_qos() == qos::exactly_once
1337 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1338 if (ep.need_store_) {
1339 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1340 auto ta_opt = get_topic_alias(actual_packet.props());
1341 if (actual_packet.topic().empty()) {
1342 auto topic_opt = validate_topic_alias(self, ta_opt);
1344 auto packet_id = actual_packet.packet_id();
1345 if (packet_id != 0) {
1346 ep.release_pid(packet_id);
1350 topic_alias_validated =
true;
1351 auto props = actual_packet.props();
1352 auto it = props.cbegin();
1353 auto end = props.cend();
1354 for (; it != end; ++it) {
1355 if (it->id() == property::id::topic_alias) {
1363 actual_packet.packet_id(),
1364 allocate_buffer(*topic_opt),
1365 actual_packet.payload(),
1366 actual_packet.opts(),
1369 if (!validate_maximum_packet_size(self, store_packet)) {
1370 auto packet_id = actual_packet.packet_id();
1371 if (packet_id != 0) {
1372 ep.release_pid(packet_id);
1378 store_packet.set_dup(
true);
1379 ep.store_.add(force_move(store_packet));
1382 auto props = actual_packet.props();
1383 auto it = props.cbegin();
1384 auto end = props.cend();
1385 for (; it != end; ++it) {
1386 if (it->id() == property::id::topic_alias) {
1394 actual_packet.packet_id(),
1395 actual_packet.topic(),
1396 actual_packet.payload(),
1397 actual_packet.opts(),
1400 if (!validate_maximum_packet_size(self, store_packet)) {
1401 auto packet_id = actual_packet.packet_id();
1402 if (packet_id != 0) {
1403 ep.release_pid(packet_id);
1407 store_packet.set_dup(
true);
1408 ep.store_.add(force_move(store_packet));
1412 if (!validate_maximum_packet_size(self, actual_packet)) {
1413 auto packet_id = actual_packet.packet_id();
1414 if (packet_id != 0) {
1415 ep.release_pid(packet_id);
1419 auto store_packet{actual_packet};
1420 store_packet.set_dup(
true);
1421 ep.store_.add(force_move(store_packet));
// Remember which acknowledgement is expected for this packet id.
1424 if (actual_packet.opts().get_qos() == qos::exactly_once) {
1425 ep.qos2_publish_processing_.insert(actual_packet.packet_id());
1426 ep.pid_pubrec_.insert(actual_packet.packet_id());
1429 ep.pid_puback_.insert(actual_packet.packet_id());
// v5 PUBLISH topic-alias handling for the packet that is actually sent:
// validate an alias-only packet, register topic+alias pairs, or (when the
// auto_map / auto_replace options are on) substitute or add an alias.
1434 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1436 auto ta_opt = get_topic_alias(actual_packet.props());
1437 if (actual_packet.topic().empty()) {
1438 if (!topic_alias_validated &&
1439 !validate_topic_alias(self, ta_opt)) {
1440 auto packet_id = actual_packet.packet_id();
1441 if (packet_id != 0) {
1442 ep.release_pid(packet_id);
1450 if (validate_topic_alias_range(self, *ta_opt)) {
1451 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1452 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1454 << actual_packet.topic() <<
" - " << *ta_opt
1455 <<
" is registered." ;
1456 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), *ta_opt);
1459 auto packet_id = actual_packet.packet_id();
1460 if (packet_id != 0) {
1461 ep.release_pid(packet_id);
1466 else if (ep.auto_map_topic_alias_send_) {
1467 if (ep.topic_alias_send_) {
1468 if (
auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1469 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1470 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1471 <<
"topia alias : " << actual_packet.topic() <<
" - " << *ta_opt
1473 actual_packet.remove_topic_add_topic_alias(*ta_opt);
// No alias known for this topic yet: take the least-recently-used alias,
// (re)bind it to this topic, and send topic + alias together.
1476 auto lru_ta = ep.topic_alias_send_->get_lru_alias();
1477 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), lru_ta);
1478 actual_packet.add_topic_alias(lru_ta);
1482 else if (ep.auto_replace_topic_alias_send_) {
1483 if (ep.topic_alias_send_) {
1484 if (
auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1485 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1486 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1487 <<
"topia alias : " << actual_packet.topic() <<
" - " << *ta_opt
1489 actual_packet.remove_topic_add_topic_alias(*ta_opt);
// Packets not already coming from the queue may be queued instead of sent
// when ep.enqueue_publish says so (message: receive_maximum reached).
1496 if (!from_queue && ep.enqueue_publish(actual_packet)) {
1500 "publish_packet is enqueued due to receive_maximum for sending"
// Sending acknowledgements: drop the id from the received-publish set where
// the exchange ends (v5 puback, v5 pubrec with an error code, v5 pubcomp).
1507 if constexpr(is_instance_of<v5::basic_puback_packet, std::decay_t<ActualPacket>>::value) {
1508 ep.publish_recv_.erase(actual_packet.packet_id());
1512 if constexpr(is_instance_of<v5::basic_pubrec_packet, std::decay_t<ActualPacket>>::value) {
1513 if (is_error(actual_packet.code())) {
1514 ep.publish_recv_.erase(actual_packet.packet_id());
1515 ep.qos2_publish_handled_.erase(actual_packet.packet_id());
// Sending PUBREL: stored when storing is enabled; a PUBCOMP is now expected.
1519 if constexpr(is_pubrel<std::decay_t<ActualPacket>>()) {
1520 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1521 if (ep.need_store_) ep.store_.add(actual_packet);
1522 ep.pid_pubcomp_.insert(actual_packet.packet_id());
1525 if constexpr(is_instance_of<v5::basic_pubcomp_packet, std::decay_t<ActualPacket>>::value) {
1526 ep.publish_recv_.erase(actual_packet.packet_id());
// Sending SUBSCRIBE / UNSUBSCRIBE: record the id awaiting SUBACK / UNSUBACK.
1529 if constexpr(is_subscribe<std::decay_t<ActualPacket>>()) {
1530 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1531 ep.pid_suback_.insert(actual_packet.packet_id());
1534 if constexpr(is_unsubscribe<std::decay_t<ActualPacket>>()) {
1535 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1536 ep.pid_unsuback_.insert(actual_packet.packet_id());
// Sending PINGREQ restarts the pingresp receive timer; sending DISCONNECT
// moves the endpoint to disconnecting.
1539 if constexpr(is_pingreq<std::decay_t<ActualPacket>>()) {
1540 ep.reset_pingresp_recv_timer();
1543 if constexpr(is_disconnect<std::decay_t<ActualPacket>>()) {
1544 ep.status_ = connection_status::disconnecting;
// Phase 4: size check on the packet as it will be sent; on failure a packet
// id owned by the packet is released.
1547 if (!validate_maximum_packet_size(self, actual_packet)) {
1548 if constexpr(own_packet_id<std::decay_t<ActualPacket>>()) {
1549 auto packet_id = actual_packet.packet_id();
1550 if (packet_id != 0) {
1551 ep.release_pid(packet_id);
// A publish attempted while not connected got this far only to be stored;
// it is reported as stored but not sent.
1557 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1558 if (ep.status_ != connection_status::connected) {
1563 "packet is stored but not sent"
// Checks that topic alias `ta` is usable for sending. Two failure conditions
// are visible: no send-side alias container exists (message: maximum is 0),
// or `ta` is 0 or greater than topic_alias_send_->max(). The error completion
// and the return statements are on lines not present in this extract.
1573 template <
typename Self>
1574 bool validate_topic_alias_range(Self& self, topic_alias_t ta) {
1575 if (!ep.topic_alias_send_) {
1579 "topic_alias is set but topic_alias_maximum is 0"
1584 if (ta == 0 || ta > ep.topic_alias_send_->max()) {
1588 "topic_alias is set but out of range"
// Resolves the topic for an alias-only publish. Must run in the strand
// (asserted). Returns an optional topic string. Visible failure conditions:
// no alias supplied, alias out of range (see validate_topic_alias_range), or
// no topic registered for the alias. The return statements are on lines not
// present in this extract (presumably nullopt on failure - confirm).
1596 template <
typename Self>
1597 optional<std::string> validate_topic_alias(Self& self, optional<topic_alias_t> ta_opt) {
1598 BOOST_ASSERT(ep.in_strand());
1603 "topic is empty but topic_alias isn't set"
1609 if (!validate_topic_alias_range(self, *ta_opt)) {
1613 auto topic = ep.topic_alias_send_->find(*ta_opt);
1614 if (topic.empty()) {
1618 "topic is empty but topic_alias is not registered"
// Checks `packet_arg.size()` against ep.maximum_packet_size_send_. When the
// packet is larger, an error is logged on the "mqtt_impl" channel; the error
// completion and the return statements are on lines not present in this
// extract.
1626 template <
typename Self,
typename PacketArg>
1627 bool validate_maximum_packet_size(Self& self, PacketArg
const& packet_arg) {
1628 if (packet_arg.size() > ep.maximum_packet_size_send_) {
1629 ASYNC_MQTT_LOG(
"mqtt_impl", error)
1630 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1631 <<
"packet size over maximum_packet_size for sending";
1635 "packet size is over maximum_packet_size for sending"
// Receive operation object (the struct header line is not present in this
// extract; the name recv_impl is an assumption). Members: an optional
// filter mode plus the packet-type set it applies to, an error decided while
// processing a packet, the packet variant to complete with, and the state
// machine position (initiate -> read, with disconnect/close side paths, then
// complete).
1646 optional<filter> fil = nullopt;
1647 std::set<control_packet_type> types = {};
1648 optional<system_error> decided_error = nullopt;
1649 optional<basic_packet_variant<PacketIdBytes>> pv_opt = nullopt;
1650 enum { initiate, disconnect, close, read, complete } state = initiate;
1652 template <
typename Self>
1655 error_code
const& ec = error_code{},
1656 buffer buf = buffer{}
// Error path: log, remember the error as the decided error, and clear the
// endpoint's recv_processing_ flag.
1659 ASYNC_MQTT_LOG(
"mqtt_impl", info)
1660 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1661 <<
"recv error:" << ec.message();
1662 decided_error.emplace(ec);
1663 ep.recv_processing_ =
false;
1668 a_ep.stream_->raw_strand(),
1679 a_ep.stream_->read_packet(force_move(self));
// A packet has been read. Oversized packets (relative to the receive-side
// maximum) are answered with a v5 DISCONNECT carrying packet_too_large; the
// assertion shows this limit is only expected to be set under v5.
1682 BOOST_ASSERT(ep.in_strand());
1683 if (buf.size() > ep.maximum_packet_size_recv_) {
1685 BOOST_ASSERT(ep.protocol_version_ == protocol_version::v5);
1687 decided_error.emplace(
1690 "too large packet received"
1695 v5::disconnect_packet{
1696 disconnect_reason_code::packet_too_large
// Decode the buffer into a packet variant, then dispatch on the concrete
// packet type. call_complete is cleared by process_qos2_publish when the
// packet must not be delivered to the caller.
1703 auto v = buffer_to_basic_packet_variant<PacketIdBytes>(buf, ep.protocol_version_);
1704 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1705 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1707 bool call_complete =
true;
// Received v3.1.1 CONNECT: fix the protocol version, enter connecting, set
// the pingreq receive timeout to 1.5 x keep_alive (in ms) when keep_alive is
// non-zero, and choose need_store_ from clean_session.
1711 [&](v3_1_1::connect_packet& p) {
1713 ep.protocol_version_ = protocol_version::v3_1_1;
1714 ep.status_ = connection_status::connecting;
1715 auto keep_alive = p.keep_alive();
1716 if (keep_alive != 0) {
1717 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1719 if (p.clean_session()) {
1720 ep.need_store_ =
false;
1723 ep.need_store_ =
true;
// Received v5 CONNECT: as above; the peer's properties describe what the
// peer accepts, so they configure the send-side limits.
1726 [&](v5::connect_packet& p) {
1728 ep.protocol_version_ = protocol_version::v5;
1729 ep.status_ = connection_status::connecting;
1730 auto keep_alive = p.keep_alive();
1731 if (keep_alive != 0) {
1732 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1734 for (
auto const& prop : p.props()) {
1737 [&](property::topic_alias_maximum
const& p) {
1739 ep.topic_alias_send_.emplace(p.val());
1742 [&](property::receive_maximum
const& p) {
1743 BOOST_ASSERT(p.val() != 0);
1744 ep.publish_send_max_ = p.val();
1746 [&](property::maximum_packet_size
const& p) {
1747 BOOST_ASSERT(p.val() != 0);
1748 ep.maximum_packet_size_send_ = p.val();
1750 [&](property::session_expiry_interval
const& p) {
1752 ep.need_store_ =
true;
// Received CONNACK: accepted/success moves to connected; session_present is
// examined (its handling is on lines not present in this extract).
1761 [&](v3_1_1::connack_packet& p) {
1762 if (p.code() == connect_return_code::accepted) {
1763 ep.status_ = connection_status::connected;
1764 if (p.session_present()) {
1773 [&](v5::connack_packet& p) {
1774 if (p.code() == connect_reason_code::success) {
1775 ep.status_ = connection_status::connected;
1777 for (
auto const& prop : p.props()) {
1780 [&](property::topic_alias_maximum
const& p) {
1782 ep.topic_alias_send_.emplace(p.val());
1785 [&](property::receive_maximum
const& p) {
1786 BOOST_ASSERT(p.val() != 0);
1787 ep.publish_send_max_ = p.val();
1789 [&](property::maximum_packet_size
const& p) {
1790 BOOST_ASSERT(p.val() != 0);
1791 ep.maximum_packet_size_send_ = p.val();
1799 if (p.session_present()) {
// Received v3.1.1 PUBLISH: QoS 1 is auto-acknowledged with PUBACK when
// auto_pub_response_ is on and the endpoint is connected; QoS 2 goes through
// process_qos2_publish.
1808 [&](v3_1_1::basic_publish_packet<PacketIdBytes>& p) {
1809 switch (p.opts().get_qos()) {
1810 case qos::at_least_once: {
1811 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1813 v3_1_1::basic_puback_packet<PacketIdBytes>(p.packet_id()),
1814 [](system_error
const&){}
1818 case qos::exactly_once:
1819 call_complete = process_qos2_publish(self, protocol_version::v3_1_1, p.packet_id());
// Received v5 PUBLISH: for QoS 1/2 the number of in-flight received
// publishes is checked against publish_recv_max_ first (violation ->
// DISCONNECT with receive_maximum_exceeded); then the packet id is tracked.
1825 [&](v5::basic_publish_packet<PacketIdBytes>& p) {
1826 switch (p.opts().get_qos()) {
1827 case qos::at_least_once: {
1828 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1830 decided_error.emplace(
1833 "receive maximum exceeded"
1838 v5::disconnect_packet{
1839 disconnect_reason_code::receive_maximum_exceeded
1845 auto packet_id = p.packet_id();
1846 ep.publish_recv_.insert(packet_id);
1847 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1849 v5::basic_puback_packet<PacketIdBytes>{packet_id},
1850 [](system_error
const&){}
1854 case qos::exactly_once: {
1855 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1857 decided_error.emplace(
1860 "receive maximum exceeded"
1865 v5::disconnect_packet{
1866 disconnect_reason_code::receive_maximum_exceeded
1872 auto packet_id = p.packet_id();
1873 ep.publish_recv_.insert(packet_id);
1874 call_complete = process_qos2_publish(self, protocol_version::v5, packet_id);
// Topic-alias resolution for received v5 PUBLISH. Empty topic: the alias must
// be present, in range, and registered, otherwise DISCONNECT with
// topic_alias_invalid; on success the resolved topic is added to the packet.
1880 if (p.topic().empty()) {
1881 if (
auto ta_opt = get_topic_alias(p.props())) {
1884 *ta_opt > ep.topic_alias_recv_->max()) {
1886 decided_error.emplace(
1889 "topic alias invalid"
1894 v5::disconnect_packet{
1895 disconnect_reason_code::topic_alias_invalid
1902 auto topic = ep.topic_alias_recv_->find(*ta_opt);
1903 if (topic.empty()) {
1904 ASYNC_MQTT_LOG(
"mqtt_impl", error)
1905 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1906 <<
"no matching topic alias: "
1909 decided_error.emplace(
1912 "topic alias invalid"
1917 v5::disconnect_packet{
1918 disconnect_reason_code::topic_alias_invalid
1925 p.add_topic(allocate_buffer(topic));
1930 ASYNC_MQTT_LOG(
"mqtt_impl", error)
1931 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1932 <<
"topic is empty but topic_alias isn't set";
1934 decided_error.emplace(
1937 "topic alias invalid"
1942 v5::disconnect_packet{
1943 disconnect_reason_code::topic_alias_invalid
// Non-empty topic with an alias: range-check, then register the pair.
1951 if (
auto ta_opt = get_topic_alias(p.props())) {
1953 *ta_opt > ep.topic_alias_recv_->max()) {
1955 decided_error.emplace(
1958 "topic alias invalid"
1963 v5::disconnect_packet{
1964 disconnect_reason_code::topic_alias_invalid
1972 if (ep.topic_alias_recv_) {
1973 ep.topic_alias_recv_->insert_or_update(p.topic(), *ta_opt);
// Received PUBACK: if the id was awaiting a PUBACK, drop the stored publish
// and release the id; otherwise treat it as an invalid packet id.
// NOTE(review): in this and the following invalid-packet_id paths (puback,
// pubrec, pubcomp; both protocol versions) the visible lines build a
// v5::disconnect_packet with reason topic_alias_invalid, although the logged
// condition is an unknown packet id - looks copy-pasted from the topic-alias
// code above; confirm the intended packet type and reason code.
1979 [&](v3_1_1::basic_puback_packet<PacketIdBytes>& p) {
1980 auto packet_id = p.packet_id();
1981 if (ep.pid_puback_.erase(packet_id)) {
1982 ep.store_.erase(response_packet::v3_1_1_puback, packet_id);
1983 ep.release_pid(packet_id);
1986 ASYNC_MQTT_LOG(
"mqtt_impl", info)
1987 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1988 <<
"invalid packet_id puback received packet_id:" << packet_id;
1990 decided_error.emplace(
1998 v5::disconnect_packet{
1999 disconnect_reason_code::topic_alias_invalid
// v5 PUBACK additionally frees one send slot (publish_send_count_) and tries
// to send queued publishes.
2006 [&](v5::basic_puback_packet<PacketIdBytes>& p) {
2007 auto packet_id = p.packet_id();
2008 if (ep.pid_puback_.erase(packet_id)) {
2009 ep.store_.erase(response_packet::v5_puback, packet_id);
2010 ep.release_pid(packet_id);
2011 --ep.publish_send_count_;
2012 send_publish_from_queue();
2015 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2016 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2017 <<
"invalid packet_id puback received packet_id:" << packet_id;
2019 decided_error.emplace(
2027 v5::disconnect_packet{
2028 disconnect_reason_code::topic_alias_invalid
// Received PUBREC: drop the stored publish; auto-respond with PUBREL when
// enabled and connected.
2035 [&](v3_1_1::basic_pubrec_packet<PacketIdBytes>& p) {
2036 auto packet_id = p.packet_id();
2037 if (ep.pid_pubrec_.erase(packet_id)) {
2038 ep.store_.erase(response_packet::v3_1_1_pubrec, packet_id);
2039 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2041 v3_1_1::basic_pubrel_packet<PacketIdBytes>(packet_id),
2042 [](system_error
const&){}
2047 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2048 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2049 <<
"invalid packet_id pubrec received packet_id:" << packet_id;
2051 decided_error.emplace(
2059 v5::disconnect_packet{
2060 disconnect_reason_code::topic_alias_invalid
// v5 PUBREC with an error code ends the QoS 2 exchange: release the id, free
// the send slot, and try the queue. Otherwise auto-respond with PUBREL.
2067 [&](v5::basic_pubrec_packet<PacketIdBytes>& p) {
2068 auto packet_id = p.packet_id();
2069 if (ep.pid_pubrec_.erase(packet_id)) {
2070 ep.store_.erase(response_packet::v5_pubrec, packet_id);
2071 if (is_error(p.code())) {
2072 ep.release_pid(packet_id);
2073 ep.qos2_publish_processing_.erase(packet_id);
2074 --ep.publish_send_count_;
2075 send_publish_from_queue();
2077 else if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2079 v5::basic_pubrel_packet<PacketIdBytes>(packet_id),
2080 [](system_error
const&){}
2085 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2086 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2087 <<
"invalid packet_id pubrec received packet_id:" << packet_id;
2089 decided_error.emplace(
2097 v5::disconnect_packet{
2098 disconnect_reason_code::topic_alias_invalid
// Received PUBREL: forget the handled QoS 2 id; auto-respond with PUBCOMP
// when enabled and connected.
2105 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes>& p) {
2106 auto packet_id = p.packet_id();
2107 ep.qos2_publish_handled_.erase(packet_id);
2108 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2110 v3_1_1::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2111 [](system_error
const&){}
2115 [&](v5::basic_pubrel_packet<PacketIdBytes>& p) {
2116 auto packet_id = p.packet_id();
2117 ep.qos2_publish_handled_.erase(packet_id);
2118 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2120 v5::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2121 [](system_error
const&){}
// Received PUBCOMP: the QoS 2 exchange we initiated is finished.
2125 [&](v3_1_1::basic_pubcomp_packet<PacketIdBytes>& p) {
2126 auto packet_id = p.packet_id();
2127 if (ep.pid_pubcomp_.erase(packet_id)) {
2128 ep.store_.erase(response_packet::v3_1_1_pubcomp, packet_id);
2129 ep.release_pid(packet_id);
2130 ep.qos2_publish_processing_.erase(packet_id);
2131 --ep.publish_send_count_;
2132 send_publish_from_queue();
2135 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2136 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2137 <<
"invalid packet_id pubcomp received packet_id:" << packet_id;
2139 decided_error.emplace(
2147 v5::disconnect_packet{
2148 disconnect_reason_code::topic_alias_invalid
// NOTE(review): unlike the v5 puback handler and the v3.1.1 pubcomp handler
// above, the visible success path here (listing lines 2157-2160, contiguous)
// neither decrements publish_send_count_ nor calls send_publish_from_queue().
// The v3.1.1 puback handler does not touch the counter either, so the
// v3.1.1/v5 pubcomp bodies may have been swapped - confirm against
// enqueue_publish in the full source.
2155 [&](v5::basic_pubcomp_packet<PacketIdBytes>& p) {
2156 auto packet_id = p.packet_id();
2157 if (ep.pid_pubcomp_.erase(packet_id)) {
2158 ep.store_.erase(response_packet::v5_pubcomp, packet_id);
2159 ep.release_pid(packet_id);
2160 ep.qos2_publish_processing_.erase(packet_id);
2163 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2164 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2165 <<
"invalid packet_id pubcomp received packet_id:" << packet_id;
2167 decided_error.emplace(
2175 v5::disconnect_packet{
2176 disconnect_reason_code::topic_alias_invalid
// SUBSCRIBE / UNSUBSCRIBE need no bookkeeping on receipt; SUBACK / UNSUBACK
// release the packet id that was awaiting them.
2183 [&](v3_1_1::basic_subscribe_packet<PacketIdBytes>&) {
2185 [&](v5::basic_subscribe_packet<PacketIdBytes>&) {
2187 [&](v3_1_1::basic_suback_packet<PacketIdBytes>& p) {
2188 auto packet_id = p.packet_id();
2189 if (ep.pid_suback_.erase(packet_id)) {
2190 ep.release_pid(packet_id);
2193 [&](v5::basic_suback_packet<PacketIdBytes>& p) {
2194 auto packet_id = p.packet_id();
2195 if (ep.pid_suback_.erase(packet_id)) {
2196 ep.release_pid(packet_id);
2199 [&](v3_1_1::basic_unsubscribe_packet<PacketIdBytes>&) {
2201 [&](v5::basic_unsubscribe_packet<PacketIdBytes>&) {
2203 [&](v3_1_1::basic_unsuback_packet<PacketIdBytes>& p) {
2204 auto packet_id = p.packet_id();
2205 if (ep.pid_unsuback_.erase(packet_id)) {
2206 ep.release_pid(packet_id);
2209 [&](v5::basic_unsuback_packet<PacketIdBytes>& p) {
2210 auto packet_id = p.packet_id();
2211 if (ep.pid_unsuback_.erase(packet_id)) {
2212 ep.release_pid(packet_id);
// PINGREQ: endpoints that can act as server auto-reply with PINGRESP when
// enabled and connected. PINGRESP: stop the pingresp timeout timer.
2215 [&](v3_1_1::pingreq_packet&) {
2216 if constexpr(can_send_as_server(Role)) {
2217 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2219 v3_1_1::pingresp_packet(),
2220 [](system_error
const&){}
2225 [&](v5::pingreq_packet&) {
2226 if constexpr(can_send_as_server(Role)) {
2227 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2229 v5::pingresp_packet(),
2230 [](system_error
const&){}
2235 [&](v3_1_1::pingresp_packet&) {
2236 ep.tim_pingresp_recv_->cancel();
2238 [&](v5::pingresp_packet&) {
2239 ep.tim_pingresp_recv_->cancel();
// DISCONNECT moves to disconnecting; a decode error (system_error
// alternative) moves straight to closed.
2241 [&](v3_1_1::disconnect_packet&) {
2242 ep.status_ = connection_status::disconnecting;
2244 [&](v5::disconnect_packet&) {
2245 ep.status_ = connection_status::disconnecting;
2247 [&](v5::auth_packet&) {
2249 [&](system_error&) {
2250 ep.status_ = connection_status::closed;
// After dispatch: restart the pingreq receive timer and clear the
// in-progress flag. The packet is handed to the caller only when nothing
// suppressed it and no error was decided.
2254 ep.reset_pingreq_recv_timer();
2255 ep.recv_processing_ =
false;
2259 if (call_complete && !decided_error) {
2260 pv_opt.emplace(force_move(v));
2262 bind_dispatch(force_move(self));
// Filter check: a packet whose type is outside a `match` set, or inside an
// `except` set, satisfies this condition (the action taken is on lines not
// present in this extract - presumably the packet is skipped and another
// read is started; confirm).
2267 if (
auto type_opt = v.type()) {
2268 if ((*fil == filter::match && types.find(*type_opt) == types.end()) ||
2269 (*fil == filter::except && types.find(*type_opt) != types.end())
2276 a_ep.stream_->raw_strand(),
2298 a_ep.stream_->raw_strand(),
// Close path: an error must have been decided; it becomes the completion
// value.
2304 BOOST_ASSERT(decided_error);
2305 ep.recv_processing_ =
false;
2306 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2307 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2308 <<
"recv code triggers close:" << decided_error->what();
2309 pv_opt.emplace(force_move(*decided_error));
2311 bind_dispatch(force_move(self));
// Completion: deliver the packet (or error) variant to the handler.
2314 BOOST_ASSERT(pv_opt);
2315 self.complete(force_move(*pv_opt));
2318 BOOST_ASSERT(
false);
2323 template <
typename Self>
2328 BOOST_ASSERT(state == disconnect);
2333 a_ep.stream_->raw_strand(),
// Flushes PUBLISH packets that were parked in ep.publish_queue_.
// Asserts that it runs in the endpoint strand and does nothing unless the
// connection status is `connected`.
2339 void send_publish_from_queue() {
2340 BOOST_ASSERT(ep.in_strand());
2341 if (ep.status_ != connection_status::connected)
return;
// Keep draining while packets are queued and publish_send_count_ differs
// from publish_send_max_.
2342 while (!ep.publish_queue_.empty() &&
2343 ep.publish_send_count_ != ep.publish_send_max_) {
// The front packet is moved out as a call argument (presumably a send call
// -- confirm); the completion handler intentionally ignores the result.
2345 force_move(ep.publish_queue_.front()),
2347 [](system_error
const&){}
// Remove the moved-from front element.
2349 ep.publish_queue_.pop_front();
// Bookkeeping for a received QoS2 PUBLISH identified by packet_id.
// Must run in the endpoint strand (asserted).
2353 template <
typename Self>
2354 bool process_qos2_publish(
2356 protocol_version ver,
2359 BOOST_ASSERT(ep.in_strand());
2360 bool already_handled =
false;
// A packet id not yet present in qos2_publish_handled_ is recorded there.
2361 if (ep.qos2_publish_handled_.find(packet_id) == ep.qos2_publish_handled_.end()) {
2362 ep.qos2_publish_handled_.emplace(packet_id);
// NOTE(review): presumably the else branch (id was already recorded) -- confirm.
2365 already_handled =
true;
// While connected, a PUBREC carrying the same packet id is built for the
// negotiated protocol version when auto_pub_response_ is set (the condition
// has a further alternative); the completion handlers ignore the result.
2367 if (ep.status_ == connection_status::connected &&
2368 (ep.auto_pub_response_ ||
2373 case protocol_version::v3_1_1:
2375 v3_1_1::basic_pubrec_packet<PacketIdBytes>(packet_id),
2376 [](system_error
const&){}
2379 case protocol_version::v5:
2381 v5::basic_pubrec_packet<PacketIdBytes>(packet_id),
2382 [](system_error
const&){}
// Any other protocol version is treated as a programming error.
2386 BOOST_ASSERT(
false);
// For an already-handled id the operation goes back to reading the next
// packet from the stream, handing over `self`.
2390 if (already_handled) {
2393 a_ep.stream_->read_packet(force_move(self));
// Step of the composed close operation; starts at `dispatch`.
2402 enum { dispatch, close, bind, complete } state = dispatch;
// Holds a shared_ptr to the endpoint, obtained via shared_from_this().
// NOTE(review): presumably keeps the endpoint alive for the duration of the
// operation -- confirm.
2403 this_type_sp life_keeper = ep.shared_from_this();
2405 template <
typename Self>
2408 error_code
const& = error_code{}
2416 a_ep.stream_->raw_strand(),
// What happens next depends on the current connection status
// (evaluated in the endpoint strand).
2422 BOOST_ASSERT(ep.in_strand());
2423 switch (ep.status_) {
// Live connection: mark it `closing` and start closing the stream.
2424 case connection_status::connecting:
2425 case connection_status::connected:
2426 case connection_status::disconnecting: {
2427 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2428 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2429 <<
"close initiate status:" <<
static_cast<int>(ep.status_);
2431 ep.status_ = connection_status::closing;
2433 a_ep.stream_->close(force_move(self));
// A close is already in progress: post work onto close_queue_, which is
// polled once the in-progress close finishes (see below).
2435 case connection_status::closing: {
2436 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2437 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2438 <<
"already close requested";
2440 auto exe = as::get_associated_executor(self);
2441 a_ep.close_queue_.post(
// Already closed: nothing to close, continue via bind_dispatch.
2448 case connection_status::closed:
2449 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2450 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2451 <<
"already closed";
2453 bind_dispatch(force_move(self));
// Close finished: cancel the three keep-alive timers, set the status to
// `closed`, then run whatever was posted to close_queue_ meanwhile.
2456 BOOST_ASSERT(ep.in_strand());
2457 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2458 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2459 <<
"close complete status:" <<
static_cast<int>(ep.status_);
2461 a_ep.tim_pingreq_send_->cancel();
2462 a_ep.tim_pingreq_recv_->cancel();
2463 a_ep.tim_pingresp_recv_->cancel();
2464 a_ep.status_ = connection_status::closed;
2465 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2466 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2467 <<
"process enqueued close";
2468 a_ep.close_queue_.poll();
2471 bind_dispatch(force_move(self));
// Composed-operation state for handing a batch of stored packets to the
// endpoint from within its strand.
2480 struct restore_packets_impl {
// Packets to restore; moved into the endpoint in the restore step.
2482 std::vector<basic_store_packet_variant<PacketIdBytes>> pvs;
// Current step; starts at `dispatch`.
2483 enum { dispatch, restore, complete } state = dispatch;
2485 template <
typename Self>
2495 a_ep.stream_->raw_strand(),
// In the strand: pass the packets to the endpoint, then continue via
// bind_dispatch.
2501 BOOST_ASSERT(ep.in_strand());
2502 ep.restore_packets(force_move(pvs));
2504 bind_dispatch(force_move(self));
// Composed-operation state for fetching the endpoint's stored packets from
// within its strand and delivering them to the completion handler.
2513 struct get_stored_packets_impl {
// Endpoint is only read here, hence the const reference.
2514 this_type
const& ep;
// Result buffer, filled in the `get` step.
2515 std::vector<basic_store_packet_variant<PacketIdBytes>> packets = {};
// Current step; starts at `dispatch`.
2516 enum { dispatch, get, complete } state = dispatch;
2518 template <
typename Self>
2528 a_ep.stream_->raw_strand(),
// In the strand: snapshot the stored packets, then continue via bind_dispatch.
2534 BOOST_ASSERT(ep.in_strand());
2535 packets = ep.get_stored_packets();
2537 bind_dispatch(force_move(self));
// Completion: the collected packets are moved to the handler.
2540 self.complete(force_move(packets));
// Composed-operation state that runs ep.regulate_for_store() on a v5 PUBLISH
// packet from within the endpoint strand and completes with that packet.
2546 struct regulate_for_store_impl {
// Endpoint is accessed through a const reference.
2547 this_type
const& ep;
// Packet passed to regulate_for_store() and later moved to the handler.
2548 v5::basic_publish_packet<PacketIdBytes> packet;
// Current step; starts at `dispatch`.
2549 enum { dispatch, regulate, complete } state = dispatch;
2551 template <
typename Self>
2561 a_ep.stream_->raw_strand(),
// In the strand: let the endpoint process the packet, then continue via
// bind_dispatch.
2567 BOOST_ASSERT(ep.in_strand());
2568 ep.regulate_for_store(packet);
2570 bind_dispatch(force_move(self));
// Completion: the packet is moved to the handler.
2573 self.complete(force_move(packet));
2581 template <
typename Packet,
typename CompletionToken>
2586 CompletionToken&& token
// Decides whether a v5 PUBLISH packet has to wait in publish_queue_.
// When the packet is queued it is moved from, so the caller must not use it
// afterwards.
// NOTE(review): presumably returns true when the packet was queued (send_stored
// returns early on a true result) -- confirm.
2602 bool enqueue_publish(v5::basic_publish_packet<PacketIdBytes>& packet) {
// Only QoS1 / QoS2 packets are subject to this check.
2604 if (packet.opts().get_qos() == qos::at_least_once ||
2605 packet.opts().get_qos() == qos::exactly_once
// Send count already equals the maximum: park the packet.
2607 if (publish_send_count_ == publish_send_max_) {
2608 publish_queue_.push_back(force_move(packet));
// Otherwise count this packet; if others are already waiting, it is appended
// behind them.
2612 ++publish_send_count_;
2613 if (!publish_queue_.empty()) {
2614 publish_queue_.push_back(force_move(packet));
// Walks stored packets and re-issues them, one handler per packet type.
2622 void send_stored() {
2625 [&](basic_store_packet_variant<PacketIdBytes>
const& pv) {
// A stored packet larger than maximum_packet_size_send_ has its packet id
// released.
2626 if (pv.size() > maximum_packet_size_send_) {
2627 release_pid(pv.packet_id());
// Per-type handlers; every completion handler ignores the result.
2634 [&](v3_1_1::basic_publish_packet<PacketIdBytes> p) {
2637 [](system_error const&){}
// v5 PUBLISH first goes through enqueue_publish(); nothing more is done
// here when that returns true.
2640 [&](v5::basic_publish_packet<PacketIdBytes> p) {
2641 if (enqueue_publish(p)) return;
2644 [](system_error const&){}
2647 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes> p) {
2650 [](system_error
const&){}
2653 [&](v5::basic_pubrel_packet<PacketIdBytes> p) {
2656 [](system_error
const&){}
// Resets endpoint state to its initial values; must run in the strand.
2667 BOOST_ASSERT(in_strand());
// Publish flow-control state.
2668 publish_send_count_ = 0;
2669 publish_queue_.clear();
// Topic alias tables for both directions are dropped.
2670 topic_alias_send_ = nullopt;
2671 topic_alias_recv_ = nullopt;
2672 publish_recv_.clear();
2673 qos2_publish_processing_.clear();
2674 need_store_ =
false;
// Packet-id sets, one per expected response type.
2675 pid_suback_.clear();
2676 pid_unsuback_.clear();
2677 pid_puback_.clear();
2678 pid_pubrec_.clear();
2679 pid_pubcomp_.clear();
// Re-arms the PINGREQ send timer. Does nothing when no send interval is
// configured. Must run in the strand (asserted).
2682 void reset_pingreq_send_timer() {
2683 BOOST_ASSERT(in_strand());
2684 if (pingreq_send_interval_ms_) {
// Cancel any pending wait first; the timer is not re-armed while the
// connection is disconnecting, closing or closed.
2685 tim_pingreq_send_->cancel();
2686 if (status_ == connection_status::disconnecting ||
2687 status_ == connection_status::closing ||
2688 status_ == connection_status::closed)
return;
2689 tim_pingreq_send_->expires_after(
2690 std::chrono::milliseconds{*pingreq_send_interval_ms_}
// The handler captures raw `this` plus a weak_ptr to the timer and only
// proceeds if the weak_ptr can still be locked.
// NOTE(review): presumably the lock doubles as a liveness check for `this`,
// since the timer is a member -- confirm.
2692 tim_pingreq_send_->async_wait(
2693 [
this, wp = std::weak_ptr{tim_pingreq_send_}](error_code
const& ec) {
2695 if (
auto sp = wp.lock()) {
// Build the PINGREQ matching the protocol version; the completion
// handlers ignore the result.
2696 switch (protocol_version_) {
2697 case protocol_version::v3_1_1:
2699 v3_1_1::pingreq_packet(),
2700 [](system_error
const&){}
2703 case protocol_version::v5:
2705 v5::pingreq_packet(),
2706 [](system_error
const&){}
// Any other protocol version is a programming error.
2710 BOOST_ASSERT(
false);
// Re-arms the timer that watches for incoming PINGREQ. Does nothing when no
// receive timeout is configured. Must run in the strand (asserted).
2720 void reset_pingreq_recv_timer() {
2721 BOOST_ASSERT(in_strand());
2722 if (pingreq_recv_timeout_ms_) {
// Cancel any pending wait first; the timer is not re-armed while the
// connection is disconnecting, closing or closed.
2723 tim_pingreq_recv_->cancel();
2724 if (status_ == connection_status::disconnecting ||
2725 status_ == connection_status::closing ||
2726 status_ == connection_status::closed)
return;
2727 tim_pingreq_recv_->expires_after(
2728 std::chrono::milliseconds{*pingreq_recv_timeout_ms_}
// The handler captures raw `this` plus a weak_ptr to the timer and only
// proceeds if the weak_ptr can still be locked.
2730 tim_pingreq_recv_->async_wait(
2731 [
this, wp = std::weak_ptr{tim_pingreq_recv_}](error_code
const& ec) {
2733 if (
auto sp = wp.lock()) {
2734 switch (protocol_version_) {
// Both versions log the timeout at error level.
2735 case protocol_version::v3_1_1:
2736 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2737 << ASYNC_MQTT_ADD_VALUE(address,
this)
2738 <<
"pingreq recv timeout. close.";
// v5 additionally builds a DISCONNECT with reason keep_alive_timeout.
2743 case protocol_version::v5:
2744 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2745 << ASYNC_MQTT_ADD_VALUE(address,
this)
2746 <<
"pingreq recv timeout. close.";
2748 v5::disconnect_packet{
2749 disconnect_reason_code::keep_alive_timeout,
2752 [
this](system_error
const&){
// Any other protocol version is a programming error.
2760 BOOST_ASSERT(
false);
// Re-arms the timer that waits for a PINGRESP. Does nothing when no timeout
// is configured. Must run in the strand (asserted).
2770 void reset_pingresp_recv_timer() {
2771 BOOST_ASSERT(in_strand());
2772 if (pingresp_recv_timeout_ms_) {
// Cancel any pending wait first; the timer is not re-armed while the
// connection is disconnecting, closing or closed.
2773 tim_pingresp_recv_->cancel();
2774 if (status_ == connection_status::disconnecting ||
2775 status_ == connection_status::closing ||
2776 status_ == connection_status::closed)
return;
2777 tim_pingresp_recv_->expires_after(
2778 std::chrono::milliseconds{*pingresp_recv_timeout_ms_}
// The handler captures raw `this` plus a weak_ptr to the timer and only
// proceeds if the weak_ptr can still be locked.
2780 tim_pingresp_recv_->async_wait(
2781 [
this, wp = std::weak_ptr{tim_pingresp_recv_}](error_code
const& ec) {
2783 if (
auto sp = wp.lock()) {
2784 switch (protocol_version_) {
// Both versions log the timeout at error level.
2785 case protocol_version::v3_1_1:
2786 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2787 << ASYNC_MQTT_ADD_VALUE(address,
this)
2788 <<
"pingresp recv timeout. close.";
// v5: only while still connected, a DISCONNECT with reason
// keep_alive_timeout is built.
2793 case protocol_version::v5:
2794 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2795 << ASYNC_MQTT_ADD_VALUE(address,
this)
2796 <<
"pingresp recv timeout. close.";
2797 if (status_ == connection_status::connected) {
2799 v5::disconnect_packet{
2800 disconnect_reason_code::keep_alive_timeout,
2803 [
this](system_error
const&){
// Any other protocol version is a programming error.
2817 BOOST_ASSERT(
false);
// Registers a waiter in tim_retry_acq_pid_queue_.
2827 template <
typename CompletionToken>
2829 CompletionToken&& token
// A fresh timer on the raw strand whose expiry is set to time_point::max().
// NOTE(review): presumably it is only ever woken by cancellation -- confirm.
2831 auto tim = std::make_shared<as::steady_timer>(stream_->raw_strand());
2832 tim->expires_at(std::chrono::steady_clock::time_point::max());
// The caller's token is forwarded together with the raw strand.
2835 stream_->raw_strand(),
2836 std::forward<CompletionToken>(token)
// The timer is appended to the back of the waiter queue.
2839 tim_retry_acq_pid_queue_.emplace_back(force_move(tim));
// Scans the waiter queue from the front for an entry not yet flagged as
// cancelled and flags it.
// NOTE(review): presumably also cancels that entry's timer and stops after
// the first hit -- confirm.
2842 void notify_retry_one() {
2843 for (
auto it = tim_retry_acq_pid_queue_.begin();
2844 it != tim_retry_acq_pid_queue_.end();
// Entries already flagged are skipped.
2847 if (it->cancelled)
continue;
2849 it->cancelled =
true;
// Removes the front entry of the waiter queue, if there is one.
2854 void complete_retry_one() {
2855 if (!tim_retry_acq_pid_queue_.empty()) {
2856 tim_retry_acq_pid_queue_.pop_front();
// Drops every entry of the waiter queue, releasing the queue's shared_ptr
// to each timer.
2860 void notify_retry_all() {
2861 tim_retry_acq_pid_queue_.clear();
// Returns true while at least one entry is in the waiter queue.
2864 bool has_retry()
const {
2865 return !tim_retry_acq_pid_queue_.empty();
// NOTE(review): presumably resets the packet-id manager (pid_man_) -- confirm.
2868 void clear_pid_man() {
// Hands the given packet id back to the packet-id manager.
2873 void release_pid(packet_id_t pid) {
2874 pid_man_.release_id(pid);
// Underlying stream. Declared before store_ and the timers because their
// default member initializers call stream_->raw_strand(), and members are
// initialized in declaration order.
2880 std::shared_ptr<stream_type> stream_;
// Packet-id allocator and the packet-id sets, one per expected response type.
2881 packet_id_manager<packet_id_t> pid_man_;
2882 std::set<packet_id_t> pid_suback_;
2883 std::set<packet_id_t> pid_unsuback_;
2884 std::set<packet_id_t> pid_puback_;
2885 std::set<packet_id_t> pid_pubrec_;
2886 std::set<packet_id_t> pid_pubcomp_;
// Packet store, constructed on the stream's raw strand.
2888 bool need_store_ =
false;
2889 store<PacketIdBytes, as::strand<as::any_io_executor>> store_{stream_->raw_strand()};
// Automatic-response switches; both default to off.
2891 bool auto_pub_response_ =
false;
2892 bool auto_ping_response_ =
false;
// Topic alias options and the per-direction alias tables (empty until set).
2894 bool auto_map_topic_alias_send_ =
false;
2895 bool auto_replace_topic_alias_send_ =
false;
2896 optional<topic_alias_send> topic_alias_send_;
2897 optional<topic_alias_recv> topic_alias_recv_;
// Publish flow control: limits default to receive_maximum_max, count to 0.
2899 receive_maximum_t publish_send_max_{receive_maximum_max};
2900 receive_maximum_t publish_recv_max_{receive_maximum_max};
2901 receive_maximum_t publish_send_count_{0};
// publish_queue_ holds v5 PUBLISH packets that are waiting to be sent.
2903 std::set<packet_id_t> publish_recv_;
2904 std::deque<v5::basic_publish_packet<PacketIdBytes>> publish_queue_;
// Work posted while a close is in progress; polled when the close completes.
2906 ioc_queue close_queue_;
// Packet size limits per direction; default is no limit.
2908 std::uint32_t maximum_packet_size_send_{packet_size_no_limit};
2909 std::uint32_t maximum_packet_size_recv_{packet_size_no_limit};
// Connection state; a fresh endpoint starts as `closed`.
2911 connection_status status_{connection_status::closed};
// Keep-alive settings in milliseconds; an empty optional disables the
// corresponding timer.
2913 optional<std::size_t> pingreq_send_interval_ms_;
2914 optional<std::size_t> pingreq_recv_timeout_ms_;
2915 optional<std::size_t> pingresp_recv_timeout_ms_;
// Keep-alive timers, each created on the stream's raw strand. Held by
// shared_ptr so that wait handlers can observe them through a weak_ptr.
2917 std::shared_ptr<as::steady_timer> tim_pingreq_send_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2918 std::shared_ptr<as::steady_timer> tim_pingreq_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2919 std::shared_ptr<as::steady_timer> tim_pingresp_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
// Packet ids of QoS2 PUBLISH packets already recorded as handled.
2921 std::set<packet_id_t> qos2_publish_handled_;
// Receive-side processing flag and the QoS2 ids currently being processed.
2923 bool recv_processing_ =
false;
2924 std::set<packet_id_t> qos2_publish_processing_;
// Waiter queue entry: a timer plus a flag recording whether the entry has
// already been marked as cancelled.
2926 struct tim_cancelled {
// The constructor takes ownership of the timer; the flag defaults to false.
2928 std::shared_ptr<as::steady_timer> tim,
2929 bool cancelled =
false
2930 ):tim{force_move(tim)}, cancelled{cancelled}
2932 std::shared_ptr<as::steady_timer> tim;
// Waiter queue of timer entries: new entries are appended at the back and
// completed ones are popped from the front.
2935 std::deque<tim_cancelled> tim_retry_acq_pid_queue_;