00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #ifndef _P_UnixSocketManager_h_
00033 #define _P_UnixSocketManager_h_
00034
00035 #include "libts.h"
00036 #include "I_SocketManager.h"
00037
00038
00039
00040
00041
00042
00043 #define EPOLL_MAX_DESCRIPTOR_SIZE 32768
00044
00045 TS_INLINE bool
00046 transient_error()
00047 {
00048 bool transient = (errno == EINTR);
00049 #ifdef ENOMEM
00050 transient = transient || (errno == ENOMEM);
00051 #endif
00052 #ifdef ENOBUFS
00053 transient = transient || (errno == ENOBUFS);
00054 #endif
00055 return transient;
00056 }
00057
00058
00059
00060
00061
00062 TS_INLINE int
00063 SocketManager::accept(int s, struct sockaddr *addr, socklen_t *addrlen)
00064 {
00065 int r;
00066 do {
00067 r =::accept(s, addr, addrlen);
00068 if (likely(r >= 0))
00069 break;
00070 r = -errno;
00071 } while (transient_error());
00072
00073 return r;
00074 }
00075
00076 TS_INLINE int
00077 SocketManager::open(const char *path, int oflag, mode_t mode)
00078 {
00079 int s;
00080 do {
00081 s =::open(path, oflag, mode);
00082 if (likely(s >= 0))
00083 break;
00084 s = -errno;
00085 } while (transient_error());
00086 return s;
00087 }
00088
00089 TS_INLINE int64_t
00090 SocketManager::read(int fd, void *buf, int size, void * )
00091 {
00092 int64_t r;
00093 do {
00094 r =::read(fd, buf, size);
00095 if (likely(r >= 0))
00096 break;
00097 r = -errno;
00098 } while (r == -EINTR);
00099 return r;
00100 }
00101
00102 TS_INLINE int64_t
00103 SocketManager::pread(int fd, void *buf, int size, off_t offset, char * )
00104 {
00105 int64_t r;
00106 do {
00107 r =::pread(fd, buf, size, offset);
00108 if (r < 0)
00109 r = -errno;
00110 } while (r == -EINTR);
00111 return r;
00112 }
00113
00114 TS_INLINE int64_t
00115 SocketManager::readv(int fd, struct iovec *vector, size_t count)
00116 {
00117 int64_t r;
00118 do {
00119
00120 if (likely((r =::readv(fd, vector, count)) >= 0))
00121 break;
00122 r = -errno;
00123 } while (transient_error());
00124 return r;
00125 }
00126
00127 TS_INLINE int64_t
00128 SocketManager::vector_io(int fd, struct iovec *vector, size_t count, int read_request, void * )
00129 {
00130 const int max_iovecs_per_request = 16;
00131 int n;
00132 int64_t r = 0;
00133 int n_vec;
00134 int64_t bytes_xfered = 0;
00135 int current_count;
00136 int64_t current_request_bytes;
00137
00138 for (n_vec = 0; n_vec < (int) count; n_vec += max_iovecs_per_request) {
00139 current_count = min(max_iovecs_per_request, ((int) (count - n_vec)));
00140 do {
00141
00142 r = read_request ? ::readv(fd, &vector[n_vec], current_count) : ::writev(fd, &vector[n_vec], current_count);
00143 if (likely(r >= 0))
00144 break;
00145 r = -errno;
00146 } while (transient_error());
00147
00148 if (r <= 0) {
00149 return (bytes_xfered && (r == -EAGAIN)) ? bytes_xfered : r;
00150 }
00151 bytes_xfered += r;
00152
00153 if ((n_vec + max_iovecs_per_request) >= (int) count)
00154 break;
00155
00156
00157 current_request_bytes = 0;
00158 for (n = n_vec; n < (n_vec + current_count); ++n)
00159 current_request_bytes += vector[n].iov_len;
00160
00161
00162 if (r != current_request_bytes)
00163 break;
00164 }
00165 return bytes_xfered;
00166 }
00167
00168 TS_INLINE int64_t
00169 SocketManager::read_vector(int fd, struct iovec *vector, size_t count, void *pOLP)
00170 {
00171 return vector_io(fd, vector, count, 1, pOLP);
00172 }
00173
00174 TS_INLINE int
00175 SocketManager::recv(int fd, void *buf, int size, int flags)
00176 {
00177 int r;
00178 do {
00179 if (unlikely((r =::recv(fd, (char *) buf, size, flags)) < 0)) {
00180 r = -errno;
00181 }
00182 } while (r == -EINTR);
00183 return r;
00184 }
00185
00186 TS_INLINE int
00187 SocketManager::recvfrom(int fd, void *buf, int size, int flags, struct sockaddr *addr, socklen_t *addrlen)
00188 {
00189 int r;
00190 do {
00191 r =::recvfrom(fd, (char *) buf, size, flags, addr, addrlen);
00192 if (unlikely(r < 0))
00193 r = -errno;
00194 } while (r == -EINTR);
00195 return r;
00196 }
00197
00198 TS_INLINE int64_t
00199 SocketManager::write(int fd, void *buf, int size, void * )
00200 {
00201 int64_t r;
00202 do {
00203 if (likely((r =::write(fd, buf, size)) >= 0))
00204 break;
00205 r = -errno;
00206 } while (r == -EINTR);
00207 return r;
00208 }
00209
00210 TS_INLINE int64_t
00211 SocketManager::pwrite(int fd, void *buf, int size, off_t offset, char * )
00212 {
00213 int64_t r;
00214 do {
00215 if (unlikely((r =::pwrite(fd, buf, size, offset)) < 0))
00216 r = -errno;
00217 } while (r == -EINTR);
00218 return r;
00219 }
00220
00221 TS_INLINE int64_t
00222 SocketManager::writev(int fd, struct iovec *vector, size_t count)
00223 {
00224 int64_t r;
00225 do {
00226 if (likely((r =::writev(fd, vector, count)) >= 0))
00227 break;
00228 r = -errno;
00229 } while (transient_error());
00230 return r;
00231 }
00232
00233 TS_INLINE int64_t
00234 SocketManager::write_vector(int fd, struct iovec *vector, size_t count, void *pOLP)
00235 {
00236 return vector_io(fd, vector, count, 0, pOLP);
00237 }
00238
00239
00240 TS_INLINE int
00241 SocketManager::send(int fd, void *buf, int size, int flags)
00242 {
00243 int r;
00244 do {
00245 if (unlikely((r =::send(fd, (char *) buf, size, flags)) < 0))
00246 r = -errno;
00247 } while (r == -EINTR);
00248 return r;
00249 }
00250
00251 TS_INLINE int
00252 SocketManager::sendto(int fd, void *buf, int len, int flags, struct sockaddr const* to, int tolen)
00253 {
00254 int r;
00255 do {
00256 if (unlikely((r =::sendto(fd, (char *) buf, len, flags, to, tolen)) < 0))
00257 r = -errno;
00258 } while (r == -EINTR);
00259 return r;
00260 }
00261
00262 TS_INLINE int
00263 SocketManager::sendmsg(int fd, struct msghdr *m, int flags, void * )
00264 {
00265 int r;
00266 do {
00267 if (unlikely((r =::sendmsg(fd, m, flags)) < 0))
00268 r = -errno;
00269 } while (r == -EINTR);
00270 return r;
00271 }
00272
00273 TS_INLINE int64_t
00274 SocketManager::lseek(int fd, off_t offset, int whence)
00275 {
00276 int64_t r;
00277 do {
00278 if ((r =::lseek(fd, offset, whence)) < 0)
00279 r = -errno;
00280 } while (r == -EINTR);
00281 return r;
00282 }
00283
00284 TS_INLINE int
00285 SocketManager::fstat(int fd, struct stat *buf)
00286 {
00287 int r;
00288 do {
00289 if ((r =::fstat(fd, buf)) >= 0)
00290 break;
00291 r = -errno;
00292 } while (transient_error());
00293 return r;
00294 }
00295
00296 TS_INLINE int
00297 SocketManager::unlink(char *buf)
00298 {
00299 int r;
00300 do {
00301 if ((r =::unlink(buf)) < 0)
00302 r = -errno;
00303 } while (r == -EINTR);
00304 return r;
00305 }
00306
00307 TS_INLINE int
00308 SocketManager::fsync(int fildes)
00309 {
00310 int r;
00311 do {
00312 if ((r =::fsync(fildes)) < 0)
00313 r = -errno;
00314 } while (r == -EINTR);
00315 return r;
00316 }
00317
00318 TS_INLINE int
00319 SocketManager::ftruncate(int fildes, off_t length)
00320 {
00321 int r;
00322 do {
00323 if ((r =::ftruncate(fildes, length)) < 0)
00324 r = -errno;
00325 } while (r == -EINTR);
00326 return r;
00327 }
00328
00329 TS_INLINE int
00330 SocketManager::poll(struct pollfd *fds, unsigned long nfds, int timeout)
00331 {
00332 int r;
00333 do {
00334 if ((r =::poll(fds, nfds, timeout)) >= 0)
00335 break;
00336 r = -errno;
00337 } while (transient_error());
00338 return r;
00339 }
00340
00341 #if TS_USE_EPOLL
00342 TS_INLINE int
00343 SocketManager::epoll_create(int size)
00344 {
00345 int r;
00346 if (size <= 0)
00347 size = EPOLL_MAX_DESCRIPTOR_SIZE;
00348 do {
00349 if (likely((r =::epoll_create(size)) >= 0))
00350 break;
00351 r = -errno;
00352 } while (errno == -EINTR);
00353 return r;
00354 }
00355
00356 TS_INLINE int
00357 SocketManager::epoll_close(int epfd)
00358 {
00359 int r = 0;
00360 if (likely(epfd >= 0)) {
00361 do {
00362 if (likely((r =::close(epfd)) == 0))
00363 break;
00364 r = -errno;
00365 } while (errno == -EINTR);
00366 }
00367 return r;
00368 }
00369
00370 TS_INLINE int
00371 SocketManager::epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
00372 {
00373 int r;
00374 do {
00375 if (likely((r =::epoll_ctl(epfd, op, fd, event)) == 0))
00376 break;
00377 r = -errno;
00378 } while (errno == -EINTR);
00379 return r;
00380 }
00381
00382 TS_INLINE int
00383 SocketManager::epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
00384 {
00385 int r;
00386 do {
00387 if ((r =::epoll_wait(epfd, events, maxevents, timeout)) >= 0)
00388 break;
00389 r = -errno;
00390 } while (errno == -EINTR);
00391 return r;
00392 }
00393
00394 #endif
00395
00396 #if TS_USE_KQUEUE
00397 TS_INLINE int
00398 SocketManager::kqueue()
00399 {
00400 return ::kqueue();
00401 }
00402
00403 TS_INLINE int
00404 SocketManager::kevent(int kq, const struct kevent *changelist, int nchanges,
00405 struct kevent *eventlist, int nevents,
00406 const struct timespec *timeout)
00407 {
00408 int r;
00409 do {
00410 r =::kevent(kq, changelist, nchanges,
00411 eventlist, nevents, timeout);
00412 if (likely(r >= 0)) {
00413 break;
00414 }
00415 r = -errno;
00416 } while (errno == -EINTR);
00417 return r;
00418 }
00419 #endif
00420
00421 #if TS_USE_PORT
00422 TS_INLINE int
00423 SocketManager::port_create()
00424 {
00425 return ::port_create();
00426 }
00427
00428 TS_INLINE int
00429 SocketManager::port_associate(int port, int source, uintptr_t obj,
00430 int events, void *user)
00431 {
00432 int r;
00433 r =::port_associate(port, source, obj, events, user);
00434 if(r < 0)
00435 r = -errno;
00436 return r;
00437 }
00438
00439 TS_INLINE int
00440 SocketManager::port_dissociate(int port, int source, uintptr_t obj)
00441 {
00442 int r;
00443 r =::port_dissociate(port, source, obj);
00444 if(r < 0)
00445 r = -errno;
00446 return r;
00447 }
00448
00449 TS_INLINE int
00450 SocketManager::port_getn(int port, port_event_t *list, uint_t max,
00451 uint_t *nget, timespec_t *timeout)
00452 {
00453 int r;
00454 do {
00455 if ((r =::port_getn(port, list, max, nget, timeout)) >= 0)
00456 break;
00457 r = -errno;
00458 } while (errno == -EINTR);
00459 return r;
00460 }
00461 #endif
00462
00463
00464 TS_INLINE int
00465 SocketManager::get_sndbuf_size(int s)
00466 {
00467 int bsz = 0;
00468 int bszsz, r;
00469
00470 bszsz = sizeof(bsz);
00471 r = safe_getsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *) &bsz, &bszsz);
00472 return (r == 0 ? bsz : r);
00473 }
00474
00475 TS_INLINE int
00476 SocketManager::get_rcvbuf_size(int s)
00477 {
00478 int bsz = 0;
00479 int bszsz, r;
00480
00481 bszsz = sizeof(bsz);
00482 r = safe_getsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *) &bsz, &bszsz);
00483 return (r == 0 ? bsz : r);
00484 }
00485
00486 TS_INLINE int
00487 SocketManager::set_sndbuf_size(int s, int bsz)
00488 {
00489 return safe_setsockopt(s, SOL_SOCKET, SO_SNDBUF, (char *) &bsz, sizeof(bsz));
00490 }
00491
00492 TS_INLINE int
00493 SocketManager::set_rcvbuf_size(int s, int bsz)
00494 {
00495 return safe_setsockopt(s, SOL_SOCKET, SO_RCVBUF, (char *) &bsz, sizeof(bsz));
00496 }
00497
00498 TS_INLINE int
00499 SocketManager::getsockname(int s, struct sockaddr *sa, socklen_t *sz)
00500 {
00501 return::getsockname(s, sa, sz);
00502 }
00503
00504 TS_INLINE int
00505 SocketManager::socket(int domain, int type, int protocol, bool )
00506 {
00507 return::socket(domain, type, protocol);
00508 }
00509
00510 TS_INLINE int
00511 SocketManager::mc_socket(int domain, int type, int protocol, bool bNonBlocking)
00512 {
00513 return SocketManager::socket(domain, type, protocol, bNonBlocking);
00514 }
00515
00516 TS_INLINE int
00517 SocketManager::shutdown(int s, int how)
00518 {
00519 int res;
00520 do {
00521 if (unlikely((res =::shutdown(s, how)) < 0))
00522 res = -errno;
00523 } while (res == -EINTR);
00524 return res;
00525 }
00526
00527 TS_INLINE int
00528 SocketManager::lockf(int s, int f, off_t size)
00529 {
00530 int res;
00531 do {
00532 if ((res =::lockf(s, f, size)) < 0)
00533 res = -errno;
00534 } while (res == -EINTR);
00535 return res;
00536 }
00537
00538 TS_INLINE int
00539 SocketManager::dup(int s)
00540 {
00541 int res;
00542 do {
00543 if ((res =::dup(s)) >= 0)
00544 break;
00545 res = -errno;
00546 } while (res == -EINTR);
00547 return res;
00548 }
00549
00550 #endif