mqtt_cpp
message.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2018
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_MESSAGE_HPP)
8 #define MQTT_MESSAGE_HPP
9 
10 #include <string>
11 #include <vector>
12 #include <memory>
13 #include <algorithm>
14 #include <numeric>
15 
16 #include <boost/asio/buffer.hpp>
17 #include <boost/container/static_vector.hpp>
18 #include <boost/numeric/conversion/cast.hpp>
19 
20 #include <mqtt/namespace.hpp>
21 #include <mqtt/two_byte_util.hpp>
22 #include <mqtt/fixed_header.hpp>
26 #include <mqtt/will.hpp>
27 #include <mqtt/connect_flags.hpp>
28 #include <mqtt/publish.hpp>
29 #include <mqtt/exception.hpp>
31 #include <mqtt/four_byte_util.hpp>
32 #include <mqtt/packet_id_type.hpp>
33 #include <mqtt/optional.hpp>
34 #include <mqtt/string_view.hpp>
35 #include <mqtt/property.hpp>
36 #include <mqtt/string_check.hpp>
37 #include <mqtt/move.hpp>
38 #include <mqtt/reason_code.hpp>
40 #include <mqtt/publish.hpp>
41 
42 namespace MQTT_NS {
43 
44 namespace as = boost::asio;
45 
46 inline namespace v3_1_1 {
47 
48 namespace detail_v3_1_1 {
49 
51 public:
55  header_only_message(control_packet_type type, std::uint8_t flags)
56  : message_ { static_cast<char>(make_fixed_header(type, flags)), 0 }
57  {}
58 
64  std::vector<as::const_buffer> const_buffer_sequence() const {
65  return { as::buffer(message_.data(), message_.size()) };
66  }
67 
72  std::size_t size() const {
73  return message_.size();
74  }
75 
80  static constexpr std::size_t num_of_const_buffer_sequence() {
81  return 1;
82  }
83 
90  std::string continuous_buffer() const {
91  return std::string(message_.data(), size());
92  }
93 private:
94  boost::container::static_vector<char, 2> message_;
95 };
96 
97 
98 template <std::size_t PacketIdBytes>
99 class basic_header_packet_id_message;
100 
101 
102 
103 template <std::size_t PacketIdBytes>
105 public:
110  : message_ { static_cast<char>(make_fixed_header(type, flags)), PacketIdBytes }
111  {
112  add_packet_id_to_buf<PacketIdBytes>::apply(message_, packet_id);
113  }
114 
115  template <typename Iterator>
116  basic_header_packet_id_message(Iterator b, Iterator e) {
117  if (std::distance(b, e) != 2 + PacketIdBytes) throw remaining_length_error();
118  if (b[1] != PacketIdBytes) throw remaining_length_error();
119 
120  std::copy(b, e, std::back_inserter(message_));
121  }
122 
128  std::vector<as::const_buffer> const_buffer_sequence() const {
129  return { as::buffer(message_.data(), size()) };
130  }
131 
136  std::size_t size() const {
137  return message_.size();
138  }
139 
144  static constexpr std::size_t num_of_const_buffer_sequence() {
145  return 1;
146  }
147 
154  std::string continuous_buffer() const {
155  return std::string(message_.data(), size());
156  }
157 protected:
158  boost::container::static_vector<char, 2 + PacketIdBytes> const& message() const {
159  return message_;
160  }
161 
162 private:
163  boost::container::static_vector<char, 2 + PacketIdBytes> message_;
164 };
165 
166 } // namespace detail_v3_1_1
167 
168 template <std::size_t PacketIdBytes>
169 struct basic_puback_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
172  : base(control_packet_type::puback, 0b0000, packet_id)
173  {}
174 };
175 
177 
178 template <std::size_t PacketIdBytes>
179 struct basic_pubrec_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
182  : base(control_packet_type::pubrec, 0b0000, packet_id)
183  {}
184 };
185 
187 
188 template <std::size_t PacketIdBytes>
189 struct basic_pubrel_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
193  {
194  }
195 
197  : base(buf.begin(), buf.end())
198  {
199  }
200 
206  return make_packet_id<PacketIdBytes>::apply(std::next(base::message().begin(), 2), base::message().end());
207  }
208 };
209 
212 
213 template <std::size_t PacketIdBytes>
214 struct basic_pubcomp_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
216  : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes>(control_packet_type::pubcomp, 0b0000, packet_id)
217  {}
218 };
219 
221 
222 template <std::size_t PacketIdBytes>
223 struct basic_unsuback_message : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes> {
225  : detail_v3_1_1::basic_header_packet_id_message<PacketIdBytes>(control_packet_type::unsuback, 0b0000, packet_id)
226  {}
227 };
228 
230 
231 struct pingreq_message : detail_v3_1_1::header_only_message {
233  : detail_v3_1_1::header_only_message(control_packet_type::pingreq, 0b0000)
234  {}
235 };
236 
237 struct pingresp_message : detail_v3_1_1::header_only_message {
239  : detail_v3_1_1::header_only_message(control_packet_type::pingresp, 0b0000)
240  {}
241 };
242 
243 struct disconnect_message : detail_v3_1_1::header_only_message {
245  : detail_v3_1_1::header_only_message(control_packet_type::disconnect, 0b0000)
246  {}
247 };
248 
250 public:
251  connack_message(bool session_present, connect_return_code return_code)
252  : message_ {
253  static_cast<char>(make_fixed_header(control_packet_type::connack, 0b0000)),
254  0b0010,
255  static_cast<char>(session_present ? 1 : 0),
256  static_cast<char>(return_code)
257  }
258  {}
259 
265  std::vector<as::const_buffer> const_buffer_sequence() const {
266  return { as::buffer(message_.data(), size()) };
267  }
268 
273  std::size_t size() const {
274  return message_.size();
275  }
276 
281  static constexpr std::size_t num_of_const_buffer_sequence() {
282  return 1;
283  }
284 
291  std::string continuous_buffer() const {
292  return std::string(message_.data(), size());
293  }
294 
295 private:
296  boost::container::static_vector<char, 4> message_;
297 };
298 
299 // variable length messages
300 
302 public:
304  std::uint16_t keep_alive_sec,
305  buffer client_id,
306  bool clean_session,
307  optional<will> w,
308  optional<buffer> user_name,
309  optional<buffer> password
310  )
311  : fixed_header_(static_cast<char>(make_fixed_header(control_packet_type::connect, 0b0000))),
312  connect_flags_(0),
313  // protocol name length, protocol name, protocol level, connect flag, client id length, client id, keep alive
314  remaining_length_(
315  2 + // protocol name length
316  4 + // protocol name
317  1 + // protocol level
318  1 + // connect flag
319  2 + // keep alive
320  2 + // client id length
321  client_id.size() // client id
322  ),
323  protocol_name_and_level_ { 0x00, 0x04, 'M', 'Q', 'T', 'T', 0x04 },
324  client_id_(force_move(client_id)),
325  client_id_length_buf_{ num_to_2bytes(boost::numeric_cast<std::uint16_t>(client_id_.size())) },
326  keep_alive_buf_ { num_to_2bytes(keep_alive_sec) }
327  {
328  utf8string_check(client_id_);
329  if (clean_session) connect_flags_ |= connect_flags::clean_session;
330  if (user_name) {
331  utf8string_check(user_name.value());
332  connect_flags_ |= connect_flags::user_name_flag;
333  user_name_ = force_move(user_name.value());
334  add_uint16_t_to_buf(user_name_length_buf_, boost::numeric_cast<std::uint16_t>(user_name_.size()));
335 
336  remaining_length_ += 2 + user_name_.size();
337  }
338  if (password) {
339  connect_flags_ |= connect_flags::password_flag;
340  password_ = force_move(password.value());
341  add_uint16_t_to_buf(password_length_buf_, boost::numeric_cast<std::uint16_t>(password_.size()));
342 
343  remaining_length_ += 2 + password_.size();
344  }
345  if (w) {
346  connect_flags_ |= connect_flags::will_flag;
347  if (w.value().get_retain() == retain::yes) connect_flags_ |= connect_flags::will_retain;
348  connect_flags::set_will_qos(connect_flags_, w.value().get_qos());
349 
350  utf8string_check(w.value().topic());
351  will_topic_name_ = force_move(w.value().topic());
353  will_topic_name_length_buf_,
354  boost::numeric_cast<std::uint16_t>(will_topic_name_.size())
355  );
356  if (w.value().message().size() > 0xffffL) throw will_message_length_error();
357  will_message_ = force_move(w.value().message());
359  will_message_length_buf_,
360  boost::numeric_cast<std::uint16_t>(will_message_.size()));
361 
362  remaining_length_ += 2 + will_topic_name_.size() + 2 + will_message_.size();
363  }
364 
365  auto rb = remaining_bytes(remaining_length_);
366  for (auto e : rb) {
367  remaining_length_buf_.push_back(e);
368  }
369  }
370 
376  std::vector<as::const_buffer> const_buffer_sequence() const {
377  std::vector<as::const_buffer> ret;
378  ret.reserve(num_of_const_buffer_sequence());
379 
380  ret.emplace_back(as::buffer(&fixed_header_, 1));
381  ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
382  ret.emplace_back(as::buffer(protocol_name_and_level_.data(), protocol_name_and_level_.size()));
383  ret.emplace_back(as::buffer(&connect_flags_, 1));
384  ret.emplace_back(as::buffer(keep_alive_buf_.data(), keep_alive_buf_.size()));
385 
386  ret.emplace_back(as::buffer(client_id_length_buf_.data(), client_id_length_buf_.size()));
387  ret.emplace_back(as::buffer(client_id_));
388 
389  if (connect_flags::has_will_flag(connect_flags_)) {
390  ret.emplace_back(as::buffer(will_topic_name_length_buf_.data(), will_topic_name_length_buf_.size()));
391  ret.emplace_back(as::buffer(will_topic_name_));
392  ret.emplace_back(as::buffer(will_message_length_buf_.data(), will_message_length_buf_.size()));
393  ret.emplace_back(as::buffer(will_message_));
394  }
395 
396  if (connect_flags::has_user_name_flag(connect_flags_)) {
397  ret.emplace_back(as::buffer(user_name_length_buf_.data(), user_name_length_buf_.size()));
398  ret.emplace_back(as::buffer(user_name_));
399  }
400 
401  if (connect_flags::has_password_flag(connect_flags_)) {
402  ret.emplace_back(as::buffer(password_length_buf_.data(), password_length_buf_.size()));
403  ret.emplace_back(as::buffer(password_));
404  }
405 
406  return ret;
407  }
408 
413  std::size_t size() const {
414  return
415  1 + // fixed header
416  remaining_length_buf_.size() +
417  remaining_length_;
418  }
419 
424  static constexpr std::size_t num_of_const_buffer_sequence() {
425  return
426  1 + // fixed header
427  1 + // remaining length
428  1 + // protocol name and level
429  1 + // connect flags
430  1 + // keep alive
431 
432  2 + // client id length, client id
433 
434  2 + // will topic name length, will topic name
435  2 + // will message length, will message
436  2 + // user name length, user name
437  2; // password length, password
438  }
439 
446  std::string continuous_buffer() const {
447  std::string ret;
448 
449  ret.reserve(size());
450 
451  ret.push_back(static_cast<char>(fixed_header_));
452  ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
453  ret.append(protocol_name_and_level_.data(), protocol_name_and_level_.size());
454  ret.push_back(connect_flags_);
455  ret.append(keep_alive_buf_.data(), keep_alive_buf_.size());
456 
457  ret.append(client_id_length_buf_.data(), client_id_length_buf_.size());
458  ret.append(client_id_.data(), client_id_.size());
459 
460  if (connect_flags::has_will_flag(connect_flags_)) {
461  ret.append(will_topic_name_length_buf_.data(), will_topic_name_length_buf_.size());
462  ret.append(will_topic_name_.data(), will_topic_name_.size());
463  ret.append(will_message_length_buf_.data(), will_message_length_buf_.size());
464  ret.append(will_message_.data(), will_message_.size());
465  }
466 
467  if (connect_flags::has_user_name_flag(connect_flags_)) {
468  ret.append(user_name_length_buf_.data(), user_name_length_buf_.size());
469  ret.append(user_name_.data(), user_name_.size());
470  }
471 
472  if (connect_flags::has_password_flag(connect_flags_)) {
473  ret.append(password_length_buf_.data(), password_length_buf_.size());
474  ret.append(password_.data(), password_.size());
475  }
476 
477  return ret;
478  }
479 
480 private:
481  std::uint8_t fixed_header_;
482  char connect_flags_;
483 
484  std::size_t remaining_length_;
485  boost::container::static_vector<char, 4> remaining_length_buf_;
486 
487  boost::container::static_vector<char, 7> protocol_name_and_level_;
488  buffer client_id_;
489  boost::container::static_vector<char, 2> client_id_length_buf_;
490 
491  buffer will_topic_name_;
492  boost::container::static_vector<char, 2> will_topic_name_length_buf_;
493  buffer will_message_;
494  boost::container::static_vector<char, 2> will_message_length_buf_;
495 
496  buffer user_name_;
497  boost::container::static_vector<char, 2> user_name_length_buf_;
498  buffer password_;
499  boost::container::static_vector<char, 2> password_length_buf_;
500 
501  boost::container::static_vector<char, 2> keep_alive_buf_;
502 };
503 
504 template <std::size_t PacketIdBytes>
506 public:
507  template <
508  typename ConstBufferSequence,
509  typename std::enable_if<
510  as::is_const_buffer_sequence<ConstBufferSequence>::value,
511  std::nullptr_t
512  >::type = nullptr
513  >
516  as::const_buffer topic_name,
517  ConstBufferSequence payloads,
518  publish_options pubopts
519  )
520  : fixed_header_(make_fixed_header(control_packet_type::publish, 0b0000) | pubopts.operator std::uint8_t()),
521  topic_name_(topic_name),
522  topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast<std::uint16_t>(topic_name.size())) },
523  remaining_length_(
524  2 // topic name length
525  + topic_name_.size() // topic name
526  + ( (pubopts.get_qos() == qos::at_least_once || pubopts.get_qos() == qos::exactly_once)
527  ? PacketIdBytes // packet_id
528  : 0)
529  )
530  {
531  auto b = as::buffer_sequence_begin(payloads);
532  auto e = as::buffer_sequence_end(payloads);
533  auto num_of_payloads = static_cast<std::size_t>(std::distance(b, e));
534  payloads_.reserve(num_of_payloads);
535  for (; b != e; ++b) {
536  auto const& payload = *b;
537  remaining_length_ += payload.size();
538  payloads_.push_back(payload);
539  }
540 
541  utf8string_check(topic_name_);
542 
543  auto rb = remaining_bytes(remaining_length_);
544  for (auto e : rb) {
545  remaining_length_buf_.push_back(e);
546  }
547  if (pubopts.get_qos() == qos::at_least_once ||
548  pubopts.get_qos() == qos::exactly_once) {
549  packet_id_.reserve(PacketIdBytes);
551  }
552  }
553 
554  // Used in test code, and to deserialize stored messages.
556  if (buf.empty()) throw remaining_length_error();
557  fixed_header_ = static_cast<std::uint8_t>(buf.front());
558  qos qos_value = get_qos();
559  buf.remove_prefix(1);
560 
561  if (buf.empty()) throw remaining_length_error();
562  auto len_consumed = remaining_length(buf.begin(), buf.end());
563  remaining_length_ = std::get<0>(len_consumed);
564  auto consumed = std::get<1>(len_consumed);
565 
566  std::copy(
567  buf.begin(),
568  std::next(buf.begin(), static_cast<string_view::difference_type>(consumed)),
569  std::back_inserter(remaining_length_buf_));
570  buf.remove_prefix(consumed);
571 
572  if (buf.size() < 2) throw remaining_length_error();
573  std::copy(buf.begin(), std::next(buf.begin(), 2), std::back_inserter(topic_name_length_buf_));
574  auto topic_name_length = make_uint16_t(topic_name_length_buf_.begin(), topic_name_length_buf_.end());
575  buf.remove_prefix(2);
576 
577  if (buf.size() < topic_name_length) throw remaining_length_error();
578 
579  topic_name_ = as::buffer(buf.substr(0, topic_name_length));
580  utf8string_check(topic_name_);
581  buf.remove_prefix(topic_name_length);
582 
583  switch (qos_value) {
584  case qos::at_most_once:
585  break;
586  case qos::at_least_once:
587  case qos::exactly_once:
588  if (buf.size() < PacketIdBytes) throw remaining_length_error();
589  std::copy(buf.begin(), std::next(buf.begin(), PacketIdBytes), std::back_inserter(packet_id_));
590  buf.remove_prefix(PacketIdBytes);
591  break;
592  default:
593  throw protocol_error();
594  break;
595  };
596 
597  if (!buf.empty()) {
598  payloads_.emplace_back(as::buffer(buf));
599  }
600  }
601 
607  std::vector<as::const_buffer> const_buffer_sequence() const {
608  std::vector<as::const_buffer> ret;
609  ret.reserve(num_of_const_buffer_sequence());
610  ret.emplace_back(as::buffer(&fixed_header_, 1));
611  ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
612  ret.emplace_back(as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()));
613  ret.emplace_back(as::buffer(topic_name_));
614  if (!packet_id_.empty()) {
615  ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
616  }
617  std::copy(payloads_.begin(), payloads_.end(), std::back_inserter(ret));
618  return ret;
619  }
620 
625  std::size_t size() const {
626  return
627  1 + // fixed header
628  remaining_length_buf_.size() +
629  remaining_length_;
630  }
631 
636  std::size_t num_of_const_buffer_sequence() const {
637  return
638  1 + // fixed header
639  1 + // remaining length
640  2 + // topic name length, topic name
641  (packet_id_.empty() ? 0 : 1) + // packet_id
642  payloads_.size();
643  }
644 
651  std::string continuous_buffer() const {
652  std::string ret;
653 
654  ret.reserve(size());
655 
656  ret.push_back(static_cast<char>(fixed_header_));
657  ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
658 
659  ret.append(topic_name_length_buf_.data(), topic_name_length_buf_.size());
660  ret.append(get_pointer(topic_name_), get_size(topic_name_));
661 
662  ret.append(packet_id_.data(), packet_id_.size());
663  for (auto const& payload : payloads_) {
664  ret.append(get_pointer(payload), get_size(payload));
665  }
666 
667  return ret;
668  }
669 
675  return make_packet_id<PacketIdBytes>::apply(packet_id_.begin(), packet_id_.end());
676  }
677 
682  constexpr publish_options get_options() const {
683  return publish_options(fixed_header_);
684  }
685 
690  constexpr qos get_qos() const {
691  return publish::get_qos(fixed_header_);
692  }
693 
698  constexpr bool is_retain() const {
699  return publish::is_retain(fixed_header_);
700  }
701 
706  constexpr bool is_dup() const {
707  return publish::is_dup(fixed_header_);
708  }
709 
714  constexpr string_view topic() const {
715  return string_view(get_pointer(topic_name_), get_size(topic_name_));
716  }
717 
722  std::vector<string_view> payload() const {
723  std::vector<string_view> ret;
724  ret.reserve(payloads_.size());
725  for (auto const& payload : payloads_) {
726  ret.emplace_back(get_pointer(payload), get_size(payload));
727  }
728  return ret;
729  }
730 
736  auto size = std::accumulate(
737  payloads_.begin(),
738  payloads_.end(),
739  std::size_t(0),
740  [](std::size_t s, as::const_buffer const& payload) {
741  return s += payload.size();
742  }
743  );
744 
745  if (size == 0) return buffer();
746 
747  auto spa = make_shared_ptr_array(size);
748  auto ptr = spa.get();
749  auto it = ptr;
750  for (auto const& payload : payloads_) {
751  auto b = get_pointer(payload);
752  auto s = get_size(payload);
753  auto e = b + s;
754  std::copy(b, e, it);
755  it += s;
756  }
757  return buffer(string_view(ptr, size), force_move(spa));
758  }
759 
764  constexpr void set_dup(bool dup) {
765  publish::set_dup(fixed_header_, dup);
766  }
767 
768 private:
769  std::uint8_t fixed_header_;
770  as::const_buffer topic_name_;
771  boost::container::static_vector<char, 2> topic_name_length_buf_;
772  boost::container::static_vector<char, PacketIdBytes> packet_id_;
773  std::vector<as::const_buffer> payloads_;
774  std::size_t remaining_length_;
775  boost::container::static_vector<char, 4> remaining_length_buf_;
776 };
777 
780 
781 template <std::size_t PacketIdBytes>
783 private:
784  struct entry {
785  entry(as::const_buffer topic_name, subscribe_options qos_value)
786  : topic_name_(topic_name),
787  topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast<std::uint16_t>(topic_name_.size())) },
788  qos_(qos_value.get_qos())
789  {}
790 
791  as::const_buffer topic_name_;
792  boost::container::static_vector<char, 2> topic_name_length_buf_;
793  qos qos_;
794  };
795 
796 public:
798  std::vector<std::tuple<as::const_buffer, subscribe_options>> params,
799  typename packet_id_type<PacketIdBytes>::type packet_id
800  )
801  : fixed_header_(make_fixed_header(control_packet_type::subscribe, 0b0010)),
802  remaining_length_(PacketIdBytes)
803  {
804  add_packet_id_to_buf<PacketIdBytes>::apply(packet_id_, packet_id);
805 
806  // Check for errors before allocating.
807  for (auto&& e : params) {
808  as::const_buffer topic_name = std::get<0>(e);
809  utf8string_check(topic_name);
810  }
811 
812  entries_.reserve(params.size());
813  for (auto&& e : params) {
814  as::const_buffer topic_name = std::get<0>(e);
815  size_t size = topic_name.size();
816 
817  entries_.emplace_back(topic_name, std::get<1>(e));
818  remaining_length_ +=
819  2 + // topic name length
820  size + // topic name
821  1; // means QoS
822  }
823  auto rb = remaining_bytes(remaining_length_);
824  for (auto e : rb) {
825  remaining_length_buf_.push_back(e);
826  }
827  }
828 
834  std::vector<as::const_buffer> const_buffer_sequence() const {
835  std::vector<as::const_buffer> ret;
836  ret.reserve(num_of_const_buffer_sequence());
837 
838  ret.emplace_back(as::buffer(&fixed_header_, 1));
839 
840  ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
841 
842  ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
843 
844  for (auto const& e : entries_) {
845  ret.emplace_back(as::buffer(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size()));
846  ret.emplace_back(as::buffer(e.topic_name_));
847  ret.emplace_back(as::buffer(&e.qos_, 1));
848  }
849 
850  return ret;
851  }
852 
857  std::size_t size() const {
858  return
859  1 + // fixed header
860  remaining_length_buf_.size() +
861  remaining_length_;
862  }
863 
868  std::size_t num_of_const_buffer_sequence() const {
869  return
870  1 + // fixed header
871  1 + // remaining length
872  1 + // packet id
873  entries_.size() * 3; // topic name length, topic name, qos
874  }
875 
882  std::string continuous_buffer() const {
883  std::string ret;
884 
885  ret.reserve(size());
886 
887  ret.push_back(static_cast<char>(fixed_header_));
888  ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
889 
890  ret.append(packet_id_.data(), packet_id_.size());
891 
892  for (auto const& e : entries_) {
893  ret.append(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size());
894  ret.append(get_pointer(e.topic_name_), get_size(e.topic_name_));
895  ret.push_back(static_cast<char>(e.qos_));
896  }
897 
898  return ret;
899  }
900 
901 private:
902  std::uint8_t fixed_header_;
903  std::vector<entry> entries_;
904  boost::container::static_vector<char, PacketIdBytes> packet_id_;
905  std::size_t remaining_length_;
906  boost::container::static_vector<char, 4> remaining_length_buf_;
907 };
908 
910 
911 template <std::size_t PacketIdBytes>
913 public:
915  std::vector<suback_return_code> params,
916  typename packet_id_type<PacketIdBytes>::type packet_id
917  )
918  : fixed_header_(make_fixed_header(control_packet_type::suback, 0b0000)),
919  remaining_length_(params.size() + PacketIdBytes)
920  {
921  add_packet_id_to_buf<PacketIdBytes>::apply(packet_id_, packet_id);
922  auto rb = remaining_bytes(remaining_length_);
923  for (auto e : rb) {
924  remaining_length_buf_.push_back(e);
925  }
926  // TODO: We should be able to simply static-cast params.data() into a char*.
927  entries_.reserve(params.size());
928  for (auto e : params) {
929  entries_.push_back(static_cast<char>(e));
930  }
931  }
932 
938  std::vector<as::const_buffer> const_buffer_sequence() const {
939  std::vector<as::const_buffer> ret;
940  ret.reserve(num_of_const_buffer_sequence());
941 
942  ret.emplace_back(as::buffer(&fixed_header_, 1));
943  ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
944  ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
945  ret.emplace_back(as::buffer(entries_));
946 
947  return ret;
948  }
949 
954  std::size_t size() const {
955  return
956  1 + // fixed header
957  remaining_length_buf_.size() +
958  remaining_length_;
959  }
960 
965  static constexpr std::size_t num_of_const_buffer_sequence() {
966  return 4; // fixed header, remaining length, packet_id, entries
967  }
968 
975  std::string continuous_buffer() const {
976  std::string ret;
977 
978  ret.reserve(size());
979 
980  ret.push_back(static_cast<char>(fixed_header_));
981  ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
982 
983  ret.append(packet_id_.data(), packet_id_.size());
984  ret.append(entries_);
985 
986  return ret;
987  }
988 
989 private:
990  std::uint8_t fixed_header_;
991  std::string entries_;
992  boost::container::static_vector<char, PacketIdBytes> packet_id_;
993  std::size_t remaining_length_;
994  boost::container::static_vector<char, 4> remaining_length_buf_;
995 };
996 
998 
999 template <std::size_t PacketIdBytes>
1001 private:
1002  struct entry {
1003  entry(as::const_buffer topic_name)
1004  : topic_name_(force_move(topic_name)),
1005  topic_name_length_buf_ { num_to_2bytes(boost::numeric_cast<std::uint16_t>(topic_name_.size())) }
1006  {}
1007 
1008  as::const_buffer topic_name_;
1009  boost::container::static_vector<char, 2> topic_name_length_buf_;
1010  };
1011 
1012 public:
1014  std::vector<as::const_buffer> params,
1015  typename packet_id_type<PacketIdBytes>::type packet_id
1016  )
1017  : fixed_header_(make_fixed_header(control_packet_type::unsubscribe, 0b0010)),
1018  remaining_length_(PacketIdBytes)
1019  {
1020  add_packet_id_to_buf<PacketIdBytes>::apply(packet_id_, packet_id);
1021 
1022  // Check for errors before allocating.
1023  for (auto&& e : params) {
1024  utf8string_check(e);
1025  }
1026 
1027  entries_.reserve(params.size());
1028  for (auto&& e : params) {
1029  entries_.emplace_back(e);
1030  remaining_length_ +=
1031  2 + // topic name length
1032  e.size(); // topic name
1033  }
1034  auto rb = remaining_bytes(remaining_length_);
1035  for (auto e : rb) {
1036  remaining_length_buf_.push_back(e);
1037  }
1038  }
1039 
1045  std::vector<as::const_buffer> const_buffer_sequence() const {
1046  std::vector<as::const_buffer> ret;
1047  ret.reserve(num_of_const_buffer_sequence());
1048 
1049  ret.emplace_back(as::buffer(&fixed_header_, 1));
1050  ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
1051 
1052  ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
1053 
1054  for (auto const& e : entries_) {
1055  ret.emplace_back(as::buffer(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size()));
1056  ret.emplace_back(as::buffer(e.topic_name_));
1057  }
1058 
1059  return ret;
1060  }
1061 
1066  std::size_t size() const {
1067  return
1068  1 + // fixed header
1069  remaining_length_buf_.size() +
1070  remaining_length_;
1071  }
1072 
1077  std::size_t num_of_const_buffer_sequence() const {
1078  return
1079  1 + // fixed header
1080  1 + // remaining length
1081  1 + // packet id
1082  entries_.size() * 2; // topic name length, topic name
1083  }
1084 
1091  std::string continuous_buffer() const {
1092  std::string ret;
1093  ret.reserve(size());
1094 
1095  ret.push_back(static_cast<char>(fixed_header_));
1096 
1097  ret.append(remaining_length_buf_.data(), remaining_length_buf_.size());
1098 
1099  ret.append(packet_id_.data(), packet_id_.size());
1100 
1101  for (auto const& e : entries_) {
1102  ret.append(e.topic_name_length_buf_.data(), e.topic_name_length_buf_.size());
1103  ret.append(get_pointer(e.topic_name_), get_size(e.topic_name_));
1104  }
1105 
1106  return ret;
1107  }
1108 
1109 private:
1110  std::uint8_t fixed_header_;
1111  std::vector<entry> entries_;
1112  boost::container::static_vector<char, PacketIdBytes> packet_id_;
1113  std::size_t remaining_length_;
1114  boost::container::static_vector<char, 4> remaining_length_buf_;
1115 };
1116 
1118 
1119 } // inline namespace v3_1_1
1120 
1121 } // namespace MQTT_NS
1122 
1123 #endif // MQTT_MESSAGE_HPP
buffer that has string_view interface This class provides string_view interface. This class hold stri...
Definition: buffer.hpp:30
buffer substr(std::size_t offset, std::size_t length=string_view::npos) const &
get substring The returned buffer ragnge is the same as std::string_view::substr()....
Definition: buffer.hpp:68
Definition: message.hpp:505
constexpr publish_options get_options() const
Get publish_options.
Definition: message.hpp:682
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:625
buffer payload_as_buffer() const
Get payload as single buffer.
Definition: message.hpp:735
std::vector< string_view > payload() const
Get payload.
Definition: message.hpp:722
constexpr void set_dup(bool dup)
Set dup flag.
Definition: message.hpp:764
constexpr bool is_dup() const
Check dup flag.
Definition: message.hpp:706
constexpr qos get_qos() const
Get qos.
Definition: message.hpp:690
basic_publish_message(typename packet_id_type< PacketIdBytes >::type packet_id, as::const_buffer topic_name, ConstBufferSequence payloads, publish_options pubopts)
Definition: message.hpp:514
std::string continuous_buffer() const
Create one continuous buffer. All sequence of buffers are concatinated. It is useful to store to file...
Definition: message.hpp:651
constexpr string_view topic() const
Get topic name.
Definition: message.hpp:714
basic_publish_message(buffer buf)
Definition: message.hpp:555
packet_id_type< PacketIdBytes >::type packet_id() const
Get packet id.
Definition: message.hpp:674
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:607
std::size_t num_of_const_buffer_sequence() const
Get number of element of const_buffer_sequence.
Definition: message.hpp:636
constexpr bool is_retain() const
Check retain flag.
Definition: message.hpp:698
Definition: message.hpp:912
std::string continuous_buffer() const
Create one continuours buffer. All sequence of buffers are concatinated. It is useful to store to fil...
Definition: message.hpp:975
basic_suback_message(std::vector< suback_return_code > params, typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:914
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:938
static constexpr std::size_t num_of_const_buffer_sequence()
Get number of element of const_buffer_sequence.
Definition: message.hpp:965
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:954
Definition: message.hpp:782
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:834
basic_subscribe_message(std::vector< std::tuple< as::const_buffer, subscribe_options >> params, typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:797
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:857
std::size_t num_of_const_buffer_sequence() const
Get number of element of const_buffer_sequence.
Definition: message.hpp:868
std::string continuous_buffer() const
Create one continuours buffer. All sequence of buffers are concatinated. It is useful to store to fil...
Definition: message.hpp:882
Definition: message.hpp:1000
std::string continuous_buffer() const
Create one continuours buffer. All sequence of buffers are concatinated. It is useful to store to fil...
Definition: message.hpp:1091
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:1045
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:1066
std::size_t num_of_const_buffer_sequence() const
Get number of element of const_buffer_sequence.
Definition: message.hpp:1077
basic_unsubscribe_message(std::vector< as::const_buffer > params, typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:1013
Definition: message.hpp:249
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:273
std::string continuous_buffer() const
Create one continuours buffer. All sequence of buffers are concatinated. It is useful to store to fil...
Definition: message.hpp:291
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:265
connack_message(bool session_present, connect_return_code return_code)
Definition: message.hpp:251
static constexpr std::size_t num_of_const_buffer_sequence()
Get number of element of const_buffer_sequence.
Definition: message.hpp:281
Definition: message.hpp:301
std::string continuous_buffer() const
Create one continuours buffer. All sequence of buffers are concatinated. It is useful to store to fil...
Definition: message.hpp:446
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:413
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:376
connect_message(std::uint16_t keep_alive_sec, buffer client_id, bool clean_session, optional< will > w, optional< buffer > user_name, optional< buffer > password)
Definition: message.hpp:303
static constexpr std::size_t num_of_const_buffer_sequence()
Get number of element of const_buffer_sequence.
Definition: message.hpp:424
static constexpr std::size_t num_of_const_buffer_sequence()
Get number of element of const_buffer_sequence.
Definition: message.hpp:144
basic_header_packet_id_message(control_packet_type type, std::uint8_t flags, typename packet_id_type< PacketIdBytes >::type packet_id)
Create empty header_packet_id_message.
Definition: message.hpp:109
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:128
std::string continuous_buffer() const
Create one continuours buffer. All sequence of buffers are concatinated. It is useful to store to fil...
Definition: message.hpp:154
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:136
boost::container::static_vector< char, 2+PacketIdBytes > const & message() const
Definition: message.hpp:158
basic_header_packet_id_message(Iterator b, Iterator e)
Definition: message.hpp:116
std::string continuous_buffer() const
Create one continuours buffer. All sequence of buffers are concatinated. It is useful to store to fil...
Definition: message.hpp:90
header_only_message(control_packet_type type, std::uint8_t flags)
Create empty header_packet_id_message.
Definition: message.hpp:55
std::size_t size() const
Get whole size of sequence.
Definition: message.hpp:72
static constexpr std::size_t num_of_const_buffer_sequence()
Get number of element of const_buffer_sequence.
Definition: message.hpp:80
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition: message.hpp:64
constexpr bool has_password_flag(char v)
Definition: connect_flags.hpp:43
constexpr char const will_retain
Definition: connect_flags.hpp:21
constexpr char const user_name_flag
Definition: connect_flags.hpp:23
constexpr char const will_flag
Definition: connect_flags.hpp:20
constexpr bool has_user_name_flag(char v)
Definition: connect_flags.hpp:47
constexpr void set_will_qos(char &v, qos qos_value)
Definition: connect_flags.hpp:51
constexpr bool has_will_flag(char v)
Definition: connect_flags.hpp:33
constexpr char const clean_session
Definition: connect_flags.hpp:18
constexpr char const password_flag
Definition: connect_flags.hpp:22
constexpr bool is_dup(std::uint8_t v)
Definition: publish.hpp:22
constexpr qos get_qos(std::uint8_t v)
Definition: publish.hpp:26
constexpr void set_dup(std::uint8_t &fixed_header, bool dup)
Definition: publish.hpp:34
constexpr bool is_retain(std::uint8_t v)
Definition: publish.hpp:30
Definition: any.hpp:27
std::string remaining_bytes(std::size_t size)
Definition: remaining_length.hpp:17
boost::string_ref string_view
Definition: string_view.hpp:64
void utf8string_check(string_view str)
Definition: string_check.hpp:20
constexpr std::tuple< std::size_t, std::size_t > remaining_length(string_view bytes)
Definition: remaining_length.hpp:24
control_packet_type
Definition: control_packet_type.hpp:18
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
boost::container::static_vector< char, 2 > num_to_2bytes(std::uint16_t val)
Definition: two_byte_util.hpp:20
void add_uint16_t_to_buf(T &buf, std::uint16_t num)
Definition: two_byte_util.hpp:28
dup
Definition: publish.hpp:48
char const * get_pointer(as::const_buffer const &cb)
Definition: const_buffer_util.hpp:17
connect_return_code
Definition: connect_return_code.hpp:17
buffer const * buffer_sequence_end(buffer const &buf)
Definition: buffer.hpp:155
buffer const * buffer_sequence_begin(buffer const &buf)
Definition: buffer.hpp:151
qos
Definition: subscribe_options.hpp:34
constexpr std::uint16_t make_uint16_t(It b, It e)
Definition: two_byte_util.hpp:34
constexpr std::uint8_t make_fixed_header(control_packet_type type, std::uint8_t flags)
Definition: fixed_header.hpp:15
std::size_t get_size(as::const_buffer const &cb)
Definition: const_buffer_util.hpp:21
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
Definition: buffer.hpp:241
shared_ptr_array make_shared_ptr_array(std::size_t size)
shared_ptr_array creating function. You can choose the target type.
Definition: two_or_four_byte_util.hpp:52
Definition: two_or_four_byte_util.hpp:33
Definition: exception.hpp:21
Definition: publish.hpp:53
constexpr qos get_qos() const
Definition: publish.hpp:84
Definition: exception.hpp:33
Definition: subscribe_options.hpp:40
constexpr qos get_qos() const
Definition: subscribe_options.hpp:73
Definition: message.hpp:169
basic_puback_message(typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:171
Definition: message.hpp:214
basic_pubcomp_message(typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:215
Definition: message.hpp:179
basic_pubrec_message(typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:181
Definition: message.hpp:189
basic_pubrel_message(string_view buf)
Definition: message.hpp:196
basic_pubrel_message(typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:191
packet_id_type< PacketIdBytes >::type packet_id() const
Get packet id.
Definition: message.hpp:205
Definition: message.hpp:223
basic_unsuback_message(typename packet_id_type< PacketIdBytes >::type packet_id)
Definition: message.hpp:224
Definition: message.hpp:243
disconnect_message()
Definition: message.hpp:244
Definition: message.hpp:231
pingreq_message()
Definition: message.hpp:232
Definition: message.hpp:237
pingresp_message()
Definition: message.hpp:238
Definition: exception.hpp:65