1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.IntObjectHashMap;
31 import io.netty.util.collection.IntObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.SystemPropertyUtil;
36 import io.netty.util.internal.UnstableApi;
37 import io.netty.util.internal.logging.InternalLogger;
38 import io.netty.util.internal.logging.InternalLoggerFactory;
39
40 import java.io.IOException;
41 import java.io.UncheckedIOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.atomic.AtomicLong;
46
47 import static java.lang.Math.min;
48
49
50
51
52 public class EpollIoHandler implements IoHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);

// Read once from the "io.netty.channel.epoll.epollWaitThreshold" system property (default 10) and
// handed to Native.epollWait(...) as its last argument whenever the timerfd may need to change.
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

// Sentinels stored in nextWakeupNanos (and NONE also in prevDeadlineNanos):
//    AWAKE  - the handler is not waiting, or wakeup() has already been requested
//    NONE   - the handler waits without any deadline
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;

// Largest value used for the nanosecond component of the delay passed to Native.epollWait(...).
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;

// Runs before the instance field initializers below, so epoll availability is verified first.
{
    Epoll.ensureAvailability();
}

// Deadline of the previous wait; NONE forces the next wait with a deadline to go through epollWait(...).
private long prevDeadlineNanos = NONE;

// (Re)assigned by openFileDescriptors().
private FileDescriptor epollFd;
private FileDescriptor eventFd;
private FileDescriptor timerFd;

// All current registrations, keyed by the int value of the handle's file descriptor.
private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
// true when the handler was created with maxEvents == 0; lets run(...) call events.increase().
private final boolean allowGrowing;
private final EpollEventArray events;
// Exposed to handles via IoRegistration.attachment().
private final NativeArrays nativeArrays;

private final SelectStrategy selectStrategy;
// Supplies the result of a non-blocking epoll wait to the SelectStrategy.
private final IntSupplier selectNowSupplier = this::epollWaitNow;
private final ThreadAwareExecutor executor;

// nextWakeupNanos is:
//    AWAKE            while the handler is not waiting (or wakeup() was called)
//    NONE             while waiting with no deadline
//    other value T    while waiting with deadline T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
// Set by run(...) when wakeup() raced with a wait, cleared once the eventfd event has been observed.
private boolean pendingWakeup;

// Number of registered handles that are AbstractEpollChannel.AbstractEpollUnsafe instances.
private int numChannels;
94
95
96
97
98 public static IoHandlerFactory newFactory() {
99 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
100 }
101
102
103
104
105 public static IoHandlerFactory newFactory(final int maxEvents,
106 final SelectStrategyFactory selectStrategyFactory) {
107 Epoll.ensureAvailability();
108 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
109 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
110 return new IoHandlerFactory() {
111 @Override
112 public IoHandler newHandler(ThreadAwareExecutor executor) {
113 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
114 }
115
116 @Override
117 public boolean isChangingThreadSupported() {
118 return true;
119 }
120 };
121 }
122
123
124 EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
125 this.executor = ObjectUtil.checkNotNull(executor, "executor");
126 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127 if (maxEvents == 0) {
128 allowGrowing = true;
129 events = new EpollEventArray(4096);
130 } else {
131 allowGrowing = false;
132 events = new EpollEventArray(maxEvents);
133 }
134 nativeArrays = new NativeArrays();
135 openFileDescriptors();
136 }
137
138 private static EpollIoHandle cast(IoHandle handle) {
139 if (handle instanceof EpollIoHandle) {
140 return (EpollIoHandle) handle;
141 }
142 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
143 }
144
145 private static EpollIoOps cast(IoOps ops) {
146 if (ops instanceof EpollIoOps) {
147 return (EpollIoOps) ops;
148 }
149 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
150 }
151
152
153
154
155
156 @UnstableApi
157 public void openFileDescriptors() {
158 boolean success = false;
159 FileDescriptor epollFd = null;
160 FileDescriptor eventFd = null;
161 FileDescriptor timerFd = null;
162 try {
163 this.epollFd = epollFd = Native.newEpollCreate();
164 this.eventFd = eventFd = Native.newEventFd();
165 try {
166
167
168 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
169 } catch (IOException e) {
170 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
171 }
172 this.timerFd = timerFd = Native.newTimerFd();
173 try {
174
175
176 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
177 } catch (IOException e) {
178 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
179 }
180 success = true;
181 } finally {
182 if (!success) {
183 closeFileDescriptor(epollFd);
184 closeFileDescriptor(eventFd);
185 closeFileDescriptor(timerFd);
186 }
187 }
188 }
189
190 private static void closeFileDescriptor(FileDescriptor fd) {
191 if (fd != null) {
192 try {
193 fd.close();
194 } catch (Exception e) {
195
196 }
197 }
198 }
199
200 @Override
201 public void wakeup() {
202 if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
203
204 Native.eventFdWrite(eventFd.intValue(), 1L);
205 }
206 }
207
208 @Override
209 public void prepareToDestroy() {
210
211
212 DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
213
214 for (DefaultEpollIoRegistration reg: copy) {
215 reg.close();
216 }
217 }
218
219 @Override
220 public void destroy() {
221 try {
222 closeFileDescriptors();
223 } finally {
224 nativeArrays.free();
225 events.free();
226 }
227 }
228
229 private enum RegistrationState {
230
231 Pending,
232
233 Added,
234
235 Cancelled
236 }
237
238 private final class DefaultEpollIoRegistration
239 implements IoRegistration {
240 private final ThreadAwareExecutor executor;
241 private RegistrationState state = RegistrationState.Pending;
242 final EpollIoHandle handle;
243
244 DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
245 this.executor = executor;
246 this.handle = handle;
247 }
248
249 @SuppressWarnings("unchecked")
250 @Override
251 public <T> T attachment() {
252 return (T) nativeArrays;
253 }
254
255 @Override
256 public long submit(IoOps ops) {
257 EpollIoOps epollIoOps = cast(ops);
258 try {
259 synchronized (this) {
260 switch (state) {
261 case Cancelled:
262 return -1;
263 case Pending:
264 Native.epollCtlAdd(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
265 state = RegistrationState.Added;
266 case Added:
267 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
268 return epollIoOps.value;
269 default:
270 throw new IllegalStateException();
271 }
272 }
273 } catch (IOException e) {
274 throw new UncheckedIOException(e);
275 }
276 }
277
278 @Override
279 public synchronized boolean isValid() {
280 return state != RegistrationState.Cancelled;
281 }
282
283 @Override
284 public boolean cancel() {
285 synchronized (this) {
286 if (state == RegistrationState.Cancelled) {
287 return false;
288 }
289 state = RegistrationState.Cancelled;
290 }
291 if (executor.isExecutorThread(Thread.currentThread())) {
292 cancel0();
293 } else {
294 executor.execute(this::cancel0);
295 }
296 return true;
297 }
298
299 private void cancel0() {
300 int fd = handle.fd().intValue();
301 DefaultEpollIoRegistration old = registrations.remove(fd);
302 if (old != null) {
303 if (old != this) {
304
305 registrations.put(fd, old);
306 return;
307 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
308 numChannels--;
309 }
310 if (handle.fd().isOpen()) {
311 try {
312
313
314 Native.epollCtlDel(epollFd.intValue(), fd);
315 } catch (IOException e) {
316 logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
317 }
318 }
319 handle.unregistered();
320 }
321 }
322
323 void close() {
324 try {
325 cancel();
326 } catch (Exception e) {
327 logger.debug("Exception during canceling " + this, e);
328 }
329 try {
330 handle.close();
331 } catch (Exception e) {
332 logger.debug("Exception during closing " + handle, e);
333 }
334 }
335
336 void handle(long ev) {
337 handle.handle(this, EpollIoOps.eventOf((int) ev));
338 }
339 }
340
341 @Override
342 public IoRegistration register(IoHandle handle)
343 throws Exception {
344 final EpollIoHandle epollHandle = cast(handle);
345 DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
346 int fd = epollHandle.fd().intValue();
347 DefaultEpollIoRegistration old = registrations.put(fd, registration);
348
349
350
351 assert old == null || !old.isValid();
352
353 if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
354 numChannels++;
355 }
356 handle.registered();
357 return registration;
358 }
359
360 @Override
361 public boolean isCompatible(Class<? extends IoHandle> handleType) {
362 return EpollIoHandle.class.isAssignableFrom(handleType);
363 }
364
365 int numRegisteredChannels() {
366 return numChannels;
367 }
368
369 List<Channel> registeredChannelsList() {
370 IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
371 if (ch.isEmpty()) {
372 return Collections.emptyList();
373 }
374
375 List<Channel> channels = new ArrayList<>(ch.size());
376 for (DefaultEpollIoRegistration registration : ch.values()) {
377 if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
378 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
379 }
380 }
381 return Collections.unmodifiableList(channels);
382 }
383
384 private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
385 if (deadlineNanos == NONE) {
386 return Native.epollWait(epollFd, events, timerFd,
387 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
388 }
389 long totalDelay = context.delayNanos(System.nanoTime());
390 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
391 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
392 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
393 }
394
395 private int epollWaitNoTimerChange() throws IOException {
396 return Native.epollWait(epollFd, events, false);
397 }
398
399 private int epollWaitNow() throws IOException {
400 return Native.epollWait(epollFd, events, true);
401 }
402
403 private int epollBusyWait() throws IOException {
404 return Native.epollBusyWait(epollFd, events);
405 }
406
407 private int epollWaitTimeboxed() throws IOException {
408
409 return Native.epollWait(epollFd, events, 1000);
410 }
411
412 @Override
413 public int run(IoHandlerContext context) {
414 int handled = 0;
415 try {
416 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
417 switch (strategy) {
418 case SelectStrategy.CONTINUE:
419 if (context.shouldReportActiveIoTime()) {
420 context.reportActiveIoTime(0);
421 }
422 return 0;
423
424 case SelectStrategy.BUSY_WAIT:
425 strategy = epollBusyWait();
426 break;
427
428 case SelectStrategy.SELECT:
429 if (pendingWakeup) {
430
431
432 strategy = epollWaitTimeboxed();
433 if (strategy != 0) {
434 break;
435 }
436
437
438 logger.warn("Missed eventfd write (not seen after > 1 second)");
439 pendingWakeup = false;
440 if (!context.canBlock()) {
441 break;
442 }
443
444 }
445
446 long curDeadlineNanos = context.deadlineNanos();
447 if (curDeadlineNanos == -1L) {
448 curDeadlineNanos = NONE;
449 }
450 nextWakeupNanos.set(curDeadlineNanos);
451 try {
452 if (context.canBlock()) {
453 if (curDeadlineNanos == prevDeadlineNanos) {
454
455 strategy = epollWaitNoTimerChange();
456 } else {
457
458 long result = epollWait(context, curDeadlineNanos);
459
460
461 strategy = Native.epollReady(result);
462 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
463 }
464 }
465 } finally {
466
467
468 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
469 pendingWakeup = true;
470 }
471 }
472
473 default:
474 }
475 if (strategy > 0) {
476 handled = strategy;
477 if (context.shouldReportActiveIoTime()) {
478 long activeIoStartTimeNanos = System.nanoTime();
479 if (processReady(events, strategy)) {
480 prevDeadlineNanos = NONE;
481 }
482 long activeIoEndTimeNanos = System.nanoTime();
483 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
484 } else {
485 if (processReady(events, strategy)) {
486 prevDeadlineNanos = NONE;
487 }
488 }
489 } else if (context.shouldReportActiveIoTime()) {
490 context.reportActiveIoTime(0);
491 }
492
493 if (allowGrowing && strategy == events.length()) {
494
495 events.increase();
496 }
497 } catch (Error e) {
498 throw e;
499 } catch (Throwable t) {
500 handleLoopException(t);
501 }
502 return handled;
503 }
504
505
506
507
508 void handleLoopException(Throwable t) {
509 logger.warn("Unexpected exception in the selector loop.", t);
510
511
512
513 try {
514 Thread.sleep(1000);
515 } catch (InterruptedException e) {
516
517 }
518 }
519
520
521 private boolean processReady(EpollEventArray events, int ready) {
522 boolean timerFired = false;
523 for (int i = 0; i < ready; i ++) {
524 final int fd = events.fd(i);
525 if (fd == eventFd.intValue()) {
526 pendingWakeup = false;
527 } else if (fd == timerFd.intValue()) {
528 timerFired = true;
529 } else {
530 final long ev = events.events(i);
531
532 DefaultEpollIoRegistration registration = registrations.get(fd);
533 if (registration != null) {
534 registration.handle(ev);
535 } else {
536
537 try {
538 Native.epollCtlDel(epollFd.intValue(), fd);
539 } catch (IOException ignore) {
540
541
542
543
544 }
545 }
546 }
547 }
548 return timerFired;
549 }
550
551
552
553
554
555
556
557 @UnstableApi
558 public void closeFileDescriptors() {
559
560 while (pendingWakeup) {
561 try {
562 int count = epollWaitTimeboxed();
563 if (count == 0) {
564
565 break;
566 }
567 for (int i = 0; i < count; i++) {
568 if (events.fd(i) == eventFd.intValue()) {
569 pendingWakeup = false;
570 break;
571 }
572 }
573 } catch (IOException ignore) {
574
575 }
576 }
577 try {
578 eventFd.close();
579 } catch (IOException e) {
580 logger.warn("Failed to close the event fd.", e);
581 }
582 try {
583 timerFd.close();
584 } catch (IOException e) {
585 logger.warn("Failed to close the timer fd.", e);
586 }
587
588 try {
589 epollFd.close();
590 } catch (IOException e) {
591 logger.warn("Failed to close the epoll fd.", e);
592 }
593 }
594 }