1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty5.channel;
17  
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.DefaultBufferAllocators;
20  import io.netty5.util.Resource;
21  import io.netty5.util.DefaultAttributeMap;
22  import io.netty5.util.concurrent.DefaultPromise;
23  import io.netty5.util.concurrent.EventExecutor;
24  import io.netty5.util.concurrent.Future;
25  import io.netty5.util.concurrent.Promise;
26  import io.netty5.util.internal.PlatformDependent;
27  import io.netty5.util.internal.StringUtil;
28  import io.netty5.util.internal.logging.InternalLogger;
29  import io.netty5.util.internal.logging.InternalLoggerFactory;
30  
31  import java.io.IOException;
32  import java.net.ConnectException;
33  import java.net.InetSocketAddress;
34  import java.net.NoRouteToHostException;
35  import java.net.SocketAddress;
36  import java.net.SocketException;
37  import java.nio.channels.AlreadyConnectedException;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.ConnectionPendingException;
40  import java.nio.channels.NotYetConnectedException;
41  import java.util.Collections;
42  import java.util.IdentityHashMap;
43  import java.util.NoSuchElementException;
44  import java.util.Set;
45  import java.util.concurrent.Executor;
46  import java.util.concurrent.RejectedExecutionException;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50  
51  import static io.netty5.channel.ChannelOption.ALLOW_HALF_CLOSURE;
52  import static io.netty5.channel.ChannelOption.AUTO_CLOSE;
53  import static io.netty5.channel.ChannelOption.AUTO_READ;
54  import static io.netty5.channel.ChannelOption.BUFFER_ALLOCATOR;
55  import static io.netty5.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
56  import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_READ;
57  import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
58  import static io.netty5.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
59  import static io.netty5.channel.ChannelOption.RCVBUFFER_ALLOCATOR;
60  import static io.netty5.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
61  import static io.netty5.channel.ChannelOption.WRITE_SPIN_COUNT;
62  import static io.netty5.util.internal.ObjectUtil.checkPositive;
63  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
64  import static java.util.Objects.requireNonNull;
65  
66  
67  
68  
69  public abstract class AbstractChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
70          extends DefaultAttributeMap implements Channel {
71  
72      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
73      private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
74  
75      private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
76  
77      private static final Set<ChannelOption<?>> SUPPORTED_CHANNEL_OPTIONS = supportedOptions();
78  
79      private final P parent;
80      private final ChannelId id;
81      private final ChannelPipeline pipeline;
82      private final ClosePromise closePromise;
83      private final Runnable fireChannelWritabilityChangedTask;
84      private final EventLoop eventLoop;
85      private final ChannelMetadata metadata;
86  
87      
88      private boolean strValActive;
89      private String strVal;
90  
91      @SuppressWarnings("rawtypes")
92      private static final AtomicIntegerFieldUpdater<AbstractChannel> WRITABLE_UPDATER =
93              AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "writable");
94      private volatile int writable = 1;
95      private volatile ChannelOutboundBuffer outboundBuffer;
96      private volatile L localAddress;
97      private volatile R remoteAddress;
98      private volatile boolean registered;
99  
100     private static final AtomicIntegerFieldUpdater<AbstractChannel> AUTOREAD_UPDATER =
101             AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "autoRead");
102     private static final AtomicReferenceFieldUpdater<AbstractChannel, WriteBufferWaterMark> WATERMARK_UPDATER =
103             AtomicReferenceFieldUpdater.newUpdater(
104                     AbstractChannel.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
105 
106     private volatile BufferAllocator bufferAllocator = DefaultBufferAllocators.preferredAllocator();
107     private volatile RecvBufferAllocator rcvBufAllocator;
108     private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
109 
110     private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
111     private volatile int writeSpinCount = 16;
112     private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
113 
114     @SuppressWarnings("FieldMayBeFinal")
115     private volatile int autoRead = 1;
116     private volatile boolean autoClose = true;
117     private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
118     private volatile boolean allowHalfClosure;
119 
120     
121     private boolean closeInitiated;
122     private Throwable initialCloseCause;
123     private boolean readBeforeActive;
124     private RecvBufferAllocator.Handle recvHandle;
125     private MessageSizeEstimator.Handle estimatorHandler;
126     private boolean inWriteFlushed;
127     
128     private boolean neverRegistered = true;
129     private boolean neverActive = true;
130 
131     
132 
133 
134 
135     private Promise<Void> connectPromise;
136     private Future<?> connectTimeoutFuture;
137     private R requestedRemoteAddress;
138 
139     
140 
141 
142 
143 
144 
145 
146     protected AbstractChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata) {
147         this(parent, eventLoop, metadata, new AdaptiveRecvBufferAllocator());
148     }
149 
150     
151 
152 
153 
154 
155 
156 
157 
158     protected AbstractChannel(P parent, EventLoop eventLoop,
159                               ChannelMetadata metadata, RecvBufferAllocator defaultRecvBufferAllocator) {
160         this(parent, eventLoop, metadata, defaultRecvBufferAllocator, DefaultChannelId.newInstance());
161     }
162 
163     
164 
165 
166 
167 
168 
169 
170 
171 
172     protected AbstractChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
173                               RecvBufferAllocator defaultRecvBufferAllocator, ChannelId id) {
174         this.parent = parent;
175         this.eventLoop = validateEventLoopGroup(eventLoop, "eventLoop", getClass());
176         this.metadata = requireNonNull(metadata, "metadata");
177         closePromise = new ClosePromise(eventLoop);
178         outboundBuffer = new ChannelOutboundBuffer(eventLoop);
179         this.id = id;
180         pipeline = newChannelPipeline();
181         fireChannelWritabilityChangedTask = () -> pipeline().fireChannelWritabilityChanged();
182         rcvBufAllocator = validateAndConfigure(defaultRecvBufferAllocator, metadata);
183     }
184 
185     private static RecvBufferAllocator validateAndConfigure(RecvBufferAllocator defaultRecvBufferAllocator,
186                                                             ChannelMetadata metadata) {
187         requireNonNull(defaultRecvBufferAllocator, "defaultRecvBufferAllocator");
188         if (defaultRecvBufferAllocator instanceof MaxMessagesRecvBufferAllocator) {
189             ((MaxMessagesRecvBufferAllocator) defaultRecvBufferAllocator)
190                     .maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
191         }
192         return defaultRecvBufferAllocator;
193     }
194 
195     protected static <T extends EventLoopGroup> T validateEventLoopGroup(
196             T group, String name, Class<? extends Channel> channelType) {
197         requireNonNull(group, name);
198         if (!group.isCompatible(channelType)) {
199             throw new IllegalArgumentException(group + " does not support channel of type " +
200                             StringUtil.simpleClassName(channelType));
201         }
202         return group;
203     }
204 
205     @Override
206     public final ChannelId id() {
207         return id;
208     }
209 
210     @Override
211     public final ChannelMetadata metadata() {
212         return metadata;
213     }
214 
215     
216 
217 
218     protected ChannelPipeline newChannelPipeline() {
219         return new DefaultAbstractChannelPipeline(this);
220     }
221 
222     @Override
223     public BufferAllocator bufferAllocator() {
224         return bufferAllocator;
225     }
226 
227     @Override
228     public final P parent() {
229         return parent;
230     }
231 
232     @Override
233     public final ChannelPipeline pipeline() {
234         return pipeline;
235     }
236 
237     @Override
238     public final EventLoop executor() {
239         return eventLoop;
240     }
241 
242     @Override
243     public final L localAddress() {
244         L localAddress = this.localAddress;
245         if (localAddress == null) {
246             try {
247                 this.localAddress = localAddress = localAddress0();
248             } catch (Error e) {
249                 throw e;
250             } catch (Throwable t) {
251                 
252                 return null;
253             }
254         }
255         return localAddress;
256     }
257 
258     @Override
259     public final R remoteAddress() {
260         R remoteAddress = this.remoteAddress;
261         if (remoteAddress == null) {
262             try {
263                 this.remoteAddress = remoteAddress = remoteAddress0();
264             } catch (Error e) {
265                 throw e;
266             } catch (Throwable t) {
267                 
268                 return null;
269             }
270         }
271         return remoteAddress;
272     }
273 
274     protected final void cacheAddresses(L localAddress, R  remoteAddress) {
275         this.localAddress = localAddress;
276         this.remoteAddress = remoteAddress;
277     }
278 
279     @Override
280     public final boolean isRegistered() {
281         return registered;
282     }
283 
284     @Override
285     public final Future<Void> closeFuture() {
286         return closePromise;
287     }
288 
289     private long totalPending() {
290         ChannelOutboundBuffer buf = outboundBuffer();
291         if (buf == null) {
292             return -1;
293         }
294         return buf.totalPendingWriteBytes() + pipeline().pendingOutboundBytes();
295     }
296 
297     @Override
298     public final long writableBytes() {
299         long totalPending = totalPending();
300         if (totalPending == -1) {
301             
302             return 0;
303         }
304 
305         long bytes = writeBufferWaterMark.high() -
306                 totalPending;
307         
308         if (bytes > 0) {
309             return WRITABLE_UPDATER.get(this) == 0 ? 0: bytes;
310         }
311         return 0;
312     }
313 
314     
315 
316 
317     @Override
318     public final int hashCode() {
319         return id.hashCode();
320     }
321 
322     
323 
324 
325 
326     @Override
327     public final boolean equals(Object o) {
328         return this == o;
329     }
330 
331     @Override
332     public final int compareTo(Channel o) {
333         if (this == o) {
334             return 0;
335         }
336 
337         return id().compareTo(o.id());
338     }
339 
340     
341 
342 
343 
344 
345 
346     @Override
347     public String toString() {
348         boolean active = isActive();
349         if (strValActive == active && strVal != null) {
350             return strVal;
351         }
352 
353         SocketAddress remoteAddr = remoteAddress();
354         SocketAddress localAddr = localAddress();
355         if (remoteAddr != null) {
356             StringBuilder buf = new StringBuilder(96)
357                 .append("[id: 0x")
358                 .append(id.asShortText())
359                 .append(", L:")
360                 .append(localAddr)
361                 .append(active? " - " : " ! ")
362                 .append("R:")
363                 .append(remoteAddr)
364                 .append(']');
365             strVal = buf.toString();
366         } else if (localAddr != null) {
367             StringBuilder buf = new StringBuilder(64)
368                 .append("[id: 0x")
369                 .append(id.asShortText())
370                 .append(", L:")
371                 .append(localAddr)
372                 .append(']');
373             strVal = buf.toString();
374         } else {
375             StringBuilder buf = new StringBuilder(16)
376                 .append("[id: 0x")
377                 .append(id.asShortText())
378                 .append(']');
379             strVal = buf.toString();
380         }
381 
382         strValActive = active;
383         return strVal;
384     }
385 
386     protected final void readIfIsAutoRead() {
387         assertEventLoop();
388 
389         if (isAutoRead() || readBeforeActive) {
390             readBeforeActive = false;
391             read();
392         }
393     }
394 
395     protected final void assertEventLoop() {
396         assert eventLoop.inEventLoop();
397     }
398 
399     protected RecvBufferAllocator.Handle recvBufAllocHandle() {
400         assertEventLoop();
401 
402         if (recvHandle == null) {
403             recvHandle = getRecvBufferAllocator().newHandle();
404         }
405         return recvHandle;
406     }
407 
408     private void registerTransport(final Promise<Void> promise) {
409         assertEventLoop();
410 
411         if (isRegistered()) {
412             promise.setFailure(new IllegalStateException("registered to an event loop already"));
413             return;
414         }
415 
416         try {
417             
418             
419             if (!promise.setUncancellable() || !ensureOpen(promise)) {
420                 return;
421             }
422             boolean firstRegistration = neverRegistered;
423             executor().registerForIo(this).addListener(f -> {
424                 if (f.isSuccess()) {
425 
426                     neverRegistered = false;
427                     registered = true;
428 
429                     safeSetSuccess(promise);
430                     pipeline.fireChannelRegistered();
431                     
432                     
433                     if (isActive()) {
434                         if (firstRegistration) {
435                             fireChannelActiveIfNotActiveBefore();
436                         }
437                         readIfIsAutoRead();
438                     }
439                 } else {
440                     
441                     closeNowAndFail(promise, f.cause());
442                 }
443             });
444 
445         } catch (Throwable t) {
446             
447             closeNowAndFail(promise, t);
448         }
449     }
450 
451     
452 
453 
454 
455 
456     protected final boolean fireChannelActiveIfNotActiveBefore() {
457         assertEventLoop();
458 
459         if (neverActive) {
460             neverActive = false;
461             pipeline().fireChannelActive();
462             return true;
463         }
464         return false;
465     }
466 
467     private void closeNowAndFail(Promise<Void> promise, Throwable cause) {
468         closeForciblyTransport();
469         closePromise.setClosed();
470         safeSetFailure(promise, cause);
471     }
472 
473     private void bindTransport(final SocketAddress localAddress, final Promise<Void> promise) {
474         assertEventLoop();
475 
476         if (!promise.setUncancellable() || !ensureOpen(promise)) {
477             return;
478         }
479 
480         
481         if (localAddress instanceof InetSocketAddress && isOptionSupported(ChannelOption.SO_BROADCAST) &&
482                 Boolean.TRUE.equals(getOption(ChannelOption.SO_BROADCAST)) &&
483             !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
484             !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
485             
486             
487             logger.warn(
488                     "A non-root user can't receive a broadcast packet if the socket " +
489                     "is not bound to a wildcard address; binding to a non-wildcard " +
490                     "address (" + localAddress + ") anyway as requested.");
491         }
492 
493         boolean wasActive = isActive();
494         try {
495             doBind(localAddress);
496         } catch (Throwable t) {
497             safeSetFailure(promise, t);
498             closeIfClosed();
499             return;
500         }
501 
502         if (!wasActive && isActive()) {
503             invokeLater(() -> {
504                 if (fireChannelActiveIfNotActiveBefore()) {
505                     readIfIsAutoRead();
506                 }
507             });
508         }
509 
510         safeSetSuccess(promise);
511     }
512 
513     private void disconnectTransport(final Promise<Void> promise) {
514         assertEventLoop();
515 
516         if (!promise.setUncancellable()) {
517             return;
518         }
519 
520         boolean wasActive = isActive();
521         try {
522             doDisconnect();
523             
524             remoteAddress = null;
525             localAddress = null;
526             neverActive = true;
527         } catch (Throwable t) {
528             safeSetFailure(promise, t);
529             closeIfClosed();
530             return;
531         }
532 
533         if (wasActive && !isActive()) {
534             invokeLater(pipeline::fireChannelInactive);
535         }
536 
537         safeSetSuccess(promise);
538         closeIfClosed(); 
539     }
540 
541     protected void closeTransport(final Promise<Void> promise) {
542         assertEventLoop();
543 
544         ClosedChannelException closedChannelException =
545                 StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(Promise)");
546         close(promise, closedChannelException, closedChannelException);
547     }
548 
549     private void updateWritabilityIfNeeded(boolean notify, boolean notifyLater) {
550         long totalPending = totalPending();
551 
552         if (totalPending > writeBufferWaterMark.high()) {
553             if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
554                 fireChannelWritabilityChangedIfNeeded(notify, notifyLater);
555             }
556         } else if (totalPending < writeBufferWaterMark.low()) {
557             if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
558                 fireChannelWritabilityChangedIfNeeded(notify, notifyLater);
559             }
560         }
561     }
562 
563     private void fireChannelWritabilityChangedIfNeeded(boolean notify, boolean notifyLater) {
564         if (!notify) {
565             return;
566         }
567         if (notifyLater) {
568             executor().execute(fireChannelWritabilityChangedTask);
569         } else {
570             pipeline().fireChannelWritabilityChanged();
571         }
572     }
573 
574     
575 
576 
577 
578 
579     private boolean shutdownOutput(final Promise<Void> promise, Throwable cause) {
580         final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
581         if (outboundBuffer == null) {
582             promise.setFailure(new ClosedChannelException());
583             return false;
584         }
585         this.outboundBuffer = null; 
586 
587         final Throwable shutdownCause = cause == null ?
588                 new ChannelOutputShutdownException("Channel output shutdown") :
589                 new ChannelOutputShutdownException("Channel output shutdown", cause);
590         
591         
592         
593         
594         try {
595             
596             
597             doShutdown(ChannelShutdownDirection.Outbound);
598             promise.setSuccess(null);
599         } catch (Throwable err) {
600             promise.setFailure(err);
601         } finally {
602             outboundBuffer.failFlushedAndClose(shutdownCause, shutdownCause);
603         }
604         return true;
605     }
606 
607     private void close(final Promise<Void> promise, final Throwable cause,
608                        final ClosedChannelException closeCause) {
609         if (!promise.setUncancellable()) {
610             return;
611         }
612 
613         if (closeInitiated) {
614             if (closePromise.isDone()) {
615                 
616                 safeSetSuccess(promise);
617             } else {
618                 
619                 closePromise.addListener(promise, (p, future) -> p.setSuccess(null));
620             }
621             return;
622         }
623 
624         closeInitiated = true;
625 
626         final boolean wasActive = isActive();
627         final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
628         this.outboundBuffer = null; 
629         Future<Executor> closeExecutorFuture = prepareToClose();
630         if (closeExecutorFuture != null) {
631             closeExecutorFuture.addListener(f -> {
632                 if (f.isFailed()) {
633                     logger.warn("We couldnt obtain the closeExecutor", f.cause());
634                     closeNow(outboundBuffer, wasActive, promise, cause, closeCause);
635                 } else {
636                     Executor closeExecutor = f.getNow();
637                     closeExecutor.execute(() -> {
638                         try {
639                             
640                             doClose0(promise);
641                         } finally {
642                             
643                             invokeLater(() -> {
644                                 closeAndUpdateWritability(outboundBuffer, cause, closeCause);
645                                 fireChannelInactiveAndDeregister(wasActive);
646                             });
647                         }
648                     });
649                 }
650             });
651         } else {
652             closeNow(outboundBuffer, wasActive, promise, cause, closeCause);
653         }
654     }
655 
656     private void closeNow(ChannelOutboundBuffer outboundBuffer, boolean wasActive, Promise<Void> promise,
657                           Throwable cause, ClosedChannelException closeCause) {
658         try {
659             
660             doClose0(promise);
661         } finally {
662             closeAndUpdateWritability(outboundBuffer, cause, closeCause);
663         }
664         if (inWriteFlushed) {
665             invokeLater(() -> fireChannelInactiveAndDeregister(wasActive));
666         } else {
667             fireChannelInactiveAndDeregister(wasActive);
668         }
669     }
670 
671     private void closeAndUpdateWritability(
672             ChannelOutboundBuffer outboundBuffer, Throwable cause, Throwable closeCause) {
673         if (outboundBuffer != null) {
674             
675             outboundBuffer.failFlushedAndClose(cause, closeCause);
676             updateWritabilityIfNeeded(false, false);
677         }
678     }
679 
680     private void doClose0(Promise<Void> promise) {
681         try {
682             cancelConnect();
683             doClose();
684             closePromise.setClosed();
685             safeSetSuccess(promise);
686         } catch (Throwable t) {
687             closePromise.setClosed();
688             safeSetFailure(promise, t);
689         }
690     }
691 
692     private void fireChannelInactiveAndDeregister(final boolean wasActive) {
693         deregister(newPromise(), wasActive && !isActive());
694     }
695 
696     protected final void closeForciblyTransport() {
697         assertEventLoop();
698 
699         try {
700             cancelConnect();
701             doClose();
702         } catch (Exception e) {
703             logger.warn("Failed to close a channel.", e);
704         }
705     }
706 
707     private void cancelConnect() {
708         Promise<Void> promise = connectPromise;
709         if (promise != null) {
710             
711             promise.tryFailure(new ClosedChannelException());
712             connectPromise = null;
713         }
714 
715         Future<?> future = connectTimeoutFuture;
716         if (future != null) {
717             future.cancel();
718             connectTimeoutFuture = null;
719         }
720     }
721 
722     protected final void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
723         assertEventLoop();
724 
725         if (!promise.setUncancellable()) {
726             return;
727         }
728         if (!isActive()) {
729             if (isOpen()) {
730                 promise.setFailure(new NotYetConnectedException());
731             } else {
732                 promise.setFailure(new ClosedChannelException());
733             }
734             return;
735         }
736         if (isShutdown(direction)) {
737             
738             promise.setSuccess(null);
739             return;
740         }
741         boolean fireEvent = false;
742         switch (direction) {
743             case Outbound:
744                 fireEvent = shutdownOutput(promise, null);
745                 break;
746             case Inbound:
747                 try {
748                     doShutdown(direction);
749                     promise.setSuccess(null);
750                     fireEvent = true;
751                 } catch (Throwable cause) {
752                     promise.setFailure(cause);
753                 }
754                 break;
755             default:
756                 
757                 promise.setFailure(new AssertionError());
758                 break;
759         }
760         if (fireEvent) {
761             pipeline().fireChannelShutdown(direction);
762         }
763     }
764 
765     protected void deregisterTransport(final Promise<Void> promise) {
766         assertEventLoop();
767 
768         deregister(promise, false);
769     }
770 
771     private void deregister(final Promise<Void> promise, final boolean fireChannelInactive) {
772         if (!promise.setUncancellable()) {
773             return;
774         }
775 
776         if (!registered) {
777             safeSetSuccess(promise);
778             return;
779         }
780 
781         
782         
783         
784         
785         
786         
787         
788         
789         
790         invokeLater(() -> {
791             try {
792                 eventLoop.deregisterForIo(this).addListener(f -> {
793                     if (f.isFailed()) {
794                         logger.warn("Unexpected exception occurred while deregistering a channel.", f.cause());
795                     }
796                     deregisterDone(fireChannelInactive, promise);
797                 });
798             } catch (Throwable t) {
799                 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
800                 deregisterDone(fireChannelInactive, promise);
801             }
802         });
803     }
804 
805     private void deregisterDone(boolean fireChannelInactive, Promise<Void> promise) {
806         if (fireChannelInactive) {
807             pipeline.fireChannelInactive();
808         }
809         
810         
811         
812         
813         if (registered) {
814             registered = false;
815             pipeline.fireChannelUnregistered();
816 
817             if (!isOpen()) {
818                 
819                 
820                 while (!pipeline.isEmpty()) {
821                     try {
822                         pipeline.removeLast();
823                     } catch (NoSuchElementException ignore) {
824                         
825                         
826                     }
827                 }
828             }
829         }
830         safeSetSuccess(promise);
831     }
832 
833     private void readTransport() {
834         assertEventLoop();
835 
836         if (!isActive()) {
837             readBeforeActive = true;
838             return;
839         }
840 
841         if (isShutdown(ChannelShutdownDirection.Inbound)) {
842             
843             return;
844         }
845         try {
846             doBeginRead();
847         } catch (final Exception e) {
848             invokeLater(() -> pipeline.fireChannelExceptionCaught(e));
849             closeTransport(newPromise());
850         }
851     }
852 
853     private void writeTransport(Object msg, Promise<Void> promise) {
854         assertEventLoop();
855 
856         ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
857         if (outboundBuffer == null) {
858             try {
859                 
860                 Resource.dispose(msg);
861             } finally {
862                 
863                 
864                 
865                 
866                 final Throwable cause;
867                 if (!isActive()) {
868                     cause = newClosedChannelException(initialCloseCause, "write(Object, Promise)");
869                 } else {
870                     cause = ChannelOutputShutdownException.newInstance(AbstractChannel.class,
871                             "writeTransport(Object, Promise)");
872                 }
873                 safeSetFailure(promise, cause);
874             }
875             return;
876         }
877 
878         int size;
879         try {
880             msg = filterOutboundMessage(msg);
881             if (estimatorHandler == null) {
882                 estimatorHandler = getMessageSizeEstimator().newHandle();
883             }
884             size = estimatorHandler.size(msg);
885             if (size < 0) {
886                 size = 0;
887             }
888         } catch (Throwable t) {
889             try {
890                 Resource.dispose(msg);
891             } catch (Throwable inner) {
892                 t.addSuppressed(inner);
893             } finally {
894                 safeSetFailure(promise, t);
895             }
896             return;
897         }
898 
899         outboundBuffer.addMessage(msg, size, promise);
900         updateWritabilityIfNeeded(true, false);
901     }
902 
903     private void flushTransport() {
904         assertEventLoop();
905 
906         ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
907         if (outboundBuffer == null) {
908             return;
909         }
910 
911         outboundBuffer.addFlush();
912         writeFlushed();
913     }
914 
915     
916 
917 
918     protected void writeFlushed() {
919         assertEventLoop();
920 
921         if (inWriteFlushed) {
922             
923             return;
924         }
925 
926         final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
927         if (outboundBuffer == null || outboundBuffer.isEmpty()) {
928             return;
929         }
930 
931         inWriteFlushed = true;
932 
933         
934         if (!isActive()) {
935             try {
936                 
937                 if (!outboundBuffer.isEmpty()) {
938                     if (isOpen()) {
939                         outboundBuffer.failFlushed(new NotYetConnectedException());
940                         updateWritabilityIfNeeded(true, true);
941                     } else {
942                         
943                         outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "writeFlushed()"));
944                     }
945                 }
946             } finally {
947                 inWriteFlushed = false;
948             }
949             return;
950         }
951 
952         try {
953             doWrite(outboundBuffer);
954         } catch (Throwable t) {
955             handleWriteError(t);
956         } finally {
957             
958             
959             updateWritabilityIfNeeded(true, true);
960             inWriteFlushed = false;
961         }
962     }
963 
964     protected final void handleWriteError(Throwable t) {
965         assertEventLoop();
966 
967         if (t instanceof IOException && isAutoClose()) {
968             
969 
970 
971 
972 
973 
974 
975 
976 
977             initialCloseCause = t;
978             close(newPromise(), t, newClosedChannelException(t, "writeFlushed()"));
979         } else {
980             try {
981                 if (shutdownOutput(newPromise(), t)) {
982                     pipeline().fireChannelShutdown(ChannelShutdownDirection.Outbound);
983                 }
984             } catch (Throwable t2) {
985                 initialCloseCause = t;
986                 close(newPromise(), t2, newClosedChannelException(t, "writeFlushed()"));
987             }
988         }
989     }
990 
991     private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
992         ClosedChannelException exception =
993                 StacklessClosedChannelException.newInstance(AbstractChannel.class, method);
994         if (cause != null) {
995             exception.initCause(cause);
996         }
997         return exception;
998     }
999 
1000     private void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1001         Resource.dispose(event);
1002         promise.setSuccess(null);
1003     }
1004 
1005     protected final boolean ensureOpen(Promise<Void> promise) {
1006         if (isOpen()) {
1007             return true;
1008         }
1009 
1010         safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(Promise)"));
1011         return false;
1012     }
1013 
1014     
1015 
1016 
1017     protected final void safeSetSuccess(Promise<Void> promise) {
1018         if (!promise.trySuccess(null)) {
1019             logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
1020         }
1021     }
1022 
1023     
1024 
1025 
1026     protected final void safeSetFailure(Promise<Void> promise, Throwable cause) {
1027         if (!promise.tryFailure(cause)) {
1028             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1029         }
1030     }
1031 
1032     protected final void closeIfClosed() {
1033         assertEventLoop();
1034 
1035         if (isOpen()) {
1036             return;
1037         }
1038         closeTransport(newPromise());
1039     }
1040 
1041     private void invokeLater(Runnable task) {
1042         try {
1043             
1044             
1045             
1046             
1047             
1048             
1049             
1050             
1051             
1052             
1053             
1054             executor().execute(task);
1055         } catch (RejectedExecutionException e) {
1056             logger.warn("Can't invoke task later as EventLoop rejected it", e);
1057         }
1058     }
1059 
1060     
1061 
1062 
1063     protected static Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1064         if (cause instanceof ConnectException) {
1065             return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1066         }
1067         if (cause instanceof NoRouteToHostException) {
1068             return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1069         }
1070         if (cause instanceof SocketException) {
1071             return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1072         }
1073 
1074         return cause;
1075     }
1076 
1077     
1078 
1079 
1080 
1081 
1082 
1083     protected Future<Executor> prepareToClose() {
1084         return null;
1085     }
1086 
1087     
1088 
1089 
1090 
1091 
1092 
1093     protected final ChannelOutboundBuffer outboundBuffer() {
1094         return outboundBuffer;
1095     }
1096 
1097     
1098 
1099 
1100     protected abstract L localAddress0();
1101 
1102     
1103 
1104 
1105     protected abstract R remoteAddress0();
1106 
1107     
1108 
1109 
1110     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1111 
1112     
1113 
1114 
1115     protected abstract void doDisconnect() throws Exception;
1116 
1117     
1118 
1119 
1120     protected abstract void doClose() throws Exception;
1121 
1122     
1123 
1124 
1125 
1126 
1127 
1128     protected abstract void doShutdown(ChannelShutdownDirection direction) throws Exception;
1129 
1130     
1131 
1132 
1133     protected abstract void doBeginRead() throws Exception;
1134 
1135     
1136 
1137 
1138     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1139 
1140     
1141 
1142 
1143 
1144 
1145 
1146 
1147 
1148 
1149     protected abstract boolean doConnect(
1150             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
1151 
1152     
1153 
1154 
1155 
1156 
1157 
1158 
1159 
1160     protected abstract boolean doFinishConnect(R requestedRemoteAddress) throws Exception;
1161 
1162     
1163 
1164 
1165 
1166 
1167     protected final boolean isConnectPending() {
1168         assertEventLoop();
1169         return connectPromise != null;
1170     }
1171 
1172     @SuppressWarnings("unchecked")
1173     private void connectTransport(
1174             SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1175         assertEventLoop();
1176         if (!promise.setUncancellable() || !ensureOpen(promise)) {
1177             return;
1178         }
1179 
1180         try {
1181             if (connectPromise != null) {
1182                 throw new ConnectionPendingException();
1183             }
1184 
1185             if (remoteAddress() != null) {
1186                 
1187                 throw new AlreadyConnectedException();
1188             }
1189 
1190             boolean wasActive = isActive();
1191             if (doConnect(remoteAddress, localAddress)) {
1192                 fulfillConnectPromise(promise, wasActive);
1193             } else {
1194                 connectPromise = promise;
1195                 requestedRemoteAddress = (R) remoteAddress;
1196 
1197                 
1198                 int connectTimeoutMillis = getConnectTimeoutMillis();
1199                 if (connectTimeoutMillis > 0) {
1200                     connectTimeoutFuture = executor().schedule(() -> {
1201                         Promise<Void> connectPromise = this.connectPromise;
1202                         if (connectPromise != null && !connectPromise.isDone()
1203                                 && connectPromise.tryFailure(new ConnectTimeoutException(
1204                                 "connection timed out: " + remoteAddress))) {
1205                             closeTransport(newPromise());
1206                         }
1207                     }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1208                 }
1209 
1210                 promise.asFuture().addListener(future -> {
1211                     if (future.isCancelled()) {
1212                         if (connectTimeoutFuture != null) {
1213                             connectTimeoutFuture.cancel();
1214                         }
1215                         connectPromise = null;
1216                         closeTransport(newPromise());
1217                     }
1218                 });
1219             }
1220         } catch (Throwable t) {
1221             closeIfClosed();
1222             promise.tryFailure(annotateConnectException(t, remoteAddress));
1223         }
1224     }
1225 
1226     private void fulfillConnectPromise(Promise<Void> promise, boolean wasActive) {
1227         if (promise == null) {
1228             
1229             return;
1230         }
1231 
1232         
1233         
1234         boolean active = isActive();
1235 
1236         
1237         boolean promiseSet = promise.trySuccess(null);
1238 
1239         
1240         
1241         if (!wasActive && active) {
1242             if (fireChannelActiveIfNotActiveBefore()) {
1243                 readIfIsAutoRead();
1244             }
1245         }
1246 
1247         
1248         if (!promiseSet) {
1249             closeTransport(newPromise());
1250         }
1251     }
1252 
1253     private void fulfillConnectPromise(Promise<Void> promise, Throwable cause) {
1254         if (promise == null) {
1255             
1256             return;
1257         }
1258 
1259         
1260         promise.tryFailure(cause);
1261         closeIfClosed();
1262     }
1263 
1264     
1265 
1266 
1267     protected final void finishConnect() {
1268         
1269         
1270         assertEventLoop();
1271 
1272         boolean connectStillInProgress = false;
1273         try {
1274             boolean wasActive = isActive();
1275             if (!doFinishConnect(requestedRemoteAddress)) {
1276                 connectStillInProgress = true;
1277                 return;
1278             }
1279             requestedRemoteAddress = null;
1280             fulfillConnectPromise(connectPromise, wasActive);
1281         } catch (Throwable t) {
1282             fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
1283         } finally {
1284             if (!connectStillInProgress) {
1285                 
1286                 
1287                 if (connectTimeoutFuture != null) {
1288                     connectTimeoutFuture.cancel();
1289                 }
1290                 connectPromise = null;
1291             }
1292         }
1293     }
1294 
1295     
1296 
1297 
1298 
1299     protected Object filterOutboundMessage(Object msg) throws Exception {
1300         return msg;
1301     }
1302 
1303     protected static void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1304         DefaultFileRegion.validate(region, position);
1305     }
1306 
1307     @Override
1308     @SuppressWarnings({ "unchecked", "deprecation" })
1309     public final <T> T getOption(ChannelOption<T> option) {
1310         requireNonNull(option, "option");
1311         if (option == AUTO_READ) {
1312             return (T) Boolean.valueOf(isAutoRead());
1313         }
1314         if (option == WRITE_BUFFER_WATER_MARK) {
1315             return (T) getWriteBufferWaterMark();
1316         }
1317         if (option == CONNECT_TIMEOUT_MILLIS) {
1318             return (T) Integer.valueOf(getConnectTimeoutMillis());
1319         }
1320         if (option == MAX_MESSAGES_PER_READ) {
1321             return (T) Integer.valueOf(getMaxMessagesPerRead());
1322         }
1323         if (option == WRITE_SPIN_COUNT) {
1324             return (T) Integer.valueOf(getWriteSpinCount());
1325         }
1326         if (option == BUFFER_ALLOCATOR) {
1327             return (T) getBufferAllocator();
1328         }
1329         if (option == RCVBUFFER_ALLOCATOR) {
1330             return getRecvBufferAllocator();
1331         }
1332         if (option == AUTO_CLOSE) {
1333             return (T) Boolean.valueOf(isAutoClose());
1334         }
1335         if (option == MESSAGE_SIZE_ESTIMATOR) {
1336             return (T) getMessageSizeEstimator();
1337         }
1338         if (option == MAX_MESSAGES_PER_WRITE) {
1339             return (T) Integer.valueOf(getMaxMessagesPerWrite());
1340         }
1341         if (option == ALLOW_HALF_CLOSURE) {
1342             return (T) Boolean.valueOf(isAllowHalfClosure());
1343         }
1344 
1345         return getExtendedOption(option);
1346     }
1347 
1348     
1349 
1350 
1351 
1352 
1353 
1354 
1355 
1356 
1357     protected  <T> T getExtendedOption(ChannelOption<T> option) {
1358         throw new UnsupportedOperationException("ChannelOption not supported: " + option);
1359     }
1360 
1361     @Override
1362     @SuppressWarnings("deprecation")
1363     public final <T> Channel setOption(ChannelOption<T> option, T value) {
1364         validate(option, value);
1365 
1366         if (option == AUTO_READ) {
1367             setAutoRead((Boolean) value);
1368         } else if (option == WRITE_BUFFER_WATER_MARK) {
1369             setWriteBufferWaterMark((WriteBufferWaterMark) value);
1370         } else if (option == CONNECT_TIMEOUT_MILLIS) {
1371             setConnectTimeoutMillis((Integer) value);
1372         } else if (option == MAX_MESSAGES_PER_READ) {
1373             setMaxMessagesPerRead((Integer) value);
1374         } else if (option == WRITE_SPIN_COUNT) {
1375             setWriteSpinCount((Integer) value);
1376         } else if (option == BUFFER_ALLOCATOR) {
1377             setBufferAllocator((BufferAllocator) value);
1378         } else if (option == RCVBUFFER_ALLOCATOR) {
1379             setRecvBufferAllocator((RecvBufferAllocator) value);
1380         } else if (option == AUTO_CLOSE) {
1381             setAutoClose((Boolean) value);
1382         } else if (option == MESSAGE_SIZE_ESTIMATOR) {
1383             setMessageSizeEstimator((MessageSizeEstimator) value);
1384         } else if (option == MAX_MESSAGES_PER_WRITE) {
1385             setMaxMessagesPerWrite((Integer) value);
1386         } else if (option == ALLOW_HALF_CLOSURE) {
1387             setAllowHalfClosure((Boolean) value);
1388         } else {
1389             setExtendedOption(option, value);
1390         }
1391 
1392         return this;
1393     }
1394 
1395     
1396 
1397 
1398 
1399 
1400 
1401 
1402 
1403     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
1404         throw new UnsupportedOperationException("ChannelOption not supported: " + option);
1405     }
1406 
1407     @Override
1408     public final boolean isOptionSupported(ChannelOption<?> option) {
1409         if (SUPPORTED_CHANNEL_OPTIONS.contains(option)) {
1410             return true;
1411         }
1412         return isExtendedOptionSupported(option);
1413     }
1414 
1415     
1416 
1417 
1418 
1419 
1420 
1421 
1422     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
1423         return false;
1424     }
1425 
1426     private static Set<ChannelOption<?>> supportedOptions() {
1427         return newSupportedIdentityOptionsSet(
1428                 AUTO_READ, WRITE_BUFFER_WATER_MARK, CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ,
1429                 WRITE_SPIN_COUNT, BUFFER_ALLOCATOR, RCVBUFFER_ALLOCATOR, AUTO_CLOSE, MESSAGE_SIZE_ESTIMATOR,
1430                 MAX_MESSAGES_PER_WRITE, ALLOW_HALF_CLOSURE);
1431     }
1432 
1433     protected static Set<ChannelOption<?>> newSupportedIdentityOptionsSet(ChannelOption<?>... options) {
1434         Set<ChannelOption<?>> supportedOptionsSet = Collections.newSetFromMap(new IdentityHashMap<>());
1435         Collections.addAll(supportedOptionsSet, options);
1436         return Collections.unmodifiableSet(supportedOptionsSet);
1437     }
1438 
1439     protected <T> void validate(ChannelOption<T> option, T value) {
1440         requireNonNull(option, "option");
1441         option.validate(value);
1442     }
1443 
1444     private int getConnectTimeoutMillis() {
1445         return connectTimeoutMillis;
1446     }
1447 
1448     private void setConnectTimeoutMillis(int connectTimeoutMillis) {
1449         checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
1450         this.connectTimeoutMillis = connectTimeoutMillis;
1451     }
1452 
1453     
1454 
1455 
1456 
1457 
1458     @Deprecated
1459     private int getMaxMessagesPerRead() {
1460         try {
1461             MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1462             return allocator.maxMessagesPerRead();
1463         } catch (ClassCastException e) {
1464             throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1465                     "MaxMessagesRecvBufferAllocator", e);
1466         }
1467     }
1468 
1469     
1470 
1471 
1472 
1473 
1474     @Deprecated
1475     private void setMaxMessagesPerRead(int maxMessagesPerRead) {
1476         try {
1477             MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1478             allocator.maxMessagesPerRead(maxMessagesPerRead);
1479         } catch (ClassCastException e) {
1480             throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1481                     "MaxMessagesRecvBufferAllocator", e);
1482         }
1483     }
1484 
1485     
1486 
1487 
1488 
1489     protected final int getMaxMessagesPerWrite() {
1490         return maxMessagesPerWrite;
1491     }
1492 
1493     
1494 
1495 
1496 
1497     private void setMaxMessagesPerWrite(int maxMessagesPerWrite) {
1498         this.maxMessagesPerWrite = checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
1499     }
1500 
1501     protected final int getWriteSpinCount() {
1502         return writeSpinCount;
1503     }
1504 
1505     private void setWriteSpinCount(int writeSpinCount) {
1506         checkPositive(writeSpinCount, "writeSpinCount");
1507         
1508         
1509         
1510         
1511         if (writeSpinCount == Integer.MAX_VALUE) {
1512             --writeSpinCount;
1513         }
1514         this.writeSpinCount = writeSpinCount;
1515     }
1516 
1517     private BufferAllocator getBufferAllocator() {
1518         return bufferAllocator;
1519     }
1520 
1521     public void setBufferAllocator(BufferAllocator bufferAllocator) {
1522         requireNonNull(bufferAllocator, "bufferAllocator");
1523         this.bufferAllocator = bufferAllocator;
1524     }
1525 
1526     @SuppressWarnings("unchecked")
1527     private <T extends RecvBufferAllocator> T getRecvBufferAllocator() {
1528         return (T) rcvBufAllocator;
1529     }
1530 
1531     private void setRecvBufferAllocator(RecvBufferAllocator allocator) {
1532         rcvBufAllocator = requireNonNull(allocator, "allocator");
1533     }
1534 
1535     protected final boolean isAutoRead() {
1536         return autoRead == 1;
1537     }
1538 
1539     private void setAutoRead(boolean autoRead) {
1540         boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
1541         if (autoRead && !oldAutoRead) {
1542             read();
1543         } else if (!autoRead && oldAutoRead) {
1544             autoReadCleared();
1545         }
1546     }
1547 
1548     
1549 
1550 
1551 
1552     protected void autoReadCleared() { }
1553 
1554     private boolean isAutoClose() {
1555         return autoClose;
1556     }
1557 
1558     private void setAutoClose(boolean autoClose) {
1559         this.autoClose = autoClose;
1560     }
1561 
1562     private void setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1563         this.writeBufferWaterMark = requireNonNull(writeBufferWaterMark, "writeBufferWaterMark");
1564     }
1565 
1566     private WriteBufferWaterMark getWriteBufferWaterMark() {
1567         return writeBufferWaterMark;
1568     }
1569 
1570     private MessageSizeEstimator getMessageSizeEstimator() {
1571         return msgSizeEstimator;
1572     }
1573 
1574     private void setMessageSizeEstimator(MessageSizeEstimator estimator) {
1575         requireNonNull(estimator, "estimator");
1576         msgSizeEstimator = estimator;
1577     }
1578 
1579     protected final boolean isAllowHalfClosure() {
1580         return allowHalfClosure;
1581     }
1582 
1583     private void setAllowHalfClosure(boolean allowHalfClosure) {
1584         this.allowHalfClosure = allowHalfClosure;
1585     }
1586 
1587     private static final class ClosePromise extends DefaultPromise<Void> {
1588 
1589         ClosePromise(EventExecutor eventExecutor) {
1590             super(eventExecutor);
1591         }
1592 
1593         @Override
1594         public Promise<Void> setSuccess(Void result) {
1595             throw new IllegalStateException();
1596         }
1597 
1598         @Override
1599         public Promise<Void> setFailure(Throwable cause) {
1600             throw new IllegalStateException();
1601         }
1602 
1603         @Override
1604         public boolean trySuccess(Void result) {
1605             throw new IllegalStateException();
1606         }
1607 
1608         @Override
1609         public boolean tryFailure(Throwable cause) {
1610             throw new IllegalStateException();
1611         }
1612 
1613         @Override
1614         public boolean setUncancellable() {
1615             return false;
1616         }
1617 
1618         void setClosed() {
1619             super.trySuccess(null);
1620         }
1621     }
1622 
1623     private static final class AnnotatedConnectException extends ConnectException {
1624 
1625         private static final long serialVersionUID = 3901958112696433556L;
1626 
1627         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1628             super(exception.getMessage() + ": " + remoteAddress);
1629             initCause(exception);
1630         }
1631 
1632         
1633         @Override
1634         public Throwable fillInStackTrace() {   
1635             return this;
1636         }
1637     }
1638 
1639     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1640 
1641         private static final long serialVersionUID = -6801433937592080623L;
1642 
1643         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1644             super(exception.getMessage() + ": " + remoteAddress);
1645             initCause(exception);
1646         }
1647 
1648         
1649         @Override
1650         public Throwable fillInStackTrace() {   
1651             return this;
1652         }
1653     }
1654 
1655     private static final class AnnotatedSocketException extends SocketException {
1656 
1657         private static final long serialVersionUID = 3896743275010454039L;
1658 
1659         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1660             super(exception.getMessage() + ": " + remoteAddress);
1661             initCause(exception);
1662         }
1663 
1664         
1665         @Override
1666         public Throwable fillInStackTrace() {   
1667             return this;
1668         }
1669     }
1670 
1671     protected void runAfterTransportAction() {
1672         
1673     }
1674 
1675     protected static class DefaultAbstractChannelPipeline extends DefaultChannelPipeline {
1676         protected DefaultAbstractChannelPipeline(AbstractChannel<?, ?, ?> channel) {
1677             super(channel);
1678         }
1679 
1680         protected final AbstractChannel<?, ?, ?> abstractChannel() {
1681             return (AbstractChannel<?, ?, ?>) channel();
1682         }
1683 
1684         @Override
1685         protected final EventExecutor transportExecutor() {
1686             return abstractChannel().executor();
1687         }
1688 
1689         @Override
1690         protected final void pendingOutboundBytesUpdated(long pendingOutboundBytes) {
1691             abstractChannel().updateWritabilityIfNeeded(true, false);
1692         }
1693 
1694         @Override
1695         protected final void registerTransport(Promise<Void> promise) {
1696             AbstractChannel<?, ?, ?> channel = abstractChannel();
1697             channel.registerTransport(promise);
1698             channel.runAfterTransportAction();
1699         }
1700 
1701         @Override
1702         protected final void bindTransport(SocketAddress localAddress, Promise<Void> promise) {
1703             AbstractChannel<?, ?, ?> channel = abstractChannel();
1704             channel.bindTransport(localAddress, promise);
1705             channel.runAfterTransportAction();
1706         }
1707 
1708         @Override
1709         protected final void connectTransport(
1710                 SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1711             AbstractChannel<?, ?, ?> channel = abstractChannel();
1712             channel.connectTransport(remoteAddress, localAddress, promise);
1713             channel.runAfterTransportAction();
1714         }
1715 
1716         @Override
1717         protected final void disconnectTransport(Promise<Void> promise) {
1718             AbstractChannel<?, ?, ?> channel = abstractChannel();
1719             channel.disconnectTransport(promise);
1720             channel.runAfterTransportAction();
1721         }
1722 
1723         @Override
1724         protected final void closeTransport(Promise<Void> promise) {
1725             AbstractChannel<?, ?, ?> channel = abstractChannel();
1726             abstractChannel().closeTransport(promise);
1727             channel.runAfterTransportAction();
1728         }
1729 
1730         @Override
1731         protected final void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
1732             AbstractChannel<?, ?, ?> channel = abstractChannel();
1733             channel.shutdownTransport(direction, promise);
1734             channel.runAfterTransportAction();
1735         }
1736 
1737         @Override
1738         protected final void deregisterTransport(Promise<Void> promise) {
1739             AbstractChannel<?, ?, ?> channel = abstractChannel();
1740             channel.deregisterTransport(promise);
1741             channel.runAfterTransportAction();
1742         }
1743 
1744         @Override
1745         protected final void readTransport() {
1746             AbstractChannel<?, ?, ?> channel = abstractChannel();
1747             channel.readTransport();
1748             channel.runAfterTransportAction();
1749         }
1750 
1751         @Override
1752         protected final void writeTransport(Object msg, Promise<Void> promise) {
1753             AbstractChannel<?, ?, ?> channel = abstractChannel();
1754             channel.writeTransport(msg, promise);
1755             channel.runAfterTransportAction();
1756         }
1757 
1758         @Override
1759         protected final void flushTransport() {
1760             AbstractChannel<?, ?, ?> channel = abstractChannel();
1761             channel.flushTransport();
1762             channel.runAfterTransportAction();
1763         }
1764 
1765         @Override
1766         protected final void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1767             AbstractChannel<?, ?, ?> channel = abstractChannel();
1768             channel.sendOutboundEventTransport(event, promise);
1769             channel.runAfterTransportAction();
1770         }
1771     }
1772 }