00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "libts.h"
00029
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <string.h>
00033
00034 #include "Error.h"
00035 #include "P_EventSystem.h"
00036 #include "LogField.h"
00037 #include "LogFilter.h"
00038 #include "LogFormat.h"
00039 #include "LogUtils.h"
00040 #include "LogFile.h"
00041 #include "LogHost.h"
00042 #include "LogObject.h"
00043 #include "LogAccess.h"
00044 #include "LogConfig.h"
00045 #include "LogBuffer.h"
00046 #include "Log.h"
00047
00048
00049 struct FieldListCacheElement
00050 {
00051 LogFieldList *fieldlist;
00052 char *symbol_str;
00053 };
00054
00055 enum
00056 {
00057 FIELDLIST_CACHE_SIZE = 256
00058 };
00059
00060 FieldListCacheElement fieldlist_cache[FIELDLIST_CACHE_SIZE];
00061 int fieldlist_cache_entries = 0;
00062 vint32 LogBuffer::M_ID = 0;
00063
00064
00065
00066
00067
00068
00069 char *
00070 LogBufferHeader::fmt_name()
00071 {
00072 char *addr = NULL;
00073 if (fmt_name_offset) {
00074 addr = (char *) this + fmt_name_offset;
00075 }
00076 return addr;
00077 }
00078
00079 char *
00080 LogBufferHeader::fmt_fieldlist()
00081 {
00082 char *addr = NULL;
00083 if (fmt_fieldlist_offset) {
00084 addr = (char *) this + fmt_fieldlist_offset;
00085 }
00086 return addr;
00087 }
00088
00089 char *
00090 LogBufferHeader::fmt_printf()
00091 {
00092 char *addr = NULL;
00093 if (fmt_printf_offset) {
00094 addr = (char *) this + fmt_printf_offset;
00095 }
00096 return addr;
00097 }
00098
00099 char *
00100 LogBufferHeader::src_hostname()
00101 {
00102 char *addr = NULL;
00103 if (src_hostname_offset) {
00104 addr = (char *) this + src_hostname_offset;
00105 }
00106 return addr;
00107 }
00108
00109 char *
00110 LogBufferHeader::log_filename()
00111 {
00112 char *addr = NULL;
00113 if (log_filename_offset) {
00114 addr = (char *) this + log_filename_offset;
00115 }
00116 return addr;
00117 }
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127 LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t write_align):
00128 m_size(size),
00129 m_buf_align(buf_align),
00130 m_write_align(write_align), m_owner(owner),
00131 m_references(0)
00132 {
00133 size_t hdr_size;
00134
00135
00136
00137 m_unaligned_buffer = new char [size + buf_align];
00138 m_buffer = (char *)align_pointer_forward(m_unaligned_buffer, buf_align);
00139
00140
00141 hdr_size = _add_buffer_header();
00142
00143
00144 m_state.s.offset = hdr_size;
00145
00146
00147 m_id = (uint32_t) ink_atomic_increment((pvint32) & M_ID, 1);
00148
00149 m_expiration_time = LogUtils::timestamp() + Log::config->max_secs_per_buffer;
00150
00151 Debug("log-logbuffer","[%p] Created buffer %u for %s at address %p, size %d",
00152 this_ethread(), m_id, m_owner->get_base_filename(), m_buffer, (int)size);
00153 }
00154
00155 LogBuffer::LogBuffer(LogObject * owner, LogBufferHeader * header):
00156 m_unaligned_buffer(NULL),
00157 m_buffer((char *) header),
00158 m_size(0),
00159 m_buf_align(LB_DEFAULT_ALIGN),
00160 m_write_align(INK_MIN_ALIGN), m_expiration_time(0), m_owner(owner), m_header(header),
00161 m_references(0)
00162 {
00163
00164
00165
00166
00167
00168
00169
00170 m_id = (uint32_t) ink_atomic_increment((pvint32) & M_ID, 1);
00171
00172 Debug("log-logbuffer","[%p] Created repurposed buffer %u for %s at address %p",
00173 this_ethread(), m_id, m_owner->get_base_filename(), m_buffer);
00174 }
00175
00176 LogBuffer::~LogBuffer()
00177 {
00178 Debug("log-logbuffer", "[%p] Deleting buffer %u at address %p",
00179 this_ethread(), m_id, m_unaligned_buffer ? m_unaligned_buffer : m_buffer);
00180
00181 if (m_unaligned_buffer) {
00182 delete [] m_unaligned_buffer;
00183 } else {
00184 delete [] m_buffer;
00185 }
00186
00187 m_buffer = 0;
00188 m_unaligned_buffer = 0;
00189 }
00190
00191
00192
00193
00194
00195 LogBuffer::LB_ResultCode LogBuffer::checkout_write(size_t * write_offset, size_t write_size)
00196 {
00197
00198
00199
00200
00201
00202
00203 ink_assert(m_unaligned_buffer);
00204
00205 LB_ResultCode ret_val = LB_BUSY;
00206 LB_State old_s, new_s;
00207 size_t offset = 0;
00208 size_t actual_write_size = INK_ALIGN(write_size + sizeof(LogEntryHeader), m_write_align);
00209
00210 uint64_t retries = (uint64_t) - 1;
00211 do {
00212 new_s = old_s = m_state;
00213
00214 if (old_s.s.full) {
00215
00216
00217 ret_val = LB_RETRY;
00218 break;
00219 } else {
00220
00221
00222
00223 if (write_offset) {
00224 if (old_s.s.offset + actual_write_size <= m_size) {
00225
00226
00227 offset = old_s.s.offset;
00228
00229 ++new_s.s.num_writers;
00230 new_s.s.offset += actual_write_size;
00231 ++new_s.s.num_entries;
00232
00233 ret_val = LB_OK;
00234 } else {
00235
00236
00237 if (old_s.s.num_entries == 0) {
00238 ret_val = LB_BUFFER_TOO_SMALL;
00239 } else {
00240 new_s.s.full = 1;
00241 ret_val = old_s.s.num_writers ? LB_FULL_ACTIVE_WRITERS : LB_FULL_NO_WRITERS;
00242 }
00243 }
00244 } else {
00245
00246
00247
00248 if (old_s.s.num_entries) {
00249 new_s.s.full = 1;
00250
00251 ret_val = (old_s.s.num_writers ? LB_FULL_ACTIVE_WRITERS : LB_FULL_NO_WRITERS);
00252 } else {
00253
00254
00255 ret_val = LB_OK;
00256 break;
00257 }
00258 }
00259
00260 if (switch_state(old_s, new_s)) {
00261
00262 break;
00263 }
00264 }
00265 ret_val = LB_BUSY;
00266 } while (--retries);
00267
00268
00269
00270
00271 if (write_offset && ret_val == LB_OK) {
00272
00273
00274
00275
00276
00277 LogEntryHeader *entry_header = (LogEntryHeader *) & m_buffer[offset];
00278
00279 struct timeval tp;
00280
00281 ink_gethrtimeofday(&tp, 0);
00282 entry_header->timestamp = tp.tv_sec;
00283 entry_header->timestamp_usec = tp.tv_usec;
00284 entry_header->entry_len = actual_write_size;
00285
00286 *write_offset = offset + sizeof(LogEntryHeader);
00287 }
00288
00289
00290
00291
00292
00293 return ret_val;
00294 }
00295
00296
00297
00298
00299
00300 LogBuffer::LB_ResultCode LogBuffer::checkin_write(size_t write_offset)
00301 {
00302
00303
00304
00305
00306
00307
00308 ink_assert(m_unaligned_buffer);
00309
00310 LB_ResultCode ret_val = LB_OK;
00311 LB_State old_s, new_s;
00312
00313 do {
00314 new_s = old_s = m_state;
00315
00316 ink_assert(write_offset < old_s.s.offset);
00317 ink_assert(old_s.s.num_writers > 0);
00318
00319 if (--new_s.s.num_writers == 0) {
00320 ret_val = (old_s.s.full ? LB_ALL_WRITERS_DONE : LB_OK);
00321 }
00322
00323 } while (!switch_state(old_s, new_s));
00324
00325
00326
00327
00328
00329 return ret_val;
00330 }
00331
00332
00333 unsigned
00334 LogBuffer::add_header_str(const char *str, char *buf_ptr, unsigned buf_len)
00335 {
00336 unsigned len = 0;
00337
00338
00339
00340 if (likely(str && (len = (unsigned) (::strlen(str) + 1)) < buf_len)) {
00341 ink_strlcpy(buf_ptr, str, buf_len);
00342 }
00343 return len;
00344 }
00345
00346
00347 size_t
00348 LogBuffer::_add_buffer_header()
00349 {
00350 size_t header_len;
00351
00352
00353
00354
00355 LogFormat *fmt = m_owner->m_format;
00356 m_header = (LogBufferHeader *) m_buffer;
00357 m_header->cookie = LOG_SEGMENT_COOKIE;
00358 m_header->version = LOG_SEGMENT_VERSION;
00359 m_header->format_type = fmt->type();
00360 m_header->entry_count = 0;
00361 m_header->low_timestamp = LogUtils::timestamp();
00362 m_header->high_timestamp = 0;
00363 m_header->log_object_signature = m_owner->get_signature();
00364 m_header->log_object_flags = m_owner->get_flags();
00365 #if defined(LOG_BUFFER_TRACKING)
00366 m_header->id = lrand48();
00367 #endif // defined(LOG_BUFFER_TRACKING)
00368
00369
00370
00371
00372
00373
00374
00375 header_len = sizeof(LogBufferHeader);
00376
00377 m_header->fmt_name_offset = 0;
00378 m_header->fmt_fieldlist_offset = 0;
00379 m_header->fmt_printf_offset = 0;
00380 m_header->src_hostname_offset = 0;
00381 m_header->log_filename_offset = 0;
00382
00383 if (fmt->name()) {
00384 m_header->fmt_name_offset = header_len;
00385 header_len += add_header_str(fmt->name(), &m_buffer[header_len], m_size - header_len);
00386 }
00387 if (fmt->fieldlist()) {
00388 m_header->fmt_fieldlist_offset = header_len;
00389 header_len += add_header_str(fmt->fieldlist(), &m_buffer[header_len], m_size - header_len);
00390 }
00391 if (fmt->printf_str()) {
00392 m_header->fmt_printf_offset = header_len;
00393 header_len += add_header_str(fmt->printf_str(), &m_buffer[header_len], m_size - header_len);
00394 }
00395 if (Log::config->hostname) {
00396 m_header->src_hostname_offset = header_len;
00397 header_len += add_header_str(Log::config->hostname, &m_buffer[header_len], m_size - header_len);
00398 }
00399 if (m_owner->get_base_filename()) {
00400 m_header->log_filename_offset = header_len;
00401 header_len += add_header_str(m_owner->get_base_filename(), &m_buffer[header_len], m_size - header_len);
00402 }
00403
00404
00405
00406
00407
00408 header_len = INK_ALIGN_DEFAULT(header_len);
00409
00410 m_header->byte_count = header_len;
00411 m_header->data_offset = header_len;
00412
00413 return header_len;
00414 }
00415
00416 void
00417 LogBuffer::update_header_data()
00418 {
00419
00420
00421
00422
00423 if (m_unaligned_buffer) {
00424 m_header->entry_count = m_state.s.num_entries;
00425 m_header->byte_count = m_state.s.offset;
00426 m_header->high_timestamp = LogUtils::timestamp();
00427 }
00428 }
00429
00430
00431
00432
00433
00434
00435
00436 size_t LogBuffer::max_entry_bytes()
00437 {
00438 return (Log::config->log_buffer_size - sizeof(LogBufferHeader));
00439 }
00440
00441
00442
00443
00444 int
00445 LogBuffer::resolve_custom_entry(LogFieldList * fieldlist,
00446 char *printf_str, char *read_from, char *write_to,
00447 int write_to_len, long timestamp, long timestamp_usec,
00448 unsigned buffer_version, LogFieldList * alt_fieldlist, char *alt_printf_str)
00449 {
00450 if (fieldlist == NULL || printf_str == NULL)
00451 return 0;
00452
00453 int *readfrom_map = NULL;
00454
00455 if (alt_fieldlist && alt_printf_str) {
00456 LogField *f, *g;
00457 int n_alt_fields = alt_fieldlist->count();
00458 int i = 0;
00459
00460 readfrom_map = (int *)ats_malloc(n_alt_fields * sizeof(int));
00461 for (f = alt_fieldlist->first(); f; f = alt_fieldlist->next(f)) {
00462 int readfrom_pos = 0;
00463 bool found_match = false;
00464 for (g = fieldlist->first(); g; g = fieldlist->next(g)) {
00465 if (strcmp(f->symbol(), g->symbol()) == 0) {
00466 found_match = true;
00467 readfrom_map[i++] = readfrom_pos;
00468 break;
00469 }
00470
00471
00472 }
00473 if (!found_match) {
00474 Note("Alternate format contains a field (%s) not in the " "format logged", f->symbol());
00475 break;
00476 }
00477 }
00478 }
00479
00480
00481
00482
00483
00484
00485
00486 LogField *field = fieldlist->first();
00487 int printf_len = (int)::strlen(printf_str);
00488 int bytes_written = 0;
00489 int res, i;
00490
00491 const char *buffer_size_exceeded_msg =
00492 "Traffic Server is skipping the current log entry because its size "
00493 "exceeds the maximum line (entry) size for an ascii log buffer";
00494
00495 for (i = 0; i < printf_len; i++) {
00496 if (printf_str[i] == LOG_FIELD_MARKER) {
00497 if (field != NULL) {
00498 char *to = &write_to[bytes_written];
00499
00500
00501
00502
00503 bool non_aggregate_timestamp = false;
00504
00505 if (field->aggregate() == LogField::NO_AGGREGATE) {
00506 char *sym = field->symbol();
00507
00508 if (strcmp(sym, "cqts") == 0) {
00509 char *ptr = (char *) ×tamp;
00510 res = LogAccess::unmarshal_int_to_str(&ptr, to, write_to_len - bytes_written);
00511 if (buffer_version > 1) {
00512
00513 read_from += INK_MIN_ALIGN;
00514 }
00515
00516 non_aggregate_timestamp = true;
00517
00518 } else if (strcmp(sym, "cqth") == 0) {
00519 char *ptr = (char *) ×tamp;
00520 res = LogAccess::unmarshal_int_to_str_hex(&ptr, to, write_to_len - bytes_written);
00521 if (buffer_version > 1) {
00522
00523 read_from += INK_MIN_ALIGN;
00524 }
00525
00526 non_aggregate_timestamp = true;
00527
00528 } else if (strcmp(sym, "cqtq") == 0) {
00529
00530 res = squid_timestamp_to_buf(to, write_to_len - bytes_written, timestamp, timestamp_usec);
00531 if (res < 0)
00532 res = -1;
00533
00534 if (buffer_version > 1) {
00535
00536 read_from += INK_MIN_ALIGN;
00537 }
00538
00539 non_aggregate_timestamp = true;
00540
00541 } else if (strcmp(sym, "cqtn") == 0) {
00542 char *str = LogUtils::timestamp_to_netscape_str(timestamp);
00543 res = (int)::strlen(str);
00544 if (res < write_to_len - bytes_written) {
00545 memcpy(to, str, res);
00546 } else {
00547 res = -1;
00548 }
00549 if (buffer_version > 1) {
00550
00551 read_from += INK_MIN_ALIGN;
00552 }
00553
00554 non_aggregate_timestamp = true;
00555
00556 } else if (strcmp(sym, "cqtd") == 0) {
00557 char *str = LogUtils::timestamp_to_date_str(timestamp);
00558 res = (int)::strlen(str);
00559 if (res < write_to_len - bytes_written) {
00560 memcpy(to, str, res);
00561 } else {
00562 res = -1;
00563 }
00564 if (buffer_version > 1) {
00565
00566 read_from += INK_MIN_ALIGN;
00567 }
00568
00569 non_aggregate_timestamp = true;
00570
00571 } else if (strcmp(sym, "cqtt") == 0) {
00572 char *str = LogUtils::timestamp_to_time_str(timestamp);
00573 res = (int)::strlen(str);
00574 if (res < write_to_len - bytes_written) {
00575 memcpy(to, str, res);
00576 } else {
00577 res = -1;
00578 }
00579 if (buffer_version > 1) {
00580
00581 read_from += INK_MIN_ALIGN;
00582 }
00583
00584 non_aggregate_timestamp = true;
00585 }
00586 }
00587
00588 if (!non_aggregate_timestamp) {
00589 res = field->unmarshal(&read_from, to, write_to_len - bytes_written);
00590 }
00591
00592 if (res < 0) {
00593 Note("%s", buffer_size_exceeded_msg);
00594 bytes_written = 0;
00595 break;
00596 }
00597
00598 bytes_written += res;
00599 field = fieldlist->next(field);
00600 } else {
00601 Note("There are more field markers than fields;" " cannot process log entry");
00602 bytes_written = 0;
00603 break;
00604 }
00605 } else {
00606 if (1 + bytes_written < write_to_len) {
00607 write_to[bytes_written++] = printf_str[i];
00608 } else {
00609 Note("%s", buffer_size_exceeded_msg);
00610 bytes_written = 0;
00611 break;
00612 }
00613 }
00614 }
00615
00616 ats_free(readfrom_map);
00617 return bytes_written;
00618 }
00619
00620
00621
00622
00623
00624
00625
00626
00627 int
00628 LogBuffer::to_ascii(LogEntryHeader * entry, LogFormatType type,
00629 char *buf, int buf_len, const char *symbol_str, char *printf_str,
00630 unsigned buffer_version, const char *alt_format)
00631 {
00632 ink_assert(entry != NULL);
00633 ink_assert(type == LOG_FORMAT_CUSTOM || type == LOG_FORMAT_TEXT);
00634 ink_assert(buf != NULL);
00635
00636 char *read_from;
00637 char *write_to;
00638
00639 read_from = (char *) entry + sizeof(LogEntryHeader);
00640 write_to = buf;
00641
00642 if (type == LOG_FORMAT_TEXT) {
00643
00644
00645
00646
00647 return ink_strlcpy(write_to, read_from, buf_len);
00648 }
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660 int i;
00661 LogFieldList *fieldlist = NULL;
00662 bool delete_fieldlist_p = false;
00663
00664 for (i = 0; i < fieldlist_cache_entries; i++) {
00665 if (strcmp(symbol_str, fieldlist_cache[i].symbol_str) == 0) {
00666 Debug("log-fieldlist", "Fieldlist for %s found in cache, #%d", symbol_str, i);
00667 fieldlist = fieldlist_cache[i].fieldlist;
00668 break;
00669 }
00670 }
00671
00672 if (!fieldlist) {
00673 Debug("log-fieldlist", "Fieldlist for %s not found; creating ...", symbol_str);
00674 fieldlist = new LogFieldList;
00675 ink_assert(fieldlist != NULL);
00676 bool contains_aggregates = false;
00677 LogFormat::parse_symbol_string(symbol_str, fieldlist, &contains_aggregates);
00678
00679 if (fieldlist_cache_entries < FIELDLIST_CACHE_SIZE) {
00680 Debug("log-fieldlist", "Fieldlist cached as entry %d", fieldlist_cache_entries);
00681 fieldlist_cache[fieldlist_cache_entries].fieldlist = fieldlist;
00682 fieldlist_cache[fieldlist_cache_entries].symbol_str = ats_strdup(symbol_str);
00683 fieldlist_cache_entries++;
00684 } else {
00685 delete_fieldlist_p = true;
00686 }
00687 }
00688
00689 LogFieldList *alt_fieldlist = NULL;
00690 char *alt_printf_str = NULL;
00691 char *alt_symbol_str = NULL;
00692 bool bad_alt_format = false;
00693
00694 if (alt_format) {
00695 int n_alt_fields = LogFormat::parse_format_string(alt_format,
00696 &alt_printf_str, &alt_symbol_str);
00697 if (n_alt_fields < 0) {
00698 Note("Error parsing alternate format string: %s", alt_format);
00699 bad_alt_format = true;
00700 }
00701
00702 if (!bad_alt_format) {
00703 alt_fieldlist = new LogFieldList;
00704 bool contains_aggs = false;
00705 int n_alt_fields2 = LogFormat::parse_symbol_string(alt_symbol_str,
00706 alt_fieldlist, &contains_aggs);
00707 if (n_alt_fields2 > 0 && contains_aggs) {
00708 Note("Alternative formats not allowed to contain aggregates");
00709 bad_alt_format = true;;
00710 }
00711 }
00712 }
00713
00714 if (bad_alt_format) {
00715 delete alt_fieldlist;
00716 ats_free(alt_printf_str);
00717 ats_free(alt_symbol_str);
00718 alt_fieldlist = NULL;
00719 alt_printf_str = NULL;
00720 alt_symbol_str = NULL;
00721 }
00722
00723 int ret = resolve_custom_entry(fieldlist, printf_str,
00724 read_from, write_to, buf_len, entry->timestamp,
00725 entry->timestamp_usec, buffer_version,
00726 alt_fieldlist, alt_printf_str);
00727
00728 delete alt_fieldlist;
00729 ats_free(alt_printf_str);
00730 ats_free(alt_symbol_str);
00731 if (delete_fieldlist_p) delete fieldlist;
00732
00733 return ret;
00734 }
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752 LogBufferList::LogBufferList()
00753 {
00754 m_size = 0;
00755 ink_mutex_init(&m_mutex, "LogBufferList");
00756 }
00757
00758
00759
00760
00761
00762 LogBufferList::~LogBufferList()
00763 {
00764 LogBuffer *lb;
00765 while ((lb = get()) != NULL) {
00766 delete lb;
00767 }
00768 m_size = 0;
00769 ink_mutex_destroy(&m_mutex);
00770 }
00771
00772
00773
00774
00775
00776 void
00777 LogBufferList::add(LogBuffer * lb)
00778 {
00779 ink_assert(lb != NULL);
00780
00781 ink_mutex_acquire(&m_mutex);
00782 m_buffer_list.enqueue (lb);
00783 ink_assert(m_size >= 0);
00784 m_size++;
00785 ink_mutex_release(&m_mutex);
00786 }
00787
00788
00789
00790
00791
00792 LogBuffer *
00793 LogBufferList::get()
00794 {
00795 LogBuffer *lb;
00796
00797 ink_mutex_acquire(&m_mutex);
00798 lb = m_buffer_list.dequeue ();
00799 if (lb != NULL) {
00800 m_size--;
00801 ink_assert(m_size >= 0);
00802 }
00803 ink_mutex_release(&m_mutex);
00804 return lb;
00805 }
00806
00807
00808
00809
00810 LogEntryHeader *
00811 LogBufferIterator::next()
00812 {
00813 LogEntryHeader *ret_val = NULL;
00814 LogEntryHeader *entry = (LogEntryHeader *) m_next;
00815
00816 if (entry) {
00817 if (m_iter_entry_count < m_buffer_entry_count) {
00818 m_next += entry->entry_len;
00819 ++m_iter_entry_count;
00820 ret_val = entry;
00821 }
00822 }
00823
00824 return ret_val;
00825 }