Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "libts.h"
00025 #include "Rollback.h"
00026 #include "ParseRules.h"
00027 #include "P_RecCore.h"
00028 #include "P_RecLocal.h"
00029 #include "P_RecMessage.h"
00030 #include "P_RecUtils.h"
00031 #include "P_RecFile.h"
00032 #include "LocalManager.h"
00033 #include "FileManager.h"
00034
00035
// Set to true by RecMessageInit(); RecMessageSend() returns without sending
// anything while this is still false.
static bool message_initialized_p = false;
00037
00038
00039
00040
00041 bool
00042 i_am_the_record_owner(RecT rec_type)
00043 {
00044 switch (rec_type) {
00045 case RECT_CONFIG:
00046 case RECT_NODE:
00047 case RECT_CLUSTER:
00048 case RECT_LOCAL:
00049 return true;
00050 case RECT_PROCESS:
00051 case RECT_PLUGIN:
00052 return false;
00053 default:
00054 ink_assert(!"Unexpected RecT type");
00055 return false;
00056 }
00057 }
00058
00059
00060
00061
00062 static void *
00063 sync_thr(void * data)
00064 {
00065 textBuffer *tb = new textBuffer(65536);
00066 FileManager * configFiles = (FileManager *)data;
00067
00068 Rollback *rb;
00069 bool inc_version;
00070 bool written;
00071
00072 while (1) {
00073 send_push_message();
00074 RecSyncStatsFile();
00075 if (RecSyncConfigToTB(tb, &inc_version) == REC_ERR_OKAY) {
00076 written = false;
00077 if (configFiles->getRollbackObj(REC_CONFIG_FILE, &rb)) {
00078 if (inc_version) {
00079 RecDebug(DL_Note, "Rollback: '%s'", REC_CONFIG_FILE);
00080 version_t ver = rb->getCurrentVersion();
00081 if ((rb->updateVersion(tb, ver, -1, false)) != OK_ROLLBACK) {
00082 RecDebug(DL_Note, "Rollback failed: '%s'", REC_CONFIG_FILE);
00083 }
00084 written = true;
00085 }
00086 }
00087 else {
00088 rb = NULL;
00089 }
00090 if (!written) {
00091 RecWriteConfigFile(tb);
00092 if (rb != NULL) {
00093 rb->setLastModifiedTime();
00094 }
00095 }
00096 }
00097 usleep(REC_REMOTE_SYNC_INTERVAL_MS * 1000);
00098 }
00099 return NULL;
00100 }
00101
00102
00103
00104
00105
00106 static void *
00107 config_update_thr(void * )
00108 {
00109 while (true) {
00110 RecExecConfigUpdateCbs(REC_LOCAL_UPDATE_REQUIRED);
00111 usleep(REC_CONFIG_UPDATE_INTERVAL_MS * 1000);
00112 }
00113 return NULL;
00114 }
00115
00116
00117
00118
00119
00120 void
00121 RecMessageInit()
00122 {
00123 ink_assert(g_mode_type != RECM_NULL);
00124 lmgmt->registerMgmtCallback(MGMT_SIGNAL_LIBRECORDS, RecMessageRecvThis, NULL);
00125 message_initialized_p = true;
00126 }
00127
00128
00129
00130
00131 int
00132 RecLocalInit(Diags * _diags)
00133 {
00134 static bool initialized_p = false;;
00135
00136 if (initialized_p) {
00137 return REC_ERR_OKAY;
00138 }
00139
00140 g_mode_type = RECM_SERVER;
00141
00142 if (RecCoreInit(RECM_SERVER, _diags) == REC_ERR_FAIL) {
00143 return REC_ERR_FAIL;
00144 }
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155 initialized_p = true;
00156
00157 return REC_ERR_OKAY;
00158 }
00159
00160
00161
00162
00163
00164 int
00165 RecLocalInitMessage()
00166 {
00167 static bool initialized_p = false;
00168
00169 if (initialized_p) {
00170 return REC_ERR_OKAY;
00171 }
00172
00173 RecMessageInit();
00174 if (RecMessageRegisterRecvCb(recv_message_cb, NULL)) {
00175 return REC_ERR_FAIL;
00176 }
00177
00178 initialized_p = true;
00179
00180 return REC_ERR_OKAY;
00181 }
00182
00183
00184
00185
00186 int
00187 RecLocalStart(FileManager * configFiles)
00188 {
00189 ink_thread_create(sync_thr, configFiles);
00190 ink_thread_create(config_update_thr, NULL);
00191
00192 return REC_ERR_OKAY;
00193 }
00194
00195 int
00196 RecRegisterManagerCb(int id, RecManagerCb _fn, void *_data)
00197 {
00198 return lmgmt->registerMgmtCallback(id, _fn, _data);
00199 }
00200
00201 void
00202 RecSignalManager(int id, const char *, size_t)
00203 {
00204
00205
00206 RecDebug(DL_Debug, "local manager dropping signal %d", id);
00207 }
00208
00209
00210
00211
00212
00213 int
00214 RecMessageSend(RecMessage * msg)
00215 {
00216 int msg_size;
00217
00218 if (!message_initialized_p)
00219 return REC_ERR_OKAY;
00220
00221
00222 if (g_mode_type == RECM_CLIENT || g_mode_type == RECM_SERVER) {
00223 msg->o_end = msg->o_write;
00224 msg_size = sizeof(RecMessageHdr) + (msg->o_write - msg->o_start);
00225 lmgmt->signalEvent(MGMT_EVENT_LIBRECORDS, (char *) msg, msg_size);
00226 }
00227
00228 return REC_ERR_OKAY;
00229 }
00230