1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.EventLoop;
21 import io.netty.channel.IoEventLoop;
22 import io.netty.channel.IoExecutionContext;
23 import io.netty.channel.IoHandle;
24 import io.netty.channel.IoHandler;
25 import io.netty.channel.IoHandlerFactory;
26 import io.netty.channel.IoOps;
27 import io.netty.channel.SelectStrategy;
28 import io.netty.channel.SelectStrategyFactory;
29 import io.netty.channel.unix.FileDescriptor;
30 import io.netty.channel.unix.IovArray;
31 import io.netty.util.IntSupplier;
32 import io.netty.util.collection.IntObjectHashMap;
33 import io.netty.util.collection.IntObjectMap;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.SystemPropertyUtil;
37 import io.netty.util.internal.UnstableApi;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicLong;
47
48 import static java.lang.Math.min;
49 import static java.lang.System.nanoTime;
50
51
52
53
54 public class EpollIoHandler implements IoHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);

/**
 * Threshold handed to the timer-arming {@code Native.epollWait(...)} calls made by {@code epollWait(...)}.
 * Read once from the {@code io.netty.channel.epoll.epollWaitThreshold} system property, defaulting to {@code 10};
 * the constant's name suggests the unit is milliseconds.
 */
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

static {
    // Run the epoll availability check during class initialization, i.e. before any handler instance exists.
    Epoll.ensureAvailability();
}

/**
 * Deadline the previous blocking wait in {@code run(...)} recorded as handled by the timer fd, or {@link #NONE}.
 * When the next deadline equals this value, {@code run(...)} waits without touching the timer again.
 */
private long prevDeadlineNanos = nanoTime() - 1;

/** The epoll instance plus the two helper descriptors registered with it; assigned in openFileDescriptors(). */
private FileDescriptor epollFd;
/** Written by {@code wakeup(...)} to interrupt a blocking wait. */
private FileDescriptor eventFd;
/** Passed to the timer-aware {@code Native.epollWait(...)} overload together with the requested delay. */
private FileDescriptor timerFd;

/** Live registrations keyed by the int value of the registered file descriptor. */
private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
/** {@code true} when the handler was created with {@code maxEvents == 0}; {@code run(...)} then grows {@link #events}. */
private final boolean allowGrowing;
/** Receives the ready events of every epoll wait. */
private final EpollEventArray events;

/** Lazily created by {@code cleanIovArray()} and released in {@code destroy()}. */
private IovArray iovArray;
/** Lazily created by {@code cleanDatagramPacketArray()} and released in {@code destroy()}. */
private NativeDatagramPacketArray datagramPacketArray;

private final SelectStrategy selectStrategy;
/** Non-blocking poll handed to the {@link SelectStrategy}; delegates to {@code epollWaitNow()}. */
private final IntSupplier selectNowSupplier = this::epollWaitNow;

/** Value of {@link #nextWakeupNanos} while the handler is not about to block / not blocked. */
private static final long AWAKE = -1L;
/** Value of {@link #nextWakeupNanos} (and of deadlines in general) meaning "no scheduled deadline". */
private static final long NONE = Long.MAX_VALUE;

/**
 * {@link #AWAKE} while running; otherwise the deadline ({@link #NONE} if there is none) the handler is going to wait
 * for. {@code wakeup(...)} swaps this to {@link #AWAKE} and only writes to the eventfd when it was not already
 * {@link #AWAKE}.
 */
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
/** {@code true} while an eventfd write is expected but has not yet been observed by {@code processReady(...)}. */
private boolean pendingWakeup;

/** Number of current registrations whose handle is an {@code AbstractEpollChannel.AbstractEpollUnsafe}. */
private int numChannels;

/** Upper bound applied to the nanosecond part of the delay passed to the timer fd (just below one second). */
private static final long MAX_SCHEDULED_TIMERFD_NS = 999_999_999;
100
101
102
103
104 public static IoHandlerFactory newFactory() {
105 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
106 }
107
108
109
110
111 public static IoHandlerFactory newFactory(final int maxEvents,
112 final SelectStrategyFactory selectStrategyFactory) {
113 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
114 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
115 return new IoHandlerFactory() {
116 @Override
117 public IoHandler newHandler() {
118 return new EpollIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
119 }
120 };
121 }
122
123
/**
 * Creates the handler and immediately opens its epoll, event and timer file descriptors.
 *
 * @param maxEvents fixed size of the event array, or {@code 0} for an initial size of 4096 that may grow later
 * @param strategy  the select strategy consulted by {@code run(...)}; must not be {@code null}
 */
EpollIoHandler(int maxEvents, SelectStrategy strategy) {
    selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
    // maxEvents == 0 means "no fixed limit": start with 4096 slots and let run(...) enlarge the array.
    allowGrowing = maxEvents == 0;
    events = new EpollEventArray(allowGrowing ? 4096 : maxEvents);
    openFileDescriptors();
}
135
136 private static EpollIoHandle cast(IoHandle handle) {
137 if (handle instanceof EpollIoHandle) {
138 return (EpollIoHandle) handle;
139 }
140 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
141 }
142
143 private static EpollIoOps cast(IoOps ops) {
144 if (ops instanceof EpollIoOps) {
145 return (EpollIoOps) ops;
146 }
147 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
148 }
149
150
151
152
153
154 @UnstableApi
155 public void openFileDescriptors() {
156 boolean success = false;
157 FileDescriptor epollFd = null;
158 FileDescriptor eventFd = null;
159 FileDescriptor timerFd = null;
160 try {
161 this.epollFd = epollFd = Native.newEpollCreate();
162 this.eventFd = eventFd = Native.newEventFd();
163 try {
164
165
166 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
167 } catch (IOException e) {
168 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
169 }
170 this.timerFd = timerFd = Native.newTimerFd();
171 try {
172
173
174 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
175 } catch (IOException e) {
176 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
177 }
178 success = true;
179 } finally {
180 if (!success) {
181 closeFileDescriptor(epollFd);
182 closeFileDescriptor(eventFd);
183 closeFileDescriptor(timerFd);
184 }
185 }
186 }
187
188 private static void closeFileDescriptor(FileDescriptor fd) {
189 if (fd != null) {
190 try {
191 fd.close();
192 } catch (Exception e) {
193
194 }
195 }
196 }
197
198
199
200
201 IovArray cleanIovArray() {
202 if (iovArray == null) {
203 iovArray = new IovArray();
204 } else {
205 iovArray.clear();
206 }
207 return iovArray;
208 }
209
210
211
212
213 NativeDatagramPacketArray cleanDatagramPacketArray() {
214 if (datagramPacketArray == null) {
215 datagramPacketArray = new NativeDatagramPacketArray();
216 } else {
217 datagramPacketArray.clear();
218 }
219 return datagramPacketArray;
220 }
221
222 @Override
223 public void wakeup(IoEventLoop eventLoop) {
224 if (!eventLoop.inEventLoop() && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
225
226 Native.eventFdWrite(eventFd.intValue(), 1L);
227 }
228 }
229
230 @Override
231 public void prepareToDestroy() {
232
233
234 DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
235
236 for (DefaultEpollIoRegistration reg: copy) {
237 reg.close();
238 }
239 }
240
241 @Override
242 public void destroy() {
243 try {
244 closeFileDescriptors();
245 } finally {
246
247 if (iovArray != null) {
248 iovArray.release();
249 iovArray = null;
250 }
251 if (datagramPacketArray != null) {
252 datagramPacketArray.release();
253 datagramPacketArray = null;
254 }
255 events.free();
256 }
257 }
258
259 private final class DefaultEpollIoRegistration extends AtomicBoolean implements EpollIoRegistration {
260 private final IoEventLoop eventLoop;
261 final EpollIoHandle handle;
262
263 DefaultEpollIoRegistration(IoEventLoop eventLoop, EpollIoHandle handle) {
264 this.eventLoop = eventLoop;
265 this.handle = handle;
266 }
267
268 @Override
269 public long submit(IoOps ops) throws Exception {
270 EpollIoOps epollIoOps = cast(ops);
271 try {
272 if (!isValid()) {
273 return -1;
274 }
275 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
276 return epollIoOps.value;
277 } catch (IOException e) {
278 throw e;
279 } catch (Exception e) {
280 throw new IOException(e);
281 }
282 }
283
284 @Override
285 public EpollIoHandler ioHandler() {
286 return EpollIoHandler.this;
287 }
288
289 @Override
290 public boolean isValid() {
291 return !get();
292 }
293
294 @Override
295 public void cancel() throws IOException {
296 if (getAndSet(true)) {
297 return;
298 }
299 if (handle.fd().isOpen()) {
300
301
302 Native.epollCtlDel(epollFd.intValue(), handle.fd().intValue());
303 }
304 if (eventLoop.inEventLoop()) {
305 cancel0();
306 } else {
307 eventLoop.execute(this::cancel0);
308 }
309 }
310
311 private void cancel0() {
312 int fd = handle.fd().intValue();
313 DefaultEpollIoRegistration old = registrations.remove(fd);
314 if (old != null) {
315 if (old != this) {
316
317 registrations.put(fd, old);
318 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
319 numChannels--;
320 }
321 }
322 }
323
324 void close() {
325 try {
326 cancel();
327 } catch (Exception e) {
328 logger.debug("Exception during canceling " + this, e);
329 }
330 try {
331 handle.close();
332 } catch (Exception e) {
333 logger.debug("Exception during closing " + handle, e);
334 }
335 }
336
337 void handle(long ev) {
338 handle.handle(this, EpollIoOps.eventOf((int) ev));
339 }
340 }
341
342 @Override
343 public EpollIoRegistration register(IoEventLoop eventLoop, IoHandle handle)
344 throws Exception {
345 final EpollIoHandle epollHandle = cast(handle);
346 DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(eventLoop, epollHandle);
347 int fd = epollHandle.fd().intValue();
348 Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
349 DefaultEpollIoRegistration old = registrations.put(fd, registration);
350
351
352
353 assert old == null || !old.isValid();
354
355 if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
356 numChannels++;
357 }
358 return registration;
359 }
360
361 @Override
362 public boolean isCompatible(Class<? extends IoHandle> handleType) {
363 return EpollIoHandle.class.isAssignableFrom(handleType);
364 }
365
366 int numRegisteredChannels() {
367 return numChannels;
368 }
369
370 List<Channel> registeredChannelsList() {
371 IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
372 if (ch.isEmpty()) {
373 return Collections.emptyList();
374 }
375
376 List<Channel> channels = new ArrayList<>(ch.size());
377 for (DefaultEpollIoRegistration registration : ch.values()) {
378 if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
379 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
380 }
381 }
382 return Collections.unmodifiableList(channels);
383 }
384
385 private long epollWait(IoExecutionContext context, long deadlineNanos) throws IOException {
386 if (deadlineNanos == NONE) {
387 return Native.epollWait(epollFd, events, timerFd,
388 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
389 }
390 long totalDelay = context.delayNanos(System.nanoTime());
391 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
392 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
393 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
394 }
395
396 private int epollWaitNoTimerChange() throws IOException {
397 return Native.epollWait(epollFd, events, false);
398 }
399
400 private int epollWaitNow() throws IOException {
401 return Native.epollWait(epollFd, events, true);
402 }
403
404 private int epollBusyWait() throws IOException {
405 return Native.epollBusyWait(epollFd, events);
406 }
407
408 private int epollWaitTimeboxed() throws IOException {
409
410 return Native.epollWait(epollFd, events, 1000);
411 }
412
413 @Override
414 public int run(IoExecutionContext context) {
415 int handled = 0;
416 try {
417 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
418 switch (strategy) {
419 case SelectStrategy.CONTINUE:
420 return 0;
421
422 case SelectStrategy.BUSY_WAIT:
423 strategy = epollBusyWait();
424 break;
425
426 case SelectStrategy.SELECT:
427 if (pendingWakeup) {
428
429
430 strategy = epollWaitTimeboxed();
431 if (strategy != 0) {
432 break;
433 }
434
435
436 logger.warn("Missed eventfd write (not seen after > 1 second)");
437 pendingWakeup = false;
438 if (!context.canBlock()) {
439 break;
440 }
441
442 }
443
444 long curDeadlineNanos = context.deadlineNanos();
445 if (curDeadlineNanos == -1L) {
446 curDeadlineNanos = NONE;
447 }
448 nextWakeupNanos.set(curDeadlineNanos);
449 try {
450 if (context.canBlock()) {
451 if (curDeadlineNanos == prevDeadlineNanos) {
452
453 strategy = epollWaitNoTimerChange();
454 } else {
455
456 long result = epollWait(context, curDeadlineNanos);
457
458
459 strategy = Native.epollReady(result);
460 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
461 }
462 }
463 } finally {
464
465
466 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
467 pendingWakeup = true;
468 }
469 }
470
471 default:
472 }
473 if (strategy > 0) {
474 handled = strategy;
475 if (processReady(events, strategy)) {
476 prevDeadlineNanos = NONE;
477 }
478 }
479 if (allowGrowing && strategy == events.length()) {
480
481 events.increase();
482 }
483 } catch (Error e) {
484 throw e;
485 } catch (Throwable t) {
486 handleLoopException(t);
487 }
488 return handled;
489 }
490
491
492
493
494 void handleLoopException(Throwable t) {
495 logger.warn("Unexpected exception in the selector loop.", t);
496
497
498
499 try {
500 Thread.sleep(1000);
501 } catch (InterruptedException e) {
502
503 }
504 }
505
506
507 private boolean processReady(EpollEventArray events, int ready) {
508 boolean timerFired = false;
509 for (int i = 0; i < ready; i ++) {
510 final int fd = events.fd(i);
511 if (fd == eventFd.intValue()) {
512 pendingWakeup = false;
513 } else if (fd == timerFd.intValue()) {
514 timerFired = true;
515 } else {
516 final long ev = events.events(i);
517
518 DefaultEpollIoRegistration registration = registrations.get(fd);
519 if (registration != null) {
520 registration.handle(ev);
521 } else {
522
523 try {
524 Native.epollCtlDel(epollFd.intValue(), fd);
525 } catch (IOException ignore) {
526
527
528
529
530 }
531 }
532 }
533 }
534 return timerFired;
535 }
536
537
538
539
540
541
542
543 @UnstableApi
544 public void closeFileDescriptors() {
545
546 while (pendingWakeup) {
547 try {
548 int count = epollWaitTimeboxed();
549 if (count == 0) {
550
551 break;
552 }
553 for (int i = 0; i < count; i++) {
554 if (events.fd(i) == eventFd.intValue()) {
555 pendingWakeup = false;
556 break;
557 }
558 }
559 } catch (IOException ignore) {
560
561 }
562 }
563 try {
564 eventFd.close();
565 } catch (IOException e) {
566 logger.warn("Failed to close the event fd.", e);
567 }
568 try {
569 timerFd.close();
570 } catch (IOException e) {
571 logger.warn("Failed to close the timer fd.", e);
572 }
573
574 try {
575 epollFd.close();
576 } catch (IOException e) {
577 logger.warn("Failed to close the epoll fd.", e);
578 }
579 }
580 }