00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "libts.h"
00025
00026 #include "I_Tasks.h"
00027
00028 #include "P_EventSystem.h"
00029 #include "P_RecCore.h"
00030 #include "P_RecProcess.h"
00031 #include "P_RecMessage.h"
00032 #include "P_RecUtils.h"
00033 #include "P_RecFile.h"
00034
00035 #include "mgmtapi.h"
00036 #include "ProcessManager.h"
00037
00038
00039 static bool message_initialized_p = false;
00040 static bool g_started = false;
00041 static EventNotify g_force_req_notify;
00042 static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS;
00043 static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS;
00044 static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
00045 static Event *raw_stat_sync_cont_event;
00046 static Event *config_update_cont_event;
00047 static Event *sync_cont_event;
00048
00049
00050
00051
00052 bool
00053 i_am_the_record_owner(RecT rec_type)
00054 {
00055 if (g_mode_type == RECM_CLIENT) {
00056 switch (rec_type) {
00057 case RECT_PROCESS:
00058 case RECT_PLUGIN:
00059 return true;
00060 case RECT_CONFIG:
00061 case RECT_NODE:
00062 case RECT_CLUSTER:
00063 case RECT_LOCAL:
00064 return false;
00065 default:
00066 ink_assert(!"Unexpected RecT type");
00067 return false;
00068 }
00069 } else if (g_mode_type == RECM_STAND_ALONE) {
00070 switch (rec_type) {
00071 case RECT_CONFIG:
00072 case RECT_PROCESS:
00073 case RECT_NODE:
00074 case RECT_CLUSTER:
00075 case RECT_LOCAL:
00076 case RECT_PLUGIN:
00077 return true;
00078 default:
00079 ink_assert(!"Unexpected RecT type");
00080 return false;
00081 }
00082 }
00083
00084 return false;
00085 }
00086
00087
00088
00089
00090 void
00091 RecProcess_set_raw_stat_sync_interval_ms(int ms) {
00092 Debug("statsproc", "g_rec_raw_stat_sync_interval_ms -> %d", ms);
00093 g_rec_raw_stat_sync_interval_ms = ms;
00094 if (raw_stat_sync_cont_event) {
00095 Debug("statsproc", "Rescheduling raw-stat syncer");
00096 raw_stat_sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms));
00097 }
00098 }
00099 void
00100 RecProcess_set_config_update_interval_ms(int ms) {
00101 Debug("statsproc", "g_rec_config_update_interval_ms -> %d", ms);
00102 g_rec_config_update_interval_ms = ms;
00103 if (config_update_cont_event) {
00104 Debug("statsproc", "Rescheduling config syncer");
00105 config_update_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_config_update_interval_ms));
00106 }
00107 }
00108 void
00109 RecProcess_set_remote_sync_interval_ms(int ms) {
00110 Debug("statsproc", "g_rec_remote_sync_interval_ms -> %d", ms);
00111 g_rec_remote_sync_interval_ms = ms;
00112 if (sync_cont_event) {
00113 Debug("statsproc", "Rescheduling remote syncer");
00114 sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_remote_sync_interval_ms));
00115 }
00116 }
00117
00118
00119
00120
00121 static int
00122 raw_stat_get_total(RecRawStatBlock *rsb, int id, RecRawStat *total)
00123 {
00124 int i;
00125 RecRawStat *tlp;
00126
00127 total->sum = 0;
00128 total->count = 0;
00129
00130
00131 total->sum = rsb->global[id]->sum;
00132 total->count = rsb->global[id]->count;
00133
00134
00135 for (i = 0; i < eventProcessor.n_ethreads; i++) {
00136 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00137 total->sum += tlp->sum;
00138 total->count += tlp->count;
00139 }
00140
00141 for (i = 0; i < eventProcessor.n_dthreads; i++) {
00142 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00143 total->sum += tlp->sum;
00144 total->count += tlp->count;
00145 }
00146
00147 if (total->sum < 0) {
00148 total->sum = 0;
00149 }
00150
00151 return REC_ERR_OKAY;
00152 }
00153
00154
00155
00156
00157
00158 static int
00159 raw_stat_sync_to_global(RecRawStatBlock *rsb, int id)
00160 {
00161 int i;
00162 RecRawStat *tlp;
00163 RecRawStat total;
00164
00165 total.sum = 0;
00166 total.count = 0;
00167
00168
00169 for (i = 0; i < eventProcessor.n_ethreads; i++) {
00170 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00171 total.sum += tlp->sum;
00172 total.count += tlp->count;
00173 }
00174
00175 for (i = 0; i < eventProcessor.n_dthreads; i++) {
00176 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00177 total.sum += tlp->sum;
00178 total.count += tlp->count;
00179 }
00180
00181 if (total.sum < 0) {
00182 total.sum = 0;
00183 }
00184
00185
00186 ink_mutex_acquire(&(rsb->mutex));
00187
00188
00189 RecRawStat delta;
00190 delta.sum = total.sum - rsb->global[id]->last_sum;
00191 delta.count = total.count - rsb->global[id]->last_count;
00192
00193
00194
00195
00196
00197
00198 ink_atomic_increment(&(rsb->global[id]->sum), delta.sum);
00199 ink_atomic_increment(&(rsb->global[id]->count), delta.count);
00200
00201
00202 ink_atomic_swap(&(rsb->global[id]->last_sum), total.sum);
00203 ink_atomic_swap(&(rsb->global[id]->last_count), total.count);
00204
00205 ink_mutex_release(&(rsb->mutex));
00206
00207 return REC_ERR_OKAY;
00208 }
00209
00210
00211
00212
00213
00214 static int
00215 raw_stat_clear(RecRawStatBlock *rsb, int id)
00216 {
00217 Debug("stats", "raw_stat_clear(): rsb pointer:%p id:%d\n", rsb, id);
00218
00219
00220
00221 ink_mutex_acquire(&(rsb->mutex));
00222 ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
00223 ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
00224 ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
00225 ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
00226 ink_mutex_release(&(rsb->mutex));
00227
00228
00229 RecRawStat *tlp;
00230 for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00231 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00232 ink_atomic_swap(&(tlp->sum), (int64_t)0);
00233 ink_atomic_swap(&(tlp->count), (int64_t)0);
00234 }
00235
00236 for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00237 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00238 ink_atomic_swap(&(tlp->sum), (int64_t)0);
00239 ink_atomic_swap(&(tlp->count), (int64_t)0);
00240 }
00241
00242 return REC_ERR_OKAY;
00243 }
00244
00245
00246
00247
00248
00249 static int
00250 raw_stat_clear_sum(RecRawStatBlock *rsb, int id)
00251 {
00252 Debug("stats", "raw_stat_clear_sum(): rsb pointer:%p id:%d\n", rsb, id);
00253
00254
00255
00256 ink_mutex_acquire(&(rsb->mutex));
00257 ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
00258 ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
00259 ink_mutex_release(&(rsb->mutex));
00260
00261
00262 RecRawStat *tlp;
00263 for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00264 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00265 ink_atomic_swap(&(tlp->sum), (int64_t)0);
00266 }
00267
00268 for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00269 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00270 ink_atomic_swap(&(tlp->sum), (int64_t)0);
00271 }
00272
00273 return REC_ERR_OKAY;
00274 }
00275
00276
00277
00278
00279
00280 static int
00281 raw_stat_clear_count(RecRawStatBlock *rsb, int id)
00282 {
00283 Debug("stats", "raw_stat_clear_count(): rsb pointer:%p id:%d\n", rsb, id);
00284
00285
00286
00287 ink_mutex_acquire(&(rsb->mutex));
00288 ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
00289 ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
00290 ink_mutex_release(&(rsb->mutex));
00291
00292
00293 RecRawStat *tlp;
00294 for (int i = 0; i < eventProcessor.n_ethreads; i++) {
00295 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
00296 ink_atomic_swap(&(tlp->count), (int64_t)0);
00297 }
00298
00299 for (int i = 0; i < eventProcessor.n_dthreads; i++) {
00300 tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
00301 ink_atomic_swap(&(tlp->count), (int64_t)0);
00302 }
00303
00304 return REC_ERR_OKAY;
00305 }
00306
00307
00308
00309
00310
00311 static int
00312 recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie)
00313 {
00314 int err;
00315
00316 if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
00317 if (msg_type == RECG_PULL_ACK) {
00318 g_force_req_notify.lock();
00319 g_force_req_notify.signal();
00320 g_force_req_notify.unlock();
00321 }
00322 }
00323 return err;
00324 }
00325
00326
00327
00328
00329
00330 struct raw_stat_sync_cont: public Continuation
00331 {
00332 raw_stat_sync_cont(ProxyMutex *m)
00333 : Continuation(m)
00334 {
00335 SET_HANDLER(&raw_stat_sync_cont::exec_callbacks);
00336 }
00337
00338 int exec_callbacks(int , Event * )
00339 {
00340 RecExecRawStatSyncCbs();
00341 Debug("statsproc", "raw_stat_sync_cont() processed");
00342
00343 return EVENT_CONT;
00344 }
00345 };
00346
00347
00348
00349
00350
00351 struct config_update_cont: public Continuation
00352 {
00353 config_update_cont(ProxyMutex *m)
00354 : Continuation(m)
00355 {
00356 SET_HANDLER(&config_update_cont::exec_callbacks);
00357 }
00358
00359 int exec_callbacks(int , Event * )
00360 {
00361 RecExecConfigUpdateCbs(REC_PROCESS_UPDATE_REQUIRED);
00362 Debug("statsproc", "config_update_cont() processed");
00363
00364 return EVENT_CONT;
00365 }
00366 };
00367
00368
00369
00370
00371
00372 struct sync_cont: public Continuation
00373 {
00374 textBuffer *m_tb;
00375
00376 sync_cont(ProxyMutex *m)
00377 : Continuation(m)
00378 {
00379 SET_HANDLER(&sync_cont::sync);
00380 m_tb = new textBuffer(65536);
00381 }
00382
00383 ~sync_cont()
00384 {
00385 if (m_tb != NULL) {
00386 delete m_tb;
00387 m_tb = NULL;
00388 }
00389 }
00390
00391 int sync(int , Event * )
00392 {
00393 send_push_message();
00394 RecSyncStatsFile();
00395 if (RecSyncConfigToTB(m_tb) == REC_ERR_OKAY) {
00396 RecWriteConfigFile(m_tb);
00397 }
00398 Debug("statsproc", "sync_cont() processed");
00399
00400 return EVENT_CONT;
00401 }
00402 };
00403
00404
00405
00406
00407
00408 int
00409 RecProcessInit(RecModeT mode_type, Diags *_diags)
00410 {
00411 static bool initialized_p = false;
00412
00413 if (initialized_p) {
00414 return REC_ERR_OKAY;
00415 }
00416
00417 g_mode_type = mode_type;
00418
00419 if (RecCoreInit(mode_type, _diags) == REC_ERR_FAIL) {
00420 return REC_ERR_FAIL;
00421 }
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442 initialized_p = true;
00443
00444 return REC_ERR_OKAY;
00445 }
00446
00447
00448 void
00449 RecMessageInit()
00450 {
00451 ink_assert(g_mode_type != RECM_NULL);
00452 pmgmt->registerMgmtCallback(MGMT_EVENT_LIBRECORDS, RecMessageRecvThis, NULL);
00453 message_initialized_p = true;
00454 }
00455
00456
00457
00458
00459 int
00460 RecProcessInitMessage(RecModeT mode_type)
00461 {
00462 static bool initialized_p = false;
00463
00464 if (initialized_p) {
00465 return REC_ERR_OKAY;
00466 }
00467
00468 RecMessageInit();
00469 if (RecMessageRegisterRecvCb(recv_message_cb__process, NULL)) {
00470 return REC_ERR_FAIL;
00471 }
00472
00473 if (mode_type == RECM_CLIENT) {
00474 send_pull_message(RECG_PULL_REQ);
00475 g_force_req_notify.lock();
00476 g_force_req_notify.wait();
00477 g_force_req_notify.unlock();
00478 }
00479
00480 initialized_p = true;
00481
00482 return REC_ERR_OKAY;
00483 }
00484
00485
00486
00487
00488
00489 int
00490 RecProcessStart(void)
00491 {
00492 if (g_started) {
00493 return REC_ERR_OKAY;
00494 }
00495
00496 Debug("statsproc", "Starting sync continuations:");
00497 raw_stat_sync_cont *rssc = new raw_stat_sync_cont(new_ProxyMutex());
00498 Debug("statsproc", "\traw-stat syncer");
00499 raw_stat_sync_cont_event = eventProcessor.schedule_every(rssc, HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms), ET_TASK);
00500
00501 config_update_cont *cuc = new config_update_cont(new_ProxyMutex());
00502 Debug("statsproc", "\tconfig syncer");
00503 config_update_cont_event = eventProcessor.schedule_every(cuc, HRTIME_MSECONDS(g_rec_config_update_interval_ms), ET_TASK);
00504
00505 sync_cont *sc = new sync_cont(new_ProxyMutex());
00506 Debug("statsproc", "\tremote syncer");
00507 sync_cont_event = eventProcessor.schedule_every(sc, HRTIME_MSECONDS(g_rec_remote_sync_interval_ms), ET_TASK);
00508
00509 g_started = true;
00510
00511 return REC_ERR_OKAY;
00512 }
00513
00514
00515
00516
00517
00518 RecRawStatBlock *
00519 RecAllocateRawStatBlock(int num_stats)
00520 {
00521 off_t ethr_stat_offset;
00522 RecRawStatBlock *rsb;
00523
00524
00525 if ((ethr_stat_offset = eventProcessor.allocate(num_stats * sizeof(RecRawStat))) == -1) {
00526 return NULL;
00527 }
00528
00529 rsb = (RecRawStatBlock *)ats_malloc(sizeof(RecRawStatBlock));
00530 memset(rsb, 0, sizeof(RecRawStatBlock));
00531 rsb->ethr_stat_offset = ethr_stat_offset;
00532 rsb->global = (RecRawStat **)ats_malloc(num_stats * sizeof(RecRawStat *));
00533 memset(rsb->global, 0, num_stats * sizeof(RecRawStat *));
00534 rsb->num_stats = 0;
00535 rsb->max_stats = num_stats;
00536 ink_mutex_init(&(rsb->mutex),"net stat mutex");
00537 return rsb;
00538 }
00539
00540
00541
00542
00543
00544 int
00545 _RecRegisterRawStat(RecRawStatBlock *rsb, RecT rec_type, const char *name, RecDataT data_type, RecPersistT persist_type, int id,
00546 RecRawStatSyncCb sync_cb)
00547 {
00548 Debug("stats", "RecRawStatSyncCb(%s): rsb pointer:%p id:%d\n", name, rsb, id);
00549
00550
00551 ink_assert(id < rsb->max_stats);
00552
00553 int err = REC_ERR_OKAY;
00554
00555 RecRecord *r;
00556 RecData data_default;
00557 memset(&data_default, 0, sizeof(RecData));
00558
00559
00560 if ((r = RecRegisterStat(rec_type, name, data_type, data_default, persist_type)) == NULL) {
00561 err = REC_ERR_FAIL;
00562 goto Ldone;
00563 }
00564 r->rsb_id = id;
00565 if (i_am_the_record_owner(r->rec_type)) {
00566 r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;
00567 } else {
00568 send_register_message(r);
00569 }
00570
00571
00572 rsb->global[id] = &(r->stat_meta.data_raw);
00573 rsb->global[id]->last_sum = 0;
00574 rsb->global[id]->last_count = 0;
00575
00576
00577 RecRegisterRawStatSyncCb(name, sync_cb, rsb, id);
00578
00579 Ldone:
00580 return err;
00581 }
00582
00583
00584
00585
00586
00587
00588
00589
00590 int
00591 RecRawStatSyncSum(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00592 {
00593 RecRawStat total;
00594
00595 Debug("stats", "raw sync:sum for %s", name);
00596 raw_stat_sync_to_global(rsb, id);
00597 total.sum = rsb->global[id]->sum;
00598 total.count = rsb->global[id]->count;
00599 RecDataSetFromInk64(data_type, data, total.sum);
00600
00601 return REC_ERR_OKAY;
00602 }
00603
00604 int
00605 RecRawStatSyncCount(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00606 {
00607 RecRawStat total;
00608
00609 Debug("stats", "raw sync:count for %s", name);
00610 raw_stat_sync_to_global(rsb, id);
00611 total.sum = rsb->global[id]->sum;
00612 total.count = rsb->global[id]->count;
00613 RecDataSetFromInk64(data_type, data, total.count);
00614
00615 return REC_ERR_OKAY;
00616 }
00617
00618 int
00619 RecRawStatSyncAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00620 {
00621 RecRawStat total;
00622 RecFloat avg = 0.0f;
00623
00624 Debug("stats", "raw sync:avg for %s", name);
00625 raw_stat_sync_to_global(rsb, id);
00626 total.sum = rsb->global[id]->sum;
00627 total.count = rsb->global[id]->count;
00628 if (total.count != 0)
00629 avg = (float) ((double) total.sum / (double) total.count);
00630 RecDataSetFromFloat(data_type, data, avg);
00631 return REC_ERR_OKAY;
00632 }
00633
00634 int
00635 RecRawStatSyncHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00636 {
00637 RecRawStat total;
00638 RecFloat r;
00639
00640 Debug("stats", "raw sync:hr-timeavg for %s", name);
00641 raw_stat_sync_to_global(rsb, id);
00642 total.sum = rsb->global[id]->sum;
00643 total.count = rsb->global[id]->count;
00644 if (total.count == 0) {
00645 r = 0.0f;
00646 } else {
00647 r = (float) ((double) total.sum / (double) total.count);
00648 r = r / (float) (HRTIME_SECOND);
00649 }
00650 RecDataSetFromFloat(data_type, data, r);
00651 return REC_ERR_OKAY;
00652 }
00653
00654 int
00655 RecRawStatSyncIntMsecsToFloatSeconds(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00656 {
00657 RecRawStat total;
00658 RecFloat r;
00659
00660 Debug("stats", "raw sync:seconds for %s", name);
00661 raw_stat_sync_to_global(rsb, id);
00662 total.sum = rsb->global[id]->sum;
00663 total.count = rsb->global[id]->count;
00664 if (total.count == 0) {
00665 r = 0.0f;
00666 } else {
00667 r = (float) ((double) total.sum / 1000);
00668 }
00669 RecDataSetFromFloat(data_type, data, r);
00670 return REC_ERR_OKAY;
00671 }
00672
00673 int
00674 RecRawStatSyncMHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00675 {
00676 RecRawStat total;
00677 RecFloat r;
00678
00679 Debug("stats", "raw sync:mhr-timeavg for %s", name);
00680 raw_stat_sync_to_global(rsb, id);
00681 total.sum = rsb->global[id]->sum;
00682 total.count = rsb->global[id]->count;
00683 if (total.count == 0) {
00684 r = 0.0f;
00685 } else {
00686 r = (float) ((double) total.sum / (double) total.count);
00687 r = r / (float) (HRTIME_MSECOND);
00688 }
00689 RecDataSetFromFloat(data_type, data, r);
00690 return REC_ERR_OKAY;
00691 }
00692
00693
00694
00695
00696
00697 int
00698 RecIncrRawStatBlock(RecRawStatBlock *, EThread *,
00699 RecRawStat *)
00700 {
00701 return REC_ERR_FAIL;
00702 }
00703
00704
00705
00706
00707
00708 int
00709 RecSetRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
00710 {
00711 raw_stat_clear_sum(rsb, id);
00712 ink_atomic_swap(&(rsb->global[id]->sum), data);
00713 return REC_ERR_OKAY;
00714 }
00715
00716 int
00717 RecSetRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
00718 {
00719 raw_stat_clear_count(rsb, id);
00720 ink_atomic_swap(&(rsb->global[id]->count), data);
00721 return REC_ERR_OKAY;
00722 }
00723
00724 int
00725 RecSetRawStatBlock(RecRawStatBlock *, RecRawStat *)
00726 {
00727 return REC_ERR_FAIL;
00728 }
00729
00730
00731
00732
00733
00734
00735 int
00736 RecGetRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
00737 {
00738 RecRawStat total;
00739
00740 raw_stat_get_total(rsb, id, &total);
00741 *data = total.sum;
00742 return REC_ERR_OKAY;
00743 }
00744
00745 int
00746 RecGetRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
00747 {
00748 RecRawStat total;
00749
00750 raw_stat_get_total(rsb, id, &total);
00751 *data = total.count;
00752 return REC_ERR_OKAY;
00753 }
00754
00755
00756
00757
00758
00759 int
00760 RecIncrGlobalRawStat(RecRawStatBlock *rsb, int id, int64_t incr)
00761 {
00762 ink_atomic_increment(&(rsb->global[id]->sum), incr);
00763 ink_atomic_increment(&(rsb->global[id]->count), 1);
00764 return REC_ERR_OKAY;
00765 }
00766
00767 int
00768 RecIncrGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t incr)
00769 {
00770 ink_atomic_increment(&(rsb->global[id]->sum), incr);
00771 return REC_ERR_OKAY;
00772 }
00773
00774 int
00775 RecIncrGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t incr)
00776 {
00777 ink_atomic_increment(&(rsb->global[id]->count), incr);
00778 return REC_ERR_OKAY;
00779 }
00780
00781
00782
00783
00784
00785 int
00786 RecSetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
00787 {
00788 ink_atomic_swap(&(rsb->global[id]->sum), data);
00789 return REC_ERR_OKAY;
00790 }
00791
00792 int
00793 RecSetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
00794 {
00795 ink_atomic_swap(&(rsb->global[id]->count), data);
00796 return REC_ERR_OKAY;
00797 }
00798
00799
00800
00801
00802
00803 int
00804 RecGetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
00805 {
00806 *data = rsb->global[id]->sum;
00807 return REC_ERR_OKAY;
00808 }
00809
00810 int
00811 RecGetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
00812 {
00813 *data = rsb->global[id]->count;
00814 return REC_ERR_OKAY;
00815 }
00816
00817
00818
00819
00820
00821 RecRawStat *
00822 RecGetGlobalRawStatPtr(RecRawStatBlock *rsb, int id)
00823 {
00824 return rsb->global[id];
00825 }
00826
00827 int64_t *
00828 RecGetGlobalRawStatSumPtr(RecRawStatBlock *rsb, int id)
00829 {
00830 return &(rsb->global[id]->sum);
00831 }
00832
00833 int64_t *
00834 RecGetGlobalRawStatCountPtr(RecRawStatBlock *rsb, int id)
00835 {
00836 return &(rsb->global[id]->count);
00837 }
00838
00839
00840
00841
00842
00843 int
00844 RecRegisterRawStatSyncCb(const char *name, RecRawStatSyncCb sync_cb, RecRawStatBlock *rsb, int id)
00845 {
00846 int err = REC_ERR_FAIL;
00847 RecRecord *r;
00848
00849 ink_rwlock_rdlock(&g_records_rwlock);
00850 if (ink_hash_table_lookup(g_records_ht, name, (void **) &r)) {
00851 rec_mutex_acquire(&(r->lock));
00852 if (REC_TYPE_IS_STAT(r->rec_type)) {
00853 if (!(r->stat_meta.sync_cb)) {
00854 r->stat_meta.sync_rsb = rsb;
00855 r->stat_meta.sync_id = id;
00856 r->stat_meta.sync_cb = sync_cb;
00857 r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
00858 err = REC_ERR_OKAY;
00859 } else {
00860 ink_release_assert(false);
00861 }
00862 }
00863 rec_mutex_release(&(r->lock));
00864 }
00865 ink_rwlock_unlock(&g_records_rwlock);
00866
00867 return err;
00868 }
00869
00870
00871
00872
00873
00874 int
00875 RecExecRawStatSyncCbs()
00876 {
00877 RecRecord *r;
00878 int i, num_records;
00879
00880 num_records = g_num_records;
00881 for (i = 0; i < num_records; i++) {
00882 r = &(g_records[i]);
00883 rec_mutex_acquire(&(r->lock));
00884 if (REC_TYPE_IS_STAT(r->rec_type)) {
00885 if (r->stat_meta.sync_cb) {
00886 if (r->version && r->version != r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version) {
00887 raw_stat_clear(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
00888 r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
00889 } else {
00890 (*(r->stat_meta.sync_cb)) (r->name, r->data_type, &(r->data), r->stat_meta.sync_rsb, r->stat_meta.sync_id);
00891 }
00892 r->sync_required = REC_SYNC_REQUIRED;
00893 }
00894 }
00895 rec_mutex_release(&(r->lock));
00896 }
00897
00898 return REC_ERR_OKAY;
00899 }
00900
00901 void
00902 RecSignalManager(int id, const char * msg, size_t msgsize)
00903 {
00904 ink_assert(pmgmt);
00905 pmgmt->signalManager(id, msg, msgsize);
00906 }
00907
00908 int
00909 RecRegisterManagerCb(int _signal, RecManagerCb _fn, void *_data)
00910 {
00911 return pmgmt->registerMgmtCallback(_signal, _fn, _data);
00912 }
00913
00914
00915
00916
00917
00918 int
00919 RecMessageSend(RecMessage * msg)
00920 {
00921 int msg_size;
00922
00923 if (!message_initialized_p)
00924 return REC_ERR_OKAY;
00925
00926
00927 if (g_mode_type == RECM_CLIENT || g_mode_type == RECM_SERVER) {
00928 msg->o_end = msg->o_write;
00929 msg_size = sizeof(RecMessageHdr) + (msg->o_write - msg->o_start);
00930 pmgmt->signalManager(MGMT_SIGNAL_LIBRECORDS, (char *) msg, msg_size);
00931 }
00932
00933 return REC_ERR_OKAY;
00934 }
00935