00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "P_EventSystem.h"
00030 
00031 #if HAVE_EVENTFD
00032 #include <sys/eventfd.h>
00033 #endif
00034 
00035 struct AIOCallback;
00036 
00037 #define MAX_HEARTBEATS_MISSED           10
00038 #define NO_HEARTBEAT                    -1
00039 #define THREAD_MAX_HEARTBEAT_MSECONDS   60
00040 #define NO_ETHREAD_ID                   -1
00041 
00042 EThread::EThread()
00043   : generator((uint64_t)ink_get_hrtime_internal() ^ (uint64_t)(uintptr_t)this),
00044    ethreads_to_be_signalled(NULL),
00045    n_ethreads_to_be_signalled(0),
00046    main_accept_index(-1),
00047    id(NO_ETHREAD_ID), event_types(0),
00048    signal_hook(0),
00049    tt(REGULAR)
00050 {
00051   memset(thread_private, 0, PER_THREAD_DATA);
00052 }
00053 
00054 EThread::EThread(ThreadType att, int anid)
00055   : generator((uint64_t)ink_get_hrtime_internal() ^ (uint64_t)(uintptr_t)this),
00056     ethreads_to_be_signalled(NULL),
00057     n_ethreads_to_be_signalled(0),
00058     main_accept_index(-1),
00059     id(anid),
00060     event_types(0),
00061     signal_hook(0),
00062     tt(att),
00063     server_session_pool(NULL)
00064 {
00065   ethreads_to_be_signalled = (EThread **)ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *));
00066   memset((char *) ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
00067   memset(thread_private, 0, PER_THREAD_DATA);
00068 #if HAVE_EVENTFD
00069   evfd = eventfd(0, O_NONBLOCK | FD_CLOEXEC);
00070   if (evfd < 0) {
00071     if (errno == EINVAL) { 
00072       evfd = eventfd(0,0);
00073       if (evfd < 0)
00074         Fatal("EThread::EThread: %d=eventfd(0,0),errno(%d)",evfd,errno);
00075     } else
00076       Fatal("EThread::EThread: %d=eventfd(0,O_NONBLOCK | FD_CLOEXEC),errno(%d)",evfd,errno);
00077   }
00078   fcntl(evfd, F_SETFD, FD_CLOEXEC);
00079   fcntl(evfd, F_SETFL, O_NONBLOCK);
00080 #elif TS_USE_PORT
00081   
00082 
00083 
00084 #else
00085   ink_release_assert(pipe(evpipe) >= 0);
00086   fcntl(evpipe[0], F_SETFD, FD_CLOEXEC);
00087   fcntl(evpipe[0], F_SETFL, O_NONBLOCK);
00088   fcntl(evpipe[1], F_SETFD, FD_CLOEXEC);
00089   fcntl(evpipe[1], F_SETFL, O_NONBLOCK);
00090 #endif
00091 }
00092 
00093 EThread::EThread(ThreadType att, Event * e)
00094  : generator((uint32_t)((uintptr_t)time(NULL) ^ (uintptr_t) this)),
00095    ethreads_to_be_signalled(NULL),
00096    n_ethreads_to_be_signalled(0),
00097    main_accept_index(-1),
00098    id(NO_ETHREAD_ID), event_types(0),
00099    signal_hook(0),
00100    tt(att), oneevent(e)
00101 {
00102   ink_assert(att == DEDICATED);
00103   memset(thread_private, 0, PER_THREAD_DATA);
00104 }
00105 
00106 
00107 
00108 
00109 EThread::~EThread()
00110 {
00111   if (n_ethreads_to_be_signalled > 0)
00112     flush_signals(this);
00113   ats_free(ethreads_to_be_signalled);
00114   
00115   
00116 }
00117 
00118 bool
00119 EThread::is_event_type(EventType et)
00120 {
00121   return !!(event_types & (1 << (int) et));
00122 }
00123 
00124 void
00125 EThread::set_event_type(EventType et)
00126 {
00127   event_types |= (1 << (int) et);
00128 }
00129 
00130 void
00131 EThread::process_event(Event * e, int calling_code)
00132 {
00133   ink_assert((!e->in_the_prot_queue && !e->in_the_priority_queue));
00134   MUTEX_TRY_LOCK_FOR(lock, e->mutex.m_ptr, this, e->continuation);
00135   if (!lock) {
00136     e->timeout_at = cur_time + DELAY_FOR_RETRY;
00137     EventQueueExternal.enqueue_local(e);
00138   } else {
00139     if (e->cancelled) {
00140       free_event(e);
00141       return;
00142     }
00143     Continuation *c_temp = e->continuation;
00144     e->continuation->handleEvent(calling_code, e);
00145     ink_assert(!e->in_the_priority_queue);
00146     ink_assert(c_temp == e->continuation);
00147     MUTEX_RELEASE(lock);
00148     if (e->period) {
00149       if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
00150         if (e->period < 0)
00151           e->timeout_at = e->period;
00152         else {
00153           cur_time = ink_get_based_hrtime();
00154           e->timeout_at = cur_time + e->period;
00155           if (e->timeout_at < cur_time)
00156             e->timeout_at = cur_time;
00157         }
00158         EventQueueExternal.enqueue_local(e);
00159       }
00160     } else if (!e->in_the_prot_queue && !e->in_the_priority_queue)
00161       free_event(e);
00162   }
00163 }
00164 
00165 
00166 
00167 
00168 
00169 
00170 
00171 
00172 
00173 
00174 
00175 
00176 void
00177 EThread::execute() {
00178   switch (tt) {
00179 
00180     case REGULAR: {
00181       Event *e;
00182       Que(Event, link) NegativeQueue;
00183       ink_hrtime next_time = 0;
00184 
00185       
00186       for (;;) {
00187         
00188         
00189         cur_time = ink_get_based_hrtime_internal();
00190         while ((e = EventQueueExternal.dequeue_local())) {
00191           if (e->cancelled)
00192              free_event(e);
00193           else if (!e->timeout_at) { 
00194             ink_assert(e->period == 0);
00195             process_event(e, e->callback_event);
00196           } else if (e->timeout_at > 0) 
00197             EventQueue.enqueue(e, cur_time);
00198           else { 
00199             Event *p = NULL;
00200             Event *a = NegativeQueue.head;
00201             while (a && a->timeout_at > e->timeout_at) {
00202               p = a;
00203               a = a->link.next;
00204             }
00205             if (!a)
00206               NegativeQueue.enqueue(e);
00207             else
00208               NegativeQueue.insert(e, p);
00209           }
00210         }
00211         bool done_one;
00212         do {
00213           done_one = false;
00214           
00215           EventQueue.check_ready(cur_time, this);
00216           while ((e = EventQueue.dequeue_ready(cur_time))) {
00217             ink_assert(e);
00218             ink_assert(e->timeout_at > 0);
00219             if (e->cancelled)
00220               free_event(e);
00221             else {
00222               done_one = true;
00223               process_event(e, e->callback_event);
00224             }
00225           }
00226         } while (done_one);
00227         
00228         if (NegativeQueue.head) {
00229           if (n_ethreads_to_be_signalled)
00230             flush_signals(this);
00231           
00232           
00233           
00234           if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
00235             EventQueueExternal.dequeue_timed(cur_time, next_time, false);
00236           while ((e = EventQueueExternal.dequeue_local())) {
00237             if (!e->timeout_at)
00238               process_event(e, e->callback_event);
00239             else {
00240               if (e->cancelled)
00241                 free_event(e);
00242               else {
00243                 
00244                 
00245                 
00246                 
00247                 
00248                 
00249                 
00250                 if (e->timeout_at < 0) {
00251                   Event *p = NULL;
00252                   Event *a = NegativeQueue.head;
00253                   while (a && a->timeout_at > e->timeout_at) {
00254                     p = a;
00255                     a = a->link.next;
00256                   }
00257                   if (!a)
00258                     NegativeQueue.enqueue(e);
00259                   else
00260                     NegativeQueue.insert(e, p);
00261                 } else
00262                   EventQueue.enqueue(e, cur_time);
00263               }
00264             }
00265           }
00266           
00267           while ((e = NegativeQueue.dequeue()))
00268             process_event(e, EVENT_POLL);
00269           if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
00270             EventQueueExternal.dequeue_timed(cur_time, next_time, false);
00271         } else {                
00272           next_time = EventQueue.earliest_timeout();
00273           ink_hrtime sleep_time = next_time - cur_time;
00274 
00275           if (sleep_time > THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND) {
00276             next_time = cur_time + THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
00277           }
00278           
00279           
00280           
00281           if (n_ethreads_to_be_signalled)
00282             flush_signals(this);
00283           EventQueueExternal.dequeue_timed(cur_time, next_time, true);
00284         }
00285       }
00286     }
00287 
00288     case DEDICATED: {
00289       
00290       MUTEX_TAKE_LOCK_FOR(oneevent->mutex, this, oneevent->continuation);
00291       oneevent->continuation->handleEvent(EVENT_IMMEDIATE, oneevent);
00292       MUTEX_UNTAKE_LOCK(oneevent->mutex, this);
00293       free_event(oneevent);
00294       break;
00295     }
00296 
00297     default:
00298       ink_assert(!"bad case value (execute)");
00299       break;
00300   }                             
00301   
00302 }