View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandle;
22  import io.netty.channel.IoHandler;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.IntObjectHashMap;
31  import io.netty.util.collection.IntObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.SystemPropertyUtil;
36  import io.netty.util.internal.UnstableApi;
37  import io.netty.util.internal.logging.InternalLogger;
38  import io.netty.util.internal.logging.InternalLoggerFactory;
39  
40  import java.io.IOException;
41  import java.io.UncheckedIOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.concurrent.atomic.AtomicLong;
46  
47  import static java.lang.Math.min;
48  
49  /**
50   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
51   */
52  public class EpollIoHandler implements IoHandler {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);
    // Read once from the "io.netty.channel.epoll.epollWaitThreshold" system property (default 10) and passed
    // as the last argument of the timer-aware Native.epollWait(...) calls in epollWait(...).
    private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
            SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

    // Instance initializer: runs for every constructed handler before the constructor body.
    {
        Epoll.ensureAvailability();
    }

    // Pick a number that no task could have previously used.
    // Holds the deadline the timerfd was last armed for; run(...) compares it with the current deadline to
    // decide whether the timer has to be touched at all, and resets it to NONE once the timer fired.
    private long prevDeadlineNanos = NONE;
    // The three descriptors are (re)assigned by openFileDescriptors() and closed by closeFileDescriptors().
    private FileDescriptor epollFd;
    private FileDescriptor eventFd;
    private FileDescriptor timerFd;
    // Registrations keyed by the int value of the handle's file descriptor.
    private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
    // True when the handler was created with maxEvents == 0; run(...) then grows `events` when it was filled.
    private final boolean allowGrowing;
    private final EpollEventArray events;
    // Handed out to handles through DefaultEpollIoRegistration.attachment() and freed in destroy().
    private final NativeArrays nativeArrays;

    private final SelectStrategy selectStrategy;
    // Given to the select strategy in run(...); performs a wait via epollWaitNow().
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return epollWaitNow();
        }
    };
    private final ThreadAwareExecutor executor;

    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;

    // nextWakeupNanos is:
    //    AWAKE            when EL is awake
    //    NONE             when EL is waiting with no wakeup scheduled
    //    other value T    when EL is waiting with wakeup scheduled at time T
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    // Set in run(...) when wakeup() already swapped nextWakeupNanos to AWAKE, and cleared again once the
    // eventFd shows up among the ready events (processReady / closeFileDescriptors).
    private boolean pendingWakeup;

    // Number of registered handles that are AbstractEpollChannel.AbstractEpollUnsafe instances.
    private int numChannels;

    // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
    // Upper bound applied to the nanosecond component computed in epollWait(...).
    private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
94  
95      /**
96       * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
97       */
98      public static IoHandlerFactory newFactory() {
99          return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
100     }
101 
102     /**
103      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
104      */
105     public static IoHandlerFactory newFactory(final int maxEvents,
106                                               final SelectStrategyFactory selectStrategyFactory) {
107         Epoll.ensureAvailability();
108         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
109         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
110         return new IoHandlerFactory() {
111             @Override
112             public IoHandler newHandler(ThreadAwareExecutor executor) {
113                 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
114             }
115 
116             @Override
117             public boolean isChangingThreadSupported() {
118                 return true;
119             }
120         };
121     }
122 
123     // Package-private for testing
124     EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
125         this.executor = ObjectUtil.checkNotNull(executor, "executor");
126         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127         if (maxEvents == 0) {
128             allowGrowing = true;
129             events = new EpollEventArray(4096);
130         } else {
131             allowGrowing = false;
132             events = new EpollEventArray(maxEvents);
133         }
134         nativeArrays = new NativeArrays();
135         openFileDescriptors();
136     }
137 
138     private static EpollIoHandle cast(IoHandle handle) {
139         if (handle instanceof EpollIoHandle) {
140             return (EpollIoHandle) handle;
141         }
142         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
143     }
144 
145     private static EpollIoOps cast(IoOps ops) {
146         if (ops instanceof EpollIoOps) {
147             return (EpollIoOps) ops;
148         }
149         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
150     }
151 
152     /**
153      * This method is intended for use by a process checkpoint/restore
154      * integration, such as OpenJDK CRaC.
155      */
156     @UnstableApi
157     public void openFileDescriptors() {
158         boolean success = false;
159         FileDescriptor epollFd = null;
160         FileDescriptor eventFd = null;
161         FileDescriptor timerFd = null;
162         try {
163             this.epollFd = epollFd = Native.newEpollCreate();
164             this.eventFd = eventFd = Native.newEventFd();
165             try {
166                 // It is important to use EPOLLET here as we only want to get the notification once per
167                 // wakeup and don't call eventfd_read(...).
168                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
169             } catch (IOException e) {
170                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
171             }
172             this.timerFd = timerFd = Native.newTimerFd();
173             try {
174                 // It is important to use EPOLLET here as we only want to get the notification once per
175                 // wakeup and don't call read(...).
176                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
177             } catch (IOException e) {
178                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
179             }
180             success = true;
181         } finally {
182             if (!success) {
183                 closeFileDescriptor(epollFd);
184                 closeFileDescriptor(eventFd);
185                 closeFileDescriptor(timerFd);
186             }
187         }
188     }
189 
190     private static void closeFileDescriptor(FileDescriptor fd) {
191         if (fd != null) {
192             try {
193                 fd.close();
194             } catch (Exception e) {
195                 // ignore
196             }
197         }
198     }
199 
200     @Override
201     public void wakeup() {
202         if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
203             // write to the evfd which will then wake-up epoll_wait(...)
204             Native.eventFdWrite(eventFd.intValue(), 1L);
205         }
206     }
207 
208     @Override
209     public void prepareToDestroy() {
210         // Using the intermediate collection to prevent ConcurrentModificationException.
211         // In the `close()` method, the channel is deleted from `channels` map.
212         DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
213 
214         for (DefaultEpollIoRegistration reg: copy) {
215             reg.close();
216         }
217     }
218 
    /**
     * Closes the handler's file descriptors and then frees the native arrays and the event array.
     * The frees run in a {@code finally} block, so they are attempted even if closing throws.
     */
    @Override
    public void destroy() {
        try {
            closeFileDescriptors();
        } finally {
            nativeArrays.free();
            events.free();
        }
    }
228 
    /**
     * Lifecycle of a {@link DefaultEpollIoRegistration} with respect to the epoll instance.
     */
    private enum RegistrationState {
        // Was not added via EPOLL_CTL_ADD
        Pending,
        // Was added via EPOLL_CTL_ADD
        Added,
        // Was canceled and so removed via EPOLL_CTL_DEL
        Cancelled
    }
237 
238     private final class DefaultEpollIoRegistration
239             implements IoRegistration {
240         private final ThreadAwareExecutor executor;
241         private RegistrationState state = RegistrationState.Pending;
242         final EpollIoHandle handle;
243 
244         DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
245             this.executor = executor;
246             this.handle = handle;
247         }
248 
249         @SuppressWarnings("unchecked")
250         @Override
251         public <T> T attachment() {
252             return (T) nativeArrays;
253         }
254 
255         @Override
256         public long submit(IoOps ops) {
257             EpollIoOps epollIoOps = cast(ops);
258             try {
259                 synchronized (this) {
260                     switch (state) {
261                         case Cancelled:
262                             return -1;
263                         case Pending:
264                             if (epollIoOps.value == EpollIoOps.NONE.value) {
265                                 // 0 is a special value that basically means we should remove the registration.
266                                 // As we did not add the fd yet we should just return.
267                                 return 0;
268                             }
269                             Native.epollCtlAdd(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
270                             state = RegistrationState.Added;
271                             return epollIoOps.value;
272                         case Added:
273                             if (epollIoOps.value == EpollIoOps.NONE.value) {
274                                 // 0 means there is nothing to handle anymore, unregister the fd as otherwise
275                                 // we might get notified forever because of EPOLLHUP / EPOLLERR.
276                                 Native.epollCtlDel(epollFd.intValue(), handle.fd().intValue());
277                                 return 0;
278                             }
279                             Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
280                             return epollIoOps.value;
281                         default:
282                             throw new IllegalStateException();
283                     }
284                 }
285             } catch (IOException e) {
286                 throw new UncheckedIOException(e);
287             }
288         }
289 
290         @Override
291         public synchronized boolean isValid() {
292             return state != RegistrationState.Cancelled;
293         }
294 
295         @Override
296         public boolean cancel() {
297             synchronized (this) {
298                 if (state == RegistrationState.Cancelled) {
299                     return false;
300                 }
301                 state = RegistrationState.Cancelled;
302             }
303             if (executor.isExecutorThread(Thread.currentThread())) {
304                 cancel0();
305             } else {
306                 executor.execute(this::cancel0);
307             }
308             return true;
309         }
310 
311         private void cancel0() {
312             int fd = handle.fd().intValue();
313             DefaultEpollIoRegistration old = registrations.remove(fd);
314             if (old != null) {
315                 if (old != this) {
316                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
317                     registrations.put(fd, old);
318                     return;
319                 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
320                     numChannels--;
321                 }
322                 if (handle.fd().isOpen()) {
323                     try {
324                         // Remove the fd registration from epoll. This is only needed if it's still open as otherwise
325                         // it will be automatically removed once the file-descriptor is closed.
326                         Native.epollCtlDel(epollFd.intValue(), fd);
327                     } catch (IOException e) {
328                         logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
329                     }
330                 }
331                 handle.unregistered();
332             }
333         }
334 
335         void close() {
336             try {
337                 cancel();
338             } catch (Exception e) {
339                 logger.debug("Exception during canceling " + this, e);
340             }
341             try {
342                 handle.close();
343             } catch (Exception e) {
344                 logger.debug("Exception during closing " + handle, e);
345             }
346         }
347 
348         void handle(long ev) {
349             handle.handle(this, EpollIoOps.eventOf((int) ev));
350         }
351     }
352 
353     @Override
354     public IoRegistration register(IoHandle handle)
355             throws Exception {
356         final EpollIoHandle epollHandle = cast(handle);
357         DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
358         int fd = epollHandle.fd().intValue();
359         DefaultEpollIoRegistration old = registrations.put(fd, registration);
360 
361         // We either expect to have no registration in the map with the same FD or that the FD of the old registration
362         // is already closed.
363         assert old == null || !old.isValid();
364 
365         if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
366             numChannels++;
367         }
368         handle.registered();
369         return registration;
370     }
371 
372     @Override
373     public boolean isCompatible(Class<? extends IoHandle> handleType) {
374         return EpollIoHandle.class.isAssignableFrom(handleType);
375     }
376 
    // Number of currently registered handles that are AbstractEpollChannel.AbstractEpollUnsafe instances
    // (incremented in register(...), decremented when such a registration is cancelled).
    int numRegisteredChannels() {
        return numChannels;
    }
380 
381     List<Channel> registeredChannelsList() {
382         IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
383         if (ch.isEmpty()) {
384             return Collections.emptyList();
385         }
386 
387         List<Channel> channels = new ArrayList<>(ch.size());
388         for (DefaultEpollIoRegistration registration : ch.values()) {
389             if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
390                 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
391             }
392         }
393         return Collections.unmodifiableList(channels);
394     }
395 
396     private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
397         if (deadlineNanos == NONE) {
398             return Native.epollWait(epollFd, events, timerFd,
399                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
400         }
401         long totalDelay = context.delayNanos(System.nanoTime());
402         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
403         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
404         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
405     }
406 
    // Waits for events without touching the timerfd; run(...) uses this when the current deadline equals
    // the one the timer was last armed for. Passes false as the third argument of Native.epollWait(...).
    // NOTE(review): presumably that flag selects an immediate (non-blocking) poll when true - confirm in Native.
    private int epollWaitNoTimerChange() throws IOException {
        return Native.epollWait(epollFd, events, false);
    }
410 
    // Backs selectNowSupplier. Passes true as the third argument of Native.epollWait(...).
    // NOTE(review): presumably this makes the call return immediately instead of blocking - confirm in Native.
    private int epollWaitNow() throws IOException {
        return Native.epollWait(epollFd, events, true);
    }
414 
    // Used by run(...) for SelectStrategy.BUSY_WAIT; delegates to Native.epollBusyWait(...).
    private int epollBusyWait() throws IOException {
        return Native.epollBusyWait(epollFd, events);
    }
418 
    // Used while a wakeup write is still pending (run(...) and closeFileDescriptors()).
    // A return value of 0 is treated by the callers as "timed out".
    private int epollWaitTimeboxed() throws IOException {
        // Wait with 1 second "safeguard" timeout
        return Native.epollWait(epollFd, events, 1000);
    }
423 
    /**
     * Runs one iteration of the I/O loop: asks the select strategy what to do, waits for events
     * accordingly and dispatches whatever became ready.
     *
     * @param context gives access to blocking permission, deadlines and active-I/O-time reporting
     * @return the number of ready events that were processed; {@code 0} if there were none, if the strategy
     *         was {@code CONTINUE}, or if a non-{@link Error} throwable was caught before events were handled
     */
    @Override
    public int run(IoHandlerContext context) {
        int handled = 0;
        try {
            int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
            switch (strategy) {
                case SelectStrategy.CONTINUE:
                    if (context.shouldReportActiveIoTime()) {
                        context.reportActiveIoTime(0); // Report zero as we did no I/O.
                    }
                    return 0;

                case SelectStrategy.BUSY_WAIT:
                    strategy = epollBusyWait();
                    break;

                case SelectStrategy.SELECT:
                    if (pendingWakeup) {
                        // We are going to be immediately woken so no need to reset wakenUp
                        // or check for timerfd adjustment.
                        strategy = epollWaitTimeboxed();
                        if (strategy != 0) {
                            break;
                        }
                        // We timed out so assume that we missed the write event due to an
                        // abnormally failed syscall (the write itself or a prior epoll_wait)
                        logger.warn("Missed eventfd write (not seen after > 1 second)");
                        pendingWakeup = false;
                        if (!context.canBlock()) {
                            break;
                        }
                        // fall-through
                    }

                    long curDeadlineNanos = context.deadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    // Publish the deadline so that wakeup() can tell we are (about to be) waiting.
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (context.canBlock()) {
                            if (curDeadlineNanos == prevDeadlineNanos) {
                                // No timer activity needed
                                strategy = epollWaitNoTimerChange();
                            } else {
                                // Timerfd needs to be re-armed or disarmed
                                long result = epollWait(context, curDeadlineNanos);
                                // The result contains the actual return value and if a timer was used or not.
                                // We need to "unpack" using the helper methods exposed in Native.
                                strategy = Native.epollReady(result);
                                prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
                            }
                        }
                    } finally {
                        // Try get() first to avoid much more expensive CAS in the case we
                        // were woken via the wakeup() method (submitted task)
                        // Seeing AWAKE here means wakeup() already swapped the value and writes to the eventFd;
                        // remember that until the eventFd is observed among the ready events.
                        if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
                            pendingWakeup = true;
                        }
                    }
                    // fallthrough
                default:
            }
            if (strategy > 0) {
                handled = strategy;
                if (context.shouldReportActiveIoTime()) {
                    long activeIoStartTimeNanos = System.nanoTime();
                    // processReady(...) returns true when the timerfd fired; the remembered deadline is then
                    // reset so the next iteration does not assume the timer is still armed for it.
                    if (processReady(events, strategy)) {
                        prevDeadlineNanos = NONE;
                    }
                    long activeIoEndTimeNanos = System.nanoTime();
                    context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
                } else {
                    if (processReady(events, strategy)) {
                        prevDeadlineNanos = NONE;
                    }
                }
            } else if (context.shouldReportActiveIoTime()) {
                context.reportActiveIoTime(0);
            }

            if (allowGrowing && strategy == events.length()) {
                //increase the size of the array as we needed the whole space for the events
                events.increase();
            }
        } catch (Error e) {
            // Errors are never swallowed; everything else is logged and throttled by handleLoopException(...).
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        }
        return handled;
    }
516 
517     /**
518      * Visible only for testing!
519      */
520     void handleLoopException(Throwable t) {
521         logger.warn("Unexpected exception in the selector loop.", t);
522 
523         // Prevent possible consecutive immediate failures that lead to
524         // excessive CPU consumption.
525         try {
526             Thread.sleep(1000);
527         } catch (InterruptedException e) {
528             // Ignore.
529         }
530     }
531 
532     // Returns true if a timerFd event was encountered
533     private boolean processReady(EpollEventArray events, int ready) {
534         boolean timerFired = false;
535         for (int i = 0; i < ready; i ++) {
536             final int fd = events.fd(i);
537             if (fd == eventFd.intValue()) {
538                 pendingWakeup = false;
539             } else if (fd == timerFd.intValue()) {
540                 timerFired = true;
541             } else {
542                 final long ev = events.events(i);
543 
544                 DefaultEpollIoRegistration registration = registrations.get(fd);
545                 if (registration != null) {
546                     registration.handle(ev);
547                 } else {
548                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
549                     try {
550                         Native.epollCtlDel(epollFd.intValue(), fd);
551                     } catch (IOException ignore) {
552                         // This can happen but is nothing we need to worry about as we only try to delete
553                         // the fd from the epoll set as we not found it in our mappings. So this call to
554                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
555                         // deleted before or the file descriptor was closed before.
556                     }
557                 }
558             }
559         }
560         return timerFired;
561     }
562 
563     /**
564      * This method is intended for use by process checkpoint/restore
565      * integration, such as OpenJDK CRaC.
566      * It's up to the caller to ensure that there is no concurrent use
567      * of the FDs while these are closed, e.g. by blocking the executor.
568      */
569     @UnstableApi
570     public void closeFileDescriptors() {
571         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
572         while (pendingWakeup) {
573             try {
574                 int count = epollWaitTimeboxed();
575                 if (count == 0) {
576                     // We timed-out so assume that the write we're expecting isn't coming
577                     break;
578                 }
579                 for (int i = 0; i < count; i++) {
580                     if (events.fd(i) == eventFd.intValue()) {
581                         pendingWakeup = false;
582                         break;
583                     }
584                 }
585             } catch (IOException ignore) {
586                 // ignore
587             }
588         }
589         try {
590             eventFd.close();
591         } catch (IOException e) {
592             logger.warn("Failed to close the event fd.", e);
593         }
594         try {
595             timerFd.close();
596         } catch (IOException e) {
597             logger.warn("Failed to close the timer fd.", e);
598         }
599 
600         try {
601             epollFd.close();
602         } catch (IOException e) {
603             logger.warn("Failed to close the epoll fd.", e);
604         }
605     }
606 }