View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.socket.ChannelOutputShutdownEvent;
19  import io.netty.channel.socket.ChannelOutputShutdownException;
20  import io.netty.util.DefaultAttributeMap;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.io.IOException;
28  import java.net.ConnectException;
29  import java.net.InetSocketAddress;
30  import java.net.NoRouteToHostException;
31  import java.net.SocketAddress;
32  import java.net.SocketException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.NotYetConnectedException;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.RejectedExecutionException;
37  
38  /**
39   * A skeletal {@link Channel} implementation.
40   */
41  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
42  
    /** Logger used by this class for warnings raised during registration, bind, close and deregistration. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    /** Parent channel passed to the constructor; {@code null} if there is no parent. */
    private final Channel parent;
    /** Identifier of this channel; either passed to the constructor or obtained from {@link #newId()}. */
    private final ChannelId id;
    /** The {@link Unsafe} created once by {@link #newUnsafe()} in the constructor. */
    private final Unsafe unsafe;
    /** The pipeline created once by {@link #newChannelPipeline()} in the constructor. */
    private final DefaultChannelPipeline pipeline;
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    /** Future returned by {@link #closeFuture()}; marked closed by the close paths of the unsafe. */
    private final CloseFuture closeFuture = new CloseFuture(this);

    /** Cached local address; filled lazily by {@link #localAddress()} and reset to {@code null} on disconnect. */
    private volatile SocketAddress localAddress;
    /** Cached remote address; filled lazily by {@link #remoteAddress()} and reset to {@code null} on disconnect. */
    private volatile SocketAddress remoteAddress;
    /** Event loop assigned by {@code Unsafe.register(...)}; {@code null} until then. */
    private volatile EventLoop eventLoop;
    /** Set to {@code true} once registration succeeded and back to {@code false} on deregistration. */
    private volatile boolean registered;
    /** Set to {@code true} by the first close attempt so that later close calls only wait for its completion. */
    private boolean closeInitiated;
    /** Write error that triggered the close, if any; attached as cause to later closed-channel exceptions. */
    private Throwable initialCloseCause;

    /** Cache for the string representation of this channel */
    // strValActive records the isActive() value that the cached strVal was built with.
    private boolean strValActive;
    private String strVal;
62  
63      /**
64       * Creates a new instance.
65       *
66       * @param parent
67       *        the parent of this channel. {@code null} if there's no parent.
68       */
69      protected AbstractChannel(Channel parent) {
70          this.parent = parent;
71          id = newId();
72          unsafe = newUnsafe();
73          pipeline = newChannelPipeline();
74      }
75  
76      /**
77       * Creates a new instance.
78       *
79       * @param parent
80       *        the parent of this channel. {@code null} if there's no parent.
81       */
82      protected AbstractChannel(Channel parent, ChannelId id) {
83          this.parent = parent;
84          this.id = id;
85          unsafe = newUnsafe();
86          pipeline = newChannelPipeline();
87      }
88  
89      protected final int maxMessagesPerWrite() {
90          ChannelConfig config = config();
91          if (config instanceof DefaultChannelConfig) {
92              return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
93          }
94          Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
95          if (value == null) {
96              return Integer.MAX_VALUE;
97          }
98          return value;
99      }
100 
101     @Override
102     public final ChannelId id() {
103         return id;
104     }
105 
    /**
     * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
     * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
     *
     * <p>This method is invoked from that constructor, i.e. before the subclass constructor body has run.
     *
     * @return the identifier to use for the channel being constructed
     */
    protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }
113 
    /**
     * Returns a new {@link DefaultChannelPipeline} instance.
     *
     * <p>This method is invoked from the constructors of this class, after {@link #newUnsafe()}.
     *
     * @return the pipeline to use for the life-time of this channel
     */
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
120 
121     @Override
122     public Channel parent() {
123         return parent;
124     }
125 
126     @Override
127     public ChannelPipeline pipeline() {
128         return pipeline;
129     }
130 
131     @Override
132     public EventLoop eventLoop() {
133         EventLoop eventLoop = this.eventLoop;
134         if (eventLoop == null) {
135             throw new IllegalStateException("channel not registered to an event loop");
136         }
137         return eventLoop;
138     }
139 
140     @Override
141     public SocketAddress localAddress() {
142         SocketAddress localAddress = this.localAddress;
143         if (localAddress == null) {
144             try {
145                 this.localAddress = localAddress = unsafe().localAddress();
146             } catch (Error e) {
147                 throw e;
148             } catch (Throwable t) {
149                 // Sometimes fails on a closed socket in Windows.
150                 return null;
151             }
152         }
153         return localAddress;
154     }
155 
156     /**
157      * @deprecated no use-case for this.
158      */
159     @Deprecated
160     protected void invalidateLocalAddress() {
161         localAddress = null;
162     }
163 
164     @Override
165     public SocketAddress remoteAddress() {
166         SocketAddress remoteAddress = this.remoteAddress;
167         if (remoteAddress == null) {
168             try {
169                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
170             } catch (Error e) {
171                 throw e;
172             } catch (Throwable t) {
173                 // Sometimes fails on a closed socket in Windows.
174                 return null;
175             }
176         }
177         return remoteAddress;
178     }
179 
180     /**
181      * @deprecated no use-case for this.
182      */
183     @Deprecated
184     protected void invalidateRemoteAddress() {
185         remoteAddress = null;
186     }
187 
188     @Override
189     public boolean isRegistered() {
190         return registered;
191     }
192 
193     @Override
194     public ChannelFuture closeFuture() {
195         return closeFuture;
196     }
197 
198     @Override
199     public Unsafe unsafe() {
200         return unsafe;
201     }
202 
    /**
     * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
     *
     * <p>This method is invoked from the constructors of this class, i.e. before the subclass constructor
     * body has run.
     *
     * @return the unsafe to use for this channel
     */
    protected abstract AbstractUnsafe newUnsafe();
207 
208     /**
209      * Returns the ID of this channel.
210      */
211     @Override
212     public final int hashCode() {
213         return id.hashCode();
214     }
215 
216     /**
217      * Returns {@code true} if and only if the specified object is identical
218      * with this channel (i.e: {@code this == o}).
219      */
220     @Override
221     public final boolean equals(Object o) {
222         return this == o;
223     }
224 
225     @Override
226     public final int compareTo(Channel o) {
227         if (this == o) {
228             return 0;
229         }
230 
231         return id().compareTo(o.id());
232     }
233 
234     /**
235      * Returns the {@link String} representation of this channel.  The returned
236      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
237      * and {@linkplain #remoteAddress() remote address} of this channel for
238      * easier identification.
239      */
240     @Override
241     public String toString() {
242         boolean active = isActive();
243         if (strValActive == active && strVal != null) {
244             return strVal;
245         }
246 
247         SocketAddress remoteAddr = remoteAddress();
248         SocketAddress localAddr = localAddress();
249         if (remoteAddr != null) {
250             StringBuilder buf = new StringBuilder(96)
251                 .append("[id: 0x")
252                 .append(id.asShortText())
253                 .append(", L:")
254                 .append(localAddr)
255                 .append(active? " - " : " ! ")
256                 .append("R:")
257                 .append(remoteAddr)
258                 .append(']');
259             strVal = buf.toString();
260         } else if (localAddr != null) {
261             StringBuilder buf = new StringBuilder(64)
262                 .append("[id: 0x")
263                 .append(id.asShortText())
264                 .append(", L:")
265                 .append(localAddr)
266                 .append(']');
267             strVal = buf.toString();
268         } else {
269             StringBuilder buf = new StringBuilder(16)
270                 .append("[id: 0x")
271                 .append(id.asShortText())
272                 .append(']');
273             strVal = buf.toString();
274         }
275 
276         strValActive = active;
277         return strVal;
278     }
279 
280     @Override
281     public final ChannelPromise voidPromise() {
282         return pipeline.voidPromise();
283     }
284 
285     /**
286      * {@link Unsafe} implementation which sub-classes must extend and use.
287      */
288     protected abstract class AbstractUnsafe implements Unsafe {
289 
        /**
         * Buffer of pending outbound messages. It is set to {@code null} when the channel is closed or its
         * output is shut down, which makes later writes fail and later flushes no-ops.
         */
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        /** Receive-buffer allocator handle; created lazily by {@link #recvBufAllocHandle()}. */
        private RecvByteBufAllocator.Handle recvHandle;
        /** {@code true} while {@link #flush0()} is running; used to avoid re-entrance. */
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true;
295 
        /**
         * Asserts (only when assertions are enabled) that a registered channel is being operated on from
         * its event loop thread. An unregistered channel passes the check unconditionally.
         */
        private void assertEventLoop() {
            assert !registered || eventLoop.inEventLoop();
        }
299 
300         @Override
301         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
302             if (recvHandle == null) {
303                 recvHandle = config().getRecvByteBufAllocator().newHandle();
304             }
305             return recvHandle;
306         }
307 
308         @Override
309         public final ChannelOutboundBuffer outboundBuffer() {
310             return outboundBuffer;
311         }
312 
        /**
         * Returns the local address as reported by {@code localAddress0()}; no caching happens here.
         */
        @Override
        public final SocketAddress localAddress() {
            return localAddress0();
        }
317 
        /**
         * Returns the remote address as reported by {@code remoteAddress0()}; no caching happens here.
         */
        @Override
        public final SocketAddress remoteAddress() {
            return remoteAddress0();
        }
322 
323         @Override
324         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
325             ObjectUtil.checkNotNull(eventLoop, "eventLoop");
326             if (isRegistered()) {
327                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
328                 return;
329             }
330             if (!isCompatible(eventLoop)) {
331                 promise.setFailure(
332                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
333                 return;
334             }
335 
336             AbstractChannel.this.eventLoop = eventLoop;
337 
338             if (eventLoop.inEventLoop()) {
339                 register0(promise);
340             } else {
341                 try {
342                     eventLoop.execute(new Runnable() {
343                         @Override
344                         public void run() {
345                             register0(promise);
346                         }
347                     });
348                 } catch (Throwable t) {
349                     logger.warn(
350                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
351                             AbstractChannel.this, t);
352                     closeForcibly();
353                     closeFuture.setClosed();
354                     safeSetFailure(promise, t);
355                 }
356             }
357         }
358 
        /**
         * Performs the registration once the event loop has been assigned.
         *
         * <p>An internal promise is handed to {@code doRegister(...)}; the listener attached to it finishes
         * the work: on success it marks the channel registered, fires the pipeline events and completes the
         * caller's promise; on failure it force-closes the channel and fails the caller's promise.
         *
         * @param promise the promise passed to {@link #register(EventLoop, ChannelPromise)}
         */
        private void register0(ChannelPromise promise) {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            ChannelPromise registerPromise = newPromise();
            // Captured before registration so the listener can tell a first registration from a re-registration.
            boolean firstRegistration = neverRegistered;
            registerPromise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        neverRegistered = false;
                        registered = true;

                        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                        // user may already fire events through the pipeline in the ChannelFutureListener.
                        pipeline.invokeHandlerAddedIfNeeded();

                        safeSetSuccess(promise);
                        pipeline.fireChannelRegistered();
                        // Only fire a channelActive if the channel has never been registered. This prevents firing
                        // multiple channel actives if the channel is deregistered and re-registered.
                        if (isActive()) {
                            if (firstRegistration) {
                                pipeline.fireChannelActive();
                            } else if (config().isAutoRead()) {
                                // This channel was registered before and autoRead() is set. This means we need to
                                // begin read again so that we process inbound data.
                                //
                                // See https://github.com/netty/netty/issues/4805
                                beginRead();
                            }
                        }
                    } else {
                        // Close the channel directly to avoid FD leak.
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, future.cause());
                    }
                }
            });
            // The listener above runs when this call completes registerPromise.
            doRegister(registerPromise);
        }
403 
404         @Override
405         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
406             assertEventLoop();
407 
408             if (!promise.setUncancellable() || !ensureOpen(promise)) {
409                 return;
410             }
411 
412             // See: https://github.com/netty/netty/issues/576
413             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
414                 localAddress instanceof InetSocketAddress &&
415                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
416                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
417                 // Warn a user about the fact that a non-root user can't receive a
418                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
419                 logger.warn(
420                         "A non-root user can't receive a broadcast packet if the socket " +
421                         "is not bound to a wildcard address; binding to a non-wildcard " +
422                         "address (" + localAddress + ") anyway as requested.");
423             }
424 
425             boolean wasActive = isActive();
426             try {
427                 doBind(localAddress);
428             } catch (Throwable t) {
429                 safeSetFailure(promise, t);
430                 closeIfClosed();
431                 return;
432             }
433 
434             if (!wasActive && isActive()) {
435                 invokeLater(new Runnable() {
436                     @Override
437                     public void run() {
438                         pipeline.fireChannelActive();
439                     }
440                 });
441             }
442 
443             safeSetSuccess(promise);
444         }
445 
446         @Override
447         public final void disconnect(final ChannelPromise promise) {
448             assertEventLoop();
449 
450             if (!promise.setUncancellable()) {
451                 return;
452             }
453 
454             boolean wasActive = isActive();
455             try {
456                 doDisconnect();
457                 // Reset remoteAddress and localAddress
458                 remoteAddress = null;
459                 localAddress = null;
460             } catch (Throwable t) {
461                 safeSetFailure(promise, t);
462                 closeIfClosed();
463                 return;
464             }
465 
466             if (wasActive && !isActive()) {
467                 invokeLater(new Runnable() {
468                     @Override
469                     public void run() {
470                         pipeline.fireChannelInactive();
471                     }
472                 });
473             }
474 
475             safeSetSuccess(promise);
476             closeIfClosed(); // doDisconnect() might have closed the channel
477         }
478 
479         @Override
480         public void close(final ChannelPromise promise) {
481             assertEventLoop();
482 
483             ClosedChannelException closedChannelException =
484                     StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
485             close(promise, closedChannelException, closedChannelException, false);
486         }
487 
488         /**
489          * Shutdown the output portion of the corresponding {@link Channel}.
490          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
491          */
492         public final void shutdownOutput(final ChannelPromise promise) {
493             assertEventLoop();
494             shutdownOutput(promise, null);
495         }
496 
497         /**
498          * Shutdown the output portion of the corresponding {@link Channel}.
499          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
500          * @param cause The cause which may provide rational for the shutdown.
501          */
502         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
503             if (!promise.setUncancellable()) {
504                 return;
505             }
506 
507             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
508             if (outboundBuffer == null) {
509                 promise.setFailure(new ClosedChannelException());
510                 return;
511             }
512             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
513 
514             final Throwable shutdownCause = cause == null ?
515                     new ChannelOutputShutdownException("Channel output shutdown") :
516                     new ChannelOutputShutdownException("Channel output shutdown", cause);
517 
518             // When a side enables SO_LINGER and calls showdownOutput(...) to start TCP half-closure
519             // we can not call doDeregister here because we should ensure this side in fin_wait2 state
520             // can still receive and process the data which is send by another side in the close_wait state。
521             // See https://github.com/netty/netty/issues/11981
522             try {
523                 // The shutdown function does not block regardless of the SO_LINGER setting on the socket
524                 // so we don't need to use GlobalEventExecutor to execute the shutdown
525                 doShutdownOutput();
526                 promise.setSuccess();
527             } catch (Throwable err) {
528                 promise.setFailure(err);
529             } finally {
530                 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
531             }
532         }
533 
        /**
         * Fails the flushed messages of the given buffer, closes the buffer with the same cause and then
         * fires a {@link ChannelOutputShutdownEvent} through the pipeline.
         *
         * @param pipeline the pipeline that receives the user event
         * @param buffer   the outbound buffer that was detached from this unsafe
         * @param cause    the cause passed to the buffer for the failed messages
         */
        private void closeOutboundBufferForShutdown(
                ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
            buffer.failFlushed(cause, false);
            buffer.close(cause, true);
            pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
        }
540 
        /**
         * Closes the channel once; later calls only wait for (or observe) the completion of the first one.
         *
         * <p>The outbound buffer is detached first so no more messages or flushes are accepted. If
         * {@code prepareToClose()} supplies an executor, the transport close runs on it and the remaining
         * clean-up is scheduled through {@code invokeLater(...)}; otherwise everything happens inline.
         *
         * @param promise    the promise notified about the outcome
         * @param cause      the cause used to fail the flushed messages
         * @param closeCause the cause used to close the outbound buffer
         * @param notify     passed to {@code failFlushed(...)} as its second argument
         */
        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (closeInitiated) {
                if (closeFuture.isDone()) {
                    // Closed already.
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                    // This means close() was called before so we just register a listener and return
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            closeInitiated = true;

            // Captured before closing so the later deregistration can detect the active -> inactive transition.
            final boolean wasActive = isActive();
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // Fail all the queued messages
                                        outboundBuffer.failFlushed(cause, notify);
                                        outboundBuffer.close(closeCause);
                                    }
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                        // Fail all the queued messages.
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                    }
                }
                // While a flush is in progress the deregistration is deferred instead of being run inline.
                if (inFlush0) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }
615 
        /**
         * Runs {@code doClose()} and completes both the close future and the given promise.
         * The close future is marked closed whether or not {@code doClose()} throws; the promise
         * succeeds on a clean close and fails with the thrown cause otherwise.
         *
         * @param promise the promise notified about the outcome
         */
        private void doClose0(ChannelPromise promise) {
            try {
                doClose();
                closeFuture.setClosed();
                safeSetSuccess(promise);
            } catch (Throwable t) {
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
626 
627         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
628             deregister(voidPromise(), wasActive && !isActive());
629         }
630 
        /**
         * Closes the underlying transport via {@code doClose()} without firing any pipeline events or
         * completing any promise. An {@link Exception} thrown by {@code doClose()} is logged and swallowed.
         */
        @Override
        public final void closeForcibly() {
            assertEventLoop();

            try {
                doClose();
            } catch (Exception e) {
                logger.warn("Failed to close a channel.", e);
            }
        }
641 
        /**
         * Deregisters the channel from its event loop without firing {@code channelInactive}.
         *
         * @param promise the promise notified once the deregistration completed
         */
        @Override
        public final void deregister(final ChannelPromise promise) {
            assertEventLoop();

            deregister(promise, false);
        }
648 
        /**
         * Schedules the deregistration through {@code invokeLater(...)} and notifies the promise afterwards.
         * If the channel is not registered, the promise succeeds immediately and nothing is scheduled.
         *
         * @param promise             the promise notified once the deregistration completed
         * @param fireChannelInactive whether {@code channelInactive} is fired before {@code channelUnregistered}
         */
        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                safeSetSuccess(promise);
                return;
            }

            // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
            // we need to ensure we do the actual deregister operation later. This is needed as for example,
            // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
            // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
            // the deregister operation this could lead to have a handler invoked by different EventLoop and so
            // threads.
            //
            // See:
            // https://github.com/netty/netty/issues/4435
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        doDeregister();
                    } catch (Throwable t) {
                        // A failing doDeregister() is only logged; the events below are fired regardless.
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            pipeline.fireChannelInactive();
                        }
                        // Some transports like local and AIO does not allow the deregistration of
                        // an open channel.  Their doDeregister() calls close(). Consequently,
                        // close() calls deregister() again - no need to fire channelUnregistered, so check
                        // if it was registered.
                        if (registered) {
                            registered = false;
                            pipeline.fireChannelUnregistered();
                        }
                        safeSetSuccess(promise);
                    }
                }
            });
        }
692 
693         @Override
694         public final void beginRead() {
695             assertEventLoop();
696 
697             try {
698                 doBeginRead();
699             } catch (final Exception e) {
700                 invokeLater(new Runnable() {
701                     @Override
702                     public void run() {
703                         pipeline.fireExceptionCaught(e);
704                     }
705                 });
706                 close(voidPromise());
707             }
708         }
709 
710         @Override
711         public final void write(Object msg, ChannelPromise promise) {
712             assertEventLoop();
713 
714             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
715             if (outboundBuffer == null) {
716                 try {
717                     // release message now to prevent resource-leak
718                     ReferenceCountUtil.release(msg);
719                 } finally {
720                     // If the outboundBuffer is null we know the channel was closed and so
721                     // need to fail the future right away. If it is not null the handling of the rest
722                     // will be done in flush0()
723                     // See https://github.com/netty/netty/issues/2362
724                     safeSetFailure(promise,
725                             newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
726                 }
727                 return;
728             }
729 
730             int size;
731             try {
732                 msg = filterOutboundMessage(msg);
733                 size = pipeline.estimatorHandle().size(msg);
734                 if (size < 0) {
735                     size = 0;
736                 }
737             } catch (Throwable t) {
738                 try {
739                     ReferenceCountUtil.release(msg);
740                 } finally {
741                     safeSetFailure(promise, t);
742                 }
743                 return;
744             }
745 
746             outboundBuffer.addMessage(msg, size, promise);
747         }
748 
749         @Override
750         public final void flush() {
751             assertEventLoop();
752 
753             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
754             if (outboundBuffer == null) {
755                 return;
756             }
757 
758             outboundBuffer.addFlush();
759             flush0();
760         }
761 
        /**
         * Writes the flushed messages via {@code doWrite(...)}, guarding against re-entrant invocation.
         *
         * <p>If the channel is not active, the flushed messages are failed instead: with a
         * {@link NotYetConnectedException} while the channel is still open, with a closed-channel
         * exception otherwise. A {@link Throwable} from {@code doWrite(...)} is passed to
         * {@link #handleWriteError(Throwable)}.
         */
        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                handleWriteError(t);
            } finally {
                // Always reset the flag so that later flushes are not skipped.
                inFlush0 = false;
            }
        }
802 
803         protected final void handleWriteError(Throwable t) {
804             if (t instanceof IOException && config().isAutoClose()) {
805                 /**
806                  * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
807                  * failing all flushed messages and also ensure the actual close of the underlying transport
808                  * will happen before the promises are notified.
809                  *
810                  * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
811                  * may still return {@code true} even if the channel should be closed as result of the exception.
812                  */
813                 initialCloseCause = t;
814                 close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
815             } else {
816                 try {
817                     shutdownOutput(voidPromise(), t);
818                 } catch (Throwable t2) {
819                     initialCloseCause = t;
820                     close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
821                 }
822             }
823         }
824 
825         private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
826             ClosedChannelException exception =
827                     StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
828             if (cause != null) {
829                 exception.initCause(cause);
830             }
831             return exception;
832         }
833 
834         @Override
835         public final ChannelPromise voidPromise() {
836             assertEventLoop();
837 
838             return unsafeVoidPromise;
839         }
840 
841         protected final boolean ensureOpen(ChannelPromise promise) {
842             if (isOpen()) {
843                 return true;
844             }
845 
846             safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
847             return false;
848         }
849 
850         /**
851          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
852          */
853         protected final void safeSetSuccess(ChannelPromise promise) {
854             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
855                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
856             }
857         }
858 
859         /**
860          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
861          */
862         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
863             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
864                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
865             }
866         }
867 
868         protected final void closeIfClosed() {
869             if (isOpen()) {
870                 return;
871             }
872             close(voidPromise());
873         }
874 
875         private void invokeLater(Runnable task) {
876             try {
877                 // This method is used by outbound operation implementations to trigger an inbound event later.
878                 // They do not trigger an inbound event immediately because an outbound operation might have been
879                 // triggered by another inbound event handler method.  If fired immediately, the call stack
880                 // will look like this for example:
881                 //
882                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
883                 //   -> handlerA.ctx.close()
884                 //      -> channel.unsafe.close()
885                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
886                 //
887                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
888                 eventLoop().execute(task);
889             } catch (RejectedExecutionException e) {
890                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
891             }
892         }
893 
894         /**
895          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
896          */
897         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
898             if (cause instanceof ConnectException) {
899                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
900             }
901             if (cause instanceof NoRouteToHostException) {
902                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
903             }
904             if (cause instanceof SocketException) {
905                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
906             }
907 
908             return cause;
909         }
910 
911         /**
912          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
913          * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
914          * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
915          * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
916          */
917         protected Executor prepareToClose() {
918             return null;
919         }
920     }
921 
    /**
     * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
     *
     * @param loop the {@link EventLoop} to check
     * @return {@code true} if {@code loop} is compatible with this instance, {@code false} otherwise
     */
    protected abstract boolean isCompatible(EventLoop loop);
926 
    /**
     * Returns the {@link SocketAddress} which is bound locally.
     *
     * @return the locally bound {@link SocketAddress}
     */
    protected abstract SocketAddress localAddress0();
931 
    /**
     * Returns the {@link SocketAddress} which the {@link Channel} is connected to.
     *
     * @return the {@link SocketAddress} of the remote peer
     */
    protected abstract SocketAddress remoteAddress0();
936 
937     /**
938      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
939      * Subclasses may override this method
940      *
941      * @deprecated use {@link #doRegister(ChannelPromise)}
942      */
943     @Deprecated
944     protected void doRegister() throws Exception {
945         // NOOP
946     }
947 
948     /**
949      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
950      * Subclasses may override this method
951      *
952      * @param promise {@link ChannelPromise} that must be notified once done to continue the registration.
953      */
954     protected void doRegister(ChannelPromise promise) {
955         try {
956             doRegister();
957         } catch (Throwable cause) {
958             promise.setFailure(cause);
959             return;
960         }
961         promise.setSuccess();
962     }
963 
    /**
     * Bind the {@link Channel} to the {@link SocketAddress}.
     *
     * @param localAddress the address to bind to
     * @throws Exception if the bind fails
     */
    protected abstract void doBind(SocketAddress localAddress) throws Exception;
968 
    /**
     * Disconnect this {@link Channel} from its remote peer.
     *
     * @throws Exception if the disconnect fails
     */
    protected abstract void doDisconnect() throws Exception;
973 
    /**
     * Close the {@link Channel}.
     * <p>
     * The default {@link #doShutdownOutput()} implementation also delegates to this method.
     *
     * @throws Exception if the close fails
     */
    protected abstract void doClose() throws Exception;
978 
979     /**
980      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
981      * operation throws an exception.
982      */
983     protected void doShutdownOutput() throws Exception {
984         doClose();
985     }
986 
987     /**
988      * Deregister the {@link Channel} from its {@link EventLoop}.
989      *
990      * Sub-classes may override this method
991      */
992     protected void doDeregister() throws Exception {
993         // NOOP
994     }
995 
    /**
     * Schedule a read operation.
     *
     * @throws Exception if the read operation cannot be scheduled
     */
    protected abstract void doBeginRead() throws Exception;
1000 
    /**
     * Flush the content of the given buffer to the remote peer.
     * <p>
     * {@code flush0()} invokes this method only while the channel is active, and hands anything thrown here
     * to {@code handleWriteError(Throwable)}.
     *
     * @param in the outbound buffer holding the messages to write
     * @throws Exception if the write fails
     */
    protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1005 
1006     /**
1007      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1008      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1009      */
1010     protected Object filterOutboundMessage(Object msg) throws Exception {
1011         return msg;
1012     }
1013 
1014     protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1015         DefaultFileRegion.validate(region, position);
1016     }
1017 
1018     static final class CloseFuture extends DefaultChannelPromise {
1019 
1020         CloseFuture(AbstractChannel ch) {
1021             super(ch);
1022         }
1023 
1024         @Override
1025         public ChannelPromise setSuccess() {
1026             throw new IllegalStateException();
1027         }
1028 
1029         @Override
1030         public ChannelPromise setFailure(Throwable cause) {
1031             throw new IllegalStateException();
1032         }
1033 
1034         @Override
1035         public boolean trySuccess() {
1036             throw new IllegalStateException();
1037         }
1038 
1039         @Override
1040         public boolean tryFailure(Throwable cause) {
1041             throw new IllegalStateException();
1042         }
1043 
1044         boolean setClosed() {
1045             return super.trySuccess();
1046         }
1047     }
1048 
1049     private static final class AnnotatedConnectException extends ConnectException {
1050 
1051         private static final long serialVersionUID = 3901958112696433556L;
1052 
1053         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1054             super(exception.getMessage() + ": " + remoteAddress);
1055             initCause(exception);
1056         }
1057 
1058         // Suppress a warning since this method doesn't need synchronization
1059         @Override
1060         public Throwable fillInStackTrace() {
1061             return this;
1062         }
1063     }
1064 
1065     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1066 
1067         private static final long serialVersionUID = -6801433937592080623L;
1068 
1069         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1070             super(exception.getMessage() + ": " + remoteAddress);
1071             initCause(exception);
1072         }
1073 
1074         // Suppress a warning since this method doesn't need synchronization
1075         @Override
1076         public Throwable fillInStackTrace() {
1077             return this;
1078         }
1079     }
1080 
1081     private static final class AnnotatedSocketException extends SocketException {
1082 
1083         private static final long serialVersionUID = 3896743275010454039L;
1084 
1085         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1086             super(exception.getMessage() + ": " + remoteAddress);
1087             initCause(exception);
1088         }
1089 
1090         // Suppress a warning since this method doesn't need synchronization
1091         @Override
1092         public Throwable fillInStackTrace() {
1093             return this;
1094         }
1095     }
1096 }