00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 #include "libts.h"
00029 
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <string.h>
00033 
00034 #include "Error.h"
00035 #include "P_EventSystem.h"
00036 #include "LogField.h"
00037 #include "LogFilter.h"
00038 #include "LogFormat.h"
00039 #include "LogUtils.h"
00040 #include "LogFile.h"
00041 #include "LogHost.h"
00042 #include "LogObject.h"
00043 #include "LogAccess.h"
00044 #include "LogConfig.h"
00045 #include "LogBuffer.h"
00046 #include "Log.h"
00047 
00048 
// One slot of the file-scope field-list cache: a parsed LogFieldList paired
// with the symbol string it was parsed from. LogBuffer::to_ascii fills and
// searches these slots.
00049 struct FieldListCacheElement
00050 {
// Field list produced by parsing symbol_str.
00051   LogFieldList *fieldlist;
// Copy of the symbol string; compared with strcmp() as the lookup key.
00052   char *symbol_str;
00053 };
00054 
// Number of slots in the fieldlist_cache array. Once that many entries are
// cached, LogBuffer::to_ascii builds (and deletes) a field list per call.
00055 enum
00056 {
00057   FIELDLIST_CACHE_SIZE = 256
00058 };
00059 
// Cache of parsed field lists keyed by symbol string; entries are appended by
// LogBuffer::to_ascii and never removed in this file.
00060 FieldListCacheElement fieldlist_cache[FIELDLIST_CACHE_SIZE];
// Number of fieldlist_cache slots currently in use.
00061 int fieldlist_cache_entries = 0;
// Counter from which both LogBuffer constructors draw m_id via
// ink_atomic_increment().
00062 vint32 LogBuffer::M_ID = 0;
00063 
00064 
00065 
00066 
00067 
00068 
00069 char *
00070 LogBufferHeader::fmt_name()
00071 {
00072   char *addr = NULL;
00073   if (fmt_name_offset) {
00074     addr = (char *) this + fmt_name_offset;
00075   }
00076   return addr;
00077 }
00078 
00079 char *
00080 LogBufferHeader::fmt_fieldlist()
00081 {
00082   char *addr = NULL;
00083   if (fmt_fieldlist_offset) {
00084     addr = (char *) this + fmt_fieldlist_offset;
00085   }
00086   return addr;
00087 }
00088 
00089 char *
00090 LogBufferHeader::fmt_printf()
00091 {
00092   char *addr = NULL;
00093   if (fmt_printf_offset) {
00094     addr = (char *) this + fmt_printf_offset;
00095   }
00096   return addr;
00097 }
00098 
00099 char *
00100 LogBufferHeader::src_hostname()
00101 {
00102   char *addr = NULL;
00103   if (src_hostname_offset) {
00104     addr = (char *) this + src_hostname_offset;
00105   }
00106   return addr;
00107 }
00108 
00109 char *
00110 LogBufferHeader::log_filename()
00111 {
00112   char *addr = NULL;
00113   if (log_filename_offset) {
00114     addr = (char *) this + log_filename_offset;
00115   }
00116   return addr;
00117 }
00118 
00119 
00120 
00121 
00122 
00123 
00124 
00125 
00126 
00127 LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t write_align):
00128   m_size(size),
00129   m_buf_align(buf_align),
00130   m_write_align(write_align), m_owner(owner),
00131   m_references(0)
00132 {
00133   size_t hdr_size;
00134 
00135   
00136   
00137   m_unaligned_buffer = new char [size + buf_align];
00138   m_buffer = (char *)align_pointer_forward(m_unaligned_buffer, buf_align);
00139 
00140   
00141   hdr_size = _add_buffer_header();
00142 
00143   
00144   m_state.s.offset = hdr_size;
00145 
00146   
00147   m_id = (uint32_t) ink_atomic_increment((pvint32) & M_ID, 1);
00148 
00149   m_expiration_time = LogUtils::timestamp() + Log::config->max_secs_per_buffer;
00150 
00151   Debug("log-logbuffer","[%p] Created buffer %u for %s at address %p, size %d",
00152         this_ethread(), m_id, m_owner->get_base_filename(), m_buffer, (int)size);
00153 }
00154 
00155 LogBuffer::LogBuffer(LogObject * owner, LogBufferHeader * header):
00156   m_unaligned_buffer(NULL),
00157   m_buffer((char *) header),
00158   m_size(0),
00159   m_buf_align(LB_DEFAULT_ALIGN),
00160   m_write_align(INK_MIN_ALIGN), m_expiration_time(0), m_owner(owner), m_header(header),
00161   m_references(0)
00162 {
00163   
00164   
00165   
00166   
00167 
00168   
00169   
00170   m_id = (uint32_t) ink_atomic_increment((pvint32) & M_ID, 1);
00171 
00172   Debug("log-logbuffer","[%p] Created repurposed buffer %u for %s at address %p",
00173         this_ethread(), m_id, m_owner->get_base_filename(), m_buffer);
00174 }
00175 
00176 LogBuffer::~LogBuffer()
00177 {
00178   Debug("log-logbuffer", "[%p] Deleting buffer %u at address %p",
00179         this_ethread(), m_id, m_unaligned_buffer ? m_unaligned_buffer : m_buffer);
00180 
00181   if (m_unaligned_buffer) {
00182     delete [] m_unaligned_buffer;
00183   } else {
00184     delete [] m_buffer;
00185   }
00186 
00187   m_buffer = 0;
00188   m_unaligned_buffer = 0;
00189 }
00190 
00191 
00192 
00193 
00194 
00195 LogBuffer::LB_ResultCode LogBuffer::checkout_write(size_t * write_offset, size_t write_size)
00196 {
00197   
00198   
00199   
00200   
00201   
00202   
00203   ink_assert(m_unaligned_buffer);
00204 
00205   LB_ResultCode ret_val = LB_BUSY;
00206   LB_State old_s, new_s;
00207   size_t offset = 0;
00208   size_t actual_write_size = INK_ALIGN(write_size + sizeof(LogEntryHeader), m_write_align);
00209 
00210   uint64_t retries = (uint64_t) - 1;
00211   do {
00212     new_s = old_s = m_state;
00213 
00214     if (old_s.s.full) {
00215       
00216       
00217       ret_val = LB_RETRY;
00218       break;
00219     } else {
00220       
00221       
00222 
00223       if (write_offset) {
00224         if (old_s.s.offset + actual_write_size <= m_size) {
00225           
00226 
00227           offset = old_s.s.offset;
00228 
00229           ++new_s.s.num_writers;
00230           new_s.s.offset += actual_write_size;
00231           ++new_s.s.num_entries;
00232 
00233           ret_val = LB_OK;
00234         } else {
00235           
00236 
00237           if (old_s.s.num_entries == 0) {
00238             ret_val = LB_BUFFER_TOO_SMALL;
00239           } else {
00240             new_s.s.full = 1;
00241             ret_val = old_s.s.num_writers ? LB_FULL_ACTIVE_WRITERS : LB_FULL_NO_WRITERS;
00242           }
00243         }
00244       } else {
00245         
00246         
00247 
00248         if (old_s.s.num_entries) {
00249           new_s.s.full = 1;
00250 
00251           ret_val = (old_s.s.num_writers ? LB_FULL_ACTIVE_WRITERS : LB_FULL_NO_WRITERS);
00252         } else {
00253           
00254 
00255           ret_val = LB_OK;
00256           break;
00257         }
00258       }
00259 
00260       if (switch_state(old_s, new_s)) {
00261         
00262         break;
00263       }
00264     }
00265     ret_val = LB_BUSY;
00266   } while (--retries);
00267 
00268   
00269   
00270   
00271   if (write_offset && ret_val == LB_OK) {
00272     
00273     
00274     
00275     
00276 
00277     LogEntryHeader *entry_header = (LogEntryHeader *) & m_buffer[offset];
00278     
00279     struct timeval tp;
00280 
00281     ink_gethrtimeofday(&tp, 0);
00282     entry_header->timestamp = tp.tv_sec;
00283     entry_header->timestamp_usec = tp.tv_usec;
00284     entry_header->entry_len = actual_write_size;
00285 
00286     *write_offset = offset + sizeof(LogEntryHeader);
00287   }
00288 
00289 
00290 
00291 
00292 
00293   return ret_val;
00294 }
00295 
00296 
00297 
00298 
00299 
00300 LogBuffer::LB_ResultCode LogBuffer::checkin_write(size_t write_offset)
00301 {
00302   
00303   
00304   
00305   
00306   
00307   
00308   ink_assert(m_unaligned_buffer);
00309 
00310   LB_ResultCode ret_val = LB_OK;
00311   LB_State old_s, new_s;
00312 
00313   do {
00314     new_s = old_s = m_state;
00315 
00316     ink_assert(write_offset < old_s.s.offset);
00317     ink_assert(old_s.s.num_writers > 0);
00318 
00319     if (--new_s.s.num_writers == 0) {
00320       ret_val = (old_s.s.full ? LB_ALL_WRITERS_DONE : LB_OK);
00321     }
00322 
00323   } while (!switch_state(old_s, new_s));
00324 
00325 
00326 
00327 
00328 
00329   return ret_val;
00330 }
00331 
00332 
00333 unsigned
00334 LogBuffer::add_header_str(const char *str, char *buf_ptr, unsigned buf_len)
00335 {
00336   unsigned len = 0;
00337   
00338   
00339   
00340   if (likely(str && (len = (unsigned) (::strlen(str) + 1)) < buf_len)) {
00341     ink_strlcpy(buf_ptr, str, buf_len);
00342   }
00343   return len;
00344 }
00345 
00346 
00347 size_t
00348 LogBuffer::_add_buffer_header()
00349 {
00350   size_t header_len;
00351 
00352   
00353   
00354   
00355   LogFormat *fmt = m_owner->m_format;
00356   m_header = (LogBufferHeader *) m_buffer;
00357   m_header->cookie = LOG_SEGMENT_COOKIE;
00358   m_header->version = LOG_SEGMENT_VERSION;
00359   m_header->format_type = fmt->type();
00360   m_header->entry_count = 0;
00361   m_header->low_timestamp = LogUtils::timestamp();
00362   m_header->high_timestamp = 0;
00363   m_header->log_object_signature = m_owner->get_signature();
00364   m_header->log_object_flags = m_owner->get_flags();
00365 #if defined(LOG_BUFFER_TRACKING)
00366   m_header->id = lrand48();
00367 #endif // defined(LOG_BUFFER_TRACKING)
00368 
00369   
00370   
00371   
00372   
00373   
00374 
00375   header_len = sizeof(LogBufferHeader); 
00376 
00377   m_header->fmt_name_offset = 0;
00378   m_header->fmt_fieldlist_offset = 0;
00379   m_header->fmt_printf_offset = 0;
00380   m_header->src_hostname_offset = 0;
00381   m_header->log_filename_offset = 0;
00382 
00383   if (fmt->name()) {
00384     m_header->fmt_name_offset = header_len;
00385     header_len += add_header_str(fmt->name(), &m_buffer[header_len], m_size - header_len);
00386   }
00387   if (fmt->fieldlist()) {
00388     m_header->fmt_fieldlist_offset = header_len;
00389     header_len += add_header_str(fmt->fieldlist(), &m_buffer[header_len], m_size - header_len);
00390   }
00391   if (fmt->printf_str()) {
00392     m_header->fmt_printf_offset = header_len;
00393     header_len += add_header_str(fmt->printf_str(), &m_buffer[header_len], m_size - header_len);
00394   }
00395   if (Log::config->hostname) {
00396     m_header->src_hostname_offset = header_len;
00397     header_len += add_header_str(Log::config->hostname, &m_buffer[header_len], m_size - header_len);
00398   }
00399   if (m_owner->get_base_filename()) {
00400     m_header->log_filename_offset = header_len;
00401     header_len += add_header_str(m_owner->get_base_filename(), &m_buffer[header_len], m_size - header_len);
00402   }
00403   
00404   
00405   
00406   
00407 
00408   header_len = INK_ALIGN_DEFAULT(header_len);
00409 
00410   m_header->byte_count = header_len;
00411   m_header->data_offset = header_len;
00412 
00413   return header_len;
00414 }
00415 
00416 void
00417 LogBuffer::update_header_data()
00418 {
00419   
00420   
00421   
00422 
00423   if (m_unaligned_buffer) {
00424     m_header->entry_count = m_state.s.num_entries;
00425     m_header->byte_count = m_state.s.offset;
00426     m_header->high_timestamp = LogUtils::timestamp();
00427   }
00428 }
00429 
00430 
00431 
00432 
00433 
00434 
00435 
00436 size_t LogBuffer::max_entry_bytes()
00437 {
00438   return (Log::config->log_buffer_size - sizeof(LogBufferHeader));
00439 }
00440 
00441 
00442 
00443 
00444 int
00445 LogBuffer::resolve_custom_entry(LogFieldList * fieldlist,
00446                                 char *printf_str, char *read_from, char *write_to,
00447                                 int write_to_len, long timestamp, long timestamp_usec,
00448                                 unsigned buffer_version, LogFieldList * alt_fieldlist, char *alt_printf_str)
00449 {
00450   if (fieldlist == NULL || printf_str == NULL)
00451     return 0;
00452 
00453   int *readfrom_map = NULL;
00454 
00455   if (alt_fieldlist && alt_printf_str) {
00456     LogField *f, *g;
00457     int n_alt_fields = alt_fieldlist->count();
00458     int i = 0;
00459 
00460     readfrom_map = (int *)ats_malloc(n_alt_fields * sizeof(int));
00461     for (f = alt_fieldlist->first(); f; f = alt_fieldlist->next(f)) {
00462       int readfrom_pos = 0;
00463       bool found_match = false;
00464       for (g = fieldlist->first(); g; g = fieldlist->next(g)) {
00465         if (strcmp(f->symbol(), g->symbol()) == 0) {
00466           found_match = true;
00467           readfrom_map[i++] = readfrom_pos;
00468           break;
00469         }
00470         
00471         
00472       }
00473       if (!found_match) {
00474         Note("Alternate format contains a field (%s) not in the " "format logged", f->symbol());
00475         break;
00476       }
00477     }
00478   }
00479   
00480   
00481   
00482   
00483   
00484   
00485 
00486   LogField *field = fieldlist->first();
00487   int printf_len = (int)::strlen(printf_str);   
00488   int bytes_written = 0;
00489   int res, i;
00490 
00491   const char *buffer_size_exceeded_msg =
00492     "Traffic Server is skipping the current log entry because its size "
00493     "exceeds the maximum line (entry) size for an ascii log buffer";
00494 
00495   for (i = 0; i < printf_len; i++) {
00496     if (printf_str[i] == LOG_FIELD_MARKER) {
00497       if (field != NULL) {
00498         char *to = &write_to[bytes_written];
00499 
00500         
00501         
00502         
00503         bool non_aggregate_timestamp = false;
00504 
00505         if (field->aggregate() == LogField::NO_AGGREGATE) {
00506           char *sym = field->symbol();
00507 
00508           if (strcmp(sym, "cqts") == 0) {
00509             char *ptr = (char *) ×tamp;
00510             res = LogAccess::unmarshal_int_to_str(&ptr, to, write_to_len - bytes_written);
00511             if (buffer_version > 1) {
00512               
00513               read_from += INK_MIN_ALIGN;
00514             }
00515 
00516             non_aggregate_timestamp = true;
00517 
00518           } else if (strcmp(sym, "cqth") == 0) {
00519             char *ptr = (char *) ×tamp;
00520             res = LogAccess::unmarshal_int_to_str_hex(&ptr, to, write_to_len - bytes_written);
00521             if (buffer_version > 1) {
00522               
00523               read_from += INK_MIN_ALIGN;
00524             }
00525 
00526             non_aggregate_timestamp = true;
00527 
00528           } else if (strcmp(sym, "cqtq") == 0) {
00529             
00530             res = squid_timestamp_to_buf(to, write_to_len - bytes_written, timestamp, timestamp_usec);
00531             if (res < 0)
00532               res = -1;
00533 
00534             if (buffer_version > 1) {
00535               
00536               read_from += INK_MIN_ALIGN;
00537             }
00538 
00539             non_aggregate_timestamp = true;
00540 
00541           } else if (strcmp(sym, "cqtn") == 0) {
00542             char *str = LogUtils::timestamp_to_netscape_str(timestamp);
00543             res = (int)::strlen(str);
00544             if (res < write_to_len - bytes_written) {
00545               memcpy(to, str, res);
00546             } else {
00547               res = -1;
00548             }
00549             if (buffer_version > 1) {
00550               
00551               read_from += INK_MIN_ALIGN;
00552             }
00553 
00554             non_aggregate_timestamp = true;
00555 
00556           } else if (strcmp(sym, "cqtd") == 0) {
00557             char *str = LogUtils::timestamp_to_date_str(timestamp);
00558             res = (int)::strlen(str);
00559             if (res < write_to_len - bytes_written) {
00560               memcpy(to, str, res);
00561             } else {
00562               res = -1;
00563             }
00564             if (buffer_version > 1) {
00565               
00566               read_from += INK_MIN_ALIGN;
00567             }
00568 
00569             non_aggregate_timestamp = true;
00570 
00571           } else if (strcmp(sym, "cqtt") == 0) {
00572             char *str = LogUtils::timestamp_to_time_str(timestamp);
00573             res = (int)::strlen(str);
00574             if (res < write_to_len - bytes_written) {
00575               memcpy(to, str, res);
00576             } else {
00577               res = -1;
00578             }
00579             if (buffer_version > 1) {
00580               
00581               read_from += INK_MIN_ALIGN;
00582             }
00583 
00584             non_aggregate_timestamp = true;
00585           }
00586         }
00587 
00588         if (!non_aggregate_timestamp) {
00589           res = field->unmarshal(&read_from, to, write_to_len - bytes_written);
00590         }
00591 
00592         if (res < 0) {
00593           Note("%s", buffer_size_exceeded_msg);
00594           bytes_written = 0;
00595           break;
00596         }
00597 
00598         bytes_written += res;
00599         field = fieldlist->next(field);
00600       } else {
00601         Note("There are more field markers than fields;" " cannot process log entry");
00602         bytes_written = 0;
00603         break;
00604       }
00605     } else {
00606       if (1 + bytes_written < write_to_len) {
00607         write_to[bytes_written++] = printf_str[i];
00608       } else {
00609         Note("%s", buffer_size_exceeded_msg);
00610         bytes_written = 0;
00611         break;
00612       }
00613     }
00614   }
00615 
00616   ats_free(readfrom_map);
00617   return bytes_written;
00618 }
00619 
00620 
00621 
00622 
00623 
00624 
00625 
00626 
00627 int
00628 LogBuffer::to_ascii(LogEntryHeader * entry, LogFormatType type,
00629                     char *buf, int buf_len, const char *symbol_str, char *printf_str,
00630                     unsigned buffer_version, const char *alt_format)
00631 {
00632   ink_assert(entry != NULL);
00633   ink_assert(type == LOG_FORMAT_CUSTOM || type == LOG_FORMAT_TEXT);
00634   ink_assert(buf != NULL);
00635 
00636   char *read_from;              
00637   char *write_to;               
00638 
00639   read_from = (char *) entry + sizeof(LogEntryHeader);
00640   write_to = buf;
00641 
00642   if (type == LOG_FORMAT_TEXT) {
00643     
00644     
00645     
00646     
00647     return ink_strlcpy(write_to, read_from, buf_len);
00648   }
00649   
00650   
00651   
00652   
00653   
00654   
00655   
00656   
00657   
00658   
00659 
00660   int i;
00661   LogFieldList *fieldlist = NULL;
00662   bool delete_fieldlist_p = false; 
00663 
00664   for (i = 0; i < fieldlist_cache_entries; i++) {
00665     if (strcmp(symbol_str, fieldlist_cache[i].symbol_str) == 0) {
00666       Debug("log-fieldlist", "Fieldlist for %s found in cache, #%d", symbol_str, i);
00667       fieldlist = fieldlist_cache[i].fieldlist;
00668       break;
00669     }
00670   }
00671 
00672   if (!fieldlist) {
00673     Debug("log-fieldlist", "Fieldlist for %s not found; creating ...", symbol_str);
00674     fieldlist = new LogFieldList;
00675     ink_assert(fieldlist != NULL);
00676     bool contains_aggregates = false;
00677     LogFormat::parse_symbol_string(symbol_str, fieldlist, &contains_aggregates);
00678 
00679     if (fieldlist_cache_entries < FIELDLIST_CACHE_SIZE) {
00680       Debug("log-fieldlist", "Fieldlist cached as entry %d", fieldlist_cache_entries);
00681       fieldlist_cache[fieldlist_cache_entries].fieldlist = fieldlist;
00682       fieldlist_cache[fieldlist_cache_entries].symbol_str = ats_strdup(symbol_str);
00683       fieldlist_cache_entries++;
00684     } else {
00685       delete_fieldlist_p = true;
00686     }
00687   }
00688 
00689   LogFieldList *alt_fieldlist = NULL;
00690   char *alt_printf_str = NULL;
00691   char *alt_symbol_str = NULL;
00692   bool bad_alt_format = false;
00693 
00694   if (alt_format) {
00695     int n_alt_fields = LogFormat::parse_format_string(alt_format,
00696                                                       &alt_printf_str, &alt_symbol_str);
00697     if (n_alt_fields < 0) {
00698       Note("Error parsing alternate format string: %s", alt_format);
00699       bad_alt_format = true;
00700     }
00701 
00702     if (!bad_alt_format) {
00703       alt_fieldlist = new LogFieldList;
00704       bool contains_aggs = false;
00705       int n_alt_fields2 = LogFormat::parse_symbol_string(alt_symbol_str,
00706                                                          alt_fieldlist, &contains_aggs);
00707       if (n_alt_fields2 > 0 && contains_aggs) {
00708         Note("Alternative formats not allowed to contain aggregates");
00709         bad_alt_format = true;;
00710       }
00711     }
00712   }
00713 
00714   if (bad_alt_format) {
00715     delete alt_fieldlist;
00716     ats_free(alt_printf_str);
00717     ats_free(alt_symbol_str);
00718     alt_fieldlist = NULL;
00719     alt_printf_str = NULL;
00720     alt_symbol_str = NULL;
00721   }
00722 
00723   int ret = resolve_custom_entry(fieldlist, printf_str,
00724                                  read_from, write_to, buf_len, entry->timestamp,
00725                                  entry->timestamp_usec, buffer_version,
00726                                  alt_fieldlist, alt_printf_str);
00727 
00728   delete alt_fieldlist;
00729   ats_free(alt_printf_str);
00730   ats_free(alt_symbol_str);
00731   if (delete_fieldlist_p) delete fieldlist;
00732 
00733   return ret;
00734 }
00735 
00736 
00737 
00738 
00739 
00740 
00741 
00742 
00743 
00744 
00745 
00746 
00747 
00748 
00749 
00750 
00751 
00752 LogBufferList::LogBufferList()
00753 {
00754   m_size = 0;
00755   ink_mutex_init(&m_mutex, "LogBufferList");
00756 }
00757 
00758 
00759 
00760 
00761 
00762 LogBufferList::~LogBufferList()
00763 {
00764   LogBuffer *lb;
00765   while ((lb = get()) != NULL) {
00766       delete lb;
00767   }
00768   m_size = 0;
00769   ink_mutex_destroy(&m_mutex);
00770 }
00771 
00772 
00773 
00774 
00775 
00776 void
00777 LogBufferList::add(LogBuffer * lb)
00778 {
00779   ink_assert(lb != NULL);
00780 
00781   ink_mutex_acquire(&m_mutex);
00782   m_buffer_list.enqueue (lb);
00783   ink_assert(m_size >= 0);
00784   m_size++;
00785   ink_mutex_release(&m_mutex);
00786 }
00787 
00788 
00789 
00790 
00791 
00792 LogBuffer *
00793 LogBufferList::get()
00794 {
00795   LogBuffer *lb;
00796 
00797   ink_mutex_acquire(&m_mutex);
00798   lb =  m_buffer_list.dequeue ();
00799   if (lb != NULL) {
00800     m_size--;
00801     ink_assert(m_size >= 0);
00802   }
00803   ink_mutex_release(&m_mutex);
00804   return lb;
00805 }
00806 
00807 
00808 
00809 
00810 LogEntryHeader *
00811 LogBufferIterator::next()
00812 {
00813   LogEntryHeader *ret_val = NULL;
00814   LogEntryHeader *entry = (LogEntryHeader *) m_next;
00815 
00816   if (entry) {
00817     if (m_iter_entry_count < m_buffer_entry_count) {
00818       m_next += entry->entry_len;
00819       ++m_iter_entry_count;
00820       ret_val = entry;
00821     }
00822   }
00823 
00824   return ret_val;
00825 }