View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.SingleThreadEventLoop;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.SystemPropertyUtil;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.io.IOException;
29  import java.lang.reflect.Field;
30  import java.nio.channels.CancelledKeyException;
31  import java.nio.channels.SelectableChannel;
32  import java.nio.channels.SelectionKey;
33  import java.nio.channels.Selector;
34  import java.nio.channels.spi.SelectorProvider;
35  import java.util.ArrayList;
36  import java.util.Collection;
37  import java.util.ConcurrentModificationException;
38  import java.util.Iterator;
39  import java.util.Queue;
40  import java.util.Set;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  /**
46   * A {@link SingleThreadEventLoop} implementation which registers each {@link Channel} with a
47   * NIO {@link Selector} and performs the multiplexing of these in the event loop.
48   */
49  public final class NioEventLoop extends SingleThreadEventLoop {
50  
51      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
52  
53      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
54  
55      private static final boolean DISABLE_KEYSET_OPTIMIZATION =
56              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
57  
58      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
59      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
60  
61      // Workaround for JDK NIO bug.
62      //
63      // See:
64      // - http://bugs.sun.com/view_bug.do?bug_id=6427854
65      // - https://github.com/netty/netty/issues/203
66      static {
67          String key = "sun.nio.ch.bugLevel";
68          try {
69              String buglevel = SystemPropertyUtil.get(key);
70              if (buglevel == null) {
71                  System.setProperty(key, "");
72              }
73          } catch (SecurityException e) {
74              if (logger.isDebugEnabled()) {
75                  logger.debug("Unable to get/set System Property: {}", key, e);
76              }
77          }
78  
79          int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
80          if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
81              selectorAutoRebuildThreshold = 0;
82          }
83  
84          SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
85  
86          if (logger.isDebugEnabled()) {
87              logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
88              logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
89          }
90      }
91  
92      /**
93       * The NIO {@link Selector}.
94       */
95      Selector selector;
96      private SelectedSelectionKeySet selectedKeys;
97  
98      private final SelectorProvider provider;
99  
100     /**
101      * Boolean that controls determines if a blocked Selector.select should
102      * break out of its selection process. In our case we use a timeout for
103      * the select method and the select method will block for that time unless
104      * waken up.
105      */
106     private final AtomicBoolean wakenUp = new AtomicBoolean();
107 
108     private volatile int ioRatio = 50;
109     private int cancelledKeys;
110     private boolean needsToSelectAgain;
111 
112     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
113         super(parent, executor, false);
114         if (selectorProvider == null) {
115             throw new NullPointerException("selectorProvider");
116         }
117         provider = selectorProvider;
118         selector = openSelector();
119     }
120 
121     private Selector openSelector() {
122         final Selector selector;
123         try {
124             selector = provider.openSelector();
125         } catch (IOException e) {
126             throw new ChannelException("failed to open a new selector", e);
127         }
128 
129         if (DISABLE_KEYSET_OPTIMIZATION) {
130             return selector;
131         }
132 
133         try {
134             SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
135 
136             Class<?> selectorImplClass =
137                     Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
138 
139             // Ensure the current selector implementation is what we can instrument.
140             if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
141                 return selector;
142             }
143 
144             Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
145             Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
146 
147             selectedKeysField.setAccessible(true);
148             publicSelectedKeysField.setAccessible(true);
149 
150             selectedKeysField.set(selector, selectedKeySet);
151             publicSelectedKeysField.set(selector, selectedKeySet);
152 
153             selectedKeys = selectedKeySet;
154             logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
155         } catch (Throwable t) {
156             selectedKeys = null;
157             logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
158         }
159 
160         return selector;
161     }
162 
163     @Override
164     protected Queue<Runnable> newTaskQueue() {
165         // This event loop never calls takeTask()
166         return PlatformDependent.newMpscQueue();
167     }
168 
169     /**
170      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
171      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
172      * be executed by this event loop when the {@link SelectableChannel} is ready.
173      */
174     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
175         if (ch == null) {
176             throw new NullPointerException("ch");
177         }
178         if (interestOps == 0) {
179             throw new IllegalArgumentException("interestOps must be non-zero.");
180         }
181         if ((interestOps & ~ch.validOps()) != 0) {
182             throw new IllegalArgumentException(
183                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
184         }
185         if (task == null) {
186             throw new NullPointerException("task");
187         }
188 
189         if (isShutdown()) {
190             throw new IllegalStateException("event loop shut down");
191         }
192 
193         try {
194             ch.register(selector, interestOps, task);
195         } catch (Exception e) {
196             throw new EventLoopException("failed to register a channel", e);
197         }
198     }
199 
200     /**
201      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
202      */
203     public int getIoRatio() {
204         return ioRatio;
205     }
206 
207     /**
208      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
209      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
210      */
211     public void setIoRatio(int ioRatio) {
212         if (ioRatio <= 0 || ioRatio > 100) {
213             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
214         }
215         this.ioRatio = ioRatio;
216     }
217 
218     /**
219      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
220      * around the infamous epoll 100% CPU bug.
221      */
222     public void rebuildSelector() {
223         if (!inEventLoop()) {
224             execute(new Runnable() {
225                 @Override
226                 public void run() {
227                     rebuildSelector();
228                 }
229             });
230             return;
231         }
232 
233         final Selector oldSelector = selector;
234         final Selector newSelector;
235 
236         if (oldSelector == null) {
237             return;
238         }
239 
240         try {
241             newSelector = openSelector();
242         } catch (Exception e) {
243             logger.warn("Failed to create a new Selector.", e);
244             return;
245         }
246 
247         // Register all channels to the new Selector.
248         int nChannels = 0;
249         for (;;) {
250             try {
251                 for (SelectionKey key: oldSelector.keys()) {
252                     Object a = key.attachment();
253                     try {
254                         if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
255                             continue;
256                         }
257 
258                         int interestOps = key.interestOps();
259                         key.cancel();
260                         SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
261                         if (a instanceof AbstractNioChannel) {
262                             // Update SelectionKey
263                             ((AbstractNioChannel) a).selectionKey = newKey;
264                         }
265                         nChannels ++;
266                     } catch (Exception e) {
267                         logger.warn("Failed to re-register a Channel to the new Selector.", e);
268                         if (a instanceof AbstractNioChannel) {
269                             AbstractNioChannel ch = (AbstractNioChannel) a;
270                             ch.unsafe().close(ch.unsafe().voidPromise());
271                         } else {
272                             @SuppressWarnings("unchecked")
273                             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
274                             invokeChannelUnregistered(task, key, e);
275                         }
276                     }
277                 }
278             } catch (ConcurrentModificationException e) {
279                 // Probably due to concurrent modification of the key set.
280                 continue;
281             }
282 
283             break;
284         }
285 
286         selector = newSelector;
287 
288         try {
289             // time to close the old selector as everything else is registered to the new one
290             oldSelector.close();
291         } catch (Throwable t) {
292             if (logger.isWarnEnabled()) {
293                 logger.warn("Failed to close the old Selector.", t);
294             }
295         }
296 
297         logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
298     }
299 
300     @Override
301     protected void run() {
302         boolean oldWakenUp = wakenUp.getAndSet(false);
303         try {
304             if (hasTasks()) {
305                 selectNow();
306             } else {
307                 select(oldWakenUp);
308 
309                 // 'wakenUp.compareAndSet(false, true)' is always evaluated
310                 // before calling 'selector.wakeup()' to reduce the wake-up
311                 // overhead. (Selector.wakeup() is an expensive operation.)
312                 //
313                 // However, there is a race condition in this approach.
314                 // The race condition is triggered when 'wakenUp' is set to
315                 // true too early.
316                 //
317                 // 'wakenUp' is set to true too early if:
318                 // 1) Selector is waken up between 'wakenUp.set(false)' and
319                 //    'selector.select(...)'. (BAD)
320                 // 2) Selector is waken up between 'selector.select(...)' and
321                 //    'if (wakenUp.get()) { ... }'. (OK)
322                 //
323                 // In the first case, 'wakenUp' is set to true and the
324                 // following 'selector.select(...)' will wake up immediately.
325                 // Until 'wakenUp' is set to false again in the next round,
326                 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
327                 // any attempt to wake up the Selector will fail, too, causing
328                 // the following 'selector.select(...)' call to block
329                 // unnecessarily.
330                 //
331                 // To fix this problem, we wake up the selector again if wakenUp
332                 // is true immediately after selector.select(...).
333                 // It is inefficient in that it wakes up the selector for both
334                 // the first case (BAD - wake-up required) and the second case
335                 // (OK - no wake-up required).
336 
337                 if (wakenUp.get()) {
338                     selector.wakeup();
339                 }
340             }
341 
342             cancelledKeys = 0;
343             needsToSelectAgain = false;
344             final int ioRatio = this.ioRatio;
345             if (ioRatio == 100) {
346                 processSelectedKeys();
347                 runAllTasks();
348             } else {
349                 final long ioStartTime = System.nanoTime();
350 
351                 processSelectedKeys();
352 
353                 final long ioTime = System.nanoTime() - ioStartTime;
354                 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
355             }
356 
357             if (isShuttingDown()) {
358                 closeAll();
359                 if (confirmShutdown()) {
360                     cleanupAndTerminate(true);
361                     return;
362                 }
363             }
364         } catch (Throwable t) {
365             logger.warn("Unexpected exception in the selector loop.", t);
366 
367             // TODO: After using a ForkJoinPool that is potentially shared with other software
368             // than Netty. The Thread.sleep might be problematic. Even though this is unlikely to ever
369             // happen anyways.
370 
371             // Prevent possible consecutive immediate failures that lead to
372             // excessive CPU consumption.
373             try {
374                 Thread.sleep(1000);
375             } catch (InterruptedException e) {
376                 // Ignore.
377             }
378         }
379 
380         scheduleExecution();
381     }
382 
383     private void processSelectedKeys() {
384         if (selectedKeys != null) {
385             processSelectedKeysOptimized(selectedKeys.flip());
386         } else {
387             processSelectedKeysPlain(selector.selectedKeys());
388         }
389     }
390 
391     @Override
392     protected void cleanup() {
393         try {
394             selector.close();
395         } catch (IOException e) {
396             logger.warn("Failed to close a selector.", e);
397         }
398     }
399 
400     void cancel(SelectionKey key) {
401         key.cancel();
402         cancelledKeys ++;
403         if (cancelledKeys >= CLEANUP_INTERVAL) {
404             cancelledKeys = 0;
405             needsToSelectAgain = true;
406         }
407     }
408 
409     @Override
410     protected Runnable pollTask() {
411         Runnable task = super.pollTask();
412         if (needsToSelectAgain) {
413             selectAgain();
414         }
415         return task;
416     }
417 
418     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
419         // check if the set is empty and if so just return to not create garbage by
420         // creating a new Iterator every time even if there is nothing to process.
421         // See https://github.com/netty/netty/issues/597
422         if (selectedKeys.isEmpty()) {
423             return;
424         }
425 
426         Iterator<SelectionKey> i = selectedKeys.iterator();
427         for (;;) {
428             final SelectionKey k = i.next();
429             final Object a = k.attachment();
430             i.remove();
431 
432             if (a instanceof AbstractNioChannel) {
433                 processSelectedKey(k, (AbstractNioChannel) a);
434             } else {
435                 @SuppressWarnings("unchecked")
436                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
437                 processSelectedKey(k, task);
438             }
439 
440             if (!i.hasNext()) {
441                 break;
442             }
443 
444             if (needsToSelectAgain) {
445                 selectAgain();
446                 selectedKeys = selector.selectedKeys();
447 
448                 // Create the iterator again to avoid ConcurrentModificationException
449                 if (selectedKeys.isEmpty()) {
450                     break;
451                 } else {
452                     i = selectedKeys.iterator();
453                 }
454             }
455         }
456     }
457 
458     private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
459         for (int i = 0;; i ++) {
460             final SelectionKey k = selectedKeys[i];
461             if (k == null) {
462                 break;
463             }
464             // null out entry in the array to allow to have it GC'ed once the Channel close
465             // See https://github.com/netty/netty/issues/2363
466             selectedKeys[i] = null;
467 
468             final Object a = k.attachment();
469 
470             if (a instanceof AbstractNioChannel) {
471                 processSelectedKey(k, (AbstractNioChannel) a);
472             } else {
473                 @SuppressWarnings("unchecked")
474                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
475                 processSelectedKey(k, task);
476             }
477 
478             if (needsToSelectAgain) {
479                 // null out entries in the array to allow to have it GC'ed once the Channel close
480                 // See https://github.com/netty/netty/issues/2363
481                 for (;;) {
482                     if (selectedKeys[i] == null) {
483                         break;
484                     }
485                     selectedKeys[i] = null;
486                     i++;
487                 }
488 
489                 selectAgain();
490                 // Need to flip the optimized selectedKeys to get the right reference to the array
491                 // and reset the index to -1 which will then set to 0 on the for loop
492                 // to start over again.
493                 //
494                 // See https://github.com/netty/netty/issues/1523
495                 selectedKeys = this.selectedKeys.flip();
496                 i = -1;
497             }
498         }
499     }
500 
501     private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
502         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
503         if (!k.isValid()) {
504             // close the channel if the key is not valid anymore
505             unsafe.close(unsafe.voidPromise());
506             return;
507         }
508 
509         try {
510             int readyOps = k.readyOps();
511             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
512             // to a spin loop
513             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
514                 unsafe.read();
515                 if (!ch.isOpen()) {
516                     // Connection already closed - no need to handle write.
517                     return;
518                 }
519             }
520             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
521                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
522                 ch.unsafe().forceFlush();
523             }
524             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
525                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
526                 // See https://github.com/netty/netty/issues/924
527                 int ops = k.interestOps();
528                 ops &= ~SelectionKey.OP_CONNECT;
529                 k.interestOps(ops);
530 
531                 unsafe.finishConnect();
532             }
533         } catch (CancelledKeyException ignored) {
534             unsafe.close(unsafe.voidPromise());
535         }
536     }
537 
538     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
539         int state = 0;
540         try {
541             task.channelReady(k.channel(), k);
542             state = 1;
543         } catch (Exception e) {
544             k.cancel();
545             invokeChannelUnregistered(task, k, e);
546             state = 2;
547         } finally {
548             switch (state) {
549             case 0:
550                 k.cancel();
551                 invokeChannelUnregistered(task, k, null);
552                 break;
553             case 1:
554                 if (!k.isValid()) { // Cancelled by channelReady()
555                     invokeChannelUnregistered(task, k, null);
556                 }
557                 break;
558             }
559         }
560     }
561 
562     private void closeAll() {
563         selectAgain();
564         Set<SelectionKey> keys = selector.keys();
565         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
566         for (SelectionKey k: keys) {
567             Object a = k.attachment();
568             if (a instanceof AbstractNioChannel) {
569                 channels.add((AbstractNioChannel) a);
570             } else {
571                 k.cancel();
572                 @SuppressWarnings("unchecked")
573                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
574                 invokeChannelUnregistered(task, k, null);
575             }
576         }
577 
578         for (AbstractNioChannel ch: channels) {
579             ch.unsafe().close(ch.unsafe().voidPromise());
580         }
581     }
582 
583     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
584         try {
585             task.channelUnregistered(k.channel(), cause);
586         } catch (Exception e) {
587             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
588         }
589     }
590 
591     @Override
592     protected void wakeup(boolean inEventLoop) {
593         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
594             selector.wakeup();
595         }
596     }
597 
598     void selectNow() throws IOException {
599         try {
600             selector.selectNow();
601         } finally {
602             // restore wakup state if needed
603             if (wakenUp.get()) {
604                 selector.wakeup();
605             }
606         }
607     }
608 
609     private void select(boolean oldWakenUp) throws IOException {
610         Selector selector = this.selector;
611         try {
612             int selectCnt = 0;
613             long currentTimeNanos = System.nanoTime();
614             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
615             for (;;) {
616                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
617                 if (timeoutMillis <= 0) {
618                     if (selectCnt == 0) {
619                         selector.selectNow();
620                         selectCnt = 1;
621                     }
622                     break;
623                 }
624 
625                 int selectedKeys = selector.select(timeoutMillis);
626                 selectCnt ++;
627 
628                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
629                     // - Selected something,
630                     // - waken up by user, or
631                     // - the task queue has a pending task.
632                     // - a scheduled task is ready for processing
633                     break;
634                 }
635                 if (Thread.interrupted()) {
636                     // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
637                     // As this is most likely a bug in the handler of the user or it's client library we will
638                     // also log it.
639                     //
640                     // See https://github.com/netty/netty/issues/2426
641                     if (logger.isDebugEnabled()) {
642                         logger.debug("Selector.select() returned prematurely because " +
643                                 "Thread.currentThread().interrupt() was called. Use " +
644                                 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
645                     }
646                     selectCnt = 1;
647                     break;
648                 }
649 
650                 long time = System.nanoTime();
651                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
652                     // timeoutMillis elapsed without anything selected.
653                     selectCnt = 1;
654                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
655                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
656                     // The selector returned prematurely many times in a row.
657                     // Rebuild the selector to work around the problem.
658                     logger.warn(
659                             "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
660                             selectCnt);
661 
662                     rebuildSelector();
663                     selector = this.selector;
664 
665                     // Select again to populate selectedKeys.
666                     selector.selectNow();
667                     selectCnt = 1;
668                     break;
669                 }
670 
671                 currentTimeNanos = time;
672             }
673 
674             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
675                 if (logger.isDebugEnabled()) {
676                     logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
677                 }
678             }
679         } catch (CancelledKeyException e) {
680             if (logger.isDebugEnabled()) {
681                 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
682             }
683             // Harmless exception - log anyway
684         }
685     }
686 
687     private void selectAgain() {
688         needsToSelectAgain = false;
689         try {
690             selector.selectNow();
691         } catch (Throwable t) {
692             logger.warn("Failed to update SelectionKeys.", t);
693         }
694     }
695 }