View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandle;
22  import io.netty.channel.IoHandler;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.IntObjectHashMap;
31  import io.netty.util.collection.IntObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.SystemPropertyUtil;
36  import io.netty.util.internal.UnstableApi;
37  import io.netty.util.internal.logging.InternalLogger;
38  import io.netty.util.internal.logging.InternalLoggerFactory;
39  
40  import java.io.IOException;
41  import java.io.UncheckedIOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.concurrent.atomic.AtomicLong;
46  
47  import static java.lang.Math.min;
48  
49  /**
50   * {@link IoHandler} which uses epoll under the covers. Only works on Linux!
51   */
52  public class EpollIoHandler implements IoHandler {
    // Logger used for loop exceptions, missed wakeups and fd close failures.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);
    // Threshold forwarded unchanged to the timer-aware Native.epollWait(...) overload in epollWait(...).
    // Read from the "io.netty.channel.epoll.epollWaitThreshold" system property, defaulting to 10.
    // NOTE(review): the name suggests milliseconds; the exact semantics live in Native - confirm there.
    private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
            SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

    // Instance initializer: executed for every constructed handler before the constructor body runs.
    {
        Epoll.ensureAvailability();
    }

    // Pick a number that no task could have previously used.
    // run(...) compares the current deadline against this value to decide whether the timerfd has to be
    // re-armed; it is reset to NONE when the timer was not used or after a timerfd event was processed.
    private long prevDeadlineNanos = NONE;
    // The three descriptors below are (re)assigned by openFileDescriptors(), which is why they are not final.
    private FileDescriptor epollFd;
    private FileDescriptor eventFd;
    private FileDescriptor timerFd;
    // Registrations keyed by the int value of the handle's file descriptor.
    private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
    // True when the handler was constructed with maxEvents == 0; run(...) then grows 'events' whenever a wait
    // filled the whole array.
    private final boolean allowGrowing;
    // Event buffer handed to every Native.epollWait(...) / epollBusyWait(...) call.
    private final EpollEventArray events;
    // Returned from IoRegistration.attachment() and freed in destroy().
    private final NativeArrays nativeArrays;

    private final SelectStrategy selectStrategy;
    // Adapter given to selectStrategy.calculateStrategy(...) in run(...); delegates to epollWaitNow().
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return epollWaitNow();
        }
    };
    // Executor whose thread drives this handler; used for the thread checks in wakeup() and cancel().
    private final ThreadAwareExecutor executor;

    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;

    // nextWakeupNanos is:
    //    AWAKE            when EL is awake
    //    NONE             when EL is waiting with no wakeup scheduled
    //    other value T    when EL is waiting with wakeup scheduled at time T
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    // Set in run(...) when nextWakeupNanos was already AWAKE after the wait (wakeup() swapped it and writes to
    // eventFd); cleared once the eventFd event is seen or the timeboxed wait gives up on it.
    private boolean pendingWakeup;

    // Number of current registrations whose handle is an AbstractEpollChannel.AbstractEpollUnsafe.
    private int numChannels;

    // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
    // Upper bound applied to the nanosecond component of the timer delay computed in epollWait(...).
    private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
94  
95      /**
96       * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
97       */
98      public static IoHandlerFactory newFactory() {
99          return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
100     }
101 
102     /**
103      * Returns a new {@link IoHandlerFactory} that creates {@link EpollIoHandler} instances.
104      */
105     public static IoHandlerFactory newFactory(final int maxEvents,
106                                               final SelectStrategyFactory selectStrategyFactory) {
107         Epoll.ensureAvailability();
108         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
109         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
110         return new IoHandlerFactory() {
111             @Override
112             public IoHandler newHandler(ThreadAwareExecutor executor) {
113                 return new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
114             }
115 
116             @Override
117             public boolean isChangingThreadSupported() {
118                 return true;
119             }
120         };
121     }
122 
123     // Package-private for testing
124     EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
125         this.executor = ObjectUtil.checkNotNull(executor, "executor");
126         selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127         if (maxEvents == 0) {
128             allowGrowing = true;
129             events = new EpollEventArray(4096);
130         } else {
131             allowGrowing = false;
132             events = new EpollEventArray(maxEvents);
133         }
134         nativeArrays = new NativeArrays();
135         openFileDescriptors();
136     }
137 
138     private static EpollIoHandle cast(IoHandle handle) {
139         if (handle instanceof EpollIoHandle) {
140             return (EpollIoHandle) handle;
141         }
142         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
143     }
144 
145     private static EpollIoOps cast(IoOps ops) {
146         if (ops instanceof EpollIoOps) {
147             return (EpollIoOps) ops;
148         }
149         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
150     }
151 
152     /**
153      * This method is intended for use by a process checkpoint/restore
154      * integration, such as OpenJDK CRaC.
155      */
156     @UnstableApi
157     public void openFileDescriptors() {
158         boolean success = false;
159         FileDescriptor epollFd = null;
160         FileDescriptor eventFd = null;
161         FileDescriptor timerFd = null;
162         try {
163             this.epollFd = epollFd = Native.newEpollCreate();
164             this.eventFd = eventFd = Native.newEventFd();
165             try {
166                 // It is important to use EPOLLET here as we only want to get the notification once per
167                 // wakeup and don't call eventfd_read(...).
168                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
169             } catch (IOException e) {
170                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
171             }
172             this.timerFd = timerFd = Native.newTimerFd();
173             try {
174                 // It is important to use EPOLLET here as we only want to get the notification once per
175                 // wakeup and don't call read(...).
176                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
177             } catch (IOException e) {
178                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
179             }
180             success = true;
181         } finally {
182             if (!success) {
183                 closeFileDescriptor(epollFd);
184                 closeFileDescriptor(eventFd);
185                 closeFileDescriptor(timerFd);
186             }
187         }
188     }
189 
190     private static void closeFileDescriptor(FileDescriptor fd) {
191         if (fd != null) {
192             try {
193                 fd.close();
194             } catch (Exception e) {
195                 // ignore
196             }
197         }
198     }
199 
200     @Override
201     public void wakeup() {
202         if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
203             // write to the evfd which will then wake-up epoll_wait(...)
204             Native.eventFdWrite(eventFd.intValue(), 1L);
205         }
206     }
207 
208     @Override
209     public void prepareToDestroy() {
210         // Using the intermediate collection to prevent ConcurrentModificationException.
211         // In the `close()` method, the channel is deleted from `channels` map.
212         DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
213 
214         for (DefaultEpollIoRegistration reg: copy) {
215             reg.close();
216         }
217     }
218 
    /**
     * Closes the epoll, event and timer file descriptors and then frees the native arrays and the
     * event array. The frees sit in a {@code finally} block so they are attempted even when
     * {@link #closeFileDescriptors()} throws.
     */
    @Override
    public void destroy() {
        try {
            closeFileDescriptors();
        } finally {
            nativeArrays.free();
            events.free();
        }
    }
228 
    /**
     * Lifecycle of a {@code DefaultEpollIoRegistration} with respect to the epoll set.
     */
    private enum RegistrationState {
        // Was not added via EPOLL_CTL_ADD
        Pending,
        // Was added via EPOLL_CTL_ADD
        Added,
        // Was canceled and so removed via EPOLL_CTL_DEL
        Cancelled
    }
237 
238     private final class DefaultEpollIoRegistration
239             implements IoRegistration {
        // Executor used by cancel() to decide whether cancel0() can run inline or has to be scheduled.
        private final ThreadAwareExecutor executor;
        // Starts as Pending; read and written under synchronized (this) in submit(), isValid() and cancel().
        private RegistrationState state = RegistrationState.Pending;
        // The handle this registration was created for; its fd is the key in the enclosing registrations map.
        final EpollIoHandle handle;

        /**
         * @param executor the executor of the enclosing handler
         * @param handle   the handle being registered
         */
        DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
            this.executor = executor;
            this.handle = handle;
        }
248 
        /**
         * Returns the enclosing handler's {@code NativeArrays} instance, cast unchecked to the
         * caller-chosen type {@code T}.
         */
        @SuppressWarnings("unchecked")
        @Override
        public <T> T attachment() {
            return (T) nativeArrays;
        }
254 
255         @Override
256         public long submit(IoOps ops) {
257             EpollIoOps epollIoOps = cast(ops);
258             try {
259                 synchronized (this) {
260                     switch (state) {
261                         case Cancelled:
262                             return -1;
263                         case Pending:
264                             Native.epollCtlAdd(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
265                             state = RegistrationState.Added;
266                         case Added:
267                             Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
268                             return epollIoOps.value;
269                         default:
270                             throw new IllegalStateException();
271                     }
272                 }
273             } catch (IOException e) {
274                 throw new UncheckedIOException(e);
275             }
276         }
277 
        /**
         * Returns {@code true} as long as this registration has not been cancelled.
         * Synchronized on this registration, the same monitor {@code submit(...)} and {@code cancel()} use.
         */
        @Override
        public synchronized boolean isValid() {
            return state != RegistrationState.Cancelled;
        }
282 
283         @Override
284         public boolean cancel() {
285             synchronized (this) {
286                 if (state == RegistrationState.Cancelled) {
287                     return false;
288                 }
289                 state = RegistrationState.Cancelled;
290             }
291             if (executor.isExecutorThread(Thread.currentThread())) {
292                 cancel0();
293             } else {
294                 executor.execute(this::cancel0);
295             }
296             return true;
297         }
298 
299         private void cancel0() {
300             int fd = handle.fd().intValue();
301             DefaultEpollIoRegistration old = registrations.remove(fd);
302             if (old != null) {
303                 if (old != this) {
304                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
305                     registrations.put(fd, old);
306                     return;
307                 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
308                     numChannels--;
309                 }
310                 if (handle.fd().isOpen()) {
311                     try {
312                         // Remove the fd registration from epoll. This is only needed if it's still open as otherwise
313                         // it will be automatically removed once the file-descriptor is closed.
314                         Native.epollCtlDel(epollFd.intValue(), fd);
315                     } catch (IOException e) {
316                         logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
317                     }
318                 }
319                 handle.unregistered();
320             }
321         }
322 
        /**
         * Cancels this registration and then closes its handle. Each step is attempted independently:
         * an exception from either one is logged at debug level and not propagated.
         */
        void close() {
            try {
                cancel();
            } catch (Exception e) {
                logger.debug("Exception during canceling " + this, e);
            }
            try {
                handle.close();
            } catch (Exception e) {
                logger.debug("Exception during closing " + handle, e);
            }
        }
335 
        /**
         * Dispatches a ready event to the handle.
         *
         * @param ev the raw event value; it is narrowed to {@code int} before being converted with
         *           {@code EpollIoOps.eventOf(...)}
         */
        void handle(long ev) {
            handle.handle(this, EpollIoOps.eventOf((int) ev));
        }
339     }
340 
341     @Override
342     public IoRegistration register(IoHandle handle)
343             throws Exception {
344         final EpollIoHandle epollHandle = cast(handle);
345         DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
346         int fd = epollHandle.fd().intValue();
347         DefaultEpollIoRegistration old = registrations.put(fd, registration);
348 
349         // We either expect to have no registration in the map with the same FD or that the FD of the old registration
350         // is already closed.
351         assert old == null || !old.isValid();
352 
353         if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
354             numChannels++;
355         }
356         handle.registered();
357         return registration;
358     }
359 
    /**
     * Returns {@code true} if the given handle type is {@link EpollIoHandle} or a subtype of it.
     */
    @Override
    public boolean isCompatible(Class<? extends IoHandle> handleType) {
        return EpollIoHandle.class.isAssignableFrom(handleType);
    }
364 
    /**
     * Returns the current count of registrations whose handle is an
     * {@code AbstractEpollChannel.AbstractEpollUnsafe}.
     */
    int numRegisteredChannels() {
        return numChannels;
    }
368 
369     List<Channel> registeredChannelsList() {
370         IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
371         if (ch.isEmpty()) {
372             return Collections.emptyList();
373         }
374 
375         List<Channel> channels = new ArrayList<>(ch.size());
376         for (DefaultEpollIoRegistration registration : ch.values()) {
377             if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
378                 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
379             }
380         }
381         return Collections.unmodifiableList(channels);
382     }
383 
384     private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
385         if (deadlineNanos == NONE) {
386             return Native.epollWait(epollFd, events, timerFd,
387                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
388         }
389         long totalDelay = context.delayNanos(System.nanoTime());
390         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
391         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
392         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
393     }
394 
    /**
     * Waits for events without passing the timerfd, i.e. leaving the currently programmed timer as is.
     * NOTE(review): the {@code false} flag presumably selects a blocking wait - confirm against Native.
     *
     * @return the number of ready events as reported by {@code Native.epollWait(...)}
     */
    private int epollWaitNoTimerChange() throws IOException {
        return Native.epollWait(epollFd, events, false);
    }
398 
    /**
     * Polls for events; used as the "select now" supplier for the select strategy.
     * NOTE(review): the {@code true} flag presumably requests an immediate, non-blocking poll - confirm
     * against Native.
     *
     * @return the number of ready events as reported by {@code Native.epollWait(...)}
     */
    private int epollWaitNow() throws IOException {
        return Native.epollWait(epollFd, events, true);
    }
402 
    /**
     * Delegates to {@code Native.epollBusyWait(...)}; used by {@code run(...)} for
     * {@code SelectStrategy.BUSY_WAIT}.
     *
     * @return the number of ready events as reported by the native call
     */
    private int epollBusyWait() throws IOException {
        return Native.epollBusyWait(epollFd, events);
    }
406 
    /**
     * Waits for events with a fixed timeout, used while an eventfd wakeup is still expected to arrive.
     *
     * @return the number of ready events; callers treat {@code 0} as a timeout
     */
    private int epollWaitTimeboxed() throws IOException {
        // Wait with 1 second "safeguard" timeout
        return Native.epollWait(epollFd, events, 1000);
    }
411 
    /**
     * Runs one iteration of the epoll loop: asks the select strategy what to do, waits for events
     * accordingly (re-arming or disarming the timerfd when the deadline changed), dispatches the ready
     * events and grows the event array if it was filled completely.
     * <p>
     * Any {@link Throwable} other than an {@link Error} is passed to
     * {@link #handleLoopException(Throwable)} and not propagated.
     *
     * @param context provides blocking permission, the next deadline and the active I/O time reporting hooks
     * @return the number of ready events reported by the wait, or {@code 0} if none were reported
     */
    @Override
    public int run(IoHandlerContext context) {
        int handled = 0;
        try {
            // From here on 'strategy' doubles as the ready-event count once a wait has been performed.
            int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
            switch (strategy) {
                case SelectStrategy.CONTINUE:
                    if (context.shouldReportActiveIoTime()) {
                        context.reportActiveIoTime(0); // Report zero as we did no I/O.
                    }
                    return 0;

                case SelectStrategy.BUSY_WAIT:
                    strategy = epollBusyWait();
                    break;

                case SelectStrategy.SELECT:
                    if (pendingWakeup) {
                        // We are going to be immediately woken so no need to reset wakenUp
                        // or check for timerfd adjustment.
                        strategy = epollWaitTimeboxed();
                        if (strategy != 0) {
                            break;
                        }
                        // We timed out so assume that we missed the write event due to an
                        // abnormally failed syscall (the write itself or a prior epoll_wait)
                        logger.warn("Missed eventfd write (not seen after > 1 second)");
                        pendingWakeup = false;
                        if (!context.canBlock()) {
                            break;
                        }
                        // fall-through
                    }

                    long curDeadlineNanos = context.deadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    // Publish the deadline before blocking so wakeup() can tell that the loop is waiting.
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (context.canBlock()) {
                            if (curDeadlineNanos == prevDeadlineNanos) {
                                // No timer activity needed
                                strategy = epollWaitNoTimerChange();
                            } else {
                                // Timerfd needs to be re-armed or disarmed
                                long result = epollWait(context, curDeadlineNanos);
                                // The result contains the actual return value and if a timer was used or not.
                                // We need to "unpack" using the helper methods exposed in Native.
                                strategy = Native.epollReady(result);
                                prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
                            }
                        }
                    } finally {
                        // Try get() first to avoid much more expensive CAS in the case we
                        // were woken via the wakeup() method (submitted task)
                        if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
                            // wakeup() already swapped in AWAKE, so an eventfd write is (or will be) in flight.
                            pendingWakeup = true;
                        }
                    }
                    // fallthrough
                default:
            }
            if (strategy > 0) {
                handled = strategy;
                if (context.shouldReportActiveIoTime()) {
                    long activeIoStartTimeNanos = System.nanoTime();
                    if (processReady(events, strategy)) {
                        // The timer fired, so it is no longer armed for the previous deadline.
                        prevDeadlineNanos = NONE;
                    }
                    long activeIoEndTimeNanos = System.nanoTime();
                    context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
                } else {
                    if (processReady(events, strategy)) {
                        prevDeadlineNanos = NONE;
                    }
                }
            } else if (context.shouldReportActiveIoTime()) {
                context.reportActiveIoTime(0);
            }

            if (allowGrowing && strategy == events.length()) {
                //increase the size of the array as we needed the whole space for the events
                events.increase();
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        }
        return handled;
    }
504 
    /**
     * Logs the given failure from the loop at warn level and then sleeps for one second.
     * An {@link InterruptedException} during the sleep is swallowed and the interrupt status is not
     * restored.
     * <p>
     * Visible only for testing!
     *
     * @param t the failure raised inside {@code run(...)}
     */
    void handleLoopException(Throwable t) {
        logger.warn("Unexpected exception in the selector loop.", t);

        // Prevent possible consecutive immediate failures that lead to
        // excessive CPU consumption.
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Ignore.
        }
    }
519 
520     // Returns true if a timerFd event was encountered
521     private boolean processReady(EpollEventArray events, int ready) {
522         boolean timerFired = false;
523         for (int i = 0; i < ready; i ++) {
524             final int fd = events.fd(i);
525             if (fd == eventFd.intValue()) {
526                 pendingWakeup = false;
527             } else if (fd == timerFd.intValue()) {
528                 timerFired = true;
529             } else {
530                 final long ev = events.events(i);
531 
532                 DefaultEpollIoRegistration registration = registrations.get(fd);
533                 if (registration != null) {
534                     registration.handle(ev);
535                 } else {
536                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
537                     try {
538                         Native.epollCtlDel(epollFd.intValue(), fd);
539                     } catch (IOException ignore) {
540                         // This can happen but is nothing we need to worry about as we only try to delete
541                         // the fd from the epoll set as we not found it in our mappings. So this call to
542                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
543                         // deleted before or the file descriptor was closed before.
544                     }
545                 }
546             }
547         }
548         return timerFired;
549     }
550 
    /**
     * Closes the event, timer and epoll file descriptors, in that order. While a wakeup write is still
     * expected ({@code pendingWakeup}), it first waits in one-second slices for the eventFd event so the
     * write does not hit a closed descriptor. A failure to close one descriptor is logged at warn level
     * and does not prevent closing the others.
     * <p>
     * This method is intended for use by process checkpoint/restore
     * integration, such as OpenJDK CRaC.
     * It's up to the caller to ensure that there is no concurrent use
     * of the FDs while these are closed, e.g. by blocking the executor.
     */
    @UnstableApi
    public void closeFileDescriptors() {
        // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
        while (pendingWakeup) {
            try {
                int count = epollWaitTimeboxed();
                if (count == 0) {
                    // We timed-out so assume that the write we're expecting isn't coming
                    break;
                }
                // Only the eventFd entry matters here; events for other fds are not dispatched.
                for (int i = 0; i < count; i++) {
                    if (events.fd(i) == eventFd.intValue()) {
                        pendingWakeup = false;
                        break;
                    }
                }
            } catch (IOException ignore) {
                // ignore
            }
        }
        try {
            eventFd.close();
        } catch (IOException e) {
            logger.warn("Failed to close the event fd.", e);
        }
        try {
            timerFd.close();
        } catch (IOException e) {
            logger.warn("Failed to close the timer fd.", e);
        }

        try {
            epollFd.close();
        } catch (IOException e) {
            logger.warn("Failed to close the epoll fd.", e);
        }
    }
594 }