mqtt_cpp
server.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2017
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_SERVER_HPP)
8 #define MQTT_SERVER_HPP
9 
10 #include <mqtt/variant.hpp> // should be top to configure variant limit
11 
12 #include <memory>
13 #include <boost/asio.hpp>
14 
15 #include <mqtt/namespace.hpp>
16 #include <mqtt/tcp_endpoint.hpp>
17 
18 #include <mqtt/endpoint.hpp>
19 #include <mqtt/null_strand.hpp>
20 #include <mqtt/move.hpp>
22 
23 namespace MQTT_NS {
24 
25 namespace as = boost::asio;
26 
27 template <typename Mutex, template<typename...> class LockGuard, std::size_t PacketIdBytes>
28 class server_endpoint : public endpoint<Mutex, LockGuard, PacketIdBytes> {
29 public:
31 protected:
32  void on_pre_send() noexcept override {}
33  void on_close() noexcept override {}
34  void on_error(error_code /*ec*/) noexcept override {}
35 protected:
36  ~server_endpoint() = default;
37 };
38 
39 template <
40  typename Strand = as::io_context::strand,
41  typename Mutex = std::mutex,
42  template<typename...> class LockGuard = std::lock_guard,
43  std::size_t PacketIdBytes = 2
44 >
45 class server {
46 public:
49 
54  using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;
55 
60  using error_handler = std::function<void(error_code ec)>;
61 
62  template <typename AsioEndpoint, typename AcceptorConfig>
64  AsioEndpoint&& ep,
65  as::io_context& ioc_accept,
66  as::io_context& ioc_con,
67  AcceptorConfig&& config)
68  : ep_(std::forward<AsioEndpoint>(ep)),
69  ioc_accept_(ioc_accept),
70  ioc_con_(ioc_con),
71  acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),
72  config_(std::forward<AcceptorConfig>(config)) {
73  config_(acceptor_.value());
74  }
75 
76  template <typename AsioEndpoint>
78  AsioEndpoint&& ep,
79  as::io_context& ioc_accept,
80  as::io_context& ioc_con)
81  : server(std::forward<AsioEndpoint>(ep), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}
82 
83  template <typename AsioEndpoint, typename AcceptorConfig>
85  AsioEndpoint&& ep,
86  as::io_context& ioc,
87  AcceptorConfig&& config)
88  : server(std::forward<AsioEndpoint>(ep), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
89 
90  template <typename AsioEndpoint>
92  AsioEndpoint&& ep,
93  as::io_context& ioc)
94  : server(std::forward<AsioEndpoint>(ep), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}
95 
96  void listen() {
97  close_request_ = false;
98 
99  if (!acceptor_) {
100  try {
101  acceptor_.emplace(ioc_accept_, ep_);
102  config_(acceptor_.value());
103  }
104  catch (boost::system::system_error const& e) {
105  as::post(
106  ioc_accept_,
107  [this, ec = e.code()] {
108  if (h_error_) h_error_(ec);
109  }
110  );
111  return;
112  }
113  }
114  do_accept();
115  }
116 
117  unsigned short port() const { return acceptor_.value().local_endpoint().port(); }
118 
119  void close() {
120  close_request_ = true;
121  acceptor_.reset();
122  }
123 
125  h_accept_ = force_move(h);
126  }
127 
133  h_error_ = force_move(h);
134  }
135 
144  version_ = version;
145  }
146 
151  as::io_context& ioc_con() const {
152  return ioc_con_;
153  }
154 
159  as::io_context& ioc_accept() const {
160  return ioc_accept_;
161  }
162 
163 private:
164  void do_accept() {
165  if (close_request_) return;
166  auto socket = std::make_shared<socket_t>(ioc_con_);
167  acceptor_.value().async_accept(
168  socket->lowest_layer(),
169  [this, socket]
170  (error_code ec) mutable {
171  if (ec) {
172  acceptor_.reset();
173  if (h_error_) h_error_(ec);
174  return;
175  }
176  auto sp = std::make_shared<endpoint_t>(ioc_con_, force_move(socket), version_);
177  if (h_accept_) h_accept_(force_move(sp));
178  do_accept();
179  }
180  );
181  }
182 
183 private:
184  as::ip::tcp::endpoint ep_;
185  as::io_context& ioc_accept_;
186  as::io_context& ioc_con_;
187  optional<as::ip::tcp::acceptor> acceptor_;
188  std::function<void(as::ip::tcp::acceptor&)> config_;
189  bool close_request_{false};
190  accept_handler h_accept_;
191  error_handler h_error_;
193 };
194 
195 #if defined(MQTT_USE_TLS)
196 
197 template <
198  typename Strand = as::io_context::strand,
199  typename Mutex = std::mutex,
200  template<typename...> class LockGuard = std::lock_guard,
201  std::size_t PacketIdBytes = 2
202 >
203 class server_tls {
204 public:
205  using socket_t = tcp_endpoint<tls::stream<as::ip::tcp::socket>, Strand>;
206  using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;
207 
212  using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;
213 
218  using error_handler = std::function<void(error_code ec)>;
219 
220  template <typename AsioEndpoint, typename AcceptorConfig>
221  server_tls(
222  AsioEndpoint&& ep,
223  tls::context&& ctx,
224  as::io_context& ioc_accept,
225  as::io_context& ioc_con,
226  AcceptorConfig&& config)
227  : ep_(std::forward<AsioEndpoint>(ep)),
228  ioc_accept_(ioc_accept),
229  ioc_con_(ioc_con),
230  acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),
231  config_(std::forward<AcceptorConfig>(config)),
232  ctx_(force_move(ctx)) {
233  config_(acceptor_.value());
234  }
235 
236  template <typename AsioEndpoint>
237  server_tls(
238  AsioEndpoint&& ep,
239  tls::context&& ctx,
240  as::io_context& ioc_accept,
241  as::io_context& ioc_con)
242  : server_tls(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}
243 
244  template <typename AsioEndpoint, typename AcceptorConfig>
245  server_tls(
246  AsioEndpoint&& ep,
247  tls::context&& ctx,
248  as::io_context& ioc,
249  AcceptorConfig&& config)
250  : server_tls(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
251 
252  template <typename AsioEndpoint>
253  server_tls(
254  AsioEndpoint&& ep,
255  tls::context&& ctx,
256  as::io_context& ioc)
257  : server_tls(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}
258 
259  void listen() {
260  close_request_ = false;
261 
262  if (!acceptor_) {
263  try {
264  acceptor_.emplace(ioc_accept_, ep_);
265  config_(acceptor_.value());
266  }
267  catch (boost::system::system_error const& e) {
268  as::post(
269  ioc_accept_,
270  [this, ec = e.code()] {
271  if (h_error_) h_error_(ec);
272  }
273  );
274  return;
275  }
276  }
277  do_accept();
278  }
279 
280  unsigned short port() const { return acceptor_.value().local_endpoint().port(); }
281 
282  void close() {
283  close_request_ = true;
284  acceptor_.reset();
285  }
286 
287  void set_accept_handler(accept_handler h = accept_handler()) {
288  h_accept_ = force_move(h);
289  }
290 
295  void set_error_handler(error_handler h = error_handler()) {
296  h_error_ = force_move(h);
297  }
298 
306  void set_protocol_version(protocol_version version) {
307  version_ = version;
308  }
309 
314  as::io_context& ioc_con() const {
315  return ioc_con_;
316  }
317 
322  as::io_context& ioc_accept() const {
323  return ioc_accept_;
324  }
325 
334  void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {
335  underlying_connect_timeout_ = force_move(timeout);
336  }
337 
342  tls::context& get_ssl_context() {
343  return ctx_;
344  }
345 
350  tls::context const& get_ssl_context() const {
351  return ctx_;
352  }
353 
354 private:
355  void do_accept() {
356  if (close_request_) return;
357  auto socket = std::make_shared<socket_t>(ioc_con_, ctx_);
358  auto ps = socket.get();
359  acceptor_.value().async_accept(
360  ps->lowest_layer(),
361  [this, socket = force_move(socket)]
362  (error_code ec) mutable {
363  if (ec) {
364  acceptor_.reset();
365  if (h_error_) h_error_(ec);
366  return;
367  }
368  auto underlying_finished = std::make_shared<bool>(false);
369  auto tim = std::make_shared<as::steady_timer>(ioc_con_);
370  tim->expires_after(underlying_connect_timeout_);
371  tim->async_wait(
372  [socket, tim, underlying_finished]
373  (error_code ec) {
374  if (*underlying_finished) return;
375  if (ec) return;
376  socket->post(
377  [socket] {
378  boost::system::error_code close_ec;
379  socket->lowest_layer().close(close_ec);
380  }
381  );
382  }
383  );
384  auto ps = socket.get();
385  ps->async_handshake(
386  tls::stream_base::server,
387  [this, socket = force_move(socket), tim, underlying_finished]
388  (error_code ec) mutable {
389  *underlying_finished = true;
390  tim->cancel();
391  if (ec) {
392  return;
393  }
394  auto sp = std::make_shared<endpoint_t>(ioc_con_, force_move(socket), version_);
395  if (h_accept_) h_accept_(force_move(sp));
396  }
397  );
398  do_accept();
399  }
400  );
401  }
402 
403 private:
404  as::ip::tcp::endpoint ep_;
405  as::io_context& ioc_accept_;
406  as::io_context& ioc_con_;
407  optional<as::ip::tcp::acceptor> acceptor_;
408  std::function<void(as::ip::tcp::acceptor&)> config_;
409  bool close_request_{false};
410  accept_handler h_accept_;
411  error_handler h_error_;
412  tls::context ctx_;
413  protocol_version version_ = protocol_version::undetermined;
414  std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);
415 };
416 
417 #endif // defined(MQTT_USE_TLS)
418 
419 #if defined(MQTT_USE_WS)
420 
421 template <
422  typename Strand = as::io_context::strand,
423  typename Mutex = std::mutex,
424  template<typename...> class LockGuard = std::lock_guard,
425  std::size_t PacketIdBytes = 2
426 >
427 class server_ws {
428 public:
429  using socket_t = ws_endpoint<as::ip::tcp::socket, Strand>;
430  using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;
431 
436  using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;
437 
442  using error_handler = std::function<void(error_code ec)>;
443 
444  template <typename AsioEndpoint, typename AcceptorConfig>
445  server_ws(
446  AsioEndpoint&& ep,
447  as::io_context& ioc_accept,
448  as::io_context& ioc_con,
449  AcceptorConfig&& config)
450  : ep_(std::forward<AsioEndpoint>(ep)),
451  ioc_accept_(ioc_accept),
452  ioc_con_(ioc_con),
453  acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),
454  config_(std::forward<AcceptorConfig>(config)) {
455  config_(acceptor_.value());
456  }
457 
458  template <typename AsioEndpoint>
459  server_ws(
460  AsioEndpoint&& ep,
461  as::io_context& ioc_accept,
462  as::io_context& ioc_con)
463  : server_ws(std::forward<AsioEndpoint>(ep), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}
464 
465  template <typename AsioEndpoint, typename AcceptorConfig>
466  server_ws(
467  AsioEndpoint&& ep,
468  as::io_context& ioc,
469  AcceptorConfig&& config)
470  : server_ws(std::forward<AsioEndpoint>(ep), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
471 
472  template <typename AsioEndpoint>
473  server_ws(
474  AsioEndpoint&& ep,
475  as::io_context& ioc)
476  : server_ws(std::forward<AsioEndpoint>(ep), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}
477 
478  void listen() {
479  close_request_ = false;
480 
481  if (!acceptor_) {
482  try {
483  acceptor_.emplace(ioc_accept_, ep_);
484  config_(acceptor_.value());
485  }
486  catch (boost::system::system_error const& e) {
487  as::post(
488  ioc_accept_,
489  [this, ec = e.code()] {
490  if (h_error_) h_error_(ec);
491  }
492  );
493  return;
494  }
495  }
496  do_accept();
497  }
498 
499  unsigned short port() const { return acceptor_.value().local_endpoint().port(); }
500 
501  void close() {
502  close_request_ = true;
503  acceptor_.reset();
504  }
505 
506  void set_accept_handler(accept_handler h = accept_handler()) {
507  h_accept_ = force_move(h);
508  }
509 
514  void set_error_handler(error_handler h = error_handler()) {
515  h_error_ = force_move(h);
516  }
517 
525  void set_protocol_version(protocol_version version) {
526  version_ = version;
527  }
528 
533  as::io_context& ioc_con() const {
534  return ioc_con_;
535  }
536 
541  as::io_context& ioc_accept() const {
542  return ioc_accept_;
543  }
544 
553  void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {
554  underlying_connect_timeout_ = force_move(timeout);
555  }
556 
557 private:
558  void do_accept() {
559  if (close_request_) return;
560  auto socket = std::make_shared<socket_t>(ioc_con_);
561  auto ps = socket.get();
562  acceptor_.value().async_accept(
563  ps->next_layer(),
564  [this, socket = force_move(socket)]
565  (error_code ec) mutable {
566  if (ec) {
567  acceptor_.reset();
568  if (h_error_) h_error_(ec);
569  return;
570  }
571  auto underlying_finished = std::make_shared<bool>(false);
572  auto tim = std::make_shared<as::steady_timer>(ioc_con_);
573  tim->expires_after(underlying_connect_timeout_);
574  tim->async_wait(
575  [socket, tim, underlying_finished]
576  (error_code ec) {
577  if (*underlying_finished) return;
578  if (ec) return;
579  socket->post(
580  [socket] {
581  boost::system::error_code close_ec;
582  socket->lowest_layer().close(close_ec);
583  }
584  );
585  }
586  );
587 
588  auto sb = std::make_shared<boost::asio::streambuf>();
589  auto request = std::make_shared<boost::beast::http::request<boost::beast::http::string_body>>();
590  auto ps = socket.get();
591  boost::beast::http::async_read(
592  ps->next_layer(),
593  *sb,
594  *request,
595  [this, socket = force_move(socket), sb, request, tim, underlying_finished]
596  (error_code ec, std::size_t) mutable {
597  if (ec) {
598  *underlying_finished = true;
599  tim->cancel();
600  return;
601  }
602  if (!boost::beast::websocket::is_upgrade(*request)) {
603  *underlying_finished = true;
604  tim->cancel();
605  return;
606  }
607  auto ps = socket.get();
608 
609 #if BOOST_BEAST_VERSION >= 248
610 
611  auto it = request->find("Sec-WebSocket-Protocol");
612  if (it != request->end()) {
613  ps->set_option(
614  boost::beast::websocket::stream_base::decorator(
615  [name = it->name(), value = it->value()] // name is enum, value is boost::string_view
616  (boost::beast::websocket::response_type& res) {
617  // This lambda is called before the scope out point *1
618  res.set(name, value);
619  }
620  )
621  );
622  }
623  ps->async_accept(
624  *request,
625  [this, socket = force_move(socket), tim, underlying_finished]
626  (error_code ec) mutable {
627  *underlying_finished = true;
628  tim->cancel();
629  if (ec) {
630  return;
631  }
632  auto sp = std::make_shared<endpoint_t>(ioc_con_, force_move(socket), version_);
633  if (h_accept_) h_accept_(force_move(sp));
634  }
635  );
636 
637 #else // BOOST_BEAST_VERSION >= 248
638 
639  ps->async_accept_ex(
640  *request,
641  [request]
642  (boost::beast::websocket::response_type& m) {
643  auto it = request->find("Sec-WebSocket-Protocol");
644  if (it != request->end()) {
645  m.insert(it->name(), it->value());
646  }
647  },
648  [this, socket = force_move(socket), tim, underlying_finished]
649  (error_code ec) mutable {
650  *underlying_finished = true;
651  tim->cancel();
652  if (ec) {
653  return;
654  }
655  auto sp = std::make_shared<endpoint_t>(ioc_con_, force_move(socket), version_);
656  if (h_accept_) h_accept_(force_move(sp));
657  }
658  );
659 
660 #endif // BOOST_BEAST_VERSION >= 248
661 
662  // scope out point *1
663  }
664  );
665  do_accept();
666  }
667  );
668  }
669 
670 private:
671  as::ip::tcp::endpoint ep_;
672  as::io_context& ioc_accept_;
673  as::io_context& ioc_con_;
674  optional<as::ip::tcp::acceptor> acceptor_;
675  std::function<void(as::ip::tcp::acceptor&)> config_;
676  bool close_request_{false};
677  accept_handler h_accept_;
678  error_handler h_error_;
679  protocol_version version_ = protocol_version::undetermined;
680  std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);
681 };
682 
683 
684 #if defined(MQTT_USE_TLS)
685 
686 template <
687  typename Strand = as::io_context::strand,
688  typename Mutex = std::mutex,
689  template<typename...> class LockGuard = std::lock_guard,
690  std::size_t PacketIdBytes = 2
691 >
692 class server_tls_ws {
693 public:
694  using socket_t = ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>;
695  using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;
696 
701  using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;
702 
707  using error_handler = std::function<void(error_code ec)>;
708 
709  template <typename AsioEndpoint, typename AcceptorConfig>
710  server_tls_ws(
711  AsioEndpoint&& ep,
712  tls::context&& ctx,
713  as::io_context& ioc_accept,
714  as::io_context& ioc_con,
715  AcceptorConfig&& config)
716  : ep_(std::forward<AsioEndpoint>(ep)),
717  ioc_accept_(ioc_accept),
718  ioc_con_(ioc_con),
719  acceptor_(as::ip::tcp::acceptor(ioc_accept_, ep_)),
720  config_(std::forward<AcceptorConfig>(config)),
721  ctx_(force_move(ctx)) {
722  config_(acceptor_.value());
723  }
724 
725  template <typename AsioEndpoint>
726  server_tls_ws(
727  AsioEndpoint&& ep,
728  tls::context&& ctx,
729  as::io_context& ioc_accept,
730  as::io_context& ioc_con)
731  : server_tls_ws(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc_accept, ioc_con, [](as::ip::tcp::acceptor&) {}) {}
732 
733  template <typename AsioEndpoint, typename AcceptorConfig>
734  server_tls_ws(
735  AsioEndpoint&& ep,
736  tls::context&& ctx,
737  as::io_context& ioc,
738  AcceptorConfig&& config)
739  : server_tls_ws(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
740 
741  template <typename AsioEndpoint>
742  server_tls_ws(
743  AsioEndpoint&& ep,
744  tls::context&& ctx,
745  as::io_context& ioc)
746  : server_tls_ws(std::forward<AsioEndpoint>(ep), force_move(ctx), ioc, ioc, [](as::ip::tcp::acceptor&) {}) {}
747 
748  void listen() {
749  close_request_ = false;
750 
751  if (!acceptor_) {
752  try {
753  acceptor_.emplace(ioc_accept_, ep_);
754  config_(acceptor_.value());
755  }
756  catch (boost::system::system_error const& e) {
757  as::post(
758  ioc_accept_,
759  [this, ec = e.code()] {
760  if (h_error_) h_error_(ec);
761  }
762  );
763  return;
764  }
765  }
766  do_accept();
767  }
768 
769  unsigned short port() const { return acceptor_.value().local_endpoint().port(); }
770 
771  void close() {
772  close_request_ = true;
773  acceptor_.reset();
774  }
775 
776  void set_accept_handler(accept_handler h = accept_handler()) {
777  h_accept_ = force_move(h);
778  }
779 
784  void set_error_handler(error_handler h = error_handler()) {
785  h_error_ = force_move(h);
786  }
787 
795  void set_protocol_version(protocol_version version) {
796  version_ = version;
797  }
798 
803  as::io_context& ioc_con() const {
804  return ioc_con_;
805  }
806 
811  as::io_context& ioc_accept() const {
812  return ioc_accept_;
813  }
814 
823  void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {
824  underlying_connect_timeout_ = force_move(timeout);
825  }
826 
831  tls::context& get_ssl_context() {
832  return ctx_;
833  }
834 
839  tls::context const& get_ssl_context() const {
840  return ctx_;
841  }
842 
843 private:
844  void do_accept() {
845  if (close_request_) return;
846  auto socket = std::make_shared<socket_t>(ioc_con_, ctx_);
847  auto ps = socket.get();
848  acceptor_.value().async_accept(
849  ps->next_layer().next_layer(),
850  [this, socket = force_move(socket)]
851  (error_code ec) mutable {
852  if (ec) {
853  acceptor_.reset();
854  if (h_error_) h_error_(ec);
855  return;
856  }
857  auto underlying_finished = std::make_shared<bool>(false);
858  auto tim = std::make_shared<as::steady_timer>(ioc_con_);
859  tim->expires_after(underlying_connect_timeout_);
860  tim->async_wait(
861  [socket, tim, underlying_finished]
862  (error_code ec) {
863  if (*underlying_finished) return;
864  if (ec) return;
865  socket->post(
866  [socket] {
867  boost::system::error_code close_ec;
868  socket->lowest_layer().close(close_ec);
869  }
870  );
871  }
872  );
873 
874  auto ps = socket.get();
875  ps->next_layer().async_handshake(
876  tls::stream_base::server,
877  [this, socket = force_move(socket), tim, underlying_finished]
878  (error_code ec) mutable {
879  if (ec) {
880  *underlying_finished = true;
881  tim->cancel();
882  return;
883  }
884  auto sb = std::make_shared<boost::asio::streambuf>();
885  auto request = std::make_shared<boost::beast::http::request<boost::beast::http::string_body>>();
886  auto ps = socket.get();
887  boost::beast::http::async_read(
888  ps->next_layer(),
889  *sb,
890  *request,
891  [this, socket = force_move(socket), sb, request, tim, underlying_finished]
892  (error_code ec, std::size_t) mutable {
893  if (ec) {
894  *underlying_finished = true;
895  tim->cancel();
896  return;
897  }
898  if (!boost::beast::websocket::is_upgrade(*request)) {
899  *underlying_finished = true;
900  tim->cancel();
901  return;
902  }
903  auto ps = socket.get();
904 
905 #if BOOST_BEAST_VERSION >= 248
906 
907  auto it = request->find("Sec-WebSocket-Protocol");
908  if (it != request->end()) {
909  ps->set_option(
910  boost::beast::websocket::stream_base::decorator(
911  [name = it->name(), value = it->value()] // name is enum, value is boost::string_view
912  (boost::beast::websocket::response_type& res) {
913  // This lambda is called before the scope out point *1
914  res.set(name, value);
915  }
916  )
917  );
918  }
919  ps->async_accept(
920  *request,
921  [this, socket = force_move(socket), tim, underlying_finished]
922  (error_code ec) mutable {
923  *underlying_finished = true;
924  tim->cancel();
925  if (ec) {
926  return;
927  }
928  auto sp = std::make_shared<endpoint_t>(ioc_con_, force_move(socket), version_);
929  if (h_accept_) h_accept_(force_move(sp));
930  }
931  );
932 
933 #else // BOOST_BEAST_VERSION >= 248
934 
935  ps->async_accept_ex(
936  *request,
937  [request]
938  (boost::beast::websocket::response_type& m) {
939  auto it = request->find("Sec-WebSocket-Protocol");
940  if (it != request->end()) {
941  m.insert(it->name(), it->value());
942  }
943  },
944  [this, socket = force_move(socket), tim, underlying_finished]
945  (error_code ec) mutable {
946  *underlying_finished = true;
947  tim->cancel();
948  if (ec) {
949  return;
950  }
951  // TODO: The use of force_move on this line of code causes
952  // a static assertion that socket is a const object when
953  // TLS is enabled, and WS is enabled, with Boost 1.70, and gcc 8.3.0
954  auto sp = std::make_shared<endpoint_t>(ioc_con_, socket, version_);
955  if (h_accept_) h_accept_(force_move(sp));
956  }
957  );
958 
959 #endif // BOOST_BEAST_VERSION >= 248
960 
961  // scope out point *1
962  }
963  );
964  }
965  );
966  do_accept();
967  }
968  );
969  }
970 
971 private:
972  as::ip::tcp::endpoint ep_;
973  as::io_context& ioc_accept_;
974  as::io_context& ioc_con_;
975  optional<as::ip::tcp::acceptor> acceptor_;
976  std::function<void(as::ip::tcp::acceptor&)> config_;
977  bool close_request_{false};
978  accept_handler h_accept_;
979  error_handler h_error_;
980  tls::context ctx_;
981  protocol_version version_ = protocol_version::undetermined;
982  std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);
983 };
984 
985 #endif // defined(MQTT_USE_TLS)
986 
987 #endif // defined(MQTT_USE_WS)
988 
989 } // namespace MQTT_NS
990 
991 #endif // MQTT_SERVER_HPP
Definition: endpoint.hpp:171
Definition: server.hpp:28
void on_close() noexcept override
Close handler.
Definition: server.hpp:33
void on_error(error_code) noexcept override
Error handler.
Definition: server.hpp:34
void on_pre_send() noexcept override
Pre-send handler This handler is called when any mqtt control packet is decided to send.
Definition: server.hpp:32
Definition: server.hpp:45
as::io_context & ioc_accept() const
Get reference of boost::asio::io_context for acceptor.
Definition: server.hpp:159
void close()
Definition: server.hpp:119
unsigned short port() const
Definition: server.hpp:117
server(AsioEndpoint &&ep, as::io_context &ioc, AcceptorConfig &&config)
Definition: server.hpp:84
std::function< void(std::shared_ptr< endpoint_t > ep)> accept_handler
Accept handler.
Definition: server.hpp:54
void set_protocol_version(protocol_version version)
Set MQTT protocol version.
Definition: server.hpp:143
server(AsioEndpoint &&ep, as::io_context &ioc_accept, as::io_context &ioc_con, AcceptorConfig &&config)
Definition: server.hpp:63
void set_error_handler(error_handler h=error_handler())
Set error handler.
Definition: server.hpp:132
void set_accept_handler(accept_handler h=accept_handler())
Definition: server.hpp:124
server(AsioEndpoint &&ep, as::io_context &ioc_accept, as::io_context &ioc_con)
Definition: server.hpp:77
void listen()
Definition: server.hpp:96
server(AsioEndpoint &&ep, as::io_context &ioc)
Definition: server.hpp:91
std::function< void(error_code ec)> error_handler
Error handler.
Definition: server.hpp:60
as::io_context & ioc_con() const
Get reference of boost::asio::io_context for connections.
Definition: server.hpp:151
Definition: type_erased_socket.hpp:22
virtual as::ip::tcp::socket::lowest_layer_type & lowest_layer()=0
Definition: tcp_endpoint.hpp:23
server<>::endpoint_t endpoint_t
Definition: common_type.hpp:17
Definition: any.hpp:27
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
protocol_version
Definition: protocol_version.hpp:17
Definition: buffer.hpp:242
Definition: callable_overlay.hpp:20