1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.ChannelException;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoEventLoop;
21 import io.netty.channel.IoExecutionContext;
22 import io.netty.channel.IoHandle;
23 import io.netty.channel.IoHandler;
24 import io.netty.channel.IoHandlerFactory;
25 import io.netty.channel.IoOps;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.util.IntSupplier;
29 import io.netty.util.internal.ObjectUtil;
30 import io.netty.util.internal.PlatformDependent;
31 import io.netty.util.internal.ReflectionUtil;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.io.IOException;
38 import java.lang.reflect.Field;
39 import java.nio.channels.CancelledKeyException;
40 import java.nio.channels.Selector;
41 import java.nio.channels.SelectionKey;
42
43 import java.nio.channels.spi.SelectorProvider;
44 import java.security.AccessController;
45 import java.security.PrivilegedAction;
46 import java.util.ArrayList;
47 import java.util.Collection;
48 import java.util.Iterator;
49 import java.util.Set;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicBoolean;
52
53
54
55
56 public final class NioIoHandler implements IoHandler {
57
58 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioIoHandler.class);
59
60 private static final int CLEANUP_INTERVAL = 256;
61
62 private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
63 SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
64
65 private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
66 private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
67
68 private final IntSupplier selectNowSupplier = new IntSupplier() {
69 @Override
70 public int get() throws Exception {
71 return selectNow();
72 }
73 };
74
75
76
77
78
79
80
81 static {
82 int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
83 if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
84 selectorAutoRebuildThreshold = 0;
85 }
86
87 SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
88
89 if (logger.isDebugEnabled()) {
90 logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
91 logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
92 }
93 }
94
95
96
97
98 private Selector selector;
99 private Selector unwrappedSelector;
100 private SelectedSelectionKeySet selectedKeys;
101
102 private final SelectorProvider provider;
103
104
105
106
107
108
109
110 private final AtomicBoolean wakenUp = new AtomicBoolean();
111
112 private final SelectStrategy selectStrategy;
113 private int cancelledKeys;
114 private boolean needsToSelectAgain;
115
116 private NioIoHandler(SelectorProvider selectorProvider,
117 SelectStrategy strategy) {
118 this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
119 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
120 final SelectorTuple selectorTuple = openSelector();
121 this.selector = selectorTuple.selector;
122 this.unwrappedSelector = selectorTuple.unwrappedSelector;
123 }
124
125 private static final class SelectorTuple {
126 final Selector unwrappedSelector;
127 final Selector selector;
128
129 SelectorTuple(Selector unwrappedSelector) {
130 this.unwrappedSelector = unwrappedSelector;
131 this.selector = unwrappedSelector;
132 }
133
134 SelectorTuple(Selector unwrappedSelector, Selector selector) {
135 this.unwrappedSelector = unwrappedSelector;
136 this.selector = selector;
137 }
138 }
139
140 private SelectorTuple openSelector() {
141 final Selector unwrappedSelector;
142 try {
143 unwrappedSelector = provider.openSelector();
144 } catch (IOException e) {
145 throw new ChannelException("failed to open a new selector", e);
146 }
147
148 if (DISABLE_KEY_SET_OPTIMIZATION) {
149 return new SelectorTuple(unwrappedSelector);
150 }
151
152 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
153 @Override
154 public Object run() {
155 try {
156 return Class.forName(
157 "sun.nio.ch.SelectorImpl",
158 false,
159 PlatformDependent.getSystemClassLoader());
160 } catch (Throwable cause) {
161 return cause;
162 }
163 }
164 });
165
166 if (!(maybeSelectorImplClass instanceof Class) ||
167
168 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
169 if (maybeSelectorImplClass instanceof Throwable) {
170 Throwable t = (Throwable) maybeSelectorImplClass;
171 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
172 }
173 return new SelectorTuple(unwrappedSelector);
174 }
175
176 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
177 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
178
179 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
180 @Override
181 public Object run() {
182 try {
183 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
184 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
185
186 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
187
188
189 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
190 long publicSelectedKeysFieldOffset =
191 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
192
193 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
194 PlatformDependent.putObject(
195 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
196 PlatformDependent.putObject(
197 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
198 return null;
199 }
200
201 }
202
203 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
204 if (cause != null) {
205 return cause;
206 }
207 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
208 if (cause != null) {
209 return cause;
210 }
211
212 selectedKeysField.set(unwrappedSelector, selectedKeySet);
213 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
214 return null;
215 } catch (NoSuchFieldException e) {
216 return e;
217 } catch (IllegalAccessException e) {
218 return e;
219 }
220 }
221 });
222
223 if (maybeException instanceof Exception) {
224 selectedKeys = null;
225 Exception e = (Exception) maybeException;
226 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
227 return new SelectorTuple(unwrappedSelector);
228 }
229 selectedKeys = selectedKeySet;
230 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
231 return new SelectorTuple(unwrappedSelector,
232 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
233 }
234
235
236
237
238 public SelectorProvider selectorProvider() {
239 return provider;
240 }
241
242 Selector selector() {
243 return selector;
244 }
245
246 int numRegistered() {
247 return selector().keys().size() - cancelledKeys;
248 }
249
250 Set<SelectionKey> registeredSet() {
251 return selector().keys();
252 }
253
254 void rebuildSelector0() {
255 final Selector oldSelector = selector;
256 final SelectorTuple newSelectorTuple;
257
258 if (oldSelector == null) {
259 return;
260 }
261
262 try {
263 newSelectorTuple = openSelector();
264 } catch (Exception e) {
265 logger.warn("Failed to create a new Selector.", e);
266 return;
267 }
268
269
270 int nChannels = 0;
271 for (SelectionKey key : oldSelector.keys()) {
272 DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment();
273 try {
274 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
275 continue;
276 }
277
278 handle.register(newSelectorTuple.unwrappedSelector);
279 nChannels++;
280 } catch (Exception e) {
281 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
282 handle.cancel();
283 }
284 }
285
286 selector = newSelectorTuple.selector;
287 unwrappedSelector = newSelectorTuple.unwrappedSelector;
288
289 try {
290
291 oldSelector.close();
292 } catch (Throwable t) {
293 if (logger.isWarnEnabled()) {
294 logger.warn("Failed to close the old Selector.", t);
295 }
296 }
297
298 if (logger.isInfoEnabled()) {
299 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
300 }
301 }
302
303 private static NioIoHandle nioHandle(IoHandle handle) {
304 if (handle instanceof NioIoHandle) {
305 return (NioIoHandle) handle;
306 }
307 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
308 }
309
310 private static NioIoOps cast(IoOps ops) {
311 if (ops instanceof NioIoOps) {
312 return (NioIoOps) ops;
313 }
314 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
315 }
316
317 final class DefaultNioRegistration implements NioIoRegistration {
318 private final NioIoHandle handle;
319 private volatile SelectionKey key;
320
321 DefaultNioRegistration(NioIoHandle handle, NioIoOps initialOps, Selector selector) throws IOException {
322 this.handle = handle;
323 key = handle.selectableChannel().register(selector, initialOps.value, this);
324 }
325
326 NioIoHandle handle() {
327 return handle;
328 }
329
330 void register(Selector selector) throws IOException {
331 SelectionKey newKey = handle.selectableChannel().register(selector, key.interestOps(), this);
332 key.cancel();
333 key = newKey;
334 }
335
336 @Override
337 public SelectionKey selectionKey() {
338 return key;
339 }
340
341 @Override
342 public boolean isValid() {
343 return key.isValid();
344 }
345
346 @Override
347 public long submit(IoOps ops) {
348 int v = cast(ops).value;
349 key.interestOps(v);
350 return v;
351 }
352
353 @Override
354 public void cancel() {
355 key.cancel();
356 cancelledKeys++;
357 if (cancelledKeys >= CLEANUP_INTERVAL) {
358 cancelledKeys = 0;
359 needsToSelectAgain = true;
360 }
361 }
362
363 void close() {
364 cancel();
365 try {
366 handle.close();
367 } catch (Exception e) {
368 logger.debug("Exception during closing " + handle, e);
369 }
370 }
371
372 void handle(int ready) {
373 handle.handle(this, NioIoOps.eventOf(ready));
374 }
375
376 @Override
377 public NioIoHandler ioHandler() {
378 return NioIoHandler.this;
379 }
380 }
381
382 @Override
383 public NioIoRegistration register(IoEventLoop eventLoop, IoHandle handle)
384 throws Exception {
385 NioIoHandle nioHandle = nioHandle(handle);
386 NioIoOps ops = NioIoOps.NONE;
387 boolean selected = false;
388 for (;;) {
389 try {
390 return new DefaultNioRegistration(nioHandle, ops, unwrappedSelector());
391 } catch (CancelledKeyException e) {
392 if (!selected) {
393
394
395 selectNow();
396 selected = true;
397 } else {
398
399
400 throw e;
401 }
402 }
403 }
404 }
405
406 @Override
407 public int run(IoExecutionContext runner) {
408 int handled = 0;
409 try {
410 try {
411 switch (selectStrategy.calculateStrategy(selectNowSupplier, !runner.canBlock())) {
412 case SelectStrategy.CONTINUE:
413 return 0;
414
415 case SelectStrategy.BUSY_WAIT:
416
417
418 case SelectStrategy.SELECT:
419 select(runner, wakenUp.getAndSet(false));
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449 if (wakenUp.get()) {
450 selector.wakeup();
451 }
452
453 default:
454 }
455 } catch (IOException e) {
456
457
458 rebuildSelector0();
459 handleLoopException(e);
460 return 0;
461 }
462
463 cancelledKeys = 0;
464 needsToSelectAgain = false;
465 handled = processSelectedKeys();
466 } catch (Error e) {
467 throw e;
468 } catch (Throwable t) {
469 handleLoopException(t);
470 }
471 return handled;
472 }
473
474 private static void handleLoopException(Throwable t) {
475 logger.warn("Unexpected exception in the selector loop.", t);
476
477
478
479 try {
480 Thread.sleep(1000);
481 } catch (InterruptedException e) {
482
483 }
484 }
485
486 private int processSelectedKeys() {
487 if (selectedKeys != null) {
488 return processSelectedKeysOptimized();
489 } else {
490 return processSelectedKeysPlain(selector.selectedKeys());
491 }
492 }
493
494 @Override
495 public void destroy() {
496 try {
497 selector.close();
498 } catch (IOException e) {
499 logger.warn("Failed to close a selector.", e);
500 }
501 }
502
503 private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
504
505
506
507 if (selectedKeys.isEmpty()) {
508 return 0;
509 }
510
511 Iterator<SelectionKey> i = selectedKeys.iterator();
512 int handled = 0;
513 for (;;) {
514 final SelectionKey k = i.next();
515 i.remove();
516
517 processSelectedKey(k);
518 ++handled;
519
520 if (!i.hasNext()) {
521 break;
522 }
523
524 if (needsToSelectAgain) {
525 selectAgain();
526 selectedKeys = selector.selectedKeys();
527
528
529 if (selectedKeys.isEmpty()) {
530 break;
531 } else {
532 i = selectedKeys.iterator();
533 }
534 }
535 }
536 return handled;
537 }
538
539 private int processSelectedKeysOptimized() {
540 int handled = 0;
541 for (int i = 0; i < selectedKeys.size; ++i) {
542 final SelectionKey k = selectedKeys.keys[i];
543
544
545 selectedKeys.keys[i] = null;
546
547 processSelectedKey(k);
548 ++handled;
549
550 if (needsToSelectAgain) {
551
552
553 selectedKeys.reset(i + 1);
554
555 selectAgain();
556 i = -1;
557 }
558 }
559 return handled;
560 }
561
562 private void processSelectedKey(SelectionKey k) {
563 final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
564 if (!registration.isValid()) {
565 try {
566 registration.handle.close();
567 } catch (Exception e) {
568 logger.debug("Exception during closing " + registration.handle, e);
569 }
570 return;
571 }
572 registration.handle(k.readyOps());
573 }
574
575 @Override
576 public void prepareToDestroy() {
577 selectAgain();
578 Set<SelectionKey> keys = selector.keys();
579 Collection<DefaultNioRegistration> registrations = new ArrayList<>(keys.size());
580 for (SelectionKey k: keys) {
581 DefaultNioRegistration handle = (DefaultNioRegistration) k.attachment();
582 registrations.add(handle);
583 }
584
585 for (DefaultNioRegistration reg: registrations) {
586 reg.close();
587 }
588 }
589
590 @Override
591 public void wakeup(IoEventLoop eventLoop) {
592 if (!eventLoop.inEventLoop() && wakenUp.compareAndSet(false, true)) {
593 selector.wakeup();
594 }
595 }
596
597 @Override
598 public boolean isCompatible(Class<? extends IoHandle> handleType) {
599 return NioIoHandle.class.isAssignableFrom(handleType);
600 }
601
602 Selector unwrappedSelector() {
603 return unwrappedSelector;
604 }
605
606 private void select(IoExecutionContext runner, boolean oldWakenUp) throws IOException {
607 Selector selector = this.selector;
608 try {
609 int selectCnt = 0;
610 long currentTimeNanos = System.nanoTime();
611 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
612
613 for (;;) {
614 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
615 if (timeoutMillis <= 0) {
616 if (selectCnt == 0) {
617 selector.selectNow();
618 selectCnt = 1;
619 }
620 break;
621 }
622
623
624
625
626
627 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
628 selector.selectNow();
629 selectCnt = 1;
630 break;
631 }
632
633 int selectedKeys = selector.select(timeoutMillis);
634 selectCnt ++;
635
636 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
637
638
639
640
641 break;
642 }
643 if (Thread.interrupted()) {
644
645
646
647
648
649 if (logger.isDebugEnabled()) {
650 logger.debug("Selector.select() returned prematurely because " +
651 "Thread.currentThread().interrupt() was called. Use " +
652 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
653 }
654 selectCnt = 1;
655 break;
656 }
657
658 long time = System.nanoTime();
659 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
660
661 selectCnt = 1;
662 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
663 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
664
665
666 selector = selectRebuildSelector(selectCnt);
667 selectCnt = 1;
668 break;
669 }
670
671 currentTimeNanos = time;
672 }
673
674 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
675 if (logger.isDebugEnabled()) {
676 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
677 selectCnt - 1, selector);
678 }
679 }
680 } catch (CancelledKeyException e) {
681 if (logger.isDebugEnabled()) {
682 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
683 selector, e);
684 }
685
686 }
687 }
688
689 int selectNow() throws IOException {
690 try {
691 return selector.selectNow();
692 } finally {
693
694 if (wakenUp.get()) {
695 selector.wakeup();
696 }
697 }
698 }
699
700 private Selector selectRebuildSelector(int selectCnt) throws IOException {
701
702
703 logger.warn(
704 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
705 selectCnt, selector);
706
707 rebuildSelector0();
708 Selector selector = this.selector;
709
710
711 selector.selectNow();
712 return selector;
713 }
714
715 private void selectAgain() {
716 needsToSelectAgain = false;
717 try {
718 selector.selectNow();
719 } catch (Throwable t) {
720 logger.warn("Failed to update SelectionKeys.", t);
721 }
722 }
723
724
725
726
727
728
729 public static IoHandlerFactory newFactory() {
730 return newFactory(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE);
731 }
732
733
734
735
736
737
738
739 public static IoHandlerFactory newFactory(SelectorProvider selectorProvider) {
740 return newFactory(selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
741 }
742
743
744
745
746
747
748
749
750 public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
751 final SelectStrategyFactory selectStrategyFactory) {
752 ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
753 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
754 return new IoHandlerFactory() {
755 @Override
756 public IoHandler newHandler() {
757 return new NioIoHandler(selectorProvider, selectStrategyFactory.newSelectStrategy());
758 }
759 };
760 }
761 }