View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOutboundBuffer;
28  import io.netty.channel.ChannelPipeline;
29  import io.netty.channel.ChannelProgressivePromise;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.DefaultChannelConfig;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.MessageSizeEstimator;
35  import io.netty.channel.RecvByteBufAllocator;
36  import io.netty.channel.VoidChannelPromise;
37  import io.netty.channel.WriteBufferWaterMark;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.ChannelOutputShutdownEvent;
40  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
41  import io.netty.handler.ssl.SslCloseCompletionEvent;
42  import io.netty.util.DefaultAttributeMap;
43  import io.netty.util.ReferenceCountUtil;
44  import io.netty.util.internal.StringUtil;
45  import io.netty.util.internal.logging.InternalLogger;
46  import io.netty.util.internal.logging.InternalLoggerFactory;
47  
48  import java.io.IOException;
49  import java.net.SocketAddress;
50  import java.nio.channels.ClosedChannelException;
51  import java.util.ArrayDeque;
52  import java.util.Queue;
53  import java.util.concurrent.RejectedExecutionException;
54  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
55  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
56  
57  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
58  import static io.netty.util.internal.ObjectUtil.checkNotNull;
59  import static java.lang.Math.min;
60  
61  abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
62  
63      static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
64          @Override
65          public boolean visit(Http2FrameStream stream) {
66              final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
67                      ((DefaultHttp2FrameStream) stream).attachment;
68              childChannel.trySetWritable();
69              return true;
70          }
71      };
72  
73      static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
74              new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
75  
76      static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
77              new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
78  
79      static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
80              new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
81  
82      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
83  
84      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
85  
86      /**
 * Number of bytes to account for each non-payload message. The value 9 is somewhat arbitrary, but it is also
 * the minimum size of an HTTP/2 frame. What matters most is that it is non-zero.
89       */
90      private static final int MIN_HTTP2_FRAME_SIZE = 9;
91  
92      /**
93       * {@link Http2FrameStreamVisitor} that fires the user event for every active stream pipeline.
94       */
95      private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
96  
97          private final Object event;
98  
99          UserEventStreamVisitor(Object event) {
100             this.event = checkNotNull(event, "event");
101         }
102 
103         @Override
104         public boolean visit(Http2FrameStream stream) {
105             final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
106                     ((DefaultHttp2FrameStream) stream).attachment;
107             childChannel.pipeline().fireUserEventTriggered(event);
108             return true;
109         }
110     }
111 
112     /**
113      * Returns the flow-control size for DATA frames, and {@value MIN_HTTP2_FRAME_SIZE} for all other frames.
114      */
115     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
116 
117         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
118 
119         private static final Handle HANDLE_INSTANCE = new Handle() {
120             @Override
121             public int size(Object msg) {
122                 return msg instanceof Http2DataFrame ?
123                         // Guard against overflow.
124                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
125                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
126             }
127         };
128 
129         @Override
130         public Handle newHandle() {
131             return HANDLE_INSTANCE;
132         }
133     }
134 
135     private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
136             AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
137 
138     private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
139             AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
140 
141     private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
142         Throwable cause = future.cause();
143         if (cause != null) {
144             Throwable unwrappedCause;
145             // Unwrap if needed
146             if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
147                 cause = unwrappedCause;
148             }
149 
150             // Notify the child-channel and close it.
151             streamChannel.pipeline().fireExceptionCaught(cause);
152             streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
153         }
154     }
155 
156     private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
157         @Override
158         public void operationComplete(ChannelFuture future) {
159             windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
160         }
161     };
162 
163     /**
164      * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
165      */
166     private enum ReadStatus {
167         /**
168          * No read in progress and no read was requested (yet)
169          */
170         IDLE,
171 
172         /**
173          * Reading in progress
174          */
175         IN_PROGRESS,
176 
177         /**
178          * A read operation was requested.
179          */
180         REQUESTED
181     }
182 
183     private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
184     private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
185     private final ChannelId channelId;
186     private final ChannelPipeline pipeline;
187     private final DefaultHttp2FrameStream stream;
188     private final ChannelPromise closePromise;
189 
190     private volatile boolean registered;
191 
192     private volatile long totalPendingSize;
193     private volatile int unwritable;
194 
195     // Cached to reduce GC
196     private Runnable fireChannelWritabilityChangedTask;
197 
198     private boolean outboundClosed;
199     private int flowControlledBytes;
200 
201     /**
202      * This variable represents if a read is in progress for the current channel or was requested.
203      * Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the
204      * {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may
205      * drain all pending data, and then if the parent channel is reading this channel may still accept frames.
206      */
207     private ReadStatus readStatus = ReadStatus.IDLE;
208 
209     private Queue<Object> inboundBuffer;
210 
211     /** {@code true} after the first HEADERS frame has been written **/
212     private boolean firstFrameWritten;
213     private boolean readCompletePending;
214 
215     AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
216         this.stream = stream;
217         stream.attachment = this;
218         pipeline = new DefaultChannelPipeline(this) {
219             @Override
220             protected void incrementPendingOutboundBytes(long size) {
221                 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
222             }
223 
224             @Override
225             protected void decrementPendingOutboundBytes(long size) {
226                 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
227             }
228 
229             @Override
230             protected void onUnhandledInboundException(Throwable cause) {
231                 // Ensure we use the correct Http2Error to close the channel.
232                 if (cause instanceof Http2FrameStreamException) {
233                     closeWithError(((Http2FrameStreamException) cause).error());
234                     return;
235                 } else {
236                     Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
237                     if (exception != null) {
238                         closeWithError(exception.error());
239                         return;
240                     }
241                 }
242                 super.onUnhandledInboundException(cause);
243             }
244         };
245 
246         closePromise = pipeline.newPromise();
247         channelId = new Http2StreamChannelId(parent().id(), id);
248 
249         if (inboundHandler != null) {
250             // Add the handler to the pipeline now that we are registered.
251             pipeline.addLast(inboundHandler);
252         }
253     }
254 
255     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
256         if (size == 0) {
257             return;
258         }
259 
260         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
261         if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
262             setUnwritable(invokeLater);
263         }
264     }
265 
266     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
267         if (size == 0) {
268             return;
269         }
270 
271         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
272         // Once the totalPendingSize dropped below the low water-mark we can mark the child channel
273         // as writable again. Before doing so we also need to ensure the parent channel is writable to
274         // prevent excessive buffering in the parent outbound buffer. If the parent is not writable
275         // we will mark the child channel as writable once the parent becomes writable by calling
276         // trySetWritable() later.
277         if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
278             setWritable(invokeLater);
279         }
280     }
281 
282     final void trySetWritable() {
283         // The parent is writable again but the child channel itself may still not be writable.
284         // Lets try to set the child channel writable to match the state of the parent channel
285         // if (and only if) the totalPendingSize is smaller then the low water-mark.
286         // If this is not the case we will try again later once we drop under it.
287         if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
288             setWritable(false);
289         }
290     }
291 
292     private void setWritable(boolean invokeLater) {
293         for (;;) {
294             final int oldValue = unwritable;
295             final int newValue = oldValue & ~1;
296             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
297                 if (oldValue != 0 && newValue == 0) {
298                     fireChannelWritabilityChanged(invokeLater);
299                 }
300                 break;
301             }
302         }
303     }
304 
305     private void setUnwritable(boolean invokeLater) {
306         for (;;) {
307             final int oldValue = unwritable;
308             final int newValue = oldValue | 1;
309             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
310                 if (oldValue == 0) {
311                     fireChannelWritabilityChanged(invokeLater);
312                 }
313                 break;
314             }
315         }
316     }
317 
318     private void fireChannelWritabilityChanged(boolean invokeLater) {
319         final ChannelPipeline pipeline = pipeline();
320         if (invokeLater) {
321             Runnable task = fireChannelWritabilityChangedTask;
322             if (task == null) {
323                 fireChannelWritabilityChangedTask = task = new Runnable() {
324                     @Override
325                     public void run() {
326                         pipeline.fireChannelWritabilityChanged();
327                     }
328                 };
329             }
330             eventLoop().execute(task);
331         } else {
332             pipeline.fireChannelWritabilityChanged();
333         }
334     }
335     @Override
336     public Http2FrameStream stream() {
337         return stream;
338     }
339 
340     void closeOutbound() {
341         outboundClosed = true;
342     }
343 
344     void streamClosed() {
345         unsafe.readEOS();
346         // Attempt to drain any queued data from the queue and deliver it to the application before closing this
347         // channel.
348         unsafe.doBeginRead();
349     }
350 
351     @Override
352     public ChannelMetadata metadata() {
353         return METADATA;
354     }
355 
356     @Override
357     public ChannelConfig config() {
358         return config;
359     }
360 
361     @Override
362     public boolean isOpen() {
363         return !closePromise.isDone();
364     }
365 
366     @Override
367     public boolean isActive() {
368         return isOpen();
369     }
370 
371     @Override
372     public boolean isWritable() {
373         return unwritable == 0;
374     }
375 
376     @Override
377     public ChannelId id() {
378         return channelId;
379     }
380 
381     @Override
382     public EventLoop eventLoop() {
383         return parent().eventLoop();
384     }
385 
386     @Override
387     public Channel parent() {
388         return parentContext().channel();
389     }
390 
391     @Override
392     public boolean isRegistered() {
393         return registered;
394     }
395 
396     @Override
397     public SocketAddress localAddress() {
398         return parent().localAddress();
399     }
400 
401     @Override
402     public SocketAddress remoteAddress() {
403         return parent().remoteAddress();
404     }
405 
406     @Override
407     public ChannelFuture closeFuture() {
408         return closePromise;
409     }
410 
411     @Override
412     public long bytesBeforeUnwritable() {
413         // +1 because writability doesn't change until the threshold is crossed (not equal to).
414         long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
415         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
416         // writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
417         // synchronized together. totalPendingSize will be updated before isWritable().
418         return bytes > 0 && isWritable() ? bytes : 0;
419     }
420 
421     @Override
422     public long bytesBeforeWritable() {
423         // +1 because writability doesn't change until the threshold is crossed (not equal to).
424         long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
425         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
426         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
427         // together. totalPendingSize will be updated before isWritable().
428         return bytes <= 0 || isWritable() ? 0 : bytes;
429     }
430 
431     @Override
432     public Unsafe unsafe() {
433         return unsafe;
434     }
435 
436     @Override
437     public ChannelPipeline pipeline() {
438         return pipeline;
439     }
440 
441     @Override
442     public ByteBufAllocator alloc() {
443         return config().getAllocator();
444     }
445 
446     @Override
447     public Channel read() {
448         pipeline().read();
449         return this;
450     }
451 
452     @Override
453     public Channel flush() {
454         pipeline().flush();
455         return this;
456     }
457 
458     @Override
459     public ChannelFuture bind(SocketAddress localAddress) {
460         return pipeline().bind(localAddress);
461     }
462 
463     @Override
464     public ChannelFuture connect(SocketAddress remoteAddress) {
465         return pipeline().connect(remoteAddress);
466     }
467 
468     @Override
469     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
470         return pipeline().connect(remoteAddress, localAddress);
471     }
472 
473     @Override
474     public ChannelFuture disconnect() {
475         return pipeline().disconnect();
476     }
477 
478     @Override
479     public ChannelFuture close() {
480         return pipeline().close();
481     }
482 
483     @Override
484     public ChannelFuture deregister() {
485         return pipeline().deregister();
486     }
487 
488     @Override
489     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
490         return pipeline().bind(localAddress, promise);
491     }
492 
493     @Override
494     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
495         return pipeline().connect(remoteAddress, promise);
496     }
497 
498     @Override
499     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
500         return pipeline().connect(remoteAddress, localAddress, promise);
501     }
502 
503     @Override
504     public ChannelFuture disconnect(ChannelPromise promise) {
505         return pipeline().disconnect(promise);
506     }
507 
508     @Override
509     public ChannelFuture close(ChannelPromise promise) {
510         return pipeline().close(promise);
511     }
512 
513     @Override
514     public ChannelFuture deregister(ChannelPromise promise) {
515         return pipeline().deregister(promise);
516     }
517 
518     @Override
519     public ChannelFuture write(Object msg) {
520         return pipeline().write(msg);
521     }
522 
523     @Override
524     public ChannelFuture write(Object msg, ChannelPromise promise) {
525         return pipeline().write(msg, promise);
526     }
527 
528     @Override
529     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
530         return pipeline().writeAndFlush(msg, promise);
531     }
532 
533     @Override
534     public ChannelFuture writeAndFlush(Object msg) {
535         return pipeline().writeAndFlush(msg);
536     }
537 
538     @Override
539     public ChannelPromise newPromise() {
540         return pipeline().newPromise();
541     }
542 
543     @Override
544     public ChannelProgressivePromise newProgressivePromise() {
545         return pipeline().newProgressivePromise();
546     }
547 
548     @Override
549     public ChannelFuture newSucceededFuture() {
550         return pipeline().newSucceededFuture();
551     }
552 
553     @Override
554     public ChannelFuture newFailedFuture(Throwable cause) {
555         return pipeline().newFailedFuture(cause);
556     }
557 
558     @Override
559     public ChannelPromise voidPromise() {
560         return pipeline().voidPromise();
561     }
562 
563     @Override
564     public int hashCode() {
565         return id().hashCode();
566     }
567 
568     @Override
569     public boolean equals(Object o) {
570         return this == o;
571     }
572 
573     @Override
574     public int compareTo(Channel o) {
575         if (this == o) {
576             return 0;
577         }
578 
579         return id().compareTo(o.id());
580     }
581 
582     @Override
583     public String toString() {
584         return parent().toString() + "(H2 - " + stream + ')';
585     }
586 
587     /**
588      * Receive a read message. This does not notify handlers unless a read is in progress on the
589      * channel.
590      */
591     void fireChildRead(Http2Frame frame) {
592         assert eventLoop().inEventLoop();
593         if (!isActive()) {
594             ReferenceCountUtil.release(frame);
595         } else if (readStatus != ReadStatus.IDLE) {
596             // If a read is in progress or has been requested, there cannot be anything in the queue,
597             // otherwise we would have drained it from the queue and processed it during the read cycle.
598             assert inboundBuffer == null || inboundBuffer.isEmpty();
599             final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
600 
601             unsafe.doRead0(frame, allocHandle);
602             // We currently don't need to check for readEOS because the parent channel and child channel are limited
603             // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
604             // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
605             // cost of additional readComplete notifications on the rare path.
606             if (allocHandle.continueReading()) {
607                 maybeAddChannelToReadCompletePendingQueue();
608             } else {
609                 unsafe.notifyReadComplete(allocHandle, true, false);
610             }
611         } else {
612             if (inboundBuffer == null) {
613                 inboundBuffer = new ArrayDeque<Object>(4);
614             }
615             inboundBuffer.add(frame);
616         }
617     }
618 
619     void fireChildReadComplete() {
620         assert eventLoop().inEventLoop();
621         assert readStatus != ReadStatus.IDLE || !readCompletePending;
622         unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
623     }
624 
625     final void closeWithError(Http2Error error) {
626         assert eventLoop().inEventLoop();
627         unsafe.close(unsafe.voidPromise(), error);
628     }
629 
630     private final class Http2ChannelUnsafe implements Unsafe {
631         private final VoidChannelPromise unsafeVoidPromise =
632                 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
633         @SuppressWarnings("deprecation")
634         private RecvByteBufAllocator.Handle recvHandle;
635         private boolean writeDoneAndNoFlush;
636         private boolean closeInitiated;
637         private boolean readEOS;
638 
639         private boolean receivedEndOfStream;
640         private boolean sentEndOfStream;
641 
642         @Override
643         public void connect(final SocketAddress remoteAddress,
644                             SocketAddress localAddress, final ChannelPromise promise) {
645             if (!promise.setUncancellable()) {
646                 return;
647             }
648             promise.setFailure(new UnsupportedOperationException());
649         }
650 
651         @Override
652         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
653             if (recvHandle == null) {
654                 recvHandle = config().getRecvByteBufAllocator().newHandle();
655                 recvHandle.reset(config());
656             }
657             return recvHandle;
658         }
659 
660         @Override
661         public SocketAddress localAddress() {
662             return parent().unsafe().localAddress();
663         }
664 
665         @Override
666         public SocketAddress remoteAddress() {
667             return parent().unsafe().remoteAddress();
668         }
669 
670         @Override
671         public void register(EventLoop eventLoop, ChannelPromise promise) {
672             if (!promise.setUncancellable()) {
673                 return;
674             }
675             if (registered) {
676                 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
677                 return;
678             }
679 
680             registered = true;
681 
682             promise.setSuccess();
683 
684             pipeline().fireChannelRegistered();
685             if (isActive()) {
686                 pipeline().fireChannelActive();
687             }
688         }
689 
690         @Override
691         public void bind(SocketAddress localAddress, ChannelPromise promise) {
692             if (!promise.setUncancellable()) {
693                 return;
694             }
695             promise.setFailure(new UnsupportedOperationException());
696         }
697 
698         @Override
699         public void disconnect(ChannelPromise promise) {
700             close(promise);
701         }
702 
703         @Override
704         public void close(final ChannelPromise promise) {
705             close(promise, Http2Error.CANCEL);
706         }
707 
708         void close(final ChannelPromise promise, Http2Error error) {
709             if (!promise.setUncancellable()) {
710                 return;
711             }
712             if (closeInitiated) {
713                 if (closePromise.isDone()) {
714                     // Closed already.
715                     promise.setSuccess();
716                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
717                     // This means close() was called before so we just register a listener and return
718                     closePromise.addListener(new ChannelFutureListener() {
719                         @Override
720                         public void operationComplete(ChannelFuture future) {
721                             promise.setSuccess();
722                         }
723                     });
724                 }
725                 return;
726             }
727             closeInitiated = true;
728             // Just set to false as removing from an underlying queue would even be more expensive.
729             readCompletePending = false;
730 
731             final boolean wasActive = isActive();
732 
733             // There is no need to update the local window as once the stream is closed all the pending bytes will be
734             // given back to the connection window by the controller itself.
735 
736             // Only ever send a reset frame if the connection is still alive and if the stream was created before
737             // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
738             if (parent().isActive() && isStreamIdValid(stream.id()) &&
739                     // Also ensure the stream was never "closed" before.
740                     !readEOS && !(receivedEndOfStream && sentEndOfStream)) {
741                 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
742                 write(resetFrame, unsafe().voidPromise());
743                 flush();
744             }
745 
746             if (inboundBuffer != null) {
747                 for (;;) {
748                     Object msg = inboundBuffer.poll();
749                     if (msg == null) {
750                         break;
751                     }
752                     ReferenceCountUtil.release(msg);
753                 }
754                 inboundBuffer = null;
755             }
756 
757             // The promise should be notified before we call fireChannelInactive().
758             outboundClosed = true;
759             closePromise.setSuccess();
760             promise.setSuccess();
761 
762             fireChannelInactiveAndDeregister(voidPromise(), wasActive);
763         }
764 
765         @Override
766         public void closeForcibly() {
767             close(unsafe().voidPromise());
768         }
769 
770         @Override
771         public void deregister(ChannelPromise promise) {
772             fireChannelInactiveAndDeregister(promise, false);
773         }
774 
775         private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
776                                                       final boolean fireChannelInactive) {
777             if (!promise.setUncancellable()) {
778                 return;
779             }
780 
781             if (!registered) {
782                 promise.setSuccess();
783                 return;
784             }
785 
786             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
787             // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
788             // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
789             // events 'later' to ensure the current events in the handler are completed before these events.
790             //
791             // See:
792             // https://github.com/netty/netty/issues/4435
793             invokeLater(promise.channel(), new Runnable() {
794                 @Override
795                 public void run() {
796                     if (fireChannelInactive) {
797                         pipeline.fireChannelInactive();
798                     }
799                     // The user can fire `deregister` events multiple times but we only want to fire the pipeline
800                     // event if the channel was actually registered.
801                     if (registered) {
802                         registered = false;
803                         pipeline.fireChannelUnregistered();
804                     }
805                     safeSetSuccess(promise);
806                 }
807             });
808         }
809 
810         private void safeSetSuccess(ChannelPromise promise) {
811             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
812                 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
813                         promise.channel(), promise);
814             }
815         }
816 
817         private void invokeLater(Channel channel, Runnable task) {
818             try {
819                 // This method is used by outbound operation implementations to trigger an inbound event later.
820                 // They do not trigger an inbound event immediately because an outbound operation might have been
821                 // triggered by another inbound event handler method.  If fired immediately, the call stack
822                 // will look like this for example:
823                 //
824                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
825                 //   -> handlerA.ctx.close()
826                 //     -> channel.unsafe.close()
827                 //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
828                 //
829                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
830                 eventLoop().execute(task);
831             } catch (RejectedExecutionException e) {
832                 logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
833             }
834         }
835 
836         @Override
837         public void beginRead() {
838             if (!isActive()) {
839                 return;
840             }
841             updateLocalWindowIfNeeded();
842 
843             switch (readStatus) {
844                 case IDLE:
845                     readStatus = ReadStatus.IN_PROGRESS;
846                     doBeginRead();
847                     break;
848                 case IN_PROGRESS:
849                     readStatus = ReadStatus.REQUESTED;
850                     break;
851                 default:
852                     break;
853             }
854         }
855 
856         private Object pollQueuedMessage() {
857             return inboundBuffer == null ? null : inboundBuffer.poll();
858         }
859 
860         void doBeginRead() {
861             if (readStatus == ReadStatus.IDLE) {
862                 // Don't wait for the user to request a read to notify of channel closure.
863                 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
864                     // Double check there is nothing left to flush such as a window update frame.
865                     flush();
866                     unsafe.closeForcibly();
867                 }
868             } else {
869                 do { // Process messages until there are none left (or the user stopped requesting) and also handle EOS.
870                     Object message = pollQueuedMessage();
871                     if (message == null) {
872                         // Double check there is nothing left to flush such as a window update frame.
873                         flush();
874                         if (readEOS) {
875                             unsafe.closeForcibly();
876                         }
877                         break;
878                     }
879                     final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
880                     allocHandle.reset(config());
881                     boolean continueReading = false;
882                     do {
883                         doRead0((Http2Frame) message, allocHandle);
884                     } while ((readEOS || (continueReading = allocHandle.continueReading()))
885                             && (message = pollQueuedMessage()) != null);
886 
887                     if (continueReading && isParentReadInProgress() && !readEOS) {
888                         // Currently the parent and child channel are on the same EventLoop thread. If the parent is
889                         // currently reading it is possible that more frames will be delivered to this child channel. In
890                         // the case that this child channel still wants to read we delay the channelReadComplete on this
891                         // child channel until the parent is done reading.
892                         maybeAddChannelToReadCompletePendingQueue();
893                     } else {
894                         notifyReadComplete(allocHandle, true, true);
895 
896                         // While in the read loop reset the readState AFTER calling readComplete (or other pipeline
897                         // callbacks) to prevents re-entry into this method (if autoRead is disabled and the user calls
898                         // read on each readComplete) and StackOverflowException.
899                         resetReadStatus();
900                     }
901                 } while (readStatus != ReadStatus.IDLE);
902             }
903         }
904 
905         void readEOS() {
906             readEOS = true;
907         }
908 
909         private void updateLocalWindowIfNeeded() {
910             if (flowControlledBytes != 0 && !parentContext().isRemoved()) {
911                 int bytes = flowControlledBytes;
912                 flowControlledBytes = 0;
913                 ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
914                 // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
915                 // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
916                 // to assume there was a write done that needs to be flushed or we risk flow control starvation.
917                 writeDoneAndNoFlush = true;
918                 // Add a listener which will notify and teardown the stream
919                 // when a window update fails if needed or check the result of the future directly if it was completed
920                 // already.
921                 // See https://github.com/netty/netty/issues/9663
922                 if (future.isDone()) {
923                     windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
924                 } else {
925                     future.addListener(windowUpdateFrameWriteListener);
926                 }
927             }
928         }
929 
930         private void resetReadStatus() {
931             readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
932         }
933 
934         void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
935                                 boolean inReadLoop) {
936             if (!readCompletePending && !forceReadComplete) {
937                 return;
938             }
939             // Set to false just in case we added the channel multiple times before.
940             readCompletePending = false;
941 
942             if (!inReadLoop) {
943                 // While in the read loop we reset the state after calling pipeline methods to prevent StackOverflow.
944                 resetReadStatus();
945             }
946 
947             allocHandle.readComplete();
948             pipeline().fireChannelReadComplete();
949             // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
950             // channel is not currently reading we need to force a flush at the child channel, because we cannot
951             // rely upon flush occurring in channelReadComplete on the parent channel.
952             flush();
953             if (readEOS) {
954                 unsafe.closeForcibly();
955             }
956         }
957 
958         @SuppressWarnings("deprecation")
959         void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
960             final int bytes;
961             if (frame instanceof Http2DataFrame) {
962                 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
963                 // It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
964                 // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
965                 // in this case that we accounted for it.
966                 //
967                 // See https://github.com/netty/netty/issues/9663
968                 flowControlledBytes += bytes;
969             } else {
970                 bytes = MIN_HTTP2_FRAME_SIZE;
971             }
972 
973             // Let's keep track of what we received as the stream state itself will only be updated once the frame
974             // was dispatched for reading which might cause problems if we try to close the channel in a write future.
975             receivedEndOfStream |= isEndOfStream(frame);
976 
977             // Update before firing event through the pipeline to be consistent with other Channel implementation.
978             allocHandle.attemptedBytesRead(bytes);
979             allocHandle.lastBytesRead(bytes);
980             allocHandle.incMessagesRead(1);
981 
982             pipeline().fireChannelRead(frame);
983         }
984 
985         @Override
986         public void write(Object msg, final ChannelPromise promise) {
987             // After this point its not possible to cancel a write anymore.
988             if (!promise.setUncancellable()) {
989                 ReferenceCountUtil.release(msg);
990                 return;
991             }
992 
993             if (!isActive() ||
994                     // Once the outbound side was closed we should not allow header / data frames
995                     outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
996                 ReferenceCountUtil.release(msg);
997                 promise.setFailure(new ClosedChannelException());
998                 return;
999             }
1000 
1001             try {
1002                 if (msg instanceof Http2StreamFrame) {
1003                     Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1004                     writeHttp2StreamFrame(frame, promise);
1005                 } else {
1006                     String msgStr = msg.toString();
1007                     ReferenceCountUtil.release(msg);
1008                     promise.setFailure(new IllegalArgumentException(
1009                             "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1010                                     ": " + msgStr));
1011                 }
1012             } catch (Throwable t) {
1013                 promise.tryFailure(t);
1014             }
1015         }
1016 
1017         private boolean isEndOfStream(Http2Frame frame) {
1018             if (frame instanceof Http2HeadersFrame) {
1019                 return ((Http2HeadersFrame) frame).isEndStream();
1020             }
1021             if (frame instanceof Http2DataFrame) {
1022                 return ((Http2DataFrame) frame).isEndStream();
1023             }
1024             return false;
1025         }
1026 
1027         private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1028             if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1029                 ReferenceCountUtil.release(frame);
1030                 promise.setFailure(
1031                     new IllegalArgumentException("The first frame must be a headers frame. Was: "
1032                         + frame.name()));
1033                 return;
1034             }
1035 
1036             final boolean firstWrite;
1037             if (firstFrameWritten) {
1038                 firstWrite = false;
1039             } else {
1040                 firstWrite = firstFrameWritten = true;
1041             }
1042 
1043             // Let's keep track of what we send as the stream state itself will only be updated once the frame
1044             // was written which might cause problems if we try to close the channel in a write future.
1045             sentEndOfStream |= isEndOfStream(frame);
1046             ChannelFuture f = write0(parentContext(), frame);
1047             if (f.isDone()) {
1048                 if (firstWrite) {
1049                     firstWriteComplete(f, promise);
1050                 } else {
1051                     writeComplete(f, promise);
1052                 }
1053             } else {
1054                 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1055                 incrementPendingOutboundBytes(bytes, false);
1056                 f.addListener(new ChannelFutureListener() {
1057                     @Override
1058                     public void operationComplete(ChannelFuture future) {
1059                         if (firstWrite) {
1060                             firstWriteComplete(future, promise);
1061                         } else {
1062                             writeComplete(future, promise);
1063                         }
1064                         decrementPendingOutboundBytes(bytes, false);
1065                     }
1066                 });
1067                 writeDoneAndNoFlush = true;
1068             }
1069         }
1070 
1071         private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1072             Throwable cause = future.cause();
1073             if (cause == null) {
1074                 promise.setSuccess();
1075             } else {
1076                 // If the first write fails there is not much we can do, just close
1077                 closeForcibly();
1078                 promise.setFailure(wrapStreamClosedError(cause));
1079             }
1080         }
1081 
1082         private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1083             Throwable cause = future.cause();
1084             if (cause == null) {
1085                 promise.setSuccess();
1086             } else {
1087                 Throwable error = wrapStreamClosedError(cause);
1088                 // To make it more consistent with AbstractChannel we handle all IOExceptions here.
1089                 if (error instanceof IOException) {
1090                     if (config.isAutoClose()) {
1091                         // Close channel if needed.
1092                         closeForcibly();
1093                     } else {
1094                         // TODO: Once Http2StreamChannel extends DuplexChannel we should call shutdownOutput(...)
1095                         outboundClosed = true;
1096                     }
1097                 }
1098                 promise.setFailure(error);
1099             }
1100         }
1101 
1102         private Throwable wrapStreamClosedError(Throwable cause) {
1103             // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1104             // mimic other transports and make it easier to reason about what exceptions to expect.
1105             if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1106                 return new ClosedChannelException().initCause(cause);
1107             }
1108             return cause;
1109         }
1110 
1111         private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1112             if (frame.stream() != null && frame.stream() != stream) {
1113                 String msgString = frame.toString();
1114                 ReferenceCountUtil.release(frame);
1115                 throw new IllegalArgumentException(
1116                         "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1117             }
1118             return frame;
1119         }
1120 
1121         @Override
1122         public void flush() {
1123             // If we are currently in the parent channel's read loop we should just ignore the flush.
1124             // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1125             // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1126             // write(...) or writev(...) operation on the socket.
1127             if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1128                 // There is nothing to flush so this is a NOOP.
1129                 return;
1130             }
1131             // We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
1132             // that are explicit flushed.
1133             writeDoneAndNoFlush = false;
1134             flush0(parentContext());
1135         }
1136 
1137         @Override
1138         public ChannelPromise voidPromise() {
1139             return unsafeVoidPromise;
1140         }
1141 
1142         @Override
1143         public ChannelOutboundBuffer outboundBuffer() {
1144             // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1145             return null;
1146         }
1147     }
1148 
1149     /**
1150      * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1151      * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1152      * changes.
1153      */
1154     private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1155         Http2StreamChannelConfig(Channel channel) {
1156             super(channel);
1157         }
1158 
1159         @Override
1160         public MessageSizeEstimator getMessageSizeEstimator() {
1161             return FlowControlledFrameSizeEstimator.INSTANCE;
1162         }
1163 
1164         @Override
1165         public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1166             throw new UnsupportedOperationException();
1167         }
1168 
1169         @Override
1170         public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1171             if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1172                 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1173                         RecvByteBufAllocator.ExtendedHandle.class);
1174             }
1175             super.setRecvByteBufAllocator(allocator);
1176             return this;
1177         }
1178     }
1179 
1180     private void maybeAddChannelToReadCompletePendingQueue() {
1181         if (!readCompletePending) {
1182             readCompletePending = true;
1183             addChannelToReadCompletePendingQueue();
1184         }
1185     }
1186 
1187     protected void flush0(ChannelHandlerContext ctx) {
1188         ctx.flush();
1189     }
1190 
1191     protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1192         ChannelPromise promise = ctx.newPromise();
1193         ctx.write(msg, promise);
1194         return promise;
1195     }
1196 
    /**
     * Returns {@code true} while the parent channel is in its read loop. In that state this channel ignores
     * flush requests and may delay its {@code channelReadComplete}, because more frames may still be
     * delivered to it.
     */
    protected abstract boolean isParentReadInProgress();

    /**
     * Registers this channel as having a delayed {@code channelReadComplete}. Called at most once per pending
     * read-complete, while the parent channel is still reading.
     */
    protected abstract void addChannelToReadCompletePendingQueue();

    /**
     * Returns the {@link ChannelHandlerContext} that this channel writes frames to and flushes.
     */
    protected abstract ChannelHandlerContext parentContext();
1200 }