• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_ClusterInline.h

Go to the documentation of this file.
00001 /** @file
00002 
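00003   Inline helpers that build cache operation messages (lookup, read, write, link, deref, remove) and dispatch them to a remote cluster node.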
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026   P_ClusterInline.h
00027 ****************************************************************************/
00028 
00029 #ifndef __P_CLUSTERINLINE_H__
00030 #define __P_CLUSTERINLINE_H__
00031 #include "P_ClusterCacheInternal.h"
00032 #include "P_CacheInternal.h"
00033 #include "P_ClusterHandler.h"
00034 
00035 inline Action *
00036 Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
00037 {
00038   // Try to send remote, if not possible, handle locally
00039   Action *retAct;
00040   ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
00041   if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
00042     CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
00043     cc->action = cont;
00044     cc->mutex = cont->mutex;
00045     retAct = CacheContinuation::do_remote_lookup(cont, key, cc, frag_type, hostname, host_len);
00046     if (retAct) {
00047       return retAct;
00048     } else {
00049       // not remote, do local lookup
00050       CacheContinuation::cacheContAllocator_free(cc);
00051       return (Action *) NULL;
00052     }
00053   } else {
00054     Action a;
00055     a = cont;
00056     return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0);
00057   }
00058   return (Action *) NULL;
00059 }
00060 
00061 inline Action *
00062 Cluster_read(ClusterMachine * owner_machine, int opcode,
00063              Continuation * cont, MIOBuffer * buf,
00064              CacheURL * url, CacheHTTPHdr * request,
00065              CacheLookupHttpConfig * params, CacheKey * key,
00066              time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
00067 {
00068   (void) params;
00069   if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) {
00070     Action a;
00071     a = cont;
00072     return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
00073   }
00074   int vers = CacheOpMsg_long::protoToVersion(owner_machine->msg_proto_major);
00075   int flen;
00076   int len = 0;
00077   int res = 0;
00078   char *msg;
00079   char *data;
00080 
00081   if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
00082     if ((opcode == CACHE_OPEN_READ_LONG)
00083         || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
00084       // Determine length of data to Marshal
00085       flen = op_to_sizeof_fixedlen_msg(opcode);
00086 
00087       const char *url_hostname;
00088       int url_hlen;
00089       INK_MD5 url_only_md5;
00090 
00091       Cache::generate_key(&url_only_md5, url);
00092       url_hostname = url->host_get(&url_hlen);
00093 
00094       len += request->m_heap->marshal_length();
00095       len += params->marshal_length();
00096       len += url_hlen;
00097 
00098       if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
00099         goto err_exit;
00100 
00101       // Perform data Marshal operation
00102       msg = (char *) ALLOCA_DOUBLE(flen + len);
00103       data = msg + flen;
00104 
00105       int cur_len = len;
00106 
00107       res = request->m_heap->marshal(data, cur_len);
00108       if (res < 0) {
00109         goto err_exit;
00110       }
00111       data += res;
00112       cur_len -= res;
00113       if ((res = params->marshal(data, cur_len)) < 0)
00114         goto err_exit;
00115       data += res;
00116       memcpy(data, url_hostname, url_hlen);
00117 
00118       CacheOpArgs_General readArgs;
00119       readArgs.url_md5 = &url_only_md5;
00120       readArgs.pin_in_cache = pin_in_cache;
00121       readArgs.frag_type = frag_type;
00122       return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
00123                                       opcode, (char *) msg, (flen + len), -1, buf);
00124     } else {
00125       // Build message if we have host data.
00126 
00127       if (host_len) {
00128         // Determine length of data to Marshal
00129         flen = op_to_sizeof_fixedlen_msg(opcode);
00130         len = host_len;
00131 
00132         if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
00133           goto err_exit;
00134 
00135         msg = (char *) ALLOCA_DOUBLE(flen + len);
00136         data = msg + flen;
00137         memcpy(data, hostname, host_len);
00138 
00139       } else {
00140         msg = 0;
00141         flen = 0;
00142         len = 0;
00143       }
00144       CacheOpArgs_General readArgs;
00145       readArgs.url_md5 = key;
00146       readArgs.frag_type = frag_type;
00147       return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs,
00148                                       opcode, (char *) msg, (flen + len), -1, buf);
00149     }
00150 
00151   } else {
00152     //////////////////////////////////////////////////////////////
00153     // Create the specified down rev version of this message
00154     //////////////////////////////////////////////////////////////
00155     ink_release_assert(!"CacheOpMsg_long [read] bad msg version");
00156   }
00157 err_exit:
00158   Action a;
00159   a = cont;
00160   return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_READ_FAILED, 0);
00161 }
00162 
00163 inline Action *
00164 Cluster_write(Continuation * cont, int expected_size,
00165               MIOBuffer * buf, ClusterMachine * m,
00166               INK_MD5 * url_md5, CacheFragType ft, int options,
00167               time_t pin_in_cache, int opcode,
00168               CacheKey * key, CacheURL * url,
00169               CacheHTTPHdr * request, CacheHTTPInfo * old_info, char *hostname, int host_len)
00170 {
00171   (void) key;
00172   (void) request;
00173   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00174     Action a;
00175     a = cont;
00176     return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
00177   }
00178   char *msg = 0;
00179   char *data = 0;
00180   int allow_multiple_writes = 0;
00181   int len = 0;
00182   int flen = 0;
00183   int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major);
00184 
00185   switch (opcode) {
00186   case CACHE_OPEN_WRITE:
00187     {
00188       // Build message if we have host data
00189       if (host_len) {
00190         // Determine length of data to Marshal
00191         flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE);
00192         len = host_len;
00193 
00194         if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)     // Bound marshalled data
00195           goto err_exit;
00196 
00197         msg = (char *) ALLOCA_DOUBLE(flen + len);
00198         data = msg + flen;
00199 
00200         memcpy(data, hostname, host_len);
00201       }
00202       break;
00203     }
00204   case CACHE_OPEN_WRITE_LONG:
00205     {
00206       int url_hlen;
00207       const char *url_hostname = url->host_get(&url_hlen);
00208 
00209       // Determine length of data to Marshal
00210       flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE_LONG);
00211       len = 0;
00212 
00213       if (old_info == (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES) {
00214         old_info = 0;
00215         allow_multiple_writes = 1;
00216       }
00217       if (old_info) {
00218         len += old_info->marshal_length();
00219       }
00220       len += url_hlen;
00221 
00222       if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE)       // Bound marshalled data
00223         goto err_exit;
00224 
00225       // Perform data Marshal operation
00226       msg = (char *) ALLOCA_DOUBLE(flen + len);
00227       data = msg + flen;
00228 
00229       if (old_info) {
00230         int res = old_info->marshal(data, len);
00231 
00232         if (res < 0) {
00233           goto err_exit;
00234         }
00235         data += res;
00236       }
00237       memcpy(data, url_hostname, url_hlen);
00238       break;
00239     }
00240   default:
00241     {
00242       ink_release_assert(!"open_write_internal invalid opcode.");
00243     }                           // End of case
00244   }                             // End of switch
00245 
00246   if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
00247     // Do remote open_write()
00248     CacheOpArgs_General writeArgs;
00249     writeArgs.url_md5 = url_md5;
00250     writeArgs.pin_in_cache = pin_in_cache;
00251     writeArgs.frag_type = ft;
00252     writeArgs.cfl_flags |= (options & CACHE_WRITE_OPT_OVERWRITE ? CFL_OVERWRITE_ON_WRITE : 0);
00253     writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
00254     writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);
00255 
00256     return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf);
00257   } else {
00258     //////////////////////////////////////////////////////////////
00259     // Create the specified down rev version of this message
00260     //////////////////////////////////////////////////////////////
00261     ink_release_assert(!"CacheOpMsg_long [write] bad msg version");
00262     return (Action *) 0;
00263   }
00264 
00265 err_exit:
00266   Action a;
00267   a = cont;
00268   return CacheContinuation::callback_failure(&a, CACHE_EVENT_OPEN_WRITE_FAILED, 0);
00269 }
00270 
00271 inline Action *
00272 Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to,
00273              CacheFragType type, char *hostname, int host_len)
00274 {
00275   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00276     Action a;
00277     a = cont;
00278     return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
00279   }
00280 
00281   int vers = CacheOpMsg_short_2::protoToVersion(m->msg_proto_major);
00282   if (vers == CacheOpMsg_short_2::CACHE_OP_SHORT_2_MESSAGE_VERSION) {
00283     // Do remote link
00284 
00285     // Allocate memory for message header
00286     int flen = op_to_sizeof_fixedlen_msg(CACHE_LINK);
00287     int len = host_len;
00288 
00289     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
00290       goto err_exit;
00291 
00292     char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00293     memcpy((msg + flen), hostname, host_len);
00294 
00295     // Setup args for remote link
00296     CacheOpArgs_Link linkArgs;
00297     linkArgs.from = from;
00298     linkArgs.to = to;
00299     linkArgs.frag_type = type;
00300     return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len));
00301   } else {
00302     //////////////////////////////////////////////////////////////
00303     // Create the specified down rev version of this message
00304     //////////////////////////////////////////////////////////////
00305     ink_release_assert(!"CacheOpMsg_short_2 [CACHE_LINK] bad msg version");
00306     return 0;
00307   }
00308 
00309 err_exit:
00310   Action a;
00311   a = cont;
00312   return CacheContinuation::callback_failure(&a, CACHE_EVENT_LINK_FAILED, 0);
00313 }
00314 
00315 inline Action *
00316 Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
00317 {
00318   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00319     Action a;
00320     a = cont;
00321     return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
00322   }
00323 
00324   int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
00325   if (vers == CacheOpMsg_short::CACHE_OP_SHORT_MESSAGE_VERSION) {
00326     // Do remote deref
00327 
00328     // Allocate memory for message header
00329     int flen = op_to_sizeof_fixedlen_msg(CACHE_DEREF);
00330     int len = host_len;
00331 
00332     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
00333       goto err_exit;
00334 
00335     char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00336     memcpy((msg + flen), hostname, host_len);
00337 
00338     // Setup args for remote deref
00339     CacheOpArgs_Deref drefArgs;
00340     drefArgs.md5 = key;
00341     drefArgs.frag_type = type;
00342     return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len));
00343   } else {
00344     //////////////////////////////////////////////////////////////
00345     // Create the specified down rev version of this message
00346     //////////////////////////////////////////////////////////////
00347     ink_release_assert(!"CacheOpMsg_short [CACHE_DEREF] bad msg version");
00348     return 0;
00349   }
00350 
00351 err_exit:
00352   Action a;
00353   a = cont;
00354   return CacheContinuation::callback_failure(&a, CACHE_EVENT_DEREF_FAILED, 0);
00355 }
00356 
00357 inline Action *
00358 Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key,
00359                bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len)
00360 {
00361   if (clusterProcessor.disable_remote_cluster_ops(m)) {
00362     Action a;
00363     a = cont;
00364     return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
00365   }
00366 
00367   int vers = CacheOpMsg_short::protoToVersion(m->msg_proto_major);
00368   if (vers == CacheOpMsg_short::CACHE_OP_SHORT_MESSAGE_VERSION) {
00369     // Do remote update
00370 
00371     // Allocate memory for message header
00372     int flen = op_to_sizeof_fixedlen_msg(CACHE_REMOVE);
00373     int len = host_len;
00374 
00375     if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data
00376       goto err_exit;
00377 
00378     char *msg = (char *) ALLOCA_DOUBLE(flen + len);
00379     memcpy((msg + flen), hostname, host_len);
00380 
00381     // Setup args for remote update
00382     CacheOpArgs_General updateArgs;
00383     updateArgs.url_md5 = key;
00384     updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0);
00385     updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0);
00386     updateArgs.frag_type = frag_type;
00387     return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len));
00388   } else {
00389     //////////////////////////////////////////////////////////////
00390     // Create the specified down rev version of this message
00391     //////////////////////////////////////////////////////////////
00392     ink_release_assert(!"CacheOpMsg_short [CACHE_REMOVE] bad msg version");
00393     return (Action *) 0;
00394   }
00395 
00396 err_exit:
00397   Action a;
00398   a = cont;
00399   return CacheContinuation::callback_failure(&a, CACHE_EVENT_REMOVE_FAILED, 0);
00400 }
00401 
00402 #endif /* __P_CLUSTERINLINE_H__ */

Generated by  doxygen 1.7.1