mqtt_cpp
client.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2015
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_CLIENT_HPP)
8 #define MQTT_CLIENT_HPP
9 
10 #include <mqtt/variant.hpp> // should be top to configure variant limit
11 
12 #include <string>
13 #include <vector>
14 #include <functional>
15 #include <type_traits>
16 
17 #include <mqtt/namespace.hpp>
18 #include <mqtt/optional.hpp>
19 
20 #include <boost/lexical_cast.hpp>
21 #include <boost/asio.hpp>
22 
23 #include <mqtt/endpoint.hpp>
24 #include <mqtt/null_strand.hpp>
25 #include <mqtt/move.hpp>
26 #include <mqtt/constant.hpp>
27 
29 
30 namespace MQTT_NS {
31 
32 namespace as = boost::asio;
33 namespace mi = boost::multi_index;
34 
35 template <typename Socket, std::size_t PacketIdBytes = 2>
36 class client : public endpoint<std::mutex, std::lock_guard, PacketIdBytes> {
39 protected:
41 public:
43 
47  template<typename ... Args>
48  client(constructor_access, Args && ... args)
49  : client(std::forward<Args>(args)...)
50  { }
51 
59  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
60  make_client(as::io_context& ioc, std::string host, std::string port, protocol_version version);
61 
69  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, null_strand>>>>
70  make_client_no_strand(as::io_context& ioc, std::string host, std::string port, protocol_version version);
71 
72 #if defined(MQTT_USE_WS)
82  friend std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
83  make_client_ws(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
84 
93  friend std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, null_strand>>>>
94  make_client_no_strand_ws(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
95 #endif // defined(MQTT_USE_WS)
96 
97 #if defined(MQTT_USE_TLS)
105  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>>>
106  make_tls_client(as::io_context& ioc, std::string host, std::string port, protocol_version version);
107 
115  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>>>
116  make_tls_client_no_strand(as::io_context& ioc, std::string host, std::string port, protocol_version version);
117 
118 #if defined(MQTT_USE_WS)
128  friend std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>>>
129  make_tls_client_ws(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
130 
139  friend std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>>>
140  make_tls_client_no_strand_ws(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
141 #endif // defined(MQTT_USE_WS)
142 #endif // defined(MQTT_USE_TLS)
143 
151  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>, 4>>>
152  make_client_32(as::io_context& ioc, std::string host, std::string port, protocol_version version);
153 
161  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, null_strand>, 4>>>
162  make_client_no_strand_32(as::io_context& ioc, std::string host, std::string port, protocol_version version);
163 
164 #if defined(MQTT_USE_WS)
174  friend std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>, 4>>>
175  make_client_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
176 
185  friend std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, null_strand>, 4>>>
186  make_client_no_strand_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
187 #endif // defined(MQTT_USE_WS)
188 
189 #if defined(MQTT_USE_TLS)
197  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>>>
198  make_tls_client_32(as::io_context& ioc, std::string host, std::string port, protocol_version version);
199 
207  friend std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>>>
208  make_tls_client_no_strand_32(as::io_context& ioc, std::string host, std::string port, protocol_version version);
209 
210 #if defined(MQTT_USE_WS)
220  friend std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>>>
221  make_tls_client_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
222 
231  friend std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>>>
232  make_tls_client_no_strand_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path, protocol_version version);
233 #endif // defined(MQTT_USE_WS)
234 #endif // defined(MQTT_USE_TLS)
235 
242  void set_host(std::string host) {
243  host_ = force_move(host);
244  }
245 
252  void set_port(std::string port) {
253  port_ = force_move(port);
254  }
255 
264  void set_client_id(std::string id) {
265  client_id_ = force_move(id);
266  }
267 
272  std::string const& get_client_id() const {
273  return client_id_;
274  }
275 
285  void set_clean_session(bool cs) {
286  set_clean_start(cs);
287  }
288 
298  void set_clean_start(bool cs) {
299  base::clean_start_ = cs;
300  }
301 
310  void set_user_name(std::string name) {
311  user_name_ = force_move(name);
312  }
313 
322  void set_password(std::string password) {
323  password_ = force_move(password);
324  }
325 
333  void set_will(will w) {
334  will_ = force_move(w);
335  }
336 
337  template<typename ... Args>
338  void set_will(Args && ... args) {
339  will_.emplace(std::forward<Args>(args)...);
340  }
341 
342 #if defined(MQTT_USE_TLS)
343 
348  tls::context& get_ssl_context() {
349  static_assert(has_tls<std::decay_t<decltype(*this)>>::value, "Client is required to support TLS.");
350  return ctx_;
351  }
352 
357  tls::context const& get_ssl_context() const {
358  static_assert(has_tls<std::decay_t<decltype(*this)>>::value, "Client is required to support TLS.");
359  return ctx_;
360  }
361 
362 #endif // defined(MQTT_USE_TLS)
363 
364 
380  void set_keep_alive_sec(std::uint16_t keep_alive_sec, std::chrono::steady_clock::duration ping) {
381  if ((ping_duration_ != std::chrono::steady_clock::duration::zero()) && base::connected() && (ping == std::chrono::steady_clock::duration::zero())) {
382  tim_ping_.cancel();
383  }
384  keep_alive_sec_ = keep_alive_sec;
385  ping_duration_ = force_move(ping);
386  }
387 
403  void set_keep_alive_sec_ping_ms(std::uint16_t keep_alive_sec, std::size_t ping_ms) {
404  set_keep_alive_sec(keep_alive_sec, std::chrono::milliseconds(ping_ms));
405  }
406 
416  void set_keep_alive_sec(std::uint16_t keep_alive_sec) {
417  set_keep_alive_sec(keep_alive_sec, std::chrono::seconds(keep_alive_sec / 2));
418  }
419 
420 
426  void connect(any session_life_keeper = any()) {
427  connect(v5::properties{}, force_move(session_life_keeper));
428  }
429 
436  void connect(boost::system::error_code& ec, any session_life_keeper = any()) {
437  connect(v5::properties{}, ec, force_move(session_life_keeper));
438  }
439 
448  void connect(v5::properties props, any session_life_keeper = any()) {
449  setup_socket(socket_);
450  connect_impl(force_move(props), force_move(session_life_keeper));
451  }
452 
462  void connect(
463  v5::properties props,
465  any session_life_keeper = any()) {
466  setup_socket(socket_);
467  connect_impl(force_move(props), force_move(session_life_keeper), ec);
468  }
469 
477  void connect(std::shared_ptr<Socket>&& socket, any session_life_keeper = any()) {
478  connect(force_move(socket), v5::properties{}, force_move(session_life_keeper));
479  }
480 
489  void connect(
490  std::shared_ptr<Socket>&& socket,
492  any session_life_keeper = any()) {
493  connect(force_move(socket), v5::properties{}, ec, force_move(session_life_keeper));
494  }
495 
506  void connect(
507  std::shared_ptr<Socket>&& socket,
508  v5::properties props,
509  any session_life_keeper = any()) {
510  socket_ = force_move(socket);
511  base::socket_sp_ref() = socket_;
512  connect_impl(force_move(props), force_move(session_life_keeper));
513  }
514 
526  void connect(
527  std::shared_ptr<Socket>&& socket,
528  v5::properties props,
530  any session_life_keeper = any()) {
531  socket_ = force_move(socket);
532  base::socket_sp_ref() = socket_;
533  connect_impl(force_move(props), force_move(session_life_keeper), ec);
534  }
535 
540  void async_connect() {
541  async_connect(any(), async_handler_t());
542  }
543 
549  template <typename T>
550  // to avoid ambiguousness between any and async_handler_t
551  std::enable_if_t<
552  !std::is_convertible<T, async_handler_t>::value
553  >
554  async_connect(T session_life_keeper) {
555  async_connect(force_move(session_life_keeper), async_handler_t());
556  }
557 
564  async_connect(any(), force_move(func));
565  }
566 
573  void async_connect(any session_life_keeper, async_handler_t func) {
574  async_connect(v5::properties{}, force_move(session_life_keeper), force_move(func));
575  }
576 
585  template <typename T>
586  // to avoid ambiguousness between any and async_handler_t
587  std::enable_if_t<
588  !std::is_convertible<T, async_handler_t>::value
589  >
590  async_connect(v5::properties props, T session_life_keeper) {
591  async_connect(force_move(props), force_move(session_life_keeper), async_handler_t());
592  }
593 
603  async_connect(force_move(props), any(), force_move(func));
604  }
605 
615  void async_connect(v5::properties props, any session_life_keeper, async_handler_t func) {
616  setup_socket(socket_);
617  async_connect_impl(force_move(props), force_move(session_life_keeper), force_move(func));
618  }
619 
626  void async_connect(std::shared_ptr<Socket>&& socket) {
628  }
629 
637  template <typename T>
638  // to avoid ambiguousness between any and async_handler_t
639  std::enable_if_t<
640  !std::is_convertible<T, async_handler_t>::value
641  >
642  async_connect(std::shared_ptr<Socket>&& socket, T session_life_keeper) {
644  }
645 
653  void async_connect(std::shared_ptr<Socket>&& socket, async_handler_t func) {
655  }
656 
665  void async_connect(std::shared_ptr<Socket>&& socket, any session_life_keeper, async_handler_t func) {
666  async_connect(force_move(socket), v5::properties{}, force_move(session_life_keeper), force_move(func));
667  }
668 
678  void async_connect(std::shared_ptr<Socket>&& socket, v5::properties props) {
680  }
681 
692  template <typename T>
693  // to avoid ambiguousness between any and async_handler_t
694  std::enable_if_t<
695  !std::is_convertible<T, async_handler_t>::value
696  >
697  async_connect(std::shared_ptr<Socket>&& socket, v5::properties props, T session_life_keeper) {
698  async_connect(force_move(socket), force_move(props), force_move(session_life_keeper), async_handler_t());
699  }
700 
711  void async_connect(std::shared_ptr<Socket>&& socket, v5::properties props, async_handler_t func) {
712  async_connect(force_move(socket), force_move(props), any(), force_move(func));
713  }
714 
726  void async_connect(std::shared_ptr<Socket>&& socket, v5::properties props, any session_life_keeper, async_handler_t func) {
727  socket_ = force_move(socket);
728  base::socket_sp_ref() = socket_;
729  async_connect_impl(force_move(props), force_move(session_life_keeper), force_move(func));
730  }
731 
749  std::chrono::steady_clock::duration timeout,
751  v5::properties props = {}
752  ) {
753  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
754  if (base::connected()) {
755  std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
756  tim_close_.expires_after(force_move(timeout));
757  tim_close_.async_wait(
758  [wp = force_move(wp)](error_code ec) {
759  if (auto sp = wp.lock()) {
760  if (!ec) {
761  sp->socket()->post(
762  [sp] {
763  sp->force_disconnect();
764  }
765  );
766  }
767  }
768  }
769  );
770  base::disconnect(reason_code, force_move(props));
771  }
772  }
773 
791  v5::properties props = {}
792  ) {
793  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
794  if (base::connected()) {
795  base::disconnect(reason_code, force_move(props));
796  }
797  }
798 
809  std::chrono::steady_clock::duration timeout,
810  async_handler_t func = async_handler_t()) {
811  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
812  if (base::connected()) {
813  std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
814  tim_close_.expires_after(force_move(timeout));
815  tim_close_.async_wait(
816  [wp = force_move(wp)](error_code ec) {
817  if (auto sp = wp.lock()) {
818  if (!ec) {
819  sp->socket()->post(
820  [sp] {
821  sp->force_disconnect();
822  }
823  );
824  }
825  }
826  }
827  );
828  base::async_disconnect(force_move(func));
829  }
830  }
831 
850  std::chrono::steady_clock::duration timeout,
851  v5::disconnect_reason_code reason_code,
852  v5::properties props,
853  async_handler_t func = async_handler_t()) {
854  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
855  if (base::connected()) {
856  std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
857  tim_close_.expires_after(force_move(timeout));
858  tim_close_.async_wait(
859  [wp = force_move(wp)](error_code ec) {
860  if (auto sp = wp.lock()) {
861  if (!ec) {
862  sp->socket()->post(
863  [sp] {
864  sp->force_disconnect();
865  }
866  );
867  }
868  }
869  }
870  );
871  base::async_disconnect(reason_code, force_move(props), force_move(func));
872  }
873  }
874 
884  async_handler_t func = async_handler_t()) {
885  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
886  if (base::connected()) {
887  base::async_disconnect(force_move(func));
888  }
889  }
890 
908  v5::disconnect_reason_code reason_code,
909  v5::properties props,
910  async_handler_t func = async_handler_t()) {
911  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
912  if (base::connected()) {
913  base::async_disconnect(reason_code, force_move(props), force_move(func));
914  }
915  }
916 
923  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
924  tim_close_.cancel();
925  base::force_disconnect();
926  }
927 
932  void set_async_pingreq(bool b) {
933  async_pingreq_ = b;
934  }
935 
936  std::shared_ptr<Socket> const& socket() const {
937  return socket_;
938  }
939 
940  std::shared_ptr<Socket>& socket() {
941  return socket_;
942  }
943 
944 protected:
945  client(as::io_context& ioc,
946  std::string host,
947  std::string port
948 #if defined(MQTT_USE_WS)
949  ,
950  std::string path = "/"
951 #endif // defined(MQTT_USE_WS)
952  ,
953  protocol_version version = protocol_version::v3_1_1,
954  bool async_store_send = false
955  )
956  :base(ioc, version, async_store_send),
957  ioc_(ioc),
958  tim_ping_(ioc_),
959  tim_close_(ioc_),
960  host_(force_move(host)),
961  port_(force_move(port))
962 #if defined(MQTT_USE_WS)
963  ,
964  path_(force_move(path))
965 #endif // defined(MQTT_USE_WS)
966  {
967 #if defined(MQTT_USE_TLS)
968  ctx_.set_verify_mode(tls::verify_peer);
969 #endif // defined(MQTT_USE_TLS)
970  }
971 
972 private:
973  template <typename Strand>
974  void setup_socket(std::shared_ptr<tcp_endpoint<as::ip::tcp::socket, Strand>>& socket) {
975  socket = std::make_shared<Socket>(ioc_);
976  base::socket_sp_ref() = socket;
977  }
978 
979 #if defined(MQTT_USE_WS)
980  template <typename Strand>
981  void setup_socket(std::shared_ptr<ws_endpoint<as::ip::tcp::socket, Strand>>& socket) {
982  socket = std::make_shared<Socket>(ioc_);
983  base::socket_sp_ref() = socket;
984  }
985 #endif // defined(MQTT_USE_WS)
986 
987 #if defined(MQTT_USE_TLS)
988  template <typename Strand>
989  void setup_socket(std::shared_ptr<tcp_endpoint<tls::stream<as::ip::tcp::socket>, Strand>>& socket) {
990  socket = std::make_shared<Socket>(ioc_, ctx_);
991  base::socket_sp_ref() = socket;
992  }
993 
994 #if defined(MQTT_USE_WS)
995  template <typename Strand>
996  void setup_socket(std::shared_ptr<ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>>& socket) {
997  socket = std::make_shared<Socket>(ioc_, ctx_);
998  base::socket_sp_ref() = socket;
999  }
1000 #endif // defined(MQTT_USE_WS)
1001 
1002 #endif // defined(MQTT_USE_TLS)
1003 
1004  void start_session(v5::properties props, any session_life_keeper) {
1005  base::async_read_control_packet_type(force_move(session_life_keeper));
1006  // sync base::connect() refer to parameters only in the function.
1007  // So they can be passed as view.
1008  base::connect(
1009  buffer(string_view(client_id_)),
1010  ( user_name_ ? buffer(string_view(user_name_.value()))
1011  : optional<buffer>() ),
1012  ( password_ ? buffer(string_view(password_.value()))
1013  : optional<buffer>() ),
1014  will_,
1015  keep_alive_sec_,
1016  force_move(props)
1017  );
1018  }
1019 
1020  void async_start_session(v5::properties props, any session_life_keeper, async_handler_t func) {
1021  base::async_read_control_packet_type(force_move(session_life_keeper));
1022  // sync base::connect() refer to parameters only in the function.
1023  // So they can be passed as view.
1024  base::async_connect(
1025  buffer(string_view(client_id_)),
1026  ( user_name_ ? buffer(string_view(user_name_.value()))
1027  : optional<buffer>() ),
1028  ( password_ ? buffer(string_view(password_.value()))
1029  : optional<buffer>() ),
1030  will_,
1031  keep_alive_sec_,
1032  force_move(props),
1033  force_move(func)
1034  );
1035  }
1036 
1037  template <typename Strand>
1038  void handshake_socket(
1039  tcp_endpoint<as::ip::tcp::socket, Strand>&,
1040  v5::properties props,
1041  any session_life_keeper) {
1042  start_session(force_move(props), force_move(session_life_keeper));
1043  }
1044 
1045  template <typename Strand>
1046  void handshake_socket(
1047  tcp_endpoint<as::ip::tcp::socket, Strand>&,
1048  v5::properties props,
1049  any session_life_keeper,
1051  start_session(force_move(props), force_move(session_life_keeper));
1052  ec = boost::system::errc::make_error_code(boost::system::errc::success);
1053  }
1054 
1055 #if defined(MQTT_USE_WS)
1056 
1057  template <typename Strand>
1058  void handshake_socket(
1059  ws_endpoint<as::ip::tcp::socket, Strand>& socket,
1060  v5::properties props,
1061  any session_life_keeper) {
1062  socket.handshake(host_, path_);
1063  start_session(force_move(props), force_move(session_life_keeper));
1064  }
1065 
1066  template <typename Strand>
1067  void handshake_socket(
1068  ws_endpoint<as::ip::tcp::socket, Strand>& socket,
1069  v5::properties props,
1070  any session_life_keeper,
1072  socket.handshake(host_, path_, ec);
1073  if (ec) return;
1074  start_session(force_move(props), force_move(session_life_keeper));
1075  }
1076 
1077 #endif // defined(MQTT_USE_WS)
1078 
1079 #if defined(MQTT_USE_TLS)
1080 
1081  template <typename Strand>
1082  void handshake_socket(
1083  tcp_endpoint<tls::stream<as::ip::tcp::socket>, Strand>& socket,
1084  v5::properties props,
1085  any session_life_keeper) {
1086  socket.handshake(tls::stream_base::client);
1087  start_session(force_move(props), force_move(session_life_keeper));
1088  }
1089 
1090  template <typename Strand>
1091  void handshake_socket(
1092  tcp_endpoint<tls::stream<as::ip::tcp::socket>, Strand>& socket,
1093  v5::properties props,
1094  any session_life_keeper,
1096  socket.handshake(tls::stream_base::client, ec);
1097  if (ec) return;
1098  start_session(force_move(props), force_move(session_life_keeper));
1099  }
1100 
1101 #if defined(MQTT_USE_WS)
1102 
1103  template <typename Strand>
1104  void handshake_socket(
1105  ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>& socket,
1106  v5::properties props,
1107  any session_life_keeper) {
1108  socket.next_layer().handshake(tls::stream_base::client);
1109  socket.handshake(host_, path_);
1110  start_session(force_move(props), force_move(session_life_keeper));
1111  }
1112 
1113  template <typename Strand>
1114  void handshake_socket(
1115  ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>& socket,
1116  v5::properties props,
1117  any session_life_keeper,
1119  socket.next_layer().handshake(tls::stream_base::client, ec);
1120  if (ec) return;
1121  socket.handshake(host_, path_, ec);
1122  if (ec) return;
1123  start_session(force_move(props), force_move(session_life_keeper));
1124  }
1125 
1126 #endif // defined(MQTT_USE_WS)
1127 
1128 #endif // defined(MQTT_USE_TLS)
1129 
1130  template <typename Strand>
1131  void async_handshake_socket(
1132  tcp_endpoint<as::ip::tcp::socket, Strand>&,
1133  v5::properties props,
1134  any session_life_keeper,
1135  async_handler_t func) {
1136  async_start_session(force_move(props), force_move(session_life_keeper), force_move(func));
1137  }
1138 
1139 #if defined(MQTT_USE_WS)
1140  template <typename Strand>
1141  void async_handshake_socket(
1142  ws_endpoint<as::ip::tcp::socket, Strand>& socket,
1143  v5::properties props,
1144  any session_life_keeper,
1145  async_handler_t func) {
1146  socket.async_handshake(
1147  host_,
1148  path_,
1149  [
1150  this,
1151  self = this->shared_from_this(),
1152  session_life_keeper = force_move(session_life_keeper),
1153  props = force_move(props),
1154  func = force_move(func)
1155  ]
1156  (error_code ec) mutable {
1157  if (ec) {
1158  if (func) func(ec);
1159  return;
1160  }
1161  async_start_session(force_move(props), force_move(session_life_keeper), force_move(func));
1162  });
1163  }
1164 #endif // defined(MQTT_USE_WS)
1165 
1166 #if defined(MQTT_USE_TLS)
1167 
1168  template <typename Strand>
1169  void async_handshake_socket(
1170  tcp_endpoint<tls::stream<as::ip::tcp::socket>, Strand>& socket,
1171  v5::properties props,
1172  any session_life_keeper,
1173  async_handler_t func) {
1174  socket.async_handshake(
1175  tls::stream_base::client,
1176  [
1177  this,
1178  self = this->shared_from_this(),
1179  session_life_keeper = force_move(session_life_keeper),
1180  props = force_move(props),
1181  func = force_move(func)
1182  ]
1183  (error_code ec) mutable {
1184  if (ec) {
1185  if (func) func(ec);
1186  return;
1187  }
1188  async_start_session(force_move(props), force_move(session_life_keeper), force_move(func));
1189  });
1190  }
1191 
1192 #if defined(MQTT_USE_WS)
1193  template <typename Strand>
1194  void async_handshake_socket(
1195  ws_endpoint<tls::stream<as::ip::tcp::socket>, Strand>& socket,
1196  v5::properties props,
1197  any session_life_keeper,
1198  async_handler_t func) {
1199  socket.next_layer().async_handshake(
1200  tls::stream_base::client,
1201  [
1202  this,
1203  self = this->shared_from_this(),
1204  session_life_keeper = force_move(session_life_keeper),
1205  &socket,
1206  props = force_move(props),
1207  func = force_move(func)
1208  ]
1209  (error_code ec) mutable {
1210  if (ec) {
1211  if (func) func(ec);
1212  return;
1213  }
1214  socket.async_handshake(
1215  host_,
1216  path_,
1217  [
1218  this,
1219  self,
1220  session_life_keeper = force_move(session_life_keeper),
1221  props = force_move(props),
1222  func = force_move(func)
1223  ]
1224  (error_code ec) mutable {
1225  if (ec) {
1226  if (func) func(ec);
1227  return;
1228  }
1229  async_start_session(force_move(props), force_move(session_life_keeper), force_move(func));
1230  });
1231  });
1232  }
1233 #endif // defined(MQTT_USE_WS)
1234 
1235 #endif // defined(MQTT_USE_TLS)
1236 
1237  void connect_impl(
1238  v5::properties props,
1239  any session_life_keeper) {
1240  as::ip::tcp::resolver r(ioc_);
1241  auto eps = r.resolve(host_, port_);
1242  as::connect(socket_->lowest_layer(), eps.begin(), eps.end());
1243  base::set_connect();
1244  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
1245  set_timer();
1246  }
1247  handshake_socket(*socket_, force_move(props), force_move(session_life_keeper));
1248  }
1249 
1250  void connect_impl(
1251  v5::properties props,
1252  any session_life_keeper,
1254  as::ip::tcp::resolver r(ioc_);
1255  auto eps = r.resolve(host_, port_, ec);
1256  if (ec) return;
1257  as::connect(socket_->lowest_layer(), eps.begin(), eps.end(), ec);
1258  if (ec) return;
1259  base::set_connect();
1260  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
1261  set_timer();
1262  }
1263  handshake_socket(*socket_, force_move(props), force_move(session_life_keeper), ec);
1264  }
1265 
1266  void async_connect_impl(
1267  v5::properties props,
1268  any session_life_keeper,
1269  async_handler_t func) {
1270  auto r = std::make_shared<as::ip::tcp::resolver>(ioc_);
1271  auto p = r.get();
1272  p->async_resolve(
1273  host_,
1274  port_,
1275  [
1276  this,
1277  self = this->shared_from_this(),
1278  props = force_move(props),
1279  session_life_keeper = force_move(session_life_keeper),
1280  func = force_move(func),
1281  r = force_move(r)
1282  ]
1283  (
1284  error_code ec,
1285  as::ip::tcp::resolver::results_type eps
1286  ) mutable {
1287  if (ec) {
1288  if (func) func(ec);
1289  return;
1290  }
1291  as::async_connect(
1292  socket_->lowest_layer(), eps.begin(), eps.end(),
1293  [
1294  this,
1295  self = force_move(self),
1296  props = force_move(props),
1297  session_life_keeper = force_move(session_life_keeper),
1298  func = force_move(func)
1299  ]
1300  (error_code ec, auto /* unused */) mutable {
1301  if (ec) {
1302  if (func) func(ec);
1303  return;
1304  }
1305  base::set_connect();
1306  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
1307  set_timer();
1308  }
1309  async_handshake_socket(*socket_, force_move(props), force_move(session_life_keeper), force_move(func));
1310  }
1311  );
1312  }
1313  );
1314  }
1315 
1316 protected:
1317  void on_pre_send() noexcept override {
1318  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) {
1319  reset_timer();
1320  }
1321  }
1322 
1323 private:
1324  void handle_timer(error_code ec) {
1325  if (!ec) {
1326  if (async_pingreq_) {
1327  base::async_pingreq();
1328  }
1329  else {
1330  base::pingreq();
1331  }
1332  }
1333  }
1334 
1335  void set_timer() {
1336  tim_ping_.expires_after(ping_duration_);
1337  std::weak_ptr<this_type> wp(std::static_pointer_cast<this_type>(this->shared_from_this()));
1338  tim_ping_.async_wait(
1339  [wp = force_move(wp)](error_code ec) {
1340  if (auto sp = wp.lock()) {
1341  sp->handle_timer(ec);
1342  }
1343  }
1344  );
1345  }
1346 
1347  void reset_timer() {
1348  tim_ping_.cancel();
1349  set_timer();
1350  }
1351 
1352 protected:
1353  void on_close() noexcept override {
1354  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
1355  }
1356 
1357  void on_error(error_code ec) noexcept override {
1358  (void)ec;
1359  if (ping_duration_ != std::chrono::steady_clock::duration::zero()) tim_ping_.cancel();
1360  }
1361 
1362  // Ensure that only code that knows the *exact* type of an object
1363  // inheriting from this abstract base class can destruct it.
1364  // This avoids issues of the destructor not triggering destruction
1365  // of derived classes, and any member variables contained in them.
1366  // Note: Not virtual to avoid need for a vtable when possible.
1367  ~client() = default;
1368 
1369 private:
1370 
1371 #if defined(MQTT_USE_TLS)
1372 
1373  template <typename T>
1374  struct has_tls : std::false_type {
1375  };
1376 
1377  template <typename U>
1378  struct has_tls<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, U>>> : std::true_type {
1379  };
1380 
1381 #if defined(MQTT_USE_WS)
1382 
1383  template <typename U>
1384  struct has_tls<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, U>>> : std::true_type {
1385  };
1386 
1387 #endif // defined(MQTT_USE_WS)
1388 
1389 #endif // defined(MQTT_USE_TLS)
1390 
1391  std::shared_ptr<Socket> socket_;
1392  as::io_context& ioc_;
1393  as::steady_timer tim_ping_;
1394  as::steady_timer tim_close_;
1395  std::string host_;
1396  std::string port_;
1397  std::uint16_t keep_alive_sec_{0};
1398  std::chrono::steady_clock::duration ping_duration_{std::chrono::steady_clock::duration::zero()};
1399  std::string client_id_;
1400  optional<will> will_;
1401  optional<std::string> user_name_;
1402  optional<std::string> password_;
1403  bool async_pingreq_ = false;
1404 #if defined(MQTT_USE_TLS)
1405  tls::context ctx_{tls::context::tlsv12};
1406 #endif // defined(MQTT_USE_TLS)
1407 #if defined(MQTT_USE_WS)
1408  std::string path_;
1409 #endif // defined(MQTT_USE_WS)
1410 };
1411 
1412 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
1413 make_client(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1415  return std::make_shared<callable_overlay<client_t>>(
1416  client_t::constructor_access(),
1417  ioc,
1418  force_move(host),
1419  force_move(port),
1420 #if defined(MQTT_USE_WS)
1421  "/",
1422 #endif // defined(MQTT_USE_WS)
1423  version
1424  );
1425 }
1426 
1427 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
1428 make_client(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1429  return make_client(
1430  ioc,
1431  force_move(host),
1432  std::to_string(port),
1433  version
1434  );
1435 }
1436 
1437 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, null_strand>>>>
1438 make_client_no_strand(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1440  return std::make_shared<callable_overlay<client_t>>(
1441  client_t::constructor_access(),
1442  ioc,
1443  force_move(host),
1444  force_move(port),
1445 #if defined(MQTT_USE_WS)
1446  "/",
1447 #endif // defined(MQTT_USE_WS)
1448  version
1449  );
1450 }
1451 
1452 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, null_strand>>>>
1453 make_client_no_strand(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1454  return make_client_no_strand(
1455  ioc,
1456  force_move(host),
1457  std::to_string(port),
1458  version
1459  );
1460 }
1461 
1462 #if defined(MQTT_USE_WS)
1463 
1464 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
1465 make_client_ws(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1466  using client_t = client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>>;
1467  return std::make_shared<callable_overlay<client_t>>(
1468  client_t::constructor_access(),
1469  ioc,
1470  force_move(host),
1471  force_move(port),
1472  force_move(path),
1473  version
1474  );
1475 }
1476 
1477 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
1478 make_client_ws(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1479  return make_client_ws(
1480  ioc,
1481  force_move(host),
1482  std::to_string(port),
1483  force_move(path),
1484  version
1485  );
1486 }
1487 
1488 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, null_strand>>>>
1489 make_client_no_strand_ws(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1490  using client_t = client<ws_endpoint<as::ip::tcp::socket, null_strand>>;
1491  return std::make_shared<callable_overlay<client_t>>(
1492  client_t::constructor_access(),
1493  ioc,
1494  force_move(host),
1495  force_move(port),
1496  force_move(path),
1497  version
1498  );
1499 }
1500 
1501 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, null_strand>>>>
1502 make_client_no_strand_ws(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1503  return make_client_no_strand_ws(
1504  ioc,
1505  force_move(host),
1506  std::to_string(port),
1507  force_move(path),
1508  version
1509  );
1510 }
1511 
1512 #endif // defined(MQTT_USE_WS)
1513 
1514 #if defined(MQTT_USE_TLS)
1515 
1516 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>>>
1517 make_tls_client(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1518  using client_t = client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>;
1519  return std::make_shared<callable_overlay<client_t>>(
1520  client_t::constructor_access(),
1521  ioc,
1522  force_move(host),
1523  force_move(port),
1524 #if defined(MQTT_USE_WS)
1525  "/",
1526 #endif // defined(MQTT_USE_WS)
1527  version
1528  );
1529 }
1530 
1531 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>>>
1532 make_tls_client(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1533  return make_tls_client(
1534  ioc,
1535  force_move(host),
1536  std::to_string(port),
1537  version
1538  );
1539 }
1540 
1541 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>>>
1542 make_tls_client_no_strand(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1543  using client_t = client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>;
1544  return std::make_shared<callable_overlay<client_t>>(
1545  client_t::constructor_access(),
1546  ioc,
1547  force_move(host),
1548  force_move(port),
1549 #if defined(MQTT_USE_WS)
1550  "/",
1551 #endif // defined(MQTT_USE_WS)
1552  version
1553  );
1554 }
1555 
1556 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>>>
1557 make_tls_client_no_strand(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1558  return make_tls_client_no_strand(
1559  ioc,
1560  force_move(host),
1561  std::to_string(port),
1562  version
1563  );
1564 }
1565 
1566 #if defined(MQTT_USE_WS)
1567 
1568 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>>>
1569 make_tls_client_ws(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1570  using client_t = client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>;
1571  return std::make_shared<callable_overlay<client_t>>(
1572  client_t::constructor_access(),
1573  ioc,
1574  force_move(host),
1575  force_move(port),
1576  force_move(path),
1577  version
1578  );
1579 }
1580 
1581 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>>>>
1582 make_tls_client_ws(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1583  return make_tls_client_ws(
1584  ioc,
1585  force_move(host),
1586  std::to_string(port),
1587  force_move(path),
1588  version
1589  );
1590 }
1591 
1592 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>>>
1593 make_tls_client_no_strand_ws(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1594  using client_t = client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>;
1595  return std::make_shared<callable_overlay<client_t>>(
1596  client_t::constructor_access(),
1597  ioc,
1598  force_move(host),
1599  force_move(port),
1600  force_move(path),
1601  version
1602  );
1603 }
1604 
1605 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>>>>
1606 make_tls_client_no_strand_ws(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1607  return make_tls_client_no_strand_ws(
1608  ioc,
1609  force_move(host),
1610  std::to_string(port),
1611  force_move(path),
1612  version
1613  );
1614 }
1615 
1616 #endif // defined(MQTT_USE_WS)
1617 
1618 #endif // defined(MQTT_USE_TLS)
1619 
1620 
1621 // 32bit Packet Id (experimental)
1622 
1623 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>, 4>>>
1624 make_client_32(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1626  return std::make_shared<callable_overlay<client_t>>(
1627  client_t::constructor_access(),
1628  ioc,
1629  force_move(host),
1630  force_move(port),
1631 #if defined(MQTT_USE_WS)
1632  "/",
1633 #endif // defined(MQTT_USE_WS)
1634  version
1635  );
1636 }
1637 
1638 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>, 4>>>
1639 make_client_32(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1640  return make_client_32(
1641  ioc,
1642  force_move(host),
1643  std::to_string(port),
1644  version
1645  );
1646 }
1647 
1648 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, null_strand>, 4>>>
1649 make_client_no_strand_32(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1651  return std::make_shared<callable_overlay<client_t>>(
1652  client_t::constructor_access(),
1653  ioc,
1654  force_move(host),
1655  force_move(port),
1656 #if defined(MQTT_USE_WS)
1657  "/",
1658 #endif // defined(MQTT_USE_WS)
1659  version
1660  );
1661 }
1662 
1663 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<as::ip::tcp::socket, null_strand>, 4>>>
1664 make_client_no_strand_32(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1665  return make_client_no_strand_32(
1666  ioc,
1667  force_move(host),
1668  std::to_string(port),
1669  version
1670  );
1671 }
1672 
1673 #if defined(MQTT_USE_WS)
1674 
1675 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>, 4>>>
1676 make_client_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1677  using client_t = client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>, 4>;
1678  return std::make_shared<callable_overlay<client_t>>(
1679  client_t::constructor_access(),
1680  ioc,
1681  force_move(host),
1682  force_move(port),
1683  force_move(path),
1684  version
1685  );
1686 }
1687 
1688 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, as::io_context::strand>, 4>>>
1689 make_client_ws_32(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1690  return make_client_ws_32(
1691  ioc,
1692  force_move(host),
1693  std::to_string(port),
1694  force_move(path),
1695  version
1696  );
1697 }
1698 
1699 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, null_strand>, 4>>>
1700 make_client_no_strand_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1701  using client_t = client<ws_endpoint<as::ip::tcp::socket, null_strand>, 4>;
1702  return std::make_shared<callable_overlay<client_t>>(
1703  client_t::constructor_access(),
1704  ioc,
1705  force_move(host),
1706  force_move(port),
1707  force_move(path),
1708  version
1709  );
1710 }
1711 
1712 inline std::shared_ptr<callable_overlay<client<ws_endpoint<as::ip::tcp::socket, null_strand>, 4>>>
1713 make_client_no_strand_ws_32(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1714  return make_client_no_strand_ws_32(
1715  ioc,
1716  force_move(host),
1717  std::to_string(port),
1718  force_move(path),
1719  version
1720  );
1721 }
1722 
1723 #endif // defined(MQTT_USE_WS)
1724 
1725 #if defined(MQTT_USE_TLS)
1726 
1727 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>>>
1728 make_tls_client_32(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1729  using client_t = client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>;
1730  return std::make_shared<callable_overlay<client_t>>(
1731  client_t::constructor_access(),
1732  ioc,
1733  force_move(host),
1734  force_move(port),
1735 #if defined(MQTT_USE_WS)
1736  "/",
1737 #endif // defined(MQTT_USE_WS)
1738  version
1739  );
1740 }
1741 
1742 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>>>
1743 make_tls_client_32(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1744  return make_tls_client_32(
1745  ioc,
1746  force_move(host),
1747  std::to_string(port),
1748  version
1749  );
1750 }
1751 
1752 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>>>
1753 make_tls_client_no_strand_32(as::io_context& ioc, std::string host, std::string port, protocol_version version = protocol_version::v3_1_1) {
1754  using client_t = client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>;
1755  return std::make_shared<callable_overlay<client_t>>(
1756  client_t::constructor_access(),
1757  ioc,
1758  force_move(host),
1759  force_move(port),
1760 #if defined(MQTT_USE_WS)
1761  "/",
1762 #endif // defined(MQTT_USE_WS)
1763  version
1764  );
1765 }
1766 
1767 inline std::shared_ptr<callable_overlay<client<tcp_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>>>
1768 make_tls_client_no_strand_32(as::io_context& ioc, std::string host, std::uint16_t port, protocol_version version = protocol_version::v3_1_1) {
1769  return make_tls_client_no_strand_32(
1770  ioc,
1771  force_move(host),
1772  std::to_string(port),
1773  version
1774  );
1775 }
1776 
1777 #if defined(MQTT_USE_WS)
1778 
1779 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>>>
1780 make_tls_client_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1781  using client_t = client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>;
1782  return std::make_shared<callable_overlay<client_t>>(
1783  client_t::constructor_access(),
1784  ioc,
1785  force_move(host),
1786  force_move(port),
1787  force_move(path),
1788  version
1789  );
1790 }
1791 
1792 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, as::io_context::strand>, 4>>>
1793 make_tls_client_ws_32(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1794  return make_tls_client_ws_32(
1795  ioc,
1796  force_move(host),
1797  std::to_string(port),
1798  force_move(path),
1799  version
1800  );
1801 }
1802 
1803 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>>>
1804 make_tls_client_no_strand_ws_32(as::io_context& ioc, std::string host, std::string port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1805  using client_t = client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>;
1806  return std::make_shared<callable_overlay<client_t>>(
1807  client_t::constructor_access(),
1808  ioc,
1809  force_move(host),
1810  force_move(port),
1811  force_move(path),
1812  version
1813  );
1814 }
1815 
1816 inline std::shared_ptr<callable_overlay<client<ws_endpoint<tls::stream<as::ip::tcp::socket>, null_strand>, 4>>>
1817 make_tls_client_no_strand_ws_32(as::io_context& ioc, std::string host, std::uint16_t port, std::string path = "/", protocol_version version = protocol_version::v3_1_1) {
1818  return make_tls_client_no_strand_ws_32(
1819  ioc,
1820  force_move(host),
1821  std::to_string(port),
1822  force_move(path),
1823  version
1824  );
1825 }
1826 
1827 #endif // defined(MQTT_USE_WS)
1828 
1829 #endif // defined(MQTT_USE_TLS)
1830 
1831 } // namespace MQTT_NS
1832 
1833 #endif // MQTT_CLIENT_HPP
Definition: client.hpp:36
std::shared_ptr< Socket > const & socket() const
Definition: client.hpp:936
void connect(v5::properties props, any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:448
std::enable_if_t< !std::is_convertible< T, async_handler_t >::value > async_connect(std::shared_ptr< Socket > &&socket, v5::properties props, T session_life_keeper)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:697
void set_password(std::string password)
Set password.
Definition: client.hpp:322
std::string const & get_client_id() const
Get the client id.
Definition: client.hpp:272
void connect(v5::properties props, boost::system::error_code &ec, any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:462
friend std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, as::io_context::strand > > > > make_client(as::io_context &ioc, std::string host, std::string port, protocol_version version)
Create no tls client with strand.
Definition: client.hpp:1413
void async_connect(any session_life_keeper, async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:573
friend std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, null_strand >, 4 > > > make_client_no_strand_32(as::io_context &ioc, std::string host, std::string port, protocol_version version)
Create no tls client without strand.
Definition: client.hpp:1649
client(as::io_context &ioc, std::string host, std::string port, protocol_version version=protocol_version::v3_1_1, bool async_store_send=false)
Definition: client.hpp:945
void async_disconnect(std::chrono::steady_clock::duration timeout, v5::disconnect_reason_code reason_code, v5::properties props, async_handler_t func=async_handler_t())
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: client.hpp:849
void async_connect(v5::properties props, any session_life_keeper, async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:615
void set_will(Args &&... args)
Definition: client.hpp:338
void async_connect(std::shared_ptr< Socket > &&socket)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:626
void set_async_pingreq(bool b)
Set pingreq message sending mode.
Definition: client.hpp:932
std::shared_ptr< Socket > & socket()
Definition: client.hpp:940
friend std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, as::io_context::strand >, 4 > > > make_client_32(as::io_context &ioc, std::string host, std::string port, protocol_version version)
Create no tls client with strand.
Definition: client.hpp:1624
void async_connect(std::shared_ptr< Socket > &&socket, async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:653
void set_clean_start(bool cs)
Set clean start.
Definition: client.hpp:298
void on_error(error_code ec) noexcept override
Error handler.
Definition: client.hpp:1357
void connect(std::shared_ptr< Socket > &&socket, v5::properties props, any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:506
void async_connect(std::shared_ptr< Socket > &&socket, any session_life_keeper, async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:665
void disconnect(std::chrono::steady_clock::duration timeout, v5::disconnect_reason_code reason_code=v5::disconnect_reason_code::normal_disconnection, v5::properties props={})
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: client.hpp:748
void async_connect()
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:540
~client()=default
void on_close() noexcept override
Close handler.
Definition: client.hpp:1353
void connect(std::shared_ptr< Socket > &&socket, boost::system::error_code &ec, any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:489
void async_connect(async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:563
void set_client_id(std::string id)
Set client id.
Definition: client.hpp:264
void set_keep_alive_sec_ping_ms(std::uint16_t keep_alive_sec, std::size_t ping_ms)
Set a keep alive second and a ping milli seconds.
Definition: client.hpp:403
void async_connect(std::shared_ptr< Socket > &&socket, v5::properties props, any session_life_keeper, async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:726
void set_port(std::string port)
Set port.
Definition: client.hpp:252
void set_host(std::string host)
Set host.
Definition: client.hpp:242
void on_pre_send() noexcept override
Pre-send handler This handler is called when any mqtt control packet is decided to send.
Definition: client.hpp:1317
std::enable_if_t< !std::is_convertible< T, async_handler_t >::value > async_connect(v5::properties props, T session_life_keeper)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:590
void async_connect(std::shared_ptr< Socket > &&socket, v5::properties props)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:678
typename base::async_handler_t async_handler_t
Definition: client.hpp:42
void async_disconnect(v5::disconnect_reason_code reason_code, v5::properties props, async_handler_t func=async_handler_t())
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: client.hpp:907
void force_disconnect()
Disconnect by endpoint Force disconnect. It is not a clean disconnect sequence. When the endpoint di...
Definition: client.hpp:922
void set_keep_alive_sec(std::uint16_t keep_alive_sec, std::chrono::steady_clock::duration ping)
Set a keep alive second and a ping duration.
Definition: client.hpp:380
void async_connect(std::shared_ptr< Socket > &&socket, v5::properties props, async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:711
client(constructor_access, Args &&... args)
Definition: client.hpp:48
void set_will(will w)
Set will.
Definition: client.hpp:333
void async_disconnect(async_handler_t func=async_handler_t())
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: client.hpp:883
void async_connect(v5::properties props, async_handler_t func)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:602
void set_clean_session(bool cs)
Set clean session.
Definition: client.hpp:285
friend std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, null_strand > > > > make_client_no_strand(as::io_context &ioc, std::string host, std::string port, protocol_version version)
Create no tls client without strand.
Definition: client.hpp:1438
void set_user_name(std::string name)
Set username.
Definition: client.hpp:310
std::enable_if_t< !std::is_convertible< T, async_handler_t >::value > async_connect(T session_life_keeper)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:554
void connect(any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:426
void connect(std::shared_ptr< Socket > &&socket, v5::properties props, boost::system::error_code &ec, any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:526
void connect(std::shared_ptr< Socket > &&socket, any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:477
std::enable_if_t< !std::is_convertible< T, async_handler_t >::value > async_connect(std::shared_ptr< Socket > &&socket, T session_life_keeper)
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:642
void connect(boost::system::error_code &ec, any session_life_keeper=any())
Connect to a broker Before calling connect(), call set_xxx member functions to configure the connecti...
Definition: client.hpp:436
void async_disconnect(std::chrono::steady_clock::duration timeout, async_handler_t func=async_handler_t())
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: client.hpp:808
void disconnect(v5::disconnect_reason_code reason_code=v5::disconnect_reason_code::normal_disconnection, v5::properties props={})
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: client.hpp:789
void set_keep_alive_sec(std::uint16_t keep_alive_sec)
Set a keep alive second and a ping milli seconds.
Definition: client.hpp:416
Definition: endpoint.hpp:171
bool connected() const
Check connection status.
Definition: endpoint.hpp:4710
std::function< void(error_code ec)> async_handler_t
Definition: endpoint.hpp:176
void disconnect(v5::disconnect_reason_code reason=v5::disconnect_reason_code::normal_disconnection, v5::properties props={})
Disconnect Send a disconnect packet to the connected broker. It is a clean disconnecting sequence....
Definition: endpoint.hpp:1014
std::shared_ptr< MQTT_NS::socket > & socket_sp_ref()
Get shared_ptr of socket.
Definition: endpoint.hpp:4799
Definition: type_erased_socket.hpp:22
Definition: tcp_endpoint.hpp:23
Definition: will.hpp:21
std::vector< property_variant > properties
Definition: property_variant.hpp:51
disconnect_reason_code
Definition: reason_code.hpp:114
Definition: any.hpp:27
std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, as::io_context::strand >, 4 > > > make_client_32(as::io_context &ioc, std::string host, std::uint16_t port, protocol_version version=protocol_version::v3_1_1)
Definition: client.hpp:1639
boost::string_ref string_view
Definition: string_view.hpp:64
std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, null_strand >, 4 > > > make_client_no_strand_32(as::io_context &ioc, std::string host, std::uint16_t port, protocol_version version=protocol_version::v3_1_1)
Definition: client.hpp:1664
std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, as::io_context::strand > > > > make_client(as::io_context &ioc, std::string host, std::uint16_t port, protocol_version version=protocol_version::v3_1_1)
Definition: client.hpp:1428
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
protocol_version
Definition: protocol_version.hpp:17
std::shared_ptr< callable_overlay< client< tcp_endpoint< as::ip::tcp::socket, null_strand > > > > make_client_no_strand(as::io_context &ioc, std::string host, std::uint16_t port, protocol_version version=protocol_version::v3_1_1)
Definition: client.hpp:1453
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
Definition: client.hpp:40
Definition: null_strand.hpp:20