00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #include "ink_config.h"
00029
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <limits.h>
00033 #include <string.h>
00034 #include <sys/types.h>
00035
00036 #include "P_EventSystem.h"
00037 #include "P_Net.h"
00038
00039 #include "LogUtils.h"
00040 #include "LogSock.h"
00041 #include "LogField.h"
00042 #include "LogFile.h"
00043 #include "LogFormat.h"
00044 #include "LogBuffer.h"
00045 #include "LogHost.h"
00046 #include "LogObject.h"
00047 #include "LogConfig.h"
00048 #include "Log.h"
00049
00050 #include "LogCollationHostSM.h"
00051
00052
00053
00054
00055
00056 int
00057 LogCollationHostSM::ID = 0;
00058
00059
00060
00061
00062
00063 LogCollationHostSM::LogCollationHostSM(NetVConnection * client_vc):
00064 Continuation(new_ProxyMutex()),
00065 m_client_vc(client_vc),
00066 m_client_vio(NULL),
00067 m_client_buffer(NULL),
00068 m_client_reader(NULL),
00069 m_pending_event(NULL),
00070 m_read_buffer(NULL), m_read_bytes_wanted(0), m_read_bytes_received(0), m_client_ip(0), m_client_port(0), m_id(ID++)
00071 {
00072
00073 Debug("log-coll", "[%d]host::constructor", m_id);
00074
00075 ink_assert(m_client_vc != NULL);
00076
00077
00078 m_client_ip = m_client_vc->get_remote_ip();
00079 m_client_port = m_client_vc->get_remote_port();
00080 Note("[log-coll] client connected [%d.%d.%d.%d:%d]",
00081 ((unsigned char *) (&m_client_ip))[0],
00082 ((unsigned char *) (&m_client_ip))[1],
00083 ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00084
00085 SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
00086 host_init(LOG_COLL_EVENT_SWITCH, NULL);
00087
00088 }
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102 int
00103 LogCollationHostSM::host_handler(int event, void *data)
00104 {
00105
00106 switch (m_host_state) {
00107 case LOG_COLL_HOST_AUTH:
00108 return host_auth(event, data);
00109 case LOG_COLL_HOST_DONE:
00110 return host_done(event, data);
00111 case LOG_COLL_HOST_INIT:
00112 return host_init(event, data);
00113 case LOG_COLL_HOST_RECV:
00114 return host_recv(event, data);
00115 default:
00116 ink_assert(!"unexpected state");
00117 return EVENT_CONT;
00118 }
00119
00120 }
00121
00122
00123
00124
00125
00126 int
00127 LogCollationHostSM::read_handler(int event, void *data)
00128 {
00129
00130 switch (m_read_state) {
00131 case LOG_COLL_READ_BODY:
00132 return read_body(event, (VIO *) data);
00133 case LOG_COLL_READ_HDR:
00134 return read_hdr(event, (VIO *) data);
00135 default:
00136 ink_assert(!"unexpected state");
00137 return EVENT_CONT;
00138 }
00139
00140 }
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155 int
00156 LogCollationHostSM::host_auth(int event, void * )
00157 {
00158 Debug("log-coll", "[%d]host::host_auth", m_id);
00159
00160 switch (event) {
00161
00162 case LOG_COLL_EVENT_SWITCH:
00163 Debug("log-coll", "[%d]host::host_auth - SWITCH", m_id);
00164 m_host_state = LOG_COLL_HOST_AUTH;
00165 return read_start();
00166
00167 case LOG_COLL_EVENT_READ_COMPLETE:
00168 Debug("log-coll", "[%d]host::host_auth - READ_COMPLETE", m_id);
00169 {
00170
00171 ink_assert(m_read_buffer != NULL);
00172 int diff = strncmp(m_read_buffer, Log::config->collation_secret,
00173 m_read_bytes_received);
00174 delete[]m_read_buffer;
00175 m_read_buffer = 0;
00176 if (!diff) {
00177 Debug("log-coll", "[%d]host::host_auth - authenticated!", m_id);
00178 return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
00179 } else {
00180 Debug("log-coll", "[%d]host::host_auth - authenticated failed!", m_id);
00181 Note("[log-coll] authentication failed [%d.%d.%d.%d:%d]",
00182 ((unsigned char *) (&m_client_ip))[0],
00183 ((unsigned char *) (&m_client_ip))[1],
00184 ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00185 return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00186 }
00187
00188 }
00189
00190 case LOG_COLL_EVENT_ERROR:
00191 Debug("log-coll", "[%d]host::host_auth - ERROR", m_id);
00192 return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00193
00194 default:
00195 ink_assert(!"unexpected state");
00196 return EVENT_CONT;
00197
00198 }
00199
00200 }
00201
00202
00203
00204
00205
00206
00207 int
00208 LogCollationHostSM::host_done(int , void * )
00209 {
00210 Debug("log-coll", "[%d]host::host_done", m_id);
00211
00212
00213 if (m_client_vc) {
00214 Debug("log-coll", "[%d]host::host_done - disconnecting!", m_id);
00215 m_client_vc->do_io_close();
00216 m_client_vc = 0;
00217 Note("[log-coll] client disconnected [%d.%d.%d.%d:%d]",
00218 ((unsigned char *) (&m_client_ip))[0],
00219 ((unsigned char *) (&m_client_ip))[1],
00220 ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00221 }
00222
00223 if (m_client_buffer) {
00224 if (m_client_reader) {
00225 m_client_buffer->dealloc_reader(m_client_reader);
00226 }
00227 free_MIOBuffer(m_client_buffer);
00228 }
00229
00230 delete this;
00231 return EVENT_DONE;
00232
00233 }
00234
00235
00236
00237
00238
00239
00240 int
00241 LogCollationHostSM::host_init(int event, void * )
00242 {
00243 Debug("log-coll", "[%d]host::host_init", m_id);
00244
00245 switch (event) {
00246
00247 case LOG_COLL_EVENT_SWITCH:
00248 m_host_state = LOG_COLL_HOST_INIT;
00249 m_pending_event = eventProcessor.schedule_imm(this);
00250 return EVENT_CONT;
00251
00252 case EVENT_IMMEDIATE:
00253
00254 m_client_buffer = new_MIOBuffer();
00255 ink_assert(m_client_buffer != NULL);
00256 m_client_reader = m_client_buffer->alloc_reader();
00257 ink_assert(m_client_reader != NULL);
00258 return host_auth(LOG_COLL_EVENT_SWITCH, NULL);
00259
00260 default:
00261 ink_assert(!"unexpected state");
00262 return EVENT_DONE;
00263
00264 }
00265
00266 }
00267
00268
00269
00270
00271
00272
00273 int
00274 LogCollationHostSM::host_recv(int event, void * )
00275 {
00276 Debug("log-coll", "[%d]host::host_recv", m_id);
00277
00278 switch (event) {
00279
00280 case LOG_COLL_EVENT_SWITCH:
00281 Debug("log-coll", "[%d]host::host_recv - SWITCH", m_id);
00282 m_host_state = LOG_COLL_HOST_RECV;
00283 return read_start();
00284
00285 case LOG_COLL_EVENT_READ_COMPLETE:
00286 Debug("log-coll", "[%d]host::host_recv - READ_COMPLETE", m_id);
00287 {
00288
00289 LogBufferHeader *log_buffer_header;
00290 LogBuffer *log_buffer;
00291 LogFormat *log_format;
00292 LogObject *log_object;
00293 unsigned version;
00294
00295 ink_assert(m_read_buffer != NULL);
00296 ink_assert(m_read_bytes_received >= (int64_t)sizeof(LogBufferHeader));
00297 log_buffer_header = (LogBufferHeader *) m_read_buffer;
00298
00299
00300
00301
00302
00303 version = log_buffer_header->version;
00304 if (version != LOG_SEGMENT_VERSION) {
00305 Note("[log-coll] invalid LogBuffer received; invalid version - "
00306 "buffer = %u, current = %u", version, LOG_SEGMENT_VERSION);
00307 delete[]m_read_buffer;
00308
00309 } else {
00310 log_object = Log::match_logobject(log_buffer_header);
00311 if (!log_object) {
00312 Note("[log-coll] LogObject not found with fieldlist id; " "writing LogBuffer to scrap file");
00313 log_object = Log::global_scrap_object;
00314 }
00315 log_format = log_object->m_format;
00316 Debug("log-coll", "[%d]host::host_recv - using format '%s'", m_id, log_format->name());
00317
00318
00319
00320
00321
00322 log_buffer = new LogBuffer(log_object, log_buffer_header);
00323
00324 RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_num_received_from_network_stat,
00325 log_buffer_header->entry_count);
00326
00327 RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_bytes_received_from_network_stat,
00328 log_buffer_header->byte_count);
00329
00330 int idx = log_object->add_to_flush_queue(log_buffer);
00331 Log::preproc_notify[idx].signal();
00332 }
00333
00334 #if defined(LOG_BUFFER_TRACKING)
00335 Debug("log-buftrak", "[%d]host::host_recv - network read complete", log_buffer_header->id);
00336 #endif // defined(LOG_BUFFER_TRACKING)
00337
00338
00339 m_read_buffer = 0;
00340
00341 return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
00342
00343 }
00344
00345 case LOG_COLL_EVENT_ERROR:
00346 Debug("log-coll", "[%d]host::host_recv - ERROR", m_id);
00347 return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00348
00349 default:
00350 ink_assert(!"unexpected state");
00351 return EVENT_DONE;
00352
00353 }
00354
00355 }
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370 int
00371 LogCollationHostSM::read_start()
00372 {
00373
00374 Debug("log-coll", "[%d]host::read_start", m_id);
00375
00376 SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::read_handler);
00377 if (m_read_buffer) {
00378 ink_assert(!"m_read_buffer still points to something, doh!");
00379 }
00380 return read_hdr(LOG_COLL_EVENT_SWITCH, NULL);
00381
00382 }
00383
00384
00385
00386
00387
00388
00389 int
00390 LogCollationHostSM::read_hdr(int event, VIO * vio)
00391 {
00392
00393 Debug("log-coll", "[%d]host::read_hdr", m_id);
00394
00395 switch (event) {
00396
00397 case LOG_COLL_EVENT_SWITCH:
00398 Debug("log-coll", "[%d]host:read_hdr - SWITCH", m_id);
00399 m_read_state = LOG_COLL_READ_HDR;
00400
00401 m_read_bytes_wanted = sizeof(NetMsgHeader);
00402 m_read_bytes_received = 0;
00403 m_read_buffer = (char *) &m_net_msg_header;
00404 ink_assert(m_client_vc != NULL);
00405 Debug("log-coll", "[%d]host:read_hdr - do_io_read(%" PRId64")", m_id, m_read_bytes_wanted);
00406 m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
00407 ink_assert(m_client_vio != NULL);
00408 return EVENT_CONT;
00409
00410 case VC_EVENT_IMMEDIATE:
00411 Debug("log-coll", "[%d]host::read_hdr - IMMEDIATE", m_id);
00412 return EVENT_CONT;
00413
00414 case VC_EVENT_READ_READY:
00415 Debug("log-coll", "[%d]host::read_hdr - READ_READY", m_id);
00416 read_partial(vio);
00417 return EVENT_CONT;
00418
00419 case VC_EVENT_READ_COMPLETE:
00420 Debug("log-coll", "[%d]host::read_hdr - READ_COMPLETE", m_id);
00421 read_partial(vio);
00422 ink_assert(m_read_bytes_wanted == m_read_bytes_received);
00423 return read_body(LOG_COLL_EVENT_SWITCH, NULL);
00424
00425 case VC_EVENT_EOS:
00426 case VC_EVENT_ERROR:
00427 Debug("log-coll", "[%d]host::read_hdr - EOS|ERROR", m_id);
00428 return read_done(LOG_COLL_EVENT_ERROR, NULL);
00429
00430 default:
00431 Debug("log-coll", "[%d]host::read_hdr - default %d", m_id, event);
00432 return read_done(LOG_COLL_EVENT_ERROR, NULL);
00433
00434 }
00435
00436 }
00437
00438
00439
00440
00441
00442
00443 int
00444 LogCollationHostSM::read_body(int event, VIO * vio)
00445 {
00446
00447 Debug("log-coll", "[%d]host::read_body", m_id);
00448
00449 switch (event) {
00450
00451 case LOG_COLL_EVENT_SWITCH:
00452 Debug("log-coll", "[%d]host:read_body - SWITCH", m_id);
00453 m_read_state = LOG_COLL_READ_BODY;
00454
00455 m_read_bytes_wanted = m_net_msg_header.msg_bytes;
00456 ink_assert(m_read_bytes_wanted > 0);
00457 m_read_bytes_received = 0;
00458 m_read_buffer = new char[m_read_bytes_wanted];
00459 ink_assert(m_read_buffer != NULL);
00460 ink_assert(m_client_vc != NULL);
00461 Debug("log-coll", "[%d]host:read_body - do_io_read(%" PRId64")", m_id, m_read_bytes_wanted);
00462 m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
00463 ink_assert(m_client_vio != NULL);
00464 return EVENT_CONT;
00465
00466 case VC_EVENT_IMMEDIATE:
00467 Debug("log-coll", "[%d]host::read_body - IMMEDIATE", m_id);
00468 return EVENT_CONT;
00469
00470 case VC_EVENT_READ_READY:
00471 Debug("log-coll", "[%d]host::read_body - READ_READY", m_id);
00472 read_partial(vio);
00473 return EVENT_CONT;
00474
00475 case VC_EVENT_READ_COMPLETE:
00476 Debug("log-coll", "[%d]host::read_body - READ_COMPLETE", m_id);
00477 read_partial(vio);
00478 ink_assert(m_read_bytes_wanted == m_read_bytes_received);
00479 return read_done(LOG_COLL_EVENT_READ_COMPLETE, NULL);
00480
00481 case VC_EVENT_EOS:
00482 case VC_EVENT_ERROR:
00483 Debug("log-coll", "[%d]host::read_body - EOS|ERROR", m_id);
00484 return read_done(LOG_COLL_EVENT_ERROR, NULL);
00485
00486 default:
00487 Debug("log-coll", "[%d]host::read_body - default %d", m_id, event);
00488 return read_done(LOG_COLL_EVENT_ERROR, NULL);
00489
00490 }
00491
00492 }
00493
00494
00495
00496
00497
00498
00499 int
00500 LogCollationHostSM::read_done(int event, void * )
00501 {
00502 SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
00503 return host_handler(event, NULL);
00504
00505 }
00506
00507
00508
00509
00510
00511 void
00512 LogCollationHostSM::read_partial(VIO * vio)
00513 {
00514
00515
00516 ink_assert(vio != NULL);
00517 ink_assert(vio->vc_server == m_client_vc);
00518 ink_assert(m_client_buffer != NULL);
00519 ink_assert(m_client_reader != NULL);
00520
00521
00522 char *p = &(m_read_buffer[m_read_bytes_received]);
00523 int64_t bytes_wanted_now = m_read_bytes_wanted - m_read_bytes_received;
00524 int64_t bytes_received_now = m_client_reader->read(p, bytes_wanted_now);
00525
00526 m_read_bytes_received += bytes_received_now;
00527 }