1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.EventLoop;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.EventLoopTaskQueueFactory;
22 import io.netty.channel.SelectStrategy;
23 import io.netty.channel.SingleThreadEventLoop;
24 import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
25 import io.netty.channel.unix.FileDescriptor;
26 import io.netty.channel.unix.IovArray;
27 import io.netty.util.IntSupplier;
28 import io.netty.util.collection.LongObjectHashMap;
29 import io.netty.util.collection.LongObjectMap;
30 import io.netty.util.concurrent.RejectedExecutionHandler;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.util.ArrayDeque;
38 import java.util.Collection;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
43
44 import static java.lang.Math.min;
45
46
47
48
49 final class KQueueEventLoop extends SingleThreadEventLoop {
50 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
51 private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
52 AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
53 private static final int KQUEUE_WAKE_UP_IDENT = 0;
54
55
56
57 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
58
59 static {
60
61
62 KQueue.ensureAvailability();
63 }
64
65 private final boolean allowGrowing;
66 private final FileDescriptor kqueueFd;
67 private final KQueueEventArray changeList;
68 private final KQueueEventArray eventList;
69 private final SelectStrategy selectStrategy;
70 private final IovArray iovArray = new IovArray();
71 private final IntSupplier selectNowSupplier = new IntSupplier() {
72 @Override
73 public int get() throws Exception {
74 return kqueueWaitNow();
75 }
76 };
77 private final LongObjectMap<KQueueRegistration> registrations = new LongObjectHashMap<KQueueRegistration>(4096);
78 private final Queue<KQueueRegistration> cancelledRegistrations = new ArrayDeque<KQueueRegistration>();
79 private long nextId;
80 private volatile int wakenUp;
81 private volatile int ioRatio = 50;
82
83 KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
84 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
85 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
86 super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
87 rejectedExecutionHandler);
88 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
89 this.kqueueFd = Native.newKQueue();
90 if (maxEvents == 0) {
91 allowGrowing = true;
92 maxEvents = 4096;
93 } else {
94 allowGrowing = false;
95 }
96 this.changeList = new KQueueEventArray(maxEvents);
97 this.eventList = new KQueueEventArray(maxEvents);
98 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
99 if (result < 0) {
100 cleanup();
101 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
102 }
103 }
104
105 final class KQueueRegistration {
106 private final AbstractKQueueChannel channel;
107 private final long id;
108 private boolean removed;
109
110 KQueueRegistration(AbstractKQueueChannel channel, long id) {
111 this.channel = channel;
112 this.id = id;
113 }
114
115 void evSet(short filter, short flags, int fflags, long data) {
116 assert inEventLoop();
117 if (removed) {
118 return;
119 }
120 changeList.evSet(channel.fd().intValue(), filter, flags, fflags, data, id);
121 }
122
123 void remove() throws Exception {
124 assert inEventLoop();
125
126 KQueueRegistration old = registrations.get(id);
127 if (old == null) {
128 return;
129 }
130 assert old == this;
131 if (!old.removed) {
132 cancelledRegistrations.offer(old);
133 }
134 if (channel.isOpen()) {
135
136
137
138
139 channel.unregisterFilters();
140 }
141 old.removed = true;
142 }
143 }
144
145 private static Queue<Runnable> newTaskQueue(
146 EventLoopTaskQueueFactory queueFactory) {
147 if (queueFactory == null) {
148 return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
149 }
150 return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
151 }
152
153 private long generateNextId() {
154 boolean reset = false;
155 for (;;) {
156 if (nextId == Long.MAX_VALUE) {
157 if (reset) {
158 throw new IllegalStateException("All possible ids in use");
159 }
160 reset = true;
161 }
162 nextId++;
163 if (nextId == KQUEUE_WAKE_UP_IDENT) {
164 continue;
165 }
166 if (!registrations.containsKey(nextId)) {
167 return nextId;
168 }
169 }
170 }
171
172
173 private void processCancelledRegistrations() {
174 for (;;) {
175 KQueueRegistration cancelledRegistration = cancelledRegistrations.poll();
176 if (cancelledRegistration == null) {
177 return;
178 }
179 KQueueRegistration removed = registrations.remove(cancelledRegistration.id);
180 assert removed == cancelledRegistration;
181 }
182 }
183
184 KQueueRegistration add(AbstractKQueueChannel ch) {
185 assert inEventLoop();
186
187 long id = generateNextId();
188 KQueueRegistration registration = new KQueueRegistration(ch, id);
189 KQueueRegistration old = registrations.put(id, registration);
190
191 assert old == null;
192 return registration;
193 }
194
195
196
197
198 IovArray cleanArray() {
199 iovArray.clear();
200 return iovArray;
201 }
202
203 @Override
204 protected void wakeup(boolean inEventLoop) {
205 if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
206 wakeup();
207 }
208 }
209
210 private void wakeup() {
211 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
212
213
214 }
215
216 private int kqueueWait(boolean oldWakeup) throws IOException {
217
218
219
220
221 if (oldWakeup && hasTasks()) {
222 return kqueueWaitNow();
223 }
224
225 long totalDelay = delayNanos(System.nanoTime());
226 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
227 int delayNanos = (int) (totalDelay % 1000000000L);
228 return kqueueWait(delaySeconds, delayNanos);
229 }
230
231 private int kqueueWaitNow() throws IOException {
232 return kqueueWait(0, 0);
233 }
234
235 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
236 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
237 changeList.clear();
238 return numEvents;
239 }
240
241 private void processReady(int ready) {
242 for (int i = 0; i < ready; ++i) {
243 final short filter = eventList.filter(i);
244 final short flags = eventList.flags(i);
245 final int fd = eventList.fd(i);
246 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
247
248
249 assert filter != Native.EVFILT_USER ||
250 (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
251 continue;
252 }
253
254 long id = eventList.udata(i);
255 KQueueRegistration registration = registrations.get(id);
256 if (registration == null) {
257
258
259
260 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, fd, id, filter);
261 continue;
262 }
263 if (registration.removed) {
264
265 continue;
266 }
267 AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) registration.channel.unsafe();
268
269
270 if (filter == Native.EVFILT_WRITE) {
271 unsafe.writeReady();
272 } else if (filter == Native.EVFILT_READ) {
273
274 unsafe.readReady(eventList.data(i));
275 } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
276 unsafe.readEOF();
277 }
278
279
280
281
282 if ((flags & Native.EV_EOF) != 0) {
283 unsafe.readEOF();
284 }
285 }
286 }
287
288 @Override
289 protected void run() {
290 for (;;) {
291 try {
292 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
293 switch (strategy) {
294 case SelectStrategy.CONTINUE:
295 continue;
296
297 case SelectStrategy.BUSY_WAIT:
298
299
300 case SelectStrategy.SELECT:
301 strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331 if (wakenUp == 1) {
332 wakeup();
333 }
334
335 default:
336 }
337
338 final int ioRatio = this.ioRatio;
339 if (ioRatio == 100) {
340 try {
341 if (strategy > 0) {
342 processReady(strategy);
343 }
344 } finally {
345 runAllTasks();
346 }
347 } else {
348 final long ioStartTime = System.nanoTime();
349
350 try {
351 if (strategy > 0) {
352 processReady(strategy);
353 }
354 } finally {
355 final long ioTime = System.nanoTime() - ioStartTime;
356 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
357 }
358 }
359 if (allowGrowing && strategy == eventList.capacity()) {
360
361 eventList.realloc(false);
362 }
363 } catch (Error e) {
364 throw e;
365 } catch (Throwable t) {
366 handleLoopException(t);
367 } finally {
368 processCancelledRegistrations();
369
370 try {
371 if (isShuttingDown()) {
372 closeAll();
373 if (confirmShutdown()) {
374 break;
375 }
376 }
377 } catch (Error e) {
378 throw e;
379 } catch (Throwable t) {
380 handleLoopException(t);
381 }
382 }
383 }
384 }
385
386 @Override
387 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
388 return newTaskQueue0(maxPendingTasks);
389 }
390
391 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
392
393 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
394 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
395 }
396
397
398
399
400 public int getIoRatio() {
401 return ioRatio;
402 }
403
404
405
406
407
408 public void setIoRatio(int ioRatio) {
409 if (ioRatio <= 0 || ioRatio > 100) {
410 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
411 }
412 this.ioRatio = ioRatio;
413 }
414
415 @Override
416 public int registeredChannels() {
417 return registrations.size();
418 }
419
420 @Override
421 public Iterator<Channel> registeredChannelsIterator() {
422 assert inEventLoop();
423 final LongObjectMap<KQueueRegistration> ch = registrations;
424 if (ch.isEmpty()) {
425 return ChannelsReadOnlyIterator.empty();
426 }
427 return new ChannelsReadOnlyIterator<AbstractKQueueChannel>(new Iterable<AbstractKQueueChannel>() {
428 private final Collection<KQueueRegistration> registrations = ch.values();
429 @Override
430 public Iterator<AbstractKQueueChannel> iterator() {
431 final Iterator<KQueueRegistration> registrationIterator = registrations.iterator();
432 return new Iterator<AbstractKQueueChannel>() {
433 @Override
434 public boolean hasNext() {
435 return registrationIterator.hasNext();
436 }
437
438 @Override
439 public AbstractKQueueChannel next() {
440 return registrationIterator.next().channel;
441 }
442 };
443 }
444 });
445 }
446
447 @Override
448 protected void cleanup() {
449 try {
450 try {
451 kqueueFd.close();
452 } catch (IOException e) {
453 logger.warn("Failed to close the kqueue fd.", e);
454 }
455 } finally {
456
457 iovArray.release();
458 changeList.free();
459 eventList.free();
460 }
461 }
462
463 private void closeAll() {
464 try {
465 kqueueWaitNow();
466 } catch (IOException e) {
467
468 }
469
470
471
472 KQueueRegistration[] localRegistrations = registrations.values().toArray(new KQueueRegistration[0]);
473
474 for (KQueueRegistration reg: localRegistrations) {
475 reg.channel.unsafe().close(reg.channel.unsafe().voidPromise());
476 }
477 processCancelledRegistrations();
478 }
479
480 private static void handleLoopException(Throwable t) {
481 logger.warn("Unexpected exception in the selector loop.", t);
482
483
484
485 try {
486 Thread.sleep(1000);
487 } catch (InterruptedException e) {
488
489 }
490 }
491 }