View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandle;
22  import io.netty.channel.IoHandler;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.IntObjectHashMap;
31  import io.netty.util.collection.IntObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.SystemPropertyUtil;
36  import io.netty.util.internal.UnstableApi;
37  import io.netty.util.internal.logging.InternalLogger;
38  import io.netty.util.internal.logging.InternalLoggerFactory;
39  
40  import java.io.IOException;
41  import java.io.UncheckedIOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  
48  import static java.lang.Math.min;
49  import static java.lang.System.nanoTime;
50  
51  /**
52   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
53   */
54  public class EpollIoHandler implements IoHandler {
    /** Logger used by this handler and its registrations. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);

    /**
     * Threshold passed to {@code Native.epollWait(...)} whenever the timerfd is (re-)armed or disarmed.
     * Read once from the {@code io.netty.channel.epoll.epollWaitThreshold} system property; defaults to {@code 10}.
     * NOTE(review): the constant name suggests the unit is milliseconds - confirm against {@code Native}.
     */
    private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
            SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

    static {
        // Ensure JNI is initialized by the time this class is loaded.
        // We use unix-common methods in this class which are backed by JNI methods.
        Epoll.ensureAvailability();
    }
64  
    // Deadline that was last handed to the timerfd (or NONE when no timer is armed). run() compares the
    // current deadline against it to decide whether the timerfd has to be touched at all.
    // Pick a number that no task could have previously used.
    private long prevDeadlineNanos = nanoTime() - 1;
    // The three descriptors owned by this handler; (re-)assigned by openFileDescriptors().
    private FileDescriptor epollFd;
    private FileDescriptor eventFd;
    private FileDescriptor timerFd;
    // Active registrations keyed by the integer value of the registered file descriptor.
    private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
    // true when the handler was created with maxEvents == 0; run() then grows 'events' once it was filled.
    private final boolean allowGrowing;
    // Buffer that receives the ready events of each epoll wait call.
    private final EpollEventArray events;
    // Handed out to handles through IoRegistration.attachment() and freed in destroy().
    private final NativeArrays nativeArrays;

    private final SelectStrategy selectStrategy;
    // Supplier given to the SelectStrategy; delegates to epollWaitNow().
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return epollWaitNow();
        }
    };
    private final ThreadAwareExecutor executor;

    // Sentinel values stored in nextWakeupNanos, see below.
    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;

    // nextWakeupNanos is:
    //    AWAKE            when EL is awake
    //    NONE             when EL is waiting with no wakeup scheduled
    //    other value T    when EL is waiting with wakeup scheduled at time T
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    // Set by run() when wakeup() already flipped nextWakeupNanos to AWAKE (so an eventfd write is expected),
    // cleared again once the eventfd event has been observed.
    private boolean pendingWakeup;

    // Number of current registrations whose handle is an AbstractEpollChannel.AbstractEpollUnsafe.
    private int numChannels;

    // Upper bound applied to the nanosecond part of the delay passed to the timerfd.
    // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
    private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
98  
99      /**
100      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
101      */
102     public static IoHandlerFactory newFactory() {
103         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
104     }
105 
106     /**
107      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
108      */
109     public static IoHandlerFactory newFactory(final int maxEvents,
110                                               final SelectStrategyFactory selectStrategyFactory) {
111         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
112         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
113         return new IoHandlerFactory() {
114             @Override
115             public IoHandler newHandler(ThreadAwareExecutor executor) {
116                 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
117             }
118 
119             @Override
120             public boolean isChangingThreadSupported() {
121                 return true;
122             }
123         };
124     }
125 
126     // Package-private for testing
127     EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
128         this.executor = ObjectUtil.checkNotNull(executor, "executor");
129         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
130         if (maxEvents == 0) {
131             allowGrowing = true;
132             events = new EpollEventArray(4096);
133         } else {
134             allowGrowing = false;
135             events = new EpollEventArray(maxEvents);
136         }
137         nativeArrays = new NativeArrays();
138         openFileDescriptors();
139     }
140 
141     private static EpollIoHandle cast(IoHandle handle) {
142         if (handle instanceof EpollIoHandle) {
143             return (EpollIoHandle) handle;
144         }
145         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
146     }
147 
148     private static EpollIoOps cast(IoOps ops) {
149         if (ops instanceof EpollIoOps) {
150             return (EpollIoOps) ops;
151         }
152         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
153     }
154 
155     /**
156      * This method is intended for use by a process checkpoint/restore
157      * integration, such as OpenJDK CRaC.
158      */
159     @UnstableApi
160     public void openFileDescriptors() {
161         boolean success = false;
162         FileDescriptor epollFd = null;
163         FileDescriptor eventFd = null;
164         FileDescriptor timerFd = null;
165         try {
166             this.epollFd = epollFd = Native.newEpollCreate();
167             this.eventFd = eventFd = Native.newEventFd();
168             try {
169                 // It is important to use EPOLLET here as we only want to get the notification once per
170                 // wakeup and don't call eventfd_read(...).
171                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
172             } catch (IOException e) {
173                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
174             }
175             this.timerFd = timerFd = Native.newTimerFd();
176             try {
177                 // It is important to use EPOLLET here as we only want to get the notification once per
178                 // wakeup and don't call read(...).
179                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
180             } catch (IOException e) {
181                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
182             }
183             success = true;
184         } finally {
185             if (!success) {
186                 closeFileDescriptor(epollFd);
187                 closeFileDescriptor(eventFd);
188                 closeFileDescriptor(timerFd);
189             }
190         }
191     }
192 
193     private static void closeFileDescriptor(FileDescriptor fd) {
194         if (fd != null) {
195             try {
196                 fd.close();
197             } catch (Exception e) {
198                 // ignore
199             }
200         }
201     }
202 
203     @Override
204     public void wakeup() {
205         if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
206             // write to the evfd which will then wake-up epoll_wait(...)
207             Native.eventFdWrite(eventFd.intValue(), 1L);
208         }
209     }
210 
211     @Override
212     public void prepareToDestroy() {
213         // Using the intermediate collection to prevent ConcurrentModificationException.
214         // In the `close()` method, the channel is deleted from `channels` map.
215         DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
216 
217         for (DefaultEpollIoRegistration reg: copy) {
218             reg.close();
219         }
220     }
221 
222     @Override
223     public void destroy() {
224         try {
225             closeFileDescriptors();
226         } finally {
227             nativeArrays.free();
228             events.free();
229         }
230     }
231 
232     private final class DefaultEpollIoRegistration implements IoRegistration {
233         private final ThreadAwareExecutor executor;
234         private final AtomicBoolean canceled = new AtomicBoolean();
235         final EpollIoHandle handle;
236 
237         DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
238             this.executor = executor;
239             this.handle = handle;
240         }
241 
242         @SuppressWarnings("unchecked")
243         @Override
244         public <T> T attachment() {
245             return (T) nativeArrays;
246         }
247 
248         @Override
249         public long submit(IoOps ops) {
250             EpollIoOps epollIoOps = cast(ops);
251             try {
252                 if (!isValid()) {
253                     return -1;
254                 }
255                 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
256                 return epollIoOps.value;
257             } catch (IOException e) {
258                 throw new UncheckedIOException(e);
259             }
260         }
261 
262         @Override
263         public boolean isValid() {
264             return !canceled.get();
265         }
266 
267         @Override
268         public boolean cancel() {
269             if (!canceled.compareAndSet(false, true)) {
270                 return false;
271             }
272             if (executor.isExecutorThread(Thread.currentThread())) {
273                 cancel0();
274             } else {
275                 executor.execute(this::cancel0);
276             }
277             return true;
278         }
279 
280         private void cancel0() {
281             int fd = handle.fd().intValue();
282             DefaultEpollIoRegistration old = registrations.remove(fd);
283             if (old != null) {
284                 if (old != this) {
285                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
286                     registrations.put(fd, old);
287                     return;
288                 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
289                     numChannels--;
290                 }
291                 if (handle.fd().isOpen()) {
292                     try {
293                         // Remove the fd registration from epoll. This is only needed if it's still open as otherwise
294                         // it will be automatically removed once the file-descriptor is closed.
295                         Native.epollCtlDel(epollFd.intValue(), fd);
296                     } catch (IOException e) {
297                         logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
298                     }
299                 }
300             }
301         }
302 
303         void close() {
304             try {
305                 cancel();
306             } catch (Exception e) {
307                 logger.debug("Exception during canceling " + this, e);
308             }
309             try {
310                 handle.close();
311             } catch (Exception e) {
312                 logger.debug("Exception during closing " + handle, e);
313             }
314         }
315 
316         void handle(long ev) {
317             handle.handle(this, EpollIoOps.eventOf((int) ev));
318         }
319     }
320 
321     @Override
322     public IoRegistration register(IoHandle handle)
323             throws Exception {
324         final EpollIoHandle epollHandle = cast(handle);
325         DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
326         int fd = epollHandle.fd().intValue();
327         Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
328         DefaultEpollIoRegistration old = registrations.put(fd, registration);
329 
330         // We either expect to have no registration in the map with the same FD or that the FD of the old registration
331         // is already closed.
332         assert old == null || !old.isValid();
333 
334         if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
335             numChannels++;
336         }
337         return registration;
338     }
339 
340     @Override
341     public boolean isCompatible(Class<? extends IoHandle> handleType) {
342         return EpollIoHandle.class.isAssignableFrom(handleType);
343     }
344 
345     int numRegisteredChannels() {
346         return numChannels;
347     }
348 
349     List<Channel> registeredChannelsList() {
350         IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
351         if (ch.isEmpty()) {
352             return Collections.emptyList();
353         }
354 
355         List<Channel> channels = new ArrayList<>(ch.size());
356         for (DefaultEpollIoRegistration registration : ch.values()) {
357             if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
358                 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
359             }
360         }
361         return Collections.unmodifiableList(channels);
362     }
363 
364     private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
365         if (deadlineNanos == NONE) {
366             return Native.epollWait(epollFd, events, timerFd,
367                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
368         }
369         long totalDelay = context.delayNanos(System.nanoTime());
370         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
371         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
372         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
373     }
374 
375     private int epollWaitNoTimerChange() throws IOException {
376         return Native.epollWait(epollFd, events, false);
377     }
378 
379     private int epollWaitNow() throws IOException {
380         return Native.epollWait(epollFd, events, true);
381     }
382 
383     private int epollBusyWait() throws IOException {
384         return Native.epollBusyWait(epollFd, events);
385     }
386 
387     private int epollWaitTimeboxed() throws IOException {
388         // Wait with 1 second "safeguard" timeout
389         return Native.epollWait(epollFd, events, 1000);
390     }
391 
    /**
     * Runs a single iteration of the handler: asks the {@link SelectStrategy} what to do, waits for
     * events accordingly, dispatches the ready events and grows the event array if it was filled.
     * <p>
     * Any {@link Error} is rethrown; every other {@link Throwable} is passed to
     * {@link #handleLoopException(Throwable)}.
     *
     * @param context supplies blocking permission, deadlines and active I/O time reporting
     * @return the number of ready events that were processed, or {@code 0} if none were
     */
    @Override
    public int run(IoHandlerContext context) {
        int handled = 0;
        try {
            // 'strategy' is first one of the SelectStrategy constants and is later overwritten with the
            // result of the wait call; a positive value is treated below as the number of ready events.
            int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
            switch (strategy) {
                case SelectStrategy.CONTINUE:
                    if (context.shouldReportActiveIoTime()) {
                        context.reportActiveIoTime(0); // Report zero as we did no I/O.
                    }
                    return 0;

                case SelectStrategy.BUSY_WAIT:
                    strategy = epollBusyWait();
                    break;

                case SelectStrategy.SELECT:
                    if (pendingWakeup) {
                        // We are going to be immediately woken so no need to reset wakenUp
                        // or check for timerfd adjustment.
                        strategy = epollWaitTimeboxed();
                        if (strategy != 0) {
                            break;
                        }
                        // We timed out so assume that we missed the write event due to an
                        // abnormally failed syscall (the write itself or a prior epoll_wait)
                        logger.warn("Missed eventfd write (not seen after > 1 second)");
                        pendingWakeup = false;
                        if (!context.canBlock()) {
                            break;
                        }
                        // fall-through
                    }

                    long curDeadlineNanos = context.deadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    // Publish that we are about to wait, so wakeup() knows an eventfd write is needed.
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (context.canBlock()) {
                            if (curDeadlineNanos == prevDeadlineNanos) {
                                // No timer activity needed
                                strategy = epollWaitNoTimerChange();
                            } else {
                                // Timerfd needs to be re-armed or disarmed
                                long result = epollWait(context, curDeadlineNanos);
                                // The result contains the actual return value and if a timer was used or not.
                                // We need to "unpack" using the helper methods exposed in Native.
                                strategy = Native.epollReady(result);
                                prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
                            }
                        }
                    } finally {
                        // If the value is already AWAKE here, wakeup() swapped it while we were waiting and
                        // is writing (or has written) to the eventfd; remember that this event is still due.
                        // Try get() first to avoid much more expensive CAS in the case we
                        // were woken via the wakeup() method (submitted task)
                        if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
                            pendingWakeup = true;
                        }
                    }
                    // fallthrough
                default:
            }
            if (strategy > 0) {
                handled = strategy;
                // A fired timer means the timerfd has to be armed again on the next wait, so the
                // remembered deadline is reset to NONE in that case.
                if (context.shouldReportActiveIoTime()) {
                    long activeIoStartTimeNanos = System.nanoTime();
                    if (processReady(events, strategy)) {
                        prevDeadlineNanos = NONE;
                    }
                    long activeIoEndTimeNanos = System.nanoTime();
                    context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
                } else {
                    if (processReady(events, strategy)) {
                        prevDeadlineNanos = NONE;
                    }
                }
            } else if (context.shouldReportActiveIoTime()) {
                context.reportActiveIoTime(0);
            }

            if (allowGrowing && strategy == events.length()) {
                //increase the size of the array as we needed the whole space for the events
                events.increase();
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        }
        return handled;
    }
484 
485     /**
486      * Visible only for testing!
487      */
488     void handleLoopException(Throwable t) {
489         logger.warn("Unexpected exception in the selector loop.", t);
490 
491         // Prevent possible consecutive immediate failures that lead to
492         // excessive CPU consumption.
493         try {
494             Thread.sleep(1000);
495         } catch (InterruptedException e) {
496             // Ignore.
497         }
498     }
499 
500     // Returns true if a timerFd event was encountered
501     private boolean processReady(EpollEventArray events, int ready) {
502         boolean timerFired = false;
503         for (int i = 0; i < ready; i ++) {
504             final int fd = events.fd(i);
505             if (fd == eventFd.intValue()) {
506                 pendingWakeup = false;
507             } else if (fd == timerFd.intValue()) {
508                 timerFired = true;
509             } else {
510                 final long ev = events.events(i);
511 
512                 DefaultEpollIoRegistration registration = registrations.get(fd);
513                 if (registration != null) {
514                     registration.handle(ev);
515                 } else {
516                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
517                     try {
518                         Native.epollCtlDel(epollFd.intValue(), fd);
519                     } catch (IOException ignore) {
520                         // This can happen but is nothing we need to worry about as we only try to delete
521                         // the fd from the epoll set as we not found it in our mappings. So this call to
522                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
523                         // deleted before or the file descriptor was closed before.
524                     }
525                 }
526             }
527         }
528         return timerFired;
529     }
530 
531     /**
532      * This method is intended for use by process checkpoint/restore
533      * integration, such as OpenJDK CRaC.
534      * It's up to the caller to ensure that there is no concurrent use
535      * of the FDs while these are closed, e.g. by blocking the executor.
536      */
537     @UnstableApi
538     public void closeFileDescriptors() {
539         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
540         while (pendingWakeup) {
541             try {
542                 int count = epollWaitTimeboxed();
543                 if (count == 0) {
544                     // We timed-out so assume that the write we're expecting isn't coming
545                     break;
546                 }
547                 for (int i = 0; i < count; i++) {
548                     if (events.fd(i) == eventFd.intValue()) {
549                         pendingWakeup = false;
550                         break;
551                     }
552                 }
553             } catch (IOException ignore) {
554                 // ignore
555             }
556         }
557         try {
558             eventFd.close();
559         } catch (IOException e) {
560             logger.warn("Failed to close the event fd.", e);
561         }
562         try {
563             timerFd.close();
564         } catch (IOException e) {
565             logger.warn("Failed to close the timer fd.", e);
566         }
567 
568         try {
569             epollFd.close();
570         } catch (IOException e) {
571             logger.warn("Failed to close the epoll fd.", e);
572         }
573     }
574 }