Fawkes API  Fawkes Development Version
plexil_thread.cpp
1 
2 /***************************************************************************
3  * plexil_thread.cpp - PLEXIL executive
4  *
5  * Created: Mon Aug 13 11:20:12 2018
6  * Copyright 2006-2018 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "plexil_thread.h"
23 
24 #include "be_adapter.h"
25 #include "clock_adapter.h"
26 #include "log_adapter.h"
27 #include "log_stream.h"
28 #include "protobuf_adapter.h"
29 #include "thread_adapter.h"
30 #ifdef HAVE_NAVGRAPH
31 # include "navgraph_access_thread.h"
32 # include "navgraph_adapter.h"
33 #endif
34 #include "utils.h"
35 
36 #include <core/threading/mutex_locker.h>
37 #include <utils/sub_process/proc.h>
38 #include <utils/system/dynamic_module/module.h>
39 
40 #include <AdapterConfiguration.hh>
41 #include <Debug.hh>
42 #include <ExecApplication.hh>
43 #include <InterfaceManager.hh>
44 #include <InterfaceSchema.hh>
45 #include <boost/filesystem.hpp>
46 #include <boost/interprocess/sync/file_lock.hpp>
47 #include <cstring>
48 #include <fstream>
49 #include <numeric>
50 #include <pugixml.hpp>
51 
52 using namespace fawkes;
53 namespace fs = boost::filesystem;
54 // for C++17 could be:
55 // namespace fs = std::filesystem;
56 
57 /** @class PlexilExecutiveThread "plexil_thread.h"
58  * Main thread of PLEXIL executive.
59  *
60  * @author Tim Niemueller
61  */
62 
63 /** Constructor. */
65 : Thread("PlexilExecutiveThread", Thread::OPMODE_CONTINUOUS)
66 {
68 }
69 
70 /** Destructor. */
72 {
73 }
74 
75 void
77 {
78  cfg_spec_ = config->get_string("/plexil/spec");
79 
80  std::string cfg_prefix = "/plexil/" + cfg_spec_ + "/";
81 
82  bool cfg_print_xml = config->get_bool_or_default((cfg_prefix + "debug/print-xml").c_str(), false);
83 
84  std::map<std::string, plexil_interface_config> cfg_adapters =
85  read_plexil_interface_configs(cfg_prefix + "adapters/");
86 
87  std::map<std::string, plexil_interface_config> cfg_listeners =
88  read_plexil_interface_configs(cfg_prefix + "listeners/");
89 
90  std::vector<std::string> cfg_lib_path =
91  config->get_strings_or_defaults((cfg_prefix + "plan/lib-path").c_str(), {});
92 
93  std::string cfg_basedir =
94  config->get_string_or_default((cfg_prefix + "plan/basedir").c_str(), "");
95 
96  for (auto &a_item : cfg_adapters) {
97  auto &a = a_item.second;
98  if (a.type == "Utility") {
99  logger->log_warn(name(), "Utility adapter configured, consider using FawkesLogging instead");
100  } else if (a.type == "OSNativeTime") {
101  logger->log_warn(name(),
102  "OSNativeTime adapter configured, consider using FawkesTime instead");
103  } else if (a.type == "FawkesRemoteAdapter") {
104  logger->log_error(name(), "Cannot load FawkesRemoteAdapter when running internally");
105  throw Exception("Plexil: cannot load FawkesRemoteAdapter when running internally");
106  }
107 
108  std::string filename =
109  std::string(LIBDIR) + "/plexil/" + a.type + "." + fawkes::Module::get_file_extension();
110  if (fs::exists(filename)) {
111  a.attr["LibPath"] = filename;
112  }
113  }
114 
115  plexil_.reset(new PLEXIL::ExecApplication);
116 
117  PLEXIL::g_manager->setProperty("::Fawkes::Config", config);
118  PLEXIL::g_manager->setProperty("::Fawkes::Clock", clock);
119  PLEXIL::g_manager->setProperty("::Fawkes::Logger", logger);
120  PLEXIL::g_manager->setProperty("::Fawkes::BlackBoard", blackboard);
121 
122  for (const auto &p : cfg_lib_path) {
123  plexil_->addLibraryPath(p);
124  }
125 
126  pugi::xml_document xml_config;
127  pugi::xml_node xml_interfaces =
128  xml_config.append_child(PLEXIL::InterfaceSchema::INTERFACES_TAG());
129 
130  add_plexil_interface_configs(xml_interfaces,
131  cfg_adapters,
132  PLEXIL::InterfaceSchema::ADAPTER_TAG(),
133  PLEXIL::InterfaceSchema::ADAPTER_TYPE_ATTR());
134  add_plexil_interface_configs(xml_interfaces,
135  cfg_listeners,
136  PLEXIL::InterfaceSchema::LISTENER_TAG(),
137  PLEXIL::InterfaceSchema::LISTENER_TYPE_ATTR());
138 
139  auto navgraph_adapter_config =
140  std::find_if(cfg_adapters.begin(), cfg_adapters.end(), [](const auto &entry) {
141  return entry.second.type == "NavGraphAdapter";
142  });
143  if (navgraph_adapter_config != cfg_adapters.end()) {
144 #ifdef HAVE_NAVGRAPH
145  navgraph_access_thread_ = new PlexilNavgraphAccessThread();
146  thread_collector->add(navgraph_access_thread_);
147  navgraph_ = navgraph_access_thread_->get_navgraph();
148  PLEXIL::g_manager->setProperty("::Fawkes::NavGraph", &navgraph_);
149 #else
150  throw Exception("NavGraph adapter configured, "
151  "but navgraph library not available at compile time");
152 #endif
153  }
154 
155  if (cfg_print_xml) {
156  struct xml_string_writer : pugi::xml_writer
157  {
158  std::string result;
159  virtual void
160  write(const void *data, size_t size)
161  {
162  result.append(static_cast<const char *>(data), size);
163  }
164  };
165 
166  xml_string_writer writer;
167  xml_config.save(writer);
168  logger->log_info(name(), "Interface config XML:\n%s", writer.result.c_str());
169  }
170 
171  if (config->get_bool_or_default((cfg_prefix + "debug/enable").c_str(), false)) {
172  std::vector<std::string> debug_markers =
173  config->get_strings_or_defaults((cfg_prefix + "debug/markers").c_str(), {});
174 
175  std::stringstream dbg_config;
176  for (const auto &m : debug_markers) {
177  dbg_config << m << std::endl;
178  }
179  PLEXIL::readDebugConfigStream(dbg_config);
180  }
181 
182  log_buffer_.reset(new PlexilLogStreamBuffer(logger));
183  log_stream_.reset(new std::ostream(&*log_buffer_));
184  PLEXIL::setDebugOutputStream(*log_stream_);
185 
186  if (!plexil_->initialize(xml_interfaces)) {
187  throw Exception("Failed to initialize Plexil application");
188  }
189 
190  if (config->is_list(cfg_prefix + "plan/ple")) {
191  cfg_plan_ple_ = config->get_strings_or_defaults((cfg_prefix + "plan/ple").c_str(), {});
192  } else {
193  std::string ple = config->get_string_or_default((cfg_prefix + "plan/ple").c_str(), "");
194  if (!ple.empty()) {
195  cfg_plan_ple_ = {ple};
196  }
197  }
198  if (cfg_plan_ple_.empty()) {
199  throw Exception("No PLE configured");
200  }
201  cfg_plan_plx_ = config->get_string((cfg_prefix + "plan/plx").c_str());
202  cfg_plan_auto_compile_ =
203  config->get_bool_or_default((cfg_prefix + "plan/compilation/enable").c_str(), false);
204  cfg_plan_force_compile_ =
205  config->get_bool_or_default((cfg_prefix + "plan/compilation/force").c_str(), false);
206 
207  if (!cfg_plan_plx_.empty()) {
208  cfg_plan_plx_ = cfg_basedir + "/" + cfg_plan_plx_;
209  replace_tokens(cfg_plan_plx_);
210  }
211 
212  std::set<std::string> base_paths;
213 
214  for (auto &p : cfg_plan_ple_) {
215  p = cfg_basedir + "/" + p;
216  replace_tokens(p);
217 
218  fs::path ple_path{p};
219  fs::path plx_path{fs::path{ple_path}.replace_extension(".plx")};
220 
221  // make sure not two processes try to compile at the same time
222  boost::interprocess::file_lock flock(ple_path.string().c_str());
223 
224  base_paths.insert(plx_path.parent_path().string());
225 
226  if (cfg_plan_auto_compile_) {
227  if (cfg_plan_force_compile_ || !fs::exists(plx_path)
228  || fs::last_write_time(plx_path) < fs::last_write_time(ple_path)) {
229  logger->log_info(name(), "Compiling %s", ple_path.string().c_str());
230  plexil_compile(ple_path.string());
231  }
232  } else {
233  if (!fs::exists(plx_path)) {
234  throw Exception("PLX %s does not exist and auto-compile disabled");
235  } else if (fs::last_write_time(plx_path) < fs::last_write_time(ple_path)) {
236  logger->log_warn(name(),
237  "PLX %s older than PLE, auto-compile disabled",
238  plx_path.string().c_str());
239  }
240  }
241  }
242 
243  if (!fs::exists(cfg_plan_plx_)) {
244  throw Exception("PLX %s does not exist", cfg_plan_plx_.c_str());
245  }
246 
247  for (const auto &p : base_paths) {
248  plexil_->addLibraryPath(p);
249  }
250 
251  plan_plx_.reset(new pugi::xml_document);
252  pugi::xml_parse_result parse_result = plan_plx_->load_file(cfg_plan_plx_.c_str());
253  if (parse_result.status != pugi::status_ok) {
254  throw Exception("Failed to parse plan '%s': %s",
255  cfg_plan_plx_.c_str(),
256  parse_result.description());
257  }
258 }
259 
260 void
262 {
263  if (!plexil_->startInterfaces()) {
264  throw Exception("Failed to start Plexil interfaces");
265  }
266  if (!plexil_->run()) {
267  throw Exception("Failed to start Plexil");
268  }
269 
270  if (!plexil_->addPlan(&*plan_plx_)) {
271  logger->log_error(name(), "Failed to add Plexil plan. See log for details");
272  } else {
273  plexil_->notifyExec();
274  }
275 }
276 
277 bool
279 {
280  if (!plexil_->stop()) {
281  logger->log_error(name(), "Failed to stop Plexil");
282  }
283  plexil_->notifyExec();
284  return true;
285 }
286 
287 void
289 {
290  if (!plexil_->shutdown()) {
291  logger->log_error(name(), "Failed to shutdown Plexil");
292  }
293  PLEXIL::g_configuration->clearAdapterRegistry();
294  plexil_->waitForShutdown();
295 
296  // We really should do a reset here, killing off the ExecApplication instance.
297  // However, the executive crashes in a state cache destructor if there is any
298  // active wait (or probably any active LookupOnChange, as here on time).
299  // Therefore, we accept this memleak here under the assumption, that we do not
300  // frequently reload the plexil plugin. This at least avoids the segfaut on quit.
301  plexil_.release();
302  //plexil_.reset();
303  log_stream_.reset();
304  log_buffer_.reset();
305  plan_plx_.reset();
306 #ifdef HAVE_NAVGRAPH
307  if (navgraph_) {
308  navgraph_.clear();
309  thread_collector->remove(navgraph_access_thread_);
310  delete navgraph_access_thread_;
311  }
312 #endif
313 }
314 
315 void
317 {
318  //plexil_->notifyExec();
319  //plexil_->waitForPlanFinished();
320  static PLEXIL::ExecApplication::ApplicationState state = PLEXIL::ExecApplication::APP_SHUTDOWN;
321  PLEXIL::ExecApplication::ApplicationState new_state = plexil_->getApplicationState();
322  if (new_state != state) {
323  logger->log_info(name(), "State changed to %s", plexil_->getApplicationStateName(new_state));
324  state = new_state;
325  }
326 
327  using namespace std::chrono_literals;
328  std::this_thread::sleep_for(500ms);
329 }
330 
331 // Parse adapter configurations
332 std::map<std::string, PlexilExecutiveThread::plexil_interface_config>
333 PlexilExecutiveThread::read_plexil_interface_configs(const std::string &config_prefix)
334 {
335  std::map<std::string, plexil_interface_config> cfg_adapters;
336 
337  std::unique_ptr<Configuration::ValueIterator> cfg_item{config->search(config_prefix)};
338  while (cfg_item->next()) {
339  std::string path = cfg_item->path();
340 
341  std::string::size_type start_pos = config_prefix.size();
342  std::string::size_type slash_pos = path.find("/", start_pos + 1);
343  if (slash_pos != std::string::npos) {
344  std::string id = path.substr(start_pos, slash_pos - start_pos);
345 
346  start_pos = slash_pos + 1;
347  slash_pos = path.find("/", start_pos);
348  std::string what = path.substr(start_pos, slash_pos - start_pos);
349 
350  if (what == "type") {
351  cfg_adapters[id].type = cfg_item->get_string();
352  } else if (what == "attr") {
353  start_pos = slash_pos + 1;
354  slash_pos = path.find("/", start_pos);
355  std::string key = path.substr(start_pos, slash_pos - start_pos);
356  cfg_adapters[id].attr[key] = cfg_item->get_as_string();
357  } else if (what == "args") {
358  start_pos = slash_pos + 1;
359  slash_pos = path.find("/", start_pos);
360  std::string key = path.substr(start_pos, slash_pos - start_pos);
361  cfg_adapters[id].args[key] = cfg_item->get_as_string();
362  } else if (what == "verbatim-args") {
363  start_pos = slash_pos + 1;
364  slash_pos = path.find("/", start_pos);
365  std::string verb_id = path.substr(start_pos, slash_pos - start_pos);
366 
367  start_pos = slash_pos + 1;
368  slash_pos = path.find("/", start_pos);
369  std::string verb_what = path.substr(start_pos, slash_pos - start_pos);
370 
371  if (verb_what == "tag") {
372  cfg_adapters[id].verbatim_args[verb_id].tag = cfg_item->get_as_string();
373  } else if (verb_what == "text") {
374  cfg_adapters[id].verbatim_args[verb_id].has_text = true;
375  cfg_adapters[id].verbatim_args[verb_id].text = cfg_item->get_as_string();
376  } else if (verb_what == "attr") {
377  start_pos = slash_pos + 1;
378  slash_pos = path.find("/", start_pos);
379  std::string verb_key = path.substr(start_pos, slash_pos - start_pos);
380  cfg_adapters[id].verbatim_args[verb_id].attr[verb_key] = cfg_item->get_as_string();
381  }
382  } else if (what == "verbatim-xml") {
383  logger->log_warn(name(), "Parsing verbatim");
384  pugi::xml_parse_result parse_result =
385  cfg_adapters[id].verbatim.load_string(cfg_item->get_string().c_str());
386  if (parse_result.status != pugi::status_ok) {
387  throw Exception("Failed to parse verbatim-xml for '%s': %s",
388  cfg_adapters[id].type.c_str(),
389  parse_result.description());
390  }
391  }
392  }
393  }
394  return cfg_adapters;
395 }
396 
397 // Add adapter configurations to Plexil interface XML config
398 void
399 PlexilExecutiveThread::add_plexil_interface_configs(
400  pugi::xml_node & parent,
401  const std::map<std::string, PlexilExecutiveThread::plexil_interface_config> &configs,
402  const char * tag_name,
403  const char * type_attr_name)
404 {
405  for (const auto &a_item : configs) {
406  const auto & a = a_item.second;
407  pugi::xml_node xml_adapter = parent.append_child(tag_name);
408  xml_adapter.append_attribute(type_attr_name).set_value(a.type.c_str());
409  for (const auto &attr : a.attr) {
410  xml_adapter.append_attribute(attr.first.c_str()).set_value(attr.second.c_str());
411  }
412  for (const auto &arg : a.args) {
413  pugi::xml_node xml_adapter_arg = xml_adapter.append_child("Parameter");
414  xml_adapter_arg.append_attribute("key").set_value(arg.first.c_str());
415  xml_adapter_arg.text().set(arg.second.c_str());
416  }
417  for (const auto &arg : a.verbatim_args) {
418  const auto & varg = arg.second;
419  pugi::xml_node xml_adapter_arg = xml_adapter.append_child(varg.tag.c_str());
420  for (const auto &attr : varg.attr) {
421  xml_adapter_arg.append_attribute(attr.first.c_str()).set_value(attr.second.c_str());
422  }
423  if (varg.has_text) {
424  xml_adapter_arg.text().set(varg.text.c_str());
425  }
426  }
427  if (a.verbatim && a.verbatim.children().begin() != a.verbatim.children().end()) {
428  for (const auto &child : a.verbatim.children()) {
429  xml_adapter.append_copy(child);
430  }
431  }
432  }
433 }
434 
435 void
436 PlexilExecutiveThread::plexil_compile(const std::string &ple_file)
437 {
438  std::vector<std::string> argv{"plexilc", ple_file};
439  std::string command_line =
440  std::accumulate(std::next(argv.begin()),
441  argv.end(),
442  argv.front(),
443  [](std::string &s, const std::string &a) { return s + " " + a; });
444  logger->log_debug(name(), "Compiler command: %s", command_line.c_str());
445 
446  SubProcess proc("plexilc", "plexilc", argv, {}, logger);
447  using namespace std::chrono_literals;
448  auto compile_start = std::chrono::system_clock::now();
449  auto now = std::chrono::system_clock::now();
450  do {
451  proc.check_proc();
452  if (!proc.alive()) {
453  if (proc.exit_status() != 0) {
454  throw Exception("Plexil compilation failed, check log for messages.");
455  } else {
456  break;
457  }
458  }
459  now = std::chrono::system_clock::now();
460  std::this_thread::sleep_for(500ms);
461  } while (now < compile_start + 30s);
462  if (proc.alive()) {
463  proc.kill(SIGINT);
464  throw Exception("Plexil compilation timeout after 30s");
465  }
466 }
virtual void init()
Initialize the thread.
virtual ~PlexilExecutiveThread()
Destructor.
PlexilExecutiveThread()
Constructor.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
virtual bool prepare_finalize_user()
Prepare finalization user implementation.
virtual void once()
Execute an action exactly once.
Log Plexil log output to Fawkes logger.
Definition: log_stream.h:31
Access to internal navgraph for Plexil.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual const char * path() const =0
Path of value.
virtual std::string get_string_or_default(const char *path, const std::string &default_val)
Get value from configuration which is of type string, or the given default if the path does not exist...
Definition: config.cpp:736
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual bool is_list(const char *path)=0
Check if a value is a list.
virtual bool get_bool_or_default(const char *path, const bool &default_val)
Get value from configuration which is of type bool, or the given default if the path does not exist.
Definition: config.cpp:726
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
virtual std::vector< std::string > get_strings_or_defaults(const char *path, const std::vector< std::string > &default_val)
Get list of values from configuration which is of type string, or the given default if the path does ...
Definition: config.cpp:786
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
static const char * get_file_extension()
Get file extension for dl modules.
Definition: module.cpp:260
Sub-process execution with stdin/stdout/stderr redirection.
Definition: proc.h:37
bool alive()
Check if process is alive.
Definition: proc.cpp:353
int exit_status()
Get exit status of process once it ended.
Definition: proc.cpp:365
void check_proc()
Check if the process is still alive.
Definition: proc.cpp:375
void kill(int signum)
Send a signal to the process.
Definition: proc.cpp:188
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
Fawkes library namespace.