View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.codec.http2;
17  
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.buffer.api.DefaultBufferAllocators;
20  import io.netty5.channel.AdaptiveRecvBufferAllocator;
21  import io.netty5.channel.ChannelOption;
22  import io.netty5.channel.ChannelOutputShutdownException;
23  import io.netty5.channel.ChannelShutdownDirection;
24  import io.netty5.channel.MaxMessagesRecvBufferAllocator;
25  import io.netty5.util.Resource;
26  import io.netty5.channel.Channel;
27  import io.netty5.channel.ChannelHandler;
28  import io.netty5.channel.ChannelId;
29  import io.netty5.channel.ChannelMetadata;
30  import io.netty5.channel.ChannelPipeline;
31  import io.netty5.channel.DefaultChannelPipeline;
32  import io.netty5.channel.EventLoop;
33  import io.netty5.channel.MessageSizeEstimator;
34  import io.netty5.channel.RecvBufferAllocator;
35  import io.netty5.channel.WriteBufferWaterMark;
36  import io.netty5.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
37  import io.netty5.util.DefaultAttributeMap;
38  import io.netty5.util.concurrent.EventExecutor;
39  import io.netty5.util.concurrent.Future;
40  import io.netty5.util.concurrent.Promise;
41  import io.netty5.util.internal.StringUtil;
42  import io.netty5.util.internal.ThrowableUtil;
43  import io.netty5.util.internal.logging.InternalLogger;
44  import io.netty5.util.internal.logging.InternalLoggerFactory;
45  
46  import java.io.IOException;
47  import java.net.SocketAddress;
48  import java.nio.channels.ClosedChannelException;
49  import java.nio.channels.NotYetConnectedException;
50  import java.util.ArrayDeque;
51  import java.util.Queue;
52  import java.util.concurrent.RejectedExecutionException;
53  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
54  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
55  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
56  
57  import static io.netty5.channel.ChannelOption.ALLOW_HALF_CLOSURE;
58  import static io.netty5.channel.ChannelOption.AUTO_CLOSE;
59  import static io.netty5.channel.ChannelOption.AUTO_READ;
60  import static io.netty5.channel.ChannelOption.BUFFER_ALLOCATOR;
61  import static io.netty5.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
62  import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_READ;
63  import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
64  import static io.netty5.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
65  import static io.netty5.channel.ChannelOption.RCVBUFFER_ALLOCATOR;
66  import static io.netty5.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
67  import static io.netty5.channel.ChannelOption.WRITE_SPIN_COUNT;
68  import static io.netty5.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
69  import static io.netty5.util.internal.ObjectUtil.checkPositive;
70  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
71  import static java.lang.Math.min;
72  import static java.util.Objects.requireNonNull;
73  
74  final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
75  
76      static final Http2FrameStreamVisitor WRITABLE_VISITOR = stream -> {
77          final DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
78                  ((DefaultHttp2FrameStream) stream).attachment;
79          childChannel.trySetWritable();
80          return true;
81      };
82  
        // Logger used by this class (see safeSetSuccess() and invokeLater()).
83      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2StreamChannel.class);
84  
        // Shared metadata instance handed out by metadata().
        // NOTE(review): the arguments presumably mean "no disconnect support" and "16 messages per read by default";
        // confirm against the ChannelMetadata constructor.
85      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
86  
87      /**
88       * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
89       * Primarily is non-zero.
         * <p>
         * Used both by {@code FlowControlledFrameSizeEstimator} and by {@code doRead0(...)} as the byte count that is
         * reported for frames that are not DATA frames.
90       */
91      private static final int MIN_HTTP2_FRAME_SIZE = 9;
92  
93      /**
94       * Returns the flow-control size for DATA frames, and {@value MIN_HTTP2_FRAME_SIZE} for all other frames.
95       */
96      private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
97  
98          static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
99  
100     private static final Handle HANDLE_INSTANCE = msg -> msg instanceof Http2DataFrame ?
101             // Guard against overflow.
102             (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
103                     (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
104 
105         @Override
106         public Handle newHandle() {
107             return HANDLE_INSTANCE;
108         }
109     }
110 
        // Atomic updater for the volatile "totalPendingSize" field; used by incrementPendingOutboundBytes() and
        // decrementPendingOutboundBytes().
111     private static final AtomicLongFieldUpdater<DefaultHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
112             AtomicLongFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "totalPendingSize");
113 
        // Atomic updater for the volatile "unwritable" field; used by the compare-and-set loops in setWritable()
        // and setUnwritable().
114     private static final AtomicIntegerFieldUpdater<DefaultHttp2StreamChannel> UNWRITABLE_UPDATER =
115             AtomicIntegerFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "unwritable");
116 
117     private static void windowUpdateFrameWriteComplete(Channel streamChannel, Future<?> future) {
118         Throwable cause = future.cause();
119         if (cause != null) {
120             Throwable unwrappedCause;
121             // Unwrap if needed
122             if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
123                 cause = unwrappedCause;
124             }
125 
126             // Notify the child-channel and close it.
127             streamChannel.pipeline().fireChannelExceptionCaught(cause);
128             ((DefaultHttp2StreamChannel) streamChannel).closeTransport(streamChannel.newPromise());
129         }
130     }
131 
132     /**
133      * The current status of the read-processing for a {@link DefaultHttp2StreamChannel}.
         * <p>
         * Transitions visible in this class: {@code readTransport()} moves {@link #IDLE} to {@link #IN_PROGRESS} and
         * {@link #IN_PROGRESS} to {@link #REQUESTED}; {@code notifyReadComplete(...)} moves {@link #REQUESTED} back to
         * {@link #IN_PROGRESS} and every other state to {@link #IDLE}.
134      */
135     private enum ReadStatus {
136         /**
137          * No read in progress and no read was requested (yet)
138          */
139         IDLE,
140 
141         /**
142          * Reading in progress
143          */
144         IN_PROGRESS,
145 
146         /**
147          * A read operation was requested.
148          */
149         REQUESTED
150     }
151 
        // Atomic updaters for the volatile "autoRead" and "writeBufferWaterMark" fields declared below.
152     private static final AtomicIntegerFieldUpdater<DefaultHttp2StreamChannel> AUTOREAD_UPDATER =
153             AtomicIntegerFieldUpdater.newUpdater(DefaultHttp2StreamChannel.class, "autoRead");
154     private static final AtomicReferenceFieldUpdater<DefaultHttp2StreamChannel, WriteBufferWaterMark>
155             WATERMARK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
156                     DefaultHttp2StreamChannel.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
157 
        // Per-channel configuration values with their defaults.
        // NOTE(review): presumably read and written through the ChannelOption accessors further down the file.
158     private volatile BufferAllocator bufferAllocator = DefaultBufferAllocators.preferredAllocator();
        // Source of the receive handle that recvBufAllocHandle() creates lazily.
159     private volatile RecvBufferAllocator rcvBufAllocator = new AdaptiveRecvBufferAllocator();
160 
161     private volatile int connectTimeoutMillis = 30000;
162     private volatile int writeSpinCount = 16;
163     private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
164 
        // Declared as an int (not a boolean) because it is the target of AUTOREAD_UPDATER.
165     @SuppressWarnings("FieldMayBeFinal")
166     private volatile int autoRead = 1;
167     private volatile boolean autoClose = true;
        // Supplies the high()/low() thresholds used by the pending-outbound-bytes accounting.
168     private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
169     private volatile boolean allowHalfClosure;
        // Owning multiplex handler; parent() resolves the parent channel through handler.parentContext().
170     private final Http2MultiplexHandler handler;
171     private final ChannelId channelId;
172     private final ChannelPipeline pipeline;
173     private final DefaultHttp2FrameStream stream;
        // Completed by closeTransport(); isOpen() reports true until then.
174     private final Promise<Void> closePromise;
175 
        // Set by registerTransport() and cleared again by fireChannelInactiveAndDeregister().
176     private volatile boolean registered;
177 
        // Pending outbound bytes, maintained through TOTAL_PENDING_SIZE_UPDATER.
178     private volatile long totalPendingSize;
        // Bit 0 is set by setUnwritable() and cleared by setWritable(); writableBytes() treats 0 as writable.
179     private volatile int unwritable;
        // Per-direction shutdown flags reported by isShutdown(...).
180     private volatile boolean inputShutdown;
181     private volatile boolean outputShutdown;
182 
183     // Cached to reduce GC
184     private Runnable fireChannelWritabilityChangedTask;
185 
        // DATA-frame bytes added by doRead0(...) and not yet returned to the peer; updateLocalWindowIfNeeded()
        // sends them as a WINDOW_UPDATE and resets the counter to 0.
186     private int flowControlledBytes;
187 
188     /**
189      * This variable represents if a read is in progress for the current channel or was requested.
190      * Note that depending upon the {@link RecvBufferAllocator} behavior a read may extend beyond the
191      * {@link #readTransport()} method scope. The {@link #readTransport()} loop may
192      * drain all pending data, and then if the parent channel is reading this channel may still accept frames.
193      */
194     private ReadStatus readStatus = ReadStatus.IDLE;
195 
        // Frames queued by fireChildRead(...) while the read status is IDLE; created lazily, disposed and reset to
        // null by closeTransport().
196     private Queue<Object> inboundBuffer;
197 
198     /** {@code true} after the first HEADERS frame has been written **/
199     private boolean firstFrameWritten;
        // Cleared by closeTransport() and notifyReadComplete(...).
        // NOTE(review): presumably set by maybeAddChannelToReadCompletePendingQueue(), which is outside this view.
200     private boolean readCompletePending;
201 
        // Lazily created by recvBufAllocHandle().
202     private RecvBufferAllocator.Handle recvHandle;
        // Set by updateLocalWindowIfNeeded() after writing a WINDOW_UPDATE that still has to be flushed.
203     private boolean writeDoneAndNoFlush;
        // Set on the first closeTransport() call so that later calls only wait for (or report) the close.
204     private boolean closeInitiated;
        // Set by readEOS(); once queued frames are drained the channel is closed via closeForcibly().
205     private boolean readEOS;
206 
        /**
         * Creates the child channel for {@code stream} and attaches it to the stream.
         *
         * @param handler        the multiplex handler that owns this channel; it is used to resolve the parent channel.
         * @param stream         the frame stream backing this channel; its {@code attachment} is set to this channel.
         * @param id             value combined with the parent channel's id to form this channel's
         *                       {@link ChannelId}.
         * @param inboundHandler handler appended to the new pipeline, or {@code null} to add none.
         */
207     DefaultHttp2StreamChannel(Http2MultiplexHandler handler, DefaultHttp2FrameStream stream, int id,
208                                ChannelHandler inboundHandler) {
            // The handler must be assigned first: parent() below dereferences it.
209         this.handler = handler;
210         this.stream = stream;
211         stream.attachment = this;
212         pipeline = new DefaultHttp2ChannelPipeline(this);
213         closePromise = pipeline.newPromise();
214         channelId = new Http2StreamChannelId(parent().id(), id);
215 
216         if (inboundHandler != null) {
217             // Add the handler to the pipeline up front; registration itself happens later in registerTransport().
218             pipeline.addLast(inboundHandler);
219         }
220     }
221 
222     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
223         if (size == 0) {
224             return;
225         }
226 
227         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
228         if (newWriteBufferSize > writeBufferWaterMark.high()) {
229             setUnwritable(invokeLater);
230         }
231     }
232 
233     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
234         if (size == 0) {
235             return;
236         }
237 
238         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
239         // Once the totalPendingSize dropped below the low water-mark we can mark the child channel
240         // as writable again. Before doing so we also need to ensure the parent channel is writable to
241         // prevent excessive buffering in the parent outbound buffer. If the parent is not writable
242         // we will mark the child channel as writable once the parent becomes writable by calling
243         // trySetWritable() later.
244         if (newWriteBufferSize < writeBufferWaterMark.low() && parent().isWritable()) {
245             setWritable(invokeLater);
246         }
247     }
248 
249     void trySetWritable() {
250         // The parent is writable again but the child channel itself may still not be writable.
251         // Lets try to set the child channel writable to match the state of the parent channel
252         // if (and only if) the totalPendingSize is smaller then the low water-mark.
253         // If this is not the case we will try again later once we drop under it.
254         if (totalPendingSize < writeBufferWaterMark.low()) {
255             setWritable(false);
256         }
257     }
258 
259     private void setWritable(boolean invokeLater) {
260         for (;;) {
261             final int oldValue = unwritable;
262             final int newValue = oldValue & ~1;
263             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
264                 if (oldValue != 0 && newValue == 0) {
265                     fireChannelWritabilityChanged(invokeLater);
266                 }
267                 break;
268             }
269         }
270     }
271 
272     private void setUnwritable(boolean invokeLater) {
273         for (;;) {
274             final int oldValue = unwritable;
275             final int newValue = oldValue | 1;
276             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
277                 if (oldValue == 0) {
278                     fireChannelWritabilityChanged(invokeLater);
279                 }
280                 break;
281             }
282         }
283     }
284 
285     private void fireChannelWritabilityChanged(boolean invokeLater) {
286         final ChannelPipeline pipeline = pipeline();
287         if (invokeLater) {
288             Runnable task = fireChannelWritabilityChangedTask;
289             if (task == null) {
290                 fireChannelWritabilityChangedTask = task = pipeline::fireChannelWritabilityChanged;
291             }
292             executor().execute(task);
293         } else {
294             pipeline.fireChannelWritabilityChanged();
295         }
296     }
        /**
         * Returns the frame stream this channel was constructed with.
         */
297     @Override
298     public Http2FrameStream stream() {
299         return stream;
300     }
301 
        /**
         * Flags the outbound direction as shut down. Only the flag is set: no frame is written and no pipeline
         * event is fired here.
         */
302     void closeOutbound() {
303         outputShutdown = true;
304     }
305 
        /**
         * Records end-of-stream for this channel and then runs {@link #doBeginRead()}, which only does work while a
         * read is in progress or requested.
         */
306     void streamClosed() {
307         readEOS();
308         // Attempt to drain any queued data from the queue and deliver it to the application before closing this
309         // channel.
310         doBeginRead();
311     }
312 
        /**
         * Returns the {@link ChannelMetadata} constant shared by all stream channels.
         */
313     @Override
314     public ChannelMetadata metadata() {
315         return METADATA;
316     }
317 
        /**
         * Returns {@code true} until the close promise has been completed (see {@code closeTransport(...)}).
         */
318     @Override
319     public boolean isOpen() {
320         return !closePromise.isDone();
321     }
322 
        /**
         * Returns the same value as {@link #isOpen()}: this channel is active exactly as long as it is open.
         */
323     @Override
324     public boolean isActive() {
325         return isOpen();
326     }
327 
328     @Override
329     public boolean isShutdown(ChannelShutdownDirection direction) {
330         if (!isActive()) {
331             return true;
332         }
333         switch (direction) {
334             case Inbound:
335                 return inputShutdown;
336             case Outbound:
337                 return outputShutdown;
338             default:
339                 throw new AssertionError();
340         }
341     }
342 
        /**
         * Returns the id created in the constructor from the parent channel's id and the supplied stream channel id.
         */
343     @Override
344     public ChannelId id() {
345         return channelId;
346     }
347 
        /**
         * Returns the parent channel's {@link EventLoop}; this channel does not have one of its own.
         */
348     @Override
349     public EventLoop executor() {
350         return parent().executor();
351     }
352 
        /**
         * Returns the channel of the multiplex handler's parent context.
         */
353     @Override
354     public Channel parent() {
355         return handler.parentContext().channel();
356     }
357 
        /**
         * Returns the {@code registered} flag, which is set in {@code registerTransport(...)} and cleared when the
         * channel is deregistered.
         */
358     @Override
359     public boolean isRegistered() {
360         return registered;
361     }
362 
        /**
         * Returns the parent channel's local address.
         */
363     @Override
364     public SocketAddress localAddress() {
365         return parent().localAddress();
366     }
367 
        /**
         * Returns the parent channel's remote address.
         */
368     @Override
369     public SocketAddress remoteAddress() {
370         return parent().remoteAddress();
371     }
372 
        /**
         * Returns the future view of the close promise, which {@code closeTransport(...)} completes.
         */
373     @Override
374     public Future<Void> closeFuture() {
375         return closePromise.asFuture();
376     }
377 
378     @Override
379     public long writableBytes() {
380         long bytes = writeBufferWaterMark.high()
381                 - totalPendingSize - pipeline.pendingOutboundBytes();
382         // If bytes is negative we know we are not writable.
383         if (bytes > 0) {
384             return unwritable == 0 ? bytes : 0;
385         }
386         return 0;
387     }
388 
        /**
         * Returns the pipeline created for this channel in the constructor.
         */
389     @Override
390     public ChannelPipeline pipeline() {
391         return pipeline;
392     }
393 
        /**
         * Returns the hash code of this channel's {@link #id()}.
         */
394     @Override
395     public int hashCode() {
396         return id().hashCode();
397     }
398 
        /**
         * Identity comparison: a channel is only equal to itself.
         */
399     @Override
400     public boolean equals(Object o) {
401         return this == o;
402     }
403 
404     @Override
405     public int compareTo(Channel o) {
406         if (this == o) {
407             return 0;
408         }
409 
410         return id().compareTo(o.id());
411     }
412 
413     @Override
414     public String toString() {
415         return parent().toString() + "(H2 - " + stream + ')';
416     }
417 
418     /**
419      * Receive a read message. This does not notify handlers unless a read is in progress on the
420      * channel.
421      */
422     void fireChildRead(Http2Frame frame) {
423         assert executor().inEventLoop();
424         if (!isActive()) {
425             Resource.dispose(frame);
426         } else if (readStatus != ReadStatus.IDLE) {
427             // If a read is in progress or has been requested, there cannot be anything in the queue,
428             // otherwise we would have drained it from the queue and processed it during the read cycle.
429             assert inboundBuffer == null || inboundBuffer.isEmpty();
430             final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
431             doRead0(frame, allocHandle);
432             // We currently don't need to check for readEOS because the parent channel and child channel are limited
433             // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
434             // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
435             // cost of additional readComplete notifications on the rare path.
436             if (allocHandle.continueReading(isAutoRead()) && !isShutdown(ChannelShutdownDirection.Inbound)) {
437                 maybeAddChannelToReadCompletePendingQueue();
438             } else {
439                 notifyReadComplete(allocHandle, true);
440             }
441         } else {
442             if (inboundBuffer == null) {
443                 inboundBuffer = new ArrayDeque<>(4);
444             }
445             inboundBuffer.add(frame);
446         }
447     }
448 
        /**
         * Runs {@code notifyReadComplete(...)} without forcing it, so the read-complete notification is delivered
         * only if one is still pending for this channel. Must be called from the event loop.
         */
449     void fireChildReadComplete() {
450         assert executor().inEventLoop();
            // A pending read-complete implies that a read is in progress or was requested.
451         assert readStatus != ReadStatus.IDLE || !readCompletePending;
452         notifyReadComplete(recvBufAllocHandle(), false);
453     }
454 
455     private void connectTransport(final SocketAddress remoteAddress,
456                         SocketAddress localAddress, Promise<Void> promise) {
457         if (!promise.setUncancellable()) {
458             return;
459         }
460         promise.setFailure(new UnsupportedOperationException());
461     }
462 
463     private RecvBufferAllocator.Handle recvBufAllocHandle() {
464         if (recvHandle == null) {
465             recvHandle = rcvBufAllocator.newHandle();
466             recvHandle.reset();
467         }
468         return recvHandle;
469     }
470 
471     private void registerTransport(Promise<Void> promise) {
472         if (!promise.setUncancellable()) {
473             return;
474         }
475         if (registered) {
476             promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
477             return;
478         }
479 
480         registered = true;
481 
482         promise.setSuccess(null);
483 
484         pipeline().fireChannelRegistered();
485         if (isActive()) {
486             pipeline().fireChannelActive();
487             if (isAutoRead()) {
488                 read();
489             }
490         }
491     }
492 
493     private void bindTransport(SocketAddress localAddress, Promise<Void> promise) {
494         if (!promise.setUncancellable()) {
495             return;
496         }
497         promise.setFailure(new UnsupportedOperationException());
498     }
499 
        /**
         * Disconnecting a stream channel is handled exactly like closing it.
         *
         * @param promise the promise handed on to {@code closeTransport(...)}.
         */
500     private void disconnectTransport(Promise<Void> promise) {
501         closeTransport(promise);
502     }
503 
        /**
         * Closes this stream channel. The first call resets the stream if required, disposes all queued inbound
         * frames, completes the close promise and {@code promise}, and then schedules {@code channelInactive} and
         * {@code channelUnregistered}. Later calls only complete {@code promise}, either immediately or once the
         * close future completes.
         *
         * @param promise the promise to complete; nothing happens if it was cancelled.
         */
504     private void closeTransport(final Promise<Void> promise) {
505         if (!promise.setUncancellable()) {
506             return;
507         }
508         if (closeInitiated) {
509             if (closePromise.isDone()) {
510                 // Closed already.
511                 promise.setSuccess(null);
512             } else  {
513                 // This means close() was called before so we just register a listener and return
514                 closeFuture().addListener(promise, (p, future) -> p.setSuccess(null));
515             }
516             return;
517         }
518         closeInitiated = true;
519         // Just set to false as removing from an underlying queue would even be more expensive.
520         readCompletePending = false;
521 
            // Capture the state before closePromise is completed below, because isActive() turns false after that.
522         final boolean wasActive = isActive();
523 
524         // There is no need to update the local window as once the stream is closed all the pending bytes will be
525         // given back to the connection window by the controller itself.
526 
527         // Only ever send a reset frame if the connection is still alive and if the stream was created before
528         // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
529         if (parent().isActive() && !readEOS && isStreamIdValid(stream.id())) {
530             Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
531             writeTransport(resetFrame, newPromise());
532             flush();
533         }
534 
            // Dispose every frame that was queued but never delivered to the pipeline.
535         if (inboundBuffer != null) {
536             for (;;) {
537                 Object msg = inboundBuffer.poll();
538                 if (msg == null) {
539                     break;
540                 }
541                 Resource.dispose(msg);
542             }
543             inboundBuffer = null;
544         }
545 
546         // The promise should be notified before we call fireChannelInactive().
547         outputShutdown = true;
548         closePromise.setSuccess(null);
549         promise.setSuccess(null);
550 
551         fireChannelInactiveAndDeregister(newPromise(), wasActive);
552     }
553 
554     private void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
555         if (!promise.setUncancellable()) {
556             return;
557         }
558         if (!isActive()) {
559             if (isOpen()) {
560                 promise.setFailure(new NotYetConnectedException());
561             } else {
562                 promise.setFailure(new ClosedChannelException());
563             }
564             return;
565         }
566         if (isShutdown(direction)) {
567             // Already shutdown so let's just make this a noop.
568             promise.setSuccess(null);
569             return;
570         }
571         boolean fireEvent = false;
572         switch (direction) {
573             case Outbound:
574                 fireEvent = shutdownOutput(true, promise);
575                 break;
576             case Inbound:
577                 try {
578                     inputShutdown = true;
579                     promise.setSuccess(null);
580                     fireEvent = true;
581                 } catch (Throwable cause) {
582                     promise.setFailure(cause);
583                 }
584                 break;
585             default:
586                 // Should never happen
587                 promise.setFailure(new IllegalStateException());
588                 break;
589         }
590         if (fireEvent) {
591             pipeline().fireChannelShutdown(direction);
592         }
593     }
594 
595     private boolean shutdownOutput(boolean writeFrame, Promise<Void> promise) {
596         if (!promise.setUncancellable()) {
597             return false;
598         }
599         if (isShutdown(ChannelShutdownDirection.Outbound)) {
600             // Already shutdown so let's just make this a noop.
601             promise.setSuccess(null);
602             return false;
603         }
604         if (writeFrame) {
605             // Write a headers frame with endOfStream flag set
606             writeTransport(new DefaultHttp2HeadersFrame(EmptyHttp2Headers.INSTANCE, true), newPromise());
607         }
608         outputShutdown = true;
609         promise.setSuccess(null);
610         return true;
611     }
612 
        /**
         * Closes this channel via {@code closeTransport(...)} using a fresh promise, for callers that do not need
         * to observe the outcome.
         */
613     void closeForcibly() {
614         closeTransport(newPromise());
615     }
616 
        /**
         * Deregisters this channel without firing {@code channelInactive}.
         *
         * @param promise the promise completed once the deregistration task has run.
         */
617     private void deregisterTransport(Promise<Void> promise) {
618         fireChannelInactiveAndDeregister(promise, false);
619     }
620 
621     private void fireChannelInactiveAndDeregister(Promise<Void> promise,
622                                                   final boolean fireChannelInactive) {
623         if (!promise.setUncancellable()) {
624             return;
625         }
626 
627         if (!registered) {
628             promise.setSuccess(null);
629             return;
630         }
631 
632         // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
633         // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
634         // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
635         // events 'later' to ensure the current events in the handler are completed before these events.
636         //
637         // See:
638         // https://github.com/netty/netty/issues/4435
639         invokeLater(()-> {
640             if (fireChannelInactive) {
641                 pipeline.fireChannelInactive();
642             }
643             // The user can fire `deregister` events multiple times but we only want to fire the pipeline
644             // event if the channel was actually registered.
645             if (registered) {
646                 registered = false;
647                 pipeline.fireChannelUnregistered();
648             }
649             safeSetSuccess(promise);
650         });
651     }
652 
653     private void safeSetSuccess(Promise<Void> promise) {
654         if (!promise.trySuccess(null)) {
655             logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
656         }
657     }
658 
659     private void invokeLater(Runnable task) {
660         try {
661             // This method is used by outbound operation implementations to trigger an inbound event later.
662             // They do not trigger an inbound event immediately because an outbound operation might have been
663             // triggered by another inbound event handler method.  If fired immediately, the call stack
664             // will look like this for example:
665             //
666             //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
667             //   -> handlerA.ctx.close()
668             //     -> channel.unsafe.close()
669             //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
670             //
671             // which means the execution of two inbound handler methods of the same handler overlap undesirably.
672             executor().execute(task);
673         } catch (RejectedExecutionException e) {
674             logger.warn("Can't invoke task later as EventLoop rejected it", e);
675         }
676     }
677 
678     private void readTransport() {
679         if (!isActive()) {
680             return;
681         }
682         updateLocalWindowIfNeeded();
683 
684         switch (readStatus) {
685             case IDLE:
686                 readStatus = ReadStatus.IN_PROGRESS;
687                 doBeginRead();
688                 break;
689             case IN_PROGRESS:
690                 readStatus = ReadStatus.REQUESTED;
691                 break;
692             default:
693                 break;
694         }
695     }
696 
697     private Object pollQueuedMessage() {
698         return inboundBuffer == null ? null : inboundBuffer.poll();
699     }
700 
        /**
         * Delivers queued inbound frames to the pipeline for as long as the read status is not {@code IDLE}.
         * When the queue is empty the channel is closed if end-of-stream was seen, a flush is issued, and the loop
         * ends. After each batch the read-complete notification is either delivered immediately or deferred until
         * the parent channel finishes its own read.
         */
701     void doBeginRead() {
702         // Process messages until there are none left (or the user stopped requesting) and also handle EOS.
703         while (readStatus != ReadStatus.IDLE) {
704             Object message = pollQueuedMessage();
705             if (message == null) {
706                 if (readEOS) {
707                     closeForcibly();
708                 }
709                 // We need to double check that there is nothing left to flush such as a
710                 // window update frame.
711                 flush();
712                 break;
713             }
714             final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
715             allocHandle.reset();
716             boolean continueReading = false;
                // Once end-of-stream was seen the queue is drained completely: the allocator is not consulted and
                // continueReading keeps its last value.
717             do {
718                 doRead0((Http2Frame) message, allocHandle);
719             } while ((readEOS || (continueReading = allocHandle.continueReading(isAutoRead())))
720                     && (message = pollQueuedMessage()) != null);
721 
722             if (continueReading && handler.isParentReadInProgress() && !readEOS) {
723                 // Currently the parent and child channel are on the same EventLoop thread. If the parent is
724                 // currently reading it is possible that more frames will be delivered to this child channel. In
725                 // the case that this child channel still wants to read we delay the channelReadComplete on this
726                 // child channel until the parent is done reading.
727                 maybeAddChannelToReadCompletePendingQueue();
728             } else {
729                 notifyReadComplete(allocHandle, true);
730             }
731         }
732     }
733 
        /**
         * Records that end-of-stream was read. The flag is checked by {@code doBeginRead()} and
         * {@code notifyReadComplete(...)}, which close the channel once it is set.
         */
734     void readEOS() {
735         readEOS = true;
736     }
737 
738     private void updateLocalWindowIfNeeded() {
739         if (flowControlledBytes != 0) {
740             int bytes = flowControlledBytes;
741             flowControlledBytes = 0;
742             Future<Void> future = handler.parentContext()
743                     .write(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
744             // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
745             // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
746             // to assume there was a write done that needs to be flushed or we risk flow control starvation.
747             writeDoneAndNoFlush = true;
748             // Add a listener which will notify and teardown the stream
749             // when a window update fails if needed or check the result of the future directly if it was completed
750             // already.
751             // See https://github.com/netty/netty/issues/9663
752             if (future.isDone()) {
753                 windowUpdateFrameWriteComplete(DefaultHttp2StreamChannel.this, future);
754             } else {
755                 future.addListener(DefaultHttp2StreamChannel.this,
756                         DefaultHttp2StreamChannel::windowUpdateFrameWriteComplete);
757             }
758         }
759     }
760 
761     void notifyReadComplete(RecvBufferAllocator.Handle allocHandle, boolean forceReadComplete) {
762         if (!readCompletePending && !forceReadComplete) {
763             return;
764         }
765         // Set to false just in case we added the channel multiple times before.
766         readCompletePending = false;
767 
768         if (readStatus == ReadStatus.REQUESTED) {
769             readStatus = ReadStatus.IN_PROGRESS;
770         } else {
771             readStatus = ReadStatus.IDLE;
772         }
773 
774         allocHandle.readComplete();
775         pipeline().fireChannelReadComplete();
776         if (isAutoRead()) {
777             read();
778         }
779 
780         // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
781         // channel is not currently reading we need to force a flush at the child channel, because we cannot
782         // rely upon flush occurring in channelReadComplete on the parent channel.
783         flush();
784         if (readEOS) {
785             closeForcibly();
786         }
787     }
788 
789     private void doRead0(Http2Frame frame, RecvBufferAllocator.Handle allocHandle) {
790         if (isShutdown(ChannelShutdownDirection.Inbound)) {
791             // Let's drop data and header frames in the case of shutdown input. Other frames are still valid for
792             // HTTP2.
793             if (frame instanceof Http2DataFrame || frame instanceof Http2HeadersFrame) {
794                 Resource.dispose(frame);
795                 return;
796             }
797         }
798         final int bytes;
799         if (frame instanceof Http2DataFrame) {
800             bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
801 
802             // It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
803             // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
804             // in this case that we accounted for it.
805             //
806             // See https://github.com/netty/netty/issues/9663
807             flowControlledBytes += bytes;
808         } else {
809             bytes = MIN_HTTP2_FRAME_SIZE;
810         }
811         // Update before firing event through the pipeline to be consistent with other Channel implementation.
812         allocHandle.attemptedBytesRead(bytes);
813         allocHandle.lastBytesRead(bytes);
814         allocHandle.incMessagesRead(1);
815 
816         boolean shutdownInput = isShutdownNeeded(frame);
817         pipeline().fireChannelRead(frame);
818         if (shutdownInput) {
819             shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
820         }
821     }
822 
823     private boolean isShutdownNeeded(Http2Frame frame) {
824         if (frame instanceof Http2HeadersFrame) {
825             return ((Http2HeadersFrame) frame).isEndStream();
826         }
827         if (frame instanceof Http2DataFrame) {
828             return ((Http2DataFrame) frame).isEndStream();
829         }
830         return false;
831     }
832 
833     private void writeTransport(Object msg, Promise<Void> promise) {
834         // After this point it's not possible to cancel a write anymore.
835         if (!promise.setUncancellable()) {
836             Resource.dispose(msg);
837             return;
838         }
839 
840         if (!isActive()) {
841             Resource.dispose(msg);
842             promise.setFailure(new ClosedChannelException());
843             return;
844         }
845 
846         // Once the outbound side was closed we should not allow header / data frames
847         if (isShutdown(ChannelShutdownDirection.Outbound)
848                 && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
849             Resource.dispose(msg);
850             promise.setFailure(newOutputShutdownException(
851                     DefaultHttp2StreamChannel.class, "writeTransport(Object, Promise)"));
852             return;
853         }
854 
855         try {
856             if (msg instanceof Http2StreamFrame) {
857                 Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
858                 boolean shutdownOutput = isShutdownNeeded(frame);
859                 writeHttp2StreamFrame(frame, promise);
860                 if (shutdownOutput) {
861                     if (shutdownOutput(false, newPromise())) {
862                         pipeline().fireChannelShutdown(ChannelShutdownDirection.Outbound);
863                     }
864                 }
865             } else {
866                 String msgStr = msg.toString();
867                 Resource.dispose(msg);
868                 promise.setFailure(new IllegalArgumentException(
869                         "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
870                                 ": " + msgStr));
871             }
872         } catch (Throwable t) {
873             promise.tryFailure(t);
874         }
875     }
876 
877     private void writeHttp2StreamFrame(Http2StreamFrame frame, Promise<Void> promise) {
878         if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
879             Resource.dispose(frame);
880             promise.setFailure(
881                 new IllegalArgumentException("The first frame must be a headers frame. Was: "
882                     + frame.name()));
883             return;
884         }
885 
886         final boolean firstWrite;
887         if (firstFrameWritten) {
888             firstWrite = false;
889         } else {
890             firstWrite = firstFrameWritten = true;
891         }
892 
893         Future<Void> f = handler.parentContext().write(frame);
894         if (f.isDone()) {
895             if (firstWrite) {
896                 firstWriteComplete(f, promise);
897             } else {
898                 writeComplete(f, promise);
899             }
900         } else {
901             final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
902             incrementPendingOutboundBytes(bytes, false);
903             f.addListener(future ->  {
904                 if (firstWrite) {
905                     firstWriteComplete(future, promise);
906                 } else {
907                     writeComplete(future, promise);
908                 }
909                 decrementPendingOutboundBytes(bytes, false);
910             });
911             writeDoneAndNoFlush = true;
912         }
913     }
914 
915     private void firstWriteComplete(Future<?> future, Promise<Void> promise) {
916         Throwable cause = future.cause();
917         if (cause == null) {
918             promise.setSuccess(null);
919         } else {
920             // If the first write fails there is not much we can do, just close
921             closeForcibly();
922             promise.setFailure(wrapStreamClosedError(cause));
923         }
924     }
925 
926     private void writeComplete(Future<?> future, Promise<Void> promise) {
927         Throwable cause = future.cause();
928         if (cause == null) {
929             promise.setSuccess(null);
930         } else {
931             Throwable error = wrapStreamClosedError(cause);
932             // To make it more consistent with AbstractChannel we handle all IOExceptions here.
933             if (error instanceof IOException) {
934                 if (isAutoClose()) {
935                     // Close channel if needed.
936                     closeForcibly();
937                 } else {
938                     shutdownTransport(ChannelShutdownDirection.Outbound, newPromise());
939                 }
940             }
941             promise.setFailure(error);
942         }
943     }
944 
945     private Throwable wrapStreamClosedError(Throwable cause) {
946         // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
947         // mimic other transports and make it easier to reason about what exceptions to expect.
948         if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
949             return new ClosedChannelException().initCause(cause);
950         }
951         return cause;
952     }
953 
954     private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
955         if (frame.stream() != null && frame.stream() != stream) {
956             String msgString = frame.toString();
957             Resource.dispose(frame);
958             throw new IllegalArgumentException(
959                     "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
960         }
961         return frame;
962     }
963 
964     private void flushTransport() {
965         // If we are currently in the parent channel's read loop we should just ignore the flush.
966         // We will ensure we trigger ctx.flush() after we processed all Channels later on and
967         // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
968         // write(...) or writev(...) operation on the socket.
969         if (!writeDoneAndNoFlush || handler.isParentReadInProgress()) {
970             // There is nothing to flush so this is a NOOP.
971             return;
972         }
973         // We need to set this to false before we call flush0(...) as FutureListener may produce more data
974         // that are explicit flushed.
975         writeDoneAndNoFlush = false;
976         handler.parentContext().flush();
977     }
978 
979     private void sendOutboundEventTransport(Object event, Promise<Void> promise) {
980         Resource.dispose(event);
981         promise.setSuccess(null);
982     }
983 
984     @Override
985     public BufferAllocator bufferAllocator() {
986         return bufferAllocator;
987     }
988 
989     private <T> void validate(ChannelOption<T> option, T value) {
990         requireNonNull(option, "option");
991         option.validate(value);
992     }
993 
994     @Override
995     @SuppressWarnings({ "unchecked", "deprecation" })
996     public <T> T getOption(ChannelOption<T> option) {
997         requireNonNull(option, "option");
998         if (option == AUTO_READ) {
999             return (T) Boolean.valueOf(isAutoRead());
1000         }
1001         if (option == WRITE_BUFFER_WATER_MARK) {
1002             return (T) getWriteBufferWaterMark();
1003         }
1004         if (option == CONNECT_TIMEOUT_MILLIS) {
1005             return (T) Integer.valueOf(getConnectTimeoutMillis());
1006         }
1007         if (option == MAX_MESSAGES_PER_READ) {
1008             return (T) Integer.valueOf(getMaxMessagesPerRead());
1009         }
1010         if (option == WRITE_SPIN_COUNT) {
1011             return (T) Integer.valueOf(getWriteSpinCount());
1012         }
1013         if (option == BUFFER_ALLOCATOR) {
1014             return (T) getBufferAllocator();
1015         }
1016         if (option == RCVBUFFER_ALLOCATOR) {
1017             return getRecvBufferAllocator();
1018         }
1019         if (option == AUTO_CLOSE) {
1020             return (T) Boolean.valueOf(isAutoClose());
1021         }
1022         if (option == MESSAGE_SIZE_ESTIMATOR) {
1023             return (T) getMessageSizeEstimator();
1024         }
1025         if (option == MAX_MESSAGES_PER_WRITE) {
1026             return (T) Integer.valueOf(getMaxMessagesPerWrite());
1027         }
1028         if (option == ALLOW_HALF_CLOSURE) {
1029             return (T) Boolean.valueOf(isAllowHalfClosure());
1030         }
1031 
1032         return null;
1033     }
1034 
1035     @Override
1036     @SuppressWarnings("deprecation")
1037     public <T> Channel setOption(ChannelOption<T> option, T value) {
1038         validate(option, value);
1039 
1040         if (option == AUTO_READ) {
1041             setAutoRead((Boolean) value);
1042         } else if (option == WRITE_BUFFER_WATER_MARK) {
1043             setWriteBufferWaterMark((WriteBufferWaterMark) value);
1044         } else if (option == CONNECT_TIMEOUT_MILLIS) {
1045             setConnectTimeoutMillis((Integer) value);
1046         } else if (option == MAX_MESSAGES_PER_READ) {
1047             setMaxMessagesPerRead((Integer) value);
1048         } else if (option == WRITE_SPIN_COUNT) {
1049             setWriteSpinCount((Integer) value);
1050         } else if (option == BUFFER_ALLOCATOR) {
1051             setBufferAllocator((BufferAllocator) value);
1052         } else if (option == RCVBUFFER_ALLOCATOR) {
1053             setRecvBufferAllocator((RecvBufferAllocator) value);
1054         } else if (option == AUTO_CLOSE) {
1055             setAutoClose((Boolean) value);
1056         } else if (option == MESSAGE_SIZE_ESTIMATOR) {
1057             return this;
1058         } else if (option == MAX_MESSAGES_PER_WRITE) {
1059             setMaxMessagesPerWrite((Integer) value);
1060         } else if (option == ALLOW_HALF_CLOSURE) {
1061             setAllowHalfClosure((Boolean) value);
1062         }
1063         return this;
1064     }
1065 
1066     @Override
1067     public boolean isOptionSupported(ChannelOption<?> option) {
1068         return option == AUTO_READ || option == WRITE_BUFFER_WATER_MARK || option == CONNECT_TIMEOUT_MILLIS ||
1069                 option == MAX_MESSAGES_PER_READ || option == WRITE_SPIN_COUNT || option == BUFFER_ALLOCATOR ||
1070                 option == RCVBUFFER_ALLOCATOR || option == AUTO_CLOSE || option == MESSAGE_SIZE_ESTIMATOR ||
1071                 option == MAX_MESSAGES_PER_WRITE || option == ALLOW_HALF_CLOSURE;
1072     }
1073 
1074     private int getConnectTimeoutMillis() {
1075         return connectTimeoutMillis;
1076     }
1077 
1078     private void setConnectTimeoutMillis(int connectTimeoutMillis) {
1079         checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
1080         this.connectTimeoutMillis = connectTimeoutMillis;
1081     }
1082 
1083     /**
1084      * @throws IllegalStateException if {@link #getRecvBufferAllocator()} does not return an object of type
1085      * {@link MaxMessagesRecvBufferAllocator}.
1086      */
1087     @Deprecated
1088     private int getMaxMessagesPerRead() {
1089         try {
1090             MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1091             return allocator.maxMessagesPerRead();
1092         } catch (ClassCastException e) {
1093             throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1094                     "MaxMessagesRecvBufferAllocator", e);
1095         }
1096     }
1097 
1098     /**
1099      * @throws IllegalStateException if {@link #getRecvBufferAllocator()} does not return an object of type
1100      * {@link MaxMessagesRecvBufferAllocator}.
1101      */
1102     @Deprecated
1103     private void setMaxMessagesPerRead(int maxMessagesPerRead) {
1104         try {
1105             MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1106             allocator.maxMessagesPerRead(maxMessagesPerRead);
1107         } catch (ClassCastException e) {
1108             throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1109                     "MaxMessagesRecvBufferAllocator", e);
1110         }
1111     }
1112 
1113     /**
1114      * Get the maximum number of message to write per eventloop run. Once this limit is
1115      * reached we will continue to process other events before trying to write the remaining messages.
1116      */
1117     private int getMaxMessagesPerWrite() {
1118         return maxMessagesPerWrite;
1119     }
1120 
1121     /**
1122      * Set the maximum number of message to write per eventloop run. Once this limit is
1123      * reached we will continue to process other events before trying to write the remaining messages.
1124      */
1125     private void setMaxMessagesPerWrite(int maxMessagesPerWrite) {
1126         this.maxMessagesPerWrite = checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
1127     }
1128 
1129     private int getWriteSpinCount() {
1130         return writeSpinCount;
1131     }
1132 
1133     private void setWriteSpinCount(int writeSpinCount) {
1134         checkPositive(writeSpinCount, "writeSpinCount");
1135         // Integer.MAX_VALUE is used as a special value in the channel implementations to indicate the channel cannot
1136         // accept any more data, and results in the writeOp being set on the selector (or execute a runnable which tries
1137         // to flush later because the writeSpinCount quantum has been exhausted). This strategy prevents additional
1138         // conditional logic in the channel implementations, and shouldn't be noticeable in practice.
1139         if (writeSpinCount == Integer.MAX_VALUE) {
1140             --writeSpinCount;
1141         }
1142         this.writeSpinCount = writeSpinCount;
1143     }
1144 
1145     private BufferAllocator getBufferAllocator() {
1146         return bufferAllocator;
1147     }
1148 
1149     public void setBufferAllocator(BufferAllocator bufferAllocator) {
1150         requireNonNull(bufferAllocator, "bufferAllocator");
1151         this.bufferAllocator = bufferAllocator;
1152     }
1153 
1154     @SuppressWarnings("unchecked")
1155     private <T extends RecvBufferAllocator> T getRecvBufferAllocator() {
1156         return (T) rcvBufAllocator;
1157     }
1158 
1159     private void setRecvBufferAllocator(RecvBufferAllocator allocator) {
1160         rcvBufAllocator = requireNonNull(allocator, "allocator");
1161     }
1162 
1163     /**
1164      * Set the {@link RecvBufferAllocator} which is used for the channel to allocate receive buffers.
1165      * @param allocator the allocator to set.
1166      * @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator}
1167      * is of type {@link MaxMessagesRecvBufferAllocator}.
1168      */
1169     private void setRecvBufferAllocator(RecvBufferAllocator allocator, ChannelMetadata metadata) {
1170         requireNonNull(allocator, "allocator");
1171         requireNonNull(metadata, "metadata");
1172         if (allocator instanceof MaxMessagesRecvBufferAllocator) {
1173             ((MaxMessagesRecvBufferAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
1174         }
1175         setRecvBufferAllocator(allocator);
1176     }
1177 
1178     private boolean isAutoRead() {
1179         return autoRead == 1;
1180     }
1181 
1182     private void setAutoRead(boolean autoRead) {
1183         boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
1184         if (autoRead && !oldAutoRead) {
1185             read();
1186         } else if (!autoRead && oldAutoRead) {
1187             autoReadCleared();
1188         }
1189     }
1190 
1191     /**
1192      * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
1193      * {@code true} before.
1194      */
1195     protected void autoReadCleared() { }
1196 
1197     private WriteBufferWaterMark getWriteBufferWaterMark() {
1198         return writeBufferWaterMark;
1199     }
1200 
1201     private boolean isAutoClose() {
1202         return autoClose;
1203     }
1204 
1205     private void setAutoClose(boolean autoClose) {
1206         this.autoClose = autoClose;
1207     }
1208 
1209     private int getWriteBufferHighWaterMark() {
1210         return writeBufferWaterMark.high();
1211     }
1212 
1213     private void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
1214         checkPositiveOrZero(writeBufferHighWaterMark, "writeBufferHighWaterMark");
1215         for (;;) {
1216             WriteBufferWaterMark waterMark = writeBufferWaterMark;
1217             if (writeBufferHighWaterMark < waterMark.low()) {
1218                 throw new IllegalArgumentException(
1219                         "writeBufferHighWaterMark cannot be less than " +
1220                                 "writeBufferLowWaterMark (" + waterMark.low() + "): " +
1221                                 writeBufferHighWaterMark);
1222             }
1223             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
1224                     new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark))) {
1225                 return;
1226             }
1227         }
1228     }
1229 
1230     private int getWriteBufferLowWaterMark() {
1231         return writeBufferWaterMark.low();
1232     }
1233 
1234     private void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
1235         checkPositiveOrZero(writeBufferLowWaterMark, "writeBufferLowWaterMark");
1236         for (;;) {
1237             WriteBufferWaterMark waterMark = writeBufferWaterMark;
1238             if (writeBufferLowWaterMark > waterMark.high()) {
1239                 throw new IllegalArgumentException(
1240                         "writeBufferLowWaterMark cannot be greater than " +
1241                                 "writeBufferHighWaterMark (" + waterMark.high() + "): " +
1242                                 writeBufferLowWaterMark);
1243             }
1244             if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
1245                     new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high()))) {
1246                 return;
1247             }
1248         }
1249     }
1250 
1251     private void setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1252         this.writeBufferWaterMark = requireNonNull(writeBufferWaterMark, "writeBufferWaterMark");
1253     }
1254 
1255     private MessageSizeEstimator getMessageSizeEstimator() {
1256         return FlowControlledFrameSizeEstimator.INSTANCE;
1257     }
1258 
1259     private boolean isAllowHalfClosure() {
1260         return allowHalfClosure;
1261     }
1262 
1263     private void setAllowHalfClosure(boolean allowHalfClosure) {
1264         this.allowHalfClosure = allowHalfClosure;
1265     }
1266 
1267     private void maybeAddChannelToReadCompletePendingQueue() {
1268         if (!readCompletePending) {
1269             readCompletePending = true;
1270             handler.addChannelToReadCompletePendingQueue(this);
1271         }
1272     }
1273 
1274     /**
1275      * Creates a new {@link ChannelOutputShutdownException} which has the origin of the given {@link Class} and method.
1276      */
1277     private static ChannelOutputShutdownException newOutputShutdownException(Class<?> clazz, String method) {
1278         return ThrowableUtil.unknownStackTrace(new ChannelOutputShutdownException() {
1279             @Override
1280             public Throwable fillInStackTrace() {
1281                 // Suppress a warning since this method doesn't need synchronization
1282                 return this; // lgtm [java/non-sync-override]
1283             }
1284         }, clazz, method);
1285     }
1286 
1287     private static final class DefaultHttp2ChannelPipeline extends DefaultChannelPipeline {
1288         DefaultHttp2ChannelPipeline(Channel channel) {
1289             super(channel);
1290         }
1291 
1292         private DefaultHttp2StreamChannel defaultHttp2StreamChannel() {
1293             return (DefaultHttp2StreamChannel) channel();
1294         }
1295 
1296         @Override
1297         protected EventExecutor transportExecutor() {
1298             return defaultHttp2StreamChannel().executor();
1299         }
1300 
1301         @Override
1302         protected void pendingOutboundBytesUpdated(long pendingOutboundBytes) {
1303             // TODO: Do we need to do anything special here ?
1304         }
1305 
1306         @Override
1307         protected void registerTransport(Promise<Void> promise) {
1308             defaultHttp2StreamChannel().registerTransport(promise);
1309         }
1310 
1311         @Override
1312         protected void bindTransport(SocketAddress localAddress, Promise<Void> promise) {
1313             defaultHttp2StreamChannel().bindTransport(localAddress, promise);
1314         }
1315 
1316         @Override
1317         protected void connectTransport(
1318                 SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1319             defaultHttp2StreamChannel().connectTransport(remoteAddress, localAddress, promise);
1320         }
1321 
1322         @Override
1323         protected void disconnectTransport(Promise<Void> promise) {
1324             defaultHttp2StreamChannel().disconnectTransport(promise);
1325         }
1326 
1327         @Override
1328         protected void closeTransport(Promise<Void> promise) {
1329             defaultHttp2StreamChannel().closeTransport(promise);
1330         }
1331 
1332         @Override
1333         protected void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
1334             defaultHttp2StreamChannel().shutdownTransport(direction, promise);
1335         }
1336 
1337         @Override
1338         protected void deregisterTransport(Promise<Void> promise) {
1339             defaultHttp2StreamChannel().deregisterTransport(promise);
1340         }
1341 
1342         @Override
1343         protected void readTransport() {
1344             defaultHttp2StreamChannel().readTransport();
1345         }
1346 
1347         @Override
1348         protected void writeTransport(Object msg, Promise<Void> promise) {
1349             defaultHttp2StreamChannel().writeTransport(msg, promise);
1350         }
1351 
1352         @Override
1353         protected void flushTransport() {
1354             defaultHttp2StreamChannel().flushTransport();
1355         }
1356 
1357         @Override
1358         protected void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1359             defaultHttp2StreamChannel().sendOutboundEventTransport(event, promise);
1360         }
1361     }
1362 }