async_mqtt 4.1.0
Loading...
Searching...
No Matches
store.hpp
1// Copyright Takatoshi Kondo 2022
2//
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#if !defined(ASYNC_MQTT_STORE_HPP)
8#define ASYNC_MQTT_STORE_HPP
9
10#include <boost/asio/steady_timer.hpp>
11#include <boost/multi_index_container.hpp>
12#include <boost/multi_index/key.hpp>
13#include <boost/multi_index/sequenced_index.hpp>
14#include <boost/multi_index/ordered_index.hpp>
15#include <boost/multi_index/hashed_index.hpp>
16
17#include <async_mqtt/log.hpp>
19#include <async_mqtt/packet/packet_traits.hpp>
20
21
22namespace async_mqtt {
23
24namespace as = boost::asio;
25namespace mi = boost::multi_index;
26
27template <std::size_t PacketIdBytes, typename Executor>
28class store {
29public:
30 using packet_id_t = typename packet_id_type<PacketIdBytes>::type;
31 using store_packet_t = basic_store_packet_variant<PacketIdBytes>;
32
33 store(Executor exe):exe_{exe}{}
34
35 template <typename Packet>
36 bool add(Packet const& packet) {
37 if constexpr(is_publish<Packet>()) {
38 if (packet.opts().get_qos() == qos::at_least_once ||
39 packet.opts().get_qos() == qos::exactly_once) {
40 std::uint32_t sec = 0;
41 if constexpr(is_v5<Packet>()) {
42 bool finish = false;
43 for (auto const& prop : packet.props()) {
44 prop.visit(
45 overload {
46 [&](property::message_expiry_interval const& p) {
47 sec = p.val();
48 finish = true;
49 },
50 [](auto const&) {
51 }
52 }
53 );
54 if (finish) break;
55 }
56 }
57 if (sec == 0) {
58 return elems_.emplace_back(packet).second;
59 }
60 else {
61 auto tim = std::make_shared<as::steady_timer>(exe_);
62 tim->expires_after(std::chrono::seconds(sec));
63 tim->async_wait(
64 [this, wp = std::weak_ptr<as::steady_timer>(tim)]
65 (error_code const& ec) {
66 if (auto tim = wp.lock()) {
67 if (!ec) {
68 auto& idx = elems_.template get<tag_tim>();
69 auto it = idx.find(tim.get());
70 if (it == idx.end()) return;
71 ASYNC_MQTT_LOG("mqtt_impl", info)
72 << "[store] message expired:" << it->packet;
73 idx.erase(it);
74 }
75 }
76 }
77 );
78 return elems_.emplace_back(packet, tim).second;
79 }
80 }
81 }
82 else if constexpr(is_pubrel<Packet>()) {
83 return elems_.emplace_back(packet).second;
84 }
85 return false;
86 }
87
88 bool erase(response_packet r, packet_id_t packet_id) {
89 ASYNC_MQTT_LOG("mqtt_impl", info)
90 << "[store] erase pid:" << packet_id;
91 auto& idx = elems_.template get<tag_res_id>();
92 auto it = idx.find(std::make_tuple(r, packet_id));
93 if (it == idx.end()) return false;
94 idx.erase(it);
95 return true;
96 }
97
98 void clear() {
99 ASYNC_MQTT_LOG("mqtt_impl", info)
100 << "[store] clear";
101 elems_.clear();
102 }
103
104 template <typename Func>
105 void for_each(Func const& func) {
106 ASYNC_MQTT_LOG("mqtt_impl", info)
107 << "[store] for_each";
108 for (auto it = elems_.begin(); it != elems_.end();) {
109 if (func(it->packet)) {
110 ++it;
111 }
112 else {
113 it = elems_.erase(it);
114 }
115 }
116 }
117
118 std::vector<store_packet_t> get_stored() const {
119 ASYNC_MQTT_LOG("mqtt_impl", info)
120 << "[store] get_stored";
121 std::vector<store_packet_t> ret;
122 ret.reserve(elems_.size());
123 for (auto elem : elems_) {
124 if (elem.tim) {
125 auto d =
126 std::chrono::duration_cast<std::chrono::seconds>(
127 elem.tim->expiry() - std::chrono::steady_clock::now()
128 ).count();
129 if (d < 0) d = 0;
130 elem.packet.update_message_expiry_interval(static_cast<std::uint32_t>(d));
131 }
132 ret.push_back(force_move(elem.packet));
133 }
134 return ret;
135 }
136
137private:
138 struct elem_t {
139 elem_t(
140 store_packet_t packet,
141 std::shared_ptr<as::steady_timer> tim = nullptr
142 ): packet{force_move(packet)}, tim{force_move(tim)} {}
143
144 packet_id_t packet_id() const {
145 return packet.packet_id();
146 }
147
148 response_packet response_packet_type() const {
149 return packet.response_packet_type();
150 }
151
152 void const* tim_address() const {
153 return tim.get();
154 }
155
156 store_packet_t packet;
157 std::shared_ptr<as::steady_timer> tim = nullptr;
158 };
159 struct tag_seq{};
160 struct tag_res_id{};
161 struct tag_tim{};
162 using mi_elem = mi::multi_index_container<
163 elem_t,
164 mi::indexed_by<
165 mi::sequenced<
166 mi::tag<tag_seq>
167 >,
168 mi::ordered_unique<
169 mi::tag<tag_res_id>,
170 mi::key<
171 &elem_t::response_packet_type,
172 &elem_t::packet_id
173 >
174 >,
175 mi::hashed_non_unique<
176 mi::tag<tag_tim>,
177 mi::key<
178 &elem_t::tim_address
179 >
180 >
181 >
182 >;
183
184 mi_elem elems_;
185 Executor exe_;
186};
187
188} // namespace async_mqtt
189
190#endif // ASYNC_MQTT_STORE_HPP
response_packet
corresponding response packet
Definition store_packet_variant.hpp:25