• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

LogCollationClientSM.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 //-------------------------------------------------------------------------
00025 // include files
00026 //-------------------------------------------------------------------------
00027 
00028 #include "libts.h"
00029 
00030 #include <stdio.h>
00031 #include <stdlib.h>
00032 #include <limits.h>
00033 #include <string.h>
00034 #include <sys/types.h>
00035 
00036 #include "P_EventSystem.h"
00037 #include "P_Net.h"
00038 
00039 #include "LogUtils.h"
00040 #include "LogSock.h"
00041 #include "LogField.h"
00042 #include "LogFile.h"
00043 #include "LogFormat.h"
00044 #include "LogBuffer.h"
00045 #include "LogHost.h"
00046 #include "LogObject.h"
00047 #include "LogConfig.h"
00048 #include "Log.h"
00049 
00050 #include "LogCollationClientSM.h"
00051 
00052 //-------------------------------------------------------------------------
00053 // statics
00054 //-------------------------------------------------------------------------
00055 
00056 int LogCollationClientSM::ID = 0;
00057 
00058 //-------------------------------------------------------------------------
00059 // LogCollationClientSM::LogCollationClientSM
00060 //-------------------------------------------------------------------------
00061 
00062 LogCollationClientSM::LogCollationClientSM(LogHost * log_host)
00063   : Continuation(new_ProxyMutex()),
00064     m_host_vc(NULL),
00065     m_host_vio(NULL),
00066     m_auth_buffer(NULL),
00067     m_auth_reader(NULL),
00068     m_send_buffer(NULL),
00069     m_send_reader(NULL),
00070     m_pending_action(NULL),
00071     m_pending_event(NULL),
00072     m_abort_vio(NULL),
00073     m_abort_buffer(NULL),
00074     m_host_is_up(false),
00075     m_buffer_send_list(NULL),
00076     m_buffer_in_iocore(NULL),
00077     m_flow(LOG_COLL_FLOW_ALLOW),
00078     m_log_host(log_host),
00079     m_id(ID++)
00080 {
00081   Debug("log-coll", "[%d]client::constructor", m_id);
00082 
00083   ink_assert(m_log_host != NULL);
00084 
00085   // allocate send_list before we do anything
00086   // we can accept logs to send before we're fully initialized
00087   m_buffer_send_list = new LogBufferList();
00088   ink_assert(m_buffer_send_list != NULL);
00089 
00090   SET_HANDLER((LogCollationClientSMHandler) & LogCollationClientSM::client_handler);
00091   client_init(LOG_COLL_EVENT_SWITCH, NULL);
00092 
00093 }
00094 
00095 //-------------------------------------------------------------------------
00096 // LogCollationClientSM::~LogCollationClientSM
00097 //-------------------------------------------------------------------------
00098 
00099 LogCollationClientSM::~LogCollationClientSM()
00100 {
00101   Debug("log-coll", "[%d]client::destructor", m_id);
00102 
00103   ink_mutex_acquire(&(mutex->the_mutex));
00104   client_done(LOG_COLL_EVENT_SWITCH, NULL);
00105   ink_mutex_release(&(mutex->the_mutex));
00106 }
00107 
00108 //-------------------------------------------------------------------------
00109 //-------------------------------------------------------------------------
00110 //
00111 // handler
00112 //
00113 //-------------------------------------------------------------------------
00114 //-------------------------------------------------------------------------
00115 
00116 //-------------------------------------------------------------------------
00117 // LogCollationClientSM::client_handler
00118 //-------------------------------------------------------------------------
00119 
00120 int
00121 LogCollationClientSM::client_handler(int event, void *data)
00122 {
00123   switch (m_client_state) {
00124   case LOG_COLL_CLIENT_AUTH:
00125     return client_auth(event, (VIO *) data);
00126   case LOG_COLL_CLIENT_DNS:
00127     return client_dns(event, (HostDBInfo *) data);
00128   case LOG_COLL_CLIENT_DONE:
00129     return client_done(event, data);
00130   case LOG_COLL_CLIENT_FAIL:
00131     return client_fail(event, data);
00132   case LOG_COLL_CLIENT_IDLE:
00133     return client_idle(event, data);
00134   case LOG_COLL_CLIENT_INIT:
00135     return client_init(event, data);
00136   case LOG_COLL_CLIENT_OPEN:
00137     return client_open(event, (NetVConnection *) data);
00138   case LOG_COLL_CLIENT_SEND:
00139     return client_send(event, (VIO *) data);
00140   default:
00141     ink_assert(!"unexpcted state");
00142     return EVENT_CONT;
00143   }
00144 }
00145 
00146 //-------------------------------------------------------------------------
00147 //-------------------------------------------------------------------------
00148 //
00149 // pubic interface
00150 //
00151 //-------------------------------------------------------------------------
00152 //-------------------------------------------------------------------------
00153 
00154 //-------------------------------------------------------------------------
00155 // LogCollationClientSM::send
00156 //-------------------------------------------------------------------------
00157 
00158 int
00159 LogCollationClientSM::send(LogBuffer * log_buffer)
00160 {
00161   ip_port_text_buffer ipb;
00162 
00163   // take lock (can block on call because we're on our own thread)
00164   ink_mutex_acquire(&(mutex->the_mutex));
00165 
00166   Debug("log-coll", "[%d]client::send", m_id);
00167 
00168   // deny if state is DONE or FAIL
00169   if (m_client_state == LOG_COLL_CLIENT_DONE || m_client_state == LOG_COLL_CLIENT_FAIL) {
00170     Debug("log-coll", "[%d]client::send - DONE/FAIL state; rejecting", m_id);
00171     ink_mutex_release(&(mutex->the_mutex));
00172     return 0;
00173   }
00174   // only allow send if m_flow is ALLOW
00175   if (m_flow == LOG_COLL_FLOW_DENY) {
00176     Debug("log-coll", "[%d]client::send - m_flow = DENY; rejecting", m_id);
00177     ink_mutex_release(&(mutex->the_mutex));
00178     return 0;
00179   }
00180   // add log_buffer to m_buffer_send_list
00181   ink_assert(log_buffer != NULL);
00182   ink_assert(m_buffer_send_list != NULL);
00183   m_buffer_send_list->add(log_buffer);
00184   Debug("log-coll", "[%d]client::send - new log_buffer to send_list", m_id);
00185 
00186   // disable m_flow if there's too much work to do now
00187   ink_assert(m_flow == LOG_COLL_FLOW_ALLOW);
00188   if (m_buffer_send_list->get_size() >= Log::config->collation_max_send_buffers) {
00189     Debug("log-coll", "[%d]client::send - m_flow = DENY", m_id);
00190     Note("[log-coll] send-queue full; orphaning logs      "
00191          "[%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof(ipb)), m_log_host->port());
00192     m_flow = LOG_COLL_FLOW_DENY;
00193   }
00194   // compute return value
00195   //   must be done before call to client_send.  log_buffer may
00196   //   be converted to network order during that call.
00197   LogBufferHeader *log_buffer_header = log_buffer->header();
00198   ink_assert(log_buffer_header != NULL);
00199   int bytes_to_write = log_buffer_header->byte_count;
00200 
00201   // re-initiate sending if currently idle
00202   if (m_client_state == LOG_COLL_CLIENT_IDLE) {
00203     m_client_state = LOG_COLL_CLIENT_SEND;
00204     ink_assert(m_pending_event == NULL);
00205     m_pending_event = eventProcessor.schedule_imm(this);
00206     //eventProcessor.schedule_imm(this);
00207     //client_send(LOG_COLL_EVENT_SWITCH, NULL);
00208   }
00209 
00210   ink_mutex_release(&(mutex->the_mutex));
00211   return bytes_to_write;
00212 }
00213 
00214 //-------------------------------------------------------------------------
00215 //-------------------------------------------------------------------------
00216 //
00217 // client states
00218 //
00219 //-------------------------------------------------------------------------
00220 //-------------------------------------------------------------------------
00221 
00222 //-------------------------------------------------------------------------
00223 // LogCollationClientSM::client_auth
00224 // next: client_fail || client_send
00225 //-------------------------------------------------------------------------
00226 
00227 int
00228 LogCollationClientSM::client_auth(int event, VIO * /* vio ATS_UNUSED */)
00229 {
00230   ip_port_text_buffer ipb;
00231 
00232   Debug("log-coll", "[%d]client::client_auth", m_id);
00233 
00234   switch (event) {
00235   case LOG_COLL_EVENT_SWITCH:
00236     {
00237       Debug("log-coll", "[%d]client::client_auth - SWITCH", m_id);
00238       m_client_state = LOG_COLL_CLIENT_AUTH;
00239 
00240       NetMsgHeader nmh;
00241       int bytes_to_send = (int) strlen(Log::config->collation_secret);
00242       nmh.msg_bytes = bytes_to_send;
00243 
00244       // memory copies, I know...  but it happens rarely!!!  ^_^
00245       ink_assert(m_auth_buffer != NULL);
00246       m_auth_buffer->write((char *) &nmh, sizeof(NetMsgHeader));
00247       m_auth_buffer->write(Log::config->collation_secret, bytes_to_send);
00248       bytes_to_send += sizeof(NetMsgHeader);
00249 
00250       Debug("log-coll", "[%d]client::client_auth - do_io_write(%d)", m_id, bytes_to_send);
00251       ink_assert(m_host_vc != NULL);
00252       m_host_vio = m_host_vc->do_io_write(this, bytes_to_send, m_auth_reader);
00253       ink_assert(m_host_vio != NULL);
00254 
00255       return EVENT_CONT;
00256     }
00257 
00258   case VC_EVENT_WRITE_READY:
00259     Debug("log-coll", "[%d]client::client_auth - WRITE_READY", m_id);
00260     return EVENT_CONT;
00261 
00262   case VC_EVENT_WRITE_COMPLETE:
00263     Debug("log-coll", "[%d]client::client_auth - WRITE_COMPLETE", m_id);
00264 
00265     Note("[log-coll] host up [%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof(ipb)), m_log_host->port());
00266     m_host_is_up = true;
00267 
00268     return client_send(LOG_COLL_EVENT_SWITCH, NULL);
00269 
00270   case VC_EVENT_EOS:
00271   case VC_EVENT_ERROR:
00272     {
00273       Debug("log-coll", "[%d]client::client_auth - EOS|ERROR", m_id);
00274       int64_t read_avail = m_auth_reader->read_avail();
00275 
00276       if (read_avail > 0) {
00277         Debug("log-coll", "[%d]client::client_auth - consuming unsent data", m_id);
00278         m_auth_reader->consume(read_avail);
00279       }
00280 
00281       return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00282     }
00283 
00284   default:
00285     ink_assert(!"unexpected event");
00286     return EVENT_CONT;
00287   }
00288 }
00289 
00290 //-------------------------------------------------------------------------
00291 // LogCollationClientSM::client_dns
00292 // next: client_open || client_done
00293 //-------------------------------------------------------------------------
00294 int
00295 LogCollationClientSM::client_dns(int event, HostDBInfo * hostdb_info)
00296 {
00297   Debug("log-coll", "[%d]client::client_dns", m_id);
00298 
00299   switch (event) {
00300   case LOG_COLL_EVENT_SWITCH:
00301     m_client_state = LOG_COLL_CLIENT_DNS;
00302     if (m_log_host->m_name == 0) {
00303       return client_done(LOG_COLL_EVENT_SWITCH, NULL);
00304     }
00305     hostDBProcessor.getbyname_re(this, m_log_host->m_name, 0,
00306                                  HostDBProcessor::Options().setFlags(HostDBProcessor::HOSTDB_FORCE_DNS_RELOAD));
00307     return EVENT_CONT;
00308 
00309   case EVENT_HOST_DB_LOOKUP:
00310     if (hostdb_info == NULL) {
00311       return client_done(LOG_COLL_EVENT_SWITCH, NULL);
00312     }
00313     m_log_host->m_ip.assign(hostdb_info->ip());
00314     m_log_host->m_ip.toString(m_log_host->m_ipstr, sizeof(m_log_host->m_ipstr));
00315 
00316     return client_open(LOG_COLL_EVENT_SWITCH, NULL);
00317 
00318   default:
00319     ink_assert(!"unexpected event");
00320     return EVENT_CONT;
00321 
00322   }
00323 
00324 }
00325 
00326 //-------------------------------------------------------------------------
00327 // LogCollationClientSM::client_done
00328 // next: <none>
00329 //-------------------------------------------------------------------------
00330 
00331 int
00332 LogCollationClientSM::client_done(int event, void * /* data ATS_UNUSED */)
00333 {
00334   ip_port_text_buffer ipb;
00335 
00336   Debug("log-coll", "[%d]client::client_done", m_id);
00337 
00338   switch (event) {
00339   case LOG_COLL_EVENT_SWITCH:
00340     m_client_state = LOG_COLL_CLIENT_DONE;
00341 
00342     Note("[log-coll] client shutdown [%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof(ipb)), m_log_host->port());
00343 
00344     // close connections
00345     if (m_host_vc) {
00346       Debug("log-coll", "[%d]client::client_done - disconnecting!", m_id);
00347       // do I need to delete this???
00348       m_host_vc->do_io_close(0);
00349       m_host_vc = 0;
00350     }
00351     // flush unsent logs to orphan
00352     flush_to_orphan();
00353 
00354     // cancel any pending events/actions
00355     if (m_pending_action != NULL) {
00356       m_pending_action->cancel();
00357     }
00358     if (m_pending_event != NULL) {
00359       m_pending_event->cancel();
00360     }
00361     // free memory
00362     if (m_auth_buffer) {
00363       if (m_auth_reader) {
00364         m_auth_buffer->dealloc_reader(m_auth_reader);
00365       }
00366       free_MIOBuffer(m_auth_buffer);
00367     }
00368     if (m_send_buffer) {
00369       if (m_send_reader) {
00370         m_send_buffer->dealloc_reader(m_send_reader);
00371       }
00372       free_MIOBuffer(m_send_buffer);
00373     }
00374     if (m_abort_buffer) {
00375       free_MIOBuffer(m_abort_buffer);
00376     }
00377     if (m_buffer_send_list) {
00378       delete m_buffer_send_list;
00379     }
00380 
00381     return EVENT_DONE;
00382 
00383   default:
00384     ink_assert(!"unexpected event");
00385     return EVENT_DONE;
00386 
00387   }
00388 
00389 }
00390 
00391 //-------------------------------------------------------------------------
00392 // LogCollationClientSM::client_fail
00393 // next: client_fail || client_open
00394 //-------------------------------------------------------------------------
00395 
00396 int
00397 LogCollationClientSM::client_fail(int event, void * /* data ATS_UNUSED */)
00398 {
00399   ip_port_text_buffer ipb;
00400 
00401   Debug("log-coll", "[%d]client::client_fail", m_id);
00402 
00403   switch (event) {
00404   case LOG_COLL_EVENT_SWITCH:
00405     Debug("log-coll", "[%d]client::client_fail - SWITCH", m_id);
00406     m_client_state = LOG_COLL_CLIENT_FAIL;
00407 
00408     // avoid flooding log when host is down
00409     if (m_host_is_up) {
00410       Note("[log-coll] host down [%s:%u]", m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->m_port);
00411       char msg_buf[128];
00412       snprintf(msg_buf, sizeof(msg_buf), "Collation host %s:%u down",
00413                m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->m_port
00414       );
00415       RecSignalManager(MGMT_SIGNAL_SAC_SERVER_DOWN, msg_buf);
00416       m_host_is_up = false;
00417     }
00418 
00419     // close our NetVConnection (do I need to delete this)
00420     if (m_host_vc) {
00421       m_host_vc->do_io_close(0);
00422       m_host_vc = 0;
00423     }
00424     // flush unsent logs to orphan
00425     flush_to_orphan();
00426 
00427     // call back in collation_retry_sec seconds
00428     ink_assert(m_pending_event == NULL);
00429     m_pending_event = eventProcessor.schedule_in(this, HRTIME_SECONDS(Log::config->collation_retry_sec));
00430 
00431     return EVENT_CONT;
00432 
00433   case EVENT_INTERVAL:
00434     Debug("log-coll", "[%d]client::client_fail - INTERVAL", m_id);
00435     m_pending_event = NULL;
00436     return client_open(LOG_COLL_EVENT_SWITCH, NULL);
00437 
00438   default:
00439     ink_assert(!"unexpected event");
00440     return EVENT_CONT;
00441 
00442   }
00443 }
00444 
00445 //-------------------------------------------------------------------------
00446 // LogCollationClientSM::client_idle
00447 // next: client_send
00448 //-------------------------------------------------------------------------
00449 
00450 int
00451 LogCollationClientSM::client_idle(int event, void * /* data ATS_UNUSED */)
00452 {
00453   Debug("log-coll", "[%d]client::client_idle", m_id);
00454 
00455   switch (event) {
00456   case LOG_COLL_EVENT_SWITCH:
00457     m_client_state = LOG_COLL_CLIENT_IDLE;
00458     return EVENT_CONT;
00459 
00460   case VC_EVENT_EOS:
00461   case VC_EVENT_ERROR:
00462     Debug("log-coll", "[%d]client::client_idle - EOS|ERROR", m_id);
00463     return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00464 
00465   default:
00466     ink_assert(!"unexpcted state");
00467     return EVENT_CONT;
00468 
00469   }
00470 }
00471 
00472 //-------------------------------------------------------------------------
00473 // LogCollationClientSM::client_init
00474 // next: client_dns
00475 //-------------------------------------------------------------------------
00476 
00477 int
00478 LogCollationClientSM::client_init(int event, void * /* data ATS_UNUSED */)
00479 {
00480   Debug("log-coll", "[%d]client::client_init", m_id);
00481 
00482   switch (event) {
00483   case LOG_COLL_EVENT_SWITCH:
00484     m_client_state = LOG_COLL_CLIENT_INIT;
00485     ink_assert(m_pending_event == NULL);
00486     ink_mutex_acquire(&(mutex->the_mutex));
00487     m_pending_event = eventProcessor.schedule_imm(this);
00488     ink_mutex_release(&(mutex->the_mutex));
00489     return EVENT_CONT;
00490 
00491   case EVENT_IMMEDIATE:
00492     // callback complete, reset m_pending_event
00493     m_pending_event = NULL;
00494 
00495     // allocate buffers
00496     m_auth_buffer = new_MIOBuffer();
00497     ink_assert(m_auth_buffer != NULL);
00498     m_auth_reader = m_auth_buffer->alloc_reader();
00499     ink_assert(m_auth_reader != NULL);
00500     m_send_buffer = new_MIOBuffer();
00501     ink_assert(m_send_buffer != NULL);
00502     m_send_reader = m_send_buffer->alloc_reader();
00503     ink_assert(m_send_reader != NULL);
00504     m_abort_buffer = new_MIOBuffer();
00505     ink_assert(m_abort_buffer != NULL);
00506 
00507     // if we don't have an ip already, switch to client_dns
00508     if (! m_log_host->ip_addr().isValid()) {
00509       return client_dns(LOG_COLL_EVENT_SWITCH, NULL);
00510     } else {
00511       return client_open(LOG_COLL_EVENT_SWITCH, NULL);
00512     }
00513 
00514   default:
00515     ink_assert(!"unexpected state");
00516     return EVENT_CONT;
00517 
00518   }
00519 }
00520 
00521 //-------------------------------------------------------------------------
00522 // LogCollationClientSM::client_open
00523 // next: client_auth || client_fail
00524 //-------------------------------------------------------------------------
00525 
00526 int
00527 LogCollationClientSM::client_open(int event, NetVConnection * net_vc)
00528 {
00529   ip_port_text_buffer ipb;
00530   Debug("log-coll", "[%d]client::client_open", m_id);
00531 
00532   switch (event) {
00533   case LOG_COLL_EVENT_SWITCH:
00534     Debug("log-coll", "[%d]client::client_open - SWITCH", m_id);
00535     m_client_state = LOG_COLL_CLIENT_OPEN;
00536 
00537     {
00538       IpEndpoint target;
00539       target.assign(m_log_host->ip_addr(), htons(m_log_host->port()));
00540       ink_assert(target.isValid());
00541       Action *connect_action_handle = netProcessor.connect_re(this, &target.sa);
00542 
00543       if (connect_action_handle != ACTION_RESULT_DONE) {
00544         ink_assert(!m_pending_action);
00545         m_pending_action = connect_action_handle;
00546       }
00547     }
00548 
00549     return EVENT_CONT;
00550 
00551   case NET_EVENT_OPEN:
00552     Debug("log-coll", "[%d]client::client_open - %s:%u", m_id,
00553           m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->port()
00554     );
00555 
00556     // callback complete, reset m_pending_action
00557     m_pending_action = NULL;
00558 
00559     ink_assert(net_vc != NULL);
00560     m_host_vc = net_vc;
00561 
00562     // setup a client reader just for detecting a host disconnnect
00563     // (iocore should call back this function with and EOS/ERROR)
00564     m_abort_vio = m_host_vc->do_io_read(this, 1, m_abort_buffer);
00565 
00566     // change states
00567     return client_auth(LOG_COLL_EVENT_SWITCH, NULL);
00568 
00569   case NET_EVENT_OPEN_FAILED:
00570     Debug("log-coll", "[%d]client::client_open - OPEN_FAILED", m_id);
00571     // callback complete, reset m_pending_pending action
00572     m_pending_action = NULL;
00573     return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00574 
00575   default:
00576     ink_assert(!"unexpected event");
00577     return EVENT_CONT;
00578 
00579   }
00580 }
00581 
00582 //-------------------------------------------------------------------------
00583 // LogCollationClientSM::client_send
00584 // next: client_fail || client_idle || client_send
00585 //-------------------------------------------------------------------------
00586 
00587 int
00588 LogCollationClientSM::client_send(int event, VIO * /* vio ATS_UNUSED */)
00589 {
00590   ip_port_text_buffer ipb;
00591 
00592   Debug("log-coll", "[%d]client::client_send", m_id);
00593 
00594   switch (event) {
00595   case EVENT_IMMEDIATE:
00596     Debug("log-coll", "[%d]client::client_send - EVENT_IMMEDIATE", m_id);
00597     // callback complete, reset m_pending_event
00598     m_pending_event = NULL;
00599 
00600     // fall through to LOG_COLL_EVENT_SWITCH
00601 
00602   case LOG_COLL_EVENT_SWITCH:
00603     {
00604       Debug("log-coll", "[%d]client::client_send - SWITCH", m_id);
00605       m_client_state = LOG_COLL_CLIENT_SEND;
00606 
00607       // get a buffer off our queue
00608       ink_assert(m_buffer_send_list != NULL);
00609       ink_assert(m_buffer_in_iocore == NULL);
00610       if ((m_buffer_in_iocore = m_buffer_send_list->get()) == NULL) {
00611         return client_idle(LOG_COLL_EVENT_SWITCH, NULL);
00612       }
00613       Debug("log-coll", "[%d]client::client_send - send_list to m_buffer_in_iocore", m_id);
00614       Debug("log-coll", "[%d]client::client_send - send_list_size(%d)", m_id, m_buffer_send_list->get_size());
00615 
00616       // enable m_flow if we're out of work to do
00617       if (m_flow == LOG_COLL_FLOW_DENY && m_buffer_send_list->get_size() == 0) {
00618         Debug("log-coll", "[%d]client::client_send - m_flow = ALLOW", m_id);
00619         Note("[log-coll] send-queue clear; resuming collation [%s:%u]",
00620              m_log_host->ip_addr().toString(ipb, sizeof ipb), m_log_host->port());
00621         m_flow = LOG_COLL_FLOW_ALLOW;
00622       }
00623       // future work:
00624       // Wrap the buffer in a io_buffer_block and send directly to
00625       // do_io_write to save a memory copy.  But for now, just
00626       // write the lame way.
00627 
00628 #if defined(LOG_BUFFER_TRACKING)
00629       Debug("log-buftrak", "[%d]client::client_send - network write begin", m_buffer_in_iocore->header()->id);
00630 #endif // defined(LOG_BUFFER_TRACKING)
00631 
00632       // prepare to send data
00633       ink_assert(m_buffer_in_iocore != NULL);
00634       LogBufferHeader *log_buffer_header = m_buffer_in_iocore->header();
00635       ink_assert(log_buffer_header != NULL);
00636       NetMsgHeader nmh;
00637       int bytes_to_send = log_buffer_header->byte_count;
00638       nmh.msg_bytes = bytes_to_send;
00639       // TODO: We currently don't try to make the log buffers handle little vs big endian. TS-1156.
00640       //m_buffer_in_iocore->convert_to_network_order();
00641 
00642       RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_num_sent_to_network_stat,
00643                      log_buffer_header->entry_count);
00644 
00645       RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_bytes_sent_to_network_stat,
00646                      log_buffer_header->byte_count);
00647 
00648       // copy into m_send_buffer
00649       ink_assert(m_send_buffer != NULL);
00650       m_send_buffer->write((char *) &nmh, sizeof(NetMsgHeader));
00651       m_send_buffer->write((char *) log_buffer_header, bytes_to_send);
00652       bytes_to_send += sizeof(NetMsgHeader);
00653 
00654       // send m_send_buffer to iocore
00655       Debug("log-coll", "[%d]client::client_send - do_io_write(%d)", m_id, bytes_to_send);
00656       ink_assert(m_host_vc != NULL);
00657       m_host_vio = m_host_vc->do_io_write(this, bytes_to_send, m_send_reader);
00658       ink_assert(m_host_vio != NULL);
00659     }
00660     return EVENT_CONT;
00661 
00662   case VC_EVENT_WRITE_READY:
00663     Debug("log-coll", "[%d]client::client_send - WRITE_READY", m_id);
00664     return EVENT_CONT;
00665 
00666   case VC_EVENT_WRITE_COMPLETE:
00667     Debug("log-coll", "[%d]client::client_send - WRITE_COMPLETE", m_id);
00668 
00669     ink_assert(m_buffer_in_iocore != NULL);
00670 #if defined(LOG_BUFFER_TRACKING)
00671     Debug("log-buftrak", "[%d]client::client_send - network write complete", m_buffer_in_iocore->header()->id);
00672 #endif // defined(LOG_BUFFER_TRACKING)
00673 
00674     // done with the buffer, delete it
00675     Debug("log-coll", "[%d]client::client_send - m_buffer_in_iocore[%p] to delete_list", m_id, m_buffer_in_iocore);
00676     LogBuffer::destroy(m_buffer_in_iocore);
00677     m_buffer_in_iocore = NULL;
00678 
00679     // switch back to client_send
00680     return client_send(LOG_COLL_EVENT_SWITCH, NULL);
00681 
00682   case VC_EVENT_EOS:
00683   case VC_EVENT_ERROR:
00684     {
00685       Debug("log-coll", "[%d]client::client_send - EOS|ERROR", m_id);
00686       int64_t read_avail = m_send_reader->read_avail();
00687 
00688       if (read_avail > 0) {
00689         Debug("log-coll", "[%d]client::client_send - consuming unsent data", m_id);
00690         m_send_reader->consume(read_avail);
00691       }
00692 
00693       return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00694     }
00695 
00696   default:
00697     Debug("log-coll", "[%d]client::client_send - default", m_id);
00698     return client_fail(LOG_COLL_EVENT_SWITCH, NULL);
00699 
00700   }
00701 }
00702 
00703 //-------------------------------------------------------------------------
00704 //-------------------------------------------------------------------------
00705 //
00706 // support functions
00707 //
00708 //-------------------------------------------------------------------------
00709 //-------------------------------------------------------------------------
00710 
00711 //-------------------------------------------------------------------------
00712 // LogCollationClientSM::flush_to_orphan
00713 //-------------------------------------------------------------------------
00714 void
00715 LogCollationClientSM::flush_to_orphan()
00716 {
00717   Debug("log-coll", "[%d]client::flush_to_orphan", m_id);
00718 
00719   // if in middle of a write, flush buffer_in_iocore to orphan
00720   if (m_buffer_in_iocore != NULL) {
00721     Debug("log-coll", "[%d]client::flush_to_orphan - m_buffer_in_iocore to oprhan", m_id);
00722     // TODO: We currently don't try to make the log buffers handle little vs big endian. TS-1156.
00723     // m_buffer_in_iocore->convert_to_host_order();
00724     m_log_host->orphan_write_and_try_delete(m_buffer_in_iocore);
00725     m_buffer_in_iocore = NULL;
00726   }
00727   // flush buffers in send_list to orphan
00728   LogBuffer *log_buffer;
00729   ink_assert(m_buffer_send_list != NULL);
00730   while ((log_buffer = m_buffer_send_list->get()) != NULL) {
00731     Debug("log-coll", "[%d]client::flush_to_orphan - send_list to orphan", m_id);
00732     m_log_host->orphan_write_and_try_delete(log_buffer);
00733   }
00734 
00735   // Now send_list is empty, let's update m_flow to ALLOW status
00736   Debug("log-coll", "[%d]client::client_send - m_flow = ALLOW", m_id);
00737   m_flow = LOG_COLL_FLOW_ALLOW;
00738 }

Generated by  doxygen 1.7.1