async_mqtt 4.1.0
Loading...
Searching...
No Matches
v3_1_1_subscribe.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_PACKET_V3_1_1_SUBSCRIBE_HPP)
8#define ASYNC_MQTT_PACKET_V3_1_1_SUBSCRIBE_HPP
9
10#include <boost/numeric/conversion/cast.hpp>
11
12#include <async_mqtt/exception.hpp>
13#include <async_mqtt/buffer.hpp>
14
15#include <async_mqtt/util/move.hpp>
16#include <async_mqtt/util/static_vector.hpp>
17#include <async_mqtt/util/endian_convert.hpp>
18#include <async_mqtt/util/utf8validate.hpp>
19
20#include <async_mqtt/packet/packet_id_type.hpp>
21#include <async_mqtt/packet/fixed_header.hpp>
22#include <async_mqtt/packet/topic_subopts.hpp>
23#include <async_mqtt/variable_bytes.hpp>
24#include <async_mqtt/packet/copy_to_static_vector.hpp>
25
26namespace async_mqtt::v3_1_1 {
27
28namespace as = boost::asio;
29
37template <std::size_t PacketIdBytes>
39public:
40 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
41
// Constructor body: builds a SUBSCRIBE packet from a packet identifier and a
// list of topic filter / subscribe option entries.
//
// packet_id  identifier stored into packet_id_ via endian_store
// params     entries; moved into entries_
//
// Throws (make_error with errc::bad_message) when an entry has any of the
// upper six option bits set, a QoS outside the three listed enumerators,
// a topic longer than 0xffff, or a topic rejected by utf8string_check.
49 packet_id_t packet_id,
50 std::vector<topic_subopts> params
51 )
// The fixed header byte combines control_packet_type::subscribe with the
// flag bits 0b0010. remaining_length_ starts at the packet id size and is
// grown per entry in the validation loop below.
52 : fixed_header_{make_fixed_header(control_packet_type::subscribe, 0b0010)},
53 entries_{force_move(params)},
54 remaining_length_{PacketIdBytes}
55 {
// Pre-compute one 16-bit length prefix per entry, kept parallel to entries_.
// NOTE(review): numeric_cast runs here, before the explicit 0xffff length
// check further down, so an oversized topic presumably surfaces as a Boost
// numeric_cast exception rather than the bad_message error -- confirm.
56 topic_length_buf_entries_.reserve(entries_.size());
57 for (auto const& e : entries_) {
58 topic_length_buf_entries_.push_back(
59 endian_static_vector(
60 boost::numeric_cast<std::uint16_t>(e.all_topic().size())
61 )
62 );
63 }
64
65 endian_store(packet_id, packet_id_.data());
66
// Validate every entry and accumulate the remaining length.
67 for (auto const& e : entries_) {
68 // reserved bits check
69 if (static_cast<std::uint8_t>(e.opts()) & 0b11111100) {
70 throw make_error(
71 errc::bad_message,
72 "v3_1_1::subscribe_packet subopts is invalid"
73 );
74 }
75 switch (e.opts().get_qos()) {
76 case qos::at_most_once:
77 case qos::at_least_once:
78 case qos::exactly_once:
79 break;
80 default:
81 throw make_error(
82 errc::bad_message,
83 "v3_1_1::subscribe_packet qos is invalid"
84 );
85 break;
86 }
87
88 auto size = e.all_topic().size();
89 if (size > 0xffff) {
90 throw make_error(
91 errc::bad_message,
92 "v3_1_1::subscribe_packet length of topic is invalid"
93 );
94 }
95 remaining_length_ +=
96 2 + // topic filter length
97 size + // topic filter
98 1; // opts
99
100 if (!utf8string_check(e.all_topic())) {
101 throw make_error(
102 errc::bad_message,
103 "v3_1_1::subscribe_packet topic filter invalid utf8"
104 );
105 }
106 }
107
// Encode the accumulated remaining length into its variable-byte form.
108 remaining_length_buf_ = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
109 }
110
// Constructor body: parses a SUBSCRIBE packet out of `buf`, consuming it
// front to back: fixed header byte, remaining length, packet id, then
// repeated (topic length, topic, options) entries until buf is empty.
// Every malformed-input case throws make_error with errc::bad_message.
112 // fixed_header
113 if (buf.empty()) {
114 throw make_error(
115 errc::bad_message,
116 "v3_1_1::subscribe_packet fixed_header doesn't exist"
117 );
118 }
119 fixed_header_ = static_cast<std::uint8_t>(buf.front());
120 buf.remove_prefix(1);
121 auto cpt_opt = get_control_packet_type_with_check(static_cast<std::uint8_t>(fixed_header_));
122 if (!cpt_opt || *cpt_opt != control_packet_type::subscribe) {
123 throw make_error(
124 errc::bad_message,
125 "v3_1_1::subscribe_packet fixed_header is invalid"
126 );
127 }
128
129 // remaining_length
130 if (auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
131 remaining_length_ = *vl_opt;
132 }
133 else {
134 throw make_error(errc::bad_message, "v3_1_1::subscribe_packet remaining length is invalid");
135 }
// The decoded remaining length must account for exactly the bytes left in buf.
136 if (remaining_length_ != buf.size()) {
137 throw make_error(errc::bad_message, "v3_1_1::subscribe_packet remaining length doesn't match buf.size()");
138 }
139
140 // packet_id
141 if (!copy_advance(buf, packet_id_)) {
142 throw make_error(
143 errc::bad_message,
144 "v3_1_1::subscribe_packet packet_id doesn't exist"
145 );
146 }
147
// NOTE(review): remaining_length_ was checked equal to buf.size() before the
// packet id was read, so remaining_length_ == 0 implies buf was empty and
// copy_advance above presumably already failed; this branch looks
// unreachable. A packet carrying only a packet id (no entries) appears to
// skip the loop below and be accepted -- confirm whether buf.empty() was
// the intended condition.
148 if (remaining_length_ == 0) {
149 throw make_error(errc::bad_message, "v3_1_1::subscribe_packet doesn't have entries");
150 }
151
// Each iteration consumes one entry from buf.
152 while (!buf.empty()) {
153 // topic_length
154 static_vector<char, 2> topic_length_buf;
155 if (!insert_advance(buf, topic_length_buf)) {
156 throw make_error(
157 errc::bad_message,
158 "v3_1_1::subscribe_packet length of topic is invalid"
159 );
160 }
161 auto topic_length = endian_load<std::uint16_t>(topic_length_buf.data());
// Keep the raw length bytes; const_buffer_sequence() emits them per entry.
162 topic_length_buf_entries_.push_back(topic_length_buf);
163
164 // topic
165 if (buf.size() < topic_length) {
166 throw make_error(
167 errc::bad_message,
168 "v3_1_1::subscribe_packet topic doesn't match its length"
169 );
170 }
171 auto topic = buf.substr(0, topic_length);
172
173 if (!utf8string_check(topic)) {
174 throw make_error(
175 errc::bad_message,
176 "v3_1_1::subscribe_packet topic filter invalid utf8"
177 );
178 }
179
180 buf.remove_prefix(topic_length);
181
182 // opts
183 if (buf.empty()) {
184 throw make_error(
185 errc::bad_message,
186 "v3_1_1::subscribe_packet subscribe options doesn't exist"
187 );
188 }
189 auto opts = static_cast<sub::opts>(std::uint8_t(buf.front()));
// Same validation as the value constructor: upper six bits must be clear
// and the QoS must be one of the three listed enumerators.
190 if (static_cast<std::uint8_t>(opts) & 0b11111100) {
191 throw make_error(
192 errc::bad_message,
193 "v3_1_1::subscribe_packet subopts is invalid"
194 );
195 }
196 switch (opts.get_qos()) {
197 case qos::at_most_once:
198 case qos::at_least_once:
199 case qos::exactly_once:
200 break;
201 default:
202 throw make_error(
203 errc::bad_message,
204 "v3_1_1::subscribe_packet qos is invalid"
205 );
206 break;
207 }
208 entries_.emplace_back(force_move(topic), opts);
// Consume the options byte that was only peeked at via buf.front().
209 buf.remove_prefix(1);
210 }
211 }
212
213 constexpr control_packet_type type() const {
214 return control_packet_type::subscribe;
215 }
216
222 std::vector<as::const_buffer> const_buffer_sequence() const {
223 std::vector<as::const_buffer> ret;
225
226 ret.emplace_back(as::buffer(&fixed_header_, 1));
227
228 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
229
230 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
231
232 BOOST_ASSERT(entries_.size() == topic_length_buf_entries_.size());
233 auto it = topic_length_buf_entries_.begin();
234 for (auto const& e : entries_) {
235 ret.emplace_back(as::buffer(it->data(), it->size()));
236 ret.emplace_back(as::buffer(e.all_topic()));
237 ret.emplace_back(as::buffer(&e.opts(), 1));
238 ++it;
239 }
240
241 return ret;
242 }
243
248 std::size_t size() const {
249 return
250 1 + // fixed header
251 remaining_length_buf_.size() +
252 remaining_length_;
253 }
254
259 std::size_t num_of_const_buffer_sequence() const {
260 return
261 1 + // fixed header
262 1 + // remaining length
263 1 + // packet id
264 entries_.size() * 3; // topic name length, topic name, qos
265 }
266
271 packet_id_t packet_id() const {
272 return endian_load<packet_id_t>(packet_id_.data());
273 }
274
279 std::vector<topic_subopts> const& entries() const {
280 return entries_;
281 }
282
283private:
// First byte of the packet (control packet type plus flag bits).
284 std::uint8_t fixed_header_;
// Two length bytes per entry, kept in the same order as entries_.
285 std::vector<static_vector<char, 2>> topic_length_buf_entries_;
// Topic filter / subscribe option pairs carried by this packet.
286 std::vector<topic_subopts> entries_;
// Remaining length value: packet id size plus, per entry, 2 + topic size + 1.
288 std::size_t remaining_length_;
// Variable-byte encoding of remaining_length_ (at most 4 bytes).
289 static_vector<char, 4> remaining_length_buf_;
290};
291
292template <std::size_t PacketIdBytes>
293inline std::ostream& operator<<(std::ostream& o, basic_subscribe_packet<PacketIdBytes> const& v) {
294 o <<
295 "v3_1_1::subscribe{" <<
296 "pid:" << v.packet_id() << ",[";
297 auto b = v.entries().cbegin();
298 auto e = v.entries().cend();
299 if (b != e) {
300 o <<
301 "{topic:" <<
302 b->all_topic() << "," <<
303 "qos:" << b->opts().get_qos() << "}";
304 ++b;
305 }
306 for (; b != e; ++b) {
307 o << "," <<
308 "{topic:" <<
309 b->all_topic() << "," <<
310 "qos:" << b->opts().get_qos() << "}";
311 }
312 o << "]}";
313 return o;
314}
315
321
322} // namespace async_mqtt::v3_1_1
323
324#endif // ASYNC_MQTT_PACKET_V3_1_1_SUBSCRIBE_HPP
Definition packet_variant.hpp:49
Buffer that has a string_view interface. This class provides a string_view interface. This class holds stri...
Definition buffer.hpp:30
buffer substr(size_type pos=0, size_type count=npos) const &
Get substring. The returned buffer range is the same as string_view::substr(). In addition the lifeti...
Definition buffer.hpp:201
MQTT SUBSCRIBE packet (v3.1.1)
Definition v3_1_1_subscribe.hpp:38
basic_subscribe_packet(packet_id_t packet_id, std::vector< topic_subopts > params)
constructor
Definition v3_1_1_subscribe.hpp:48
packet_id_t packet_id() const
Get packet_id.
Definition v3_1_1_subscribe.hpp:271
std::size_t num_of_const_buffer_sequence() const
Get the number of elements of const_buffer_sequence.
Definition v3_1_1_subscribe.hpp:259
std::size_t size() const
Get packet size.
Definition v3_1_1_subscribe.hpp:248
std::vector< topic_subopts > const & entries() const
Get entries.
Definition v3_1_1_subscribe.hpp:279
std::vector< as::const_buffer > const_buffer_sequence() const
Create a const buffer sequence for use with Boost.Asio APIs.
Definition v3_1_1_subscribe.hpp:222