1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.LongObjectHashMap;
31 import io.netty.util.collection.LongObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44
45 import static java.lang.Math.min;
46
47
48
49
50 public final class KQueueIoHandler implements IoHandler {
51 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
52 private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
53 AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
54 private static final int KQUEUE_WAKE_UP_IDENT = 0;
55
56
57
58 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
59
60 static {
61
62
63 KQueue.ensureAvailability();
64 }
65
66 private final boolean allowGrowing;
67 private final FileDescriptor kqueueFd;
68 private final KQueueEventArray changeList;
69 private final KQueueEventArray eventList;
70 private final SelectStrategy selectStrategy;
71 private final NativeArrays nativeArrays;
72 private final IntSupplier selectNowSupplier = new IntSupplier() {
73 @Override
74 public int get() throws Exception {
75 return kqueueWaitNow();
76 }
77 };
78 private final ThreadAwareExecutor executor;
79 private final LongObjectMap<DefaultKqueueIoRegistration> registrations = new LongObjectHashMap<>(4096);
80 private int numChannels;
81 private long nextId;
82
83 private volatile int wakenUp;
84
85 private long generateNextId() {
86 boolean reset = false;
87 for (;;) {
88 if (nextId == Long.MAX_VALUE) {
89 if (reset) {
90 throw new IllegalStateException("All possible ids in use");
91 }
92 reset = true;
93 }
94 nextId++;
95 if (nextId == KQUEUE_WAKE_UP_IDENT) {
96 continue;
97 }
98 if (!registrations.containsKey(nextId)) {
99 return nextId;
100 }
101 }
102 }
103
104
105
106
107 public static IoHandlerFactory newFactory() {
108 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
109 }
110
111
112
113
114 public static IoHandlerFactory newFactory(final int maxEvents,
115 final SelectStrategyFactory selectStrategyFactory) {
116 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
117 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
118 return executor -> new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
119 }
120
121 private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
122 this.executor = ObjectUtil.checkNotNull(executor, "executor");
123 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
124 this.kqueueFd = Native.newKQueue();
125 if (maxEvents == 0) {
126 allowGrowing = true;
127 maxEvents = 4096;
128 } else {
129 allowGrowing = false;
130 }
131 this.changeList = new KQueueEventArray(maxEvents);
132 this.eventList = new KQueueEventArray(maxEvents);
133 nativeArrays = new NativeArrays();
134 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
135 if (result < 0) {
136 destroy();
137 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
138 }
139 }
140
141 @Override
142 public void wakeup() {
143 if (!executor.isExecutorThread(Thread.currentThread())
144 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
145 wakeup0();
146 }
147 }
148
149 private void wakeup0() {
150 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
151
152
153 }
154
155 private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
156
157
158
159
160 if (oldWakeup && !context.canBlock()) {
161 return kqueueWaitNow();
162 }
163
164 long totalDelay = context.delayNanos(System.nanoTime());
165 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
166 int delayNanos = (int) (totalDelay % 1000000000L);
167 return kqueueWait(delaySeconds, delayNanos);
168 }
169
170 private int kqueueWaitNow() throws IOException {
171 return kqueueWait(0, 0);
172 }
173
174 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
175 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
176 changeList.clear();
177 return numEvents;
178 }
179
180 private void processReady(int ready) {
181 for (int i = 0; i < ready; ++i) {
182 final short filter = eventList.filter(i);
183 final short flags = eventList.flags(i);
184 final int ident = eventList.ident(i);
185 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
186
187
188 assert filter != Native.EVFILT_USER ||
189 (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
190 continue;
191 }
192
193 long id = eventList.udata(i);
194 DefaultKqueueIoRegistration registration = registrations.get(id);
195 if (registration == null) {
196
197
198
199 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, ident, id, filter);
200 continue;
201 }
202 registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i), id);
203 }
204 }
205
206 @Override
207 public int run(IoHandlerContext context) {
208 int handled = 0;
209 try {
210 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
211 switch (strategy) {
212 case SelectStrategy.CONTINUE:
213 return 0;
214
215 case SelectStrategy.BUSY_WAIT:
216
217
218 case SelectStrategy.SELECT:
219 strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249 if (wakenUp == 1) {
250 wakeup0();
251 }
252
253 default:
254 }
255
256 if (strategy > 0) {
257 handled = strategy;
258 processReady(strategy);
259 }
260
261 if (allowGrowing && strategy == eventList.capacity()) {
262
263 eventList.realloc(false);
264 }
265 } catch (Error e) {
266 throw e;
267 } catch (Throwable t) {
268 handleLoopException(t);
269 }
270 return handled;
271 }
272
273 int numRegisteredChannels() {
274 return numChannels;
275 }
276
277 List<Channel> registeredChannelsList() {
278 LongObjectMap<DefaultKqueueIoRegistration> ch = registrations;
279 if (ch.isEmpty()) {
280 return Collections.emptyList();
281 }
282
283 List<Channel> channels = new ArrayList<>(ch.size());
284
285 for (DefaultKqueueIoRegistration registration : ch.values()) {
286 if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
287 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
288 }
289 }
290 return Collections.unmodifiableList(channels);
291 }
292
293 private static void handleLoopException(Throwable t) {
294 logger.warn("Unexpected exception in the selector loop.", t);
295
296
297
298 try {
299 Thread.sleep(1000);
300 } catch (InterruptedException e) {
301
302 }
303 }
304
305 @Override
306 public void prepareToDestroy() {
307 try {
308 kqueueWaitNow();
309 } catch (IOException e) {
310
311 }
312
313
314
315 DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
316
317 for (DefaultKqueueIoRegistration reg: copy) {
318 reg.close();
319 }
320 }
321
322 @Override
323 public void destroy() {
324 try {
325 try {
326 kqueueFd.close();
327 } catch (IOException e) {
328 logger.warn("Failed to close the kqueue fd.", e);
329 }
330 } finally {
331
332 nativeArrays.free();
333 changeList.free();
334 eventList.free();
335 }
336 }
337
338 @Override
339 public IoRegistration register(IoHandle handle) {
340 final KQueueIoHandle kqueueHandle = cast(handle);
341 if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
342 throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
343 }
344
345 DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
346 executor, kqueueHandle);
347 DefaultKqueueIoRegistration old = registrations.put(registration.id, registration);
348 if (old != null) {
349
350 registrations.put(old.id, old);
351 throw new IllegalStateException();
352 }
353
354 if (kqueueHandle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
355 numChannels++;
356 }
357 return registration;
358 }
359
360 private static KQueueIoHandle cast(IoHandle handle) {
361 if (handle instanceof KQueueIoHandle) {
362 return (KQueueIoHandle) handle;
363 }
364 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
365 }
366
367 private static KQueueIoOps cast(IoOps ops) {
368 if (ops instanceof KQueueIoOps) {
369 return (KQueueIoOps) ops;
370 }
371 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
372 }
373
374 @Override
375 public boolean isCompatible(Class<? extends IoHandle> handleType) {
376 return KQueueIoHandle.class.isAssignableFrom(handleType);
377 }
378
379 private final class DefaultKqueueIoRegistration implements IoRegistration {
380 private final AtomicBoolean canceled = new AtomicBoolean();
381 private final KQueueIoEvent event = new KQueueIoEvent();
382
383 final KQueueIoHandle handle;
384 final long id;
385 private final ThreadAwareExecutor executor;
386
387 DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
388 this.executor = executor;
389 this.handle = handle;
390 id = generateNextId();
391 }
392
393 @SuppressWarnings("unchecked")
394 @Override
395 public <T> T attachment() {
396 return (T) nativeArrays;
397 }
398
399 @Override
400 public long submit(IoOps ops) {
401 KQueueIoOps kQueueIoOps = cast(ops);
402 if (!isValid()) {
403 return -1;
404 }
405 short filter = kQueueIoOps.filter();
406 short flags = kQueueIoOps.flags();
407 int fflags = kQueueIoOps.fflags();
408 if (executor.isExecutorThread(Thread.currentThread())) {
409 evSet(filter, flags, fflags);
410 } else {
411 executor.execute(() -> evSet(filter, flags, fflags));
412 }
413 return 0;
414 }
415
416 void handle(int ident, short filter, short flags, int fflags, long data, long udata) {
417 event.update(ident, filter, flags, fflags, data, udata);
418 handle.handle(this, event);
419 }
420
421 private void evSet(short filter, short flags, int fflags) {
422 changeList.evSet(handle.ident(), filter, flags, fflags, 0, id);
423 }
424
425 @Override
426 public boolean isValid() {
427 return !canceled.get();
428 }
429
430 @Override
431 public boolean cancel() {
432 if (!canceled.compareAndSet(false, true)) {
433 return false;
434 }
435 if (executor.isExecutorThread(Thread.currentThread())) {
436 cancel0();
437 } else {
438 executor.execute(this::cancel0);
439 }
440 return true;
441 }
442
443 private void cancel0() {
444 DefaultKqueueIoRegistration old = registrations.remove(id);
445 if (old != null) {
446 if (old != this) {
447
448 registrations.put(id, old);
449 } else if (old.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
450 numChannels--;
451 }
452 }
453 }
454
455 void close() {
456 cancel();
457 try {
458 handle.close();
459 } catch (Exception e) {
460 logger.debug("Exception during closing " + handle, e);
461 }
462 }
463 }
464 }