View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.EventLoop;
20  import io.netty.channel.EventLoopGroup;
21  import io.netty.channel.EventLoopTaskQueueFactory;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.channel.unix.IovArray;
27  import io.netty.util.IntSupplier;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.RejectedExecutionHandler;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.SystemPropertyUtil;
34  import io.netty.util.internal.UnstableApi;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import java.io.IOException;
39  import java.util.Iterator;
40  import java.util.Queue;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import static java.lang.Math.min;
45  
46  /**
47   * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
48   */
49  public class EpollEventLoop extends SingleThreadEventLoop {
50      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
51      private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
52              SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
53  
54      static {
55          // Ensure JNI is initialized by the time this class is loaded by this time!
56          // We use unix-common methods in this class which are backed by JNI methods.
57          Epoll.ensureAvailability();
58      }
59  
60      private FileDescriptor epollFd;
61      private FileDescriptor eventFd;
62      private FileDescriptor timerFd;
63      private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
64      private final boolean allowGrowing;
65      private final EpollEventArray events;
66  
67      // These are initialized on first use
68      private IovArray iovArray;
69      private NativeDatagramPacketArray datagramPacketArray;
70  
71      private final SelectStrategy selectStrategy;
72      private final IntSupplier selectNowSupplier = new IntSupplier() {
73          @Override
74          public int get() throws Exception {
75              return epollWaitNow();
76          }
77      };
78  
79      private static final long AWAKE = -1L;
80      private static final long NONE = Long.MAX_VALUE;
81  
82      // nextWakeupNanos is:
83      //    AWAKE            when EL is awake
84      //    NONE             when EL is waiting with no wakeup scheduled
85      //    other value T    when EL is waiting with wakeup scheduled at time T
86      private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
87      private boolean pendingWakeup;
88      private volatile int ioRatio = 50;
89  
90      // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
91      private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
92  
93      EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
94                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
95                     EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
96          super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
97                  rejectedExecutionHandler);
98          selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
99          if (maxEvents == 0) {
100             allowGrowing = true;
101             events = new EpollEventArray(4096);
102         } else {
103             allowGrowing = false;
104             events = new EpollEventArray(maxEvents);
105         }
106         openFileDescriptors();
107     }
108 
109     /**
110      * This method is intended for use by a process checkpoint/restore
111      * integration, such as OpenJDK CRaC.
112      */
113     @UnstableApi
114     public void openFileDescriptors() {
115         boolean success = false;
116         FileDescriptor epollFd = null;
117         FileDescriptor eventFd = null;
118         FileDescriptor timerFd = null;
119         try {
120             this.epollFd = epollFd = Native.newEpollCreate();
121             this.eventFd = eventFd = Native.newEventFd();
122             try {
123                 // It is important to use EPOLLET here as we only want to get the notification once per
124                 // wakeup and don't call eventfd_read(...).
125                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
126             } catch (IOException e) {
127                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
128             }
129             this.timerFd = timerFd = Native.newTimerFd();
130             try {
131                 // It is important to use EPOLLET here as we only want to get the notification once per
132                 // wakeup and don't call read(...).
133                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
134             } catch (IOException e) {
135                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
136             }
137             success = true;
138         } finally {
139             if (!success) {
140                 if (epollFd != null) {
141                     try {
142                         epollFd.close();
143                     } catch (Exception e) {
144                         // ignore
145                     }
146                 }
147                 if (eventFd != null) {
148                     try {
149                         eventFd.close();
150                     } catch (Exception e) {
151                         // ignore
152                     }
153                 }
154                 if (timerFd != null) {
155                     try {
156                         timerFd.close();
157                     } catch (Exception e) {
158                         // ignore
159                     }
160                 }
161             }
162         }
163     }
164 
165     private static Queue<Runnable> newTaskQueue(
166             EventLoopTaskQueueFactory queueFactory) {
167         if (queueFactory == null) {
168             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
169         }
170         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
171     }
172 
173     /**
174      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
175      */
176     IovArray cleanIovArray() {
177         if (iovArray == null) {
178             iovArray = new IovArray();
179         } else {
180             iovArray.clear();
181         }
182         return iovArray;
183     }
184 
185     /**
186      * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
187      */
188     NativeDatagramPacketArray cleanDatagramPacketArray() {
189         if (datagramPacketArray == null) {
190             datagramPacketArray = new NativeDatagramPacketArray();
191         } else {
192             datagramPacketArray.clear();
193         }
194         return datagramPacketArray;
195     }
196 
197     @Override
198     protected void wakeup(boolean inEventLoop) {
199         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
200             // write to the evfd which will then wake-up epoll_wait(...)
201             Native.eventFdWrite(eventFd.intValue(), 1L);
202         }
203     }
204 
205     @Override
206     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
207         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
208         return deadlineNanos < nextWakeupNanos.get();
209     }
210 
211     @Override
212     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
213         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
214         return deadlineNanos < nextWakeupNanos.get();
215     }
216 
217     /**
218      * Register the given epoll with this {@link EventLoop}.
219      */
220     void add(AbstractEpollChannel ch) throws IOException {
221         assert inEventLoop();
222         int fd = ch.socket.intValue();
223         Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
224         AbstractEpollChannel old = channels.put(fd, ch);
225 
226         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
227         // closed.
228         assert old == null || !old.isOpen();
229     }
230 
231     /**
232      * The flags of the given epoll was modified so update the registration
233      */
234     void modify(AbstractEpollChannel ch) throws IOException {
235         assert inEventLoop();
236         Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
237     }
238 
239     /**
240      * Deregister the given epoll from this {@link EventLoop}.
241      */
242     void remove(AbstractEpollChannel ch) throws IOException {
243         assert inEventLoop();
244         int fd = ch.socket.intValue();
245 
246         AbstractEpollChannel old = channels.remove(fd);
247         if (old != null && old != ch) {
248             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
249             channels.put(fd, old);
250 
251             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
252             assert !ch.isOpen();
253         } else if (ch.isOpen()) {
254             // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
255             // removed once the file-descriptor is closed.
256             Native.epollCtlDel(epollFd.intValue(), fd);
257         }
258     }
259 
260     @Override
261     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
262         return newTaskQueue0(maxPendingTasks);
263     }
264 
265     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
266         // This event loop never calls takeTask()
267         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
268                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
269     }
270 
271     /**
272      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
273      */
274     public int getIoRatio() {
275         return ioRatio;
276     }
277 
278     /**
279      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
280      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
281      */
282     public void setIoRatio(int ioRatio) {
283         if (ioRatio <= 0 || ioRatio > 100) {
284             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
285         }
286         this.ioRatio = ioRatio;
287     }
288 
289     @Override
290     public int registeredChannels() {
291         return channels.size();
292     }
293 
294     @Override
295     public Iterator<Channel> registeredChannelsIterator() {
296         assert inEventLoop();
297         IntObjectMap<AbstractEpollChannel> ch = channels;
298         if (ch.isEmpty()) {
299             return ChannelsReadOnlyIterator.empty();
300         }
301         return new ChannelsReadOnlyIterator<AbstractEpollChannel>(ch.values());
302     }
303 
304     private long epollWait(long deadlineNanos) throws IOException {
305         if (deadlineNanos == NONE) {
306             return Native.epollWait(epollFd, events, timerFd,
307                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
308         }
309         long totalDelay = deadlineToDelayNanos(deadlineNanos);
310         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
311         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
312         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
313     }
314 
315     private int epollWaitNoTimerChange() throws IOException {
316         return Native.epollWait(epollFd, events, false);
317     }
318 
319     private int epollWaitNow() throws IOException {
320         return Native.epollWait(epollFd, events, true);
321     }
322 
323     private int epollBusyWait() throws IOException {
324         return Native.epollBusyWait(epollFd, events);
325     }
326 
327     private int epollWaitTimeboxed() throws IOException {
328         // Wait with 1 second "safeguard" timeout
329         return Native.epollWait(epollFd, events, 1000);
330     }
331 
332     @Override
333     protected void run() {
334         long prevDeadlineNanos = NONE;
335         for (;;) {
336             try {
337                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
338                 switch (strategy) {
339                     case SelectStrategy.CONTINUE:
340                         continue;
341 
342                     case SelectStrategy.BUSY_WAIT:
343                         strategy = epollBusyWait();
344                         break;
345 
346                     case SelectStrategy.SELECT:
347                         if (pendingWakeup) {
348                             // We are going to be immediately woken so no need to reset wakenUp
349                             // or check for timerfd adjustment.
350                             strategy = epollWaitTimeboxed();
351                             if (strategy != 0) {
352                                 break;
353                             }
354                             // We timed out so assume that we missed the write event due to an
355                             // abnormally failed syscall (the write itself or a prior epoll_wait)
356                             logger.warn("Missed eventfd write (not seen after > 1 second)");
357                             pendingWakeup = false;
358                             if (hasTasks()) {
359                                 break;
360                             }
361                             // fall-through
362                         }
363 
364                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
365                         if (curDeadlineNanos == -1L) {
366                             curDeadlineNanos = NONE; // nothing on the calendar
367                         }
368                         nextWakeupNanos.set(curDeadlineNanos);
369                         try {
370                             if (!hasTasks()) {
371                                 if (curDeadlineNanos == prevDeadlineNanos) {
372                                     // No timer activity needed
373                                     strategy = epollWaitNoTimerChange();
374                                 } else {
375                                     // Timerfd needs to be re-armed or disarmed
376                                     long result = epollWait(curDeadlineNanos);
377                                     // The result contains the actual return value and if a timer was used or not.
378                                     // We need to "unpack" using the helper methods exposed in Native.
379                                     strategy = Native.epollReady(result);
380                                     prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
381                                 }
382                             }
383                         } finally {
384                             // Try get() first to avoid much more expensive CAS in the case we
385                             // were woken via the wakeup() method (submitted task)
386                             if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
387                                 pendingWakeup = true;
388                             }
389                         }
390                         // fallthrough
391                     default:
392                 }
393 
394                 final int ioRatio = this.ioRatio;
395                 if (ioRatio == 100) {
396                     try {
397                         if (strategy > 0 && processReady(events, strategy)) {
398                             prevDeadlineNanos = NONE;
399                         }
400                     } finally {
401                         // Ensure we always run tasks.
402                         runAllTasks();
403                     }
404                 } else if (strategy > 0) {
405                     final long ioStartTime = System.nanoTime();
406                     try {
407                         if (processReady(events, strategy)) {
408                             prevDeadlineNanos = NONE;
409                         }
410                     } finally {
411                         // Ensure we always run tasks.
412                         final long ioTime = System.nanoTime() - ioStartTime;
413                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
414                     }
415                 } else {
416                     runAllTasks(0); // This will run the minimum number of tasks
417                 }
418                 if (allowGrowing && strategy == events.length()) {
419                     //increase the size of the array as we needed the whole space for the events
420                     events.increase();
421                 }
422             } catch (Error e) {
423                 throw e;
424             } catch (Throwable t) {
425                 handleLoopException(t);
426             } finally {
427                 // Always handle shutdown even if the loop processing threw an exception.
428                 try {
429                     if (isShuttingDown()) {
430                         closeAll();
431                         if (confirmShutdown()) {
432                             break;
433                         }
434                     }
435                 } catch (Error e) {
436                     throw e;
437                 } catch (Throwable t) {
438                     handleLoopException(t);
439                 }
440             }
441         }
442     }
443 
444     /**
445      * Visible only for testing!
446      */
447     void handleLoopException(Throwable t) {
448         logger.warn("Unexpected exception in the selector loop.", t);
449 
450         // Prevent possible consecutive immediate failures that lead to
451         // excessive CPU consumption.
452         try {
453             Thread.sleep(1000);
454         } catch (InterruptedException e) {
455             // Ignore.
456         }
457     }
458 
459     private void closeAll() {
460         // Using the intermediate collection to prevent ConcurrentModificationException.
461         // In the `close()` method, the channel is deleted from `channels` map.
462         AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
463 
464         for (AbstractEpollChannel ch: localChannels) {
465             ch.unsafe().close(ch.unsafe().voidPromise());
466         }
467     }
468 
469     // Returns true if a timerFd event was encountered
470     private boolean processReady(EpollEventArray events, int ready) {
471         boolean timerFired = false;
472         for (int i = 0; i < ready; i ++) {
473             final int fd = events.fd(i);
474             if (fd == eventFd.intValue()) {
475                 pendingWakeup = false;
476             } else if (fd == timerFd.intValue()) {
477                 timerFired = true;
478             } else {
479                 final long ev = events.events(i);
480 
481                 AbstractEpollChannel ch = channels.get(fd);
482                 if (ch != null) {
483                     // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
484                     // sure about it!
485                     // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
486                     // past.
487                     AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
488 
489                     // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
490                     // to read from the file descriptor.
491                     // See https://github.com/netty/netty/issues/3785
492                     //
493                     // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
494                     // In either case epollOutReady() will do the correct thing (finish connecting, or fail
495                     // the connection).
496                     // See https://github.com/netty/netty/issues/3848
497                     if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
498                         // Force flush of data as the epoll is writable again
499                         unsafe.epollOutReady();
500                     }
501 
502                     // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
503                     // See https://github.com/netty/netty/issues/4317.
504                     //
505                     // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
506                     // try to read from the underlying file descriptor and so notify the user about the error.
507                     if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
508                         // The Channel is still open and there is something to read. Do it now.
509                         unsafe.epollInReady();
510                     }
511 
512                     // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
513                     // we may close the channel directly or try to read more data depending on the state of the
514                     // Channel and als depending on the AbstractEpollChannel subtype.
515                     if ((ev & Native.EPOLLRDHUP) != 0) {
516                         unsafe.epollRdHupReady();
517                     }
518                 } else {
519                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
520                     try {
521                         Native.epollCtlDel(epollFd.intValue(), fd);
522                     } catch (IOException ignore) {
523                         // This can happen but is nothing we need to worry about as we only try to delete
524                         // the fd from the epoll set as we not found it in our mappings. So this call to
525                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
526                         // deleted before or the file descriptor was closed before.
527                     }
528                 }
529             }
530         }
531         return timerFired;
532     }
533 
534     @Override
535     protected void cleanup() {
536         try {
537             closeFileDescriptors();
538         } finally {
539             // release native memory
540             if (iovArray != null) {
541                 iovArray.release();
542                 iovArray = null;
543             }
544             if (datagramPacketArray != null) {
545                 datagramPacketArray.release();
546                 datagramPacketArray = null;
547             }
548             events.free();
549         }
550     }
551 
552     /**
553      * This method is intended for use by process checkpoint/restore
554      * integration, such as OpenJDK CRaC.
555      * It's up to the caller to ensure that there is no concurrent use
556      * of the FDs while these are closed, e.g. by blocking the executor.
557      */
558     @UnstableApi
559     public void closeFileDescriptors() {
560         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
561         while (pendingWakeup) {
562             try {
563                 int count = epollWaitTimeboxed();
564                 if (count == 0) {
565                     // We timed-out so assume that the write we're expecting isn't coming
566                     break;
567                 }
568                 for (int i = 0; i < count; i++) {
569                     if (events.fd(i) == eventFd.intValue()) {
570                         pendingWakeup = false;
571                         break;
572                     }
573                 }
574             } catch (IOException ignore) {
575                 // ignore
576             }
577         }
578         try {
579             eventFd.close();
580         } catch (IOException e) {
581             logger.warn("Failed to close the event fd.", e);
582         }
583         try {
584             timerFd.close();
585         } catch (IOException e) {
586             logger.warn("Failed to close the timer fd.", e);
587         }
588 
589         try {
590             epollFd.close();
591         } catch (IOException e) {
592             logger.warn("Failed to close the epoll fd.", e);
593         }
594     }
595 }