7 #if !defined(MQTT_ENDPOINT_HPP)
8 #define MQTT_ENDPOINT_HPP
22 #include <boost/any.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <boost/asio.hpp>
25 #include <boost/lexical_cast.hpp>
26 #include <boost/multi_index_container.hpp>
27 #include <boost/multi_index/sequenced_index.hpp>
28 #include <boost/multi_index/ordered_index.hpp>
29 #include <boost/multi_index/member.hpp>
30 #include <boost/multi_index/mem_fun.hpp>
31 #include <boost/multi_index/composite_key.hpp>
32 #include <boost/system/error_code.hpp>
33 #include <boost/assert.hpp>
75 #if defined(MQTT_USE_WS)
83 #define MQTT_LIBSTDCXX_GCC_730 20180125
84 #define MQTT_LIBSTDCXX_GCC_740 20181206
85 #define MQTT_LIBSTDCXX_GCC_750 20191114
86 #define MQTT_LIBSTDCXX_GCC_810 20180502
87 #define MQTT_LIBSTDCXX_GCC_820 20180726
88 #define MQTT_LIBSTDCXX_GCC_830 20190222
89 #define MQTT_LIBSTDCXX_GCC_910 20190503
90 #define MQTT_LIBSTDCXX_GCC_920 20190812
92 #if !defined(MQTT_DISABLE_LIBSTDCXX_TUPLE_ANY_WORKAROUND)
93 #if defined(MQTT_STD_ANY) && defined(__GLIBCXX__) && (__GLIBCXX__ != MQTT_LIBSTDCXX_GCC_820) && (__GLIBCXX__ != MQTT_LIBSTDCXX_GCC_830)
96 struct std::is_constructible<std::tuple<std::any>> : std::true_type {
100 struct std::is_constructible<std::tuple<std::any>, std::tuple<std::any> const&> : std::true_type {
104 struct std::is_copy_constructible<std::tuple<std::any>> : std::true_type {
108 struct std::is_copy_constructible<std::_Head_base<0, std::any, false>> : std::true_type {
112 struct std::is_constructible<std::_Head_base<0, std::any, false>, std::_Head_base<0, std::any, false> const&> : std::true_type {
118 #undef MQTT_LIBSTDCXX_GCC_730
119 #undef MQTT_LIBSTDCXX_GCC_740
120 #undef MQTT_LIBSTDCXX_GCC_750
121 #undef MQTT_LIBSTDCXX_GCC_810
122 #undef MQTT_LIBSTDCXX_GCC_820
123 #undef MQTT_LIBSTDCXX_GCC_830
124 #undef MQTT_LIBSTDCXX_GCC_910
125 #undef MQTT_LIBSTDCXX_GCC_920
127 #if defined(__GNUC__)
128 #pragma GCC diagnostic push
129 #pragma GCC diagnostic ignored "-Wimplicit-fallthrough"
132 #include <boost/asio/yield.hpp>
139 template <
typename T>
141 std::enable_if_t< ! std::is_convertible<std::decay_t<T>, publish_options>::value,
bool>
150 template<
typename ... Params>
152 #if __cplusplus >= 201703L
157 for(
const bool val : results)
168 namespace mi = boost::multi_index;
170 template <
typename Mutex = std::mutex,
template<
typename...>
class LockGuard = std::lock_guard, std::size_t PacketIdBytes = 2>
171 class endpoint :
public std::enable_shared_from_this<endpoint<Mutex, LockGuard, PacketIdBytes>> {
173 using this_type_sp = std::shared_ptr<this_type>;
183 :async_send_store_{async_send_store},
190 <<
" version:" << version
191 <<
" async_send_store:" << std::boolalpha << async_send_store;
201 async_send_store_{async_send_store},
208 <<
" version:" << version
209 <<
" async_send_store:" << std::boolalpha << async_send_store;
221 virtual bool on_pingreq() noexcept = 0;
229 virtual
bool on_pingresp() noexcept = 0;
271 virtual
bool on_connect(
buffer client_id,
272 optional<
buffer> user_name,
273 optional<
buffer> password,
276 std::uint16_t keep_alive) noexcept = 0;
290 virtual
bool on_connack(
bool session_present,
connect_return_code return_code) noexcept = 0;
309 virtual
bool on_publish(optional<
packet_id_t> packet_id,
312 buffer contents) noexcept = 0;
322 virtual
bool on_puback(
packet_id_t packet_id) noexcept = 0;
332 virtual
bool on_pubrec(
packet_id_t packet_id) noexcept = 0;
342 virtual
bool on_pubrel(
packet_id_t packet_id) noexcept = 0;
352 virtual
bool on_pubcomp(
packet_id_t packet_id) noexcept = 0;
399 virtual
bool on_unsuback(
packet_id_t) noexcept = 0;
406 virtual
void on_disconnect() noexcept = 0;
453 virtual
bool on_v5_connect(
buffer client_id,
454 optional<
buffer> user_name,
455 optional<
buffer> password,
458 std::uint16_t keep_alive,
477 virtual
bool on_v5_connack(
bool session_present,
506 virtual
bool on_v5_publish(optional<
packet_id_t> packet_id,
606 virtual
bool on_v5_subscribe(
packet_id_t packet_id,
644 virtual
bool on_v5_unsubscribe(
packet_id_t packet_id,
729 virtual
void on_pub_res_sent(
packet_id_t packet_id) noexcept = 0;
771 virtual
void on_serialize_remove(
packet_id_t packet_id) noexcept = 0;
797 if (async_read_on_message_processed_) {
837 return total_bytes_received_;
845 return total_bytes_sent_;
856 auto_pub_response_ = b;
857 auto_pub_response_async_ = async;
869 auto_map_topic_alias_send_ = b;
881 auto_replace_topic_alias_send_ = b;
885 packet_bulk_read_limit_ =
size;
889 props_bulk_read_limit_ =
size;
898 LockGuard<Mutex> lck (topic_alias_recv_mtx_);
899 topic_alias_recv_.emplace(max);
939 template <
typename T,
typename... Params>
944 publish(packet_id, std::forward<T>(t), std::forward<Params>(params)...);
948 publish(0, std::forward<T>(t), std::forward<Params>(params)...);
970 template <
typename T,
typename... Params>
974 subscribe(packet_id, std::forward<T>(t), std::forward<Params>(params)...);
991 template <
typename T,
typename... Params>
995 unsubscribe(packet_id, std::forward<T>(t), std::forward<Params>(params)...);
1021 <<
" reason:" << reason;
1023 if (connected_ && mqtt_connected_) {
1024 disconnect_requested_ =
true;
1037 <<
"force_disconnect";
1042 LockGuard<Mutex> lck (topic_alias_send_mtx_);
1043 if (topic_alias_send_) topic_alias_send_.value().clear();
1046 LockGuard<Mutex> lck (topic_alias_recv_mtx_);
1047 if (topic_alias_recv_) topic_alias_recv_.value().clear();
1078 std::string topic_name,
1079 std::string contents,
1082 any life_keeper = {}
1087 <<
" pid:" << packet_id
1088 <<
" topic:" << topic_name
1089 <<
" qos:" << pubopts.get_qos()
1090 <<
" retain:" << pubopts.get_retain()
1091 <<
" dup:" << pubopts.get_dup();
1095 send_publish(packet_id,
1103 auto sp_topic_name = std::make_shared<std::string>(
force_move(topic_name));
1104 auto sp_contents = std::make_shared<std::string>(
force_move(contents));
1106 auto contents_buf =
as::buffer(*sp_contents);
1148 template <
typename ConstBufferSequence>
1149 typename std::enable_if<
1150 as::is_const_buffer_sequence<ConstBufferSequence>::value
1154 as::const_buffer topic_name,
1155 ConstBufferSequence contents,
1163 <<
" pid:" << packet_id
1165 <<
" qos:" << pubopts.
get_qos()
1167 <<
" dup:" << pubopts.
get_dup();
1202 template <
typename ConstBufferSequence>
1203 typename std::enable_if<
1204 as::is_const_buffer_sequence<ConstBufferSequence>::value
1208 as::const_buffer topic_name,
1209 ConstBufferSequence contents,
1216 <<
" pid:" << packet_id
1218 <<
" qos:" << pubopts.
get_qos()
1220 <<
" dup:" << pubopts.
get_dup();
1257 template <
typename BufferSequence>
1258 typename std::enable_if<
1264 BufferSequence contents,
1266 any life_keeper = {}
1271 <<
" pid:" << packet_id
1272 <<
" topic:" << topic_name
1273 <<
" qos:" << pubopts.get_qos()
1274 <<
" retain:" << pubopts.get_retain()
1275 <<
" dup:" << pubopts.get_dup();
1279 auto topic_name_buf =
as::buffer(topic_name);
1281 std::vector<as::const_buffer> cbs;
1285 cbs.reserve(
static_cast<std::size_t
>(std::distance(b, e)));
1286 for (; b != e; ++b) {
1330 template <
typename BufferSequence>
1331 typename std::enable_if<
1332 is_buffer_sequence<BufferSequence>::value
1337 BufferSequence contents,
1340 any life_keeper = {}
1345 <<
" pid:" << packet_id
1346 <<
" topic:" << topic_name
1347 <<
" qos:" << pubopts.
get_qos()
1349 <<
" dup:" << pubopts.
get_dup();
1353 auto topic_name_buf =
as::buffer(topic_name);
1355 std::vector<as::const_buffer> cbs;
1359 cbs.reserve(
static_cast<std::size_t
>(std::distance(b, e)));
1360 for (; b != e; ++b) {
1406 <<
" pid:" << packet_id
1407 <<
" topic:" << topic_filter
1408 <<
" qos:" << option.
get_qos()
1410 <<
" nl:" << option.
get_nl()
1411 <<
" rap:" << option.
get_rap();
1414 std::vector<std::tuple<as::const_buffer, subscribe_options>>{
1415 {
as::buffer(topic_filter.data(), topic_filter.size()), option }
1442 as::const_buffer topic_filter,
1449 <<
" pid:" << packet_id
1451 <<
" qos:" << option.
get_qos()
1453 <<
" nl:" << option.
get_nl()
1454 <<
" rap:" << option.
get_rap();
1457 std::vector<std::tuple<as::const_buffer, subscribe_options>>{ { topic_filter, option } },
1478 std::vector<std::tuple<string_view, subscribe_options>> params,
1484 <<
" pid:" << packet_id;
1486 std::vector<std::tuple<as::const_buffer, subscribe_options>> cb_params;
1487 cb_params.reserve(params.size());
1488 for (
auto const& e : params) {
1489 cb_params.emplace_back(
as::buffer(std::get<0>(e).data(), std::get<0>(e).
size()), std::get<1>(e));
1509 std::vector<std::tuple<buffer, subscribe_options>> params,
1515 <<
" pid:" << packet_id;
1517 std::vector<std::tuple<as::const_buffer, subscribe_options>> buffers;
1518 buffers.reserve(params.size());
1519 for (
auto const& tup : params) {
1520 buffers.emplace_back(
as::buffer(std::get<0>(tup)), std::get<1>(tup));
1547 <<
" pid:" << packet_id
1548 <<
" topic:" << topic_filter;
1551 std::vector<as::const_buffer> {
1552 as::buffer(topic_filter.data(), topic_filter.size())
1575 as::const_buffer topic_filter,
1581 <<
" pid:" << packet_id
1584 send_unsubscribe(std::vector<as::const_buffer>{ topic_filter }, packet_id,
force_move(props));
1602 std::vector<string_view> params,
1608 <<
" pid:" << packet_id;
1610 std::vector<as::const_buffer> cb_params;
1611 cb_params.reserve(params.size());
1613 for (
auto&& e : params) {
1614 cb_params.emplace_back(
as::buffer(e.data(), e.size()));
1634 std::vector<as::const_buffer> params,
1640 <<
" pid:" << packet_id;
1642 std::vector<buffer> cb_params;
1643 cb_params.reserve(params.size());
1645 for (
auto&& e : params) {
1648 send_unsubscribe(params, packet_id,
force_move(props));
1666 std::vector<buffer> params,
1672 <<
" pid:" << packet_id;
1674 std::vector<as::const_buffer> cb_params;
1675 cb_params.reserve(params.size());
1677 for (
auto&& e : params) {
1693 if (connected_ && mqtt_connected_) send_pingreq();
1728 <<
" reason:" << reason_code;
1764 std::string
const& client_id,
1765 optional<std::string>
const& user_name,
1766 optional<std::string>
const& password,
1768 std::uint16_t keep_alive_sec,
1774 <<
" client_id:" << client_id
1775 <<
" user_name:" << (user_name ? user_name.value() :
"none")
1776 <<
" keep_alive:" << std::dec << keep_alive_sec;
1778 connect_requested_ =
true;
1835 optional<buffer> user_name,
1836 optional<buffer> password,
1838 std::uint16_t keep_alive_sec,
1844 <<
" client_id:" << client_id
1846 <<
" keep_alive:" << std::dec << keep_alive_sec;
1848 connect_requested_ =
true;
1870 bool session_present,
1871 variant<connect_return_code, v5::connect_reason_code> reason_code,
1877 <<
" session_present:" << std::boolalpha << session_present
1878 <<
" reason:" << reason_code;
1880 send_connack(session_present, reason_code,
force_move(props));
1903 <<
" pid:" << packet_id
1904 <<
" reason:" << reason_code;
1906 send_puback(packet_id, reason_code,
force_move(props));
1929 <<
" pid:" << packet_id
1930 <<
" reason:" << reason_code;
1932 send_pubrec(packet_id, reason_code,
force_move(props));
1958 any life_keeper = {}
1963 <<
" pid:" << packet_id
1964 <<
" reason:" << reason_code;
1989 <<
" pid:" << packet_id
1990 <<
" reason:" << reason_code;
1992 send_pubcomp(packet_id, reason_code,
force_move(props));
2009 variant<suback_return_code, v5::suback_reason_code> reason,
2015 <<
" pid:" << packet_id
2016 <<
" reason:" < reason;
2019 send_suback(std::vector<suback_return_code>{ variant_get<suback_return_code>(reason) }, packet_id,
force_move(props));
2022 send_suback(std::vector<v5::suback_reason_code>{ variant_get<v5::suback_reason_code>(reason) }, packet_id,
force_move(props));
2040 variant<std::vector<suback_return_code>, std::vector<v5::suback_reason_code>> reasons,
2046 <<
" pid:" << packet_id;
2061 <<
" pid:" << packet_id;
2063 send_unsuback(packet_id);
2086 <<
" pid:" << packet_id
2087 <<
" reason:" << reason;
2089 send_unsuback(std::vector<v5::unsuback_reason_code>{ reason }, packet_id,
force_move(props));
2106 std::vector<v5::unsuback_reason_code> reasons,
2112 <<
" pid:" << packet_id;
2141 template <
typename T,
typename... Params>
2142 std::enable_if_t< ! std::is_convertible<std::decay_t<T>,
packet_id_t>::value >
2146 async_publish(packet_id, std::forward<T>(t), std::forward<Params>(params)...);
2149 async_publish(0, std::forward<T>(t), std::forward<Params>(params)...);
2166 <<
"async_disconnect";
2168 if (connected_ && mqtt_connected_) {
2169 disconnect_requested_ =
true;
2200 <<
"async_disconnect"
2201 <<
" reason:" << reason;
2203 if (connected_ && mqtt_connected_) {
2204 disconnect_requested_ =
true;
2225 template <
typename T,
typename... Params>
2226 std::enable_if_t< ! std::is_convertible<std::decay_t<T>,
packet_id_t>::value >
2229 async_subscribe(packet_id, std::forward<T>(t), std::forward<Params>(params)...);
2242 template <
typename T,
typename... Params>
2243 std::enable_if_t< ! std::is_convertible<std::decay_t<T>,
packet_id_t>::value >
2246 async_unsubscribe(packet_id, std::forward<T>(t), std::forward<Params>(params)...);
2266 std::string topic_name,
2267 std::string contents,
2274 <<
" pid:" << packet_id
2275 <<
" topic:" << topic_name
2276 <<
" qos:" << pubopts.
get_qos()
2278 <<
" dup:" << pubopts.
get_dup();
2282 auto sp_topic_name = std::make_shared<std::string>(
force_move(topic_name));
2283 auto sp_contents = std::make_shared<std::string>(
force_move(contents));
2284 auto topic_name_buf =
as::buffer(*sp_topic_name);
2285 auto contents_buf =
as::buffer(*sp_contents);
2327 std::string topic_name,
2328 std::string contents,
2331 any life_keeper = {},
2337 <<
" pid:" << packet_id
2338 <<
" topic:" << topic_name
2339 <<
" qos:" << pubopts.
get_qos()
2341 <<
" dup:" << pubopts.
get_dup();
2345 auto sp_topic_name = std::make_shared<std::string>(
force_move(topic_name));
2346 auto sp_contents = std::make_shared<std::string>(
force_move(contents));
2347 auto topic_name_buf =
as::buffer(*sp_topic_name);
2348 auto contents_buf =
as::buffer(*sp_contents);
2388 template <
typename ConstBufferSequence>
2389 typename std::enable_if<
2390 as::is_const_buffer_sequence<ConstBufferSequence>::value
2394 as::const_buffer topic_name,
2395 ConstBufferSequence contents,
2397 any life_keeper = {},
2403 <<
" pid:" << packet_id
2405 <<
" qos:" << pubopts.
get_qos()
2407 <<
" dup:" << pubopts.
get_dup();
2445 template <
typename ConstBufferSequence>
2446 typename std::enable_if<
2447 as::is_const_buffer_sequence<ConstBufferSequence>::value
2451 as::const_buffer topic_name,
2452 ConstBufferSequence contents,
2455 any life_keeper = {},
2461 <<
" pid:" << packet_id
2463 <<
" qos:" << pubopts.
get_qos()
2465 <<
" dup:" << pubopts.
get_dup();
2499 template <
typename BufferSequence>
2500 typename std::enable_if<
2501 is_buffer_sequence<BufferSequence>::value
2506 BufferSequence contents,
2508 any life_keeper = {},
2514 <<
" pid:" << packet_id
2515 <<
" topic:" << topic_name
2516 <<
" qos:" << pubopts.
get_qos()
2518 <<
" dup:" << pubopts.
get_dup();
2522 auto topic_name_buf =
as::buffer(topic_name);
2524 std::vector<as::const_buffer> cbs;
2528 cbs.reserve(
static_cast<std::size_t
>(std::distance(b, e)));
2529 for (; b != e; ++b) {
2576 template <
typename BufferSequence>
2577 typename std::enable_if<
2578 is_buffer_sequence<BufferSequence>::value
2583 BufferSequence contents,
2586 any life_keeper = {},
2592 <<
" pid:" << packet_id
2593 <<
" topic:" << topic_name
2594 <<
" qos:" << pubopts.
get_qos()
2596 <<
" dup:" << pubopts.
get_dup();
2600 auto topic_name_buf =
as::buffer(topic_name);
2602 std::vector<as::const_buffer> cbs;
2606 cbs.reserve(
static_cast<std::size_t
>(std::distance(b, e)));
2607 for (; b != e; ++b) {
2648 std::string topic_filter,
2654 <<
"async_subscribe"
2655 <<
" pid:" << packet_id
2656 <<
" topic:" << topic_filter
2657 <<
" qos:" << option.
get_qos()
2659 <<
" nl:" << option.
get_nl()
2660 <<
" rap:" << option.
get_rap();
2662 auto sp_topic_filter = std::make_shared<std::string>(
force_move(topic_filter));
2663 auto topic_filter_buf =
as::buffer(*sp_topic_filter);
2665 async_send_subscribe(
2666 std::vector<std::tuple<as::const_buffer, subscribe_options>>{ { topic_filter_buf, option } },
2698 std::string topic_filter,
2705 <<
"async_subscribe"
2706 <<
" pid:" << packet_id
2707 <<
" topic:" << topic_filter
2708 <<
" qos:" << option.
get_qos()
2710 <<
" nl:" << option.
get_nl()
2711 <<
" rap:" << option.
get_rap();
2713 auto sp_topic_filter = std::make_shared<std::string>(
force_move(topic_filter));
2714 auto topic_filter_buf =
as::buffer(*sp_topic_filter);
2716 async_send_subscribe(
2717 std::vector<std::tuple<as::const_buffer, subscribe_options>>{ { topic_filter_buf, option } },
2746 as::const_buffer topic_filter,
2752 <<
"async_subscribe"
2753 <<
" pid:" << packet_id
2755 <<
" qos:" << option.
get_qos()
2757 <<
" nl:" << option.
get_nl()
2758 <<
" rap:" << option.
get_rap();
2760 async_send_subscribe(
2761 std::vector<std::tuple<as::const_buffer, subscribe_options>>{ { topic_filter, option } },
2791 as::const_buffer topic_filter,
2798 <<
"async_subscribe"
2799 <<
" pid:" << packet_id
2801 <<
" qos:" << option.
get_qos()
2803 <<
" nl:" << option.
get_nl()
2804 <<
" rap:" << option.
get_rap();
2806 async_send_subscribe(
2807 std::vector<std::tuple<as::const_buffer, subscribe_options>>{ { topic_filter, option } },
2838 <<
"async_subscribe"
2839 <<
" pid:" << packet_id
2840 <<
" topic:" << topic_filter
2841 <<
" qos:" << option.
get_qos()
2843 <<
" nl:" << option.
get_nl()
2844 <<
" rap:" << option.
get_rap();
2846 auto topic_filter_buf =
as::buffer(topic_filter);
2847 async_send_subscribe(
2848 std::vector<std::tuple<as::const_buffer, subscribe_options>>{ { topic_filter_buf, option } },
2887 <<
"async_subscribe"
2888 <<
" pid:" << packet_id
2889 <<
" topic:" << topic_filter
2890 <<
" qos:" << option.
get_qos()
2892 <<
" nl:" << option.
get_nl()
2893 <<
" rap:" << option.
get_rap();
2895 auto topic_filter_buf =
as::buffer(topic_filter);
2896 async_send_subscribe(
2897 std::vector<std::tuple<as::const_buffer, subscribe_options>>{ { topic_filter_buf, option } },
2923 std::vector<std::tuple<std::string, subscribe_options>> params,
2928 <<
"async_subscribe"
2929 <<
" pid:" << packet_id;
2931 std::vector<std::tuple<as::const_buffer, subscribe_options>> cb_params;
2932 cb_params.reserve(params.size());
2934 std::vector<std::shared_ptr<std::string>> life_keepers;
2935 life_keepers.reserve(params.size());
2937 for (
auto&& e : params) {
2938 auto sp_topic_filter = std::make_shared<std::string>(
force_move(std::get<0>(e)));
2939 cb_params.emplace_back(
as::buffer(*sp_topic_filter), std::get<1>(e));
2940 life_keepers.emplace_back(
force_move(sp_topic_filter));
2943 async_send_subscribe(
2974 std::vector<std::tuple<std::string, subscribe_options>> params,
2980 <<
"async_subscribe"
2981 <<
" pid:" << packet_id;
2983 std::vector<std::tuple<as::const_buffer, subscribe_options>> cb_params;
2984 cb_params.reserve(params.size());
2986 std::vector<std::shared_ptr<std::string>> life_keepers;
2987 life_keepers.reserve(params.size());
2989 for (
auto&& e : params) {
2990 auto sp_topic_filter = std::make_shared<std::string>(
force_move(std::get<0>(e)));
2991 cb_params.emplace_back(
as::buffer(*sp_topic_filter), std::get<1>(e));
2992 life_keepers.emplace_back(
force_move(sp_topic_filter));
2994 async_send_subscribe(
3019 std::vector<std::tuple<as::const_buffer, subscribe_options>> params,
3024 <<
"async_subscribe"
3025 <<
" pid:" << packet_id;
3027 async_send_subscribe(
3052 std::vector<std::tuple<as::const_buffer, subscribe_options>> params,
3058 <<
"async_subscribe"
3059 <<
" pid:" << packet_id;
3061 async_send_subscribe(
3082 std::vector<std::tuple<buffer, subscribe_options>> params,
3087 <<
"async_subscribe"
3088 <<
" pid:" << packet_id;
3090 std::vector<std::tuple<as::const_buffer, subscribe_options>> cb_params;
3091 cb_params.reserve(params.size());
3093 for (
auto&& e : params) {
3094 cb_params.emplace_back(
3100 async_send_subscribe(
3127 std::vector<std::tuple<buffer, subscribe_options>> params,
3133 <<
"async_subscribe"
3134 <<
" pid:" << packet_id;
3136 std::vector<std::tuple<as::const_buffer, subscribe_options>> cb_params;
3137 cb_params.reserve(params.size());
3139 for (
auto&& e : params) {
3140 cb_params.emplace_back(
3146 async_send_subscribe(
3170 std::string topic_filter,
3175 <<
"async_unsubscribe"
3176 <<
" pid:" << packet_id
3177 <<
" topic:" << topic_filter;
3179 auto sp_topic_filter = std::make_shared<std::string>(
force_move(topic_filter));
3180 auto topic_filter_buf =
as::buffer(*sp_topic_filter);
3181 async_send_unsubscribe(
3182 std::vector<as::const_buffer>{ topic_filter_buf },
3206 as::const_buffer topic_filter,
3211 <<
"async_unsubscribe"
3212 <<
" pid:" << packet_id
3215 async_send_unsubscribe(std::vector<as::const_buffer>{ topic_filter }, packet_id,
v5::properties{},
force_move(func));
3236 <<
"async_unsubscribe"
3237 <<
" pid:" << packet_id
3238 <<
" topic:" << topic_filter;
3240 auto topic_filter_buf =
as::buffer(topic_filter);
3241 async_send_unsubscribe(std::vector<as::const_buffer>{ topic_filter_buf },
3273 <<
"async_unsubscribe"
3274 <<
" pid:" << packet_id
3275 <<
" topic:" << topic_filter;
3277 auto topic_filter_buf =
as::buffer(topic_filter);
3278 async_send_unsubscribe(std::vector<as::const_buffer>{ topic_filter_buf },
3301 std::vector<std::string> params,
3306 <<
"async_unsubscribe"
3307 <<
" pid:" << packet_id;
3309 std::vector<as::const_buffer> cb_params;
3310 cb_params.reserve(params.size());
3312 std::vector<std::shared_ptr<std::string>> life_keepers;
3313 life_keepers.reserve(params.size());
3315 for (
auto&& e : params) {
3316 life_keepers.emplace_back(std::make_shared<std::string>(
force_move(e)));
3317 cb_params.emplace_back(
as::buffer(*life_keepers.back()));
3320 async_send_unsubscribe(
3349 std::vector<std::string> params,
3355 <<
"async_unsubscribe"
3356 <<
" pid:" << packet_id;
3358 std::vector<as::const_buffer> cb_params;
3359 cb_params.reserve(params.size());
3366 std::vector<std::shared_ptr<std::string>> life_keepers;
3367 life_keepers.reserve(params.size());
3369 for (
auto&& e : params) {
3370 life_keepers.emplace_back(std::make_shared<std::string>(
force_move(e)));
3371 cb_params.emplace_back(
as::buffer(*life_keepers.back()));
3374 async_send_unsubscribe(
3400 std::vector<as::const_buffer> params,
3405 <<
"async_unsubscribe"
3406 <<
" pid:" << packet_id;
3408 async_send_unsubscribe(
3435 std::vector<as::const_buffer> params,
3441 <<
"async_unsubscribe"
3442 <<
" pid:" << packet_id;
3444 async_send_unsubscribe(
3466 std::vector<buffer> params,
3471 <<
"async_unsubscribe"
3472 <<
" pid:" << packet_id;
3474 std::vector<as::const_buffer> cb_params;
3475 cb_params.reserve(params.size());
3476 for (
auto const& buf : params) {
3480 async_send_unsubscribe(
3509 std::vector<buffer> params,
3515 <<
"async_unsubscribe"
3516 <<
" pid:" << packet_id;
3518 std::vector<as::const_buffer> cb_params;
3519 cb_params.reserve(params.size());
3520 for (
auto const& buf : params) {
3524 async_send_unsubscribe(
3546 if (connected_ && mqtt_connected_) async_send_pingreq(
force_move(func));
3585 <<
" reason:" << reason_code;
3615 optional<buffer> user_name,
3616 optional<buffer> password,
3618 std::uint16_t keep_alive_sec,
3660 optional<buffer> user_name,
3661 optional<buffer> password,
3663 std::uint16_t keep_alive_sec,
3670 <<
" client_id:" << client_id
3672 <<
" keep_alive:" << std::dec << keep_alive_sec;
3674 connect_requested_ =
true;
3694 bool session_present,
3695 variant<connect_return_code, v5::connect_reason_code> reason_code,
3701 <<
" session_present:" << std::boolalpha << session_present
3702 <<
" reason:" << reason_code;
3720 bool session_present,
3721 variant<connect_return_code, v5::connect_reason_code> reason_code,
3728 <<
" session_present:" << std::boolalpha << session_present
3729 <<
" reason:" << reason_code;
3748 <<
" pid:" << packet_id;
3777 <<
" pid:" << packet_id
3778 <<
" reason:" << reason_code;
3797 <<
" pid:" << packet_id;
3826 <<
" pid:" << packet_id
3827 <<
" reason:" << reason_code;
3846 <<
" pid:" << packet_id;
3878 any life_keeper = {},
3884 <<
" pid:" << packet_id
3885 <<
" reason:" << reason_code;
3904 <<
" pid:" << packet_id;
3933 <<
" pid:" << packet_id
3934 <<
" reason:" << reason_code;
3952 variant<suback_return_code, v5::suback_reason_code> reason,
3958 <<
" pid:" << packet_id
3959 <<
" reason:" < reason;
3962 async_send_suback(std::vector<suback_return_code>{ variant_get<suback_return_code>(reason) }, packet_id,
v5::properties{},
force_move(func));
3965 async_send_suback(std::vector<v5::suback_reason_code>{ variant_get<v5::suback_reason_code>(reason) }, packet_id,
v5::properties{},
force_move(func));
3986 variant<suback_return_code, v5::suback_reason_code> reason,
3993 <<
" pid:" << packet_id
3994 <<
" reason:" < reason;
3997 async_send_suback(std::vector<suback_return_code>{ variant_get<suback_return_code>(reason) }, packet_id,
force_move(props),
force_move(func));
4000 async_send_suback(std::vector<v5::suback_reason_code>{ variant_get<v5::suback_reason_code>(reason) }, packet_id,
force_move(props),
force_move(func));
4017 variant<std::vector<suback_return_code>, std::vector<v5::suback_reason_code>> reasons,
4023 <<
" pid:" << packet_id;
4045 variant<std::vector<suback_return_code>, std::vector<v5::suback_reason_code>> reasons,
4052 <<
" pid:" << packet_id;
4076 <<
" pid:" << packet_id
4077 <<
" reason:" < reason;
4079 async_send_unsuback(std::vector<v5::unsuback_reason_code>{ reason }, packet_id,
force_move(func));
4106 <<
" pid:" << packet_id
4107 <<
" reason:" < reason;
4109 async_send_unsuback(std::vector<v5::unsuback_reason_code>{ reason }, packet_id,
force_move(props),
force_move(func));
4125 std::vector<v5::unsuback_reason_code> reasons,
4131 <<
" pid:" << packet_id;
4153 std::vector<v5::unsuback_reason_code> reasons,
4160 <<
" pid:" << packet_id;
4180 <<
" pid:" << packet_id;
4182 async_send_unsuback(packet_id,
force_move(func));
4190 LockGuard<Mutex> lck (store_mtx_);
4191 auto& idx = store_.template get<tag_packet_id>();
4192 auto r = idx.equal_range(packet_id);
4193 idx.erase(std::get<0>(r), std::get<1>(r));
4204 <<
"for_each_store(ptr, size)";
4205 LockGuard<Mutex> lck (store_mtx_);
4206 auto const& idx = store_.template get<tag_seq>();
4207 for (
auto const & e : idx) {
4208 auto const& m = e.message();
4210 f(cb.data(), cb.size());
4221 <<
"for_each_store(store_message_variant)";
4222 LockGuard<Mutex> lck (store_mtx_);
4223 auto const& idx = store_.template get<tag_seq>();
4224 for (
auto const & e : idx) {
4237 <<
"for_each_store(store_message_variant, life_keeper)";
4238 LockGuard<Mutex> lck (store_mtx_);
4239 auto const& idx = store_.template get<tag_seq>();
4240 for (
auto const & e : idx) {
4241 f(e.message(), e.life_keeper());
4269 LockGuard<Mutex> lck (store_mtx_);
4281 LockGuard<Mutex> lck (store_mtx_);
4292 LockGuard<Mutex> lck (store_mtx_);
4302 template <
typename Iterator>
4303 std::enable_if_t< std::is_convertible<typename Iterator::value_type, char>::value >
4307 typename std::iterator_traits<Iterator>::iterator_category,
4308 std::random_access_iterator_tag
4310 "Iterators provided to restore_serialized_message() must be random access iterators."
4315 <<
"restore_serialized_message(b, e)";
4319 auto fixed_header =
static_cast<std::uint8_t
>(*b);
4324 <<
"invalid fixed_header ignored. "
4325 << std::hex << static_cast<int>(fixed_header);
4328 switch (cpt_opt.value()) {
4352 <<
"invalid control packet type. "
4353 << std::hex << static_cast<int>(fixed_header);
4369 LockGuard<Mutex> lck (store_mtx_);
4371 auto ret = store_.emplace(
4387 ((qos_value == qos::at_least_once) ? control_packet_type::puback
4388 : control_packet_type::pubrec),
4390 force_move(life_keeper)
4405 LockGuard<Mutex> lck (store_mtx_);
4407 auto ret = store_.emplace(
4422 control_packet_type::pubcomp,
4424 force_move(life_keeper)
4439 template <
typename Iterator>
4440 std::enable_if_t< std::is_convertible<typename Iterator::value_type, char>::value >
4444 auto fixed_header =
static_cast<std::uint8_t
>(*b);
4449 <<
"invalid fixed_header ignored. "
4450 << std::hex << static_cast<int>(fixed_header);
4453 switch (cpt_opt.value()) {
4471 <<
"invalid control packet type. "
4472 << std::hex << static_cast<int>(fixed_header);
4486 BOOST_ASSERT(!msg.
topic().empty());
4489 LockGuard<Mutex> lck (store_mtx_);
4491 auto ret = store_.emplace(
4507 qos == qos::at_least_once ? control_packet_type::puback
4508 : control_packet_type::pubrec,
4510 force_move(life_keeper)
4527 LockGuard<Mutex> lck (store_mtx_);
4529 auto ret = store_.emplace(
4544 control_packet_type::pubcomp,
4554 struct restore_basic_message_variant_visitor {
4555 restore_basic_message_variant_visitor(this_type& ep, any life_keeper):ep_(ep), life_keeper_(
force_move(life_keeper)) {}
4557 void operator()(basic_publish_message<PacketIdBytes>&& msg) {
4560 void operator()(basic_pubrel_message<PacketIdBytes>&& msg) {
4563 void operator()(v5::basic_publish_message<PacketIdBytes>&& msg) {
4566 void operator()(v5::basic_pubrel_message<PacketIdBytes>&& msg) {
4569 template <
typename T>
4570 void operator()(T&&)
const {
4571 throw restore_type_error();
4586 [&](
auto msg,
auto const& serialize) {
4587 preprocess_publish_message(
4597 [&](
auto msg,
auto const& serialize) {
4598 auto packet_id = msg.packet_id();
4600 LockGuard<Mutex> lck (store_mtx_);
4602 auto ret = store_.emplace(
4609 BOOST_ASSERT(ret.second);
4610 (this->*serialize)(msg);
4619 <<
"send_store_message publish v3.1.1";
4620 publish_proc(
force_move(m), &endpoint::on_serialize_publish_message);
4625 <<
"send_store_message pubrel v3.1.1";
4626 pubrel_proc(
force_move(m), &endpoint::on_serialize_pubrel_message);
4631 <<
"send_store_message publish v5";
4632 publish_proc(
force_move(m), &endpoint::on_serialize_v5_publish_message);
4637 <<
"send_store_message pubrel v5";
4638 pubrel_proc(
force_move(m), &endpoint::on_serialize_v5_pubrel_message);
4647 [&](
auto msg,
auto const& serialize) {
4648 preprocess_publish_message(
4658 [&](
auto msg,
auto const& serialize) {
4659 auto packet_id = msg.packet_id();
4661 LockGuard<Mutex> lck (store_mtx_);
4663 auto ret = store_.emplace(
4670 BOOST_ASSERT(ret.second);
4671 (this->*serialize)(msg);
4680 <<
"async_send_store_message publish v3.1.1";
4681 publish_proc(
force_move(m), &endpoint::on_serialize_publish_message);
4686 <<
"async_send_store_message pubrel v3.1.1";
4687 pubrel_proc(
force_move(m), &endpoint::on_serialize_pubrel_message);
4692 <<
"async_send_store_message publish v5";
4693 publish_proc(
force_move(m), &endpoint::on_serialize_v5_publish_message);
4698 <<
"async_send_store_message pubrel v5";
4699 pubrel_proc(
force_move(m), &endpoint::on_serialize_v5_pubrel_message);
4711 return connected_ && mqtt_connected_;
4744 max_queue_send_count_ = count;
4760 max_queue_send_size_ =
size;
4776 return socket_->get_executor();
4804 socket_->async_read(
4806 [
this,
self = this->shared_from_this(), session_life_keeper =
force_move(session_life_keeper)](
4808 std::size_t bytes_transferred)
mutable {
4809 this->total_bytes_received_ += bytes_transferred;
4810 if (!check_error_and_transferred_length(ec, bytes_transferred, 1))
return;
4818 if (!ec)
return false;
4820 mqtt_connected_ =
false;
4823 socket_->close(ignored_ec);
4827 LockGuard<Mutex> lck (topic_alias_send_mtx_);
4828 if (topic_alias_send_) topic_alias_send_.value().clear();
4831 LockGuard<Mutex> lck (topic_alias_recv_mtx_);
4832 if (topic_alias_recv_) topic_alias_recv_.value().clear();
4834 if (disconnect_requested_) {
4835 disconnect_requested_ =
false;
4836 connect_requested_ =
false;
4837 clean_sub_unsub_inflight();
4841 disconnect_requested_ =
false;
4842 connect_requested_ =
false;
4843 if (!ec) ec = boost::system::errc::make_error_code(boost::system::errc::not_connected);
4844 clean_sub_unsub_inflight_on_error(ec);
4857 LockGuard<Mutex> lck (store_mtx_);
4863 bool check_transferred_length(
4864 std::size_t bytes_transferred,
4865 std::size_t bytes_expected) {
4866 if (bytes_transferred != bytes_expected) {
4867 call_bad_message_error_handlers();
4873 bool check_error_and_transferred_length(
4875 std::size_t bytes_transferred,
4876 std::size_t bytes_expected) {
4878 if (!check_transferred_length(bytes_transferred, bytes_expected))
return false;
4882 void call_bad_message_error_handlers() {
4883 clean_sub_unsub_inflight_on_error(boost::system::errc::make_error_code(boost::system::errc::bad_message));
4886 void call_protocol_error_handlers() {
4887 clean_sub_unsub_inflight_on_error(boost::system::errc::make_error_code(boost::system::errc::protocol_error));
4890 template <
typename T>
4891 void shutdown(T&
socket) {
4893 mqtt_connected_ =
false;
4901 send_buffer():buf_(std::make_shared<std::string>(static_cast<int>(payload_position_), 0)) {}
4903 std::shared_ptr<std::string>
const& buf()
const {
4907 std::shared_ptr<std::string>& buf() {
4911 std::pair<char*, std::size_t> finalize(std::uint8_t fixed_header) {
4913 std::size_t start_position = payload_position_ - rb.size() - 1;
4914 (*buf_)[start_position] = fixed_header;
4915 buf_->replace(start_position + 1, rb.size(), rb);
4916 return std::make_pair(
4917 &(*buf_)[start_position],
4918 buf_->size() - start_position);
4921 static constexpr std::size_t payload_position_ = 5;
4922 std::shared_ptr<std::string> buf_;
4929 basic_store_message_variant<PacketIdBytes> smv,
4930 any life_keeper = any())
4932 , expected_control_packet_type_(type)
4935 packet_id_t packet_id()
const {
return packet_id_; }
4936 control_packet_type expected_control_packet_type()
const {
return expected_control_packet_type_; }
4937 basic_store_message_variant<PacketIdBytes>
const& message()
const {
4940 basic_store_message_variant<PacketIdBytes>& message() {
4943 any
const& life_keeper()
const {
4944 return life_keeper_;
4950 basic_store_message_variant<PacketIdBytes> smv_;
4954 struct tag_packet_id {};
4955 struct tag_packet_id_type {};
4957 using mi_store = mi::multi_index_container<
4961 mi::tag<tag_packet_id_type>,
4970 &store::expected_control_packet_type
4974 mi::ordered_non_unique<
4975 mi::tag<tag_packet_id>,
4987 void handle_control_packet_type(any session_life_keeper, this_type_sp
self) {
4988 fixed_header_ =
static_cast<std::uint8_t
>(buf_.front());
4989 remaining_length_ = 0;
4990 remaining_length_multiplier_ = 1;
4991 socket_->async_read(
4995 std::size_t bytes_transferred)
mutable {
4996 this->total_bytes_received_ += bytes_transferred;
4997 if (!check_error_and_transferred_length(ec, bytes_transferred, 1))
return;
5003 bool calc_variable_length(std::size_t& v, std::size_t& multiplier,
char buf) {
5004 v += (buf & 0b01111111) * multiplier;
5006 return multiplier <= 128 * 128 * 128 * 128;
5009 void handle_remaining_length(any session_life_keeper, this_type_sp
self) {
5010 if (!calc_variable_length(remaining_length_, remaining_length_multiplier_, buf_.front())) {
5011 call_bad_message_error_handlers();
5014 if (buf_.front() & variable_length_continue_flag) {
5015 socket_->async_read(
5019 std::size_t bytes_transferred)
mutable {
5020 this->total_bytes_received_ += bytes_transferred;
5024 if (bytes_transferred != 1) {
5025 call_bad_message_error_handlers();
5035 call_bad_message_error_handlers();
5038 auto cpt = cpt_opt.value();
5049 return check_is_valid_length(cpt, remaining_length_);
5051 return remaining_length_ == 2;
5061 return remaining_length_ == 0;
5085 return check_is_valid_length(cpt, remaining_length_);
5088 return remaining_length_ == 0;
5094 call_protocol_error_handlers();
5102 void process_payload(any session_life_keeper, this_type_sp
self) {
5106 (*std::make_shared<process_connect>(*
this, remaining_length_ < packet_bulk_read_limit_))
5110 (*std::make_shared<process_connack>(*
this, remaining_length_ < packet_bulk_read_limit_))
5114 if (mqtt_connected_) {
5115 (*std::make_shared<process_publish>(*
this, remaining_length_ < packet_bulk_read_limit_))
5119 call_protocol_error_handlers();
5123 if (mqtt_connected_) {
5124 (*std::make_shared<process_puback>(*
this, remaining_length_ < packet_bulk_read_limit_))
5128 call_protocol_error_handlers();
5132 if (mqtt_connected_) {
5133 (*std::make_shared<process_pubrec>(*
this, remaining_length_ < packet_bulk_read_limit_))
5137 call_protocol_error_handlers();
5141 if (mqtt_connected_) {
5142 (*std::make_shared<process_pubrel>(*
this, remaining_length_ < packet_bulk_read_limit_))
5146 call_protocol_error_handlers();
5150 if (mqtt_connected_) {
5151 (*std::make_shared<process_pubcomp>(*
this, remaining_length_ < packet_bulk_read_limit_))
5155 call_protocol_error_handlers();
5159 if (mqtt_connected_) {
5160 (*std::make_shared<process_subscribe>(*
this, remaining_length_ < packet_bulk_read_limit_))
5164 call_protocol_error_handlers();
5168 if (mqtt_connected_) {
5169 (*std::make_shared<process_suback>(*
this, remaining_length_ < packet_bulk_read_limit_))
5173 call_protocol_error_handlers();
5177 if (mqtt_connected_) {
5178 (*std::make_shared<process_unsubscribe>(*
this, remaining_length_ < packet_bulk_read_limit_))
5182 call_protocol_error_handlers();
5186 if (mqtt_connected_) {
5187 (*std::make_shared<process_unsuback>(*
this, remaining_length_ < packet_bulk_read_limit_))
5191 call_protocol_error_handlers();
5195 if (mqtt_connected_) {
5196 process_pingreq(
force_move(session_life_keeper));
5199 call_protocol_error_handlers();
5203 if (mqtt_connected_) {
5204 process_pingresp(
force_move(session_life_keeper));
5207 call_protocol_error_handlers();
5211 (*std::make_shared<process_disconnect>(*
this, remaining_length_ < packet_bulk_read_limit_))
5215 (*std::make_shared<process_auth>(*
this, remaining_length_ < packet_bulk_read_limit_))
5223 using parse_handler_variant =
5225 std::conditional_t<
sizeof(std::size_t) == 4, std::nullptr_t, std::size_t>,
5231 using parse_handler =
5234 this_type_sp&& spep,
5235 any&& session_life_keeper,
5236 parse_handler_variant,
5242 void process_nbytes(
5243 this_type_sp&&
self,
5244 any&& session_life_keeper,
5247 parse_handler&& handler
5249 if (remaining_length_ <
size) {
5250 call_protocol_error_handlers();
5253 remaining_length_ -=
size;
5257 auto ptr = spa.get();
5258 socket_->async_read(
5263 session_life_keeper =
force_move(session_life_keeper),
5268 std::size_t bytes_transferred)
mutable {
5269 this->total_bytes_received_ += bytes_transferred;
5270 if (!check_error_and_transferred_length(ec, bytes_transferred, buf.size()))
return;
5281 if (buf.size() <
size) {
5282 call_protocol_error_handlers();
5288 buf.substr(0,
size),
5294 template <std::
size_t Bytes>
5295 void process_fixed_length(
5296 this_type_sp&&
self,
5297 any&& session_life_keeper,
5299 parse_handler&& handler
5301 if (remaining_length_ < Bytes) {
5302 call_protocol_error_handlers();
5306 remaining_length_ -= Bytes;
5309 socket_->async_read(
5314 session_life_keeper =
force_move(session_life_keeper),
5318 std::size_t bytes_transferred)
mutable {
5319 this->total_bytes_received_ += bytes_transferred;
5320 if (!check_error_and_transferred_length(ec, bytes_transferred, Bytes))
return;
5324 make_two_or_four_byte<Bytes>::apply(
5326 std::next(buf_.data(), boost::numeric_cast<buffer::difference_type>(Bytes))
5335 make_two_or_four_byte<Bytes>::apply(
5337 std::next(buf.data(), boost::numeric_cast<buffer::difference_type>(Bytes))
5339 buf.remove_prefix(Bytes);
5351 void process_variable_length(
5352 this_type_sp&&
self,
5353 any&& session_life_keeper,
5355 parse_handler&& handler
5357 process_variable_length_impl(
5367 void process_variable_length_impl(
5368 this_type_sp&&
self,
5369 any&& session_life_keeper,
5371 parse_handler&& handler,
5373 std::size_t multiplier
5375 if (remaining_length_ == 0) {
5376 call_protocol_error_handlers();
5379 --remaining_length_;
5387 this_type_sp&&
self,
5388 any&& session_life_keeper,
5392 std::size_t multiplier
5394 if (!calc_variable_length(
size, multiplier, buf.front())) {
5395 call_protocol_error_handlers();
5398 if (buf.front() & variable_length_continue_flag) {
5399 BOOST_ASSERT(!buf.empty());
5400 buf.remove_prefix(1);
5401 process_variable_length_impl(
5405 std::forward<decltype(handler)>(handler),
5411 buf.remove_prefix(1);
5422 socket_->async_read(
5427 session_life_keeper =
force_move(session_life_keeper),
5434 std::size_t bytes_transferred)
mutable {
5435 this->total_bytes_received_ += bytes_transferred;
5436 if (!check_error_and_transferred_length(ec, bytes_transferred, 1))
return;
5460 void process_packet_id(
5461 this_type_sp&&
self,
5462 any&& session_life_keeper,
5464 parse_handler&& handler
5466 process_fixed_length<sizeof(packet_id_t)>(
5474 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5475 auto packet_id = variant_get<packet_id_t>(var);
5476 if (packet_id == 0) {
5477 call_protocol_error_handlers();
5490 void process_binary(
5491 this_type_sp&&
self,
5492 any&& session_life_keeper,
5494 parse_handler&& handler
5496 if (remaining_length_ < 2) {
5497 call_protocol_error_handlers();
5500 process_fixed_length<2>(
5508 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5509 auto size = variant_get<typename two_or_four_byte_type<2>::type>(var);
5510 if (remaining_length_ <
size) {
5511 call_protocol_error_handlers();
5525 void process_string(
5526 this_type_sp&&
self,
5527 any&& session_life_keeper,
5529 parse_handler&& handler
5536 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5537 auto& str = variant_get<buffer>(var);
5540 call_bad_message_error_handlers();
5554 void process_properties(
5555 this_type_sp&&
self,
5556 any&& session_life_keeper,
5558 parse_handler&& handler
5560 process_variable_length(
5568 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5569 auto property_length = variant_get<std::size_t>(var);
5570 if (property_length > remaining_length_) {
5571 call_protocol_error_handlers();
5574 if (property_length == 0) {
5585 struct spa_address_len {
5591 [&] () -> spa_address_len {
5592 if (property_length < props_bulk_read_limit_) {
5594 auto ptr = spa.get();
5609 socket_->async_read(
5615 session_life_keeper =
force_move(session_life_keeper),
5619 (
error_code ec, std::size_t bytes_transferred)
mutable {
5620 this->total_bytes_received_ += bytes_transferred;
5621 if (!check_error_and_transferred_length(ec, bytes_transferred, result.len))
return;
5622 process_property_id(
5634 process_property_id(
5647 void process_property_id(
5648 this_type_sp&&
self,
5649 any&& session_life_keeper,
5651 std::size_t property_length_rest,
5653 parse_handler&& handler
5656 if (property_length_rest == 0) {
5666 --remaining_length_;
5668 socket_->async_read(
5673 session_life_keeper =
force_move(session_life_keeper),
5676 property_length_rest
5679 std::size_t bytes_transferred)
mutable {
5680 this->total_bytes_received_ += bytes_transferred;
5681 if (!check_error_and_transferred_length(ec, bytes_transferred, 1))
return;
5682 process_property_body(
5687 property_length_rest - 1,
5696 buf.remove_prefix(1);
5697 process_property_body(
5702 property_length_rest - 1,
5709 void process_property_body(
5710 this_type_sp&&
self,
5711 any&& session_life_keeper,
5714 std::size_t property_length_rest,
5716 parse_handler&& handler
5719 static constexpr std::size_t length_bytes = 2;
5721 if (property_length_rest == 0) {
5722 call_protocol_error_handlers();
5728 static constexpr std::size_t len = 1;
5729 if (property_length_rest < len) {
5730 call_protocol_error_handlers();
5742 rest = property_length_rest - len
5744 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5745 auto& body = variant_get<buffer>(var);
5747 v5::property::payload_format_indicator(body.begin(), body.end())
5749 process_property_id(
5761 static constexpr std::size_t len = 4;
5762 if (property_length_rest < len) {
5763 call_protocol_error_handlers();
5775 rest = property_length_rest - len
5777 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5778 auto& body = variant_get<buffer>(var);
5780 v5::property::message_expiry_interval(body.begin(), body.end())
5782 process_property_id(
5802 property_length_rest
5804 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5805 auto& body = variant_get<buffer>(var);
5806 auto rest = property_length_rest - length_bytes - body.size();
5808 v5::property::content_type(
force_move(body),
true)
5810 process_property_id(
5830 property_length_rest
5832 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5833 auto& body = variant_get<buffer>(var);
5834 auto rest = property_length_rest - length_bytes - body.size();
5836 v5::property::response_topic(
force_move(body),
true)
5838 process_property_id(
5858 property_length_rest
5860 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5861 auto& body = variant_get<buffer>(var);
5862 auto rest = property_length_rest - length_bytes - body.size();
5864 v5::property::correlation_data(
force_move(body),
true)
5866 process_property_id(
5878 process_variable_length(
5886 property_length_rest,
5887 remaining_length_before = remaining_length_
5889 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5890 auto size = variant_get<std::size_t>(var);
5891 auto consumed = remaining_length_before - remaining_length_;
5892 auto rest = property_length_rest - consumed;
5894 v5::property::subscription_identifier(
size)
5896 process_property_id(
5908 static constexpr std::size_t len = 4;
5909 if (property_length_rest < len) {
5910 call_protocol_error_handlers();
5922 rest = property_length_rest - len
5924 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5925 auto& body = variant_get<buffer>(var);
5927 v5::property::session_expiry_interval(body.begin(), body.end())
5929 process_property_id(
5949 property_length_rest
5951 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5952 auto& body = variant_get<buffer>(var);
5953 auto rest = property_length_rest - length_bytes - body.size();
5955 v5::property::assigned_client_identifier(
force_move(body),
true)
5957 process_property_id(
5970 static constexpr std::size_t len = 2;
5971 if (property_length_rest < len) {
5972 call_protocol_error_handlers();
5984 rest = property_length_rest - len
5986 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
5987 auto& body = variant_get<buffer>(var);
5989 v5::property::server_keep_alive(body.begin(), body.end())
5991 process_property_id(
6011 property_length_rest
6013 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6014 auto& body = variant_get<buffer>(var);
6015 auto rest = property_length_rest - length_bytes - body.size();
6017 v5::property::authentication_method(
force_move(body),
true)
6019 process_property_id(
6039 property_length_rest
6041 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6042 auto& body = variant_get<buffer>(var);
6043 auto rest = property_length_rest - length_bytes - body.size();
6045 v5::property::authentication_data(
force_move(body))
6047 process_property_id(
6059 static constexpr std::size_t len = 1;
6060 if (property_length_rest < len) {
6061 call_protocol_error_handlers();
6073 rest = property_length_rest - len
6075 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6076 auto& body = variant_get<buffer>(var);
6078 v5::property::request_problem_information(body.begin(), body.end())
6080 process_property_id(
6092 static constexpr std::size_t len = 4;
6093 if (property_length_rest < len) {
6094 call_protocol_error_handlers();
6106 rest = property_length_rest - len
6108 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6109 auto& body = variant_get<buffer>(var);
6111 v5::property::will_delay_interval(body.begin(), body.end())
6113 process_property_id(
6125 static constexpr std::size_t len = 1;
6126 if (property_length_rest < len) {
6127 call_protocol_error_handlers();
6139 rest = property_length_rest - len
6141 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6142 auto& body = variant_get<buffer>(var);
6144 v5::property::request_response_information(body.begin(), body.end())
6146 process_property_id(
6166 property_length_rest
6168 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6169 auto& body = variant_get<buffer>(var);
6170 auto rest = property_length_rest - length_bytes - body.size();
6172 v5::property::response_information(
force_move(body),
true)
6174 process_property_id(
6194 property_length_rest
6196 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6197 auto& body = variant_get<buffer>(var);
6198 auto rest = property_length_rest - length_bytes - body.size();
6200 v5::property::server_reference(
force_move(body),
true)
6202 process_property_id(
6222 property_length_rest
6224 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6225 auto& body = variant_get<buffer>(var);
6226 auto rest = property_length_rest - length_bytes - body.size();
6228 v5::property::reason_string(
force_move(body),
true)
6230 process_property_id(
6242 static constexpr std::size_t len = 2;
6243 if (property_length_rest < len) {
6244 call_protocol_error_handlers();
6256 rest = property_length_rest - len
6258 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6259 auto& body = variant_get<buffer>(var);
6261 v5::property::receive_maximum(body.begin(), body.end())
6263 process_property_id(
6275 static constexpr std::size_t len = 2;
6276 if (property_length_rest < len) {
6277 call_protocol_error_handlers();
6289 rest = property_length_rest - len
6291 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6292 auto& body = variant_get<buffer>(var);
6294 v5::property::topic_alias_maximum(body.begin(), body.end())
6296 process_property_id(
6308 static constexpr std::size_t len = 2;
6309 if (property_length_rest < len) {
6310 call_protocol_error_handlers();
6322 rest = property_length_rest - len
6324 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6325 auto& body = variant_get<buffer>(var);
6327 v5::property::topic_alias(body.begin(), body.end())
6329 process_property_id(
6341 static constexpr std::size_t len = 1;
6342 if (property_length_rest < len) {
6343 call_protocol_error_handlers();
6355 rest = property_length_rest - len
6357 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6358 auto& body = variant_get<buffer>(var);
6360 v5::property::maximum_qos(body.begin(), body.end())
6362 process_property_id(
6374 static constexpr std::size_t len = 1;
6375 if (property_length_rest < len) {
6376 call_protocol_error_handlers();
6388 rest = property_length_rest - len
6390 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6391 auto& body = variant_get<buffer>(var);
6393 v5::property::retain_available(body.begin(), body.end())
6395 process_property_id(
6415 property_length_rest
6417 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6418 auto& key = variant_get<buffer>(var);
6419 auto rest = property_length_rest - length_bytes - key.size();
6429 property_length_rest = rest
6431 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6432 auto& val = variant_get<buffer>(var);
6433 auto rest = property_length_rest - length_bytes - val.size();
6435 v5::property::user_property(
6442 process_property_id(
6456 static constexpr std::size_t len = 4;
6457 if (property_length_rest < len) {
6458 call_protocol_error_handlers();
6470 rest = property_length_rest - len
6472 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6473 auto& body = variant_get<buffer>(var);
6475 v5::property::maximum_packet_size(body.begin(), body.end())
6477 process_property_id(
6489 static constexpr std::size_t len = 1;
6490 if (property_length_rest < len) {
6491 call_protocol_error_handlers();
6503 rest = property_length_rest - len
6505 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6506 auto& body = variant_get<buffer>(var);
6508 v5::property::wildcard_subscription_available(body.begin(), body.end())
6510 process_property_id(
6522 static constexpr std::size_t len = 1;
6523 if (property_length_rest < len) {
6524 call_protocol_error_handlers();
6536 rest = property_length_rest - len
6538 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6539 auto& body = variant_get<buffer>(var);
6541 v5::property::subscription_identifier_available(body.begin(), body.end())
6543 process_property_id(
6555 static constexpr std::size_t len = 1;
6556 if (property_length_rest < len) {
6557 call_protocol_error_handlers();
6569 rest = property_length_rest - len
6571 (this_type_sp&&
self, any&& session_life_keeper, parse_handler_variant var,
buffer buf)
mutable {
6572 auto& body = variant_get<buffer>(var);
6574 v5::property::shared_subscription_available(body.begin(), body.end())
6576 process_property_id(
6592 void process_header(
6593 this_type_sp&&
self,
6594 any&& session_life_keeper,
6596 std::size_t header_len,
6597 parse_handler&& handler
6602 auto ptr = spa.get();
6603 socket_->async_read(
6608 session_life_keeper =
force_move(session_life_keeper),
6612 (
error_code ec, std::size_t bytes_transferred)
mutable {
6613 this->total_bytes_received_ += bytes_transferred;
6614 if (!check_error_and_transferred_length(ec, bytes_transferred, remaining_length_))
return;
6626 if (header_len == 0) {
6636 socket_->async_read(
6641 session_life_keeper =
force_move(session_life_keeper),
6646 std::size_t bytes_transferred)
mutable {
6647 this->total_bytes_received_ += bytes_transferred;
6648 if (!check_error_and_transferred_length(ec, bytes_transferred, header_len))
return;
6661 struct process_connect : as::coroutine, std::enable_shared_from_this<process_connect> {
6662 using ep_t = this_type;
6663 using ep_t_sp = this_type_sp;
6665 using process_type_sp = std::shared_ptr<process_type>;
6671 all_read_{all_read} {
6675 any&& session_life_keeper,
6676 parse_handler_variant var = std::size_t(0),
6680 if (ep_.remaining_length_ < header_len_) {
6681 ep_.call_protocol_error_handlers();
6685 yield ep_.process_header(
6687 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
6692 (*this)(std::forward<decltype(args)>(args)...);
6697 auto& buf = variant_get<buffer>(var);
6699 static constexpr
char protocol_name[] = { 0x00, 0x04,
'M',
'Q',
'T',
'T' };
6700 if (std::memcmp(buf.data(), protocol_name,
sizeof(protocol_name)) != 0) {
6701 ep_.call_protocol_error_handlers();
6704 std::size_t i =
sizeof(protocol_name);
6707 ep_.call_protocol_error_handlers();
6712 ep_.version_ = version;
6714 else if (ep_.version_ != version) {
6715 ep_.call_protocol_error_handlers();
6719 connect_flag_ = buf[i++];
6724 buf.remove_prefix(header_len_);
6727 ep_.process_properties(
6733 (*this)(std::forward<decltype(args)>(args)...);
6741 props_ =
force_move(variant_get<v5::properties>(var));
6744 yield ep_.process_string(
6750 (*this)(std::forward<decltype(args)>(args)...);
6753 client_id_ =
force_move(variant_get<buffer>(var));
6757 yield ep_.process_properties(
6763 (*this)(std::forward<decltype(args)>(args)...);
6766 will_props_ =
force_move(variant_get<v5::properties>(var));
6769 yield ep_.process_string(
6775 (*this)(std::forward<decltype(args)>(args)...);
6778 will_topic_ =
force_move(variant_get<buffer>(var));
6780 yield ep_.process_binary(
6786 (*this)(std::forward<decltype(args)>(args)...);
6789 will_payload_ =
force_move(variant_get<buffer>(var));
6792 yield ep_.process_string(
6798 (*this)(std::forward<decltype(args)>(args)...);
6801 user_name_ =
force_move(variant_get<buffer>(var));
6804 yield ep_.process_binary(
6810 (*this)(std::forward<decltype(args)>(args)...);
6813 password_ =
force_move(variant_get<buffer>(var));
6815 ep_.mqtt_connected_ =
true;
6816 switch (ep_.version_) {
6823 ? optional<will>(in_place_init,
6827 : optional<will>(nullopt),
6828 ep_.clean_session(),
6832 ep_.on_mqtt_message_processed(
6836 std::tuple<any, process_type_sp>
6837 >(session_life_keeper)
6844 if (
auto ta_opt = get_topic_alias_maximum_from_props(props_)) {
6845 if (ta_opt.value() > 0) {
6846 LockGuard<Mutex> lck (ep_.topic_alias_send_mtx_);
6847 ep_.topic_alias_send_.emplace(ta_opt.value());
6850 if (ep_.on_v5_connect(
6855 ? optional<will>(in_place_init,
6860 : optional<will>(nullopt),
6866 ep_.on_mqtt_message_processed(
6870 std::tuple<any, process_type_sp>
6871 >(session_life_keeper)
6878 BOOST_ASSERT(
false);
6884 static constexpr std::size_t header_len_ =
6895 std::uint16_t keep_alive_;
6901 optional<buffer> user_name_;
6902 optional<buffer> password_;
6908 struct process_connack : as::coroutine, std::enable_shared_from_this<process_connack> {
6910 using ep_t_sp = this_type_sp;
6912 using process_type_sp = std::shared_ptr<process_type>;
6918 all_read_{all_read} {
6922 any&& session_life_keeper,
6923 parse_handler_variant var = std::size_t(0),
6927 if (ep_.remaining_length_ < header_len_) {
6928 ep_.call_protocol_error_handlers();
6932 yield ep_.process_header(
6934 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
6939 (*this)(std::forward<decltype(args)>(args)...);
6944 auto& buf = variant_get<buffer>(var);
6946 switch (ep_.version_) {
6954 BOOST_ASSERT(
false);
6956 buf.remove_prefix(header_len_);
6959 ep_.process_properties(
6965 (*this)(std::forward<decltype(args)>(args)...);
6973 props_ =
force_move(variant_get<v5::properties>(var));
6974 ep_.mqtt_connected_ =
true;
6979 (any&& session_life_keeper)
mutable {
6980 switch (ep_.version_) {
6984 variant_get<connect_return_code>(reason_code_)
6987 ep_.on_mqtt_message_processed(
6991 std::tuple<any, process_type_sp>
6992 >(session_life_keeper)
6999 if (
auto ta_opt = get_topic_alias_maximum_from_props(props_)) {
7000 if (ta_opt.value() > 0) {
7001 LockGuard<Mutex> lck (ep_.topic_alias_send_mtx_);
7002 ep_.topic_alias_send_.emplace(ta_opt.value());
7005 if (ep_.on_v5_connack(
7007 variant_get<v5::connect_reason_code>(reason_code_),
7011 ep_.on_mqtt_message_processed(
7015 std::tuple<any, process_type_sp>
7016 >(session_life_keeper)
7023 BOOST_ASSERT(
false);
7068 if (session_present_) {
7069 if (ep_.async_send_store_) {
7072 ep_.async_read_on_message_processed_ =
false;
7073 auto async_connack_proc =
7077 session_life_keeper =
force_move(session_life_keeper),
7083 ep_.async_read_on_message_processed_ =
true;
7084 connack_proc(
force_move(session_life_keeper));
7086 ep_.async_send_store(
force_move(async_connack_proc));
7092 ep_.clear_session_data();
7095 connack_proc(
force_move(session_life_keeper));
7101 static constexpr std::size_t header_len_ =
7108 bool session_present_;
7109 variant<connect_return_code, v5::connect_reason_code> reason_code_;
7116 struct process_publish : as::coroutine, std::enable_shared_from_this<process_publish> {
7118 using ep_t_sp = this_type_sp;
7120 using process_type_sp = std::shared_ptr<process_type>;
7126 all_read_{all_read} {
7130 any&& session_life_keeper,
7131 parse_handler_variant var = std::size_t(0),
7135 if (ep_.remaining_length_ < min_len_) {
7136 ep_.call_protocol_error_handlers();
7140 yield ep_.process_header(
7142 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
7147 (*this)(std::forward<decltype(args)>(args)...);
7152 yield ep_.process_string(
7158 (*this)(std::forward<decltype(args)>(args)...);
7161 topic_name_ =
force_move(variant_get<buffer>(var));
7167 ep_.call_bad_message_error_handlers();
7173 yield ep_.process_packet_id(
7179 (*this)(std::forward<decltype(args)>(args)...);
7182 packet_id_ =
force_move(variant_get<packet_id_t>(var));
7185 yield ep_.process_properties(
7191 (*this)(std::forward<decltype(args)>(args)...);
7194 props_ =
force_move(variant_get<v5::properties>(var));
7196 yield ep_.process_nbytes(
7200 ep_.remaining_length_,
7203 (*this)(std::forward<decltype(args)>(args)...);
7209 switch (ep_.version_) {
7213 publish_options(ep_.fixed_header_),
7216 ep_.on_mqtt_message_processed(
7220 std::tuple<any, process_type_sp>
7221 >(session_life_keeper)
7229 if (topic_name_.empty()) {
7230 if (
auto topic_alias = get_topic_alias_from_props(props_)) {
7231 auto topic_name = [&] {
7232 LockGuard<Mutex> lck (ep_.topic_alias_recv_mtx_);
7233 if (ep_.topic_alias_recv_) {
7234 return ep_.topic_alias_recv_.value().find(
topic_alias.value());
7236 return std::string();
7238 if (topic_name.empty()) {
7241 <<
"no matching topic alias: "
7243 ep_.call_protocol_error_handlers();
7252 if (
auto topic_alias = ep_.get_topic_alias_from_props(props_)) {
7253 LockGuard<Mutex> lck (ep_.topic_alias_recv_mtx_);
7254 if (ep_.topic_alias_recv_) {
7255 ep_.topic_alias_recv_.value().insert_or_update(topic_name_,
topic_alias.value());
7259 if (ep_.on_v5_publish(
7261 publish_options(ep_.fixed_header_),
7267 ep_.on_mqtt_message_processed(
7271 std::tuple<any, process_type_sp>
7272 >(session_life_keeper)
7280 BOOST_ASSERT(
false);
7284 switch (qos_value_) {
7289 if (handler_call()) {
7290 ep_.auto_pub_response(
7292 if (ep_.connected_) {
7295 v5::puback_reason_code::success,
7301 if (ep_.connected_) {
7302 ep_.async_send_puback(
7314 if (handler_call()) {
7315 ep_.qos2_publish_handled_.emplace(*packet_id_);
7316 ep_.auto_pub_response(
7318 if (ep_.connected_) {
7327 if (ep_.connected_) {
7328 ep_.async_send_pubrec(
7345 static constexpr std::size_t min_len_ = 2;
7352 optional<packet_id_t> packet_id_;
7356 friend struct process_publish;
7360 struct process_puback : as::coroutine, std::enable_shared_from_this<process_puback> {
7362 using ep_t_sp = this_type_sp;
7363 using process_type = process_puback;
7364 using process_type_sp = std::shared_ptr<process_type>;
7370 all_read_{all_read} {
7374 any&& session_life_keeper,
7375 parse_handler_variant var = std::size_t(0),
7379 if (ep_.remaining_length_ < header_len_) {
7380 ep_.call_protocol_error_handlers();
7384 yield ep_.process_header(
7386 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
7391 (*this)(std::forward<decltype(args)>(args)...);
7396 yield ep_.process_packet_id(
7402 (*this)(std::forward<decltype(args)>(args)...);
7405 packet_id_ =
force_move(variant_get<packet_id_t>(var));
7407 if (ep_.remaining_length_ == 0) {
7415 yield ep_.process_nbytes(
7422 (*this)(std::forward<decltype(args)>(args)...);
7430 if (ep_.remaining_length_ > 0) {
7432 yield ep_.process_properties(
7438 (*this)(std::forward<decltype(args)>(args)...);
7441 props_ =
force_move(variant_get<v5::properties>(var));
7445 LockGuard<Mutex> lck (ep_.store_mtx_);
7446 auto& idx = ep_.store_.template get<tag_packet_id_type>();
7448 idx.erase(std::get<0>(r), std::get<1>(r));
7449 ep_.pid_man_.release_id(packet_id_);
7451 ep_.on_serialize_remove(packet_id_);
7452 switch (ep_.version_) {
7454 if (ep_.on_puback(packet_id_)) {
7455 ep_.on_mqtt_message_processed(
7459 std::tuple<any, process_type_sp>
7460 >(session_life_keeper)
7467 if (ep_.on_v5_puback(packet_id_, reason_code_,
force_move(props_))) {
7468 ep_.on_mqtt_message_processed(
7472 std::tuple<any, process_type_sp>
7473 >(session_life_keeper)
7480 BOOST_ASSERT(
false);
7488 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
7497 friend struct process_puback;
7501 struct process_pubrec : as::coroutine, std::enable_shared_from_this<process_pubrec> {
7503 using ep_t_sp = this_type_sp;
7504 using process_type = process_pubrec;
7505 using process_type_sp = std::shared_ptr<process_type>;
7511 all_read_{all_read} {
7515 any&& session_life_keeper,
7516 parse_handler_variant var = std::size_t(0),
7520 if (ep_.remaining_length_ < header_len_) {
7521 ep_.call_protocol_error_handlers();
7525 yield ep_.process_header(
7527 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
7532 (*this)(std::forward<decltype(args)>(args)...);
7537 yield ep_.process_packet_id(
7543 (*this)(std::forward<decltype(args)>(args)...);
7546 packet_id_ =
force_move(variant_get<packet_id_t>(var));
7548 if (ep_.remaining_length_ == 0) {
7556 yield ep_.process_nbytes(
7563 (*this)(std::forward<decltype(args)>(args)...);
7571 if (ep_.remaining_length_ > 0) {
7573 yield ep_.process_properties(
7579 (*this)(std::forward<decltype(args)>(args)...);
7582 props_ =
force_move(variant_get<v5::properties>(var));
7586 LockGuard<Mutex> lck (ep_.store_mtx_);
7587 auto& idx = ep_.store_.template get<tag_packet_id_type>();
7589 idx.erase(std::get<0>(r), std::get<1>(r));
7596 ep_.auto_pub_response(
7598 if (ep_.connected_) {
7601 v5::pubrel_reason_code::success,
7616 if (ep_.connected_) {
7617 ep_.async_send_pubrel(
7636 switch (ep_.version_) {
7638 if (ep_.on_pubrec(packet_id_)) {
7640 ep_.on_mqtt_message_processed(
7644 std::tuple<any, process_type_sp>
7645 >(session_life_keeper)
7652 if (ep_.on_v5_pubrec(packet_id_, reason_code_,
force_move(props_))) {
7654 ep_.on_mqtt_message_processed(
7658 std::tuple<any, process_type_sp>
7659 >(session_life_keeper)
7666 BOOST_ASSERT(
false);
7673 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
7682 friend struct process_pubrec;
7684 struct process_pubrel : as::coroutine, std::enable_shared_from_this<process_pubrel> {
7686 using ep_t_sp = this_type_sp;
7687 using process_type = process_pubrel;
7688 using process_type_sp = std::shared_ptr<process_type>;
7694 all_read_{all_read} {
7698 any&& session_life_keeper,
7699 parse_handler_variant var = std::size_t(0),
7703 if (ep_.remaining_length_ < header_len_) {
7704 ep_.call_protocol_error_handlers();
7708 yield ep_.process_header(
7710 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
7715 (*this)(std::forward<decltype(args)>(args)...);
7720 yield ep_.process_packet_id(
7726 (*this)(std::forward<decltype(args)>(args)...);
7729 packet_id_ =
force_move(variant_get<packet_id_t>(var));
7731 if (ep_.remaining_length_ == 0) {
7735 reason_code_ = v5::pubrel_reason_code::success;
7739 yield ep_.process_nbytes(
7746 (*this)(std::forward<decltype(args)>(args)...);
7754 if (ep_.remaining_length_ > 0) {
7756 yield ep_.process_properties(
7762 (*this)(std::forward<decltype(args)>(args)...);
7765 props_ =
force_move(variant_get<v5::properties>(var));
7772 ep_.auto_pub_response(
7774 if (ep_.connected_) {
7777 v5::pubcomp_reason_code::success,
7783 if (ep_.connected_) {
7784 ep_.async_send_pubcomp(
7786 v5::pubcomp_reason_code::success,
7794 ep_.qos2_publish_handled_.erase(packet_id_);
7795 switch (ep_.version_) {
7796 case protocol_version::v3_1_1:
7797 if (ep_.on_pubrel(packet_id_)) {
7799 ep_.on_mqtt_message_processed(
7803 std::tuple<any, process_type_sp>
7804 >(session_life_keeper)
7810 case protocol_version::v5:
7811 if (ep_.on_v5_pubrel(packet_id_, reason_code_,
force_move(props_))) {
7813 ep_.on_mqtt_message_processed(
7817 std::tuple<any, process_type_sp>
7818 >(session_life_keeper)
7825 BOOST_ASSERT(
false);
7832 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
7841 friend struct process_pubrel;
7845 struct process_pubcomp : as::coroutine, std::enable_shared_from_this<process_pubcomp> {
7847 using ep_t_sp = this_type_sp;
7848 using process_type = process_pubcomp;
7849 using process_type_sp = std::shared_ptr<process_type>;
7855 all_read_{all_read} {
7859 any&& session_life_keeper,
7860 parse_handler_variant var = std::size_t(0),
7864 if (ep_.remaining_length_ < header_len_) {
7865 ep_.call_protocol_error_handlers();
7869 yield ep_.process_header(
7871 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
7876 (*this)(std::forward<decltype(args)>(args)...);
7881 yield ep_.process_packet_id(
7887 (*this)(std::forward<decltype(args)>(args)...);
7890 packet_id_ =
force_move(variant_get<packet_id_t>(var));
7892 if (ep_.remaining_length_ == 0) {
7896 reason_code_ = v5::pubcomp_reason_code::success;
7900 yield ep_.process_nbytes(
7907 (*this)(std::forward<decltype(args)>(args)...);
7915 if (ep_.remaining_length_ > 0) {
7917 yield ep_.process_properties(
7923 (*this)(std::forward<decltype(args)>(args)...);
7926 props_ =
force_move(variant_get<v5::properties>(var));
7930 LockGuard<Mutex> lck (ep_.store_mtx_);
7931 auto& idx = ep_.store_.template get<tag_packet_id_type>();
7932 auto r = idx.equal_range(std::make_tuple(packet_id_, control_packet_type::pubcomp));
7933 idx.erase(std::get<0>(r), std::get<1>(r));
7934 ep_.pid_man_.release_id(packet_id_);
7936 ep_.on_serialize_remove(packet_id_);
7937 switch (ep_.version_) {
7938 case protocol_version::v3_1_1:
7939 if (ep_.on_pubcomp(packet_id_)) {
7940 ep_.on_mqtt_message_processed(
7944 std::tuple<any, process_type_sp>
7945 >(session_life_keeper)
7951 case protocol_version::v5:
7952 if (ep_.on_v5_pubcomp(packet_id_, reason_code_,
force_move(props_))) {
7953 ep_.on_mqtt_message_processed(
7957 std::tuple<any, process_type_sp>
7958 >(session_life_keeper)
7965 BOOST_ASSERT(
false);
7973 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
7982 friend struct process_pubcomp;
7986 struct process_subscribe : as::coroutine, std::enable_shared_from_this<process_subscribe> {
7988 using ep_t_sp = this_type_sp;
7989 using process_type = process_subscribe;
7990 using process_type_sp = std::shared_ptr<process_type>;
7996 all_read_{all_read} {
8000 any&& session_life_keeper,
8001 parse_handler_variant var = std::size_t(0),
8005 if (ep_.remaining_length_ < header_len_) {
8006 ep_.call_protocol_error_handlers();
8010 yield ep_.process_header(
8012 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
8017 (*this)(std::forward<decltype(args)>(args)...);
8022 yield ep_.process_packet_id(
8028 (*this)(std::forward<decltype(args)>(args)...);
8031 packet_id_ =
force_move(variant_get<packet_id_t>(var));
8033 if (ep_.version_ == protocol_version::v5) {
8035 yield ep_.process_properties(
8041 (*this)(std::forward<decltype(args)>(args)...);
8044 props_ =
force_move(variant_get<v5::properties>(var));
8049 yield ep_.process_string(
8055 (*this)(std::forward<decltype(args)>(args)...);
8062 <<
"topic_filter parse error"
8063 <<
" whole_topic_filter: "
8064 << variant_get<buffer>(var);
8065 ep_.call_protocol_error_handlers();
8070 yield ep_.process_nbytes(
8077 (*this)(std::forward<decltype(args)>(args)...);
8080 sub_opts_opt_.emplace(
static_cast<std::uint8_t
>(variant_get<buffer>(var)[0]));
8081 if (sub_opts_opt_.value().get_qos() != qos::at_most_once &&
8082 sub_opts_opt_.value().get_qos() != qos::at_least_once &&
8083 sub_opts_opt_.value().get_qos() != qos::exactly_once) {
8084 ep_.call_bad_message_error_handlers();
8088 entries_.emplace_back(
8091 sub_opts_opt_.value()
8094 if (ep_.remaining_length_ == 0) {
8095 switch (ep_.version_) {
8096 case protocol_version::v3_1_1:
8097 if (ep_.on_subscribe(packet_id_,
force_move(entries_))) {
8098 ep_.on_mqtt_message_processed(
8102 std::tuple<any, process_type_sp>
8103 >(session_life_keeper)
8109 case protocol_version::v5:
8111 ep_.on_mqtt_message_processed(
8115 std::tuple<any, process_type_sp>
8116 >(session_life_keeper)
8123 BOOST_ASSERT(
false);
8132 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
8139 optional<share_name_topic_filter> sn_tf_opt_;
8140 optional<subscribe_options> sub_opts_opt_;
8141 std::vector<subscribe_entry> entries_;
8143 friend struct process_subscribe;
8147 struct process_suback : as::coroutine, std::enable_shared_from_this<process_suback> {
8149 using ep_t_sp = this_type_sp;
8150 using process_type = process_suback;
8151 using process_type_sp = std::shared_ptr<process_type>;
8157 all_read_{all_read} {
8161 any&& session_life_keeper,
8162 parse_handler_variant var = std::size_t(0),
8166 if (ep_.remaining_length_ < header_len_) {
8167 ep_.call_protocol_error_handlers();
8171 yield ep_.process_header(
8173 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
8178 (*this)(std::forward<decltype(args)>(args)...);
8183 yield ep_.process_packet_id(
8189 (*this)(std::forward<decltype(args)>(args)...);
8192 packet_id_ =
force_move(variant_get<packet_id_t>(var));
8194 if (ep_.version_ == protocol_version::v5) {
8196 yield ep_.process_properties(
8202 (*this)(std::forward<decltype(args)>(args)...);
8205 props_ =
force_move(variant_get<v5::properties>(var));
8209 yield ep_.process_nbytes(
8213 ep_.remaining_length_,
8216 (*this)(std::forward<decltype(args)>(args)...);
8220 LockGuard<Mutex> lck_store (ep_.store_mtx_);
8221 LockGuard<Mutex> lck_sub_unsub (ep_.sub_unsub_inflight_mtx_);
8222 ep_.pid_man_.release_id(packet_id_);
8223 ep_.sub_unsub_inflight_.erase(packet_id_);
8225 switch (ep_.version_) {
8226 case protocol_version::v3_1_1:
8231 std::vector<suback_return_code> results;
8232 auto& body = variant_get<buffer>(var);
8233 results.resize(body.size());
8245 return static_cast<suback_return_code>(e);
8248 if (ep_.on_suback(packet_id_,
force_move(results))) {
8249 ep_.on_mqtt_message_processed(
8253 std::tuple<any, process_type_sp>
8254 >(session_life_keeper)
8261 case protocol_version::v5:
8266 std::vector<v5::suback_reason_code> reasons;
8267 auto& body = variant_get<buffer>(var);
8268 reasons.resize(body.size());
8280 [&](
auto const& e) {
8281 return static_cast<v5::suback_reason_code>(e);
8285 ep_.on_mqtt_message_processed(
8289 std::tuple<any, process_type_sp>
8290 >(session_life_keeper)
8298 BOOST_ASSERT(
false);
8304 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
8312 friend struct process_suback;
8316 struct process_unsubscribe : as::coroutine, std::enable_shared_from_this<process_unsubscribe> {
8318 using ep_t_sp = this_type_sp;
8319 using process_type = process_unsubscribe;
8320 using process_type_sp = std::shared_ptr<process_type>;
8322 process_unsubscribe(
8326 all_read_{all_read} {
8330 any&& session_life_keeper,
8331 parse_handler_variant var = std::size_t(0),
8335 if (ep_.remaining_length_ < header_len_) {
8336 ep_.call_protocol_error_handlers();
8340 yield ep_.process_header(
8342 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
8347 (*this)(std::forward<decltype(args)>(args)...);
8352 yield ep_.process_packet_id(
8358 (*this)(std::forward<decltype(args)>(args)...);
8361 packet_id_ =
force_move(variant_get<packet_id_t>(var));
8363 if (ep_.version_ == protocol_version::v5) {
8365 yield ep_.process_properties(
8371 (*this)(std::forward<decltype(args)>(args)...);
8374 props_ =
force_move(variant_get<v5::properties>(var));
8379 yield ep_.process_string(
8385 (*this)(std::forward<decltype(args)>(args)...);
8392 <<
"topic_filter parse error"
8393 <<
" whole_topic_filter: "
8394 << variant_get<buffer>(var);
8395 ep_.call_protocol_error_handlers();
8399 entries_.emplace_back(
8404 if (ep_.remaining_length_ == 0) {
8405 switch (ep_.version_) {
8406 case protocol_version::v3_1_1:
8407 if (ep_.on_unsubscribe(packet_id_,
force_move(entries_))) {
8408 ep_.on_mqtt_message_processed(
8412 std::tuple<any, process_type_sp>
8413 >(session_life_keeper)
8419 case protocol_version::v5:
8421 ep_.on_mqtt_message_processed(
8425 std::tuple<any, process_type_sp>
8426 >(session_life_keeper)
8433 BOOST_ASSERT(
false);
8442 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
8449 optional<share_name_topic_filter> sn_tf_opt_;
8450 std::vector<unsubscribe_entry> entries_;
8452 friend struct process_unsubscribe;
8456 struct process_unsuback : as::coroutine, std::enable_shared_from_this<process_unsuback> {
8458 using ep_t_sp = this_type_sp;
8459 using process_type = process_unsuback;
8460 using process_type_sp = std::shared_ptr<process_type>;
8466 all_read_{all_read} {
8470 any&& session_life_keeper,
8471 parse_handler_variant var = std::size_t(0),
8475 if (ep_.remaining_length_ < header_len_) {
8476 ep_.call_protocol_error_handlers();
8480 yield ep_.process_header(
8482 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
8487 (*this)(std::forward<decltype(args)>(args)...);
8492 yield ep_.process_packet_id(
8498 (*this)(std::forward<decltype(args)>(args)...);
8501 packet_id_ =
force_move(variant_get<packet_id_t>(var));
8503 if (ep_.version_ == protocol_version::v5) {
8505 yield ep_.process_properties(
8511 (*this)(std::forward<decltype(args)>(args)...);
8514 props_ =
force_move(variant_get<v5::properties>(var));
8517 yield ep_.process_nbytes(
8521 ep_.remaining_length_,
8524 (*this)(std::forward<decltype(args)>(args)...);
8527 auto body = variant_get<buffer>(var);
8528 reasons_.resize(body.size());
8533 [&](
auto const& e) {
8534 return static_cast<v5::unsuback_reason_code>(e);
8539 LockGuard<Mutex> lck_store (ep_.store_mtx_);
8540 LockGuard<Mutex> lck_sub_unsub (ep_.sub_unsub_inflight_mtx_);
8541 ep_.pid_man_.release_id(packet_id_);
8542 ep_.sub_unsub_inflight_.erase(packet_id_);
8544 switch (ep_.version_) {
8545 case protocol_version::v3_1_1:
8546 if (ep_.on_unsuback(packet_id_)) {
8547 ep_.on_mqtt_message_processed(
8551 std::tuple<any, process_type_sp>
8552 >(session_life_keeper)
8558 case protocol_version::v5:
8560 ep_.on_mqtt_message_processed(
8564 std::tuple<any, process_type_sp>
8565 >(session_life_keeper)
8572 BOOST_ASSERT(
false);
8578 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
8585 std::vector<v5::unsuback_reason_code> reasons_;
8587 friend struct process_unsuback;
8591 void process_pingreq(
8592 any session_life_keeper
8594 static constexpr std::size_t header_len = 0;
8596 if (remaining_length_ != header_len) {
8597 call_protocol_error_handlers();
8601 on_mqtt_message_processed(
force_move(session_life_keeper));
8607 void process_pingresp(
8608 any session_life_keeper
8610 static constexpr std::size_t header_len = 0;
8612 if (remaining_length_ != header_len) {
8613 call_protocol_error_handlers();
8616 if (on_pingresp()) {
8617 on_mqtt_message_processed(
force_move(session_life_keeper));
8619 if (pingresp_timeout_ != std::chrono::steady_clock::duration::zero()) tim_pingresp_.cancel();
8624 struct process_disconnect : as::coroutine, std::enable_shared_from_this<process_disconnect> {
8625 using ep_t = this_type;
8626 using ep_t_sp = this_type_sp;
8627 using process_type = process_disconnect;
8628 using process_type_sp = std::shared_ptr<process_type>;
8634 all_read_{all_read} {
8638 any&& session_life_keeper,
8639 parse_handler_variant var = std::size_t(0),
8643 if (ep_.remaining_length_ > 0) {
8644 if (ep_.version_ != protocol_version::v5) {
8645 ep_.call_protocol_error_handlers();
8650 yield ep_.process_header(
8652 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
8657 (*this)(std::forward<decltype(args)>(args)...);
8662 yield ep_.process_nbytes(
8669 (*this)(std::forward<decltype(args)>(args)...);
8676 if (ep_.remaining_length_ > 0) {
8677 yield ep_.process_properties(
8683 (*this)(std::forward<decltype(args)>(args)...);
8686 props_ =
force_move(variant_get<v5::properties>(var));
8688 switch (ep_.version_) {
8689 case protocol_version::v3_1_1:
8690 ep_.on_disconnect();
8692 case protocol_version::v5:
8693 ep_.on_v5_disconnect(reason_code_,
force_move(props_));
8696 BOOST_ASSERT(
false);
8698 ep_.shutdown(*ep_.socket_);
8699 ep_.on_mqtt_message_processed(
8703 std::tuple<any, process_type_sp>
8704 >(session_life_keeper)
8710 switch (ep_.version_) {
8711 case protocol_version::v3_1_1:
8712 ep_.on_disconnect();
8714 case protocol_version::v5:
8715 ep_.on_v5_disconnect(reason_code_,
force_move(props_));
8718 BOOST_ASSERT(
false);
8720 ep_.shutdown(*ep_.socket_);
8721 ep_.on_mqtt_message_processed(
8730 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
8738 friend struct process_disconnect;
8742 struct process_auth : as::coroutine, std::enable_shared_from_this<process_auth> {
8744 using ep_t_sp = this_type_sp;
8745 using process_type = process_auth;
8746 using process_type_sp = std::shared_ptr<process_type>;
8752 all_read_{all_read} {
8756 any&& session_life_keeper,
8757 parse_handler_variant var = std::size_t(0),
8761 if (ep_.version_ != protocol_version::v5) {
8762 ep_.call_protocol_error_handlers();
8765 if (ep_.remaining_length_ > 0) {
8767 yield ep_.process_header(
8769 std::make_tuple(
force_move(session_life_keeper), this->shared_from_this()),
8774 (*this)(std::forward<decltype(args)>(args)...);
8779 yield ep_.process_nbytes(
8786 (*this)(std::forward<decltype(args)>(args)...);
8791 yield ep_.process_properties(
8797 (*this)(std::forward<decltype(args)>(args)...);
8800 props_ =
force_move(variant_get<v5::properties>(var));
8801 BOOST_ASSERT(ep_.version_ == protocol_version::v5);
8802 if (ep_.on_v5_auth(reason_code_,
force_move(props_))) {
8803 ep_.on_mqtt_message_processed(
8807 std::tuple<any, process_type_sp>
8808 >(session_life_keeper)
8815 BOOST_ASSERT(ep_.version_ == protocol_version::v5);
8816 if (ep_.on_v5_auth(reason_code_,
force_move(props_))) {
8817 ep_.on_mqtt_message_processed(
8827 static constexpr std::size_t header_len_ =
sizeof(
packet_id_t);
8835 friend struct process_auth;
8837 template <
typename F,
typename AF>
8838 void auto_pub_response(F
const& f, AF
const& af) {
8839 if (auto_pub_response_) {
8840 if (auto_pub_response_async_) af();
8848 optional<buffer> user_name,
8849 optional<buffer> password,
8851 std::uint16_t keep_alive_sec,
8855 case protocol_version::v3_1_1:
8857 v3_1_1::connect_message(
8867 case protocol_version::v5:
8868 update_topic_alias_maximum_recv(props);
8870 v5::connect_message(
8882 BOOST_ASSERT(
false);
8888 bool session_present,
8889 variant<connect_return_code, v5::connect_reason_code> reason_code,
8893 case protocol_version::v3_1_1:
8895 v3_1_1::connack_message(
8897 variant_get<connect_return_code>(reason_code)
8901 case protocol_version::v5:
8902 update_topic_alias_maximum_recv(props);
8904 v5::connack_message(
8906 variant_get<v5::connect_reason_code>(reason_code),
8912 BOOST_ASSERT(
false);
8917 template <
typename PublishMessage>
8921 v5::basic_publish_message<PacketIdBytes>
8925 PublishMessage& msg,
8928 if (msg.topic().empty()) {
8930 if (
auto ta_opt = get_topic_alias_from_props(msg.props())) {
8931 LockGuard<Mutex> lck (topic_alias_send_mtx_);
8933 if (topic_alias_send_) t = topic_alias_send_.value().find(ta_opt.value());
8937 <<
"publish topic_name is empty, topic alias " << ta_opt.value()
8938 <<
" is not registered." ;
8944 <<
"topia alias : " << t <<
" - " << ta_opt.value() <<
" is recovered for store." ;
8946 msg.set_topic_name(
as::buffer(topic_name_buf));
8953 <<
"publish topic_name is empty, no topic alias set.";
8957 msg.remove_prop(v5::property::id::topic_alias);
8960 template <
typename PublishMessage>
8964 v5::basic_publish_message<PacketIdBytes>
8967 store_topic_alias(PublishMessage&, any&) {}
8969 template <
typename PublishMessage>
8973 v5::basic_publish_message<PacketIdBytes>
8976 apply_topic_alias(PublishMessage& msg, any& life_keeper,
bool checked) {
8977 auto clear_topic_name_and_add_topic_alias =
8979 auto topic_name_buf =
buffer();
8980 msg.set_topic_name(
as::buffer(topic_name_buf));
8982 msg.add_prop(v5::property::topic_alias(ta));
8985 if (msg.topic().empty()) {
8986 if (checked)
return;
8989 if (
auto ta_opt = get_topic_alias_from_props(msg.props())) {
8990 LockGuard<Mutex> lck (topic_alias_send_mtx_);
8992 if (topic_alias_send_) t = topic_alias_send_.value().find(ta_opt.value());
8996 <<
"publish topic_name is empty, topic alias " << ta_opt.value()
8997 <<
" is not registered." ;
9004 <<
"publish topic_name is empty, no topic alias set.";
9009 if (
auto ta_opt = get_topic_alias_from_props(msg.props())) {
9012 <<
"topia alias : " << msg.topic() <<
" - " << ta_opt.value() <<
" is registered." ;
9013 LockGuard<Mutex> lck (topic_alias_send_mtx_);
9014 if (topic_alias_send_) {
9015 topic_alias_send_.value().insert_or_update(
9021 else if (auto_map_topic_alias_send_) {
9022 LockGuard<Mutex> lck (topic_alias_send_mtx_);
9023 if (topic_alias_send_) {
9024 auto lru_ta = topic_alias_send_.value().get_lru_alias();
9025 if (
auto ta_opt = topic_alias_send_.value().find(msg.topic())) {
9028 <<
"topia alias : " << msg.topic() <<
" - " << ta_opt.value() <<
" is found." ;
9029 topic_alias_send_.value().insert_or_update(msg.topic(), ta_opt.value());
9030 clear_topic_name_and_add_topic_alias(ta_opt.value());
9033 topic_alias_send_.value().insert_or_update(msg.topic(), lru_ta);
9034 msg.add_prop(v5::property::topic_alias(lru_ta));
9038 else if (auto_replace_topic_alias_send_) {
9039 LockGuard<Mutex> lck (topic_alias_send_mtx_);
9040 if (topic_alias_send_) {
9041 if (
auto ta_opt = topic_alias_send_.value().find(msg.topic())) {
9044 <<
"topia alias : " << msg.topic() <<
" - " << ta_opt.value() <<
" is found." ;
9045 topic_alias_send_.value().insert_or_update(msg.topic(), ta_opt.value());
9046 clear_topic_name_and_add_topic_alias(ta_opt.value());
9053 template <
typename PublishMessage>
9057 v5::basic_publish_message<PacketIdBytes>
9060 apply_topic_alias(PublishMessage&, any&,
bool) {}
9062 template <
typename PublishMessage,
typename SerializePublish>
9063 void preprocess_publish_message(
9064 PublishMessage& msg,
9066 SerializePublish
const& serialize_publish,
9067 bool register_pid =
false
9069 auto qos_value = msg.get_qos();
9070 bool checked =
false;
9071 if (qos_value == qos::at_least_once || qos_value == qos::exactly_once) {
9072 auto store_msg = msg;
9073 store_topic_alias(store_msg, life_keeper);
9075 store_msg.set_dup(
true);
9076 auto packet_id = store_msg.packet_id();
9078 LockGuard<Mutex> lck (store_mtx_);
9080 auto ret = pid_man_.register_id(packet_id);
9086 qos_value == qos::at_least_once
9087 ? control_packet_type::puback
9088 : control_packet_type::pubrec,
9092 (this->*serialize_publish)(
force_move(store_msg));
9094 apply_topic_alias(msg, life_keeper, checked);
9097 template <
typename ConstBufferSequence>
9098 typename std::enable_if<
9099 as::is_const_buffer_sequence<ConstBufferSequence>::value
9103 as::const_buffer topic_name,
9104 ConstBufferSequence payloads,
9105 publish_options pubopts,
9109 auto do_send_publish =
9110 [&](
auto msg,
auto const& serialize_publish) {
9111 preprocess_publish_message(
9120 case protocol_version::v3_1_1:
9122 v3_1_1::basic_publish_message<PacketIdBytes>(
9128 &endpoint::on_serialize_publish_message
9131 case protocol_version::v5:
9133 v5::basic_publish_message<PacketIdBytes>(
9140 &endpoint::on_serialize_v5_publish_message
9144 BOOST_ASSERT(
false);
9155 case protocol_version::v3_1_1:
9156 do_sync_write(v3_1_1::basic_puback_message<PacketIdBytes>(packet_id));
9158 case protocol_version::v5:
9159 do_sync_write(v5::basic_puback_message<PacketIdBytes>(packet_id, reason,
force_move(props)));
9162 BOOST_ASSERT(
false);
9166 on_pub_res_sent(packet_id);
9175 case protocol_version::v3_1_1:
9176 do_sync_write(v3_1_1::basic_pubrec_message<PacketIdBytes>(packet_id));
9178 case protocol_version::v5:
9179 do_sync_write(v5::basic_pubrec_message<PacketIdBytes>(packet_id, reason,
force_move(props)));
9182 BOOST_ASSERT(
false);
9195 [&](
auto msg,
auto const& serialize) {
9197 LockGuard<Mutex> lck (store_mtx_);
9200 pid_man_.register_id(packet_id);
9202 auto ret = store_.emplace(
9204 control_packet_type::pubcomp,
9217 <<
"overwrite pubrel"
9218 <<
" packet_id:" << packet_id;
9224 control_packet_type::pubcomp,
9233 (this->*serialize)(msg);
9238 case protocol_version::v3_1_1:
9240 v3_1_1::basic_pubrel_message<PacketIdBytes>(packet_id),
9241 &endpoint::on_serialize_pubrel_message
9244 case protocol_version::v5:
9246 v5::basic_pubrel_message<PacketIdBytes>(packet_id, reason,
force_move(props)),
9247 &endpoint::on_serialize_v5_pubrel_message
9251 BOOST_ASSERT(
false);
9264 [&](
auto msg,
auto const& serialize) {
9266 LockGuard<Mutex> lck (store_mtx_);
9269 pid_man_.register_id(packet_id);
9271 auto ret = store_.emplace(
9273 control_packet_type::pubcomp,
9278 BOOST_ASSERT(ret.second);
9281 (this->*serialize)(msg);
9285 case protocol_version::v3_1_1:
9287 v3_1_1::basic_pubrel_message<PacketIdBytes>(packet_id),
9288 &endpoint::on_serialize_pubrel_message
9291 case protocol_version::v5:
9293 v5::basic_pubrel_message<PacketIdBytes>(packet_id, reason,
force_move(props)),
9294 &endpoint::on_serialize_v5_pubrel_message
9298 BOOST_ASSERT(
false);
9309 case protocol_version::v3_1_1:
9310 do_sync_write(v3_1_1::basic_pubcomp_message<PacketIdBytes>(packet_id));
9312 case protocol_version::v5:
9313 do_sync_write(v5::basic_pubcomp_message<PacketIdBytes>(packet_id, reason,
force_move(props)));
9316 BOOST_ASSERT(
false);
9320 on_pub_res_sent(packet_id);
9323 void send_subscribe(
9324 std::vector<std::tuple<as::const_buffer, subscribe_options>> params,
9329 LockGuard<Mutex> lck (sub_unsub_inflight_mtx_);
9330 sub_unsub_inflight_.insert(packet_id);
9332 for(
auto const& p : params)
9336 std::get<1>(p).
get_qos() == qos::at_most_once ||
9337 std::get<1>(p).
get_qos() == qos::at_least_once ||
9338 std::get<1>(p).
get_qos() == qos::exactly_once
9342 case protocol_version::v3_1_1:
9343 do_sync_write(v3_1_1::basic_subscribe_message<PacketIdBytes>(
force_move(params), packet_id));
9345 case protocol_version::v5:
9346 do_sync_write(v5::basic_subscribe_message<PacketIdBytes>(
force_move(params), packet_id,
force_move(props)));
9349 BOOST_ASSERT(
false);
9355 variant<std::vector<suback_return_code>, std::vector<v5::suback_reason_code>> params,
9360 case protocol_version::v3_1_1:
9361 do_sync_write(v3_1_1::basic_suback_message<PacketIdBytes>(
force_move(
variant_get<std::vector<suback_return_code>>(params)), packet_id));
9363 case protocol_version::v5:
9364 do_sync_write(v5::basic_suback_message<PacketIdBytes>(
force_move(
variant_get<std::vector<v5::suback_reason_code>>(params)), packet_id,
force_move(props)));
9367 BOOST_ASSERT(
false);
9372 void send_unsubscribe(
9373 std::vector<as::const_buffer> params,
9378 LockGuard<Mutex> lck (sub_unsub_inflight_mtx_);
9379 sub_unsub_inflight_.insert(packet_id);
9382 case protocol_version::v3_1_1:
9383 do_sync_write(v3_1_1::basic_unsubscribe_message<PacketIdBytes>(
force_move(params), packet_id));
9385 case protocol_version::v5:
9386 do_sync_write(v5::basic_unsubscribe_message<PacketIdBytes>(
force_move(params), packet_id,
force_move(props)));
9389 BOOST_ASSERT(
false);
9398 case protocol_version::v3_1_1:
9399 do_sync_write(v3_1_1::basic_unsuback_message<PacketIdBytes>(packet_id));
9401 case protocol_version::v5:
9402 BOOST_ASSERT(
false);
9405 BOOST_ASSERT(
false);
9411 std::vector<v5::unsuback_reason_code> params,
9416 case protocol_version::v3_1_1:
9417 BOOST_ASSERT(
false);
9419 case protocol_version::v5:
9420 do_sync_write(v5::basic_unsuback_message<PacketIdBytes>(
force_move(params), packet_id,
force_move(props)));
9423 BOOST_ASSERT(
false);
9428 void send_pingreq() {
9430 case protocol_version::v3_1_1:
9431 do_sync_write(v3_1_1::pingreq_message());
9432 set_pingresp_timer();
9434 case protocol_version::v5:
9435 do_sync_write(v5::pingreq_message());
9436 set_pingresp_timer();
9439 BOOST_ASSERT(
false);
9444 void send_pingresp() {
9446 case protocol_version::v3_1_1:
9447 do_sync_write(v3_1_1::pingresp_message());
9449 case protocol_version::v5:
9450 do_sync_write(v5::pingresp_message());
9453 BOOST_ASSERT(
false);
9463 case protocol_version::v3_1_1:
9464 BOOST_ASSERT(
false);
9466 case protocol_version::v5:
9467 do_sync_write(v5::auth_message(reason,
force_move(props)));
9470 BOOST_ASSERT(
false);
9475 void send_disconnect(
9480 case protocol_version::v3_1_1:
9481 do_sync_write(v3_1_1::disconnect_message());
9483 case protocol_version::v5:
9484 do_sync_write(v5::disconnect_message(reason,
force_move(props)));
9487 BOOST_ASSERT(
false);
9493 LockGuard<Mutex> lck (store_mtx_);
9494 auto const& idx = store_.template get<tag_seq>();
9495 for (
auto const& e : idx) {
9496 do_sync_write(get_basic_message_variant<PacketIdBytes>(e.message()));
9501 template <
typename MessageVariant>
9502 void do_sync_write(MessageVariant&& mv) {
9504 if (!connected_)
return;
9506 total_bytes_sent_ += socket_->write(const_buffer_sequence<PacketIdBytes>(mv), ec);
9512 void async_send_connect(
9514 optional<buffer> user_name,
9515 optional<buffer> password,
9516 optional<will>
const& w,
9517 std::uint16_t keep_alive_sec,
9519 async_handler_t func
9523 case protocol_version::v3_1_1:
9525 v3_1_1::connect_message(
9536 case protocol_version::v5:
9537 update_topic_alias_maximum_recv(props);
9539 v5::connect_message(
9552 BOOST_ASSERT(
false);
9557 void async_send_connack(
9558 bool session_present,
9559 variant<connect_return_code, v5::connect_reason_code> reason_code,
9561 async_handler_t func
9564 case protocol_version::v3_1_1:
9566 v3_1_1::connack_message(
9568 variant_get<connect_return_code>(reason_code)
9573 case protocol_version::v5:
9574 update_topic_alias_maximum_recv(props);
9576 v5::connack_message(
9578 variant_get<v5::connect_reason_code>(reason_code),
9585 BOOST_ASSERT(
false);
9590 template <
typename ConstBufferSequence>
9591 typename std::enable_if<
9592 as::is_const_buffer_sequence<ConstBufferSequence>::value
9596 as::const_buffer topic_name,
9597 ConstBufferSequence payloads,
9598 publish_options pubopts,
9601 async_handler_t func
9603 auto do_async_send_publish =
9604 [&](
auto msg,
auto const& serialize_publish) {
9605 preprocess_publish_message(
9619 case protocol_version::v3_1_1:
9620 do_async_send_publish(
9621 v3_1_1::basic_publish_message<PacketIdBytes>(
9627 &endpoint::on_serialize_publish_message
9630 case protocol_version::v5:
9631 do_async_send_publish(
9632 v5::basic_publish_message<PacketIdBytes>(
9639 &endpoint::on_serialize_v5_publish_message
9643 BOOST_ASSERT(
false);
9648 void async_send_puback(
9652 async_handler_t func
9655 case protocol_version::v3_1_1:
9657 v3_1_1::basic_puback_message<PacketIdBytes>(packet_id),
9658 [
this,
self = this->shared_from_this(), packet_id, func =
force_move(func)]
9661 on_pub_res_sent(packet_id);
9665 case protocol_version::v5:
9667 v5::basic_puback_message<PacketIdBytes>(packet_id, reason,
force_move(props)),
9668 [
this,
self = this->shared_from_this(), packet_id, func =
force_move(func)]
9671 on_pub_res_sent(packet_id);
9676 BOOST_ASSERT(
false);
9681 void async_send_pubrec(
9685 async_handler_t func
9688 case protocol_version::v3_1_1:
9690 v3_1_1::basic_pubrec_message<PacketIdBytes>(packet_id),
9694 case protocol_version::v5:
9696 v5::basic_pubrec_message<PacketIdBytes>(packet_id, reason,
force_move(props)),
9701 BOOST_ASSERT(
false);
9706 void async_send_pubrel(
9711 async_handler_t func
9714 auto msg = basic_pubrel_message<PacketIdBytes>(packet_id);
9717 [&](
auto msg,
auto const& serialize) {
9719 LockGuard<Mutex> lck (store_mtx_);
9722 pid_man_.register_id(packet_id);
9724 auto ret = store_.emplace(
9726 control_packet_type::pubcomp,
9739 <<
"overwrite pubrel"
9740 <<
" packet_id:" << packet_id;
9746 control_packet_type::pubcomp,
9755 (this->*serialize)(msg);
9766 case protocol_version::v3_1_1:
9768 v3_1_1::basic_pubrel_message<PacketIdBytes>(packet_id),
9769 &endpoint::on_serialize_pubrel_message
9772 case protocol_version::v5:
9774 v5::basic_pubrel_message<PacketIdBytes>(packet_id, reason,
force_move(props)),
9775 &endpoint::on_serialize_v5_pubrel_message
9779 BOOST_ASSERT(
false);
9784 void async_send_pubcomp(
9788 async_handler_t func
9791 case protocol_version::v3_1_1:
9793 v3_1_1::basic_pubcomp_message<PacketIdBytes>(packet_id),
9794 [
this,
self = this->shared_from_this(), packet_id, func =
force_move(func)]
9797 on_pub_res_sent(packet_id);
9801 case protocol_version::v5:
9803 v5::basic_pubcomp_message<PacketIdBytes>(packet_id, reason,
force_move(props)),
9804 [
this,
self = this->shared_from_this(), packet_id, func =
force_move(func)]
9807 on_pub_res_sent(packet_id);
9812 BOOST_ASSERT(
false);
9817 void async_send_subscribe(
9818 std::vector<std::tuple<as::const_buffer, subscribe_options>> params,
9821 async_handler_t func
9824 LockGuard<Mutex> lck (sub_unsub_inflight_mtx_);
9825 sub_unsub_inflight_.insert(packet_id);
9828 case protocol_version::v3_1_1:
9830 v3_1_1::basic_subscribe_message<PacketIdBytes>(
9836 case protocol_version::v5:
9838 v5::basic_subscribe_message<PacketIdBytes>(
9846 BOOST_ASSERT(
false);
9851 void async_send_suback(
9852 variant<std::vector<suback_return_code>, std::vector<v5::suback_reason_code>> params,
9855 async_handler_t func
9858 case protocol_version::v3_1_1:
9860 v3_1_1::basic_suback_message<PacketIdBytes>(
9866 case protocol_version::v5:
9868 v5::basic_suback_message<PacketIdBytes>(
9876 BOOST_ASSERT(
false);
9881 void async_send_unsubscribe(
9882 std::vector<as::const_buffer> params,
9885 async_handler_t func
9888 LockGuard<Mutex> lck (sub_unsub_inflight_mtx_);
9889 sub_unsub_inflight_.insert(packet_id);
9892 case protocol_version::v3_1_1:
9894 v3_1_1::basic_unsubscribe_message<PacketIdBytes>(
9901 case protocol_version::v5:
9903 v5::basic_unsubscribe_message<PacketIdBytes>(
9912 BOOST_ASSERT(
false);
9917 void async_send_unsuback(
9919 async_handler_t func
9922 case protocol_version::v3_1_1:
9924 v3_1_1::basic_unsuback_message<PacketIdBytes>(packet_id),
force_move(func)
9927 case protocol_version::v5:
9928 BOOST_ASSERT(
false);
9931 BOOST_ASSERT(
false);
9936 void async_send_unsuback(
9937 std::vector<v5::unsuback_reason_code> params,
9940 async_handler_t func
9943 case protocol_version::v3_1_1:
9944 BOOST_ASSERT(
false);
9946 case protocol_version::v5:
9948 v5::basic_unsuback_message<PacketIdBytes>(
9957 BOOST_ASSERT(
false);
9962 void async_send_pingreq(async_handler_t func) {
9964 case protocol_version::v3_1_1:
9965 do_async_write(v3_1_1::pingreq_message(),
force_move(func));
9966 set_pingresp_timer();
9968 case protocol_version::v5:
9969 do_async_write(v5::pingreq_message(),
force_move(func));
9970 set_pingresp_timer();
9973 BOOST_ASSERT(
false);
9978 void async_send_pingresp(async_handler_t func) {
9980 case protocol_version::v3_1_1:
9981 do_async_write(v3_1_1::pingresp_message(),
force_move(func));
9983 case protocol_version::v5:
9984 do_async_write(v5::pingresp_message(),
force_move(func));
9987 BOOST_ASSERT(
false);
9992 void async_send_auth(
9995 async_handler_t func
9998 case protocol_version::v3_1_1:
9999 BOOST_ASSERT(
false);
10001 case protocol_version::v5:
10005 BOOST_ASSERT(
false);
10010 void async_send_disconnect(
10013 async_handler_t func
10015 switch (version_) {
10016 case protocol_version::v3_1_1:
10017 do_async_write(v3_1_1::disconnect_message(),
force_move(func));
10019 case protocol_version::v5:
10023 BOOST_ASSERT(
false);
10028 void async_send_store(std::function<
void()> func) {
10034 LockGuard<Mutex> lck (store_mtx_);
10035 auto const& idx = store_.template get<tag_seq>();
10036 for (
auto const& e : idx) {
10038 get_basic_message_variant<PacketIdBytes>(e.message()),
10048 class async_packet {
10051 basic_message_variant<PacketIdBytes> mv,
10052 async_handler_t h = {})
10055 basic_message_variant<PacketIdBytes>
const& message()
const {
10058 basic_message_variant<PacketIdBytes>& message() {
10061 async_handler_t
const& handler()
const {
return handler_; }
10062 async_handler_t& handler() {
return handler_; }
10064 basic_message_variant<PacketIdBytes> mv_;
10065 async_handler_t handler_;
10068 struct write_completion_handler {
10069 write_completion_handler(
10070 std::shared_ptr<this_type>
self,
10071 async_handler_t func,
10072 std::size_t num_of_messages,
10073 std::size_t expected)
10076 num_of_messages_(num_of_messages),
10077 bytes_to_transfer_(expected)
10083 BOOST_ASSERT(func_);
10087 for (std::size_t i = 0; i != num_of_messages_; ++i) {
10088 self_->queue_.pop_front();
10091 !self_->connected_) {
10092 self_->connected_ =
false;
10093 while (!self_->queue_.empty()) {
10095 if(
auto&& h = self_->queue_.front().handler()) h(ec);
10096 self_->queue_.pop_front();
10100 if (!self_->queue_.empty()) {
10101 self_->do_async_write();
10106 std::size_t bytes_transferred)
const {
10108 self_->total_bytes_sent_ += bytes_transferred;
10109 for (std::size_t i = 0; i != num_of_messages_; ++i) {
10110 self_->queue_.pop_front();
10113 !self_->connected_) {
10114 self_->connected_ =
false;
10115 while (!self_->queue_.empty()) {
10117 if(
auto&& h = self_->queue_.front().handler()) h(ec);
10118 self_->queue_.pop_front();
10122 if (bytes_to_transfer_ != bytes_transferred) {
10123 self_->connected_ =
false;
10124 while (!self_->queue_.empty()) {
10126 if(
auto&& h = self_->queue_.front().handler()) h(ec);
10127 self_->queue_.pop_front();
10129 throw write_bytes_transferred_error(bytes_to_transfer_, bytes_transferred);
10131 if (!self_->queue_.empty()) {
10132 self_->do_async_write();
10135 std::shared_ptr<this_type> self_;
10136 async_handler_t func_;
10137 std::size_t num_of_messages_;
10138 std::size_t bytes_to_transfer_;
10141 void do_async_write() {
10143 using difference_t =
typename decltype(queue_)::difference_type;
10144 std::size_t iterator_count = (max_queue_send_count_ == 0)
10146 : std::min(max_queue_send_count_, queue_.size());
10147 auto const& start = queue_.cbegin();
10148 auto end = std::next(start, boost::numeric_cast<difference_t>(iterator_count));
10151 std::size_t total_bytes = 0;
10152 std::size_t total_const_buffer_sequence = 0;
10153 for (
auto it = start; it != end; ++it) {
10154 auto const& elem = *it;
10155 auto const& mv = elem.message();
10156 std::size_t
const size = MQTT_NS::size<PacketIdBytes>(mv);
10159 if (max_queue_send_size_ != 0 && max_queue_send_size_ < total_bytes +
size) {
10161 iterator_count = boost::numeric_cast<std::size_t>(std::distance(start, end));
10164 total_bytes +=
size;
10168 std::vector<as::const_buffer> buf;
10169 std::vector<async_handler_t> handlers;
10171 buf.reserve(total_const_buffer_sequence);
10172 handlers.reserve(iterator_count);
10174 for (
auto it = start; it != end; ++it) {
10175 auto const& elem = *it;
10176 auto const& mv = elem.message();
10178 std::copy(cbs.begin(), cbs.end(), std::back_inserter(buf));
10179 handlers.emplace_back(elem.handler());
10184 socket_->async_write(
10186 write_completion_handler(
10187 this->shared_from_this(),
10190 for (
auto const& h : handlers) {
10200 void do_async_write(basic_message_variant<PacketIdBytes> mv, async_handler_t func) {
10207 if (func) func(boost::system::errc::make_error_code(boost::system::errc::success));
10212 if (queue_.size() > 1)
return;
10218 static constexpr std::uint16_t
make_uint16_t(
char b1,
char b2) {
10220 static_cast<std::uint16_t
>(
10221 ((
static_cast<std::uint16_t
>(b1) & 0xff)) << 8 |
10222 (
static_cast<std::uint16_t
>(b2) & 0xff)
10226 void clean_sub_unsub_inflight() {
10227 LockGuard<Mutex> lck_store (store_mtx_);
10228 LockGuard<Mutex> lck_sub_unsub (sub_unsub_inflight_mtx_);
10229 for (
auto packet_id : sub_unsub_inflight_) {
10230 pid_man_.release_id(packet_id);
10234 void clean_sub_unsub_inflight_on_error(
error_code ec) {
10235 clean_sub_unsub_inflight();
10239 void set_pingresp_timer() {
10240 if (pingresp_timeout_ == std::chrono::steady_clock::duration::zero())
return;
10241 if (tim_pingresp_set_)
return;
10242 tim_pingresp_set_ =
true;
10243 tim_pingresp_.expires_after(pingresp_timeout_);
10244 std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
10245 tim_pingresp_.async_wait(
10247 if (
auto sp = wp.lock()) {
10248 sp->tim_pingresp_set_ = false;
10252 sp->force_disconnect();
10262 if (
auto ta_max = get_topic_alias_maximum_from_props(props)) {
10263 if (ta_max.value() == 0) {
10264 topic_alias_recv_ = nullopt;
10267 topic_alias_recv_.emplace(ta_max.value());
10271 if (topic_alias_recv_&& topic_alias_recv_.value().max() != 0) {
10272 props.emplace_back(
10279 static optional<topic_alias_t> get_topic_alias_maximum_from_prop(
v5::property_variant const& prop) {
10280 optional<topic_alias_t> val;
10283 [&val](v5::property::topic_alias_maximum
const& p) {
10293 static optional<topic_alias_t> get_topic_alias_maximum_from_props(
v5::properties const& props) {
10294 for (
auto const& prop : props) {
10295 if (
auto val = get_topic_alias_maximum_from_prop(prop)) {
10303 optional<topic_alias_t> val;
10306 [&val](v5::property::topic_alias
const& p) {
10316 static optional<topic_alias_t> get_topic_alias_from_props(
v5::properties const& props) {
10317 for (
auto const& prop : props) {
10318 if (
auto val = get_topic_alias_from_prop(prop)) {
10334 bool clean_start_{
false};
10337 std::shared_ptr<MQTT_NS::socket> socket_;
10338 std::atomic<bool> connected_{
false};
10339 std::atomic<bool> mqtt_connected_{
false};
10341 std::array<char, 10> buf_;
10342 std::uint8_t fixed_header_;
10343 std::size_t remaining_length_multiplier_;
10344 std::size_t remaining_length_;
10345 std::vector<char> payload_;
10349 std::set<packet_id_t> qos2_publish_handled_;
10350 std::deque<async_packet> queue_;
10352 packet_id_manager<packet_id_t> pid_man_;
10354 Mutex sub_unsub_inflight_mtx_;
10355 std::set<packet_id_t> sub_unsub_inflight_;
10356 bool auto_pub_response_{
true};
10357 bool auto_pub_response_async_{
false};
10358 bool async_send_store_ {
false };
10359 bool async_read_on_message_processed_ {
true };
10360 bool disconnect_requested_{
false};
10361 bool connect_requested_{
false};
10362 std::size_t max_queue_send_count_{1};
10363 std::size_t max_queue_send_size_{0};
10365 std::size_t packet_bulk_read_limit_ = 256;
10366 std::size_t props_bulk_read_limit_ = packet_bulk_read_limit_;
10367 std::size_t total_bytes_sent_ = 0;
10368 std::size_t total_bytes_received_ = 0;
10369 static constexpr std::uint8_t variable_length_continue_flag = 0b10000000;
10371 std::chrono::steady_clock::duration pingresp_timeout_ = std::chrono::steady_clock::duration::zero();
10372 as::steady_timer tim_pingresp_;
10373 bool tim_pingresp_set_ =
false;
10375 bool auto_map_topic_alias_send_ =
false;
10376 bool auto_replace_topic_alias_send_ =
false;
10377 mutable Mutex topic_alias_send_mtx_;
10378 optional<topic_alias_send> topic_alias_send_;
10380 mutable Mutex topic_alias_recv_mtx_;
10381 optional<topic_alias_recv> topic_alias_recv_;
10386 #include <boost/asio/unyield.hpp>
10388 #if defined(__GNUC__)
10389 #pragma GCC diagnostic pop
#define MQTT_ALWAYS_INLINE
Definition: attributes.hpp:18
buffer that has string_view interface This class provides string_view interface. This class hold stri...
Definition: buffer.hpp:30
Definition: endpoint.hpp:171
void async_unsuback(packet_id_t packet_id, async_handler_t func={})
Send ununsuback packet. This function is for broker.
Definition: endpoint.hpp:4173
void async_send_store_message(basic_store_message_variant< PacketIdBytes > msg, any life_keeper, async_handler_t func)
Definition: endpoint.hpp:4645
protocol_version get_protocol_version() const
Definition: endpoint.hpp:4763
void async_subscribe(packet_id_t packet_id, buffer topic_filter, subscribe_options option, v5::properties props, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:2878
void async_subscribe(packet_id_t packet_id, std::vector< std::tuple< buffer, subscribe_options >> params, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:3080
void async_subscribe(packet_id_t packet_id, std::vector< std::tuple< std::string, subscribe_options >> params, v5::properties props, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:2972
void async_pingresp(async_handler_t func={})
Send pingresp packet. This function is for broker.
Definition: endpoint.hpp:3555
void async_subscribe(packet_id_t packet_id, std::string topic_filter, subscribe_options option, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:2646
void pubrel(packet_id_t packet_id, v5::pubrel_reason_code reason_code=v5::pubrel_reason_code::success, v5::properties props={}, any life_keeper={})
Send pubrel packet.
Definition: endpoint.hpp:1954
void unsubscribe(packet_id_t packet_id, std::vector< as::const_buffer > params, v5::properties props={})
Unsubscribe with already acquired packet identifier.
Definition: endpoint.hpp:1632
void async_publish(packet_id_t packet_id, std::string topic_name, std::string contents, publish_options pubopts={}, async_handler_t func={})
Publish with a manual set packet identifier.
Definition: endpoint.hpp:2264
void unsuback(packet_id_t packet_id)
Send unsuback packet. This function is for broker.
Definition: endpoint.hpp:2055
std::enable_if_t< ! std::is_convertible< std::decay_t< T >, packet_id_t >::value, packet_id_t > publish(T &&t, Params &&... params)
Publish.
Definition: endpoint.hpp:941
void connect(std::string const &client_id, optional< std::string > const &user_name, optional< std::string > const &password, optional< will > w, std::uint16_t keep_alive_sec, v5::properties props={})
Send connect packet.
Definition: endpoint.hpp:1763
void unsubscribe(packet_id_t packet_id, std::vector< buffer > params, v5::properties props={})
Unsubscribe with already acquired packet identifier.
Definition: endpoint.hpp:1664
endpoint(as::io_context &ioc, protocol_version version=protocol_version::undetermined, bool async_send_store=false)
Constructor for client.
Definition: endpoint.hpp:182
void async_unsubscribe(packet_id_t packet_id, std::vector< as::const_buffer > params, v5::properties props, async_handler_t func)
Unsubscribe.
Definition: endpoint.hpp:3433
void async_unsuback(packet_id_t packet_id, v5::unsuback_reason_code reason, v5::properties props, async_handler_t func={})
Send unsuback packet. This function is for broker.
Definition: endpoint.hpp:4097
void async_connect(buffer client_id, optional< buffer > user_name, optional< buffer > password, optional< will > w, std::uint16_t keep_alive_sec, v5::properties props, async_handler_t func={})
Send connect packet.
Definition: endpoint.hpp:3658
void async_auth(v5::auth_reason_code reason_code=v5::auth_reason_code::success, v5::properties props={}, async_handler_t func={})
Send auth packet.
Definition: endpoint.hpp:3577
void async_subscribe(packet_id_t packet_id, std::vector< std::tuple< buffer, subscribe_options >> params, v5::properties props, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:3125
virtual void on_close() noexcept=0
Close handler.
void restore_serialized_message(basic_store_message_variant< PacketIdBytes > msg, any life_keeper={})
Definition: endpoint.hpp:4579
void async_unsubscribe(packet_id_t packet_id, as::const_buffer topic_filter, async_handler_t func)
Unsubscribe.
Definition: endpoint.hpp:3204
void force_disconnect()
Disconnect by endpoint Force disconnect. It is not a clean disconnect sequence. When the endpoint di...
Definition: endpoint.hpp:1034
bool connected() const
Check connection status.
Definition: endpoint.hpp:4710
friend struct process_connack
Definition: endpoint.hpp:7112
void suback(packet_id_t packet_id, variant< suback_return_code, v5::suback_reason_code > reason, v5::properties props={})
Send suback packet. This function is for broker.
Definition: endpoint.hpp:2007
void async_pubcomp(packet_id_t packet_id, async_handler_t func={})
Send pubcomp packet.
Definition: endpoint.hpp:3897
void for_each_store(std::function< void(char const *, std::size_t)> const &f)
Apply f to stored messages.
Definition: endpoint.hpp:4201
void async_suback(packet_id_t packet_id, variant< suback_return_code, v5::suback_reason_code > reason, async_handler_t func={})
Send suback packet. This function is for broker.
Definition: endpoint.hpp:3950
void set_max_queue_send_count(std::size_t count)
Set maximum number of queued message sending. When async message sending function called during async...
Definition: endpoint.hpp:4743
void async_publish(packet_id_t packet_id, std::string topic_name, std::string contents, publish_options pubopts, v5::properties props, any life_keeper={}, async_handler_t func={})
Publish with a manual set packet identifier.
Definition: endpoint.hpp:2325
std::size_t get_total_bytes_received() const
get_total_bytes_received
Definition: endpoint.hpp:836
void async_unsubscribe(packet_id_t packet_id, std::vector< as::const_buffer > params, async_handler_t func)
Unsubscribe.
Definition: endpoint.hpp:3398
std::function< void(error_code ec)> async_handler_t
Definition: endpoint.hpp:176
void async_connect(buffer client_id, optional< buffer > user_name, optional< buffer > password, optional< will > w, std::uint16_t keep_alive_sec, async_handler_t func={})
Send connect packet.
Definition: endpoint.hpp:3613
void start_session(any session_life_keeper=any())
start session with a connected endpoint.
Definition: endpoint.hpp:907
std::enable_if_t< ! std::is_convertible< std::decay_t< T >, packet_id_t >::value > async_unsubscribe(T &&t, Params &&... params)
Unsubscribe.
Definition: endpoint.hpp:2244
void set_topic_alias_maximum(topic_alias_t max)
set topic alias maximum for receiving
Definition: endpoint.hpp:897
virtual MQTT_ALWAYS_INLINE void on_mqtt_message_processed(any session_life_keeper)
next read handler This handler is called when the current mqtt message has been processed.
Definition: endpoint.hpp:796
void set_auto_map_topic_alias_send(bool b=true)
Set topic alias send auto mapping enable flag.
Definition: endpoint.hpp:868
bool underlying_connected() const
Check underlying layer connection status.
Definition: endpoint.hpp:4718
void async_suback(packet_id_t packet_id, variant< std::vector< suback_return_code >, std::vector< v5::suback_reason_code >> reasons, async_handler_t func={})
Send suback packet. This function is for broker.
Definition: endpoint.hpp:4015
void async_unsubscribe(packet_id_t packet_id, buffer topic_filter, async_handler_t func={})
Unsubscribe.
Definition: endpoint.hpp:3229
bool handle_close_or_error(error_code ec)
Definition: endpoint.hpp:4816
void subscribe(packet_id_t packet_id, std::vector< std::tuple< string_view, subscribe_options >> params, v5::properties props={})
Subscribe with already acquired packet identifier.
Definition: endpoint.hpp:1476
void async_connack(bool session_present, variant< connect_return_code, v5::connect_reason_code > reason_code, async_handler_t func={})
Send connack packet. This function is for broker.
Definition: endpoint.hpp:3693
void disconnect(v5::disconnect_reason_code reason=v5::disconnect_reason_code::normal_disconnection, v5::properties props={})
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: endpoint.hpp:1014
void async_subscribe(packet_id_t packet_id, std::string topic_filter, subscribe_options option, v5::properties props, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:2696
void unsubscribe(packet_id_t packet_id, as::const_buffer topic_filter, v5::properties props={})
Unsubscribe with already acquired packet identifier.
Definition: endpoint.hpp:1573
void set_max_queue_send_size(std::size_t size)
Set maximum size of queued message sending. When async message sending function called during asynchr...
Definition: endpoint.hpp:4759
void release_packet_id(packet_id_t packet_id)
Release packet_id.
Definition: endpoint.hpp:4291
void async_puback(packet_id_t packet_id, v5::puback_reason_code reason_code, v5::properties props, async_handler_t func={})
Send puback packet.
Definition: endpoint.hpp:3768
void async_subscribe(packet_id_t packet_id, std::vector< std::tuple< as::const_buffer, subscribe_options >> params, v5::properties props, async_handler_t func)
Subscribe.
Definition: endpoint.hpp:3050
void async_subscribe(packet_id_t packet_id, buffer topic_filter, subscribe_options option, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:2830
void async_subscribe(packet_id_t packet_id, std::vector< std::tuple< as::const_buffer, subscribe_options >> params, async_handler_t func)
Subscribe.
Definition: endpoint.hpp:3017
void async_unsuback(packet_id_t packet_id, std::vector< v5::unsuback_reason_code > reasons, async_handler_t func={})
Send unsuback packet. This function is for broker.
Definition: endpoint.hpp:4123
void set_props_bulk_read_limit(std::size_t size)
Definition: endpoint.hpp:888
MQTT_NS::socket & socket()
Definition: endpoint.hpp:4771
void suback(packet_id_t packet_id, variant< std::vector< suback_return_code >, std::vector< v5::suback_reason_code >> reasons, v5::properties props={})
Send suback packet. This function is for broker.
Definition: endpoint.hpp:2038
void unsuback(packet_id_t packet_id, v5::unsuback_reason_code reason, v5::properties props={})
Send unsuback packet. This function is for broker.
Definition: endpoint.hpp:2078
auto get_executor()
Definition: endpoint.hpp:4775
void async_suback(packet_id_t packet_id, variant< suback_return_code, v5::suback_reason_code > reason, v5::properties props, async_handler_t func={})
Send suback packet. This function is for broker.
Definition: endpoint.hpp:3984
void puback(packet_id_t packet_id, v5::puback_reason_code reason_code=v5::puback_reason_code::success, v5::properties props={})
Send puback packet.
Definition: endpoint.hpp:1895
void async_disconnect(v5::disconnect_reason_code reason, v5::properties props, async_handler_t func={})
Disconnect.
Definition: endpoint.hpp:2193
std::enable_if< as::is_const_buffer_sequence< ConstBufferSequence >::value >::type publish(packet_id_t packet_id, as::const_buffer topic_name, ConstBufferSequence contents, publish_options pubopts, any life_keeper)
Publish with already acquired packet identifier.
Definition: endpoint.hpp:1206
bool clean_session() const
Get clean session.
Definition: endpoint.hpp:816
std::enable_if< as::is_const_buffer_sequence< ConstBufferSequence >::value >::type async_publish(packet_id_t packet_id, as::const_buffer topic_name, ConstBufferSequence contents, publish_options pubopts={}, any life_keeper={}, async_handler_t func={})
Publish with a manual set packet identifier.
Definition: endpoint.hpp:2392
MQTT_NS::socket const & socket() const
Definition: endpoint.hpp:4767
optional< packet_id_t > acquire_unique_packet_id_no_except()
Acquire the new unique packet id. If all packet ids are already in use, then returns nullopt After ac...
Definition: endpoint.hpp:4268
endpoint(as::io_context &ioc, std::shared_ptr< MQTT_NS::socket > socket, protocol_version version=protocol_version::undetermined, bool async_send_store=false)
Constructor for server. socket should have already been connected with another endpoint.
Definition: endpoint.hpp:198
void set_pingresp_timeout(std::chrono::steady_clock::duration tim)
Set pingresp timeout.
Definition: endpoint.hpp:4789
void pingresp()
Send pingresp packet. This function is for broker. See https://docs.oasis-open.org/mqtt/mqtt/v5....
Definition: endpoint.hpp:1700
endpoint(this_type const &)=delete
void async_unsubscribe(packet_id_t packet_id, std::vector< buffer > params, v5::properties props, async_handler_t func={})
Unsubscribe.
Definition: endpoint.hpp:3507
typename packet_id_type< PacketIdBytes >::type packet_id_t
Definition: endpoint.hpp:177
std::enable_if< is_buffer_sequence< BufferSequence >::value >::type publish(packet_id_t packet_id, buffer topic_name, BufferSequence contents, publish_options pubopts={}, any life_keeper={})
Publish with already acquired packet identifier.
Definition: endpoint.hpp:1261
virtual void on_pre_send() noexcept=0
Pre-send handler This handler is called when any mqtt control packet is decided to send.
void async_subscribe(packet_id_t packet_id, std::vector< std::tuple< std::string, subscribe_options >> params, async_handler_t func={})
Subscribe.
Definition: endpoint.hpp:2921
void auth(v5::auth_reason_code reason_code=v5::auth_reason_code::success, v5::properties props={})
Send auth packet. See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc398718086.
Definition: endpoint.hpp:1721
std::enable_if< as::is_const_buffer_sequence< ConstBufferSequence >::value >::type publish(packet_id_t packet_id, as::const_buffer topic_name, ConstBufferSequence contents, publish_options pubopts, v5::properties props, any life_keeper)
Publish with already acquired packet identifier.
Definition: endpoint.hpp:1152
void async_unsubscribe(packet_id_t packet_id, std::string topic_filter, async_handler_t func={})
Unsubscribe.
Definition: endpoint.hpp:3168
void async_subscribe(packet_id_t packet_id, as::const_buffer topic_filter, subscribe_options option, v5::properties props, async_handler_t func)
Subscribe.
Definition: endpoint.hpp:2789
endpoint & operator=(this_type &&)=delete
void restore_serialized_message(basic_pubrel_message< PacketIdBytes > msg, any life_keeper={})
Restore serialized pubrel message. This function should be called before connect.
Definition: endpoint.hpp:4403
endpoint & operator=(this_type const &)=delete
packet_id_t acquire_unique_packet_id()
Acquire the new unique packet id. If all packet ids are already in use, then throw packet_id_exhauste...
Definition: endpoint.hpp:4255
void unsuback(packet_id_t packet_id, std::vector< v5::unsuback_reason_code > reasons, v5::properties props={})
Send unsuback packet. This function is for broker.
Definition: endpoint.hpp:2104
std::enable_if_t< ! std::is_convertible< std::decay_t< T >, packet_id_t >::value > async_publish(T &&t, Params &&... params)
Publish.
Definition: endpoint.hpp:2143
void clear_session_data()
Definition: endpoint.hpp:4856
void subscribe(packet_id_t packet_id, std::vector< std::tuple< buffer, subscribe_options >> params, v5::properties props={})
Subscribe with already acquired packet identifier.
Definition: endpoint.hpp:1507
void async_unsubscribe(packet_id_t packet_id, buffer topic_filter, v5::properties props, async_handler_t func={})
Unsubscribe.
Definition: endpoint.hpp:3265
std::enable_if_t< std::is_convertible< typename Iterator::value_type, char >::value > restore_serialized_message(Iterator b, Iterator e)
Restore serialized publish and pubrel messages. This function should be called before connect.
Definition: endpoint.hpp:4304
void async_pubrec(packet_id_t packet_id, v5::pubrec_reason_code reason_code, v5::properties props, async_handler_t func={})
Send pubrec packet.
Definition: endpoint.hpp:3817
void async_puback(packet_id_t packet_id, async_handler_t func={})
Send puback packet.
Definition: endpoint.hpp:3741
virtual void on_error(error_code ec) noexcept=0
Error handler.
void async_read_next_message(any session_life_keeper)
Trigger next mqtt message manually. If you call this function, you need to set manual receive mode us...
Definition: endpoint.hpp:4727
void async_unsuback(packet_id_t packet_id, std::vector< v5::unsuback_reason_code > reasons, v5::properties props, async_handler_t func={})
Send unsuback packet. This function is for broker.
Definition: endpoint.hpp:4151
void pubrec(packet_id_t packet_id, v5::pubrec_reason_code reason_code=v5::pubrec_reason_code::success, v5::properties props={})
Send packet.
Definition: endpoint.hpp:1921
void set_packet_bulk_read_limit(std::size_t size)
Definition: endpoint.hpp:884
void set_protocol_version(protocol_version version)
Definition: endpoint.hpp:4852
void set_auto_replace_topic_alias_send(bool b=true)
Set topic alias send auto replacing enable flag.
Definition: endpoint.hpp:880
friend struct process_connect
Definition: endpoint.hpp:6904
std::shared_ptr< MQTT_NS::socket > & socket_sp_ref()
Get shared_ptr of socket.
Definition: endpoint.hpp:4799
void async_pubrel(packet_id_t packet_id, async_handler_t func={})
Send pubrel packet.
Definition: endpoint.hpp:3839
void async_connack(bool session_present, variant< connect_return_code, v5::connect_reason_code > reason_code, v5::properties props, async_handler_t func={})
Send connack packet. This function is for broker.
Definition: endpoint.hpp:3719
std::enable_if< as::is_const_buffer_sequence< ConstBufferSequence >::value >::type async_publish(packet_id_t packet_id, as::const_buffer topic_name, ConstBufferSequence contents, publish_options pubopts, v5::properties props, any life_keeper={}, async_handler_t func={})
Publish with a manual set packet identifier.
Definition: endpoint.hpp:2449
void async_unsubscribe(packet_id_t packet_id, std::vector< buffer > params, async_handler_t func={})
Unsubscribe.
Definition: endpoint.hpp:3464
std::enable_if_t< ! std::is_convertible< std::decay_t< T >, packet_id_t >::value, packet_id_t > subscribe(T &&t, Params &&... params)
Subscribe.
Definition: endpoint.hpp:972
void connect(buffer client_id, optional< buffer > user_name, optional< buffer > password, optional< will > w, std::uint16_t keep_alive_sec, v5::properties props={})
Send connect packet.
Definition: endpoint.hpp:1833
void restore_serialized_message(basic_publish_message< PacketIdBytes > msg, any life_keeper={})
Restore serialized publish message. This function should be called before connect.
Definition: endpoint.hpp:4366
std::enable_if< is_buffer_sequence< BufferSequence >::value >::type async_publish(packet_id_t packet_id, buffer topic_name, BufferSequence contents, publish_options pubopts, v5::properties props, any life_keeper={}, async_handler_t func={})
Publish with a manual set packet identifier.
Definition: endpoint.hpp:2580
void for_each_store(std::function< void(basic_store_message_variant< PacketIdBytes >)> const &f)
Apply f to stored messages.
Definition: endpoint.hpp:4218
void async_disconnect(async_handler_t func={})
Disconnect.
Definition: endpoint.hpp:2161
void set_auto_pub_response(bool b=true, bool async=true)
Set auto publish response mode.
Definition: endpoint.hpp:855
std::enable_if_t< std::is_convertible< typename Iterator::value_type, char >::value > restore_v5_serialized_message(Iterator b, Iterator e)
Restore serialized publish and pubrel messages. This function shouold be called before connect.
Definition: endpoint.hpp:4441
void async_pingreq(async_handler_t func={})
Send pingreq packet.
Definition: endpoint.hpp:3541
void clear_stored_publish(packet_id_t packet_id)
Clear stored publish message that has packet_id.
Definition: endpoint.hpp:4189
void async_unsubscribe(packet_id_t packet_id, std::vector< std::string > params, async_handler_t func={})
Unsubscribe.
Definition: endpoint.hpp:3299
std::enable_if_t< ! std::is_convertible< std::decay_t< T >, packet_id_t >::value, packet_id_t > unsubscribe(T &&t, Params &&... params)
Unsubscribe.
Definition: endpoint.hpp:993
bool register_packet_id(packet_id_t packet_id)
Register packet_id to the library. After registering the packet_id, you can call acquired_* functions...
Definition: endpoint.hpp:4280
void subscribe(packet_id_t packet_id, as::const_buffer topic_filter, subscribe_options option, v5::properties props={})
Subscribe with already acquired packet identifier.
Definition: endpoint.hpp:1440
void async_pubrec(packet_id_t packet_id, async_handler_t func={})
Send pubrec packet.
Definition: endpoint.hpp:3790
std::enable_if_t< ! std::is_convertible< std::decay_t< T >, packet_id_t >::value > async_subscribe(T &&t, Params &&... params)
Subscribe.
Definition: endpoint.hpp:2227
bool clean_start_
Definition: endpoint.hpp:10334
void subscribe(packet_id_t packet_id, string_view topic_filter, subscribe_options option, v5::properties props={})
Subscribe with already acquired packet identifier.
Definition: endpoint.hpp:1397
std::enable_if< is_buffer_sequence< BufferSequence >::value >::type publish(packet_id_t packet_id, buffer topic_name, BufferSequence contents, publish_options pubopts, v5::properties props, any life_keeper={})
Publish with already acquired packet identifier.
Definition: endpoint.hpp:1334
void async_unsubscribe(packet_id_t packet_id, std::vector< std::string > params, v5::properties props, async_handler_t func={})
Unsubscribe.
Definition: endpoint.hpp:3347
void for_each_store_with_life_keeper(std::function< void(basic_store_message_variant< PacketIdBytes >, any)> const &f)
Apply f to stored messages.
Definition: endpoint.hpp:4233
void set_connect()
Definition: endpoint.hpp:4848
friend struct process_publish
Definition: endpoint.hpp:7356
void publish(packet_id_t packet_id, std::string topic_name, std::string contents, publish_options pubopts={}, v5::properties props={}, any life_keeper={})
Publish with already acquired packet identifier.
Definition: endpoint.hpp:1076
void restore_v5_serialized_message(v5::basic_publish_message< PacketIdBytes > msg, any life_keeper={})
Restore serialized publish message. This function shouold be called before connect.
Definition: endpoint.hpp:4485
void async_pubcomp(packet_id_t packet_id, v5::pubcomp_reason_code reason_code, v5::properties props, async_handler_t func={})
Send pubcomp packet.
Definition: endpoint.hpp:3924
std::size_t get_total_bytes_sent() const
get_total_bytes_sent
Definition: endpoint.hpp:844
void async_read_control_packet_type(any session_life_keeper)
Definition: endpoint.hpp:4803
void send_store_message(basic_store_message_variant< PacketIdBytes > msg, any life_keeper)
Definition: endpoint.hpp:4584
bool clean_start() const
Get clean start.
Definition: endpoint.hpp:828
void async_subscribe(packet_id_t packet_id, as::const_buffer topic_filter, subscribe_options option, async_handler_t func)
Subscribe.
Definition: endpoint.hpp:2744
void connack(bool session_present, variant< connect_return_code, v5::connect_reason_code > reason_code, v5::properties props={})
Send connack packet. This function is for broker.
Definition: endpoint.hpp:1869
endpoint(this_type &&)=delete
void unsubscribe(packet_id_t packet_id, std::vector< string_view > params, v5::properties props={})
Unsubscribe with already acquired packet identifier.
Definition: endpoint.hpp:1600
void pubcomp(packet_id_t packet_id, v5::pubcomp_reason_code reason_code=v5::pubcomp_reason_code::success, v5::properties props={})
Send pubcomp packet.
Definition: endpoint.hpp:1981
void async_unsuback(packet_id_t packet_id, v5::unsuback_reason_code reason, async_handler_t func={})
Send unsuback packet. This function is for broker.
Definition: endpoint.hpp:4068
std::enable_if< is_buffer_sequence< BufferSequence >::value >::type async_publish(packet_id_t packet_id, buffer topic_name, BufferSequence contents, publish_options pubopts={}, any life_keeper={}, async_handler_t func={})
Publish with a manual set packet identifier.
Definition: endpoint.hpp:2503
void pingreq()
Send pingreq packet. See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os....
Definition: endpoint.hpp:1687
void async_pubrel(packet_id_t packet_id, v5::pubrel_reason_code reason_code, v5::properties props={}, any life_keeper={}, async_handler_t func={})
Send pubrel packet.
Definition: endpoint.hpp:3874
void restore_v5_serialized_message(v5::basic_pubrel_message< PacketIdBytes > msg, any life_keeper={})
Restore serialized pubrel message. This function shouold be called before connect.
Definition: endpoint.hpp:4525
void unsubscribe(packet_id_t packet_id, string_view topic_filter, v5::properties props={})
Unsubscribe with already acquired packet identifier.
Definition: endpoint.hpp:1539
void async_suback(packet_id_t packet_id, variant< std::vector< suback_return_code >, std::vector< v5::suback_reason_code >> reasons, v5::properties props, async_handler_t func={})
Send suback packet. This function is for broker.
Definition: endpoint.hpp:4043
void clear()
Clear all packet ids.
Definition: packet_id_manager.hpp:59
optional< packet_id_t > acquire_unique_id()
Acquire the new unique packet id. If all packet ids are already in use, then returns nullopt After ac...
Definition: packet_id_manager.hpp:31
bool register_id(packet_id_t packet_id)
Register packet_id to the library. After registering the packet_id, you can call acquired_* functions...
Definition: packet_id_manager.hpp:42
void release_id(packet_id_t packet_id)
Release packet_id.
Definition: packet_id_manager.hpp:52
Definition: type_erased_socket.hpp:22
virtual as::ip::tcp::socket::lowest_layer_type & lowest_layer()=0
Definition: message.hpp:505
constexpr qos get_qos() const
Get qos.
Definition: message.hpp:690
packet_id_type< PacketIdBytes >::type packet_id() const
Get packet id.
Definition: message.hpp:674
Definition: v5_message.hpp:544
packet_id_type< PacketIdBytes >::type packet_id() const
Get packet id.
Definition: v5_message.hpp:796
string_view topic() const
Get topic name.
Definition: v5_message.hpp:836
constexpr qos get_qos() const
Get qos.
Definition: v5_message.hpp:812
Definition: property.hpp:462
endpoint_t::packet_id_t packet_id_t
Definition: common_type.hpp:20
#define MQTT_LOG(chan, sev)
Definition: log.hpp:135
#define MQTT_ADD_VALUE(name, val)
Definition: log.hpp:136
constexpr bool has_password_flag(char v)
Definition: connect_flags.hpp:43
constexpr qos will_qos(char v)
Definition: connect_flags.hpp:55
constexpr retain has_will_retain(char v)
Definition: connect_flags.hpp:37
constexpr bool has_user_name_flag(char v)
Definition: connect_flags.hpp:47
constexpr bool has_clean_start(char v)
Definition: connect_flags.hpp:29
constexpr char const clean_start
Definition: connect_flags.hpp:19
constexpr bool has_will_flag(char v)
Definition: connect_flags.hpp:33
constexpr char const clean_session
Definition: connect_flags.hpp:18
constexpr bool should_generate_packet_id(Params const &... params)
Definition: endpoint.hpp:151
constexpr std::enable_if_t< ! std::is_convertible< std::decay_t< T >, publish_options >::value, bool > check_qos_value(T const &)
Definition: endpoint.hpp:142
constexpr qos get_qos(std::uint8_t v)
Definition: publish.hpp:26
@ well_formed
UTF-8 string is well_formed. See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3....
constexpr validation validate_contents(string_view str)
Definition: utf8encoded_strings.hpp:47
id
Definition: property_id.hpp:19
@ subscription_identifier
@ request_problem_information
@ message_expiry_interval
@ shared_subscription_available
@ request_response_information
@ session_expiry_interval
@ assigned_client_identifier
@ subscription_identifier_available
@ payload_format_indicator
@ wildcard_subscription_available
connect_reason_code
Definition: reason_code.hpp:50
unsuback_reason_code
Definition: reason_code.hpp:237
auth_reason_code
Definition: reason_code.hpp:385
suback_reason_code
Definition: reason_code.hpp:191
pubrel_reason_code
Definition: reason_code.hpp:341
puback_reason_code
Definition: reason_code.hpp:269
pubrec_reason_code
Definition: reason_code.hpp:305
pubcomp_reason_code
Definition: reason_code.hpp:363
std::vector< property_variant > properties
Definition: property_variant.hpp:51
disconnect_reason_code
Definition: reason_code.hpp:114
variant< property::payload_format_indicator, property::message_expiry_interval, property::content_type, property::response_topic, property::correlation_data, property::subscription_identifier, property::session_expiry_interval, property::assigned_client_identifier, property::server_keep_alive, property::authentication_method, property::authentication_data, property::request_problem_information, property::will_delay_interval, property::request_response_information, property::response_information, property::server_reference, property::reason_string, property::receive_maximum, property::topic_alias_maximum, property::topic_alias, property::maximum_qos, property::retain_available, property::user_property, property::maximum_packet_size, property::wildcard_subscription_available, property::subscription_identifier_available, property::shared_subscription_available > property_variant
Definition: property_variant.hpp:49
std::string remaining_bytes(std::size_t size)
Definition: remaining_length.hpp:17
boost::string_ref string_view
Definition: string_view.hpp:64
optional< share_name_topic_filter > parse_shared_subscription(buffer whole_topic_filter)
Definition: shared_subscriptions.hpp:51
constexpr decltype(auto) visit(Visitor &&vis, Variants &&... vars)
Definition: variant.hpp:60
variant< v3_1_1::basic_publish_message< PacketIdBytes >, v3_1_1::basic_pubrel_message< PacketIdBytes >, v5::basic_publish_message< PacketIdBytes >, v5::basic_pubrel_message< PacketIdBytes > > basic_store_message_variant
Definition: message_variant.hpp:117
constexpr std::tuple< std::size_t, std::size_t > remaining_length(string_view bytes)
Definition: remaining_length.hpp:24
suback_return_code
Definition: reason_code.hpp:18
control_packet_type
Definition: control_packet_type.hpp:18
constexpr control_packet_type get_control_packet_type(std::uint8_t v)
Definition: control_packet_type.hpp:39
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
std::uint16_t topic_alias_t
Definition: type.hpp:17
buffer allocate_buffer(Iterator b, Iterator e)
create buffer from the pair of iterators It copies string that from b to e into shared_ptr_array....
Definition: buffer.hpp:130
decltype(auto) variant_get(U &&arg)
Definition: variant.hpp:48
std::size_t num_of_const_buffer_sequence(basic_message_variant< PacketIdBytes > const &mv)
Definition: message_variant.hpp:98
auto shared_scope_guard(Proc &&proc)
Definition: shared_scope_guard.hpp:17
std::vector< as::const_buffer > const_buffer_sequence(basic_message_variant< PacketIdBytes > const &mv)
Definition: message_variant.hpp:87
char const * get_pointer(as::const_buffer const &cb)
Definition: const_buffer_util.hpp:17
connect_return_code
Definition: connect_return_code.hpp:17
buffer const * buffer_sequence_end(buffer const &buf)
Definition: buffer.hpp:155
std::string continuous_buffer(basic_message_variant< PacketIdBytes > const &mv)
Definition: message_variant.hpp:104
buffer const * buffer_sequence_begin(buffer const &buf)
Definition: buffer.hpp:151
lambda_visitor< Lambdas... > make_lambda_visitor(Lambdas &&... lambdas)
Definition: visitor_util.hpp:37
std::size_t size(basic_message_variant< PacketIdBytes > const &mv)
Definition: message_variant.hpp:93
protocol_version
Definition: protocol_version.hpp:17
qos
Definition: subscribe_options.hpp:34
constexpr bool is_session_present(char v)
Definition: session_present.hpp:14
constexpr std::uint16_t make_uint16_t(It b, It e)
Definition: two_byte_util.hpp:34
decltype(auto) variant_idx(T const &arg)
Definition: variant.hpp:54
optional< control_packet_type > get_control_packet_type_with_check(std::uint8_t v)
Definition: control_packet_type.hpp:90
std::size_t get_size(as::const_buffer const &cb)
Definition: const_buffer_util.hpp:21
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
std::shared_ptr< char[]> shared_ptr_array
Type alias of shared_ptr char array. You can choose the target type.
Definition: shared_ptr_array.hpp:20
shared_ptr_array make_shared_ptr_array(std::size_t size)
shared_ptr_array creating function. You can choose the target type.
Definition: buffer.hpp:231
Definition: exception.hpp:103
Definition: exception.hpp:21
Definition: publish.hpp:53
constexpr qos get_qos() const
Definition: publish.hpp:84
constexpr retain get_retain() const
Definition: publish.hpp:80
constexpr dup get_dup() const
Definition: publish.hpp:82
Definition: subscribe_entry.hpp:18
Definition: subscribe_options.hpp:40
constexpr rap get_rap() const
Definition: subscribe_options.hpp:69
constexpr nl get_nl() const
Definition: subscribe_options.hpp:71
constexpr retain_handling get_retain_handling() const
Definition: subscribe_options.hpp:67
constexpr qos get_qos() const
Definition: subscribe_options.hpp:73
Definition: subscribe_entry.hpp:46
Definition: message.hpp:189
packet_id_type< PacketIdBytes >::type packet_id() const
Get packet id.
Definition: message.hpp:205
Definition: v5_message.hpp:1422
decltype(auto) packet_id() const
Get packet id.
Definition: v5_message.hpp:1711