1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.EventLoop;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.EventLoopTaskQueueFactory;
22 import io.netty.channel.SelectStrategy;
23 import io.netty.channel.SingleThreadEventLoop;
24 import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
25 import io.netty.channel.unix.FileDescriptor;
26 import io.netty.channel.unix.IovArray;
27 import io.netty.util.IntSupplier;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.RejectedExecutionHandler;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.UnstableApi;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import static java.lang.Math.min;
45
46
47
48
49 public class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
// Read from the "io.netty.channel.epoll.epollWaitThreshold" system property (default 10) and passed as the
// last argument of the timer-based Native.epollWait(...) call in epollWait(long).
// NOTE(review): the name suggests the unit is milliseconds -- confirm against Native.epollWait.
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

static {
    // Invoked once when this class is initialized, before any instance can use the Native methods.
    // NOTE(review): presumably fails fast when the native epoll transport is unavailable -- confirm in Epoll.
    Epoll.ensureAvailability();
}

// Descriptors assigned in openFileDescriptors() and closed in closeFileDescriptors(). They are non-final
// because openFileDescriptors() is a public method that (re)assigns them.
private FileDescriptor epollFd;
private FileDescriptor eventFd;
private FileDescriptor timerFd;
// Registered channels keyed by the int value of their socket descriptor (see add/remove).
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
// True when the constructor received maxEvents == 0; run() then calls events.increase() whenever a wait
// returns as many events as events.length().
private final boolean allowGrowing;
// Native event array handed to every Native.epollWait/epollBusyWait call and read back in processReady.
private final EpollEventArray events;

// Lazily created by cleanIovArray()/cleanDatagramPacketArray() and released in cleanup().
private IovArray iovArray;
private NativeDatagramPacketArray datagramPacketArray;

private final SelectStrategy selectStrategy;
// Handed to selectStrategy.calculateStrategy(...) in run(); delegates to epollWaitNow().
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return epollWaitNow();
    }
};

// Sentinel values stored in nextWakeupNanos.
// AWAKE: the loop is not (or no longer) waiting; wakeup(...) skips the eventfd write when it sees this value.
private static final long AWAKE = -1L;
// NONE: used in run() when nextScheduledTaskDeadlineNanos() returns -1, i.e. there is no deadline to wait for.
private static final long NONE = Long.MAX_VALUE;

// Holds AWAKE, NONE, or the scheduled-task deadline that run() published before waiting.
// Written by run() and swapped to AWAKE by wakeup(...).
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
// Set by run() when it finds nextWakeupNanos already AWAKE after waiting (wakeup(...) writes to eventFd in
// that case); cleared when the eventFd event is observed in processReady/closeFileDescriptors.
private boolean pendingWakeup;
// Validated by setIoRatio to lie in 1..100; consumed by run() to budget task execution time.
private volatile int ioRatio = 50;

// Upper bound applied to the nanosecond component of the delay computed in epollWait(long).
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
92
93 EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
94 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
95 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
96 super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
97 rejectedExecutionHandler);
98 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
99 if (maxEvents == 0) {
100 allowGrowing = true;
101 events = new EpollEventArray(4096);
102 } else {
103 allowGrowing = false;
104 events = new EpollEventArray(maxEvents);
105 }
106 openFileDescriptors();
107 }
108
109
110
111
112
113 @UnstableApi
114 public void openFileDescriptors() {
115 boolean success = false;
116 FileDescriptor epollFd = null;
117 FileDescriptor eventFd = null;
118 FileDescriptor timerFd = null;
119 try {
120 this.epollFd = epollFd = Native.newEpollCreate();
121 this.eventFd = eventFd = Native.newEventFd();
122 try {
123
124
125 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
126 } catch (IOException e) {
127 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
128 }
129 this.timerFd = timerFd = Native.newTimerFd();
130 try {
131
132
133 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
134 } catch (IOException e) {
135 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
136 }
137 success = true;
138 } finally {
139 if (!success) {
140 if (epollFd != null) {
141 try {
142 epollFd.close();
143 } catch (Exception e) {
144
145 }
146 }
147 if (eventFd != null) {
148 try {
149 eventFd.close();
150 } catch (Exception e) {
151
152 }
153 }
154 if (timerFd != null) {
155 try {
156 timerFd.close();
157 } catch (Exception e) {
158
159 }
160 }
161 }
162 }
163 }
164
165 private static Queue<Runnable> newTaskQueue(
166 EventLoopTaskQueueFactory queueFactory) {
167 if (queueFactory == null) {
168 return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
169 }
170 return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
171 }
172
173
174
175
176 IovArray cleanIovArray() {
177 if (iovArray == null) {
178 iovArray = new IovArray();
179 } else {
180 iovArray.clear();
181 }
182 return iovArray;
183 }
184
185
186
187
188 NativeDatagramPacketArray cleanDatagramPacketArray() {
189 if (datagramPacketArray == null) {
190 datagramPacketArray = new NativeDatagramPacketArray();
191 } else {
192 datagramPacketArray.clear();
193 }
194 return datagramPacketArray;
195 }
196
197 @Override
198 protected void wakeup(boolean inEventLoop) {
199 if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
200
201 Native.eventFdWrite(eventFd.intValue(), 1L);
202 }
203 }
204
205 @Override
206 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
207
208 return deadlineNanos < nextWakeupNanos.get();
209 }
210
211 @Override
212 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
213
214 return deadlineNanos < nextWakeupNanos.get();
215 }
216
217
218
219
220 void add(AbstractEpollChannel ch) throws IOException {
221 assert inEventLoop();
222 int fd = ch.socket.intValue();
223 Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
224 AbstractEpollChannel old = channels.put(fd, ch);
225
226
227
228 assert old == null || !old.isOpen();
229 }
230
231
232
233
234 void modify(AbstractEpollChannel ch) throws IOException {
235 assert inEventLoop();
236 Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
237 }
238
239
240
241
242 void remove(AbstractEpollChannel ch) throws IOException {
243 assert inEventLoop();
244 int fd = ch.socket.intValue();
245
246 AbstractEpollChannel old = channels.remove(fd);
247 if (old != null && old != ch) {
248
249 channels.put(fd, old);
250
251
252 assert !ch.isOpen();
253 } else if (ch.isOpen()) {
254
255
256 Native.epollCtlDel(epollFd.intValue(), fd);
257 }
258 }
259
260 @Override
261 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
262 return newTaskQueue0(maxPendingTasks);
263 }
264
265 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
266
267 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
268 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
269 }
270
271
272
273
274 public int getIoRatio() {
275 return ioRatio;
276 }
277
278
279
280
281
282 public void setIoRatio(int ioRatio) {
283 if (ioRatio <= 0 || ioRatio > 100) {
284 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
285 }
286 this.ioRatio = ioRatio;
287 }
288
289 @Override
290 public int registeredChannels() {
291 return channels.size();
292 }
293
294 @Override
295 public Iterator<Channel> registeredChannelsIterator() {
296 assert inEventLoop();
297 IntObjectMap<AbstractEpollChannel> ch = channels;
298 if (ch.isEmpty()) {
299 return ChannelsReadOnlyIterator.empty();
300 }
301 return new ChannelsReadOnlyIterator<AbstractEpollChannel>(ch.values());
302 }
303
304 private long epollWait(long deadlineNanos) throws IOException {
305 if (deadlineNanos == NONE) {
306 return Native.epollWait(epollFd, events, timerFd,
307 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
308 }
309 long totalDelay = deadlineToDelayNanos(deadlineNanos);
310 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
311 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
312 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
313 }
314
315 private int epollWaitNoTimerChange() throws IOException {
316 return Native.epollWait(epollFd, events, false);
317 }
318
319 private int epollWaitNow() throws IOException {
320 return Native.epollWait(epollFd, events, true);
321 }
322
323 private int epollBusyWait() throws IOException {
324 return Native.epollBusyWait(epollFd, events);
325 }
326
327 private int epollWaitTimeboxed() throws IOException {
328
329 return Native.epollWait(epollFd, events, 1000);
330 }
331
/**
 * Main loop of this event loop. Each iteration asks the select strategy what to do, optionally waits
 * on epoll, dispatches ready events through {@link #processReady(EpollEventArray, int)}, runs queued
 * tasks according to {@code ioRatio}, and finally checks for shutdown. The loop exits only when
 * {@code confirmShutdown()} returns {@code true}.
 */
@Override
protected void run() {
    // Deadline used for the last timer-based wait, or NONE. It lets the loop skip the timer-based wait
    // when the next deadline is unchanged.
    long prevDeadlineNanos = NONE;
    for (;;) {
        try {
            // From here on `strategy` doubles as the number of ready events handed to processReady.
            int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
            switch (strategy) {
                case SelectStrategy.CONTINUE:
                    // Restart the iteration; the outer finally block still runs the shutdown check.
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    strategy = epollBusyWait();
                    break;

                case SelectStrategy.SELECT:
                    if (pendingWakeup) {
                        // An eventfd write is expected but not yet observed: wait for it with a bounded
                        // timeout instead of starting a new blocking wait.
                        strategy = epollWaitTimeboxed();
                        if (strategy != 0) {
                            // Something is ready; process it (processReady clears pendingWakeup if it
                            // was the eventfd).
                            break;
                        }

                        // Timed out without seeing the write: stop waiting for it.
                        logger.warn("Missed eventfd write (not seen after > 1 second)");
                        pendingWakeup = false;
                        if (hasTasks()) {
                            // strategy is 0 here, so only tasks are run below.
                            break;
                        }
                        // No tasks: continue into the normal blocking wait.
                    }

                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // no scheduled deadline
                    }
                    // Publish the wakeup deadline BEFORE re-checking hasTasks(), so a concurrent
                    // wakeup(...) sees a non-AWAKE value and writes to the eventfd.
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            if (curDeadlineNanos == prevDeadlineNanos) {
                                // Same deadline as the previous timer-based wait: wait without passing
                                // the timer descriptor.
                                strategy = epollWaitNoTimerChange();
                            } else {
                                // Deadline changed: timer-based wait. The long result packs both the
                                // ready count and whether the timer was used.
                                long result = epollWait(curDeadlineNanos);

                                strategy = Native.epollReady(result);
                                prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
                            }
                        }
                    } finally {
                        // Mark the loop awake again. If the value was already AWAKE, wakeup(...) got
                        // there first and has written (or is about to write) to the eventfd, so that
                        // event must be expected later.
                        if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
                            pendingWakeup = true;
                        }
                    }
                    // Intentional fall-through into default.

                default:
            }

            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // processReady returns true when the timer descriptor fired; forget the previous
                    // deadline in that case.
                    if (strategy > 0 && processReady(events, strategy)) {
                        prevDeadlineNanos = NONE;
                    }
                } finally {
                    // Tasks always run, even if event processing threw.
                    runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    if (processReady(events, strategy)) {
                        prevDeadlineNanos = NONE;
                    }
                } finally {
                    // Task time budget is proportional to the time just spent on I/O.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // Nothing was ready: run tasks with a budget argument of 0.
                runAllTasks(0);
            }
            if (allowGrowing && strategy == events.length()) {
                // The wait filled the whole event array; enlarge it for subsequent waits.
                events.increase();
            }
        } catch (Error e) {
            // Errors are not handled here; propagate them.
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Shutdown check runs on every iteration, including after `continue` and after exceptions.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
443
444
445
446
447 void handleLoopException(Throwable t) {
448 logger.warn("Unexpected exception in the selector loop.", t);
449
450
451
452 try {
453 Thread.sleep(1000);
454 } catch (InterruptedException e) {
455
456 }
457 }
458
459 private void closeAll() {
460
461
462 AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
463
464 for (AbstractEpollChannel ch: localChannels) {
465 ch.unsafe().close(ch.unsafe().voidPromise());
466 }
467 }
468
469
470 private boolean processReady(EpollEventArray events, int ready) {
471 boolean timerFired = false;
472 for (int i = 0; i < ready; i ++) {
473 final int fd = events.fd(i);
474 if (fd == eventFd.intValue()) {
475 pendingWakeup = false;
476 } else if (fd == timerFd.intValue()) {
477 timerFired = true;
478 } else {
479 final long ev = events.events(i);
480
481 AbstractEpollChannel ch = channels.get(fd);
482 if (ch != null) {
483
484
485
486
487 AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
488
489
490
491
492
493
494
495
496
497 if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
498
499 unsafe.epollOutReady();
500 }
501
502
503
504
505
506
507 if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
508
509 unsafe.epollInReady();
510 }
511
512
513
514
515 if ((ev & Native.EPOLLRDHUP) != 0) {
516 unsafe.epollRdHupReady();
517 }
518 } else {
519
520 try {
521 Native.epollCtlDel(epollFd.intValue(), fd);
522 } catch (IOException ignore) {
523
524
525
526
527 }
528 }
529 }
530 }
531 return timerFired;
532 }
533
534 @Override
535 protected void cleanup() {
536 try {
537 closeFileDescriptors();
538 } finally {
539
540 if (iovArray != null) {
541 iovArray.release();
542 iovArray = null;
543 }
544 if (datagramPacketArray != null) {
545 datagramPacketArray.release();
546 datagramPacketArray = null;
547 }
548 events.free();
549 }
550 }
551
552
553
554
555
556
557
558 @UnstableApi
559 public void closeFileDescriptors() {
560
561 while (pendingWakeup) {
562 try {
563 int count = epollWaitTimeboxed();
564 if (count == 0) {
565
566 break;
567 }
568 for (int i = 0; i < count; i++) {
569 if (events.fd(i) == eventFd.intValue()) {
570 pendingWakeup = false;
571 break;
572 }
573 }
574 } catch (IOException ignore) {
575
576 }
577 }
578 try {
579 eventFd.close();
580 } catch (IOException e) {
581 logger.warn("Failed to close the event fd.", e);
582 }
583 try {
584 timerFd.close();
585 } catch (IOException e) {
586 logger.warn("Failed to close the timer fd.", e);
587 }
588
589 try {
590 epollFd.close();
591 } catch (IOException e) {
592 logger.warn("Failed to close the epoll fd.", e);
593 }
594 }
595 }