00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "libts.h"
00029
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <limits.h>
00033 #include <string.h>
00034 #include <sys/types.h>
00035
00036 #include "P_EventSystem.h"
00037 #include "P_Net.h"
00038
00039 #include "LogUtils.h"
00040 #include "LogSock.h"
00041 #include "LogField.h"
00042 #include "LogFile.h"
00043 #include "LogFormat.h"
00044 #include "LogBuffer.h"
00045 #include "LogHost.h"
00046 #include "LogObject.h"
00047 #include "LogConfig.h"
00048 #include "Log.h"
00049
00050 #include "LogCollationClientSM.h"
00051
00052
00053
00054
00055
00056 int LogCollationClientSM::ID = 0;
00057
00058
00059
00060
00061
00062 LogCollationClientSM::LogCollationClientSM(LogHost * log_host)
00063 : Continuation(new_ProxyMutex()),
00064 m_host_vc(NULL),
00065 m_host_vio(NULL),
00066 m_auth_buffer(NULL),
00067 m_auth_reader(NULL),
00068 m_send_buffer(NULL),
00069 m_send_reader(NULL),
00070 m_pending_action(NULL),
00071 m_pending_event(NULL),
00072 m_abort_vio(NULL),
00073 m_abort_buffer(NULL),
00074 m_host_is_up(false),
00075 m_buffer_send_list(NULL),
00076 m_buffer_in_iocore(NULL),
00077 m_flow(LOG_COLL_FLOW_ALLOW),
00078 m_log_host(log_host),
00079 m_id(ID++)
00080 {
00081 Debug("log-coll", "[%d]client::constructor", m_id);
00082
00083 ink_assert(m_log_host != NULL);
00084
00085
00086
00087 m_buffer_send_list = new LogBufferList();
00088 ink_assert(m_buffer_send_list != NULL);
00089
00090 SET_HANDLER((LogCollationClientSMHandler) & LogCollationClientSM::client_handler);
00091 client_init(LOG_COLL_EVENT_SWITCH, NULL);
00092
00093 }
00094
00095
00096
00097
00098
00099 LogCollationClientSM::~LogCollationClientSM()
00100 {
00101 Debug("log-coll", "[%d]client::destructor", m_id);
00102
00103 ink_mutex_acquire(&(mutex->the_mutex));
00104 client_done(LOG_COLL_EVENT_SWITCH, NULL);
00105 ink_mutex_release(&(mutex->the_mutex));
00106 }
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120 int
00121 LogCollationClientSM::client_handler(int event, void *data)
00122 {
00123 switch (m_client_state) {
00124 case LOG_COLL_CLIENT_AUTH:
00125 return client_auth(event, (VIO *) data);
00126 case LOG_COLL_CLIENT_DNS:
00127 return client_dns(event, (HostDBInfo *) data);
00128 case LOG_COLL_CLIENT_DONE:
00129 return client_done(event, data);
00130 case LOG_COLL_CLIENT_FAIL:
00131 return client_fail(event, data);
00132 case LOG_COLL_CLIENT_IDLE:
00133 return client_idle(event, data);
00134 case LOG_COLL_CLIENT_INIT:
00135 return client_init(event, data);
00136 case LOG_COLL_CLIENT_OPEN:
00137 return client_open(event, (NetVConnection *) data);
00138 case LOG_COLL_CLIENT_SEND:
00139 return client_send(event, (VIO *) data);
00140 default:
00141 ink_assert(!"unexpcted state");
00142 return EVENT_CONT;
00143 }
00144 }
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158 int
00159 LogCollationClientSM::send(LogBuffer * log_buffer)
00160 {
00161 ip_port_text_buffer ipb;
00162
00163
00164 ink_mutex_acquire(&(mutex->the_mutex));
00165
00166 Debug("log-coll", "[%d]client::send", m_id);
00167
00168
00169 if (m_client_state == LOG_COLL_CLIENT_DONE || m_client_state == LOG_COLL_CLIENT_FAIL) {
00170 Debug("log-coll", "[%d]client::send - DONE/FAIL state; rejecting", m_id);
00171 ink_mutex_release(&(mutex->the_mutex));
00172 return 0;
00173 }
00174
00175 if (m_flow == LOG_COLL_FLOW_DENY) {
00176 Debug("log-coll", "[%d]client::send - m_flow = DENY; rejecting", m_id);
00177 ink_mutex_release(&(mutex->the_mutex));
00178 return 0;
00179 }
00180
00181 ink_assert(log_buffer != NULL);
00182 ink_assert(m_buffer_send_list != NULL);
00183 m_buffer_send_list->add(log_buffer);
00184 Debug("log-coll", "[%d]client::send - new log_buffer to send_list", m_id);
00185
00186
00187 ink_assert(m_flow == LOG_COLL_FLOW_ALLOW);
00188 if (m_buffer_send_list->get_size() >= Log::config->collation_max_send_buffers) {
00189 Debug("log-coll", "[%d]client::send - m_flow = DENY", m_id);
00190 Note("[log-coll] send-queue full; orphaning logs "
00191 "[%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof(ipb)), m_log_host->port());
00192 m_flow = LOG_COLL_FLOW_DENY;
00193 }
00194
00195
00196
00197 LogBufferHeader *log_buffer_header = log_buffer->header();
00198 ink_assert(log_buffer_header != NULL);
00199 int bytes_to_write = log_buffer_header->byte_count;
00200
00201
00202 if (m_client_state == LOG_COLL_CLIENT_IDLE) {
00203 m_client_state = LOG_COLL_CLIENT_SEND;
00204 ink_assert(m_pending_event == NULL);
00205 m_pending_event = eventProcessor.schedule_imm(this);
00206
00207
00208 }
00209
00210 ink_mutex_release(&(mutex->the_mutex));
00211 return bytes_to_write;
00212 }
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227 int
00228 LogCollationClientSM::client_auth(int event, VIO * )
00229 {
00230 ip_port_text_buffer ipb;
00231
00232 Debug("log-coll", "[%d]client::client_auth", m_id);
00233
00234 switch (event) {
00235 case LOG_COLL_EVENT_SWITCH:
00236 {
00237 Debug("log-coll", "[%d]client::client_auth - SWITCH", m_id);
00238 m_client_state = LOG_COLL_CLIENT_AUTH;
00239
00240 NetMsgHeader nmh;
00241 int bytes_to_send = (int) strlen(Log::config->collation_secret);
00242 nmh.msg_bytes = bytes_to_send;
00243
00244
00245 ink_assert(m_auth_buffer != NULL);
00246 m_auth_buffer->write((char *) &nmh, sizeof(NetMsgHeader));
00247 m_auth_buffer->write(Log::config->collation_secret, bytes_to_send);
00248 bytes_to_send += sizeof(NetMsgHeader);
00249
00250 Debug("log-coll", "[%d]client::client_auth - do_io_write(%d)", m_id, bytes_to_send);
00251 ink_assert(m_host_vc != NULL);
00252 m_host_vio = m_host_vc->do_io_write(this, bytes_to_send, m_auth_reader);
00253 ink_assert(m_host_vio != NULL);
00254
00255 return EVENT_CONT;
00256 }
00257
00258 case VC_EVENT_WRITE_READY:
00259 Debug("log-coll", "[%d]client::client_auth - WRITE_READY", m_id);
00260 return EVENT_CONT;
00261
00262 case VC_EVENT_WRITE_COMPLETE:
00263 Debug("log-coll", "[%d]client::client_auth - WRITE_COMPLETE", m_id);
00264
00265 Note("[log-coll] host up [%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof(ipb)), m_log_host->port());
00266 m_host_is_up = true;
00267
00268 return client_send(LOG_COLL_EVENT_SWITCH, NULL);
00269
00270 case VC_EVENT_EOS:
00271 case VC_EVENT_ERROR:
00272 {
00273 Debug("log-coll", "[%d]client::client_auth - EOS|ERROR", m_id);
00274 int64_t read_avail = m_auth_reader->read_avail();
00275
00276 if (read_avail > 0) {
00277 Debug("log-coll", "[%d]client::client_auth - consuming unsent data", m_id);
00278 m_auth_reader->consume(read_avail);
00279 }
00280
00281 return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00282 }
00283
00284 default:
00285 ink_assert(!"unexpected event");
00286 return EVENT_CONT;
00287 }
00288 }
00289
00290
00291
00292
00293
00294 int
00295 LogCollationClientSM::client_dns(int event, HostDBInfo * hostdb_info)
00296 {
00297 Debug("log-coll", "[%d]client::client_dns", m_id);
00298
00299 switch (event) {
00300 case LOG_COLL_EVENT_SWITCH:
00301 m_client_state = LOG_COLL_CLIENT_DNS;
00302 if (m_log_host->m_name == 0) {
00303 return client_done(LOG_COLL_EVENT_SWITCH, NULL);
00304 }
00305 hostDBProcessor.getbyname_re(this, m_log_host->m_name, 0,
00306 HostDBProcessor::Options().setFlags(HostDBProcessor::HOSTDB_FORCE_DNS_RELOAD));
00307 return EVENT_CONT;
00308
00309 case EVENT_HOST_DB_LOOKUP:
00310 if (hostdb_info == NULL) {
00311 return client_done(LOG_COLL_EVENT_SWITCH, NULL);
00312 }
00313 m_log_host->m_ip.assign(hostdb_info->ip());
00314 m_log_host->m_ip.toString(m_log_host->m_ipstr, sizeof(m_log_host->m_ipstr));
00315
00316 return client_open(LOG_COLL_EVENT_SWITCH, NULL);
00317
00318 default:
00319 ink_assert(!"unexpected event");
00320 return EVENT_CONT;
00321
00322 }
00323
00324 }
00325
00326
00327
00328
00329
00330
00331 int
00332 LogCollationClientSM::client_done(int event, void * )
00333 {
00334 ip_port_text_buffer ipb;
00335
00336 Debug("log-coll", "[%d]client::client_done", m_id);
00337
00338 switch (event) {
00339 case LOG_COLL_EVENT_SWITCH:
00340 m_client_state = LOG_COLL_CLIENT_DONE;
00341
00342 Note("[log-coll] client shutdown [%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof(ipb)), m_log_host->port());
00343
00344
00345 if (m_host_vc) {
00346 Debug("log-coll", "[%d]client::client_done - disconnecting!", m_id);
00347
00348 m_host_vc->do_io_close(0);
00349 m_host_vc = 0;
00350 }
00351
00352 flush_to_orphan();
00353
00354
00355 if (m_pending_action != NULL) {
00356 m_pending_action->cancel();
00357 }
00358 if (m_pending_event != NULL) {
00359 m_pending_event->cancel();
00360 }
00361
00362 if (m_auth_buffer) {
00363 if (m_auth_reader) {
00364 m_auth_buffer->dealloc_reader(m_auth_reader);
00365 }
00366 free_MIOBuffer(m_auth_buffer);
00367 }
00368 if (m_send_buffer) {
00369 if (m_send_reader) {
00370 m_send_buffer->dealloc_reader(m_send_reader);
00371 }
00372 free_MIOBuffer(m_send_buffer);
00373 }
00374 if (m_abort_buffer) {
00375 free_MIOBuffer(m_abort_buffer);
00376 }
00377 if (m_buffer_send_list) {
00378 delete m_buffer_send_list;
00379 }
00380
00381 return EVENT_DONE;
00382
00383 default:
00384 ink_assert(!"unexpected event");
00385 return EVENT_DONE;
00386
00387 }
00388
00389 }
00390
00391
00392
00393
00394
00395
00396 int
00397 LogCollationClientSM::client_fail(int event, void * )
00398 {
00399 ip_port_text_buffer ipb;
00400
00401 Debug("log-coll", "[%d]client::client_fail", m_id);
00402
00403 switch (event) {
00404 case LOG_COLL_EVENT_SWITCH:
00405 Debug("log-coll", "[%d]client::client_fail - SWITCH", m_id);
00406 m_client_state = LOG_COLL_CLIENT_FAIL;
00407
00408
00409 if (m_host_is_up) {
00410 Note("[log-coll] host down [%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->m_port);
00411 char msg_buf[128];
00412 snprintf(msg_buf, sizeof(msg_buf), "Collation host %s:%u down",
00413 m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->m_port
00414 );
00415 RecSignalManager(MGMT_SIGNAL_SAC_SERVER_DOWN, msg_buf);
00416 m_host_is_up = false;
00417 }
00418
00419
00420 if (m_host_vc) {
00421 m_host_vc->do_io_close(0);
00422 m_host_vc = 0;
00423 }
00424
00425 flush_to_orphan();
00426
00427
00428 ink_assert(m_pending_event == NULL);
00429 m_pending_event = eventProcessor.schedule_in(this, HRTIME_SECONDS(Log::config->collation_retry_sec));
00430
00431 return EVENT_CONT;
00432
00433 case EVENT_INTERVAL:
00434 Debug("log-coll", "[%d]client::client_fail - INTERVAL", m_id);
00435 m_pending_event = NULL;
00436 return client_open(LOG_COLL_EVENT_SWITCH, NULL);
00437
00438 default:
00439 ink_assert(!"unexpected event");
00440 return EVENT_CONT;
00441
00442 }
00443 }
00444
00445
00446
00447
00448
00449
00450 int
00451 LogCollationClientSM::client_idle(int event, void * )
00452 {
00453 Debug("log-coll", "[%d]client::client_idle", m_id);
00454
00455 switch (event) {
00456 case LOG_COLL_EVENT_SWITCH:
00457 m_client_state = LOG_COLL_CLIENT_IDLE;
00458 return EVENT_CONT;
00459
00460 case VC_EVENT_EOS:
00461 case VC_EVENT_ERROR:
00462 Debug("log-coll", "[%d]client::client_idle - EOS|ERROR", m_id);
00463 return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00464
00465 default:
00466 ink_assert(!"unexpcted state");
00467 return EVENT_CONT;
00468
00469 }
00470 }
00471
00472
00473
00474
00475
00476
00477 int
00478 LogCollationClientSM::client_init(int event, void * )
00479 {
00480 Debug("log-coll", "[%d]client::client_init", m_id);
00481
00482 switch (event) {
00483 case LOG_COLL_EVENT_SWITCH:
00484 m_client_state = LOG_COLL_CLIENT_INIT;
00485 ink_assert(m_pending_event == NULL);
00486 ink_mutex_acquire(&(mutex->the_mutex));
00487 m_pending_event = eventProcessor.schedule_imm(this);
00488 ink_mutex_release(&(mutex->the_mutex));
00489 return EVENT_CONT;
00490
00491 case EVENT_IMMEDIATE:
00492
00493 m_pending_event = NULL;
00494
00495
00496 m_auth_buffer = new_MIOBuffer();
00497 ink_assert(m_auth_buffer != NULL);
00498 m_auth_reader = m_auth_buffer->alloc_reader();
00499 ink_assert(m_auth_reader != NULL);
00500 m_send_buffer = new_MIOBuffer();
00501 ink_assert(m_send_buffer != NULL);
00502 m_send_reader = m_send_buffer->alloc_reader();
00503 ink_assert(m_send_reader != NULL);
00504 m_abort_buffer = new_MIOBuffer();
00505 ink_assert(m_abort_buffer != NULL);
00506
00507
00508 if (! m_log_host->ip_addr().isValid()) {
00509 return client_dns(LOG_COLL_EVENT_SWITCH, NULL);
00510 } else {
00511 return client_open(LOG_COLL_EVENT_SWITCH, NULL);
00512 }
00513
00514 default:
00515 ink_assert(!"unexpected state");
00516 return EVENT_CONT;
00517
00518 }
00519 }
00520
00521
00522
00523
00524
00525
00526 int
00527 LogCollationClientSM::client_open(int event, NetVConnection * net_vc)
00528 {
00529 ip_port_text_buffer ipb;
00530 Debug("log-coll", "[%d]client::client_open", m_id);
00531
00532 switch (event) {
00533 case LOG_COLL_EVENT_SWITCH:
00534 Debug("log-coll", "[%d]client::client_open - SWITCH", m_id);
00535 m_client_state = LOG_COLL_CLIENT_OPEN;
00536
00537 {
00538 IpEndpoint target;
00539 target.assign(m_log_host->ip_addr(), htons(m_log_host->port()));
00540 ink_assert(target.isValid());
00541 Action *connect_action_handle = netProcessor.connect_re(this, &target.sa);
00542
00543 if (connect_action_handle != ACTION_RESULT_DONE) {
00544 ink_assert(!m_pending_action);
00545 m_pending_action = connect_action_handle;
00546 }
00547 }
00548
00549 return EVENT_CONT;
00550
00551 case NET_EVENT_OPEN:
00552 Debug("log-coll", "[%d]client::client_open - %s:%u", m_id,
00553 m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->port()
00554 );
00555
00556
00557 m_pending_action = NULL;
00558
00559 ink_assert(net_vc != NULL);
00560 m_host_vc = net_vc;
00561
00562
00563
00564 m_abort_vio = m_host_vc->do_io_read(this, 1, m_abort_buffer);
00565
00566
00567 return client_auth(LOG_COLL_EVENT_SWITCH, NULL);
00568
00569 case NET_EVENT_OPEN_FAILED:
00570 Debug("log-coll", "[%d]client::client_open - OPEN_FAILED", m_id);
00571
00572 m_pending_action = NULL;
00573 return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00574
00575 default:
00576 ink_assert(!"unexpected event");
00577 return EVENT_CONT;
00578
00579 }
00580 }
00581
00582
00583
00584
00585
00586
00587 int
00588 LogCollationClientSM::client_send(int event, VIO * )
00589 {
00590 ip_port_text_buffer ipb;
00591
00592 Debug("log-coll", "[%d]client::client_send", m_id);
00593
00594 switch (event) {
00595 case EVENT_IMMEDIATE:
00596 Debug("log-coll", "[%d]client::client_send - EVENT_IMMEDIATE", m_id);
00597
00598 m_pending_event = NULL;
00599
00600
00601
00602 case LOG_COLL_EVENT_SWITCH:
00603 {
00604 Debug("log-coll", "[%d]client::client_send - SWITCH", m_id);
00605 m_client_state = LOG_COLL_CLIENT_SEND;
00606
00607
00608 ink_assert(m_buffer_send_list != NULL);
00609 ink_assert(m_buffer_in_iocore == NULL);
00610 if ((m_buffer_in_iocore = m_buffer_send_list->get()) == NULL) {
00611 return client_idle(LOG_COLL_EVENT_SWITCH, NULL);
00612 }
00613 Debug("log-coll", "[%d]client::client_send - send_list to m_buffer_in_iocore", m_id);
00614 Debug("log-coll", "[%d]client::client_send - send_list_size(%d)", m_id, m_buffer_send_list->get_size());
00615
00616
00617 if (m_flow == LOG_COLL_FLOW_DENY && m_buffer_send_list->get_size() == 0) {
00618 Debug("log-coll", "[%d]client::client_send - m_flow = ALLOW", m_id);
00619 Note("[log-coll] send-queue clear; resuming collation [%s:%u]",
00620 m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->port());
00621 m_flow = LOG_COLL_FLOW_ALLOW;
00622 }
00623
00624
00625
00626
00627
00628 #if defined(LOG_BUFFER_TRACKING)
00629 Debug("log-buftrak", "[%d]client::client_send - network write begin", m_buffer_in_iocore->header()->id);
00630 #endif // defined(LOG_BUFFER_TRACKING)
00631
00632
00633 ink_assert(m_buffer_in_iocore != NULL);
00634 LogBufferHeader *log_buffer_header = m_buffer_in_iocore->header();
00635 ink_assert(log_buffer_header != NULL);
00636 NetMsgHeader nmh;
00637 int bytes_to_send = log_buffer_header->byte_count;
00638 nmh.msg_bytes = bytes_to_send;
00639
00640
00641
00642 RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_num_sent_to_network_stat,
00643 log_buffer_header->entry_count);
00644
00645 RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_bytes_sent_to_network_stat,
00646 log_buffer_header->byte_count);
00647
00648
00649 ink_assert(m_send_buffer != NULL);
00650 m_send_buffer->write((char *) &nmh, sizeof(NetMsgHeader));
00651 m_send_buffer->write((char *) log_buffer_header, bytes_to_send);
00652 bytes_to_send += sizeof(NetMsgHeader);
00653
00654
00655 Debug("log-coll", "[%d]client::client_send - do_io_write(%d)", m_id, bytes_to_send);
00656 ink_assert(m_host_vc != NULL);
00657 m_host_vio = m_host_vc->do_io_write(this, bytes_to_send, m_send_reader);
00658 ink_assert(m_host_vio != NULL);
00659 }
00660 return EVENT_CONT;
00661
00662 case VC_EVENT_WRITE_READY:
00663 Debug("log-coll", "[%d]client::client_send - WRITE_READY", m_id);
00664 return EVENT_CONT;
00665
00666 case VC_EVENT_WRITE_COMPLETE:
00667 Debug("log-coll", "[%d]client::client_send - WRITE_COMPLETE", m_id);
00668
00669 ink_assert(m_buffer_in_iocore != NULL);
00670 #if defined(LOG_BUFFER_TRACKING)
00671 Debug("log-buftrak", "[%d]client::client_send - network write complete", m_buffer_in_iocore->header()->id);
00672 #endif // defined(LOG_BUFFER_TRACKING)
00673
00674
00675 Debug("log-coll", "[%d]client::client_send - m_buffer_in_iocore[%p] to delete_list", m_id, m_buffer_in_iocore);
00676 LogBuffer::destroy(m_buffer_in_iocore);
00677 m_buffer_in_iocore = NULL;
00678
00679
00680 return client_send(LOG_COLL_EVENT_SWITCH, NULL);
00681
00682 case VC_EVENT_EOS:
00683 case VC_EVENT_ERROR:
00684 {
00685 Debug("log-coll", "[%d]client::client_send - EOS|ERROR", m_id);
00686 int64_t read_avail = m_send_reader->read_avail();
00687
00688 if (read_avail > 0) {
00689 Debug("log-coll", "[%d]client::client_send - consuming unsent data", m_id);
00690 m_send_reader->consume(read_avail);
00691 }
00692
00693 return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00694 }
00695
00696 default:
00697 Debug("log-coll", "[%d]client::client_send - default", m_id);
00698 return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00699
00700 }
00701 }
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714 void
00715 LogCollationClientSM::flush_to_orphan()
00716 {
00717 Debug("log-coll", "[%d]client::flush_to_orphan", m_id);
00718
00719
00720 if (m_buffer_in_iocore != NULL) {
00721 Debug("log-coll", "[%d]client::flush_to_orphan - m_buffer_in_iocore to oprhan", m_id);
00722
00723
00724 m_log_host->orphan_write_and_try_delete(m_buffer_in_iocore);
00725 m_buffer_in_iocore = NULL;
00726 }
00727
00728 LogBuffer *log_buffer;
00729 ink_assert(m_buffer_send_list != NULL);
00730 while ((log_buffer = m_buffer_send_list->get()) != NULL) {
00731 Debug("log-coll", "[%d]client::flush_to_orphan - send_list to orphan", m_id);
00732 m_log_host->orphan_write_and_try_delete(log_buffer);
00733 }
00734
00735
00736 Debug("log-coll", "[%d]client::client_send - m_flow = ALLOW", m_id);
00737 m_flow = LOG_COLL_FLOW_ALLOW;
00738 }