41 using packet_id_t =
typename packet_id_type<PacketIdBytes>::type;
53 std::vector<topic_subopts>
params,
56 : fixed_header_{make_fixed_header(control_packet_type::subscribe, 0b0010)},
57 entries_{force_move(
params)},
59 property_length_(async_mqtt::
size(
props)),
60 props_(force_move(
props))
62 using namespace std::literals;
63 topic_length_buf_entries_.reserve(entries_.size());
64 for (
auto const&
e : entries_) {
65 topic_length_buf_entries_.push_back(
67 boost::numeric_cast<std::uint16_t>(
e.all_topic().size())
72 endian_store(
packet_id, packet_id_.data());
74 auto pb = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(property_length_));
76 property_length_buf_.push_back(
e);
79 for (
auto const& prop : props_) {
81 if (!validate_property(property_location::subscribe,
id)) {
84 "v5::subscribe_packet property "s + id_to_str(
id) +
" is not allowed"
89 remaining_length_ += property_length_buf_.size() + property_length_;
91 for (
auto const& e : entries_) {
93 if (
static_cast<std::uint8_t
>(e.opts()) & 0b11000000) {
96 "v5::subscribe_packet subopts is invalid"
99 switch (e.opts().get_qos()) {
100 case qos::at_most_once:
101 case qos::at_least_once:
102 case qos::exactly_once:
107 "v5::subscribe_packet qos is invalid"
111 switch (e.opts().get_retain_handling()) {
112 case sub::retain_handling::send:
113 case sub::retain_handling::send_only_new_subscription:
114 case sub::retain_handling::not_send:
119 "v5::subscribe_packet retain_handling is invalid"
124 auto size = e.all_topic().size();
128 "v5::subscribe_packet length of topic is invalid"
136 if (!utf8string_check(e.all_topic())) {
139 "v5::subscribe_packet topic filter invalid utf8"
144 remaining_length_buf_ = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
152 "v5::subscribe_packet fixed_header doesn't exist"
155 fixed_header_ =
static_cast<std::uint8_t
>(buf.front());
156 buf.remove_prefix(1);
157 auto cpt_opt = get_control_packet_type_with_check(
static_cast<std::uint8_t
>(fixed_header_));
158 if (!cpt_opt || *cpt_opt != control_packet_type::subscribe) {
161 "v5::subscribe_packet fixed_header is invalid"
166 if (
auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
167 remaining_length_ = *vl_opt;
170 throw make_error(errc::bad_message,
"v5::subscribe_packet remaining length is invalid");
172 if (remaining_length_ != buf.size()) {
173 throw make_error(errc::bad_message,
"v5::subscribe_packet remaining length doesn't match buf.size()");
177 if (!copy_advance(buf, packet_id_)) {
180 "v5::subscribe_packet packet_id doesn't exist"
185 auto it = buf.begin();
186 if (
auto pl_opt = variable_bytes_to_val(it, buf.end())) {
187 property_length_ = *pl_opt;
188 std::copy(buf.begin(), it, std::back_inserter(property_length_buf_));
189 buf.remove_prefix(std::size_t(std::distance(buf.begin(), it)));
190 if (buf.size() < property_length_) {
193 "v5::subscribe_packet properties_don't match its length"
196 auto prop_buf = buf.substr(0, property_length_);
197 props_ = make_properties(prop_buf, property_location::subscribe);
198 buf.remove_prefix(property_length_);
203 "v5::subscribe_packet property_length is invalid"
207 if (remaining_length_ == 0) {
208 throw make_error(errc::bad_message,
"v5::subscribe_packet doesn't have entries");
211 while (!buf.empty()) {
213 static_vector<char, 2> topic_length_buf;
214 if (!insert_advance(buf, topic_length_buf)) {
217 "v5::subscribe_packet length of topic is invalid"
220 auto topic_length = endian_load<std::uint16_t>(topic_length_buf.data());
221 topic_length_buf_entries_.push_back(topic_length_buf);
224 if (buf.size() < topic_length) {
227 "v5::subscribe_packet topic doesn't match its length"
230 auto topic = buf.substr(0, topic_length);
232 if (!utf8string_check(topic)) {
235 "v5::subscribe_packet topic filter invalid utf8"
239 buf.remove_prefix(topic_length);
245 "v5::subscribe_packet subscribe options doesn't exist"
248 auto opts =
static_cast<sub::opts
>(std::uint8_t(buf.front()));
250 if (
static_cast<std::uint8_t
>(opts) & 0b11000000) {
253 "v5::subscribe_packet subopts is invalid"
256 switch (opts.get_qos()) {
257 case qos::at_most_once:
258 case qos::at_least_once:
259 case qos::exactly_once:
264 "v5::subscribe_packet qos is invalid"
268 switch (opts.get_retain_handling()) {
269 case sub::retain_handling::send:
270 case sub::retain_handling::send_only_new_subscription:
271 case sub::retain_handling::not_send:
276 "v5::subscribe_packet retain_handling is invalid"
280 entries_.emplace_back(force_move(topic), opts);
281 buf.remove_prefix(1);
285 constexpr control_packet_type type()
const {
286 return control_packet_type::subscribe;
295 std::vector<as::const_buffer>
ret;
298 ret.emplace_back(as::buffer(&fixed_header_, 1));
300 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
302 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
304 ret.emplace_back(as::buffer(property_length_buf_.data(), property_length_buf_.size()));
305 auto props_cbs = async_mqtt::const_buffer_sequence(props_);
308 BOOST_ASSERT(entries_.size() == topic_length_buf_entries_.size());
309 auto it = topic_length_buf_entries_.begin();
310 for (
auto const&
e : entries_) {
311 ret.emplace_back(as::buffer(
it->data(),
it->size()));
312 ret.emplace_back(as::buffer(
e.all_topic()));
313 ret.emplace_back(as::buffer(&
e.opts(), 1));
327 remaining_length_buf_.size() +
340 [&] () -> std::size_t {
341 if (property_length_buf_.size() == 0)
return 0;
344 async_mqtt::num_of_const_buffer_sequence(props_);
361 std::vector<topic_subopts>
const&
entries()
const {
374 std::uint8_t fixed_header_;
375 std::vector<static_vector<char, 2>> topic_length_buf_entries_;
376 std::vector<topic_subopts> entries_;
378 std::size_t remaining_length_;
381 std::size_t property_length_ = 0;