View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.DefaultAttributeMap;
20  import io.netty.util.ReferenceCountUtil;
21  import io.netty.util.internal.EmptyArrays;
22  import io.netty.util.internal.OneTimeTask;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.net.ConnectException;
28  import java.net.InetSocketAddress;
29  import java.net.NoRouteToHostException;
30  import java.net.SocketAddress;
31  import java.net.SocketException;
32  import java.nio.channels.ClosedChannelException;
33  import java.nio.channels.NotYetConnectedException;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.RejectedExecutionException;
36  
37  /**
38   * A skeletal {@link Channel} implementation.
39   */
40  public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
41  
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    // Shared, pre-allocated exception instances used to fail promises and pending writes.
    static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();

    static {
        // Replace the stack traces of the shared instances with an empty one
        // (presumably because the allocation-site trace would be misleading -- confirm).
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
    }
51  
    // Created on first use by estimatorHandle().
    private MessageSizeEstimator.Handle estimatorHandle;

    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    // The single instance returned by newSucceededFuture().
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    // voidPromise is returned by Channel.voidPromise(); unsafeVoidPromise by AbstractUnsafe.voidPromise().
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    // Address caches: filled by localAddress()/remoteAddress(), cleared by the invalidate*() methods.
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    // Not assigned by the constructors; AbstractUnsafe.register() installs the wrapper.
    private volatile PausableChannelEventLoop eventLoop;
    private volatile boolean registered;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
71  
72      /**
73       * Creates a new instance.
74       *
75       * @param parent
76       *        the parent of this channel. {@code null} if there's no parent.
77       */
78      protected AbstractChannel(Channel parent) {
79          this.parent = parent;
80          id = DefaultChannelId.newInstance();
81          unsafe = newUnsafe();
82          pipeline = new DefaultChannelPipeline(this);
83      }
84  
    /**
     * Creates a new instance with a caller-supplied {@link ChannelId}.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     * @param id
     *        the id of this channel; it is stored as-is and later used by
     *        {@link #id()}, {@link #hashCode()} and {@link #toString()}.
     */
    protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        // newUnsafe() is implemented by the subclass and runs before the subclass constructor body.
        unsafe = newUnsafe();
        pipeline = new DefaultChannelPipeline(this);
    }
97  
    /**
     * Returns the {@link ChannelId} assigned to this channel at construction time.
     */
    @Override
    public final ChannelId id() {
        return id;
    }
102 
103     @Override
104     public boolean isWritable() {
105         ChannelOutboundBuffer buf = unsafe.outboundBuffer();
106         return buf != null && buf.isWritable();
107     }
108 
    /**
     * Returns the parent passed to the constructor, or {@code null} if there is none.
     */
    @Override
    public Channel parent() {
        return parent;
    }
113 
    /**
     * Returns the {@link ChannelPipeline} created for this channel in the constructor.
     */
    @Override
    public ChannelPipeline pipeline() {
        return pipeline;
    }
118 
    /**
     * Returns the {@link ByteBufAllocator} currently configured in {@code config()}.
     */
    @Override
    public ByteBufAllocator alloc() {
        return config().getAllocator();
    }
123 
124     @Override
125     public final EventLoop eventLoop() {
126         EventLoop eventLoop = this.eventLoop;
127         if (eventLoop == null) {
128             throw new IllegalStateException("channel not registered to an event loop");
129         }
130         return eventLoop;
131     }
132 
133     @Override
134     public SocketAddress localAddress() {
135         SocketAddress localAddress = this.localAddress;
136         if (localAddress == null) {
137             try {
138                 this.localAddress = localAddress = unsafe().localAddress();
139             } catch (Throwable t) {
140                 // Sometimes fails on a closed socket in Windows.
141                 return null;
142             }
143         }
144         return localAddress;
145     }
146 
    /**
     * Reset the stored localAddress so that the next {@link #localAddress()} call looks it up again.
     */
    protected void invalidateLocalAddress() {
        localAddress = null;
    }
150 
151     @Override
152     public SocketAddress remoteAddress() {
153         SocketAddress remoteAddress = this.remoteAddress;
154         if (remoteAddress == null) {
155             try {
156                 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
157             } catch (Throwable t) {
158                 // Sometimes fails on a closed socket in Windows.
159                 return null;
160             }
161         }
162         return remoteAddress;
163     }
164 
    /**
     * Reset the stored remoteAddress so that the next {@link #remoteAddress()} call looks it up again.
     */
    protected void invalidateRemoteAddress() {
        remoteAddress = null;
    }
171 
    /**
     * Returns the current value of the {@code registered} flag, which is set by
     * {@code AbstractUnsafe.register0()} and cleared by {@code AbstractUnsafe.deregister()}.
     */
    @Override
    public boolean isRegistered() {
        return registered;
    }
176 
    /**
     * Delegates to {@code pipeline.bind(localAddress)} and returns the future it produces.
     */
    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }
181 
    /**
     * Delegates to {@code pipeline.connect(remoteAddress)} and returns the future it produces.
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }
186 
    /**
     * Delegates to {@code pipeline.connect(remoteAddress, localAddress)} and returns the future it produces.
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }
191 
    /**
     * Delegates to {@code pipeline.disconnect()} and returns the future it produces.
     */
    @Override
    public ChannelFuture disconnect() {
        return pipeline.disconnect();
    }
196 
    /**
     * Delegates to {@code pipeline.close()} and returns the future it produces.
     */
    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }
201 
202     @Override
203     public ChannelFuture deregister() {
204         /**
205          * One problem of channel deregistration is that after a channel has been deregistered
206          * there may still be tasks, created from within one of the channel's ChannelHandlers,
207          * in the {@link EventLoop}'s task queue. That way, an unfortunate twist of events could lead
208          * to tasks still being in the old {@link EventLoop}'s queue even after the channel has been
209          * registered with a new {@link EventLoop}. This would lead to the tasks being executed by two
210          * different {@link EventLoop}s.
211          *
212          * Our solution to this problem is to always perform the actual deregistration of
213          * the channel as a task and to reject any submission of new tasks, from within
214          * one of the channel's ChannelHandlers, until the channel is registered with
215          * another {@link EventLoop}. That way we can be sure that there are no more tasks regarding
216          * that particular channel after it has been deregistered (because the deregistration
217          * task is the last one.).
218          *
219          * This only works for one time tasks. To see how we handle periodic/delayed tasks have a look
220          * at {@link io.netty.util.concurrent.ScheduledFutureTask#run()}.
221          *
222          * Also see {@link HeadContext#deregister(ChannelHandlerContext, ChannelPromise)}.
223          */
224         eventLoop.rejectNewTasks();
225         return pipeline.deregister();
226     }
227 
    /**
     * Delegates to {@code pipeline.flush()}.
     *
     * @return this channel
     */
    @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }
233 
    /**
     * Delegates to {@code pipeline.bind(localAddress, promise)} and returns the future it produces.
     */
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
238 
    /**
     * Delegates to {@code pipeline.connect(remoteAddress, promise)} and returns the future it produces.
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }
243 
    /**
     * Delegates to {@code pipeline.connect(remoteAddress, localAddress, promise)} and returns the
     * future it produces.
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, localAddress, promise);
    }
248 
    /**
     * Delegates to {@code pipeline.disconnect(promise)} and returns the future it produces.
     */
    @Override
    public ChannelFuture disconnect(ChannelPromise promise) {
        return pipeline.disconnect(promise);
    }
253 
    /**
     * Delegates to {@code pipeline.close(promise)} and returns the future it produces.
     */
    @Override
    public ChannelFuture close(ChannelPromise promise) {
        return pipeline.close(promise);
    }
258 
259     @Override
260     public ChannelFuture deregister(ChannelPromise promise) {
261         eventLoop.rejectNewTasks();
262         return pipeline.deregister(promise);
263     }
264 
    /**
     * Delegates to {@code pipeline.read()}.
     *
     * @return this channel
     */
    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }
270 
    /**
     * Delegates to {@code pipeline.write(msg)} and returns the future it produces.
     */
    @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }
275 
    /**
     * Delegates to {@code pipeline.write(msg, promise)} and returns the future it produces.
     */
    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
        return pipeline.write(msg, promise);
    }
280 
    /**
     * Delegates to {@code pipeline.writeAndFlush(msg)} and returns the future it produces.
     */
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }
285 
    /**
     * Delegates to {@code pipeline.writeAndFlush(msg, promise)} and returns the future it produces.
     */
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return pipeline.writeAndFlush(msg, promise);
    }
290 
    /**
     * Creates a new {@link DefaultChannelPromise} bound to this channel.
     */
    @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(this);
    }
295 
    /**
     * Creates a new {@link DefaultChannelProgressivePromise} bound to this channel.
     */
    @Override
    public ChannelProgressivePromise newProgressivePromise() {
        return new DefaultChannelProgressivePromise(this);
    }
300 
    /**
     * Returns the succeeded future created together with this channel; the same instance is
     * returned on every call.
     */
    @Override
    public ChannelFuture newSucceededFuture() {
        return succeededFuture;
    }
305 
    /**
     * Creates a new {@link FailedChannelFuture} for this channel carrying the given cause.
     *
     * @param cause the failure to report through the returned future
     */
    @Override
    public ChannelFuture newFailedFuture(Throwable cause) {
        return new FailedChannelFuture(this, null, cause);
    }
310 
    /**
     * Returns the future that the {@link Unsafe} marks as closed once the channel has been closed.
     */
    @Override
    public ChannelFuture closeFuture() {
        return closeFuture;
    }
315 
    /**
     * Returns the {@link Unsafe} created by {@link #newUnsafe()} in the constructor.
     */
    @Override
    public Unsafe unsafe() {
        return unsafe;
    }
320 
    /**
     * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}.
     *
     * <p>This method is invoked from the {@link AbstractChannel} constructors, i.e. before the
     * constructor body of the implementing subclass has run.
     */
    protected abstract AbstractUnsafe newUnsafe();
325 
    /**
     * Returns the hash code of the {@linkplain #id() ID} of this channel.
     */
    @Override
    public final int hashCode() {
        return id.hashCode();
    }
333 
    /**
     * Returns {@code true} if and only if the specified object is identical
     * with this channel (i.e. {@code this == o}).
     */
    @Override
    public final boolean equals(Object o) {
        return this == o;
    }
342 
343     @Override
344     public final int compareTo(Channel o) {
345         if (this == o) {
346             return 0;
347         }
348 
349         return id().compareTo(o.id());
350     }
351 
352     /**
353      * Returns the {@link String} representation of this channel.  The returned
354      * string contains the {@linkplain #hashCode()} ID}, {@linkplain #localAddress() local address},
355      * and {@linkplain #remoteAddress() remote address} of this channel for
356      * easier identification.
357      */
358     @Override
359     public String toString() {
360         boolean active = isActive();
361         if (strValActive == active && strVal != null) {
362             return strVal;
363         }
364 
365         SocketAddress remoteAddr = remoteAddress();
366         SocketAddress localAddr = localAddress();
367         if (remoteAddr != null) {
368             SocketAddress srcAddr;
369             SocketAddress dstAddr;
370             if (parent == null) {
371                 srcAddr = localAddr;
372                 dstAddr = remoteAddr;
373             } else {
374                 srcAddr = remoteAddr;
375                 dstAddr = localAddr;
376             }
377 
378             StringBuilder buf = new StringBuilder(96)
379                 .append("[id: 0x")
380                 .append(id.asShortText())
381                 .append(", ")
382                 .append(srcAddr)
383                 .append(active? " => " : " :> ")
384                 .append(dstAddr)
385                 .append(']');
386             strVal = buf.toString();
387         } else if (localAddr != null) {
388             StringBuilder buf = new StringBuilder(64)
389                 .append("[id: 0x")
390                 .append(id.asShortText())
391                 .append(", ")
392                 .append(localAddr)
393                 .append(']');
394             strVal = buf.toString();
395         } else {
396             StringBuilder buf = new StringBuilder(16)
397                 .append("[id: 0x")
398                 .append(id.asShortText())
399                 .append(']');
400             strVal = buf.toString();
401         }
402 
403         strValActive = active;
404         return strVal;
405     }
406 
    /**
     * Returns the shared {@link VoidChannelPromise} of this channel; the same instance is returned
     * on every call.
     */
    @Override
    public final ChannelPromise voidPromise() {
        return voidPromise;
    }
411 
412     final MessageSizeEstimator.Handle estimatorHandle() {
413         if (estimatorHandle == null) {
414             estimatorHandle = config().getMessageSizeEstimator().newHandle();
415         }
416         return estimatorHandle;
417     }
418 
419     /**
420      * {@link Unsafe} implementation which sub-classes must extend and use.
421      */
422     protected abstract class AbstractUnsafe implements Unsafe {
423 
        // Set to null by close(); write() and flush() treat null as "channel already closed".
        private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        // Created on first use by recvBufAllocHandle().
        private RecvByteBufAllocator.Handle recvHandle;
        // True while flush0() is running; guards against re-entrance and makes close() defer itself.
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true;
429 
430         @Override
431         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
432             if (recvHandle == null) {
433                 recvHandle = config().getRecvByteBufAllocator().newHandle();
434             }
435             return recvHandle;
436         }
437 
        /**
         * Returns the invoker obtained by unwrapping the {@link PausableChannelEventExecutor} that
         * {@code eventLoop().asInvoker()} returns.
         *
         * @throws IllegalStateException if the channel has no event loop yet (thrown by {@code eventLoop()})
         */
        @Override
        public final ChannelHandlerInvoker invoker() {
            // return the unwrapped invoker.
            return ((PausableChannelEventExecutor) eventLoop().asInvoker()).unwrapInvoker();
        }
443 
        /**
         * Returns the current outbound buffer, or {@code null} once {@code close()} has discarded it.
         */
        @Override
        public final ChannelOutboundBuffer outboundBuffer() {
            return outboundBuffer;
        }
448 
        /**
         * Returns the result of the transport-specific {@code localAddress0()}, without caching.
         */
        @Override
        public final SocketAddress localAddress() {
            return localAddress0();
        }
453 
        /**
         * Returns the result of the transport-specific {@code remoteAddress0()}, without caching.
         */
        @Override
        public final SocketAddress remoteAddress() {
            return remoteAddress0();
        }
458 
        /**
         * Registers the channel with the given event loop and completes {@code promise} with the outcome.
         *
         * <p>The promise is failed with an {@link IllegalStateException} if the channel is already
         * registered or the event loop is not compatible. Otherwise the event loop is stored (wrapped
         * in a {@link PausableChannelEventLoop}) and {@code register0()} is run on it, either directly
         * when already on that loop or as a submitted task.
         *
         * @param eventLoop the event loop to register with
         * @param promise the promise to complete
         * @throws NullPointerException if {@code eventLoop} or {@code promise} is {@code null}
         */
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (promise == null) {
                throw new NullPointerException("promise");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            // It's necessary to reuse the wrapped eventloop object. Otherwise the user will end up with multiple
            // objects that do not share a common state.
            if (AbstractChannel.this.eventLoop == null) {
                AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop);
            } else {
                AbstractChannel.this.eventLoop.unwrapped = eventLoop;
            }

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new OneTimeTask() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    // The task was not accepted, so register0() will never run: close the channel
                    // here and report the failure through the promise.
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
505 
        /**
         * Performs the actual registration: calls {@code doRegister()}, marks the channel as
         * registered, lets the event loop wrapper accept tasks again, completes the promise and
         * fires {@code channelRegistered} (plus {@code channelActive} on the first registration of
         * an already active channel).
         *
         * <p>Any {@link Throwable} raised along the way force-closes the channel, marks the close
         * future as closed and fails the promise.
         */
        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // Capture the flag before it is cleared below.
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                eventLoop.acceptNewTasks();
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (firstRegistration && isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
532 
        /**
         * Binds the channel to {@code localAddress} via {@code doBind()} and completes {@code promise}.
         *
         * <p>Nothing happens if the promise was cancelled; the promise is failed if the channel is
         * not open or {@code doBind()} throws. If the bind turned an inactive channel active,
         * {@code channelActive} is scheduled through {@code invokeLater()}.
         */
        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            // Remember the state before binding so that a transition to active can be detected.
            boolean wasActive = isActive();
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }
572 
        /**
         * Disconnects the channel via {@code doDisconnect()} and completes {@code promise}.
         *
         * <p>Nothing happens if the promise was cancelled; the promise is failed if
         * {@code doDisconnect()} throws. If the disconnect turned an active channel inactive,
         * {@code channelInactive} is scheduled through {@code invokeLater()}.
         */
        @Override
        public final void disconnect(final ChannelPromise promise) {
            if (!promise.setUncancellable()) {
                return;
            }

            // Remember the state before disconnecting so that a transition to inactive can be detected.
            boolean wasActive = isActive();
            try {
                doDisconnect();
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (wasActive && !isActive()) {
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        pipeline.fireChannelInactive();
                    }
                });
            }

            safeSetSuccess(promise);
            closeIfClosed(); // doDisconnect() might have closed the channel
        }
600 
        /**
         * Closes the channel and completes {@code promise} once the close has finished.
         *
         * <p>The call is re-scheduled via {@code invokeLater()} while a flush is in progress. If a
         * close was already started or has completed, the promise is merely tied to that close.
         * Otherwise the outbound buffer is detached, {@code doClose()} is run (on
         * {@code closeExecutor()} when that returns one, inline otherwise) and the remaining work
         * is done by {@code closeAndDeregister()}.
         */
        @Override
        public final void close(final ChannelPromise promise) {
            if (!promise.setUncancellable()) {
                return;
            }

            if (inFlush0) {
                // Do not close in the middle of flush0(); retry later.
                invokeLater(new OneTimeTask() {
                    @Override
                    public void run() {
                        close(promise);
                    }
                });
                return;
            }

            if (outboundBuffer == null) {
                // This means close() was called before so we just register a listener and return
                // NOTE(review): this calls promise.setSuccess() directly rather than going through
                // safeSetSuccess() as the other paths do -- confirm this is intended.
                closeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        promise.setSuccess();
                    }
                });
                return;
            }

            if (closeFuture.isDone()) {
                // Closed already.
                safeSetSuccess(promise);
                return;
            }

            final boolean wasActive = isActive();
            // Keep a reference to the buffer so its pending messages can still be failed after the field is cleared.
            final ChannelOutboundBuffer buffer = outboundBuffer;
            outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
            Executor closeExecutor = closeExecutor();
            if (closeExecutor != null) {
                closeExecutor.execute(new OneTimeTask() {
                    @Override
                    public void run() {
                        Throwable cause = null;
                        try {
                            doClose();
                        } catch (Throwable t) {
                            cause = t;
                        }
                        final Throwable error = cause;
                        // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                        invokeLater(new OneTimeTask() {
                            @Override
                            public void run() {
                                closeAndDeregister(buffer, wasActive, promise, error);
                            }
                        });
                    }
                });
            } else {
                Throwable error = null;
                try {
                    doClose();
                } catch (Throwable t) {
                    error = t;
                }
                closeAndDeregister(buffer, wasActive, promise, error);
            }
        }
668 
        /**
         * Finishes a close: fails everything left in the given outbound buffer, schedules the
         * deregistration (preceded by {@code channelInactive} if the channel went from active to
         * inactive), marks the close future as closed and completes the promise.
         *
         * @param outboundBuffer the buffer detached by {@code close()}
         * @param wasActive whether the channel was active before {@code doClose()} ran
         * @param promise the promise of the close operation
         * @param error the failure raised by {@code doClose()}, or {@code null} if it succeeded
         */
        private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive,
                                        ChannelPromise promise, Throwable error) {
            // Fail all the queued messages
            try {
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
                outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
            } finally {
                // Runs even if failing the queued messages threw.
                if (wasActive && !isActive()) {
                    invokeLater(new OneTimeTask() {
                        @Override
                        public void run() {
                            pipeline.fireChannelInactive();
                            deregister(voidPromise());
                        }
                    });
                } else {
                    invokeLater(new OneTimeTask() {
                        @Override
                        public void run() {
                            deregister(voidPromise());
                        }
                    });
                }

                // Now complete the closeFuture and promise.
                closeFuture.setClosed();
                if (error != null) {
                    safeSetFailure(promise, error);
                } else {
                    safeSetSuccess(promise);
                }
            }
        }
702 
        /**
         * Calls {@code doClose()} without firing any events or completing any future; an
         * {@link Exception} thrown by it is logged at warn level and not propagated.
         */
        @Override
        public final void closeForcibly() {
            try {
                doClose();
            } catch (Exception e) {
                logger.warn("Failed to close a channel.", e);
            }
        }
711 
712         /**
713          * This method must NEVER be called directly, but be executed as an
714          * extra task with a clean call stack instead. The reason for this
715          * is that this method calls {@link ChannelPipeline#fireChannelUnregistered()}
716          * directly, which might lead to an unfortunate nesting of independent inbound/outbound
717          * events. See the comments in {@link #invokeLater(Runnable)} for more details.
718          */
719         @Override
720         public final void deregister(final ChannelPromise promise) {
721             if (!promise.setUncancellable()) {
722                 return;
723             }
724 
725             if (!registered) {
726                 safeSetSuccess(promise);
727                 return;
728             }
729 
730             try {
731                 doDeregister();
732             } catch (Throwable t) {
733                 safeSetFailure(promise, t);
734                 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
735             } finally {
736                 if (registered) {
737                     registered = false;
738                     safeSetSuccess(promise);
739                     pipeline.fireChannelUnregistered();
740                 } else {
741                     // Some transports like local and AIO does not allow the deregistration of
742                     // an open channel.  Their doDeregister() calls close().  Consequently,
743                     // close() calls deregister() again - no need to fire channelUnregistered.
744                     safeSetSuccess(promise);
745                 }
746             }
747         }
748 
749         @Override
750         public final void beginRead() {
751             if (!isActive()) {
752                 return;
753             }
754 
755             try {
756                 doBeginRead();
757             } catch (final Exception e) {
758                 invokeLater(new OneTimeTask() {
759                     @Override
760                     public void run() {
761                         pipeline.fireExceptionCaught(e);
762                     }
763                 });
764                 close(voidPromise());
765             }
766         }
767 
768         @Override
769         public final void write(Object msg, ChannelPromise promise) {
770             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
771             if (outboundBuffer == null) {
772                 // If the outboundBuffer is null we know the channel was closed and so
773                 // need to fail the future right away. If it is not null the handling of the rest
774                 // will be done in flush0()
775                 // See https://github.com/netty/netty/issues/2362
776                 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
777                 // release message now to prevent resource-leak
778                 ReferenceCountUtil.release(msg);
779                 return;
780             }
781 
782             int size;
783             try {
784                 msg = filterOutboundMessage(msg);
785                 size = estimatorHandle().size(msg);
786                 if (size < 0) {
787                     size = 0;
788                 }
789             } catch (Throwable t) {
790                 safeSetFailure(promise, t);
791                 ReferenceCountUtil.release(msg);
792                 return;
793             }
794 
795             outboundBuffer.addMessage(msg, size, promise);
796         }
797 
798         @Override
799         public final void flush() {
800             ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
801             if (outboundBuffer == null) {
802                 return;
803             }
804 
805             outboundBuffer.addFlush();
806             flush0();
807         }
808 
809         protected void flush0() {
810             if (inFlush0) {
811                 // Avoid re-entrance
812                 return;
813             }
814 
815             final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
816             if (outboundBuffer == null || outboundBuffer.isEmpty()) {
817                 return;
818             }
819 
820             inFlush0 = true;
821 
822             // Mark all pending write requests as failure if the channel is inactive.
823             if (!isActive()) {
824                 try {
825                     if (isOpen()) {
826                         outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
827                     } else {
828                         outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
829                     }
830                 } finally {
831                     inFlush0 = false;
832                 }
833                 return;
834             }
835 
836             try {
837                 doWrite(outboundBuffer);
838             } catch (Throwable t) {
839                 outboundBuffer.failFlushed(t);
840             } finally {
841                 inFlush0 = false;
842             }
843         }
844 
        /**
         * Returns the {@link VoidChannelPromise} reserved for operations issued by this
         * {@link Unsafe}; it is a different instance than the one returned by
         * {@code AbstractChannel.voidPromise()}.
         */
        @Override
        public final ChannelPromise voidPromise() {
            return unsafeVoidPromise;
        }
849 
850         protected final boolean ensureOpen(ChannelPromise promise) {
851             if (isOpen()) {
852                 return true;
853             }
854 
855             safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
856             return false;
857         }
858 
859         /**
860          * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
861          */
862         protected final void safeSetSuccess(ChannelPromise promise) {
863             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
864                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
865             }
866         }
867 
868         /**
869          * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
870          */
871         protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
872             if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
873                 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
874             }
875         }
876 
877         protected final void closeIfClosed() {
878             if (isOpen()) {
879                 return;
880             }
881             close(voidPromise());
882         }
883 
884         private void invokeLater(Runnable task) {
885             try {
886                 // This method is used by outbound operation implementations to trigger an inbound event later.
887                 // They do not trigger an inbound event immediately because an outbound operation might have been
888                 // triggered by another inbound event handler method.  If fired immediately, the call stack
889                 // will look like this for example:
890                 //
891                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
892                 //   -> handlerA.ctx.close()
893                 //      -> channel.unsafe.close()
894                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
895                 //
896                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
897                 eventLoop().unwrap().execute(task);
898             } catch (RejectedExecutionException e) {
899                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
900             }
901         }
902 
903         /**
904          * Appends the remote address to the message of the exceptions caused by connection attempt failure.
905          */
906         protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
907             if (cause instanceof ConnectException) {
908                 Throwable newT = new ConnectException(cause.getMessage() + ": " + remoteAddress);
909                 newT.setStackTrace(cause.getStackTrace());
910                 cause = newT;
911             } else if (cause instanceof NoRouteToHostException) {
912                 Throwable newT = new NoRouteToHostException(cause.getMessage() + ": " + remoteAddress);
913                 newT.setStackTrace(cause.getStackTrace());
914                 cause = newT;
915             } else if (cause instanceof SocketException) {
916                 Throwable newT = new SocketException(cause.getMessage() + ": " + remoteAddress);
917                 newT.setStackTrace(cause.getStackTrace());
918                 cause = newT;
919             }
920 
921             return cause;
922         }
923 
924         /**
925          * @return {@link Executor} to execute {@link #doClose()} or {@code null} if it should be done in the
926          * {@link EventLoop}.
927          +
928          */
929         protected Executor closeExecutor() {
930             return null;
931         }
932     }
933 
934     /**
935      * Return {@code true} if the given {@link EventLoop} is compatible with this instance.
936      */
937     protected abstract boolean isCompatible(EventLoop loop);
938 
939     /**
940      * Returns the {@link SocketAddress} which is bound locally.
941      */
942     protected abstract SocketAddress localAddress0();
943 
944     /**
945      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
946      */
947     protected abstract SocketAddress remoteAddress0();
948 
949     /**
950      * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
951      *
952      * Sub-classes may override this method
953      */
954     protected void doRegister() throws Exception {
955         // NOOP
956     }
957 
958     /**
959      * Bind the {@link Channel} to the {@link SocketAddress}
960      */
961     protected abstract void doBind(SocketAddress localAddress) throws Exception;
962 
963     /**
964      * Disconnect this {@link Channel} from its remote peer
965      */
966     protected abstract void doDisconnect() throws Exception;
967 
968     /**
969      * Close the {@link Channel}
970      */
971     protected abstract void doClose() throws Exception;
972 
973     /**
974      * Deregister the {@link Channel} from its {@link EventLoop}.
975      *
976      * Sub-classes may override this method
977      */
978     protected void doDeregister() throws Exception {
979         // NOOP
980     }
981 
982     /**
983      * Schedule a read operation.
984      */
985     protected abstract void doBeginRead() throws Exception;
986 
987     /**
988      * Flush the content of the given buffer to the remote peer.
989      */
990     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
991 
992     /**
993      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
994      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
995      */
996     protected Object filterOutboundMessage(Object msg) throws Exception {
997         return msg;
998     }
999 
1000     static final class CloseFuture extends DefaultChannelPromise {
1001 
1002         CloseFuture(AbstractChannel ch) {
1003             super(ch);
1004         }
1005 
1006         @Override
1007         public ChannelPromise setSuccess() {
1008             throw new IllegalStateException();
1009         }
1010 
1011         @Override
1012         public ChannelPromise setFailure(Throwable cause) {
1013             throw new IllegalStateException();
1014         }
1015 
1016         @Override
1017         public boolean trySuccess() {
1018             throw new IllegalStateException();
1019         }
1020 
1021         @Override
1022         public boolean tryFailure(Throwable cause) {
1023             throw new IllegalStateException();
1024         }
1025 
1026         boolean setClosed() {
1027             return super.trySuccess();
1028         }
1029     }
1030 
1031     private final class PausableChannelEventLoop
1032             extends PausableChannelEventExecutor implements EventLoop {
1033 
1034         volatile boolean isAcceptingNewTasks = true;
1035         volatile EventLoop unwrapped;
1036 
1037         PausableChannelEventLoop(EventLoop unwrapped) {
1038             this.unwrapped = unwrapped;
1039         }
1040 
1041         @Override
1042         public void rejectNewTasks() {
1043             isAcceptingNewTasks = false;
1044         }
1045 
1046         @Override
1047         public void acceptNewTasks() {
1048             isAcceptingNewTasks = true;
1049         }
1050 
1051         @Override
1052         public boolean isAcceptingNewTasks() {
1053             return isAcceptingNewTasks;
1054         }
1055 
1056         @Override
1057         public EventLoopGroup parent() {
1058             return unwrap().parent();
1059         }
1060 
1061         @Override
1062         public EventLoop next() {
1063             return unwrap().next();
1064         }
1065 
1066         @Override
1067         public EventLoop unwrap() {
1068             return unwrapped;
1069         }
1070 
1071         @Override
1072         public ChannelHandlerInvoker asInvoker() {
1073             return this;
1074         }
1075 
1076         @Override
1077         public ChannelFuture register(Channel channel) {
1078             return unwrap().register(channel);
1079         }
1080 
1081         @Override
1082         public ChannelFuture register(Channel channel, ChannelPromise promise) {
1083             return unwrap().register(channel, promise);
1084         }
1085 
1086         @Override
1087         Channel channel() {
1088             return AbstractChannel.this;
1089         }
1090 
1091         @Override
1092         ChannelHandlerInvoker unwrapInvoker() {
1093             return unwrapped.asInvoker();
1094         }
1095     }
1096 }