22 #include "robot_memory.h"
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <plugins/mongodb/utils.h>
28 #include <utils/misc/string_conversions.h>
29 #include <utils/misc/string_split.h>
30 #include <utils/system/hostinfo.h>
31 #ifdef USE_TIMETRACKER
32 # include <utils/time/tracker.h>
34 #include <utils/time/tracker_macros.h>
36 #include <bsoncxx/builder/basic/document.hpp>
38 #include <mongocxx/client.hpp>
39 #include <mongocxx/exception/operation_exception.hpp>
40 #include <mongocxx/read_preference.hpp>
45 using namespace mongocxx;
46 using namespace bsoncxx;
76 mongo_connection_manager_ = mongo_connection_manager;
77 blackboard_ = blackboard;
78 mongodb_client_local_ =
nullptr;
79 mongodb_client_distributed_ =
nullptr;
83 RobotMemory::~RobotMemory()
85 mongo_connection_manager_->delete_client(mongodb_client_local_);
86 mongo_connection_manager_->delete_client(mongodb_client_distributed_);
87 delete trigger_manager_;
88 blackboard_->close(rm_if_);
89 #ifdef USE_TIMETRACKER
98 log(
"Started RobotMemory");
99 default_collection_ =
"robmem.test";
101 default_collection_ = config_->get_string(
"/plugins/robot-memory/default-collection");
105 debug_ = config_->get_bool(
"/plugins/robot-memory/more-debug-output");
108 database_name_ =
"robmem";
110 database_name_ = config_->get_string(
"/plugins/robot-memory/database");
113 distributed_dbs_ = config_->get_strings(
"/plugins/robot-memory/distributed-db-names");
115 cfg_coord_database_ = config_->get_string(
"/plugins/robot-memory/coordination/database");
116 cfg_coord_mutex_collection_ =
117 config_->get_string(
"/plugins/robot-memory/coordination/mutex-collection");
119 using namespace std::chrono_literals;
122 log(
"Connect to local mongod");
123 mongodb_client_local_ = mongo_connection_manager_->create_client(
"robot-memory-local");
125 if (config_->exists(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")
126 && config_->get_bool(
"/plugins/mongodb/clients/robot-memory-distributed/enabled")) {
128 log(
"Connect to distributed mongod");
129 mongodb_client_distributed_ =
130 mongo_connection_manager_->create_client(
"robot-memory-distributed");
134 rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(
135 config_->get_string(
"/plugins/robot-memory/interface-name").c_str());
136 rm_if_->set_error(
"");
137 rm_if_->set_result(
"");
144 log_deb(
"Initialized RobotMemory");
146 #ifdef USE_TIMETRACKER
149 ttc_events_ = tt_->add_class(
"RobotMemory Events");
150 ttc_cleanup_ = tt_->add_class(
"RobotMemory Cleanup");
157 TIMETRACK_START(ttc_events_);
158 trigger_manager_->check_events();
159 TIMETRACK_END(ttc_events_);
160 TIMETRACK_START(ttc_cleanup_);
161 computables_manager_->cleanup_computed_docs();
162 TIMETRACK_END(ttc_cleanup_);
163 #ifdef USE_TIMETRACKER
164 if (++tt_loopcount_ % 5 == 0) {
165 tt_->print_to_stdout();
179 const std::string & collection_name,
180 mongocxx::options::find query_options)
182 collection collection = get_collection(collection_name);
183 log_deb(std::string(
"Executing Query " + to_json(query) +
" on collection " + collection_name));
186 computables_manager_->check_and_compute(query, collection_name);
193 return collection.find(query, query_options);
194 }
catch (mongocxx::operation_exception &e) {
196 std::string(
"Error for query ") + to_json(query) +
"\n Exception: " + e.what();
208 bsoncxx::document::value
210 const std::string & collection)
254 collection collection = get_collection(collection_name);
255 log_deb(std::string(
"Inserting " + to_json(doc) +
" into collection " + collection_name));
260 collection.insert_one(doc);
261 }
catch (mongocxx::operation_exception &e) {
262 std::string error =
"Error for insert " + to_json(doc) +
"\n Exception: " + e.what();
263 log_deb(error,
"error");
278 const std::string & collection_name,
281 collection collection = get_collection(collection_name);
283 log_deb(std::string(
"Creating index " + to_json(keys) +
" on collection " + collection_name));
290 using namespace bsoncxx::builder::basic;
291 collection.create_index(keys, make_document(kvp(
"unique", unique)));
292 }
catch (operation_exception &e) {
293 std::string error =
"Error when creating index " + to_json(keys) +
"\n Exception: " + e.what();
294 log_deb(error,
"error");
310 collection collection = get_collection(collection_name);
311 std::string insert_string =
"[";
312 for (
auto &&doc : docs) {
313 insert_string += to_json(doc) +
",\n";
315 insert_string +=
"]";
317 log_deb(std::string(
"Inserting vector of documents " + insert_string +
" into collection "
325 collection.insert_many(docs);
326 }
catch (operation_exception &e) {
327 std::string error =
"Error for insert " + insert_string +
"\n Exception: " + e.what();
328 log_deb(error,
"error");
344 return insert(from_json(obj_str), collection);
357 const bsoncxx::document::view &update,
358 const std::string & collection_name,
361 collection collection = get_collection(collection_name);
362 log_deb(std::string(
"Executing Update " + to_json(update) +
" for query " + to_json(query)
363 +
" on collection " + collection_name));
370 collection.update_many(query,
371 builder::basic::make_document(
372 builder::basic::kvp(
"$set", builder::concatenate(update))),
373 options::update().upsert(upsert));
374 }
catch (operation_exception &e) {
375 log_deb(std::string(
"Error for update " + to_json(update) +
" for query " + to_json(query)
376 +
"\n Exception: " + e.what()),
394 const std::string & update_str,
395 const std::string & collection,
398 return update(query, from_json(update_str), collection, upsert);
412 const document::view &update,
413 const std::string & collection_name,
417 collection collection = get_collection(collection_name);
419 log_deb(std::string(
"Executing findOneAndUpdate " + to_json(update) +
" for filter "
420 + to_json(filter) +
" on collection " + collection_name));
426 collection.find_one_and_update(filter,
428 options::find_one_and_update().upsert(upsert).return_document(
429 return_new ? options::return_document::k_after
430 : options::return_document::k_before));
434 std::string error =
"Error for update " + to_json(update) +
" for query " + to_json(filter)
435 +
"FindOneAndUpdate unexpectedly did not return a document";
436 log_deb(error,
"warn");
437 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
439 }
catch (operation_exception &e) {
440 std::string error =
"Error for update " + to_json(update) +
" for query " + to_json(filter)
441 +
"\n Exception: " + e.what();
442 log_deb(error,
"error");
443 return bsoncxx::builder::basic::make_document(bsoncxx::builder::basic::kvp(
"error", error));
458 collection collection = get_collection(collection_name);
459 log_deb(std::string(
"Executing Remove " + to_json(query) +
" on collection " + collection_name));
462 collection.delete_many(query);
463 }
catch (operation_exception &e) {
464 log_deb(std::string(
"Error for query " + to_json(query) +
"\n Exception: " + e.what()),
480 bsoncxx::document::value
482 const std::string & collection,
483 const std::string & js_map_fun,
484 const std::string & js_reduce_fun)
534 collection collection = get_collection(collection_name);
535 log_deb(
"Dropping collection " + collection_name);
550 log_deb(
"Clearing whole robot memory");
551 mongodb_client_local_->database(database_name_).drop();
567 const std::string &directory,
568 std::string target_dbcollection)
570 if (target_dbcollection ==
"") {
571 target_dbcollection = dbcollection;
574 drop_collection(target_dbcollection);
579 auto [db, collection] = split_db_collection_string(dbcollection);
582 log_deb(std::string(
"Restore collection " + collection +
" from " + path),
"warn");
584 auto [target_db, target_collection] = split_db_collection_string(target_dbcollection);
587 std::string command =
"/usr/bin/mongorestore --dir " + path +
" -d " + target_db +
" -c "
588 + target_collection +
" --host=" + get_hostport(dbcollection);
589 log_deb(std::string(
"Restore command: " + command),
"warn");
590 FILE *bash_output = popen(command.c_str(),
"r");
594 log(std::string(
"Unable to restore collection" + collection),
"error");
597 std::string output_string =
"";
599 while (!feof(bash_output)) {
600 if (fgets(buffer, 100, bash_output) == NULL) {
603 output_string += buffer;
606 if (output_string.find(
"Failed") != std::string::npos) {
607 log(std::string(
"Unable to restore collection" + collection),
"error");
608 log_deb(output_string,
"error");
627 log_deb(std::string(
"Dump collection " + dbcollection +
" into " + path),
"warn");
629 auto [db, collection] = split_db_collection_string(dbcollection);
631 std::string command =
"/usr/bin/mongodump --out=" + path +
" --db=" + db
632 +
" --collection=" + collection +
" --forceTableScan"
633 +
" --host=" + get_hostport(dbcollection);
634 log(std::string(
"Dump command: " + command),
"info");
635 FILE *bash_output = popen(command.c_str(),
"r");
638 log(std::string(
"Unable to dump collection" + collection),
"error");
641 std::string output_string =
"";
643 while (!feof(bash_output)) {
644 if (fgets(buffer, 100, bash_output) == NULL) {
647 output_string += buffer;
650 if (output_string.find(
"Failed") != std::string::npos) {
651 log(std::string(
"Unable to dump collection" + collection),
"error");
652 log_deb(output_string,
"error");
659 RobotMemory::log(
const std::string &what,
const std::string &info)
661 if (!info.compare(
"error"))
662 logger_->log_error(name_,
"%s", what.c_str());
663 else if (!info.compare(
"warn"))
664 logger_->log_warn(name_,
"%s", what.c_str());
665 else if (!info.compare(
"debug"))
666 logger_->log_debug(name_,
"%s", what.c_str());
668 logger_->log_info(name_,
"%s", what.c_str());
672 RobotMemory::log_deb(
const std::string &what,
const std::string &level)
680 RobotMemory::log_deb(
const bsoncxx::document::view &query,
681 const std::string & what,
682 const std::string & level)
685 log(query, what, level);
690 RobotMemory::log(
const bsoncxx::document::view &query,
691 const std::string & what,
692 const std::string & level)
694 log(what +
" " + to_json(query), level);
702 RobotMemory::is_distributed_database(
const std::string &dbcollection)
704 return std::find(distributed_dbs_.begin(),
705 distributed_dbs_.end(),
706 split_db_collection_string(dbcollection).first)
707 != distributed_dbs_.end();
711 RobotMemory::get_hostport(
const std::string &dbcollection)
713 if (distributed_ && is_distributed_database(dbcollection)) {
714 return config_->get_string(
"/plugins/mongodb/clients/robot-memory-distributed-direct/hostport");
716 return config_->get_string(
"/plugins/mongodb/clients/robot-memory-local-direct/hostport");
726 RobotMemory::get_mongodb_client(
const std::string &collection)
729 return mongodb_client_local_;
731 if (is_distributed_database(collection)) {
732 return mongodb_client_distributed_;
734 return mongodb_client_local_;
745 RobotMemory::get_collection(
const std::string &dbcollection)
747 auto db_coll_pair = split_db_collection_string(dbcollection);
749 if (is_distributed_database(dbcollection)) {
750 client = mongodb_client_distributed_;
752 client = mongodb_client_local_;
754 return client->database(db_coll_pair.first)[db_coll_pair.second];
764 trigger_manager_->remove_trigger(trigger);
774 computables_manager_->remove_computable(computable);
788 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
789 using namespace bsoncxx::builder;
790 basic::document insert_doc{};
791 insert_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
792 subdoc.append(basic::kvp(
"lock-time",
true));
794 insert_doc.append(basic::kvp(
"_id", name));
795 insert_doc.append(basic::kvp(
"locked",
false));
798 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
799 auto write_concern = mongocxx::write_concern();
800 write_concern.majority(std::chrono::milliseconds(0));
801 collection.insert_one(insert_doc.view(), options::insert().write_concern(write_concern));
803 }
catch (operation_exception &e) {
804 logger_->log_info(name_,
"Failed to create mutex %s: %s", name.c_str(), e.what());
818 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
819 using namespace bsoncxx::builder;
820 basic::document destroy_doc;
821 destroy_doc.append(basic::kvp(
"_id", name));
824 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
825 auto write_concern = mongocxx::write_concern();
826 write_concern.majority(std::chrono::milliseconds(0));
827 collection.delete_one(destroy_doc.view(),
828 options::delete_options().write_concern(write_concern));
830 }
catch (operation_exception &e) {
831 logger_->log_info(name_,
"Failed to destroy mutex %s: %s", name.c_str(), e.what());
850 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
852 std::string locked_by{identity};
853 if (identity.empty()) {
855 locked_by = host_info.
name();
859 using namespace bsoncxx::builder;
860 basic::document filter_doc;
861 filter_doc.append(basic::kvp(
"_id", name));
863 filter_doc.append(basic::kvp(
"locked",
false));
866 basic::document update_doc;
867 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
868 subdoc.append(basic::kvp(
"lock-time",
true));
870 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
871 subdoc.append(basic::kvp(
"locked",
true));
872 subdoc.append(basic::kvp(
"locked-by", locked_by));
876 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
877 auto write_concern = mongocxx::write_concern();
878 write_concern.majority(std::chrono::milliseconds(0));
880 collection.find_one_and_update(filter_doc.view(),
882 options::find_one_and_update()
884 .return_document(options::return_document::k_after)
885 .write_concern(write_concern));
890 auto new_view = new_doc->view();
891 return (new_view[
"locked-by"].get_utf8().value.to_string() == locked_by
892 && new_view[
"locked"].get_bool());
894 }
catch (operation_exception &e) {
895 logger_->log_error(name_,
"Mongo OperationException: %s", e.what());
898 basic::document check_doc;
899 check_doc.append(basic::kvp(
"_id", name));
900 check_doc.append(basic::kvp(
"locked",
true));
901 check_doc.append(basic::kvp(
"locked-by", locked_by));
903 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
904 auto res = collection.find_one(check_doc.view());
905 logger_->log_info(name_,
"Checking whether mutex was acquired succeeded");
907 logger_->log_warn(name_,
908 "Exception during try-lock for %s, "
909 "but mutex was still acquired",
912 logger_->log_info(name_,
913 "Exception during try-lock for %s, "
914 "and mutex was not acquired",
917 return static_cast<bool>(res);
918 }
catch (operation_exception &e) {
919 logger_->log_error(name_,
920 "Mongo OperationException while handling "
921 "the first exception: %s",
941 return mutex_try_lock(name,
"", force);
952 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
954 std::string locked_by{identity};
955 if (identity.empty()) {
957 locked_by = host_info.
name();
960 using namespace bsoncxx::builder;
962 basic::document filter_doc;
963 filter_doc.append(basic::kvp(
"_id", name));
964 filter_doc.append(basic::kvp(
"locked-by", locked_by));
966 basic::document update_doc;
967 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
968 subdoc.append(basic::kvp(
"locked",
false));
970 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
971 subdoc.append(basic::kvp(
"locked-by",
true));
972 subdoc.append(basic::kvp(
"lock-time",
true));
977 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
978 auto write_concern = mongocxx::write_concern();
979 write_concern.majority(std::chrono::milliseconds(0));
981 collection.find_one_and_update(filter_doc.view(),
983 options::find_one_and_update()
985 .return_document(options::return_document::k_after)
986 .write_concern(write_concern));
990 return new_doc->view()[
"locked"].get_bool();
991 }
catch (operation_exception &e) {
1008 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1010 std::string locked_by{identity};
1011 if (identity.empty()) {
1013 locked_by = host_info.
name();
1016 using namespace bsoncxx::builder;
1018 basic::document filter_doc;
1019 filter_doc.append(basic::kvp(
"_id", name));
1020 filter_doc.append(basic::kvp(
"locked",
true));
1021 filter_doc.append(basic::kvp(
"locked-by", locked_by));
1025 basic::document update_doc;
1026 update_doc.append(basic::kvp(
"$currentDate", [](basic::sub_document subdoc) {
1027 subdoc.append(basic::kvp(
"lock-time",
true));
1029 update_doc.append(basic::kvp(
"$set", [locked_by](basic::sub_document subdoc) {
1030 subdoc.append(basic::kvp(
"locked",
true));
1031 subdoc.append(basic::kvp(
"locked-by", locked_by));
1036 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1037 auto write_concern = mongocxx::write_concern();
1038 write_concern.majority(std::chrono::milliseconds(0));
1040 collection.find_one_and_update(filter_doc.view(),
1042 options::find_one_and_update()
1044 .return_document(options::return_document::k_after)
1045 .write_concern(write_concern));
1046 return static_cast<bool>(new_doc);
1047 }
catch (operation_exception &e) {
1048 logger_->log_warn(name_,
"Renewing lock on mutex %s failed: %s", name.c_str(), e.what());
1069 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1071 auto keys = builder::basic::make_document(builder::basic::kvp(
"lock-time",
true));
1074 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1075 collection.create_index(keys.view(),
1076 builder::basic::make_document(
1077 builder::basic::kvp(
"expireAfterSeconds", max_age_sec)));
1078 }
catch (operation_exception &e) {
1079 logger_->log_warn(name_,
"Creating TTL index failed: %s", e.what());
1094 client *client = distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1096 using std::chrono::high_resolution_clock;
1097 using std::chrono::milliseconds;
1098 using std::chrono::time_point;
1099 using std::chrono::time_point_cast;
1101 auto max_age_ms = milliseconds(
static_cast<unsigned long int>(std::floor(max_age_sec * 1000)));
1102 time_point<high_resolution_clock, milliseconds> expire_before =
1103 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1104 types::b_date expire_before_mdb(expire_before);
1107 using namespace bsoncxx::builder;
1108 basic::document filter_doc;
1109 filter_doc.append(basic::kvp(
"locked",
true));
1110 filter_doc.append(basic::kvp(
"lock-time", [expire_before_mdb](basic::sub_document subdoc) {
1111 subdoc.append(basic::kvp(
"$lt", expire_before_mdb));
1114 basic::document update_doc;
1115 update_doc.append(basic::kvp(
"$set", [](basic::sub_document subdoc) {
1116 subdoc.append(basic::kvp(
"locked",
false));
1118 update_doc.append(basic::kvp(
"$unset", [](basic::sub_document subdoc) {
1119 subdoc.append(basic::kvp(
"locked-by",
true));
1120 subdoc.append(basic::kvp(
"lock-time",
true));
1125 collection collection = client->database(cfg_coord_database_)[cfg_coord_mutex_collection_];
1126 auto write_concern = mongocxx::write_concern();
1127 write_concern.majority(std::chrono::milliseconds(0));
1128 collection.update_many(filter_doc.view(),
1130 options::update().write_concern(write_concern));
1132 }
catch (operation_exception &e) {
1133 log(std::string(
"Failed to expire locks: " + std::string(e.what())),
"error");
Class holding information for a single computable this class also enhances computed documents by addi...
This class manages registering computables and can check if any computables are invoced by a query.
Manager to realize triggers on events in the robot memory.
Class holding all information about an EventTrigger.
bool mutex_create(const std::string &name)
Explicitly create a mutex.
int dump_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory")
Dump (= save) a collection to the filesystem to restore it later.
bool mutex_destroy(const std::string &name)
Destroy a mutex.
bool mutex_renew_lock(const std::string &name, const std::string &identity)
Renew a mutex.
bool mutex_unlock(const std::string &name, const std::string &identity)
Release lock on mutex.
void remove_trigger(EventTrigger *trigger)
Remove a previously registered trigger.
mongocxx::cursor query(bsoncxx::document::view query, const std::string &collection_name="", mongocxx::options::find query_options=mongocxx::options::find())
Query information from the robot memory.
bsoncxx::document::value find_one_and_update(const bsoncxx::document::view &filter, const bsoncxx::document::view &update, const std::string &collection, bool upsert=false, bool return_new=true)
Atomically update and retrieve document.
bool mutex_setup_ttl(float max_age_sec)
Setup time-to-live index for mutexes.
bsoncxx::document::value mapreduce(const bsoncxx::document::view &query, const std::string &collection, const std::string &js_map_fun, const std::string &js_reduce_fun)
Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
int remove(const bsoncxx::document::view &query, const std::string &collection="")
Remove documents from the robot memory.
int insert(bsoncxx::document::view, const std::string &collection="")
Inserts a document into the robot memory.
void remove_computable(Computable *computable)
Remove previously registered computable.
int drop_collection(const std::string &collection)
Drop (= remove) a whole collection and all documents inside it.
int update(const bsoncxx::document::view &query, const bsoncxx::document::view &update, const std::string &collection="", bool upsert=false)
Updates documents in the robot memory.
bsoncxx::document::value aggregate(const std::vector< bsoncxx::document::view > &pipeline, const std::string &collection="")
Aggregation call on the robot memory.
bool mutex_try_lock(const std::string &name, bool force=false)
Try to acquire a lock for a mutex.
int create_index(bsoncxx::document::view keys, const std::string &collection="", bool unique=false)
Create an index on a collection.
int clear_memory()
Remove the whole database of the robot memory and all documents inside.
RobotMemory(fawkes::Configuration *config, fawkes::Logger *logger, fawkes::Clock *clock, fawkes::MongoDBConnCreator *mongo_connection_manager, fawkes::BlackBoard *blackboard)
Robot Memory Constructor with objects of the thread.
int restore_collection(const std::string &dbcollection, const std::string &directory="@CONFDIR@/robot-memory", std::string target_dbcollection="")
Restore a previously dumped collection from a directory.
bool mutex_expire_locks(float max_age_sec)
Expire old locks on mutexes.
The BlackBoard abstract class.
This is supposed to be the central clock in Fawkes.
Interface for configuration handling.
Base class for exceptions in Fawkes.
const char * name()
Get full hostname.
Interface for a MongoDB connection creator.
static std::string resolve_path(std::string s)
Resolves path-string with @...@ tags.
Fawkes library namespace.