View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.util.IntSupplier;
25  import io.netty.util.concurrent.RejectedExecutionHandler;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.ReflectionUtil;
28  import io.netty.util.internal.SystemPropertyUtil;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.io.IOException;
33  import java.lang.reflect.Field;
34  import java.nio.channels.CancelledKeyException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.Selector;
38  import java.nio.channels.spi.SelectorProvider;
39  import java.security.AccessController;
40  import java.security.PrivilegedAction;
41  import java.util.ArrayList;
42  import java.util.Collection;
43  import java.util.Iterator;
44  import java.util.Queue;
45  import java.util.Set;
46  import java.util.concurrent.Callable;
47  import java.util.concurrent.Executor;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicBoolean;
50  
51  /**
52   * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
53   * {@link Selector} and so does the multi-plexing of these in the event loop.
54   *
55   */
56  public final class NioEventLoop extends SingleThreadEventLoop {
57  
58      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
59  
60      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
61  
62      private static final boolean DISABLE_KEYSET_OPTIMIZATION =
63              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
64  
65      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
66      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
67  
68      private final IntSupplier selectNowSupplier = new IntSupplier() {
69          @Override
70          public int get() throws Exception {
71              return selectNow();
72          }
73      };
74      private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
75          @Override
76          public Integer call() throws Exception {
77              return NioEventLoop.super.pendingTasks();
78          }
79      };
80  
81      // Workaround for JDK NIO bug.
82      //
83      // See:
84      // - http://bugs.sun.com/view_bug.do?bug_id=6427854
85      // - https://github.com/netty/netty/issues/203
86      static {
87          final String key = "sun.nio.ch.bugLevel";
88          final String buglevel = SystemPropertyUtil.get(key);
89          if (buglevel == null) {
90              try {
91                  AccessController.doPrivileged(new PrivilegedAction<Void>() {
92                      @Override
93                      public Void run() {
94                          System.setProperty(key, "");
95                          return null;
96                      }
97                  });
98              } catch (final SecurityException e) {
99                  logger.debug("Unable to get/set System Property: " + key, e);
100             }
101         }
102 
103         int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
104         if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
105             selectorAutoRebuildThreshold = 0;
106         }
107 
108         SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
109 
110         if (logger.isDebugEnabled()) {
111             logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
112             logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
113         }
114     }
115 
116     /**
117      * The NIO {@link Selector}.
118      */
119     private Selector selector;
120     private Selector unwrappedSelector;
121     private SelectedSelectionKeySet selectedKeys;
122 
123     private final SelectorProvider provider;
124 
125     /**
126      * Boolean that controls determines if a blocked Selector.select should
127      * break out of its selection process. In our case we use a timeout for
128      * the select method and the select method will block for that time unless
129      * waken up.
130      */
131     private final AtomicBoolean wakenUp = new AtomicBoolean();
132 
133     private final SelectStrategy selectStrategy;
134 
135     private volatile int ioRatio = 50;
136     private int cancelledKeys;
137     private boolean needsToSelectAgain;
138 
139     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
140                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
141         super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
142         if (selectorProvider == null) {
143             throw new NullPointerException("selectorProvider");
144         }
145         if (strategy == null) {
146             throw new NullPointerException("selectStrategy");
147         }
148         provider = selectorProvider;
149         final SelectorTuple selectorTuple = openSelector();
150         selector = selectorTuple.selector;
151         unwrappedSelector = selectorTuple.unwrappedSelector;
152         selectStrategy = strategy;
153     }
154 
155     private static final class SelectorTuple {
156         final Selector unwrappedSelector;
157         final Selector selector;
158 
159         SelectorTuple(Selector unwrappedSelector) {
160             this.unwrappedSelector = unwrappedSelector;
161             this.selector = unwrappedSelector;
162         }
163 
164         SelectorTuple(Selector unwrappedSelector, Selector selector) {
165             this.unwrappedSelector = unwrappedSelector;
166             this.selector = selector;
167         }
168     }
169 
170     private SelectorTuple openSelector() {
171         final Selector unwrappedSelector;
172         try {
173             unwrappedSelector = provider.openSelector();
174         } catch (IOException e) {
175             throw new ChannelException("failed to open a new selector", e);
176         }
177 
178         if (DISABLE_KEYSET_OPTIMIZATION) {
179             return new SelectorTuple(unwrappedSelector);
180         }
181 
182         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
183             @Override
184             public Object run() {
185                 try {
186                     return Class.forName(
187                             "sun.nio.ch.SelectorImpl",
188                             false,
189                             PlatformDependent.getSystemClassLoader());
190                 } catch (Throwable cause) {
191                     return cause;
192                 }
193             }
194         });
195 
196         if (!(maybeSelectorImplClass instanceof Class) ||
197                 // ensure the current selector implementation is what we can instrument.
198                 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
199             if (maybeSelectorImplClass instanceof Throwable) {
200                 Throwable t = (Throwable) maybeSelectorImplClass;
201                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
202             }
203             return new SelectorTuple(unwrappedSelector);
204         }
205 
206         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
207         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
208 
209         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
210             @Override
211             public Object run() {
212                 try {
213                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
214                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
215 
216                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
217                     if (cause != null) {
218                         return cause;
219                     }
220                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
221                     if (cause != null) {
222                         return cause;
223                     }
224 
225                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
226                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
227                     return null;
228                 } catch (NoSuchFieldException e) {
229                     return e;
230                 } catch (IllegalAccessException e) {
231                     return e;
232                 }
233             }
234         });
235 
236         if (maybeException instanceof Exception) {
237             selectedKeys = null;
238             Exception e = (Exception) maybeException;
239             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
240             return new SelectorTuple(unwrappedSelector);
241         }
242         selectedKeys = selectedKeySet;
243         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
244         return new SelectorTuple(unwrappedSelector,
245                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
246     }
247 
248     /**
249      * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.
250      */
251     public SelectorProvider selectorProvider() {
252         return provider;
253     }
254 
255     @Override
256     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
257         // This event loop never calls takeTask()
258         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
259                                                     : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
260     }
261 
262     @Override
263     public int pendingTasks() {
264         // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
265         // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
266         // See https://github.com/netty/netty/issues/5297
267         if (inEventLoop()) {
268             return super.pendingTasks();
269         } else {
270             return submit(pendingTasksCallable).syncUninterruptibly().getNow();
271         }
272     }
273 
274     /**
275      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
276      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
277      * be executed by this event loop when the {@link SelectableChannel} is ready.
278      */
279     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
280         if (ch == null) {
281             throw new NullPointerException("ch");
282         }
283         if (interestOps == 0) {
284             throw new IllegalArgumentException("interestOps must be non-zero.");
285         }
286         if ((interestOps & ~ch.validOps()) != 0) {
287             throw new IllegalArgumentException(
288                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
289         }
290         if (task == null) {
291             throw new NullPointerException("task");
292         }
293 
294         if (isShutdown()) {
295             throw new IllegalStateException("event loop shut down");
296         }
297 
298         try {
299             ch.register(selector, interestOps, task);
300         } catch (Exception e) {
301             throw new EventLoopException("failed to register a channel", e);
302         }
303     }
304 
305     /**
306      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
307      */
308     public int getIoRatio() {
309         return ioRatio;
310     }
311 
312     /**
313      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
314      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
315      */
316     public void setIoRatio(int ioRatio) {
317         if (ioRatio <= 0 || ioRatio > 100) {
318             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
319         }
320         this.ioRatio = ioRatio;
321     }
322 
323     /**
324      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
325      * around the infamous epoll 100% CPU bug.
326      */
327     public void rebuildSelector() {
328         if (!inEventLoop()) {
329             execute(new Runnable() {
330                 @Override
331                 public void run() {
332                     rebuildSelector0();
333                 }
334             });
335             return;
336         }
337         rebuildSelector0();
338     }
339 
340     private void rebuildSelector0() {
341         final Selector oldSelector = selector;
342         final SelectorTuple newSelectorTuple;
343 
344         if (oldSelector == null) {
345             return;
346         }
347 
348         try {
349             newSelectorTuple = openSelector();
350         } catch (Exception e) {
351             logger.warn("Failed to create a new Selector.", e);
352             return;
353         }
354 
355         // Register all channels to the new Selector.
356         int nChannels = 0;
357         for (SelectionKey key: oldSelector.keys()) {
358             Object a = key.attachment();
359             try {
360                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
361                     continue;
362                 }
363 
364                 int interestOps = key.interestOps();
365                 key.cancel();
366                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
367                 if (a instanceof AbstractNioChannel) {
368                     // Update SelectionKey
369                     ((AbstractNioChannel) a).selectionKey = newKey;
370                 }
371                 nChannels ++;
372             } catch (Exception e) {
373                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
374                 if (a instanceof AbstractNioChannel) {
375                     AbstractNioChannel ch = (AbstractNioChannel) a;
376                     ch.unsafe().close(ch.unsafe().voidPromise());
377                 } else {
378                     @SuppressWarnings("unchecked")
379                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
380                     invokeChannelUnregistered(task, key, e);
381                 }
382             }
383         }
384 
385         selector = newSelectorTuple.selector;
386         unwrappedSelector = newSelectorTuple.unwrappedSelector;
387 
388         try {
389             // time to close the old selector as everything else is registered to the new one
390             oldSelector.close();
391         } catch (Throwable t) {
392             if (logger.isWarnEnabled()) {
393                 logger.warn("Failed to close the old Selector.", t);
394             }
395         }
396 
397         if (logger.isInfoEnabled()) {
398             logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
399         }
400     }
401 
402     @Override
403     protected void run() {
404         for (;;) {
405             try {
406                 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
407                     case SelectStrategy.CONTINUE:
408                         continue;
409                     case SelectStrategy.SELECT:
410                         select(wakenUp.getAndSet(false));
411 
412                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
413                         // before calling 'selector.wakeup()' to reduce the wake-up
414                         // overhead. (Selector.wakeup() is an expensive operation.)
415                         //
416                         // However, there is a race condition in this approach.
417                         // The race condition is triggered when 'wakenUp' is set to
418                         // true too early.
419                         //
420                         // 'wakenUp' is set to true too early if:
421                         // 1) Selector is waken up between 'wakenUp.set(false)' and
422                         //    'selector.select(...)'. (BAD)
423                         // 2) Selector is waken up between 'selector.select(...)' and
424                         //    'if (wakenUp.get()) { ... }'. (OK)
425                         //
426                         // In the first case, 'wakenUp' is set to true and the
427                         // following 'selector.select(...)' will wake up immediately.
428                         // Until 'wakenUp' is set to false again in the next round,
429                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
430                         // any attempt to wake up the Selector will fail, too, causing
431                         // the following 'selector.select(...)' call to block
432                         // unnecessarily.
433                         //
434                         // To fix this problem, we wake up the selector again if wakenUp
435                         // is true immediately after selector.select(...).
436                         // It is inefficient in that it wakes up the selector for both
437                         // the first case (BAD - wake-up required) and the second case
438                         // (OK - no wake-up required).
439 
440                         if (wakenUp.get()) {
441                             selector.wakeup();
442                         }
443                         // fall through
444                     default:
445                 }
446 
447                 cancelledKeys = 0;
448                 needsToSelectAgain = false;
449                 final int ioRatio = this.ioRatio;
450                 if (ioRatio == 100) {
451                     try {
452                         processSelectedKeys();
453                     } finally {
454                         // Ensure we always run tasks.
455                         runAllTasks();
456                     }
457                 } else {
458                     final long ioStartTime = System.nanoTime();
459                     try {
460                         processSelectedKeys();
461                     } finally {
462                         // Ensure we always run tasks.
463                         final long ioTime = System.nanoTime() - ioStartTime;
464                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
465                     }
466                 }
467             } catch (Throwable t) {
468                 handleLoopException(t);
469             }
470             // Always handle shutdown even if the loop processing threw an exception.
471             try {
472                 if (isShuttingDown()) {
473                     closeAll();
474                     if (confirmShutdown()) {
475                         return;
476                     }
477                 }
478             } catch (Throwable t) {
479                 handleLoopException(t);
480             }
481         }
482     }
483 
484     private static void handleLoopException(Throwable t) {
485         logger.warn("Unexpected exception in the selector loop.", t);
486 
487         // Prevent possible consecutive immediate failures that lead to
488         // excessive CPU consumption.
489         try {
490             Thread.sleep(1000);
491         } catch (InterruptedException e) {
492             // Ignore.
493         }
494     }
495 
496     private void processSelectedKeys() {
497         if (selectedKeys != null) {
498             processSelectedKeysOptimized();
499         } else {
500             processSelectedKeysPlain(selector.selectedKeys());
501         }
502     }
503 
504     @Override
505     protected void cleanup() {
506         try {
507             selector.close();
508         } catch (IOException e) {
509             logger.warn("Failed to close a selector.", e);
510         }
511     }
512 
513     void cancel(SelectionKey key) {
514         key.cancel();
515         cancelledKeys ++;
516         if (cancelledKeys >= CLEANUP_INTERVAL) {
517             cancelledKeys = 0;
518             needsToSelectAgain = true;
519         }
520     }
521 
522     @Override
523     protected Runnable pollTask() {
524         Runnable task = super.pollTask();
525         if (needsToSelectAgain) {
526             selectAgain();
527         }
528         return task;
529     }
530 
531     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
532         // check if the set is empty and if so just return to not create garbage by
533         // creating a new Iterator every time even if there is nothing to process.
534         // See https://github.com/netty/netty/issues/597
535         if (selectedKeys.isEmpty()) {
536             return;
537         }
538 
539         Iterator<SelectionKey> i = selectedKeys.iterator();
540         for (;;) {
541             final SelectionKey k = i.next();
542             final Object a = k.attachment();
543             i.remove();
544 
545             if (a instanceof AbstractNioChannel) {
546                 processSelectedKey(k, (AbstractNioChannel) a);
547             } else {
548                 @SuppressWarnings("unchecked")
549                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
550                 processSelectedKey(k, task);
551             }
552 
553             if (!i.hasNext()) {
554                 break;
555             }
556 
557             if (needsToSelectAgain) {
558                 selectAgain();
559                 selectedKeys = selector.selectedKeys();
560 
561                 // Create the iterator again to avoid ConcurrentModificationException
562                 if (selectedKeys.isEmpty()) {
563                     break;
564                 } else {
565                     i = selectedKeys.iterator();
566                 }
567             }
568         }
569     }
570 
571     private void processSelectedKeysOptimized() {
572         for (int i = 0; i < selectedKeys.size; ++i) {
573             final SelectionKey k = selectedKeys.keys[i];
574             // null out entry in the array to allow to have it GC'ed once the Channel close
575             // See https://github.com/netty/netty/issues/2363
576             selectedKeys.keys[i] = null;
577 
578             final Object a = k.attachment();
579 
580             if (a instanceof AbstractNioChannel) {
581                 processSelectedKey(k, (AbstractNioChannel) a);
582             } else {
583                 @SuppressWarnings("unchecked")
584                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
585                 processSelectedKey(k, task);
586             }
587 
588             if (needsToSelectAgain) {
589                 // null out entries in the array to allow to have it GC'ed once the Channel close
590                 // See https://github.com/netty/netty/issues/2363
591                 selectedKeys.reset(i + 1);
592 
593                 selectAgain();
594                 i = -1;
595             }
596         }
597     }
598 
599     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
600         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
601         if (!k.isValid()) {
602             final EventLoop eventLoop;
603             try {
604                 eventLoop = ch.eventLoop();
605             } catch (Throwable ignored) {
606                 // If the channel implementation throws an exception because there is no event loop, we ignore this
607                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
608                 // to close ch.
609                 return;
610             }
611             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
612             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
613             // still healthy and should not be closed.
614             // See https://github.com/netty/netty/issues/5125
615             if (eventLoop != this || eventLoop == null) {
616                 return;
617             }
618             // close the channel if the key is not valid anymore
619             unsafe.close(unsafe.voidPromise());
620             return;
621         }
622 
623         try {
624             int readyOps = k.readyOps();
625             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
626             // the NIO JDK channel implementation may throw a NotYetConnectedException.
627             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
628                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
629                 // See https://github.com/netty/netty/issues/924
630                 int ops = k.interestOps();
631                 ops &= ~SelectionKey.OP_CONNECT;
632                 k.interestOps(ops);
633 
634                 unsafe.finishConnect();
635             }
636 
637             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
638             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
639                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
640                 ch.unsafe().forceFlush();
641             }
642 
643             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
644             // to a spin loop
645             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
646                 unsafe.read();
647             }
648         } catch (CancelledKeyException ignored) {
649             unsafe.close(unsafe.voidPromise());
650         }
651     }
652 
653     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
654         int state = 0;
655         try {
656             task.channelReady(k.channel(), k);
657             state = 1;
658         } catch (Exception e) {
659             k.cancel();
660             invokeChannelUnregistered(task, k, e);
661             state = 2;
662         } finally {
663             switch (state) {
664             case 0:
665                 k.cancel();
666                 invokeChannelUnregistered(task, k, null);
667                 break;
668             case 1:
669                 if (!k.isValid()) { // Cancelled by channelReady()
670                     invokeChannelUnregistered(task, k, null);
671                 }
672                 break;
673             }
674         }
675     }
676 
677     private void closeAll() {
678         selectAgain();
679         Set<SelectionKey> keys = selector.keys();
680         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
681         for (SelectionKey k: keys) {
682             Object a = k.attachment();
683             if (a instanceof AbstractNioChannel) {
684                 channels.add((AbstractNioChannel) a);
685             } else {
686                 k.cancel();
687                 @SuppressWarnings("unchecked")
688                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
689                 invokeChannelUnregistered(task, k, null);
690             }
691         }
692 
693         for (AbstractNioChannel ch: channels) {
694             ch.unsafe().close(ch.unsafe().voidPromise());
695         }
696     }
697 
698     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
699         try {
700             task.channelUnregistered(k.channel(), cause);
701         } catch (Exception e) {
702             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
703         }
704     }
705 
706     @Override
707     protected void wakeup(boolean inEventLoop) {
708         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
709             selector.wakeup();
710         }
711     }
712 
713     Selector unwrappedSelector() {
714         return unwrappedSelector;
715     }
716 
717     int selectNow() throws IOException {
718         try {
719             return selector.selectNow();
720         } finally {
721             // restore wakeup state if needed
722             if (wakenUp.get()) {
723                 selector.wakeup();
724             }
725         }
726     }
727 
728     private void select(boolean oldWakenUp) throws IOException {
729         Selector selector = this.selector;
730         try {
731             int selectCnt = 0;
732             long currentTimeNanos = System.nanoTime();
733             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
734 
735             for (;;) {
736                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
737                 if (timeoutMillis <= 0) {
738                     if (selectCnt == 0) {
739                         selector.selectNow();
740                         selectCnt = 1;
741                     }
742                     break;
743                 }
744 
745                 // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
746                 // Selector#wakeup. So we need to check task queue again before executing select operation.
747                 // If we don't, the task might be pended until select operation was timed out.
748                 // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
749                 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
750                     selector.selectNow();
751                     selectCnt = 1;
752                     break;
753                 }
754 
755                 int selectedKeys = selector.select(timeoutMillis);
756                 selectCnt ++;
757 
758                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
759                     // - Selected something,
760                     // - waken up by user, or
761                     // - the task queue has a pending task.
762                     // - a scheduled task is ready for processing
763                     break;
764                 }
765                 if (Thread.interrupted()) {
766                     // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
767                     // As this is most likely a bug in the handler of the user or it's client library we will
768                     // also log it.
769                     //
770                     // See https://github.com/netty/netty/issues/2426
771                     if (logger.isDebugEnabled()) {
772                         logger.debug("Selector.select() returned prematurely because " +
773                                 "Thread.currentThread().interrupt() was called. Use " +
774                                 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
775                     }
776                     selectCnt = 1;
777                     break;
778                 }
779 
780                 long time = System.nanoTime();
781                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
782                     // timeoutMillis elapsed without anything selected.
783                     selectCnt = 1;
784                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
785                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
786                     // The selector returned prematurely many times in a row.
787                     // Rebuild the selector to work around the problem.
788                     logger.warn(
789                             "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
790                             selectCnt, selector);
791 
792                     rebuildSelector();
793                     selector = this.selector;
794 
795                     // Select again to populate selectedKeys.
796                     selector.selectNow();
797                     selectCnt = 1;
798                     break;
799                 }
800 
801                 currentTimeNanos = time;
802             }
803 
804             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
805                 if (logger.isDebugEnabled()) {
806                     logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
807                             selectCnt - 1, selector);
808                 }
809             }
810         } catch (CancelledKeyException e) {
811             if (logger.isDebugEnabled()) {
812                 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
813                         selector, e);
814             }
815             // Harmless exception - log anyway
816         }
817     }
818 
819     private void selectAgain() {
820         needsToSelectAgain = false;
821         try {
822             selector.selectNow();
823         } catch (Throwable t) {
824             logger.warn("Failed to update SelectionKeys.", t);
825         }
826     }
827 }