1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.ChannelException;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoEventLoop;
21 import io.netty.channel.IoExecutionContext;
22 import io.netty.channel.IoHandle;
23 import io.netty.channel.IoHandler;
24 import io.netty.channel.IoHandlerFactory;
25 import io.netty.channel.IoOps;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.util.IntSupplier;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.Promise;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.ReflectionUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.SystemPropertyUtil;
36 import io.netty.util.internal.logging.InternalLogger;
37 import io.netty.util.internal.logging.InternalLoggerFactory;
38
39 import java.io.IOException;
40 import java.lang.reflect.Field;
41 import java.nio.channels.CancelledKeyException;
42 import java.nio.channels.Selector;
43 import java.nio.channels.SelectionKey;
44
45 import java.nio.channels.spi.SelectorProvider;
46 import java.security.AccessController;
47 import java.security.PrivilegedAction;
48 import java.util.ArrayList;
49 import java.util.Collection;
50 import java.util.Iterator;
51 import java.util.Set;
52 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.atomic.AtomicBoolean;
54
55
56
57
58 public final class NioIoHandler implements IoHandler {
59
/** Logger shared by all handler instances. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioIoHandler.class);

/**
 * Number of cancelled keys after which a registration resets the counter and requests
 * another {@code selectNow()} pass (see {@code DefaultNioRegistration.cancel()}).
 */
private static final int CLEANUP_INTERVAL = 256;

/** When {@code true}, {@code openSelector()} returns the provider's selector without touching its key sets. */
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
        SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

/** Smallest accepted rebuild threshold; also the count above which premature returns are debug-logged. */
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;

/** Consecutive premature {@code select()} returns that trigger a selector rebuild; {@code 0} disables it. */
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

/** Adapter handed to the select strategy so that it can perform a non-blocking select. */
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return NioIoHandler.this.selectNow();
    }
};

static {
    final int configured = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    // Values below the minimum are mapped to 0, which switches the auto-rebuild off.
    SELECTOR_AUTO_REBUILD_THRESHOLD = configured < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configured;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
96
97
98
99
/** Selector used for select / wakeup / close calls; may be a wrapper around {@link #unwrappedSelector}. */
private Selector selector;
/** Selector exactly as returned by the provider; channels are registered against this one. */
private Selector unwrappedSelector;
/** Key set installed into the selector by {@code openSelector()}, or {@code null} if that was not possible. */
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

/**
 * Set to {@code true} by {@code wakeup(...)} when called from outside the event loop, and reset to
 * {@code false} by {@code run(...)} right before it selects.
 */
private final AtomicBoolean wakenUp = new AtomicBoolean();

private final SelectStrategy selectStrategy;
/** Keys cancelled since the counter was last reset. */
private int cancelledKeys;
/** Asks the key-processing loops to run {@code selectAgain()} before continuing. */
private boolean needsToSelectAgain;

/**
 * Creates a handler and opens its selector immediately.
 *
 * @param selectorProvider provider used to open selectors; must not be {@code null}
 * @param strategy         strategy consulted on every {@code run(...)}; must not be {@code null}
 */
private NioIoHandler(SelectorProvider selectorProvider,
                     SelectStrategy strategy) {
    provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple opened = openSelector();
    unwrappedSelector = opened.unwrappedSelector;
    selector = opened.selector;
}
126
127 private static final class SelectorTuple {
128 final Selector unwrappedSelector;
129 final Selector selector;
130
131 SelectorTuple(Selector unwrappedSelector) {
132 this.unwrappedSelector = unwrappedSelector;
133 this.selector = unwrappedSelector;
134 }
135
136 SelectorTuple(Selector unwrappedSelector, Selector selector) {
137 this.unwrappedSelector = unwrappedSelector;
138 this.selector = selector;
139 }
140 }
141
142 private SelectorTuple openSelector() {
143 final Selector unwrappedSelector;
144 try {
145 unwrappedSelector = provider.openSelector();
146 } catch (IOException e) {
147 throw new ChannelException("failed to open a new selector", e);
148 }
149
150 if (DISABLE_KEY_SET_OPTIMIZATION) {
151 return new SelectorTuple(unwrappedSelector);
152 }
153
154 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
155 @Override
156 public Object run() {
157 try {
158 return Class.forName(
159 "sun.nio.ch.SelectorImpl",
160 false,
161 PlatformDependent.getSystemClassLoader());
162 } catch (Throwable cause) {
163 return cause;
164 }
165 }
166 });
167
168 if (!(maybeSelectorImplClass instanceof Class) ||
169
170 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
171 if (maybeSelectorImplClass instanceof Throwable) {
172 Throwable t = (Throwable) maybeSelectorImplClass;
173 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
174 }
175 return new SelectorTuple(unwrappedSelector);
176 }
177
178 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
179 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
180
181 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
182 @Override
183 public Object run() {
184 try {
185 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
186 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
187
188 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
189
190
191 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
192 long publicSelectedKeysFieldOffset =
193 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
194
195 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
196 PlatformDependent.putObject(
197 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
198 PlatformDependent.putObject(
199 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
200 return null;
201 }
202
203 }
204
205 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
206 if (cause != null) {
207 return cause;
208 }
209 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
210 if (cause != null) {
211 return cause;
212 }
213
214 selectedKeysField.set(unwrappedSelector, selectedKeySet);
215 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
216 return null;
217 } catch (NoSuchFieldException e) {
218 return e;
219 } catch (IllegalAccessException e) {
220 return e;
221 }
222 }
223 });
224
225 if (maybeException instanceof Exception) {
226 selectedKeys = null;
227 Exception e = (Exception) maybeException;
228 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
229 return new SelectorTuple(unwrappedSelector);
230 }
231 selectedKeys = selectedKeySet;
232 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
233 return new SelectorTuple(unwrappedSelector,
234 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
235 }
236
237
238
239
240 public SelectorProvider selectorProvider() {
241 return provider;
242 }
243
244 Selector selector() {
245 return selector;
246 }
247
248 int numRegistered() {
249 return selector().keys().size() - cancelledKeys;
250 }
251
252 Set<SelectionKey> registeredSet() {
253 return selector().keys();
254 }
255
256 void rebuildSelector0() {
257 final Selector oldSelector = selector;
258 final SelectorTuple newSelectorTuple;
259
260 if (oldSelector == null) {
261 return;
262 }
263
264 try {
265 newSelectorTuple = openSelector();
266 } catch (Exception e) {
267 logger.warn("Failed to create a new Selector.", e);
268 return;
269 }
270
271
272 int nChannels = 0;
273 for (SelectionKey key : oldSelector.keys()) {
274 DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment();
275 try {
276 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
277 continue;
278 }
279
280 handle.register(newSelectorTuple.unwrappedSelector);
281 nChannels++;
282 } catch (Exception e) {
283 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
284 handle.cancel();
285 }
286 }
287
288 selector = newSelectorTuple.selector;
289 unwrappedSelector = newSelectorTuple.unwrappedSelector;
290
291 try {
292
293 oldSelector.close();
294 } catch (Throwable t) {
295 if (logger.isWarnEnabled()) {
296 logger.warn("Failed to close the old Selector.", t);
297 }
298 }
299
300 if (logger.isInfoEnabled()) {
301 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
302 }
303 }
304
305 private static NioIoHandle nioHandle(IoHandle handle) {
306 if (handle instanceof NioIoHandle) {
307 return (NioIoHandle) handle;
308 }
309 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
310 }
311
312 private static NioIoOps cast(IoOps ops) {
313 if (ops instanceof NioIoOps) {
314 return (NioIoOps) ops;
315 }
316 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
317 }
318
319 final class DefaultNioRegistration implements NioIoRegistration {
320 private final Promise<?> cancellationPromise;
321 private final NioIoHandle handle;
322 private volatile SelectionKey key;
323
324 DefaultNioRegistration(IoEventLoop eventLoop, NioIoHandle handle, NioIoOps initialOps, Selector selector)
325 throws IOException {
326 this.handle = handle;
327 key = handle.selectableChannel().register(selector, initialOps.value, this);
328 this.cancellationPromise = eventLoop.newPromise();
329 }
330
331 NioIoHandle handle() {
332 return handle;
333 }
334
335 void register(Selector selector) throws IOException {
336 SelectionKey newKey = handle.selectableChannel().register(selector, key.interestOps(), this);
337 key.cancel();
338 key = newKey;
339 }
340
341 @Override
342 public SelectionKey selectionKey() {
343 return key;
344 }
345
346 @Override
347 public boolean isValid() {
348 return key.isValid();
349 }
350
351 @Override
352 public long submit(IoOps ops) {
353 int v = cast(ops).value;
354 key.interestOps(v);
355 return v;
356 }
357
358 @Override
359 public void cancel() {
360 if (!cancellationPromise.trySuccess(null)) {
361 return;
362 }
363 key.cancel();
364 cancelledKeys++;
365 if (cancelledKeys >= CLEANUP_INTERVAL) {
366 cancelledKeys = 0;
367 needsToSelectAgain = true;
368 }
369 }
370
371 @Override
372 public Future<?> cancelFuture() {
373 return cancellationPromise;
374 }
375
376 void close() {
377 cancel();
378 try {
379 handle.close();
380 } catch (Exception e) {
381 logger.debug("Exception during closing " + handle, e);
382 }
383 }
384
385 void handle(int ready) {
386 handle.handle(this, NioIoOps.eventOf(ready));
387 }
388
389 @Override
390 public NioIoHandler ioHandler() {
391 return NioIoHandler.this;
392 }
393 }
394
395 @Override
396 public NioIoRegistration register(IoEventLoop eventLoop, IoHandle handle)
397 throws Exception {
398 NioIoHandle nioHandle = nioHandle(handle);
399 NioIoOps ops = NioIoOps.NONE;
400 boolean selected = false;
401 for (;;) {
402 try {
403 return new DefaultNioRegistration(eventLoop, nioHandle, ops, unwrappedSelector());
404 } catch (CancelledKeyException e) {
405 if (!selected) {
406
407
408 selectNow();
409 selected = true;
410 } else {
411
412
413 throw e;
414 }
415 }
416 }
417 }
418
419 @Override
420 public int run(IoExecutionContext runner) {
421 int handled = 0;
422 try {
423 try {
424 switch (selectStrategy.calculateStrategy(selectNowSupplier, !runner.canBlock())) {
425 case SelectStrategy.CONTINUE:
426 return 0;
427
428 case SelectStrategy.BUSY_WAIT:
429
430
431 case SelectStrategy.SELECT:
432 select(runner, wakenUp.getAndSet(false));
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462 if (wakenUp.get()) {
463 selector.wakeup();
464 }
465
466 default:
467 }
468 } catch (IOException e) {
469
470
471 rebuildSelector0();
472 handleLoopException(e);
473 return 0;
474 }
475
476 cancelledKeys = 0;
477 needsToSelectAgain = false;
478 handled = processSelectedKeys();
479 } catch (Error e) {
480 throw e;
481 } catch (Throwable t) {
482 handleLoopException(t);
483 }
484 return handled;
485 }
486
487 private static void handleLoopException(Throwable t) {
488 logger.warn("Unexpected exception in the selector loop.", t);
489
490
491
492 try {
493 Thread.sleep(1000);
494 } catch (InterruptedException e) {
495
496 }
497 }
498
499 private int processSelectedKeys() {
500 if (selectedKeys != null) {
501 return processSelectedKeysOptimized();
502 } else {
503 return processSelectedKeysPlain(selector.selectedKeys());
504 }
505 }
506
507 @Override
508 public void destroy() {
509 try {
510 selector.close();
511 } catch (IOException e) {
512 logger.warn("Failed to close a selector.", e);
513 }
514 }
515
516 private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
517
518
519
520 if (selectedKeys.isEmpty()) {
521 return 0;
522 }
523
524 Iterator<SelectionKey> i = selectedKeys.iterator();
525 int handled = 0;
526 for (;;) {
527 final SelectionKey k = i.next();
528 i.remove();
529
530 processSelectedKey(k);
531 ++handled;
532
533 if (!i.hasNext()) {
534 break;
535 }
536
537 if (needsToSelectAgain) {
538 selectAgain();
539 selectedKeys = selector.selectedKeys();
540
541
542 if (selectedKeys.isEmpty()) {
543 break;
544 } else {
545 i = selectedKeys.iterator();
546 }
547 }
548 }
549 return handled;
550 }
551
552 private int processSelectedKeysOptimized() {
553 int handled = 0;
554 for (int i = 0; i < selectedKeys.size; ++i) {
555 final SelectionKey k = selectedKeys.keys[i];
556
557
558 selectedKeys.keys[i] = null;
559
560 processSelectedKey(k);
561 ++handled;
562
563 if (needsToSelectAgain) {
564
565
566 selectedKeys.reset(i + 1);
567
568 selectAgain();
569 i = -1;
570 }
571 }
572 return handled;
573 }
574
575 private void processSelectedKey(SelectionKey k) {
576 final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
577 if (!registration.isValid()) {
578 try {
579 registration.handle.close();
580 } catch (Exception e) {
581 logger.debug("Exception during closing " + registration.handle, e);
582 }
583 return;
584 }
585 registration.handle(k.readyOps());
586 }
587
588 @Override
589 public void prepareToDestroy() {
590 selectAgain();
591 Set<SelectionKey> keys = selector.keys();
592 Collection<DefaultNioRegistration> registrations = new ArrayList<>(keys.size());
593 for (SelectionKey k: keys) {
594 DefaultNioRegistration handle = (DefaultNioRegistration) k.attachment();
595 registrations.add(handle);
596 }
597
598 for (DefaultNioRegistration reg: registrations) {
599 reg.close();
600 }
601 }
602
603 @Override
604 public void wakeup(IoEventLoop eventLoop) {
605 if (!eventLoop.inEventLoop() && wakenUp.compareAndSet(false, true)) {
606 selector.wakeup();
607 }
608 }
609
610 @Override
611 public boolean isCompatible(Class<? extends IoHandle> handleType) {
612 return NioIoHandle.class.isAssignableFrom(handleType);
613 }
614
615 Selector unwrappedSelector() {
616 return unwrappedSelector;
617 }
618
619 private void select(IoExecutionContext runner, boolean oldWakenUp) throws IOException {
620 Selector selector = this.selector;
621 try {
622 int selectCnt = 0;
623 long currentTimeNanos = System.nanoTime();
624 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
625
626 for (;;) {
627 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
628 if (timeoutMillis <= 0) {
629 if (selectCnt == 0) {
630 selector.selectNow();
631 selectCnt = 1;
632 }
633 break;
634 }
635
636
637
638
639
640 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
641 selector.selectNow();
642 selectCnt = 1;
643 break;
644 }
645
646 int selectedKeys = selector.select(timeoutMillis);
647 selectCnt ++;
648
649 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
650
651
652
653
654 break;
655 }
656 if (Thread.interrupted()) {
657
658
659
660
661
662 if (logger.isDebugEnabled()) {
663 logger.debug("Selector.select() returned prematurely because " +
664 "Thread.currentThread().interrupt() was called. Use " +
665 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
666 }
667 selectCnt = 1;
668 break;
669 }
670
671 long time = System.nanoTime();
672 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
673
674 selectCnt = 1;
675 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
676 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
677
678
679 selector = selectRebuildSelector(selectCnt);
680 selectCnt = 1;
681 break;
682 }
683
684 currentTimeNanos = time;
685 }
686
687 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
688 if (logger.isDebugEnabled()) {
689 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
690 selectCnt - 1, selector);
691 }
692 }
693 } catch (CancelledKeyException e) {
694 if (logger.isDebugEnabled()) {
695 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
696 selector, e);
697 }
698
699 }
700 }
701
702 int selectNow() throws IOException {
703 try {
704 return selector.selectNow();
705 } finally {
706
707 if (wakenUp.get()) {
708 selector.wakeup();
709 }
710 }
711 }
712
713 private Selector selectRebuildSelector(int selectCnt) throws IOException {
714
715
716 logger.warn(
717 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
718 selectCnt, selector);
719
720 rebuildSelector0();
721 Selector selector = this.selector;
722
723
724 selector.selectNow();
725 return selector;
726 }
727
728 private void selectAgain() {
729 needsToSelectAgain = false;
730 try {
731 selector.selectNow();
732 } catch (Throwable t) {
733 logger.warn("Failed to update SelectionKeys.", t);
734 }
735 }
736
737
738
739
740
741
742 public static IoHandlerFactory newFactory() {
743 return newFactory(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE);
744 }
745
746
747
748
749
750
751
752 public static IoHandlerFactory newFactory(SelectorProvider selectorProvider) {
753 return newFactory(selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
754 }
755
756
757
758
759
760
761
762
763 public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
764 final SelectStrategyFactory selectStrategyFactory) {
765 ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
766 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
767 return new IoHandlerFactory() {
768 @Override
769 public IoHandler newHandler() {
770 return new NioIoHandler(selectorProvider, selectStrategyFactory.newSelectStrategy());
771 }
772 };
773 }
774 }