7#if !defined(ASYNC_MQTT_UTIL_STORE_HPP)
8#define ASYNC_MQTT_UTIL_STORE_HPP
10#include <boost/asio/steady_timer.hpp>
11#include <boost/multi_index_container.hpp>
12#include <boost/multi_index/key.hpp>
13#include <boost/multi_index/sequenced_index.hpp>
14#include <boost/multi_index/ordered_index.hpp>
15#include <boost/multi_index/hashed_index.hpp>
17#include <async_mqtt/util/log.hpp>
18#include <async_mqtt/packet/store_packet_variant.hpp>
19#include <async_mqtt/packet/packet_traits.hpp>
24namespace as = boost::asio;
25namespace mi = boost::multi_index;
27template <std::
size_t PacketIdBytes>
30 using store_packet_type = basic_store_packet_variant<PacketIdBytes>;
32 explicit store(as::any_io_executor exe):exe_{exe}{}
34 template <
typename Packet>
35 bool add(Packet
const& packet) {
36 if constexpr(is_publish<Packet>()) {
39 std::uint32_t sec = 0;
40 if constexpr(is_v5<Packet>()) {
42 for (
auto const& prop : packet.props()) {
57 return elems_.emplace_back(packet).second;
60 auto tim = std::make_shared<as::steady_timer>(exe_);
61 tim->expires_after(std::chrono::seconds(sec));
63 [
this, wp = std::weak_ptr<as::steady_timer>(tim)]
65 if (
auto tim = wp.lock()) {
67 auto& idx = elems_.template get<tag_tim>();
68 auto it = idx.find(tim.get());
69 if (it == idx.end())
return;
70 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
71 <<
"[store] message expired:" << it->packet;
77 return elems_.emplace_back(packet, tim).second;
81 else if constexpr(is_pubrel<Packet>()) {
82 return elems_.emplace_back(packet).second;
87 bool erase(
response_packet r,
typename basic_packet_id_type<PacketIdBytes>::type packet_id) {
88 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
89 <<
"[store] erase pid:" << packet_id;
90 auto& idx = elems_.template get<tag_res_id>();
91 auto it = idx.find(std::make_tuple(r, packet_id));
92 if (it == idx.end())
return false;
98 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
103 template <
typename Func>
104 void for_each(Func
const& func) {
105 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
106 <<
"[store] for_each";
107 for (
auto it = elems_.begin(); it != elems_.end();) {
108 if (func(it->packet)) {
112 it = elems_.erase(it);
117 std::vector<store_packet_type> get_stored()
const {
118 ASYNC_MQTT_LOG(
"mqtt_impl",
info)
119 <<
"[store] get_stored";
120 std::vector<store_packet_type> ret;
121 ret.reserve(elems_.size());
122 for (
auto elem : elems_) {
125 std::chrono::duration_cast<std::chrono::seconds>(
126 elem.tim->expiry() - std::chrono::steady_clock::now()
129 elem.packet.update_message_expiry_interval(
static_cast<std::uint32_t
>(d));
131 ret.push_back(force_move(elem.packet));
139 store_packet_type packet,
140 std::shared_ptr<as::steady_timer> tim =
nullptr
141 ): packet{force_move(packet)}, tim{force_move(tim)} {}
143 typename basic_packet_id_type<PacketIdBytes>::type packet_id()
const {
144 return packet.packet_id();
148 return packet.response_packet_type();
151 void const* tim_address()
const {
155 store_packet_type packet;
156 std::shared_ptr<as::steady_timer> tim =
nullptr;
161 using mi_elem = mi::multi_index_container<
170 &elem_t::response_packet_type,
174 mi::hashed_non_unique<
184 as::any_io_executor exe_;
sys::error_code error_code
sys is a namespace alias of boost::sytem.
Definition error.hpp:56
@ info
info level api call is output
@ message_expiry_interval
Message Expiry Interval.
@ exactly_once
Exactly once delivery.
@ at_least_once
At least once delivery.
response_packet
corresponding response packet
Definition store_packet_variant.hpp:31