00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef _P_UnixEventProcessor_h_
00025 #define _P_UnixEventProcessor_h_
00026 #include "I_EventProcessor.h"
00027
00028 const int LOAD_BALANCE_INTERVAL = 1;
00029
00030
00031 TS_INLINE
00032 EventProcessor::EventProcessor():
00033 n_ethreads(0),
00034 n_thread_groups(0),
00035 n_dthreads(0),
00036 thread_data_used(0)
00037 {
00038 memset(all_ethreads, 0, sizeof(all_ethreads));
00039 memset(all_dthreads, 0, sizeof(all_dthreads));
00040 memset(n_threads_for_type, 0, sizeof(n_threads_for_type));
00041 memset(next_thread_for_type, 0, sizeof(next_thread_for_type));
00042 }
00043
00044 TS_INLINE off_t
00045 EventProcessor::allocate(int size)
00046 {
00047 static off_t start = INK_ALIGN(offsetof(EThread, thread_private), 16);
00048 static off_t loss = start - offsetof(EThread, thread_private);
00049 size = INK_ALIGN(size, 16);
00050
00051 int old;
00052 do {
00053 old = thread_data_used;
00054 if (old + loss + size > PER_THREAD_DATA)
00055 return -1;
00056 } while (!ink_atomic_cas(&thread_data_used, old, old + size));
00057
00058 return (off_t) (old + start);
00059 }
00060
00061 TS_INLINE EThread *
00062 EventProcessor::assign_thread(EventType etype)
00063 {
00064 int next;
00065
00066 ink_assert(etype < MAX_EVENT_TYPES);
00067 if (n_threads_for_type[etype] > 1)
00068 next = next_thread_for_type[etype]++ % n_threads_for_type[etype];
00069 else
00070 next = 0;
00071 return (eventthread[etype][next]);
00072 }
00073
00074 TS_INLINE Event *
00075 EventProcessor::schedule(Event * e, EventType etype, bool fast_signal)
00076 {
00077 ink_assert(etype < MAX_EVENT_TYPES);
00078 e->ethread = assign_thread(etype);
00079 if (e->continuation->mutex)
00080 e->mutex = e->continuation->mutex;
00081 else
00082 e->mutex = e->continuation->mutex = e->ethread->mutex;
00083 e->ethread->EventQueueExternal.enqueue(e, fast_signal);
00084 return e;
00085 }
00086
00087
00088 TS_INLINE Event *
00089 EventProcessor::schedule_imm_signal(Continuation * cont, EventType et, int callback_event, void *cookie)
00090 {
00091 Event *e = eventAllocator.alloc();
00092
00093 ink_assert(et < MAX_EVENT_TYPES);
00094 #ifdef ENABLE_TIME_TRACE
00095 e->start_time = ink_get_hrtime();
00096 #endif
00097 e->callback_event = callback_event;
00098 e->cookie = cookie;
00099 return schedule(e->init(cont, 0, 0), et, true);
00100 }
00101
00102 TS_INLINE Event *
00103 EventProcessor::schedule_imm(Continuation * cont, EventType et, int callback_event, void *cookie)
00104 {
00105 Event *e = eventAllocator.alloc();
00106
00107 ink_assert(et < MAX_EVENT_TYPES);
00108 #ifdef ENABLE_TIME_TRACE
00109 e->start_time = ink_get_hrtime();
00110 #endif
00111 e->callback_event = callback_event;
00112 e->cookie = cookie;
00113 return schedule(e->init(cont, 0, 0), et);
00114 }
00115
00116 TS_INLINE Event *
00117 EventProcessor::schedule_at(Continuation * cont, ink_hrtime t, EventType et, int callback_event, void *cookie)
00118 {
00119 Event *e = eventAllocator.alloc();
00120
00121 ink_assert(t > 0);
00122 ink_assert(et < MAX_EVENT_TYPES);
00123 e->callback_event = callback_event;
00124 e->cookie = cookie;
00125 return schedule(e->init(cont, t, 0), et);
00126 }
00127
00128 TS_INLINE Event *
00129 EventProcessor::schedule_in(Continuation * cont, ink_hrtime t, EventType et, int callback_event, void *cookie)
00130 {
00131 Event *e = eventAllocator.alloc();
00132
00133 ink_assert(et < MAX_EVENT_TYPES);
00134 e->callback_event = callback_event;
00135 e->cookie = cookie;
00136 return schedule(e->init(cont, ink_get_based_hrtime() + t, 0), et);
00137 }
00138
00139 TS_INLINE Event *
00140 EventProcessor::schedule_every(Continuation * cont, ink_hrtime t, EventType et, int callback_event, void *cookie)
00141 {
00142 Event *e = eventAllocator.alloc();
00143
00144 ink_assert(t != 0);
00145 ink_assert(et < MAX_EVENT_TYPES);
00146 e->callback_event = callback_event;
00147 e->cookie = cookie;
00148 if (t < 0)
00149 return schedule(e->init(cont, t, t), et);
00150 else
00151 return schedule(e->init(cont, ink_get_based_hrtime() + t, t), et);
00152 }
00153
00154
00155 #endif