View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelId;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelProgressivePromise;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.channel.DefaultChannelConfig;
33  import io.netty.channel.DefaultChannelPipeline;
34  import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
35  import io.netty.channel.EventLoop;
36  import io.netty.channel.MessageSizeEstimator;
37  import io.netty.channel.RecvByteBufAllocator;
38  import io.netty.channel.VoidChannelPromise;
39  import io.netty.channel.WriteBufferWaterMark;
40  import io.netty.util.DefaultAttributeMap;
41  import io.netty.util.ReferenceCountUtil;
42  import io.netty.util.ReferenceCounted;
43  import io.netty.util.internal.StringUtil;
44  import io.netty.util.internal.ThrowableUtil;
45  import io.netty.util.internal.UnstableApi;
46  import io.netty.util.internal.logging.InternalLogger;
47  import io.netty.util.internal.logging.InternalLoggerFactory;
48  
49  import java.net.SocketAddress;
50  import java.nio.channels.ClosedChannelException;
51  import java.util.ArrayDeque;
52  import java.util.Queue;
53  import java.util.concurrent.RejectedExecutionException;
54  
55  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
56  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
57  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
58  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
59  
60  import static java.lang.Math.min;
61  
62  /**
63   * An HTTP/2 handler that creates child channels for each stream.
64   *
65   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
66   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
67   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
68   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
69   *
70   * <p>The child channel will be notified of user events that impact the stream, such as {@link
71   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
72   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
73   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
74   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
75   * free to close the channel in response to such events if they don't have use for any queued
76   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
77   * will be processed internally and also propagated down the pipeline for other handlers to act on.
78   *
79   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
80   *
81   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
82   *
83   * <h3>Reference Counting</h3>
84   *
85   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
86   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
87   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
88   * such an object after having consumed it. For more information on reference counting take a look at
89   * http://netty.io/wiki/reference-counted-objects.html
90   *
91   * <h3>Channel Events</h3>
92   *
93   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
94   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
95   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
96   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
97   * indicating the cause and is closed immediately thereafter.
98   *
99   * <h3>Writability and Flow Control</h3>
100  *
101  * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
102  * when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
103  * does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
104  * channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
105  * note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
106  */
107 @UnstableApi
108 public class Http2MultiplexCodec extends Http2FrameCodec {
109 
110     private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2StreamChannel.class);
111 
112     private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
113         @Override
114         public void operationComplete(ChannelFuture future) throws Exception {
115             registerDone(future);
116         }
117     };
118 
119     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
120     private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
121             new ClosedChannelException(), DefaultHttp2StreamChannel.Http2ChannelUnsafe.class, "write(...)");
122     /**
123      * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
124      * Primarily is non-zero.
125      */
126     private static final int MIN_HTTP2_FRAME_SIZE = 9;
127 
128     /**
129      * Returns the flow-control size for DATA frames, and 0 for all other frames.
130      */
131     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
132 
133         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
134 
135         static final MessageSizeEstimator.Handle HANDLE_INSTANCE = new MessageSizeEstimator.Handle() {
136             @Override
137             public int size(Object msg) {
138                 return msg instanceof Http2DataFrame ?
139                         // Guard against overflow.
140                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
141                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
142             }
143         };
144 
145         @Override
146         public Handle newHandle() {
147             return HANDLE_INSTANCE;
148         }
149     }
150 
151     private static final class Http2StreamChannelRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
152 
153         @Override
154         public MaxMessageHandle newHandle() {
155             return new MaxMessageHandle() {
156                 @Override
157                 public int guess() {
158                     return 1024;
159                 }
160             };
161         }
162     }
163 
164     private final ChannelHandler inboundStreamHandler;
165     private final ChannelHandler upgradeStreamHandler;
166 
167     private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
168     private boolean parentReadInProgress;
169     private int idCount;
170 
171     // Linked-List for DefaultHttp2StreamChannel instances that need to be processed by channelReadComplete(...)
172     private DefaultHttp2StreamChannel head;
173     private DefaultHttp2StreamChannel tail;
174 
175     // Need to be volatile as accessed from within the DefaultHttp2StreamChannel in a multi-threaded fashion.
176     volatile ChannelHandlerContext ctx;
177 
178     Http2MultiplexCodec(Http2ConnectionEncoder encoder,
179                         Http2ConnectionDecoder decoder,
180                         Http2Settings initialSettings,
181                         ChannelHandler inboundStreamHandler,
182                         ChannelHandler upgradeStreamHandler) {
183         super(encoder, decoder, initialSettings);
184         this.inboundStreamHandler = inboundStreamHandler;
185         this.upgradeStreamHandler = upgradeStreamHandler;
186     }
187 
188     @Override
189     public void onHttpClientUpgrade() throws Http2Exception {
190         // We must have an upgrade handler or else we can't handle the stream
191         if (upgradeStreamHandler == null) {
192             throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
193         }
194         // Creates the Http2Stream in the Connection.
195         super.onHttpClientUpgrade();
196         // Now make a new FrameStream, set it's underlying Http2Stream, and initialize it.
197         Http2MultiplexCodecStream codecStream = newStream();
198         codecStream.setStreamAndProperty(streamKey, connection().stream(HTTP_UPGRADE_STREAM_ID));
199         onHttp2UpgradeStreamInitialized(ctx, codecStream);
200     }
201 
202     private static void registerDone(ChannelFuture future) {
203         // Handle any errors that occurred on the local thread while registering. Even though
204         // failures can happen after this point, they will be handled by the channel by closing the
205         // childChannel.
206         if (!future.isSuccess()) {
207             Channel childChannel = future.channel();
208             if (childChannel.isRegistered()) {
209                 childChannel.close();
210             } else {
211                 childChannel.unsafe().closeForcibly();
212             }
213         }
214     }
215 
216     @Override
217     public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
218         if (ctx.executor() != ctx.channel().eventLoop()) {
219             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
220         }
221         this.ctx = ctx;
222     }
223 
224     @Override
225     public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
226         super.handlerRemoved0(ctx);
227 
228         // Unlink the linked list to guard against GC nepotism.
229         DefaultHttp2StreamChannel ch = head;
230         while (ch != null) {
231             DefaultHttp2StreamChannel curr = ch;
232             ch = curr.next;
233             curr.next = null;
234         }
235         head = tail = null;
236     }
237 
238     @Override
239     Http2MultiplexCodecStream newStream() {
240         return new Http2MultiplexCodecStream();
241     }
242 
243     @Override
244     final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
245         if (frame instanceof Http2StreamFrame) {
246             Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
247             onHttp2StreamFrame(((Http2MultiplexCodecStream) streamFrame.stream()).channel, streamFrame);
248         } else if (frame instanceof Http2GoAwayFrame) {
249             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
250             // Allow other handlers to act on GOAWAY frame
251             ctx.fireChannelRead(frame);
252         } else if (frame instanceof Http2SettingsFrame) {
253             Http2Settings settings = ((Http2SettingsFrame) frame).settings();
254             if (settings.initialWindowSize() != null) {
255                 initialOutboundStreamWindow = settings.initialWindowSize();
256             }
257             // Allow other handlers to act on SETTINGS frame
258             ctx.fireChannelRead(frame);
259         } else {
260             // Send any other frames down the pipeline
261             ctx.fireChannelRead(frame);
262         }
263     }
264 
265     private void onHttp2UpgradeStreamInitialized(ChannelHandlerContext ctx, Http2MultiplexCodecStream stream) {
266         assert stream.state() == Http2Stream.State.HALF_CLOSED_LOCAL;
267         DefaultHttp2StreamChannel ch = new DefaultHttp2StreamChannel(stream, true);
268         ch.outboundClosed = true;
269 
270         // Add our upgrade handler to the channel and then register the channel.
271         // The register call fires the channelActive, etc.
272         ch.pipeline().addLast(upgradeStreamHandler);
273         ChannelFuture future = ctx.channel().eventLoop().register(ch);
274         if (future.isDone()) {
275             registerDone(future);
276         } else {
277             future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
278         }
279     }
280 
281     @Override
282     final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
283         Http2MultiplexCodecStream s = (Http2MultiplexCodecStream) stream;
284 
285         switch (stream.state()) {
286             case HALF_CLOSED_REMOTE:
287             case OPEN:
288                 if (s.channel != null) {
289                     // ignore if child channel was already created.
290                     break;
291                 }
292                 // fall-trough
293                 ChannelFuture future = ctx.channel().eventLoop().register(new DefaultHttp2StreamChannel(s, false));
294                 if (future.isDone()) {
295                     registerDone(future);
296                 } else {
297                     future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
298                 }
299                 break;
300             case CLOSED:
301                 DefaultHttp2StreamChannel channel = s.channel;
302                 if (channel != null) {
303                     channel.streamClosed();
304                 }
305                 break;
306             default:
307                 // ignore for now
308                 break;
309         }
310     }
311 
312     @Override
313     final void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream, boolean writable) {
314         (((Http2MultiplexCodecStream) stream).channel).writabilityChanged(writable);
315     }
316 
317     // TODO: This is most likely not the best way to expose this, need to think more about it.
318     final Http2StreamChannel newOutboundStream() {
319         return new DefaultHttp2StreamChannel(newStream(), true);
320     }
321 
322     @Override
323     final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
324         Http2FrameStream stream = cause.stream();
325         DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
326 
327         try {
328             childChannel.pipeline().fireExceptionCaught(cause.getCause());
329         } finally {
330             childChannel.unsafe().closeForcibly();
331         }
332     }
333 
334     private void onHttp2StreamFrame(DefaultHttp2StreamChannel childChannel, Http2StreamFrame frame) {
335         switch (childChannel.fireChildRead(frame)) {
336             case READ_PROCESSED_BUT_STOP_READING:
337                 childChannel.fireChildReadComplete();
338                 break;
339             case READ_PROCESSED_OK_TO_PROCESS_MORE:
340                 addChildChannelToReadPendingQueue(childChannel);
341                 break;
342             case READ_IGNORED_CHANNEL_INACTIVE:
343             case READ_QUEUED:
344                 // nothing to do:
345                 break;
346             default:
347                 throw new Error();
348         }
349     }
350 
351     final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
352         if (!childChannel.fireChannelReadPending) {
353             assert childChannel.next == null;
354 
355             if (tail == null) {
356                 assert head == null;
357                 tail = head = childChannel;
358             } else {
359                 tail.next = childChannel;
360                 tail = childChannel;
361             }
362             childChannel.fireChannelReadPending = true;
363         }
364     }
365 
366     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
367         try {
368             forEachActiveStream(new Http2FrameStreamVisitor() {
369                 @Override
370                 public boolean visit(Http2FrameStream stream) {
371                     final int streamId = stream.id();
372                     final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
373                     if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
374                         childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
375                     }
376                     return true;
377                 }
378             });
379         } catch (Http2Exception e) {
380             ctx.fireExceptionCaught(e);
381             ctx.close();
382         }
383     }
384 
385     /**
386      * Notifies any child streams of the read completion.
387      */
388     @Override
389     public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
390         parentReadInProgress = false;
391         onChannelReadComplete(ctx);
392         channelReadComplete0(ctx);
393     }
394 
395     @Override
396     public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
397         parentReadInProgress = true;
398         super.channelRead(ctx, msg);
399     }
400 
401     final void onChannelReadComplete(ChannelHandlerContext ctx)  {
402         // If we have many child channel we can optimize for the case when multiple call flush() in
403         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
404         // write calls on the socket which is expensive.
405         try {
406             DefaultHttp2StreamChannel current = head;
407             while (current != null) {
408                 DefaultHttp2StreamChannel childChannel = current;
409                 if (childChannel.fireChannelReadPending) {
410                     // Clear early in case fireChildReadComplete() causes it to need to be re-processed
411                     childChannel.fireChannelReadPending = false;
412                     childChannel.fireChildReadComplete();
413                 }
414                 childChannel.next = null;
415                 current = current.next;
416             }
417         } finally {
418             tail = head = null;
419 
420             // We always flush as this is what Http2ConnectionHandler does for now.
421             flush0(ctx);
422         }
423     }
424 
425     // Allow to override for testing
426     void flush0(ChannelHandlerContext ctx) {
427         flush(ctx);
428     }
429 
430     /**
431      * Return bytes to flow control.
432      * <p>
433      * Package private to allow to override for testing
434      * @param ctx The {@link ChannelHandlerContext} associated with the parent channel.
435      * @param stream The object representing the HTTP/2 stream.
436      * @param bytes The number of bytes to return to flow control.
437      * @return {@code true} if a frame has been written as a result of this method call.
438      * @throws Http2Exception If this operation violates the flow control limits.
439      */
440     boolean onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx,
441                          Http2FrameStream stream, int bytes) throws Http2Exception {
442         return consumeBytes(stream.id(), bytes);
443     }
444 
445     // Allow to extend for testing
446     static class Http2MultiplexCodecStream extends DefaultHttp2FrameStream {
447         DefaultHttp2StreamChannel channel;
448     }
449 
450     private enum ReadState {
451         READ_QUEUED,
452         READ_IGNORED_CHANNEL_INACTIVE,
453         READ_PROCESSED_BUT_STOP_READING,
454         READ_PROCESSED_OK_TO_PROCESS_MORE
455     }
456 
457     private boolean initialWritability(DefaultHttp2FrameStream stream) {
458         // If the stream id is not valid yet we will just mark the channel as writable as we will be notified
459         // about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
460         // This should be good enough and simplify things a lot.
461         return !isStreamIdValid(stream.id()) || isWritable(stream);
462     }
463 
464     // TODO: Handle writability changes due writing from outside the eventloop.
465     private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
466         private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
467         private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
468         private final ChannelId channelId;
469         private final ChannelPipeline pipeline;
470         private final DefaultHttp2FrameStream stream;
471         private final ChannelPromise closePromise;
472         private final boolean outbound;
473 
474         private volatile boolean registered;
475         // We start with the writability of the channel when creating the StreamChannel.
476         private volatile boolean writable;
477 
478         private boolean outboundClosed;
479         private boolean closePending;
480         private boolean readInProgress;
481         private Queue<Object> inboundBuffer;
482 
483         /** {@code true} after the first HEADERS frame has been written **/
484         private boolean firstFrameWritten;
485 
486         /** {@code true} if a close without an error was initiated **/
487         private boolean streamClosedWithoutError;
488 
489         // Keeps track of flush calls in channelReadComplete(...) and aggregate these.
490         private boolean inFireChannelReadComplete;
491 
492         boolean fireChannelReadPending;
493 
494         // Holds the reference to the next DefaultHttp2StreamChannel that should be processed in
495         // channelReadComplete(...)
496         DefaultHttp2StreamChannel next;
497 
498         DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
499             this.stream = stream;
500             this.outbound = outbound;
501             writable = initialWritability(stream);
502             ((Http2MultiplexCodecStream) stream).channel = this;
503             pipeline = new DefaultChannelPipeline(this) {
504                 @Override
505                 protected void incrementPendingOutboundBytes(long size) {
506                     // Do thing for now
507                 }
508 
509                 @Override
510                 protected void decrementPendingOutboundBytes(long size) {
511                     // Do thing for now
512                 }
513             };
514             closePromise = pipeline.newPromise();
515             channelId = new Http2StreamChannelId(parent().id(), ++idCount);
516         }
517 
518         @Override
519         public Http2FrameStream stream() {
520             return stream;
521         }
522 
523         void streamClosed() {
524             streamClosedWithoutError = true;
525             if (readInProgress) {
526                 // Just call closeForcibly() as this will take care of fireChannelInactive().
527                 unsafe().closeForcibly();
528             } else {
529                 closePending = true;
530             }
531         }
532 
533         @Override
534         public ChannelMetadata metadata() {
535             return METADATA;
536         }
537 
538         @Override
539         public ChannelConfig config() {
540             return config;
541         }
542 
543         @Override
544         public boolean isOpen() {
545             return !closePromise.isDone();
546         }
547 
548         @Override
549         public boolean isActive() {
550             return isOpen();
551         }
552 
553         @Override
554         public boolean isWritable() {
555             return writable;
556         }
557 
558         @Override
559         public ChannelId id() {
560             return channelId;
561         }
562 
563         @Override
564         public EventLoop eventLoop() {
565             return parent().eventLoop();
566         }
567 
568         @Override
569         public Channel parent() {
570             return ctx.channel();
571         }
572 
573         @Override
574         public boolean isRegistered() {
575             return registered;
576         }
577 
578         @Override
579         public SocketAddress localAddress() {
580             return parent().localAddress();
581         }
582 
583         @Override
584         public SocketAddress remoteAddress() {
585             return parent().remoteAddress();
586         }
587 
588         @Override
589         public ChannelFuture closeFuture() {
590             return closePromise;
591         }
592 
593         @Override
594         public long bytesBeforeUnwritable() {
595             // TODO: Do a proper impl
596             return config().getWriteBufferHighWaterMark();
597         }
598 
599         @Override
600         public long bytesBeforeWritable() {
601             // TODO: Do a proper impl
602             return 0;
603         }
604 
605         @Override
606         public Unsafe unsafe() {
607             return unsafe;
608         }
609 
610         @Override
611         public ChannelPipeline pipeline() {
612             return pipeline;
613         }
614 
615         @Override
616         public ByteBufAllocator alloc() {
617             return config().getAllocator();
618         }
619 
620         @Override
621         public Channel read() {
622             pipeline().read();
623             return this;
624         }
625 
626         @Override
627         public Channel flush() {
628             pipeline().flush();
629             return this;
630         }
631 
632         @Override
633         public ChannelFuture bind(SocketAddress localAddress) {
634             return pipeline().bind(localAddress);
635         }
636 
637         @Override
638         public ChannelFuture connect(SocketAddress remoteAddress) {
639             return pipeline().connect(remoteAddress);
640         }
641 
642         @Override
643         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
644             return pipeline().connect(remoteAddress, localAddress);
645         }
646 
647         @Override
648         public ChannelFuture disconnect() {
649             return pipeline().disconnect();
650         }
651 
652         @Override
653         public ChannelFuture close() {
654             return pipeline().close();
655         }
656 
657         @Override
658         public ChannelFuture deregister() {
659             return pipeline().deregister();
660         }
661 
662         @Override
663         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
664             return pipeline().bind(localAddress, promise);
665         }
666 
667         @Override
668         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
669             return pipeline().connect(remoteAddress, promise);
670         }
671 
672         @Override
673         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
674             return pipeline().connect(remoteAddress, localAddress, promise);
675         }
676 
677         @Override
678         public ChannelFuture disconnect(ChannelPromise promise) {
679             return pipeline().disconnect(promise);
680         }
681 
682         @Override
683         public ChannelFuture close(ChannelPromise promise) {
684             return pipeline().close(promise);
685         }
686 
687         @Override
688         public ChannelFuture deregister(ChannelPromise promise) {
689             return pipeline().deregister(promise);
690         }
691 
692         @Override
693         public ChannelFuture write(Object msg) {
694             return pipeline().write(msg);
695         }
696 
697         @Override
698         public ChannelFuture write(Object msg, ChannelPromise promise) {
699             return pipeline().write(msg, promise);
700         }
701 
702         @Override
703         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
704             return pipeline().writeAndFlush(msg, promise);
705         }
706 
707         @Override
708         public ChannelFuture writeAndFlush(Object msg) {
709             return pipeline().writeAndFlush(msg);
710         }
711 
712         @Override
713         public ChannelPromise newPromise() {
714             return pipeline().newPromise();
715         }
716 
717         @Override
718         public ChannelProgressivePromise newProgressivePromise() {
719             return pipeline().newProgressivePromise();
720         }
721 
722         @Override
723         public ChannelFuture newSucceededFuture() {
724             return pipeline().newSucceededFuture();
725         }
726 
727         @Override
728         public ChannelFuture newFailedFuture(Throwable cause) {
729             return pipeline().newFailedFuture(cause);
730         }
731 
732         @Override
733         public ChannelPromise voidPromise() {
734             return pipeline().voidPromise();
735         }
736 
737         @Override
738         public int hashCode() {
739             return id().hashCode();
740         }
741 
742         @Override
743         public boolean equals(Object o) {
744             return this == o;
745         }
746 
747         @Override
748         public int compareTo(Channel o) {
749             if (this == o) {
750                 return 0;
751             }
752 
753             return id().compareTo(o.id());
754         }
755 
756         @Override
757         public String toString() {
758             return parent().toString() + "(H2 - " + stream + ')';
759         }
760 
761         void writabilityChanged(boolean writable) {
762             assert eventLoop().inEventLoop();
763             if (writable != this.writable && isActive()) {
764                 // Only notify if we received a state change.
765                 this.writable = writable;
766                 pipeline().fireChannelWritabilityChanged();
767             }
768         }
769 
770         /**
771          * Receive a read message. This does not notify handlers unless a read is in progress on the
772          * channel.
773          */
774         ReadState fireChildRead(Http2Frame frame) {
775             assert eventLoop().inEventLoop();
776             if (!isActive()) {
777                 ReferenceCountUtil.release(frame);
778                 return ReadState.READ_IGNORED_CHANNEL_INACTIVE;
779             }
780             if (readInProgress && (inboundBuffer == null || inboundBuffer.isEmpty())) {
781                 // Check for null because inboundBuffer doesn't support null; we want to be consistent
782                 // for what values are supported.
783                 RecvByteBufAllocator.ExtendedHandle allocHandle = unsafe.recvBufAllocHandle();
784                 unsafe.doRead0(frame, allocHandle);
785                 return allocHandle.continueReading() ?
786                         ReadState.READ_PROCESSED_OK_TO_PROCESS_MORE : ReadState.READ_PROCESSED_BUT_STOP_READING;
787             } else {
788                 if (inboundBuffer == null) {
789                     inboundBuffer = new ArrayDeque<Object>(4);
790                 }
791                 inboundBuffer.add(frame);
792                 return ReadState.READ_QUEUED;
793             }
794         }
795 
796         void fireChildReadComplete() {
797             assert eventLoop().inEventLoop();
798             try {
799                 if (readInProgress) {
800                     inFireChannelReadComplete = true;
801                     readInProgress = false;
802                     unsafe().recvBufAllocHandle().readComplete();
803                     pipeline().fireChannelReadComplete();
804                 }
805             } finally {
806                 inFireChannelReadComplete = false;
807             }
808         }
809 
810         private final class Http2ChannelUnsafe implements Unsafe {
811             private final VoidChannelPromise unsafeVoidPromise =
812                     new VoidChannelPromise(DefaultHttp2StreamChannel.this, false);
813             @SuppressWarnings("deprecation")
814             private RecvByteBufAllocator.ExtendedHandle recvHandle;
815             private boolean writeDoneAndNoFlush;
816             private boolean closeInitiated;
817 
818             @Override
819             public void connect(final SocketAddress remoteAddress,
820                                 SocketAddress localAddress, final ChannelPromise promise) {
821                 if (!promise.setUncancellable()) {
822                     return;
823                 }
824                 promise.setFailure(new UnsupportedOperationException());
825             }
826 
827             @Override
828             public RecvByteBufAllocator.ExtendedHandle recvBufAllocHandle() {
829                 if (recvHandle == null) {
830                     recvHandle = (RecvByteBufAllocator.ExtendedHandle) config().getRecvByteBufAllocator().newHandle();
831                 }
832                 return recvHandle;
833             }
834 
835             @Override
836             public SocketAddress localAddress() {
837                 return parent().unsafe().localAddress();
838             }
839 
840             @Override
841             public SocketAddress remoteAddress() {
842                 return parent().unsafe().remoteAddress();
843             }
844 
845             @Override
846             public void register(EventLoop eventLoop, ChannelPromise promise) {
847                 if (!promise.setUncancellable()) {
848                     return;
849                 }
850                 if (registered) {
851                     throw new UnsupportedOperationException("Re-register is not supported");
852                 }
853 
854                 registered = true;
855 
856                 if (!outbound) {
857                     // Add the handler to the pipeline now that we are registered.
858                     pipeline().addLast(inboundStreamHandler);
859                 }
860 
861                 promise.setSuccess();
862 
863                 pipeline().fireChannelRegistered();
864                 if (isActive()) {
865                     pipeline().fireChannelActive();
866                 }
867             }
868 
869             @Override
870             public void bind(SocketAddress localAddress, ChannelPromise promise) {
871                 if (!promise.setUncancellable()) {
872                     return;
873                 }
874                 promise.setFailure(new UnsupportedOperationException());
875             }
876 
877             @Override
878             public void disconnect(ChannelPromise promise) {
879                 close(promise);
880             }
881 
882             @Override
883             public void close(final ChannelPromise promise) {
884                 if (!promise.setUncancellable()) {
885                     return;
886                 }
887                 if (closeInitiated) {
888                     if (closePromise.isDone()) {
889                         // Closed already.
890                         promise.setSuccess();
891                     } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
892                         // This means close() was called before so we just register a listener and return
893                         closePromise.addListener(new ChannelFutureListener() {
894                             @Override
895                             public void operationComplete(ChannelFuture future) throws Exception {
896                                 promise.setSuccess();
897                             }
898                         });
899                     }
900                     return;
901                 }
902                 closeInitiated = true;
903 
904                 closePending = false;
905                 fireChannelReadPending = false;
906 
907                 final boolean wasActive = isActive();
908 
909                 // Only ever send a reset frame if the connection is still alive and if the stream may have existed
910                 // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
911                 if (parent().isActive() && !streamClosedWithoutError &&
912                         connection().streamMayHaveExisted(stream().id())) {
913                     Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
914                     write(resetFrame, unsafe().voidPromise());
915                     flush();
916                 }
917 
918                 if (inboundBuffer != null) {
919                     for (;;) {
920                         Object msg = inboundBuffer.poll();
921                         if (msg == null) {
922                             break;
923                         }
924                         ReferenceCountUtil.release(msg);
925                     }
926                 }
927 
928                 // The promise should be notified before we call fireChannelInactive().
929                 outboundClosed = true;
930                 closePromise.setSuccess();
931                 promise.setSuccess();
932 
933                 fireChannelInactiveAndDeregister(voidPromise(), wasActive);
934             }
935 
936             @Override
937             public void closeForcibly() {
938                 close(unsafe().voidPromise());
939             }
940 
941             @Override
942             public void deregister(ChannelPromise promise) {
943                 fireChannelInactiveAndDeregister(promise, false);
944             }
945 
946             private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
947                                                           final boolean fireChannelInactive) {
948                 if (!promise.setUncancellable()) {
949                     return;
950                 }
951 
952                 if (!registered) {
953                     promise.setSuccess();
954                     return;
955                 }
956 
957                 // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
958                 // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
959                 // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
960                 // events 'later' to ensure the current events in the handler are completed before these events.
961                 //
962                 // See:
963                 // https://github.com/netty/netty/issues/4435
964                 invokeLater(new Runnable() {
965                     @Override
966                     public void run() {
967                         if (fireChannelInactive) {
968                             pipeline.fireChannelInactive();
969                         }
970                         // The user can fire `deregister` events multiple times but we only want to fire the pipeline
971                         // event if the channel was actually registered.
972                         if (registered) {
973                             registered = false;
974                             pipeline.fireChannelUnregistered();
975                         }
976                         safeSetSuccess(promise);
977                     }
978                 });
979             }
980 
981             private void safeSetSuccess(ChannelPromise promise) {
982                 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
983                     logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
984                 }
985             }
986 
987             private void invokeLater(Runnable task) {
988                 try {
989                     // This method is used by outbound operation implementations to trigger an inbound event later.
990                     // They do not trigger an inbound event immediately because an outbound operation might have been
991                     // triggered by another inbound event handler method.  If fired immediately, the call stack
992                     // will look like this for example:
993                     //
994                     //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
995                     //   -> handlerA.ctx.close()
996                     //     -> channel.unsafe.close()
997                     //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
998                     //
999                     // which means the execution of two inbound handler methods of the same handler overlap undesirably.
1000                     eventLoop().execute(task);
1001                 } catch (RejectedExecutionException e) {
1002                     logger.warn("Can't invoke task later as EventLoop rejected it", e);
1003                 }
1004             }
1005 
1006             @Override
1007             public void beginRead() {
1008                 if (readInProgress || !isActive()) {
1009                     return;
1010                 }
1011                 readInProgress = true;
1012 
1013                 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
1014                 allocHandle.reset(config());
1015                 if (inboundBuffer == null || inboundBuffer.isEmpty()) {
1016                     if (closePending) {
1017                         unsafe.closeForcibly();
1018                     }
1019                     return;
1020                 }
1021 
1022                 // We have already checked that the queue is not empty, so before this value is used it will always be
1023                 // set by allocHandle.continueReading().
1024                 boolean continueReading;
1025                 do {
1026                     Object m = inboundBuffer.poll();
1027                     if (m == null) {
1028                         continueReading = false;
1029                         break;
1030                     }
1031                     doRead0((Http2Frame) m, allocHandle);
1032                 } while (continueReading = allocHandle.continueReading());
1033 
1034                 if (continueReading && parentReadInProgress) {
1035                     // We don't know if more frames will be delivered in the parent channel's read loop, so add this
1036                     // channel to the channelReadComplete queue to be notified later.
1037                     addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this);
1038                 } else {
1039                     // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
1040                     // channel is not currently reading we need to force a flush at the child channel, because we cannot
1041                     // rely upon flush occurring in channelReadComplete on the parent channel.
1042                     readInProgress = false;
1043                     allocHandle.readComplete();
1044                     pipeline().fireChannelReadComplete();
1045                     flush();
1046                     if (closePending) {
1047                         unsafe.closeForcibly();
1048                     }
1049                 }
1050             }
1051 
1052             @SuppressWarnings("deprecation")
1053             void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
1054                 int numBytesToBeConsumed = 0;
1055                 if (frame instanceof Http2DataFrame) {
1056                     numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
1057                     allocHandle.lastBytesRead(numBytesToBeConsumed);
1058                 } else {
1059                     allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
1060                 }
1061                 allocHandle.incMessagesRead(1);
1062                 pipeline().fireChannelRead(frame);
1063 
1064                 if (numBytesToBeConsumed != 0) {
1065                     try {
1066                         writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed);
1067                     } catch (Http2Exception e) {
1068                         pipeline().fireExceptionCaught(e);
1069                     }
1070                 }
1071             }
1072 
1073             @Override
1074             public void write(Object msg, final ChannelPromise promise) {
1075                 // After this point its not possible to cancel a write anymore.
1076                 if (!promise.setUncancellable()) {
1077                     ReferenceCountUtil.release(msg);
1078                     return;
1079                 }
1080 
1081                 if (!isActive() ||
1082                         // Once the outbound side was closed we should not allow header / data frames
1083                         outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1084                     ReferenceCountUtil.release(msg);
1085                     promise.setFailure(CLOSED_CHANNEL_EXCEPTION);
1086                     return;
1087                 }
1088 
1089                 try {
1090                     if (msg instanceof Http2StreamFrame) {
1091                         Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1092                         if (!firstFrameWritten && !isStreamIdValid(stream().id())) {
1093                             if (!(frame instanceof Http2HeadersFrame)) {
1094                                 ReferenceCountUtil.release(frame);
1095                                 promise.setFailure(
1096                                         new IllegalArgumentException("The first frame must be a headers frame. Was: "
1097                                         + frame.name()));
1098                                 return;
1099                             }
1100                             firstFrameWritten = true;
1101                             ChannelFuture future = write0(frame);
1102                             if (future.isDone()) {
1103                                 firstWriteComplete(future, promise);
1104                             } else {
1105                                 future.addListener(new ChannelFutureListener() {
1106                                     @Override
1107                                     public void operationComplete(ChannelFuture future) throws Exception {
1108                                         firstWriteComplete(future, promise);
1109                                     }
1110                                 });
1111                             }
1112                             return;
1113                         }
1114                     } else  {
1115                         String msgStr = msg.toString();
1116                         ReferenceCountUtil.release(msg);
1117                         promise.setFailure(new IllegalArgumentException(
1118                                 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1119                                         ": " + msgStr));
1120                         return;
1121                     }
1122 
1123                     ChannelFuture future = write0(msg);
1124                     if (future.isDone()) {
1125                         writeComplete(future, promise);
1126                     } else {
1127                         future.addListener(new ChannelFutureListener() {
1128                             @Override
1129                             public void operationComplete(ChannelFuture future) throws Exception {
1130                                 writeComplete(future, promise);
1131                             }
1132                         });
1133                     }
1134                 } catch (Throwable t) {
1135                     promise.tryFailure(t);
1136                 } finally {
1137                     writeDoneAndNoFlush = true;
1138                 }
1139             }
1140 
1141             private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1142                 Throwable cause = future.cause();
1143                 if (cause == null) {
1144                     // As we just finished our first write which made the stream-id valid we need to re-evaluate
1145                     // the writability of the channel.
1146                     writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
1147                     promise.setSuccess();
1148                 } else {
1149                     // If the first write fails there is not much we can do, just close
1150                     closeForcibly();
1151                     promise.setFailure(wrapStreamClosedError(cause));
1152                 }
1153             }
1154 
1155             private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1156                 Throwable cause = future.cause();
1157                 if (cause == null) {
1158                     promise.setSuccess();
1159                 } else {
1160                     Throwable error = wrapStreamClosedError(cause);
1161                     if (error instanceof ClosedChannelException) {
1162                         if (config.isAutoClose()) {
1163                             // Close channel if needed.
1164                             closeForcibly();
1165                         } else {
1166                             outboundClosed = true;
1167                         }
1168                     }
1169                     promise.setFailure(error);
1170                 }
1171             }
1172 
1173             private Throwable wrapStreamClosedError(Throwable cause) {
1174                 // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1175                 // mimic other transports and make it easier to reason about what exceptions to expect.
1176                 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1177                     return new ClosedChannelException().initCause(cause);
1178                 }
1179                 return cause;
1180             }
1181 
1182             private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1183                 if (frame.stream() != null && frame.stream() != stream) {
1184                     String msgString = frame.toString();
1185                     ReferenceCountUtil.release(frame);
1186                     throw new IllegalArgumentException(
1187                             "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1188                 }
1189                 return frame;
1190             }
1191 
1192             private ChannelFuture write0(Object msg) {
1193                 ChannelPromise promise = ctx.newPromise();
1194                 Http2MultiplexCodec.this.write(ctx, msg, promise);
1195                 return promise;
1196             }
1197 
1198             @Override
1199             public void flush() {
1200                 if (!writeDoneAndNoFlush) {
1201                     // There is nothing to flush so this is a NOOP.
1202                     return;
1203                 }
1204                 try {
1205                     // If we are currently in the  channelReadComplete(...) call we should just ignore the flush.
1206                     // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1207                     // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1208                     // write(...) or writev(...) operation on the socket.
1209                     if (!inFireChannelReadComplete) {
1210                         flush0(ctx);
1211                     }
1212                 } finally {
1213                     writeDoneAndNoFlush = false;
1214                 }
1215             }
1216 
1217             @Override
1218             public ChannelPromise voidPromise() {
1219                 return unsafeVoidPromise;
1220             }
1221 
1222             @Override
1223             public ChannelOutboundBuffer outboundBuffer() {
1224                 // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1225                 return null;
1226             }
1227         }
1228 
1229         /**
1230          * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1231          * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1232          * changes.
1233          */
1234         private final class Http2StreamChannelConfig extends DefaultChannelConfig {
1235 
1236             Http2StreamChannelConfig(Channel channel) {
1237                 super(channel);
1238                 setRecvByteBufAllocator(new Http2StreamChannelRecvByteBufAllocator());
1239             }
1240 
1241             @Override
1242             public int getWriteBufferHighWaterMark() {
1243                 return min(parent().config().getWriteBufferHighWaterMark(), initialOutboundStreamWindow);
1244             }
1245 
1246             @Override
1247             public int getWriteBufferLowWaterMark() {
1248                 return min(parent().config().getWriteBufferLowWaterMark(), initialOutboundStreamWindow);
1249             }
1250 
1251             @Override
1252             public MessageSizeEstimator getMessageSizeEstimator() {
1253                 return FlowControlledFrameSizeEstimator.INSTANCE;
1254             }
1255 
1256             @Override
1257             public WriteBufferWaterMark getWriteBufferWaterMark() {
1258                 int mark = getWriteBufferHighWaterMark();
1259                 return new WriteBufferWaterMark(mark, mark);
1260             }
1261 
1262             @Override
1263             public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1264                 throw new UnsupportedOperationException();
1265             }
1266 
1267             @Override
1268             @Deprecated
1269             public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
1270                 throw new UnsupportedOperationException();
1271             }
1272 
1273             @Override
1274             @Deprecated
1275             public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
1276                 throw new UnsupportedOperationException();
1277             }
1278 
1279             @Override
1280             public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1281                 throw new UnsupportedOperationException();
1282             }
1283 
1284             @Override
1285             public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1286                 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1287                     throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1288                             RecvByteBufAllocator.ExtendedHandle.class);
1289                 }
1290                 super.setRecvByteBufAllocator(allocator);
1291                 return this;
1292             }
1293         }
1294     }
1295 }