1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.EventLoop;
21 import io.netty.channel.IoEventLoop;
22 import io.netty.channel.IoExecutionContext;
23 import io.netty.channel.IoHandle;
24 import io.netty.channel.IoHandler;
25 import io.netty.channel.IoHandlerFactory;
26 import io.netty.channel.IoOps;
27 import io.netty.channel.SelectStrategy;
28 import io.netty.channel.SelectStrategyFactory;
29 import io.netty.channel.unix.FileDescriptor;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.util.IntSupplier;
32 import io.netty.util.collection.IntObjectHashMap;
33 import io.netty.util.collection.IntObjectMap;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.logging.InternalLogger;
37 import io.netty.util.internal.logging.InternalLoggerFactory;
38
39 import java.io.IOException;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
45
46 import static java.lang.Math.min;
47
48
49
50
51 public final class KQueueIoHandler implements IoHandler {
52 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
53 private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
54 AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
55 private static final int KQUEUE_WAKE_UP_IDENT = 0;
56
57
58
59 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
60
61 static {
62
63
64 KQueue.ensureAvailability();
65 }
66
67 private final boolean allowGrowing;
68 private final FileDescriptor kqueueFd;
69 private final KQueueEventArray changeList;
70 private final KQueueEventArray eventList;
71 private final SelectStrategy selectStrategy;
72 private final IovArray iovArray = new IovArray();
73 private final IntSupplier selectNowSupplier = new IntSupplier() {
74 @Override
75 public int get() throws Exception {
76 return kqueueWaitNow();
77 }
78 };
79 private final IntObjectMap<DefaultKqueueIoRegistration> registrations = new IntObjectHashMap<>(4096);
80 private int numChannels;
81
82 private volatile int wakenUp;
83
84
85
86
87 public static IoHandlerFactory newFactory() {
88 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
89 }
90
91
92
93
94 public static IoHandlerFactory newFactory(final int maxEvents,
95 final SelectStrategyFactory selectStrategyFactory) {
96 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
97 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
98 return new IoHandlerFactory() {
99 @Override
100 public IoHandler newHandler() {
101 return new KQueueIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
102 }
103 };
104 }
105
106 private KQueueIoHandler(int maxEvents, SelectStrategy strategy) {
107 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
108 this.kqueueFd = Native.newKQueue();
109 if (maxEvents == 0) {
110 allowGrowing = true;
111 maxEvents = 4096;
112 } else {
113 allowGrowing = false;
114 }
115 this.changeList = new KQueueEventArray(maxEvents);
116 this.eventList = new KQueueEventArray(maxEvents);
117 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
118 if (result < 0) {
119 destroy();
120 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
121 }
122 }
123
124
125
126
127 IovArray cleanArray() {
128 iovArray.clear();
129 return iovArray;
130 }
131
132 @Override
133 public void wakeup(IoEventLoop eventLoop) {
134 if (!eventLoop.inEventLoop() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
135 wakeup();
136 }
137 }
138
139 private void wakeup() {
140 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
141
142
143 }
144
145 private int kqueueWait(IoExecutionContext context, boolean oldWakeup) throws IOException {
146
147
148
149
150 if (oldWakeup && !context.canBlock()) {
151 return kqueueWaitNow();
152 }
153
154 long totalDelay = context.delayNanos(System.nanoTime());
155 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
156 int delayNanos = (int) (totalDelay % 1000000000L);
157 return kqueueWait(delaySeconds, delayNanos);
158 }
159
160 private int kqueueWaitNow() throws IOException {
161 return kqueueWait(0, 0);
162 }
163
164 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
165 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
166 changeList.clear();
167 return numEvents;
168 }
169
170 private void processReady(int ready) {
171 for (int i = 0; i < ready; ++i) {
172 final short filter = eventList.filter(i);
173 final short flags = eventList.flags(i);
174 final int ident = eventList.ident(i);
175 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
176
177
178 assert filter != Native.EVFILT_USER ||
179 (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
180 continue;
181 }
182
183 DefaultKqueueIoRegistration registration = registrations.get(ident);
184 if (registration == null) {
185
186
187
188 logger.warn("events[{}]=[{}, {}] had no registration!", i, ident, filter);
189 continue;
190 }
191 registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i));
192 }
193 }
194
195 @Override
196 public int run(IoExecutionContext context) {
197 int handled = 0;
198 try {
199 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
200 switch (strategy) {
201 case SelectStrategy.CONTINUE:
202 return 0;
203
204 case SelectStrategy.BUSY_WAIT:
205
206
207 case SelectStrategy.SELECT:
208 strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238 if (wakenUp == 1) {
239 wakeup();
240 }
241
242 default:
243 }
244
245 if (strategy > 0) {
246 handled = strategy;
247 processReady(strategy);
248 }
249
250 if (allowGrowing && strategy == eventList.capacity()) {
251
252 eventList.realloc(false);
253 }
254 } catch (Error e) {
255 throw e;
256 } catch (Throwable t) {
257 handleLoopException(t);
258 }
259 return handled;
260 }
261
262 int numRegisteredChannels() {
263 return numChannels;
264 }
265
266 List<Channel> registeredChannelsList() {
267 IntObjectMap<DefaultKqueueIoRegistration> ch = registrations;
268 if (ch.isEmpty()) {
269 return Collections.emptyList();
270 }
271
272 List<Channel> channels = new ArrayList<>(ch.size());
273
274 for (DefaultKqueueIoRegistration registration : ch.values()) {
275 if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
276 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
277 }
278 }
279 return Collections.unmodifiableList(channels);
280 }
281
282 private static void handleLoopException(Throwable t) {
283 logger.warn("Unexpected exception in the selector loop.", t);
284
285
286
287 try {
288 Thread.sleep(1000);
289 } catch (InterruptedException e) {
290
291 }
292 }
293
294 @Override
295 public void prepareToDestroy() {
296 try {
297 kqueueWaitNow();
298 } catch (IOException e) {
299
300 }
301
302
303
304 DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
305
306 for (DefaultKqueueIoRegistration reg: copy) {
307 reg.close();
308 }
309 }
310
311 @Override
312 public void destroy() {
313 try {
314 try {
315 kqueueFd.close();
316 } catch (IOException e) {
317 logger.warn("Failed to close the kqueue fd.", e);
318 }
319 } finally {
320
321 changeList.free();
322 eventList.free();
323 }
324 }
325
326 @Override
327 public KQueueIoRegistration register(IoEventLoop eventLoop, IoHandle handle) {
328 final KQueueIoHandle kqueueHandle = cast(handle);
329 if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
330 throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
331 }
332
333 DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
334 eventLoop, kqueueHandle);
335 DefaultKqueueIoRegistration old = registrations.put(kqueueHandle.ident(), registration);
336 if (old != null) {
337
338 registrations.put(kqueueHandle.ident(), old);
339 throw new IllegalStateException("registration for the KQueueIoHandle.ident() already exists");
340 }
341
342 if (kqueueHandle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
343 numChannels++;
344 }
345 return registration;
346 }
347
348 private static KQueueIoHandle cast(IoHandle handle) {
349 if (handle instanceof KQueueIoHandle) {
350 return (KQueueIoHandle) handle;
351 }
352 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
353 }
354
355 private static KQueueIoOps cast(IoOps ops) {
356 if (ops instanceof KQueueIoOps) {
357 return (KQueueIoOps) ops;
358 }
359 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
360 }
361
362 @Override
363 public boolean isCompatible(Class<? extends IoHandle> handleType) {
364 return KQueueIoHandle.class.isAssignableFrom(handleType);
365 }
366
367 private final class DefaultKqueueIoRegistration extends AtomicBoolean implements KQueueIoRegistration {
368 private final KQueueIoEvent event = new KQueueIoEvent();
369
370 final KQueueIoHandle handle;
371
372 private final IoEventLoop eventLoop;
373
374 DefaultKqueueIoRegistration(IoEventLoop eventLoop, KQueueIoHandle handle) {
375 this.eventLoop = eventLoop;
376 this.handle = handle;
377 }
378
379 @Override
380 public long submit(IoOps ops) {
381 KQueueIoOps kQueueIoOps = cast(ops);
382 if (!isValid()) {
383 return -1;
384 }
385 short filter = kQueueIoOps.filter();
386 short flags = kQueueIoOps.flags();
387 int fflags = kQueueIoOps.fflags();
388 if (eventLoop.inEventLoop()) {
389 evSet(filter, flags, fflags);
390 } else {
391 eventLoop.execute(() -> evSet(filter, flags, fflags));
392 }
393 return 0;
394 }
395
396 @Override
397 public KQueueIoHandler ioHandler() {
398 return KQueueIoHandler.this;
399 }
400
401 void handle(int ident, short filter, short flags, int fflags, long data) {
402 event.update(ident, filter, flags, fflags, data);
403 handle.handle(this, event);
404 }
405
406 private void evSet(short filter, short flags, int fflags) {
407 changeList.evSet(handle.ident(), filter, flags, fflags);
408 }
409
410 @Override
411 public boolean isValid() {
412 return !get();
413 }
414
415 @Override
416 public void cancel() {
417 if (getAndSet(true)) {
418 return;
419 }
420 if (eventLoop.inEventLoop()) {
421 cancel0();
422 } else {
423 eventLoop.execute(this::cancel0);
424 }
425 }
426
427 private void cancel0() {
428 int ident = handle.ident();
429 DefaultKqueueIoRegistration old = registrations.remove(ident);
430 if (old != null) {
431 if (old != this) {
432
433 registrations.put(ident, old);
434 } else if (old.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
435 numChannels--;
436 }
437 }
438 }
439
440 void close() {
441 cancel();
442 try {
443 handle.close();
444 } catch (Exception e) {
445 logger.debug("Exception during closing " + handle, e);
446 }
447 }
448 }
449 }