async_mqtt 4.1.0
Loading...
Searching...
No Matches
ws_fixed_size_async_read.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_WS_FIXED_SIZE_ASYNC_READ_HPP)
8#define ASYNC_MQTT_WS_FIXED_SIZE_ASYNC_READ_HPP
9
10#include <utility>
11#include <type_traits>
12
13#include <boost/asio/buffer.hpp>
14#include <boost/asio/read.hpp>
15#include <boost/asio/compose.hpp>
16#include <boost/system/error_code.hpp>
17#include <boost/beast/websocket/stream.hpp>
18
19namespace async_mqtt {
20
21namespace as = boost::asio;
22namespace bs = boost::beast;
23
24namespace detail {
25
26template <typename Stream, typename MutableBufferSequence>
27struct async_read_impl {
28 Stream& stream;
29 MutableBufferSequence mb;
30 std::size_t received = 0;
31 as::executor_work_guard<typename Stream::executor_type> wg{stream.get_executor()};
32
33 template <typename Self>
34 void operator()(
35 Self& self,
36 boost::system::error_code ec = boost::system::error_code{},
37 std::size_t bytes_transferred = 0
38 ) {
39 if (ec) {
40 self.complete(ec, received);
41 return;
42 }
43 received += bytes_transferred;
44 mb += bytes_transferred;
45 if (mb.size() == 0) {
46 self.complete(ec, received);
47 }
48 else {
49 auto a_mb{force_move(mb)};
50 auto exe = as::get_associated_executor(self);
51 stream.async_read_some(
52 a_mb,
53 as::bind_executor(
54 exe,
55 force_move(self)
56 )
57 );
58 }
59 }
60};
61
62} // namespace detail
63
64using as::async_read;
65
66template <
67 typename NextLayer,
68 typename MutableBufferSequence,
69 typename CompletionToken,
70 typename std::enable_if_t<
71 as::is_mutable_buffer_sequence<MutableBufferSequence>::value
72 >* = nullptr
73>
74auto
75async_read(
76 bs::websocket::stream<NextLayer>& stream,
77 MutableBufferSequence const& mb,
78 CompletionToken&& token
79) {
80 return
81 as::async_compose<
82 CompletionToken,
83 void(boost::system::error_code const&, std::size_t)
84 >(
85 detail::async_read_impl<bs::websocket::stream<NextLayer>, MutableBufferSequence>{
86 stream,
87 mb
88 },
89 token
90 );
91}
92
93template <
94 typename NextLayer,
95 typename ConstBufferSequence,
96 typename CompletionToken,
97 typename std::enable_if_t<
98 as::is_const_buffer_sequence<ConstBufferSequence>::value
99 >* = nullptr
100>
101auto
102async_write(
103 bs::websocket::stream<NextLayer>& stream,
104 ConstBufferSequence const& cbs,
105 CompletionToken&& token
106) {
107 return stream.async_write(cbs, std::forward<CompletionToken>(token));
108}
109
110} // namespace async_mqtt
111
112#endif // ASYNC_MQTT_WS_FIXED_SIZE_ASYNC_READ_HPP