1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.epoll;
17
18 import io.netty5.channel.DefaultSelectStrategyFactory;
19 import io.netty5.channel.IoExecutionContext;
20 import io.netty5.channel.IoHandle;
21 import io.netty5.channel.IoHandler;
22 import io.netty5.channel.IoHandlerFactory;
23 import io.netty5.channel.SelectStrategy;
24 import io.netty5.channel.SelectStrategyFactory;
25 import io.netty5.channel.SingleThreadEventLoop;
26 import io.netty5.channel.unix.FileDescriptor;
27 import io.netty5.channel.unix.IovArray;
28 import io.netty5.util.collection.IntObjectHashMap;
29 import io.netty5.util.collection.IntObjectMap;
30 import io.netty5.util.internal.StringUtil;
31 import io.netty5.util.internal.SystemPropertyUtil;
32 import io.netty5.util.internal.logging.InternalLogger;
33 import io.netty5.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.IOException;
36 import java.io.UncheckedIOException;
37 import java.util.concurrent.atomic.AtomicLong;
38 import java.util.function.IntSupplier;
39
40 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
41 import static java.lang.Math.min;
42 import static java.util.Objects.requireNonNull;
43
44
45
46
47 public class EpollHandler implements IoHandler {
// Value of the "io.netty5.channel.epoll.epollWaitThreshold" system property (default 10). It is only
// forwarded as the last argument of Native.epollWait(...) when the timerfd is passed along; its exact
// meaning is defined by Native (the name suggests a millisecond threshold — TODO confirm).
48 private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
49 SystemPropertyUtil.getLong("io.netty5.channel.epoll.epollWaitThreshold", 10);
50
51 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollHandler.class);
52
53 static {
54 // Runs once, when this class is initialized, i.e. before any handler instance can exist.
55 // NOTE(review): presumably fails fast if the native epoll transport is unavailable — confirm in Epoll.
56 Epoll.ensureAvailability();
57 }
58
59 // Deadline compared against in run(); seeded with "now - 1" (NOTE(review): presumably so no real deadline matches it).
60 private long prevDeadlineNanos = SingleThreadEventLoop.nanoTime() - 1;
61 private final FileDescriptor epollFd;
62 private final FileDescriptor eventFd;
63 private final FileDescriptor timerFd;
// fd -> channel lookup; filled by add(...), pruned by remove(...), consulted by processReady(...).
64 private final IntObjectMap<AbstractEpollChannel<?>> channels = new IntObjectHashMap<>(4096);
// True only when the handler was created with maxEvents == 0; run() then grows `events` when it fills up.
65 private final boolean allowGrowing;
66 private final EpollEventArray events;
67
68 // The two arrays below are created lazily on first request and released in destroy().
69 private IovArray iovArray;
70 private NativeDatagramPacketArray datagramPacketArray;
71
72 private final SelectStrategy selectStrategy;
// Adapter handed to the SelectStrategy: performs epollWaitNow() and converts the checked IOException
// into an UncheckedIOException because IntSupplier cannot throw checked exceptions.
73 private final IntSupplier selectNowSupplier = () -> {
74 try {
75 return epollWaitNow();
76 } catch (IOException e) {
77 throw new UncheckedIOException(e);
78 }
79 };
80
// Sentinel values stored in nextWakeupNanos (NONE also doubles as "no deadline" for prevDeadlineNanos).
81 private static final long AWAKE = -1L;
82 private static final long NONE = Long.MAX_VALUE;
83
84 // nextWakeupNanos holds:
85 //   AWAKE          while run() is not inside its blocking wait section,
86 //   NONE           while run() waits and the context reported no deadline (-1),
87 //   other value T  while run() waits with the context's deadline T.
88 private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
89
// Set by run() when wakeup() already flipped nextWakeupNanos to AWAKE (so an eventfd write is expected);
// cleared once the eventfd shows up in processReady(...)/destroy() or after the 1 second wait in run() times out.
90 private boolean pendingWakeup;
91
92 // Upper bound applied to the nanosecond part of the timer delay in epollWait(...): 999,999,999 ns.
93 private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
94
95 private static AbstractEpollChannel<?> cast(IoHandle handle) {
96 if (handle instanceof AbstractEpollChannel) {
97 return (AbstractEpollChannel<?>) handle;
98 }
99 throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(handle) + " not supported");
100 }
101
/**
 * Creates a handler with default settings: {@code maxEvents == 0} (which the two-argument
 * constructor treats as "growable event array") and a select strategy obtained from
 * {@link DefaultSelectStrategyFactory#INSTANCE}.
 */
private EpollHandler() {
    this(0, DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy());
}
105
106
107 EpollHandler(int maxEvents, SelectStrategy strategy) {
108 selectStrategy = strategy;
109 if (maxEvents == 0) {
110 allowGrowing = true;
111 events = new EpollEventArray(4096);
112 } else {
113 allowGrowing = false;
114 events = new EpollEventArray(maxEvents);
115 }
116 boolean success = false;
117 FileDescriptor epollFd = null;
118 FileDescriptor eventFd = null;
119 FileDescriptor timerFd = null;
120 try {
121 this.epollFd = epollFd = Native.newEpollCreate();
122 this.eventFd = eventFd = Native.newEventFd();
123 try {
124
125
126 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
127 } catch (IOException e) {
128 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
129 }
130 this.timerFd = timerFd = Native.newTimerFd();
131 try {
132
133
134 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
135 } catch (IOException e) {
136 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
137 }
138 success = true;
139 } finally {
140 if (!success) {
141 if (epollFd != null) {
142 try {
143 epollFd.close();
144 } catch (Exception e) {
145
146 }
147 }
148 if (eventFd != null) {
149 try {
150 eventFd.close();
151 } catch (Exception e) {
152
153 }
154 }
155 if (timerFd != null) {
156 try {
157 timerFd.close();
158 } catch (Exception e) {
159
160 }
161 }
162 }
163 }
164 }
165
166
167
168
169 public static IoHandlerFactory newFactory() {
170 return EpollHandler::new;
171 }
172
173
174
175
176 public static IoHandlerFactory newFactory(final int maxEvents,
177 final SelectStrategyFactory selectStrategyFactory) {
178 checkPositiveOrZero(maxEvents, "maxEvents");
179 requireNonNull(selectStrategyFactory, "selectStrategyFactory");
180 return () -> new EpollHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
181 }
182
183 private IovArray cleanIovArray() {
184 if (iovArray == null) {
185 iovArray = new IovArray();
186 } else {
187 iovArray.clear();
188 }
189 return iovArray;
190 }
191
192 private NativeDatagramPacketArray cleanDatagramPacketArray() {
193 if (datagramPacketArray == null) {
194 datagramPacketArray = new NativeDatagramPacketArray();
195 } else {
196 datagramPacketArray.clear();
197 }
198 return datagramPacketArray;
199 }
200
201 @Override
202 public final void register(IoHandle handle) throws Exception {
203 final AbstractEpollChannel<?> epollChannel = cast(handle);
204 epollChannel.register0(new EpollRegistration() {
205 @Override
206 public void update() throws IOException {
207 EpollHandler.this.modify(epollChannel);
208 }
209
210 @Override
211 public void remove() throws IOException {
212 EpollHandler.this.remove(epollChannel);
213 }
214
215 @Override
216 public IovArray cleanIovArray() {
217 return EpollHandler.this.cleanIovArray();
218 }
219
220 @Override
221 public NativeDatagramPacketArray cleanDatagramPacketArray() {
222 return EpollHandler.this.cleanDatagramPacketArray();
223 }
224 });
225 add(epollChannel);
226 }
227
228 @Override
229 public final void deregister(IoHandle handle) throws Exception {
230 cast(handle).deregister0();
231 }
232
233 @Override
234 public final void wakeup(boolean inEventLoop) {
235 if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
236
237 Native.eventFdWrite(eventFd.intValue(), 1L);
238 }
239 }
240
241
242
243
244 private void add(AbstractEpollChannel<?> ch) throws IOException {
245 int fd = ch.socket.intValue();
246 Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags());
247 AbstractEpollChannel<?> old = channels.put(fd, ch);
248
249
250
251 assert old == null || !old.isOpen();
252 }
253
254
255
256
257 private void modify(AbstractEpollChannel<?> ch) throws IOException {
258 Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags());
259 }
260
261
262
263
264 private void remove(AbstractEpollChannel<?> ch) throws IOException {
265 int fd = ch.socket.intValue();
266
267 AbstractEpollChannel<?> old = channels.remove(fd);
268 if (old != null && old != ch) {
269
270 channels.put(fd, old);
271
272
273 assert !ch.isOpen();
274 } else if (ch.isOpen()) {
275
276
277 Native.epollCtlDel(epollFd.intValue(), fd);
278 }
279 }
280
281 private long epollWait(IoExecutionContext context, long deadlineNanos) throws IOException {
282 if (deadlineNanos == NONE) {
283 return Native.epollWait(epollFd, events, timerFd,
284 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
285 }
286 long totalDelay = context.delayNanos(System.nanoTime());
287 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
288 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
289 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
290 }
291
292 private int epollWaitNoTimerChange() throws IOException {
293 return Native.epollWait(epollFd, events, false);
294 }
295
296 private int epollWaitNow() throws IOException {
297 return Native.epollWait(epollFd, events, true);
298 }
299
300 private int epollBusyWait() throws IOException {
301 return Native.epollBusyWait(epollFd, events);
302 }
303
304 private int epollWaitTimeboxed() throws IOException {
305
306 return Native.epollWait(epollFd, events, 1000);
307 }
308
// Runs one iteration of the epoll loop: asks the select strategy what to do, waits for events
// accordingly, dispatches ready events through processReady(...) and grows the event array if it
// was completely filled.
//
// Returns the number of ready events that were handed to processReady(...); 0 when the strategy
// was CONTINUE, when nothing was ready, or when a non-Error throwable was caught and passed to
// handleLoopException(...). Errors are rethrown unchanged.
309 @Override
310 public final int run(IoExecutionContext context) {
311 int handled = 0;
312 try {
313 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
314 switch (strategy) {
315 case SelectStrategy.CONTINUE:
316 return 0;
317
318 case SelectStrategy.BUSY_WAIT:
319 strategy = epollBusyWait();
320 break;
321
322 case SelectStrategy.SELECT:
323 if (pendingWakeup) {
324 // An eventfd write is still outstanding, so wait for it with the bounded wait
325 // instead of touching nextWakeupNanos or the timer.
326 strategy = epollWaitTimeboxed();
327 if (strategy != 0) {
328 break;
329 }
330 // The bounded wait returned no events: give up on the outstanding eventfd write,
331 // log it and clear the flag.
332 logger.warn("Missed eventfd write (not seen after > 1 second)");
333 pendingWakeup = false;
334 if (!context.canBlock()) {
335 break;
336 }
337 // otherwise continue with the regular blocking wait below
338 }
339
340 long curDeadlineNanos = context.deadlineNanos();
341 if (curDeadlineNanos == -1L) {
342 curDeadlineNanos = NONE; // -1 from the context is mapped to the NONE sentinel
343 }
// Publish the wait state before blocking so that wakeup() can see it is not AWAKE and write the eventfd.
344 nextWakeupNanos.set(curDeadlineNanos);
345 try {
346 if (context.canBlock()) {
347 if (curDeadlineNanos == prevDeadlineNanos) {
348 // Same deadline as recorded in prevDeadlineNanos: wait without passing the timerfd.
349 strategy = epollWaitNoTimerChange();
350 } else {
351 // Deadline changed: wait through the variant that receives the timerfd and the new delay.
352 long result = epollWait(context, curDeadlineNanos);
353 // `result` packs two pieces of information that are unpacked with the Native helpers:
354 // the ready count and whether the timer was used.
355 strategy = Native.epollReady(result);
356 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
357 }
358 }
359 } finally {
360 // Leave the wait state. If the value is already AWAKE, wakeup() switched it, so an eventfd
361 // write is expected. NOTE(review): get() is presumably checked first to skip the costlier getAndSet().
362 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
363 pendingWakeup = true;
364 }
365 }
366 // falls through to the empty default case
367 default:
368 }
369 if (strategy > 0) {
370 handled = strategy;
371 if (processReady(events, strategy)) {
// processReady(...) saw the timerfd among the ready events, so forget the recorded deadline.
372 prevDeadlineNanos = NONE;
373 }
374 }
375 if (allowGrowing && strategy == events.length()) {
376 // Every slot of the event array was used: enlarge it for subsequent waits.
377 events.increase();
378 }
379 } catch (Error error) {
380 throw error;
381 } catch (Throwable t) {
382 handleLoopException(t);
383 }
384 return handled;
385 }
386
387
388
389
390 void handleLoopException(Throwable t) {
391 logger.warn("Unexpected exception in the selector loop.", t);
392
393
394
395 try {
396 Thread.sleep(1000);
397 } catch (InterruptedException e) {
398
399 }
400 }
401
402 @Override
403 public void prepareToDestroy() {
404
405
406 AbstractEpollChannel<?>[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
407
408 for (AbstractEpollChannel<?> ch: localChannels) {
409 ch.closeTransportNow();
410 }
411 }
412
413
414 private boolean processReady(EpollEventArray events, int ready) {
415 boolean timerFired = false;
416 for (int i = 0; i < ready; i ++) {
417 final int fd = events.fd(i);
418 if (fd == eventFd.intValue()) {
419 pendingWakeup = false;
420 } else if (fd == timerFd.intValue()) {
421 timerFired = true;
422 } else {
423 final long ev = events.events(i);
424
425 AbstractEpollChannel<?> ch = channels.get(fd);
426 if (ch != null) {
427
428
429
430
431
432
433
434
435
436
437
438
439
440 if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
441
442 ch.epollOutReady();
443 }
444
445
446
447
448
449
450 if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
451
452 ch.epollInReady();
453 }
454
455
456
457
458 if ((ev & Native.EPOLLRDHUP) != 0) {
459 ch.epollRdHupReady();
460 }
461 } else {
462
463 try {
464 Native.epollCtlDel(epollFd.intValue(), fd);
465 } catch (IOException ignore) {
466
467
468
469
470 }
471 }
472 }
473 }
474 return timerFired;
475 }
476
477 @Override
478 public final void destroy() {
479 try {
480
481 while (pendingWakeup) {
482 try {
483 int count = epollWaitTimeboxed();
484 if (count == 0) {
485
486 break;
487 }
488 for (int i = 0; i < count; i++) {
489 if (events.fd(i) == eventFd.intValue()) {
490 pendingWakeup = false;
491 break;
492 }
493 }
494 } catch (IOException ignore) {
495
496 }
497 }
498 try {
499 eventFd.close();
500 } catch (IOException e) {
501 logger.warn("Failed to close the event fd.", e);
502 }
503 try {
504 timerFd.close();
505 } catch (IOException e) {
506 logger.warn("Failed to close the timer fd.", e);
507 }
508 try {
509 epollFd.close();
510 } catch (IOException e) {
511 logger.warn("Failed to close the epoll fd.", e);
512 }
513 } finally {
514
515 if (iovArray != null) {
516 iovArray.release();
517 iovArray = null;
518 }
519 if (datagramPacketArray != null) {
520 datagramPacketArray.release();
521 datagramPacketArray = null;
522 }
523 events.free();
524 }
525 }
526
527 @Override
528 public boolean isCompatible(Class<? extends IoHandle> handleType) {
529 return AbstractEpollChannel.class.isAssignableFrom(handleType);
530 }
531 }