• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

RecProcess.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Record process definitions
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 #include "libts.h"
00025 
00026 #include "I_Tasks.h"
00027 
00028 #include "P_EventSystem.h"
00029 #include "P_RecCore.h"
00030 #include "P_RecProcess.h"
00031 #include "P_RecMessage.h"
00032 #include "P_RecUtils.h"
00033 #include "P_RecFile.h"
00034 
00035 #include "mgmtapi.h"
00036 #include "ProcessManager.h"
00037 
00038 // Marks whether the message handler has been initialized.
00039 static bool message_initialized_p = false;
00040 static bool g_started = false;
00041 static EventNotify g_force_req_notify;
00042 static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS;
00043 static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS;
00044 static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
00045 static Event *raw_stat_sync_cont_event;
00046 static Event *config_update_cont_event;
00047 static Event *sync_cont_event;
00048 
00049 //-------------------------------------------------------------------------
00050 // i_am_the_record_owner, only used for librecords_p.a
00051 //-------------------------------------------------------------------------
00052 bool
00053 i_am_the_record_owner(RecT rec_type)
00054 {
00055   if (g_mode_type == RECM_CLIENT) {
00056     switch (rec_type) {
00057     case RECT_PROCESS:
00058     case RECT_PLUGIN:
00059       return true;
00060     case RECT_CONFIG:
00061     case RECT_NODE:
00062     case RECT_CLUSTER:
00063     case RECT_LOCAL:
00064       return false;
00065     default:
00066       ink_assert(!"Unexpected RecT type");
00067       return false;
00068     }
00069   } else if (g_mode_type == RECM_STAND_ALONE) {
00070     switch (rec_type) {
00071     case RECT_CONFIG:
00072     case RECT_PROCESS:
00073     case RECT_NODE:
00074     case RECT_CLUSTER:
00075     case RECT_LOCAL:
00076     case RECT_PLUGIN:
00077       return true;
00078     default:
00079       ink_assert(!"Unexpected RecT type");
00080       return false;
00081     }
00082   }
00083 
00084   return false;
00085 }
00086 
00087 //-------------------------------------------------------------------------
00088 // Simple setters for the intervals to decouple this from the proxy
00089 //-------------------------------------------------------------------------
00090 void
00091 RecProcess_set_raw_stat_sync_interval_ms(int ms) {
00092   Debug("statsproc", "g_rec_raw_stat_sync_interval_ms -> %d", ms);
00093   g_rec_raw_stat_sync_interval_ms = ms;
00094   if (raw_stat_sync_cont_event) {
00095     Debug("statsproc", "Rescheduling raw-stat syncer");
00096     raw_stat_sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms));
00097   }
00098 }
00099 void
00100 RecProcess_set_config_update_interval_ms(int ms) {
00101   Debug("statsproc", "g_rec_config_update_interval_ms -> %d", ms);
00102   g_rec_config_update_interval_ms = ms;
00103   if (config_update_cont_event) {
00104     Debug("statsproc", "Rescheduling config syncer");
00105     config_update_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_config_update_interval_ms));
00106   }
00107 }
00108 void
00109 RecProcess_set_remote_sync_interval_ms(int ms) {
00110   Debug("statsproc", "g_rec_remote_sync_interval_ms -> %d", ms);
00111   g_rec_remote_sync_interval_ms = ms;
00112   if (sync_cont_event) {
00113     Debug("statsproc", "Rescheduling remote syncer");
00114     sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_remote_sync_interval_ms));
00115   }
00116 }
00117 
00118 //-------------------------------------------------------------------------
00119 // raw_stat_get_total
00120 //-------------------------------------------------------------------------
00121 static int
00122 raw_stat_get_total(RecRawStatBlock *rsb, int id, RecRawStat *total)
00123 {
00124   int i;
00125   RecRawStat *tlp;
00126 
00127   total->sum = 0;
00128   total->count = 0;
00129 
00130   // get global values
00131   total->sum = rsb->global[id]->sum;
00132   total->count = rsb->global[id]->count;
00133 
00134   // get thread local values
00135   for (i = 0; i < eventProcessor.n_ethreads; i++) {
00136     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00137     total->sum += tlp->sum;
00138     total->count += tlp->count;
00139   }
00140 
00141   for (i = 0; i < eventProcessor.n_dthreads; i++) {
00142     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00143     total->sum += tlp->sum;
00144     total->count += tlp->count;
00145   }
00146 
00147   if (total->sum < 0) { // Assure that we stay positive
00148     total->sum = 0;
00149   }
00150 
00151   return REC_ERR_OKAY;
00152 }
00153 
00154 
00155 //-------------------------------------------------------------------------
00156 // raw_stat_sync_to_global
00157 //-------------------------------------------------------------------------
00158 static int
00159 raw_stat_sync_to_global(RecRawStatBlock *rsb, int id)
00160 {
00161   int i;
00162   RecRawStat *tlp;
00163   RecRawStat total;
00164 
00165   total.sum = 0;
00166   total.count = 0;
00167 
00168   // sum the thread local values
00169   for (i = 0; i < eventProcessor.n_ethreads; i++) {
00170     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00171     total.sum += tlp->sum;
00172     total.count += tlp->count;
00173   }
00174 
00175   for (i = 0; i < eventProcessor.n_dthreads; i++) {
00176     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00177     total.sum += tlp->sum;
00178     total.count += tlp->count;
00179   }
00180 
00181   if (total.sum < 0) { // Assure that we stay positive
00182     total.sum = 0;
00183   }
00184 
00185   // lock so the setting of the globals and last values are atomic
00186   ink_mutex_acquire(&(rsb->mutex));
00187 
00188   // get the delta from the last sync
00189   RecRawStat delta;
00190   delta.sum = total.sum - rsb->global[id]->last_sum;
00191   delta.count = total.count - rsb->global[id]->last_count;
00192 
00193   // This is too verbose now, so leaving it out / leif
00194   //Debug("stats", "raw_stat_sync_to_global(): rsb pointer:%p id:%d delta:%" PRId64 " total:%" PRId64 " last:%" PRId64 " global:%" PRId64 "\n",
00195   //rsb, id, delta.sum, total.sum, rsb->global[id]->last_sum, rsb->global[id]->sum);
00196 
00197   // increment the global values by the delta
00198   ink_atomic_increment(&(rsb->global[id]->sum), delta.sum);
00199   ink_atomic_increment(&(rsb->global[id]->count), delta.count);
00200 
00201   // set the new totals as the last values seen
00202   ink_atomic_swap(&(rsb->global[id]->last_sum), total.sum);
00203   ink_atomic_swap(&(rsb->global[id]->last_count), total.count);
00204 
00205   ink_mutex_release(&(rsb->mutex));
00206 
00207   return REC_ERR_OKAY;
00208 }
00209 
00210 
00211 //-------------------------------------------------------------------------
00212 // raw_stat_clear
00213 //-------------------------------------------------------------------------
00214 static int
00215 raw_stat_clear(RecRawStatBlock *rsb, int id)
00216 {
00217   Debug("stats", "raw_stat_clear(): rsb pointer:%p id:%d\n", rsb, id);
00218 
00219   // the globals need to be reset too
00220   // lock so the setting of the globals and last values are atomic
00221   ink_mutex_acquire(&(rsb->mutex));
00222   ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
00223   ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
00224   ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
00225   ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
00226   ink_mutex_release(&(rsb->mutex));
00227 
00228   // reset the local stats
00229   RecRawStat *tlp;
00230   for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00231     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00232     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00233     ink_atomic_swap(&(tlp->count), (int64_t)0);
00234   }
00235 
00236   for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00237     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00238     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00239     ink_atomic_swap(&(tlp->count), (int64_t)0);
00240   }
00241 
00242   return REC_ERR_OKAY;
00243 }
00244 
00245 
00246 //-------------------------------------------------------------------------
00247 // raw_stat_clear_sum
00248 //-------------------------------------------------------------------------
00249 static int
00250 raw_stat_clear_sum(RecRawStatBlock *rsb, int id)
00251 {
00252   Debug("stats", "raw_stat_clear_sum(): rsb pointer:%p id:%d\n", rsb, id);
00253 
00254   // the globals need to be reset too
00255   // lock so the setting of the globals and last values are atomic
00256   ink_mutex_acquire(&(rsb->mutex));
00257   ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
00258   ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
00259   ink_mutex_release(&(rsb->mutex));
00260 
00261   // reset the local stats
00262   RecRawStat *tlp;
00263   for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00264     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00265     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00266   }
00267 
00268   for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00269     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00270     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00271   }
00272 
00273   return REC_ERR_OKAY;
00274 }
00275 
00276 
00277 //-------------------------------------------------------------------------
00278 // raw_stat_clear_count
00279 //-------------------------------------------------------------------------
00280 static int
00281 raw_stat_clear_count(RecRawStatBlock *rsb, int id)
00282 {
00283   Debug("stats", "raw_stat_clear_count(): rsb pointer:%p id:%d\n", rsb, id);
00284 
00285   // the globals need to be reset too
00286   // lock so the setting of the globals and last values are atomic
00287   ink_mutex_acquire(&(rsb->mutex));
00288   ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
00289   ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
00290   ink_mutex_release(&(rsb->mutex));
00291 
00292   // reset the local stats
00293   RecRawStat *tlp;
00294   for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00295     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00296     ink_atomic_swap(&(tlp->count), (int64_t)0);
00297   }
00298 
00299   for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00300     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00301     ink_atomic_swap(&(tlp->count), (int64_t)0);
00302   }
00303 
00304   return REC_ERR_OKAY;
00305 }
00306 
00307 
00308 //-------------------------------------------------------------------------
00309 // recv_message_cb__process
00310 //-------------------------------------------------------------------------
00311 static int
00312 recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie)
00313 {
00314   int err;
00315 
00316   if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
00317     if (msg_type == RECG_PULL_ACK) {
00318       g_force_req_notify.lock();
00319       g_force_req_notify.signal();
00320       g_force_req_notify.unlock();
00321     }
00322   }
00323   return err;
00324 }
00325 
00326 
00327 //-------------------------------------------------------------------------
00328 // raw_stat_sync_cont
00329 //-------------------------------------------------------------------------
00330 struct raw_stat_sync_cont: public Continuation
00331 {
00332   raw_stat_sync_cont(ProxyMutex *m)
00333     : Continuation(m)
00334   {
00335     SET_HANDLER(&raw_stat_sync_cont::exec_callbacks);
00336   }
00337 
00338   int exec_callbacks(int /* event */, Event * /* e */)
00339   {
00340     RecExecRawStatSyncCbs();
00341     Debug("statsproc", "raw_stat_sync_cont() processed");
00342 
00343     return EVENT_CONT;
00344   }
00345 };
00346 
00347 
00348 //-------------------------------------------------------------------------
00349 // config_update_cont
00350 //-------------------------------------------------------------------------
00351 struct config_update_cont: public Continuation
00352 {
00353   config_update_cont(ProxyMutex *m)
00354     : Continuation(m)
00355   {
00356     SET_HANDLER(&config_update_cont::exec_callbacks);
00357   }
00358 
00359   int exec_callbacks(int /* event */, Event * /* e */)
00360   {
00361     RecExecConfigUpdateCbs(REC_PROCESS_UPDATE_REQUIRED);
00362     Debug("statsproc", "config_update_cont() processed");
00363 
00364     return EVENT_CONT;
00365   }
00366 };
00367 
00368 
00369 //-------------------------------------------------------------------------
00370 // sync_cont
00371 //-------------------------------------------------------------------------
00372 struct sync_cont: public Continuation
00373 {
00374   textBuffer *m_tb;
00375 
00376   sync_cont(ProxyMutex *m)
00377     : Continuation(m)
00378   {
00379     SET_HANDLER(&sync_cont::sync);
00380     m_tb = new textBuffer(65536);
00381   }
00382 
00383    ~sync_cont()
00384   {
00385     if (m_tb != NULL) {
00386       delete m_tb;
00387       m_tb = NULL;
00388     }
00389   }
00390 
00391   int sync(int /* event */, Event * /* e */)
00392   {
00393     send_push_message();
00394     RecSyncStatsFile();
00395     if (RecSyncConfigToTB(m_tb) == REC_ERR_OKAY) {
00396         RecWriteConfigFile(m_tb);
00397     }
00398     Debug("statsproc", "sync_cont() processed");
00399 
00400     return EVENT_CONT;
00401   }
00402 };
00403 
00404 
00405 //-------------------------------------------------------------------------
00406 // RecProcessInit
00407 //-------------------------------------------------------------------------
00408 int
00409 RecProcessInit(RecModeT mode_type, Diags *_diags)
00410 {
00411   static bool initialized_p = false;
00412 
00413   if (initialized_p) {
00414     return REC_ERR_OKAY;
00415   }
00416 
00417   g_mode_type = mode_type;
00418 
00419   if (RecCoreInit(mode_type, _diags) == REC_ERR_FAIL) {
00420     return REC_ERR_FAIL;
00421   }
00422 
00423   /* -- defer RecMessageInit() until ProcessManager is initialized and
00424    *    started
00425    if (RecMessageInit(mode_type) == REC_ERR_FAIL) {
00426    return REC_ERR_FAIL;
00427    }
00428 
00429    if (RecMessageRegisterRecvCb(recv_message_cb__process, NULL)) {
00430    return REC_ERR_FAIL;
00431    }
00432 
00433    ink_cond_init(&g_force_req_cond);
00434    ink_mutex_init(&g_force_req_mutex, NULL);
00435    if (mode_type == RECM_CLIENT) {
00436    send_pull_message(RECG_PULL_REQ);
00437    ink_cond_wait(&g_force_req_cond, &g_force_req_mutex);
00438    ink_mutex_release(&g_force_req_mutex);
00439    }
00440    */
00441 
00442   initialized_p = true;
00443 
00444   return REC_ERR_OKAY;
00445 }
00446 
00447 
00448 void
00449 RecMessageInit()
00450 {
00451   ink_assert(g_mode_type != RECM_NULL);
00452   pmgmt->registerMgmtCallback(MGMT_EVENT_LIBRECORDS, RecMessageRecvThis, NULL);
00453   message_initialized_p = true;
00454 }
00455 
00456 //-------------------------------------------------------------------------
00457 // RecProcessInitMessage
00458 //-------------------------------------------------------------------------
00459 int
00460 RecProcessInitMessage(RecModeT mode_type)
00461 {
00462   static bool initialized_p = false;
00463 
00464   if (initialized_p) {
00465     return REC_ERR_OKAY;
00466   }
00467 
00468   RecMessageInit();
00469   if (RecMessageRegisterRecvCb(recv_message_cb__process, NULL)) {
00470     return REC_ERR_FAIL;
00471   }
00472 
00473   if (mode_type == RECM_CLIENT) {
00474     send_pull_message(RECG_PULL_REQ);
00475     g_force_req_notify.lock();
00476     g_force_req_notify.wait();
00477     g_force_req_notify.unlock();
00478   }
00479 
00480   initialized_p = true;
00481 
00482   return REC_ERR_OKAY;
00483 }
00484 
00485 
00486 //-------------------------------------------------------------------------
00487 // RecProcessStart
00488 //-------------------------------------------------------------------------
00489 int
00490 RecProcessStart(void)
00491 {
00492   if (g_started) {
00493     return REC_ERR_OKAY;
00494   }
00495 
00496   Debug("statsproc", "Starting sync continuations:");
00497   raw_stat_sync_cont *rssc = new raw_stat_sync_cont(new_ProxyMutex());
00498   Debug("statsproc", "\traw-stat syncer");
00499   raw_stat_sync_cont_event = eventProcessor.schedule_every(rssc, HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms), ET_TASK);
00500 
00501   config_update_cont *cuc = new config_update_cont(new_ProxyMutex());
00502   Debug("statsproc", "\tconfig syncer");
00503   config_update_cont_event = eventProcessor.schedule_every(cuc, HRTIME_MSECONDS(g_rec_config_update_interval_ms), ET_TASK);
00504 
00505   sync_cont *sc = new sync_cont(new_ProxyMutex());
00506   Debug("statsproc", "\tremote syncer");
00507   sync_cont_event = eventProcessor.schedule_every(sc, HRTIME_MSECONDS(g_rec_remote_sync_interval_ms), ET_TASK);
00508 
00509   g_started = true;
00510 
00511   return REC_ERR_OKAY;
00512 }
00513 
00514 
00515 //-------------------------------------------------------------------------
00516 // RecAllocateRawStatBlock
00517 //-------------------------------------------------------------------------
00518 RecRawStatBlock *
00519 RecAllocateRawStatBlock(int num_stats)
00520 {
00521   off_t ethr_stat_offset;
00522   RecRawStatBlock *rsb;
00523 
00524   // allocate thread-local raw-stat memory
00525   if ((ethr_stat_offset = eventProcessor.allocate(num_stats * sizeof(RecRawStat))) == -1) {
00526     return NULL;
00527   }
00528   // create the raw-stat-block structure
00529   rsb = (RecRawStatBlock *)ats_malloc(sizeof(RecRawStatBlock));
00530   memset(rsb, 0, sizeof(RecRawStatBlock));
00531   rsb->ethr_stat_offset = ethr_stat_offset;
00532   rsb->global = (RecRawStat **)ats_malloc(num_stats * sizeof(RecRawStat *));
00533   memset(rsb->global, 0, num_stats * sizeof(RecRawStat *));
00534   rsb->num_stats = 0;
00535   rsb->max_stats = num_stats;
00536   ink_mutex_init(&(rsb->mutex),"net stat mutex");
00537   return rsb;
00538 }
00539 
00540 
00541 //-------------------------------------------------------------------------
00542 // RecRegisterRawStat
00543 //-------------------------------------------------------------------------
00544 int
00545 _RecRegisterRawStat(RecRawStatBlock *rsb, RecT rec_type, const char *name, RecDataT data_type, RecPersistT persist_type, int id,
00546                    RecRawStatSyncCb sync_cb)
00547 {
00548   Debug("stats", "RecRawStatSyncCb(%s): rsb pointer:%p id:%d\n", name, rsb, id);
00549 
00550   // check to see if we're good to proceed
00551   ink_assert(id < rsb->max_stats);
00552 
00553   int err = REC_ERR_OKAY;
00554 
00555   RecRecord *r;
00556   RecData data_default;
00557   memset(&data_default, 0, sizeof(RecData));
00558 
00559   // register the record
00560   if ((r = RecRegisterStat(rec_type, name, data_type, data_default, persist_type)) == NULL) {
00561     err = REC_ERR_FAIL;
00562     goto Ldone;
00563   }
00564   r->rsb_id = id; // This is the index within the RSB raw block for this stat, used for lookups by name.
00565   if (i_am_the_record_owner(r->rec_type)) {
00566     r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;
00567   } else {
00568     send_register_message(r);
00569   }
00570 
00571   // store a pointer to our record->stat_meta.data_raw in our rsb
00572   rsb->global[id] = &(r->stat_meta.data_raw);
00573   rsb->global[id]->last_sum = 0;
00574   rsb->global[id]->last_count = 0;
00575 
00576   // setup the periodic sync callback
00577   RecRegisterRawStatSyncCb(name, sync_cb, rsb, id);
00578 
00579 Ldone:
00580   return err;
00581 }
00582 
00583 
00584 //-------------------------------------------------------------------------
00585 // RecRawStatSync...
00586 //-------------------------------------------------------------------------
00587 
00588 // Note: On these RecRawStatSync callbacks, our 'data' is protected
00589 // under its lock by the caller, so no need to worry!
00590 int
00591 RecRawStatSyncSum(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00592 {
00593   RecRawStat total;
00594 
00595   Debug("stats", "raw sync:sum for %s", name);
00596   raw_stat_sync_to_global(rsb, id);
00597   total.sum = rsb->global[id]->sum;
00598   total.count = rsb->global[id]->count;
00599   RecDataSetFromInk64(data_type, data, total.sum);
00600 
00601   return REC_ERR_OKAY;
00602 }
00603 
00604 int
00605 RecRawStatSyncCount(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00606 {
00607   RecRawStat total;
00608 
00609   Debug("stats", "raw sync:count for %s", name);
00610   raw_stat_sync_to_global(rsb, id);
00611   total.sum = rsb->global[id]->sum;
00612   total.count = rsb->global[id]->count;
00613   RecDataSetFromInk64(data_type, data, total.count);
00614 
00615   return REC_ERR_OKAY;
00616 }
00617 
00618 int
00619 RecRawStatSyncAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00620 {
00621   RecRawStat total;
00622   RecFloat avg = 0.0f;
00623 
00624   Debug("stats", "raw sync:avg for %s", name);
00625   raw_stat_sync_to_global(rsb, id);
00626   total.sum = rsb->global[id]->sum;
00627   total.count = rsb->global[id]->count;
00628   if (total.count != 0)
00629     avg = (float) ((double) total.sum / (double) total.count);
00630   RecDataSetFromFloat(data_type, data, avg);
00631   return REC_ERR_OKAY;
00632 }
00633 
00634 int
00635 RecRawStatSyncHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00636 {
00637   RecRawStat total;
00638   RecFloat r;
00639 
00640   Debug("stats", "raw sync:hr-timeavg for %s", name);
00641   raw_stat_sync_to_global(rsb, id);
00642   total.sum = rsb->global[id]->sum;
00643   total.count = rsb->global[id]->count;
00644   if (total.count == 0) {
00645     r = 0.0f;
00646   } else {
00647     r = (float) ((double) total.sum / (double) total.count);
00648     r = r / (float) (HRTIME_SECOND);
00649   }
00650   RecDataSetFromFloat(data_type, data, r);
00651   return REC_ERR_OKAY;
00652 }
00653 
00654 int
00655 RecRawStatSyncIntMsecsToFloatSeconds(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00656 {
00657   RecRawStat total;
00658   RecFloat r;
00659 
00660   Debug("stats", "raw sync:seconds for %s", name);
00661   raw_stat_sync_to_global(rsb, id);
00662   total.sum = rsb->global[id]->sum;
00663   total.count = rsb->global[id]->count;
00664   if (total.count == 0) {
00665     r = 0.0f;
00666   } else {
00667     r = (float) ((double) total.sum / 1000);
00668   }
00669   RecDataSetFromFloat(data_type, data, r);
00670   return REC_ERR_OKAY;
00671 }
00672 
00673 int
00674 RecRawStatSyncMHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00675 {
00676   RecRawStat total;
00677   RecFloat r;
00678 
00679   Debug("stats", "raw sync:mhr-timeavg for %s", name);
00680   raw_stat_sync_to_global(rsb, id);
00681   total.sum = rsb->global[id]->sum;
00682   total.count = rsb->global[id]->count;
00683   if (total.count == 0) {
00684     r = 0.0f;
00685   } else {
00686     r = (float) ((double) total.sum / (double) total.count);
00687     r = r / (float) (HRTIME_MSECOND);
00688   }
00689   RecDataSetFromFloat(data_type, data, r);
00690   return REC_ERR_OKAY;
00691 }
00692 
00693 
00694 //-------------------------------------------------------------------------
00695 // RecIncrRawStatXXX
00696 //-------------------------------------------------------------------------
00697 int
00698 RecIncrRawStatBlock(RecRawStatBlock */* rsb ATS_UNUSED */, EThread */* ethread ATS_UNUSED */,
00699                     RecRawStat */* stat_array ATS_UNUSED */)
00700 {
00701   return REC_ERR_FAIL;
00702 }
00703 
00704 
00705 //-------------------------------------------------------------------------
00706 // RecSetRawStatXXX
00707 //-------------------------------------------------------------------------
00708 int
00709 RecSetRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
00710 {
00711   raw_stat_clear_sum(rsb, id);
00712   ink_atomic_swap(&(rsb->global[id]->sum), data);
00713   return REC_ERR_OKAY;
00714 }
00715 
00716 int
00717 RecSetRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
00718 {
00719   raw_stat_clear_count(rsb, id);
00720   ink_atomic_swap(&(rsb->global[id]->count), data);
00721   return REC_ERR_OKAY;
00722 }
00723 
00724 int
00725 RecSetRawStatBlock(RecRawStatBlock */* rsb ATS_UNUSED */, RecRawStat */* stat_array ATS_UNUSED */)
00726 {
00727   return REC_ERR_FAIL;
00728 }
00729 
00730 
00731 //-------------------------------------------------------------------------
00732 // RecGetRawStatXXX
00733 //-------------------------------------------------------------------------
00734 
00735 int
00736 RecGetRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
00737 {
00738   RecRawStat total;
00739 
00740   raw_stat_get_total(rsb, id, &total);
00741   *data = total.sum;
00742   return REC_ERR_OKAY;
00743 }
00744 
00745 int
00746 RecGetRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
00747 {
00748   RecRawStat total;
00749 
00750   raw_stat_get_total(rsb, id, &total);
00751   *data = total.count;
00752   return REC_ERR_OKAY;
00753 }
00754 
00755 
00756 //-------------------------------------------------------------------------
00757 // RecIncrGlobalRawStatXXX
00758 //-------------------------------------------------------------------------
00759 int
00760 RecIncrGlobalRawStat(RecRawStatBlock *rsb, int id, int64_t incr)
00761 {
00762   ink_atomic_increment(&(rsb->global[id]->sum), incr);
00763   ink_atomic_increment(&(rsb->global[id]->count), 1);
00764   return REC_ERR_OKAY;
00765 }
00766 
00767 int
00768 RecIncrGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t incr)
00769 {
00770   ink_atomic_increment(&(rsb->global[id]->sum), incr);
00771   return REC_ERR_OKAY;
00772 }
00773 
00774 int
00775 RecIncrGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t incr)
00776 {
00777   ink_atomic_increment(&(rsb->global[id]->count), incr);
00778   return REC_ERR_OKAY;
00779 }
00780 
00781 
00782 //-------------------------------------------------------------------------
00783 // RecSetGlobalRawStatXXX
00784 //-------------------------------------------------------------------------
00785 int
00786 RecSetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
00787 {
00788   ink_atomic_swap(&(rsb->global[id]->sum), data);
00789   return REC_ERR_OKAY;
00790 }
00791 
00792 int
00793 RecSetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
00794 {
00795   ink_atomic_swap(&(rsb->global[id]->count), data);
00796   return REC_ERR_OKAY;
00797 }
00798 
00799 
00800 //-------------------------------------------------------------------------
00801 // RecGetGlobalRawStatXXX
00802 //-------------------------------------------------------------------------
00803 int
00804 RecGetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
00805 {
00806   *data = rsb->global[id]->sum;
00807   return REC_ERR_OKAY;
00808 }
00809 
00810 int
00811 RecGetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
00812 {
00813   *data = rsb->global[id]->count;
00814   return REC_ERR_OKAY;
00815 }
00816 
00817 
00818 //-------------------------------------------------------------------------
00819 // RecGetGlobalRawStatXXXPtr
00820 //-------------------------------------------------------------------------
00821 RecRawStat *
00822 RecGetGlobalRawStatPtr(RecRawStatBlock *rsb, int id)
00823 {
00824   return rsb->global[id];
00825 }
00826 
00827 int64_t *
00828 RecGetGlobalRawStatSumPtr(RecRawStatBlock *rsb, int id)
00829 {
00830   return &(rsb->global[id]->sum);
00831 }
00832 
00833 int64_t *
00834 RecGetGlobalRawStatCountPtr(RecRawStatBlock *rsb, int id)
00835 {
00836   return &(rsb->global[id]->count);
00837 }
00838 
00839 
00840 //-------------------------------------------------------------------------
00841 // RecRegisterRawStatSyncCb
00842 //-------------------------------------------------------------------------
00843 int
00844 RecRegisterRawStatSyncCb(const char *name, RecRawStatSyncCb sync_cb, RecRawStatBlock *rsb, int id)
00845 {
00846   int err = REC_ERR_FAIL;
00847   RecRecord *r;
00848 
00849   ink_rwlock_rdlock(&g_records_rwlock);
00850   if (ink_hash_table_lookup(g_records_ht, name, (void **) &r)) {
00851     rec_mutex_acquire(&(r->lock));
00852     if (REC_TYPE_IS_STAT(r->rec_type)) {
00853       if (!(r->stat_meta.sync_cb)) {
00854         r->stat_meta.sync_rsb = rsb;
00855         r->stat_meta.sync_id = id;
00856         r->stat_meta.sync_cb = sync_cb;
00857         r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
00858         err = REC_ERR_OKAY;
00859       } else {
00860         ink_release_assert(false); // We shouldn't register CBs twice...
00861       }
00862     }
00863     rec_mutex_release(&(r->lock));
00864   }
00865   ink_rwlock_unlock(&g_records_rwlock);
00866 
00867   return err;
00868 }
00869 
00870 
00871 //-------------------------------------------------------------------------
00872 // RecExecRawStatSyncCbs
00873 //-------------------------------------------------------------------------
00874 int
00875 RecExecRawStatSyncCbs()
00876 {
00877   RecRecord *r;
00878   int i, num_records;
00879 
00880   num_records = g_num_records;
00881   for (i = 0; i < num_records; i++) {
00882     r = &(g_records[i]);
00883     rec_mutex_acquire(&(r->lock));
00884     if (REC_TYPE_IS_STAT(r->rec_type)) {
00885       if (r->stat_meta.sync_cb) {
00886         if (r->version && r->version != r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version) {
00887           raw_stat_clear(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
00888           r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
00889         } else {
00890           (*(r->stat_meta.sync_cb)) (r->name, r->data_type, &(r->data), r->stat_meta.sync_rsb, r->stat_meta.sync_id);
00891         }
00892         r->sync_required = REC_SYNC_REQUIRED;
00893       }
00894     }
00895     rec_mutex_release(&(r->lock));
00896   }
00897 
00898   return REC_ERR_OKAY;
00899 }
00900 
00901 void
00902 RecSignalManager(int id, const char * msg, size_t msgsize)
00903 {
00904   ink_assert(pmgmt);
00905   pmgmt->signalManager(id, msg, msgsize);
00906 }
00907 
00908 int
00909 RecRegisterManagerCb(int _signal, RecManagerCb _fn, void *_data)
00910 {
00911   return pmgmt->registerMgmtCallback(_signal, _fn, _data);
00912 }
00913 
00914 //-------------------------------------------------------------------------
00915 // RecMessageSend
00916 //-------------------------------------------------------------------------
00917 
00918 int
00919 RecMessageSend(RecMessage * msg)
00920 {
00921   int msg_size;
00922 
00923   if (!message_initialized_p)
00924     return REC_ERR_OKAY;
00925 
00926   // Make a copy of the record, but truncate it to the size actually used
00927   if (g_mode_type == RECM_CLIENT || g_mode_type == RECM_SERVER) {
00928     msg->o_end = msg->o_write;
00929     msg_size = sizeof(RecMessageHdr) + (msg->o_write - msg->o_start);
00930     pmgmt->signalManager(MGMT_SIGNAL_LIBRECORDS, (char *) msg, msg_size);
00931   }
00932 
00933   return REC_ERR_OKAY;
00934 }
00935 

Generated by  doxygen 1.7.1