mqtt_cpp
shared_target_impl.hpp
Go to the documentation of this file.
1 // Copyright Takatoshi Kondo 2020
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_BROKER_SHARED_TARGET_IMPL_HPP)
8 #define MQTT_BROKER_SHARED_TARGET_IMPL_HPP
9 
12 
14 
15 inline void shared_target::insert(buffer share_name, buffer topic_filter, session_state& ss) {
16  auto& idx = targets_.get<tag_cid_sn>();
17  auto it = idx.lower_bound(std::make_tuple(ss.client_id(), share_name));
18  if (it == idx.end() || (it->share_name != share_name || it->client_id() != ss.client_id())) {
19  it = idx.emplace_hint(it, force_move(share_name), ss, std::chrono::steady_clock::now());
20  idx.modify(
21  it,
22  [&](auto& e) {
23  bool inserted;
24  std::tie(std::ignore, inserted) = e.topic_filters.insert(force_move(topic_filter));
25  BOOST_ASSERT(inserted);
26  }
27  );
28  }
29  else {
30  // entry exists
31  idx.modify(
32  it,
33  [&](auto& e) {
34  e.topic_filters.insert(force_move(topic_filter)); // ignore overwrite
35  }
36  );
37  }
38 }
39 
40 inline void shared_target::erase(buffer share_name, buffer topic_filter, session_state const& ss) {
41  auto& idx = targets_.get<tag_cid_sn>();
42  auto it = idx.find(std::make_tuple(ss.client_id(), share_name));
43  if (it == idx.end()) {
44  MQTT_LOG("mqtt_broker", warning)
45  << "attempt to erase non exist entry"
46  << " share_name:" << share_name
47  << " topic_filtere:" << topic_filter
48  << " client_id:" << ss.client_id();
49  return;
50  }
51  // entry exists
52  idx.modify(it, [&](auto& e) { e.topic_filters.erase(topic_filter); });
53  if (it->topic_filters.empty()) {
54  idx.erase(it);
55  }
56 }
57 
58 inline void shared_target::erase(session_state const& ss) {
59  auto& idx = targets_.get<tag_cid_sn>();
60  auto r = idx.equal_range(ss.client_id());
61  idx.erase(r.first, r.second);
62 }
63 
64 inline optional<session_state_ref> shared_target::get_target(buffer const& share_name, buffer const& topic_filter) {
65  // get share_name matched range ordered by timestamp (ascending)
66  auto& idx = targets_.get<tag_sn_tp>();
67  auto r = idx.equal_range(share_name);
68  for (; r.first != r.second; ++r.first) {
69  auto const& elem = *r.first;
70  auto it = elem.topic_filters.find(topic_filter);
71 
72  // no share_name/topic_filter matched
73  if (it == elem.topic_filters.end()) continue;
74 
75  // matched
76  // update timestamp
77  idx.modify(r.first, [](auto& e) { e.tp = std::chrono::steady_clock::now(); });
78  return elem.ssr;
79  }
80  return nullopt;
81 }
82 
83 inline shared_target::entry::entry(
84  buffer share_name,
85  session_state& ss,
86  time_point_t tp)
87  : share_name { force_move(share_name) },
88  ssr { ss },
89  tp { force_move(tp) }
90 {}
91 
92 inline buffer const& shared_target::entry::client_id() const {
93  return ssr.get().client_id();
94 }
95 
97 
98 #endif // MQTT_BROKER_SHARED_TARGET_IMPL_HPP
std::chrono::time_point< std::chrono::steady_clock > time_point_t
Definition: time_point_t.hpp:18
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
void insert(buffer share_name, buffer topic_filter, session_state &ss)
Definition: shared_target_impl.hpp:15
optional< session_state_ref > get_target(buffer const &share_name, buffer const &topic_filter)
Definition: shared_target_impl.hpp:64
void erase(buffer share_name, buffer topic_filter, session_state const &ss)
Definition: shared_target_impl.hpp:40
#define MQTT_LOG(chan, sev)
Definition: log.hpp:135
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
Definition: session_state.hpp:53
buffer const & client_id() const
Definition: session_state.hpp:384
Definition: tags.hpp:26
Definition: tags.hpp:25