7 #if !defined(MQTT_BROKER_OFFLINE_MESSAGE_HPP)
8 #define MQTT_BROKER_OFFLINE_MESSAGE_HPP
12 #include <boost/asio/steady_timer.hpp>
13 #include <boost/multi_index_container.hpp>
14 #include <boost/multi_index/ordered_index.hpp>
15 #include <boost/multi_index/sequenced_index.hpp>
16 #include <boost/multi_index/member.hpp>
29 namespace mi = boost::multi_index;
42 publish_options pubopts,
44 std::shared_ptr<as::steady_timer> tim_message_expiry)
49 tim_message_expiry_(
force_move(tim_message_expiry))
54 if (tim_message_expiry_) {
56 std::chrono::duration_cast<std::chrono::seconds>(
57 tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
60 set_property<v5::property::message_expiry_interval>(
62 v5::property::message_expiry_interval(
63 static_cast<uint32_t
>(d)
67 auto qos_value = pubopts_.get_qos();
68 if (qos_value == qos::at_least_once ||
69 qos_value == qos::exactly_once) {
70 if (
auto pid = ep.acquire_unique_packet_id_no_except()) {
71 ep.publish(pid.value(), topic_, contents_, pubopts_,
force_move(props));
76 ep.publish(topic_, contents_, pubopts_,
force_move(props));
87 publish_options pubopts_;
89 std::shared_ptr<as::steady_timer> tim_message_expiry_;
95 auto& idx = messages_.get<
tag_seq>();
96 while (!idx.empty()) {
97 if (idx.front().send(ep)) {
107 auto& idx = messages_.get<
tag_seq>();
108 while (!idx.empty()) {
109 if (idx.front().send(ep)) {
124 return messages_.empty();
131 publish_options pubopts,
133 optional<std::chrono::steady_clock::duration> message_expiry_interval;
135 auto v = get_property<v5::property::message_expiry_interval>(props);
137 message_expiry_interval.emplace(std::chrono::seconds(v.value().val()));
140 std::shared_ptr<as::steady_timer> tim_message_expiry;
141 if (message_expiry_interval) {
142 tim_message_expiry = std::make_shared<as::steady_timer>(ioc, message_expiry_interval.value());
143 tim_message_expiry->async_wait(
144 [
this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)](
error_code ec)
mutable {
145 if (
auto sp = wp.lock()) {
147 messages_.get<tag_tim>().erase(sp);
154 auto& seq_idx = messages_.get<
tag_seq>();
155 seq_idx.emplace_back(
165 using mi_offline_message = mi::multi_index_container<
171 mi::ordered_non_unique<
173 BOOST_MULTI_INDEX_MEMBER(
offline_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
178 mi_offline_message messages_;
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: offline_message.hpp:37
offline_message(buffer topic, buffer contents, publish_options pubopts, v5::properties props, std::shared_ptr< as::steady_timer > tim_message_expiry)
Definition: offline_message.hpp:39
bool send(endpoint_t &ep) const
Definition: offline_message.hpp:52
Definition: offline_message.hpp:92
void clear()
Definition: offline_message.hpp:119
void send_by_packet_id_release(endpoint_t &ep)
Definition: offline_message.hpp:106
void send_all(endpoint_t &ep)
Definition: offline_message.hpp:94
void push_back(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: offline_message.hpp:127
bool empty() const
Definition: offline_message.hpp:123
server<>::endpoint_t endpoint_t
Definition: common_type.hpp:17
std::vector< property_variant > properties
Definition: property_variant.hpp:51
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
const_buffer buffer(MQTT_NS::buffer const &data)
Create a boost::asio::const_buffer from the MQTT_NS::buffer. boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253