• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

LogCollationHostSM.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 //-------------------------------------------------------------------------
00025 // include files
00026 //-------------------------------------------------------------------------
00027 
00028 #include "ink_config.h"
00029 
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <limits.h>
00033 #include <string.h>
00034 #include <sys/types.h>
00035 
00036 #include "P_EventSystem.h"
00037 #include "P_Net.h"
00038 
00039 #include "LogUtils.h"
00040 #include "LogSock.h"
00041 #include "LogField.h"
00042 #include "LogFile.h"
00043 #include "LogFormat.h"
00044 #include "LogBuffer.h"
00045 #include "LogHost.h"
00046 #include "LogObject.h"
00047 #include "LogConfig.h"
00048 #include "Log.h"
00049 
00050 #include "LogCollationHostSM.h"
00051 
00052 //-------------------------------------------------------------------------
00053 // statics
00054 //-------------------------------------------------------------------------
00055 
00056 int
00057   LogCollationHostSM::ID = 0;
00058 
00059 //-------------------------------------------------------------------------
00060 // LogCollationHostSM::LogCollationHostSM
00061 //-------------------------------------------------------------------------
00062 
00063 LogCollationHostSM::LogCollationHostSM(NetVConnection * client_vc):
00064 Continuation(new_ProxyMutex()),
00065 m_client_vc(client_vc),
00066 m_client_vio(NULL),
00067 m_client_buffer(NULL),
00068 m_client_reader(NULL),
00069 m_pending_event(NULL),
00070 m_read_buffer(NULL), m_read_bytes_wanted(0), m_read_bytes_received(0), m_client_ip(0), m_client_port(0), m_id(ID++)
00071 {
00072 
00073   Debug("log-coll", "[%d]host::constructor", m_id);
00074 
00075   ink_assert(m_client_vc != NULL);
00076 
00077   // get client info
00078   m_client_ip = m_client_vc->get_remote_ip();
00079   m_client_port = m_client_vc->get_remote_port();
00080   Note("[log-coll] client connected [%d.%d.%d.%d:%d]",
00081        ((unsigned char *) (&m_client_ip))[0],
00082        ((unsigned char *) (&m_client_ip))[1],
00083        ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00084 
00085   SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
00086   host_init(LOG_COLL_EVENT_SWITCH, NULL);
00087 
00088 }
00089 
00090 //-------------------------------------------------------------------------
00091 //-------------------------------------------------------------------------
00092 //
00093 // handlers
00094 //
00095 //-------------------------------------------------------------------------
00096 //-------------------------------------------------------------------------
00097 
00098 //-------------------------------------------------------------------------
00099 // LogCollationHostSM::host_handler
00100 //-------------------------------------------------------------------------
00101 
00102 int
00103 LogCollationHostSM::host_handler(int event, void *data)
00104 {
00105 
00106   switch (m_host_state) {
00107   case LOG_COLL_HOST_AUTH:
00108     return host_auth(event, data);
00109   case LOG_COLL_HOST_DONE:
00110     return host_done(event, data);
00111   case LOG_COLL_HOST_INIT:
00112     return host_init(event, data);
00113   case LOG_COLL_HOST_RECV:
00114     return host_recv(event, data);
00115   default:
00116     ink_assert(!"unexpected state");
00117     return EVENT_CONT;
00118   }
00119 
00120 }
00121 
00122 //-------------------------------------------------------------------------
00123 // LogCollationHostSM::read_handler
00124 //-------------------------------------------------------------------------
00125 
00126 int
00127 LogCollationHostSM::read_handler(int event, void *data)
00128 {
00129 
00130   switch (m_read_state) {
00131   case LOG_COLL_READ_BODY:
00132     return read_body(event, (VIO *) data);
00133   case LOG_COLL_READ_HDR:
00134     return read_hdr(event, (VIO *) data);
00135   default:
00136     ink_assert(!"unexpected state");
00137     return EVENT_CONT;
00138   }
00139 
00140 }
00141 
00142 //-------------------------------------------------------------------------
00143 //-------------------------------------------------------------------------
00144 //
00145 // host states
00146 //
00147 //-------------------------------------------------------------------------
00148 //-------------------------------------------------------------------------
00149 
00150 //-------------------------------------------------------------------------
00151 // LogCollationHostSM::host_auth
00152 // next:  host_done || host_recv
00153 //-------------------------------------------------------------------------
00154 
00155 int
00156 LogCollationHostSM::host_auth(int event, void * /* data ATS_UNUSED */)
00157 {
00158   Debug("log-coll", "[%d]host::host_auth", m_id);
00159 
00160   switch (event) {
00161 
00162   case LOG_COLL_EVENT_SWITCH:
00163     Debug("log-coll", "[%d]host::host_auth - SWITCH", m_id);
00164     m_host_state = LOG_COLL_HOST_AUTH;
00165     return read_start();
00166 
00167   case LOG_COLL_EVENT_READ_COMPLETE:
00168     Debug("log-coll", "[%d]host::host_auth - READ_COMPLETE", m_id);
00169     {
00170       // compare authorization secrets
00171       ink_assert(m_read_buffer != NULL);
00172       int diff = strncmp(m_read_buffer, Log::config->collation_secret,
00173                          m_read_bytes_received);
00174       delete[]m_read_buffer;
00175       m_read_buffer = 0;
00176       if (!diff) {
00177         Debug("log-coll", "[%d]host::host_auth - authenticated!", m_id);
00178         return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
00179       } else {
00180         Debug("log-coll", "[%d]host::host_auth - authenticated failed!", m_id);
00181         Note("[log-coll] authentication failed [%d.%d.%d.%d:%d]",
00182              ((unsigned char *) (&m_client_ip))[0],
00183              ((unsigned char *) (&m_client_ip))[1],
00184              ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00185         return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00186       }
00187 
00188     }
00189 
00190   case LOG_COLL_EVENT_ERROR:
00191     Debug("log-coll", "[%d]host::host_auth - ERROR", m_id);
00192     return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00193 
00194   default:
00195     ink_assert(!"unexpected state");
00196     return EVENT_CONT;
00197 
00198   }
00199 
00200 }
00201 
00202 //-------------------------------------------------------------------------
00203 // LogCollationHostSM::host_done
00204 // next: none
00205 //-------------------------------------------------------------------------
00206 
00207 int
00208 LogCollationHostSM::host_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
00209 {
00210   Debug("log-coll", "[%d]host::host_done", m_id);
00211 
00212   // close connections
00213   if (m_client_vc) {
00214     Debug("log-coll", "[%d]host::host_done - disconnecting!", m_id);
00215     m_client_vc->do_io_close();
00216     m_client_vc = 0;
00217     Note("[log-coll] client disconnected [%d.%d.%d.%d:%d]",
00218          ((unsigned char *) (&m_client_ip))[0],
00219          ((unsigned char *) (&m_client_ip))[1],
00220          ((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
00221   }
00222   // free memory
00223   if (m_client_buffer) {
00224     if (m_client_reader) {
00225       m_client_buffer->dealloc_reader(m_client_reader);
00226     }
00227     free_MIOBuffer(m_client_buffer);
00228   }
00229   // delete this state machine and return
00230   delete this;
00231   return EVENT_DONE;
00232 
00233 }
00234 
00235 //-------------------------------------------------------------------------
00236 // LogCollationHostSM::host_init
00237 // next: host_auth || host_done
00238 //-------------------------------------------------------------------------
00239 
00240 int
00241 LogCollationHostSM::host_init(int event, void * /* data ATS_UNUSED */)
00242 {
00243   Debug("log-coll", "[%d]host::host_init", m_id);
00244 
00245   switch (event) {
00246 
00247   case LOG_COLL_EVENT_SWITCH:
00248     m_host_state = LOG_COLL_HOST_INIT;
00249     m_pending_event = eventProcessor.schedule_imm(this);
00250     return EVENT_CONT;
00251 
00252   case EVENT_IMMEDIATE:
00253     // allocate memory
00254     m_client_buffer = new_MIOBuffer();
00255     ink_assert(m_client_buffer != NULL);
00256     m_client_reader = m_client_buffer->alloc_reader();
00257     ink_assert(m_client_reader != NULL);
00258     return host_auth(LOG_COLL_EVENT_SWITCH, NULL);
00259 
00260   default:
00261     ink_assert(!"unexpected state");
00262     return EVENT_DONE;
00263 
00264   }
00265 
00266 }
00267 
00268 //-------------------------------------------------------------------------
00269 // LogCollationHostSM::host_recv
00270 // next: host_done || host_recv
00271 //-------------------------------------------------------------------------
00272 
00273 int
00274 LogCollationHostSM::host_recv(int event, void * /* data ATS_UNUSED */)
00275 {
00276   Debug("log-coll", "[%d]host::host_recv", m_id);
00277 
00278   switch (event) {
00279 
00280   case LOG_COLL_EVENT_SWITCH:
00281     Debug("log-coll", "[%d]host::host_recv - SWITCH", m_id);
00282     m_host_state = LOG_COLL_HOST_RECV;
00283     return read_start();
00284 
00285   case LOG_COLL_EVENT_READ_COMPLETE:
00286     Debug("log-coll", "[%d]host::host_recv - READ_COMPLETE", m_id);
00287     {
00288       // grab the log_buffer
00289       LogBufferHeader *log_buffer_header;
00290       LogBuffer *log_buffer;
00291       LogFormat *log_format;
00292       LogObject *log_object;
00293       unsigned version;
00294 
00295       ink_assert(m_read_buffer != NULL);
00296       ink_assert(m_read_bytes_received >= (int64_t)sizeof(LogBufferHeader));
00297       log_buffer_header = (LogBufferHeader *) m_read_buffer;
00298 
00299       // convert the buffer we just received to host order
00300       // TODO: We currently don't try to make the log buffers handle little vs big endian. TS-1156.
00301       // LogBuffer::convert_to_host_order(log_buffer_header);
00302 
00303       version = log_buffer_header->version;
00304       if (version != LOG_SEGMENT_VERSION) {
00305         Note("[log-coll] invalid LogBuffer received; invalid version - "
00306              "buffer = %u, current = %u", version, LOG_SEGMENT_VERSION);
00307         delete[]m_read_buffer;
00308 
00309       } else {
00310         log_object = Log::match_logobject(log_buffer_header);
00311         if (!log_object) {
00312           Note("[log-coll] LogObject not found with fieldlist id; " "writing LogBuffer to scrap file");
00313           log_object = Log::global_scrap_object;
00314         }
00315         log_format = log_object->m_format;
00316         Debug("log-coll", "[%d]host::host_recv - using format '%s'", m_id, log_format->name());
00317 
00318         // make a new LogBuffer (log_buffer_header plus subsequent
00319         // buffer already converted to host order) and add it to the
00320         // object's flush queue
00321         //
00322         log_buffer = new LogBuffer(log_object, log_buffer_header);
00323 
00324         RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_num_received_from_network_stat,
00325                        log_buffer_header->entry_count);
00326 
00327         RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_bytes_received_from_network_stat,
00328                        log_buffer_header->byte_count);
00329 
00330         int idx = log_object->add_to_flush_queue(log_buffer);
00331         Log::preproc_notify[idx].signal();
00332       }
00333 
00334 #if defined(LOG_BUFFER_TRACKING)
00335       Debug("log-buftrak", "[%d]host::host_recv - network read complete", log_buffer_header->id);
00336 #endif // defined(LOG_BUFFER_TRACKING)
00337 
00338       // get ready for next read (memory may not be freed!!!)
00339       m_read_buffer = 0;
00340 
00341       return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
00342 
00343     }
00344 
00345   case LOG_COLL_EVENT_ERROR:
00346     Debug("log-coll", "[%d]host::host_recv - ERROR", m_id);
00347     return host_done(LOG_COLL_EVENT_SWITCH, NULL);
00348 
00349   default:
00350     ink_assert(!"unexpected state");
00351     return EVENT_DONE;
00352 
00353   }
00354 
00355 }
00356 
00357 //-------------------------------------------------------------------------
00358 //-------------------------------------------------------------------------
00359 //
00360 // read states
00361 //
00362 //-------------------------------------------------------------------------
00363 //-------------------------------------------------------------------------
00364 
00365 //-------------------------------------------------------------------------
00366 // LogCollationHostSM::read_start
00367 // next: read_hdr
00368 //-------------------------------------------------------------------------
00369 
00370 int
00371 LogCollationHostSM::read_start()
00372 {
00373 
00374   Debug("log-coll", "[%d]host::read_start", m_id);
00375 
00376   SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::read_handler);
00377   if (m_read_buffer) {
00378     ink_assert(!"m_read_buffer still points to something, doh!");
00379   }
00380   return read_hdr(LOG_COLL_EVENT_SWITCH, NULL);
00381 
00382 }
00383 
00384 //-------------------------------------------------------------------------
00385 // LogCollationHostSM::read_hdr
00386 // next: read_body || read_done
00387 //-------------------------------------------------------------------------
00388 
00389 int
00390 LogCollationHostSM::read_hdr(int event, VIO * vio)
00391 {
00392 
00393   Debug("log-coll", "[%d]host::read_hdr", m_id);
00394 
00395   switch (event) {
00396 
00397   case LOG_COLL_EVENT_SWITCH:
00398     Debug("log-coll", "[%d]host:read_hdr - SWITCH", m_id);
00399     m_read_state = LOG_COLL_READ_HDR;
00400 
00401     m_read_bytes_wanted = sizeof(NetMsgHeader);
00402     m_read_bytes_received = 0;
00403     m_read_buffer = (char *) &m_net_msg_header;
00404     ink_assert(m_client_vc != NULL);
00405     Debug("log-coll", "[%d]host:read_hdr - do_io_read(%" PRId64")", m_id, m_read_bytes_wanted);
00406     m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
00407     ink_assert(m_client_vio != NULL);
00408     return EVENT_CONT;
00409 
00410   case VC_EVENT_IMMEDIATE:
00411     Debug("log-coll", "[%d]host::read_hdr - IMMEDIATE", m_id);
00412     return EVENT_CONT;
00413 
00414   case VC_EVENT_READ_READY:
00415     Debug("log-coll", "[%d]host::read_hdr - READ_READY", m_id);
00416     read_partial(vio);
00417     return EVENT_CONT;
00418 
00419   case VC_EVENT_READ_COMPLETE:
00420     Debug("log-coll", "[%d]host::read_hdr - READ_COMPLETE", m_id);
00421     read_partial(vio);
00422     ink_assert(m_read_bytes_wanted == m_read_bytes_received);
00423     return read_body(LOG_COLL_EVENT_SWITCH, NULL);
00424 
00425   case VC_EVENT_EOS:
00426   case VC_EVENT_ERROR:
00427     Debug("log-coll", "[%d]host::read_hdr - EOS|ERROR", m_id);
00428     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00429 
00430   default:
00431     Debug("log-coll", "[%d]host::read_hdr - default %d", m_id, event);
00432     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00433 
00434   }
00435 
00436 }
00437 
00438 //-------------------------------------------------------------------------
00439 // LogCollationHostSM::read_body
00440 // next: read_body || read_done
00441 //-------------------------------------------------------------------------
00442 
00443 int
00444 LogCollationHostSM::read_body(int event, VIO * vio)
00445 {
00446 
00447   Debug("log-coll", "[%d]host::read_body", m_id);
00448 
00449   switch (event) {
00450 
00451   case LOG_COLL_EVENT_SWITCH:
00452     Debug("log-coll", "[%d]host:read_body - SWITCH", m_id);
00453     m_read_state = LOG_COLL_READ_BODY;
00454 
00455     m_read_bytes_wanted = m_net_msg_header.msg_bytes;
00456     ink_assert(m_read_bytes_wanted > 0);
00457     m_read_bytes_received = 0;
00458     m_read_buffer = new char[m_read_bytes_wanted];
00459     ink_assert(m_read_buffer != NULL);
00460     ink_assert(m_client_vc != NULL);
00461     Debug("log-coll", "[%d]host:read_body - do_io_read(%" PRId64")", m_id, m_read_bytes_wanted);
00462     m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
00463     ink_assert(m_client_vio != NULL);
00464     return EVENT_CONT;
00465 
00466   case VC_EVENT_IMMEDIATE:
00467     Debug("log-coll", "[%d]host::read_body - IMMEDIATE", m_id);
00468     return EVENT_CONT;
00469 
00470   case VC_EVENT_READ_READY:
00471     Debug("log-coll", "[%d]host::read_body - READ_READY", m_id);
00472     read_partial(vio);
00473     return EVENT_CONT;
00474 
00475   case VC_EVENT_READ_COMPLETE:
00476     Debug("log-coll", "[%d]host::read_body - READ_COMPLETE", m_id);
00477     read_partial(vio);
00478     ink_assert(m_read_bytes_wanted == m_read_bytes_received);
00479     return read_done(LOG_COLL_EVENT_READ_COMPLETE, NULL);
00480 
00481   case VC_EVENT_EOS:
00482   case VC_EVENT_ERROR:
00483     Debug("log-coll", "[%d]host::read_body - EOS|ERROR", m_id);
00484     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00485 
00486   default:
00487     Debug("log-coll", "[%d]host::read_body - default %d", m_id, event);
00488     return read_done(LOG_COLL_EVENT_ERROR, NULL);
00489 
00490   }
00491 
00492 }
00493 
00494 //-------------------------------------------------------------------------
00495 // LogCollationHostSM::read_done
00496 // next: give control back to host state-machine
00497 //-------------------------------------------------------------------------
00498 
00499 int
00500 LogCollationHostSM::read_done(int event, void * /* data ATS_UNUSED */)
00501 {
00502   SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
00503   return host_handler(event, NULL);
00504 
00505 }
00506 
00507 //-------------------------------------------------------------------------
00508 // LogCollationHostSM::read_partial
00509 //-------------------------------------------------------------------------
00510 
00511 void
00512 LogCollationHostSM::read_partial(VIO * vio)
00513 {
00514 
00515   // checks
00516   ink_assert(vio != NULL);
00517   ink_assert(vio->vc_server == m_client_vc);
00518   ink_assert(m_client_buffer != NULL);
00519   ink_assert(m_client_reader != NULL);
00520 
00521   // careful not to read more than we have memory for
00522   char *p = &(m_read_buffer[m_read_bytes_received]);
00523   int64_t bytes_wanted_now = m_read_bytes_wanted - m_read_bytes_received;
00524   int64_t bytes_received_now = m_client_reader->read(p, bytes_wanted_now);
00525 
00526   m_read_bytes_received += bytes_received_now;
00527 }

Generated by  doxygen 1.7.1