1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.EventLoop;
21 import io.netty.channel.IoEventLoop;
22 import io.netty.channel.IoExecutionContext;
23 import io.netty.channel.IoHandle;
24 import io.netty.channel.IoHandler;
25 import io.netty.channel.IoHandlerFactory;
26 import io.netty.channel.IoOps;
27 import io.netty.channel.SelectStrategy;
28 import io.netty.channel.SelectStrategyFactory;
29 import io.netty.channel.unix.FileDescriptor;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.util.IntSupplier;
32 import io.netty.util.collection.IntObjectHashMap;
33 import io.netty.util.collection.IntObjectMap;
34 import io.netty.util.concurrent.Future;
35 import io.netty.util.concurrent.Promise;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.StringUtil;
38 import io.netty.util.internal.SystemPropertyUtil;
39 import io.netty.util.internal.UnstableApi;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.io.IOException;
44 import java.util.ArrayList;
45 import java.util.Collections;
46 import java.util.List;
47 import java.util.concurrent.atomic.AtomicLong;
48
49 import static java.lang.Math.min;
50 import static java.lang.System.nanoTime;
51
52
53
54
55 public class EpollIoHandler implements IoHandler {
56 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);
57 private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
58 SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
59
60 static {
61
62
63 Epoll.ensureAvailability();
64 }
65
66
67 private long prevDeadlineNanos = nanoTime() - 1;
68 private FileDescriptor epollFd;
69 private FileDescriptor eventFd;
70 private FileDescriptor timerFd;
71 private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
72 private final boolean allowGrowing;
73 private final EpollEventArray events;
74
75
76 private IovArray iovArray;
77 private NativeDatagramPacketArray datagramPacketArray;
78
79 private final SelectStrategy selectStrategy;
80 private final IntSupplier selectNowSupplier = new IntSupplier() {
81 @Override
82 public int get() throws Exception {
83 return epollWaitNow();
84 }
85 };
86
87 private static final long AWAKE = -1L;
88 private static final long NONE = Long.MAX_VALUE;
89
90
91
92
93
94 private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
95 private boolean pendingWakeup;
96
97 private int numChannels;
98
99
100 private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
101
102
103
104
105 public static IoHandlerFactory newFactory() {
106 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
107 }
108
109
110
111
112 public static IoHandlerFactory newFactory(final int maxEvents,
113 final SelectStrategyFactory selectStrategyFactory) {
114 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
115 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
116 return new IoHandlerFactory() {
117 @Override
118 public IoHandler newHandler() {
119 return new EpollIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
120 }
121 };
122 }
123
124
125 EpollIoHandler(int maxEvents, SelectStrategy strategy) {
126 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127 if (maxEvents == 0) {
128 allowGrowing = true;
129 events = new EpollEventArray(4096);
130 } else {
131 allowGrowing = false;
132 events = new EpollEventArray(maxEvents);
133 }
134 openFileDescriptors();
135 }
136
137 private static EpollIoHandle cast(IoHandle handle) {
138 if (handle instanceof EpollIoHandle) {
139 return (EpollIoHandle) handle;
140 }
141 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
142 }
143
144 private static EpollIoOps cast(IoOps ops) {
145 if (ops instanceof EpollIoOps) {
146 return (EpollIoOps) ops;
147 }
148 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
149 }
150
151
152
153
154
155 @UnstableApi
156 public void openFileDescriptors() {
157 boolean success = false;
158 FileDescriptor epollFd = null;
159 FileDescriptor eventFd = null;
160 FileDescriptor timerFd = null;
161 try {
162 this.epollFd = epollFd = Native.newEpollCreate();
163 this.eventFd = eventFd = Native.newEventFd();
164 try {
165
166
167 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
168 } catch (IOException e) {
169 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
170 }
171 this.timerFd = timerFd = Native.newTimerFd();
172 try {
173
174
175 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
176 } catch (IOException e) {
177 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
178 }
179 success = true;
180 } finally {
181 if (!success) {
182 closeFileDescriptor(epollFd);
183 closeFileDescriptor(eventFd);
184 closeFileDescriptor(timerFd);
185 }
186 }
187 }
188
189 private static void closeFileDescriptor(FileDescriptor fd) {
190 if (fd != null) {
191 try {
192 fd.close();
193 } catch (Exception e) {
194
195 }
196 }
197 }
198
199
200
201
202 IovArray cleanIovArray() {
203 if (iovArray == null) {
204 iovArray = new IovArray();
205 } else {
206 iovArray.clear();
207 }
208 return iovArray;
209 }
210
211
212
213
214 NativeDatagramPacketArray cleanDatagramPacketArray() {
215 if (datagramPacketArray == null) {
216 datagramPacketArray = new NativeDatagramPacketArray();
217 } else {
218 datagramPacketArray.clear();
219 }
220 return datagramPacketArray;
221 }
222
223 @Override
224 public void wakeup(IoEventLoop eventLoop) {
225 if (!eventLoop.inEventLoop() && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
226
227 Native.eventFdWrite(eventFd.intValue(), 1L);
228 }
229 }
230
231 @Override
232 public void prepareToDestroy() {
233
234
235 DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
236
237 for (DefaultEpollIoRegistration reg: copy) {
238 reg.close();
239 }
240 }
241
242 @Override
243 public void destroy() {
244 try {
245 closeFileDescriptors();
246 } finally {
247
248 if (iovArray != null) {
249 iovArray.release();
250 iovArray = null;
251 }
252 if (datagramPacketArray != null) {
253 datagramPacketArray.release();
254 datagramPacketArray = null;
255 }
256 events.free();
257 }
258 }
259
260 private final class DefaultEpollIoRegistration implements EpollIoRegistration {
261 private final Promise<?> cancellationPromise;
262 private final IoEventLoop eventLoop;
263 final EpollIoHandle handle;
264
265 DefaultEpollIoRegistration(IoEventLoop eventLoop, EpollIoHandle handle) {
266 this.eventLoop = eventLoop;
267 this.handle = handle;
268 this.cancellationPromise = eventLoop.newPromise();
269 }
270
271 @Override
272 public long submit(IoOps ops) throws Exception {
273 EpollIoOps epollIoOps = cast(ops);
274 try {
275 if (!isValid()) {
276 return -1;
277 }
278 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
279 return epollIoOps.value;
280 } catch (IOException e) {
281 throw e;
282 } catch (Exception e) {
283 throw new IOException(e);
284 }
285 }
286
287 @Override
288 public EpollIoHandler ioHandler() {
289 return EpollIoHandler.this;
290 }
291
292 @Override
293 public void cancel() throws IOException {
294 if (!cancellationPromise.trySuccess(null)) {
295 return;
296 }
297 if (handle.fd().isOpen()) {
298
299
300 Native.epollCtlDel(epollFd.intValue(), handle.fd().intValue());
301 }
302 if (eventLoop.inEventLoop()) {
303 cancel0();
304 } else {
305 eventLoop.execute(this::cancel0);
306 }
307 }
308
309 private void cancel0() {
310 int fd = handle.fd().intValue();
311 DefaultEpollIoRegistration old = registrations.remove(fd);
312 if (old != null) {
313 if (old != this) {
314
315 registrations.put(fd, old);
316 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
317 numChannels--;
318 }
319 }
320 }
321
322 void close() {
323 try {
324 cancel();
325 } catch (Exception e) {
326 logger.debug("Exception during canceling " + this, e);
327 }
328 try {
329 handle.close();
330 } catch (Exception e) {
331 logger.debug("Exception during closing " + handle, e);
332 }
333 }
334
335 @Override
336 public Future<?> cancelFuture() {
337 return cancellationPromise;
338 }
339
340 void handle(long ev) {
341 handle.handle(this, EpollIoOps.eventOf((int) ev));
342 }
343 }
344
345 @Override
346 public EpollIoRegistration register(IoEventLoop eventLoop, IoHandle handle)
347 throws Exception {
348 final EpollIoHandle epollHandle = cast(handle);
349 DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(eventLoop, epollHandle);
350 int fd = epollHandle.fd().intValue();
351 Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
352 DefaultEpollIoRegistration old = registrations.put(fd, registration);
353
354
355
356 assert old == null || !old.isValid();
357
358 if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
359 numChannels++;
360 }
361 return registration;
362 }
363
364 @Override
365 public boolean isCompatible(Class<? extends IoHandle> handleType) {
366 return EpollIoHandle.class.isAssignableFrom(handleType);
367 }
368
369 int numRegisteredChannels() {
370 return numChannels;
371 }
372
373 List<Channel> registeredChannelsList() {
374 IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
375 if (ch.isEmpty()) {
376 return Collections.emptyList();
377 }
378
379 List<Channel> channels = new ArrayList<>(ch.size());
380 for (DefaultEpollIoRegistration registration : ch.values()) {
381 if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
382 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
383 }
384 }
385 return Collections.unmodifiableList(channels);
386 }
387
388 private long epollWait(IoExecutionContext context, long deadlineNanos) throws IOException {
389 if (deadlineNanos == NONE) {
390 return Native.epollWait(epollFd, events, timerFd,
391 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
392 }
393 long totalDelay = context.delayNanos(System.nanoTime());
394 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
395 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
396 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
397 }
398
399 private int epollWaitNoTimerChange() throws IOException {
400 return Native.epollWait(epollFd, events, false);
401 }
402
403 private int epollWaitNow() throws IOException {
404 return Native.epollWait(epollFd, events, true);
405 }
406
407 private int epollBusyWait() throws IOException {
408 return Native.epollBusyWait(epollFd, events);
409 }
410
411 private int epollWaitTimeboxed() throws IOException {
412
413 return Native.epollWait(epollFd, events, 1000);
414 }
415
416 @Override
417 public int run(IoExecutionContext context) {
418 int handled = 0;
419 try {
420 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
421 switch (strategy) {
422 case SelectStrategy.CONTINUE:
423 return 0;
424
425 case SelectStrategy.BUSY_WAIT:
426 strategy = epollBusyWait();
427 break;
428
429 case SelectStrategy.SELECT:
430 if (pendingWakeup) {
431
432
433 strategy = epollWaitTimeboxed();
434 if (strategy != 0) {
435 break;
436 }
437
438
439 logger.warn("Missed eventfd write (not seen after > 1 second)");
440 pendingWakeup = false;
441 if (!context.canBlock()) {
442 break;
443 }
444
445 }
446
447 long curDeadlineNanos = context.deadlineNanos();
448 if (curDeadlineNanos == -1L) {
449 curDeadlineNanos = NONE;
450 }
451 nextWakeupNanos.set(curDeadlineNanos);
452 try {
453 if (context.canBlock()) {
454 if (curDeadlineNanos == prevDeadlineNanos) {
455
456 strategy = epollWaitNoTimerChange();
457 } else {
458
459 long result = epollWait(context, curDeadlineNanos);
460
461
462 strategy = Native.epollReady(result);
463 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
464 }
465 }
466 } finally {
467
468
469 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
470 pendingWakeup = true;
471 }
472 }
473
474 default:
475 }
476 if (strategy > 0) {
477 handled = strategy;
478 if (processReady(events, strategy)) {
479 prevDeadlineNanos = NONE;
480 }
481 }
482 if (allowGrowing && strategy == events.length()) {
483
484 events.increase();
485 }
486 } catch (Error e) {
487 throw e;
488 } catch (Throwable t) {
489 handleLoopException(t);
490 }
491 return handled;
492 }
493
494
495
496
497 void handleLoopException(Throwable t) {
498 logger.warn("Unexpected exception in the selector loop.", t);
499
500
501
502 try {
503 Thread.sleep(1000);
504 } catch (InterruptedException e) {
505
506 }
507 }
508
509
510 private boolean processReady(EpollEventArray events, int ready) {
511 boolean timerFired = false;
512 for (int i = 0; i < ready; i ++) {
513 final int fd = events.fd(i);
514 if (fd == eventFd.intValue()) {
515 pendingWakeup = false;
516 } else if (fd == timerFd.intValue()) {
517 timerFired = true;
518 } else {
519 final long ev = events.events(i);
520
521 DefaultEpollIoRegistration registration = registrations.get(fd);
522 if (registration != null) {
523 registration.handle(ev);
524 } else {
525
526 try {
527 Native.epollCtlDel(epollFd.intValue(), fd);
528 } catch (IOException ignore) {
529
530
531
532
533 }
534 }
535 }
536 }
537 return timerFired;
538 }
539
540
541
542
543
544
545
546 @UnstableApi
547 public void closeFileDescriptors() {
548
549 while (pendingWakeup) {
550 try {
551 int count = epollWaitTimeboxed();
552 if (count == 0) {
553
554 break;
555 }
556 for (int i = 0; i < count; i++) {
557 if (events.fd(i) == eventFd.intValue()) {
558 pendingWakeup = false;
559 break;
560 }
561 }
562 } catch (IOException ignore) {
563
564 }
565 }
566 try {
567 eventFd.close();
568 } catch (IOException e) {
569 logger.warn("Failed to close the event fd.", e);
570 }
571 try {
572 timerFd.close();
573 } catch (IOException e) {
574 logger.warn("Failed to close the timer fd.", e);
575 }
576
577 try {
578 epollFd.close();
579 } catch (IOException e) {
580 logger.warn("Failed to close the epoll fd.", e);
581 }
582 }
583 }