View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelProgressivePromise;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.channel.DefaultChannelConfig;
33  import io.netty.channel.DefaultChannelPipeline;
34  import io.netty.channel.EventLoop;
35  import io.netty.channel.MessageSizeEstimator;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.VoidChannelPromise;
38  import io.netty.channel.WriteBufferWaterMark;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.ChannelOutputShutdownEvent;
41  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
42  import io.netty.handler.ssl.SslCloseCompletionEvent;
43  import io.netty.util.DefaultAttributeMap;
44  import io.netty.util.ReferenceCountUtil;
45  import io.netty.util.concurrent.Future;
46  import io.netty.util.internal.ObjectUtil;
47  import io.netty.util.internal.StringUtil;
48  import io.netty.util.internal.logging.InternalLogger;
49  import io.netty.util.internal.logging.InternalLoggerFactory;
50  
51  import java.io.IOException;
52  import java.net.SocketAddress;
53  import java.nio.channels.ClosedChannelException;
54  import java.util.ArrayDeque;
55  import java.util.Map;
56  import java.util.Queue;
57  import java.util.concurrent.RejectedExecutionException;
58  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
59  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
60  
61  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
62  import static io.netty.util.internal.ObjectUtil.checkNotNull;
63  import static java.lang.Math.min;
64  
65  abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
66  
67      static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
68          @Override
69          public boolean visit(Http2FrameStream stream) {
70              final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
71                      ((DefaultHttp2FrameStream) stream).attachment;
72              childChannel.trySetWritable();
73              return true;
74          }
75      };
76  
77      static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
78              new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
79  
80      static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
81              new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
82  
83      static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
84              new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
85  
86      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
87  
88      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
89  
90      /**
91       * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
92       * Primarily is non-zero.
93       */
94      private static final int MIN_HTTP2_FRAME_SIZE = 9;
95  
96      /**
97       * {@link Http2FrameStreamVisitor} that fires the user event for every active stream pipeline.
98       */
99      private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
100 
101         private final Object event;
102 
103         UserEventStreamVisitor(Object event) {
104             this.event = checkNotNull(event, "event");
105         }
106 
107         @Override
108         public boolean visit(Http2FrameStream stream) {
109             final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
110                     ((DefaultHttp2FrameStream) stream).attachment;
111             childChannel.pipeline().fireUserEventTriggered(event);
112             return true;
113         }
114     }
115 
116     /**
117      * Returns the flow-control size for DATA frames, and {@value MIN_HTTP2_FRAME_SIZE} for all other frames.
118      */
119     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
120 
121         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
122 
123         private static final Handle HANDLE_INSTANCE = new Handle() {
124             @Override
125             public int size(Object msg) {
126                 return msg instanceof Http2DataFrame ?
127                         // Guard against overflow.
128                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
129                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
130             }
131         };
132 
133         @Override
134         public Handle newHandle() {
135             return HANDLE_INSTANCE;
136         }
137     }
138 
139     private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
140             AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
141 
142     private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
143             AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
144 
145     private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
146         Throwable cause = future.cause();
147         if (cause != null) {
148             Throwable unwrappedCause;
149             // Unwrap if needed
150             if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
151                 cause = unwrappedCause;
152             }
153 
154             // Notify the child-channel and close it.
155             streamChannel.pipeline().fireExceptionCaught(cause);
156             streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
157         }
158     }
159 
160     private final ChannelFutureListener windowUpdateFrameWriteListener = future ->
161             windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
162 
163     /**
164      * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
165      */
166     private enum ReadStatus {
167         /**
168          * No read in progress and no read was requested (yet)
169          */
170         IDLE,
171 
172         /**
173          * Reading in progress
174          */
175         IN_PROGRESS,
176 
177         /**
178          * A read operation was requested.
179          */
180         REQUESTED
181     }
182 
183     private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
184     private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
185     private final Http2StreamChannelId channelId;
186     private final ChannelPipeline pipeline;
187     private final DefaultHttp2FrameStream stream;
188     private final ChannelPromise closePromise;
189 
190     private volatile boolean registered;
191 
192     private volatile long totalPendingSize;
193     private volatile int unwritable;
194 
195     // Cached to reduce GC
196     private Runnable fireChannelWritabilityChangedTask;
197 
198     private boolean outboundClosed;
199     private int flowControlledBytes;
200 
201     /**
202      * This variable represents if a read is in progress for the current channel or was requested.
203      * Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the
204      * {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may
205      * drain all pending data, and then if the parent channel is reading this channel may still accept frames.
206      */
207     private ReadStatus readStatus = ReadStatus.IDLE;
208 
209     private Queue<Object> inboundBuffer;
210 
211     /** {@code true} after the first HEADERS frame has been written **/
212     private boolean firstFrameWritten;
213     private boolean readCompletePending;
214 
215     AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
216         this.stream = stream;
217         stream.attachment = this;
218         pipeline = new DefaultChannelPipeline(this) {
219             @Override
220             protected void incrementPendingOutboundBytes(long size) {
221                 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
222             }
223 
224             @Override
225             protected void decrementPendingOutboundBytes(long size) {
226                 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
227             }
228 
229             @Override
230             protected void onUnhandledInboundException(Throwable cause) {
231                 // Ensure we use the correct Http2Error to close the channel.
232                 if (cause instanceof Http2FrameStreamException) {
233                     closeWithError(((Http2FrameStreamException) cause).error());
234                     return;
235                 } else {
236                     Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
237                     if (exception != null) {
238                         closeWithError(exception.error());
239                         return;
240                     }
241                 }
242                 super.onUnhandledInboundException(cause);
243             }
244         };
245 
246         closePromise = pipeline.newPromise();
247         channelId = new Http2StreamChannelId(parent().id(), id);
248 
249         if (inboundHandler != null) {
250             // Add the handler to the pipeline now that we are registered.
251             pipeline.addLast(inboundHandler);
252         }
253     }
254 
255     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
256         if (size == 0) {
257             return;
258         }
259 
260         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
261         if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
262             setUnwritable(invokeLater);
263         }
264     }
265 
266     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
267         if (size == 0) {
268             return;
269         }
270 
271         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
272         // Once the totalPendingSize dropped below the low water-mark we can mark the child channel
273         // as writable again. Before doing so we also need to ensure the parent channel is writable to
274         // prevent excessive buffering in the parent outbound buffer. If the parent is not writable
275         // we will mark the child channel as writable once the parent becomes writable by calling
276         // trySetWritable() later.
277         if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
278             setWritable(invokeLater);
279         }
280     }
281 
282     final void trySetWritable() {
283         // The parent is writable again but the child channel itself may still not be writable.
284         // Lets try to set the child channel writable to match the state of the parent channel
285         // if (and only if) the totalPendingSize is smaller then the low water-mark.
286         // If this is not the case we will try again later once we drop under it.
287         if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
288             setWritable(false);
289         }
290     }
291 
292     private void setWritable(boolean invokeLater) {
293         for (;;) {
294             final int oldValue = unwritable;
295             final int newValue = oldValue & ~1;
296             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
297                 if (oldValue != 0 && newValue == 0) {
298                     fireChannelWritabilityChanged(invokeLater);
299                 }
300                 break;
301             }
302         }
303     }
304 
305     private void setUnwritable(boolean invokeLater) {
306         for (;;) {
307             final int oldValue = unwritable;
308             final int newValue = oldValue | 1;
309             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
310                 if (oldValue == 0) {
311                     fireChannelWritabilityChanged(invokeLater);
312                 }
313                 break;
314             }
315         }
316     }
317 
318     private void fireChannelWritabilityChanged(boolean invokeLater) {
319         final ChannelPipeline pipeline = pipeline();
320         if (invokeLater) {
321             Runnable task = fireChannelWritabilityChangedTask;
322             if (task == null) {
323                 fireChannelWritabilityChangedTask = task = new Runnable() {
324                     @Override
325                     public void run() {
326                         pipeline.fireChannelWritabilityChanged();
327                     }
328                 };
329             }
330             eventLoop().execute(task);
331         } else {
332             pipeline.fireChannelWritabilityChanged();
333         }
334     }
335     @Override
336     public Http2FrameStream stream() {
337         return stream;
338     }
339 
340     void closeOutbound() {
341         outboundClosed = true;
342     }
343 
344     void streamClosed() {
345         unsafe.readEOS();
346         // Attempt to drain any queued data from the queue and deliver it to the application before closing this
347         // channel.
348         unsafe.doBeginRead();
349     }
350 
351     @Override
352     public ChannelMetadata metadata() {
353         return METADATA;
354     }
355 
356     @Override
357     public ChannelConfig config() {
358         return config;
359     }
360 
361     @Override
362     public boolean isOpen() {
363         return !closePromise.isDone();
364     }
365 
366     @Override
367     public boolean isActive() {
368         return isOpen();
369     }
370 
371     @Override
372     public boolean isWritable() {
373         return unwritable == 0;
374     }
375 
376     @Override
377     public ChannelId id() {
378         return channelId;
379     }
380 
381     @Override
382     public EventLoop eventLoop() {
383         return parent().eventLoop();
384     }
385 
386     @Override
387     public Channel parent() {
388         return parentContext().channel();
389     }
390 
391     @Override
392     public boolean isRegistered() {
393         return registered;
394     }
395 
396     @Override
397     public SocketAddress localAddress() {
398         return parent().localAddress();
399     }
400 
401     @Override
402     public SocketAddress remoteAddress() {
403         return parent().remoteAddress();
404     }
405 
406     @Override
407     public ChannelFuture closeFuture() {
408         return closePromise;
409     }
410 
411     @Override
412     public long bytesBeforeUnwritable() {
413         // +1 because writability doesn't change until the threshold is crossed (not equal to).
414         long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
415         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
416         // writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
417         // synchronized together. totalPendingSize will be updated before isWritable().
418         return bytes > 0 && isWritable() ? bytes : 0;
419     }
420 
421     @Override
422     public long bytesBeforeWritable() {
423         // +1 because writability doesn't change until the threshold is crossed (not equal to).
424         long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
425         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
426         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
427         // together. totalPendingSize will be updated before isWritable().
428         return bytes <= 0 || isWritable() ? 0 : bytes;
429     }
430 
431     @Override
432     public Unsafe unsafe() {
433         return unsafe;
434     }
435 
436     @Override
437     public ChannelPipeline pipeline() {
438         return pipeline;
439     }
440 
441     @Override
442     public ByteBufAllocator alloc() {
443         return config().getAllocator();
444     }
445 
446     @Override
447     public Channel read() {
448         pipeline().read();
449         return this;
450     }
451 
452     @Override
453     public Channel flush() {
454         pipeline().flush();
455         return this;
456     }
457 
458     @Override
459     public ChannelFuture bind(SocketAddress localAddress) {
460         return pipeline().bind(localAddress);
461     }
462 
463     @Override
464     public ChannelFuture connect(SocketAddress remoteAddress) {
465         return pipeline().connect(remoteAddress);
466     }
467 
468     @Override
469     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
470         return pipeline().connect(remoteAddress, localAddress);
471     }
472 
473     @Override
474     public ChannelFuture disconnect() {
475         return pipeline().disconnect();
476     }
477 
478     @Override
479     public ChannelFuture close() {
480         return pipeline().close();
481     }
482 
483     @Override
484     public ChannelFuture deregister() {
485         return pipeline().deregister();
486     }
487 
488     @Override
489     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
490         return pipeline().bind(localAddress, promise);
491     }
492 
493     @Override
494     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
495         return pipeline().connect(remoteAddress, promise);
496     }
497 
498     @Override
499     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
500         return pipeline().connect(remoteAddress, localAddress, promise);
501     }
502 
503     @Override
504     public ChannelFuture disconnect(ChannelPromise promise) {
505         return pipeline().disconnect(promise);
506     }
507 
508     @Override
509     public ChannelFuture close(ChannelPromise promise) {
510         return pipeline().close(promise);
511     }
512 
513     @Override
514     public ChannelFuture deregister(ChannelPromise promise) {
515         return pipeline().deregister(promise);
516     }
517 
518     @Override
519     public ChannelFuture write(Object msg) {
520         return pipeline().write(msg);
521     }
522 
523     @Override
524     public ChannelFuture write(Object msg, ChannelPromise promise) {
525         return pipeline().write(msg, promise);
526     }
527 
528     @Override
529     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
530         return pipeline().writeAndFlush(msg, promise);
531     }
532 
533     @Override
534     public ChannelFuture writeAndFlush(Object msg) {
535         return pipeline().writeAndFlush(msg);
536     }
537 
538     @Override
539     public ChannelPromise newPromise() {
540         return pipeline().newPromise();
541     }
542 
543     @Override
544     public ChannelProgressivePromise newProgressivePromise() {
545         return pipeline().newProgressivePromise();
546     }
547 
548     @Override
549     public ChannelFuture newSucceededFuture() {
550         return pipeline().newSucceededFuture();
551     }
552 
553     @Override
554     public ChannelFuture newFailedFuture(Throwable cause) {
555         return pipeline().newFailedFuture(cause);
556     }
557 
558     @Override
559     public ChannelPromise voidPromise() {
560         return pipeline().voidPromise();
561     }
562 
563     @Override
564     public int hashCode() {
565         return id().hashCode();
566     }
567 
568     @Override
569     public boolean equals(Object o) {
570         return this == o;
571     }
572 
573     @Override
574     public int compareTo(Channel o) {
575         if (this == o) {
576             return 0;
577         }
578 
579         return id().compareTo(o.id());
580     }
581 
582     @Override
583     public String toString() {
584         return parent().toString() + '/' + channelId.getSequenceId() + " (H2 - " + stream + ')';
585     }
586 
587     /**
588      * Receive a read message. This does not notify handlers unless a read is in progress on the
589      * channel.
590      */
591     void fireChildRead(Http2Frame frame) {
592         assert eventLoop().inEventLoop();
593         if (!isActive()) {
594             ReferenceCountUtil.release(frame);
595         } else if (readStatus != ReadStatus.IDLE) {
596             // If a read is in progress or has been requested, there cannot be anything in the queue,
597             // otherwise we would have drained it from the queue and processed it during the read cycle.
598             assert inboundBuffer == null || inboundBuffer.isEmpty();
599             final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
600 
601             unsafe.doRead0(frame, allocHandle);
602             // We currently don't need to check for readEOS because the parent channel and child channel are limited
603             // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
604             // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
605             // cost of additional readComplete notifications on the rare path.
606             if (allocHandle.continueReading()) {
607                 maybeAddChannelToReadCompletePendingQueue();
608             } else {
609                 unsafe.notifyReadComplete(allocHandle, true, false);
610             }
611         } else {
612             if (inboundBuffer == null) {
613                 inboundBuffer = new ArrayDeque<Object>(4);
614             }
615             inboundBuffer.add(frame);
616         }
617     }
618 
619     void fireChildReadComplete() {
620         assert eventLoop().inEventLoop();
621         assert readStatus != ReadStatus.IDLE || !readCompletePending;
622         unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
623     }
624 
625     final void closeWithError(Http2Error error) {
626         assert eventLoop().inEventLoop();
627         unsafe.close(unsafe.voidPromise(), error);
628     }
629 
630     private final class Http2ChannelUnsafe implements Unsafe {
631         private final VoidChannelPromise unsafeVoidPromise =
632                 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
633         @SuppressWarnings("deprecation")
634         private RecvByteBufAllocator.Handle recvHandle;
635         private boolean writeDoneAndNoFlush;
636         private boolean closeInitiated;
637         private boolean readEOS;
638 
639         private boolean receivedEndOfStream;
640         private boolean sentEndOfStream;
641 
642         @Override
643         public void connect(final SocketAddress remoteAddress,
644                             SocketAddress localAddress, final ChannelPromise promise) {
645             if (!promise.setUncancellable()) {
646                 return;
647             }
648             promise.setFailure(new UnsupportedOperationException());
649         }
650 
651         @Override
652         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
653             if (recvHandle == null) {
654                 recvHandle = config().getRecvByteBufAllocator().newHandle();
655                 recvHandle.reset(config());
656             }
657             return recvHandle;
658         }
659 
660         @Override
661         public SocketAddress localAddress() {
662             return parent().unsafe().localAddress();
663         }
664 
665         @Override
666         public SocketAddress remoteAddress() {
667             return parent().unsafe().remoteAddress();
668         }
669 
670         @Override
671         public void register(EventLoop eventLoop, ChannelPromise promise) {
672             if (!promise.setUncancellable()) {
673                 return;
674             }
675             if (registered) {
676                 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
677                 return;
678             }
679 
680             registered = true;
681 
682             promise.setSuccess();
683 
684             pipeline().fireChannelRegistered();
685             if (isActive()) {
686                 pipeline().fireChannelActive();
687             }
688         }
689 
690         @Override
691         public void bind(SocketAddress localAddress, ChannelPromise promise) {
692             if (!promise.setUncancellable()) {
693                 return;
694             }
695             promise.setFailure(new UnsupportedOperationException());
696         }
697 
698         @Override
699         public void disconnect(ChannelPromise promise) {
700             close(promise);
701         }
702 
703         @Override
704         public void close(final ChannelPromise promise) {
705             close(promise, null);
706         }
707 
708         private void close(final ChannelPromise promise, Http2Error error) {
709             if (!promise.setUncancellable()) {
710                 return;
711             }
712             if (closeInitiated) {
713                 if (closePromise.isDone()) {
714                     // Closed already.
715                     promise.setSuccess();
716                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
717                     // This means close() was called before so we just register a listener and return
718                     closePromise.addListener(future -> promise.setSuccess());
719                 }
720                 return;
721             }
722             closeInitiated = true;
723             // Just set to false as removing from an underlying queue would even be more expensive.
724             readCompletePending = false;
725 
726             final boolean wasActive = isActive();
727 
728             // There is no need to update the local window as once the stream is closed all the pending bytes will be
729             // given back to the connection window by the controller itself.
730 
731             // Only ever send a reset frame if the connection is still alive and if the stream was created before
732             // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
733             if (parent().isActive() && isStreamIdValid(stream.id())) {
734                 // If error is null we know that the close was not triggered by an error and so we should only
735                 // try to send a RST frame if we didn't signal the end of the stream before.
736                 if (error == null) {
737                     if (!readEOS && !(receivedEndOfStream && sentEndOfStream)) {
738                         Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
739                         write(resetFrame, unsafe().voidPromise());
740                         flush();
741                     }
742                 } else {
743                     // Close was triggered by a stream error, in this case we always want to send a RST frame.
744                     Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
745                     write(resetFrame, unsafe().voidPromise());
746                     flush();
747                 }
748             }
749 
750             if (inboundBuffer != null) {
751                 for (;;) {
752                     Object msg = inboundBuffer.poll();
753                     if (msg == null) {
754                         break;
755                     }
756                     ReferenceCountUtil.release(msg);
757                 }
758                 inboundBuffer = null;
759             }
760 
761             // The promise should be notified before we call fireChannelInactive().
762             outboundClosed = true;
763             closePromise.setSuccess();
764             promise.setSuccess();
765 
766             fireChannelInactiveAndDeregister(voidPromise(), wasActive);
767         }
768 
769         @Override
770         public void closeForcibly() {
771             close(unsafe().voidPromise());
772         }
773 
774         @Override
775         public void deregister(ChannelPromise promise) {
776             fireChannelInactiveAndDeregister(promise, false);
777         }
778 
779         private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
780                                                       final boolean fireChannelInactive) {
781             if (!promise.setUncancellable()) {
782                 return;
783             }
784 
785             if (!registered) {
786                 promise.setSuccess();
787                 return;
788             }
789 
790             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
791             // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
792             // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
793             // events 'later' to ensure the current events in the handler are completed before these events.
794             //
795             // See:
796             // https://github.com/netty/netty/issues/4435
797             invokeLater(promise.channel(), new Runnable() {
798                 @Override
799                 public void run() {
800                     if (fireChannelInactive) {
801                         pipeline.fireChannelInactive();
802                     }
803                     // The user can fire `deregister` events multiple times but we only want to fire the pipeline
804                     // event if the channel was actually registered.
805                     if (registered) {
806                         registered = false;
807                         pipeline.fireChannelUnregistered();
808                     }
809                     safeSetSuccess(promise);
810                 }
811             });
812         }
813 
814         private void safeSetSuccess(ChannelPromise promise) {
815             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
816                 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
817                         promise.channel(), promise);
818             }
819         }
820 
821         private void invokeLater(Channel channel, Runnable task) {
822             try {
823                 // This method is used by outbound operation implementations to trigger an inbound event later.
824                 // They do not trigger an inbound event immediately because an outbound operation might have been
825                 // triggered by another inbound event handler method.  If fired immediately, the call stack
826                 // will look like this for example:
827                 //
828                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
829                 //   -> handlerA.ctx.close()
830                 //     -> channel.unsafe.close()
831                 //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
832                 //
833                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
834                 eventLoop().execute(task);
835             } catch (RejectedExecutionException e) {
836                 logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
837             }
838         }
839 
840         @Override
841         public void beginRead() {
842             if (!isActive()) {
843                 return;
844             }
845             updateLocalWindowIfNeeded();
846 
847             switch (readStatus) {
848                 case IDLE:
849                     readStatus = ReadStatus.IN_PROGRESS;
850                     doBeginRead();
851                     break;
852                 case IN_PROGRESS:
853                     readStatus = ReadStatus.REQUESTED;
854                     break;
855                 default:
856                     break;
857             }
858         }
859 
860         private Object pollQueuedMessage() {
861             return inboundBuffer == null ? null : inboundBuffer.poll();
862         }
863 
864         void doBeginRead() {
865             if (readStatus == ReadStatus.IDLE) {
866                 // Don't wait for the user to request a read to notify of channel closure.
867                 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
868                     // Double check there is nothing left to flush such as a window update frame.
869                     flush();
870                     unsafe.closeForcibly();
871                 }
872             } else {
873                 do { // Process messages until there are none left (or the user stopped requesting) and also handle EOS.
874                     Object message = pollQueuedMessage();
875                     if (message == null) {
876                         // Double check there is nothing left to flush such as a window update frame.
877                         flush();
878                         if (readEOS) {
879                             unsafe.closeForcibly();
880                         }
881                         break;
882                     }
883                     final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
884                     allocHandle.reset(config());
885                     boolean continueReading = false;
886                     do {
887                         doRead0((Http2Frame) message, allocHandle);
888                     } while ((readEOS || (continueReading = allocHandle.continueReading()))
889                             && (message = pollQueuedMessage()) != null);
890 
891                     if (continueReading && isParentReadInProgress() && !readEOS) {
892                         // Currently the parent and child channel are on the same EventLoop thread. If the parent is
893                         // currently reading it is possible that more frames will be delivered to this child channel. In
894                         // the case that this child channel still wants to read we delay the channelReadComplete on this
895                         // child channel until the parent is done reading.
896                         maybeAddChannelToReadCompletePendingQueue();
897                     } else {
898                         notifyReadComplete(allocHandle, true, true);
899 
900                         // While in the read loop reset the readState AFTER calling readComplete (or other pipeline
901                         // callbacks) to prevents re-entry into this method (if autoRead is disabled and the user calls
902                         // read on each readComplete) and StackOverflowException.
903                         resetReadStatus();
904                     }
905                 } while (readStatus != ReadStatus.IDLE);
906             }
907         }
908 
909         void readEOS() {
910             readEOS = true;
911         }
912 
913         private boolean updateLocalWindowIfNeeded() {
914             if (flowControlledBytes != 0 && !parentContext().isRemoved() && config.autoStreamFlowControl) {
915                 int bytes = flowControlledBytes;
916                 flowControlledBytes = 0;
917                 writeWindowUpdateFrame(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
918                 return true;
919             }
920             return false;
921         }
922 
923         void updateLocalWindowIfNeededAndFlush() {
924             if (updateLocalWindowIfNeeded()) {
925                 flush();
926             }
927         }
928 
929         private void resetReadStatus() {
930             readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
931         }
932 
933         void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
934                                 boolean inReadLoop) {
935             if (!readCompletePending && !forceReadComplete) {
936                 return;
937             }
938             // Set to false just in case we added the channel multiple times before.
939             readCompletePending = false;
940 
941             if (!inReadLoop) {
942                 // While in the read loop we reset the state after calling pipeline methods to prevent StackOverflow.
943                 resetReadStatus();
944             }
945 
946             allocHandle.readComplete();
947             pipeline().fireChannelReadComplete();
948             // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
949             // channel is not currently reading we need to force a flush at the child channel, because we cannot
950             // rely upon flush occurring in channelReadComplete on the parent channel.
951             flush();
952             if (readEOS) {
953                 unsafe.closeForcibly();
954             }
955         }
956 
957         @SuppressWarnings("deprecation")
958         void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
959             final int bytes;
960             if (frame instanceof Http2DataFrame) {
961                 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
962                 // It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
963                 // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
964                 // in this case that we accounted for it.
965                 //
966                 // See https://github.com/netty/netty/issues/9663
967                 flowControlledBytes += bytes;
968             } else {
969                 bytes = MIN_HTTP2_FRAME_SIZE;
970             }
971 
972             // Let's keep track of what we received as the stream state itself will only be updated once the frame
973             // was dispatched for reading which might cause problems if we try to close the channel in a write future.
974             receivedEndOfStream |= isEndOfStream(frame);
975 
976             // Update before firing event through the pipeline to be consistent with other Channel implementation.
977             allocHandle.attemptedBytesRead(bytes);
978             allocHandle.lastBytesRead(bytes);
979             allocHandle.incMessagesRead(1);
980 
981             pipeline().fireChannelRead(frame);
982         }
983 
984         private ChannelFuture writeWindowUpdateFrame(Http2WindowUpdateFrame windowUpdateFrame) {
985             ChannelFuture future = write0(parentContext(), windowUpdateFrame);
986             // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
987             // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
988             // to assume there was a write done that needs to be flushed or we risk flow control starvation.
989             writeDoneAndNoFlush = true;
990             // Add a listener which will notify and teardown the stream
991             // when a window update fails if needed or check the result of the future directly if it was completed
992             // already.
993             // See https://github.com/netty/netty/issues/9663
994             if (future.isDone()) {
995                 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
996             } else {
997                 future.addListener(windowUpdateFrameWriteListener);
998             }
999             return future;
1000         }
1001 
1002         @Override
1003         public void write(Object msg, final ChannelPromise promise) {
1004             // After this point its not possible to cancel a write anymore.
1005             if (!promise.setUncancellable()) {
1006                 ReferenceCountUtil.release(msg);
1007                 return;
1008             }
1009 
1010             if (!isActive() ||
1011                     // Once the outbound side was closed we should not allow header / data frames
1012                     outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1013                 ReferenceCountUtil.release(msg);
1014                 promise.setFailure(new ClosedChannelException());
1015                 return;
1016             }
1017 
1018             try {
1019                 if (msg instanceof Http2StreamFrame) {
1020                     Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1021                     if (msg instanceof Http2WindowUpdateFrame) {
1022                         Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame) msg;
1023                         if (config.autoStreamFlowControl) {
1024                             ReferenceCountUtil.release(msg);
1025                             promise.setFailure(new UnsupportedOperationException(
1026                                     Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL + " is set to false"));
1027                             return;
1028                         }
1029                         try {
1030                             ObjectUtil.checkInRange(updateFrame.windowSizeIncrement(), 0,
1031                                     flowControlledBytes, "windowSizeIncrement");
1032                         } catch (RuntimeException e) {
1033                             ReferenceCountUtil.release(updateFrame);
1034                             promise.setFailure(e);
1035                             return;
1036                         }
1037                         flowControlledBytes -= updateFrame.windowSizeIncrement();
1038                         if (parentContext().isRemoved()) {
1039                             ReferenceCountUtil.release(msg);
1040                             promise.setFailure(new ClosedChannelException());
1041                             return;
1042                         }
1043                         ChannelFuture f = writeWindowUpdateFrame(updateFrame);
1044                         if (f.isDone()) {
1045                             writeComplete(f, promise);
1046                         } else {
1047                             f.addListener(future -> writeComplete(future, promise));
1048                         }
1049                     } else {
1050                         writeHttp2StreamFrame(frame, promise);
1051                     }
1052                 } else {
1053                     String msgStr = msg.toString();
1054                     ReferenceCountUtil.release(msg);
1055                     promise.setFailure(new IllegalArgumentException(
1056                             "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1057                                     ": " + msgStr));
1058                 }
1059             } catch (Throwable t) {
1060                 promise.tryFailure(t);
1061             }
1062         }
1063 
1064         private boolean isEndOfStream(Http2Frame frame) {
1065             if (frame instanceof Http2HeadersFrame) {
1066                 return ((Http2HeadersFrame) frame).isEndStream();
1067             }
1068             if (frame instanceof Http2DataFrame) {
1069                 return ((Http2DataFrame) frame).isEndStream();
1070             }
1071             return false;
1072         }
1073 
1074         private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1075             if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1076                 ReferenceCountUtil.release(frame);
1077                 promise.setFailure(
1078                     new IllegalArgumentException("The first frame must be a headers frame. Was: "
1079                         + frame.name()));
1080                 return;
1081             }
1082 
1083             final boolean firstWrite;
1084             if (firstFrameWritten) {
1085                 firstWrite = false;
1086             } else {
1087                 firstWrite = firstFrameWritten = true;
1088             }
1089 
1090             // Let's keep track of what we send as the stream state itself will only be updated once the frame
1091             // was written which might cause problems if we try to close the channel in a write future.
1092             sentEndOfStream |= isEndOfStream(frame);
1093             ChannelFuture f = write0(parentContext(), frame);
1094             if (f.isDone()) {
1095                 if (firstWrite) {
1096                     firstWriteComplete(f, promise);
1097                 } else {
1098                     writeComplete(f, promise);
1099                 }
1100             } else {
1101                 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1102                 incrementPendingOutboundBytes(bytes, false);
1103                 f.addListener(future -> {
1104                     if (firstWrite) {
1105                         firstWriteComplete(future, promise);
1106                     } else {
1107                         writeComplete(future, promise);
1108                     }
1109                     decrementPendingOutboundBytes(bytes, false);
1110                 });
1111                 writeDoneAndNoFlush = true;
1112             }
1113         }
1114 
1115         private void firstWriteComplete(Future<?> future, ChannelPromise promise) {
1116             Throwable cause = future.cause();
1117             if (cause == null) {
1118                 promise.setSuccess();
1119             } else {
1120                 // If the first write fails there is not much we can do, just close
1121                 closeForcibly();
1122                 promise.setFailure(wrapStreamClosedError(cause));
1123             }
1124         }
1125 
1126         private void writeComplete(Future<?> future, ChannelPromise promise) {
1127             Throwable cause = future.cause();
1128             if (cause == null) {
1129                 promise.setSuccess();
1130             } else {
1131                 Throwable error = wrapStreamClosedError(cause);
1132                 // To make it more consistent with AbstractChannel we handle all IOExceptions here.
1133                 if (error instanceof IOException) {
1134                     if (config.isAutoClose()) {
1135                         // Close channel if needed.
1136                         closeForcibly();
1137                     } else {
1138                         // TODO: Once Http2StreamChannel extends DuplexChannel we should call shutdownOutput(...)
1139                         outboundClosed = true;
1140                     }
1141                 }
1142                 promise.setFailure(error);
1143             }
1144         }
1145 
1146         private Throwable wrapStreamClosedError(Throwable cause) {
1147             // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1148             // mimic other transports and make it easier to reason about what exceptions to expect.
1149             if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1150                 return new ClosedChannelException().initCause(cause);
1151             }
1152             return cause;
1153         }
1154 
1155         private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1156             if (frame.stream() != null && frame.stream() != stream) {
1157                 String msgString = frame.toString();
1158                 ReferenceCountUtil.release(frame);
1159                 throw new IllegalArgumentException(
1160                         "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1161             }
1162             return frame;
1163         }
1164 
1165         @Override
1166         public void flush() {
1167             // If we are currently in the parent channel's read loop we should just ignore the flush.
1168             // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1169             // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1170             // write(...) or writev(...) operation on the socket.
1171             if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1172                 // There is nothing to flush so this is a NOOP.
1173                 return;
1174             }
1175             // We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
1176             // that are explicit flushed.
1177             writeDoneAndNoFlush = false;
1178             flush0(parentContext());
1179         }
1180 
1181         @Override
1182         public ChannelPromise voidPromise() {
1183             return unsafeVoidPromise;
1184         }
1185 
1186         @Override
1187         public ChannelOutboundBuffer outboundBuffer() {
1188             // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1189             return null;
1190         }
1191     }
1192 
1193     /**
1194      * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1195      * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1196      * changes.
1197      */
1198     private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1199 
1200         volatile boolean autoStreamFlowControl = true;
1201         Http2StreamChannelConfig(Channel channel) {
1202             super(channel);
1203         }
1204 
1205         @Override
1206         public MessageSizeEstimator getMessageSizeEstimator() {
1207             return FlowControlledFrameSizeEstimator.INSTANCE;
1208         }
1209 
1210         @Override
1211         public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1212             throw new UnsupportedOperationException();
1213         }
1214 
1215         @Override
1216         public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1217             if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1218                 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1219                         RecvByteBufAllocator.ExtendedHandle.class);
1220             }
1221             super.setRecvByteBufAllocator(allocator);
1222             return this;
1223         }
1224 
1225         @Override
1226         public Map<ChannelOption<?>, Object> getOptions() {
1227             return getOptions(
1228                     super.getOptions(),
1229                     Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL);
1230         }
1231 
1232         @SuppressWarnings("unchecked")
1233         @Override
1234         public <T> T getOption(ChannelOption<T> option) {
1235             if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1236                 return (T) Boolean.valueOf(autoStreamFlowControl);
1237             }
1238             return super.getOption(option);
1239         }
1240 
1241         @Override
1242         public <T> boolean setOption(ChannelOption<T> option, T value) {
1243             validate(option, value);
1244             if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1245                 boolean newValue = (Boolean) value;
1246                 boolean changed = newValue && !autoStreamFlowControl;
1247                 autoStreamFlowControl = (Boolean) value;
1248                 if (changed) {
1249                     if (channel.isRegistered()) {
1250                         final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe) channel.unsafe();
1251                         if (channel.eventLoop().inEventLoop()) {
1252                             unsafe.updateLocalWindowIfNeededAndFlush();
1253                         } else {
1254                             channel.eventLoop().execute(new Runnable() {
1255                                 @Override
1256                                 public void run() {
1257                                     unsafe.updateLocalWindowIfNeededAndFlush();
1258                                 }
1259                             });
1260                         }
1261                     }
1262                 }
1263                 return true;
1264             }
1265             return super.setOption(option, value);
1266         }
1267     }
1268 
1269     private void maybeAddChannelToReadCompletePendingQueue() {
1270         if (!readCompletePending) {
1271             readCompletePending = true;
1272             addChannelToReadCompletePendingQueue();
1273         }
1274     }
1275 
1276     protected void flush0(ChannelHandlerContext ctx) {
1277         ctx.flush();
1278     }
1279 
1280     protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1281         ChannelPromise promise = ctx.newPromise();
1282         ctx.write(msg, promise);
1283         return promise;
1284     }
1285 
1286     protected abstract boolean isParentReadInProgress(); // Whether the parent channel is currently reading; flush() and doBeginRead() consult this.
1287     protected abstract void addChannelToReadCompletePendingQueue(); // Invoked when this channel is first marked readCompletePending; presumably the parent drains the queue once its read finishes (confirm in implementers).
1288     protected abstract ChannelHandlerContext parentContext(); // Context handed to write0(...)/flush0(...) and checked with isRemoved() before window updates are written.
1289 }