7 #if !defined(MQTT_BROKER_SESSION_STATE_HPP)
8 #define MQTT_BROKER_SESSION_STATE_HPP
14 #include <boost/asio/io_context.hpp>
15 #include <boost/multi_index_container.hpp>
16 #include <boost/multi_index/ordered_index.hpp>
17 #include <boost/multi_index/member.hpp>
32 namespace mi = boost::multi_index;
62 optional<std::chrono::steady_clock::duration> will_expiry_interval,
66 shared_targets_(shared_targets),
87 template <
typename SessionExpireHandler>
90 con_->for_each_store_with_life_keeper(
94 <<
"store inflight message";
96 std::shared_ptr<as::steady_timer> tim_message_expiry;
100 [&](v5::basic_publish_message<
sizeof(
packet_id_t)>
const& m) {
101 auto v = get_property<v5::property::message_expiry_interval>(m.props());
104 std::make_shared<as::steady_timer>(ioc_, std::chrono::seconds(v.value().val()));
105 tim_message_expiry->async_wait(
106 [
this, wp = std::weak_ptr<as::steady_timer>(tim_message_expiry)]
108 if (
auto sp = wp.lock()) {
110 erase_inflight_message_by_expiry(sp);
132 if (session_expiry_interval_ &&
133 session_expiry_interval_.value() != std::chrono::seconds(session_never_expire)) {
137 <<
"session expiry interval timer set";
139 tim_session_expiry_ = std::make_shared<as::steady_timer>(ioc_, session_expiry_interval_.value());
140 tim_session_expiry_->async_wait(
141 [
this, wp = std::weak_ptr<as::steady_timer>(tim_session_expiry_), h = std::forward<SessionExpireHandler>(h)]
143 if (
auto sp = wp.lock()) {
145 MQTT_LOG(
"mqtt_broker", info)
146 << MQTT_ADD_VALUE(address, this)
147 <<
"session expired";
159 <<
"renew_session expiry";
161 tim_session_expiry_.reset();
165 return tim_session_expiry_;
172 publish_options pubopts,
175 BOOST_ASSERT(online());
177 if (offline_messages_.empty()) {
178 auto qos_value = pubopts.get_qos();
179 if (qos_value == qos::at_least_once ||
180 qos_value == qos::exactly_once) {
181 if (
auto pid = con_->acquire_unique_packet_id_no_except()) {
187 con_->publish(pid.value(), pub_topic, contents, pubopts, props);
192 con_->publish(pub_topic, contents, pubopts, props);
198 offline_messages_.push_back(
211 publish_options pubopts,
224 offline_messages_.push_back(
235 inflight_messages_.clear();
236 offline_messages_.clear();
237 qos2_publish_processed_.clear();
238 shared_targets_.erase(*
this);
243 qos2_publish_processed_.insert(packet_id);
247 return qos2_publish_processed_.find(packet_id) != qos2_publish_processed_.end();
251 qos2_publish_processed_.erase(packet_id);
254 template <
typename PublishRetainHandler>
258 subscribe_options subopts,
259 PublishRetainHandler&& h,
260 optional<std::size_t> sid = nullopt
262 if (!share_name.empty()) {
263 shared_targets_.insert(share_name, topic_filter, *
this);
268 <<
" share_name:" << share_name
269 <<
" topic_filter:" << topic_filter
270 <<
" qos:" << subopts.get_qos();
273 auto handle_ret = subs_map_.insert_or_assign(
279 auto rh = subopts.get_retain_handling();
281 if (handle_ret.second) {
284 <<
"subscription inserted";
286 handles_.insert(handle_ret.first);
287 if (rh == retain_handling::send ||
288 rh == retain_handling::send_only_new_subscription) {
289 std::forward<PublishRetainHandler>(h)();
295 <<
"subscription updated";
297 if (rh == retain_handling::send) {
298 std::forward<PublishRetainHandler>(h)();
304 if (!share_name.empty()) {
305 shared_targets_.erase(share_name, topic_filter, *
this);
307 auto handle = subs_map_.lookup(topic_filter);
309 handles_.erase(handle.value());
310 subs_map_.erase(handle.value(), client_id_);
315 for (
auto const& h : handles_) {
316 subs_map_.erase(h, client_id_);
323 optional<MQTT_NS::will> will,
324 optional<std::chrono::steady_clock::duration> will_expiry_interval) {
325 tim_will_expiry_.reset();
328 if (will_value_ && will_expiry_interval) {
329 tim_will_expiry_ = std::make_shared<as::steady_timer>(ioc, will_expiry_interval.value());
330 tim_will_expiry_->async_wait(
331 [
this, wp = std::weak_ptr<as::steady_timer>(tim_will_expiry_)]
333 if (
auto sp = wp.lock()) {
344 tim_will_expiry_.reset();
345 will_value_ = nullopt;
351 std::shared_ptr<as::steady_timer> tim_message_expiry
353 inflight_messages_.insert(
362 inflight_messages_.send_all_messages(*con_);
366 inflight_messages_.get<
tag_tim>().erase(sp);
370 auto& idx = inflight_messages_.get<
tag_pid>();
371 idx.erase(packet_id);
376 offline_messages_.send_all(*con_);
381 offline_messages_.send_by_packet_id_release(*con_);
401 return session_expiry_interval_;
404 optional<MQTT_NS::will>&
will() {
return will_value_; }
405 optional<MQTT_NS::will>
const&
will()
const {
return will_value_; }
412 as::io_context& ioc_;
413 std::shared_ptr<as::steady_timer> tim_will_expiry_;
414 optional<MQTT_NS::will> will_value_;
421 optional<std::chrono::steady_clock::duration> will_delay_;
422 optional<std::chrono::steady_clock::duration> session_expiry_interval_;
423 std::shared_ptr<as::steady_timer> tim_session_expiry_;
426 std::set<packet_id_t> qos2_publish_processed_;
430 std::set<sub_con_map::handle> handles_;
435 template <
typename Tag>
437 return entries_.get<Tag>();
440 template <
typename Tag>
441 decltype(
auto)
get()
const {
442 return entries_.get<Tag>();
452 using mi_session_state = mi::multi_index_container<
456 mi::ordered_non_unique<
464 mi::ordered_non_unique<
466 BOOST_MULTI_INDEX_MEMBER(
session_state, std::shared_ptr<as::steady_timer>, tim_session_expiry_)
471 mi_session_state entries_;
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: inflight_message.hpp:91
Definition: offline_message.hpp:92
Definition: session_state.hpp:433
decltype(auto) get()
Definition: session_state.hpp:436
void clear()
Definition: session_state.hpp:445
Definition: shared_target.hpp:32
endpoint_t::packet_id_t packet_id_t
Definition: common_type.hpp:20
std::shared_ptr< endpoint_t > con_sp_t
Definition: common_type.hpp:18
#define MQTT_LOG(chan, sev)
Definition: log.hpp:135
#define MQTT_ADD_VALUE(name, val)
Definition: log.hpp:136
std::vector< property_variant > properties
Definition: property_variant.hpp:51
constexpr decltype(auto) visit(Visitor &&vis, Variants &&... vars)
Definition: variant.hpp:60
basic_store_message_variant< 2 > store_message_variant
Definition: message_variant.hpp:119
boost::system::error_code error_code
Definition: error_code.hpp:16
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
lambda_visitor< Lambdas... > make_lambda_visitor(Lambdas &&... lambdas)
Definition: visitor_util.hpp:37
Definition: buffer.hpp:242
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
Definition: session_state.hpp:53
void erase_inflight_message_by_packet_id(packet_id_t packet_id)
Definition: session_state.hpp:369
void unsubscribe_all()
Definition: session_state.hpp:314
void update_will(as::io_context &ioc, optional< MQTT_NS::will > will, optional< std::chrono::steady_clock::duration > will_expiry_interval)
Definition: session_state.hpp:321
void exactly_once_start(packet_id_t packet_id)
Definition: session_state.hpp:242
void reset_con(con_sp_t con)
Definition: session_state.hpp:392
void deliver(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: session_state.hpp:207
optional< MQTT_NS::will > const & will() const
Definition: session_state.hpp:405
void unsubscribe(buffer const &share_name, buffer const &topic_filter)
Definition: session_state.hpp:303
std::shared_ptr< as::steady_timer > & get_tim_will_expiry()
Definition: session_state.hpp:407
void publish(as::io_context &ioc, buffer pub_topic, buffer contents, publish_options pubopts, v5::properties props)
Definition: session_state.hpp:168
con_sp_t const & con() const
Definition: session_state.hpp:396
bool exactly_once_processing(packet_id_t packet_id) const
Definition: session_state.hpp:246
void clean()
Definition: session_state.hpp:234
~session_state()
Definition: session_state.hpp:76
void subscribe(buffer share_name, buffer topic_filter, subscribe_options subopts, PublishRetainHandler &&h, optional< std::size_t > sid=nullopt)
Definition: session_state.hpp:255
void become_offline(SessionExpireHandler &&h)
Definition: session_state.hpp:88
optional< std::chrono::steady_clock::duration > session_expiry_interval() const
Definition: session_state.hpp:400
session_state(session_state &&)=default
void send_inflight_messages()
Definition: session_state.hpp:360
void renew_session_expiry(optional< std::chrono::steady_clock::duration > v)
Definition: session_state.hpp:156
void reset_con()
Definition: session_state.hpp:388
void send_all_offline_messages()
Definition: session_state.hpp:374
bool online() const
Definition: session_state.hpp:83
void exactly_once_finish(packet_id_t packet_id)
Definition: session_state.hpp:250
void insert_inflight_message(store_message_variant msg, any life_keeper, std::shared_ptr< as::steady_timer > tim_message_expiry)
Definition: session_state.hpp:348
void erase_inflight_message_by_expiry(std::shared_ptr< as::steady_timer > const &sp)
Definition: session_state.hpp:365
buffer const & client_id() const
Definition: session_state.hpp:384
session_state(as::io_context &ioc, sub_con_map &subs_map, shared_target &shared_targets, con_sp_t con, buffer client_id, optional< will > will, optional< std::chrono::steady_clock::duration > will_expiry_interval, optional< std::chrono::steady_clock::duration > session_expiry_interval=nullopt)
Definition: session_state.hpp:55
void send_offline_messages_by_packet_id_release()
Definition: session_state.hpp:379
optional< MQTT_NS::will > & will()
Definition: session_state.hpp:404
std::shared_ptr< as::steady_timer > const & tim_session_expiry() const
Definition: session_state.hpp:164
void reset_will()
Definition: session_state.hpp:343
Definition: subscription.hpp:20