1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.IntObjectHashMap;
31 import io.netty.util.collection.IntObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44
45 import static java.lang.Math.min;
46
47
48
49
50 public final class KQueueIoHandler implements IoHandler {
/** Logger used by this handler for its warning and debug messages. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);

/** Atomic accessor for the volatile {@code wakenUp} field (compare-and-set in wakeup(), get-and-set in run()). */
private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");

/**
 * Ident of the user event that is added in the constructor and triggered by wakeup0().
 * register(...) rejects any handle that reports this ident.
 */
private static final int KQUEUE_WAKE_UP_IDENT = 0;

/**
 * Upper bound applied to the seconds part of the timeout handed to the kevent wait.
 * 86399 seconds is 24 hours minus one second.
 */
private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;

static {
    // Executed once during class initialization, before any handler instance can exist.
    // NOTE(review): presumably fails fast when the native kqueue transport is unusable -
    // confirm against KQueue.ensureAvailability().
    KQueue.ensureAvailability();
}
65
// True when the handler was created with maxEvents == 0; run() then reallocates eventList
// whenever a wait fills it completely.
private final boolean allowGrowing;
// File descriptor of the kqueue instance owned by this handler; closed in destroy().
private final FileDescriptor kqueueFd;
// Changes queued through DefaultKqueueIoRegistration.evSet(...); passed to every kevent wait
// and cleared right after the wait returns.
private final KQueueEventArray changeList;
// Filled by the kevent wait and read by processReady(...).
private final KQueueEventArray eventList;
// Decides in run() whether to return early, wait, or process already-polled events.
private final SelectStrategy selectStrategy;
// Handed out to handles via IoRegistration.attachment(); freed in destroy().
private final NativeArrays nativeArrays;
// Non-blocking poll (zero timeout) given to the select strategy in run().
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return kqueueWaitNow();
    }
};
// Executor this handler belongs to; used to tell the executor thread apart from other callers.
private final ThreadAwareExecutor executor;
// Active registrations keyed by KQueueIoHandle.ident().
private final IntObjectMap<DefaultKqueueIoRegistration> registrations = new IntObjectHashMap<>(4096);
// Number of registrations whose handle is an AbstractKQueueChannel.AbstractKQueueUnsafe.
private int numChannels;

// Wake-up flag: set from 0 to 1 by the wakeup() call that triggers the user event,
// reset to 0 by run() right before it waits.
private volatile int wakenUp;
83
84
85
86
87 public static IoHandlerFactory newFactory() {
88 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
89 }
90
91
92
93
94 public static IoHandlerFactory newFactory(final int maxEvents,
95 final SelectStrategyFactory selectStrategyFactory) {
96 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
97 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
98 return executor -> new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
99 }
100
101 private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
102 this.executor = ObjectUtil.checkNotNull(executor, "executor");
103 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
104 this.kqueueFd = Native.newKQueue();
105 if (maxEvents == 0) {
106 allowGrowing = true;
107 maxEvents = 4096;
108 } else {
109 allowGrowing = false;
110 }
111 this.changeList = new KQueueEventArray(maxEvents);
112 this.eventList = new KQueueEventArray(maxEvents);
113 nativeArrays = new NativeArrays();
114 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
115 if (result < 0) {
116 destroy();
117 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
118 }
119 }
120
121 @Override
122 public void wakeup() {
123 if (!executor.isExecutorThread(Thread.currentThread())
124 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
125 wakeup0();
126 }
127 }
128
129 private void wakeup0() {
130 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
131
132
133 }
134
135 private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
136
137
138
139
140 if (oldWakeup && !context.canBlock()) {
141 return kqueueWaitNow();
142 }
143
144 long totalDelay = context.delayNanos(System.nanoTime());
145 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
146 int delayNanos = (int) (totalDelay % 1000000000L);
147 return kqueueWait(delaySeconds, delayNanos);
148 }
149
150 private int kqueueWaitNow() throws IOException {
151 return kqueueWait(0, 0);
152 }
153
154 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
155 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
156 changeList.clear();
157 return numEvents;
158 }
159
160 private void processReady(int ready) {
161 for (int i = 0; i < ready; ++i) {
162 final short filter = eventList.filter(i);
163 final short flags = eventList.flags(i);
164 final int ident = eventList.ident(i);
165 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
166
167
168 assert filter != Native.EVFILT_USER ||
169 (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
170 continue;
171 }
172
173 DefaultKqueueIoRegistration registration = registrations.get(ident);
174 if (registration == null) {
175
176
177
178 logger.warn("events[{}]=[{}, {}] had no registration!", i, ident, filter);
179 continue;
180 }
181 registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i));
182 }
183 }
184
/**
 * Runs one iteration of the handler: asks the select strategy what to do, optionally waits
 * for events, and dispatches whatever is ready.
 *
 * @param context supplies {@code canBlock()} and the delay used as the wait timeout
 * @return the number of ready events that were handed to processReady(...), or {@code 0}
 *         when nothing was processed
 */
@Override
public int run(IoHandlerContext context) {
    int handled = 0;
    try {
        int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
        switch (strategy) {
            case SelectStrategy.CONTINUE:
                return 0;

            case SelectStrategy.BUSY_WAIT:
                // There is no dedicated busy-wait path here: fall through to SELECT.

            case SelectStrategy.SELECT:
                // Reset the wake-up flag and tell kqueueWait whether it had been set.
                strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);

                // wakeup() only triggers the user event when it wins the 0 -> 1 transition of
                // wakenUp. The flag was reset to 0 just before the wait above, so if it reads 1
                // here, some wakeup() call set it again after that reset. Trigger the user event
                // once more in that case.
                // NOTE(review): presumably this keeps a wake-up requested around the time the
                // wait returned from being lost for the next wait - confirm against the event
                // loop's task-submission path.
                if (wakenUp == 1) {
                    wakeup0();
                }
                // fall through
            default:
                // Any other strategy value skips the wait; if positive it is treated below as
                // the number of ready events.
        }

        if (strategy > 0) {
            handled = strategy;
            processReady(strategy);
        }

        if (allowGrowing && strategy == eventList.capacity()) {
            // The event list was filled completely.
            // NOTE(review): realloc(false) presumably enlarges the array - confirm in KQueueEventArray.
            eventList.realloc(false);
        }
    } catch (Error e) {
        // Errors are never swallowed.
        throw e;
    } catch (Throwable t) {
        // Everything else is logged and followed by a pause; see handleLoopException.
        handleLoopException(t);
    }
    return handled;
}
251
252 int numRegisteredChannels() {
253 return numChannels;
254 }
255
256 List<Channel> registeredChannelsList() {
257 IntObjectMap<DefaultKqueueIoRegistration> ch = registrations;
258 if (ch.isEmpty()) {
259 return Collections.emptyList();
260 }
261
262 List<Channel> channels = new ArrayList<>(ch.size());
263
264 for (DefaultKqueueIoRegistration registration : ch.values()) {
265 if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
266 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
267 }
268 }
269 return Collections.unmodifiableList(channels);
270 }
271
272 private static void handleLoopException(Throwable t) {
273 logger.warn("Unexpected exception in the selector loop.", t);
274
275
276
277 try {
278 Thread.sleep(1000);
279 } catch (InterruptedException e) {
280
281 }
282 }
283
284 @Override
285 public void prepareToDestroy() {
286 try {
287 kqueueWaitNow();
288 } catch (IOException e) {
289
290 }
291
292
293
294 DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
295
296 for (DefaultKqueueIoRegistration reg: copy) {
297 reg.close();
298 }
299 }
300
301 @Override
302 public void destroy() {
303 try {
304 try {
305 kqueueFd.close();
306 } catch (IOException e) {
307 logger.warn("Failed to close the kqueue fd.", e);
308 }
309 } finally {
310
311 nativeArrays.free();
312 changeList.free();
313 eventList.free();
314 }
315 }
316
317 @Override
318 public IoRegistration register(IoHandle handle) {
319 final KQueueIoHandle kqueueHandle = cast(handle);
320 if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
321 throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
322 }
323
324 DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
325 executor, kqueueHandle);
326 DefaultKqueueIoRegistration old = registrations.put(kqueueHandle.ident(), registration);
327 if (old != null) {
328
329 registrations.put(kqueueHandle.ident(), old);
330 throw new IllegalStateException("registration for the KQueueIoHandle.ident() already exists");
331 }
332
333 if (kqueueHandle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
334 numChannels++;
335 }
336 return registration;
337 }
338
339 private static KQueueIoHandle cast(IoHandle handle) {
340 if (handle instanceof KQueueIoHandle) {
341 return (KQueueIoHandle) handle;
342 }
343 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
344 }
345
346 private static KQueueIoOps cast(IoOps ops) {
347 if (ops instanceof KQueueIoOps) {
348 return (KQueueIoOps) ops;
349 }
350 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
351 }
352
353 @Override
354 public boolean isCompatible(Class<? extends IoHandle> handleType) {
355 return KQueueIoHandle.class.isAssignableFrom(handleType);
356 }
357
358 private final class DefaultKqueueIoRegistration implements IoRegistration {
359 private final AtomicBoolean canceled = new AtomicBoolean();
360 private final KQueueIoEvent event = new KQueueIoEvent();
361
362 final KQueueIoHandle handle;
363
364 private final ThreadAwareExecutor executor;
365
366 DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
367 this.executor = executor;
368 this.handle = handle;
369 }
370
371 @SuppressWarnings("unchecked")
372 @Override
373 public <T> T attachment() {
374 return (T) nativeArrays;
375 }
376
377 @Override
378 public long submit(IoOps ops) {
379 KQueueIoOps kQueueIoOps = cast(ops);
380 if (!isValid()) {
381 return -1;
382 }
383 short filter = kQueueIoOps.filter();
384 short flags = kQueueIoOps.flags();
385 int fflags = kQueueIoOps.fflags();
386 if (executor.isExecutorThread(Thread.currentThread())) {
387 evSet(filter, flags, fflags);
388 } else {
389 executor.execute(() -> evSet(filter, flags, fflags));
390 }
391 return 0;
392 }
393
394 void handle(int ident, short filter, short flags, int fflags, long data) {
395 event.update(ident, filter, flags, fflags, data);
396 handle.handle(this, event);
397 }
398
399 private void evSet(short filter, short flags, int fflags) {
400 changeList.evSet(handle.ident(), filter, flags, fflags);
401 }
402
403 @Override
404 public boolean isValid() {
405 return !canceled.get();
406 }
407
408 @Override
409 public boolean cancel() {
410 if (!canceled.compareAndSet(false, true)) {
411 return false;
412 }
413 if (executor.isExecutorThread(Thread.currentThread())) {
414 cancel0();
415 } else {
416 executor.execute(this::cancel0);
417 }
418 return true;
419 }
420
421 private void cancel0() {
422 int ident = handle.ident();
423 DefaultKqueueIoRegistration old = registrations.remove(ident);
424 if (old != null) {
425 if (old != this) {
426
427 registrations.put(ident, old);
428 } else if (old.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
429 numChannels--;
430 }
431 }
432 }
433
434 void close() {
435 cancel();
436 try {
437 handle.close();
438 } catch (Exception e) {
439 logger.debug("Exception during closing " + handle, e);
440 }
441 }
442 }
443 }