00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "P_EventSystem.h"
00025 #include "Error.h"
00026 
00027 #include "LogSock.h"
00028 #include "LogUtils.h"
00029 
// Socket type and protocol arguments handed to ::socket() by listen() and
// connect() in this file.
namespace
{
const int LS_SOCKTYPE = SOCK_STREAM;
const int LS_PROTOCOL = 0;
}
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039  
00040 LogSock::LogSock(int max_connects)
00041   :
00042 ct((ConnectTable *) NULL),
00043 m_accept_connections(false),
00044 m_max_connections(max_connects + 1)
00045 {
00046   ink_assert(m_max_connections > 0);
00047 
00048   
00049   
00050   
00051   ct = new ConnectTable[m_max_connections];
00052   ink_assert(ct != NULL);
00053   for (int i = 0; i < m_max_connections; ++i) {
00054     init_cid(i, NULL, 0, -1, LogSock::LS_STATE_UNUSED);
00055   }
00056 
00057   Debug("log-sock", "LogSocket established");
00058 }
00059 
00060 
00061 
00062 
00063 
00064  
00065 LogSock::~LogSock()
00066 {
00067   Debug("log-sock", "shutting down LogSocket on [%s:%d]", ct[0].host, ct[0].port);
00068 
00069   this->close();                
00070   this->close(0);               
00071   delete[]ct;                   
00072 }
00073 
00074 
00075 
00076 
00077 
00078 
00079 
00080 
00081 
00082 
00083 int
00084 LogSock::listen(int accept_port, int family)
00085 {
00086   IpEndpoint bind_addr;
00087   int size = sizeof(bind_addr);
00088   char this_host[MAXDNAME];
00089   int ret;
00090   ats_scoped_fd accept_sd;
00091 
00092   Debug("log-sock", "Listening ...");
00093 
00094   
00095   bind_addr.setToAnyAddr(family);
00096   if (!bind_addr.isValid()) {
00097     Warning("Could not set up socket - invalid address family %d", family);
00098     return -1;
00099   }
00100   bind_addr.port() = htons(accept_port);
00101   size = ats_ip_size(&bind_addr.sa);
00102 
00103   
00104   
00105   
00106   accept_sd =::socket(family, LS_SOCKTYPE, LS_PROTOCOL);
00107   if (accept_sd < 0) {
00108     Warning("Could not create a socket for family %d: %s", family, strerror(errno));
00109     return -1;
00110   }
00111   
00112   
00113   
00114   
00115   if ((ret = safe_fcntl(accept_sd, F_SETFD, 1)) < 0) {
00116     Warning("Could not set option CLOSE ON EXEC on socket (%d): %s", ret, strerror(errno));
00117     return -1;
00118   }
00119   
00120   struct linger l;
00121   l.l_onoff = 0;
00122   l.l_linger = 0;
00123   if ((ret = safe_setsockopt(accept_sd, SOL_SOCKET, SO_LINGER, (char *) &l, sizeof(l))) < 0) {
00124     Warning("Could not set option NO_LINGER on socket (%d): %s", ret, strerror(errno));
00125     return -1;
00126   }
00127   
00128   if ((ret = safe_setsockopt(accept_sd, SOL_SOCKET, SO_REUSEADDR, SOCKOPT_ON, sizeof(int))) < 0) {
00129     Warning("Could not set option REUSEADDR on socket (%d): %s", ret, strerror(errno));
00130     return -1;
00131   }
00132 
00133   
00134   if ((ret = safe_bind(accept_sd, &bind_addr.sa, size)) < 0) {
00135     Warning("Could not bind port: %s", strerror(errno));
00136     return -1;
00137   }
00138 
00139   if ((ret = safe_setsockopt(accept_sd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int))) < 0) {
00140     Warning("Could not set option TCP_NODELAY on socket (%d): %s", ret, strerror(errno));
00141     return -1;
00142   }
00143 
00144   if ((ret = safe_setsockopt(accept_sd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int))) < 0) {
00145     Warning("Could not set option SO_KEEPALIVE on socket (%d): %s", ret, strerror(errno));
00146     return -1;
00147   }
00148 
00149   
00150   
00151   
00152   
00153   
00154   if (accept_port == 0) {
00155     ret = safe_getsockname(accept_sd, &bind_addr.sa, &size);
00156     if (ret == 0) {
00157       accept_port = ntohs(bind_addr.port());
00158     }
00159   }
00160   
00161   
00162   
00163   if ((ret = safe_listen(accept_sd, m_max_connections)) < 0) {
00164     Warning("Could not establish listen queue: %s", strerror(errno));
00165     return -1;
00166   }
00167   
00168   
00169   
00170   
00171   if (gethostname(&this_host[0], MAXDNAME) != 0) {
00172     snprintf(this_host, sizeof(this_host), "unknown-host");
00173   }
00174   init_cid(0, this_host, accept_port, accept_sd, LogSock::LS_STATE_INCOMING);
00175 
00176   m_accept_connections = true;
00177   Debug("log-sock", "LogSocket established on [%s:%d]", this_host, accept_port);
00178 
00179   accept_sd.release();
00180   return 0;
00181 }
00182 
00183 
00184 
00185 
00186 
00187 
00188 
00189 
00190 
00191 int
00192 LogSock::accept()
00193 {
00194   int cid, connect_sd;
00195   IpEndpoint connect_addr;
00196   socklen_t size = sizeof(connect_addr);
00197   in_port_t connect_port;
00198 
00199   if (!m_accept_connections || ct[0].sd < 0) {
00200     return LogSock::LS_ERROR_NO_CONNECTION;
00201   }
00202 
00203   cid = new_cid();
00204   if (cid < 0) {
00205     return LogSock::LS_ERROR_CONNECT_TABLE_FULL;
00206   }
00207 
00208   Debug("log-sock", "waiting to accept a new connection");
00209 
00210   connect_sd =::accept(ct[0].sd, &connect_addr.sa, &size);
00211   if (connect_sd < 0) {
00212     return LogSock::LS_ERROR_ACCEPT;
00213   }
00214   connect_port = ntohs(connect_addr.port());
00215 
00216   init_cid(cid, NULL, connect_port, connect_sd, LogSock::LS_STATE_INCOMING);
00217 
00218   Debug("log-sock", "new connection accepted, cid = %d, port = %d", cid, connect_port);
00219 
00220   return cid;
00221 }
00222 
00223 
00224 
00225 
00226 
00227 
00228   
00229 int
00230 LogSock::connect(sockaddr const* ip)
00231 {
00232   int cid, ret;
00233   ats_scoped_fd connect_sd;
00234   uint16_t port;
00235 
00236   if (!ats_is_ip(ip)) {
00237     Note("Invalid host IP or port number for connection");
00238     return LogSock::LS_ERROR_NO_SUCH_HOST;
00239   }
00240   port = ntohs(ats_ip_port_cast(ip));
00241 
00242   ip_port_text_buffer ipstr;
00243   Debug("log-sock", "connecting to [%s:%d]", ats_ip_nptop(ip, ipstr, sizeof(ipstr)), port);
00244 
00245   
00246   cid = new_cid();
00247   if (cid < 0) {
00248     Note("No more connections allowed for this socket");
00249     return LogSock::LS_ERROR_CONNECT_TABLE_FULL;
00250   }
00251   
00252   connect_sd =::socket(ip->sa_family, LS_SOCKTYPE, LS_PROTOCOL);
00253   if (connect_sd < 0) {
00254     Note("Error initializing socket for connection: %d", static_cast<int>(connect_sd));
00255     return LogSock::LS_ERROR_SOCKET;
00256   }
00257 
00258   if ((ret = safe_setsockopt(connect_sd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int))) < 0) {
00259     Note("Could not set option TCP_NODELAY on socket (%d): %s", ret, strerror(errno));
00260     return -1;
00261   }
00262 
00263   if ((ret = safe_setsockopt(connect_sd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int))) < 0) {
00264     Note("Could not set option SO_KEEPALIVE on socket (%d): %s", ret, strerror(errno));
00265     return -1;
00266   }
00267 
00268   
00269   if (::connect(connect_sd, ip, ats_ip_size(ip)) != 0) {
00270     Note("Failure to connect");
00271     return LogSock::LS_ERROR_CONNECT;
00272   }
00273 
00274   init_cid(cid, ipstr, port, connect_sd, LogSock::LS_STATE_OUTGOING);
00275 
00276   Debug("log-sock", "outgoing connection to [%s:%d] established, fd  = %d", ipstr, port, cid);
00277 
00278   connect_sd.release();
00279   return cid;
00280 }
00281 
00282 
00283 
00284 
00285 
00286 
00287 
00288 
00289  
00290 bool LogSock::pending_data(int *cid, int timeout_msec, bool include_connects)
00291 {
00292   int
00293     start_index,
00294     ret,
00295     n_poll_fds,
00296     i;
00297   static struct pollfd
00298     fds[LS_CONST_CLUSTER_MAX_MACHINES];
00299   int
00300     fd_to_cid[LS_CONST_CLUSTER_MAX_MACHINES];
00301 
00302   ink_assert(m_max_connections <= (LS_CONST_CLUSTER_MAX_MACHINES + 1));
00303   ink_assert(cid != NULL);
00304   ink_assert(timeout_msec >= 0);
00305 
00306   
00307   
00308   
00309   
00310   
00311   
00312 
00313   if (*cid >= 0) {              
00314 
00315     ink_assert(*cid < m_max_connections);
00316     fds[0].fd = ct[*cid].sd;
00317     fds[0].events = POLLIN;
00318     fds[0].revents = 0;
00319     fd_to_cid[0] = *cid;
00320     n_poll_fds = 1;
00321 
00322   } else {                      
00323 
00324     if (include_connects) {
00325       start_index = 0;
00326     } else {
00327       start_index = 1;
00328     }
00329     n_poll_fds = 0;
00330     for (i = start_index; i < m_max_connections; i++) {
00331       if (ct[i].state == LogSock::LS_STATE_INCOMING) {
00332         fds[n_poll_fds].fd = ct[i].sd;
00333         fds[n_poll_fds].events = POLLIN;
00334         fds[n_poll_fds].revents = 0;
00335         fd_to_cid[n_poll_fds] = i;
00336         n_poll_fds++;
00337       }
00338     }
00339   }
00340 
00341   if (n_poll_fds == 0) {
00342     return false;
00343   }
00344 
00345   ret =::poll(fds, n_poll_fds, timeout_msec);
00346 
00347   if (ret == 0) {
00348     return false;               
00349   } else if (ret < 0) {
00350     Debug("log-sock", "error on poll");
00351     return false;               
00352   }
00353   
00354   
00355   
00356   
00357   
00358 
00359   for (i = 0; i < n_poll_fds; i++) {
00360     if (fds[i].revents & POLLIN) {
00361       *cid = fd_to_cid[i];
00362       Debug("log-sock", "poll successful on index %d", *cid);
00363       return true;
00364     }
00365   }
00366 
00367   Debug("log-sock", "invalid revents in the poll table");
00368   return false;
00369 }
00370 
00371 
00372 
00373 
00374 
00375 
00376 bool LogSock::pending_any(int *cid, int timeout_msec)
00377 {
00378   ink_assert(cid != NULL);
00379   *cid = -1;
00380   if (m_accept_connections) {
00381     return pending_data(cid, timeout_msec, true);
00382   } else {
00383     return pending_data(cid, timeout_msec, false);
00384   }
00385 }
00386 
00387 
00388 
00389 
00390 
00391 
00392 
00393 
00394 bool LogSock::pending_message_any(int *cid, int timeout_msec)
00395 {
00396   ink_assert(cid != NULL);
00397   *cid = -1;
00398   return pending_data(cid, timeout_msec, false);
00399 }
00400 
00401 
00402 
00403 
00404 
00405 
00406 bool LogSock::pending_message_on(int cid, int timeout_msec)
00407 {
00408   return pending_data(&cid, timeout_msec, false);
00409 }
00410 
00411 
00412 
00413 
00414 
00415 
00416 
00417 bool LogSock::pending_connect(int timeout_msec)
00418 {
00419   int
00420     cid = 0;
00421   if (m_accept_connections) {
00422     return pending_data(&cid, timeout_msec, true);
00423   } else {
00424     return false;
00425   }
00426 }
00427 
00428 
00429 
00430 
00431 
00432 
00433 
00434 void
00435 LogSock::close(int cid)
00436 {
00437   ink_assert(cid >= 0 && cid < m_max_connections);
00438 
00439   Debug("log-sock", "closing connection for cid %d", cid);
00440 
00441   if (ct[cid].state != LogSock::LS_STATE_UNUSED) {
00442     ::close(ct[cid].sd);
00443     delete ct[cid].host;
00444     ct[cid].state = LogSock::LS_STATE_UNUSED;
00445   }
00446 }
00447 
00448 void
00449 LogSock::close()
00450 {
00451   for (int i = 1; i < m_max_connections; i++) {
00452     this->close(i);
00453   }
00454 }
00455 
00456 
00457 
00458 
00459 
00460 
00461 
00462 int
00463 LogSock::write(int cid, void *buf, int bytes)
00464 {
00465   LogSock::MsgHeader header = {
00466   0};
00467   header.msg_bytes = 0;
00468   int ret;
00469 
00470   ink_assert(cid >= 0 && cid < m_max_connections);
00471 
00472   if (buf == NULL || bytes == 0) {
00473     return 0;
00474   }
00475 
00476   if (ct[cid].state != LogSock::LS_STATE_OUTGOING) {
00477     return LogSock::LS_ERROR_STATE;
00478   }
00479 
00480   Debug("log-sock", "Sending %d bytes to cid %d", bytes, cid);
00481 
00482   
00483   
00484   
00485   Debug("log-sock", "   sending header (%zu bytes)", sizeof(LogSock::MsgHeader));
00486   header.msg_bytes = bytes;
00487   ret =::send(ct[cid].sd, (char *) &header, sizeof(LogSock::MsgHeader), 0);
00488   if (ret != sizeof(LogSock::MsgHeader)) {
00489     return LogSock::LS_ERROR_WRITE;
00490   }
00491   
00492   
00493   
00494   Debug("log-sock", "   sending data (%d bytes)", bytes);
00495   return::send(ct[cid].sd, (char *) buf, bytes, 0);
00496 }
00497 
00498 
00499 
00500 
00501 
00502 
00503 
00504 
00505 int
00506 LogSock::read(int cid, void *buf, unsigned maxsize)
00507 {
00508   LogSock::MsgHeader header;
00509   unsigned size;
00510 
00511   ink_assert(cid >= 0 && cid < m_max_connections);
00512   ink_assert(buf != NULL);
00513 
00514   if (ct[cid].state != LogSock::LS_STATE_INCOMING) {
00515     return LogSock::LS_ERROR_STATE;
00516   }
00517 
00518   Debug("log-sock", "reading data from cid %d", cid);
00519 
00520   if (read_header(ct[cid].sd, &header) < 0) {
00521     return LogSock::LS_ERROR_READ;
00522   }
00523 
00524   size = ((unsigned) header.msg_bytes < maxsize) ? (unsigned) header.msg_bytes : maxsize;
00525   return read_body(ct[cid].sd, buf, size);
00526 }
00527 
00528 
00529 
00530 
00531 
00532 
00533 
00534 
00535 
00536 void *
00537 LogSock::read_alloc(int cid, int *size)
00538 {
00539   LogSock::MsgHeader header;
00540   char *data;
00541 
00542   ink_assert(cid >= 0 && cid < m_max_connections);
00543 
00544   if (ct[cid].state != LogSock::LS_STATE_INCOMING) {
00545     return NULL;
00546   }
00547 
00548   Debug("log-sock", "reading data from cid %d", cid);
00549 
00550   if (read_header(ct[cid].sd, &header) < 0) {
00551     return NULL;
00552   }
00553 
00554   data = new char[header.msg_bytes];
00555   ink_assert(data != NULL);
00556 
00557   if ((*size = read_body(ct[cid].sd, data, header.msg_bytes)) < 0) {
00558     delete[] data;
00559     data = NULL;
00560   }
00561 
00562   return data;
00563 }
00564 
00565 
00566 
00567 bool LogSock::is_connected(int cid, bool ping) const
00568 {
00569   int
00570     i,
00571     j,
00572     flags;
00573 
00574   ink_assert(cid >= 0 && cid < m_max_connections);
00575 
00576   if (ct[cid].state == LogSock::LS_STATE_UNUSED) {
00577     return false;
00578   }
00579 
00580   if (ping) {
00581     flags = fcntl(ct[cid].sd, F_GETFL);
00582     ::fcntl(ct[cid].sd, F_SETFL, O_NONBLOCK);
00583     j =::recv(ct[cid].sd, (char *) &i, sizeof(int), MSG_PEEK);
00584     ::fcntl(ct[cid].sd, F_SETFL, flags);
00585     if (j != 0) {
00586       return true;
00587     } else {
00588       return false;
00589     }
00590   } else {
00591     return (ct[cid].sd >= 0);
00592   }
00593 }
00594 
00595 
00596 
00597 void
00598 LogSock::check_connections()
00599 {
00600   for (int i = 1; i < m_max_connections; i++) {
00601     if (ct[i].state == LogSock::LS_STATE_INCOMING) {
00602       if (!is_connected(i, true)) {
00603         Debug("log-sock", "Connection %d is no longer connected", i);
00604         close(i);
00605       }
00606     }
00607   }
00608 }
00609 
00610 
00611 
00612 
00613 
00614 
00615 bool LogSock::authorized_client(int cid, char *key)
00616 {
00617   
00618   
00619   
00620   if (!pending_message_on(cid, 5000)) {
00621     return false;
00622   }
00623   
00624   
00625   
00626   
00627   char
00628     buf[1024];
00629   int
00630     size = this->read(cid, buf, 1024);
00631   ink_assert(size >= 0 && size <= 1024);
00632 
00633   if (strncmp(buf, key, size) == 0) {
00634     return true;
00635   }
00636 
00637   return false;
00638 }
00639 
00640 
00641 
00642 char *
00643 LogSock::connected_host(int cid)
00644 {
00645   ink_assert(cid >= 0 && cid < m_max_connections);
00646   return ct[cid].host;
00647 }
00648 
00649 
00650 
00651 int
00652 LogSock::connected_port(int cid)
00653 {
00654   ink_assert(cid >= 0 && cid < m_max_connections);
00655   return ct[cid].port;
00656 }
00657 
00658 
00659 
00660 
00661 
00662 
00663 
00664 
00665 int
00666 LogSock::new_cid()
00667 {
00668   int cid = -1;
00669 
00670   for (int i = 1; i < m_max_connections; i++) {
00671     if (ct[i].state == LogSock::LS_STATE_UNUSED) {
00672       cid = i;
00673       break;
00674     }
00675   }
00676 
00677   return cid;
00678 }
00679 
00680 
00681 
00682 
00683 void
00684 LogSock::init_cid(int cid, char *host, int port, int sd, LogSock::State state)
00685 {
00686   ink_assert(cid >= 0 && cid < m_max_connections);
00687   
00688   ink_assert(port >= 0);
00689   
00690   ink_assert(state >= 0 && state < LogSock::LS_N_STATES);
00691 
00692   if (host != NULL) {
00693     const size_t host_size = strlen(host) + 1;
00694     ct[cid].host = new char[host_size];
00695     ink_strlcpy(ct[cid].host, host, host_size);
00696   } else {
00697     ct[cid].host = NULL;
00698   }
00699 
00700   ct[cid].port = port;
00701   ct[cid].sd = sd;
00702   ct[cid].state = state;
00703 }
00704 
00705 
00706 
00707 int
00708 LogSock::read_header(int sd, LogSock::MsgHeader * header)
00709 {
00710   ink_assert(sd >= 0);
00711   ink_assert(header != NULL);
00712 
00713   int bytes =::recv(sd, (char *) header, sizeof(LogSock::MsgHeader), 0);
00714   if (bytes != sizeof(LogSock::MsgHeader)) {
00715     return -1;
00716   }
00717 
00718   return bytes;
00719 }
00720 
00721 
00722 
00723 int
00724 LogSock::read_body(int sd, void *buf, int bytes)
00725 {
00726   ink_assert(sd >= 0);
00727   ink_assert(buf != NULL);
00728   ink_assert(bytes >= 0);
00729 
00730   if (bytes == 0) {
00731     return 0;
00732   }
00733 
00734   unsigned bytes_left = bytes;
00735   unsigned bytes_read;
00736   char *to = (char *) buf;
00737 
00738   while (bytes_left) {
00739     bytes_read =::recv(sd, to, bytes_left, 0);
00740     to += bytes_read;
00741     bytes_left -= bytes_read;
00742   }
00743 
00744   return bytes;
00745 }