00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "P_EventSystem.h"
00025 #include "Error.h"
00026
00027 #include "LogSock.h"
00028 #include "LogUtils.h"
00029
00030 static const int LS_SOCKTYPE = SOCK_STREAM;
00031 static const int LS_PROTOCOL = 0;
00032
00033
00034
00035
00036
00037
00038
00039
00040 LogSock::LogSock(int max_connects)
00041 :
00042 ct((ConnectTable *) NULL),
00043 m_accept_connections(false),
00044 m_max_connections(max_connects + 1)
00045 {
00046 ink_assert(m_max_connections > 0);
00047
00048
00049
00050
00051 ct = new ConnectTable[m_max_connections];
00052 ink_assert(ct != NULL);
00053 for (int i = 0; i < m_max_connections; ++i) {
00054 init_cid(i, NULL, 0, -1, LogSock::LS_STATE_UNUSED);
00055 }
00056
00057 Debug("log-sock", "LogSocket established");
00058 }
00059
00060
00061
00062
00063
00064
00065 LogSock::~LogSock()
00066 {
00067 Debug("log-sock", "shutting down LogSocket on [%s:%d]", ct[0].host, ct[0].port);
00068
00069 this->close();
00070 this->close(0);
00071 delete[]ct;
00072 }
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083 int
00084 LogSock::listen(int accept_port, int family)
00085 {
00086 IpEndpoint bind_addr;
00087 int size = sizeof(bind_addr);
00088 char this_host[MAXDNAME];
00089 int ret;
00090 ats_scoped_fd accept_sd;
00091
00092 Debug("log-sock", "Listening ...");
00093
00094
00095 bind_addr.setToAnyAddr(family);
00096 if (!bind_addr.isValid()) {
00097 Warning("Could not set up socket - invalid address family %d", family);
00098 return -1;
00099 }
00100 bind_addr.port() = htons(accept_port);
00101 size = ats_ip_size(&bind_addr.sa);
00102
00103
00104
00105
00106 accept_sd =::socket(family, LS_SOCKTYPE, LS_PROTOCOL);
00107 if (accept_sd < 0) {
00108 Warning("Could not create a socket for family %d: %s", family, strerror(errno));
00109 return -1;
00110 }
00111
00112
00113
00114
00115 if ((ret = safe_fcntl(accept_sd, F_SETFD, 1)) < 0) {
00116 Warning("Could not set option CLOSE ON EXEC on socket (%d): %s", ret, strerror(errno));
00117 return -1;
00118 }
00119
00120 struct linger l;
00121 l.l_onoff = 0;
00122 l.l_linger = 0;
00123 if ((ret = safe_setsockopt(accept_sd, SOL_SOCKET, SO_LINGER, (char *) &l, sizeof(l))) < 0) {
00124 Warning("Could not set option NO_LINGER on socket (%d): %s", ret, strerror(errno));
00125 return -1;
00126 }
00127
00128 if ((ret = safe_setsockopt(accept_sd, SOL_SOCKET, SO_REUSEADDR, SOCKOPT_ON, sizeof(int))) < 0) {
00129 Warning("Could not set option REUSEADDR on socket (%d): %s", ret, strerror(errno));
00130 return -1;
00131 }
00132
00133
00134 if ((ret = safe_bind(accept_sd, &bind_addr.sa, size)) < 0) {
00135 Warning("Could not bind port: %s", strerror(errno));
00136 return -1;
00137 }
00138
00139 if ((ret = safe_setsockopt(accept_sd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int))) < 0) {
00140 Warning("Could not set option TCP_NODELAY on socket (%d): %s", ret, strerror(errno));
00141 return -1;
00142 }
00143
00144 if ((ret = safe_setsockopt(accept_sd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int))) < 0) {
00145 Warning("Could not set option SO_KEEPALIVE on socket (%d): %s", ret, strerror(errno));
00146 return -1;
00147 }
00148
00149
00150
00151
00152
00153
00154 if (accept_port == 0) {
00155 ret = safe_getsockname(accept_sd, &bind_addr.sa, &size);
00156 if (ret == 0) {
00157 accept_port = ntohs(bind_addr.port());
00158 }
00159 }
00160
00161
00162
00163 if ((ret = safe_listen(accept_sd, m_max_connections)) < 0) {
00164 Warning("Could not establish listen queue: %s", strerror(errno));
00165 return -1;
00166 }
00167
00168
00169
00170
00171 if (gethostname(&this_host[0], MAXDNAME) != 0) {
00172 snprintf(this_host, sizeof(this_host), "unknown-host");
00173 }
00174 init_cid(0, this_host, accept_port, accept_sd, LogSock::LS_STATE_INCOMING);
00175
00176 m_accept_connections = true;
00177 Debug("log-sock", "LogSocket established on [%s:%d]", this_host, accept_port);
00178
00179 accept_sd.release();
00180 return 0;
00181 }
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191 int
00192 LogSock::accept()
00193 {
00194 int cid, connect_sd;
00195 IpEndpoint connect_addr;
00196 socklen_t size = sizeof(connect_addr);
00197 in_port_t connect_port;
00198
00199 if (!m_accept_connections || ct[0].sd < 0) {
00200 return LogSock::LS_ERROR_NO_CONNECTION;
00201 }
00202
00203 cid = new_cid();
00204 if (cid < 0) {
00205 return LogSock::LS_ERROR_CONNECT_TABLE_FULL;
00206 }
00207
00208 Debug("log-sock", "waiting to accept a new connection");
00209
00210 connect_sd =::accept(ct[0].sd, &connect_addr.sa, &size);
00211 if (connect_sd < 0) {
00212 return LogSock::LS_ERROR_ACCEPT;
00213 }
00214 connect_port = ntohs(connect_addr.port());
00215
00216 init_cid(cid, NULL, connect_port, connect_sd, LogSock::LS_STATE_INCOMING);
00217
00218 Debug("log-sock", "new connection accepted, cid = %d, port = %d", cid, connect_port);
00219
00220 return cid;
00221 }
00222
00223
00224
00225
00226
00227
00228
00229 int
00230 LogSock::connect(sockaddr const* ip)
00231 {
00232 int cid, ret;
00233 ats_scoped_fd connect_sd;
00234 uint16_t port;
00235
00236 if (!ats_is_ip(ip)) {
00237 Note("Invalid host IP or port number for connection");
00238 return LogSock::LS_ERROR_NO_SUCH_HOST;
00239 }
00240 port = ntohs(ats_ip_port_cast(ip));
00241
00242 ip_port_text_buffer ipstr;
00243 Debug("log-sock", "connecting to [%s:%d]", ats_ip_nptop(ip, ipstr, sizeof(ipstr)), port);
00244
00245
00246 cid = new_cid();
00247 if (cid < 0) {
00248 Note("No more connections allowed for this socket");
00249 return LogSock::LS_ERROR_CONNECT_TABLE_FULL;
00250 }
00251
00252 connect_sd =::socket(ip->sa_family, LS_SOCKTYPE, LS_PROTOCOL);
00253 if (connect_sd < 0) {
00254 Note("Error initializing socket for connection: %d", static_cast<int>(connect_sd));
00255 return LogSock::LS_ERROR_SOCKET;
00256 }
00257
00258 if ((ret = safe_setsockopt(connect_sd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int))) < 0) {
00259 Note("Could not set option TCP_NODELAY on socket (%d): %s", ret, strerror(errno));
00260 return -1;
00261 }
00262
00263 if ((ret = safe_setsockopt(connect_sd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int))) < 0) {
00264 Note("Could not set option SO_KEEPALIVE on socket (%d): %s", ret, strerror(errno));
00265 return -1;
00266 }
00267
00268
00269 if (::connect(connect_sd, ip, ats_ip_size(ip)) != 0) {
00270 Note("Failure to connect");
00271 return LogSock::LS_ERROR_CONNECT;
00272 }
00273
00274 init_cid(cid, ipstr, port, connect_sd, LogSock::LS_STATE_OUTGOING);
00275
00276 Debug("log-sock", "outgoing connection to [%s:%d] established, fd = %d", ipstr, port, cid);
00277
00278 connect_sd.release();
00279 return cid;
00280 }
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290 bool LogSock::pending_data(int *cid, int timeout_msec, bool include_connects)
00291 {
00292 int
00293 start_index,
00294 ret,
00295 n_poll_fds,
00296 i;
00297 static struct pollfd
00298 fds[LS_CONST_CLUSTER_MAX_MACHINES];
00299 int
00300 fd_to_cid[LS_CONST_CLUSTER_MAX_MACHINES];
00301
00302 ink_assert(m_max_connections <= (LS_CONST_CLUSTER_MAX_MACHINES + 1));
00303 ink_assert(cid != NULL);
00304 ink_assert(timeout_msec >= 0);
00305
00306
00307
00308
00309
00310
00311
00312
00313 if (*cid >= 0) {
00314
00315 ink_assert(*cid < m_max_connections);
00316 fds[0].fd = ct[*cid].sd;
00317 fds[0].events = POLLIN;
00318 fds[0].revents = 0;
00319 fd_to_cid[0] = *cid;
00320 n_poll_fds = 1;
00321
00322 } else {
00323
00324 if (include_connects) {
00325 start_index = 0;
00326 } else {
00327 start_index = 1;
00328 }
00329 n_poll_fds = 0;
00330 for (i = start_index; i < m_max_connections; i++) {
00331 if (ct[i].state == LogSock::LS_STATE_INCOMING) {
00332 fds[n_poll_fds].fd = ct[i].sd;
00333 fds[n_poll_fds].events = POLLIN;
00334 fds[n_poll_fds].revents = 0;
00335 fd_to_cid[n_poll_fds] = i;
00336 n_poll_fds++;
00337 }
00338 }
00339 }
00340
00341 if (n_poll_fds == 0) {
00342 return false;
00343 }
00344
00345 ret =::poll(fds, n_poll_fds, timeout_msec);
00346
00347 if (ret == 0) {
00348 return false;
00349 } else if (ret < 0) {
00350 Debug("log-sock", "error on poll");
00351 return false;
00352 }
00353
00354
00355
00356
00357
00358
00359 for (i = 0; i < n_poll_fds; i++) {
00360 if (fds[i].revents & POLLIN) {
00361 *cid = fd_to_cid[i];
00362 Debug("log-sock", "poll successful on index %d", *cid);
00363 return true;
00364 }
00365 }
00366
00367 Debug("log-sock", "invalid revents in the poll table");
00368 return false;
00369 }
00370
00371
00372
00373
00374
00375
00376 bool LogSock::pending_any(int *cid, int timeout_msec)
00377 {
00378 ink_assert(cid != NULL);
00379 *cid = -1;
00380 if (m_accept_connections) {
00381 return pending_data(cid, timeout_msec, true);
00382 } else {
00383 return pending_data(cid, timeout_msec, false);
00384 }
00385 }
00386
00387
00388
00389
00390
00391
00392
00393
00394 bool LogSock::pending_message_any(int *cid, int timeout_msec)
00395 {
00396 ink_assert(cid != NULL);
00397 *cid = -1;
00398 return pending_data(cid, timeout_msec, false);
00399 }
00400
00401
00402
00403
00404
00405
00406 bool LogSock::pending_message_on(int cid, int timeout_msec)
00407 {
00408 return pending_data(&cid, timeout_msec, false);
00409 }
00410
00411
00412
00413
00414
00415
00416
00417 bool LogSock::pending_connect(int timeout_msec)
00418 {
00419 int
00420 cid = 0;
00421 if (m_accept_connections) {
00422 return pending_data(&cid, timeout_msec, true);
00423 } else {
00424 return false;
00425 }
00426 }
00427
00428
00429
00430
00431
00432
00433
00434 void
00435 LogSock::close(int cid)
00436 {
00437 ink_assert(cid >= 0 && cid < m_max_connections);
00438
00439 Debug("log-sock", "closing connection for cid %d", cid);
00440
00441 if (ct[cid].state != LogSock::LS_STATE_UNUSED) {
00442 ::close(ct[cid].sd);
00443 delete ct[cid].host;
00444 ct[cid].state = LogSock::LS_STATE_UNUSED;
00445 }
00446 }
00447
00448 void
00449 LogSock::close()
00450 {
00451 for (int i = 1; i < m_max_connections; i++) {
00452 this->close(i);
00453 }
00454 }
00455
00456
00457
00458
00459
00460
00461
00462 int
00463 LogSock::write(int cid, void *buf, int bytes)
00464 {
00465 LogSock::MsgHeader header = {
00466 0};
00467 header.msg_bytes = 0;
00468 int ret;
00469
00470 ink_assert(cid >= 0 && cid < m_max_connections);
00471
00472 if (buf == NULL || bytes == 0) {
00473 return 0;
00474 }
00475
00476 if (ct[cid].state != LogSock::LS_STATE_OUTGOING) {
00477 return LogSock::LS_ERROR_STATE;
00478 }
00479
00480 Debug("log-sock", "Sending %d bytes to cid %d", bytes, cid);
00481
00482
00483
00484
00485 Debug("log-sock", " sending header (%zu bytes)", sizeof(LogSock::MsgHeader));
00486 header.msg_bytes = bytes;
00487 ret =::send(ct[cid].sd, (char *) &header, sizeof(LogSock::MsgHeader), 0);
00488 if (ret != sizeof(LogSock::MsgHeader)) {
00489 return LogSock::LS_ERROR_WRITE;
00490 }
00491
00492
00493
00494 Debug("log-sock", " sending data (%d bytes)", bytes);
00495 return::send(ct[cid].sd, (char *) buf, bytes, 0);
00496 }
00497
00498
00499
00500
00501
00502
00503
00504
00505 int
00506 LogSock::read(int cid, void *buf, unsigned maxsize)
00507 {
00508 LogSock::MsgHeader header;
00509 unsigned size;
00510
00511 ink_assert(cid >= 0 && cid < m_max_connections);
00512 ink_assert(buf != NULL);
00513
00514 if (ct[cid].state != LogSock::LS_STATE_INCOMING) {
00515 return LogSock::LS_ERROR_STATE;
00516 }
00517
00518 Debug("log-sock", "reading data from cid %d", cid);
00519
00520 if (read_header(ct[cid].sd, &header) < 0) {
00521 return LogSock::LS_ERROR_READ;
00522 }
00523
00524 size = ((unsigned) header.msg_bytes < maxsize) ? (unsigned) header.msg_bytes : maxsize;
00525 return read_body(ct[cid].sd, buf, size);
00526 }
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536 void *
00537 LogSock::read_alloc(int cid, int *size)
00538 {
00539 LogSock::MsgHeader header;
00540 char *data;
00541
00542 ink_assert(cid >= 0 && cid < m_max_connections);
00543
00544 if (ct[cid].state != LogSock::LS_STATE_INCOMING) {
00545 return NULL;
00546 }
00547
00548 Debug("log-sock", "reading data from cid %d", cid);
00549
00550 if (read_header(ct[cid].sd, &header) < 0) {
00551 return NULL;
00552 }
00553
00554 data = new char[header.msg_bytes];
00555 ink_assert(data != NULL);
00556
00557 if ((*size = read_body(ct[cid].sd, data, header.msg_bytes)) < 0) {
00558 delete[] data;
00559 data = NULL;
00560 }
00561
00562 return data;
00563 }
00564
00565
00566
00567 bool LogSock::is_connected(int cid, bool ping) const
00568 {
00569 int
00570 i,
00571 j,
00572 flags;
00573
00574 ink_assert(cid >= 0 && cid < m_max_connections);
00575
00576 if (ct[cid].state == LogSock::LS_STATE_UNUSED) {
00577 return false;
00578 }
00579
00580 if (ping) {
00581 flags = fcntl(ct[cid].sd, F_GETFL);
00582 ::fcntl(ct[cid].sd, F_SETFL, O_NONBLOCK);
00583 j =::recv(ct[cid].sd, (char *) &i, sizeof(int), MSG_PEEK);
00584 ::fcntl(ct[cid].sd, F_SETFL, flags);
00585 if (j != 0) {
00586 return true;
00587 } else {
00588 return false;
00589 }
00590 } else {
00591 return (ct[cid].sd >= 0);
00592 }
00593 }
00594
00595
00596
00597 void
00598 LogSock::check_connections()
00599 {
00600 for (int i = 1; i < m_max_connections; i++) {
00601 if (ct[i].state == LogSock::LS_STATE_INCOMING) {
00602 if (!is_connected(i, true)) {
00603 Debug("log-sock", "Connection %d is no longer connected", i);
00604 close(i);
00605 }
00606 }
00607 }
00608 }
00609
00610
00611
00612
00613
00614
00615 bool LogSock::authorized_client(int cid, char *key)
00616 {
00617
00618
00619
00620 if (!pending_message_on(cid, 5000)) {
00621 return false;
00622 }
00623
00624
00625
00626
00627 char
00628 buf[1024];
00629 int
00630 size = this->read(cid, buf, 1024);
00631 ink_assert(size >= 0 && size <= 1024);
00632
00633 if (strncmp(buf, key, size) == 0) {
00634 return true;
00635 }
00636
00637 return false;
00638 }
00639
00640
00641
00642 char *
00643 LogSock::connected_host(int cid)
00644 {
00645 ink_assert(cid >= 0 && cid < m_max_connections);
00646 return ct[cid].host;
00647 }
00648
00649
00650
00651 int
00652 LogSock::connected_port(int cid)
00653 {
00654 ink_assert(cid >= 0 && cid < m_max_connections);
00655 return ct[cid].port;
00656 }
00657
00658
00659
00660
00661
00662
00663
00664
00665 int
00666 LogSock::new_cid()
00667 {
00668 int cid = -1;
00669
00670 for (int i = 1; i < m_max_connections; i++) {
00671 if (ct[i].state == LogSock::LS_STATE_UNUSED) {
00672 cid = i;
00673 break;
00674 }
00675 }
00676
00677 return cid;
00678 }
00679
00680
00681
00682
00683 void
00684 LogSock::init_cid(int cid, char *host, int port, int sd, LogSock::State state)
00685 {
00686 ink_assert(cid >= 0 && cid < m_max_connections);
00687
00688 ink_assert(port >= 0);
00689
00690 ink_assert(state >= 0 && state < LogSock::LS_N_STATES);
00691
00692 if (host != NULL) {
00693 const size_t host_size = strlen(host) + 1;
00694 ct[cid].host = new char[host_size];
00695 ink_strlcpy(ct[cid].host, host, host_size);
00696 } else {
00697 ct[cid].host = NULL;
00698 }
00699
00700 ct[cid].port = port;
00701 ct[cid].sd = sd;
00702 ct[cid].state = state;
00703 }
00704
00705
00706
00707 int
00708 LogSock::read_header(int sd, LogSock::MsgHeader * header)
00709 {
00710 ink_assert(sd >= 0);
00711 ink_assert(header != NULL);
00712
00713 int bytes =::recv(sd, (char *) header, sizeof(LogSock::MsgHeader), 0);
00714 if (bytes != sizeof(LogSock::MsgHeader)) {
00715 return -1;
00716 }
00717
00718 return bytes;
00719 }
00720
00721
00722
00723 int
00724 LogSock::read_body(int sd, void *buf, int bytes)
00725 {
00726 ink_assert(sd >= 0);
00727 ink_assert(buf != NULL);
00728 ink_assert(bytes >= 0);
00729
00730 if (bytes == 0) {
00731 return 0;
00732 }
00733
00734 unsigned bytes_left = bytes;
00735 unsigned bytes_read;
00736 char *to = (char *) buf;
00737
00738 while (bytes_left) {
00739 bytes_read =::recv(sd, to, bytes_left, 0);
00740 to += bytes_read;
00741 bytes_left -= bytes_read;
00742 }
00743
00744 return bytes;
00745 }