View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.IoEventLoop;
22  import io.netty.channel.IoExecutionContext;
23  import io.netty.channel.IoHandle;
24  import io.netty.channel.IoHandler;
25  import io.netty.channel.IoHandlerFactory;
26  import io.netty.channel.IoOps;
27  import io.netty.channel.SelectStrategy;
28  import io.netty.channel.SelectStrategyFactory;
29  import io.netty.channel.unix.FileDescriptor;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.util.IntSupplier;
32  import io.netty.util.collection.IntObjectHashMap;
33  import io.netty.util.collection.IntObjectMap;
34  import io.netty.util.concurrent.Future;
35  import io.netty.util.concurrent.Promise;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.SystemPropertyUtil;
39  import io.netty.util.internal.UnstableApi;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.io.IOException;
44  import java.util.ArrayList;
45  import java.util.Collections;
46  import java.util.List;
47  import java.util.concurrent.atomic.AtomicLong;
48  
49  import static java.lang.Math.min;
50  import static java.lang.System.nanoTime;
51  
52  /**
53   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
54   */
55  public class EpollIoHandler implements IoHandler {
56      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);
57      private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
58              SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
59  
60      static {
61          // Ensure JNI is initialized by the time this class is loaded by this time!
62          // We use unix-common methods in this class which are backed by JNI methods.
63          Epoll.ensureAvailability();
64      }
65  
66      // Pick a number that no task could have previously used.
67      private long prevDeadlineNanos = nanoTime() - 1;
68      private FileDescriptor epollFd;
69      private FileDescriptor eventFd;
70      private FileDescriptor timerFd;
71      private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
72      private final boolean allowGrowing;
73      private final EpollEventArray events;
74  
75      // These are initialized on first use
76      private IovArray iovArray;
77      private NativeDatagramPacketArray datagramPacketArray;
78  
79      private final SelectStrategy selectStrategy;
80      private final IntSupplier selectNowSupplier = new IntSupplier() {
81          @Override
82          public int get() throws Exception {
83              return epollWaitNow();
84          }
85      };
86  
87      private static final long AWAKE = -1L;
88      private static final long NONE = Long.MAX_VALUE;
89  
90      // nextWakeupNanos is:
91      //    AWAKE            when EL is awake
92      //    NONE             when EL is waiting with no wakeup scheduled
93      //    other value T    when EL is waiting with wakeup scheduled at time T
94      private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
95      private boolean pendingWakeup;
96  
97      private int numChannels;
98  
99      // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
100     private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
101 
102     /**
103      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
104      */
105     public static IoHandlerFactory newFactory() {
106         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
107     }
108 
109     /**
110      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
111      */
112     public static IoHandlerFactory newFactory(final int maxEvents,
113                                               final SelectStrategyFactory selectStrategyFactory) {
114         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
115         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
116         return new IoHandlerFactory() {
117             @Override
118             public IoHandler newHandler() {
119                 return new EpollIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
120             }
121         };
122     }
123 
124     // Package-private for testing
125     EpollIoHandler(int maxEvents, SelectStrategy strategy) {
126         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127         if (maxEvents == 0) {
128             allowGrowing = true;
129             events = new EpollEventArray(4096);
130         } else {
131             allowGrowing = false;
132             events = new EpollEventArray(maxEvents);
133         }
134         openFileDescriptors();
135     }
136 
137     private static EpollIoHandle cast(IoHandle handle) {
138         if (handle instanceof EpollIoHandle) {
139             return (EpollIoHandle) handle;
140         }
141         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
142     }
143 
144     private static EpollIoOps cast(IoOps ops) {
145         if (ops instanceof EpollIoOps) {
146             return (EpollIoOps) ops;
147         }
148         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
149     }
150 
151     /**
152      * This method is intended for use by a process checkpoint/restore
153      * integration, such as OpenJDK CRaC.
154      */
155     @UnstableApi
156     public void openFileDescriptors() {
157         boolean success = false;
158         FileDescriptor epollFd = null;
159         FileDescriptor eventFd = null;
160         FileDescriptor timerFd = null;
161         try {
162             this.epollFd = epollFd = Native.newEpollCreate();
163             this.eventFd = eventFd = Native.newEventFd();
164             try {
165                 // It is important to use EPOLLET here as we only want to get the notification once per
166                 // wakeup and don't call eventfd_read(...).
167                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
168             } catch (IOException e) {
169                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
170             }
171             this.timerFd = timerFd = Native.newTimerFd();
172             try {
173                 // It is important to use EPOLLET here as we only want to get the notification once per
174                 // wakeup and don't call read(...).
175                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
176             } catch (IOException e) {
177                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
178             }
179             success = true;
180         } finally {
181             if (!success) {
182                 closeFileDescriptor(epollFd);
183                 closeFileDescriptor(eventFd);
184                 closeFileDescriptor(timerFd);
185             }
186         }
187     }
188 
189     private static void closeFileDescriptor(FileDescriptor fd) {
190         if (fd != null) {
191             try {
192                 fd.close();
193             } catch (Exception e) {
194                 // ignore
195             }
196         }
197     }
198 
199     /**
200      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
201      */
202     IovArray cleanIovArray() {
203         if (iovArray == null) {
204             iovArray = new IovArray();
205         } else {
206             iovArray.clear();
207         }
208         return iovArray;
209     }
210 
211     /**
212      * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
213      */
214     NativeDatagramPacketArray cleanDatagramPacketArray() {
215         if (datagramPacketArray == null) {
216             datagramPacketArray = new NativeDatagramPacketArray();
217         } else {
218             datagramPacketArray.clear();
219         }
220         return datagramPacketArray;
221     }
222 
223     @Override
224     public void wakeup(IoEventLoop eventLoop) {
225         if (!eventLoop.inEventLoop() && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
226             // write to the evfd which will then wake-up epoll_wait(...)
227             Native.eventFdWrite(eventFd.intValue(), 1L);
228         }
229     }
230 
231     @Override
232     public void prepareToDestroy() {
233         // Using the intermediate collection to prevent ConcurrentModificationException.
234         // In the `close()` method, the channel is deleted from `channels` map.
235         DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
236 
237         for (DefaultEpollIoRegistration reg: copy) {
238             reg.close();
239         }
240     }
241 
242     @Override
243     public void destroy() {
244         try {
245             closeFileDescriptors();
246         } finally {
247             // release native memory
248             if (iovArray != null) {
249                 iovArray.release();
250                 iovArray = null;
251             }
252             if (datagramPacketArray != null) {
253                 datagramPacketArray.release();
254                 datagramPacketArray = null;
255             }
256             events.free();
257         }
258     }
259 
260     private final class DefaultEpollIoRegistration implements EpollIoRegistration {
261         private final Promise<?> cancellationPromise;
262         private final IoEventLoop eventLoop;
263         final EpollIoHandle handle;
264 
265         DefaultEpollIoRegistration(IoEventLoop eventLoop, EpollIoHandle handle) {
266             this.eventLoop = eventLoop;
267             this.handle = handle;
268             this.cancellationPromise = eventLoop.newPromise();
269         }
270 
271         @Override
272         public long submit(IoOps ops) throws Exception {
273             EpollIoOps epollIoOps = cast(ops);
274             try {
275                 if (!isValid()) {
276                     return -1;
277                 }
278                 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
279                 return epollIoOps.value;
280             } catch (IOException e) {
281                 throw e;
282             } catch (Exception e) {
283                 throw new IOException(e);
284             }
285         }
286 
287         @Override
288         public EpollIoHandler ioHandler() {
289             return EpollIoHandler.this;
290         }
291 
292         @Override
293         public void cancel() throws IOException {
294             if (!cancellationPromise.trySuccess(null)) {
295                 return;
296             }
297             if (handle.fd().isOpen()) {
298                 // Remove the fd registration from epoll. This is only needed if it's still open as otherwise it will
299                 // be automatically removed once the file-descriptor is closed.
300                 Native.epollCtlDel(epollFd.intValue(), handle.fd().intValue());
301             }
302             if (eventLoop.inEventLoop()) {
303                 cancel0();
304             } else {
305                 eventLoop.execute(this::cancel0);
306             }
307         }
308 
309         private void cancel0() {
310             int fd = handle.fd().intValue();
311             DefaultEpollIoRegistration old = registrations.remove(fd);
312             if (old != null) {
313                 if (old != this) {
314                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
315                     registrations.put(fd, old);
316                 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
317                     numChannels--;
318                 }
319             }
320         }
321 
322         void close() {
323             try {
324                 cancel();
325             } catch (Exception e) {
326                 logger.debug("Exception during canceling " + this, e);
327             }
328             try {
329                 handle.close();
330             } catch (Exception e) {
331                 logger.debug("Exception during closing " + handle, e);
332             }
333         }
334 
335         @Override
336         public Future<?> cancelFuture() {
337             return cancellationPromise;
338         }
339 
340         void handle(long ev) {
341             handle.handle(this, EpollIoOps.eventOf((int) ev));
342         }
343     }
344 
345     @Override
346     public EpollIoRegistration register(IoEventLoop eventLoop, IoHandle handle)
347             throws Exception {
348         final EpollIoHandle epollHandle = cast(handle);
349         DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(eventLoop, epollHandle);
350         int fd = epollHandle.fd().intValue();
351         Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
352         DefaultEpollIoRegistration old = registrations.put(fd, registration);
353 
354         // We either expect to have no registration in the map with the same FD or that the FD of the old registration
355         // is already closed.
356         assert old == null || !old.isValid();
357 
358         if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
359             numChannels++;
360         }
361         return registration;
362     }
363 
364     @Override
365     public boolean isCompatible(Class<? extends IoHandle> handleType) {
366         return EpollIoHandle.class.isAssignableFrom(handleType);
367     }
368 
369     int numRegisteredChannels() {
370         return numChannels;
371     }
372 
373     List<Channel> registeredChannelsList() {
374         IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
375         if (ch.isEmpty()) {
376             return Collections.emptyList();
377         }
378 
379         List<Channel> channels = new ArrayList<>(ch.size());
380         for (DefaultEpollIoRegistration registration : ch.values()) {
381             if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
382                 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
383             }
384         }
385         return Collections.unmodifiableList(channels);
386     }
387 
388     private long epollWait(IoExecutionContext context, long deadlineNanos) throws IOException {
389         if (deadlineNanos == NONE) {
390             return Native.epollWait(epollFd, events, timerFd,
391                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
392         }
393         long totalDelay = context.delayNanos(System.nanoTime());
394         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
395         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
396         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
397     }
398 
399     private int epollWaitNoTimerChange() throws IOException {
400         return Native.epollWait(epollFd, events, false);
401     }
402 
403     private int epollWaitNow() throws IOException {
404         return Native.epollWait(epollFd, events, true);
405     }
406 
407     private int epollBusyWait() throws IOException {
408         return Native.epollBusyWait(epollFd, events);
409     }
410 
411     private int epollWaitTimeboxed() throws IOException {
412         // Wait with 1 second "safeguard" timeout
413         return Native.epollWait(epollFd, events, 1000);
414     }
415 
416     @Override
417     public int run(IoExecutionContext context) {
418         int handled = 0;
419         try {
420             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
421             switch (strategy) {
422                 case SelectStrategy.CONTINUE:
423                     return 0;
424 
425                 case SelectStrategy.BUSY_WAIT:
426                     strategy = epollBusyWait();
427                     break;
428 
429                 case SelectStrategy.SELECT:
430                     if (pendingWakeup) {
431                         // We are going to be immediately woken so no need to reset wakenUp
432                         // or check for timerfd adjustment.
433                         strategy = epollWaitTimeboxed();
434                         if (strategy != 0) {
435                             break;
436                         }
437                         // We timed out so assume that we missed the write event due to an
438                         // abnormally failed syscall (the write itself or a prior epoll_wait)
439                         logger.warn("Missed eventfd write (not seen after > 1 second)");
440                         pendingWakeup = false;
441                         if (!context.canBlock()) {
442                             break;
443                         }
444                         // fall-through
445                     }
446 
447                     long curDeadlineNanos = context.deadlineNanos();
448                     if (curDeadlineNanos == -1L) {
449                         curDeadlineNanos = NONE; // nothing on the calendar
450                     }
451                     nextWakeupNanos.set(curDeadlineNanos);
452                     try {
453                         if (context.canBlock()) {
454                             if (curDeadlineNanos == prevDeadlineNanos) {
455                                 // No timer activity needed
456                                 strategy = epollWaitNoTimerChange();
457                             } else {
458                                 // Timerfd needs to be re-armed or disarmed
459                                 long result = epollWait(context, curDeadlineNanos);
460                                 // The result contains the actual return value and if a timer was used or not.
461                                 // We need to "unpack" using the helper methods exposed in Native.
462                                 strategy = Native.epollReady(result);
463                                 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
464                             }
465                         }
466                     } finally {
467                         // Try get() first to avoid much more expensive CAS in the case we
468                         // were woken via the wakeup() method (submitted task)
469                         if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
470                             pendingWakeup = true;
471                         }
472                     }
473                     // fallthrough
474                 default:
475             }
476             if (strategy > 0) {
477                 handled = strategy;
478                 if (processReady(events, strategy)) {
479                     prevDeadlineNanos = NONE;
480                 }
481             }
482             if (allowGrowing && strategy == events.length()) {
483                 //increase the size of the array as we needed the whole space for the events
484                 events.increase();
485             }
486         } catch (Error e) {
487             throw e;
488         } catch (Throwable t) {
489             handleLoopException(t);
490         }
491         return handled;
492     }
493 
494     /**
495      * Visible only for testing!
496      */
497     void handleLoopException(Throwable t) {
498         logger.warn("Unexpected exception in the selector loop.", t);
499 
500         // Prevent possible consecutive immediate failures that lead to
501         // excessive CPU consumption.
502         try {
503             Thread.sleep(1000);
504         } catch (InterruptedException e) {
505             // Ignore.
506         }
507     }
508 
509     // Returns true if a timerFd event was encountered
510     private boolean processReady(EpollEventArray events, int ready) {
511         boolean timerFired = false;
512         for (int i = 0; i < ready; i ++) {
513             final int fd = events.fd(i);
514             if (fd == eventFd.intValue()) {
515                 pendingWakeup = false;
516             } else if (fd == timerFd.intValue()) {
517                 timerFired = true;
518             } else {
519                 final long ev = events.events(i);
520 
521                 DefaultEpollIoRegistration registration = registrations.get(fd);
522                 if (registration != null) {
523                     registration.handle(ev);
524                 } else {
525                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
526                     try {
527                         Native.epollCtlDel(epollFd.intValue(), fd);
528                     } catch (IOException ignore) {
529                         // This can happen but is nothing we need to worry about as we only try to delete
530                         // the fd from the epoll set as we not found it in our mappings. So this call to
531                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
532                         // deleted before or the file descriptor was closed before.
533                     }
534                 }
535             }
536         }
537         return timerFired;
538     }
539 
540     /**
541      * This method is intended for use by process checkpoint/restore
542      * integration, such as OpenJDK CRaC.
543      * It's up to the caller to ensure that there is no concurrent use
544      * of the FDs while these are closed, e.g. by blocking the executor.
545      */
546     @UnstableApi
547     public void closeFileDescriptors() {
548         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
549         while (pendingWakeup) {
550             try {
551                 int count = epollWaitTimeboxed();
552                 if (count == 0) {
553                     // We timed-out so assume that the write we're expecting isn't coming
554                     break;
555                 }
556                 for (int i = 0; i < count; i++) {
557                     if (events.fd(i) == eventFd.intValue()) {
558                         pendingWakeup = false;
559                         break;
560                     }
561                 }
562             } catch (IOException ignore) {
563                 // ignore
564             }
565         }
566         try {
567             eventFd.close();
568         } catch (IOException e) {
569             logger.warn("Failed to close the event fd.", e);
570         }
571         try {
572             timerFd.close();
573         } catch (IOException e) {
574             logger.warn("Failed to close the timer fd.", e);
575         }
576 
577         try {
578             epollFd.close();
579         } catch (IOException e) {
580             logger.warn("Failed to close the epoll fd.", e);
581         }
582     }
583 }