00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "P_EventSystem.h"
00030
00031 #if HAVE_EVENTFD
00032 #include <sys/eventfd.h>
00033 #endif
00034
00035 struct AIOCallback;
00036
00037 #define MAX_HEARTBEATS_MISSED 10
00038 #define NO_HEARTBEAT -1
00039 #define THREAD_MAX_HEARTBEAT_MSECONDS 60
00040 #define NO_ETHREAD_ID -1
00041
00042 EThread::EThread()
00043 : generator((uint64_t)ink_get_hrtime_internal() ^ (uint64_t)(uintptr_t)this),
00044 ethreads_to_be_signalled(NULL),
00045 n_ethreads_to_be_signalled(0),
00046 main_accept_index(-1),
00047 id(NO_ETHREAD_ID), event_types(0),
00048 signal_hook(0),
00049 tt(REGULAR)
00050 {
00051 memset(thread_private, 0, PER_THREAD_DATA);
00052 }
00053
00054 EThread::EThread(ThreadType att, int anid)
00055 : generator((uint64_t)ink_get_hrtime_internal() ^ (uint64_t)(uintptr_t)this),
00056 ethreads_to_be_signalled(NULL),
00057 n_ethreads_to_be_signalled(0),
00058 main_accept_index(-1),
00059 id(anid),
00060 event_types(0),
00061 signal_hook(0),
00062 tt(att),
00063 server_session_pool(NULL)
00064 {
00065 ethreads_to_be_signalled = (EThread **)ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *));
00066 memset((char *) ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
00067 memset(thread_private, 0, PER_THREAD_DATA);
00068 #if HAVE_EVENTFD
00069 evfd = eventfd(0, O_NONBLOCK | FD_CLOEXEC);
00070 if (evfd < 0) {
00071 if (errno == EINVAL) {
00072 evfd = eventfd(0,0);
00073 if (evfd < 0)
00074 Fatal("EThread::EThread: %d=eventfd(0,0),errno(%d)",evfd,errno);
00075 } else
00076 Fatal("EThread::EThread: %d=eventfd(0,O_NONBLOCK | FD_CLOEXEC),errno(%d)",evfd,errno);
00077 }
00078 fcntl(evfd, F_SETFD, FD_CLOEXEC);
00079 fcntl(evfd, F_SETFL, O_NONBLOCK);
00080 #elif TS_USE_PORT
00081
00082
00083
00084 #else
00085 ink_release_assert(pipe(evpipe) >= 0);
00086 fcntl(evpipe[0], F_SETFD, FD_CLOEXEC);
00087 fcntl(evpipe[0], F_SETFL, O_NONBLOCK);
00088 fcntl(evpipe[1], F_SETFD, FD_CLOEXEC);
00089 fcntl(evpipe[1], F_SETFL, O_NONBLOCK);
00090 #endif
00091 }
00092
00093 EThread::EThread(ThreadType att, Event * e)
00094 : generator((uint32_t)((uintptr_t)time(NULL) ^ (uintptr_t) this)),
00095 ethreads_to_be_signalled(NULL),
00096 n_ethreads_to_be_signalled(0),
00097 main_accept_index(-1),
00098 id(NO_ETHREAD_ID), event_types(0),
00099 signal_hook(0),
00100 tt(att), oneevent(e)
00101 {
00102 ink_assert(att == DEDICATED);
00103 memset(thread_private, 0, PER_THREAD_DATA);
00104 }
00105
00106
00107
00108
00109 EThread::~EThread()
00110 {
00111 if (n_ethreads_to_be_signalled > 0)
00112 flush_signals(this);
00113 ats_free(ethreads_to_be_signalled);
00114
00115
00116 }
00117
00118 bool
00119 EThread::is_event_type(EventType et)
00120 {
00121 return !!(event_types & (1 << (int) et));
00122 }
00123
00124 void
00125 EThread::set_event_type(EventType et)
00126 {
00127 event_types |= (1 << (int) et);
00128 }
00129
00130 void
00131 EThread::process_event(Event * e, int calling_code)
00132 {
00133 ink_assert((!e->in_the_prot_queue && !e->in_the_priority_queue));
00134 MUTEX_TRY_LOCK_FOR(lock, e->mutex.m_ptr, this, e->continuation);
00135 if (!lock) {
00136 e->timeout_at = cur_time + DELAY_FOR_RETRY;
00137 EventQueueExternal.enqueue_local(e);
00138 } else {
00139 if (e->cancelled) {
00140 free_event(e);
00141 return;
00142 }
00143 Continuation *c_temp = e->continuation;
00144 e->continuation->handleEvent(calling_code, e);
00145 ink_assert(!e->in_the_priority_queue);
00146 ink_assert(c_temp == e->continuation);
00147 MUTEX_RELEASE(lock);
00148 if (e->period) {
00149 if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
00150 if (e->period < 0)
00151 e->timeout_at = e->period;
00152 else {
00153 cur_time = ink_get_based_hrtime();
00154 e->timeout_at = cur_time + e->period;
00155 if (e->timeout_at < cur_time)
00156 e->timeout_at = cur_time;
00157 }
00158 EventQueueExternal.enqueue_local(e);
00159 }
00160 } else if (!e->in_the_prot_queue && !e->in_the_priority_queue)
00161 free_event(e);
00162 }
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176 void
00177 EThread::execute() {
00178 switch (tt) {
00179
00180 case REGULAR: {
00181 Event *e;
00182 Que(Event, link) NegativeQueue;
00183 ink_hrtime next_time = 0;
00184
00185
00186 for (;;) {
00187
00188
00189 cur_time = ink_get_based_hrtime_internal();
00190 while ((e = EventQueueExternal.dequeue_local())) {
00191 if (e->cancelled)
00192 free_event(e);
00193 else if (!e->timeout_at) {
00194 ink_assert(e->period == 0);
00195 process_event(e, e->callback_event);
00196 } else if (e->timeout_at > 0)
00197 EventQueue.enqueue(e, cur_time);
00198 else {
00199 Event *p = NULL;
00200 Event *a = NegativeQueue.head;
00201 while (a && a->timeout_at > e->timeout_at) {
00202 p = a;
00203 a = a->link.next;
00204 }
00205 if (!a)
00206 NegativeQueue.enqueue(e);
00207 else
00208 NegativeQueue.insert(e, p);
00209 }
00210 }
00211 bool done_one;
00212 do {
00213 done_one = false;
00214
00215 EventQueue.check_ready(cur_time, this);
00216 while ((e = EventQueue.dequeue_ready(cur_time))) {
00217 ink_assert(e);
00218 ink_assert(e->timeout_at > 0);
00219 if (e->cancelled)
00220 free_event(e);
00221 else {
00222 done_one = true;
00223 process_event(e, e->callback_event);
00224 }
00225 }
00226 } while (done_one);
00227
00228 if (NegativeQueue.head) {
00229 if (n_ethreads_to_be_signalled)
00230 flush_signals(this);
00231
00232
00233
00234 if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
00235 EventQueueExternal.dequeue_timed(cur_time, next_time, false);
00236 while ((e = EventQueueExternal.dequeue_local())) {
00237 if (!e->timeout_at)
00238 process_event(e, e->callback_event);
00239 else {
00240 if (e->cancelled)
00241 free_event(e);
00242 else {
00243
00244
00245
00246
00247
00248
00249
00250 if (e->timeout_at < 0) {
00251 Event *p = NULL;
00252 Event *a = NegativeQueue.head;
00253 while (a && a->timeout_at > e->timeout_at) {
00254 p = a;
00255 a = a->link.next;
00256 }
00257 if (!a)
00258 NegativeQueue.enqueue(e);
00259 else
00260 NegativeQueue.insert(e, p);
00261 } else
00262 EventQueue.enqueue(e, cur_time);
00263 }
00264 }
00265 }
00266
00267 while ((e = NegativeQueue.dequeue()))
00268 process_event(e, EVENT_POLL);
00269 if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
00270 EventQueueExternal.dequeue_timed(cur_time, next_time, false);
00271 } else {
00272 next_time = EventQueue.earliest_timeout();
00273 ink_hrtime sleep_time = next_time - cur_time;
00274
00275 if (sleep_time > THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND) {
00276 next_time = cur_time + THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
00277 }
00278
00279
00280
00281 if (n_ethreads_to_be_signalled)
00282 flush_signals(this);
00283 EventQueueExternal.dequeue_timed(cur_time, next_time, true);
00284 }
00285 }
00286 }
00287
00288 case DEDICATED: {
00289
00290 MUTEX_TAKE_LOCK_FOR(oneevent->mutex, this, oneevent->continuation);
00291 oneevent->continuation->handleEvent(EVENT_IMMEDIATE, oneevent);
00292 MUTEX_UNTAKE_LOCK(oneevent->mutex, this);
00293 free_event(oneevent);
00294 break;
00295 }
00296
00297 default:
00298 ink_assert(!"bad case value (execute)");
00299 break;
00300 }
00301
00302 }