1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.socket.ChannelOutputShutdownEvent;
20 import io.netty.channel.socket.ChannelOutputShutdownException;
21 import io.netty.util.DefaultAttributeMap;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.logging.InternalLogger;
26 import io.netty.util.internal.logging.InternalLoggerFactory;
27
28 import java.io.IOException;
29 import java.net.ConnectException;
30 import java.net.InetSocketAddress;
31 import java.net.NoRouteToHostException;
32 import java.net.SocketAddress;
33 import java.net.SocketException;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.NotYetConnectedException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.RejectedExecutionException;
38
39
40
41
42 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
43
44 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
45
46 private final Channel parent;
47 private final ChannelId id;
48 private final Unsafe unsafe;
49 private final DefaultChannelPipeline pipeline;
50 private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
51 private final CloseFuture closeFuture = new CloseFuture(this);
52
53 private volatile SocketAddress localAddress;
54 private volatile SocketAddress remoteAddress;
55 private volatile EventLoop eventLoop;
56 private volatile boolean registered;
57 private boolean closeInitiated;
58 private Throwable initialCloseCause;
59
60
61 private boolean strValActive;
62 private String strVal;
63
64
65
66
67
68
69
70 protected AbstractChannel(Channel parent) {
71 this.parent = parent;
72 id = newId();
73 unsafe = newUnsafe();
74 pipeline = newChannelPipeline();
75 }
76
77
78
79
80
81
82
83 protected AbstractChannel(Channel parent, ChannelId id) {
84 this.parent = parent;
85 this.id = id;
86 unsafe = newUnsafe();
87 pipeline = newChannelPipeline();
88 }
89
90 protected final int maxMessagesPerWrite() {
91 ChannelConfig config = config();
92 if (config instanceof DefaultChannelConfig) {
93 return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
94 }
95 Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
96 if (value == null) {
97 return Integer.MAX_VALUE;
98 }
99 return value;
100 }
101
102 @Override
103 public final ChannelId id() {
104 return id;
105 }
106
107
108
109
110
111 protected ChannelId newId() {
112 return DefaultChannelId.newInstance();
113 }
114
115
116
117
118 protected DefaultChannelPipeline newChannelPipeline() {
119 return new DefaultChannelPipeline(this);
120 }
121
122 @Override
123 public boolean isWritable() {
124 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
125 return buf != null && buf.isWritable();
126 }
127
128 @Override
129 public long bytesBeforeUnwritable() {
130 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
131
132
133 return buf != null ? buf.bytesBeforeUnwritable() : 0;
134 }
135
136 @Override
137 public long bytesBeforeWritable() {
138 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
139
140
141 return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
142 }
143
144 @Override
145 public Channel parent() {
146 return parent;
147 }
148
149 @Override
150 public ChannelPipeline pipeline() {
151 return pipeline;
152 }
153
154 @Override
155 public ByteBufAllocator alloc() {
156 return config().getAllocator();
157 }
158
159 @Override
160 public EventLoop eventLoop() {
161 EventLoop eventLoop = this.eventLoop;
162 if (eventLoop == null) {
163 throw new IllegalStateException("channel not registered to an event loop");
164 }
165 return eventLoop;
166 }
167
168 @Override
169 public SocketAddress localAddress() {
170 SocketAddress localAddress = this.localAddress;
171 if (localAddress == null) {
172 try {
173 this.localAddress = localAddress = unsafe().localAddress();
174 } catch (Error e) {
175 throw e;
176 } catch (Throwable t) {
177
178 return null;
179 }
180 }
181 return localAddress;
182 }
183
184
185
186
187 @Deprecated
188 protected void invalidateLocalAddress() {
189 localAddress = null;
190 }
191
192 @Override
193 public SocketAddress remoteAddress() {
194 SocketAddress remoteAddress = this.remoteAddress;
195 if (remoteAddress == null) {
196 try {
197 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
198 } catch (Error e) {
199 throw e;
200 } catch (Throwable t) {
201
202 return null;
203 }
204 }
205 return remoteAddress;
206 }
207
208
209
210
211 @Deprecated
212 protected void invalidateRemoteAddress() {
213 remoteAddress = null;
214 }
215
216 @Override
217 public boolean isRegistered() {
218 return registered;
219 }
220
221 @Override
222 public ChannelFuture bind(SocketAddress localAddress) {
223 return pipeline.bind(localAddress);
224 }
225
226 @Override
227 public ChannelFuture connect(SocketAddress remoteAddress) {
228 return pipeline.connect(remoteAddress);
229 }
230
231 @Override
232 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
233 return pipeline.connect(remoteAddress, localAddress);
234 }
235
236 @Override
237 public ChannelFuture disconnect() {
238 return pipeline.disconnect();
239 }
240
241 @Override
242 public ChannelFuture close() {
243 return pipeline.close();
244 }
245
246 @Override
247 public ChannelFuture deregister() {
248 return pipeline.deregister();
249 }
250
251 @Override
252 public Channel flush() {
253 pipeline.flush();
254 return this;
255 }
256
257 @Override
258 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
259 return pipeline.bind(localAddress, promise);
260 }
261
262 @Override
263 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
264 return pipeline.connect(remoteAddress, promise);
265 }
266
267 @Override
268 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
269 return pipeline.connect(remoteAddress, localAddress, promise);
270 }
271
272 @Override
273 public ChannelFuture disconnect(ChannelPromise promise) {
274 return pipeline.disconnect(promise);
275 }
276
277 @Override
278 public ChannelFuture close(ChannelPromise promise) {
279 return pipeline.close(promise);
280 }
281
282 @Override
283 public ChannelFuture deregister(ChannelPromise promise) {
284 return pipeline.deregister(promise);
285 }
286
287 @Override
288 public Channel read() {
289 pipeline.read();
290 return this;
291 }
292
293 @Override
294 public ChannelFuture write(Object msg) {
295 return pipeline.write(msg);
296 }
297
298 @Override
299 public ChannelFuture write(Object msg, ChannelPromise promise) {
300 return pipeline.write(msg, promise);
301 }
302
303 @Override
304 public ChannelFuture writeAndFlush(Object msg) {
305 return pipeline.writeAndFlush(msg);
306 }
307
308 @Override
309 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
310 return pipeline.writeAndFlush(msg, promise);
311 }
312
313 @Override
314 public ChannelPromise newPromise() {
315 return pipeline.newPromise();
316 }
317
318 @Override
319 public ChannelProgressivePromise newProgressivePromise() {
320 return pipeline.newProgressivePromise();
321 }
322
323 @Override
324 public ChannelFuture newSucceededFuture() {
325 return pipeline.newSucceededFuture();
326 }
327
328 @Override
329 public ChannelFuture newFailedFuture(Throwable cause) {
330 return pipeline.newFailedFuture(cause);
331 }
332
333 @Override
334 public ChannelFuture closeFuture() {
335 return closeFuture;
336 }
337
338 @Override
339 public Unsafe unsafe() {
340 return unsafe;
341 }
342
343
344
345
346 protected abstract AbstractUnsafe newUnsafe();
347
348
349
350
351 @Override
352 public final int hashCode() {
353 return id.hashCode();
354 }
355
356
357
358
359
360 @Override
361 public final boolean equals(Object o) {
362 return this == o;
363 }
364
365 @Override
366 public final int compareTo(Channel o) {
367 if (this == o) {
368 return 0;
369 }
370
371 return id().compareTo(o.id());
372 }
373
374
375
376
377
378
379
380 @Override
381 public String toString() {
382 boolean active = isActive();
383 if (strValActive == active && strVal != null) {
384 return strVal;
385 }
386
387 SocketAddress remoteAddr = remoteAddress();
388 SocketAddress localAddr = localAddress();
389 if (remoteAddr != null) {
390 StringBuilder buf = new StringBuilder(96)
391 .append("[id: 0x")
392 .append(id.asShortText())
393 .append(", L:")
394 .append(localAddr)
395 .append(active? " - " : " ! ")
396 .append("R:")
397 .append(remoteAddr)
398 .append(']');
399 strVal = buf.toString();
400 } else if (localAddr != null) {
401 StringBuilder buf = new StringBuilder(64)
402 .append("[id: 0x")
403 .append(id.asShortText())
404 .append(", L:")
405 .append(localAddr)
406 .append(']');
407 strVal = buf.toString();
408 } else {
409 StringBuilder buf = new StringBuilder(16)
410 .append("[id: 0x")
411 .append(id.asShortText())
412 .append(']');
413 strVal = buf.toString();
414 }
415
416 strValActive = active;
417 return strVal;
418 }
419
420 @Override
421 public final ChannelPromise voidPromise() {
422 return pipeline.voidPromise();
423 }
424
425
426
427
428 protected abstract class AbstractUnsafe implements Unsafe {
429
430 private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
431 private RecvByteBufAllocator.Handle recvHandle;
432 private boolean inFlush0;
433
434 private boolean neverRegistered = true;
435
436 private void assertEventLoop() {
437 assert !registered || eventLoop.inEventLoop();
438 }
439
440 @Override
441 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
442 if (recvHandle == null) {
443 recvHandle = config().getRecvByteBufAllocator().newHandle();
444 }
445 return recvHandle;
446 }
447
448 @Override
449 public final ChannelOutboundBuffer outboundBuffer() {
450 return outboundBuffer;
451 }
452
453 @Override
454 public final SocketAddress localAddress() {
455 return localAddress0();
456 }
457
458 @Override
459 public final SocketAddress remoteAddress() {
460 return remoteAddress0();
461 }
462
463 @Override
464 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
465 ObjectUtil.checkNotNull(eventLoop, "eventLoop");
466 if (isRegistered()) {
467 promise.setFailure(new IllegalStateException("registered to an event loop already"));
468 return;
469 }
470 if (!isCompatible(eventLoop)) {
471 promise.setFailure(
472 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
473 return;
474 }
475
476 AbstractChannel.this.eventLoop = eventLoop;
477
478 if (eventLoop.inEventLoop()) {
479 register0(promise);
480 } else {
481 try {
482 eventLoop.execute(new Runnable() {
483 @Override
484 public void run() {
485 register0(promise);
486 }
487 });
488 } catch (Throwable t) {
489 logger.warn(
490 "Force-closing a channel whose registration task was not accepted by an event loop: {}",
491 AbstractChannel.this, t);
492 closeForcibly();
493 closeFuture.setClosed();
494 safeSetFailure(promise, t);
495 }
496 }
497 }
498
499 private void register0(ChannelPromise promise) {
500 try {
501
502
503 if (!promise.setUncancellable() || !ensureOpen(promise)) {
504 return;
505 }
506 boolean firstRegistration = neverRegistered;
507 doRegister();
508 neverRegistered = false;
509 registered = true;
510
511
512
513 pipeline.invokeHandlerAddedIfNeeded();
514
515 safeSetSuccess(promise);
516 pipeline.fireChannelRegistered();
517
518
519 if (isActive()) {
520 if (firstRegistration) {
521 pipeline.fireChannelActive();
522 } else if (config().isAutoRead()) {
523
524
525
526
527 beginRead();
528 }
529 }
530 } catch (Throwable t) {
531
532 closeForcibly();
533 closeFuture.setClosed();
534 safeSetFailure(promise, t);
535 }
536 }
537
538 @Override
539 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
540 assertEventLoop();
541
542 if (!promise.setUncancellable() || !ensureOpen(promise)) {
543 return;
544 }
545
546
547 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
548 localAddress instanceof InetSocketAddress &&
549 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
550 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
551
552
553 logger.warn(
554 "A non-root user can't receive a broadcast packet if the socket " +
555 "is not bound to a wildcard address; binding to a non-wildcard " +
556 "address (" + localAddress + ") anyway as requested.");
557 }
558
559 boolean wasActive = isActive();
560 try {
561 doBind(localAddress);
562 } catch (Throwable t) {
563 safeSetFailure(promise, t);
564 closeIfClosed();
565 return;
566 }
567
568 if (!wasActive && isActive()) {
569 invokeLater(new Runnable() {
570 @Override
571 public void run() {
572 pipeline.fireChannelActive();
573 }
574 });
575 }
576
577 safeSetSuccess(promise);
578 }
579
580 @Override
581 public final void disconnect(final ChannelPromise promise) {
582 assertEventLoop();
583
584 if (!promise.setUncancellable()) {
585 return;
586 }
587
588 boolean wasActive = isActive();
589 try {
590 doDisconnect();
591
592 remoteAddress = null;
593 localAddress = null;
594 } catch (Throwable t) {
595 safeSetFailure(promise, t);
596 closeIfClosed();
597 return;
598 }
599
600 if (wasActive && !isActive()) {
601 invokeLater(new Runnable() {
602 @Override
603 public void run() {
604 pipeline.fireChannelInactive();
605 }
606 });
607 }
608
609 safeSetSuccess(promise);
610 closeIfClosed();
611 }
612
613 @Override
614 public void close(final ChannelPromise promise) {
615 assertEventLoop();
616
617 ClosedChannelException closedChannelException =
618 StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
619 close(promise, closedChannelException, closedChannelException, false);
620 }
621
622
623
624
625
626 public final void shutdownOutput(final ChannelPromise promise) {
627 assertEventLoop();
628 shutdownOutput(promise, null);
629 }
630
631
632
633
634
635
636 private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
637 if (!promise.setUncancellable()) {
638 return;
639 }
640
641 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
642 if (outboundBuffer == null) {
643 promise.setFailure(new ClosedChannelException());
644 return;
645 }
646 this.outboundBuffer = null;
647
648 final Throwable shutdownCause = cause == null ?
649 new ChannelOutputShutdownException("Channel output shutdown") :
650 new ChannelOutputShutdownException("Channel output shutdown", cause);
651
652
653
654
655
656 try {
657
658
659 doShutdownOutput();
660 promise.setSuccess();
661 } catch (Throwable err) {
662 promise.setFailure(err);
663 } finally {
664 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
665 }
666 }
667
668 private void closeOutboundBufferForShutdown(
669 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
670 buffer.failFlushed(cause, false);
671 buffer.close(cause, true);
672 pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
673 }
674
675 private void close(final ChannelPromise promise, final Throwable cause,
676 final ClosedChannelException closeCause, final boolean notify) {
677 if (!promise.setUncancellable()) {
678 return;
679 }
680
681 if (closeInitiated) {
682 if (closeFuture.isDone()) {
683
684 safeSetSuccess(promise);
685 } else if (!(promise instanceof VoidChannelPromise)) {
686
687 closeFuture.addListener(new ChannelFutureListener() {
688 @Override
689 public void operationComplete(ChannelFuture future) throws Exception {
690 promise.setSuccess();
691 }
692 });
693 }
694 return;
695 }
696
697 closeInitiated = true;
698
699 final boolean wasActive = isActive();
700 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
701 this.outboundBuffer = null;
702 Executor closeExecutor = prepareToClose();
703 if (closeExecutor != null) {
704 closeExecutor.execute(new Runnable() {
705 @Override
706 public void run() {
707 try {
708
709 doClose0(promise);
710 } finally {
711
712 invokeLater(new Runnable() {
713 @Override
714 public void run() {
715 if (outboundBuffer != null) {
716
717 outboundBuffer.failFlushed(cause, notify);
718 outboundBuffer.close(closeCause);
719 }
720 fireChannelInactiveAndDeregister(wasActive);
721 }
722 });
723 }
724 }
725 });
726 } else {
727 try {
728
729 doClose0(promise);
730 } finally {
731 if (outboundBuffer != null) {
732
733 outboundBuffer.failFlushed(cause, notify);
734 outboundBuffer.close(closeCause);
735 }
736 }
737 if (inFlush0) {
738 invokeLater(new Runnable() {
739 @Override
740 public void run() {
741 fireChannelInactiveAndDeregister(wasActive);
742 }
743 });
744 } else {
745 fireChannelInactiveAndDeregister(wasActive);
746 }
747 }
748 }
749
750 private void doClose0(ChannelPromise promise) {
751 try {
752 doClose();
753 closeFuture.setClosed();
754 safeSetSuccess(promise);
755 } catch (Throwable t) {
756 closeFuture.setClosed();
757 safeSetFailure(promise, t);
758 }
759 }
760
761 private void fireChannelInactiveAndDeregister(final boolean wasActive) {
762 deregister(voidPromise(), wasActive && !isActive());
763 }
764
765 @Override
766 public final void closeForcibly() {
767 assertEventLoop();
768
769 try {
770 doClose();
771 } catch (Exception e) {
772 logger.warn("Failed to close a channel.", e);
773 }
774 }
775
776 @Override
777 public final void deregister(final ChannelPromise promise) {
778 assertEventLoop();
779
780 deregister(promise, false);
781 }
782
783 private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
784 if (!promise.setUncancellable()) {
785 return;
786 }
787
788 if (!registered) {
789 safeSetSuccess(promise);
790 return;
791 }
792
793
794
795
796
797
798
799
800
801
802 invokeLater(new Runnable() {
803 @Override
804 public void run() {
805 try {
806 doDeregister();
807 } catch (Throwable t) {
808 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
809 } finally {
810 if (fireChannelInactive) {
811 pipeline.fireChannelInactive();
812 }
813
814
815
816
817 if (registered) {
818 registered = false;
819 pipeline.fireChannelUnregistered();
820 }
821 safeSetSuccess(promise);
822 }
823 }
824 });
825 }
826
827 @Override
828 public final void beginRead() {
829 assertEventLoop();
830
831 try {
832 doBeginRead();
833 } catch (final Exception e) {
834 invokeLater(new Runnable() {
835 @Override
836 public void run() {
837 pipeline.fireExceptionCaught(e);
838 }
839 });
840 close(voidPromise());
841 }
842 }
843
844 @Override
845 public final void write(Object msg, ChannelPromise promise) {
846 assertEventLoop();
847
848 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
849 if (outboundBuffer == null) {
850 try {
851
852 ReferenceCountUtil.release(msg);
853 } finally {
854
855
856
857
858 safeSetFailure(promise,
859 newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
860 }
861 return;
862 }
863
864 int size;
865 try {
866 msg = filterOutboundMessage(msg);
867 size = pipeline.estimatorHandle().size(msg);
868 if (size < 0) {
869 size = 0;
870 }
871 } catch (Throwable t) {
872 try {
873 ReferenceCountUtil.release(msg);
874 } finally {
875 safeSetFailure(promise, t);
876 }
877 return;
878 }
879
880 outboundBuffer.addMessage(msg, size, promise);
881 }
882
883 @Override
884 public final void flush() {
885 assertEventLoop();
886
887 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
888 if (outboundBuffer == null) {
889 return;
890 }
891
892 outboundBuffer.addFlush();
893 flush0();
894 }
895
896 @SuppressWarnings("deprecation")
897 protected void flush0() {
898 if (inFlush0) {
899
900 return;
901 }
902
903 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
904 if (outboundBuffer == null || outboundBuffer.isEmpty()) {
905 return;
906 }
907
908 inFlush0 = true;
909
910
911 if (!isActive()) {
912 try {
913
914 if (!outboundBuffer.isEmpty()) {
915 if (isOpen()) {
916 outboundBuffer.failFlushed(new NotYetConnectedException(), true);
917 } else {
918
919 outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
920 }
921 }
922 } finally {
923 inFlush0 = false;
924 }
925 return;
926 }
927
928 try {
929 doWrite(outboundBuffer);
930 } catch (Throwable t) {
931 handleWriteError(t);
932 } finally {
933 inFlush0 = false;
934 }
935 }
936
937 protected final void handleWriteError(Throwable t) {
938 if (t instanceof IOException && config().isAutoClose()) {
939
940
941
942
943
944
945
946
947 initialCloseCause = t;
948 close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
949 } else {
950 try {
951 shutdownOutput(voidPromise(), t);
952 } catch (Throwable t2) {
953 initialCloseCause = t;
954 close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
955 }
956 }
957 }
958
959 private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
960 ClosedChannelException exception =
961 StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
962 if (cause != null) {
963 exception.initCause(cause);
964 }
965 return exception;
966 }
967
968 @Override
969 public final ChannelPromise voidPromise() {
970 assertEventLoop();
971
972 return unsafeVoidPromise;
973 }
974
975 protected final boolean ensureOpen(ChannelPromise promise) {
976 if (isOpen()) {
977 return true;
978 }
979
980 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
981 return false;
982 }
983
984
985
986
987 protected final void safeSetSuccess(ChannelPromise promise) {
988 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
989 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
990 }
991 }
992
993
994
995
996 protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
997 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
998 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
999 }
1000 }
1001
1002 protected final void closeIfClosed() {
1003 if (isOpen()) {
1004 return;
1005 }
1006 close(voidPromise());
1007 }
1008
1009 private void invokeLater(Runnable task) {
1010 try {
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022 eventLoop().execute(task);
1023 } catch (RejectedExecutionException e) {
1024 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1025 }
1026 }
1027
1028
1029
1030
1031 protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1032 if (cause instanceof ConnectException) {
1033 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1034 }
1035 if (cause instanceof NoRouteToHostException) {
1036 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1037 }
1038 if (cause instanceof SocketException) {
1039 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1040 }
1041
1042 return cause;
1043 }
1044
1045
1046
1047
1048
1049
1050
1051 protected Executor prepareToClose() {
1052 return null;
1053 }
1054 }
1055
1056
1057
1058
1059 protected abstract boolean isCompatible(EventLoop loop);
1060
1061
1062
1063
1064 protected abstract SocketAddress localAddress0();
1065
1066
1067
1068
1069 protected abstract SocketAddress remoteAddress0();
1070
1071
1072
1073
1074
1075
1076 protected void doRegister() throws Exception {
1077
1078 }
1079
1080
1081
1082
1083 protected abstract void doBind(SocketAddress localAddress) throws Exception;
1084
1085
1086
1087
1088 protected abstract void doDisconnect() throws Exception;
1089
1090
1091
1092
1093 protected abstract void doClose() throws Exception;
1094
1095
1096
1097
1098
1099 protected void doShutdownOutput() throws Exception {
1100 doClose();
1101 }
1102
1103
1104
1105
1106
1107
1108 protected void doDeregister() throws Exception {
1109
1110 }
1111
1112
1113
1114
1115 protected abstract void doBeginRead() throws Exception;
1116
1117
1118
1119
1120 protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1121
1122
1123
1124
1125
1126 protected Object filterOutboundMessage(Object msg) throws Exception {
1127 return msg;
1128 }
1129
1130 protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1131 DefaultFileRegion.validate(region, position);
1132 }
1133
1134 static final class CloseFuture extends DefaultChannelPromise {
1135
1136 CloseFuture(AbstractChannel ch) {
1137 super(ch);
1138 }
1139
1140 @Override
1141 public ChannelPromise setSuccess() {
1142 throw new IllegalStateException();
1143 }
1144
1145 @Override
1146 public ChannelPromise setFailure(Throwable cause) {
1147 throw new IllegalStateException();
1148 }
1149
1150 @Override
1151 public boolean trySuccess() {
1152 throw new IllegalStateException();
1153 }
1154
1155 @Override
1156 public boolean tryFailure(Throwable cause) {
1157 throw new IllegalStateException();
1158 }
1159
1160 boolean setClosed() {
1161 return super.trySuccess();
1162 }
1163 }
1164
1165 private static final class AnnotatedConnectException extends ConnectException {
1166
1167 private static final long serialVersionUID = 3901958112696433556L;
1168
1169 AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1170 super(exception.getMessage() + ": " + remoteAddress);
1171 initCause(exception);
1172 }
1173
1174
1175 @Override
1176 public Throwable fillInStackTrace() {
1177 return this;
1178 }
1179 }
1180
1181 private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1182
1183 private static final long serialVersionUID = -6801433937592080623L;
1184
1185 AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1186 super(exception.getMessage() + ": " + remoteAddress);
1187 initCause(exception);
1188 }
1189
1190
1191 @Override
1192 public Throwable fillInStackTrace() {
1193 return this;
1194 }
1195 }
1196
1197 private static final class AnnotatedSocketException extends SocketException {
1198
1199 private static final long serialVersionUID = 3896743275010454039L;
1200
1201 AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1202 super(exception.getMessage() + ": " + remoteAddress);
1203 initCause(exception);
1204 }
1205
1206
1207 @Override
1208 public Throwable fillInStackTrace() {
1209 return this;
1210 }
1211 }
1212 }