1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerContext;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.LongObjectHashMap;
31 import io.netty.util.collection.LongObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.util.ArrayDeque;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Queue;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46
47 import static java.lang.Math.min;
48
49
50
51
52 public final class KQueueIoHandler implements IoHandler {
53 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
54 private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
55 AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
56 private static final int KQUEUE_WAKE_UP_IDENT = 0;
57
58
59
60 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
61
62 static {
63
64
65 KQueue.ensureAvailability();
66 }
67
68 private final boolean allowGrowing;
69 private final FileDescriptor kqueueFd;
70 private final KQueueEventArray changeList;
71 private final KQueueEventArray eventList;
72 private final SelectStrategy selectStrategy;
73 private final NativeArrays nativeArrays;
74 private final IntSupplier selectNowSupplier = new IntSupplier() {
75 @Override
76 public int get() throws Exception {
77 return kqueueWaitNow();
78 }
79 };
80 private final ThreadAwareExecutor executor;
81 private final Queue<DefaultKqueueIoRegistration> cancelledRegistrations = new ArrayDeque<>();
82 private final LongObjectMap<DefaultKqueueIoRegistration> registrations = new LongObjectHashMap<>(4096);
83 private int numChannels;
84 private long nextId;
85
86 private volatile int wakenUp;
87
88 private long generateNextId() {
89 boolean reset = false;
90 for (;;) {
91 if (nextId == Long.MAX_VALUE) {
92 if (reset) {
93 throw new IllegalStateException("All possible ids in use");
94 }
95 reset = true;
96 }
97 nextId++;
98 if (nextId == KQUEUE_WAKE_UP_IDENT) {
99 continue;
100 }
101 if (!registrations.containsKey(nextId)) {
102 return nextId;
103 }
104 }
105 }
106
107
108
109
110 public static IoHandlerFactory newFactory() {
111 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
112 }
113
114
115
116
117 public static IoHandlerFactory newFactory(final int maxEvents,
118 final SelectStrategyFactory selectStrategyFactory) {
119 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
120 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
121 return new IoHandlerFactory() {
122 @Override
123 public IoHandler newHandler(ThreadAwareExecutor executor) {
124 return new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
125 }
126
127 @Override
128 public boolean isChangingThreadSupported() {
129 return true;
130 }
131 };
132 }
133
134 private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
135 this.executor = ObjectUtil.checkNotNull(executor, "executor");
136 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
137 this.kqueueFd = Native.newKQueue();
138 if (maxEvents == 0) {
139 allowGrowing = true;
140 maxEvents = 4096;
141 } else {
142 allowGrowing = false;
143 }
144 this.changeList = new KQueueEventArray(maxEvents);
145 this.eventList = new KQueueEventArray(maxEvents);
146 nativeArrays = new NativeArrays();
147 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
148 if (result < 0) {
149 destroy();
150 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
151 }
152 }
153
154 @Override
155 public void wakeup() {
156 if (!executor.isExecutorThread(Thread.currentThread())
157 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
158 wakeup0();
159 }
160 }
161
162 private void wakeup0() {
163 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
164
165
166 }
167
168 private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
169
170
171
172
173 if (oldWakeup && !context.canBlock()) {
174 return kqueueWaitNow();
175 }
176
177 long totalDelay = context.delayNanos(System.nanoTime());
178 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
179 int delayNanos = (int) (totalDelay % 1000000000L);
180 return kqueueWait(delaySeconds, delayNanos);
181 }
182
183 private int kqueueWaitNow() throws IOException {
184 return kqueueWait(0, 0);
185 }
186
187 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
188 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
189 changeList.clear();
190 return numEvents;
191 }
192
193 private void processReady(int ready) {
194 for (int i = 0; i < ready; ++i) {
195 final short filter = eventList.filter(i);
196 final short flags = eventList.flags(i);
197 final int ident = eventList.ident(i);
198 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
199
200
201 assert filter != Native.EVFILT_USER ||
202 (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
203 continue;
204 }
205
206 long id = eventList.udata(i);
207 DefaultKqueueIoRegistration registration = registrations.get(id);
208 if (registration == null) {
209
210
211
212 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, ident, id, filter);
213 continue;
214 }
215 registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i), id);
216 }
217 }
218
219 @Override
220 public int run(IoHandlerContext context) {
221 int handled = 0;
222 try {
223 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
224 switch (strategy) {
225 case SelectStrategy.CONTINUE:
226 if (context.shouldReportActiveIoTime()) {
227 context.reportActiveIoTime(0);
228 }
229 return 0;
230
231 case SelectStrategy.BUSY_WAIT:
232
233
234 case SelectStrategy.SELECT:
235 strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265 if (wakenUp == 1) {
266 wakeup0();
267 }
268
269 default:
270 }
271
272 if (strategy > 0) {
273 handled = strategy;
274 if (context.shouldReportActiveIoTime()) {
275
276 long activeIoStartTimeNanos = System.nanoTime();
277 processReady(strategy);
278 long activeIoEndTimeNanos = System.nanoTime();
279 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
280 } else {
281 processReady(strategy);
282 }
283 } else if (context.shouldReportActiveIoTime()) {
284 context.reportActiveIoTime(0);
285 }
286
287 if (allowGrowing && strategy == eventList.capacity()) {
288
289 eventList.realloc(false);
290 }
291 } catch (Error e) {
292 throw e;
293 } catch (Throwable t) {
294 handleLoopException(t);
295 } finally {
296 processCancelledRegistrations();
297 }
298 return handled;
299 }
300
301
302 private void processCancelledRegistrations() {
303 for (;;) {
304 DefaultKqueueIoRegistration cancelledRegistration = cancelledRegistrations.poll();
305 if (cancelledRegistration == null) {
306 return;
307 }
308 DefaultKqueueIoRegistration removed = registrations.remove(cancelledRegistration.id);
309 assert removed == cancelledRegistration;
310 if (removed.isHandleForChannel()) {
311 numChannels--;
312 }
313 }
314 }
315
316 int numRegisteredChannels() {
317 return numChannels;
318 }
319
320 List<Channel> registeredChannelsList() {
321 LongObjectMap<DefaultKqueueIoRegistration> ch = registrations;
322 if (ch.isEmpty()) {
323 return Collections.emptyList();
324 }
325
326 List<Channel> channels = new ArrayList<>(ch.size());
327
328 for (DefaultKqueueIoRegistration registration : ch.values()) {
329 if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
330 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
331 }
332 }
333 return Collections.unmodifiableList(channels);
334 }
335
336 private static void handleLoopException(Throwable t) {
337 logger.warn("Unexpected exception in the selector loop.", t);
338
339
340
341 try {
342 Thread.sleep(1000);
343 } catch (InterruptedException e) {
344
345 }
346 }
347
348 @Override
349 public void prepareToDestroy() {
350 try {
351 kqueueWaitNow();
352 } catch (IOException e) {
353
354 }
355
356
357
358 DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
359
360 for (DefaultKqueueIoRegistration reg: copy) {
361 reg.close();
362 }
363
364 processCancelledRegistrations();
365 }
366
367 @Override
368 public void destroy() {
369 try {
370 try {
371 kqueueFd.close();
372 } catch (IOException e) {
373 logger.warn("Failed to close the kqueue fd.", e);
374 }
375 } finally {
376
377 nativeArrays.free();
378 changeList.free();
379 eventList.free();
380 }
381 }
382
383 @Override
384 public IoRegistration register(IoHandle handle) {
385 final KQueueIoHandle kqueueHandle = cast(handle);
386 if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
387 throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
388 }
389
390 DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
391 executor, kqueueHandle);
392 DefaultKqueueIoRegistration old = registrations.put(registration.id, registration);
393 if (old != null) {
394
395 registrations.put(old.id, old);
396 throw new IllegalStateException();
397 }
398
399 if (registration.isHandleForChannel()) {
400 numChannels++;
401 }
402 return registration;
403 }
404
405 private static KQueueIoHandle cast(IoHandle handle) {
406 if (handle instanceof KQueueIoHandle) {
407 return (KQueueIoHandle) handle;
408 }
409 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
410 }
411
412 private static KQueueIoOps cast(IoOps ops) {
413 if (ops instanceof KQueueIoOps) {
414 return (KQueueIoOps) ops;
415 }
416 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
417 }
418
419 @Override
420 public boolean isCompatible(Class<? extends IoHandle> handleType) {
421 return KQueueIoHandle.class.isAssignableFrom(handleType);
422 }
423
424 private final class DefaultKqueueIoRegistration implements IoRegistration {
425 private boolean cancellationPending;
426 private final AtomicBoolean canceled = new AtomicBoolean();
427 private final KQueueIoEvent event = new KQueueIoEvent();
428
429 final KQueueIoHandle handle;
430 final long id;
431 private final ThreadAwareExecutor executor;
432
433 DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
434 this.executor = executor;
435 this.handle = handle;
436 id = generateNextId();
437 }
438
439 boolean isHandleForChannel() {
440 return handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe;
441 }
442
443 @SuppressWarnings("unchecked")
444 @Override
445 public <T> T attachment() {
446 return (T) nativeArrays;
447 }
448
449 @Override
450 public long submit(IoOps ops) {
451 KQueueIoOps kQueueIoOps = cast(ops);
452 if (!isValid()) {
453 return -1;
454 }
455 short filter = kQueueIoOps.filter();
456 short flags = kQueueIoOps.flags();
457 int fflags = kQueueIoOps.fflags();
458 if (executor.isExecutorThread(Thread.currentThread())) {
459 evSet(filter, flags, fflags);
460 } else {
461 executor.execute(() -> evSet(filter, flags, fflags));
462 }
463 return 0;
464 }
465
466 void handle(int ident, short filter, short flags, int fflags, long data, long udata) {
467 if (cancellationPending) {
468
469 return;
470 }
471 event.update(ident, filter, flags, fflags, data, udata);
472 handle.handle(this, event);
473 }
474
475 private void evSet(short filter, short flags, int fflags) {
476 if (cancellationPending) {
477
478 return;
479 }
480 changeList.evSet(handle.ident(), filter, flags, fflags, 0, id);
481 }
482
483 @Override
484 public boolean isValid() {
485 return !canceled.get();
486 }
487
488 @Override
489 public boolean cancel() {
490 if (!canceled.compareAndSet(false, true)) {
491 return false;
492 }
493 if (executor.isExecutorThread(Thread.currentThread())) {
494 cancel0();
495 } else {
496 executor.execute(this::cancel0);
497 }
498 return true;
499 }
500
501 private void cancel0() {
502
503
504
505 cancellationPending = true;
506 cancelledRegistrations.offer(this);
507 }
508
509 void close() {
510 cancel();
511 try {
512 handle.close();
513 } catch (Exception e) {
514 logger.debug("Exception during closing " + handle, e);
515 }
516 }
517 }
518 }