7 #if !defined(MQTT_BROKER_SUBSCRIPTION_MAP_HPP)
8 #define MQTT_BROKER_SUBSCRIPTION_MAP_HPP
10 #include <unordered_map>
12 #include <boost/functional/hash.hpp>
13 #include <boost/range/adaptor/reversed.hpp>
90 template<std::
size_t N>
92 static_assert(N == 4 || N == 8,
"Subscription map count_storage only knows how to handle architectures with 32 or 64 bit size_t: please update to support your platform.");
99 : value_(v & 0x3fffffffUL), has_hash_child_(false), has_plus_child_(false)
102 static constexpr std::size_t
max() {
return std::numeric_limits<uint32_t>::max() >> 2; }
104 std::uint32_t
value()
const {
return value_; }
106 value_ = v & 0x3fffffffUL;
126 std::uint32_t value_ : 30;
127 std::uint32_t has_hash_child_ : 1;
128 std::uint32_t has_plus_child_ : 1;
136 : value_(v & 0x3fffffffffffffffULL), has_hash_child_(false), has_plus_child_(false)
139 static constexpr std::uint64_t
max() {
return std::numeric_limits<uint64_t>::max() >> 2; }
141 std::uint64_t
value()
const {
return value_; }
143 value_ = v & 0x3fffffffffffffffULL;
164 std::uint64_t value_ : 62;
165 std::uint64_t has_hash_child_ : 1;
166 std::uint64_t has_plus_child_ : 1;
169 template<
typename Value>
180 if(next_node_id == std::numeric_limits<node_id_t>::max())
182 return ++next_node_id;
191 count_storage_t count;
196 :
id(
id), parent(parent)
201 static void increase_count_storage(count_storage_t &count) {
202 if(count.value() == count_storage_t::max()) {
206 count.increment_value();
210 static void decrease_count_storage(count_storage_t& count) {
211 BOOST_ASSERT(count.value() > 0);
212 count.decrement_value();
218 using map_type = std::unordered_map< path_entry_key, path_entry, boost::hash< path_entry_key > >;
221 using map_type_iterator =
typename map_type::iterator;
222 using map_type_const_iterator =
typename map_type::const_iterator;
240 map_type_iterator
begin() {
return map.begin(); }
241 map_type_iterator
end() {
return map.end(); }
242 map_type
const&
get_map()
const {
return map; }
245 return path.back()->first;
249 auto parent_id =
get_root()->second.id;
250 std::vector< map_type_iterator > path;
257 if (entry == map.end()) {
262 path.push_back(entry);
263 parent_id = entry->second.id;
274 std::vector<map_type_iterator> result;
281 if (entry == map.end()) {
288 path_entry(generate_node_id(), parent->first)
291 parent->second.count.set_plus_child(parent->second.count.has_plus_child() | (t ==
"+"));
292 parent->second.count.set_hash_child(parent->second.count.has_hash_child() | (t ==
"#"));
295 increase_count_storage(entry->second.count);
298 result.push_back(entry);
309 bool remove_plus_child_flag =
false;
310 bool remove_hash_child_flag =
false;
313 for (
auto& entry : boost::adaptors::reverse(path)) {
314 if (remove_plus_child_flag) {
315 entry->second.count.set_plus_child(
false);
316 remove_plus_child_flag =
false;
319 if (remove_hash_child_flag) {
320 entry->second.count.set_hash_child(
false);
321 remove_hash_child_flag =
false;
324 decrease_count_storage(entry->second.count);
325 if (entry->second.count.value() == 0) {
326 remove_plus_child_flag = (entry->first.second ==
"+");
327 remove_hash_child_flag = (entry->first.second ==
"#");
331 map.erase(entry->first);
336 if (remove_plus_child_flag) {
337 root->second.count.set_plus_child(
false);
340 if (remove_hash_child_flag) {
341 root->second.count.set_hash_child(
false);
345 template <
typename ThisType,
typename Output>
347 using iterator_type = decltype(
self.map.end());
349 std::vector<iterator_type> entries;
355 std::vector<iterator_type> new_entries;
357 for (
auto& entry : entries) {
358 auto parent = entry->second.id;
360 if (i !=
self.map.end()) {
361 new_entries.push_back(i);
364 if (entry->second.count .has_plus_child()) {
365 i = self.map.find(path_entry_key(parent, string_view(
"+")));
366 if (i != self.map.end()) {
367 if (parent != self.root_node_id || t.empty() || t[0] !=
'$') {
368 new_entries.push_back(i);
373 if (entry->second.count.has_hash_child()) {
374 i = self.map.find(path_entry_key(parent, string_view(
"#")));
375 if (i != self.map.end()) {
376 if (parent != self.root_node_id || t.empty() || t[0] !=
'$'){
377 callback(i->second.value);
383 std::swap(entries, new_entries);
384 return !entries.empty();
388 for (
auto& entry : entries) {
389 callback(entry->second.value);
394 template<
typename Output>
396 find_match_impl(*
this, topic, std::forward<Output>(callback));
400 template<
typename Output>
402 find_match_impl(*
this, topic, std::forward<Output>(callback));
405 template<
typename ThisType,
typename Output>
408 while(i !=
self.root_key) {
409 auto entry_iter =
self.map.find(i);
410 if (entry_iter ==
self.map.end()) {
411 throw_invalid_handle();
415 i = entry_iter->second.parent;
421 static void throw_invalid_handle() {
throw std::runtime_error(
"Subscription map invalid handle was specified"); }
422 static void throw_max_stored_topics() {
throw std::overflow_error(
"Subscription map maximum number of stored topic filters reached"); }
426 std::vector<map_type_iterator> result;
427 handle_to_iterators(*
this, h, [&result](map_type_iterator i) { result.push_back(i); });
428 std::reverse(result.begin(), result.end());
434 handle_to_iterators(*
this, h, [](map_type_iterator i) {
435 increase_count_storage(i->second.count);
441 if(map_size == std::numeric_limits<decltype(map_size)>::max()) {
442 throw_max_stored_topics();
450 BOOST_ASSERT(map_size > 0);
456 for (
auto i : path) {
457 increase_count_storage(i->second.count);
464 root_node_id = generate_node_id();
466 map.emplace(root_key, path_entry(root_node_id,
path_entry_key()));
474 std::size_t
size()
const {
return this->map_size; }
478 auto path = this->find_topic_filter(topic_filter);
480 return optional<handle>();
482 return this->path_to_handle(
force_move(path));
489 handle_to_iterators(*
this, h, [&result](map_type_const_iterator i) {
490 if (result.empty()) {
491 result = std::string(i->first.second);
494 result = std::string(i->first.second) +
"/" + result;
502 template<
typename Value>
512 template <
typename V>
515 if (!existing_subscription.empty()) {
516 if(existing_subscription.back()->second.value)
519 existing_subscription.back()->second.value.emplace(std::forward<V>(value));
524 new_topic_filter.back()->second.value = value;
530 template <
typename V>
537 path.back()->second.value.emplace(std::forward<V>(value));
540 template <
typename V>
542 auto entry_iter = this->
get_key(h);
543 if (entry_iter == this->
end()) {
546 entry_iter->second.value.emplace(std::forward<V>(value));
552 if (path.empty() || !path.back()->second.value) {
564 if (path.empty() || !path.back()->second.value) {
574 template<
typename Output>
578 [&callback]( optional<Value>
const& value ) {
580 callback(value.value());
588 template<
typename Key,
typename Value,
class Hash = std::hash<Key>,
class Pred = std::equal_to<Key>,
class Cont = std::unordered_map<Key, Value, Hash, Pred, std::allocator< std::pair<const Key, Value> > > >
601 template <
typename K,
typename V>
606 new_topic_filter.back()->second.value.emplace(std::forward<K>(key), std::forward<V>(value));
611 auto& subscription_set = path.back()->second.value;
613 #if __cpp_lib_unordered_map_try_emplace >= 201411L
614 auto insert_result = subscription_set.insert_or_assign(std::forward<K>(key), std::forward<V>(value));
615 if(insert_result.second) {
621 auto iter = subscription_set.find(key);
622 if(iter == subscription_set.end()) {
623 subscription_set.emplace(std::forward<K>(key), std::forward<V>(value));
627 iter->second = std::forward<V>(value);
637 template <
typename K,
typename V>
639 auto h_iter = this->
get_key(h);
640 if (h_iter == this->
end()) {
644 auto& subscription_set = h_iter->second.value;
646 #if __cpp_lib_unordered_map_try_emplace >= 201411L
647 auto insert_result = subscription_set.insert_or_assign(std::forward<K>(key), std::forward<V>(value));
648 if(insert_result.second) {
652 return std::make_pair(h, insert_result.second);
654 auto iter = subscription_set.find(key);
655 if(iter == subscription_set.end()) {
656 subscription_set.emplace(std::forward<K>(key), std::forward<V>(value));
660 iter->second = std::forward<V>(value);
662 return std::make_pair(h, iter == subscription_set.end());
670 auto h_iter = this->
get_key(h);
671 if (h_iter == this->
end()) {
676 auto result = h_iter->second.value.erase(key);
695 auto result = path.back()->second.value.erase(key);
705 template<
typename Output>
709 [&callback]( Cont
const &values ) {
710 for (
auto const& i : values) {
711 callback(i.first, i.second);
718 template<
typename Output>
722 [&callback]( Cont &values ) {
723 for (
auto& i : values) {
724 callback(i.first, i.second);
730 template<
typename Output>
732 out <<
"Root node id: " << this->
root_node_id << std::endl;
733 for (
auto const& i: this->
get_map()) {
734 out <<
"(" << i.first.first <<
", " << i.first.second <<
"): id: " << i.second.id <<
", size: " << i.second.value.size() <<
", value: " << i.second.count.value << std::endl;
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: subscription_map.hpp:591
std::pair< handle, bool > insert_or_assign(handle const &h, K &&key, V &&value)
Definition: subscription_map.hpp:638
Cont container_t
Definition: subscription_map.hpp:594
void modify(string_view topic, Output &&callback)
Definition: subscription_map.hpp:719
std::size_t erase(string_view topic_filter, Key const &key)
Definition: subscription_map.hpp:687
std::size_t erase(handle const &h, Key const &key)
Definition: subscription_map.hpp:668
void dump(Output &out)
Definition: subscription_map.hpp:731
std::pair< handle, bool > insert_or_assign(string_view topic_filter, K &&key, V &&value)
Definition: subscription_map.hpp:602
typename subscription_map_base< Value >::handle handle
Definition: subscription_map.hpp:597
void find(string_view topic, Output &&callback) const
Definition: subscription_map.hpp:706
Definition: subscription_map.hpp:504
void update(handle const &h, V &&value)
Definition: subscription_map.hpp:541
typename subscription_map_base< Value >::handle handle
Definition: subscription_map.hpp:509
std::size_t erase(handle const &h)
Definition: subscription_map.hpp:562
std::size_t erase(string_view topic_filter)
Definition: subscription_map.hpp:550
void update(string_view topic_filter, V &&value)
Definition: subscription_map.hpp:531
std::pair< handle, bool > insert(string_view topic_filter, V &&value)
Definition: subscription_map.hpp:513
void find(string_view topic, Output &&callback) const
Definition: subscription_map.hpp:575
Definition: subscription_map.hpp:170
handle path_to_handle(std::vector< map_type_iterator > const &path) const
Definition: subscription_map.hpp:244
subscription_map_base()
Definition: subscription_map.hpp:461
std::pair< node_id_t, buffer > path_entry_key
Definition: subscription_map.hpp:173
std::size_t internal_size() const
Definition: subscription_map.hpp:471
void remove_topic_filter(std::vector< map_type_iterator > const &path)
Definition: subscription_map.hpp:308
std::vector< map_type_iterator > handle_to_iterators(handle const &h)
Definition: subscription_map.hpp:425
map_type_iterator get_root()
Definition: subscription_map.hpp:232
void decrease_map_size()
Definition: subscription_map.hpp:449
void increase_map_size()
Definition: subscription_map.hpp:440
optional< handle > lookup(string_view topic_filter)
Definition: subscription_map.hpp:477
void increase_subscriptions(std::vector< map_type_iterator > const &path)
Definition: subscription_map.hpp:455
map_type_iterator get_key(path_entry_key key)
Definition: subscription_map.hpp:239
map_type_iterator end()
Definition: subscription_map.hpp:241
path_entry_key root_key
Definition: subscription_map.hpp:228
std::size_t node_id_t
Definition: subscription_map.hpp:172
node_id_t root_node_id
Definition: subscription_map.hpp:229
void increase_subscriptions(handle const &h)
Definition: subscription_map.hpp:433
static void find_match_impl(ThisType &self, string_view topic, Output &&callback)
Definition: subscription_map.hpp:346
map_type_const_iterator get_root() const
Definition: subscription_map.hpp:233
void find_match(string_view topic, Output &&callback) const
Definition: subscription_map.hpp:395
path_entry_key handle
Definition: subscription_map.hpp:174
std::vector< map_type_iterator > find_topic_filter(string_view topic_filter)
Definition: subscription_map.hpp:248
static void throw_invalid_topic_filter()
Definition: subscription_map.hpp:420
size_t map_size
Definition: subscription_map.hpp:237
static void handle_to_iterators(ThisType &self, handle const &h, Output &&output)
Definition: subscription_map.hpp:406
std::vector< map_type_iterator > create_topic_filter(string_view topic_filter)
Definition: subscription_map.hpp:271
void modify_match(string_view topic, Output &&callback)
Definition: subscription_map.hpp:401
static void throw_invalid_handle()
Definition: subscription_map.hpp:421
static void throw_max_stored_topics()
Definition: subscription_map.hpp:422
map_type const & get_map() const
Definition: subscription_map.hpp:242
map_type_iterator begin()
Definition: subscription_map.hpp:240
std::string handle_to_topic_filter(handle const &h) const
Definition: subscription_map.hpp:486
std::size_t size() const
Definition: subscription_map.hpp:474
id
Definition: property_id.hpp:19
boost::string_ref string_view
Definition: string_view.hpp:64
constexpr std::remove_reference_t< T > && force_move(T &&t)
Definition: move.hpp:20
const_buffer buffer(MQTT_NS::buffer const &data)
create boost::asio::const_buffer from the MQTT_NS::buffer boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
void increment_value()
Definition: subscription_map.hpp:108
void set_hash_child(bool v)
Definition: subscription_map.hpp:116
void set_plus_child(bool v)
Definition: subscription_map.hpp:121
void decrement_value()
Definition: subscription_map.hpp:111
static constexpr std::size_t max()
Definition: subscription_map.hpp:102
count_storage(std::uint32_t v=1)
Definition: subscription_map.hpp:98
bool has_hash_child() const
Definition: subscription_map.hpp:115
std::uint32_t value() const
Definition: subscription_map.hpp:104
bool has_plus_child() const
Definition: subscription_map.hpp:120
void set_value(std::uint32_t v)
Definition: subscription_map.hpp:105
void decrement_value()
Definition: subscription_map.hpp:148
bool has_plus_child() const
Definition: subscription_map.hpp:157
static constexpr std::uint64_t max()
Definition: subscription_map.hpp:139
void set_plus_child(bool v)
Definition: subscription_map.hpp:158
void set_hash_child(bool v)
Definition: subscription_map.hpp:153
bool has_hash_child() const
Definition: subscription_map.hpp:152
std::uint64_t value() const
Definition: subscription_map.hpp:141
void increment_value()
Definition: subscription_map.hpp:145
void set_value(std::uint64_t v)
Definition: subscription_map.hpp:142
count_storage(std::uint64_t v=1)
Definition: subscription_map.hpp:135
Definition: subscription_map.hpp:91
void topic_filter_tokenizer(Iterator first, Iterator last, Output write)
Definition: topic_filter_tokenizer.hpp:20