00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #ifndef __P_CLUSTERINLINE_H__
00030 #define __P_CLUSTERINLINE_H__
00031 #include "P_ClusterCacheInternal.h"
00032 #include "P_CacheInternal.h"
00033 #include "P_ClusterHandler.h"
00034 
00035 inline Action *
00036 Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
00037 {
00038   
00039   Action *retAct;
00040   ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
00041   if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
00042     CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
00043     cc->action = cont;
00044     cc->mutex = cont->mutex;
00045     retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
00046     if (retAct) {
00047       return retAct;
00048     } else {
00049       
00050       CacheContinuation::cacheContAllocator_free(cc);
00051       return (Action *) NULL;
00052     }
00053   } else {
00054     Action a;
00055     a = cont;
00056     return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
00057   }
00058   return (Action *) NULL;
00059 }
00060 
00061 inline Action *
00062 Cluster_read(ClusterMachine * owner_machine, int opcode,
00063              Continuation * cont, MIOBuffer * buf,
00064              CacheURL * url, CacheHTTPHdr * request,
00065              CacheLookupHttpConfig * params, CacheKey * key,
00066              time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
00067 {
00068   (void) params;
00069   if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
00070     Action a;
00071     a = cont;
00072     return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
00073   }
00074   int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
00075   int flen;
00076   int len = 0;
00077   int res = 0;
00078   char *msg;
00079   char *data;
00080 
00081   if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
00082     if ((opcode == CACHE_OPEN_READ_LONG)
00083         || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
00084       
00085       flen = op_to_sizeof_fixedlen_msg(opcode);
00086 
00087       const char *url_hostname;
00088       int url_hlen;
00089       INK_MD5 url_only_md5;
00090 
00091       Cache::generate_key(&url_only_md5, url);
00092       url_hostname = url->host_get(&url_hlen);
00093 
00094       len += request->m_heap->marshal_length();
00095       len += params->marshal_length();
00096       len += url_hlen;
00097 
00098       if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       
00099         goto err_exit;
00100 
00101       
00102       msg = (char *) ALLOCA_DOUBLE(flen + len);
00103       data = msg + flen;
00104 
00105       int cur_len = len;
00106 
00107       res = request->m_heap->marshal(data, cur_len);
00108       if (res < 0) {
00109         goto err_exit;
00110       }
00111       data += res;
00112       cur_len -= res;
00113       if ((res = params->marshal(data, cur_len)) < 0)
00114         goto err_exit;
00115       data += res;
00116       memcpy(data, url_hostname, url_hlen);
00117 
00118       CacheOpArgs_General readArgs;
00119       readArgs.url_md5 = &url_only_md5;
00120       readArgs.pin_in_cache = pin_in_cache;
00121       readArgs.frag_type = frag_type;
00122       return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
00123                                       opcode, (char *) msg, (flen + len), -1, buf);
00124     } else {
00125       
00126 
00127       if (host_len) {
00128         
00129         flen = op_to_sizeof_fixedlen_msg(opcode);
00130         len = host_len;
00131 
00132         if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     
00133           goto err_exit;
00134 
00135         msg = (char *) ALLOCA_DOUBLE(flen + len);
00136         data = msg + flen;
00137         memcpy(data, hostname, host_len);
00138 
00139       } else {
00140         msg = 0;
00141         flen = 0;
00142         len = 0;
00143       }
00144       CacheOpArgs_General readArgs;
00145       readArgs.url_md5 = key;
00146       readArgs.frag_type = frag_type;
00147       return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
00148                                       opcode, (char *) msg, (flen + len), -1, buf);
00149     }
00150 
00151   } else {
00152 
00153     
00154 
00155     ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
00156   }
00157 err_exit:
00158   Action a;
00159   a = cont;
00160   return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
00161 }
00162 
00163 inline Action *
00164 Cluster_write(Continuation * cont, int expected_size,
00165               MIOBuffer * buf, ClusterMachine * m,
00166               INK_MD5 * url_md5, CacheFragType ft, int options,
00167               time_t pin_in_cache, int opcode,
00168               CacheKey * key, CacheURL * url,
00169               CacheHTTPHdr * request, CacheHTTPInfo * old_info, char *hostname, int host_len)
00170 {
00171   (void) key;
00172   (void) request;
00173   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00174     Action a;
00175     a = cont;
00176     return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
00177   }
00178   char *msg = 0;
00179   char *data = 0;
00180   int allow_multiple_writes = 0;
00181   int len = 0;
00182   int flen = 0;
00183   int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
00184 
00185   switch (opcode) {
00186   case CACHE_OPEN_WRITE:
00187     {
00188       
00189       if (host_len) {
00190         
00191         flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
00192         len = host_len;
00193 
00194         if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     
00195           goto err_exit;
00196 
00197         msg = (char *) ALLOCA_DOUBLE(flen + len);
00198         data = msg + flen;
00199 
00200         memcpy(data, hostname, host_len);
00201       }
00202       break;
00203     }
00204   case CACHE_OPEN_WRITE_LONG:
00205     {
00206       int url_hlen;
00207       const char *url_hostname = url->host_get(&url_hlen);
00208 
00209       
00210       flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE_LONG);
00211       len = 0;
00212 
00213       if (old_info == (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES) {
00214         old_info = 0;
00215         allow_multiple_writes = 1;
00216       }
00217       if (old_info) {
00218         len += old_info->marshal_length();
00219       }
00220       len += url_hlen;
00221 
00222       if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       
00223         goto err_exit;
00224 
00225       
00226       msg = (char *) ALLOCA_DOUBLE(flen + len);
00227       data = msg + flen;
00228 
00229       if (old_info) {
00230         int res = old_info->marshal(data, len);
00231 
00232         if (res < 0) {
00233           goto err_exit;
00234         }
00235         data += res;
00236       }
00237       memcpy(data, url_hostname, url_hlen);
00238       break;
00239     }
00240   default:
00241     {
00242       ink_release_assert(!"open_write_internal invalid opcode.");
00243     }                           
00244   }                             
00245 
00246   if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
00247     
00248     CacheOpArgs_General writeArgs;
00249     writeArgs.url_md5 = url_md5;
00250     writeArgs.pin_in_cache = pin_in_cache;
00251     writeArgs.frag_type = ft;
00252     writeArgs.cfl_flags |= (options & CACHE_WRITE_OPT_OVERWRITE ? CFL_OVERWRITE_ON_WRITE : 0);
00253     writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
00254     writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
00255 
00256     return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
00257   } else {
00258 
00259     
00260 
00261     ink_release_assert(!"CacheOpMsg_long [write] bad msg version");
00262     return (Action *) 0;
00263   }
00264 
00265 err_exit:
00266   Action a;
00267   a = cont;
00268   return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
00269 }
00270 
00271 inline Action *
00272 Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
00273              CacheFragType type, char *hostname, int host_len)
00274 {
00275   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00276     Action a;
00277     a = cont;
00278     return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
00279   }
00280 
00281   int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
00282   if (vers == CacheOpMsg_short_2::CACHE_OP_SHORT_2_MESSAGE_VERSION) {
00283     
00284 
00285     
00286     int flen = op_to_sizeof_fixedlen_msg(CACHE_LINK);
00287     int len = host_len;
00288 
00289     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) 
00290       goto err_exit;
00291 
00292     char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00293     memcpy((msg + flen), hostname, host_len);
00294 
00295     
00296     CacheOpArgs_Link linkArgs;
00297     linkArgs.from = from;
00298     linkArgs.to = to;
00299     linkArgs.frag_type = type;
00300     return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
00301   } else {
00302 
00303     
00304 
00305     ink_release_assert(!"CacheOpMsg_short_2 [CACHE_LINK] bad msg version");
00306     return 0;
00307   }
00308 
00309 err_exit:
00310   Action a;
00311   a = cont;
00312   return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
00313 }
00314 
00315 inline Action *
00316 Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
00317 {
00318   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00319     Action a;
00320     a = cont;
00321     return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
00322   }
00323 
00324   int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
00325   if (vers == CacheOpMsg_short::CACHE_OP_SHORT_MESSAGE_VERSION) {
00326     
00327 
00328     
00329     int flen = op_to_sizeof_fixedlen_msg(CACHE_DEREF);
00330     int len = host_len;
00331 
00332     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) 
00333       goto err_exit;
00334 
00335     char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00336     memcpy((msg + flen), hostname, host_len);
00337 
00338     
00339     CacheOpArgs_Deref drefArgs;
00340     drefArgs.md5 = key;
00341     drefArgs.frag_type = type;
00342     return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
00343   } else {
00344 
00345     
00346 
00347     ink_release_assert(!"CacheOpMsg_short [CACHE_DEREF] bad msg version");
00348     return 0;
00349   }
00350 
00351 err_exit:
00352   Action a;
00353   a = cont;
00354   return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
00355 }
00356 
00357 inline Action *
00358 Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
00359                bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
00360 {
00361   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00362     Action a;
00363     a = cont;
00364     return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
00365   }
00366 
00367   int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
00368   if (vers == CacheOpMsg_short::CACHE_OP_SHORT_MESSAGE_VERSION) {
00369     
00370 
00371     
00372     int flen = op_to_sizeof_fixedlen_msg(CACHE_REMOVE);
00373     int len = host_len;
00374 
00375     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) 
00376       goto err_exit;
00377 
00378     char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00379     memcpy((msg + flen), hostname, host_len);
00380 
00381     
00382     CacheOpArgs_General updateArgs;
00383     updateArgs.url_md5 = key;
00384     updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
00385     updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
00386     updateArgs.frag_type = frag_type;
00387     return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
00388   } else {
00389 
00390     
00391 
00392     ink_release_assert(!"CacheOpMsg_short [CACHE_REMOVE] bad msg version");
00393     return (Action *) 0;
00394   }
00395 
00396 err_exit:
00397   Action a;
00398   a = cont;
00399   return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
00400 }
00401 
00402 #endif