7#if !defined(ASYNC_MQTT_STREAM_HPP)
8#define ASYNC_MQTT_STREAM_HPP
15#include <boost/system/error_code.hpp>
16#include <boost/asio/strand.hpp>
17#include <boost/asio/io_context.hpp>
18#include <boost/asio/compose.hpp>
19#include <boost/asio/bind_executor.hpp>
20#include <boost/asio/append.hpp>
21#include <boost/asio/consign.hpp>
23#if defined(ASYNC_MQTT_USE_WS)
24#include <boost/beast/websocket/stream.hpp>
27#include <async_mqtt/stream_traits.hpp>
28#include <async_mqtt/util/make_shared_helper.hpp>
29#include <async_mqtt/util/optional.hpp>
30#include <async_mqtt/util/static_vector.hpp>
31#include <async_mqtt/util/ioc_queue.hpp>
32#include <async_mqtt/buffer.hpp>
33#include <async_mqtt/constant.hpp>
34#include <async_mqtt/is_strand.hpp>
35#include <async_mqtt/exception.hpp>
36#include <async_mqtt/tls.hpp>
37#include <async_mqtt/log.hpp>
41namespace as = boost::asio;
42namespace sys = boost::system;
// Type trait answering "is Stream a websocket stream?".
// Primary template: the answer is false for every type that has no
// dedicated specialization.
template <typename Stream>
struct is_ws : public std::false_type {};
47#if defined(ASYNC_MQTT_USE_WS)
48namespace bs = boost::beast;
50template <
typename NextLayer>
51struct is_ws<bs::websocket::stream<NextLayer>> :
public std::true_type {};
// Fragment of a function template that writes a buffer sequence to a
// boost::beast websocket stream. The overload is constrained (enable_if_t)
// to types satisfying as::is_const_buffer_sequence, and it simply forwards
// the buffers and the completion token to stream.async_write().
// NOTE(review): the template header, the function name and the braces are
// not visible in this listing - confirm against the full file.
55 typename ConstBufferSequence,
56 typename CompletionToken,
57 typename std::enable_if_t<
58 as::is_const_buffer_sequence<ConstBufferSequence>::value
63 bs::websocket::stream<NextLayer>& stream,
64 ConstBufferSequence
const& cbs,
65 CompletionToken&& token
67 return stream.async_write(cbs, std::forward<CompletionToken>(token));
// Type trait answering "is Stream a TLS stream?".
// Primary template: the answer is false for every type that has no
// dedicated specialization.
template <typename Stream>
struct is_tls : public std::false_type {};
75#if defined(ASYNC_MQTT_USE_TLS)
76template <
typename NextLayer>
77struct is_tls<tls::stream<NextLayer>> :
public std::true_type {};
// Stream class template layered on top of NextLayer.
// - NextLayer: the wrapped transport type.
// - Strand: template used to build the strand type (defaults to as::strand).
// The class derives from std::enable_shared_from_this, so its operations can
// obtain a shared_ptr to the object via shared_from_this().
80template <
typename NextLayer,
template <
typename>
typename Strand = as::strand>
81class stream :
public std::enable_shared_from_this<stream<NextLayer, Strand>> {
// Shorthand for this instantiation and a shared_ptr to it.
83 using this_type = stream<NextLayer, Strand>;
84 using this_type_sp = std::shared_ptr<this_type>;
// NextLayer with any reference qualifier removed.
85 using next_layer_type =
typename std::remove_reference<NextLayer>::type;
// as::strand over executor_type.
// NOTE(review): the executor_type alias is not visible in this listing.
87 using raw_strand_type = as::strand<executor_type>;
// The Strand template applied to the type-erased as::any_io_executor.
88 using strand_type = Strand<as::any_io_executor>;
// make_shared_helper is granted access to non-public members.
91 friend class make_shared_helper;
// Factory: returns a std::shared_ptr<this_type> built by forwarding every
// argument to make_shared_helper<this_type>::make_shared().
// The enable_if_t constraint removes this overload when the first argument
// decays to this_type itself.
96 std::enable_if_t<!std::is_same_v<std::decay_t<T>, this_type>>* =
nullptr
98 static std::shared_ptr<this_type> create(T&& t, Args&&... args) {
99 return make_shared_helper<this_type>::make_shared(std::forward<T>(t), std::forward<Args>(args)...);
// Trace-level log entry on the "mqtt_impl" channel, tagged with this
// object's address.
// NOTE(review): the enclosing function and the rest of the log expression
// are not visible in this listing - presumably the destructor; confirm.
103 ASYNC_MQTT_LOG(
"mqtt_impl", trace)
104 << ASYNC_MQTT_ADD_VALUE(address,
this)
// Copy and move are disabled: move construction, copy construction,
// move assignment and copy assignment are all deleted.
108 stream(this_type&&) =
delete;
109 stream(this_type
const&) =
delete;
110 this_type& operator=(this_type&&) =
delete;
111 this_type& operator=(this_type
const&) =
delete;
// Const accessor returning a const reference (auto const&).
// NOTE(review): the body is not visible in this listing - presumably it
// returns the wrapped layer nl_; confirm.
113 auto const& next_layer()
const {
// Const overload: returns get_lowest_layer(nl_) by const reference.
120 auto const& lowest_layer()
const {
121 return get_lowest_layer(nl_);
// Non-const overload: returns get_lowest_layer(nl_) by reference.
123 auto& lowest_layer() {
124 return get_lowest_layer(nl_);
// Const overload: returns the executor reported by the wrapped layer nl_.
127 auto get_executor()
const {
128 return nl_.get_executor();
// Non-const overload: returns the executor reported by the wrapped layer nl_.
130 auto get_executor() {
131 return nl_.get_executor();
// Fragment of an asynchronous initiating function taking a completion token.
// Completion signature: void(error_code const&, buffer).
// NOTE(review): the function name and body are not visible in this listing -
// presumably the packet-read entry point backed by read_packet_impl; confirm.
134 template <
typename CompletionToken>
137 CompletionToken&& token
142 void(error_code
const&, buffer)
// Fragment of an asynchronous initiating function that writes a Packet.
// Completion signature: void(error_code const&, std::size_t).
// The packet is moved (force_move) into a std::shared_ptr<Packet> that is
// handed to write_packet_impl<Packet>.
// NOTE(review): the function name and the remaining arguments are not
// visible in this listing.
151 template <
typename Packet,
typename CompletionToken>
155 CompletionToken&& token
160 void(error_code
const&, std::size_t)
162 write_packet_impl<Packet>{
164 std::make_shared<Packet>(force_move(packet))
// Const and non-const accessors returning a reference to a strand_type.
// NOTE(review): bodies are not visible in this listing - presumably they
// return the strand_ member; confirm.
170 strand_type
const& strand()
const {
174 strand_type& strand() {
// Const and non-const accessors returning a reference to a raw_strand_type.
// NOTE(review): bodies are not visible in this listing - presumably they
// return the raw_strand_ member; confirm.
178 raw_strand_type
const& raw_strand()
const {
182 raw_strand_type& raw_strand() {
// Returns true when the calling thread is currently running inside the
// raw strand (raw_strand().running_in_this_thread()).
186 bool in_strand()
const {
187 return raw_strand().running_in_this_thread();
// Asynchronous close taking a completion token.
// Completion signature: void(error_code const&).
// NOTE(review): the return type and the rest of the body are not visible
// in this listing.
190 template<
typename CompletionToken>
192 close(CompletionToken&& token) {
196 void(error_code
const&)
// Setter taking a bool.
// NOTE(review): the body is not visible in this listing - presumably it
// stores val into bulk_write_; confirm.
205 void set_bulk_write(
bool val) {
// Constructor: forwards all arguments to the wrapped layer nl_.
// The enable_if_t constraint removes this overload when the first argument
// decays to this_type itself.
215 std::enable_if_t<!std::is_same_v<std::decay_t<T>, this_type>>* =
nullptr
218 stream(T&& t, Args&&... args)
219 :nl_{std::forward<T>(t), std::forward<Args>(args)...}
// Websocket builds only: when the wrapped layer is a websocket stream, a
// decorator sets the "Sec-WebSocket-Protocol" field of the request to "mqtt".
221#if defined(ASYNC_MQTT_USE_WS)
222 if constexpr(is_ws<next_layer_type>::value) {
225 bs::websocket::stream_base::decorator(
226 [](bs::websocket::request_type& req) {
227 req.set(
"Sec-WebSocket-Protocol",
"mqtt");
// Operation state for reading one packet.
// The header bytes are read one at a time into
// strm.header_remaining_length_buf_, the length field is decoded from them,
// and then the remaining rl bytes are read into a freshly allocated array.
// The result is handed to the completion handler as a buffer.
// NOTE(review): many lines of this struct (including the strm member and the
// switch skeleton) are not visible in this listing.
235 struct read_packet_impl {
// Number of header bytes stored in header_remaining_length_buf_ so far.
237 std::size_t received = 0;
// Multiplier applied to the next length byte while decoding.
238 std::uint32_t mul = 1;
// Decoded length accumulated so far.
239 std::uint32_t rl = 0;
// Storage for the whole packet; allocated once the length is known.
240 shared_ptr_array spa =
nullptr;
// Holds a shared_ptr to the stream for as long as this state object lives.
241 this_type_sp life_keeper = strm.shared_from_this();
242 enum { dispatch, header, remaining_length, complete } state = dispatch;
// First call operator (no arguments visible): starts a 1-byte read into the
// next free slot of the header buffer.
244 template <
typename Self>
260 BOOST_ASSERT(strm.in_strand());
262 auto address = &strm.header_remaining_length_buf_[received];
266 as::buffer(address, 1),
// Continuation call operator: invoked with the result of each read.
279 template <
typename Self>
282 error_code
const& ec,
283 std::size_t bytes_transferred
285 (void)bytes_transferred;
287 BOOST_ASSERT(strm.in_strand());
// Completes with an empty buffer.
// NOTE(review): guarding condition not visible - presumably the error path.
289 self.complete(ec, buffer{});
// One byte was read; move on to decoding the length field and read the
// next byte.
295 BOOST_ASSERT(bytes_transferred == 1);
296 state = remaining_length;
300 auto address = &strm.header_remaining_length_buf_[received];
304 as::buffer(address, 1),
312 case remaining_length:
313 BOOST_ASSERT(bytes_transferred == 1);
// Top bit of the byte just read is set: more length bytes follow.
315 if (strm.header_remaining_length_buf_[received - 1] & 0b10000000) {
// Length field too long: log a warning and report protocol_error.
// NOTE(review): the condition guarding this path is not visible here.
318 ASYNC_MQTT_LOG(
"mqtt_impl", warning)
319 << ASYNC_MQTT_ADD_VALUE(address,
this)
320 <<
"out of size remaining length";
322 sys::errc::make_error_code(sys::errc::protocol_error),
// Add the low 7 bits of this byte, scaled by mul, then read one more byte.
327 rl += (strm.header_remaining_length_buf_[received - 1] & 0b01111111) * mul;
329 auto address = &strm.header_remaining_length_buf_[received];
333 as::buffer(address, 1),
// Last length byte: add its low 7 bits, then allocate room for
// header + payload and copy the header bytes already received into it.
342 rl += (strm.header_remaining_length_buf_[received - 1] & 0b01111111) * mul;
344 spa = make_shared_ptr_array(received + rl);
346 strm.header_remaining_length_buf_.data(),
347 strm.header_remaining_length_buf_.data() + received, spa.get()
// Complete with the whole packet; ownership of the array moves into buffer.
// NOTE(review): guarding condition not visible - presumably the rl == 0 case.
351 auto ptr = spa.get();
352 self.complete(ec, buffer{ptr, ptr + received + rl, force_move(spa)});
// Read the rl payload bytes directly after the header bytes in spa.
357 auto address = &spa[std::ptrdiff_t(received)];
361 as::buffer(address, rl),
// Payload read finished: complete with the whole packet.
371 auto ptr = spa.get();
372 self.complete(ec, buffer{ptr, ptr + received + rl, force_move(spa)});
// Operation state for writing one Packet.
// A packet is either written on its own from its const buffer sequence, or
// its buffers are appended to strm.storing_cbs_ and later moved to
// strm.sending_cbs_ as a batch.
// NOTE(review): many lines of this struct (including the strm member, the
// switch skeleton and the actual write calls) are not visible in this listing.
381 template <
typename Packet>
382 struct write_packet_impl {
384 std::shared_ptr<Packet> packet;
// Initialized from packet->size(), so packet must be declared before size.
385 std::size_t size = packet->size();
// Holds a shared_ptr to the stream for as long as this state object lives.
386 this_type_sp life_keeper = strm.shared_from_this();
387 enum { dispatch, post, write, bulk_write, complete } state = dispatch;
// First call operator (no arguments visible).
389 template <
typename Self>
405 BOOST_ASSERT(strm.in_strand());
407 auto& a_packet{*packet};
// Branch taken when bulk write is off or the queue reports
// immediate_executable().
408 if (!a_strm.bulk_write_ || a_strm.queue_.immediate_executable()) {
// Append this packet's buffers to the stream's pending list storing_cbs_.
413 auto cbs = a_packet.const_buffer_sequence();
414 std::copy(cbs.begin(), cbs.end(), std::back_inserter(a_strm.storing_cbs_));
// Write of this packet's own buffers: mark work started on the queue, then
// write only if the lowest layer is open; otherwise connection_reset is
// reported.
424 BOOST_ASSERT(strm.in_strand());
425 strm.queue_.start_work();
426 if (strm.lowest_layer().is_open()) {
429 auto& a_packet{*packet};
432 a_packet.const_buffer_sequence(),
447 errc::make_error_code(errc::connection_reset),
// Write of the accumulated buffers: mark work started on the queue; if the
// lowest layer is open and nothing is pending, success is reported,
// otherwise the pending buffers are moved to sending_cbs_. When the lowest
// layer is not open, connection_reset is reported.
455 BOOST_ASSERT(strm.in_strand());
456 strm.queue_.start_work();
457 if (strm.lowest_layer().is_open()) {
460 if (a_strm.storing_cbs_.empty()) {
468 errc::make_error_code(errc::success),
475 a_strm.sending_cbs_ = force_move(a_strm.storing_cbs_);
494 errc::make_error_code(errc::connection_reset),
// Continuation call operator: invoked with the result of a write.
507 template <
typename Self>
510 error_code
const& ec,
511 std::size_t bytes_transferred
513 BOOST_ASSERT(strm.in_strand());
// Mark work stopped on the queue, then run a handler that polls one queued
// item if the stream is still alive (checked through a weak_ptr).
515 strm.queue_.stop_work();
520 [&a_strm,wp = a_strm.weak_from_this()] {
521 if (auto sp = wp.lock()) {
522 a_strm.queue_.poll_one();
// Report the byte count delivered by the write.
527 self.complete(ec, bytes_transferred);
// Other completion path: additionally drops the buffers that were sent and
// reports this packet's own size rather than the byte count of the write.
532 strm.queue_.stop_work();
533 strm.sending_cbs_.clear();
538 [&a_strm, wp = a_strm.weak_from_this()] {
539 if (auto sp = wp.lock()) {
540 a_strm.queue_.poll_one();
545 self.complete(ec, size);
// Operation state for closing the stream, processed layer by layer:
// websocket layers are closed with async_close, TLS layers with
// async_shutdown, and the remaining layer with close(). Each step then
// continues with stream.get().next_layer().
// NOTE(review): the struct header, its other members and most of the control
// flow are not visible in this listing - confirm against the full file.
// Holds a shared_ptr to the stream for as long as this state object lives.
562 this_type_sp life_keeper = strm.shared_from_this();
// First call operator: must be entered in the dispatch state.
564 template <
typename Self>
568 BOOST_ASSERT(state == dispatch);
// Websocket builds only: continuation taking the error code and the layer
// currently being processed.
583#if defined(ASYNC_MQTT_USE_WS)
584 template <
typename Self,
typename Stream>
587 error_code
const& ec,
589 std::reference_wrapper<Stream> stream
591 BOOST_ASSERT(strm.in_strand());
592 if constexpr(is_ws<Stream>::value) {
593 BOOST_ASSERT(state == complete);
// websocket::error::closed is logged as a successful read-for-close;
// any other result is logged with its message.
595 if (ec == bs::websocket::error::closed) {
596 ASYNC_MQTT_LOG(
"mqtt_impl", info)
597 << ASYNC_MQTT_ADD_VALUE(address,
this)
598 <<
"ws async_read (for close) success";
601 ASYNC_MQTT_LOG(
"mqtt_impl", info)
602 << ASYNC_MQTT_ADD_VALUE(address,
this)
603 <<
"ws async_read (for close):" << ec.message();
// Continue with the layer underneath.
613 std::ref(stream.get().next_layer())
// Issue another websocket read into a shared flat_buffer.
620 auto buffer = std::make_shared<bs::flat_buffer>();
621 stream.get().async_read(
// Continuation taking the error code and the layer currently being processed.
639 template <
typename Self,
typename Stream>
642 error_code
const& ec,
643 std::reference_wrapper<Stream> stream
645 BOOST_ASSERT(strm.in_strand());
// Websocket layer: if it is open, start async_close with close_code::none;
// the layer underneath is passed along for the next step.
648#if defined(ASYNC_MQTT_USE_WS)
649 if constexpr(is_ws<Stream>::value) {
650 if (stream.get().is_open()) {
653 stream.get().async_close(
654 bs::websocket::close_code::none,
673 std::ref(stream.get().next_layer())
// TLS layer: start async_shutdown together with a steady_timer created on
// raw_strand_ with shutdown_timeout; the timer handler logs the timeout
// and closes the layer underneath.
681 if constexpr(is_tls<Stream>::value) {
683 ASYNC_MQTT_LOG(
"mqtt_impl", info)
684 << ASYNC_MQTT_ADD_VALUE(address,
this)
685 <<
"TLS async_shutdown start with timeout";
686 auto tim = std::make_shared<as::steady_timer>(a_strm.raw_strand_, shutdown_timeout);
690 [
this, &next_layer = stream.get().next_layer()] (error_code
const& ec) {
692 ASYNC_MQTT_LOG(
"mqtt_impl", info)
693 << ASYNC_MQTT_ADD_VALUE(address, this)
694 <<
"TLS async_shutdown timeout";
696 next_layer.close(ec);
701 stream.get().async_shutdown(
709 std::ref(stream.get().next_layer())
// Remaining layer: close it if it is open, otherwise only log that it was
// already closed.
716 if (stream.get().is_open()) {
717 ASYNC_MQTT_LOG(
"mqtt_impl", info)
718 << ASYNC_MQTT_ADD_VALUE(address,
this)
720 stream.get().close(ec);
723 ASYNC_MQTT_LOG(
"mqtt_impl", info)
724 << ASYNC_MQTT_ADD_VALUE(address,
this)
725 <<
"TCP already closed";
// Drop any buffers still waiting to be written or in flight.
727 strm.storing_cbs_.clear();
728 strm.sending_cbs_.clear();
// Websocket layer: log the async_close result, pass the layer underneath
// along, and issue a websocket read into a shared flat_buffer.
733#if defined(ASYNC_MQTT_USE_WS)
734 if constexpr(is_ws<Stream>::value) {
736 ASYNC_MQTT_LOG(
"mqtt_impl", info)
737 << ASYNC_MQTT_ADD_VALUE(address,
this)
738 <<
"ws async_close:" << ec.message();
747 std::ref(stream.get().next_layer())
755 auto buffer = std::make_shared<bs::flat_buffer>();
756 stream.get().async_read(
// Data members (other members such as nl_ and queue_ are not visible in
// this listing).
// Strand built on the wrapped layer's executor.
783 raw_strand_type raw_strand_{nl_.get_executor()};
// strand_type built from raw_strand_ wrapped in as::any_io_executor.
784 strand_type strand_{as::any_io_executor{raw_strand_}};
// Five-byte scratch buffer into which read_packet_impl reads the header
// bytes one at a time.
786 static_vector<char, 5> header_remaining_length_buf_ = static_vector<char, 5>(5);
// Buffers collected by write_packet_impl and not yet handed to a write.
787 std::vector<as::const_buffer> storing_cbs_;
// Buffers moved out of storing_cbs_ for a write; cleared afterwards.
788 std::vector<as::const_buffer> sending_cbs_;
// Bulk-write switch, off by default; consulted by write_packet_impl.
789 bool bulk_write_ =
false;
Definition packet_variant.hpp:49