View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.socket.ChannelOutputShutdownEvent;
19  import io.netty.channel.socket.ChannelOutputShutdownException;
20  import io.netty.util.DefaultAttributeMap;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.io.IOException;
28  import java.net.ConnectException;
29  import java.net.InetSocketAddress;
30  import java.net.NoRouteToHostException;
31  import java.net.SocketAddress;
32  import java.net.SocketException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.NotYetConnectedException;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.RejectedExecutionException;
37  
38  /**
39   * A skeletal {@link Channel} implementation.
40   */
41  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
42  
    /** Logger used by this class for warnings raised while registering, closing and deregistering. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    /** Parent handed to the constructor; {@code null} if there's no parent. */
    private final Channel parent;
    /** Identifier of this channel; {@link #hashCode()} and {@link #compareTo(Channel)} are derived from it. */
    private final ChannelId id;
    /** Created once in the constructor through {@link #newUnsafe()}. */
    private final Unsafe unsafe;
    /** Created once in the constructor through {@link #newChannelPipeline()}. */
    private final DefaultChannelPipeline pipeline;
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    /** Future returned by {@link #closeFuture()}. */
    private final CloseFuture closeFuture = new CloseFuture(this);

    /** Lazily cached by {@link #localAddress()}; reset to {@code null} when the channel is disconnected. */
    private volatile SocketAddress localAddress;
    /** Lazily cached by {@link #remoteAddress()}; reset to {@code null} when the channel is disconnected. */
    private volatile SocketAddress remoteAddress;
    /** Assigned when the channel is registered; {@link #eventLoop()} throws while this is still {@code null}. */
    private volatile EventLoop eventLoop;
    /** Flipped to {@code true} on successful registration and back to {@code false} on deregistration. */
    private volatile boolean registered;
    /** Set the first time a close is processed; subsequent close requests only wait on {@link #closeFuture}. */
    private boolean closeInitiated;
    /** Write failure recorded by handleWriteError(...); used as the cause when later writes/flushes are failed. */
    private Throwable initialCloseCause;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
62  
63      /**
64       * Creates a new instance.
65       *
66       * @param parent
67       *        the parent of this channel. {@code null} if there's no parent.
68       */
69      protected AbstractChannel(Channel parent) {
70          this.parent = parent;
71          id = newId();
72          unsafe = newUnsafe();
73          pipeline = newChannelPipeline();
74      }
75  
76      /**
77       * Creates a new instance.
78       *
79       * @param parent
80       *        the parent of this channel. {@code null} if there's no parent.
81       */
82      protected AbstractChannel(Channel parent, ChannelId id) {
83          this.parent = parent;
84          this.id = id;
85          unsafe = newUnsafe();
86          pipeline = newChannelPipeline();
87      }
88  
89      protected final int maxMessagesPerWrite() {
90          ChannelConfig config = config();
91          if (config instanceof DefaultChannelConfig) {
92              return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
93          }
94          Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
95          if (value == null) {
96              return Integer.MAX_VALUE;
97          }
98          return value;
99      }
100 
101     @Override
102     public final ChannelId id() {
103         return id;
104     }
105 
106     /**
107      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
108      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
109      */
110     protected ChannelId newId() {
111         return DefaultChannelId.newInstance();
112     }
113 
114     /**
115      * Returns a new {@link DefaultChannelPipeline} instance.
116      */
117     protected DefaultChannelPipeline newChannelPipeline() {
118         return new DefaultChannelPipeline(this);
119     }
120 
121     @Override
122     public Channel parent() {
123         return parent;
124     }
125 
126     @Override
127     public ChannelPipeline pipeline() {
128         return pipeline;
129     }
130 
131     @Override
132     public EventLoop eventLoop() {
133         EventLoop eventLoop = this.eventLoop;
134         if (eventLoop == null) {
135             throw new IllegalStateException("channel not registered to an event loop");
136         }
137         return eventLoop;
138     }
139 
140     @Override
141     public SocketAddress localAddress() {
142         SocketAddress localAddress = this.localAddress;
143         if (localAddress == null) {
144             try {
145                 this.localAddress = localAddress = unsafe().localAddress();
146             } catch (Error e) {
147                 throw e;
148             } catch (Throwable t) {
149                 // Sometimes fails on a closed socket in Windows.
150                 return null;
151             }
152         }
153         return localAddress;
154     }
155 
156     /**
157      * @deprecated no use-case for this.
158      */
159     @Deprecated
160     protected void invalidateLocalAddress() {
161         localAddress = null;
162     }
163 
164     @Override
165     public SocketAddress remoteAddress() {
166         SocketAddress remoteAddress = this.remoteAddress;
167         if (remoteAddress == null) {
168             try {
169                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
170             } catch (Error e) {
171                 throw e;
172             } catch (Throwable t) {
173                 // Sometimes fails on a closed socket in Windows.
174                 return null;
175             }
176         }
177         return remoteAddress;
178     }
179 
180     /**
181      * @deprecated no use-case for this.
182      */
183     @Deprecated
184     protected void invalidateRemoteAddress() {
185         remoteAddress = null;
186     }
187 
188     @Override
189     public boolean isRegistered() {
190         return registered;
191     }
192 
193     @Override
194     public ChannelFuture closeFuture() {
195         return closeFuture;
196     }
197 
198     @Override
199     public Unsafe unsafe() {
200         return unsafe;
201     }
202 
    /**
     * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}.
     * <p>
     * This method is called from the constructors of {@link AbstractChannel}, i.e. before the constructor body
     * of the implementing subclass has run.
     */
    protected abstract AbstractUnsafe newUnsafe();
207 
208     /**
209      * Returns the ID of this channel.
210      */
211     @Override
212     public final int hashCode() {
213         return id.hashCode();
214     }
215 
216     /**
217      * Returns {@code true} if and only if the specified object is identical
218      * with this channel (i.e: {@code this == o}).
219      */
220     @Override
221     public final boolean equals(Object o) {
222         return this == o;
223     }
224 
225     @Override
226     public final int compareTo(Channel o) {
227         if (this == o) {
228             return 0;
229         }
230 
231         return id().compareTo(o.id());
232     }
233 
234     /**
235      * Returns the {@link String} representation of this channel.  The returned
236      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
237      * and {@linkplain #remoteAddress() remote address} of this channel for
238      * easier identification.
239      */
240     @Override
241     public String toString() {
242         boolean active = isActive();
243         if (strValActive == active && strVal != null) {
244             return strVal;
245         }
246 
247         SocketAddress remoteAddr = remoteAddress();
248         SocketAddress localAddr = localAddress();
249         if (remoteAddr != null) {
250             StringBuilder buf = new StringBuilder(96)
251                 .append("[id: 0x")
252                 .append(id.asShortText())
253                 .append(", L:")
254                 .append(localAddr)
255                 .append(active? " - " : " ! ")
256                 .append("R:")
257                 .append(remoteAddr)
258                 .append(']');
259             strVal = buf.toString();
260         } else if (localAddr != null) {
261             StringBuilder buf = new StringBuilder(64)
262                 .append("[id: 0x")
263                 .append(id.asShortText())
264                 .append(", L:")
265                 .append(localAddr)
266                 .append(']');
267             strVal = buf.toString();
268         } else {
269             StringBuilder buf = new StringBuilder(16)
270                 .append("[id: 0x")
271                 .append(id.asShortText())
272                 .append(']');
273             strVal = buf.toString();
274         }
275 
276         strValActive = active;
277         return strVal;
278     }
279 
280     @Override
281     public final ChannelPromise voidPromise() {
282         return pipeline.voidPromise();
283     }
284 
285     /**
286      * {@link Unsafe} implementation which sub-classes must extend and use.
287      */
288     protected abstract class AbstractUnsafe implements Unsafe {
289 
        // Pending outbound messages. close(...) and shutdownOutput(...) set this to null so that later write(...)
        // calls fail immediately and later flush() calls become no-ops.
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        // Created lazily by recvBufAllocHandle().
        private RecvByteBufAllocator.Handle recvHandle;
        // True while flush0() is running; checked there to avoid re-entrance and by close(...) to defer events.
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true;
295 
        /**
         * Assertion-only check that a registered channel is being accessed from its event loop. Nothing is
         * checked while the channel is not registered; the short-circuit also means {@code eventLoop} is only
         * dereferenced once {@code registered} is {@code true}.
         */
        private void assertEventLoop() {
            assert !registered || eventLoop.inEventLoop();
        }
299 
300         @Override
301         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
302             if (recvHandle == null) {
303                 recvHandle = config().getRecvByteBufAllocator().newHandle();
304             }
305             return recvHandle;
306         }
307 
308         @Override
309         public final ChannelOutboundBuffer outboundBuffer() {
310             return outboundBuffer;
311         }
312 
313         @Override
314         public final SocketAddress localAddress() {
315             return localAddress0();
316         }
317 
318         @Override
319         public final SocketAddress remoteAddress() {
320             return remoteAddress0();
321         }
322 
323         @Override
324         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
325             ObjectUtil.checkNotNull(eventLoop, "eventLoop");
326             if (isRegistered()) {
327                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
328                 return;
329             }
330             if (!isCompatible(eventLoop)) {
331                 promise.setFailure(
332                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
333                 return;
334             }
335 
336             AbstractChannel.this.eventLoop = eventLoop;
337 
338             // Clear any cached executors from prior event loop registrations.
339             AbstractChannelHandlerContext context = pipeline.tail;
340             do {
341                 context.contextExecutor = null;
342                 context = context.prev;
343             } while (context != null);
344 
345             if (eventLoop.inEventLoop()) {
346                 register0(promise);
347             } else {
348                 try {
349                     eventLoop.execute(new Runnable() {
350                         @Override
351                         public void run() {
352                             register0(promise);
353                         }
354                     });
355                 } catch (Throwable t) {
356                     logger.warn(
357                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
358                             AbstractChannel.this, t);
359                     closeForcibly();
360                     closeFuture.setClosed();
361                     safeSetFailure(promise, t);
362                 }
363             }
364         }
365 
        /**
         * Performs the actual registration. Hands an internal promise to {@code doRegister(...)} and reacts to
         * its completion:
         * on success the channel is marked registered, pending handlerAdded callbacks run, the caller's promise
         * is completed and channelRegistered (plus channelActive or a read, if the channel is active) is fired;
         * on failure the channel is closed forcibly and the caller's promise is failed.
         *
         * @param promise the promise handed to {@code register(...)}; nothing happens if it cannot be made
         *                uncancellable or {@code ensureOpen(promise)} returns {@code false}
         */
        private void register0(ChannelPromise promise) {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            ChannelPromise registerPromise = newPromise();
            // Capture the flag now; the listener below resets neverRegistered before it decides what to fire.
            boolean firstRegistration = neverRegistered;
            registerPromise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        neverRegistered = false;
                        registered = true;

                        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                        // user may already fire events through the pipeline in the ChannelFutureListener.
                        pipeline.invokeHandlerAddedIfNeeded();

                        safeSetSuccess(promise);
                        pipeline.fireChannelRegistered();
                        // Only fire a channelActive if the channel has never been registered. This prevents firing
                        // multiple channel actives if the channel is deregistered and re-registered.
                        if (isActive()) {
                            if (firstRegistration) {
                                pipeline.fireChannelActive();
                            } else if (config().isAutoRead()) {
                                // This channel was registered before and autoRead() is set. This means we need to
                                // begin read again so that we process inbound data.
                                //
                                // See https://github.com/netty/netty/issues/4805
                                beginRead();
                            }
                        }
                    } else {
                        // Close the channel directly to avoid FD leak.
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, future.cause());
                    }
                }
            });
            doRegister(registerPromise);
        }
410 
411         @Override
412         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
413             assertEventLoop();
414 
415             if (!promise.setUncancellable() || !ensureOpen(promise)) {
416                 return;
417             }
418 
419             // See: https://github.com/netty/netty/issues/576
420             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
421                 localAddress instanceof InetSocketAddress &&
422                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
423                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
424                 // Warn a user about the fact that a non-root user can't receive a
425                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
426                 logger.warn(
427                         "A non-root user can't receive a broadcast packet if the socket " +
428                         "is not bound to a wildcard address; binding to a non-wildcard " +
429                         "address (" + localAddress + ") anyway as requested.");
430             }
431 
432             boolean wasActive = isActive();
433             try {
434                 doBind(localAddress);
435             } catch (Throwable t) {
436                 safeSetFailure(promise, t);
437                 closeIfClosed();
438                 return;
439             }
440 
441             if (!wasActive && isActive()) {
442                 invokeLater(new Runnable() {
443                     @Override
444                     public void run() {
445                         pipeline.fireChannelActive();
446                     }
447                 });
448             }
449 
450             safeSetSuccess(promise);
451         }
452 
453         @Override
454         public final void disconnect(final ChannelPromise promise) {
455             assertEventLoop();
456 
457             if (!promise.setUncancellable()) {
458                 return;
459             }
460 
461             boolean wasActive = isActive();
462             try {
463                 doDisconnect();
464                 // Reset remoteAddress and localAddress
465                 remoteAddress = null;
466                 localAddress = null;
467             } catch (Throwable t) {
468                 safeSetFailure(promise, t);
469                 closeIfClosed();
470                 return;
471             }
472 
473             if (wasActive && !isActive()) {
474                 invokeLater(new Runnable() {
475                     @Override
476                     public void run() {
477                         pipeline.fireChannelInactive();
478                     }
479                 });
480             }
481 
482             safeSetSuccess(promise);
483             closeIfClosed(); // doDisconnect() might have closed the channel
484         }
485 
486         @Override
487         public void close(final ChannelPromise promise) {
488             assertEventLoop();
489 
490             ClosedChannelException closedChannelException =
491                     StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
492             close(promise, closedChannelException, closedChannelException);
493         }
494 
495         /**
496          * Shutdown the output portion of the corresponding {@link Channel}.
497          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
498          */
499         public final void shutdownOutput(final ChannelPromise promise) {
500             assertEventLoop();
501             shutdownOutput(promise, null);
502         }
503 
        /**
         * Shutdown the output portion of the corresponding {@link Channel}.
         * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
         * <p>
         * The promise is failed with a {@link ClosedChannelException} if the outbound buffer is already gone.
         * Otherwise the outbound buffer is detached, {@code doShutdownOutput()} is invoked, and - whether or not
         * that call succeeds - the detached buffer is failed and closed and a
         * {@link ChannelOutputShutdownEvent} is fired.
         *
         * @param promise notified about the outcome; nothing happens if it cannot be made uncancellable
         * @param cause The cause which may provide rationale for the shutdown; may be {@code null}.
         */
        private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
            if (!promise.setUncancellable()) {
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                promise.setFailure(new ClosedChannelException());
                return;
            }
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.

            final Throwable shutdownCause = cause == null ?
                    new ChannelOutputShutdownException("Channel output shutdown") :
                    new ChannelOutputShutdownException("Channel output shutdown", cause);

            // When a side enables SO_LINGER and calls shutdownOutput(...) to start TCP half-closure
            // we can not call doDeregister here because we should ensure this side in fin_wait2 state
            // can still receive and process the data which is sent by another side in the close_wait state.
            // See https://github.com/netty/netty/issues/11981
            try {
                // The shutdown function does not block regardless of the SO_LINGER setting on the socket
                // so we don't need to use GlobalEventExecutor to execute the shutdown
                doShutdownOutput();
                promise.setSuccess();
            } catch (Throwable err) {
                promise.setFailure(err);
            } finally {
                // Runs after the promise has been completed, on both the success and the failure path.
                closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
            }
        }
540 
541         private void closeOutboundBufferForShutdown(
542                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
543             buffer.failFlushed(cause, false);
544             buffer.close(cause, true);
545             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
546         }
547 
        /**
         * Closes the channel, fails everything still queued in the outbound buffer and finally deregisters.
         * <p>
         * Only the first call performs the close. Later calls complete the promise immediately if the close
         * future is already done, or otherwise complete it once the close future completes (nothing is attached
         * for a {@link VoidChannelPromise}).
         * <p>
         * If {@code prepareToClose()} returns an executor, the transport close runs on that executor and the
         * remaining work is scheduled through {@code invokeLater(...)}; otherwise everything happens inline,
         * except that the inactive/deregister step is deferred while a flush is in progress.
         *
         * @param promise    notified about the outcome; nothing happens if it cannot be made uncancellable
         * @param cause      the cause used to fail messages that were already flushed
         * @param closeCause the cause used to close the outbound buffer
         */
        protected void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (closeInitiated) {
                if (closeFuture.isDone()) {
                    // Closed already.
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                    // This means close() was called before so we just register a listener and return
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            closeInitiated = true;

            // Sample the active state before closing so the inactive event is only fired on a real transition.
            final boolean wasActive = isActive();
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // Fail all the queued messages
                                        outboundBuffer.failFlushed(cause, false);
                                        outboundBuffer.close(closeCause);
                                    }
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                        // Fail all the queued messages.
                        outboundBuffer.failFlushed(cause, false);
                        outboundBuffer.close(closeCause);
                    }
                }
                // While flush0() is still on the stack, defer the inactive/deregister step to a later task.
                if (inFlush0) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }
622 
623         private void doClose0(ChannelPromise promise) {
624             try {
625                 doClose();
626                 closeFuture.setClosed();
627                 safeSetSuccess(promise);
628             } catch (Throwable t) {
629                 closeFuture.setClosed();
630                 safeSetFailure(promise, t);
631             }
632         }
633 
634         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
635             deregister(voidPromise(), wasActive && !isActive());
636         }
637 
638         @Override
639         public final void closeForcibly() {
640             assertEventLoop();
641 
642             try {
643                 doClose();
644             } catch (Exception e) {
645                 logger.warn("Failed to close a channel.", e);
646             }
647         }
648 
649         @Override
650         public final void deregister(final ChannelPromise promise) {
651             assertEventLoop();
652 
653             deregister(promise, false);
654         }
655 
        /**
         * Deregisters the channel. The promise is completed immediately when the channel is not registered;
         * otherwise the deregistration is performed in a task scheduled with {@code invokeLater(...)}.
         * <p>
         * A failure of {@code doDeregister()} is logged and does not fail the promise.
         *
         * @param promise             notified about the outcome; nothing happens if it cannot be made
         *                            uncancellable
         * @param fireChannelInactive whether channelInactive is fired before channelUnregistered
         */
        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                safeSetSuccess(promise);
                return;
            }

            // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
            // we need to ensure we do the actual deregister operation later. This is needed as for example,
            // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
            // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
            // the deregister operation this could lead to have a handler invoked by different EventLoop and so
            // threads.
            //
            // See:
            // https://github.com/netty/netty/issues/4435
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        // Events are fired and the promise is completed even if doDeregister() failed.
                        if (fireChannelInactive) {
                            pipeline.fireChannelInactive();
                        }
                        // Some transports like local and AIO does not allow the deregistration of
                        // an open channel.  Their doDeregister() calls close(). Consequently,
                        // close() calls deregister() again - no need to fire channelUnregistered, so check
                        // if it was registered.
                        if (registered) {
                            registered = false;
                            pipeline.fireChannelUnregistered();
                        }
                        safeSetSuccess(promise);
                    }
                }
            });
        }
699 
700         @Override
701         public final void beginRead() {
702             assertEventLoop();
703 
704             try {
705                 doBeginRead();
706             } catch (final Exception e) {
707                 invokeLater(new Runnable() {
708                     @Override
709                     public void run() {
710                         pipeline.fireExceptionCaught(e);
711                     }
712                 });
713                 close(voidPromise());
714             }
715         }
716 
717         @Override
718         public final void write(Object msg, ChannelPromise promise) {
719             assertEventLoop();
720 
721             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
722             if (outboundBuffer == null) {
723                 try {
724                     // release message now to prevent resource-leak
725                     ReferenceCountUtil.release(msg);
726                 } finally {
727                     // If the outboundBuffer is null we know the channel was closed and so
728                     // need to fail the future right away. If it is not null the handling of the rest
729                     // will be done in flush0()
730                     // See https://github.com/netty/netty/issues/2362
731                     safeSetFailure(promise,
732                             newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
733                 }
734                 return;
735             }
736 
737             int size;
738             try {
739                 msg = filterOutboundMessage(msg);
740                 size = pipeline.estimatorHandle().size(msg);
741                 if (size < 0) {
742                     size = 0;
743                 }
744             } catch (Throwable t) {
745                 try {
746                     ReferenceCountUtil.release(msg);
747                 } finally {
748                     safeSetFailure(promise, t);
749                 }
750                 return;
751             }
752 
753             outboundBuffer.addMessage(msg, size, promise);
754         }
755 
756         @Override
757         public final void flush() {
758             assertEventLoop();
759 
760             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
761             if (outboundBuffer == null) {
762                 return;
763             }
764 
765             outboundBuffer.addFlush();
766             flush0();
767         }
768 
        /**
         * Writes the flushed messages of the outbound buffer to the transport.
         * <p>
         * Returns immediately when a flush is already in progress, or when the outbound buffer is gone or
         * empty. If the channel is not active the flushed messages are failed instead of written: with a
         * {@link NotYetConnectedException} while the channel is still open, with a closed channel exception
         * otherwise. A {@link Throwable} thrown by {@code doWrite(...)} is passed to
         * {@code handleWriteError(...)}.
         */
        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            // From here on every exit path resets the flag in a finally block.
            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                handleWriteError(t);
            } finally {
                inFlush0 = false;
            }
        }
809 
810         protected final void handleWriteError(Throwable t) {
811             if (!(t instanceof IOException) || !config().isAutoClose()) {
812                 // Anything other than an IOException with auto-close enabled only shuts down the output
813                 // side. Should that attempt throw as well, fall back to closing the whole channel.
814                 try {
815                     shutdownOutput(voidPromise(), t);
816                 } catch (Throwable shutdownFailure) {
817                     initialCloseCause = t;
818                     close(voidPromise(), shutdownFailure, newClosedChannelException(t, "flush0()"));
819                 }
820                 return;
821             }
822 
823             // IOException with auto-close enabled: record the write failure as the initial close cause and
824             // go straight to close(...). Per the original note, close(...) takes care of failing all flushed
825             // messages and makes sure the underlying transport is closed before the promises are notified.
826             // Otherwise isActive(), isOpen() and isWritable() might still return true even though the
827             // channel should be closed as a result of the exception.
828             initialCloseCause = t;
829             close(voidPromise(), t, newClosedChannelException(t, "flush0()"));
830         }
831 
832         private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
833             ClosedChannelException exception =
834                     StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
835             if (cause != null) {
836                 exception.initCause(cause);
837             }
838             return exception;
839         }
840 
841         @Override
842         public final ChannelPromise voidPromise() {
843             // No promise is allocated here: after the event-loop assertion the existing field is returned.
844             assertEventLoop();
845             return unsafeVoidPromise;
846         }
847 
848         protected final boolean ensureOpen(ChannelPromise promise) {
849             final boolean open = isOpen();
850             if (!open) {
851                 // Not open: fail the promise with a ClosedChannelException built from the initial close cause.
852                 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
853             }
854             return open;
855         }
856 
857         /**
858          * Marks {@code promise} as success unless it is a {@link VoidChannelPromise}; logs a warning on refusal.
859          */
860         protected final void safeSetSuccess(ChannelPromise promise) {
861             if (!(promise instanceof VoidChannelPromise || promise.trySuccess())) {
862                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
863             }
864         }
865 
866         /**
867          * Fails {@code promise} with {@code cause} unless it is a {@link VoidChannelPromise}; warns when refused.
868          */
869         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
870             if (!(promise instanceof VoidChannelPromise || promise.tryFailure(cause))) {
871                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
872             }
873         }
874 
875         protected final void closeIfClosed() {
876             // Act only once isOpen() reports false; the outcome of close(...) goes to the void promise.
877             if (!isOpen()) {
878                 close(voidPromise());
879             }
880         }
881 
882         private void invokeLater(Runnable task) {
883             // Outbound operation implementations call this to trigger an inbound event later instead of
884             // immediately, because the outbound operation might itself have been triggered by another
885             // inbound event handler method. Fired immediately, the call stack could look like this:
886             //
887             //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
888             //   -> handlerA.ctx.close()
889             //      -> channel.unsafe.close()
890             //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
891             //
892             // so the execution of two inbound handler methods of the same handler would overlap undesirably.
893             // If the event loop refuses the task, that is logged and the task is dropped.
894             try {
895                 eventLoop().execute(task);
896             } catch (RejectedExecutionException rejected) {
897                 logger.warn("Can't invoke task later as EventLoop rejected it", rejected);
898             }
899         }
900 
901         /**
902          * Wraps connection-attempt failures so that their message also carries {@code remoteAddress}.
903          */
904         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
905             final Throwable annotated;
906             if (cause instanceof ConnectException) {
907                 annotated = new AnnotatedConnectException((ConnectException) cause, remoteAddress);
908             } else if (cause instanceof NoRouteToHostException) {
909                 annotated = new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
910             } else if (cause instanceof SocketException) {
911                 annotated = new AnnotatedSocketException((SocketException) cause, remoteAddress);
912             } else {
913                 annotated = cause; // every other type is handed back untouched
914             }
915             return annotated;
916         }
917 
918         /**
919          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the caller must
920          * submit a task that calls {@link #doClose()} to it through {@link Executor#execute(Runnable)}. If it
921          * returns {@code null}, {@link #doClose()} must be called from the caller thread (i.e. the
922          * {@link EventLoop}). This default implementation always returns {@code null}.
923          */
924         protected Executor prepareToClose() {
925             return null;
926         }
927     }
928 
929     /**
930      * Returns {@code true} if the given {@link EventLoop} is compatible with this instance.
931      */
932     protected abstract boolean isCompatible(EventLoop loop);
933 
934     /**
935      * Returns the {@link SocketAddress} which is bound locally; implemented by the concrete subclass.
936      */
937     protected abstract SocketAddress localAddress0();
938 
939     /**
940      * Returns the {@link SocketAddress} which the {@link Channel} is connected to.
941      */
942     protected abstract SocketAddress remoteAddress0();
943 
944     /**
945      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
946      * Subclasses may override this method; the default implementation does nothing.
947      *
948      * @deprecated use {@link #doRegister(ChannelPromise)} instead
949      */
950     @Deprecated
951     protected void doRegister() throws Exception {
952         // NOOP: intentionally empty by default
953     }
954 
955     /**
956      * Called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
957      * This default runs the deprecated {@link #doRegister()} and completes {@code promise} with its outcome.
958      *
959      * @param promise {@link ChannelPromise} that must be notified once done to continue the registration.
960      */
961     protected void doRegister(ChannelPromise promise) {
962         try {
963             doRegister();
964         } catch (Throwable registrationFailure) {
965             promise.setFailure(registrationFailure);
966             return;
967         }
968         promise.setSuccess();
969     }
970 
971     /**
972      * Binds the {@link Channel} to the given local {@link SocketAddress}.
973      */
974     protected abstract void doBind(SocketAddress localAddress) throws Exception;
975 
976     /**
977      * Disconnects this {@link Channel} from its remote peer.
978      */
979     protected abstract void doDisconnect() throws Exception;
980 
981     /**
982      * Closes the {@link Channel}.
983      */
984     protected abstract void doClose() throws Exception;
985 
986     /**
987      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
988      * operation throws an exception. The default implementation simply calls {@link #doClose()}.
989      */
990     protected void doShutdownOutput() throws Exception {
991         doClose();
992     }
993 
994     /**
995      * Deregisters the {@link Channel} from its {@link EventLoop}.
996      *
997      * Subclasses may override this method; the default implementation does nothing.
998      */
999     protected void doDeregister() throws Exception {
1000         // NOOP: intentionally empty by default
1001     }
1002 
1003     /**
1004      * Schedules a read operation.
1005      */
1006     protected abstract void doBeginRead() throws Exception;
1007 
1008     /**
1009      * Flushes the content of the given {@link ChannelOutboundBuffer} to the remote peer.
1010      */
1011     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1012 
1013     /**
1014      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1015      * the implementation can convert it (e.g. heap buffer -> direct buffer). The default returns {@code msg} as is.
1016      */
1017     protected Object filterOutboundMessage(Object msg) throws Exception {
1018         return msg;
1019     }
1020 
1021     protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1022         DefaultFileRegion.validate(region, position); // default check; subclasses may override this hook
1023     }
1024 
1025     static final class CloseFuture extends DefaultChannelPromise {
1026         // setClosed() completes this promise via super.trySuccess(); the overridden set*/try* methods always throw.
1027         CloseFuture(AbstractChannel channel) {
1028             super(channel);
1029         }
1030 
1031         boolean setClosed() {
1032             return super.trySuccess();
1033         }
1034 
1035         @Override
1036         public boolean trySuccess() {
1037             throw new IllegalStateException();
1038         }
1039 
1040         @Override
1041         public boolean tryFailure(Throwable cause) {
1042             throw new IllegalStateException();
1043         }
1044 
1045         @Override
1046         public ChannelPromise setSuccess() {
1047             throw new IllegalStateException();
1048         }
1049 
1050         @Override
1051         public ChannelPromise setFailure(Throwable cause) {
1052             throw new IllegalStateException();
1053         }
1054     }
1055 
1056     private static final class AnnotatedConnectException extends ConnectException {
1057         // Wraps a ConnectException: same message plus ": " and the remote address, original kept as the cause.
1058         private static final long serialVersionUID = 3901958112696433556L;
1059 
1060         AnnotatedConnectException(ConnectException original, SocketAddress remoteAddress) {
1061             super(original.getMessage() + ": " + remoteAddress);
1062             initCause(original);
1063         }
1064 
1065         // Hands back this instance without recording a stack trace; deliberately left unsynchronized.
1066         @Override
1067         public Throwable fillInStackTrace() {
1068             return this;
1069         }
1070     }
1071 
1072     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1073         // Wraps a NoRouteToHostException: same message plus ": " and the remote address, original kept as cause.
1074         private static final long serialVersionUID = -6801433937592080623L;
1075 
1076         AnnotatedNoRouteToHostException(NoRouteToHostException original, SocketAddress remoteAddress) {
1077             super(original.getMessage() + ": " + remoteAddress);
1078             initCause(original);
1079         }
1080 
1081         // Hands back this instance without recording a stack trace; deliberately left unsynchronized.
1082         @Override
1083         public Throwable fillInStackTrace() {
1084             return this;
1085         }
1086     }
1087 
1088     private static final class AnnotatedSocketException extends SocketException {
1089         // Wraps a SocketException: same message plus ": " and the remote address, original kept as the cause.
1090         private static final long serialVersionUID = 3896743275010454039L;
1091 
1092         AnnotatedSocketException(SocketException original, SocketAddress remoteAddress) {
1093             super(original.getMessage() + ": " + remoteAddress);
1094             initCause(original);
1095         }
1096 
1097         // Hands back this instance without recording a stack trace; deliberately left unsynchronized.
1098         @Override
1099         public Throwable fillInStackTrace() {
1100             return this;
1101         }
1102     }
1103 }