View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.socket.ChannelOutputShutdownEvent;
20  import io.netty.channel.socket.ChannelOutputShutdownException;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.ThrowableUtil;
25  import io.netty.util.internal.UnstableApi;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.io.IOException;
30  import java.net.ConnectException;
31  import java.net.InetSocketAddress;
32  import java.net.NoRouteToHostException;
33  import java.net.SocketAddress;
34  import java.net.SocketException;
35  import java.nio.channels.ClosedChannelException;
36  import java.nio.channels.NotYetConnectedException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.RejectedExecutionException;
39  
40  /**
41   * A skeletal {@link Channel} implementation.
42   */
43  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44  
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46  
47      private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
48              new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
49      private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
50              new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
51      private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
52              new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
53      private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
54              new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
55      private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
56              new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
57  
58      private final Channel parent;
59      private final ChannelId id;
60      private final Unsafe unsafe;
61      private final DefaultChannelPipeline pipeline;
62      private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
63      private final CloseFuture closeFuture = new CloseFuture(this);
64  
65      private volatile SocketAddress localAddress;
66      private volatile SocketAddress remoteAddress;
67      private volatile EventLoop eventLoop;
68      private volatile boolean registered;
69      private boolean closeInitiated;
70  
71      /** Cache for the string representation of this channel */
72      private boolean strValActive;
73      private String strVal;
74  
75      /**
76       * Creates a new instance.
77       *
78       * @param parent
79       *        the parent of this channel. {@code null} if there's no parent.
80       */
81      protected AbstractChannel(Channel parent) {
82          this.parent = parent;
83          id = newId();
84          unsafe = newUnsafe();
85          pipeline = newChannelPipeline();
86      }
87  
88      /**
89       * Creates a new instance.
90       *
91       * @param parent
92       *        the parent of this channel. {@code null} if there's no parent.
93       */
94      protected AbstractChannel(Channel parent, ChannelId id) {
95          this.parent = parent;
96          this.id = id;
97          unsafe = newUnsafe();
98          pipeline = newChannelPipeline();
99      }
100 
101     @Override
102     public final ChannelId id() {
103         return id;
104     }
105 
106     /**
107      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
108      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
109      */
110     protected ChannelId newId() {
111         return DefaultChannelId.newInstance();
112     }
113 
114     /**
115      * Returns a new {@link DefaultChannelPipeline} instance.
116      */
117     protected DefaultChannelPipeline newChannelPipeline() {
118         return new DefaultChannelPipeline(this);
119     }
120 
121     @Override
122     public boolean isWritable() {
123         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
124         return buf != null && buf.isWritable();
125     }
126 
127     @Override
128     public long bytesBeforeUnwritable() {
129         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
130         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
131         // We should be consistent with that here.
132         return buf != null ? buf.bytesBeforeUnwritable() : 0;
133     }
134 
135     @Override
136     public long bytesBeforeWritable() {
137         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
138         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
139         // We should be consistent with that here.
140         return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
141     }
142 
143     @Override
144     public Channel parent() {
145         return parent;
146     }
147 
148     @Override
149     public ChannelPipeline pipeline() {
150         return pipeline;
151     }
152 
153     @Override
154     public ByteBufAllocator alloc() {
155         return config().getAllocator();
156     }
157 
158     @Override
159     public EventLoop eventLoop() {
160         EventLoop eventLoop = this.eventLoop;
161         if (eventLoop == null) {
162             throw new IllegalStateException("channel not registered to an event loop");
163         }
164         return eventLoop;
165     }
166 
167     @Override
168     public SocketAddress localAddress() {
169         SocketAddress localAddress = this.localAddress;
170         if (localAddress == null) {
171             try {
172                 this.localAddress = localAddress = unsafe().localAddress();
173             } catch (Error e) {
174                 throw e;
175             } catch (Throwable t) {
176                 // Sometimes fails on a closed socket in Windows.
177                 return null;
178             }
179         }
180         return localAddress;
181     }
182 
183     /**
184      * @deprecated no use-case for this.
185      */
186     @Deprecated
187     protected void invalidateLocalAddress() {
188         localAddress = null;
189     }
190 
191     @Override
192     public SocketAddress remoteAddress() {
193         SocketAddress remoteAddress = this.remoteAddress;
194         if (remoteAddress == null) {
195             try {
196                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
197             } catch (Error e) {
198                 throw e;
199             } catch (Throwable t) {
200                 // Sometimes fails on a closed socket in Windows.
201                 return null;
202             }
203         }
204         return remoteAddress;
205     }
206 
207     /**
208      * @deprecated no use-case for this.
209      */
210     @Deprecated
211     protected void invalidateRemoteAddress() {
212         remoteAddress = null;
213     }
214 
215     @Override
216     public boolean isRegistered() {
217         return registered;
218     }
219 
220     @Override
221     public ChannelFuture bind(SocketAddress localAddress) {
222         return pipeline.bind(localAddress);
223     }
224 
225     @Override
226     public ChannelFuture connect(SocketAddress remoteAddress) {
227         return pipeline.connect(remoteAddress);
228     }
229 
230     @Override
231     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
232         return pipeline.connect(remoteAddress, localAddress);
233     }
234 
235     @Override
236     public ChannelFuture disconnect() {
237         return pipeline.disconnect();
238     }
239 
240     @Override
241     public ChannelFuture close() {
242         return pipeline.close();
243     }
244 
245     @Override
246     public ChannelFuture deregister() {
247         return pipeline.deregister();
248     }
249 
250     @Override
251     public Channel flush() {
252         pipeline.flush();
253         return this;
254     }
255 
256     @Override
257     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
258         return pipeline.bind(localAddress, promise);
259     }
260 
261     @Override
262     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
263         return pipeline.connect(remoteAddress, promise);
264     }
265 
266     @Override
267     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
268         return pipeline.connect(remoteAddress, localAddress, promise);
269     }
270 
271     @Override
272     public ChannelFuture disconnect(ChannelPromise promise) {
273         return pipeline.disconnect(promise);
274     }
275 
276     @Override
277     public ChannelFuture close(ChannelPromise promise) {
278         return pipeline.close(promise);
279     }
280 
281     @Override
282     public ChannelFuture deregister(ChannelPromise promise) {
283         return pipeline.deregister(promise);
284     }
285 
286     @Override
287     public Channel read() {
288         pipeline.read();
289         return this;
290     }
291 
292     @Override
293     public ChannelFuture write(Object msg) {
294         return pipeline.write(msg);
295     }
296 
297     @Override
298     public ChannelFuture write(Object msg, ChannelPromise promise) {
299         return pipeline.write(msg, promise);
300     }
301 
302     @Override
303     public ChannelFuture writeAndFlush(Object msg) {
304         return pipeline.writeAndFlush(msg);
305     }
306 
307     @Override
308     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
309         return pipeline.writeAndFlush(msg, promise);
310     }
311 
312     @Override
313     public ChannelPromise newPromise() {
314         return pipeline.newPromise();
315     }
316 
317     @Override
318     public ChannelProgressivePromise newProgressivePromise() {
319         return pipeline.newProgressivePromise();
320     }
321 
322     @Override
323     public ChannelFuture newSucceededFuture() {
324         return pipeline.newSucceededFuture();
325     }
326 
327     @Override
328     public ChannelFuture newFailedFuture(Throwable cause) {
329         return pipeline.newFailedFuture(cause);
330     }
331 
332     @Override
333     public ChannelFuture closeFuture() {
334         return closeFuture;
335     }
336 
337     @Override
338     public Unsafe unsafe() {
339         return unsafe;
340     }
341 
342     /**
343      * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
344      */
345     protected abstract AbstractUnsafe newUnsafe();
346 
347     /**
348      * Returns the ID of this channel.
349      */
350     @Override
351     public final int hashCode() {
352         return id.hashCode();
353     }
354 
355     /**
356      * Returns {@code true} if and only if the specified object is identical
357      * with this channel (i.e: {@code this == o}).
358      */
359     @Override
360     public final boolean equals(Object o) {
361         return this == o;
362     }
363 
364     @Override
365     public final int compareTo(Channel o) {
366         if (this == o) {
367             return 0;
368         }
369 
370         return id().compareTo(o.id());
371     }
372 
373     /**
374      * Returns the {@link String} representation of this channel.  The returned
375      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
376      * and {@linkplain #remoteAddress() remote address} of this channel for
377      * easier identification.
378      */
379     @Override
380     public String toString() {
381         boolean active = isActive();
382         if (strValActive == active && strVal != null) {
383             return strVal;
384         }
385 
386         SocketAddress remoteAddr = remoteAddress();
387         SocketAddress localAddr = localAddress();
388         if (remoteAddr != null) {
389             StringBuilder buf = new StringBuilder(96)
390                 .append("[id: 0x")
391                 .append(id.asShortText())
392                 .append(", L:")
393                 .append(localAddr)
394                 .append(active? " - " : " ! ")
395                 .append("R:")
396                 .append(remoteAddr)
397                 .append(']');
398             strVal = buf.toString();
399         } else if (localAddr != null) {
400             StringBuilder buf = new StringBuilder(64)
401                 .append("[id: 0x")
402                 .append(id.asShortText())
403                 .append(", L:")
404                 .append(localAddr)
405                 .append(']');
406             strVal = buf.toString();
407         } else {
408             StringBuilder buf = new StringBuilder(16)
409                 .append("[id: 0x")
410                 .append(id.asShortText())
411                 .append(']');
412             strVal = buf.toString();
413         }
414 
415         strValActive = active;
416         return strVal;
417     }
418 
419     @Override
420     public final ChannelPromise voidPromise() {
421         return pipeline.voidPromise();
422     }
423 
424     /**
425      * {@link Unsafe} implementation which sub-classes must extend and use.
426      */
427     protected abstract class AbstractUnsafe implements Unsafe {
428 
429         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
430         private RecvByteBufAllocator.Handle recvHandle;
431         private boolean inFlush0;
432         /** true if the channel has never been registered, false otherwise */
433         private boolean neverRegistered = true;
434 
435         private void assertEventLoop() {
436             assert !registered || eventLoop.inEventLoop();
437         }
438 
439         @Override
440         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
441             if (recvHandle == null) {
442                 recvHandle = config().getRecvByteBufAllocator().newHandle();
443             }
444             return recvHandle;
445         }
446 
447         @Override
448         public final ChannelOutboundBuffer outboundBuffer() {
449             return outboundBuffer;
450         }
451 
452         @Override
453         public final SocketAddress localAddress() {
454             return localAddress0();
455         }
456 
457         @Override
458         public final SocketAddress remoteAddress() {
459             return remoteAddress0();
460         }
461 
462         @Override
463         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
464             if (eventLoop == null) {
465                 throw new NullPointerException("eventLoop");
466             }
467             if (isRegistered()) {
468                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
469                 return;
470             }
471             if (!isCompatible(eventLoop)) {
472                 promise.setFailure(
473                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
474                 return;
475             }
476 
477             AbstractChannel.this.eventLoop = eventLoop;
478 
479             if (eventLoop.inEventLoop()) {
480                 register0(promise);
481             } else {
482                 try {
483                     eventLoop.execute(new Runnable() {
484                         @Override
485                         public void run() {
486                             register0(promise);
487                         }
488                     });
489                 } catch (Throwable t) {
490                     logger.warn(
491                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
492                             AbstractChannel.this, t);
493                     closeForcibly();
494                     closeFuture.setClosed();
495                     safeSetFailure(promise, t);
496                 }
497             }
498         }
499 
500         private void register0(ChannelPromise promise) {
501             try {
502                 // check if the channel is still open as it could be closed in the mean time when the register
503                 // call was outside of the eventLoop
504                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
505                     return;
506                 }
507                 boolean firstRegistration = neverRegistered;
508                 doRegister();
509                 neverRegistered = false;
510                 registered = true;
511 
512                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
513                 // user may already fire events through the pipeline in the ChannelFutureListener.
514                 pipeline.invokeHandlerAddedIfNeeded();
515 
516                 safeSetSuccess(promise);
517                 pipeline.fireChannelRegistered();
518                 // Only fire a channelActive if the channel has never been registered. This prevents firing
519                 // multiple channel actives if the channel is deregistered and re-registered.
520                 if (isActive()) {
521                     if (firstRegistration) {
522                         pipeline.fireChannelActive();
523                     } else if (config().isAutoRead()) {
524                         // This channel was registered before and autoRead() is set. This means we need to begin read
525                         // again so that we process inbound data.
526                         //
527                         // See https://github.com/netty/netty/issues/4805
528                         beginRead();
529                     }
530                 }
531             } catch (Throwable t) {
532                 // Close the channel directly to avoid FD leak.
533                 closeForcibly();
534                 closeFuture.setClosed();
535                 safeSetFailure(promise, t);
536             }
537         }
538 
539         @Override
540         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
541             assertEventLoop();
542 
543             if (!promise.setUncancellable() || !ensureOpen(promise)) {
544                 return;
545             }
546 
547             // See: https://github.com/netty/netty/issues/576
548             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
549                 localAddress instanceof InetSocketAddress &&
550                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
551                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
552                 // Warn a user about the fact that a non-root user can't receive a
553                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
554                 logger.warn(
555                         "A non-root user can't receive a broadcast packet if the socket " +
556                         "is not bound to a wildcard address; binding to a non-wildcard " +
557                         "address (" + localAddress + ") anyway as requested.");
558             }
559 
560             boolean wasActive = isActive();
561             try {
562                 doBind(localAddress);
563             } catch (Throwable t) {
564                 safeSetFailure(promise, t);
565                 closeIfClosed();
566                 return;
567             }
568 
569             if (!wasActive && isActive()) {
570                 invokeLater(new Runnable() {
571                     @Override
572                     public void run() {
573                         pipeline.fireChannelActive();
574                     }
575                 });
576             }
577 
578             safeSetSuccess(promise);
579         }
580 
581         @Override
582         public final void disconnect(final ChannelPromise promise) {
583             assertEventLoop();
584 
585             if (!promise.setUncancellable()) {
586                 return;
587             }
588 
589             boolean wasActive = isActive();
590             try {
591                 doDisconnect();
592             } catch (Throwable t) {
593                 safeSetFailure(promise, t);
594                 closeIfClosed();
595                 return;
596             }
597 
598             if (wasActive && !isActive()) {
599                 invokeLater(new Runnable() {
600                     @Override
601                     public void run() {
602                         pipeline.fireChannelInactive();
603                     }
604                 });
605             }
606 
607             safeSetSuccess(promise);
608             closeIfClosed(); // doDisconnect() might have closed the channel
609         }
610 
611         @Override
612         public final void close(final ChannelPromise promise) {
613             assertEventLoop();
614 
615             close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
616         }
617 
618         /**
619          * Shutdown the output portion of the corresponding {@link Channel}.
620          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
621          */
622         @UnstableApi
623         public final void shutdownOutput(final ChannelPromise promise) {
624             assertEventLoop();
625             shutdownOutput(promise, null);
626         }
627 
628         /**
629          * Shutdown the output portion of the corresponding {@link Channel}.
630          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
631          * @param cause The cause which may provide rational for the shutdown.
632          */
633         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
634             if (!promise.setUncancellable()) {
635                 return;
636             }
637 
638             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
639             if (outboundBuffer == null) {
640                 promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION);
641                 return;
642             }
643             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
644 
645             final Throwable shutdownCause = cause == null ?
646                     new ChannelOutputShutdownException("Channel output shutdown") :
647                     new ChannelOutputShutdownException("Channel output shutdown", cause);
648             Executor closeExecutor = prepareToClose();
649             if (closeExecutor != null) {
650                 closeExecutor.execute(new Runnable() {
651                     @Override
652                     public void run() {
653                         try {
654                             // Execute the shutdown.
655                             doShutdownOutput();
656                             promise.setSuccess();
657                         } catch (Throwable err) {
658                             promise.setFailure(err);
659                         } finally {
660                             // Dispatch to the EventLoop
661                             eventLoop().execute(new Runnable() {
662                                 @Override
663                                 public void run() {
664                                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
665                                 }
666                             });
667                         }
668                     }
669                 });
670             } else {
671                 try {
672                     // Execute the shutdown.
673                     doShutdownOutput();
674                     promise.setSuccess();
675                 } catch (Throwable err) {
676                     promise.setFailure(err);
677                 } finally {
678                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
679                 }
680             }
681         }
682 
683         private void closeOutboundBufferForShutdown(
684                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
685             buffer.failFlushed(cause, false);
686             buffer.close(cause, true);
687             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
688         }
689 
690         private void close(final ChannelPromise promise, final Throwable cause,
691                            final ClosedChannelException closeCause, final boolean notify) {
692             if (!promise.setUncancellable()) {
693                 return;
694             }
695 
696             if (closeInitiated) {
697                 if (closeFuture.isDone()) {
698                     // Closed already.
699                     safeSetSuccess(promise);
700                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
701                     // This means close() was called before so we just register a listener and return
702                     closeFuture.addListener(new ChannelFutureListener() {
703                         @Override
704                         public void operationComplete(ChannelFuture future) throws Exception {
705                             promise.setSuccess();
706                         }
707                     });
708                 }
709                 return;
710             }
711 
712             closeInitiated = true;
713 
714             final boolean wasActive = isActive();
715             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
716             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
717             Executor closeExecutor = prepareToClose();
718             if (closeExecutor != null) {
719                 closeExecutor.execute(new Runnable() {
720                     @Override
721                     public void run() {
722                         try {
723                             // Execute the close.
724                             doClose0(promise);
725                         } finally {
726                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
727                             invokeLater(new Runnable() {
728                                 @Override
729                                 public void run() {
730                                     if (outboundBuffer != null) {
731                                         // Fail all the queued messages
732                                         outboundBuffer.failFlushed(cause, notify);
733                                         outboundBuffer.close(closeCause);
734                                     }
735                                     fireChannelInactiveAndDeregister(wasActive);
736                                 }
737                             });
738                         }
739                     }
740                 });
741             } else {
742                 try {
743                     // Close the channel and fail the queued messages in all cases.
744                     doClose0(promise);
745                 } finally {
746                     if (outboundBuffer != null) {
747                         // Fail all the queued messages.
748                         outboundBuffer.failFlushed(cause, notify);
749                         outboundBuffer.close(closeCause);
750                     }
751                 }
752                 if (inFlush0) {
753                     invokeLater(new Runnable() {
754                         @Override
755                         public void run() {
756                             fireChannelInactiveAndDeregister(wasActive);
757                         }
758                     });
759                 } else {
760                     fireChannelInactiveAndDeregister(wasActive);
761                 }
762             }
763         }
764 
765         private void doClose0(ChannelPromise promise) {
766             try {
767                 doClose();
768                 closeFuture.setClosed();
769                 safeSetSuccess(promise);
770             } catch (Throwable t) {
771                 closeFuture.setClosed();
772                 safeSetFailure(promise, t);
773             }
774         }
775 
776         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
777             deregister(voidPromise(), wasActive && !isActive());
778         }
779 
780         @Override
781         public final void closeForcibly() {
782             assertEventLoop();
783 
784             try {
785                 doClose();
786             } catch (Exception e) {
787                 logger.warn("Failed to close a channel.", e);
788             }
789         }
790 
791         @Override
792         public final void deregister(final ChannelPromise promise) {
793             assertEventLoop();
794 
795             deregister(promise, false);
796         }
797 
798         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
799             if (!promise.setUncancellable()) {
800                 return;
801             }
802 
803             if (!registered) {
804                 safeSetSuccess(promise);
805                 return;
806             }
807 
808             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
809             // we need to ensure we do the actual deregister operation later. This is needed as for example,
810             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
811             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
812             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
813             // threads.
814             //
815             // See:
816             // https://github.com/netty/netty/issues/4435
817             invokeLater(new Runnable() {
818                 @Override
819                 public void run() {
820                     try {
821                         doDeregister();
822                     } catch (Throwable t) {
823                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
824                     } finally {
825                         if (fireChannelInactive) {
826                             pipeline.fireChannelInactive();
827                         }
828                         // Some transports like local and AIO does not allow the deregistration of
829                         // an open channel.  Their doDeregister() calls close(). Consequently,
830                         // close() calls deregister() again - no need to fire channelUnregistered, so check
831                         // if it was registered.
832                         if (registered) {
833                             registered = false;
834                             pipeline.fireChannelUnregistered();
835                         }
836                         safeSetSuccess(promise);
837                     }
838                 }
839             });
840         }
841 
842         @Override
843         public final void beginRead() {
844             assertEventLoop();
845 
846             if (!isActive()) {
847                 return;
848             }
849 
850             try {
851                 doBeginRead();
852             } catch (final Exception e) {
853                 invokeLater(new Runnable() {
854                     @Override
855                     public void run() {
856                         pipeline.fireExceptionCaught(e);
857                     }
858                 });
859                 close(voidPromise());
860             }
861         }
862 
863         @Override
864         public final void write(Object msg, ChannelPromise promise) {
865             assertEventLoop();
866 
867             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
868             if (outboundBuffer == null) {
869                 // If the outboundBuffer is null we know the channel was closed and so
870                 // need to fail the future right away. If it is not null the handling of the rest
871                 // will be done in flush0()
872                 // See https://github.com/netty/netty/issues/2362
873                 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
874                 // release message now to prevent resource-leak
875                 ReferenceCountUtil.release(msg);
876                 return;
877             }
878 
879             int size;
880             try {
881                 msg = filterOutboundMessage(msg);
882                 size = pipeline.estimatorHandle().size(msg);
883                 if (size < 0) {
884                     size = 0;
885                 }
886             } catch (Throwable t) {
887                 safeSetFailure(promise, t);
888                 ReferenceCountUtil.release(msg);
889                 return;
890             }
891 
892             outboundBuffer.addMessage(msg, size, promise);
893         }
894 
895         @Override
896         public final void flush() {
897             assertEventLoop();
898 
899             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
900             if (outboundBuffer == null) {
901                 return;
902             }
903 
904             outboundBuffer.addFlush();
905             flush0();
906         }
907 
908         @SuppressWarnings("deprecation")
909         protected void flush0() {
910             if (inFlush0) {
911                 // Avoid re-entrance
912                 return;
913             }
914 
915             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
916             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
917                 return;
918             }
919 
920             inFlush0 = true;
921 
922             // Mark all pending write requests as failure if the channel is inactive.
923             if (!isActive()) {
924                 try {
925                     if (isOpen()) {
926                         outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
927                     } else {
928                         // Do not trigger channelWritabilityChanged because the channel is closed already.
929                         outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
930                     }
931                 } finally {
932                     inFlush0 = false;
933                 }
934                 return;
935             }
936 
937             try {
938                 doWrite(outboundBuffer);
939             } catch (Throwable t) {
940                 if (t instanceof IOException && config().isAutoClose()) {
941                     /**
942                      * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
943                      * failing all flushed messages and also ensure the actual close of the underlying transport
944                      * will happen before the promises are notified.
945                      *
946                      * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
947                      * may still return {@code true} even if the channel should be closed as result of the exception.
948                      */
949                     close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
950                 } else {
951                     try {
952                         shutdownOutput(voidPromise(), t);
953                     } catch (Throwable t2) {
954                         close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
955                     }
956                 }
957             } finally {
958                 inFlush0 = false;
959             }
960         }
961 
962         @Override
963         public final ChannelPromise voidPromise() {
964             assertEventLoop();
965 
966             return unsafeVoidPromise;
967         }
968 
969         protected final boolean ensureOpen(ChannelPromise promise) {
970             if (isOpen()) {
971                 return true;
972             }
973 
974             safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
975             return false;
976         }
977 
978         /**
979          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
980          */
981         protected final void safeSetSuccess(ChannelPromise promise) {
982             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
983                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
984             }
985         }
986 
987         /**
988          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
989          */
990         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
991             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
992                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
993             }
994         }
995 
996         protected final void closeIfClosed() {
997             if (isOpen()) {
998                 return;
999             }
1000             close(voidPromise());
1001         }
1002 
1003         private void invokeLater(Runnable task) {
1004             try {
1005                 // This method is used by outbound operation implementations to trigger an inbound event later.
1006                 // They do not trigger an inbound event immediately because an outbound operation might have been
1007                 // triggered by another inbound event handler method.  If fired immediately, the call stack
1008                 // will look like this for example:
1009                 //
1010                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1011                 //   -> handlerA.ctx.close()
1012                 //      -> channel.unsafe.close()
1013                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1014                 //
1015                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1016                 eventLoop().execute(task);
1017             } catch (RejectedExecutionException e) {
1018                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1019             }
1020         }
1021 
1022         /**
1023          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1024          */
1025         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1026             if (cause instanceof ConnectException) {
1027                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1028             }
1029             if (cause instanceof NoRouteToHostException) {
1030                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1031             }
1032             if (cause instanceof SocketException) {
1033                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1034             }
1035 
1036             return cause;
1037         }
1038 
1039         /**
1040          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
1041          * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
1042          * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
1043          * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
1044          */
1045         protected Executor prepareToClose() {
1046             return null;
1047         }
1048     }
1049 
1050     /**
1051      * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
1052      */
1053     protected abstract boolean isCompatible(EventLoop loop);
1054 
1055     /**
1056      * Returns the {@link SocketAddress} which is bound locally.
1057      */
1058     protected abstract SocketAddress localAddress0();
1059 
1060     /**
1061      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
1062      */
1063     protected abstract SocketAddress remoteAddress0();
1064 
1065     /**
1066      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
1067      *
1068      * Sub-classes may override this method
1069      */
1070     protected void doRegister() throws Exception {
1071         // NOOP
1072     }
1073 
1074     /**
1075      * Bind the {@link Channel} to the {@link SocketAddress}
1076      */
1077     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1078 
1079     /**
1080      * Disconnect this {@link Channel} from its remote peer
1081      */
1082     protected abstract void doDisconnect() throws Exception;
1083 
1084     /**
1085      * Close the {@link Channel}
1086      */
1087     protected abstract void doClose() throws Exception;
1088 
1089     /**
1090      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
1091      * operation throws an exception.
1092      */
1093     @UnstableApi
1094     protected void doShutdownOutput() throws Exception {
1095         doClose();
1096     }
1097 
1098     /**
1099      * Deregister the {@link Channel} from its {@link EventLoop}.
1100      *
1101      * Sub-classes may override this method
1102      */
1103     protected void doDeregister() throws Exception {
1104         // NOOP
1105     }
1106 
1107     /**
1108      * Schedule a read operation.
1109      */
1110     protected abstract void doBeginRead() throws Exception;
1111 
1112     /**
1113      * Flush the content of the given buffer to the remote peer.
1114      */
1115     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1116 
1117     /**
1118      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1119      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1120      */
1121     protected Object filterOutboundMessage(Object msg) throws Exception {
1122         return msg;
1123     }
1124 
1125     static final class CloseFuture extends DefaultChannelPromise {
1126 
1127         CloseFuture(AbstractChannel ch) {
1128             super(ch);
1129         }
1130 
1131         @Override
1132         public ChannelPromise setSuccess() {
1133             throw new IllegalStateException();
1134         }
1135 
1136         @Override
1137         public ChannelPromise setFailure(Throwable cause) {
1138             throw new IllegalStateException();
1139         }
1140 
1141         @Override
1142         public boolean trySuccess() {
1143             throw new IllegalStateException();
1144         }
1145 
1146         @Override
1147         public boolean tryFailure(Throwable cause) {
1148             throw new IllegalStateException();
1149         }
1150 
1151         boolean setClosed() {
1152             return super.trySuccess();
1153         }
1154     }
1155 
1156     private static final class AnnotatedConnectException extends ConnectException {
1157 
1158         private static final long serialVersionUID = 3901958112696433556L;
1159 
1160         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1161             super(exception.getMessage() + ": " + remoteAddress);
1162             initCause(exception);
1163             setStackTrace(exception.getStackTrace());
1164         }
1165 
1166         @Override
1167         public Throwable fillInStackTrace() {
1168             return this;
1169         }
1170     }
1171 
1172     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1173 
1174         private static final long serialVersionUID = -6801433937592080623L;
1175 
1176         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1177             super(exception.getMessage() + ": " + remoteAddress);
1178             initCause(exception);
1179             setStackTrace(exception.getStackTrace());
1180         }
1181 
1182         @Override
1183         public Throwable fillInStackTrace() {
1184             return this;
1185         }
1186     }
1187 
1188     private static final class AnnotatedSocketException extends SocketException {
1189 
1190         private static final long serialVersionUID = 3896743275010454039L;
1191 
1192         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1193             super(exception.getMessage() + ": " + remoteAddress);
1194             initCause(exception);
1195             setStackTrace(exception.getStackTrace());
1196         }
1197 
1198         @Override
1199         public Throwable fillInStackTrace() {
1200             return this;
1201         }
1202     }
1203 }