async_mqtt 4.1.0
Loading...
Searching...
No Matches
endpoint.hpp
Go to the documentation of this file.
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_ENDPOINT_HPP)
8#define ASYNC_MQTT_ENDPOINT_HPP
9
10#include <set>
11#include <deque>
12#include <atomic>
13
14#include <async_mqtt/packet/packet_variant.hpp>
15#include <async_mqtt/util/value_allocator.hpp>
16#include <async_mqtt/util/make_shared_helper.hpp>
17#include <async_mqtt/stream.hpp>
18#include <async_mqtt/store.hpp>
19#include <async_mqtt/log.hpp>
20#include <async_mqtt/topic_alias_send.hpp>
21#include <async_mqtt/topic_alias_recv.hpp>
22#include <async_mqtt/packet_id_manager.hpp>
24#include <async_mqtt/buffer_to_packet_variant.hpp>
25#include <async_mqtt/packet/packet_traits.hpp>
26
28
29namespace async_mqtt {
30
// Role of an endpoint, encoded as a two-bit mask (bit 0: client, bit 1: server).
// The bits are tested by basic_endpoint::can_send_as_client() and
// basic_endpoint::can_send_as_server().
34enum class role {
35 client = 0b01,
36 server = 0b10,
// both bits set
38 any = 0b11,
39};
40
// Filter mode handed to recv_impl together with a set of control packet types.
// NOTE(review): presumably `match` delivers only the listed types and `except`
// delivers everything but the listed types - confirm against recv_impl.
44enum class filter {
45 match,
46 except
47};
48
// Re-dispatches `self` on its own associated executor.
// Looks up the executor associated with `self`, binds `self` to it and hands
// the bound object to as::dispatch. The *_impl operations below call this
// right before entering their `complete` state.
49template <typename Self>
50auto bind_dispatch(Self&& self) {
51 auto exe = as::get_associated_executor(self);
52 return as::dispatch(
53 as::bind_executor(
54 exe,
55 std::forward<Self>(self)
56 )
57 );
58}
59
66template <role Role, std::size_t PacketIdBytes, typename NextLayer>
67class basic_endpoint : public std::enable_shared_from_this<basic_endpoint<Role, PacketIdBytes, NextLayer>>{
68
// Connection state of the endpoint (stored in status_).
// send_impl::process_send_packet reads it to decide which packets may be
// sent and updates it when CONNECT, CONNACK and DISCONNECT packets are sent.
69 enum class connection_status {
70 connecting,
71 connected,
72 disconnecting,
73 closing,
74 closed
75 };
76
// Returns true when role `r` has the client bit (role::client) set.
77 static constexpr bool can_send_as_client(role r) {
78 return static_cast<int>(r) & static_cast<int>(role::client);
79 }
80
// Returns true when role `r` has the server bit (role::server) set.
81 static constexpr bool can_send_as_server(role r) {
82 return static_cast<int>(r) & static_cast<int>(role::server);
83 }
84
// Scans `props` and returns the value of the first property::topic_alias
// found; all other property types are ignored. If none is found, `ta_opt`
// is returned as it was initialized.
// NOTE(review): the declaration of `ta_opt` (original line 86) is not visible
// in this listing; presumably an empty optional<topic_alias_t> - confirm.
85 static inline optional<topic_alias_t> get_topic_alias(properties const& props) {
87 for (auto const& prop : props) {
88 prop.visit(
89 overload {
90 [&](property::topic_alias const& p) {
91 ta_opt.emplace(p.val());
92 },
93 [](auto const&) {
94 }
95 }
96 );
// stop at the first topic_alias property
97 if (ta_opt) return ta_opt;
98 }
99 return ta_opt;
100 }
101
// Private convenience aliases: shared/weak pointer to this endpoint type and
// the underlying stream type.
// NOTE(review): the `this_type` alias (original line 102) and the template
// argument of stream<> (original line 107) are not visible in this listing.
103 using this_type_sp = std::shared_ptr<this_type>;
104 using this_type_wp = std::weak_ptr<this_type>;
105 using stream_type =
106 stream<
108 >;

// make_shared_helper is a friend so that create() can reach the private
// constructor through it.
110 template <typename T>
111 friend class make_shared_helper;
112
113public:
// Strand type, taken from the underlying stream type.
117 using strand_type = typename stream_type::strand_type;
// Packet identifier width in bytes (the PacketIdBytes template argument).
119 static constexpr std::size_t packet_id_bytes = PacketIdBytes;


// Integer type used for packet identifiers of that width.
125 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
126
// Factory function: constructs the endpoint through make_shared_helper
// (the constructor is private) and returns it as a shared_ptr. `ver` and
// `args` are forwarded to the constructor.
// NOTE(review): the declaration of the `ver` parameter (original line 139) is
// not visible in this listing.
137 template <typename... Args>
138 static std::shared_ptr<this_type> create(
140 Args&&... args
141 ) {
142 return make_shared_helper<this_type>::make_shared(ver, std::forward<Args>(args)...);
143 }
144
// Body of what appears to be the destructor (its declaration line is not
// visible in this listing): only emits a trace log with the object address.
146 ASYNC_MQTT_LOG("mqtt_impl", trace)
147 << ASYNC_MQTT_ADD_VALUE(address, this)
148 << "destroy";
149 }
150
// Endpoints are neither copyable nor movable.
151 basic_endpoint(this_type const&) = delete;
152 basic_endpoint(this_type&&) = delete;
153 this_type& operator=(this_type const&) = delete;
154 this_type& operator=(this_type&&) = delete;
155
// Returns the strand of the underlying stream (const overload).
160 strand_type const& strand() const {
161 return stream_->strand();
162 }

// Second body returning stream_->strand(); presumably the non-const overload
// (its signature line is not visible in this listing).
169 return stream_->strand();
170 }
171
// Forwards to stream_->in_strand(); used throughout this class in
// BOOST_ASSERT checks of code that must run on the endpoint's strand.
176 bool in_strand() const {
177 return stream_->in_strand();
178 }
179
// Two bodies returning stream_->next_layer(); presumably the const and
// non-const overloads (their signature lines are not visible in this listing).
185 return stream_->next_layer();
186 }
192 return stream_->next_layer();
193 }
194
// Returns the lowest layer of the underlying stream (const overload).
199 auto const& lowest_layer() const {
200 return stream_->lowest_layer();
201 }
// Returns the lowest layer of the underlying stream (non-const overload).
206 auto& lowest_layer() {
207 return stream_->lowest_layer();
208 }
209
// Logs the call and stores `val` in auto_pub_response_.
215 void set_auto_pub_response(bool val) {
216 ASYNC_MQTT_LOG("mqtt_api", info)
217 << ASYNC_MQTT_ADD_VALUE(address, this)
218 << "set_auto_pub_response val:" << val;
219 auto_pub_response_ = val;
220 }
221
// Logs the call and stores `val` in auto_ping_response_.
227 void set_auto_ping_response(bool val) {
228 ASYNC_MQTT_LOG("mqtt_api", info)
229 << ASYNC_MQTT_ADD_VALUE(address, this)
230 << "set_auto_ping_response val:" << val;
231 auto_ping_response_ = val;
232 }
233
// Body of the set_auto_map_topic_alias_send setter (name per the log text;
// the signature line is not visible in this listing): logs the call and
// stores `val` in auto_map_topic_alias_send_, which process_send_packet reads
// when sending a v5 PUBLISH that has a topic but no topic alias.
242 ASYNC_MQTT_LOG("mqtt_api", info)
243 << ASYNC_MQTT_ADD_VALUE(address, this)
244 << "set_auto_map_topic_alias_send val:" << val;
245 auto_map_topic_alias_send_ = val;
246 }
247
// Body of the set_auto_replace_topic_alias_send setter (name per the log
// text; the signature line is not visible in this listing): logs the call and
// stores `val` in auto_replace_topic_alias_send_.
256 ASYNC_MQTT_LOG("mqtt_api", info)
257 << ASYNC_MQTT_ADD_VALUE(address, this)
258 << "set_auto_replace_topic_alias_send val:" << val;
259 auto_replace_topic_alias_send_ = val;
260 }
261
// Body of a setter for pingresp_recv_timeout_ms_ (the signature line is not
// visible in this listing): ms == 0 clears the optional, any other value is
// stored in it.
272 if (ms == 0) {
273 pingresp_recv_timeout_ms_ = nullopt;
274 }
275 else {
276 pingresp_recv_timeout_ms_.emplace(ms);
277 }
278 }
279
280 // async functions
281
// Asynchronous acquire_unique_packet_id (name per the log text; the
// function-name/parameter lines and the completion signature are not visible
// in this listing). Logs the call and starts an as::async_compose operation
// driven by acquire_unique_packet_id_impl.
287 template <typename CompletionToken>
288 auto
291 ) {
292 ASYNC_MQTT_LOG("mqtt_api", info)
293 << ASYNC_MQTT_ADD_VALUE(address, this)
294 << "acquire_unique_packet_id";
295 return
296 as::async_compose<
299 >(
300 acquire_unique_packet_id_impl{
301 *this
302 },
303 token
304 );
305 }
306
// Asynchronous acquire_unique_packet_id_wait_until (name per the log text;
// the function-name/parameter lines and the completion signature are not
// visible in this listing). Logs the call and starts an as::async_compose
// operation driven by acquire_unique_packet_id_wait_until_impl, which queues
// itself as a retry waiter when no packet id is available.
313 template <typename CompletionToken>
314 auto
317 ) {
318 ASYNC_MQTT_LOG("mqtt_api", info)
319 << ASYNC_MQTT_ADD_VALUE(address, this)
320 << "acquire_unique_packet_id_wait_until";
321 return
322 as::async_compose<
325 >(
326 acquire_unique_packet_id_wait_until_impl{
327 *this
328 },
329 token
330 );
331 }
332
// Asynchronous register_packet_id (name per the log text; the function-name
// and token parameter lines are not visible in this listing). Logs the call
// and starts an as::async_compose operation driven by register_packet_id_impl.
// The completion signature is void(bool): the result of registering
// `packet_id` with the packet id manager.
339 template <typename CompletionToken>
340 auto
342 packet_id_t packet_id,
344 ) {
345 ASYNC_MQTT_LOG("mqtt_api", info)
346 << ASYNC_MQTT_ADD_VALUE(address, this)
347 << "register_packet_id pid:" << packet_id;
348 return
349 as::async_compose<
351 void(bool)
352 >(
353 register_packet_id_impl{
354 *this,
355 packet_id
356 },
357 token
358 );
359 }
360
// Asynchronous release_packet_id (name per the log text; the function-name
// and token parameter lines are not visible in this listing). Logs the call
// and starts an as::async_compose operation driven by release_packet_id_impl.
// The completion signature is void().
367 template <typename CompletionToken>
368 auto
370 packet_id_t packet_id,
372 ) {
373 ASYNC_MQTT_LOG("mqtt_api", info)
374 << ASYNC_MQTT_ADD_VALUE(address, this)
375 << "release_packet_id pid:" << packet_id;
376 return
377 as::async_compose<
379 void()
380 >(
381 release_packet_id_impl{
382 *this,
383 packet_id
384 },
385 token
386 );
387 }
388
// Public send entry point (name per the log text; the function-name and
// token parameter lines are not visible in this listing).
// For a concrete packet type, a static_assert rejects at compile time any
// packet that Role is not allowed to send. basic_packet_variant is exempt
// from the compile-time check; send_impl::process_send_packet performs the
// same sendable check at run time. The packet is then forwarded to the
// internal send overload with from_queue == false.
396 template <typename Packet, typename CompletionToken>
397 auto
399 Packet packet,
401 ) {
402 ASYNC_MQTT_LOG("mqtt_api", info)
403 << ASYNC_MQTT_ADD_VALUE(address, this)
404 << "send:" << packet;
405 if constexpr(!std::is_same_v<Packet, basic_packet_variant<PacketIdBytes>>) {
406 static_assert(
407 (can_send_as_client(Role) && is_client_sendable<std::decay_t<Packet>>()) ||
408 (can_send_as_server(Role) && is_server_sendable<std::decay_t<Packet>>()),
409 "Packet cannot be send by MQTT protocol"
410 );
411 }

413 return
414 send(
415 force_move(packet),
416 false, // not from queue
417 std::forward<CompletionToken>(token)
418 );
419 }
420
// Unfiltered recv (name per the log text; the function-name/parameter lines
// and the completion signature are not visible in this listing).
// Asserts that no other recv is in progress, marks one as in progress via
// recv_processing_, and starts an as::async_compose operation driven by
// recv_impl with no filter.
427 template <typename CompletionToken>
428 auto
431 ) {
432 ASYNC_MQTT_LOG("mqtt_api", info)
433 << ASYNC_MQTT_ADD_VALUE(address, this)
434 << "recv";
435 BOOST_ASSERT(!recv_processing_);
436 recv_processing_ = true;
437 return
438 as::async_compose<
441 >(
442 recv_impl{
443 *this
444 },
445 token
446 );
447 }
448
// recv overload taking a set of control packet types (the function-name and
// token parameter lines are not visible in this listing).
// Same as the unfiltered recv, but recv_impl is created with filter::match
// and the given `types`.
458 template <typename CompletionToken>
459 auto
461 std::set<control_packet_type> types,
463 ) {
464 ASYNC_MQTT_LOG("mqtt_api", info)
465 << ASYNC_MQTT_ADD_VALUE(address, this)
466 << "recv";
467 BOOST_ASSERT(!recv_processing_);
468 recv_processing_ = true;
469 return
470 as::async_compose<
473 >(
474 recv_impl{
475 *this,
476 filter::match,
477 force_move(types)
478 },
479 token
480 );
481 }
482
// recv overload taking an explicit filter mode and a set of control packet
// types (the function-name and token parameter lines are not visible in this
// listing). Same as the unfiltered recv, but recv_impl is created with `fil`
// and `types`.
493 template <typename CompletionToken>
494 auto
496 filter fil,
497 std::set<control_packet_type> types,
499 ) {
500 ASYNC_MQTT_LOG("mqtt_api", info)
501 << ASYNC_MQTT_ADD_VALUE(address, this)
502 << "recv";
503 BOOST_ASSERT(!recv_processing_);
504 recv_processing_ = true;
505 return
506 as::async_compose<
509 >(
510 recv_impl{
511 *this,
512 fil,
513 force_move(types)
514 },
515 token
516 );
517 }
518
// Asynchronous close (name per the log text; the function-name/parameter
// line is not visible in this listing). Logs the call and starts an
// as::async_compose operation driven by close_impl. The completion signature
// is void().
524 template<typename CompletionToken>
525 auto
527 ASYNC_MQTT_LOG("mqtt_api", info)
528 << ASYNC_MQTT_ADD_VALUE(address, this)
529 << "close";
530 return
531 as::async_compose<
533 void()
534 >(
535 close_impl{
536 *this
537 },
538 token
539 );
540 }
541
// Asynchronous restore_packets (name per the log text; the function-name and
// parameter lines, including the declaration of `pvs`, are not visible in
// this listing). Logs the call and starts an as::async_compose operation
// driven by restore_packets_impl, which takes ownership of `pvs`.
// The completion signature is void().
549 template <typename CompletionToken>
550 auto
554 ) {
555 ASYNC_MQTT_LOG("mqtt_api", info)
556 << ASYNC_MQTT_ADD_VALUE(address, this)
557 << "restore_packets";
558 return
559 as::async_compose<
561 void()
562 >(
563 restore_packets_impl{
564 *this,
565 force_move(pvs)
566 },
567 token
568 );
569 }
570
// Asynchronous get_stored_packets (name per the log text; the function-name/
// parameter lines and the completion signature are not visible in this
// listing). Const member: logs the call and starts an as::async_compose
// operation driven by get_stored_packets_impl.
580 template <typename CompletionToken>
581 auto
584 ) const {
585 ASYNC_MQTT_LOG("mqtt_api", info)
586 << ASYNC_MQTT_ADD_VALUE(address, this)
587 << "get_stored_packets";
588 return
589 as::async_compose<
592 >(
593 get_stored_packets_impl{
594 *this
595 },
596 token
597 );
598 }
599
// Asynchronous regulate_for_store: logs the call and starts an
// as::async_compose operation driven by regulate_for_store_impl, which takes
// ownership of `packet`. The completion signature is
// void(v5::basic_publish_packet<PacketIdBytes>), i.e. a publish packet is
// handed to the handler.
// (The token parameter line is not visible in this listing.)
600 template <typename CompletionToken>
601 auto
602 regulate_for_store(
603 v5::basic_publish_packet<PacketIdBytes> packet,
605 ) const {
606 ASYNC_MQTT_LOG("mqtt_api", info)
607 << ASYNC_MQTT_ADD_VALUE(address, this)
608 << "regulate_for_store:" << packet;
609 return
610 as::async_compose<
612 void(v5::basic_publish_packet<PacketIdBytes>)
613 >(
614 regulate_for_store_impl{
615 *this,
616 force_move(packet)
617 },
618 token
619 );
620 }
621
622 // sync APIs that require working on strand
623
// Body of the synchronous acquire_unique_packet_id (name per the log text;
// the signature line is not visible in this listing).
// Asks the packet id manager for an unused id, logs either the acquired id
// or "full" when none is available, and returns the (possibly empty) result.
631 auto pid = pid_man_.acquire_unique_id();
632 if (pid) {
633 ASYNC_MQTT_LOG("mqtt_api", info)
634 << ASYNC_MQTT_ADD_VALUE(address, this)
635 << "acquire_unique_packet_id:" << *pid;
636 }
637 else {
638 ASYNC_MQTT_LOG("mqtt_api", info)
639 << ASYNC_MQTT_ADD_VALUE(address, this)
640 << "acquire_unique_packet_id:full";
641 }
642 return pid;
643 }
644
// Body of the synchronous register_packet_id (name per the log text; the
// signature line is not visible in this listing): registers `pid` with the
// packet id manager, logs the outcome and returns it.
653 auto ret = pid_man_.register_id(pid);
654 ASYNC_MQTT_LOG("mqtt_api", info)
655 << ASYNC_MQTT_ADD_VALUE(address, this)
656 << "register_packet_id:" << pid << " result:" << ret;
657 return ret;
658 }
659
// Body of the synchronous release_packet_id (name per the log text; the
// signature line is not visible in this listing): logs the call and
// delegates to release_pid(pid).
667 ASYNC_MQTT_LOG("mqtt_api", info)
668 << ASYNC_MQTT_ADD_VALUE(address, this)
669 << "release_packet_id:" << pid;
670 release_pid(pid);
671 }
672
// Logs the call and returns a copy of qos2_publish_handled_.
// (Original line 680 is not visible in this listing.)
679 std::set<packet_id_t> get_qos2_publish_handled_pids() const {
681 ASYNC_MQTT_LOG("mqtt_api", info)
682 << ASYNC_MQTT_ADD_VALUE(address, this)
683 << "get_qos2_publish_handled_pids";
684 return qos2_publish_handled_;
685 }
686
// Logs the call and replaces qos2_publish_handled_ with `pids`
// (counterpart of get_qos2_publish_handled_pids()).
// (Original line 694 is not visible in this listing.)
693 void restore_qos2_publish_handled_pids(std::set<packet_id_t> pids) {
695 ASYNC_MQTT_LOG("mqtt_api", info)
696 << ASYNC_MQTT_ADD_VALUE(address, this)
697 << "restore_qos2_publish_handled_pids";
698 qos2_publish_handled_ = force_move(pids);
699 }
700
// Body of the synchronous restore_packets (name per the log text; the
// signature lines, including the declaration of `pvs`, are not visible in
// this listing).
// For every packet in `pvs`: try to register its packet id; on success the
// packet is moved into the store, otherwise an error is logged and the
// packet is skipped.
709 ) {
711 ASYNC_MQTT_LOG("mqtt_api", info)
712 << ASYNC_MQTT_ADD_VALUE(address, this)
713 << "restore_packets";
714 for (auto& pv : pvs) {
715 pv.visit(
716 [&](auto& p) {
717 if (pid_man_.register_id(p.packet_id())) {
718 store_.add(force_move(p));
719 }
720 else {
721 ASYNC_MQTT_LOG("mqtt_impl", error)
722 << ASYNC_MQTT_ADD_VALUE(address, this)
723 << "packet_id:" << p.packet_id()
724 << " has already been used. Skip it";
725 }
726 }
727 );
728 }
729 }
730
// Synchronous get_stored_packets: logs the call and returns the result of
// store_.get_stored().
// (Original line 741 is not visible in this listing.)
740 std::vector<basic_store_packet_variant<PacketIdBytes>> get_stored_packets() const {
742 ASYNC_MQTT_LOG("mqtt_api", info)
743 << ASYNC_MQTT_ADD_VALUE(address, this)
744 << "get_stored_packets";
745 return store_.get_stored();
746 }
747
// Body of get_protocol_version (name per the log text; the signature line is
// not visible in this listing): logs and returns protocol_version_.
755 ASYNC_MQTT_LOG("mqtt_api", info)
756 << ASYNC_MQTT_ADD_VALUE(address, this)
757 << "get_protocol_version:" << protocol_version_;
758 return protocol_version_;
759 }
760
// Body of is_publish_processing (name per the log text; the signature line
// is not visible in this listing): logs the call and returns whether `pid`
// is present in qos2_publish_processing_.
769 ASYNC_MQTT_LOG("mqtt_api", info)
770 << ASYNC_MQTT_ADD_VALUE(address, this)
771 << "is_publish_processing:" << pid;
772 return qos2_publish_processing_.find(pid) != qos2_publish_processing_.end();
773 }
774
// Synchronous regulate_for_store: rewrites `packet` in place so that it no
// longer relies on a topic alias.
//  - Empty topic: if the packet carries a topic_alias property and the alias
//    resolves to a non-empty topic, the alias is removed and that topic is
//    set. Otherwise the packet is left unchanged.
//  - Non-empty topic: the packet's remove_topic_alias() is called.
// (Original line 783 is not visible in this listing.)
// NOTE(review): topic_alias_send_ is dereferenced without checking that it
// holds a value; elsewhere in this file it can be reset to nullopt. Confirm
// callers guarantee it is set when the topic is empty.
782 void regulate_for_store(v5::basic_publish_packet<PacketIdBytes>& packet) const {
784 ASYNC_MQTT_LOG("mqtt_api", info)
785 << ASYNC_MQTT_ADD_VALUE(address, this)
786 << "regulate_for_store:" << packet;
787 if (packet.topic().empty()) {
788 if (auto ta_opt = get_topic_alias(packet.props())) {
789 auto topic = topic_alias_send_->find_without_touch(*ta_opt);
790 if (!topic.empty()) {
791 packet.remove_topic_alias_add_topic(allocate_buffer(topic));
792 }
793 }
794 }
795 else {
796 packet.remove_topic_alias();
797 }
798 }
799
// Test helper: cancels the three ping-related timers
// (pingreq send, pingreq recv, pingresp recv).
// (Original line 801 is not visible in this listing.)
800 void cancel_all_timers_for_test() {
802 tim_pingreq_send_->cancel();
803 tim_pingreq_recv_->cancel();
804 tim_pingresp_recv_->cancel();
805 }
806
// Test helper: overrides pingreq_send_interval_ms_ with `ms`.
// Must be called on the endpoint's strand (asserted).
807 void set_pingreq_send_interval_ms_for_test(std::size_t ms) {
808 BOOST_ASSERT(in_strand());
809 pingreq_send_interval_ms_ = ms;
810 }
811
812private: // compose operation impl
813
// Private constructor (instances are created through create()).
// Stores the protocol version and creates the underlying stream from `args`.
// The assertion requires a client-role endpoint to be given a determined
// protocol version; other roles may pass protocol_version::undetermined.
823 template <typename... Args>
824 basic_endpoint(
825 protocol_version ver,
826 Args&&... args
827 ): protocol_version_{ver},
828 stream_{stream_type::create(std::forward<Args>(args)...)}
829 {
830 BOOST_ASSERT(
831 (Role == role::client && ver != protocol_version::undetermined) ||
832 Role != role::client
833 );

835 }
836
// Composed-operation state machine behind the asynchronous
// acquire_unique_packet_id.
//   dispatch : re-enter on the endpoint's raw strand
//   acquire  : (on strand) ask the packet id manager for an id, then
//              re-dispatch via bind_dispatch
//   complete : hand the (possibly empty) optional id to the handler
837 struct acquire_unique_packet_id_impl {
838 this_type& ep;
839 optional<packet_id_t> pid_opt = nullopt;
840 enum { dispatch, acquire, complete } state = dispatch;

842 template <typename Self>
843 void operator()(
844 Self& self
845 ) {
846 switch (state) {
847 case dispatch: {
848 state = acquire;
// take a local reference to the endpoint before `self` is moved from
849 auto& a_ep{ep};
850 as::dispatch(
851 as::bind_executor(
852 a_ep.stream_->raw_strand(),
853 force_move(self)
854 )
855 );
856 } break;
857 case acquire: {
858 BOOST_ASSERT(ep.in_strand());
859 pid_opt = ep.pid_man_.acquire_unique_id();
860 state = complete;
861 bind_dispatch(force_move(self));
862 } break;
863 case complete:
864 self.complete(pid_opt);
865 break;
866 }
867 }
868 };
869
// Composed-operation state machine behind the asynchronous
// acquire_unique_packet_id_wait_until. Unlike acquire_unique_packet_id_impl
// it does not complete with an empty result: when no id is available the
// operation is parked with ep.add_retry() and re-entered later.
//   dispatch : re-enter on the endpoint's raw strand
//   acquire  : (on strand) try to acquire, or queue as a waiter
//   complete : hand the acquired id to the handler
870 struct acquire_unique_packet_id_wait_until_impl {
871 this_type& ep;
// weak reference used to detect that the endpoint has been destroyed
872 this_type_wp retry_wp = ep.weak_from_this();
873 optional<packet_id_t> pid_opt = nullopt;
874 enum { dispatch, acquire, complete } state = dispatch;

876 template <typename Self>
877 void operator()(
878 Self& self,
879 error_code const& ec = error_code{}
880 ) {
// endpoint is gone: drop the operation without touching `ep`
881 if (retry_wp.expired()) return;
882 switch (state) {
883 case dispatch: {
884 state = acquire;
885 auto& a_ep{ep};
886 as::dispatch(
887 as::bind_executor(
888 a_ep.stream_->raw_strand(),
889 force_move(self)
890 )
891 );
892 } break;
893 case acquire: {
894 BOOST_ASSERT(ep.in_strand());
// Try to acquire an id. On success move on to `complete`; otherwise park
// this operation as a retry waiter.
895 auto acq_proc =
896 [&] {
897 pid_opt = ep.pid_man_.acquire_unique_id();
898 if (pid_opt) {
899 state = complete;
900 bind_dispatch(force_move(self));
901 }
902 else {
903 ASYNC_MQTT_LOG("mqtt_impl", warning)
904 << ASYNC_MQTT_ADD_VALUE(address, &ep)
905 << "packet_id is fully allocated. waiting release";
906 // infinity timer. cancel is retry trigger.
907 auto& a_ep{ep};
908 a_ep.add_retry(
909 force_move(self)
910 );
911 }
912 };

// operation_canceled means this waiter was woken as a retry (see the
// "cancel is retry trigger" comments): mark one retry done and try again.
914 if (ec == errc::operation_canceled) {
915 ep.complete_retry_one();
916 acq_proc();
917 }
// other waiters already exist: queue behind them instead of acquiring now
918 else if (ep.has_retry()) {
919 ASYNC_MQTT_LOG("mqtt_impl", warning)
920 << ASYNC_MQTT_ADD_VALUE(address, &ep)
921 << "packet_id waiter exists. add the end of waiter queue";
922 // infinity timer. cancel is retry trigger.
923 auto& a_ep{ep};
924 a_ep.add_retry(
925 force_move(self)
926 );
927 }
928 else {
929 acq_proc();
930 }
931 } break;
932 case complete:
933 BOOST_ASSERT(pid_opt);
934 self.complete(*pid_opt);
935 break;
936 }
937 }
938 };
939
// Composed-operation state machine behind the asynchronous
// register_packet_id.
//   dispatch : re-enter on the endpoint's raw strand
//   regi     : (on strand) register packet_id with the packet id manager
//   complete : hand the boolean result to the handler
940 struct register_packet_id_impl {
941 this_type& ep;
942 packet_id_t packet_id;
943 bool result = false;
944 enum { dispatch, regi, complete } state = dispatch;

946 template <typename Self>
947 void operator()(
948 Self& self
949 ) {
950 switch (state) {
951 case dispatch: {
952 state = regi;
953 auto& a_ep{ep};
954 as::dispatch(
955 as::bind_executor(
956 a_ep.stream_->raw_strand(),
957 force_move(self)
958 )
959 );
960 } break;
961 case regi: {
962 BOOST_ASSERT(ep.in_strand());
963 result = ep.pid_man_.register_id(packet_id);
964 state = complete;
965 bind_dispatch(force_move(self));
966 } break;
967 case complete:
968 self.complete(result);
969 break;
970 }
971 }
972 };
973
// Composed-operation state machine behind the asynchronous
// release_packet_id.
//   dispatch : re-enter on the endpoint's raw strand
//   rel      : (on strand) release packet_id via ep.release_pid()
//   complete : invoke the handler with no arguments
974 struct release_packet_id_impl {
975 this_type& ep;
976 packet_id_t packet_id;
977 enum { dispatch, rel, complete } state = dispatch;

979 template <typename Self>
980 void operator()(
981 Self& self
982 ) {
983 switch (state) {
984 case dispatch: {
985 state = rel;
986 auto& a_ep{ep};
987 as::dispatch(
988 as::bind_executor(
989 a_ep.stream_->raw_strand(),
990 force_move(self)
991 )
992 );
993 } break;
994 case rel: {
995 BOOST_ASSERT(ep.in_strand());
996 ep.release_pid(packet_id);
997 state = complete;
998 bind_dispatch(force_move(self));
999 } break;
1000 case complete:
1001 self.complete();
1002 break;
1003 }
1004 }
1005 };
1006
1007
// Composed-operation state machine that sends one packet.
//
// `Packet` is either a concrete packet type or a packet variant
// (basic_packet_variant / basic_store_packet_variant). For a variant the
// contained packet is visited and handled as `ActualPacket`.
//
//   dispatch : re-enter on the endpoint's raw strand
//   write    : (on strand) validate/update endpoint state through
//              process_send_packet(); if it returns true, write the packet
//   bind     : (on strand) write finished, remember the result, re-dispatch
//   complete : invoke the handler with the last error code
//
// process_send_packet() and the validate_* helpers call self.complete()
// themselves whenever they return false / nullopt, so the caller must not
// complete again in that case.
1008 template <typename Packet>
1009 struct send_impl {
1010 this_type& ep;
1011 Packet packet;
// true when the packet is re-sent from the endpoint's publish queue; such
// packets skip the enqueue step in process_send_packet()
1012 bool from_queue = false;
1013 error_code last_ec = error_code{};
1014 enum { dispatch, write, bind, complete } state = dispatch;

1016 template <typename Self>
1017 void operator()(
1018 Self& self,
1019 error_code const& ec = error_code{},
1020 std::size_t /*bytes_transferred*/ = 0
1021 ) {
// any error goes straight to `complete`, carrying that error
1022 if (ec) {
1023 ASYNC_MQTT_LOG("mqtt_impl", info)
1024 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1025 << "send error:" << ec.message();
1026 last_ec = ec;
1027 state = complete;
1028 bind_dispatch(force_move(self));
1029 return;
1030 }

1032 switch (state) {
1033 case dispatch: {
1034 state = write;
// take a local reference to the endpoint before `self` is moved from
1035 auto& a_ep{ep};
1036 as::dispatch(
1037 as::bind_executor(
1038 a_ep.stream_->raw_strand(),
1039 force_move(self)
1040 )
1041 );
1042 } break;
1043 case write: {
1044 BOOST_ASSERT(ep.in_strand());
1045 state = bind;
1046 if constexpr(
1047 std::is_same_v<std::decay_t<Packet>, basic_packet_variant<PacketIdBytes>> ||
1048 std::is_same_v<std::decay_t<Packet>, basic_store_packet_variant<PacketIdBytes>>
1049 ) {
1050 packet.visit(
1051 overload {
1052 [&](auto& actual_packet) {
1053 if (process_send_packet(self, actual_packet)) {
1054 auto& a_ep{ep};
1055 a_ep.stream_->write_packet(
1056 force_move(actual_packet),
1057 force_move(self)
1058 );
1059 if constexpr(is_connack<std::remove_reference_t<decltype(actual_packet)>>()) {
1060 // server send stored packets after connack sent
1061 a_ep.send_stored();
1062 }
1063 if constexpr(Role == role::client) {
1064 a_ep.reset_pingreq_send_timer();
1065 }
1066 }
1067 },
// NOTE(review): when the variant holds a system_error nothing is written and
// self.complete() is not called here, so the handler is not invoked from
// this path - confirm whether it should complete with the stored error.
1068 [&](system_error&) {}
1069 }
1070 );
1071 }
1072 else {
1073 if (process_send_packet(self, packet)) {
1074 auto& a_ep{ep};
1075 auto& a_packet{packet};
1076 a_ep.stream_->write_packet(
1077 force_move(a_packet),
1078 force_move(self)
1079 );
1080 if constexpr(is_connack<Packet>()) {
1081 // server send stored packets after connack sent
1082 a_ep.send_stored();
1083 }
1084 if constexpr(Role == role::client) {
1085 a_ep.reset_pingreq_send_timer();
1086 }
1087 }
1088 }
1089 } break;
1090 case bind: {
1091 BOOST_ASSERT(ep.in_strand());
1092 last_ec = ec;
1093 state = complete;
1094 bind_dispatch(force_move(self));
1095 } break;
1096 case complete:
1097 // out of strand
1098 self.complete(last_ec);
1099 break;
1100 }
1101 }

// Validates `actual_packet` against role, connection status and protocol
// version, then applies the endpoint-side bookkeeping for the packet type
// (connection state, storing of QoS1/2 PUBLISH and PUBREL, topic alias
// handling, packet id tracking, ping timers, maximum packet size).
//
// Returns true when the caller should write the packet to the stream.
// Returns false when the packet must not be written; in that case
// self.complete() has already been called, either with an error or - for a
// PUBLISH that was enqueued or stored while not connected - with
// errc::success.
1103 template <typename Self, typename ActualPacket>
1104 bool process_send_packet(Self& self, ActualPacket& actual_packet) {
1105 // MQTT protocol sendable packet check
1106 if (
1107 !(
1108 (can_send_as_client(Role) && is_client_sendable<std::decay_t<ActualPacket>>()) ||
1109 (can_send_as_server(Role) && is_server_sendable<std::decay_t<ActualPacket>>())
1110 )
1111 ) {
1112 self.complete(
1113 make_error(
1114 errc::protocol_error,
1115 "packet cannot be send by MQTT protocol"
1116 )
1117 );
1118 return false;
1119 }

// true when the packet type belongs to the endpoint's protocol version
1121 auto version_check =
1122 [&] {
1123 if (ep.protocol_version_ == protocol_version::v3_1_1 && is_v3_1_1<ActualPacket>()) {
1124 return true;
1125 }
1126 if (ep.protocol_version_ == protocol_version::v5 && is_v5<ActualPacket>()) {
1127 return true;
1128 }
1129 return false;
1130 };

1132 // connection status check
1133 if constexpr(is_connect<ActualPacket>()) {
1134 if (ep.status_ != connection_status::closed) {
1135 self.complete(
1136 make_error(
1137 errc::protocol_error,
1138 "connect_packet can only be send on connection_status::closed"
1139 )
1140 );
1141 return false;
1142 }
1143 if (!version_check()) {
1144 self.complete(
1145 make_error(
1146 errc::protocol_error,
1147 "protocol version mismatch"
1148 )
1149 );
1150 return false;
1151 }
1152 }
1153 else if constexpr(is_connack<ActualPacket>()) {
1154 if (ep.status_ != connection_status::connecting) {
1155 self.complete(
1156 make_error(
1157 errc::protocol_error,
1158 "connack_packet can only be send on connection_status::connecting"
1159 )
1160 );
1161 return false;
1162 }
1163 if (!version_check()) {
1164 self.complete(
1165 make_error(
1166 errc::protocol_error,
1167 "protocol version mismatch"
1168 )
1169 );
1170 return false;
1171 }
1172 }
// Test the visited packet type, not the outer `Packet`: when AUTH is sent
// inside a packet variant, `Packet` is the variant type, so comparing
// against `Packet` would never select this branch and AUTH would be
// rejected while connecting by the generic branch below.
1173 else if constexpr(std::is_same_v<v5::auth_packet, std::decay_t<ActualPacket>>) {
1174 if (ep.status_ != connection_status::connected &&
1175 ep.status_ != connection_status::connecting) {
1176 self.complete(
1177 make_error(
1178 errc::protocol_error,
1179 "auth packet can only be send on connection_status::connecting or status::connected"
1180 )
1181 );
1182 return false;
1183 }
1184 if (!version_check()) {
1185 self.complete(
1186 make_error(
1187 errc::protocol_error,
1188 "protocol version mismatch"
1189 )
1190 );
1191 return false;
1192 }
1193 }
1194 else {
// PUBLISH is allowed while not connected (handled as offline publish at
// the end of this function); every other packet requires `connected`
1195 if (ep.status_ != connection_status::connected) {
1196 if constexpr(!is_publish<std::decay_t<ActualPacket>>()) {
1197 self.complete(
1198 make_error(
1199 errc::protocol_error,
1200 "packet can only be send on connection_status::connected"
1201 )
1202 );
1203 return false;
1204 }
1205 }
1206 if (!version_check()) {
1207 self.complete(
1208 make_error(
1209 errc::protocol_error,
1210 "protocol version mismatch"
1211 )
1212 );
1213 return false;
1214 }
1215 }

1217 // sending process
// set when the store step below has already validated the topic alias, so
// the "apply topic_alias" step does not validate (and complete) twice
1218 bool topic_alias_validated = false;

// v3.1.1 CONNECT: reset the endpoint, derive the PINGREQ interval from
// keep_alive (seconds to milliseconds) unless one is already set, and decide
// from clean_session whether sent packets need to be stored
1220 if constexpr(std::is_same_v<v3_1_1::connect_packet, std::decay_t<ActualPacket>>) {
1221 ep.initialize();
1222 ep.status_ = connection_status::connecting;
1223 auto keep_alive = actual_packet.keep_alive();
1224 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1225 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1226 }
1227 if (actual_packet.clean_session()) {
1228 ep.clear_pid_man();
1229 ep.store_.clear();
1230 ep.need_store_ = false;
1231 }
1232 else {
1233 ep.need_store_ = true;
1234 }
1235 ep.topic_alias_send_ = nullopt;
1236 }

// v5 CONNECT: same reset and keep_alive handling; receive-side limits are
// taken from the properties this endpoint announces, and storing is enabled
// by a non-zero session_expiry_interval
1238 if constexpr(std::is_same_v<v5::connect_packet, std::decay_t<ActualPacket>>) {
1239 ep.initialize();
1240 ep.status_ = connection_status::connecting;
1241 auto keep_alive = actual_packet.keep_alive();
1242 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1243 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1244 }
1245 if (actual_packet.clean_start()) {
1246 ep.clear_pid_man();
1247 ep.store_.clear();
1248 }
1249 for (auto const& prop : actual_packet.props()) {
1250 prop.visit(
1251 overload {
1252 [&](property::topic_alias_maximum const& p) {
1253 if (p.val() != 0) {
1254 ep.topic_alias_recv_.emplace(p.val());
1255 }
1256 },
1257 [&](property::receive_maximum const& p) {
1258 BOOST_ASSERT(p.val() != 0);
1259 ep.publish_recv_max_ = p.val();
1260 },
1261 [&](property::maximum_packet_size const& p) {
1262 BOOST_ASSERT(p.val() != 0);
1263 ep.maximum_packet_size_recv_ = p.val();
1264 },
1265 [&](property::session_expiry_interval const& p) {
1266 if (p.val() != 0) {
1267 ep.need_store_ = true;
1268 }
1269 },
1270 [](auto const&){}
1271 }
1272 );
1273 }
1274 }

// v3.1.1 CONNACK: accepted -> connected, anything else -> disconnecting
1276 if constexpr(std::is_same_v<v3_1_1::connack_packet, std::decay_t<ActualPacket>>) {
1277 if (actual_packet.code() == connect_return_code::accepted) {
1278 ep.status_ = connection_status::connected;
1279 }
1280 else {
1281 ep.status_ = connection_status::disconnecting;
1282 }
1283 }

// v5 CONNACK: on success also record the receive-side limits announced in
// the properties
1285 if constexpr(std::is_same_v<v5::connack_packet, std::decay_t<ActualPacket>>) {
1286 if (actual_packet.code() == connect_reason_code::success) {
1287 ep.status_ = connection_status::connected;
1288 for (auto const& prop : actual_packet.props()) {
1289 prop.visit(
1290 overload {
1291 [&](property::topic_alias_maximum const& p) {
1292 if (p.val() != 0) {
1293 ep.topic_alias_recv_.emplace(p.val());
1294 }
1295 },
1296 [&](property::receive_maximum const& p) {
1297 BOOST_ASSERT(p.val() != 0);
1298 ep.publish_recv_max_ = p.val();
1299 },
1300 [&](property::maximum_packet_size const& p) {
1301 BOOST_ASSERT(p.val() != 0);
1302 ep.maximum_packet_size_recv_ = p.val();
1303 },
1304 [](auto const&){}
1305 }
1306 );
1307 }
1308 }
1309 else {
1310 ep.status_ = connection_status::disconnecting;
1311 }
1312 }

1314 // store publish/pubrel packet
1315 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1316 if (actual_packet.opts().get_qos() == qos::at_least_once ||
1317 actual_packet.opts().get_qos() == qos::exactly_once
1318 ) {
1319 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1320 if (ep.need_store_) {
1321 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1322 auto ta_opt = get_topic_alias(actual_packet.props());
// empty topic: resolve the topic from the alias so that the stored copy
// carries the topic itself instead of the alias
1323 if (actual_packet.topic().empty()) {
1324 auto topic_opt = validate_topic_alias(self, ta_opt);
1325 if (!topic_opt) {
1326 auto packet_id = actual_packet.packet_id();
1327 if (packet_id != 0) {
1328 ep.release_pid(packet_id);
1329 }
1330 return false;
1331 }
1332 topic_alias_validated = true;
// copy the properties without the topic_alias property
1333 auto props = actual_packet.props();
1334 auto it = props.cbegin();
1335 auto end = props.cend();
1336 for (; it != end; ++it) {
1337 if (it->id() == property::id::topic_alias) {
1338 props.erase(it);
1339 break;
1340 }
1341 }

1343 auto store_packet =
1344 ActualPacket(
1345 actual_packet.packet_id(),
1346 allocate_buffer(*topic_opt),
1347 actual_packet.payload(),
1348 actual_packet.opts(),
1349 force_move(props)
1350 );
1351 if (!validate_maximum_packet_size(self, store_packet)) {
1352 auto packet_id = actual_packet.packet_id();
1353 if (packet_id != 0) {
1354 ep.release_pid(packet_id);
1355 }
1356 return false;
1357 }
1358 // add new packet that doesn't have topic_aliass to store
1359 // the original packet still use topic alias to send
1360 store_packet.set_dup(true);
1361 ep.store_.add(force_move(store_packet));
1362 }
1363 else {
// topic present: the stored copy keeps the topic and drops the
// topic_alias property, if any
1364 auto props = actual_packet.props();
1365 auto it = props.cbegin();
1366 auto end = props.cend();
1367 for (; it != end; ++it) {
1368 if (it->id() == property::id::topic_alias) {
1369 props.erase(it);
1370 break;
1371 }
1372 }

1374 auto store_packet =
1375 ActualPacket(
1376 actual_packet.packet_id(),
1377 actual_packet.topic(),
1378 actual_packet.payload(),
1379 actual_packet.opts(),
1380 force_move(props)
1381 );
1382 if (!validate_maximum_packet_size(self, store_packet)) {
1383 auto packet_id = actual_packet.packet_id();
1384 if (packet_id != 0) {
1385 ep.release_pid(packet_id);
1386 }
1387 return false;
1388 }
1389 store_packet.set_dup(true);
1390 ep.store_.add(force_move(store_packet));
1391 }
1392 }
1393 else {
// non-v5 PUBLISH: store a copy with the dup flag set
1394 if (!validate_maximum_packet_size(self, actual_packet)) {
1395 auto packet_id = actual_packet.packet_id();
1396 if (packet_id != 0) {
1397 ep.release_pid(packet_id);
1398 }
1399 return false;
1400 }
1401 auto store_packet{actual_packet};
1402 store_packet.set_dup(true);
1403 ep.store_.add(force_move(store_packet));
1404 }
1405 }
// remember which response packet this packet id is waiting for
1406 if (actual_packet.opts().get_qos() == qos::exactly_once) {
1407 ep.qos2_publish_processing_.insert(actual_packet.packet_id());
1408 ep.pid_pubrec_.insert(actual_packet.packet_id());
1409 }
1410 else {
1411 ep.pid_puback_.insert(actual_packet.packet_id());
1412 }
1413 }
1414 }

1416 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1417 // apply topic_alias
1418 auto ta_opt = get_topic_alias(actual_packet.props());
1419 if (actual_packet.topic().empty()) {
1420 if (!topic_alias_validated &&
1421 !validate_topic_alias(self, ta_opt)) {
1422 auto packet_id = actual_packet.packet_id();
1423 if (packet_id != 0) {
1424 ep.release_pid(packet_id);
1425 }
1426 return false;
1427 }
1428 // use topic_alias set by user
1429 }
1430 else {
// topic and alias both given: register (or update) the mapping
1431 if (ta_opt) {
1432 if (validate_topic_alias_range(self, *ta_opt)) {
1433 ASYNC_MQTT_LOG("mqtt_impl", trace)
1434 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1435 << "topia alias : "
1436 << actual_packet.topic() << " - " << *ta_opt
1437 << " is registered." ;
1438 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), *ta_opt);
1439 }
1440 else {
1441 auto packet_id = actual_packet.packet_id();
1442 if (packet_id != 0) {
1443 ep.release_pid(packet_id);
1444 }
1445 return false;
1446 }
1447 }
// auto map: reuse an existing alias for the topic, or remap the alias
// returned by get_lru_alias() to this topic
1448 else if (ep.auto_map_topic_alias_send_) {
1449 if (ep.topic_alias_send_) {
1450 if (auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1451 ASYNC_MQTT_LOG("mqtt_impl", trace)
1452 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1453 << "topia alias : " << actual_packet.topic() << " - " << *ta_opt
1454 << " is found." ;
1455 actual_packet.remove_topic_add_topic_alias(*ta_opt);
1456 }
1457 else {
1458 auto lru_ta = ep.topic_alias_send_->get_lru_alias();
1459 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), lru_ta); // remap topic alias
1460 actual_packet.add_topic_alias(lru_ta);
1461 }
1462 }
1463 }
// auto replace: only substitute an alias that is already registered
1464 else if (ep.auto_replace_topic_alias_send_) {
1465 if (ep.topic_alias_send_) {
1466 if (auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1467 ASYNC_MQTT_LOG("mqtt_impl", trace)
1468 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1469 << "topia alias : " << actual_packet.topic() << " - " << *ta_opt
1470 << " is found." ;
1471 actual_packet.remove_topic_add_topic_alias(*ta_opt);
1472 }
1473 }
1474 }
1475 }

1477 // receive_maximum for sending
1478 if (!from_queue && ep.enqueue_publish(actual_packet)) {
1479 self.complete(
1480 make_error(
1481 errc::success,
1482 "publish_packet is enqueued due to receive_maximum for sending"
1483 )
1484 );
1485 return false;
1486 }
1487 }

1489 if constexpr(is_instance_of<v5::basic_puback_packet, std::decay_t<ActualPacket>>::value) {
1490 ep.publish_recv_.erase(actual_packet.packet_id());
1491 }


1494 if constexpr(is_instance_of<v5::basic_pubrec_packet, std::decay_t<ActualPacket>>::value) {
1495 if (is_error(actual_packet.code())) {
1496 ep.publish_recv_.erase(actual_packet.packet_id());
1497 ep.qos2_publish_handled_.erase(actual_packet.packet_id());
1498 }
1499 }

1501 if constexpr(is_pubrel<std::decay_t<ActualPacket>>()) {
1502 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1503 if (ep.need_store_) ep.store_.add(actual_packet);
1504 ep.pid_pubcomp_.insert(actual_packet.packet_id());
1505 }

1507 if constexpr(is_instance_of<v5::basic_pubcomp_packet, std::decay_t<ActualPacket>>::value) {
1508 ep.publish_recv_.erase(actual_packet.packet_id());
1509 }

1511 if constexpr(is_subscribe<std::decay_t<ActualPacket>>()) {
1512 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1513 ep.pid_suback_.insert(actual_packet.packet_id());
1514 }

1516 if constexpr(is_unsubscribe<std::decay_t<ActualPacket>>()) {
1517 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1518 ep.pid_unsuback_.insert(actual_packet.packet_id());
1519 }

1521 if constexpr(is_pingreq<std::decay_t<ActualPacket>>()) {
1522 ep.reset_pingresp_recv_timer();
1523 }

1525 if constexpr(is_disconnect<std::decay_t<ActualPacket>>()) {
1526 ep.status_ = connection_status::disconnecting;
1527 }

// final size check on the packet as it will be sent; on failure the packet
// id (if the packet type has one) is released
1529 if (!validate_maximum_packet_size(self, actual_packet)) {
1530 if constexpr(own_packet_id<std::decay_t<ActualPacket>>()) {
1531 auto packet_id = actual_packet.packet_id();
1532 if (packet_id != 0) {
1533 ep.release_pid(packet_id);
1534 }
1535 }
1536 return false;
1537 }

1539 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1540 if (ep.status_ != connection_status::connected) {
1541 // offline publish
1542 self.complete(
1543 make_error(
1544 errc::success,
1545 "packet is stored but not sent"
1546 )
1547 );
1548 return false;
1549 }
1550 }

1552 return true;
1553 }

// Checks that topic alias sending is enabled and that `ta` lies in
// [1, topic_alias_send_->max()]. On failure completes `self` with
// errc::bad_message and returns false.
1555 template <typename Self>
1556 bool validate_topic_alias_range(Self& self, topic_alias_t ta) {
1557 if (!ep.topic_alias_send_) {
1558 self.complete(
1559 make_error(
1560 errc::bad_message,
1561 "topic_alias is set but topic_alias_maximum is 0"
1562 )
1563 );
1564 return false;
1565 }
1566 if (ta == 0 || ta > ep.topic_alias_send_->max()) {
1567 self.complete(
1568 make_error(
1569 errc::bad_message,
1570 "topic_alias is set but out of range"
1571 )
1572 );
1573 return false;
1574 }
1575 return true;
1576 }

// Resolves a topic alias to its registered topic for a PUBLISH whose topic
// is empty. Returns the topic on success. If the alias is missing, out of
// range or not registered, completes `self` with errc::bad_message and
// returns nullopt.
1578 template <typename Self>
1579 optional<std::string> validate_topic_alias(Self& self, optional<topic_alias_t> ta_opt) {
1580 BOOST_ASSERT(ep.in_strand());
1581 if (!ta_opt) {
1582 self.complete(
1583 make_error(
1584 errc::bad_message,
1585 "topic is empty but topic_alias isn't set"
1586 )
1587 );
1588 return nullopt;
1589 }

1591 if (!validate_topic_alias_range(self, *ta_opt)) {
1592 return nullopt;
1593 }

1595 auto topic = ep.topic_alias_send_->find(*ta_opt);
1596 if (topic.empty()) {
1597 self.complete(
1598 make_error(
1599 errc::bad_message,
1600 "topic is empty but topic_alias is not registered"
1601 )
1602 );
1603 return nullopt;
1604 }
1605 return topic;
1606 }

// Returns true when packet_arg.size() does not exceed
// ep.maximum_packet_size_send_. Otherwise logs an error, completes `self`
// with errc::bad_message and returns false.
1608 template <typename Self, typename PacketArg>
1609 bool validate_maximum_packet_size(Self& self, PacketArg const& packet_arg) {
1610 if (packet_arg.size() > ep.maximum_packet_size_send_) {
1611 ASYNC_MQTT_LOG("mqtt_impl", error)
1612 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1613 << "packet size over maximum_packet_size for sending";
1614 self.complete(
1615 make_error(
1616 errc::bad_message,
1617 "packet size is over maximum_packet_size for sending"
1618 )
1619 );
1620 return false;
1621 }
1622 return true;
1623 }
1624 };
1625
1626 struct recv_impl {
1627 this_type& ep;
1628 optional<filter> fil = nullopt;
1629 std::set<control_packet_type> types = {};
1630 optional<system_error> decided_error = nullopt;
1631 optional<basic_packet_variant<PacketIdBytes>> pv_opt = nullopt;
1632 enum { initiate, disconnect, close, read, complete } state = initiate;
1633
1634 template <typename Self>
1635 void operator()(
1636 Self& self,
1637 error_code const& ec = error_code{},
1638 buffer buf = buffer{}
1639 ) {
1640 if (ec) {
1641 ASYNC_MQTT_LOG("mqtt_impl", info)
1642 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1643 << "recv error:" << ec.message();
1644 decided_error.emplace(ec);
1645 ep.recv_processing_ = false;
1646 state = close;
1647 auto& a_ep{ep};
1648 a_ep.close(
1649 as::bind_executor(
1650 a_ep.stream_->raw_strand(),
1651 force_move(self)
1652 )
1653 );
1654 return;
1655 }
1656
1657 switch (state) {
1658 case initiate: {
1659 state = read;
1660 auto& a_ep{ep};
1661 a_ep.stream_->read_packet(force_move(self));
1662 } break;
1663 case read: {
1664 BOOST_ASSERT(ep.in_strand());
1665 if (buf.size() > ep.maximum_packet_size_recv_) {
1666 // on v3.1.1 maximum_packet_size_recv_ is initialized as packet_size_no_limit
1667 BOOST_ASSERT(ep.protocol_version_ == protocol_version::v5);
1668 state = disconnect;
1669 decided_error.emplace(
1670 make_error(
1671 errc::bad_message,
1672 "too large packet received"
1673 )
1674 );
1675 auto& a_ep{ep};
1676 a_ep.send(
1677 v5::disconnect_packet{
1678 disconnect_reason_code::packet_too_large
1679 },
1680 force_move(self)
1681 );
1682 return;
1683 }
1684
1685 auto v = buffer_to_basic_packet_variant<PacketIdBytes>(buf, ep.protocol_version_);
1686 ASYNC_MQTT_LOG("mqtt_impl", trace)
1687 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1688 << "recv:" << v;
1689 bool call_complete = true;
1690 v.visit(
1691 // do internal protocol processing
1692 overload {
1693 [&](v3_1_1::connect_packet& p) {
1694 ep.initialize();
1695 ep.protocol_version_ = protocol_version::v3_1_1;
1696 ep.status_ = connection_status::connecting;
1697 auto keep_alive = p.keep_alive();
1698 if (keep_alive != 0) {
1699 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1700 }
1701 if (p.clean_session()) {
1702 ep.need_store_ = false;
1703 }
1704 else {
1705 ep.need_store_ = true;
1706 }
1707 },
1708 [&](v5::connect_packet& p) {
1709 ep.initialize();
1710 ep.protocol_version_ = protocol_version::v5;
1711 ep.status_ = connection_status::connecting;
1712 auto keep_alive = p.keep_alive();
1713 if (keep_alive != 0) {
1714 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1715 }
1716 for (auto const& prop : p.props()) {
1717 prop.visit(
1718 overload {
1719 [&](property::topic_alias_maximum const& p) {
1720 if (p.val() > 0) {
1721 ep.topic_alias_send_.emplace(p.val());
1722 }
1723 },
1724 [&](property::receive_maximum const& p) {
1725 BOOST_ASSERT(p.val() != 0);
1726 ep.publish_send_max_ = p.val();
1727 },
1728 [&](property::maximum_packet_size const& p) {
1729 BOOST_ASSERT(p.val() != 0);
1730 ep.maximum_packet_size_send_ = p.val();
1731 },
1732 [&](property::session_expiry_interval const& p) {
1733 if (p.val() != 0) {
1734 ep.need_store_ = true;
1735 }
1736 },
1737 [](auto const&) {
1738 }
1739 }
1740 );
1741 }
1742 },
1743 [&](v3_1_1::connack_packet& p) {
1744 if (p.code() == connect_return_code::accepted) {
1745 ep.status_ = connection_status::connected;
1746 if (p.session_present()) {
1747 ep.send_stored();
1748 }
1749 else {
1750 ep.clear_pid_man();
1751 ep.store_.clear();
1752 }
1753 }
1754 },
1755 [&](v5::connack_packet& p) {
1756 if (p.code() == connect_reason_code::success) {
1757 ep.status_ = connection_status::connected;
1758
1759 for (auto const& prop : p.props()) {
1760 prop.visit(
1761 overload {
1762 [&](property::topic_alias_maximum const& p) {
1763 if (p.val() > 0) {
1764 ep.topic_alias_send_.emplace(p.val());
1765 }
1766 },
1767 [&](property::receive_maximum const& p) {
1768 BOOST_ASSERT(p.val() != 0);
1769 ep.publish_send_max_ = p.val();
1770 },
1771 [&](property::maximum_packet_size const& p) {
1772 BOOST_ASSERT(p.val() != 0);
1773 ep.maximum_packet_size_send_ = p.val();
1774 },
1775 [](auto const&) {
1776 }
1777 }
1778 );
1779 }
1780
1781 if (p.session_present()) {
1782 ep.send_stored();
1783 }
1784 else {
1785 ep.clear_pid_man();
1786 ep.store_.clear();
1787 }
1788 }
1789 },
1790 [&](v3_1_1::basic_publish_packet<PacketIdBytes>& p) {
1791 switch (p.opts().get_qos()) {
1792 case qos::at_least_once: {
1793 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1794 ep.send(
1795 v3_1_1::basic_puback_packet<PacketIdBytes>(p.packet_id()),
1796 [](system_error const&){}
1797 );
1798 }
1799 } break;
1800 case qos::exactly_once:
1801 call_complete = process_qos2_publish(self, protocol_version::v3_1_1, p.packet_id());
1802 break;
1803 default:
1804 break;
1805 }
1806 },
1807 [&](v5::basic_publish_packet<PacketIdBytes>& p) {
1808 switch (p.opts().get_qos()) {
1809 case qos::at_least_once: {
1810 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1811 state = disconnect;
1812 decided_error.emplace(
1813 make_error(
1814 errc::bad_message,
1815 "receive maximum exceeded"
1816 )
1817 );
1818 auto& a_ep{ep};
1819 a_ep.send(
1820 v5::disconnect_packet{
1821 disconnect_reason_code::receive_maximum_exceeded
1822 },
1823 force_move(self)
1824 );
1825 return;
1826 }
1827 auto packet_id = p.packet_id();
1828 ep.publish_recv_.insert(packet_id);
1829 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1830 ep.send(
1831 v5::basic_puback_packet<PacketIdBytes>{packet_id},
1832 [](system_error const&){}
1833 );
1834 }
1835 } break;
1836 case qos::exactly_once: {
1837 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1838 state = disconnect;
1839 decided_error.emplace(
1840 make_error(
1841 errc::bad_message,
1842 "receive maximum exceeded"
1843 )
1844 );
1845 auto& a_ep{ep};
1846 a_ep.send(
1847 v5::disconnect_packet{
1848 disconnect_reason_code::receive_maximum_exceeded
1849 },
1850 force_move(self)
1851 );
1852 return;
1853 }
1854 auto packet_id = p.packet_id();
1855 ep.publish_recv_.insert(packet_id);
1856 call_complete = process_qos2_publish(self, protocol_version::v5, packet_id);
1857 } break;
1858 default:
1859 break;
1860 }
1861
1862 if (p.topic().empty()) {
1863 if (auto ta_opt = get_topic_alias(p.props())) {
1864 // extract topic from topic_alias
1865 if (*ta_opt == 0 ||
1866 *ta_opt > ep.topic_alias_recv_->max()) {
1867 state = disconnect;
1868 decided_error.emplace(
1869 make_error(
1870 errc::bad_message,
1871 "topic alias invalid"
1872 )
1873 );
1874 auto& a_ep{ep};
1875 a_ep.send(
1876 v5::disconnect_packet{
1877 disconnect_reason_code::topic_alias_invalid
1878 },
1879 force_move(self)
1880 );
1881 return;
1882 }
1883 else {
1884 auto topic = ep.topic_alias_recv_->find(*ta_opt);
1885 if (topic.empty()) {
1886 ASYNC_MQTT_LOG("mqtt_impl", error)
1887 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1888 << "no matching topic alias: "
1889 << *ta_opt;
1890 state = disconnect;
1891 decided_error.emplace(
1892 make_error(
1893 errc::bad_message,
1894 "topic alias invalid"
1895 )
1896 );
1897 auto& a_ep{ep};
1898 a_ep.send(
1899 v5::disconnect_packet{
1900 disconnect_reason_code::topic_alias_invalid
1901 },
1902 force_move(self)
1903 );
1904 return;
1905 }
1906 else {
1907 p.add_topic(allocate_buffer(topic));
1908 }
1909 }
1910 }
1911 else {
1912 ASYNC_MQTT_LOG("mqtt_impl", error)
1913 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1914 << "topic is empty but topic_alias isn't set";
1915 state = disconnect;
1916 decided_error.emplace(
1917 make_error(
1918 errc::bad_message,
1919 "topic alias invalid"
1920 )
1921 );
1922 auto& a_ep{ep};
1923 a_ep.send(
1924 v5::disconnect_packet{
1925 disconnect_reason_code::topic_alias_invalid
1926 },
1927 force_move(self)
1928 );
1929 return;
1930 }
1931 }
1932 else {
1933 if (auto ta_opt = get_topic_alias(p.props())) {
1934 if (*ta_opt == 0 ||
1935 *ta_opt > ep.topic_alias_recv_->max()) {
1936 state = disconnect;
1937 decided_error.emplace(
1938 make_error(
1939 errc::bad_message,
1940 "topic alias invalid"
1941 )
1942 );
1943 auto& a_ep{ep};
1944 a_ep.send(
1945 v5::disconnect_packet{
1946 disconnect_reason_code::topic_alias_invalid
1947 },
1948 force_move(self)
1949 );
1950 return;
1951 }
1952 else {
1953 // extract topic from topic_alias
1954 if (ep.topic_alias_recv_) {
1955 ep.topic_alias_recv_->insert_or_update(p.topic(), *ta_opt);
1956 }
1957 }
1958 }
1959 }
1960 },
1961 [&](v3_1_1::basic_puback_packet<PacketIdBytes>& p) {
1962 auto packet_id = p.packet_id();
1963 if (ep.pid_puback_.erase(packet_id)) {
1964 ep.store_.erase(response_packet::v3_1_1_puback, packet_id);
1965 ep.release_pid(packet_id);
1966 }
1967 else {
1968 ASYNC_MQTT_LOG("mqtt_impl", info)
1969 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1970 << "invalid packet_id puback received packet_id:" << packet_id;
1971 state = disconnect;
1972 decided_error.emplace(
1973 make_error(
1974 errc::bad_message,
1975 "packet_id invalid"
1976 )
1977 );
1978 auto& a_ep{ep};
1979 a_ep.send(
1980 v5::disconnect_packet{
1981 disconnect_reason_code::topic_alias_invalid
1982 },
1983 force_move(self)
1984 );
1985 return;
1986 }
1987 },
1988 [&](v5::basic_puback_packet<PacketIdBytes>& p) {
1989 auto packet_id = p.packet_id();
1990 if (ep.pid_puback_.erase(packet_id)) {
1991 ep.store_.erase(response_packet::v5_puback, packet_id);
1992 ep.release_pid(packet_id);
1993 --ep.publish_send_count_;
1994 send_publish_from_queue();
1995 }
1996 else {
1997 ASYNC_MQTT_LOG("mqtt_impl", info)
1998 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1999 << "invalid packet_id puback received packet_id:" << packet_id;
2000 state = disconnect;
2001 decided_error.emplace(
2002 make_error(
2003 errc::bad_message,
2004 "packet_id invalid"
2005 )
2006 );
2007 auto& a_ep{ep};
2008 a_ep.send(
2009 v5::disconnect_packet{
2010 disconnect_reason_code::topic_alias_invalid
2011 },
2012 force_move(self)
2013 );
2014 return;
2015 }
2016 },
2017 [&](v3_1_1::basic_pubrec_packet<PacketIdBytes>& p) {
2018 auto packet_id = p.packet_id();
2019 if (ep.pid_pubrec_.erase(packet_id)) {
2020 ep.store_.erase(response_packet::v3_1_1_pubrec, packet_id);
2021 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2022 ep.send(
2023 v3_1_1::basic_pubrel_packet<PacketIdBytes>(packet_id),
2024 [](system_error const&){}
2025 );
2026 }
2027 }
2028 else {
2029 ASYNC_MQTT_LOG("mqtt_impl", info)
2030 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2031 << "invalid packet_id pubrec received packet_id:" << packet_id;
2032 state = disconnect;
2033 decided_error.emplace(
2034 make_error(
2035 errc::bad_message,
2036 "packet_id invalid"
2037 )
2038 );
2039 auto& a_ep{ep};
2040 a_ep.send(
2041 v5::disconnect_packet{
2042 disconnect_reason_code::topic_alias_invalid
2043 },
2044 force_move(self)
2045 );
2046 return;
2047 }
2048 },
2049 [&](v5::basic_pubrec_packet<PacketIdBytes>& p) {
2050 auto packet_id = p.packet_id();
2051 if (ep.pid_pubrec_.erase(packet_id)) {
2052 ep.store_.erase(response_packet::v5_pubrec, packet_id);
2053 if (is_error(p.code())) {
2054 ep.release_pid(packet_id);
2055 ep.qos2_publish_processing_.erase(packet_id);
2056 --ep.publish_send_count_;
2057 send_publish_from_queue();
2058 }
2059 else if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2060 ep.send(
2061 v5::basic_pubrel_packet<PacketIdBytes>(packet_id),
2062 [](system_error const&){}
2063 );
2064 }
2065 }
2066 else {
2067 ASYNC_MQTT_LOG("mqtt_impl", info)
2068 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2069 << "invalid packet_id pubrec received packet_id:" << packet_id;
2070 state = disconnect;
2071 decided_error.emplace(
2072 make_error(
2073 errc::bad_message,
2074 "packet_id invalid"
2075 )
2076 );
2077 auto& a_ep{ep};
2078 a_ep.send(
2079 v5::disconnect_packet{
2080 disconnect_reason_code::topic_alias_invalid
2081 },
2082 force_move(self)
2083 );
2084 return;
2085 }
2086 },
2087 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes>& p) {
2088 auto packet_id = p.packet_id();
2089 ep.qos2_publish_handled_.erase(packet_id);
2090 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2091 ep.send(
2092 v3_1_1::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2093 [](system_error const&){}
2094 );
2095 }
2096 },
2097 [&](v5::basic_pubrel_packet<PacketIdBytes>& p) {
2098 auto packet_id = p.packet_id();
2099 ep.qos2_publish_handled_.erase(packet_id);
2100 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2101 ep.send(
2102 v5::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2103 [](system_error const&){}
2104 );
2105 }
2106 },
2107 [&](v3_1_1::basic_pubcomp_packet<PacketIdBytes>& p) {
2108 auto packet_id = p.packet_id();
2109 if (ep.pid_pubcomp_.erase(packet_id)) {
2110 ep.store_.erase(response_packet::v3_1_1_pubcomp, packet_id);
2111 ep.release_pid(packet_id);
2112 ep.qos2_publish_processing_.erase(packet_id);
2113 --ep.publish_send_count_;
2114 send_publish_from_queue();
2115 }
2116 else {
2117 ASYNC_MQTT_LOG("mqtt_impl", info)
2118 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2119 << "invalid packet_id pubcomp received packet_id:" << packet_id;
2120 state = disconnect;
2121 decided_error.emplace(
2122 make_error(
2123 errc::bad_message,
2124 "packet_id invalid"
2125 )
2126 );
2127 auto& a_ep{ep};
2128 a_ep.send(
2129 v5::disconnect_packet{
2130 disconnect_reason_code::topic_alias_invalid
2131 },
2132 force_move(self)
2133 );
2134 return;
2135 }
2136 },
2137 [&](v5::basic_pubcomp_packet<PacketIdBytes>& p) {
2138 auto packet_id = p.packet_id();
2139 if (ep.pid_pubcomp_.erase(packet_id)) {
2140 ep.store_.erase(response_packet::v5_pubcomp, packet_id);
2141 ep.release_pid(packet_id);
2142 ep.qos2_publish_processing_.erase(packet_id);
2143 }
2144 else {
2145 ASYNC_MQTT_LOG("mqtt_impl", info)
2146 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2147 << "invalid packet_id pubcomp received packet_id:" << packet_id;
2148 state = disconnect;
2149 decided_error.emplace(
2150 make_error(
2151 errc::bad_message,
2152 "packet_id invalid"
2153 )
2154 );
2155 auto& a_ep{ep};
2156 a_ep.send(
2157 v5::disconnect_packet{
2158 disconnect_reason_code::topic_alias_invalid
2159 },
2160 force_move(self)
2161 );
2162 return;
2163 }
2164 },
2165 [&](v3_1_1::basic_subscribe_packet<PacketIdBytes>&) {
2166 },
2167 [&](v5::basic_subscribe_packet<PacketIdBytes>&) {
2168 },
2169 [&](v3_1_1::basic_suback_packet<PacketIdBytes>& p) {
2170 auto packet_id = p.packet_id();
2171 if (ep.pid_suback_.erase(packet_id)) {
2172 ep.release_pid(packet_id);
2173 }
2174 },
2175 [&](v5::basic_suback_packet<PacketIdBytes>& p) {
2176 auto packet_id = p.packet_id();
2177 if (ep.pid_suback_.erase(packet_id)) {
2178 ep.release_pid(packet_id);
2179 }
2180 },
2181 [&](v3_1_1::basic_unsubscribe_packet<PacketIdBytes>&) {
2182 },
2183 [&](v5::basic_unsubscribe_packet<PacketIdBytes>&) {
2184 },
2185 [&](v3_1_1::basic_unsuback_packet<PacketIdBytes>& p) {
2186 auto packet_id = p.packet_id();
2187 if (ep.pid_unsuback_.erase(packet_id)) {
2188 ep.release_pid(packet_id);
2189 }
2190 },
2191 [&](v5::basic_unsuback_packet<PacketIdBytes>& p) {
2192 auto packet_id = p.packet_id();
2193 if (ep.pid_unsuback_.erase(packet_id)) {
2194 ep.release_pid(packet_id);
2195 }
2196 },
2197 [&](v3_1_1::pingreq_packet&) {
2198 if constexpr(can_send_as_server(Role)) {
2199 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2200 ep.send(
2201 v3_1_1::pingresp_packet(),
2202 [](system_error const&){}
2203 );
2204 }
2205 }
2206 },
2207 [&](v5::pingreq_packet&) {
2208 if constexpr(can_send_as_server(Role)) {
2209 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2210 ep.send(
2211 v5::pingresp_packet(),
2212 [](system_error const&){}
2213 );
2214 }
2215 }
2216 },
2217 [&](v3_1_1::pingresp_packet&) {
2218 ep.tim_pingresp_recv_->cancel();
2219 },
2220 [&](v5::pingresp_packet&) {
2221 ep.tim_pingresp_recv_->cancel();
2222 },
2223 [&](v3_1_1::disconnect_packet&) {
2224 ep.status_ = connection_status::disconnecting;
2225 },
2226 [&](v5::disconnect_packet&) {
2227 ep.status_ = connection_status::disconnecting;
2228 },
2229 [&](v5::auth_packet&) {
2230 },
2231 [&](system_error&) {
2232 ep.status_ = connection_status::closed;
2233 }
2234 }
2235 );
2236 ep.reset_pingreq_recv_timer();
2237 ep.recv_processing_ = false;
2238
2239 auto try_to_comp =
2240 [&] {
2241 if (call_complete && !decided_error) {
2242 pv_opt.emplace(force_move(v));
2243 state = complete;
2244 bind_dispatch(force_move(self));
2245 }
2246 };
2247
2248 if (fil) {
2249 if (auto type_opt = v.type()) {
2250 if ((*fil == filter::match && types.find(*type_opt) == types.end()) ||
2251 (*fil == filter::except && types.find(*type_opt) != types.end())
2252 ) {
2253 // read the next packet
2254 state = initiate;
2255 auto& a_ep{ep};
2256 as::dispatch(
2257 as::bind_executor(
2258 a_ep.stream_->raw_strand(),
2259 force_move(self)
2260 )
2261 );
2262 }
2263 else {
2264 try_to_comp();
2265 }
2266 }
2267 else {
2268 try_to_comp();
2269 }
2270 }
2271 else {
2272 try_to_comp();
2273 }
2274 } break;
2275 case disconnect: {
2276 state = close;
2277 auto& a_ep{ep};
2278 a_ep.close(
2279 as::bind_executor(
2280 a_ep.stream_->raw_strand(),
2281 force_move(self)
2282 )
2283 );
2284 } break;
2285 case close: {
2286 BOOST_ASSERT(decided_error);
2287 ep.recv_processing_ = false;
2288 ASYNC_MQTT_LOG("mqtt_impl", info)
2289 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2290 << "recv code triggers close:" << decided_error->what();
2291 pv_opt.emplace(force_move(*decided_error));
2292 state = complete;
2293 bind_dispatch(force_move(self));
2294 } break;
2295 case complete:
2296 BOOST_ASSERT(pv_opt);
2297 self.complete(force_move(*pv_opt));
2298 break;
2299 default:
2300 BOOST_ASSERT(false);
2301 break;
2302 }
2303 }
2304
2305 template <typename Self>
2306 void operator()(
2307 Self& self,
2308 system_error const&
2309 ) {
2310 BOOST_ASSERT(state == disconnect);
2311 state = close;
2312 auto& a_ep{ep};
2313 a_ep.close(
2314 as::bind_executor(
2315 a_ep.stream_->raw_strand(),
2316 force_move(self)
2317 )
2318 );
2319 }
2320
2321 void send_publish_from_queue() {
2322 BOOST_ASSERT(ep.in_strand());
2323 if (ep.status_ != connection_status::connected) return;
2324 while (!ep.publish_queue_.empty() &&
2325 ep.publish_send_count_ != ep.publish_send_max_) {
2326 ep.send(
2327 force_move(ep.publish_queue_.front()),
2328 true, // from queue
2329 [](system_error const&){}
2330 );
2331 ep.publish_queue_.pop_front();
2332 }
2333 }
2334
2335 template <typename Self>
2336 bool process_qos2_publish(
2337 Self& self,
2338 protocol_version ver,
2339 packet_id_t packet_id
2340 ) {
2341 BOOST_ASSERT(ep.in_strand());
2342 bool already_handled = false;
2343 if (ep.qos2_publish_handled_.find(packet_id) == ep.qos2_publish_handled_.end()) {
2344 ep.qos2_publish_handled_.emplace(packet_id);
2345 }
2346 else {
2347 already_handled = true;
2348 }
2349 if (ep.status_ == connection_status::connected &&
2350 (ep.auto_pub_response_ ||
2351 already_handled) // already_handled is true only if the pubrec packet
2352 ) { // corresponding to the publish packet has already
2353 // been sent as success
2354 switch (ver) {
2355 case protocol_version::v3_1_1:
2356 ep.send(
2357 v3_1_1::basic_pubrec_packet<PacketIdBytes>(packet_id),
2358 [](system_error const&){}
2359 );
2360 break;
2361 case protocol_version::v5:
2362 ep.send(
2363 v5::basic_pubrec_packet<PacketIdBytes>(packet_id),
2364 [](system_error const&){}
2365 );
2366 break;
2367 default:
2368 BOOST_ASSERT(false);
2369 break;
2370 }
2371 }
2372 if (already_handled) {
2373 // do the next read
2374 auto& a_ep{ep};
2375 a_ep.stream_->read_packet(force_move(self));
2376 return false;
2377 }
2378 return true;
2379 }
2380 };
2381
2382 struct close_impl {
2383 this_type& ep;
2384 enum { dispatch, close, bind, complete } state = dispatch;
2385 this_type_sp life_keeper = ep.shared_from_this();
2386
2387 template <typename Self>
2388 void operator()(
2389 Self& self,
2390 error_code const& = error_code{}
2391 ) {
2392 switch (state) {
2393 case dispatch: {
2394 state = close;
2395 auto& a_ep{ep};
2396 as::dispatch(
2397 as::bind_executor(
2398 a_ep.stream_->raw_strand(),
2399 force_move(self)
2400 )
2401 );
2402 } break;
2403 case close:
2404 BOOST_ASSERT(ep.in_strand());
2405 switch (ep.status_) {
2406 case connection_status::connecting:
2407 case connection_status::connected:
2408 case connection_status::disconnecting: {
2409 ASYNC_MQTT_LOG("mqtt_impl", trace)
2410 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2411 << "close initiate status:" << static_cast<int>(ep.status_);
2412 state = bind;
2413 ep.status_ = connection_status::closing;
2414 auto& a_ep{ep};
2415 a_ep.stream_->close(force_move(self));
2416 } break;
2417 case connection_status::closing: {
2418 ASYNC_MQTT_LOG("mqtt_impl", trace)
2419 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2420 << "already close requested";
2421 auto& a_ep{ep};
2422 auto exe = as::get_associated_executor(self);
2423 a_ep.close_queue_.post(
2424 as::bind_executor(
2425 exe,
2426 force_move(self)
2427 )
2428 );
2429 } break;
2430 case connection_status::closed:
2431 ASYNC_MQTT_LOG("mqtt_impl", trace)
2432 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2433 << "already closed";
2434 state = complete;
2435 bind_dispatch(force_move(self));
2436 } break;
2437 case bind: {
2438 BOOST_ASSERT(ep.in_strand());
2439 ASYNC_MQTT_LOG("mqtt_impl", trace)
2440 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2441 << "close complete status:" << static_cast<int>(ep.status_);
2442 auto& a_ep{ep};
2443 a_ep.tim_pingreq_send_->cancel();
2444 a_ep.tim_pingreq_recv_->cancel();
2445 a_ep.tim_pingresp_recv_->cancel();
2446 a_ep.status_ = connection_status::closed;
2447 ASYNC_MQTT_LOG("mqtt_impl", trace)
2448 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2449 << "process enqueued close";
2450 a_ep.close_queue_.poll();
2451 state = complete;
2452 state = complete;
2453 bind_dispatch(force_move(self));
2454 } break;
2455 case complete:
2456 self.complete();
2457 break;
2458 }
2459 }
2460 };
2461
2462 struct restore_packets_impl {
2463 this_type& ep;
2464 std::vector<basic_store_packet_variant<PacketIdBytes>> pvs;
2465 enum { dispatch, restore, complete } state = dispatch;
2466
2467 template <typename Self>
2468 void operator()(
2469 Self& self
2470 ) {
2471 switch (state) {
2472 case dispatch: {
2473 state = restore;
2474 auto& a_ep{ep};
2475 as::dispatch(
2476 as::bind_executor(
2477 a_ep.stream_->raw_strand(),
2478 force_move(self)
2479 )
2480 );
2481 } break;
2482 case restore: {
2483 BOOST_ASSERT(ep.in_strand());
2484 ep.restore_packets(force_move(pvs));
2485 state = complete;
2486 bind_dispatch(force_move(self));
2487 } break;
2488 case complete:
2489 self.complete();
2490 break;
2491 }
2492 }
2493 };
2494
2495 struct get_stored_packets_impl {
2496 this_type const& ep;
2497 std::vector<basic_store_packet_variant<PacketIdBytes>> packets = {};
2498 enum { dispatch, get, complete } state = dispatch;
2499
2500 template <typename Self>
2501 void operator()(
2502 Self& self
2503 ) {
2504 switch (state) {
2505 case dispatch: {
2506 state = get;
2507 auto& a_ep{ep};
2508 as::dispatch(
2509 as::bind_executor(
2510 a_ep.stream_->raw_strand(),
2511 force_move(self)
2512 )
2513 );
2514 } break;
2515 case get: {
2516 BOOST_ASSERT(ep.in_strand());
2517 packets = ep.get_stored_packets();
2518 state = complete;
2519 bind_dispatch(force_move(self));
2520 } break;
2521 case complete:
2522 self.complete(force_move(packets));
2523 break;
2524 }
2525 }
2526 };
2527
2528 struct regulate_for_store_impl {
2529 this_type const& ep;
2530 v5::basic_publish_packet<PacketIdBytes> packet;
2531 enum { dispatch, regulate, complete } state = dispatch;
2532
2533 template <typename Self>
2534 void operator()(
2535 Self& self
2536 ) {
2537 switch (state) {
2538 case dispatch: {
2539 state = regulate;
2540 auto& a_ep{ep};
2541 as::dispatch(
2542 as::bind_executor(
2543 a_ep.stream_->raw_strand(),
2544 force_move(self)
2545 )
2546 );
2547 } break;
2548 case regulate: {
2549 BOOST_ASSERT(ep.in_strand());
2550 ep.regulate_for_store(packet);
2551 state = complete;
2552 bind_dispatch(force_move(self));
2553 } break;
2554 case complete:
2555 self.complete(force_move(packet));
2556 break;
2557 }
2558 }
2559 };
2560
2561private:
2562
2563 template <typename Packet, typename CompletionToken>
2564 auto
2565 send(
2566 Packet packet,
2567 bool from_queue,
2568 CompletionToken&& token
2569 ) {
2570 return
2571 as::async_compose<
2572 CompletionToken,
2573 void(system_error)
2574 >(
2575 send_impl<Packet>{
2576 *this,
2577 force_move(packet),
2578 from_queue
2579 },
2580 token
2581 );
2582 }
2583
2584 bool enqueue_publish(v5::basic_publish_packet<PacketIdBytes>& packet) {
2585 BOOST_ASSERT(in_strand());
2586 if (packet.opts().get_qos() == qos::at_least_once ||
2587 packet.opts().get_qos() == qos::exactly_once
2588 ) {
2589 if (publish_send_count_ == publish_send_max_) {
2590 publish_queue_.push_back(force_move(packet));
2591 return true;
2592 }
2593 else {
2594 ++publish_send_count_;
2595 if (!publish_queue_.empty()) {
2596 publish_queue_.push_back(force_move(packet));
2597 return true;
2598 }
2599 }
2600 }
2601 return false;
2602 }
2603
2604 void send_stored() {
2605 BOOST_ASSERT(in_strand());
2606 store_.for_each(
2607 [&](basic_store_packet_variant<PacketIdBytes> const& pv) {
2608 if (pv.size() > maximum_packet_size_send_) {
2609 release_pid(pv.packet_id());
2610 return false;
2611 }
2612 pv.visit(
2613 // copy packet because the stored packets need to be preserved
2614 // until receiving puback/pubrec/pubcomp
2615 overload {
2616 [&](v3_1_1::basic_publish_packet<PacketIdBytes> p) {
2617 send(
2618 p,
2619 [](system_error const&){}
2620 );
2621 },
2622 [&](v5::basic_publish_packet<PacketIdBytes> p) {
2623 if (enqueue_publish(p)) return;
2624 send(
2625 p,
2626 [](system_error const&){}
2627 );
2628 },
2629 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes> p) {
2630 send(
2631 p,
2632 [](system_error const&){}
2633 );
2634 },
2635 [&](v5::basic_pubrel_packet<PacketIdBytes> p) {
2636 send(
2637 p,
2638 [](system_error const&){}
2639 );
2640 }
2641 }
2642 );
2643 return true;
2644 }
2645 );
2646 }
2647
2648 void initialize() {
2649 BOOST_ASSERT(in_strand());
2650 publish_send_count_ = 0;
2651 publish_queue_.clear();
2652 topic_alias_send_ = nullopt;
2653 topic_alias_recv_ = nullopt;
2654 publish_recv_.clear();
2655 qos2_publish_processing_.clear();
2656 need_store_ = false;
2657 pid_suback_.clear();
2658 pid_unsuback_.clear();
2659 pid_puback_.clear();
2660 pid_pubrec_.clear();
2661 pid_pubcomp_.clear();
2662 }
2663
2664 void reset_pingreq_send_timer() {
2665 BOOST_ASSERT(in_strand());
2666 if (pingreq_send_interval_ms_) {
2667 tim_pingreq_send_->cancel();
2668 if (status_ == connection_status::disconnecting ||
2669 status_ == connection_status::closing ||
2670 status_ == connection_status::closed) return;
2671 tim_pingreq_send_->expires_after(
2672 std::chrono::milliseconds{*pingreq_send_interval_ms_}
2673 );
2674 tim_pingreq_send_->async_wait(
2675 [this, wp = std::weak_ptr{tim_pingreq_send_}](error_code const& ec) {
2676 if (!ec) {
2677 if (auto sp = wp.lock()) {
2678 switch (protocol_version_) {
2679 case protocol_version::v3_1_1:
2680 send(
2681 v3_1_1::pingreq_packet(),
2682 [](system_error const&){}
2683 );
2684 break;
2685 case protocol_version::v5:
2686 send(
2687 v5::pingreq_packet(),
2688 [](system_error const&){}
2689 );
2690 break;
2691 default:
2692 BOOST_ASSERT(false);
2693 break;
2694 }
2695 }
2696 }
2697 }
2698 );
2699 }
2700 }
2701
2702 void reset_pingreq_recv_timer() {
2703 BOOST_ASSERT(in_strand());
2704 if (pingreq_recv_timeout_ms_) {
2705 tim_pingreq_recv_->cancel();
2706 if (status_ == connection_status::disconnecting ||
2707 status_ == connection_status::closing ||
2708 status_ == connection_status::closed) return;
2709 tim_pingreq_recv_->expires_after(
2710 std::chrono::milliseconds{*pingreq_recv_timeout_ms_}
2711 );
2712 tim_pingreq_recv_->async_wait(
2713 [this, wp = std::weak_ptr{tim_pingreq_recv_}](error_code const& ec) {
2714 if (!ec) {
2715 if (auto sp = wp.lock()) {
2716 switch (protocol_version_) {
2717 case protocol_version::v3_1_1:
2718 ASYNC_MQTT_LOG("mqtt_impl", error)
2719 << ASYNC_MQTT_ADD_VALUE(address, this)
2720 << "pingreq recv timeout. close.";
2721 close(
2722 []{}
2723 );
2724 break;
2725 case protocol_version::v5:
2726 ASYNC_MQTT_LOG("mqtt_impl", error)
2727 << ASYNC_MQTT_ADD_VALUE(address, this)
2728 << "pingreq recv timeout. close.";
2729 send(
2730 v5::disconnect_packet{
2731 disconnect_reason_code::keep_alive_timeout,
2732 properties{}
2733 },
2734 [this](system_error const&){
2735 close(
2736 []{}
2737 );
2738 }
2739 );
2740 break;
2741 default:
2742 BOOST_ASSERT(false);
2743 break;
2744 }
2745 }
2746 }
2747 }
2748 );
2749 }
2750 }
2751
2752 void reset_pingresp_recv_timer() {
2753 BOOST_ASSERT(in_strand());
2754 if (pingresp_recv_timeout_ms_) {
2755 tim_pingresp_recv_->cancel();
2756 if (status_ == connection_status::disconnecting ||
2757 status_ == connection_status::closing ||
2758 status_ == connection_status::closed) return;
2759 tim_pingresp_recv_->expires_after(
2760 std::chrono::milliseconds{*pingresp_recv_timeout_ms_}
2761 );
2762 tim_pingresp_recv_->async_wait(
2763 [this, wp = std::weak_ptr{tim_pingresp_recv_}](error_code const& ec) {
2764 if (!ec) {
2765 if (auto sp = wp.lock()) {
2766 switch (protocol_version_) {
2767 case protocol_version::v3_1_1:
2768 ASYNC_MQTT_LOG("mqtt_impl", error)
2769 << ASYNC_MQTT_ADD_VALUE(address, this)
2770 << "pingresp recv timeout. close.";
2771 close(
2772 []{}
2773 );
2774 break;
2775 case protocol_version::v5:
2776 ASYNC_MQTT_LOG("mqtt_impl", error)
2777 << ASYNC_MQTT_ADD_VALUE(address, this)
2778 << "pingresp recv timeout. close.";
2779 if (status_ == connection_status::connected) {
2780 send(
2781 v5::disconnect_packet{
2782 disconnect_reason_code::keep_alive_timeout,
2783 properties{}
2784 },
2785 [this](system_error const&){
2786 close(
2787 []{}
2788 );
2789 }
2790 );
2791 }
2792 else {
2793 close(
2794 []{}
2795 );
2796 }
2797 break;
2798 default:
2799 BOOST_ASSERT(false);
2800 break;
2801 }
2802 }
2803 }
2804 }
2805 );
2806 }
2807 }
2808
2809 template <typename CompletionToken>
2810 void add_retry(
2811 CompletionToken&& token
2812 ) {
2813 auto tim = std::make_shared<as::steady_timer>(stream_->raw_strand());
2814 tim->expires_at(std::chrono::steady_clock::time_point::max());
2815 tim->async_wait(
2816 as::bind_executor(
2817 stream_->raw_strand(),
2818 std::forward<CompletionToken>(token)
2819 )
2820 );
2821 tim_retry_acq_pid_queue_.emplace_back(force_move(tim));
2822 }
2823
2824 void notify_retry_one() {
2825 for (auto it = tim_retry_acq_pid_queue_.begin();
2826 it != tim_retry_acq_pid_queue_.end();
2827 ++it
2828 ) {
2829 if (it->cancelled) continue;
2830 it->tim->cancel();
2831 it->cancelled = true;
2832 return;
2833 }
2834 }
2835
2836 void complete_retry_one() {
2837 if (!tim_retry_acq_pid_queue_.empty()) {
2838 tim_retry_acq_pid_queue_.pop_front();
2839 }
2840 }
2841
2842 void notify_retry_all() {
2843 tim_retry_acq_pid_queue_.clear();
2844 }
2845
2846 bool has_retry() const {
2847 return !tim_retry_acq_pid_queue_.empty();
2848 }
2849
2850 void clear_pid_man() {
2851 pid_man_.clear();
2852 notify_retry_all();
2853 }
2854
2855 void release_pid(packet_id_t pid) {
2856 pid_man_.release_id(pid);
2857 notify_retry_one();
2858 }
2859
2860private:
2861 protocol_version protocol_version_;
2862 std::shared_ptr<stream_type> stream_;
2863 packet_id_manager<packet_id_t> pid_man_;
2864 std::set<packet_id_t> pid_suback_;
2865 std::set<packet_id_t> pid_unsuback_;
2866 std::set<packet_id_t> pid_puback_;
2867 std::set<packet_id_t> pid_pubrec_;
2868 std::set<packet_id_t> pid_pubcomp_;
2869
2870 bool need_store_ = false;
2871 store<PacketIdBytes, as::strand<as::any_io_executor>> store_{stream_->raw_strand()};
2872
2873 bool auto_pub_response_ = false;
2874 bool auto_ping_response_ = false;
2875
2876 bool auto_map_topic_alias_send_ = false;
2877 bool auto_replace_topic_alias_send_ = false;
2878 optional<topic_alias_send> topic_alias_send_;
2879 optional<topic_alias_recv> topic_alias_recv_;
2880
2881 receive_maximum_t publish_send_max_{receive_maximum_max};
2882 receive_maximum_t publish_recv_max_{receive_maximum_max};
2883 receive_maximum_t publish_send_count_{0};
2884
2885 std::set<packet_id_t> publish_recv_;
2886 std::deque<v5::basic_publish_packet<PacketIdBytes>> publish_queue_;
2887
2888 ioc_queue close_queue_;
2889
2890 std::uint32_t maximum_packet_size_send_{packet_size_no_limit};
2891 std::uint32_t maximum_packet_size_recv_{packet_size_no_limit};
2892
2893 connection_status status_{connection_status::closed};
2894
2895 optional<std::size_t> pingreq_send_interval_ms_;
2896 optional<std::size_t> pingreq_recv_timeout_ms_;
2897 optional<std::size_t> pingresp_recv_timeout_ms_;
2898
2899 std::shared_ptr<as::steady_timer> tim_pingreq_send_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2900 std::shared_ptr<as::steady_timer> tim_pingreq_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2901 std::shared_ptr<as::steady_timer> tim_pingresp_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2902
2903 std::set<packet_id_t> qos2_publish_handled_;
2904
2905 bool recv_processing_ = false;
2906 std::set<packet_id_t> qos2_publish_processing_;
2907
2908 struct tim_cancelled {
2909 tim_cancelled(
2910 std::shared_ptr<as::steady_timer> tim,
2911 bool cancelled = false
2912 ):tim{force_move(tim)}, cancelled{cancelled}
2913 {}
2914 std::shared_ptr<as::steady_timer> tim;
2915 bool cancelled;
2916 };
2917 std::deque<tim_cancelled> tim_retry_acq_pid_queue_;
2918};
2919
2926template <role Role, typename NextLayer>
2928
2929} // namespace async_mqtt
2930
2931#endif // ASYNC_MQTT_ENDPOINT_HPP
MQTT endpoint corresponding to the connection.
Definition endpoint.hpp:67
std::set< packet_id_t > get_qos2_publish_handled_pids() const
Get processed but not released QoS2 packet ids. This function should be called after disconnection.
Definition endpoint.hpp:679
auto get_stored_packets(CompletionToken &&token) const
Get stored packets. Stored packets mean inflight packets.
Definition endpoint.hpp:582
auto register_packet_id(packet_id_t packet_id, CompletionToken &&token)
register packet_id.
Definition endpoint.hpp:341
bool in_strand() const
strand checker
Definition endpoint.hpp:176
void regulate_for_store(v5::basic_publish_packet< PacketIdBytes > &packet) const
Regulate publish packet for store If topic is empty, extract topic from topic alias,...
Definition endpoint.hpp:782
strand_type const & strand() const
strand getter
Definition endpoint.hpp:160
void restore_qos2_publish_handled_pids(std::set< packet_id_t > pids)
Restore processed but not released QoS2 packet ids. This function should be called before receiving the ...
Definition endpoint.hpp:693
void set_auto_map_topic_alias_send(bool val)
auto map (allocate) topic alias on send PUBLISH packet. If all topic aliases are used,...
Definition endpoint.hpp:241
auto recv(std::set< control_packet_type > types, CompletionToken &&token)
Receive packet. Users CANNOT call recv() before the previous recv()'s CompletionToken is invoked if pa...
Definition endpoint.hpp:460
void set_auto_ping_response(bool val)
auto pingreq response setter. Should be called before send()/recv() call.
Definition endpoint.hpp:227
void set_auto_pub_response(bool val)
auto publish response setter. Should be called before send()/recv() call.
Definition endpoint.hpp:215
std::vector< basic_store_packet_variant< PacketIdBytes > > get_stored_packets() const
Get stored packets. Stored packets mean inflight packets.
Definition endpoint.hpp:740
typename packet_id_type< PacketIdBytes >::type packet_id_t
Type of MQTT Packet Identifier.
Definition endpoint.hpp:125
auto recv(CompletionToken &&token)
Receive packet. Users CANNOT call recv() before the previous recv()'s CompletionToken is invoked.
Definition endpoint.hpp:429
auto restore_packets(std::vector< basic_store_packet_variant< PacketIdBytes > > pvs, CompletionToken &&token)
Restore packets. The restored packets are automatically sent when a CONNACK packet is received.
Definition endpoint.hpp:551
void restore_packets(std::vector< basic_store_packet_variant< PacketIdBytes > > pvs)
Restore packets. The restored packets are automatically sent when a CONNACK packet is received.
Definition endpoint.hpp:707
auto acquire_unique_packet_id(CompletionToken &&token)
Acquire unique packet_id.
Definition endpoint.hpp:289
void set_auto_replace_topic_alias_send(bool val)
auto replace topic with corresponding topic alias on send PUBLISH packet. Registering topic alias nee...
Definition endpoint.hpp:255
optional< packet_id_t > acquire_unique_packet_id()
Acquire unique packet_id.
Definition endpoint.hpp:629
auto acquire_unique_packet_id_wait_until(CompletionToken &&token)
Acquire unique packet_id. If packet_ids are fully acquired, then wait until one is released.
Definition endpoint.hpp:315
typename stream_type::strand_type strand_type
The type of strand that is used for MQTT stream exclusive control.
Definition endpoint.hpp:117
auto & lowest_layer()
lowest_layer getter
Definition endpoint.hpp:206
void release_packet_id(packet_id_t pid)
release packet_id.
Definition endpoint.hpp:665
bool register_packet_id(packet_id_t pid)
register packet_id.
Definition endpoint.hpp:651
protocol_version get_protocol_version() const
get MQTT protocol version
Definition endpoint.hpp:753
auto recv(filter fil, std::set< control_packet_type > types, CompletionToken &&token)
Receive packet. Users CANNOT call recv() before the previous recv()'s CompletionToken is invoked if pa...
Definition endpoint.hpp:495
auto release_packet_id(packet_id_t packet_id, CompletionToken &&token)
release packet_id.
Definition endpoint.hpp:369
auto const & lowest_layer() const
lowest_layer getter
Definition endpoint.hpp:199
auto close(CompletionToken &&token)
close the underlying connection
Definition endpoint.hpp:526
strand_type & strand()
strand getter
Definition endpoint.hpp:168
next_layer_type & next_layer()
next_layer getter
Definition endpoint.hpp:191
auto send(Packet packet, CompletionToken &&token)
Send packet. Users can call send() before the previous send()'s CompletionToken is invoked.
Definition endpoint.hpp:398
next_layer_type const & next_layer() const
next_layer getter
Definition endpoint.hpp:184
void set_pingresp_recv_timeout_ms(std::size_t ms)
Set timeout for receiving PINGRESP packet after PINGREQ packet is sent. If the timer is fired,...
Definition endpoint.hpp:271
bool is_publish_processing(packet_id_t pid) const
Get MQTT PUBLISH packet processing status.
Definition endpoint.hpp:767
static constexpr std::size_t packet_id_bytes
The value given as PacketIdBytes.
Definition endpoint.hpp:119
static std::shared_ptr< this_type > create(protocol_version ver, Args &&... args)
create
Definition endpoint.hpp:138
Definition packet_variant.hpp:49
auto visit(Func &&func) const &
visit to variant
Definition packet_variant.hpp:74
Definition property.hpp:714
role
MQTT endpoint connection role.
Definition endpoint.hpp:34
@ client
as client. Can't send CONNACK, SUBACK, UNSUBACK, PINGRESP. Can send Other packets.
filter
receive packet filter
Definition endpoint.hpp:44
@ except
no matched control_packet_type is target
@ match
matched control_packet_type is target
protocol_version
MQTT protocol version.
Definition protocol_version.hpp:20
@ send
Always send. Same as MQTT v.3.1.1.