1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.ChannelException;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.util.IntSupplier;
29 import io.netty.util.concurrent.ThreadAwareExecutor;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.ReflectionUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.SystemPropertyUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.lang.reflect.Field;
40 import java.nio.channels.CancelledKeyException;
41 import java.nio.channels.Selector;
42 import java.nio.channels.SelectionKey;
43
44 import java.nio.channels.spi.SelectorProvider;
45 import java.security.AccessController;
46 import java.security.PrivilegedAction;
47 import java.util.ArrayList;
48 import java.util.Collection;
49 import java.util.Iterator;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54
55
56
57 public final class NioIoHandler implements IoHandler {
58
/** Logger shared by all instances of this handler. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioIoHandler.class);

/**
 * Number of key cancellations after which the cancellation counter is reset and a fresh
 * {@code selectNow()} pass is requested (see {@code DefaultNioRegistration.cancel()}).
 */
private static final int CLEANUP_INTERVAL = 256;

/** Value of {@code -Dio.netty.noKeySetOptimization}; when {@code true} the selector is used as-is. */
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
        SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

/** Lower bound for the rebuild threshold and the count above which premature returns are logged. */
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

/** Number of consecutive premature {@code select()} returns that triggers a rebuild; {@code 0} disables it. */
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

static {
    final int configured = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    // Values below the minimum turn the automatic rebuild off entirely.
    SELECTOR_AUTO_REBUILD_THRESHOLD = configured < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configured;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}

/** Adapter handed to the {@link SelectStrategy} so that it can trigger a non-blocking select. */
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};
95
96
97
98
/** Selector used for all select operations; either the raw selector or a wrapper around it. */
private Selector selector;
/** The selector exactly as returned by the {@link SelectorProvider}; channels are registered with it. */
private Selector unwrappedSelector;
/** Replacement key set installed into the selector, or {@code null} when instrumentation was not applied. */
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

/** Set by {@link #wakeup()} from non-executor threads; cleared by {@code run(..)} before selecting. */
private final AtomicBoolean wakenUp = new AtomicBoolean();

private final SelectStrategy selectStrategy;
private final ThreadAwareExecutor executor;
/** Count of keys cancelled since the last reset; drives {@link #needsToSelectAgain}. */
private int cancelledKeys;
/** When {@code true}, key processing re-runs {@code selectNow()} before continuing. */
private boolean needsToSelectAgain;

/**
 * Creates a handler bound to the given executor and opens its selector.
 *
 * @param executor         the executor that drives this handler; must not be {@code null}
 * @param selectorProvider the provider used to open selectors; must not be {@code null}
 * @param strategy         the strategy consulted on every {@code run(..)}; must not be {@code null}
 * @throws NullPointerException if any argument is {@code null}
 * @throws ChannelException     if the selector cannot be opened
 */
private NioIoHandler(ThreadAwareExecutor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy) {
    // The label passed to checkNotNull now matches the actual parameter name, so a
    // NullPointerException points at the argument the caller really supplied.
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
127
128 private static final class SelectorTuple {
129 final Selector unwrappedSelector;
130 final Selector selector;
131
132 SelectorTuple(Selector unwrappedSelector) {
133 this.unwrappedSelector = unwrappedSelector;
134 this.selector = unwrappedSelector;
135 }
136
137 SelectorTuple(Selector unwrappedSelector, Selector selector) {
138 this.unwrappedSelector = unwrappedSelector;
139 this.selector = selector;
140 }
141 }
142
143 private SelectorTuple openSelector() {
144 final Selector unwrappedSelector;
145 try {
146 unwrappedSelector = provider.openSelector();
147 } catch (IOException e) {
148 throw new ChannelException("failed to open a new selector", e);
149 }
150
151 if (DISABLE_KEY_SET_OPTIMIZATION) {
152 return new SelectorTuple(unwrappedSelector);
153 }
154
155 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
156 @Override
157 public Object run() {
158 try {
159 return Class.forName(
160 "sun.nio.ch.SelectorImpl",
161 false,
162 PlatformDependent.getSystemClassLoader());
163 } catch (Throwable cause) {
164 return cause;
165 }
166 }
167 });
168
169 if (!(maybeSelectorImplClass instanceof Class) ||
170
171 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
172 if (maybeSelectorImplClass instanceof Throwable) {
173 Throwable t = (Throwable) maybeSelectorImplClass;
174 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
175 }
176 return new SelectorTuple(unwrappedSelector);
177 }
178
179 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
180 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
181
182 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
183 @Override
184 public Object run() {
185 try {
186 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
187 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
188
189 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
190
191
192 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
193 long publicSelectedKeysFieldOffset =
194 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
195
196 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
197 PlatformDependent.putObject(
198 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
199 PlatformDependent.putObject(
200 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
201 return null;
202 }
203
204 }
205
206 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
207 if (cause != null) {
208 return cause;
209 }
210 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
211 if (cause != null) {
212 return cause;
213 }
214
215 selectedKeysField.set(unwrappedSelector, selectedKeySet);
216 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
217 return null;
218 } catch (NoSuchFieldException | IllegalAccessException e) {
219 return e;
220 }
221 }
222 });
223
224 if (maybeException instanceof Exception) {
225 selectedKeys = null;
226 Exception e = (Exception) maybeException;
227 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
228 return new SelectorTuple(unwrappedSelector);
229 }
230 selectedKeys = selectedKeySet;
231 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
232 return new SelectorTuple(unwrappedSelector,
233 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
234 }
235
236
237
238
239 public SelectorProvider selectorProvider() {
240 return provider;
241 }
242
243 Selector selector() {
244 return selector;
245 }
246
247 int numRegistered() {
248 return selector().keys().size() - cancelledKeys;
249 }
250
251 Set<SelectionKey> registeredSet() {
252 return selector().keys();
253 }
254
255 void rebuildSelector0() {
256 final Selector oldSelector = selector;
257 final SelectorTuple newSelectorTuple;
258
259 if (oldSelector == null) {
260 return;
261 }
262
263 try {
264 newSelectorTuple = openSelector();
265 } catch (Exception e) {
266 logger.warn("Failed to create a new Selector.", e);
267 return;
268 }
269
270
271 int nChannels = 0;
272 for (SelectionKey key : oldSelector.keys()) {
273 DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment();
274 try {
275 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
276 continue;
277 }
278
279 handle.register(newSelectorTuple.unwrappedSelector);
280 nChannels++;
281 } catch (Exception e) {
282 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
283 handle.cancel();
284 }
285 }
286
287 selector = newSelectorTuple.selector;
288 unwrappedSelector = newSelectorTuple.unwrappedSelector;
289
290 try {
291
292 oldSelector.close();
293 } catch (Throwable t) {
294 if (logger.isWarnEnabled()) {
295 logger.warn("Failed to close the old Selector.", t);
296 }
297 }
298
299 if (logger.isInfoEnabled()) {
300 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
301 }
302 }
303
304 private static NioIoHandle nioHandle(IoHandle handle) {
305 if (handle instanceof NioIoHandle) {
306 return (NioIoHandle) handle;
307 }
308 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
309 }
310
311 private static NioIoOps cast(IoOps ops) {
312 if (ops instanceof NioIoOps) {
313 return (NioIoOps) ops;
314 }
315 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
316 }
317
318 final class DefaultNioRegistration implements IoRegistration {
319 private final AtomicBoolean canceled = new AtomicBoolean();
320 private final NioIoHandle handle;
321 private volatile SelectionKey key;
322
323 DefaultNioRegistration(ThreadAwareExecutor executor, NioIoHandle handle, NioIoOps initialOps, Selector selector)
324 throws IOException {
325 this.handle = handle;
326 key = handle.selectableChannel().register(selector, initialOps.value, this);
327 }
328
329 NioIoHandle handle() {
330 return handle;
331 }
332
333 void register(Selector selector) throws IOException {
334 SelectionKey newKey = handle.selectableChannel().register(selector, key.interestOps(), this);
335 key.cancel();
336 key = newKey;
337 }
338
339 @SuppressWarnings("unchecked")
340 @Override
341 public <T> T attachment() {
342 return (T) key;
343 }
344
345 @Override
346 public boolean isValid() {
347 return !canceled.get() && key.isValid();
348 }
349
350 @Override
351 public long submit(IoOps ops) {
352 if (!isValid()) {
353 return -1;
354 }
355 int v = cast(ops).value;
356 key.interestOps(v);
357 return v;
358 }
359
360 @Override
361 public boolean cancel() {
362 if (!canceled.compareAndSet(false, true)) {
363 return false;
364 }
365 key.cancel();
366 cancelledKeys++;
367 if (cancelledKeys >= CLEANUP_INTERVAL) {
368 cancelledKeys = 0;
369 needsToSelectAgain = true;
370 }
371 handle.unregistered();
372 return true;
373 }
374
375 void close() {
376 cancel();
377 try {
378 handle.close();
379 } catch (Exception e) {
380 logger.debug("Exception during closing " + handle, e);
381 }
382 }
383
384 void handle(int ready) {
385 if (!isValid()) {
386 return;
387 }
388 handle.handle(this, NioIoOps.eventOf(ready));
389 }
390 }
391
392 @Override
393 public IoRegistration register(IoHandle handle)
394 throws Exception {
395 NioIoHandle nioHandle = nioHandle(handle);
396 NioIoOps ops = NioIoOps.NONE;
397 boolean selected = false;
398 for (;;) {
399 try {
400 IoRegistration registration = new DefaultNioRegistration(executor, nioHandle, ops, unwrappedSelector());
401 handle.registered();
402 return registration;
403 } catch (CancelledKeyException e) {
404 if (!selected) {
405
406
407 selectNow();
408 selected = true;
409 } else {
410
411
412 throw e;
413 }
414 }
415 }
416 }
417
418 @Override
419 public int run(IoHandlerContext context) {
420 int handled = 0;
421 try {
422 try {
423 switch (selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock())) {
424 case SelectStrategy.CONTINUE:
425 if (context.shouldReportActiveIoTime()) {
426 context.reportActiveIoTime(0);
427 }
428 return 0;
429
430 case SelectStrategy.BUSY_WAIT:
431
432
433 case SelectStrategy.SELECT:
434 select(context, wakenUp.getAndSet(false));
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464 if (wakenUp.get()) {
465 selector.wakeup();
466 }
467
468 default:
469 }
470 } catch (IOException e) {
471
472
473 rebuildSelector0();
474 handleLoopException(e);
475 return 0;
476 }
477
478 cancelledKeys = 0;
479 needsToSelectAgain = false;
480
481 if (context.shouldReportActiveIoTime()) {
482
483 long activeIoStartTimeNanos = System.nanoTime();
484 handled = processSelectedKeys();
485 long activeIoEndTimeNanos = System.nanoTime();
486 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
487 } else {
488 handled = processSelectedKeys();
489 }
490 } catch (Error e) {
491 throw e;
492 } catch (Throwable t) {
493 handleLoopException(t);
494 }
495 return handled;
496 }
497
498 private static void handleLoopException(Throwable t) {
499 logger.warn("Unexpected exception in the selector loop.", t);
500
501
502
503 try {
504 Thread.sleep(1000);
505 } catch (InterruptedException e) {
506
507 }
508 }
509
510 private int processSelectedKeys() {
511 if (selectedKeys != null) {
512 return processSelectedKeysOptimized();
513 } else {
514 return processSelectedKeysPlain(selector.selectedKeys());
515 }
516 }
517
518 @Override
519 public void destroy() {
520 try {
521 selector.close();
522 } catch (IOException e) {
523 logger.warn("Failed to close a selector.", e);
524 }
525 }
526
527 private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
528
529
530
531 if (selectedKeys.isEmpty()) {
532 return 0;
533 }
534
535 Iterator<SelectionKey> i = selectedKeys.iterator();
536 int handled = 0;
537 for (;;) {
538 final SelectionKey k = i.next();
539 i.remove();
540
541 processSelectedKey(k);
542 ++handled;
543
544 if (!i.hasNext()) {
545 break;
546 }
547
548 if (needsToSelectAgain) {
549 selectAgain();
550 selectedKeys = selector.selectedKeys();
551
552
553 if (selectedKeys.isEmpty()) {
554 break;
555 } else {
556 i = selectedKeys.iterator();
557 }
558 }
559 }
560 return handled;
561 }
562
563 private int processSelectedKeysOptimized() {
564 int handled = 0;
565 for (int i = 0; i < selectedKeys.size; ++i) {
566 final SelectionKey k = selectedKeys.keys[i];
567
568
569 selectedKeys.keys[i] = null;
570
571 processSelectedKey(k);
572 ++handled;
573
574 if (needsToSelectAgain) {
575
576
577 selectedKeys.reset(i + 1);
578
579 selectAgain();
580 i = -1;
581 }
582 }
583 return handled;
584 }
585
586 private void processSelectedKey(SelectionKey k) {
587 final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
588 if (!registration.isValid()) {
589 try {
590 registration.handle.close();
591 } catch (Exception e) {
592 logger.debug("Exception during closing " + registration.handle, e);
593 }
594 return;
595 }
596 registration.handle(k.readyOps());
597 }
598
599 @Override
600 public void prepareToDestroy() {
601 selectAgain();
602 Set<SelectionKey> keys = selector.keys();
603 Collection<DefaultNioRegistration> registrations = new ArrayList<>(keys.size());
604 for (SelectionKey k: keys) {
605 DefaultNioRegistration handle = (DefaultNioRegistration) k.attachment();
606 registrations.add(handle);
607 }
608
609 for (DefaultNioRegistration reg: registrations) {
610 reg.close();
611 }
612 }
613
614 @Override
615 public void wakeup() {
616 if (!executor.isExecutorThread(Thread.currentThread()) && wakenUp.compareAndSet(false, true)) {
617 selector.wakeup();
618 }
619 }
620
621 @Override
622 public boolean isCompatible(Class<? extends IoHandle> handleType) {
623 return NioIoHandle.class.isAssignableFrom(handleType);
624 }
625
626 Selector unwrappedSelector() {
627 return unwrappedSelector;
628 }
629
630 private void select(IoHandlerContext runner, boolean oldWakenUp) throws IOException {
631 Selector selector = this.selector;
632 try {
633 int selectCnt = 0;
634 long currentTimeNanos = System.nanoTime();
635 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
636
637 for (;;) {
638 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
639 if (timeoutMillis <= 0) {
640 if (selectCnt == 0) {
641 selector.selectNow();
642 selectCnt = 1;
643 }
644 break;
645 }
646
647
648
649
650
651 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
652 selector.selectNow();
653 selectCnt = 1;
654 break;
655 }
656
657 int selectedKeys = selector.select(timeoutMillis);
658 selectCnt ++;
659
660 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
661
662
663
664
665 break;
666 }
667 if (Thread.interrupted()) {
668
669
670
671
672
673 if (logger.isDebugEnabled()) {
674 logger.debug("Selector.select() returned prematurely because " +
675 "Thread.currentThread().interrupt() was called. Use " +
676 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
677 }
678 selectCnt = 1;
679 break;
680 }
681
682 long time = System.nanoTime();
683 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
684
685 selectCnt = 1;
686 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
687 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
688
689
690 selector = selectRebuildSelector(selectCnt);
691 selectCnt = 1;
692 break;
693 }
694
695 currentTimeNanos = time;
696 }
697
698 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
699 if (logger.isDebugEnabled()) {
700 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
701 selectCnt - 1, selector);
702 }
703 }
704 } catch (CancelledKeyException e) {
705 if (logger.isDebugEnabled()) {
706 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
707 selector, e);
708 }
709
710 }
711 }
712
713 int selectNow() throws IOException {
714 try {
715 return selector.selectNow();
716 } finally {
717
718 if (wakenUp.get()) {
719 selector.wakeup();
720 }
721 }
722 }
723
724 private Selector selectRebuildSelector(int selectCnt) throws IOException {
725
726
727 logger.warn(
728 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
729 selectCnt, selector);
730
731 rebuildSelector0();
732 Selector selector = this.selector;
733
734
735 selector.selectNow();
736 return selector;
737 }
738
739 private void selectAgain() {
740 needsToSelectAgain = false;
741 try {
742 selector.selectNow();
743 } catch (Throwable t) {
744 logger.warn("Failed to update SelectionKeys.", t);
745 }
746 }
747
748
749
750
751
752
753 public static IoHandlerFactory newFactory() {
754 return newFactory(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE);
755 }
756
757
758
759
760
761
762
763 public static IoHandlerFactory newFactory(SelectorProvider selectorProvider) {
764 return newFactory(selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
765 }
766
767
768
769
770
771
772
773
774 public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
775 final SelectStrategyFactory selectStrategyFactory) {
776 ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
777 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
778 return new IoHandlerFactory() {
779 @Override
780 public IoHandler newHandler(ThreadAwareExecutor executor) {
781 return new NioIoHandler(executor, selectorProvider, selectStrategyFactory.newSelectStrategy());
782 }
783
784 @Override
785 public boolean isChangingThreadSupported() {
786 return true;
787 }
788 };
789 }
790 }