00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "libts.h"
00030 
00031 #include "Error.h"
00032 
00033 #include "LogUtils.h"
00034 #include "LogSock.h"
00035 #include "LogField.h"
00036 #include "LogFile.h"
00037 #include "LogFormat.h"
00038 #include "LogBuffer.h"
00039 #include "LogHost.h"
00040 #include "LogObject.h"
00041 #include "LogConfig.h"
00042 #include "Log.h"
00043 
00044 #include "LogCollationClientSM.h"
00045 
00046 #define PING    true
00047 #define NOPING  false
00048 
00049 
00050 
00051 
00052 
00053 LogHost::LogHost(const char *object_filename, uint64_t object_signature)
00054   : m_object_filename(ats_strdup(object_filename)),
00055     m_object_signature(object_signature),
00056     m_port(0),
00057     m_name(NULL),
00058     m_sock(NULL),
00059     m_sock_fd(-1),
00060     m_connected(false),
00061     m_orphan_file(NULL)
00062   , m_log_collation_client_sm(NULL)
00063 {
00064   ink_zero(m_ip);
00065   ink_zero(m_ipstr);
00066 }
00067 
00068 LogHost::LogHost(const LogHost & rhs)
00069   : m_object_filename(ats_strdup(rhs.m_object_filename)),
00070     m_object_signature(rhs.m_object_signature),
00071     m_ip(rhs.m_ip),
00072     m_port(0),
00073     m_name(ats_strdup(rhs.m_name)),
00074     m_sock(NULL),
00075     m_sock_fd(-1),
00076     m_connected(false),
00077     m_orphan_file(NULL)
00078   , m_log_collation_client_sm(NULL)
00079 {
00080   memcpy(m_ipstr, rhs.m_ipstr, sizeof(m_ipstr));
00081   create_orphan_LogFile_object();
00082 }
00083 
00084 LogHost::~LogHost()
00085 {
00086   clear();
00087   ats_free(m_object_filename);
00088 }
00089 
00090 
00091 
00092 
00093 
00094 
00095 
00096 
00097 int
00098 LogHost::set_name_port(char *hostname, unsigned int pt)
00099 {
00100   if (!hostname || hostname[0] == 0) {
00101     Note("Cannot establish LogHost with NULL hostname");
00102     return 1;
00103   }
00104 
00105   clear();                      
00106 
00107   m_name = ats_strdup(hostname);
00108   m_port = pt;
00109 
00110   Debug("log-host", "LogHost established as %s:%u", this->name(), this->port());
00111 
00112   create_orphan_LogFile_object();
00113   return 0;
00114 }
00115 
00116 int
00117 LogHost::set_ipstr_port(char *ipstr, unsigned int pt)
00118 {
00119   if (!ipstr || ipstr[0] == 0) {
00120     Note("Cannot establish LogHost with NULL ipstr");
00121     return 1;
00122   }
00123 
00124   clear();                      
00125 
00126   if (0 != m_ip.load(ipstr))
00127     Note("Log host failed to parse IP address %s", ipstr);
00128   m_port = pt;
00129   ink_strlcpy(m_ipstr, ipstr, sizeof(m_ipstr));
00130   m_name = ats_strdup(ipstr);
00131 
00132   Debug("log-host", "LogHost established as %s:%u", name(), pt);
00133 
00134   create_orphan_LogFile_object();
00135   return 0;
00136 }
00137 
00138 int
00139 LogHost::set_name_or_ipstr(char *name_or_ip)
00140 {
00141   int retVal = 1;
00142 
00143   if (name_or_ip && name_or_ip[0] != 0) {
00144     ts::ConstBuffer addr, port;
00145     if (ats_ip_parse(ts::ConstBuffer(name_or_ip, strlen(name_or_ip)), &addr, &port)==0) {
00146       uint16_t p = port ? atoi(port.data()) : Log::config->collation_port;
00147       char* n = const_cast<char*>(addr.data());
00148       
00149       
00150       n[addr.size()] = 0;
00151       if (AF_UNSPEC == ats_ip_check_characters(addr)) {
00152         retVal = set_name_port(n, p);
00153       } else {
00154         retVal = set_ipstr_port(n, p);
00155       }
00156     }
00157   }
00158   return retVal;
00159 }
00160 
00161 bool LogHost::connected(bool ping)
00162 {
00163   if (m_connected && m_sock && m_sock_fd >= 0) {
00164     if (m_sock->is_connected(m_sock_fd, ping)) {
00165       return true;
00166     }
00167   }
00168   return false;
00169 }
00170 
00171 bool LogHost::connect()
00172 {
00173   if (! m_ip.isValid()) {
00174     Note("Cannot connect to LogHost; host IP has not been established");
00175     return false;
00176   }
00177 
00178   if (connected(PING)) {
00179     return true;
00180   }
00181 
00182   IpEndpoint target;
00183   ip_port_text_buffer ipb;
00184   target.assign(m_ip, htons(m_port));
00185 
00186   if (is_debug_tag_set("log-host")) {
00187     Debug("log-host", "Connecting to LogHost %s", ats_ip_nptop(&target, ipb, sizeof ipb));
00188   }
00189 
00190   disconnect();                 
00191 
00192   if (m_sock == NULL) {
00193     m_sock = new LogSock();
00194     ink_assert(m_sock != NULL);
00195   }
00196   m_sock_fd = m_sock->connect(&target.sa);
00197   if (m_sock_fd < 0) {
00198     Note("Connection to LogHost %s failed", ats_ip_nptop(&target, ipb, sizeof ipb));
00199     return false;
00200   }
00201   m_connected = true;
00202 
00203   if (!authenticated()) {
00204     Note("Authentication to LogHost %s failed", ats_ip_nptop(&target, ipb, sizeof ipb));
00205     disconnect();
00206     return false;
00207   }
00208 
00209   return true;
00210 }
00211 
00212 void
00213 LogHost::disconnect()
00214 {
00215   if (m_sock && m_sock_fd >= 0) {
00216     m_sock->close(m_sock_fd);
00217     m_sock_fd = -1;
00218   }
00219   if (m_log_collation_client_sm) {
00220     delete m_log_collation_client_sm;
00221     m_log_collation_client_sm = NULL;
00222   }
00223   m_connected = false;
00224 }
00225 
00226 
00227 void
00228 LogHost::create_orphan_LogFile_object()
00229 {
00230   delete m_orphan_file;
00231 
00232   const char *orphan_ext = "orphan";
00233   unsigned name_len = (unsigned) (strlen(m_object_filename) + strlen(name()) + strlen(orphan_ext) + 16);
00234   char *name_buf = (char *)ats_malloc(name_len);
00235 
00236   
00237   
00238   snprintf(name_buf, name_len, "%s%s%s-%u.%s",
00239                m_object_filename, LOGFILE_SEPARATOR_STRING, name(), port(), orphan_ext);
00240 
00241   
00242   
00243   m_orphan_file = new LogFile(name_buf, NULL, LOG_FILE_ASCII, m_object_signature);
00244   ink_assert(m_orphan_file != NULL);
00245   ats_free(name_buf);
00246 }
00247 
00248 
00249 
00250 
00251 
00252 int
00253 LogHost::preproc_and_try_delete (LogBuffer *lb)
00254 {
00255   int ret = -1;
00256 
00257   if (lb == NULL) {
00258     Note("Cannot write LogBuffer to LogHost %s; LogBuffer is NULL", name());
00259     return -1;
00260   }
00261   LogBufferHeader *buffer_header = lb->header();
00262   if (buffer_header == NULL) {
00263     Note("Cannot write LogBuffer to LogHost %s; LogBufferHeader is NULL",
00264         name());
00265     goto done;
00266   }
00267   if (buffer_header->entry_count == 0) {
00268     
00269     goto done;
00270   }
00271 
00272   
00273   if (m_log_collation_client_sm == NULL) {
00274     m_log_collation_client_sm = new LogCollationClientSM(this);
00275     ink_assert(m_log_collation_client_sm != NULL);
00276   }
00277 
00278   
00279   if (m_log_collation_client_sm->send(lb) <= 0)
00280     goto done;
00281 
00282   return 0;
00283 
00284 done:
00285   LogBuffer::destroy(lb);
00286   return ret;
00287 }
00288 
00289 
00290 
00291 
00292 
00293 void
00294 LogHost::orphan_write_and_try_delete(LogBuffer * lb)
00295 {
00296   RecIncrRawStat(log_rsb, this_thread()->mutex->thread_holding,
00297                  log_stat_num_lost_before_sent_to_network_stat,
00298                  lb->header()->entry_count);
00299 
00300   RecIncrRawStat(log_rsb, this_thread()->mutex->thread_holding,
00301                  log_stat_bytes_lost_before_sent_to_network_stat,
00302                  lb->header()->byte_count);
00303 
00304   if (!Log::config->logging_space_exhausted) {
00305     Debug("log-host", "Sending LogBuffer to orphan file %s", m_orphan_file->get_name());
00306     m_orphan_file->preproc_and_try_delete(lb);
00307   } else {
00308     Debug("log-host", "logging space exhausted, failed to write orphan file, drop(%" PRIu32 ") bytes",
00309          lb->header()->byte_count);
00310     LogBuffer::destroy(lb);
00311   }
00312 }
00313 
00314 void
00315 LogHost::display(FILE * fd)
00316 {
00317   fprintf(fd, "LogHost: %s:%u, %s\n", name(), port(), (connected(NOPING)) ? "connected" : "not connected");
00318 }
00319 
00320 void
00321 LogHost::clear()
00322 {
00323   
00324 
00325   disconnect();
00326 
00327   ats_free(m_name);
00328   delete m_sock;
00329   m_orphan_file.clear();
00330 
00331   ink_zero(m_ip);
00332   m_port = 0;
00333   ink_zero(m_ipstr);
00334   m_name = NULL;
00335   m_sock = NULL;
00336   m_sock_fd = -1;
00337   m_connected = false;
00338 }
00339 
00340 bool LogHost::authenticated()
00341 {
00342   if (!connected(NOPING)) {
00343     Note("Cannot authenticate LogHost %s; not connected", name());
00344     return false;
00345   }
00346 
00347   Debug("log-host", "Authenticating LogHost %s ...", name());
00348   char *
00349     auth_key = Log::config->collation_secret;
00350   unsigned
00351     auth_key_len = (unsigned)::strlen(auth_key) + 1;    
00352   int
00353     bytes = m_sock->write(m_sock_fd, auth_key, auth_key_len);
00354   if ((unsigned) bytes != auth_key_len) {
00355     Debug("log-host", "... bad write on authenticate");
00356     return false;
00357   }
00358 
00359   Debug("log-host", "... authenticated");
00360   return true;
00361 }
00362 
00363 
00364 
00365 
00366 
00367 LogHostList::LogHostList()
00368 {
00369 }
00370 
00371 LogHostList::~LogHostList()
00372 {
00373   clear();
00374 }
00375 
00376 void
00377 LogHostList::add(LogHost * object, bool copy)
00378 {
00379   ink_assert(object != NULL);
00380   if (copy) {
00381     m_host_list.enqueue(new LogHost(*object));
00382   } else {
00383     m_host_list.enqueue(object);
00384   }
00385 }
00386 
00387 unsigned
00388 LogHostList::count()
00389 {
00390   unsigned cnt = 0;
00391   for (LogHost * host = first(); host; host = next(host)) {
00392     cnt++;
00393   }
00394   return cnt;
00395 }
00396 
00397 void
00398 LogHostList::clear()
00399 {
00400   LogHost *host;
00401   while ((host = m_host_list.dequeue())) {
00402     delete host;
00403   }
00404 }
00405 
00406 int
00407 LogHostList::preproc_and_try_delete(LogBuffer * lb)
00408 {
00409   int ret;
00410   unsigned nr_host, nr;
00411   bool need_orphan = true;
00412   LogHost *available_host = NULL;
00413 
00414   ink_release_assert(lb->m_references == 0);
00415 
00416   nr_host = nr = count();
00417   ink_atomic_increment(&lb->m_references, nr_host);
00418 
00419   for (LogHost * host = first(); host && nr; host = next(host)) {
00420     LogHost *lh = host;
00421     available_host = lh;
00422 
00423     do {
00424       ink_atomic_increment(&lb->m_references, 1);
00425       ret = lh->preproc_and_try_delete(lb);
00426       need_orphan = need_orphan && (ret < 0);
00427     } while (ret < 0 && (lh = lh->failover_link.next));
00428 
00429     nr--;
00430   }
00431 
00432   if (need_orphan && available_host) {
00433     ink_atomic_increment(&lb->m_references, 1);
00434     available_host->orphan_write_and_try_delete(lb);
00435   }
00436 
00437   LogBuffer::destroy(lb);
00438   return 0;
00439 }
00440 
00441 void
00442 LogHostList::display(FILE * fd)
00443 {
00444   for (LogHost * host = first(); host; host = next(host)) {
00445     host->display(fd);
00446   }
00447 }
00448 
00449 bool LogHostList::operator==(LogHostList & rhs)
00450 {
00451   LogHost *
00452     host;
00453   for (host = first(); host; host = next(host)) {
00454     LogHost* rhs_host;
00455     for (rhs_host = rhs.first(); rhs_host; rhs_host = next(host)) {
00456       if ((host->port() == rhs_host->port() && host->ip_addr().isValid() && host->ip_addr() == rhs_host->ip_addr()) ||
00457         (host->name() && rhs_host->name() && (strcmp(host->name(), rhs_host->name()) == 0)) ||
00458         (*(host->ipstr()) && *(rhs_host->ipstr()) && (strcmp(host->ipstr(), rhs_host->ipstr()) == 0))
00459       ) {
00460         break;
00461       }
00462     }
00463     if (rhs_host == NULL) {
00464       return false;
00465     }
00466   }
00467   return true;
00468 }
00469 
00470 int
00471 LogHostList::do_filesystem_checks()
00472 {
00473   for (LogHost * host = first(); host; host = next(host)) {
00474     if (host->do_filesystem_checks() < 0) {
00475       return -1;
00476     }
00477   }
00478   return 0;
00479 }