00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "libts.h"
00025 
00026 #include "I_Tasks.h"
00027 
00028 #include "P_EventSystem.h"
00029 #include "P_RecCore.h"
00030 #include "P_RecProcess.h"
00031 #include "P_RecMessage.h"
00032 #include "P_RecUtils.h"
00033 #include "P_RecFile.h"
00034 
00035 #include "mgmtapi.h"
00036 #include "ProcessManager.h"
00037 
00038 
00039 static bool message_initialized_p = false;
00040 static bool g_started = false;
00041 static EventNotify g_force_req_notify;
00042 static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS;
00043 static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS;
00044 static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
00045 static Event *raw_stat_sync_cont_event;
00046 static Event *config_update_cont_event;
00047 static Event *sync_cont_event;
00048 
00049 
00050 
00051 
00052 bool
00053 i_am_the_record_owner(RecT rec_type)
00054 {
00055   if (g_mode_type == RECM_CLIENT) {
00056     switch (rec_type) {
00057     case RECT_PROCESS:
00058     case RECT_PLUGIN:
00059       return true;
00060     case RECT_CONFIG:
00061     case RECT_NODE:
00062     case RECT_CLUSTER:
00063     case RECT_LOCAL:
00064       return false;
00065     default:
00066       ink_assert(!"Unexpected RecT type");
00067       return false;
00068     }
00069   } else if (g_mode_type == RECM_STAND_ALONE) {
00070     switch (rec_type) {
00071     case RECT_CONFIG:
00072     case RECT_PROCESS:
00073     case RECT_NODE:
00074     case RECT_CLUSTER:
00075     case RECT_LOCAL:
00076     case RECT_PLUGIN:
00077       return true;
00078     default:
00079       ink_assert(!"Unexpected RecT type");
00080       return false;
00081     }
00082   }
00083 
00084   return false;
00085 }
00086 
00087 
00088 
00089 
00090 void
00091 RecProcess_set_raw_stat_sync_interval_ms(int ms) {
00092   Debug("statsproc", "g_rec_raw_stat_sync_interval_ms -> %d", ms);
00093   g_rec_raw_stat_sync_interval_ms = ms;
00094   if (raw_stat_sync_cont_event) {
00095     Debug("statsproc", "Rescheduling raw-stat syncer");
00096     raw_stat_sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms));
00097   }
00098 }
00099 void
00100 RecProcess_set_config_update_interval_ms(int ms) {
00101   Debug("statsproc", "g_rec_config_update_interval_ms -> %d", ms);
00102   g_rec_config_update_interval_ms = ms;
00103   if (config_update_cont_event) {
00104     Debug("statsproc", "Rescheduling config syncer");
00105     config_update_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_config_update_interval_ms));
00106   }
00107 }
00108 void
00109 RecProcess_set_remote_sync_interval_ms(int ms) {
00110   Debug("statsproc", "g_rec_remote_sync_interval_ms -> %d", ms);
00111   g_rec_remote_sync_interval_ms = ms;
00112   if (sync_cont_event) {
00113     Debug("statsproc", "Rescheduling remote syncer");
00114     sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_remote_sync_interval_ms));
00115   }
00116 }
00117 
00118 
00119 
00120 
00121 static int
00122 raw_stat_get_total(RecRawStatBlock *rsb, int id, RecRawStat *total)
00123 {
00124   int i;
00125   RecRawStat *tlp;
00126 
00127   total->sum = 0;
00128   total->count = 0;
00129 
00130   
00131   total->sum = rsb->global[id]->sum;
00132   total->count = rsb->global[id]->count;
00133 
00134   
00135   for (i = 0; i < eventProcessor.n_ethreads; i++) {
00136     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00137     total->sum += tlp->sum;
00138     total->count += tlp->count;
00139   }
00140 
00141   for (i = 0; i < eventProcessor.n_dthreads; i++) {
00142     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00143     total->sum += tlp->sum;
00144     total->count += tlp->count;
00145   }
00146 
00147   if (total->sum < 0) { 
00148     total->sum = 0;
00149   }
00150 
00151   return REC_ERR_OKAY;
00152 }
00153 
00154 
00155 
00156 
00157 
00158 static int
00159 raw_stat_sync_to_global(RecRawStatBlock *rsb, int id)
00160 {
00161   int i;
00162   RecRawStat *tlp;
00163   RecRawStat total;
00164 
00165   total.sum = 0;
00166   total.count = 0;
00167 
00168   
00169   for (i = 0; i < eventProcessor.n_ethreads; i++) {
00170     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00171     total.sum += tlp->sum;
00172     total.count += tlp->count;
00173   }
00174 
00175   for (i = 0; i < eventProcessor.n_dthreads; i++) {
00176     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00177     total.sum += tlp->sum;
00178     total.count += tlp->count;
00179   }
00180 
00181   if (total.sum < 0) { 
00182     total.sum = 0;
00183   }
00184 
00185   
00186   ink_mutex_acquire(&(rsb->mutex));
00187 
00188   
00189   RecRawStat delta;
00190   delta.sum = total.sum - rsb->global[id]->last_sum;
00191   delta.count = total.count - rsb->global[id]->last_count;
00192 
00193   
00194   
00195   
00196 
00197   
00198   ink_atomic_increment(&(rsb->global[id]->sum), delta.sum);
00199   ink_atomic_increment(&(rsb->global[id]->count), delta.count);
00200 
00201   
00202   ink_atomic_swap(&(rsb->global[id]->last_sum), total.sum);
00203   ink_atomic_swap(&(rsb->global[id]->last_count), total.count);
00204 
00205   ink_mutex_release(&(rsb->mutex));
00206 
00207   return REC_ERR_OKAY;
00208 }
00209 
00210 
00211 
00212 
00213 
00214 static int
00215 raw_stat_clear(RecRawStatBlock *rsb, int id)
00216 {
00217   Debug("stats", "raw_stat_clear(): rsb pointer:%p id:%d\n", rsb, id);
00218 
00219   
00220   
00221   ink_mutex_acquire(&(rsb->mutex));
00222   ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
00223   ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
00224   ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
00225   ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
00226   ink_mutex_release(&(rsb->mutex));
00227 
00228   
00229   RecRawStat *tlp;
00230   for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00231     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00232     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00233     ink_atomic_swap(&(tlp->count), (int64_t)0);
00234   }
00235 
00236   for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00237     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00238     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00239     ink_atomic_swap(&(tlp->count), (int64_t)0);
00240   }
00241 
00242   return REC_ERR_OKAY;
00243 }
00244 
00245 
00246 
00247 
00248 
00249 static int
00250 raw_stat_clear_sum(RecRawStatBlock *rsb, int id)
00251 {
00252   Debug("stats", "raw_stat_clear_sum(): rsb pointer:%p id:%d\n", rsb, id);
00253 
00254   
00255   
00256   ink_mutex_acquire(&(rsb->mutex));
00257   ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
00258   ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
00259   ink_mutex_release(&(rsb->mutex));
00260 
00261   
00262   RecRawStat *tlp;
00263   for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00264     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00265     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00266   }
00267 
00268   for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00269     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00270     ink_atomic_swap(&(tlp->sum), (int64_t)0);
00271   }
00272 
00273   return REC_ERR_OKAY;
00274 }
00275 
00276 
00277 
00278 
00279 
00280 static int
00281 raw_stat_clear_count(RecRawStatBlock *rsb, int id)
00282 {
00283   Debug("stats", "raw_stat_clear_count(): rsb pointer:%p id:%d\n", rsb, id);
00284 
00285   
00286   
00287   ink_mutex_acquire(&(rsb->mutex));
00288   ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
00289   ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
00290   ink_mutex_release(&(rsb->mutex));
00291 
00292   
00293   RecRawStat *tlp;
00294   for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00295     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00296     ink_atomic_swap(&(tlp->count), (int64_t)0);
00297   }
00298 
00299   for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00300     tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00301     ink_atomic_swap(&(tlp->count), (int64_t)0);
00302   }
00303 
00304   return REC_ERR_OKAY;
00305 }
00306 
00307 
00308 
00309 
00310 
00311 static int
00312 recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie)
00313 {
00314   int err;
00315 
00316   if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
00317     if (msg_type == RECG_PULL_ACK) {
00318       g_force_req_notify.lock();
00319       g_force_req_notify.signal();
00320       g_force_req_notify.unlock();
00321     }
00322   }
00323   return err;
00324 }
00325 
00326 
00327 
00328 
00329 
00330 struct raw_stat_sync_cont: public Continuation
00331 {
00332   raw_stat_sync_cont(ProxyMutex *m)
00333     : Continuation(m)
00334   {
00335     SET_HANDLER(&raw_stat_sync_cont::exec_callbacks);
00336   }
00337 
00338   int exec_callbacks(int , Event * )
00339   {
00340     RecExecRawStatSyncCbs();
00341     Debug("statsproc", "raw_stat_sync_cont() processed");
00342 
00343     return EVENT_CONT;
00344   }
00345 };
00346 
00347 
00348 
00349 
00350 
00351 struct config_update_cont: public Continuation
00352 {
00353   config_update_cont(ProxyMutex *m)
00354     : Continuation(m)
00355   {
00356     SET_HANDLER(&config_update_cont::exec_callbacks);
00357   }
00358 
00359   int exec_callbacks(int , Event * )
00360   {
00361     RecExecConfigUpdateCbs(REC_PROCESS_UPDATE_REQUIRED);
00362     Debug("statsproc", "config_update_cont() processed");
00363 
00364     return EVENT_CONT;
00365   }
00366 };
00367 
00368 
00369 
00370 
00371 
00372 struct sync_cont: public Continuation
00373 {
00374   textBuffer *m_tb;
00375 
00376   sync_cont(ProxyMutex *m)
00377     : Continuation(m)
00378   {
00379     SET_HANDLER(&sync_cont::sync);
00380     m_tb = new textBuffer(65536);
00381   }
00382 
00383    ~sync_cont()
00384   {
00385     if (m_tb != NULL) {
00386       delete m_tb;
00387       m_tb = NULL;
00388     }
00389   }
00390 
00391   int sync(int , Event * )
00392   {
00393     send_push_message();
00394     RecSyncStatsFile();
00395     if (RecSyncConfigToTB(m_tb) == REC_ERR_OKAY) {
00396         RecWriteConfigFile(m_tb);
00397     }
00398     Debug("statsproc", "sync_cont() processed");
00399 
00400     return EVENT_CONT;
00401   }
00402 };
00403 
00404 
00405 
00406 
00407 
00408 int
00409 RecProcessInit(RecModeT mode_type, Diags *_diags)
00410 {
00411   static bool initialized_p = false;
00412 
00413   if (initialized_p) {
00414     return REC_ERR_OKAY;
00415   }
00416 
00417   g_mode_type = mode_type;
00418 
00419   if (RecCoreInit(mode_type, _diags) == REC_ERR_FAIL) {
00420     return REC_ERR_FAIL;
00421   }
00422 
00423   
00424 
00425 
00426 
00427 
00428 
00429 
00430 
00431 
00432 
00433 
00434 
00435 
00436 
00437 
00438 
00439 
00440 
00441 
00442   initialized_p = true;
00443 
00444   return REC_ERR_OKAY;
00445 }
00446 
00447 
00448 void
00449 RecMessageInit()
00450 {
00451   ink_assert(g_mode_type != RECM_NULL);
00452   pmgmt->registerMgmtCallback(MGMT_EVENT_LIBRECORDS, RecMessageRecvThis, NULL);
00453   message_initialized_p = true;
00454 }
00455 
00456 
00457 
00458 
00459 int
00460 RecProcessInitMessage(RecModeT mode_type)
00461 {
00462   static bool initialized_p = false;
00463 
00464   if (initialized_p) {
00465     return REC_ERR_OKAY;
00466   }
00467 
00468   RecMessageInit();
00469   if (RecMessageRegisterRecvCb(recv_message_cb__process, NULL)) {
00470     return REC_ERR_FAIL;
00471   }
00472 
00473   if (mode_type == RECM_CLIENT) {
00474     send_pull_message(RECG_PULL_REQ);
00475     g_force_req_notify.lock();
00476     g_force_req_notify.wait();
00477     g_force_req_notify.unlock();
00478   }
00479 
00480   initialized_p = true;
00481 
00482   return REC_ERR_OKAY;
00483 }
00484 
00485 
00486 
00487 
00488 
00489 int
00490 RecProcessStart(void)
00491 {
00492   if (g_started) {
00493     return REC_ERR_OKAY;
00494   }
00495 
00496   Debug("statsproc", "Starting sync continuations:");
00497   raw_stat_sync_cont *rssc = new raw_stat_sync_cont(new_ProxyMutex());
00498   Debug("statsproc", "\traw-stat syncer");
00499   raw_stat_sync_cont_event = eventProcessor.schedule_every(rssc, HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms), ET_TASK);
00500 
00501   config_update_cont *cuc = new config_update_cont(new_ProxyMutex());
00502   Debug("statsproc", "\tconfig syncer");
00503   config_update_cont_event = eventProcessor.schedule_every(cuc, HRTIME_MSECONDS(g_rec_config_update_interval_ms), ET_TASK);
00504 
00505   sync_cont *sc = new sync_cont(new_ProxyMutex());
00506   Debug("statsproc", "\tremote syncer");
00507   sync_cont_event = eventProcessor.schedule_every(sc, HRTIME_MSECONDS(g_rec_remote_sync_interval_ms), ET_TASK);
00508 
00509   g_started = true;
00510 
00511   return REC_ERR_OKAY;
00512 }
00513 
00514 
00515 
00516 
00517 
00518 RecRawStatBlock *
00519 RecAllocateRawStatBlock(int num_stats)
00520 {
00521   off_t ethr_stat_offset;
00522   RecRawStatBlock *rsb;
00523 
00524   
00525   if ((ethr_stat_offset = eventProcessor.allocate(num_stats * sizeof(RecRawStat))) == -1) {
00526     return NULL;
00527   }
00528   
00529   rsb = (RecRawStatBlock *)ats_malloc(sizeof(RecRawStatBlock));
00530   memset(rsb, 0, sizeof(RecRawStatBlock));
00531   rsb->ethr_stat_offset = ethr_stat_offset;
00532   rsb->global = (RecRawStat **)ats_malloc(num_stats * sizeof(RecRawStat *));
00533   memset(rsb->global, 0, num_stats * sizeof(RecRawStat *));
00534   rsb->num_stats = 0;
00535   rsb->max_stats = num_stats;
00536   ink_mutex_init(&(rsb->mutex),"net stat mutex");
00537   return rsb;
00538 }
00539 
00540 
00541 
00542 
00543 
00544 int
00545 _RecRegisterRawStat(RecRawStatBlock *rsb, RecT rec_type, const char *name, RecDataT data_type, RecPersistT persist_type, int id,
00546                    RecRawStatSyncCb sync_cb)
00547 {
00548   Debug("stats", "RecRawStatSyncCb(%s): rsb pointer:%p id:%d\n", name, rsb, id);
00549 
00550   
00551   ink_assert(id < rsb->max_stats);
00552 
00553   int err = REC_ERR_OKAY;
00554 
00555   RecRecord *r;
00556   RecData data_default;
00557   memset(&data_default, 0, sizeof(RecData));
00558 
00559   
00560   if ((r = RecRegisterStat(rec_type, name, data_type, data_default, persist_type)) == NULL) {
00561     err = REC_ERR_FAIL;
00562     goto Ldone;
00563   }
00564   r->rsb_id = id; 
00565   if (i_am_the_record_owner(r->rec_type)) {
00566     r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;
00567   } else {
00568     send_register_message(r);
00569   }
00570 
00571   
00572   rsb->global[id] = &(r->stat_meta.data_raw);
00573   rsb->global[id]->last_sum = 0;
00574   rsb->global[id]->last_count = 0;
00575 
00576   
00577   RecRegisterRawStatSyncCb(name, sync_cb, rsb, id);
00578 
00579 Ldone:
00580   return err;
00581 }
00582 
00583 
00584 
00585 
00586 
00587 
00588 
00589 
00590 int
00591 RecRawStatSyncSum(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00592 {
00593   RecRawStat total;
00594 
00595   Debug("stats", "raw sync:sum for %s", name);
00596   raw_stat_sync_to_global(rsb, id);
00597   total.sum = rsb->global[id]->sum;
00598   total.count = rsb->global[id]->count;
00599   RecDataSetFromInk64(data_type, data, total.sum);
00600 
00601   return REC_ERR_OKAY;
00602 }
00603 
00604 int
00605 RecRawStatSyncCount(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00606 {
00607   RecRawStat total;
00608 
00609   Debug("stats", "raw sync:count for %s", name);
00610   raw_stat_sync_to_global(rsb, id);
00611   total.sum = rsb->global[id]->sum;
00612   total.count = rsb->global[id]->count;
00613   RecDataSetFromInk64(data_type, data, total.count);
00614 
00615   return REC_ERR_OKAY;
00616 }
00617 
00618 int
00619 RecRawStatSyncAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00620 {
00621   RecRawStat total;
00622   RecFloat avg = 0.0f;
00623 
00624   Debug("stats", "raw sync:avg for %s", name);
00625   raw_stat_sync_to_global(rsb, id);
00626   total.sum = rsb->global[id]->sum;
00627   total.count = rsb->global[id]->count;
00628   if (total.count != 0)
00629     avg = (float) ((double) total.sum / (double) total.count);
00630   RecDataSetFromFloat(data_type, data, avg);
00631   return REC_ERR_OKAY;
00632 }
00633 
00634 int
00635 RecRawStatSyncHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00636 {
00637   RecRawStat total;
00638   RecFloat r;
00639 
00640   Debug("stats", "raw sync:hr-timeavg for %s", name);
00641   raw_stat_sync_to_global(rsb, id);
00642   total.sum = rsb->global[id]->sum;
00643   total.count = rsb->global[id]->count;
00644   if (total.count == 0) {
00645     r = 0.0f;
00646   } else {
00647     r = (float) ((double) total.sum / (double) total.count);
00648     r = r / (float) (HRTIME_SECOND);
00649   }
00650   RecDataSetFromFloat(data_type, data, r);
00651   return REC_ERR_OKAY;
00652 }
00653 
00654 int
00655 RecRawStatSyncIntMsecsToFloatSeconds(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00656 {
00657   RecRawStat total;
00658   RecFloat r;
00659 
00660   Debug("stats", "raw sync:seconds for %s", name);
00661   raw_stat_sync_to_global(rsb, id);
00662   total.sum = rsb->global[id]->sum;
00663   total.count = rsb->global[id]->count;
00664   if (total.count == 0) {
00665     r = 0.0f;
00666   } else {
00667     r = (float) ((double) total.sum / 1000);
00668   }
00669   RecDataSetFromFloat(data_type, data, r);
00670   return REC_ERR_OKAY;
00671 }
00672 
00673 int
00674 RecRawStatSyncMHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00675 {
00676   RecRawStat total;
00677   RecFloat r;
00678 
00679   Debug("stats", "raw sync:mhr-timeavg for %s", name);
00680   raw_stat_sync_to_global(rsb, id);
00681   total.sum = rsb->global[id]->sum;
00682   total.count = rsb->global[id]->count;
00683   if (total.count == 0) {
00684     r = 0.0f;
00685   } else {
00686     r = (float) ((double) total.sum / (double) total.count);
00687     r = r / (float) (HRTIME_MSECOND);
00688   }
00689   RecDataSetFromFloat(data_type, data, r);
00690   return REC_ERR_OKAY;
00691 }
00692 
00693 
00694 
00695 
00696 
00697 int
00698 RecIncrRawStatBlock(RecRawStatBlock *, EThread *,
00699                     RecRawStat *)
00700 {
00701   return REC_ERR_FAIL;
00702 }
00703 
00704 
00705 
00706 
00707 
00708 int
00709 RecSetRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
00710 {
00711   raw_stat_clear_sum(rsb, id);
00712   ink_atomic_swap(&(rsb->global[id]->sum), data);
00713   return REC_ERR_OKAY;
00714 }
00715 
00716 int
00717 RecSetRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
00718 {
00719   raw_stat_clear_count(rsb, id);
00720   ink_atomic_swap(&(rsb->global[id]->count), data);
00721   return REC_ERR_OKAY;
00722 }
00723 
00724 int
00725 RecSetRawStatBlock(RecRawStatBlock *, RecRawStat *)
00726 {
00727   return REC_ERR_FAIL;
00728 }
00729 
00730 
00731 
00732 
00733 
00734 
00735 int
00736 RecGetRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
00737 {
00738   RecRawStat total;
00739 
00740   raw_stat_get_total(rsb, id, &total);
00741   *data = total.sum;
00742   return REC_ERR_OKAY;
00743 }
00744 
00745 int
00746 RecGetRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
00747 {
00748   RecRawStat total;
00749 
00750   raw_stat_get_total(rsb, id, &total);
00751   *data = total.count;
00752   return REC_ERR_OKAY;
00753 }
00754 
00755 
00756 
00757 
00758 
00759 int
00760 RecIncrGlobalRawStat(RecRawStatBlock *rsb, int id, int64_t incr)
00761 {
00762   ink_atomic_increment(&(rsb->global[id]->sum), incr);
00763   ink_atomic_increment(&(rsb->global[id]->count), 1);
00764   return REC_ERR_OKAY;
00765 }
00766 
00767 int
00768 RecIncrGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t incr)
00769 {
00770   ink_atomic_increment(&(rsb->global[id]->sum), incr);
00771   return REC_ERR_OKAY;
00772 }
00773 
00774 int
00775 RecIncrGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t incr)
00776 {
00777   ink_atomic_increment(&(rsb->global[id]->count), incr);
00778   return REC_ERR_OKAY;
00779 }
00780 
00781 
00782 
00783 
00784 
00785 int
00786 RecSetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
00787 {
00788   ink_atomic_swap(&(rsb->global[id]->sum), data);
00789   return REC_ERR_OKAY;
00790 }
00791 
00792 int
00793 RecSetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
00794 {
00795   ink_atomic_swap(&(rsb->global[id]->count), data);
00796   return REC_ERR_OKAY;
00797 }
00798 
00799 
00800 
00801 
00802 
00803 int
00804 RecGetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
00805 {
00806   *data = rsb->global[id]->sum;
00807   return REC_ERR_OKAY;
00808 }
00809 
00810 int
00811 RecGetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
00812 {
00813   *data = rsb->global[id]->count;
00814   return REC_ERR_OKAY;
00815 }
00816 
00817 
00818 
00819 
00820 
00821 RecRawStat *
00822 RecGetGlobalRawStatPtr(RecRawStatBlock *rsb, int id)
00823 {
00824   return rsb->global[id];
00825 }
00826 
00827 int64_t *
00828 RecGetGlobalRawStatSumPtr(RecRawStatBlock *rsb, int id)
00829 {
00830   return &(rsb->global[id]->sum);
00831 }
00832 
00833 int64_t *
00834 RecGetGlobalRawStatCountPtr(RecRawStatBlock *rsb, int id)
00835 {
00836   return &(rsb->global[id]->count);
00837 }
00838 
00839 
00840 
00841 
00842 
00843 int
00844 RecRegisterRawStatSyncCb(const char *name, RecRawStatSyncCb sync_cb, RecRawStatBlock *rsb, int id)
00845 {
00846   int err = REC_ERR_FAIL;
00847   RecRecord *r;
00848 
00849   ink_rwlock_rdlock(&g_records_rwlock);
00850   if (ink_hash_table_lookup(g_records_ht, name, (void **) &r)) {
00851     rec_mutex_acquire(&(r->lock));
00852     if (REC_TYPE_IS_STAT(r->rec_type)) {
00853       if (!(r->stat_meta.sync_cb)) {
00854         r->stat_meta.sync_rsb = rsb;
00855         r->stat_meta.sync_id = id;
00856         r->stat_meta.sync_cb = sync_cb;
00857         r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
00858         err = REC_ERR_OKAY;
00859       } else {
00860         ink_release_assert(false); 
00861       }
00862     }
00863     rec_mutex_release(&(r->lock));
00864   }
00865   ink_rwlock_unlock(&g_records_rwlock);
00866 
00867   return err;
00868 }
00869 
00870 
00871 
00872 
00873 
00874 int
00875 RecExecRawStatSyncCbs()
00876 {
00877   RecRecord *r;
00878   int i, num_records;
00879 
00880   num_records = g_num_records;
00881   for (i = 0; i < num_records; i++) {
00882     r = &(g_records[i]);
00883     rec_mutex_acquire(&(r->lock));
00884     if (REC_TYPE_IS_STAT(r->rec_type)) {
00885       if (r->stat_meta.sync_cb) {
00886         if (r->version && r->version != r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version) {
00887           raw_stat_clear(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
00888           r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
00889         } else {
00890           (*(r->stat_meta.sync_cb)) (r->name, r->data_type, &(r->data), r->stat_meta.sync_rsb, r->stat_meta.sync_id);
00891         }
00892         r->sync_required = REC_SYNC_REQUIRED;
00893       }
00894     }
00895     rec_mutex_release(&(r->lock));
00896   }
00897 
00898   return REC_ERR_OKAY;
00899 }
00900 
00901 void
00902 RecSignalManager(int id, const char * msg, size_t msgsize)
00903 {
00904   ink_assert(pmgmt);
00905   pmgmt->signalManager(id, msg, msgsize);
00906 }
00907 
00908 int
00909 RecRegisterManagerCb(int _signal, RecManagerCb _fn, void *_data)
00910 {
00911   return pmgmt->registerMgmtCallback(_signal, _fn, _data);
00912 }
00913 
00914 
00915 
00916 
00917 
00918 int
00919 RecMessageSend(RecMessage * msg)
00920 {
00921   int msg_size;
00922 
00923   if (!message_initialized_p)
00924     return REC_ERR_OKAY;
00925 
00926   
00927   if (g_mode_type == RECM_CLIENT || g_mode_type == RECM_SERVER) {
00928     msg->o_end = msg->o_write;
00929     msg_size = sizeof(RecMessageHdr) + (msg->o_write - msg->o_start);
00930     pmgmt->signalManager(MGMT_SIGNAL_LIBRECORDS, (char *) msg, msg_size);
00931   }
00932 
00933   return REC_ERR_OKAY;
00934 }
00935