async_mqtt 9.0.1
Loading...
Searching...
No Matches
customized_websocket_stream.hpp
Go to the documentation of this file.
1// Copyright Takatoshi Kondo 2024
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_PREDEFINED_LAYER_CUSTOMIZED_WEBSOCKET_STREAM_HPP)
8#define ASYNC_MQTT_PREDEFINED_LAYER_CUSTOMIZED_WEBSOCKET_STREAM_HPP
9
10#include <boost/asio.hpp>
11#include <boost/beast/websocket/stream.hpp>
12
13#include <async_mqtt/util/stream_traits.hpp>
14#include <async_mqtt/util/log.hpp>
15
17
18namespace async_mqtt {
19
20namespace as = boost::asio;
21namespace bs = boost::beast;
22
31template <typename NextLayer>
32struct layer_customize<bs::websocket::stream<NextLayer>> {
33
34 // initialize
35
36 static void initialize(bs::websocket::stream<NextLayer>& stream) {
37 stream.binary(true);
38 stream.set_option(
39 bs::websocket::stream_base::decorator(
40 [](bs::websocket::request_type& req) {
41 req.set("Sec-WebSocket-Protocol", "mqtt");
42 }
43 )
44 );
45 }
46
47 // async_write
48
49 template <
50 typename ConstBufferSequence,
51 typename CompletionToken
52 >
53 static auto
54 async_write(
55 bs::websocket::stream<NextLayer>& stream,
56 ConstBufferSequence const& cbs,
57 CompletionToken&& token
58 ) {
59 return as::async_compose<
60 CompletionToken,
61 void(error_code const& ec, std::size_t size)
62 > (
63 async_write_impl<ConstBufferSequence>{
64 stream,
65 cbs
66 },
67 token,
68 stream
69 );
70 }
71
72 template <typename ConstBufferSequence>
73 struct async_write_impl {
74 bs::websocket::stream<NextLayer>& stream;
75 ConstBufferSequence const& cbs;
76
77 template <typename Self>
78 void operator()(
79 Self& self
80 ) {
81 stream.async_write(
82 cbs,
83 force_move(self)
84 );
85 }
86
87 template <typename Self>
88 void operator()(
89 Self& self,
90 error_code const& ec,
91 std::size_t size
92 ) {
93 self.complete(ec, size);
94 }
95 };
96
97 // async_close
98
99 template <
100 typename CompletionToken
101 >
102 static auto
103 async_close(
104 bs::websocket::stream<NextLayer>& stream,
105 CompletionToken&& token
106 ) {
107 return as::async_compose<
108 CompletionToken,
109 void(error_code const& ec)
110 > (
111 async_close_impl{
112 stream
113 },
114 token,
115 stream
116 );
117 }
118
119 struct async_close_impl {
120 bs::websocket::stream<NextLayer>& stream;
121
122 template <typename Self>
123 void operator()(
124 Self& self
125 ) {
126 stream.async_close(
127 bs::websocket::close_code::none,
128 force_move(self)
129 );
130 }
131
132 template <typename Self>
133 void operator()(
134 Self& self,
135 error_code const& ec
136 ) {
137 if (ec) {
138 self.complete(ec);
139 }
140 else {
141 auto buffer = std::make_shared<bs::flat_buffer>();
142 stream.async_read(
143 *buffer,
144 as::consign(
145 force_move(self),
146 buffer
147 )
148 );
149 }
150 }
151
152 template <typename Self>
153 void operator()(
154 Self& self,
155 error_code const& ec,
156 std::size_t /* size */
157 ) {
158 if (ec) {
159 if (ec == bs::websocket::error::closed) {
160 ASYNC_MQTT_LOG("mqtt_impl", info)
161 << "ws async_read (for close) success";
162 }
163 else {
164 ASYNC_MQTT_LOG("mqtt_impl", info)
165 << "ws async_read (for close):" << ec.message();
166 }
167 self.complete(ec);
168 }
169 else {
170 auto buffer = std::make_shared<bs::flat_buffer>();
171 stream.async_read(
172 *buffer,
173 as::consign(
174 force_move(self),
175 buffer
176 )
177 );
178 }
179 }
180 };
181};
182
183} // namespace async_mqtt
184
185#endif // ASYNC_MQTT_PREDEFINED_LAYER_CUSTOMIZED_WEBSOCKET_STREAM_HPP
buffer that has string_view interface and shared ownership This class is only for advanced usecase su...
Definition buffer.hpp:46
sys::error_code error_code
sys is a namespace alias of boost::sytem.
Definition error.hpp:56
@ info
info level api call is output
customization class template for underlying layer In order to adapt your layer to async_mqtt,...
Definition stream_traits.hpp:101