View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.socket.ChannelOutputShutdownEvent;
20  import io.netty.channel.socket.ChannelOutputShutdownException;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.io.IOException;
29  import java.net.ConnectException;
30  import java.net.InetSocketAddress;
31  import java.net.NoRouteToHostException;
32  import java.net.SocketAddress;
33  import java.net.SocketException;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.NotYetConnectedException;
36  import java.util.concurrent.Executor;
37  import java.util.concurrent.RejectedExecutionException;
38  
39  /**
40   * A skeletal {@link Channel} implementation.
41   */
42  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
43  
44      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
45  
46      private final Channel parent;
47      private final ChannelId id;
48      private final Unsafe unsafe;
49      private final DefaultChannelPipeline pipeline;
50      private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
51      private final CloseFuture closeFuture = new CloseFuture(this);
52  
53      private volatile SocketAddress localAddress;
54      private volatile SocketAddress remoteAddress;
55      private volatile EventLoop eventLoop;
56      private volatile boolean registered;
57      private boolean closeInitiated;
58      private Throwable initialCloseCause;
59  
60      /** Cache for the string representation of this channel */
61      private boolean strValActive;
62      private String strVal;
63  
64      /**
65       * Creates a new instance.
66       *
67       * @param parent
68       *        the parent of this channel. {@code null} if there's no parent.
69       */
70      protected AbstractChannel(Channel parent) {
71          this.parent = parent;
72          id = newId();
73          unsafe = newUnsafe();
74          pipeline = newChannelPipeline();
75      }
76  
77      /**
78       * Creates a new instance.
79       *
80       * @param parent
81       *        the parent of this channel. {@code null} if there's no parent.
82       */
83      protected AbstractChannel(Channel parent, ChannelId id) {
84          this.parent = parent;
85          this.id = id;
86          unsafe = newUnsafe();
87          pipeline = newChannelPipeline();
88      }
89  
90      protected final int maxMessagesPerWrite() {
91          ChannelConfig config = config();
92          if (config instanceof DefaultChannelConfig) {
93              return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
94          }
95          Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
96          if (value == null) {
97              return Integer.MAX_VALUE;
98          }
99          return value;
100     }
101 
102     @Override
103     public final ChannelId id() {
104         return id;
105     }
106 
107     /**
108      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
109      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
110      */
111     protected ChannelId newId() {
112         return DefaultChannelId.newInstance();
113     }
114 
115     /**
116      * Returns a new {@link DefaultChannelPipeline} instance.
117      */
118     protected DefaultChannelPipeline newChannelPipeline() {
119         return new DefaultChannelPipeline(this);
120     }
121 
122     @Override
123     public boolean isWritable() {
124         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
125         return buf != null && buf.isWritable();
126     }
127 
128     @Override
129     public long bytesBeforeUnwritable() {
130         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
131         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
132         // We should be consistent with that here.
133         return buf != null ? buf.bytesBeforeUnwritable() : 0;
134     }
135 
136     @Override
137     public long bytesBeforeWritable() {
138         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
139         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
140         // We should be consistent with that here.
141         return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
142     }
143 
144     @Override
145     public Channel parent() {
146         return parent;
147     }
148 
149     @Override
150     public ChannelPipeline pipeline() {
151         return pipeline;
152     }
153 
154     @Override
155     public ByteBufAllocator alloc() {
156         return config().getAllocator();
157     }
158 
159     @Override
160     public EventLoop eventLoop() {
161         EventLoop eventLoop = this.eventLoop;
162         if (eventLoop == null) {
163             throw new IllegalStateException("channel not registered to an event loop");
164         }
165         return eventLoop;
166     }
167 
168     @Override
169     public SocketAddress localAddress() {
170         SocketAddress localAddress = this.localAddress;
171         if (localAddress == null) {
172             try {
173                 this.localAddress = localAddress = unsafe().localAddress();
174             } catch (Error e) {
175                 throw e;
176             } catch (Throwable t) {
177                 // Sometimes fails on a closed socket in Windows.
178                 return null;
179             }
180         }
181         return localAddress;
182     }
183 
184     /**
185      * @deprecated no use-case for this.
186      */
187     @Deprecated
188     protected void invalidateLocalAddress() {
189         localAddress = null;
190     }
191 
192     @Override
193     public SocketAddress remoteAddress() {
194         SocketAddress remoteAddress = this.remoteAddress;
195         if (remoteAddress == null) {
196             try {
197                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
198             } catch (Error e) {
199                 throw e;
200             } catch (Throwable t) {
201                 // Sometimes fails on a closed socket in Windows.
202                 return null;
203             }
204         }
205         return remoteAddress;
206     }
207 
208     /**
209      * @deprecated no use-case for this.
210      */
211     @Deprecated
212     protected void invalidateRemoteAddress() {
213         remoteAddress = null;
214     }
215 
216     @Override
217     public boolean isRegistered() {
218         return registered;
219     }
220 
221     @Override
222     public ChannelFuture bind(SocketAddress localAddress) {
223         return pipeline.bind(localAddress);
224     }
225 
226     @Override
227     public ChannelFuture connect(SocketAddress remoteAddress) {
228         return pipeline.connect(remoteAddress);
229     }
230 
231     @Override
232     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
233         return pipeline.connect(remoteAddress, localAddress);
234     }
235 
236     @Override
237     public ChannelFuture disconnect() {
238         return pipeline.disconnect();
239     }
240 
241     @Override
242     public ChannelFuture close() {
243         return pipeline.close();
244     }
245 
246     @Override
247     public ChannelFuture deregister() {
248         return pipeline.deregister();
249     }
250 
251     @Override
252     public Channel flush() {
253         pipeline.flush();
254         return this;
255     }
256 
257     @Override
258     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
259         return pipeline.bind(localAddress, promise);
260     }
261 
262     @Override
263     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
264         return pipeline.connect(remoteAddress, promise);
265     }
266 
267     @Override
268     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
269         return pipeline.connect(remoteAddress, localAddress, promise);
270     }
271 
272     @Override
273     public ChannelFuture disconnect(ChannelPromise promise) {
274         return pipeline.disconnect(promise);
275     }
276 
277     @Override
278     public ChannelFuture close(ChannelPromise promise) {
279         return pipeline.close(promise);
280     }
281 
282     @Override
283     public ChannelFuture deregister(ChannelPromise promise) {
284         return pipeline.deregister(promise);
285     }
286 
287     @Override
288     public Channel read() {
289         pipeline.read();
290         return this;
291     }
292 
293     @Override
294     public ChannelFuture write(Object msg) {
295         return pipeline.write(msg);
296     }
297 
298     @Override
299     public ChannelFuture write(Object msg, ChannelPromise promise) {
300         return pipeline.write(msg, promise);
301     }
302 
303     @Override
304     public ChannelFuture writeAndFlush(Object msg) {
305         return pipeline.writeAndFlush(msg);
306     }
307 
308     @Override
309     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
310         return pipeline.writeAndFlush(msg, promise);
311     }
312 
313     @Override
314     public ChannelPromise newPromise() {
315         return pipeline.newPromise();
316     }
317 
318     @Override
319     public ChannelProgressivePromise newProgressivePromise() {
320         return pipeline.newProgressivePromise();
321     }
322 
323     @Override
324     public ChannelFuture newSucceededFuture() {
325         return pipeline.newSucceededFuture();
326     }
327 
328     @Override
329     public ChannelFuture newFailedFuture(Throwable cause) {
330         return pipeline.newFailedFuture(cause);
331     }
332 
333     @Override
334     public ChannelFuture closeFuture() {
335         return closeFuture;
336     }
337 
338     @Override
339     public Unsafe unsafe() {
340         return unsafe;
341     }
342 
343     /**
344      * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
345      */
346     protected abstract AbstractUnsafe newUnsafe();
347 
348     /**
349      * Returns the ID of this channel.
350      */
351     @Override
352     public final int hashCode() {
353         return id.hashCode();
354     }
355 
356     /**
357      * Returns {@code true} if and only if the specified object is identical
358      * with this channel (i.e: {@code this == o}).
359      */
360     @Override
361     public final boolean equals(Object o) {
362         return this == o;
363     }
364 
365     @Override
366     public final int compareTo(Channel o) {
367         if (this == o) {
368             return 0;
369         }
370 
371         return id().compareTo(o.id());
372     }
373 
374     /**
375      * Returns the {@link String} representation of this channel.  The returned
376      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
377      * and {@linkplain #remoteAddress() remote address} of this channel for
378      * easier identification.
379      */
380     @Override
381     public String toString() {
382         boolean active = isActive();
383         if (strValActive == active && strVal != null) {
384             return strVal;
385         }
386 
387         SocketAddress remoteAddr = remoteAddress();
388         SocketAddress localAddr = localAddress();
389         if (remoteAddr != null) {
390             StringBuilder buf = new StringBuilder(96)
391                 .append("[id: 0x")
392                 .append(id.asShortText())
393                 .append(", L:")
394                 .append(localAddr)
395                 .append(active? " - " : " ! ")
396                 .append("R:")
397                 .append(remoteAddr)
398                 .append(']');
399             strVal = buf.toString();
400         } else if (localAddr != null) {
401             StringBuilder buf = new StringBuilder(64)
402                 .append("[id: 0x")
403                 .append(id.asShortText())
404                 .append(", L:")
405                 .append(localAddr)
406                 .append(']');
407             strVal = buf.toString();
408         } else {
409             StringBuilder buf = new StringBuilder(16)
410                 .append("[id: 0x")
411                 .append(id.asShortText())
412                 .append(']');
413             strVal = buf.toString();
414         }
415 
416         strValActive = active;
417         return strVal;
418     }
419 
420     @Override
421     public final ChannelPromise voidPromise() {
422         return pipeline.voidPromise();
423     }
424 
425     /**
426      * {@link Unsafe} implementation which sub-classes must extend and use.
427      */
428     protected abstract class AbstractUnsafe implements Unsafe {
429 
430         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
431         private RecvByteBufAllocator.Handle recvHandle;
432         private boolean inFlush0;
433         /** true if the channel has never been registered, false otherwise */
434         private boolean neverRegistered = true;
435 
436         private void assertEventLoop() {
437             assert !registered || eventLoop.inEventLoop();
438         }
439 
440         @Override
441         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
442             if (recvHandle == null) {
443                 recvHandle = config().getRecvByteBufAllocator().newHandle();
444             }
445             return recvHandle;
446         }
447 
448         @Override
449         public final ChannelOutboundBuffer outboundBuffer() {
450             return outboundBuffer;
451         }
452 
453         @Override
454         public final SocketAddress localAddress() {
455             return localAddress0();
456         }
457 
458         @Override
459         public final SocketAddress remoteAddress() {
460             return remoteAddress0();
461         }
462 
463         @Override
464         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
465             ObjectUtil.checkNotNull(eventLoop, "eventLoop");
466             if (isRegistered()) {
467                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
468                 return;
469             }
470             if (!isCompatible(eventLoop)) {
471                 promise.setFailure(
472                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
473                 return;
474             }
475 
476             AbstractChannel.this.eventLoop = eventLoop;
477 
478             if (eventLoop.inEventLoop()) {
479                 register0(promise);
480             } else {
481                 try {
482                     eventLoop.execute(new Runnable() {
483                         @Override
484                         public void run() {
485                             register0(promise);
486                         }
487                     });
488                 } catch (Throwable t) {
489                     logger.warn(
490                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
491                             AbstractChannel.this, t);
492                     closeForcibly();
493                     closeFuture.setClosed();
494                     safeSetFailure(promise, t);
495                 }
496             }
497         }
498 
499         private void register0(ChannelPromise promise) {
500             try {
501                 // check if the channel is still open as it could be closed in the mean time when the register
502                 // call was outside of the eventLoop
503                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
504                     return;
505                 }
506                 boolean firstRegistration = neverRegistered;
507                 doRegister();
508                 neverRegistered = false;
509                 registered = true;
510 
511                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
512                 // user may already fire events through the pipeline in the ChannelFutureListener.
513                 pipeline.invokeHandlerAddedIfNeeded();
514 
515                 safeSetSuccess(promise);
516                 pipeline.fireChannelRegistered();
517                 // Only fire a channelActive if the channel has never been registered. This prevents firing
518                 // multiple channel actives if the channel is deregistered and re-registered.
519                 if (isActive()) {
520                     if (firstRegistration) {
521                         pipeline.fireChannelActive();
522                     } else if (config().isAutoRead()) {
523                         // This channel was registered before and autoRead() is set. This means we need to begin read
524                         // again so that we process inbound data.
525                         //
526                         // See https://github.com/netty/netty/issues/4805
527                         beginRead();
528                     }
529                 }
530             } catch (Throwable t) {
531                 // Close the channel directly to avoid FD leak.
532                 closeForcibly();
533                 closeFuture.setClosed();
534                 safeSetFailure(promise, t);
535             }
536         }
537 
538         @Override
539         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
540             assertEventLoop();
541 
542             if (!promise.setUncancellable() || !ensureOpen(promise)) {
543                 return;
544             }
545 
546             // See: https://github.com/netty/netty/issues/576
547             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
548                 localAddress instanceof InetSocketAddress &&
549                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
550                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
551                 // Warn a user about the fact that a non-root user can't receive a
552                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
553                 logger.warn(
554                         "A non-root user can't receive a broadcast packet if the socket " +
555                         "is not bound to a wildcard address; binding to a non-wildcard " +
556                         "address (" + localAddress + ") anyway as requested.");
557             }
558 
559             boolean wasActive = isActive();
560             try {
561                 doBind(localAddress);
562             } catch (Throwable t) {
563                 safeSetFailure(promise, t);
564                 closeIfClosed();
565                 return;
566             }
567 
568             if (!wasActive && isActive()) {
569                 invokeLater(new Runnable() {
570                     @Override
571                     public void run() {
572                         pipeline.fireChannelActive();
573                     }
574                 });
575             }
576 
577             safeSetSuccess(promise);
578         }
579 
580         @Override
581         public final void disconnect(final ChannelPromise promise) {
582             assertEventLoop();
583 
584             if (!promise.setUncancellable()) {
585                 return;
586             }
587 
588             boolean wasActive = isActive();
589             try {
590                 doDisconnect();
591                 // Reset remoteAddress and localAddress
592                 remoteAddress = null;
593                 localAddress = null;
594             } catch (Throwable t) {
595                 safeSetFailure(promise, t);
596                 closeIfClosed();
597                 return;
598             }
599 
600             if (wasActive && !isActive()) {
601                 invokeLater(new Runnable() {
602                     @Override
603                     public void run() {
604                         pipeline.fireChannelInactive();
605                     }
606                 });
607             }
608 
609             safeSetSuccess(promise);
610             closeIfClosed(); // doDisconnect() might have closed the channel
611         }
612 
613         @Override
614         public void close(final ChannelPromise promise) {
615             assertEventLoop();
616 
617             ClosedChannelException closedChannelException =
618                     StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
619             close(promise, closedChannelException, closedChannelException, false);
620         }
621 
622         /**
623          * Shutdown the output portion of the corresponding {@link Channel}.
624          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
625          */
626         public final void shutdownOutput(final ChannelPromise promise) {
627             assertEventLoop();
628             shutdownOutput(promise, null);
629         }
630 
631         /**
632          * Shutdown the output portion of the corresponding {@link Channel}.
633          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
634          * @param cause The cause which may provide rational for the shutdown.
635          */
636         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
637             if (!promise.setUncancellable()) {
638                 return;
639             }
640 
641             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
642             if (outboundBuffer == null) {
643                 promise.setFailure(new ClosedChannelException());
644                 return;
645             }
646             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
647 
648             final Throwable shutdownCause = cause == null ?
649                     new ChannelOutputShutdownException("Channel output shutdown") :
650                     new ChannelOutputShutdownException("Channel output shutdown", cause);
651 
652             // When a side enables SO_LINGER and calls showdownOutput(...) to start TCP half-closure
653             // we can not call doDeregister here because we should ensure this side in fin_wait2 state
654             // can still receive and process the data which is send by another side in the close_wait state。
655             // See https://github.com/netty/netty/issues/11981
656             try {
657                 // The shutdown function does not block regardless of the SO_LINGER setting on the socket
658                 // so we don't need to use GlobalEventExecutor to execute the shutdown
659                 doShutdownOutput();
660                 promise.setSuccess();
661             } catch (Throwable err) {
662                 promise.setFailure(err);
663             } finally {
664                 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
665             }
666         }
667 
668         private void closeOutboundBufferForShutdown(
669                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
670             buffer.failFlushed(cause, false);
671             buffer.close(cause, true);
672             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
673         }
674 
675         private void close(final ChannelPromise promise, final Throwable cause,
676                            final ClosedChannelException closeCause, final boolean notify) {
677             if (!promise.setUncancellable()) {
678                 return;
679             }
680 
681             if (closeInitiated) {
682                 if (closeFuture.isDone()) {
683                     // Closed already.
684                     safeSetSuccess(promise);
685                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
686                     // This means close() was called before so we just register a listener and return
687                     closeFuture.addListener(new ChannelFutureListener() {
688                         @Override
689                         public void operationComplete(ChannelFuture future) throws Exception {
690                             promise.setSuccess();
691                         }
692                     });
693                 }
694                 return;
695             }
696 
697             closeInitiated = true;
698 
699             final boolean wasActive = isActive();
700             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
701             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
702             Executor closeExecutor = prepareToClose();
703             if (closeExecutor != null) {
704                 closeExecutor.execute(new Runnable() {
705                     @Override
706                     public void run() {
707                         try {
708                             // Execute the close.
709                             doClose0(promise);
710                         } finally {
711                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
712                             invokeLater(new Runnable() {
713                                 @Override
714                                 public void run() {
715                                     if (outboundBuffer != null) {
716                                         // Fail all the queued messages
717                                         outboundBuffer.failFlushed(cause, notify);
718                                         outboundBuffer.close(closeCause);
719                                     }
720                                     fireChannelInactiveAndDeregister(wasActive);
721                                 }
722                             });
723                         }
724                     }
725                 });
726             } else {
727                 try {
728                     // Close the channel and fail the queued messages in all cases.
729                     doClose0(promise);
730                 } finally {
731                     if (outboundBuffer != null) {
732                         // Fail all the queued messages.
733                         outboundBuffer.failFlushed(cause, notify);
734                         outboundBuffer.close(closeCause);
735                     }
736                 }
737                 if (inFlush0) {
738                     invokeLater(new Runnable() {
739                         @Override
740                         public void run() {
741                             fireChannelInactiveAndDeregister(wasActive);
742                         }
743                     });
744                 } else {
745                     fireChannelInactiveAndDeregister(wasActive);
746                 }
747             }
748         }
749 
750         private void doClose0(ChannelPromise promise) {
751             try {
752                 doClose();
753                 closeFuture.setClosed();
754                 safeSetSuccess(promise);
755             } catch (Throwable t) {
756                 closeFuture.setClosed();
757                 safeSetFailure(promise, t);
758             }
759         }
760 
761         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
762             deregister(voidPromise(), wasActive && !isActive());
763         }
764 
765         @Override
766         public final void closeForcibly() {
767             assertEventLoop();
768 
769             try {
770                 doClose();
771             } catch (Exception e) {
772                 logger.warn("Failed to close a channel.", e);
773             }
774         }
775 
776         @Override
777         public final void deregister(final ChannelPromise promise) {
778             assertEventLoop();
779 
780             deregister(promise, false);
781         }
782 
783         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
784             if (!promise.setUncancellable()) {
785                 return;
786             }
787 
788             if (!registered) {
789                 safeSetSuccess(promise);
790                 return;
791             }
792 
793             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
794             // we need to ensure we do the actual deregister operation later. This is needed as for example,
795             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
796             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
797             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
798             // threads.
799             //
800             // See:
801             // https://github.com/netty/netty/issues/4435
802             invokeLater(new Runnable() {
803                 @Override
804                 public void run() {
805                     try {
806                         doDeregister();
807                     } catch (Throwable t) {
808                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
809                     } finally {
810                         if (fireChannelInactive) {
811                             pipeline.fireChannelInactive();
812                         }
813                         // Some transports like local and AIO does not allow the deregistration of
814                         // an open channel.  Their doDeregister() calls close(). Consequently,
815                         // close() calls deregister() again - no need to fire channelUnregistered, so check
816                         // if it was registered.
817                         if (registered) {
818                             registered = false;
819                             pipeline.fireChannelUnregistered();
820                         }
821                         safeSetSuccess(promise);
822                     }
823                 }
824             });
825         }
826 
827         @Override
828         public final void beginRead() {
829             assertEventLoop();
830 
831             try {
832                 doBeginRead();
833             } catch (final Exception e) {
834                 invokeLater(new Runnable() {
835                     @Override
836                     public void run() {
837                         pipeline.fireExceptionCaught(e);
838                     }
839                 });
840                 close(voidPromise());
841             }
842         }
843 
844         @Override
845         public final void write(Object msg, ChannelPromise promise) {
846             assertEventLoop();
847 
848             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
849             if (outboundBuffer == null) {
850                 try {
851                     // release message now to prevent resource-leak
852                     ReferenceCountUtil.release(msg);
853                 } finally {
854                     // If the outboundBuffer is null we know the channel was closed and so
855                     // need to fail the future right away. If it is not null the handling of the rest
856                     // will be done in flush0()
857                     // See https://github.com/netty/netty/issues/2362
858                     safeSetFailure(promise,
859                             newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
860                 }
861                 return;
862             }
863 
864             int size;
865             try {
866                 msg = filterOutboundMessage(msg);
867                 size = pipeline.estimatorHandle().size(msg);
868                 if (size < 0) {
869                     size = 0;
870                 }
871             } catch (Throwable t) {
872                 try {
873                     ReferenceCountUtil.release(msg);
874                 } finally {
875                     safeSetFailure(promise, t);
876                 }
877                 return;
878             }
879 
880             outboundBuffer.addMessage(msg, size, promise);
881         }
882 
883         @Override
884         public final void flush() {
885             assertEventLoop();
886 
887             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
888             if (outboundBuffer == null) {
889                 return;
890             }
891 
892             outboundBuffer.addFlush();
893             flush0();
894         }
895 
        /**
         * Writes the flushed content of the outbound buffer by calling
         * {@link AbstractChannel#doWrite(ChannelOutboundBuffer)}.
         * <p>
         * The method returns without doing anything when it is already executing (tracked by
         * {@code inFlush0}) or when the outbound buffer is {@code null} or empty. When the channel is not
         * active, the flushed messages are failed instead of written. Any {@link Throwable} thrown by
         * {@code doWrite(...)} is passed to {@link #handleWriteError(Throwable)}.
         */
        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            // Set the re-entrance guard; every path below resets it in a finally block.
            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            // Open but not active: fail the flushed writes with NotYetConnectedException.
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                // Error handling (close or output shutdown) is decided in handleWriteError(...).
                handleWriteError(t);
            } finally {
                inFlush0 = false;
            }
        }
936 
937         protected final void handleWriteError(Throwable t) {
938             if (t instanceof IOException && config().isAutoClose()) {
939                 /**
940                  * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
941                  * failing all flushed messages and also ensure the actual close of the underlying transport
942                  * will happen before the promises are notified.
943                  *
944                  * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
945                  * may still return {@code true} even if the channel should be closed as result of the exception.
946                  */
947                 initialCloseCause = t;
948                 close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
949             } else {
950                 try {
951                     shutdownOutput(voidPromise(), t);
952                 } catch (Throwable t2) {
953                     initialCloseCause = t;
954                     close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
955                 }
956             }
957         }
958 
959         private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
960             ClosedChannelException exception =
961                     StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
962             if (cause != null) {
963                 exception.initCause(cause);
964             }
965             return exception;
966         }
967 
        /**
         * Returns the {@code unsafeVoidPromise} instance held by this object, after asserting that the
         * caller runs in the event loop.
         */
        @Override
        public final ChannelPromise voidPromise() {
            assertEventLoop();

            return unsafeVoidPromise;
        }
974 
975         protected final boolean ensureOpen(ChannelPromise promise) {
976             if (isOpen()) {
977                 return true;
978             }
979 
980             safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
981             return false;
982         }
983 
984         /**
985          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
986          */
987         protected final void safeSetSuccess(ChannelPromise promise) {
988             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
989                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
990             }
991         }
992 
993         /**
994          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
995          */
996         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
997             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
998                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
999             }
1000         }
1001 
1002         protected final void closeIfClosed() {
1003             if (isOpen()) {
1004                 return;
1005             }
1006             close(voidPromise());
1007         }
1008 
1009         private void invokeLater(Runnable task) {
1010             try {
1011                 // This method is used by outbound operation implementations to trigger an inbound event later.
1012                 // They do not trigger an inbound event immediately because an outbound operation might have been
1013                 // triggered by another inbound event handler method.  If fired immediately, the call stack
1014                 // will look like this for example:
1015                 //
1016                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1017                 //   -> handlerA.ctx.close()
1018                 //      -> channel.unsafe.close()
1019                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1020                 //
1021                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1022                 eventLoop().execute(task);
1023             } catch (RejectedExecutionException e) {
1024                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1025             }
1026         }
1027 
1028         /**
1029          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1030          */
1031         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1032             if (cause instanceof ConnectException) {
1033                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1034             }
1035             if (cause instanceof NoRouteToHostException) {
1036                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1037             }
1038             if (cause instanceof SocketException) {
1039                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1040             }
1041 
1042             return cause;
1043         }
1044 
        /**
         * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
         * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
         * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
         * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
         *
         * @return the {@link Executor} on which {@link #doClose()} must run, or {@code null}; this
         *         default implementation always returns {@code null}
         */
        protected Executor prepareToClose() {
            return null;
        }
1054     }
1055 
    /**
     * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
     *
     * @param loop the {@link EventLoop} to check
     * @return {@code true} if {@code loop} is compatible with this instance, {@code false} otherwise
     */
    protected abstract boolean isCompatible(EventLoop loop);
1060 
    /**
     * Returns the {@link SocketAddress} which is bound locally.
     *
     * @return the locally bound {@link SocketAddress}
     */
    protected abstract SocketAddress localAddress0();
1065 
    /**
     * Return the {@link SocketAddress} which the {@link Channel} is connected to.
     *
     * @return the {@link SocketAddress} of the remote peer
     */
    protected abstract SocketAddress remoteAddress0();
1070 
    /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method. This default implementation does nothing.
     *
     * @throws Exception if a sub-class implementation fails
     */
    protected void doRegister() throws Exception {
        // NOOP
    }
1079 
    /**
     * Bind the {@link Channel} to the {@link SocketAddress}
     *
     * @param localAddress the {@link SocketAddress} to bind to
     * @throws Exception if the bind operation fails
     */
    protected abstract void doBind(SocketAddress localAddress) throws Exception;
1084 
    /**
     * Disconnect this {@link Channel} from its remote peer
     *
     * @throws Exception if the disconnect operation fails
     */
    protected abstract void doDisconnect() throws Exception;
1089 
    /**
     * Close the {@link Channel}
     *
     * @throws Exception if the close operation fails
     */
    protected abstract void doClose() throws Exception;
1094 
    /**
     * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
     * operation throws an exception.
     *
     * This default implementation simply delegates to {@link #doClose()}.
     *
     * @throws Exception if {@link #doClose()} (or an overriding implementation) fails
     */
    protected void doShutdownOutput() throws Exception {
        doClose();
    }
1102 
    /**
     * Deregister the {@link Channel} from its {@link EventLoop}.
     *
     * Sub-classes may override this method. This default implementation does nothing.
     *
     * @throws Exception if a sub-class implementation fails; the deregistration code of the unsafe
     *                   implementation in this file catches any {@link Throwable} from this call and
     *                   logs it as a warning
     */
    protected void doDeregister() throws Exception {
        // NOOP
    }
1111 
    /**
     * Schedule a read operation.
     *
     * Invoked by the unsafe implementation's {@code beginRead()}.
     *
     * @throws Exception if the read cannot be scheduled; {@code beginRead()} then fires the exception
     *                   through the pipeline later and closes the channel
     */
    protected abstract void doBeginRead() throws Exception;
1116 
    /**
     * Flush the content of the given buffer to the remote peer.
     *
     * Invoked by the unsafe implementation's {@code flush0()} when the channel is active and the buffer is
     * not empty.
     *
     * @param in the {@link ChannelOutboundBuffer} holding the messages to write
     * @throws Exception if the write fails; {@code flush0()} passes anything thrown here to
     *                   {@code handleWriteError(Throwable)}
     */
    protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1121 
    /**
     * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
     * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
     *
     * This default implementation returns {@code msg} unchanged.
     *
     * @param msg the message about to be added to the outbound buffer
     * @return the message that should be added to the outbound buffer instead
     * @throws Exception if the message cannot be converted; the unsafe implementation's {@code write(...)}
     *                   then releases the message and fails the write promise with the thrown exception
     */
    protected Object filterOutboundMessage(Object msg) throws Exception {
        return msg;
    }
1129 
    /**
     * Validates the given {@link DefaultFileRegion} at the given position by delegating to
     * {@code DefaultFileRegion.validate(region, position)}.
     *
     * @param region   the file region to validate
     * @param position the position passed on to the validation
     * @throws IOException if thrown by the delegated validation
     */
    protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
        DefaultFileRegion.validate(region, position);
    }
1133 
1134     static final class CloseFuture extends DefaultChannelPromise {
1135 
1136         CloseFuture(AbstractChannel ch) {
1137             super(ch);
1138         }
1139 
1140         @Override
1141         public ChannelPromise setSuccess() {
1142             throw new IllegalStateException();
1143         }
1144 
1145         @Override
1146         public ChannelPromise setFailure(Throwable cause) {
1147             throw new IllegalStateException();
1148         }
1149 
1150         @Override
1151         public boolean trySuccess() {
1152             throw new IllegalStateException();
1153         }
1154 
1155         @Override
1156         public boolean tryFailure(Throwable cause) {
1157             throw new IllegalStateException();
1158         }
1159 
1160         boolean setClosed() {
1161             return super.trySuccess();
1162         }
1163     }
1164 
1165     private static final class AnnotatedConnectException extends ConnectException {
1166 
1167         private static final long serialVersionUID = 3901958112696433556L;
1168 
1169         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1170             super(exception.getMessage() + ": " + remoteAddress);
1171             initCause(exception);
1172         }
1173 
1174         // Suppress a warning since this method doesn't need synchronization
1175         @Override
1176         public Throwable fillInStackTrace() {
1177             return this;
1178         }
1179     }
1180 
1181     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1182 
1183         private static final long serialVersionUID = -6801433937592080623L;
1184 
1185         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1186             super(exception.getMessage() + ": " + remoteAddress);
1187             initCause(exception);
1188         }
1189 
1190         // Suppress a warning since this method doesn't need synchronization
1191         @Override
1192         public Throwable fillInStackTrace() {
1193             return this;
1194         }
1195     }
1196 
1197     private static final class AnnotatedSocketException extends SocketException {
1198 
1199         private static final long serialVersionUID = 3896743275010454039L;
1200 
1201         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1202             super(exception.getMessage() + ": " + remoteAddress);
1203             initCause(exception);
1204         }
1205 
1206         // Suppress a warning since this method doesn't need synchronization
1207         @Override
1208         public Throwable fillInStackTrace() {
1209             return this;
1210         }
1211     }
1212 }