7 #if !defined(MQTT_BROKER_RETAINED_TOPIC_MAP_HPP)
8 #define MQTT_BROKER_RETAINED_TOPIC_MAP_HPP
10 #include <boost/functional/hash.hpp>
11 #include <boost/multi_index_container.hpp>
12 #include <boost/multi_index/ordered_index.hpp>
13 #include <boost/multi_index/hashed_index.hpp>
14 #include <boost/multi_index/composite_key.hpp>
15 #include <boost/multi_index/member.hpp>
26 namespace mi = boost::multi_index;
28 template<
typename Value>
/// Reports that the retained map cannot accept any more entries.
///
/// Called when a counter or node id has reached its maximum value.
/// Never returns to the caller; marked [[noreturn]] so the compiler knows
/// that code following a call is unreachable.
///
/// @throws std::overflow_error always.
[[noreturn]] static void throw_max_stored_topics() {
    throw std::overflow_error("Retained map maximum number of topics reached");
}
/// Reports that a retained topic name contained a wildcard token.
///
/// Called when a topic level is "+" or "#" while a topic is being created.
/// Never returns to the caller; marked [[noreturn]] so the compiler knows
/// that code following a call is unreachable.
///
/// @throws std::runtime_error always.
[[noreturn]] static void throw_no_wildcards_allowed() {
    throw std::runtime_error("Retained map no wildcards allowed in retained topic name");
}
// Integer identifier given to every entry (node) stored in the map.
35 using node_id_t = std::size_t;
// Parent id used for the root entry when it is inserted.
37 static constexpr node_id_t root_parent_id = 0;
// Id of the root entry; ids for further entries start at root_node_id + 1.
38 static constexpr node_id_t root_node_id = 1;
// Upper bound for node ids: create_topic reports "maximum number of topics
// reached" once next_node_id arrives at this value.
39 static constexpr node_id_t max_node_id = std::numeric_limits<node_id_t>::max();
// Use counter of this entry; a freshly constructed entry starts at 1.
// It is adjusted through increase_count() / decrease_count(), and the erase
// path checks it for zero after decrementing.
48 std::size_t count = 1;
// Largest value count may hold; increase_count() throws instead of going past it.
49 static constexpr std::size_t max_count = std::numeric_limits<std::size_t>::max();
52 void increase_count() {
53 if (count == max_count) {
54 throw_max_stored_topics();
61 void decrease_count() {
62 BOOST_ASSERT(count >= count);
// Value stored at this entry, if any. insert_or_assign emplaces it and the
// erase path resets it to nullopt; an empty optional means no value is stored here.
66 optional<Value> value;
68 path_entry(node_id_t parent_id,
string_view name, node_id_t
id)
69 : parent_id(parent_id), name_buffer(
allocate_buffer(name)), name(name_buffer),
id(
id)
// Tag selecting the ordered, non-unique index keyed on parent_id; used to
// enumerate all children of a node.
73 struct wildcard_index_tag { };
// Tag selecting the index keyed on the composite (parent_id, name); used to
// look up one specific child of a node.
74 struct direct_index_tag { };
77 using path_entry_set = mi::multi_index_container<
82 mi::tag<direct_index_tag>,
83 mi::composite_key<path_entry,
84 BOOST_MULTI_INDEX_MEMBER(path_entry, node_id_t, parent_id),
85 BOOST_MULTI_INDEX_MEMBER(path_entry,
string_view, name) >
89 mi::ordered_non_unique< mi::tag<wildcard_index_tag>, BOOST_MULTI_INDEX_MEMBER(path_entry, node_id_t, parent_id) >
// Const iterator over the index tagged direct_index_tag.
93 using direct_const_iterator =
typename path_entry_set::template index<direct_index_tag>::type::const_iterator;
// Const iterator over the index tagged wildcard_index_tag.
94 using wildcard_const_iterator =
typename path_entry_set::template index<wildcard_index_tag>::type::const_iterator;
// Id that the next newly inserted entry receives; create_topic post-increments
// it on every insertion, and it is initialised to root_node_id + 1.
98 node_id_t next_node_id;
// Iterator to the root entry; topic creation, lookup and matching all start
// their walk from here.
100 direct_const_iterator root;
102 direct_const_iterator create_topic(
string_view topic) {
103 direct_const_iterator parent = root;
108 if (t ==
"+" || t ==
"#") {
109 throw_no_wildcards_allowed();
112 node_id_t parent_id = parent->id;
114 auto& direct_index = map.template get<direct_index_tag>();
115 direct_const_iterator entry = direct_index.find(std::make_tuple(parent_id, t));
117 if (entry == direct_index.end()) {
118 entry = map.insert(path_entry(parent->id, t, next_node_id++)).first;
119 if (next_node_id == max_node_id) {
120 throw_max_stored_topics();
124 direct_index.modify(entry, [](path_entry& entry){ entry.increase_count(); });
135 std::vector<direct_const_iterator> find_topic(
string_view topic) {
136 std::vector<direct_const_iterator> path;
137 direct_const_iterator parent = root;
142 auto const& direct_index = map.template get<direct_index_tag>();
143 auto entry = direct_index.find(std::make_tuple(parent->id, t));
145 if (entry == direct_index.end()) {
146 path = std::vector<direct_const_iterator>();
150 path.push_back(entry);
161 template<
typename Output>
162 void match_hash_entries(node_id_t parent, Output&& callback,
bool ignore_system)
const {
163 std::deque<node_id_t> entries;
164 entries.push_back(parent);
165 std::deque<node_id_t> new_entries;
167 auto const& wildcard_index = map.template get<wildcard_index_tag>();
169 while (!entries.empty()) {
170 new_entries.resize(0);
172 for (
auto root : entries) {
174 for (
auto i = wildcard_index.lower_bound(root); i != wildcard_index.end() && i->parent_id == root; ++i) {
177 if (!ignore_system || i->name.empty() || i->name[0] !=
'$') {
182 new_entries.push_back(i->id);
188 ignore_system =
false;
189 std::swap(entries, new_entries);
195 template<
typename Output>
196 void find_match(
string_view topic_filter, Output&& callback)
const {
197 std::deque<direct_const_iterator> entries;
198 entries.push_back(root);
200 std::deque<direct_const_iterator> new_entries;
203 [
this, &entries, &new_entries, &callback](
string_view t) {
204 auto const& direct_index = map.template get<direct_index_tag>();
205 auto const& wildcard_index = map.template get<wildcard_index_tag>();
206 new_entries.resize(0);
208 for (
auto const& entry : entries) {
209 node_id_t parent = entry->id;
212 for (
auto i = wildcard_index.lower_bound(parent); i != wildcard_index.end() && i->parent_id == parent; ++i) {
213 if (parent != root_node_id || i->name.empty() || i->name[0] !=
'$') {
214 new_entries.push_back(map.template project<direct_index_tag, wildcard_const_iterator>(i));
222 match_hash_entries(parent, callback, parent == root_node_id);
226 direct_const_iterator i = direct_index.find(std::make_tuple(parent, t));
227 if (i != direct_index.end()) {
228 new_entries.push_back(i);
233 std::swap(new_entries, entries);
234 return !entries.empty();
238 for (
auto& entry : entries) {
240 callback(*entry->value);
247 auto path = find_topic(topic);
250 if (!path.empty() && path.back()->value) {
251 auto& direct_index = map.template get<direct_index_tag>();
252 direct_index.modify(path.back(), [](path_entry &entry){ entry.value = nullopt; });
255 for (
auto entry : path) {
256 direct_index.modify(entry, [](path_entry& entry){ entry.decrease_count(); });
258 if (entry->count == 0) {
270 void increase_topics(std::vector<direct_const_iterator>
const &path) {
271 auto& direct_index = map.template get<direct_index_tag>();
273 for(
auto& i : path) {
274 direct_index.modify(i, [](path_entry& entry){ entry.increase_count(); });
279 void increase_map_size() {
280 if(map_size == std::numeric_limits<decltype(map_size)>::max()) {
281 throw_max_stored_topics();
288 void decrease_map_size(
size_t count) {
289 BOOST_ASSERT(map_size >= count);
296 root = map.insert(path_entry(root_parent_id,
"", root_node_id)).first;
297 next_node_id = root_node_id + 1;
309 auto& direct_index = map.template get<direct_index_tag>();
310 auto path = this->find_topic(topic);
313 auto new_topic = this->create_topic(topic);
314 direct_index.modify(new_topic, [&value](path_entry &entry)
mutable { entry.value.emplace(std::forward<V>(value)); });
319 if (!path.back()->value) {
320 this->increase_topics(path);
321 direct_index.modify(path.back(), [&value](path_entry &entry)
mutable { entry.value.emplace(std::forward<V>(value)); });
326 direct_index.modify(path.back(), [&value](path_entry &entry)
mutable { entry.value.emplace(std::forward<V>(value)); });
332 template<
typename Output>
334 find_match(topic_filter, std::forward<Output>(callback));
339 auto result = erase_topic(topic);
340 decrease_map_size(result);
345 std::size_t
size()
const {
return map_size; }
357 template<
typename Output>
359 auto const& direct_index = map.template get<direct_index_tag>();
360 for (
auto const& i : direct_index) {
361 out << i.parent_id <<
" " << i.name <<
" " << (i.value ?
"init" :
"-") <<
" " << i.count << std::endl;
#define MQTT_BROKER_NS_END
Definition: broker_namespace.hpp:22
#define MQTT_BROKER_NS_BEGIN
Definition: broker_namespace.hpp:21
Definition: retained_topic_map.hpp:29
std::size_t insert_or_assign(string_view topic, V &&value)
Definition: retained_topic_map.hpp:308
std::size_t erase(string_view topic)
Definition: retained_topic_map.hpp:338
void dump(Output &out)
Definition: retained_topic_map.hpp:358
std::size_t internal_size() const
Definition: retained_topic_map.hpp:348
retained_topic_map()
Definition: retained_topic_map.hpp:301
void find(string_view topic_filter, Output &&callback) const
Definition: retained_topic_map.hpp:333
std::size_t size() const
Definition: retained_topic_map.hpp:345
void clear()
Definition: retained_topic_map.hpp:351
id
Definition: property_id.hpp:19
boost::string_ref string_view
Definition: string_view.hpp:64
buffer allocate_buffer(Iterator b, Iterator e)
Create a buffer from a pair of iterators. It copies the string from b to e into a shared_ptr_array....
Definition: buffer.hpp:130
const_buffer buffer(MQTT_NS::buffer const &data)
Create a boost::asio::const_buffer from the MQTT_NS::buffer. boost::asio::const_buffer is a kind of view...
Definition: buffer.hpp:253
void topic_filter_tokenizer(Iterator first, Iterator last, Output write)
Definition: topic_filter_tokenizer.hpp:20