1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.EventLoopTaskQueueFactory;
23  import io.netty.channel.SelectStrategy;
24  import io.netty.channel.SingleThreadEventLoop;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.concurrent.RejectedExecutionHandler;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.ReflectionUtil;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.lang.reflect.Field;
36  import java.nio.channels.CancelledKeyException;
37  import java.nio.channels.SelectableChannel;
38  import java.nio.channels.Selector;
39  import java.nio.channels.SelectionKey;
40  
41  import java.nio.channels.spi.SelectorProvider;
42  import java.security.AccessController;
43  import java.security.PrivilegedAction;
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Iterator;
47  import java.util.NoSuchElementException;
48  import java.util.Queue;
49  import java.util.Set;
50  import java.util.concurrent.Executor;
51  import java.util.concurrent.atomic.AtomicLong;
52  
53  
54  
55  
56  
57  
58  public final class NioEventLoop extends SingleThreadEventLoop {
59  
60      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
61  
62      private static final int CLEANUP_INTERVAL = 256; 
63  
64      private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
65              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
66  
67      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
68      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
69  
70      private final IntSupplier selectNowSupplier = new IntSupplier() {
71          @Override
72          public int get() throws Exception {
73              return selectNow();
74          }
75      };
76  
77      
78      
79      
80      
81      
82      
83      static {
84          if (PlatformDependent.javaVersion() < 7) {
85              final String key = "sun.nio.ch.bugLevel";
86              final String bugLevel = SystemPropertyUtil.get(key);
87              if (bugLevel == null) {
88                  try {
89                      AccessController.doPrivileged(new PrivilegedAction<Void>() {
90                          @Override
91                          public Void run() {
92                              System.setProperty(key, "");
93                              return null;
94                          }
95                      });
96                  } catch (final SecurityException e) {
97                      logger.debug("Unable to get/set System Property: " + key, e);
98                  }
99              }
100         }
101 
102         int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
103         if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
104             selectorAutoRebuildThreshold = 0;
105         }
106 
107         SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
108 
109         if (logger.isDebugEnabled()) {
110             logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
111             logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
112         }
113     }
114 
115     
116 
117 
118     private Selector selector;
119     private Selector unwrappedSelector;
120     private SelectedSelectionKeySet selectedKeys;
121 
122     private final SelectorProvider provider;
123 
124     private static final long AWAKE = -1L;
125     private static final long NONE = Long.MAX_VALUE;
126 
127     
128     
129     
130     
131     private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
132 
133     private final SelectStrategy selectStrategy;
134 
135     private volatile int ioRatio = 50;
136     private int cancelledKeys;
137     private boolean needsToSelectAgain;
138 
139     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
140                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
141                  EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
142         super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
143                 rejectedExecutionHandler);
144         this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
145         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
146         final SelectorTuple selectorTuple = openSelector();
147         this.selector = selectorTuple.selector;
148         this.unwrappedSelector = selectorTuple.unwrappedSelector;
149     }
150 
151     private static Queue<Runnable> newTaskQueue(
152             EventLoopTaskQueueFactory queueFactory) {
153         if (queueFactory == null) {
154             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
155         }
156         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
157     }
158 
159     private static final class SelectorTuple {
160         final Selector unwrappedSelector;
161         final Selector selector;
162 
163         SelectorTuple(Selector unwrappedSelector) {
164             this.unwrappedSelector = unwrappedSelector;
165             this.selector = unwrappedSelector;
166         }
167 
168         SelectorTuple(Selector unwrappedSelector, Selector selector) {
169             this.unwrappedSelector = unwrappedSelector;
170             this.selector = selector;
171         }
172     }
173 
174     private SelectorTuple openSelector() {
175         final Selector unwrappedSelector;
176         try {
177             unwrappedSelector = provider.openSelector();
178         } catch (IOException e) {
179             throw new ChannelException("failed to open a new selector", e);
180         }
181 
182         if (DISABLE_KEY_SET_OPTIMIZATION) {
183             return new SelectorTuple(unwrappedSelector);
184         }
185 
186         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
187             @Override
188             public Object run() {
189                 try {
190                     return Class.forName(
191                             "sun.nio.ch.SelectorImpl",
192                             false,
193                             PlatformDependent.getSystemClassLoader());
194                 } catch (Throwable cause) {
195                     return cause;
196                 }
197             }
198         });
199 
200         if (!(maybeSelectorImplClass instanceof Class) ||
201             
202             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
203             if (maybeSelectorImplClass instanceof Throwable) {
204                 Throwable t = (Throwable) maybeSelectorImplClass;
205                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
206             }
207             return new SelectorTuple(unwrappedSelector);
208         }
209 
210         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
211         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
212 
213         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
214             @Override
215             public Object run() {
216                 try {
217                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
218                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
219 
220                     if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
221                         
222                         
223                         long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
224                         long publicSelectedKeysFieldOffset =
225                                 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
226 
227                         if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
228                             PlatformDependent.putObject(
229                                     unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
230                             PlatformDependent.putObject(
231                                     unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
232                             return null;
233                         }
234                         
235                     }
236 
237                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
238                     if (cause != null) {
239                         return cause;
240                     }
241                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
242                     if (cause != null) {
243                         return cause;
244                     }
245 
246                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
247                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
248                     return null;
249                 } catch (NoSuchFieldException e) {
250                     return e;
251                 } catch (IllegalAccessException e) {
252                     return e;
253                 }
254             }
255         });
256 
257         if (maybeException instanceof Exception) {
258             selectedKeys = null;
259             Exception e = (Exception) maybeException;
260             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
261             return new SelectorTuple(unwrappedSelector);
262         }
263         selectedKeys = selectedKeySet;
264         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
265         return new SelectorTuple(unwrappedSelector,
266                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
267     }
268 
269     
270 
271 
272     public SelectorProvider selectorProvider() {
273         return provider;
274     }
275 
276     @Override
277     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
278         return newTaskQueue0(maxPendingTasks);
279     }
280 
281     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
282         
283         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
284                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
285     }
286 
287     
288 
289 
290 
291 
292     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
293         ObjectUtil.checkNotNull(ch, "ch");
294         if (interestOps == 0) {
295             throw new IllegalArgumentException("interestOps must be non-zero.");
296         }
297         if ((interestOps & ~ch.validOps()) != 0) {
298             throw new IllegalArgumentException(
299                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
300         }
301         ObjectUtil.checkNotNull(task, "task");
302 
303         if (isShutdown()) {
304             throw new IllegalStateException("event loop shut down");
305         }
306 
307         if (inEventLoop()) {
308             register0(ch, interestOps, task);
309         } else {
310             try {
311                 
312                 
313                 submit(new Runnable() {
314                     @Override
315                     public void run() {
316                         register0(ch, interestOps, task);
317                     }
318                 }).sync();
319             } catch (InterruptedException ignore) {
320                 
321                 Thread.currentThread().interrupt();
322             }
323         }
324     }
325 
326     private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
327         try {
328             ch.register(unwrappedSelector, interestOps, task);
329         } catch (Exception e) {
330             throw new EventLoopException("failed to register a channel", e);
331         }
332     }
333 
334     
335 
336 
337     public int getIoRatio() {
338         return ioRatio;
339     }
340 
341     
342 
343 
344 
345 
346 
347     public void setIoRatio(int ioRatio) {
348         if (ioRatio <= 0 || ioRatio > 100) {
349             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
350         }
351         this.ioRatio = ioRatio;
352     }
353 
354     
355 
356 
357 
358     public void rebuildSelector() {
359         if (!inEventLoop()) {
360             execute(new Runnable() {
361                 @Override
362                 public void run() {
363                     rebuildSelector0();
364                 }
365             });
366             return;
367         }
368         rebuildSelector0();
369     }
370 
371     @Override
372     public int registeredChannels() {
373         return selector.keys().size() - cancelledKeys;
374     }
375 
376     @Override
377     public Iterator<Channel> registeredChannelsIterator() {
378         assert inEventLoop();
379         final Set<SelectionKey> keys = selector.keys();
380         if (keys.isEmpty()) {
381             return ChannelsReadOnlyIterator.empty();
382         }
383         return new Iterator<Channel>() {
384             final Iterator<SelectionKey> selectionKeyIterator =
385                     ObjectUtil.checkNotNull(keys, "selectionKeys")
386                             .iterator();
387             Channel next;
388             boolean isDone;
389 
390             @Override
391             public boolean hasNext() {
392                 if (isDone) {
393                     return false;
394                 }
395                 Channel cur = next;
396                 if (cur == null) {
397                     cur = next = nextOrDone();
398                     return cur != null;
399                 }
400                 return true;
401             }
402 
403             @Override
404             public Channel next() {
405                 if (isDone) {
406                     throw new NoSuchElementException();
407                 }
408                 Channel cur = next;
409                 if (cur == null) {
410                     cur = nextOrDone();
411                     if (cur == null) {
412                         throw new NoSuchElementException();
413                     }
414                 }
415                 next = nextOrDone();
416                 return cur;
417             }
418 
419             @Override
420             public void remove() {
421                 throw new UnsupportedOperationException("remove");
422             }
423 
424             private Channel nextOrDone() {
425                 Iterator<SelectionKey> it = selectionKeyIterator;
426                 while (it.hasNext()) {
427                     SelectionKey key = it.next();
428                     if (key.isValid()) {
429                         Object attachment = key.attachment();
430                         if (attachment instanceof AbstractNioChannel) {
431                             return (AbstractNioChannel) attachment;
432                         }
433                     }
434                 }
435                 isDone = true;
436                 return null;
437             }
438         };
439     }
440 
441     private void rebuildSelector0() {
442         final Selector oldSelector = selector;
443         final SelectorTuple newSelectorTuple;
444 
445         if (oldSelector == null) {
446             return;
447         }
448 
449         try {
450             newSelectorTuple = openSelector();
451         } catch (Exception e) {
452             logger.warn("Failed to create a new Selector.", e);
453             return;
454         }
455 
456         
457         int nChannels = 0;
458         for (SelectionKey key: oldSelector.keys()) {
459             Object a = key.attachment();
460             try {
461                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
462                     continue;
463                 }
464 
465                 int interestOps = key.interestOps();
466                 key.cancel();
467                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
468                 if (a instanceof AbstractNioChannel) {
469                     
470                     ((AbstractNioChannel) a).selectionKey = newKey;
471                 }
472                 nChannels ++;
473             } catch (Exception e) {
474                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
475                 if (a instanceof AbstractNioChannel) {
476                     AbstractNioChannel ch = (AbstractNioChannel) a;
477                     ch.unsafe().close(ch.unsafe().voidPromise());
478                 } else {
479                     @SuppressWarnings("unchecked")
480                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
481                     invokeChannelUnregistered(task, key, e);
482                 }
483             }
484         }
485 
486         selector = newSelectorTuple.selector;
487         unwrappedSelector = newSelectorTuple.unwrappedSelector;
488 
489         try {
490             
491             oldSelector.close();
492         } catch (Throwable t) {
493             if (logger.isWarnEnabled()) {
494                 logger.warn("Failed to close the old Selector.", t);
495             }
496         }
497 
498         if (logger.isInfoEnabled()) {
499             logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
500         }
501     }
502 
503     @Override
504     protected void run() {
505         int selectCnt = 0;
506         for (;;) {
507             try {
508                 int strategy;
509                 try {
510                     strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
511                     switch (strategy) {
512                     case SelectStrategy.CONTINUE:
513                         continue;
514 
515                     case SelectStrategy.BUSY_WAIT:
516                         
517 
518                     case SelectStrategy.SELECT:
519                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
520                         if (curDeadlineNanos == -1L) {
521                             curDeadlineNanos = NONE; 
522                         }
523                         nextWakeupNanos.set(curDeadlineNanos);
524                         try {
525                             if (!hasTasks()) {
526                                 strategy = select(curDeadlineNanos);
527                             }
528                         } finally {
529                             
530                             
531                             nextWakeupNanos.lazySet(AWAKE);
532                         }
533                         
534                     default:
535                     }
536                 } catch (IOException e) {
537                     
538                     
539                     rebuildSelector0();
540                     selectCnt = 0;
541                     handleLoopException(e);
542                     continue;
543                 }
544 
545                 selectCnt++;
546                 cancelledKeys = 0;
547                 needsToSelectAgain = false;
548                 final int ioRatio = this.ioRatio;
549                 boolean ranTasks;
550                 if (ioRatio == 100) {
551                     try {
552                         if (strategy > 0) {
553                             processSelectedKeys();
554                         }
555                     } finally {
556                         
557                         ranTasks = runAllTasks();
558                     }
559                 } else if (strategy > 0) {
560                     final long ioStartTime = System.nanoTime();
561                     try {
562                         processSelectedKeys();
563                     } finally {
564                         
565                         final long ioTime = System.nanoTime() - ioStartTime;
566                         ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
567                     }
568                 } else {
569                     ranTasks = runAllTasks(0); 
570                 }
571 
572                 if (selectReturnPrematurely(selectCnt, ranTasks, strategy)) {
573                     selectCnt = 0;
574                 } else if (unexpectedSelectorWakeup(selectCnt)) { 
575                     selectCnt = 0;
576                 }
577             } catch (CancelledKeyException e) {
578                 
579                 if (logger.isDebugEnabled()) {
580                     logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
581                             selector, e);
582                 }
583             } catch (Error e) {
584                 throw e;
585             } catch (Throwable t) {
586                 handleLoopException(t);
587             } finally {
588                 
589                 try {
590                     if (isShuttingDown()) {
591                         closeAll();
592                         if (confirmShutdown()) {
593                             return;
594                         }
595                     }
596                 } catch (Error e) {
597                     throw e;
598                 } catch (Throwable t) {
599                     handleLoopException(t);
600                 }
601             }
602         }
603     }
604 
605     
606     private boolean selectReturnPrematurely(int selectCnt, boolean ranTasks, int strategy) {
607         if (ranTasks || strategy > 0) {
608             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
609                 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
610                     selectCnt - 1, selector);
611             }
612             return true;
613         }
614         return false;
615     }
616 
617     
618     private boolean unexpectedSelectorWakeup(int selectCnt) {
619         if (Thread.interrupted()) {
620             
621             
622             
623             
624             
625             if (logger.isDebugEnabled()) {
626                 logger.debug("Selector.select() returned prematurely because " +
627                         "Thread.currentThread().interrupt() was called. Use " +
628                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
629             }
630             return true;
631         }
632         if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
633                 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
634             
635             
636             logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
637                     selectCnt, selector);
638             rebuildSelector();
639             return true;
640         }
641         return false;
642     }
643 
644     private static void handleLoopException(Throwable t) {
645         logger.warn("Unexpected exception in the selector loop.", t);
646 
647         
648         
649         try {
650             Thread.sleep(1000);
651         } catch (InterruptedException e) {
652             
653         }
654     }
655 
656     private void processSelectedKeys() {
657         if (selectedKeys != null) {
658             processSelectedKeysOptimized();
659         } else {
660             processSelectedKeysPlain(selector.selectedKeys());
661         }
662     }
663 
664     @Override
665     protected void cleanup() {
666         try {
667             selector.close();
668         } catch (IOException e) {
669             logger.warn("Failed to close a selector.", e);
670         }
671     }
672 
673     void cancel(SelectionKey key) {
674         key.cancel();
675         cancelledKeys ++;
676         if (cancelledKeys >= CLEANUP_INTERVAL) {
677             cancelledKeys = 0;
678             needsToSelectAgain = true;
679         }
680     }
681 
682     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
683         
684         
685         
686         if (selectedKeys.isEmpty()) {
687             return;
688         }
689 
690         Iterator<SelectionKey> i = selectedKeys.iterator();
691         for (;;) {
692             final SelectionKey k = i.next();
693             final Object a = k.attachment();
694             i.remove();
695 
696             if (a instanceof AbstractNioChannel) {
697                 processSelectedKey(k, (AbstractNioChannel) a);
698             } else {
699                 @SuppressWarnings("unchecked")
700                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
701                 processSelectedKey(k, task);
702             }
703 
704             if (!i.hasNext()) {
705                 break;
706             }
707 
708             if (needsToSelectAgain) {
709                 selectAgain();
710                 selectedKeys = selector.selectedKeys();
711 
712                 
713                 if (selectedKeys.isEmpty()) {
714                     break;
715                 } else {
716                     i = selectedKeys.iterator();
717                 }
718             }
719         }
720     }
721 
722     private void processSelectedKeysOptimized() {
723         for (int i = 0; i < selectedKeys.size; ++i) {
724             final SelectionKey k = selectedKeys.keys[i];
725             
726             
727             selectedKeys.keys[i] = null;
728 
729             final Object a = k.attachment();
730 
731             if (a instanceof AbstractNioChannel) {
732                 processSelectedKey(k, (AbstractNioChannel) a);
733             } else {
734                 @SuppressWarnings("unchecked")
735                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
736                 processSelectedKey(k, task);
737             }
738 
739             if (needsToSelectAgain) {
740                 
741                 
742                 selectedKeys.reset(i + 1);
743 
744                 selectAgain();
745                 i = -1;
746             }
747         }
748     }
749 
750     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
751         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
752         if (!k.isValid()) {
753             final EventLoop eventLoop;
754             try {
755                 eventLoop = ch.eventLoop();
756             } catch (Throwable ignored) {
757                 
758                 
759                 
760                 return;
761             }
762             
763             
764             
765             
766             if (eventLoop == this) {
767                 
768                 unsafe.close(unsafe.voidPromise());
769             }
770             return;
771         }
772 
773         try {
774             int readyOps = k.readyOps();
775             
776             
777             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
778                 
779                 
780                 int ops = k.interestOps();
781                 ops &= ~SelectionKey.OP_CONNECT;
782                 k.interestOps(ops);
783 
784                 unsafe.finishConnect();
785             }
786 
787             
788             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
789                 
790                unsafe.forceFlush();
791             }
792 
793             
794             
795             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
796                 unsafe.read();
797             }
798         } catch (CancelledKeyException ignored) {
799             unsafe.close(unsafe.voidPromise());
800         }
801     }
802 
803     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
804         int state = 0;
805         try {
806             task.channelReady(k.channel(), k);
807             state = 1;
808         } catch (Exception e) {
809             k.cancel();
810             invokeChannelUnregistered(task, k, e);
811             state = 2;
812         } finally {
813             switch (state) {
814             case 0:
815                 k.cancel();
816                 invokeChannelUnregistered(task, k, null);
817                 break;
818             case 1:
819                 if (!k.isValid()) { 
820                     invokeChannelUnregistered(task, k, null);
821                 }
822                 break;
823             default:
824                  break;
825             }
826         }
827     }
828 
829     private void closeAll() {
830         selectAgain();
831         Set<SelectionKey> keys = selector.keys();
832         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
833         for (SelectionKey k: keys) {
834             Object a = k.attachment();
835             if (a instanceof AbstractNioChannel) {
836                 channels.add((AbstractNioChannel) a);
837             } else {
838                 k.cancel();
839                 @SuppressWarnings("unchecked")
840                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
841                 invokeChannelUnregistered(task, k, null);
842             }
843         }
844 
845         for (AbstractNioChannel ch: channels) {
846             ch.unsafe().close(ch.unsafe().voidPromise());
847         }
848     }
849 
850     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
851         try {
852             task.channelUnregistered(k.channel(), cause);
853         } catch (Exception e) {
854             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
855         }
856     }
857 
858     @Override
859     protected void wakeup(boolean inEventLoop) {
860         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
861             selector.wakeup();
862         }
863     }
864 
865     @Override
866     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
867         
868         return deadlineNanos < nextWakeupNanos.get();
869     }
870 
871     @Override
872     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
873         
874         return deadlineNanos < nextWakeupNanos.get();
875     }
876 
877     Selector unwrappedSelector() {
878         return unwrappedSelector;
879     }
880 
881     int selectNow() throws IOException {
882         return selector.selectNow();
883     }
884 
885     private int select(long deadlineNanos) throws IOException {
886         if (deadlineNanos == NONE) {
887             return selector.select();
888         }
889         
890         long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
891         return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
892     }
893 
894     private void selectAgain() {
895         needsToSelectAgain = false;
896         try {
897             selector.selectNow();
898         } catch (Throwable t) {
899             logger.warn("Failed to update SelectionKeys.", t);
900         }
901     }
902 }