View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandle;
22  import io.netty.channel.IoHandler;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.IntObjectHashMap;
31  import io.netty.util.collection.IntObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.SystemPropertyUtil;
36  import io.netty.util.internal.UnstableApi;
37  import io.netty.util.internal.logging.InternalLogger;
38  import io.netty.util.internal.logging.InternalLoggerFactory;
39  
40  import java.io.IOException;
41  import java.io.UncheckedIOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.concurrent.atomic.AtomicLong;
47  
48  import static java.lang.Math.min;
49  import static java.lang.System.nanoTime;
50  
51  /**
52   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
53   */
54  public class EpollIoHandler implements IoHandler {
55      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);
56      private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
57              SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
58  
59      {
60          Epoll.ensureAvailability();
61      }
62  
63      // Pick a number that no task could have previously used.
64      private long prevDeadlineNanos = nanoTime() - 1;
65      private FileDescriptor epollFd;
66      private FileDescriptor eventFd;
67      private FileDescriptor timerFd;
68      private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
69      private final boolean allowGrowing;
70      private final EpollEventArray events;
71      private final NativeArrays nativeArrays;
72  
73      private final SelectStrategy selectStrategy;
74      private final IntSupplier selectNowSupplier = new IntSupplier() {
75          @Override
76          public int get() throws Exception {
77              return epollWaitNow();
78          }
79      };
80      private final ThreadAwareExecutor executor;
81  
82      private static final long AWAKE = -1L;
83      private static final long NONE = Long.MAX_VALUE;
84  
85      // nextWakeupNanos is:
86      //    AWAKE            when EL is awake
87      //    NONE             when EL is waiting with no wakeup scheduled
88      //    other value T    when EL is waiting with wakeup scheduled at time T
89      private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
90      private boolean pendingWakeup;
91  
92      private int numChannels;
93  
94      // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
95      private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
96  
97      /**
98       * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
99       */
100     public static IoHandlerFactory newFactory() {
101         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
102     }
103 
104     /**
105      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
106      */
107     public static IoHandlerFactory newFactory(final int maxEvents,
108                                               final SelectStrategyFactory selectStrategyFactory) {
109         Epoll.ensureAvailability();
110         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
111         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
112         return new IoHandlerFactory() {
113             @Override
114             public IoHandler newHandler(ThreadAwareExecutor executor) {
115                 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
116             }
117 
118             @Override
119             public boolean isChangingThreadSupported() {
120                 return true;
121             }
122         };
123     }
124 
125     // Package-private for testing
126     EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
127         this.executor = ObjectUtil.checkNotNull(executor, "executor");
128         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
129         if (maxEvents == 0) {
130             allowGrowing = true;
131             events = new EpollEventArray(4096);
132         } else {
133             allowGrowing = false;
134             events = new EpollEventArray(maxEvents);
135         }
136         nativeArrays = new NativeArrays();
137         openFileDescriptors();
138     }
139 
140     private static EpollIoHandle cast(IoHandle handle) {
141         if (handle instanceof EpollIoHandle) {
142             return (EpollIoHandle) handle;
143         }
144         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
145     }
146 
147     private static EpollIoOps cast(IoOps ops) {
148         if (ops instanceof EpollIoOps) {
149             return (EpollIoOps) ops;
150         }
151         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
152     }
153 
154     /**
155      * This method is intended for use by a process checkpoint/restore
156      * integration, such as OpenJDK CRaC.
157      */
158     @UnstableApi
159     public void openFileDescriptors() {
160         boolean success = false;
161         FileDescriptor epollFd = null;
162         FileDescriptor eventFd = null;
163         FileDescriptor timerFd = null;
164         try {
165             this.epollFd = epollFd = Native.newEpollCreate();
166             this.eventFd = eventFd = Native.newEventFd();
167             try {
168                 // It is important to use EPOLLET here as we only want to get the notification once per
169                 // wakeup and don't call eventfd_read(...).
170                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
171             } catch (IOException e) {
172                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
173             }
174             this.timerFd = timerFd = Native.newTimerFd();
175             try {
176                 // It is important to use EPOLLET here as we only want to get the notification once per
177                 // wakeup and don't call read(...).
178                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
179             } catch (IOException e) {
180                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
181             }
182             success = true;
183         } finally {
184             if (!success) {
185                 closeFileDescriptor(epollFd);
186                 closeFileDescriptor(eventFd);
187                 closeFileDescriptor(timerFd);
188             }
189         }
190     }
191 
192     private static void closeFileDescriptor(FileDescriptor fd) {
193         if (fd != null) {
194             try {
195                 fd.close();
196             } catch (Exception e) {
197                 // ignore
198             }
199         }
200     }
201 
202     @Override
203     public void wakeup() {
204         if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
205             // write to the evfd which will then wake-up epoll_wait(...)
206             Native.eventFdWrite(eventFd.intValue(), 1L);
207         }
208     }
209 
210     @Override
211     public void prepareToDestroy() {
212         // Using the intermediate collection to prevent ConcurrentModificationException.
213         // In the `close()` method, the channel is deleted from `channels` map.
214         DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
215 
216         for (DefaultEpollIoRegistration reg: copy) {
217             reg.close();
218         }
219     }
220 
221     @Override
222     public void destroy() {
223         try {
224             closeFileDescriptors();
225         } finally {
226             nativeArrays.free();
227             events.free();
228         }
229     }
230 
231     private final class DefaultEpollIoRegistration implements IoRegistration {
232         private final ThreadAwareExecutor executor;
233         private final AtomicBoolean canceled = new AtomicBoolean();
234         final EpollIoHandle handle;
235 
236         DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
237             this.executor = executor;
238             this.handle = handle;
239         }
240 
241         @SuppressWarnings("unchecked")
242         @Override
243         public <T> T attachment() {
244             return (T) nativeArrays;
245         }
246 
247         @Override
248         public long submit(IoOps ops) {
249             EpollIoOps epollIoOps = cast(ops);
250             try {
251                 if (!isValid()) {
252                     return -1;
253                 }
254                 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
255                 return epollIoOps.value;
256             } catch (IOException e) {
257                 throw new UncheckedIOException(e);
258             }
259         }
260 
261         @Override
262         public boolean isValid() {
263             return !canceled.get();
264         }
265 
266         @Override
267         public boolean cancel() {
268             if (!canceled.compareAndSet(false, true)) {
269                 return false;
270             }
271             if (executor.isExecutorThread(Thread.currentThread())) {
272                 cancel0();
273             } else {
274                 executor.execute(this::cancel0);
275             }
276             return true;
277         }
278 
279         private void cancel0() {
280             int fd = handle.fd().intValue();
281             DefaultEpollIoRegistration old = registrations.remove(fd);
282             if (old != null) {
283                 if (old != this) {
284                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
285                     registrations.put(fd, old);
286                     return;
287                 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
288                     numChannels--;
289                 }
290                 if (handle.fd().isOpen()) {
291                     try {
292                         // Remove the fd registration from epoll. This is only needed if it's still open as otherwise
293                         // it will be automatically removed once the file-descriptor is closed.
294                         Native.epollCtlDel(epollFd.intValue(), fd);
295                     } catch (IOException e) {
296                         logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
297                     }
298                 }
299                 handle.unregistered();
300             }
301         }
302 
303         void close() {
304             try {
305                 cancel();
306             } catch (Exception e) {
307                 logger.debug("Exception during canceling " + this, e);
308             }
309             try {
310                 handle.close();
311             } catch (Exception e) {
312                 logger.debug("Exception during closing " + handle, e);
313             }
314         }
315 
316         void handle(long ev) {
317             handle.handle(this, EpollIoOps.eventOf((int) ev));
318         }
319     }
320 
321     @Override
322     public IoRegistration register(IoHandle handle)
323             throws Exception {
324         final EpollIoHandle epollHandle = cast(handle);
325         DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
326         int fd = epollHandle.fd().intValue();
327         Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
328         DefaultEpollIoRegistration old = registrations.put(fd, registration);
329 
330         // We either expect to have no registration in the map with the same FD or that the FD of the old registration
331         // is already closed.
332         assert old == null || !old.isValid();
333 
334         if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
335             numChannels++;
336         }
337         handle.registered();
338         return registration;
339     }
340 
341     @Override
342     public boolean isCompatible(Class<? extends IoHandle> handleType) {
343         return EpollIoHandle.class.isAssignableFrom(handleType);
344     }
345 
346     int numRegisteredChannels() {
347         return numChannels;
348     }
349 
350     List<Channel> registeredChannelsList() {
351         IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
352         if (ch.isEmpty()) {
353             return Collections.emptyList();
354         }
355 
356         List<Channel> channels = new ArrayList<>(ch.size());
357         for (DefaultEpollIoRegistration registration : ch.values()) {
358             if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
359                 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
360             }
361         }
362         return Collections.unmodifiableList(channels);
363     }
364 
365     private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
366         if (deadlineNanos == NONE) {
367             return Native.epollWait(epollFd, events, timerFd,
368                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
369         }
370         long totalDelay = context.delayNanos(System.nanoTime());
371         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
372         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
373         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
374     }
375 
376     private int epollWaitNoTimerChange() throws IOException {
377         return Native.epollWait(epollFd, events, false);
378     }
379 
380     private int epollWaitNow() throws IOException {
381         return Native.epollWait(epollFd, events, true);
382     }
383 
384     private int epollBusyWait() throws IOException {
385         return Native.epollBusyWait(epollFd, events);
386     }
387 
388     private int epollWaitTimeboxed() throws IOException {
389         // Wait with 1 second "safeguard" timeout
390         return Native.epollWait(epollFd, events, 1000);
391     }
392 
393     @Override
394     public int run(IoHandlerContext context) {
395         int handled = 0;
396         try {
397             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
398             switch (strategy) {
399                 case SelectStrategy.CONTINUE:
400                     if (context.shouldReportActiveIoTime()) {
401                         context.reportActiveIoTime(0); // Report zero as we did no I/O.
402                     }
403                     return 0;
404 
405                 case SelectStrategy.BUSY_WAIT:
406                     strategy = epollBusyWait();
407                     break;
408 
409                 case SelectStrategy.SELECT:
410                     if (pendingWakeup) {
411                         // We are going to be immediately woken so no need to reset wakenUp
412                         // or check for timerfd adjustment.
413                         strategy = epollWaitTimeboxed();
414                         if (strategy != 0) {
415                             break;
416                         }
417                         // We timed out so assume that we missed the write event due to an
418                         // abnormally failed syscall (the write itself or a prior epoll_wait)
419                         logger.warn("Missed eventfd write (not seen after > 1 second)");
420                         pendingWakeup = false;
421                         if (!context.canBlock()) {
422                             break;
423                         }
424                         // fall-through
425                     }
426 
427                     long curDeadlineNanos = context.deadlineNanos();
428                     if (curDeadlineNanos == -1L) {
429                         curDeadlineNanos = NONE; // nothing on the calendar
430                     }
431                     nextWakeupNanos.set(curDeadlineNanos);
432                     try {
433                         if (context.canBlock()) {
434                             if (curDeadlineNanos == prevDeadlineNanos) {
435                                 // No timer activity needed
436                                 strategy = epollWaitNoTimerChange();
437                             } else {
438                                 // Timerfd needs to be re-armed or disarmed
439                                 long result = epollWait(context, curDeadlineNanos);
440                                 // The result contains the actual return value and if a timer was used or not.
441                                 // We need to "unpack" using the helper methods exposed in Native.
442                                 strategy = Native.epollReady(result);
443                                 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
444                             }
445                         }
446                     } finally {
447                         // Try get() first to avoid much more expensive CAS in the case we
448                         // were woken via the wakeup() method (submitted task)
449                         if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
450                             pendingWakeup = true;
451                         }
452                     }
453                     // fallthrough
454                 default:
455             }
456             if (strategy > 0) {
457                 handled = strategy;
458                 if (context.shouldReportActiveIoTime()) {
459                     long activeIoStartTimeNanos = System.nanoTime();
460                     if (processReady(events, strategy)) {
461                         prevDeadlineNanos = NONE;
462                     }
463                     long activeIoEndTimeNanos = System.nanoTime();
464                     context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
465                 } else {
466                     if (processReady(events, strategy)) {
467                         prevDeadlineNanos = NONE;
468                     }
469                 }
470             } else if (context.shouldReportActiveIoTime()) {
471                 context.reportActiveIoTime(0);
472             }
473 
474             if (allowGrowing && strategy == events.length()) {
475                 //increase the size of the array as we needed the whole space for the events
476                 events.increase();
477             }
478         } catch (Error e) {
479             throw e;
480         } catch (Throwable t) {
481             handleLoopException(t);
482         }
483         return handled;
484     }
485 
486     /**
487      * Visible only for testing!
488      */
489     void handleLoopException(Throwable t) {
490         logger.warn("Unexpected exception in the selector loop.", t);
491 
492         // Prevent possible consecutive immediate failures that lead to
493         // excessive CPU consumption.
494         try {
495             Thread.sleep(1000);
496         } catch (InterruptedException e) {
497             // Ignore.
498         }
499     }
500 
501     // Returns true if a timerFd event was encountered
502     private boolean processReady(EpollEventArray events, int ready) {
503         boolean timerFired = false;
504         for (int i = 0; i < ready; i ++) {
505             final int fd = events.fd(i);
506             if (fd == eventFd.intValue()) {
507                 pendingWakeup = false;
508             } else if (fd == timerFd.intValue()) {
509                 timerFired = true;
510             } else {
511                 final long ev = events.events(i);
512 
513                 DefaultEpollIoRegistration registration = registrations.get(fd);
514                 if (registration != null) {
515                     registration.handle(ev);
516                 } else {
517                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
518                     try {
519                         Native.epollCtlDel(epollFd.intValue(), fd);
520                     } catch (IOException ignore) {
521                         // This can happen but is nothing we need to worry about as we only try to delete
522                         // the fd from the epoll set as we not found it in our mappings. So this call to
523                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
524                         // deleted before or the file descriptor was closed before.
525                     }
526                 }
527             }
528         }
529         return timerFired;
530     }
531 
532     /**
533      * This method is intended for use by process checkpoint/restore
534      * integration, such as OpenJDK CRaC.
535      * It's up to the caller to ensure that there is no concurrent use
536      * of the FDs while these are closed, e.g. by blocking the executor.
537      */
538     @UnstableApi
539     public void closeFileDescriptors() {
540         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
541         while (pendingWakeup) {
542             try {
543                 int count = epollWaitTimeboxed();
544                 if (count == 0) {
545                     // We timed-out so assume that the write we're expecting isn't coming
546                     break;
547                 }
548                 for (int i = 0; i < count; i++) {
549                     if (events.fd(i) == eventFd.intValue()) {
550                         pendingWakeup = false;
551                         break;
552                     }
553                 }
554             } catch (IOException ignore) {
555                 // ignore
556             }
557         }
558         try {
559             eventFd.close();
560         } catch (IOException e) {
561             logger.warn("Failed to close the event fd.", e);
562         }
563         try {
564             timerFd.close();
565         } catch (IOException e) {
566             logger.warn("Failed to close the timer fd.", e);
567         }
568 
569         try {
570             epollFd.close();
571         } catch (IOException e) {
572             logger.warn("Failed to close the epoll fd.", e);
573         }
574     }
575 }