View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.socket.ChannelOutputShutdownEvent;
20  import io.netty.channel.socket.ChannelOutputShutdownException;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.ThrowableUtil;
25  import io.netty.util.internal.UnstableApi;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.io.IOException;
30  import java.net.ConnectException;
31  import java.net.InetSocketAddress;
32  import java.net.NoRouteToHostException;
33  import java.net.SocketAddress;
34  import java.net.SocketException;
35  import java.nio.channels.ClosedChannelException;
36  import java.nio.channels.NotYetConnectedException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.RejectedExecutionException;
39  
40  /**
41   * A skeletal {@link Channel} implementation.
42   */
43  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44  
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46  
47      private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
48              new ExtendedClosedChannelException(null), AbstractUnsafe.class, "ensureOpen(...)");
49      private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
50              new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
51      private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
52              new ExtendedClosedChannelException(null), AbstractUnsafe.class, "write(...)");
53      private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
54              new ExtendedClosedChannelException(null), AbstractUnsafe.class, "flush0()");
55      private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
56              new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
57  
58      private final Channel parent;
59      private final ChannelId id;
60      private final Unsafe unsafe;
61      private final DefaultChannelPipeline pipeline;
62      private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
63      private final CloseFuture closeFuture = new CloseFuture(this);
64  
65      private volatile SocketAddress localAddress;
66      private volatile SocketAddress remoteAddress;
67      private volatile EventLoop eventLoop;
68      private volatile boolean registered;
69      private boolean closeInitiated;
70      private Throwable initialCloseCause;
71  
72      /** Cache for the string representation of this channel */
73      private boolean strValActive;
74      private String strVal;
75  
76      /**
77       * Creates a new instance.
78       *
79       * @param parent
80       *        the parent of this channel. {@code null} if there's no parent.
81       */
82      protected AbstractChannel(Channel parent) {
83          this.parent = parent;
84          id = newId();
85          unsafe = newUnsafe();
86          pipeline = newChannelPipeline();
87      }
88  
89      /**
90       * Creates a new instance.
91       *
92       * @param parent
93       *        the parent of this channel. {@code null} if there's no parent.
94       */
95      protected AbstractChannel(Channel parent, ChannelId id) {
96          this.parent = parent;
97          this.id = id;
98          unsafe = newUnsafe();
99          pipeline = newChannelPipeline();
100     }
101 
102     @Override
103     public final ChannelId id() {
104         return id;
105     }
106 
107     /**
108      * Returns a new {@link DefaultChannelId} instance. Subclasses may override this method to assign custom
109      * {@link ChannelId}s to {@link Channel}s that use the {@link AbstractChannel#AbstractChannel(Channel)} constructor.
110      */
111     protected ChannelId newId() {
112         return DefaultChannelId.newInstance();
113     }
114 
115     /**
116      * Returns a new {@link DefaultChannelPipeline} instance.
117      */
118     protected DefaultChannelPipeline newChannelPipeline() {
119         return new DefaultChannelPipeline(this);
120     }
121 
122     @Override
123     public boolean isWritable() {
124         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
125         return buf != null && buf.isWritable();
126     }
127 
128     @Override
129     public long bytesBeforeUnwritable() {
130         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
131         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
132         // We should be consistent with that here.
133         return buf != null ? buf.bytesBeforeUnwritable() : 0;
134     }
135 
136     @Override
137     public long bytesBeforeWritable() {
138         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
139         // isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
140         // We should be consistent with that here.
141         return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
142     }
143 
144     @Override
145     public Channel parent() {
146         return parent;
147     }
148 
149     @Override
150     public ChannelPipeline pipeline() {
151         return pipeline;
152     }
153 
154     @Override
155     public ByteBufAllocator alloc() {
156         return config().getAllocator();
157     }
158 
159     @Override
160     public EventLoop eventLoop() {
161         EventLoop eventLoop = this.eventLoop;
162         if (eventLoop == null) {
163             throw new IllegalStateException("channel not registered to an event loop");
164         }
165         return eventLoop;
166     }
167 
168     @Override
169     public SocketAddress localAddress() {
170         SocketAddress localAddress = this.localAddress;
171         if (localAddress == null) {
172             try {
173                 this.localAddress = localAddress = unsafe().localAddress();
174             } catch (Error e) {
175                 throw e;
176             } catch (Throwable t) {
177                 // Sometimes fails on a closed socket in Windows.
178                 return null;
179             }
180         }
181         return localAddress;
182     }
183 
184     /**
185      * @deprecated no use-case for this.
186      */
187     @Deprecated
188     protected void invalidateLocalAddress() {
189         localAddress = null;
190     }
191 
192     @Override
193     public SocketAddress remoteAddress() {
194         SocketAddress remoteAddress = this.remoteAddress;
195         if (remoteAddress == null) {
196             try {
197                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
198             } catch (Error e) {
199                 throw e;
200             } catch (Throwable t) {
201                 // Sometimes fails on a closed socket in Windows.
202                 return null;
203             }
204         }
205         return remoteAddress;
206     }
207 
208     /**
209      * @deprecated no use-case for this.
210      */
211     @Deprecated
212     protected void invalidateRemoteAddress() {
213         remoteAddress = null;
214     }
215 
216     @Override
217     public boolean isRegistered() {
218         return registered;
219     }
220 
221     @Override
222     public ChannelFuture bind(SocketAddress localAddress) {
223         return pipeline.bind(localAddress);
224     }
225 
226     @Override
227     public ChannelFuture connect(SocketAddress remoteAddress) {
228         return pipeline.connect(remoteAddress);
229     }
230 
231     @Override
232     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
233         return pipeline.connect(remoteAddress, localAddress);
234     }
235 
236     @Override
237     public ChannelFuture disconnect() {
238         return pipeline.disconnect();
239     }
240 
241     @Override
242     public ChannelFuture close() {
243         return pipeline.close();
244     }
245 
246     @Override
247     public ChannelFuture deregister() {
248         return pipeline.deregister();
249     }
250 
251     @Override
252     public Channel flush() {
253         pipeline.flush();
254         return this;
255     }
256 
257     @Override
258     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
259         return pipeline.bind(localAddress, promise);
260     }
261 
262     @Override
263     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
264         return pipeline.connect(remoteAddress, promise);
265     }
266 
267     @Override
268     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
269         return pipeline.connect(remoteAddress, localAddress, promise);
270     }
271 
272     @Override
273     public ChannelFuture disconnect(ChannelPromise promise) {
274         return pipeline.disconnect(promise);
275     }
276 
277     @Override
278     public ChannelFuture close(ChannelPromise promise) {
279         return pipeline.close(promise);
280     }
281 
282     @Override
283     public ChannelFuture deregister(ChannelPromise promise) {
284         return pipeline.deregister(promise);
285     }
286 
287     @Override
288     public Channel read() {
289         pipeline.read();
290         return this;
291     }
292 
293     @Override
294     public ChannelFuture write(Object msg) {
295         return pipeline.write(msg);
296     }
297 
298     @Override
299     public ChannelFuture write(Object msg, ChannelPromise promise) {
300         return pipeline.write(msg, promise);
301     }
302 
303     @Override
304     public ChannelFuture writeAndFlush(Object msg) {
305         return pipeline.writeAndFlush(msg);
306     }
307 
308     @Override
309     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
310         return pipeline.writeAndFlush(msg, promise);
311     }
312 
313     @Override
314     public ChannelPromise newPromise() {
315         return pipeline.newPromise();
316     }
317 
318     @Override
319     public ChannelProgressivePromise newProgressivePromise() {
320         return pipeline.newProgressivePromise();
321     }
322 
323     @Override
324     public ChannelFuture newSucceededFuture() {
325         return pipeline.newSucceededFuture();
326     }
327 
328     @Override
329     public ChannelFuture newFailedFuture(Throwable cause) {
330         return pipeline.newFailedFuture(cause);
331     }
332 
333     @Override
334     public ChannelFuture closeFuture() {
335         return closeFuture;
336     }
337 
338     @Override
339     public Unsafe unsafe() {
340         return unsafe;
341     }
342 
343     /**
344      * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
345      */
346     protected abstract AbstractUnsafe newUnsafe();
347 
348     /**
349      * Returns the ID of this channel.
350      */
351     @Override
352     public final int hashCode() {
353         return id.hashCode();
354     }
355 
356     /**
357      * Returns {@code true} if and only if the specified object is identical
358      * with this channel (i.e: {@code this == o}).
359      */
360     @Override
361     public final boolean equals(Object o) {
362         return this == o;
363     }
364 
365     @Override
366     public final int compareTo(Channel o) {
367         if (this == o) {
368             return 0;
369         }
370 
371         return id().compareTo(o.id());
372     }
373 
374     /**
375      * Returns the {@link String} representation of this channel.  The returned
376      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
377      * and {@linkplain #remoteAddress() remote address} of this channel for
378      * easier identification.
379      */
380     @Override
381     public String toString() {
382         boolean active = isActive();
383         if (strValActive == active && strVal != null) {
384             return strVal;
385         }
386 
387         SocketAddress remoteAddr = remoteAddress();
388         SocketAddress localAddr = localAddress();
389         if (remoteAddr != null) {
390             StringBuilder buf = new StringBuilder(96)
391                 .append("[id: 0x")
392                 .append(id.asShortText())
393                 .append(", L:")
394                 .append(localAddr)
395                 .append(active? " - " : " ! ")
396                 .append("R:")
397                 .append(remoteAddr)
398                 .append(']');
399             strVal = buf.toString();
400         } else if (localAddr != null) {
401             StringBuilder buf = new StringBuilder(64)
402                 .append("[id: 0x")
403                 .append(id.asShortText())
404                 .append(", L:")
405                 .append(localAddr)
406                 .append(']');
407             strVal = buf.toString();
408         } else {
409             StringBuilder buf = new StringBuilder(16)
410                 .append("[id: 0x")
411                 .append(id.asShortText())
412                 .append(']');
413             strVal = buf.toString();
414         }
415 
416         strValActive = active;
417         return strVal;
418     }
419 
420     @Override
421     public final ChannelPromise voidPromise() {
422         return pipeline.voidPromise();
423     }
424 
425     /**
426      * {@link Unsafe} implementation which sub-classes must extend and use.
427      */
428     protected abstract class AbstractUnsafe implements Unsafe {
429 
430         private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
431         private RecvByteBufAllocator.Handle recvHandle;
432         private boolean inFlush0;
433         /** true if the channel has never been registered, false otherwise */
434         private boolean neverRegistered = true;
435 
436         private void assertEventLoop() {
437             assert !registered || eventLoop.inEventLoop();
438         }
439 
440         @Override
441         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
442             if (recvHandle == null) {
443                 recvHandle = config().getRecvByteBufAllocator().newHandle();
444             }
445             return recvHandle;
446         }
447 
448         @Override
449         public final ChannelOutboundBuffer outboundBuffer() {
450             return outboundBuffer;
451         }
452 
453         @Override
454         public final SocketAddress localAddress() {
455             return localAddress0();
456         }
457 
458         @Override
459         public final SocketAddress remoteAddress() {
460             return remoteAddress0();
461         }
462 
463         @Override
464         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
465             if (eventLoop == null) {
466                 throw new NullPointerException("eventLoop");
467             }
468             if (isRegistered()) {
469                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
470                 return;
471             }
472             if (!isCompatible(eventLoop)) {
473                 promise.setFailure(
474                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
475                 return;
476             }
477 
478             AbstractChannel.this.eventLoop = eventLoop;
479 
480             if (eventLoop.inEventLoop()) {
481                 register0(promise);
482             } else {
483                 try {
484                     eventLoop.execute(new Runnable() {
485                         @Override
486                         public void run() {
487                             register0(promise);
488                         }
489                     });
490                 } catch (Throwable t) {
491                     logger.warn(
492                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
493                             AbstractChannel.this, t);
494                     closeForcibly();
495                     closeFuture.setClosed();
496                     safeSetFailure(promise, t);
497                 }
498             }
499         }
500 
501         private void register0(ChannelPromise promise) {
502             try {
503                 // check if the channel is still open as it could be closed in the mean time when the register
504                 // call was outside of the eventLoop
505                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
506                     return;
507                 }
508                 boolean firstRegistration = neverRegistered;
509                 doRegister();
510                 neverRegistered = false;
511                 registered = true;
512 
513                 // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
514                 // user may already fire events through the pipeline in the ChannelFutureListener.
515                 pipeline.invokeHandlerAddedIfNeeded();
516 
517                 safeSetSuccess(promise);
518                 pipeline.fireChannelRegistered();
519                 // Only fire a channelActive if the channel has never been registered. This prevents firing
520                 // multiple channel actives if the channel is deregistered and re-registered.
521                 if (isActive()) {
522                     if (firstRegistration) {
523                         pipeline.fireChannelActive();
524                     } else if (config().isAutoRead()) {
525                         // This channel was registered before and autoRead() is set. This means we need to begin read
526                         // again so that we process inbound data.
527                         //
528                         // See https://github.com/netty/netty/issues/4805
529                         beginRead();
530                     }
531                 }
532             } catch (Throwable t) {
533                 // Close the channel directly to avoid FD leak.
534                 closeForcibly();
535                 closeFuture.setClosed();
536                 safeSetFailure(promise, t);
537             }
538         }
539 
540         @Override
541         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
542             assertEventLoop();
543 
544             if (!promise.setUncancellable() || !ensureOpen(promise)) {
545                 return;
546             }
547 
548             // See: https://github.com/netty/netty/issues/576
549             if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
550                 localAddress instanceof InetSocketAddress &&
551                 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
552                 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
553                 // Warn a user about the fact that a non-root user can't receive a
554                 // broadcast packet on *nix if the socket is bound on non-wildcard address.
555                 logger.warn(
556                         "A non-root user can't receive a broadcast packet if the socket " +
557                         "is not bound to a wildcard address; binding to a non-wildcard " +
558                         "address (" + localAddress + ") anyway as requested.");
559             }
560 
561             boolean wasActive = isActive();
562             try {
563                 doBind(localAddress);
564             } catch (Throwable t) {
565                 safeSetFailure(promise, t);
566                 closeIfClosed();
567                 return;
568             }
569 
570             if (!wasActive && isActive()) {
571                 invokeLater(new Runnable() {
572                     @Override
573                     public void run() {
574                         pipeline.fireChannelActive();
575                     }
576                 });
577             }
578 
579             safeSetSuccess(promise);
580         }
581 
582         @Override
583         public final void disconnect(final ChannelPromise promise) {
584             assertEventLoop();
585 
586             if (!promise.setUncancellable()) {
587                 return;
588             }
589 
590             boolean wasActive = isActive();
591             try {
592                 doDisconnect();
593             } catch (Throwable t) {
594                 safeSetFailure(promise, t);
595                 closeIfClosed();
596                 return;
597             }
598 
599             if (wasActive && !isActive()) {
600                 invokeLater(new Runnable() {
601                     @Override
602                     public void run() {
603                         pipeline.fireChannelInactive();
604                     }
605                 });
606             }
607 
608             safeSetSuccess(promise);
609             closeIfClosed(); // doDisconnect() might have closed the channel
610         }
611 
612         @Override
613         public final void close(final ChannelPromise promise) {
614             assertEventLoop();
615 
616             close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
617         }
618 
619         /**
620          * Shutdown the output portion of the corresponding {@link Channel}.
621          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
622          */
623         @UnstableApi
624         public final void shutdownOutput(final ChannelPromise promise) {
625             assertEventLoop();
626             shutdownOutput(promise, null);
627         }
628 
629         /**
630          * Shutdown the output portion of the corresponding {@link Channel}.
631          * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
632          * @param cause The cause which may provide rational for the shutdown.
633          */
634         private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
635             if (!promise.setUncancellable()) {
636                 return;
637             }
638 
639             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
640             if (outboundBuffer == null) {
641                 promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION);
642                 return;
643             }
644             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
645 
646             final Throwable shutdownCause = cause == null ?
647                     new ChannelOutputShutdownException("Channel output shutdown") :
648                     new ChannelOutputShutdownException("Channel output shutdown", cause);
649             Executor closeExecutor = prepareToClose();
650             if (closeExecutor != null) {
651                 closeExecutor.execute(new Runnable() {
652                     @Override
653                     public void run() {
654                         try {
655                             // Execute the shutdown.
656                             doShutdownOutput();
657                             promise.setSuccess();
658                         } catch (Throwable err) {
659                             promise.setFailure(err);
660                         } finally {
661                             // Dispatch to the EventLoop
662                             eventLoop().execute(new Runnable() {
663                                 @Override
664                                 public void run() {
665                                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
666                                 }
667                             });
668                         }
669                     }
670                 });
671             } else {
672                 try {
673                     // Execute the shutdown.
674                     doShutdownOutput();
675                     promise.setSuccess();
676                 } catch (Throwable err) {
677                     promise.setFailure(err);
678                 } finally {
679                     closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
680                 }
681             }
682         }
683 
684         private void closeOutboundBufferForShutdown(
685                 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
686             buffer.failFlushed(cause, false);
687             buffer.close(cause, true);
688             pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
689         }
690 
691         private void close(final ChannelPromise promise, final Throwable cause,
692                            final ClosedChannelException closeCause, final boolean notify) {
693             if (!promise.setUncancellable()) {
694                 return;
695             }
696 
697             if (closeInitiated) {
698                 if (closeFuture.isDone()) {
699                     // Closed already.
700                     safeSetSuccess(promise);
701                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
702                     // This means close() was called before so we just register a listener and return
703                     closeFuture.addListener(new ChannelFutureListener() {
704                         @Override
705                         public void operationComplete(ChannelFuture future) throws Exception {
706                             promise.setSuccess();
707                         }
708                     });
709                 }
710                 return;
711             }
712 
713             closeInitiated = true;
714 
715             final boolean wasActive = isActive();
716             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
717             this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
718             Executor closeExecutor = prepareToClose();
719             if (closeExecutor != null) {
720                 closeExecutor.execute(new Runnable() {
721                     @Override
722                     public void run() {
723                         try {
724                             // Execute the close.
725                             doClose0(promise);
726                         } finally {
727                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
728                             invokeLater(new Runnable() {
729                                 @Override
730                                 public void run() {
731                                     if (outboundBuffer != null) {
732                                         // Fail all the queued messages
733                                         outboundBuffer.failFlushed(cause, notify);
734                                         outboundBuffer.close(closeCause);
735                                     }
736                                     fireChannelInactiveAndDeregister(wasActive);
737                                 }
738                             });
739                         }
740                     }
741                 });
742             } else {
743                 try {
744                     // Close the channel and fail the queued messages in all cases.
745                     doClose0(promise);
746                 } finally {
747                     if (outboundBuffer != null) {
748                         // Fail all the queued messages.
749                         outboundBuffer.failFlushed(cause, notify);
750                         outboundBuffer.close(closeCause);
751                     }
752                 }
753                 if (inFlush0) {
754                     invokeLater(new Runnable() {
755                         @Override
756                         public void run() {
757                             fireChannelInactiveAndDeregister(wasActive);
758                         }
759                     });
760                 } else {
761                     fireChannelInactiveAndDeregister(wasActive);
762                 }
763             }
764         }
765 
766         private void doClose0(ChannelPromise promise) {
767             try {
768                 doClose();
769                 closeFuture.setClosed();
770                 safeSetSuccess(promise);
771             } catch (Throwable t) {
772                 closeFuture.setClosed();
773                 safeSetFailure(promise, t);
774             }
775         }
776 
777         private void fireChannelInactiveAndDeregister(final boolean wasActive) {
778             deregister(voidPromise(), wasActive && !isActive());
779         }
780 
781         @Override
782         public final void closeForcibly() {
783             assertEventLoop();
784 
785             try {
786                 doClose();
787             } catch (Exception e) {
788                 logger.warn("Failed to close a channel.", e);
789             }
790         }
791 
792         @Override
793         public final void deregister(final ChannelPromise promise) {
794             assertEventLoop();
795 
796             deregister(promise, false);
797         }
798 
799         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
800             if (!promise.setUncancellable()) {
801                 return;
802             }
803 
804             if (!registered) {
805                 safeSetSuccess(promise);
806                 return;
807             }
808 
809             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
810             // we need to ensure we do the actual deregister operation later. This is needed as for example,
811             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
812             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
813             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
814             // threads.
815             //
816             // See:
817             // https://github.com/netty/netty/issues/4435
818             invokeLater(new Runnable() {
819                 @Override
820                 public void run() {
821                     try {
822                         doDeregister();
823                     } catch (Throwable t) {
824                         logger.warn("Unexpected exception occurred while deregistering a channel.", t);
825                     } finally {
826                         if (fireChannelInactive) {
827                             pipeline.fireChannelInactive();
828                         }
829                         // Some transports like local and AIO does not allow the deregistration of
830                         // an open channel.  Their doDeregister() calls close(). Consequently,
831                         // close() calls deregister() again - no need to fire channelUnregistered, so check
832                         // if it was registered.
833                         if (registered) {
834                             registered = false;
835                             pipeline.fireChannelUnregistered();
836                         }
837                         safeSetSuccess(promise);
838                     }
839                 }
840             });
841         }
842 
843         @Override
844         public final void beginRead() {
845             assertEventLoop();
846 
847             if (!isActive()) {
848                 return;
849             }
850 
851             try {
852                 doBeginRead();
853             } catch (final Exception e) {
854                 invokeLater(new Runnable() {
855                     @Override
856                     public void run() {
857                         pipeline.fireExceptionCaught(e);
858                     }
859                 });
860                 close(voidPromise());
861             }
862         }
863 
864         @Override
865         public final void write(Object msg, ChannelPromise promise) {
866             assertEventLoop();
867 
868             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
869             if (outboundBuffer == null) {
870                 // If the outboundBuffer is null we know the channel was closed and so
871                 // need to fail the future right away. If it is not null the handling of the rest
872                 // will be done in flush0()
873                 // See https://github.com/netty/netty/issues/2362
874                 safeSetFailure(promise, newWriteException(initialCloseCause));
875                 // release message now to prevent resource-leak
876                 ReferenceCountUtil.release(msg);
877                 return;
878             }
879 
880             int size;
881             try {
882                 msg = filterOutboundMessage(msg);
883                 size = pipeline.estimatorHandle().size(msg);
884                 if (size < 0) {
885                     size = 0;
886                 }
887             } catch (Throwable t) {
888                 safeSetFailure(promise, t);
889                 ReferenceCountUtil.release(msg);
890                 return;
891             }
892 
893             outboundBuffer.addMessage(msg, size, promise);
894         }
895 
896         @Override
897         public final void flush() {
898             assertEventLoop();
899 
900             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
901             if (outboundBuffer == null) {
902                 return;
903             }
904 
905             outboundBuffer.addFlush();
906             flush0();
907         }
908 
909         @SuppressWarnings("deprecation")
910         protected void flush0() {
911             if (inFlush0) {
912                 // Avoid re-entrance
913                 return;
914             }
915 
916             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
917             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
918                 return;
919             }
920 
921             inFlush0 = true;
922 
923             // Mark all pending write requests as failure if the channel is inactive.
924             if (!isActive()) {
925                 try {
926                     if (isOpen()) {
927                         outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
928                     } else {
929                         // Do not trigger channelWritabilityChanged because the channel is closed already.
930                         outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false);
931                     }
932                 } finally {
933                     inFlush0 = false;
934                 }
935                 return;
936             }
937 
938             try {
939                 doWrite(outboundBuffer);
940             } catch (Throwable t) {
941                 if (t instanceof IOException && config().isAutoClose()) {
942                     /**
943                      * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
944                      * failing all flushed messages and also ensure the actual close of the underlying transport
945                      * will happen before the promises are notified.
946                      *
947                      * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
948                      * may still return {@code true} even if the channel should be closed as result of the exception.
949                      */
950                     initialCloseCause = t;
951                     close(voidPromise(), t, newFlush0Exception(t), false);
952                 } else {
953                     try {
954                         shutdownOutput(voidPromise(), t);
955                     } catch (Throwable t2) {
956                         initialCloseCause = t;
957                         close(voidPromise(), t2, newFlush0Exception(t), false);
958                     }
959                 }
960             } finally {
961                 inFlush0 = false;
962             }
963         }
964 
965         private ClosedChannelException newWriteException(Throwable cause) {
966             if (cause == null) {
967                 return WRITE_CLOSED_CHANNEL_EXCEPTION;
968             }
969             return ThrowableUtil.unknownStackTrace(
970                     new ExtendedClosedChannelException(cause), AbstractUnsafe.class, "write(...)");
971         }
972 
973         private ClosedChannelException newFlush0Exception(Throwable cause) {
974             if (cause == null) {
975                 return FLUSH0_CLOSED_CHANNEL_EXCEPTION;
976             }
977             return ThrowableUtil.unknownStackTrace(
978                     new ExtendedClosedChannelException(cause), AbstractUnsafe.class, "flush0()");
979         }
980 
981         private ClosedChannelException newEnsureOpenException(Throwable cause) {
982             if (cause == null) {
983                 return ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION;
984             }
985             return ThrowableUtil.unknownStackTrace(
986                     new ExtendedClosedChannelException(cause), AbstractUnsafe.class, "ensureOpen(...)");
987         }
988 
989         @Override
990         public final ChannelPromise voidPromise() {
991             assertEventLoop();
992 
993             return unsafeVoidPromise;
994         }
995 
996         protected final boolean ensureOpen(ChannelPromise promise) {
997             if (isOpen()) {
998                 return true;
999             }
1000 
1001             safeSetFailure(promise, newEnsureOpenException(initialCloseCause));
1002             return false;
1003         }
1004 
1005         /**
1006          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
1007          */
1008         protected final void safeSetSuccess(ChannelPromise promise) {
1009             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
1010                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
1011             }
1012         }
1013 
1014         /**
1015          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
1016          */
1017         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
1018             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
1019                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1020             }
1021         }
1022 
1023         protected final void closeIfClosed() {
1024             if (isOpen()) {
1025                 return;
1026             }
1027             close(voidPromise());
1028         }
1029 
1030         private void invokeLater(Runnable task) {
1031             try {
1032                 // This method is used by outbound operation implementations to trigger an inbound event later.
1033                 // They do not trigger an inbound event immediately because an outbound operation might have been
1034                 // triggered by another inbound event handler method.  If fired immediately, the call stack
1035                 // will look like this for example:
1036                 //
1037                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1038                 //   -> handlerA.ctx.close()
1039                 //      -> channel.unsafe.close()
1040                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1041                 //
1042                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1043                 eventLoop().execute(task);
1044             } catch (RejectedExecutionException e) {
1045                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1046             }
1047         }
1048 
1049         /**
1050          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1051          */
1052         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1053             if (cause instanceof ConnectException) {
1054                 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1055             }
1056             if (cause instanceof NoRouteToHostException) {
1057                 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1058             }
1059             if (cause instanceof SocketException) {
1060                 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1061             }
1062 
1063             return cause;
1064         }
1065 
1066         /**
1067          * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
1068          * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
1069          * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
1070          * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
1071          */
1072         protected Executor prepareToClose() {
1073             return null;
1074         }
1075     }
1076 
1077     /**
1078      * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
1079      */
1080     protected abstract boolean isCompatible(EventLoop loop);
1081 
1082     /**
1083      * Returns the {@link SocketAddress} which is bound locally.
1084      */
1085     protected abstract SocketAddress localAddress0();
1086 
1087     /**
1088      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
1089      */
1090     protected abstract SocketAddress remoteAddress0();
1091 
1092     /**
1093      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
1094      *
1095      * Sub-classes may override this method
1096      */
1097     protected void doRegister() throws Exception {
1098         // NOOP
1099     }
1100 
1101     /**
1102      * Bind the {@link Channel} to the {@link SocketAddress}
1103      */
1104     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1105 
1106     /**
1107      * Disconnect this {@link Channel} from its remote peer
1108      */
1109     protected abstract void doDisconnect() throws Exception;
1110 
1111     /**
1112      * Close the {@link Channel}
1113      */
1114     protected abstract void doClose() throws Exception;
1115 
1116     /**
1117      * Called when conditions justify shutting down the output portion of the channel. This may happen if a write
1118      * operation throws an exception.
1119      */
1120     @UnstableApi
1121     protected void doShutdownOutput() throws Exception {
1122         doClose();
1123     }
1124 
1125     /**
1126      * Deregister the {@link Channel} from its {@link EventLoop}.
1127      *
1128      * Sub-classes may override this method
1129      */
1130     protected void doDeregister() throws Exception {
1131         // NOOP
1132     }
1133 
1134     /**
1135      * Schedule a read operation.
1136      */
1137     protected abstract void doBeginRead() throws Exception;
1138 
1139     /**
1140      * Flush the content of the given buffer to the remote peer.
1141      */
1142     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1143 
1144     /**
1145      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1146      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1147      */
1148     protected Object filterOutboundMessage(Object msg) throws Exception {
1149         return msg;
1150     }
1151 
1152     protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1153         DefaultFileRegion.validate(region, position);
1154     }
1155 
1156     static final class CloseFuture extends DefaultChannelPromise {
1157 
1158         CloseFuture(AbstractChannel ch) {
1159             super(ch);
1160         }
1161 
1162         @Override
1163         public ChannelPromise setSuccess() {
1164             throw new IllegalStateException();
1165         }
1166 
1167         @Override
1168         public ChannelPromise setFailure(Throwable cause) {
1169             throw new IllegalStateException();
1170         }
1171 
1172         @Override
1173         public boolean trySuccess() {
1174             throw new IllegalStateException();
1175         }
1176 
1177         @Override
1178         public boolean tryFailure(Throwable cause) {
1179             throw new IllegalStateException();
1180         }
1181 
1182         boolean setClosed() {
1183             return super.trySuccess();
1184         }
1185     }
1186 
1187     private static final class AnnotatedConnectException extends ConnectException {
1188 
1189         private static final long serialVersionUID = 3901958112696433556L;
1190 
1191         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1192             super(exception.getMessage() + ": " + remoteAddress);
1193             initCause(exception);
1194             setStackTrace(exception.getStackTrace());
1195         }
1196 
1197         @Override
1198         public Throwable fillInStackTrace() {
1199             return this;
1200         }
1201     }
1202 
1203     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1204 
1205         private static final long serialVersionUID = -6801433937592080623L;
1206 
1207         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1208             super(exception.getMessage() + ": " + remoteAddress);
1209             initCause(exception);
1210             setStackTrace(exception.getStackTrace());
1211         }
1212 
1213         @Override
1214         public Throwable fillInStackTrace() {
1215             return this;
1216         }
1217     }
1218 
1219     private static final class AnnotatedSocketException extends SocketException {
1220 
1221         private static final long serialVersionUID = 3896743275010454039L;
1222 
1223         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1224             super(exception.getMessage() + ": " + remoteAddress);
1225             initCause(exception);
1226             setStackTrace(exception.getStackTrace());
1227         }
1228 
1229         @Override
1230         public Throwable fillInStackTrace() {
1231             return this;
1232         }
1233     }
1234 }