• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

UnixEThread.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Implementation of the EThread class: construction, teardown, and the event execution loop.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 //////////////////////////////////////////////////////////////////////
00025 //
00026 // The EThread Class
00027 //
00028 /////////////////////////////////////////////////////////////////////
00029 #include "P_EventSystem.h"
00030 
00031 #if HAVE_EVENTFD
00032 #include <sys/eventfd.h>
00033 #endif
00034 
00035 struct AIOCallback;
00036 
// Heartbeat / thread-id constants.
// Negative values are parenthesized so the macros expand safely inside any
// surrounding expression.
#define MAX_HEARTBEATS_MISSED           10    // not referenced in this part of the file
#define NO_HEARTBEAT                    (-1)  // not referenced in this part of the file
#define THREAD_MAX_HEARTBEAT_MSECONDS   60    // upper bound (msec) on how far ahead execute() sets its wait deadline
#define NO_ETHREAD_ID                   (-1)  // id given to threads built without an explicit id
00041 
00042 EThread::EThread()
00043   : generator((uint64_t)ink_get_hrtime_internal() ^ (uint64_t)(uintptr_t)this),
00044    ethreads_to_be_signalled(NULL),
00045    n_ethreads_to_be_signalled(0),
00046    main_accept_index(-1),
00047    id(NO_ETHREAD_ID), event_types(0),
00048    signal_hook(0),
00049    tt(REGULAR)
00050 {
00051   memset(thread_private, 0, PER_THREAD_DATA);
00052 }
00053 
00054 EThread::EThread(ThreadType att, int anid)
00055   : generator((uint64_t)ink_get_hrtime_internal() ^ (uint64_t)(uintptr_t)this),
00056     ethreads_to_be_signalled(NULL),
00057     n_ethreads_to_be_signalled(0),
00058     main_accept_index(-1),
00059     id(anid),
00060     event_types(0),
00061     signal_hook(0),
00062     tt(att),
00063     server_session_pool(NULL)
00064 {
00065   ethreads_to_be_signalled = (EThread **)ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *));
00066   memset((char *) ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
00067   memset(thread_private, 0, PER_THREAD_DATA);
00068 #if HAVE_EVENTFD
00069   evfd = eventfd(0, O_NONBLOCK | FD_CLOEXEC);
00070   if (evfd < 0) {
00071     if (errno == EINVAL) { // flags invalid for kernel <= 2.6.26
00072       evfd = eventfd(0,0);
00073       if (evfd < 0)
00074         Fatal("EThread::EThread: %d=eventfd(0,0),errno(%d)",evfd,errno);
00075     } else
00076       Fatal("EThread::EThread: %d=eventfd(0,O_NONBLOCK | FD_CLOEXEC),errno(%d)",evfd,errno);
00077   }
00078   fcntl(evfd, F_SETFD, FD_CLOEXEC);
00079   fcntl(evfd, F_SETFL, O_NONBLOCK);
00080 #elif TS_USE_PORT
00081   /* Solaris ports requires no crutches to do cross thread signaling.
00082    * We'll just port_send the event straight over the port.
00083    */
00084 #else
00085   ink_release_assert(pipe(evpipe) >= 0);
00086   fcntl(evpipe[0], F_SETFD, FD_CLOEXEC);
00087   fcntl(evpipe[0], F_SETFL, O_NONBLOCK);
00088   fcntl(evpipe[1], F_SETFD, FD_CLOEXEC);
00089   fcntl(evpipe[1], F_SETFL, O_NONBLOCK);
00090 #endif
00091 }
00092 
00093 EThread::EThread(ThreadType att, Event * e)
00094  : generator((uint32_t)((uintptr_t)time(NULL) ^ (uintptr_t) this)),
00095    ethreads_to_be_signalled(NULL),
00096    n_ethreads_to_be_signalled(0),
00097    main_accept_index(-1),
00098    id(NO_ETHREAD_ID), event_types(0),
00099    signal_hook(0),
00100    tt(att), oneevent(e)
00101 {
00102   ink_assert(att == DEDICATED);
00103   memset(thread_private, 0, PER_THREAD_DATA);
00104 }
00105 
00106 
00107 // Provide a destructor so that SDK functions which create and destroy
00108 // threads won't have to deal with EThread memory deallocation.
00109 EThread::~EThread()
00110 {
00111   if (n_ethreads_to_be_signalled > 0)
00112     flush_signals(this);
00113   ats_free(ethreads_to_be_signalled);
00114   // TODO: This can't be deleted ....
00115   // delete[]l1_hash;
00116 }
00117 
00118 bool
00119 EThread::is_event_type(EventType et)
00120 {
00121   return !!(event_types & (1 << (int) et));
00122 }
00123 
00124 void
00125 EThread::set_event_type(EventType et)
00126 {
00127   event_types |= (1 << (int) et);
00128 }
00129 
00130 void
00131 EThread::process_event(Event * e, int calling_code)
00132 {
00133   ink_assert((!e->in_the_prot_queue && !e->in_the_priority_queue));
00134   MUTEX_TRY_LOCK_FOR(lock, e->mutex.m_ptr, this, e->continuation);
00135   if (!lock) {
00136     e->timeout_at = cur_time + DELAY_FOR_RETRY;
00137     EventQueueExternal.enqueue_local(e);
00138   } else {
00139     if (e->cancelled) {
00140       free_event(e);
00141       return;
00142     }
00143     Continuation *c_temp = e->continuation;
00144     e->continuation->handleEvent(calling_code, e);
00145     ink_assert(!e->in_the_priority_queue);
00146     ink_assert(c_temp == e->continuation);
00147     MUTEX_RELEASE(lock);
00148     if (e->period) {
00149       if (!e->in_the_prot_queue && !e->in_the_priority_queue) {
00150         if (e->period < 0)
00151           e->timeout_at = e->period;
00152         else {
00153           cur_time = ink_get_based_hrtime();
00154           e->timeout_at = cur_time + e->period;
00155           if (e->timeout_at < cur_time)
00156             e->timeout_at = cur_time;
00157         }
00158         EventQueueExternal.enqueue_local(e);
00159       }
00160     } else if (!e->in_the_prot_queue && !e->in_the_priority_queue)
00161       free_event(e);
00162   }
00163 }
00164 
00165 //
00166 // void  EThread::execute()
00167 //
00168 // Execute loops forever on:
00169 // Find the earliest event.
00170 // Sleep until the event time or until an earlier event is inserted
00171 // When its time for the event, try to get the appropriate continuation
00172 // lock. If successful, call the continuation, otherwise put the event back
00173 // into the queue.
00174 //
00175 
00176 void
00177 EThread::execute() {
00178   switch (tt) {
00179 
00180     case REGULAR: {
00181       Event *e;
00182       Que(Event, link) NegativeQueue;
00183       ink_hrtime next_time = 0;
00184 
00185       // give priority to immediate events
00186       for (;;) {
00187         // execute all the available external events that have
00188         // already been dequeued
00189         cur_time = ink_get_based_hrtime_internal();
00190         while ((e = EventQueueExternal.dequeue_local())) {
00191           if (e->cancelled)
00192              free_event(e);
00193           else if (!e->timeout_at) { // IMMEDIATE
00194             ink_assert(e->period == 0);
00195             process_event(e, e->callback_event);
00196           } else if (e->timeout_at > 0) // INTERVAL
00197             EventQueue.enqueue(e, cur_time);
00198           else { // NEGATIVE
00199             Event *p = NULL;
00200             Event *a = NegativeQueue.head;
00201             while (a && a->timeout_at > e->timeout_at) {
00202               p = a;
00203               a = a->link.next;
00204             }
00205             if (!a)
00206               NegativeQueue.enqueue(e);
00207             else
00208               NegativeQueue.insert(e, p);
00209           }
00210         }
00211         bool done_one;
00212         do {
00213           done_one = false;
00214           // execute all the eligible internal events
00215           EventQueue.check_ready(cur_time, this);
00216           while ((e = EventQueue.dequeue_ready(cur_time))) {
00217             ink_assert(e);
00218             ink_assert(e->timeout_at > 0);
00219             if (e->cancelled)
00220               free_event(e);
00221             else {
00222               done_one = true;
00223               process_event(e, e->callback_event);
00224             }
00225           }
00226         } while (done_one);
00227         // execute any negative (poll) events
00228         if (NegativeQueue.head) {
00229           if (n_ethreads_to_be_signalled)
00230             flush_signals(this);
00231           // dequeue all the external events and put them in a local
00232           // queue. If there are no external events available, don't
00233           // do a cond_timedwait.
00234           if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
00235             EventQueueExternal.dequeue_timed(cur_time, next_time, false);
00236           while ((e = EventQueueExternal.dequeue_local())) {
00237             if (!e->timeout_at)
00238               process_event(e, e->callback_event);
00239             else {
00240               if (e->cancelled)
00241                 free_event(e);
00242               else {
00243                 // If its a negative event, it must be a result of
00244                 // a negative event, which has been turned into a
00245                 // timed-event (because of a missed lock), executed
00246                 // before the poll. So, it must
00247                 // be executed in this round (because you can't have
00248                 // more than one poll between two executions of a
00249                 // negative event)
00250                 if (e->timeout_at < 0) {
00251                   Event *p = NULL;
00252                   Event *a = NegativeQueue.head;
00253                   while (a && a->timeout_at > e->timeout_at) {
00254                     p = a;
00255                     a = a->link.next;
00256                   }
00257                   if (!a)
00258                     NegativeQueue.enqueue(e);
00259                   else
00260                     NegativeQueue.insert(e, p);
00261                 } else
00262                   EventQueue.enqueue(e, cur_time);
00263               }
00264             }
00265           }
00266           // execute poll events
00267           while ((e = NegativeQueue.dequeue()))
00268             process_event(e, EVENT_POLL);
00269           if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al))
00270             EventQueueExternal.dequeue_timed(cur_time, next_time, false);
00271         } else {                // Means there are no negative events
00272           next_time = EventQueue.earliest_timeout();
00273           ink_hrtime sleep_time = next_time - cur_time;
00274 
00275           if (sleep_time > THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND) {
00276             next_time = cur_time + THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
00277           }
00278           // dequeue all the external events and put them in a local
00279           // queue. If there are no external events available, do a
00280           // cond_timedwait.
00281           if (n_ethreads_to_be_signalled)
00282             flush_signals(this);
00283           EventQueueExternal.dequeue_timed(cur_time, next_time, true);
00284         }
00285       }
00286     }
00287 
00288     case DEDICATED: {
00289       // coverity[lock]
00290       MUTEX_TAKE_LOCK_FOR(oneevent->mutex, this, oneevent->continuation);
00291       oneevent->continuation->handleEvent(EVENT_IMMEDIATE, oneevent);
00292       MUTEX_UNTAKE_LOCK(oneevent->mutex, this);
00293       free_event(oneevent);
00294       break;
00295     }
00296 
00297     default:
00298       ink_assert(!"bad case value (execute)");
00299       break;
00300   }                             /* End switch */
00301   // coverity[missing_unlock]
00302 }

Generated by  doxygen 1.7.1