View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelId;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelProgressivePromise;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.channel.DefaultChannelConfig;
33  import io.netty.channel.DefaultChannelPipeline;
34  import io.netty.channel.EventLoop;
35  import io.netty.channel.MessageSizeEstimator;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.RecvByteBufAllocator.Handle;
38  import io.netty.channel.VoidChannelPromise;
39  import io.netty.channel.WriteBufferWaterMark;
40  import io.netty.util.DefaultAttributeMap;
41  import io.netty.util.ReferenceCountUtil;
42  import io.netty.util.ReferenceCounted;
43  import io.netty.util.internal.StringUtil;
44  import io.netty.util.internal.ThrowableUtil;
45  import io.netty.util.internal.UnstableApi;
46  import io.netty.util.internal.logging.InternalLogger;
47  import io.netty.util.internal.logging.InternalLoggerFactory;
48  
49  import java.net.SocketAddress;
50  import java.nio.channels.ClosedChannelException;
51  import java.util.ArrayDeque;
52  import java.util.Queue;
53  import java.util.concurrent.RejectedExecutionException;
54  
55  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
56  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
57  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
58  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
59  import static java.lang.Math.min;
60  
61  /**
62   * An HTTP/2 handler that creates child channels for each stream.
63   *
64   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
65   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
66   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
67   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
68   *
69   * <p>The child channel will be notified of user events that impact the stream, such as {@link
70   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
71   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
72   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
73   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
74   * free to close the channel in response to such events if they don't have use for any queued
75   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
76   * will be processed internally and also propagated down the pipeline for other handlers to act on.
77   *
78   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
79   *
80   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
81   *
82   * <h3>Reference Counting</h3>
83   *
84   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
85   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
86   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
87   * such an object after having consumed it. For more information on reference counting take a look at
88   * http://netty.io/wiki/reference-counted-objects.html
89   *
90   * <h3>Channel Events</h3>
91   *
92   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
93   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
94   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
95   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
96   * indicating the cause and is closed immediately thereafter.
97   *
98   * <h3>Writability and Flow Control</h3>
99   *
100  * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
101  * when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
102  * does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
103  * channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
104  * note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
105  */
106 @UnstableApi
107 public class Http2MultiplexCodec extends Http2FrameCodec {
108 
109     private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2StreamChannel.class);
110 
111     private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
112         @Override
113         public void operationComplete(ChannelFuture future) {
114             registerDone(future);
115         }
116     };
117 
118     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
119     private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
120             new ClosedChannelException(), DefaultHttp2StreamChannel.Http2ChannelUnsafe.class, "write(...)");
    /**
     * Number of bytes attributed to messages that carry no payload. The value 9 is arbitrary, but it is also the
     * minimum size of an HTTP/2 frame. What primarily matters is that the value is non-zero.
     */
125     private static final int MIN_HTTP2_FRAME_SIZE = 9;
126 
127     /**
128      * Returns the flow-control size for DATA frames, and 0 for all other frames.
129      */
130     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
131 
132         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
133 
134         static final MessageSizeEstimator.Handle HANDLE_INSTANCE = new MessageSizeEstimator.Handle() {
135             @Override
136             public int size(Object msg) {
137                 return msg instanceof Http2DataFrame ?
138                         // Guard against overflow.
139                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
140                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
141             }
142         };
143 
144         @Override
145         public Handle newHandle() {
146             return HANDLE_INSTANCE;
147         }
148     }
149 
150     private final ChannelHandler inboundStreamHandler;
151     private final ChannelHandler upgradeStreamHandler;
152 
153     private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
154     private boolean parentReadInProgress;
155     private int idCount;
156 
157     // Linked-List for DefaultHttp2StreamChannel instances that need to be processed by channelReadComplete(...)
158     private DefaultHttp2StreamChannel head;
159     private DefaultHttp2StreamChannel tail;
160 
161     // Need to be volatile as accessed from within the DefaultHttp2StreamChannel in a multi-threaded fashion.
162     volatile ChannelHandlerContext ctx;
163 
    /**
     * Creates the codec.
     *
     * @param encoder              passed through to the {@link Http2FrameCodec} constructor.
     * @param decoder              passed through to the {@link Http2FrameCodec} constructor.
     * @param initialSettings      passed through to the {@link Http2FrameCodec} constructor.
     * @param inboundStreamHandler handler for inbound stream channels; it is only stored by this constructor.
     * @param upgradeStreamHandler handler added to the child channel of the upgrade stream; when {@code null},
     *                             {@link #onHttpClientUpgrade()} fails with a connection error.
     */
    Http2MultiplexCodec(Http2ConnectionEncoder encoder,
                        Http2ConnectionDecoder decoder,
                        Http2Settings initialSettings,
                        ChannelHandler inboundStreamHandler,
                        ChannelHandler upgradeStreamHandler) {
        super(encoder, decoder, initialSettings);
        this.inboundStreamHandler = inboundStreamHandler;
        this.upgradeStreamHandler = upgradeStreamHandler;
    }
173 
174     @Override
175     public void onHttpClientUpgrade() throws Http2Exception {
176         // We must have an upgrade handler or else we can't handle the stream
177         if (upgradeStreamHandler == null) {
178             throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
179         }
180         // Creates the Http2Stream in the Connection.
181         super.onHttpClientUpgrade();
182         // Now make a new FrameStream, set it's underlying Http2Stream, and initialize it.
183         Http2MultiplexCodecStream codecStream = newStream();
184         codecStream.setStreamAndProperty(streamKey, connection().stream(HTTP_UPGRADE_STREAM_ID));
185         onHttp2UpgradeStreamInitialized(ctx, codecStream);
186     }
187 
188     private static void registerDone(ChannelFuture future) {
189         // Handle any errors that occurred on the local thread while registering. Even though
190         // failures can happen after this point, they will be handled by the channel by closing the
191         // childChannel.
192         if (!future.isSuccess()) {
193             Channel childChannel = future.channel();
194             if (childChannel.isRegistered()) {
195                 childChannel.close();
196             } else {
197                 childChannel.unsafe().closeForcibly();
198             }
199         }
200     }
201 
202     @Override
203     public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
204         if (ctx.executor() != ctx.channel().eventLoop()) {
205             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
206         }
207         this.ctx = ctx;
208     }
209 
210     @Override
211     public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
212         super.handlerRemoved0(ctx);
213 
214         // Unlink the linked list to guard against GC nepotism.
215         DefaultHttp2StreamChannel ch = head;
216         while (ch != null) {
217             DefaultHttp2StreamChannel curr = ch;
218             ch = curr.next;
219             curr.next = curr.previous = null;
220         }
221         head = tail = null;
222     }
223 
    /**
     * Creates the frame stream type used by this codec, which can hold a reference to its child channel.
     */
    @Override
    Http2MultiplexCodecStream newStream() {
        return new Http2MultiplexCodecStream();
    }
228 
229     @Override
230     final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
231         if (frame instanceof Http2StreamFrame) {
232             Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
233             ((Http2MultiplexCodecStream) streamFrame.stream()).channel.fireChildRead(streamFrame);
234         } else if (frame instanceof Http2GoAwayFrame) {
235             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
236             // Allow other handlers to act on GOAWAY frame
237             ctx.fireChannelRead(frame);
238         } else if (frame instanceof Http2SettingsFrame) {
239             Http2Settings settings = ((Http2SettingsFrame) frame).settings();
240             if (settings.initialWindowSize() != null) {
241                 initialOutboundStreamWindow = settings.initialWindowSize();
242             }
243             // Allow other handlers to act on SETTINGS frame
244             ctx.fireChannelRead(frame);
245         } else {
246             // Send any other frames down the pipeline
247             ctx.fireChannelRead(frame);
248         }
249     }
250 
251     private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) {
252         assert stream.state() == Http2Stream.State.HALF_CLOSED_LOCAL;
253         DefaultHttp2StreamChannel ch = new DefaultHttp2StreamChannel(stream, true);
254         ch.outboundClosed = true;
255 
256         // Add our upgrade handler to the channel and then register the channel.
257         // The register call fires the channelActive, etc.
258         ch.pipeline().addLast(upgradeStreamHandler);
259         ChannelFuture future = ctx.channel().eventLoop().register(ch);
260         if (future.isDone()) {
261             registerDone(future);
262         } else {
263             future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
264         }
265     }
266 
267     @Override
268     final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
269         Http2MultiplexCodecStream s = (Http2MultiplexCodecStream) stream;
270 
271         switch (stream.state()) {
272             case HALF_CLOSED_REMOTE:
273             case OPEN:
274                 if (s.channel != null) {
275                     // ignore if child channel was already created.
276                     break;
277                 }
278                 // fall-trough
279                 ChannelFuture future = ctx.channel().eventLoop().register(new DefaultHttp2StreamChannel(s, false));
280                 if (future.isDone()) {
281                     registerDone(future);
282                 } else {
283                     future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
284                 }
285                 break;
286             case CLOSED:
287                 DefaultHttp2StreamChannel channel = s.channel;
288                 if (channel != null) {
289                     channel.streamClosed();
290                 }
291                 break;
292             default:
293                 // ignore for now
294                 break;
295         }
296     }
297 
298     @Override
299     final void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream, boolean writable) {
300         (((Http2MultiplexCodecStream) stream).channel).writabilityChanged(writable);
301     }
302 
303     // TODO: This is most likely not the best way to expose this, need to think more about it.
304     final Http2StreamChannel newOutboundStream() {
305         return new DefaultHttp2StreamChannel(newStream(), true);
306     }
307 
308     @Override
309     final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
310         Http2FrameStream stream = cause.stream();
311         DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
312 
313         try {
314             childChannel.pipeline().fireExceptionCaught(cause.getCause());
315         } finally {
316             childChannel.unsafe().closeForcibly();
317         }
318     }
319 
320     private boolean isChildChannelInReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
321         return childChannel.previous != null || childChannel.next != null || head == childChannel;
322     }
323 
324     final void tryAddChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
325         if (!isChildChannelInReadPendingQueue(childChannel)) {
326             addChildChannelToReadPendingQueue(childChannel);
327         }
328     }
329 
330     final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
331         if (tail == null) {
332             assert head == null;
333             tail = head = childChannel;
334         } else {
335             childChannel.previous = tail;
336             tail.next = childChannel;
337             tail = childChannel;
338         }
339     }
340 
341     private void tryRemoveChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
342         if (isChildChannelInReadPendingQueue(childChannel)) {
343             removeChildChannelFromReadPendingQueue(childChannel);
344         }
345     }
346 
347     private void removeChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
348         DefaultHttp2StreamChannel previous = childChannel.previous;
349         if (childChannel.next != null) {
350             childChannel.next.previous = previous;
351         } else {
352             tail = tail.previous; // If there is no next, this childChannel is the tail, so move the tail back.
353         }
354         if (previous != null) {
355             previous.next = childChannel.next;
356         } else {
357             head = head.next; // If there is no previous, this childChannel is the head, so move the tail forward.
358         }
359         childChannel.next = childChannel.previous = null;
360     }
361 
362     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
363         try {
364             forEachActiveStream(new Http2FrameStreamVisitor() {
365                 @Override
366                 public boolean visit(Http2FrameStream stream) {
367                     final int streamId = stream.id();
368                     final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
369                     if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
370                         childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
371                     }
372                     return true;
373                 }
374             });
375         } catch (Http2Exception e) {
376             ctx.fireExceptionCaught(e);
377             ctx.close();
378         }
379     }
380 
381     /**
382      * Notifies any child streams of the read completion.
383      */
384     @Override
385     public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
386         try {
387             onChannelReadComplete(ctx);
388         } finally {
389             parentReadInProgress = false;
390             tail = head = null;
391             // We always flush as this is what Http2ConnectionHandler does for now.
392             flush0(ctx);
393         }
394         channelReadComplete0(ctx);
395     }
396 
    /**
     * Marks the parent read as in progress before delegating to the superclass. The flag is cleared again in
     * {@link #channelReadComplete(ChannelHandlerContext)}.
     */
    @Override
    public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        parentReadInProgress = true;
        super.channelRead(ctx, msg);
    }
402 
403     final void onChannelReadComplete(ChannelHandlerContext ctx)  {
404         // If we have many child channel we can optimize for the case when multiple call flush() in
405         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
406         // write calls on the socket which is expensive.
407         DefaultHttp2StreamChannel current = head;
408         while (current != null) {
409             DefaultHttp2StreamChannel childChannel = current;
410             // Clear early in case fireChildReadComplete() causes it to need to be re-processed
411             current = current.next;
412             childChannel.next = childChannel.previous = null;
413             childChannel.fireChildReadComplete();
414         }
415     }
416 
    /**
     * Flushes by delegating to {@code flush(ctx)}.
     */
    // Allow to override for testing
    void flush0(ChannelHandlerContext ctx) {
        flush(ctx);
    }
421 
422     /**
423      * Return bytes to flow control.
424      * <p>
425      * Package private to allow to override for testing
426      * @param ctx The {@link ChannelHandlerContext} associated with the parent channel.
427      * @param stream The object representing the HTTP/2 stream.
428      * @param bytes The number of bytes to return to flow control.
429      * @return {@code true} if a frame has been written as a result of this method call.
430      * @throws Http2Exception If this operation violates the flow control limits.
431      */
432     boolean onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx,
433                          Http2FrameStream stream, int bytes) throws Http2Exception {
434         return consumeBytes(stream.id(), bytes);
435     }
436 
    /**
     * Frame stream that additionally references the child channel created for it.
     */
    // Allow to extend for testing
    static class Http2MultiplexCodecStream extends DefaultHttp2FrameStream {
        // Assigned by the DefaultHttp2StreamChannel constructor; null until a child channel has been created.
        DefaultHttp2StreamChannel channel;
    }
441 
442     private boolean initialWritability(DefaultHttp2FrameStream stream) {
443         // If the stream id is not valid yet we will just mark the channel as writable as we will be notified
444         // about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
445         // This should be good enough and simplify things a lot.
446         return !isStreamIdValid(stream.id()) || isWritable(stream);
447     }
448 
449     // TODO: Handle writability changes due writing from outside the eventloop.
450     private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
451         private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
452         private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
453         private final ChannelId channelId;
454         private final ChannelPipeline pipeline;
455         private final DefaultHttp2FrameStream stream;
456         private final ChannelPromise closePromise;
457         private final boolean outbound;
458 
459         private volatile boolean registered;
460         // We start with the writability of the channel when creating the StreamChannel.
461         private volatile boolean writable;
462 
463         private boolean outboundClosed;
464         /**
465          * This variable represents if a read is in progress for the current channel. Note that depending upon the
466          * {@link RecvByteBufAllocator} behavior a read may extend beyond the {@link Http2ChannelUnsafe#beginRead()}
467          * method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may drain all pending data, and then if the
468          * parent channel is reading this channel may still accept frames.
469          */
470         private boolean readInProgress;
471         private Queue<Object> inboundBuffer;
472 
473         /** {@code true} after the first HEADERS frame has been written **/
474         private boolean firstFrameWritten;
475 
476         // Currently the child channel and parent channel are always on the same EventLoop thread. This allows us to
477         // extend the read loop of a child channel if the child channel drains its queued data during read, and the
478         // parent channel is still in its read loop. The next/previous links build a doubly linked list that the parent
479         // channel will iterate in its channelReadComplete to end the read cycle for each child channel in the list.
480         DefaultHttp2StreamChannel next;
481         DefaultHttp2StreamChannel previous;
482 
        /**
         * Creates the child channel for the given stream and links the stream back to this channel.
         *
         * @param stream   the frame stream this channel belongs to; it is cast to
         *                 {@link Http2MultiplexCodecStream} so that its {@code channel} field can be set.
         * @param outbound stored in the {@code outbound} field of this channel.
         */
        DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
            this.stream = stream;
            this.outbound = outbound;
            // Start with the writability derived from the stream at creation time.
            writable = initialWritability(stream);
            ((Http2MultiplexCodecStream) stream).channel = this;
            // The pipeline's pending outbound byte accounting is overridden with no-ops.
            pipeline = new DefaultChannelPipeline(this) {
                @Override
                protected void incrementPendingOutboundBytes(long size) {
                    // Do nothing for now
                }

                @Override
                protected void decrementPendingOutboundBytes(long size) {
                    // Do nothing for now
                }
            };
            closePromise = pipeline.newPromise();
            // The id combines the parent channel's id with a counter kept by the enclosing codec.
            channelId = new Http2StreamChannelId(parent().id(), ++idCount);
        }
502 
503         @Override
504         public Http2FrameStream stream() {
505             return stream;
506         }
507 
        /**
         * Called when the stream of this channel reached the {@code CLOSED} state: signals end-of-stream to the
         * unsafe and then starts a read.
         */
        void streamClosed() {
            unsafe.readEOS();
            // Attempt to drain any queued data from the queue and deliver it to the application before closing this
            // channel.
            unsafe.doBeginRead();
        }
514 
515         @Override
516         public ChannelMetadata metadata() {
517             return METADATA;
518         }
519 
520         @Override
521         public ChannelConfig config() {
522             return config;
523         }
524 
525         @Override
526         public boolean isOpen() {
527             return !closePromise.isDone();
528         }
529 
530         @Override
531         public boolean isActive() {
532             return isOpen();
533         }
534 
535         @Override
536         public boolean isWritable() {
537             return writable;
538         }
539 
540         @Override
541         public ChannelId id() {
542             return channelId;
543         }
544 
545         @Override
546         public EventLoop eventLoop() {
547             return parent().eventLoop();
548         }
549 
550         @Override
551         public Channel parent() {
552             return ctx.channel();
553         }
554 
555         @Override
556         public boolean isRegistered() {
557             return registered;
558         }
559 
560         @Override
561         public SocketAddress localAddress() {
562             return parent().localAddress();
563         }
564 
565         @Override
566         public SocketAddress remoteAddress() {
567             return parent().remoteAddress();
568         }
569 
570         @Override
571         public ChannelFuture closeFuture() {
572             return closePromise;
573         }
574 
575         @Override
576         public long bytesBeforeUnwritable() {
577             // TODO: Do a proper impl
578             return config().getWriteBufferHighWaterMark();
579         }
580 
581         @Override
582         public long bytesBeforeWritable() {
583             // TODO: Do a proper impl
584             return 0;
585         }
586 
587         @Override
588         public Unsafe unsafe() {
589             return unsafe;
590         }
591 
592         @Override
593         public ChannelPipeline pipeline() {
594             return pipeline;
595         }
596 
597         @Override
598         public ByteBufAllocator alloc() {
599             return config().getAllocator();
600         }
601 
602         @Override
603         public Channel read() {
604             pipeline().read();
605             return this;
606         }
607 
608         @Override
609         public Channel flush() {
610             pipeline().flush();
611             return this;
612         }
613 
614         @Override
615         public ChannelFuture bind(SocketAddress localAddress) {
616             return pipeline().bind(localAddress);
617         }
618 
619         @Override
620         public ChannelFuture connect(SocketAddress remoteAddress) {
621             return pipeline().connect(remoteAddress);
622         }
623 
624         @Override
625         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
626             return pipeline().connect(remoteAddress, localAddress);
627         }
628 
629         @Override
630         public ChannelFuture disconnect() {
631             return pipeline().disconnect();
632         }
633 
634         @Override
635         public ChannelFuture close() {
636             return pipeline().close();
637         }
638 
639         @Override
640         public ChannelFuture deregister() {
641             return pipeline().deregister();
642         }
643 
644         @Override
645         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
646             return pipeline().bind(localAddress, promise);
647         }
648 
649         @Override
650         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
651             return pipeline().connect(remoteAddress, promise);
652         }
653 
654         @Override
655         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
656             return pipeline().connect(remoteAddress, localAddress, promise);
657         }
658 
659         @Override
660         public ChannelFuture disconnect(ChannelPromise promise) {
661             return pipeline().disconnect(promise);
662         }
663 
664         @Override
665         public ChannelFuture close(ChannelPromise promise) {
666             return pipeline().close(promise);
667         }
668 
669         @Override
670         public ChannelFuture deregister(ChannelPromise promise) {
671             return pipeline().deregister(promise);
672         }
673 
674         @Override
675         public ChannelFuture write(Object msg) {
676             return pipeline().write(msg);
677         }
678 
679         @Override
680         public ChannelFuture write(Object msg, ChannelPromise promise) {
681             return pipeline().write(msg, promise);
682         }
683 
684         @Override
685         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
686             return pipeline().writeAndFlush(msg, promise);
687         }
688 
689         @Override
690         public ChannelFuture writeAndFlush(Object msg) {
691             return pipeline().writeAndFlush(msg);
692         }
693 
694         @Override
695         public ChannelPromise newPromise() {
696             return pipeline().newPromise();
697         }
698 
699         @Override
700         public ChannelProgressivePromise newProgressivePromise() {
701             return pipeline().newProgressivePromise();
702         }
703 
704         @Override
705         public ChannelFuture newSucceededFuture() {
706             return pipeline().newSucceededFuture();
707         }
708 
709         @Override
710         public ChannelFuture newFailedFuture(Throwable cause) {
711             return pipeline().newFailedFuture(cause);
712         }
713 
714         @Override
715         public ChannelPromise voidPromise() {
716             return pipeline().voidPromise();
717         }
718 
719         @Override
720         public int hashCode() {
721             return id().hashCode();
722         }
723 
724         @Override
725         public boolean equals(Object o) {
726             return this == o;
727         }
728 
729         @Override
730         public int compareTo(Channel o) {
731             if (this == o) {
732                 return 0;
733             }
734 
735             return id().compareTo(o.id());
736         }
737 
738         @Override
739         public String toString() {
740             return parent().toString() + "(H2 - " + stream + ')';
741         }
742 
743         void writabilityChanged(boolean writable) {
744             assert eventLoop().inEventLoop();
745             if (writable != this.writable && isActive()) {
746                 // Only notify if we received a state change.
747                 this.writable = writable;
748                 pipeline().fireChannelWritabilityChanged();
749             }
750         }
751 
752         /**
753          * Receive a read message. This does not notify handlers unless a read is in progress on the
754          * channel.
755          */
756         void fireChildRead(Http2Frame frame) {
757             assert eventLoop().inEventLoop();
758             if (!isActive()) {
759                 ReferenceCountUtil.release(frame);
760             } else if (readInProgress) {
761                 // If readInProgress there cannot be anything in the queue, otherwise we would have drained it from the
762                 // queue and processed it during the read cycle.
763                 assert inboundBuffer == null || inboundBuffer.isEmpty();
764                 final Handle allocHandle = unsafe.recvBufAllocHandle();
765                 unsafe.doRead0(frame, allocHandle);
766                 // We currently don't need to check for readEOS because the parent channel and child channel are limited
767                 // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
768                 // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
769                 // cost of additional readComplete notifications on the rare path.
770                 if (allocHandle.continueReading()) {
771                     tryAddChildChannelToReadPendingQueue(this);
772                 } else {
773                     tryRemoveChildChannelFromReadPendingQueue(this);
774                     unsafe.notifyReadComplete(allocHandle);
775                 }
776             } else {
777                 if (inboundBuffer == null) {
778                     inboundBuffer = new ArrayDeque<Object>(4);
779                 }
780                 inboundBuffer.add(frame);
781             }
782         }
783 
784         void fireChildReadComplete() {
785             assert eventLoop().inEventLoop();
786             assert readInProgress;
787             unsafe.notifyReadComplete(unsafe.recvBufAllocHandle());
788         }
789 
790         private final class Http2ChannelUnsafe implements Unsafe {
791             private final VoidChannelPromise unsafeVoidPromise =
792                     new VoidChannelPromise(DefaultHttp2StreamChannel.this, false);
793             @SuppressWarnings("deprecation")
794             private Handle recvHandle;
795             private boolean writeDoneAndNoFlush;
796             private boolean closeInitiated;
797             private boolean readEOS;
798 
799             @Override
800             public void connect(final SocketAddress remoteAddress,
801                                 SocketAddress localAddress, final ChannelPromise promise) {
802                 if (!promise.setUncancellable()) {
803                     return;
804                 }
805                 promise.setFailure(new UnsupportedOperationException());
806             }
807 
808             @Override
809             public Handle recvBufAllocHandle() {
810                 if (recvHandle == null) {
811                     recvHandle = config().getRecvByteBufAllocator().newHandle();
812                     recvHandle.reset(config());
813                 }
814                 return recvHandle;
815             }
816 
817             @Override
818             public SocketAddress localAddress() {
819                 return parent().unsafe().localAddress();
820             }
821 
822             @Override
823             public SocketAddress remoteAddress() {
824                 return parent().unsafe().remoteAddress();
825             }
826 
827             @Override
828             public void register(EventLoop eventLoop, ChannelPromise promise) {
829                 if (!promise.setUncancellable()) {
830                     return;
831                 }
832                 if (registered) {
833                     throw new UnsupportedOperationException("Re-register is not supported");
834                 }
835 
836                 registered = true;
837 
838                 if (!outbound) {
839                     // Add the handler to the pipeline now that we are registered.
840                     pipeline().addLast(inboundStreamHandler);
841                 }
842 
843                 promise.setSuccess();
844 
845                 pipeline().fireChannelRegistered();
846                 if (isActive()) {
847                     pipeline().fireChannelActive();
848                 }
849             }
850 
851             @Override
852             public void bind(SocketAddress localAddress, ChannelPromise promise) {
853                 if (!promise.setUncancellable()) {
854                     return;
855                 }
856                 promise.setFailure(new UnsupportedOperationException());
857             }
858 
859             @Override
860             public void disconnect(ChannelPromise promise) {
861                 close(promise);
862             }
863 
864             @Override
865             public void close(final ChannelPromise promise) {
866                 if (!promise.setUncancellable()) {
867                     return;
868                 }
869                 if (closeInitiated) {
870                     if (closePromise.isDone()) {
871                         // Closed already.
872                         promise.setSuccess();
873                     } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
874                         // This means close() was called before so we just register a listener and return
875                         closePromise.addListener(new ChannelFutureListener() {
876                             @Override
877                             public void operationComplete(ChannelFuture future) {
878                                 promise.setSuccess();
879                             }
880                         });
881                     }
882                     return;
883                 }
884                 closeInitiated = true;
885 
886                 tryRemoveChildChannelFromReadPendingQueue(DefaultHttp2StreamChannel.this);
887 
888                 final boolean wasActive = isActive();
889 
890                 // Only ever send a reset frame if the connection is still alive and if the stream may have existed
891                 // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
892                 if (parent().isActive() && !readEOS && connection().streamMayHaveExisted(stream().id())) {
893                     Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
894                     write(resetFrame, unsafe().voidPromise());
895                     flush();
896                 }
897 
898                 if (inboundBuffer != null) {
899                     for (;;) {
900                         Object msg = inboundBuffer.poll();
901                         if (msg == null) {
902                             break;
903                         }
904                         ReferenceCountUtil.release(msg);
905                     }
906                 }
907 
908                 // The promise should be notified before we call fireChannelInactive().
909                 outboundClosed = true;
910                 closePromise.setSuccess();
911                 promise.setSuccess();
912 
913                 fireChannelInactiveAndDeregister(voidPromise(), wasActive);
914             }
915 
916             @Override
917             public void closeForcibly() {
918                 close(unsafe().voidPromise());
919             }
920 
921             @Override
922             public void deregister(ChannelPromise promise) {
923                 fireChannelInactiveAndDeregister(promise, false);
924             }
925 
926             private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
927                                                           final boolean fireChannelInactive) {
928                 if (!promise.setUncancellable()) {
929                     return;
930                 }
931 
932                 if (!registered) {
933                     promise.setSuccess();
934                     return;
935                 }
936 
937                 // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
938                 // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
939                 // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
940                 // events 'later' to ensure the current events in the handler are completed before these events.
941                 //
942                 // See:
943                 // https://github.com/netty/netty/issues/4435
944                 invokeLater(new Runnable() {
945                     @Override
946                     public void run() {
947                         if (fireChannelInactive) {
948                             pipeline.fireChannelInactive();
949                         }
950                         // The user can fire `deregister` events multiple times but we only want to fire the pipeline
951                         // event if the channel was actually registered.
952                         if (registered) {
953                             registered = false;
954                             pipeline.fireChannelUnregistered();
955                         }
956                         safeSetSuccess(promise);
957                     }
958                 });
959             }
960 
961             private void safeSetSuccess(ChannelPromise promise) {
962                 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
963                     logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
964                 }
965             }
966 
967             private void invokeLater(Runnable task) {
968                 try {
969                     // This method is used by outbound operation implementations to trigger an inbound event later.
970                     // They do not trigger an inbound event immediately because an outbound operation might have been
971                     // triggered by another inbound event handler method.  If fired immediately, the call stack
972                     // will look like this for example:
973                     //
974                     //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
975                     //   -> handlerA.ctx.close()
976                     //     -> channel.unsafe.close()
977                     //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
978                     //
979                     // which means the execution of two inbound handler methods of the same handler overlap undesirably.
980                     eventLoop().execute(task);
981                 } catch (RejectedExecutionException e) {
982                     logger.warn("Can't invoke task later as EventLoop rejected it", e);
983                 }
984             }
985 
986             @Override
987             public void beginRead() {
988                 if (readInProgress || !isActive()) {
989                     return;
990                 }
991                 readInProgress = true;
992                 doBeginRead();
993             }
994 
995             void doBeginRead() {
996                 Object message;
997                 if (inboundBuffer == null || (message = inboundBuffer.poll()) == null) {
998                     if (readEOS) {
999                         unsafe.closeForcibly();
1000                     }
1001                 } else {
1002                     final Handle allocHandle = recvBufAllocHandle();
1003                     allocHandle.reset(config());
1004                     boolean continueReading = false;
1005                     do {
1006                         doRead0((Http2Frame) message, allocHandle);
1007                     } while ((readEOS || (continueReading = allocHandle.continueReading())) &&
1008                              (message = inboundBuffer.poll()) != null);
1009 
1010                     if (continueReading && parentReadInProgress && !readEOS) {
1011                         // Currently the parent and child channel are on the same EventLoop thread. If the parent is
1012                         // currently reading it is possile that more frames will be delivered to this child channel. In
1013                         // the case that this child channel still wants to read we delay the channelReadComplete on this
1014                         // child channel until the parent is done reading.
1015                         assert !isChildChannelInReadPendingQueue(DefaultHttp2StreamChannel.this);
1016                         addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this);
1017                     } else {
1018                         notifyReadComplete(allocHandle);
1019                     }
1020                 }
1021             }
1022 
1023             void readEOS() {
1024                 readEOS = true;
1025             }
1026 
1027             void notifyReadComplete(Handle allocHandle) {
1028                 assert next == null && previous == null;
1029                 readInProgress = false;
1030                 allocHandle.readComplete();
1031                 pipeline().fireChannelReadComplete();
1032                 // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
1033                 // channel is not currently reading we need to force a flush at the child channel, because we cannot
1034                 // rely upon flush occurring in channelReadComplete on the parent channel.
1035                 flush();
1036                 if (readEOS) {
1037                     unsafe.closeForcibly();
1038                 }
1039             }
1040 
1041             @SuppressWarnings("deprecation")
1042             void doRead0(Http2Frame frame, Handle allocHandle) {
1043                 pipeline().fireChannelRead(frame);
1044                 allocHandle.incMessagesRead(1);
1045 
1046                 if (frame instanceof Http2DataFrame) {
1047                     final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
1048                     allocHandle.attemptedBytesRead(numBytesToBeConsumed);
1049                     allocHandle.lastBytesRead(numBytesToBeConsumed);
1050                     if (numBytesToBeConsumed != 0) {
1051                         try {
1052                             writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed);
1053                         } catch (Http2Exception e) {
1054                             pipeline().fireExceptionCaught(e);
1055                         }
1056                     }
1057                 } else {
1058                     allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE);
1059                     allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
1060                 }
1061             }
1062 
1063             @Override
1064             public void write(Object msg, final ChannelPromise promise) {
1065                 // After this point its not possible to cancel a write anymore.
1066                 if (!promise.setUncancellable()) {
1067                     ReferenceCountUtil.release(msg);
1068                     return;
1069                 }
1070 
1071                 if (!isActive() ||
1072                         // Once the outbound side was closed we should not allow header / data frames
1073                         outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1074                     ReferenceCountUtil.release(msg);
1075                     promise.setFailure(CLOSED_CHANNEL_EXCEPTION);
1076                     return;
1077                 }
1078 
1079                 try {
1080                     if (msg instanceof Http2StreamFrame) {
1081                         Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1082                         if (!firstFrameWritten && !isStreamIdValid(stream().id())) {
1083                             if (!(frame instanceof Http2HeadersFrame)) {
1084                                 ReferenceCountUtil.release(frame);
1085                                 promise.setFailure(
1086                                         new IllegalArgumentException("The first frame must be a headers frame. Was: "
1087                                         + frame.name()));
1088                                 return;
1089                             }
1090                             firstFrameWritten = true;
1091                             ChannelFuture future = write0(frame);
1092                             if (future.isDone()) {
1093                                 firstWriteComplete(future, promise);
1094                             } else {
1095                                 future.addListener(new ChannelFutureListener() {
1096                                     @Override
1097                                     public void operationComplete(ChannelFuture future) {
1098                                         firstWriteComplete(future, promise);
1099                                     }
1100                                 });
1101                             }
1102                             return;
1103                         }
1104                     } else  {
1105                         String msgStr = msg.toString();
1106                         ReferenceCountUtil.release(msg);
1107                         promise.setFailure(new IllegalArgumentException(
1108                                 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1109                                         ": " + msgStr));
1110                         return;
1111                     }
1112 
1113                     ChannelFuture future = write0(msg);
1114                     if (future.isDone()) {
1115                         writeComplete(future, promise);
1116                     } else {
1117                         future.addListener(new ChannelFutureListener() {
1118                             @Override
1119                             public void operationComplete(ChannelFuture future) {
1120                                 writeComplete(future, promise);
1121                             }
1122                         });
1123                     }
1124                 } catch (Throwable t) {
1125                     promise.tryFailure(t);
1126                 } finally {
1127                     writeDoneAndNoFlush = true;
1128                 }
1129             }
1130 
1131             private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1132                 Throwable cause = future.cause();
1133                 if (cause == null) {
1134                     // As we just finished our first write which made the stream-id valid we need to re-evaluate
1135                     // the writability of the channel.
1136                     writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
1137                     promise.setSuccess();
1138                 } else {
1139                     // If the first write fails there is not much we can do, just close
1140                     closeForcibly();
1141                     promise.setFailure(wrapStreamClosedError(cause));
1142                 }
1143             }
1144 
1145             private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1146                 Throwable cause = future.cause();
1147                 if (cause == null) {
1148                     promise.setSuccess();
1149                 } else {
1150                     Throwable error = wrapStreamClosedError(cause);
1151                     if (error instanceof ClosedChannelException) {
1152                         if (config.isAutoClose()) {
1153                             // Close channel if needed.
1154                             closeForcibly();
1155                         } else {
1156                             outboundClosed = true;
1157                         }
1158                     }
1159                     promise.setFailure(error);
1160                 }
1161             }
1162 
1163             private Throwable wrapStreamClosedError(Throwable cause) {
1164                 // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1165                 // mimic other transports and make it easier to reason about what exceptions to expect.
1166                 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1167                     return new ClosedChannelException().initCause(cause);
1168                 }
1169                 return cause;
1170             }
1171 
1172             private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1173                 if (frame.stream() != null && frame.stream() != stream) {
1174                     String msgString = frame.toString();
1175                     ReferenceCountUtil.release(frame);
1176                     throw new IllegalArgumentException(
1177                             "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1178                 }
1179                 return frame;
1180             }
1181 
1182             private ChannelFuture write0(Object msg) {
1183                 ChannelPromise promise = ctx.newPromise();
1184                 Http2MultiplexCodec.this.write(ctx, msg, promise);
1185                 return promise;
1186             }
1187 
1188             @Override
1189             public void flush() {
1190                 // If we are currently in the parent channel's read loop we should just ignore the flush.
1191                 // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1192                 // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1193                 // write(...) or writev(...) operation on the socket.
1194                 if (!writeDoneAndNoFlush || parentReadInProgress) {
1195                     // There is nothing to flush so this is a NOOP.
1196                     return;
1197                 }
1198                 try {
1199                     flush0(ctx);
1200                 } finally {
1201                     writeDoneAndNoFlush = false;
1202                 }
1203             }
1204 
1205             @Override
1206             public ChannelPromise voidPromise() {
1207                 return unsafeVoidPromise;
1208             }
1209 
1210             @Override
1211             public ChannelOutboundBuffer outboundBuffer() {
1212                 // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1213                 return null;
1214             }
1215         }
1216 
1217         /**
1218          * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1219          * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1220          * changes.
1221          */
1222         private final class Http2StreamChannelConfig extends DefaultChannelConfig {
1223             Http2StreamChannelConfig(Channel channel) {
1224                 super(channel);
1225             }
1226 
1227             @Override
1228             public int getWriteBufferHighWaterMark() {
1229                 return min(parent().config().getWriteBufferHighWaterMark(), initialOutboundStreamWindow);
1230             }
1231 
1232             @Override
1233             public int getWriteBufferLowWaterMark() {
1234                 return min(parent().config().getWriteBufferLowWaterMark(), initialOutboundStreamWindow);
1235             }
1236 
1237             @Override
1238             public MessageSizeEstimator getMessageSizeEstimator() {
1239                 return FlowControlledFrameSizeEstimator.INSTANCE;
1240             }
1241 
1242             @Override
1243             public WriteBufferWaterMark getWriteBufferWaterMark() {
1244                 int mark = getWriteBufferHighWaterMark();
1245                 return new WriteBufferWaterMark(mark, mark);
1246             }
1247 
1248             @Override
1249             public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1250                 throw new UnsupportedOperationException();
1251             }
1252 
1253             @Override
1254             @Deprecated
1255             public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
1256                 throw new UnsupportedOperationException();
1257             }
1258 
1259             @Override
1260             @Deprecated
1261             public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
1262                 throw new UnsupportedOperationException();
1263             }
1264 
1265             @Override
1266             public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1267                 throw new UnsupportedOperationException();
1268             }
1269 
1270             @Override
1271             public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1272                 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1273                     throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1274                             RecvByteBufAllocator.ExtendedHandle.class);
1275                 }
1276                 super.setRecvByteBufAllocator(allocator);
1277                 return this;
1278             }
1279         }
1280     }
1281 }