View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.socket.ChannelOutputShutdownEvent;
20  import io.netty.channel.socket.ChannelOutputShutdownException;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.ThrowableUtil;
25  import io.netty.util.internal.UnstableApi;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.io.IOException;
30  import java.net.ConnectException;
31  import java.net.InetSocketAddress;
32  import java.net.NoRouteToHostException;
33  import java.net.SocketAddress;
34  import java.net.SocketException;
35  import java.nio.channels.ClosedChannelException;
36  import java.nio.channels.NotYetConnectedException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.RejectedExecutionException;
39  
40  /**
41   * A skeletal {@link Channel} implementation.
42   */
43  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44  
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46  
47      private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
48              new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
49      private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
50              new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
51      private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
52              new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
53      private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
54              new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
55      private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
56              new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
57  
58      private final Channel parent;
59      private final ChannelId id;
60      private final Unsafe unsafe;
61      private final DefaultChannelPipeline pipeline;
62      private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
63      private final CloseFuture closeFuture = new CloseFuture(this);
64  
65      private volatile SocketAddress localAddress;
66      private volatile SocketAddress remoteAddress;
67      private volatile EventLoop eventLoop;
68      private volatile boolean registered;
69      private boolean closeInitiated;
70  
71      /** Cache for the string representation of this channel */
72      private boolean strValActive;
73      private String strVal;
74  
75      /**
76       * Creates a new instance.
77       *
78       * @param parent
79       *        the parent of this channel. {@code null} if there's no parent.
80       */
81      protected AbstractChannel(Channel parent) {
82          this.parent = parent;
83          id = newId();
84          unsafe = newUnsafe();
85          pipeline = newChannelPipeline();
86      }
87  
88      /**
89       * Creates a new instance.
90       *
91       * @param parent
92       *        the parent of this channel. {@code null} if there's no parent.
93       */
94      protected AbstractChannel(Channel parent, ChannelId id) {
95          this.parent = parent;
96          this.id = id;
97          unsafe = newUnsafe();
98          pipeline = newChannelPipeline();
99      }
100 
101     @Override
102     public final ChannelId id() {
103         return id;
104     }
105 
106     /**
107      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
108      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
109      */
110     protected ChannelId newId() {
111         return DefaultChannelId.newInstance();
112     }
113 
114     /**
115      * Returns a new {@link DefaultChannelPipeline} instance.
116      */
117     protected DefaultChannelPipeline newChannelPipeline() {
118         return new DefaultChannelPipeline(this);
119     }
120 
121     @Override
122     public boolean isWritable() {
123         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
124         return buf != null && buf.isWritable();
125     }
126 
127     @Override
128     public long bytesBeforeUnwritable() {
129         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
130         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
131         // We should be consistent with that here.
132         return buf != null ? buf.bytesBeforeUnwritable() : 0;
133     }
134 
135     @Override
136     public long bytesBeforeWritable() {
137         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
138         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
139         // We should be consistent with that here.
140         return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
141     }
142 
143     @Override
144     public Channel parent() {
145         return parent;
146     }
147 
148     @Override
149     public ChannelPipeline pipeline() {
150         return pipeline;
151     }
152 
153     @Override
154     public ByteBufAllocator alloc() {
155         return config().getAllocator();
156     }
157 
158     @Override
159     public EventLoop eventLoop() {
160         EventLoop eventLoop = this.eventLoop;
161         if (eventLoop == null) {
162             throw new IllegalStateException("channel not registered to an event loop");
163         }
164         return eventLoop;
165     }
166 
167     @Override
168     public SocketAddress localAddress() {
169         SocketAddress localAddress = this.localAddress;
170         if (localAddress == null) {
171             try {
172                 this.localAddress = localAddress = unsafe().localAddress();
173             } catch (Throwable t) {
174                 // Sometimes fails on a closed socket in Windows.
175                 return null;
176             }
177         }
178         return localAddress;
179     }
180 
181     /**
182      * @deprecated no use-case for this.
183      */
184     @Deprecated
185     protected void invalidateLocalAddress() {
186         localAddress = null;
187     }
188 
189     @Override
190     public SocketAddress remoteAddress() {
191         SocketAddress remoteAddress = this.remoteAddress;
192         if (remoteAddress == null) {
193             try {
194                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
195             } catch (Throwable t) {
196                 // Sometimes fails on a closed socket in Windows.
197                 return null;
198             }
199         }
200         return remoteAddress;
201     }
202 
203     /**
204      * @deprecated no use-case for this.
205      */
206     @Deprecated
207     protected void invalidateRemoteAddress() {
208         remoteAddress = null;
209     }
210 
211     @Override
212     public boolean isRegistered() {
213         return registered;
214     }
215 
216     @Override
217     public ChannelFuture bind(SocketAddress localAddress) {
218         return pipeline.bind(localAddress);
219     }
220 
221     @Override
222     public ChannelFuture connect(SocketAddress remoteAddress) {
223         return pipeline.connect(remoteAddress);
224     }
225 
226     @Override
227     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
228         return pipeline.connect(remoteAddress, localAddress);
229     }
230 
231     @Override
232     public ChannelFuture disconnect() {
233         return pipeline.disconnect();
234     }
235 
236     @Override
237     public ChannelFuture close() {
238         return pipeline.close();
239     }
240 
241     @Override
242     public ChannelFuture deregister() {
243         return pipeline.deregister();
244     }
245 
246     @Override
247     public Channel flush() {
248         pipeline.flush();
249         return this;
250     }
251 
252     @Override
253     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
254         return pipeline.bind(localAddress, promise);
255     }
256 
257     @Override
258     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
259         return pipeline.connect(remoteAddress, promise);
260     }
261 
262     @Override
263     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
264         return pipeline.connect(remoteAddress, localAddress, promise);
265     }
266 
267     @Override
268     public ChannelFuture disconnect(ChannelPromise promise) {
269         return pipeline.disconnect(promise);
270     }
271 
272     @Override
273     public ChannelFuture close(ChannelPromise promise) {
274         return pipeline.close(promise);
275     }
276 
277     @Override
278     public ChannelFuture deregister(ChannelPromise promise) {
279         return pipeline.deregister(promise);
280     }
281 
282     @Override
283     public Channel read() {
284         pipeline.read();
285         return this;
286     }
287 
288     @Override
289     public ChannelFuture write(Object msg) {
290         return pipeline.write(msg);
291     }
292 
293     @Override
294     public ChannelFuture write(Object msg, ChannelPromise promise) {
295         return pipeline.write(msg, promise);
296     }
297 
298     @Override
299     public ChannelFuture writeAndFlush(Object msg) {
300         return pipeline.writeAndFlush(msg);
301     }
302 
303     @Override
304     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
305         return pipeline.writeAndFlush(msg, promise);
306     }
307 
308     @Override
309     public ChannelPromise newPromise() {
310         return pipeline.newPromise();
311     }
312 
313     @Override
314     public ChannelProgressivePromise newProgressivePromise() {
315         return pipeline.newProgressivePromise();
316     }
317 
318     @Override
319     public ChannelFuture newSucceededFuture() {
320         return pipeline.newSucceededFuture();
321     }
322 
323     @Override
324     public ChannelFuture newFailedFuture(Throwable cause) {
325         return pipeline.newFailedFuture(cause);
326     }
327 
328     @Override
329     public ChannelFuture closeFuture() {
330         return closeFuture;
331     }
332 
333     @Override
334     public Unsafe unsafe() {
335         return unsafe;
336     }
337 
338     /**
339      * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
340      */
341     protected abstract AbstractUnsafe newUnsafe();
342 
343     /**
344      * Returns the ID of this channel.
345      */
346     @Override
347     public final int hashCode() {
348         return id.hashCode();
349     }
350 
351     /**
352      * Returns {@code true} if and only if the specified object is identical
353      * with this channel (i.e: {@code this == o}).
354      */
355     @Override
356     public final boolean equals(Object o) {
357         return this == o;
358     }
359 
360     @Override
361     public final int compareTo(Channel o) {
362         if (this == o) {
363             return 0;
364         }
365 
366         return id().compareTo(o.id());
367     }
368 
369     /**
370      * Returns the {@link String} representation of this channel.  The returned
371      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
372      * and {@linkplain #remoteAddress() remote address} of this channel for
373      * easier identification.
374      */
375     @Override
376     public String toString() {
377         boolean active = isActive();
378         if (strValActive == active && strVal != null) {
379             return strVal;
380         }
381 
382         SocketAddress remoteAddr = remoteAddress();
383         SocketAddress localAddr = localAddress();
384         if (remoteAddr != null) {
385             StringBuilder buf = new StringBuilder(96)
386                 .append("[id: 0x")
387                 .append(id.asShortText())
388                 .append(", L:")
389                 .append(localAddr)
390                 .append(active? " - " : " ! ")
391                 .append("R:")
392                 .append(remoteAddr)
393                 .append(']');
394             strVal = buf.toString();
395         } else if (localAddr != null) {
396             StringBuilder buf = new StringBuilder(64)
397                 .append("[id: 0x")
398                 .append(id.asShortText())
399                 .append(", L:")
400                 .append(localAddr)
401                 .append(']');
402             strVal = buf.toString();
403         } else {
404             StringBuilder buf = new StringBuilder(16)
405                 .append("[id: 0x")
406                 .append(id.asShortText())
407                 .append(']');
408             strVal = buf.toString();
409         }
410 
411         strValActive = active;
412         return strVal;
413     }
414 
415     @Override
416     public final ChannelPromise voidPromise() {
417         return pipeline.voidPromise();
418     }
419 
420     /**
421      * {@link Unsafe} implementation which sub-classes must extend and use.
422      */
423     protected abstract class AbstractUnsafe implements Unsafe {
424 
425         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
426         private RecvByteBufAllocator.Handle recvHandle;
427         private boolean inFlush0;
428         /** true if the channel has never been registered, false otherwise */
429         private boolean neverRegistered = true;
430 
431         private void assertEventLoop() {
432             assert !registered || eventLoop.inEventLoop();
433         }
434 
435         @Override
436         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
437             if (recvHandle == null) {
438                 recvHandle = config().getRecvByteBufAllocator().newHandle();
439             }
440             return recvHandle;
441         }
442 
443         @Override
444         public final ChannelOutboundBuffer outboundBuffer() {
445             return outboundBuffer;
446         }
447 
448         @Override
449         public final SocketAddress localAddress() {
450             return localAddress0();
451         }
452 
453         @Override
454         public final SocketAddress remoteAddress() {
455             return remoteAddress0();
456         }
457 
458         @Override
459         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
460             if (eventLoop == null) {
461                 throw new NullPointerException("eventLoop");
462             }
463             if (isRegistered()) {
464                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
465                 return;
466             }
467             if (!isCompatible(eventLoop)) {
468                 promise.setFailure(
469                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
470                 return;
471             }
472 
473             AbstractChannel.this.eventLoop = eventLoop;
474 
475             if (eventLoop.inEventLoop()) {
476                 register0(promise);
477             } else {
478                 try {
479                     eventLoop.execute(new Runnable() {
480                         @Override
481                         public void run() {
482                             register0(promise);
483                         }
484                     });
485                 } catch (Throwable t) {
486                     logger.warn(
487                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
488                             AbstractChannel.this, t);
489                     closeForcibly();
490                     closeFuture.setClosed();
491                     safeSetFailure(promise, t);
492                 }
493             }
494         }
495 
496         private void register0(ChannelPromise promise) {
497             try {
498                 // check if the channel is still open as it could be closed in the mean time when the register
499                 // call was outside of the eventLoop
500                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
501                     return;
502                 }
503                 boolean firstRegistration = neverRegistered;
504                 doRegister();
505                 neverRegistered = false;
506                 registered = true;
507 
508                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
509                 // user may already fire events through the pipeline in the ChannelFutureListener.
510                 pipeline.invokeHandlerAddedIfNeeded();
511 
512                 safeSetSuccess(promise);
513                 pipeline.fireChannelRegistered();
514                 // Only fire a channelActive if the channel has never been registered. This prevents firing
515                 // multiple channel actives if the channel is deregistered and re-registered.
516                 if (isActive()) {
517                     if (firstRegistration) {
518                         pipeline.fireChannelActive();
519                     } else if (config().isAutoRead()) {
520                         // This channel was registered before and autoRead() is set. This means we need to begin read
521                         // again so that we process inbound data.
522                         //
523                         // See https://github.com/netty/netty/issues/4805
524                         beginRead();
525                     }
526                 }
527             } catch (Throwable t) {
528                 // Close the channel directly to avoid FD leak.
529                 closeForcibly();
530                 closeFuture.setClosed();
531                 safeSetFailure(promise, t);
532             }
533         }
534 
535         @Override
536         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
537             assertEventLoop();
538 
539             if (!promise.setUncancellable() || !ensureOpen(promise)) {
540                 return;
541             }
542 
543             // See: https://github.com/netty/netty/issues/576
544             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
545                 localAddress instanceof InetSocketAddress &&
546                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
547                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
548                 // Warn a user about the fact that a non-root user can't receive a
549                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
550                 logger.warn(
551                         "A non-root user can't receive a broadcast packet if the socket " +
552                         "is not bound to a wildcard address; binding to a non-wildcard " +
553                         "address (" + localAddress + ") anyway as requested.");
554             }
555 
556             boolean wasActive = isActive();
557             try {
558                 doBind(localAddress);
559             } catch (Throwable t) {
560                 safeSetFailure(promise, t);
561                 closeIfClosed();
562                 return;
563             }
564 
565             if (!wasActive && isActive()) {
566                 invokeLater(new Runnable() {
567                     @Override
568                     public void run() {
569                         pipeline.fireChannelActive();
570                     }
571                 });
572             }
573 
574             safeSetSuccess(promise);
575         }
576 
577         @Override
578         public final void disconnect(final ChannelPromise promise) {
579             assertEventLoop();
580 
581             if (!promise.setUncancellable()) {
582                 return;
583             }
584 
585             boolean wasActive = isActive();
586             try {
587                 doDisconnect();
588             } catch (Throwable t) {
589                 safeSetFailure(promise, t);
590                 closeIfClosed();
591                 return;
592             }
593 
594             if (wasActive && !isActive()) {
595                 invokeLater(new Runnable() {
596                     @Override
597                     public void run() {
598                         pipeline.fireChannelInactive();
599                     }
600                 });
601             }
602 
603             safeSetSuccess(promise);
604             closeIfClosed(); // doDisconnect() might have closed the channel
605         }
606 
607         @Override
608         public final void close(final ChannelPromise promise) {
609             assertEventLoop();
610 
611             close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
612         }
613 
614         /**
615          * Shutdown the output portion of the corresponding {@link Channel}.
616          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
617          */
618         @UnstableApi
619         public final void shutdownOutput(final ChannelPromise promise) {
620             assertEventLoop();
621             shutdownOutput(promise, null);
622         }
623 
624         /**
625          * Shutdown the output portion of the corresponding {@link Channel}.
626          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
627          * @param cause The cause which may provide rational for the shutdown.
628          */
629         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
630             if (!promise.setUncancellable()) {
631                 return;
632             }
633 
634             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
635             if (outboundBuffer == null) {
636                 promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION);
637                 return;
638             }
639             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
640 
641             final Throwable shutdownCause = cause == null ?
642                     new ChannelOutputShutdownException("Channel output shutdown") :
643                     new ChannelOutputShutdownException("Channel output shutdown", cause);
644             Executor closeExecutor = prepareToClose();
645             if (closeExecutor != null) {
646                 closeExecutor.execute(new Runnable() {
647                     @Override
648                     public void run() {
649                         try {
650                             // Execute the shutdown.
651                             doShutdownOutput();
652                             promise.setSuccess();
653                         } catch (Throwable err) {
654                             promise.setFailure(err);
655                         } finally {
656                             // Dispatch to the EventLoop
657                             eventLoop().execute(new Runnable() {
658                                 @Override
659                                 public void run() {
660                                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
661                                 }
662                             });
663                         }
664                     }
665                 });
666             } else {
667                 try {
668                     // Execute the shutdown.
669                     doShutdownOutput();
670                     promise.setSuccess();
671                 } catch (Throwable err) {
672                     promise.setFailure(err);
673                 } finally {
674                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
675                 }
676             }
677         }
678 
679         private void closeOutboundBufferForShutdown(
680                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
681             buffer.failFlushed(cause, false);
682             buffer.close(cause, true);
683             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
684         }
685 
686         private void close(final ChannelPromise promise, final Throwable cause,
687                            final ClosedChannelException closeCause, final boolean notify) {
688             if (!promise.setUncancellable()) {
689                 return;
690             }
691 
692             if (closeInitiated) {
693                 if (closeFuture.isDone()) {
694                     // Closed already.
695                     safeSetSuccess(promise);
696                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
697                     // This means close() was called before so we just register a listener and return
698                     closeFuture.addListener(new ChannelFutureListener() {
699                         @Override
700                         public void operationComplete(ChannelFuture future) throws Exception {
701                             promise.setSuccess();
702                         }
703                     });
704                 }
705                 return;
706             }
707 
708             closeInitiated = true;
709 
710             final boolean wasActive = isActive();
711             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
712             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
713             Executor closeExecutor = prepareToClose();
714             if (closeExecutor != null) {
715                 closeExecutor.execute(new Runnable() {
716                     @Override
717                     public void run() {
718                         try {
719                             // Execute the close.
720                             doClose0(promise);
721                         } finally {
722                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
723                             invokeLater(new Runnable() {
724                                 @Override
725                                 public void run() {
726                                     if (outboundBuffer != null) {
727                                         // Fail all the queued messages
728                                         outboundBuffer.failFlushed(cause, notify);
729                                         outboundBuffer.close(closeCause);
730                                     }
731                                     fireChannelInactiveAndDeregister(wasActive);
732                                 }
733                             });
734                         }
735                     }
736                 });
737             } else {
738                 try {
739                     // Close the channel and fail the queued messages in all cases.
740                     doClose0(promise);
741                 } finally {
742                     if (outboundBuffer != null) {
743                         // Fail all the queued messages.
744                         outboundBuffer.failFlushed(cause, notify);
745                         outboundBuffer.close(closeCause);
746                     }
747                 }
748                 if (inFlush0) {
749                     invokeLater(new Runnable() {
750                         @Override
751                         public void run() {
752                             fireChannelInactiveAndDeregister(wasActive);
753                         }
754                     });
755                 } else {
756                     fireChannelInactiveAndDeregister(wasActive);
757                 }
758             }
759         }
760 
761         private void doClose0(ChannelPromise promise) {
762             try {
763                 doClose();
764                 closeFuture.setClosed();
765                 safeSetSuccess(promise);
766             } catch (Throwable t) {
767                 closeFuture.setClosed();
768                 safeSetFailure(promise, t);
769             }
770         }
771 
772         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
773             deregister(voidPromise(), wasActive && !isActive());
774         }
775 
776         @Override
777         public final void closeForcibly() {
778             assertEventLoop();
779 
780             try {
781                 doClose();
782             } catch (Exception e) {
783                 logger.warn("Failed to close a channel.", e);
784             }
785         }
786 
787         @Override
788         public final void deregister(final ChannelPromise promise) {
789             assertEventLoop();
790 
791             deregister(promise, false);
792         }
793 
794         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
795             if (!promise.setUncancellable()) {
796                 return;
797             }
798 
799             if (!registered) {
800                 safeSetSuccess(promise);
801                 return;
802             }
803 
804             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
805             // we need to ensure we do the actual deregister operation later. This is needed as for example,
806             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
807             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
808             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
809             // threads.
810             //
811             // See:
812             // https://github.com/netty/netty/issues/4435
813             invokeLater(new Runnable() {
814                 @Override
815                 public void run() {
816                     try {
817                         doDeregister();
818                     } catch (Throwable t) {
819                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
820                     } finally {
821                         if (fireChannelInactive) {
822                             pipeline.fireChannelInactive();
823                         }
824                         // Some transports like local and AIO does not allow the deregistration of
825                         // an open channel.  Their doDeregister() calls close(). Consequently,
826                         // close() calls deregister() again - no need to fire channelUnregistered, so check
827                         // if it was registered.
828                         if (registered) {
829                             registered = false;
830                             pipeline.fireChannelUnregistered();
831                         }
832                         safeSetSuccess(promise);
833                     }
834                 }
835             });
836         }
837 
838         @Override
839         public final void beginRead() {
840             assertEventLoop();
841 
842             if (!isActive()) {
843                 return;
844             }
845 
846             try {
847                 doBeginRead();
848             } catch (final Exception e) {
849                 invokeLater(new Runnable() {
850                     @Override
851                     public void run() {
852                         pipeline.fireExceptionCaught(e);
853                     }
854                 });
855                 close(voidPromise());
856             }
857         }
858 
859         @Override
860         public final void write(Object msg, ChannelPromise promise) {
861             assertEventLoop();
862 
863             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
864             if (outboundBuffer == null) {
865                 // If the outboundBuffer is null we know the channel was closed and so
866                 // need to fail the future right away. If it is not null the handling of the rest
867                 // will be done in flush0()
868                 // See https://github.com/netty/netty/issues/2362
869                 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
870                 // release message now to prevent resource-leak
871                 ReferenceCountUtil.release(msg);
872                 return;
873             }
874 
875             int size;
876             try {
877                 msg = filterOutboundMessage(msg);
878                 size = pipeline.estimatorHandle().size(msg);
879                 if (size < 0) {
880                     size = 0;
881                 }
882             } catch (Throwable t) {
883                 safeSetFailure(promise, t);
884                 ReferenceCountUtil.release(msg);
885                 return;
886             }
887 
888             outboundBuffer.addMessage(msg, size, promise);
889         }
890 
891         @Override
892         public final void flush() {
893             assertEventLoop();
894 
895             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
896             if (outboundBuffer == null) {
897                 return;
898             }
899 
900             outboundBuffer.addFlush();
901             flush0();
902         }
903 
904         @SuppressWarnings("deprecation")
905         protected void flush0() {
906             if (inFlush0) {
907                 // Avoid re-entrance
908                 return;
909             }
910 
911             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
912             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
913                 return;
914             }
915 
916             inFlush0 = true;
917 
918             // Mark all pending write requests as failure if the channel is inactive.
919             if (!isActive()) {
920                 try {
921                     if (isOpen()) {
922                         outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
923                     } else {
924                         // Do not trigger channelWritabilityChanged because the channel is closed already.
925                         outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
926                     }
927                 } finally {
928                     inFlush0 = false;
929                 }
930                 return;
931             }
932 
933             try {
934                 doWrite(outboundBuffer);
935             } catch (Throwable t) {
936                 if (t instanceof IOException && config().isAutoClose()) {
937                     /**
938                      * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
939                      * failing all flushed messages and also ensure the actual close of the underlying transport
940                      * will happen before the promises are notified.
941                      *
942                      * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
943                      * may still return {@code true} even if the channel should be closed as result of the exception.
944                      */
945                     close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
946                 } else {
947                     try {
948                         shutdownOutput(voidPromise(), t);
949                     } catch (Throwable t2) {
950                         close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
951                     }
952                 }
953             } finally {
954                 inFlush0 = false;
955             }
956         }
957 
958         @Override
959         public final ChannelPromise voidPromise() {
960             assertEventLoop();
961 
962             return unsafeVoidPromise;
963         }
964 
965         protected final boolean ensureOpen(ChannelPromise promise) {
966             if (isOpen()) {
967                 return true;
968             }
969 
970             safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
971             return false;
972         }
973 
974         /**
975          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
976          */
977         protected final void safeSetSuccess(ChannelPromise promise) {
978             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
979                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
980             }
981         }
982 
983         /**
984          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
985          */
986         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
987             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
988                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
989             }
990         }
991 
992         protected final void closeIfClosed() {
993             if (isOpen()) {
994                 return;
995             }
996             close(voidPromise());
997         }
998 
999         private void invokeLater(Runnable task) {
1000             try {
1001                 // This method is used by outbound operation implementations to trigger an inbound event later.
1002                 // They do not trigger an inbound event immediately because an outbound operation might have been
1003                 // triggered by another inbound event handler method.  If fired immediately, the call stack
1004                 // will look like this for example:
1005                 //
1006                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1007                 //   -> handlerA.ctx.close()
1008                 //      -> channel.unsafe.close()
1009                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1010                 //
1011                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1012                 eventLoop().execute(task);
1013             } catch (RejectedExecutionException e) {
1014                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1015             }
1016         }
1017 
1018         /**
1019          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1020          */
1021         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1022             if (cause instanceof ConnectException) {
1023                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1024             }
1025             if (cause instanceof NoRouteToHostException) {
1026                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1027             }
1028             if (cause instanceof SocketException) {
1029                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1030             }
1031 
1032             return cause;
1033         }
1034 
1035         /**
1036          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
1037          * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
1038          * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
1039          * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
1040          */
1041         protected Executor prepareToClose() {
1042             return null;
1043         }
1044     }
1045 
1046     /**
1047      * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
1048      */
1049     protected abstract boolean isCompatible(EventLoop loop);
1050 
1051     /**
1052      * Returns the {@link SocketAddress} which is bound locally.
1053      */
1054     protected abstract SocketAddress localAddress0();
1055 
1056     /**
1057      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
1058      */
1059     protected abstract SocketAddress remoteAddress0();
1060 
1061     /**
1062      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
1063      *
1064      * Sub-classes may override this method
1065      */
1066     protected void doRegister() throws Exception {
1067         // NOOP
1068     }
1069 
1070     /**
1071      * Bind the {@link Channel} to the {@link SocketAddress}
1072      */
1073     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1074 
1075     /**
1076      * Disconnect this {@link Channel} from its remote peer
1077      */
1078     protected abstract void doDisconnect() throws Exception;
1079 
1080     /**
1081      * Close the {@link Channel}
1082      */
1083     protected abstract void doClose() throws Exception;
1084 
1085     /**
1086      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
1087      * operation throws an exception.
1088      */
1089     @UnstableApi
1090     protected void doShutdownOutput() throws Exception {
1091         doClose();
1092     }
1093 
1094     /**
1095      * Deregister the {@link Channel} from its {@link EventLoop}.
1096      *
1097      * Sub-classes may override this method
1098      */
1099     protected void doDeregister() throws Exception {
1100         // NOOP
1101     }
1102 
1103     /**
1104      * Schedule a read operation.
1105      */
1106     protected abstract void doBeginRead() throws Exception;
1107 
1108     /**
1109      * Flush the content of the given buffer to the remote peer.
1110      */
1111     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1112 
1113     /**
1114      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1115      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1116      */
1117     protected Object filterOutboundMessage(Object msg) throws Exception {
1118         return msg;
1119     }
1120 
1121     static final class CloseFuture extends DefaultChannelPromise {
1122 
1123         CloseFuture(AbstractChannel ch) {
1124             super(ch);
1125         }
1126 
1127         @Override
1128         public ChannelPromise setSuccess() {
1129             throw new IllegalStateException();
1130         }
1131 
1132         @Override
1133         public ChannelPromise setFailure(Throwable cause) {
1134             throw new IllegalStateException();
1135         }
1136 
1137         @Override
1138         public boolean trySuccess() {
1139             throw new IllegalStateException();
1140         }
1141 
1142         @Override
1143         public boolean tryFailure(Throwable cause) {
1144             throw new IllegalStateException();
1145         }
1146 
1147         boolean setClosed() {
1148             return super.trySuccess();
1149         }
1150     }
1151 
1152     private static final class AnnotatedConnectException extends ConnectException {
1153 
1154         private static final long serialVersionUID = 3901958112696433556L;
1155 
1156         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1157             super(exception.getMessage() + ": " + remoteAddress);
1158             initCause(exception);
1159             setStackTrace(exception.getStackTrace());
1160         }
1161 
1162         @Override
1163         public Throwable fillInStackTrace() {
1164             return this;
1165         }
1166     }
1167 
1168     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1169 
1170         private static final long serialVersionUID = -6801433937592080623L;
1171 
1172         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1173             super(exception.getMessage() + ": " + remoteAddress);
1174             initCause(exception);
1175             setStackTrace(exception.getStackTrace());
1176         }
1177 
1178         @Override
1179         public Throwable fillInStackTrace() {
1180             return this;
1181         }
1182     }
1183 
1184     private static final class AnnotatedSocketException extends SocketException {
1185 
1186         private static final long serialVersionUID = 3896743275010454039L;
1187 
1188         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1189             super(exception.getMessage() + ": " + remoteAddress);
1190             initCause(exception);
1191             setStackTrace(exception.getStackTrace());
1192         }
1193 
1194         @Override
1195         public Throwable fillInStackTrace() {
1196             return this;
1197         }
1198     }
1199 }