View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelId;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelProgressivePromise;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.channel.DefaultChannelConfig;
33  import io.netty.channel.DefaultChannelPipeline;
34  import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
35  import io.netty.channel.EventLoop;
36  import io.netty.channel.MessageSizeEstimator;
37  import io.netty.channel.RecvByteBufAllocator;
38  import io.netty.channel.VoidChannelPromise;
39  import io.netty.channel.WriteBufferWaterMark;
40  import io.netty.util.DefaultAttributeMap;
41  import io.netty.util.ReferenceCountUtil;
42  import io.netty.util.ReferenceCounted;
43  import io.netty.util.internal.StringUtil;
44  import io.netty.util.internal.ThrowableUtil;
45  import io.netty.util.internal.UnstableApi;
46  
47  import java.net.SocketAddress;
48  import java.nio.channels.ClosedChannelException;
49  import java.util.ArrayDeque;
50  import java.util.Queue;
51  
52  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
53  import static java.lang.Math.min;
54  
55  /**
56   * An HTTP/2 handler that creates child channels for each stream.
57   *
58   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
59   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
60   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
61   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
62   *
63   * <p>The child channel will be notified of user events that impact the stream, such as {@link
64   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
65   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
66   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
67   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
68   * free to close the channel in response to such events if they don't have use for any queued
69   * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
70   * will be processed internally and also propagated down the pipeline for other handlers to act on.
71   *
72   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
73   *
74   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
75   *
76   * <h3>Reference Counting</h3>
77   *
78   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
79   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
80   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
81   * such an object after having consumed it. For more information on reference counting take a look at
82   * http://netty.io/wiki/reference-counted-objects.html
83   *
84   * <h3>Channel Events</h3>
85   *
86   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
87   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
88   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
89   * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
90   * indicating the cause and is closed immediately thereafter.
91   *
92   * <h3>Writability and Flow Control</h3>
93   *
94   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
95   * when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
96   * does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
97   * channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
98   * note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
99   */
100 @UnstableApi
101 public class Http2MultiplexCodec extends Http2FrameCodec {
102 
103     private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
104         @Override
105         public void operationComplete(ChannelFuture future) throws Exception {
106             registerDone(future);
107         }
108     };
109 
110     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
111     private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
112             new ClosedChannelException(), DefaultHttp2StreamChannel.Http2ChannelUnsafe.class, "write(...)");
113     /**
114      * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
115      * Primarily is non-zero.
116      */
117     private static final int MIN_HTTP2_FRAME_SIZE = 9;
118 
119     /**
120      * Returns the flow-control size for DATA frames, and 0 for all other frames.
121      */
122     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
123 
124         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
125 
126         static final MessageSizeEstimator.Handle HANDLE_INSTANCE = new MessageSizeEstimator.Handle() {
127             @Override
128             public int size(Object msg) {
129                 return msg instanceof Http2DataFrame ?
130                         // Guard against overflow.
131                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
132                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
133             }
134         };
135 
136         @Override
137         public Handle newHandle() {
138             return HANDLE_INSTANCE;
139         }
140     }
141 
142     private static final class Http2StreamChannelRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
143 
144         @Override
145         public MaxMessageHandle newHandle() {
146             return new MaxMessageHandle() {
147                 @Override
148                 public int guess() {
149                     return 1024;
150                 }
151             };
152         }
153     }
154 
155     private final ChannelHandler inboundStreamHandler;
156 
157     private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
158     private boolean parentReadInProgress;
159     private int idCount;
160 
161     // Linked-List for DefaultHttp2StreamChannel instances that need to be processed by channelReadComplete(...)
162     private DefaultHttp2StreamChannel head;
163     private DefaultHttp2StreamChannel tail;
164 
165     // Need to be volatile as accessed from within the DefaultHttp2StreamChannel in a multi-threaded fashion.
166     volatile ChannelHandlerContext ctx;
167 
/**
 * Creates the codec.
 *
 * @param encoder passed through to the {@link Http2FrameCodec} constructor.
 * @param decoder passed through to the {@link Http2FrameCodec} constructor.
 * @param initialSettings passed through to the {@link Http2FrameCodec} constructor.
 * @param inboundStreamHandler handler added to the pipeline of each non-outbound child channel on registration.
 */
Http2MultiplexCodec(Http2ConnectionEncoder encoder,
                    Http2ConnectionDecoder decoder,
                    Http2Settings initialSettings,
                    ChannelHandler inboundStreamHandler) {
    super(encoder, decoder, initialSettings);
    this.inboundStreamHandler = inboundStreamHandler;
}
175 
176     private static void registerDone(ChannelFuture future) {
177         // Handle any errors that occurred on the local thread while registering. Even though
178         // failures can happen after this point, they will be handled by the channel by closing the
179         // childChannel.
180         if (!future.isSuccess()) {
181             Channel childChannel = future.channel();
182             if (childChannel.isRegistered()) {
183                 childChannel.close();
184             } else {
185                 childChannel.unsafe().closeForcibly();
186             }
187         }
188     }
189 
190     @Override
191     public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
192         if (ctx.executor() != ctx.channel().eventLoop()) {
193             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
194         }
195         this.ctx = ctx;
196     }
197 
198     @Override
199     public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
200         super.handlerRemoved0(ctx);
201 
202         // Unlink the linked list to guard against GC nepotism.
203         DefaultHttp2StreamChannel ch = head;
204         while (ch != null) {
205             DefaultHttp2StreamChannel curr = ch;
206             ch = curr.next;
207             curr.next = null;
208         }
209         head = tail = null;
210     }
211 
212     @Override
213     Http2MultiplexCodecStream newStream() {
214         return new Http2MultiplexCodecStream();
215     }
216 
217     @Override
218     final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
219         if (frame instanceof Http2StreamFrame) {
220             Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
221             onHttp2StreamFrame(((Http2MultiplexCodecStream) streamFrame.stream()).channel, streamFrame);
222         } else if (frame instanceof Http2GoAwayFrame) {
223             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
224             // Allow other handlers to act on GOAWAY frame
225             ctx.fireChannelRead(frame);
226         } else if (frame instanceof Http2SettingsFrame) {
227             Http2Settings settings = ((Http2SettingsFrame) frame).settings();
228             if (settings.initialWindowSize() != null) {
229                 initialOutboundStreamWindow = settings.initialWindowSize();
230             }
231             // Allow other handlers to act on SETTINGS frame
232             ctx.fireChannelRead(frame);
233         } else {
234             // Send any other frames down the pipeline
235             ctx.fireChannelRead(frame);
236         }
237     }
238 
239     @Override
240     final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
241         Http2MultiplexCodecStream s = (Http2MultiplexCodecStream) stream;
242 
243         switch (stream.state()) {
244             case HALF_CLOSED_REMOTE:
245             case OPEN:
246                 if (s.channel != null) {
247                     // ignore if child channel was already created.
248                     break;
249                 }
250                 // fall-trough
251                 ChannelFuture future = ctx.channel().eventLoop().register(new DefaultHttp2StreamChannel(s, false));
252                 if (future.isDone()) {
253                     registerDone(future);
254                 } else {
255                     future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
256                 }
257                 break;
258             case CLOSED:
259                 DefaultHttp2StreamChannel channel = s.channel;
260                 if (channel != null) {
261                     channel.streamClosed();
262                 }
263                 break;
264             default:
265                 // ignore for now
266                 break;
267         }
268     }
269 
270     @Override
271     final void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream, boolean writable) {
272         (((Http2MultiplexCodecStream) stream).channel).writabilityChanged(writable);
273     }
274 
275     // TODO: This is most likely not the best way to expose this, need to think more about it.
276     final Http2StreamChannel newOutboundStream() {
277         return new DefaultHttp2StreamChannel(newStream(), true);
278     }
279 
280     @Override
281     final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
282         Http2FrameStream stream = cause.stream();
283         DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
284 
285         try {
286             childChannel.pipeline().fireExceptionCaught(cause.getCause());
287         } finally {
288             childChannel.unsafe().closeForcibly();
289         }
290     }
291 
292     private void onHttp2StreamFrame(DefaultHttp2StreamChannel childChannel, Http2StreamFrame frame) {
293         switch (childChannel.fireChildRead(frame)) {
294             case READ_PROCESSED_BUT_STOP_READING:
295                 childChannel.fireChildReadComplete();
296                 break;
297             case READ_PROCESSED_OK_TO_PROCESS_MORE:
298                 addChildChannelToReadPendingQueue(childChannel);
299                 break;
300             case READ_IGNORED_CHANNEL_INACTIVE:
301             case READ_QUEUED:
302                 // nothing to do:
303                 break;
304             default:
305                 throw new Error();
306         }
307     }
308 
309     final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
310         if (!childChannel.fireChannelReadPending) {
311             assert childChannel.next == null;
312 
313             if (tail == null) {
314                 assert head == null;
315                 tail = head = childChannel;
316             } else {
317                 tail.next = childChannel;
318                 tail = childChannel;
319             }
320             childChannel.fireChannelReadPending = true;
321         }
322     }
323 
324     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
325         try {
326             forEachActiveStream(new Http2FrameStreamVisitor() {
327                 @Override
328                 public boolean visit(Http2FrameStream stream) {
329                     final int streamId = stream.id();
330                     final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
331                     if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
332                         childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
333                     }
334                     return true;
335                 }
336             });
337         } catch (Http2Exception e) {
338             ctx.fireExceptionCaught(e);
339             ctx.close();
340         } finally {
341             // We need to ensure we release the goAwayFrame.
342             goAwayFrame.release();
343         }
344     }
345 
346     /**
347      * Notifies any child streams of the read completion.
348      */
349     @Override
350     public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
351         parentReadInProgress = false;
352         onChannelReadComplete(ctx);
353         channelReadComplete0(ctx);
354     }
355 
356     @Override
357     public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
358         parentReadInProgress = true;
359         super.channelRead(ctx, msg);
360     }
361 
362     final void onChannelReadComplete(ChannelHandlerContext ctx)  {
363         // If we have many child channel we can optimize for the case when multiple call flush() in
364         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
365         // write calls on the socket which is expensive.
366         try {
367             DefaultHttp2StreamChannel current = head;
368             while (current != null) {
369                 DefaultHttp2StreamChannel childChannel = current;
370                 if (childChannel.fireChannelReadPending) {
371                     // Clear early in case fireChildReadComplete() causes it to need to be re-processed
372                     childChannel.fireChannelReadPending = false;
373                     childChannel.fireChildReadComplete();
374                 }
375                 childChannel.next = null;
376                 current = current.next;
377             }
378         } finally {
379             tail = head = null;
380 
381             // We always flush as this is what Http2ConnectionHandler does for now.
382             flush0(ctx);
383         }
384     }
385 
386     // Allow to override for testing
387     void flush0(ChannelHandlerContext ctx) {
388         flush(ctx);
389     }
390 
391     /**
392      * Return bytes to flow control.
393      * <p>
394      * Package private to allow to override for testing
395      * @param ctx The {@link ChannelHandlerContext} associated with the parent channel.
396      * @param stream The object representing the HTTP/2 stream.
397      * @param bytes The number of bytes to return to flow control.
398      * @return {@code true} if a frame has been written as a result of this method call.
399      * @throws Http2Exception If this operation violates the flow control limits.
400      */
401     boolean onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx,
402                          Http2FrameStream stream, int bytes) throws Http2Exception {
403         return consumeBytes(stream.id(), bytes);
404     }
405 
406     // Allow to extend for testing
407     static class Http2MultiplexCodecStream extends DefaultHttp2FrameStream {
408         DefaultHttp2StreamChannel channel;
409     }
410 
411     private enum ReadState {
412         READ_QUEUED,
413         READ_IGNORED_CHANNEL_INACTIVE,
414         READ_PROCESSED_BUT_STOP_READING,
415         READ_PROCESSED_OK_TO_PROCESS_MORE
416     }
417 
418     private boolean initialWritability(DefaultHttp2FrameStream stream) {
419         // If the stream id is not valid yet we will just mark the channel as writable as we will be notified
420         // about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
421         // This should be good enough and simplify things a lot.
422         return !isStreamIdValid(stream.id()) || isWritable(stream);
423     }
424 
425     // TODO: Handle writability changes due writing from outside the eventloop.
426     private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
427         private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
428         private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
429         private final ChannelId channelId;
430         private final ChannelPipeline pipeline;
431         private final DefaultHttp2FrameStream stream;
432         private final ChannelPromise closePromise;
433         private final boolean outbound;
434 
435         private volatile boolean registered;
436         // We start with the writability of the channel when creating the StreamChannel.
437         private volatile boolean writable;
438 
439         private boolean outboundClosed;
440         private boolean closePending;
441         private boolean readInProgress;
442         private Queue<Object> inboundBuffer;
443 
444         /** {@code true} after the first HEADERS frame has been written **/
445         private boolean firstFrameWritten;
446 
447         /** {@code true} if a close without an error was initiated **/
448         private boolean streamClosedWithoutError;
449 
450         // Keeps track of flush calls in channelReadComplete(...) and aggregate these.
451         private boolean inFireChannelReadComplete;
452 
453         boolean fireChannelReadPending;
454 
455         // Holds the reference to the next DefaultHttp2StreamChannel that should be processed in
456         // channelReadComplete(...)
457         DefaultHttp2StreamChannel next;
458 
459         DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
460             this.stream = stream;
461             this.outbound = outbound;
462             writable = initialWritability(stream);
463             ((Http2MultiplexCodecStream) stream).channel = this;
464             pipeline = new DefaultChannelPipeline(this) {
465                 @Override
466                 protected void incrementPendingOutboundBytes(long size) {
467                     // Do thing for now
468                 }
469 
470                 @Override
471                 protected void decrementPendingOutboundBytes(long size) {
472                     // Do thing for now
473                 }
474             };
475             closePromise = pipeline.newPromise();
476             channelId = new Http2StreamChannelId(parent().id(), ++idCount);
477         }
478 
479         @Override
480         public Http2FrameStream stream() {
481             return stream;
482         }
483 
484         void streamClosed() {
485             streamClosedWithoutError = true;
486             if (readInProgress) {
487                 // Just call closeForcibly() as this will take care of fireChannelInactive().
488                 unsafe().closeForcibly();
489             } else {
490                 closePending = true;
491             }
492         }
493 
494         @Override
495         public ChannelMetadata metadata() {
496             return METADATA;
497         }
498 
499         @Override
500         public ChannelConfig config() {
501             return config;
502         }
503 
504         @Override
505         public boolean isOpen() {
506             return !closePromise.isDone();
507         }
508 
509         @Override
510         public boolean isActive() {
511             return isOpen();
512         }
513 
514         @Override
515         public boolean isWritable() {
516             return writable;
517         }
518 
519         @Override
520         public ChannelId id() {
521             return channelId;
522         }
523 
524         @Override
525         public EventLoop eventLoop() {
526             return parent().eventLoop();
527         }
528 
529         @Override
530         public Channel parent() {
531             return ctx.channel();
532         }
533 
534         @Override
535         public boolean isRegistered() {
536             return registered;
537         }
538 
539         @Override
540         public SocketAddress localAddress() {
541             return parent().localAddress();
542         }
543 
544         @Override
545         public SocketAddress remoteAddress() {
546             return parent().remoteAddress();
547         }
548 
549         @Override
550         public ChannelFuture closeFuture() {
551             return closePromise;
552         }
553 
554         @Override
555         public long bytesBeforeUnwritable() {
556             // TODO: Do a proper impl
557             return config().getWriteBufferHighWaterMark();
558         }
559 
560         @Override
561         public long bytesBeforeWritable() {
562             // TODO: Do a proper impl
563             return 0;
564         }
565 
566         @Override
567         public Unsafe unsafe() {
568             return unsafe;
569         }
570 
571         @Override
572         public ChannelPipeline pipeline() {
573             return pipeline;
574         }
575 
576         @Override
577         public ByteBufAllocator alloc() {
578             return config().getAllocator();
579         }
580 
581         @Override
582         public Channel read() {
583             pipeline().read();
584             return this;
585         }
586 
587         @Override
588         public Channel flush() {
589             pipeline().flush();
590             return this;
591         }
592 
593         @Override
594         public ChannelFuture bind(SocketAddress localAddress) {
595             return pipeline().bind(localAddress);
596         }
597 
598         @Override
599         public ChannelFuture connect(SocketAddress remoteAddress) {
600             return pipeline().connect(remoteAddress);
601         }
602 
603         @Override
604         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
605             return pipeline().connect(remoteAddress, localAddress);
606         }
607 
608         @Override
609         public ChannelFuture disconnect() {
610             return pipeline().disconnect();
611         }
612 
613         @Override
614         public ChannelFuture close() {
615             return pipeline().close();
616         }
617 
618         @Override
619         public ChannelFuture deregister() {
620             return pipeline().deregister();
621         }
622 
623         @Override
624         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
625             return pipeline().bind(localAddress, promise);
626         }
627 
628         @Override
629         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
630             return pipeline().connect(remoteAddress, promise);
631         }
632 
633         @Override
634         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
635             return pipeline().connect(remoteAddress, localAddress, promise);
636         }
637 
638         @Override
639         public ChannelFuture disconnect(ChannelPromise promise) {
640             return pipeline().disconnect(promise);
641         }
642 
643         @Override
644         public ChannelFuture close(ChannelPromise promise) {
645             return pipeline().close(promise);
646         }
647 
648         @Override
649         public ChannelFuture deregister(ChannelPromise promise) {
650             return pipeline().deregister(promise);
651         }
652 
653         @Override
654         public ChannelFuture write(Object msg) {
655             return pipeline().write(msg);
656         }
657 
658         @Override
659         public ChannelFuture write(Object msg, ChannelPromise promise) {
660             return pipeline().write(msg, promise);
661         }
662 
663         @Override
664         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
665             return pipeline().writeAndFlush(msg, promise);
666         }
667 
668         @Override
669         public ChannelFuture writeAndFlush(Object msg) {
670             return pipeline().writeAndFlush(msg);
671         }
672 
673         @Override
674         public ChannelPromise newPromise() {
675             return pipeline().newPromise();
676         }
677 
678         @Override
679         public ChannelProgressivePromise newProgressivePromise() {
680             return pipeline().newProgressivePromise();
681         }
682 
683         @Override
684         public ChannelFuture newSucceededFuture() {
685             return pipeline().newSucceededFuture();
686         }
687 
688         @Override
689         public ChannelFuture newFailedFuture(Throwable cause) {
690             return pipeline().newFailedFuture(cause);
691         }
692 
693         @Override
694         public ChannelPromise voidPromise() {
695             return pipeline().voidPromise();
696         }
697 
698         @Override
699         public int hashCode() {
700             return id().hashCode();
701         }
702 
703         @Override
704         public boolean equals(Object o) {
705             return this == o;
706         }
707 
708         @Override
709         public int compareTo(Channel o) {
710             if (this == o) {
711                 return 0;
712             }
713 
714             return id().compareTo(o.id());
715         }
716 
717         @Override
718         public String toString() {
719             return parent().toString() + "(H2 - " + stream + ')';
720         }
721 
722         void writabilityChanged(boolean writable) {
723             assert eventLoop().inEventLoop();
724             if (writable != this.writable && isActive()) {
725                 // Only notify if we received a state change.
726                 this.writable = writable;
727                 pipeline().fireChannelWritabilityChanged();
728             }
729         }
730 
731         /**
732          * Receive a read message. This does not notify handlers unless a read is in progress on the
733          * channel.
734          */
735         ReadState fireChildRead(Http2Frame frame) {
736             assert eventLoop().inEventLoop();
737             if (!isActive()) {
738                 ReferenceCountUtil.release(frame);
739                 return ReadState.READ_IGNORED_CHANNEL_INACTIVE;
740             }
741             if (readInProgress && (inboundBuffer == null || inboundBuffer.isEmpty())) {
742                 // Check for null because inboundBuffer doesn't support null; we want to be consistent
743                 // for what values are supported.
744                 RecvByteBufAllocator.ExtendedHandle allocHandle = unsafe.recvBufAllocHandle();
745                 unsafe.doRead0(frame, allocHandle);
746                 return allocHandle.continueReading() ?
747                         ReadState.READ_PROCESSED_OK_TO_PROCESS_MORE : ReadState.READ_PROCESSED_BUT_STOP_READING;
748             } else {
749                 if (inboundBuffer == null) {
750                     inboundBuffer = new ArrayDeque<Object>(4);
751                 }
752                 inboundBuffer.add(frame);
753                 return ReadState.READ_QUEUED;
754             }
755         }
756 
757         void fireChildReadComplete() {
758             assert eventLoop().inEventLoop();
759             try {
760                 if (readInProgress) {
761                     inFireChannelReadComplete = true;
762                     readInProgress = false;
763                     unsafe().recvBufAllocHandle().readComplete();
764                     pipeline().fireChannelReadComplete();
765                 }
766             } finally {
767                 inFireChannelReadComplete = false;
768             }
769         }
770 
771         private final class Http2ChannelUnsafe implements Unsafe {
772             private final VoidChannelPromise unsafeVoidPromise =
773                     new VoidChannelPromise(DefaultHttp2StreamChannel.this, false);
774             @SuppressWarnings("deprecation")
775             private RecvByteBufAllocator.ExtendedHandle recvHandle;
776             private boolean writeDoneAndNoFlush;
777             private boolean closeInitiated;
778 
779             @Override
780             public void connect(final SocketAddress remoteAddress,
781                                 SocketAddress localAddress, final ChannelPromise promise) {
782                 if (!promise.setUncancellable()) {
783                     return;
784                 }
785                 promise.setFailure(new UnsupportedOperationException());
786             }
787 
788             @Override
789             public RecvByteBufAllocator.ExtendedHandle recvBufAllocHandle() {
790                 if (recvHandle == null) {
791                     recvHandle = (RecvByteBufAllocator.ExtendedHandle) config().getRecvByteBufAllocator().newHandle();
792                 }
793                 return recvHandle;
794             }
795 
796             @Override
797             public SocketAddress localAddress() {
798                 return parent().unsafe().localAddress();
799             }
800 
801             @Override
802             public SocketAddress remoteAddress() {
803                 return parent().unsafe().remoteAddress();
804             }
805 
806             @Override
807             public void register(EventLoop eventLoop, ChannelPromise promise) {
808                 if (!promise.setUncancellable()) {
809                     return;
810                 }
811                 if (registered) {
812                     throw new UnsupportedOperationException("Re-register is not supported");
813                 }
814 
815                 registered = true;
816 
817                 if (!outbound) {
818                     // Add the handler to the pipeline now that we are registered.
819                     pipeline().addLast(inboundStreamHandler);
820                 }
821 
822                 promise.setSuccess();
823 
824                 pipeline().fireChannelRegistered();
825                 if (isActive()) {
826                     pipeline().fireChannelActive();
827                 }
828             }
829 
830             @Override
831             public void bind(SocketAddress localAddress, ChannelPromise promise) {
832                 if (!promise.setUncancellable()) {
833                     return;
834                 }
835                 promise.setFailure(new UnsupportedOperationException());
836             }
837 
838             @Override
839             public void disconnect(ChannelPromise promise) {
840                 close(promise);
841             }
842 
843             @Override
844             public void close(final ChannelPromise promise) {
845                 if (!promise.setUncancellable()) {
846                     return;
847                 }
848                 if (closeInitiated) {
849                     if (closePromise.isDone()) {
850                         // Closed already.
851                         promise.setSuccess();
852                     } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
853                         // This means close() was called before so we just register a listener and return
854                         closePromise.addListener(new ChannelFutureListener() {
855                             @Override
856                             public void operationComplete(ChannelFuture future) throws Exception {
857                                 promise.setSuccess();
858                             }
859                         });
860                     }
861                     return;
862                 }
863                 closeInitiated = true;
864 
865                 closePending = false;
866                 fireChannelReadPending = false;
867 
868                 // Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at
869                 // all anyway.
870                 if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) {
871                     Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
872                     write(resetFrame, unsafe().voidPromise());
873                     flush();
874                 }
875 
876                 if (inboundBuffer != null) {
877                     for (;;) {
878                         Object msg = inboundBuffer.poll();
879                         if (msg == null) {
880                             break;
881                         }
882                         ReferenceCountUtil.release(msg);
883                     }
884                 }
885 
886                 // The promise should be notified before we call fireChannelInactive().
887                 outboundClosed = true;
888                 closePromise.setSuccess();
889                 promise.setSuccess();
890 
891                 pipeline().fireChannelInactive();
892                 if (isRegistered()) {
893                     deregister(unsafe().voidPromise());
894                 }
895             }
896 
897             @Override
898             public void closeForcibly() {
899                 close(unsafe().voidPromise());
900             }
901 
902             @Override
903             public void deregister(ChannelPromise promise) {
904                 if (!promise.setUncancellable()) {
905                     return;
906                 }
907                 if (registered) {
908                     registered = true;
909                     promise.setSuccess();
910                     pipeline().fireChannelUnregistered();
911                 } else {
912                     promise.setFailure(new IllegalStateException("Not registered"));
913                 }
914             }
915 
916             @Override
917             public void beginRead() {
918                 if (readInProgress || !isActive()) {
919                     return;
920                 }
921                 readInProgress = true;
922 
923                 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
924                 allocHandle.reset(config());
925                 if (inboundBuffer == null || inboundBuffer.isEmpty()) {
926                     if (closePending) {
927                         unsafe.closeForcibly();
928                     }
929                     return;
930                 }
931 
932                 // We have already checked that the queue is not empty, so before this value is used it will always be
933                 // set by allocHandle.continueReading().
934                 boolean continueReading;
935                 do {
936                     Object m = inboundBuffer.poll();
937                     if (m == null) {
938                         continueReading = false;
939                         break;
940                     }
941                     doRead0((Http2Frame) m, allocHandle);
942                 } while (continueReading = allocHandle.continueReading());
943 
944                 if (continueReading && parentReadInProgress) {
945                     // We don't know if more frames will be delivered in the parent channel's read loop, so add this
946                     // channel to the channelReadComplete queue to be notified later.
947                     addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this);
948                 } else {
949                     // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
950                     // channel is not currently reading we need to force a flush at the child channel, because we cannot
951                     // rely upon flush occurring in channelReadComplete on the parent channel.
952                     readInProgress = false;
953                     allocHandle.readComplete();
954                     pipeline().fireChannelReadComplete();
955                     flush();
956                     if (closePending) {
957                         unsafe.closeForcibly();
958                     }
959                 }
960             }
961 
962             @SuppressWarnings("deprecation")
963             void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
964                 int numBytesToBeConsumed = 0;
965                 if (frame instanceof Http2DataFrame) {
966                     numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
967                     allocHandle.lastBytesRead(numBytesToBeConsumed);
968                 } else {
969                     allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
970                 }
971                 allocHandle.incMessagesRead(1);
972                 pipeline().fireChannelRead(frame);
973 
974                 if (numBytesToBeConsumed != 0) {
975                     try {
976                         writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed);
977                     } catch (Http2Exception e) {
978                         pipeline().fireExceptionCaught(e);
979                     }
980                 }
981             }
982 
983             @Override
984             public void write(Object msg, final ChannelPromise promise) {
985                 // After this point its not possible to cancel a write anymore.
986                 if (!promise.setUncancellable()) {
987                     ReferenceCountUtil.release(msg);
988                     return;
989                 }
990 
991                 if (!isActive() ||
992                         // Once the outbound side was closed we should not allow header / data frames
993                         outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
994                     ReferenceCountUtil.release(msg);
995                     promise.setFailure(CLOSED_CHANNEL_EXCEPTION);
996                     return;
997                 }
998 
999                 try {
1000                     if (msg instanceof Http2StreamFrame) {
1001                         Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1002                         if (!firstFrameWritten && !isStreamIdValid(stream().id())) {
1003                             if (!(frame instanceof Http2HeadersFrame)) {
1004                                 ReferenceCountUtil.release(frame);
1005                                 promise.setFailure(
1006                                         new IllegalArgumentException("The first frame must be a headers frame. Was: "
1007                                         + frame.name()));
1008                                 return;
1009                             }
1010                             firstFrameWritten = true;
1011                             ChannelFuture future = write0(frame);
1012                             if (future.isDone()) {
1013                                 firstWriteComplete(future, promise);
1014                             } else {
1015                                 future.addListener(new ChannelFutureListener() {
1016                                     @Override
1017                                     public void operationComplete(ChannelFuture future) throws Exception {
1018                                         firstWriteComplete(future, promise);
1019                                     }
1020                                 });
1021                             }
1022                             return;
1023                         }
1024                     } else  {
1025                         String msgStr = msg.toString();
1026                         ReferenceCountUtil.release(msg);
1027                         promise.setFailure(new IllegalArgumentException(
1028                                 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1029                                         ": " + msgStr));
1030                         return;
1031                     }
1032 
1033                     ChannelFuture future = write0(msg);
1034                     if (future.isDone()) {
1035                         writeComplete(future, promise);
1036                     } else {
1037                         future.addListener(new ChannelFutureListener() {
1038                             @Override
1039                             public void operationComplete(ChannelFuture future) throws Exception {
1040                                 writeComplete(future, promise);
1041                             }
1042                         });
1043                     }
1044                 } catch (Throwable t) {
1045                     promise.tryFailure(t);
1046                 } finally {
1047                     writeDoneAndNoFlush = true;
1048                 }
1049             }
1050 
1051             private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1052                 Throwable cause = future.cause();
1053                 if (cause == null) {
1054                     // As we just finished our first write which made the stream-id valid we need to re-evaluate
1055                     // the writability of the channel.
1056                     writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
1057                     promise.setSuccess();
1058                 } else {
1059                     promise.setFailure(wrapStreamClosedError(cause));
1060                     // If the first write fails there is not much we can do, just close
1061                     closeForcibly();
1062                 }
1063             }
1064 
1065             private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1066                 Throwable cause = future.cause();
1067                 if (cause == null) {
1068                     promise.setSuccess();
1069                 } else {
1070                     Throwable error = wrapStreamClosedError(cause);
1071                     promise.setFailure(error);
1072 
1073                     if (error instanceof ClosedChannelException) {
1074                         if (config.isAutoClose()) {
1075                             // Close channel if needed.
1076                             closeForcibly();
1077                         } else {
1078                             outboundClosed = true;
1079                         }
1080                     }
1081                 }
1082             }
1083 
1084             private Throwable wrapStreamClosedError(Throwable cause) {
1085                 // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1086                 // mimic other transports and make it easier to reason about what exceptions to expect.
1087                 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1088                     return new ClosedChannelException().initCause(cause);
1089                 }
1090                 return cause;
1091             }
1092 
1093             private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1094                 if (frame.stream() != null && frame.stream() != stream) {
1095                     String msgString = frame.toString();
1096                     ReferenceCountUtil.release(frame);
1097                     throw new IllegalArgumentException(
1098                             "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1099                 }
1100                 return frame;
1101             }
1102 
1103             private ChannelFuture write0(Object msg) {
1104                 ChannelPromise promise = ctx.newPromise();
1105                 Http2MultiplexCodec.this.write(ctx, msg, promise);
1106                 return promise;
1107             }
1108 
1109             @Override
1110             public void flush() {
1111                 if (!writeDoneAndNoFlush) {
1112                     // There is nothing to flush so this is a NOOP.
1113                     return;
1114                 }
1115                 try {
1116                     // If we are currently in the  channelReadComplete(...) call we should just ignore the flush.
1117                     // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1118                     // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1119                     // write(...) or writev(...) operation on the socket.
1120                     if (!inFireChannelReadComplete) {
1121                         flush0(ctx);
1122                     }
1123                 } finally {
1124                     writeDoneAndNoFlush = false;
1125                 }
1126             }
1127 
1128             @Override
1129             public ChannelPromise voidPromise() {
1130                 return unsafeVoidPromise;
1131             }
1132 
1133             @Override
1134             public ChannelOutboundBuffer outboundBuffer() {
1135                 // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1136                 return null;
1137             }
1138         }
1139 
1140         /**
1141          * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1142          * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1143          * changes.
1144          */
1145         private final class Http2StreamChannelConfig extends DefaultChannelConfig {
1146 
1147             Http2StreamChannelConfig(Channel channel) {
1148                 super(channel);
1149                 setRecvByteBufAllocator(new Http2StreamChannelRecvByteBufAllocator());
1150             }
1151 
1152             @Override
1153             public int getWriteBufferHighWaterMark() {
1154                 return min(parent().config().getWriteBufferHighWaterMark(), initialOutboundStreamWindow);
1155             }
1156 
1157             @Override
1158             public int getWriteBufferLowWaterMark() {
1159                 return min(parent().config().getWriteBufferLowWaterMark(), initialOutboundStreamWindow);
1160             }
1161 
1162             @Override
1163             public MessageSizeEstimator getMessageSizeEstimator() {
1164                 return FlowControlledFrameSizeEstimator.INSTANCE;
1165             }
1166 
1167             @Override
1168             public WriteBufferWaterMark getWriteBufferWaterMark() {
1169                 int mark = getWriteBufferHighWaterMark();
1170                 return new WriteBufferWaterMark(mark, mark);
1171             }
1172 
1173             @Override
1174             public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1175                 throw new UnsupportedOperationException();
1176             }
1177 
1178             @Override
1179             @Deprecated
1180             public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
1181                 throw new UnsupportedOperationException();
1182             }
1183 
1184             @Override
1185             @Deprecated
1186             public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
1187                 throw new UnsupportedOperationException();
1188             }
1189 
1190             @Override
1191             public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1192                 throw new UnsupportedOperationException();
1193             }
1194 
1195             @Override
1196             public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1197                 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1198                     throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1199                             RecvByteBufAllocator.ExtendedHandle.class);
1200                 }
1201                 super.setRecvByteBufAllocator(allocator);
1202                 return this;
1203             }
1204         }
1205     }
1206 }