00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <oasys/util/OptParser.h>
00022 #include "StreamConvergenceLayer.h"
00023 #include "bundling/BundleDaemon.h"
00024 #include "bundling/SDNV.h"
00025 #include "bundling/TempBundle.h"
00026 #include "contacts/ContactManager.h"
00027
00028 namespace dtn {
00029
00030
00031 StreamConvergenceLayer::StreamLinkParams::StreamLinkParams(bool init_defaults)
00032 : LinkParams(init_defaults),
00033 segment_ack_enabled_(true),
00034 negative_ack_enabled_(true),
00035 keepalive_interval_(10),
00036 segment_length_(4096)
00037 {
00038 }
00039
00040
00041 StreamConvergenceLayer::StreamConvergenceLayer(const char* logpath,
00042 const char* cl_name,
00043 u_int8_t cl_version)
00044 : ConnectionConvergenceLayer(logpath, cl_name),
00045 cl_version_(cl_version)
00046 {
00047 }
00048
00049
00050 bool
00051 StreamConvergenceLayer::parse_link_params(LinkParams* lparams,
00052 int argc, const char** argv,
00053 const char** invalidp)
00054 {
00055
00056
00057 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00058 ASSERT(params != NULL);
00059
00060 oasys::OptParser p;
00061
00062 p.addopt(new oasys::BoolOpt("segment_ack_enabled",
00063 ¶ms->segment_ack_enabled_));
00064
00065 p.addopt(new oasys::BoolOpt("negative_ack_enabled",
00066 ¶ms->negative_ack_enabled_));
00067
00068 p.addopt(new oasys::UIntOpt("keepalive_interval",
00069 ¶ms->keepalive_interval_));
00070
00071 p.addopt(new oasys::UIntOpt("segment_length",
00072 ¶ms->segment_length_));
00073
00074 p.addopt(new oasys::UInt8Opt("cl_version",
00075 &cl_version_));
00076
00077 int count = p.parse_and_shift(argc, argv, invalidp);
00078 if (count == -1) {
00079 return false;
00080 }
00081 argc -= count;
00082
00083 return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
00084 invalidp);
00085 }
00086
00087
00088 bool
00089 StreamConvergenceLayer::finish_init_link(const LinkRef& link,
00090 LinkParams* lparams)
00091 {
00092 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00093 ASSERT(params != NULL);
00094
00095
00096 if (params->segment_ack_enabled_) {
00097 link->set_reliable(true);
00098 }
00099
00100 return true;
00101 }
00102
00103
00104 void
00105 StreamConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
00106 {
00107 ASSERT(link != NULL);
00108 ASSERT(!link->isdeleted());
00109 ASSERT(link->cl_info() != NULL);
00110
00111 ConnectionConvergenceLayer::dump_link(link, buf);
00112
00113 StreamLinkParams* params =
00114 dynamic_cast<StreamLinkParams*>(link->cl_info());
00115 ASSERT(params != NULL);
00116
00117 buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
00118 buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_);
00119 buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
00120 buf->appendf("segment_length: %u\n", params->segment_length_);
00121 }
00122
00123
00124 StreamConvergenceLayer::Connection::Connection(const char* classname,
00125 const char* logpath,
00126 StreamConvergenceLayer* cl,
00127 StreamLinkParams* params,
00128 bool active_connector)
00129 : CLConnection(classname, logpath, cl, params, active_connector),
00130 current_inflight_(NULL),
00131 send_segment_todo_(0),
00132 recv_segment_todo_(0),
00133 breaking_contact_(false),
00134 contact_initiated_(false)
00135 {
00136 }
00137
00138
00139 void
00140 StreamConvergenceLayer::Connection::initiate_contact()
00141 {
00142 log_debug("initiate_contact called");
00143
00144
00145 ContactHeader contacthdr;
00146 contacthdr.magic = htonl(MAGIC);
00147 contacthdr.version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00148
00149 contacthdr.flags = 0;
00150
00151 StreamLinkParams* params = stream_lparams();
00152
00153 if (params->segment_ack_enabled_)
00154 contacthdr.flags |= SEGMENT_ACK_ENABLED;
00155
00156 if (params->reactive_frag_enabled_)
00157 contacthdr.flags |= REACTIVE_FRAG_ENABLED;
00158
00159 contacthdr.keepalive_interval = htons(params->keepalive_interval_);
00160
00161
00162 ASSERT(sendbuf_.fullbytes() == 0);
00163 if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
00164 log_warn("send buffer too short: %zu < needed %zu",
00165 sendbuf_.tailbytes(), sizeof(ContactHeader));
00166 sendbuf_.reserve(sizeof(ContactHeader));
00167 }
00168
00169 memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
00170 sendbuf_.fill(sizeof(ContactHeader));
00171
00172
00173 BundleDaemon* bd = BundleDaemon::instance();
00174 size_t local_eid_len = bd->local_eid().length();
00175 size_t sdnv_len = SDNV::encoding_len(local_eid_len);
00176
00177 if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) {
00178 log_warn("send buffer too short: %zu < needed %zu",
00179 sendbuf_.tailbytes(), sdnv_len + local_eid_len);
00180 sendbuf_.reserve(sdnv_len + local_eid_len);
00181 }
00182
00183 sdnv_len = SDNV::encode(local_eid_len,
00184 (u_char*)sendbuf_.end(),
00185 sendbuf_.tailbytes());
00186 sendbuf_.fill(sdnv_len);
00187
00188 memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len);
00189 sendbuf_.fill(local_eid_len);
00190
00191
00192 note_data_sent();
00193 send_data();
00194
00195
00196
00197
00198
00199
00200 ::gettimeofday(&data_rcvd_, 0);
00201 ::gettimeofday(&data_sent_, 0);
00202 ::gettimeofday(&keepalive_sent_, 0);
00203
00204
00205
00206
00207 contact_initiated_ = true;
00208 }
00209
00210
00211 void
00212 StreamConvergenceLayer::Connection::handle_contact_initiation()
00213 {
00214 ASSERT(! contact_up_);
00215
00216
00217
00218
00219 u_int32_t magic = 0;
00220 size_t len_needed = sizeof(magic);
00221 if (recvbuf_.fullbytes() < len_needed) {
00222 tooshort:
00223 log_debug("handle_contact_initiation: not enough data received "
00224 "(need > %zu, got %zu)",
00225 len_needed, recvbuf_.fullbytes());
00226 return;
00227 }
00228
00229 memcpy(&magic, recvbuf_.start(), sizeof(magic));
00230 magic = ntohl(magic);
00231
00232 if (magic != MAGIC) {
00233 log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
00234 "-- disconnecting.", magic, MAGIC);
00235 break_contact(ContactEvent::CL_ERROR);
00236 oasys::Breaker::break_here();
00237 return;
00238 }
00239
00240
00241
00242
00243 len_needed = sizeof(ContactHeader);
00244 if (recvbuf_.fullbytes() < len_needed) {
00245 goto tooshort;
00246 }
00247
00248
00249
00250
00251 u_int64_t peer_eid_len;
00252 int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
00253 sizeof(ContactHeader),
00254 recvbuf_.fullbytes() -
00255 sizeof(ContactHeader),
00256 &peer_eid_len);
00257 if (sdnv_len < 0) {
00258 goto tooshort;
00259 }
00260
00261 len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len;
00262 if (recvbuf_.fullbytes() < len_needed) {
00263 goto tooshort;
00264 }
00265
00266
00267
00268
00269 ContactHeader contacthdr;
00270 memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
00271
00272 contacthdr.magic = ntohl(contacthdr.magic);
00273 contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
00274
00275 recvbuf_.consume(sizeof(ContactHeader));
00276
00277
00278
00279
00280
00281
00282
00283 u_int8_t cl_version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00284 if (contacthdr.version < cl_version) {
00285 log_warn("remote sent version %d, expected version %d "
00286 "-- disconnecting.", contacthdr.version, cl_version);
00287 break_contact(ContactEvent::CL_VERSION);
00288 return;
00289 }
00290
00291
00292
00293
00294 StreamLinkParams* params = stream_lparams();
00295
00296 params->keepalive_interval_ =
00297 std::min(params->keepalive_interval_,
00298 (u_int)contacthdr.keepalive_interval);
00299
00300 params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
00301 (contacthdr.flags & SEGMENT_ACK_ENABLED);
00302
00303 params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
00304 (contacthdr.flags & REACTIVE_FRAG_ENABLED);
00305
00306 params->negative_ack_enabled_ = params->negative_ack_enabled_ &&
00307 (contacthdr.flags & NEGATIVE_ACK_ENABLED);
00308
00309
00310
00311
00312
00313 if (params->keepalive_interval_ != 0 &&
00314 (params->keepalive_interval_ * 1000) < params->data_timeout_)
00315 {
00316 poll_timeout_ = params->keepalive_interval_ * 1000;
00317 }
00318
00319
00320
00321
00322
00323 recvbuf_.consume(sdnv_len);
00324
00325
00326
00327
00328
00329
00330 EndpointID peer_eid;
00331 if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) {
00332 log_err("protocol error: invalid endpoint id '%s' (len %llu)",
00333 peer_eid.c_str(), U64FMT(peer_eid_len));
00334 break_contact(ContactEvent::CL_ERROR);
00335 return;
00336 }
00337
00338 if (!find_contact(peer_eid)) {
00339 ASSERT(contact_ == NULL);
00340 log_debug("StreamConvergenceLayer::Connection::"
00341 "handle_contact_initiation: failed to find contact");
00342 break_contact(ContactEvent::CL_ERROR);
00343 return;
00344 }
00345 recvbuf_.consume(peer_eid_len);
00346
00347
00348
00349
00350 LinkRef link = contact_->link();
00351 if (link->remote_eid().str() == EndpointID::NULL_EID().str()) {
00352 link->set_remote_eid(peer_eid);
00353 } else if (link->remote_eid() != peer_eid) {
00354 log_warn("handle_contact_initiation: remote eid mismatch: "
00355 "link remote eid was set to %s but peer eid is %s",
00356 link->remote_eid().c_str(), peer_eid.c_str());
00357 }
00358
00359
00360
00361
00362 contact_up();
00363 }
00364
00365
00366 void
00367 StreamConvergenceLayer::Connection::handle_bundles_queued()
00368 {
00369
00370
00371
00372
00373 log_debug("handle_bundles_queued: %u bundles on link queue",
00374 contact_->link()->bundles_queued());
00375 }
00376
00377
00378 bool
00379 StreamConvergenceLayer::Connection::send_pending_data()
00380 {
00381
00382
00383 if (sendbuf_.tailbytes() == 0) {
00384 return false;
00385 }
00386
00387
00388
00389
00390
00391 if (send_segment_todo_ != 0) {
00392 ASSERT(current_inflight_ != NULL);
00393 send_data_todo(current_inflight_);
00394 }
00395
00396
00397 if (contact_broken_ || (send_segment_todo_ != 0)) {
00398 if (params_->test_write_delay_ != 0) {
00399 return true;
00400 }
00401
00402 return false;
00403 }
00404
00405
00406
00407
00408
00409 bool sent_ack = send_pending_acks();
00410
00411
00412 if (contact_broken_)
00413 {
00414 return sent_ack;
00415 }
00416
00417
00418
00419 bool sent_data;
00420 if (current_inflight_ == NULL) {
00421 sent_data = start_next_bundle();
00422 } else {
00423
00424 sent_data = send_next_segment(current_inflight_);
00425 }
00426
00427 return sent_ack || sent_data;
00428 }
00429
00430
00431 bool
00432 StreamConvergenceLayer::Connection::send_pending_acks()
00433 {
00434 if (contact_broken_ || incoming_.empty()) {
00435 return false;
00436 }
00437 IncomingBundle* incoming = incoming_.front();
00438 DataBitmap::iterator iter = incoming->ack_data_.begin();
00439 bool generated_ack = false;
00440
00441
00442
00443
00444 if (iter == incoming->ack_data_.end() || incoming->rcvd_data_.empty()) {
00445 goto check_done;
00446 }
00447
00448
00449
00450
00451
00452 while (1) {
00453 size_t rcvd_bytes = incoming->rcvd_data_.num_contiguous();
00454 size_t ack_len = *iter + 1;
00455 size_t segment_len = ack_len - incoming->acked_length_;
00456 (void)segment_len;
00457
00458 if (ack_len > rcvd_bytes) {
00459 log_debug("send_pending_acks: "
00460 "waiting to send ack length %zu for %zu byte segment "
00461 "since only received %zu",
00462 ack_len, segment_len, rcvd_bytes);
00463 break;
00464 }
00465
00466
00467 size_t encoding_len = 1 + SDNV::encoding_len(ack_len);
00468 if (encoding_len > sendbuf_.tailbytes()) {
00469 log_debug("send_pending_acks: "
00470 "no space for ack in buffer (need %zu, have %zu)",
00471 encoding_len, sendbuf_.tailbytes());
00472 break;
00473 }
00474
00475 log_debug("send_pending_acks: "
00476 "sending ack length %zu for %zu byte segment "
00477 "[range %u..%u] ack_data *%p",
00478 ack_len, segment_len, incoming->acked_length_, *iter,
00479 &incoming->ack_data_);
00480
00481 *sendbuf_.end() = ACK_SEGMENT;
00482 int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
00483 sendbuf_.tailbytes() - 1);
00484 ASSERT(encoding_len = len + 1);
00485 sendbuf_.fill(encoding_len);
00486
00487 generated_ack = true;
00488 incoming->acked_length_ = ack_len;
00489 incoming->ack_data_.clear(*iter);
00490 iter = incoming->ack_data_.begin();
00491
00492 if (iter == incoming->ack_data_.end()) {
00493
00494
00495 break;
00496 }
00497
00498 log_debug("send_pending_acks: "
00499 "found another segment (%u)", *iter);
00500 }
00501
00502 if (generated_ack) {
00503 send_data();
00504 note_data_sent();
00505 }
00506
00507
00508
00509
00510 check_done:
00511 if ((incoming->total_length_ != 0) &&
00512 (incoming->total_length_ == incoming->acked_length_))
00513 {
00514 log_debug("send_pending_acks: acked all %u bytes of bundle %d",
00515 incoming->total_length_, incoming->bundle_->bundleid());
00516
00517 incoming_.pop_front();
00518 delete incoming;
00519 }
00520 else
00521 {
00522 log_debug("send_pending_acks: "
00523 "still need to send acks -- acked_range %u",
00524 incoming->ack_data_.num_contiguous());
00525 }
00526
00527
00528 return generated_ack;
00529 }
00530
00531
00532 bool
00533 StreamConvergenceLayer::Connection::start_next_bundle()
00534 {
00535 ASSERT(current_inflight_ == NULL);
00536
00537 if (! contact_up_) {
00538 log_debug("start_next_bundle: contact not yet set up");
00539 return false;
00540 }
00541
00542 const LinkRef& link = contact_->link();
00543 BundleRef bundle("StreamCL::Connection::start_next_bundle");
00544
00545
00546
00547
00548 oasys::ScopeLock l(link->queue()->lock(),
00549 "StreamCL::Connection::start_next_bundle");
00550
00551 bundle = link->queue()->front();
00552 if (bundle == NULL) {
00553 log_debug("start_next_bundle: nothing to start");
00554 return false;
00555 }
00556
00557 InFlightBundle* inflight = new InFlightBundle(bundle.object());
00558 log_debug("trying to find xmit blocks for bundle id:%d on link %s",
00559 bundle->bundleid(), link->name());
00560 inflight->blocks_ = bundle->xmit_blocks()->find_blocks(contact_->link());
00561 ASSERT(inflight->blocks_ != NULL);
00562 inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_);
00563 inflight_.push_back(inflight);
00564 current_inflight_ = inflight;
00565
00566 link->add_to_inflight(bundle, inflight->total_length_);
00567 link->del_from_queue(bundle, inflight->total_length_);
00568
00569
00570
00571 l.unlock();
00572
00573
00574 return send_next_segment(current_inflight_);
00575 }
00576
00577
00578 bool
00579 StreamConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
00580 {
00581 if (sendbuf_.tailbytes() == 0) {
00582 return false;
00583 }
00584
00585 ASSERT(send_segment_todo_ == 0);
00586
00587 StreamLinkParams* params = stream_lparams();
00588
00589 size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
00590 inflight->sent_data_.last() + 1;
00591
00592 if (bytes_sent == inflight->total_length_) {
00593 log_debug("send_next_segment: "
00594 "already sent all %zu bytes, finishing bundle",
00595 bytes_sent);
00596 ASSERT(inflight->send_complete_);
00597 return finish_bundle(inflight);
00598 }
00599
00600 u_int8_t flags = 0;
00601 size_t segment_len;
00602
00603 if (bytes_sent == 0) {
00604 flags |= BUNDLE_START;
00605 }
00606
00607 if (params->segment_length_ >= inflight->total_length_ - bytes_sent) {
00608 flags |= BUNDLE_END;
00609 segment_len = inflight->total_length_ - bytes_sent;
00610 } else {
00611 segment_len = params->segment_length_;
00612 }
00613
00614 size_t sdnv_len = SDNV::encoding_len(segment_len);
00615
00616 if (sendbuf_.tailbytes() < 1 + sdnv_len) {
00617 log_debug("send_next_segment: "
00618 "not enough space for segment header [need %zu, have %zu]",
00619 1 + sdnv_len, sendbuf_.tailbytes());
00620 return false;
00621 }
00622
00623 log_debug("send_next_segment: "
00624 "starting %zu byte segment [block byte range %zu..%zu]",
00625 segment_len, bytes_sent, bytes_sent + segment_len);
00626
00627 u_char* bp = (u_char*)sendbuf_.end();
00628 *bp++ = DATA_SEGMENT | flags;
00629 int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
00630 ASSERT(cc == (int)sdnv_len);
00631 bp += sdnv_len;
00632
00633 sendbuf_.fill(1 + sdnv_len);
00634 send_segment_todo_ = segment_len;
00635
00636
00637 return send_data_todo(inflight);
00638 }
00639
00640
00641 bool
00642 StreamConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
00643 {
00644 ASSERT(send_segment_todo_ != 0);
00645
00646
00647
00648 while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
00649 size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
00650 inflight->sent_data_.last() + 1;
00651 size_t send_len = std::min(send_segment_todo_, sendbuf_.tailbytes());
00652
00653 Bundle* bundle = inflight->bundle_.object();
00654 BlockInfoVec* blocks = inflight->blocks_;
00655
00656 size_t ret =
00657 BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(),
00658 bytes_sent, send_len,
00659 &inflight->send_complete_);
00660 ASSERT(ret == send_len);
00661 sendbuf_.fill(send_len);
00662 inflight->sent_data_.set(bytes_sent, send_len);
00663
00664 log_debug("send_data_todo: "
00665 "sent %zu/%zu of current segment from block offset %zu "
00666 "(%zu todo), updated sent_data *%p",
00667 send_len, send_segment_todo_, bytes_sent,
00668 send_segment_todo_ - send_len, &inflight->sent_data_);
00669
00670 send_segment_todo_ -= send_len;
00671
00672 note_data_sent();
00673 send_data();
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683 if (contact_broken_)
00684 return true;
00685
00686
00687
00688 if (params_->test_write_delay_ != 0) {
00689 log_debug("send_data_todo done, returning more to send "
00690 "(send_segment_todo_==%zu) since test_write_delay is non-zero",
00691 send_segment_todo_);
00692 return true;
00693 }
00694 }
00695
00696 return (send_segment_todo_ == 0);
00697 }
00698
00699
00700 bool
00701 StreamConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
00702 {
00703 ASSERT(inflight->send_complete_);
00704
00705 ASSERT(current_inflight_ == inflight);
00706 current_inflight_ = NULL;
00707
00708 check_completed(inflight);
00709
00710 return true;
00711 }
00712
00713
00714 void
00715 StreamConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
00716 {
00717
00718
00719
00720
00721
00722
00723 if (current_inflight_ == inflight) {
00724 log_debug("check_completed: bundle %d still waiting for finish_bundle",
00725 inflight->bundle_->bundleid());
00726 return;
00727 }
00728
00729 u_int32_t acked_len = inflight->ack_data_.num_contiguous();
00730 if (acked_len < inflight->total_length_) {
00731 log_debug("check_completed: bundle %d only acked %u/%u",
00732 inflight->bundle_->bundleid(),
00733 acked_len, inflight->total_length_);
00734 return;
00735 }
00736
00737 log_debug("check_completed: bundle %d transmission complete",
00738 inflight->bundle_->bundleid());
00739 ASSERT(inflight == inflight_.front());
00740 inflight_.pop_front();
00741 delete inflight;
00742 }
00743
00744
00745 void
00746 StreamConvergenceLayer::Connection::send_keepalive()
00747 {
00748
00749
00750
00751
00752 if (sendbuf_.fullbytes() != 0) {
00753 log_debug("send_keepalive: "
00754 "send buffer has %zu bytes queued, suppressing keepalive",
00755 sendbuf_.fullbytes());
00756 return;
00757 }
00758 ASSERT(sendbuf_.tailbytes() > 0);
00759
00760
00761
00762
00763 ASSERT(send_segment_todo_ == 0);
00764
00765 ::gettimeofday(&keepalive_sent_, 0);
00766
00767 *(sendbuf_.end()) = KEEPALIVE;
00768 sendbuf_.fill(1);
00769
00770
00771
00772 send_data();
00773 }
00774
00775 void
00776 StreamConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
00777 {
00778
00779
00780
00781
00782 InFlightList::iterator iter;
00783 for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
00784 InFlightBundle* inflight = *iter;
00785 if (inflight->bundle_ == bundle)
00786 {
00787 if (inflight->sent_data_.empty()) {
00788
00789
00790
00791 if (inflight == current_inflight_) {
00792
00793
00794
00795 if (send_segment_todo_ != 0) {
00796 log_debug("handle_cancel_bundle: bundle %d "
00797 "already in flight, can't cancel send",
00798 bundle->bundleid());
00799 return;
00800 }
00801 current_inflight_ = NULL;
00802 }
00803
00804 log_debug("handle_cancel_bundle: "
00805 "bundle %d not yet in flight, cancelling send",
00806 bundle->bundleid());
00807 inflight_.erase(iter);
00808 delete inflight;
00809 BundleDaemon::post(
00810 new BundleSendCancelledEvent(bundle, contact_->link()));
00811 return;
00812 } else {
00813 log_debug("handle_cancel_bundle: "
00814 "bundle %d already in flight, can't cancel send",
00815 bundle->bundleid());
00816 return;
00817 }
00818 }
00819 }
00820
00821 log_warn("handle_cancel_bundle: "
00822 "can't find bundle %d in the in flight list", bundle->bundleid());
00823 }
00824
00825
00826 void
00827 StreamConvergenceLayer::Connection::handle_poll_timeout()
00828 {
00829
00830
00831
00832
00833
00834
00835
00836 if (BundleDaemon::shutting_down())
00837 {
00838 sleep(1);
00839 return;
00840 }
00841
00842
00843
00844 if (!contact_initiated_)
00845 {
00846 return;
00847 }
00848
00849 struct timeval now;
00850 u_int elapsed, elapsed2;
00851
00852 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00853 ASSERT(params != NULL);
00854
00855 ::gettimeofday(&now, 0);
00856
00857
00858
00859 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00860 if (elapsed > params->data_timeout_) {
00861 log_info("handle_poll_timeout: no data heard for %d msecs "
00862 "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) "
00863 "-- closing contact",
00864 elapsed,
00865 (u_int)keepalive_sent_.tv_sec,
00866 (u_int)keepalive_sent_.tv_usec,
00867 (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
00868 (u_int)now.tv_sec, (u_int)now.tv_usec,
00869 poll_timeout_);
00870
00871 break_contact(ContactEvent::BROKEN);
00872 return;
00873 }
00874
00875
00876 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00877 oasys::ScopeLock l(cm->lock(),"StreamConvergenceLayer::Connection::handle_poll_timeout");
00878 if (contact_ == NULL)
00879 {
00880 return;
00881 }
00882
00883
00884
00885 if (contact_->link()->type() == Link::ONDEMAND) {
00886 u_int idle_close_time = contact_->link()->params().idle_close_time_;
00887
00888 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00889 elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
00890
00891 if (idle_close_time != 0 &&
00892 (elapsed > idle_close_time * 1000) &&
00893 (elapsed2 > idle_close_time * 1000))
00894 {
00895 log_info("closing idle connection "
00896 "(no data received for %d msecs or sent for %d msecs)",
00897 elapsed, elapsed2);
00898 break_contact(ContactEvent::IDLE);
00899 return;
00900 } else {
00901 log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
00902 elapsed, elapsed2, idle_close_time * 1000);
00903 }
00904 }
00905
00906
00907
00908
00909 check_keepalive();
00910 }
00911
00912
00913 void
00914 StreamConvergenceLayer::Connection::check_keepalive()
00915 {
00916 struct timeval now;
00917 u_int elapsed, elapsed2;
00918
00919 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00920 ASSERT(params != NULL);
00921
00922 ::gettimeofday(&now, 0);
00923
00924 if (params->keepalive_interval_ != 0) {
00925 elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_);
00926 elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
00927
00928
00929
00930
00931
00932
00933
00934 if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500))
00935 {
00936
00937
00938
00939 if (send_segment_todo_ != 0) {
00940 log_debug("not issuing keepalive in the middle of a segment");
00941 return;
00942 }
00943
00944 send_keepalive();
00945 }
00946 }
00947 }
00948
00949
00950 void
00951 StreamConvergenceLayer::Connection::process_data()
00952 {
00953 if (recvbuf_.fullbytes() == 0) {
00954 return;
00955 }
00956
00957 log_debug("processing up to %zu bytes from receive buffer",
00958 recvbuf_.fullbytes());
00959
00960
00961
00962
00963 note_data_rcvd();
00964
00965
00966
00967
00968 if (! contact_up_) {
00969 handle_contact_initiation();
00970 return;
00971 }
00972
00973
00974
00975
00976
00977 if (recv_segment_todo_ != 0) {
00978 bool ok = handle_data_todo();
00979
00980 if (!ok) {
00981 return;
00982 }
00983 }
00984
00985
00986
00987
00988
00989
00990 while (recvbuf_.fullbytes() != 0) {
00991 if (contact_broken_) return;
00992
00993 u_int8_t type = *recvbuf_.start() & 0xf0;
00994 u_int8_t flags = *recvbuf_.start() & 0x0f;
00995
00996 log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
00997 recvbuf_.fullbytes());
00998 bool ok;
00999 switch (type) {
01000 case DATA_SEGMENT:
01001 ok = handle_data_segment(flags);
01002 break;
01003 case ACK_SEGMENT:
01004 ok = handle_ack_segment(flags);
01005 break;
01006 case REFUSE_BUNDLE:
01007 ok = handle_refuse_bundle(flags);
01008 break;
01009 case KEEPALIVE:
01010 ok = handle_keepalive(flags);
01011 break;
01012 case SHUTDOWN:
01013 ok = handle_shutdown(flags);
01014 break;
01015 default:
01016 log_err("invalid CL message type code 0x%x (flags 0x%x)",
01017 type >> 4, flags);
01018 break_contact(ContactEvent::CL_ERROR);
01019 return;
01020 }
01021
01022
01023
01024 if (! ok) {
01025 if (recvbuf_.fullbytes() == recvbuf_.size()) {
01026 log_warn("process_data: "
01027 "%zu byte recv buffer full but too small for msg %u... "
01028 "doubling buffer size",
01029 recvbuf_.size(), type);
01030
01031 recvbuf_.reserve(recvbuf_.size() * 2);
01032
01033 } else if (recvbuf_.tailbytes() == 0) {
01034
01035 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
01036 ASSERT(recvbuf_.tailbytes() != 0);
01037 }
01038
01039 return;
01040 }
01041 }
01042 }
01043
01044
01045 void
01046 StreamConvergenceLayer::Connection::note_data_rcvd()
01047 {
01048 log_debug("noting data_rcvd");
01049 ::gettimeofday(&data_rcvd_, 0);
01050 }
01051
01052
01053 void
01054 StreamConvergenceLayer::Connection::note_data_sent()
01055 {
01056 log_debug("noting data_sent");
01057 ::gettimeofday(&data_sent_, 0);
01058 }
01059
01060
01061 bool
01062 StreamConvergenceLayer::Connection::handle_data_segment(u_int8_t flags)
01063 {
01064 IncomingBundle* incoming = NULL;
01065 if (flags & BUNDLE_START)
01066 {
01067
01068
01069
01070
01071 bool create_new_incoming = true;
01072 if (!incoming_.empty()) {
01073 incoming = incoming_.back();
01074
01075 if (incoming->rcvd_data_.empty() &&
01076 incoming->ack_data_.empty())
01077 {
01078 log_debug("found empty incoming bundle for BUNDLE_START");
01079 create_new_incoming = false;
01080 }
01081 else if (incoming->total_length_ == 0)
01082 {
01083 log_err("protocol error: "
01084 "got BUNDLE_START before bundle completed");
01085 break_contact(ContactEvent::CL_ERROR);
01086 return false;
01087 }
01088 }
01089
01090 if (create_new_incoming) {
01091 log_debug("got BUNDLE_START segment, creating new IncomingBundle");
01092 IncomingBundle* incoming = new IncomingBundle(new Bundle());
01093 incoming_.push_back(incoming);
01094 }
01095 }
01096 else if (incoming_.empty())
01097 {
01098 log_err("protocol error: "
01099 "first data segment doesn't have BUNDLE_START flag set");
01100 break_contact(ContactEvent::CL_ERROR);
01101 return false;
01102 }
01103
01104
01105
01106
01107 incoming = incoming_.back();
01108 u_char* bp = (u_char*)recvbuf_.start();
01109
01110
01111 u_int32_t segment_len;
01112 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1,
01113 &segment_len);
01114
01115 if (sdnv_len < 0) {
01116 log_debug("handle_data_segment: "
01117 "too few bytes in buffer for sdnv (%zu)",
01118 recvbuf_.fullbytes());
01119 return false;
01120 }
01121
01122 recvbuf_.consume(1 + sdnv_len);
01123
01124 if (segment_len == 0) {
01125 log_err("protocol error -- zero length segment");
01126 break_contact(ContactEvent::CL_ERROR);
01127 return false;
01128 }
01129
01130 size_t segment_offset = incoming->rcvd_data_.num_contiguous();
01131 log_debug("handle_data_segment: "
01132 "got segment of length %u at offset %zu ",
01133 segment_len, segment_offset);
01134
01135 incoming->ack_data_.set(segment_offset + segment_len - 1);
01136
01137 log_debug("handle_data_segment: "
01138 "updated ack_data (segment_offset %zu) *%p ack_data *%p",
01139 segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01140
01141
01142
01143
01144
01145 if (flags & BUNDLE_END)
01146 {
01147 incoming->total_length_ = incoming->rcvd_data_.num_contiguous() +
01148 segment_len;
01149
01150 log_debug("got BUNDLE_END: total length %u",
01151 incoming->total_length_);
01152 }
01153
01154 recv_segment_todo_ = segment_len;
01155 return handle_data_todo();
01156 }
01157
01158
01159 bool
01160 StreamConvergenceLayer::Connection::handle_data_todo()
01161 {
01162
01163
01164 ASSERT(!incoming_.empty());
01165 ASSERT(recv_segment_todo_ != 0);
01166
01167
01168
01169
01170 IncomingBundle* incoming = incoming_.back();
01171 size_t rcvd_offset = incoming->rcvd_data_.num_contiguous();
01172 size_t rcvd_len = recvbuf_.fullbytes();
01173 size_t chunk_len = std::min(rcvd_len, recv_segment_todo_);
01174
01175 if (rcvd_len == 0) {
01176 return false;
01177 }
01178
01179 log_debug("handle_data_todo: "
01180 "reading todo segment %zu/%zu at offset %zu",
01181 chunk_len, recv_segment_todo_, rcvd_offset);
01182
01183 bool last;
01184 int cc = BundleProtocol::consume(incoming->bundle_.object(),
01185 (u_char*)recvbuf_.start(),
01186 chunk_len, &last);
01187 if (cc < 0) {
01188 log_err("protocol error parsing bundle data segment");
01189 break_contact(ContactEvent::CL_ERROR);
01190 return false;
01191 }
01192
01193 ASSERT(cc == (int)chunk_len);
01194
01195 recv_segment_todo_ -= chunk_len;
01196 recvbuf_.consume(chunk_len);
01197
01198 incoming->rcvd_data_.set(rcvd_offset, chunk_len);
01199
01200 log_debug("handle_data_todo: "
01201 "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
01202 rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01203
01204 if (recv_segment_todo_ == 0) {
01205 check_completed(incoming);
01206 return true;
01207 }
01208
01209 return false;
01210 }
01211
01212
01213 void
01214 StreamConvergenceLayer::Connection::check_completed(IncomingBundle* incoming)
01215 {
01216 u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous();
01217
01218
01219
01220 if (incoming->total_length_ == 0) {
01221 return;
01222 }
01223
01224 u_int32_t formatted_len =
01225 BundleProtocol::total_length(&incoming->bundle_->recv_blocks());
01226
01227 log_debug("check_completed: rcvd %u / %u (formatted length %u)",
01228 rcvd_len, incoming->total_length_, formatted_len);
01229
01230 if (rcvd_len < incoming->total_length_) {
01231 return;
01232 }
01233
01234 if (rcvd_len > incoming->total_length_) {
01235 log_err("protocol error: received too much data -- "
01236 "got %u, total length %u",
01237 rcvd_len, incoming->total_length_);
01238
01239
01240
01241
01242 protocol_err:
01243 incoming->rcvd_data_.clear();
01244 break_contact(ContactEvent::CL_ERROR);
01245 return;
01246 }
01247
01248
01249
01250 if (incoming->total_length_ != formatted_len) {
01251 log_err("protocol error: CL total length %u "
01252 "doesn't match bundle protocol total %u",
01253 incoming->total_length_, formatted_len);
01254 goto protocol_err;
01255
01256 }
01257
01258 BundleDaemon::post(
01259 new BundleReceivedEvent(incoming->bundle_.object(),
01260 EVENTSRC_PEER,
01261 incoming->total_length_,
01262 contact_->link()->remote_eid(),
01263 contact_->link().object()));
01264 }
01265
01266
01267 bool
01268 StreamConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags)
01269 {
01270 (void)flags;
01271 u_char* bp = (u_char*)recvbuf_.start();
01272 u_int32_t acked_len;
01273 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
01274
01275 if (sdnv_len < 0) {
01276 log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
01277 recvbuf_.fullbytes());
01278 return false;
01279 }
01280
01281 recvbuf_.consume(1 + sdnv_len);
01282
01283 if (inflight_.empty()) {
01284 log_err("protocol error: got ack segment with no inflight bundle");
01285 break_contact(ContactEvent::CL_ERROR);
01286 return false;
01287 }
01288
01289 InFlightBundle* inflight = inflight_.front();
01290
01291 size_t ack_begin;
01292 DataBitmap::iterator i = inflight->ack_data_.begin();
01293 if (i == inflight->ack_data_.end()) {
01294 ack_begin = 0;
01295 } else {
01296 i.skip_contiguous();
01297 ack_begin = *i + 1;
01298 }
01299
01300 if (acked_len < ack_begin) {
01301 log_err("protocol error: got ack for length %u but already acked up to %zu",
01302 acked_len, ack_begin);
01303 break_contact(ContactEvent::CL_ERROR);
01304 return false;
01305 }
01306
01307 inflight->ack_data_.set(0, acked_len);
01308
01309
01310
01311
01312 if (acked_len == inflight->total_length_) {
01313 log_debug("handle_ack_segment: got final ack for %zu byte range -- "
01314 "acked_len %u, ack_data *%p",
01315 (size_t)acked_len - ack_begin,
01316 acked_len, &inflight->ack_data_);
01317
01318 inflight->transmit_event_posted_ = true;
01319
01320 BundleDaemon::post(
01321 new BundleTransmittedEvent(inflight->bundle_.object(),
01322 contact_,
01323 contact_->link(),
01324 inflight->sent_data_.num_contiguous(),
01325 inflight->ack_data_.num_contiguous()));
01326
01327
01328 check_completed(inflight);
01329
01330 } else {
01331 log_debug("handle_ack_segment: "
01332 "got acked_len %u (%zu byte range) -- ack_data *%p",
01333 acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
01334 }
01335
01336 return true;
01337 }
01338
01339
01340 bool
01341 StreamConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags)
01342 {
01343 (void)flags;
01344 log_debug("got refuse_bundle message");
01345 log_err("REFUSE_BUNDLE not implemented");
01346 break_contact(ContactEvent::CL_ERROR);
01347 return true;
01348 }
01349
01350 bool
01351 StreamConvergenceLayer::Connection::handle_keepalive(u_int8_t flags)
01352 {
01353 (void)flags;
01354 log_debug("got keepalive message");
01355 recvbuf_.consume(1);
01356 return true;
01357 }
01358
01359
01360 void
01361 StreamConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
01362 {
01363
01364
01365
01366 if (breaking_contact_) {
01367 return;
01368 }
01369 breaking_contact_ = true;
01370
01371
01372
01373
01374 bool send_shutdown = false;
01375 shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON;
01376
01377 switch (reason) {
01378 case ContactEvent::USER:
01379
01380 send_shutdown = true;
01381 shutdown_reason = SHUTDOWN_BUSY;
01382 break;
01383
01384 case ContactEvent::IDLE:
01385
01386 send_shutdown = true;
01387 shutdown_reason = SHUTDOWN_IDLE_TIMEOUT;
01388 break;
01389
01390 case ContactEvent::SHUTDOWN:
01391
01392
01393
01394 send_shutdown = true;
01395 break;
01396
01397 case ContactEvent::BROKEN:
01398 case ContactEvent::CL_ERROR:
01399
01400 send_shutdown = false;
01401 break;
01402
01403 case ContactEvent::CL_VERSION:
01404
01405 send_shutdown = true;
01406 shutdown_reason = SHUTDOWN_VERSION_MISMATCH;
01407 break;
01408
01409 case ContactEvent::INVALID:
01410 case ContactEvent::NO_INFO:
01411 case ContactEvent::RECONNECT:
01412 case ContactEvent::TIMEOUT:
01413 case ContactEvent::DISCOVERY:
01414 NOTREACHED;
01415 break;
01416 }
01417
01418
01419
01420
01421
01422
01423
01424 if (send_shutdown &&
01425 sendbuf_.fullbytes() == 0 &&
01426 send_segment_todo_ == 0)
01427 {
01428 log_debug("break_contact: sending shutdown");
01429 char typecode = SHUTDOWN;
01430 if (shutdown_reason != SHUTDOWN_NO_REASON) {
01431 typecode |= SHUTDOWN_HAS_REASON;
01432 }
01433
01434
01435
01436 *sendbuf_.end() = typecode;
01437 sendbuf_.fill(1);
01438
01439 if (shutdown_reason != SHUTDOWN_NO_REASON) {
01440 *sendbuf_.end() = shutdown_reason;
01441 sendbuf_.fill(1);
01442 }
01443
01444 send_data();
01445 }
01446
01447 CLConnection::break_contact(reason);
01448 }
01449
01450
01451 bool
01452 StreamConvergenceLayer::Connection::handle_shutdown(u_int8_t flags)
01453 {
01454 log_debug("got SHUTDOWN byte");
01455 size_t shutdown_len = 1;
01456
01457 if (flags & SHUTDOWN_HAS_REASON)
01458 {
01459 shutdown_len += 1;
01460 }
01461
01462 if (flags & SHUTDOWN_HAS_DELAY)
01463 {
01464 shutdown_len += 2;
01465 }
01466
01467 if (recvbuf_.fullbytes() < shutdown_len)
01468 {
01469
01470
01471 log_debug("got %zu/%zu bytes for shutdown data... waiting for more",
01472 recvbuf_.fullbytes(), shutdown_len);
01473 return false;
01474 }
01475
01476
01477 recvbuf_.consume(1);
01478
01479 shutdown_reason_t reason = SHUTDOWN_NO_REASON;
01480 if (flags & SHUTDOWN_HAS_REASON)
01481 {
01482 switch (*recvbuf_.start()) {
01483 case SHUTDOWN_NO_REASON:
01484 reason = SHUTDOWN_NO_REASON;
01485 break;
01486 case SHUTDOWN_IDLE_TIMEOUT:
01487 reason = SHUTDOWN_IDLE_TIMEOUT;
01488 break;
01489 case SHUTDOWN_VERSION_MISMATCH:
01490 reason = SHUTDOWN_VERSION_MISMATCH;
01491 break;
01492 case SHUTDOWN_BUSY:
01493 reason = SHUTDOWN_BUSY;
01494 break;
01495 default:
01496 log_err("invalid shutdown reason code 0x%x", *recvbuf_.start());
01497 }
01498
01499 recvbuf_.consume(1);
01500 }
01501
01502 u_int16_t delay = 0;
01503 if (flags & SHUTDOWN_HAS_DELAY)
01504 {
01505 memcpy(&delay, recvbuf_.start(), 2);
01506 delay = ntohs(delay);
01507 recvbuf_.consume(2);
01508 }
01509
01510 log_info("got SHUTDOWN (%s) [reconnect delay %u]",
01511 shutdown_reason_to_str(reason), delay);
01512
01513 break_contact(ContactEvent::SHUTDOWN);
01514
01515 return false;
01516 }
01517
01518 }