00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "SpdyClientSession.h"
00025 #include "I_Net.h"
00026
00027 static ClassAllocator<SpdyClientSession> spdyClientSessionAllocator("spdyClientSessionAllocator");
00028 ClassAllocator<SpdyRequest> spdyRequestAllocator("spdyRequestAllocator");
00029
00030 #if TS_HAS_SPDY
00031 #include "SpdyClientSession.h"
00032
00033 static const spdylay_proto_version versmap[] = {
00034 SPDYLAY_PROTO_SPDY2,
00035 SPDYLAY_PROTO_SPDY3,
00036 SPDYLAY_PROTO_SPDY3_1,
00037 };
00038
00039 static char const* const npnmap[] = {
00040 TS_NPN_PROTOCOL_SPDY_2,
00041 TS_NPN_PROTOCOL_SPDY_3,
00042 TS_NPN_PROTOCOL_SPDY_3_1
00043 };
00044
00045 #endif
00046 static int spdy_process_read(TSEvent event, SpdyClientSession *sm);
00047 static int spdy_process_write(TSEvent event, SpdyClientSession *sm);
00048 static int spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata);
00049 static int spdy_process_fetch_header(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm);
00050 static int spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm);
00051 static uint64_t g_sm_id = 1;
00052
00053 void
00054 SpdyRequest::init(SpdyClientSession *sm, int id)
00055 {
00056 spdy_sm = sm;
00057 stream_id = id;
00058 headers.clear();
00059
00060 MD5_Init(&recv_md5);
00061 start_time = TShrtime();
00062
00063 SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_STREAM_COUNT, sm->mutex->thread_holding);
00064 }
00065
00066 void
00067 SpdyRequest::clear()
00068 {
00069 SPDY_DECREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_STREAM_COUNT, spdy_sm->mutex->thread_holding);
00070
00071 if (fetch_sm) {
00072 TSFetchDestroy(fetch_sm);
00073 fetch_sm = NULL;
00074 }
00075
00076 vector<pair<string, string> >().swap(headers);
00077
00078 std::string().swap(url);
00079 std::string().swap(host);
00080 std::string().swap(path);
00081 std::string().swap(scheme);
00082 std::string().swap(method);
00083 std::string().swap(version);
00084
00085 Debug("spdy", "****Delete Request[%" PRIu64 ":%d]", spdy_sm->sm_id, stream_id);
00086 }
00087
00088 void
00089 SpdyClientSession::init(NetVConnection * netvc, spdy::SessionVersion vers)
00090 {
00091 int r;
00092
00093 this->mutex = new_ProxyMutex();
00094 this->vc = netvc;
00095 this->req_map.clear();
00096 this->version = vers;
00097
00098 r = spdylay_session_server_new(&session, versmap[vers], &spdy_callbacks, this);
00099
00100
00101
00102
00103
00104 SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_SESSION_COUNT, netvc->mutex->thread_holding);
00105 SPDY_INCREMENT_THREAD_DYN_STAT(SPDY_STAT_TOTAL_CLIENT_CONNECTION_COUNT, netvc->mutex->thread_holding);
00106
00107 ink_release_assert(r == 0);
00108 sm_id = atomic_inc(g_sm_id);
00109 total_size = 0;
00110 start_time = TShrtime();
00111
00112 this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_accept_no_activity_timeout));
00113 SET_HANDLER(&SpdyClientSession::state_session_start);
00114
00115 }
00116
00117 void
00118 SpdyClientSession::clear()
00119 {
00120 int last_event = event;
00121
00122 SPDY_DECREMENT_THREAD_DYN_STAT(SPDY_STAT_CURRENT_CLIENT_SESSION_COUNT, this->mutex->thread_holding);
00123
00124
00125
00126
00127
00128 map<int, SpdyRequest*>::iterator iter = req_map.begin();
00129 map<int, SpdyRequest*>::iterator endIter = req_map.end();
00130 for(; iter != endIter; ++iter) {
00131 SpdyRequest *req = iter->second;
00132 if (req) {
00133 req->clear();
00134 spdyRequestAllocator.free(req);
00135 } else {
00136 Error("req null in SpdSM::clear");
00137 }
00138 }
00139 req_map.clear();
00140
00141 this->mutex = NULL;
00142
00143 if (vc) {
00144 TSVConnClose(reinterpret_cast<TSVConn>(vc));
00145 vc = NULL;
00146 }
00147
00148
00149 if (req_reader) {
00150 TSIOBufferReaderFree(req_reader);
00151 req_reader = NULL;
00152 }
00153
00154 if (req_buffer) {
00155 TSIOBufferDestroy(req_buffer);
00156 req_buffer = NULL;
00157 }
00158
00159 if (resp_reader) {
00160 TSIOBufferReaderFree(resp_reader);
00161 resp_reader = NULL;
00162 }
00163
00164 if (resp_buffer) {
00165 TSIOBufferDestroy(resp_buffer);
00166 resp_buffer = NULL;
00167 }
00168
00169 if (session) {
00170 spdylay_session_del(session);
00171 session = NULL;
00172 }
00173
00174 Debug("spdy-free", "****Delete SpdyClientSession[%" PRIu64 "], last event:%d" PRIu64,
00175 sm_id, last_event);
00176 }
00177
00178 void
00179 spdy_cs_create(NetVConnection * netvc, spdy::SessionVersion vers, MIOBuffer * iobuf, IOBufferReader * reader)
00180 {
00181 SpdyClientSession *sm;
00182
00183 sm = spdyClientSessionAllocator.alloc();
00184 sm->init(netvc, vers);
00185
00186 sm->req_buffer = iobuf ? reinterpret_cast<TSIOBuffer>(iobuf) : TSIOBufferCreate();
00187 sm->req_reader = reader ? reinterpret_cast<TSIOBufferReader>(reader) : TSIOBufferReaderAlloc(sm->req_buffer);
00188
00189 sm->resp_buffer = TSIOBufferCreate();
00190 sm->resp_reader = TSIOBufferReaderAlloc(sm->resp_buffer);
00191
00192 eventProcessor.schedule_imm(sm, ET_NET);
00193 }
00194
00195 int
00196 SpdyClientSession::state_session_start(int , void * )
00197 {
00198 const spdylay_settings_entry entries[] = {
00199 { SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS, SPDYLAY_ID_FLAG_SETTINGS_NONE, spdy_max_concurrent_streams },
00200 { SPDYLAY_SETTINGS_INITIAL_WINDOW_SIZE, SPDYLAY_ID_FLAG_SETTINGS_NONE, spdy_initial_window_size }
00201 };
00202 int r;
00203
00204 if (TSIOBufferReaderAvail(this->req_reader) > 0) {
00205 spdy_process_read(TS_EVENT_VCONN_WRITE_READY, this);
00206 }
00207
00208 this->read_vio = (TSVIO)this->vc->do_io_read(this, INT64_MAX, reinterpret_cast<MIOBuffer *>(this->req_buffer));
00209 this->write_vio = (TSVIO)this->vc->do_io_write(this, INT64_MAX, reinterpret_cast<IOBufferReader *>(this->resp_reader));
00210
00211 SET_HANDLER(&SpdyClientSession::state_session_readwrite);
00212
00213 r = spdylay_submit_settings(this->session, SPDYLAY_FLAG_SETTINGS_NONE, entries, countof(entries));
00214 ink_assert(r == 0);
00215
00216 if (this->version >= spdy::SESSION_VERSION_3_1 && spdy_initial_window_size > (1 << 16)) {
00217 int32_t delta = (spdy_initial_window_size - SPDYLAY_INITIAL_WINDOW_SIZE);
00218
00219 r = spdylay_submit_window_update(this->session, 0, delta);
00220 ink_assert(r == 0);
00221 }
00222
00223 TSVIOReenable(this->write_vio);
00224 return EVENT_CONT;
00225 }
00226
00227 int
00228 SpdyClientSession::state_session_readwrite(int event, void * edata)
00229 {
00230 int ret = 0;
00231 bool from_fetch = false;
00232
00233 this->event = event;
00234
00235 if (edata == this->read_vio) {
00236 Debug("spdy", "++++[READ EVENT]");
00237 if (event != TS_EVENT_VCONN_READ_READY &&
00238 event != TS_EVENT_VCONN_READ_COMPLETE) {
00239 ret = -1;
00240 goto out;
00241 }
00242 ret = spdy_process_read((TSEvent)event, this);
00243 } else if (edata == this->write_vio) {
00244 Debug("spdy", "----[WRITE EVENT]");
00245 if (event != TS_EVENT_VCONN_WRITE_READY &&
00246 event != TS_EVENT_VCONN_WRITE_COMPLETE) {
00247 ret = -1;
00248 goto out;
00249 }
00250 ret = spdy_process_write((TSEvent)event, this);
00251 } else {
00252 from_fetch = true;
00253 ret = spdy_process_fetch((TSEvent)event, this, edata);
00254 }
00255
00256 Debug("spdy-event", "++++SpdyClientSession[%" PRIu64 "], EVENT:%d, ret:%d",
00257 this->sm_id, event, ret);
00258 out:
00259 if (ret) {
00260 this->clear();
00261 spdyClientSessionAllocator.free(this);
00262 } else if (!from_fetch) {
00263 this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_no_activity_timeout_in));
00264 }
00265
00266 return EVENT_CONT;
00267 }
00268
00269 int64_t
00270 SpdyClientSession::getPluginId() const
00271 {
00272 return sm_id;
00273 }
00274
00275 char const*
00276 SpdyClientSession::getPluginTag() const
00277 {
00278 return npnmap[this->version];
00279 }
00280
00281
00282 static int
00283 spdy_process_read(TSEvent , SpdyClientSession *sm)
00284 {
00285 return spdylay_session_recv(sm->session);
00286 }
00287
00288 static int
00289 spdy_process_write(TSEvent , SpdyClientSession *sm)
00290 {
00291 int ret;
00292
00293 ret = spdylay_session_send(sm->session);
00294
00295 if (TSIOBufferReaderAvail(sm->resp_reader) > 0)
00296 TSVIOReenable(sm->write_vio);
00297 else {
00298 Debug("spdy", "----TOTAL SEND (sm_id:%" PRIu64 ", total_size:%" PRIu64 ", total_send:%" PRId64 ")",
00299 sm->sm_id, sm->total_size, TSVIONDoneGet(sm->write_vio));
00300
00301
00302
00303
00304
00305
00306 TSVIOReenable(sm->read_vio);
00307 }
00308
00309 return ret;
00310 }
00311
00312 static int
00313 spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata)
00314 {
00315 int ret = -1;
00316 TSFetchSM fetch_sm = (TSFetchSM)edata;
00317 SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00318
00319 switch ((int)event) {
00320
00321 case TS_FETCH_EVENT_EXT_HEAD_DONE:
00322 Debug("spdy", "----[FETCH HEADER DONE]");
00323 ret = spdy_process_fetch_header(event, sm, fetch_sm);
00324 break;
00325
00326 case TS_FETCH_EVENT_EXT_BODY_READY:
00327 Debug("spdy", "----[FETCH BODY READY]");
00328 ret = spdy_process_fetch_body(event, sm, fetch_sm);
00329 break;
00330
00331 case TS_FETCH_EVENT_EXT_BODY_DONE:
00332 Debug("spdy", "----[FETCH BODY DONE]");
00333 req->fetch_body_completed = true;
00334 ret = spdy_process_fetch_body(event, sm, fetch_sm);
00335 break;
00336
00337 default:
00338 Debug("spdy", "----[FETCH ERROR]");
00339 if (req->fetch_body_completed)
00340 ret = 0;
00341 else {
00342 Error("spdy_process_fetch fetch error, fetch_sm %p, ret %d for sm_id %" PRId64 ", stream_id %u, req time %" PRId64 ", url %s", req->fetch_sm, ret, sm->sm_id, req->stream_id, req->start_time, req->url.c_str());
00343 req->fetch_sm = NULL;
00344 }
00345 break;
00346 }
00347
00348 if (ret) {
00349 Error("spdy_process_fetch sending STATUS_500, fetch_sm %p, ret %d for sm_id %" PRId64 ", stream_id %u, req time %" PRId64 ", url %s", req->fetch_sm, ret, sm->sm_id, req->stream_id, req->start_time, req->url.c_str());
00350 spdy_prepare_status_response_and_clean_request(sm, req->stream_id, STATUS_500);
00351
00352
00353
00354 }
00355
00356 return 0;
00357 }
00358
00359 static int
00360 spdy_process_fetch_header(TSEvent , SpdyClientSession *sm, TSFetchSM fetch_sm)
00361 {
00362 int ret = -1;
00363 SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00364
00365 SpdyNV spdy_nv(fetch_sm);
00366
00367 Debug("spdy", "----spdylay_submit_syn_reply");
00368 if (sm->session) {
00369 ret = spdylay_submit_syn_reply(sm->session,
00370 SPDYLAY_CTRL_FLAG_NONE, req->stream_id,
00371 spdy_nv.nv);
00372 } else {
00373 Error("spdy_process_fetch_header, sm->session NULL, sm_id %" PRId64 ", fetch_sm %p, stream_id %d, req_time %" PRId64 ", url %s", sm->sm_id, fetch_sm, req->stream_id, req->start_time, req->url.c_str());
00374 }
00375
00376 TSVIOReenable(sm->write_vio);
00377 return ret;
00378 }
00379
00380 static ssize_t
00381 spdy_read_fetch_body_callback(spdylay_session * , int32_t stream_id,
00382 uint8_t *buf, size_t length, int *eof,
00383 spdylay_data_source *source, void *user_data)
00384 {
00385
00386 static int g_call_cnt;
00387 int64_t already;
00388
00389 SpdyClientSession *sm = (SpdyClientSession *)user_data;
00390 SpdyRequest *req = (SpdyRequest *)source->ptr;
00391
00392
00393
00394
00395 if (req != sm->find_request(stream_id)) {
00396 Debug("spdy", " stream_id:%d, call:%d, req has been deleted, return 0",
00397 stream_id, g_call_cnt);
00398 *eof = 1;
00399 return 0;
00400 }
00401
00402 already = TSFetchReadData(req->fetch_sm, buf, length);
00403
00404 Debug("spdy", " stream_id:%d, call:%d, length:%ld, already:%" PRId64,
00405 stream_id, g_call_cnt, length, already);
00406 if (is_debug_tag_set("spdy"))
00407 MD5_Update(&req->recv_md5, buf, already);
00408
00409 TSVIOReenable(sm->write_vio);
00410 g_call_cnt++;
00411
00412 req->fetch_data_len += already;
00413 if (already < (int64_t)length) {
00414 if (req->event == TS_FETCH_EVENT_EXT_BODY_DONE) {
00415 TSHRTime end_time = TShrtime();
00416 SPDY_SUM_THREAD_DYN_STAT(SPDY_STAT_TOTAL_TRANSACTIONS_TIME, sm->mutex->thread_holding, end_time - req->start_time);
00417 Debug("spdy", "----Request[%" PRIu64 ":%d] %s %lld %d", sm->sm_id, req->stream_id,
00418 req->url.c_str(), (end_time - req->start_time)/TS_HRTIME_MSECOND,
00419 req->fetch_data_len);
00420 unsigned char digest[MD5_DIGEST_LENGTH];
00421 if (is_debug_tag_set("spdy")) {
00422 MD5_Final(digest, &req->recv_md5);
00423 char md5_strbuf[MD5_DIGEST_LENGTH * 2 + 1];
00424 for (int i = 0; i < MD5_DIGEST_LENGTH; i++) {
00425 snprintf(md5_strbuf + (i * 2), 3 , "%02x", digest[i]);
00426 }
00427 Debug("spdy", "----recv md5sum: %s", md5_strbuf);
00428 }
00429 *eof = 1;
00430 sm->cleanup_request(stream_id);
00431 } else if (already == 0) {
00432 req->need_resume_data = true;
00433 return SPDYLAY_ERR_DEFERRED;
00434 }
00435 }
00436
00437 return already;
00438 }
00439
00440 static int
00441 spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm)
00442 {
00443 int ret = 0;
00444 spdylay_data_provider data_prd;
00445 SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm);
00446 req->event = event;
00447
00448 data_prd.source.ptr = (void *)req;
00449 data_prd.read_callback = spdy_read_fetch_body_callback;
00450
00451 if (!req->has_submitted_data) {
00452 req->has_submitted_data = true;
00453 Debug("spdy", "----spdylay_submit_data");
00454 ret = spdylay_submit_data(sm->session, req->stream_id,
00455 SPDYLAY_DATA_FLAG_FIN, &data_prd);
00456 } else if (req->need_resume_data) {
00457 Debug("spdy", "----spdylay_session_resume_data");
00458 ret = spdylay_session_resume_data(sm->session, req->stream_id);
00459 if (ret == SPDYLAY_ERR_INVALID_ARGUMENT)
00460 ret = 0;
00461 }
00462
00463 TSVIOReenable(sm->write_vio);
00464 return ret;
00465 }