async_mqtt 5.0.0
Loading...
Searching...
No Matches
v5_unsubscribe.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_PACKET_V5_UNSUBSCRIBE_HPP)
8#define ASYNC_MQTT_PACKET_V5_UNSUBSCRIBE_HPP
9
10#include <boost/numeric/conversion/cast.hpp>
11
12#include <async_mqtt/exception.hpp>
13#include <async_mqtt/buffer.hpp>
14
15#include <async_mqtt/util/move.hpp>
16#include <async_mqtt/util/static_vector.hpp>
17#include <async_mqtt/util/endian_convert.hpp>
18#include <async_mqtt/util/utf8validate.hpp>
19
20#include <async_mqtt/packet/packet_id_type.hpp>
21#include <async_mqtt/packet/fixed_header.hpp>
22#include <async_mqtt/packet/topic_sharename.hpp>
23#include <async_mqtt/packet/reason_code.hpp>
24#include <async_mqtt/packet/property_variant.hpp>
25#include <async_mqtt/packet/copy_to_static_vector.hpp>
26
27namespace async_mqtt::v5 {
28
29namespace as = boost::asio;
30
38template <std::size_t PacketIdBytes>
40public:
41 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
42
52 packet_id_t packet_id,
53 std::vector<topic_sharename> params,
54 properties props = {}
55 )
56 : fixed_header_{make_fixed_header(control_packet_type::unsubscribe, 0b0010)},
57 entries_{force_move(params)},
58 remaining_length_{PacketIdBytes},
59 property_length_(async_mqtt::size(props)),
60 props_(force_move(props))
61 {
62 using namespace std::literals;
63 topic_length_buf_entries_.reserve(entries_.size());
64 for (auto const& e : entries_) {
65 topic_length_buf_entries_.push_back(
66 endian_static_vector(
67 boost::numeric_cast<std::uint16_t>(e.all_topic().size())
68 )
69 );
70 }
71
72 endian_store(packet_id, packet_id_.data());
73
74 auto pb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(property_length_));
75 for (auto e : pb) {
76 property_length_buf_.push_back(e);
77 }
78
79 for (auto const& prop : props_) {
80 auto id = prop.id();
81 if (!validate_property(property_location::unsubscribe, id)) {
82 throw make_error(
83 errc::bad_message,
84 "v5::unsubscribe_packet property "s + id_to_str(id) + " is not allowed"
85 );
86 }
87 }
88
89 remaining_length_ += property_length_buf_.size() + property_length_;
90
91 for (auto const& e : entries_) {
92 auto size = e.all_topic().size();
93 if (size > 0xffff) {
94 throw make_error(
95 errc::bad_message,
96 "v5::unsubscribe_packet length of topic is invalid"
97 );
98 }
99 remaining_length_ +=
100 2 + // topic filter length
101 size; // topic filter
102
103 if (!utf8string_check(e.all_topic())) {
104 throw make_error(
105 errc::bad_message,
106 "v5::unsubscribe_packet topic filter invalid utf8"
107 );
108 }
109 }
110
111 remaining_length_buf_ = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
112 }
113
114 basic_unsubscribe_packet(buffer buf) {
115 // fixed_header
116 if (buf.empty()) {
117 throw make_error(
118 errc::bad_message,
119 "v5::unsubscribe_packet fixed_header doesn't exist"
120 );
121 }
122 fixed_header_ = static_cast<std::uint8_t>(buf.front());
123 buf.remove_prefix(1);
124 auto cpt_opt = get_control_packet_type_with_check(static_cast<std::uint8_t>(fixed_header_));
125 if (!cpt_opt || *cpt_opt != control_packet_type::unsubscribe) {
126 throw make_error(
127 errc::bad_message,
128 "v5::unsubscribe_packet fixed_header is invalid"
129 );
130 }
131
132 // remaining_length
133 if (auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
134 remaining_length_ = *vl_opt;
135 }
136 else {
137 throw make_error(errc::bad_message, "v5::unsubscribe_packet remaining length is invalid");
138 }
139 if (remaining_length_ != buf.size()) {
140 throw make_error(errc::bad_message, "v5::unsubscribe_packet remaining length doesn't match buf.size()");
141 }
142
143 // packet_id
144 if (!copy_advance(buf, packet_id_)) {
145 throw make_error(
146 errc::bad_message,
147 "v5::unsubscribe_packet packet_id doesn't exist"
148 );
149 }
150
151 // property
152 auto it = buf.begin();
153 if (auto pl_opt = variable_bytes_to_val(it, buf.end())) {
154 property_length_ = *pl_opt;
155 std::copy(buf.begin(), it, std::back_inserter(property_length_buf_));
156 buf.remove_prefix(std::size_t(std::distance(buf.begin(), it)));
157 if (buf.size() < property_length_) {
158 throw make_error(
159 errc::bad_message,
160 "v5::unsubscribe_packet properties_don't match its length"
161 );
162 }
163 auto prop_buf = buf.substr(0, property_length_);
164 props_ = make_properties(prop_buf, property_location::unsubscribe);
165 buf.remove_prefix(property_length_);
166 }
167 else {
168 throw make_error(
169 errc::bad_message,
170 "v5::unsubscribe_packet property_length is invalid"
171 );
172 }
173
174 if (remaining_length_ == 0) {
175 throw make_error(errc::bad_message, "v5::unsubscribe_packet doesn't have entries");
176 }
177
178 while (!buf.empty()) {
179 // topic_length
180 static_vector<char, 2> topic_length_buf;
181 if (!insert_advance(buf, topic_length_buf)) {
182 throw make_error(
183 errc::bad_message,
184 "v5::unsubscribe_packet length of topic is invalid"
185 );
186 }
187 auto topic_length = endian_load<std::uint16_t>(topic_length_buf.data());
188 topic_length_buf_entries_.push_back(topic_length_buf);
189
190 // topic
191 if (buf.size() < topic_length) {
192 throw make_error(
193 errc::bad_message,
194 "v5::unsubscribe_packet topic doesn't match its length"
195 );
196 }
197 auto topic = buf.substr(0, topic_length);
198 if (!utf8string_check(topic)) {
199 throw make_error(
200 errc::bad_message,
201 "v5::unsubscribe_packet topic filter invalid utf8"
202 );
203 }
204 buf.remove_prefix(topic_length);
205 entries_.emplace_back(force_move(topic));
206 }
207 }
208
209 constexpr control_packet_type type() const {
210 return control_packet_type::unsubscribe;
211 }
212
218 std::vector<as::const_buffer> const_buffer_sequence() const {
219 std::vector<as::const_buffer> ret;
221
222 ret.emplace_back(as::buffer(&fixed_header_, 1));
223
224 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
225
226 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
227
228 ret.emplace_back(as::buffer(property_length_buf_.data(), property_length_buf_.size()));
229 auto props_cbs = async_mqtt::const_buffer_sequence(props_);
230 std::move(props_cbs.begin(), props_cbs.end(), std::back_inserter(ret));
231
232 BOOST_ASSERT(entries_.size() == topic_length_buf_entries_.size());
233 auto it = topic_length_buf_entries_.begin();
234 for (auto const& e : entries_) {
235 ret.emplace_back(as::buffer(it->data(), it->size()));
236 ret.emplace_back(as::buffer(e.all_topic()));
237 ++it;
238 }
239
240 return ret;
241 }
242
247 std::size_t size() const {
248 return
249 1 + // fixed header
250 remaining_length_buf_.size() +
251 remaining_length_;
252 }
253
258 std::size_t num_of_const_buffer_sequence() const {
259 return
260 1 + // fixed header
261 1 + // remaining length
262 1 + // packet id
263 [&] () -> std::size_t {
264 if (property_length_buf_.size() == 0) return 0;
265 return
266 1 + // property length
267 async_mqtt::num_of_const_buffer_sequence(props_);
268 }() +
269 entries_.size() * 2; // topic name length, topic name
270 }
271
276 packet_id_t packet_id() const {
277 return endian_load<packet_id_t>(packet_id_.data());
278 }
279
284 std::vector<topic_sharename> const& entries() const {
285 return entries_;
286 }
287
292 properties const& props() const {
293 return props_;
294 }
295
296private:
297 std::uint8_t fixed_header_;
298 std::vector<static_vector<char, 2>> topic_length_buf_entries_;
299 std::vector<topic_sharename> entries_;
301 std::size_t remaining_length_;
302 static_vector<char, 4> remaining_length_buf_;
303
304 std::size_t property_length_ = 0;
305 static_vector<char, 4> property_length_buf_;
306 properties props_;
307};
308
309template <std::size_t PacketIdBytes>
310inline std::ostream& operator<<(std::ostream& o, basic_unsubscribe_packet<PacketIdBytes> const& v) {
311 o <<
312 "v5::unsubscribe{" <<
313 "pid:" << v.packet_id() << ",[";
314 auto b = v.entries().cbegin();
315 auto e = v.entries().cend();
316 if (b != e) {
317 o <<
318 "{topic:" << b->topic() << "," <<
319 "sn:" << b->sharename() << "}";
320 ++b;
321 }
322 for (; b != e; ++b) {
323 o << "," <<
324 "{topic:" << b->topic() << "," <<
325 "sn:" << b->sharename() << "}";
326 }
327 o << "]";
328 if (!v.props().empty()) {
329 o << ",ps:" << v.props();
330 };
331 o << "}";
332 return o;
333}
334
335using unsubscribe_packet = basic_unsubscribe_packet<2>;
336
337} // namespace async_mqtt::v5
338
339#endif // ASYNC_MQTT_PACKET_V5_UNSUBSCRIBE_HPP
Definition packet_variant.hpp:49
MQTT UNSUBSCRIBE packet (v5)
Definition v5_unsubscribe.hpp:39
basic_unsubscribe_packet(packet_id_t packet_id, std::vector< topic_sharename > params, properties props={})
constructor
Definition v5_unsubscribe.hpp:51
std::vector< as::const_buffer > const_buffer_sequence() const
Create const buffer sequence it is for boost asio APIs.
Definition v5_unsubscribe.hpp:218
std::size_t num_of_const_buffer_sequence() const
Get number of element of const_buffer_sequence.
Definition v5_unsubscribe.hpp:258
std::size_t size() const
Get packet size.
Definition v5_unsubscribe.hpp:247
properties const & props() const
Definition v5_unsubscribe.hpp:292
std::vector< topic_sharename > const & entries() const
Get entries.
Definition v5_unsubscribe.hpp:284
packet_id_t packet_id() const
Get packet_id.
Definition v5_unsubscribe.hpp:276