mqtt_cpp
ws_endpoint.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2016
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_WS_ENDPOINT_HPP)
8 #define MQTT_WS_ENDPOINT_HPP
9 
10 #include <boost/beast/websocket.hpp>
11 #include <boost/beast/core/flat_buffer.hpp>
12 #include <boost/asio/bind_executor.hpp>
13 
14 #include <mqtt/namespace.hpp>
16 #include <mqtt/move.hpp>
17 #include <mqtt/attributes.hpp>
18 #include <mqtt/string_view.hpp>
19 #include <mqtt/error_code.hpp>
20 
21 namespace MQTT_NS {
22 
23 namespace as = boost::asio;
24 
25 template <typename Socket, typename Strand>
26 class ws_endpoint : public socket {
27 public:
28  template <typename... Args>
29  ws_endpoint(as::io_context& ioc, Args&&... args)
30  :ws_(ioc, std::forward<Args>(args)...),
31  strand_(ioc) {
32  ws_.binary(true);
33  ws_.set_option(
34  boost::beast::websocket::stream_base::decorator(
35  [](boost::beast::websocket::request_type& req) {
36  req.set("Sec-WebSocket-Protocol", "mqtt");
37  }
38  )
39  );
40  }
41 
43  as::mutable_buffer buffers,
44  std::function<void(error_code, std::size_t)> handler
45  ) override final {
46  auto req_size = as::buffer_size(buffers);
47 
48  using beast_read_handler_t =
49  std::function<void(error_code ec, std::shared_ptr<void>)>;
50 
51  std::shared_ptr<beast_read_handler_t> beast_read_handler;
52  if (req_size <= buffer_.size()) {
53  as::buffer_copy(buffers, buffer_.data(), req_size);
54  buffer_.consume(req_size);
55  handler(boost::system::errc::make_error_code(boost::system::errc::success), req_size);
56  return;
57  }
58 
59  beast_read_handler.reset(
60  new beast_read_handler_t(
61  [this, req_size, buffers, handler = force_move(handler)]
62  (error_code ec, std::shared_ptr<void> const& v) mutable {
63  if (ec) {
64  force_move(handler)(ec, 0);
65  return;
66  }
67  if (!ws_.got_binary()) {
68  buffer_.consume(buffer_.size());
69  force_move(handler)
70  (boost::system::errc::make_error_code(boost::system::errc::bad_message), 0);
71  return;
72  }
73  if (req_size > buffer_.size()) {
74  auto beast_read_handler = std::static_pointer_cast<beast_read_handler_t>(v);
75  ws_.async_read(
76  buffer_,
77  as::bind_executor(
78  strand_,
79  [beast_read_handler]
80  (error_code ec, std::size_t) {
81  (*beast_read_handler)(ec, beast_read_handler);
82  }
83  )
84  );
85  return;
86  }
87  as::buffer_copy(buffers, buffer_.data(), req_size);
88  buffer_.consume(req_size);
89  force_move(handler)(boost::system::errc::make_error_code(boost::system::errc::success), req_size);
90  }
91  )
92  );
93  ws_.async_read(
94  buffer_,
95  as::bind_executor(
96  strand_,
97  [beast_read_handler]
98  (error_code ec, std::size_t) {
99  (*beast_read_handler)(ec, beast_read_handler);
100  }
101  )
102  );
103  }
104 
106  std::vector<as::const_buffer> buffers,
107  std::function<void(error_code, std::size_t)> handler
108  ) override final {
109  ws_.async_write(
110  buffers,
111  as::bind_executor(
112  strand_,
113  force_move(handler)
114  )
115  );
116  }
117 
119  std::vector<as::const_buffer> buffers,
121  ) override final {
122  ws_.write(buffers, ec);
123  return as::buffer_size(buffers);
124  }
125 
126  MQTT_ALWAYS_INLINE void post(std::function<void()> handler) override final {
127  as::post(
128  strand_,
129  force_move(handler)
130  );
131  }
132 
133 #if BOOST_VERSION >= 107000
134 
135  MQTT_ALWAYS_INLINE as::ip::tcp::socket::lowest_layer_type& lowest_layer() override final {
136  return boost::beast::get_lowest_layer(ws_);
137  }
138 
139 #else // BOOST_VERSION >= 107000
140 
141  MQTT_ALWAYS_INLINE as::ip::tcp::socket::lowest_layer_type& lowest_layer() override final {
142  return ws_.lowest_layer();
143  }
144 
145 #endif // BOOST_VERSION >= 107000
146 
147  MQTT_ALWAYS_INLINE any native_handle() override final {
148  return next_layer().native_handle();
149  }
150 
152  ws_.close(boost::beast::websocket::close_code::normal, ec);
153  if (ec) return;
154  do {
155  boost::beast::flat_buffer buffer;
156  ws_.read(buffer, ec);
157  } while (!ec);
158  if (ec != boost::beast::websocket::error::closed) return;
159  ec = boost::system::errc::make_error_code(boost::system::errc::success);
160  }
161 
162 #if BOOST_VERSION < 107400 || defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
163  MQTT_ALWAYS_INLINE as::executor get_executor() override final {
164  return lowest_layer().get_executor();
165  }
166 #else // BOOST_VERSION < 107400 || defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
167  MQTT_ALWAYS_INLINE as::any_io_executor get_executor() override final {
168  return lowest_layer().get_executor();
169  }
170 #endif // BOOST_VERSION < 107400 || defined(BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT)
171 
172  typename boost::beast::websocket::stream<Socket>::next_layer_type& next_layer() {
173  return ws_.next_layer();
174  }
175 
176  template <typename T>
177  void set_option(T&& t) {
178  ws_.set_option(std::forward<T>(t));
179  }
180 
181  template <typename ConstBufferSequence, typename AcceptHandler>
183  ConstBufferSequence const& buffers,
184  AcceptHandler&& handler) {
185  ws_.async_accept(buffers, std::forward<AcceptHandler>(handler));
186  }
187 
188  template<typename ConstBufferSequence, typename ResponseDecorator, typename AcceptHandler>
190  ConstBufferSequence const& buffers,
191  ResponseDecorator const& decorator,
192  AcceptHandler&& handler) {
193  ws_.async_accept_ex(buffers, decorator, std::forward<AcceptHandler>(handler));
194  }
195 
196  template <typename... Args>
197  void async_handshake(Args&& ... args) {
198  ws_.async_handshake(std::forward<Args>(args)...);
199  }
200 
201  template <typename... Args>
202  void handshake(Args&& ... args) {
203  ws_.handshake(std::forward<Args>(args)...);
204  }
205 
206  template <typename ConstBufferSequence>
207  std::size_t write(
208  ConstBufferSequence const& buffers) {
209  ws_.write(buffers);
210  return as::buffer_size(buffers);
211  }
212 
213 private:
214  boost::beast::websocket::stream<Socket> ws_;
215  boost::beast::flat_buffer buffer_;
216  Strand strand_;
217 };
218 
219 } // namespace MQTT_NS
220 
221 #endif // MQTT_WS_ENDPOINT_HPP
#define MQTT_ALWAYS_INLINE
Definition: attributes.hpp:18
Buffer that has a string_view interface. This class provides a string_view interface. This class holds stri...
Definition: buffer.hpp:30
Definition: type_erased_socket.hpp:22
Definition: ws_endpoint.hpp:26
ws_endpoint(as::io_context &ioc, Args &&... args)
Definition: ws_endpoint.hpp:29
MQTT_ALWAYS_INLINE as::executor get_executor() override final
Definition: ws_endpoint.hpp:163
MQTT_ALWAYS_INLINE void async_read(as::mutable_buffer buffers, std::function< void(error_code, std::size_t)> handler) override final
Definition: ws_endpoint.hpp:42
MQTT_ALWAYS_INLINE void close(boost::system::error_code &ec) override final
Definition: ws_endpoint.hpp:151
void handshake(Args &&... args)
Definition: ws_endpoint.hpp:202
MQTT_ALWAYS_INLINE as::ip::tcp::socket::lowest_layer_type & lowest_layer() override final
Definition: ws_endpoint.hpp:141
void async_accept_ex(ConstBufferSequence const &buffers, ResponseDecorator const &decorator, AcceptHandler &&handler)
Definition: ws_endpoint.hpp:189
boost::beast::websocket::stream< Socket >::next_layer_type & next_layer()
Definition: ws_endpoint.hpp:172
void set_option(T &&t)
Definition: ws_endpoint.hpp:177
MQTT_ALWAYS_INLINE any native_handle() override final
Definition: ws_endpoint.hpp:147
void async_handshake(Args &&... args)
Definition: ws_endpoint.hpp:197
MQTT_ALWAYS_INLINE void post(std::function< void()> handler) override final
Definition: ws_endpoint.hpp:126
std::size_t write(ConstBufferSequence const &buffers)
Definition: ws_endpoint.hpp:207
MQTT_ALWAYS_INLINE void async_write(std::vector< as::const_buffer > buffers, std::function< void(error_code, std::size_t)> handler) override final
Definition: ws_endpoint.hpp:105
MQTT_ALWAYS_INLINE std::size_t write(std::vector< as::const_buffer > buffers, boost::system::error_code &ec) override final
Definition: ws_endpoint.hpp:118
void async_accept(ConstBufferSequence const &buffers, AcceptHandler &&handler)
Definition: ws_endpoint.hpp:182
Definition: any.hpp:27
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
Create a boost::asio::const_buffer from the MQTT_NS::buffer. boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253