View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandle;
22  import io.netty.channel.IoHandler;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.IntObjectHashMap;
31  import io.netty.util.collection.IntObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.SystemPropertyUtil;
36  import io.netty.util.internal.UnstableApi;
37  import io.netty.util.internal.logging.InternalLogger;
38  import io.netty.util.internal.logging.InternalLoggerFactory;
39  
40  import java.io.IOException;
41  import java.io.UncheckedIOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  
48  import static java.lang.Math.min;
49  import static java.lang.System.nanoTime;
50  
51  /**
52   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
53   */
54  public class EpollIoHandler implements IoHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);

// Threshold value handed to Native.epollWait(...) on the timerfd-based wait path. Read once from the
// "io.netty.channel.epoll.epollWaitThreshold" system property, falling back to 10.
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

static {
    // Make sure JNI is initialized by the time this class is loaded:
    // this class uses unix-common methods which are backed by JNI methods.
    Epoll.ensureAvailability();
}

// Start with a value that no task could have previously used as a deadline.
private long prevDeadlineNanos = nanoTime() - 1;
// The three descriptors are (re)assigned by openFileDescriptors().
private FileDescriptor epollFd;
private FileDescriptor eventFd;
private FileDescriptor timerFd;
// Active registrations keyed by the int value of the registered file descriptor.
private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
// True when the handler was created with maxEvents == 0; run() then grows the event array on demand.
private final boolean allowGrowing;
private final EpollEventArray events;
private final NativeArrays nativeArrays;

private final SelectStrategy selectStrategy;
// Non-blocking poll handed to the SelectStrategy when it computes the strategy for a run() iteration.
private final IntSupplier selectNowSupplier = this::epollWaitNow;
private final ThreadAwareExecutor executor;

private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;

// nextWakeupNanos is:
//    AWAKE            when EL is awake
//    NONE             when EL is waiting with no wakeup scheduled
//    other value T    when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
// Set by run() when nextWakeupNanos was already AWAKE once the wait finished, and cleared again as soon
// as an event for eventFd is observed.
private boolean pendingWakeup;

// Number of live registrations whose handle is an AbstractEpollChannel.AbstractEpollUnsafe.
private int numChannels;

// Upper bound applied to the nanosecond part of the delay passed to the timerfd.
// See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
98  
99      /**
100      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
101      */
102     public static IoHandlerFactory newFactory() {
103         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
104     }
105 
106     /**
107      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
108      */
109     public static IoHandlerFactory newFactory(final int maxEvents,
110                                               final SelectStrategyFactory selectStrategyFactory) {
111         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
112         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
113         return executor -> new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
114     }
115 
116     // Package-private for testing
117     EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
118         this.executor = ObjectUtil.checkNotNull(executor, "executor");
119         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
120         if (maxEvents == 0) {
121             allowGrowing = true;
122             events = new EpollEventArray(4096);
123         } else {
124             allowGrowing = false;
125             events = new EpollEventArray(maxEvents);
126         }
127         nativeArrays = new NativeArrays();
128         openFileDescriptors();
129     }
130 
131     private static EpollIoHandle cast(IoHandle handle) {
132         if (handle instanceof EpollIoHandle) {
133             return (EpollIoHandle) handle;
134         }
135         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
136     }
137 
138     private static EpollIoOps cast(IoOps ops) {
139         if (ops instanceof EpollIoOps) {
140             return (EpollIoOps) ops;
141         }
142         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
143     }
144 
145     /**
146      * This method is intended for use by a process checkpoint/restore
147      * integration, such as OpenJDK CRaC.
148      */
149     @UnstableApi
150     public void openFileDescriptors() {
151         boolean success = false;
152         FileDescriptor epollFd = null;
153         FileDescriptor eventFd = null;
154         FileDescriptor timerFd = null;
155         try {
156             this.epollFd = epollFd = Native.newEpollCreate();
157             this.eventFd = eventFd = Native.newEventFd();
158             try {
159                 // It is important to use EPOLLET here as we only want to get the notification once per
160                 // wakeup and don't call eventfd_read(...).
161                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
162             } catch (IOException e) {
163                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
164             }
165             this.timerFd = timerFd = Native.newTimerFd();
166             try {
167                 // It is important to use EPOLLET here as we only want to get the notification once per
168                 // wakeup and don't call read(...).
169                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
170             } catch (IOException e) {
171                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
172             }
173             success = true;
174         } finally {
175             if (!success) {
176                 closeFileDescriptor(epollFd);
177                 closeFileDescriptor(eventFd);
178                 closeFileDescriptor(timerFd);
179             }
180         }
181     }
182 
183     private static void closeFileDescriptor(FileDescriptor fd) {
184         if (fd != null) {
185             try {
186                 fd.close();
187             } catch (Exception e) {
188                 // ignore
189             }
190         }
191     }
192 
193     @Override
194     public void wakeup() {
195         if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
196             // write to the evfd which will then wake-up epoll_wait(...)
197             Native.eventFdWrite(eventFd.intValue(), 1L);
198         }
199     }
200 
201     @Override
202     public void prepareToDestroy() {
203         // Using the intermediate collection to prevent ConcurrentModificationException.
204         // In the `close()` method, the channel is deleted from `channels` map.
205         DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
206 
207         for (DefaultEpollIoRegistration reg: copy) {
208             reg.close();
209         }
210     }
211 
212     @Override
213     public void destroy() {
214         try {
215             closeFileDescriptors();
216         } finally {
217             nativeArrays.free();
218             events.free();
219         }
220     }
221 
222     private final class DefaultEpollIoRegistration implements IoRegistration {
223         private final ThreadAwareExecutor executor;
224         private final AtomicBoolean canceled = new AtomicBoolean();
225         final EpollIoHandle handle;
226 
227         DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
228             this.executor = executor;
229             this.handle = handle;
230         }
231 
232         @SuppressWarnings("unchecked")
233         @Override
234         public <T> T attachment() {
235             return (T) nativeArrays;
236         }
237 
238         @Override
239         public long submit(IoOps ops) {
240             EpollIoOps epollIoOps = cast(ops);
241             try {
242                 if (!isValid()) {
243                     return -1;
244                 }
245                 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
246                 return epollIoOps.value;
247             } catch (IOException e) {
248                 throw new UncheckedIOException(e);
249             }
250         }
251 
252         @Override
253         public boolean isValid() {
254             return !canceled.get();
255         }
256 
257         @Override
258         public boolean cancel() {
259             if (!canceled.compareAndSet(false, true)) {
260                 return false;
261             }
262             if (executor.isExecutorThread(Thread.currentThread())) {
263                 cancel0();
264             } else {
265                 executor.execute(this::cancel0);
266             }
267             return true;
268         }
269 
270         private void cancel0() {
271             int fd = handle.fd().intValue();
272             DefaultEpollIoRegistration old = registrations.remove(fd);
273             if (old != null) {
274                 if (old != this) {
275                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
276                     registrations.put(fd, old);
277                     return;
278                 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
279                     numChannels--;
280                 }
281                 if (handle.fd().isOpen()) {
282                     try {
283                         // Remove the fd registration from epoll. This is only needed if it's still open as otherwise
284                         // it will be automatically removed once the file-descriptor is closed.
285                         Native.epollCtlDel(epollFd.intValue(), fd);
286                     } catch (IOException e) {
287                         logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
288                     }
289                 }
290             }
291         }
292 
293         void close() {
294             try {
295                 cancel();
296             } catch (Exception e) {
297                 logger.debug("Exception during canceling " + this, e);
298             }
299             try {
300                 handle.close();
301             } catch (Exception e) {
302                 logger.debug("Exception during closing " + handle, e);
303             }
304         }
305 
306         void handle(long ev) {
307             handle.handle(this, EpollIoOps.eventOf((int) ev));
308         }
309     }
310 
311     @Override
312     public IoRegistration register(IoHandle handle)
313             throws Exception {
314         final EpollIoHandle epollHandle = cast(handle);
315         DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
316         int fd = epollHandle.fd().intValue();
317         Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
318         DefaultEpollIoRegistration old = registrations.put(fd, registration);
319 
320         // We either expect to have no registration in the map with the same FD or that the FD of the old registration
321         // is already closed.
322         assert old == null || !old.isValid();
323 
324         if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
325             numChannels++;
326         }
327         return registration;
328     }
329 
330     @Override
331     public boolean isCompatible(Class<? extends IoHandle> handleType) {
332         return EpollIoHandle.class.isAssignableFrom(handleType);
333     }
334 
335     int numRegisteredChannels() {
336         return numChannels;
337     }
338 
339     List<Channel> registeredChannelsList() {
340         IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
341         if (ch.isEmpty()) {
342             return Collections.emptyList();
343         }
344 
345         List<Channel> channels = new ArrayList<>(ch.size());
346         for (DefaultEpollIoRegistration registration : ch.values()) {
347             if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
348                 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
349             }
350         }
351         return Collections.unmodifiableList(channels);
352     }
353 
354     private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
355         if (deadlineNanos == NONE) {
356             return Native.epollWait(epollFd, events, timerFd,
357                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
358         }
359         long totalDelay = context.delayNanos(System.nanoTime());
360         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
361         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
362         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
363     }
364 
365     private int epollWaitNoTimerChange() throws IOException {
366         return Native.epollWait(epollFd, events, false);
367     }
368 
369     private int epollWaitNow() throws IOException {
370         return Native.epollWait(epollFd, events, true);
371     }
372 
373     private int epollBusyWait() throws IOException {
374         return Native.epollBusyWait(epollFd, events);
375     }
376 
377     private int epollWaitTimeboxed() throws IOException {
378         // Wait with 1 second "safeguard" timeout
379         return Native.epollWait(epollFd, events, 1000);
380     }
381 
382     @Override
383     public int run(IoHandlerContext context) {
384         int handled = 0;
385         try {
386             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
387             switch (strategy) {
388                 case SelectStrategy.CONTINUE:
389                     return 0;
390 
391                 case SelectStrategy.BUSY_WAIT:
392                     strategy = epollBusyWait();
393                     break;
394 
395                 case SelectStrategy.SELECT:
396                     if (pendingWakeup) {
397                         // We are going to be immediately woken so no need to reset wakenUp
398                         // or check for timerfd adjustment.
399                         strategy = epollWaitTimeboxed();
400                         if (strategy != 0) {
401                             break;
402                         }
403                         // We timed out so assume that we missed the write event due to an
404                         // abnormally failed syscall (the write itself or a prior epoll_wait)
405                         logger.warn("Missed eventfd write (not seen after > 1 second)");
406                         pendingWakeup = false;
407                         if (!context.canBlock()) {
408                             break;
409                         }
410                         // fall-through
411                     }
412 
413                     long curDeadlineNanos = context.deadlineNanos();
414                     if (curDeadlineNanos == -1L) {
415                         curDeadlineNanos = NONE; // nothing on the calendar
416                     }
417                     nextWakeupNanos.set(curDeadlineNanos);
418                     try {
419                         if (context.canBlock()) {
420                             if (curDeadlineNanos == prevDeadlineNanos) {
421                                 // No timer activity needed
422                                 strategy = epollWaitNoTimerChange();
423                             } else {
424                                 // Timerfd needs to be re-armed or disarmed
425                                 long result = epollWait(context, curDeadlineNanos);
426                                 // The result contains the actual return value and if a timer was used or not.
427                                 // We need to "unpack" using the helper methods exposed in Native.
428                                 strategy = Native.epollReady(result);
429                                 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
430                             }
431                         }
432                     } finally {
433                         // Try get() first to avoid much more expensive CAS in the case we
434                         // were woken via the wakeup() method (submitted task)
435                         if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
436                             pendingWakeup = true;
437                         }
438                     }
439                     // fallthrough
440                 default:
441             }
442             if (strategy > 0) {
443                 handled = strategy;
444                 if (processReady(events, strategy)) {
445                     prevDeadlineNanos = NONE;
446                 }
447             }
448             if (allowGrowing && strategy == events.length()) {
449                 //increase the size of the array as we needed the whole space for the events
450                 events.increase();
451             }
452         } catch (Error e) {
453             throw e;
454         } catch (Throwable t) {
455             handleLoopException(t);
456         }
457         return handled;
458     }
459 
460     /**
461      * Visible only for testing!
462      */
463     void handleLoopException(Throwable t) {
464         logger.warn("Unexpected exception in the selector loop.", t);
465 
466         // Prevent possible consecutive immediate failures that lead to
467         // excessive CPU consumption.
468         try {
469             Thread.sleep(1000);
470         } catch (InterruptedException e) {
471             // Ignore.
472         }
473     }
474 
475     // Returns true if a timerFd event was encountered
476     private boolean processReady(EpollEventArray events, int ready) {
477         boolean timerFired = false;
478         for (int i = 0; i < ready; i ++) {
479             final int fd = events.fd(i);
480             if (fd == eventFd.intValue()) {
481                 pendingWakeup = false;
482             } else if (fd == timerFd.intValue()) {
483                 timerFired = true;
484             } else {
485                 final long ev = events.events(i);
486 
487                 DefaultEpollIoRegistration registration = registrations.get(fd);
488                 if (registration != null) {
489                     registration.handle(ev);
490                 } else {
491                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
492                     try {
493                         Native.epollCtlDel(epollFd.intValue(), fd);
494                     } catch (IOException ignore) {
495                         // This can happen but is nothing we need to worry about as we only try to delete
496                         // the fd from the epoll set as we not found it in our mappings. So this call to
497                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
498                         // deleted before or the file descriptor was closed before.
499                     }
500                 }
501             }
502         }
503         return timerFired;
504     }
505 
506     /**
507      * This method is intended for use by process checkpoint/restore
508      * integration, such as OpenJDK CRaC.
509      * It's up to the caller to ensure that there is no concurrent use
510      * of the FDs while these are closed, e.g. by blocking the executor.
511      */
512     @UnstableApi
513     public void closeFileDescriptors() {
514         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
515         while (pendingWakeup) {
516             try {
517                 int count = epollWaitTimeboxed();
518                 if (count == 0) {
519                     // We timed-out so assume that the write we're expecting isn't coming
520                     break;
521                 }
522                 for (int i = 0; i < count; i++) {
523                     if (events.fd(i) == eventFd.intValue()) {
524                         pendingWakeup = false;
525                         break;
526                     }
527                 }
528             } catch (IOException ignore) {
529                 // ignore
530             }
531         }
532         try {
533             eventFd.close();
534         } catch (IOException e) {
535             logger.warn("Failed to close the event fd.", e);
536         }
537         try {
538             timerFd.close();
539         } catch (IOException e) {
540             logger.warn("Failed to close the timer fd.", e);
541         }
542 
543         try {
544             epollFd.close();
545         } catch (IOException e) {
546             logger.warn("Failed to close the epoll fd.", e);
547         }
548     }
549 }