7#if !defined(ASYNC_MQTT_STREAM_HPP)
8#define ASYNC_MQTT_STREAM_HPP
15#include <boost/system/error_code.hpp>
16#include <boost/asio/strand.hpp>
17#include <boost/asio/io_context.hpp>
18#include <boost/asio/compose.hpp>
19#include <boost/asio/bind_executor.hpp>
20#include <boost/asio/append.hpp>
21#include <boost/asio/consign.hpp>
23#if defined(ASYNC_MQTT_USE_WS)
24#include <boost/beast/websocket/stream.hpp>
27#include <async_mqtt/core/stream_traits.hpp>
28#include <async_mqtt/util/make_shared_helper.hpp>
29#include <async_mqtt/util/optional.hpp>
30#include <async_mqtt/util/static_vector.hpp>
31#include <async_mqtt/util/ioc_queue.hpp>
32#include <async_mqtt/buffer.hpp>
33#include <async_mqtt/constant.hpp>
34#include <async_mqtt/is_strand.hpp>
35#include <async_mqtt/ws_fixed_size_async_read.hpp>
36#include <async_mqtt/exception.hpp>
37#include <async_mqtt/tls.hpp>
38#include <async_mqtt/log.hpp>
42namespace as = boost::asio;
43namespace sys = boost::system;
// Type trait, primary template: is_ws<Stream> derives from std::false_type,
// so is_ws<Stream>::value is false for every Stream that no specialization
// matches.
45template <
typename Stream>
46struct is_ws :
public std::false_type {};
48#if defined(ASYNC_MQTT_USE_WS)
// Specialization: is_ws<...>::value is true when the stream type is
// bs::websocket::stream<NextLayer>, for any NextLayer.
// It follows the `#if defined(ASYNC_MQTT_USE_WS)` directive, so it appears to
// exist only in WebSocket-enabled builds.
// NOTE(review): the `bs` namespace alias is not declared in the visible part
// of this file -- presumably boost::beast; confirm.
49template <
typename NextLayer>
50struct is_ws<bs::websocket::stream<NextLayer>> :
public std::true_type {};
// Type trait, primary template: is_tls<Stream> derives from std::false_type,
// so is_tls<Stream>::value is false for every Stream that no specialization
// matches.
53template <
typename Stream>
54struct is_tls :
public std::false_type {};
56#if defined(ASYNC_MQTT_USE_TLS)
// Specialization: is_tls<...>::value is true when the stream type is
// tls::stream<NextLayer>, for any NextLayer.
// It follows the `#if defined(ASYNC_MQTT_USE_TLS)` directive, so it appears to
// exist only in TLS-enabled builds.
// NOTE(review): the `tls` namespace is not declared in the visible part of
// this file -- presumably provided by <async_mqtt/tls.hpp>; confirm.
57template <
typename NextLayer>
58struct is_tls<tls::stream<NextLayer>> :
public std::true_type {};
61template <
typename NextLayer>
62class stream :
public std::enable_shared_from_this<stream<NextLayer>> {
64 using this_type = stream<NextLayer>;
65 using this_type_sp = std::shared_ptr<this_type>;
66 using next_layer_type =
typename std::remove_reference<NextLayer>::type;
68 using raw_strand_type = as::strand<executor_type>;
69 using strand_type = as::strand<as::any_io_executor>;
72 friend class make_shared_helper;
77 std::enable_if_t<!std::is_same_v<std::decay_t<T>, this_type>>* =
nullptr
// Factory: builds a stream and returns it as std::shared_ptr<this_type>.
// All arguments are perfectly forwarded to
// make_shared_helper<this_type>::make_shared.
// NOTE(review): presumably this is the intended way to construct a stream,
// since the class derives from enable_shared_from_this and befriends
// make_shared_helper -- confirm the constructor's access level.
79 static std::shared_ptr<this_type> create(T&& t, Args&&... args) {
80 return make_shared_helper<this_type>::make_shared(std::forward<T>(t), std::forward<Args>(args)...);
84 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
85 << ASYNC_MQTT_ADD_VALUE(address,
this)
// Copy and move construction and assignment are all deleted:
// a stream object can be neither copied nor moved.
89 stream(this_type&&) =
delete;
90 stream(this_type
const&) =
delete;
91 this_type& operator=(this_type&&) =
delete;
92 this_type& operator=(this_type
const&) =
delete;
94 auto const& next_layer()
const {
// Const accessor: returns the result of get_lowest_layer(nl_) by const
// reference.
// NOTE(review): get_lowest_layer is not defined in the visible part of this
// file -- presumably from <async_mqtt/core/stream_traits.hpp>; confirm.
101 auto const& lowest_layer()
const {
102 return get_lowest_layer(nl_);
// Non-const accessor: same lookup, returning a mutable reference.
104 auto& lowest_layer() {
105 return get_lowest_layer(nl_);
// Returns the executor of the next layer (nl_.get_executor()), by value.
// Const overload.
108 auto get_executor()
const {
109 return nl_.get_executor();
// Non-const overload: identical delegation to nl_.get_executor().
111 auto get_executor() {
112 return nl_.get_executor();
115 template <
typename CompletionToken>
118 CompletionToken&& token
123 void(error_code
const&, buffer)
132 template <
typename Packet,
typename CompletionToken>
136 CompletionToken&& token
141 void(error_code
const&, std::size_t)
143 write_packet_impl<Packet>{
145 std::make_shared<Packet>(force_move(packet))
151 strand_type
const& strand()
const {
155 strand_type& strand() {
159 raw_strand_type
const& raw_strand()
const {
163 raw_strand_type& raw_strand() {
// Returns true when the calling thread is currently executing inside the
// raw strand; delegates to raw_strand().running_in_this_thread().
// The read/write operations below use it in BOOST_ASSERT checks.
167 bool in_strand()
const {
168 return raw_strand().running_in_this_thread();
171 template<
typename CompletionToken>
173 close(CompletionToken&& token) {
177 void(error_code
const&)
192 std::enable_if_t<!std::is_same_v<std::decay_t<T>, this_type>>* =
nullptr
// Constructor: perfectly forwards every argument to the next layer member
// nl_.
195 stream(T&& t, Args&&... args)
196 :nl_{std::forward<T>(t), std::forward<Args>(args)...}
// Compile-time branch taken only when the next layer is a WebSocket stream
// (is_ws): a handshake-request decorator is built that sets the
// "Sec-WebSocket-Protocol" header to "mqtt".
// NOTE(review): the call that installs this decorator on nl_ is not visible
// here -- presumably nl_.set_option(...); confirm.
198 if constexpr(is_ws<next_layer_type>::value) {
201 bs::websocket::stream_base::decorator(
202 [](bs::websocket::request_type& req) {
203 req.set(
"Sec-WebSocket-Protocol",
"mqtt");
// Operation state for reading one packet from the stream.
// From the visible code: single bytes are read into
// strm.header_remaining_length_buf_, a remaining length `rl` is accumulated
// from the low 7 bits of those bytes, then an array of `received + rl` bytes
// is allocated, the already-read prefix is copied into it, and the result is
// passed to self.complete() as a `buffer`.
// NOTE(review): many lines of this struct (the `strm` member, the call
// operators' signatures, the switch/else structure, the updates of `received`
// and `mul`) are not visible here; the comments describe only what is shown.
210 struct read_packet_impl {
// Index of the next byte to store in header_remaining_length_buf_;
// `received - 1` is used below to address the byte just read.
212 std::size_t received = 0;
// Multiplier applied to each 7-bit group when accumulating `rl`.
213 std::uint32_t mul = 1;
// Accumulated remaining length.
214 std::uint32_t rl = 0;
// Storage for the packet bytes; allocated once `rl` is known.
215 shared_ptr_array spa =
nullptr;
// shared_ptr to the stream, held for as long as this operation object exists.
216 this_type_sp life_keeper = strm.shared_from_this();
// Progress marker of the operation; starts at `dispatch`.
217 enum { dispatch, header, remaining_length, complete } state = dispatch;
// First call operator (no error_code / byte-count parameters visible).
219 template <
typename Self>
235 BOOST_ASSERT(strm.in_strand());
// Destination for a one-byte read: slot `received` of the header buffer.
237 auto address = &strm.header_remaining_length_buf_[received];
241 as::buffer(address, 1),
// Completion-side call operator: receives the result of the previous read.
254 template <
typename Self>
257 error_code
const& ec,
258 std::size_t bytes_transferred
260 BOOST_ASSERT(strm.in_strand());
// Completes with `ec` and an empty buffer.
// NOTE(review): presumably guarded by `if (ec)` on a line not visible here.
262 self.complete(ec, buffer{});
// Every read in the header / remaining-length phase is exactly one byte.
268 BOOST_ASSERT(bytes_transferred == 1);
269 state = remaining_length;
273 auto address = &strm.header_remaining_length_buf_[received];
277 as::buffer(address, 1),
285 case remaining_length:
286 BOOST_ASSERT(bytes_transferred == 1);
// Bit 7 of the byte just read is tested (set = another length byte follows,
// per the MQTT variable-length encoding -- TODO confirm against the spec).
288 if (strm.header_remaining_length_buf_[received - 1] & 0b10000000) {
// Warning path: the length field is longer than accepted; a protocol_error
// error code is produced.
// NOTE(review): the condition selecting this path is not visible --
// presumably the header buffer being full; confirm.
291 ASYNC_MQTT_LOG(
"mqtt_impl", warning)
292 << ASYNC_MQTT_ADD_VALUE(address,
this)
293 <<
"out of size remaining length";
295 sys::errc::make_error_code(sys::errc::protocol_error),
// Add the low 7 bits of the byte just read, scaled by `mul`, then read the
// next length byte.
300 rl += (strm.header_remaining_length_buf_[received - 1] & 0b01111111) * mul;
302 auto address = &strm.header_remaining_length_buf_[received];
306 as::buffer(address, 1),
// Same accumulation for what appears to be the final length byte.
315 rl += (strm.header_remaining_length_buf_[received - 1] & 0b01111111) * mul;
// Allocate room for the bytes read so far plus `rl` more, and copy the
// already-read prefix to the front of the new array.
317 spa = make_shared_ptr_array(received + rl);
319 strm.header_remaining_length_buf_.data(),
320 strm.header_remaining_length_buf_.data() + received, spa.get()
// Complete with a buffer spanning [ptr, ptr + received + rl); ownership of
// the array is moved into the buffer.
// NOTE(review): presumably the rl == 0 case, where no further read is
// needed -- confirm.
324 auto ptr = spa.get();
325 self.complete(ec, buffer{ptr, ptr + received + rl, force_move(spa)});
// Read the remaining `rl` bytes directly after the copied prefix.
330 auto address = &spa[std::ptrdiff_t(received)];
334 as::buffer(address, rl),
// Final completion after the `rl`-byte read.
344 auto ptr = spa.get();
345 self.complete(ec, buffer{ptr, ptr + received + rl, force_move(spa)});
// Operation state for writing one packet of type Packet to the stream.
// From the visible code: the operation marks the stream's queue_ as working,
// checks that the lowest layer is open, obtains the packet's const buffer
// sequence, and on completion stops the queue work, references
// queue_.poll_one() from a lambda, and calls
// self.complete(ec, bytes_transferred).
// NOTE(review): many lines (the `strm` member, the call operators'
// signatures, the write call itself, the switch/else structure) are not
// visible here; the comments describe only what is shown.
354 template <
typename Packet>
355 struct write_packet_impl {
// The packet being written, held by shared_ptr.
357 std::shared_ptr<Packet> packet;
// shared_ptr to the stream, held for as long as this operation object exists.
358 this_type_sp life_keeper = strm.shared_from_this();
// Progress marker of the operation; starts at `dispatch`.
359 enum { dispatch, post, write, complete } state = dispatch;
// First call operator (no error_code / byte-count parameters visible).
361 template <
typename Self>
377 BOOST_ASSERT(strm.in_strand());
388 BOOST_ASSERT(strm.in_strand());
// Mark the queue as having work in progress; paired with the stop_work()
// calls in the completion-side operator below.
389 strm.queue_.start_work();
// Only proceed with the write when the lowest layer is still open.
390 if (strm.lowest_layer().is_open()) {
393 auto cbs = packet->const_buffer_sequence();
// A connection_reset error code is produced here.
// NOTE(review): presumably the branch for a lowest layer that is not open --
// confirm.
411 errc::make_error_code(errc::connection_reset),
// Completion-side call operator: receives the result of the write.
424 template <
typename Self>
427 error_code
const& ec,
428 std::size_t bytes_transferred
430 BOOST_ASSERT(strm.in_strand());
432 strm.queue_.stop_work();
// Lambda that references the stream and captures a weak_ptr to it;
// queue_.poll_one() runs only if the weak_ptr can still be locked.
// NOTE(review): `a_strm` and the call this lambda is passed to are not
// visible here.
437 [&a_strm,wp = a_strm.weak_from_this()] {
438 if (auto sp = wp.lock()) {
439 a_strm.queue_.poll_one();
444 self.complete(ec, bytes_transferred);
// Second path with the same stop_work / poll_one / complete sequence.
// NOTE(review): the condition separating the two paths is not visible.
449 strm.queue_.stop_work();
454 [&a_strm, wp = a_strm.weak_from_this()] {
455 if (auto sp = wp.lock()) {
456 a_strm.queue_.poll_one();
461 self.complete(ec, bytes_transferred);
478 this_type_sp life_keeper = strm.shared_from_this();
480 template <
typename Self>
484 BOOST_ASSERT(state == dispatch);
499#if defined(ASYNC_MQTT_USE_WS)
500 template <
typename Self,
typename Stream>
503 error_code
const& ec,
505 std::reference_wrapper<Stream> stream
507 BOOST_ASSERT(strm.in_strand());
508 if constexpr(is_ws<Stream>::value) {
509 BOOST_ASSERT(state == complete);
511 if (ec == bs::websocket::error::closed) {
512 ASYNC_MQTT_LOG(
"mqtt_impl", info)
513 << ASYNC_MQTT_ADD_VALUE(address,
this)
514 <<
"ws async_read (for close) success";
517 ASYNC_MQTT_LOG(
"mqtt_impl", info)
518 << ASYNC_MQTT_ADD_VALUE(address,
this)
519 <<
"ws async_read (for close):" << ec.message();
529 std::ref(stream.get().next_layer())
536 auto buffer = std::make_shared<bs::flat_buffer>();
537 stream.get().async_read(
555 template <
typename Self,
typename Stream>
558 error_code
const& ec,
559 std::reference_wrapper<Stream> stream
561 BOOST_ASSERT(strm.in_strand());
564 if constexpr(is_ws<Stream>::value) {
565 if (stream.get().is_open()) {
568 stream.get().async_close(
569 bs::websocket::close_code::none,
588 std::ref(stream.get().next_layer())
594 else if constexpr(is_tls<Stream>::value) {
596 ASYNC_MQTT_LOG(
"mqtt_impl", info)
597 << ASYNC_MQTT_ADD_VALUE(address,
this)
598 <<
"TLS async_shutdown start with timeout";
599 auto tim = std::make_shared<as::steady_timer>(a_strm.raw_strand_, shutdown_timeout);
603 [
this, &next_layer = stream.get().next_layer()] (error_code
const& ec) {
605 ASYNC_MQTT_LOG(
"mqtt_impl", info)
606 << ASYNC_MQTT_ADD_VALUE(address, this)
607 <<
"TLS async_shutdown timeout";
609 next_layer.close(ec);
614 stream.get().async_shutdown(
622 std::ref(stream.get().next_layer())
629 if (stream.get().is_open()) {
630 ASYNC_MQTT_LOG(
"mqtt_impl", info)
631 << ASYNC_MQTT_ADD_VALUE(address,
this)
633 stream.get().close(ec);
636 ASYNC_MQTT_LOG(
"mqtt_impl", info)
637 << ASYNC_MQTT_ADD_VALUE(address,
this)
638 <<
"TCP already closed";
644 if constexpr(is_ws<Stream>::value) {
646 ASYNC_MQTT_LOG(
"mqtt_impl", info)
647 << ASYNC_MQTT_ADD_VALUE(address,
this)
648 <<
"ws async_close:" << ec.message();
657 std::ref(stream.get().next_layer())
665 auto buffer = std::make_shared<bs::flat_buffer>();
666 stream.get().async_read(
// Strand built on the next layer's executor; in_strand() checks against it
// through raw_strand().
690 raw_strand_type raw_strand_{nl_.get_executor()};
// The same strand wrapped in as::any_io_executor, giving the
// executor-independent strand_type.
691 strand_type strand_{as::any_io_executor{raw_strand_}};
// Scratch buffer into which read_packet_impl reads one byte at a time before
// the packet's remaining length is known.
// NOTE(review): capacity 5 is presumably 1 fixed-header byte + up to 4
// remaining-length bytes, and static_vector<char, 5>(5) presumably creates it
// with size 5 -- confirm against static_vector.hpp and the MQTT spec.
693 static_vector<char, 5> header_remaining_length_buf_ = static_vector<char, 5>(5);
Definition packet_variant.hpp:49