View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.socket.ChannelOutputShutdownEvent;
19  import io.netty.channel.socket.ChannelOutputShutdownException;
20  import io.netty.util.DefaultAttributeMap;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.io.IOException;
28  import java.net.ConnectException;
29  import java.net.InetSocketAddress;
30  import java.net.NoRouteToHostException;
31  import java.net.SocketAddress;
32  import java.net.SocketException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.NotYetConnectedException;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.RejectedExecutionException;
37  
38  /**
39   * A skeletal {@link Channel} implementation.
40   */
41  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
42  
    // Logger used for warnings raised while registering, binding, closing and deregistering.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    // Assigned once in the constructors.
    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    // Promise instance handed out by AbstractUnsafe.voidPromise().
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    // Backs closeFuture(); marked via setClosed() on the close and failed-registration paths.
    private final CloseFuture closeFuture = new CloseFuture(this);

    // Lazily cached by localAddress() / remoteAddress(); reset to null by disconnect(...) and by the
    // deprecated invalidate*Address() methods.
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    // Assigned in AbstractUnsafe.register(...); eventLoop() throws while this is still null.
    private volatile EventLoop eventLoop;
    // Set to true once registration succeeded and back to false when deregistration ran.
    private volatile boolean registered;
    // Set by the first close(...) call so that later calls only wait for closeFuture.
    private boolean closeInitiated;
    // Write failure recorded by handleWriteError(...); used as the cause of later ClosedChannelExceptions.
    private Throwable initialCloseCause;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
62  
63      /**
64       * Creates a new instance.
65       *
66       * @param parent
67       *        the parent of this channel. {@code null} if there's no parent.
68       */
69      protected AbstractChannel(Channel parent) {
70          this.parent = parent;
71          id = newId();
72          unsafe = newUnsafe();
73          pipeline = newChannelPipeline();
74      }
75  
76      /**
77       * Creates a new instance.
78       *
79       * @param parent
80       *        the parent of this channel. {@code null} if there's no parent.
81       */
82      protected AbstractChannel(Channel parent, ChannelId id) {
83          this.parent = parent;
84          this.id = id;
85          unsafe = newUnsafe();
86          pipeline = newChannelPipeline();
87      }
88  
89      protected final int maxMessagesPerWrite() {
90          ChannelConfig config = config();
91          if (config instanceof DefaultChannelConfig) {
92              return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
93          }
94          Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
95          if (value == null) {
96              return Integer.MAX_VALUE;
97          }
98          return value;
99      }
100 
    /**
     * Returns the {@link ChannelId} assigned to this channel by its constructor.
     */
    @Override
    public final ChannelId id() {
        return id;
    }
105 
    /**
     * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
     * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
     * <p>
     * This method is called from that constructor, so an override runs before the subclass' own
     * constructor body has executed.
     */
    protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }
113 
    /**
     * Returns a new {@link DefaultChannelPipeline} instance bound to this channel.
     * <p>
     * This method is called from the constructors after {@link #newUnsafe()}, so an override runs before
     * the subclass' own constructor body has executed.
     */
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
120 
    /**
     * Returns the parent passed to the constructor, which is {@code null} if there is no parent.
     */
    @Override
    public Channel parent() {
        return parent;
    }
125 
    /**
     * Returns the pipeline created by {@link #newChannelPipeline()} when this channel was constructed.
     */
    @Override
    public ChannelPipeline pipeline() {
        return pipeline;
    }
130 
131     @Override
132     public EventLoop eventLoop() {
133         EventLoop eventLoop = this.eventLoop;
134         if (eventLoop == null) {
135             throw new IllegalStateException("channel not registered to an event loop");
136         }
137         return eventLoop;
138     }
139 
140     @Override
141     public SocketAddress localAddress() {
142         SocketAddress localAddress = this.localAddress;
143         if (localAddress == null) {
144             try {
145                 this.localAddress = localAddress = unsafe().localAddress();
146             } catch (Error e) {
147                 throw e;
148             } catch (Throwable t) {
149                 // Sometimes fails on a closed socket in Windows.
150                 return null;
151             }
152         }
153         return localAddress;
154     }
155 
    /**
     * Drops the cached local address so that the next {@link #localAddress()} call queries the
     * {@link Unsafe} again.
     *
     * @deprecated no use-case for this.
     */
    @Deprecated
    protected void invalidateLocalAddress() {
        localAddress = null;
    }
163 
164     @Override
165     public SocketAddress remoteAddress() {
166         SocketAddress remoteAddress = this.remoteAddress;
167         if (remoteAddress == null) {
168             try {
169                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
170             } catch (Error e) {
171                 throw e;
172             } catch (Throwable t) {
173                 // Sometimes fails on a closed socket in Windows.
174                 return null;
175             }
176         }
177         return remoteAddress;
178     }
179 
    /**
     * Drops the cached remote address so that the next {@link #remoteAddress()} call queries the
     * {@link Unsafe} again.
     *
     * @deprecated no use-case for this.
     */
    @Deprecated
    protected void invalidateRemoteAddress() {
        remoteAddress = null;
    }
187 
    /**
     * Returns {@code true} after a registration completed successfully and until deregistration ran.
     */
    @Override
    public boolean isRegistered() {
        return registered;
    }
192 
    /**
     * Returns the single {@link ChannelFuture} instance of this channel that is marked closed when the
     * channel is closed.
     */
    @Override
    public ChannelFuture closeFuture() {
        return closeFuture;
    }
197 
    /**
     * Returns the {@link Unsafe} created by {@link #newUnsafe()} when this channel was constructed.
     */
    @Override
    public Unsafe unsafe() {
        return unsafe;
    }
202 
    /**
     * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}.
     * <p>
     * This method is called from the constructors, so an implementation runs before the subclass' own
     * constructor body has executed.
     */
    protected abstract AbstractUnsafe newUnsafe();
207 
    /**
     * Returns the hash code of this channel's {@link ChannelId}.
     */
    @Override
    public final int hashCode() {
        return id.hashCode();
    }
215 
    /**
     * Returns {@code true} if and only if the specified object is identical
     * with this channel (i.e: {@code this == o}). Two distinct channel instances are never equal,
     * even if their {@link ChannelId}s compare as equal.
     */
    @Override
    public final boolean equals(Object o) {
        return this == o;
    }
224 
225     @Override
226     public final int compareTo(Channel o) {
227         if (this == o) {
228             return 0;
229         }
230 
231         return id().compareTo(o.id());
232     }
233 
234     /**
235      * Returns the {@link String} representation of this channel.  The returned
236      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
237      * and {@linkplain #remoteAddress() remote address} of this channel for
238      * easier identification.
239      */
240     @Override
241     public String toString() {
242         boolean active = isActive();
243         if (strValActive == active && strVal != null) {
244             return strVal;
245         }
246 
247         SocketAddress remoteAddr = remoteAddress();
248         SocketAddress localAddr = localAddress();
249         if (remoteAddr != null) {
250             StringBuilder buf = new StringBuilder(96)
251                 .append("[id: 0x")
252                 .append(id.asShortText())
253                 .append(", L:")
254                 .append(localAddr)
255                 .append(active? " - " : " ! ")
256                 .append("R:")
257                 .append(remoteAddr)
258                 .append(']');
259             strVal = buf.toString();
260         } else if (localAddr != null) {
261             StringBuilder buf = new StringBuilder(64)
262                 .append("[id: 0x")
263                 .append(id.asShortText())
264                 .append(", L:")
265                 .append(localAddr)
266                 .append(']');
267             strVal = buf.toString();
268         } else {
269             StringBuilder buf = new StringBuilder(16)
270                 .append("[id: 0x")
271                 .append(id.asShortText())
272                 .append(']');
273             strVal = buf.toString();
274         }
275 
276         strValActive = active;
277         return strVal;
278     }
279 
    /**
     * Returns the void promise of this channel's pipeline.
     */
    @Override
    public final ChannelPromise voidPromise() {
        return pipeline.voidPromise();
    }
284 
285     /**
286      * {@link Unsafe} implementation which sub-classes must extend and use.
287      */
288     protected abstract class AbstractUnsafe implements Unsafe {
289 
        // Holds pending outbound messages. Set to null by shutdownOutput(...) and close(...); after that
        // write(...) fails its promise immediately and flush() returns without doing anything.
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        // Created on the first call to recvBufAllocHandle().
        private RecvByteBufAllocator.Handle recvHandle;
        // true while flush0() is running; used to avoid re-entrance and to defer work in close(...).
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true;
295 
        /**
         * Checks, when Java assertions are enabled, that a registered channel is only operated from
         * its own event loop. An unregistered channel always passes the check.
         */
        private void assertEventLoop() {
            assert !registered || eventLoop.inEventLoop();
        }
299 
300         @Override
301         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
302             if (recvHandle == null) {
303                 recvHandle = config().getRecvByteBufAllocator().newHandle();
304             }
305             return recvHandle;
306         }
307 
        /**
         * Returns the current outbound buffer, which is {@code null} once output was shut down or the
         * channel was closed.
         */
        @Override
        public final ChannelOutboundBuffer outboundBuffer() {
            return outboundBuffer;
        }
312 
        /**
         * Returns the result of {@code localAddress0()} without any caching.
         */
        @Override
        public final SocketAddress localAddress() {
            return localAddress0();
        }
317 
        /**
         * Returns the result of {@code remoteAddress0()} without any caching.
         */
        @Override
        public final SocketAddress remoteAddress() {
            return remoteAddress0();
        }
322 
        /**
         * Assigns the given {@link EventLoop} to this channel and runs the actual registration
         * ({@code register0}) on that event loop.
         * <p>
         * The promise is failed with an {@link IllegalStateException} if the channel is already
         * registered or the event loop is not compatible. If the event loop refuses the registration
         * task, the channel is force-closed and the promise is failed with the thrown error.
         *
         * @param eventLoop the event loop to register with; must not be {@code null}.
         * @param promise   notified about the outcome of the registration.
         */
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            // Published before register0(...) runs, so eventLoop() works from inside the registration.
            AbstractChannel.this.eventLoop = eventLoop;

            // Clear any cached executors from prior event loop registrations.
            // Walks the handler contexts backwards, starting at the tail and following 'prev' links.
            AbstractChannelHandlerContext context = pipeline.tail;
            do {
                context.contextExecutor = null;
                context = context.prev;
            } while (context != null);

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    // The task was not accepted, so register0(...) will never run: release the
                    // transport here and complete both the close future and the promise.
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
365 
        /**
         * Performs the registration by calling {@code doRegister(...)} with an internal promise and
         * reacting to its completion.
         * <p>
         * On success the channel is marked registered, pending {@code handlerAdded} callbacks are run,
         * the caller's promise is completed and {@code channelRegistered} is fired. On failure the
         * channel is closed and the caller's promise is failed with the registration error.
         *
         * @param promise the promise passed to {@link #register(EventLoop, ChannelPromise)}.
         */
        private void register0(ChannelPromise promise) {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            ChannelPromise registerPromise = newPromise();
            // Captured before doRegister(...) because the listener below flips neverRegistered.
            boolean firstRegistration = neverRegistered;
            registerPromise.addListener(future -> {
                if (future.isSuccess()) {
                    neverRegistered = false;
                    registered = true;

                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();

                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to
                            // begin read again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } else {
                    // Close the channel directly to avoid FD leak.
                    close(newPromise());
                    closeFuture.setClosed();
                    safeSetFailure(promise, future.cause());
                }
            });
            doRegister(registerPromise);
        }
407 
408         @Override
409         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
410             assertEventLoop();
411 
412             if (!promise.setUncancellable() || !ensureOpen(promise)) {
413                 return;
414             }
415 
416             // See: https://github.com/netty/netty/issues/576
417             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
418                 localAddress instanceof InetSocketAddress &&
419                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
420                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
421                 // Warn a user about the fact that a non-root user can't receive a
422                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
423                 logger.warn(
424                         "A non-root user can't receive a broadcast packet if the socket " +
425                         "is not bound to a wildcard address; binding to a non-wildcard " +
426                         "address (" + localAddress + ") anyway as requested.");
427             }
428 
429             boolean wasActive = isActive();
430             try {
431                 doBind(localAddress);
432             } catch (Throwable t) {
433                 safeSetFailure(promise, t);
434                 closeIfClosed();
435                 return;
436             }
437 
438             if (!wasActive && isActive()) {
439                 invokeLater(new Runnable() {
440                     @Override
441                     public void run() {
442                         pipeline.fireChannelActive();
443                     }
444                 });
445             }
446 
447             safeSetSuccess(promise);
448         }
449 
450         @Override
451         public final void disconnect(final ChannelPromise promise) {
452             assertEventLoop();
453 
454             if (!promise.setUncancellable()) {
455                 return;
456             }
457 
458             boolean wasActive = isActive();
459             try {
460                 doDisconnect();
461                 // Reset remoteAddress and localAddress
462                 remoteAddress = null;
463                 localAddress = null;
464             } catch (Throwable t) {
465                 safeSetFailure(promise, t);
466                 closeIfClosed();
467                 return;
468             }
469 
470             if (wasActive && !isActive()) {
471                 invokeLater(new Runnable() {
472                     @Override
473                     public void run() {
474                         pipeline.fireChannelInactive();
475                     }
476                 });
477             }
478 
479             safeSetSuccess(promise);
480             closeIfClosed(); // doDisconnect() might have closed the channel
481         }
482 
483         @Override
484         public void close(final ChannelPromise promise) {
485             assertEventLoop();
486 
487             ClosedChannelException closedChannelException =
488                     StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
489             close(promise, closedChannelException, closedChannelException);
490         }
491 
        /**
         * Shutdown the output portion of the corresponding {@link Channel}.
         * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
         *
         * @param promise notified about the outcome of the shutdown.
         */
        public final void shutdownOutput(final ChannelPromise promise) {
            assertEventLoop();
            shutdownOutput(promise, null);
        }
500 
        /**
         * Shutdown the output portion of the corresponding {@link Channel}.
         * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
         * <p>
         * If the outbound buffer is already gone, the promise is failed with a
         * {@link ClosedChannelException}. Otherwise the promise reflects the outcome of
         * {@code doShutdownOutput()}, and the detached buffer is failed and closed in either case.
         *
         * @param promise notified about the outcome of the shutdown.
         * @param cause The cause which may provide rationale for the shutdown; may be {@code null}.
         */
        private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
            if (!promise.setUncancellable()) {
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                promise.setFailure(new ClosedChannelException());
                return;
            }
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.

            final Throwable shutdownCause = cause == null ?
                    new ChannelOutputShutdownException("Channel output shutdown") :
                    new ChannelOutputShutdownException("Channel output shutdown", cause);

            // When a side enables SO_LINGER and calls shutdownOutput(...) to start TCP half-closure
            // we can not call doDeregister here because we should ensure this side in fin_wait2 state
            // can still receive and process the data which is sent by another side in the close_wait state.
            // See https://github.com/netty/netty/issues/11981
            try {
                // The shutdown function does not block regardless of the SO_LINGER setting on the socket
                // so we don't need to use GlobalEventExecutor to execute the shutdown
                doShutdownOutput();
                promise.setSuccess();
            } catch (Throwable err) {
                promise.setFailure(err);
            } finally {
                // Runs whether or not doShutdownOutput() succeeded: the buffer was already detached above.
                closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
            }
        }
537 
        /**
         * Fails the flushed messages of {@code buffer} with {@code cause}, closes the buffer with the
         * same cause and then fires {@link ChannelOutputShutdownEvent#INSTANCE} as a user event.
         *
         * @param pipeline the pipeline that receives the user event.
         * @param buffer   the outbound buffer that was detached from this channel.
         * @param cause    the exception used to fail the pending writes.
         */
        private void closeOutboundBufferForShutdown(
                ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
            // NOTE(review): 'false' presumably suppresses writability notifications, as described for the
            // same call in flush0(); the meaning of 'true' in close(...) is not visible here - confirm.
            buffer.failFlushed(cause, false);
            buffer.close(cause, true);
            pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
        }
544 
        /**
         * Closes the channel once; repeated calls only wait for the first close to finish.
         * <p>
         * The first call detaches the outbound buffer, runs {@code doClose0(...)} (on the executor
         * returned by {@code prepareToClose()} if there is one, otherwise inline), fails the queued
         * messages and finally deregisters the channel, firing {@code channelInactive} if the channel
         * was active before and is not any more.
         *
         * @param promise    notified once the close completed.
         * @param cause      used to fail the messages that were already flushed.
         * @param closeCause used to close the outbound buffer.
         */
        protected void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (closeInitiated) {
                if (closeFuture.isDone()) {
                    // Closed already.
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                    // This means close() was called before so we just register a listener and return
                    closeFuture.addListener(future -> promise.setSuccess());
                }
                return;
            }

            closeInitiated = true;

            // Snapshot the state before the transport is closed so the change can be detected afterwards.
            final boolean wasActive = isActive();
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // Fail all the queued messages
                                        outboundBuffer.failFlushed(cause, false);
                                        outboundBuffer.close(closeCause);
                                    }
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                        // Fail all the queued messages.
                        outboundBuffer.failFlushed(cause, false);
                        outboundBuffer.close(closeCause);
                    }
                }
                // When called from within flush0(), postpone the deregistration until that call returned.
                if (inFlush0) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }
614 
        /**
         * Runs {@code doClose()} and completes both the close future and the given promise. The close
         * future is marked closed even if {@code doClose()} throws; in that case the promise is failed
         * with the thrown error instead of being completed successfully.
         *
         * @param promise notified about the outcome of {@code doClose()}.
         */
        private void doClose0(ChannelPromise promise) {
            try {
                doClose();
                closeFuture.setClosed();
                safeSetSuccess(promise);
            } catch (Throwable t) {
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
625 
        /**
         * Deregisters the channel and requests a {@code channelInactive} event only if the channel was
         * active before the close and is no longer active now.
         *
         * @param wasActive whether the channel was active before it was closed.
         */
        private void fireChannelInactiveAndDeregister(final boolean wasActive) {
            deregister(voidPromise(), wasActive && !isActive());
        }
629 
        /**
         * Calls {@code doClose()} directly, without completing any promise or firing any pipeline
         * event from this method. An {@link Exception} thrown by {@code doClose()} is logged at warn
         * level and not propagated.
         */
        @Override
        public final void closeForcibly() {
            assertEventLoop();

            try {
                doClose();
            } catch (Exception e) {
                logger.warn("Failed to close a channel.", e);
            }
        }
640 
        /**
         * Deregisters the channel from its event loop without firing {@code channelInactive}.
         *
         * @param promise notified once the deregistration completed.
         */
        @Override
        public final void deregister(final ChannelPromise promise) {
            assertEventLoop();

            deregister(promise, false);
        }
647 
        /**
         * Schedules the deregistration of this channel via {@code invokeLater(...)}.
         * <p>
         * If the channel is not registered the promise is completed immediately. Otherwise the
         * scheduled task calls {@code doDeregister()}, optionally fires {@code channelInactive}, fires
         * {@code channelUnregistered} if the channel is still marked registered, and then completes the
         * promise successfully, even when {@code doDeregister()} threw (the error is only logged).
         *
         * @param promise             notified once the deregistration completed.
         * @param fireChannelInactive whether {@code channelInactive} should be fired by the task.
         */
        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                safeSetSuccess(promise);
                return;
            }

            // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
            // we need to ensure we do the actual deregister operation later. This is needed as for example,
            // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
            // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
            // the deregister operation this could lead to have a handler invoked by different EventLoop and so
            // threads.
            //
            // See:
            // https://github.com/netty/netty/issues/4435
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            pipeline.fireChannelInactive();
                        }
                        // Some transports like local and AIO does not allow the deregistration of
                        // an open channel.  Their doDeregister() calls close(). Consequently,
                        // close() calls deregister() again - no need to fire channelUnregistered, so check
                        // if it was registered.
                        if (registered) {
                            registered = false;
                            pipeline.fireChannelUnregistered();
                        }
                        safeSetSuccess(promise);
                    }
                }
            });
        }
691 
692         @Override
693         public final void beginRead() {
694             assertEventLoop();
695 
696             try {
697                 doBeginRead();
698             } catch (final Exception e) {
699                 invokeLater(new Runnable() {
700                     @Override
701                     public void run() {
702                         pipeline.fireExceptionCaught(e);
703                     }
704                 });
705                 close(voidPromise());
706             }
707         }
708 
        /**
         * Queues {@code msg} in the outbound buffer; nothing is written to the transport until a flush.
         * <p>
         * If the outbound buffer is gone, the message is released and the promise is failed with a
         * {@link ClosedChannelException}. If filtering the message or estimating its size throws, the
         * message is released and the promise is failed with that error.
         *
         * @param msg     the message to queue.
         * @param promise notified about the outcome of the write.
         */
        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                try {
                    // release message now to prevent resource-leak
                    ReferenceCountUtil.release(msg);
                } finally {
                    // If the outboundBuffer is null we know the channel was closed and so
                    // need to fail the future right away. If it is not null the handling of the rest
                    // will be done in flush0()
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise,
                            newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                }
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                // A negative estimate is treated as zero.
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                try {
                    ReferenceCountUtil.release(msg);
                } finally {
                    // The promise is failed even if releasing the message throws.
                    safeSetFailure(promise, t);
                }
                return;
            }

            outboundBuffer.addMessage(msg, size, promise);
        }
747 
748         @Override
749         public final void flush() {
750             assertEventLoop();
751 
752             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
753             if (outboundBuffer == null) {
754                 return;
755             }
756 
757             outboundBuffer.addFlush();
758             flush0();
759         }
760 
        /**
         * Writes the flushed messages through {@code doWrite(...)}.
         * <p>
         * Returns immediately when a flush is already in progress or when there is nothing to write.
         * If the channel is not active, the flushed messages are failed instead: with a
         * {@link NotYetConnectedException} while the channel is still open, otherwise with a
         * {@link ClosedChannelException}. An error thrown by {@code doWrite(...)} is passed to
         * {@link #handleWriteError(Throwable)}.
         */
        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                handleWriteError(t);
            } finally {
                // Reset even on failure so that later flushes are not blocked by the re-entrance guard.
                inFlush0 = false;
            }
        }
801 
802         protected final void handleWriteError(Throwable t) {
803             if (t instanceof IOException && config().isAutoClose()) {
804                 /**
805                  * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
806                  * failing all flushed messages and also ensure the actual close of the underlying transport
807                  * will happen before the promises are notified.
808                  *
809                  * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
810                  * may still return {@code true} even if the channel should be closed as result of the exception.
811                  */
812                 initialCloseCause = t;
813                 close(voidPromise(), t, newClosedChannelException(t, "flush0()"));
814             } else {
815                 try {
816                     shutdownOutput(voidPromise(), t);
817                 } catch (Throwable t2) {
818                     initialCloseCause = t;
819                     close(voidPromise(), t2, newClosedChannelException(t, "flush0()"));
820                 }
821             }
822         }
823 
824         private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
825             ClosedChannelException exception =
826                     StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
827             if (cause != null) {
828                 exception.initCause(cause);
829             }
830             return exception;
831         }
832 
        /**
         * Returns the {@code unsafeVoidPromise} instance held by this object, after asserting via
         * {@code assertEventLoop()} that the call is allowed.
         */
        @Override
        public final ChannelPromise voidPromise() {
            assertEventLoop();

            return unsafeVoidPromise;
        }
839 
840         protected final boolean ensureOpen(ChannelPromise promise) {
841             if (isOpen()) {
842                 return true;
843             }
844 
845             safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
846             return false;
847         }
848 
849         /**
850          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
851          */
852         protected final void safeSetSuccess(ChannelPromise promise) {
853             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
854                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
855             }
856         }
857 
858         /**
859          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
860          */
861         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
862             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
863                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
864             }
865         }
866 
867         protected final void closeIfClosed() {
868             if (isOpen()) {
869                 return;
870             }
871             close(voidPromise());
872         }
873 
874         private void invokeLater(Runnable task) {
875             try {
876                 // This method is used by outbound operation implementations to trigger an inbound event later.
877                 // They do not trigger an inbound event immediately because an outbound operation might have been
878                 // triggered by another inbound event handler method.  If fired immediately, the call stack
879                 // will look like this for example:
880                 //
881                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
882                 //   -> handlerA.ctx.close()
883                 //      -> channel.unsafe.close()
884                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
885                 //
886                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
887                 eventLoop().execute(task);
888             } catch (RejectedExecutionException e) {
889                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
890             }
891         }
892 
893         /**
894          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
895          */
896         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
897             if (cause instanceof ConnectException) {
898                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
899             }
900             if (cause instanceof NoRouteToHostException) {
901                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
902             }
903             if (cause instanceof SocketException) {
904                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
905             }
906 
907             return cause;
908         }
909 
        /**
         * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
         * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
         * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
         * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
         *
         * <p>This default implementation always returns {@code null}.
         *
         * @return the {@link Executor} on which {@link #doClose()} must run, or {@code null} to run it on
         *         the caller thread
         */
        protected Executor prepareToClose() {
            return null;
        }
919     }
920 
    /**
     * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
     *
     * @param loop the {@link EventLoop} to check
     * @return {@code true} if {@code loop} is compatible, {@code false} otherwise
     */
    protected abstract boolean isCompatible(EventLoop loop);
925 
    /**
     * Returns the {@link SocketAddress} which is bound locally.
     *
     * @return the locally bound {@link SocketAddress}
     */
    protected abstract SocketAddress localAddress0();
930 
    /**
     * Returns the {@link SocketAddress} which the {@link Channel} is connected to.
     *
     * @return the {@link SocketAddress} of the remote peer
     */
    protected abstract SocketAddress remoteAddress0();
935 
    /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     * Subclasses may override this method; this default implementation does nothing.
     *
     * @throws Exception if the subclass-specific registration step fails
     * @deprecated use {@link #doRegister(ChannelPromise)}
     */
    @Deprecated
    protected void doRegister() throws Exception {
        // NOOP
    }
946 
947     /**
948      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
949      * Subclasses may override this method
950      *
951      * @param promise {@link ChannelPromise} that must be notified once done to continue the registration.
952      */
953     protected void doRegister(ChannelPromise promise) {
954         try {
955             doRegister();
956         } catch (Throwable cause) {
957             promise.setFailure(cause);
958             return;
959         }
960         promise.setSuccess();
961     }
962 
    /**
     * Bind the {@link Channel} to the {@link SocketAddress}.
     *
     * @param localAddress the address to bind to
     * @throws Exception if the bind operation fails
     */
    protected abstract void doBind(SocketAddress localAddress) throws Exception;
967 
    /**
     * Disconnect this {@link Channel} from its remote peer.
     *
     * @throws Exception if the disconnect operation fails
     */
    protected abstract void doDisconnect() throws Exception;
972 
    /**
     * Close the {@link Channel}.
     *
     * @throws Exception if the close operation fails
     */
    protected abstract void doClose() throws Exception;
977 
    /**
     * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
     * operation throws an exception.
     *
     * <p>This default implementation simply calls {@link #doClose()}.
     *
     * @throws Exception if the underlying operation fails
     */
    protected void doShutdownOutput() throws Exception {
        doClose();
    }
985 
    /**
     * Deregister the {@link Channel} from its {@link EventLoop}.
     *
     * <p>Sub-classes may override this method; this default implementation does nothing.
     *
     * @throws Exception if the subclass-specific deregistration step fails
     */
    protected void doDeregister() throws Exception {
        // NOOP
    }
994 
    /**
     * Schedule a read operation.
     *
     * @throws Exception if the read operation cannot be scheduled
     */
    protected abstract void doBeginRead() throws Exception;
999 
    /**
     * Flush the content of the given buffer to the remote peer.
     *
     * @param in the {@link ChannelOutboundBuffer} holding the messages to write
     * @throws Exception if the write fails
     */
    protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1004 
    /**
     * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
     * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
     *
     * <p>This default implementation returns {@code msg} unchanged.
     *
     * @param msg the message about to be added to the outbound buffer
     * @return the message to add in its place
     * @throws Exception if the message cannot be converted
     */
    protected Object filterOutboundMessage(Object msg) throws Exception {
        return msg;
    }
1012 
    /**
     * Validates the given {@link DefaultFileRegion} by delegating to
     * {@code DefaultFileRegion.validate(region, position)}.
     *
     * @param region   the file region to validate
     * @param position the position passed through to the validation
     *                 (NOTE(review): presumably an offset within the region; confirm against
     *                 {@code DefaultFileRegion.validate})
     * @throws IOException if the validation fails
     */
    protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
        DefaultFileRegion.validate(region, position);
    }
1016 
    /**
     * A {@link DefaultChannelPromise} that cannot be completed through the regular promise methods:
     * {@link #setSuccess()}, {@link #setFailure(Throwable)}, {@link #trySuccess()} and
     * {@link #tryFailure(Throwable)} all throw {@link IllegalStateException}. The only way to complete it
     * is the package-private {@link #setClosed()}.
     */
    static final class CloseFuture extends DefaultChannelPromise {

        CloseFuture(AbstractChannel ch) {
            super(ch);
        }

        /** Always throws {@link IllegalStateException}. */
        @Override
        public ChannelPromise setSuccess() {
            throw new IllegalStateException();
        }

        /** Always throws {@link IllegalStateException}. */
        @Override
        public ChannelPromise setFailure(Throwable cause) {
            throw new IllegalStateException();
        }

        /** Always throws {@link IllegalStateException}. */
        @Override
        public boolean trySuccess() {
            throw new IllegalStateException();
        }

        /** Always throws {@link IllegalStateException}. */
        @Override
        public boolean tryFailure(Throwable cause) {
            throw new IllegalStateException();
        }

        /**
         * Completes this future by calling the superclass {@code trySuccess()}, bypassing the override above.
         *
         * @return the result of {@code super.trySuccess()}
         */
        boolean setClosed() {
            return super.trySuccess();
        }
    }
1047 
    /**
     * A {@link ConnectException} whose message is the original exception's message followed by
     * {@code ": "} and the remote address. The original exception is kept as the cause.
     */
    private static final class AnnotatedConnectException extends ConnectException {

        private static final long serialVersionUID = 3901958112696433556L;

        AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
            super(exception.getMessage() + ": " + remoteAddress);
            initCause(exception);
        }

        // Returns this without capturing a stack trace. Unlike Throwable#fillInStackTrace, this
        // override is deliberately not synchronized because it does not need to be.
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }
1063 
    /**
     * A {@link NoRouteToHostException} whose message is the original exception's message followed by
     * {@code ": "} and the remote address. The original exception is kept as the cause.
     */
    private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {

        private static final long serialVersionUID = -6801433937592080623L;

        AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
            super(exception.getMessage() + ": " + remoteAddress);
            initCause(exception);
        }

        // Returns this without capturing a stack trace. Unlike Throwable#fillInStackTrace, this
        // override is deliberately not synchronized because it does not need to be.
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }
1079 
    /**
     * A {@link SocketException} whose message is the original exception's message followed by
     * {@code ": "} and the remote address. The original exception is kept as the cause.
     */
    private static final class AnnotatedSocketException extends SocketException {

        private static final long serialVersionUID = 3896743275010454039L;

        AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
            super(exception.getMessage() + ": " + remoteAddress);
            initCause(exception);
        }

        // Returns this without capturing a stack trace. Unlike Throwable#fillInStackTrace, this
        // override is deliberately not synchronized because it does not need to be.
        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }
1095 }