67class basic_endpoint :
public std::enable_shared_from_this<basic_endpoint<Role, PacketIdBytes, NextLayer>>{
69 enum class connection_status {
77 static constexpr bool can_send_as_client(
role r) {
78 return static_cast<int>(
r) &
static_cast<int>(role::client);
81 static constexpr bool can_send_as_server(
role r) {
82 return static_cast<int>(
r) &
static_cast<int>(role::server);
87 for (
auto const&
prop : props) {
103 using this_type_sp = std::shared_ptr<this_type>;
104 using this_type_wp = std::weak_ptr<this_type>;
110 template <
typename T>
111 friend class make_shared_helper;
137 template <
typename...
Args>
138 static std::shared_ptr<this_type>
create(
142 return make_shared_helper<this_type>::make_shared(
ver, std::forward<Args>(
args)...);
146 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
147 << ASYNC_MQTT_ADD_VALUE(
address,
this)
151 basic_endpoint(this_type
const&) =
delete;
152 basic_endpoint(this_type&&) =
delete;
153 this_type& operator=(this_type
const&) =
delete;
154 this_type& operator=(this_type&&) =
delete;
161 return stream_->strand();
169 return stream_->strand();
177 return stream_->in_strand();
185 return stream_->next_layer();
192 return stream_->next_layer();
200 return stream_->lowest_layer();
207 return stream_->lowest_layer();
216 ASYNC_MQTT_LOG(
"mqtt_api", info)
217 << ASYNC_MQTT_ADD_VALUE(
address,
this)
218 <<
"set_auto_pub_response val:" << val;
219 auto_pub_response_ = val;
228 ASYNC_MQTT_LOG(
"mqtt_api", info)
229 << ASYNC_MQTT_ADD_VALUE(
address,
this)
230 <<
"set_auto_ping_response val:" << val;
231 auto_ping_response_ = val;
242 ASYNC_MQTT_LOG(
"mqtt_api", info)
243 << ASYNC_MQTT_ADD_VALUE(
address,
this)
244 <<
"set_auto_map_topic_alias_send val:" << val;
245 auto_map_topic_alias_send_ = val;
256 ASYNC_MQTT_LOG(
"mqtt_api", info)
257 << ASYNC_MQTT_ADD_VALUE(
address,
this)
258 <<
"set_auto_replace_topic_alias_send val:" << val;
259 auto_replace_topic_alias_send_ = val;
273 pingresp_recv_timeout_ms_ = nullopt;
276 pingresp_recv_timeout_ms_.emplace(
ms);
287 template <
typename CompletionToken>
292 ASYNC_MQTT_LOG(
"mqtt_api", info)
293 << ASYNC_MQTT_ADD_VALUE(
address,
this)
294 <<
"acquire_unique_packet_id";
300 acquire_unique_packet_id_impl{
313 template <
typename CompletionToken>
318 ASYNC_MQTT_LOG(
"mqtt_api", info)
319 << ASYNC_MQTT_ADD_VALUE(
address,
this)
320 <<
"acquire_unique_packet_id_wait_until";
326 acquire_unique_packet_id_wait_until_impl{
339 template <
typename CompletionToken>
345 ASYNC_MQTT_LOG(
"mqtt_api", info)
346 << ASYNC_MQTT_ADD_VALUE(
address,
this)
347 <<
"register_packet_id pid:" << packet_id;
353 register_packet_id_impl{
367 template <
typename CompletionToken>
373 ASYNC_MQTT_LOG(
"mqtt_api", info)
374 << ASYNC_MQTT_ADD_VALUE(
address,
this)
375 <<
"release_packet_id pid:" << packet_id;
381 release_packet_id_impl{
396 template <
typename Packet,
typename CompletionToken>
402 ASYNC_MQTT_LOG(
"mqtt_api", info)
403 << ASYNC_MQTT_ADD_VALUE(
address,
this)
404 <<
"send:" << packet;
405 if constexpr(!std::is_same_v<Packet, basic_packet_variant<PacketIdBytes>>) {
409 "Packet cannot be send by MQTT protocol"
417 std::forward<CompletionToken>(
token)
427 template <
typename CompletionToken>
432 ASYNC_MQTT_LOG(
"mqtt_api", info)
433 << ASYNC_MQTT_ADD_VALUE(
address,
this)
436 recv_processing_ =
true;
458 template <
typename CompletionToken>
461 std::set<control_packet_type> types,
464 ASYNC_MQTT_LOG(
"mqtt_api", info)
465 << ASYNC_MQTT_ADD_VALUE(
address,
this)
468 recv_processing_ =
true;
493 template <
typename CompletionToken>
497 std::set<control_packet_type> types,
500 ASYNC_MQTT_LOG(
"mqtt_api", info)
501 << ASYNC_MQTT_ADD_VALUE(
address,
this)
504 recv_processing_ =
true;
524 template<
typename CompletionToken>
527 ASYNC_MQTT_LOG(
"mqtt_api", info)
528 << ASYNC_MQTT_ADD_VALUE(
address,
this)
549 template <
typename CompletionToken>
555 ASYNC_MQTT_LOG(
"mqtt_api", info)
556 << ASYNC_MQTT_ADD_VALUE(
address,
this)
557 <<
"restore_packets";
563 restore_packets_impl{
580 template <
typename CompletionToken>
585 ASYNC_MQTT_LOG(
"mqtt_api", info)
586 << ASYNC_MQTT_ADD_VALUE(
address,
this)
587 <<
"get_stored_packets";
593 get_stored_packets_impl{
600 template <
typename CompletionToken>
603 v5::basic_publish_packet<PacketIdBytes> packet,
606 ASYNC_MQTT_LOG(
"mqtt_api", info)
607 << ASYNC_MQTT_ADD_VALUE(
address,
this)
608 <<
"regulate_for_store:" << packet;
612 void(v5::basic_publish_packet<PacketIdBytes>)
614 regulate_for_store_impl{
631 auto pid = pid_man_.acquire_unique_id();
633 ASYNC_MQTT_LOG(
"mqtt_api", info)
634 << ASYNC_MQTT_ADD_VALUE(
address,
this)
635 <<
"acquire_unique_packet_id:" << *
pid;
638 ASYNC_MQTT_LOG(
"mqtt_api", info)
639 << ASYNC_MQTT_ADD_VALUE(
address,
this)
640 <<
"acquire_unique_packet_id:full";
653 auto ret = pid_man_.register_id(
pid);
654 ASYNC_MQTT_LOG(
"mqtt_api", info)
655 << ASYNC_MQTT_ADD_VALUE(
address,
this)
656 <<
"register_packet_id:" <<
pid <<
" result:" <<
ret;
667 ASYNC_MQTT_LOG(
"mqtt_api", info)
668 << ASYNC_MQTT_ADD_VALUE(
address,
this)
669 <<
"release_packet_id:" <<
pid;
681 ASYNC_MQTT_LOG(
"mqtt_api", info)
682 << ASYNC_MQTT_ADD_VALUE(
address,
this)
683 <<
"get_qos2_publish_handled_pids";
684 return qos2_publish_handled_;
695 ASYNC_MQTT_LOG(
"mqtt_api", info)
696 << ASYNC_MQTT_ADD_VALUE(
address,
this)
697 <<
"restore_qos2_publish_handled_pids";
698 qos2_publish_handled_ = force_move(
pids);
711 ASYNC_MQTT_LOG(
"mqtt_api", info)
712 << ASYNC_MQTT_ADD_VALUE(
address,
this)
713 <<
"restore_packets";
714 for (
auto&
pv : pvs) {
717 if (pid_man_.register_id(p.packet_id())) {
718 store_.add(force_move(p));
721 ASYNC_MQTT_LOG(
"mqtt_impl", error)
722 << ASYNC_MQTT_ADD_VALUE(
address,
this)
723 <<
"packet_id:" << p.packet_id()
724 <<
" has already been used. Skip it";
742 ASYNC_MQTT_LOG(
"mqtt_api", info)
743 << ASYNC_MQTT_ADD_VALUE(
address,
this)
744 <<
"get_stored_packets";
745 return store_.get_stored();
755 ASYNC_MQTT_LOG(
"mqtt_api", info)
756 << ASYNC_MQTT_ADD_VALUE(
address,
this)
757 <<
"get_protocol_version:" << protocol_version_;
758 return protocol_version_;
769 ASYNC_MQTT_LOG(
"mqtt_api", info)
770 << ASYNC_MQTT_ADD_VALUE(
address,
this)
771 <<
"is_publish_processing:" <<
pid;
772 return qos2_publish_processing_.find(
pid) != qos2_publish_processing_.end();
784 ASYNC_MQTT_LOG(
"mqtt_api", info)
785 << ASYNC_MQTT_ADD_VALUE(
address,
this)
786 <<
"regulate_for_store:" << packet;
787 if (packet.topic().empty()) {
788 if (
auto ta_opt = get_topic_alias(packet.props())) {
789 auto topic = topic_alias_send_->find_without_touch(*
ta_opt);
790 if (!topic.empty()) {
791 packet.remove_topic_alias_add_topic(allocate_buffer(topic));
796 packet.remove_topic_alias();
800 void cancel_all_timers_for_test() {
802 tim_pingreq_send_->cancel();
803 tim_pingreq_recv_->cancel();
804 tim_pingresp_recv_->cancel();
807 void set_pingreq_send_interval_ms_for_test(std::size_t ms) {
809 pingreq_send_interval_ms_ = ms;
823 template <
typename... Args>
825 protocol_version ver,
827 ): protocol_version_{ver},
828 stream_{stream_type::
create(std::forward<Args>(args)...)}
831 (Role == role::client && ver != protocol_version::undetermined) ||
837 struct acquire_unique_packet_id_impl {
839 optional<packet_id_t> pid_opt = nullopt;
840 enum { dispatch, acquire, complete } state = dispatch;
842 template <
typename Self>
852 a_ep.stream_->raw_strand(),
858 BOOST_ASSERT(ep.in_strand());
859 pid_opt = ep.pid_man_.acquire_unique_id();
861 bind_dispatch(force_move(self));
864 self.complete(pid_opt);
870 struct acquire_unique_packet_id_wait_until_impl {
872 this_type_wp retry_wp = ep.weak_from_this();
873 optional<packet_id_t> pid_opt = nullopt;
874 enum { dispatch, acquire, complete } state = dispatch;
876 template <
typename Self>
879 error_code
const& ec = error_code{}
881 if (retry_wp.expired())
return;
888 a_ep.stream_->raw_strand(),
894 BOOST_ASSERT(ep.in_strand());
897 pid_opt = ep.pid_man_.acquire_unique_id();
900 bind_dispatch(force_move(self));
903 ASYNC_MQTT_LOG(
"mqtt_impl", warning)
904 << ASYNC_MQTT_ADD_VALUE(address, &ep)
905 <<
"packet_id is fully allocated. waiting release";
914 if (ec == errc::operation_canceled) {
915 ep.complete_retry_one();
918 else if (ep.has_retry()) {
919 ASYNC_MQTT_LOG(
"mqtt_impl", warning)
920 << ASYNC_MQTT_ADD_VALUE(address, &ep)
921 <<
"packet_id waiter exists. add the end of waiter queue";
933 BOOST_ASSERT(pid_opt);
934 self.complete(*pid_opt);
940 struct register_packet_id_impl {
944 enum { dispatch, regi, complete } state = dispatch;
946 template <
typename Self>
956 a_ep.stream_->raw_strand(),
962 BOOST_ASSERT(ep.in_strand());
963 result = ep.pid_man_.register_id(packet_id);
965 bind_dispatch(force_move(self));
968 self.complete(result);
974 struct release_packet_id_impl {
977 enum { dispatch, rel, complete } state = dispatch;
979 template <
typename Self>
989 a_ep.stream_->raw_strand(),
995 BOOST_ASSERT(ep.in_strand());
996 ep.release_pid(packet_id);
998 bind_dispatch(force_move(self));
1008 template <
typename Packet>
1012 bool from_queue =
false;
1013 error_code last_ec = error_code{};
1014 enum { dispatch, write, bind, complete } state = dispatch;
1016 template <
typename Self>
1019 error_code
const& ec = error_code{},
1023 ASYNC_MQTT_LOG(
"mqtt_impl", info)
1024 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1025 <<
"send error:" << ec.message();
1028 bind_dispatch(force_move(self));
1038 a_ep.stream_->raw_strand(),
1044 BOOST_ASSERT(ep.in_strand());
1047 std::is_same_v<std::decay_t<Packet>, basic_packet_variant<PacketIdBytes>> ||
1048 std::is_same_v<std::decay_t<Packet>, basic_store_packet_variant<PacketIdBytes>>
1052 [&](
auto& actual_packet) {
1053 if (process_send_packet(self, actual_packet)) {
1055 a_ep.stream_->write_packet(
1056 force_move(actual_packet),
1059 if constexpr(is_connack<std::remove_reference_t<
decltype(actual_packet)>>()) {
1063 if constexpr(Role == role::client) {
1064 a_ep.reset_pingreq_send_timer();
1068 [&](system_error&) {}
1073 if (process_send_packet(self, packet)) {
1075 auto& a_packet{packet};
1076 a_ep.stream_->write_packet(
1077 force_move(a_packet),
1080 if constexpr(is_connack<Packet>()) {
1084 if constexpr(Role == role::client) {
1085 a_ep.reset_pingreq_send_timer();
1091 BOOST_ASSERT(ep.in_strand());
1094 bind_dispatch(force_move(self));
1098 self.complete(last_ec);
1103 template <
typename Self,
typename ActualPacket>
1104 bool process_send_packet(Self& self, ActualPacket& actual_packet) {
1108 (can_send_as_client(Role) && is_client_sendable<std::decay_t<ActualPacket>>()) ||
1109 (can_send_as_server(Role) && is_server_sendable<std::decay_t<ActualPacket>>())
1114 errc::protocol_error,
1115 "packet cannot be send by MQTT protocol"
1121 auto version_check =
1123 if (ep.protocol_version_ == protocol_version::v3_1_1 && is_v3_1_1<ActualPacket>()) {
1126 if (ep.protocol_version_ == protocol_version::v5 && is_v5<ActualPacket>()) {
1133 if constexpr(is_connect<ActualPacket>()) {
1134 if (ep.status_ != connection_status::closed) {
1137 errc::protocol_error,
1138 "connect_packet can only be send on connection_status::closed"
1143 if (!version_check()) {
1146 errc::protocol_error,
1147 "protocol version mismatch"
1153 else if constexpr(is_connack<ActualPacket>()) {
1154 if (ep.status_ != connection_status::connecting) {
1157 errc::protocol_error,
1158 "connack_packet can only be send on connection_status::connecting"
1163 if (!version_check()) {
1166 errc::protocol_error,
1167 "protocol version mismatch"
1173 else if constexpr(std::is_same_v<v5::auth_packet, Packet>) {
1174 if (ep.status_ != connection_status::connected &&
1175 ep.status_ != connection_status::connecting) {
1178 errc::protocol_error,
1179 "auth packet can only be send on connection_status::connecting or status::connected"
1184 if (!version_check()) {
1187 errc::protocol_error,
1188 "protocol version mismatch"
1195 if (ep.status_ != connection_status::connected) {
1196 if constexpr(!is_publish<std::decay_t<ActualPacket>>()) {
1199 errc::protocol_error,
1200 "packet can only be send on connection_status::connected"
1206 if (!version_check()) {
1209 errc::protocol_error,
1210 "protocol version mismatch"
1218 bool topic_alias_validated =
false;
1220 if constexpr(std::is_same_v<v3_1_1::connect_packet, std::decay_t<ActualPacket>>) {
1222 ep.status_ = connection_status::connecting;
1223 auto keep_alive = actual_packet.keep_alive();
1224 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1225 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1227 if (actual_packet.clean_session()) {
1230 ep.need_store_ =
false;
1233 ep.need_store_ =
true;
1235 ep.topic_alias_send_ = nullopt;
1238 if constexpr(std::is_same_v<v5::connect_packet, std::decay_t<ActualPacket>>) {
1240 ep.status_ = connection_status::connecting;
1241 auto keep_alive = actual_packet.keep_alive();
1242 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1243 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1245 if (actual_packet.clean_start()) {
1249 for (
auto const& prop : actual_packet.props()) {
1252 [&](property::topic_alias_maximum
const& p) {
1254 ep.topic_alias_recv_.emplace(p.val());
1257 [&](property::receive_maximum
const& p) {
1258 BOOST_ASSERT(p.val() != 0);
1259 ep.publish_recv_max_ = p.val();
1261 [&](property::maximum_packet_size
const& p) {
1262 BOOST_ASSERT(p.val() != 0);
1263 ep.maximum_packet_size_recv_ = p.val();
1265 [&](property::session_expiry_interval
const& p) {
1267 ep.need_store_ =
true;
1276 if constexpr(std::is_same_v<v3_1_1::connack_packet, std::decay_t<ActualPacket>>) {
1277 if (actual_packet.code() == connect_return_code::accepted) {
1278 ep.status_ = connection_status::connected;
1281 ep.status_ = connection_status::disconnecting;
1285 if constexpr(std::is_same_v<v5::connack_packet, std::decay_t<ActualPacket>>) {
1286 if (actual_packet.code() == connect_reason_code::success) {
1287 ep.status_ = connection_status::connected;
1288 for (
auto const& prop : actual_packet.props()) {
1291 [&](property::topic_alias_maximum
const& p) {
1293 ep.topic_alias_recv_.emplace(p.val());
1296 [&](property::receive_maximum
const& p) {
1297 BOOST_ASSERT(p.val() != 0);
1298 ep.publish_recv_max_ = p.val();
1300 [&](property::maximum_packet_size
const& p) {
1301 BOOST_ASSERT(p.val() != 0);
1302 ep.maximum_packet_size_recv_ = p.val();
1310 ep.status_ = connection_status::disconnecting;
1315 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1316 if (actual_packet.opts().get_qos() == qos::at_least_once ||
1317 actual_packet.opts().get_qos() == qos::exactly_once
1319 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1320 if (ep.need_store_) {
1321 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1322 auto ta_opt = get_topic_alias(actual_packet.props());
1323 if (actual_packet.topic().empty()) {
1324 auto topic_opt = validate_topic_alias(self, ta_opt);
1326 auto packet_id = actual_packet.packet_id();
1327 if (packet_id != 0) {
1328 ep.release_pid(packet_id);
1332 topic_alias_validated =
true;
1333 auto props = actual_packet.props();
1334 auto it = props.cbegin();
1335 auto end = props.cend();
1336 for (; it != end; ++it) {
1337 if (it->id() == property::id::topic_alias) {
1345 actual_packet.packet_id(),
1346 allocate_buffer(*topic_opt),
1347 actual_packet.payload(),
1348 actual_packet.opts(),
1351 if (!validate_maximum_packet_size(self, store_packet)) {
1352 auto packet_id = actual_packet.packet_id();
1353 if (packet_id != 0) {
1354 ep.release_pid(packet_id);
1360 store_packet.set_dup(
true);
1361 ep.store_.add(force_move(store_packet));
1364 auto props = actual_packet.props();
1365 auto it = props.cbegin();
1366 auto end = props.cend();
1367 for (; it != end; ++it) {
1368 if (it->id() == property::id::topic_alias) {
1376 actual_packet.packet_id(),
1377 actual_packet.topic(),
1378 actual_packet.payload(),
1379 actual_packet.opts(),
1382 if (!validate_maximum_packet_size(self, store_packet)) {
1383 auto packet_id = actual_packet.packet_id();
1384 if (packet_id != 0) {
1385 ep.release_pid(packet_id);
1389 store_packet.set_dup(
true);
1390 ep.store_.add(force_move(store_packet));
1394 if (!validate_maximum_packet_size(self, actual_packet)) {
1395 auto packet_id = actual_packet.packet_id();
1396 if (packet_id != 0) {
1397 ep.release_pid(packet_id);
1401 auto store_packet{actual_packet};
1402 store_packet.set_dup(
true);
1403 ep.store_.add(force_move(store_packet));
1406 if (actual_packet.opts().get_qos() == qos::exactly_once) {
1407 ep.qos2_publish_processing_.insert(actual_packet.packet_id());
1408 ep.pid_pubrec_.insert(actual_packet.packet_id());
1411 ep.pid_puback_.insert(actual_packet.packet_id());
1416 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1418 auto ta_opt = get_topic_alias(actual_packet.props());
1419 if (actual_packet.topic().empty()) {
1420 if (!topic_alias_validated &&
1421 !validate_topic_alias(self, ta_opt)) {
1422 auto packet_id = actual_packet.packet_id();
1423 if (packet_id != 0) {
1424 ep.release_pid(packet_id);
1432 if (validate_topic_alias_range(self, *ta_opt)) {
1433 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1434 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1436 << actual_packet.topic() <<
" - " << *ta_opt
1437 <<
" is registered." ;
1438 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), *ta_opt);
1441 auto packet_id = actual_packet.packet_id();
1442 if (packet_id != 0) {
1443 ep.release_pid(packet_id);
1448 else if (ep.auto_map_topic_alias_send_) {
1449 if (ep.topic_alias_send_) {
1450 if (
auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1451 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1452 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1453 <<
"topia alias : " << actual_packet.topic() <<
" - " << *ta_opt
1455 actual_packet.remove_topic_add_topic_alias(*ta_opt);
1458 auto lru_ta = ep.topic_alias_send_->get_lru_alias();
1459 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), lru_ta);
1460 actual_packet.add_topic_alias(lru_ta);
1464 else if (ep.auto_replace_topic_alias_send_) {
1465 if (ep.topic_alias_send_) {
1466 if (
auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1467 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1468 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1469 <<
"topia alias : " << actual_packet.topic() <<
" - " << *ta_opt
1471 actual_packet.remove_topic_add_topic_alias(*ta_opt);
1478 if (!from_queue && ep.enqueue_publish(actual_packet)) {
1482 "publish_packet is enqueued due to receive_maximum for sending"
1489 if constexpr(is_instance_of<v5::basic_puback_packet, std::decay_t<ActualPacket>>::value) {
1490 ep.publish_recv_.erase(actual_packet.packet_id());
1494 if constexpr(is_instance_of<v5::basic_pubrec_packet, std::decay_t<ActualPacket>>::value) {
1495 if (is_error(actual_packet.code())) {
1496 ep.publish_recv_.erase(actual_packet.packet_id());
1497 ep.qos2_publish_handled_.erase(actual_packet.packet_id());
1501 if constexpr(is_pubrel<std::decay_t<ActualPacket>>()) {
1502 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1503 if (ep.need_store_) ep.store_.add(actual_packet);
1504 ep.pid_pubcomp_.insert(actual_packet.packet_id());
1507 if constexpr(is_instance_of<v5::basic_pubcomp_packet, std::decay_t<ActualPacket>>::value) {
1508 ep.publish_recv_.erase(actual_packet.packet_id());
1511 if constexpr(is_subscribe<std::decay_t<ActualPacket>>()) {
1512 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1513 ep.pid_suback_.insert(actual_packet.packet_id());
1516 if constexpr(is_unsubscribe<std::decay_t<ActualPacket>>()) {
1517 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1518 ep.pid_unsuback_.insert(actual_packet.packet_id());
1521 if constexpr(is_pingreq<std::decay_t<ActualPacket>>()) {
1522 ep.reset_pingresp_recv_timer();
1525 if constexpr(is_disconnect<std::decay_t<ActualPacket>>()) {
1526 ep.status_ = connection_status::disconnecting;
1529 if (!validate_maximum_packet_size(self, actual_packet)) {
1530 if constexpr(own_packet_id<std::decay_t<ActualPacket>>()) {
1531 auto packet_id = actual_packet.packet_id();
1532 if (packet_id != 0) {
1533 ep.release_pid(packet_id);
1539 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1540 if (ep.status_ != connection_status::connected) {
1545 "packet is stored but not sent"
1555 template <
typename Self>
1556 bool validate_topic_alias_range(Self& self, topic_alias_t ta) {
1557 if (!ep.topic_alias_send_) {
1561 "topic_alias is set but topic_alias_maximum is 0"
1566 if (ta == 0 || ta > ep.topic_alias_send_->max()) {
1570 "topic_alias is set but out of range"
1578 template <
typename Self>
1579 optional<std::string> validate_topic_alias(Self& self, optional<topic_alias_t> ta_opt) {
1580 BOOST_ASSERT(ep.in_strand());
1585 "topic is empty but topic_alias isn't set"
1591 if (!validate_topic_alias_range(self, *ta_opt)) {
1595 auto topic = ep.topic_alias_send_->find(*ta_opt);
1596 if (topic.empty()) {
1600 "topic is empty but topic_alias is not registered"
1608 template <
typename Self,
typename PacketArg>
1609 bool validate_maximum_packet_size(Self& self, PacketArg
const& packet_arg) {
1610 if (packet_arg.size() > ep.maximum_packet_size_send_) {
1611 ASYNC_MQTT_LOG(
"mqtt_impl", error)
1612 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1613 <<
"packet size over maximum_packet_size for sending";
1617 "packet size is over maximum_packet_size for sending"
1628 optional<filter> fil = nullopt;
1629 std::set<control_packet_type> types = {};
1630 optional<system_error> decided_error = nullopt;
1631 optional<basic_packet_variant<PacketIdBytes>> pv_opt = nullopt;
1632 enum { initiate, disconnect, close, read, complete } state = initiate;
1634 template <
typename Self>
1637 error_code
const& ec = error_code{},
1638 buffer buf = buffer{}
1641 ASYNC_MQTT_LOG(
"mqtt_impl", info)
1642 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1643 <<
"recv error:" << ec.message();
1644 decided_error.emplace(ec);
1645 ep.recv_processing_ =
false;
1650 a_ep.stream_->raw_strand(),
1661 a_ep.stream_->read_packet(force_move(self));
1664 BOOST_ASSERT(ep.in_strand());
1665 if (buf.size() > ep.maximum_packet_size_recv_) {
1667 BOOST_ASSERT(ep.protocol_version_ == protocol_version::v5);
1669 decided_error.emplace(
1672 "too large packet received"
1677 v5::disconnect_packet{
1678 disconnect_reason_code::packet_too_large
1685 auto v = buffer_to_basic_packet_variant<PacketIdBytes>(buf, ep.protocol_version_);
1686 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
1687 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1689 bool call_complete =
true;
1693 [&](v3_1_1::connect_packet& p) {
1695 ep.protocol_version_ = protocol_version::v3_1_1;
1696 ep.status_ = connection_status::connecting;
1697 auto keep_alive = p.keep_alive();
1698 if (keep_alive != 0) {
1699 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1701 if (p.clean_session()) {
1702 ep.need_store_ =
false;
1705 ep.need_store_ =
true;
1708 [&](v5::connect_packet& p) {
1710 ep.protocol_version_ = protocol_version::v5;
1711 ep.status_ = connection_status::connecting;
1712 auto keep_alive = p.keep_alive();
1713 if (keep_alive != 0) {
1714 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1716 for (
auto const& prop : p.props()) {
1719 [&](property::topic_alias_maximum
const& p) {
1721 ep.topic_alias_send_.emplace(p.val());
1724 [&](property::receive_maximum
const& p) {
1725 BOOST_ASSERT(p.val() != 0);
1726 ep.publish_send_max_ = p.val();
1728 [&](property::maximum_packet_size
const& p) {
1729 BOOST_ASSERT(p.val() != 0);
1730 ep.maximum_packet_size_send_ = p.val();
1732 [&](property::session_expiry_interval
const& p) {
1734 ep.need_store_ =
true;
1743 [&](v3_1_1::connack_packet& p) {
1744 if (p.code() == connect_return_code::accepted) {
1745 ep.status_ = connection_status::connected;
1746 if (p.session_present()) {
1755 [&](v5::connack_packet& p) {
1756 if (p.code() == connect_reason_code::success) {
1757 ep.status_ = connection_status::connected;
1759 for (
auto const& prop : p.props()) {
1762 [&](property::topic_alias_maximum
const& p) {
1764 ep.topic_alias_send_.emplace(p.val());
1767 [&](property::receive_maximum
const& p) {
1768 BOOST_ASSERT(p.val() != 0);
1769 ep.publish_send_max_ = p.val();
1771 [&](property::maximum_packet_size
const& p) {
1772 BOOST_ASSERT(p.val() != 0);
1773 ep.maximum_packet_size_send_ = p.val();
1781 if (p.session_present()) {
1790 [&](v3_1_1::basic_publish_packet<PacketIdBytes>& p) {
1791 switch (p.opts().get_qos()) {
1792 case qos::at_least_once: {
1793 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1795 v3_1_1::basic_puback_packet<PacketIdBytes>(p.packet_id()),
1796 [](system_error
const&){}
1800 case qos::exactly_once:
1801 call_complete = process_qos2_publish(self, protocol_version::v3_1_1, p.packet_id());
1807 [&](v5::basic_publish_packet<PacketIdBytes>& p) {
1808 switch (p.opts().get_qos()) {
1809 case qos::at_least_once: {
1810 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1812 decided_error.emplace(
1815 "receive maximum exceeded"
1820 v5::disconnect_packet{
1821 disconnect_reason_code::receive_maximum_exceeded
1827 auto packet_id = p.packet_id();
1828 ep.publish_recv_.insert(packet_id);
1829 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1831 v5::basic_puback_packet<PacketIdBytes>{packet_id},
1832 [](system_error
const&){}
1836 case qos::exactly_once: {
1837 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1839 decided_error.emplace(
1842 "receive maximum exceeded"
1847 v5::disconnect_packet{
1848 disconnect_reason_code::receive_maximum_exceeded
1854 auto packet_id = p.packet_id();
1855 ep.publish_recv_.insert(packet_id);
1856 call_complete = process_qos2_publish(self, protocol_version::v5, packet_id);
1862 if (p.topic().empty()) {
1863 if (
auto ta_opt = get_topic_alias(p.props())) {
1866 *ta_opt > ep.topic_alias_recv_->max()) {
1868 decided_error.emplace(
1871 "topic alias invalid"
1876 v5::disconnect_packet{
1877 disconnect_reason_code::topic_alias_invalid
1884 auto topic = ep.topic_alias_recv_->find(*ta_opt);
1885 if (topic.empty()) {
1886 ASYNC_MQTT_LOG(
"mqtt_impl", error)
1887 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1888 <<
"no matching topic alias: "
1891 decided_error.emplace(
1894 "topic alias invalid"
1899 v5::disconnect_packet{
1900 disconnect_reason_code::topic_alias_invalid
1907 p.add_topic(allocate_buffer(topic));
1912 ASYNC_MQTT_LOG(
"mqtt_impl", error)
1913 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1914 <<
"topic is empty but topic_alias isn't set";
1916 decided_error.emplace(
1919 "topic alias invalid"
1924 v5::disconnect_packet{
1925 disconnect_reason_code::topic_alias_invalid
1933 if (
auto ta_opt = get_topic_alias(p.props())) {
1935 *ta_opt > ep.topic_alias_recv_->max()) {
1937 decided_error.emplace(
1940 "topic alias invalid"
1945 v5::disconnect_packet{
1946 disconnect_reason_code::topic_alias_invalid
1954 if (ep.topic_alias_recv_) {
1955 ep.topic_alias_recv_->insert_or_update(p.topic(), *ta_opt);
1961 [&](v3_1_1::basic_puback_packet<PacketIdBytes>& p) {
1962 auto packet_id = p.packet_id();
1963 if (ep.pid_puback_.erase(packet_id)) {
1964 ep.store_.erase(response_packet::v3_1_1_puback, packet_id);
1965 ep.release_pid(packet_id);
1968 ASYNC_MQTT_LOG(
"mqtt_impl", info)
1969 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1970 <<
"invalid packet_id puback received packet_id:" << packet_id;
1972 decided_error.emplace(
1980 v5::disconnect_packet{
1981 disconnect_reason_code::topic_alias_invalid
1988 [&](v5::basic_puback_packet<PacketIdBytes>& p) {
1989 auto packet_id = p.packet_id();
1990 if (ep.pid_puback_.erase(packet_id)) {
1991 ep.store_.erase(response_packet::v5_puback, packet_id);
1992 ep.release_pid(packet_id);
1993 --ep.publish_send_count_;
1994 send_publish_from_queue();
1997 ASYNC_MQTT_LOG(
"mqtt_impl", info)
1998 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1999 <<
"invalid packet_id puback received packet_id:" << packet_id;
2001 decided_error.emplace(
2009 v5::disconnect_packet{
2010 disconnect_reason_code::topic_alias_invalid
2017 [&](v3_1_1::basic_pubrec_packet<PacketIdBytes>& p) {
2018 auto packet_id = p.packet_id();
2019 if (ep.pid_pubrec_.erase(packet_id)) {
2020 ep.store_.erase(response_packet::v3_1_1_pubrec, packet_id);
2021 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2023 v3_1_1::basic_pubrel_packet<PacketIdBytes>(packet_id),
2024 [](system_error
const&){}
2029 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2030 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2031 <<
"invalid packet_id pubrec received packet_id:" << packet_id;
2033 decided_error.emplace(
2041 v5::disconnect_packet{
2042 disconnect_reason_code::topic_alias_invalid
2049 [&](v5::basic_pubrec_packet<PacketIdBytes>& p) {
2050 auto packet_id = p.packet_id();
2051 if (ep.pid_pubrec_.erase(packet_id)) {
2052 ep.store_.erase(response_packet::v5_pubrec, packet_id);
2053 if (is_error(p.code())) {
2054 ep.release_pid(packet_id);
2055 ep.qos2_publish_processing_.erase(packet_id);
2056 --ep.publish_send_count_;
2057 send_publish_from_queue();
2059 else if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2061 v5::basic_pubrel_packet<PacketIdBytes>(packet_id),
2062 [](system_error
const&){}
2067 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2068 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2069 <<
"invalid packet_id pubrec received packet_id:" << packet_id;
2071 decided_error.emplace(
2079 v5::disconnect_packet{
2080 disconnect_reason_code::topic_alias_invalid
2087 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes>& p) {
2088 auto packet_id = p.packet_id();
2089 ep.qos2_publish_handled_.erase(packet_id);
2090 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2092 v3_1_1::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2093 [](system_error
const&){}
2097 [&](v5::basic_pubrel_packet<PacketIdBytes>& p) {
2098 auto packet_id = p.packet_id();
2099 ep.qos2_publish_handled_.erase(packet_id);
2100 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2102 v5::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2103 [](system_error
const&){}
2107 [&](v3_1_1::basic_pubcomp_packet<PacketIdBytes>& p) {
2108 auto packet_id = p.packet_id();
2109 if (ep.pid_pubcomp_.erase(packet_id)) {
2110 ep.store_.erase(response_packet::v3_1_1_pubcomp, packet_id);
2111 ep.release_pid(packet_id);
2112 ep.qos2_publish_processing_.erase(packet_id);
2113 --ep.publish_send_count_;
2114 send_publish_from_queue();
2117 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2118 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2119 <<
"invalid packet_id pubcomp received packet_id:" << packet_id;
2121 decided_error.emplace(
2129 v5::disconnect_packet{
2130 disconnect_reason_code::topic_alias_invalid
2137 [&](v5::basic_pubcomp_packet<PacketIdBytes>& p) {
2138 auto packet_id = p.packet_id();
2139 if (ep.pid_pubcomp_.erase(packet_id)) {
2140 ep.store_.erase(response_packet::v5_pubcomp, packet_id);
2141 ep.release_pid(packet_id);
2142 ep.qos2_publish_processing_.erase(packet_id);
2145 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2146 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2147 <<
"invalid packet_id pubcomp received packet_id:" << packet_id;
2149 decided_error.emplace(
2157 v5::disconnect_packet{
2158 disconnect_reason_code::topic_alias_invalid
2165 [&](v3_1_1::basic_subscribe_packet<PacketIdBytes>&) {
2167 [&](v5::basic_subscribe_packet<PacketIdBytes>&) {
2169 [&](v3_1_1::basic_suback_packet<PacketIdBytes>& p) {
2170 auto packet_id = p.packet_id();
2171 if (ep.pid_suback_.erase(packet_id)) {
2172 ep.release_pid(packet_id);
2175 [&](v5::basic_suback_packet<PacketIdBytes>& p) {
2176 auto packet_id = p.packet_id();
2177 if (ep.pid_suback_.erase(packet_id)) {
2178 ep.release_pid(packet_id);
2181 [&](v3_1_1::basic_unsubscribe_packet<PacketIdBytes>&) {
2183 [&](v5::basic_unsubscribe_packet<PacketIdBytes>&) {
2185 [&](v3_1_1::basic_unsuback_packet<PacketIdBytes>& p) {
2186 auto packet_id = p.packet_id();
2187 if (ep.pid_unsuback_.erase(packet_id)) {
2188 ep.release_pid(packet_id);
2191 [&](v5::basic_unsuback_packet<PacketIdBytes>& p) {
2192 auto packet_id = p.packet_id();
2193 if (ep.pid_unsuback_.erase(packet_id)) {
2194 ep.release_pid(packet_id);
2197 [&](v3_1_1::pingreq_packet&) {
2198 if constexpr(can_send_as_server(Role)) {
2199 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2201 v3_1_1::pingresp_packet(),
2202 [](system_error
const&){}
2207 [&](v5::pingreq_packet&) {
2208 if constexpr(can_send_as_server(Role)) {
2209 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2211 v5::pingresp_packet(),
2212 [](system_error
const&){}
2217 [&](v3_1_1::pingresp_packet&) {
2218 ep.tim_pingresp_recv_->cancel();
2220 [&](v5::pingresp_packet&) {
2221 ep.tim_pingresp_recv_->cancel();
2223 [&](v3_1_1::disconnect_packet&) {
2224 ep.status_ = connection_status::disconnecting;
2226 [&](v5::disconnect_packet&) {
2227 ep.status_ = connection_status::disconnecting;
2229 [&](v5::auth_packet&) {
2231 [&](system_error&) {
2232 ep.status_ = connection_status::closed;
2236 ep.reset_pingreq_recv_timer();
2237 ep.recv_processing_ =
false;
2241 if (call_complete && !decided_error) {
2242 pv_opt.emplace(force_move(v));
2244 bind_dispatch(force_move(self));
2249 if (
auto type_opt = v.type()) {
2250 if ((*fil == filter::match && types.find(*type_opt) == types.end()) ||
2251 (*fil == filter::except && types.find(*type_opt) != types.end())
2258 a_ep.stream_->raw_strand(),
2280 a_ep.stream_->raw_strand(),
2286 BOOST_ASSERT(decided_error);
2287 ep.recv_processing_ =
false;
2288 ASYNC_MQTT_LOG(
"mqtt_impl", info)
2289 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2290 <<
"recv code triggers close:" << decided_error->what();
2291 pv_opt.emplace(force_move(*decided_error));
2293 bind_dispatch(force_move(self));
2296 BOOST_ASSERT(pv_opt);
2297 self.complete(force_move(*pv_opt));
2300 BOOST_ASSERT(
false);
2305 template <
typename Self>
2310 BOOST_ASSERT(state == disconnect);
2315 a_ep.stream_->raw_strand(),
2321 void send_publish_from_queue() {
2322 BOOST_ASSERT(ep.in_strand());
2323 if (ep.status_ != connection_status::connected)
return;
2324 while (!ep.publish_queue_.empty() &&
2325 ep.publish_send_count_ != ep.publish_send_max_) {
2327 force_move(ep.publish_queue_.front()),
2329 [](system_error
const&){}
2331 ep.publish_queue_.pop_front();
2335 template <
typename Self>
2336 bool process_qos2_publish(
2338 protocol_version ver,
2341 BOOST_ASSERT(ep.in_strand());
2342 bool already_handled =
false;
2343 if (ep.qos2_publish_handled_.find(packet_id) == ep.qos2_publish_handled_.end()) {
2344 ep.qos2_publish_handled_.emplace(packet_id);
2347 already_handled =
true;
2349 if (ep.status_ == connection_status::connected &&
2350 (ep.auto_pub_response_ ||
2355 case protocol_version::v3_1_1:
2357 v3_1_1::basic_pubrec_packet<PacketIdBytes>(packet_id),
2358 [](system_error
const&){}
2361 case protocol_version::v5:
2363 v5::basic_pubrec_packet<PacketIdBytes>(packet_id),
2364 [](system_error
const&){}
2368 BOOST_ASSERT(
false);
2372 if (already_handled) {
2375 a_ep.stream_->read_packet(force_move(self));
2384 enum { dispatch, close, bind, complete } state = dispatch;
2385 this_type_sp life_keeper = ep.shared_from_this();
2387 template <
typename Self>
2390 error_code
const& = error_code{}
2398 a_ep.stream_->raw_strand(),
2404 BOOST_ASSERT(ep.in_strand());
2405 switch (ep.status_) {
2406 case connection_status::connecting:
2407 case connection_status::connected:
2408 case connection_status::disconnecting: {
2409 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2410 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2411 <<
"close initiate status:" <<
static_cast<int>(ep.status_);
2413 ep.status_ = connection_status::closing;
2415 a_ep.stream_->close(force_move(self));
2417 case connection_status::closing: {
2418 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2419 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2420 <<
"already close requested";
2422 auto exe = as::get_associated_executor(self);
2423 a_ep.close_queue_.post(
2430 case connection_status::closed:
2431 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2432 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2433 <<
"already closed";
2435 bind_dispatch(force_move(self));
2438 BOOST_ASSERT(ep.in_strand());
2439 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2440 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2441 <<
"close complete status:" <<
static_cast<int>(ep.status_);
2443 a_ep.tim_pingreq_send_->cancel();
2444 a_ep.tim_pingreq_recv_->cancel();
2445 a_ep.tim_pingresp_recv_->cancel();
2446 a_ep.status_ = connection_status::closed;
2447 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
2448 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2449 <<
"process enqueued close";
2450 a_ep.close_queue_.poll();
2453 bind_dispatch(force_move(self));
2462 struct restore_packets_impl {
2464 std::vector<basic_store_packet_variant<PacketIdBytes>> pvs;
2465 enum { dispatch, restore, complete } state = dispatch;
2467 template <
typename Self>
2477 a_ep.stream_->raw_strand(),
2483 BOOST_ASSERT(ep.in_strand());
2484 ep.restore_packets(force_move(pvs));
2486 bind_dispatch(force_move(self));
2495 struct get_stored_packets_impl {
2496 this_type
const& ep;
2497 std::vector<basic_store_packet_variant<PacketIdBytes>> packets = {};
2498 enum { dispatch, get, complete } state = dispatch;
2500 template <
typename Self>
2510 a_ep.stream_->raw_strand(),
2516 BOOST_ASSERT(ep.in_strand());
2517 packets = ep.get_stored_packets();
2519 bind_dispatch(force_move(self));
2522 self.complete(force_move(packets));
2528 struct regulate_for_store_impl {
2529 this_type
const& ep;
2530 v5::basic_publish_packet<PacketIdBytes> packet;
2531 enum { dispatch, regulate, complete } state = dispatch;
2533 template <
typename Self>
2543 a_ep.stream_->raw_strand(),
2549 BOOST_ASSERT(ep.in_strand());
2550 ep.regulate_for_store(packet);
2552 bind_dispatch(force_move(self));
2555 self.complete(force_move(packet));
2563 template <
typename Packet,
typename CompletionToken>
2568 CompletionToken&& token
2584 bool enqueue_publish(v5::basic_publish_packet<PacketIdBytes>& packet) {
2586 if (packet.opts().get_qos() == qos::at_least_once ||
2587 packet.opts().get_qos() == qos::exactly_once
2589 if (publish_send_count_ == publish_send_max_) {
2590 publish_queue_.push_back(force_move(packet));
2594 ++publish_send_count_;
2595 if (!publish_queue_.empty()) {
2596 publish_queue_.push_back(force_move(packet));
2604 void send_stored() {
2607 [&](basic_store_packet_variant<PacketIdBytes>
const& pv) {
2608 if (pv.size() > maximum_packet_size_send_) {
2609 release_pid(pv.packet_id());
2616 [&](v3_1_1::basic_publish_packet<PacketIdBytes> p) {
2619 [](system_error const&){}
2622 [&](v5::basic_publish_packet<PacketIdBytes> p) {
2623 if (enqueue_publish(p)) return;
2626 [](system_error const&){}
2629 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes> p) {
2632 [](system_error
const&){}
2635 [&](v5::basic_pubrel_packet<PacketIdBytes> p) {
2638 [](system_error
const&){}
2649 BOOST_ASSERT(in_strand());
2650 publish_send_count_ = 0;
2651 publish_queue_.clear();
2652 topic_alias_send_ = nullopt;
2653 topic_alias_recv_ = nullopt;
2654 publish_recv_.clear();
2655 qos2_publish_processing_.clear();
2656 need_store_ =
false;
2657 pid_suback_.clear();
2658 pid_unsuback_.clear();
2659 pid_puback_.clear();
2660 pid_pubrec_.clear();
2661 pid_pubcomp_.clear();
2664 void reset_pingreq_send_timer() {
2665 BOOST_ASSERT(in_strand());
2666 if (pingreq_send_interval_ms_) {
2667 tim_pingreq_send_->cancel();
2668 if (status_ == connection_status::disconnecting ||
2669 status_ == connection_status::closing ||
2670 status_ == connection_status::closed)
return;
2671 tim_pingreq_send_->expires_after(
2672 std::chrono::milliseconds{*pingreq_send_interval_ms_}
2674 tim_pingreq_send_->async_wait(
2675 [
this, wp = std::weak_ptr{tim_pingreq_send_}](error_code
const& ec) {
2677 if (
auto sp = wp.lock()) {
2678 switch (protocol_version_) {
2679 case protocol_version::v3_1_1:
2681 v3_1_1::pingreq_packet(),
2682 [](system_error
const&){}
2685 case protocol_version::v5:
2687 v5::pingreq_packet(),
2688 [](system_error
const&){}
2692 BOOST_ASSERT(
false);
2702 void reset_pingreq_recv_timer() {
2703 BOOST_ASSERT(in_strand());
2704 if (pingreq_recv_timeout_ms_) {
2705 tim_pingreq_recv_->cancel();
2706 if (status_ == connection_status::disconnecting ||
2707 status_ == connection_status::closing ||
2708 status_ == connection_status::closed)
return;
2709 tim_pingreq_recv_->expires_after(
2710 std::chrono::milliseconds{*pingreq_recv_timeout_ms_}
2712 tim_pingreq_recv_->async_wait(
2713 [
this, wp = std::weak_ptr{tim_pingreq_recv_}](error_code
const& ec) {
2715 if (
auto sp = wp.lock()) {
2716 switch (protocol_version_) {
2717 case protocol_version::v3_1_1:
2718 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2719 << ASYNC_MQTT_ADD_VALUE(address,
this)
2720 <<
"pingreq recv timeout. close.";
2725 case protocol_version::v5:
2726 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2727 << ASYNC_MQTT_ADD_VALUE(address,
this)
2728 <<
"pingreq recv timeout. close.";
2730 v5::disconnect_packet{
2731 disconnect_reason_code::keep_alive_timeout,
2734 [
this](system_error
const&){
2742 BOOST_ASSERT(
false);
2752 void reset_pingresp_recv_timer() {
2753 BOOST_ASSERT(in_strand());
2754 if (pingresp_recv_timeout_ms_) {
2755 tim_pingresp_recv_->cancel();
2756 if (status_ == connection_status::disconnecting ||
2757 status_ == connection_status::closing ||
2758 status_ == connection_status::closed)
return;
2759 tim_pingresp_recv_->expires_after(
2760 std::chrono::milliseconds{*pingresp_recv_timeout_ms_}
2762 tim_pingresp_recv_->async_wait(
2763 [
this, wp = std::weak_ptr{tim_pingresp_recv_}](error_code
const& ec) {
2765 if (
auto sp = wp.lock()) {
2766 switch (protocol_version_) {
2767 case protocol_version::v3_1_1:
2768 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2769 << ASYNC_MQTT_ADD_VALUE(address,
this)
2770 <<
"pingresp recv timeout. close.";
2775 case protocol_version::v5:
2776 ASYNC_MQTT_LOG(
"mqtt_impl", error)
2777 << ASYNC_MQTT_ADD_VALUE(address,
this)
2778 <<
"pingresp recv timeout. close.";
2779 if (status_ == connection_status::connected) {
2781 v5::disconnect_packet{
2782 disconnect_reason_code::keep_alive_timeout,
2785 [
this](system_error
const&){
2799 BOOST_ASSERT(
false);
2809 template <
typename CompletionToken>
2811 CompletionToken&& token
2813 auto tim = std::make_shared<as::steady_timer>(stream_->raw_strand());
2814 tim->expires_at(std::chrono::steady_clock::time_point::max());
2817 stream_->raw_strand(),
2818 std::forward<CompletionToken>(token)
2821 tim_retry_acq_pid_queue_.emplace_back(force_move(tim));
2824 void notify_retry_one() {
2825 for (
auto it = tim_retry_acq_pid_queue_.begin();
2826 it != tim_retry_acq_pid_queue_.end();
2829 if (it->cancelled)
continue;
2831 it->cancelled =
true;
2836 void complete_retry_one() {
2837 if (!tim_retry_acq_pid_queue_.empty()) {
2838 tim_retry_acq_pid_queue_.pop_front();
2842 void notify_retry_all() {
2843 tim_retry_acq_pid_queue_.clear();
2846 bool has_retry()
const {
2847 return !tim_retry_acq_pid_queue_.empty();
2850 void clear_pid_man() {
2855 void release_pid(packet_id_t pid) {
2856 pid_man_.release_id(pid);
2862 std::shared_ptr<stream_type> stream_;
2863 packet_id_manager<packet_id_t> pid_man_;
2864 std::set<packet_id_t> pid_suback_;
2865 std::set<packet_id_t> pid_unsuback_;
2866 std::set<packet_id_t> pid_puback_;
2867 std::set<packet_id_t> pid_pubrec_;
2868 std::set<packet_id_t> pid_pubcomp_;
2870 bool need_store_ =
false;
2871 store<PacketIdBytes, as::strand<as::any_io_executor>> store_{stream_->raw_strand()};
2873 bool auto_pub_response_ =
false;
2874 bool auto_ping_response_ =
false;
2876 bool auto_map_topic_alias_send_ =
false;
2877 bool auto_replace_topic_alias_send_ =
false;
2878 optional<topic_alias_send> topic_alias_send_;
2879 optional<topic_alias_recv> topic_alias_recv_;
2881 receive_maximum_t publish_send_max_{receive_maximum_max};
2882 receive_maximum_t publish_recv_max_{receive_maximum_max};
2883 receive_maximum_t publish_send_count_{0};
2885 std::set<packet_id_t> publish_recv_;
2886 std::deque<v5::basic_publish_packet<PacketIdBytes>> publish_queue_;
2888 ioc_queue close_queue_;
2890 std::uint32_t maximum_packet_size_send_{packet_size_no_limit};
2891 std::uint32_t maximum_packet_size_recv_{packet_size_no_limit};
2893 connection_status status_{connection_status::closed};
2895 optional<std::size_t> pingreq_send_interval_ms_;
2896 optional<std::size_t> pingreq_recv_timeout_ms_;
2897 optional<std::size_t> pingresp_recv_timeout_ms_;
2899 std::shared_ptr<as::steady_timer> tim_pingreq_send_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2900 std::shared_ptr<as::steady_timer> tim_pingreq_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2901 std::shared_ptr<as::steady_timer> tim_pingresp_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2903 std::set<packet_id_t> qos2_publish_handled_;
2905 bool recv_processing_ =
false;
2906 std::set<packet_id_t> qos2_publish_processing_;
2908 struct tim_cancelled {
2910 std::shared_ptr<as::steady_timer> tim,
2911 bool cancelled =
false
2912 ):tim{force_move(tim)}, cancelled{cancelled}
2914 std::shared_ptr<as::steady_timer> tim;
2917 std::deque<tim_cancelled> tim_retry_acq_pid_queue_;