1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelException;
20 import io.netty.channel.EventLoop;
21 import io.netty.channel.EventLoopException;
22 import io.netty.channel.EventLoopTaskQueueFactory;
23 import io.netty.channel.SelectStrategy;
24 import io.netty.channel.SingleThreadEventLoop;
25 import io.netty.util.IntSupplier;
26 import io.netty.util.concurrent.RejectedExecutionHandler;
27 import io.netty.util.internal.ObjectUtil;
28 import io.netty.util.internal.PlatformDependent;
29 import io.netty.util.internal.ReflectionUtil;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.lang.reflect.Field;
36 import java.nio.channels.CancelledKeyException;
37 import java.nio.channels.SelectableChannel;
38 import java.nio.channels.Selector;
39 import java.nio.channels.SelectionKey;
40
41 import java.nio.channels.spi.SelectorProvider;
42 import java.security.AccessController;
43 import java.security.PrivilegedAction;
44 import java.util.ArrayList;
45 import java.util.Collection;
46 import java.util.Iterator;
47 import java.util.NoSuchElementException;
48 import java.util.Queue;
49 import java.util.Set;
50 import java.util.concurrent.Executor;
51 import java.util.concurrent.atomic.AtomicLong;
52
53
54
55
56
57
58 public final class NioEventLoop extends SingleThreadEventLoop {
59
60 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
61
62 private static final int CLEANUP_INTERVAL = 256;
63
64 private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
65 SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
66
67 private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
68 private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
69
70 private final IntSupplier selectNowSupplier = new IntSupplier() {
71 @Override
72 public int get() throws Exception {
73 return selectNow();
74 }
75 };
76
77
78
79
80
81
82
83 static {
84 if (PlatformDependent.javaVersion() < 7) {
85 final String key = "sun.nio.ch.bugLevel";
86 final String bugLevel = SystemPropertyUtil.get(key);
87 if (bugLevel == null) {
88 try {
89 AccessController.doPrivileged(new PrivilegedAction<Void>() {
90 @Override
91 public Void run() {
92 System.setProperty(key, "");
93 return null;
94 }
95 });
96 } catch (final SecurityException e) {
97 logger.debug("Unable to get/set System Property: " + key, e);
98 }
99 }
100 }
101
102 int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
103 if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
104 selectorAutoRebuildThreshold = 0;
105 }
106
107 SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
108
109 if (logger.isDebugEnabled()) {
110 logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
111 logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
112 }
113 }
114
115
116
117
118 private Selector selector;
119 private Selector unwrappedSelector;
120 private SelectedSelectionKeySet selectedKeys;
121
122 private final SelectorProvider provider;
123
124 private static final long AWAKE = -1L;
125 private static final long NONE = Long.MAX_VALUE;
126
127
128
129
130
131 private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
132
133 private final SelectStrategy selectStrategy;
134
135 private volatile int ioRatio = 50;
136 private int cancelledKeys;
137 private boolean needsToSelectAgain;
138
139 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
140 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
141 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
142 super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
143 rejectedExecutionHandler);
144 this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
145 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
146 final SelectorTuple selectorTuple = openSelector();
147 this.selector = selectorTuple.selector;
148 this.unwrappedSelector = selectorTuple.unwrappedSelector;
149 }
150
151 private static Queue<Runnable> newTaskQueue(
152 EventLoopTaskQueueFactory queueFactory) {
153 if (queueFactory == null) {
154 return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
155 }
156 return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
157 }
158
159 private static final class SelectorTuple {
160 final Selector unwrappedSelector;
161 final Selector selector;
162
163 SelectorTuple(Selector unwrappedSelector) {
164 this.unwrappedSelector = unwrappedSelector;
165 this.selector = unwrappedSelector;
166 }
167
168 SelectorTuple(Selector unwrappedSelector, Selector selector) {
169 this.unwrappedSelector = unwrappedSelector;
170 this.selector = selector;
171 }
172 }
173
174 private SelectorTuple openSelector() {
175 final Selector unwrappedSelector;
176 try {
177 unwrappedSelector = provider.openSelector();
178 } catch (IOException e) {
179 throw new ChannelException("failed to open a new selector", e);
180 }
181
182 if (DISABLE_KEY_SET_OPTIMIZATION) {
183 return new SelectorTuple(unwrappedSelector);
184 }
185
186 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
187 @Override
188 public Object run() {
189 try {
190 return Class.forName(
191 "sun.nio.ch.SelectorImpl",
192 false,
193 PlatformDependent.getSystemClassLoader());
194 } catch (Throwable cause) {
195 return cause;
196 }
197 }
198 });
199
200 if (!(maybeSelectorImplClass instanceof Class) ||
201
202 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
203 if (maybeSelectorImplClass instanceof Throwable) {
204 Throwable t = (Throwable) maybeSelectorImplClass;
205 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
206 }
207 return new SelectorTuple(unwrappedSelector);
208 }
209
210 final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
211 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
212
213 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
214 @Override
215 public Object run() {
216 try {
217 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
218 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
219
220 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
221
222
223 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
224 long publicSelectedKeysFieldOffset =
225 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
226
227 if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
228 PlatformDependent.putObject(
229 unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
230 PlatformDependent.putObject(
231 unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
232 return null;
233 }
234
235 }
236
237 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
238 if (cause != null) {
239 return cause;
240 }
241 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
242 if (cause != null) {
243 return cause;
244 }
245
246 selectedKeysField.set(unwrappedSelector, selectedKeySet);
247 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
248 return null;
249 } catch (NoSuchFieldException e) {
250 return e;
251 } catch (IllegalAccessException e) {
252 return e;
253 }
254 }
255 });
256
257 if (maybeException instanceof Exception) {
258 selectedKeys = null;
259 Exception e = (Exception) maybeException;
260 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
261 return new SelectorTuple(unwrappedSelector);
262 }
263 selectedKeys = selectedKeySet;
264 logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
265 return new SelectorTuple(unwrappedSelector,
266 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
267 }
268
269
270
271
272 public SelectorProvider selectorProvider() {
273 return provider;
274 }
275
276 @Override
277 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
278 return newTaskQueue0(maxPendingTasks);
279 }
280
281 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
282
283 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
284 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
285 }
286
287
288
289
290
291
292 public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
293 ObjectUtil.checkNotNull(ch, "ch");
294 if (interestOps == 0) {
295 throw new IllegalArgumentException("interestOps must be non-zero.");
296 }
297 if ((interestOps & ~ch.validOps()) != 0) {
298 throw new IllegalArgumentException(
299 "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
300 }
301 ObjectUtil.checkNotNull(task, "task");
302
303 if (isShutdown()) {
304 throw new IllegalStateException("event loop shut down");
305 }
306
307 if (inEventLoop()) {
308 register0(ch, interestOps, task);
309 } else {
310 try {
311
312
313 submit(new Runnable() {
314 @Override
315 public void run() {
316 register0(ch, interestOps, task);
317 }
318 }).sync();
319 } catch (InterruptedException ignore) {
320
321 Thread.currentThread().interrupt();
322 }
323 }
324 }
325
326 private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
327 try {
328 ch.register(unwrappedSelector, interestOps, task);
329 } catch (Exception e) {
330 throw new EventLoopException("failed to register a channel", e);
331 }
332 }
333
334
335
336
337 public int getIoRatio() {
338 return ioRatio;
339 }
340
341
342
343
344
345
346
347 public void setIoRatio(int ioRatio) {
348 if (ioRatio <= 0 || ioRatio > 100) {
349 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
350 }
351 this.ioRatio = ioRatio;
352 }
353
354
355
356
357
358 public void rebuildSelector() {
359 if (!inEventLoop()) {
360 execute(new Runnable() {
361 @Override
362 public void run() {
363 rebuildSelector0();
364 }
365 });
366 return;
367 }
368 rebuildSelector0();
369 }
370
371 @Override
372 public int registeredChannels() {
373 return selector.keys().size() - cancelledKeys;
374 }
375
376 @Override
377 public Iterator<Channel> registeredChannelsIterator() {
378 assert inEventLoop();
379 final Set<SelectionKey> keys = selector.keys();
380 if (keys.isEmpty()) {
381 return ChannelsReadOnlyIterator.empty();
382 }
383 return new Iterator<Channel>() {
384 final Iterator<SelectionKey> selectionKeyIterator =
385 ObjectUtil.checkNotNull(keys, "selectionKeys")
386 .iterator();
387 Channel next;
388 boolean isDone;
389
390 @Override
391 public boolean hasNext() {
392 if (isDone) {
393 return false;
394 }
395 Channel cur = next;
396 if (cur == null) {
397 cur = next = nextOrDone();
398 return cur != null;
399 }
400 return true;
401 }
402
403 @Override
404 public Channel next() {
405 if (isDone) {
406 throw new NoSuchElementException();
407 }
408 Channel cur = next;
409 if (cur == null) {
410 cur = nextOrDone();
411 if (cur == null) {
412 throw new NoSuchElementException();
413 }
414 }
415 next = nextOrDone();
416 return cur;
417 }
418
419 @Override
420 public void remove() {
421 throw new UnsupportedOperationException("remove");
422 }
423
424 private Channel nextOrDone() {
425 Iterator<SelectionKey> it = selectionKeyIterator;
426 while (it.hasNext()) {
427 SelectionKey key = it.next();
428 if (key.isValid()) {
429 Object attachment = key.attachment();
430 if (attachment instanceof AbstractNioChannel) {
431 return (AbstractNioChannel) attachment;
432 }
433 }
434 }
435 isDone = true;
436 return null;
437 }
438 };
439 }
440
441 private void rebuildSelector0() {
442 final Selector oldSelector = selector;
443 final SelectorTuple newSelectorTuple;
444
445 if (oldSelector == null) {
446 return;
447 }
448
449 try {
450 newSelectorTuple = openSelector();
451 } catch (Exception e) {
452 logger.warn("Failed to create a new Selector.", e);
453 return;
454 }
455
456
457 int nChannels = 0;
458 for (SelectionKey key: oldSelector.keys()) {
459 Object a = key.attachment();
460 try {
461 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
462 continue;
463 }
464
465 int interestOps = key.interestOps();
466 key.cancel();
467 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
468 if (a instanceof AbstractNioChannel) {
469
470 ((AbstractNioChannel) a).selectionKey = newKey;
471 }
472 nChannels ++;
473 } catch (Exception e) {
474 logger.warn("Failed to re-register a Channel to the new Selector.", e);
475 if (a instanceof AbstractNioChannel) {
476 AbstractNioChannel ch = (AbstractNioChannel) a;
477 ch.unsafe().close(ch.unsafe().voidPromise());
478 } else {
479 @SuppressWarnings("unchecked")
480 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
481 invokeChannelUnregistered(task, key, e);
482 }
483 }
484 }
485
486 selector = newSelectorTuple.selector;
487 unwrappedSelector = newSelectorTuple.unwrappedSelector;
488
489 try {
490
491 oldSelector.close();
492 } catch (Throwable t) {
493 if (logger.isWarnEnabled()) {
494 logger.warn("Failed to close the old Selector.", t);
495 }
496 }
497
498 if (logger.isInfoEnabled()) {
499 logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
500 }
501 }
502
503 @Override
504 protected void run() {
505 int selectCnt = 0;
506 for (;;) {
507 try {
508 int strategy;
509 try {
510 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
511 switch (strategy) {
512 case SelectStrategy.CONTINUE:
513 continue;
514
515 case SelectStrategy.BUSY_WAIT:
516
517
518 case SelectStrategy.SELECT:
519 long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
520 if (curDeadlineNanos == -1L) {
521 curDeadlineNanos = NONE;
522 }
523 nextWakeupNanos.set(curDeadlineNanos);
524 try {
525 if (!hasTasks()) {
526 strategy = select(curDeadlineNanos);
527 }
528 } finally {
529
530
531 nextWakeupNanos.lazySet(AWAKE);
532 }
533
534 default:
535 }
536 } catch (IOException e) {
537
538
539 rebuildSelector0();
540 selectCnt = 0;
541 handleLoopException(e);
542 continue;
543 }
544
545 selectCnt++;
546 cancelledKeys = 0;
547 needsToSelectAgain = false;
548 final int ioRatio = this.ioRatio;
549 boolean ranTasks;
550 if (ioRatio == 100) {
551 try {
552 if (strategy > 0) {
553 processSelectedKeys();
554 }
555 } finally {
556
557 ranTasks = runAllTasks();
558 }
559 } else if (strategy > 0) {
560 final long ioStartTime = System.nanoTime();
561 try {
562 processSelectedKeys();
563 } finally {
564
565 final long ioTime = System.nanoTime() - ioStartTime;
566 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
567 }
568 } else {
569 ranTasks = runAllTasks(0);
570 }
571
572 if (ranTasks || strategy > 0) {
573 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
574 logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
575 selectCnt - 1, selector);
576 }
577 selectCnt = 0;
578 } else if (unexpectedSelectorWakeup(selectCnt)) {
579 selectCnt = 0;
580 }
581 } catch (CancelledKeyException e) {
582
583 if (logger.isDebugEnabled()) {
584 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
585 selector, e);
586 }
587 } catch (Error e) {
588 throw e;
589 } catch (Throwable t) {
590 handleLoopException(t);
591 } finally {
592
593 try {
594 if (isShuttingDown()) {
595 closeAll();
596 if (confirmShutdown()) {
597 return;
598 }
599 }
600 } catch (Error e) {
601 throw e;
602 } catch (Throwable t) {
603 handleLoopException(t);
604 }
605 }
606 }
607 }
608
609
610 private boolean unexpectedSelectorWakeup(int selectCnt) {
611 if (Thread.interrupted()) {
612
613
614
615
616
617 if (logger.isDebugEnabled()) {
618 logger.debug("Selector.select() returned prematurely because " +
619 "Thread.currentThread().interrupt() was called. Use " +
620 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
621 }
622 return true;
623 }
624 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
625 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
626
627
628 logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
629 selectCnt, selector);
630 rebuildSelector();
631 return true;
632 }
633 return false;
634 }
635
636 private static void handleLoopException(Throwable t) {
637 logger.warn("Unexpected exception in the selector loop.", t);
638
639
640
641 try {
642 Thread.sleep(1000);
643 } catch (InterruptedException e) {
644
645 }
646 }
647
648 private void processSelectedKeys() {
649 if (selectedKeys != null) {
650 processSelectedKeysOptimized();
651 } else {
652 processSelectedKeysPlain(selector.selectedKeys());
653 }
654 }
655
656 @Override
657 protected void cleanup() {
658 try {
659 selector.close();
660 } catch (IOException e) {
661 logger.warn("Failed to close a selector.", e);
662 }
663 }
664
665 void cancel(SelectionKey key) {
666 key.cancel();
667 cancelledKeys ++;
668 if (cancelledKeys >= CLEANUP_INTERVAL) {
669 cancelledKeys = 0;
670 needsToSelectAgain = true;
671 }
672 }
673
674 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
675
676
677
678 if (selectedKeys.isEmpty()) {
679 return;
680 }
681
682 Iterator<SelectionKey> i = selectedKeys.iterator();
683 for (;;) {
684 final SelectionKey k = i.next();
685 final Object a = k.attachment();
686 i.remove();
687
688 if (a instanceof AbstractNioChannel) {
689 processSelectedKey(k, (AbstractNioChannel) a);
690 } else {
691 @SuppressWarnings("unchecked")
692 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
693 processSelectedKey(k, task);
694 }
695
696 if (!i.hasNext()) {
697 break;
698 }
699
700 if (needsToSelectAgain) {
701 selectAgain();
702 selectedKeys = selector.selectedKeys();
703
704
705 if (selectedKeys.isEmpty()) {
706 break;
707 } else {
708 i = selectedKeys.iterator();
709 }
710 }
711 }
712 }
713
714 private void processSelectedKeysOptimized() {
715 for (int i = 0; i < selectedKeys.size; ++i) {
716 final SelectionKey k = selectedKeys.keys[i];
717
718
719 selectedKeys.keys[i] = null;
720
721 final Object a = k.attachment();
722
723 if (a instanceof AbstractNioChannel) {
724 processSelectedKey(k, (AbstractNioChannel) a);
725 } else {
726 @SuppressWarnings("unchecked")
727 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
728 processSelectedKey(k, task);
729 }
730
731 if (needsToSelectAgain) {
732
733
734 selectedKeys.reset(i + 1);
735
736 selectAgain();
737 i = -1;
738 }
739 }
740 }
741
742 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
743 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
744 if (!k.isValid()) {
745 final EventLoop eventLoop;
746 try {
747 eventLoop = ch.eventLoop();
748 } catch (Throwable ignored) {
749
750
751
752 return;
753 }
754
755
756
757
758 if (eventLoop == this) {
759
760 unsafe.close(unsafe.voidPromise());
761 }
762 return;
763 }
764
765 try {
766 int readyOps = k.readyOps();
767
768
769 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
770
771
772 int ops = k.interestOps();
773 ops &= ~SelectionKey.OP_CONNECT;
774 k.interestOps(ops);
775
776 unsafe.finishConnect();
777 }
778
779
780 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
781
782 unsafe.forceFlush();
783 }
784
785
786
787 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
788 unsafe.read();
789 }
790 } catch (CancelledKeyException ignored) {
791 unsafe.close(unsafe.voidPromise());
792 }
793 }
794
795 private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
796 int state = 0;
797 try {
798 task.channelReady(k.channel(), k);
799 state = 1;
800 } catch (Exception e) {
801 k.cancel();
802 invokeChannelUnregistered(task, k, e);
803 state = 2;
804 } finally {
805 switch (state) {
806 case 0:
807 k.cancel();
808 invokeChannelUnregistered(task, k, null);
809 break;
810 case 1:
811 if (!k.isValid()) {
812 invokeChannelUnregistered(task, k, null);
813 }
814 break;
815 default:
816 break;
817 }
818 }
819 }
820
821 private void closeAll() {
822 selectAgain();
823 Set<SelectionKey> keys = selector.keys();
824 Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
825 for (SelectionKey k: keys) {
826 Object a = k.attachment();
827 if (a instanceof AbstractNioChannel) {
828 channels.add((AbstractNioChannel) a);
829 } else {
830 k.cancel();
831 @SuppressWarnings("unchecked")
832 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
833 invokeChannelUnregistered(task, k, null);
834 }
835 }
836
837 for (AbstractNioChannel ch: channels) {
838 ch.unsafe().close(ch.unsafe().voidPromise());
839 }
840 }
841
842 private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
843 try {
844 task.channelUnregistered(k.channel(), cause);
845 } catch (Exception e) {
846 logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
847 }
848 }
849
850 @Override
851 protected void wakeup(boolean inEventLoop) {
852 if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
853 selector.wakeup();
854 }
855 }
856
857 @Override
858 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
859
860 return deadlineNanos < nextWakeupNanos.get();
861 }
862
863 @Override
864 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
865
866 return deadlineNanos < nextWakeupNanos.get();
867 }
868
869 Selector unwrappedSelector() {
870 return unwrappedSelector;
871 }
872
873 int selectNow() throws IOException {
874 return selector.selectNow();
875 }
876
877 private int select(long deadlineNanos) throws IOException {
878 if (deadlineNanos == NONE) {
879 return selector.select();
880 }
881
882 long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
883 return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
884 }
885
886 private void selectAgain() {
887 needsToSelectAgain = false;
888 try {
889 selector.selectNow();
890 } catch (Throwable t) {
891 logger.warn("Failed to update SelectionKeys.", t);
892 }
893 }
894 }