00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include "naming/EndpointID.h"
00022 #include "storage/BundleStore.h"
00023 #include "bundling/BundleTimestamp.h"
00024 #include "bundling/BundleDaemon.h"
00025 #include "ProphetTimer.h"
00026 #include <oasys/util/Random.h>
00027
00028 #include "BundleRouter.h"
00029 #include "ProphetBundleCore.h"
00030
// Debug-level logging shorthand: forwards a printf-style format string and
// its arguments to print_log() under the "core" sub-path.
//
// Uses the standard variadic form (__VA_ARGS__) and deliberately carries no
// trailing semicolon, so that `LOG(...);` expands to exactly one statement
// and remains safe inside an unbraced if/else.
#define LOG(...) print_log("core", LOG_DEBUG, __VA_ARGS__)

// Value returned by max_bundle_quota() when the core runs in test mode.
#define TEST_MODE_QUOTA 0xffffffff
00034
00035 namespace dtn
00036 {
00037
00038 ProphetBundleCore::ProphetBundleCore(const std::string& local_eid,
00039 BundleActions* actions,
00040 oasys::SpinLock* lock)
00041 : oasys::Logger("ProphetBundleCore","/dtn/route/prophet"),
00042 actions_(actions),
00043 bundles_(this),
00044 local_eid_(local_eid),
00045 lock_(lock),
00046 test_mode_(false)
00047 {
00048 ASSERT(actions_ != NULL);
00049 oasys::Random::seed(time(0));
00050 }
00051
00052 ProphetBundleCore::ProphetBundleCore(oasys::Builder)
00053 : oasys::Logger("ProphetBundleCore","/dtn/route/prophet/test"),
00054 actions_(NULL),
00055 bundles_(this),
00056 lock_(NULL)
00057 {
00058 test_mode_ = true;
00059 }
00060
00061 void
00062 ProphetBundleCore::load_dtn_bundles(const BundleList* bundles)
00063 {
00064 ASSERT(bundles != NULL);
00065 oasys::ScopeLock l(bundles->lock(),"ProphetBundleCore::constructor");
00066 for (BundleList::iterator i = bundles->begin();
00067 i != bundles->end(); i++)
00068 {
00069 BundleRef ref("ProphetBundleCore");
00070 ref = *i;
00071 add(ref);
00072 }
00073 LOG("added %zu bundles to core",bundles->size());
00074 }
00075
00076 ProphetBundleCore::~ProphetBundleCore()
00077 {
00078 }
00079
00080 bool
00081 ProphetBundleCore::should_fwd(const prophet::Bundle* b,
00082 const prophet::Link* link) const
00083 {
00084
00085 BundleRef bundle("should_fwd");
00086 bundle = bundles_.find_ref(b).object();
00087 LinkRef nexthop("should_fwd");
00088 nexthop = links_.find_ref(link).object();
00089 ProphetBundleCore* me = const_cast<ProphetBundleCore*>(this);
00090
00091 if (bundle.object() != NULL && nexthop.object() != NULL)
00092 {
00093 if (test_mode_) return true;
00094 BundleRouter* router = BundleDaemon::instance()->router();
00095 bool ok = false;
00096 if (router != NULL)
00097 {
00098 ok = router->should_fwd(bundle.object(),nexthop);
00099 if (me != NULL)
00100 me->print_log("core",LOG_DEBUG,
00101 "BundleRouter says%sok to fwd *%p",
00102 ok ? " " : " not ", bundle.object());
00103 }
00104 return ok;
00105 }
00106 else
00107 {
00108 if (me != NULL)
00109 me->print_log("core",LOG_DEBUG,
00110 "failed to convert prophet handle to DTN object");
00111 }
00112 return false;
00113 }
00114
00115 bool
00116 ProphetBundleCore::is_route(const std::string& dest_id,
00117 const std::string& route) const
00118 {
00119 EndpointIDPattern routeid(get_route_pattern(route));
00120 return routeid.match(EndpointID(dest_id));
00121 }
00122
00123 std::string
00124 ProphetBundleCore::get_route_pattern(const std::string& dest_id) const
00125 {
00126 std::string routeid = get_route(dest_id);
00127 EndpointIDPattern route(routeid);
00128 if (!route.append_service_wildcard())
00129 return "";
00130 return route.str();
00131 }
00132
00133 std::string
00134 ProphetBundleCore::get_route(const std::string& dest_id) const
00135 {
00136 EndpointID eid(dest_id);
00137
00138 if (!eid.remove_service_tag())
00139 return "";
00140 return eid.str();
00141 }
00142
00143 u_int64_t
00144 ProphetBundleCore::max_bundle_quota() const
00145 {
00146 if (test_mode_) return TEST_MODE_QUOTA;
00147 return BundleStore::instance()->payload_quota();
00148 }
00149
00150 bool
00151 ProphetBundleCore::custody_accepted() const
00152 {
00153 if (test_mode_) return false;
00154 return BundleDaemon::params_.accept_custody_;
00155 }
00156
00157 void
00158 ProphetBundleCore::drop_bundle(const prophet::Bundle* bundle)
00159 {
00160 if (bundle == NULL)
00161 {
00162 LOG("drop_bundle(NULL)");
00163 return;
00164 }
00165 else
00166 LOG("drop_bundle(%d)",bundle->sequence_num());
00167
00168
00169 const Bundle* dtn_b = get_bundle(bundle);
00170 if (dtn_b == NULL)
00171 {
00172 log_err("drop_bundle: failed to convert prophet handle %p to "
00173 "dtn bundle", bundle);
00174 return;
00175 }
00176
00177
00178 Bundle* b = const_cast<Bundle*>(dtn_b);
00179
00180 if (b == NULL)
00181 {
00182 log_err("drop_bundle: const cast failed");
00183 return;
00184 }
00185
00186 log_debug("drop bundle *%p",b);
00187
00188 bundles_.del(bundle);
00189
00190 actions_->delete_bundle(b, BundleProtocol::REASON_NO_ADDTL_INFO);
00191 }
00192
00193 bool
00194 ProphetBundleCore::send_bundle(const prophet::Bundle* bundle,
00195 const prophet::Link* link)
00196 {
00197 LOG("send_bundle(%u)",bundle == NULL ? 0 : bundle->sequence_num());
00198 Link* l = const_cast<Link*>(get_link(link));
00199 if (l == NULL)
00200 {
00201 log_err("failed to convert prophet handle for link (%s)",
00202 link->remote_eid());
00203 return false;
00204 }
00205 Bundle* b = const_cast<Bundle*>(get_bundle(bundle));
00206 if (b == NULL)
00207 {
00208 log_err("failed to convert prophet handle for bundle (%s, %u)",
00209 bundle->destination_id().c_str(),bundle->sequence_num());
00210 return false;
00211 }
00212 LinkRef nexthop("send_bundle");
00213 nexthop = l;
00214
00215
00216 if (nexthop->isavailable() &&
00217 (!nexthop->isopen()) && (!nexthop->isopening()))
00218 {
00219 log_debug("opening *%p because a message is intended for it",
00220 nexthop.object());
00221 actions_->open_link(nexthop);
00222 }
00223
00224
00225 if (nexthop->isopen() && !nexthop->queue_is_full())
00226 {
00227 log_debug("send bundle *%p over link *%p", b, l);
00228 actions_->queue_bundle(b, nexthop,
00229 ForwardingInfo::COPY_ACTION,
00230 CustodyTimerSpec());
00231 return true;
00232 }
00233
00234
00235 if (!nexthop->isavailable())
00236 log_debug("can't forward *%p to *%p because link not available",
00237 b,nexthop.object());
00238 else if (!nexthop->isopen())
00239 log_debug("can't forward *%p to *%p because link not open",
00240 b,nexthop.object());
00241 else if (nexthop->queue_is_full())
00242 log_debug("can't forward *%p to *%p because link queue is full",
00243 b, nexthop.object());
00244 else
00245 log_debug("can't forward *%p to *%p", b,nexthop.object());
00246
00247 return false;
00248 }
00249
00250 bool
00251 ProphetBundleCore::write_bundle(const prophet::Bundle* bundle,
00252 const u_char* buffer, size_t len)
00253 {
00254 LOG("write_bundle(%u)",bundle == NULL ? 0 : bundle->sequence_num());
00255 Bundle* b = const_cast<Bundle*>(get_bundle(bundle));
00256 if (b != NULL)
00257 {
00258 b->mutable_payload()->append_data(buffer,len);
00259 return true;
00260 }
00261 return false;
00262 }
00263
00264 bool
00265 ProphetBundleCore::read_bundle(const prophet::Bundle* bundle,
00266 u_char* buffer, size_t& len)
00267 {
00268 LOG("read_bundle(%u)",bundle == NULL ? 0 : bundle->sequence_num());
00269 const Bundle* b = get_bundle(bundle);
00270 if (b != NULL)
00271 {
00272 size_t blen = b->payload().length();
00273 if (blen < len)
00274 return false;
00275 b->payload().read_data(0,blen,buffer);
00276 len = blen;
00277 return true;
00278 }
00279 return false;
00280 }
00281
00282 prophet::Bundle*
00283 ProphetBundleCore::create_bundle(const std::string& src,
00284 const std::string& dst, u_int expiration)
00285 {
00286 LOG("create_bundle");
00287 Bundle* b = new Bundle();
00288 b->mutable_source()->assign(src);
00289 b->mutable_dest()->assign(dst);
00290 b->mutable_replyto()->assign(EndpointID::NULL_EID());
00291 b->mutable_custodian()->assign(EndpointID::NULL_EID());
00292 b->set_expiration(expiration);
00293 BundleRef tmp("create_bundle");
00294 tmp = b;
00295 add(tmp);
00296 return const_cast<prophet::Bundle*>(get_bundle(b));
00297 }
00298
00299 const prophet::Bundle*
00300 ProphetBundleCore::find(const prophet::BundleList& list,
00301 const std::string& eid, u_int32_t creation_ts, u_int32_t seqno) const
00302 {
00303 for (prophet::BundleList::const_iterator i = list.begin();
00304 i != list.end(); i++)
00305 {
00306 if ((*i)->creation_ts() == creation_ts &&
00307 (*i)->sequence_num() == seqno &&
00308 is_route((*i)->destination_id(), eid)) return *i;
00309 }
00310 return NULL;
00311 }
00312
00313 void
00314 ProphetBundleCore::load_prophet_nodes(prophet::Table* nodes,
00315 prophet::ProphetParams* params)
00316 {
00317 ASSERTF(nodes_.empty(), "load_prophet_nodes called more than once");
00318
00319 prophet::Node* node = NULL;
00320 ProphetStore* prophet_store = ProphetStore::instance();
00321 ProphetStore::iterator* iter = prophet_store->new_iterator();
00322
00323 log_notice("loading prophet nodes from data store");
00324
00325 for (iter->begin(); iter->more(); iter->next())
00326 {
00327 node = prophet_store->get(iter->cur_val());
00328 if (node == NULL)
00329 {
00330 log_err("failed to deserialize Prophet route for %s",
00331 iter->cur_val().c_str());
00332 continue;
00333 }
00334 nodes_.load(node);
00335 }
00336 delete iter;
00337
00338
00339 nodes_.clone(nodes,params);
00340
00341
00342 nodes->age_nodes();
00343
00344
00345 nodes->truncate(params->epsilon());
00346 log_notice("prophet nodes loaded");
00347 }
00348
00349 void
00350 ProphetBundleCore::update_node(const prophet::Node* node)
00351 {
00352 LOG("update_node");
00353 nodes_.update(node);
00354 }
00355
00356 void
00357 ProphetBundleCore::delete_node(const prophet::Node* node)
00358 {
00359 LOG("delete_node");
00360 nodes_.del(node);
00361 }
00362
00363 std::string
00364 ProphetBundleCore::prophet_id(const prophet::Link* link) const
00365 {
00366 if (link == NULL) return "";
00367 std::string remote_eid(link->remote_eid());
00368 EndpointID eid(remote_eid.c_str());
00369 eid.append_service_tag("prophet");
00370 ASSERT( eid.str().find("prophet") != std::string::npos );
00371 return eid.str();
00372 }
00373
00374 prophet::Alarm*
00375 ProphetBundleCore::create_alarm(prophet::ExpirationHandler* handler,
00376 u_int timeout, bool jitter)
00377 {
00378 LOG("create_alarm (%u)",timeout);
00379 if (handler == NULL) return NULL;
00380 ProphetTimer* alarm = new ProphetTimer(handler,lock_);
00381 if (jitter)
00382 {
00383
00384
00385 u_int twelve_pct = timeout >> 3;
00386 u_int six_pct = timeout >> 4;
00387 u_int zero_to_twelve = oasys::Random::rand(twelve_pct);
00388 timeout = timeout - six_pct + zero_to_twelve;
00389 }
00390 alarm->schedule(timeout);
00391 log_debug("scheduling alarm %s for %u ms",handler->name(),timeout);
00392 return alarm;
00393 }
00394
00395 void
00396 ProphetBundleCore::print_log(const char* name,
00397 int level, const char* fmt, ...)
00398 {
00399
00400 oasys::log_level_t l_int = (oasys::log_level_t)level;
00401 oasys::log_level_t log_level =
00402 (l_int >= oasys::LOG_DEBUG && l_int <= oasys::LOG_ALWAYS) ?
00403 l_int : oasys::LOG_DEBUG;
00404
00405
00406 std::string prev(logpath_);
00407 if (name[0] != '/')
00408 logpathf("%s/%s",logpath_,name);
00409 else
00410 logpathf("%s%s",logpath_,name);
00411
00412
00413 va_list ap;
00414 va_start(ap, fmt);
00415 vlogf(log_level,fmt,ap);
00416 va_end(ap);
00417
00418
00419 set_logpath(prev.c_str());
00420 }
00421
00422 const Bundle*
00423 ProphetBundleCore::get_bundle(const prophet::Bundle* bundle)
00424 {
00425 LOG("get_bundle prophet -> DTN");
00426 if (bundle == NULL) return NULL;
00427 BundleRef tmp("get_bundle");
00428 tmp = bundles_.find_ref(bundle);
00429 return tmp.object();
00430 }
00431
00432 const prophet::Bundle*
00433 ProphetBundleCore::get_bundle(const Bundle* b)
00434 {
00435 LOG("get_bundle DTN -> prophet");
00436 if (b == NULL) return NULL;
00437 return bundles_.find(b);
00438 }
00439
00440 const prophet::Bundle*
00441 ProphetBundleCore::get_temp_bundle(const BundleRef& b)
00442 {
00443 LOG("get_temp_bundle DTN -> prophet");
00444 if (b.object() == NULL) return NULL;
00445 return new ProphetBundle(b);
00446 }
00447
00448 const Link*
00449 ProphetBundleCore::get_link(const prophet::Link* link)
00450 {
00451 LOG("get_link prophet -> DTN");
00452 if (link == NULL) return NULL;
00453 LinkRef tmp("get_link");
00454 tmp = links_.find_ref(link->remote_eid());
00455 return tmp.object();
00456 }
00457
00458 const prophet::Link*
00459 ProphetBundleCore::get_link(const Link* link)
00460 {
00461 LOG("get_link DTN -> prophet");
00462 Link* l = const_cast<Link*>(link);
00463 if (l == NULL) return NULL;
00464 return links_.find(l->remote_eid().c_str());
00465 }
00466
00467 void
00468 ProphetBundleCore::add(const BundleRef& b)
00469 {
00470 LOG("add(bundle)");
00471 bundles_.add(b);
00472 }
00473
00474 void
00475 ProphetBundleCore::del(const BundleRef& b)
00476 {
00477 LOG("del(bundle)");
00478 bundles_.del(b);
00479 }
00480
00481 void
00482 ProphetBundleCore::add(const LinkRef& link)
00483 {
00484 LOG("add(link)");
00485 links_.add(link);
00486 }
00487
00488 void
00489 ProphetBundleCore::del(const LinkRef& link)
00490 {
00491 LOG("del(link)");
00492 links_.del(link);
00493 }
00494
00495 };