bes  Updated for version 3.20.6
GlobalMetadataStore.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of Hyrax, a C++ implementation of the OPeNDAP Data
4 // Access Protocol.
5 
6 // Copyright (c) 2018 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <fcntl.h> // for posix_advise
28 #include <unistd.h>
29 
30 #include <cerrno>
31 #include <cstring>
32 
33 #include <iostream>
34 #include <string>
35 #include <fstream>
36 #include <sstream>
37 #include <functional>
38 #include <DAS.h>
39 #include <memory>
40 #include <sys/stat.h>
41 
42 #include <DapObj.h>
43 #include <DDS.h>
44 #include <DMR.h>
45 #include <D4ParserSax2.h>
46 #include <XMLWriter.h>
47 #include <BaseTypeFactory.h>
48 #include <D4BaseTypeFactory.h>
49 
50 #include "PicoSHA2/picosha2.h"
51 
52 #include "TempFile.h"
53 #include "TheBESKeys.h"
54 #include "BESUtil.h"
55 #include "BESLog.h"
56 #include "BESContextManager.h"
57 #include "BESDebug.h"
58 #include "BESRequestHandler.h"
59 #include "BESRequestHandlerList.h"
60 #include "BESNotFoundError.h"
61 
62 #include "BESInternalError.h"
63 #include "BESInternalFatalError.h"
64 
65 #include "GlobalMetadataStore.h"
66 
67 #define DEBUG_KEY "metadata_store"
68 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
69 
70 #ifdef HAVE_ATEXIT
71 #define AT_EXIT(x) atexit((x))
72 #else
73 #define AT_EXIT(x)
74 #endif
75 
85 #undef SYMETRIC_ADD_RESPONSES
86 
87 using namespace std;
88 using namespace libdap;
89 using namespace bes;
90 
// Fallback values used by this file when the matching BES configuration key is not set.
// NOTE(review): the trailing comment says "20 GB", but get_cache_size_from_config() holds
// this value in a local named 'size_in_megabytes'. The unit is not established in this
// file - TODO confirm against BESFileLockingCache.
91 static const unsigned int default_cache_size = 20; // 20 GB
// Default file-name prefix for items in the store.
92 static const string default_cache_prefix = "mds";
93 static const string default_cache_dir = ""; // I'm making the default empty so that no key == no caching. jhrg 9.26.16
// Ledger file name used by initialize() when the ledger key is not configured.
94 static const string default_ledger_name = "mds_ledger.txt";
95 
96 static const string PATH_KEY = "DAP.GlobalMetadataStore.path";
97 static const string PREFIX_KEY = "DAP.GlobalMetadataStore.prefix";
98 static const string SIZE_KEY = "DAP.GlobalMetadataStore.size";
99 static const string LEDGER_KEY = "DAP.GlobalMetadataStore.ledger";
100 static const string LOCAL_TIME_KEY = "BES.LogTimeLocal";
101 
102 GlobalMetadataStore *GlobalMetadataStore::d_instance = 0;
103 bool GlobalMetadataStore::d_enabled = true;
104 
119 void GlobalMetadataStore::transfer_bytes(int fd, ostream &os)
120 {
121  static const int BUFFER_SIZE = 16*1024;
122 
123 #if _POSIX_C_SOURCE >= 200112L
124  /* Advise the kernel of our access pattern. */
125  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
126  if (status != 0)
127  ERROR("Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
128 #endif
129 
130  char buf[BUFFER_SIZE + 1];
131 
132  while(int bytes_read = read(fd, buf, BUFFER_SIZE))
133  {
134  if(bytes_read == -1)
135  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
136  if (!bytes_read)
137  break;
138 
139  os.write(buf, bytes_read);
140  }
141 }
142 
155 void GlobalMetadataStore::insert_xml_base(int fd, ostream &os, const string &xml_base)
156 {
157  static const int BUFFER_SIZE = 1024;
158 
159 #if _POSIX_C_SOURCE >= 200112L
160  /* Advise the kernel of our access pattern. */
161  int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
162  if (status != 0)
163  ERROR("Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
164 #endif
165 
166  char buf[BUFFER_SIZE + 1];
167  size_t bytes_read = read(fd, buf, BUFFER_SIZE);
168 
169  if(bytes_read == (size_t)-1)
170  throw BESInternalError("Could not read dds from the metadata store.", __FILE__, __LINE__);
171 
172  if (bytes_read == 0)
173  return;
174 
175  // Every valid DMR/++ response in the MDS starts with:
176  // <?xml version="1.0" encoding="ISO‌-8859-1"?>
177  //
178  // and has one of two kinds of <Dataset...> tags
179  // 1: <Dataset xmlns="..." xml:base="file:DMR_1.xml" ... >
180  // 2: <Dataset xmlns="..." ... >
181  //
182  // Assume it is well formed and always includes the prolog,
183  // but might not use <CR> <CRLF> chars
184 
185  // transfer the prolog (<?xml version="1.0" encoding="ISO‌-8859-1"?>)
186  size_t i = 0;
187  while (buf[i++] != '>')
188  ; // 'i' now points one char past the xml prolog
189  os.write(buf, i);
190 
191  // transfer <Dataset ...> with new value for xml:base
192  size_t s = i; // start of <Dataset ...>
193  size_t j = 0;
194  char xml_base_literal[] = "xml:base";
195  while (i < bytes_read) {
196  if (buf[i] == '>') { // Found end of Dataset; no xml:base was present
197  os.write(buf + s, i - s);
198  os << " xml:base=\"" << xml_base << "\"";
199  break;
200  }
201  else if (j == sizeof(xml_base_literal) - 1) { // found 'xml:base' literal
202  os.write(buf + s, i - s); // This will include all of <Dataset... including 'xml:base'
203  while (buf[i++] != '=')
204  ; // read/discard '="..."'
205  while (buf[i++] != '"')
206  ;
207  while (buf[i++] != '"')
208  ;
209  os << "=\"" << xml_base << "\""; // write the new xml:base value
210  break;
211  }
212  else if (buf[i] == xml_base_literal[j]) {
213  ++j;
214  }
215  else {
216  j = 0;
217  }
218 
219  ++i;
220  }
221 
222  // transfer the rest
223  os.write(buf + i, bytes_read - i);
224 
225  // Now, if the response is more than 1k, use faster code to finish the tx
226  transfer_bytes(fd, os);
227 }
228 
229 unsigned long GlobalMetadataStore::get_cache_size_from_config()
230 {
231  bool found;
232  string size;
233  unsigned long size_in_megabytes = default_cache_size;
234  TheBESKeys::TheKeys()->get_value(SIZE_KEY, size, found);
235  if (found) {
236  BESDEBUG(DEBUG_KEY,
237  "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY << "=" << size << endl);
238  istringstream iss(size);
239  iss >> size_in_megabytes;
240  }
241 
242  return size_in_megabytes;
243 }
244 
245 string GlobalMetadataStore::get_cache_prefix_from_config()
246 {
247  bool found;
248  string prefix = default_cache_prefix;
249  TheBESKeys::TheKeys()->get_value(PREFIX_KEY, prefix, found);
250  if (found) {
251  BESDEBUG(DEBUG_KEY,
252  "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY << "=" << prefix << endl);
253  prefix = BESUtil::lowercase(prefix);
254  }
255 
256  return prefix;
257 }
258 
259 // If the cache prefix is the empty string, the cache is turned off.
260 string GlobalMetadataStore::get_cache_dir_from_config()
261 {
262  bool found;
263 
264  string cacheDir = default_cache_dir;
265  TheBESKeys::TheKeys()->get_value(PATH_KEY, cacheDir, found);
266  if (found) {
267  BESDEBUG(DEBUG_KEY,
268  "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<< "=" << cacheDir << endl);
269  }
270 
271  return cacheDir;
272 }
273 
288 
306 GlobalMetadataStore::get_instance(const string &cache_dir, const string &prefix, unsigned long long size)
307 {
308  if (d_enabled && d_instance == 0) {
309  d_instance = new GlobalMetadataStore(cache_dir, prefix, size); // never returns null_ptr
310  d_enabled = d_instance->cache_enabled();
311  if (!d_enabled) {
312  delete d_instance;
313  d_instance = 0;
314 
315  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
316  }
317  else {
318  AT_EXIT(delete_instance);
319 
320  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
321  }
322  }
323 
324  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
325 
326  return d_instance;
327 }
328 
336 GlobalMetadataStore::get_instance()
337 {
338  if (d_enabled && d_instance == 0) {
339  d_instance = new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
340  get_cache_size_from_config());
341  d_enabled = d_instance->cache_enabled();
342  if (!d_enabled) {
343  delete d_instance;
344  d_instance = NULL;
345  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is DISABLED"<< endl);
346  }
347  else {
348  AT_EXIT(delete_instance);
349 
350  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::"<<__func__ << "() - " << "MDS is ENABLED"<< endl);
351  }
352  }
353 
354  BESDEBUG(DEBUG_KEY, "GlobalMetadataStore::get_instance() - d_instance: " << (void *) d_instance << endl);
355 
356  return d_instance;
357 }
359 
363 void
364 GlobalMetadataStore::initialize()
365 {
366  bool found;
367 
368  TheBESKeys::TheKeys()->get_value(LEDGER_KEY, d_ledger_name, found);
369  if (found) {
370  BESDEBUG(DEBUG_KEY, "Located BES key " << LEDGER_KEY << "=" << d_ledger_name << endl);
371  }
372  else {
373  d_ledger_name = default_ledger_name;
374  }
375 
376  ofstream of(d_ledger_name.c_str(), ios::app);
377 
378  // By default, use UTC in the logs.
379  string local_time = "no";
380  TheBESKeys::TheKeys()->get_value(LOCAL_TIME_KEY, local_time, found);
381  d_use_local_time = (local_time == "YES" || local_time == "Yes" || local_time == "yes");
382 }
383 
399 GlobalMetadataStore::GlobalMetadataStore()
400  : BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
401 {
402  initialize();
403 }
404 
405 GlobalMetadataStore::GlobalMetadataStore(const string &cache_dir, const string &prefix,
406  unsigned long long size) : BESFileLockingCache(cache_dir, prefix, size)
407 {
408  initialize();
409 }
411 
416 static void dump_time(ostream &os, bool use_local_time)
417 {
418  time_t now;
419  time(&now);
420  char buf[sizeof "YYYY-MM-DDTHH:MM:SSzone"];
421  int status = 0;
422 
423  // From StackOverflow:
424  // This will work too, if your compiler doesn't support %F or %T:
425  // strftime(buf, sizeof buf, "%Y-%m-%dT%H:%M:%S%Z", gmtime(&now));
426  //
427  // Apologies for the twisted logic - UTC is the default. Override to
428  // local time using BES.LogTimeLocal=yes in bes.conf. jhrg 11/15/17
429  if (!use_local_time)
430  status = strftime(buf, sizeof buf, "%FT%T%Z", gmtime(&now));
431  else
432  status = strftime(buf, sizeof buf, "%FT%T%Z", localtime(&now));
433 
434  if (!status)
435  LOG("Error getting time for Metadata Store ledger.");
436 
437  os << buf;
438 }
439 
443 void
445 {
446  // open just once, <- done SBL 11.7.19
447 
448  int fd; // value-result parameter;
449  if (get_exclusive_lock(d_ledger_name, fd)) {
450  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Ledger " << d_ledger_name << " write locked." << endl);
451  if (of) {
452  try {
453  dump_time(of, d_use_local_time);
454  of << " " << d_ledger_entry << endl;
455  VERBOSE("MDS Ledger name: '" << d_ledger_name << "', entry: '" << d_ledger_entry + "'.");
456  unlock_and_close(d_ledger_name); // closes fd
457  }
458  catch (...) {
459  unlock_and_close(d_ledger_name);
460  throw;
461  }
462  }
463  else {
464  LOG("Warning: Metadata store could not write to its ledger file.");
465  unlock_and_close(d_ledger_name);
466  }
467  }
468  else {
469  throw BESInternalError("Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
470  }
471 }
472 
479 string
480 GlobalMetadataStore::get_hash(const string &name)
481 {
482  if (name.empty())
483  throw BESInternalError("Empty name passed to the Metadata Store.", __FILE__, __LINE__);
484 
485  return picosha2::hash256_hex_string(name[0] == '/' ? name : "/" + name);
486 }
487 
505 {
506  if (d_dds) {
507  D4BaseTypeFactory factory;
508  DMR dmr(&factory, *d_dds);
509  XMLWriter xml;
510  dmr.print_dap4(xml);
511  os << xml.get_doc();
512  }
513  else if (d_dmr) {
514  XMLWriter xml;
515  d_dmr->print_dap4(xml);
516  os << xml.get_doc();
517  }
518  else {
519  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
520  }
521 }
522 
525  if (d_dds)
526  d_dds->print(os);
527  else if (d_dmr)
528  d_dmr->getDDS()->print(os);
529  else
530  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
531 }
532 
535  if (d_dds)
536  d_dds->print_das(os);
537  else if (d_dmr)
538  d_dmr->getDDS()->print_das(os);
539  else
540  throw BESInternalFatalError("Unknown DAP object type.", __FILE__, __LINE__);
541 }
542 
557 bool
558 GlobalMetadataStore::store_dap_response(StreamDAP &writer, const string &key, const string &name,
559  const string &response_name)
560 {
561  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " BEGIN " << key << endl);
562 
563  string item_name = get_cache_file_name(key, false /*mangle*/);
564 
565  int fd;
566  if (create_and_lock(item_name, fd)) {
567  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Storing " << item_name << endl);
568 
569  // Get an output stream directed at the locked cache file
570  ofstream response(item_name.c_str(), ios::out|ios::app);
571  if (!response.is_open())
572  throw BESInternalError("Could not open '" + key + "' to write the response.", __FILE__, __LINE__);
573 
574  try {
575  // for the different writers, look at the StreamDAP struct in the class
576  // definition. jhrg 2.27.18
577  writer(response); // different writers can write the DDS, DAS or DMR
578 
579  // Compute/update/maintain the cache size? This is extra work
580  // that might never be used. It also locks the cache...
581  if (!is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
582  // This enables the call to update_cache_info() below.
584 
585  unsigned long long size = update_cache_info(item_name);
586  if (!is_unlimited() && cache_too_big(size)) update_and_purge(item_name);
587  }
588 
589  unlock_and_close(item_name);
590  }
591  catch (...) {
592  // Bummer. There was a problem doing The Stuff. Now we gotta clean up.
593  response.close();
594  this->purge_file(item_name);
595  unlock_and_close(item_name);
596  throw;
597  }
598 
599  VERBOSE("Metadata store: Wrote " << response_name << " response for '" << name << "'." << endl);
600  d_ledger_entry.append(" ").append(key);
601 
602  return true;
603  }
604  else if (get_read_lock(item_name, fd)) {
605  // We found the key; didn't add this because it was already here
606  BESDEBUG(DEBUG_KEY,__FUNCTION__ << " Found " << item_name << " in the store already." << endl);
607  unlock_and_close(item_name);
608 
609  LOG("Metadata store: unable to store the " << response_name << " response for '" << name << "'." << endl);
610 
611  return false;
612  }
613  else {
614  throw BESInternalError("Could neither create or open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
615  }
616 }
617 
632 
644 bool
645 GlobalMetadataStore::add_responses(DDS *dds, const string &name)
646 {
647  // Start the index entry
648  d_ledger_entry = string("add DDS ").append(name);
649 
650  // I'm appending the 'dds r' string to the name before hashing so that
651  // the different hashes for the file's DDS, DAS, ..., are all very different.
652  // This will be useful if we use S3 instead of EFS for the Metadata Store.
653  //
654  // The helper() also updates the ledger string.
655  StreamDDS write_the_dds_response(dds);
656  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
657 
658  StreamDAS write_the_das_response(dds);
659  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
660 
661 #if SYMETRIC_ADD_RESPONSES
662  StreamDMR write_the_dmr_response(dds);
663  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
664 #endif
665 
666  write_ledger(); // write the index line
667 
668 #if SYMETRIC_ADD_RESPONSES
669  return (stored_dds && stored_das && stored_dmr);
670 #else
671  return (stored_dds && stored_das);
672 #endif
673 }
674 
686 bool
687 GlobalMetadataStore::add_responses(DMR *dmr, const string &name)
688 {
689  // Start the index entry
690  d_ledger_entry = string("add DMR ").append(name);
691 
692  // I'm appending the 'dds r' string to the name before hashing so that
693  // the different hashes for the file's DDS, DAS, ..., are all very different.
694  // This will be useful if we use S3 instead of EFS for the Metadata Store.
695  //
696  // The helper() also updates the ledger string.
697 #if SYMETRIC_ADD_RESPONSES
698  StreamDDS write_the_dds_response(dmr);
699  bool stored_dds = store_dap_response(write_the_dds_response, get_hash(name + "dds_r"), name, "DDS");
700 
701  StreamDAS write_the_das_response(dmr);
702  bool stored_das = store_dap_response(write_the_das_response, get_hash(name + "das_r"), name, "DAS");
703 #endif
704 
705  StreamDMR write_the_dmr_response(dmr);
706  bool stored_dmr = store_dap_response(write_the_dmr_response, get_hash(name + "dmr_r"), name, "DMR");
707 
708  write_ledger(); // write the index line
709 
710 #if SYMETRIC_ADD_RESPONSES
711  return (stored_dds && stored_das && stored_dmr);
712 #else
713  return(stored_dmr /* && stored_dmrpp */);
714 #endif
715 }
717 
731 GlobalMetadataStore::get_read_lock_helper(const string &name, const string &suffix, const string &object_name)
732 {
733  BESDEBUG(DEBUG_KEY, __func__ << "() MDS hashing name '" << name << "', '" << suffix << "'"<< endl);
734 
735  if (name.empty())
736  throw BESInternalError("An empty name string was received by "
737  "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
738 
739  string item_name = get_cache_file_name(get_hash(name + suffix), false);
740  int fd;
741  MDSReadLock lock(item_name, get_read_lock(item_name, fd), this);
742  BESDEBUG(DEBUG_KEY, __func__ << "() MDS lock for " << item_name << ": " << lock() << endl);
743 
744  if (lock())
745  LOG("MDS Cache hit for '" << name << "' and response " << object_name << endl);
746  else
747  LOG("MDS Cache miss for '" << name << "' and response " << object_name << endl);
748 
749  return lock;
750  }
751 
772 {
773  return get_read_lock_helper(name,"dmr_r","DMR");
774 }//end is_dmr_available(string)
775 
778 {
779  //call get_read_lock_helper
780  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(), "dmr_r", "DMR");
781  if (lock()){
782 
783  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmr_r");
784 
785  if(reload){
786  lock.clearLock();
787  return lock;
788  }//end if
789  else{
790  return lock;
791  }//end else
792 
793  }//end if(is locked)
794  else{
795  return lock;
796  }//end else
797 
798 }//end is_dmr_available(BESContainer)
799 
801 GlobalMetadataStore::is_dmr_available(const std::string &realName, const std::string &relativeName, const std::string &fileType)
802 {
803  //call get_read_lock_helper
804  MDSReadLock lock = get_read_lock_helper(relativeName,"dmr_r","DMR");
805  if (lock()){
806 
807  bool reload = is_available_helper(realName, relativeName, fileType, "dmr_r");
808 
809  if(reload){
810  lock.clearLock();
811  return lock;
812  }//end if
813  else{
814  return lock;
815  }//end else
816 
817  }//end if(is locked)
818  else{
819  return lock;
820  }//end else
821 
822 }//end is_dmr_available(string, string, string)
823 
833 {
834  return get_read_lock_helper(name,"dds_r","DDS");
835 }//end is_dds_available(string)
836 
849 {
850  //call get_read_lock_helper
851  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dds_r","DDS");
852  if (lock()){
853 
854  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dds_r");
855 
856  if(reload){
857  lock.clearLock();
858  return lock;
859  }//end if
860  else{
861  return lock;
862  }//end else
863 
864  }//end if(is locked)
865  else{
866  return lock;
867  }//end else
868 
869 }//end is_dds_available(BESContainer)
870 
880 {
881  return get_read_lock_helper(name,"das_r","DAS");
882 }//end is_das_available(string)
883 
886 {
887  //return get_read_lock_helper(name, "das_r", "DAS");
888  //call get_read_lock_helper
889  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"das_r","DAS");
890  if (lock()){
891 
892  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "das_r");
893 
894  if(reload){
895  lock.clearLock();
896  return lock;
897  }//end if
898  else{
899  return lock;
900  }//end else
901 
902  }//end if(is locked)
903  else{
904  return lock;
905  }//end else
906 
907 }//end is_das_available(BESContainer)
908 
929 {
930  return get_read_lock_helper(name,"dmrpp_r","DMR++");
931 }//end is_dmrpp_available(string)
932 
935 {
936  //return get_read_lock_helper(name, "dmrpp_r", "DMR++");
937  //call get_read_lock_helper
938  MDSReadLock lock = get_read_lock_helper(container.get_relative_name(),"dmrpp_r","DMR++");
939  if (lock()){
940 
941  bool reload = is_available_helper(container.get_real_name(), container.get_relative_name(), container.get_container_type(), "dmrpp_r");
942 
943  if(reload){
944  lock.clearLock();
945  return lock;
946  }//end if
947  else{
948  return lock;
949  }//end else
950 
951  }//end if(is locked)
952  else{
953  return lock;
954  }//end else
955 
956 }//end is_dmrpp_available(BESContainer)
957 
968 bool
969 GlobalMetadataStore::is_available_helper(const string &realName, const string &relativeName, const string &fileType, const string &suffix)
970 {
971  //use type with find_handler() to get handler
972  BESRequestHandler *besRH = BESRequestHandlerList::TheList()->find_handler(fileType);
973 
974  //use handler.get_lmt()
975  time_t file_time = besRH->get_lmt(realName);
976 
977  //get the cache time of the handler
978  time_t cache_time = get_cache_lmt(relativeName, suffix);
979 
980  //compare file lmt and time of creation of cache
981  if (file_time > cache_time){
982  return true;
983  }//end if(file > cache)
984  else {
985  return false;
986  }//end else
987 }
988 
996 time_t
997 GlobalMetadataStore::get_cache_lmt(const string &fileName, const string &suffix)
998 {
999  string item_name = get_cache_file_name(get_hash(fileName + suffix), false);
1000  struct stat statbuf;
1001 
1002  if (stat(item_name.c_str(), &statbuf) == -1){
1003  throw BESNotFoundError(strerror(errno), __FILE__, __LINE__);
1004  }//end if(error)
1005 
1006  return statbuf.st_mtime;
1007 }//end get_cache_lmt()
1008 
1011 
1019 void
1020 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &object_name)
1021 {
1022  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1023  int fd; // value-result parameter;
1024  if (get_read_lock(item_name, fd)) {
1025  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1026  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1027  try {
1028  transfer_bytes(fd, os);
1029  unlock_and_close(item_name); // closes fd
1030  }
1031  catch (...) {
1032  unlock_and_close(item_name);
1033  throw;
1034  }
1035  }
1036  else {
1037  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1038  }
1039 }
1040 
1049 void
1050 GlobalMetadataStore::write_response_helper(const string &name, ostream &os, const string &suffix, const string &xml_base,
1051  const string &object_name)
1052 {
1053  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1054  int fd; // value-result parameter;
1055  if (get_read_lock(item_name, fd)) {
1056  VERBOSE("Metadata store: Cache hit: read " << object_name << " response for '" << name << "'." << endl);
1057  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1058  try {
1059  insert_xml_base(fd, os, xml_base);
1060 
1061  transfer_bytes(fd, os);
1062  unlock_and_close(item_name); // closes fd
1063  }
1064  catch (...) {
1065  unlock_and_close(item_name);
1066  throw;
1067  }
1068  }
1069  else {
1070  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1071  }
1072 }
1074 
1081 void
1082 GlobalMetadataStore::write_dds_response(const std::string &name, ostream &os)
1083 {
1084  write_response_helper(name, os, "dds_r", "DDS");
1085 }
1086 
1093 void
1094 GlobalMetadataStore::write_das_response(const std::string &name, ostream &os)
1095 {
1096  write_response_helper(name, os, "das_r", "DAS");
1097 }
1098 
1105 void
1106 GlobalMetadataStore::write_dmr_response(const std::string &name, ostream &os)
1107 {
1108  bool found = false;
1109  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1110  if (!found) {
1111 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1112  write_response_helper(name, os, "dmr_r", "DMR");
1113 #else
1114  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1115 #endif
1116  }
1117  else {
1118  write_response_helper(name, os, "dmr_r", xml_base, "DMR");
1119  }
1120 }
1121 
1128 void
1129 GlobalMetadataStore::write_dmrpp_response(const std::string &name, ostream &os)
1130 {
1131  bool found = false;
1132  string xml_base = BESContextManager::TheManager()->get_context("xml:base", found);
1133  if (!found) {
1134 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1135  write_response_helper(name, os, "dmrpp_r", "DMR++");
1136 #else
1137  throw BESInternalError("Could not read the value of xml:base.", __FILE__, __LINE__);
1138 #endif
1139  }
1140  else {
1141  write_response_helper(name, os, "dmrpp_r", xml_base, "DMR++");
1142  }
1143 }
1144 
1152 bool
1153 GlobalMetadataStore::remove_response_helper(const string& name, const string &suffix, const string &object_name)
1154 {
1155  string hash = get_hash(name + suffix);
1156  if (unlink(get_cache_file_name(hash, false).c_str()) == 0) {
1157  VERBOSE("Metadata store: Removed " << object_name << " response for '" << hash << "'." << endl);
1158  d_ledger_entry.append(" ").append(hash);
1159  return true;
1160  }
1161  else {
1162  LOG("Metadata store: unable to remove the " << object_name << " response for '" << name << "' (" << strerror(errno) << ")."<< endl);
1163  }
1164 
1165  return false;
1166 }
1167 
1174 bool
1176 {
1177  // Start the index entry
1178  d_ledger_entry = string("remove ").append(name);
1179 
1180  bool removed_dds = remove_response_helper(name, "dds_r", "DDS");
1181 
1182  bool removed_das = remove_response_helper(name, "das_r", "DAS");
1183 
1184  bool removed_dmr = remove_response_helper(name, "dmr_r", "DMR");
1185 
1186  bool removed_dmrpp = remove_response_helper(name, "dmrpp_r", "DMR++");
1187 
1188  write_ledger(); // write the index line
1189 
1190 #if SYMETRIC_ADD_RESPONSES
1191  return (removed_dds && removed_das && removed_dmr);
1192 #else
1193  return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1194 #endif
1195 }
1196 
1209 DMR *
1211 {
1212  stringstream oss;
1213  write_dmr_response(name, oss); // throws BESInternalError if not found
1214 
1215  D4BaseTypeFactory d4_btf;
1216  auto_ptr<DMR> dmr(new DMR(&d4_btf, "mds"));
1217 
1218  D4ParserSax2 parser;
1219  parser.intern(oss.str(), dmr.get());
1220 
1221  dmr->set_factory(0);
1222 
1223  return dmr.release();
1224 }
1225 
1247 DDS *
1249 {
1250  TempFile dds_tmp(get_cache_directory() + "/opendapXXXXXX");
1251 
1252  fstream dds_fs(dds_tmp.get_name().c_str(), std::fstream::out);
1253  try {
1254  write_dds_response(name, dds_fs); // throws BESInternalError if not found
1255  dds_fs.close();
1256  }
1257  catch (...) {
1258  dds_fs.close();
1259  throw;
1260  }
1261 
1262  BaseTypeFactory btf;
1263  auto_ptr<DDS> dds(new DDS(&btf));
1264  dds->parse(dds_tmp.get_name());
1265 
1266  TempFile das_tmp(get_cache_directory() + "/opendapXXXXXX");
1267  fstream das_fs(das_tmp.get_name().c_str(), std::fstream::out);
1268  try {
1269  write_das_response(name, das_fs); // throws BESInternalError if not found
1270  das_fs.close();
1271  }
1272  catch (...) {
1273  das_fs.close();
1274  throw;
1275  }
1276 
1277  auto_ptr<DAS> das(new DAS());
1278  das->parse(das_tmp.get_name());
1279 
1280  dds->transfer_attributes(das.get());
1281  dds->set_factory(0);
1282 
1283  return dds.release();
1284 }
1285 
1286 
1287 void
1288 GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das, const std::string &name) {
1289  string suffix = "das_r";
1290  string item_name = get_cache_file_name(get_hash(name + suffix), false);
1291  int fd; // value-result parameter;
1292  if (get_read_lock(item_name, fd)) {
1293  VERBOSE("Metadata store: Cache hit: read " << " response for '" << name << "'." << endl);
1294  BESDEBUG(DEBUG_KEY, __FUNCTION__ << " Found " << item_name << " in the store." << endl);
1295  try {
1296  // Just generate the DAS by parsing from the file
1297  das->parse(item_name);
1298  unlock_and_close(item_name); // closes fd
1299  }
1300  catch (...) {
1301  unlock_and_close(item_name);
1302  throw;
1303  }
1304  }
1305  else {
1306  throw BESInternalError("Could not open '" + item_name + "' in the metadata store.", __FILE__, __LINE__);
1307  }
1308 
1309 }
1310 
A container is something that holds data. E.G., a netcdf file or a database entry.
Definition: BESContainer.h:65
std::string get_relative_name() const
Get the relative name of the object in this container.
Definition: BESContainer.h:186
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
Definition: BESContainer.h:232
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
Definition: BESContainer.h:180
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Implementation of a caching mechanism for compressed data.
virtual void unlock_and_close(const std::string &target)
const std::string get_cache_directory()
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
error thrown if the resource requested cannot be found
virtual BESRequestHandler * find_handler(const std::string &handler_name)
find and return the specified request handler
Represents a specific data type request handler.
virtual time_t get_lmt(const std::string &name)
Get the Last modified time for.
static std::string lowercase(const std::string &s)
Definition: BESUtil.cc:200
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
Definition: TheBESKeys.cc:272
static TheBESKeys * TheKeys()
Definition: TheBESKeys.cc:62
Store the DAP metadata responses.
virtual libdap::DDS * get_dds_object(const std::string &name)
Build a DDS object from the cached Response.
virtual void write_dmr_response(const std::string &name, std::ostream &os)
Write the stored DMR response to a stream.
void write_response_helper(const std::string &name, std::ostream &os, const std::string &suffix, const std::string &object_name)
std::string get_hash(const std::string &name)
static void transfer_bytes(int fd, std::ostream &os)
virtual bool remove_responses(const std::string &name)
Remove all cached responses and objects for a granule.
virtual void write_dds_response(const std::string &name, std::ostream &os)
Write the stored DDS response to a stream.
void initialize()
Configure the ledger using LEDGER_KEY and LOCAL_TIME_KEY.
bool store_dap_response(StreamDAP &writer, const std::string &key, const std::string &name, const std::string &response_name)
static void insert_xml_base(int fd, std::ostream &os, const std::string &xml_base)
like transfer_bytes(), but adds the xml:base attribute to the DMR/++
virtual void write_das_response(const std::string &name, std::ostream &os)
Write the stored DAS response to a stream.
bool remove_response_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual MDSReadLock is_dmrpp_available(const std::string &name)
Is the DMR++ response for.
virtual MDSReadLock is_dds_available(const std::string &name)
Is the DDS response for.
virtual libdap::DMR * get_dmr_object(const std::string &name)
Build a DMR object from the cached Response.
MDSReadLock get_read_lock_helper(const std::string &name, const std::string &suffix, const std::string &object_name)
virtual time_t get_cache_lmt(const std::string &fileName, const std::string &suffix)
Get the last modified time for the cached object file.
virtual MDSReadLock is_dmr_available(const std::string &name)
Is the DMR response for.
virtual MDSReadLock is_das_available(const std::string &name)
Is the DAS response for.
virtual bool is_available_helper(const std::string &realName, const std::string &relativeName, const std::string &fileType, const std::string &suffix)
helper function that checks if last modified time is greater than cached file
virtual bool add_responses(libdap::DDS *dds, const std::string &name)
Add the DAP2 metadata responses using a DDS.
virtual void write_dmrpp_response(const std::string &name, std::ostream &os)
Write the stored DMR++ response to a stream.
Get a new temporary file.
Definition: TempFile.h:46
std::string get_name() const
Definition: TempFile.h:70
Unlock and close the MDS item when the ReadLock goes out of scope.
Instantiate with a DDS or DMR and use to write the DAS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DDS response.
virtual void operator()(std::ostream &os)
Instantiate with a DDS or DMR and use to write the DMR response.
virtual void operator()(std::ostream &os)
Use an object (DDS or DMR) to write data to the MDS.