22 #include "event_trigger_manager.h"
24 #ifdef USE_TIMETRACKER
25 # include <utils/time/tracker.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/time/tracker_macros.h>
30 #include <boost/bind.hpp>
31 #include <bsoncxx/json.hpp>
32 #include <mongocxx/exception/operation_exception.hpp>
33 #include <mongocxx/exception/query_exception.hpp>
36 using namespace mongocxx;
56 mongo_connection_manager_ = mongo_connection_manager;
58 con_local_ = mongo_connection_manager_->
create_client(
"robot-memory-local");
59 if (config_->
exists(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")
60 && config_->
get_bool(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
61 con_replica_ = mongo_connection_manager_->
create_client(
"robot-memory-distributed");
65 std::string local_db = config_->
get_string(
"/plugins/robot-memory/database");
66 dbnames_local_.push_back(local_db);
67 dbnames_distributed_ = config_->
get_strings(
"/plugins/robot-memory/distributed-db-names");
72 cfg_debug_ = config->
get_bool(
"/plugins/robot-memory/more-debug-output");
75 #ifdef USE_TIMETRACKER
77 ttc_trigger_loop_ = tt_->add_class(
"RM Trigger Trigger Loop");
78 ttc_callback_loop_ = tt_->add_class(
"RM Trigger Callback Loop");
79 ttc_callback_ = tt_->add_class(
"RM Trigger Single Callback");
80 ttc_reinit_ = tt_->add_class(
"RM Trigger Reinit");
84 EventTriggerManager::~EventTriggerManager()
92 #ifdef USE_TIMETRACKER
98 EventTriggerManager::check_events()
103 TIMETRACK_START(ttc_trigger_loop_);
107 auto next = trigger->change_stream.begin();
108 TIMETRACK_START(ttc_callback_loop_);
109 while (next != trigger->change_stream.end()) {
112 TIMETRACK_START(ttc_callback_);
113 trigger->callback(*next);
115 TIMETRACK_END(ttc_callback_);
117 TIMETRACK_END(ttc_callback_loop_);
118 }
catch (operation_exception &e) {
119 logger_->
log_error(name.c_str(),
"Error while reading the change stream");
125 TIMETRACK_START(ttc_reinit_);
127 logger_->
log_debug(name.c_str(),
"Tailable Cursor is dead, requerying");
130 if (std::find(dbnames_distributed_.begin(),
131 dbnames_distributed_.end(),
133 != dbnames_distributed_.end()) {
138 auto db_coll_pair = split_db_collection_string(trigger->ns);
139 auto collection = con->database(db_coll_pair.first)[db_coll_pair.second];
141 trigger->change_stream = create_change_stream(collection, trigger->filter_query.view());
142 }
catch (mongocxx::query_exception &e) {
144 "Failed to create change stream, broken trigger for collection %s: %s",
148 TIMETRACK_END(ttc_reinit_);
151 TIMETRACK_END(ttc_trigger_loop_);
152 #ifdef USE_TIMETRACKER
153 if (++tt_loopcount_ % 5 == 0) {
154 tt_->print_to_stdout();
166 triggers.remove(trigger);
171 EventTriggerManager::create_change_stream(mongocxx::collection &coll, bsoncxx::document::view query)
177 if (!query.empty()) {
180 mongocxx::options::change_stream opts;
181 opts.full_document(
"updateLookup");
182 opts.max_await_time(std::chrono::milliseconds(1));
183 auto res = coll.watch(opts);
185 auto it = res.begin();
186 while (std::next(it) != res.end()) {}
198 std::string::size_type dot_pos = ns.find(
".");
199 if (dot_pos == std::string::npos) {
202 return ns.substr(0, dot_pos);
EventTriggerManager(fawkes::Logger *logger, fawkes::Configuration *config, fawkes::MongoDBConnCreator *mongo_connection_manager)
Constructor for class managing EventTriggers.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
static std::string get_db_name(const std::string &ns)
Split database name from namespace.
Class holding all information about an EventTrigger.
Interface for configuration handling.
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual std::vector< std::string > get_strings(const char *path)=0
Get list of values from configuration which is of type string.
virtual bool exists(const char *path)=0
Check if a given value exists.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Interface for a MongoDB connection creator.
virtual mongocxx::client * create_client(const std::string &config_name="")=0
Create a new MongoDB client.
virtual void delete_client(mongocxx::client *client)=0
Delete a client.
Mutex mutual exclusion lock.
Fawkes library namespace.