40 using packet_id_t =
typename packet_id_type<PacketIdBytes>::type;
50 std::vector<topic_subopts>
params
52 : fixed_header_{make_fixed_header(control_packet_type::subscribe, 0b0010)},
53 entries_{force_move(
params)},
56 topic_length_buf_entries_.reserve(entries_.size());
57 for (
auto const&
e : entries_) {
58 topic_length_buf_entries_.push_back(
60 boost::numeric_cast<std::uint16_t>(
e.all_topic().size())
65 endian_store(
packet_id, packet_id_.data());
67 for (
auto const&
e : entries_) {
69 if (
static_cast<std::uint8_t
>(
e.opts()) & 0b11111100) {
72 "v3_1_1::subscribe_packet subopts is invalid"
75 switch (
e.opts().get_qos()) {
76 case qos::at_most_once:
77 case qos::at_least_once:
78 case qos::exactly_once:
83 "v3_1_1::subscribe_packet qos is invalid"
88 auto size =
e.all_topic().size();
92 "v3_1_1::subscribe_packet length of topic is invalid"
100 if (!utf8string_check(
e.all_topic())) {
103 "v3_1_1::subscribe_packet topic filter invalid utf8"
108 remaining_length_buf_ = val_to_variable_bytes(boost::numeric_cast<std::uint32_t>(remaining_length_));
116 "v3_1_1::subscribe_packet fixed_header doesn't exist"
119 fixed_header_ =
static_cast<std::uint8_t
>(buf.front());
120 buf.remove_prefix(1);
121 auto cpt_opt = get_control_packet_type_with_check(
static_cast<std::uint8_t
>(fixed_header_));
125 "v3_1_1::subscribe_packet fixed_header is invalid"
130 if (
auto vl_opt = insert_advance_variable_length(buf, remaining_length_buf_)) {
131 remaining_length_ = *vl_opt;
134 throw make_error(errc::bad_message,
"v3_1_1::subscribe_packet remaining length is invalid");
136 if (remaining_length_ != buf.size()) {
137 throw make_error(errc::bad_message,
"v3_1_1::subscribe_packet remaining length doesn't match buf.size()");
141 if (!copy_advance(buf, packet_id_)) {
144 "v3_1_1::subscribe_packet packet_id doesn't exist"
148 if (remaining_length_ == 0) {
149 throw make_error(errc::bad_message,
"v3_1_1::subscribe_packet doesn't have entries");
152 while (!buf.empty()) {
154 static_vector<char, 2> topic_length_buf;
155 if (!insert_advance(buf, topic_length_buf)) {
158 "v3_1_1::subscribe_packet length of topic is invalid"
161 auto topic_length = endian_load<std::uint16_t>(topic_length_buf.data());
162 topic_length_buf_entries_.push_back(topic_length_buf);
165 if (buf.size() < topic_length) {
168 "v3_1_1::subscribe_packet topic doesn't match its length"
171 auto topic = buf.
substr(0, topic_length);
173 if (!utf8string_check(topic)) {
176 "v3_1_1::subscribe_packet topic filter invalid utf8"
180 buf.remove_prefix(topic_length);
186 "v3_1_1::subscribe_packet subscribe options doesn't exist"
189 auto opts =
static_cast<sub::opts
>(std::uint8_t(buf.front()));
190 if (
static_cast<std::uint8_t
>(opts) & 0b11111100) {
193 "v3_1_1::subscribe_packet subopts is invalid"
196 switch (opts.get_qos()) {
197 case qos::at_most_once:
198 case qos::at_least_once:
199 case qos::exactly_once:
204 "v3_1_1::subscribe_packet qos is invalid"
208 entries_.emplace_back(force_move(topic), opts);
209 buf.remove_prefix(1);
213 constexpr control_packet_type type()
const {
214 return control_packet_type::subscribe;
223 std::vector<as::const_buffer>
ret;
226 ret.emplace_back(as::buffer(&fixed_header_, 1));
228 ret.emplace_back(as::buffer(remaining_length_buf_.data(), remaining_length_buf_.size()));
230 ret.emplace_back(as::buffer(packet_id_.data(), packet_id_.size()));
232 BOOST_ASSERT(entries_.size() == topic_length_buf_entries_.size());
233 auto it = topic_length_buf_entries_.begin();
234 for (
auto const&
e : entries_) {
235 ret.emplace_back(as::buffer(
it->data(),
it->size()));
236 ret.emplace_back(as::buffer(
e.all_topic()));
237 ret.emplace_back(as::buffer(&
e.opts(), 1));
251 remaining_length_buf_.size() +
279 std::vector<topic_subopts>
const&
entries()
const {
284 std::uint8_t fixed_header_;
285 std::vector<static_vector<char, 2>> topic_length_buf_entries_;
286 std::vector<topic_subopts> entries_;
288 std::size_t remaining_length_;
buffer substr(size_type pos=0, size_type count=npos) const &
get substring The returned buffer ragnge is the same as string_view::substr(). In addition the lifeti...
Definition buffer.hpp:201