• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ProtectedQueue.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   FIFO queue
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022 
00023   @section details Details
00024 
00025   ProtectedQueue implements a FIFO queue with the following functionality:
00026     -# Multiple threads may try to enqueue and dequeue simultaneously,
00027       so the queue must be protected with a mutex.
00028     -# In case the queue is empty, dequeue() sleeps for a specified amount
00029       of time, or until a new element is inserted, whichever is earlier.
00030 
00031 */
00032 
00033 #include "P_EventSystem.h"
00034 
00035 
00036 // The protected queue is designed to delay signaling of threads
00037 // until some amount of work has been completed on the current thread
00038 // in order to prevent excess context switches.
00039 //
00040 // Defining EAGER_SIGNALLING disables this behavior and causes
00041 // threads to be made runnable immediately.
00042 //
00043 // #define EAGER_SIGNALLING
00044 
00045 extern ClassAllocator<Event> eventAllocator;
00046 
00047 void
00048 ProtectedQueue::enqueue(Event *e , bool fast_signal)
00049 {
00050   ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
00051   EThread *e_ethread = e->ethread;
00052   e->in_the_prot_queue = 1;
00053   bool was_empty = (ink_atomiclist_push(&al, e) == NULL);
00054 
00055   if (was_empty) {
00056     EThread *inserting_thread = this_ethread();
00057     // queue e->ethread in the list of threads to be signalled
00058     // inserting_thread == 0 means it is not a regular EThread
00059     if (inserting_thread != e_ethread) {
00060       if (!inserting_thread || !inserting_thread->ethreads_to_be_signalled) {
00061         signal();
00062         if (fast_signal) {
00063           if (e_ethread->signal_hook)
00064             e_ethread->signal_hook(e_ethread);
00065         }
00066       } else {
00067 #ifdef EAGER_SIGNALLING
00068         // Try to signal now and avoid deferred posting.
00069         if (e_ethread->EventQueueExternal.try_signal())
00070           return;
00071 #endif
00072         if (fast_signal) {
00073           if (e_ethread->signal_hook)
00074             e_ethread->signal_hook(e_ethread);
00075         }
00076         int &t = inserting_thread->n_ethreads_to_be_signalled;
00077         EThread **sig_e = inserting_thread->ethreads_to_be_signalled;
00078         if ((t + 1) >= eventProcessor.n_ethreads) {
00079           // we have run out of room
00080           if ((t + 1) == eventProcessor.n_ethreads) {
00081             // convert to direct map, put each ethread (sig_e[i]) into
00082             // the direct map loation: sig_e[sig_e[i]->id]
00083             for (int i = 0; i < t; i++) {
00084               EThread *cur = sig_e[i];  // put this ethread
00085               while (cur) {
00086                 EThread *next = sig_e[cur->id]; // into this location
00087                 if (next == cur)
00088                   break;
00089                 sig_e[cur->id] = cur;
00090                 cur = next;
00091               }
00092               // if not overwritten
00093               if (sig_e[i] && sig_e[i]->id != i)
00094                 sig_e[i] = 0;
00095             }
00096             t++;
00097           }
00098           // we have a direct map, insert this EThread
00099           sig_e[e_ethread->id] = e_ethread;
00100         } else
00101           // insert into vector
00102           sig_e[t++] = e_ethread;
00103       }
00104     }
00105   }
00106 }
00107 
00108 void
00109 flush_signals(EThread * thr)
00110 {
00111   ink_assert(this_ethread() == thr);
00112   int n = thr->n_ethreads_to_be_signalled;
00113   if (n > eventProcessor.n_ethreads)
00114     n = eventProcessor.n_ethreads;      // MAX
00115   int i;
00116 
00117   // Since the lock is only there to prevent a race in ink_cond_timedwait
00118   // the lock is taken only for a short time, thus it is unlikely that
00119   // this code has any effect.
00120 #ifdef EAGER_SIGNALLING
00121   for (i = 0; i < n; i++) {
00122     // Try to signal as many threads as possible without blocking.
00123     if (thr->ethreads_to_be_signalled[i]) {
00124       if (thr->ethreads_to_be_signalled[i]->EventQueueExternal.try_signal())
00125         thr->ethreads_to_be_signalled[i] = 0;
00126     }
00127   }
00128 #endif
00129   for (i = 0; i < n; i++) {
00130     if (thr->ethreads_to_be_signalled[i]) {
00131       thr->ethreads_to_be_signalled[i]->EventQueueExternal.signal();
00132       if (thr->ethreads_to_be_signalled[i]->signal_hook)
00133         thr->ethreads_to_be_signalled[i]->signal_hook(thr->ethreads_to_be_signalled[i]);
00134       thr->ethreads_to_be_signalled[i] = 0;
00135     }
00136   }
00137   thr->n_ethreads_to_be_signalled = 0;
00138 }
00139 
00140 void
00141 ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
00142 {
00143   (void) cur_time;
00144   Event *e;
00145   if (sleep) {
00146     ink_mutex_acquire(&lock);
00147     if (INK_ATOMICLIST_EMPTY(al)) {
00148       timespec ts = ink_hrtime_to_timespec(timeout);
00149       ink_cond_timedwait(&might_have_data, &lock, &ts);
00150     }
00151     ink_mutex_release(&lock);
00152   }
00153 
00154   e = (Event *) ink_atomiclist_popall(&al);
00155   // invert the list, to preserve order
00156   SLL<Event, Event::Link_link> l, t;
00157   t.head = e;
00158   while ((e = t.pop()))
00159     l.push(e);
00160   // insert into localQueue
00161   while ((e = l.pop())) {
00162     if (!e->cancelled)
00163       localQueue.enqueue(e);
00164     else {
00165       e->mutex = NULL;
00166       eventAllocator.free(e);
00167     }
00168   }
00169 }

Generated by  doxygen 1.7.1