View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.DefaultBufferAllocators;
20  import io.netty5.util.Resource;
21  import io.netty5.util.DefaultAttributeMap;
22  import io.netty5.util.concurrent.DefaultPromise;
23  import io.netty5.util.concurrent.EventExecutor;
24  import io.netty5.util.concurrent.Future;
25  import io.netty5.util.concurrent.Promise;
26  import io.netty5.util.internal.PlatformDependent;
27  import io.netty5.util.internal.StringUtil;
28  import io.netty5.util.internal.logging.InternalLogger;
29  import io.netty5.util.internal.logging.InternalLoggerFactory;
30  
31  import java.io.IOException;
32  import java.net.ConnectException;
33  import java.net.InetSocketAddress;
34  import java.net.NoRouteToHostException;
35  import java.net.SocketAddress;
36  import java.net.SocketException;
37  import java.nio.channels.AlreadyConnectedException;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.ConnectionPendingException;
40  import java.nio.channels.NotYetConnectedException;
41  import java.util.Collections;
42  import java.util.IdentityHashMap;
43  import java.util.NoSuchElementException;
44  import java.util.Set;
45  import java.util.concurrent.Executor;
46  import java.util.concurrent.RejectedExecutionException;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50  
51  import static io.netty5.channel.ChannelOption.ALLOW_HALF_CLOSURE;
52  import static io.netty5.channel.ChannelOption.AUTO_CLOSE;
53  import static io.netty5.channel.ChannelOption.AUTO_READ;
54  import static io.netty5.channel.ChannelOption.BUFFER_ALLOCATOR;
55  import static io.netty5.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
56  import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_READ;
57  import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
58  import static io.netty5.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
59  import static io.netty5.channel.ChannelOption.RCVBUFFER_ALLOCATOR;
60  import static io.netty5.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
61  import static io.netty5.channel.ChannelOption.WRITE_SPIN_COUNT;
62  import static io.netty5.util.internal.ObjectUtil.checkPositive;
63  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
64  import static java.util.Objects.requireNonNull;
65  
66  /**
67   * A skeletal {@link Channel} implementation.
68   */
69  public abstract class AbstractChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
70          extends DefaultAttributeMap implements Channel {
71  
72      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
73      private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
74  
75      private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
76  
77      private static final Set<ChannelOption<?>> SUPPORTED_CHANNEL_OPTIONS = supportedOptions();
78  
79      private final P parent;
80      private final ChannelId id;
81      private final ChannelPipeline pipeline;
82      private final ClosePromise closePromise;
83      private final Runnable fireChannelWritabilityChangedTask;
84      private final EventLoop eventLoop;
85      private final ChannelMetadata metadata;
86  
87      /** Cache for the string representation of this channel */
88      private boolean strValActive;
89      private String strVal;
90  
91      @SuppressWarnings("rawtypes")
92      private static final AtomicIntegerFieldUpdater<AbstractChannel> WRITABLE_UPDATER =
93              AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "writable");
94      private volatile int writable = 1;
95      private volatile ChannelOutboundBuffer outboundBuffer;
96      private volatile L localAddress;
97      private volatile R remoteAddress;
98      private volatile boolean registered;
99  
100     private static final AtomicIntegerFieldUpdater<AbstractChannel> AUTOREAD_UPDATER =
101             AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "autoRead");
102     private static final AtomicReferenceFieldUpdater<AbstractChannel, WriteBufferWaterMark> WATERMARK_UPDATER =
103             AtomicReferenceFieldUpdater.newUpdater(
104                     AbstractChannel.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
105 
106     private volatile BufferAllocator bufferAllocator = DefaultBufferAllocators.preferredAllocator();
107     private volatile RecvBufferAllocator rcvBufAllocator;
108     private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
109 
110     private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
111     private volatile int writeSpinCount = 16;
112     private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
113 
114     @SuppressWarnings("FieldMayBeFinal")
115     private volatile int autoRead = 1;
116     private volatile boolean autoClose = true;
117     private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
118     private volatile boolean allowHalfClosure;
119 
120     // All fields below are only called from within the EventLoop thread.
121     private boolean closeInitiated;
122     private Throwable initialCloseCause;
123     private boolean readBeforeActive;
124     private RecvBufferAllocator.Handle recvHandle;
125     private MessageSizeEstimator.Handle estimatorHandler;
126     private boolean inWriteFlushed;
127     /** true if the channel has never been registered, false otherwise */
128     private boolean neverRegistered = true;
129     private boolean neverActive = true;
130 
131     /**
132      * The future of the current connection attempt.  If not null, subsequent
133      * connection attempts will fail.
134      */
135     private Promise<Void> connectPromise;
136     private Future<?> connectTimeoutFuture;
137     private R requestedRemoteAddress;
138 
139     /**
140      * Creates a new instance.
141      *
142      * @param parent        the parent of this channel. {@code null} if there's no parent.
143      * @param eventLoop     the {@link EventLoop} which will be used.
144      * @param metadata      the {@link ChannelMetadata} to use.
145      */
146     protected AbstractChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata) {
147         this(parent, eventLoop, metadata, new AdaptiveRecvBufferAllocator());
148     }
149 
150     /**
151      * Creates a new instance.
152      *
153      * @param parent                        the parent of this channel. {@code null} if there's no parent.
154      * @param eventLoop                     the {@link EventLoop} which will be used.
155      * @param metadata                      the {@link ChannelMetadata} to use.
156      * @param defaultRecvBufferAllocator    the {@link RecvBufferAllocator} that is used by default.
157      */
158     protected AbstractChannel(P parent, EventLoop eventLoop,
159                               ChannelMetadata metadata, RecvBufferAllocator defaultRecvBufferAllocator) {
160         this(parent, eventLoop, metadata, defaultRecvBufferAllocator, DefaultChannelId.newInstance());
161     }
162 
163     /**
164      * Creates a new instance.
165      *
166      * @param parent                        the parent of this channel. {@code null} if there's no parent.
167      * @param eventLoop                     the {@link EventLoop} which will be used.
168      * @param metadata                      the {@link ChannelMetadata} to use.
169      * @param defaultRecvBufferAllocator    the {@link RecvBufferAllocator} that is used by default.
170      * @param id                            the {@link ChannelId} which will be used.
171      */
172     protected AbstractChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
173                               RecvBufferAllocator defaultRecvBufferAllocator, ChannelId id) {
174         this.parent = parent;
175         this.eventLoop = validateEventLoopGroup(eventLoop, "eventLoop", getClass());
176         this.metadata = requireNonNull(metadata, "metadata");
177         closePromise = new ClosePromise(eventLoop);
178         outboundBuffer = new ChannelOutboundBuffer(eventLoop);
179         this.id = id;
180         pipeline = newChannelPipeline();
181         fireChannelWritabilityChangedTask = () -> pipeline().fireChannelWritabilityChanged();
182         rcvBufAllocator = validateAndConfigure(defaultRecvBufferAllocator, metadata);
183     }
184 
185     private static RecvBufferAllocator validateAndConfigure(RecvBufferAllocator defaultRecvBufferAllocator,
186                                                             ChannelMetadata metadata) {
187         requireNonNull(defaultRecvBufferAllocator, "defaultRecvBufferAllocator");
188         if (defaultRecvBufferAllocator instanceof MaxMessagesRecvBufferAllocator) {
189             ((MaxMessagesRecvBufferAllocator) defaultRecvBufferAllocator)
190                     .maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
191         }
192         return defaultRecvBufferAllocator;
193     }
194 
195     protected static <T extends EventLoopGroup> T validateEventLoopGroup(
196             T group, String name, Class<? extends Channel> channelType) {
197         requireNonNull(group, name);
198         if (!group.isCompatible(channelType)) {
199             throw new IllegalArgumentException(group + " does not support channel of type " +
200                             StringUtil.simpleClassName(channelType));
201         }
202         return group;
203     }
204 
205     @Override
206     public final ChannelId id() {
207         return id;
208     }
209 
210     @Override
211     public final ChannelMetadata metadata() {
212         return metadata;
213     }
214 
215     /**
216      * Returns a new {@link ChannelPipeline} instance.
217      */
218     protected ChannelPipeline newChannelPipeline() {
219         return new DefaultAbstractChannelPipeline(this);
220     }
221 
222     @Override
223     public BufferAllocator bufferAllocator() {
224         return bufferAllocator;
225     }
226 
227     @Override
228     public final P parent() {
229         return parent;
230     }
231 
232     @Override
233     public final ChannelPipeline pipeline() {
234         return pipeline;
235     }
236 
237     @Override
238     public final EventLoop executor() {
239         return eventLoop;
240     }
241 
242     @Override
243     public final L localAddress() {
244         L localAddress = this.localAddress;
245         if (localAddress == null) {
246             try {
247                 this.localAddress = localAddress = localAddress0();
248             } catch (Error e) {
249                 throw e;
250             } catch (Throwable t) {
251                 // Sometimes fails on a closed socket in Windows.
252                 return null;
253             }
254         }
255         return localAddress;
256     }
257 
258     @Override
259     public final R remoteAddress() {
260         R remoteAddress = this.remoteAddress;
261         if (remoteAddress == null) {
262             try {
263                 this.remoteAddress = remoteAddress = remoteAddress0();
264             } catch (Error e) {
265                 throw e;
266             } catch (Throwable t) {
267                 // Sometimes fails on a closed socket in Windows.
268                 return null;
269             }
270         }
271         return remoteAddress;
272     }
273 
274     protected final void cacheAddresses(L localAddress, R  remoteAddress) {
275         this.localAddress = localAddress;
276         this.remoteAddress = remoteAddress;
277     }
278 
279     @Override
280     public final boolean isRegistered() {
281         return registered;
282     }
283 
284     @Override
285     public final Future<Void> closeFuture() {
286         return closePromise;
287     }
288 
289     private long totalPending() {
290         ChannelOutboundBuffer buf = outboundBuffer();
291         if (buf == null) {
292             return -1;
293         }
294         return buf.totalPendingWriteBytes() + pipeline().pendingOutboundBytes();
295     }
296 
297     @Override
298     public final long writableBytes() {
299         long totalPending = totalPending();
300         if (totalPending == -1) {
301             // Already closed.
302             return 0;
303         }
304 
305         long bytes = writeBufferWaterMark.high() -
306                 totalPending;
307         // If bytes is negative we know we are not writable.
308         if (bytes > 0) {
309             return WRITABLE_UPDATER.get(this) == 0 ? 0: bytes;
310         }
311         return 0;
312     }
313 
314     /**
315      * Returns the ID of this channel.
316      */
317     @Override
318     public final int hashCode() {
319         return id.hashCode();
320     }
321 
322     /**
323      * Returns {@code true} if and only if the specified object is identical
324      * with this channel (i.e: {@code this == o}).
325      */
326     @Override
327     public final boolean equals(Object o) {
328         return this == o;
329     }
330 
331     @Override
332     public final int compareTo(Channel o) {
333         if (this == o) {
334             return 0;
335         }
336 
337         return id().compareTo(o.id());
338     }
339 
340     /**
341      * Returns the {@link String} representation of this channel.  The returned
342      * string contains the {@linkplain #hashCode() ID}, {@linkplain #localAddress() local address},
343      * and {@linkplain #remoteAddress() remote address} of this channel for
344      * easier identification.
345      */
346     @Override
347     public String toString() {
348         boolean active = isActive();
349         if (strValActive == active && strVal != null) {
350             return strVal;
351         }
352 
353         SocketAddress remoteAddr = remoteAddress();
354         SocketAddress localAddr = localAddress();
355         if (remoteAddr != null) {
356             StringBuilder buf = new StringBuilder(96)
357                 .append("[id: 0x")
358                 .append(id.asShortText())
359                 .append(", L:")
360                 .append(localAddr)
361                 .append(active? " - " : " ! ")
362                 .append("R:")
363                 .append(remoteAddr)
364                 .append(']');
365             strVal = buf.toString();
366         } else if (localAddr != null) {
367             StringBuilder buf = new StringBuilder(64)
368                 .append("[id: 0x")
369                 .append(id.asShortText())
370                 .append(", L:")
371                 .append(localAddr)
372                 .append(']');
373             strVal = buf.toString();
374         } else {
375             StringBuilder buf = new StringBuilder(16)
376                 .append("[id: 0x")
377                 .append(id.asShortText())
378                 .append(']');
379             strVal = buf.toString();
380         }
381 
382         strValActive = active;
383         return strVal;
384     }
385 
386     protected final void readIfIsAutoRead() {
387         assertEventLoop();
388 
389         if (isAutoRead() || readBeforeActive) {
390             readBeforeActive = false;
391             read();
392         }
393     }
394 
395     protected final void assertEventLoop() {
396         assert eventLoop.inEventLoop();
397     }
398 
399     protected RecvBufferAllocator.Handle recvBufAllocHandle() {
400         assertEventLoop();
401 
402         if (recvHandle == null) {
403             recvHandle = getRecvBufferAllocator().newHandle();
404         }
405         return recvHandle;
406     }
407 
408     private void registerTransport(final Promise<Void> promise) {
409         assertEventLoop();
410 
411         if (isRegistered()) {
412             promise.setFailure(new IllegalStateException("registered to an event loop already"));
413             return;
414         }
415 
416         try {
417             // check if the channel is still open as it could be closed in the mean time when the register
418             // call was outside of the eventLoop
419             if (!promise.setUncancellable() || !ensureOpen(promise)) {
420                 return;
421             }
422             boolean firstRegistration = neverRegistered;
423             executor().registerForIo(this).addListener(f -> {
424                 if (f.isSuccess()) {
425 
426                     neverRegistered = false;
427                     registered = true;
428 
429                     safeSetSuccess(promise);
430                     pipeline.fireChannelRegistered();
431                     // Only fire a channelActive if the channel has never been registered. This prevents firing
432                     // multiple channel actives if the channel is deregistered and re-registered.
433                     if (isActive()) {
434                         if (firstRegistration) {
435                             fireChannelActiveIfNotActiveBefore();
436                         }
437                         readIfIsAutoRead();
438                     }
439                 } else {
440                     // Close the channel directly to avoid FD leak.
441                     closeNowAndFail(promise, f.cause());
442                 }
443             });
444 
445         } catch (Throwable t) {
446             // Close the channel directly to avoid FD leak.
447             closeNowAndFail(promise, t);
448         }
449     }
450 
451     /**
452      * Calls {@link ChannelPipeline#fireChannelActive()} if it was not done yet.
453      *
454      * @return {@code true} if {@link ChannelPipeline#fireChannelActive()} was called, {@code false} otherwise.
455      */
456     protected final boolean fireChannelActiveIfNotActiveBefore() {
457         assertEventLoop();
458 
459         if (neverActive) {
460             neverActive = false;
461             pipeline().fireChannelActive();
462             return true;
463         }
464         return false;
465     }
466 
467     private void closeNowAndFail(Promise<Void> promise, Throwable cause) {
468         closeForciblyTransport();
469         closePromise.setClosed();
470         safeSetFailure(promise, cause);
471     }
472 
473     private void bindTransport(final SocketAddress localAddress, final Promise<Void> promise) {
474         assertEventLoop();
475 
476         if (!promise.setUncancellable() || !ensureOpen(promise)) {
477             return;
478         }
479 
480         // See: https://github.com/netty/netty/issues/576
481         if (localAddress instanceof InetSocketAddress && isOptionSupported(ChannelOption.SO_BROADCAST) &&
482                 Boolean.TRUE.equals(getOption(ChannelOption.SO_BROADCAST)) &&
483             !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
484             !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
485             // Warn a user about the fact that a non-root user can't receive a
486             // broadcast packet on *nix if the socket is bound on non-wildcard address.
487             logger.warn(
488                     "A non-root user can't receive a broadcast packet if the socket " +
489                     "is not bound to a wildcard address; binding to a non-wildcard " +
490                     "address (" + localAddress + ") anyway as requested.");
491         }
492 
493         boolean wasActive = isActive();
494         try {
495             doBind(localAddress);
496         } catch (Throwable t) {
497             safeSetFailure(promise, t);
498             closeIfClosed();
499             return;
500         }
501 
502         if (!wasActive && isActive()) {
503             invokeLater(() -> {
504                 if (fireChannelActiveIfNotActiveBefore()) {
505                     readIfIsAutoRead();
506                 }
507             });
508         }
509 
510         safeSetSuccess(promise);
511     }
512 
513     private void disconnectTransport(final Promise<Void> promise) {
514         assertEventLoop();
515 
516         if (!promise.setUncancellable()) {
517             return;
518         }
519 
520         boolean wasActive = isActive();
521         try {
522             doDisconnect();
523             // Reset remoteAddress and localAddress
524             remoteAddress = null;
525             localAddress = null;
526             neverActive = true;
527         } catch (Throwable t) {
528             safeSetFailure(promise, t);
529             closeIfClosed();
530             return;
531         }
532 
533         if (wasActive && !isActive()) {
534             invokeLater(pipeline::fireChannelInactive);
535         }
536 
537         safeSetSuccess(promise);
538         closeIfClosed(); // doDisconnect() might have closed the channel
539     }
540 
541     protected void closeTransport(final Promise<Void> promise) {
542         assertEventLoop();
543 
544         ClosedChannelException closedChannelException =
545                 StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(Promise)");
546         close(promise, closedChannelException, closedChannelException);
547     }
548 
549     private void updateWritabilityIfNeeded(boolean notify, boolean notifyLater) {
550         long totalPending = totalPending();
551 
552         if (totalPending > writeBufferWaterMark.high()) {
553             if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
554                 fireChannelWritabilityChangedIfNeeded(notify, notifyLater);
555             }
556         } else if (totalPending < writeBufferWaterMark.low()) {
557             if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
558                 fireChannelWritabilityChangedIfNeeded(notify, notifyLater);
559             }
560         }
561     }
562 
563     private void fireChannelWritabilityChangedIfNeeded(boolean notify, boolean notifyLater) {
564         if (!notify) {
565             return;
566         }
567         if (notifyLater) {
568             executor().execute(fireChannelWritabilityChangedTask);
569         } else {
570             pipeline().fireChannelWritabilityChanged();
571         }
572     }
573 
574     /**
575      * Shutdown the output portion of the corresponding {@link Channel}.
576      * For example this will clean up the {@link ChannelOutboundBuffer} and not allow any more writes.
577      * @param cause The cause which may provide rational for the shutdown.
578      */
579     private boolean shutdownOutput(final Promise<Void> promise, Throwable cause) {
580         final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
581         if (outboundBuffer == null) {
582             promise.setFailure(new ClosedChannelException());
583             return false;
584         }
585         this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
586 
587         final Throwable shutdownCause = cause == null ?
588                 new ChannelOutputShutdownException("Channel output shutdown") :
589                 new ChannelOutputShutdownException("Channel output shutdown", cause);
590         // When a side enables SO_LINGER and calls showdownOutput(...) to start TCP half-closure
591         // we can not call doDeregister here because we should ensure this side in fin_wait2 state
592         // can still receive and process the data which is send by another side in the close_wait state。
593         // See https://github.com/netty/netty/issues/11981
594         try {
595             // The shutdown function does not block regardless of the SO_LINGER setting on the socket
596             // so we don't need to use GlobalEventExecutor to execute the shutdown
597             doShutdown(ChannelShutdownDirection.Outbound);
598             promise.setSuccess(null);
599         } catch (Throwable err) {
600             promise.setFailure(err);
601         } finally {
602             outboundBuffer.failFlushedAndClose(shutdownCause, shutdownCause);
603         }
604         return true;
605     }
606 
607     private void close(final Promise<Void> promise, final Throwable cause,
608                        final ClosedChannelException closeCause) {
609         if (!promise.setUncancellable()) {
610             return;
611         }
612 
613         if (closeInitiated) {
614             if (closePromise.isDone()) {
615                 // Closed already.
616                 safeSetSuccess(promise);
617             } else {
618                 // This means close() was called before, so we just register a listener and return
619                 closePromise.addListener(promise, (p, future) -> p.setSuccess(null));
620             }
621             return;
622         }
623 
624         closeInitiated = true;
625 
626         final boolean wasActive = isActive();
627         final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
628         this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
629         Future<Executor> closeExecutorFuture = prepareToClose();
630         if (closeExecutorFuture != null) {
631             closeExecutorFuture.addListener(f -> {
632                 if (f.isFailed()) {
633                     logger.warn("We couldnt obtain the closeExecutor", f.cause());
634                     closeNow(outboundBuffer, wasActive, promise, cause, closeCause);
635                 } else {
636                     Executor closeExecutor = f.getNow();
637                     closeExecutor.execute(() -> {
638                         try {
639                             // Execute the close.
640                             doClose0(promise);
641                         } finally {
642                             // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
643                             invokeLater(() -> {
644                                 closeAndUpdateWritability(outboundBuffer, cause, closeCause);
645                                 fireChannelInactiveAndDeregister(wasActive);
646                             });
647                         }
648                     });
649                 }
650             });
651         } else {
652             closeNow(outboundBuffer, wasActive, promise, cause, closeCause);
653         }
654     }
655 
656     private void closeNow(ChannelOutboundBuffer outboundBuffer, boolean wasActive, Promise<Void> promise,
657                           Throwable cause, ClosedChannelException closeCause) {
658         try {
659             // Close the channel and fail the queued messages in all cases.
660             doClose0(promise);
661         } finally {
662             closeAndUpdateWritability(outboundBuffer, cause, closeCause);
663         }
664         if (inWriteFlushed) {
665             invokeLater(() -> fireChannelInactiveAndDeregister(wasActive));
666         } else {
667             fireChannelInactiveAndDeregister(wasActive);
668         }
669     }
670 
671     private void closeAndUpdateWritability(
672             ChannelOutboundBuffer outboundBuffer, Throwable cause, Throwable closeCause) {
673         if (outboundBuffer != null) {
674             // Fail all the queued messages
675             outboundBuffer.failFlushedAndClose(cause, closeCause);
676             updateWritabilityIfNeeded(false, false);
677         }
678     }
679 
680     private void doClose0(Promise<Void> promise) {
681         try {
682             cancelConnect();
683             doClose();
684             closePromise.setClosed();
685             safeSetSuccess(promise);
686         } catch (Throwable t) {
687             closePromise.setClosed();
688             safeSetFailure(promise, t);
689         }
690     }
691 
692     private void fireChannelInactiveAndDeregister(final boolean wasActive) {
693         deregister(newPromise(), wasActive && !isActive());
694     }
695 
696     protected final void closeForciblyTransport() {
697         assertEventLoop();
698 
699         try {
700             cancelConnect();
701             doClose();
702         } catch (Exception e) {
703             logger.warn("Failed to close a channel.", e);
704         }
705     }
706 
707     private void cancelConnect() {
708         Promise<Void> promise = connectPromise;
709         if (promise != null) {
710             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
711             promise.tryFailure(new ClosedChannelException());
712             connectPromise = null;
713         }
714 
715         Future<?> future = connectTimeoutFuture;
716         if (future != null) {
717             future.cancel();
718             connectTimeoutFuture = null;
719         }
720     }
721 
722     protected final void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
723         assertEventLoop();
724 
725         if (!promise.setUncancellable()) {
726             return;
727         }
728         if (!isActive()) {
729             if (isOpen()) {
730                 promise.setFailure(new NotYetConnectedException());
731             } else {
732                 promise.setFailure(new ClosedChannelException());
733             }
734             return;
735         }
736         if (isShutdown(direction)) {
737             // Already shutdown so let's just make this a noop.
738             promise.setSuccess(null);
739             return;
740         }
741         boolean fireEvent = false;
742         switch (direction) {
743             case Outbound:
744                 fireEvent = shutdownOutput(promise, null);
745                 break;
746             case Inbound:
747                 try {
748                     doShutdown(direction);
749                     promise.setSuccess(null);
750                     fireEvent = true;
751                 } catch (Throwable cause) {
752                     promise.setFailure(cause);
753                 }
754                 break;
755             default:
756                 // Should never happen
757                 promise.setFailure(new AssertionError());
758                 break;
759         }
760         if (fireEvent) {
761             pipeline().fireChannelShutdown(direction);
762         }
763     }
764 
765     protected void deregisterTransport(final Promise<Void> promise) {
766         assertEventLoop();
767 
768         deregister(promise, false);
769     }
770 
771     private void deregister(final Promise<Void> promise, final boolean fireChannelInactive) {
772         if (!promise.setUncancellable()) {
773             return;
774         }
775 
776         if (!registered) {
777             safeSetSuccess(promise);
778             return;
779         }
780 
781         // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
782         // we need to ensure we do the actual deregister operation later. This is needed as for example,
783         // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
784         // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
785         // the deregister operation this could lead to have a handler invoked by different EventLoop and so
786         // threads.
787         //
788         // See:
789         // https://github.com/netty/netty/issues/4435
790         invokeLater(() -> {
791             try {
792                 eventLoop.deregisterForIo(this).addListener(f -> {
793                     if (f.isFailed()) {
794                         logger.warn("Unexpected exception occurred while deregistering a channel.", f.cause());
795                     }
796                     deregisterDone(fireChannelInactive, promise);
797                 });
798             } catch (Throwable t) {
799                 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
800                 deregisterDone(fireChannelInactive, promise);
801             }
802         });
803     }
804 
805     private void deregisterDone(boolean fireChannelInactive, Promise<Void> promise) {
806         if (fireChannelInactive) {
807             pipeline.fireChannelInactive();
808         }
809         // Some transports like local and AIO does not allow the deregistration of
810         // an open channel. Their doDeregister() calls close(). Consequently,
811         // close() calls deregister() again - no need to fire channelUnregistered, so check
812         // if it was registered.
813         if (registered) {
814             registered = false;
815             pipeline.fireChannelUnregistered();
816 
817             if (!isOpen()) {
818                 // Remove all handlers from the ChannelPipeline. This is needed to ensure
819                 // handlerRemoved(...) is called and so resources are released.
820                 while (!pipeline.isEmpty()) {
821                     try {
822                         pipeline.removeLast();
823                     } catch (NoSuchElementException ignore) {
824                         // try again as there may be a race when someone outside the EventLoop removes
825                         // handlers concurrently as well.
826                     }
827                 }
828             }
829         }
830         safeSetSuccess(promise);
831     }
832 
833     private void readTransport() {
834         assertEventLoop();
835 
836         if (!isActive()) {
837             readBeforeActive = true;
838             return;
839         }
840 
841         if (isShutdown(ChannelShutdownDirection.Inbound)) {
842             // Input was shutdown so not try to read.
843             return;
844         }
845         try {
846             doBeginRead();
847         } catch (final Exception e) {
848             invokeLater(() -> pipeline.fireChannelExceptionCaught(e));
849             closeTransport(newPromise());
850         }
851     }
852 
853     private void writeTransport(Object msg, Promise<Void> promise) {
854         assertEventLoop();
855 
856         ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
857         if (outboundBuffer == null) {
858             try {
859                 // release message now to prevent resource-leak
860                 Resource.dispose(msg);
861             } finally {
862                 // If the outboundBuffer is null we know the channel was closed or the outbound was shutdown, so
863                 // need to fail the future right away. If it is not null the handling of the rest
864                 // will be done in writeFlushed()
865                 // See https://github.com/netty/netty/issues/2362
866                 final Throwable cause;
867                 if (!isActive()) {
868                     cause = newClosedChannelException(initialCloseCause, "write(Object, Promise)");
869                 } else {
870                     cause = ChannelOutputShutdownException.newInstance(AbstractChannel.class,
871                             "writeTransport(Object, Promise)");
872                 }
873                 safeSetFailure(promise, cause);
874             }
875             return;
876         }
877 
878         int size;
879         try {
880             msg = filterOutboundMessage(msg);
881             if (estimatorHandler == null) {
882                 estimatorHandler = getMessageSizeEstimator().newHandle();
883             }
884             size = estimatorHandler.size(msg);
885             if (size < 0) {
886                 size = 0;
887             }
888         } catch (Throwable t) {
889             try {
890                 Resource.dispose(msg);
891             } catch (Throwable inner) {
892                 t.addSuppressed(inner);
893             } finally {
894                 safeSetFailure(promise, t);
895             }
896             return;
897         }
898 
899         outboundBuffer.addMessage(msg, size, promise);
900         updateWritabilityIfNeeded(true, false);
901     }
902 
903     private void flushTransport() {
904         assertEventLoop();
905 
906         ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
907         if (outboundBuffer == null) {
908             return;
909         }
910 
911         outboundBuffer.addFlush();
912         writeFlushed();
913     }
914 
915     /**
916      * Write previous flushed messages.
917      */
918     protected void writeFlushed() {
919         assertEventLoop();
920 
921         if (inWriteFlushed) {
922             // Avoid re-entrance
923             return;
924         }
925 
926         final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
927         if (outboundBuffer == null || outboundBuffer.isEmpty()) {
928             return;
929         }
930 
931         inWriteFlushed = true;
932 
933         // Mark all pending write requests as failure if the channel is inactive.
934         if (!isActive()) {
935             try {
936                 // Check if we need to generate the exception at all.
937                 if (!outboundBuffer.isEmpty()) {
938                     if (isOpen()) {
939                         outboundBuffer.failFlushed(new NotYetConnectedException());
940                         updateWritabilityIfNeeded(true, true);
941                     } else {
942                         // Do not trigger channelWritabilityChanged because the channel is closed already.
943                         outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "writeFlushed()"));
944                     }
945                 }
946             } finally {
947                 inWriteFlushed = false;
948             }
949             return;
950         }
951 
952         try {
953             doWrite(outboundBuffer);
954         } catch (Throwable t) {
955             handleWriteError(t);
956         } finally {
957             // It's important that we call this with notifyLater true so we not get into trouble when flush() is called
958             // again in channelWritabilityChanged(...).
959             updateWritabilityIfNeeded(true, true);
960             inWriteFlushed = false;
961         }
962     }
963 
964     protected final void handleWriteError(Throwable t) {
965         assertEventLoop();
966 
967         if (t instanceof IOException && isAutoClose()) {
968             /*
969              * Just call {@link #close(Promise, Throwable, boolean)} here which will take care of
970              * failing all flushed messages and also ensure the actual close of the underlying transport
971              * will happen before the promises are notified.
972              *
973              * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #writableBytes()}}
974              * may still return {@code true} / {@code > 0} even if the channel should be closed as result of
975              * the exception.
976              */
977             initialCloseCause = t;
978             close(newPromise(), t, newClosedChannelException(t, "writeFlushed()"));
979         } else {
980             try {
981                 if (shutdownOutput(newPromise(), t)) {
982                     pipeline().fireChannelShutdown(ChannelShutdownDirection.Outbound);
983                 }
984             } catch (Throwable t2) {
985                 initialCloseCause = t;
986                 close(newPromise(), t2, newClosedChannelException(t, "writeFlushed()"));
987             }
988         }
989     }
990 
991     private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
992         ClosedChannelException exception =
993                 StacklessClosedChannelException.newInstance(AbstractChannel.class, method);
994         if (cause != null) {
995             exception.initCause(cause);
996         }
997         return exception;
998     }
999 
1000     private void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1001         Resource.dispose(event);
1002         promise.setSuccess(null);
1003     }
1004 
1005     protected final boolean ensureOpen(Promise<Void> promise) {
1006         if (isOpen()) {
1007             return true;
1008         }
1009 
1010         safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(Promise)"));
1011         return false;
1012     }
1013 
1014     /**
1015      * Marks the specified {@code promise} as success.  If the {@code promise} is done already, log a message.
1016      */
1017     protected final void safeSetSuccess(Promise<Void> promise) {
1018         if (!promise.trySuccess(null)) {
1019             logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
1020         }
1021     }
1022 
1023     /**
1024      * Marks the specified {@code promise} as failure.  If the {@code promise} is done already, log a message.
1025      */
1026     protected final void safeSetFailure(Promise<Void> promise, Throwable cause) {
1027         if (!promise.tryFailure(cause)) {
1028             logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1029         }
1030     }
1031 
1032     protected final void closeIfClosed() {
1033         assertEventLoop();
1034 
1035         if (isOpen()) {
1036             return;
1037         }
1038         closeTransport(newPromise());
1039     }
1040 
1041     private void invokeLater(Runnable task) {
1042         try {
1043             // This method is used by outbound operation implementations to trigger an inbound event later.
1044             // They do not trigger an inbound event immediately because an outbound operation might have been
1045             // triggered by another inbound event handler method.  If fired immediately, the call stack
1046             // will look like this for example:
1047             //
1048             //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
1049             //   -> handlerA.ctx.close()
1050             //      -> channel.unsafe.close()
1051             //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
1052             //
1053             // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1054             executor().execute(task);
1055         } catch (RejectedExecutionException e) {
1056             logger.warn("Can't invoke task later as EventLoop rejected it", e);
1057         }
1058     }
1059 
1060     /**
1061      * Appends the remote address to the message of the exceptions caused by connection attempt failure.
1062      */
1063     protected static Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1064         if (cause instanceof ConnectException) {
1065             return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1066         }
1067         if (cause instanceof NoRouteToHostException) {
1068             return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1069         }
1070         if (cause instanceof SocketException) {
1071             return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1072         }
1073 
1074         return cause;
1075     }
1076 
1077     /**
1078      * Prepares to close the {@link Channel}. If this method returns an {@link Executor}, the
1079      * caller must call the {@link Executor#execute(Runnable)} method with a task that calls
1080      * {@link #doClose()} on the returned {@link Executor}. If this method returns {@code null},
1081      * {@link #doClose()} must be called from the caller thread. (i.e. {@link EventLoop})
1082      */
1083     protected Future<Executor> prepareToClose() {
1084         return null;
1085     }
1086 
1087     /**
1088      * Returns the {@link ChannelOutboundBuffer} that is used by this {@link AbstractChannel}. This might be
1089      * {@code null} if no more writes are allowed.
1090      *
1091      * @return the outbound buffer.
1092      */
1093     protected final ChannelOutboundBuffer outboundBuffer() {
1094         return outboundBuffer;
1095     }
1096 
1097     /**
1098      * Returns the {@link SocketAddress} which is bound locally.
1099      */
1100     protected abstract L localAddress0();
1101 
1102     /**
1103      * Return the {@link SocketAddress} which the {@link Channel} is connected to.
1104      */
1105     protected abstract R remoteAddress0();
1106 
1107     /**
1108      * Bind the {@link Channel} to the {@link SocketAddress}
1109      */
1110     protected abstract void doBind(SocketAddress localAddress) throws Exception;
1111 
1112     /**
1113      * Disconnect this {@link Channel} from its remote peer
1114      */
1115     protected abstract void doDisconnect() throws Exception;
1116 
1117     /**
1118      * Close the {@link Channel}
1119      */
1120     protected abstract void doClose() throws Exception;
1121 
1122     /**
1123      * Shutdown one direction of the {@link Channel}.
1124      *
1125      * @param direction     the direction to shutdown.
1126      * @throws Exception    thrown on error.
1127      */
1128     protected abstract void doShutdown(ChannelShutdownDirection direction) throws Exception;
1129 
1130     /**
1131      * Schedule a read operation.
1132      */
1133     protected abstract void doBeginRead() throws Exception;
1134 
1135     /**
1136      * Flush the content of the given buffer to the remote peer.
1137      */
1138     protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1139 
1140     /**
1141      * Connect to remote peer.
1142      *
1143      * @param remoteAddress     the address of the remote peer.
1144      * @param localAddress      the local address of this channel.
1145      * @return                  {@code true} if the connect was completed, {@code false} if {@link #finishConnect()}
1146      *                          will be called later again to try finishing the connect.
1147      * @throws Exception        thrown on error.
1148      */
1149     protected abstract boolean doConnect(
1150             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
1151 
1152     /**
1153      * Finish a connect request.
1154      *
1155      * @param requestedRemoteAddress    the remote address of the peer.
1156      * @return                  {@code true} if the connect was completed, {@code false} if {@link #finishConnect()}
1157      *                          will be called later again to try finishing the connect.
1158      * @throws Exception        thrown on error.
1159      */
1160     protected abstract boolean doFinishConnect(R requestedRemoteAddress) throws Exception;
1161 
1162     /**
1163      * Returns if a connect request was issued before and we are waiting for {@link #finishConnect()} to be called.
1164      *
1165      * @return {@code true} if there is an outstanding connect request.
1166      */
1167     protected final boolean isConnectPending() {
1168         assertEventLoop();
1169         return connectPromise != null;
1170     }
1171 
1172     @SuppressWarnings("unchecked")
1173     private void connectTransport(
1174             SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1175         assertEventLoop();
1176         if (!promise.setUncancellable() || !ensureOpen(promise)) {
1177             return;
1178         }
1179 
1180         try {
1181             if (connectPromise != null) {
1182                 throw new ConnectionPendingException();
1183             }
1184 
1185             if (remoteAddress() != null) {
1186                 // Already connected to a remote host.
1187                 throw new AlreadyConnectedException();
1188             }
1189 
1190             boolean wasActive = isActive();
1191             if (doConnect(remoteAddress, localAddress)) {
1192                 fulfillConnectPromise(promise, wasActive);
1193             } else {
1194                 connectPromise = promise;
1195                 requestedRemoteAddress = (R) remoteAddress;
1196 
1197                 // Schedule connect timeout.
1198                 int connectTimeoutMillis = getConnectTimeoutMillis();
1199                 if (connectTimeoutMillis > 0) {
1200                     connectTimeoutFuture = executor().schedule(() -> {
1201                         Promise<Void> connectPromise = this.connectPromise;
1202                         if (connectPromise != null && !connectPromise.isDone()
1203                                 && connectPromise.tryFailure(new ConnectTimeoutException(
1204                                 "connection timed out: " + remoteAddress))) {
1205                             closeTransport(newPromise());
1206                         }
1207                     }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1208                 }
1209 
1210                 promise.asFuture().addListener(future -> {
1211                     if (future.isCancelled()) {
1212                         if (connectTimeoutFuture != null) {
1213                             connectTimeoutFuture.cancel();
1214                         }
1215                         connectPromise = null;
1216                         closeTransport(newPromise());
1217                     }
1218                 });
1219             }
1220         } catch (Throwable t) {
1221             closeIfClosed();
1222             promise.tryFailure(annotateConnectException(t, remoteAddress));
1223         }
1224     }
1225 
1226     private void fulfillConnectPromise(Promise<Void> promise, boolean wasActive) {
1227         if (promise == null) {
1228             // Closed via cancellation and the promise has been notified already.
1229             return;
1230         }
1231 
1232         // Get the state as trySuccess() may trigger an ChannelFutureListeners that will close the Channel.
1233         // We still need to ensure we call fireChannelActive() in this case.
1234         boolean active = isActive();
1235 
1236         // trySuccess() will return false if a user cancelled the connection attempt.
1237         boolean promiseSet = promise.trySuccess(null);
1238 
1239         // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
1240         // because what happened is what happened.
1241         if (!wasActive && active) {
1242             if (fireChannelActiveIfNotActiveBefore()) {
1243                 readIfIsAutoRead();
1244             }
1245         }
1246 
1247         // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
1248         if (!promiseSet) {
1249             closeTransport(newPromise());
1250         }
1251     }
1252 
1253     private void fulfillConnectPromise(Promise<Void> promise, Throwable cause) {
1254         if (promise == null) {
1255             // Closed via cancellation and the promise has been notified already.
1256             return;
1257         }
1258 
1259         // Use tryFailure() instead of setFailure() to avoid the race against cancel().
1260         promise.tryFailure(cause);
1261         closeIfClosed();
1262     }
1263 
1264     /**
1265      * Should be called once the connect request is ready to be completed.
1266      */
1267     protected final void finishConnect() {
1268         // Note this method is invoked by the event loop only if the connection attempt was
1269         // neither cancelled nor timed out.
1270         assertEventLoop();
1271 
1272         boolean connectStillInProgress = false;
1273         try {
1274             boolean wasActive = isActive();
1275             if (!doFinishConnect(requestedRemoteAddress)) {
1276                 connectStillInProgress = true;
1277                 return;
1278             }
1279             requestedRemoteAddress = null;
1280             fulfillConnectPromise(connectPromise, wasActive);
1281         } catch (Throwable t) {
1282             fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
1283         } finally {
1284             if (!connectStillInProgress) {
1285                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
1286                 // See https://github.com/netty/netty/issues/1770
1287                 if (connectTimeoutFuture != null) {
1288                     connectTimeoutFuture.cancel();
1289                 }
1290                 connectPromise = null;
1291             }
1292         }
1293     }
1294 
1295     /**
1296      * Invoked when a new message is added to a {@link ChannelOutboundBuffer} of this {@link AbstractChannel}, so that
1297      * the {@link Channel} implementation converts the message to another. (e.g. heap buffer -> direct buffer)
1298      */
1299     protected Object filterOutboundMessage(Object msg) throws Exception {
1300         return msg;
1301     }
1302 
1303     protected static void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1304         DefaultFileRegion.validate(region, position);
1305     }
1306 
1307     @Override
1308     @SuppressWarnings({ "unchecked", "deprecation" })
1309     public final <T> T getOption(ChannelOption<T> option) {
1310         requireNonNull(option, "option");
1311         if (option == AUTO_READ) {
1312             return (T) Boolean.valueOf(isAutoRead());
1313         }
1314         if (option == WRITE_BUFFER_WATER_MARK) {
1315             return (T) getWriteBufferWaterMark();
1316         }
1317         if (option == CONNECT_TIMEOUT_MILLIS) {
1318             return (T) Integer.valueOf(getConnectTimeoutMillis());
1319         }
1320         if (option == MAX_MESSAGES_PER_READ) {
1321             return (T) Integer.valueOf(getMaxMessagesPerRead());
1322         }
1323         if (option == WRITE_SPIN_COUNT) {
1324             return (T) Integer.valueOf(getWriteSpinCount());
1325         }
1326         if (option == BUFFER_ALLOCATOR) {
1327             return (T) getBufferAllocator();
1328         }
1329         if (option == RCVBUFFER_ALLOCATOR) {
1330             return getRecvBufferAllocator();
1331         }
1332         if (option == AUTO_CLOSE) {
1333             return (T) Boolean.valueOf(isAutoClose());
1334         }
1335         if (option == MESSAGE_SIZE_ESTIMATOR) {
1336             return (T) getMessageSizeEstimator();
1337         }
1338         if (option == MAX_MESSAGES_PER_WRITE) {
1339             return (T) Integer.valueOf(getMaxMessagesPerWrite());
1340         }
1341         if (option == ALLOW_HALF_CLOSURE) {
1342             return (T) Boolean.valueOf(isAllowHalfClosure());
1343         }
1344 
1345         return getExtendedOption(option);
1346     }
1347 
1348     /**
1349      * Override to add support for more {@link ChannelOption}s.
1350      * You need to also call {@link super} after handling the extra options.
1351      *
1352      * @param option    the {@link ChannelOption}.
1353      * @return          the value for the option
1354      * @param <T>       the value type.
1355      * @throws UnsupportedOperationException    if the {@link ChannelOption} is not supported.
1356      */
1357     protected  <T> T getExtendedOption(ChannelOption<T> option) {
1358         throw new UnsupportedOperationException("ChannelOption not supported: " + option);
1359     }
1360 
1361     @Override
1362     @SuppressWarnings("deprecation")
1363     public final <T> Channel setOption(ChannelOption<T> option, T value) {
1364         validate(option, value);
1365 
1366         if (option == AUTO_READ) {
1367             setAutoRead((Boolean) value);
1368         } else if (option == WRITE_BUFFER_WATER_MARK) {
1369             setWriteBufferWaterMark((WriteBufferWaterMark) value);
1370         } else if (option == CONNECT_TIMEOUT_MILLIS) {
1371             setConnectTimeoutMillis((Integer) value);
1372         } else if (option == MAX_MESSAGES_PER_READ) {
1373             setMaxMessagesPerRead((Integer) value);
1374         } else if (option == WRITE_SPIN_COUNT) {
1375             setWriteSpinCount((Integer) value);
1376         } else if (option == BUFFER_ALLOCATOR) {
1377             setBufferAllocator((BufferAllocator) value);
1378         } else if (option == RCVBUFFER_ALLOCATOR) {
1379             setRecvBufferAllocator((RecvBufferAllocator) value);
1380         } else if (option == AUTO_CLOSE) {
1381             setAutoClose((Boolean) value);
1382         } else if (option == MESSAGE_SIZE_ESTIMATOR) {
1383             setMessageSizeEstimator((MessageSizeEstimator) value);
1384         } else if (option == MAX_MESSAGES_PER_WRITE) {
1385             setMaxMessagesPerWrite((Integer) value);
1386         } else if (option == ALLOW_HALF_CLOSURE) {
1387             setAllowHalfClosure((Boolean) value);
1388         } else {
1389             setExtendedOption(option, value);
1390         }
1391 
1392         return this;
1393     }
1394 
1395     /**
1396      * Override to add support for more {@link ChannelOption}s.
1397      * You need to also call {@link super} after handling the extra options.
1398      *
1399      * @param option    the {@link ChannelOption}.
1400      * @param <T>       the value type.
1401      * @throws UnsupportedOperationException    if the {@link ChannelOption} is not supported.
1402      */
1403     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
1404         throw new UnsupportedOperationException("ChannelOption not supported: " + option);
1405     }
1406 
1407     @Override
1408     public final boolean isOptionSupported(ChannelOption<?> option) {
1409         if (SUPPORTED_CHANNEL_OPTIONS.contains(option)) {
1410             return true;
1411         }
1412         return isExtendedOptionSupported(option);
1413     }
1414 
1415     /**
1416      * Override to add support for more {@link ChannelOption}s.
1417      * You need to also call {@link super} after handling the extra options.
1418      *
1419      * @param option    the {@link ChannelOption}.
1420      * @return          {@code true} if supported, {@code false} otherwise.
1421      */
1422     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
1423         return false;
1424     }
1425 
1426     private static Set<ChannelOption<?>> supportedOptions() {
1427         return newSupportedIdentityOptionsSet(
1428                 AUTO_READ, WRITE_BUFFER_WATER_MARK, CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ,
1429                 WRITE_SPIN_COUNT, BUFFER_ALLOCATOR, RCVBUFFER_ALLOCATOR, AUTO_CLOSE, MESSAGE_SIZE_ESTIMATOR,
1430                 MAX_MESSAGES_PER_WRITE, ALLOW_HALF_CLOSURE);
1431     }
1432 
1433     protected static Set<ChannelOption<?>> newSupportedIdentityOptionsSet(ChannelOption<?>... options) {
1434         Set<ChannelOption<?>> supportedOptionsSet = Collections.newSetFromMap(new IdentityHashMap<>());
1435         Collections.addAll(supportedOptionsSet, options);
1436         return Collections.unmodifiableSet(supportedOptionsSet);
1437     }
1438 
1439     protected <T> void validate(ChannelOption<T> option, T value) {
1440         requireNonNull(option, "option");
1441         option.validate(value);
1442     }
1443 
1444     private int getConnectTimeoutMillis() {
1445         return connectTimeoutMillis;
1446     }
1447 
1448     private void setConnectTimeoutMillis(int connectTimeoutMillis) {
1449         checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
1450         this.connectTimeoutMillis = connectTimeoutMillis;
1451     }
1452 
1453     /**
1454      * <p>
1455      * @throws IllegalStateException if {@link #getRecvBufferAllocator()} does not return an object of type
1456      * {@link MaxMessagesRecvBufferAllocator}.
1457      */
1458     @Deprecated
1459     private int getMaxMessagesPerRead() {
1460         try {
1461             MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1462             return allocator.maxMessagesPerRead();
1463         } catch (ClassCastException e) {
1464             throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1465                     "MaxMessagesRecvBufferAllocator", e);
1466         }
1467     }
1468 
1469     /**
1470      * <p>
1471      * @throws IllegalStateException if {@link #getRecvBufferAllocator()} does not return an object of type
1472      * {@link MaxMessagesRecvBufferAllocator}.
1473      */
1474     @Deprecated
1475     private void setMaxMessagesPerRead(int maxMessagesPerRead) {
1476         try {
1477             MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1478             allocator.maxMessagesPerRead(maxMessagesPerRead);
1479         } catch (ClassCastException e) {
1480             throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1481                     "MaxMessagesRecvBufferAllocator", e);
1482         }
1483     }
1484 
1485     /**
1486      * Get the maximum number of message to write per eventloop run. Once this limit is
1487      * reached we will continue to process other events before trying to write the remaining messages.
1488      */
1489     protected final int getMaxMessagesPerWrite() {
1490         return maxMessagesPerWrite;
1491     }
1492 
1493     /**
1494      * Set the maximum number of message to write per eventloop run. Once this limit is
1495      * reached we will continue to process other events before trying to write the remaining messages.
1496      */
1497     private void setMaxMessagesPerWrite(int maxMessagesPerWrite) {
1498         this.maxMessagesPerWrite = checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
1499     }
1500 
1501     protected final int getWriteSpinCount() {
1502         return writeSpinCount;
1503     }
1504 
1505     private void setWriteSpinCount(int writeSpinCount) {
1506         checkPositive(writeSpinCount, "writeSpinCount");
1507         // Integer.MAX_VALUE is used as a special value in the channel implementations to indicate the channel cannot
1508         // accept any more data, and results in the writeOp being set on the selector (or execute a runnable which tries
1509         // to flush later because the writeSpinCount quantum has been exhausted). This strategy prevents additional
1510         // conditional logic in the channel implementations, and shouldn't be noticeable in practice.
1511         if (writeSpinCount == Integer.MAX_VALUE) {
1512             --writeSpinCount;
1513         }
1514         this.writeSpinCount = writeSpinCount;
1515     }
1516 
1517     private BufferAllocator getBufferAllocator() {
1518         return bufferAllocator;
1519     }
1520 
1521     public void setBufferAllocator(BufferAllocator bufferAllocator) {
1522         requireNonNull(bufferAllocator, "bufferAllocator");
1523         this.bufferAllocator = bufferAllocator;
1524     }
1525 
1526     @SuppressWarnings("unchecked")
1527     private <T extends RecvBufferAllocator> T getRecvBufferAllocator() {
1528         return (T) rcvBufAllocator;
1529     }
1530 
1531     private void setRecvBufferAllocator(RecvBufferAllocator allocator) {
1532         rcvBufAllocator = requireNonNull(allocator, "allocator");
1533     }
1534 
1535     protected final boolean isAutoRead() {
1536         return autoRead == 1;
1537     }
1538 
1539     private void setAutoRead(boolean autoRead) {
1540         boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
1541         if (autoRead && !oldAutoRead) {
1542             read();
1543         } else if (!autoRead && oldAutoRead) {
1544             autoReadCleared();
1545         }
1546     }
1547 
1548     /**
1549      * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
1550      * {@code true} before.
1551      */
1552     protected void autoReadCleared() { }
1553 
1554     private boolean isAutoClose() {
1555         return autoClose;
1556     }
1557 
1558     private void setAutoClose(boolean autoClose) {
1559         this.autoClose = autoClose;
1560     }
1561 
1562     private void setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1563         this.writeBufferWaterMark = requireNonNull(writeBufferWaterMark, "writeBufferWaterMark");
1564     }
1565 
1566     private WriteBufferWaterMark getWriteBufferWaterMark() {
1567         return writeBufferWaterMark;
1568     }
1569 
1570     private MessageSizeEstimator getMessageSizeEstimator() {
1571         return msgSizeEstimator;
1572     }
1573 
1574     private void setMessageSizeEstimator(MessageSizeEstimator estimator) {
1575         requireNonNull(estimator, "estimator");
1576         msgSizeEstimator = estimator;
1577     }
1578 
1579     protected final boolean isAllowHalfClosure() {
1580         return allowHalfClosure;
1581     }
1582 
1583     private void setAllowHalfClosure(boolean allowHalfClosure) {
1584         this.allowHalfClosure = allowHalfClosure;
1585     }
1586 
1587     private static final class ClosePromise extends DefaultPromise<Void> {
1588 
1589         ClosePromise(EventExecutor eventExecutor) {
1590             super(eventExecutor);
1591         }
1592 
1593         @Override
1594         public Promise<Void> setSuccess(Void result) {
1595             throw new IllegalStateException();
1596         }
1597 
1598         @Override
1599         public Promise<Void> setFailure(Throwable cause) {
1600             throw new IllegalStateException();
1601         }
1602 
1603         @Override
1604         public boolean trySuccess(Void result) {
1605             throw new IllegalStateException();
1606         }
1607 
1608         @Override
1609         public boolean tryFailure(Throwable cause) {
1610             throw new IllegalStateException();
1611         }
1612 
1613         @Override
1614         public boolean setUncancellable() {
1615             return false;
1616         }
1617 
1618         void setClosed() {
1619             super.trySuccess(null);
1620         }
1621     }
1622 
1623     private static final class AnnotatedConnectException extends ConnectException {
1624 
1625         private static final long serialVersionUID = 3901958112696433556L;
1626 
1627         AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1628             super(exception.getMessage() + ": " + remoteAddress);
1629             initCause(exception);
1630         }
1631 
1632         // Suppress a warning since this method doesn't need synchronization
1633         @Override
1634         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
1635             return this;
1636         }
1637     }
1638 
1639     private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1640 
1641         private static final long serialVersionUID = -6801433937592080623L;
1642 
1643         AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1644             super(exception.getMessage() + ": " + remoteAddress);
1645             initCause(exception);
1646         }
1647 
1648         // Suppress a warning since this method doesn't need synchronization
1649         @Override
1650         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
1651             return this;
1652         }
1653     }
1654 
1655     private static final class AnnotatedSocketException extends SocketException {
1656 
1657         private static final long serialVersionUID = 3896743275010454039L;
1658 
1659         AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1660             super(exception.getMessage() + ": " + remoteAddress);
1661             initCause(exception);
1662         }
1663 
1664         // Suppress a warning since this method doesn't need synchronization
1665         @Override
1666         public Throwable fillInStackTrace() {   // lgtm[java/non-sync-override]
1667             return this;
1668         }
1669     }
1670 
1671     protected void runAfterTransportAction() {
1672         // Noop
1673     }
1674 
1675     protected static class DefaultAbstractChannelPipeline extends DefaultChannelPipeline {
1676         protected DefaultAbstractChannelPipeline(AbstractChannel<?, ?, ?> channel) {
1677             super(channel);
1678         }
1679 
1680         protected final AbstractChannel<?, ?, ?> abstractChannel() {
1681             return (AbstractChannel<?, ?, ?>) channel();
1682         }
1683 
1684         @Override
1685         protected final EventExecutor transportExecutor() {
1686             return abstractChannel().executor();
1687         }
1688 
1689         @Override
1690         protected final void pendingOutboundBytesUpdated(long pendingOutboundBytes) {
1691             abstractChannel().updateWritabilityIfNeeded(true, false);
1692         }
1693 
1694         @Override
1695         protected final void registerTransport(Promise<Void> promise) {
1696             AbstractChannel<?, ?, ?> channel = abstractChannel();
1697             channel.registerTransport(promise);
1698             channel.runAfterTransportAction();
1699         }
1700 
1701         @Override
1702         protected final void bindTransport(SocketAddress localAddress, Promise<Void> promise) {
1703             AbstractChannel<?, ?, ?> channel = abstractChannel();
1704             channel.bindTransport(localAddress, promise);
1705             channel.runAfterTransportAction();
1706         }
1707 
1708         @Override
1709         protected final void connectTransport(
1710                 SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1711             AbstractChannel<?, ?, ?> channel = abstractChannel();
1712             channel.connectTransport(remoteAddress, localAddress, promise);
1713             channel.runAfterTransportAction();
1714         }
1715 
1716         @Override
1717         protected final void disconnectTransport(Promise<Void> promise) {
1718             AbstractChannel<?, ?, ?> channel = abstractChannel();
1719             channel.disconnectTransport(promise);
1720             channel.runAfterTransportAction();
1721         }
1722 
1723         @Override
1724         protected final void closeTransport(Promise<Void> promise) {
1725             AbstractChannel<?, ?, ?> channel = abstractChannel();
1726             abstractChannel().closeTransport(promise);
1727             channel.runAfterTransportAction();
1728         }
1729 
1730         @Override
1731         protected final void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
1732             AbstractChannel<?, ?, ?> channel = abstractChannel();
1733             channel.shutdownTransport(direction, promise);
1734             channel.runAfterTransportAction();
1735         }
1736 
1737         @Override
1738         protected final void deregisterTransport(Promise<Void> promise) {
1739             AbstractChannel<?, ?, ?> channel = abstractChannel();
1740             channel.deregisterTransport(promise);
1741             channel.runAfterTransportAction();
1742         }
1743 
1744         @Override
1745         protected final void readTransport() {
1746             AbstractChannel<?, ?, ?> channel = abstractChannel();
1747             channel.readTransport();
1748             channel.runAfterTransportAction();
1749         }
1750 
1751         @Override
1752         protected final void writeTransport(Object msg, Promise<Void> promise) {
1753             AbstractChannel<?, ?, ?> channel = abstractChannel();
1754             channel.writeTransport(msg, promise);
1755             channel.runAfterTransportAction();
1756         }
1757 
1758         @Override
1759         protected final void flushTransport() {
1760             AbstractChannel<?, ?, ?> channel = abstractChannel();
1761             channel.flushTransport();
1762             channel.runAfterTransportAction();
1763         }
1764 
1765         @Override
1766         protected final void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1767             AbstractChannel<?, ?, ?> channel = abstractChannel();
1768             channel.sendOutboundEventTransport(event, promise);
1769             channel.runAfterTransportAction();
1770         }
1771     }
1772 }