View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.socket.ChannelOutputShutdownEvent;
20  import io.netty.channel.socket.ChannelOutputShutdownException;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.ThrowableUtil;
25  import io.netty.util.internal.UnstableApi;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.io.IOException;
30  import java.net.ConnectException;
31  import java.net.InetSocketAddress;
32  import java.net.NoRouteToHostException;
33  import java.net.SocketAddress;
34  import java.net.SocketException;
35  import java.nio.channels.ClosedChannelException;
36  import java.nio.channels.NotYetConnectedException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.RejectedExecutionException;
39  
40  /**
41   * A skeletal {@link Channel} implementation.
42   */
43  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44  
    /** Logger shared by this class and its {@link AbstractUnsafe}. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    // Exception instances created once and shared by all channels. Each one is passed through
    // ThrowableUtil.unknownStackTrace(...) together with the AbstractUnsafe method name it is reported from.
    private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
    private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
    private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
    private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");

    /** Parent channel handed to the constructor; {@code null} if there's no parent. */
    private final Channel parent;
    /** Random value drawn once per channel; its int cast is the ID used by hashCode() and toString(). */
    private final long hashCode = PlatformDependent.threadLocalRandom().nextLong();
    /** Created by {@link #newUnsafe()} in the constructor and returned by {@link #unsafe()}. */
    private final Unsafe unsafe;
    /** Created by {@link #newChannelPipeline()} in the constructor; the public I/O methods delegate to it. */
    private final DefaultChannelPipeline pipeline;
    /** Single instance returned by every call to {@link #newSucceededFuture()}. */
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    /** Single instance returned by {@link #voidPromise()}. */
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    // NOTE(review): not referenced in the portion of the file reviewed here - presumably handed out
    // through the Unsafe; confirm against the rest of the class.
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    /** Returned by {@link #closeFuture()}; marked closed by the {@link AbstractUnsafe} close paths. */
    private final CloseFuture closeFuture = new CloseFuture(this);

    // Lazily filled by localAddress() / remoteAddress(); reset by the deprecated invalidate*() methods.
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    /** Assigned by {@code AbstractUnsafe.register(...)}; {@code null} until then. */
    private volatile EventLoop eventLoop;
    /** Set to true by register0(...) and back to false by the deregistration task. */
    private volatile boolean registered;
    /** Set once the first close(...) call got past its cancellation check; later calls only wait. */
    private boolean closeInitiated;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
76  
77      /**
78       * Creates a new instance.
79       *
80       * @param parent
81       *        the parent of this channel. {@code null} if there's no parent.
82       */
83      protected AbstractChannel(Channel parent) {
84          this.parent = parent;
85          unsafe = newUnsafe();
86          pipeline = newChannelPipeline();
87      }
88  
89      /**
90       * Returns a new {@link DefaultChannelPipeline} instance.
91       */
92      protected DefaultChannelPipeline newChannelPipeline() {
93          return new DefaultChannelPipeline(this);
94      }
95  
96      @Override
97      public boolean isWritable() {
98          ChannelOutboundBuffer buf = unsafe.outboundBuffer();
99          return buf != null && buf.isWritable();
100     }
101 
102     @Override
103     public Channel parent() {
104         return parent;
105     }
106 
107     @Override
108     public ChannelPipeline pipeline() {
109         return pipeline;
110     }
111 
112     @Override
113     public ByteBufAllocator alloc() {
114         return config().getAllocator();
115     }
116 
117     @Override
118     public EventLoop eventLoop() {
119         EventLoop eventLoop = this.eventLoop;
120         if (eventLoop == null) {
121             throw new IllegalStateException("channel not registered to an event loop");
122         }
123         return eventLoop;
124     }
125 
126     @Override
127     public SocketAddress localAddress() {
128         SocketAddress localAddress = this.localAddress;
129         if (localAddress == null) {
130             try {
131                 this.localAddress = localAddress = unsafe().localAddress();
132             } catch (Throwable t) {
133                 // Sometimes fails on a closed socket in Windows.
134                 return null;
135             }
136         }
137         return localAddress;
138     }
139 
140     /**
141      * @deprecated no use-case for this.
142      */
143     @Deprecated
144     protected void invalidateLocalAddress() {
145         localAddress = null;
146     }
147 
148     @Override
149     public SocketAddress remoteAddress() {
150         SocketAddress remoteAddress = this.remoteAddress;
151         if (remoteAddress == null) {
152             try {
153                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
154             } catch (Throwable t) {
155                 // Sometimes fails on a closed socket in Windows.
156                 return null;
157             }
158         }
159         return remoteAddress;
160     }
161 
162     /**
163      * @deprecated no use-case for this.
164      */
165     @Deprecated
166     protected void invalidateRemoteAddress() {
167         remoteAddress = null;
168     }
169 
170     @Override
171     public boolean isRegistered() {
172         return registered;
173     }
174 
175     @Override
176     public ChannelFuture bind(SocketAddress localAddress) {
177         return pipeline.bind(localAddress);
178     }
179 
180     @Override
181     public ChannelFuture connect(SocketAddress remoteAddress) {
182         return pipeline.connect(remoteAddress);
183     }
184 
185     @Override
186     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
187         return pipeline.connect(remoteAddress, localAddress);
188     }
189 
190     @Override
191     public ChannelFuture disconnect() {
192         return pipeline.disconnect();
193     }
194 
195     @Override
196     public ChannelFuture close() {
197         return pipeline.close();
198     }
199 
200     @Override
201     public ChannelFuture deregister() {
202         return pipeline.deregister();
203     }
204 
205     @Override
206     public Channel flush() {
207         pipeline.flush();
208         return this;
209     }
210 
211     @Override
212     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
213         return pipeline.bind(localAddress, promise);
214     }
215 
216     @Override
217     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
218         return pipeline.connect(remoteAddress, promise);
219     }
220 
221     @Override
222     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
223         return pipeline.connect(remoteAddress, localAddress, promise);
224     }
225 
226     @Override
227     public ChannelFuture disconnect(ChannelPromise promise) {
228         return pipeline.disconnect(promise);
229     }
230 
231     @Override
232     public ChannelFuture close(ChannelPromise promise) {
233         return pipeline.close(promise);
234     }
235 
236     @Override
237     public ChannelFuture deregister(ChannelPromise promise) {
238         return pipeline.deregister(promise);
239     }
240 
241     @Override
242     public Channel read() {
243         pipeline.read();
244         return this;
245     }
246 
247     @Override
248     public ChannelFuture write(Object msg) {
249         return pipeline.write(msg);
250     }
251 
252     @Override
253     public ChannelFuture write(Object msg, ChannelPromise promise) {
254         return pipeline.write(msg, promise);
255     }
256 
257     @Override
258     public ChannelFuture writeAndFlush(Object msg) {
259         return pipeline.writeAndFlush(msg);
260     }
261 
262     @Override
263     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
264         return pipeline.writeAndFlush(msg, promise);
265     }
266 
267     @Override
268     public ChannelPromise newPromise() {
269         return new DefaultChannelPromise(this);
270     }
271 
272     @Override
273     public ChannelProgressivePromise newProgressivePromise() {
274         return new DefaultChannelProgressivePromise(this);
275     }
276 
277     @Override
278     public ChannelFuture newSucceededFuture() {
279         return succeededFuture;
280     }
281 
282     @Override
283     public ChannelFuture newFailedFuture(Throwable cause) {
284         return new FailedChannelFuture(this, null, cause);
285     }
286 
287     @Override
288     public ChannelFuture closeFuture() {
289         return closeFuture;
290     }
291 
292     @Override
293     public Unsafe unsafe() {
294         return unsafe;
295     }
296 
297     /**
298      * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
299      */
300     protected abstract AbstractUnsafe newUnsafe();
301 
302     /**
303      * Returns the ID of this channel.
304      */
305     @Override
306     public final int hashCode() {
307         return (int) hashCode;
308     }
309 
310     /**
311      * Returns {@code true} if and only if the specified object is identical
312      * with this channel (i.e: {@code this == o}).
313      */
314     @Override
315     public final boolean equals(Object o) {
316         return this == o;
317     }
318 
319     @Override
320     public final int compareTo(Channel o) {
321         if (this == o) {
322             return 0;
323         }
324 
325         long ret = hashCode - o.hashCode();
326         if (ret > 0) {
327             return 1;
328         }
329         if (ret < 0) {
330             return -1;
331         }
332 
333         ret = System.identityHashCode(this) - System.identityHashCode(o);
334         if (ret != 0) {
335             return (int) ret;
336         }
337 
338         // Jackpot! - different objects with same hashes
339         throw new Error();
340     }
341 
342     /**
343      * Returns the {@link String} representation of this channel.  The returned
344      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
345      * and {@linkplain #remoteAddress() remote address} of this channel for
346      * easier identification.
347      */
348     @Override
349     public String toString() {
350         boolean active = isActive();
351         if (strValActive == active && strVal != null) {
352             return strVal;
353         }
354 
355         SocketAddress remoteAddr = remoteAddress();
356         SocketAddress localAddr = localAddress();
357         if (remoteAddr != null) {
358             String activeNotation = active? "-" : "!";
359             strVal = String.format("[id: 0x%08x, L:%s %s R:%s]", (int) hashCode, localAddr, activeNotation, remoteAddr);
360         } else if (localAddr != null) {
361             strVal = String.format("[id: 0x%08x, L:%s]", (int) hashCode, localAddr);
362         } else {
363             strVal = String.format("[id: 0x%08x]", (int) hashCode);
364         }
365 
366         strValActive = active;
367         return strVal;
368     }
369 
370     @Override
371     public final ChannelPromise voidPromise() {
372         return voidPromise;
373     }
374 
375     /**
376      * {@link Unsafe} implementation which sub-classes must extend and use.
377      */
378     protected abstract class AbstractUnsafe implements Unsafe {
379 
        // Queue for outbound messages. close(...) and shutdownOutput(...) set it to null, after which
        // write(...) fails its promise immediately and flush() returns without doing anything.
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        // Re-entrance guard for flush0(); close(...) also reads it to decide whether the
        // deregistration has to be deferred with invokeLater(...).
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true;
384 
385         private void assertEventLoop() {
386             assert !registered || eventLoop.inEventLoop();
387         }
388 
389         @Override
390         public final ChannelOutboundBuffer outboundBuffer() {
391             return outboundBuffer;
392         }
393 
394         @Override
395         public final SocketAddress localAddress() {
396             return localAddress0();
397         }
398 
399         @Override
400         public final SocketAddress remoteAddress() {
401             return remoteAddress0();
402         }
403 
404         @Override
405         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
406             if (eventLoop == null) {
407                 throw new NullPointerException("eventLoop");
408             }
409             if (isRegistered()) {
410                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
411                 return;
412             }
413             if (!isCompatible(eventLoop)) {
414                 promise.setFailure(
415                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
416                 return;
417             }
418 
419             AbstractChannel.this.eventLoop = eventLoop;
420 
421             if (eventLoop.inEventLoop()) {
422                 register0(promise);
423             } else {
424                 try {
425                     eventLoop.execute(new Runnable() {
426                         @Override
427                         public void run() {
428                             register0(promise);
429                         }
430                     });
431                 } catch (Throwable t) {
432                     logger.warn(
433                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
434                             AbstractChannel.this, t);
435                     closeForcibly();
436                     closeFuture.setClosed();
437                     safeSetFailure(promise, t);
438                 }
439             }
440         }
441 
442         private void register0(ChannelPromise promise) {
443             try {
444                 // check if the channel is still open as it could be closed in the mean time when the register
445                 // call was outside of the eventLoop
446                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
447                     return;
448                 }
449                 boolean firstRegistration = neverRegistered;
450                 doRegister();
451                 neverRegistered = false;
452                 registered = true;
453 
454                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
455                 // user may already fire events through the pipeline in the ChannelFutureListener.
456                 pipeline.invokeHandlerAddedIfNeeded();
457 
458                 safeSetSuccess(promise);
459                 pipeline.fireChannelRegistered();
460                 // Only fire a channelActive if the channel has never been registered. This prevents firing
461                 // multiple channel actives if the channel is deregistered and re-registered.
462                 if (isActive()) {
463                     if (firstRegistration) {
464                         pipeline.fireChannelActive();
465                     } else if (config().isAutoRead()) {
466                         // This channel was registered before and autoRead() is set. This means we need to begin read
467                         // again so that we process inbound data.
468                         //
469                         // See https://github.com/netty/netty/issues/4805
470                         beginRead();
471                     }
472                 }
473             } catch (Throwable t) {
474                 // Close the channel directly to avoid FD leak.
475                 closeForcibly();
476                 closeFuture.setClosed();
477                 safeSetFailure(promise, t);
478             }
479         }
480 
481         @Override
482         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
483             assertEventLoop();
484 
485             if (!promise.setUncancellable() || !ensureOpen(promise)) {
486                 return;
487             }
488 
489             // See: https://github.com/netty/netty/issues/576
490             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
491                 localAddress instanceof InetSocketAddress &&
492                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
493                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
494                 // Warn a user about the fact that a non-root user can't receive a
495                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
496                 logger.warn(
497                         "A non-root user can't receive a broadcast packet if the socket " +
498                         "is not bound to a wildcard address; binding to a non-wildcard " +
499                         "address (" + localAddress + ") anyway as requested.");
500             }
501 
502             boolean wasActive = isActive();
503             try {
504                 doBind(localAddress);
505             } catch (Throwable t) {
506                 safeSetFailure(promise, t);
507                 closeIfClosed();
508                 return;
509             }
510 
511             if (!wasActive && isActive()) {
512                 invokeLater(new Runnable() {
513                     @Override
514                     public void run() {
515                         pipeline.fireChannelActive();
516                     }
517                 });
518             }
519 
520             safeSetSuccess(promise);
521         }
522 
523         @Override
524         public final void disconnect(final ChannelPromise promise) {
525             assertEventLoop();
526 
527             if (!promise.setUncancellable()) {
528                 return;
529             }
530 
531             boolean wasActive = isActive();
532             try {
533                 doDisconnect();
534             } catch (Throwable t) {
535                 safeSetFailure(promise, t);
536                 closeIfClosed();
537                 return;
538             }
539 
540             if (wasActive && !isActive()) {
541                 invokeLater(new Runnable() {
542                     @Override
543                     public void run() {
544                         pipeline.fireChannelInactive();
545                     }
546                 });
547             }
548 
549             safeSetSuccess(promise);
550             closeIfClosed(); // doDisconnect() might have closed the channel
551         }
552 
553         @Override
554         public final void close(final ChannelPromise promise) {
555             assertEventLoop();
556 
557             close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
558         }
559 
560         /**
561          * Shutdown the output portion of the corresponding {@link Channel}.
562          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
563          */
564         @UnstableApi
565         public final void shutdownOutput(final ChannelPromise promise) {
566             assertEventLoop();
567             shutdownOutput(promise, null);
568         }
569 
570         /**
571          * Shutdown the output portion of the corresponding {@link Channel}.
572          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
573          * @param cause The cause which may provide rational for the shutdown.
574          */
575         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
576             if (!promise.setUncancellable()) {
577                 return;
578             }
579 
580             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
581             if (outboundBuffer == null) {
582                 promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION);
583                 return;
584             }
585             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
586 
587             final Throwable shutdownCause = cause == null ?
588                     new ChannelOutputShutdownException("Channel output shutdown") :
589                     new ChannelOutputShutdownException("Channel output shutdown", cause);
590             Executor closeExecutor = prepareToClose();
591             if (closeExecutor != null) {
592                 closeExecutor.execute(new Runnable() {
593                     @Override
594                     public void run() {
595                         try {
596                             // Execute the shutdown.
597                             doShutdownOutput();
598                             promise.setSuccess();
599                         } catch (Throwable err) {
600                             promise.setFailure(err);
601                         } finally {
602                             // Dispatch to the EventLoop
603                             eventLoop().execute(new Runnable() {
604                                 @Override
605                                 public void run() {
606                                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
607                                 }
608                             });
609                         }
610                     }
611                 });
612             } else {
613                 try {
614                     // Execute the shutdown.
615                     doShutdownOutput();
616                     promise.setSuccess();
617                 } catch (Throwable err) {
618                     promise.setFailure(err);
619                 } finally {
620                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
621                 }
622             }
623         }
624 
625         private void closeOutboundBufferForShutdown(
626                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
627             buffer.failFlushed(cause, false);
628             buffer.close(cause, true);
629             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
630         }
631 
632         private void close(final ChannelPromise promise, final Throwable cause,
633                            final ClosedChannelException closeCause, final boolean notify) {
634             if (!promise.setUncancellable()) {
635                 return;
636             }
637 
638             if (closeInitiated) {
639                 if (closeFuture.isDone()) {
640                     // Closed already.
641                     safeSetSuccess(promise);
642                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
643                     // This means close() was called before so we just register a listener and return
644                     closeFuture.addListener(new ChannelFutureListener() {
645                         @Override
646                         public void operationComplete(ChannelFuture future) throws Exception {
647                             promise.setSuccess();
648                         }
649                     });
650                 }
651                 return;
652             }
653 
654             closeInitiated = true;
655 
656             final boolean wasActive = isActive();
657             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
658             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
659             Executor closeExecutor = prepareToClose();
660             if (closeExecutor != null) {
661                 closeExecutor.execute(new Runnable() {
662                     @Override
663                     public void run() {
664                         try {
665                             // Execute the close.
666                             doClose0(promise);
667                         } finally {
668                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
669                             invokeLater(new Runnable() {
670                                 @Override
671                                 public void run() {
672                                     if (outboundBuffer != null) {
673                                         // Fail all the queued messages
674                                         outboundBuffer.failFlushed(cause, notify);
675                                         outboundBuffer.close(closeCause);
676                                     }
677                                     fireChannelInactiveAndDeregister(wasActive);
678                                 }
679                             });
680                         }
681                     }
682                 });
683             } else {
684                 try {
685                     // Close the channel and fail the queued messages in all cases.
686                     doClose0(promise);
687                 } finally {
688                     if (outboundBuffer != null) {
689                         // Fail all the queued messages.
690                         outboundBuffer.failFlushed(cause, notify);
691                         outboundBuffer.close(closeCause);
692                     }
693                 }
694                 if (inFlush0) {
695                     invokeLater(new Runnable() {
696                         @Override
697                         public void run() {
698                             fireChannelInactiveAndDeregister(wasActive);
699                         }
700                     });
701                 } else {
702                     fireChannelInactiveAndDeregister(wasActive);
703                 }
704             }
705         }
706 
707         private void doClose0(ChannelPromise promise) {
708             try {
709                 doClose();
710                 closeFuture.setClosed();
711                 safeSetSuccess(promise);
712             } catch (Throwable t) {
713                 closeFuture.setClosed();
714                 safeSetFailure(promise, t);
715             }
716         }
717 
718         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
719             deregister(voidPromise(), wasActive && !isActive());
720         }
721 
722         @Override
723         public final void closeForcibly() {
724             assertEventLoop();
725 
726             try {
727                 doClose();
728             } catch (Exception e) {
729                 logger.warn("Failed to close a channel.", e);
730             }
731         }
732 
733         @Override
734         public final void deregister(final ChannelPromise promise) {
735             assertEventLoop();
736 
737             deregister(promise, false);
738         }
739 
740         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
741             if (!promise.setUncancellable()) {
742                 return;
743             }
744 
745             if (!registered) {
746                 safeSetSuccess(promise);
747                 return;
748             }
749 
750             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
751             // we need to ensure we do the actual deregister operation later. This is needed as for example,
752             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
753             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
754             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
755             // threads.
756             //
757             // See:
758             // https://github.com/netty/netty/issues/4435
759             invokeLater(new Runnable() {
760                 @Override
761                 public void run() {
762                     try {
763                         doDeregister();
764                     } catch (Throwable t) {
765                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
766                     } finally {
767                         if (fireChannelInactive) {
768                             pipeline.fireChannelInactive();
769                         }
770                         // Some transports like local and AIO does not allow the deregistration of
771                         // an open channel.  Their doDeregister() calls close(). Consequently,
772                         // close() calls deregister() again - no need to fire channelUnregistered, so check
773                         // if it was registered.
774                         if (registered) {
775                             registered = false;
776                             pipeline.fireChannelUnregistered();
777                         }
778                         safeSetSuccess(promise);
779                     }
780                 }
781             });
782         }
783 
784         @Override
785         public final void beginRead() {
786             assertEventLoop();
787 
788             if (!isActive()) {
789                 return;
790             }
791 
792             try {
793                 doBeginRead();
794             } catch (final Exception e) {
795                 invokeLater(new Runnable() {
796                     @Override
797                     public void run() {
798                         pipeline.fireExceptionCaught(e);
799                     }
800                 });
801                 close(voidPromise());
802             }
803         }
804 
805         @Override
806         public final void write(Object msg, ChannelPromise promise) {
807             assertEventLoop();
808 
809             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
810             if (outboundBuffer == null) {
811                 // If the outboundBuffer is null we know the channel was closed and so
812                 // need to fail the future right away. If it is not null the handling of the rest
813                 // will be done in flush0()
814                 // See https://github.com/netty/netty/issues/2362
815                 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
816                 // release message now to prevent resource-leak
817                 ReferenceCountUtil.release(msg);
818                 return;
819             }
820 
821             int size;
822             try {
823                 msg = filterOutboundMessage(msg);
824                 size = pipeline.estimatorHandle().size(msg);
825                 if (size < 0) {
826                     size = 0;
827                 }
828             } catch (Throwable t) {
829                 safeSetFailure(promise, t);
830                 ReferenceCountUtil.release(msg);
831                 return;
832             }
833 
834             outboundBuffer.addMessage(msg, size, promise);
835         }
836 
837         @Override
838         public final void flush() {
839             assertEventLoop();
840 
841             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
842             if (outboundBuffer == null) {
843                 return;
844             }
845 
846             outboundBuffer.addFlush();
847             flush0();
848         }
849 
850         protected void flush0() {
851             if (inFlush0) {
852                 // Avoid re-entrance
853                 return;
854             }
855 
856             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
857             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
858                 return;
859             }
860 
861             inFlush0 = true;
862 
863             // Mark all pending write requests as failure if the channel is inactive.
864             if (!isActive()) {
865                 try {
866                     if (isOpen()) {
867                         outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
868                     } else {
869                         // Do not trigger channelWritabilityChanged because the channel is closed already.
870                         outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
871                     }
872                 } finally {
873                     inFlush0 = false;
874                 }
875                 return;
876             }
877 
878             try {
879                 doWrite(outboundBuffer);
880             } catch (Throwable t) {
881                 if (t instanceof IOException && config().isAutoClose()) {
882                     /**
883                      * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
884                      * failing all flushed messages and also ensure the actual close of the underlying transport
885                      * will happen before the promises are notified.
886                      *
887                      * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
888                      * may still return {@code true} even if the channel should be closed as result of the exception.
889                      */
890                     close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
891                 } else {
892                     try {
893                         shutdownOutput(voidPromise(), t);
894                     } catch (Throwable t2) {
895                         close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
896                     }
897                 }
898             } finally {
899                 inFlush0 = false;
900             }
901         }
902 
903         @Override
904         public final ChannelPromise voidPromise() {
905             assertEventLoop();
906 
907             return unsafeVoidPromise;
908         }
909 
910         protected final boolean ensureOpen(ChannelPromise promise) {
911             if (isOpen()) {
912                 return true;
913             }
914 
915             safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
916             return false;
917         }
918 
919         /**
920          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
921          */
922         protected final void safeSetSuccess(ChannelPromise promise) {
923             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
924                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
925             }
926         }
927 
928         /**
929          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
930          */
931         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
932             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
933                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
934             }
935         }
936 
937         protected final void closeIfClosed() {
938             if (isOpen()) {
939                 return;
940             }
941             close(voidPromise());
942         }
943 
944         private void invokeLater(Runnable task) {
945             try {
946                 // This method is used by outbound operation implementations to trigger an inbound event later.
947                 // They do not trigger an inbound event immediately because an outbound operation might have been
948                 // triggered by another inbound event handler method.  If fired immediately, the call stack
949                 // will look like this for example:
950                 //
951                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
952                 //   -> handlerA.ctx.close()
953                 //      -> channel.unsafe.close()
954                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
955                 //
956                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
957                 eventLoop().execute(task);
958             } catch (RejectedExecutionException e) {
959                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
960             }
961         }
962 
963         /**
964          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
965          */
966         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
967             if (cause instanceof ConnectException) {
968                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
969             }
970             if (cause instanceof NoRouteToHostException) {
971                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
972             }
973             if (cause instanceof SocketException) {
974                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
975             }
976 
977             return cause;
978         }
979 
980         /**
981          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
982          * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
983          * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
984          * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
985          */
986         protected Executor prepareToClose() {
987             return null;
988         }
989     }
990 
991     /**
992      * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
993      */
994     protected abstract boolean isCompatible(EventLoop loop);
995 
996     /**
997      * Returns the {@link SocketAddress} which is bound locally.
998      */
999     protected abstract SocketAddress localAddress0();
1000 
1001     /**
1002      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
1003      */
1004     protected abstract SocketAddress remoteAddress0();
1005 
1006     /**
1007      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
1008      *
1009      * Sub-classes may override this method
1010      */
1011     protected void doRegister() throws Exception {
1012         // NOOP
1013     }
1014 
1015     /**
1016      * Bind the {@link Channel} to the {@link SocketAddress}
1017      */
1018     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1019 
1020     /**
1021      * Disconnect this {@link Channel} from its remote peer
1022      */
1023     protected abstract void doDisconnect() throws Exception;
1024 
1025     /**
1026      * Close the {@link Channel}
1027      */
1028     protected abstract void doClose() throws Exception;
1029 
1030     /**
1031      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
1032      * operation throws an exception.
1033      */
1034     @UnstableApi
1035     protected void doShutdownOutput() throws Exception {
1036         doClose();
1037     }
1038 
1039     /**
1040      * Deregister the {@link Channel} from its {@link EventLoop}.
1041      *
1042      * Sub-classes may override this method
1043      */
1044     protected void doDeregister() throws Exception {
1045         // NOOP
1046     }
1047 
1048     /**
1049      * Schedule a read operation.
1050      */
1051     protected abstract void doBeginRead() throws Exception;
1052 
1053     /**
1054      * Flush the content of the given buffer to the remote peer.
1055      */
1056     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1057 
1058     /**
1059      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1060      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1061      */
1062     protected Object filterOutboundMessage(Object msg) throws Exception {
1063         return msg;
1064     }
1065 
1066     static final class CloseFuture extends DefaultChannelPromise {
1067 
1068         CloseFuture(AbstractChannel ch) {
1069             super(ch);
1070         }
1071 
1072         @Override
1073         public ChannelPromise setSuccess() {
1074             throw new IllegalStateException();
1075         }
1076 
1077         @Override
1078         public ChannelPromise setFailure(Throwable cause) {
1079             throw new IllegalStateException();
1080         }
1081 
1082         @Override
1083         public boolean trySuccess() {
1084             throw new IllegalStateException();
1085         }
1086 
1087         @Override
1088         public boolean tryFailure(Throwable cause) {
1089             throw new IllegalStateException();
1090         }
1091 
1092         boolean setClosed() {
1093             return super.trySuccess();
1094         }
1095     }
1096 
1097     private static final class AnnotatedConnectException extends ConnectException {
1098 
1099         private static final long serialVersionUID = 3901958112696433556L;
1100 
1101         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1102             super(exception.getMessage() + ": " + remoteAddress);
1103             initCause(exception);
1104             setStackTrace(exception.getStackTrace());
1105         }
1106 
1107         @Override
1108         public Throwable fillInStackTrace() {
1109             return this;
1110         }
1111     }
1112 
1113     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1114 
1115         private static final long serialVersionUID = -6801433937592080623L;
1116 
1117         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1118             super(exception.getMessage() + ": " + remoteAddress);
1119             initCause(exception);
1120             setStackTrace(exception.getStackTrace());
1121         }
1122 
1123         @Override
1124         public Throwable fillInStackTrace() {
1125             return this;
1126         }
1127     }
1128 
1129     private static final class AnnotatedSocketException extends SocketException {
1130 
1131         private static final long serialVersionUID = 3896743275010454039L;
1132 
1133         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1134             super(exception.getMessage() + ": " + remoteAddress);
1135             initCause(exception);
1136             setStackTrace(exception.getStackTrace());
1137         }
1138 
1139         @Override
1140         public Throwable fillInStackTrace() {
1141             return this;
1142         }
1143     }
1144 }