1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.LongObjectHashMap;
31 import io.netty.util.collection.LongObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.util.ArrayDeque;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Queue;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46
47 import static java.lang.Math.min;
48
49
50
51
52 public final class KQueueIoHandler implements IoHandler {
53 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
54 private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
55 AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
56 private static final int KQUEUE_WAKE_UP_IDENT = 0;
57
58
59
60 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
61
62 static {
63
64
65 KQueue.ensureAvailability();
66 }
67
68 private final boolean allowGrowing;
69 private final FileDescriptor kqueueFd;
70 private final KQueueEventArray changeList;
71 private final KQueueEventArray eventList;
72 private final SelectStrategy selectStrategy;
73 private final NativeArrays nativeArrays;
74 private final IntSupplier selectNowSupplier = new IntSupplier() {
75 @Override
76 public int get() throws Exception {
77 return kqueueWaitNow();
78 }
79 };
80 private final ThreadAwareExecutor executor;
81 private final Queue<DefaultKqueueIoRegistration> cancelledRegistrations = new ArrayDeque<>();
82 private final LongObjectMap<DefaultKqueueIoRegistration> registrations = new LongObjectHashMap<>(4096);
83 private int numChannels;
84 private long nextId;
85
86 private volatile int wakenUp;
87
88 private long generateNextId() {
89 boolean reset = false;
90 for (;;) {
91 if (nextId == Long.MAX_VALUE) {
92 if (reset) {
93 throw new IllegalStateException("All possible ids in use");
94 }
95 reset = true;
96 }
97 nextId++;
98 if (nextId == KQUEUE_WAKE_UP_IDENT) {
99 continue;
100 }
101 if (!registrations.containsKey(nextId)) {
102 return nextId;
103 }
104 }
105 }
106
107
108
109
110 public static IoHandlerFactory newFactory() {
111 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
112 }
113
114
115
116
117 public static IoHandlerFactory newFactory(final int maxEvents,
118 final SelectStrategyFactory selectStrategyFactory) {
119 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
120 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
121 return executor -> new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
122 }
123
124 private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
125 this.executor = ObjectUtil.checkNotNull(executor, "executor");
126 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127 this.kqueueFd = Native.newKQueue();
128 if (maxEvents == 0) {
129 allowGrowing = true;
130 maxEvents = 4096;
131 } else {
132 allowGrowing = false;
133 }
134 this.changeList = new KQueueEventArray(maxEvents);
135 this.eventList = new KQueueEventArray(maxEvents);
136 nativeArrays = new NativeArrays();
137 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
138 if (result < 0) {
139 destroy();
140 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
141 }
142 }
143
144 @Override
145 public void wakeup() {
146 if (!executor.isExecutorThread(Thread.currentThread())
147 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
148 wakeup0();
149 }
150 }
151
152 private void wakeup0() {
153 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
154
155
156 }
157
158 private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
159
160
161
162
163 if (oldWakeup && !context.canBlock()) {
164 return kqueueWaitNow();
165 }
166
167 long totalDelay = context.delayNanos(System.nanoTime());
168 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
169 int delayNanos = (int) (totalDelay % 1000000000L);
170 return kqueueWait(delaySeconds, delayNanos);
171 }
172
173 private int kqueueWaitNow() throws IOException {
174 return kqueueWait(0, 0);
175 }
176
177 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
178 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
179 changeList.clear();
180 return numEvents;
181 }
182
183 private void processReady(int ready) {
184 for (int i = 0; i < ready; ++i) {
185 final short filter = eventList.filter(i);
186 final short flags = eventList.flags(i);
187 final int ident = eventList.ident(i);
188 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
189
190
191 assert filter != Native.EVFILT_USER ||
192 (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
193 continue;
194 }
195
196 long id = eventList.udata(i);
197 DefaultKqueueIoRegistration registration = registrations.get(id);
198 if (registration == null) {
199
200
201
202 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, ident, id, filter);
203 continue;
204 }
205 registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i), id);
206 }
207 }
208
209 @Override
210 public int run(IoHandlerContext context) {
211 int handled = 0;
212 try {
213 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
214 switch (strategy) {
215 case SelectStrategy.CONTINUE:
216 return 0;
217
218 case SelectStrategy.BUSY_WAIT:
219
220
221 case SelectStrategy.SELECT:
222 strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 if (wakenUp == 1) {
253 wakeup0();
254 }
255
256 default:
257 }
258
259 if (strategy > 0) {
260 handled = strategy;
261 processReady(strategy);
262 }
263
264 if (allowGrowing && strategy == eventList.capacity()) {
265
266 eventList.realloc(false);
267 }
268 } catch (Error e) {
269 throw e;
270 } catch (Throwable t) {
271 handleLoopException(t);
272 } finally {
273 processCancelledRegistrations();
274 }
275 return handled;
276 }
277
278
279 private void processCancelledRegistrations() {
280 for (;;) {
281 DefaultKqueueIoRegistration cancelledRegistration = cancelledRegistrations.poll();
282 if (cancelledRegistration == null) {
283 return;
284 }
285 DefaultKqueueIoRegistration removed = registrations.remove(cancelledRegistration.id);
286 assert removed == cancelledRegistration;
287 if (removed.isHandleForChannel()) {
288 numChannels--;
289 }
290 }
291 }
292
293 int numRegisteredChannels() {
294 return numChannels;
295 }
296
297 List<Channel> registeredChannelsList() {
298 LongObjectMap<DefaultKqueueIoRegistration> ch = registrations;
299 if (ch.isEmpty()) {
300 return Collections.emptyList();
301 }
302
303 List<Channel> channels = new ArrayList<>(ch.size());
304
305 for (DefaultKqueueIoRegistration registration : ch.values()) {
306 if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
307 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
308 }
309 }
310 return Collections.unmodifiableList(channels);
311 }
312
313 private static void handleLoopException(Throwable t) {
314 logger.warn("Unexpected exception in the selector loop.", t);
315
316
317
318 try {
319 Thread.sleep(1000);
320 } catch (InterruptedException e) {
321
322 }
323 }
324
325 @Override
326 public void prepareToDestroy() {
327 try {
328 kqueueWaitNow();
329 } catch (IOException e) {
330
331 }
332
333
334
335 DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
336
337 for (DefaultKqueueIoRegistration reg: copy) {
338 reg.close();
339 }
340
341 processCancelledRegistrations();
342 }
343
344 @Override
345 public void destroy() {
346 try {
347 try {
348 kqueueFd.close();
349 } catch (IOException e) {
350 logger.warn("Failed to close the kqueue fd.", e);
351 }
352 } finally {
353
354 nativeArrays.free();
355 changeList.free();
356 eventList.free();
357 }
358 }
359
360 @Override
361 public IoRegistration register(IoHandle handle) {
362 final KQueueIoHandle kqueueHandle = cast(handle);
363 if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
364 throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
365 }
366
367 DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
368 executor, kqueueHandle);
369 DefaultKqueueIoRegistration old = registrations.put(registration.id, registration);
370 if (old != null) {
371
372 registrations.put(old.id, old);
373 throw new IllegalStateException();
374 }
375
376 if (registration.isHandleForChannel()) {
377 numChannels++;
378 }
379 return registration;
380 }
381
382 private static KQueueIoHandle cast(IoHandle handle) {
383 if (handle instanceof KQueueIoHandle) {
384 return (KQueueIoHandle) handle;
385 }
386 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
387 }
388
389 private static KQueueIoOps cast(IoOps ops) {
390 if (ops instanceof KQueueIoOps) {
391 return (KQueueIoOps) ops;
392 }
393 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
394 }
395
396 @Override
397 public boolean isCompatible(Class<? extends IoHandle> handleType) {
398 return KQueueIoHandle.class.isAssignableFrom(handleType);
399 }
400
401 private final class DefaultKqueueIoRegistration implements IoRegistration {
402 private boolean cancellationPending;
403 private final AtomicBoolean canceled = new AtomicBoolean();
404 private final KQueueIoEvent event = new KQueueIoEvent();
405
406 final KQueueIoHandle handle;
407 final long id;
408 private final ThreadAwareExecutor executor;
409
410 DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
411 this.executor = executor;
412 this.handle = handle;
413 id = generateNextId();
414 }
415
416 boolean isHandleForChannel() {
417 return handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe;
418 }
419
420 @SuppressWarnings("unchecked")
421 @Override
422 public <T> T attachment() {
423 return (T) nativeArrays;
424 }
425
426 @Override
427 public long submit(IoOps ops) {
428 KQueueIoOps kQueueIoOps = cast(ops);
429 if (!isValid()) {
430 return -1;
431 }
432 short filter = kQueueIoOps.filter();
433 short flags = kQueueIoOps.flags();
434 int fflags = kQueueIoOps.fflags();
435 if (executor.isExecutorThread(Thread.currentThread())) {
436 evSet(filter, flags, fflags);
437 } else {
438 executor.execute(() -> evSet(filter, flags, fflags));
439 }
440 return 0;
441 }
442
443 void handle(int ident, short filter, short flags, int fflags, long data, long udata) {
444 if (cancellationPending) {
445
446 return;
447 }
448 event.update(ident, filter, flags, fflags, data, udata);
449 handle.handle(this, event);
450 }
451
452 private void evSet(short filter, short flags, int fflags) {
453 if (cancellationPending) {
454
455 return;
456 }
457 changeList.evSet(handle.ident(), filter, flags, fflags, 0, id);
458 }
459
460 @Override
461 public boolean isValid() {
462 return !canceled.get();
463 }
464
465 @Override
466 public boolean cancel() {
467 if (!canceled.compareAndSet(false, true)) {
468 return false;
469 }
470 if (executor.isExecutorThread(Thread.currentThread())) {
471 cancel0();
472 } else {
473 executor.execute(this::cancel0);
474 }
475 return true;
476 }
477
478 private void cancel0() {
479
480
481
482 cancellationPending = true;
483 cancelledRegistrations.offer(this);
484 }
485
486 void close() {
487 cancel();
488 try {
489 handle.close();
490 } catch (Exception e) {
491 logger.debug("Exception during closing " + handle, e);
492 }
493 }
494 }
495 }