00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "libts.h"
00030
00031 #include "Error.h"
00032
00033 #include "LogUtils.h"
00034 #include "LogSock.h"
00035 #include "LogField.h"
00036 #include "LogFile.h"
00037 #include "LogFormat.h"
00038 #include "LogBuffer.h"
00039 #include "LogHost.h"
00040 #include "LogObject.h"
00041 #include "LogConfig.h"
00042 #include "Log.h"
00043
00044 #include "LogCollationClientSM.h"
00045
// Named values for the `ping` argument of LogHost::connected(); the flag is
// forwarded unchanged to LogSock::is_connected().
00046 #define PING true
00047 #define NOPING false
00048
00049
00050
00051
00052
00053 LogHost::LogHost(const char *object_filename, uint64_t object_signature)
00054 : m_object_filename(ats_strdup(object_filename)),
00055 m_object_signature(object_signature),
00056 m_port(0),
00057 m_name(NULL),
00058 m_sock(NULL),
00059 m_sock_fd(-1),
00060 m_connected(false),
00061 m_orphan_file(NULL)
00062 , m_log_collation_client_sm(NULL)
00063 {
00064 ink_zero(m_ip);
00065 ink_zero(m_ipstr);
00066 }
00067
00068 LogHost::LogHost(const LogHost & rhs)
00069 : m_object_filename(ats_strdup(rhs.m_object_filename)),
00070 m_object_signature(rhs.m_object_signature),
00071 m_ip(rhs.m_ip),
00072 m_port(0),
00073 m_name(ats_strdup(rhs.m_name)),
00074 m_sock(NULL),
00075 m_sock_fd(-1),
00076 m_connected(false),
00077 m_orphan_file(NULL)
00078 , m_log_collation_client_sm(NULL)
00079 {
00080 memcpy(m_ipstr, rhs.m_ipstr, sizeof(m_ipstr));
00081 create_orphan_LogFile_object();
00082 }
00083
00084 LogHost::~LogHost()
00085 {
00086 clear();
00087 ats_free(m_object_filename);
00088 }
00089
00090
00091
00092
00093
00094
00095
00096
00097 int
00098 LogHost::set_name_port(char *hostname, unsigned int pt)
00099 {
00100 if (!hostname || hostname[0] == 0) {
00101 Note("Cannot establish LogHost with NULL hostname");
00102 return 1;
00103 }
00104
00105 clear();
00106
00107 m_name = ats_strdup(hostname);
00108 m_port = pt;
00109
00110 Debug("log-host", "LogHost established as %s:%u", this->name(), this->port());
00111
00112 create_orphan_LogFile_object();
00113 return 0;
00114 }
00115
00116 int
00117 LogHost::set_ipstr_port(char *ipstr, unsigned int pt)
00118 {
00119 if (!ipstr || ipstr[0] == 0) {
00120 Note("Cannot establish LogHost with NULL ipstr");
00121 return 1;
00122 }
00123
00124 clear();
00125
00126 if (0 != m_ip.load(ipstr))
00127 Note("Log host failed to parse IP address %s", ipstr);
00128 m_port = pt;
00129 ink_strlcpy(m_ipstr, ipstr, sizeof(m_ipstr));
00130 m_name = ats_strdup(ipstr);
00131
00132 Debug("log-host", "LogHost established as %s:%u", name(), pt);
00133
00134 create_orphan_LogFile_object();
00135 return 0;
00136 }
00137
00138 int
00139 LogHost::set_name_or_ipstr(char *name_or_ip)
00140 {
00141 int retVal = 1;
00142
00143 if (name_or_ip && name_or_ip[0] != 0) {
00144 ts::ConstBuffer addr, port;
00145 if (ats_ip_parse(ts::ConstBuffer(name_or_ip, strlen(name_or_ip)), &addr, &port)==0) {
00146 uint16_t p = port ? atoi(port.data()) : Log::config->collation_port;
00147 char* n = const_cast<char*>(addr.data());
00148
00149
00150 n[addr.size()] = 0;
00151 if (AF_UNSPEC == ats_ip_check_characters(addr)) {
00152 retVal = set_name_port(n, p);
00153 } else {
00154 retVal = set_ipstr_port(n, p);
00155 }
00156 }
00157 }
00158 return retVal;
00159 }
00160
00161 bool LogHost::connected(bool ping)
00162 {
00163 if (m_connected && m_sock && m_sock_fd >= 0) {
00164 if (m_sock->is_connected(m_sock_fd, ping)) {
00165 return true;
00166 }
00167 }
00168 return false;
00169 }
00170
00171 bool LogHost::connect()
00172 {
00173 if (! m_ip.isValid()) {
00174 Note("Cannot connect to LogHost; host IP has not been established");
00175 return false;
00176 }
00177
00178 if (connected(PING)) {
00179 return true;
00180 }
00181
00182 IpEndpoint target;
00183 ip_port_text_buffer ipb;
00184 target.assign(m_ip, htons(m_port));
00185
00186 if (is_debug_tag_set("log-host")) {
00187 Debug("log-host", "Connecting to LogHost %s", ats_ip_nptop(&target, ipb, sizeof ipb));
00188 }
00189
00190 disconnect();
00191
00192 if (m_sock == NULL) {
00193 m_sock = new LogSock();
00194 ink_assert(m_sock != NULL);
00195 }
00196 m_sock_fd = m_sock->connect(&target.sa);
00197 if (m_sock_fd < 0) {
00198 Note("Connection to LogHost %s failed", ats_ip_nptop(&target, ipb, sizeof ipb));
00199 return false;
00200 }
00201 m_connected = true;
00202
00203 if (!authenticated()) {
00204 Note("Authentication to LogHost %s failed", ats_ip_nptop(&target, ipb, sizeof ipb));
00205 disconnect();
00206 return false;
00207 }
00208
00209 return true;
00210 }
00211
00212 void
00213 LogHost::disconnect()
00214 {
00215 if (m_sock && m_sock_fd >= 0) {
00216 m_sock->close(m_sock_fd);
00217 m_sock_fd = -1;
00218 }
00219 if (m_log_collation_client_sm) {
00220 delete m_log_collation_client_sm;
00221 m_log_collation_client_sm = NULL;
00222 }
00223 m_connected = false;
00224 }
00225
00226
00227 void
00228 LogHost::create_orphan_LogFile_object()
00229 {
00230 delete m_orphan_file;
00231
00232 const char *orphan_ext = "orphan";
00233 unsigned name_len = (unsigned) (strlen(m_object_filename) + strlen(name()) + strlen(orphan_ext) + 16);
00234 char *name_buf = (char *)ats_malloc(name_len);
00235
00236
00237
00238 snprintf(name_buf, name_len, "%s%s%s-%u.%s",
00239 m_object_filename, LOGFILE_SEPARATOR_STRING, name(), port(), orphan_ext);
00240
00241
00242
00243 m_orphan_file = new LogFile(name_buf, NULL, LOG_FILE_ASCII, m_object_signature);
00244 ink_assert(m_orphan_file != NULL);
00245 ats_free(name_buf);
00246 }
00247
00248
00249
00250
00251
00252 int
00253 LogHost::preproc_and_try_delete (LogBuffer *lb)
00254 {
00255 int ret = -1;
00256
00257 if (lb == NULL) {
00258 Note("Cannot write LogBuffer to LogHost %s; LogBuffer is NULL", name());
00259 return -1;
00260 }
00261 LogBufferHeader *buffer_header = lb->header();
00262 if (buffer_header == NULL) {
00263 Note("Cannot write LogBuffer to LogHost %s; LogBufferHeader is NULL",
00264 name());
00265 goto done;
00266 }
00267 if (buffer_header->entry_count == 0) {
00268
00269 goto done;
00270 }
00271
00272
00273 if (m_log_collation_client_sm == NULL) {
00274 m_log_collation_client_sm = new LogCollationClientSM(this);
00275 ink_assert(m_log_collation_client_sm != NULL);
00276 }
00277
00278
00279 if (m_log_collation_client_sm->send(lb) <= 0)
00280 goto done;
00281
00282 return 0;
00283
00284 done:
00285 LogBuffer::destroy(lb);
00286 return ret;
00287 }
00288
00289
00290
00291
00292
00293 void
00294 LogHost::orphan_write_and_try_delete(LogBuffer * lb)
00295 {
00296 RecIncrRawStat(log_rsb, this_thread()->mutex->thread_holding,
00297 log_stat_num_lost_before_sent_to_network_stat,
00298 lb->header()->entry_count);
00299
00300 RecIncrRawStat(log_rsb, this_thread()->mutex->thread_holding,
00301 log_stat_bytes_lost_before_sent_to_network_stat,
00302 lb->header()->byte_count);
00303
00304 if (!Log::config->logging_space_exhausted) {
00305 Debug("log-host", "Sending LogBuffer to orphan file %s", m_orphan_file->get_name());
00306 m_orphan_file->preproc_and_try_delete(lb);
00307 } else {
00308 Debug("log-host", "logging space exhausted, failed to write orphan file, drop(%" PRIu32 ") bytes",
00309 lb->header()->byte_count);
00310 LogBuffer::destroy(lb);
00311 }
00312 }
00313
00314 void
00315 LogHost::display(FILE * fd)
00316 {
00317 fprintf(fd, "LogHost: %s:%u, %s\n", name(), port(), (connected(NOPING)) ? "connected" : "not connected");
00318 }
00319
00320 void
00321 LogHost::clear()
00322 {
00323
00324
00325 disconnect();
00326
00327 ats_free(m_name);
00328 delete m_sock;
00329 m_orphan_file.clear();
00330
00331 ink_zero(m_ip);
00332 m_port = 0;
00333 ink_zero(m_ipstr);
00334 m_name = NULL;
00335 m_sock = NULL;
00336 m_sock_fd = -1;
00337 m_connected = false;
00338 }
00339
00340 bool LogHost::authenticated()
00341 {
00342 if (!connected(NOPING)) {
00343 Note("Cannot authenticate LogHost %s; not connected", name());
00344 return false;
00345 }
00346
00347 Debug("log-host", "Authenticating LogHost %s ...", name());
00348 char *
00349 auth_key = Log::config->collation_secret;
00350 unsigned
00351 auth_key_len = (unsigned)::strlen(auth_key) + 1;
00352 int
00353 bytes = m_sock->write(m_sock_fd, auth_key, auth_key_len);
00354 if ((unsigned) bytes != auth_key_len) {
00355 Debug("log-host", "... bad write on authenticate");
00356 return false;
00357 }
00358
00359 Debug("log-host", "... authenticated");
00360 return true;
00361 }
00362
00363
00364
00365
00366
00367 LogHostList::LogHostList()
00368 {
00369 }
00370
00371 LogHostList::~LogHostList()
00372 {
00373 clear();
00374 }
00375
00376 void
00377 LogHostList::add(LogHost * object, bool copy)
00378 {
00379 ink_assert(object != NULL);
00380 if (copy) {
00381 m_host_list.enqueue(new LogHost(*object));
00382 } else {
00383 m_host_list.enqueue(object);
00384 }
00385 }
00386
00387 unsigned
00388 LogHostList::count()
00389 {
00390 unsigned cnt = 0;
00391 for (LogHost * host = first(); host; host = next(host)) {
00392 cnt++;
00393 }
00394 return cnt;
00395 }
00396
00397 void
00398 LogHostList::clear()
00399 {
00400 LogHost *host;
00401 while ((host = m_host_list.dequeue())) {
00402 delete host;
00403 }
00404 }
00405
00406 int
00407 LogHostList::preproc_and_try_delete(LogBuffer * lb)
00408 {
00409 int ret;
00410 unsigned nr_host, nr;
00411 bool need_orphan = true;
00412 LogHost *available_host = NULL;
00413
00414 ink_release_assert(lb->m_references == 0);
00415
00416 nr_host = nr = count();
00417 ink_atomic_increment(&lb->m_references, nr_host);
00418
00419 for (LogHost * host = first(); host && nr; host = next(host)) {
00420 LogHost *lh = host;
00421 available_host = lh;
00422
00423 do {
00424 ink_atomic_increment(&lb->m_references, 1);
00425 ret = lh->preproc_and_try_delete(lb);
00426 need_orphan = need_orphan && (ret < 0);
00427 } while (ret < 0 && (lh = lh->failover_link.next));
00428
00429 nr--;
00430 }
00431
00432 if (need_orphan && available_host) {
00433 ink_atomic_increment(&lb->m_references, 1);
00434 available_host->orphan_write_and_try_delete(lb);
00435 }
00436
00437 LogBuffer::destroy(lb);
00438 return 0;
00439 }
00440
00441 void
00442 LogHostList::display(FILE * fd)
00443 {
00444 for (LogHost * host = first(); host; host = next(host)) {
00445 host->display(fd);
00446 }
00447 }
00448
00449 bool LogHostList::operator==(LogHostList & rhs)
00450 {
00451 LogHost *
00452 host;
00453 for (host = first(); host; host = next(host)) {
00454 LogHost* rhs_host;
00455 for (rhs_host = rhs.first(); rhs_host; rhs_host = next(host)) {
00456 if ((host->port() == rhs_host->port() && host->ip_addr().isValid() && host->ip_addr() == rhs_host->ip_addr()) ||
00457 (host->name() && rhs_host->name() && (strcmp(host->name(), rhs_host->name()) == 0)) ||
00458 (*(host->ipstr()) && *(rhs_host->ipstr()) && (strcmp(host->ipstr(), rhs_host->ipstr()) == 0))
00459 ) {
00460 break;
00461 }
00462 }
00463 if (rhs_host == NULL) {
00464 return false;
00465 }
00466 }
00467 return true;
00468 }
00469
00470 int
00471 LogHostList::do_filesystem_checks()
00472 {
00473 for (LogHost * host = first(); host; host = next(host)) {
00474 if (host->do_filesystem_checks() < 0) {
00475 return -1;
00476 }
00477 }
00478 return 0;
00479 }