async_mqtt 5.0.0
Loading...
Searching...
No Matches
endpoint.hpp
Go to the documentation of this file.
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_ENDPOINT_HPP)
8#define ASYNC_MQTT_ENDPOINT_HPP
9
10#include <set>
11#include <deque>
12#include <atomic>
13
14#include <async_mqtt/packet/packet_variant.hpp>
15#include <async_mqtt/util/value_allocator.hpp>
16#include <async_mqtt/util/make_shared_helper.hpp>
17#include <async_mqtt/null_strand.hpp>
18#include <async_mqtt/stream.hpp>
19#include <async_mqtt/store.hpp>
20#include <async_mqtt/log.hpp>
21#include <async_mqtt/topic_alias_send.hpp>
22#include <async_mqtt/topic_alias_recv.hpp>
23#include <async_mqtt/packet_id_manager.hpp>
25#include <async_mqtt/buffer_to_packet_variant.hpp>
26#include <async_mqtt/packet/packet_traits.hpp>
27
29
30namespace async_mqtt {
31
35enum class role {
36 client = 0b01,
37 server = 0b10,
39 any = 0b11,
40};
41
45enum class filter {
46 match,
47 except
48};
49
50template <typename Self>
51auto bind_dispatch(Self&& self) {
52 auto exe = as::get_associated_executor(self);
53 return as::dispatch(
54 as::bind_executor(
55 exe,
56 std::forward<Self>(self)
57 )
58 );
59}
60
69template <role Role, std::size_t PacketIdBytes, template <typename> typename Strand, typename NextLayer>
70class basic_endpoint : public std::enable_shared_from_this<basic_endpoint<Role, PacketIdBytes, Strand, NextLayer>>{
71
72 enum class connection_status {
73 connecting,
74 connected,
75 disconnecting,
76 closing,
77 closed
78 };
79
80 static constexpr bool can_send_as_client(role r) {
81 return static_cast<int>(r) & static_cast<int>(role::client);
82 }
83
84 static constexpr bool can_send_as_server(role r) {
85 return static_cast<int>(r) & static_cast<int>(role::server);
86 }
87
88 static inline optional<topic_alias_t> get_topic_alias(properties const& props) {
90 for (auto const& prop : props) {
91 prop.visit(
92 overload {
93 [&](property::topic_alias const& p) {
94 ta_opt.emplace(p.val());
95 },
96 [](auto const&) {
97 }
98 }
99 );
100 if (ta_opt) return ta_opt;
101 }
102 return ta_opt;
103 }
104
106 using this_type_sp = std::shared_ptr<this_type>;
107 using this_type_wp = std::weak_ptr<this_type>;
108 using stream_type =
109 stream<
110 NextLayer,
111 Strand
112 >;
113
114 template <typename T>
115 friend class make_shared_helper;
116
117public:
121 using strand_type = typename stream_type::strand_type;
123 static constexpr std::size_t packet_id_bytes = PacketIdBytes;
124
127
129 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
130
141 template <typename... Args>
142 static std::shared_ptr<this_type> create(
144 Args&&... args
145 ) {
146 return make_shared_helper<this_type>::make_shared(ver, std::forward<Args>(args)...);
147 }
148
150 ASYNC_MQTT_LOG("mqtt_impl", trace)
151 << ASYNC_MQTT_ADD_VALUE(address, this)
152 << "destroy";
153 }
154
155 basic_endpoint(this_type const&) = delete;
156 basic_endpoint(this_type&&) = delete;
157 this_type& operator=(this_type const&) = delete;
158 this_type& operator=(this_type&&) = delete;
159
164 strand_type const& strand() const {
165 return stream_->strand();
166 }
167
173 return stream_->strand();
174 }
175
180 bool in_strand() const {
181 return stream_->in_strand();
182 }
183
189 return stream_->next_layer();
190 }
196 return stream_->next_layer();
197 }
198
203 auto const& lowest_layer() const {
204 return stream_->lowest_layer();
205 }
210 auto& lowest_layer() {
211 return stream_->lowest_layer();
212 }
213
219 void set_auto_pub_response(bool val) {
220 ASYNC_MQTT_LOG("mqtt_api", info)
221 << ASYNC_MQTT_ADD_VALUE(address, this)
222 << "set_auto_pub_response val:" << val;
223 auto_pub_response_ = val;
224 }
225
231 void set_auto_ping_response(bool val) {
232 ASYNC_MQTT_LOG("mqtt_api", info)
233 << ASYNC_MQTT_ADD_VALUE(address, this)
234 << "set_auto_ping_response val:" << val;
235 auto_ping_response_ = val;
236 }
237
246 ASYNC_MQTT_LOG("mqtt_api", info)
247 << ASYNC_MQTT_ADD_VALUE(address, this)
248 << "set_auto_map_topic_alias_send val:" << val;
249 auto_map_topic_alias_send_ = val;
250 }
251
260 ASYNC_MQTT_LOG("mqtt_api", info)
261 << ASYNC_MQTT_ADD_VALUE(address, this)
262 << "set_auto_replace_topic_alias_send val:" << val;
263 auto_replace_topic_alias_send_ = val;
264 }
265
276 if (ms == 0) {
277 pingresp_recv_timeout_ms_ = nullopt;
278 }
279 else {
280 pingresp_recv_timeout_ms_.emplace(ms);
281 }
282 }
283
293 void set_bulk_write(bool val) {
294 stream_->set_bulk_write(val);
295 }
296
297
298 // async functions
299
305 template <typename CompletionToken>
306 auto
309 ) {
310 ASYNC_MQTT_LOG("mqtt_api", info)
311 << ASYNC_MQTT_ADD_VALUE(address, this)
312 << "acquire_unique_packet_id";
313 return
314 as::async_compose<
317 >(
318 acquire_unique_packet_id_impl{
319 *this
320 },
321 token
322 );
323 }
324
331 template <typename CompletionToken>
332 auto
335 ) {
336 ASYNC_MQTT_LOG("mqtt_api", info)
337 << ASYNC_MQTT_ADD_VALUE(address, this)
338 << "acquire_unique_packet_id_wait_until";
339 return
340 as::async_compose<
343 >(
344 acquire_unique_packet_id_wait_until_impl{
345 *this
346 },
347 token
348 );
349 }
350
357 template <typename CompletionToken>
358 auto
360 packet_id_t packet_id,
362 ) {
363 ASYNC_MQTT_LOG("mqtt_api", info)
364 << ASYNC_MQTT_ADD_VALUE(address, this)
365 << "register_packet_id pid:" << packet_id;
366 return
367 as::async_compose<
369 void(bool)
370 >(
371 register_packet_id_impl{
372 *this,
373 packet_id
374 },
375 token
376 );
377 }
378
385 template <typename CompletionToken>
386 auto
388 packet_id_t packet_id,
390 ) {
391 ASYNC_MQTT_LOG("mqtt_api", info)
392 << ASYNC_MQTT_ADD_VALUE(address, this)
393 << "release_packet_id pid:" << packet_id;
394 return
395 as::async_compose<
397 void()
398 >(
399 release_packet_id_impl{
400 *this,
401 packet_id
402 },
403 token
404 );
405 }
406
414 template <typename Packet, typename CompletionToken>
415 auto
417 Packet packet,
419 ) {
420 ASYNC_MQTT_LOG("mqtt_api", info)
421 << ASYNC_MQTT_ADD_VALUE(address, this)
422 << "send:" << packet;
423 if constexpr(!std::is_same_v<Packet, basic_packet_variant<PacketIdBytes>>) {
424 static_assert(
425 (can_send_as_client(Role) && is_client_sendable<std::decay_t<Packet>>()) ||
426 (can_send_as_server(Role) && is_server_sendable<std::decay_t<Packet>>()),
427 "Packet cannot be send by MQTT protocol"
428 );
429 }
430
431 return
432 send(
433 force_move(packet),
434 false, // not from queue
435 std::forward<CompletionToken>(token)
436 );
437 }
438
445 template <typename CompletionToken>
446 auto
449 ) {
450 ASYNC_MQTT_LOG("mqtt_api", info)
451 << ASYNC_MQTT_ADD_VALUE(address, this)
452 << "recv";
453 BOOST_ASSERT(!recv_processing_);
454 recv_processing_ = true;
455 return
456 as::async_compose<
459 >(
460 recv_impl{
461 *this
462 },
463 token
464 );
465 }
466
476 template <typename CompletionToken>
477 auto
479 std::set<control_packet_type> types,
481 ) {
482 ASYNC_MQTT_LOG("mqtt_api", info)
483 << ASYNC_MQTT_ADD_VALUE(address, this)
484 << "recv";
485 BOOST_ASSERT(!recv_processing_);
486 recv_processing_ = true;
487 return
488 as::async_compose<
491 >(
492 recv_impl{
493 *this,
494 filter::match,
495 force_move(types)
496 },
497 token
498 );
499 }
500
511 template <typename CompletionToken>
512 auto
514 filter fil,
515 std::set<control_packet_type> types,
517 ) {
518 ASYNC_MQTT_LOG("mqtt_api", info)
519 << ASYNC_MQTT_ADD_VALUE(address, this)
520 << "recv";
521 BOOST_ASSERT(!recv_processing_);
522 recv_processing_ = true;
523 return
524 as::async_compose<
527 >(
528 recv_impl{
529 *this,
530 fil,
531 force_move(types)
532 },
533 token
534 );
535 }
536
542 template<typename CompletionToken>
543 auto
545 ASYNC_MQTT_LOG("mqtt_api", info)
546 << ASYNC_MQTT_ADD_VALUE(address, this)
547 << "close";
548 return
549 as::async_compose<
551 void()
552 >(
553 close_impl{
554 *this
555 },
556 token
557 );
558 }
559
567 template <typename CompletionToken>
568 auto
572 ) {
573 ASYNC_MQTT_LOG("mqtt_api", info)
574 << ASYNC_MQTT_ADD_VALUE(address, this)
575 << "restore_packets";
576 return
577 as::async_compose<
579 void()
580 >(
581 restore_packets_impl{
582 *this,
583 force_move(pvs)
584 },
585 token
586 );
587 }
588
598 template <typename CompletionToken>
599 auto
602 ) const {
603 ASYNC_MQTT_LOG("mqtt_api", info)
604 << ASYNC_MQTT_ADD_VALUE(address, this)
605 << "get_stored_packets";
606 return
607 as::async_compose<
610 >(
611 get_stored_packets_impl{
612 *this
613 },
614 token
615 );
616 }
617
618 template <typename CompletionToken>
619 auto
620 regulate_for_store(
621 v5::basic_publish_packet<PacketIdBytes> packet,
623 ) const {
624 ASYNC_MQTT_LOG("mqtt_api", info)
625 << ASYNC_MQTT_ADD_VALUE(address, this)
626 << "regulate_for_store:" << packet;
627 return
628 as::async_compose<
630 void(v5::basic_publish_packet<PacketIdBytes>)
631 >(
632 regulate_for_store_impl{
633 *this,
634 force_move(packet)
635 },
636 token
637 );
638 }
639
640 // sync APIs that require working on strand
641
649 auto pid = pid_man_.acquire_unique_id();
650 if (pid) {
651 ASYNC_MQTT_LOG("mqtt_api", info)
652 << ASYNC_MQTT_ADD_VALUE(address, this)
653 << "acquire_unique_packet_id:" << *pid;
654 }
655 else {
656 ASYNC_MQTT_LOG("mqtt_api", info)
657 << ASYNC_MQTT_ADD_VALUE(address, this)
658 << "acquire_unique_packet_id:full";
659 }
660 return pid;
661 }
662
671 auto ret = pid_man_.register_id(pid);
672 ASYNC_MQTT_LOG("mqtt_api", info)
673 << ASYNC_MQTT_ADD_VALUE(address, this)
674 << "register_packet_id:" << pid << " result:" << ret;
675 return ret;
676 }
677
685 ASYNC_MQTT_LOG("mqtt_api", info)
686 << ASYNC_MQTT_ADD_VALUE(address, this)
687 << "release_packet_id:" << pid;
688 release_pid(pid);
689 }
690
697 std::set<packet_id_t> get_qos2_publish_handled_pids() const {
699 ASYNC_MQTT_LOG("mqtt_api", info)
700 << ASYNC_MQTT_ADD_VALUE(address, this)
701 << "get_qos2_publish_handled_pids";
702 return qos2_publish_handled_;
703 }
704
711 void restore_qos2_publish_handled_pids(std::set<packet_id_t> pids) {
713 ASYNC_MQTT_LOG("mqtt_api", info)
714 << ASYNC_MQTT_ADD_VALUE(address, this)
715 << "restore_qos2_publish_handled_pids";
716 qos2_publish_handled_ = force_move(pids);
717 }
718
727 ) {
729 ASYNC_MQTT_LOG("mqtt_api", info)
730 << ASYNC_MQTT_ADD_VALUE(address, this)
731 << "restore_packets";
732 for (auto& pv : pvs) {
733 pv.visit(
734 [&](auto& p) {
735 if (pid_man_.register_id(p.packet_id())) {
736 store_.add(force_move(p));
737 }
738 else {
739 ASYNC_MQTT_LOG("mqtt_impl", error)
740 << ASYNC_MQTT_ADD_VALUE(address, this)
741 << "packet_id:" << p.packet_id()
742 << " has already been used. Skip it";
743 }
744 }
745 );
746 }
747 }
748
758 std::vector<basic_store_packet_variant<PacketIdBytes>> get_stored_packets() const {
760 ASYNC_MQTT_LOG("mqtt_api", info)
761 << ASYNC_MQTT_ADD_VALUE(address, this)
762 << "get_stored_packets";
763 return store_.get_stored();
764 }
765
773 ASYNC_MQTT_LOG("mqtt_api", info)
774 << ASYNC_MQTT_ADD_VALUE(address, this)
775 << "get_protocol_version:" << protocol_version_;
776 return protocol_version_;
777 }
778
787 ASYNC_MQTT_LOG("mqtt_api", info)
788 << ASYNC_MQTT_ADD_VALUE(address, this)
789 << "is_publish_processing:" << pid;
790 return qos2_publish_processing_.find(pid) != qos2_publish_processing_.end();
791 }
792
800 void regulate_for_store(v5::basic_publish_packet<PacketIdBytes>& packet) const {
802 ASYNC_MQTT_LOG("mqtt_api", info)
803 << ASYNC_MQTT_ADD_VALUE(address, this)
804 << "regulate_for_store:" << packet;
805 if (packet.topic().empty()) {
806 if (auto ta_opt = get_topic_alias(packet.props())) {
807 auto topic = topic_alias_send_->find_without_touch(*ta_opt);
808 if (!topic.empty()) {
809 packet.remove_topic_alias_add_topic(allocate_buffer(topic));
810 }
811 }
812 }
813 else {
814 packet.remove_topic_alias();
815 }
816 }
817
818 void cancel_all_timers_for_test() {
820 tim_pingreq_send_->cancel();
821 tim_pingreq_recv_->cancel();
822 tim_pingresp_recv_->cancel();
823 }
824
825 void set_pingreq_send_interval_ms_for_test(std::size_t ms) {
826 BOOST_ASSERT(in_strand());
827 pingreq_send_interval_ms_ = ms;
828 }
829
830private: // compose operation impl
831
841 template <typename... Args>
842 basic_endpoint(
843 protocol_version ver,
844 Args&&... args
845 ): protocol_version_{ver},
846 stream_{stream_type::create(std::forward<Args>(args)...)}
847 {
848 BOOST_ASSERT(
849 (Role == role::client && ver != protocol_version::undetermined) ||
850 Role != role::client
851 );
852
853 }
854
855 struct acquire_unique_packet_id_impl {
856 this_type& ep;
857 optional<packet_id_t> pid_opt = nullopt;
858 enum { dispatch, acquire, complete } state = dispatch;
859
860 template <typename Self>
861 void operator()(
862 Self& self
863 ) {
864 switch (state) {
865 case dispatch: {
866 state = acquire;
867 auto& a_ep{ep};
868 as::dispatch(
869 as::bind_executor(
870 a_ep.stream_->raw_strand(),
871 force_move(self)
872 )
873 );
874 } break;
875 case acquire: {
876 BOOST_ASSERT(ep.in_strand());
877 pid_opt = ep.pid_man_.acquire_unique_id();
878 state = complete;
879 bind_dispatch(force_move(self));
880 } break;
881 case complete:
882 self.complete(pid_opt);
883 break;
884 }
885 }
886 };
887
888 struct acquire_unique_packet_id_wait_until_impl {
889 this_type& ep;
890 this_type_wp retry_wp = ep.weak_from_this();
891 optional<packet_id_t> pid_opt = nullopt;
892 enum { dispatch, acquire, complete } state = dispatch;
893
894 template <typename Self>
895 void operator()(
896 Self& self,
897 error_code const& ec = error_code{}
898 ) {
899 if (retry_wp.expired()) return;
900 switch (state) {
901 case dispatch: {
902 state = acquire;
903 auto& a_ep{ep};
904 as::dispatch(
905 as::bind_executor(
906 a_ep.stream_->raw_strand(),
907 force_move(self)
908 )
909 );
910 } break;
911 case acquire: {
912 BOOST_ASSERT(ep.in_strand());
913 auto acq_proc =
914 [&] {
915 pid_opt = ep.pid_man_.acquire_unique_id();
916 if (pid_opt) {
917 state = complete;
918 bind_dispatch(force_move(self));
919 }
920 else {
921 ASYNC_MQTT_LOG("mqtt_impl", warning)
922 << ASYNC_MQTT_ADD_VALUE(address, &ep)
923 << "packet_id is fully allocated. waiting release";
924 // infinity timer. cancel is retry trigger.
925 auto& a_ep{ep};
926 a_ep.add_retry(
927 force_move(self)
928 );
929 }
930 };
931
932 if (ec == errc::operation_canceled) {
933 ep.complete_retry_one();
934 acq_proc();
935 }
936 else if (ep.has_retry()) {
937 ASYNC_MQTT_LOG("mqtt_impl", warning)
938 << ASYNC_MQTT_ADD_VALUE(address, &ep)
939 << "packet_id waiter exists. add the end of waiter queue";
940 // infinity timer. cancel is retry trigger.
941 auto& a_ep{ep};
942 a_ep.add_retry(
943 force_move(self)
944 );
945 }
946 else {
947 acq_proc();
948 }
949 } break;
950 case complete:
951 BOOST_ASSERT(pid_opt);
952 self.complete(*pid_opt);
953 break;
954 }
955 }
956 };
957
958 struct register_packet_id_impl {
959 this_type& ep;
960 packet_id_t packet_id;
961 bool result = false;
962 enum { dispatch, regi, complete } state = dispatch;
963
964 template <typename Self>
965 void operator()(
966 Self& self
967 ) {
968 switch (state) {
969 case dispatch: {
970 state = regi;
971 auto& a_ep{ep};
972 as::dispatch(
973 as::bind_executor(
974 a_ep.stream_->raw_strand(),
975 force_move(self)
976 )
977 );
978 } break;
979 case regi: {
980 BOOST_ASSERT(ep.in_strand());
981 result = ep.pid_man_.register_id(packet_id);
982 state = complete;
983 bind_dispatch(force_move(self));
984 } break;
985 case complete:
986 self.complete(result);
987 break;
988 }
989 }
990 };
991
992 struct release_packet_id_impl {
993 this_type& ep;
994 packet_id_t packet_id;
995 enum { dispatch, rel, complete } state = dispatch;
996
997 template <typename Self>
998 void operator()(
999 Self& self
1000 ) {
1001 switch (state) {
1002 case dispatch: {
1003 state = rel;
1004 auto& a_ep{ep};
1005 as::dispatch(
1006 as::bind_executor(
1007 a_ep.stream_->raw_strand(),
1008 force_move(self)
1009 )
1010 );
1011 } break;
1012 case rel: {
1013 BOOST_ASSERT(ep.in_strand());
1014 ep.release_pid(packet_id);
1015 state = complete;
1016 bind_dispatch(force_move(self));
1017 } break;
1018 case complete:
1019 self.complete();
1020 break;
1021 }
1022 }
1023 };
1024
1025
1026 template <typename Packet>
1027 struct send_impl {
1028 this_type& ep;
1029 Packet packet;
1030 bool from_queue = false;
1031 error_code last_ec = error_code{};
1032 enum { dispatch, write, bind, complete } state = dispatch;
1033
1034 template <typename Self>
1035 void operator()(
1036 Self& self,
1037 error_code const& ec = error_code{},
1038 std::size_t /*bytes_transferred*/ = 0
1039 ) {
1040 if (ec) {
1041 ASYNC_MQTT_LOG("mqtt_impl", info)
1042 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1043 << "send error:" << ec.message();
1044 last_ec = ec;
1045 state = complete;
1046 bind_dispatch(force_move(self));
1047 return;
1048 }
1049
1050 switch (state) {
1051 case dispatch: {
1052 state = write;
1053 auto& a_ep{ep};
1054 as::dispatch(
1055 as::bind_executor(
1056 a_ep.stream_->raw_strand(),
1057 force_move(self)
1058 )
1059 );
1060 } break;
1061 case write: {
1062 BOOST_ASSERT(ep.in_strand());
1063 state = bind;
1064 if constexpr(
1065 std::is_same_v<std::decay_t<Packet>, basic_packet_variant<PacketIdBytes>> ||
1066 std::is_same_v<std::decay_t<Packet>, basic_store_packet_variant<PacketIdBytes>>
1067 ) {
1068 packet.visit(
1069 overload {
1070 [&](auto& actual_packet) {
1071 if (process_send_packet(self, actual_packet)) {
1072 auto& a_ep{ep};
1073 a_ep.stream_->write_packet(
1074 force_move(actual_packet),
1075 force_move(self)
1076 );
1077 if constexpr(is_connack<std::remove_reference_t<decltype(actual_packet)>>()) {
1078 // server send stored packets after connack sent
1079 a_ep.send_stored();
1080 }
1081 if constexpr(Role == role::client) {
1082 a_ep.reset_pingreq_send_timer();
1083 }
1084 }
1085 },
1086 [&](system_error&) {}
1087 }
1088 );
1089 }
1090 else {
1091 if (process_send_packet(self, packet)) {
1092 auto& a_ep{ep};
1093 auto& a_packet{packet};
1094 a_ep.stream_->write_packet(
1095 force_move(a_packet),
1096 force_move(self)
1097 );
1098 if constexpr(is_connack<Packet>()) {
1099 // server send stored packets after connack sent
1100 a_ep.send_stored();
1101 }
1102 if constexpr(Role == role::client) {
1103 a_ep.reset_pingreq_send_timer();
1104 }
1105 }
1106 }
1107 } break;
1108 case bind: {
1109 BOOST_ASSERT(ep.in_strand());
1110 last_ec = ec;
1111 state = complete;
1112 bind_dispatch(force_move(self));
1113 } break;
1114 case complete:
1115 // out of strand
1116 self.complete(last_ec);
1117 break;
1118 }
1119 }
1120
1121 template <typename Self, typename ActualPacket>
1122 bool process_send_packet(Self& self, ActualPacket& actual_packet) {
1123 // MQTT protocol sendable packet check
1124 if (
1125 !(
1126 (can_send_as_client(Role) && is_client_sendable<std::decay_t<ActualPacket>>()) ||
1127 (can_send_as_server(Role) && is_server_sendable<std::decay_t<ActualPacket>>())
1128 )
1129 ) {
1130 self.complete(
1131 make_error(
1132 errc::protocol_error,
1133 "packet cannot be send by MQTT protocol"
1134 )
1135 );
1136 return false;
1137 }
1138
1139 auto version_check =
1140 [&] {
1141 if (ep.protocol_version_ == protocol_version::v3_1_1 && is_v3_1_1<ActualPacket>()) {
1142 return true;
1143 }
1144 if (ep.protocol_version_ == protocol_version::v5 && is_v5<ActualPacket>()) {
1145 return true;
1146 }
1147 return false;
1148 };
1149
1150 // connection status check
1151 if constexpr(is_connect<ActualPacket>()) {
1152 if (ep.status_ != connection_status::closed) {
1153 self.complete(
1154 make_error(
1155 errc::protocol_error,
1156 "connect_packet can only be send on connection_status::closed"
1157 )
1158 );
1159 return false;
1160 }
1161 if (!version_check()) {
1162 self.complete(
1163 make_error(
1164 errc::protocol_error,
1165 "protocol version mismatch"
1166 )
1167 );
1168 return false;
1169 }
1170 }
1171 else if constexpr(is_connack<ActualPacket>()) {
1172 if (ep.status_ != connection_status::connecting) {
1173 self.complete(
1174 make_error(
1175 errc::protocol_error,
1176 "connack_packet can only be send on connection_status::connecting"
1177 )
1178 );
1179 return false;
1180 }
1181 if (!version_check()) {
1182 self.complete(
1183 make_error(
1184 errc::protocol_error,
1185 "protocol version mismatch"
1186 )
1187 );
1188 return false;
1189 }
1190 }
1191 else if constexpr(std::is_same_v<v5::auth_packet, Packet>) {
1192 if (ep.status_ != connection_status::connected &&
1193 ep.status_ != connection_status::connecting) {
1194 self.complete(
1195 make_error(
1196 errc::protocol_error,
1197 "auth packet can only be send on connection_status::connecting or status::connected"
1198 )
1199 );
1200 return false;
1201 }
1202 if (!version_check()) {
1203 self.complete(
1204 make_error(
1205 errc::protocol_error,
1206 "protocol version mismatch"
1207 )
1208 );
1209 return false;
1210 }
1211 }
1212 else {
1213 if (ep.status_ != connection_status::connected) {
1214 if constexpr(!is_publish<std::decay_t<ActualPacket>>()) {
1215 self.complete(
1216 make_error(
1217 errc::protocol_error,
1218 "packet can only be send on connection_status::connected"
1219 )
1220 );
1221 return false;
1222 }
1223 }
1224 if (!version_check()) {
1225 self.complete(
1226 make_error(
1227 errc::protocol_error,
1228 "protocol version mismatch"
1229 )
1230 );
1231 return false;
1232 }
1233 }
1234
1235 // sending process
1236 bool topic_alias_validated = false;
1237
1238 if constexpr(std::is_same_v<v3_1_1::connect_packet, std::decay_t<ActualPacket>>) {
1239 ep.initialize();
1240 ep.status_ = connection_status::connecting;
1241 auto keep_alive = actual_packet.keep_alive();
1242 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1243 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1244 }
1245 if (actual_packet.clean_session()) {
1246 ep.clear_pid_man();
1247 ep.store_.clear();
1248 ep.need_store_ = false;
1249 }
1250 else {
1251 ep.need_store_ = true;
1252 }
1253 ep.topic_alias_send_ = nullopt;
1254 }
1255
1256 if constexpr(std::is_same_v<v5::connect_packet, std::decay_t<ActualPacket>>) {
1257 ep.initialize();
1258 ep.status_ = connection_status::connecting;
1259 auto keep_alive = actual_packet.keep_alive();
1260 if (keep_alive != 0 && !ep.pingreq_send_interval_ms_) {
1261 ep.pingreq_send_interval_ms_.emplace(keep_alive * 1000);
1262 }
1263 if (actual_packet.clean_start()) {
1264 ep.clear_pid_man();
1265 ep.store_.clear();
1266 }
1267 for (auto const& prop : actual_packet.props()) {
1268 prop.visit(
1269 overload {
1270 [&](property::topic_alias_maximum const& p) {
1271 if (p.val() != 0) {
1272 ep.topic_alias_recv_.emplace(p.val());
1273 }
1274 },
1275 [&](property::receive_maximum const& p) {
1276 BOOST_ASSERT(p.val() != 0);
1277 ep.publish_recv_max_ = p.val();
1278 },
1279 [&](property::maximum_packet_size const& p) {
1280 BOOST_ASSERT(p.val() != 0);
1281 ep.maximum_packet_size_recv_ = p.val();
1282 },
1283 [&](property::session_expiry_interval const& p) {
1284 if (p.val() != 0) {
1285 ep.need_store_ = true;
1286 }
1287 },
1288 [](auto const&){}
1289 }
1290 );
1291 }
1292 }
1293
1294 if constexpr(std::is_same_v<v3_1_1::connack_packet, std::decay_t<ActualPacket>>) {
1295 if (actual_packet.code() == connect_return_code::accepted) {
1296 ep.status_ = connection_status::connected;
1297 }
1298 else {
1299 ep.status_ = connection_status::disconnecting;
1300 }
1301 }
1302
1303 if constexpr(std::is_same_v<v5::connack_packet, std::decay_t<ActualPacket>>) {
1304 if (actual_packet.code() == connect_reason_code::success) {
1305 ep.status_ = connection_status::connected;
1306 for (auto const& prop : actual_packet.props()) {
1307 prop.visit(
1308 overload {
1309 [&](property::topic_alias_maximum const& p) {
1310 if (p.val() != 0) {
1311 ep.topic_alias_recv_.emplace(p.val());
1312 }
1313 },
1314 [&](property::receive_maximum const& p) {
1315 BOOST_ASSERT(p.val() != 0);
1316 ep.publish_recv_max_ = p.val();
1317 },
1318 [&](property::maximum_packet_size const& p) {
1319 BOOST_ASSERT(p.val() != 0);
1320 ep.maximum_packet_size_recv_ = p.val();
1321 },
1322 [](auto const&){}
1323 }
1324 );
1325 }
1326 }
1327 else {
1328 ep.status_ = connection_status::disconnecting;
1329 }
1330 }
1331
1332 // store publish/pubrel packet
1333 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1334 if (actual_packet.opts().get_qos() == qos::at_least_once ||
1335 actual_packet.opts().get_qos() == qos::exactly_once
1336 ) {
1337 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1338 if (ep.need_store_) {
1339 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1340 auto ta_opt = get_topic_alias(actual_packet.props());
1341 if (actual_packet.topic().empty()) {
1342 auto topic_opt = validate_topic_alias(self, ta_opt);
1343 if (!topic_opt) {
1344 auto packet_id = actual_packet.packet_id();
1345 if (packet_id != 0) {
1346 ep.release_pid(packet_id);
1347 }
1348 return false;
1349 }
1350 topic_alias_validated = true;
1351 auto props = actual_packet.props();
1352 auto it = props.cbegin();
1353 auto end = props.cend();
1354 for (; it != end; ++it) {
1355 if (it->id() == property::id::topic_alias) {
1356 props.erase(it);
1357 break;
1358 }
1359 }
1360
1361 auto store_packet =
1362 ActualPacket(
1363 actual_packet.packet_id(),
1364 allocate_buffer(*topic_opt),
1365 actual_packet.payload(),
1366 actual_packet.opts(),
1367 force_move(props)
1368 );
1369 if (!validate_maximum_packet_size(self, store_packet)) {
1370 auto packet_id = actual_packet.packet_id();
1371 if (packet_id != 0) {
1372 ep.release_pid(packet_id);
1373 }
1374 return false;
1375 }
1376 // add new packet that doesn't have topic_aliass to store
1377 // the original packet still use topic alias to send
1378 store_packet.set_dup(true);
1379 ep.store_.add(force_move(store_packet));
1380 }
1381 else {
1382 auto props = actual_packet.props();
1383 auto it = props.cbegin();
1384 auto end = props.cend();
1385 for (; it != end; ++it) {
1386 if (it->id() == property::id::topic_alias) {
1387 props.erase(it);
1388 break;
1389 }
1390 }
1391
1392 auto store_packet =
1393 ActualPacket(
1394 actual_packet.packet_id(),
1395 actual_packet.topic(),
1396 actual_packet.payload(),
1397 actual_packet.opts(),
1398 force_move(props)
1399 );
1400 if (!validate_maximum_packet_size(self, store_packet)) {
1401 auto packet_id = actual_packet.packet_id();
1402 if (packet_id != 0) {
1403 ep.release_pid(packet_id);
1404 }
1405 return false;
1406 }
1407 store_packet.set_dup(true);
1408 ep.store_.add(force_move(store_packet));
1409 }
1410 }
1411 else {
1412 if (!validate_maximum_packet_size(self, actual_packet)) {
1413 auto packet_id = actual_packet.packet_id();
1414 if (packet_id != 0) {
1415 ep.release_pid(packet_id);
1416 }
1417 return false;
1418 }
1419 auto store_packet{actual_packet};
1420 store_packet.set_dup(true);
1421 ep.store_.add(force_move(store_packet));
1422 }
1423 }
1424 if (actual_packet.opts().get_qos() == qos::exactly_once) {
1425 ep.qos2_publish_processing_.insert(actual_packet.packet_id());
1426 ep.pid_pubrec_.insert(actual_packet.packet_id());
1427 }
1428 else {
1429 ep.pid_puback_.insert(actual_packet.packet_id());
1430 }
1431 }
1432 }
1433
1434 if constexpr(is_instance_of<v5::basic_publish_packet, std::decay_t<ActualPacket>>::value) {
1435 // apply topic_alias
1436 auto ta_opt = get_topic_alias(actual_packet.props());
1437 if (actual_packet.topic().empty()) {
1438 if (!topic_alias_validated &&
1439 !validate_topic_alias(self, ta_opt)) {
1440 auto packet_id = actual_packet.packet_id();
1441 if (packet_id != 0) {
1442 ep.release_pid(packet_id);
1443 }
1444 return false;
1445 }
1446 // use topic_alias set by user
1447 }
1448 else {
1449 if (ta_opt) {
1450 if (validate_topic_alias_range(self, *ta_opt)) {
1451 ASYNC_MQTT_LOG("mqtt_impl", trace)
1452 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1453 << "topia alias : "
1454 << actual_packet.topic() << " - " << *ta_opt
1455 << " is registered." ;
1456 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), *ta_opt);
1457 }
1458 else {
1459 auto packet_id = actual_packet.packet_id();
1460 if (packet_id != 0) {
1461 ep.release_pid(packet_id);
1462 }
1463 return false;
1464 }
1465 }
1466 else if (ep.auto_map_topic_alias_send_) {
1467 if (ep.topic_alias_send_) {
1468 if (auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1469 ASYNC_MQTT_LOG("mqtt_impl", trace)
1470 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1471 << "topia alias : " << actual_packet.topic() << " - " << *ta_opt
1472 << " is found." ;
1473 actual_packet.remove_topic_add_topic_alias(*ta_opt);
1474 }
1475 else {
1476 auto lru_ta = ep.topic_alias_send_->get_lru_alias();
1477 ep.topic_alias_send_->insert_or_update(actual_packet.topic(), lru_ta); // remap topic alias
1478 actual_packet.add_topic_alias(lru_ta);
1479 }
1480 }
1481 }
1482 else if (ep.auto_replace_topic_alias_send_) {
1483 if (ep.topic_alias_send_) {
1484 if (auto ta_opt = ep.topic_alias_send_->find(actual_packet.topic())) {
1485 ASYNC_MQTT_LOG("mqtt_impl", trace)
1486 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1487 << "topia alias : " << actual_packet.topic() << " - " << *ta_opt
1488 << " is found." ;
1489 actual_packet.remove_topic_add_topic_alias(*ta_opt);
1490 }
1491 }
1492 }
1493 }
1494
1495 // receive_maximum for sending
1496 if (!from_queue && ep.enqueue_publish(actual_packet)) {
1497 self.complete(
1498 make_error(
1499 errc::success,
1500 "publish_packet is enqueued due to receive_maximum for sending"
1501 )
1502 );
1503 return false;
1504 }
1505 }
1506
1507 if constexpr(is_instance_of<v5::basic_puback_packet, std::decay_t<ActualPacket>>::value) {
1508 ep.publish_recv_.erase(actual_packet.packet_id());
1509 }
1510
1511
1512 if constexpr(is_instance_of<v5::basic_pubrec_packet, std::decay_t<ActualPacket>>::value) {
1513 if (is_error(actual_packet.code())) {
1514 ep.publish_recv_.erase(actual_packet.packet_id());
1515 ep.qos2_publish_handled_.erase(actual_packet.packet_id());
1516 }
1517 }
1518
1519 if constexpr(is_pubrel<std::decay_t<ActualPacket>>()) {
1520 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1521 if (ep.need_store_) ep.store_.add(actual_packet);
1522 ep.pid_pubcomp_.insert(actual_packet.packet_id());
1523 }
1524
1525 if constexpr(is_instance_of<v5::basic_pubcomp_packet, std::decay_t<ActualPacket>>::value) {
1526 ep.publish_recv_.erase(actual_packet.packet_id());
1527 }
1528
1529 if constexpr(is_subscribe<std::decay_t<ActualPacket>>()) {
1530 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1531 ep.pid_suback_.insert(actual_packet.packet_id());
1532 }
1533
1534 if constexpr(is_unsubscribe<std::decay_t<ActualPacket>>()) {
1535 BOOST_ASSERT(ep.pid_man_.is_used_id(actual_packet.packet_id()));
1536 ep.pid_unsuback_.insert(actual_packet.packet_id());
1537 }
1538
1539 if constexpr(is_pingreq<std::decay_t<ActualPacket>>()) {
1540 ep.reset_pingresp_recv_timer();
1541 }
1542
1543 if constexpr(is_disconnect<std::decay_t<ActualPacket>>()) {
1544 ep.status_ = connection_status::disconnecting;
1545 }
1546
1547 if (!validate_maximum_packet_size(self, actual_packet)) {
1548 if constexpr(own_packet_id<std::decay_t<ActualPacket>>()) {
1549 auto packet_id = actual_packet.packet_id();
1550 if (packet_id != 0) {
1551 ep.release_pid(packet_id);
1552 }
1553 }
1554 return false;
1555 }
1556
1557 if constexpr(is_publish<std::decay_t<ActualPacket>>()) {
1558 if (ep.status_ != connection_status::connected) {
1559 // offline publish
1560 self.complete(
1561 make_error(
1562 errc::success,
1563 "packet is stored but not sent"
1564 )
1565 );
1566 return false;
1567 }
1568 }
1569
1570 return true;
1571 }
1572
1573 template <typename Self>
1574 bool validate_topic_alias_range(Self& self, topic_alias_t ta) {
1575 if (!ep.topic_alias_send_) {
1576 self.complete(
1577 make_error(
1578 errc::bad_message,
1579 "topic_alias is set but topic_alias_maximum is 0"
1580 )
1581 );
1582 return false;
1583 }
1584 if (ta == 0 || ta > ep.topic_alias_send_->max()) {
1585 self.complete(
1586 make_error(
1587 errc::bad_message,
1588 "topic_alias is set but out of range"
1589 )
1590 );
1591 return false;
1592 }
1593 return true;
1594 }
1595
1596 template <typename Self>
1597 optional<std::string> validate_topic_alias(Self& self, optional<topic_alias_t> ta_opt) {
1598 BOOST_ASSERT(ep.in_strand());
1599 if (!ta_opt) {
1600 self.complete(
1601 make_error(
1602 errc::bad_message,
1603 "topic is empty but topic_alias isn't set"
1604 )
1605 );
1606 return nullopt;
1607 }
1608
1609 if (!validate_topic_alias_range(self, *ta_opt)) {
1610 return nullopt;
1611 }
1612
1613 auto topic = ep.topic_alias_send_->find(*ta_opt);
1614 if (topic.empty()) {
1615 self.complete(
1616 make_error(
1617 errc::bad_message,
1618 "topic is empty but topic_alias is not registered"
1619 )
1620 );
1621 return nullopt;
1622 }
1623 return topic;
1624 }
1625
1626 template <typename Self, typename PacketArg>
1627 bool validate_maximum_packet_size(Self& self, PacketArg const& packet_arg) {
1628 if (packet_arg.size() > ep.maximum_packet_size_send_) {
1629 ASYNC_MQTT_LOG("mqtt_impl", error)
1630 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1631 << "packet size over maximum_packet_size for sending";
1632 self.complete(
1633 make_error(
1634 errc::bad_message,
1635 "packet size is over maximum_packet_size for sending"
1636 )
1637 );
1638 return false;
1639 }
1640 return true;
1641 }
1642 };
1643
1644 struct recv_impl {
1645 this_type& ep;
1646 optional<filter> fil = nullopt;
1647 std::set<control_packet_type> types = {};
1648 optional<system_error> decided_error = nullopt;
1649 optional<basic_packet_variant<PacketIdBytes>> pv_opt = nullopt;
1650 enum { initiate, disconnect, close, read, complete } state = initiate;
1651
1652 template <typename Self>
1653 void operator()(
1654 Self& self,
1655 error_code const& ec = error_code{},
1656 buffer buf = buffer{}
1657 ) {
1658 if (ec) {
1659 ASYNC_MQTT_LOG("mqtt_impl", info)
1660 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1661 << "recv error:" << ec.message();
1662 decided_error.emplace(ec);
1663 ep.recv_processing_ = false;
1664 state = close;
1665 auto& a_ep{ep};
1666 a_ep.close(
1667 as::bind_executor(
1668 a_ep.stream_->raw_strand(),
1669 force_move(self)
1670 )
1671 );
1672 return;
1673 }
1674
1675 switch (state) {
1676 case initiate: {
1677 state = read;
1678 auto& a_ep{ep};
1679 a_ep.stream_->read_packet(force_move(self));
1680 } break;
1681 case read: {
1682 BOOST_ASSERT(ep.in_strand());
1683 if (buf.size() > ep.maximum_packet_size_recv_) {
1684 // on v3.1.1 maximum_packet_size_recv_ is initialized as packet_size_no_limit
1685 BOOST_ASSERT(ep.protocol_version_ == protocol_version::v5);
1686 state = disconnect;
1687 decided_error.emplace(
1688 make_error(
1689 errc::bad_message,
1690 "too large packet received"
1691 )
1692 );
1693 auto& a_ep{ep};
1694 a_ep.send(
1695 v5::disconnect_packet{
1696 disconnect_reason_code::packet_too_large
1697 },
1698 force_move(self)
1699 );
1700 return;
1701 }
1702
1703 auto v = buffer_to_basic_packet_variant<PacketIdBytes>(buf, ep.protocol_version_);
1704 ASYNC_MQTT_LOG("mqtt_impl", trace)
1705 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1706 << "recv:" << v;
1707 bool call_complete = true;
1708 v.visit(
1709 // do internal protocol processing
1710 overload {
1711 [&](v3_1_1::connect_packet& p) {
1712 ep.initialize();
1713 ep.protocol_version_ = protocol_version::v3_1_1;
1714 ep.status_ = connection_status::connecting;
1715 auto keep_alive = p.keep_alive();
1716 if (keep_alive != 0) {
1717 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1718 }
1719 if (p.clean_session()) {
1720 ep.need_store_ = false;
1721 }
1722 else {
1723 ep.need_store_ = true;
1724 }
1725 },
1726 [&](v5::connect_packet& p) {
1727 ep.initialize();
1728 ep.protocol_version_ = protocol_version::v5;
1729 ep.status_ = connection_status::connecting;
1730 auto keep_alive = p.keep_alive();
1731 if (keep_alive != 0) {
1732 ep.pingreq_recv_timeout_ms_.emplace(keep_alive * 1000 * 3 / 2);
1733 }
1734 for (auto const& prop : p.props()) {
1735 prop.visit(
1736 overload {
1737 [&](property::topic_alias_maximum const& p) {
1738 if (p.val() > 0) {
1739 ep.topic_alias_send_.emplace(p.val());
1740 }
1741 },
1742 [&](property::receive_maximum const& p) {
1743 BOOST_ASSERT(p.val() != 0);
1744 ep.publish_send_max_ = p.val();
1745 },
1746 [&](property::maximum_packet_size const& p) {
1747 BOOST_ASSERT(p.val() != 0);
1748 ep.maximum_packet_size_send_ = p.val();
1749 },
1750 [&](property::session_expiry_interval const& p) {
1751 if (p.val() != 0) {
1752 ep.need_store_ = true;
1753 }
1754 },
1755 [](auto const&) {
1756 }
1757 }
1758 );
1759 }
1760 },
1761 [&](v3_1_1::connack_packet& p) {
1762 if (p.code() == connect_return_code::accepted) {
1763 ep.status_ = connection_status::connected;
1764 if (p.session_present()) {
1765 ep.send_stored();
1766 }
1767 else {
1768 ep.clear_pid_man();
1769 ep.store_.clear();
1770 }
1771 }
1772 },
1773 [&](v5::connack_packet& p) {
1774 if (p.code() == connect_reason_code::success) {
1775 ep.status_ = connection_status::connected;
1776
1777 for (auto const& prop : p.props()) {
1778 prop.visit(
1779 overload {
1780 [&](property::topic_alias_maximum const& p) {
1781 if (p.val() > 0) {
1782 ep.topic_alias_send_.emplace(p.val());
1783 }
1784 },
1785 [&](property::receive_maximum const& p) {
1786 BOOST_ASSERT(p.val() != 0);
1787 ep.publish_send_max_ = p.val();
1788 },
1789 [&](property::maximum_packet_size const& p) {
1790 BOOST_ASSERT(p.val() != 0);
1791 ep.maximum_packet_size_send_ = p.val();
1792 },
1793 [](auto const&) {
1794 }
1795 }
1796 );
1797 }
1798
1799 if (p.session_present()) {
1800 ep.send_stored();
1801 }
1802 else {
1803 ep.clear_pid_man();
1804 ep.store_.clear();
1805 }
1806 }
1807 },
1808 [&](v3_1_1::basic_publish_packet<PacketIdBytes>& p) {
1809 switch (p.opts().get_qos()) {
1810 case qos::at_least_once: {
1811 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1812 ep.send(
1813 v3_1_1::basic_puback_packet<PacketIdBytes>(p.packet_id()),
1814 [](system_error const&){}
1815 );
1816 }
1817 } break;
1818 case qos::exactly_once:
1819 call_complete = process_qos2_publish(self, protocol_version::v3_1_1, p.packet_id());
1820 break;
1821 default:
1822 break;
1823 }
1824 },
1825 [&](v5::basic_publish_packet<PacketIdBytes>& p) {
1826 switch (p.opts().get_qos()) {
1827 case qos::at_least_once: {
1828 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1829 state = disconnect;
1830 decided_error.emplace(
1831 make_error(
1832 errc::bad_message,
1833 "receive maximum exceeded"
1834 )
1835 );
1836 auto& a_ep{ep};
1837 a_ep.send(
1838 v5::disconnect_packet{
1839 disconnect_reason_code::receive_maximum_exceeded
1840 },
1841 force_move(self)
1842 );
1843 return;
1844 }
1845 auto packet_id = p.packet_id();
1846 ep.publish_recv_.insert(packet_id);
1847 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
1848 ep.send(
1849 v5::basic_puback_packet<PacketIdBytes>{packet_id},
1850 [](system_error const&){}
1851 );
1852 }
1853 } break;
1854 case qos::exactly_once: {
1855 if (ep.publish_recv_.size() == ep.publish_recv_max_) {
1856 state = disconnect;
1857 decided_error.emplace(
1858 make_error(
1859 errc::bad_message,
1860 "receive maximum exceeded"
1861 )
1862 );
1863 auto& a_ep{ep};
1864 a_ep.send(
1865 v5::disconnect_packet{
1866 disconnect_reason_code::receive_maximum_exceeded
1867 },
1868 force_move(self)
1869 );
1870 return;
1871 }
1872 auto packet_id = p.packet_id();
1873 ep.publish_recv_.insert(packet_id);
1874 call_complete = process_qos2_publish(self, protocol_version::v5, packet_id);
1875 } break;
1876 default:
1877 break;
1878 }
1879
1880 if (p.topic().empty()) {
1881 if (auto ta_opt = get_topic_alias(p.props())) {
1882 // extract topic from topic_alias
1883 if (*ta_opt == 0 ||
1884 *ta_opt > ep.topic_alias_recv_->max()) {
1885 state = disconnect;
1886 decided_error.emplace(
1887 make_error(
1888 errc::bad_message,
1889 "topic alias invalid"
1890 )
1891 );
1892 auto& a_ep{ep};
1893 a_ep.send(
1894 v5::disconnect_packet{
1895 disconnect_reason_code::topic_alias_invalid
1896 },
1897 force_move(self)
1898 );
1899 return;
1900 }
1901 else {
1902 auto topic = ep.topic_alias_recv_->find(*ta_opt);
1903 if (topic.empty()) {
1904 ASYNC_MQTT_LOG("mqtt_impl", error)
1905 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1906 << "no matching topic alias: "
1907 << *ta_opt;
1908 state = disconnect;
1909 decided_error.emplace(
1910 make_error(
1911 errc::bad_message,
1912 "topic alias invalid"
1913 )
1914 );
1915 auto& a_ep{ep};
1916 a_ep.send(
1917 v5::disconnect_packet{
1918 disconnect_reason_code::topic_alias_invalid
1919 },
1920 force_move(self)
1921 );
1922 return;
1923 }
1924 else {
1925 p.add_topic(allocate_buffer(topic));
1926 }
1927 }
1928 }
1929 else {
1930 ASYNC_MQTT_LOG("mqtt_impl", error)
1931 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1932 << "topic is empty but topic_alias isn't set";
1933 state = disconnect;
1934 decided_error.emplace(
1935 make_error(
1936 errc::bad_message,
1937 "topic alias invalid"
1938 )
1939 );
1940 auto& a_ep{ep};
1941 a_ep.send(
1942 v5::disconnect_packet{
1943 disconnect_reason_code::topic_alias_invalid
1944 },
1945 force_move(self)
1946 );
1947 return;
1948 }
1949 }
1950 else {
1951 if (auto ta_opt = get_topic_alias(p.props())) {
1952 if (*ta_opt == 0 ||
1953 *ta_opt > ep.topic_alias_recv_->max()) {
1954 state = disconnect;
1955 decided_error.emplace(
1956 make_error(
1957 errc::bad_message,
1958 "topic alias invalid"
1959 )
1960 );
1961 auto& a_ep{ep};
1962 a_ep.send(
1963 v5::disconnect_packet{
1964 disconnect_reason_code::topic_alias_invalid
1965 },
1966 force_move(self)
1967 );
1968 return;
1969 }
1970 else {
1971 // extract topic from topic_alias
1972 if (ep.topic_alias_recv_) {
1973 ep.topic_alias_recv_->insert_or_update(p.topic(), *ta_opt);
1974 }
1975 }
1976 }
1977 }
1978 },
1979 [&](v3_1_1::basic_puback_packet<PacketIdBytes>& p) {
1980 auto packet_id = p.packet_id();
1981 if (ep.pid_puback_.erase(packet_id)) {
1982 ep.store_.erase(response_packet::v3_1_1_puback, packet_id);
1983 ep.release_pid(packet_id);
1984 }
1985 else {
1986 ASYNC_MQTT_LOG("mqtt_impl", info)
1987 << ASYNC_MQTT_ADD_VALUE(address, &ep)
1988 << "invalid packet_id puback received packet_id:" << packet_id;
1989 state = disconnect;
1990 decided_error.emplace(
1991 make_error(
1992 errc::bad_message,
1993 "packet_id invalid"
1994 )
1995 );
1996 auto& a_ep{ep};
1997 a_ep.send(
1998 v5::disconnect_packet{
1999 disconnect_reason_code::topic_alias_invalid
2000 },
2001 force_move(self)
2002 );
2003 return;
2004 }
2005 },
2006 [&](v5::basic_puback_packet<PacketIdBytes>& p) {
2007 auto packet_id = p.packet_id();
2008 if (ep.pid_puback_.erase(packet_id)) {
2009 ep.store_.erase(response_packet::v5_puback, packet_id);
2010 ep.release_pid(packet_id);
2011 --ep.publish_send_count_;
2012 send_publish_from_queue();
2013 }
2014 else {
2015 ASYNC_MQTT_LOG("mqtt_impl", info)
2016 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2017 << "invalid packet_id puback received packet_id:" << packet_id;
2018 state = disconnect;
2019 decided_error.emplace(
2020 make_error(
2021 errc::bad_message,
2022 "packet_id invalid"
2023 )
2024 );
2025 auto& a_ep{ep};
2026 a_ep.send(
2027 v5::disconnect_packet{
2028 disconnect_reason_code::topic_alias_invalid
2029 },
2030 force_move(self)
2031 );
2032 return;
2033 }
2034 },
2035 [&](v3_1_1::basic_pubrec_packet<PacketIdBytes>& p) {
2036 auto packet_id = p.packet_id();
2037 if (ep.pid_pubrec_.erase(packet_id)) {
2038 ep.store_.erase(response_packet::v3_1_1_pubrec, packet_id);
2039 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2040 ep.send(
2041 v3_1_1::basic_pubrel_packet<PacketIdBytes>(packet_id),
2042 [](system_error const&){}
2043 );
2044 }
2045 }
2046 else {
2047 ASYNC_MQTT_LOG("mqtt_impl", info)
2048 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2049 << "invalid packet_id pubrec received packet_id:" << packet_id;
2050 state = disconnect;
2051 decided_error.emplace(
2052 make_error(
2053 errc::bad_message,
2054 "packet_id invalid"
2055 )
2056 );
2057 auto& a_ep{ep};
2058 a_ep.send(
2059 v5::disconnect_packet{
2060 disconnect_reason_code::topic_alias_invalid
2061 },
2062 force_move(self)
2063 );
2064 return;
2065 }
2066 },
2067 [&](v5::basic_pubrec_packet<PacketIdBytes>& p) {
2068 auto packet_id = p.packet_id();
2069 if (ep.pid_pubrec_.erase(packet_id)) {
2070 ep.store_.erase(response_packet::v5_pubrec, packet_id);
2071 if (is_error(p.code())) {
2072 ep.release_pid(packet_id);
2073 ep.qos2_publish_processing_.erase(packet_id);
2074 --ep.publish_send_count_;
2075 send_publish_from_queue();
2076 }
2077 else if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2078 ep.send(
2079 v5::basic_pubrel_packet<PacketIdBytes>(packet_id),
2080 [](system_error const&){}
2081 );
2082 }
2083 }
2084 else {
2085 ASYNC_MQTT_LOG("mqtt_impl", info)
2086 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2087 << "invalid packet_id pubrec received packet_id:" << packet_id;
2088 state = disconnect;
2089 decided_error.emplace(
2090 make_error(
2091 errc::bad_message,
2092 "packet_id invalid"
2093 )
2094 );
2095 auto& a_ep{ep};
2096 a_ep.send(
2097 v5::disconnect_packet{
2098 disconnect_reason_code::topic_alias_invalid
2099 },
2100 force_move(self)
2101 );
2102 return;
2103 }
2104 },
2105 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes>& p) {
2106 auto packet_id = p.packet_id();
2107 ep.qos2_publish_handled_.erase(packet_id);
2108 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2109 ep.send(
2110 v3_1_1::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2111 [](system_error const&){}
2112 );
2113 }
2114 },
2115 [&](v5::basic_pubrel_packet<PacketIdBytes>& p) {
2116 auto packet_id = p.packet_id();
2117 ep.qos2_publish_handled_.erase(packet_id);
2118 if (ep.auto_pub_response_ && ep.status_ == connection_status::connected) {
2119 ep.send(
2120 v5::basic_pubcomp_packet<PacketIdBytes>(packet_id),
2121 [](system_error const&){}
2122 );
2123 }
2124 },
2125 [&](v3_1_1::basic_pubcomp_packet<PacketIdBytes>& p) {
2126 auto packet_id = p.packet_id();
2127 if (ep.pid_pubcomp_.erase(packet_id)) {
2128 ep.store_.erase(response_packet::v3_1_1_pubcomp, packet_id);
2129 ep.release_pid(packet_id);
2130 ep.qos2_publish_processing_.erase(packet_id);
2131 --ep.publish_send_count_;
2132 send_publish_from_queue();
2133 }
2134 else {
2135 ASYNC_MQTT_LOG("mqtt_impl", info)
2136 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2137 << "invalid packet_id pubcomp received packet_id:" << packet_id;
2138 state = disconnect;
2139 decided_error.emplace(
2140 make_error(
2141 errc::bad_message,
2142 "packet_id invalid"
2143 )
2144 );
2145 auto& a_ep{ep};
2146 a_ep.send(
2147 v5::disconnect_packet{
2148 disconnect_reason_code::topic_alias_invalid
2149 },
2150 force_move(self)
2151 );
2152 return;
2153 }
2154 },
2155 [&](v5::basic_pubcomp_packet<PacketIdBytes>& p) {
2156 auto packet_id = p.packet_id();
2157 if (ep.pid_pubcomp_.erase(packet_id)) {
2158 ep.store_.erase(response_packet::v5_pubcomp, packet_id);
2159 ep.release_pid(packet_id);
2160 ep.qos2_publish_processing_.erase(packet_id);
2161 }
2162 else {
2163 ASYNC_MQTT_LOG("mqtt_impl", info)
2164 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2165 << "invalid packet_id pubcomp received packet_id:" << packet_id;
2166 state = disconnect;
2167 decided_error.emplace(
2168 make_error(
2169 errc::bad_message,
2170 "packet_id invalid"
2171 )
2172 );
2173 auto& a_ep{ep};
2174 a_ep.send(
2175 v5::disconnect_packet{
2176 disconnect_reason_code::topic_alias_invalid
2177 },
2178 force_move(self)
2179 );
2180 return;
2181 }
2182 },
2183 [&](v3_1_1::basic_subscribe_packet<PacketIdBytes>&) {
2184 },
2185 [&](v5::basic_subscribe_packet<PacketIdBytes>&) {
2186 },
2187 [&](v3_1_1::basic_suback_packet<PacketIdBytes>& p) {
2188 auto packet_id = p.packet_id();
2189 if (ep.pid_suback_.erase(packet_id)) {
2190 ep.release_pid(packet_id);
2191 }
2192 },
2193 [&](v5::basic_suback_packet<PacketIdBytes>& p) {
2194 auto packet_id = p.packet_id();
2195 if (ep.pid_suback_.erase(packet_id)) {
2196 ep.release_pid(packet_id);
2197 }
2198 },
2199 [&](v3_1_1::basic_unsubscribe_packet<PacketIdBytes>&) {
2200 },
2201 [&](v5::basic_unsubscribe_packet<PacketIdBytes>&) {
2202 },
2203 [&](v3_1_1::basic_unsuback_packet<PacketIdBytes>& p) {
2204 auto packet_id = p.packet_id();
2205 if (ep.pid_unsuback_.erase(packet_id)) {
2206 ep.release_pid(packet_id);
2207 }
2208 },
2209 [&](v5::basic_unsuback_packet<PacketIdBytes>& p) {
2210 auto packet_id = p.packet_id();
2211 if (ep.pid_unsuback_.erase(packet_id)) {
2212 ep.release_pid(packet_id);
2213 }
2214 },
2215 [&](v3_1_1::pingreq_packet&) {
2216 if constexpr(can_send_as_server(Role)) {
2217 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2218 ep.send(
2219 v3_1_1::pingresp_packet(),
2220 [](system_error const&){}
2221 );
2222 }
2223 }
2224 },
2225 [&](v5::pingreq_packet&) {
2226 if constexpr(can_send_as_server(Role)) {
2227 if (ep.auto_ping_response_ && ep.status_ == connection_status::connected) {
2228 ep.send(
2229 v5::pingresp_packet(),
2230 [](system_error const&){}
2231 );
2232 }
2233 }
2234 },
2235 [&](v3_1_1::pingresp_packet&) {
2236 ep.tim_pingresp_recv_->cancel();
2237 },
2238 [&](v5::pingresp_packet&) {
2239 ep.tim_pingresp_recv_->cancel();
2240 },
2241 [&](v3_1_1::disconnect_packet&) {
2242 ep.status_ = connection_status::disconnecting;
2243 },
2244 [&](v5::disconnect_packet&) {
2245 ep.status_ = connection_status::disconnecting;
2246 },
2247 [&](v5::auth_packet&) {
2248 },
2249 [&](system_error&) {
2250 ep.status_ = connection_status::closed;
2251 }
2252 }
2253 );
2254 ep.reset_pingreq_recv_timer();
2255 ep.recv_processing_ = false;
2256
2257 auto try_to_comp =
2258 [&] {
2259 if (call_complete && !decided_error) {
2260 pv_opt.emplace(force_move(v));
2261 state = complete;
2262 bind_dispatch(force_move(self));
2263 }
2264 };
2265
2266 if (fil) {
2267 if (auto type_opt = v.type()) {
2268 if ((*fil == filter::match && types.find(*type_opt) == types.end()) ||
2269 (*fil == filter::except && types.find(*type_opt) != types.end())
2270 ) {
2271 // read the next packet
2272 state = initiate;
2273 auto& a_ep{ep};
2274 as::dispatch(
2275 as::bind_executor(
2276 a_ep.stream_->raw_strand(),
2277 force_move(self)
2278 )
2279 );
2280 }
2281 else {
2282 try_to_comp();
2283 }
2284 }
2285 else {
2286 try_to_comp();
2287 }
2288 }
2289 else {
2290 try_to_comp();
2291 }
2292 } break;
2293 case disconnect: {
2294 state = close;
2295 auto& a_ep{ep};
2296 a_ep.close(
2297 as::bind_executor(
2298 a_ep.stream_->raw_strand(),
2299 force_move(self)
2300 )
2301 );
2302 } break;
2303 case close: {
2304 BOOST_ASSERT(decided_error);
2305 ep.recv_processing_ = false;
2306 ASYNC_MQTT_LOG("mqtt_impl", info)
2307 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2308 << "recv code triggers close:" << decided_error->what();
2309 pv_opt.emplace(force_move(*decided_error));
2310 state = complete;
2311 bind_dispatch(force_move(self));
2312 } break;
2313 case complete:
2314 BOOST_ASSERT(pv_opt);
2315 self.complete(force_move(*pv_opt));
2316 break;
2317 default:
2318 BOOST_ASSERT(false);
2319 break;
2320 }
2321 }
2322
2323 template <typename Self>
2324 void operator()(
2325 Self& self,
2326 system_error const&
2327 ) {
2328 BOOST_ASSERT(state == disconnect);
2329 state = close;
2330 auto& a_ep{ep};
2331 a_ep.close(
2332 as::bind_executor(
2333 a_ep.stream_->raw_strand(),
2334 force_move(self)
2335 )
2336 );
2337 }
2338
2339 void send_publish_from_queue() {
2340 BOOST_ASSERT(ep.in_strand());
2341 if (ep.status_ != connection_status::connected) return;
2342 while (!ep.publish_queue_.empty() &&
2343 ep.publish_send_count_ != ep.publish_send_max_) {
2344 ep.send(
2345 force_move(ep.publish_queue_.front()),
2346 true, // from queue
2347 [](system_error const&){}
2348 );
2349 ep.publish_queue_.pop_front();
2350 }
2351 }
2352
2353 template <typename Self>
2354 bool process_qos2_publish(
2355 Self& self,
2356 protocol_version ver,
2357 packet_id_t packet_id
2358 ) {
2359 BOOST_ASSERT(ep.in_strand());
2360 bool already_handled = false;
2361 if (ep.qos2_publish_handled_.find(packet_id) == ep.qos2_publish_handled_.end()) {
2362 ep.qos2_publish_handled_.emplace(packet_id);
2363 }
2364 else {
2365 already_handled = true;
2366 }
2367 if (ep.status_ == connection_status::connected &&
2368 (ep.auto_pub_response_ ||
2369 already_handled) // already_handled is true only if the pubrec packet
2370 ) { // corresponding to the publish packet has already
2371 // been sent as success
2372 switch (ver) {
2373 case protocol_version::v3_1_1:
2374 ep.send(
2375 v3_1_1::basic_pubrec_packet<PacketIdBytes>(packet_id),
2376 [](system_error const&){}
2377 );
2378 break;
2379 case protocol_version::v5:
2380 ep.send(
2381 v5::basic_pubrec_packet<PacketIdBytes>(packet_id),
2382 [](system_error const&){}
2383 );
2384 break;
2385 default:
2386 BOOST_ASSERT(false);
2387 break;
2388 }
2389 }
2390 if (already_handled) {
2391 // do the next read
2392 auto& a_ep{ep};
2393 a_ep.stream_->read_packet(force_move(self));
2394 return false;
2395 }
2396 return true;
2397 }
2398 };
2399
2400 struct close_impl {
2401 this_type& ep;
2402 enum { dispatch, close, bind, complete } state = dispatch;
2403 this_type_sp life_keeper = ep.shared_from_this();
2404
2405 template <typename Self>
2406 void operator()(
2407 Self& self,
2408 error_code const& = error_code{}
2409 ) {
2410 switch (state) {
2411 case dispatch: {
2412 state = close;
2413 auto& a_ep{ep};
2414 as::dispatch(
2415 as::bind_executor(
2416 a_ep.stream_->raw_strand(),
2417 force_move(self)
2418 )
2419 );
2420 } break;
2421 case close:
2422 BOOST_ASSERT(ep.in_strand());
2423 switch (ep.status_) {
2424 case connection_status::connecting:
2425 case connection_status::connected:
2426 case connection_status::disconnecting: {
2427 ASYNC_MQTT_LOG("mqtt_impl", trace)
2428 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2429 << "close initiate status:" << static_cast<int>(ep.status_);
2430 state = bind;
2431 ep.status_ = connection_status::closing;
2432 auto& a_ep{ep};
2433 a_ep.stream_->close(force_move(self));
2434 } break;
2435 case connection_status::closing: {
2436 ASYNC_MQTT_LOG("mqtt_impl", trace)
2437 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2438 << "already close requested";
2439 auto& a_ep{ep};
2440 auto exe = as::get_associated_executor(self);
2441 a_ep.close_queue_.post(
2442 as::bind_executor(
2443 exe,
2444 force_move(self)
2445 )
2446 );
2447 } break;
2448 case connection_status::closed:
2449 ASYNC_MQTT_LOG("mqtt_impl", trace)
2450 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2451 << "already closed";
2452 state = complete;
2453 bind_dispatch(force_move(self));
2454 } break;
2455 case bind: {
2456 BOOST_ASSERT(ep.in_strand());
2457 ASYNC_MQTT_LOG("mqtt_impl", trace)
2458 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2459 << "close complete status:" << static_cast<int>(ep.status_);
2460 auto& a_ep{ep};
2461 a_ep.tim_pingreq_send_->cancel();
2462 a_ep.tim_pingreq_recv_->cancel();
2463 a_ep.tim_pingresp_recv_->cancel();
2464 a_ep.status_ = connection_status::closed;
2465 ASYNC_MQTT_LOG("mqtt_impl", trace)
2466 << ASYNC_MQTT_ADD_VALUE(address, &ep)
2467 << "process enqueued close";
2468 a_ep.close_queue_.poll();
2469 state = complete;
2470 state = complete;
2471 bind_dispatch(force_move(self));
2472 } break;
2473 case complete:
2474 self.complete();
2475 break;
2476 }
2477 }
2478 };
2479
2480 struct restore_packets_impl {
2481 this_type& ep;
2482 std::vector<basic_store_packet_variant<PacketIdBytes>> pvs;
2483 enum { dispatch, restore, complete } state = dispatch;
2484
2485 template <typename Self>
2486 void operator()(
2487 Self& self
2488 ) {
2489 switch (state) {
2490 case dispatch: {
2491 state = restore;
2492 auto& a_ep{ep};
2493 as::dispatch(
2494 as::bind_executor(
2495 a_ep.stream_->raw_strand(),
2496 force_move(self)
2497 )
2498 );
2499 } break;
2500 case restore: {
2501 BOOST_ASSERT(ep.in_strand());
2502 ep.restore_packets(force_move(pvs));
2503 state = complete;
2504 bind_dispatch(force_move(self));
2505 } break;
2506 case complete:
2507 self.complete();
2508 break;
2509 }
2510 }
2511 };
2512
2513 struct get_stored_packets_impl {
2514 this_type const& ep;
2515 std::vector<basic_store_packet_variant<PacketIdBytes>> packets = {};
2516 enum { dispatch, get, complete } state = dispatch;
2517
2518 template <typename Self>
2519 void operator()(
2520 Self& self
2521 ) {
2522 switch (state) {
2523 case dispatch: {
2524 state = get;
2525 auto& a_ep{ep};
2526 as::dispatch(
2527 as::bind_executor(
2528 a_ep.stream_->raw_strand(),
2529 force_move(self)
2530 )
2531 );
2532 } break;
2533 case get: {
2534 BOOST_ASSERT(ep.in_strand());
2535 packets = ep.get_stored_packets();
2536 state = complete;
2537 bind_dispatch(force_move(self));
2538 } break;
2539 case complete:
2540 self.complete(force_move(packets));
2541 break;
2542 }
2543 }
2544 };
2545
2546 struct regulate_for_store_impl {
2547 this_type const& ep;
2548 v5::basic_publish_packet<PacketIdBytes> packet;
2549 enum { dispatch, regulate, complete } state = dispatch;
2550
2551 template <typename Self>
2552 void operator()(
2553 Self& self
2554 ) {
2555 switch (state) {
2556 case dispatch: {
2557 state = regulate;
2558 auto& a_ep{ep};
2559 as::dispatch(
2560 as::bind_executor(
2561 a_ep.stream_->raw_strand(),
2562 force_move(self)
2563 )
2564 );
2565 } break;
2566 case regulate: {
2567 BOOST_ASSERT(ep.in_strand());
2568 ep.regulate_for_store(packet);
2569 state = complete;
2570 bind_dispatch(force_move(self));
2571 } break;
2572 case complete:
2573 self.complete(force_move(packet));
2574 break;
2575 }
2576 }
2577 };
2578
2579private:
2580
2581 template <typename Packet, typename CompletionToken>
2582 auto
2583 send(
2584 Packet packet,
2585 bool from_queue,
2586 CompletionToken&& token
2587 ) {
2588 return
2589 as::async_compose<
2590 CompletionToken,
2591 void(system_error)
2592 >(
2593 send_impl<Packet>{
2594 *this,
2595 force_move(packet),
2596 from_queue
2597 },
2598 token
2599 );
2600 }
2601
2602 bool enqueue_publish(v5::basic_publish_packet<PacketIdBytes>& packet) {
2603 BOOST_ASSERT(in_strand());
2604 if (packet.opts().get_qos() == qos::at_least_once ||
2605 packet.opts().get_qos() == qos::exactly_once
2606 ) {
2607 if (publish_send_count_ == publish_send_max_) {
2608 publish_queue_.push_back(force_move(packet));
2609 return true;
2610 }
2611 else {
2612 ++publish_send_count_;
2613 if (!publish_queue_.empty()) {
2614 publish_queue_.push_back(force_move(packet));
2615 return true;
2616 }
2617 }
2618 }
2619 return false;
2620 }
2621
2622 void send_stored() {
2623 BOOST_ASSERT(in_strand());
2624 store_.for_each(
2625 [&](basic_store_packet_variant<PacketIdBytes> const& pv) {
2626 if (pv.size() > maximum_packet_size_send_) {
2627 release_pid(pv.packet_id());
2628 return false;
2629 }
2630 pv.visit(
2631 // copy packet because the stored packets need to be preserved
2632 // until receiving puback/pubrec/pubcomp
2633 overload {
2634 [&](v3_1_1::basic_publish_packet<PacketIdBytes> p) {
2635 send(
2636 p,
2637 [](system_error const&){}
2638 );
2639 },
2640 [&](v5::basic_publish_packet<PacketIdBytes> p) {
2641 if (enqueue_publish(p)) return;
2642 send(
2643 p,
2644 [](system_error const&){}
2645 );
2646 },
2647 [&](v3_1_1::basic_pubrel_packet<PacketIdBytes> p) {
2648 send(
2649 p,
2650 [](system_error const&){}
2651 );
2652 },
2653 [&](v5::basic_pubrel_packet<PacketIdBytes> p) {
2654 send(
2655 p,
2656 [](system_error const&){}
2657 );
2658 }
2659 }
2660 );
2661 return true;
2662 }
2663 );
2664 }
2665
2666 void initialize() {
2667 BOOST_ASSERT(in_strand());
2668 publish_send_count_ = 0;
2669 publish_queue_.clear();
2670 topic_alias_send_ = nullopt;
2671 topic_alias_recv_ = nullopt;
2672 publish_recv_.clear();
2673 qos2_publish_processing_.clear();
2674 need_store_ = false;
2675 pid_suback_.clear();
2676 pid_unsuback_.clear();
2677 pid_puback_.clear();
2678 pid_pubrec_.clear();
2679 pid_pubcomp_.clear();
2680 }
2681
2682 void reset_pingreq_send_timer() {
2683 BOOST_ASSERT(in_strand());
2684 if (pingreq_send_interval_ms_) {
2685 tim_pingreq_send_->cancel();
2686 if (status_ == connection_status::disconnecting ||
2687 status_ == connection_status::closing ||
2688 status_ == connection_status::closed) return;
2689 tim_pingreq_send_->expires_after(
2690 std::chrono::milliseconds{*pingreq_send_interval_ms_}
2691 );
2692 tim_pingreq_send_->async_wait(
2693 [this, wp = std::weak_ptr{tim_pingreq_send_}](error_code const& ec) {
2694 if (!ec) {
2695 if (auto sp = wp.lock()) {
2696 switch (protocol_version_) {
2697 case protocol_version::v3_1_1:
2698 send(
2699 v3_1_1::pingreq_packet(),
2700 [](system_error const&){}
2701 );
2702 break;
2703 case protocol_version::v5:
2704 send(
2705 v5::pingreq_packet(),
2706 [](system_error const&){}
2707 );
2708 break;
2709 default:
2710 BOOST_ASSERT(false);
2711 break;
2712 }
2713 }
2714 }
2715 }
2716 );
2717 }
2718 }
2719
2720 void reset_pingreq_recv_timer() {
2721 BOOST_ASSERT(in_strand());
2722 if (pingreq_recv_timeout_ms_) {
2723 tim_pingreq_recv_->cancel();
2724 if (status_ == connection_status::disconnecting ||
2725 status_ == connection_status::closing ||
2726 status_ == connection_status::closed) return;
2727 tim_pingreq_recv_->expires_after(
2728 std::chrono::milliseconds{*pingreq_recv_timeout_ms_}
2729 );
2730 tim_pingreq_recv_->async_wait(
2731 [this, wp = std::weak_ptr{tim_pingreq_recv_}](error_code const& ec) {
2732 if (!ec) {
2733 if (auto sp = wp.lock()) {
2734 switch (protocol_version_) {
2735 case protocol_version::v3_1_1:
2736 ASYNC_MQTT_LOG("mqtt_impl", error)
2737 << ASYNC_MQTT_ADD_VALUE(address, this)
2738 << "pingreq recv timeout. close.";
2739 close(
2740 []{}
2741 );
2742 break;
2743 case protocol_version::v5:
2744 ASYNC_MQTT_LOG("mqtt_impl", error)
2745 << ASYNC_MQTT_ADD_VALUE(address, this)
2746 << "pingreq recv timeout. close.";
2747 send(
2748 v5::disconnect_packet{
2749 disconnect_reason_code::keep_alive_timeout,
2750 properties{}
2751 },
2752 [this](system_error const&){
2753 close(
2754 []{}
2755 );
2756 }
2757 );
2758 break;
2759 default:
2760 BOOST_ASSERT(false);
2761 break;
2762 }
2763 }
2764 }
2765 }
2766 );
2767 }
2768 }
2769
2770 void reset_pingresp_recv_timer() {
2771 BOOST_ASSERT(in_strand());
2772 if (pingresp_recv_timeout_ms_) {
2773 tim_pingresp_recv_->cancel();
2774 if (status_ == connection_status::disconnecting ||
2775 status_ == connection_status::closing ||
2776 status_ == connection_status::closed) return;
2777 tim_pingresp_recv_->expires_after(
2778 std::chrono::milliseconds{*pingresp_recv_timeout_ms_}
2779 );
2780 tim_pingresp_recv_->async_wait(
2781 [this, wp = std::weak_ptr{tim_pingresp_recv_}](error_code const& ec) {
2782 if (!ec) {
2783 if (auto sp = wp.lock()) {
2784 switch (protocol_version_) {
2785 case protocol_version::v3_1_1:
2786 ASYNC_MQTT_LOG("mqtt_impl", error)
2787 << ASYNC_MQTT_ADD_VALUE(address, this)
2788 << "pingresp recv timeout. close.";
2789 close(
2790 []{}
2791 );
2792 break;
2793 case protocol_version::v5:
2794 ASYNC_MQTT_LOG("mqtt_impl", error)
2795 << ASYNC_MQTT_ADD_VALUE(address, this)
2796 << "pingresp recv timeout. close.";
2797 if (status_ == connection_status::connected) {
2798 send(
2799 v5::disconnect_packet{
2800 disconnect_reason_code::keep_alive_timeout,
2801 properties{}
2802 },
2803 [this](system_error const&){
2804 close(
2805 []{}
2806 );
2807 }
2808 );
2809 }
2810 else {
2811 close(
2812 []{}
2813 );
2814 }
2815 break;
2816 default:
2817 BOOST_ASSERT(false);
2818 break;
2819 }
2820 }
2821 }
2822 }
2823 );
2824 }
2825 }
2826
2827 template <typename CompletionToken>
2828 void add_retry(
2829 CompletionToken&& token
2830 ) {
2831 auto tim = std::make_shared<as::steady_timer>(stream_->raw_strand());
2832 tim->expires_at(std::chrono::steady_clock::time_point::max());
2833 tim->async_wait(
2834 as::bind_executor(
2835 stream_->raw_strand(),
2836 std::forward<CompletionToken>(token)
2837 )
2838 );
2839 tim_retry_acq_pid_queue_.emplace_back(force_move(tim));
2840 }
2841
2842 void notify_retry_one() {
2843 for (auto it = tim_retry_acq_pid_queue_.begin();
2844 it != tim_retry_acq_pid_queue_.end();
2845 ++it
2846 ) {
2847 if (it->cancelled) continue;
2848 it->tim->cancel();
2849 it->cancelled = true;
2850 return;
2851 }
2852 }
2853
2854 void complete_retry_one() {
2855 if (!tim_retry_acq_pid_queue_.empty()) {
2856 tim_retry_acq_pid_queue_.pop_front();
2857 }
2858 }
2859
2860 void notify_retry_all() {
2861 tim_retry_acq_pid_queue_.clear();
2862 }
2863
2864 bool has_retry() const {
2865 return !tim_retry_acq_pid_queue_.empty();
2866 }
2867
2868 void clear_pid_man() {
2869 pid_man_.clear();
2870 notify_retry_all();
2871 }
2872
2873 void release_pid(packet_id_t pid) {
2874 pid_man_.release_id(pid);
2875 notify_retry_one();
2876 }
2877
2878private:
2879 protocol_version protocol_version_;
2880 std::shared_ptr<stream_type> stream_;
2881 packet_id_manager<packet_id_t> pid_man_;
2882 std::set<packet_id_t> pid_suback_;
2883 std::set<packet_id_t> pid_unsuback_;
2884 std::set<packet_id_t> pid_puback_;
2885 std::set<packet_id_t> pid_pubrec_;
2886 std::set<packet_id_t> pid_pubcomp_;
2887
2888 bool need_store_ = false;
2889 store<PacketIdBytes, as::strand<as::any_io_executor>> store_{stream_->raw_strand()};
2890
2891 bool auto_pub_response_ = false;
2892 bool auto_ping_response_ = false;
2893
2894 bool auto_map_topic_alias_send_ = false;
2895 bool auto_replace_topic_alias_send_ = false;
2896 optional<topic_alias_send> topic_alias_send_;
2897 optional<topic_alias_recv> topic_alias_recv_;
2898
2899 receive_maximum_t publish_send_max_{receive_maximum_max};
2900 receive_maximum_t publish_recv_max_{receive_maximum_max};
2901 receive_maximum_t publish_send_count_{0};
2902
2903 std::set<packet_id_t> publish_recv_;
2904 std::deque<v5::basic_publish_packet<PacketIdBytes>> publish_queue_;
2905
2906 ioc_queue close_queue_;
2907
2908 std::uint32_t maximum_packet_size_send_{packet_size_no_limit};
2909 std::uint32_t maximum_packet_size_recv_{packet_size_no_limit};
2910
2911 connection_status status_{connection_status::closed};
2912
2913 optional<std::size_t> pingreq_send_interval_ms_;
2914 optional<std::size_t> pingreq_recv_timeout_ms_;
2915 optional<std::size_t> pingresp_recv_timeout_ms_;
2916
2917 std::shared_ptr<as::steady_timer> tim_pingreq_send_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2918 std::shared_ptr<as::steady_timer> tim_pingreq_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2919 std::shared_ptr<as::steady_timer> tim_pingresp_recv_{std::make_shared<as::steady_timer>(stream_->raw_strand())};
2920
2921 std::set<packet_id_t> qos2_publish_handled_;
2922
2923 bool recv_processing_ = false;
2924 std::set<packet_id_t> qos2_publish_processing_;
2925
2926 struct tim_cancelled {
2927 tim_cancelled(
2928 std::shared_ptr<as::steady_timer> tim,
2929 bool cancelled = false
2930 ):tim{force_move(tim)}, cancelled{cancelled}
2931 {}
2932 std::shared_ptr<as::steady_timer> tim;
2933 bool cancelled;
2934 };
2935 std::deque<tim_cancelled> tim_retry_acq_pid_queue_;
2936};
2937
2945template <role Role, typename NextLayer>
2947
2955template <role Role, typename NextLayer>
2957
2958} // namespace async_mqtt
2959
2960#endif // ASYNC_MQTT_ENDPOINT_HPP
MQTT endpoint corresponding to the connection.
Definition endpoint.hpp:70
auto restore_packets(std::vector< basic_store_packet_variant< PacketIdBytes > > pvs, CompletionToken &&token)
restore packets the restored packets would automatically send when CONNACK packet is received
Definition endpoint.hpp:569
auto recv(filter fil, std::set< control_packet_type > types, CompletionToken &&token)
receive packet users CANNOT call recv() before the previous recv()'s CompletionToken is invoked if pa...
Definition endpoint.hpp:513
auto release_packet_id(packet_id_t packet_id, CompletionToken &&token)
release packet_id.
Definition endpoint.hpp:387
std::vector< basic_store_packet_variant< PacketIdBytes > > get_stored_packets() const
get stored packets sotred packets mean inflight packets.
Definition endpoint.hpp:758
void release_packet_id(packet_id_t pid)
release packet_id.
Definition endpoint.hpp:683
static std::shared_ptr< this_type > create(protocol_version ver, Args &&... args)
create
Definition endpoint.hpp:142
void set_pingresp_recv_timeout_ms(std::size_t ms)
Set timeout for receiving PINGRESP packet after PINGREQ packet is sent. If the timer is fired,...
Definition endpoint.hpp:275
strand_type & strand()
strand getter
Definition endpoint.hpp:172
next_layer_type & next_layer()
next_layer getter
Definition endpoint.hpp:195
auto recv(std::set< control_packet_type > types, CompletionToken &&token)
receive packet users CANNOT call recv() before the previous recv()'s CompletionToken is invoked if pa...
Definition endpoint.hpp:478
std::set< packet_id_t > get_qos2_publish_handled_pids() const
Get processed but not released QoS2 packet ids This function should be called after disconnection.
Definition endpoint.hpp:697
auto send(Packet packet, CompletionToken &&token)
send packet users can call send() before the previous send()'s CompletionToken is invoked
Definition endpoint.hpp:416
auto acquire_unique_packet_id(CompletionToken &&token)
acuire unique packet_id.
Definition endpoint.hpp:307
protocol_version get_protocol_version() const
get MQTT protocol version
Definition endpoint.hpp:771
bool in_strand() const
strand checker
Definition endpoint.hpp:180
bool register_packet_id(packet_id_t pid)
register packet_id.
Definition endpoint.hpp:669
auto close(CompletionToken &&token)
close the underlying connection
Definition endpoint.hpp:544
void restore_qos2_publish_handled_pids(std::set< packet_id_t > pids)
Restore processed but not released QoS2 packet ids This function should be called before receive the ...
Definition endpoint.hpp:711
bool is_publish_processing(packet_id_t pid) const
Get MQTT PUBLISH packet processing status.
Definition endpoint.hpp:785
void set_auto_ping_response(bool val)
auto pingreq response setter. Should be called before send()/recv() call.
Definition endpoint.hpp:231
strand_type const & strand() const
strand getter
Definition endpoint.hpp:164
auto get_stored_packets(CompletionToken &&token) const
get stored packets sotred packets mean inflight packets.
Definition endpoint.hpp:600
void set_bulk_write(bool val)
Set bulk write mode. If true, then concatenate multiple packets' const buffer sequence when send() is...
Definition endpoint.hpp:293
auto acquire_unique_packet_id_wait_until(CompletionToken &&token)
acuire unique packet_id. If packet_id is fully acquired, then wait until released.
Definition endpoint.hpp:333
auto & lowest_layer()
lowest_layer getter
Definition endpoint.hpp:210
auto register_packet_id(packet_id_t packet_id, CompletionToken &&token)
register packet_id.
Definition endpoint.hpp:359
typename packet_id_type< PacketIdBytes >::type packet_id_t
Type of MQTT Packet Identifier.
Definition endpoint.hpp:129
void regulate_for_store(v5::basic_publish_packet< PacketIdBytes > &packet) const
Regulate publish packet for store If topic is empty, extract topic from topic alias,...
Definition endpoint.hpp:800
static constexpr std::size_t packet_id_bytes
The value given as PacketIdBytes.
Definition endpoint.hpp:123
void set_auto_replace_topic_alias_send(bool val)
auto replace topic with corresponding topic alias on send PUBLISH packet. Registering topic alias nee...
Definition endpoint.hpp:259
auto const & lowest_layer() const
lowest_layer getter
Definition endpoint.hpp:203
next_layer_type const & next_layer() const
next_layer getter
Definition endpoint.hpp:188
void restore_packets(std::vector< basic_store_packet_variant< PacketIdBytes > > pvs)
restore packets the restored packets would automatically send when CONNACK packet is received
Definition endpoint.hpp:725
auto recv(CompletionToken &&token)
receive packet users CANNOT call recv() before the previous recv()'s CompletionToken is invoked
Definition endpoint.hpp:447
optional< packet_id_t > acquire_unique_packet_id()
acuire unique packet_id.
Definition endpoint.hpp:647
void set_auto_pub_response(bool val)
auto publish response setter. Should be called before send()/recv() call.
Definition endpoint.hpp:219
void set_auto_map_topic_alias_send(bool val)
auto map (allocate) topic alias on send PUBLISH packet. If all topic aliases are used,...
Definition endpoint.hpp:245
typename stream_type::strand_type strand_type
The type of stand that is used MQTT stream exclusive control.
Definition endpoint.hpp:121
Definition packet_variant.hpp:49
auto visit(Func &&func) const &
visit to variant
Definition packet_variant.hpp:74
Definition property.hpp:714
role
MQTT endpoint connection role.
Definition endpoint.hpp:35
@ client
as client. Can't send CONNACK, SUBACK, UNSUBACK, PINGRESP. Can send Other packets.
filter
receive packet filter
Definition endpoint.hpp:45
@ except
no matched control_packet_type is target
@ match
matched control_packet_type is target
protocol_version
MQTT protocol version.
Definition protocol_version.hpp:20
@ send
Always send. Same as MQTT v.3.1.1.