View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.nio;
17  
18  import io.netty5.channel.Channel;
19  import io.netty5.channel.ChannelException;
20  import io.netty5.channel.DefaultSelectStrategyFactory;
21  import io.netty5.channel.IoExecutionContext;
22  import io.netty5.channel.IoHandle;
23  import io.netty5.channel.IoHandler;
24  import io.netty5.channel.IoHandlerFactory;
25  import io.netty5.channel.SelectStrategy;
26  import io.netty5.channel.SelectStrategyFactory;
27  import io.netty5.util.internal.PlatformDependent;
28  import io.netty5.util.internal.ReflectionUtil;
29  import io.netty5.util.internal.StringUtil;
30  import io.netty5.util.internal.SystemPropertyUtil;
31  import io.netty5.util.internal.logging.InternalLogger;
32  import io.netty5.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.io.UncheckedIOException;
36  import java.lang.reflect.Field;
37  import java.nio.channels.CancelledKeyException;
38  import java.nio.channels.SelectionKey;
39  import java.nio.channels.Selector;
40  import java.nio.channels.spi.SelectorProvider;
41  import java.security.AccessController;
42  import java.security.PrivilegedAction;
43  import java.util.ArrayList;
44  import java.util.Collection;
45  import java.util.Iterator;
46  import java.util.Set;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicBoolean;
49  import java.util.function.IntSupplier;
50  
51  import static java.util.Objects.requireNonNull;
52  
53  /**
54   * {@link IoHandler} implementation which register the {@link Channel}'s to a
55   * {@link Selector} and so does the multi-plexing of these in the event loop.
56   *
57   */
58  public final class NioHandler implements IoHandler {
59  
60      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioHandler.class);
61  
62      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
63  
64      private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
65              SystemPropertyUtil.getBoolean("io.netty5.noKeySetOptimization", false);
66  
67      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
68      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
69  
70      private final IntSupplier selectNowSupplier = () -> {
71          try {
72              return selectNow();
73          } catch (IOException e) {
74              throw new UncheckedIOException(e);
75          }
76      };
77  
78      static {
79          int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty5.selectorAutoRebuildThreshold", 512);
80          if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
81              selectorAutoRebuildThreshold = 0;
82          }
83  
84          SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
85  
86          if (logger.isDebugEnabled()) {
87              logger.debug("-Dio.netty5.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
88              logger.debug("-Dio.netty5.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
89          }
90      }
91  
92      /**
93       * The NIO {@link Selector}.
94       */
95      private Selector selector;
96      private Selector unwrappedSelector;
97      private SelectedSelectionKeySet selectedKeys;
98  
99      private final SelectorProvider provider;
100 
101     /**
102      * Boolean that controls determines if a blocked Selector.select should
103      * break out of its selection process. In our case we use a timeout for
104      * the select method and the select method will block for that time unless
105      * waken up.
106      */
107     private final AtomicBoolean wakenUp = new AtomicBoolean();
108 
109     private final SelectStrategy selectStrategy;
110 
111     private int cancelledKeys;
112     private boolean needsToSelectAgain;
113 
114     private NioHandler() {
115         this(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy());
116     }
117 
118     private NioHandler(SelectorProvider selectorProvider, SelectStrategy strategy) {
119         provider = selectorProvider;
120         final SelectorTuple selectorTuple = openSelector();
121         selector = selectorTuple.selector;
122         unwrappedSelector = selectorTuple.unwrappedSelector;
123         selectStrategy = strategy;
124     }
125 
126     /**
127      * Returns a new {@link IoHandlerFactory} that creates {@link NioHandler} instances.
128      */
129     public static IoHandlerFactory newFactory() {
130         return NioHandler::new;
131     }
132 
133     /**
134      * Returns a new {@link IoHandlerFactory} that creates {@link NioHandler} instances.
135      */
136     public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
137                                               final SelectStrategyFactory selectStrategyFactory) {
138         requireNonNull(selectorProvider, "selectorProvider");
139         requireNonNull(selectStrategyFactory, "selectStrategyFactory");
140         return () -> new NioHandler(selectorProvider, selectStrategyFactory.newSelectStrategy());
141     }
142 
143     private static final class SelectorTuple {
144         final Selector unwrappedSelector;
145         final Selector selector;
146 
147         SelectorTuple(Selector unwrappedSelector) {
148             this.unwrappedSelector = unwrappedSelector;
149             this.selector = unwrappedSelector;
150         }
151 
152         SelectorTuple(Selector unwrappedSelector, Selector selector) {
153             this.unwrappedSelector = unwrappedSelector;
154             this.selector = selector;
155         }
156     }
157 
158     private SelectorTuple openSelector() {
159         final Selector unwrappedSelector;
160         try {
161             unwrappedSelector = provider.openSelector();
162         } catch (IOException e) {
163             throw new ChannelException("failed to open a new selector", e);
164         }
165 
166         if (DISABLE_KEY_SET_OPTIMIZATION) {
167             return new SelectorTuple(unwrappedSelector);
168         }
169 
170         Object maybeSelectorImplClass = AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
171             try {
172                 return Class.forName(
173                         "sun.nio.ch.SelectorImpl",
174                         false,
175                         PlatformDependent.getSystemClassLoader());
176             } catch (Throwable cause) {
177                 return cause;
178             }
179         });
180 
181         if (!(maybeSelectorImplClass instanceof Class) ||
182             // ensure the current selector implementation is what we can instrument.
183             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
184             if (maybeSelectorImplClass instanceof Throwable) {
185                 Throwable t = (Throwable) maybeSelectorImplClass;
186                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
187             }
188             return new SelectorTuple(unwrappedSelector);
189         }
190 
191         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
192         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
193 
194         Object maybeException = AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
195             try {
196                 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
197                 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
198 
199                 if (PlatformDependent.hasUnsafe()) {
200                     // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
201                     // This allows us to also do this in Java9+ without any extra flags.
202                     long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
203                     long publicSelectedKeysFieldOffset =
204                             PlatformDependent.objectFieldOffset(publicSelectedKeysField);
205 
206                     if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
207                         PlatformDependent.putObject(
208                                 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
209                         PlatformDependent.putObject(
210                                 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
211                         return null;
212                     }
213                     // We could not retrieve the offset, lets try reflection as last-resort.
214                 }
215 
216                 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
217                 if (cause != null) {
218                     return cause;
219                 }
220                 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
221                 if (cause != null) {
222                     return cause;
223                 }
224 
225                 selectedKeysField.set(unwrappedSelector, selectedKeySet);
226                 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
227                 return null;
228             } catch (NoSuchFieldException | IllegalAccessException e) {
229                 return e;
230             }
231         });
232 
233         if (maybeException instanceof Exception) {
234             selectedKeys = null;
235             Exception e = (Exception) maybeException;
236             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
237             return new SelectorTuple(unwrappedSelector);
238         }
239         selectedKeys = selectedKeySet;
240         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
241         return new SelectorTuple(unwrappedSelector,
242                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
243     }
244 
245     /**
246      * Returns the {@link SelectorProvider} used by this {@link NioHandler} to obtain the {@link Selector}.
247      */
248     public SelectorProvider selectorProvider() {
249         return provider;
250     }
251 
252     /**
253      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
254      * around the infamous epoll 100% CPU bug.
255      */
256     void rebuildSelector() {
257         final Selector oldSelector = selector;
258         final SelectorTuple newSelectorTuple;
259 
260         if (oldSelector == null) {
261             return;
262         }
263 
264         try {
265             newSelectorTuple = openSelector();
266         } catch (Exception e) {
267             logger.warn("Failed to create a new Selector.", e);
268             return;
269         }
270 
271         // Register all channels to the new Selector.
272         int nChannels = 0;
273         for (SelectionKey key: oldSelector.keys()) {
274             NioProcessor handle = (NioProcessor) key.attachment();
275             try {
276                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
277                     continue;
278                 }
279                 handle.register(newSelectorTuple.unwrappedSelector);
280                 nChannels ++;
281             } catch (Exception e) {
282                 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
283                 handle.close();
284             }
285         }
286 
287         selector = newSelectorTuple.selector;
288         unwrappedSelector = newSelectorTuple.unwrappedSelector;
289 
290         try {
291             // time to close the old selector as everything else is registered to the new one
292             oldSelector.close();
293         } catch (Throwable t) {
294             if (logger.isWarnEnabled()) {
295                 logger.warn("Failed to close the old Selector.", t);
296             }
297         }
298 
299         if (logger.isInfoEnabled()) {
300             logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
301         }
302     }
303 
304     private static NioProcessor nioHandle(IoHandle handle) {
305         if (handle instanceof AbstractNioChannel) {
306             return ((AbstractNioChannel<?, ?, ?>) handle).nioProcessor();
307         }
308         if (handle instanceof NioSelectableChannelHandle) {
309             return ((NioSelectableChannelHandle<?>) handle).nioProcessor();
310         }
311         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
312     }
313 
314     @Override
315     public void register(IoHandle handle) throws Exception {
316         NioProcessor nioProcessor = nioHandle(handle);
317         boolean selected = false;
318         for (;;) {
319             try {
320                 nioProcessor.register(unwrappedSelector());
321                 return;
322             } catch (CancelledKeyException e) {
323                 if (!selected) {
324                     // Force the Selector to select now as the "canceled" SelectionKey may still be
325                     // cached and not removed because no Select.select(..) operation was called yet.
326                     selectNow();
327                     selected = true;
328                 } else {
329                     // We forced a select operation on the selector before but the SelectionKey is still cached
330                     // for whatever reason. JDK bug ?
331                     throw e;
332                 }
333             }
334         }
335     }
336 
337     @Override
338     public void deregister(IoHandle handle) {
339         NioProcessor nioProcessor = nioHandle(handle);
340         nioProcessor.deregister();
341         cancelledKeys ++;
342         if (cancelledKeys >= CLEANUP_INTERVAL) {
343             cancelledKeys = 0;
344             needsToSelectAgain = true;
345         }
346     }
347 
348     @Override
349     public int run(IoExecutionContext runner) {
350         int handled = 0;
351         try {
352             try {
353                 switch (selectStrategy.calculateStrategy(selectNowSupplier, !runner.canBlock())) {
354                     case SelectStrategy.CONTINUE:
355                         return 0;
356 
357                     case SelectStrategy.BUSY_WAIT:
358                         // fall-through to SELECT since the busy-wait is not supported with NIO
359 
360                     case SelectStrategy.SELECT:
361                         select(runner, wakenUp.getAndSet(false));
362 
363                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
364                         // before calling 'selector.wakeup()' to reduce the wake-up
365                         // overhead. (Selector.wakeup() is an expensive operation.)
366                         //
367                         // However, there is a race condition in this approach.
368                         // The race condition is triggered when 'wakenUp' is set to
369                         // true too early.
370                         //
371                         // 'wakenUp' is set to true too early if:
372                         // 1) Selector is waken up between 'wakenUp.set(false)' and
373                         //    'selector.select(...)'. (BAD)
374                         // 2) Selector is waken up between 'selector.select(...)' and
375                         //    'if (wakenUp.get()) { ... }'. (OK)
376                         //
377                         // In the first case, 'wakenUp' is set to true and the
378                         // following 'selector.select(...)' will wake up immediately.
379                         // Until 'wakenUp' is set to false again in the next round,
380                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
381                         // any attempt to wake up the Selector will fail, too, causing
382                         // the following 'selector.select(...)' call to block
383                         // unnecessarily.
384                         //
385                         // To fix this problem, we wake up the selector again if wakenUp
386                         // is true immediately after selector.select(...).
387                         // It is inefficient in that it wakes up the selector for both
388                         // the first case (BAD - wake-up required) and the second case
389                         // (OK - no wake-up required).
390 
391                         if (wakenUp.get()) {
392                             selector.wakeup();
393                         }
394                         // fall through
395                     default:
396                 }
397             } catch (IOException e) {
398                 // If we receive an IOException here its because the Selector is messed up. Let's rebuild
399                 // the selector and retry. https://github.com/netty/netty/issues/8566
400                 rebuildSelector();
401                 handleLoopException(e);
402                 return 0;
403             }
404 
405             cancelledKeys = 0;
406             needsToSelectAgain = false;
407             handled = processSelectedKeys();
408         } catch (Error e) {
409             throw e;
410         } catch (Throwable t) {
411             handleLoopException(t);
412         }
413         return handled;
414     }
415 
416     private static void handleLoopException(Throwable t) {
417         logger.warn("Unexpected exception in the selector loop.", t);
418 
419         // Prevent possible consecutive immediate failures that lead to
420         // excessive CPU consumption.
421         try {
422             Thread.sleep(1000);
423         } catch (InterruptedException e) {
424             // Ignore.
425         }
426     }
427 
428     private int processSelectedKeys() {
429         if (selectedKeys != null) {
430             return processSelectedKeysOptimized();
431         } else {
432             return processSelectedKeysPlain(selector.selectedKeys());
433         }
434     }
435 
436     @Override
437     public void destroy() {
438         try {
439             selector.close();
440         } catch (IOException e) {
441             logger.warn("Failed to close a selector.", e);
442         }
443     }
444 
445     private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
446         // check if the set is empty and if so just return to not create garbage by
447         // creating a new Iterator every time even if there is nothing to process.
448         // See https://github.com/netty/netty/issues/597
449         if (selectedKeys.isEmpty()) {
450             return 0;
451         }
452 
453         Iterator<SelectionKey> i = selectedKeys.iterator();
454         int handled = 0;
455         for (;;) {
456             final SelectionKey k = i.next();
457             i.remove();
458 
459             processSelectedKey(k);
460             ++handled;
461 
462             if (!i.hasNext()) {
463                 break;
464             }
465 
466             if (needsToSelectAgain) {
467                 selectAgain();
468                 selectedKeys = selector.selectedKeys();
469 
470                 // Create the iterator again to avoid ConcurrentModificationException
471                 if (selectedKeys.isEmpty()) {
472                     break;
473                 } else {
474                     i = selectedKeys.iterator();
475                 }
476             }
477         }
478         return handled;
479     }
480 
481     private int processSelectedKeysOptimized() {
482         int handled = 0;
483         for (int i = 0; i < selectedKeys.size; ++i) {
484             final SelectionKey k = selectedKeys.keys[i];
485             // null out entry in the array to allow to have it GC'ed once the Channel close
486             // See https://github.com/netty/netty/issues/2363
487             selectedKeys.keys[i] = null;
488 
489             processSelectedKey(k);
490             ++handled;
491 
492             if (needsToSelectAgain) {
493                 // null out entries in the array to allow to have it GC'ed once the Channel close
494                 // See https://github.com/netty/netty/issues/2363
495                 selectedKeys.reset(i + 1);
496 
497                 selectAgain();
498                 i = -1;
499             }
500         }
501         return handled;
502     }
503 
504     private void processSelectedKey(SelectionKey k) {
505         final NioProcessor handle = (NioProcessor) k.attachment();
506         handle.handle(k);
507     }
508 
509     @Override
510     public void prepareToDestroy() {
511         selectAgain();
512         Set<SelectionKey> keys = selector.keys();
513         Collection<NioProcessor> handles = new ArrayList<>(keys.size());
514         for (SelectionKey k: keys) {
515             NioProcessor handle = (NioProcessor) k.attachment();
516             handles.add(handle);
517         }
518 
519         for (NioProcessor h: handles) {
520             h.close();
521         }
522     }
523 
524     @Override
525     public void wakeup(boolean inEventLoop) {
526         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
527             selector.wakeup();
528         }
529     }
530 
531     @Override
532     public boolean isCompatible(Class<? extends IoHandle> handleType) {
533         return AbstractNioChannel.class.isAssignableFrom(handleType);
534     }
535 
536     Selector unwrappedSelector() {
537         return unwrappedSelector;
538     }
539 
540     private int selectNow() throws IOException {
541         try {
542             return selector.selectNow();
543         } finally {
544             // restore wakeup state if needed
545             if (wakenUp.get()) {
546                 selector.wakeup();
547             }
548         }
549     }
550 
551     private void select(IoExecutionContext runner, boolean oldWakenUp) throws IOException {
552         Selector selector = this.selector;
553         try {
554             int selectCnt = 0;
555             long currentTimeNanos = System.nanoTime();
556             long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
557 
558             for (;;) {
559                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
560                 if (timeoutMillis <= 0) {
561                     if (selectCnt == 0) {
562                         selector.selectNow();
563                         selectCnt = 1;
564                     }
565                     break;
566                 }
567 
568                 // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
569                 // Selector#wakeup. So we need to check task queue again before executing select operation.
570                 // If we don't, the task might be pended until select operation was timed out.
571                 // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
572                 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
573                     selector.selectNow();
574                     selectCnt = 1;
575                     break;
576                 }
577 
578                 int selectedKeys = selector.select(timeoutMillis);
579                 selectCnt ++;
580 
581                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
582                     // - Selected something,
583                     // - waken up by user, or
584                     // - the task queue has a pending task.
585                     // - a scheduled task is ready for processing
586                     break;
587                 }
588                 if (Thread.interrupted()) {
589                     // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
590                     // As this is most likely a bug in the handler of the user or it's client library we will
591                     // also log it.
592                     //
593                     // See https://github.com/netty/netty/issues/2426
594                     if (logger.isDebugEnabled()) {
595                         logger.debug("Selector.select() returned prematurely because " +
596                                 "Thread.currentThread().interrupt() was called. Use " +
597                                 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
598                     }
599                     selectCnt = 1;
600                     break;
601                 }
602 
603                 long time = System.nanoTime();
604                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
605                     // timeoutMillis elapsed without anything selected.
606                     selectCnt = 1;
607                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
608                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
609                     // The code exists in an extra method to ensure the method is not too big to inline as this
610                     // branch is not very likely to get hit very frequently.
611                     selector = selectRebuildSelector(selectCnt);
612                     selectCnt = 1;
613                     break;
614                 }
615 
616                 currentTimeNanos = time;
617             }
618 
619             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
620                 if (logger.isDebugEnabled()) {
621                     logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
622                             selectCnt - 1, selector);
623                 }
624             }
625         } catch (CancelledKeyException e) {
626             if (logger.isDebugEnabled()) {
627                 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
628                         selector, e);
629             }
630             // Harmless exception - log anyway
631         }
632     }
633 
634     private Selector selectRebuildSelector(int selectCnt) throws IOException {
635         // The selector returned prematurely many times in a row.
636         // Rebuild the selector to work around the problem.
637         logger.warn(
638                 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
639                 selectCnt, selector);
640 
641         rebuildSelector();
642         Selector selector = this.selector;
643 
644         // Select again to populate selectedKeys.
645         selector.selectNow();
646         return selector;
647     }
648 
649     private void selectAgain() {
650         needsToSelectAgain = false;
651         try {
652             selector.selectNow();
653         } catch (Throwable t) {
654             logger.warn("Failed to update SelectionKeys.", t);
655         }
656     }
657 }