View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelProgressivePromise;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.channel.DefaultChannelConfig;
33  import io.netty.channel.DefaultChannelPipeline;
34  import io.netty.channel.EventLoop;
35  import io.netty.channel.MessageSizeEstimator;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.VoidChannelPromise;
38  import io.netty.channel.WriteBufferWaterMark;
39  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40  import io.netty.channel.socket.ChannelOutputShutdownEvent;
41  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
42  import io.netty.handler.ssl.SslCloseCompletionEvent;
43  import io.netty.util.DefaultAttributeMap;
44  import io.netty.util.ReferenceCountUtil;
45  import io.netty.util.internal.ObjectUtil;
46  import io.netty.util.internal.StringUtil;
47  import io.netty.util.internal.logging.InternalLogger;
48  import io.netty.util.internal.logging.InternalLoggerFactory;
49  
50  import java.io.IOException;
51  import java.net.SocketAddress;
52  import java.nio.channels.ClosedChannelException;
53  import java.util.ArrayDeque;
54  import java.util.Map;
55  import java.util.Queue;
56  import java.util.concurrent.RejectedExecutionException;
57  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
58  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
59  
60  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
61  import static io.netty.util.internal.ObjectUtil.checkNotNull;
62  import static java.lang.Math.min;
63  
64  abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
65  
66      static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
67          @Override
68          public boolean visit(Http2FrameStream stream) {
69              final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
70                      ((DefaultHttp2FrameStream) stream).attachment;
71              childChannel.trySetWritable();
72              return true;
73          }
74      };
75  
76      static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
77              new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
78  
79      static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
80              new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
81  
82      static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
83              new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
84  
85      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
86  
87      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
88  
89      /**
90       * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
91       * Primarily is non-zero.
92       */
93      private static final int MIN_HTTP2_FRAME_SIZE = 9;
94  
95      /**
96       * {@link Http2FrameStreamVisitor} that fires the user event for every active stream pipeline.
97       */
98      private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
99  
100         private final Object event;
101 
102         UserEventStreamVisitor(Object event) {
103             this.event = checkNotNull(event, "event");
104         }
105 
106         @Override
107         public boolean visit(Http2FrameStream stream) {
108             final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
109                     ((DefaultHttp2FrameStream) stream).attachment;
110             childChannel.pipeline().fireUserEventTriggered(event);
111             return true;
112         }
113     }
114 
115     /**
116      * Returns the flow-control size for DATA frames, and {@value MIN_HTTP2_FRAME_SIZE} for all other frames.
117      */
118     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
119 
120         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
121 
122         private static final Handle HANDLE_INSTANCE = new Handle() {
123             @Override
124             public int size(Object msg) {
125                 return msg instanceof Http2DataFrame ?
126                         // Guard against overflow.
127                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
128                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
129             }
130         };
131 
132         @Override
133         public Handle newHandle() {
134             return HANDLE_INSTANCE;
135         }
136     }
137 
138     private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
139             AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
140 
141     private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
142             AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
143 
144     private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
145         Throwable cause = future.cause();
146         if (cause != null) {
147             Throwable unwrappedCause;
148             // Unwrap if needed
149             if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
150                 cause = unwrappedCause;
151             }
152 
153             // Notify the child-channel and close it.
154             streamChannel.pipeline().fireExceptionCaught(cause);
155             streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
156         }
157     }
158 
159     private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
160         @Override
161         public void operationComplete(ChannelFuture future) {
162             windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
163         }
164     };
165 
166     /**
167      * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
168      */
169     private enum ReadStatus {
170         /**
171          * No read in progress and no read was requested (yet)
172          */
173         IDLE,
174 
175         /**
176          * Reading in progress
177          */
178         IN_PROGRESS,
179 
180         /**
181          * A read operation was requested.
182          */
183         REQUESTED
184     }
185 
186     private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
187     private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
188     private final ChannelId channelId;
189     private final ChannelPipeline pipeline;
190     private final DefaultHttp2FrameStream stream;
191     private final ChannelPromise closePromise;
192 
193     private volatile boolean registered;
194 
195     private volatile long totalPendingSize;
196     private volatile int unwritable;
197 
198     // Cached to reduce GC
199     private Runnable fireChannelWritabilityChangedTask;
200 
201     private boolean outboundClosed;
202     private int flowControlledBytes;
203 
204     /**
205      * This variable represents if a read is in progress for the current channel or was requested.
206      * Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the
207      * {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may
208      * drain all pending data, and then if the parent channel is reading this channel may still accept frames.
209      */
210     private ReadStatus readStatus = ReadStatus.IDLE;
211 
212     private Queue<Object> inboundBuffer;
213 
214     /** {@code true} after the first HEADERS frame has been written **/
215     private boolean firstFrameWritten;
216     private boolean readCompletePending;
217 
218     AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
219         this.stream = stream;
220         stream.attachment = this;
221         pipeline = new DefaultChannelPipeline(this) {
222             @Override
223             protected void incrementPendingOutboundBytes(long size) {
224                 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
225             }
226 
227             @Override
228             protected void decrementPendingOutboundBytes(long size) {
229                 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
230             }
231 
232             @Override
233             protected void onUnhandledInboundException(Throwable cause) {
234                 // Ensure we use the correct Http2Error to close the channel.
235                 if (cause instanceof Http2FrameStreamException) {
236                     closeWithError(((Http2FrameStreamException) cause).error());
237                     return;
238                 } else {
239                     Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
240                     if (exception != null) {
241                         closeWithError(exception.error());
242                         return;
243                     }
244                 }
245                 super.onUnhandledInboundException(cause);
246             }
247         };
248 
249         closePromise = pipeline.newPromise();
250         channelId = new Http2StreamChannelId(parent().id(), id);
251 
252         if (inboundHandler != null) {
253             // Add the handler to the pipeline now that we are registered.
254             pipeline.addLast(inboundHandler);
255         }
256     }
257 
258     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
259         if (size == 0) {
260             return;
261         }
262 
263         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
264         if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
265             setUnwritable(invokeLater);
266         }
267     }
268 
269     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
270         if (size == 0) {
271             return;
272         }
273 
274         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
275         // Once the totalPendingSize dropped below the low water-mark we can mark the child channel
276         // as writable again. Before doing so we also need to ensure the parent channel is writable to
277         // prevent excessive buffering in the parent outbound buffer. If the parent is not writable
278         // we will mark the child channel as writable once the parent becomes writable by calling
279         // trySetWritable() later.
280         if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
281             setWritable(invokeLater);
282         }
283     }
284 
285     final void trySetWritable() {
286         // The parent is writable again but the child channel itself may still not be writable.
287         // Lets try to set the child channel writable to match the state of the parent channel
288         // if (and only if) the totalPendingSize is smaller then the low water-mark.
289         // If this is not the case we will try again later once we drop under it.
290         if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
291             setWritable(false);
292         }
293     }
294 
295     private void setWritable(boolean invokeLater) {
296         for (;;) {
297             final int oldValue = unwritable;
298             final int newValue = oldValue & ~1;
299             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
300                 if (oldValue != 0 && newValue == 0) {
301                     fireChannelWritabilityChanged(invokeLater);
302                 }
303                 break;
304             }
305         }
306     }
307 
308     private void setUnwritable(boolean invokeLater) {
309         for (;;) {
310             final int oldValue = unwritable;
311             final int newValue = oldValue | 1;
312             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
313                 if (oldValue == 0) {
314                     fireChannelWritabilityChanged(invokeLater);
315                 }
316                 break;
317             }
318         }
319     }
320 
321     private void fireChannelWritabilityChanged(boolean invokeLater) {
322         final ChannelPipeline pipeline = pipeline();
323         if (invokeLater) {
324             Runnable task = fireChannelWritabilityChangedTask;
325             if (task == null) {
326                 fireChannelWritabilityChangedTask = task = new Runnable() {
327                     @Override
328                     public void run() {
329                         pipeline.fireChannelWritabilityChanged();
330                     }
331                 };
332             }
333             eventLoop().execute(task);
334         } else {
335             pipeline.fireChannelWritabilityChanged();
336         }
337     }
338     @Override
339     public Http2FrameStream stream() {
340         return stream;
341     }
342 
343     void closeOutbound() {
344         outboundClosed = true;
345     }
346 
347     void streamClosed() {
348         unsafe.readEOS();
349         // Attempt to drain any queued data from the queue and deliver it to the application before closing this
350         // channel.
351         unsafe.doBeginRead();
352     }
353 
354     @Override
355     public ChannelMetadata metadata() {
356         return METADATA;
357     }
358 
359     @Override
360     public ChannelConfig config() {
361         return config;
362     }
363 
364     @Override
365     public boolean isOpen() {
366         return !closePromise.isDone();
367     }
368 
369     @Override
370     public boolean isActive() {
371         return isOpen();
372     }
373 
374     @Override
375     public boolean isWritable() {
376         return unwritable == 0;
377     }
378 
379     @Override
380     public ChannelId id() {
381         return channelId;
382     }
383 
384     @Override
385     public EventLoop eventLoop() {
386         return parent().eventLoop();
387     }
388 
389     @Override
390     public Channel parent() {
391         return parentContext().channel();
392     }
393 
394     @Override
395     public boolean isRegistered() {
396         return registered;
397     }
398 
399     @Override
400     public SocketAddress localAddress() {
401         return parent().localAddress();
402     }
403 
404     @Override
405     public SocketAddress remoteAddress() {
406         return parent().remoteAddress();
407     }
408 
409     @Override
410     public ChannelFuture closeFuture() {
411         return closePromise;
412     }
413 
414     @Override
415     public long bytesBeforeUnwritable() {
416         // +1 because writability doesn't change until the threshold is crossed (not equal to).
417         long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
418         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
419         // writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
420         // synchronized together. totalPendingSize will be updated before isWritable().
421         return bytes > 0 && isWritable() ? bytes : 0;
422     }
423 
424     @Override
425     public long bytesBeforeWritable() {
426         // +1 because writability doesn't change until the threshold is crossed (not equal to).
427         long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
428         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
429         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
430         // together. totalPendingSize will be updated before isWritable().
431         return bytes <= 0 || isWritable() ? 0 : bytes;
432     }
433 
434     @Override
435     public Unsafe unsafe() {
436         return unsafe;
437     }
438 
439     @Override
440     public ChannelPipeline pipeline() {
441         return pipeline;
442     }
443 
444     @Override
445     public ByteBufAllocator alloc() {
446         return config().getAllocator();
447     }
448 
449     @Override
450     public Channel read() {
451         pipeline().read();
452         return this;
453     }
454 
455     @Override
456     public Channel flush() {
457         pipeline().flush();
458         return this;
459     }
460 
461     @Override
462     public ChannelFuture bind(SocketAddress localAddress) {
463         return pipeline().bind(localAddress);
464     }
465 
466     @Override
467     public ChannelFuture connect(SocketAddress remoteAddress) {
468         return pipeline().connect(remoteAddress);
469     }
470 
471     @Override
472     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
473         return pipeline().connect(remoteAddress, localAddress);
474     }
475 
476     @Override
477     public ChannelFuture disconnect() {
478         return pipeline().disconnect();
479     }
480 
481     @Override
482     public ChannelFuture close() {
483         return pipeline().close();
484     }
485 
486     @Override
487     public ChannelFuture deregister() {
488         return pipeline().deregister();
489     }
490 
491     @Override
492     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
493         return pipeline().bind(localAddress, promise);
494     }
495 
496     @Override
497     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
498         return pipeline().connect(remoteAddress, promise);
499     }
500 
501     @Override
502     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
503         return pipeline().connect(remoteAddress, localAddress, promise);
504     }
505 
506     @Override
507     public ChannelFuture disconnect(ChannelPromise promise) {
508         return pipeline().disconnect(promise);
509     }
510 
511     @Override
512     public ChannelFuture close(ChannelPromise promise) {
513         return pipeline().close(promise);
514     }
515 
516     @Override
517     public ChannelFuture deregister(ChannelPromise promise) {
518         return pipeline().deregister(promise);
519     }
520 
521     @Override
522     public ChannelFuture write(Object msg) {
523         return pipeline().write(msg);
524     }
525 
526     @Override
527     public ChannelFuture write(Object msg, ChannelPromise promise) {
528         return pipeline().write(msg, promise);
529     }
530 
531     @Override
532     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
533         return pipeline().writeAndFlush(msg, promise);
534     }
535 
536     @Override
537     public ChannelFuture writeAndFlush(Object msg) {
538         return pipeline().writeAndFlush(msg);
539     }
540 
541     @Override
542     public ChannelPromise newPromise() {
543         return pipeline().newPromise();
544     }
545 
546     @Override
547     public ChannelProgressivePromise newProgressivePromise() {
548         return pipeline().newProgressivePromise();
549     }
550 
551     @Override
552     public ChannelFuture newSucceededFuture() {
553         return pipeline().newSucceededFuture();
554     }
555 
556     @Override
557     public ChannelFuture newFailedFuture(Throwable cause) {
558         return pipeline().newFailedFuture(cause);
559     }
560 
561     @Override
562     public ChannelPromise voidPromise() {
563         return pipeline().voidPromise();
564     }
565 
566     @Override
567     public int hashCode() {
568         return id().hashCode();
569     }
570 
571     @Override
572     public boolean equals(Object o) {
573         return this == o;
574     }
575 
576     @Override
577     public int compareTo(Channel o) {
578         if (this == o) {
579             return 0;
580         }
581 
582         return id().compareTo(o.id());
583     }
584 
585     @Override
586     public String toString() {
587         return parent().toString() + "(H2 - " + stream + ')';
588     }
589 
590     /**
591      * Receive a read message. This does not notify handlers unless a read is in progress on the
592      * channel.
593      */
594     void fireChildRead(Http2Frame frame) {
595         assert eventLoop().inEventLoop();
596         if (!isActive()) {
597             ReferenceCountUtil.release(frame);
598         } else if (readStatus != ReadStatus.IDLE) {
599             // If a read is in progress or has been requested, there cannot be anything in the queue,
600             // otherwise we would have drained it from the queue and processed it during the read cycle.
601             assert inboundBuffer == null || inboundBuffer.isEmpty();
602             final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
603 
604             unsafe.doRead0(frame, allocHandle);
605             // We currently don't need to check for readEOS because the parent channel and child channel are limited
606             // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
607             // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
608             // cost of additional readComplete notifications on the rare path.
609             if (allocHandle.continueReading()) {
610                 maybeAddChannelToReadCompletePendingQueue();
611             } else {
612                 unsafe.notifyReadComplete(allocHandle, true, false);
613             }
614         } else {
615             if (inboundBuffer == null) {
616                 inboundBuffer = new ArrayDeque<Object>(4);
617             }
618             inboundBuffer.add(frame);
619         }
620     }
621 
622     void fireChildReadComplete() {
623         assert eventLoop().inEventLoop();
624         assert readStatus != ReadStatus.IDLE || !readCompletePending;
625         unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
626     }
627 
628     final void closeWithError(Http2Error error) {
629         assert eventLoop().inEventLoop();
630         unsafe.close(unsafe.voidPromise(), error);
631     }
632 
633     private final class Http2ChannelUnsafe implements Unsafe {
634         private final VoidChannelPromise unsafeVoidPromise =
635                 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
636         @SuppressWarnings("deprecation")
637         private RecvByteBufAllocator.Handle recvHandle;
638         private boolean writeDoneAndNoFlush;
639         private boolean closeInitiated;
640         private boolean readEOS;
641 
642         private boolean receivedEndOfStream;
643         private boolean sentEndOfStream;
644 
645         @Override
646         public void connect(final SocketAddress remoteAddress,
647                             SocketAddress localAddress, final ChannelPromise promise) {
648             if (!promise.setUncancellable()) {
649                 return;
650             }
651             promise.setFailure(new UnsupportedOperationException());
652         }
653 
654         @Override
655         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
656             if (recvHandle == null) {
657                 recvHandle = config().getRecvByteBufAllocator().newHandle();
658                 recvHandle.reset(config());
659             }
660             return recvHandle;
661         }
662 
663         @Override
664         public SocketAddress localAddress() {
665             return parent().unsafe().localAddress();
666         }
667 
668         @Override
669         public SocketAddress remoteAddress() {
670             return parent().unsafe().remoteAddress();
671         }
672 
673         @Override
674         public void register(EventLoop eventLoop, ChannelPromise promise) {
675             if (!promise.setUncancellable()) {
676                 return;
677             }
678             if (registered) {
679                 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
680                 return;
681             }
682 
683             registered = true;
684 
685             promise.setSuccess();
686 
687             pipeline().fireChannelRegistered();
688             if (isActive()) {
689                 pipeline().fireChannelActive();
690             }
691         }
692 
693         @Override
694         public void bind(SocketAddress localAddress, ChannelPromise promise) {
695             if (!promise.setUncancellable()) {
696                 return;
697             }
698             promise.setFailure(new UnsupportedOperationException());
699         }
700 
701         @Override
702         public void disconnect(ChannelPromise promise) {
703             close(promise);
704         }
705 
706         @Override
707         public void close(final ChannelPromise promise) {
708             close(promise, null);
709         }
710 
711         private void close(final ChannelPromise promise, Http2Error error) {
712             if (!promise.setUncancellable()) {
713                 return;
714             }
715             if (closeInitiated) {
716                 if (closePromise.isDone()) {
717                     // Closed already.
718                     promise.setSuccess();
719                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
720                     // This means close() was called before so we just register a listener and return
721                     closePromise.addListener(new ChannelFutureListener() {
722                         @Override
723                         public void operationComplete(ChannelFuture future) {
724                             promise.setSuccess();
725                         }
726                     });
727                 }
728                 return;
729             }
730             closeInitiated = true;
731             // Just set to false as removing from an underlying queue would even be more expensive.
732             readCompletePending = false;
733 
734             final boolean wasActive = isActive();
735 
736             // There is no need to update the local window as once the stream is closed all the pending bytes will be
737             // given back to the connection window by the controller itself.
738 
739             // Only ever send a reset frame if the connection is still alive and if the stream was created before
740             // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
741             if (parent().isActive() && isStreamIdValid(stream.id())) {
742                 // If error is null we know that the close was not triggered by an error and so we should only
743                 // try to send a RST frame if we didn't signal the end of the stream before.
744                 if (error == null) {
745                     if (!readEOS && !(receivedEndOfStream && sentEndOfStream)) {
746                         Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
747                         write(resetFrame, unsafe().voidPromise());
748                         flush();
749                     }
750                 } else {
751                     // Close was triggered by a stream error, in this case we always want to send a RST frame.
752                     Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
753                     write(resetFrame, unsafe().voidPromise());
754                     flush();
755                 }
756             }
757 
758             if (inboundBuffer != null) {
759                 for (;;) {
760                     Object msg = inboundBuffer.poll();
761                     if (msg == null) {
762                         break;
763                     }
764                     ReferenceCountUtil.release(msg);
765                 }
766                 inboundBuffer = null;
767             }
768 
769             // The promise should be notified before we call fireChannelInactive().
770             outboundClosed = true;
771             closePromise.setSuccess();
772             promise.setSuccess();
773 
774             fireChannelInactiveAndDeregister(voidPromise(), wasActive);
775         }
776 
777         @Override
778         public void closeForcibly() {
779             close(unsafe().voidPromise());
780         }
781 
782         @Override
783         public void deregister(ChannelPromise promise) {
784             fireChannelInactiveAndDeregister(promise, false);
785         }
786 
787         private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
788                                                       final boolean fireChannelInactive) {
789             if (!promise.setUncancellable()) {
790                 return;
791             }
792 
793             if (!registered) {
794                 promise.setSuccess();
795                 return;
796             }
797 
798             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
799             // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
800             // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
801             // events 'later' to ensure the current events in the handler are completed before these events.
802             //
803             // See:
804             // https://github.com/netty/netty/issues/4435
805             invokeLater(promise.channel(), new Runnable() {
806                 @Override
807                 public void run() {
808                     if (fireChannelInactive) {
809                         pipeline.fireChannelInactive();
810                     }
811                     // The user can fire `deregister` events multiple times but we only want to fire the pipeline
812                     // event if the channel was actually registered.
813                     if (registered) {
814                         registered = false;
815                         pipeline.fireChannelUnregistered();
816                     }
817                     safeSetSuccess(promise);
818                 }
819             });
820         }
821 
822         private void safeSetSuccess(ChannelPromise promise) {
823             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
824                 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
825                         promise.channel(), promise);
826             }
827         }
828 
829         private void invokeLater(Channel channel, Runnable task) {
830             try {
831                 // This method is used by outbound operation implementations to trigger an inbound event later.
832                 // They do not trigger an inbound event immediately because an outbound operation might have been
833                 // triggered by another inbound event handler method.  If fired immediately, the call stack
834                 // will look like this for example:
835                 //
836                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
837                 //   -> handlerA.ctx.close()
838                 //     -> channel.unsafe.close()
839                 //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
840                 //
841                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
842                 eventLoop().execute(task);
843             } catch (RejectedExecutionException e) {
844                 logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
845             }
846         }
847 
848         @Override
849         public void beginRead() {
850             if (!isActive()) {
851                 return;
852             }
853             updateLocalWindowIfNeeded();
854 
855             switch (readStatus) {
856                 case IDLE:
857                     readStatus = ReadStatus.IN_PROGRESS;
858                     doBeginRead();
859                     break;
860                 case IN_PROGRESS:
861                     readStatus = ReadStatus.REQUESTED;
862                     break;
863                 default:
864                     break;
865             }
866         }
867 
868         private Object pollQueuedMessage() {
869             return inboundBuffer == null ? null : inboundBuffer.poll();
870         }
871 
872         void doBeginRead() {
873             if (readStatus == ReadStatus.IDLE) {
874                 // Don't wait for the user to request a read to notify of channel closure.
875                 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
876                     // Double check there is nothing left to flush such as a window update frame.
877                     flush();
878                     unsafe.closeForcibly();
879                 }
880             } else {
881                 do { // Process messages until there are none left (or the user stopped requesting) and also handle EOS.
882                     Object message = pollQueuedMessage();
883                     if (message == null) {
884                         // Double check there is nothing left to flush such as a window update frame.
885                         flush();
886                         if (readEOS) {
887                             unsafe.closeForcibly();
888                         }
889                         break;
890                     }
891                     final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
892                     allocHandle.reset(config());
893                     boolean continueReading = false;
894                     do {
895                         doRead0((Http2Frame) message, allocHandle);
896                     } while ((readEOS || (continueReading = allocHandle.continueReading()))
897                             && (message = pollQueuedMessage()) != null);
898 
899                     if (continueReading && isParentReadInProgress() && !readEOS) {
900                         // Currently the parent and child channel are on the same EventLoop thread. If the parent is
901                         // currently reading it is possible that more frames will be delivered to this child channel. In
902                         // the case that this child channel still wants to read we delay the channelReadComplete on this
903                         // child channel until the parent is done reading.
904                         maybeAddChannelToReadCompletePendingQueue();
905                     } else {
906                         notifyReadComplete(allocHandle, true, true);
907 
908                         // While in the read loop reset the readState AFTER calling readComplete (or other pipeline
909                         // callbacks) to prevents re-entry into this method (if autoRead is disabled and the user calls
910                         // read on each readComplete) and StackOverflowException.
911                         resetReadStatus();
912                     }
913                 } while (readStatus != ReadStatus.IDLE);
914             }
915         }
916 
        /**
         * Records that end-of-stream has been reached for inbound data by setting the {@code readEOS} flag.
         * The flag is consulted by the read path to decide when to close the channel.
         */
        void readEOS() {
            readEOS = true;
        }
920 
921         private boolean updateLocalWindowIfNeeded() {
922             if (flowControlledBytes != 0 && !parentContext().isRemoved() && config.autoStreamFlowControl) {
923                 int bytes = flowControlledBytes;
924                 flowControlledBytes = 0;
925                 writeWindowUpdateFrame(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
926                 return true;
927             }
928             return false;
929         }
930 
931         void updateLocalWindowIfNeededAndFlush() {
932             if (updateLocalWindowIfNeeded()) {
933                 flush();
934             }
935         }
936 
937         private void resetReadStatus() {
938             readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
939         }
940 
941         void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
942                                 boolean inReadLoop) {
943             if (!readCompletePending && !forceReadComplete) {
944                 return;
945             }
946             // Set to false just in case we added the channel multiple times before.
947             readCompletePending = false;
948 
949             if (!inReadLoop) {
950                 // While in the read loop we reset the state after calling pipeline methods to prevent StackOverflow.
951                 resetReadStatus();
952             }
953 
954             allocHandle.readComplete();
955             pipeline().fireChannelReadComplete();
956             // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
957             // channel is not currently reading we need to force a flush at the child channel, because we cannot
958             // rely upon flush occurring in channelReadComplete on the parent channel.
959             flush();
960             if (readEOS) {
961                 unsafe.closeForcibly();
962             }
963         }
964 
965         @SuppressWarnings("deprecation")
966         void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
967             final int bytes;
968             if (frame instanceof Http2DataFrame) {
969                 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
970                 // It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
971                 // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
972                 // in this case that we accounted for it.
973                 //
974                 // See https://github.com/netty/netty/issues/9663
975                 flowControlledBytes += bytes;
976             } else {
977                 bytes = MIN_HTTP2_FRAME_SIZE;
978             }
979 
980             // Let's keep track of what we received as the stream state itself will only be updated once the frame
981             // was dispatched for reading which might cause problems if we try to close the channel in a write future.
982             receivedEndOfStream |= isEndOfStream(frame);
983 
984             // Update before firing event through the pipeline to be consistent with other Channel implementation.
985             allocHandle.attemptedBytesRead(bytes);
986             allocHandle.lastBytesRead(bytes);
987             allocHandle.incMessagesRead(1);
988 
989             pipeline().fireChannelRead(frame);
990         }
991 
992         private ChannelFuture writeWindowUpdateFrame(Http2WindowUpdateFrame windowUpdateFrame) {
993             ChannelFuture future = write0(parentContext(), windowUpdateFrame);
994             // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
995             // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
996             // to assume there was a write done that needs to be flushed or we risk flow control starvation.
997             writeDoneAndNoFlush = true;
998             // Add a listener which will notify and teardown the stream
999             // when a window update fails if needed or check the result of the future directly if it was completed
1000             // already.
1001             // See https://github.com/netty/netty/issues/9663
1002             if (future.isDone()) {
1003                 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
1004             } else {
1005                 future.addListener(windowUpdateFrameWriteListener);
1006             }
1007             return future;
1008         }
1009 
1010         @Override
1011         public void write(Object msg, final ChannelPromise promise) {
1012             // After this point its not possible to cancel a write anymore.
1013             if (!promise.setUncancellable()) {
1014                 ReferenceCountUtil.release(msg);
1015                 return;
1016             }
1017 
1018             if (!isActive() ||
1019                     // Once the outbound side was closed we should not allow header / data frames
1020                     outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1021                 ReferenceCountUtil.release(msg);
1022                 promise.setFailure(new ClosedChannelException());
1023                 return;
1024             }
1025 
1026             try {
1027                 if (msg instanceof Http2StreamFrame) {
1028                     Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1029                     if (msg instanceof Http2WindowUpdateFrame) {
1030                         Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame) msg;
1031                         if (config.autoStreamFlowControl) {
1032                             ReferenceCountUtil.release(msg);
1033                             promise.setFailure(new UnsupportedOperationException(
1034                                     Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL + " is set to false"));
1035                             return;
1036                         }
1037                         try {
1038                             ObjectUtil.checkInRange(updateFrame.windowSizeIncrement(), 0,
1039                                     flowControlledBytes, "windowSizeIncrement");
1040                         } catch (RuntimeException e) {
1041                             ReferenceCountUtil.release(updateFrame);
1042                             promise.setFailure(e);
1043                             return;
1044                         }
1045                         flowControlledBytes -= updateFrame.windowSizeIncrement();
1046                         if (parentContext().isRemoved()) {
1047                             ReferenceCountUtil.release(msg);
1048                             promise.setFailure(new ClosedChannelException());
1049                             return;
1050                         }
1051                         ChannelFuture f = writeWindowUpdateFrame(updateFrame);
1052                         if (f.isDone()) {
1053                             writeComplete(f, promise);
1054                         } else {
1055                             f.addListener(new ChannelFutureListener() {
1056                                 @Override
1057                                 public void operationComplete(ChannelFuture future) {
1058                                     writeComplete(future, promise);
1059                                 }
1060                             });
1061                         }
1062                     } else {
1063                         writeHttp2StreamFrame(frame, promise);
1064                     }
1065                 } else {
1066                     String msgStr = msg.toString();
1067                     ReferenceCountUtil.release(msg);
1068                     promise.setFailure(new IllegalArgumentException(
1069                             "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1070                                     ": " + msgStr));
1071                 }
1072             } catch (Throwable t) {
1073                 promise.tryFailure(t);
1074             }
1075         }
1076 
1077         private boolean isEndOfStream(Http2Frame frame) {
1078             if (frame instanceof Http2HeadersFrame) {
1079                 return ((Http2HeadersFrame) frame).isEndStream();
1080             }
1081             if (frame instanceof Http2DataFrame) {
1082                 return ((Http2DataFrame) frame).isEndStream();
1083             }
1084             return false;
1085         }
1086 
1087         private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1088             if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1089                 ReferenceCountUtil.release(frame);
1090                 promise.setFailure(
1091                     new IllegalArgumentException("The first frame must be a headers frame. Was: "
1092                         + frame.name()));
1093                 return;
1094             }
1095 
1096             final boolean firstWrite;
1097             if (firstFrameWritten) {
1098                 firstWrite = false;
1099             } else {
1100                 firstWrite = firstFrameWritten = true;
1101             }
1102 
1103             // Let's keep track of what we send as the stream state itself will only be updated once the frame
1104             // was written which might cause problems if we try to close the channel in a write future.
1105             sentEndOfStream |= isEndOfStream(frame);
1106             ChannelFuture f = write0(parentContext(), frame);
1107             if (f.isDone()) {
1108                 if (firstWrite) {
1109                     firstWriteComplete(f, promise);
1110                 } else {
1111                     writeComplete(f, promise);
1112                 }
1113             } else {
1114                 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1115                 incrementPendingOutboundBytes(bytes, false);
1116                 f.addListener(new ChannelFutureListener() {
1117                     @Override
1118                     public void operationComplete(ChannelFuture future) {
1119                         if (firstWrite) {
1120                             firstWriteComplete(future, promise);
1121                         } else {
1122                             writeComplete(future, promise);
1123                         }
1124                         decrementPendingOutboundBytes(bytes, false);
1125                     }
1126                 });
1127                 writeDoneAndNoFlush = true;
1128             }
1129         }
1130 
1131         private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1132             Throwable cause = future.cause();
1133             if (cause == null) {
1134                 promise.setSuccess();
1135             } else {
1136                 // If the first write fails there is not much we can do, just close
1137                 closeForcibly();
1138                 promise.setFailure(wrapStreamClosedError(cause));
1139             }
1140         }
1141 
1142         private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1143             Throwable cause = future.cause();
1144             if (cause == null) {
1145                 promise.setSuccess();
1146             } else {
1147                 Throwable error = wrapStreamClosedError(cause);
1148                 // To make it more consistent with AbstractChannel we handle all IOExceptions here.
1149                 if (error instanceof IOException) {
1150                     if (config.isAutoClose()) {
1151                         // Close channel if needed.
1152                         closeForcibly();
1153                     } else {
1154                         // TODO: Once Http2StreamChannel extends DuplexChannel we should call shutdownOutput(...)
1155                         outboundClosed = true;
1156                     }
1157                 }
1158                 promise.setFailure(error);
1159             }
1160         }
1161 
1162         private Throwable wrapStreamClosedError(Throwable cause) {
1163             // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1164             // mimic other transports and make it easier to reason about what exceptions to expect.
1165             if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1166                 return new ClosedChannelException().initCause(cause);
1167             }
1168             return cause;
1169         }
1170 
1171         private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1172             if (frame.stream() != null && frame.stream() != stream) {
1173                 String msgString = frame.toString();
1174                 ReferenceCountUtil.release(frame);
1175                 throw new IllegalArgumentException(
1176                         "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1177             }
1178             return frame;
1179         }
1180 
1181         @Override
1182         public void flush() {
1183             // If we are currently in the parent channel's read loop we should just ignore the flush.
1184             // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1185             // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1186             // write(...) or writev(...) operation on the socket.
1187             if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1188                 // There is nothing to flush so this is a NOOP.
1189                 return;
1190             }
1191             // We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
1192             // that are explicit flushed.
1193             writeDoneAndNoFlush = false;
1194             flush0(parentContext());
1195         }
1196 
        /**
         * Returns the void promise held by this unsafe.
         *
         * @return the {@code unsafeVoidPromise} field; the same field is returned on every call
         */
        @Override
        public ChannelPromise voidPromise() {
            return unsafeVoidPromise;
        }
1201 
        /**
         * This channel neither uses nor supports a {@link ChannelOutboundBuffer}.
         *
         * @return always {@code null}
         */
        @Override
        public ChannelOutboundBuffer outboundBuffer() {
            // Always return null as we not use the ChannelOutboundBuffer and not even support it.
            return null;
        }
1207     }
1208 
1209     /**
1210      * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1211      * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1212      * changes.
1213      */
1214     private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1215 
1216         volatile boolean autoStreamFlowControl = true;
1217         Http2StreamChannelConfig(Channel channel) {
1218             super(channel);
1219         }
1220 
1221         @Override
1222         public MessageSizeEstimator getMessageSizeEstimator() {
1223             return FlowControlledFrameSizeEstimator.INSTANCE;
1224         }
1225 
1226         @Override
1227         public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1228             throw new UnsupportedOperationException();
1229         }
1230 
1231         @Override
1232         public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1233             if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1234                 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1235                         RecvByteBufAllocator.ExtendedHandle.class);
1236             }
1237             super.setRecvByteBufAllocator(allocator);
1238             return this;
1239         }
1240 
1241         @Override
1242         public Map<ChannelOption<?>, Object> getOptions() {
1243             return getOptions(
1244                     super.getOptions(),
1245                     Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL);
1246         }
1247 
1248         @SuppressWarnings("unchecked")
1249         @Override
1250         public <T> T getOption(ChannelOption<T> option) {
1251             if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1252                 return (T) Boolean.valueOf(autoStreamFlowControl);
1253             }
1254             return super.getOption(option);
1255         }
1256 
1257         @Override
1258         public <T> boolean setOption(ChannelOption<T> option, T value) {
1259             validate(option, value);
1260             if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1261                 boolean newValue = (Boolean) value;
1262                 boolean changed = newValue && !autoStreamFlowControl;
1263                 autoStreamFlowControl = (Boolean) value;
1264                 if (changed) {
1265                     if (channel.isRegistered()) {
1266                         final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe) channel.unsafe();
1267                         if (channel.eventLoop().inEventLoop()) {
1268                             unsafe.updateLocalWindowIfNeededAndFlush();
1269                         } else {
1270                             channel.eventLoop().execute(new Runnable() {
1271                                 @Override
1272                                 public void run() {
1273                                     unsafe.updateLocalWindowIfNeededAndFlush();
1274                                 }
1275                             });
1276                         }
1277                     }
1278                 }
1279                 return true;
1280             }
1281             return super.setOption(option, value);
1282         }
1283     }
1284 
1285     private void maybeAddChannelToReadCompletePendingQueue() {
1286         if (!readCompletePending) {
1287             readCompletePending = true;
1288             addChannelToReadCompletePendingQueue();
1289         }
1290     }
1291 
    /**
     * Flushes the given context by calling {@code ctx.flush()}.
     *
     * @param ctx the context to flush
     */
    protected void flush0(ChannelHandlerContext ctx) {
        ctx.flush();
    }
1295 
1296     protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1297         ChannelPromise promise = ctx.newPromise();
1298         ctx.write(msg, promise);
1299         return promise;
1300     }
1301 
    /**
     * Consulted by this class before flushing and when deciding whether to delay {@code channelReadComplete}.
     * NOTE(review): presumably returns {@code true} while the parent channel is in its read loop;
     * confirm against the implementations.
     */
    protected abstract boolean isParentReadInProgress();

    /**
     * Invoked by this class when the channel is newly marked as having a read-complete pending.
     * NOTE(review): presumably registers this channel so that its read-complete is delivered once the
     * parent finished reading; confirm against the implementations.
     */
    protected abstract void addChannelToReadCompletePendingQueue();

    /**
     * Supplies the {@link ChannelHandlerContext} that this class passes to {@code write0(...)} and
     * {@code flush0(...)} and whose {@code isRemoved()} state it checks.
     */
    protected abstract ChannelHandlerContext parentContext();
1305 }