async_mqtt 4.1.0
Loading...
Searching...
No Matches
v5_publish.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_PACKET_V5_PUBLISH_HPP)
8#define ASYNC_MQTT_PACKET_V5_PUBLISH_HPP
9
10#include <utility>
11#include <numeric>
12
13#include <boost/numeric/conversion/cast.hpp>
14
15#include <async_mqtt/exception.hpp>
16#include <async_mqtt/buffer.hpp>
17#include <async_mqtt/variable_bytes.hpp>
18#include <async_mqtt/type.hpp>
19
20#include <async_mqtt/util/move.hpp>
21#include <async_mqtt/util/static_vector.hpp>
22#include <async_mqtt/util/endian_convert.hpp>
23#include <async_mqtt/util/utf8validate.hpp>
24
25#include <async_mqtt/packet/packet_iterator.hpp>
26#include <async_mqtt/packet/packet_id_type.hpp>
27#include <async_mqtt/packet/fixed_header.hpp>
29#include <async_mqtt/packet/copy_to_static_vector.hpp>
30#include <async_mqtt/packet/property_variant.hpp>
31#if defined(ASYNC_MQTT_PRINT_PAYLOAD)
32#include <async_mqtt/util/json_like_out.hpp>
33#endif // defined(ASYNC_MQTT_PRINT_PAYLOAD)
34
35namespace async_mqtt::v5 {
36
37namespace as = boost::asio;
38
39template <std::size_t PacketIdBytes>
40class basic_publish_packet {
41public:
42 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
43 template <
44 typename BufferSequence,
45 std::enable_if_t<
46 is_buffer_sequence<std::decay_t<BufferSequence>>::value,
47 std::nullptr_t
48 > = nullptr
49 >
50 basic_publish_packet(
51 packet_id_t packet_id,
52 buffer topic_name,
53 BufferSequence payloads,
54 pub::opts pubopts,
55 properties props = {}
56 )
57 : fixed_header_(
58 make_fixed_header(control_packet_type::publish, 0b0000) | std::uint8_t(pubopts)
59 ),
60 topic_name_{force_move(topic_name)},
61 packet_id_(PacketIdBytes),
62 property_length_(async_mqtt::size(props)),
63 props_(force_move(props)),
64 remaining_length_(
65 2 // topic name length
66 + topic_name_.size() // topic name
67 + ( (pubopts.get_qos() == qos::at_least_once || pubopts.get_qos() == qos::exactly_once)
68 ? PacketIdBytes // packet_id
69 : 0)
70 )
71 {
72 using namespace std::literals;
73 topic_name_length_buf_.resize(topic_name_length_buf_.capacity());
74 endian_store(
75 boost::numeric_cast<std::uint16_t>(topic_name_.size()),
76 topic_name_length_buf_.data()
77 );
78 auto b = buffer_sequence_begin(payloads);
79 auto e = buffer_sequence_end(payloads);
80 auto num_of_payloads = static_cast<std::size_t>(std::distance(b, e));
81 payloads_.reserve(num_of_payloads);
82 for (; b != e; ++b) {
83 auto const& payload = *b;
84 remaining_length_ += payload.size();
85 payloads_.push_back(payload);
86 }
87
88 if (!utf8string_check(topic_name_)) {
89 throw make_error(
90 errc::bad_message,
91 "v5::publish_packet topic name invalid utf8"
92 );
93 }
94
95 auto pb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(property_length_));
96 for (auto e : pb) {
97 property_length_buf_.push_back(e);
98 }
99
100 for (auto const& prop : props_) {
101 auto id = prop.id();
102 if (!validate_property(property_location::publish, id)) {
103 throw make_error(
104 errc::bad_message,
105 "v5::publish_packet property "s + id_to_str(id) + " is not allowed"
106 );
107 }
108 }
109
110 remaining_length_ += property_length_buf_.size() + property_length_;
111
112 auto rb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
113 for (auto e : rb) {
114 remaining_length_buf_.push_back(e);
115 }
116
117 if (pubopts.get_qos() == qos::at_least_once ||
118 pubopts.get_qos() == qos::exactly_once) {
119 if (packet_id == 0) {
120 throw make_error(
121 errc::bad_message,
122 "v5::publish_packet qos not 0 but packet_id is 0"
123 );
124 }
125 endian_store(packet_id, packet_id_.data());
126 }
127 else {
128 if (packet_id != 0) {
129 throw make_error(
130 errc::bad_message,
131 "v5::publish_packet qos0 but non 0 packet_id"
132 );
133 }
134 endian_store(0, packet_id_.data());
135 }
136 }
137
138 template <
139 typename BufferSequence,
140 typename std::enable_if<
141 is_buffer_sequence<std::decay_t<BufferSequence>>::value,
142 std::nullptr_t
143 >::type = nullptr
144 >
145 basic_publish_packet(
146 buffer topic_name,
147 BufferSequence payloads,
148 pub::opts pubopts,
149 properties props = {}
150 ) : basic_publish_packet(0, force_move(topic_name), force_move(payloads), pubopts, force_move(props)) {
151 }
152
153 basic_publish_packet(buffer buf)
154 : packet_id_(PacketIdBytes) {
155 // fixed_header
156 if (buf.empty()) {
157 throw make_error(
158 errc::bad_message,
159 "v5::publish_packet fixed_header doesn't exist"
160 );
161 }
162 fixed_header_ = static_cast<std::uint8_t>(buf.front());
163 buf.remove_prefix(1);
164
165 // remaining_length
166 if (auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
167 remaining_length_ = *vl_opt;
168 }
169 else {
170 throw make_error(errc::bad_message, "v5::publish_packet remaining length is invalid");
171 }
172
173 // topic_name_length
174 if (!insert_advance(buf, topic_name_length_buf_)) {
175 throw make_error(
176 errc::bad_message,
177 "v5::publish_packet length of topic_name is invalid"
178 );
179 }
180 auto topic_name_length = endian_load<std::uint16_t>(topic_name_length_buf_.data());
181
182 // topic_name
183 if (buf.size() < topic_name_length) {
184 throw make_error(
185 errc::bad_message,
186 "v5::publish_packet topic_name doesn't match its length"
187 );
188 }
189 topic_name_ = buf.substr(0, topic_name_length);
190
191 if (!utf8string_check(topic_name_)) {
192 throw make_error(
193 errc::bad_message,
194 "v5::publish_packet topic name invalid utf8"
195 );
196 }
197
198 buf.remove_prefix(topic_name_length);
199
200 // packet_id
201 switch (pub::get_qos(fixed_header_)) {
202 case qos::at_most_once:
203 endian_store(packet_id_t{0}, packet_id_.data());
204 break;
205 case qos::at_least_once:
206 case qos::exactly_once:
207 if (!copy_advance(buf, packet_id_)) {
208 throw make_error(
209 errc::bad_message,
210 "v5::publish_packet packet_id doesn't exist"
211 );
212 }
213 break;
214 default:
215 throw make_error(
216 errc::bad_message,
217 "v5::publish_packet qos is invalid"
218 );
219 break;
220 };
221
222 // property
223 auto it = buf.begin();
224 if (auto pl_opt = variable_bytes_to_val(it, buf.end())) {
225 property_length_ = *pl_opt;
226 std::copy(buf.begin(), it, std::back_inserter(property_length_buf_));
227 buf.remove_prefix(std::size_t(std::distance(buf.begin(), it)));
228 if (buf.size() < property_length_) {
229 throw make_error(
230 errc::bad_message,
231 "v5::publish_packet properties_don't match its length"
232 );
233 }
234 auto prop_buf = buf.substr(0, property_length_);
235 props_ = make_properties(prop_buf, property_location::publish);
236 buf.remove_prefix(property_length_);
237 }
238 else {
239 throw make_error(
240 errc::bad_message,
241 "v5::publish_packet property_length is invalid"
242 );
243 }
244
245 // payload
246 if (!buf.empty()) {
247 payloads_.emplace_back(force_move(buf));
248 }
249 }
250
251 constexpr control_packet_type type() const {
252 return control_packet_type::publish;
253 }
254
260 std::vector<as::const_buffer> const_buffer_sequence() const {
261 std::vector<as::const_buffer> ret;
262 ret.reserve(num_of_const_buffer_sequence());
263 ret.emplace_back(as::buffer(&fixed_header_, 1));
264 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
265 ret.emplace_back(as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()));
266 ret.emplace_back(as::buffer(topic_name_));
267 if (packet_id() != 0) {
268 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
269 }
270 ret.emplace_back(as::buffer(property_length_buf_.data(), property_length_buf_.size()));
271 auto props_cbs = async_mqtt::const_buffer_sequence(props_);
272 std::move(props_cbs.begin(), props_cbs.end(), std::back_inserter(ret));
273 for (auto const& payload : payloads_) {
274 ret.emplace_back(as::buffer(payload));
275 }
276 return ret;
277 }
278
283 std::size_t size() const {
284 return
285 1 + // fixed header
286 remaining_length_buf_.size() +
287 remaining_length_;
288 }
289
294 std::size_t num_of_const_buffer_sequence() const {
295 return
296 1U + // fixed header
297 1U + // remaining length
298 2U + // topic name length, topic name
299 [&] {
300 if (packet_id() == 0) return 0U;
301 return 1U;
302 }() +
303 1U + // property length
304 async_mqtt::num_of_const_buffer_sequence(props_) +
305 payloads_.size();
306 }
307
312 packet_id_t packet_id() const {
313 return endian_load<packet_id_t>(packet_id_.data());
314 }
315
320 constexpr pub::opts opts() const {
321 return pub::opts(fixed_header_);
322 }
323
328 constexpr buffer const& topic() const {
329 return topic_name_;
330 }
331
336 std::vector<buffer> const& payload() const {
337 return payloads_;
338 }
339
344 auto payload_range() const {
345 return make_packet_range(payloads_);
346 }
347
352 constexpr void set_dup(bool dup) {
353 pub::set_dup(fixed_header_, dup);
354 }
355
360 properties const& props() const {
361 return props_;
362 }
363
370 void remove_topic_add_topic_alias(topic_alias_t val) {
371 // add topic_alias property
372 auto prop{property::topic_alias{val}};
373 auto prop_size = prop.size();
374 property_length_ += prop_size;
375 props_.push_back(force_move(prop));
376
377 // update property_length_buf
378 auto [old_property_length_buf_size, new_property_length_buf_size] =
379 update_property_length_buf();
380
381 // remove topic_name
382 auto old_topic_name_size = topic_name_.size();
383 topic_name_ = buffer{};
384 endian_store(
385 boost::numeric_cast<std::uint16_t>(topic_name_.size()),
386 topic_name_length_buf_.data()
387 );
388
389 // update remaining_length
390 remaining_length_ +=
391 prop_size +
392 (new_property_length_buf_size - old_property_length_buf_size) -
393 old_topic_name_size;
394 update_remaining_length_buf();
395 }
396
403 void add_topic_alias(topic_alias_t val) {
404 // add topic_alias property
405 auto prop{property::topic_alias{val}};
406 auto prop_size = prop.size();
407 property_length_ += prop_size;
408 props_.push_back(force_move(prop));
409
410 // update property_length_buf
411 auto [old_property_length_buf_size, new_property_length_buf_size] =
412 update_property_length_buf();
413
414 // update remaining_length
415 remaining_length_ +=
416 prop_size +
417 (new_property_length_buf_size - old_property_length_buf_size);
418 update_remaining_length_buf();
419 }
420
427 void add_topic(buffer topic) {
428 add_topic_impl(force_move(topic));
429 // update remaining_length
430 remaining_length_ += topic_name_.size();
431 update_remaining_length_buf();
432 }
433
434 void remove_topic_alias() {
435 auto prop_size = remove_topic_alias_impl();
436 property_length_ -= prop_size;
437 // update property_length_buf
438 auto [old_property_length_buf_size, new_property_length_buf_size] =
439 update_property_length_buf();
440
441 // update remaining_length
442 remaining_length_ +=
443 -prop_size +
444 (new_property_length_buf_size - old_property_length_buf_size);
445 update_remaining_length_buf();
446 }
447
448 void remove_topic_alias_add_topic(buffer topic) {
449 auto prop_size = remove_topic_alias_impl();
450 property_length_ -= prop_size;
451 add_topic_impl(force_move(topic));
452 // update property_length_buf
453 auto [old_property_length_buf_size, new_property_length_buf_size] =
454 update_property_length_buf();
455
456 // update remaining_length
457 remaining_length_ +=
458 topic_name_.size() -
459 prop_size +
460 (new_property_length_buf_size - old_property_length_buf_size);
461 update_remaining_length_buf();
462 }
463
468 void update_message_expiry_interval(std::uint32_t val) {
469 bool updated = false;
470 for (auto& prop : props_) {
471 prop.visit(
472 overload {
473 [&](property::message_expiry_interval& p) {
474 p = property::message_expiry_interval(val);
475 updated = true;
476 },
477 [&](auto&){}
478 }
479 );
480 if (updated) return;
481 }
482 }
483
484private:
485 void update_remaining_length_buf() {
486 remaining_length_buf_.clear();
487 auto rb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
488 for (auto e : rb) {
489 remaining_length_buf_.push_back(e);
490 }
491 }
492
493 std::tuple<std::size_t, std::size_t> update_property_length_buf() {
494 auto old_property_length_buf_size = property_length_buf_.size();
495 property_length_buf_.clear();
496 auto pb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(property_length_));
497 for (auto e : pb) {
498 property_length_buf_.push_back(e);
499 }
500 auto new_property_length_buf_size = property_length_buf_.size();
501 return {old_property_length_buf_size, new_property_length_buf_size};
502 }
503
504 std::size_t remove_topic_alias_impl() {
505 auto it = props_.cbegin();
506 std::size_t size = 0;
507 while (it != props_.cend()) {
508 if (it->id() == property::id::topic_alias) {
509 size += it->size();
510 it = props_.erase(it);
511 }
512 else {
513 ++it;
514 }
515 }
516 return size;
517 }
518
519 void add_topic_impl(buffer topic) {
520 BOOST_ASSERT(topic_name_.empty());
521
522 // add topic
523 topic_name_ = force_move(topic);
524 endian_store(
525 boost::numeric_cast<std::uint16_t>(topic_name_.size()),
526 topic_name_length_buf_.data()
527 );
528 }
529
530private:
531 std::uint8_t fixed_header_;
532 buffer topic_name_;
533 static_vector<char, 2> topic_name_length_buf_;
534 static_vector<char, PacketIdBytes> packet_id_;
535 std::size_t property_length_;
536 static_vector<char, 4> property_length_buf_;
537 properties props_;
538 std::vector<buffer> payloads_;
539 std::size_t remaining_length_;
540 static_vector<char, 4> remaining_length_buf_;
541};
542
543template <std::size_t PacketIdBytes>
544inline std::ostream& operator<<(std::ostream& o, basic_publish_packet<PacketIdBytes> const& v) {
545 o << "v5::publish{" <<
546 "topic:" << v.topic() << "," <<
547 "qos:" << v.opts().get_qos() << "," <<
548 "retain:" << v.opts().get_retain() << "," <<
549 "dup:" << v.opts().get_dup();
550 if (v.opts().get_qos() == qos::at_least_once ||
551 v.opts().get_qos() == qos::exactly_once) {
552 o << ",pid:" << v.packet_id();
553 }
554#if defined(ASYNC_MQTT_PRINT_PAYLOAD)
555 o << ",payload:";
556 for (auto const& e : v.payload()) {
557 o << json_like_out(e);
558 }
559#endif // defined(ASYNC_MQTT_PRINT_PAYLOAD)
560 if (!v.props().empty()) {
561 o << ",ps:" << v.props();
562 };
563 o << "}";
564 return o;
565}
566
/// v5 PUBLISH packet type with PacketIdBytes fixed to 2.
using publish_packet = basic_publish_packet<2>;
568
569} // namespace async_mqtt::v5
570
571#endif // ASYNC_MQTT_PACKET_V5_PUBLISH_HPP
buffer substr(size_type pos=0, size_type count=npos) const &
Get substring. The returned buffer range is the same as string_view::substr(). In addition the lifeti...
Definition buffer.hpp:201
qos
MQTT QoS.
Definition qos.hpp:23
@ exactly_once
Exactly once delivery.
@ at_least_once
At least once delivery.