1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.ChannelException;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.util.IntSupplier;
29 import io.netty.util.concurrent.ThreadAwareExecutor;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.ReflectionUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.SystemPropertyUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.lang.reflect.Field;
40 import java.nio.channels.CancelledKeyException;
41 import java.nio.channels.Selector;
42 import java.nio.channels.SelectionKey;
43
44 import java.nio.channels.spi.SelectorProvider;
45 import java.security.AccessController;
46 import java.security.PrivilegedAction;
47 import java.util.ArrayList;
48 import java.util.Collection;
49 import java.util.Iterator;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54
55
56
57 public final class NioIoHandler implements IoHandler {
58
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioIoHandler.class);

/** Once this many registrations were cancelled, {@code needsToSelectAgain} is raised and the counter restarts. */
private static final int CLEANUP_INTERVAL = 256;

/** Value of the {@code io.netty.noKeySetOptimization} system property ({@code false} when unset). */
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
        SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

/** Lower bound for a usable rebuild threshold; also gates the "returned prematurely" debug log in select(). */
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

/** Number of consecutive premature select() returns that triggers a selector rebuild; {@code 0} disables it. */
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

/** Adapter that lets the {@link SelectStrategy} perform a non-blocking select through {@link #selectNow()}. */
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

static {
    // A configured value below MIN_PREMATURE_SELECTOR_RETURNS is mapped to 0.
    final int configured = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    SELECTOR_AUTO_REBUILD_THRESHOLD = configured < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configured;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
95
96
97
98
99 private Selector selector;
100 private Selector unwrappedSelector;
101 private SelectedSelectionKeySet selectedKeys;
102
103 private final SelectorProvider provider;
104
105
106
107
108
109
110
111 private final AtomicBoolean wakenUp = new AtomicBoolean();
112
113 private final SelectStrategy selectStrategy;
114 private final ThreadAwareExecutor executor;
115 private int cancelledKeys;
116 private boolean needsToSelectAgain;
117
/**
 * Stores the collaborators after null-checking them and opens the initial selector.
 *
 * @param executor         consulted by {@link #wakeup()} to detect calls made from its own thread
 * @param selectorProvider source of every {@link Selector} this handler opens
 * @param strategy         strategy evaluated at the start of each {@link #run(IoHandlerContext)} call
 */
private NioIoHandler(ThreadAwareExecutor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy) {
    this.executor = ObjectUtil.checkNotNull(executor, "executionContext");
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");

    // openSelector() reads this.provider, so it has to run after the assignments above.
    final SelectorTuple opened = openSelector();
    this.unwrappedSelector = opened.unwrappedSelector;
    this.selector = opened.selector;
}
127
128 private static final class SelectorTuple {
129 final Selector unwrappedSelector;
130 final Selector selector;
131
132 SelectorTuple(Selector unwrappedSelector) {
133 this.unwrappedSelector = unwrappedSelector;
134 this.selector = unwrappedSelector;
135 }
136
137 SelectorTuple(Selector unwrappedSelector, Selector selector) {
138 this.unwrappedSelector = unwrappedSelector;
139 this.selector = selector;
140 }
141 }
142
143 private SelectorTuple openSelector() {
144 final Selector unwrappedSelector;
145 try {
146 unwrappedSelector = provider.openSelector();
147 } catch (IOException e) {
148 throw new ChannelException("failed to open a new selector", e);
149 }
150
151 if (DISABLE_KEY_SET_OPTIMIZATION) {
152 return new SelectorTuple(unwrappedSelector);
153 }
154
155 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
156 @Override
157 public Object run() {
158 try {
159 return Class.forName(
160 "sun.nio.ch.SelectorImpl",
161 false,
162 PlatformDependent.getSystemClassLoader());
163 } catch (Throwable cause) {
164 return cause;
165 }
166 }
167 });
168
169 if (!(maybeSelectorImplClass instanceof Class) ||
170
171 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
172 if (maybeSelectorImplClass instanceof Throwable) {
173 Throwable t = (Throwable) maybeSelectorImplClass;
174 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
175 }
176 return new SelectorTuple(unwrappedSelector);
177 }
178
179 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
180 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
181
182 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
183 @Override
184 public Object run() {
185 try {
186 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
187 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
188
189 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
190
191
192 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
193 long publicSelectedKeysFieldOffset =
194 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
195
196 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
197 PlatformDependent.putObject(
198 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
199 PlatformDependent.putObject(
200 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
201 return null;
202 }
203
204 }
205
206 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
207 if (cause != null) {
208 return cause;
209 }
210 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
211 if (cause != null) {
212 return cause;
213 }
214
215 selectedKeysField.set(unwrappedSelector, selectedKeySet);
216 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
217 return null;
218 } catch (NoSuchFieldException | IllegalAccessException e) {
219 return e;
220 }
221 }
222 });
223
224 if (maybeException instanceof Exception) {
225 selectedKeys = null;
226 Exception e = (Exception) maybeException;
227 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
228 return new SelectorTuple(unwrappedSelector);
229 }
230 selectedKeys = selectedKeySet;
231 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
232 return new SelectorTuple(unwrappedSelector,
233 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
234 }
235
236
237
238
239 public SelectorProvider selectorProvider() {
240 return provider;
241 }
242
243 Selector selector() {
244 return selector;
245 }
246
247 int numRegistered() {
248 return selector().keys().size() - cancelledKeys;
249 }
250
251 Set<SelectionKey> registeredSet() {
252 return selector().keys();
253 }
254
255 void rebuildSelector0() {
256 final Selector oldSelector = selector;
257 final SelectorTuple newSelectorTuple;
258
259 if (oldSelector == null) {
260 return;
261 }
262
263 try {
264 newSelectorTuple = openSelector();
265 } catch (Exception e) {
266 logger.warn("Failed to create a new Selector.", e);
267 return;
268 }
269
270
271 int nChannels = 0;
272 for (SelectionKey key : oldSelector.keys()) {
273 DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment();
274 try {
275 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
276 continue;
277 }
278
279 handle.register(newSelectorTuple.unwrappedSelector);
280 nChannels++;
281 } catch (Exception e) {
282 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
283 handle.cancel();
284 }
285 }
286
287 selector = newSelectorTuple.selector;
288 unwrappedSelector = newSelectorTuple.unwrappedSelector;
289
290 try {
291
292 oldSelector.close();
293 } catch (Throwable t) {
294 if (logger.isWarnEnabled()) {
295 logger.warn("Failed to close the old Selector.", t);
296 }
297 }
298
299 if (logger.isInfoEnabled()) {
300 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
301 }
302 }
303
304 private static NioIoHandle nioHandle(IoHandle handle) {
305 if (handle instanceof NioIoHandle) {
306 return (NioIoHandle) handle;
307 }
308 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
309 }
310
311 private static NioIoOps cast(IoOps ops) {
312 if (ops instanceof NioIoOps) {
313 return (NioIoOps) ops;
314 }
315 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
316 }
317
318 final class DefaultNioRegistration implements IoRegistration {
319 private final AtomicBoolean canceled = new AtomicBoolean();
320 private final NioIoHandle handle;
321 private volatile SelectionKey key;
322
323 DefaultNioRegistration(ThreadAwareExecutor executor, NioIoHandle handle, NioIoOps initialOps, Selector selector)
324 throws IOException {
325 this.handle = handle;
326 key = handle.selectableChannel().register(selector, initialOps.value, this);
327 }
328
329 NioIoHandle handle() {
330 return handle;
331 }
332
333 void register(Selector selector) throws IOException {
334 SelectionKey newKey = handle.selectableChannel().register(selector, key.interestOps(), this);
335 key.cancel();
336 key = newKey;
337 }
338
339 @SuppressWarnings("unchecked")
340 @Override
341 public <T> T attachment() {
342 return (T) key;
343 }
344
345 @Override
346 public boolean isValid() {
347 return !canceled.get() && key.isValid();
348 }
349
350 @Override
351 public long submit(IoOps ops) {
352 int v = cast(ops).value;
353 key.interestOps(v);
354 return v;
355 }
356
357 @Override
358 public boolean cancel() {
359 if (!canceled.compareAndSet(false, true)) {
360 return false;
361 }
362 key.cancel();
363 cancelledKeys++;
364 if (cancelledKeys >= CLEANUP_INTERVAL) {
365 cancelledKeys = 0;
366 needsToSelectAgain = true;
367 }
368 return true;
369 }
370
371 void close() {
372 cancel();
373 try {
374 handle.close();
375 } catch (Exception e) {
376 logger.debug("Exception during closing " + handle, e);
377 }
378 }
379
380 void handle(int ready) {
381 handle.handle(this, NioIoOps.eventOf(ready));
382 }
383 }
384
385 @Override
386 public IoRegistration register(IoHandle handle)
387 throws Exception {
388 NioIoHandle nioHandle = nioHandle(handle);
389 NioIoOps ops = NioIoOps.NONE;
390 boolean selected = false;
391 for (;;) {
392 try {
393 return new DefaultNioRegistration(executor, nioHandle, ops, unwrappedSelector());
394 } catch (CancelledKeyException e) {
395 if (!selected) {
396
397
398 selectNow();
399 selected = true;
400 } else {
401
402
403 throw e;
404 }
405 }
406 }
407 }
408
409 @Override
410 public int run(IoHandlerContext context) {
411 int handled = 0;
412 try {
413 try {
414 switch (selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock())) {
415 case SelectStrategy.CONTINUE:
416 if (context.shouldReportActiveIoTime()) {
417 context.reportActiveIoTime(0);
418 }
419 return 0;
420
421 case SelectStrategy.BUSY_WAIT:
422
423
424 case SelectStrategy.SELECT:
425 select(context, wakenUp.getAndSet(false));
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455 if (wakenUp.get()) {
456 selector.wakeup();
457 }
458
459 default:
460 }
461 } catch (IOException e) {
462
463
464 rebuildSelector0();
465 handleLoopException(e);
466 return 0;
467 }
468
469 cancelledKeys = 0;
470 needsToSelectAgain = false;
471
472 if (context.shouldReportActiveIoTime()) {
473
474 long activeIoStartTimeNanos = System.nanoTime();
475 handled = processSelectedKeys();
476 long activeIoEndTimeNanos = System.nanoTime();
477 context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
478 } else {
479 handled = processSelectedKeys();
480 }
481 } catch (Error e) {
482 throw e;
483 } catch (Throwable t) {
484 handleLoopException(t);
485 }
486 return handled;
487 }
488
489 private static void handleLoopException(Throwable t) {
490 logger.warn("Unexpected exception in the selector loop.", t);
491
492
493
494 try {
495 Thread.sleep(1000);
496 } catch (InterruptedException e) {
497
498 }
499 }
500
501 private int processSelectedKeys() {
502 if (selectedKeys != null) {
503 return processSelectedKeysOptimized();
504 } else {
505 return processSelectedKeysPlain(selector.selectedKeys());
506 }
507 }
508
509 @Override
510 public void destroy() {
511 try {
512 selector.close();
513 } catch (IOException e) {
514 logger.warn("Failed to close a selector.", e);
515 }
516 }
517
518 private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
519
520
521
522 if (selectedKeys.isEmpty()) {
523 return 0;
524 }
525
526 Iterator<SelectionKey> i = selectedKeys.iterator();
527 int handled = 0;
528 for (;;) {
529 final SelectionKey k = i.next();
530 i.remove();
531
532 processSelectedKey(k);
533 ++handled;
534
535 if (!i.hasNext()) {
536 break;
537 }
538
539 if (needsToSelectAgain) {
540 selectAgain();
541 selectedKeys = selector.selectedKeys();
542
543
544 if (selectedKeys.isEmpty()) {
545 break;
546 } else {
547 i = selectedKeys.iterator();
548 }
549 }
550 }
551 return handled;
552 }
553
554 private int processSelectedKeysOptimized() {
555 int handled = 0;
556 for (int i = 0; i < selectedKeys.size; ++i) {
557 final SelectionKey k = selectedKeys.keys[i];
558
559
560 selectedKeys.keys[i] = null;
561
562 processSelectedKey(k);
563 ++handled;
564
565 if (needsToSelectAgain) {
566
567
568 selectedKeys.reset(i + 1);
569
570 selectAgain();
571 i = -1;
572 }
573 }
574 return handled;
575 }
576
577 private void processSelectedKey(SelectionKey k) {
578 final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
579 if (!registration.isValid()) {
580 try {
581 registration.handle.close();
582 } catch (Exception e) {
583 logger.debug("Exception during closing " + registration.handle, e);
584 }
585 return;
586 }
587 registration.handle(k.readyOps());
588 }
589
590 @Override
591 public void prepareToDestroy() {
592 selectAgain();
593 Set<SelectionKey> keys = selector.keys();
594 Collection<DefaultNioRegistration> registrations = new ArrayList<>(keys.size());
595 for (SelectionKey k: keys) {
596 DefaultNioRegistration handle = (DefaultNioRegistration) k.attachment();
597 registrations.add(handle);
598 }
599
600 for (DefaultNioRegistration reg: registrations) {
601 reg.close();
602 }
603 }
604
605 @Override
606 public void wakeup() {
607 if (!executor.isExecutorThread(Thread.currentThread()) && wakenUp.compareAndSet(false, true)) {
608 selector.wakeup();
609 }
610 }
611
612 @Override
613 public boolean isCompatible(Class<? extends IoHandle> handleType) {
614 return NioIoHandle.class.isAssignableFrom(handleType);
615 }
616
617 Selector unwrappedSelector() {
618 return unwrappedSelector;
619 }
620
621 private void select(IoHandlerContext runner, boolean oldWakenUp) throws IOException {
622 Selector selector = this.selector;
623 try {
624 int selectCnt = 0;
625 long currentTimeNanos = System.nanoTime();
626 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
627
628 for (;;) {
629 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
630 if (timeoutMillis <= 0) {
631 if (selectCnt == 0) {
632 selector.selectNow();
633 selectCnt = 1;
634 }
635 break;
636 }
637
638
639
640
641
642 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
643 selector.selectNow();
644 selectCnt = 1;
645 break;
646 }
647
648 int selectedKeys = selector.select(timeoutMillis);
649 selectCnt ++;
650
651 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
652
653
654
655
656 break;
657 }
658 if (Thread.interrupted()) {
659
660
661
662
663
664 if (logger.isDebugEnabled()) {
665 logger.debug("Selector.select() returned prematurely because " +
666 "Thread.currentThread().interrupt() was called. Use " +
667 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
668 }
669 selectCnt = 1;
670 break;
671 }
672
673 long time = System.nanoTime();
674 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
675
676 selectCnt = 1;
677 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
678 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
679
680
681 selector = selectRebuildSelector(selectCnt);
682 selectCnt = 1;
683 break;
684 }
685
686 currentTimeNanos = time;
687 }
688
689 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
690 if (logger.isDebugEnabled()) {
691 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
692 selectCnt - 1, selector);
693 }
694 }
695 } catch (CancelledKeyException e) {
696 if (logger.isDebugEnabled()) {
697 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
698 selector, e);
699 }
700
701 }
702 }
703
704 int selectNow() throws IOException {
705 try {
706 return selector.selectNow();
707 } finally {
708
709 if (wakenUp.get()) {
710 selector.wakeup();
711 }
712 }
713 }
714
715 private Selector selectRebuildSelector(int selectCnt) throws IOException {
716
717
718 logger.warn(
719 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
720 selectCnt, selector);
721
722 rebuildSelector0();
723 Selector selector = this.selector;
724
725
726 selector.selectNow();
727 return selector;
728 }
729
730 private void selectAgain() {
731 needsToSelectAgain = false;
732 try {
733 selector.selectNow();
734 } catch (Throwable t) {
735 logger.warn("Failed to update SelectionKeys.", t);
736 }
737 }
738
739
740
741
742
743
744 public static IoHandlerFactory newFactory() {
745 return newFactory(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE);
746 }
747
748
749
750
751
752
753
754 public static IoHandlerFactory newFactory(SelectorProvider selectorProvider) {
755 return newFactory(selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
756 }
757
758
759
760
761
762
763
764
765 public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
766 final SelectStrategyFactory selectStrategyFactory) {
767 ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
768 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
769 return new IoHandlerFactory() {
770 @Override
771 public IoHandler newHandler(ThreadAwareExecutor executor) {
772 return new NioIoHandler(executor, selectorProvider, selectStrategyFactory.newSelectStrategy());
773 }
774
775 @Override
776 public boolean isChangingThreadSupported() {
777 return true;
778 }
779 };
780 }
781 }