7#if !defined(ASYNC_MQTT_STORE_HPP)
8#define ASYNC_MQTT_STORE_HPP
10#include <boost/asio/steady_timer.hpp>
11#include <boost/multi_index_container.hpp>
12#include <boost/multi_index/key.hpp>
13#include <boost/multi_index/sequenced_index.hpp>
14#include <boost/multi_index/ordered_index.hpp>
15#include <boost/multi_index/hashed_index.hpp>
17#include <async_mqtt/log.hpp>
19#include <async_mqtt/packet/packet_traits.hpp>
24namespace as = boost::asio;
25namespace mi = boost::multi_index;
27template <std::
size_t PacketIdBytes,
typename Executor>
30 using packet_id_t =
typename packet_id_type<PacketIdBytes>::type;
31 using store_packet_t = basic_store_packet_variant<PacketIdBytes>;
33 store(Executor exe):exe_{exe}{}
35 template <
typename Packet>
36 bool add(Packet
const& packet) {
37 if constexpr(is_publish<Packet>()) {
38 if (packet.opts().get_qos() == qos::at_least_once ||
39 packet.opts().get_qos() == qos::exactly_once) {
40 std::uint32_t sec = 0;
41 if constexpr(is_v5<Packet>()) {
43 for (
auto const& prop : packet.props()) {
46 [&](property::message_expiry_interval
const& p) {
58 return elems_.emplace_back(packet).second;
61 auto tim = std::make_shared<as::steady_timer>(exe_);
62 tim->expires_after(std::chrono::seconds(sec));
64 [
this, wp = std::weak_ptr<as::steady_timer>(tim)]
65 (error_code
const& ec) {
66 if (
auto tim = wp.lock()) {
68 auto& idx = elems_.template get<tag_tim>();
69 auto it = idx.find(tim.get());
70 if (it == idx.end()) return;
71 ASYNC_MQTT_LOG(
"mqtt_impl", info)
72 <<
"[store] message expired:" << it->packet;
78 return elems_.emplace_back(packet, tim).second;
82 else if constexpr(is_pubrel<Packet>()) {
83 return elems_.emplace_back(packet).second;
88 bool erase(response_packet r, packet_id_t packet_id) {
89 ASYNC_MQTT_LOG(
"mqtt_impl", info)
90 <<
"[store] erase pid:" << packet_id;
91 auto& idx = elems_.template get<tag_res_id>();
92 auto it = idx.find(std::make_tuple(r, packet_id));
93 if (it == idx.end())
return false;
99 ASYNC_MQTT_LOG(
"mqtt_impl", info)
104 template <
typename Func>
105 void for_each(Func
const& func) {
106 ASYNC_MQTT_LOG(
"mqtt_impl", info)
107 <<
"[store] for_each";
108 for (
auto it = elems_.begin(); it != elems_.end();) {
109 if (func(it->packet)) {
113 it = elems_.erase(it);
118 std::vector<store_packet_t> get_stored()
const {
119 ASYNC_MQTT_LOG(
"mqtt_impl", info)
120 <<
"[store] get_stored";
121 std::vector<store_packet_t> ret;
122 ret.reserve(elems_.size());
123 for (
auto elem : elems_) {
126 std::chrono::duration_cast<std::chrono::seconds>(
127 elem.tim->expiry() - std::chrono::steady_clock::now()
130 elem.packet.update_message_expiry_interval(
static_cast<std::uint32_t
>(d));
132 ret.push_back(force_move(elem.packet));
140 store_packet_t packet,
141 std::shared_ptr<as::steady_timer> tim =
nullptr
142 ): packet{force_move(packet)}, tim{force_move(tim)} {}
144 packet_id_t packet_id()
const {
145 return packet.packet_id();
149 return packet.response_packet_type();
152 void const* tim_address()
const {
156 store_packet_t packet;
157 std::shared_ptr<as::steady_timer> tim =
nullptr;
162 using mi_elem = mi::multi_index_container<
171 &elem_t::response_packet_type,
175 mi::hashed_non_unique<
response_packet
corresponding response packet
Definition store_packet_variant.hpp:25