00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "libts.h"
00036
00037 #include "Error.h"
00038 #include "Main.h"
00039 #include "P_EventSystem.h"
00040 #include "P_Net.h"
00041 #include "I_Machine.h"
00042 #include "HTTP.h"
00043
00044 #include "LogAccess.h"
00045 #include "LogField.h"
00046 #include "LogFilter.h"
00047 #include "LogFormat.h"
00048 #include "LogFile.h"
00049 #include "LogHost.h"
00050 #include "LogObject.h"
00051 #include "LogConfig.h"
00052 #include "LogBuffer.h"
00053 #include "LogUtils.h"
00054 #include "Log.h"
00055 #include "LogSock.h"
00056 #include "SimpleTokenizer.h"
00057
00058 #include "ink_apidefs.h"
00059
00060 #define PERIODIC_TASKS_INTERVAL 5 // TODO: Maybe this should be done as a config option
00061 // NOTE(review): PERIODIC_TASKS_INTERVAL is not referenced in the visible part of this file; unit is presumably seconds -- confirm.
00062 // Log objects, field list and logging mode shared by the whole logging subsystem.
00063 inkcoreapi LogObject *Log::error_log = NULL; // rolled by periodic_tasks() when non-NULL
00064 LogFieldList Log::global_field_list; // filled by init_fields()
00065 LogFormat *Log::global_scrap_format = NULL; // created in init_when_enabled()
00066 LogObject *Log::global_scrap_object = NULL; // created in init_when_enabled(); writes "scrapfile.log"
00067 Log::LoggingMode Log::logging_mode = LOG_MODE_NONE; // set from proxy.config.log.logging_enabled in init()/periodic_tasks()
00068
00069 // Notifiers signalled by PeriodicWakeup; preproc_notify is an array allocated in create_threads().
00070 EventNotify *Log::preproc_notify;
00071 EventNotify *Log::flush_notify;
00072 InkAtomicList *Log::flush_data_list;
00073
00074 // Collation state; the descriptor, thread count and port are assigned in init().
00075 EventNotify Log::collate_notify;
00076 ink_thread Log::collate_thread;
00077 int Log::collation_accept_file_descriptor; // set to NO_FD by init()
00078 int Log::collation_preproc_threads; // 1 by default, then copied from the LogConfig in init()
00079 int Log::collation_port; // copied from the LogConfig in init()
00080
00081 // Initialization bookkeeping.
00082 int Log::init_status = 0; // bit set of FIELDS_INITIALIZED / FULLY_INITIALIZED
00083 int Log::config_flags = 0; // flags passed to init()
00084 bool Log::logging_mode_changed = false; // raised by handle_logging_mode_change(), consumed by periodic_tasks()
00085
00086 // Hash table keyed by field symbol (e.g. "chi"); created and filled by init_fields().
00087 InkHashTable *Log::field_symbol_hash = 0;
00088 // Raw stat block allocated in init() on the non-LOGCAT path only.
00089 RecRawStatBlock *log_rsb;
00090
00091
00092
00093
00094
00095
00096 // Active logging configuration; replaced atomically by change_configuration().
00097 // log_configid holds the id returned by configProcessor.set() in init().
00098 LogConfig *Log::config = NULL;
00099 static unsigned log_configid = 0;
00100
00101 void
00102 Log::change_configuration()
00103 {
00104 LogConfig * prev = Log::config;
00105 LogConfig * new_config = NULL;
00106
00107 Debug("log-config", "Changing configuration ...");
00108
00109 new_config = new LogConfig;
00110 ink_assert(new_config != NULL);
00111 new_config->read_configuration_variables();
00112
00113
00114
00115
00116 ink_mutex_acquire(prev->log_object_manager._APImutex);
00117 Debug("log-api-mutex", "Log::change_configuration acquired api mutex");
00118
00119 new_config->init(Log::config);
00120
00121
00122 ink_atomic_swap(&Log::config, new_config);
00123
00124
00125
00126
00127
00128
00129
00130 ink_mutex_release(prev->log_object_manager._APImutex);
00131 Debug("log-api-mutex", "Log::change_configuration released api mutex");
00132
00133
00134
00135
00136 configProcessor.set(log_configid, new_config);
00137
00138
00139
00140 prev->log_object_manager.flush_all_objects();
00141
00142 Debug("log-config", "... new configuration in place");
00143 }
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167 struct PeriodicWakeup;
00168 typedef int (PeriodicWakeup::*PeriodicWakeupHandler)(int, void *);
00169 struct PeriodicWakeup : Continuation
00170 {
00171 int m_preproc_threads;
00172 int m_flush_threads;
00173
00174 int wakeup (int , Event * )
00175 {
00176 for (int i = 0; i < m_preproc_threads; i++) {
00177 Log::preproc_notify[i].signal();
00178 }
00179 for (int i = 0; i < m_flush_threads; i++) {
00180 Log::flush_notify[i].signal();
00181 }
00182 return EVENT_CONT;
00183 }
00184
00185 PeriodicWakeup (int preproc_threads, int flush_threads) :
00186 Continuation (new_ProxyMutex()),
00187 m_preproc_threads(preproc_threads),
00188 m_flush_threads(flush_threads)
00189 {
00190 SET_HANDLER ((PeriodicWakeupHandler)&PeriodicWakeup::wakeup);
00191 }
00192 };
00193
00194
00195
00196
00197
00198
00199
00200
00201 void
00202 Log::periodic_tasks(long time_now)
00203 {
00204 Debug("log-api-mutex", "entering Log::periodic_tasks");
00205
00206 if (logging_mode_changed || Log::config->reconfiguration_needed) {
00207 Debug("log-config", "Performing reconfiguration, init status = %d", init_status);
00208
00209 if (logging_mode_changed) {
00210 int val = (int) REC_ConfigReadInteger("proxy.config.log.logging_enabled");
00211
00212 if (val<LOG_MODE_NONE || val> LOG_MODE_FULL) {
00213 logging_mode = LOG_MODE_FULL;
00214 Warning("proxy.config.log.logging_enabled has an invalid " "value setting it to %d", logging_mode);
00215 } else {
00216 logging_mode = (LoggingMode) val;
00217 }
00218 logging_mode_changed = false;
00219 }
00220
00221
00222
00223 change_configuration();
00224 } else if (logging_mode > LOG_MODE_NONE || config->collation_mode == Log::COLLATION_HOST ||
00225 config->has_api_objects()) {
00226 Debug("log-periodic", "Performing periodic tasks");
00227
00228
00229
00230 if (config->space_is_short() || time_now % config->space_used_frequency == 0) {
00231 Log::config->update_space_used();
00232 }
00233
00234
00235
00236 Log::config->log_object_manager.check_buffer_expiration(time_now);
00237
00238
00239
00240
00241 if (Log::config->roll_log_files_now) {
00242 if (error_log) {
00243 error_log->roll_files(time_now);
00244 }
00245 if (global_scrap_object) {
00246 global_scrap_object->roll_files(time_now);
00247 }
00248 Log::config->log_object_manager.roll_files(time_now);
00249 Log::config->roll_log_files_now = false;
00250 } else {
00251 if (error_log) {
00252 error_log->roll_files(time_now);
00253 }
00254 if (global_scrap_object) {
00255 global_scrap_object->roll_files(time_now);
00256 }
00257 Log::config->log_object_manager.roll_files(time_now);
00258 }
00259
00260 }
00261 }
00262
00263
00264
00265
00266 struct LoggingPreprocContinuation: public Continuation
00267 {
00268 int m_idx;
00269
00270 int mainEvent(int , void * )
00271 {
00272 Log::preproc_thread_main((void *)&m_idx);
00273 return 0;
00274 }
00275
00276 LoggingPreprocContinuation(int idx):Continuation(NULL), m_idx(idx)
00277 {
00278 SET_HANDLER(&LoggingPreprocContinuation::mainEvent);
00279 }
00280 };
00281
00282 struct LoggingFlushContinuation: public Continuation
00283 {
00284 int m_idx;
00285
00286 int mainEvent(int , void * )
00287 {
00288 Log::flush_thread_main((void *)&m_idx);
00289 return 0;
00290 }
00291
00292 LoggingFlushContinuation(int idx):Continuation(NULL), m_idx(idx)
00293 {
00294 SET_HANDLER(&LoggingFlushContinuation::mainEvent);
00295 }
00296 };
00297
00298 struct LoggingCollateContinuation: public Continuation
00299 {
00300 int mainEvent(int , void * )
00301 {
00302 Log::collate_thread_main(NULL);
00303 return 0;
00304 }
00305
00306 LoggingCollateContinuation():Continuation(NULL)
00307 {
00308 SET_HANDLER(&LoggingCollateContinuation::mainEvent);
00309 }
00310 };
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323 void
00324 Log::init_fields()
00325 {
00326 if (init_status & FIELDS_INITIALIZED)
00327 return;
00328
00329 LogField *field;
00330
00331
00332
00333
00334
00335 field_symbol_hash = ink_hash_table_create(InkHashTableKeyType_String);
00336
00337
00338 field =new LogField("client_host_ip", "chi",
00339 LogField::IP,
00340 &LogAccess::marshal_client_host_ip,
00341 &LogAccess::unmarshal_ip_to_str);
00342 global_field_list.add(field, false);
00343 ink_hash_table_insert(field_symbol_hash, "chi", field);
00344
00345 field = new LogField("client_host_port", "chp",
00346 LogField::sINT,
00347 &LogAccess::marshal_client_host_port,
00348 &LogAccess::unmarshal_int_to_str);
00349 global_field_list.add(field, false);
00350 ink_hash_table_insert(field_symbol_hash, "chp", field);
00351
00352 field = new LogField("client_host_ip_hex", "chih",
00353 LogField::IP,
00354 &LogAccess::marshal_client_host_ip,
00355 &LogAccess::unmarshal_ip_to_hex);
00356 global_field_list.add(field, false);
00357 ink_hash_table_insert(field_symbol_hash, "chih", field);
00358
00359 field = new LogField ("client_auth_user_name", "caun",
00360 LogField::STRING,
00361 &LogAccess::marshal_client_auth_user_name,
00362 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str);
00363 global_field_list.add (field, false);
00364 ink_hash_table_insert (field_symbol_hash, "caun", field);
00365
00366 field = new LogField("plugin_identity_id", "piid",
00367 LogField::sINT,
00368 &LogAccess::marshal_plugin_identity_id,
00369 reinterpret_cast<LogField::UnmarshalFunc>(&LogAccess::unmarshal_int_to_str));
00370 global_field_list.add(field, false);
00371 ink_hash_table_insert(field_symbol_hash, "piid", field);
00372
00373 field = new LogField("plugin_identity_tag", "pitag",
00374 LogField::STRING,
00375 &LogAccess::marshal_plugin_identity_tag,
00376 reinterpret_cast<LogField::UnmarshalFunc>(&LogAccess::unmarshal_str));
00377 global_field_list.add(field, false);
00378 ink_hash_table_insert(field_symbol_hash, "pitag", field);
00379
00380 field = new LogField("client_req_timestamp_sec", "cqts",
00381 LogField::sINT,
00382 &LogAccess::marshal_client_req_timestamp_sec,
00383 &LogAccess::unmarshal_int_to_str);
00384 global_field_list.add(field, false);
00385 ink_hash_table_insert(field_symbol_hash, "cqts", field);
00386
00387 field = new LogField("client_req_timestamp_hex_sec", "cqth",
00388 LogField::sINT,
00389 &LogAccess::marshal_client_req_timestamp_sec,
00390 &LogAccess::unmarshal_int_to_str_hex);
00391 global_field_list.add(field, false);
00392 ink_hash_table_insert(field_symbol_hash, "cqth", field);
00393
00394 field = new LogField("client_req_timestamp_squid", "cqtq",
00395 LogField::sINT,
00396 &LogAccess::marshal_client_req_timestamp_sec,
00397 &LogAccess::unmarshal_int_to_str);
00398 global_field_list.add(field, false);
00399 ink_hash_table_insert(field_symbol_hash, "cqtq", field);
00400
00401 field = new LogField("client_req_timestamp_netscape", "cqtn",
00402 LogField::sINT,
00403 &LogAccess::marshal_client_req_timestamp_sec,
00404 &LogAccess::unmarshal_int_to_str);
00405 global_field_list.add(field, false);
00406 ink_hash_table_insert(field_symbol_hash, "cqtn", field);
00407
00408 field = new LogField("client_req_timestamp_date", "cqtd",
00409 LogField::sINT,
00410 &LogAccess::marshal_client_req_timestamp_sec,
00411 &LogAccess::unmarshal_int_to_str);
00412 global_field_list.add(field, false);
00413 ink_hash_table_insert(field_symbol_hash, "cqtd", field);
00414
00415 field = new LogField("client_req_timestamp_time", "cqtt",
00416 LogField::sINT,
00417 &LogAccess::marshal_client_req_timestamp_sec,
00418 &LogAccess::unmarshal_int_to_str);
00419 global_field_list.add(field, false);
00420 ink_hash_table_insert(field_symbol_hash, "cqtt", field);
00421
00422 field = new LogField("client_req_text", "cqtx",
00423 LogField::STRING,
00424 &LogAccess::marshal_client_req_text,
00425 (LogField::UnmarshalFunc)&LogAccess::unmarshal_http_text);
00426 global_field_list.add(field, false);
00427 ink_hash_table_insert(field_symbol_hash, "cqtx", field);
00428
00429 field = new LogField("client_req_http_method", "cqhm",
00430 LogField::STRING,
00431 &LogAccess::marshal_client_req_http_method,
00432 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str);
00433 global_field_list.add(field, false);
00434 ink_hash_table_insert(field_symbol_hash, "cqhm", field);
00435
00436 field = new LogField("client_req_url", "cqu",
00437 LogField::STRING,
00438 &LogAccess::marshal_client_req_url,
00439 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str,
00440 &LogAccess::set_client_req_url);
00441 global_field_list.add(field, false);
00442 ink_hash_table_insert(field_symbol_hash, "cqu", field);
00443
00444 field = new LogField("client_req_url_canonical", "cquc",
00445 LogField::STRING,
00446 &LogAccess::marshal_client_req_url_canon,
00447 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str,
00448 &LogAccess::set_client_req_url_canon);
00449 global_field_list.add(field, false);
00450 ink_hash_table_insert(field_symbol_hash, "cquc", field);
00451
00452 field = new LogField("client_req_unmapped_url_canonical", "cquuc",
00453 LogField::STRING,
00454 &LogAccess::marshal_client_req_unmapped_url_canon,
00455 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str,
00456 &LogAccess::set_client_req_unmapped_url_canon);
00457 global_field_list.add(field, false);
00458 ink_hash_table_insert(field_symbol_hash, "cquuc", field);
00459
00460 field = new LogField("client_req_unmapped_url_path", "cquup",
00461 LogField::STRING,
00462 &LogAccess::marshal_client_req_unmapped_url_path,
00463 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str,
00464 &LogAccess::set_client_req_unmapped_url_path);
00465 global_field_list.add(field, false);
00466 ink_hash_table_insert(field_symbol_hash, "cquup", field);
00467
00468 field = new LogField("client_req_unmapped_url_host", "cquuh",
00469 LogField::STRING,
00470 &LogAccess::marshal_client_req_unmapped_url_host,
00471 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str,
00472 &LogAccess::set_client_req_unmapped_url_host);
00473 global_field_list.add(field, false);
00474 ink_hash_table_insert(field_symbol_hash, "cquuh", field);
00475
00476 field = new LogField("client_req_url_scheme", "cqus",
00477 LogField::STRING,
00478 &LogAccess::marshal_client_req_url_scheme,
00479 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str);
00480 global_field_list.add(field, false);
00481 ink_hash_table_insert(field_symbol_hash, "cqus", field);
00482
00483 field = new LogField("client_req_url_path", "cqup",
00484 LogField::STRING,
00485 &LogAccess::marshal_client_req_url_path,
00486 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str,
00487 &LogAccess::set_client_req_url_path);
00488 global_field_list.add(field, false);
00489 ink_hash_table_insert(field_symbol_hash, "cqup", field);
00490
00491 field = new LogField("client_req_http_version", "cqhv",
00492 LogField::dINT,
00493 &LogAccess::marshal_client_req_http_version,
00494 &LogAccess::unmarshal_http_version);
00495 global_field_list.add(field, false);
00496 ink_hash_table_insert(field_symbol_hash, "cqhv", field);
00497
00498 field = new LogField("client_req_header_len", "cqhl",
00499 LogField::sINT,
00500 &LogAccess::marshal_client_req_header_len,
00501 &LogAccess::unmarshal_int_to_str);
00502 global_field_list.add(field, false);
00503 ink_hash_table_insert(field_symbol_hash, "cqhl", field);
00504
00505 field = new LogField("client_req_body_len", "cqbl",
00506 LogField::sINT,
00507 &LogAccess::marshal_client_req_body_len,
00508 &LogAccess::unmarshal_int_to_str);
00509 global_field_list.add(field, false);
00510 ink_hash_table_insert(field_symbol_hash, "cqbl", field);
00511
00512 Ptr<LogFieldAliasTable> finish_status_map = make_ptr(new LogFieldAliasTable);
00513 finish_status_map->init(N_LOG_FINISH_CODE_TYPES,
00514 LOG_FINISH_FIN, "FIN",
00515 LOG_FINISH_INTR, "INTR",
00516 LOG_FINISH_TIMEOUT, "TIMEOUT");
00517
00518 field = new LogField("client_finish_status_code", "cfsc",
00519 LogField::sINT,
00520 &LogAccess::marshal_client_finish_status_code,
00521 &LogAccess::unmarshal_finish_status,
00522 (Ptr<LogFieldAliasMap>) finish_status_map);
00523 global_field_list.add(field, false);
00524 ink_hash_table_insert(field_symbol_hash, "cfsc", field);
00525
00526
00527 field = new LogField("proxy_resp_content_type", "psct",
00528 LogField::STRING,
00529 &LogAccess::marshal_proxy_resp_content_type,
00530 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str);
00531 global_field_list.add(field, false);
00532 ink_hash_table_insert(field_symbol_hash, "psct", field);
00533
00534 field = new LogField("proxy_resp_squid_len", "psql",
00535 LogField::sINT,
00536 &LogAccess::marshal_proxy_resp_squid_len,
00537 &LogAccess::unmarshal_int_to_str);
00538 global_field_list.add(field, false);
00539 ink_hash_table_insert(field_symbol_hash, "psql", field);
00540
00541 field = new LogField("proxy_resp_content_len", "pscl",
00542 LogField::sINT,
00543 &LogAccess::marshal_proxy_resp_content_len,
00544 &LogAccess::unmarshal_int_to_str);
00545 global_field_list.add(field, false);
00546 ink_hash_table_insert(field_symbol_hash, "pscl", field);
00547
00548 field = new LogField("proxy_resp_content_len_hex", "psch",
00549 LogField::sINT,
00550 &LogAccess::marshal_proxy_resp_content_len,
00551 &LogAccess::unmarshal_int_to_str_hex);
00552 global_field_list.add(field, false);
00553 ink_hash_table_insert(field_symbol_hash, "psch", field);
00554
00555 field = new LogField("proxy_resp_status_code", "pssc",
00556 LogField::sINT,
00557 &LogAccess::marshal_proxy_resp_status_code,
00558 &LogAccess::unmarshal_http_status);
00559 global_field_list.add(field, false);
00560 ink_hash_table_insert(field_symbol_hash, "pssc", field);
00561
00562 field = new LogField("proxy_resp_header_len", "pshl",
00563 LogField::sINT,
00564 &LogAccess::marshal_proxy_resp_header_len,
00565 &LogAccess::unmarshal_int_to_str);
00566 global_field_list.add(field, false);
00567 ink_hash_table_insert(field_symbol_hash, "pshl", field);
00568
00569 field = new LogField("proxy_finish_status_code", "pfsc",
00570 LogField::sINT,
00571 &LogAccess::marshal_proxy_finish_status_code,
00572 &LogAccess::unmarshal_finish_status,
00573 (Ptr<LogFieldAliasMap>) finish_status_map);
00574 global_field_list.add(field, false);
00575 ink_hash_table_insert(field_symbol_hash, "pfsc", field);
00576
00577 Ptr<LogFieldAliasTable> cache_code_map = make_ptr(new LogFieldAliasTable);
00578 cache_code_map->init(49,
00579 SQUID_LOG_EMPTY, "UNDEFINED",
00580 SQUID_LOG_TCP_HIT, "TCP_HIT",
00581 SQUID_LOG_TCP_DISK_HIT, "TCP_DISK_HIT",
00582 SQUID_LOG_TCP_MEM_HIT, "TCP_MEM_HIT",
00583 SQUID_LOG_TCP_MISS, "TCP_MISS",
00584 SQUID_LOG_TCP_EXPIRED_MISS, "TCP_EXPIRED_MISS",
00585 SQUID_LOG_TCP_REFRESH_HIT, "TCP_REFRESH_HIT",
00586 SQUID_LOG_TCP_REF_FAIL_HIT, "TCP_REFRESH_FAIL_HIT",
00587 SQUID_LOG_TCP_REFRESH_MISS, "TCP_REFRESH_MISS",
00588 SQUID_LOG_TCP_CLIENT_REFRESH, "TCP_CLIENT_REFRESH_MISS",
00589 SQUID_LOG_TCP_IMS_HIT, "TCP_IMS_HIT",
00590 SQUID_LOG_TCP_IMS_MISS, "TCP_IMS_MISS",
00591 SQUID_LOG_TCP_SWAPFAIL, "TCP_SWAPFAIL_MISS",
00592 SQUID_LOG_TCP_DENIED, "TCP_DENIED",
00593 SQUID_LOG_TCP_WEBFETCH_MISS, "TCP_WEBFETCH_MISS",
00594 SQUID_LOG_TCP_FUTURE_2, "TCP_FUTURE_2",
00595 SQUID_LOG_TCP_HIT_REDIRECT, "TCP_HIT_REDIRECT",
00596 SQUID_LOG_TCP_MISS_REDIRECT, "TCP_MISS_REDIRECT",
00597 SQUID_LOG_TCP_HIT_X_REDIRECT, "TCP_HIT_X_REDIRECT",
00598 SQUID_LOG_TCP_MISS_X_REDIRECT, "TCP_MISS_X_REDIRECT",
00599 SQUID_LOG_UDP_HIT, "UDP_HIT",
00600 SQUID_LOG_UDP_WEAK_HIT, "UDP_WEAK_HIT",
00601 SQUID_LOG_UDP_HIT_OBJ, "UDP_HIT_OBJ",
00602 SQUID_LOG_UDP_MISS, "UDP_MISS",
00603 SQUID_LOG_UDP_DENIED, "UDP_DENIED",
00604 SQUID_LOG_UDP_INVALID, "UDP_INVALID",
00605 SQUID_LOG_UDP_RELOADING, "UDP_RELOADING",
00606 SQUID_LOG_UDP_FUTURE_1, "UDP_FUTURE_1",
00607 SQUID_LOG_UDP_FUTURE_2, "UDP_FUTURE_2",
00608 SQUID_LOG_ERR_READ_TIMEOUT, "ERR_READ_TIMEOUT",
00609 SQUID_LOG_ERR_LIFETIME_EXP, "ERR_LIFETIME_EXP",
00610 SQUID_LOG_ERR_NO_CLIENTS_BIG_OBJ, "ERR_NO_CLIENTS_BIG_OBJ",
00611 SQUID_LOG_ERR_READ_ERROR, "ERR_READ_ERROR",
00612 SQUID_LOG_ERR_CLIENT_ABORT, "ERR_CLIENT_ABORT",
00613 SQUID_LOG_ERR_CONNECT_FAIL, "ERR_CONNECT_FAIL",
00614 SQUID_LOG_ERR_INVALID_REQ, "ERR_INVALID_REQ",
00615 SQUID_LOG_ERR_UNSUP_REQ, "ERR_UNSUP_REQ",
00616 SQUID_LOG_ERR_INVALID_URL, "ERR_INVALID_URL",
00617 SQUID_LOG_ERR_NO_FDS, "ERR_NO_FDS",
00618 SQUID_LOG_ERR_DNS_FAIL, "ERR_DNS_FAIL",
00619 SQUID_LOG_ERR_NOT_IMPLEMENTED, "ERR_NOT_IMPLEMENTED",
00620 SQUID_LOG_ERR_CANNOT_FETCH, "ERR_CANNOT_FETCH",
00621 SQUID_LOG_ERR_NO_RELAY, "ERR_NO_RELAY",
00622 SQUID_LOG_ERR_DISK_IO, "ERR_DISK_IO",
00623 SQUID_LOG_ERR_ZERO_SIZE_OBJECT, "ERR_ZERO_SIZE_OBJECT",
00624 SQUID_LOG_ERR_PROXY_DENIED, "ERR_PROXY_DENIED",
00625 SQUID_LOG_ERR_WEBFETCH_DETECTED, "ERR_WEBFETCH_DETECTED",
00626 SQUID_LOG_ERR_FUTURE_1, "ERR_FUTURE_1",
00627 SQUID_LOG_ERR_UNKNOWN, "ERR_UNKNOWN");
00628
00629 field = new LogField("cache_result_code", "crc",
00630 LogField::sINT,
00631 &LogAccess::marshal_cache_result_code,
00632 &LogAccess::unmarshal_cache_code,
00633 (Ptr<LogFieldAliasMap>) cache_code_map);
00634 global_field_list.add(field, false);
00635 ink_hash_table_insert(field_symbol_hash, "crc", field);
00636
00637
00638 field = new LogField("proxy_req_header_len", "pqhl",
00639 LogField::sINT,
00640 &LogAccess::marshal_proxy_req_header_len,
00641 &LogAccess::unmarshal_int_to_str);
00642 global_field_list.add(field, false);
00643 ink_hash_table_insert(field_symbol_hash, "pqhl", field);
00644
00645 field = new LogField("proxy_req_body_len", "pqbl",
00646 LogField::sINT,
00647 &LogAccess::marshal_proxy_req_body_len,
00648 &LogAccess::unmarshal_int_to_str);
00649 global_field_list.add(field, false);
00650 ink_hash_table_insert(field_symbol_hash, "pqbl", field);
00651
00652 field = new LogField("proxy_req_server_name", "pqsn",
00653 LogField::STRING,
00654 &LogAccess::marshal_proxy_req_server_name,
00655 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str);
00656 global_field_list.add(field, false);
00657 ink_hash_table_insert(field_symbol_hash, "pqsn", field);
00658
00659 field = new LogField("proxy_req_server_ip", "pqsi",
00660 LogField::IP,
00661 &LogAccess::marshal_proxy_req_server_ip,
00662 &LogAccess::unmarshal_ip_to_str);
00663 global_field_list.add(field, false);
00664 ink_hash_table_insert(field_symbol_hash, "pqsi", field);
00665
00666 Ptr<LogFieldAliasTable> hierarchy_map = make_ptr(new LogFieldAliasTable);
00667 hierarchy_map->init(36,
00668 SQUID_HIER_EMPTY, "EMPTY",
00669 SQUID_HIER_NONE, "NONE",
00670 SQUID_HIER_DIRECT, "DIRECT",
00671 SQUID_HIER_SIBLING_HIT, "SIBLING_HIT",
00672 SQUID_HIER_PARENT_HIT, "PARENT_HIT",
00673 SQUID_HIER_DEFAULT_PARENT, "DEFAULT_PARENT",
00674 SQUID_HIER_SINGLE_PARENT, "SINGLE_PARENT",
00675 SQUID_HIER_FIRST_UP_PARENT, "FIRST_UP_PARENT",
00676 SQUID_HIER_NO_PARENT_DIRECT, "NO_PARENT_DIRECT",
00677 SQUID_HIER_FIRST_PARENT_MISS, "FIRST_PARENT_MISS",
00678 SQUID_HIER_LOCAL_IP_DIRECT, "LOCAL_IP_DIRECT",
00679 SQUID_HIER_FIREWALL_IP_DIRECT, "FIREWALL_IP_DIRECT",
00680 SQUID_HIER_NO_DIRECT_FAIL, "NO_DIRECT_FAIL",
00681 SQUID_HIER_SOURCE_FASTEST, "SOURCE_FASTEST",
00682 SQUID_HIER_SIBLING_UDP_HIT_OBJ, "SIBLING_UDP_HIT_OBJ",
00683 SQUID_HIER_PARENT_UDP_HIT_OBJ, "PARENT_UDP_HIT_OBJ",
00684 SQUID_HIER_PASSTHROUGH_PARENT, "PASSTHROUGH_PARENT",
00685 SQUID_HIER_SSL_PARENT_MISS, "SSL_PARENT_MISS",
00686 SQUID_HIER_INVALID_CODE, "INVALID_CODE",
00687 SQUID_HIER_TIMEOUT_DIRECT, "TIMEOUT_DIRECT",
00688 SQUID_HIER_TIMEOUT_SIBLING_HIT, "TIMEOUT_SIBLING_HIT",
00689 SQUID_HIER_TIMEOUT_PARENT_HIT, "TIMEOUT_PARENT_HIT",
00690 SQUID_HIER_TIMEOUT_DEFAULT_PARENT, "TIMEOUT_DEFAULT_PARENT",
00691 SQUID_HIER_TIMEOUT_SINGLE_PARENT, "TIMEOUT_SINGLE_PARENT",
00692 SQUID_HIER_TIMEOUT_FIRST_UP_PARENT, "TIMEOUT_FIRST_UP_PARENT",
00693 SQUID_HIER_TIMEOUT_NO_PARENT_DIRECT, "TIMEOUT_NO_PARENT_DIRECT",
00694 SQUID_HIER_TIMEOUT_FIRST_PARENT_MISS, "TIMEOUT_FIRST_PARENT_MISS",
00695 SQUID_HIER_TIMEOUT_LOCAL_IP_DIRECT, "TIMEOUT_LOCAL_IP_DIRECT",
00696 SQUID_HIER_TIMEOUT_FIREWALL_IP_DIRECT, "TIMEOUT_FIREWALL_IP_DIRECT",
00697 SQUID_HIER_TIMEOUT_NO_DIRECT_FAIL, "TIMEOUT_NO_DIRECT_FAIL",
00698 SQUID_HIER_TIMEOUT_SOURCE_FASTEST, "TIMEOUT_SOURCE_FASTEST",
00699 SQUID_HIER_TIMEOUT_SIBLING_UDP_HIT_OBJ, "TIMEOUT_SIBLING_UDP_HIT_OBJ",
00700 SQUID_HIER_TIMEOUT_PARENT_UDP_HIT_OBJ, "TIMEOUT_PARENT_UDP_HIT_OBJ",
00701 SQUID_HIER_TIMEOUT_PASSTHROUGH_PARENT, "TIMEOUT_PASSTHROUGH_PARENT",
00702 SQUID_HIER_TIMEOUT_TIMEOUT_SSL_PARENT_MISS, "TIMEOUT_TIMEOUT_SSL_PARENT_MISS",
00703 SQUID_HIER_INVALID_ASSIGNED_CODE, "INVALID_ASSIGNED_CODE");
00704
00705 field = new LogField("proxy_hierarchy_route", "phr",
00706 LogField::sINT,
00707 &LogAccess::marshal_proxy_hierarchy_route,
00708 &LogAccess::unmarshal_hierarchy,
00709 (Ptr<LogFieldAliasMap>) hierarchy_map);
00710 global_field_list.add(field, false);
00711 ink_hash_table_insert(field_symbol_hash, "phr", field);
00712
00713 field = new LogField("proxy_host_name", "phn",
00714 LogField::STRING,
00715 &LogAccess::marshal_proxy_host_name,
00716 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str);
00717 global_field_list.add(field, false);
00718 ink_hash_table_insert(field_symbol_hash, "phn", field);
00719
00720 field = new LogField("proxy_host_ip", "phi",
00721 LogField::IP,
00722 &LogAccess::marshal_proxy_host_ip,
00723 &LogAccess::unmarshal_ip_to_str);
00724 global_field_list.add(field, false);
00725 ink_hash_table_insert(field_symbol_hash, "phi", field);
00726
00727
00728
00729 field = new LogField("server_host_ip", "shi",
00730 LogField::IP,
00731 &LogAccess::marshal_server_host_ip,
00732 &LogAccess::unmarshal_ip_to_str);
00733
00734 global_field_list.add(field, false);
00735 ink_hash_table_insert(field_symbol_hash, "shi", field);
00736
00737 field = new LogField("server_host_name", "shn",
00738 LogField::STRING,
00739 &LogAccess::marshal_server_host_name,
00740 (LogField::UnmarshalFunc)&LogAccess::unmarshal_str);
00741 global_field_list.add(field, false);
00742 ink_hash_table_insert(field_symbol_hash, "shn", field);
00743
00744 field = new LogField("server_resp_status_code", "sssc",
00745 LogField::sINT,
00746 &LogAccess::marshal_server_resp_status_code,
00747 &LogAccess::unmarshal_http_status);
00748 global_field_list.add(field, false);
00749 ink_hash_table_insert(field_symbol_hash, "sssc", field);
00750
00751 field = new LogField("server_resp_content_len", "sscl",
00752 LogField::sINT,
00753 &LogAccess::marshal_server_resp_content_len,
00754 &LogAccess::unmarshal_int_to_str);
00755 global_field_list.add(field, false);
00756 ink_hash_table_insert(field_symbol_hash, "sscl", field);
00757
00758 field = new LogField("server_resp_header_len", "sshl",
00759 LogField::sINT,
00760 &LogAccess::marshal_server_resp_header_len,
00761 &LogAccess::unmarshal_int_to_str);
00762 global_field_list.add(field, false);
00763 ink_hash_table_insert(field_symbol_hash, "sshl", field);
00764
00765 field = new LogField("server_resp_http_version", "sshv",
00766 LogField::dINT,
00767 &LogAccess::marshal_server_resp_http_version,
00768 &LogAccess::unmarshal_http_version);
00769 global_field_list.add(field, false);
00770 ink_hash_table_insert(field_symbol_hash, "sshv", field);
00771
00772 field = new LogField("cached_resp_status_code", "csssc",
00773 LogField::sINT,
00774 &LogAccess::marshal_cache_resp_status_code,
00775 &LogAccess::unmarshal_http_status);
00776 global_field_list.add(field, false);
00777 ink_hash_table_insert(field_symbol_hash, "csssc", field);
00778
00779 field = new LogField("cached_resp_content_len", "csscl",
00780 LogField::sINT,
00781 &LogAccess::marshal_cache_resp_content_len,
00782 &LogAccess::unmarshal_int_to_str);
00783 global_field_list.add(field, false);
00784 ink_hash_table_insert(field_symbol_hash, "csscl", field);
00785
00786 field = new LogField("cached_resp_header_len", "csshl",
00787 LogField::sINT,
00788 &LogAccess::marshal_cache_resp_header_len,
00789 &LogAccess::unmarshal_int_to_str);
00790 global_field_list.add(field, false);
00791 ink_hash_table_insert(field_symbol_hash, "csshl", field);
00792
00793 field = new LogField("cached_resp_http_version", "csshv",
00794 LogField::dINT,
00795 &LogAccess::marshal_cache_resp_http_version,
00796 &LogAccess::unmarshal_http_version);
00797 global_field_list.add(field, false);
00798 ink_hash_table_insert(field_symbol_hash, "csshv", field);
00799
00800 field = new LogField("client_retry_after_time", "crat",
00801 LogField::sINT,
00802 &LogAccess::marshal_client_retry_after_time,
00803 &LogAccess::unmarshal_int_to_str);
00804 global_field_list.add(field, false);
00805 ink_hash_table_insert(field_symbol_hash, "crat", field);
00806
00807
00808
00809 Ptr<LogFieldAliasTable> cache_write_code_map = make_ptr(new LogFieldAliasTable);
00810 cache_write_code_map->init(N_LOG_CACHE_WRITE_TYPES,
00811 LOG_CACHE_WRITE_NONE, "-",
00812 LOG_CACHE_WRITE_LOCK_MISSED, "WL_MISS",
00813 LOG_CACHE_WRITE_LOCK_ABORTED, "INTR",
00814 LOG_CACHE_WRITE_ERROR, "ERR", LOG_CACHE_WRITE_COMPLETE, "FIN");
00815 field = new LogField("cache_write_result", "cwr",
00816 LogField::sINT,
00817 &LogAccess::marshal_cache_write_code,
00818 &LogAccess::unmarshal_cache_write_code,
00819 (Ptr<LogFieldAliasMap>) cache_write_code_map);
00820 global_field_list.add(field, false);
00821 ink_hash_table_insert(field_symbol_hash, "cwr", field);
00822
00823 field = new LogField("cache_write_transform_result", "cwtr",
00824 LogField::sINT,
00825 &LogAccess::marshal_cache_write_transform_code,
00826 &LogAccess::unmarshal_cache_write_code,
00827 (Ptr<LogFieldAliasMap>) cache_write_code_map);
00828 global_field_list.add(field, false);
00829 ink_hash_table_insert(field_symbol_hash, "cwtr", field);
00830
00831
00832
00833 field = new LogField("transfer_time_ms", "ttms",
00834 LogField::sINT,
00835 &LogAccess::marshal_transfer_time_ms,
00836 &LogAccess::unmarshal_int_to_str);
00837 global_field_list.add(field, false);
00838 ink_hash_table_insert(field_symbol_hash, "ttms", field);
00839
00840 field = new LogField("transfer_time_ms_hex", "ttmh",
00841 LogField::sINT,
00842 &LogAccess::marshal_transfer_time_ms,
00843 &LogAccess::unmarshal_int_to_str_hex);
00844 global_field_list.add(field, false);
00845 ink_hash_table_insert(field_symbol_hash, "ttmh", field);
00846
00847 field = new LogField("transfer_time_ms_fractional", "ttmsf",
00848 LogField::sINT,
00849 &LogAccess::marshal_transfer_time_ms,
00850 &LogAccess::unmarshal_ttmsf);
00851 global_field_list.add(field, false);
00852 ink_hash_table_insert(field_symbol_hash, "ttmsf", field);
00853
00854 field = new LogField("transfer_time_sec", "tts",
00855 LogField::sINT,
00856 &LogAccess::marshal_transfer_time_s,
00857 &LogAccess::unmarshal_int_to_str);
00858 global_field_list.add(field, false);
00859 ink_hash_table_insert(field_symbol_hash, "tts", field);
00860
00861 field = new LogField("file_size", "fsiz",
00862 LogField::sINT,
00863 &LogAccess::marshal_file_size,
00864 &LogAccess::unmarshal_int_to_str);
00865 global_field_list.add(field, false);
00866 ink_hash_table_insert(field_symbol_hash, "fsiz", field);
00867
00868 Ptr<LogFieldAliasTable> entry_type_map = make_ptr(new LogFieldAliasTable);
00869 entry_type_map->init(N_LOG_ENTRY_TYPES,
00870 LOG_ENTRY_HTTP, "LOG_ENTRY_HTTP",
00871 LOG_ENTRY_ICP, "LOG_ENTRY_ICP");
00872 field = new LogField("log_entry_type", "etype",
00873 LogField::sINT,
00874 &LogAccess::marshal_entry_type,
00875 &LogAccess::unmarshal_entry_type,
00876 (Ptr<LogFieldAliasMap>) entry_type_map);
00877 global_field_list.add(field, false);
00878 ink_hash_table_insert(field_symbol_hash, "etype", field);
00879
00880 init_status |= FIELDS_INITIALIZED;
00881 }
00882
00883
00884
00885
00886
00887
00888 int
00889 Log::handle_logging_mode_change(const char *, RecDataT ,
00890 RecData , void * )
00891 {
00892 Debug("log-config", "Enabled status changed");
00893 logging_mode_changed = true;
00894 return 0;
00895 }
00896
00897 void
00898 Log::init(int flags)
00899 {
00900 collation_preproc_threads = 1;
00901 collation_accept_file_descriptor = NO_FD;
00902
00903
00904
00905 config_flags = flags;
00906
00907
00908 config = new LogConfig();
00909 ink_assert (config != NULL);
00910
00911 log_configid = configProcessor.set(log_configid, config);
00912
00913
00914
00915 if (config_flags & LOGCAT) {
00916 logging_mode = LOG_MODE_NONE;
00917 } else {
00918 log_rsb = RecAllocateRawStatBlock((int) log_stat_count);
00919 LogConfig::register_stat_callbacks();
00920
00921 config->read_configuration_variables();
00922 collation_port = config->collation_port;
00923 collation_preproc_threads = config->collation_preproc_threads;
00924
00925 if (config_flags & STANDALONE_COLLATOR) {
00926 logging_mode = LOG_MODE_TRANSACTIONS;
00927 } else {
00928 int val = (int) REC_ConfigReadInteger("proxy.config.log.logging_enabled");
00929 if (val < LOG_MODE_NONE || val > LOG_MODE_FULL) {
00930 logging_mode = LOG_MODE_FULL;
00931 Warning("proxy.config.log.logging_enabled has an invalid "
00932 "value, setting it to %d", logging_mode);
00933 } else {
00934 logging_mode = (LoggingMode) val;
00935 }
00936 }
00937 }
00938
00939
00940
00941
00942 if (!(config_flags & NO_REMOTE_MANAGEMENT)) {
00943
00944 REC_RegisterConfigUpdateFunc("proxy.config.log.logging_enabled",
00945 &Log::handle_logging_mode_change, NULL);
00946
00947 REC_RegisterConfigUpdateFunc("proxy.local.log.collation_mode",
00948 &Log::handle_logging_mode_change, NULL);
00949
00950
00951
00952 RecSetRawStatSum(log_rsb, log_stat_log_files_open_stat, 0);
00953 RecSetRawStatCount(log_rsb, log_stat_log_files_open_stat, 0);
00954 }
00955
00956 if (config_flags & LOGCAT) {
00957 init_fields();
00958 } else {
00959 Debug("log-config", "Log::init(): logging_mode = %d "
00960 "init status = %d", logging_mode, init_status);
00961 init_when_enabled();
00962 if (config_flags & STANDALONE_COLLATOR) {
00963 config->collation_mode = Log::COLLATION_HOST;
00964 }
00965 config->init();
00966 }
00967 }
00968
00969 void
00970 Log::init_when_enabled()
00971 {
00972 init_fields();
00973
00974 if (!(init_status & FULLY_INITIALIZED)) {
00975
00976 if (!(config_flags & STANDALONE_COLLATOR)) {
00977
00978
00979 if (!(config_flags & NO_REMOTE_MANAGEMENT)) {
00980 LogConfig::register_config_callbacks();
00981 }
00982
00983 LogConfig::register_mgmt_callbacks();
00984 }
00985
00986
00987 global_scrap_format = MakeTextLogFormat();
00988 global_scrap_object = new LogObject(global_scrap_format,
00989 Log::config->logfile_dir,
00990 "scrapfile.log",
00991 LOG_FILE_BINARY, NULL,
00992 Log::config->rolling_enabled,
00993 Log::config->collation_preproc_threads,
00994 Log::config->rolling_interval_sec,
00995 Log::config->rolling_offset_hr,
00996 Log::config->rolling_size_mb);
00997
00998
00999 create_threads();
01000 eventProcessor.schedule_every(new PeriodicWakeup(collation_preproc_threads, 1),
01001 HRTIME_SECOND, ET_CALL);
01002
01003 init_status |= FULLY_INITIALIZED;
01004 }
01005
01006 Note("logging initialized[%d], logging_mode = %d", init_status, logging_mode);
01007 if (is_debug_tag_set("log-config")) {
01008 config->display();
01009 }
01010 }
01011
01012 void
01013 Log::create_threads()
01014 {
01015 char desc[64];
01016 preproc_notify = new EventNotify[collation_preproc_threads];
01017
01018 size_t stacksize;
01019 REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
01020
01021
01022
01023
01024
01025 for (int i = 0; i < collation_preproc_threads; i++) {
01026 Continuation *preproc_cont = new LoggingPreprocContinuation(i);
01027 sprintf(desc, "[LOG_PREPROC %d]", i);
01028 eventProcessor.spawn_thread(preproc_cont, desc, stacksize);
01029 }
01030
01031
01032
01033
01034
01035 flush_notify = new EventNotify;
01036 flush_data_list = new InkAtomicList;
01037
01038 sprintf(desc, "Logging flush buffer list");
01039 ink_atomiclist_init(flush_data_list, desc, 0);
01040 Continuation *flush_cont = new LoggingFlushContinuation(0);
01041 sprintf(desc, "[LOG_FLUSH]");
01042 eventProcessor.spawn_thread(flush_cont, desc, stacksize);
01043
01044 }
01045
01046
01047
01048
01049
01050
01051
01052
01053 int
01054 Log::access(LogAccess * lad)
01055 {
01056
01057
01058 if (!transaction_logging_enabled()) {
01059 return Log::SKIP;
01060 }
01061
01062 ink_assert(init_status & FULLY_INITIALIZED);
01063 ink_assert(lad != NULL);
01064
01065 int ret;
01066 static long sample = 1;
01067 long this_sample;
01068 ProxyMutex *mutex = this_ethread()->mutex;
01069
01070
01071
01072 if (Log::config->sampling_frequency > 1) {
01073 this_sample = sample++;
01074 if (this_sample && this_sample % Log::config->sampling_frequency) {
01075 Debug("log", "sampling, skipping this entry ...");
01076 RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_event_log_access_skip_stat, 1);
01077 ret = Log::SKIP;
01078 goto done;
01079 } else {
01080 Debug("log", "sampling, LOGGING this entry ...");
01081 sample = 1;
01082 }
01083 }
01084
01085 if (Log::config->log_object_manager.get_num_objects() == 0) {
01086 Debug("log", "no log objects, skipping this entry ...");
01087 RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_event_log_access_skip_stat, 1);
01088 ret = Log::SKIP;
01089 goto done;
01090 }
01091
01092
01093 lad->init();
01094 ret = config->log_object_manager.log(lad);
01095
01096 done:
01097 return ret;
01098 }
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109
01110
01111
01112
01113 int
01114 Log::error(const char *format, ...)
01115 {
01116 va_list ap;
01117 int ret;
01118
01119 va_start(ap, format);
01120 ret = Log::va_error(format, ap);
01121 va_end(ap);
01122
01123 return ret;
01124 }
01125
01126 int
01127 Log::va_error(const char *format, va_list ap)
01128 {
01129 int ret_val = Log::SKIP;
01130 ProxyMutex *mutex = this_ethread()->mutex;
01131
01132 if (error_log) {
01133 ink_assert(format != NULL);
01134 ret_val = error_log->va_log(NULL, format, ap);
01135
01136 switch (ret_val) {
01137 case Log::LOG_OK:
01138 RecIncrRawStat(log_rsb, mutex->thread_holding,
01139 log_stat_event_log_error_ok_stat, 1);
01140 break;
01141 case Log::SKIP:
01142 RecIncrRawStat(log_rsb, mutex->thread_holding,
01143 log_stat_event_log_error_skip_stat, 1);
01144 break;
01145 case Log::AGGR:
01146 RecIncrRawStat(log_rsb, mutex->thread_holding,
01147 log_stat_event_log_error_aggr_stat, 1);
01148 break;
01149 case Log::FULL:
01150 RecIncrRawStat(log_rsb, mutex->thread_holding,
01151 log_stat_event_log_error_full_stat, 1);
01152 break;
01153 case Log::FAIL:
01154 RecIncrRawStat(log_rsb, mutex->thread_holding,
01155 log_stat_event_log_error_fail_stat, 1);
01156 break;
01157 default:
01158 ink_release_assert(!"Unexpected result");
01159 }
01160
01161 return ret_val;
01162 }
01163
01164 RecIncrRawStat(log_rsb, mutex->thread_holding,
01165 log_stat_event_log_error_skip_stat, 1);
01166
01167 return ret_val;
01168 }
01169
01170
01171
01172
01173
01174
01175
01176
01177
01178
01179 void *
01180 Log::preproc_thread_main(void *args)
01181 {
01182 int idx = *(int *)args;
01183
01184 Debug("log-preproc", "log preproc thread is alive ...");
01185
01186 Log::preproc_notify[idx].lock();
01187
01188 while (true) {
01189 size_t buffers_preproced = 0;
01190 LogConfig * current = (LogConfig *)configProcessor.get(log_configid);
01191
01192 if (likely(current)) {
01193 buffers_preproced = current->log_object_manager.preproc_buffers(idx);
01194
01195
01196
01197
01198 Debug("log-preproc","%zu buffers preprocessed from LogConfig %p (refcount=%d) this round",
01199 buffers_preproced, current, current->m_refcount);
01200
01201 configProcessor.release(log_configid, current);
01202 }
01203
01204
01205
01206
01207
01208 Log::preproc_notify[idx].wait();
01209 }
01210
01211
01212 Log::preproc_notify[idx].unlock();
01213 return NULL;
01214 }
01215
01216 void *
01217 Log::flush_thread_main(void * )
01218 {
01219 char *buf;
01220 LogFile *logfile;
01221 LogBuffer *logbuffer;
01222 LogFlushData *fdata;
01223 ink_hrtime now, last_time = 0;
01224 int len, bytes_written, total_bytes;
01225 SLL<LogFlushData, LogFlushData::Link_link> link, invert_link;
01226 ProxyMutex *mutex = this_thread()->mutex;
01227
01228 Log::flush_notify->lock();
01229
01230 while (true) {
01231 fdata = (LogFlushData *) ink_atomiclist_popall(flush_data_list);
01232
01233
01234
01235 link.head = fdata;
01236 while ((fdata = link.pop()))
01237 invert_link.push(fdata);
01238
01239
01240
01241 while ((fdata = invert_link.pop())) {
01242 buf = NULL;
01243 bytes_written = 0;
01244 logfile = fdata->m_logfile;
01245
01246 if (logfile->m_file_format == LOG_FILE_BINARY) {
01247
01248 logbuffer = (LogBuffer *)fdata->m_data;
01249 LogBufferHeader *buffer_header = logbuffer->header();
01250
01251 buf = (char *)buffer_header;
01252 total_bytes = buffer_header->byte_count;
01253
01254 } else if (logfile->m_file_format == LOG_FILE_ASCII
01255 || logfile->m_file_format == LOG_FILE_PIPE){
01256
01257 buf = (char *)fdata->m_data;
01258 total_bytes = fdata->m_len;
01259
01260 } else {
01261 ink_release_assert(!"Unknown file format type!");
01262 }
01263
01264
01265 logfile->check_fd();
01266 if (!logfile->is_open()) {
01267 Warning("File:%s was closed, have dropped (%d) bytes.",
01268 logfile->get_name(), total_bytes);
01269
01270 RecIncrRawStat(log_rsb, mutex->thread_holding,
01271 log_stat_bytes_lost_before_written_to_disk_stat,
01272 total_bytes);
01273 delete fdata;
01274 continue;
01275 }
01276
01277
01278
01279 while (total_bytes - bytes_written) {
01280 if (Log::config->logging_space_exhausted) {
01281 Debug("log", "logging space exhausted, failed to write file:%s, have dropped (%d) bytes.",
01282 logfile->get_name(), (total_bytes - bytes_written));
01283
01284 RecIncrRawStat(log_rsb, mutex->thread_holding,
01285 log_stat_bytes_lost_before_written_to_disk_stat,
01286 total_bytes - bytes_written);
01287 break;
01288 }
01289
01290 len = ::write(logfile->m_fd, &buf[bytes_written],
01291 total_bytes - bytes_written);
01292 if (len < 0) {
01293 Error("Failed to write log to %s: [tried %d, wrote %d, %s]",
01294 logfile->get_name(), total_bytes - bytes_written,
01295 bytes_written, strerror(errno));
01296
01297 RecIncrRawStat(log_rsb, mutex->thread_holding,
01298 log_stat_bytes_lost_before_written_to_disk_stat,
01299 total_bytes - bytes_written);
01300 break;
01301 }
01302 bytes_written += len;
01303 }
01304
01305 RecIncrRawStat(log_rsb, mutex->thread_holding,
01306 log_stat_bytes_written_to_disk_stat, bytes_written);
01307
01308 ink_atomic_increment(&logfile->m_bytes_written, bytes_written);
01309
01310 delete fdata;
01311 }
01312
01313
01314
01315 now = ink_get_hrtime() / HRTIME_SECOND;
01316 if (now >= last_time + PERIODIC_TASKS_INTERVAL) {
01317 Debug("log-preproc", "periodic tasks for %" PRId64, (int64_t)now);
01318 periodic_tasks(now);
01319 last_time = ink_get_hrtime() / HRTIME_SECOND;
01320 }
01321
01322
01323
01324
01325
01326 Log::flush_notify->wait();
01327 }
01328
01329
01330 Log::flush_notify->unlock();
01331 return NULL;
01332 }
01333
01334
01335
01336
01337
01338
01339
01340
01341 void *
01342 Log::collate_thread_main(void * )
01343 {
01344 LogSock *sock;
01345 LogBufferHeader *header;
01346 LogFormat *format;
01347 LogObject *obj;
01348 int bytes_read;
01349 int sock_id;
01350 int new_client;
01351
01352 Debug("log-thread", "Log collation thread is alive ...");
01353
01354 Log::collate_notify.lock();
01355
01356 while (true) {
01357 ink_assert(Log::config != NULL);
01358
01359
01360
01361
01362
01363 while (!Log::config->am_collation_host()) {
01364 Log::collate_notify.wait();
01365 }
01366
01367
01368
01369
01370
01371 Debug("log-sock", "collation thread starting, creating LogSock");
01372 sock = new LogSock(LogSock::LS_CONST_CLUSTER_MAX_MACHINES);
01373 ink_assert(sock != NULL);
01374
01375 if (sock->listen(Log::config->collation_port) != 0) {
01376 LogUtils::manager_alarm(LogUtils::LOG_ALARM_ERROR,
01377 "Collation server error; could not listen on port %d", Log::config->collation_port);
01378 Warning("Collation server error; could not listen on port %d", Log::config->collation_port);
01379 delete sock;
01380
01381
01382
01383 Log::collate_notify.wait();
01384 continue;
01385 }
01386
01387 while (true) {
01388 if (!Log::config->am_collation_host()) {
01389 break;
01390 }
01391
01392 if (sock->pending_connect(0)) {
01393 Debug("log-sock", "pending connection ...");
01394 if ((new_client = sock->accept()) < 0) {
01395 Debug("log-sock", "error accepting new collation client");
01396 } else {
01397 Debug("log-sock", "connection %d accepted", new_client);
01398 if (!sock->authorized_client(new_client, Log::config->collation_secret)) {
01399 Warning("Unauthorized client connecting to " "log collation port; connection refused.");
01400 sock->close(new_client);
01401 }
01402 }
01403 }
01404
01405 sock->check_connections();
01406
01407 if (!sock->pending_message_any(&sock_id, 0)) {
01408 continue;
01409 }
01410
01411 Debug("log-sock", "pending message ...");
01412 header = (LogBufferHeader *) sock->read_alloc(sock_id, &bytes_read);
01413 if (!header) {
01414 Debug("log-sock", "Error reading LogBuffer from collation client");
01415 continue;
01416 }
01417
01418 if (header->version != LOG_SEGMENT_VERSION) {
01419 Note("Invalid LogBuffer received; invalid version - buffer = %u, current = %u",
01420 header->version, LOG_SEGMENT_VERSION);
01421 delete[]header;
01422 continue;
01423 }
01424
01425 Debug("log-sock", "message accepted, size = %d", bytes_read);
01426
01427 obj = match_logobject(header);
01428 if (!obj) {
01429 Note("LogObject not found with fieldlist id; " "writing LogBuffer to scrap file");
01430 obj = global_scrap_object;
01431 }
01432
01433 format = obj->m_format;
01434 Debug("log-sock", "Using format '%s'", format->name());
01435
01436 delete[]header;
01437 }
01438
01439 Debug("log", "no longer collation host, deleting LogSock");
01440 delete sock;
01441 }
01442
01443
01444 Log::collate_notify.unlock();
01445 return NULL;
01446 }
01447
01448
01449
01450
01451
01452
01453
01454
01455
01456
01457 LogObject *
01458 Log::match_logobject(LogBufferHeader * header)
01459 {
01460 if (!header)
01461 return NULL;
01462
01463 LogObject *obj;
01464 obj = Log::config->log_object_manager.get_object_with_signature(header->log_object_signature);
01465
01466 if (!obj) {
01467
01468
01469 LogFormat *fmt = new LogFormat("__collation_format__", header->fmt_fieldlist(), header->fmt_printf());
01470
01471 if (fmt->valid()) {
01472 LogFileFormat file_format = header->log_object_flags & LogObject::BINARY ? LOG_FILE_BINARY :
01473 (header->log_object_flags & LogObject::WRITES_TO_PIPE ? LOG_FILE_PIPE : LOG_FILE_ASCII);
01474
01475 obj = new LogObject(fmt, Log::config->logfile_dir,
01476 header->log_filename(), file_format, NULL,
01477 (Log::RollingEnabledValues)Log::config->rolling_enabled,
01478 Log::config->collation_preproc_threads,
01479 Log::config->rolling_interval_sec,
01480 Log::config->rolling_offset_hr,
01481 Log::config->rolling_size_mb, true);
01482
01483 delete fmt;
01484
01485 obj->set_remote_flag();
01486
01487 if (Log::config->log_object_manager.manage_object(obj)) {
01488
01489
01490
01491 delete obj;
01492 obj = NULL;
01493 }
01494 }
01495 }
01496 return obj;
01497 }