1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.ChannelException;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.util.IntSupplier;
29 import io.netty.util.concurrent.ThreadAwareExecutor;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.ReflectionUtil;
33 import io.netty.util.internal.StringUtil;
34 import io.netty.util.internal.SystemPropertyUtil;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.lang.reflect.Field;
40 import java.nio.channels.CancelledKeyException;
41 import java.nio.channels.Selector;
42 import java.nio.channels.SelectionKey;
43
44 import java.nio.channels.spi.SelectorProvider;
45 import java.security.AccessController;
46 import java.security.PrivilegedAction;
47 import java.util.ArrayList;
48 import java.util.Collection;
49 import java.util.Iterator;
50 import java.util.Set;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54
55
56
57 public final class NioIoHandler implements IoHandler {
58
59 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioIoHandler.class);
60
61 private static final int CLEANUP_INTERVAL = 256;
62
63 private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
64 SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
65
66 private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
67 private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
68
69 private final IntSupplier selectNowSupplier = new IntSupplier() {
70 @Override
71 public int get() throws Exception {
72 return selectNow();
73 }
74 };
75
76
77
78
79
80
81
82 static {
83 int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
84 if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
85 selectorAutoRebuildThreshold = 0;
86 }
87
88 SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
89
90 if (logger.isDebugEnabled()) {
91 logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
92 logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
93 }
94 }
95
96
97
98
99 private Selector selector;
100 private Selector unwrappedSelector;
101 private SelectedSelectionKeySet selectedKeys;
102
103 private final SelectorProvider provider;
104
105
106
107
108
109
110
111 private final AtomicBoolean wakenUp = new AtomicBoolean();
112
113 private final SelectStrategy selectStrategy;
114 private final ThreadAwareExecutor executor;
115 private int cancelledKeys;
116 private boolean needsToSelectAgain;
117
/**
 * Creates a handler and opens its initial selector.
 *
 * @param executor         executor used by {@code wakeup()} to detect calls from the executor thread;
 *                         must not be {@code null}
 * @param selectorProvider provider used to open (and later re-open) the selector; must not be {@code null}
 * @param strategy         strategy consulted on every {@code run(...)} call; must not be {@code null}
 * @throws ChannelException if the selector cannot be opened (raised by {@code openSelector()})
 */
private NioIoHandler(ThreadAwareExecutor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy) {
    // The label handed to checkNotNull names the argument that is being validated, so a failure
    // points at the actual parameter ("executor") instead of a name that does not exist here.
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");

    // openSelector() reads this.provider, so it must run after the assignments above.
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
127
128 private static final class SelectorTuple {
129 final Selector unwrappedSelector;
130 final Selector selector;
131
132 SelectorTuple(Selector unwrappedSelector) {
133 this.unwrappedSelector = unwrappedSelector;
134 this.selector = unwrappedSelector;
135 }
136
137 SelectorTuple(Selector unwrappedSelector, Selector selector) {
138 this.unwrappedSelector = unwrappedSelector;
139 this.selector = selector;
140 }
141 }
142
143 private SelectorTuple openSelector() {
144 final Selector unwrappedSelector;
145 try {
146 unwrappedSelector = provider.openSelector();
147 } catch (IOException e) {
148 throw new ChannelException("failed to open a new selector", e);
149 }
150
151 if (DISABLE_KEY_SET_OPTIMIZATION) {
152 return new SelectorTuple(unwrappedSelector);
153 }
154
155 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
156 @Override
157 public Object run() {
158 try {
159 return Class.forName(
160 "sun.nio.ch.SelectorImpl",
161 false,
162 PlatformDependent.getSystemClassLoader());
163 } catch (Throwable cause) {
164 return cause;
165 }
166 }
167 });
168
169 if (!(maybeSelectorImplClass instanceof Class) ||
170
171 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
172 if (maybeSelectorImplClass instanceof Throwable) {
173 Throwable t = (Throwable) maybeSelectorImplClass;
174 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
175 }
176 return new SelectorTuple(unwrappedSelector);
177 }
178
179 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
180 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
181
182 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
183 @Override
184 public Object run() {
185 try {
186 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
187 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
188
189 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
190
191
192 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
193 long publicSelectedKeysFieldOffset =
194 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
195
196 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
197 PlatformDependent.putObject(
198 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
199 PlatformDependent.putObject(
200 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
201 return null;
202 }
203
204 }
205
206 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
207 if (cause != null) {
208 return cause;
209 }
210 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
211 if (cause != null) {
212 return cause;
213 }
214
215 selectedKeysField.set(unwrappedSelector, selectedKeySet);
216 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
217 return null;
218 } catch (NoSuchFieldException e) {
219 return e;
220 } catch (IllegalAccessException e) {
221 return e;
222 }
223 }
224 });
225
226 if (maybeException instanceof Exception) {
227 selectedKeys = null;
228 Exception e = (Exception) maybeException;
229 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
230 return new SelectorTuple(unwrappedSelector);
231 }
232 selectedKeys = selectedKeySet;
233 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
234 return new SelectorTuple(unwrappedSelector,
235 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
236 }
237
238
239
240
241 public SelectorProvider selectorProvider() {
242 return provider;
243 }
244
245 Selector selector() {
246 return selector;
247 }
248
249 int numRegistered() {
250 return selector().keys().size() - cancelledKeys;
251 }
252
253 Set<SelectionKey> registeredSet() {
254 return selector().keys();
255 }
256
257 void rebuildSelector0() {
258 final Selector oldSelector = selector;
259 final SelectorTuple newSelectorTuple;
260
261 if (oldSelector == null) {
262 return;
263 }
264
265 try {
266 newSelectorTuple = openSelector();
267 } catch (Exception e) {
268 logger.warn("Failed to create a new Selector.", e);
269 return;
270 }
271
272
273 int nChannels = 0;
274 for (SelectionKey key : oldSelector.keys()) {
275 DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment();
276 try {
277 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
278 continue;
279 }
280
281 handle.register(newSelectorTuple.unwrappedSelector);
282 nChannels++;
283 } catch (Exception e) {
284 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
285 handle.cancel();
286 }
287 }
288
289 selector = newSelectorTuple.selector;
290 unwrappedSelector = newSelectorTuple.unwrappedSelector;
291
292 try {
293
294 oldSelector.close();
295 } catch (Throwable t) {
296 if (logger.isWarnEnabled()) {
297 logger.warn("Failed to close the old Selector.", t);
298 }
299 }
300
301 if (logger.isInfoEnabled()) {
302 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
303 }
304 }
305
306 private static NioIoHandle nioHandle(IoHandle handle) {
307 if (handle instanceof NioIoHandle) {
308 return (NioIoHandle) handle;
309 }
310 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
311 }
312
313 private static NioIoOps cast(IoOps ops) {
314 if (ops instanceof NioIoOps) {
315 return (NioIoOps) ops;
316 }
317 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
318 }
319
320 final class DefaultNioRegistration implements IoRegistration {
321 private final AtomicBoolean canceled = new AtomicBoolean();
322 private final NioIoHandle handle;
323 private volatile SelectionKey key;
324
325 DefaultNioRegistration(ThreadAwareExecutor executor, NioIoHandle handle, NioIoOps initialOps, Selector selector)
326 throws IOException {
327 this.handle = handle;
328 key = handle.selectableChannel().register(selector, initialOps.value, this);
329 }
330
331 NioIoHandle handle() {
332 return handle;
333 }
334
335 void register(Selector selector) throws IOException {
336 SelectionKey newKey = handle.selectableChannel().register(selector, key.interestOps(), this);
337 key.cancel();
338 key = newKey;
339 }
340
341 @SuppressWarnings("unchecked")
342 @Override
343 public <T> T attachment() {
344 return (T) key;
345 }
346
347 @Override
348 public boolean isValid() {
349 return !canceled.get() && key.isValid();
350 }
351
352 @Override
353 public long submit(IoOps ops) {
354 int v = cast(ops).value;
355 key.interestOps(v);
356 return v;
357 }
358
359 @Override
360 public boolean cancel() {
361 if (!canceled.compareAndSet(false, true)) {
362 return false;
363 }
364 key.cancel();
365 cancelledKeys++;
366 if (cancelledKeys >= CLEANUP_INTERVAL) {
367 cancelledKeys = 0;
368 needsToSelectAgain = true;
369 }
370 return true;
371 }
372
373 void close() {
374 cancel();
375 try {
376 handle.close();
377 } catch (Exception e) {
378 logger.debug("Exception during closing " + handle, e);
379 }
380 }
381
382 void handle(int ready) {
383 handle.handle(this, NioIoOps.eventOf(ready));
384 }
385 }
386
387 @Override
388 public IoRegistration register(IoHandle handle)
389 throws Exception {
390 NioIoHandle nioHandle = nioHandle(handle);
391 NioIoOps ops = NioIoOps.NONE;
392 boolean selected = false;
393 for (;;) {
394 try {
395 return new DefaultNioRegistration(executor, nioHandle, ops, unwrappedSelector());
396 } catch (CancelledKeyException e) {
397 if (!selected) {
398
399
400 selectNow();
401 selected = true;
402 } else {
403
404
405 throw e;
406 }
407 }
408 }
409 }
410
411 @Override
412 public int run(IoHandlerContext context) {
413 int handled = 0;
414 try {
415 try {
416 switch (selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock())) {
417 case SelectStrategy.CONTINUE:
418 return 0;
419
420 case SelectStrategy.BUSY_WAIT:
421
422
423 case SelectStrategy.SELECT:
424 select(context, wakenUp.getAndSet(false));
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454 if (wakenUp.get()) {
455 selector.wakeup();
456 }
457
458 default:
459 }
460 } catch (IOException e) {
461
462
463 rebuildSelector0();
464 handleLoopException(e);
465 return 0;
466 }
467
468 cancelledKeys = 0;
469 needsToSelectAgain = false;
470 handled = processSelectedKeys();
471 } catch (Error e) {
472 throw e;
473 } catch (Throwable t) {
474 handleLoopException(t);
475 }
476 return handled;
477 }
478
479 private static void handleLoopException(Throwable t) {
480 logger.warn("Unexpected exception in the selector loop.", t);
481
482
483
484 try {
485 Thread.sleep(1000);
486 } catch (InterruptedException e) {
487
488 }
489 }
490
491 private int processSelectedKeys() {
492 if (selectedKeys != null) {
493 return processSelectedKeysOptimized();
494 } else {
495 return processSelectedKeysPlain(selector.selectedKeys());
496 }
497 }
498
499 @Override
500 public void destroy() {
501 try {
502 selector.close();
503 } catch (IOException e) {
504 logger.warn("Failed to close a selector.", e);
505 }
506 }
507
508 private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
509
510
511
512 if (selectedKeys.isEmpty()) {
513 return 0;
514 }
515
516 Iterator<SelectionKey> i = selectedKeys.iterator();
517 int handled = 0;
518 for (;;) {
519 final SelectionKey k = i.next();
520 i.remove();
521
522 processSelectedKey(k);
523 ++handled;
524
525 if (!i.hasNext()) {
526 break;
527 }
528
529 if (needsToSelectAgain) {
530 selectAgain();
531 selectedKeys = selector.selectedKeys();
532
533
534 if (selectedKeys.isEmpty()) {
535 break;
536 } else {
537 i = selectedKeys.iterator();
538 }
539 }
540 }
541 return handled;
542 }
543
544 private int processSelectedKeysOptimized() {
545 int handled = 0;
546 for (int i = 0; i < selectedKeys.size; ++i) {
547 final SelectionKey k = selectedKeys.keys[i];
548
549
550 selectedKeys.keys[i] = null;
551
552 processSelectedKey(k);
553 ++handled;
554
555 if (needsToSelectAgain) {
556
557
558 selectedKeys.reset(i + 1);
559
560 selectAgain();
561 i = -1;
562 }
563 }
564 return handled;
565 }
566
567 private void processSelectedKey(SelectionKey k) {
568 final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
569 if (!registration.isValid()) {
570 try {
571 registration.handle.close();
572 } catch (Exception e) {
573 logger.debug("Exception during closing " + registration.handle, e);
574 }
575 return;
576 }
577 registration.handle(k.readyOps());
578 }
579
580 @Override
581 public void prepareToDestroy() {
582 selectAgain();
583 Set<SelectionKey> keys = selector.keys();
584 Collection<DefaultNioRegistration> registrations = new ArrayList<>(keys.size());
585 for (SelectionKey k: keys) {
586 DefaultNioRegistration handle = (DefaultNioRegistration) k.attachment();
587 registrations.add(handle);
588 }
589
590 for (DefaultNioRegistration reg: registrations) {
591 reg.close();
592 }
593 }
594
595 @Override
596 public void wakeup() {
597 if (!executor.isExecutorThread(Thread.currentThread()) && wakenUp.compareAndSet(false, true)) {
598 selector.wakeup();
599 }
600 }
601
602 @Override
603 public boolean isCompatible(Class<? extends IoHandle> handleType) {
604 return NioIoHandle.class.isAssignableFrom(handleType);
605 }
606
607 Selector unwrappedSelector() {
608 return unwrappedSelector;
609 }
610
611 private void select(IoHandlerContext runner, boolean oldWakenUp) throws IOException {
612 Selector selector = this.selector;
613 try {
614 int selectCnt = 0;
615 long currentTimeNanos = System.nanoTime();
616 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
617
618 for (;;) {
619 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
620 if (timeoutMillis <= 0) {
621 if (selectCnt == 0) {
622 selector.selectNow();
623 selectCnt = 1;
624 }
625 break;
626 }
627
628
629
630
631
632 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
633 selector.selectNow();
634 selectCnt = 1;
635 break;
636 }
637
638 int selectedKeys = selector.select(timeoutMillis);
639 selectCnt ++;
640
641 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
642
643
644
645
646 break;
647 }
648 if (Thread.interrupted()) {
649
650
651
652
653
654 if (logger.isDebugEnabled()) {
655 logger.debug("Selector.select() returned prematurely because " +
656 "Thread.currentThread().interrupt() was called. Use " +
657 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
658 }
659 selectCnt = 1;
660 break;
661 }
662
663 long time = System.nanoTime();
664 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
665
666 selectCnt = 1;
667 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
668 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
669
670
671 selector = selectRebuildSelector(selectCnt);
672 selectCnt = 1;
673 break;
674 }
675
676 currentTimeNanos = time;
677 }
678
679 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
680 if (logger.isDebugEnabled()) {
681 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
682 selectCnt - 1, selector);
683 }
684 }
685 } catch (CancelledKeyException e) {
686 if (logger.isDebugEnabled()) {
687 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
688 selector, e);
689 }
690
691 }
692 }
693
694 int selectNow() throws IOException {
695 try {
696 return selector.selectNow();
697 } finally {
698
699 if (wakenUp.get()) {
700 selector.wakeup();
701 }
702 }
703 }
704
705 private Selector selectRebuildSelector(int selectCnt) throws IOException {
706
707
708 logger.warn(
709 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
710 selectCnt, selector);
711
712 rebuildSelector0();
713 Selector selector = this.selector;
714
715
716 selector.selectNow();
717 return selector;
718 }
719
720 private void selectAgain() {
721 needsToSelectAgain = false;
722 try {
723 selector.selectNow();
724 } catch (Throwable t) {
725 logger.warn("Failed to update SelectionKeys.", t);
726 }
727 }
728
729
730
731
732
733
734 public static IoHandlerFactory newFactory() {
735 return newFactory(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE);
736 }
737
738
739
740
741
742
743
744 public static IoHandlerFactory newFactory(SelectorProvider selectorProvider) {
745 return newFactory(selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
746 }
747
748
749
750
751
752
753
754
755 public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
756 final SelectStrategyFactory selectStrategyFactory) {
757 ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
758 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
759 return context -> new NioIoHandler(context, selectorProvider, selectStrategyFactory.newSelectStrategy());
760 }
761 }