1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerContext;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.LongObjectHashMap;
31 import io.netty.util.collection.LongObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.util.ArrayDeque;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Queue;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46
47 import static java.lang.Math.min;
48
49
50
51
52 public final class KQueueIoHandler implements IoHandler {
53 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
54 private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
55 AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
56 private static final int KQUEUE_WAKE_UP_IDENT = 0;
57
58
59
60 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
61
// Check availability of the native kqueue transport once, when this class is initialized, instead of
// on every instance construction (the previous instance-initializer form re-ran the check per handler).
// NOTE(review): ensureAvailability() presumably throws when the native library cannot be used -- confirm.
static {
    KQueue.ensureAvailability();
}
65
66 private final boolean allowGrowing;
67 private final FileDescriptor kqueueFd;
68 private final KQueueEventArray changeList;
69 private final KQueueEventArray eventList;
70 private final SelectStrategy selectStrategy;
71 private final NativeArrays nativeArrays;
72 private final IntSupplier selectNowSupplier = new IntSupplier() {
73 @Override
74 public int get() throws Exception {
75 return kqueueWaitNow();
76 }
77 };
78 private final ThreadAwareExecutor executor;
79 private final Queue<DefaultKqueueIoRegistration> cancelledRegistrations = new ArrayDeque<>();
80 private final LongObjectMap<DefaultKqueueIoRegistration> registrations = new LongObjectHashMap<>(4096);
81 private int numChannels;
82 private long nextId;
83
84 private volatile int wakenUp;
85
86 private long generateNextId() {
87 boolean reset = false;
88 for (;;) {
89 if (nextId == Long.MAX_VALUE) {
90 if (reset) {
91 throw new IllegalStateException("All possible ids in use");
92 }
93 reset = true;
94 }
95 nextId++;
96 if (nextId == KQUEUE_WAKE_UP_IDENT) {
97 continue;
98 }
99 if (!registrations.containsKey(nextId)) {
100 return nextId;
101 }
102 }
103 }
104
105
106
107
108 public static IoHandlerFactory newFactory() {
109 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
110 }
111
112
113
114
115 public static IoHandlerFactory newFactory(final int maxEvents,
116 final SelectStrategyFactory selectStrategyFactory) {
117 KQueue.ensureAvailability();
118 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
119 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
120 return new IoHandlerFactory() {
121 @Override
122 public IoHandler newHandler(ThreadAwareExecutor executor) {
123 return new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
124 }
125
126 @Override
127 public boolean isChangingThreadSupported() {
128 return true;
129 }
130 };
131 }
132
133 private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
134 this.executor = ObjectUtil.checkNotNull(executor, "executor");
135 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
136 this.kqueueFd = Native.newKQueue();
137 if (maxEvents == 0) {
138 allowGrowing = true;
139 maxEvents = 4096;
140 } else {
141 allowGrowing = false;
142 }
143 this.changeList = new KQueueEventArray(maxEvents);
144 this.eventList = new KQueueEventArray(maxEvents);
145 nativeArrays = new NativeArrays();
146 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
147 if (result < 0) {
148 destroy();
149 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
150 }
151 }
152
153 @Override
154 public void wakeup() {
155 if (!executor.isExecutorThread(Thread.currentThread())
156 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
157 wakeup0();
158 }
159 }
160
161 private void wakeup0() {
162 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
163
164
165 }
166
167 private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
168
169
170
171
172 if (oldWakeup && !context.canBlock()) {
173 return kqueueWaitNow();
174 }
175
176 long totalDelay = context.delayNanos(System.nanoTime());
177 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
178 int delayNanos = (int) (totalDelay % 1000000000L);
179 return kqueueWait(delaySeconds, delayNanos);
180 }
181
182 private int kqueueWaitNow() throws IOException {
183 return kqueueWait(0, 0);
184 }
185
186 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
187 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
188 changeList.clear();
189 return numEvents;
190 }
191
192 private void processReady(int ready) {
193 for (int i = 0; i < ready; ++i) {
194 final short filter = eventList.filter(i);
195 final short flags = eventList.flags(i);
196 final int ident = eventList.ident(i);
197 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
198
199
200 assert filter != Native.EVFILT_USER ||
201 (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
202 continue;
203 }
204
205 long id = eventList.udata(i);
206 DefaultKqueueIoRegistration registration = registrations.get(id);
207 if (registration == null) {
208
209
210
211 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, ident, id, filter);
212 continue;
213 }
214 registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i), id);
215 }
216 }
217
218 @Override
219 public int run(IoHandlerContext context) {
220 int handled = 0;
221 try {
222 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
223 switch (strategy) {
224 case SelectStrategy.CONTINUE:
225 if (context.shouldReportActiveIoTime()) {
226 context.reportActiveIoTime(0);
227 }
228 return 0;
229
230 case SelectStrategy.BUSY_WAIT:
231
232
233 case SelectStrategy.SELECT:
234 strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264 if (wakenUp == 1) {
265 wakeup0();
266 }
267
268 default:
269 }
270
271 if (strategy > 0) {
272 handled = strategy;
273 if (context.shouldReportActiveIoTime()) {
274
275 long activeIoStartTimeNanos = System.nanoTime();
276 processReady(strategy);
277 long activeIoEndTimeNanos = System.nanoTime();
278 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
279 } else {
280 processReady(strategy);
281 }
282 } else if (context.shouldReportActiveIoTime()) {
283 context.reportActiveIoTime(0);
284 }
285
286 if (allowGrowing && strategy == eventList.capacity()) {
287
288 eventList.realloc(false);
289 }
290 } catch (Error e) {
291 throw e;
292 } catch (Throwable t) {
293 handleLoopException(t);
294 } finally {
295 processCancelledRegistrations();
296 }
297 return handled;
298 }
299
300
301 private void processCancelledRegistrations() {
302 for (;;) {
303 DefaultKqueueIoRegistration cancelledRegistration = cancelledRegistrations.poll();
304 if (cancelledRegistration == null) {
305 return;
306 }
307 DefaultKqueueIoRegistration removed = registrations.remove(cancelledRegistration.id);
308 assert removed == cancelledRegistration;
309 if (removed.isHandleForChannel()) {
310 numChannels--;
311 }
312 removed.handle.unregistered();
313 }
314 }
315
316 int numRegisteredChannels() {
317 return numChannels;
318 }
319
320 List<Channel> registeredChannelsList() {
321 LongObjectMap<DefaultKqueueIoRegistration> ch = registrations;
322 if (ch.isEmpty()) {
323 return Collections.emptyList();
324 }
325
326 List<Channel> channels = new ArrayList<>(ch.size());
327
328 for (DefaultKqueueIoRegistration registration : ch.values()) {
329 if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
330 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
331 }
332 }
333 return Collections.unmodifiableList(channels);
334 }
335
336 private static void handleLoopException(Throwable t) {
337 logger.warn("Unexpected exception in the selector loop.", t);
338
339
340
341 try {
342 Thread.sleep(1000);
343 } catch (InterruptedException e) {
344
345 }
346 }
347
348 @Override
349 public void prepareToDestroy() {
350 try {
351 kqueueWaitNow();
352 } catch (IOException e) {
353
354 }
355
356
357
358 DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
359
360 for (DefaultKqueueIoRegistration reg: copy) {
361 reg.close();
362 }
363
364 processCancelledRegistrations();
365 }
366
367 @Override
368 public void destroy() {
369 try {
370 try {
371 kqueueFd.close();
372 } catch (IOException e) {
373 logger.warn("Failed to close the kqueue fd.", e);
374 }
375 } finally {
376
377 nativeArrays.free();
378 changeList.free();
379 eventList.free();
380 }
381 }
382
383 @Override
384 public IoRegistration register(IoHandle handle) {
385 final KQueueIoHandle kqueueHandle = cast(handle);
386 if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
387 throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
388 }
389
390 DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
391 executor, kqueueHandle);
392 DefaultKqueueIoRegistration old = registrations.put(registration.id, registration);
393 if (old != null) {
394
395 registrations.put(old.id, old);
396 throw new IllegalStateException();
397 }
398 if (registration.isHandleForChannel()) {
399 numChannels++;
400 }
401 handle.registered();
402 return registration;
403 }
404
405 private static KQueueIoHandle cast(IoHandle handle) {
406 if (handle instanceof KQueueIoHandle) {
407 return (KQueueIoHandle) handle;
408 }
409 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
410 }
411
412 private static KQueueIoOps cast(IoOps ops) {
413 if (ops instanceof KQueueIoOps) {
414 return (KQueueIoOps) ops;
415 }
416 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
417 }
418
419 @Override
420 public boolean isCompatible(Class<? extends IoHandle> handleType) {
421 return KQueueIoHandle.class.isAssignableFrom(handleType);
422 }
423
424 private final class DefaultKqueueIoRegistration implements IoRegistration {
425 private boolean cancellationPending;
426 private final AtomicBoolean canceled = new AtomicBoolean();
427 private final KQueueIoEvent event = new KQueueIoEvent();
428
429 final KQueueIoHandle handle;
430 final long id;
431 private final ThreadAwareExecutor executor;
432
433 DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
434 this.executor = executor;
435 this.handle = handle;
436 id = generateNextId();
437 }
438
439 boolean isHandleForChannel() {
440 return handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe;
441 }
442
443 @SuppressWarnings("unchecked")
444 @Override
445 public <T> T attachment() {
446 return (T) nativeArrays;
447 }
448
449 @Override
450 public long submit(IoOps ops) {
451 KQueueIoOps kQueueIoOps = cast(ops);
452 if (!isValid()) {
453 return -1;
454 }
455 short filter = kQueueIoOps.filter();
456 short flags = kQueueIoOps.flags();
457 int fflags = kQueueIoOps.fflags();
458 if (executor.isExecutorThread(Thread.currentThread())) {
459 evSet(filter, flags, fflags);
460 } else {
461 executor.execute(() -> evSet(filter, flags, fflags));
462 }
463 return 0;
464 }
465
466 void handle(int ident, short filter, short flags, int fflags, long data, long udata) {
467 if (cancellationPending) {
468
469 return;
470 }
471 event.update(ident, filter, flags, fflags, data, udata);
472 handle.handle(this, event);
473 }
474
475 private void evSet(short filter, short flags, int fflags) {
476 if (cancellationPending) {
477
478 return;
479 }
480 changeList.evSet(handle.ident(), filter, flags, fflags, 0, id);
481 }
482
483 @Override
484 public boolean isValid() {
485 return !canceled.get();
486 }
487
488 @Override
489 public boolean cancel() {
490 if (!canceled.compareAndSet(false, true)) {
491 return false;
492 }
493 if (executor.isExecutorThread(Thread.currentThread())) {
494 cancel0();
495 } else {
496 executor.execute(this::cancel0);
497 }
498 return true;
499 }
500
501 private void cancel0() {
502
503
504
505 cancellationPending = true;
506 cancelledRegistrations.offer(this);
507 }
508
509 void close() {
510 cancel();
511 try {
512 handle.close();
513 } catch (Exception e) {
514 logger.debug("Exception during closing " + handle, e);
515 }
516 }
517 }
518 }