1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.ChannelException;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.util.IntSupplier;
29 import io.netty.util.concurrent.ThreadAwareExecutor;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.ReflectionUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.SystemPropertyUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.lang.reflect.Field;
40 import java.nio.channels.CancelledKeyException;
41 import java.nio.channels.Selector;
42 import java.nio.channels.SelectionKey;
43
44 import java.nio.channels.spi.SelectorProvider;
45 import java.security.AccessController;
46 import java.security.PrivilegedAction;
47 import java.util.ArrayList;
48 import java.util.Collection;
49 import java.util.Iterator;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54
55
56
57 public final class NioIoHandler implements IoHandler {
58
59 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioIoHandler.class);
60
61 private static final int CLEANUP_INTERVAL = 256;
62
63 private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
64 SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
65
66 private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
67 private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
68
69 private final IntSupplier selectNowSupplier = new IntSupplier() {
70 @Override
71 public int get() throws Exception {
72 return selectNow();
73 }
74 };
75
76
77
78
79
80
81
82 static {
83 int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
84 if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
85 selectorAutoRebuildThreshold = 0;
86 }
87
88 SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
89
90 if (logger.isDebugEnabled()) {
91 logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
92 logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
93 }
94 }
95
96
97
98
99 private Selector selector;
100 private Selector unwrappedSelector;
101 private SelectedSelectionKeySet selectedKeys;
102
103 private final SelectorProvider provider;
104
105
106
107
108
109
110
111 private final AtomicBoolean wakenUp = new AtomicBoolean();
112
113 private final SelectStrategy selectStrategy;
114 private final ThreadAwareExecutor executor;
115 private int cancelledKeys;
116 private boolean needsToSelectAgain;
117
118 private NioIoHandler(ThreadAwareExecutor executor, SelectorProvider selectorProvider,
119 SelectStrategy strategy) {
120 this.executor = ObjectUtil.checkNotNull(executor, "executionContext");
121 this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
122 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
123 final SelectorTuple selectorTuple = openSelector();
124 this.selector = selectorTuple.selector;
125 this.unwrappedSelector = selectorTuple.unwrappedSelector;
126 }
127
128 private static final class SelectorTuple {
129 final Selector unwrappedSelector;
130 final Selector selector;
131
132 SelectorTuple(Selector unwrappedSelector) {
133 this.unwrappedSelector = unwrappedSelector;
134 this.selector = unwrappedSelector;
135 }
136
137 SelectorTuple(Selector unwrappedSelector, Selector selector) {
138 this.unwrappedSelector = unwrappedSelector;
139 this.selector = selector;
140 }
141 }
142
143 private SelectorTuple openSelector() {
144 final Selector unwrappedSelector;
145 try {
146 unwrappedSelector = provider.openSelector();
147 } catch (IOException e) {
148 throw new ChannelException("failed to open a new selector", e);
149 }
150
151 if (DISABLE_KEY_SET_OPTIMIZATION) {
152 return new SelectorTuple(unwrappedSelector);
153 }
154
155 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
156 @Override
157 public Object run() {
158 try {
159 return Class.forName(
160 "sun.nio.ch.SelectorImpl",
161 false,
162 PlatformDependent.getSystemClassLoader());
163 } catch (Throwable cause) {
164 return cause;
165 }
166 }
167 });
168
169 if (!(maybeSelectorImplClass instanceof Class) ||
170
171 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
172 if (maybeSelectorImplClass instanceof Throwable) {
173 Throwable t = (Throwable) maybeSelectorImplClass;
174 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
175 }
176 return new SelectorTuple(unwrappedSelector);
177 }
178
179 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
180 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
181
182 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
183 @Override
184 public Object run() {
185 try {
186 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
187 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
188
189 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
190
191
192 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
193 long publicSelectedKeysFieldOffset =
194 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
195
196 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
197 PlatformDependent.putObject(
198 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
199 PlatformDependent.putObject(
200 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
201 return null;
202 }
203
204 }
205
206 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
207 if (cause != null) {
208 return cause;
209 }
210 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
211 if (cause != null) {
212 return cause;
213 }
214
215 selectedKeysField.set(unwrappedSelector, selectedKeySet);
216 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
217 return null;
218 } catch (NoSuchFieldException | IllegalAccessException e) {
219 return e;
220 }
221 }
222 });
223
224 if (maybeException instanceof Exception) {
225 selectedKeys = null;
226 Exception e = (Exception) maybeException;
227 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
228 return new SelectorTuple(unwrappedSelector);
229 }
230 selectedKeys = selectedKeySet;
231 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
232 return new SelectorTuple(unwrappedSelector,
233 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
234 }
235
236
237
238
239 public SelectorProvider selectorProvider() {
240 return provider;
241 }
242
243 Selector selector() {
244 return selector;
245 }
246
247 int numRegistered() {
248 return selector().keys().size() - cancelledKeys;
249 }
250
251 Set<SelectionKey> registeredSet() {
252 return selector().keys();
253 }
254
255 void rebuildSelector0() {
256 final Selector oldSelector = selector;
257 final SelectorTuple newSelectorTuple;
258
259 if (oldSelector == null) {
260 return;
261 }
262
263 try {
264 newSelectorTuple = openSelector();
265 } catch (Exception e) {
266 logger.warn("Failed to create a new Selector.", e);
267 return;
268 }
269
270
271 int nChannels = 0;
272 for (SelectionKey key : oldSelector.keys()) {
273 DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment();
274 try {
275 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
276 continue;
277 }
278
279 handle.register(newSelectorTuple.unwrappedSelector);
280 nChannels++;
281 } catch (Exception e) {
282 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
283 handle.cancel();
284 }
285 }
286
287 selector = newSelectorTuple.selector;
288 unwrappedSelector = newSelectorTuple.unwrappedSelector;
289
290 try {
291
292 oldSelector.close();
293 } catch (Throwable t) {
294 if (logger.isWarnEnabled()) {
295 logger.warn("Failed to close the old Selector.", t);
296 }
297 }
298
299 if (logger.isInfoEnabled()) {
300 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
301 }
302 }
303
304 private static NioIoHandle nioHandle(IoHandle handle) {
305 if (handle instanceof NioIoHandle) {
306 return (NioIoHandle) handle;
307 }
308 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
309 }
310
311 private static NioIoOps cast(IoOps ops) {
312 if (ops instanceof NioIoOps) {
313 return (NioIoOps) ops;
314 }
315 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
316 }
317
318 final class DefaultNioRegistration implements IoRegistration {
319 private final AtomicBoolean canceled = new AtomicBoolean();
320 private final NioIoHandle handle;
321 private volatile SelectionKey key;
322
323 DefaultNioRegistration(ThreadAwareExecutor executor, NioIoHandle handle, NioIoOps initialOps, Selector selector)
324 throws IOException {
325 this.handle = handle;
326 key = handle.selectableChannel().register(selector, initialOps.value, this);
327 }
328
329 NioIoHandle handle() {
330 return handle;
331 }
332
333 void register(Selector selector) throws IOException {
334 SelectionKey newKey = handle.selectableChannel().register(selector, key.interestOps(), this);
335 key.cancel();
336 key = newKey;
337 }
338
339 @SuppressWarnings("unchecked")
340 @Override
341 public <T> T attachment() {
342 return (T) key;
343 }
344
345 @Override
346 public boolean isValid() {
347 return !canceled.get() && key.isValid();
348 }
349
350 @Override
351 public long submit(IoOps ops) {
352 int v = cast(ops).value;
353 key.interestOps(v);
354 return v;
355 }
356
357 @Override
358 public boolean cancel() {
359 if (!canceled.compareAndSet(false, true)) {
360 return false;
361 }
362 key.cancel();
363 cancelledKeys++;
364 if (cancelledKeys >= CLEANUP_INTERVAL) {
365 cancelledKeys = 0;
366 needsToSelectAgain = true;
367 }
368 return true;
369 }
370
371 void close() {
372 cancel();
373 try {
374 handle.close();
375 } catch (Exception e) {
376 logger.debug("Exception during closing " + handle, e);
377 }
378 }
379
380 void handle(int ready) {
381 handle.handle(this, NioIoOps.eventOf(ready));
382 }
383 }
384
385 @Override
386 public IoRegistration register(IoHandle handle)
387 throws Exception {
388 NioIoHandle nioHandle = nioHandle(handle);
389 NioIoOps ops = NioIoOps.NONE;
390 boolean selected = false;
391 for (;;) {
392 try {
393 return new DefaultNioRegistration(executor, nioHandle, ops, unwrappedSelector());
394 } catch (CancelledKeyException e) {
395 if (!selected) {
396
397
398 selectNow();
399 selected = true;
400 } else {
401
402
403 throw e;
404 }
405 }
406 }
407 }
408
409 @Override
410 public int run(IoHandlerContext context) {
411 int handled = 0;
412 try {
413 try {
414 switch (selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock())) {
415 case SelectStrategy.CONTINUE:
416 return 0;
417
418 case SelectStrategy.BUSY_WAIT:
419
420
421 case SelectStrategy.SELECT:
422 select(context, wakenUp.getAndSet(false));
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452 if (wakenUp.get()) {
453 selector.wakeup();
454 }
455
456 default:
457 }
458 } catch (IOException e) {
459
460
461 rebuildSelector0();
462 handleLoopException(e);
463 return 0;
464 }
465
466 cancelledKeys = 0;
467 needsToSelectAgain = false;
468 handled = processSelectedKeys();
469 } catch (Error e) {
470 throw e;
471 } catch (Throwable t) {
472 handleLoopException(t);
473 }
474 return handled;
475 }
476
477 private static void handleLoopException(Throwable t) {
478 logger.warn("Unexpected exception in the selector loop.", t);
479
480
481
482 try {
483 Thread.sleep(1000);
484 } catch (InterruptedException e) {
485
486 }
487 }
488
489 private int processSelectedKeys() {
490 if (selectedKeys != null) {
491 return processSelectedKeysOptimized();
492 } else {
493 return processSelectedKeysPlain(selector.selectedKeys());
494 }
495 }
496
497 @Override
498 public void destroy() {
499 try {
500 selector.close();
501 } catch (IOException e) {
502 logger.warn("Failed to close a selector.", e);
503 }
504 }
505
506 private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
507
508
509
510 if (selectedKeys.isEmpty()) {
511 return 0;
512 }
513
514 Iterator<SelectionKey> i = selectedKeys.iterator();
515 int handled = 0;
516 for (;;) {
517 final SelectionKey k = i.next();
518 i.remove();
519
520 processSelectedKey(k);
521 ++handled;
522
523 if (!i.hasNext()) {
524 break;
525 }
526
527 if (needsToSelectAgain) {
528 selectAgain();
529 selectedKeys = selector.selectedKeys();
530
531
532 if (selectedKeys.isEmpty()) {
533 break;
534 } else {
535 i = selectedKeys.iterator();
536 }
537 }
538 }
539 return handled;
540 }
541
542 private int processSelectedKeysOptimized() {
543 int handled = 0;
544 for (int i = 0; i < selectedKeys.size; ++i) {
545 final SelectionKey k = selectedKeys.keys[i];
546
547
548 selectedKeys.keys[i] = null;
549
550 processSelectedKey(k);
551 ++handled;
552
553 if (needsToSelectAgain) {
554
555
556 selectedKeys.reset(i + 1);
557
558 selectAgain();
559 i = -1;
560 }
561 }
562 return handled;
563 }
564
565 private void processSelectedKey(SelectionKey k) {
566 final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
567 if (!registration.isValid()) {
568 try {
569 registration.handle.close();
570 } catch (Exception e) {
571 logger.debug("Exception during closing " + registration.handle, e);
572 }
573 return;
574 }
575 registration.handle(k.readyOps());
576 }
577
578 @Override
579 public void prepareToDestroy() {
580 selectAgain();
581 Set<SelectionKey> keys = selector.keys();
582 Collection<DefaultNioRegistration> registrations = new ArrayList<>(keys.size());
583 for (SelectionKey k: keys) {
584 DefaultNioRegistration handle = (DefaultNioRegistration) k.attachment();
585 registrations.add(handle);
586 }
587
588 for (DefaultNioRegistration reg: registrations) {
589 reg.close();
590 }
591 }
592
593 @Override
594 public void wakeup() {
595 if (!executor.isExecutorThread(Thread.currentThread()) && wakenUp.compareAndSet(false, true)) {
596 selector.wakeup();
597 }
598 }
599
600 @Override
601 public boolean isCompatible(Class<? extends IoHandle> handleType) {
602 return NioIoHandle.class.isAssignableFrom(handleType);
603 }
604
605 Selector unwrappedSelector() {
606 return unwrappedSelector;
607 }
608
609 private void select(IoHandlerContext runner, boolean oldWakenUp) throws IOException {
610 Selector selector = this.selector;
611 try {
612 int selectCnt = 0;
613 long currentTimeNanos = System.nanoTime();
614 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
615
616 for (;;) {
617 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
618 if (timeoutMillis <= 0) {
619 if (selectCnt == 0) {
620 selector.selectNow();
621 selectCnt = 1;
622 }
623 break;
624 }
625
626
627
628
629
630 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
631 selector.selectNow();
632 selectCnt = 1;
633 break;
634 }
635
636 int selectedKeys = selector.select(timeoutMillis);
637 selectCnt ++;
638
639 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
640
641
642
643
644 break;
645 }
646 if (Thread.interrupted()) {
647
648
649
650
651
652 if (logger.isDebugEnabled()) {
653 logger.debug("Selector.select() returned prematurely because " +
654 "Thread.currentThread().interrupt() was called. Use " +
655 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
656 }
657 selectCnt = 1;
658 break;
659 }
660
661 long time = System.nanoTime();
662 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
663
664 selectCnt = 1;
665 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
666 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
667
668
669 selector = selectRebuildSelector(selectCnt);
670 selectCnt = 1;
671 break;
672 }
673
674 currentTimeNanos = time;
675 }
676
677 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
678 if (logger.isDebugEnabled()) {
679 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
680 selectCnt - 1, selector);
681 }
682 }
683 } catch (CancelledKeyException e) {
684 if (logger.isDebugEnabled()) {
685 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
686 selector, e);
687 }
688
689 }
690 }
691
692 int selectNow() throws IOException {
693 try {
694 return selector.selectNow();
695 } finally {
696
697 if (wakenUp.get()) {
698 selector.wakeup();
699 }
700 }
701 }
702
703 private Selector selectRebuildSelector(int selectCnt) throws IOException {
704
705
706 logger.warn(
707 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
708 selectCnt, selector);
709
710 rebuildSelector0();
711 Selector selector = this.selector;
712
713
714 selector.selectNow();
715 return selector;
716 }
717
718 private void selectAgain() {
719 needsToSelectAgain = false;
720 try {
721 selector.selectNow();
722 } catch (Throwable t) {
723 logger.warn("Failed to update SelectionKeys.", t);
724 }
725 }
726
727
728
729
730
731
732 public static IoHandlerFactory newFactory() {
733 return newFactory(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE);
734 }
735
736
737
738
739
740
741
742 public static IoHandlerFactory newFactory(SelectorProvider selectorProvider) {
743 return newFactory(selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
744 }
745
746
747
748
749
750
751
752
753 public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
754 final SelectStrategyFactory selectStrategyFactory) {
755 ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
756 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
757 return context -> new NioIoHandler(context, selectorProvider, selectStrategyFactory.newSelectStrategy());
758 }
759 }