1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelException;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.channel.MessageEvent;
22 import org.jboss.netty.channel.socket.Worker;
23 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
24 import org.jboss.netty.logging.InternalLogger;
25 import org.jboss.netty.logging.InternalLoggerFactory;
26 import org.jboss.netty.util.ExternalResourceReleasable;
27 import org.jboss.netty.util.ThreadRenamingRunnable;
28 import org.jboss.netty.util.internal.DeadLockProofWorker;
29
30 import java.io.IOException;
31 import java.nio.channels.AsynchronousCloseException;
32 import java.nio.channels.CancelledKeyException;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.DatagramChannel;
35 import java.nio.channels.NotYetConnectedException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.Selector;
39 import java.nio.channels.SocketChannel;
40 import java.nio.channels.WritableByteChannel;
41 import java.util.Iterator;
42 import java.util.Queue;
43 import java.util.Set;
44 import java.util.concurrent.ConcurrentLinkedQueue;
45 import java.util.concurrent.Executor;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.RejectedExecutionException;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.locks.ReadWriteLock;
51 import java.util.concurrent.locks.ReentrantReadWriteLock;
52
53 import static org.jboss.netty.channel.Channels.*;
54
55 abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
56
57
58 private static final AtomicInteger nextId = new AtomicInteger();
59
60 final int id = nextId.incrementAndGet();
61
62
63
64
65 private static final InternalLogger logger = InternalLoggerFactory
66 .getInstance(AbstractNioWorker.class);
67
68 private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
69
70 static final int CLEANUP_INTERVAL = 256;
71
72
73
74
75
76
77 private final Executor executor;
78
79
80
81
82
83 protected volatile Thread thread;
84
85
86
87
88 volatile Selector selector;
89
90
91
92
93
94
95
96 protected final AtomicBoolean wakenUp = new AtomicBoolean();
97
98
99
100
101 private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
102
103
104
105
106 private final Object startStopLock = new Object();
107
108
109
110
111 private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
112
113
114
115
116 protected final Queue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();
117
118 private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
119
120
121 private volatile int cancelledKeys;
122
123 protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
124
125 AbstractNioWorker(Executor executor) {
126 this.executor = executor;
127 openSelector();
128 }
129
130 void register(AbstractNioChannel<?> channel, ChannelFuture future) {
131
132 synchronized (startStopLock) {
133 if (selector == null) {
134
135 throw new RejectedExecutionException("Worker has already been shutdown");
136 }
137 Runnable registerTask = createRegisterTask(channel, future);
138
139 boolean offered = registerTaskQueue.offer(registerTask);
140 assert offered;
141
142 if (wakenUp.compareAndSet(false, true)) {
143 selector.wakeup();
144 }
145
146 }
147 }
148
149
150
151 private Selector recreateSelector() throws IOException {
152 Selector newSelector = Selector.open();
153 Selector selector = this.selector;
154 this.selector = newSelector;
155
156
157
158 for (SelectionKey key: selector.keys()) {
159 SelectableChannel ch = key.channel();
160 int ops = key.interestOps();
161 Object att = key.attachment();
162
163 key.cancel();
164
165 try {
166
167 ch.register(newSelector, ops, att);
168 } catch (ClosedChannelException e) {
169
170 AbstractNioChannel<?> channel = (AbstractNioChannel<?>) att;
171 close(channel, succeededFuture(channel));
172 }
173 }
174 try {
175
176 selector.close();
177 } catch (Throwable t) {
178 if (logger.isWarnEnabled()) {
179 logger.warn("Failed to close a selector.", t);
180 }
181 }
182 if (logger.isWarnEnabled()) {
183 logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
184 }
185 return newSelector;
186 }
187
188
189
190
191
192
193
194 private void openSelector() {
195 try {
196 selector = Selector.open();
197 } catch (Throwable t) {
198 throw new ChannelException("Failed to create a selector.", t);
199 }
200
201
202 boolean success = false;
203 try {
204 DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
205 success = true;
206 } finally {
207 if (!success) {
208
209 try {
210 selector.close();
211 } catch (Throwable t) {
212 logger.warn("Failed to close a selector.", t);
213 }
214 selector = null;
215
216 }
217 }
218 assert selector != null && selector.isOpen();
219 }
220
221
222 public void run() {
223 thread = Thread.currentThread();
224
225 boolean shutdown = false;
226 int selectReturnsImmediately = 0;
227 Selector selector = this.selector;
228
229
230 final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
231 boolean wakenupFromLoop = false;
232 for (;;) {
233 wakenUp.set(false);
234
235 if (CONSTRAINT_LEVEL != 0) {
236 selectorGuard.writeLock().lock();
237
238
239 selectorGuard.writeLock().unlock();
240 }
241
242 try {
243 long beforeSelect = System.nanoTime();
244 int selected = SelectorUtil.select(selector);
245 if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
246 long timeBlocked = System.nanoTime() - beforeSelect;
247
248 if (timeBlocked < minSelectTimeout) {
249 boolean notConnected = false;
250
251 for (SelectionKey key: selector.keys()) {
252 SelectableChannel ch = key.channel();
253 try {
254 if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
255 ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
256 notConnected = true;
257
258 key.cancel();
259 }
260 } catch (CancelledKeyException e) {
261
262 }
263 }
264 if (notConnected) {
265 selectReturnsImmediately = 0;
266 } else {
267
268
269
270 selectReturnsImmediately ++;
271
272 }
273
274 } else {
275 selectReturnsImmediately = 0;
276 }
277
278 if (selectReturnsImmediately == 1024) {
279
280
281
282 selector = recreateSelector();
283 selectReturnsImmediately = 0;
284 wakenupFromLoop = false;
285
286 continue;
287 }
288 } else {
289
290 selectReturnsImmediately = 0;
291 }
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321 if (wakenUp.get()) {
322 wakenupFromLoop = true;
323 selector.wakeup();
324 } else {
325 wakenupFromLoop = false;
326 }
327
328 cancelledKeys = 0;
329 processRegisterTaskQueue();
330 processEventQueue();
331 processWriteTaskQueue();
332 processSelectedKeys(selector.selectedKeys());
333
334
335
336
337
338
339 if (selector.keys().isEmpty()) {
340 if (shutdown ||
341 executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
342
343 synchronized (startStopLock) {
344 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
345 try {
346 selector.close();
347 } catch (IOException e) {
348 logger.warn(
349 "Failed to close a selector.", e);
350 } finally {
351 this.selector = null;
352 }
353 break;
354 } else {
355 shutdown = false;
356 }
357 }
358 }
359 } else {
360 shutdown = false;
361 }
362 } catch (Throwable t) {
363 logger.warn(
364 "Unexpected exception in the selector loop.", t);
365
366
367
368 try {
369 Thread.sleep(1000);
370 } catch (InterruptedException e) {
371
372 }
373 }
374 }
375 }
376
377 public void executeInIoThread(Runnable task) {
378 executeInIoThread(task, false);
379 }
380
381
382
383
384
385
386
387
388
389
390 public void executeInIoThread(Runnable task, boolean alwaysAsync) {
391 if (!alwaysAsync && Thread.currentThread() == thread) {
392 task.run();
393 } else {
394 eventQueue.offer(task);
395
396 synchronized (startStopLock) {
397
398
399 if (selector == null) {
400
401 for (;;) {
402 Runnable r = eventQueue.poll();
403 if (r == null) {
404 break;
405 }
406 r.run();
407 }
408 } else {
409 if (wakenUp.compareAndSet(false, true)) {
410
411 Selector selector = this.selector;
412 if (selector != null) {
413 selector.wakeup();
414 }
415 }
416 }
417 }
418 }
419
420 }
421
422
423 private void processRegisterTaskQueue() throws IOException {
424 for (;;) {
425 final Runnable task = registerTaskQueue.poll();
426 if (task == null) {
427 break;
428 }
429
430 task.run();
431 cleanUpCancelledKeys();
432 }
433 }
434
435 private void processWriteTaskQueue() throws IOException {
436 for (;;) {
437 final Runnable task = writeTaskQueue.poll();
438 if (task == null) {
439 break;
440 }
441
442 task.run();
443 cleanUpCancelledKeys();
444 }
445 }
446
447 private void processEventQueue() throws IOException {
448 for (;;) {
449 final Runnable task = eventQueue.poll();
450 if (task == null) {
451 break;
452 }
453 task.run();
454 cleanUpCancelledKeys();
455 }
456 }
457
458 private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
459
460
461
462 if (selectedKeys.isEmpty()) {
463 return;
464 }
465 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
466 SelectionKey k = i.next();
467 i.remove();
468 try {
469 int readyOps = k.readyOps();
470 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
471 if (!read(k)) {
472
473 continue;
474 }
475 }
476 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
477 writeFromSelectorLoop(k);
478 }
479 } catch (CancelledKeyException e) {
480 close(k);
481 }
482
483 if (cleanUpCancelledKeys()) {
484 break;
485 }
486 }
487 }
488
489 private boolean cleanUpCancelledKeys() throws IOException {
490 if (cancelledKeys >= CLEANUP_INTERVAL) {
491 cancelledKeys = 0;
492 selector.selectNow();
493 return true;
494 }
495 return false;
496 }
497
498
499
500 private void close(SelectionKey k) {
501 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
502 close(ch, succeededFuture(ch));
503 }
504
505 void writeFromUserCode(final AbstractNioChannel<?> channel) {
506 if (!channel.isConnected()) {
507 cleanUpWriteBuffer(channel);
508 return;
509 }
510
511 if (scheduleWriteIfNecessary(channel)) {
512 return;
513 }
514
515
516
517 if (channel.writeSuspended) {
518 return;
519 }
520
521 if (channel.inWriteNowLoop) {
522 return;
523 }
524
525 write0(channel);
526 }
527
528 void writeFromTaskLoop(AbstractNioChannel<?> ch) {
529 if (!ch.writeSuspended) {
530 write0(ch);
531 }
532 }
533
534 void writeFromSelectorLoop(final SelectionKey k) {
535 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
536 ch.writeSuspended = false;
537 write0(ch);
538 }
539
540 protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
541
542 protected void write0(AbstractNioChannel<?> channel) {
543 boolean open = true;
544 boolean addOpWrite = false;
545 boolean removeOpWrite = false;
546 boolean iothread = isIoThread(channel);
547
548 long writtenBytes = 0;
549
550 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
551 final WritableByteChannel ch = channel.channel;
552 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
553 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
554 synchronized (channel.writeLock) {
555 channel.inWriteNowLoop = true;
556 for (;;) {
557
558 MessageEvent evt = channel.currentWriteEvent;
559 SendBuffer buf = null;
560 ChannelFuture future = null;
561 try {
562 if (evt == null) {
563 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
564 removeOpWrite = true;
565 channel.writeSuspended = false;
566 break;
567 }
568 future = evt.getFuture();
569
570 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
571 } else {
572 future = evt.getFuture();
573 buf = channel.currentWriteBuffer;
574
575 }
576
577 long localWrittenBytes = 0;
578 for (int i = writeSpinCount; i > 0; i --) {
579 localWrittenBytes = buf.transferTo(ch);
580 if (localWrittenBytes != 0) {
581 writtenBytes += localWrittenBytes;
582 break;
583 }
584 if (buf.finished()) {
585 break;
586 }
587 }
588
589 if (buf.finished()) {
590
591 buf.release();
592 channel.currentWriteEvent = null;
593 channel.currentWriteBuffer = null;
594 evt = null;
595 buf = null;
596 future.setSuccess();
597 } else {
598
599 addOpWrite = true;
600 channel.writeSuspended = true;
601
602 if (localWrittenBytes > 0) {
603
604 future.setProgress(
605 localWrittenBytes,
606 buf.writtenBytes(), buf.totalBytes());
607 }
608 break;
609 }
610 } catch (AsynchronousCloseException e) {
611
612 } catch (Throwable t) {
613 if (buf != null) {
614 buf.release();
615 }
616 channel.currentWriteEvent = null;
617 channel.currentWriteBuffer = null;
618 buf = null;
619 evt = null;
620 if (future != null) {
621 future.setFailure(t);
622 }
623 if (iothread) {
624 fireExceptionCaught(channel, t);
625 } else {
626 fireExceptionCaughtLater(channel, t);
627 }
628 if (t instanceof IOException) {
629 open = false;
630 close(channel, succeededFuture(channel));
631 }
632 }
633 }
634 channel.inWriteNowLoop = false;
635
636
637
638
639
640
641
642 if (open) {
643 if (addOpWrite) {
644 setOpWrite(channel);
645 } else if (removeOpWrite) {
646 clearOpWrite(channel);
647 }
648 }
649 }
650 if (iothread) {
651 fireWriteComplete(channel, writtenBytes);
652 } else {
653 fireWriteCompleteLater(channel, writtenBytes);
654 }
655 }
656
657 static boolean isIoThread(AbstractNioChannel<?> channel) {
658 return Thread.currentThread() == channel.worker.thread;
659 }
660
661 protected void setOpWrite(AbstractNioChannel<?> channel) {
662 Selector selector = this.selector;
663 SelectionKey key = channel.channel.keyFor(selector);
664 if (key == null) {
665 return;
666 }
667 if (!key.isValid()) {
668 close(key);
669 return;
670 }
671
672
673
674 synchronized (channel.interestOpsLock) {
675 int interestOps = channel.getRawInterestOps();
676 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
677 interestOps |= SelectionKey.OP_WRITE;
678 key.interestOps(interestOps);
679 channel.setRawInterestOpsNow(interestOps);
680 }
681 }
682 }
683
684 protected void clearOpWrite(AbstractNioChannel<?> channel) {
685 Selector selector = this.selector;
686 SelectionKey key = channel.channel.keyFor(selector);
687 if (key == null) {
688 return;
689 }
690 if (!key.isValid()) {
691 close(key);
692 return;
693 }
694
695
696
697 synchronized (channel.interestOpsLock) {
698 int interestOps = channel.getRawInterestOps();
699 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
700 interestOps &= ~SelectionKey.OP_WRITE;
701 key.interestOps(interestOps);
702 channel.setRawInterestOpsNow(interestOps);
703 }
704 }
705 }
706
707
708 void close(AbstractNioChannel<?> channel, ChannelFuture future) {
709 boolean connected = channel.isConnected();
710 boolean bound = channel.isBound();
711 boolean iothread = isIoThread(channel);
712
713 try {
714 channel.channel.close();
715 cancelledKeys ++;
716
717 if (channel.setClosed()) {
718 future.setSuccess();
719 if (connected) {
720 if (iothread) {
721 fireChannelDisconnected(channel);
722 } else {
723 fireChannelDisconnectedLater(channel);
724 }
725 }
726 if (bound) {
727 if (iothread) {
728 fireChannelUnbound(channel);
729 } else {
730 fireChannelUnboundLater(channel);
731 }
732 }
733
734 cleanUpWriteBuffer(channel);
735 if (iothread) {
736 fireChannelClosed(channel);
737 } else {
738 fireChannelClosedLater(channel);
739 }
740 } else {
741 future.setSuccess();
742 }
743 } catch (Throwable t) {
744 future.setFailure(t);
745 if (iothread) {
746 fireExceptionCaught(channel, t);
747 } else {
748 fireExceptionCaughtLater(channel, t);
749 }
750 }
751 }
752
753 protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
754 Exception cause = null;
755 boolean fireExceptionCaught = false;
756
757
758 synchronized (channel.writeLock) {
759 MessageEvent evt = channel.currentWriteEvent;
760 if (evt != null) {
761
762
763 if (channel.isOpen()) {
764 cause = new NotYetConnectedException();
765 } else {
766 cause = new ClosedChannelException();
767 }
768
769 ChannelFuture future = evt.getFuture();
770 if (channel.currentWriteBuffer != null) {
771 channel.currentWriteBuffer.release();
772 channel.currentWriteBuffer = null;
773 }
774 channel.currentWriteEvent = null;
775 evt = null;
776 future.setFailure(cause);
777 fireExceptionCaught = true;
778 }
779
780 Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
781 for (;;) {
782 evt = writeBuffer.poll();
783 if (evt == null) {
784 break;
785 }
786
787
788 if (cause == null) {
789 if (channel.isOpen()) {
790 cause = new NotYetConnectedException();
791 } else {
792 cause = new ClosedChannelException();
793 }
794 fireExceptionCaught = true;
795 }
796 evt.getFuture().setFailure(cause);
797
798
799 }
800 }
801
802 if (fireExceptionCaught) {
803 if (isIoThread(channel)) {
804 fireExceptionCaught(channel, cause);
805 } else {
806 fireExceptionCaughtLater(channel, cause);
807 }
808 }
809 }
810
811 void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) {
812 boolean changed = false;
813 boolean iothread = isIoThread(channel);
814 try {
815
816
817 synchronized (channel.interestOpsLock) {
818 Selector selector = this.selector;
819 SelectionKey key = channel.channel.keyFor(selector);
820
821
822 interestOps &= ~Channel.OP_WRITE;
823 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
824
825 if (key == null || selector == null) {
826 if (channel.getRawInterestOps() != interestOps) {
827 changed = true;
828 }
829
830
831
832 channel.setRawInterestOpsNow(interestOps);
833
834 future.setSuccess();
835 if (changed) {
836 if (iothread) {
837 fireChannelInterestChanged(channel);
838 } else {
839 fireChannelInterestChangedLater(channel);
840 }
841 }
842
843 return;
844 }
845
846 switch (CONSTRAINT_LEVEL) {
847 case 0:
848 if (channel.getRawInterestOps() != interestOps) {
849 key.interestOps(interestOps);
850 if (Thread.currentThread() != thread &&
851 wakenUp.compareAndSet(false, true)) {
852 selector.wakeup();
853 }
854 changed = true;
855 }
856 break;
857 case 1:
858 case 2:
859 if (channel.getRawInterestOps() != interestOps) {
860 if (Thread.currentThread() == thread) {
861 key.interestOps(interestOps);
862 changed = true;
863 } else {
864 selectorGuard.readLock().lock();
865 try {
866 if (wakenUp.compareAndSet(false, true)) {
867 selector.wakeup();
868 }
869 key.interestOps(interestOps);
870 changed = true;
871 } finally {
872 selectorGuard.readLock().unlock();
873 }
874 }
875 }
876 break;
877 default:
878 throw new Error();
879 }
880
881 if (changed) {
882 channel.setRawInterestOpsNow(interestOps);
883 }
884 }
885
886 future.setSuccess();
887 if (changed) {
888 if (iothread) {
889 fireChannelInterestChanged(channel);
890 } else {
891 fireChannelInterestChangedLater(channel);
892 }
893 }
894 } catch (CancelledKeyException e) {
895
896 ClosedChannelException cce = new ClosedChannelException();
897 future.setFailure(cce);
898 if (iothread) {
899 fireExceptionCaught(channel, cce);
900 } else {
901 fireExceptionCaughtLater(channel, cce);
902 }
903 } catch (Throwable t) {
904 future.setFailure(t);
905 if (iothread) {
906 fireExceptionCaught(channel, t);
907 } else {
908 fireExceptionCaughtLater(channel, t);
909 }
910 }
911 }
912
913
914 public void releaseExternalResources() {
915 sendBufferPool.releaseExternalResources();
916 }
917
918
919
920
921
922
923
924
925 protected abstract boolean read(SelectionKey k);
926
927
928
929
930
931
932
933
934 protected abstract Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future);
935
936 }