7#if !defined(ASYNC_MQTT_PACKET_V5_PUBLISH_HPP)
8#define ASYNC_MQTT_PACKET_V5_PUBLISH_HPP
13#include <boost/numeric/conversion/cast.hpp>
15#include <async_mqtt/exception.hpp>
16#include <async_mqtt/buffer.hpp>
17#include <async_mqtt/variable_bytes.hpp>
18#include <async_mqtt/type.hpp>
20#include <async_mqtt/util/move.hpp>
21#include <async_mqtt/util/static_vector.hpp>
22#include <async_mqtt/util/endian_convert.hpp>
23#include <async_mqtt/util/utf8validate.hpp>
25#include <async_mqtt/packet/packet_iterator.hpp>
26#include <async_mqtt/packet/packet_id_type.hpp>
27#include <async_mqtt/packet/fixed_header.hpp>
29#include <async_mqtt/packet/copy_to_static_vector.hpp>
30#include <async_mqtt/packet/property_variant.hpp>
31#if defined(ASYNC_MQTT_PRINT_PAYLOAD)
32#include <async_mqtt/util/json_like_out.hpp>
35namespace async_mqtt::v5 {
37namespace as = boost::asio;
39template <std::
size_t PacketIdBytes>
40class basic_publish_packet {
42 using packet_id_t =
typename packet_id_type<PacketIdBytes>::type;
44 typename BufferSequence,
46 is_buffer_sequence<std::decay_t<BufferSequence>>::value,
51 packet_id_t packet_id,
53 BufferSequence payloads,
58 make_fixed_header(control_packet_type::publish, 0b0000) | std::uint8_t(pubopts)
60 topic_name_{force_move(topic_name)},
61 packet_id_(PacketIdBytes),
62 property_length_(async_mqtt::size(props)),
63 props_(force_move(props)),
72 using namespace std::literals;
73 topic_name_length_buf_.resize(topic_name_length_buf_.capacity());
75 boost::numeric_cast<std::uint16_t>(topic_name_.size()),
76 topic_name_length_buf_.data()
78 auto b = buffer_sequence_begin(payloads);
79 auto e = buffer_sequence_end(payloads);
80 auto num_of_payloads =
static_cast<std::size_t
>(std::distance(b, e));
81 payloads_.reserve(num_of_payloads);
83 auto const& payload = *b;
84 remaining_length_ += payload.size();
85 payloads_.push_back(payload);
88 if (!utf8string_check(topic_name_)) {
91 "v5::publish_packet topic name invalid utf8"
95 auto pb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(property_length_));
97 property_length_buf_.push_back(e);
100 for (
auto const& prop : props_) {
102 if (!validate_property(property_location::publish,
id)) {
105 "v5::publish_packet property "s + id_to_str(
id) +
" is not allowed"
110 remaining_length_ += property_length_buf_.size() + property_length_;
112 auto rb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
114 remaining_length_buf_.push_back(e);
117 if (pubopts.get_qos() == qos::at_least_once ||
118 pubopts.get_qos() == qos::exactly_once) {
119 if (packet_id == 0) {
122 "v5::publish_packet qos not 0 but packet_id is 0"
125 endian_store(packet_id, packet_id_.data());
128 if (packet_id != 0) {
131 "v5::publish_packet qos0 but non 0 packet_id"
134 endian_store(0, packet_id_.data());
139 typename BufferSequence,
140 typename std::enable_if<
141 is_buffer_sequence<std::decay_t<BufferSequence>>::value,
145 basic_publish_packet(
147 BufferSequence payloads,
149 properties props = {}
150 ) : basic_publish_packet(0, force_move(topic_name), force_move(payloads), pubopts, force_move(props)) {
153 basic_publish_packet(buffer buf)
154 : packet_id_(PacketIdBytes) {
159 "v5::publish_packet fixed_header doesn't exist"
162 fixed_header_ =
static_cast<std::uint8_t
>(buf.front());
163 buf.remove_prefix(1);
166 if (
auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
167 remaining_length_ = *vl_opt;
170 throw make_error(errc::bad_message,
"v5::publish_packet remaining length is invalid");
174 if (!insert_advance(buf, topic_name_length_buf_)) {
177 "v5::publish_packet length of topic_name is invalid"
180 auto topic_name_length = endian_load<std::uint16_t>(topic_name_length_buf_.data());
183 if (buf.size() < topic_name_length) {
186 "v5::publish_packet topic_name doesn't match its length"
189 topic_name_ = buf.
substr(0, topic_name_length);
191 if (!utf8string_check(topic_name_)) {
194 "v5::publish_packet topic name invalid utf8"
198 buf.remove_prefix(topic_name_length);
201 switch (pub::get_qos(fixed_header_)) {
202 case qos::at_most_once:
203 endian_store(packet_id_t{0}, packet_id_.data());
205 case qos::at_least_once:
206 case qos::exactly_once:
207 if (!copy_advance(buf, packet_id_)) {
210 "v5::publish_packet packet_id doesn't exist"
217 "v5::publish_packet qos is invalid"
223 auto it = buf.begin();
224 if (
auto pl_opt = variable_bytes_to_val(it, buf.end())) {
225 property_length_ = *pl_opt;
226 std::copy(buf.begin(), it, std::back_inserter(property_length_buf_));
227 buf.remove_prefix(std::size_t(std::distance(buf.begin(), it)));
228 if (buf.size() < property_length_) {
231 "v5::publish_packet properties_don't match its length"
234 auto prop_buf = buf.substr(0, property_length_);
235 props_ = make_properties(prop_buf, property_location::publish);
236 buf.remove_prefix(property_length_);
241 "v5::publish_packet property_length is invalid"
247 payloads_.emplace_back(force_move(buf));
251 constexpr control_packet_type type()
const {
252 return control_packet_type::publish;
260 std::vector<as::const_buffer> const_buffer_sequence()
const {
261 std::vector<as::const_buffer> ret;
262 ret.reserve(num_of_const_buffer_sequence());
263 ret.emplace_back(as::buffer(&fixed_header_, 1));
264 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
265 ret.emplace_back(as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()));
266 ret.emplace_back(as::buffer(topic_name_));
267 if (packet_id() != 0) {
268 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
270 ret.emplace_back(as::buffer(property_length_buf_.data(), property_length_buf_.size()));
271 auto props_cbs = async_mqtt::const_buffer_sequence(props_);
272 std::move(props_cbs.begin(), props_cbs.end(), std::back_inserter(ret));
273 for (
auto const& payload : payloads_) {
274 ret.emplace_back(as::buffer(payload));
283 std::size_t size()
const {
286 remaining_length_buf_.size() +
294 std::size_t num_of_const_buffer_sequence()
const {
300 if (packet_id() == 0)
return 0U;
304 async_mqtt::num_of_const_buffer_sequence(props_) +
312 packet_id_t packet_id()
const {
313 return endian_load<packet_id_t>(packet_id_.data());
320 constexpr pub::opts opts()
const {
321 return pub::opts(fixed_header_);
328 constexpr buffer
const& topic()
const {
336 std::vector<buffer>
const& payload()
const {
344 auto payload_range()
const {
345 return make_packet_range(payloads_);
352 constexpr void set_dup(
bool dup) {
353 pub::set_dup(fixed_header_, dup);
360 properties
const& props()
const {
370 void remove_topic_add_topic_alias(topic_alias_t val) {
372 auto prop{property::topic_alias{val}};
373 auto prop_size = prop.size();
374 property_length_ += prop_size;
375 props_.push_back(force_move(prop));
378 auto [old_property_length_buf_size, new_property_length_buf_size] =
379 update_property_length_buf();
382 auto old_topic_name_size = topic_name_.size();
383 topic_name_ = buffer{};
385 boost::numeric_cast<std::uint16_t>(topic_name_.size()),
386 topic_name_length_buf_.data()
392 (new_property_length_buf_size - old_property_length_buf_size) -
394 update_remaining_length_buf();
403 void add_topic_alias(topic_alias_t val) {
405 auto prop{property::topic_alias{val}};
406 auto prop_size = prop.size();
407 property_length_ += prop_size;
408 props_.push_back(force_move(prop));
411 auto [old_property_length_buf_size, new_property_length_buf_size] =
412 update_property_length_buf();
417 (new_property_length_buf_size - old_property_length_buf_size);
418 update_remaining_length_buf();
427 void add_topic(buffer topic) {
428 add_topic_impl(force_move(topic));
430 remaining_length_ += topic_name_.size();
431 update_remaining_length_buf();
434 void remove_topic_alias() {
435 auto prop_size = remove_topic_alias_impl();
436 property_length_ -= prop_size;
438 auto [old_property_length_buf_size, new_property_length_buf_size] =
439 update_property_length_buf();
444 (new_property_length_buf_size - old_property_length_buf_size);
445 update_remaining_length_buf();
448 void remove_topic_alias_add_topic(buffer topic) {
449 auto prop_size = remove_topic_alias_impl();
450 property_length_ -= prop_size;
451 add_topic_impl(force_move(topic));
453 auto [old_property_length_buf_size, new_property_length_buf_size] =
454 update_property_length_buf();
460 (new_property_length_buf_size - old_property_length_buf_size);
461 update_remaining_length_buf();
468 void update_message_expiry_interval(std::uint32_t val) {
469 bool updated =
false;
470 for (
auto& prop : props_) {
473 [&](property::message_expiry_interval& p) {
474 p = property::message_expiry_interval(val);
485 void update_remaining_length_buf() {
486 remaining_length_buf_.clear();
487 auto rb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
489 remaining_length_buf_.push_back(e);
493 std::tuple<std::size_t, std::size_t> update_property_length_buf() {
494 auto old_property_length_buf_size = property_length_buf_.size();
495 property_length_buf_.clear();
496 auto pb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(property_length_));
498 property_length_buf_.push_back(e);
500 auto new_property_length_buf_size = property_length_buf_.size();
501 return {old_property_length_buf_size, new_property_length_buf_size};
504 std::size_t remove_topic_alias_impl() {
505 auto it = props_.cbegin();
506 std::size_t size = 0;
507 while (it != props_.cend()) {
508 if (it->id() == property::id::topic_alias) {
510 it = props_.erase(it);
519 void add_topic_impl(buffer topic) {
520 BOOST_ASSERT(topic_name_.empty());
523 topic_name_ = force_move(topic);
525 boost::numeric_cast<std::uint16_t>(topic_name_.size()),
526 topic_name_length_buf_.data()
531 std::uint8_t fixed_header_;
533 static_vector<char, 2> topic_name_length_buf_;
534 static_vector<char, PacketIdBytes> packet_id_;
535 std::size_t property_length_;
536 static_vector<char, 4> property_length_buf_;
538 std::vector<buffer> payloads_;
539 std::size_t remaining_length_;
540 static_vector<char, 4> remaining_length_buf_;
543template <std::
size_t PacketIdBytes>
544inline std::ostream& operator<<(std::ostream& o, basic_publish_packet<PacketIdBytes>
const& v) {
545 o <<
"v5::publish{" <<
546 "topic:" << v.topic() <<
"," <<
547 "qos:" << v.opts().get_qos() <<
"," <<
548 "retain:" << v.opts().get_retain() <<
"," <<
549 "dup:" << v.opts().get_dup();
550 if (v.opts().get_qos() == qos::at_least_once ||
551 v.opts().get_qos() == qos::exactly_once) {
552 o <<
",pid:" << v.packet_id();
554#if defined(ASYNC_MQTT_PRINT_PAYLOAD)
556 for (
auto const& e : v.payload()) {
557 o << json_like_out(e);
560 if (!v.props().empty()) {
561 o <<
",ps:" << v.props();
567using publish_packet = basic_publish_packet<2>;
buffer substr(size_type pos=0, size_type count=npos) const &
get substring The returned buffer ragnge is the same as string_view::substr(). In addition the lifeti...
Definition buffer.hpp:201
qos
MQTT QoS.
Definition qos.hpp:23
@ exactly_once
Exactly once delivery.
@ at_least_once
At least once delivery.