View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.epoll;
17  
18  import io.netty5.channel.DefaultSelectStrategyFactory;
19  import io.netty5.channel.IoExecutionContext;
20  import io.netty5.channel.IoHandle;
21  import io.netty5.channel.IoHandler;
22  import io.netty5.channel.IoHandlerFactory;
23  import io.netty5.channel.SelectStrategy;
24  import io.netty5.channel.SelectStrategyFactory;
25  import io.netty5.channel.SingleThreadEventLoop;
26  import io.netty5.channel.unix.FileDescriptor;
27  import io.netty5.channel.unix.IovArray;
28  import io.netty5.util.collection.IntObjectHashMap;
29  import io.netty5.util.collection.IntObjectMap;
30  import io.netty5.util.internal.StringUtil;
31  import io.netty5.util.internal.SystemPropertyUtil;
32  import io.netty5.util.internal.logging.InternalLogger;
33  import io.netty5.util.internal.logging.InternalLoggerFactory;
34  
35  import java.io.IOException;
36  import java.io.UncheckedIOException;
37  import java.util.concurrent.atomic.AtomicLong;
38  import java.util.function.IntSupplier;
39  
40  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
41  import static java.lang.Math.min;
42  import static java.util.Objects.requireNonNull;
43  
44  /**
45   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
46   */
47  public class EpollHandler implements IoHandler {
48      private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
49              SystemPropertyUtil.getLong("io.netty5.channel.epoll.epollWaitThreshold", 10);
50  
51      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollHandler.class);
52  
53      static {
54          // Ensure JNI is initialized by the time this class is loaded by this time!
55          // We use unix-common methods in this class which are backed by JNI methods.
56          Epoll.ensureAvailability();
57      }
58  
59      // Pick a number that no task could have previously used.
60      private long prevDeadlineNanos = SingleThreadEventLoop.nanoTime() - 1;
61      private final FileDescriptor epollFd;
62      private final FileDescriptor eventFd;
63      private final FileDescriptor timerFd;
64      private final IntObjectMap<AbstractEpollChannel<?>> channels = new IntObjectHashMap<>(4096);
65      private final boolean allowGrowing;
66      private final EpollEventArray events;
67  
68      // These are initialized on first use
69      private IovArray iovArray;
70      private NativeDatagramPacketArray datagramPacketArray;
71  
72      private final SelectStrategy selectStrategy;
73      private final IntSupplier selectNowSupplier = () -> {
74          try {
75              return epollWaitNow();
76          } catch (IOException e) {
77              throw new UncheckedIOException(e);
78          }
79      };
80  
81      private static final long AWAKE = -1L;
82      private static final long NONE = Long.MAX_VALUE;
83  
84      // nextWakeupNanos is:
85      //    AWAKE            when EL is awake
86      //    NONE             when EL is waiting with no wakeup scheduled
87      //    other value T    when EL is waiting with wakeup scheduled at time T
88      private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
89  
90      private boolean pendingWakeup;
91  
92      // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
93      private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
94  
95      private static AbstractEpollChannel<?> cast(IoHandle handle) {
96          if (handle instanceof AbstractEpollChannel) {
97              return (AbstractEpollChannel<?>) handle;
98          }
99          throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(handle) + " not supported");
100     }
101 
102     private EpollHandler() {
103         this(0, DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy());
104     }
105 
106     // Package-private for tests.
107     EpollHandler(int maxEvents, SelectStrategy strategy) {
108         selectStrategy = strategy;
109         if (maxEvents == 0) {
110             allowGrowing = true;
111             events = new EpollEventArray(4096);
112         } else {
113             allowGrowing = false;
114             events = new EpollEventArray(maxEvents);
115         }
116         boolean success = false;
117         FileDescriptor epollFd = null;
118         FileDescriptor eventFd = null;
119         FileDescriptor timerFd = null;
120         try {
121             this.epollFd = epollFd = Native.newEpollCreate();
122             this.eventFd = eventFd = Native.newEventFd();
123             try {
124                 // It is important to use EPOLLET here as we only want to get the notification once per
125                 // wakeup and don't call eventfd_read(...).
126                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
127             } catch (IOException e) {
128                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
129             }
130             this.timerFd = timerFd = Native.newTimerFd();
131             try {
132                 // It is important to use EPOLLET here as we only want to get the notification once per
133                 // wakeup and don't call read(...).
134                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
135             } catch (IOException e) {
136                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
137             }
138             success = true;
139         } finally {
140             if (!success) {
141                 if (epollFd != null) {
142                     try {
143                         epollFd.close();
144                     } catch (Exception e) {
145                         // ignore
146                     }
147                 }
148                 if (eventFd != null) {
149                     try {
150                         eventFd.close();
151                     } catch (Exception e) {
152                         // ignore
153                     }
154                 }
155                 if (timerFd != null) {
156                     try {
157                         timerFd.close();
158                     } catch (Exception e) {
159                         // ignore
160                     }
161                 }
162             }
163         }
164     }
165 
166     /**
167      * Returns a new {@link IoHandlerFactory} that creates {@link EpollHandler} instances.
168      */
169     public static IoHandlerFactory newFactory() {
170         return EpollHandler::new;
171     }
172 
173     /**
174      * Returns a new {@link IoHandlerFactory} that creates {@link EpollHandler} instances.
175      */
176     public static IoHandlerFactory newFactory(final int maxEvents,
177                                               final SelectStrategyFactory selectStrategyFactory) {
178         checkPositiveOrZero(maxEvents, "maxEvents");
179         requireNonNull(selectStrategyFactory, "selectStrategyFactory");
180         return () -> new EpollHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
181     }
182 
183     private IovArray cleanIovArray() {
184         if (iovArray == null) {
185             iovArray = new IovArray();
186         } else {
187             iovArray.clear();
188         }
189         return iovArray;
190     }
191 
192     private NativeDatagramPacketArray cleanDatagramPacketArray() {
193         if (datagramPacketArray == null) {
194             datagramPacketArray = new NativeDatagramPacketArray();
195         } else {
196             datagramPacketArray.clear();
197         }
198         return datagramPacketArray;
199     }
200 
201     @Override
202     public final void register(IoHandle handle) throws Exception {
203         final AbstractEpollChannel<?> epollChannel = cast(handle);
204         epollChannel.register0(new EpollRegistration() {
205             @Override
206             public void update() throws IOException {
207                 EpollHandler.this.modify(epollChannel);
208             }
209 
210             @Override
211             public void remove() throws IOException {
212                 EpollHandler.this.remove(epollChannel);
213             }
214 
215             @Override
216             public IovArray cleanIovArray() {
217                 return EpollHandler.this.cleanIovArray();
218             }
219 
220             @Override
221             public NativeDatagramPacketArray cleanDatagramPacketArray() {
222                 return EpollHandler.this.cleanDatagramPacketArray();
223             }
224         });
225         add(epollChannel);
226     }
227 
228     @Override
229     public final void deregister(IoHandle handle) throws Exception {
230         cast(handle).deregister0();
231     }
232 
233     @Override
234     public final void wakeup(boolean inEventLoop) {
235         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
236             // write to the evfd which will then wake-up epoll_wait(...)
237             Native.eventFdWrite(eventFd.intValue(), 1L);
238         }
239     }
240 
241     /**
242      * Register the given channel with this {@link EpollHandler}.
243      */
244     private void add(AbstractEpollChannel<?> ch) throws IOException {
245         int fd = ch.socket.intValue();
246         Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags());
247         AbstractEpollChannel<?> old = channels.put(fd, ch);
248 
249         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
250         // closed.
251         assert old == null || !old.isOpen();
252     }
253 
254     /**
255      * The flags of the given epoll was modified so update the registration
256      */
257     private void modify(AbstractEpollChannel<?> ch) throws IOException {
258         Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags());
259     }
260 
261     /**
262      * Deregister the given channel from this {@link EpollHandler}.
263      */
264     private void remove(AbstractEpollChannel<?> ch) throws IOException {
265         int fd = ch.socket.intValue();
266 
267         AbstractEpollChannel<?> old = channels.remove(fd);
268         if (old != null && old != ch) {
269             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
270             channels.put(fd, old);
271 
272             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
273             assert !ch.isOpen();
274         } else if (ch.isOpen()) {
275             // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
276             // removed once the file-descriptor is closed.
277             Native.epollCtlDel(epollFd.intValue(), fd);
278         }
279     }
280 
281     private long epollWait(IoExecutionContext context, long deadlineNanos) throws IOException {
282         if (deadlineNanos == NONE) {
283             return Native.epollWait(epollFd, events, timerFd,
284                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
285         }
286         long totalDelay = context.delayNanos(System.nanoTime());
287         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
288         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
289         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
290     }
291 
292     private int epollWaitNoTimerChange() throws IOException {
293         return Native.epollWait(epollFd, events, false);
294     }
295 
296     private int epollWaitNow() throws IOException {
297         return Native.epollWait(epollFd, events, true);
298     }
299 
300     private int epollBusyWait() throws IOException {
301         return Native.epollBusyWait(epollFd, events);
302     }
303 
304     private int epollWaitTimeboxed() throws IOException {
305         // Wait with 1 second "safeguard" timeout
306         return Native.epollWait(epollFd, events, 1000);
307     }
308 
309     @Override
310     public final int run(IoExecutionContext context) {
311         int handled = 0;
312         try {
313             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
314             switch (strategy) {
315                 case SelectStrategy.CONTINUE:
316                     return 0;
317 
318                 case SelectStrategy.BUSY_WAIT:
319                     strategy = epollBusyWait();
320                     break;
321 
322                 case SelectStrategy.SELECT:
323                     if (pendingWakeup) {
324                         // We are going to be immediately woken so no need to reset wakenUp
325                         // or check for timerfd adjustment.
326                         strategy = epollWaitTimeboxed();
327                         if (strategy != 0) {
328                             break;
329                         }
330                         // We timed out so assume that we missed the write event due to an
331                         // abnormally failed syscall (the write itself or a prior epoll_wait)
332                         logger.warn("Missed eventfd write (not seen after > 1 second)");
333                         pendingWakeup = false;
334                         if (!context.canBlock()) {
335                             break;
336                         }
337                         // fall-through
338                     }
339 
340                     long curDeadlineNanos = context.deadlineNanos();
341                     if (curDeadlineNanos == -1L) {
342                         curDeadlineNanos = NONE; // nothing on the calendar
343                     }
344                     nextWakeupNanos.set(curDeadlineNanos);
345                     try {
346                         if (context.canBlock()) {
347                             if (curDeadlineNanos == prevDeadlineNanos) {
348                                 // No timer activity needed
349                                 strategy = epollWaitNoTimerChange();
350                             } else {
351                                 // Timerfd needs to be re-armed or disarmed
352                                 long result = epollWait(context, curDeadlineNanos);
353                                 // The result contains the actual return value and if a timer was used or not.
354                                 // We need to "unpack" using the helper methods exposed in Native.
355                                 strategy = Native.epollReady(result);
356                                 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
357                             }
358                         }
359                     } finally {
360                         // Try get() first to avoid much more expensive CAS in the case we
361                         // were woken via the wakeup() method (submitted task)
362                         if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
363                             pendingWakeup = true;
364                         }
365                     }
366                     // fall-through
367                 default:
368             }
369             if (strategy > 0) {
370                 handled = strategy;
371                 if (processReady(events, strategy)) {
372                     prevDeadlineNanos = NONE;
373                 }
374             }
375             if (allowGrowing && strategy == events.length()) {
376                 //increase the size of the array as we needed the whole space for the events
377                 events.increase();
378             }
379         } catch (Error error) {
380             throw error;
381         } catch (Throwable t) {
382             handleLoopException(t);
383         }
384         return handled;
385     }
386 
387     /**
388      * Visible only for testing!
389      */
390     void handleLoopException(Throwable t) {
391         logger.warn("Unexpected exception in the selector loop.", t);
392 
393         // Prevent possible consecutive immediate failures that lead to
394         // excessive CPU consumption.
395         try {
396             Thread.sleep(1000);
397         } catch (InterruptedException e) {
398             // Ignore.
399         }
400     }
401 
402     @Override
403     public void prepareToDestroy() {
404         // Using the intermediate collection to prevent ConcurrentModificationException.
405         // In the `close()` method, the channel is deleted from `channels` map.
406         AbstractEpollChannel<?>[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
407 
408         for (AbstractEpollChannel<?> ch: localChannels) {
409             ch.closeTransportNow();
410         }
411     }
412 
413     // Returns true if a timerFd event was encountered
414     private boolean processReady(EpollEventArray events, int ready) {
415         boolean timerFired = false;
416         for (int i = 0; i < ready; i ++) {
417             final int fd = events.fd(i);
418             if (fd == eventFd.intValue()) {
419                 pendingWakeup = false;
420             } else if (fd == timerFd.intValue()) {
421                 timerFired = true;
422             } else {
423                 final long ev = events.events(i);
424 
425                 AbstractEpollChannel<?> ch = channels.get(fd);
426                 if (ch != null) {
427                     // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
428                     // sure about it!
429                     // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
430                     // past.
431 
432                     // First check for EPOLLOUT as we may need to fail the connect Promise before try
433                     // to read from the file descriptor.
434                     // See https://github.com/netty/netty/issues/3785
435                     //
436                     // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
437                     // In either case epollOutReady() will do the correct thing (finish connecting, or fail
438                     // the connection).
439                     // See https://github.com/netty/netty/issues/3848
440                     if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
441                         // Force flush of data as the epoll is writable again
442                         ch.epollOutReady();
443                     }
444 
445                     // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
446                     // See https://github.com/netty/netty/issues/4317.
447                     //
448                     // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
449                     // try to read from the underlying file descriptor and so notify the user about the error.
450                     if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
451                         // The Channel is still open and there is something to read. Do it now.
452                         ch.epollInReady();
453                     }
454 
455                     // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
456                     // we may close the channel directly or try to read more data depending on the state of the
457                     // Channel and als depending on the AbstractEpollChannel subtype.
458                     if ((ev & Native.EPOLLRDHUP) != 0) {
459                         ch.epollRdHupReady();
460                     }
461                 } else {
462                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
463                     try {
464                         Native.epollCtlDel(epollFd.intValue(), fd);
465                     } catch (IOException ignore) {
466                         // This can happen but is nothing we need to worry about as we only try to delete
467                         // the fd from the epoll set as we not found it in our mappings. So this call to
468                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
469                         // deleted before or the file descriptor was closed before.
470                     }
471                 }
472             }
473         }
474         return timerFired;
475     }
476 
477     @Override
478     public final void destroy() {
479         try {
480             // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
481             while (pendingWakeup) {
482                 try {
483                     int count = epollWaitTimeboxed();
484                     if (count == 0) {
485                         // We timed-out so assume that the write we're expecting isn't coming
486                         break;
487                     }
488                     for (int i = 0; i < count; i++) {
489                         if (events.fd(i) == eventFd.intValue()) {
490                             pendingWakeup = false;
491                             break;
492                         }
493                     }
494                 } catch (IOException ignore) {
495                     // ignore
496                 }
497             }
498             try {
499                 eventFd.close();
500             } catch (IOException e) {
501                 logger.warn("Failed to close the event fd.", e);
502             }
503             try {
504                 timerFd.close();
505             } catch (IOException e) {
506                 logger.warn("Failed to close the timer fd.", e);
507             }
508             try {
509                 epollFd.close();
510             } catch (IOException e) {
511                 logger.warn("Failed to close the epoll fd.", e);
512             }
513         } finally {
514             // release native memory
515             if (iovArray != null) {
516                 iovArray.release();
517                 iovArray = null;
518             }
519             if (datagramPacketArray != null) {
520                 datagramPacketArray.release();
521                 datagramPacketArray = null;
522             }
523             events.free();
524         }
525     }
526 
527     @Override
528     public boolean isCompatible(Class<? extends IoHandle> handleType) {
529         return AbstractEpollChannel.class.isAssignableFrom(handleType);
530     }
531 }