View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.IoEventLoop;
22  import io.netty.channel.IoExecutionContext;
23  import io.netty.channel.IoHandle;
24  import io.netty.channel.IoHandler;
25  import io.netty.channel.IoHandlerFactory;
26  import io.netty.channel.IoOps;
27  import io.netty.channel.SelectStrategy;
28  import io.netty.channel.SelectStrategyFactory;
29  import io.netty.channel.unix.FileDescriptor;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.util.IntSupplier;
32  import io.netty.util.collection.IntObjectHashMap;
33  import io.netty.util.collection.IntObjectMap;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.SystemPropertyUtil;
37  import io.netty.util.internal.UnstableApi;
38  import io.netty.util.internal.logging.InternalLogger;
39  import io.netty.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.IOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  
48  import static java.lang.Math.min;
49  import static java.lang.System.nanoTime;
50  
51  /**
52   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
53   */
54  public class EpollIoHandler implements IoHandler {
55      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);
56      private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
57              SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
58  
59      static {
60          // Ensure JNI is initialized by the time this class is loaded.
61          // We use unix-common methods in this class which are backed by JNI methods.
62          Epoll.ensureAvailability();
63      }
64  
65      // Pick a number that no task could have previously used.
66      private long prevDeadlineNanos = nanoTime() - 1;
67      private FileDescriptor epollFd;
68      private FileDescriptor eventFd;
69      private FileDescriptor timerFd;
70      private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
71      private final boolean allowGrowing;
72      private final EpollEventArray events;
73  
74      // These are initialized on first use
75      private IovArray iovArray;
76      private NativeDatagramPacketArray datagramPacketArray;
77  
78      private final SelectStrategy selectStrategy;
79      private final IntSupplier selectNowSupplier = new IntSupplier() {
80          @Override
81          public int get() throws Exception {
82              return epollWaitNow();
83          }
84      };
85  
86      private static final long AWAKE = -1L;
87      private static final long NONE = Long.MAX_VALUE;
88  
89      // nextWakeupNanos is:
90      //    AWAKE            when EL is awake
91      //    NONE             when EL is waiting with no wakeup scheduled
92      //    other value T    when EL is waiting with wakeup scheduled at time T
93      private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
94      private boolean pendingWakeup;
95  
96      private int numChannels;
97  
98      // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
99      private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
100 
101     /**
102      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
103      */
104     public static IoHandlerFactory newFactory() {
105         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
106     }
107 
108     /**
109      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
110      */
111     public static IoHandlerFactory newFactory(final int maxEvents,
112                                               final SelectStrategyFactory selectStrategyFactory) {
113         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
114         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
115         return new IoHandlerFactory() {
116             @Override
117             public IoHandler newHandler() {
118                 return new EpollIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
119             }
120         };
121     }
122 
123     // Package-private for testing
124     EpollIoHandler(int maxEvents, SelectStrategy strategy) {
125         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
126         if (maxEvents == 0) {
127             allowGrowing = true;
128             events = new EpollEventArray(4096);
129         } else {
130             allowGrowing = false;
131             events = new EpollEventArray(maxEvents);
132         }
133         openFileDescriptors();
134     }
135 
136     private static EpollIoHandle cast(IoHandle handle) {
137         if (handle instanceof EpollIoHandle) {
138             return (EpollIoHandle) handle;
139         }
140         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
141     }
142 
143     private static EpollIoOps cast(IoOps ops) {
144         if (ops instanceof EpollIoOps) {
145             return (EpollIoOps) ops;
146         }
147         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
148     }
149 
150     /**
151      * This method is intended for use by a process checkpoint/restore
152      * integration, such as OpenJDK CRaC.
153      */
154     @UnstableApi
155     public void openFileDescriptors() {
156         boolean success = false;
157         FileDescriptor epollFd = null;
158         FileDescriptor eventFd = null;
159         FileDescriptor timerFd = null;
160         try {
161             this.epollFd = epollFd = Native.newEpollCreate();
162             this.eventFd = eventFd = Native.newEventFd();
163             try {
164                 // It is important to use EPOLLET here as we only want to get the notification once per
165                 // wakeup and don't call eventfd_read(...).
166                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
167             } catch (IOException e) {
168                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
169             }
170             this.timerFd = timerFd = Native.newTimerFd();
171             try {
172                 // It is important to use EPOLLET here as we only want to get the notification once per
173                 // wakeup and don't call read(...).
174                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
175             } catch (IOException e) {
176                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
177             }
178             success = true;
179         } finally {
180             if (!success) {
181                 closeFileDescriptor(epollFd);
182                 closeFileDescriptor(eventFd);
183                 closeFileDescriptor(timerFd);
184             }
185         }
186     }
187 
188     private static void closeFileDescriptor(FileDescriptor fd) {
189         if (fd != null) {
190             try {
191                 fd.close();
192             } catch (Exception e) {
193                 // ignore
194             }
195         }
196     }
197 
198     /**
199      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
200      */
201     IovArray cleanIovArray() {
202         if (iovArray == null) {
203             iovArray = new IovArray();
204         } else {
205             iovArray.clear();
206         }
207         return iovArray;
208     }
209 
210     /**
211      * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
212      */
213     NativeDatagramPacketArray cleanDatagramPacketArray() {
214         if (datagramPacketArray == null) {
215             datagramPacketArray = new NativeDatagramPacketArray();
216         } else {
217             datagramPacketArray.clear();
218         }
219         return datagramPacketArray;
220     }
221 
222     @Override
223     public void wakeup(IoEventLoop eventLoop) {
224         if (!eventLoop.inEventLoop() && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
225             // write to the evfd which will then wake-up epoll_wait(...)
226             Native.eventFdWrite(eventFd.intValue(), 1L);
227         }
228     }
229 
230     @Override
231     public void prepareToDestroy() {
232         // Using the intermediate collection to prevent ConcurrentModificationException.
233         // In the `close()` method, the channel is deleted from `channels` map.
234         DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
235 
236         for (DefaultEpollIoRegistration reg: copy) {
237             reg.close();
238         }
239     }
240 
241     @Override
242     public void destroy() {
243         try {
244             closeFileDescriptors();
245         } finally {
246             // release native memory
247             if (iovArray != null) {
248                 iovArray.release();
249                 iovArray = null;
250             }
251             if (datagramPacketArray != null) {
252                 datagramPacketArray.release();
253                 datagramPacketArray = null;
254             }
255             events.free();
256         }
257     }
258 
259     private final class DefaultEpollIoRegistration extends AtomicBoolean implements EpollIoRegistration {
260         private final IoEventLoop eventLoop;
261         final EpollIoHandle handle;
262 
263         DefaultEpollIoRegistration(IoEventLoop eventLoop, EpollIoHandle handle) {
264             this.eventLoop = eventLoop;
265             this.handle = handle;
266         }
267 
268         @Override
269         public long submit(IoOps ops) throws Exception {
270             EpollIoOps epollIoOps = cast(ops);
271             try {
272                 if (!isValid()) {
273                     return -1;
274                 }
275                 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
276                 return epollIoOps.value;
277             } catch (IOException e) {
278                 throw e;
279             } catch (Exception e) {
280                 throw new IOException(e);
281             }
282         }
283 
284         @Override
285         public EpollIoHandler ioHandler() {
286             return EpollIoHandler.this;
287         }
288 
289         @Override
290         public boolean isValid() {
291             return !get();
292         }
293 
294         @Override
295         public void cancel() throws IOException {
296             if (getAndSet(true)) {
297                 return;
298             }
299             if (handle.fd().isOpen()) {
300                 // Remove the fd registration from epoll. This is only needed if it's still open as otherwise it will
301                 // be automatically removed once the file-descriptor is closed.
302                 Native.epollCtlDel(epollFd.intValue(), handle.fd().intValue());
303             }
304             if (eventLoop.inEventLoop()) {
305                 cancel0();
306             } else {
307                 eventLoop.execute(this::cancel0);
308             }
309         }
310 
311         private void cancel0() {
312             int fd = handle.fd().intValue();
313             DefaultEpollIoRegistration old = registrations.remove(fd);
314             if (old != null) {
315                 if (old != this) {
316                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
317                     registrations.put(fd, old);
318                 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
319                     numChannels--;
320                 }
321             }
322         }
323 
324         void close() {
325             try {
326                 cancel();
327             } catch (Exception e) {
328                 logger.debug("Exception during canceling " + this, e);
329             }
330             try {
331                 handle.close();
332             } catch (Exception e) {
333                 logger.debug("Exception during closing " + handle, e);
334             }
335         }
336 
337         void handle(long ev) {
338             handle.handle(this, EpollIoOps.eventOf((int) ev));
339         }
340     }
341 
342     @Override
343     public EpollIoRegistration register(IoEventLoop eventLoop, IoHandle handle)
344             throws Exception {
345         final EpollIoHandle epollHandle = cast(handle);
346         DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(eventLoop, epollHandle);
347         int fd = epollHandle.fd().intValue();
348         Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
349         DefaultEpollIoRegistration old = registrations.put(fd, registration);
350 
351         // We either expect to have no registration in the map with the same FD or that the FD of the old registration
352         // is already closed.
353         assert old == null || !old.isValid();
354 
355         if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
356             numChannels++;
357         }
358         return registration;
359     }
360 
361     @Override
362     public boolean isCompatible(Class<? extends IoHandle> handleType) {
363         return EpollIoHandle.class.isAssignableFrom(handleType);
364     }
365 
366     int numRegisteredChannels() {
367         return numChannels;
368     }
369 
370     List<Channel> registeredChannelsList() {
371         IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
372         if (ch.isEmpty()) {
373             return Collections.emptyList();
374         }
375 
376         List<Channel> channels = new ArrayList<>(ch.size());
377         for (DefaultEpollIoRegistration registration : ch.values()) {
378             if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
379                 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
380             }
381         }
382         return Collections.unmodifiableList(channels);
383     }
384 
385     private long epollWait(IoExecutionContext context, long deadlineNanos) throws IOException {
386         if (deadlineNanos == NONE) {
387             return Native.epollWait(epollFd, events, timerFd,
388                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
389         }
390         long totalDelay = context.delayNanos(System.nanoTime());
391         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
392         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
393         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
394     }
395 
396     private int epollWaitNoTimerChange() throws IOException {
397         return Native.epollWait(epollFd, events, false);
398     }
399 
400     private int epollWaitNow() throws IOException {
401         return Native.epollWait(epollFd, events, true);
402     }
403 
404     private int epollBusyWait() throws IOException {
405         return Native.epollBusyWait(epollFd, events);
406     }
407 
408     private int epollWaitTimeboxed() throws IOException {
409         // Wait with 1 second "safeguard" timeout
410         return Native.epollWait(epollFd, events, 1000);
411     }
412 
413     @Override
414     public int run(IoExecutionContext context) {
415         int handled = 0;
416         try {
417             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
418             switch (strategy) {
419                 case SelectStrategy.CONTINUE:
420                     return 0;
421 
422                 case SelectStrategy.BUSY_WAIT:
423                     strategy = epollBusyWait();
424                     break;
425 
426                 case SelectStrategy.SELECT:
427                     if (pendingWakeup) {
428                         // We are going to be immediately woken so no need to reset wakenUp
429                         // or check for timerfd adjustment.
430                         strategy = epollWaitTimeboxed();
431                         if (strategy != 0) {
432                             break;
433                         }
434                         // We timed out so assume that we missed the write event due to an
435                         // abnormally failed syscall (the write itself or a prior epoll_wait)
436                         logger.warn("Missed eventfd write (not seen after > 1 second)");
437                         pendingWakeup = false;
438                         if (!context.canBlock()) {
439                             break;
440                         }
441                         // fall-through
442                     }
443 
444                     long curDeadlineNanos = context.deadlineNanos();
445                     if (curDeadlineNanos == -1L) {
446                         curDeadlineNanos = NONE; // nothing on the calendar
447                     }
448                     nextWakeupNanos.set(curDeadlineNanos);
449                     try {
450                         if (context.canBlock()) {
451                             if (curDeadlineNanos == prevDeadlineNanos) {
452                                 // No timer activity needed
453                                 strategy = epollWaitNoTimerChange();
454                             } else {
455                                 // Timerfd needs to be re-armed or disarmed
456                                 long result = epollWait(context, curDeadlineNanos);
457                                 // The result contains the actual return value and if a timer was used or not.
458                                 // We need to "unpack" using the helper methods exposed in Native.
459                                 strategy = Native.epollReady(result);
460                                 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
461                             }
462                         }
463                     } finally {
464                         // Try get() first to avoid much more expensive CAS in the case we
465                         // were woken via the wakeup() method (submitted task)
466                         if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
467                             pendingWakeup = true;
468                         }
469                     }
470                     // fallthrough
471                 default:
472             }
473             if (strategy > 0) {
474                 handled = strategy;
475                 if (processReady(events, strategy)) {
476                     prevDeadlineNanos = NONE;
477                 }
478             }
479             if (allowGrowing && strategy == events.length()) {
480                 //increase the size of the array as we needed the whole space for the events
481                 events.increase();
482             }
483         } catch (Error e) {
484             throw e;
485         } catch (Throwable t) {
486             handleLoopException(t);
487         }
488         return handled;
489     }
490 
491     /**
492      * Visible only for testing!
493      */
494     void handleLoopException(Throwable t) {
495         logger.warn("Unexpected exception in the selector loop.", t);
496 
497         // Prevent possible consecutive immediate failures that lead to
498         // excessive CPU consumption.
499         try {
500             Thread.sleep(1000);
501         } catch (InterruptedException e) {
502             // Ignore.
503         }
504     }
505 
506     // Returns true if a timerFd event was encountered
507     private boolean processReady(EpollEventArray events, int ready) {
508         boolean timerFired = false;
509         for (int i = 0; i < ready; i ++) {
510             final int fd = events.fd(i);
511             if (fd == eventFd.intValue()) {
512                 pendingWakeup = false;
513             } else if (fd == timerFd.intValue()) {
514                 timerFired = true;
515             } else {
516                 final long ev = events.events(i);
517 
518                 DefaultEpollIoRegistration registration = registrations.get(fd);
519                 if (registration != null) {
520                     registration.handle(ev);
521                 } else {
522                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
523                     try {
524                         Native.epollCtlDel(epollFd.intValue(), fd);
525                     } catch (IOException ignore) {
526                         // This can happen but is nothing we need to worry about as we only try to delete
527                         // the fd from the epoll set as we not found it in our mappings. So this call to
528                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
529                         // deleted before or the file descriptor was closed before.
530                     }
531                 }
532             }
533         }
534         return timerFired;
535     }
536 
537     /**
538      * This method is intended for use by process checkpoint/restore
539      * integration, such as OpenJDK CRaC.
540      * It's up to the caller to ensure that there is no concurrent use
541      * of the FDs while these are closed, e.g. by blocking the executor.
542      */
543     @UnstableApi
544     public void closeFileDescriptors() {
545         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
546         while (pendingWakeup) {
547             try {
548                 int count = epollWaitTimeboxed();
549                 if (count == 0) {
550                     // We timed-out so assume that the write we're expecting isn't coming
551                     break;
552                 }
553                 for (int i = 0; i < count; i++) {
554                     if (events.fd(i) == eventFd.intValue()) {
555                         pendingWakeup = false;
556                         break;
557                     }
558                 }
559             } catch (IOException ignore) {
560                 // ignore
561             }
562         }
563         try {
564             eventFd.close();
565         } catch (IOException e) {
566             logger.warn("Failed to close the event fd.", e);
567         }
568         try {
569             timerFd.close();
570         } catch (IOException e) {
571             logger.warn("Failed to close the timer fd.", e);
572         }
573 
574         try {
575             epollFd.close();
576         } catch (IOException e) {
577             logger.warn("Failed to close the epoll fd.", e);
578         }
579     }
580 }