22 #include <gtest/gtest.h>
26 # include <pthread_np.h>
28 #include <core/threading/barrier.h>
29 #include <core/threading/mutex.h>
30 #include <core/threading/mutex_locker.h>
31 #include <core/threading/wait_condition.h>
32 #include <core/utils/refptr.h>
33 #include <libs/syncpoint/exceptions.h>
34 #include <libs/syncpoint/syncpoint.h>
35 #include <libs/syncpoint/syncpoint_manager.h>
36 #include <logging/cache.h>
37 #include <logging/multi.h>
103 logger_->add_logger(cache_logger_);
106 pthread_attr_init(&attrs);
114 pthread_attr_destroy(&attrs);
148 ASSERT_TRUE(*sp1 != NULL);
156 ASSERT_NE(*sp1, *sp2);
158 ASSERT_EQ(**sp1, **sp2);
163 ASSERT_LT(**sp1, **sp3);
164 ASSERT_FALSE(**sp3 < **sp1);
165 ASSERT_FALSE(**sp1 < **sp2);
166 ASSERT_FALSE(**sp2 < **sp1);
173 pair<set<RefPtr<SyncPoint>>::iterator,
bool> ret;
176 ret = sp_set.insert(sp1);
177 ASSERT_TRUE(ret.second);
178 ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
181 ret = sp_set.insert(sp3);
182 ASSERT_TRUE(ret.second);
183 ASSERT_EQ(sp3->get_identifier(), (*(ret.first))->get_identifier());
186 ret = sp_set.insert(sp1);
187 ASSERT_FALSE(ret.second);
188 ASSERT_EQ(sp1->get_identifier(), (*(ret.first))->get_identifier());
191 ret = sp_set.insert(sp2);
192 ASSERT_FALSE(ret.second);
193 ASSERT_EQ(sp2->get_identifier(), (*(ret.first))->get_identifier());
198 ASSERT_EQ(0u, manager->get_syncpoints().size());
199 manager->get_syncpoint(
"test",
"/test/1");
200 ASSERT_EQ(3u, manager->get_syncpoints().size());
203 manager->get_syncpoint(
"test2",
"/test/2");
204 ASSERT_EQ(4u, manager->get_syncpoints().size());
209 manager->get_syncpoint(
"test3",
"/test/1");
210 ASSERT_EQ(4u, manager->get_syncpoints().size());
222 ASSERT_NO_THROW(manager->get_syncpoint(
"component 1",
"/test"));
223 ASSERT_NO_THROW(manager->get_syncpoint(
"component 2",
"/test"));
224 ASSERT_NO_THROW(manager->get_syncpoint(
"component 3",
"/test"));
233 string comp =
"component";
234 string id =
"/test/sp1";
238 for (set<
RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
240 EXPECT_EQ(1, (*sp_it)->get_watchers().count(comp))
241 <<
"for component '" << comp <<
"' and SyncPoint '" << (*sp_it)->get_identifier() <<
"'";
243 manager->release_syncpoint(comp, sp);
244 for (set<
RefPtr<SyncPoint>>::const_iterator sp_it = syncpoints.begin(); sp_it != syncpoints.end();
246 EXPECT_EQ(0, (*sp_it)->get_watchers().count(comp))
247 <<
"for component '" << comp <<
"' and SyncPoint '" << (*sp_it)->get_identifier() <<
"'";
249 ASSERT_NO_THROW(manager->get_syncpoint(comp,
id));
260 EXPECT_NO_THROW(sp1 =
new SyncPoint(
"/", NULL));
267 ASSERT_THROW(invalid_sp = manager->get_syncpoint(
"",
"/test/sp1"),
271 ASSERT_THROW(invalid_sp = manager->get_syncpoint(
"waiter",
""),
273 ASSERT_THROW(invalid_sp = manager->get_syncpoint(
"waiter",
"invalid"),
279 string comp =
"component1";
280 string id =
"/test/sp1";
283 set<RefPtr<SyncPoint>>::iterator sp_test_it =
285 set<RefPtr<SyncPoint>>::iterator sp_root_it =
287 ASSERT_NE(syncpoints.end(), sp_test_it);
288 ASSERT_NE(syncpoints.end(), sp_root_it);
291 EXPECT_EQ(1, syncpoints.count(sp_test));
292 EXPECT_EQ(1, syncpoints.count(sp_root));
300 manager->release_syncpoint(comp, sp);
306 string comp =
"component1";
307 string sp1_id =
"/test/sp1";
308 string sp2_id =
"/test/sp2";
316 << comp <<
" is not registered for " << sp1->
get_identifier() <<
", but should be!";
318 << comp <<
" is not registered for " << sp2->
get_identifier() <<
", but should be!";
320 << comp <<
" is not registered for " << predecessor->
get_identifier() <<
", but should be!";
322 manager->release_syncpoint(comp, sp1);
325 << comp <<
" is not registered for " << predecessor->
get_identifier() <<
", but should be!";
328 enum ThreadStatus { PENDING, RUNNING, FINISHED };
344 string component =
"";
346 uint timeout_sec = 0;
348 uint timeout_nsec = 0;
368 const int wait_time_us = 1000;
369 for (uint i = 0; i < (sec * pow(10, 9) + nanosec) / (wait_time_us * pow(10, 3)); i++) {
373 usleep(wait_time_us);
383 if (params->
status == FINISHED) {
392 start_waiter_thread(
void *data)
407 params->
status = FINISHED;
423 pthread_create(&thread1, &attrs, start_waiter_thread, &params);
424 wait_for_running(&params);
426 pthread_cancel(thread1);
427 pthread_join(thread1, NULL);
436 uint num_threads = 50;
437 pthread_t threads[num_threads];
439 string sp_identifier =
"/test/sp1";
440 for (uint i = 0; i < num_threads; i++) {
442 params[i]->
component =
"component " + to_string(i);
447 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
449 ASSERT_LE(manager->get_syncpoints().size(), 3u);
452 for (uint i = 0; i < num_threads; i++) {
453 pthread_join(threads[i], NULL);
464 uint num_threads = 50;
465 uint num_wait_calls = 10;
466 pthread_t threads[num_threads];
468 string sp_identifier =
"/test/sp1";
469 for (uint i = 0; i < num_threads; i++) {
471 params[i]->
component =
"component " + to_string(i);
476 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
478 ASSERT_LE(manager->get_syncpoints().size(), 3u);
481 for (uint i = 0; i < num_threads; i++) {
482 EXPECT_TRUE(wait_for_running(params[i]));
484 for (uint i = 0; i < num_threads; i++) {
485 pthread_cancel(threads[i]);
486 ASSERT_EQ(0, pthread_join(threads[i], NULL));
496 uint num_threads = 10;
497 uint num_wait_calls = 5;
498 pthread_t threads[num_threads];
500 string sp_identifier =
"/test/sp1";
501 for (uint i = 0; i < num_threads; i++) {
503 params[i]->
component =
"component " + to_string(i);
508 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
512 for (uint i = 0; i < num_threads; i++) {
513 EXPECT_TRUE(wait_for_running(params[i]));
516 string component =
"emitter";
519 for (uint i = 0; i < num_wait_calls; i++) {
524 for (uint i = 0; i < num_threads; i++) {
525 ASSERT_TRUE(wait_for_finished(params[i]));
526 pthread_join(threads[i], NULL);
536 uint num_threads = 50;
537 pthread_t threads[num_threads];
539 for (uint i = 0; i < num_threads; i++) {
541 params[i]->
component =
"component " + to_string(i);
546 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
549 for (uint i = 0; i < num_threads; i++) {
550 EXPECT_TRUE(wait_for_running(params[i]));
553 for (uint i = 0; i < num_threads; i++) {
554 EXPECT_EQ(RUNNING, params[i]->status);
555 pthread_cancel(threads[i]);
556 ASSERT_EQ(0, pthread_join(threads[i], NULL));
569 vector<string> identifiers = {
"/test/topic",
"/test",
"/",
"/other/topic"};
570 uint num_threads = identifiers.size();
571 pthread_t threads[num_threads];
573 for (uint i = 0; i < num_threads; i++) {
575 params[i]->
component =
"component " + to_string(i);
580 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
583 for (uint i = 0; i < num_threads; i++) {
584 EXPECT_TRUE(wait_for_running(params[i]));
591 for (uint i = 0; i < num_threads - 1; i++) {
592 ASSERT_TRUE(wait_for_finished(params[i]));
593 pthread_join(threads[i], NULL);
598 pthread_t last_thread = threads[num_threads - 1];
599 EXPECT_FALSE(wait_for_finished(params[num_threads - 1], 0, pow(10, 6)));
600 pthread_cancel(last_thread);
601 ASSERT_EQ(0, pthread_join(last_thread, NULL));
607 string component =
"emitter";
616 string component =
"emitter";
624 start_barrier_emitter_thread(
void *data)
627 string component =
"emitter " + to_string(params->
thread_nr);
647 : identifier_(identifier), manager_(manager)
656 barrier_->unregister_emitter(identifier_);
657 manager_->release_syncpoint(identifier_, barrier_);
664 barrier_->emit(identifier_);
676 string barrier_id =
"/test/barrier";
678 const uint num_waiter_threads = 1;
679 const uint num_wait_calls = 1;
680 pthread_t waiter_threads[num_waiter_threads];
682 for (uint i = 0; i < num_waiter_threads; i++) {
684 params[i]->
type = SyncPoint::WAIT_FOR_ALL;
685 params[i]->
component =
"component " + to_string(i);
690 pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
692 for (uint i = 0; i < num_waiter_threads; i++) {
693 ASSERT_TRUE(wait_for_finished(params[i]));
694 pthread_join(waiter_threads[i], NULL);
710 string barrier_id =
"/test/barrier";
711 Emitter em1(
"emitter 1", barrier_id, manager);
712 Emitter em2(
"emitter 2", barrier_id, manager);
716 const uint num_waiter_threads = 50;
717 const uint num_wait_calls = 1;
718 pthread_t waiter_threads[num_waiter_threads];
720 for (uint i = 0; i < num_waiter_threads; i++) {
722 params[i]->
component =
"component " + to_string(i);
723 params[i]->
type = SyncPoint::WAIT_FOR_ALL;
728 pthread_create(&waiter_threads[i], &attrs, start_waiter_thread, params[i]);
731 for (uint i = 0; i < num_waiter_threads; i++) {
732 EXPECT_TRUE(wait_for_running(params[i]));
737 for (uint i = 0; i < num_waiter_threads; i++) {
738 EXPECT_EQ(RUNNING, params[i]->status);
744 for (uint i = 0; i < num_waiter_threads; i++) {
745 ASSERT_TRUE(wait_for_finished(params[i]));
746 pthread_join(waiter_threads[i], NULL);
756 string barrier1_id =
"/test/barrier1";
757 string barrier2_id =
"/test/barrier2";
758 Emitter em1(
"em1", barrier1_id, manager);
759 Emitter em2(
"em2", barrier2_id, manager);
765 const uint num_waiter_threads = 50;
766 const uint num_wait_calls = 1;
767 pthread_t waiter_threads1[num_waiter_threads];
769 for (uint i = 0; i < num_waiter_threads; i++) {
771 params1[i]->
component =
"component " + to_string(i);
772 params1[i]->
type = SyncPoint::WAIT_FOR_ALL;
777 pthread_create(&waiter_threads1[i], &attrs, start_waiter_thread, params1[i]);
780 pthread_t waiter_threads2[num_waiter_threads];
782 for (uint i = 0; i < num_waiter_threads; i++) {
784 params2[i]->
component =
"component " + to_string(i);
785 params2[i]->
type = SyncPoint::WAIT_FOR_ALL;
787 params2[i]->
thread_nr = num_waiter_threads + i;
790 pthread_create(&waiter_threads2[i], &attrs, start_waiter_thread, params2[i]);
793 for (uint i = 0; i < num_waiter_threads; i++) {
794 EXPECT_TRUE(wait_for_running(params1[i]));
797 for (uint i = 0; i < num_waiter_threads; i++) {
798 EXPECT_TRUE(wait_for_running(params2[i]));
803 for (uint i = 0; i < num_waiter_threads; i++) {
804 ASSERT_TRUE(wait_for_finished(params1[i]));
805 pthread_join(waiter_threads1[i], NULL);
809 for (uint i = 0; i < num_waiter_threads; i++) {
810 EXPECT_EQ(RUNNING, params2[i]->status);
815 for (uint i = 0; i < num_waiter_threads; i++) {
816 ASSERT_TRUE(wait_for_finished(params2[i]));
817 pthread_join(waiter_threads2[i], NULL);
830 Emitter em1(
"emitter 1",
"/test/topic/b1", manager);
831 Emitter em2(
"emitter 2",
"/test/topic/b2", manager);
832 Emitter em3(
"emitter 3",
"/other/topic", manager);
834 vector<string> identifiers = {
"/test/topic",
"/test",
"/",
"/other/topic"};
835 uint num_threads = identifiers.size();
836 pthread_t threads[num_threads];
839 for (uint i = 0; i < num_threads; i++) {
841 params[i]->
component =
"component " + to_string(i);
842 params[i]->
type = SyncPoint::WAIT_FOR_ALL;
848 pthread_create(&threads[i], &attrs, start_waiter_thread, params[i]);
854 for (uint i = 0; i < num_threads; i++) {
855 EXPECT_TRUE(wait_for_running(params[i]));
859 for (uint i = 0; i < num_threads; i++) {
860 ASSERT_EQ(RUNNING, params[i]->status);
864 for (uint i = 0; i < num_threads - 2; i++) {
865 ASSERT_TRUE(wait_for_finished(params[i]));
866 pthread_join(threads[i], NULL);
870 for (uint i = num_threads - 2; i < num_threads; i++) {
871 EXPECT_EQ(RUNNING, params[i]->status);
872 pthread_cancel(threads[i]);
873 ASSERT_EQ(0, pthread_join(threads[i], NULL));
887 string id_sp1 =
"/test/sp1";
888 string id_sp2 =
"/test/sp2";
889 string id_sp_pred =
"/test";
890 string id_emitter =
"component_emitter";
891 string id_waiter1 =
"component_waiter1";
892 string id_waiter2 =
"component_waiter2";
893 string id_waiter3 =
"component_waiter_on_predecessor";
897 manager->get_syncpoint(id_waiter1, id_sp1);
898 manager->get_syncpoint(id_waiter2, id_sp2);
910 params1->
type = SyncPoint::WAIT_FOR_ALL;
917 params2->
type = SyncPoint::WAIT_FOR_ALL;
924 params3->
type = SyncPoint::WAIT_FOR_ALL;
929 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
931 pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
933 pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
934 EXPECT_TRUE(wait_for_running(params1));
935 EXPECT_TRUE(wait_for_running(params2));
936 EXPECT_TRUE(wait_for_running(params3));
938 sp1->
emit(id_emitter);
940 ASSERT_TRUE(wait_for_finished(params1));
941 ASSERT_FALSE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
944 ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
945 sp2->
emit(id_emitter);
946 ASSERT_TRUE(wait_for_finished(params2, 0, 10 * pow(10, 6)));
947 ASSERT_TRUE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
949 pthread_join(pthread1, NULL);
950 pthread_join(pthread2, NULL);
951 pthread_join(pthread3, NULL);
958 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
959 pthread_create(&pthread2, &attrs, start_waiter_thread, params2);
960 pthread_create(&pthread3, &attrs, start_waiter_thread, params3);
962 ASSERT_TRUE(wait_for_running(params1));
963 ASSERT_TRUE(wait_for_running(params3));
965 ASSERT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
966 ASSERT_TRUE(wait_for_finished(params2));
967 ASSERT_FALSE(wait_for_finished(params3, 0, 10 * pow(10, 6)));
969 sp1->
emit(id_emitter);
970 ASSERT_TRUE(wait_for_finished(params1));
971 ASSERT_TRUE(wait_for_finished(params3));
972 pthread_join(pthread1, NULL);
973 pthread_join(pthread2, NULL);
974 pthread_join(pthread3, NULL);
985 Emitter em1(
"em1",
"/barrier", manager);
987 Emitter em2(
"em2",
"/barrier", manager);
988 EXPECT_NO_THROW(em1.emit());
989 EXPECT_NO_THROW(em1.emit());
1008 params1->
type = SyncPoint::WAIT_FOR_ALL;
1013 pthread_create(&pthread1, &attrs, start_waiter_thread, params1);
1015 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1017 sp1->
emit(
"emitter");
1019 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1021 sp1->
emit(
"emitter");
1022 EXPECT_FALSE(wait_for_finished(params1, 0, 10 * pow(10, 6)));
1024 sp2->
emit(
"emitter");
1025 ASSERT_TRUE(wait_for_finished(params1));
1026 pthread_join(pthread1, NULL);
1035 manager->get_syncpoint(
"waiter",
"/test/sp1");
1040 params.
type = SyncPoint::WAIT_FOR_ALL;
1046 pthread_create(&thread, NULL, start_waiter_thread, &params);
1047 ASSERT_TRUE(wait_for_finished(&params));
1049 ASSERT_GT(cache_logger_->get_messages().size(), 0);
1053 struct emitter_thread_data
1057 std::string sp_name;
1058 atomic<ThreadStatus> status;
1059 Mutex mutex_running;
1061 Mutex mutex_finished;
1068 call_emit(
void *data)
1070 emitter_thread_data *tdata = (emitter_thread_data *)data;
1071 tdata->status = RUNNING;
1072 tdata->mutex_running.lock();
1073 tdata->cond_running.wake_all();
1074 tdata->mutex_running.unlock();
1075 RefPtr<SyncPoint> sp = tdata->manager->get_syncpoint(tdata->name, tdata->sp_name);
1077 sp->
emit(tdata->name);
1078 tdata->status = FINISHED;
1079 tdata->mutex_finished.lock();
1080 tdata->cond_finished.wake_all();
1081 tdata->mutex_finished.unlock();
1092 emitter_thread_data *emitter_params =
new emitter_thread_data();
1093 emitter_params->manager = manager;
1094 emitter_params->name =
"emitter";
1095 emitter_params->sp_name =
"/test";
1096 pthread_create(&thread, NULL, call_emit, (
void *)emitter_params);
1098 emitter_params->mutex_running.lock();
1099 if (emitter_params->status != RUNNING) {
1100 ASSERT_TRUE(emitter_params->cond_running.reltimed_wait(1, 0));
1102 emitter_params->mutex_running.unlock();
1103 emitter_params->mutex_finished.lock();
1104 EXPECT_FALSE(emitter_params->cond_finished.reltimed_wait(0, 100000));
1105 emitter_params->mutex_finished.unlock();
1107 pthread_t waiter_thread;
1109 waiter_params.
manager = manager;
1113 pthread_create(&waiter_thread, NULL, start_waiter_thread, &waiter_params);
1115 emitter_params->mutex_finished.lock();
1116 ASSERT_TRUE(emitter_params->status == FINISHED
1117 || emitter_params->cond_finished.reltimed_wait(1, 0));
1118 emitter_params->mutex_finished.unlock();
1119 pthread_join(thread, NULL);
1120 pthread_join(waiter_thread, NULL);
1121 delete emitter_params;
1126 call_wait_for_all(
void *data)
1142 uint num_emitters = 100;
1143 pthread_t emitter_thread[num_emitters];
1144 emitter_thread_data *params[num_emitters];
1145 for (uint i = 0; i < num_emitters; i++) {
1146 params[i] =
new emitter_thread_data();
1147 params[i]->manager = manager;
1148 string emitter_name =
"emitter" + to_string(i);
1149 params[i]->name = emitter_name;
1150 params[i]->sp_name =
"/test";
1151 pthread_create(&emitter_thread[i], NULL, call_emit, (
void *)params[i]);
1154 for (uint i = 0; i < num_emitters; i++) {
1155 params[i]->mutex_running.lock();
1156 if (params[i]->status != RUNNING) {
1157 ASSERT_TRUE(params[i]->cond_running.reltimed_wait(1, 0));
1159 params[i]->mutex_running.unlock();
1162 pthread_t waiter_thread;
1170 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &
thread_params);
1172 for (uint i = 0; i < num_emitters; i++) {
1173 params[i]->mutex_finished.lock();
1174 ASSERT_TRUE(params[i]->status == FINISHED || params[i]->cond_finished.reltimed_wait(1, 0));
1175 params[i]->mutex_finished.unlock();
1176 pthread_join(emitter_thread[i], NULL);
1181 pthread_join(waiter_thread, NULL);
1195 string sp_identifier =
"/test";
1197 manager->get_syncpoint(
"emitter2", sp_identifier);
1200 uint num_threads = 2;
1201 pthread_t threads[num_threads];
1203 for (uint i = 0; i < num_threads; i++) {
1204 params[i].
component =
"component " + to_string(i);
1206 params[i].
type = SyncPoint::WAIT_FOR_ALL;
1211 pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1212 ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1213 sp->
emit(
"emitter1");
1214 ASSERT_FALSE(wait_for_finished(&params[0], 0, 10 * pow(10, 6)));
1215 pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1216 for (uint i = 0; i < num_threads; i++) {
1217 ASSERT_FALSE(wait_for_finished(&params[i], 0, 10 * pow(10, 6)));
1219 sp->
emit(
"emitter2");
1220 for (uint i = 0; i < num_threads; i++) {
1221 ASSERT_TRUE(wait_for_finished(&params[i]));
1222 pthread_join(threads[i], NULL);
1234 uint num_threads = 2;
1235 pthread_t threads[num_threads];
1236 string sp_identifier =
"/test";
1238 for (uint i = 0; i < num_threads; i++) {
1239 params[i].
component =
"component " + to_string(i);
1240 params[i].
type = SyncPoint::WAIT_FOR_ALL;
1248 pthread_create(&threads[0], &attrs, start_waiter_thread, &params[0]);
1249 EXPECT_TRUE(wait_for_running(&params[0]));
1252 pthread_create(&threads[1], &attrs, start_waiter_thread, &params[1]);
1253 for (uint i = 0; i < num_threads; i++) {
1254 EXPECT_TRUE(wait_for_running(&params[i]));
1256 wait_for_finished(&params[0], params[0].timeout_sec, params[0].timeout_nsec);
1257 wait_for_finished(&params[1], 0, pow(10, 6));
1258 for (uint i = 0; i < num_threads; i++) {
1259 pthread_join(threads[i], NULL);
1273 string sp_identifier =
"/test";
1274 uint num_threads = 2;
1276 pthread_t wait_for_one_thread;
1278 wait_for_one_params.
component =
"wait_for_one";
1279 wait_for_one_params.
type = SyncPoint::WAIT_FOR_ONE;
1280 wait_for_one_params.
manager = manager;
1285 wait_for_one_params.
status = PENDING;
1288 pthread_create(&wait_for_one_thread, &attrs, start_waiter_thread, &wait_for_one_params);
1289 pthread_t threads[num_threads];
1291 for (uint i = 0; i < num_threads; i++) {
1292 params[i].
component =
"component " + to_string(i);
1293 params[i].
type = SyncPoint::WAIT_FOR_ALL;
1301 pthread_create(&threads[i], &attrs, start_waiter_thread, &params[i]);
1304 EXPECT_TRUE(wait_for_running(&wait_for_one_params));
1305 for (uint i = 0; i < num_threads; i++) {
1306 EXPECT_TRUE(wait_for_running(&params[i]));
1308 EXPECT_TRUE(wait_for_finished(&wait_for_one_params));
1309 for (uint i = 0; i < num_threads; i++) {
1310 EXPECT_EQ(RUNNING, params[i].status);
1312 for (uint i = 0; i < num_threads; i++) {
1313 EXPECT_TRUE(wait_for_finished(&params[i], params[i].timeout_sec, params[i].timeout_nsec));
1314 pthread_join(threads[i], NULL);
1316 pthread_join(wait_for_one_thread, NULL);
1322 pthread_t waiter_thread;
1330 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &
thread_params);
1332 pthread_join(waiter_thread, NULL);
1355 pthread_t waiter_thread;
1364 pthread_create(&waiter_thread, &attrs, start_waiter_thread, &
thread_params);
1367 pthread_cancel(waiter_thread);
1368 pthread_join(waiter_thread, NULL);
1370 manager->release_syncpoint(
"component 1", sp);
1371 sp = manager->get_syncpoint(
"component 1",
"/test");
Helper class which registers and emits a given SyncBarrier.
virtual ~Emitter()
Destructor.
Emitter(string identifier, string syncbarrier, RefPtr< SyncPointManager > manager)
Constructor.
void emit()
emit the SyncBarrier
SyncBarrierTest()
Constructor.
Test class for SyncPointManager. This class tests basic functionality of the SyncPointManager.
SyncPointManagerTest()
Initialize the test class.
virtual ~SyncPointManagerTest()
Deinitialize the test class.
RefPtr< SyncPointManager > manager
A Pointer to a SyncPointManager.
pthread_attr_t attrs
Thread attributes.
CacheLogger * cache_logger_
Cache Logger used for testing.
MultiLogger * logger_
Logger used to initialize SyncPoints.
Test class for SyncPoint. This class tests basic functionality of SyncPoints.
MultiLogger * logger_
Logger for testing.
virtual void SetUp()
Initialize the test class.
RefPtr< SyncPoint > sp1
A syncpoint for testing.
RefPtr< SyncPoint > sp3
A syncpoint for testing.
RefPtr< SyncPoint > sp2
A syncpoint for testing.
virtual void TearDown()
Clean up.
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
virtual void wait()
Wait for other threads.
Log through multiple loggers.
Mutex mutual exclusion lock.
void lock()
Lock this mutex.
void unlock()
Unlock the mutex.
The component called release but is still registered as emitter.
Invalid component name used (i.e.
Invalid identifier used (i.e.
This class gives access to SyncPoints.
RefPtr< SyncPoint > get_syncpoint(const std::string &component, const std::string &identifier)
Get a SyncPoint.
A component called wait() but is already waiting.
Emit was called on a SyncBarrier but the calling component is not registered as emitter.
Compare sets of syncpoints.
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
std::multiset< std::string > get_emitters() const
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
std::set< std::string > get_watchers() const
virtual void register_emitter(const std::string &component)
register as emitter
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
virtual void emit(const std::string &component)
send a signal to all waiting threads
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
std::string get_identifier() const
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
Wait until a given condition holds.
void wake_all()
Wake up all waiting threads.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Fawkes library namespace.
The parameters passed to the threads.
struct used for multithreading tests
uint num_wait_calls
Number of wait calls the thread should make.
Mutex mutex_finished
Mutex to protect cond_finished.
Barrier * start_barrier
Barrier for startup synchronization.
WaitCondition cond_finished
WaitCondition to indicate that the thread has finished.
RefPtr< SyncPointManager > manager
SyncPointManager passed to the thread.
WaitCondition cond_running
WaitCondition to indicate that the thread is running.
uint timeout_nsec
timeout in nsec
SyncPoint::WakeupType type
Wait type.
uint timeout_sec
timeout in sec
uint thread_nr
Thread number.
string component
Name of the component.
atomic< ThreadStatus > status
current status of the thread
string sp_identifier
Name of the SyncPoint.
Mutex mutex_running
Mutex to protect cond_running.