View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOutboundBuffer;
28  import io.netty.channel.ChannelPipeline;
29  import io.netty.channel.ChannelProgressivePromise;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.DefaultChannelConfig;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.MessageSizeEstimator;
35  import io.netty.channel.RecvByteBufAllocator;
36  import io.netty.channel.VoidChannelPromise;
37  import io.netty.channel.WriteBufferWaterMark;
38  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
39  import io.netty.util.DefaultAttributeMap;
40  import io.netty.util.ReferenceCountUtil;
41  import io.netty.util.internal.StringUtil;
42  import io.netty.util.internal.logging.InternalLogger;
43  import io.netty.util.internal.logging.InternalLoggerFactory;
44  
45  import java.io.IOException;
46  import java.net.SocketAddress;
47  import java.nio.channels.ClosedChannelException;
48  import java.util.ArrayDeque;
49  import java.util.Queue;
50  import java.util.concurrent.RejectedExecutionException;
51  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
52  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
53  
54  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
55  import static java.lang.Math.min;
56  
57  abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
58  
59      static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
60          @Override
61          public boolean visit(Http2FrameStream stream) {
62              final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
63                      ((DefaultHttp2FrameStream) stream).attachment;
64              childChannel.trySetWritable();
65              return true;
66          }
67      };
68  
69      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
70  
71      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
72  
73      /**
74       * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
75       * Primarily is non-zero.
76       */
77      private static final int MIN_HTTP2_FRAME_SIZE = 9;
78  
79      /**
80       * Returns the flow-control size for DATA frames, and {@value MIN_HTTP2_FRAME_SIZE} for all other frames.
81       */
82      private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
83  
84          static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
85  
86          private static final Handle HANDLE_INSTANCE = new Handle() {
87              @Override
88              public int size(Object msg) {
89                  return msg instanceof Http2DataFrame ?
90                          // Guard against overflow.
91                          (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
92                                  (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
93              }
94          };
95  
96          @Override
97          public Handle newHandle() {
98              return HANDLE_INSTANCE;
99          }
100     }
101 
102     private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
103             AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
104 
105     private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
106             AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
107 
108     private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
109         Throwable cause = future.cause();
110         if (cause != null) {
111             Throwable unwrappedCause;
112             // Unwrap if needed
113             if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
114                 cause = unwrappedCause;
115             }
116 
117             // Notify the child-channel and close it.
118             streamChannel.pipeline().fireExceptionCaught(cause);
119             streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
120         }
121     }
122 
123     private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
124         @Override
125         public void operationComplete(ChannelFuture future) {
126             windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
127         }
128     };
129 
130     /**
131      * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
132      */
133     private enum ReadStatus {
134         /**
135          * No read in progress and no read was requested (yet)
136          */
137         IDLE,
138 
139         /**
140          * Reading in progress
141          */
142         IN_PROGRESS,
143 
144         /**
145          * A read operation was requested.
146          */
147         REQUESTED
148     }
149 
150     private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
151     private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
152     private final ChannelId channelId;
153     private final ChannelPipeline pipeline;
154     private final DefaultHttp2FrameStream stream;
155     private final ChannelPromise closePromise;
156 
157     private volatile boolean registered;
158 
159     private volatile long totalPendingSize;
160     private volatile int unwritable;
161 
162     // Cached to reduce GC
163     private Runnable fireChannelWritabilityChangedTask;
164 
165     private boolean outboundClosed;
166     private int flowControlledBytes;
167 
168     /**
169      * This variable represents if a read is in progress for the current channel or was requested.
170      * Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the
171      * {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may
172      * drain all pending data, and then if the parent channel is reading this channel may still accept frames.
173      */
174     private ReadStatus readStatus = ReadStatus.IDLE;
175 
176     private Queue<Object> inboundBuffer;
177 
178     /** {@code true} after the first HEADERS frame has been written **/
179     private boolean firstFrameWritten;
180     private boolean readCompletePending;
181 
182     AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
183         this.stream = stream;
184         stream.attachment = this;
185         pipeline = new DefaultChannelPipeline(this) {
186             @Override
187             protected void incrementPendingOutboundBytes(long size) {
188                 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
189             }
190 
191             @Override
192             protected void decrementPendingOutboundBytes(long size) {
193                 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
194             }
195         };
196 
197         closePromise = pipeline.newPromise();
198         channelId = new Http2StreamChannelId(parent().id(), id);
199 
200         if (inboundHandler != null) {
201             // Add the handler to the pipeline now that we are registered.
202             pipeline.addLast(inboundHandler);
203         }
204     }
205 
206     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
207         if (size == 0) {
208             return;
209         }
210 
211         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
212         if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
213             setUnwritable(invokeLater);
214         }
215     }
216 
217     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
218         if (size == 0) {
219             return;
220         }
221 
222         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
223         // Once the totalPendingSize dropped below the low water-mark we can mark the child channel
224         // as writable again. Before doing so we also need to ensure the parent channel is writable to
225         // prevent excessive buffering in the parent outbound buffer. If the parent is not writable
226         // we will mark the child channel as writable once the parent becomes writable by calling
227         // trySetWritable() later.
228         if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
229             setWritable(invokeLater);
230         }
231     }
232 
233     final void trySetWritable() {
234         // The parent is writable again but the child channel itself may still not be writable.
235         // Lets try to set the child channel writable to match the state of the parent channel
236         // if (and only if) the totalPendingSize is smaller then the low water-mark.
237         // If this is not the case we will try again later once we drop under it.
238         if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
239             setWritable(false);
240         }
241     }
242 
243     private void setWritable(boolean invokeLater) {
244         for (;;) {
245             final int oldValue = unwritable;
246             final int newValue = oldValue & ~1;
247             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
248                 if (oldValue != 0 && newValue == 0) {
249                     fireChannelWritabilityChanged(invokeLater);
250                 }
251                 break;
252             }
253         }
254     }
255 
256     private void setUnwritable(boolean invokeLater) {
257         for (;;) {
258             final int oldValue = unwritable;
259             final int newValue = oldValue | 1;
260             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
261                 if (oldValue == 0) {
262                     fireChannelWritabilityChanged(invokeLater);
263                 }
264                 break;
265             }
266         }
267     }
268 
269     private void fireChannelWritabilityChanged(boolean invokeLater) {
270         final ChannelPipeline pipeline = pipeline();
271         if (invokeLater) {
272             Runnable task = fireChannelWritabilityChangedTask;
273             if (task == null) {
274                 fireChannelWritabilityChangedTask = task = new Runnable() {
275                     @Override
276                     public void run() {
277                         pipeline.fireChannelWritabilityChanged();
278                     }
279                 };
280             }
281             eventLoop().execute(task);
282         } else {
283             pipeline.fireChannelWritabilityChanged();
284         }
285     }
286     @Override
287     public Http2FrameStream stream() {
288         return stream;
289     }
290 
291     void closeOutbound() {
292         outboundClosed = true;
293     }
294 
295     void streamClosed() {
296         unsafe.readEOS();
297         // Attempt to drain any queued data from the queue and deliver it to the application before closing this
298         // channel.
299         unsafe.doBeginRead();
300     }
301 
302     @Override
303     public ChannelMetadata metadata() {
304         return METADATA;
305     }
306 
307     @Override
308     public ChannelConfig config() {
309         return config;
310     }
311 
312     @Override
313     public boolean isOpen() {
314         return !closePromise.isDone();
315     }
316 
317     @Override
318     public boolean isActive() {
319         return isOpen();
320     }
321 
322     @Override
323     public boolean isWritable() {
324         return unwritable == 0;
325     }
326 
327     @Override
328     public ChannelId id() {
329         return channelId;
330     }
331 
332     @Override
333     public EventLoop eventLoop() {
334         return parent().eventLoop();
335     }
336 
337     @Override
338     public Channel parent() {
339         return parentContext().channel();
340     }
341 
342     @Override
343     public boolean isRegistered() {
344         return registered;
345     }
346 
347     @Override
348     public SocketAddress localAddress() {
349         return parent().localAddress();
350     }
351 
352     @Override
353     public SocketAddress remoteAddress() {
354         return parent().remoteAddress();
355     }
356 
357     @Override
358     public ChannelFuture closeFuture() {
359         return closePromise;
360     }
361 
362     @Override
363     public long bytesBeforeUnwritable() {
364         long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize;
365         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
366         // writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
367         // synchronized together. totalPendingSize will be updated before isWritable().
368         if (bytes > 0) {
369             return isWritable() ? bytes : 0;
370         }
371         return 0;
372     }
373 
374     @Override
375     public long bytesBeforeWritable() {
376         long bytes = totalPendingSize - config().getWriteBufferLowWaterMark();
377         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
378         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
379         // together. totalPendingSize will be updated before isWritable().
380         if (bytes > 0) {
381             return isWritable() ? 0 : bytes;
382         }
383         return 0;
384     }
385 
386     @Override
387     public Unsafe unsafe() {
388         return unsafe;
389     }
390 
391     @Override
392     public ChannelPipeline pipeline() {
393         return pipeline;
394     }
395 
396     @Override
397     public ByteBufAllocator alloc() {
398         return config().getAllocator();
399     }
400 
401     @Override
402     public Channel read() {
403         pipeline().read();
404         return this;
405     }
406 
407     @Override
408     public Channel flush() {
409         pipeline().flush();
410         return this;
411     }
412 
413     @Override
414     public ChannelFuture bind(SocketAddress localAddress) {
415         return pipeline().bind(localAddress);
416     }
417 
418     @Override
419     public ChannelFuture connect(SocketAddress remoteAddress) {
420         return pipeline().connect(remoteAddress);
421     }
422 
423     @Override
424     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
425         return pipeline().connect(remoteAddress, localAddress);
426     }
427 
428     @Override
429     public ChannelFuture disconnect() {
430         return pipeline().disconnect();
431     }
432 
433     @Override
434     public ChannelFuture close() {
435         return pipeline().close();
436     }
437 
438     @Override
439     public ChannelFuture deregister() {
440         return pipeline().deregister();
441     }
442 
443     @Override
444     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
445         return pipeline().bind(localAddress, promise);
446     }
447 
448     @Override
449     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
450         return pipeline().connect(remoteAddress, promise);
451     }
452 
453     @Override
454     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
455         return pipeline().connect(remoteAddress, localAddress, promise);
456     }
457 
458     @Override
459     public ChannelFuture disconnect(ChannelPromise promise) {
460         return pipeline().disconnect(promise);
461     }
462 
463     @Override
464     public ChannelFuture close(ChannelPromise promise) {
465         return pipeline().close(promise);
466     }
467 
468     @Override
469     public ChannelFuture deregister(ChannelPromise promise) {
470         return pipeline().deregister(promise);
471     }
472 
473     @Override
474     public ChannelFuture write(Object msg) {
475         return pipeline().write(msg);
476     }
477 
478     @Override
479     public ChannelFuture write(Object msg, ChannelPromise promise) {
480         return pipeline().write(msg, promise);
481     }
482 
483     @Override
484     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
485         return pipeline().writeAndFlush(msg, promise);
486     }
487 
488     @Override
489     public ChannelFuture writeAndFlush(Object msg) {
490         return pipeline().writeAndFlush(msg);
491     }
492 
493     @Override
494     public ChannelPromise newPromise() {
495         return pipeline().newPromise();
496     }
497 
498     @Override
499     public ChannelProgressivePromise newProgressivePromise() {
500         return pipeline().newProgressivePromise();
501     }
502 
503     @Override
504     public ChannelFuture newSucceededFuture() {
505         return pipeline().newSucceededFuture();
506     }
507 
508     @Override
509     public ChannelFuture newFailedFuture(Throwable cause) {
510         return pipeline().newFailedFuture(cause);
511     }
512 
513     @Override
514     public ChannelPromise voidPromise() {
515         return pipeline().voidPromise();
516     }
517 
518     @Override
519     public int hashCode() {
520         return id().hashCode();
521     }
522 
523     @Override
524     public boolean equals(Object o) {
525         return this == o;
526     }
527 
528     @Override
529     public int compareTo(Channel o) {
530         if (this == o) {
531             return 0;
532         }
533 
534         return id().compareTo(o.id());
535     }
536 
537     @Override
538     public String toString() {
539         return parent().toString() + "(H2 - " + stream + ')';
540     }
541 
542     /**
543      * Receive a read message. This does not notify handlers unless a read is in progress on the
544      * channel.
545      */
546     void fireChildRead(Http2Frame frame) {
547         assert eventLoop().inEventLoop();
548         if (!isActive()) {
549             ReferenceCountUtil.release(frame);
550         } else if (readStatus != ReadStatus.IDLE) {
551             // If a read is in progress or has been requested, there cannot be anything in the queue,
552             // otherwise we would have drained it from the queue and processed it during the read cycle.
553             assert inboundBuffer == null || inboundBuffer.isEmpty();
554             final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
555             unsafe.doRead0(frame, allocHandle);
556             // We currently don't need to check for readEOS because the parent channel and child channel are limited
557             // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
558             // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
559             // cost of additional readComplete notifications on the rare path.
560             if (allocHandle.continueReading()) {
561                 maybeAddChannelToReadCompletePendingQueue();
562             } else {
563                 unsafe.notifyReadComplete(allocHandle, true);
564             }
565         } else {
566             if (inboundBuffer == null) {
567                 inboundBuffer = new ArrayDeque<Object>(4);
568             }
569             inboundBuffer.add(frame);
570         }
571     }
572 
573     void fireChildReadComplete() {
574         assert eventLoop().inEventLoop();
575         assert readStatus != ReadStatus.IDLE || !readCompletePending;
576         unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false);
577     }
578 
579     private final class Http2ChannelUnsafe implements Unsafe {
580         private final VoidChannelPromise unsafeVoidPromise =
581                 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
582         @SuppressWarnings("deprecation")
583         private RecvByteBufAllocator.Handle recvHandle;
584         private boolean writeDoneAndNoFlush;
585         private boolean closeInitiated;
586         private boolean readEOS;
587 
588         @Override
589         public void connect(final SocketAddress remoteAddress,
590                             SocketAddress localAddress, final ChannelPromise promise) {
591             if (!promise.setUncancellable()) {
592                 return;
593             }
594             promise.setFailure(new UnsupportedOperationException());
595         }
596 
597         @Override
598         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
599             if (recvHandle == null) {
600                 recvHandle = config().getRecvByteBufAllocator().newHandle();
601                 recvHandle.reset(config());
602             }
603             return recvHandle;
604         }
605 
606         @Override
607         public SocketAddress localAddress() {
608             return parent().unsafe().localAddress();
609         }
610 
611         @Override
612         public SocketAddress remoteAddress() {
613             return parent().unsafe().remoteAddress();
614         }
615 
616         @Override
617         public void register(EventLoop eventLoop, ChannelPromise promise) {
618             if (!promise.setUncancellable()) {
619                 return;
620             }
621             if (registered) {
622                 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
623                 return;
624             }
625 
626             registered = true;
627 
628             promise.setSuccess();
629 
630             pipeline().fireChannelRegistered();
631             if (isActive()) {
632                 pipeline().fireChannelActive();
633             }
634         }
635 
636         @Override
637         public void bind(SocketAddress localAddress, ChannelPromise promise) {
638             if (!promise.setUncancellable()) {
639                 return;
640             }
641             promise.setFailure(new UnsupportedOperationException());
642         }
643 
644         @Override
645         public void disconnect(ChannelPromise promise) {
646             close(promise);
647         }
648 
649         @Override
650         public void close(final ChannelPromise promise) {
651             if (!promise.setUncancellable()) {
652                 return;
653             }
654             if (closeInitiated) {
655                 if (closePromise.isDone()) {
656                     // Closed already.
657                     promise.setSuccess();
658                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
659                     // This means close() was called before so we just register a listener and return
660                     closePromise.addListener(new ChannelFutureListener() {
661                         @Override
662                         public void operationComplete(ChannelFuture future) {
663                             promise.setSuccess();
664                         }
665                     });
666                 }
667                 return;
668             }
669             closeInitiated = true;
670             // Just set to false as removing from an underlying queue would even be more expensive.
671             readCompletePending = false;
672 
673             final boolean wasActive = isActive();
674 
675             // There is no need to update the local window as once the stream is closed all the pending bytes will be
676             // given back to the connection window by the controller itself.
677 
678             // Only ever send a reset frame if the connection is still alive and if the stream was created before
679             // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
680             if (parent().isActive() && !readEOS && isStreamIdValid(stream.id())) {
681                 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
682                 write(resetFrame, unsafe().voidPromise());
683                 flush();
684             }
685 
686             if (inboundBuffer != null) {
687                 for (;;) {
688                     Object msg = inboundBuffer.poll();
689                     if (msg == null) {
690                         break;
691                     }
692                     ReferenceCountUtil.release(msg);
693                 }
694                 inboundBuffer = null;
695             }
696 
697             // The promise should be notified before we call fireChannelInactive().
698             outboundClosed = true;
699             closePromise.setSuccess();
700             promise.setSuccess();
701 
702             fireChannelInactiveAndDeregister(voidPromise(), wasActive);
703         }
704 
705         @Override
706         public void closeForcibly() {
707             close(unsafe().voidPromise());
708         }
709 
710         @Override
711         public void deregister(ChannelPromise promise) {
712             fireChannelInactiveAndDeregister(promise, false);
713         }
714 
715         private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
716                                                       final boolean fireChannelInactive) {
717             if (!promise.setUncancellable()) {
718                 return;
719             }
720 
721             if (!registered) {
722                 promise.setSuccess();
723                 return;
724             }
725 
726             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
727             // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
728             // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
729             // events 'later' to ensure the current events in the handler are completed before these events.
730             //
731             // See:
732             // https://github.com/netty/netty/issues/4435
733             invokeLater(new Runnable() {
734                 @Override
735                 public void run() {
736                     if (fireChannelInactive) {
737                         pipeline.fireChannelInactive();
738                     }
739                     // The user can fire `deregister` events multiple times but we only want to fire the pipeline
740                     // event if the channel was actually registered.
741                     if (registered) {
742                         registered = false;
743                         pipeline.fireChannelUnregistered();
744                     }
745                     safeSetSuccess(promise);
746                 }
747             });
748         }
749 
750         private void safeSetSuccess(ChannelPromise promise) {
751             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
752                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
753             }
754         }
755 
756         private void invokeLater(Runnable task) {
757             try {
758                 // This method is used by outbound operation implementations to trigger an inbound event later.
759                 // They do not trigger an inbound event immediately because an outbound operation might have been
760                 // triggered by another inbound event handler method.  If fired immediately, the call stack
761                 // will look like this for example:
762                 //
763                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
764                 //   -> handlerA.ctx.close()
765                 //     -> channel.unsafe.close()
766                 //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
767                 //
768                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
769                 eventLoop().execute(task);
770             } catch (RejectedExecutionException e) {
771                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
772             }
773         }
774 
775         @Override
776         public void beginRead() {
777             if (!isActive()) {
778                 return;
779             }
780             updateLocalWindowIfNeeded();
781 
782             switch (readStatus) {
783                 case IDLE:
784                     readStatus = ReadStatus.IN_PROGRESS;
785                     doBeginRead();
786                     break;
787                 case IN_PROGRESS:
788                     readStatus = ReadStatus.REQUESTED;
789                     break;
790                 default:
791                     break;
792             }
793         }
794 
795         private Object pollQueuedMessage() {
796             return inboundBuffer == null ? null : inboundBuffer.poll();
797         }
798 
799         void doBeginRead() {
800             // Process messages until there are none left (or the user stopped requesting) and also handle EOS.
801             while (readStatus != ReadStatus.IDLE) {
802                 Object message = pollQueuedMessage();
803                 if (message == null) {
804                     if (readEOS) {
805                         unsafe.closeForcibly();
806                     }
807                     // We need to double check that there is nothing left to flush such as a
808                     // window update frame.
809                     flush();
810                     break;
811                 }
812                 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
813                 allocHandle.reset(config());
814                 boolean continueReading = false;
815                 do {
816                     doRead0((Http2Frame) message, allocHandle);
817                 } while ((readEOS || (continueReading = allocHandle.continueReading()))
818                         && (message = pollQueuedMessage()) != null);
819 
820                 if (continueReading && isParentReadInProgress() && !readEOS) {
821                     // Currently the parent and child channel are on the same EventLoop thread. If the parent is
822                     // currently reading it is possible that more frames will be delivered to this child channel. In
823                     // the case that this child channel still wants to read we delay the channelReadComplete on this
824                     // child channel until the parent is done reading.
825                     maybeAddChannelToReadCompletePendingQueue();
826                 } else {
827                     notifyReadComplete(allocHandle, true);
828                 }
829             }
830         }
831 
832         void readEOS() {
833             readEOS = true;
834         }
835 
836         private void updateLocalWindowIfNeeded() {
837             if (flowControlledBytes != 0) {
838                 int bytes = flowControlledBytes;
839                 flowControlledBytes = 0;
840                 ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
841                 // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
842                 // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
843                 // to assume there was a write done that needs to be flushed or we risk flow control starvation.
844                 writeDoneAndNoFlush = true;
845                 // Add a listener which will notify and teardown the stream
846                 // when a window update fails if needed or check the result of the future directly if it was completed
847                 // already.
848                 // See https://github.com/netty/netty/issues/9663
849                 if (future.isDone()) {
850                     windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
851                 } else {
852                     future.addListener(windowUpdateFrameWriteListener);
853                 }
854             }
855         }
856 
857         void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete) {
858             if (!readCompletePending && !forceReadComplete) {
859                 return;
860             }
861             // Set to false just in case we added the channel multiple times before.
862             readCompletePending = false;
863 
864             if (readStatus == ReadStatus.REQUESTED) {
865                 readStatus = ReadStatus.IN_PROGRESS;
866             } else {
867                 readStatus = ReadStatus.IDLE;
868             }
869 
870             allocHandle.readComplete();
871             pipeline().fireChannelReadComplete();
872             // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
873             // channel is not currently reading we need to force a flush at the child channel, because we cannot
874             // rely upon flush occurring in channelReadComplete on the parent channel.
875             flush();
876             if (readEOS) {
877                 unsafe.closeForcibly();
878             }
879         }
880 
881         @SuppressWarnings("deprecation")
882         void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
883             final int bytes;
884             if (frame instanceof Http2DataFrame) {
885                 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
886 
887                 // It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
888                 // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
889                 // in this case that we accounted for it.
890                 //
891                 // See https://github.com/netty/netty/issues/9663
892                 flowControlledBytes += bytes;
893             } else {
894                 bytes = MIN_HTTP2_FRAME_SIZE;
895             }
896             // Update before firing event through the pipeline to be consistent with other Channel implementation.
897             allocHandle.attemptedBytesRead(bytes);
898             allocHandle.lastBytesRead(bytes);
899             allocHandle.incMessagesRead(1);
900 
901             pipeline().fireChannelRead(frame);
902         }
903 
904         @Override
905         public void write(Object msg, final ChannelPromise promise) {
906             // After this point its not possible to cancel a write anymore.
907             if (!promise.setUncancellable()) {
908                 ReferenceCountUtil.release(msg);
909                 return;
910             }
911 
912             if (!isActive() ||
913                     // Once the outbound side was closed we should not allow header / data frames
914                     outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
915                 ReferenceCountUtil.release(msg);
916                 promise.setFailure(new ClosedChannelException());
917                 return;
918             }
919 
920             try {
921                 if (msg instanceof Http2StreamFrame) {
922                     Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
923                     writeHttp2StreamFrame(frame, promise);
924                 } else {
925                     String msgStr = msg.toString();
926                     ReferenceCountUtil.release(msg);
927                     promise.setFailure(new IllegalArgumentException(
928                             "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
929                                     ": " + msgStr));
930                 }
931             } catch (Throwable t) {
932                 promise.tryFailure(t);
933             }
934         }
935 
936         private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
937             if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
938                 ReferenceCountUtil.release(frame);
939                 promise.setFailure(
940                     new IllegalArgumentException("The first frame must be a headers frame. Was: "
941                         + frame.name()));
942                 return;
943             }
944 
945             final boolean firstWrite;
946             if (firstFrameWritten) {
947                 firstWrite = false;
948             } else {
949                 firstWrite = firstFrameWritten = true;
950             }
951 
952             ChannelFuture f = write0(parentContext(), frame);
953             if (f.isDone()) {
954                 if (firstWrite) {
955                     firstWriteComplete(f, promise);
956                 } else {
957                     writeComplete(f, promise);
958                 }
959             } else {
960                 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
961                 incrementPendingOutboundBytes(bytes, false);
962                 f.addListener(new ChannelFutureListener() {
963                     @Override
964                     public void operationComplete(ChannelFuture future) {
965                         if (firstWrite) {
966                             firstWriteComplete(future, promise);
967                         } else {
968                             writeComplete(future, promise);
969                         }
970                         decrementPendingOutboundBytes(bytes, false);
971                     }
972                 });
973                 writeDoneAndNoFlush = true;
974             }
975         }
976 
977         private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
978             Throwable cause = future.cause();
979             if (cause == null) {
980                 promise.setSuccess();
981             } else {
982                 // If the first write fails there is not much we can do, just close
983                 closeForcibly();
984                 promise.setFailure(wrapStreamClosedError(cause));
985             }
986         }
987 
988         private void writeComplete(ChannelFuture future, ChannelPromise promise) {
989             Throwable cause = future.cause();
990             if (cause == null) {
991                 promise.setSuccess();
992             } else {
993                 Throwable error = wrapStreamClosedError(cause);
994                 // To make it more consistent with AbstractChannel we handle all IOExceptions here.
995                 if (error instanceof IOException) {
996                     if (config.isAutoClose()) {
997                         // Close channel if needed.
998                         closeForcibly();
999                     } else {
1000                         // TODO: Once Http2StreamChannel extends DuplexChannel we should call shutdownOutput(...)
1001                         outboundClosed = true;
1002                     }
1003                 }
1004                 promise.setFailure(error);
1005             }
1006         }
1007 
1008         private Throwable wrapStreamClosedError(Throwable cause) {
1009             // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1010             // mimic other transports and make it easier to reason about what exceptions to expect.
1011             if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1012                 return new ClosedChannelException().initCause(cause);
1013             }
1014             return cause;
1015         }
1016 
1017         private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1018             if (frame.stream() != null && frame.stream() != stream) {
1019                 String msgString = frame.toString();
1020                 ReferenceCountUtil.release(frame);
1021                 throw new IllegalArgumentException(
1022                         "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1023             }
1024             return frame;
1025         }
1026 
1027         @Override
1028         public void flush() {
1029             // If we are currently in the parent channel's read loop we should just ignore the flush.
1030             // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1031             // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1032             // write(...) or writev(...) operation on the socket.
1033             if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1034                 // There is nothing to flush so this is a NOOP.
1035                 return;
1036             }
1037             // We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
1038             // that are explicit flushed.
1039             writeDoneAndNoFlush = false;
1040             flush0(parentContext());
1041         }
1042 
1043         @Override
1044         public ChannelPromise voidPromise() {
1045             return unsafeVoidPromise;
1046         }
1047 
1048         @Override
1049         public ChannelOutboundBuffer outboundBuffer() {
1050             // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1051             return null;
1052         }
1053     }
1054 
1055     /**
1056      * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1057      * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1058      * changes.
1059      */
1060     private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1061         Http2StreamChannelConfig(Channel channel) {
1062             super(channel);
1063         }
1064 
1065         @Override
1066         public MessageSizeEstimator getMessageSizeEstimator() {
1067             return FlowControlledFrameSizeEstimator.INSTANCE;
1068         }
1069 
1070         @Override
1071         public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1072             throw new UnsupportedOperationException();
1073         }
1074 
1075         @Override
1076         public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1077             if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1078                 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1079                         RecvByteBufAllocator.ExtendedHandle.class);
1080             }
1081             super.setRecvByteBufAllocator(allocator);
1082             return this;
1083         }
1084     }
1085 
1086     private void maybeAddChannelToReadCompletePendingQueue() {
1087         if (!readCompletePending) {
1088             readCompletePending = true;
1089             addChannelToReadCompletePendingQueue();
1090         }
1091     }
1092 
1093     protected void flush0(ChannelHandlerContext ctx) {
1094         ctx.flush();
1095     }
1096 
1097     protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1098         ChannelPromise promise = ctx.newPromise();
1099         ctx.write(msg, promise);
1100         return promise;
1101     }
1102 
1103     protected abstract boolean isParentReadInProgress();
1104     protected abstract void addChannelToReadCompletePendingQueue();
1105     protected abstract ChannelHandlerContext parentContext();
1106 }