1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.nio;
17
18 import io.netty5.channel.Channel;
19 import io.netty5.channel.ChannelException;
20 import io.netty5.channel.DefaultSelectStrategyFactory;
21 import io.netty5.channel.IoExecutionContext;
22 import io.netty5.channel.IoHandle;
23 import io.netty5.channel.IoHandler;
24 import io.netty5.channel.IoHandlerFactory;
25 import io.netty5.channel.SelectStrategy;
26 import io.netty5.channel.SelectStrategyFactory;
27 import io.netty5.util.internal.PlatformDependent;
28 import io.netty5.util.internal.ReflectionUtil;
29 import io.netty5.util.internal.StringUtil;
30 import io.netty5.util.internal.SystemPropertyUtil;
31 import io.netty5.util.internal.logging.InternalLogger;
32 import io.netty5.util.internal.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.io.UncheckedIOException;
36 import java.lang.reflect.Field;
37 import java.nio.channels.CancelledKeyException;
38 import java.nio.channels.SelectionKey;
39 import java.nio.channels.Selector;
40 import java.nio.channels.spi.SelectorProvider;
41 import java.security.AccessController;
42 import java.security.PrivilegedAction;
43 import java.util.ArrayList;
44 import java.util.Collection;
45 import java.util.Iterator;
46 import java.util.Set;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.function.IntSupplier;
50
51 import static java.util.Objects.requireNonNull;
52
53
54
55
56
57
58 public final class NioHandler implements IoHandler {
59
60 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioHandler.class);
61
62 private static final int CLEANUP_INTERVAL = 256;
63
64 private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
65 SystemPropertyUtil.getBoolean("io.netty5.noKeySetOptimization", false);
66
67 private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
68 private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
69
70 private final IntSupplier selectNowSupplier = () -> {
71 try {
72 return selectNow();
73 } catch (IOException e) {
74 throw new UncheckedIOException(e);
75 }
76 };
77
78 static {
79 int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty5.selectorAutoRebuildThreshold", 512);
80 if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
81 selectorAutoRebuildThreshold = 0;
82 }
83
84 SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
85
86 if (logger.isDebugEnabled()) {
87 logger.debug("-Dio.netty5.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
88 logger.debug("-Dio.netty5.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
89 }
90 }
91
92
93
94
95 private Selector selector;
96 private Selector unwrappedSelector;
97 private SelectedSelectionKeySet selectedKeys;
98
99 private final SelectorProvider provider;
100
101
102
103
104
105
106
107 private final AtomicBoolean wakenUp = new AtomicBoolean();
108
109 private final SelectStrategy selectStrategy;
110
111 private int cancelledKeys;
112 private boolean needsToSelectAgain;
113
/**
 * Creates a handler with the platform default {@link SelectorProvider} and a strategy from
 * {@link DefaultSelectStrategyFactory#INSTANCE}.
 */
private NioHandler() {
    this(SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy());
}

/**
 * Creates a handler and immediately opens its selector.
 *
 * @param selectorProvider provider used to open this handler's selectors
 * @param strategy         strategy consulted on every {@link #run(IoExecutionContext)} call
 * @throws ChannelException if the selector cannot be opened
 */
private NioHandler(SelectorProvider selectorProvider, SelectStrategy strategy) {
    selectStrategy = strategy;
    // The provider must be set before openSelector() runs, since that method reads it.
    provider = selectorProvider;

    final SelectorTuple opened = openSelector();
    unwrappedSelector = opened.unwrappedSelector;
    selector = opened.selector;
}
125
126
127
128
129 public static IoHandlerFactory newFactory() {
130 return NioHandler::new;
131 }
132
133
134
135
136 public static IoHandlerFactory newFactory(final SelectorProvider selectorProvider,
137 final SelectStrategyFactory selectStrategyFactory) {
138 requireNonNull(selectorProvider, "selectorProvider");
139 requireNonNull(selectStrategyFactory, "selectStrategyFactory");
140 return () -> new NioHandler(selectorProvider, selectStrategyFactory.newSelectStrategy());
141 }
142
143 private static final class SelectorTuple {
144 final Selector unwrappedSelector;
145 final Selector selector;
146
147 SelectorTuple(Selector unwrappedSelector) {
148 this.unwrappedSelector = unwrappedSelector;
149 this.selector = unwrappedSelector;
150 }
151
152 SelectorTuple(Selector unwrappedSelector, Selector selector) {
153 this.unwrappedSelector = unwrappedSelector;
154 this.selector = selector;
155 }
156 }
157
158 private SelectorTuple openSelector() {
159 final Selector unwrappedSelector;
160 try {
161 unwrappedSelector = provider.openSelector();
162 } catch (IOException e) {
163 throw new ChannelException("failed to open a new selector", e);
164 }
165
166 if (DISABLE_KEY_SET_OPTIMIZATION) {
167 return new SelectorTuple(unwrappedSelector);
168 }
169
170 Object maybeSelectorImplClass = AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
171 try {
172 return Class.forName(
173 "sun.nio.ch.SelectorImpl",
174 false,
175 PlatformDependent.getSystemClassLoader());
176 } catch (Throwable cause) {
177 return cause;
178 }
179 });
180
181 if (!(maybeSelectorImplClass instanceof Class) ||
182
183 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
184 if (maybeSelectorImplClass instanceof Throwable) {
185 Throwable t = (Throwable) maybeSelectorImplClass;
186 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
187 }
188 return new SelectorTuple(unwrappedSelector);
189 }
190
191 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
192 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
193
194 Object maybeException = AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
195 try {
196 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
197 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
198
199 if (PlatformDependent.hasUnsafe()) {
200
201
202 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
203 long publicSelectedKeysFieldOffset =
204 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
205
206 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
207 PlatformDependent.putObject(
208 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
209 PlatformDependent.putObject(
210 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
211 return null;
212 }
213
214 }
215
216 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
217 if (cause != null) {
218 return cause;
219 }
220 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
221 if (cause != null) {
222 return cause;
223 }
224
225 selectedKeysField.set(unwrappedSelector, selectedKeySet);
226 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
227 return null;
228 } catch (NoSuchFieldException | IllegalAccessException e) {
229 return e;
230 }
231 });
232
233 if (maybeException instanceof Exception) {
234 selectedKeys = null;
235 Exception e = (Exception) maybeException;
236 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
237 return new SelectorTuple(unwrappedSelector);
238 }
239 selectedKeys = selectedKeySet;
240 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
241 return new SelectorTuple(unwrappedSelector,
242 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
243 }
244
245
246
247
248 public SelectorProvider selectorProvider() {
249 return provider;
250 }
251
252
253
254
255
256 void rebuildSelector() {
257 final Selector oldSelector = selector;
258 final SelectorTuple newSelectorTuple;
259
260 if (oldSelector == null) {
261 return;
262 }
263
264 try {
265 newSelectorTuple = openSelector();
266 } catch (Exception e) {
267 logger.warn("Failed to create a new Selector.", e);
268 return;
269 }
270
271
272 int nChannels = 0;
273 for (SelectionKey key: oldSelector.keys()) {
274 NioProcessor handle = (NioProcessor) key.attachment();
275 try {
276 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
277 continue;
278 }
279 handle.register(newSelectorTuple.unwrappedSelector);
280 nChannels ++;
281 } catch (Exception e) {
282 logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
283 handle.close();
284 }
285 }
286
287 selector = newSelectorTuple.selector;
288 unwrappedSelector = newSelectorTuple.unwrappedSelector;
289
290 try {
291
292 oldSelector.close();
293 } catch (Throwable t) {
294 if (logger.isWarnEnabled()) {
295 logger.warn("Failed to close the old Selector.", t);
296 }
297 }
298
299 if (logger.isInfoEnabled()) {
300 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
301 }
302 }
303
304 private static NioProcessor nioHandle(IoHandle handle) {
305 if (handle instanceof AbstractNioChannel) {
306 return ((AbstractNioChannel<?, ?, ?>) handle).nioProcessor();
307 }
308 if (handle instanceof NioSelectableChannelHandle) {
309 return ((NioSelectableChannelHandle<?>) handle).nioProcessor();
310 }
311 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
312 }
313
314 @Override
315 public void register(IoHandle handle) throws Exception {
316 NioProcessor nioProcessor = nioHandle(handle);
317 boolean selected = false;
318 for (;;) {
319 try {
320 nioProcessor.register(unwrappedSelector());
321 return;
322 } catch (CancelledKeyException e) {
323 if (!selected) {
324
325
326 selectNow();
327 selected = true;
328 } else {
329
330
331 throw e;
332 }
333 }
334 }
335 }
336
337 @Override
338 public void deregister(IoHandle handle) {
339 NioProcessor nioProcessor = nioHandle(handle);
340 nioProcessor.deregister();
341 cancelledKeys ++;
342 if (cancelledKeys >= CLEANUP_INTERVAL) {
343 cancelledKeys = 0;
344 needsToSelectAgain = true;
345 }
346 }
347
348 @Override
349 public int run(IoExecutionContext runner) {
350 int handled = 0;
351 try {
352 try {
353 switch (selectStrategy.calculateStrategy(selectNowSupplier, !runner.canBlock())) {
354 case SelectStrategy.CONTINUE:
355 return 0;
356
357 case SelectStrategy.BUSY_WAIT:
358
359
360 case SelectStrategy.SELECT:
361 select(runner, wakenUp.getAndSet(false));
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391 if (wakenUp.get()) {
392 selector.wakeup();
393 }
394
395 default:
396 }
397 } catch (IOException e) {
398
399
400 rebuildSelector();
401 handleLoopException(e);
402 return 0;
403 }
404
405 cancelledKeys = 0;
406 needsToSelectAgain = false;
407 handled = processSelectedKeys();
408 } catch (Error e) {
409 throw e;
410 } catch (Throwable t) {
411 handleLoopException(t);
412 }
413 return handled;
414 }
415
416 private static void handleLoopException(Throwable t) {
417 logger.warn("Unexpected exception in the selector loop.", t);
418
419
420
421 try {
422 Thread.sleep(1000);
423 } catch (InterruptedException e) {
424
425 }
426 }
427
428 private int processSelectedKeys() {
429 if (selectedKeys != null) {
430 return processSelectedKeysOptimized();
431 } else {
432 return processSelectedKeysPlain(selector.selectedKeys());
433 }
434 }
435
436 @Override
437 public void destroy() {
438 try {
439 selector.close();
440 } catch (IOException e) {
441 logger.warn("Failed to close a selector.", e);
442 }
443 }
444
445 private int processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
446
447
448
449 if (selectedKeys.isEmpty()) {
450 return 0;
451 }
452
453 Iterator<SelectionKey> i = selectedKeys.iterator();
454 int handled = 0;
455 for (;;) {
456 final SelectionKey k = i.next();
457 i.remove();
458
459 processSelectedKey(k);
460 ++handled;
461
462 if (!i.hasNext()) {
463 break;
464 }
465
466 if (needsToSelectAgain) {
467 selectAgain();
468 selectedKeys = selector.selectedKeys();
469
470
471 if (selectedKeys.isEmpty()) {
472 break;
473 } else {
474 i = selectedKeys.iterator();
475 }
476 }
477 }
478 return handled;
479 }
480
481 private int processSelectedKeysOptimized() {
482 int handled = 0;
483 for (int i = 0; i < selectedKeys.size; ++i) {
484 final SelectionKey k = selectedKeys.keys[i];
485
486
487 selectedKeys.keys[i] = null;
488
489 processSelectedKey(k);
490 ++handled;
491
492 if (needsToSelectAgain) {
493
494
495 selectedKeys.reset(i + 1);
496
497 selectAgain();
498 i = -1;
499 }
500 }
501 return handled;
502 }
503
504 private void processSelectedKey(SelectionKey k) {
505 final NioProcessor handle = (NioProcessor) k.attachment();
506 handle.handle(k);
507 }
508
509 @Override
510 public void prepareToDestroy() {
511 selectAgain();
512 Set<SelectionKey> keys = selector.keys();
513 Collection<NioProcessor> handles = new ArrayList<>(keys.size());
514 for (SelectionKey k: keys) {
515 NioProcessor handle = (NioProcessor) k.attachment();
516 handles.add(handle);
517 }
518
519 for (NioProcessor h: handles) {
520 h.close();
521 }
522 }
523
524 @Override
525 public void wakeup(boolean inEventLoop) {
526 if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
527 selector.wakeup();
528 }
529 }
530
531 @Override
532 public boolean isCompatible(Class<? extends IoHandle> handleType) {
533 return AbstractNioChannel.class.isAssignableFrom(handleType);
534 }
535
536 Selector unwrappedSelector() {
537 return unwrappedSelector;
538 }
539
540 private int selectNow() throws IOException {
541 try {
542 return selector.selectNow();
543 } finally {
544
545 if (wakenUp.get()) {
546 selector.wakeup();
547 }
548 }
549 }
550
551 private void select(IoExecutionContext runner, boolean oldWakenUp) throws IOException {
552 Selector selector = this.selector;
553 try {
554 int selectCnt = 0;
555 long currentTimeNanos = System.nanoTime();
556 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
557
558 for (;;) {
559 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
560 if (timeoutMillis <= 0) {
561 if (selectCnt == 0) {
562 selector.selectNow();
563 selectCnt = 1;
564 }
565 break;
566 }
567
568
569
570
571
572 if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
573 selector.selectNow();
574 selectCnt = 1;
575 break;
576 }
577
578 int selectedKeys = selector.select(timeoutMillis);
579 selectCnt ++;
580
581 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
582
583
584
585
586 break;
587 }
588 if (Thread.interrupted()) {
589
590
591
592
593
594 if (logger.isDebugEnabled()) {
595 logger.debug("Selector.select() returned prematurely because " +
596 "Thread.currentThread().interrupt() was called. Use " +
597 "NioHandler.shutdownGracefully() to shutdown the NioHandler.");
598 }
599 selectCnt = 1;
600 break;
601 }
602
603 long time = System.nanoTime();
604 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
605
606 selectCnt = 1;
607 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
608 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
609
610
611 selector = selectRebuildSelector(selectCnt);
612 selectCnt = 1;
613 break;
614 }
615
616 currentTimeNanos = time;
617 }
618
619 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
620 if (logger.isDebugEnabled()) {
621 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
622 selectCnt - 1, selector);
623 }
624 }
625 } catch (CancelledKeyException e) {
626 if (logger.isDebugEnabled()) {
627 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
628 selector, e);
629 }
630
631 }
632 }
633
634 private Selector selectRebuildSelector(int selectCnt) throws IOException {
635
636
637 logger.warn(
638 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
639 selectCnt, selector);
640
641 rebuildSelector();
642 Selector selector = this.selector;
643
644
645 selector.selectNow();
646 return selector;
647 }
648
649 private void selectAgain() {
650 needsToSelectAgain = false;
651 try {
652 selector.selectNow();
653 } catch (Throwable t) {
654 logger.warn("Failed to update SelectionKeys.", t);
655 }
656 }
657 }