View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.util.IntSupplier;
25  import io.netty.util.concurrent.RejectedExecutionHandler;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.ReflectionUtil;
28  import io.netty.util.internal.SystemPropertyUtil;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.io.IOException;
33  import java.lang.reflect.Field;
34  import java.nio.channels.CancelledKeyException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.Selector;
38  import java.nio.channels.spi.SelectorProvider;
39  import java.security.AccessController;
40  import java.security.PrivilegedAction;
41  import java.util.ArrayList;
42  import java.util.Collection;
43  import java.util.Iterator;
44  import java.util.Queue;
45  import java.util.Set;
46  import java.util.concurrent.Executor;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  
50  /**
51   * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
52   * {@link Selector} and so does the multi-plexing of these in the event loop.
53   *
54   */
55  public final class NioEventLoop extends SingleThreadEventLoop {
56  
57      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
58  
59      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
60  
61      private static final boolean DISABLE_KEYSET_OPTIMIZATION =
62              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
63  
64      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
65      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
66  
67      private final IntSupplier selectNowSupplier = new IntSupplier() {
68          @Override
69          public int get() throws Exception {
70              return selectNow();
71          }
72      };
73  
74      // Workaround for JDK NIO bug.
75      //
76      // See:
77      // - http://bugs.sun.com/view_bug.do?bug_id=6427854
78      // - https://github.com/netty/netty/issues/203
79      static {
80          final String key = "sun.nio.ch.bugLevel";
81          final String buglevel = SystemPropertyUtil.get(key);
82          if (buglevel == null) {
83              try {
84                  AccessController.doPrivileged(new PrivilegedAction<Void>() {
85                      @Override
86                      public Void run() {
87                          System.setProperty(key, "");
88                          return null;
89                      }
90                  });
91              } catch (final SecurityException e) {
92                  logger.debug("Unable to get/set System Property: " + key, e);
93              }
94          }
95  
96          int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
97          if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
98              selectorAutoRebuildThreshold = 0;
99          }
100 
101         SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
102 
103         if (logger.isDebugEnabled()) {
104             logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
105             logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
106         }
107     }
108 
109     /**
110      * The NIO {@link Selector}.
111      */
112     private Selector selector;
113     private Selector unwrappedSelector;
114     private SelectedSelectionKeySet selectedKeys;
115 
116     private final SelectorProvider provider;
117 
118     /**
119      * Boolean that controls determines if a blocked Selector.select should
120      * break out of its selection process. In our case we use a timeout for
121      * the select method and the select method will block for that time unless
122      * waken up.
123      */
124     private final AtomicBoolean wakenUp = new AtomicBoolean();
125 
126     private final SelectStrategy selectStrategy;
127 
128     private volatile int ioRatio = 50;
129     private int cancelledKeys;
130     private boolean needsToSelectAgain;
131 
132     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
133                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
134         super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
135         if (selectorProvider == null) {
136             throw new NullPointerException("selectorProvider");
137         }
138         if (strategy == null) {
139             throw new NullPointerException("selectStrategy");
140         }
141         provider = selectorProvider;
142         final SelectorTuple selectorTuple = openSelector();
143         selector = selectorTuple.selector;
144         unwrappedSelector = selectorTuple.unwrappedSelector;
145         selectStrategy = strategy;
146     }
147 
148     private static final class SelectorTuple {
149         final Selector unwrappedSelector;
150         final Selector selector;
151 
152         SelectorTuple(Selector unwrappedSelector) {
153             this.unwrappedSelector = unwrappedSelector;
154             this.selector = unwrappedSelector;
155         }
156 
157         SelectorTuple(Selector unwrappedSelector, Selector selector) {
158             this.unwrappedSelector = unwrappedSelector;
159             this.selector = selector;
160         }
161     }
162 
163     private SelectorTuple openSelector() {
164         final Selector unwrappedSelector;
165         try {
166             unwrappedSelector = provider.openSelector();
167         } catch (IOException e) {
168             throw new ChannelException("failed to open a new selector", e);
169         }
170 
171         if (DISABLE_KEYSET_OPTIMIZATION) {
172             return new SelectorTuple(unwrappedSelector);
173         }
174 
175         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
176             @Override
177             public Object run() {
178                 try {
179                     return Class.forName(
180                             "sun.nio.ch.SelectorImpl",
181                             false,
182                             PlatformDependent.getSystemClassLoader());
183                 } catch (Throwable cause) {
184                     return cause;
185                 }
186             }
187         });
188 
189         if (!(maybeSelectorImplClass instanceof Class) ||
190             // ensure the current selector implementation is what we can instrument.
191             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
192             if (maybeSelectorImplClass instanceof Throwable) {
193                 Throwable t = (Throwable) maybeSelectorImplClass;
194                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
195             }
196             return new SelectorTuple(unwrappedSelector);
197         }
198 
199         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
200         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
201 
202         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
203             @Override
204             public Object run() {
205                 try {
206                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
207                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
208 
209                     if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
210                         // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
211                         // This allows us to also do this in Java9+ without any extra flags.
212                         long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
213                         long publicSelectedKeysFieldOffset =
214                                 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
215 
216                         if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
217                             PlatformDependent.putObject(
218                                     unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
219                             PlatformDependent.putObject(
220                                     unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
221                             return null;
222                         }
223                         // We could not retrieve the offset, lets try reflection as last-resort.
224                     }
225 
226                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
227                     if (cause != null) {
228                         return cause;
229                     }
230                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
231                     if (cause != null) {
232                         return cause;
233                     }
234 
235                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
236                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
237                     return null;
238                 } catch (NoSuchFieldException e) {
239                     return e;
240                 } catch (IllegalAccessException e) {
241                     return e;
242                 }
243             }
244         });
245 
246         if (maybeException instanceof Exception) {
247             selectedKeys = null;
248             Exception e = (Exception) maybeException;
249             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
250             return new SelectorTuple(unwrappedSelector);
251         }
252         selectedKeys = selectedKeySet;
253         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
254         return new SelectorTuple(unwrappedSelector,
255                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
256     }
257 
258     /**
259      * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.
260      */
261     public SelectorProvider selectorProvider() {
262         return provider;
263     }
264 
265     @Override
266     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
267         // This event loop never calls takeTask()
268         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
269                                                     : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
270     }
271 
272     /**
273      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
274      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
275      * be executed by this event loop when the {@link SelectableChannel} is ready.
276      */
277     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
278         if (ch == null) {
279             throw new NullPointerException("ch");
280         }
281         if (interestOps == 0) {
282             throw new IllegalArgumentException("interestOps must be non-zero.");
283         }
284         if ((interestOps & ~ch.validOps()) != 0) {
285             throw new IllegalArgumentException(
286                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
287         }
288         if (task == null) {
289             throw new NullPointerException("task");
290         }
291 
292         if (isShutdown()) {
293             throw new IllegalStateException("event loop shut down");
294         }
295 
296         try {
297             ch.register(selector, interestOps, task);
298         } catch (Exception e) {
299             throw new EventLoopException("failed to register a channel", e);
300         }
301     }
302 
303     /**
304      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
305      */
306     public int getIoRatio() {
307         return ioRatio;
308     }
309 
310     /**
311      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
312      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
313      */
314     public void setIoRatio(int ioRatio) {
315         if (ioRatio <= 0 || ioRatio > 100) {
316             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
317         }
318         this.ioRatio = ioRatio;
319     }
320 
321     /**
322      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
323      * around the infamous epoll 100% CPU bug.
324      */
325     public void rebuildSelector() {
326         if (!inEventLoop()) {
327             execute(new Runnable() {
328                 @Override
329                 public void run() {
330                     rebuildSelector0();
331                 }
332             });
333             return;
334         }
335         rebuildSelector0();
336     }
337 
338     private void rebuildSelector0() {
339         final Selector oldSelector = selector;
340         final SelectorTuple newSelectorTuple;
341 
342         if (oldSelector == null) {
343             return;
344         }
345 
346         try {
347             newSelectorTuple = openSelector();
348         } catch (Exception e) {
349             logger.warn("Failed to create a new Selector.", e);
350             return;
351         }
352 
353         // Register all channels to the new Selector.
354         int nChannels = 0;
355         for (SelectionKey key: oldSelector.keys()) {
356             Object a = key.attachment();
357             try {
358                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
359                     continue;
360                 }
361 
362                 int interestOps = key.interestOps();
363                 key.cancel();
364                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
365                 if (a instanceof AbstractNioChannel) {
366                     // Update SelectionKey
367                     ((AbstractNioChannel) a).selectionKey = newKey;
368                 }
369                 nChannels ++;
370             } catch (Exception e) {
371                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
372                 if (a instanceof AbstractNioChannel) {
373                     AbstractNioChannel ch = (AbstractNioChannel) a;
374                     ch.unsafe().close(ch.unsafe().voidPromise());
375                 } else {
376                     @SuppressWarnings("unchecked")
377                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
378                     invokeChannelUnregistered(task, key, e);
379                 }
380             }
381         }
382 
383         selector = newSelectorTuple.selector;
384         unwrappedSelector = newSelectorTuple.unwrappedSelector;
385 
386         try {
387             // time to close the old selector as everything else is registered to the new one
388             oldSelector.close();
389         } catch (Throwable t) {
390             if (logger.isWarnEnabled()) {
391                 logger.warn("Failed to close the old Selector.", t);
392             }
393         }
394 
395         if (logger.isInfoEnabled()) {
396             logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
397         }
398     }
399 
400     @Override
401     protected void run() {
402         for (;;) {
403             try {
404                 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
405                     case SelectStrategy.CONTINUE:
406                         continue;
407                     case SelectStrategy.SELECT:
408                         select(wakenUp.getAndSet(false));
409 
410                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
411                         // before calling 'selector.wakeup()' to reduce the wake-up
412                         // overhead. (Selector.wakeup() is an expensive operation.)
413                         //
414                         // However, there is a race condition in this approach.
415                         // The race condition is triggered when 'wakenUp' is set to
416                         // true too early.
417                         //
418                         // 'wakenUp' is set to true too early if:
419                         // 1) Selector is waken up between 'wakenUp.set(false)' and
420                         //    'selector.select(...)'. (BAD)
421                         // 2) Selector is waken up between 'selector.select(...)' and
422                         //    'if (wakenUp.get()) { ... }'. (OK)
423                         //
424                         // In the first case, 'wakenUp' is set to true and the
425                         // following 'selector.select(...)' will wake up immediately.
426                         // Until 'wakenUp' is set to false again in the next round,
427                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
428                         // any attempt to wake up the Selector will fail, too, causing
429                         // the following 'selector.select(...)' call to block
430                         // unnecessarily.
431                         //
432                         // To fix this problem, we wake up the selector again if wakenUp
433                         // is true immediately after selector.select(...).
434                         // It is inefficient in that it wakes up the selector for both
435                         // the first case (BAD - wake-up required) and the second case
436                         // (OK - no wake-up required).
437 
438                         if (wakenUp.get()) {
439                             selector.wakeup();
440                         }
441                         // fall through
442                     default:
443                 }
444 
445                 cancelledKeys = 0;
446                 needsToSelectAgain = false;
447                 final int ioRatio = this.ioRatio;
448                 if (ioRatio == 100) {
449                     try {
450                         processSelectedKeys();
451                     } finally {
452                         // Ensure we always run tasks.
453                         runAllTasks();
454                     }
455                 } else {
456                     final long ioStartTime = System.nanoTime();
457                     try {
458                         processSelectedKeys();
459                     } finally {
460                         // Ensure we always run tasks.
461                         final long ioTime = System.nanoTime() - ioStartTime;
462                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
463                     }
464                 }
465             } catch (Throwable t) {
466                 handleLoopException(t);
467             }
468             // Always handle shutdown even if the loop processing threw an exception.
469             try {
470                 if (isShuttingDown()) {
471                     closeAll();
472                     if (confirmShutdown()) {
473                         return;
474                     }
475                 }
476             } catch (Throwable t) {
477                 handleLoopException(t);
478             }
479         }
480     }
481 
482     private static void handleLoopException(Throwable t) {
483         logger.warn("Unexpected exception in the selector loop.", t);
484 
485         // Prevent possible consecutive immediate failures that lead to
486         // excessive CPU consumption.
487         try {
488             Thread.sleep(1000);
489         } catch (InterruptedException e) {
490             // Ignore.
491         }
492     }
493 
494     private void processSelectedKeys() {
495         if (selectedKeys != null) {
496             processSelectedKeysOptimized();
497         } else {
498             processSelectedKeysPlain(selector.selectedKeys());
499         }
500     }
501 
502     @Override
503     protected void cleanup() {
504         try {
505             selector.close();
506         } catch (IOException e) {
507             logger.warn("Failed to close a selector.", e);
508         }
509     }
510 
511     void cancel(SelectionKey key) {
512         key.cancel();
513         cancelledKeys ++;
514         if (cancelledKeys >= CLEANUP_INTERVAL) {
515             cancelledKeys = 0;
516             needsToSelectAgain = true;
517         }
518     }
519 
520     @Override
521     protected Runnable pollTask() {
522         Runnable task = super.pollTask();
523         if (needsToSelectAgain) {
524             selectAgain();
525         }
526         return task;
527     }
528 
529     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
530         // check if the set is empty and if so just return to not create garbage by
531         // creating a new Iterator every time even if there is nothing to process.
532         // See https://github.com/netty/netty/issues/597
533         if (selectedKeys.isEmpty()) {
534             return;
535         }
536 
537         Iterator<SelectionKey> i = selectedKeys.iterator();
538         for (;;) {
539             final SelectionKey k = i.next();
540             final Object a = k.attachment();
541             i.remove();
542 
543             if (a instanceof AbstractNioChannel) {
544                 processSelectedKey(k, (AbstractNioChannel) a);
545             } else {
546                 @SuppressWarnings("unchecked")
547                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
548                 processSelectedKey(k, task);
549             }
550 
551             if (!i.hasNext()) {
552                 break;
553             }
554 
555             if (needsToSelectAgain) {
556                 selectAgain();
557                 selectedKeys = selector.selectedKeys();
558 
559                 // Create the iterator again to avoid ConcurrentModificationException
560                 if (selectedKeys.isEmpty()) {
561                     break;
562                 } else {
563                     i = selectedKeys.iterator();
564                 }
565             }
566         }
567     }
568 
569     private void processSelectedKeysOptimized() {
570         for (int i = 0; i < selectedKeys.size; ++i) {
571             final SelectionKey k = selectedKeys.keys[i];
572             // null out entry in the array to allow to have it GC'ed once the Channel close
573             // See https://github.com/netty/netty/issues/2363
574             selectedKeys.keys[i] = null;
575 
576             final Object a = k.attachment();
577 
578             if (a instanceof AbstractNioChannel) {
579                 processSelectedKey(k, (AbstractNioChannel) a);
580             } else {
581                 @SuppressWarnings("unchecked")
582                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
583                 processSelectedKey(k, task);
584             }
585 
586             if (needsToSelectAgain) {
587                 // null out entries in the array to allow to have it GC'ed once the Channel close
588                 // See https://github.com/netty/netty/issues/2363
589                 selectedKeys.reset(i + 1);
590 
591                 selectAgain();
592                 i = -1;
593             }
594         }
595     }
596 
597     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
598         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
599         if (!k.isValid()) {
600             final EventLoop eventLoop;
601             try {
602                 eventLoop = ch.eventLoop();
603             } catch (Throwable ignored) {
604                 // If the channel implementation throws an exception because there is no event loop, we ignore this
605                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
606                 // to close ch.
607                 return;
608             }
609             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
610             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
611             // still healthy and should not be closed.
612             // See https://github.com/netty/netty/issues/5125
613             if (eventLoop != this || eventLoop == null) {
614                 return;
615             }
616             // close the channel if the key is not valid anymore
617             unsafe.close(unsafe.voidPromise());
618             return;
619         }
620 
621         try {
622             int readyOps = k.readyOps();
623             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
624             // the NIO JDK channel implementation may throw a NotYetConnectedException.
625             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
626                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
627                 // See https://github.com/netty/netty/issues/924
628                 int ops = k.interestOps();
629                 ops &= ~SelectionKey.OP_CONNECT;
630                 k.interestOps(ops);
631 
632                 unsafe.finishConnect();
633             }
634 
635             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
636             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
637                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
638                 ch.unsafe().forceFlush();
639             }
640 
641             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
642             // to a spin loop
643             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
644                 unsafe.read();
645             }
646         } catch (CancelledKeyException ignored) {
647             unsafe.close(unsafe.voidPromise());
648         }
649     }
650 
651     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
652         int state = 0;
653         try {
654             task.channelReady(k.channel(), k);
655             state = 1;
656         } catch (Exception e) {
657             k.cancel();
658             invokeChannelUnregistered(task, k, e);
659             state = 2;
660         } finally {
661             switch (state) {
662             case 0:
663                 k.cancel();
664                 invokeChannelUnregistered(task, k, null);
665                 break;
666             case 1:
667                 if (!k.isValid()) { // Cancelled by channelReady()
668                     invokeChannelUnregistered(task, k, null);
669                 }
670                 break;
671             }
672         }
673     }
674 
675     private void closeAll() {
676         selectAgain();
677         Set<SelectionKey> keys = selector.keys();
678         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
679         for (SelectionKey k: keys) {
680             Object a = k.attachment();
681             if (a instanceof AbstractNioChannel) {
682                 channels.add((AbstractNioChannel) a);
683             } else {
684                 k.cancel();
685                 @SuppressWarnings("unchecked")
686                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
687                 invokeChannelUnregistered(task, k, null);
688             }
689         }
690 
691         for (AbstractNioChannel ch: channels) {
692             ch.unsafe().close(ch.unsafe().voidPromise());
693         }
694     }
695 
696     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
697         try {
698             task.channelUnregistered(k.channel(), cause);
699         } catch (Exception e) {
700             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
701         }
702     }
703 
704     @Override
705     protected void wakeup(boolean inEventLoop) {
706         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
707             selector.wakeup();
708         }
709     }
710 
711     Selector unwrappedSelector() {
712         return unwrappedSelector;
713     }
714 
715     int selectNow() throws IOException {
716         try {
717             return selector.selectNow();
718         } finally {
719             // restore wakeup state if needed
720             if (wakenUp.get()) {
721                 selector.wakeup();
722             }
723         }
724     }
725 
726     private void select(boolean oldWakenUp) throws IOException {
727         Selector selector = this.selector;
728         try {
729             int selectCnt = 0;
730             long currentTimeNanos = System.nanoTime();
731             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
732 
733             for (;;) {
734                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
735                 if (timeoutMillis <= 0) {
736                     if (selectCnt == 0) {
737                         selector.selectNow();
738                         selectCnt = 1;
739                     }
740                     break;
741                 }
742 
743                 // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
744                 // Selector#wakeup. So we need to check task queue again before executing select operation.
745                 // If we don't, the task might be pended until select operation was timed out.
746                 // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
747                 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
748                     selector.selectNow();
749                     selectCnt = 1;
750                     break;
751                 }
752 
753                 int selectedKeys = selector.select(timeoutMillis);
754                 selectCnt ++;
755 
756                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
757                     // - Selected something,
758                     // - waken up by user, or
759                     // - the task queue has a pending task.
760                     // - a scheduled task is ready for processing
761                     break;
762                 }
763                 if (Thread.interrupted()) {
764                     // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
765                     // As this is most likely a bug in the handler of the user or it's client library we will
766                     // also log it.
767                     //
768                     // See https://github.com/netty/netty/issues/2426
769                     if (logger.isDebugEnabled()) {
770                         logger.debug("Selector.select() returned prematurely because " +
771                                 "Thread.currentThread().interrupt() was called. Use " +
772                                 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
773                     }
774                     selectCnt = 1;
775                     break;
776                 }
777 
778                 long time = System.nanoTime();
779                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
780                     // timeoutMillis elapsed without anything selected.
781                     selectCnt = 1;
782                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
783                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
784                     // The selector returned prematurely many times in a row.
785                     // Rebuild the selector to work around the problem.
786                     logger.warn(
787                             "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
788                             selectCnt, selector);
789 
790                     rebuildSelector();
791                     selector = this.selector;
792 
793                     // Select again to populate selectedKeys.
794                     selector.selectNow();
795                     selectCnt = 1;
796                     break;
797                 }
798 
799                 currentTimeNanos = time;
800             }
801 
802             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
803                 if (logger.isDebugEnabled()) {
804                     logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
805                             selectCnt - 1, selector);
806                 }
807             }
808         } catch (CancelledKeyException e) {
809             if (logger.isDebugEnabled()) {
810                 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
811                         selector, e);
812             }
813             // Harmless exception - log anyway
814         }
815     }
816 
817     private void selectAgain() {
818         needsToSelectAgain = false;
819         try {
820             selector.selectNow();
821         } catch (Throwable t) {
822             logger.warn("Failed to update SelectionKeys.", t);
823         }
824     }
825 }