View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelProgressivePromise;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.channel.DefaultChannelConfig;
33  import io.netty.channel.DefaultChannelPipeline;
34  import io.netty.channel.EventLoop;
35  import io.netty.channel.MessageSizeEstimator;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.VoidChannelPromise;
38  import io.netty.channel.WriteBufferWaterMark;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.ChannelOutputShutdownEvent;
41  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
42  import io.netty.handler.ssl.SslCloseCompletionEvent;
43  import io.netty.util.DefaultAttributeMap;
44  import io.netty.util.ReferenceCountUtil;
45  import io.netty.util.internal.ObjectUtil;
46  import io.netty.util.internal.StringUtil;
47  import io.netty.util.internal.logging.InternalLogger;
48  import io.netty.util.internal.logging.InternalLoggerFactory;
49  
50  import java.io.IOException;
51  import java.net.SocketAddress;
52  import java.nio.channels.ClosedChannelException;
53  import java.util.ArrayDeque;
54  import java.util.Map;
55  import java.util.Queue;
56  import java.util.concurrent.RejectedExecutionException;
57  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
58  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
59  
60  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
61  import static io.netty.util.internal.ObjectUtil.checkNotNull;
62  import static java.lang.Math.min;
63  
64  abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
65  
66      static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
67          @Override
68          public boolean visit(Http2FrameStream stream) {
69              final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
70                      ((DefaultHttp2FrameStream) stream).attachment;
71              childChannel.trySetWritable();
72              return true;
73          }
74      };
75  
76      static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
77              new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
78  
79      static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
80              new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
81  
82      static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
83              new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
84  
85      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
86  
87      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
88  
89      /**
90       * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
91       * Primarily is non-zero.
92       */
93      private static final int MIN_HTTP2_FRAME_SIZE = 9;
94  
95      /**
96       * {@link Http2FrameStreamVisitor} that fires the user event for every active stream pipeline.
97       */
98      private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
99  
100         private final Object event;
101 
102         UserEventStreamVisitor(Object event) {
103             this.event = checkNotNull(event, "event");
104         }
105 
106         @Override
107         public boolean visit(Http2FrameStream stream) {
108             final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
109                     ((DefaultHttp2FrameStream) stream).attachment;
110             childChannel.pipeline().fireUserEventTriggered(event);
111             return true;
112         }
113     }
114 
115     /**
116      * Returns the flow-control size for DATA frames, and {@value MIN_HTTP2_FRAME_SIZE} for all other frames.
117      */
118     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
119 
120         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
121 
122         private static final Handle HANDLE_INSTANCE = new Handle() {
123             @Override
124             public int size(Object msg) {
125                 return msg instanceof Http2DataFrame ?
126                         // Guard against overflow.
127                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
128                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
129             }
130         };
131 
132         @Override
133         public Handle newHandle() {
134             return HANDLE_INSTANCE;
135         }
136     }
137 
138     private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
139             AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
140 
141     private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
142             AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
143 
144     private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
145         Throwable cause = future.cause();
146         if (cause != null) {
147             Throwable unwrappedCause;
148             // Unwrap if needed
149             if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
150                 cause = unwrappedCause;
151             }
152 
153             // Notify the child-channel and close it.
154             streamChannel.pipeline().fireExceptionCaught(cause);
155             streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
156         }
157     }
158 
159     private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
160         @Override
161         public void operationComplete(ChannelFuture future) {
162             windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
163         }
164     };
165 
166     /**
167      * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
168      */
169     private enum ReadStatus {
170         /**
171          * No read in progress and no read was requested (yet)
172          */
173         IDLE,
174 
175         /**
176          * Reading in progress
177          */
178         IN_PROGRESS,
179 
180         /**
181          * A read operation was requested.
182          */
183         REQUESTED
184     }
185 
186     private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
187     private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
188     private final ChannelId channelId;
189     private final ChannelPipeline pipeline;
190     private final DefaultHttp2FrameStream stream;
191     private final ChannelPromise closePromise;
192 
193     private volatile boolean registered;
194 
195     private volatile long totalPendingSize;
196     private volatile int unwritable;
197 
198     // Cached to reduce GC
199     private Runnable fireChannelWritabilityChangedTask;
200 
201     private boolean outboundClosed;
202     private int flowControlledBytes;
203 
204     /**
205      * This variable represents if a read is in progress for the current channel or was requested.
206      * Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the
207      * {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may
208      * drain all pending data, and then if the parent channel is reading this channel may still accept frames.
209      */
210     private ReadStatus readStatus = ReadStatus.IDLE;
211 
212     private Queue<Object> inboundBuffer;
213 
214     /** {@code true} after the first HEADERS frame has been written **/
215     private boolean firstFrameWritten;
216     private boolean readCompletePending;
217 
218     AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
219         this.stream = stream;
220         stream.attachment = this;
221         pipeline = new DefaultChannelPipeline(this) {
222             @Override
223             protected void incrementPendingOutboundBytes(long size) {
224                 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
225             }
226 
227             @Override
228             protected void decrementPendingOutboundBytes(long size) {
229                 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
230             }
231 
232             @Override
233             protected void onUnhandledInboundException(Throwable cause) {
234                 // Ensure we use the correct Http2Error to close the channel.
235                 if (cause instanceof Http2FrameStreamException) {
236                     closeWithError(((Http2FrameStreamException) cause).error());
237                     return;
238                 } else {
239                     Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
240                     if (exception != null) {
241                         closeWithError(exception.error());
242                         return;
243                     }
244                 }
245                 super.onUnhandledInboundException(cause);
246             }
247         };
248 
249         closePromise = pipeline.newPromise();
250         channelId = new Http2StreamChannelId(parent().id(), id);
251 
252         if (inboundHandler != null) {
253             // Add the handler to the pipeline now that we are registered.
254             pipeline.addLast(inboundHandler);
255         }
256     }
257 
258     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
259         if (size == 0) {
260             return;
261         }
262 
263         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
264         if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
265             setUnwritable(invokeLater);
266         }
267     }
268 
269     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
270         if (size == 0) {
271             return;
272         }
273 
274         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
275         // Once the totalPendingSize dropped below the low water-mark we can mark the child channel
276         // as writable again. Before doing so we also need to ensure the parent channel is writable to
277         // prevent excessive buffering in the parent outbound buffer. If the parent is not writable
278         // we will mark the child channel as writable once the parent becomes writable by calling
279         // trySetWritable() later.
280         if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
281             setWritable(invokeLater);
282         }
283     }
284 
285     final void trySetWritable() {
286         // The parent is writable again but the child channel itself may still not be writable.
287         // Lets try to set the child channel writable to match the state of the parent channel
288         // if (and only if) the totalPendingSize is smaller then the low water-mark.
289         // If this is not the case we will try again later once we drop under it.
290         if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
291             setWritable(false);
292         }
293     }
294 
295     private void setWritable(boolean invokeLater) {
296         for (;;) {
297             final int oldValue = unwritable;
298             final int newValue = oldValue & ~1;
299             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
300                 if (oldValue != 0 && newValue == 0) {
301                     fireChannelWritabilityChanged(invokeLater);
302                 }
303                 break;
304             }
305         }
306     }
307 
308     private void setUnwritable(boolean invokeLater) {
309         for (;;) {
310             final int oldValue = unwritable;
311             final int newValue = oldValue | 1;
312             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
313                 if (oldValue == 0) {
314                     fireChannelWritabilityChanged(invokeLater);
315                 }
316                 break;
317             }
318         }
319     }
320 
321     private void fireChannelWritabilityChanged(boolean invokeLater) {
322         final ChannelPipeline pipeline = pipeline();
323         if (invokeLater) {
324             Runnable task = fireChannelWritabilityChangedTask;
325             if (task == null) {
326                 fireChannelWritabilityChangedTask = task = new Runnable() {
327                     @Override
328                     public void run() {
329                         pipeline.fireChannelWritabilityChanged();
330                     }
331                 };
332             }
333             eventLoop().execute(task);
334         } else {
335             pipeline.fireChannelWritabilityChanged();
336         }
337     }
338     @Override
339     public Http2FrameStream stream() {
340         return stream;
341     }
342 
343     void closeOutbound() {
344         outboundClosed = true;
345     }
346 
347     void streamClosed() {
348         unsafe.readEOS();
349         // Attempt to drain any queued data from the queue and deliver it to the application before closing this
350         // channel.
351         unsafe.doBeginRead();
352     }
353 
354     @Override
355     public ChannelMetadata metadata() {
356         return METADATA;
357     }
358 
359     @Override
360     public ChannelConfig config() {
361         return config;
362     }
363 
364     @Override
365     public boolean isOpen() {
366         return !closePromise.isDone();
367     }
368 
369     @Override
370     public boolean isActive() {
371         return isOpen();
372     }
373 
374     @Override
375     public boolean isWritable() {
376         return unwritable == 0;
377     }
378 
379     @Override
380     public ChannelId id() {
381         return channelId;
382     }
383 
384     @Override
385     public EventLoop eventLoop() {
386         return parent().eventLoop();
387     }
388 
389     @Override
390     public Channel parent() {
391         return parentContext().channel();
392     }
393 
394     @Override
395     public boolean isRegistered() {
396         return registered;
397     }
398 
399     @Override
400     public SocketAddress localAddress() {
401         return parent().localAddress();
402     }
403 
404     @Override
405     public SocketAddress remoteAddress() {
406         return parent().remoteAddress();
407     }
408 
409     @Override
410     public ChannelFuture closeFuture() {
411         return closePromise;
412     }
413 
414     @Override
415     public long bytesBeforeUnwritable() {
416         // +1 because writability doesn't change until the threshold is crossed (not equal to).
417         long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
418         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
419         // writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
420         // synchronized together. totalPendingSize will be updated before isWritable().
421         return bytes > 0 && isWritable() ? bytes : 0;
422     }
423 
424     @Override
425     public long bytesBeforeWritable() {
426         // +1 because writability doesn't change until the threshold is crossed (not equal to).
427         long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
428         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
429         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
430         // together. totalPendingSize will be updated before isWritable().
431         return bytes <= 0 || isWritable() ? 0 : bytes;
432     }
433 
434     @Override
435     public Unsafe unsafe() {
436         return unsafe;
437     }
438 
439     @Override
440     public ChannelPipeline pipeline() {
441         return pipeline;
442     }
443 
444     @Override
445     public ByteBufAllocator alloc() {
446         return config().getAllocator();
447     }
448 
449     @Override
450     public Channel read() {
451         pipeline().read();
452         return this;
453     }
454 
455     @Override
456     public Channel flush() {
457         pipeline().flush();
458         return this;
459     }
460 
461     @Override
462     public ChannelFuture bind(SocketAddress localAddress) {
463         return pipeline().bind(localAddress);
464     }
465 
466     @Override
467     public ChannelFuture connect(SocketAddress remoteAddress) {
468         return pipeline().connect(remoteAddress);
469     }
470 
471     @Override
472     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
473         return pipeline().connect(remoteAddress, localAddress);
474     }
475 
476     @Override
477     public ChannelFuture disconnect() {
478         return pipeline().disconnect();
479     }
480 
481     @Override
482     public ChannelFuture close() {
483         return pipeline().close();
484     }
485 
486     @Override
487     public ChannelFuture deregister() {
488         return pipeline().deregister();
489     }
490 
491     @Override
492     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
493         return pipeline().bind(localAddress, promise);
494     }
495 
496     @Override
497     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
498         return pipeline().connect(remoteAddress, promise);
499     }
500 
501     @Override
502     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
503         return pipeline().connect(remoteAddress, localAddress, promise);
504     }
505 
506     @Override
507     public ChannelFuture disconnect(ChannelPromise promise) {
508         return pipeline().disconnect(promise);
509     }
510 
511     @Override
512     public ChannelFuture close(ChannelPromise promise) {
513         return pipeline().close(promise);
514     }
515 
516     @Override
517     public ChannelFuture deregister(ChannelPromise promise) {
518         return pipeline().deregister(promise);
519     }
520 
521     @Override
522     public ChannelFuture write(Object msg) {
523         return pipeline().write(msg);
524     }
525 
526     @Override
527     public ChannelFuture write(Object msg, ChannelPromise promise) {
528         return pipeline().write(msg, promise);
529     }
530 
531     @Override
532     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
533         return pipeline().writeAndFlush(msg, promise);
534     }
535 
536     @Override
537     public ChannelFuture writeAndFlush(Object msg) {
538         return pipeline().writeAndFlush(msg);
539     }
540 
541     @Override
542     public ChannelPromise newPromise() {
543         return pipeline().newPromise();
544     }
545 
546     @Override
547     public ChannelProgressivePromise newProgressivePromise() {
548         return pipeline().newProgressivePromise();
549     }
550 
551     @Override
552     public ChannelFuture newSucceededFuture() {
553         return pipeline().newSucceededFuture();
554     }
555 
556     @Override
557     public ChannelFuture newFailedFuture(Throwable cause) {
558         return pipeline().newFailedFuture(cause);
559     }
560 
561     @Override
562     public ChannelPromise voidPromise() {
563         return pipeline().voidPromise();
564     }
565 
566     @Override
567     public int hashCode() {
568         return id().hashCode();
569     }
570 
571     @Override
572     public boolean equals(Object o) {
573         return this == o;
574     }
575 
576     @Override
577     public int compareTo(Channel o) {
578         if (this == o) {
579             return 0;
580         }
581 
582         return id().compareTo(o.id());
583     }
584 
585     @Override
586     public String toString() {
587         return parent().toString() + "(H2 - " + stream + ')';
588     }
589 
590     /**
591      * Receive a read message. This does not notify handlers unless a read is in progress on the
592      * channel.
593      */
594     void fireChildRead(Http2Frame frame) {
595         assert eventLoop().inEventLoop();
596         if (!isActive()) {
597             ReferenceCountUtil.release(frame);
598         } else if (readStatus != ReadStatus.IDLE) {
599             // If a read is in progress or has been requested, there cannot be anything in the queue,
600             // otherwise we would have drained it from the queue and processed it during the read cycle.
601             assert inboundBuffer == null || inboundBuffer.isEmpty();
602             final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
603 
604             unsafe.doRead0(frame, allocHandle);
605             // We currently don't need to check for readEOS because the parent channel and child channel are limited
606             // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
607             // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
608             // cost of additional readComplete notifications on the rare path.
609             if (allocHandle.continueReading()) {
610                 maybeAddChannelToReadCompletePendingQueue();
611             } else {
612                 unsafe.notifyReadComplete(allocHandle, true, false);
613             }
614         } else {
615             if (inboundBuffer == null) {
616                 inboundBuffer = new ArrayDeque<Object>(4);
617             }
618             inboundBuffer.add(frame);
619         }
620     }
621 
622     void fireChildReadComplete() {
623         assert eventLoop().inEventLoop();
624         assert readStatus != ReadStatus.IDLE || !readCompletePending;
625         unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
626     }
627 
628     final void closeWithError(Http2Error error) {
629         assert eventLoop().inEventLoop();
630         unsafe.close(unsafe.voidPromise(), error);
631     }
632 
633     private final class Http2ChannelUnsafe implements Unsafe {
634         private final VoidChannelPromise unsafeVoidPromise =
635                 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
636         @SuppressWarnings("deprecation")
637         private RecvByteBufAllocator.Handle recvHandle;
638         private boolean writeDoneAndNoFlush;
639         private boolean closeInitiated;
640         private boolean readEOS;
641 
642         private boolean receivedEndOfStream;
643         private boolean sentEndOfStream;
644 
645         @Override
646         public void connect(final SocketAddress remoteAddress,
647                             SocketAddress localAddress, final ChannelPromise promise) {
648             if (!promise.setUncancellable()) {
649                 return;
650             }
651             promise.setFailure(new UnsupportedOperationException());
652         }
653 
654         @Override
655         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
656             if (recvHandle == null) {
657                 recvHandle = config().getRecvByteBufAllocator().newHandle();
658                 recvHandle.reset(config());
659             }
660             return recvHandle;
661         }
662 
663         @Override
664         public SocketAddress localAddress() {
665             return parent().unsafe().localAddress();
666         }
667 
668         @Override
669         public SocketAddress remoteAddress() {
670             return parent().unsafe().remoteAddress();
671         }
672 
673         @Override
674         public void register(EventLoop eventLoop, ChannelPromise promise) {
675             if (!promise.setUncancellable()) {
676                 return;
677             }
678             if (registered) {
679                 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
680                 return;
681             }
682 
683             registered = true;
684 
685             promise.setSuccess();
686 
687             pipeline().fireChannelRegistered();
688             if (isActive()) {
689                 pipeline().fireChannelActive();
690             }
691         }
692 
693         @Override
694         public void bind(SocketAddress localAddress, ChannelPromise promise) {
695             if (!promise.setUncancellable()) {
696                 return;
697             }
698             promise.setFailure(new UnsupportedOperationException());
699         }
700 
701         @Override
702         public void disconnect(ChannelPromise promise) {
703             close(promise);
704         }
705 
706         @Override
707         public void close(final ChannelPromise promise) {
708             close(promise, Http2Error.CANCEL);
709         }
710 
711         void close(final ChannelPromise promise, Http2Error error) {
712             if (!promise.setUncancellable()) {
713                 return;
714             }
715             if (closeInitiated) {
716                 if (closePromise.isDone()) {
717                     // Closed already.
718                     promise.setSuccess();
719                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
720                     // This means close() was called before so we just register a listener and return
721                     closePromise.addListener(new ChannelFutureListener() {
722                         @Override
723                         public void operationComplete(ChannelFuture future) {
724                             promise.setSuccess();
725                         }
726                     });
727                 }
728                 return;
729             }
730             closeInitiated = true;
731             // Just set to false as removing from an underlying queue would even be more expensive.
732             readCompletePending = false;
733 
734             final boolean wasActive = isActive();
735 
736             // There is no need to update the local window as once the stream is closed all the pending bytes will be
737             // given back to the connection window by the controller itself.
738 
739             // Only ever send a reset frame if the connection is still alive and if the stream was created before
740             // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
741             if (parent().isActive() && isStreamIdValid(stream.id()) &&
742                     // Also ensure the stream was never "closed" before.
743                     !readEOS && !(receivedEndOfStream && sentEndOfStream)) {
744                 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
745                 write(resetFrame, unsafe().voidPromise());
746                 flush();
747             }
748 
749             if (inboundBuffer != null) {
750                 for (;;) {
751                     Object msg = inboundBuffer.poll();
752                     if (msg == null) {
753                         break;
754                     }
755                     ReferenceCountUtil.release(msg);
756                 }
757                 inboundBuffer = null;
758             }
759 
760             // The promise should be notified before we call fireChannelInactive().
761             outboundClosed = true;
762             closePromise.setSuccess();
763             promise.setSuccess();
764 
765             fireChannelInactiveAndDeregister(voidPromise(), wasActive);
766         }
767 
768         @Override
769         public void closeForcibly() {
770             close(unsafe().voidPromise());
771         }
772 
773         @Override
774         public void deregister(ChannelPromise promise) {
775             fireChannelInactiveAndDeregister(promise, false);
776         }
777 
778         private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
779                                                       final boolean fireChannelInactive) {
780             if (!promise.setUncancellable()) {
781                 return;
782             }
783 
784             if (!registered) {
785                 promise.setSuccess();
786                 return;
787             }
788 
789             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
790             // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
791             // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
792             // events 'later' to ensure the current events in the handler are completed before these events.
793             //
794             // See:
795             // https://github.com/netty/netty/issues/4435
796             invokeLater(promise.channel(), new Runnable() {
797                 @Override
798                 public void run() {
799                     if (fireChannelInactive) {
800                         pipeline.fireChannelInactive();
801                     }
802                     // The user can fire `deregister` events multiple times but we only want to fire the pipeline
803                     // event if the channel was actually registered.
804                     if (registered) {
805                         registered = false;
806                         pipeline.fireChannelUnregistered();
807                     }
808                     safeSetSuccess(promise);
809                 }
810             });
811         }
812 
813         private void safeSetSuccess(ChannelPromise promise) {
814             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
815                 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
816                         promise.channel(), promise);
817             }
818         }
819 
820         private void invokeLater(Channel channel, Runnable task) {
821             try {
822                 // This method is used by outbound operation implementations to trigger an inbound event later.
823                 // They do not trigger an inbound event immediately because an outbound operation might have been
824                 // triggered by another inbound event handler method.  If fired immediately, the call stack
825                 // will look like this for example:
826                 //
827                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
828                 //   -> handlerA.ctx.close()
829                 //     -> channel.unsafe.close()
830                 //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
831                 //
832                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
833                 eventLoop().execute(task);
834             } catch (RejectedExecutionException e) {
835                 logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
836             }
837         }
838 
839         @Override
840         public void beginRead() {
841             if (!isActive()) {
842                 return;
843             }
844             updateLocalWindowIfNeeded();
845 
846             switch (readStatus) {
847                 case IDLE:
848                     readStatus = ReadStatus.IN_PROGRESS;
849                     doBeginRead();
850                     break;
851                 case IN_PROGRESS:
852                     readStatus = ReadStatus.REQUESTED;
853                     break;
854                 default:
855                     break;
856             }
857         }
858 
859         private Object pollQueuedMessage() {
860             return inboundBuffer == null ? null : inboundBuffer.poll();
861         }
862 
863         void doBeginRead() {
864             if (readStatus == ReadStatus.IDLE) {
865                 // Don't wait for the user to request a read to notify of channel closure.
866                 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
867                     // Double check there is nothing left to flush such as a window update frame.
868                     flush();
869                     unsafe.closeForcibly();
870                 }
871             } else {
872                 do { // Process messages until there are none left (or the user stopped requesting) and also handle EOS.
873                     Object message = pollQueuedMessage();
874                     if (message == null) {
875                         // Double check there is nothing left to flush such as a window update frame.
876                         flush();
877                         if (readEOS) {
878                             unsafe.closeForcibly();
879                         }
880                         break;
881                     }
882                     final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
883                     allocHandle.reset(config());
884                     boolean continueReading = false;
885                     do {
886                         doRead0((Http2Frame) message, allocHandle);
887                     } while ((readEOS || (continueReading = allocHandle.continueReading()))
888                             && (message = pollQueuedMessage()) != null);
889 
890                     if (continueReading && isParentReadInProgress() && !readEOS) {
891                         // Currently the parent and child channel are on the same EventLoop thread. If the parent is
892                         // currently reading it is possible that more frames will be delivered to this child channel. In
893                         // the case that this child channel still wants to read we delay the channelReadComplete on this
894                         // child channel until the parent is done reading.
895                         maybeAddChannelToReadCompletePendingQueue();
896                     } else {
897                         notifyReadComplete(allocHandle, true, true);
898 
899                         // While in the read loop reset the readState AFTER calling readComplete (or other pipeline
900                         // callbacks) to prevents re-entry into this method (if autoRead is disabled and the user calls
901                         // read on each readComplete) and StackOverflowException.
902                         resetReadStatus();
903                     }
904                 } while (readStatus != ReadStatus.IDLE);
905             }
906         }
907 
908         void readEOS() {
909             readEOS = true;
910         }
911 
912         private boolean updateLocalWindowIfNeeded() {
913             if (flowControlledBytes != 0 && !parentContext().isRemoved() && config.autoStreamFlowControl) {
914                 int bytes = flowControlledBytes;
915                 flowControlledBytes = 0;
916                 writeWindowUpdateFrame(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
917                 return true;
918             }
919             return false;
920         }
921 
922         void updateLocalWindowIfNeededAndFlush() {
923             if (updateLocalWindowIfNeeded()) {
924                 flush();
925             }
926         }
927 
928         private void resetReadStatus() {
929             readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
930         }
931 
932         void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
933                                 boolean inReadLoop) {
934             if (!readCompletePending && !forceReadComplete) {
935                 return;
936             }
937             // Set to false just in case we added the channel multiple times before.
938             readCompletePending = false;
939 
940             if (!inReadLoop) {
941                 // While in the read loop we reset the state after calling pipeline methods to prevent StackOverflow.
942                 resetReadStatus();
943             }
944 
945             allocHandle.readComplete();
946             pipeline().fireChannelReadComplete();
947             // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
948             // channel is not currently reading we need to force a flush at the child channel, because we cannot
949             // rely upon flush occurring in channelReadComplete on the parent channel.
950             flush();
951             if (readEOS) {
952                 unsafe.closeForcibly();
953             }
954         }
955 
956         @SuppressWarnings("deprecation")
957         void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
958             final int bytes;
959             if (frame instanceof Http2DataFrame) {
960                 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
961                 // It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
962                 // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
963                 // in this case that we accounted for it.
964                 //
965                 // See https://github.com/netty/netty/issues/9663
966                 flowControlledBytes += bytes;
967             } else {
968                 bytes = MIN_HTTP2_FRAME_SIZE;
969             }
970 
971             // Let's keep track of what we received as the stream state itself will only be updated once the frame
972             // was dispatched for reading which might cause problems if we try to close the channel in a write future.
973             receivedEndOfStream |= isEndOfStream(frame);
974 
975             // Update before firing event through the pipeline to be consistent with other Channel implementation.
976             allocHandle.attemptedBytesRead(bytes);
977             allocHandle.lastBytesRead(bytes);
978             allocHandle.incMessagesRead(1);
979 
980             pipeline().fireChannelRead(frame);
981         }
982 
983         private ChannelFuture writeWindowUpdateFrame(Http2WindowUpdateFrame windowUpdateFrame) {
984             ChannelFuture future = write0(parentContext(), windowUpdateFrame);
985             // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
986             // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
987             // to assume there was a write done that needs to be flushed or we risk flow control starvation.
988             writeDoneAndNoFlush = true;
989             // Add a listener which will notify and teardown the stream
990             // when a window update fails if needed or check the result of the future directly if it was completed
991             // already.
992             // See https://github.com/netty/netty/issues/9663
993             if (future.isDone()) {
994                 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
995             } else {
996                 future.addListener(windowUpdateFrameWriteListener);
997             }
998             return future;
999         }
1000 
1001         @Override
1002         public void write(Object msg, final ChannelPromise promise) {
1003             // After this point its not possible to cancel a write anymore.
1004             if (!promise.setUncancellable()) {
1005                 ReferenceCountUtil.release(msg);
1006                 return;
1007             }
1008 
1009             if (!isActive() ||
1010                     // Once the outbound side was closed we should not allow header / data frames
1011                     outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1012                 ReferenceCountUtil.release(msg);
1013                 promise.setFailure(new ClosedChannelException());
1014                 return;
1015             }
1016 
1017             try {
1018                 if (msg instanceof Http2StreamFrame) {
1019                     Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1020                     if (msg instanceof Http2WindowUpdateFrame) {
1021                         Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame) msg;
1022                         if (config.autoStreamFlowControl) {
1023                             ReferenceCountUtil.release(msg);
1024                             promise.setFailure(new UnsupportedOperationException(
1025                                     Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL + " is set to false"));
1026                             return;
1027                         }
1028                         try {
1029                             ObjectUtil.checkInRange(updateFrame.windowSizeIncrement(), 0,
1030                                     flowControlledBytes, "windowSizeIncrement");
1031                         } catch (RuntimeException e) {
1032                             ReferenceCountUtil.release(updateFrame);
1033                             promise.setFailure(e);
1034                             return;
1035                         }
1036                         flowControlledBytes -= updateFrame.windowSizeIncrement();
1037                         if (parentContext().isRemoved()) {
1038                             ReferenceCountUtil.release(msg);
1039                             promise.setFailure(new ClosedChannelException());
1040                             return;
1041                         }
1042                         ChannelFuture f = writeWindowUpdateFrame(updateFrame);
1043                         if (f.isDone()) {
1044                             writeComplete(f, promise);
1045                         } else {
1046                             f.addListener(new ChannelFutureListener() {
1047                                 @Override
1048                                 public void operationComplete(ChannelFuture future) {
1049                                     writeComplete(future, promise);
1050                                 }
1051                             });
1052                         }
1053                     } else {
1054                         writeHttp2StreamFrame(frame, promise);
1055                     }
1056                 } else {
1057                     String msgStr = msg.toString();
1058                     ReferenceCountUtil.release(msg);
1059                     promise.setFailure(new IllegalArgumentException(
1060                             "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1061                                     ": " + msgStr));
1062                 }
1063             } catch (Throwable t) {
1064                 promise.tryFailure(t);
1065             }
1066         }
1067 
1068         private boolean isEndOfStream(Http2Frame frame) {
1069             if (frame instanceof Http2HeadersFrame) {
1070                 return ((Http2HeadersFrame) frame).isEndStream();
1071             }
1072             if (frame instanceof Http2DataFrame) {
1073                 return ((Http2DataFrame) frame).isEndStream();
1074             }
1075             return false;
1076         }
1077 
1078         private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1079             if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1080                 ReferenceCountUtil.release(frame);
1081                 promise.setFailure(
1082                     new IllegalArgumentException("The first frame must be a headers frame. Was: "
1083                         + frame.name()));
1084                 return;
1085             }
1086 
1087             final boolean firstWrite;
1088             if (firstFrameWritten) {
1089                 firstWrite = false;
1090             } else {
1091                 firstWrite = firstFrameWritten = true;
1092             }
1093 
1094             // Let's keep track of what we send as the stream state itself will only be updated once the frame
1095             // was written which might cause problems if we try to close the channel in a write future.
1096             sentEndOfStream |= isEndOfStream(frame);
1097             ChannelFuture f = write0(parentContext(), frame);
1098             if (f.isDone()) {
1099                 if (firstWrite) {
1100                     firstWriteComplete(f, promise);
1101                 } else {
1102                     writeComplete(f, promise);
1103                 }
1104             } else {
1105                 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1106                 incrementPendingOutboundBytes(bytes, false);
1107                 f.addListener(new ChannelFutureListener() {
1108                     @Override
1109                     public void operationComplete(ChannelFuture future) {
1110                         if (firstWrite) {
1111                             firstWriteComplete(future, promise);
1112                         } else {
1113                             writeComplete(future, promise);
1114                         }
1115                         decrementPendingOutboundBytes(bytes, false);
1116                     }
1117                 });
1118                 writeDoneAndNoFlush = true;
1119             }
1120         }
1121 
1122         private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1123             Throwable cause = future.cause();
1124             if (cause == null) {
1125                 promise.setSuccess();
1126             } else {
1127                 // If the first write fails there is not much we can do, just close
1128                 closeForcibly();
1129                 promise.setFailure(wrapStreamClosedError(cause));
1130             }
1131         }
1132 
1133         private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1134             Throwable cause = future.cause();
1135             if (cause == null) {
1136                 promise.setSuccess();
1137             } else {
1138                 Throwable error = wrapStreamClosedError(cause);
1139                 // To make it more consistent with AbstractChannel we handle all IOExceptions here.
1140                 if (error instanceof IOException) {
1141                     if (config.isAutoClose()) {
1142                         // Close channel if needed.
1143                         closeForcibly();
1144                     } else {
1145                         // TODO: Once Http2StreamChannel extends DuplexChannel we should call shutdownOutput(...)
1146                         outboundClosed = true;
1147                     }
1148                 }
1149                 promise.setFailure(error);
1150             }
1151         }
1152 
1153         private Throwable wrapStreamClosedError(Throwable cause) {
1154             // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1155             // mimic other transports and make it easier to reason about what exceptions to expect.
1156             if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1157                 return new ClosedChannelException().initCause(cause);
1158             }
1159             return cause;
1160         }
1161 
1162         private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1163             if (frame.stream() != null && frame.stream() != stream) {
1164                 String msgString = frame.toString();
1165                 ReferenceCountUtil.release(frame);
1166                 throw new IllegalArgumentException(
1167                         "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1168             }
1169             return frame;
1170         }
1171 
1172         @Override
1173         public void flush() {
1174             // If we are currently in the parent channel's read loop we should just ignore the flush.
1175             // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1176             // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1177             // write(...) or writev(...) operation on the socket.
1178             if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1179                 // There is nothing to flush so this is a NOOP.
1180                 return;
1181             }
1182             // We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
1183             // that are explicit flushed.
1184             writeDoneAndNoFlush = false;
1185             flush0(parentContext());
1186         }
1187 
1188         @Override
1189         public ChannelPromise voidPromise() {
1190             return unsafeVoidPromise;
1191         }
1192 
1193         @Override
1194         public ChannelOutboundBuffer outboundBuffer() {
1195             // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1196             return null;
1197         }
1198     }
1199 
1200     /**
1201      * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1202      * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1203      * changes.
1204      */
1205     private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1206 
1207         volatile boolean autoStreamFlowControl = true;
1208         Http2StreamChannelConfig(Channel channel) {
1209             super(channel);
1210         }
1211 
1212         @Override
1213         public MessageSizeEstimator getMessageSizeEstimator() {
1214             return FlowControlledFrameSizeEstimator.INSTANCE;
1215         }
1216 
1217         @Override
1218         public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1219             throw new UnsupportedOperationException();
1220         }
1221 
1222         @Override
1223         public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1224             if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1225                 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1226                         RecvByteBufAllocator.ExtendedHandle.class);
1227             }
1228             super.setRecvByteBufAllocator(allocator);
1229             return this;
1230         }
1231 
1232         @Override
1233         public Map<ChannelOption<?>, Object> getOptions() {
1234             return getOptions(
1235                     super.getOptions(),
1236                     Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL);
1237         }
1238 
1239         @SuppressWarnings("unchecked")
1240         @Override
1241         public <T> T getOption(ChannelOption<T> option) {
1242             if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1243                 return (T) Boolean.valueOf(autoStreamFlowControl);
1244             }
1245             return super.getOption(option);
1246         }
1247 
1248         @Override
1249         public <T> boolean setOption(ChannelOption<T> option, T value) {
1250             validate(option, value);
1251             if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1252                 boolean newValue = (Boolean) value;
1253                 boolean changed = newValue && !autoStreamFlowControl;
1254                 autoStreamFlowControl = (Boolean) value;
1255                 if (changed) {
1256                     if (channel.isRegistered()) {
1257                         final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe) channel.unsafe();
1258                         if (channel.eventLoop().inEventLoop()) {
1259                             unsafe.updateLocalWindowIfNeededAndFlush();
1260                         } else {
1261                             channel.eventLoop().execute(new Runnable() {
1262                                 @Override
1263                                 public void run() {
1264                                     unsafe.updateLocalWindowIfNeededAndFlush();
1265                                 }
1266                             });
1267                         }
1268                     }
1269                 }
1270                 return true;
1271             }
1272             return super.setOption(option, value);
1273         }
1274     }
1275 
1276     private void maybeAddChannelToReadCompletePendingQueue() {
1277         if (!readCompletePending) {
1278             readCompletePending = true;
1279             addChannelToReadCompletePendingQueue();
1280         }
1281     }
1282 
1283     protected void flush0(ChannelHandlerContext ctx) {
1284         ctx.flush();
1285     }
1286 
1287     protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1288         ChannelPromise promise = ctx.newPromise();
1289         ctx.write(msg, promise);
1290         return promise;
1291     }
1292 
1293     protected abstract boolean isParentReadInProgress();
1294     protected abstract void addChannelToReadCompletePendingQueue();
1295     protected abstract ChannelHandlerContext parentContext();
1296 }