mqtt_cpp
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
retained_topic_map.hpp
Go to the documentation of this file.
1 // Copyright Wouter van Kleunen 2019
2 //
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(MQTT_BROKER_RETAINED_TOPIC_MAP_HPP)
8 #define MQTT_BROKER_RETAINED_TOPIC_MAP_HPP
9 
10 #include <boost/functional/hash.hpp>
11 #include <boost/multi_index_container.hpp>
12 #include <boost/multi_index/ordered_index.hpp>
13 #include <boost/multi_index/hashed_index.hpp>
14 #include <boost/multi_index/composite_key.hpp>
15 #include <boost/multi_index/member.hpp>
16 
18 #include <mqtt/string_view.hpp>
19 #include <mqtt/optional.hpp>
20 #include <mqtt/buffer.hpp>
21 
23 
25 
26 namespace mi = boost::multi_index;
27 
28 template<typename Value>
30 
31  // Exceptions used
32  static void throw_max_stored_topics() { throw std::overflow_error("Retained map maximum number of topics reached"); }
33  static void throw_no_wildcards_allowed() { throw std::runtime_error("Retained map no wildcards allowed in retained topic name"); }
34 
35  using node_id_t = std::size_t;
36 
37  static constexpr node_id_t root_parent_id = 0;
38  static constexpr node_id_t root_node_id = 1;
39  static constexpr node_id_t max_node_id = std::numeric_limits<node_id_t>::max();
40 
41  struct path_entry {
42  node_id_t parent_id;
43  buffer name_buffer;
44  string_view name;
45 
46  node_id_t id;
47 
48  std::size_t count = 1;
49  static constexpr std::size_t max_count = std::numeric_limits<std::size_t>::max();
50 
51  // Increase the count for this node
52  void increase_count() {
53  if (count == max_count) {
54  throw_max_stored_topics();
55  }
56 
57  ++count;
58  }
59 
60  // Decrease the count for this node
61  void decrease_count() {
62  BOOST_ASSERT(count >= count);
63  --count;
64  }
65 
66  optional<Value> value;
67 
68  path_entry(node_id_t parent_id, string_view name, node_id_t id)
69  : parent_id(parent_id), name_buffer(allocate_buffer(name)), name(name_buffer), id(id)
70  { }
71  };
72 
73  struct wildcard_index_tag { };
74  struct direct_index_tag { };
75 
76  // allow for two indices on retained topics
77  using path_entry_set = mi::multi_index_container<
78  path_entry,
79  mi::indexed_by<
80  // index required for direct child access
81  mi::hashed_unique <
82  mi::tag<direct_index_tag>,
83  mi::composite_key<path_entry,
84  BOOST_MULTI_INDEX_MEMBER(path_entry, node_id_t, parent_id),
85  BOOST_MULTI_INDEX_MEMBER(path_entry, string_view, name) >
86  >,
87 
88  // index required for wildcard processing
89  mi::ordered_non_unique< mi::tag<wildcard_index_tag>, BOOST_MULTI_INDEX_MEMBER(path_entry, node_id_t, parent_id) >
90  >
91  >;
92 
93  using direct_const_iterator = typename path_entry_set::template index<direct_index_tag>::type::const_iterator;
94  using wildcard_const_iterator = typename path_entry_set::template index<wildcard_index_tag>::type::const_iterator;
95 
96  path_entry_set map;
97  size_t map_size;
98  node_id_t next_node_id;
99 
100  direct_const_iterator root;
101 
102  direct_const_iterator create_topic(string_view topic) {
103  direct_const_iterator parent = root;
104 
106  topic,
107  [this, &parent](string_view t) {
108  if (t == "+" || t == "#") {
109  throw_no_wildcards_allowed();
110  }
111 
112  node_id_t parent_id = parent->id;
113 
114  auto& direct_index = map.template get<direct_index_tag>();
115  direct_const_iterator entry = direct_index.find(std::make_tuple(parent_id, t));
116 
117  if (entry == direct_index.end()) {
118  entry = map.insert(path_entry(parent->id, t, next_node_id++)).first;
119  if (next_node_id == max_node_id) {
120  throw_max_stored_topics();
121  }
122  }
123  else {
124  direct_index.modify(entry, [](path_entry& entry){ entry.increase_count(); });
125  }
126 
127  parent = entry;
128  return true;
129  }
130  );
131 
132  return parent;
133  }
134 
135  std::vector<direct_const_iterator> find_topic(string_view topic) {
136  std::vector<direct_const_iterator> path;
137  direct_const_iterator parent = root;
138 
140  topic,
141  [this, &parent, &path](string_view t) {
142  auto const& direct_index = map.template get<direct_index_tag>();
143  auto entry = direct_index.find(std::make_tuple(parent->id, t));
144 
145  if (entry == direct_index.end()) {
146  path = std::vector<direct_const_iterator>();
147  return false;
148  }
149 
150  path.push_back(entry);
151  parent = entry;
152  return true;
153  }
154  );
155 
156  return path;
157  }
158 
159  // Match all underlying topics when a hash entry is matched
160  // perform a breadth-first iteration over all items in the tree below
161  template<typename Output>
162  void match_hash_entries(node_id_t parent, Output&& callback, bool ignore_system) const {
163  std::deque<node_id_t> entries;
164  entries.push_back(parent);
165  std::deque<node_id_t> new_entries;
166 
167  auto const& wildcard_index = map.template get<wildcard_index_tag>();
168 
169  while (!entries.empty()) {
170  new_entries.resize(0);
171 
172  for (auto root : entries) {
173  // Find all entries below this node
174  for (auto i = wildcard_index.lower_bound(root); i != wildcard_index.end() && i->parent_id == root; ++i) {
175 
176  // Should we ignore system matches
177  if (!ignore_system || i->name.empty() || i->name[0] != '$') {
178  if (i->value) {
179  callback(*i->value);
180  }
181 
182  new_entries.push_back(i->id);
183  }
184  }
185  }
186 
187  // Ignore system only on first level
188  ignore_system = false;
189  std::swap(entries, new_entries);
190  }
191 
192  }
193 
194  // Find all topics that match the specified topic filter
195  template<typename Output>
196  void find_match(string_view topic_filter, Output&& callback) const {
197  std::deque<direct_const_iterator> entries;
198  entries.push_back(root);
199 
200  std::deque<direct_const_iterator> new_entries;
202  topic_filter,
203  [this, &entries, &new_entries, &callback](string_view t) {
204  auto const& direct_index = map.template get<direct_index_tag>();
205  auto const& wildcard_index = map.template get<wildcard_index_tag>();
206  new_entries.resize(0);
207 
208  for (auto const& entry : entries) {
209  node_id_t parent = entry->id;
210 
211  if (t == string_view("+")) {
212  for (auto i = wildcard_index.lower_bound(parent); i != wildcard_index.end() && i->parent_id == parent; ++i) {
213  if (parent != root_node_id || i->name.empty() || i->name[0] != '$') {
214  new_entries.push_back(map.template project<direct_index_tag, wildcard_const_iterator>(i));
215  }
216  else {
217  break;
218  }
219  }
220  }
221  else if (t == string_view("#")) {
222  match_hash_entries(parent, callback, parent == root_node_id);
223  return false;
224  }
225  else {
226  direct_const_iterator i = direct_index.find(std::make_tuple(parent, t));
227  if (i != direct_index.end()) {
228  new_entries.push_back(i);
229  }
230  }
231  }
232 
233  std::swap(new_entries, entries);
234  return !entries.empty();
235  }
236  );
237 
238  for (auto& entry : entries) {
239  if (entry->value) {
240  callback(*entry->value);
241  }
242  }
243  }
244 
245  // Remove a value at the specified topic
246  size_t erase_topic(string_view topic) {
247  auto path = find_topic(topic);
248 
249  // Reset the value if there is actually something stored
250  if (!path.empty() && path.back()->value) {
251  auto& direct_index = map.template get<direct_index_tag>();
252  direct_index.modify(path.back(), [](path_entry &entry){ entry.value = nullopt; });
253 
254  // Do iterators stay valid when erasing ? I think they do ?
255  for (auto entry : path) {
256  direct_index.modify(entry, [](path_entry& entry){ entry.decrease_count(); });
257 
258  if (entry->count == 0) {
259  map.erase(entry);
260  }
261  }
262 
263  return 1;
264  }
265 
266  return 0;
267  }
268 
269  // Increase the number of topics for this path
270  void increase_topics(std::vector<direct_const_iterator> const &path) {
271  auto& direct_index = map.template get<direct_index_tag>();
272 
273  for(auto& i : path) {
274  direct_index.modify(i, [](path_entry& entry){ entry.increase_count(); });
275  }
276  }
277 
278  // Increase the map size (total number of topics stored)
279  void increase_map_size() {
280  if(map_size == std::numeric_limits<decltype(map_size)>::max()) {
281  throw_max_stored_topics();
282  }
283 
284  ++map_size;
285  }
286 
287  // Decrease the map size (total number of topics stored)
288  void decrease_map_size(size_t count) {
289  BOOST_ASSERT(map_size >= count);
290  map_size -= count;
291  }
292 
293  void init_map() {
294  map_size = 0;
295  // Create the root node
296  root = map.insert(path_entry(root_parent_id, "", root_node_id)).first;
297  next_node_id = root_node_id + 1;
298  }
299 
300 public:
302  {
303  init_map();
304  }
305 
306  // Insert a value at the specified topic
307  template<typename V>
308  std::size_t insert_or_assign(string_view topic, V&& value) {
309  auto& direct_index = map.template get<direct_index_tag>();
310  auto path = this->find_topic(topic);
311 
312  if (path.empty()) {
313  auto new_topic = this->create_topic(topic);
314  direct_index.modify(new_topic, [&value](path_entry &entry) mutable { entry.value.emplace(std::forward<V>(value)); });
315  increase_map_size();
316  return 1;
317  }
318 
319  if (!path.back()->value) {
320  this->increase_topics(path);
321  direct_index.modify(path.back(), [&value](path_entry &entry) mutable { entry.value.emplace(std::forward<V>(value)); });
322  increase_map_size();
323  return 1;
324  }
325 
326  direct_index.modify(path.back(), [&value](path_entry &entry) mutable { entry.value.emplace(std::forward<V>(value)); });
327 
328  return 0;
329  }
330 
331  // Find all stored topics that math the specified topic_filter
332  template<typename Output>
333  void find(string_view topic_filter, Output&& callback) const {
334  find_match(topic_filter, std::forward<Output>(callback));
335  }
336 
337  // Remove a stored value at the specified topic
338  std::size_t erase(string_view topic) {
339  auto result = erase_topic(topic);
340  decrease_map_size(result);
341  return result;
342  }
343 
344  // Get the number of entries stored in the map
345  std::size_t size() const { return map_size; }
346 
347  // Get the number of entries in the map (for debugging purpose only)
348  std::size_t internal_size() const { return map.size(); }
349 
350  // Clear all topics
351  void clear() {
352  map.clear();
353  init_map();
354  }
355 
356  // Dump debug information
357  template<typename Output>
358  void dump(Output &out) {
359  auto const& direct_index = map.template get<direct_index_tag>();
360  for (auto const& i : direct_index) {
361  out << i.parent_id << " " << i.name << " " << (i.value ? "init" : "-") << " " << i.count << std::endl;
362  }
363  }
364 
365 };
366 
368 
369 #endif // MQTT_BROKER_RETAINED_TOPIC_MAP_HPP
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: retained_topic_map.hpp:29
std::size_t insert_or_assign(string_view topic, V &&value)
Definition: retained_topic_map.hpp:308
std::size_t erase(string_view topic)
Definition: retained_topic_map.hpp:338
void dump(Output &out)
Definition: retained_topic_map.hpp:358
std::size_t internal_size() const
Definition: retained_topic_map.hpp:348
retained_topic_map()
Definition: retained_topic_map.hpp:301
void find(string_view topic_filter, Output &&callback) const
Definition: retained_topic_map.hpp:333
std::size_t size() const
Definition: retained_topic_map.hpp:345
void clear()
Definition: retained_topic_map.hpp:351
id
Definition: property_id.hpp:19
boost::string_ref string_view
Definition: string_view.hpp:64
buffer allocate_buffer(Iterator b, Iterator e)
create buffer from the pair of iterators It copies string that from b to e into shared_ptr_array....
Definition: buffer.hpp:130
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
void topic_filter_tokenizer(Iterator first, Iterator last, Output write)
Definition: topic_filter_tokenizer.hpp:20