1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.IntObjectHashMap;
31 import io.netty.util.collection.IntObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.SystemPropertyUtil;
36 import io.netty.util.internal.UnstableApi;
37 import io.netty.util.internal.logging.InternalLogger;
38 import io.netty.util.internal.logging.InternalLoggerFactory;
39
40 import java.io.IOException;
41 import java.io.UncheckedIOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicLong;
47
48 import static java.lang.Math.min;
49 import static java.lang.System.nanoTime;
50
51
52
53
54 public class EpollIoHandler implements IoHandler {
/** Logger used by this handler and its registrations. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);

/**
 * Threshold passed as the last argument of the timer-aware {@code Native.epollWait(...)} call.
 * Read from the system property {@code io.netty.channel.epoll.epollWaitThreshold}, default {@code 10}.
 */
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

static {
    // Check that the native epoll transport is usable before any handler can be created.
    Epoll.ensureAvailability();
}

/**
 * Deadline that was used for the previous timer-aware wait. {@link #run(IoHandlerContext)} compares it
 * with the current deadline to decide whether the timer has to be re-armed.
 */
private long prevDeadlineNanos = nanoTime() - 1;

/** The epoll descriptor plus the two helper descriptors that are registered with it. */
private FileDescriptor epollFd;
private FileDescriptor eventFd;
private FileDescriptor timerFd;

/** Active registrations, keyed by the int value of the registered handle's file descriptor. */
private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);

/** {@code true} when the handler was created with {@code maxEvents == 0}; the event array may then grow. */
private final boolean allowGrowing;
private final EpollEventArray events;

/** Handed out to handles through {@code IoRegistration.attachment()}. */
private final NativeArrays nativeArrays;

private final SelectStrategy selectStrategy;

/** Supplier given to the {@link SelectStrategy}; performs a non-blocking poll. */
private final IntSupplier selectNowSupplier = this::epollWaitNow;

private final ThreadAwareExecutor executor;

/** Value of {@link #nextWakeupNanos} while the loop is not waiting (or has already been woken up). */
private static final long AWAKE = -1L;

/** Value of {@link #nextWakeupNanos} while the loop waits without any deadline. */
private static final long NONE = Long.MAX_VALUE;

/**
 * Wake-up state shared with other threads: {@link #AWAKE}, {@link #NONE}, or the deadline the loop
 * is currently waiting for. {@link #wakeup()} swaps it to {@link #AWAKE} and only writes to the
 * eventfd when the previous value was something else.
 */
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

/**
 * Set when the loop finds, after waiting, that {@link #nextWakeupNanos} was already {@link #AWAKE};
 * cleared once the eventfd shows up among the ready events.
 */
private boolean pendingWakeup;

/** Number of registered handles that are {@code AbstractEpollChannel.AbstractEpollUnsafe} instances. */
private int numChannels;

/** Upper bound for the nanosecond part passed to the timer-aware wait (one nanosecond short of a second). */
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
98
99
100
101
102 public static IoHandlerFactory newFactory() {
103 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
104 }
105
106
107
108
109 public static IoHandlerFactory newFactory(final int maxEvents,
110 final SelectStrategyFactory selectStrategyFactory) {
111 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
112 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
113 return new IoHandlerFactory() {
114 @Override
115 public IoHandler newHandler(ThreadAwareExecutor executor) {
116 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
117 }
118
119 @Override
120 public boolean isChangingThreadSupported() {
121 return true;
122 }
123 };
124 }
125
126
127 EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
128 this.executor = ObjectUtil.checkNotNull(executor, "executor");
129 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
130 if (maxEvents == 0) {
131 allowGrowing = true;
132 events = new EpollEventArray(4096);
133 } else {
134 allowGrowing = false;
135 events = new EpollEventArray(maxEvents);
136 }
137 nativeArrays = new NativeArrays();
138 openFileDescriptors();
139 }
140
141 private static EpollIoHandle cast(IoHandle handle) {
142 if (handle instanceof EpollIoHandle) {
143 return (EpollIoHandle) handle;
144 }
145 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
146 }
147
148 private static EpollIoOps cast(IoOps ops) {
149 if (ops instanceof EpollIoOps) {
150 return (EpollIoOps) ops;
151 }
152 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
153 }
154
155
156
157
158
159 @UnstableApi
160 public void openFileDescriptors() {
161 boolean success = false;
162 FileDescriptor epollFd = null;
163 FileDescriptor eventFd = null;
164 FileDescriptor timerFd = null;
165 try {
166 this.epollFd = epollFd = Native.newEpollCreate();
167 this.eventFd = eventFd = Native.newEventFd();
168 try {
169
170
171 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
172 } catch (IOException e) {
173 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
174 }
175 this.timerFd = timerFd = Native.newTimerFd();
176 try {
177
178
179 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
180 } catch (IOException e) {
181 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
182 }
183 success = true;
184 } finally {
185 if (!success) {
186 closeFileDescriptor(epollFd);
187 closeFileDescriptor(eventFd);
188 closeFileDescriptor(timerFd);
189 }
190 }
191 }
192
193 private static void closeFileDescriptor(FileDescriptor fd) {
194 if (fd != null) {
195 try {
196 fd.close();
197 } catch (Exception e) {
198
199 }
200 }
201 }
202
203 @Override
204 public void wakeup() {
205 if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
206
207 Native.eventFdWrite(eventFd.intValue(), 1L);
208 }
209 }
210
211 @Override
212 public void prepareToDestroy() {
213
214
215 DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
216
217 for (DefaultEpollIoRegistration reg: copy) {
218 reg.close();
219 }
220 }
221
222 @Override
223 public void destroy() {
224 try {
225 closeFileDescriptors();
226 } finally {
227 nativeArrays.free();
228 events.free();
229 }
230 }
231
232 private final class DefaultEpollIoRegistration implements IoRegistration {
233 private final ThreadAwareExecutor executor;
234 private final AtomicBoolean canceled = new AtomicBoolean();
235 final EpollIoHandle handle;
236
237 DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
238 this.executor = executor;
239 this.handle = handle;
240 }
241
242 @SuppressWarnings("unchecked")
243 @Override
244 public <T> T attachment() {
245 return (T) nativeArrays;
246 }
247
248 @Override
249 public long submit(IoOps ops) {
250 EpollIoOps epollIoOps = cast(ops);
251 try {
252 if (!isValid()) {
253 return -1;
254 }
255 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
256 return epollIoOps.value;
257 } catch (IOException e) {
258 throw new UncheckedIOException(e);
259 }
260 }
261
262 @Override
263 public boolean isValid() {
264 return !canceled.get();
265 }
266
267 @Override
268 public boolean cancel() {
269 if (!canceled.compareAndSet(false, true)) {
270 return false;
271 }
272 if (executor.isExecutorThread(Thread.currentThread())) {
273 cancel0();
274 } else {
275 executor.execute(this::cancel0);
276 }
277 return true;
278 }
279
280 private void cancel0() {
281 int fd = handle.fd().intValue();
282 DefaultEpollIoRegistration old = registrations.remove(fd);
283 if (old != null) {
284 if (old != this) {
285
286 registrations.put(fd, old);
287 return;
288 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
289 numChannels--;
290 }
291 if (handle.fd().isOpen()) {
292 try {
293
294
295 Native.epollCtlDel(epollFd.intValue(), fd);
296 } catch (IOException e) {
297 logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
298 }
299 }
300 }
301 }
302
303 void close() {
304 try {
305 cancel();
306 } catch (Exception e) {
307 logger.debug("Exception during canceling " + this, e);
308 }
309 try {
310 handle.close();
311 } catch (Exception e) {
312 logger.debug("Exception during closing " + handle, e);
313 }
314 }
315
316 void handle(long ev) {
317 handle.handle(this, EpollIoOps.eventOf((int) ev));
318 }
319 }
320
321 @Override
322 public IoRegistration register(IoHandle handle)
323 throws Exception {
324 final EpollIoHandle epollHandle = cast(handle);
325 DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
326 int fd = epollHandle.fd().intValue();
327 Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
328 DefaultEpollIoRegistration old = registrations.put(fd, registration);
329
330
331
332 assert old == null || !old.isValid();
333
334 if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
335 numChannels++;
336 }
337 return registration;
338 }
339
340 @Override
341 public boolean isCompatible(Class<? extends IoHandle> handleType) {
342 return EpollIoHandle.class.isAssignableFrom(handleType);
343 }
344
345 int numRegisteredChannels() {
346 return numChannels;
347 }
348
349 List<Channel> registeredChannelsList() {
350 IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
351 if (ch.isEmpty()) {
352 return Collections.emptyList();
353 }
354
355 List<Channel> channels = new ArrayList<>(ch.size());
356 for (DefaultEpollIoRegistration registration : ch.values()) {
357 if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
358 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
359 }
360 }
361 return Collections.unmodifiableList(channels);
362 }
363
364 private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
365 if (deadlineNanos == NONE) {
366 return Native.epollWait(epollFd, events, timerFd,
367 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
368 }
369 long totalDelay = context.delayNanos(System.nanoTime());
370 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
371 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
372 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
373 }
374
375 private int epollWaitNoTimerChange() throws IOException {
376 return Native.epollWait(epollFd, events, false);
377 }
378
379 private int epollWaitNow() throws IOException {
380 return Native.epollWait(epollFd, events, true);
381 }
382
383 private int epollBusyWait() throws IOException {
384 return Native.epollBusyWait(epollFd, events);
385 }
386
387 private int epollWaitTimeboxed() throws IOException {
388
389 return Native.epollWait(epollFd, events, 1000);
390 }
391
392 @Override
393 public int run(IoHandlerContext context) {
394 int handled = 0;
395 try {
396 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
397 switch (strategy) {
398 case SelectStrategy.CONTINUE:
399 if (context.shouldReportActiveIoTime()) {
400 context.reportActiveIoTime(0);
401 }
402 return 0;
403
404 case SelectStrategy.BUSY_WAIT:
405 strategy = epollBusyWait();
406 break;
407
408 case SelectStrategy.SELECT:
409 if (pendingWakeup) {
410
411
412 strategy = epollWaitTimeboxed();
413 if (strategy != 0) {
414 break;
415 }
416
417
418 logger.warn("Missed eventfd write (not seen after > 1 second)");
419 pendingWakeup = false;
420 if (!context.canBlock()) {
421 break;
422 }
423
424 }
425
426 long curDeadlineNanos = context.deadlineNanos();
427 if (curDeadlineNanos == -1L) {
428 curDeadlineNanos = NONE;
429 }
430 nextWakeupNanos.set(curDeadlineNanos);
431 try {
432 if (context.canBlock()) {
433 if (curDeadlineNanos == prevDeadlineNanos) {
434
435 strategy = epollWaitNoTimerChange();
436 } else {
437
438 long result = epollWait(context, curDeadlineNanos);
439
440
441 strategy = Native.epollReady(result);
442 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
443 }
444 }
445 } finally {
446
447
448 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
449 pendingWakeup = true;
450 }
451 }
452
453 default:
454 }
455 if (strategy > 0) {
456 handled = strategy;
457 if (context.shouldReportActiveIoTime()) {
458 long activeIoStartTimeNanos = System.nanoTime();
459 if (processReady(events, strategy)) {
460 prevDeadlineNanos = NONE;
461 }
462 long activeIoEndTimeNanos = System.nanoTime();
463 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
464 } else {
465 if (processReady(events, strategy)) {
466 prevDeadlineNanos = NONE;
467 }
468 }
469 } else if (context.shouldReportActiveIoTime()) {
470 context.reportActiveIoTime(0);
471 }
472
473 if (allowGrowing && strategy == events.length()) {
474
475 events.increase();
476 }
477 } catch (Error e) {
478 throw e;
479 } catch (Throwable t) {
480 handleLoopException(t);
481 }
482 return handled;
483 }
484
485
486
487
488 void handleLoopException(Throwable t) {
489 logger.warn("Unexpected exception in the selector loop.", t);
490
491
492
493 try {
494 Thread.sleep(1000);
495 } catch (InterruptedException e) {
496
497 }
498 }
499
500
501 private boolean processReady(EpollEventArray events, int ready) {
502 boolean timerFired = false;
503 for (int i = 0; i < ready; i ++) {
504 final int fd = events.fd(i);
505 if (fd == eventFd.intValue()) {
506 pendingWakeup = false;
507 } else if (fd == timerFd.intValue()) {
508 timerFired = true;
509 } else {
510 final long ev = events.events(i);
511
512 DefaultEpollIoRegistration registration = registrations.get(fd);
513 if (registration != null) {
514 registration.handle(ev);
515 } else {
516
517 try {
518 Native.epollCtlDel(epollFd.intValue(), fd);
519 } catch (IOException ignore) {
520
521
522
523
524 }
525 }
526 }
527 }
528 return timerFired;
529 }
530
531
532
533
534
535
536
537 @UnstableApi
538 public void closeFileDescriptors() {
539
540 while (pendingWakeup) {
541 try {
542 int count = epollWaitTimeboxed();
543 if (count == 0) {
544
545 break;
546 }
547 for (int i = 0; i < count; i++) {
548 if (events.fd(i) == eventFd.intValue()) {
549 pendingWakeup = false;
550 break;
551 }
552 }
553 } catch (IOException ignore) {
554
555 }
556 }
557 try {
558 eventFd.close();
559 } catch (IOException e) {
560 logger.warn("Failed to close the event fd.", e);
561 }
562 try {
563 timerFd.close();
564 } catch (IOException e) {
565 logger.warn("Failed to close the timer fd.", e);
566 }
567
568 try {
569 epollFd.close();
570 } catch (IOException e) {
571 logger.warn("Failed to close the epoll fd.", e);
572 }
573 }
574 }