1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.IntObjectHashMap;
31 import io.netty.util.collection.IntObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.SystemPropertyUtil;
36 import io.netty.util.internal.UnstableApi;
37 import io.netty.util.internal.logging.InternalLogger;
38 import io.netty.util.internal.logging.InternalLoggerFactory;
39
40 import java.io.IOException;
41 import java.io.UncheckedIOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.atomic.AtomicLong;
46
47 import static java.lang.Math.min;
48
49
50
51
52 public class EpollIoHandler implements IoHandler {
/** Logger shared by all handler instances. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);

/**
 * Value handed to {@code Native.epollWait(...)} as its last argument when the timerfd-based wait is used.
 * Read once from the {@code io.netty.channel.epoll.epollWaitThreshold} system property (default 10).
 */
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

/** Marker stored in {@link #nextWakeupNanos} while the loop is not waiting; {@code wakeup()} skips the write. */
private static final long AWAKE = -1L;

/** Marker meaning "no deadline": used when the context reports {@code -1} and to reset {@link #prevDeadlineNanos}. */
private static final long NONE = Long.MAX_VALUE;

/** Upper bound applied to the nanosecond part of the delay computed in {@code epollWait(...)}. */
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;

// Fail construction early when the native epoll transport cannot be used.
{
    Epoll.ensureAvailability();
}

/** Deadline used for the previous timer-changing wait, or {@link #NONE}. */
private long prevDeadlineNanos = NONE;
private FileDescriptor epollFd;
private FileDescriptor eventFd;
private FileDescriptor timerFd;

/** Live registrations keyed by the integer value of their file descriptor. */
private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);

/** {@code true} when the event array may be enlarged after a completely filled wait. */
private final boolean allowGrowing;
private final EpollEventArray events;

/** Returned from every registration's {@code attachment()}. */
private final NativeArrays nativeArrays;

private final SelectStrategy selectStrategy;

/** Supplies the result of a non-blocking poll to the {@link SelectStrategy}. */
private final IntSupplier selectNowSupplier = this::epollWaitNow;
private final ThreadAwareExecutor executor;

/**
 * Either {@link #AWAKE}, {@link #NONE}, or the deadline the loop is about to wait for.
 * Swapped to {@link #AWAKE} by {@code wakeup()} and by the loop itself after waiting.
 */
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

/** Set when a wakeup was signalled whose eventfd notification has not yet been observed by the loop. */
private boolean pendingWakeup;

/** Number of current registrations whose handle is an {@code AbstractEpollChannel.AbstractEpollUnsafe}. */
private int numChannels;
94
95
96
97
98 public static IoHandlerFactory newFactory() {
99 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
100 }
101
102
103
104
105 public static IoHandlerFactory newFactory(final int maxEvents,
106 final SelectStrategyFactory selectStrategyFactory) {
107 Epoll.ensureAvailability();
108 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
109 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
110 return new IoHandlerFactory() {
111 @Override
112 public IoHandler newHandler(ThreadAwareExecutor executor) {
113 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
114 }
115
116 @Override
117 public boolean isChangingThreadSupported() {
118 return true;
119 }
120 };
121 }
122
123
124 EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
125 this.executor = ObjectUtil.checkNotNull(executor, "executor");
126 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127 if (maxEvents == 0) {
128 allowGrowing = true;
129 events = new EpollEventArray(4096);
130 } else {
131 allowGrowing = false;
132 events = new EpollEventArray(maxEvents);
133 }
134 nativeArrays = new NativeArrays();
135 openFileDescriptors();
136 }
137
138 private static EpollIoHandle cast(IoHandle handle) {
139 if (handle instanceof EpollIoHandle) {
140 return (EpollIoHandle) handle;
141 }
142 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
143 }
144
145 private static EpollIoOps cast(IoOps ops) {
146 if (ops instanceof EpollIoOps) {
147 return (EpollIoOps) ops;
148 }
149 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
150 }
151
152
153
154
155
156 @UnstableApi
157 public void openFileDescriptors() {
158 boolean success = false;
159 FileDescriptor epollFd = null;
160 FileDescriptor eventFd = null;
161 FileDescriptor timerFd = null;
162 try {
163 this.epollFd = epollFd = Native.newEpollCreate();
164 this.eventFd = eventFd = Native.newEventFd();
165 try {
166
167
168 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
169 } catch (IOException e) {
170 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
171 }
172 this.timerFd = timerFd = Native.newTimerFd();
173 try {
174
175
176 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
177 } catch (IOException e) {
178 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
179 }
180 success = true;
181 } finally {
182 if (!success) {
183 closeFileDescriptor(epollFd);
184 closeFileDescriptor(eventFd);
185 closeFileDescriptor(timerFd);
186 }
187 }
188 }
189
190 private static void closeFileDescriptor(FileDescriptor fd) {
191 if (fd != null) {
192 try {
193 fd.close();
194 } catch (Exception e) {
195
196 }
197 }
198 }
199
200 @Override
201 public void wakeup() {
202 if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
203
204 Native.eventFdWrite(eventFd.intValue(), 1L);
205 }
206 }
207
208 @Override
209 public void prepareToDestroy() {
210
211
212 DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
213
214 for (DefaultEpollIoRegistration reg: copy) {
215 reg.close();
216 }
217 }
218
219 @Override
220 public void destroy() {
221 try {
222 closeFileDescriptors();
223 } finally {
224 nativeArrays.free();
225 events.free();
226 }
227 }
228
229 private enum RegistrationState {
230
231 Pending,
232
233 Added,
234
235 Cancelled
236 }
237
238 private final class DefaultEpollIoRegistration
239 implements IoRegistration {
240 private final ThreadAwareExecutor executor;
241 private RegistrationState state = RegistrationState.Pending;
242 final EpollIoHandle handle;
243
244 DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
245 this.executor = executor;
246 this.handle = handle;
247 }
248
249 @SuppressWarnings("unchecked")
250 @Override
251 public <T> T attachment() {
252 return (T) nativeArrays;
253 }
254
255 @Override
256 public long submit(IoOps ops) {
257 EpollIoOps epollIoOps = cast(ops);
258 try {
259 synchronized (this) {
260 switch (state) {
261 case Cancelled:
262 return -1;
263 case Pending:
264 if (epollIoOps.value == EpollIoOps.NONE.value) {
265
266
267 return 0;
268 }
269 Native.epollCtlAdd(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
270 state = RegistrationState.Added;
271 return epollIoOps.value;
272 case Added:
273 if (epollIoOps.value == EpollIoOps.NONE.value) {
274
275
276 Native.epollCtlDel(epollFd.intValue(), handle.fd().intValue());
277 return 0;
278 }
279 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
280 return epollIoOps.value;
281 default:
282 throw new IllegalStateException();
283 }
284 }
285 } catch (IOException e) {
286 throw new UncheckedIOException(e);
287 }
288 }
289
290 @Override
291 public synchronized boolean isValid() {
292 return state != RegistrationState.Cancelled;
293 }
294
295 @Override
296 public boolean cancel() {
297 synchronized (this) {
298 if (state == RegistrationState.Cancelled) {
299 return false;
300 }
301 state = RegistrationState.Cancelled;
302 }
303 if (executor.isExecutorThread(Thread.currentThread())) {
304 cancel0();
305 } else {
306 executor.execute(this::cancel0);
307 }
308 return true;
309 }
310
311 private void cancel0() {
312 int fd = handle.fd().intValue();
313 DefaultEpollIoRegistration old = registrations.remove(fd);
314 if (old != null) {
315 if (old != this) {
316
317 registrations.put(fd, old);
318 return;
319 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
320 numChannels--;
321 }
322 if (handle.fd().isOpen()) {
323 try {
324
325
326 Native.epollCtlDel(epollFd.intValue(), fd);
327 } catch (IOException e) {
328 logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
329 }
330 }
331 handle.unregistered();
332 }
333 }
334
335 void close() {
336 try {
337 cancel();
338 } catch (Exception e) {
339 logger.debug("Exception during canceling " + this, e);
340 }
341 try {
342 handle.close();
343 } catch (Exception e) {
344 logger.debug("Exception during closing " + handle, e);
345 }
346 }
347
348 void handle(long ev) {
349 handle.handle(this, EpollIoOps.eventOf((int) ev));
350 }
351 }
352
353 @Override
354 public IoRegistration register(IoHandle handle)
355 throws Exception {
356 final EpollIoHandle epollHandle = cast(handle);
357 DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
358 int fd = epollHandle.fd().intValue();
359 DefaultEpollIoRegistration old = registrations.put(fd, registration);
360
361
362
363 assert old == null || !old.isValid();
364
365 if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
366 numChannels++;
367 }
368 handle.registered();
369 return registration;
370 }
371
372 @Override
373 public boolean isCompatible(Class<? extends IoHandle> handleType) {
374 return EpollIoHandle.class.isAssignableFrom(handleType);
375 }
376
377 int numRegisteredChannels() {
378 return numChannels;
379 }
380
381 List<Channel> registeredChannelsList() {
382 IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
383 if (ch.isEmpty()) {
384 return Collections.emptyList();
385 }
386
387 List<Channel> channels = new ArrayList<>(ch.size());
388 for (DefaultEpollIoRegistration registration : ch.values()) {
389 if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
390 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
391 }
392 }
393 return Collections.unmodifiableList(channels);
394 }
395
396 private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
397 if (deadlineNanos == NONE) {
398 return Native.epollWait(epollFd, events, timerFd,
399 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
400 }
401 long totalDelay = context.delayNanos(System.nanoTime());
402 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
403 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
404 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
405 }
406
407 private int epollWaitNoTimerChange() throws IOException {
408 return Native.epollWait(epollFd, events, false);
409 }
410
411 private int epollWaitNow() throws IOException {
412 return Native.epollWait(epollFd, events, true);
413 }
414
415 private int epollBusyWait() throws IOException {
416 return Native.epollBusyWait(epollFd, events);
417 }
418
419 private int epollWaitTimeboxed() throws IOException {
420
421 return Native.epollWait(epollFd, events, 1000);
422 }
423
424 @Override
425 public int run(IoHandlerContext context) {
426 int handled = 0;
427 try {
428 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
429 switch (strategy) {
430 case SelectStrategy.CONTINUE:
431 if (context.shouldReportActiveIoTime()) {
432 context.reportActiveIoTime(0);
433 }
434 return 0;
435
436 case SelectStrategy.BUSY_WAIT:
437 strategy = epollBusyWait();
438 break;
439
440 case SelectStrategy.SELECT:
441 if (pendingWakeup) {
442
443
444 strategy = epollWaitTimeboxed();
445 if (strategy != 0) {
446 break;
447 }
448
449
450 logger.warn("Missed eventfd write (not seen after > 1 second)");
451 pendingWakeup = false;
452 if (!context.canBlock()) {
453 break;
454 }
455
456 }
457
458 long curDeadlineNanos = context.deadlineNanos();
459 if (curDeadlineNanos == -1L) {
460 curDeadlineNanos = NONE;
461 }
462 nextWakeupNanos.set(curDeadlineNanos);
463 try {
464 if (context.canBlock()) {
465 if (curDeadlineNanos == prevDeadlineNanos) {
466
467 strategy = epollWaitNoTimerChange();
468 } else {
469
470 long result = epollWait(context, curDeadlineNanos);
471
472
473 strategy = Native.epollReady(result);
474 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
475 }
476 }
477 } finally {
478
479
480 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
481 pendingWakeup = true;
482 }
483 }
484
485 default:
486 }
487 if (strategy > 0) {
488 handled = strategy;
489 if (context.shouldReportActiveIoTime()) {
490 long activeIoStartTimeNanos = System.nanoTime();
491 if (processReady(events, strategy)) {
492 prevDeadlineNanos = NONE;
493 }
494 long activeIoEndTimeNanos = System.nanoTime();
495 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
496 } else {
497 if (processReady(events, strategy)) {
498 prevDeadlineNanos = NONE;
499 }
500 }
501 } else if (context.shouldReportActiveIoTime()) {
502 context.reportActiveIoTime(0);
503 }
504
505 if (allowGrowing && strategy == events.length()) {
506
507 events.increase();
508 }
509 } catch (Error e) {
510 throw e;
511 } catch (Throwable t) {
512 handleLoopException(t);
513 }
514 return handled;
515 }
516
517
518
519
520 void handleLoopException(Throwable t) {
521 logger.warn("Unexpected exception in the selector loop.", t);
522
523
524
525 try {
526 Thread.sleep(1000);
527 } catch (InterruptedException e) {
528
529 }
530 }
531
532
533 private boolean processReady(EpollEventArray events, int ready) {
534 boolean timerFired = false;
535 for (int i = 0; i < ready; i ++) {
536 final int fd = events.fd(i);
537 if (fd == eventFd.intValue()) {
538 pendingWakeup = false;
539 } else if (fd == timerFd.intValue()) {
540 timerFired = true;
541 } else {
542 final long ev = events.events(i);
543
544 DefaultEpollIoRegistration registration = registrations.get(fd);
545 if (registration != null) {
546 registration.handle(ev);
547 } else {
548
549 try {
550 Native.epollCtlDel(epollFd.intValue(), fd);
551 } catch (IOException ignore) {
552
553
554
555
556 }
557 }
558 }
559 }
560 return timerFired;
561 }
562
563
564
565
566
567
568
569 @UnstableApi
570 public void closeFileDescriptors() {
571
572 while (pendingWakeup) {
573 try {
574 int count = epollWaitTimeboxed();
575 if (count == 0) {
576
577 break;
578 }
579 for (int i = 0; i < count; i++) {
580 if (events.fd(i) == eventFd.intValue()) {
581 pendingWakeup = false;
582 break;
583 }
584 }
585 } catch (IOException ignore) {
586
587 }
588 }
589 try {
590 eventFd.close();
591 } catch (IOException e) {
592 logger.warn("Failed to close the event fd.", e);
593 }
594 try {
595 timerFd.close();
596 } catch (IOException e) {
597 logger.warn("Failed to close the timer fd.", e);
598 }
599
600 try {
601 epollFd.close();
602 } catch (IOException e) {
603 logger.warn("Failed to close the epoll fd.", e);
604 }
605 }
606 }