1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.EventLoop;
21 import io.netty.channel.IoEventLoop;
22 import io.netty.channel.IoExecutionContext;
23 import io.netty.channel.IoHandle;
24 import io.netty.channel.IoHandler;
25 import io.netty.channel.IoHandlerFactory;
26 import io.netty.channel.IoOps;
27 import io.netty.channel.SelectStrategy;
28 import io.netty.channel.SelectStrategyFactory;
29 import io.netty.channel.unix.FileDescriptor;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.util.IntSupplier;
32 import io.netty.util.collection.IntObjectHashMap;
33 import io.netty.util.collection.IntObjectMap;
34 import io.netty.util.concurrent.Future;
35 import io.netty.util.concurrent.Promise;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46
47 import static java.lang.Math.min;
48
49
50
51
52 public final class KQueueIoHandler implements IoHandler {
53 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
54 private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
55 AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
56 private static final int KQUEUE_WAKE_UP_IDENT = 0;
57
58
59
60 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
61
62 static {
63
64
65 KQueue.ensureAvailability();
66 }
67
68 private final boolean allowGrowing;
69 private final FileDescriptor kqueueFd;
70 private final KQueueEventArray changeList;
71 private final KQueueEventArray eventList;
72 private final SelectStrategy selectStrategy;
73 private final IovArray iovArray = new IovArray();
74 private final IntSupplier selectNowSupplier = new IntSupplier() {
75 @Override
76 public int get() throws Exception {
77 return kqueueWaitNow();
78 }
79 };
80 private final IntObjectMap<DefaultKqueueIoRegistration> registrations = new IntObjectHashMap<>(4096);
81 private int numChannels;
82
83 private volatile int wakenUp;
84
85
86
87
88 public static IoHandlerFactory newFactory() {
89 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
90 }
91
92
93
94
95 public static IoHandlerFactory newFactory(final int maxEvents,
96 final SelectStrategyFactory selectStrategyFactory) {
97 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
98 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
99 return new IoHandlerFactory() {
100 @Override
101 public IoHandler newHandler() {
102 return new KQueueIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
103 }
104 };
105 }
106
107 private KQueueIoHandler(int maxEvents, SelectStrategy strategy) {
108 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
109 this.kqueueFd = Native.newKQueue();
110 if (maxEvents == 0) {
111 allowGrowing = true;
112 maxEvents = 4096;
113 } else {
114 allowGrowing = false;
115 }
116 this.changeList = new KQueueEventArray(maxEvents);
117 this.eventList = new KQueueEventArray(maxEvents);
118 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
119 if (result < 0) {
120 destroy();
121 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
122 }
123 }
124
125
126
127
128 IovArray cleanArray() {
129 iovArray.clear();
130 return iovArray;
131 }
132
133 @Override
134 public void wakeup(IoEventLoop eventLoop) {
135 if (!eventLoop.inEventLoop() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
136 wakeup();
137 }
138 }
139
140 private void wakeup() {
141 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
142
143
144 }
145
146 private int kqueueWait(IoExecutionContext context, boolean oldWakeup) throws IOException {
147
148
149
150
151 if (oldWakeup && !context.canBlock()) {
152 return kqueueWaitNow();
153 }
154
155 long totalDelay = context.delayNanos(System.nanoTime());
156 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
157 int delayNanos = (int) (totalDelay % 1000000000L);
158 return kqueueWait(delaySeconds, delayNanos);
159 }
160
161 private int kqueueWaitNow() throws IOException {
162 return kqueueWait(0, 0);
163 }
164
165 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
166 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
167 changeList.clear();
168 return numEvents;
169 }
170
171 private void processReady(int ready) {
172 for (int i = 0; i < ready; ++i) {
173 final short filter = eventList.filter(i);
174 final short flags = eventList.flags(i);
175 final int ident = eventList.ident(i);
176 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
177
178
179 assert filter != Native.EVFILT_USER ||
180 (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
181 continue;
182 }
183
184 DefaultKqueueIoRegistration registration = registrations.get(ident);
185 if (registration == null) {
186
187
188
189 logger.warn("events[{}]=[{}, {}] had no registration!", i, ident, filter);
190 continue;
191 }
192 registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i));
193 }
194 }
195
196 @Override
197 public int run(IoExecutionContext context) {
198 int handled = 0;
199 try {
200 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
201 switch (strategy) {
202 case SelectStrategy.CONTINUE:
203 return 0;
204
205 case SelectStrategy.BUSY_WAIT:
206
207
208 case SelectStrategy.SELECT:
209 strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 if (wakenUp == 1) {
240 wakeup();
241 }
242
243 default:
244 }
245
246 if (strategy > 0) {
247 handled = strategy;
248 processReady(strategy);
249 }
250
251 if (allowGrowing && strategy == eventList.capacity()) {
252
253 eventList.realloc(false);
254 }
255 } catch (Error e) {
256 throw e;
257 } catch (Throwable t) {
258 handleLoopException(t);
259 }
260 return handled;
261 }
262
263 int numRegisteredChannels() {
264 return numChannels;
265 }
266
267 List<Channel> registeredChannelsList() {
268 IntObjectMap<DefaultKqueueIoRegistration> ch = registrations;
269 if (ch.isEmpty()) {
270 return Collections.emptyList();
271 }
272
273 List<Channel> channels = new ArrayList<>(ch.size());
274
275 for (DefaultKqueueIoRegistration registration : ch.values()) {
276 if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
277 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
278 }
279 }
280 return Collections.unmodifiableList(channels);
281 }
282
283 private static void handleLoopException(Throwable t) {
284 logger.warn("Unexpected exception in the selector loop.", t);
285
286
287
288 try {
289 Thread.sleep(1000);
290 } catch (InterruptedException e) {
291
292 }
293 }
294
295 @Override
296 public void prepareToDestroy() {
297 try {
298 kqueueWaitNow();
299 } catch (IOException e) {
300
301 }
302
303
304
305 DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
306
307 for (DefaultKqueueIoRegistration reg: copy) {
308 reg.close();
309 }
310 }
311
312 @Override
313 public void destroy() {
314 try {
315 try {
316 kqueueFd.close();
317 } catch (IOException e) {
318 logger.warn("Failed to close the kqueue fd.", e);
319 }
320 } finally {
321
322 changeList.free();
323 eventList.free();
324 }
325 }
326
327 @Override
328 public KQueueIoRegistration register(IoEventLoop eventLoop, IoHandle handle) {
329 final KQueueIoHandle kqueueHandle = cast(handle);
330 if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
331 throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
332 }
333
334 DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
335 eventLoop, kqueueHandle);
336 DefaultKqueueIoRegistration old = registrations.put(kqueueHandle.ident(), registration);
337 if (old != null) {
338
339 registrations.put(kqueueHandle.ident(), old);
340 throw new IllegalStateException("registration for the KQueueIoHandle.ident() already exists");
341 }
342
343 if (kqueueHandle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
344 numChannels++;
345 }
346 return registration;
347 }
348
349 private static KQueueIoHandle cast(IoHandle handle) {
350 if (handle instanceof KQueueIoHandle) {
351 return (KQueueIoHandle) handle;
352 }
353 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
354 }
355
356 private static KQueueIoOps cast(IoOps ops) {
357 if (ops instanceof KQueueIoOps) {
358 return (KQueueIoOps) ops;
359 }
360 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
361 }
362
363 @Override
364 public boolean isCompatible(Class<? extends IoHandle> handleType) {
365 return KQueueIoHandle.class.isAssignableFrom(handleType);
366 }
367
368 private final class DefaultKqueueIoRegistration implements KQueueIoRegistration {
369 private final Promise<?> cancellationPromise;
370 private final KQueueIoEvent event = new KQueueIoEvent();
371
372 final KQueueIoHandle handle;
373
374 private final IoEventLoop eventLoop;
375
376 DefaultKqueueIoRegistration(IoEventLoop eventLoop, KQueueIoHandle handle) {
377 this.eventLoop = eventLoop;
378 this.handle = handle;
379 this.cancellationPromise = eventLoop.newPromise();
380 }
381
382 @Override
383 public long submit(IoOps ops) {
384 KQueueIoOps kQueueIoOps = cast(ops);
385 if (!isValid()) {
386 return -1;
387 }
388 short filter = kQueueIoOps.filter();
389 short flags = kQueueIoOps.flags();
390 int fflags = kQueueIoOps.fflags();
391 if (eventLoop.inEventLoop()) {
392 evSet(filter, flags, fflags);
393 } else {
394 eventLoop.execute(() -> evSet(filter, flags, fflags));
395 }
396 return 0;
397 }
398
399 @Override
400 public KQueueIoHandler ioHandler() {
401 return KQueueIoHandler.this;
402 }
403
404 void handle(int ident, short filter, short flags, int fflags, long data) {
405 event.update(ident, filter, flags, fflags, data);
406 handle.handle(this, event);
407 }
408
409 private void evSet(short filter, short flags, int fflags) {
410 changeList.evSet(handle.ident(), filter, flags, fflags);
411 }
412
413 @Override
414 public void cancel() {
415 if (!cancellationPromise.trySuccess(null)) {
416 return;
417 }
418 if (eventLoop.inEventLoop()) {
419 cancel0();
420 } else {
421 eventLoop.execute(this::cancel0);
422 }
423 }
424
425 @Override
426 public Future<?> cancelFuture() {
427 return cancellationPromise;
428 }
429
430 private void cancel0() {
431 int ident = handle.ident();
432 DefaultKqueueIoRegistration old = registrations.remove(ident);
433 if (old != null) {
434 if (old != this) {
435
436 registrations.put(ident, old);
437 } else if (old.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
438 numChannels--;
439 }
440 }
441 }
442
443 void close() {
444 cancel();
445 try {
446 handle.close();
447 } catch (Exception e) {
448 logger.debug("Exception during closing " + handle, e);
449 }
450 }
451 }
452 }