View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.EventLoopTaskQueueFactory;
23  import io.netty.channel.SelectStrategy;
24  import io.netty.channel.SingleThreadEventLoop;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.concurrent.RejectedExecutionHandler;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.ReflectionUtil;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.lang.reflect.Field;
36  import java.nio.channels.CancelledKeyException;
37  import java.nio.channels.SelectableChannel;
38  import java.nio.channels.Selector;
39  import java.nio.channels.SelectionKey;
40  
41  import java.nio.channels.spi.SelectorProvider;
42  import java.security.AccessController;
43  import java.security.PrivilegedAction;
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Iterator;
47  import java.util.NoSuchElementException;
48  import java.util.Queue;
49  import java.util.Set;
50  import java.util.concurrent.Executor;
51  import java.util.concurrent.atomic.AtomicLong;
52  
53  /**
54   * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
55   * {@link Selector} and so does the multi-plexing of these in the event loop.
56   *
57   */
58  public final class NioEventLoop extends SingleThreadEventLoop {
59  
60      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
61  
62      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
63  
64      private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
65              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
66  
67      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
68      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
69  
70      private final IntSupplier selectNowSupplier = new IntSupplier() {
71          @Override
72          public int get() throws Exception {
73              return selectNow();
74          }
75      };
76  
77      // Workaround for JDK NIO bug.
78      //
79      // See:
80      // - https://bugs.openjdk.java.net/browse/JDK-6427854 for first few dev (unreleased) builds of JDK 7
81      // - https://bugs.openjdk.java.net/browse/JDK-6527572 for JDK prior to 5.0u15-rev and 6u10
82      // - https://github.com/netty/netty/issues/203
83      static {
84          if (PlatformDependent.javaVersion() < 7) {
85              final String key = "sun.nio.ch.bugLevel";
86              final String bugLevel = SystemPropertyUtil.get(key);
87              if (bugLevel == null) {
88                  try {
89                      AccessController.doPrivileged(new PrivilegedAction<Void>() {
90                          @Override
91                          public Void run() {
92                              System.setProperty(key, "");
93                              return null;
94                          }
95                      });
96                  } catch (final SecurityException e) {
97                      logger.debug("Unable to get/set System Property: " + key, e);
98                  }
99              }
100         }
101 
102         int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
103         if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
104             selectorAutoRebuildThreshold = 0;
105         }
106 
107         SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
108 
109         if (logger.isDebugEnabled()) {
110             logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
111             logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
112         }
113     }
114 
115     /**
116      * The NIO {@link Selector}.
117      */
118     private Selector selector;
119     private Selector unwrappedSelector;
120     private SelectedSelectionKeySet selectedKeys;
121 
122     private final SelectorProvider provider;
123 
124     private static final long AWAKE = -1L;
125     private static final long NONE = Long.MAX_VALUE;
126 
127     // nextWakeupNanos is:
128     //    AWAKE            when EL is awake
129     //    NONE             when EL is waiting with no wakeup scheduled
130     //    other value T    when EL is waiting with wakeup scheduled at time T
131     private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
132 
133     private final SelectStrategy selectStrategy;
134 
135     private volatile int ioRatio = 50;
136     private int cancelledKeys;
137     private boolean needsToSelectAgain;
138 
139     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
140                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
141                  EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
142         super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
143                 rejectedExecutionHandler);
144         this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
145         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
146         final SelectorTuple selectorTuple = openSelector();
147         this.selector = selectorTuple.selector;
148         this.unwrappedSelector = selectorTuple.unwrappedSelector;
149     }
150 
151     private static Queue<Runnable> newTaskQueue(
152             EventLoopTaskQueueFactory queueFactory) {
153         if (queueFactory == null) {
154             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
155         }
156         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
157     }
158 
159     private static final class SelectorTuple {
160         final Selector unwrappedSelector;
161         final Selector selector;
162 
163         SelectorTuple(Selector unwrappedSelector) {
164             this.unwrappedSelector = unwrappedSelector;
165             this.selector = unwrappedSelector;
166         }
167 
168         SelectorTuple(Selector unwrappedSelector, Selector selector) {
169             this.unwrappedSelector = unwrappedSelector;
170             this.selector = selector;
171         }
172     }
173 
174     private SelectorTuple openSelector() {
175         final Selector unwrappedSelector;
176         try {
177             unwrappedSelector = provider.openSelector();
178         } catch (IOException e) {
179             throw new ChannelException("failed to open a new selector", e);
180         }
181 
182         if (DISABLE_KEY_SET_OPTIMIZATION) {
183             return new SelectorTuple(unwrappedSelector);
184         }
185 
186         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
187             @Override
188             public Object run() {
189                 try {
190                     return Class.forName(
191                             "sun.nio.ch.SelectorImpl",
192                             false,
193                             PlatformDependent.getSystemClassLoader());
194                 } catch (Throwable cause) {
195                     return cause;
196                 }
197             }
198         });
199 
200         if (!(maybeSelectorImplClass instanceof Class) ||
201             // ensure the current selector implementation is what we can instrument.
202             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
203             if (maybeSelectorImplClass instanceof Throwable) {
204                 Throwable t = (Throwable) maybeSelectorImplClass;
205                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
206             }
207             return new SelectorTuple(unwrappedSelector);
208         }
209 
210         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
211         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
212 
213         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
214             @Override
215             public Object run() {
216                 try {
217                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
218                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
219 
220                     if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
221                         // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
222                         // This allows us to also do this in Java9+ without any extra flags.
223                         long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
224                         long publicSelectedKeysFieldOffset =
225                                 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
226 
227                         if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
228                             PlatformDependent.putObject(
229                                     unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
230                             PlatformDependent.putObject(
231                                     unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
232                             return null;
233                         }
234                         // We could not retrieve the offset, lets try reflection as last-resort.
235                     }
236 
237                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
238                     if (cause != null) {
239                         return cause;
240                     }
241                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
242                     if (cause != null) {
243                         return cause;
244                     }
245 
246                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
247                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
248                     return null;
249                 } catch (NoSuchFieldException e) {
250                     return e;
251                 } catch (IllegalAccessException e) {
252                     return e;
253                 }
254             }
255         });
256 
257         if (maybeException instanceof Exception) {
258             selectedKeys = null;
259             Exception e = (Exception) maybeException;
260             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
261             return new SelectorTuple(unwrappedSelector);
262         }
263         selectedKeys = selectedKeySet;
264         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
265         return new SelectorTuple(unwrappedSelector,
266                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
267     }
268 
269     /**
270      * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.
271      */
272     public SelectorProvider selectorProvider() {
273         return provider;
274     }
275 
276     @Override
277     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
278         return newTaskQueue0(maxPendingTasks);
279     }
280 
281     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
282         // This event loop never calls takeTask()
283         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
284                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
285     }
286 
287     /**
288      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
289      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
290      * be executed by this event loop when the {@link SelectableChannel} is ready.
291      */
292     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
293         ObjectUtil.checkNotNull(ch, "ch");
294         if (interestOps == 0) {
295             throw new IllegalArgumentException("interestOps must be non-zero.");
296         }
297         if ((interestOps & ~ch.validOps()) != 0) {
298             throw new IllegalArgumentException(
299                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
300         }
301         ObjectUtil.checkNotNull(task, "task");
302 
303         if (isShutdown()) {
304             throw new IllegalStateException("event loop shut down");
305         }
306 
307         if (inEventLoop()) {
308             register0(ch, interestOps, task);
309         } else {
310             try {
311                 // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
312                 // may block for a long time while trying to obtain an internal lock that may be hold while selecting.
313                 submit(new Runnable() {
314                     @Override
315                     public void run() {
316                         register0(ch, interestOps, task);
317                     }
318                 }).sync();
319             } catch (InterruptedException ignore) {
320                 // Even if interrupted we did schedule it so just mark the Thread as interrupted.
321                 Thread.currentThread().interrupt();
322             }
323         }
324     }
325 
326     private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
327         try {
328             ch.register(unwrappedSelector, interestOps, task);
329         } catch (Exception e) {
330             throw new EventLoopException("failed to register a channel", e);
331         }
332     }
333 
334     /**
335      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
336      */
337     public int getIoRatio() {
338         return ioRatio;
339     }
340 
341     /**
342      * Sets the percentage of the desired amount of time spent for I/O in the event loop. Value range from 1-100.
343      * The default value is {@code 50}, which means the event loop will try to spend the same amount of time for I/O
344      * as for non-I/O tasks. The lower the number the more time can be spent on non-I/O tasks. If value set to
345      * {@code 100}, this feature will be disabled and event loop will not attempt to balance I/O and non-I/O tasks.
346      */
347     public void setIoRatio(int ioRatio) {
348         if (ioRatio <= 0 || ioRatio > 100) {
349             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
350         }
351         this.ioRatio = ioRatio;
352     }
353 
354     /**
355      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
356      * around the infamous epoll 100% CPU bug.
357      */
358     public void rebuildSelector() {
359         if (!inEventLoop()) {
360             execute(new Runnable() {
361                 @Override
362                 public void run() {
363                     rebuildSelector0();
364                 }
365             });
366             return;
367         }
368         rebuildSelector0();
369     }
370 
371     @Override
372     public int registeredChannels() {
373         return selector.keys().size() - cancelledKeys;
374     }
375 
376     @Override
377     public Iterator<Channel> registeredChannelsIterator() {
378         assert inEventLoop();
379         final Set<SelectionKey> keys = selector.keys();
380         if (keys.isEmpty()) {
381             return ChannelsReadOnlyIterator.empty();
382         }
383         return new Iterator<Channel>() {
384             final Iterator<SelectionKey> selectionKeyIterator =
385                     ObjectUtil.checkNotNull(keys, "selectionKeys")
386                             .iterator();
387             Channel next;
388             boolean isDone;
389 
390             @Override
391             public boolean hasNext() {
392                 if (isDone) {
393                     return false;
394                 }
395                 Channel cur = next;
396                 if (cur == null) {
397                     cur = next = nextOrDone();
398                     return cur != null;
399                 }
400                 return true;
401             }
402 
403             @Override
404             public Channel next() {
405                 if (isDone) {
406                     throw new NoSuchElementException();
407                 }
408                 Channel cur = next;
409                 if (cur == null) {
410                     cur = nextOrDone();
411                     if (cur == null) {
412                         throw new NoSuchElementException();
413                     }
414                 }
415                 next = nextOrDone();
416                 return cur;
417             }
418 
419             @Override
420             public void remove() {
421                 throw new UnsupportedOperationException("remove");
422             }
423 
424             private Channel nextOrDone() {
425                 Iterator<SelectionKey> it = selectionKeyIterator;
426                 while (it.hasNext()) {
427                     SelectionKey key = it.next();
428                     if (key.isValid()) {
429                         Object attachment = key.attachment();
430                         if (attachment instanceof AbstractNioChannel) {
431                             return (AbstractNioChannel) attachment;
432                         }
433                     }
434                 }
435                 isDone = true;
436                 return null;
437             }
438         };
439     }
440 
441     private void rebuildSelector0() {
442         final Selector oldSelector = selector;
443         final SelectorTuple newSelectorTuple;
444 
445         if (oldSelector == null) {
446             return;
447         }
448 
449         try {
450             newSelectorTuple = openSelector();
451         } catch (Exception e) {
452             logger.warn("Failed to create a new Selector.", e);
453             return;
454         }
455 
456         // Register all channels to the new Selector.
457         int nChannels = 0;
458         for (SelectionKey key: oldSelector.keys()) {
459             Object a = key.attachment();
460             try {
461                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
462                     continue;
463                 }
464 
465                 int interestOps = key.interestOps();
466                 key.cancel();
467                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
468                 if (a instanceof AbstractNioChannel) {
469                     // Update SelectionKey
470                     ((AbstractNioChannel) a).selectionKey = newKey;
471                 }
472                 nChannels ++;
473             } catch (Exception e) {
474                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
475                 if (a instanceof AbstractNioChannel) {
476                     AbstractNioChannel ch = (AbstractNioChannel) a;
477                     ch.unsafe().close(ch.unsafe().voidPromise());
478                 } else {
479                     @SuppressWarnings("unchecked")
480                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
481                     invokeChannelUnregistered(task, key, e);
482                 }
483             }
484         }
485 
486         selector = newSelectorTuple.selector;
487         unwrappedSelector = newSelectorTuple.unwrappedSelector;
488 
489         try {
490             // time to close the old selector as everything else is registered to the new one
491             oldSelector.close();
492         } catch (Throwable t) {
493             if (logger.isWarnEnabled()) {
494                 logger.warn("Failed to close the old Selector.", t);
495             }
496         }
497 
498         if (logger.isInfoEnabled()) {
499             logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
500         }
501     }
502 
503     @Override
504     protected void run() {
505         int selectCnt = 0;
506         for (;;) {
507             try {
508                 int strategy;
509                 try {
510                     strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
511                     switch (strategy) {
512                     case SelectStrategy.CONTINUE:
513                         continue;
514 
515                     case SelectStrategy.BUSY_WAIT:
516                         // fall-through to SELECT since the busy-wait is not supported with NIO
517 
518                     case SelectStrategy.SELECT:
519                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
520                         if (curDeadlineNanos == -1L) {
521                             curDeadlineNanos = NONE; // nothing on the calendar
522                         }
523                         nextWakeupNanos.set(curDeadlineNanos);
524                         try {
525                             if (!hasTasks()) {
526                                 strategy = select(curDeadlineNanos);
527                             }
528                         } finally {
529                             // This update is just to help block unnecessary selector wakeups
530                             // so use of lazySet is ok (no race condition)
531                             nextWakeupNanos.lazySet(AWAKE);
532                         }
533                         // fall through
534                     default:
535                     }
536                 } catch (IOException e) {
537                     // If we receive an IOException here its because the Selector is messed up. Let's rebuild
538                     // the selector and retry. https://github.com/netty/netty/issues/8566
539                     rebuildSelector0();
540                     selectCnt = 0;
541                     handleLoopException(e);
542                     continue;
543                 }
544 
545                 selectCnt++;
546                 cancelledKeys = 0;
547                 needsToSelectAgain = false;
548                 final int ioRatio = this.ioRatio;
549                 boolean ranTasks;
550                 if (ioRatio == 100) {
551                     try {
552                         if (strategy > 0) {
553                             processSelectedKeys();
554                         }
555                     } finally {
556                         // Ensure we always run tasks.
557                         ranTasks = runAllTasks();
558                     }
559                 } else if (strategy > 0) {
560                     final long ioStartTime = System.nanoTime();
561                     try {
562                         processSelectedKeys();
563                     } finally {
564                         // Ensure we always run tasks.
565                         final long ioTime = System.nanoTime() - ioStartTime;
566                         ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
567                     }
568                 } else {
569                     ranTasks = runAllTasks(0); // This will run the minimum number of tasks
570                 }
571 
572                 if (selectReturnPrematurely(selectCnt, ranTasks, strategy)) {
573                     selectCnt = 0;
574                 } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
575                     selectCnt = 0;
576                 }
577             } catch (CancelledKeyException e) {
578                 // Harmless exception - log anyway
579                 if (logger.isDebugEnabled()) {
580                     logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
581                             selector, e);
582                 }
583             } catch (Error e) {
584                 throw e;
585             } catch (Throwable t) {
586                 handleLoopException(t);
587             } finally {
588                 // Always handle shutdown even if the loop processing threw an exception.
589                 try {
590                     if (isShuttingDown()) {
591                         closeAll();
592                         if (confirmShutdown()) {
593                             return;
594                         }
595                     }
596                 } catch (Error e) {
597                     throw e;
598                 } catch (Throwable t) {
599                     handleLoopException(t);
600                 }
601             }
602         }
603     }
604 
605     // returns true if selectCnt should be reset
606     private boolean selectReturnPrematurely(int selectCnt, boolean ranTasks, int strategy) {
607         if (ranTasks || strategy > 0) {
608             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
609                 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
610                     selectCnt - 1, selector);
611             }
612             return true;
613         }
614         return false;
615     }
616 
617     // returns true if selectCnt should be reset
618     private boolean unexpectedSelectorWakeup(int selectCnt) {
619         if (Thread.interrupted()) {
620             // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
621             // As this is most likely a bug in the handler of the user or it's client library we will
622             // also log it.
623             //
624             // See https://github.com/netty/netty/issues/2426
625             if (logger.isDebugEnabled()) {
626                 logger.debug("Selector.select() returned prematurely because " +
627                         "Thread.currentThread().interrupt() was called. Use " +
628                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
629             }
630             return true;
631         }
632         if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
633                 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
634             // The selector returned prematurely many times in a row.
635             // Rebuild the selector to work around the problem.
636             logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
637                     selectCnt, selector);
638             rebuildSelector();
639             return true;
640         }
641         return false;
642     }
643 
644     private static void handleLoopException(Throwable t) {
645         logger.warn("Unexpected exception in the selector loop.", t);
646 
647         // Prevent possible consecutive immediate failures that lead to
648         // excessive CPU consumption.
649         try {
650             Thread.sleep(1000);
651         } catch (InterruptedException e) {
652             // Ignore.
653         }
654     }
655 
656     private void processSelectedKeys() {
657         if (selectedKeys != null) {
658             processSelectedKeysOptimized();
659         } else {
660             processSelectedKeysPlain(selector.selectedKeys());
661         }
662     }
663 
664     @Override
665     protected void cleanup() {
666         try {
667             selector.close();
668         } catch (IOException e) {
669             logger.warn("Failed to close a selector.", e);
670         }
671     }
672 
673     void cancel(SelectionKey key) {
674         key.cancel();
675         cancelledKeys ++;
676         if (cancelledKeys >= CLEANUP_INTERVAL) {
677             cancelledKeys = 0;
678             needsToSelectAgain = true;
679         }
680     }
681 
682     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
683         // check if the set is empty and if so just return to not create garbage by
684         // creating a new Iterator every time even if there is nothing to process.
685         // See https://github.com/netty/netty/issues/597
686         if (selectedKeys.isEmpty()) {
687             return;
688         }
689 
690         Iterator<SelectionKey> i = selectedKeys.iterator();
691         for (;;) {
692             final SelectionKey k = i.next();
693             final Object a = k.attachment();
694             i.remove();
695 
696             if (a instanceof AbstractNioChannel) {
697                 processSelectedKey(k, (AbstractNioChannel) a);
698             } else {
699                 @SuppressWarnings("unchecked")
700                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
701                 processSelectedKey(k, task);
702             }
703 
704             if (!i.hasNext()) {
705                 break;
706             }
707 
708             if (needsToSelectAgain) {
709                 selectAgain();
710                 selectedKeys = selector.selectedKeys();
711 
712                 // Create the iterator again to avoid ConcurrentModificationException
713                 if (selectedKeys.isEmpty()) {
714                     break;
715                 } else {
716                     i = selectedKeys.iterator();
717                 }
718             }
719         }
720     }
721 
722     private void processSelectedKeysOptimized() {
723         for (int i = 0; i < selectedKeys.size; ++i) {
724             final SelectionKey k = selectedKeys.keys[i];
725             // null out entry in the array to allow to have it GC'ed once the Channel close
726             // See https://github.com/netty/netty/issues/2363
727             selectedKeys.keys[i] = null;
728 
729             final Object a = k.attachment();
730 
731             if (a instanceof AbstractNioChannel) {
732                 processSelectedKey(k, (AbstractNioChannel) a);
733             } else {
734                 @SuppressWarnings("unchecked")
735                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
736                 processSelectedKey(k, task);
737             }
738 
739             if (needsToSelectAgain) {
740                 // null out entries in the array to allow to have it GC'ed once the Channel close
741                 // See https://github.com/netty/netty/issues/2363
742                 selectedKeys.reset(i + 1);
743 
744                 selectAgain();
745                 i = -1;
746             }
747         }
748     }
749 
750     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
751         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
752         if (!k.isValid()) {
753             final EventLoop eventLoop;
754             try {
755                 eventLoop = ch.eventLoop();
756             } catch (Throwable ignored) {
757                 // If the channel implementation throws an exception because there is no event loop, we ignore this
758                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
759                 // to close ch.
760                 return;
761             }
762             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
763             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
764             // still healthy and should not be closed.
765             // See https://github.com/netty/netty/issues/5125
766             if (eventLoop == this) {
767                 // close the channel if the key is not valid anymore
768                 unsafe.close(unsafe.voidPromise());
769             }
770             return;
771         }
772 
773         try {
774             int readyOps = k.readyOps();
775             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
776             // the NIO JDK channel implementation may throw a NotYetConnectedException.
777             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
778                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
779                 // See https://github.com/netty/netty/issues/924
780                 int ops = k.interestOps();
781                 ops &= ~SelectionKey.OP_CONNECT;
782                 k.interestOps(ops);
783 
784                 unsafe.finishConnect();
785             }
786 
787             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
788             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
789                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
790                unsafe.forceFlush();
791             }
792 
793             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
794             // to a spin loop
795             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
796                 unsafe.read();
797             }
798         } catch (CancelledKeyException ignored) {
799             unsafe.close(unsafe.voidPromise());
800         }
801     }
802 
803     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
804         int state = 0;
805         try {
806             task.channelReady(k.channel(), k);
807             state = 1;
808         } catch (Exception e) {
809             k.cancel();
810             invokeChannelUnregistered(task, k, e);
811             state = 2;
812         } finally {
813             switch (state) {
814             case 0:
815                 k.cancel();
816                 invokeChannelUnregistered(task, k, null);
817                 break;
818             case 1:
819                 if (!k.isValid()) { // Cancelled by channelReady()
820                     invokeChannelUnregistered(task, k, null);
821                 }
822                 break;
823             default:
824                  break;
825             }
826         }
827     }
828 
829     private void closeAll() {
830         selectAgain();
831         Set<SelectionKey> keys = selector.keys();
832         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
833         for (SelectionKey k: keys) {
834             Object a = k.attachment();
835             if (a instanceof AbstractNioChannel) {
836                 channels.add((AbstractNioChannel) a);
837             } else {
838                 k.cancel();
839                 @SuppressWarnings("unchecked")
840                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
841                 invokeChannelUnregistered(task, k, null);
842             }
843         }
844 
845         for (AbstractNioChannel ch: channels) {
846             ch.unsafe().close(ch.unsafe().voidPromise());
847         }
848     }
849 
850     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
851         try {
852             task.channelUnregistered(k.channel(), cause);
853         } catch (Exception e) {
854             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
855         }
856     }
857 
858     @Override
859     protected void wakeup(boolean inEventLoop) {
860         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
861             selector.wakeup();
862         }
863     }
864 
865     @Override
866     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
867         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
868         return deadlineNanos < nextWakeupNanos.get();
869     }
870 
871     @Override
872     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
873         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
874         return deadlineNanos < nextWakeupNanos.get();
875     }
876 
877     Selector unwrappedSelector() {
878         return unwrappedSelector;
879     }
880 
881     int selectNow() throws IOException {
882         return selector.selectNow();
883     }
884 
885     private int select(long deadlineNanos) throws IOException {
886         if (deadlineNanos == NONE) {
887             return selector.select();
888         }
889         // Timeout will only be 0 if deadline is within 5 microsecs
890         long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
891         return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
892     }
893 
894     private void selectAgain() {
895         needsToSelectAgain = false;
896         try {
897             selector.selectNow();
898         } catch (Throwable t) {
899             logger.warn("Failed to update SelectionKeys.", t);
900         }
901     }
902 }