00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 #include "ink_config.h"
00029 
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <limits.h>
00033 #include <string.h>
00034 #include <sys/types.h>
00035 
00036 #include "P_EventSystem.h"
00037 #include "P_Net.h"
00038 
00039 #include "LogUtils.h"
00040 #include "LogSock.h"
00041 #include "LogField.h"
00042 #include "LogFile.h"
00043 #include "LogFormat.h"
00044 #include "LogBuffer.h"
00045 #include "LogHost.h"
00046 #include "LogObject.h"
00047 #include "LogConfig.h"
00048 #include "Log.h"
00049 
00050 #include "LogCollationHostSM.h"
00051 
00052 
00053 
00054 
00055 
00056 int
00057   LogCollationHostSM::ID = 0;
00058 
00059 
00060 
00061 
00062 
00063 LogCollationHostSM::LogCollationHostSM(NetVConnection * client_vc):
00064 Continuation(new_ProxyMutex()),
00065 m_client_vc(client_vc),
00066 m_client_vio(NULL),
00067 m_client_buffer(NULL),
00068 m_client_reader(NULL),
00069 m_pending_event(NULL),
00070 m_read_buffer(NULL), m_read_bytes_wanted(0), m_read_bytes_received(0), m_client_ip(0), m_client_port(0), m_id(ID++)
00071 {
00072 
00073   Debug("log-coll", "[%d]host::constructor", m_id);
00074 
00075   ink_assert(m_client_vc != NULL);
00076 
00077   
00078   m_client_ip = m_client_vc->get_remote_ip();
00079   m_client_port = m_client_vc->get_remote_port();
00080   Note("[log-coll] client connected [%d.%d.%d.%d:%d]",
00081        ((unsigned char *) (&m_client_ip))[0],
00082        ((unsigned char *) (&m_client_ip))[1],
00083        ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00084 
00085   SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
00086   host_init(LOG_COLL_EVENT_SWITCH, NULL);
00087 
00088 }
00089 
00090 
00091 
00092 
00093 
00094 
00095 
00096 
00097 
00098 
00099 
00100 
00101 
00102 int
00103 LogCollationHostSM::host_handler(int event, void *data)
00104 {
00105 
00106   switch (m_host_state) {
00107   case LOG_COLL_HOST_AUTH:
00108     return host_auth(event, data);
00109   case LOG_COLL_HOST_DONE:
00110     return host_done(event, data);
00111   case LOG_COLL_HOST_INIT:
00112     return host_init(event, data);
00113   case LOG_COLL_HOST_RECV:
00114     return host_recv(event, data);
00115   default:
00116     ink_assert(!"unexpected state");
00117     return EVENT_CONT;
00118   }
00119 
00120 }
00121 
00122 
00123 
00124 
00125 
00126 int
00127 LogCollationHostSM::read_handler(int event, void *data)
00128 {
00129 
00130   switch (m_read_state) {
00131   case LOG_COLL_READ_BODY:
00132     return read_body(event, (VIO *) data);
00133   case LOG_COLL_READ_HDR:
00134     return read_hdr(event, (VIO *) data);
00135   default:
00136     ink_assert(!"unexpected state");
00137     return EVENT_CONT;
00138   }
00139 
00140 }
00141 
00142 
00143 
00144 
00145 
00146 
00147 
00148 
00149 
00150 
00151 
00152 
00153 
00154 
00155 int
00156 LogCollationHostSM::host_auth(int event, void * )
00157 {
00158   Debug("log-coll", "[%d]host::host_auth", m_id);
00159 
00160   switch (event) {
00161 
00162   case LOG_COLL_EVENT_SWITCH:
00163     Debug("log-coll", "[%d]host::host_auth - SWITCH", m_id);
00164     m_host_state = LOG_COLL_HOST_AUTH;
00165     return read_start();
00166 
00167   case LOG_COLL_EVENT_READ_COMPLETE:
00168     Debug("log-coll", "[%d]host::host_auth - READ_COMPLETE", m_id);
00169     {
00170       
00171       ink_assert(m_read_buffer != NULL);
00172       int diff = strncmp(m_read_buffer, Log::config->collation_secret,
00173                          m_read_bytes_received);
00174       delete[]m_read_buffer;
00175       m_read_buffer = 0;
00176       if (!diff) {
00177         Debug("log-coll", "[%d]host::host_auth - authenticated!", m_id);
00178         return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
00179       } else {
00180         Debug("log-coll", "[%d]host::host_auth - authenticated failed!", m_id);
00181         Note("[log-coll] authentication failed [%d.%d.%d.%d:%d]",
00182              ((unsigned char *) (&m_client_ip))[0],
00183              ((unsigned char *) (&m_client_ip))[1],
00184              ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00185         return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00186       }
00187 
00188     }
00189 
00190   case LOG_COLL_EVENT_ERROR:
00191     Debug("log-coll", "[%d]host::host_auth - ERROR", m_id);
00192     return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00193 
00194   default:
00195     ink_assert(!"unexpected state");
00196     return EVENT_CONT;
00197 
00198   }
00199 
00200 }
00201 
00202 
00203 
00204 
00205 
00206 
00207 int
00208 LogCollationHostSM::host_done(int , void * )
00209 {
00210   Debug("log-coll", "[%d]host::host_done", m_id);
00211 
00212   
00213   if (m_client_vc) {
00214     Debug("log-coll", "[%d]host::host_done - disconnecting!", m_id);
00215     m_client_vc->do_io_close();
00216     m_client_vc = 0;
00217     Note("[log-coll] client disconnected [%d.%d.%d.%d:%d]",
00218          ((unsigned char *) (&m_client_ip))[0],
00219          ((unsigned char *) (&m_client_ip))[1],
00220          ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00221   }
00222   
00223   if (m_client_buffer) {
00224     if (m_client_reader) {
00225       m_client_buffer->dealloc_reader(m_client_reader);
00226     }
00227     free_MIOBuffer(m_client_buffer);
00228   }
00229   
00230   delete this;
00231   return EVENT_DONE;
00232 
00233 }
00234 
00235 
00236 
00237 
00238 
00239 
00240 int
00241 LogCollationHostSM::host_init(int event, void * )
00242 {
00243   Debug("log-coll", "[%d]host::host_init", m_id);
00244 
00245   switch (event) {
00246 
00247   case LOG_COLL_EVENT_SWITCH:
00248     m_host_state = LOG_COLL_HOST_INIT;
00249     m_pending_event = eventProcessor.schedule_imm(this);
00250     return EVENT_CONT;
00251 
00252   case EVENT_IMMEDIATE:
00253     
00254     m_client_buffer = new_MIOBuffer();
00255     ink_assert(m_client_buffer != NULL);
00256     m_client_reader = m_client_buffer->alloc_reader();
00257     ink_assert(m_client_reader != NULL);
00258     return host_auth(LOG_COLL_EVENT_SWITCH, NULL);
00259 
00260   default:
00261     ink_assert(!"unexpected state");
00262     return EVENT_DONE;
00263 
00264   }
00265 
00266 }
00267 
00268 
00269 
00270 
00271 
00272 
00273 int
00274 LogCollationHostSM::host_recv(int event, void * )
00275 {
00276   Debug("log-coll", "[%d]host::host_recv", m_id);
00277 
00278   switch (event) {
00279 
00280   case LOG_COLL_EVENT_SWITCH:
00281     Debug("log-coll", "[%d]host::host_recv - SWITCH", m_id);
00282     m_host_state = LOG_COLL_HOST_RECV;
00283     return read_start();
00284 
00285   case LOG_COLL_EVENT_READ_COMPLETE:
00286     Debug("log-coll", "[%d]host::host_recv - READ_COMPLETE", m_id);
00287     {
00288       
00289       LogBufferHeader *log_buffer_header;
00290       LogBuffer *log_buffer;
00291       LogFormat *log_format;
00292       LogObject *log_object;
00293       unsigned version;
00294 
00295       ink_assert(m_read_buffer != NULL);
00296       ink_assert(m_read_bytes_received >= (int64_t)sizeof(LogBufferHeader));
00297       log_buffer_header = (LogBufferHeader *) m_read_buffer;
00298 
00299       
00300       
00301       
00302 
00303       version = log_buffer_header->version;
00304       if (version != LOG_SEGMENT_VERSION) {
00305         Note("[log-coll] invalid LogBuffer received; invalid version - "
00306              "buffer = %u, current = %u", version, LOG_SEGMENT_VERSION);
00307         delete[]m_read_buffer;
00308 
00309       } else {
00310         log_object = Log::match_logobject(log_buffer_header);
00311         if (!log_object) {
00312           Note("[log-coll] LogObject not found with fieldlist id; " "writing LogBuffer to scrap file");
00313           log_object = Log::global_scrap_object;
00314         }
00315         log_format = log_object->m_format;
00316         Debug("log-coll", "[%d]host::host_recv - using format '%s'", m_id, log_format->name());
00317 
00318         
00319         
00320         
00321         
00322         log_buffer = new LogBuffer(log_object, log_buffer_header);
00323 
00324         RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_num_received_from_network_stat,
00325                        log_buffer_header->entry_count);
00326 
00327         RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_bytes_received_from_network_stat,
00328                        log_buffer_header->byte_count);
00329 
00330         int idx = log_object->add_to_flush_queue(log_buffer);
00331         Log::preproc_notify[idx].signal();
00332       }
00333 
00334 #if defined(LOG_BUFFER_TRACKING)
00335       Debug("log-buftrak", "[%d]host::host_recv - network read complete", log_buffer_header->id);
00336 #endif // defined(LOG_BUFFER_TRACKING)
00337 
00338       
00339       m_read_buffer = 0;
00340 
00341       return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
00342 
00343     }
00344 
00345   case LOG_COLL_EVENT_ERROR:
00346     Debug("log-coll", "[%d]host::host_recv - ERROR", m_id);
00347     return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00348 
00349   default:
00350     ink_assert(!"unexpected state");
00351     return EVENT_DONE;
00352 
00353   }
00354 
00355 }
00356 
00357 
00358 
00359 
00360 
00361 
00362 
00363 
00364 
00365 
00366 
00367 
00368 
00369 
00370 int
00371 LogCollationHostSM::read_start()
00372 {
00373 
00374   Debug("log-coll", "[%d]host::read_start", m_id);
00375 
00376   SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::read_handler);
00377   if (m_read_buffer) {
00378     ink_assert(!"m_read_buffer still points to something, doh!");
00379   }
00380   return read_hdr(LOG_COLL_EVENT_SWITCH, NULL);
00381 
00382 }
00383 
00384 
00385 
00386 
00387 
00388 
00389 int
00390 LogCollationHostSM::read_hdr(int event, VIO * vio)
00391 {
00392 
00393   Debug("log-coll", "[%d]host::read_hdr", m_id);
00394 
00395   switch (event) {
00396 
00397   case LOG_COLL_EVENT_SWITCH:
00398     Debug("log-coll", "[%d]host:read_hdr - SWITCH", m_id);
00399     m_read_state = LOG_COLL_READ_HDR;
00400 
00401     m_read_bytes_wanted = sizeof(NetMsgHeader);
00402     m_read_bytes_received = 0;
00403     m_read_buffer = (char *) &m_net_msg_header;
00404     ink_assert(m_client_vc != NULL);
00405     Debug("log-coll", "[%d]host:read_hdr - do_io_read(%" PRId64")", m_id, m_read_bytes_wanted);
00406     m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
00407     ink_assert(m_client_vio != NULL);
00408     return EVENT_CONT;
00409 
00410   case VC_EVENT_IMMEDIATE:
00411     Debug("log-coll", "[%d]host::read_hdr - IMMEDIATE", m_id);
00412     return EVENT_CONT;
00413 
00414   case VC_EVENT_READ_READY:
00415     Debug("log-coll", "[%d]host::read_hdr - READ_READY", m_id);
00416     read_partial(vio);
00417     return EVENT_CONT;
00418 
00419   case VC_EVENT_READ_COMPLETE:
00420     Debug("log-coll", "[%d]host::read_hdr - READ_COMPLETE", m_id);
00421     read_partial(vio);
00422     ink_assert(m_read_bytes_wanted == m_read_bytes_received);
00423     return read_body(LOG_COLL_EVENT_SWITCH, NULL);
00424 
00425   case VC_EVENT_EOS:
00426   case VC_EVENT_ERROR:
00427     Debug("log-coll", "[%d]host::read_hdr - EOS|ERROR", m_id);
00428     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00429 
00430   default:
00431     Debug("log-coll", "[%d]host::read_hdr - default %d", m_id, event);
00432     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00433 
00434   }
00435 
00436 }
00437 
00438 
00439 
00440 
00441 
00442 
00443 int
00444 LogCollationHostSM::read_body(int event, VIO * vio)
00445 {
00446 
00447   Debug("log-coll", "[%d]host::read_body", m_id);
00448 
00449   switch (event) {
00450 
00451   case LOG_COLL_EVENT_SWITCH:
00452     Debug("log-coll", "[%d]host:read_body - SWITCH", m_id);
00453     m_read_state = LOG_COLL_READ_BODY;
00454 
00455     m_read_bytes_wanted = m_net_msg_header.msg_bytes;
00456     ink_assert(m_read_bytes_wanted > 0);
00457     m_read_bytes_received = 0;
00458     m_read_buffer = new char[m_read_bytes_wanted];
00459     ink_assert(m_read_buffer != NULL);
00460     ink_assert(m_client_vc != NULL);
00461     Debug("log-coll", "[%d]host:read_body - do_io_read(%" PRId64")", m_id, m_read_bytes_wanted);
00462     m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
00463     ink_assert(m_client_vio != NULL);
00464     return EVENT_CONT;
00465 
00466   case VC_EVENT_IMMEDIATE:
00467     Debug("log-coll", "[%d]host::read_body - IMMEDIATE", m_id);
00468     return EVENT_CONT;
00469 
00470   case VC_EVENT_READ_READY:
00471     Debug("log-coll", "[%d]host::read_body - READ_READY", m_id);
00472     read_partial(vio);
00473     return EVENT_CONT;
00474 
00475   case VC_EVENT_READ_COMPLETE:
00476     Debug("log-coll", "[%d]host::read_body - READ_COMPLETE", m_id);
00477     read_partial(vio);
00478     ink_assert(m_read_bytes_wanted == m_read_bytes_received);
00479     return read_done(LOG_COLL_EVENT_READ_COMPLETE, NULL);
00480 
00481   case VC_EVENT_EOS:
00482   case VC_EVENT_ERROR:
00483     Debug("log-coll", "[%d]host::read_body - EOS|ERROR", m_id);
00484     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00485 
00486   default:
00487     Debug("log-coll", "[%d]host::read_body - default %d", m_id, event);
00488     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00489 
00490   }
00491 
00492 }
00493 
00494 
00495 
00496 
00497 
00498 
00499 int
00500 LogCollationHostSM::read_done(int event, void * )
00501 {
00502   SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
00503   return host_handler(event, NULL);
00504 
00505 }
00506 
00507 
00508 
00509 
00510 
00511 void
00512 LogCollationHostSM::read_partial(VIO * vio)
00513 {
00514 
00515   
00516   ink_assert(vio != NULL);
00517   ink_assert(vio->vc_server == m_client_vc);
00518   ink_assert(m_client_buffer != NULL);
00519   ink_assert(m_client_reader != NULL);
00520 
00521   
00522   char *p = &(m_read_buffer[m_read_bytes_received]);
00523   int64_t bytes_wanted_now = m_read_bytes_wanted - m_read_bytes_received;
00524   int64_t bytes_received_now = m_client_reader->read(p, bytes_wanted_now);
00525 
00526   m_read_bytes_received += bytes_received_now;
00527 }