• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

SpdyClientSession.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   SpdyClientSession.cc
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 #include "SpdyClientSession.h"
00025 #include "I_Net.h"
00026 
// Allocators used by this file: sessions are allocated in spdy_cs_create() and freed in
// state_session_readwrite(); requests are freed in SpdyClientSession::clear().
// spdyRequestAllocator is not static, so it has external linkage.
00027 static ClassAllocator<SpdyClientSession> spdyClientSessionAllocator("spdyClientSessionAllocator");
00028 ClassAllocator<SpdyRequest> spdyRequestAllocator("spdyRequestAllocator");
00029 
// NOTE(review): versmap and npnmap are only defined when TS_HAS_SPDY is set, yet the visible
// code below indexes them unconditionally -- confirm this file is only built with TS_HAS_SPDY.
00030 #if TS_HAS_SPDY
00031 #include "SpdyClientSession.h"
00032 
// spdylay protocol version table; SpdyClientSession::init() indexes it with the
// spdy::SessionVersion it is given.
00033 static const spdylay_proto_version versmap[] = {
00034   SPDYLAY_PROTO_SPDY2,    // SPDY_VERSION_2
00035   SPDYLAY_PROTO_SPDY3,    // SPDY_VERSION_3
00036   SPDYLAY_PROTO_SPDY3_1,  // SPDY_VERSION_3_1
00037 };
00038 
// NPN protocol name table; SpdyClientSession::getPluginTag() indexes it with the session's
// version, so the entry order must match versmap above.
00039 static char const* const  npnmap[] = {
00040   TS_NPN_PROTOCOL_SPDY_2,
00041   TS_NPN_PROTOCOL_SPDY_3,
00042   TS_NPN_PROTOCOL_SPDY_3_1
00043 };
00044 
00045 #endif
// File-local event helpers, defined further down. spdy_process_read/write/fetch are dispatched
// from SpdyClientSession::state_session_readwrite(); the header/body helpers are called from
// spdy_process_fetch().
00046 static int spdy_process_read(TSEvent event, SpdyClientSession *sm);
00047 static int spdy_process_write(TSEvent event, SpdyClientSession *sm);
00048 static int spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata);
00049 static int spdy_process_fetch_header(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm);
00050 static int spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm);
// Source of session ids: SpdyClientSession::init() assigns sm_id from atomic_inc() on this counter.
00051 static uint64_t g_sm_id = 1;
00052 
00053 void
00054 SpdyRequest::init(SpdyClientSession *sm, int id)
00055 {
00056   spdy_sm = sm;
00057   stream_id = id;
00058   headers.clear();
00059 
00060   MD5_Init(&recv_md5);
00061   start_time = TShrtime();
00062 
00063   SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_STREAM_COUNT, sm->mutex->thread_holding);
00064 }
00065 
00066 void
00067 SpdyRequest::clear()
00068 {
00069   SPDY_DECREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_STREAM_COUNT, spdy_sm->mutex->thread_holding);
00070 
00071   if (fetch_sm) {
00072     TSFetchDestroy(fetch_sm);
00073     fetch_sm = NULL;
00074   }
00075 
00076   vector<pair<string, string> >().swap(headers);
00077 
00078   std::string().swap(url);
00079   std::string().swap(host);
00080   std::string().swap(path);
00081   std::string().swap(scheme);
00082   std::string().swap(method);
00083   std::string().swap(version);
00084 
00085   Debug("spdy", "****Delete Request[%" PRIu64 ":%d]", spdy_sm->sm_id, stream_id);
00086 }
00087 
00088 void
00089 SpdyClientSession::init(NetVConnection * netvc, spdy::SessionVersion vers)
00090 {
00091   int r;
00092 
00093   this->mutex = new_ProxyMutex();
00094   this->vc = netvc;
00095   this->req_map.clear();
00096   this->version = vers;
00097 
00098   r = spdylay_session_server_new(&session, versmap[vers], &spdy_callbacks, this);
00099 
00100   // A bit ugly but we need a thread and I don't want to wait until the
00101   // session start event in case of a time out generating a decrement
00102   // with no increment. It seems a lesser thing to have the thread counts
00103   // a little off but globally consistent.
00104   SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_SESSION_COUNT, netvc->mutex->thread_holding);
00105   SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_TOTAL_CLIENT_CONNECTION_COUNT, netvc->mutex->thread_holding);
00106 
00107   ink_release_assert(r == 0);
00108   sm_id = atomic_inc(g_sm_id);
00109   total_size = 0;
00110   start_time = TShrtime();
00111 
00112   this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_accept_no_activity_timeout));
00113   SET_HANDLER(&SpdyClientSession::state_session_start);
00114 
00115 }
00116 
00117 void
00118 SpdyClientSession::clear()
00119 {
00120   int last_event = event;
00121 
00122   SPDY_DECREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_SESSION_COUNT, this->mutex->thread_holding);
00123 
00124   //
00125   // SpdyRequest depends on SpdyClientSession,
00126   // we should delete it firstly to avoid race.
00127   //
00128   map<int, SpdyRequest*>::iterator iter = req_map.begin();
00129   map<int, SpdyRequest*>::iterator endIter = req_map.end();
00130   for(; iter != endIter; ++iter) {
00131     SpdyRequest *req = iter->second;
00132     if (req) {
00133       req->clear();
00134       spdyRequestAllocator.free(req);
00135     } else {
00136       Error("req null in SpdSM::clear");
00137     }
00138   }
00139   req_map.clear();
00140 
00141   this->mutex = NULL;
00142 
00143   if (vc) {
00144     TSVConnClose(reinterpret_cast<TSVConn>(vc));
00145     vc = NULL;
00146   }
00147 
00148 
00149   if (req_reader) {
00150     TSIOBufferReaderFree(req_reader);
00151     req_reader = NULL;
00152   }
00153 
00154   if (req_buffer) {
00155     TSIOBufferDestroy(req_buffer);
00156     req_buffer = NULL;
00157   }
00158 
00159   if (resp_reader) {
00160     TSIOBufferReaderFree(resp_reader);
00161     resp_reader = NULL;
00162   }
00163 
00164   if (resp_buffer) {
00165     TSIOBufferDestroy(resp_buffer);
00166     resp_buffer = NULL;
00167   }
00168 
00169   if (session) {
00170     spdylay_session_del(session);
00171     session = NULL;
00172   }
00173 
00174   Debug("spdy-free", "****Delete SpdyClientSession[%" PRIu64 "], last event:%d" PRIu64,
00175         sm_id, last_event);
00176 }
00177 
00178 void
00179 spdy_cs_create(NetVConnection * netvc, spdy::SessionVersion vers, MIOBuffer * iobuf, IOBufferReader * reader)
00180 {
00181   SpdyClientSession  *sm;
00182 
00183   sm = spdyClientSessionAllocator.alloc();
00184   sm->init(netvc, vers);
00185 
00186   sm->req_buffer = iobuf ? reinterpret_cast<TSIOBuffer>(iobuf) : TSIOBufferCreate();
00187   sm->req_reader = reader ? reinterpret_cast<TSIOBufferReader>(reader) : TSIOBufferReaderAlloc(sm->req_buffer);
00188 
00189   sm->resp_buffer = TSIOBufferCreate();
00190   sm->resp_reader = TSIOBufferReaderAlloc(sm->resp_buffer);
00191 
00192   eventProcessor.schedule_imm(sm, ET_NET);
00193 }
00194 
00195 int
00196 SpdyClientSession::state_session_start(int /* event */, void * /* edata */)
00197 {
00198   const spdylay_settings_entry entries[] = {
00199     { SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS, SPDYLAY_ID_FLAG_SETTINGS_NONE, spdy_max_concurrent_streams },
00200     { SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE, SPDYLAY_ID_FLAG_SETTINGS_NONE, spdy_initial_window_size }
00201   };
00202   int r;
00203 
00204   if (TSIOBufferReaderAvail(this->req_reader) > 0) {
00205     spdy_process_read(TS_EVENT_VCONN_WRITE_READY, this);
00206   }
00207 
00208   this->read_vio = (TSVIO)this->vc->do_io_read(this, INT64_MAX, reinterpret_cast<MIOBuffer *>(this->req_buffer));
00209   this->write_vio = (TSVIO)this->vc->do_io_write(this, INT64_MAX, reinterpret_cast<IOBufferReader *>(this->resp_reader));
00210 
00211   SET_HANDLER(&SpdyClientSession::state_session_readwrite);
00212 
00213   r = spdylay_submit_settings(this->session, SPDYLAY_FLAG_SETTINGS_NONE, entries, countof(entries));
00214   ink_assert(r == 0);
00215 
00216   if (this->version >= spdy::SESSION_VERSION_3_1 && spdy_initial_window_size > (1 << 16)) {
00217     int32_t delta = (spdy_initial_window_size - SPDYLAY_INITIAL_WINDOW_SIZE);
00218 
00219     r = spdylay_submit_window_update(this->session, 0, delta);
00220     ink_assert(r == 0);
00221   }
00222 
00223   TSVIOReenable(this->write_vio);
00224   return EVENT_CONT;
00225 }
00226 
00227 int
00228 SpdyClientSession::state_session_readwrite(int event, void * edata)
00229 {
00230   int ret = 0;
00231   bool from_fetch = false;
00232 
00233   this->event = event;
00234 
00235   if (edata == this->read_vio) {
00236     Debug("spdy", "++++[READ EVENT]");
00237     if (event != TS_EVENT_VCONN_READ_READY &&
00238         event != TS_EVENT_VCONN_READ_COMPLETE) {
00239       ret = -1;
00240       goto out;
00241     }
00242     ret = spdy_process_read((TSEvent)event, this);
00243   } else if (edata == this->write_vio) {
00244     Debug("spdy", "----[WRITE EVENT]");
00245     if (event != TS_EVENT_VCONN_WRITE_READY &&
00246         event != TS_EVENT_VCONN_WRITE_COMPLETE) {
00247       ret = -1;
00248       goto out;
00249     }
00250     ret = spdy_process_write((TSEvent)event, this);
00251   } else {
00252     from_fetch = true;
00253     ret = spdy_process_fetch((TSEvent)event, this, edata);
00254   }
00255 
00256   Debug("spdy-event", "++++SpdyClientSession[%" PRIu64 "], EVENT:%d, ret:%d",
00257         this->sm_id, event, ret);
00258 out:
00259   if (ret) {
00260     this->clear();
00261     spdyClientSessionAllocator.free(this);
00262   } else if (!from_fetch) {
00263     this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_no_activity_timeout_in));
00264   }
00265 
00266   return EVENT_CONT;
00267 }
00268 
00269 int64_t
00270 SpdyClientSession::getPluginId() const
00271 {
00272   return sm_id;
00273 }
00274 
00275 char const*
00276 SpdyClientSession::getPluginTag() const
00277 {
00278   return npnmap[this->version];
00279 }
00280 
00281 
00282 static int
00283 spdy_process_read(TSEvent /* event ATS_UNUSED */, SpdyClientSession *sm)
00284 {
00285   return spdylay_session_recv(sm->session);
00286 }
00287 
00288 static int
00289 spdy_process_write(TSEvent /* event ATS_UNUSED */, SpdyClientSession *sm)
00290 {
00291   int ret;
00292 
00293   ret = spdylay_session_send(sm->session);
00294 
00295   if (TSIOBufferReaderAvail(sm->resp_reader) > 0)
00296     TSVIOReenable(sm->write_vio);
00297   else {
00298     Debug("spdy", "----TOTAL SEND (sm_id:%" PRIu64 ", total_size:%" PRIu64 ", total_send:%" PRId64 ")",
00299           sm->sm_id, sm->total_size, TSVIONDoneGet(sm->write_vio));
00300 
00301     //
00302     // We should reenable read_vio when no data to be written,
00303     // otherwise it could lead to hang issue when client POST
00304     // data is waiting to be read.
00305     //
00306     TSVIOReenable(sm->read_vio);
00307   }
00308 
00309   return ret;
00310 }
00311 
00312 static int
00313 spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata)
00314 {
00315   int ret = -1;
00316   TSFetchSM fetch_sm = (TSFetchSM)edata;
00317   SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00318 
00319   switch ((int)event) {
00320 
00321   case TS_FETCH_EVENT_EXT_HEAD_DONE:
00322     Debug("spdy", "----[FETCH HEADER DONE]");
00323     ret = spdy_process_fetch_header(event, sm, fetch_sm);
00324     break;
00325 
00326   case TS_FETCH_EVENT_EXT_BODY_READY:
00327     Debug("spdy", "----[FETCH BODY READY]");
00328     ret = spdy_process_fetch_body(event, sm, fetch_sm);
00329     break;
00330 
00331   case TS_FETCH_EVENT_EXT_BODY_DONE:
00332     Debug("spdy", "----[FETCH BODY DONE]");
00333     req->fetch_body_completed = true;
00334     ret = spdy_process_fetch_body(event, sm, fetch_sm);
00335     break;
00336 
00337   default:
00338     Debug("spdy", "----[FETCH ERROR]");
00339     if (req->fetch_body_completed)
00340       ret = 0; // Ignore fetch errors after FETCH BODY DONE
00341     else {
00342       Error("spdy_process_fetch fetch error, fetch_sm %p, ret %d for sm_id %" PRId64 ", stream_id %u, req time %" PRId64 ", url %s", req->fetch_sm, ret, sm->sm_id, req->stream_id, req->start_time, req->url.c_str());
00343       req->fetch_sm = NULL;
00344     }
00345     break;
00346   }
00347 
00348   if (ret) {
00349     Error("spdy_process_fetch sending STATUS_500, fetch_sm %p, ret %d for sm_id %" PRId64 ", stream_id %u, req time %" PRId64 ", url %s", req->fetch_sm, ret, sm->sm_id, req->stream_id, req->start_time, req->url.c_str());
00350     spdy_prepare_status_response_and_clean_request(sm, req->stream_id, STATUS_500);
00351     // It is better to send back a 500 response on the stream and have the client connection remain open.  However, we
00352     // have seen a core around this.  We have a local patch to close the client connection (return -1) this is related
00353     // to TS-2883.  TS-2883 still needs to be fixed.
00354   }
00355 
00356   return 0;
00357 }
00358 
00359 static int
00360 spdy_process_fetch_header(TSEvent /*event*/, SpdyClientSession *sm, TSFetchSM fetch_sm)
00361 {
00362   int ret = -1;
00363   SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00364 
00365   SpdyNV spdy_nv(fetch_sm);
00366 
00367   Debug("spdy", "----spdylay_submit_syn_reply");
00368   if (sm->session) {
00369     ret = spdylay_submit_syn_reply(sm->session,
00370                                  SPDYLAY_CTRL_FLAG_NONE, req->stream_id,
00371                                  spdy_nv.nv);
00372   } else {
00373     Error("spdy_process_fetch_header, sm->session NULL, sm_id %" PRId64 ", fetch_sm %p, stream_id %d, req_time %" PRId64 ", url %s", sm->sm_id, fetch_sm, req->stream_id, req->start_time, req->url.c_str());
00374   }
00375 
00376   TSVIOReenable(sm->write_vio);
00377   return ret;
00378 }
00379 
00380 static ssize_t
00381 spdy_read_fetch_body_callback(spdylay_session * /*session*/, int32_t stream_id,
00382                               uint8_t *buf, size_t length, int *eof,
00383                               spdylay_data_source *source, void *user_data)
00384 {
00385 
00386   static int g_call_cnt;
00387   int64_t already;
00388 
00389   SpdyClientSession *sm = (SpdyClientSession *)user_data;
00390   SpdyRequest *req = (SpdyRequest *)source->ptr;
00391 
00392   //
00393   // req has been deleted, ignore this data.
00394   //
00395   if (req != sm->find_request(stream_id)) {
00396     Debug("spdy", "    stream_id:%d, call:%d, req has been deleted, return 0",
00397           stream_id, g_call_cnt);
00398     *eof = 1;
00399     return 0;
00400   }
00401 
00402   already = TSFetchReadData(req->fetch_sm, buf, length);
00403 
00404   Debug("spdy", "    stream_id:%d, call:%d, length:%ld, already:%" PRId64,
00405         stream_id, g_call_cnt, length, already);
00406   if (is_debug_tag_set("spdy"))
00407     MD5_Update(&req->recv_md5, buf, already);
00408 
00409   TSVIOReenable(sm->write_vio);
00410   g_call_cnt++;
00411 
00412   req->fetch_data_len += already;
00413   if (already < (int64_t)length) {
00414     if (req->event == TS_FETCH_EVENT_EXT_BODY_DONE) {
00415       TSHRTime end_time = TShrtime();
00416       SPDY_SUM_THREAD_DYN_STAT(SPDY_STAT_TOTAL_TRANSACTIONS_TIME, sm->mutex->thread_holding, end_time - req->start_time);
00417       Debug("spdy", "----Request[%" PRIu64 ":%d] %s %lld %d", sm->sm_id, req->stream_id,
00418             req->url.c_str(), (end_time - req->start_time)/TS_HRTIME_MSECOND,
00419             req->fetch_data_len);
00420       unsigned char digest[MD5_DIGEST_LENGTH];
00421       if (is_debug_tag_set("spdy")) {
00422         MD5_Final(digest, &req->recv_md5);
00423         char md5_strbuf[MD5_DIGEST_LENGTH * 2 + 1];
00424         for (int i = 0; i < MD5_DIGEST_LENGTH; i++) {
00425           snprintf(md5_strbuf + (i * 2), 3 /* null byte counts towards the limit */, "%02x", digest[i]);
00426         }
00427         Debug("spdy", "----recv md5sum: %s", md5_strbuf);
00428       }
00429       *eof = 1;
00430       sm->cleanup_request(stream_id);
00431     } else if (already == 0) {
00432       req->need_resume_data = true;
00433       return SPDYLAY_ERR_DEFERRED;
00434     }
00435   }
00436 
00437   return already;
00438 }
00439 
00440 static int
00441 spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm)
00442 {
00443   int ret = 0;
00444   spdylay_data_provider data_prd;
00445   SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00446   req->event = event;
00447 
00448   data_prd.source.ptr = (void *)req;
00449   data_prd.read_callback = spdy_read_fetch_body_callback;
00450 
00451   if (!req->has_submitted_data) {
00452     req->has_submitted_data = true;
00453     Debug("spdy", "----spdylay_submit_data");
00454     ret = spdylay_submit_data(sm->session, req->stream_id,
00455                               SPDYLAY_DATA_FLAG_FIN, &data_prd);
00456   } else if (req->need_resume_data) {
00457     Debug("spdy", "----spdylay_session_resume_data");
00458     ret = spdylay_session_resume_data(sm->session, req->stream_id);
00459     if (ret == SPDYLAY_ERR_INVALID_ARGUMENT)
00460       ret = 0;
00461   }
00462 
00463   TSVIOReenable(sm->write_vio);
00464   return ret;
00465 }

Generated by  doxygen 1.7.1