async_mqtt 5.0.0
Loading...
Searching...
No Matches
v3_1_1_publish.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_PACKET_V3_1_1_PUBLISH_HPP)
8#define ASYNC_MQTT_PACKET_V3_1_1_PUBLISH_HPP
9
10#include <utility>
11#include <numeric>
12
13#include <boost/numeric/conversion/cast.hpp>
14
15#include <async_mqtt/exception.hpp>
16#include <async_mqtt/buffer.hpp>
17#include <async_mqtt/variable_bytes.hpp>
18
19#include <async_mqtt/util/move.hpp>
20#include <async_mqtt/util/static_vector.hpp>
21#include <async_mqtt/util/endian_convert.hpp>
22#include <async_mqtt/util/utf8validate.hpp>
23
24#include <async_mqtt/packet/packet_iterator.hpp>
25#include <async_mqtt/packet/packet_id_type.hpp>
26#include <async_mqtt/packet/fixed_header.hpp>
28#include <async_mqtt/packet/copy_to_static_vector.hpp>
29
30#if defined(ASYNC_MQTT_PRINT_PAYLOAD)
31#include <async_mqtt/util/json_like_out.hpp>
32#endif // defined(ASYNC_MQTT_PRINT_PAYLOAD)
33
34namespace async_mqtt::v3_1_1 {
35
36namespace as = boost::asio;
37
49template <std::size_t PacketIdBytes>
51public:
52 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
53
72 template <
73 typename BufferSequence,
74 typename std::enable_if<
76 std::nullptr_t
77 >::type = nullptr
78 >
80 packet_id_t packet_id,
84 )
85 : fixed_header_(
86 make_fixed_header(control_packet_type::publish, 0b0000) | std::uint8_t(pubopts)
87 ),
88 topic_name_{force_move(topic_name)},
89 packet_id_(PacketIdBytes),
90 remaining_length_(
91 2 // topic name length
92 + topic_name_.size() // topic name
93 + ( (pubopts.get_qos() == qos::at_least_once || pubopts.get_qos() == qos::exactly_once)
94 ? PacketIdBytes // packet_id
95 : 0)
96 )
97 {
98 topic_name_length_buf_.resize(topic_name_length_buf_.capacity());
99 endian_store(
100 boost::numeric_cast<std::uint16_t>(topic_name_.size()),
101 topic_name_length_buf_.data()
102 );
103 auto b = buffer_sequence_begin(payloads);
104 auto e = buffer_sequence_end(payloads);
105 auto num_of_payloads = static_cast<std::size_t>(std::distance(b, e));
106 payloads_.reserve(num_of_payloads);
107 for (; b != e; ++b) {
108 auto const& payload = *b;
109 remaining_length_ += payload.size();
110 payloads_.push_back(payload);
111 }
112
113 if (!utf8string_check(topic_name_)) {
114 throw make_error(
115 errc::bad_message,
116 "v3_1_1::publish_packet topic name invalid utf8"
117 );
118 }
119
120 auto rb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
121 for (auto e : rb) {
122 remaining_length_buf_.push_back(e);
123 }
124 switch (pubopts.get_qos()) {
125 case qos::at_most_once:
126 if (packet_id != 0) {
127 throw make_error(
128 errc::bad_message,
129 "v3_1_1::publish_packet qos0 but non 0 packet_id"
130 );
131 }
132 endian_store(0, packet_id_.data());
133 break;
134 case qos::at_least_once:
135 case qos::exactly_once:
136 if (packet_id == 0) {
137 throw make_error(
138 errc::bad_message,
139 "v3_1_1::publish_packet qos not 0 but packet_id is 0"
140 );
141 }
142 endian_store(packet_id, packet_id_.data());
143 break;
144 default:
145 throw make_error(
146 errc::bad_message,
147 "v3_1_1::publish_packet qos is invalid"
148 );
149 break;
150 }
151 }
152
166 template <
167 typename BufferSequence,
168 typename std::enable_if<
170 std::nullptr_t
171 >::type = nullptr
172 >
179
181 : packet_id_(PacketIdBytes) {
182 // fixed_header
183 if (buf.empty()) {
184 throw make_error(
185 errc::bad_message,
186 "v3_1_1::publish_packet fixed_header doesn't exist"
187 );
188 }
189 fixed_header_ = static_cast<std::uint8_t>(buf.front());
190 auto qos_value = pub::get_qos(fixed_header_);
191 buf.remove_prefix(1);
192 auto cpt_opt = get_control_packet_type_with_check(static_cast<std::uint8_t>(fixed_header_));
193 if (!cpt_opt || *cpt_opt != control_packet_type::publish) {
194 throw make_error(
195 errc::bad_message,
196 "v3_1_1::publish_packet fixed_header is invalid"
197 );
198 }
199
200 // remaining_length
201 if (auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
202 remaining_length_ = *vl_opt;
203 }
204 else {
205 throw make_error(errc::bad_message, "v3_1_1::publish_packet remaining length is invalid");
206 }
207 if (remaining_length_ != buf.size()) {
208 throw make_error(errc::bad_message, "v3_1_1::publish_packet remaining length doesn't match buf.size()");
209 }
210
211 // topic_name_length
212 if (!insert_advance(buf, topic_name_length_buf_)) {
213 throw make_error(
214 errc::bad_message,
215 "v3_1_1::publish_packet length of topic_name is invalid"
216 );
217 }
218 auto topic_name_length = endian_load<std::uint16_t>(topic_name_length_buf_.data());
219
220 // topic_name
221 if (buf.size() < topic_name_length) {
222 throw make_error(
223 errc::bad_message,
224 "v3_1_1::publish_packet topic_name doesn't match its length"
225 );
226 }
227 topic_name_ = buf.substr(0, topic_name_length);
228
229 if (!utf8string_check(topic_name_)) {
230 throw make_error(
231 errc::bad_message,
232 "v3_1_1::publish_packet topic name invalid utf8"
233 );
234 }
235
236 buf.remove_prefix(topic_name_length);
237
238 // packet_id
239 switch (qos_value) {
240 case qos::at_most_once:
241 endian_store(packet_id_t{0}, packet_id_.data());
242 break;
243 case qos::at_least_once:
244 case qos::exactly_once:
245 if (!copy_advance(buf, packet_id_)) {
246 throw make_error(
247 errc::bad_message,
248 "v3_1_1::publish_packet packet_id doesn't exist"
249 );
250 }
251 break;
252 default:
253 throw make_error(
254 errc::bad_message,
255 "v3_1_1::publish_packet qos is invalid"
256 );
257 break;
258 };
259
260 // payload
261 if (!buf.empty()) {
262 payloads_.emplace_back(force_move(buf));
263 }
264 }
265
266 constexpr control_packet_type type() const {
267 return control_packet_type::publish;
268 }
269
275 std::vector<as::const_buffer> const_buffer_sequence() const {
276 std::vector<as::const_buffer> ret;
278 ret.emplace_back(as::buffer(&fixed_header_, 1));
279 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
280 ret.emplace_back(as::buffer(topic_name_length_buf_.data(), topic_name_length_buf_.size()));
281 ret.emplace_back(as::buffer(topic_name_));
282 if (packet_id() != 0) {
283 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
284 }
285 for (auto const& payload : payloads_) {
286 ret.emplace_back(as::buffer(payload));
287 }
288 return ret;
289 }
290
295 std::size_t size() const {
296 return
297 1 + // fixed header
298 remaining_length_buf_.size() +
299 remaining_length_;
300 }
301
306 std::size_t num_of_const_buffer_sequence() const {
307 return
308 1 + // fixed header
309 1 + // remaining length
310 2 + // topic name length, topic name
311 [&] () -> std::size_t {
312 if (packet_id() == 0) return 0;
313 return 1;
314 }() +
315 payloads_.size();
316
317 }
318
323 packet_id_t packet_id() const {
324 return endian_load<packet_id_t>(packet_id_.data());
325 }
326
331 constexpr pub::opts opts() const {
332 return pub::opts(fixed_header_);
333 }
334
339 constexpr buffer const& topic() const {
340 return topic_name_;
341 }
342
347 std::vector<buffer> const& payload() const {
348 return payloads_;
349 }
350
355 auto payload_range() const {
356 return make_packet_range(payloads_);
357 }
358
363 constexpr void set_dup(bool dup) {
364 pub::set_dup(fixed_header_, dup);
365 }
366
367private:
368 std::uint8_t fixed_header_;
369 buffer topic_name_;
370 static_vector<char, 2> topic_name_length_buf_;
372 std::vector<buffer> payloads_;
373 std::size_t remaining_length_;
374 static_vector<char, 4> remaining_length_buf_;
375};
376
377template <std::size_t PacketIdBytes>
378inline std::ostream& operator<<(std::ostream& o, basic_publish_packet<PacketIdBytes> const& v) {
379 o << "v3_1_1::publish{" <<
380 "topic:" << v.topic() << "," <<
381 "qos:" << v.opts().get_qos() << "," <<
382 "retain:" << v.opts().get_retain() << "," <<
383 "dup:" << v.opts().get_dup();
384 if (v.opts().get_qos() == qos::at_least_once ||
385 v.opts().get_qos() == qos::exactly_once) {
386 o << ",pid:" << v.packet_id();
387 }
388#if defined(ASYNC_MQTT_PRINT_PAYLOAD)
389 o << ",payload:";
390 for (auto const& e : v.payload()) {
391 o << json_like_out(e);
392 }
393#endif // defined(ASYNC_MQTT_PRINT_PAYLOAD)
394 o << "}";
395 return o;
396}
397
403
404} // namespace async_mqtt::v3_1_1
405
406#endif // ASYNC_MQTT_PACKET_V3_1_1_PUBLISH_HPP
Definition packet_variant.hpp:49
buffer that has string_view interface This class provides string_view interface. This class hold stri...
Definition buffer.hpp:30
buffer substr(size_type pos=0, size_type count=npos) const &
get substring The returned buffer ragnge is the same as string_view::substr(). In addition the lifeti...
Definition buffer.hpp:201
MQTT PUBLISH packet (v3.1.1)
Definition v3_1_1_publish.hpp:50
basic_publish_packet(buffer topic_name, BufferSequence payloads, pub::opts pubopts)
constructor for QoS0 This constructor doesn't have packet_id parameter. The packet_id is set to 0 int...
Definition v3_1_1_publish.hpp:173
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition v3_1_1_publish.hpp:275
auto payload_range() const
Get payload range.
Definition v3_1_1_publish.hpp:355
constexpr buffer const & topic() const
Get topic name.
Definition v3_1_1_publish.hpp:339
std::size_t num_of_const_buffer_sequence() const
Get number of element of const_buffer_sequence.
Definition v3_1_1_publish.hpp:306
constexpr void set_dup(bool dup)
Set dup flag.
Definition v3_1_1_publish.hpp:363
std::vector< buffer > const & payload() const
Get payload.
Definition v3_1_1_publish.hpp:347
packet_id_t packet_id() const
Get packet id.
Definition v3_1_1_publish.hpp:323
std::size_t size() const
Get packet size.
Definition v3_1_1_publish.hpp:295
basic_publish_packet(packet_id_t packet_id, buffer topic_name, BufferSequence payloads, pub::opts pubopts)
constructor
Definition v3_1_1_publish.hpp:79
constexpr pub::opts opts() const
Get publish_options.
Definition v3_1_1_publish.hpp:331
qos
MQTT QoS.
Definition qos.hpp:23
@ exactly_once
Exactly once delivery.
@ at_least_once
At least once delivery.
MQTT PublishOptions.
Definition pubopts.hpp:87