async_mqtt 9.0.1
Loading...
Searching...
No Matches
store.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_UTIL_STORE_HPP)
8#define ASYNC_MQTT_UTIL_STORE_HPP
9
10#include <boost/asio/steady_timer.hpp>
11#include <boost/multi_index_container.hpp>
12#include <boost/multi_index/key.hpp>
13#include <boost/multi_index/sequenced_index.hpp>
14#include <boost/multi_index/ordered_index.hpp>
15#include <boost/multi_index/hashed_index.hpp>
16
17#include <async_mqtt/util/log.hpp>
18#include <async_mqtt/packet/store_packet_variant.hpp>
19#include <async_mqtt/packet/packet_traits.hpp>
20
21
22namespace async_mqtt {
23
24namespace as = boost::asio;
25namespace mi = boost::multi_index;
26
27template <std::size_t PacketIdBytes>
28class store {
29public:
30 using store_packet_type = basic_store_packet_variant<PacketIdBytes>;
31
32 explicit store(as::any_io_executor exe):exe_{exe}{}
33
34 template <typename Packet>
35 bool add(Packet const& packet) {
36 if constexpr(is_publish<Packet>()) {
37 if (packet.opts().get_qos() == qos::at_least_once ||
38 packet.opts().get_qos() == qos::exactly_once) {
39 std::uint32_t sec = 0;
40 if constexpr(is_v5<Packet>()) {
41 bool finish = false;
42 for (auto const& prop : packet.props()) {
43 prop.visit(
44 overload {
46 sec = p.val();
47 finish = true;
48 },
49 [](auto const&) {
50 }
51 }
52 );
53 if (finish) break;
54 }
55 }
56 if (sec == 0) {
57 return elems_.emplace_back(packet).second;
58 }
59 else {
60 auto tim = std::make_shared<as::steady_timer>(exe_);
61 tim->expires_after(std::chrono::seconds(sec));
62 tim->async_wait(
63 [this, wp = std::weak_ptr<as::steady_timer>(tim)]
64 (error_code const& ec) {
65 if (auto tim = wp.lock()) {
66 if (!ec) {
67 auto& idx = elems_.template get<tag_tim>();
68 auto it = idx.find(tim.get());
69 if (it == idx.end()) return;
70 ASYNC_MQTT_LOG("mqtt_impl", info)
71 << "[store] message expired:" << it->packet;
72 idx.erase(it);
73 }
74 }
75 }
76 );
77 return elems_.emplace_back(packet, tim).second;
78 }
79 }
80 }
81 else if constexpr(is_pubrel<Packet>()) {
82 return elems_.emplace_back(packet).second;
83 }
84 return false;
85 }
86
87 bool erase(response_packet r, typename basic_packet_id_type<PacketIdBytes>::type packet_id) {
88 ASYNC_MQTT_LOG("mqtt_impl", info)
89 << "[store] erase pid:" << packet_id;
90 auto& idx = elems_.template get<tag_res_id>();
91 auto it = idx.find(std::make_tuple(r, packet_id));
92 if (it == idx.end()) return false;
93 idx.erase(it);
94 return true;
95 }
96
97 void clear() {
98 ASYNC_MQTT_LOG("mqtt_impl", info)
99 << "[store] clear";
100 elems_.clear();
101 }
102
103 template <typename Func>
104 void for_each(Func const& func) {
105 ASYNC_MQTT_LOG("mqtt_impl", info)
106 << "[store] for_each";
107 for (auto it = elems_.begin(); it != elems_.end();) {
108 if (func(it->packet)) {
109 ++it;
110 }
111 else {
112 it = elems_.erase(it);
113 }
114 }
115 }
116
117 std::vector<store_packet_type> get_stored() const {
118 ASYNC_MQTT_LOG("mqtt_impl", info)
119 << "[store] get_stored";
120 std::vector<store_packet_type> ret;
121 ret.reserve(elems_.size());
122 for (auto elem : elems_) {
123 if (elem.tim) {
124 auto d =
125 std::chrono::duration_cast<std::chrono::seconds>(
126 elem.tim->expiry() - std::chrono::steady_clock::now()
127 ).count();
128 if (d < 0) d = 0;
129 elem.packet.update_message_expiry_interval(static_cast<std::uint32_t>(d));
130 }
131 ret.push_back(force_move(elem.packet));
132 }
133 return ret;
134 }
135
136private:
137 struct elem_t {
138 elem_t(
139 store_packet_type packet,
140 std::shared_ptr<as::steady_timer> tim = nullptr
141 ): packet{force_move(packet)}, tim{force_move(tim)} {}
142
143 typename basic_packet_id_type<PacketIdBytes>::type packet_id() const {
144 return packet.packet_id();
145 }
146
147 response_packet response_packet_type() const {
148 return packet.response_packet_type();
149 }
150
151 void const* tim_address() const {
152 return tim.get();
153 }
154
155 store_packet_type packet;
156 std::shared_ptr<as::steady_timer> tim = nullptr;
157 };
158 struct tag_seq{};
159 struct tag_res_id{};
160 struct tag_tim{};
161 using mi_elem = mi::multi_index_container<
162 elem_t,
163 mi::indexed_by<
164 mi::sequenced<
165 mi::tag<tag_seq>
166 >,
167 mi::ordered_unique<
168 mi::tag<tag_res_id>,
169 mi::key<
170 &elem_t::response_packet_type,
171 &elem_t::packet_id
172 >
173 >,
174 mi::hashed_non_unique<
175 mi::tag<tag_tim>,
176 mi::key<
177 &elem_t::tim_address
178 >
179 >
180 >
181 >;
182
183 mi_elem elems_;
184 as::any_io_executor exe_;
185};
186
187} // namespace async_mqtt
188
189#endif // ASYNC_MQTT_UTIL_STORE_HPP
sys::error_code error_code
sys is a namespace alias of boost::sytem.
Definition error.hpp:56
@ info
info level api call is output
@ message_expiry_interval
Message Expiry Interval.
@ exactly_once
Exactly once delivery.
@ at_least_once
At least once delivery.
response_packet
corresponding response packet
Definition store_packet_variant.hpp:31