View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.util.IntSupplier;
25  import io.netty.util.concurrent.RejectedExecutionHandler;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.ReflectionUtil;
28  import io.netty.util.internal.SystemPropertyUtil;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.io.IOException;
33  import java.lang.reflect.Field;
34  import java.nio.channels.CancelledKeyException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.Selector;
38  import java.nio.channels.spi.SelectorProvider;
39  import java.security.AccessController;
40  import java.security.PrivilegedAction;
41  import java.util.ArrayList;
42  import java.util.Collection;
43  import java.util.Iterator;
44  import java.util.Queue;
45  import java.util.Set;
46  import java.util.concurrent.Callable;
47  import java.util.concurrent.Executor;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicBoolean;
50  
51  /**
52   * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
53   * {@link Selector} and so does the multi-plexing of these in the event loop.
54   *
55   */
56  public final class NioEventLoop extends SingleThreadEventLoop {
57  
58      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
59  
60      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
61  
62      private static final boolean DISABLE_KEYSET_OPTIMIZATION =
63              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
64  
65      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
66      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
67  
68      private final IntSupplier selectNowSupplier = new IntSupplier() {
69          @Override
70          public int get() throws Exception {
71              return selectNow();
72          }
73      };
74      private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
75          @Override
76          public Integer call() throws Exception {
77              return NioEventLoop.super.pendingTasks();
78          }
79      };
80  
81      // Workaround for JDK NIO bug.
82      //
83      // See:
84      // - http://bugs.sun.com/view_bug.do?bug_id=6427854
85      // - https://github.com/netty/netty/issues/203
86      static {
87          final String key = "sun.nio.ch.bugLevel";
88          final String buglevel = SystemPropertyUtil.get(key);
89          if (buglevel == null) {
90              try {
91                  AccessController.doPrivileged(new PrivilegedAction<Void>() {
92                      @Override
93                      public Void run() {
94                          System.setProperty(key, "");
95                          return null;
96                      }
97                  });
98              } catch (final SecurityException e) {
99                  logger.debug("Unable to get/set System Property: " + key, e);
100             }
101         }
102 
103         int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
104         if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
105             selectorAutoRebuildThreshold = 0;
106         }
107 
108         SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
109 
110         if (logger.isDebugEnabled()) {
111             logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
112             logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
113         }
114     }
115 
116     /**
117      * The NIO {@link Selector}.
118      */
119     private Selector selector;
120     private Selector unwrappedSelector;
121     private SelectedSelectionKeySet selectedKeys;
122 
123     private final SelectorProvider provider;
124 
125     /**
126      * Boolean that controls determines if a blocked Selector.select should
127      * break out of its selection process. In our case we use a timeout for
128      * the select method and the select method will block for that time unless
129      * waken up.
130      */
131     private final AtomicBoolean wakenUp = new AtomicBoolean();
132 
133     private final SelectStrategy selectStrategy;
134 
135     private volatile int ioRatio = 50;
136     private int cancelledKeys;
137     private boolean needsToSelectAgain;
138 
139     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
140                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
141         super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
142         if (selectorProvider == null) {
143             throw new NullPointerException("selectorProvider");
144         }
145         if (strategy == null) {
146             throw new NullPointerException("selectStrategy");
147         }
148         provider = selectorProvider;
149         final SelectorTuple selectorTuple = openSelector();
150         selector = selectorTuple.selector;
151         unwrappedSelector = selectorTuple.unwrappedSelector;
152         selectStrategy = strategy;
153     }
154 
155     private static final class SelectorTuple {
156         final Selector unwrappedSelector;
157         final Selector selector;
158 
159         SelectorTuple(Selector unwrappedSelector) {
160             this.unwrappedSelector = unwrappedSelector;
161             this.selector = unwrappedSelector;
162         }
163 
164         SelectorTuple(Selector unwrappedSelector, Selector selector) {
165             this.unwrappedSelector = unwrappedSelector;
166             this.selector = selector;
167         }
168     }
169 
170     private SelectorTuple openSelector() {
171         final Selector unwrappedSelector;
172         try {
173             unwrappedSelector = provider.openSelector();
174         } catch (IOException e) {
175             throw new ChannelException("failed to open a new selector", e);
176         }
177 
178         if (DISABLE_KEYSET_OPTIMIZATION) {
179             return new SelectorTuple(unwrappedSelector);
180         }
181 
182         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
183 
184         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
185             @Override
186             public Object run() {
187                 try {
188                     return Class.forName(
189                             "sun.nio.ch.SelectorImpl",
190                             false,
191                             PlatformDependent.getSystemClassLoader());
192                 } catch (Throwable cause) {
193                     return cause;
194                 }
195             }
196         });
197 
198         if (!(maybeSelectorImplClass instanceof Class) ||
199                 // ensure the current selector implementation is what we can instrument.
200                 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
201             if (maybeSelectorImplClass instanceof Throwable) {
202                 Throwable t = (Throwable) maybeSelectorImplClass;
203                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
204             }
205             return new SelectorTuple(unwrappedSelector);
206         }
207 
208         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
209 
210         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
211             @Override
212             public Object run() {
213                 try {
214                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
215                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
216 
217                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
218                     if (cause != null) {
219                         return cause;
220                     }
221                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
222                     if (cause != null) {
223                         return cause;
224                     }
225 
226                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
227                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
228                     return null;
229                 } catch (NoSuchFieldException e) {
230                     return e;
231                 } catch (IllegalAccessException e) {
232                     return e;
233                 }
234             }
235         });
236 
237         if (maybeException instanceof Exception) {
238             selectedKeys = null;
239             Exception e = (Exception) maybeException;
240             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
241             return new SelectorTuple(unwrappedSelector);
242         }
243         selectedKeys = selectedKeySet;
244         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
245         return new SelectorTuple(unwrappedSelector,
246                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
247     }
248 
249     /**
250      * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.
251      */
252     public SelectorProvider selectorProvider() {
253         return provider;
254     }
255 
256     @Override
257     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
258         // This event loop never calls takeTask()
259         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
260                                                     : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
261     }
262 
263     @Override
264     public int pendingTasks() {
265         // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
266         // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
267         // See https://github.com/netty/netty/issues/5297
268         if (inEventLoop()) {
269             return super.pendingTasks();
270         } else {
271             return submit(pendingTasksCallable).syncUninterruptibly().getNow();
272         }
273     }
274 
275     /**
276      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
277      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
278      * be executed by this event loop when the {@link SelectableChannel} is ready.
279      */
280     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
281         if (ch == null) {
282             throw new NullPointerException("ch");
283         }
284         if (interestOps == 0) {
285             throw new IllegalArgumentException("interestOps must be non-zero.");
286         }
287         if ((interestOps & ~ch.validOps()) != 0) {
288             throw new IllegalArgumentException(
289                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
290         }
291         if (task == null) {
292             throw new NullPointerException("task");
293         }
294 
295         if (isShutdown()) {
296             throw new IllegalStateException("event loop shut down");
297         }
298 
299         try {
300             ch.register(selector, interestOps, task);
301         } catch (Exception e) {
302             throw new EventLoopException("failed to register a channel", e);
303         }
304     }
305 
306     /**
307      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
308      */
309     public int getIoRatio() {
310         return ioRatio;
311     }
312 
313     /**
314      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
315      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
316      */
317     public void setIoRatio(int ioRatio) {
318         if (ioRatio <= 0 || ioRatio > 100) {
319             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
320         }
321         this.ioRatio = ioRatio;
322     }
323 
324     /**
325      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
326      * around the infamous epoll 100% CPU bug.
327      */
328     public void rebuildSelector() {
329         if (!inEventLoop()) {
330             execute(new Runnable() {
331                 @Override
332                 public void run() {
333                     rebuildSelector0();
334                 }
335             });
336             return;
337         }
338         rebuildSelector0();
339     }
340 
341     private void rebuildSelector0() {
342         final Selector oldSelector = selector;
343         final SelectorTuple newSelectorTuple;
344 
345         if (oldSelector == null) {
346             return;
347         }
348 
349         try {
350             newSelectorTuple = openSelector();
351         } catch (Exception e) {
352             logger.warn("Failed to create a new Selector.", e);
353             return;
354         }
355 
356         // Register all channels to the new Selector.
357         int nChannels = 0;
358         for (SelectionKey key: oldSelector.keys()) {
359             Object a = key.attachment();
360             try {
361                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
362                     continue;
363                 }
364 
365                 int interestOps = key.interestOps();
366                 key.cancel();
367                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
368                 if (a instanceof AbstractNioChannel) {
369                     // Update SelectionKey
370                     ((AbstractNioChannel) a).selectionKey = newKey;
371                 }
372                 nChannels ++;
373             } catch (Exception e) {
374                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
375                 if (a instanceof AbstractNioChannel) {
376                     AbstractNioChannel ch = (AbstractNioChannel) a;
377                     ch.unsafe().close(ch.unsafe().voidPromise());
378                 } else {
379                     @SuppressWarnings("unchecked")
380                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
381                     invokeChannelUnregistered(task, key, e);
382                 }
383             }
384         }
385 
386         selector = newSelectorTuple.selector;
387         unwrappedSelector = newSelectorTuple.unwrappedSelector;
388 
389         try {
390             // time to close the old selector as everything else is registered to the new one
391             oldSelector.close();
392         } catch (Throwable t) {
393             if (logger.isWarnEnabled()) {
394                 logger.warn("Failed to close the old Selector.", t);
395             }
396         }
397 
398         logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
399     }
400 
401     @Override
402     protected void run() {
403         for (;;) {
404             try {
405                 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
406                     case SelectStrategy.CONTINUE:
407                         continue;
408                     case SelectStrategy.SELECT:
409                         select(wakenUp.getAndSet(false));
410 
411                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
412                         // before calling 'selector.wakeup()' to reduce the wake-up
413                         // overhead. (Selector.wakeup() is an expensive operation.)
414                         //
415                         // However, there is a race condition in this approach.
416                         // The race condition is triggered when 'wakenUp' is set to
417                         // true too early.
418                         //
419                         // 'wakenUp' is set to true too early if:
420                         // 1) Selector is waken up between 'wakenUp.set(false)' and
421                         //    'selector.select(...)'. (BAD)
422                         // 2) Selector is waken up between 'selector.select(...)' and
423                         //    'if (wakenUp.get()) { ... }'. (OK)
424                         //
425                         // In the first case, 'wakenUp' is set to true and the
426                         // following 'selector.select(...)' will wake up immediately.
427                         // Until 'wakenUp' is set to false again in the next round,
428                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
429                         // any attempt to wake up the Selector will fail, too, causing
430                         // the following 'selector.select(...)' call to block
431                         // unnecessarily.
432                         //
433                         // To fix this problem, we wake up the selector again if wakenUp
434                         // is true immediately after selector.select(...).
435                         // It is inefficient in that it wakes up the selector for both
436                         // the first case (BAD - wake-up required) and the second case
437                         // (OK - no wake-up required).
438 
439                         if (wakenUp.get()) {
440                             selector.wakeup();
441                         }
442                         // fall through
443                     default:
444                 }
445 
446                 cancelledKeys = 0;
447                 needsToSelectAgain = false;
448                 final int ioRatio = this.ioRatio;
449                 if (ioRatio == 100) {
450                     try {
451                         processSelectedKeys();
452                     } finally {
453                         // Ensure we always run tasks.
454                         runAllTasks();
455                     }
456                 } else {
457                     final long ioStartTime = System.nanoTime();
458                     try {
459                         processSelectedKeys();
460                     } finally {
461                         // Ensure we always run tasks.
462                         final long ioTime = System.nanoTime() - ioStartTime;
463                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
464                     }
465                 }
466             } catch (Throwable t) {
467                 handleLoopException(t);
468             }
469             // Always handle shutdown even if the loop processing threw an exception.
470             try {
471                 if (isShuttingDown()) {
472                     closeAll();
473                     if (confirmShutdown()) {
474                         return;
475                     }
476                 }
477             } catch (Throwable t) {
478                 handleLoopException(t);
479             }
480         }
481     }
482 
483     private static void handleLoopException(Throwable t) {
484         logger.warn("Unexpected exception in the selector loop.", t);
485 
486         // Prevent possible consecutive immediate failures that lead to
487         // excessive CPU consumption.
488         try {
489             Thread.sleep(1000);
490         } catch (InterruptedException e) {
491             // Ignore.
492         }
493     }
494 
495     private void processSelectedKeys() {
496         if (selectedKeys != null) {
497             processSelectedKeysOptimized();
498         } else {
499             processSelectedKeysPlain(selector.selectedKeys());
500         }
501     }
502 
503     @Override
504     protected void cleanup() {
505         try {
506             selector.close();
507         } catch (IOException e) {
508             logger.warn("Failed to close a selector.", e);
509         }
510     }
511 
512     void cancel(SelectionKey key) {
513         key.cancel();
514         cancelledKeys ++;
515         if (cancelledKeys >= CLEANUP_INTERVAL) {
516             cancelledKeys = 0;
517             needsToSelectAgain = true;
518         }
519     }
520 
521     @Override
522     protected Runnable pollTask() {
523         Runnable task = super.pollTask();
524         if (needsToSelectAgain) {
525             selectAgain();
526         }
527         return task;
528     }
529 
530     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
531         // check if the set is empty and if so just return to not create garbage by
532         // creating a new Iterator every time even if there is nothing to process.
533         // See https://github.com/netty/netty/issues/597
534         if (selectedKeys.isEmpty()) {
535             return;
536         }
537 
538         Iterator<SelectionKey> i = selectedKeys.iterator();
539         for (;;) {
540             final SelectionKey k = i.next();
541             final Object a = k.attachment();
542             i.remove();
543 
544             if (a instanceof AbstractNioChannel) {
545                 processSelectedKey(k, (AbstractNioChannel) a);
546             } else {
547                 @SuppressWarnings("unchecked")
548                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
549                 processSelectedKey(k, task);
550             }
551 
552             if (!i.hasNext()) {
553                 break;
554             }
555 
556             if (needsToSelectAgain) {
557                 selectAgain();
558                 selectedKeys = selector.selectedKeys();
559 
560                 // Create the iterator again to avoid ConcurrentModificationException
561                 if (selectedKeys.isEmpty()) {
562                     break;
563                 } else {
564                     i = selectedKeys.iterator();
565                 }
566             }
567         }
568     }
569 
570     private void processSelectedKeysOptimized() {
571         for (int i = 0; i < selectedKeys.size; ++i) {
572             final SelectionKey k = selectedKeys.keys[i];
573             // null out entry in the array to allow to have it GC'ed once the Channel close
574             // See https://github.com/netty/netty/issues/2363
575             selectedKeys.keys[i] = null;
576 
577             final Object a = k.attachment();
578 
579             if (a instanceof AbstractNioChannel) {
580                 processSelectedKey(k, (AbstractNioChannel) a);
581             } else {
582                 @SuppressWarnings("unchecked")
583                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
584                 processSelectedKey(k, task);
585             }
586 
587             if (needsToSelectAgain) {
588                 // null out entries in the array to allow to have it GC'ed once the Channel close
589                 // See https://github.com/netty/netty/issues/2363
590                 selectedKeys.reset(i + 1);
591 
592                 selectAgain();
593                 i = -1;
594             }
595         }
596     }
597 
598     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
599         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
600         if (!k.isValid()) {
601             final EventLoop eventLoop;
602             try {
603                 eventLoop = ch.eventLoop();
604             } catch (Throwable ignored) {
605                 // If the channel implementation throws an exception because there is no event loop, we ignore this
606                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
607                 // to close ch.
608                 return;
609             }
610             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
611             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
612             // still healthy and should not be closed.
613             // See https://github.com/netty/netty/issues/5125
614             if (eventLoop != this || eventLoop == null) {
615                 return;
616             }
617             // close the channel if the key is not valid anymore
618             unsafe.close(unsafe.voidPromise());
619             return;
620         }
621 
622         try {
623             int readyOps = k.readyOps();
624             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
625             // the NIO JDK channel implementation may throw a NotYetConnectedException.
626             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
627                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
628                 // See https://github.com/netty/netty/issues/924
629                 int ops = k.interestOps();
630                 ops &= ~SelectionKey.OP_CONNECT;
631                 k.interestOps(ops);
632 
633                 unsafe.finishConnect();
634             }
635 
636             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
637             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
638                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
639                 ch.unsafe().forceFlush();
640             }
641 
642             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
643             // to a spin loop
644             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
645                 unsafe.read();
646             }
647         } catch (CancelledKeyException ignored) {
648             unsafe.close(unsafe.voidPromise());
649         }
650     }
651 
652     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
653         int state = 0;
654         try {
655             task.channelReady(k.channel(), k);
656             state = 1;
657         } catch (Exception e) {
658             k.cancel();
659             invokeChannelUnregistered(task, k, e);
660             state = 2;
661         } finally {
662             switch (state) {
663             case 0:
664                 k.cancel();
665                 invokeChannelUnregistered(task, k, null);
666                 break;
667             case 1:
668                 if (!k.isValid()) { // Cancelled by channelReady()
669                     invokeChannelUnregistered(task, k, null);
670                 }
671                 break;
672             }
673         }
674     }
675 
676     private void closeAll() {
677         selectAgain();
678         Set<SelectionKey> keys = selector.keys();
679         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
680         for (SelectionKey k: keys) {
681             Object a = k.attachment();
682             if (a instanceof AbstractNioChannel) {
683                 channels.add((AbstractNioChannel) a);
684             } else {
685                 k.cancel();
686                 @SuppressWarnings("unchecked")
687                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
688                 invokeChannelUnregistered(task, k, null);
689             }
690         }
691 
692         for (AbstractNioChannel ch: channels) {
693             ch.unsafe().close(ch.unsafe().voidPromise());
694         }
695     }
696 
697     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
698         try {
699             task.channelUnregistered(k.channel(), cause);
700         } catch (Exception e) {
701             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
702         }
703     }
704 
705     @Override
706     protected void wakeup(boolean inEventLoop) {
707         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
708             selector.wakeup();
709         }
710     }
711 
712     Selector unwrappedSelector() {
713         return unwrappedSelector;
714     }
715 
716     int selectNow() throws IOException {
717         try {
718             return selector.selectNow();
719         } finally {
720             // restore wakeup state if needed
721             if (wakenUp.get()) {
722                 selector.wakeup();
723             }
724         }
725     }
726 
727     private void select(boolean oldWakenUp) throws IOException {
728         Selector selector = this.selector;
729         try {
730             int selectCnt = 0;
731             long currentTimeNanos = System.nanoTime();
732             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
733             for (;;) {
734                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
735                 if (timeoutMillis <= 0) {
736                     if (selectCnt == 0) {
737                         selector.selectNow();
738                         selectCnt = 1;
739                     }
740                     break;
741                 }
742 
743                 // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
744                 // Selector#wakeup. So we need to check task queue again before executing select operation.
745                 // If we don't, the task might be pended until select operation was timed out.
746                 // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
747                 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
748                     selector.selectNow();
749                     selectCnt = 1;
750                     break;
751                 }
752 
753                 int selectedKeys = selector.select(timeoutMillis);
754                 selectCnt ++;
755 
756                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
757                     // - Selected something,
758                     // - waken up by user, or
759                     // - the task queue has a pending task.
760                     // - a scheduled task is ready for processing
761                     break;
762                 }
763                 if (Thread.interrupted()) {
764                     // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
765                     // As this is most likely a bug in the handler of the user or it's client library we will
766                     // also log it.
767                     //
768                     // See https://github.com/netty/netty/issues/2426
769                     if (logger.isDebugEnabled()) {
770                         logger.debug("Selector.select() returned prematurely because " +
771                                 "Thread.currentThread().interrupt() was called. Use " +
772                                 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
773                     }
774                     selectCnt = 1;
775                     break;
776                 }
777 
778                 long time = System.nanoTime();
779                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
780                     // timeoutMillis elapsed without anything selected.
781                     selectCnt = 1;
782                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
783                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
784                     // The selector returned prematurely many times in a row.
785                     // Rebuild the selector to work around the problem.
786                     logger.warn(
787                             "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
788                             selectCnt, selector);
789 
790                     rebuildSelector();
791                     selector = this.selector;
792 
793                     // Select again to populate selectedKeys.
794                     selector.selectNow();
795                     selectCnt = 1;
796                     break;
797                 }
798 
799                 currentTimeNanos = time;
800             }
801 
802             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
803                 if (logger.isDebugEnabled()) {
804                     logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
805                             selectCnt - 1, selector);
806                 }
807             }
808         } catch (CancelledKeyException e) {
809             if (logger.isDebugEnabled()) {
810                 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
811                         selector, e);
812             }
813             // Harmless exception - log anyway
814         }
815     }
816 
817     private void selectAgain() {
818         needsToSelectAgain = false;
819         try {
820             selector.selectNow();
821         } catch (Throwable t) {
822             logger.warn("Failed to update SelectionKeys.", t);
823         }
824     }
825 }