async_mqtt 5.0.0
Loading...
Searching...
No Matches
v5_subscribe.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_PACKET_V5_SUBSCRIBE_HPP)
8#define ASYNC_MQTT_PACKET_V5_SUBSCRIBE_HPP
9
10#include <boost/numeric/conversion/cast.hpp>
11
12#include <async_mqtt/exception.hpp>
13#include <async_mqtt/buffer.hpp>
14
15#include <async_mqtt/util/move.hpp>
16#include <async_mqtt/util/static_vector.hpp>
17#include <async_mqtt/util/endian_convert.hpp>
18#include <async_mqtt/util/utf8validate.hpp>
19
20#include <async_mqtt/packet/packet_id_type.hpp>
21#include <async_mqtt/packet/fixed_header.hpp>
22#include <async_mqtt/packet/topic_subopts.hpp>
23#include <async_mqtt/packet/reason_code.hpp>
24#include <async_mqtt/packet/property_variant.hpp>
25#include <async_mqtt/packet/copy_to_static_vector.hpp>
26
27namespace async_mqtt::v5 {
28
29namespace as = boost::asio;
30
38template <std::size_t PacketIdBytes>
40public:
41 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
42
52 packet_id_t packet_id,
53 std::vector<topic_subopts> params,
54 properties props = {}
55 )
56 : fixed_header_{make_fixed_header(control_packet_type::subscribe, 0b0010)},
57 entries_{force_move(params)},
58 remaining_length_{PacketIdBytes},
59 property_length_(async_mqtt::size(props)),
60 props_(force_move(props))
61 {
62 using namespace std::literals;
63 topic_length_buf_entries_.reserve(entries_.size());
64 for (auto const& e : entries_) {
65 topic_length_buf_entries_.push_back(
66 endian_static_vector(
67 boost::numeric_cast<std::uint16_t>(e.all_topic().size())
68 )
69 );
70 }
71
72 endian_store(packet_id, packet_id_.data());
73
74 auto pb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(property_length_));
75 for (auto e : pb) {
76 property_length_buf_.push_back(e);
77 }
78
79 for (auto const& prop : props_) {
80 auto id = prop.id();
81 if (!validate_property(property_location::subscribe, id)) {
82 throw make_error(
83 errc::bad_message,
84 "v5::subscribe_packet property "s + id_to_str(id) + " is not allowed"
85 );
86 }
87 }
88
89 remaining_length_ += property_length_buf_.size() + property_length_;
90
91 for (auto const& e : entries_) {
92 // reserved bits check
93 if (static_cast<std::uint8_t>(e.opts()) & 0b11000000) {
94 throw make_error(
95 errc::bad_message,
96 "v5::subscribe_packet subopts is invalid"
97 );
98 }
99 switch (e.opts().get_qos()) {
100 case qos::at_most_once:
101 case qos::at_least_once:
102 case qos::exactly_once:
103 break;
104 default:
105 throw make_error(
106 errc::bad_message,
107 "v5::subscribe_packet qos is invalid"
108 );
109 break;
110 }
111 switch (e.opts().get_retain_handling()) {
112 case sub::retain_handling::send:
113 case sub::retain_handling::send_only_new_subscription:
114 case sub::retain_handling::not_send:
115 break;
116 default:
117 throw make_error(
118 errc::bad_message,
119 "v5::subscribe_packet retain_handling is invalid"
120 );
121 break;
122 }
123
124 auto size = e.all_topic().size();
125 if (size > 0xffff) {
126 throw make_error(
127 errc::bad_message,
128 "v5::subscribe_packet length of topic is invalid"
129 );
130 }
131 remaining_length_ +=
132 2 + // topic name length
133 size + // topic filter
134 1; // opts
135
136 if (!utf8string_check(e.all_topic())) {
137 throw make_error(
138 errc::bad_message,
139 "v5::subscribe_packet topic filter invalid utf8"
140 );
141 }
142 }
143
144 remaining_length_buf_ = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
145 }
146
147 basic_subscribe_packet(buffer buf) {
148 // fixed_header
149 if (buf.empty()) {
150 throw make_error(
151 errc::bad_message,
152 "v5::subscribe_packet fixed_header doesn't exist"
153 );
154 }
155 fixed_header_ = static_cast<std::uint8_t>(buf.front());
156 buf.remove_prefix(1);
157 auto cpt_opt = get_control_packet_type_with_check(static_cast<std::uint8_t>(fixed_header_));
158 if (!cpt_opt || *cpt_opt != control_packet_type::subscribe) {
159 throw make_error(
160 errc::bad_message,
161 "v5::subscribe_packet fixed_header is invalid"
162 );
163 }
164
165 // remaining_length
166 if (auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
167 remaining_length_ = *vl_opt;
168 }
169 else {
170 throw make_error(errc::bad_message, "v5::subscribe_packet remaining length is invalid");
171 }
172 if (remaining_length_ != buf.size()) {
173 throw make_error(errc::bad_message, "v5::subscribe_packet remaining length doesn't match buf.size()");
174 }
175
176 // packet_id
177 if (!copy_advance(buf, packet_id_)) {
178 throw make_error(
179 errc::bad_message,
180 "v5::subscribe_packet packet_id doesn't exist"
181 );
182 }
183
184 // property
185 auto it = buf.begin();
186 if (auto pl_opt = variable_bytes_to_val(it, buf.end())) {
187 property_length_ = *pl_opt;
188 std::copy(buf.begin(), it, std::back_inserter(property_length_buf_));
189 buf.remove_prefix(std::size_t(std::distance(buf.begin(), it)));
190 if (buf.size() < property_length_) {
191 throw make_error(
192 errc::bad_message,
193 "v5::subscribe_packet properties_don't match its length"
194 );
195 }
196 auto prop_buf = buf.substr(0, property_length_);
197 props_ = make_properties(prop_buf, property_location::subscribe);
198 buf.remove_prefix(property_length_);
199 }
200 else {
201 throw make_error(
202 errc::bad_message,
203 "v5::subscribe_packet property_length is invalid"
204 );
205 }
206
207 if (remaining_length_ == 0) {
208 throw make_error(errc::bad_message, "v5::subscribe_packet doesn't have entries");
209 }
210
211 while (!buf.empty()) {
212 // topic_length
213 static_vector<char, 2> topic_length_buf;
214 if (!insert_advance(buf, topic_length_buf)) {
215 throw make_error(
216 errc::bad_message,
217 "v5::subscribe_packet length of topic is invalid"
218 );
219 }
220 auto topic_length = endian_load<std::uint16_t>(topic_length_buf.data());
221 topic_length_buf_entries_.push_back(topic_length_buf);
222
223 // topic
224 if (buf.size() < topic_length) {
225 throw make_error(
226 errc::bad_message,
227 "v5::subscribe_packet topic doesn't match its length"
228 );
229 }
230 auto topic = buf.substr(0, topic_length);
231
232 if (!utf8string_check(topic)) {
233 throw make_error(
234 errc::bad_message,
235 "v5::subscribe_packet topic filter invalid utf8"
236 );
237 }
238
239 buf.remove_prefix(topic_length);
240
241 // opts
242 if (buf.empty()) {
243 throw make_error(
244 errc::bad_message,
245 "v5::subscribe_packet subscribe options doesn't exist"
246 );
247 }
248 auto opts = static_cast<sub::opts>(std::uint8_t(buf.front()));
249 // reserved bits check
250 if (static_cast<std::uint8_t>(opts) & 0b11000000) {
251 throw make_error(
252 errc::bad_message,
253 "v5::subscribe_packet subopts is invalid"
254 );
255 }
256 switch (opts.get_qos()) {
257 case qos::at_most_once:
258 case qos::at_least_once:
259 case qos::exactly_once:
260 break;
261 default:
262 throw make_error(
263 errc::bad_message,
264 "v5::subscribe_packet qos is invalid"
265 );
266 break;
267 }
268 switch (opts.get_retain_handling()) {
269 case sub::retain_handling::send:
270 case sub::retain_handling::send_only_new_subscription:
271 case sub::retain_handling::not_send:
272 break;
273 default:
274 throw make_error(
275 errc::bad_message,
276 "v5::subscribe_packet retain_handling is invalid"
277 );
278 break;
279 }
280 entries_.emplace_back(force_move(topic), opts);
281 buf.remove_prefix(1);
282 }
283 }
284
285 constexpr control_packet_type type() const {
286 return control_packet_type::subscribe;
287 }
288
294 std::vector<as::const_buffer> const_buffer_sequence() const {
295 std::vector<as::const_buffer> ret;
297
298 ret.emplace_back(as::buffer(&fixed_header_, 1));
299
300 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
301
302 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
303
304 ret.emplace_back(as::buffer(property_length_buf_.data(), property_length_buf_.size()));
305 auto props_cbs = async_mqtt::const_buffer_sequence(props_);
306 std::move(props_cbs.begin(), props_cbs.end(), std::back_inserter(ret));
307
308 BOOST_ASSERT(entries_.size() == topic_length_buf_entries_.size());
309 auto it = topic_length_buf_entries_.begin();
310 for (auto const& e : entries_) {
311 ret.emplace_back(as::buffer(it->data(), it->size()));
312 ret.emplace_back(as::buffer(e.all_topic()));
313 ret.emplace_back(as::buffer(&e.opts(), 1));
314 ++it;
315 }
316
317 return ret;
318 }
319
324 std::size_t size() const {
325 return
326 1 + // fixed header
327 remaining_length_buf_.size() +
328 remaining_length_;
329 }
330
335 std::size_t num_of_const_buffer_sequence() const {
336 return
337 1 + // fixed header
338 1 + // remaining length
339 1 + // packet id
340 [&] () -> std::size_t {
341 if (property_length_buf_.size() == 0) return 0;
342 return
343 1 + // property length
344 async_mqtt::num_of_const_buffer_sequence(props_);
345 }() +
346 entries_.size() * 3; // topic name length, topic name, opts
347 }
348
353 packet_id_t packet_id() const {
354 return endian_load<packet_id_t>(packet_id_.data());
355 }
356
361 std::vector<topic_subopts> const& entries() const {
362 return entries_;
363 }
364
369 properties const& props() const {
370 return props_;
371 }
372
373private:
// First byte of the packet: built by make_fixed_header() or read from the
// front of the parsed buffer.
374 std::uint8_t fixed_header_;
// Encoded 2-byte topic length of each entry; kept the same size as entries_
// (asserted in const_buffer_sequence()).
375 std::vector<static_vector<char, 2>> topic_length_buf_entries_;
// Topic filter / subscribe option pairs carried by the packet.
376 std::vector<topic_subopts> entries_;
// NOTE(review): the methods above use a packet_id_ member whose declaration is
// not visible here — confirm it is declared in the complete header.
// Number of bytes that follow the remaining length field.
378 std::size_t remaining_length_;
// Variable-byte encoding of remaining_length_.
379 static_vector<char, 4> remaining_length_buf_;
380
// Size of the properties as reported by async_mqtt::size() or decoded from the
// parsed buffer.
381 std::size_t property_length_ = 0;
// Variable-byte encoding of property_length_.
382 static_vector<char, 4> property_length_buf_;
// Properties attached to the packet.
383 properties props_;
384};
385
386template <std::size_t PacketIdBytes>
387inline std::ostream& operator<<(std::ostream& o, basic_subscribe_packet<PacketIdBytes> const& v) {
388 o <<
389 "v5::subscribe{" <<
390 "pid:" << v.packet_id() << ",[";
391 auto b = v.entries().cbegin();
392 auto e = v.entries().cend();
393 if (b != e) {
394 o <<
395 "{"
396 "topic:" << b->topic() << "," <<
397 "sn:" << b->sharename() << "," <<
398 "qos:" << b->opts().get_qos() << "," <<
399 "rh:" << b->opts().get_retain_handling() << "," <<
400 "nl:" << b->opts().get_nl() << "," <<
401 "rap:" << b->opts().get_rap() <<
402 "}";
403 ++b;
404 }
405 for (; b != e; ++b) {
406 o << "," <<
407 "{"
408 "topic:" << b->topic() << "," <<
409 "sn:" << b->sharename() << "," <<
410 "qos:" << b->opts().get_qos() << "," <<
411 "rh:" << b->opts().get_retain_handling() << "," <<
412 "nl:" << b->opts().get_nl() << "," <<
413 "rap:" << b->opts().get_rap() <<
414 "}";
415 }
416 o << "]";
417 if (!v.props().empty()) {
418 o << ",ps:" << v.props();
419 };
420 o << "}";
421 return o;
422}
423
424using subscribe_packet = basic_subscribe_packet<2>;
425
426} // namespace async_mqtt::v5
427
428#endif // ASYNC_MQTT_PACKET_V5_SUBSCRIBE_HPP
Definition packet_variant.hpp:49
MQTT SUBSCRIBE packet (v5)
Definition v5_subscribe.hpp:39
std::size_t size() const
Get packet size.
Definition v5_subscribe.hpp:324
std::vector< topic_subopts > const & entries() const
Get entries.
Definition v5_subscribe.hpp:361
packet_id_t packet_id() const
Get packet_id.
Definition v5_subscribe.hpp:353
basic_subscribe_packet(packet_id_t packet_id, std::vector< topic_subopts > params, properties props={})
constructor
Definition v5_subscribe.hpp:51
properties const & props() const
Definition v5_subscribe.hpp:369
std::vector< as::const_buffer > const_buffer_sequence() const
Create a const buffer sequence for use with Boost.Asio APIs.
Definition v5_subscribe.hpp:294
std::size_t num_of_const_buffer_sequence() const
Get the number of elements in the const_buffer_sequence.
Definition v5_subscribe.hpp:335