1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.kqueue;
17
18 import io.netty5.channel.DefaultSelectStrategyFactory;
19 import io.netty5.channel.IoExecutionContext;
20 import io.netty5.channel.IoHandle;
21 import io.netty5.channel.IoHandler;
22 import io.netty5.channel.IoHandlerFactory;
23 import io.netty5.channel.SelectStrategy;
24 import io.netty5.channel.SelectStrategyFactory;
25 import io.netty5.channel.unix.FileDescriptor;
26 import io.netty5.channel.unix.IovArray;
27 import io.netty5.util.collection.IntObjectHashMap;
28 import io.netty5.util.collection.IntObjectMap;
29 import io.netty5.util.internal.StringUtil;
30 import io.netty5.util.internal.UnstableApi;
31 import io.netty5.util.internal.logging.InternalLogger;
32 import io.netty5.util.internal.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.io.UncheckedIOException;
36 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37 import java.util.function.IntSupplier;
38
39 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
40 import static java.lang.Math.min;
41 import static java.util.Objects.requireNonNull;
42
43
44
45
46 @UnstableApi
47 public final class KQueueHandler implements IoHandler {
48 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueHandler.class);
49 private static final AtomicIntegerFieldUpdater<KQueueHandler> WAKEN_UP_UPDATER =
50 AtomicIntegerFieldUpdater.newUpdater(KQueueHandler.class, "wakenUp");
51 private static final int KQUEUE_WAKE_UP_IDENT = 0;
52
53 static {
54
55
56 KQueue.ensureAvailability();
57 }
58
59 private final boolean allowGrowing;
60 private final FileDescriptor kqueueFd;
61 private final KQueueEventArray changeList;
62 private final KQueueEventArray eventList;
63 private final SelectStrategy selectStrategy;
64 private final IovArray iovArray = new IovArray();
65 private final IntSupplier selectNowSupplier = () -> {
66 try {
67 return kqueueWaitNow();
68 } catch (IOException e) {
69 throw new UncheckedIOException(e);
70 }
71 };
72
73 private final IntObjectMap<AbstractKQueueChannel<?>> channels = new IntObjectHashMap<>(4096);
74
75 private volatile int wakenUp;
76
77 private static AbstractKQueueChannel<?> cast(IoHandle handle) {
78 if (handle instanceof AbstractKQueueChannel) {
79 return (AbstractKQueueChannel<?>) handle;
80 }
81 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
82 }
83
84 private KQueueHandler() {
85 this(0, DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy());
86 }
87
88 private KQueueHandler(int maxEvents, SelectStrategy strategy) {
89 selectStrategy = requireNonNull(strategy, "strategy");
90 this.kqueueFd = Native.newKQueue();
91 if (maxEvents == 0) {
92 allowGrowing = true;
93 maxEvents = 4096;
94 } else {
95 allowGrowing = false;
96 }
97 changeList = new KQueueEventArray(maxEvents);
98 eventList = new KQueueEventArray(maxEvents);
99 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
100 if (result < 0) {
101 destroy();
102 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
103 }
104 }
105
106
107
108
109 public static IoHandlerFactory newFactory() {
110 return KQueueHandler::new;
111 }
112
113
114
115
116 public static IoHandlerFactory newFactory(final int maxEvents,
117 final SelectStrategyFactory selectStrategyFactory) {
118 checkPositiveOrZero(maxEvents, "maxEvents");
119 requireNonNull(selectStrategyFactory, "selectStrategyFactory");
120 return () -> new KQueueHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
121 }
122
123 @Override
124 public void register(IoHandle handle) {
125 final AbstractKQueueChannel<?> kQueueChannel = cast(handle);
126 final int id = kQueueChannel.fd().intValue();
127 AbstractKQueueChannel<?> old = channels.put(id, kQueueChannel);
128
129
130 assert old == null || !old.isOpen();
131
132 kQueueChannel.register0(new KQueueRegistration() {
133 @Override
134 public void evSet(short filter, short flags, int fflags) {
135 KQueueHandler.this.evSet(kQueueChannel, filter, flags, fflags);
136 }
137
138 @Override
139 public IovArray cleanArray() {
140 return KQueueHandler.this.cleanArray();
141 }
142 });
143 }
144
145 @Override
146 public void deregister(IoHandle handle) throws Exception {
147 AbstractKQueueChannel<?> kQueueChannel = cast(handle);
148 int fd = kQueueChannel.fd().intValue();
149
150 AbstractKQueueChannel<?> old = channels.remove(fd);
151 if (old != null && old != kQueueChannel) {
152
153 channels.put(fd, old);
154
155
156 assert !kQueueChannel.isOpen();
157 } else if (kQueueChannel.isOpen()) {
158
159
160
161
162 kQueueChannel.unregisterFilters();
163 }
164
165 kQueueChannel.deregister0();
166 }
167
168 private void evSet(AbstractKQueueChannel<?> ch, short filter, short flags, int fflags) {
169 changeList.evSet(ch, filter, flags, fflags);
170 }
171
172 private IovArray cleanArray() {
173 iovArray.clear();
174 return iovArray;
175 }
176
177 @Override
178 public void wakeup(boolean inEventLoop) {
179 if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
180 wakeup();
181 }
182 }
183
184 private void wakeup() {
185 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
186
187
188 }
189
190 private int kqueueWait(IoExecutionContext context, boolean oldWakeup) throws IOException {
191
192
193
194
195 if (oldWakeup && !context.canBlock()) {
196 return kqueueWaitNow();
197 }
198
199 long totalDelay = context.delayNanos(System.nanoTime());
200 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
201 return kqueueWait(delaySeconds, (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
202 }
203
204 private int kqueueWaitNow() throws IOException {
205 return kqueueWait(0, 0);
206 }
207
208 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
209 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
210 changeList.clear();
211 return numEvents;
212 }
213
214 private void processReady(int ready) {
215 for (int i = 0; i < ready; ++i) {
216 final short filter = eventList.filter(i);
217 final short flags = eventList.flags(i);
218 final int fd = eventList.fd(i);
219 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
220
221
222 assert filter != Native.EVFILT_USER ||
223 (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
224 continue;
225 }
226
227 AbstractKQueueChannel<?> channel = channels.get(fd);
228 if (channel == null) {
229
230
231
232 logger.warn("events[{}]=[{}, {}] had no channel!", i, fd, filter);
233 continue;
234 }
235
236
237
238 if (filter == Native.EVFILT_WRITE) {
239 channel.writeReady();
240 } else if (filter == Native.EVFILT_READ) {
241
242 channel.readReady(eventList.data(i));
243 } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
244 channel.readEOF();
245 }
246
247
248
249
250 if ((flags & Native.EV_EOF) != 0) {
251 channel.readEOF();
252 }
253 }
254 }
255
256 @Override
257 public int run(IoExecutionContext context) {
258 int handled = 0;
259 try {
260 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
261 switch (strategy) {
262 case SelectStrategy.CONTINUE:
263 return 0;
264
265 case SelectStrategy.BUSY_WAIT:
266
267
268 case SelectStrategy.SELECT:
269 strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299 if (wakenUp == 1) {
300 wakeup();
301 }
302
303 default:
304 }
305
306 if (strategy > 0) {
307 handled = strategy;
308 processReady(strategy);
309 }
310 if (allowGrowing && strategy == eventList.capacity()) {
311
312 eventList.realloc(false);
313 }
314 } catch (Error e) {
315 throw e;
316 } catch (Throwable t) {
317 handleLoopException(t);
318 }
319 return handled;
320 }
321
322 @Override
323 public void destroy() {
324 try {
325 try {
326 kqueueFd.close();
327 } catch (IOException e) {
328 logger.warn("Failed to close the kqueue fd.", e);
329 }
330 } finally {
331
332 changeList.free();
333 eventList.free();
334 }
335 }
336
337 @Override
338 public void prepareToDestroy() {
339 try {
340 kqueueWaitNow();
341 } catch (IOException e) {
342
343 }
344
345
346
347 AbstractKQueueChannel<?>[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
348
349 for (AbstractKQueueChannel<?> ch: localChannels) {
350 ch.closeTransportNow();
351 }
352 }
353
354 @Override
355 public boolean isCompatible(Class<? extends IoHandle> handleType) {
356 return AbstractKQueueChannel.class.isAssignableFrom(handleType);
357 }
358
359 private static void handleLoopException(Throwable t) {
360 logger.warn("Unexpected exception in the selector loop.", t);
361
362
363
364 try {
365 Thread.sleep(1000);
366 } catch (InterruptedException e) {
367
368 }
369 }
370 }