00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #ifndef __P_CLUSTERINLINE_H__
00030 #define __P_CLUSTERINLINE_H__
00031 #include "P_ClusterCacheInternal.h"
00032 #include "P_CacheInternal.h"
00033 #include "P_ClusterHandler.h"
00034
00035 inline Action *
00036 Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
00037 {
00038
00039 Action *retAct;
00040 ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
00041 if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
00042 CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
00043 cc->action = cont;
00044 cc->mutex = cont->mutex;
00045 retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
00046 if (retAct) {
00047 return retAct;
00048 } else {
00049
00050 CacheContinuation::cacheContAllocator_free(cc);
00051 return (Action *) NULL;
00052 }
00053 } else {
00054 Action a;
00055 a = cont;
00056 return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
00057 }
00058 return (Action *) NULL;
00059 }
00060
00061 inline Action *
00062 Cluster_read(ClusterMachine * owner_machine, int opcode,
00063 Continuation * cont, MIOBuffer * buf,
00064 CacheURL * url, CacheHTTPHdr * request,
00065 CacheLookupHttpConfig * params, CacheKey * key,
00066 time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
00067 {
00068 (void) params;
00069 if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
00070 Action a;
00071 a = cont;
00072 return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
00073 }
00074 int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
00075 int flen;
00076 int len = 0;
00077 int res = 0;
00078 char *msg;
00079 char *data;
00080
00081 if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
00082 if ((opcode == CACHE_OPEN_READ_LONG)
00083 || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
00084
00085 flen = op_to_sizeof_fixedlen_msg(opcode);
00086
00087 const char *url_hostname;
00088 int url_hlen;
00089 INK_MD5 url_only_md5;
00090
00091 Cache::generate_key(&url_only_md5, url);
00092 url_hostname = url->host_get(&url_hlen);
00093
00094 len += request->m_heap->marshal_length();
00095 len += params->marshal_length();
00096 len += url_hlen;
00097
00098 if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)
00099 goto err_exit;
00100
00101
00102 msg = (char *) ALLOCA_DOUBLE(flen + len);
00103 data = msg + flen;
00104
00105 int cur_len = len;
00106
00107 res = request->m_heap->marshal(data, cur_len);
00108 if (res < 0) {
00109 goto err_exit;
00110 }
00111 data += res;
00112 cur_len -= res;
00113 if ((res = params->marshal(data, cur_len)) < 0)
00114 goto err_exit;
00115 data += res;
00116 memcpy(data, url_hostname, url_hlen);
00117
00118 CacheOpArgs_General readArgs;
00119 readArgs.url_md5 = &url_only_md5;
00120 readArgs.pin_in_cache = pin_in_cache;
00121 readArgs.frag_type = frag_type;
00122 return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
00123 opcode, (char *) msg, (flen + len), -1, buf);
00124 } else {
00125
00126
00127 if (host_len) {
00128
00129 flen = op_to_sizeof_fixedlen_msg(opcode);
00130 len = host_len;
00131
00132 if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)
00133 goto err_exit;
00134
00135 msg = (char *) ALLOCA_DOUBLE(flen + len);
00136 data = msg + flen;
00137 memcpy(data, hostname, host_len);
00138
00139 } else {
00140 msg = 0;
00141 flen = 0;
00142 len = 0;
00143 }
00144 CacheOpArgs_General readArgs;
00145 readArgs.url_md5 = key;
00146 readArgs.frag_type = frag_type;
00147 return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
00148 opcode, (char *) msg, (flen + len), -1, buf);
00149 }
00150
00151 } else {
00152
00153
00154
00155 ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
00156 }
00157 err_exit:
00158 Action a;
00159 a = cont;
00160 return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
00161 }
00162
00163 inline Action *
00164 Cluster_write(Continuation * cont, int expected_size,
00165 MIOBuffer * buf, ClusterMachine * m,
00166 INK_MD5 * url_md5, CacheFragType ft, int options,
00167 time_t pin_in_cache, int opcode,
00168 CacheKey * key, CacheURL * url,
00169 CacheHTTPHdr * request, CacheHTTPInfo * old_info, char *hostname, int host_len)
00170 {
00171 (void) key;
00172 (void) request;
00173 if (clusterProcessor.disable_remote_cluster_ops(m)) {
00174 Action a;
00175 a = cont;
00176 return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
00177 }
00178 char *msg = 0;
00179 char *data = 0;
00180 int allow_multiple_writes = 0;
00181 int len = 0;
00182 int flen = 0;
00183 int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
00184
00185 switch (opcode) {
00186 case CACHE_OPEN_WRITE:
00187 {
00188
00189 if (host_len) {
00190
00191 flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
00192 len = host_len;
00193
00194 if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)
00195 goto err_exit;
00196
00197 msg = (char *) ALLOCA_DOUBLE(flen + len);
00198 data = msg + flen;
00199
00200 memcpy(data, hostname, host_len);
00201 }
00202 break;
00203 }
00204 case CACHE_OPEN_WRITE_LONG:
00205 {
00206 int url_hlen;
00207 const char *url_hostname = url->host_get(&url_hlen);
00208
00209
00210 flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE_LONG);
00211 len = 0;
00212
00213 if (old_info == (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES) {
00214 old_info = 0;
00215 allow_multiple_writes = 1;
00216 }
00217 if (old_info) {
00218 len += old_info->marshal_length();
00219 }
00220 len += url_hlen;
00221
00222 if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)
00223 goto err_exit;
00224
00225
00226 msg = (char *) ALLOCA_DOUBLE(flen + len);
00227 data = msg + flen;
00228
00229 if (old_info) {
00230 int res = old_info->marshal(data, len);
00231
00232 if (res < 0) {
00233 goto err_exit;
00234 }
00235 data += res;
00236 }
00237 memcpy(data, url_hostname, url_hlen);
00238 break;
00239 }
00240 default:
00241 {
00242 ink_release_assert(!"open_write_internal invalid opcode.");
00243 }
00244 }
00245
00246 if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
00247
00248 CacheOpArgs_General writeArgs;
00249 writeArgs.url_md5 = url_md5;
00250 writeArgs.pin_in_cache = pin_in_cache;
00251 writeArgs.frag_type = ft;
00252 writeArgs.cfl_flags |= (options & CACHE_WRITE_OPT_OVERWRITE ? CFL_OVERWRITE_ON_WRITE : 0);
00253 writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
00254 writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
00255
00256 return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
00257 } else {
00258
00259
00260
00261 ink_release_assert(!"CacheOpMsg_long [write] bad msg version");
00262 return (Action *) 0;
00263 }
00264
00265 err_exit:
00266 Action a;
00267 a = cont;
00268 return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
00269 }
00270
00271 inline Action *
00272 Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
00273 CacheFragType type, char *hostname, int host_len)
00274 {
00275 if (clusterProcessor.disable_remote_cluster_ops(m)) {
00276 Action a;
00277 a = cont;
00278 return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
00279 }
00280
00281 int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
00282 if (vers == CacheOpMsg_short_2::CACHE_OP_SHORT_2_MESSAGE_VERSION) {
00283
00284
00285
00286 int flen = op_to_sizeof_fixedlen_msg(CACHE_LINK);
00287 int len = host_len;
00288
00289 if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)
00290 goto err_exit;
00291
00292 char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00293 memcpy((msg + flen), hostname, host_len);
00294
00295
00296 CacheOpArgs_Link linkArgs;
00297 linkArgs.from = from;
00298 linkArgs.to = to;
00299 linkArgs.frag_type = type;
00300 return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
00301 } else {
00302
00303
00304
00305 ink_release_assert(!"CacheOpMsg_short_2 [CACHE_LINK] bad msg version");
00306 return 0;
00307 }
00308
00309 err_exit:
00310 Action a;
00311 a = cont;
00312 return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
00313 }
00314
00315 inline Action *
00316 Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
00317 {
00318 if (clusterProcessor.disable_remote_cluster_ops(m)) {
00319 Action a;
00320 a = cont;
00321 return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
00322 }
00323
00324 int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
00325 if (vers == CacheOpMsg_short::CACHE_OP_SHORT_MESSAGE_VERSION) {
00326
00327
00328
00329 int flen = op_to_sizeof_fixedlen_msg(CACHE_DEREF);
00330 int len = host_len;
00331
00332 if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)
00333 goto err_exit;
00334
00335 char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00336 memcpy((msg + flen), hostname, host_len);
00337
00338
00339 CacheOpArgs_Deref drefArgs;
00340 drefArgs.md5 = key;
00341 drefArgs.frag_type = type;
00342 return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
00343 } else {
00344
00345
00346
00347 ink_release_assert(!"CacheOpMsg_short [CACHE_DEREF] bad msg version");
00348 return 0;
00349 }
00350
00351 err_exit:
00352 Action a;
00353 a = cont;
00354 return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
00355 }
00356
00357 inline Action *
00358 Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
00359 bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
00360 {
00361 if (clusterProcessor.disable_remote_cluster_ops(m)) {
00362 Action a;
00363 a = cont;
00364 return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
00365 }
00366
00367 int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
00368 if (vers == CacheOpMsg_short::CACHE_OP_SHORT_MESSAGE_VERSION) {
00369
00370
00371
00372 int flen = op_to_sizeof_fixedlen_msg(CACHE_REMOVE);
00373 int len = host_len;
00374
00375 if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)
00376 goto err_exit;
00377
00378 char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00379 memcpy((msg + flen), hostname, host_len);
00380
00381
00382 CacheOpArgs_General updateArgs;
00383 updateArgs.url_md5 = key;
00384 updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
00385 updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
00386 updateArgs.frag_type = frag_type;
00387 return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
00388 } else {
00389
00390
00391
00392 ink_release_assert(!"CacheOpMsg_short [CACHE_REMOVE] bad msg version");
00393 return (Action *) 0;
00394 }
00395
00396 err_exit:
00397 Action a;
00398 a = cont;
00399 return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
00400 }
00401
00402 #endif