• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_RecCore.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Private record core definitions
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 #include "TextBuffer.h"
00025 #include "Tokenizer.h"
00026 #include "ink_defs.h"
00027 #include "ink_string.h"
00028 
00029 #include "P_RecFile.h"
00030 #include "P_RecUtils.h"
00031 #include "P_RecMessage.h"
00032 #include "P_RecCore.h"
00033 
00034 RecModeT g_mode_type = RECM_NULL;  // starts as RECM_NULL; RecSyncStatsFile() and RecSyncConfigToTB() assert that it has been changed before they run
00035 
00036 
00037 //-------------------------------------------------------------------------
00038 // send_reset_message
00039 //-------------------------------------------------------------------------
00040 static int
00041 send_reset_message(RecRecord * record)
00042 {
00043   RecMessage *m;
00044 
00045   rec_mutex_acquire(&(record->lock));
00046   m = RecMessageAlloc(RECG_RESET);
00047   m = RecMessageMarshal_Realloc(m, record);
00048   RecDebug(DL_Note, "[send] RECG_RESET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
00049   RecMessageSend(m);
00050   RecMessageFree(m);
00051   rec_mutex_release(&(record->lock));
00052 
00053   return REC_ERR_OKAY;
00054 }
00055 
00056 
00057 //-------------------------------------------------------------------------
00058 // send_set_message
00059 //-------------------------------------------------------------------------
00060 static int
00061 send_set_message(RecRecord * record)
00062 {
00063   RecMessage *m;
00064 
00065   rec_mutex_acquire(&(record->lock));
00066   m = RecMessageAlloc(RECG_SET);
00067   m = RecMessageMarshal_Realloc(m, record);
00068   RecDebug(DL_Note, "[send] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
00069   RecMessageSend(m);
00070   RecMessageFree(m);
00071   rec_mutex_release(&(record->lock));
00072 
00073   return REC_ERR_OKAY;
00074 }
00075 
00076 
00077 //-------------------------------------------------------------------------
00078 // send_register_message
00079 //-------------------------------------------------------------------------
00080 int
00081 send_register_message(RecRecord * record)
00082 {
00083   RecMessage *m;
00084 
00085   rec_mutex_acquire(&(record->lock));
00086   m = RecMessageAlloc(RECG_REGISTER);
00087   m = RecMessageMarshal_Realloc(m, record);
00088   RecDebug(DL_Note, "[send] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
00089   RecMessageSend(m);
00090   RecMessageFree(m);
00091   rec_mutex_release(&(record->lock));
00092 
00093   return REC_ERR_OKAY;
00094 }
00095 
00096 
00097 //-------------------------------------------------------------------------
00098 // send_push_message
00099 //-------------------------------------------------------------------------
00100 int
00101 send_push_message()
00102 {
00103   RecRecord *r;
00104   RecMessage *m;
00105   int i, num_records;
00106   bool send_msg = false;
00107 
00108   m = RecMessageAlloc(RECG_PUSH);
00109   num_records = g_num_records;
00110   for (i = 0; i < num_records; i++) {
00111     r = &(g_records[i]);
00112     rec_mutex_acquire(&(r->lock));
00113     if (i_am_the_record_owner(r->rec_type)) {
00114       if (r->sync_required & REC_PEER_SYNC_REQUIRED) {
00115         m = RecMessageMarshal_Realloc(m, r);
00116         r->sync_required &= ~REC_PEER_SYNC_REQUIRED;
00117         send_msg = true;
00118       }
00119     }
00120     rec_mutex_release(&(r->lock));
00121   }
00122   if (send_msg) {
00123     RecDebug(DL_Note, "[send] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
00124     RecMessageSend(m);
00125   }
00126   RecMessageFree(m);
00127 
00128   return REC_ERR_OKAY;
00129 }
00130 
00131 
00132 //-------------------------------------------------------------------------
00133 // send_pull_message
00134 //-------------------------------------------------------------------------
00135 int
00136 send_pull_message(RecMessageT msg_type)
00137 {
00138   RecRecord *r;
00139   RecMessage *m;
00140   int i, num_records;
00141 
00142   m = RecMessageAlloc(msg_type);
00143   switch (msg_type) {
00144 
00145   case RECG_PULL_REQ:
00146     // We're requesting all of the records from our peer.  No payload
00147     // here, just send the message.
00148     RecDebug(DL_Note, "[send] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
00149     break;
00150 
00151   case RECG_PULL_ACK:
00152     // Respond to a RECG_PULL_REQ message from our peer.  Send ALL
00153     // records!  Also be sure to send a response even if it has no
00154     // payload.  Our peer may be blocking and waiting for a response!
00155     num_records = g_num_records;
00156     for (i = 0; i < num_records; i++) {
00157       r = &(g_records[i]);
00158       if (i_am_the_record_owner(r->rec_type) ||
00159           (REC_TYPE_IS_STAT(r->rec_type) && !(r->registered)) ||
00160           (REC_TYPE_IS_STAT(r->rec_type) && (r->stat_meta.persist_type == RECP_NON_PERSISTENT))) {
00161         rec_mutex_acquire(&(r->lock));
00162         m = RecMessageMarshal_Realloc(m, r);
00163         r->sync_required &= ~REC_PEER_SYNC_REQUIRED;
00164         rec_mutex_release(&(r->lock));
00165       }
00166     }
00167     RecDebug(DL_Note, "[send] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
00168     break;
00169 
00170   default:
00171     RecMessageFree(m);
00172     return REC_ERR_FAIL;
00173 
00174   }
00175 
00176   RecMessageSend(m);
00177   RecMessageFree(m);
00178 
00179   return REC_ERR_OKAY;
00180 }
00181 
00182 
00183 //-------------------------------------------------------------------------
00184 // recv_message_cb
00185 //-------------------------------------------------------------------------
00186 int
00187 recv_message_cb(RecMessage * msg, RecMessageT msg_type, void */* cookie */)
00188 {
00189   RecRecord *r;
00190   RecMessageItr itr;
00191 
00192   switch (msg_type) {
00193 
00194   case RECG_SET:
00195 
00196     RecDebug(DL_Note, "[recv] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
00197     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
00198       do {
00199         if (REC_TYPE_IS_STAT(r->rec_type)) {
00200           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw));
00201         } else {
00202           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), NULL);
00203         }
00204       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
00205     }
00206     break;
00207 
00208   case RECG_RESET:
00209 
00210     RecDebug(DL_Note, "[recv] RECG_RESET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
00211     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
00212       do {
00213         if (REC_TYPE_IS_STAT(r->rec_type)) {
00214           RecResetStatRecord(r->name);
00215         } else {
00216           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), NULL);
00217         }
00218       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
00219     }
00220     break;
00221 
00222   case RECG_REGISTER:
00223     RecDebug(DL_Note, "[recv] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
00224     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
00225       do {
00226         if (REC_TYPE_IS_STAT(r->rec_type)) {
00227           RecRegisterStat(r->rec_type, r->name, r->data_type, r->data_default, r->stat_meta.persist_type);
00228         } else if (REC_TYPE_IS_CONFIG(r->rec_type)) {
00229           RecRegisterConfig(r->rec_type, r->name, r->data_type,
00230                             r->data_default, r->config_meta.update_type,
00231                             r->config_meta.check_type, r->config_meta.check_expr, r->config_meta.access_type);
00232         }
00233       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
00234     }
00235     break;
00236 
00237   case RECG_PUSH:
00238     RecDebug(DL_Note, "[recv] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
00239     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
00240       do {
00241         RecForceInsert(r);
00242       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
00243     }
00244     break;
00245 
00246   case RECG_PULL_ACK:
00247     RecDebug(DL_Note, "[recv] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
00248     if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
00249       do {
00250         RecForceInsert(r);
00251       } while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
00252     }
00253     break;
00254 
00255   case RECG_PULL_REQ:
00256     RecDebug(DL_Note, "[recv] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
00257     send_pull_message(RECG_PULL_ACK);
00258     break;
00259 
00260   default:
00261     ink_assert(!"Unexpected RecG type");
00262     return REC_ERR_FAIL;
00263 
00264   }
00265 
00266   return REC_ERR_OKAY;
00267 }
00268 
00269 
00270 //-------------------------------------------------------------------------
00271 // RecRegisterStatXXX
00272 //-------------------------------------------------------------------------
00273 #define REC_REGISTER_STAT_XXX(A, B) \
00274   ink_assert((rec_type == RECT_NODE)    || \
00275                    (rec_type == RECT_CLUSTER) || \
00276                    (rec_type == RECT_PROCESS) || \
00277                    (rec_type == RECT_LOCAL)   || \
00278                    (rec_type == RECT_PLUGIN));   \
00279   RecRecord *r; \
00280   RecData my_data_default; \
00281   my_data_default.A = data_default; \
00282   if ((r = RecRegisterStat(rec_type, name, B, my_data_default, \
00283                            persist_type)) != NULL) { \
00284     if (i_am_the_record_owner(r->rec_type)) { \
00285       r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
00286     } else { \
00287       send_register_message(r); \
00288     } \
00289     return REC_ERR_OKAY; \
00290   } else { \
00291     return REC_ERR_FAIL; \
00292   }
00293 
00294 int
00295 _RecRegisterStatInt(RecT rec_type, const char *name, RecInt data_default, RecPersistT persist_type)
00296 {
00297   REC_REGISTER_STAT_XXX(rec_int, RECD_INT);
00298 }
00299 
00300 int
00301 _RecRegisterStatFloat(RecT rec_type, const char *name, RecFloat data_default, RecPersistT persist_type)
00302 {
00303   REC_REGISTER_STAT_XXX(rec_float, RECD_FLOAT);
00304 }
00305 
00306 int
00307 _RecRegisterStatString(RecT rec_type, const char *name, RecString data_default, RecPersistT persist_type)
00308 {
00309   REC_REGISTER_STAT_XXX(rec_string, RECD_STRING);
00310 }
00311 
00312 int
00313 _RecRegisterStatCounter(RecT rec_type, const char *name, RecCounter data_default, RecPersistT persist_type)
00314 {
00315   REC_REGISTER_STAT_XXX(rec_counter, RECD_COUNTER);
00316 }
00317 
00318 
00319 //-------------------------------------------------------------------------
00320 // RecRegisterConfigXXX
00321 //-------------------------------------------------------------------------
00322 #define REC_REGISTER_CONFIG_XXX(A, B) \
00323   RecRecord *r; \
00324   RecData my_data_default; \
00325   my_data_default.A = data_default; \
00326   if ((r = RecRegisterConfig(rec_type, name, B, my_data_default, \
00327                              update_type, check_type,              \
00328                              check_regex, access_type)) != NULL) { \
00329     if (i_am_the_record_owner(r->rec_type)) { \
00330       r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
00331     } else { \
00332       send_register_message(r); \
00333     } \
00334     return REC_ERR_OKAY; \
00335   } else { \
00336     return REC_ERR_FAIL; \
00337   }
00338 
00339 int
00340 RecRegisterConfigInt(RecT rec_type, const char *name,
00341                      RecInt data_default, RecUpdateT update_type,
00342                      RecCheckT check_type, const char *check_regex, RecAccessT access_type)
00343 {
00344   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
00345   REC_REGISTER_CONFIG_XXX(rec_int, RECD_INT);
00346 }
00347 
00348 int
00349 RecRegisterConfigFloat(RecT rec_type, const char *name,
00350                        RecFloat data_default, RecUpdateT update_type,
00351                        RecCheckT check_type, const char *check_regex, RecAccessT access_type)
00352 {
00353   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
00354   REC_REGISTER_CONFIG_XXX(rec_float, RECD_FLOAT);
00355 }
00356 
00357 
00358 int
00359 RecRegisterConfigString(RecT rec_type, const char *name,
00360                         const char *data_default_tmp, RecUpdateT update_type,
00361                         RecCheckT check_type, const char *check_regex, RecAccessT access_type)
00362 {
00363   RecString data_default = (RecString)data_default_tmp;
00364   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
00365   REC_REGISTER_CONFIG_XXX(rec_string, RECD_STRING);
00366 }
00367 
00368 int
00369 RecRegisterConfigCounter(RecT rec_type, const char *name,
00370                          RecCounter data_default, RecUpdateT update_type,
00371                          RecCheckT check_type, const char *check_regex, RecAccessT access_type)
00372 {
00373   ink_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
00374   REC_REGISTER_CONFIG_XXX(rec_counter, RECD_COUNTER);
00375 }
00376 
00377 
00378 //-------------------------------------------------------------------------
00379 // RecSetRecordXXX
00380 //-------------------------------------------------------------------------
00381 int
00382 RecSetRecord(RecT rec_type, const char *name, RecDataT data_type, RecData *data, RecRawStat *data_raw, bool lock, bool inc_version)
00383 {
00384   int err = REC_ERR_OKAY;
00385   RecRecord *r1;
00386 
00387   // FIXME: Most of the time we set, we don't actually need to wrlock
00388   // since we are not modifying the g_records_ht.
00389   if (lock) {
00390     ink_rwlock_wrlock(&g_records_rwlock);
00391   }
00392 
00393   if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
00394     if (i_am_the_record_owner(r1->rec_type)) {
00395       rec_mutex_acquire(&(r1->lock));
00396       if ((data_type != RECD_NULL) && (r1->data_type != data_type)) {
00397         err = REC_ERR_FAIL;
00398       } else {
00399         if (data_type == RECD_NULL) {
00400           ink_assert(data->rec_string);
00401           switch (r1->data_type) {
00402           case RECD_INT:
00403             r1->data.rec_int = ink_atoi64(data->rec_string);
00404             data_type = RECD_INT;
00405             break;
00406           case RECD_FLOAT:
00407             r1->data.rec_float = atof(data->rec_string);
00408             data_type = RECD_FLOAT;
00409             break;
00410           case RECD_STRING:
00411             data_type = RECD_STRING;
00412             r1->data.rec_string = data->rec_string;
00413             break;
00414           case RECD_COUNTER:
00415             r1->data.rec_int = ink_atoi64(data->rec_string);
00416             data_type = RECD_COUNTER;
00417             break;
00418           default:
00419             err = REC_ERR_FAIL;
00420             break;
00421           }
00422         }
00423 
00424         if (RecDataSet(data_type, &(r1->data), data)) {
00425           r1->sync_required = REC_SYNC_REQUIRED;
00426           if (inc_version) {
00427             r1->sync_required |= REC_INC_CONFIG_VERSION;
00428           }
00429           if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
00430             r1->config_meta.update_required = REC_UPDATE_REQUIRED;
00431           }
00432         }
00433         if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != NULL)) {
00434           r1->stat_meta.data_raw = *data_raw;
00435         }
00436       }
00437       rec_mutex_release(&(r1->lock));
00438     } else {
00439       // We don't need to ats_strdup() here as we will make copies of any
00440       // strings when we marshal them into our RecMessage buffer.
00441       RecRecord r2;
00442 
00443       RecRecordInit(&r2);
00444       r2.rec_type = rec_type;
00445       r2.name = name;
00446       r2.data_type = (data_type != RECD_NULL) ? data_type : r1->data_type;
00447       r2.data = *data;
00448       if (REC_TYPE_IS_STAT(r2.rec_type) && (data_raw != NULL)) {
00449         r2.stat_meta.data_raw = *data_raw;
00450       }
00451       err = send_set_message(&r2);
00452       RecRecordFree(&r2);
00453     }
00454   } else {
00455     // Add the record but do not set the 'registered' flag, as this
00456     // record really hasn't been registered yet.  Also, in order to
00457     // add the record, we need to have a rec_type, so if the user
00458     // calls RecSetRecord on a record we haven't registered yet, we
00459     // should fail out here.
00460     if ((rec_type == RECT_NULL) || (data_type == RECD_NULL)) {
00461       err = REC_ERR_FAIL;
00462       goto Ldone;
00463     }
00464     r1 = RecAlloc(rec_type, name, data_type);
00465     RecDataSet(data_type, &(r1->data), data);
00466     if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != NULL)) {
00467       r1->stat_meta.data_raw = *data_raw;
00468     }
00469     if (i_am_the_record_owner(r1->rec_type)) {
00470       r1->sync_required = r1->sync_required | REC_PEER_SYNC_REQUIRED;
00471     } else {
00472       err = send_set_message(r1);
00473     }
00474     ink_hash_table_insert(g_records_ht, name, (void *) r1);
00475 
00476   }
00477 
00478 Ldone:
00479   if (lock) {
00480     ink_rwlock_unlock(&g_records_rwlock);
00481   }
00482 
00483   return err;
00484 }
00485 
00486 int
00487 RecSetRecordConvert(const char *name, const RecString rec_string, bool lock, bool inc_version)
00488 {
00489   RecData data;
00490   data.rec_string = rec_string;
00491   return RecSetRecord(RECT_NULL, name, RECD_NULL, &data, NULL, lock, inc_version);
00492 }
00493 
00494 int
00495 RecSetRecordInt(const char *name, RecInt rec_int, bool lock, bool inc_version)
00496 {
00497   RecData data;
00498   data.rec_int = rec_int;
00499   return RecSetRecord(RECT_NULL, name, RECD_INT, &data, NULL, lock, inc_version);
00500 }
00501 
00502 int
00503 RecSetRecordFloat(const char *name, RecFloat rec_float, bool lock, bool inc_version)
00504 {
00505   RecData data;
00506   data.rec_float = rec_float;
00507   return RecSetRecord(RECT_NULL, name, RECD_FLOAT, &data, NULL, lock, inc_version);
00508 }
00509 
00510 int
00511 RecSetRecordString(const char *name, const RecString rec_string, bool lock, bool inc_version)
00512 {
00513   RecData data;
00514   data.rec_string = rec_string;
00515   return RecSetRecord(RECT_NULL, name, RECD_STRING, &data, NULL, lock, inc_version);
00516 }
00517 
00518 int
00519 RecSetRecordCounter(const char *name, RecCounter rec_counter, bool lock, bool inc_version)
00520 {
00521   RecData data;
00522   data.rec_counter = rec_counter;
00523   return RecSetRecord(RECT_NULL, name, RECD_COUNTER, &data, NULL, lock, inc_version);
00524 }
00525 
00526 
00527 //-------------------------------------------------------------------------
00528 // RecReadStatsFile
00529 //-------------------------------------------------------------------------
00530 int
00531 RecReadStatsFile()
00532 {
00533   RecRecord *r;
00534   RecMessage *m;
00535   RecMessageItr itr;
00536   RecPersistT persist_type = RECP_NULL;
00537   ats_scoped_str snap_fpath(RecConfigReadPersistentStatsPath());
00538 
00539   // lock our hash table
00540   ink_rwlock_wrlock(&g_records_rwlock);
00541 
00542   if ((m = RecMessageReadFromDisk(snap_fpath)) != NULL) {
00543     if (RecMessageUnmarshalFirst(m, &itr, &r) != REC_ERR_FAIL) {
00544       do {
00545         if ((r->name == NULL) || (!strlen(r->name))) {
00546           continue;
00547         }
00548 
00549         // If we don't have a persistence type for this record, it means that it is not a stat, or it is
00550         // not registered yet. Either way, it's ok to just set the persisted value and keep going.
00551         if (RecGetRecordPersistenceType(r->name, &persist_type, false /* lock */) != REC_ERR_OKAY) {
00552           RecDebug(DL_Debug, "restoring value for persisted stat '%s'", r->name);
00553           RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), false);
00554           continue;
00555         }
00556 
00557         if (!REC_TYPE_IS_STAT(r->rec_type)) {
00558           // This should not happen, but be defensive against records changing their type ..
00559           RecLog(DL_Warning, "skipping restore of non-stat record '%s'", r->name);
00560           continue;
00561         }
00562 
00563         // Check whether the persistence type was changed by a new software version. If the record is
00564         // already registered with an updated persistence type, then we don't want to set it. We should
00565         // keep the registered value.
00566         if (persist_type == RECP_NON_PERSISTENT) {
00567           RecDebug(DL_Debug, "preserving current value of formerly persistent stat '%s'", r->name);
00568           continue;
00569         }
00570 
00571         RecDebug(DL_Debug, "restoring value for persisted stat '%s'", r->name);
00572         RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), false);
00573       } while (RecMessageUnmarshalNext(m, &itr, &r) != REC_ERR_FAIL);
00574     }
00575   }
00576 
00577   ink_rwlock_unlock(&g_records_rwlock);
00578   ats_free(m);
00579 
00580   return REC_ERR_OKAY;
00581 }
00582 
00583 
00584 //-------------------------------------------------------------------------
00585 // RecSyncStatsFile
00586 //-------------------------------------------------------------------------
00587 int
00588 RecSyncStatsFile()
00589 {
00590   RecRecord *r;
00591   RecMessage *m;
00592   int i, num_records;
00593   bool sync_to_disk;
00594   ats_scoped_str snap_fpath(RecConfigReadPersistentStatsPath());
00595 
00596   /*
00597    * g_mode_type should be initialized by
00598    * RecLocalInit() or RecProcessInit() earlier.
00599    */
00600   ink_assert(g_mode_type != RECM_NULL);
00601 
00602   if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
00603     m = RecMessageAlloc(RECG_NULL);
00604     num_records = g_num_records;
00605     sync_to_disk = false;
00606     for (i = 0; i < num_records; i++) {
00607       r = &(g_records[i]);
00608       rec_mutex_acquire(&(r->lock));
00609       if (REC_TYPE_IS_STAT(r->rec_type)) {
00610         if (r->stat_meta.persist_type == RECP_PERSISTENT) {
00611           m = RecMessageMarshal_Realloc(m, r);
00612           sync_to_disk = true;
00613         }
00614       }
00615       rec_mutex_release(&(r->lock));
00616     }
00617     if (sync_to_disk) {
00618       RecDebug(DL_Note, "Writing '%s' [%d bytes]", (const char *)snap_fpath, m->o_write - m->o_start + sizeof(RecMessageHdr));
00619       RecMessageWriteToDisk(m, snap_fpath);
00620     }
00621     RecMessageFree(m);
00622   }
00623 
00624   return REC_ERR_OKAY;
00625 }
00626 
00627 // Consume a parsed record, pushing it into the records hash table.
00628 static void
00629 RecConsumeConfigEntry(RecT rec_type, RecDataT data_type, const char * name, const char * value, bool inc_version)
00630 {
00631     RecData data;
00632 
00633     memset(&data, 0, sizeof(RecData));
00634     RecDataSetFromString(data_type, &data, value);
00635     RecSetRecord(rec_type, name, data_type, &data, NULL, false, inc_version);
00636     RecDataClear(data_type, &data);
00637 }
00638 
00639 //-------------------------------------------------------------------------
00640 // RecReadConfigFile
00641 //-------------------------------------------------------------------------
00642 int
00643 RecReadConfigFile(bool inc_version)
00644 {
00645   RecDebug(DL_Note, "Reading '%s'", g_rec_config_fpath);
00646 
00647   // lock our hash table
00648   ink_rwlock_wrlock(&g_records_rwlock);
00649 
00650   // Parse the actual fileand hash the values.
00651   RecConfigFileParse(g_rec_config_fpath, RecConsumeConfigEntry, inc_version);
00652 
00653   // release our hash table
00654   ink_rwlock_unlock(&g_records_rwlock);
00655 
00656   return REC_ERR_OKAY;
00657 }
00658 
00659 
00660 //-------------------------------------------------------------------------
00661 // RecSyncConfigToTB
00662 //-------------------------------------------------------------------------
00663 int
00664 RecSyncConfigToTB(textBuffer * tb, bool *inc_version)
00665 {
00666   int err = REC_ERR_FAIL;
00667 
00668   if (inc_version != NULL) {
00669     *inc_version = false;
00670   }
00671   /*
00672    * g_mode_type should be initialized by
00673    * RecLocalInit() or RecProcessInit() earlier.
00674    */
00675   ink_assert(g_mode_type != RECM_NULL);
00676 
00677   if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
00678     RecRecord *r;
00679     int i, num_records;
00680     RecConfigFileEntry *cfe;
00681     bool sync_to_disk;
00682 
00683     ink_mutex_acquire(&g_rec_config_lock);
00684 
00685     num_records = g_num_records;
00686     sync_to_disk = false;
00687     for (i = 0; i < num_records; i++) {
00688       r = &(g_records[i]);
00689       rec_mutex_acquire(&(r->lock));
00690       if (REC_TYPE_IS_CONFIG(r->rec_type)) {
00691         if (r->sync_required & REC_DISK_SYNC_REQUIRED) {
00692           if (!ink_hash_table_isbound(g_rec_config_contents_ht, r->name)) {
00693             cfe = (RecConfigFileEntry *)ats_malloc(sizeof(RecConfigFileEntry));
00694             cfe->entry_type = RECE_RECORD;
00695             cfe->entry = ats_strdup(r->name);
00696             enqueue(g_rec_config_contents_llq, (void *) cfe);
00697             ink_hash_table_insert(g_rec_config_contents_ht, r->name, NULL);
00698           }
00699           r->sync_required = r->sync_required & ~REC_DISK_SYNC_REQUIRED;
00700           sync_to_disk = true;
00701           if (r->sync_required & REC_INC_CONFIG_VERSION) {
00702             r->sync_required = r->sync_required & ~REC_INC_CONFIG_VERSION;
00703             if (r->rec_type != RECT_LOCAL && inc_version != NULL) {
00704               *inc_version = true;
00705             }
00706           }
00707         }
00708       }
00709       rec_mutex_release(&(r->lock));
00710     }
00711 
00712     if (sync_to_disk) {
00713       char b[1024];
00714 
00715       // okay, we're going to write into our textBuffer
00716       err = REC_ERR_OKAY;
00717       tb->reUse();
00718 
00719       ink_rwlock_rdlock(&g_records_rwlock);
00720 
00721       LLQrec *llq_rec = g_rec_config_contents_llq->head;
00722       while (llq_rec != NULL) {
00723         cfe = (RecConfigFileEntry *) llq_rec->data;
00724         if (cfe->entry_type == RECE_COMMENT) {
00725           tb->copyFrom(cfe->entry, strlen(cfe->entry));
00726           tb->copyFrom("\n", 1);
00727         } else {
00728           if (ink_hash_table_lookup(g_records_ht, cfe->entry, (void **) &r)) {
00729             rec_mutex_acquire(&(r->lock));
00730             // rec_type
00731             switch (r->rec_type) {
00732             case RECT_CONFIG:
00733               tb->copyFrom("CONFIG ", 7);
00734               break;
00735             case RECT_PROCESS:
00736               tb->copyFrom("PROCESS ", 8);
00737               break;
00738             case RECT_NODE:
00739               tb->copyFrom("NODE ", 5);
00740               break;
00741             case RECT_CLUSTER:
00742               tb->copyFrom("CLUSTER ", 8);
00743               break;
00744             case RECT_LOCAL:
00745               tb->copyFrom("LOCAL ", 6);
00746               break;
00747             default:
00748               ink_assert(!"Unexpected RecT type");
00749               break;
00750             }
00751             // name
00752             tb->copyFrom(cfe->entry, strlen(cfe->entry));
00753             tb->copyFrom(" ", 1);
00754             // data_type and value
00755             switch (r->data_type) {
00756             case RECD_INT:
00757               tb->copyFrom("INT ", 4);
00758               snprintf(b, 1023, "%" PRId64 "", r->data.rec_int);
00759               tb->copyFrom(b, strlen(b));
00760               break;
00761             case RECD_FLOAT:
00762               tb->copyFrom("FLOAT ", 6);
00763               snprintf(b, 1023, "%f", r->data.rec_float);
00764               tb->copyFrom(b, strlen(b));
00765               break;
00766             case RECD_STRING:
00767               tb->copyFrom("STRING ", 7);
00768               if (r->data.rec_string) {
00769                 tb->copyFrom(r->data.rec_string, strlen(r->data.rec_string));
00770               } else {
00771                 tb->copyFrom("NULL", strlen("NULL"));
00772               }
00773               break;
00774             case RECD_COUNTER:
00775               tb->copyFrom("COUNTER ", 8);
00776               snprintf(b, 1023, "%" PRId64 "", r->data.rec_counter);
00777               tb->copyFrom(b, strlen(b));
00778               break;
00779             default:
00780               ink_assert(!"Unexpected RecD type");
00781               break;
00782             }
00783             tb->copyFrom("\n", 1);
00784             rec_mutex_release(&(r->lock));
00785           }
00786         }
00787         llq_rec = llq_rec->next;
00788       }
00789       ink_rwlock_unlock(&g_records_rwlock);
00790     }
00791     ink_mutex_release(&g_rec_config_lock);
00792   }
00793 
00794   return err;
00795 }
00796 
00797 
00798 //-------------------------------------------------------------------------
00799 // RecExecConfigUpdateCbs
00800 //-------------------------------------------------------------------------
00801 int
00802 RecExecConfigUpdateCbs(unsigned int update_required_type)
00803 {
00804   RecRecord *r;
00805   int i, num_records;
00806 
00807   num_records = g_num_records;
00808   for (i = 0; i < num_records; i++) {
00809     r = &(g_records[i]);
00810     rec_mutex_acquire(&(r->lock));
00811     if (REC_TYPE_IS_CONFIG(r->rec_type)) {
00812       /* -- upgrade to support a list of callback functions
00813          if ((r->config_meta.update_required & update_required_type) &&
00814          (r->config_meta.update_cb)) {
00815          (*(r->config_meta.update_cb))(r->name, r->data_type, r->data,
00816          r->config_meta.update_cookie);
00817          r->config_meta.update_required =
00818          r->config_meta.update_required & ~update_required_type;
00819          }
00820        */
00821 
00822       if ((r->config_meta.update_required & update_required_type) && (r->config_meta.update_cb_list)) {
00823         RecConfigUpdateCbList *cur_callback = NULL;
00824         for (cur_callback = r->config_meta.update_cb_list; cur_callback; cur_callback = cur_callback->next) {
00825           (*(cur_callback->update_cb)) (r->name, r->data_type, r->data, cur_callback->update_cookie);
00826         }
00827         r->config_meta.update_required = r->config_meta.update_required & ~update_required_type;
00828       }
00829     }
00830     rec_mutex_release(&(r->lock));
00831   }
00832 
00833   return REC_ERR_OKAY;
00834 }
00835 
00836 
00837 //------------------------------------------------------------------------
00838 // RecResetStatRecord
00839 //------------------------------------------------------------------------
00840 int
00841 RecResetStatRecord(const char *name)
00842 {
00843   RecRecord *r1 = NULL;
00844   int err = REC_ERR_OKAY;
00845 
00846   if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
00847     if (i_am_the_record_owner(r1->rec_type)) {
00848       rec_mutex_acquire(&(r1->lock));
00849       ++(r1->version);
00850       RecDataSet(r1->data_type, &(r1->data), &(r1->data_default));
00851       rec_mutex_release(&(r1->lock));
00852       err = REC_ERR_OKAY;
00853     } else {
00854       RecRecord r2;
00855 
00856       RecRecordInit(&r2);
00857       r2.rec_type = r1->rec_type;
00858       r2.name = r1->name;
00859       r2.data_type = r1->data_type;
00860       r2.data = r1->data_default;
00861 
00862       err = send_reset_message(&r2);
00863       RecRecordFree(&r2);
00864     }
00865   } else {
00866     err = REC_ERR_FAIL;
00867   }
00868 
00869   return err;
00870 }
00871 
00872 
00873 //------------------------------------------------------------------------
00874 // RecResetStatRecord
00875 //------------------------------------------------------------------------
00876 int
00877 RecResetStatRecord(RecT type, bool all)
00878 {
00879   int i, num_records;
00880   int err = REC_ERR_OKAY;
00881 
00882   RecDebug(DL_Note, "Reset Statistics Records");
00883 
00884   num_records = g_num_records;
00885   for (i = 0; i < num_records; i++) {
00886     RecRecord *r1 = &(g_records[i]);
00887 
00888     if (REC_TYPE_IS_STAT(r1->rec_type) && ((type == RECT_NULL) || (r1->rec_type == type)) &&
00889         (all || (r1->stat_meta.persist_type != RECP_NON_PERSISTENT)) &&
00890         (r1->data_type != RECD_STRING)) {
00891       if (i_am_the_record_owner(r1->rec_type)) {
00892         rec_mutex_acquire(&(r1->lock));
00893         ++(r1->version);
00894         if (!RecDataSet(r1->data_type, &(r1->data), &(r1->data_default))) {
00895           err = REC_ERR_FAIL;
00896         }
00897         rec_mutex_release(&(r1->lock));
00898       } else {
00899         RecRecord r2;
00900 
00901         RecRecordInit(&r2);
00902         r2.rec_type = r1->rec_type;
00903         r2.name = r1->name;
00904         r2.data_type = r1->data_type;
00905         r2.data = r1->data_default;
00906 
00907         err = send_reset_message(&r2);
00908         RecRecordFree(&r2);
00909       }
00910     }
00911   }
00912   return err;
00913 }
00914 
00915 
00916 int
00917 RecSetSyncRequired(char *name, bool lock)
00918 {
00919   int err = REC_ERR_FAIL;
00920   RecRecord *r1;
00921 
00922   // FIXME: Most of the time we set, we don't actually need to wrlock
00923   // since we are not modifying the g_records_ht.
00924   if (lock) {
00925     ink_rwlock_wrlock(&g_records_rwlock);
00926   }
00927 
00928   if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
00929     if (i_am_the_record_owner(r1->rec_type)) {
00930       rec_mutex_acquire(&(r1->lock));
00931       r1->sync_required = REC_SYNC_REQUIRED;
00932       if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
00933         r1->config_meta.update_required = REC_UPDATE_REQUIRED;
00934       }
00935       rec_mutex_release(&(r1->lock));
00936       err = REC_ERR_OKAY;
00937     } else {
00938       // No point of doing the following because our peer will
00939       // set the value with RecDataSet. However, since
00940       // r2.name == r1->name, the sync_required bit will not be
00941       // set.
00942 
00943       /*
00944          RecRecord r2;
00945 
00946          RecRecordInit(&r2);
00947          r2.rec_type  = r1->rec_type;
00948          r2.name      = r1->name;
00949          r2.data_type = r1->data_type;
00950          r2.data      = r1->data_default;
00951 
00952          err = send_set_message(&r2);
00953          RecRecordFree(&r2);
00954        */
00955     }
00956   }
00957 
00958   if (lock) {
00959     ink_rwlock_unlock(&g_records_rwlock);
00960   }
00961 
00962   return err;
00963 }
00964 
00965 int RecWriteConfigFile(textBuffer *tb)
00966 {
00967 #define TMP_FILENAME_EXT_STR ".tmp"
00968 #define TMP_FILENAME_EXT_LEN (sizeof(TMP_FILENAME_EXT_STR) - 1)
00969 
00970   int nbytes;
00971   int filename_len;
00972   int tmp_filename_len;
00973   int result;
00974   char buff[1024];
00975   char *tmp_filename;
00976 
00977   filename_len = strlen(g_rec_config_fpath);
00978   tmp_filename_len = filename_len + TMP_FILENAME_EXT_LEN;
00979   if (tmp_filename_len < (int)sizeof(buff)) {
00980     tmp_filename = buff;
00981   } else {
00982     tmp_filename = (char *)ats_malloc(tmp_filename_len + 1);
00983   }
00984   sprintf(tmp_filename, "%s%s", g_rec_config_fpath, TMP_FILENAME_EXT_STR);
00985 
00986   RecDebug(DL_Note, "Writing '%s'", g_rec_config_fpath);
00987 
00988   RecHandle h_file = RecFileOpenW(tmp_filename);
00989   do {
00990     if (h_file == REC_HANDLE_INVALID) {
00991       RecLog(DL_Warning, "open file: %s to write fail, errno: %d, error info: %s",
00992           tmp_filename, errno, strerror(errno));
00993       result = REC_ERR_FAIL;
00994       break;
00995     }
00996 
00997     if (RecFileWrite(h_file, tb->bufPtr(), tb->spaceUsed(), &nbytes) != REC_ERR_OKAY) {
00998       RecLog(DL_Warning, "write to file: %s fail, errno: %d, error info: %s",
00999           tmp_filename, errno, strerror(errno));
01000       result = REC_ERR_FAIL;
01001       break;
01002     }
01003 
01004     if (nbytes != tb->spaceUsed()) {
01005       RecLog(DL_Warning, "write to file: %s fail, disk maybe full", tmp_filename);
01006       result = REC_ERR_FAIL;
01007       break;
01008     }
01009 
01010     if (RecFileSync(h_file) != REC_ERR_OKAY) {
01011       RecLog(DL_Warning, "fsync file: %s fail, errno: %d, error info: %s",
01012           tmp_filename, errno, strerror(errno));
01013       result = REC_ERR_FAIL;
01014       break;
01015     }
01016     if (RecFileClose(h_file) != REC_ERR_OKAY) {
01017       RecLog(DL_Warning, "close file: %s fail, errno: %d, error info: %s",
01018           tmp_filename, errno, strerror(errno));
01019       result = REC_ERR_FAIL;
01020       break;
01021     }
01022     h_file = REC_HANDLE_INVALID;
01023 
01024     if (rename(tmp_filename, g_rec_config_fpath) != 0) {
01025       RecLog(DL_Warning, "rename file %s to %s fail, errno: %d, error info: %s",
01026           tmp_filename, g_rec_config_fpath, errno, strerror(errno));
01027       result = REC_ERR_FAIL;
01028       break;
01029     }
01030 
01031     result = REC_ERR_OKAY;
01032   } while (0);
01033 
01034   if (h_file != REC_HANDLE_INVALID) {
01035     RecFileClose(h_file);
01036   }
01037   if (tmp_filename != buff) {
01038     ats_free(tmp_filename);
01039   }
01040   return result;
01041 }
01042 

Generated by  doxygen 1.7.1