View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.EventLoop;
19  import io.netty.channel.EventLoopGroup;
20  import io.netty.channel.EventLoopTaskQueueFactory;
21  import io.netty.channel.SelectStrategy;
22  import io.netty.channel.SingleThreadEventLoop;
23  import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
24  import io.netty.channel.unix.FileDescriptor;
25  import io.netty.channel.unix.IovArray;
26  import io.netty.util.IntSupplier;
27  import io.netty.util.collection.IntObjectHashMap;
28  import io.netty.util.collection.IntObjectMap;
29  import io.netty.util.concurrent.RejectedExecutionHandler;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PlatformDependent;
32  import io.netty.util.internal.SystemPropertyUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.util.Queue;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.atomic.AtomicLong;
40  
41  import static java.lang.Math.min;
42  
43  /**
44   * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
45   */
46  class EpollEventLoop extends SingleThreadEventLoop {
47      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
48      private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
49              SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
50  
51      static {
52          // Ensure JNI is initialized by the time this class is loaded.
53          // We use unix-common methods in this class which are backed by JNI methods.
54          Epoll.ensureAvailability();
55      }
56  
57      private final FileDescriptor epollFd;
58      private final FileDescriptor eventFd;
59      private final FileDescriptor timerFd;
60      private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
61      private final boolean allowGrowing;
62      private final EpollEventArray events;
63  
64      // These are initialized on first use
65      private IovArray iovArray;
66      private NativeDatagramPacketArray datagramPacketArray;
67  
68      private final SelectStrategy selectStrategy;
69      private final IntSupplier selectNowSupplier = new IntSupplier() {
70          @Override
71          public int get() throws Exception {
72              return epollWaitNow();
73          }
74      };
75  
76      private static final long AWAKE = -1L;
77      private static final long NONE = Long.MAX_VALUE;
78  
79      // nextWakeupNanos is:
80      //    AWAKE            when EL is awake
81      //    NONE             when EL is waiting with no wakeup scheduled
82      //    other value T    when EL is waiting with wakeup scheduled at time T
83      private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
84      private boolean pendingWakeup;
85      private volatile int ioRatio = 50;
86  
87      // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
88      private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
89  
90      EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
91                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
92                     EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
93          super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
94                  rejectedExecutionHandler);
95          selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
96          if (maxEvents == 0) {
97              allowGrowing = true;
98              events = new EpollEventArray(4096);
99          } else {
100             allowGrowing = false;
101             events = new EpollEventArray(maxEvents);
102         }
103         boolean success = false;
104         FileDescriptor epollFd = null;
105         FileDescriptor eventFd = null;
106         FileDescriptor timerFd = null;
107         try {
108             this.epollFd = epollFd = Native.newEpollCreate();
109             this.eventFd = eventFd = Native.newEventFd();
110             try {
111                 // It is important to use EPOLLET here as we only want to get the notification once per
112                 // wakeup and don't call eventfd_read(...).
113                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
114             } catch (IOException e) {
115                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
116             }
117             this.timerFd = timerFd = Native.newTimerFd();
118             try {
119                 // It is important to use EPOLLET here as we only want to get the notification once per
120                 // wakeup and don't call read(...).
121                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
122             } catch (IOException e) {
123                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
124             }
125             success = true;
126         } finally {
127             if (!success) {
128                 if (epollFd != null) {
129                     try {
130                         epollFd.close();
131                     } catch (Exception e) {
132                         // ignore
133                     }
134                 }
135                 if (eventFd != null) {
136                     try {
137                         eventFd.close();
138                     } catch (Exception e) {
139                         // ignore
140                     }
141                 }
142                 if (timerFd != null) {
143                     try {
144                         timerFd.close();
145                     } catch (Exception e) {
146                         // ignore
147                     }
148                 }
149             }
150         }
151     }
152 
153     private static Queue<Runnable> newTaskQueue(
154             EventLoopTaskQueueFactory queueFactory) {
155         if (queueFactory == null) {
156             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
157         }
158         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
159     }
160 
161     /**
162      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
163      */
164     IovArray cleanIovArray() {
165         if (iovArray == null) {
166             iovArray = new IovArray();
167         } else {
168             iovArray.clear();
169         }
170         return iovArray;
171     }
172 
173     /**
174      * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
175      */
176     NativeDatagramPacketArray cleanDatagramPacketArray() {
177         if (datagramPacketArray == null) {
178             datagramPacketArray = new NativeDatagramPacketArray();
179         } else {
180             datagramPacketArray.clear();
181         }
182         return datagramPacketArray;
183     }
184 
185     @Override
186     protected void wakeup(boolean inEventLoop) {
187         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
188             // write to the evfd which will then wake-up epoll_wait(...)
189             Native.eventFdWrite(eventFd.intValue(), 1L);
190         }
191     }
192 
193     @Override
194     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
195         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
196         return deadlineNanos < nextWakeupNanos.get();
197     }
198 
199     @Override
200     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
201         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
202         return deadlineNanos < nextWakeupNanos.get();
203     }
204 
205     /**
206      * Register the given epoll with this {@link EventLoop}.
207      */
208     void add(AbstractEpollChannel ch) throws IOException {
209         assert inEventLoop();
210         int fd = ch.socket.intValue();
211         Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
212         AbstractEpollChannel old = channels.put(fd, ch);
213 
214         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
215         // closed.
216         assert old == null || !old.isOpen();
217     }
218 
219     /**
220      * The flags of the given epoll was modified so update the registration
221      */
222     void modify(AbstractEpollChannel ch) throws IOException {
223         assert inEventLoop();
224         Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
225     }
226 
227     /**
228      * Deregister the given epoll from this {@link EventLoop}.
229      */
230     void remove(AbstractEpollChannel ch) throws IOException {
231         assert inEventLoop();
232         int fd = ch.socket.intValue();
233 
234         AbstractEpollChannel old = channels.remove(fd);
235         if (old != null && old != ch) {
236             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
237             channels.put(fd, old);
238 
239             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
240             assert !ch.isOpen();
241         } else if (ch.isOpen()) {
242             // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
243             // removed once the file-descriptor is closed.
244             Native.epollCtlDel(epollFd.intValue(), fd);
245         }
246     }
247 
248     @Override
249     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
250         return newTaskQueue0(maxPendingTasks);
251     }
252 
253     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
254         // This event loop never calls takeTask()
255         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
256                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
257     }
258 
259     /**
260      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
261      */
262     public int getIoRatio() {
263         return ioRatio;
264     }
265 
266     /**
267      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
268      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
269      */
270     public void setIoRatio(int ioRatio) {
271         if (ioRatio <= 0 || ioRatio > 100) {
272             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
273         }
274         this.ioRatio = ioRatio;
275     }
276 
277     @Override
278     public int registeredChannels() {
279         return channels.size();
280     }
281 
282     private long epollWait(long deadlineNanos) throws IOException {
283         if (deadlineNanos == NONE) {
284             return Native.epollWait(epollFd, events, timerFd,
285                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
286         }
287         long totalDelay = deadlineToDelayNanos(deadlineNanos);
288         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
289         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
290         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
291     }
292 
293     private int epollWaitNoTimerChange() throws IOException {
294         return Native.epollWait(epollFd, events, false);
295     }
296 
297     private int epollWaitNow() throws IOException {
298         return Native.epollWait(epollFd, events, true);
299     }
300 
301     private int epollBusyWait() throws IOException {
302         return Native.epollBusyWait(epollFd, events);
303     }
304 
305     private int epollWaitTimeboxed() throws IOException {
306         // Wait with 1 second "safeguard" timeout
307         return Native.epollWait(epollFd, events, 1000);
308     }
309 
310     @Override
311     protected void run() {
312         long prevDeadlineNanos = NONE;
313         for (;;) {
314             try {
315                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
316                 switch (strategy) {
317                     case SelectStrategy.CONTINUE:
318                         continue;
319 
320                     case SelectStrategy.BUSY_WAIT:
321                         strategy = epollBusyWait();
322                         break;
323 
324                     case SelectStrategy.SELECT:
325                         if (pendingWakeup) {
326                             // We are going to be immediately woken so no need to reset wakenUp
327                             // or check for timerfd adjustment.
328                             strategy = epollWaitTimeboxed();
329                             if (strategy != 0) {
330                                 break;
331                             }
332                             // We timed out so assume that we missed the write event due to an
333                             // abnormally failed syscall (the write itself or a prior epoll_wait)
334                             logger.warn("Missed eventfd write (not seen after > 1 second)");
335                             pendingWakeup = false;
336                             if (hasTasks()) {
337                                 break;
338                             }
339                             // fall-through
340                         }
341 
342                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
343                         if (curDeadlineNanos == -1L) {
344                             curDeadlineNanos = NONE; // nothing on the calendar
345                         }
346                         nextWakeupNanos.set(curDeadlineNanos);
347                         try {
348                             if (!hasTasks()) {
349                                 if (curDeadlineNanos == prevDeadlineNanos) {
350                                     // No timer activity needed
351                                     strategy = epollWaitNoTimerChange();
352                                 } else {
353                                     // Timerfd needs to be re-armed or disarmed
354                                     long result = epollWait(curDeadlineNanos);
355                                     // The result contains the actual return value and if a timer was used or not.
356                                     // We need to "unpack" using the helper methods exposed in Native.
357                                     strategy = Native.epollReady(result);
358                                     prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
359                                 }
360                             }
361                         } finally {
362                             // Try get() first to avoid much more expensive CAS in the case we
363                             // were woken via the wakeup() method (submitted task)
364                             if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
365                                 pendingWakeup = true;
366                             }
367                         }
368                         // fallthrough
369                     default:
370                 }
371 
372                 final int ioRatio = this.ioRatio;
373                 if (ioRatio == 100) {
374                     try {
375                         if (strategy > 0 && processReady(events, strategy)) {
376                             prevDeadlineNanos = NONE;
377                         }
378                     } finally {
379                         // Ensure we always run tasks.
380                         runAllTasks();
381                     }
382                 } else if (strategy > 0) {
383                     final long ioStartTime = System.nanoTime();
384                     try {
385                         if (processReady(events, strategy)) {
386                             prevDeadlineNanos = NONE;
387                         }
388                     } finally {
389                         // Ensure we always run tasks.
390                         final long ioTime = System.nanoTime() - ioStartTime;
391                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
392                     }
393                 } else {
394                     runAllTasks(0); // This will run the minimum number of tasks
395                 }
396                 if (allowGrowing && strategy == events.length()) {
397                     //increase the size of the array as we needed the whole space for the events
398                     events.increase();
399                 }
400             } catch (Error e) {
401                 throw e;
402             } catch (Throwable t) {
403                 handleLoopException(t);
404             } finally {
405                 // Always handle shutdown even if the loop processing threw an exception.
406                 try {
407                     if (isShuttingDown()) {
408                         closeAll();
409                         if (confirmShutdown()) {
410                             break;
411                         }
412                     }
413                 } catch (Error e) {
414                     throw e;
415                 } catch (Throwable t) {
416                     handleLoopException(t);
417                 }
418             }
419         }
420     }
421 
422     /**
423      * Visible only for testing!
424      */
425     void handleLoopException(Throwable t) {
426         logger.warn("Unexpected exception in the selector loop.", t);
427 
428         // Prevent possible consecutive immediate failures that lead to
429         // excessive CPU consumption.
430         try {
431             Thread.sleep(1000);
432         } catch (InterruptedException e) {
433             // Ignore.
434         }
435     }
436 
437     private void closeAll() {
438         // Using the intermediate collection to prevent ConcurrentModificationException.
439         // In the `close()` method, the channel is deleted from `channels` map.
440         AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
441 
442         for (AbstractEpollChannel ch: localChannels) {
443             ch.unsafe().close(ch.unsafe().voidPromise());
444         }
445     }
446 
447     // Returns true if a timerFd event was encountered
448     private boolean processReady(EpollEventArray events, int ready) {
449         boolean timerFired = false;
450         for (int i = 0; i < ready; i ++) {
451             final int fd = events.fd(i);
452             if (fd == eventFd.intValue()) {
453                 pendingWakeup = false;
454             } else if (fd == timerFd.intValue()) {
455                 timerFired = true;
456             } else {
457                 final long ev = events.events(i);
458 
459                 AbstractEpollChannel ch = channels.get(fd);
460                 if (ch != null) {
461                     // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
462                     // sure about it!
463                     // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
464                     // past.
465                     AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
466 
467                     // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
468                     // to read from the file descriptor.
469                     // See https://github.com/netty/netty/issues/3785
470                     //
471                     // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
472                     // In either case epollOutReady() will do the correct thing (finish connecting, or fail
473                     // the connection).
474                     // See https://github.com/netty/netty/issues/3848
475                     if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
476                         // Force flush of data as the epoll is writable again
477                         unsafe.epollOutReady();
478                     }
479 
480                     // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
481                     // See https://github.com/netty/netty/issues/4317.
482                     //
483                     // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
484                     // try to read from the underlying file descriptor and so notify the user about the error.
485                     if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
486                         // The Channel is still open and there is something to read. Do it now.
487                         unsafe.epollInReady();
488                     }
489 
490                     // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
491                     // we may close the channel directly or try to read more data depending on the state of the
492                     // Channel and als depending on the AbstractEpollChannel subtype.
493                     if ((ev & Native.EPOLLRDHUP) != 0) {
494                         unsafe.epollRdHupReady();
495                     }
496                 } else {
497                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
498                     try {
499                         Native.epollCtlDel(epollFd.intValue(), fd);
500                     } catch (IOException ignore) {
501                         // This can happen but is nothing we need to worry about as we only try to delete
502                         // the fd from the epoll set as we not found it in our mappings. So this call to
503                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
504                         // deleted before or the file descriptor was closed before.
505                     }
506                 }
507             }
508         }
509         return timerFired;
510     }
511 
512     @Override
513     protected void cleanup() {
514         try {
515             // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
516             while (pendingWakeup) {
517                 try {
518                     int count = epollWaitTimeboxed();
519                     if (count == 0) {
520                         // We timed-out so assume that the write we're expecting isn't coming
521                         break;
522                     }
523                     for (int i = 0; i < count; i++) {
524                         if (events.fd(i) == eventFd.intValue()) {
525                             pendingWakeup = false;
526                             break;
527                         }
528                     }
529                 } catch (IOException ignore) {
530                     // ignore
531                 }
532             }
533             try {
534                 eventFd.close();
535             } catch (IOException e) {
536                 logger.warn("Failed to close the event fd.", e);
537             }
538             try {
539                 timerFd.close();
540             } catch (IOException e) {
541                 logger.warn("Failed to close the timer fd.", e);
542             }
543 
544             try {
545                 epollFd.close();
546             } catch (IOException e) {
547                 logger.warn("Failed to close the epoll fd.", e);
548             }
549         } finally {
550             // release native memory
551             if (iovArray != null) {
552                 iovArray.release();
553                 iovArray = null;
554             }
555             if (datagramPacketArray != null) {
556                 datagramPacketArray.release();
557                 datagramPacketArray = null;
558             }
559             events.free();
560         }
561     }
562 }