00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "P_EventSystem.h"
00025 #include <sched.h>
00026 #if TS_USE_HWLOC
00027 #include <alloca.h>
00028 #include <hwloc.h>
00029 #endif
00030 #include "ink_defs.h"
00031
00032 EventType
00033 EventProcessor::spawn_event_threads(int n_threads, const char* et_name, size_t stacksize)
00034 {
00035 char thr_name[MAX_THREAD_NAME_LENGTH];
00036 EventType new_thread_group_id;
00037 int i;
00038
00039 ink_release_assert(n_threads > 0);
00040 ink_release_assert((n_ethreads + n_threads) <= MAX_EVENT_THREADS);
00041 ink_release_assert(n_thread_groups < MAX_EVENT_TYPES);
00042
00043 new_thread_group_id = (EventType) n_thread_groups;
00044
00045 for (i = 0; i < n_threads; i++) {
00046 EThread *t = new EThread(REGULAR, n_ethreads + i);
00047 all_ethreads[n_ethreads + i] = t;
00048 eventthread[new_thread_group_id][i] = t;
00049 t->set_event_type(new_thread_group_id);
00050 }
00051
00052 n_threads_for_type[new_thread_group_id] = n_threads;
00053 for (i = 0; i < n_threads; i++) {
00054 snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[%s %d]", et_name, i);
00055 eventthread[new_thread_group_id][i]->start(thr_name, stacksize);
00056 }
00057
00058 n_thread_groups++;
00059 n_ethreads += n_threads;
00060 Debug("iocore_thread", "Created thread group '%s' id %d with %d threads", et_name, new_thread_group_id, n_threads);
00061
00062 return new_thread_group_id;
00063 }
00064
00065
00066 class EventProcessor eventProcessor;
00067
00068 int
00069 EventProcessor::start(int n_event_threads, size_t stacksize)
00070 {
00071 char thr_name[MAX_THREAD_NAME_LENGTH];
00072 int i;
00073
00074
00075 static int started = 0;
00076 ink_release_assert(!started);
00077 ink_release_assert(n_event_threads > 0 && n_event_threads <= MAX_EVENT_THREADS);
00078 started = 1;
00079
00080 n_ethreads = n_event_threads;
00081 n_thread_groups = 1;
00082
00083 int first_thread = 1;
00084
00085 for (i = 0; i < n_event_threads; i++) {
00086 EThread *t = new EThread(REGULAR, i);
00087 if (first_thread && !i) {
00088 ink_thread_setspecific(Thread::thread_data_key, t);
00089 global_mutex = t->mutex;
00090 t->cur_time = ink_get_based_hrtime_internal();
00091 }
00092 all_ethreads[i] = t;
00093
00094 eventthread[ET_CALL][i] = t;
00095 t->set_event_type((EventType) ET_CALL);
00096 }
00097 n_threads_for_type[ET_CALL] = n_event_threads;
00098
00099 #if TS_USE_HWLOC
00100 int affinity = 0;
00101 REC_ReadConfigInteger(affinity, "proxy.config.exec_thread.affinity");
00102 hwloc_obj_t obj;
00103 hwloc_obj_type_t obj_type;
00104 int obj_count = 0;
00105 char *obj_name;
00106
00107 switch(affinity) {
00108 case 4:
00109
00110 #if HAVE_HWLOC_OBJ_PU
00111 obj_type = HWLOC_OBJ_PU;
00112 obj_name = (char *) "Logical Processor";
00113 break;
00114 #endif
00115 case 3:
00116 obj_type = HWLOC_OBJ_CORE;
00117 obj_name = (char *) "Core";
00118 break;
00119 case 1:
00120 obj_type = HWLOC_OBJ_NODE;
00121 obj_name = (char *) "NUMA Node";
00122 if (hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type) > 0) {
00123 break;
00124 }
00125 case 2:
00126 obj_type = HWLOC_OBJ_SOCKET;
00127 obj_name = (char *) "Socket";
00128 break;
00129 default:
00130 obj_type = HWLOC_OBJ_MACHINE;
00131 obj_name = (char *) "Machine";
00132 }
00133
00134 obj_count = hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type);
00135 Debug("iocore_thread", "Affinity: %d %ss: %d PU: %d", affinity, obj_name, obj_count, ink_number_of_processors());
00136
00137 #endif
00138 for (i = first_thread; i < n_ethreads; i++) {
00139 snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_NET %d]", i);
00140 ink_thread tid = all_ethreads[i]->start(thr_name, stacksize);
00141 (void)tid;
00142 #if TS_USE_HWLOC
00143 if (obj_count > 0) {
00144 obj = hwloc_get_obj_by_type(ink_get_topology(), obj_type, i % obj_count);
00145 #if HWLOC_API_VERSION >= 0x00010100
00146 int cpu_mask_len = hwloc_bitmap_snprintf(NULL, 0, obj->cpuset) + 1;
00147 char *cpu_mask = (char *) alloca(cpu_mask_len);
00148 hwloc_bitmap_snprintf(cpu_mask, cpu_mask_len, obj->cpuset);
00149 Debug("iocore_thread","EThread: %d %s: %d CPU Mask: %s\n", i, obj_name, obj->logical_index, cpu_mask);
00150 #else
00151 Debug("iocore_thread","EThread: %d %s: %d\n", i, obj_name, obj->logical_index);
00152 #endif // HWLOC_API_VERSION
00153 hwloc_set_thread_cpubind(ink_get_topology(), tid, obj->cpuset, HWLOC_CPUBIND_STRICT);
00154 } else {
00155 Warning("hwloc returned an unexpected value -- CPU affinity disabled");
00156 }
00157 #endif // TS_USE_HWLOC
00158 }
00159
00160 Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
00161 return 0;
00162 }
00163
00164 void
00165 EventProcessor::shutdown()
00166 {
00167 }
00168
00169 Event *
00170 EventProcessor::spawn_thread(Continuation *cont, const char* thr_name, size_t stacksize)
00171 {
00172 ink_release_assert(n_dthreads < MAX_EVENT_THREADS);
00173 Event *e = eventAllocator.alloc();
00174
00175 e->init(cont, 0, 0);
00176 all_dthreads[n_dthreads] = new EThread(DEDICATED, e);
00177 e->ethread = all_dthreads[n_dthreads];
00178 e->mutex = e->continuation->mutex = all_dthreads[n_dthreads]->mutex;
00179 n_dthreads++;
00180 e->ethread->start(thr_name, stacksize);
00181
00182 return e;
00183 }