00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "SpdyClientSession.h"
00025 #include "I_Net.h"
00026 
00027 static ClassAllocator<SpdyClientSession> spdyClientSessionAllocator("spdyClientSessionAllocator");
00028 ClassAllocator<SpdyRequest> spdyRequestAllocator("spdyRequestAllocator");
00029 
00030 #if TS_HAS_SPDY
00031 #include "SpdyClientSession.h"
00032 
00033 static const spdylay_proto_version versmap[] = {
00034   SPDYLAY_PROTO_SPDY2,    
00035   SPDYLAY_PROTO_SPDY3,    
00036   SPDYLAY_PROTO_SPDY3_1,  
00037 };
00038 
00039 static char const* const  npnmap[] = {
00040   TS_NPN_PROTOCOL_SPDY_2,
00041   TS_NPN_PROTOCOL_SPDY_3,
00042   TS_NPN_PROTOCOL_SPDY_3_1
00043 };
00044 
00045 #endif
00046 static int spdy_process_read(TSEvent event, SpdyClientSession *sm);
00047 static int spdy_process_write(TSEvent event, SpdyClientSession *sm);
00048 static int spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata);
00049 static int spdy_process_fetch_header(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm);
00050 static int spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm);
00051 static uint64_t g_sm_id = 1;
00052 
00053 void
00054 SpdyRequest::init(SpdyClientSession *sm, int id)
00055 {
00056   spdy_sm = sm;
00057   stream_id = id;
00058   headers.clear();
00059 
00060   MD5_Init(&recv_md5);
00061   start_time = TShrtime();
00062 
00063   SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_STREAM_COUNT, sm->mutex->thread_holding);
00064 }
00065 
00066 void
00067 SpdyRequest::clear()
00068 {
00069   SPDY_DECREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_STREAM_COUNT, spdy_sm->mutex->thread_holding);
00070 
00071   if (fetch_sm) {
00072     TSFetchDestroy(fetch_sm);
00073     fetch_sm = NULL;
00074   }
00075 
00076   vector<pair<string, string> >().swap(headers);
00077 
00078   std::string().swap(url);
00079   std::string().swap(host);
00080   std::string().swap(path);
00081   std::string().swap(scheme);
00082   std::string().swap(method);
00083   std::string().swap(version);
00084 
00085   Debug("spdy", "****Delete Request[%" PRIu64 ":%d]", spdy_sm->sm_id, stream_id);
00086 }
00087 
00088 void
00089 SpdyClientSession::init(NetVConnection * netvc, spdy::SessionVersion vers)
00090 {
00091   int r;
00092 
00093   this->mutex = new_ProxyMutex();
00094   this->vc = netvc;
00095   this->req_map.clear();
00096   this->version = vers;
00097 
00098   r = spdylay_session_server_new(&session, versmap[vers], &spdy_callbacks, this);
00099 
00100   
00101   
00102   
00103   
00104   SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_SESSION_COUNT, netvc->mutex->thread_holding);
00105   SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_TOTAL_CLIENT_CONNECTION_COUNT, netvc->mutex->thread_holding);
00106 
00107   ink_release_assert(r == 0);
00108   sm_id = atomic_inc(g_sm_id);
00109   total_size = 0;
00110   start_time = TShrtime();
00111 
00112   this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_accept_no_activity_timeout));
00113   SET_HANDLER(&SpdyClientSession::state_session_start);
00114 
00115 }
00116 
00117 void
00118 SpdyClientSession::clear()
00119 {
00120   int last_event = event;
00121 
00122   SPDY_DECREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_SESSION_COUNT, this->mutex->thread_holding);
00123 
00124   
00125   
00126   
00127   
00128   map<int, SpdyRequest*>::iterator iter = req_map.begin();
00129   map<int, SpdyRequest*>::iterator endIter = req_map.end();
00130   for(; iter != endIter; ++iter) {
00131     SpdyRequest *req = iter->second;
00132     if (req) {
00133       req->clear();
00134       spdyRequestAllocator.free(req);
00135     } else {
00136       Error("req null in SpdSM::clear");
00137     }
00138   }
00139   req_map.clear();
00140 
00141   this->mutex = NULL;
00142 
00143   if (vc) {
00144     TSVConnClose(reinterpret_cast<TSVConn>(vc));
00145     vc = NULL;
00146   }
00147 
00148 
00149   if (req_reader) {
00150     TSIOBufferReaderFree(req_reader);
00151     req_reader = NULL;
00152   }
00153 
00154   if (req_buffer) {
00155     TSIOBufferDestroy(req_buffer);
00156     req_buffer = NULL;
00157   }
00158 
00159   if (resp_reader) {
00160     TSIOBufferReaderFree(resp_reader);
00161     resp_reader = NULL;
00162   }
00163 
00164   if (resp_buffer) {
00165     TSIOBufferDestroy(resp_buffer);
00166     resp_buffer = NULL;
00167   }
00168 
00169   if (session) {
00170     spdylay_session_del(session);
00171     session = NULL;
00172   }
00173 
00174   Debug("spdy-free", "****Delete SpdyClientSession[%" PRIu64 "], last event:%d" PRIu64,
00175         sm_id, last_event);
00176 }
00177 
00178 void
00179 spdy_cs_create(NetVConnection * netvc, spdy::SessionVersion vers, MIOBuffer * iobuf, IOBufferReader * reader)
00180 {
00181   SpdyClientSession  *sm;
00182 
00183   sm = spdyClientSessionAllocator.alloc();
00184   sm->init(netvc, vers);
00185 
00186   sm->req_buffer = iobuf ? reinterpret_cast<TSIOBuffer>(iobuf) : TSIOBufferCreate();
00187   sm->req_reader = reader ? reinterpret_cast<TSIOBufferReader>(reader) : TSIOBufferReaderAlloc(sm->req_buffer);
00188 
00189   sm->resp_buffer = TSIOBufferCreate();
00190   sm->resp_reader = TSIOBufferReaderAlloc(sm->resp_buffer);
00191 
00192   eventProcessor.schedule_imm(sm, ET_NET);
00193 }
00194 
00195 int
00196 SpdyClientSession::state_session_start(int , void * )
00197 {
00198   const spdylay_settings_entry entries[] = {
00199     { SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS, SPDYLAY_ID_FLAG_SETTINGS_NONE, spdy_max_concurrent_streams },
00200     { SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE, SPDYLAY_ID_FLAG_SETTINGS_NONE, spdy_initial_window_size }
00201   };
00202   int r;
00203 
00204   if (TSIOBufferReaderAvail(this->req_reader) > 0) {
00205     spdy_process_read(TS_EVENT_VCONN_WRITE_READY, this);
00206   }
00207 
00208   this->read_vio = (TSVIO)this->vc->do_io_read(this, INT64_MAX, reinterpret_cast<MIOBuffer *>(this->req_buffer));
00209   this->write_vio = (TSVIO)this->vc->do_io_write(this, INT64_MAX, reinterpret_cast<IOBufferReader *>(this->resp_reader));
00210 
00211   SET_HANDLER(&SpdyClientSession::state_session_readwrite);
00212 
00213   r = spdylay_submit_settings(this->session, SPDYLAY_FLAG_SETTINGS_NONE, entries, countof(entries));
00214   ink_assert(r == 0);
00215 
00216   if (this->version >= spdy::SESSION_VERSION_3_1 && spdy_initial_window_size > (1 << 16)) {
00217     int32_t delta = (spdy_initial_window_size - SPDYLAY_INITIAL_WINDOW_SIZE);
00218 
00219     r = spdylay_submit_window_update(this->session, 0, delta);
00220     ink_assert(r == 0);
00221   }
00222 
00223   TSVIOReenable(this->write_vio);
00224   return EVENT_CONT;
00225 }
00226 
00227 int
00228 SpdyClientSession::state_session_readwrite(int event, void * edata)
00229 {
00230   int ret = 0;
00231   bool from_fetch = false;
00232 
00233   this->event = event;
00234 
00235   if (edata == this->read_vio) {
00236     Debug("spdy", "++++[READ EVENT]");
00237     if (event != TS_EVENT_VCONN_READ_READY &&
00238         event != TS_EVENT_VCONN_READ_COMPLETE) {
00239       ret = -1;
00240       goto out;
00241     }
00242     ret = spdy_process_read((TSEvent)event, this);
00243   } else if (edata == this->write_vio) {
00244     Debug("spdy", "----[WRITE EVENT]");
00245     if (event != TS_EVENT_VCONN_WRITE_READY &&
00246         event != TS_EVENT_VCONN_WRITE_COMPLETE) {
00247       ret = -1;
00248       goto out;
00249     }
00250     ret = spdy_process_write((TSEvent)event, this);
00251   } else {
00252     from_fetch = true;
00253     ret = spdy_process_fetch((TSEvent)event, this, edata);
00254   }
00255 
00256   Debug("spdy-event", "++++SpdyClientSession[%" PRIu64 "], EVENT:%d, ret:%d",
00257         this->sm_id, event, ret);
00258 out:
00259   if (ret) {
00260     this->clear();
00261     spdyClientSessionAllocator.free(this);
00262   } else if (!from_fetch) {
00263     this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_no_activity_timeout_in));
00264   }
00265 
00266   return EVENT_CONT;
00267 }
00268 
00269 int64_t
00270 SpdyClientSession::getPluginId() const
00271 {
00272   return sm_id;
00273 }
00274 
00275 char const*
00276 SpdyClientSession::getPluginTag() const
00277 {
00278   return npnmap[this->version];
00279 }
00280 
00281 
00282 static int
00283 spdy_process_read(TSEvent , SpdyClientSession *sm)
00284 {
00285   return spdylay_session_recv(sm->session);
00286 }
00287 
00288 static int
00289 spdy_process_write(TSEvent , SpdyClientSession *sm)
00290 {
00291   int ret;
00292 
00293   ret = spdylay_session_send(sm->session);
00294 
00295   if (TSIOBufferReaderAvail(sm->resp_reader) > 0)
00296     TSVIOReenable(sm->write_vio);
00297   else {
00298     Debug("spdy", "----TOTAL SEND (sm_id:%" PRIu64 ", total_size:%" PRIu64 ", total_send:%" PRId64 ")",
00299           sm->sm_id, sm->total_size, TSVIONDoneGet(sm->write_vio));
00300 
00301     
00302     
00303     
00304     
00305     
00306     TSVIOReenable(sm->read_vio);
00307   }
00308 
00309   return ret;
00310 }
00311 
00312 static int
00313 spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata)
00314 {
00315   int ret = -1;
00316   TSFetchSM fetch_sm = (TSFetchSM)edata;
00317   SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00318 
00319   switch ((int)event) {
00320 
00321   case TS_FETCH_EVENT_EXT_HEAD_DONE:
00322     Debug("spdy", "----[FETCH HEADER DONE]");
00323     ret = spdy_process_fetch_header(event, sm, fetch_sm);
00324     break;
00325 
00326   case TS_FETCH_EVENT_EXT_BODY_READY:
00327     Debug("spdy", "----[FETCH BODY READY]");
00328     ret = spdy_process_fetch_body(event, sm, fetch_sm);
00329     break;
00330 
00331   case TS_FETCH_EVENT_EXT_BODY_DONE:
00332     Debug("spdy", "----[FETCH BODY DONE]");
00333     req->fetch_body_completed = true;
00334     ret = spdy_process_fetch_body(event, sm, fetch_sm);
00335     break;
00336 
00337   default:
00338     Debug("spdy", "----[FETCH ERROR]");
00339     if (req->fetch_body_completed)
00340       ret = 0; 
00341     else {
00342       Error("spdy_process_fetch fetch error, fetch_sm %p, ret %d for sm_id %" PRId64 ", stream_id %u, req time %" PRId64 ", url %s", req->fetch_sm, ret, sm->sm_id, req->stream_id, req->start_time, req->url.c_str());
00343       req->fetch_sm = NULL;
00344     }
00345     break;
00346   }
00347 
00348   if (ret) {
00349     Error("spdy_process_fetch sending STATUS_500, fetch_sm %p, ret %d for sm_id %" PRId64 ", stream_id %u, req time %" PRId64 ", url %s", req->fetch_sm, ret, sm->sm_id, req->stream_id, req->start_time, req->url.c_str());
00350     spdy_prepare_status_response_and_clean_request(sm, req->stream_id, STATUS_500);
00351     
00352     
00353     
00354   }
00355 
00356   return 0;
00357 }
00358 
00359 static int
00360 spdy_process_fetch_header(TSEvent , SpdyClientSession *sm, TSFetchSM fetch_sm)
00361 {
00362   int ret = -1;
00363   SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00364 
00365   SpdyNV spdy_nv(fetch_sm);
00366 
00367   Debug("spdy", "----spdylay_submit_syn_reply");
00368   if (sm->session) {
00369     ret = spdylay_submit_syn_reply(sm->session,
00370                                  SPDYLAY_CTRL_FLAG_NONE, req->stream_id,
00371                                  spdy_nv.nv);
00372   } else {
00373     Error("spdy_process_fetch_header, sm->session NULL, sm_id %" PRId64 ", fetch_sm %p, stream_id %d, req_time %" PRId64 ", url %s", sm->sm_id, fetch_sm, req->stream_id, req->start_time, req->url.c_str());
00374   }
00375 
00376   TSVIOReenable(sm->write_vio);
00377   return ret;
00378 }
00379 
00380 static ssize_t
00381 spdy_read_fetch_body_callback(spdylay_session * , int32_t stream_id,
00382                               uint8_t *buf, size_t length, int *eof,
00383                               spdylay_data_source *source, void *user_data)
00384 {
00385 
00386   static int g_call_cnt;
00387   int64_t already;
00388 
00389   SpdyClientSession *sm = (SpdyClientSession *)user_data;
00390   SpdyRequest *req = (SpdyRequest *)source->ptr;
00391 
00392   
00393   
00394   
00395   if (req != sm->find_request(stream_id)) {
00396     Debug("spdy", "    stream_id:%d, call:%d, req has been deleted, return 0",
00397           stream_id, g_call_cnt);
00398     *eof = 1;
00399     return 0;
00400   }
00401 
00402   already = TSFetchReadData(req->fetch_sm, buf, length);
00403 
00404   Debug("spdy", "    stream_id:%d, call:%d, length:%ld, already:%" PRId64,
00405         stream_id, g_call_cnt, length, already);
00406   if (is_debug_tag_set("spdy"))
00407     MD5_Update(&req->recv_md5, buf, already);
00408 
00409   TSVIOReenable(sm->write_vio);
00410   g_call_cnt++;
00411 
00412   req->fetch_data_len += already;
00413   if (already < (int64_t)length) {
00414     if (req->event == TS_FETCH_EVENT_EXT_BODY_DONE) {
00415       TSHRTime end_time = TShrtime();
00416       SPDY_SUM_THREAD_DYN_STAT(SPDY_STAT_TOTAL_TRANSACTIONS_TIME, sm->mutex->thread_holding, end_time - req->start_time);
00417       Debug("spdy", "----Request[%" PRIu64 ":%d] %s %lld %d", sm->sm_id, req->stream_id,
00418             req->url.c_str(), (end_time - req->start_time)/TS_HRTIME_MSECOND,
00419             req->fetch_data_len);
00420       unsigned char digest[MD5_DIGEST_LENGTH];
00421       if (is_debug_tag_set("spdy")) {
00422         MD5_Final(digest, &req->recv_md5);
00423         char md5_strbuf[MD5_DIGEST_LENGTH * 2 + 1];
00424         for (int i = 0; i < MD5_DIGEST_LENGTH; i++) {
00425           snprintf(md5_strbuf + (i * 2), 3 , "%02x", digest[i]);
00426         }
00427         Debug("spdy", "----recv md5sum: %s", md5_strbuf);
00428       }
00429       *eof = 1;
00430       sm->cleanup_request(stream_id);
00431     } else if (already == 0) {
00432       req->need_resume_data = true;
00433       return SPDYLAY_ERR_DEFERRED;
00434     }
00435   }
00436 
00437   return already;
00438 }
00439 
00440 static int
00441 spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm)
00442 {
00443   int ret = 0;
00444   spdylay_data_provider data_prd;
00445   SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00446   req->event = event;
00447 
00448   data_prd.source.ptr = (void *)req;
00449   data_prd.read_callback = spdy_read_fetch_body_callback;
00450 
00451   if (!req->has_submitted_data) {
00452     req->has_submitted_data = true;
00453     Debug("spdy", "----spdylay_submit_data");
00454     ret = spdylay_submit_data(sm->session, req->stream_id,
00455                               SPDYLAY_DATA_FLAG_FIN, &data_prd);
00456   } else if (req->need_resume_data) {
00457     Debug("spdy", "----spdylay_session_resume_data");
00458     ret = spdylay_session_resume_data(sm->session, req->stream_id);
00459     if (ret == SPDYLAY_ERR_INVALID_ARGUMENT)
00460       ret = 0;
00461   }
00462 
00463   TSVIOReenable(sm->write_vio);
00464   return ret;
00465 }