/** @file

  FIFO queue

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.

  @section details Details

  ProtectedQueue implements a FIFO queue with the following functionality:
    -# Multiple threads may enqueue and dequeue simultaneously, so the
       queue is protected with a mutex.
    -# If the queue is empty, dequeue() sleeps for a specified amount of
       time, or until a new element is inserted, whichever comes first.

*/

#include "P_EventSystem.h"


// The protected queue is designed to delay signaling of threads
// until some amount of work has been completed on the current thread
// in order to prevent excess context switches.
//
// Defining EAGER_SIGNALLING disables this behavior and causes
// threads to be made runnable immediately.
//
// #define EAGER_SIGNALLING

extern ClassAllocator<Event> eventAllocator;

void
ProtectedQueue::enqueue(Event *e, bool fast_signal)
{
  ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
  EThread *e_ethread = e->ethread;
  e->in_the_prot_queue = 1;
  // Only the transition from empty to non-empty may require waking the
  // target thread; later pushes are picked up by the same wakeup.
  bool was_empty = (ink_atomiclist_push(&al, e) == NULL);

  if (was_empty) {
    EThread *inserting_thread = this_ethread();
    // queue e->ethread in the list of threads to be signalled
    // inserting_thread == 0 means it is not a regular EThread
    if (inserting_thread != e_ethread) {
      if (!inserting_thread || !inserting_thread->ethreads_to_be_signalled) {
        signal();
        if (fast_signal) {
          if (e_ethread->signal_hook)
            e_ethread->signal_hook(e_ethread);
        }
      } else {
#ifdef EAGER_SIGNALLING
        // Try to signal now and avoid deferred posting.
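        // try_signal() attempts the signal without blocking (cf. the
        // EAGER_SIGNALLING pass in flush_signals() below); if it succeeds
        // there is nothing left to defer.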
        if (e_ethread->EventQueueExternal.try_signal())
          return;
#endif
        if (fast_signal) {
          if (e_ethread->signal_hook)
            e_ethread->signal_hook(e_ethread);
        }
        int &t = inserting_thread->n_ethreads_to_be_signalled;
        EThread **sig_e = inserting_thread->ethreads_to_be_signalled;
        if ((t + 1) >= eventProcessor.n_ethreads) {
          // we have run out of room
          if ((t + 1) == eventProcessor.n_ethreads) {
            // convert to a direct map: put each ethread (sig_e[i]) into
            // the direct map location sig_e[sig_e[i]->id]
            for (int i = 0; i < t; i++) {
              EThread *cur = sig_e[i];          // put this ethread
              while (cur) {
                EThread *next = sig_e[cur->id]; // into this location
                if (next == cur)
                  break;
                sig_e[cur->id] = cur;
                cur = next;
              }
              // if not overwritten
              if (sig_e[i] && sig_e[i]->id != i)
                sig_e[i] = 0;
            }
            t++;
          }
          // we have a direct map, insert this EThread
          sig_e[e_ethread->id] = e_ethread;
        } else
          // insert into vector
          sig_e[t++] = e_ethread;
      }
    }
  }
}

void
flush_signals(EThread *thr)
{
  ink_assert(this_ethread() == thr);
  int n = thr->n_ethreads_to_be_signalled;
  if (n > eventProcessor.n_ethreads)
    n = eventProcessor.n_ethreads; // clamp to the number of event threads
  int i;

  // Since the lock is only there to prevent a race in ink_cond_timedwait,
  // it is held only briefly, so it is unlikely that this code has any effect.
#ifdef EAGER_SIGNALLING
  for (i = 0; i < n; i++) {
    // Try to signal as many threads as possible without blocking.
    if (thr->ethreads_to_be_signalled[i]) {
      if (thr->ethreads_to_be_signalled[i]->EventQueueExternal.try_signal())
        thr->ethreads_to_be_signalled[i] = 0;
    }
  }
#endif
  for (i = 0; i < n; i++) {
    if (thr->ethreads_to_be_signalled[i]) {
      thr->ethreads_to_be_signalled[i]->EventQueueExternal.signal();
      if (thr->ethreads_to_be_signalled[i]->signal_hook)
        thr->ethreads_to_be_signalled[i]->signal_hook(thr->ethreads_to_be_signalled[i]);
      thr->ethreads_to_be_signalled[i] = 0;
    }
  }
  thr->n_ethreads_to_be_signalled = 0;
}

void
ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
{
  (void) cur_time;
  Event *e;
  if (sleep) {
    ink_mutex_acquire(&lock);
    if (INK_ATOMICLIST_EMPTY(al)) {
      timespec ts = ink_hrtime_to_timespec(timeout);
      ink_cond_timedwait(&might_have_data, &lock, &ts);
    }
    ink_mutex_release(&lock);
  }

  e = (Event *) ink_atomiclist_popall(&al);
  // popall returns the elements in reverse insertion order; invert the
  // list to preserve FIFO order
  SLL<Event, Event::Link_link> l, t;
  t.head = e;
  while ((e = t.pop()))
    l.push(e);
  // move the events into localQueue, dropping cancelled ones
  while ((e = l.pop())) {
    if (!e->cancelled)
      localQueue.enqueue(e);
    else {
      e->mutex = NULL;
      eventAllocator.free(e);
    }
  }
}
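
// Usage sketch (illustrative only, not compiled as part of this file): an
// event loop thread typically drains its external queue with dequeue_timed()
// and then calls flush_signals() to wake any threads whose wakeups were
// deferred by enqueue(). The loop body and the `cur` / `wakeup` times below
// are hypothetical placeholders, not APIs defined here.
//
//   EThread *me = this_ethread();
//   for (;;) {
//     // Sleep until `wakeup` or until another thread enqueues new work,
//     // whichever comes first; ready events end up in localQueue.
//     me->EventQueueExternal.dequeue_timed(cur, wakeup, true);
//     /* ... run the events now sitting in me->EventQueueExternal.localQueue ... */
//     // Wake the threads recorded in me->ethreads_to_be_signalled.
//     flush_signals(me);
//   }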