1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.IntObjectHashMap;
31 import io.netty.util.collection.IntObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.SystemPropertyUtil;
36 import io.netty.util.internal.UnstableApi;
37 import io.netty.util.internal.logging.InternalLogger;
38 import io.netty.util.internal.logging.InternalLoggerFactory;
39
40 import java.io.IOException;
41 import java.io.UncheckedIOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicLong;
47
48 import static java.lang.Math.min;
49 import static java.lang.System.nanoTime;
50
51
52
53
54 public class EpollIoHandler implements IoHandler {
/** Logger shared by all handler instances. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);

/**
 * Threshold handed to the timer-aware {@code Native.epollWait(...)} overload. Read once from the
 * {@code io.netty.channel.epoll.epollWaitThreshold} system property (default 10).
 */
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

/** Value of {@link #nextWakeupNanos} set by {@link #wakeup()} and restored after every wait in {@code run}. */
private static final long AWAKE = -1L;

/** Deadline placeholder used when {@code context.deadlineNanos()} reports {@code -1}. */
private static final long NONE = Long.MAX_VALUE;

/** Upper bound applied to the nanosecond part of the delay passed to the timer-aware wait. */
private static final long MAX_SCHEDULED_TIMERFD_NS = 999_999_999L;

// Runs for every constructed instance, before the instance field initializers below.
{
    Epoll.ensureAvailability();
}

// Deadline used for the previous timer-aware wait; starts just before the current nanoTime().
// NOTE(review): presumably chosen so it cannot equal any real deadline - confirm.
private long prevDeadlineNanos = nanoTime() - 1;

// Descriptors assigned by openFileDescriptors() and closed by closeFileDescriptors().
private FileDescriptor epollFd;
private FileDescriptor eventFd;
private FileDescriptor timerFd;

// Registrations keyed by the int value of the registered handle's file descriptor.
private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);

// True when the handler was created with maxEvents == 0; run() then grows `events` once a wait fills it.
private final boolean allowGrowing;
private final EpollEventArray events;

// Returned as the attachment of every registration; freed in destroy().
private final NativeArrays nativeArrays;

private final SelectStrategy selectStrategy;

// Supplier given to the select strategy; delegates to epollWaitNow().
private final IntSupplier selectNowSupplier = this::epollWaitNow;

private final ThreadAwareExecutor executor;

// Holds AWAKE, NONE, or the deadline stored by run() right before it waits.
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

// Set by run() when nextWakeupNanos was already AWAKE after a wait; cleared once eventFd shows up
// among the ready events.
private boolean pendingWakeup;

// Number of registered handles that are AbstractEpollChannel.AbstractEpollUnsafe instances.
private int numChannels;
96
97
98
99
100 public static IoHandlerFactory newFactory() {
101 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
102 }
103
104
105
106
107 public static IoHandlerFactory newFactory(final int maxEvents,
108 final SelectStrategyFactory selectStrategyFactory) {
109 Epoll.ensureAvailability();
110 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
111 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
112 return new IoHandlerFactory() {
113 @Override
114 public IoHandler newHandler(ThreadAwareExecutor executor) {
115 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
116 }
117
118 @Override
119 public boolean isChangingThreadSupported() {
120 return true;
121 }
122 };
123 }
124
125
126 EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
127 this.executor = ObjectUtil.checkNotNull(executor, "executor");
128 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
129 if (maxEvents == 0) {
130 allowGrowing = true;
131 events = new EpollEventArray(4096);
132 } else {
133 allowGrowing = false;
134 events = new EpollEventArray(maxEvents);
135 }
136 nativeArrays = new NativeArrays();
137 openFileDescriptors();
138 }
139
140 private static EpollIoHandle cast(IoHandle handle) {
141 if (handle instanceof EpollIoHandle) {
142 return (EpollIoHandle) handle;
143 }
144 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
145 }
146
147 private static EpollIoOps cast(IoOps ops) {
148 if (ops instanceof EpollIoOps) {
149 return (EpollIoOps) ops;
150 }
151 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
152 }
153
154
155
156
157
158 @UnstableApi
159 public void openFileDescriptors() {
160 boolean success = false;
161 FileDescriptor epollFd = null;
162 FileDescriptor eventFd = null;
163 FileDescriptor timerFd = null;
164 try {
165 this.epollFd = epollFd = Native.newEpollCreate();
166 this.eventFd = eventFd = Native.newEventFd();
167 try {
168
169
170 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
171 } catch (IOException e) {
172 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
173 }
174 this.timerFd = timerFd = Native.newTimerFd();
175 try {
176
177
178 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
179 } catch (IOException e) {
180 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
181 }
182 success = true;
183 } finally {
184 if (!success) {
185 closeFileDescriptor(epollFd);
186 closeFileDescriptor(eventFd);
187 closeFileDescriptor(timerFd);
188 }
189 }
190 }
191
192 private static void closeFileDescriptor(FileDescriptor fd) {
193 if (fd != null) {
194 try {
195 fd.close();
196 } catch (Exception e) {
197
198 }
199 }
200 }
201
202 @Override
203 public void wakeup() {
204 if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
205
206 Native.eventFdWrite(eventFd.intValue(), 1L);
207 }
208 }
209
210 @Override
211 public void prepareToDestroy() {
212
213
214 DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
215
216 for (DefaultEpollIoRegistration reg: copy) {
217 reg.close();
218 }
219 }
220
221 @Override
222 public void destroy() {
223 try {
224 closeFileDescriptors();
225 } finally {
226 nativeArrays.free();
227 events.free();
228 }
229 }
230
231 private final class DefaultEpollIoRegistration implements IoRegistration {
232 private final ThreadAwareExecutor executor;
233 private final AtomicBoolean canceled = new AtomicBoolean();
234 final EpollIoHandle handle;
235
236 DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
237 this.executor = executor;
238 this.handle = handle;
239 }
240
241 @SuppressWarnings("unchecked")
242 @Override
243 public <T> T attachment() {
244 return (T) nativeArrays;
245 }
246
247 @Override
248 public long submit(IoOps ops) {
249 EpollIoOps epollIoOps = cast(ops);
250 try {
251 if (!isValid()) {
252 return -1;
253 }
254 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
255 return epollIoOps.value;
256 } catch (IOException e) {
257 throw new UncheckedIOException(e);
258 }
259 }
260
261 @Override
262 public boolean isValid() {
263 return !canceled.get();
264 }
265
266 @Override
267 public boolean cancel() {
268 if (!canceled.compareAndSet(false, true)) {
269 return false;
270 }
271 if (executor.isExecutorThread(Thread.currentThread())) {
272 cancel0();
273 } else {
274 executor.execute(this::cancel0);
275 }
276 return true;
277 }
278
279 private void cancel0() {
280 int fd = handle.fd().intValue();
281 DefaultEpollIoRegistration old = registrations.remove(fd);
282 if (old != null) {
283 if (old != this) {
284
285 registrations.put(fd, old);
286 return;
287 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
288 numChannels--;
289 }
290 if (handle.fd().isOpen()) {
291 try {
292
293
294 Native.epollCtlDel(epollFd.intValue(), fd);
295 } catch (IOException e) {
296 logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
297 }
298 }
299 handle.unregistered();
300 }
301 }
302
303 void close() {
304 try {
305 cancel();
306 } catch (Exception e) {
307 logger.debug("Exception during canceling " + this, e);
308 }
309 try {
310 handle.close();
311 } catch (Exception e) {
312 logger.debug("Exception during closing " + handle, e);
313 }
314 }
315
316 void handle(long ev) {
317 handle.handle(this, EpollIoOps.eventOf((int) ev));
318 }
319 }
320
321 @Override
322 public IoRegistration register(IoHandle handle)
323 throws Exception {
324 final EpollIoHandle epollHandle = cast(handle);
325 DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
326 int fd = epollHandle.fd().intValue();
327 Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
328 DefaultEpollIoRegistration old = registrations.put(fd, registration);
329
330
331
332 assert old == null || !old.isValid();
333
334 if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
335 numChannels++;
336 }
337 handle.registered();
338 return registration;
339 }
340
341 @Override
342 public boolean isCompatible(Class<? extends IoHandle> handleType) {
343 return EpollIoHandle.class.isAssignableFrom(handleType);
344 }
345
346 int numRegisteredChannels() {
347 return numChannels;
348 }
349
350 List<Channel> registeredChannelsList() {
351 IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
352 if (ch.isEmpty()) {
353 return Collections.emptyList();
354 }
355
356 List<Channel> channels = new ArrayList<>(ch.size());
357 for (DefaultEpollIoRegistration registration : ch.values()) {
358 if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
359 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
360 }
361 }
362 return Collections.unmodifiableList(channels);
363 }
364
365 private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
366 if (deadlineNanos == NONE) {
367 return Native.epollWait(epollFd, events, timerFd,
368 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
369 }
370 long totalDelay = context.delayNanos(System.nanoTime());
371 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
372 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
373 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
374 }
375
376 private int epollWaitNoTimerChange() throws IOException {
377 return Native.epollWait(epollFd, events, false);
378 }
379
380 private int epollWaitNow() throws IOException {
381 return Native.epollWait(epollFd, events, true);
382 }
383
384 private int epollBusyWait() throws IOException {
385 return Native.epollBusyWait(epollFd, events);
386 }
387
388 private int epollWaitTimeboxed() throws IOException {
389
390 return Native.epollWait(epollFd, events, 1000);
391 }
392
393 @Override
394 public int run(IoHandlerContext context) {
395 int handled = 0;
396 try {
397 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
398 switch (strategy) {
399 case SelectStrategy.CONTINUE:
400 if (context.shouldReportActiveIoTime()) {
401 context.reportActiveIoTime(0);
402 }
403 return 0;
404
405 case SelectStrategy.BUSY_WAIT:
406 strategy = epollBusyWait();
407 break;
408
409 case SelectStrategy.SELECT:
410 if (pendingWakeup) {
411
412
413 strategy = epollWaitTimeboxed();
414 if (strategy != 0) {
415 break;
416 }
417
418
419 logger.warn("Missed eventfd write (not seen after > 1 second)");
420 pendingWakeup = false;
421 if (!context.canBlock()) {
422 break;
423 }
424
425 }
426
427 long curDeadlineNanos = context.deadlineNanos();
428 if (curDeadlineNanos == -1L) {
429 curDeadlineNanos = NONE;
430 }
431 nextWakeupNanos.set(curDeadlineNanos);
432 try {
433 if (context.canBlock()) {
434 if (curDeadlineNanos == prevDeadlineNanos) {
435
436 strategy = epollWaitNoTimerChange();
437 } else {
438
439 long result = epollWait(context, curDeadlineNanos);
440
441
442 strategy = Native.epollReady(result);
443 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
444 }
445 }
446 } finally {
447
448
449 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
450 pendingWakeup = true;
451 }
452 }
453
454 default:
455 }
456 if (strategy > 0) {
457 handled = strategy;
458 if (context.shouldReportActiveIoTime()) {
459 long activeIoStartTimeNanos = System.nanoTime();
460 if (processReady(events, strategy)) {
461 prevDeadlineNanos = NONE;
462 }
463 long activeIoEndTimeNanos = System.nanoTime();
464 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
465 } else {
466 if (processReady(events, strategy)) {
467 prevDeadlineNanos = NONE;
468 }
469 }
470 } else if (context.shouldReportActiveIoTime()) {
471 context.reportActiveIoTime(0);
472 }
473
474 if (allowGrowing && strategy == events.length()) {
475
476 events.increase();
477 }
478 } catch (Error e) {
479 throw e;
480 } catch (Throwable t) {
481 handleLoopException(t);
482 }
483 return handled;
484 }
485
486
487
488
489 void handleLoopException(Throwable t) {
490 logger.warn("Unexpected exception in the selector loop.", t);
491
492
493
494 try {
495 Thread.sleep(1000);
496 } catch (InterruptedException e) {
497
498 }
499 }
500
501
502 private boolean processReady(EpollEventArray events, int ready) {
503 boolean timerFired = false;
504 for (int i = 0; i < ready; i ++) {
505 final int fd = events.fd(i);
506 if (fd == eventFd.intValue()) {
507 pendingWakeup = false;
508 } else if (fd == timerFd.intValue()) {
509 timerFired = true;
510 } else {
511 final long ev = events.events(i);
512
513 DefaultEpollIoRegistration registration = registrations.get(fd);
514 if (registration != null) {
515 registration.handle(ev);
516 } else {
517
518 try {
519 Native.epollCtlDel(epollFd.intValue(), fd);
520 } catch (IOException ignore) {
521
522
523
524
525 }
526 }
527 }
528 }
529 return timerFired;
530 }
531
532
533
534
535
536
537
538 @UnstableApi
539 public void closeFileDescriptors() {
540
541 while (pendingWakeup) {
542 try {
543 int count = epollWaitTimeboxed();
544 if (count == 0) {
545
546 break;
547 }
548 for (int i = 0; i < count; i++) {
549 if (events.fd(i) == eventFd.intValue()) {
550 pendingWakeup = false;
551 break;
552 }
553 }
554 } catch (IOException ignore) {
555
556 }
557 }
558 try {
559 eventFd.close();
560 } catch (IOException e) {
561 logger.warn("Failed to close the event fd.", e);
562 }
563 try {
564 timerFd.close();
565 } catch (IOException e) {
566 logger.warn("Failed to close the timer fd.", e);
567 }
568
569 try {
570 epollFd.close();
571 } catch (IOException e) {
572 logger.warn("Failed to close the epoll fd.", e);
573 }
574 }
575 }