7#if !defined(ASYNC_MQTT_STORE_HPP) 
    8#define ASYNC_MQTT_STORE_HPP 
   10#include <boost/asio/steady_timer.hpp> 
   11#include <boost/multi_index_container.hpp> 
   12#include <boost/multi_index/key.hpp> 
   13#include <boost/multi_index/sequenced_index.hpp> 
   14#include <boost/multi_index/ordered_index.hpp> 
   15#include <boost/multi_index/hashed_index.hpp> 
   17#include <async_mqtt/log.hpp> 
   19#include <async_mqtt/packet/packet_traits.hpp> 
   24namespace as = boost::asio;
 
   25namespace mi = boost::multi_index;
 
   27template <std::
size_t PacketIdBytes, 
typename Executor>
 
   30    using packet_id_t = 
typename packet_id_type<PacketIdBytes>::type;
 
   31    using store_packet_t = basic_store_packet_variant<PacketIdBytes>;
 
   33    store(Executor exe):exe_{exe}{}
 
   35    template <
typename Packet>
 
   36    bool add(Packet 
const& packet) {
 
   37        if constexpr(is_publish<Packet>()) {
 
   38            if (packet.opts().get_qos() == qos::at_least_once ||
 
   39                packet.opts().get_qos() == qos::exactly_once) {
 
   40                std::uint32_t sec = 0;
 
   41                if constexpr(is_v5<Packet>()) {
 
   43                    for (
auto const& prop : packet.props()) {
 
   46                                [&](property::message_expiry_interval 
const& p) {
 
   58                    return elems_.emplace_back(packet).second;
 
   61                    auto tim = std::make_shared<as::steady_timer>(exe_);
 
   62                    tim->expires_after(std::chrono::seconds(sec));
 
   64                        [
this, wp = std::weak_ptr<as::steady_timer>(tim)]
 
   65                        (error_code 
const& ec) {
 
   66                            if (
auto tim = wp.lock()) {
 
   68                                    auto& idx = elems_.template get<tag_tim>();
 
   69                                    auto it = idx.find(tim.get());
 
   70                                    if (it == idx.end()) return;
 
   71                                    ASYNC_MQTT_LOG(
"mqtt_impl", info)
 
   72                                        << 
"[store] message expired:" << it->packet;
 
   78                    return elems_.emplace_back(packet, tim).second;
 
   82        else if constexpr(is_pubrel<Packet>()) {
 
   83            return elems_.emplace_back(packet).second;
 
   88    bool erase(response_packet r, packet_id_t packet_id) {
 
   89        ASYNC_MQTT_LOG(
"mqtt_impl", info)
 
   90            << 
"[store] erase pid:" << packet_id;
 
   91        auto& idx = elems_.template get<tag_res_id>();
 
   92        auto it = idx.find(std::make_tuple(r, packet_id));
 
   93        if (it == idx.end()) 
return false;
 
   99        ASYNC_MQTT_LOG(
"mqtt_impl", info)
 
  104    template <
typename Func>
 
  105    void for_each(Func 
const& func) {
 
  106        ASYNC_MQTT_LOG(
"mqtt_impl", info)
 
  107            << 
"[store] for_each";
 
  108        for (
auto it = elems_.begin(); it != elems_.end();) {
 
  109            if (func(it->packet)) {
 
  113                it = elems_.erase(it);
 
  118    std::vector<store_packet_t> get_stored()
 const {
 
  119        ASYNC_MQTT_LOG(
"mqtt_impl", info)
 
  120            << 
"[store] get_stored";
 
  121        std::vector<store_packet_t> ret;
 
  122        ret.reserve(elems_.size());
 
  123        for (
auto elem : elems_) {
 
  126                    std::chrono::duration_cast<std::chrono::seconds>(
 
  127                        elem.tim->expiry() - std::chrono::steady_clock::now()
 
  130                elem.packet.update_message_expiry_interval(
static_cast<std::uint32_t
>(d));
 
  132            ret.push_back(force_move(elem.packet));
 
  140            store_packet_t packet,
 
  141            std::shared_ptr<as::steady_timer> tim = 
nullptr 
  142        ): packet{force_move(packet)}, tim{force_move(tim)} {}
 
  144        packet_id_t packet_id()
 const {
 
  145            return packet.packet_id();
 
  149            return packet.response_packet_type();
 
  152        void const* tim_address()
 const {
 
  156        store_packet_t packet;
 
  157        std::shared_ptr<as::steady_timer> tim = 
nullptr;
 
  162    using mi_elem = mi::multi_index_container<
 
  171                    &elem_t::response_packet_type,
 
  175            mi::hashed_non_unique<
 
response_packet
corresponding response packet
Definition store_packet_variant.hpp:25