Fawkes API  Fawkes Development Version
notifier.cpp
1 
2 /***************************************************************************
3  * notifier.cpp - BlackBoard notifier
4  *
5  * Created: Mon Mar 03 23:28:18 2008
6  * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <blackboard/blackboard.h>
25 #include <blackboard/interface_listener.h>
26 #include <blackboard/interface_observer.h>
27 #include <blackboard/internal/notifier.h>
28 #include <core/threading/mutex.h>
29 #include <core/threading/mutex_locker.h>
30 #include <core/utils/lock_hashmap.h>
31 #include <core/utils/lock_hashset.h>
32 #include <interface/interface.h>
33 #include <logging/liblogger.h>
34 
35 #include <algorithm>
36 #include <cstdlib>
37 #include <cstring>
38 #include <fnmatch.h>
39 #include <functional>
40 
41 namespace fawkes {
42 
43 /** @class BlackBoardNotifier <blackboard/internal/notifier.h>
44  * BlackBoard notifier.
45  * This class is used by the BlackBoard to notify listeners and observers
46  * of changes.
47  *
48  * @author Tim Niemueller
49  */
50 
51 /** Constructor. */
53 {
54  bbil_writer_events_ = 0;
55  bbil_writer_mutex_ = new Mutex();
56 
57  bbil_reader_events_ = 0;
58  bbil_reader_mutex_ = new Mutex();
59 
60  bbil_data_events_ = 0;
61  bbil_data_mutex_ = new Mutex();
62 
63  bbil_messages_events_ = 0;
64  bbil_messages_mutex_ = new Mutex();
65 
66  bbio_events_ = 0;
67  bbio_mutex_ = new Mutex();
68 }
69 
70 /** Destructor */
72 {
73  delete bbil_writer_mutex_;
74  delete bbil_reader_mutex_;
75  delete bbil_data_mutex_;
76  delete bbil_messages_mutex_;
77 
78  delete bbio_mutex_;
79 }
80 
81 /** Register BB event listener.
82  * @param listener BlackBoard event listener to register
83  * @param flag concatenation of flags denoting which queue entries should be
84  * processed
85  */
86 void
89 {
90  update_listener(listener, flag);
91 }
92 
93 /** Update BB event listener.
94  * @param listener BlackBoard event listener to update subscriptions of
95  * @param flag concatenation of flags denoting which queue entries should be
96  * processed
97  */
98 void
101 {
102  const BlackBoardInterfaceListener::InterfaceQueue &queue = listener->bbil_acquire_queue();
103 
104  BlackBoardInterfaceListener::InterfaceQueue::const_iterator i = queue.begin();
105 
106  for (i = queue.begin(); i != queue.end(); ++i) {
107  switch (i->type) {
109  if (flag & BlackBoard::BBIL_FLAG_DATA) {
110  proc_listener_maybe_queue(i->op,
111  i->interface,
112  listener,
113  bbil_data_mutex_,
114  bbil_data_events_,
115  bbil_data_,
116  bbil_data_queue_,
117  "data");
118  }
119  break;
121  if (flag & BlackBoard::BBIL_FLAG_MESSAGES) {
122  proc_listener_maybe_queue(i->op,
123  i->interface,
124  listener,
125  bbil_messages_mutex_,
126  bbil_messages_events_,
127  bbil_messages_,
128  bbil_messages_queue_,
129  "messages");
130  }
131  break;
133  if (flag & BlackBoard::BBIL_FLAG_READER) {
134  proc_listener_maybe_queue(i->op,
135  i->interface,
136  listener,
137  bbil_reader_mutex_,
138  bbil_reader_events_,
139  bbil_reader_,
140  bbil_reader_queue_,
141  "reader");
142  }
143  break;
145  if (flag & BlackBoard::BBIL_FLAG_WRITER) {
146  proc_listener_maybe_queue(i->op,
147  i->interface,
148  listener,
149  bbil_writer_mutex_,
150  bbil_writer_events_,
151  bbil_writer_,
152  bbil_writer_queue_,
153  "writer");
154  }
155  break;
156  default: break;
157  }
158  }
159 
160  listener->bbil_release_queue(flag);
161 }
162 
163 void
164 BlackBoardNotifier::proc_listener_maybe_queue(bool op,
165  Interface * interface,
166  BlackBoardInterfaceListener *listener,
167  Mutex * mutex,
168  unsigned int & events,
169  BBilMap & map,
170  BBilQueue & queue,
171  const char * hint)
172 {
173  MutexLocker lock(mutex);
174  if (events > 0) {
175  LibLogger::log_warn("BlackBoardNotifier",
176  "%s interface "
177  "listener %s for %s events (queued)",
178  op ? "Registering" : "Unregistering",
179  listener->bbil_name(),
180  hint);
181 
182  queue_listener(op, interface, listener, queue);
183  } else {
184  if (op) { // add
185  add_listener(interface, listener, map);
186  } else {
187  remove_listener(interface, listener, map);
188  }
189  }
190 }
191 
192 /** Unregister BB interface listener.
193  * This will remove the given BlackBoard interface listener from any
194  * event that it was previously registered for.
195  * @param listener BlackBoard event listener to remove
196  */
197 void
199 {
200  const BlackBoardInterfaceListener::InterfaceMaps maps = listener->bbil_acquire_maps();
201 
202  BlackBoardInterfaceListener::InterfaceMap::const_iterator i;
203  for (i = maps.data.begin(); i != maps.data.end(); ++i) {
204  proc_listener_maybe_queue(false,
205  i->second,
206  listener,
207  bbil_data_mutex_,
208  bbil_data_events_,
209  bbil_data_,
210  bbil_data_queue_,
211  "data");
212  }
213 
214  for (i = maps.messages.begin(); i != maps.messages.end(); ++i) {
215  proc_listener_maybe_queue(false,
216  i->second,
217  listener,
218  bbil_messages_mutex_,
219  bbil_messages_events_,
220  bbil_messages_,
221  bbil_messages_queue_,
222  "messages");
223  }
224 
225  for (i = maps.reader.begin(); i != maps.reader.end(); ++i) {
226  proc_listener_maybe_queue(false,
227  i->second,
228  listener,
229  bbil_reader_mutex_,
230  bbil_reader_events_,
231  bbil_reader_,
232  bbil_reader_queue_,
233  "reader");
234  }
235 
236  for (i = maps.writer.begin(); i != maps.writer.end(); ++i) {
237  proc_listener_maybe_queue(false,
238  i->second,
239  listener,
240  bbil_writer_mutex_,
241  bbil_writer_events_,
242  bbil_writer_,
243  bbil_writer_queue_,
244  "writer");
245  }
246 
247  listener->bbil_release_maps();
248 }
249 
250 /** Add listener for specified map.
251  * @param listener interface listener for events
252  * @param im map of interfaces to listen for
253  * @param ilmap internal map to add listener to
254  */
255 void
256 BlackBoardNotifier::add_listener(Interface * interface,
257  BlackBoardInterfaceListener *listener,
258  BBilMap & ilmap)
259 {
260  std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
261 
262  BBilMap::value_type v = std::make_pair(interface->uid(), listener);
263  BBilMap::iterator f = std::find(ret.first, ret.second, v);
264 
265  if (f == ret.second) {
266  ilmap.insert(std::make_pair(interface->uid(), listener));
267  }
268 }
269 
270 void
271 BlackBoardNotifier::remove_listener(Interface * interface,
272  BlackBoardInterfaceListener *listener,
273  BBilMap & ilmap)
274 {
275  std::pair<BBilMap::iterator, BBilMap::iterator> ret = ilmap.equal_range(interface->uid());
276  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
277  if (j->second == listener) {
278  ilmap.erase(j);
279  break;
280  }
281  }
282 }
283 
284 bool
285 BlackBoardNotifier::is_in_queue(bool op,
286  BBilQueue & queue,
287  const char * uid,
288  BlackBoardInterfaceListener *bbil)
289 {
290  BBilQueue::iterator q;
291  for (q = queue.begin(); q != queue.end(); ++q) {
292  if ((q->op == op) && (q->uid == uid) && (q->listener == bbil)) {
293  return true;
294  }
295  }
296  return false;
297 }
298 
299 void
300 BlackBoardNotifier::queue_listener(bool op,
301  Interface * interface,
302  BlackBoardInterfaceListener *listener,
303  BBilQueue & queue)
304 {
305  BBilQueueEntry qe = {op, interface->uid(), interface, listener};
306  queue.push_back(qe);
307 }
308 
309 /** Register BB interface observer.
310  * @param observer BlackBoard interface observer to register
311  */
312 void
314 {
315  bbio_mutex_->lock();
316  if (bbio_events_ > 0) {
317  bbio_queue_.push_back(std::make_pair(1, observer));
318  } else {
319  add_observer(observer, observer->bbio_get_observed_create(), bbio_created_);
320  add_observer(observer, observer->bbio_get_observed_destroy(), bbio_destroyed_);
321  }
322  bbio_mutex_->unlock();
323 }
324 
325 void
326 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver * observer,
328  BBioMap & bbiomap)
329 {
331  its->lock();
332  for (i = its->begin(); i != its->end(); ++i) {
333  bbiomap[i->first].push_back(make_pair(observer, i->second));
334  }
335  its->unlock();
336 }
337 
338 /** Remove observer from map.
339  * @param iomap interface observer map to remove the observer from
340  * @param observer observer to remove
341  */
342 void
343 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
344 {
345  BBioMapIterator i, tmp;
346 
347  i = iomap.begin();
348  while (i != iomap.end()) {
349  BBioListIterator j = i->second.begin();
350  while (j != i->second.end()) {
351  if (j->first == observer) {
352  j = i->second.erase(j);
353  } else {
354  ++j;
355  }
356  }
357  if (i->second.empty()) {
358  tmp = i;
359  ++i;
360  iomap.erase(tmp);
361  } else {
362  ++i;
363  }
364  }
365 }
366 
367 /** Unregister BB interface observer.
368  * This will remove the given BlackBoard event listener from any event that it was
369  * previously registered for.
370  * @param observer BlackBoard event listener to remove
371  */
372 void
374 {
375  MutexLocker lock(bbio_mutex_);
376  if (bbio_events_ > 0) {
377  BBioQueueEntry e = std::make_pair((unsigned int)0, observer);
378  BBioQueue::iterator re;
379  while ((re = find_if(bbio_queue_.begin(),
380  bbio_queue_.end(),
381  bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
382  != bbio_queue_.end()) {
383  // if there is an entry in the register queue, remove it!
384  if (re->second == observer) {
385  bbio_queue_.erase(re);
386  }
387  }
388  bbio_queue_.push_back(std::make_pair(0, observer));
389 
390  } else {
391  remove_observer(bbio_created_, observer);
392  remove_observer(bbio_destroyed_, observer);
393  }
394 }
395 
396 /** Notify that an interface has been created.
397  * @param type type of the interface
398  * @param id ID of the interface
399  */
400 void
401 BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) throw()
402 {
403  bbio_mutex_->lock();
404  bbio_events_ += 1;
405  bbio_mutex_->unlock();
406 
407  BBioMapIterator lhmi;
408  BBioListIterator i, l;
409  for (lhmi = bbio_created_.begin(); lhmi != bbio_created_.end(); ++lhmi) {
410  if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
411  continue;
412 
413  BBioList &list = lhmi->second;
414  for (i = list.begin(); i != list.end(); ++i) {
415  BlackBoardInterfaceObserver *bbio = i->first;
416  for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
417  if (fnmatch(pi->c_str(), id, 0) == 0) {
418  bbio->bb_interface_created(type, id);
419  break;
420  }
421  }
422  }
423  }
424 
425  bbio_mutex_->lock();
426  bbio_events_ -= 1;
427  process_bbio_queue();
428  bbio_mutex_->unlock();
429 }
430 
431 /** Notify that an interface has been destroyed.
432  * @param type type of the interface
433  * @param id ID of the interface
434  */
435 void
436 BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) throw()
437 {
438  bbio_mutex_->lock();
439  bbio_events_ += 1;
440  bbio_mutex_->unlock();
441 
442  BBioMapIterator lhmi;
443  BBioListIterator i, l;
444  for (lhmi = bbio_destroyed_.begin(); lhmi != bbio_destroyed_.end(); ++lhmi) {
445  if (fnmatch(lhmi->first.c_str(), type, 0) != 0)
446  continue;
447 
448  BBioList &list = (*lhmi).second;
449  for (i = list.begin(); i != list.end(); ++i) {
450  BlackBoardInterfaceObserver *bbio = i->first;
451  for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
452  if (fnmatch(pi->c_str(), id, 0) == 0) {
453  bbio->bb_interface_destroyed(type, id);
454  break;
455  }
456  }
457  }
458  }
459 
460  bbio_mutex_->lock();
461  bbio_events_ -= 1;
462  process_bbio_queue();
463  bbio_mutex_->unlock();
464 }
465 
466 void
467 BlackBoardNotifier::process_bbio_queue()
468 {
469  if (!bbio_queue_.empty()) {
470  if (bbio_events_ > 0) {
471  return;
472  } else {
473  while (!bbio_queue_.empty()) {
474  BBioQueueEntry &e = bbio_queue_.front();
475  if (e.first) { // register
476  add_observer(e.second, e.second->bbio_get_observed_create(), bbio_created_);
477  add_observer(e.second, e.second->bbio_get_observed_destroy(), bbio_destroyed_);
478  } else { // unregister
479  remove_observer(bbio_created_, e.second);
480  remove_observer(bbio_destroyed_, e.second);
481  }
482  bbio_queue_.pop_front();
483  }
484  }
485  }
486 }
487 
488 /** Notify that writer has been added.
489  * @param interface the interface for which the event happened. It is not necessarily the
490  * instance which caused the event, but it must have the same mem serial.
491  * @param event_instance_serial the instance serial of the interface that caused the event
492  * @see BlackBoardInterfaceListener::bb_interface_writer_added()
493  */
494 void
496  unsigned int event_instance_serial) throw()
497 {
498  bbil_writer_mutex_->lock();
499  bbil_writer_events_ += 1;
500  bbil_writer_mutex_->unlock();
501 
502  const char * uid = interface->uid();
503  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
504  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
505  BlackBoardInterfaceListener *bbil = j->second;
506  if (!is_in_queue(/* remove op*/ false, bbil_writer_queue_, uid, bbil)) {
507  Interface *bbil_iface = bbil->bbil_writer_interface(uid);
508  if (bbil_iface != NULL) {
509  bbil->bb_interface_writer_added(bbil_iface, event_instance_serial);
510  } else {
511  LibLogger::log_warn("BlackBoardNotifier",
512  "BBIL[%s] registered for writer events "
513  "(open) for '%s' but has no such interface",
514  bbil->bbil_name(),
515  uid);
516  }
517  }
518  }
519 
520  bbil_writer_mutex_->lock();
521  bbil_writer_events_ -= 1;
522  process_writer_queue();
523  bbil_writer_mutex_->unlock();
524 }
525 
526 /** Notify that writer has been removed.
527  * @param interface interface for which the writer has been removed
528  * @param event_instance_serial instance serial of the interface that caused the event
529  * @see BlackBoardInterfaceListener::bb_interface_writer_removed()
530  */
531 void
533  unsigned int event_instance_serial) throw()
534 {
535  bbil_writer_mutex_->lock();
536  bbil_writer_events_ += 1;
537  bbil_writer_mutex_->unlock();
538 
539  const char * uid = interface->uid();
540  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_writer_.equal_range(uid);
541  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
542  BlackBoardInterfaceListener *bbil = j->second;
543  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
544  Interface *bbil_iface = bbil->bbil_writer_interface(uid);
545  if (bbil_iface != NULL) {
546  bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial);
547  } else {
548  LibLogger::log_warn("BlackBoardNotifier",
549  "BBIL[%s] registered for writer events "
550  "(close) for '%s' but has no such interface",
551  bbil->bbil_name(),
552  uid);
553  }
554  }
555  }
556 
557  bbil_writer_mutex_->lock();
558  bbil_writer_events_ -= 1;
559  process_writer_queue();
560  bbil_writer_mutex_->unlock();
561 }
562 
563 void
564 BlackBoardNotifier::process_writer_queue()
565 {
566  if (!bbil_writer_queue_.empty()) {
567  if (bbil_writer_events_ > 0) {
568  return;
569  } else {
570  while (!bbil_writer_queue_.empty()) {
571  BBilQueueEntry &e = bbil_writer_queue_.front();
572  if (e.op) { // register
573  add_listener(e.interface, e.listener, bbil_writer_);
574  } else { // unregister
575  remove_listener(e.interface, e.listener, bbil_writer_);
576  }
577  bbil_writer_queue_.pop_front();
578  }
579  }
580  }
581 }
582 
583 /** Notify that reader has been added.
584  * @param interface interface for which the reader has been added
585  * @param event_instance_serial instance serial of the interface that caused the event
586  * @see BlackBoardInterfaceListener::bb_interface_reader_added()
587  */
588 void
590  unsigned int event_instance_serial) throw()
591 {
592  bbil_reader_mutex_->lock();
593  bbil_reader_events_ += 1;
594  bbil_reader_mutex_->unlock();
595 
596  const char * uid = interface->uid();
597  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
598  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
599  BlackBoardInterfaceListener *bbil = j->second;
600  if (!is_in_queue(/* remove op*/ false, bbil_reader_queue_, uid, bbil)) {
601  Interface *bbil_iface = bbil->bbil_reader_interface(uid);
602  if (bbil_iface != NULL) {
603  bbil->bb_interface_reader_added(bbil_iface, event_instance_serial);
604  } else {
605  LibLogger::log_warn("BlackBoardNotifier",
606  "BBIL[%s] registered for reader events "
607  "(open) for '%s' but has no such interface",
608  bbil->bbil_name(),
609  uid);
610  }
611  }
612  }
613 
614  bbil_reader_mutex_->lock();
615  bbil_reader_events_ -= 1;
616  process_reader_queue();
617  bbil_reader_mutex_->unlock();
618 }
619 
620 /** Notify that reader has been removed.
621  * @param interface interface for which the reader has been removed
622  * @param event_instance_serial instance serial of the interface that caused the event
623  * @see BlackBoardInterfaceListener::bb_interface_reader_removed()
624  */
625 void
627  unsigned int event_instance_serial) throw()
628 {
629  bbil_reader_mutex_->lock();
630  bbil_reader_events_ += 1;
631  bbil_reader_mutex_->unlock();
632 
633  const char * uid = interface->uid();
634  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_reader_.equal_range(uid);
635  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
636  BlackBoardInterfaceListener *bbil = j->second;
637  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
638  Interface *bbil_iface = bbil->bbil_reader_interface(uid);
639  if (bbil_iface != NULL) {
640  bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial);
641  } else {
642  LibLogger::log_warn("BlackBoardNotifier",
643  "BBIL[%s] registered for reader events "
644  "(close) for '%s' but has no such interface",
645  bbil->bbil_name(),
646  uid);
647  }
648  }
649  }
650 
651  bbil_reader_mutex_->lock();
652  bbil_reader_events_ -= 1;
653  process_reader_queue();
654  bbil_reader_mutex_->unlock();
655 }
656 
657 void
658 BlackBoardNotifier::process_reader_queue()
659 {
660  if (!bbil_reader_queue_.empty()) {
661  if (bbil_reader_events_ > 0) {
662  return;
663  } else {
664  while (!bbil_reader_queue_.empty()) {
665  BBilQueueEntry &e = bbil_reader_queue_.front();
666  if (e.op) { // register
667  add_listener(e.interface, e.listener, bbil_reader_);
668  } else { // unregister
669  remove_listener(e.interface, e.listener, bbil_reader_);
670  }
671  bbil_reader_queue_.pop_front();
672  }
673  }
674  }
675 }
676 
677 /** Notify of data change.
678  * Notify all subscribers of the given interface of a data change.
679  * This also influences logging and sending data over the network so it is
680  * mandatory to call this function! The interface base class write method does
681  * that for you.
682  * @param interface interface whose subscribers to notify
683  * @see Interface::write()
684  * @see BlackBoardInterfaceListener::bb_interface_data_changed()
685  */
686 void
688 {
689  bbil_data_mutex_->lock();
690  bbil_data_events_ += 1;
691  bbil_data_mutex_->unlock();
692 
693  const char * uid = interface->uid();
694  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_data_.equal_range(uid);
695  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
696  BlackBoardInterfaceListener *bbil = j->second;
697  if (!is_in_queue(/* remove op*/ false, bbil_data_queue_, uid, bbil)) {
698  Interface *bbil_iface = bbil->bbil_data_interface(uid);
699  if (bbil_iface != NULL) {
700  bbil->bb_interface_data_changed(bbil_iface);
701  } else {
702  LibLogger::log_warn("BlackBoardNotifier",
703  "BBIL[%s] registered for data change events "
704  "for '%s' but has no such interface",
705  bbil->bbil_name(),
706  uid);
707  }
708  }
709  }
710 
711  bbil_data_mutex_->lock();
712  bbil_data_events_ -= 1;
713  if (!bbil_data_queue_.empty()) {
714  if (bbil_data_events_ == 0) {
715  while (!bbil_data_queue_.empty()) {
716  BBilQueueEntry &e = bbil_data_queue_.front();
717  if (e.op) { // register
718  add_listener(e.interface, e.listener, bbil_data_);
719  } else { // unregister
720  remove_listener(e.interface, e.listener, bbil_data_);
721  }
722  bbil_data_queue_.pop_front();
723  }
724  }
725  }
726  bbil_data_mutex_->unlock();
727 }
728 
729 /** Notify of message received
730  * Notify all subscribers of the given interface of an incoming message
731  * This also influences logging and sending data over the network so it is
732  * mandatory to call this function! The interface base class write method does
733  * that for you.
734  * @param interface interface whose subscribers to notify
735  * @param message message which is being received
736  * @return false if any listener returned false, true otherwise
737  * @see BlackBoardInterfaceListener::bb_interface_message_received()
738  */
739 bool
741 {
742  bbil_messages_mutex_->lock();
743  bbil_messages_events_ += 1;
744  bbil_messages_mutex_->unlock();
745 
746  bool enqueue = true;
747 
748  const char * uid = interface->uid();
749  std::pair<BBilMap::iterator, BBilMap::iterator> ret = bbil_messages_.equal_range(uid);
750  for (BBilMap::iterator j = ret.first; j != ret.second; ++j) {
751  BlackBoardInterfaceListener *bbil = j->second;
752  if (!is_in_queue(/* remove op*/ false, bbil_messages_queue_, uid, bbil)) {
753  Interface *bbil_iface = bbil->bbil_message_interface(uid);
754  if (bbil_iface != NULL) {
755  bool abort = !bbil->bb_interface_message_received(bbil_iface, message);
756  if (abort) {
757  enqueue = false;
758  break;
759  }
760  } else {
761  LibLogger::log_warn("BlackBoardNotifier",
762  "BBIL[%s] registered for message events "
763  "for '%s' but has no such interface",
764  bbil->bbil_name(),
765  uid);
766  }
767  }
768  }
769 
770  bbil_messages_mutex_->lock();
771  bbil_messages_events_ -= 1;
772  if (!bbil_messages_queue_.empty()) {
773  if (bbil_messages_events_ == 0) {
774  while (!bbil_messages_queue_.empty()) {
775  BBilQueueEntry &e = bbil_messages_queue_.front();
776  if (e.op) { // register
777  add_listener(e.interface, e.listener, bbil_messages_);
778  } else { // unregister
779  remove_listener(e.interface, e.listener, bbil_messages_);
780  }
781  bbil_messages_queue_.pop_front();
782  }
783  }
784  }
785  bbil_messages_mutex_->unlock();
786 
787  return enqueue;
788 }
789 
790 } // end namespace fawkes
BlackBoard interface listener.
Interface * bbil_writer_interface(const char *iuid)
Get interface instance for given UID.
@ MESSAGES
Message received event entry.
@ DATA
Data changed event entry.
Interface * bbil_reader_interface(const char *iuid)
Get interface instance for given UID.
virtual void bb_interface_writer_removed(Interface *interface, unsigned int instance_serial)
A writing instance has been closed for a watched interface.
virtual void bb_interface_reader_removed(Interface *interface, unsigned int instance_serial)
A reading instance has been closed for a watched interface.
Interface * bbil_data_interface(const char *iuid)
Get interface instance for given UID.
virtual void bb_interface_reader_added(Interface *interface, unsigned int instance_serial)
A reading instance has been opened for a watched interface.
virtual bool bb_interface_message_received(Interface *interface, Message *message)
BlackBoard message received notification.
const char * bbil_name() const
Get BBIL name.
std::list< QueueEntry > InterfaceQueue
Queue of additions/removal of interfaces.
virtual void bb_interface_writer_added(Interface *interface, unsigned int instance_serial)
A writing instance has been opened for a watched interface.
Interface * bbil_message_interface(const char *iuid)
Get interface instance for given UID.
virtual void bb_interface_data_changed(Interface *interface)
BlackBoard data changed notification.
BlackBoard interface observer.
ObservedInterfaceLockMap * bbio_get_observed_create()
Get interface creation type watch list.
ObservedInterfaceLockMap::iterator ObservedInterfaceLockMapIterator
Type for iterator of lockable interface type hash sets.
ObservedInterfaceLockMap * bbio_get_observed_destroy()
Get interface destruction type watch list.
virtual void bb_interface_created(const char *type, const char *id)
BlackBoard interface created notification.
virtual void bb_interface_destroyed(const char *type, const char *id)
BlackBoard interface destroyed notification.
BlackBoardNotifier()
Constructor.
Definition: notifier.cpp:52
void notify_of_reader_added(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been added.
Definition: notifier.cpp:589
void notify_of_data_change(const Interface *interface)
Notify of data change.
Definition: notifier.cpp:687
void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: notifier.cpp:198
void notify_of_writer_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been removed.
Definition: notifier.cpp:532
void notify_of_writer_added(const Interface *interface, unsigned int event_instance_serial)
Notify that writer has been added.
Definition: notifier.cpp:495
void notify_of_interface_destroyed(const char *type, const char *id)
Notify that an interface has been destroyed.
Definition: notifier.cpp:436
virtual ~BlackBoardNotifier()
Destructor.
Definition: notifier.cpp:71
void notify_of_interface_created(const char *type, const char *id)
Notify that an interface has been created.
Definition: notifier.cpp:401
void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: notifier.cpp:373
void register_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Register BB event listener.
Definition: notifier.cpp:87
void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: notifier.cpp:313
void notify_of_reader_removed(const Interface *interface, unsigned int event_instance_serial)
Notify that reader has been removed.
Definition: notifier.cpp:626
void update_listener(BlackBoardInterfaceListener *listener, BlackBoard::ListenerRegisterFlag flag)
Update BB event listener.
Definition: notifier.cpp:99
bool notify_of_message_received(const Interface *interface, Message *message)
Notify of message received. Notify all subscribers of the given interface of an incoming message. This ...
Definition: notifier.cpp:740
ListenerRegisterFlag
Flags to constrain listener registration/updates.
Definition: blackboard.h:87
@ BBIL_FLAG_READER
consider reader events
Definition: blackboard.h:90
@ BBIL_FLAG_DATA
consider data events
Definition: blackboard.h:88
@ BBIL_FLAG_WRITER
consider writer events
Definition: blackboard.h:91
@ BBIL_FLAG_MESSAGES
consider message received events
Definition: blackboard.h:89
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:79
const char * uid() const
Get unique identifier of interface.
Definition: interface.cpp:677
static void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: liblogger.cpp:156
void lock() const
Lock list.
Definition: lock_map.h:91
void unlock() const
Unlock list.
Definition: lock_map.h:109
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:45
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
Fawkes library namespace.
Structure to hold maps for active subscriptions.
InterfaceMap writer
Writer event subscriptions.
InterfaceMap messages
Message received event subscriptions.
InterfaceMap data
Data event subscriptions.
InterfaceMap reader
Reader event subscriptions.