View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.socket.ChannelOutputShutdownEvent;
20  import io.netty.channel.socket.ChannelOutputShutdownException;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.UnstableApi;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.io.IOException;
30  import java.net.ConnectException;
31  import java.net.InetSocketAddress;
32  import java.net.NoRouteToHostException;
33  import java.net.SocketAddress;
34  import java.net.SocketException;
35  import java.nio.channels.ClosedChannelException;
36  import java.nio.channels.NotYetConnectedException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.RejectedExecutionException;
39  
40  /**
41   * A skeletal {@link Channel} implementation.
42   */
43  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44  
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46  
47      private final Channel parent;
48      private final ChannelId id;
49      private final Unsafe unsafe;
50      private final DefaultChannelPipeline pipeline;
51      private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
52      private final CloseFuture closeFuture = new CloseFuture(this);
53  
54      private volatile SocketAddress localAddress;
55      private volatile SocketAddress remoteAddress;
56      private volatile EventLoop eventLoop;
57      private volatile boolean registered;
58      private boolean closeInitiated;
59      private Throwable initialCloseCause;
60  
61      /** Cache for the string representation of this channel */
62      private boolean strValActive;
63      private String strVal;
64  
65      /**
66       * Creates a new instance.
67       *
68       * @param parent
69       *        the parent of this channel. {@code null} if there's no parent.
70       */
71      protected AbstractChannel(Channel parent) {
72          this.parent = parent;
73          id = newId();
74          unsafe = newUnsafe();
75          pipeline = newChannelPipeline();
76      }
77  
78      /**
79       * Creates a new instance.
80       *
81       * @param parent
82       *        the parent of this channel. {@code null} if there's no parent.
83       */
84      protected AbstractChannel(Channel parent, ChannelId id) {
85          this.parent = parent;
86          this.id = id;
87          unsafe = newUnsafe();
88          pipeline = newChannelPipeline();
89      }
90  
91      protected final int maxMessagesPerWrite() {
92          ChannelConfig config = config();
93          if (config instanceof DefaultChannelConfig) {
94              return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
95          }
96          Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
97          if (value == null) {
98              return Integer.MAX_VALUE;
99          }
100         return value;
101     }
102 
103     @Override
104     public final ChannelId id() {
105         return id;
106     }
107 
108     /**
109      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
110      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
111      */
112     protected ChannelId newId() {
113         return DefaultChannelId.newInstance();
114     }
115 
116     /**
117      * Returns a new {@link DefaultChannelPipeline} instance.
118      */
119     protected DefaultChannelPipeline newChannelPipeline() {
120         return new DefaultChannelPipeline(this);
121     }
122 
123     @Override
124     public boolean isWritable() {
125         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
126         return buf != null && buf.isWritable();
127     }
128 
129     @Override
130     public long bytesBeforeUnwritable() {
131         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
132         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
133         // We should be consistent with that here.
134         return buf != null ? buf.bytesBeforeUnwritable() : 0;
135     }
136 
137     @Override
138     public long bytesBeforeWritable() {
139         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
140         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
141         // We should be consistent with that here.
142         return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
143     }
144 
145     @Override
146     public Channel parent() {
147         return parent;
148     }
149 
150     @Override
151     public ChannelPipeline pipeline() {
152         return pipeline;
153     }
154 
155     @Override
156     public ByteBufAllocator alloc() {
157         return config().getAllocator();
158     }
159 
160     @Override
161     public EventLoop eventLoop() {
162         EventLoop eventLoop = this.eventLoop;
163         if (eventLoop == null) {
164             throw new IllegalStateException("channel not registered to an event loop");
165         }
166         return eventLoop;
167     }
168 
169     @Override
170     public SocketAddress localAddress() {
171         SocketAddress localAddress = this.localAddress;
172         if (localAddress == null) {
173             try {
174                 this.localAddress = localAddress = unsafe().localAddress();
175             } catch (Error e) {
176                 throw e;
177             } catch (Throwable t) {
178                 // Sometimes fails on a closed socket in Windows.
179                 return null;
180             }
181         }
182         return localAddress;
183     }
184 
185     /**
186      * @deprecated no use-case for this.
187      */
188     @Deprecated
189     protected void invalidateLocalAddress() {
190         localAddress = null;
191     }
192 
193     @Override
194     public SocketAddress remoteAddress() {
195         SocketAddress remoteAddress = this.remoteAddress;
196         if (remoteAddress == null) {
197             try {
198                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
199             } catch (Error e) {
200                 throw e;
201             } catch (Throwable t) {
202                 // Sometimes fails on a closed socket in Windows.
203                 return null;
204             }
205         }
206         return remoteAddress;
207     }
208 
209     /**
210      * @deprecated no use-case for this.
211      */
212     @Deprecated
213     protected void invalidateRemoteAddress() {
214         remoteAddress = null;
215     }
216 
217     @Override
218     public boolean isRegistered() {
219         return registered;
220     }
221 
222     @Override
223     public ChannelFuture bind(SocketAddress localAddress) {
224         return pipeline.bind(localAddress);
225     }
226 
227     @Override
228     public ChannelFuture connect(SocketAddress remoteAddress) {
229         return pipeline.connect(remoteAddress);
230     }
231 
232     @Override
233     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
234         return pipeline.connect(remoteAddress, localAddress);
235     }
236 
237     @Override
238     public ChannelFuture disconnect() {
239         return pipeline.disconnect();
240     }
241 
242     @Override
243     public ChannelFuture close() {
244         return pipeline.close();
245     }
246 
247     @Override
248     public ChannelFuture deregister() {
249         return pipeline.deregister();
250     }
251 
252     @Override
253     public Channel flush() {
254         pipeline.flush();
255         return this;
256     }
257 
258     @Override
259     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
260         return pipeline.bind(localAddress, promise);
261     }
262 
263     @Override
264     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
265         return pipeline.connect(remoteAddress, promise);
266     }
267 
268     @Override
269     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
270         return pipeline.connect(remoteAddress, localAddress, promise);
271     }
272 
273     @Override
274     public ChannelFuture disconnect(ChannelPromise promise) {
275         return pipeline.disconnect(promise);
276     }
277 
278     @Override
279     public ChannelFuture close(ChannelPromise promise) {
280         return pipeline.close(promise);
281     }
282 
283     @Override
284     public ChannelFuture deregister(ChannelPromise promise) {
285         return pipeline.deregister(promise);
286     }
287 
288     @Override
289     public Channel read() {
290         pipeline.read();
291         return this;
292     }
293 
294     @Override
295     public ChannelFuture write(Object msg) {
296         return pipeline.write(msg);
297     }
298 
299     @Override
300     public ChannelFuture write(Object msg, ChannelPromise promise) {
301         return pipeline.write(msg, promise);
302     }
303 
304     @Override
305     public ChannelFuture writeAndFlush(Object msg) {
306         return pipeline.writeAndFlush(msg);
307     }
308 
309     @Override
310     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
311         return pipeline.writeAndFlush(msg, promise);
312     }
313 
314     @Override
315     public ChannelPromise newPromise() {
316         return pipeline.newPromise();
317     }
318 
319     @Override
320     public ChannelProgressivePromise newProgressivePromise() {
321         return pipeline.newProgressivePromise();
322     }
323 
324     @Override
325     public ChannelFuture newSucceededFuture() {
326         return pipeline.newSucceededFuture();
327     }
328 
329     @Override
330     public ChannelFuture newFailedFuture(Throwable cause) {
331         return pipeline.newFailedFuture(cause);
332     }
333 
334     @Override
335     public ChannelFuture closeFuture() {
336         return closeFuture;
337     }
338 
339     @Override
340     public Unsafe unsafe() {
341         return unsafe;
342     }
343 
344     /**
345      * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
346      */
347     protected abstract AbstractUnsafe newUnsafe();
348 
349     /**
350      * Returns the ID of this channel.
351      */
352     @Override
353     public final int hashCode() {
354         return id.hashCode();
355     }
356 
357     /**
358      * Returns {@code true} if and only if the specified object is identical
359      * with this channel (i.e: {@code this == o}).
360      */
361     @Override
362     public final boolean equals(Object o) {
363         return this == o;
364     }
365 
366     @Override
367     public final int compareTo(Channel o) {
368         if (this == o) {
369             return 0;
370         }
371 
372         return id().compareTo(o.id());
373     }
374 
375     /**
376      * Returns the {@link String} representation of this channel.  The returned
377      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
378      * and {@linkplain #remoteAddress() remote address} of this channel for
379      * easier identification.
380      */
381     @Override
382     public String toString() {
383         boolean active = isActive();
384         if (strValActive == active && strVal != null) {
385             return strVal;
386         }
387 
388         SocketAddress remoteAddr = remoteAddress();
389         SocketAddress localAddr = localAddress();
390         if (remoteAddr != null) {
391             StringBuilder buf = new StringBuilder(96)
392                 .append("[id: 0x")
393                 .append(id.asShortText())
394                 .append(", L:")
395                 .append(localAddr)
396                 .append(active? " - " : " ! ")
397                 .append("R:")
398                 .append(remoteAddr)
399                 .append(']');
400             strVal = buf.toString();
401         } else if (localAddr != null) {
402             StringBuilder buf = new StringBuilder(64)
403                 .append("[id: 0x")
404                 .append(id.asShortText())
405                 .append(", L:")
406                 .append(localAddr)
407                 .append(']');
408             strVal = buf.toString();
409         } else {
410             StringBuilder buf = new StringBuilder(16)
411                 .append("[id: 0x")
412                 .append(id.asShortText())
413                 .append(']');
414             strVal = buf.toString();
415         }
416 
417         strValActive = active;
418         return strVal;
419     }
420 
421     @Override
422     public final ChannelPromise voidPromise() {
423         return pipeline.voidPromise();
424     }
425 
426     /**
427      * {@link Unsafe} implementation which sub-classes must extend and use.
428      */
429     protected abstract class AbstractUnsafe implements Unsafe {
430 
431         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
432         private RecvByteBufAllocator.Handle recvHandle;
433         private boolean inFlush0;
434         /** true if the channel has never been registered, false otherwise */
435         private boolean neverRegistered = true;
436 
437         private void assertEventLoop() {
438             assert !registered || eventLoop.inEventLoop();
439         }
440 
441         @Override
442         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
443             if (recvHandle == null) {
444                 recvHandle = config().getRecvByteBufAllocator().newHandle();
445             }
446             return recvHandle;
447         }
448 
449         @Override
450         public final ChannelOutboundBuffer outboundBuffer() {
451             return outboundBuffer;
452         }
453 
454         @Override
455         public final SocketAddress localAddress() {
456             return localAddress0();
457         }
458 
459         @Override
460         public final SocketAddress remoteAddress() {
461             return remoteAddress0();
462         }
463 
464         @Override
465         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
466             ObjectUtil.checkNotNull(eventLoop, "eventLoop");
467             if (isRegistered()) {
468                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
469                 return;
470             }
471             if (!isCompatible(eventLoop)) {
472                 promise.setFailure(
473                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
474                 return;
475             }
476 
477             AbstractChannel.this.eventLoop = eventLoop;
478 
479             if (eventLoop.inEventLoop()) {
480                 register0(promise);
481             } else {
482                 try {
483                     eventLoop.execute(new Runnable() {
484                         @Override
485                         public void run() {
486                             register0(promise);
487                         }
488                     });
489                 } catch (Throwable t) {
490                     logger.warn(
491                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
492                             AbstractChannel.this, t);
493                     closeForcibly();
494                     closeFuture.setClosed();
495                     safeSetFailure(promise, t);
496                 }
497             }
498         }
499 
500         private void register0(ChannelPromise promise) {
501             try {
502                 // check if the channel is still open as it could be closed in the mean time when the register
503                 // call was outside of the eventLoop
504                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
505                     return;
506                 }
507                 boolean firstRegistration = neverRegistered;
508                 doRegister();
509                 neverRegistered = false;
510                 registered = true;
511 
512                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
513                 // user may already fire events through the pipeline in the ChannelFutureListener.
514                 pipeline.invokeHandlerAddedIfNeeded();
515 
516                 safeSetSuccess(promise);
517                 pipeline.fireChannelRegistered();
518                 // Only fire a channelActive if the channel has never been registered. This prevents firing
519                 // multiple channel actives if the channel is deregistered and re-registered.
520                 if (isActive()) {
521                     if (firstRegistration) {
522                         pipeline.fireChannelActive();
523                     } else if (config().isAutoRead()) {
524                         // This channel was registered before and autoRead() is set. This means we need to begin read
525                         // again so that we process inbound data.
526                         //
527                         // See https://github.com/netty/netty/issues/4805
528                         beginRead();
529                     }
530                 }
531             } catch (Throwable t) {
532                 // Close the channel directly to avoid FD leak.
533                 closeForcibly();
534                 closeFuture.setClosed();
535                 safeSetFailure(promise, t);
536             }
537         }
538 
539         @Override
540         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
541             assertEventLoop();
542 
543             if (!promise.setUncancellable() || !ensureOpen(promise)) {
544                 return;
545             }
546 
547             // See: https://github.com/netty/netty/issues/576
548             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
549                 localAddress instanceof InetSocketAddress &&
550                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
551                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
552                 // Warn a user about the fact that a non-root user can't receive a
553                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
554                 logger.warn(
555                         "A non-root user can't receive a broadcast packet if the socket " +
556                         "is not bound to a wildcard address; binding to a non-wildcard " +
557                         "address (" + localAddress + ") anyway as requested.");
558             }
559 
560             boolean wasActive = isActive();
561             try {
562                 doBind(localAddress);
563             } catch (Throwable t) {
564                 safeSetFailure(promise, t);
565                 closeIfClosed();
566                 return;
567             }
568 
569             if (!wasActive && isActive()) {
570                 invokeLater(new Runnable() {
571                     @Override
572                     public void run() {
573                         pipeline.fireChannelActive();
574                     }
575                 });
576             }
577 
578             safeSetSuccess(promise);
579         }
580 
581         @Override
582         public final void disconnect(final ChannelPromise promise) {
583             assertEventLoop();
584 
585             if (!promise.setUncancellable()) {
586                 return;
587             }
588 
589             boolean wasActive = isActive();
590             try {
591                 doDisconnect();
592                 // Reset remoteAddress and localAddress
593                 remoteAddress = null;
594                 localAddress = null;
595             } catch (Throwable t) {
596                 safeSetFailure(promise, t);
597                 closeIfClosed();
598                 return;
599             }
600 
601             if (wasActive && !isActive()) {
602                 invokeLater(new Runnable() {
603                     @Override
604                     public void run() {
605                         pipeline.fireChannelInactive();
606                     }
607                 });
608             }
609 
610             safeSetSuccess(promise);
611             closeIfClosed(); // doDisconnect() might have closed the channel
612         }
613 
614         @Override
615         public void close(final ChannelPromise promise) {
616             assertEventLoop();
617 
618             ClosedChannelException closedChannelException =
619                     StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
620             close(promise, closedChannelException, closedChannelException, false);
621         }
622 
623         /**
624          * Shutdown the output portion of the corresponding {@link Channel}.
625          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
626          */
627         @UnstableApi
628         public final void shutdownOutput(final ChannelPromise promise) {
629             assertEventLoop();
630             shutdownOutput(promise, null);
631         }
632 
633         /**
634          * Shutdown the output portion of the corresponding {@link Channel}.
635          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
636          * @param cause The cause which may provide rational for the shutdown.
637          */
638         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
639             if (!promise.setUncancellable()) {
640                 return;
641             }
642 
643             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
644             if (outboundBuffer == null) {
645                 promise.setFailure(new ClosedChannelException());
646                 return;
647             }
648             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
649 
650             final Throwable shutdownCause = cause == null ?
651                     new ChannelOutputShutdownException("Channel output shutdown") :
652                     new ChannelOutputShutdownException("Channel output shutdown", cause);
653 
654             // When a side enables SO_LINGER and calls showdownOutput(...) to start TCP half-closure
655             // we can not call doDeregister here because we should ensure this side in fin_wait2 state
656             // can still receive and process the data which is send by another side in the close_wait state。
657             // See https://github.com/netty/netty/issues/11981
658             try {
659                 // The shutdown function does not block regardless of the SO_LINGER setting on the socket
660                 // so we don't need to use GlobalEventExecutor to execute the shutdown
661                 doShutdownOutput();
662                 promise.setSuccess();
663             } catch (Throwable err) {
664                 promise.setFailure(err);
665             } finally {
666                 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
667             }
668         }
669 
670         private void closeOutboundBufferForShutdown(
671                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
672             buffer.failFlushed(cause, false);
673             buffer.close(cause, true);
674             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
675         }
676 
677         private void close(final ChannelPromise promise, final Throwable cause,
678                            final ClosedChannelException closeCause, final boolean notify) {
679             if (!promise.setUncancellable()) {
680                 return;
681             }
682 
683             if (closeInitiated) {
684                 if (closeFuture.isDone()) {
685                     // Closed already.
686                     safeSetSuccess(promise);
687                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
688                     // This means close() was called before so we just register a listener and return
689                     closeFuture.addListener(new ChannelFutureListener() {
690                         @Override
691                         public void operationComplete(ChannelFuture future) throws Exception {
692                             promise.setSuccess();
693                         }
694                     });
695                 }
696                 return;
697             }
698 
699             closeInitiated = true;
700 
701             final boolean wasActive = isActive();
702             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
703             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
704             Executor closeExecutor = prepareToClose();
705             if (closeExecutor != null) {
706                 closeExecutor.execute(new Runnable() {
707                     @Override
708                     public void run() {
709                         try {
710                             // Execute the close.
711                             doClose0(promise);
712                         } finally {
713                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
714                             invokeLater(new Runnable() {
715                                 @Override
716                                 public void run() {
717                                     if (outboundBuffer != null) {
718                                         // Fail all the queued messages
719                                         outboundBuffer.failFlushed(cause, notify);
720                                         outboundBuffer.close(closeCause);
721                                     }
722                                     fireChannelInactiveAndDeregister(wasActive);
723                                 }
724                             });
725                         }
726                     }
727                 });
728             } else {
729                 try {
730                     // Close the channel and fail the queued messages in all cases.
731                     doClose0(promise);
732                 } finally {
733                     if (outboundBuffer != null) {
734                         // Fail all the queued messages.
735                         outboundBuffer.failFlushed(cause, notify);
736                         outboundBuffer.close(closeCause);
737                     }
738                 }
739                 if (inFlush0) {
740                     invokeLater(new Runnable() {
741                         @Override
742                         public void run() {
743                             fireChannelInactiveAndDeregister(wasActive);
744                         }
745                     });
746                 } else {
747                     fireChannelInactiveAndDeregister(wasActive);
748                 }
749             }
750         }
751 
752         private void doClose0(ChannelPromise promise) {
753             try {
754                 doClose();
755                 closeFuture.setClosed();
756                 safeSetSuccess(promise);
757             } catch (Throwable t) {
758                 closeFuture.setClosed();
759                 safeSetFailure(promise, t);
760             }
761         }
762 
763         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
764             deregister(voidPromise(), wasActive && !isActive());
765         }
766 
767         @Override
768         public final void closeForcibly() {
769             assertEventLoop();
770 
771             try {
772                 doClose();
773             } catch (Exception e) {
774                 logger.warn("Failed to close a channel.", e);
775             }
776         }
777 
778         @Override
779         public final void deregister(final ChannelPromise promise) {
780             assertEventLoop();
781 
782             deregister(promise, false);
783         }
784 
785         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
786             if (!promise.setUncancellable()) {
787                 return;
788             }
789 
790             if (!registered) {
791                 safeSetSuccess(promise);
792                 return;
793             }
794 
795             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
796             // we need to ensure we do the actual deregister operation later. This is needed as for example,
797             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
798             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
799             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
800             // threads.
801             //
802             // See:
803             // https://github.com/netty/netty/issues/4435
804             invokeLater(new Runnable() {
805                 @Override
806                 public void run() {
807                     try {
808                         doDeregister();
809                     } catch (Throwable t) {
810                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
811                     } finally {
812                         if (fireChannelInactive) {
813                             pipeline.fireChannelInactive();
814                         }
815                         // Some transports like local and AIO does not allow the deregistration of
816                         // an open channel.  Their doDeregister() calls close(). Consequently,
817                         // close() calls deregister() again - no need to fire channelUnregistered, so check
818                         // if it was registered.
819                         if (registered) {
820                             registered = false;
821                             pipeline.fireChannelUnregistered();
822                         }
823                         safeSetSuccess(promise);
824                     }
825                 }
826             });
827         }
828 
829         @Override
830         public final void beginRead() {
831             assertEventLoop();
832 
833             try {
834                 doBeginRead();
835             } catch (final Exception e) {
836                 invokeLater(new Runnable() {
837                     @Override
838                     public void run() {
839                         pipeline.fireExceptionCaught(e);
840                     }
841                 });
842                 close(voidPromise());
843             }
844         }
845 
846         @Override
847         public final void write(Object msg, ChannelPromise promise) {
848             assertEventLoop();
849 
850             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
851             if (outboundBuffer == null) {
852                 try {
853                     // release message now to prevent resource-leak
854                     ReferenceCountUtil.release(msg);
855                 } finally {
856                     // If the outboundBuffer is null we know the channel was closed and so
857                     // need to fail the future right away. If it is not null the handling of the rest
858                     // will be done in flush0()
859                     // See https://github.com/netty/netty/issues/2362
860                     safeSetFailure(promise,
861                             newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
862                 }
863                 return;
864             }
865 
866             int size;
867             try {
868                 msg = filterOutboundMessage(msg);
869                 size = pipeline.estimatorHandle().size(msg);
870                 if (size < 0) {
871                     size = 0;
872                 }
873             } catch (Throwable t) {
874                 try {
875                     ReferenceCountUtil.release(msg);
876                 } finally {
877                     safeSetFailure(promise, t);
878                 }
879                 return;
880             }
881 
882             outboundBuffer.addMessage(msg, size, promise);
883         }
884 
885         @Override
886         public final void flush() {
887             assertEventLoop();
888 
889             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
890             if (outboundBuffer == null) {
891                 return;
892             }
893 
894             outboundBuffer.addFlush();
895             flush0();
896         }
897 
898         @SuppressWarnings("deprecation")
899         protected void flush0() {
900             if (inFlush0) {
901                 // Avoid re-entrance
902                 return;
903             }
904 
905             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
906             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
907                 return;
908             }
909 
910             inFlush0 = true;
911 
912             // Mark all pending write requests as failure if the channel is inactive.
913             if (!isActive()) {
914                 try {
915                     // Check if we need to generate the exception at all.
916                     if (!outboundBuffer.isEmpty()) {
917                         if (isOpen()) {
918                             outboundBuffer.failFlushed(new NotYetConnectedException(), true);
919                         } else {
920                             // Do not trigger channelWritabilityChanged because the channel is closed already.
921                             outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
922                         }
923                     }
924                 } finally {
925                     inFlush0 = false;
926                 }
927                 return;
928             }
929 
930             try {
931                 doWrite(outboundBuffer);
932             } catch (Throwable t) {
933                 handleWriteError(t);
934             } finally {
935                 inFlush0 = false;
936             }
937         }
938 
939         protected final void handleWriteError(Throwable t) {
940             if (t instanceof IOException && config().isAutoClose()) {
941                 /**
942                  * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
943                  * failing all flushed messages and also ensure the actual close of the underlying transport
944                  * will happen before the promises are notified.
945                  *
946                  * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
947                  * may still return {@code true} even if the channel should be closed as result of the exception.
948                  */
949                 initialCloseCause = t;
950                 close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
951             } else {
952                 try {
953                     shutdownOutput(voidPromise(), t);
954                 } catch (Throwable t2) {
955                     initialCloseCause = t;
956                     close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
957                 }
958             }
959         }
960 
961         private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
962             ClosedChannelException exception =
963                     StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
964             if (cause != null) {
965                 exception.initCause(cause);
966             }
967             return exception;
968         }
969 
970         @Override
971         public final ChannelPromise voidPromise() {
972             assertEventLoop();
973 
974             return unsafeVoidPromise;
975         }
976 
977         protected final boolean ensureOpen(ChannelPromise promise) {
978             if (isOpen()) {
979                 return true;
980             }
981 
982             safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
983             return false;
984         }
985 
986         /**
987          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
988          */
989         protected final void safeSetSuccess(ChannelPromise promise) {
990             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
991                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
992             }
993         }
994 
995         /**
996          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
997          */
998         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
999             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
1000                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1001             }
1002         }
1003 
1004         protected final void closeIfClosed() {
1005             if (isOpen()) {
1006                 return;
1007             }
1008             close(voidPromise());
1009         }
1010 
1011         private void invokeLater(Runnable task) {
1012             try {
1013                 // This method is used by outbound operation implementations to trigger an inbound event later.
1014                 // They do not trigger an inbound event immediately because an outbound operation might have been
1015                 // triggered by another inbound event handler method.  If fired immediately, the call stack
1016                 // will look like this for example:
1017                 //
1018                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1019                 //   -> handlerA.ctx.close()
1020                 //      -> channel.unsafe.close()
1021                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1022                 //
1023                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1024                 eventLoop().execute(task);
1025             } catch (RejectedExecutionException e) {
1026                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1027             }
1028         }
1029 
1030         /**
1031          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1032          */
1033         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1034             if (cause instanceof ConnectException) {
1035                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1036             }
1037             if (cause instanceof NoRouteToHostException) {
1038                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1039             }
1040             if (cause instanceof SocketException) {
1041                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1042             }
1043 
1044             return cause;
1045         }
1046 
1047         /**
1048          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
1049          * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
1050          * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
1051          * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
1052          */
1053         protected Executor prepareToClose() {
1054             return null;
1055         }
1056     }
1057 
1058     /**
1059      * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
1060      */
1061     protected abstract boolean isCompatible(EventLoop loop);
1062 
1063     /**
1064      * Returns the {@link SocketAddress} which is bound locally.
1065      */
1066     protected abstract SocketAddress localAddress0();
1067 
1068     /**
1069      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
1070      */
1071     protected abstract SocketAddress remoteAddress0();
1072 
1073     /**
1074      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
1075      *
1076      * Sub-classes may override this method
1077      */
1078     protected void doRegister() throws Exception {
1079         // NOOP
1080     }
1081 
1082     /**
1083      * Bind the {@link Channel} to the {@link SocketAddress}
1084      */
1085     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1086 
1087     /**
1088      * Disconnect this {@link Channel} from its remote peer
1089      */
1090     protected abstract void doDisconnect() throws Exception;
1091 
1092     /**
1093      * Close the {@link Channel}
1094      */
1095     protected abstract void doClose() throws Exception;
1096 
1097     /**
1098      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
1099      * operation throws an exception.
1100      */
1101     @UnstableApi
1102     protected void doShutdownOutput() throws Exception {
1103         doClose();
1104     }
1105 
1106     /**
1107      * Deregister the {@link Channel} from its {@link EventLoop}.
1108      *
1109      * Sub-classes may override this method
1110      */
1111     protected void doDeregister() throws Exception {
1112         // NOOP
1113     }
1114 
1115     /**
1116      * Schedule a read operation.
1117      */
1118     protected abstract void doBeginRead() throws Exception;
1119 
1120     /**
1121      * Flush the content of the given buffer to the remote peer.
1122      */
1123     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1124 
1125     /**
1126      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1127      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1128      */
1129     protected Object filterOutboundMessage(Object msg) throws Exception {
1130         return msg;
1131     }
1132 
1133     protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1134         DefaultFileRegion.validate(region, position);
1135     }
1136 
1137     static final class CloseFuture extends DefaultChannelPromise {
1138 
1139         CloseFuture(AbstractChannel ch) {
1140             super(ch);
1141         }
1142 
1143         @Override
1144         public ChannelPromise setSuccess() {
1145             throw new IllegalStateException();
1146         }
1147 
1148         @Override
1149         public ChannelPromise setFailure(Throwable cause) {
1150             throw new IllegalStateException();
1151         }
1152 
1153         @Override
1154         public boolean trySuccess() {
1155             throw new IllegalStateException();
1156         }
1157 
1158         @Override
1159         public boolean tryFailure(Throwable cause) {
1160             throw new IllegalStateException();
1161         }
1162 
1163         boolean setClosed() {
1164             return super.trySuccess();
1165         }
1166     }
1167 
1168     private static final class AnnotatedConnectException extends ConnectException {
1169 
1170         private static final long serialVersionUID = 3901958112696433556L;
1171 
1172         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1173             super(exception.getMessage() + ": " + remoteAddress);
1174             initCause(exception);
1175         }
1176 
1177         // Suppress a warning since this method doesn't need synchronization
1178         @Override
1179         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
1180             return this;
1181         }
1182     }
1183 
1184     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1185 
1186         private static final long serialVersionUID = -6801433937592080623L;
1187 
1188         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1189             super(exception.getMessage() + ": " + remoteAddress);
1190             initCause(exception);
1191         }
1192 
1193         // Suppress a warning since this method doesn't need synchronization
1194         @Override
1195         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
1196             return this;
1197         }
1198     }
1199 
1200     private static final class AnnotatedSocketException extends SocketException {
1201 
1202         private static final long serialVersionUID = 3896743275010454039L;
1203 
1204         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1205             super(exception.getMessage() + ": " + remoteAddress);
1206             initCause(exception);
1207         }
1208 
1209         // Suppress a warning since this method doesn't need synchronization
1210         @Override
1211         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
1212             return this;
1213         }
1214     }
1215 }