35 #include <BESInternalError.h>
36 #include <BESSyntaxUserError.h>
37 #include <BESContextManager.h>
39 #include "xml2json/include/xml2json.hpp"
42 #include "xml2json/include/rapidjson/writer.h"
46 #include "CurlHandlePool.h"
47 #include "DmrppRequestHandler.h"
// This one name serves two purposes in add_tracking_query_param():
//   - it names the BES context whose value is read there;
//   - it is the query-parameter key appended to AWS S3 data URLs.
56 const std::string Chunk::tracking_context =
"cloudydap";
// Write callback for the curl transfer that fetches a chunk. The signature
// matches libcurl's CURLOPT_WRITEFUNCTION; the registration is not visible here.
//
// buffer - size * nmemb bytes just received (libcurl does not NUL-terminate it).
// data   - reinterpreted as the Chunk being filled. The bytes are appended to
//          its read buffer at offset get_bytes_read(), which is then advanced.
//
// If the payload begins with "<?xml", it is treated as an AWS S3 error document:
// it is converted to JSON with xml2json, logged, and a BESSyntaxUserError is
// thrown. If a std::exception reaches the handler below, a generic
// BESSyntaxUserError is thrown instead.
71 size_t chunk_write_data(
void *buffer,
size_t size,
size_t nmemb,
void *data)
73 size_t nbytes = size * nmemb;
// NOTE(review): this always reads 5 bytes from 'buffer'. When nbytes < 5 it
// reads past the data curl handed us; consider guarding with nbytes >= 5.
79 string peek(
reinterpret_cast<const char *
>(buffer), 5);
80 if (peek ==
"<?xml") {
// NOTE(review): building a std::string from a bare char* scans for a NUL
// terminator, but curl's buffer is not NUL-terminated. A string built from
// (ptr, nbytes) would bound the read to the bytes actually received.
83 string xml_message =
reinterpret_cast<const char *
>(buffer);
// Trim trailing characters found in the set below.
// NOTE(review): the set contains the digit '0', not '\0', so trailing '0'
// characters are stripped as well. '\0' may have been intended; confirm.
84 xml_message.erase(xml_message.find_last_not_of(
"\t\n\v\f\r 0") + 1);
89 string json_message = xml2json(xml_message.c_str());
90 BESDEBUG(
"dmrpp",
"AWS S3 Access Error:" << json_message << endl);
91 VERBOSE(
"AWS S3 Access Error:" << json_message << endl);
// Parse the JSON form of the S3 error. The fields extracted from it are in
// lines not visible here.
94 d.Parse(json_message.c_str());
100 throw BESSyntaxUserError(
string(
"Error accessing object store data: ").append(s.GetString()), __FILE__, __LINE__);
// Fallback: log the raw XML and report a generic (probably authentication) failure.
// NOTE(review): if BESSyntaxUserError derives from std::exception and the throw
// above sits inside the same (not visible) try block, this handler replaces its
// specific message with the generic one. Confirm the try boundaries.
107 catch(std::exception &e) {
108 BESDEBUG(
"dmrpp",
"AWS S3 Access Error:" << xml_message << endl);
109 VERBOSE(
"AWS S3 Access Error:" << xml_message << endl);
110 throw BESSyntaxUserError(
string(
"Error accessing object store data: Unrecognized error, likely an authentication failure."), __FILE__, __LINE__);
// Normal data path: 'data' is the Chunk whose read buffer receives these bytes.
115 Chunk *c_ptr =
reinterpret_cast<Chunk*
>(data);
121 unsigned long long bytes_read = c_ptr->get_bytes_read();
// A payload of at most 4096 bytes that does not fit the current buffer gets a
// fresh buffer of nbytes + 2 bytes.
// NOTE(review): the new size ignores bytes_read, so if bytes were already read
// the assert below can fail. Also confirm whether set_rbuf keeps or discards
// the previously read bytes.
128 if (nbytes <= 4096 && nbytes > c_ptr->get_rbuf_size()) {
130 c_ptr->set_rbuf(
new char[nbytes+2], nbytes+2);
// NOTE(review): assert() is compiled out under NDEBUG, which leaves the memcpy
// below unchecked against the buffer size.
134 assert(bytes_read + nbytes <= c_ptr->get_rbuf_size());
139 memcpy(c_ptr->get_rbuf() + bytes_read, buffer, nbytes);
141 c_ptr->set_bytes_read(bytes_read + nbytes);
// Decompress src_len bytes of zlib-format data (initialised with inflateInit)
// from 'src' into 'dest', which has room for dest_len bytes. dest_len must be
// > 0; this is only checked by assert.
// Calls zlib's inflate with Z_SYNC_FLUSH in a loop until it reports Z_STREAM_END.
//
// Throws BESError (BES_INTERNAL_ERROR) when:
//   - inflateInit fails;
//   - inflate returns anything other than Z_OK or Z_STREAM_END;
//   - 'dest' fills up before the end of the stream is reached.
156 void inflate(
char *dest,
unsigned int dest_len,
char *src,
unsigned int src_len)
161 assert(dest_len > 0);
// Zero the stream so zalloc/zfree/opaque are null, which selects zlib's
// default allocator.
168 memset(&z_strm, 0,
sizeof(z_strm));
// Feed all of the compressed input; point the output at the caller's buffer.
169 z_strm.next_in = (Bytef *) src;
170 z_strm.avail_in = src_len;
171 z_strm.next_out = (Bytef *) dest;
172 z_strm.avail_out = dest_len;
175 if (Z_OK != inflateInit(&z_strm))
176 throw BESError(
"Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// This resolves to zlib's inflate(z_streamp, int) overload, not to this function.
182 status = inflate(&z_strm, Z_SYNC_FLUSH);
185 if (Z_STREAM_END == status)
break;
188 if (Z_OK != status) {
189 (void) inflateEnd(&z_strm);
190 throw BESError(
"Failed to inflate data chunk.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// The output buffer is exhausted before Z_STREAM_END: the caller's buffer is too small.
// NOTE(review): unlike the error path above, this throw does not call
// inflateEnd(), so the zlib stream state leaks.
197 if (0 == z_strm.avail_out) {
198 throw BESError(
"Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
// NOTE(review): the following lines use HDF5 internals (H5MM_realloc,
// HGOTO_ERROR) and names (outbuf, nalloc, new_outbuf) that are not declared in
// the visible code. They are presumably left over from HDF5's deflate filter
// and disabled in the full file; confirm.
205 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
206 (void) inflateEnd(&z_strm);
207 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0,
"memory allocation failed for inflate decompression")
212 z_strm.next_out = (
unsigned char*) outbuf + z_strm.total_out;
213 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
217 }
while (status == Z_OK);
// Release zlib's internal state on the success path.
220 (void) inflateEnd(&z_strm);
// Undo a byte shuffle.
// 'src' holds src_size bytes as 'width' consecutive groups of 'elems' bytes each
// (elems = src_size / width). Each group is written into 'dest' with a stride of
// 'width'. This appears to be the inverse of the HDF5 shuffle filter
// (assumption based on the algorithm; confirm).
//
// When width <= 1 or there are fewer than two whole elements, the data is
// copied unchanged. The src_size % width bytes beyond the last whole element
// are copied verbatim after the unshuffled elements.
248 void unshuffle(
char *dest,
const char *src,
unsigned int src_size,
unsigned int width)
250 unsigned int elems = src_size / width;
// Nothing to reorder: pass the bytes straight through.
// NOTE(review): the const_cast is unnecessary, because memcpy accepts const void*.
253 if (!(width > 1 && elems > 1)) {
254 memcpy(dest,
const_cast<char*
>(src), src_size);
// _src is only read through below; NOTE(review): it could be a const char*.
258 char *_src =
const_cast<char*
>(src);
// One pass per byte position within an element.
262 for (
unsigned int i = 0; i < width; i++) {
// Duff's device: copy 'elems' bytes for this byte position, unrolled by 8.
274 size_t duffs_index = (elems + 7) / 8;
// Should be unreachable; presumably the default case of the (not visible)
// switch on the unroll remainder.
277 assert(0 &&
"This Should never be executed!");
// One unrolled step: take the next source byte, then advance the destination
// by one element.
282 #define DUFF_GUTS *_dest = *_src++; _dest += width;
299 }
while (--duffs_index > 0);
// Tail bytes that do not form a whole element.
307 size_t leftover = src_size % width;
// Step back from the last byte-position column to the end of the whole elements.
312 _dest -= (width - 1);
313 memcpy((
void*) _dest, (
void*) _src, leftover);
// Parse a chunk position string from a DMR++ (e.g. "[0,2,5]") into
// d_chunk_position_in_array, replacing any previous value.
// An empty string is ignored and leaves the current position unchanged.
// Throws BESInternalError if the string:
//   - is shorter than 3 characters;
//   - lacks '[' or ']';
//   - contains anything other than brackets, digits and commas.
331 void Chunk::set_position_in_array(
const string &pia)
333 if (pia.empty())
return;
335 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
// NOTE(review): these checks do not verify that '[' is the first character or
// that ']' is the last; the substr below simply drops the first and last characters.
339 if (pia.find(
'[') == string::npos || pia.find(
']') == string::npos || pia.length() < 3)
340 throw BESInternalError(
"while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
342 if (pia.find_first_not_of(
"[]1234567890,") != string::npos)
343 throw BESInternalError(
"while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
// Parse the comma-separated indices between the brackets.
346 istringstream iss(pia.substr(1, pia.length()-2));
// NOTE(review): looping on eof() is a known pitfall. Input such as "[1,,2]"
// passes the character filter above; depending on the (not visible)
// extraction statements, a failed read may then push a stale value or never
// reach eof. Consider testing the extraction result instead.
350 while (!iss.eof() ) {
353 d_chunk_position_in_array.push_back(i);
// Set the chunk position from an already-parsed vector of indices.
// An empty vector is ignored and leaves the current position unchanged.
367 void Chunk::set_position_in_array(
const std::vector<unsigned int> &pia)
369 if (pia.size() == 0)
return;
// NOTE(review): redundant, because the assignment below replaces the contents anyway.
371 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
373 d_chunk_position_in_array = pia;
// Return this chunk's byte range as "first-last", both ends inclusive: that is,
// d_offset through d_offset + d_size - 1. This is the form libcurl's
// CURLOPT_RANGE takes.
// NOTE(review): for d_size == 0 the end offset precedes the start, and it wraps
// if these members are unsigned and d_offset is 0. Confirm that callers never
// request empty chunks.
383 string Chunk::get_curl_range_arg_string()
386 range << d_offset <<
"-" << d_offset + d_size - 1;
// Applies only when the data URL starts with "http://s3.amazonaws.com/" or
// "https://s3.amazonaws.com/". In that case it appends
// "?<tracking_context>=<value>" to d_query_marker. <value> is read from the
// BES context named by tracking_context.
// NOTE(review): only these two exact (path-style) prefixes match. Virtual-hosted
// (bucket.s3.amazonaws.com) or regional endpoints get no tracking parameter.
401 void Chunk::add_tracking_query_param()
417 string aws_s3_url_https(
"https://s3.amazonaws.com/");
418 string aws_s3_url_http(
"http://s3.amazonaws.com/");
// find(...) == 0 means the URL starts with that prefix.
421 if (d_data_url.find(aws_s3_url_https) == 0 || d_data_url.find(aws_s3_url_http) == 0) {
// NOTE(review): confirm that 'found' is checked before appending (not visible
// here). Otherwise an empty value is appended when the context is unset.
424 string cloudydap_context_value = BESContextManager::TheManager()->
get_context(tracking_context, found);
// NOTE(review): the context value is appended without URL-encoding.
426 d_query_marker.append(
"?").append(tracking_context).append(
"=").append(cloudydap_context_value);
// Untyped wrapper: unpack an inflate_chunk_args and call Chunk::inflate_chunk
// on args->chunk with its deflate/shuffle/chunk_size/elem_width fields.
// The void*(void*) signature matches a pthread start routine; presumably it is
// used that way, so confirm at the call site.
// NOTE(review): an exception from Chunk::inflate_chunk (e.g. BESError from
// decompression) must not escape a thread entry point. Verify it is caught in
// the lines not shown here.
443 void *inflate_chunk(
void *arg_list)
445 inflate_chunk_args *args =
reinterpret_cast<inflate_chunk_args*
>(arg_list);
448 args->chunk->inflate_chunk(args->deflate, args->shuffle, args->chunk_size, args->elem_width);
// Post-process this chunk's read buffer, then mark the chunk as inflated:
//   - decompress it into a newly allocated buffer;
//   - undo the byte shuffle into another newly allocated buffer.
// Each step installs its result via set_rbuf. The guarding conditions are not
// visible here; presumably 'deflate' selects the first step and 'shuffle' the second.
//
// chunk_size - chunk extent in elements; multiplied by elem_width to get the
//              decompressed size in bytes.
// elem_width - bytes per element (also used as the unshuffle width).
472 void Chunk::inflate_chunk(
bool deflate,
bool shuffle,
unsigned int chunk_size,
unsigned int elem_width)
// Convert elements to bytes.
// NOTE(review): unsigned int multiplication wraps silently for very large chunks.
488 chunk_size *= elem_width;
// NOTE(review): inflate() throws BESError on failure, which leaks 'dest'.
// Owning it with a smart pointer until set_rbuf would avoid the leak.
491 char *dest =
new char[chunk_size];
493 inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
495 set_rbuf(dest, chunk_size);
// The unshuffle writes into a new buffer the same size as the current one.
505 char *dest =
new char[get_rbuf_size()];
507 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
508 set_rbuf(dest, get_rbuf_size());
516 d_is_inflated =
true;
// NOTE(review): the following debug dump refers to 'os', 'i' and prototype(),
// which are not visible Chunk members, and labels itself "DmrppArray::". It is
// presumably debug-only code disabled in the full file; confirm.
// It prints the buffer as float32 values.
520 unsigned long long chunk_buf_size = get_rbuf_size();
521 dods_float32 *vals = (dods_float32 *) get_rbuf();
523 (*os) << std::fixed << std::setfill(
'_') << std::setw(10) << std::setprecision(0);
524 (*os) <<
"DmrppArray::"<< __func__ <<
"() - Chunk[" << i <<
"]: " << endl;
// Print a newline after every 10th value, except at k == 0.
// The bitwise '|' works here, but '||' would state the intent.
525 for(
unsigned long long k=0; k< chunk_buf_size/prototype()->width(); k++) {
526 (*os) << vals[k] <<
", " << ((k==0)|((k+1)%10)?
"":
"\n");
// Fetch this chunk's bytes using a curl easy handle borrowed from
// DmrppRequestHandler's handle pool.
// - If the chunk was already read (the condition is not visible here; presumably
//   d_is_read), it logs and returns immediately.
// - Otherwise it performs the transfer (not visible here) and releases the handle.
// - It then builds an error message if the bytes read differ from get_size();
//   presumably that message is thrown.
541 void Chunk::read_chunk()
544 BESDEBUG(
"dmrpp",
"Chunk::"<< __func__ <<
"() - Already been read! Returning." << endl);
550 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(
this);
// NOTE(review): if the transfer between acquire and release throws, the handle
// is not returned to the pool unless the lines not shown here handle it. An
// RAII guard would make the release automatic.
556 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
// A short (or long) read means the response did not match the chunk's declared size.
559 if (get_size() != get_bytes_read()) {
561 oss <<
"Wrong number of bytes read for chunk; read: " << get_bytes_read() <<
", expected: " << get_size();
// Write a description of this chunk to 'oss' as a series of "[name=value]" fields:
// object address, data URL, offset, size, position in the array, and the
// is_read/is_inflated flags.
578 void Chunk::dump(ostream &oss)
const
581 oss <<
"[ptr='" << (
void *)
this <<
"']";
582 oss <<
"[data_url='" << d_data_url <<
"']";
583 oss <<
"[offset=" << d_offset <<
"]";
584 oss <<
"[size=" << d_size <<
"]";
// Position indices are listed inside "(...)". The separators and closing
// text are in lines not visible here.
585 oss <<
"[chunk_position_in_array=(";
586 for (
unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
588 oss << d_chunk_position_in_array[i];
591 oss <<
"[is_read=" << d_is_read <<
"]";
592 oss <<
"[is_inflated=" << d_is_inflated <<
"]";
// Return a string description of this chunk. The body is truncated here;
// presumably it returns the dump() output collected in 'oss' — confirm.
595 string Chunk::to_string()
const
597 std::ostringstream oss;
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
static std::ostream * GetStrm()
return the debug stream
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Abstract exception class for the BES with basic string message.
exception thrown if internal error encountered
error thrown if there is a user syntax error in the request or any other user error
Bundle a libcurl easy handle to other information.
void read_data()
This is the read_data() method for serial transfers.
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.