7 #if !defined(MQTT_SERVER_HPP)
8 #define MQTT_SERVER_HPP
13 #include <boost/asio.hpp>
27 template <
typename Mutex,
template<
typename...>
class LockGuard, std::size_t PacketIdBytes>
40 typename Strand = as::io_context::strand,
41 typename Mutex = std::mutex,
42 template<
typename...>
class LockGuard = std::lock_guard,
43 std::size_t PacketIdBytes = 2
62 template <
typename AsioEndpo
int,
typename AcceptorConfig>
67 AcceptorConfig&& config)
68 : ep_(std::forward<AsioEndpoint>(ep)),
71 acceptor_(
as::ip::tcp::acceptor(ioc_accept_, ep_)),
72 config_(std::forward<AcceptorConfig>(config)) {
73 config_(acceptor_.value());
76 template <
typename AsioEndpo
int>
83 template <
typename AsioEndpo
int,
typename AcceptorConfig>
87 AcceptorConfig&& config)
88 :
server(std::forward<AsioEndpoint>(ep), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
90 template <
typename AsioEndpo
int>
94 :
server(std::forward<AsioEndpoint>(ep), ioc, ioc, [](
as::ip::tcp::acceptor&) {}) {}
97 close_request_ =
false;
101 acceptor_.emplace(ioc_accept_, ep_);
102 config_(acceptor_.value());
104 catch (boost::system::system_error
const& e) {
107 [
this, ec = e.code()] {
108 if (h_error_) h_error_(ec);
/**
 * @brief Get the port number of the acceptor's local endpoint.
 *
 * Reads the acceptor through acceptor_.value(), so the optional acceptor
 * must currently hold a value when this is called.
 * NOTE(review): presumably this throws a bad-optional-access exception when
 * the acceptor is empty -- confirm against the optional type in use.
 *
 * @return Port number reported by acceptor_.value().local_endpoint().
 */
117 unsigned short port()
const {
return acceptor_.value().local_endpoint().port(); }
120 close_request_ =
true;
165 if (close_request_)
return;
166 auto socket = std::make_shared<socket_t>(ioc_con_);
167 acceptor_.value().async_accept(
173 if (h_error_) h_error_(ec);
176 auto sp = std::make_shared<endpoint_t>(ioc_con_,
force_move(
socket), version_);
184 as::ip::tcp::endpoint ep_;
185 as::io_context& ioc_accept_;
186 as::io_context& ioc_con_;
187 optional<as::ip::tcp::acceptor> acceptor_;
188 std::function<void(as::ip::tcp::acceptor&)> config_;
189 bool close_request_{
false};
190 accept_handler h_accept_;
191 error_handler h_error_;
195 #if defined(MQTT_USE_TLS)
198 typename Strand = as::io_context::strand,
199 typename Mutex = std::mutex,
200 template<
typename...>
class LockGuard = std::lock_guard,
201 std::size_t PacketIdBytes = 2
205 using socket_t = tcp_endpoint<tls::stream<as::ip::tcp::socket>, Strand>;
206 using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;
212 using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;
218 using error_handler = std::function<void(
error_code ec)>;
220 template <
typename AsioEndpo
int,
typename AcceptorConfig>
224 as::io_context& ioc_accept,
225 as::io_context& ioc_con,
226 AcceptorConfig&& config)
227 : ep_(std::forward<AsioEndpoint>(ep)),
228 ioc_accept_(ioc_accept),
230 acceptor_(
as::ip::tcp::acceptor(ioc_accept_, ep_)),
231 config_(std::forward<AcceptorConfig>(config)),
233 config_(acceptor_.value());
236 template <
typename AsioEndpo
int>
240 as::io_context& ioc_accept,
241 as::io_context& ioc_con)
242 : server_tls(std::forward<AsioEndpoint>(ep),
force_move(ctx), ioc_accept, ioc_con, [](
as::ip::tcp::acceptor&) {}) {}
244 template <
typename AsioEndpo
int,
typename AcceptorConfig>
249 AcceptorConfig&& config)
250 : server_tls(std::forward<AsioEndpoint>(ep),
force_move(ctx), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
252 template <
typename AsioEndpo
int>
257 : server_tls(std::forward<AsioEndpoint>(ep),
force_move(ctx), ioc, ioc, [](
as::ip::tcp::acceptor&) {}) {}
260 close_request_ =
false;
264 acceptor_.emplace(ioc_accept_, ep_);
265 config_(acceptor_.value());
267 catch (boost::system::system_error
const& e) {
270 [
this, ec = e.code()] {
271 if (h_error_) h_error_(ec);
/**
 * @brief Get the port number of the acceptor's local endpoint.
 *
 * Reads the acceptor through acceptor_.value(), so the optional acceptor
 * must currently hold a value when this is called.
 * NOTE(review): presumably this throws a bad-optional-access exception when
 * the acceptor is empty -- confirm against the optional type in use.
 *
 * @return Port number reported by acceptor_.value().local_endpoint().
 */
280 unsigned short port()
const {
return acceptor_.value().local_endpoint().port(); }
283 close_request_ =
true;
287 void set_accept_handler(accept_handler h = accept_handler()) {
295 void set_error_handler(error_handler h = error_handler()) {
314 as::io_context& ioc_con()
const {
322 as::io_context& ioc_accept()
const {
334 void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {
335 underlying_connect_timeout_ =
force_move(timeout);
342 tls::context& get_ssl_context() {
350 tls::context
const& get_ssl_context()
const {
356 if (close_request_)
return;
357 auto socket = std::make_shared<socket_t>(ioc_con_, ctx_);
358 auto ps = socket.get();
359 acceptor_.value().async_accept(
365 if (h_error_) h_error_(ec);
368 auto underlying_finished = std::make_shared<bool>(
false);
369 auto tim = std::make_shared<as::steady_timer>(ioc_con_);
370 tim->expires_after(underlying_connect_timeout_);
372 [socket, tim, underlying_finished]
374 if (*underlying_finished)
return;
379 socket->lowest_layer().close(close_ec);
384 auto ps = socket.get();
386 tls::stream_base::server,
387 [
this, socket =
force_move(socket), tim, underlying_finished]
389 *underlying_finished =
true;
394 auto sp = std::make_shared<endpoint_t>(ioc_con_,
force_move(socket), version_);
404 as::ip::tcp::endpoint ep_;
405 as::io_context& ioc_accept_;
406 as::io_context& ioc_con_;
407 optional<as::ip::tcp::acceptor> acceptor_;
408 std::function<void(as::ip::tcp::acceptor&)> config_;
409 bool close_request_{
false};
410 accept_handler h_accept_;
411 error_handler h_error_;
414 std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);
419 #if defined(MQTT_USE_WS)
422 typename Strand = as::io_context::strand,
423 typename Mutex = std::mutex,
424 template<
typename...>
class LockGuard = std::lock_guard,
425 std::size_t PacketIdBytes = 2
429 using socket_t = ws_endpoint<as::ip::tcp::socket, Strand>;
430 using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;
436 using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;
442 using error_handler = std::function<void(
error_code ec)>;
444 template <
typename AsioEndpo
int,
typename AcceptorConfig>
447 as::io_context& ioc_accept,
448 as::io_context& ioc_con,
449 AcceptorConfig&& config)
450 : ep_(std::forward<AsioEndpoint>(ep)),
451 ioc_accept_(ioc_accept),
453 acceptor_(
as::ip::tcp::acceptor(ioc_accept_, ep_)),
454 config_(std::forward<AcceptorConfig>(config)) {
455 config_(acceptor_.value());
458 template <
typename AsioEndpo
int>
461 as::io_context& ioc_accept,
462 as::io_context& ioc_con)
463 : server_ws(std::forward<AsioEndpoint>(ep), ioc_accept, ioc_con, [](
as::ip::tcp::acceptor&) {}) {}
465 template <
typename AsioEndpo
int,
typename AcceptorConfig>
469 AcceptorConfig&& config)
470 : server_ws(std::forward<AsioEndpoint>(ep), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
472 template <
typename AsioEndpo
int>
476 : server_ws(std::forward<AsioEndpoint>(ep), ioc, ioc, [](
as::ip::tcp::acceptor&) {}) {}
479 close_request_ =
false;
483 acceptor_.emplace(ioc_accept_, ep_);
484 config_(acceptor_.value());
486 catch (boost::system::system_error
const& e) {
489 [
this, ec = e.code()] {
490 if (h_error_) h_error_(ec);
/**
 * @brief Get the port number of the acceptor's local endpoint.
 *
 * Reads the acceptor through acceptor_.value(), so the optional acceptor
 * must currently hold a value when this is called.
 * NOTE(review): presumably this throws a bad-optional-access exception when
 * the acceptor is empty -- confirm against the optional type in use.
 *
 * @return Port number reported by acceptor_.value().local_endpoint().
 */
499 unsigned short port()
const {
return acceptor_.value().local_endpoint().port(); }
502 close_request_ =
true;
506 void set_accept_handler(accept_handler h = accept_handler()) {
514 void set_error_handler(error_handler h = error_handler()) {
533 as::io_context& ioc_con()
const {
541 as::io_context& ioc_accept()
const {
553 void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {
554 underlying_connect_timeout_ =
force_move(timeout);
559 if (close_request_)
return;
560 auto socket = std::make_shared<socket_t>(ioc_con_);
561 auto ps = socket.get();
562 acceptor_.value().async_accept(
568 if (h_error_) h_error_(ec);
571 auto underlying_finished = std::make_shared<bool>(
false);
572 auto tim = std::make_shared<as::steady_timer>(ioc_con_);
573 tim->expires_after(underlying_connect_timeout_);
575 [socket, tim, underlying_finished]
577 if (*underlying_finished)
return;
582 socket->lowest_layer().close(close_ec);
588 auto sb = std::make_shared<boost::asio::streambuf>();
589 auto request = std::make_shared<boost::beast::http::request<boost::beast::http::string_body>>();
590 auto ps = socket.get();
591 boost::beast::http::async_read(
595 [
this, socket =
force_move(socket), sb, request, tim, underlying_finished]
598 *underlying_finished =
true;
602 if (!boost::beast::websocket::is_upgrade(*request)) {
603 *underlying_finished =
true;
607 auto ps = socket.get();
609 #if BOOST_BEAST_VERSION >= 248
611 auto it = request->find(
"Sec-WebSocket-Protocol");
612 if (it != request->end()) {
614 boost::beast::websocket::stream_base::decorator(
615 [name = it->name(), value = it->value()]
616 (boost::beast::websocket::response_type& res) {
618 res.set(name, value);
625 [
this, socket =
force_move(socket), tim, underlying_finished]
627 *underlying_finished =
true;
632 auto sp = std::make_shared<endpoint_t>(ioc_con_,
force_move(socket), version_);
642 (boost::beast::websocket::response_type& m) {
643 auto it = request->find(
"Sec-WebSocket-Protocol");
644 if (it != request->end()) {
645 m.insert(it->name(), it->value());
648 [
this, socket =
force_move(socket), tim, underlying_finished]
650 *underlying_finished =
true;
655 auto sp = std::make_shared<endpoint_t>(ioc_con_,
force_move(socket), version_);
671 as::ip::tcp::endpoint ep_;
672 as::io_context& ioc_accept_;
673 as::io_context& ioc_con_;
674 optional<as::ip::tcp::acceptor> acceptor_;
675 std::function<void(as::ip::tcp::acceptor&)> config_;
676 bool close_request_{
false};
677 accept_handler h_accept_;
678 error_handler h_error_;
680 std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);
684 #if defined(MQTT_USE_TLS)
687 typename Strand = as::io_context::strand,
688 typename Mutex = std::mutex,
689 template<
typename...>
class LockGuard = std::lock_guard,
690 std::size_t PacketIdBytes = 2
692 class server_tls_ws {
694 using socket_t = ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>;
695 using endpoint_t = callable_overlay<server_endpoint<Mutex, LockGuard, PacketIdBytes>>;
701 using accept_handler = std::function<void(std::shared_ptr<endpoint_t> ep)>;
707 using error_handler = std::function<void(
error_code ec)>;
709 template <
typename AsioEndpo
int,
typename AcceptorConfig>
713 as::io_context& ioc_accept,
714 as::io_context& ioc_con,
715 AcceptorConfig&& config)
716 : ep_(std::forward<AsioEndpoint>(ep)),
717 ioc_accept_(ioc_accept),
719 acceptor_(
as::ip::tcp::acceptor(ioc_accept_, ep_)),
720 config_(std::forward<AcceptorConfig>(config)),
722 config_(acceptor_.value());
725 template <
typename AsioEndpo
int>
729 as::io_context& ioc_accept,
730 as::io_context& ioc_con)
731 : server_tls_ws(std::forward<AsioEndpoint>(ep),
force_move(ctx), ioc_accept, ioc_con, [](
as::ip::tcp::acceptor&) {}) {}
733 template <
typename AsioEndpo
int,
typename AcceptorConfig>
738 AcceptorConfig&& config)
739 : server_tls_ws(std::forward<AsioEndpoint>(ep),
force_move(ctx), ioc, ioc, std::forward<AcceptorConfig>(config)) {}
741 template <
typename AsioEndpo
int>
746 : server_tls_ws(std::forward<AsioEndpoint>(ep),
force_move(ctx), ioc, ioc, [](
as::ip::tcp::acceptor&) {}) {}
749 close_request_ =
false;
753 acceptor_.emplace(ioc_accept_, ep_);
754 config_(acceptor_.value());
756 catch (boost::system::system_error
const& e) {
759 [
this, ec = e.code()] {
760 if (h_error_) h_error_(ec);
/**
 * @brief Get the port number of the acceptor's local endpoint.
 *
 * Reads the acceptor through acceptor_.value(), so the optional acceptor
 * must currently hold a value when this is called.
 * NOTE(review): presumably this throws a bad-optional-access exception when
 * the acceptor is empty -- confirm against the optional type in use.
 *
 * @return Port number reported by acceptor_.value().local_endpoint().
 */
769 unsigned short port()
const {
return acceptor_.value().local_endpoint().port(); }
772 close_request_ =
true;
776 void set_accept_handler(accept_handler h = accept_handler()) {
784 void set_error_handler(error_handler h = error_handler()) {
803 as::io_context& ioc_con()
const {
811 as::io_context& ioc_accept()
const {
823 void set_underlying_connect_timeout(std::chrono::steady_clock::duration timeout) {
824 underlying_connect_timeout_ =
force_move(timeout);
831 tls::context& get_ssl_context() {
839 tls::context
const& get_ssl_context()
const {
845 if (close_request_)
return;
846 auto socket = std::make_shared<socket_t>(ioc_con_, ctx_);
847 auto ps = socket.get();
848 acceptor_.value().async_accept(
849 ps->next_layer().next_layer(),
854 if (h_error_) h_error_(ec);
857 auto underlying_finished = std::make_shared<bool>(
false);
858 auto tim = std::make_shared<as::steady_timer>(ioc_con_);
859 tim->expires_after(underlying_connect_timeout_);
861 [socket, tim, underlying_finished]
863 if (*underlying_finished)
return;
868 socket->lowest_layer().close(close_ec);
874 auto ps = socket.get();
875 ps->next_layer().async_handshake(
876 tls::stream_base::server,
877 [
this, socket =
force_move(socket), tim, underlying_finished]
880 *underlying_finished =
true;
884 auto sb = std::make_shared<boost::asio::streambuf>();
885 auto request = std::make_shared<boost::beast::http::request<boost::beast::http::string_body>>();
886 auto ps = socket.get();
887 boost::beast::http::async_read(
891 [
this, socket =
force_move(socket), sb, request, tim, underlying_finished]
894 *underlying_finished =
true;
898 if (!boost::beast::websocket::is_upgrade(*request)) {
899 *underlying_finished =
true;
903 auto ps = socket.get();
905 #if BOOST_BEAST_VERSION >= 248
907 auto it = request->find(
"Sec-WebSocket-Protocol");
908 if (it != request->end()) {
910 boost::beast::websocket::stream_base::decorator(
911 [name = it->name(), value = it->value()]
912 (boost::beast::websocket::response_type& res) {
914 res.set(name, value);
921 [
this, socket =
force_move(socket), tim, underlying_finished]
923 *underlying_finished =
true;
928 auto sp = std::make_shared<endpoint_t>(ioc_con_,
force_move(socket), version_);
938 (boost::beast::websocket::response_type& m) {
939 auto it = request->find(
"Sec-WebSocket-Protocol");
940 if (it != request->end()) {
941 m.insert(it->name(), it->value());
944 [
this, socket =
force_move(socket), tim, underlying_finished]
946 *underlying_finished =
true;
954 auto sp = std::make_shared<endpoint_t>(ioc_con_, socket, version_);
972 as::ip::tcp::endpoint ep_;
973 as::io_context& ioc_accept_;
974 as::io_context& ioc_con_;
975 optional<as::ip::tcp::acceptor> acceptor_;
976 std::function<void(as::ip::tcp::acceptor&)> config_;
977 bool close_request_{
false};
978 accept_handler h_accept_;
979 error_handler h_error_;
982 std::chrono::steady_clock::duration underlying_connect_timeout_ = std::chrono::seconds(10);
Definition: endpoint.hpp:171
Definition: server.hpp:28
void on_close() noexcept override
Close handler.
Definition: server.hpp:33
void on_error(error_code) noexcept override
Error handler.
Definition: server.hpp:34
void on_pre_send() noexcept override
Pre-send handler. This handler is called whenever an MQTT control packet is about to be sent.
Definition: server.hpp:32
~server_endpoint()=default
Definition: server.hpp:45
as::io_context & ioc_accept() const
Get a reference to the boost::asio::io_context used by the acceptor.
Definition: server.hpp:159
void close()
Definition: server.hpp:119
unsigned short port() const
Definition: server.hpp:117
server(AsioEndpoint &&ep, as::io_context &ioc, AcceptorConfig &&config)
Definition: server.hpp:84
std::function< void(std::shared_ptr< endpoint_t > ep)> accept_handler
Accept handler.
Definition: server.hpp:54
void set_protocol_version(protocol_version version)
Set MQTT protocol version.
Definition: server.hpp:143
server(AsioEndpoint &&ep, as::io_context &ioc_accept, as::io_context &ioc_con, AcceptorConfig &&config)
Definition: server.hpp:63
void set_error_handler(error_handler h=error_handler())
Set error handler.
Definition: server.hpp:132
void set_accept_handler(accept_handler h=accept_handler())
Definition: server.hpp:124
server(AsioEndpoint &&ep, as::io_context &ioc_accept, as::io_context &ioc_con)
Definition: server.hpp:77
void listen()
Definition: server.hpp:96
server(AsioEndpoint &&ep, as::io_context &ioc)
Definition: server.hpp:91
std::function< void(error_code ec)> error_handler
Error handler.
Definition: server.hpp:60
as::io_context & ioc_con() const
Get a reference to the boost::asio::io_context used for connections.
Definition: server.hpp:151
Definition: type_erased_socket.hpp:22
virtual as::ip::tcp::socket::lowest_layer_type & lowest_layer()=0
Definition: tcp_endpoint.hpp:23
server<>::endpoint_t endpoint_t
Definition: common_type.hpp:17
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
protocol_version
Definition: protocol_version.hpp:17
Definition: buffer.hpp:242
Definition: callable_overlay.hpp:20