7 #if !defined(MQTT_WS_ENDPOINT_HPP)
8 #define MQTT_WS_ENDPOINT_HPP
10 #include <boost/beast/websocket.hpp>
11 #include <boost/beast/core/flat_buffer.hpp>
12 #include <boost/asio/bind_executor.hpp>
25 template <
typename Socket,
typename Strand>
28 template <
typename... Args>
30 :ws_(ioc, std::forward<Args>(args)...),
34 boost::beast::websocket::stream_base::decorator(
35 [](boost::beast::websocket::request_type& req) {
36 req.set(
"Sec-WebSocket-Protocol",
"mqtt");
43 as::mutable_buffer buffers,
44 std::function<
void(
error_code, std::size_t)> handler
46 auto req_size = as::buffer_size(buffers);
48 using beast_read_handler_t =
49 std::function<void(
error_code ec, std::shared_ptr<void>)>;
51 std::shared_ptr<beast_read_handler_t> beast_read_handler;
52 if (req_size <= buffer_.size()) {
53 as::buffer_copy(buffers, buffer_.data(), req_size);
54 buffer_.consume(req_size);
55 handler(boost::system::errc::make_error_code(boost::system::errc::success), req_size);
59 beast_read_handler.reset(
60 new beast_read_handler_t(
61 [
this, req_size, buffers, handler =
force_move(handler)]
62 (
error_code ec, std::shared_ptr<void>
const& v)
mutable {
67 if (!ws_.got_binary()) {
68 buffer_.consume(buffer_.size());
70 (boost::system::errc::make_error_code(boost::system::errc::bad_message), 0);
73 if (req_size > buffer_.size()) {
74 auto beast_read_handler = std::static_pointer_cast<beast_read_handler_t>(v);
81 (*beast_read_handler)(ec, beast_read_handler);
87 as::buffer_copy(buffers, buffer_.data(), req_size);
88 buffer_.consume(req_size);
89 force_move(handler)(boost::system::errc::make_error_code(boost::system::errc::success), req_size);
99 (*beast_read_handler)(ec, beast_read_handler);
106 std::vector<as::const_buffer> buffers,
107 std::function<
void(
error_code, std::size_t)> handler
119 std::vector<as::const_buffer> buffers,
122 ws_.write(buffers, ec);
123 return as::buffer_size(buffers);
133 #if BOOST_VERSION >= 107000
136 return boost::beast::get_lowest_layer(ws_);
142 return ws_.lowest_layer();
152 ws_.close(boost::beast::websocket::close_code::normal, ec);
155 boost::beast::flat_buffer
buffer;
158 if (ec != boost::beast::websocket::error::closed)
return;
159 ec = boost::system::errc::make_error_code(boost::system::errc::success);
162 #if BOOST_VERSION < 107400 || defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
172 typename boost::beast::websocket::stream<Socket>::next_layer_type&
next_layer() {
173 return ws_.next_layer();
176 template <
typename T>
178 ws_.set_option(std::forward<T>(t));
181 template <
typename ConstBufferSequence,
typename AcceptHandler>
183 ConstBufferSequence
const& buffers,
184 AcceptHandler&& handler) {
185 ws_.async_accept(buffers, std::forward<AcceptHandler>(handler));
188 template<
typename ConstBufferSequence,
typename ResponseDecorator,
typename AcceptHandler>
190 ConstBufferSequence
const& buffers,
191 ResponseDecorator
const& decorator,
192 AcceptHandler&& handler) {
193 ws_.async_accept_ex(buffers, decorator, std::forward<AcceptHandler>(handler));
196 template <
typename... Args>
198 ws_.async_handshake(std::forward<Args>(args)...);
201 template <
typename... Args>
203 ws_.handshake(std::forward<Args>(args)...);
206 template <
typename ConstBufferSequence>
208 ConstBufferSequence
const& buffers) {
210 return as::buffer_size(buffers);
214 boost::beast::websocket::stream<Socket> ws_;
215 boost::beast::flat_buffer buffer_;
#define MQTT_ALWAYS_INLINE
Definition: attributes.hpp:18
buffer that has string_view interface This class provides string_view interface. This class hold stri...
Definition: buffer.hpp:30
Definition: type_erased_socket.hpp:22
Definition: ws_endpoint.hpp:26
ws_endpoint(as::io_context &ioc, Args &&... args)
Definition: ws_endpoint.hpp:29
MQTT_ALWAYS_INLINE as::executor get_executor() override final
Definition: ws_endpoint.hpp:163
MQTT_ALWAYS_INLINE void async_read(as::mutable_buffer buffers, std::function< void(error_code, std::size_t)> handler) override final
Definition: ws_endpoint.hpp:42
MQTT_ALWAYS_INLINE void close(boost::system::error_code &ec) override final
Definition: ws_endpoint.hpp:151
void handshake(Args &&... args)
Definition: ws_endpoint.hpp:202
MQTT_ALWAYS_INLINE as::ip::tcp::socket::lowest_layer_type & lowest_layer() override final
Definition: ws_endpoint.hpp:141
void async_accept_ex(ConstBufferSequence const &buffers, ResponseDecorator const &decorator, AcceptHandler &&handler)
Definition: ws_endpoint.hpp:189
boost::beast::websocket::stream< Socket >::next_layer_type & next_layer()
Definition: ws_endpoint.hpp:172
void set_option(T &&t)
Definition: ws_endpoint.hpp:177
MQTT_ALWAYS_INLINE any native_handle() override final
Definition: ws_endpoint.hpp:147
void async_handshake(Args &&... args)
Definition: ws_endpoint.hpp:197
MQTT_ALWAYS_INLINE void post(std::function< void()> handler) override final
Definition: ws_endpoint.hpp:126
std::size_t write(ConstBufferSequence const &buffers)
Definition: ws_endpoint.hpp:207
MQTT_ALWAYS_INLINE void async_write(std::vector< as::const_buffer > buffers, std::function< void(error_code, std::size_t)> handler) override final
Definition: ws_endpoint.hpp:105
MQTT_ALWAYS_INLINE std::size_t write(std::vector< as::const_buffer > buffers, boost::system::error_code &ec) override final
Definition: ws_endpoint.hpp:118
void async_accept(ConstBufferSequence const &buffers, AcceptHandler &&handler)
Definition: ws_endpoint.hpp:182
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253