View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.EventLoopTaskQueueFactory;
23  import io.netty.channel.SelectStrategy;
24  import io.netty.channel.SingleThreadEventLoop;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.concurrent.RejectedExecutionHandler;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.ReflectionUtil;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.lang.reflect.Field;
36  import java.nio.channels.CancelledKeyException;
37  import java.nio.channels.SelectableChannel;
38  import java.nio.channels.Selector;
39  import java.nio.channels.SelectionKey;
40  
41  import java.nio.channels.spi.SelectorProvider;
42  import java.security.AccessController;
43  import java.security.PrivilegedAction;
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Iterator;
47  import java.util.Queue;
48  import java.util.Set;
49  import java.util.concurrent.Executor;
50  import java.util.concurrent.atomic.AtomicLong;
51  
52  /**
53   * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
54   * {@link Selector} and so does the multi-plexing of these in the event loop.
55   *
56   */
57  public final class NioEventLoop extends SingleThreadEventLoop {
58  
59      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
60  
61      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
62  
63      private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
64              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
65  
66      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
67      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
68  
69      private final IntSupplier selectNowSupplier = new IntSupplier() {
70          @Override
71          public int get() throws Exception {
72              return selectNow();
73          }
74      };
75  
76      // Workaround for JDK NIO bug.
77      //
78      // See:
79      // - https://bugs.openjdk.java.net/browse/JDK-6427854 for first few dev (unreleased) builds of JDK 7
80      // - https://bugs.openjdk.java.net/browse/JDK-6527572 for JDK prior to 5.0u15-rev and 6u10
81      // - https://github.com/netty/netty/issues/203
82      static {
83          if (PlatformDependent.javaVersion() < 7) {
84              final String key = "sun.nio.ch.bugLevel";
85              final String bugLevel = SystemPropertyUtil.get(key);
86              if (bugLevel == null) {
87                  try {
88                      AccessController.doPrivileged(new PrivilegedAction<Void>() {
89                          @Override
90                          public Void run() {
91                              System.setProperty(key, "");
92                              return null;
93                          }
94                      });
95                  } catch (final SecurityException e) {
96                      logger.debug("Unable to get/set System Property: " + key, e);
97                  }
98              }
99          }
100 
101         int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
102         if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
103             selectorAutoRebuildThreshold = 0;
104         }
105 
106         SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
107 
108         if (logger.isDebugEnabled()) {
109             logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
110             logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
111         }
112     }
113 
114     /**
115      * The NIO {@link Selector}.
116      */
117     private Selector selector;
118     private Selector unwrappedSelector;
119     private SelectedSelectionKeySet selectedKeys;
120 
121     private final SelectorProvider provider;
122 
123     private static final long AWAKE = -1L;
124     private static final long NONE = Long.MAX_VALUE;
125 
126     // nextWakeupNanos is:
127     //    AWAKE            when EL is awake
128     //    NONE             when EL is waiting with no wakeup scheduled
129     //    other value T    when EL is waiting with wakeup scheduled at time T
130     private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
131 
132     private final SelectStrategy selectStrategy;
133 
134     private volatile int ioRatio = 50;
135     private int cancelledKeys;
136     private boolean needsToSelectAgain;
137 
138     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
139                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
140                  EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
141         super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
142                 rejectedExecutionHandler);
143         this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
144         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
145         final SelectorTuple selectorTuple = openSelector();
146         this.selector = selectorTuple.selector;
147         this.unwrappedSelector = selectorTuple.unwrappedSelector;
148     }
149 
150     private static Queue<Runnable> newTaskQueue(
151             EventLoopTaskQueueFactory queueFactory) {
152         if (queueFactory == null) {
153             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
154         }
155         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
156     }
157 
158     private static final class SelectorTuple {
159         final Selector unwrappedSelector;
160         final Selector selector;
161 
162         SelectorTuple(Selector unwrappedSelector) {
163             this.unwrappedSelector = unwrappedSelector;
164             this.selector = unwrappedSelector;
165         }
166 
167         SelectorTuple(Selector unwrappedSelector, Selector selector) {
168             this.unwrappedSelector = unwrappedSelector;
169             this.selector = selector;
170         }
171     }
172 
173     private SelectorTuple openSelector() {
174         final Selector unwrappedSelector;
175         try {
176             unwrappedSelector = provider.openSelector();
177         } catch (IOException e) {
178             throw new ChannelException("failed to open a new selector", e);
179         }
180 
181         if (DISABLE_KEY_SET_OPTIMIZATION) {
182             return new SelectorTuple(unwrappedSelector);
183         }
184 
185         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
186             @Override
187             public Object run() {
188                 try {
189                     return Class.forName(
190                             "sun.nio.ch.SelectorImpl",
191                             false,
192                             PlatformDependent.getSystemClassLoader());
193                 } catch (Throwable cause) {
194                     return cause;
195                 }
196             }
197         });
198 
199         if (!(maybeSelectorImplClass instanceof Class) ||
200             // ensure the current selector implementation is what we can instrument.
201             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
202             if (maybeSelectorImplClass instanceof Throwable) {
203                 Throwable t = (Throwable) maybeSelectorImplClass;
204                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
205             }
206             return new SelectorTuple(unwrappedSelector);
207         }
208 
209         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
210         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
211 
212         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
213             @Override
214             public Object run() {
215                 try {
216                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
217                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
218 
219                     if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
220                         // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
221                         // This allows us to also do this in Java9+ without any extra flags.
222                         long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
223                         long publicSelectedKeysFieldOffset =
224                                 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
225 
226                         if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
227                             PlatformDependent.putObject(
228                                     unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
229                             PlatformDependent.putObject(
230                                     unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
231                             return null;
232                         }
233                         // We could not retrieve the offset, lets try reflection as last-resort.
234                     }
235 
236                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
237                     if (cause != null) {
238                         return cause;
239                     }
240                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
241                     if (cause != null) {
242                         return cause;
243                     }
244 
245                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
246                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
247                     return null;
248                 } catch (NoSuchFieldException e) {
249                     return e;
250                 } catch (IllegalAccessException e) {
251                     return e;
252                 }
253             }
254         });
255 
256         if (maybeException instanceof Exception) {
257             selectedKeys = null;
258             Exception e = (Exception) maybeException;
259             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
260             return new SelectorTuple(unwrappedSelector);
261         }
262         selectedKeys = selectedKeySet;
263         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
264         return new SelectorTuple(unwrappedSelector,
265                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
266     }
267 
268     /**
269      * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.
270      */
271     public SelectorProvider selectorProvider() {
272         return provider;
273     }
274 
275     @Override
276     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
277         return newTaskQueue0(maxPendingTasks);
278     }
279 
280     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
281         // This event loop never calls takeTask()
282         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
283                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
284     }
285 
286     /**
287      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
288      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
289      * be executed by this event loop when the {@link SelectableChannel} is ready.
290      */
291     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
292         ObjectUtil.checkNotNull(ch, "ch");
293         if (interestOps == 0) {
294             throw new IllegalArgumentException("interestOps must be non-zero.");
295         }
296         if ((interestOps & ~ch.validOps()) != 0) {
297             throw new IllegalArgumentException(
298                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
299         }
300         ObjectUtil.checkNotNull(task, "task");
301 
302         if (isShutdown()) {
303             throw new IllegalStateException("event loop shut down");
304         }
305 
306         if (inEventLoop()) {
307             register0(ch, interestOps, task);
308         } else {
309             try {
310                 // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
311                 // may block for a long time while trying to obtain an internal lock that may be hold while selecting.
312                 submit(new Runnable() {
313                     @Override
314                     public void run() {
315                         register0(ch, interestOps, task);
316                     }
317                 }).sync();
318             } catch (InterruptedException ignore) {
319                 // Even if interrupted we did schedule it so just mark the Thread as interrupted.
320                 Thread.currentThread().interrupt();
321             }
322         }
323     }
324 
325     private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
326         try {
327             ch.register(unwrappedSelector, interestOps, task);
328         } catch (Exception e) {
329             throw new EventLoopException("failed to register a channel", e);
330         }
331     }
332 
333     /**
334      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
335      */
336     public int getIoRatio() {
337         return ioRatio;
338     }
339 
340     /**
341      * Sets the percentage of the desired amount of time spent for I/O in the event loop. Value range from 1-100.
342      * The default value is {@code 50}, which means the event loop will try to spend the same amount of time for I/O
343      * as for non-I/O tasks. The lower the number the more time can be spent on non-I/O tasks. If value set to
344      * {@code 100}, this feature will be disabled and event loop will not attempt to balance I/O and non-I/O tasks.
345      */
346     public void setIoRatio(int ioRatio) {
347         if (ioRatio <= 0 || ioRatio > 100) {
348             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
349         }
350         this.ioRatio = ioRatio;
351     }
352 
353     /**
354      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
355      * around the infamous epoll 100% CPU bug.
356      */
357     public void rebuildSelector() {
358         if (!inEventLoop()) {
359             execute(new Runnable() {
360                 @Override
361                 public void run() {
362                     rebuildSelector0();
363                 }
364             });
365             return;
366         }
367         rebuildSelector0();
368     }
369 
370     @Override
371     public int registeredChannels() {
372         return selector.keys().size() - cancelledKeys;
373     }
374 
375     private void rebuildSelector0() {
376         final Selector oldSelector = selector;
377         final SelectorTuple newSelectorTuple;
378 
379         if (oldSelector == null) {
380             return;
381         }
382 
383         try {
384             newSelectorTuple = openSelector();
385         } catch (Exception e) {
386             logger.warn("Failed to create a new Selector.", e);
387             return;
388         }
389 
390         // Register all channels to the new Selector.
391         int nChannels = 0;
392         for (SelectionKey key: oldSelector.keys()) {
393             Object a = key.attachment();
394             try {
395                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
396                     continue;
397                 }
398 
399                 int interestOps = key.interestOps();
400                 key.cancel();
401                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
402                 if (a instanceof AbstractNioChannel) {
403                     // Update SelectionKey
404                     ((AbstractNioChannel) a).selectionKey = newKey;
405                 }
406                 nChannels ++;
407             } catch (Exception e) {
408                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
409                 if (a instanceof AbstractNioChannel) {
410                     AbstractNioChannel ch = (AbstractNioChannel) a;
411                     ch.unsafe().close(ch.unsafe().voidPromise());
412                 } else {
413                     @SuppressWarnings("unchecked")
414                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
415                     invokeChannelUnregistered(task, key, e);
416                 }
417             }
418         }
419 
420         selector = newSelectorTuple.selector;
421         unwrappedSelector = newSelectorTuple.unwrappedSelector;
422 
423         try {
424             // time to close the old selector as everything else is registered to the new one
425             oldSelector.close();
426         } catch (Throwable t) {
427             if (logger.isWarnEnabled()) {
428                 logger.warn("Failed to close the old Selector.", t);
429             }
430         }
431 
432         if (logger.isInfoEnabled()) {
433             logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
434         }
435     }
436 
437     @Override
438     protected void run() {
439         int selectCnt = 0;
440         for (;;) {
441             try {
442                 int strategy;
443                 try {
444                     strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
445                     switch (strategy) {
446                     case SelectStrategy.CONTINUE:
447                         continue;
448 
449                     case SelectStrategy.BUSY_WAIT:
450                         // fall-through to SELECT since the busy-wait is not supported with NIO
451 
452                     case SelectStrategy.SELECT:
453                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
454                         if (curDeadlineNanos == -1L) {
455                             curDeadlineNanos = NONE; // nothing on the calendar
456                         }
457                         nextWakeupNanos.set(curDeadlineNanos);
458                         try {
459                             if (!hasTasks()) {
460                                 strategy = select(curDeadlineNanos);
461                             }
462                         } finally {
463                             // This update is just to help block unnecessary selector wakeups
464                             // so use of lazySet is ok (no race condition)
465                             nextWakeupNanos.lazySet(AWAKE);
466                         }
467                         // fall through
468                     default:
469                     }
470                 } catch (IOException e) {
471                     // If we receive an IOException here its because the Selector is messed up. Let's rebuild
472                     // the selector and retry. https://github.com/netty/netty/issues/8566
473                     rebuildSelector0();
474                     selectCnt = 0;
475                     handleLoopException(e);
476                     continue;
477                 }
478 
479                 selectCnt++;
480                 cancelledKeys = 0;
481                 needsToSelectAgain = false;
482                 final int ioRatio = this.ioRatio;
483                 boolean ranTasks;
484                 if (ioRatio == 100) {
485                     try {
486                         if (strategy > 0) {
487                             processSelectedKeys();
488                         }
489                     } finally {
490                         // Ensure we always run tasks.
491                         ranTasks = runAllTasks();
492                     }
493                 } else if (strategy > 0) {
494                     final long ioStartTime = System.nanoTime();
495                     try {
496                         processSelectedKeys();
497                     } finally {
498                         // Ensure we always run tasks.
499                         final long ioTime = System.nanoTime() - ioStartTime;
500                         ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
501                     }
502                 } else {
503                     ranTasks = runAllTasks(0); // This will run the minimum number of tasks
504                 }
505 
506                 if (ranTasks || strategy > 0) {
507                     if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
508                         logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
509                                 selectCnt - 1, selector);
510                     }
511                     selectCnt = 0;
512                 } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
513                     selectCnt = 0;
514                 }
515             } catch (CancelledKeyException e) {
516                 // Harmless exception - log anyway
517                 if (logger.isDebugEnabled()) {
518                     logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
519                             selector, e);
520                 }
521             } catch (Error e) {
522                 throw e;
523             } catch (Throwable t) {
524                 handleLoopException(t);
525             } finally {
526                 // Always handle shutdown even if the loop processing threw an exception.
527                 try {
528                     if (isShuttingDown()) {
529                         closeAll();
530                         if (confirmShutdown()) {
531                             return;
532                         }
533                     }
534                 } catch (Error e) {
535                     throw e;
536                 } catch (Throwable t) {
537                     handleLoopException(t);
538                 }
539             }
540         }
541     }
542 
543     // returns true if selectCnt should be reset
544     private boolean unexpectedSelectorWakeup(int selectCnt) {
545         if (Thread.interrupted()) {
546             // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
547             // As this is most likely a bug in the handler of the user or it's client library we will
548             // also log it.
549             //
550             // See https://github.com/netty/netty/issues/2426
551             if (logger.isDebugEnabled()) {
552                 logger.debug("Selector.select() returned prematurely because " +
553                         "Thread.currentThread().interrupt() was called. Use " +
554                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
555             }
556             return true;
557         }
558         if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
559                 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
560             // The selector returned prematurely many times in a row.
561             // Rebuild the selector to work around the problem.
562             logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
563                     selectCnt, selector);
564             rebuildSelector();
565             return true;
566         }
567         return false;
568     }
569 
570     private static void handleLoopException(Throwable t) {
571         logger.warn("Unexpected exception in the selector loop.", t);
572 
573         // Prevent possible consecutive immediate failures that lead to
574         // excessive CPU consumption.
575         try {
576             Thread.sleep(1000);
577         } catch (InterruptedException e) {
578             // Ignore.
579         }
580     }
581 
582     private void processSelectedKeys() {
583         if (selectedKeys != null) {
584             processSelectedKeysOptimized();
585         } else {
586             processSelectedKeysPlain(selector.selectedKeys());
587         }
588     }
589 
590     @Override
591     protected void cleanup() {
592         try {
593             selector.close();
594         } catch (IOException e) {
595             logger.warn("Failed to close a selector.", e);
596         }
597     }
598 
599     void cancel(SelectionKey key) {
600         key.cancel();
601         cancelledKeys ++;
602         if (cancelledKeys >= CLEANUP_INTERVAL) {
603             cancelledKeys = 0;
604             needsToSelectAgain = true;
605         }
606     }
607 
608     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
609         // check if the set is empty and if so just return to not create garbage by
610         // creating a new Iterator every time even if there is nothing to process.
611         // See https://github.com/netty/netty/issues/597
612         if (selectedKeys.isEmpty()) {
613             return;
614         }
615 
616         Iterator<SelectionKey> i = selectedKeys.iterator();
617         for (;;) {
618             final SelectionKey k = i.next();
619             final Object a = k.attachment();
620             i.remove();
621 
622             if (a instanceof AbstractNioChannel) {
623                 processSelectedKey(k, (AbstractNioChannel) a);
624             } else {
625                 @SuppressWarnings("unchecked")
626                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
627                 processSelectedKey(k, task);
628             }
629 
630             if (!i.hasNext()) {
631                 break;
632             }
633 
634             if (needsToSelectAgain) {
635                 selectAgain();
636                 selectedKeys = selector.selectedKeys();
637 
638                 // Create the iterator again to avoid ConcurrentModificationException
639                 if (selectedKeys.isEmpty()) {
640                     break;
641                 } else {
642                     i = selectedKeys.iterator();
643                 }
644             }
645         }
646     }
647 
648     private void processSelectedKeysOptimized() {
649         for (int i = 0; i < selectedKeys.size; ++i) {
650             final SelectionKey k = selectedKeys.keys[i];
651             // null out entry in the array to allow to have it GC'ed once the Channel close
652             // See https://github.com/netty/netty/issues/2363
653             selectedKeys.keys[i] = null;
654 
655             final Object a = k.attachment();
656 
657             if (a instanceof AbstractNioChannel) {
658                 processSelectedKey(k, (AbstractNioChannel) a);
659             } else {
660                 @SuppressWarnings("unchecked")
661                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
662                 processSelectedKey(k, task);
663             }
664 
665             if (needsToSelectAgain) {
666                 // null out entries in the array to allow to have it GC'ed once the Channel close
667                 // See https://github.com/netty/netty/issues/2363
668                 selectedKeys.reset(i + 1);
669 
670                 selectAgain();
671                 i = -1;
672             }
673         }
674     }
675 
676     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
677         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
678         if (!k.isValid()) {
679             final EventLoop eventLoop;
680             try {
681                 eventLoop = ch.eventLoop();
682             } catch (Throwable ignored) {
683                 // If the channel implementation throws an exception because there is no event loop, we ignore this
684                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
685                 // to close ch.
686                 return;
687             }
688             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
689             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
690             // still healthy and should not be closed.
691             // See https://github.com/netty/netty/issues/5125
692             if (eventLoop == this) {
693                 // close the channel if the key is not valid anymore
694                 unsafe.close(unsafe.voidPromise());
695             }
696             return;
697         }
698 
699         try {
700             int readyOps = k.readyOps();
701             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
702             // the NIO JDK channel implementation may throw a NotYetConnectedException.
703             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
704                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
705                 // See https://github.com/netty/netty/issues/924
706                 int ops = k.interestOps();
707                 ops &= ~SelectionKey.OP_CONNECT;
708                 k.interestOps(ops);
709 
710                 unsafe.finishConnect();
711             }
712 
713             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
714             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
715                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
716                 ch.unsafe().forceFlush();
717             }
718 
719             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
720             // to a spin loop
721             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
722                 unsafe.read();
723             }
724         } catch (CancelledKeyException ignored) {
725             unsafe.close(unsafe.voidPromise());
726         }
727     }
728 
729     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
730         int state = 0;
731         try {
732             task.channelReady(k.channel(), k);
733             state = 1;
734         } catch (Exception e) {
735             k.cancel();
736             invokeChannelUnregistered(task, k, e);
737             state = 2;
738         } finally {
739             switch (state) {
740             case 0:
741                 k.cancel();
742                 invokeChannelUnregistered(task, k, null);
743                 break;
744             case 1:
745                 if (!k.isValid()) { // Cancelled by channelReady()
746                     invokeChannelUnregistered(task, k, null);
747                 }
748                 break;
749             default:
750                  break;
751             }
752         }
753     }
754 
755     private void closeAll() {
756         selectAgain();
757         Set<SelectionKey> keys = selector.keys();
758         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
759         for (SelectionKey k: keys) {
760             Object a = k.attachment();
761             if (a instanceof AbstractNioChannel) {
762                 channels.add((AbstractNioChannel) a);
763             } else {
764                 k.cancel();
765                 @SuppressWarnings("unchecked")
766                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
767                 invokeChannelUnregistered(task, k, null);
768             }
769         }
770 
771         for (AbstractNioChannel ch: channels) {
772             ch.unsafe().close(ch.unsafe().voidPromise());
773         }
774     }
775 
776     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
777         try {
778             task.channelUnregistered(k.channel(), cause);
779         } catch (Exception e) {
780             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
781         }
782     }
783 
784     @Override
785     protected void wakeup(boolean inEventLoop) {
786         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
787             selector.wakeup();
788         }
789     }
790 
791     @Override
792     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
793         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
794         return deadlineNanos < nextWakeupNanos.get();
795     }
796 
797     @Override
798     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
799         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
800         return deadlineNanos < nextWakeupNanos.get();
801     }
802 
803     Selector unwrappedSelector() {
804         return unwrappedSelector;
805     }
806 
807     int selectNow() throws IOException {
808         return selector.selectNow();
809     }
810 
811     private int select(long deadlineNanos) throws IOException {
812         if (deadlineNanos == NONE) {
813             return selector.select();
814         }
815         // Timeout will only be 0 if deadline is within 5 microsecs
816         long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
817         return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
818     }
819 
820     private void selectAgain() {
821         needsToSelectAgain = false;
822         try {
823             selector.selectNow();
824         } catch (Throwable t) {
825             logger.warn("Failed to update SelectionKeys.", t);
826         }
827     }
828 }