1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelException;
20 import io.netty.channel.EventLoop;
21 import io.netty.channel.EventLoopException;
22 import io.netty.channel.SelectStrategy;
23 import io.netty.channel.SingleThreadEventLoop;
24 import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
25 import io.netty.util.IntSupplier;
26 import io.netty.util.concurrent.RejectedExecutionHandler;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReflectionUtil;
29 import io.netty.util.internal.SystemPropertyUtil;
30 import io.netty.util.internal.logging.InternalLogger;
31 import io.netty.util.internal.logging.InternalLoggerFactory;
32
33 import java.io.IOException;
34 import java.lang.reflect.Field;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.Selector;
39 import java.nio.channels.spi.SelectorProvider;
40 import java.security.AccessController;
41 import java.security.PrivilegedAction;
42 import java.util.ArrayList;
43 import java.util.Collection;
44 import java.util.Iterator;
45 import java.util.Queue;
46 import java.util.Set;
47 import java.util.concurrent.ThreadFactory;
48 import java.util.concurrent.Callable;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.atomic.AtomicBoolean;
51
52
53
54
55
56
57 public final class NioEventLoop extends SingleThreadEventLoop {
58
// Logger shared by all instances of this class.
59 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
60
// Number of cancel(SelectionKey) calls after which cancel() requests another selectNow() pass.
61 private static final int CLEANUP_INTERVAL = 256;
62
// Read from the system property "io.netty.noKeySetOptimization" (default false). When true,
// openSelector() returns the provider's selector without installing a SelectedSelectionKeySet.
63 private static final boolean DISABLE_KEYSET_OPTIMIZATION =
64 SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
65
// select() writes a debug log when its consecutive-select counter exceeds this value; the static
// initializer also treats it as the smallest accepted auto-rebuild threshold.
66 private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
// Consecutive-select count at which select() rebuilds the selector; 0 disables the rebuild.
// Assigned once in the static initializer.
67 private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
68
// Supplier passed to the select strategy in run(); each call performs selectNow() on this loop.
69 private final IntSupplier selectNowSupplier = new IntSupplier() {
70 @Override
71 public int get() throws Exception {
72 return selectNow();
73 }
74 };
// Callable submitted by pendingTasks() when the caller is not on the event-loop thread;
// it returns the superclass' pendingTasks() value.
75 private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
76 @Override
77 public Integer call() throws Exception {
78 return NioEventLoop.super.pendingTasks();
79 }
80 };
81
82
83
84
85
86
// Class initialization: defaults the "sun.nio.ch.bugLevel" system property to an empty string when
// it is unset, then resolves the selector auto-rebuild threshold from its system property.
static {
    final String bugLevelProperty = "sun.nio.ch.bugLevel";
    if (SystemPropertyUtil.get(bugLevelProperty) == null) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(bugLevelProperty, "");
                    return null;
                }
            });
        } catch (final SecurityException e) {
            logger.debug("Unable to get/set System Property: " + bugLevelProperty, e);
        }
    }

    // Values below MIN_PREMATURE_SELECTOR_RETURNS are replaced by 0, which select() treats as "never rebuild".
    final int configuredThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    SELECTOR_AUTO_REBUILD_THRESHOLD =
            configuredThreshold >= MIN_PREMATURE_SELECTOR_RETURNS ? configuredThreshold : 0;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
116
117
118
119
// Selector this loop selects on and registers with: either the provider's selector itself or a
// SelectedSelectionKeySetSelector wrapping it (see openSelector()).
120 private Selector selector;
// The selector exactly as returned by the provider; exposed through unwrappedSelector().
121 private Selector unwrappedSelector;
// Key set installed into the selector by openSelector(); null when instrumentation was skipped or
// failed, in which case processSelectedKeys() falls back to selector.selectedKeys().
122 private SelectedSelectionKeySet selectedKeys;
123
// Provider used by openSelector() to create selectors.
124 private final SelectorProvider provider;
125
126
127
128
129
130
131
// Set to true by wakeup() through compareAndSet before it calls selector.wakeup(); run() resets it
// to false right before select() and hands the previous value to select().
132 private final AtomicBoolean wakenUp = new AtomicBoolean();
133
// Strategy consulted at the top of every run() iteration.
134 private final SelectStrategy selectStrategy;
135
// Percentage used by run() to derive the task time budget from the measured I/O time; 100 means
// tasks run without a time limit. setIoRatio() accepts values in (0, 100].
136 private volatile int ioRatio = 50;
// Count of cancel(SelectionKey) calls since it was last reset by run() or cancel().
137 private int cancelledKeys;
// Raised by cancel() once CLEANUP_INTERVAL keys were cancelled; cleared by selectAgain() and run().
138 private boolean needsToSelectAgain;
139
/**
 * Creates an event loop and opens the {@link Selector} it will operate on.
 *
 * @param parent                   the group this loop belongs to
 * @param threadFactory            forwarded to the superclass constructor
 * @param selectorProvider         provider used to open selectors; must not be {@code null}
 * @param strategy                 strategy consulted on every {@link #run()} iteration; must not be {@code null}
 * @param rejectedExecutionHandler forwarded to the superclass constructor
 * @throws NullPointerException if {@code selectorProvider} or {@code strategy} is {@code null}
 * @throws ChannelException     if the selector cannot be opened
 */
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    this.provider = selectorProvider;
    this.selectStrategy = strategy;

    // openSelector() reads the provider field, so it must be assigned before this call.
    final SelectorTuple opened = openSelector();
    this.unwrappedSelector = opened.unwrappedSelector;
    this.selector = opened.selector;
}
155
156 private static final class SelectorTuple {
157 final Selector unwrappedSelector;
158 final Selector selector;
159
160 SelectorTuple(Selector unwrappedSelector) {
161 this.unwrappedSelector = unwrappedSelector;
162 this.selector = unwrappedSelector;
163 }
164
165 SelectorTuple(Selector unwrappedSelector, Selector selector) {
166 this.unwrappedSelector = unwrappedSelector;
167 this.selector = selector;
168 }
169 }
170
171 private SelectorTuple openSelector() {
172 final Selector unwrappedSelector;
173 try {
174 unwrappedSelector = provider.openSelector();
175 } catch (IOException e) {
176 throw new ChannelException("failed to open a new selector", e);
177 }
178
179 if (DISABLE_KEYSET_OPTIMIZATION) {
180 return new SelectorTuple(unwrappedSelector);
181 }
182
183 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
184
185 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
186 @Override
187 public Object run() {
188 try {
189 return Class.forName(
190 "sun.nio.ch.SelectorImpl",
191 false,
192 PlatformDependent.getSystemClassLoader());
193 } catch (Throwable cause) {
194 return cause;
195 }
196 }
197 });
198
199 if (!(maybeSelectorImplClass instanceof Class) ||
200
201 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
202 if (maybeSelectorImplClass instanceof Throwable) {
203 Throwable t = (Throwable) maybeSelectorImplClass;
204 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
205 }
206 return new SelectorTuple(unwrappedSelector);
207 }
208
209 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
210
211 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
212 @Override
213 public Object run() {
214 try {
215 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
216 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
217
218 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
219 if (cause != null) {
220 return cause;
221 }
222 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
223 if (cause != null) {
224 return cause;
225 }
226
227 selectedKeysField.set(unwrappedSelector, selectedKeySet);
228 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
229 return null;
230 } catch (NoSuchFieldException e) {
231 return e;
232 } catch (IllegalAccessException e) {
233 return e;
234 }
235 }
236 });
237
238 if (maybeException instanceof Exception) {
239 selectedKeys = null;
240 Exception e = (Exception) maybeException;
241 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
242 return new SelectorTuple(unwrappedSelector);
243 }
244 selectedKeys = selectedKeySet;
245 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
246 return new SelectorTuple(unwrappedSelector,
247 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
248 }
249
250
251
252
253 public SelectorProvider selectorProvider() {
254 return provider;
255 }
256
257 @Override
258 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
259
260 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
261 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
262 }
263
264 @Override
265 public int pendingTasks() {
266
267
268
269 if (inEventLoop()) {
270 return super.pendingTasks();
271 } else {
272 return submit(pendingTasksCallable).syncUninterruptibly().getNow();
273 }
274 }
275
276
277
278
279
280
281 public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
282 if (ch == null) {
283 throw new NullPointerException("ch");
284 }
285 if (interestOps == 0) {
286 throw new IllegalArgumentException("interestOps must be non-zero.");
287 }
288 if ((interestOps & ~ch.validOps()) != 0) {
289 throw new IllegalArgumentException(
290 "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
291 }
292 if (task == null) {
293 throw new NullPointerException("task");
294 }
295
296 if (isShutdown()) {
297 throw new IllegalStateException("event loop shut down");
298 }
299
300 try {
301 ch.register(selector, interestOps, task);
302 } catch (Exception e) {
303 throw new EventLoopException("failed to register a channel", e);
304 }
305 }
306
307
308
309
310 public int getIoRatio() {
311 return ioRatio;
312 }
313
314
315
316
317
318 public void setIoRatio(int ioRatio) {
319 if (ioRatio <= 0 || ioRatio > 100) {
320 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
321 }
322 this.ioRatio = ioRatio;
323 }
324
325
326
327
328
329 public void rebuildSelector() {
330 if (!inEventLoop()) {
331 execute(new Runnable() {
332 @Override
333 public void run() {
334 rebuildSelector0();
335 }
336 });
337 return;
338 }
339 rebuildSelector0();
340 }
341
342 private void rebuildSelector0() {
343 final Selector oldSelector = selector;
344 final SelectorTuple newSelectorTuple;
345
346 if (oldSelector == null) {
347 return;
348 }
349
350 try {
351 newSelectorTuple = openSelector();
352 } catch (Exception e) {
353 logger.warn("Failed to create a new Selector.", e);
354 return;
355 }
356
357
358 int nChannels = 0;
359 for (SelectionKey key: oldSelector.keys()) {
360 Object a = key.attachment();
361 try {
362 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
363 continue;
364 }
365
366 int interestOps = key.interestOps();
367 key.cancel();
368 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
369 if (a instanceof AbstractNioChannel) {
370
371 ((AbstractNioChannel) a).selectionKey = newKey;
372 }
373 nChannels ++;
374 } catch (Exception e) {
375 logger.warn("Failed to re-register a Channel to the new Selector.", e);
376 if (a instanceof AbstractNioChannel) {
377 AbstractNioChannel ch = (AbstractNioChannel) a;
378 ch.unsafe().close(ch.unsafe().voidPromise());
379 } else {
380 @SuppressWarnings("unchecked")
381 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
382 invokeChannelUnregistered(task, key, e);
383 }
384 }
385 }
386
387 selector = newSelectorTuple.selector;
388 unwrappedSelector = newSelectorTuple.unwrappedSelector;
389
390 try {
391
392 oldSelector.close();
393 } catch (Throwable t) {
394 if (logger.isWarnEnabled()) {
395 logger.warn("Failed to close the old Selector.", t);
396 }
397 }
398
399 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
400 }
401
402 @Override
403 protected void run() {
404 for (;;) {
405 try {
406 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
407 case SelectStrategy.CONTINUE:
408 continue;
409 case SelectStrategy.SELECT:
410 select(wakenUp.getAndSet(false));
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440 if (wakenUp.get()) {
441 selector.wakeup();
442 }
443
444 default:
445 }
446
447 cancelledKeys = 0;
448 needsToSelectAgain = false;
449 final int ioRatio = this.ioRatio;
450 if (ioRatio == 100) {
451 try {
452 processSelectedKeys();
453 } finally {
454
455 runAllTasks();
456 }
457 } else {
458 final long ioStartTime = System.nanoTime();
459 try {
460 processSelectedKeys();
461 } finally {
462
463 final long ioTime = System.nanoTime() - ioStartTime;
464 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
465 }
466 }
467 } catch (Throwable t) {
468 handleLoopException(t);
469 }
470
471 try {
472 if (isShuttingDown()) {
473 closeAll();
474 if (confirmShutdown()) {
475 return;
476 }
477 }
478 } catch (Throwable t) {
479 handleLoopException(t);
480 }
481 }
482 }
483
484 private static void handleLoopException(Throwable t) {
485 logger.warn("Unexpected exception in the selector loop.", t);
486
487
488
489 try {
490 Thread.sleep(1000);
491 } catch (InterruptedException e) {
492
493 }
494 }
495
496 private void processSelectedKeys() {
497 if (selectedKeys != null) {
498 processSelectedKeysOptimized();
499 } else {
500 processSelectedKeysPlain(selector.selectedKeys());
501 }
502 }
503
504 @Override
505 protected void cleanup() {
506 try {
507 selector.close();
508 } catch (IOException e) {
509 logger.warn("Failed to close a selector.", e);
510 }
511 }
512
513 void cancel(SelectionKey key) {
514 key.cancel();
515 cancelledKeys ++;
516 if (cancelledKeys >= CLEANUP_INTERVAL) {
517 cancelledKeys = 0;
518 needsToSelectAgain = true;
519 }
520 }
521
522 @Override
523 protected Runnable pollTask() {
524 Runnable task = super.pollTask();
525 if (needsToSelectAgain) {
526 selectAgain();
527 }
528 return task;
529 }
530
531 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
532
533
534
535 if (selectedKeys.isEmpty()) {
536 return;
537 }
538
539 Iterator<SelectionKey> i = selectedKeys.iterator();
540 for (;;) {
541 final SelectionKey k = i.next();
542 final Object a = k.attachment();
543 i.remove();
544
545 if (a instanceof AbstractNioChannel) {
546 processSelectedKey(k, (AbstractNioChannel) a);
547 } else {
548 @SuppressWarnings("unchecked")
549 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
550 processSelectedKey(k, task);
551 }
552
553 if (!i.hasNext()) {
554 break;
555 }
556
557 if (needsToSelectAgain) {
558 selectAgain();
559 selectedKeys = selector.selectedKeys();
560
561
562 if (selectedKeys.isEmpty()) {
563 break;
564 } else {
565 i = selectedKeys.iterator();
566 }
567 }
568 }
569 }
570
571 private void processSelectedKeysOptimized() {
572 for (int i = 0; i < selectedKeys.size; ++i) {
573 final SelectionKey k = selectedKeys.keys[i];
574
575
576 selectedKeys.keys[i] = null;
577
578 final Object a = k.attachment();
579
580 if (a instanceof AbstractNioChannel) {
581 processSelectedKey(k, (AbstractNioChannel) a);
582 } else {
583 @SuppressWarnings("unchecked")
584 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
585 processSelectedKey(k, task);
586 }
587
588 if (needsToSelectAgain) {
589
590
591 selectedKeys.reset(i + 1);
592
593 selectAgain();
594 i = -1;
595 }
596 }
597 }
598
599 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
600 final NioUnsafe unsafe = ch.unsafe();
601 if (!k.isValid()) {
602 final EventLoop eventLoop;
603 try {
604 eventLoop = ch.eventLoop();
605 } catch (Throwable ignored) {
606
607
608
609 return;
610 }
611
612
613
614
615 if (eventLoop != this || eventLoop == null) {
616 return;
617 }
618
619 unsafe.close(unsafe.voidPromise());
620 return;
621 }
622
623 try {
624 int readyOps = k.readyOps();
625
626
627 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
628
629
630 int ops = k.interestOps();
631 ops &= ~SelectionKey.OP_CONNECT;
632 k.interestOps(ops);
633
634 unsafe.finishConnect();
635 }
636
637
638 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
639
640 ch.unsafe().forceFlush();
641 }
642
643
644
645 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
646 unsafe.read();
647 }
648 } catch (CancelledKeyException ignored) {
649 unsafe.close(unsafe.voidPromise());
650 }
651 }
652
653 private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
654 int state = 0;
655 try {
656 task.channelReady(k.channel(), k);
657 state = 1;
658 } catch (Exception e) {
659 k.cancel();
660 invokeChannelUnregistered(task, k, e);
661 state = 2;
662 } finally {
663 switch (state) {
664 case 0:
665 k.cancel();
666 invokeChannelUnregistered(task, k, null);
667 break;
668 case 1:
669 if (!k.isValid()) {
670 invokeChannelUnregistered(task, k, null);
671 }
672 break;
673 }
674 }
675 }
676
677 private void closeAll() {
678 selectAgain();
679 Set<SelectionKey> keys = selector.keys();
680 Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
681 for (SelectionKey k: keys) {
682 Object a = k.attachment();
683 if (a instanceof AbstractNioChannel) {
684 channels.add((AbstractNioChannel) a);
685 } else {
686 k.cancel();
687 @SuppressWarnings("unchecked")
688 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
689 invokeChannelUnregistered(task, k, null);
690 }
691 }
692
693 for (AbstractNioChannel ch: channels) {
694 ch.unsafe().close(ch.unsafe().voidPromise());
695 }
696 }
697
698 private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
699 try {
700 task.channelUnregistered(k.channel(), cause);
701 } catch (Exception e) {
702 logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
703 }
704 }
705
706 @Override
707 protected void wakeup(boolean inEventLoop) {
708 if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
709 selector.wakeup();
710 }
711 }
712
713 Selector unwrappedSelector() {
714 return unwrappedSelector;
715 }
716
717 int selectNow() throws IOException {
718 try {
719 return selector.selectNow();
720 } finally {
721
722 if (wakenUp.get()) {
723 selector.wakeup();
724 }
725 }
726 }
727
728 private void select(boolean oldWakenUp) throws IOException {
729 Selector selector = this.selector;
730 try {
731 int selectCnt = 0;
732 long currentTimeNanos = System.nanoTime();
733 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
734 for (;;) {
735 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
736 if (timeoutMillis <= 0) {
737 if (selectCnt == 0) {
738 selector.selectNow();
739 selectCnt = 1;
740 }
741 break;
742 }
743
744
745
746
747
748 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
749 selector.selectNow();
750 selectCnt = 1;
751 break;
752 }
753
754 int selectedKeys = selector.select(timeoutMillis);
755 selectCnt ++;
756
757 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
758
759
760
761
762 break;
763 }
764 if (Thread.interrupted()) {
765
766
767
768
769
770 if (logger.isDebugEnabled()) {
771 logger.debug("Selector.select() returned prematurely because " +
772 "Thread.currentThread().interrupt() was called. Use " +
773 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
774 }
775 selectCnt = 1;
776 break;
777 }
778
779 long time = System.nanoTime();
780 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
781
782 selectCnt = 1;
783 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
784 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
785
786
787 logger.warn(
788 "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
789 selectCnt, selector);
790
791 rebuildSelector();
792 selector = this.selector;
793
794
795 selector.selectNow();
796 selectCnt = 1;
797 break;
798 }
799
800 currentTimeNanos = time;
801 }
802
803 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
804 if (logger.isDebugEnabled()) {
805 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
806 selectCnt - 1, selector);
807 }
808 }
809 } catch (CancelledKeyException e) {
810 if (logger.isDebugEnabled()) {
811 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
812 selector, e);
813 }
814
815 }
816 }
817
818 private void selectAgain() {
819 needsToSelectAgain = false;
820 try {
821 selector.selectNow();
822 } catch (Throwable t) {
823 logger.warn("Failed to update SelectionKeys.", t);
824 }
825 }
826 }