View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelId;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelProgressivePromise;
31  import io.netty.channel.ChannelPromise;
32  import io.netty.channel.DefaultChannelConfig;
33  import io.netty.channel.DefaultChannelPipeline;
34  import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
35  import io.netty.channel.DelegatingChannelPromiseNotifier;
36  import io.netty.channel.EventLoop;
37  import io.netty.channel.MessageSizeEstimator;
38  import io.netty.channel.RecvByteBufAllocator;
39  import io.netty.channel.VoidChannelPromise;
40  import io.netty.channel.WriteBufferWaterMark;
41  import io.netty.util.DefaultAttributeMap;
42  import io.netty.util.ReferenceCountUtil;
43  import io.netty.util.ReferenceCounted;
44  import io.netty.util.internal.StringUtil;
45  import io.netty.util.internal.ThrowableUtil;
46  import io.netty.util.internal.UnstableApi;
47  
48  import java.net.SocketAddress;
49  import java.nio.channels.ClosedChannelException;
50  import java.util.ArrayDeque;
51  import java.util.Queue;
52  
53  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
54  import static java.lang.Math.min;
55  
56  /**
57   * An HTTP/2 handler that creates child channels for each stream.
58   *
59   * <p>When a new stream is created, a new {@link Channel} is created for it. Applications send and
60   * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
61   * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
62   * the head of the pipeline are processed directly by this handler and cannot be intercepted.
63   *
64   * <p>The child channel will be notified of user events that impact the stream, such as {@link
65   * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
66   * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
67   * communication, closing of the channel is delayed until any inbound queue is drained with {@link
68   * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
69   * free to close the channel in response to such events if they don't have use for any queued
70   * messages.
71   *
72   * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
73   *
74   * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
75   *
76   * <h3>Reference Counting</h3>
77   *
78   * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
79   * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
80   * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
81   * such an object after having consumed it. For more information on reference counting take a look at
82   * http://netty.io/wiki/reference-counted-objects.html
83   *
84   * <h3>Channel Events</h3>
85   *
86   * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
87   * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
88   * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
89   * (e.g. due to the maximum number of active streams being exceeded), the child channel receives an exception
90   * indicating the cause and is closed immediately thereafter.
91   *
92   * <h3>Writability and Flow Control</h3>
93   *
94   * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
95   * when it maps to an active HTTP/2 stream and the stream's flow control window is greater than zero. A child channel
96   * does not know about the connection-level flow control window. {@link ChannelHandler}s are free to ignore the
97   * channel's writability, in which case the excessive writes will be buffered by the parent channel. It's important to
98   * note that only {@link Http2DataFrame}s are subject to HTTP/2 flow control.
99   */
100 @UnstableApi
101 public class Http2MultiplexCodec extends Http2FrameCodec {
102 
103     private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
104         @Override
105         public void operationComplete(ChannelFuture future) throws Exception {
106             registerDone(future);
107         }
108     };
109 
110     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
111     private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
112             new ClosedChannelException(), DefaultHttp2StreamChannel.Http2ChannelUnsafe.class, "write(...)");
113     /**
114      * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
115      * What primarily matters is that the value is non-zero.
116      */
117     private static final int MIN_HTTP2_FRAME_SIZE = 9;
118 
119     /**
120      * Returns the flow-control size for DATA frames, and 0 for all other frames.
121      */
122     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
123 
124         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
125 
126         static final MessageSizeEstimator.Handle HANDLE_INSTANCE = new MessageSizeEstimator.Handle() {
127             @Override
128             public int size(Object msg) {
129                 return msg instanceof Http2DataFrame ?
130                         // Guard against overflow.
131                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
132                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
133             }
134         };
135 
136         @Override
137         public Handle newHandle() {
138             return HANDLE_INSTANCE;
139         }
140     }
141 
142     private static final class Http2StreamChannelRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
143 
144         @Override
145         public MaxMessageHandle newHandle() {
146             return new MaxMessageHandle() {
147                 @Override
148                 public int guess() {
149                     return 1024;
150                 }
151             };
152         }
153     }
154 
155     private final ChannelHandler inboundStreamHandler;
156 
157     private int initialOutboundStreamWindow = Http2CodecUtil.DEFAULT_WINDOW_SIZE;
158     // TODO: We may be able to optimize when we really need to call flush(...) during channelReadComplete(...)
159     // by checking if this is true and only then call flush(...).
160     private boolean flushNeeded;
161     private boolean parentReadInProgress;
162     private int idCount;
163 
164     // Linked-List for DefaultHttp2StreamChannel instances that need to be processed by channelReadComplete(...)
165     private DefaultHttp2StreamChannel head;
166     private DefaultHttp2StreamChannel tail;
167 
168     // Need to be volatile as accessed from within the DefaultHttp2StreamChannel in a multi-threaded fashion.
169     volatile ChannelHandlerContext ctx;
170 
171     Http2MultiplexCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
172                     ChannelHandler inboundStreamHandler) {
173         super(encoder, decoder, initialSettings);
174         this.inboundStreamHandler = inboundStreamHandler;
175     }
176 
177     private static void registerDone(ChannelFuture future) {
178         // Handle any errors that occurred on the local thread while registering. Even though
179         // failures can happen after this point, they will be handled by the channel by closing the
180         // childChannel.
181         if (!future.isSuccess()) {
182             Channel childChannel = future.channel();
183             if (childChannel.isRegistered()) {
184                 childChannel.close();
185             } else {
186                 childChannel.unsafe().closeForcibly();
187             }
188         }
189     }
190 
191     @Override
192     public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
193         if (ctx.executor() != ctx.channel().eventLoop()) {
194             throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
195         }
196         this.ctx = ctx;
197     }
198 
199     @Override
200     public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
201         super.handlerRemoved0(ctx);
202 
203         // Unlink the linked list to guard against GC nepotism.
204         DefaultHttp2StreamChannel ch = head;
205         while (ch != null) {
206             DefaultHttp2StreamChannel curr = ch;
207             ch = curr.next;
208             curr.next = null;
209         }
210         head = tail = null;
211     }
212 
213     @Override
214     Http2MultiplexCodecStream newStream() {
215         return new Http2MultiplexCodecStream();
216     }
217 
218     @Override
219     final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
220         if (frame instanceof Http2StreamFrame) {
221             Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
222             onHttp2StreamFrame(((Http2MultiplexCodecStream) streamFrame.stream()).channel, streamFrame);
223         } else if (frame instanceof Http2GoAwayFrame) {
224             onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
225         } else if (frame instanceof Http2SettingsFrame) {
226             Http2Settings settings = ((Http2SettingsFrame) frame).settings();
227             if (settings.initialWindowSize() != null) {
228                 initialOutboundStreamWindow = settings.initialWindowSize();
229             }
230         }
231     }
232 
233     @Override
234     final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, Http2FrameStream stream) {
235         Http2MultiplexCodecStream s = (Http2MultiplexCodecStream) stream;
236 
237         switch (stream.state()) {
238             case HALF_CLOSED_REMOTE:
239             case OPEN:
240                 if (s.channel != null) {
241                     // ignore if child channel was already created.
242                     break;
243                 }
244                 // fall-trough
245                 ChannelFuture future = ctx.channel().eventLoop().register(new DefaultHttp2StreamChannel(s, false));
246                 if (future.isDone()) {
247                     registerDone(future);
248                 } else {
249                     future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
250                 }
251                 break;
252             case CLOSED:
253                 DefaultHttp2StreamChannel channel = s.channel;
254                 if (channel != null) {
255                     channel.streamClosed();
256                 }
257                 break;
258             default:
259                 // ignore for now
260                 break;
261         }
262     }
263 
264     @Override
265     final void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, Http2FrameStream stream, boolean writable) {
266         (((Http2MultiplexCodecStream) stream).channel).writabilityChanged(writable);
267     }
268 
269     // TODO: This is most likely not the best way to expose this, need to think more about it.
270     final Http2StreamChannel newOutboundStream() {
271         return new DefaultHttp2StreamChannel(newStream(), true);
272     }
273 
274     @Override
275     final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
276         Http2FrameStream stream = cause.stream();
277         DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
278 
279         try {
280             childChannel.pipeline().fireExceptionCaught(cause.getCause());
281         } finally {
282             childChannel.unsafe().closeForcibly();
283         }
284     }
285 
286     private void onHttp2StreamFrame(DefaultHttp2StreamChannel childChannel, Http2StreamFrame frame) {
287         switch (childChannel.fireChildRead(frame)) {
288             case READ_PROCESSED_BUT_STOP_READING:
289                 childChannel.fireChildReadComplete();
290                 break;
291             case READ_PROCESSED_OK_TO_PROCESS_MORE:
292                 addChildChannelToReadPendingQueue(childChannel);
293                 break;
294             case READ_IGNORED_CHANNEL_INACTIVE:
295             case READ_QUEUED:
296                 // nothing to do:
297                 break;
298             default:
299                 throw new Error();
300         }
301     }
302 
303     final void addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel childChannel) {
304         if (!childChannel.fireChannelReadPending) {
305             assert childChannel.next == null;
306 
307             if (tail == null) {
308                 assert head == null;
309                 tail = head = childChannel;
310             } else {
311                 tail.next = childChannel;
312                 tail = childChannel;
313             }
314             childChannel.fireChannelReadPending = true;
315         }
316     }
317 
318     private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
319         try {
320             forEachActiveStream(new Http2FrameStreamVisitor() {
321                 @Override
322                 public boolean visit(Http2FrameStream stream) {
323                     final int streamId = stream.id();
324                     final DefaultHttp2StreamChannel childChannel = ((Http2MultiplexCodecStream) stream).channel;
325                     if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
326                         childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
327                     }
328                     return true;
329                 }
330             });
331         } catch (Http2Exception e) {
332             ctx.fireExceptionCaught(e);
333             ctx.close();
334         } finally {
335             // We need to ensure we release the goAwayFrame.
336             goAwayFrame.release();
337         }
338     }
339 
340     /**
341      * Notifies any child streams of the read completion.
342      */
343     @Override
344     public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
345         parentReadInProgress = false;
346         onChannelReadComplete(ctx);
347         channelReadComplete0(ctx);
348     }
349 
350     @Override
351     public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
352         parentReadInProgress = true;
353         super.channelRead(ctx, msg);
354     }
355 
356     final void onChannelReadComplete(ChannelHandlerContext ctx)  {
357         // If we have many child channel we can optimize for the case when multiple call flush() in
358         // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
359         // write calls on the socket which is expensive.
360         try {
361             DefaultHttp2StreamChannel current = head;
362             while (current != null) {
363                 DefaultHttp2StreamChannel childChannel = current;
364                 if (childChannel.fireChannelReadPending) {
365                     // Clear early in case fireChildReadComplete() causes it to need to be re-processed
366                     childChannel.fireChannelReadPending = false;
367                     childChannel.fireChildReadComplete();
368                 }
369                 childChannel.next = null;
370                 current = current.next;
371             }
372         } finally {
373             tail = head = null;
374 
375             // We always flush as this is what Http2ConnectionHandler does for now.
376             // TODO: I think this is not really necessary and we should be able to optimize this in the future by
377             // checking flushNeeded and only flush if this returns true.
378             flush0(ctx);
379         }
380     }
381 
382     @Override
383     public final void flush(ChannelHandlerContext ctx) {
384         flushNeeded = false;
385         super.flush(ctx);
386     }
387 
388     // Allow to override for testing
389     void flush0(ChannelHandlerContext ctx) {
390         flush(ctx);
391     }
392 
393     /**
394      * Return bytes to flow control.
395      * <p>
396      * Package private to allow to override for testing
397      * @param ctx The {@link ChannelHandlerContext} associated with the parent channel.
398      * @param stream The object representing the HTTP/2 stream.
399      * @param bytes The number of bytes to return to flow control.
400      * @return {@code true} if a frame has been written as a result of this method call.
401      * @throws Http2Exception If this operation violates the flow control limits.
402      */
403     boolean onBytesConsumed(@SuppressWarnings("unused") ChannelHandlerContext ctx,
404                          Http2FrameStream stream, int bytes) throws Http2Exception {
405         return consumeBytes(stream.id(), bytes);
406     }
407 
408     // Allow to extend for testing
409     static class Http2MultiplexCodecStream extends DefaultHttp2FrameStream {
410         DefaultHttp2StreamChannel channel;
411     }
412 
413     private enum ReadState {
414         READ_QUEUED,
415         READ_IGNORED_CHANNEL_INACTIVE,
416         READ_PROCESSED_BUT_STOP_READING,
417         READ_PROCESSED_OK_TO_PROCESS_MORE
418     }
419 
420     private boolean initialWritability(DefaultHttp2FrameStream stream) {
421         // If the stream id is not valid yet we will just mark the channel as writable as we will be notified
422         // about non-writability state as soon as the first Http2HeaderFrame is written (if needed).
423         // This should be good enough and simplify things a lot.
424         return !isStreamIdValid(stream.id()) || isWritable(stream);
425     }
426 
427     // TODO: Handle writability changes due writing from outside the eventloop.
428     private final class DefaultHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
429         private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
430         private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
431         private final ChannelId channelId;
432         private final ChannelPipeline pipeline;
433         private final DefaultHttp2FrameStream stream;
434         private final ChannelPromise closePromise;
435         private final boolean outbound;
436 
437         private volatile boolean registered;
438         // We start with the writability of the channel when creating the StreamChannel.
439         private volatile boolean writable;
440 
441         private boolean closePending;
442         private boolean readInProgress;
443         private Queue<Object> inboundBuffer;
444 
445         /** {@code true} after the first HEADERS frame has been written **/
446         private boolean firstFrameWritten;
447 
448         /** {@code true} if a close without an error was initiated **/
449         private boolean streamClosedWithoutError;
450 
451         // Keeps track of flush calls in channelReadComplete(...) and aggregate these.
452         private boolean inFireChannelReadComplete;
453         private boolean flushPending;
454 
455         boolean fireChannelReadPending;
456 
457         // Holds the reference to the next DefaultHttp2StreamChannel that should be processed in
458         // channelReadComplete(...)
459         DefaultHttp2StreamChannel next;
460 
461         DefaultHttp2StreamChannel(DefaultHttp2FrameStream stream, boolean outbound) {
462             this.stream = stream;
463             this.outbound = outbound;
464             writable = initialWritability(stream);
465             ((Http2MultiplexCodecStream) stream).channel = this;
466             pipeline = new DefaultChannelPipeline(this) {
467                 @Override
468                 protected void incrementPendingOutboundBytes(long size) {
469                     // Do thing for now
470                 }
471 
472                 @Override
473                 protected void decrementPendingOutboundBytes(long size) {
474                     // Do thing for now
475                 }
476             };
477             closePromise = pipeline.newPromise();
478             channelId = new Http2StreamChannelId(parent().id(), ++idCount);
479         }
480 
481         @Override
482         public Http2FrameStream stream() {
483             return stream;
484         }
485 
486         void streamClosed() {
487             streamClosedWithoutError = true;
488             if (readInProgress) {
489                 // Just call closeForcibly() as this will take care of fireChannelInactive().
490                 unsafe().closeForcibly();
491             } else {
492                 closePending = true;
493             }
494         }
495 
496         @Override
497         public ChannelMetadata metadata() {
498             return METADATA;
499         }
500 
501         @Override
502         public ChannelConfig config() {
503             return config;
504         }
505 
506         @Override
507         public boolean isOpen() {
508             return !closePromise.isDone();
509         }
510 
511         @Override
512         public boolean isActive() {
513             return isOpen();
514         }
515 
516         @Override
517         public boolean isWritable() {
518             return writable;
519         }
520 
521         @Override
522         public ChannelId id() {
523             return channelId;
524         }
525 
526         @Override
527         public EventLoop eventLoop() {
528             return parent().eventLoop();
529         }
530 
531         @Override
532         public Channel parent() {
533             return ctx.channel();
534         }
535 
536         @Override
537         public boolean isRegistered() {
538             return registered;
539         }
540 
541         @Override
542         public SocketAddress localAddress() {
543             return parent().localAddress();
544         }
545 
546         @Override
547         public SocketAddress remoteAddress() {
548             return parent().remoteAddress();
549         }
550 
551         @Override
552         public ChannelFuture closeFuture() {
553             return closePromise;
554         }
555 
556         @Override
557         public long bytesBeforeUnwritable() {
558             // TODO: Do a proper impl
559             return config().getWriteBufferHighWaterMark();
560         }
561 
562         @Override
563         public long bytesBeforeWritable() {
564             // TODO: Do a proper impl
565             return 0;
566         }
567 
568         @Override
569         public Unsafe unsafe() {
570             return unsafe;
571         }
572 
573         @Override
574         public ChannelPipeline pipeline() {
575             return pipeline;
576         }
577 
578         @Override
579         public ByteBufAllocator alloc() {
580             return config().getAllocator();
581         }
582 
583         @Override
584         public Channel read() {
585             pipeline().read();
586             return this;
587         }
588 
589         @Override
590         public Channel flush() {
591             pipeline().flush();
592             return this;
593         }
594 
595         @Override
596         public ChannelFuture bind(SocketAddress localAddress) {
597             return pipeline().bind(localAddress);
598         }
599 
600         @Override
601         public ChannelFuture connect(SocketAddress remoteAddress) {
602             return pipeline().connect(remoteAddress);
603         }
604 
605         @Override
606         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
607             return pipeline().connect(remoteAddress, localAddress);
608         }
609 
610         @Override
611         public ChannelFuture disconnect() {
612             return pipeline().disconnect();
613         }
614 
615         @Override
616         public ChannelFuture close() {
617             return pipeline().close();
618         }
619 
620         @Override
621         public ChannelFuture deregister() {
622             return pipeline().deregister();
623         }
624 
625         @Override
626         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
627             return pipeline().bind(localAddress, promise);
628         }
629 
630         @Override
631         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
632             return pipeline().connect(remoteAddress, promise);
633         }
634 
635         @Override
636         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
637             return pipeline().connect(remoteAddress, localAddress, promise);
638         }
639 
640         @Override
641         public ChannelFuture disconnect(ChannelPromise promise) {
642             return pipeline().disconnect(promise);
643         }
644 
645         @Override
646         public ChannelFuture close(ChannelPromise promise) {
647             return pipeline().close(promise);
648         }
649 
650         @Override
651         public ChannelFuture deregister(ChannelPromise promise) {
652             return pipeline().deregister(promise);
653         }
654 
655         @Override
656         public ChannelFuture write(Object msg) {
657             return pipeline().write(msg);
658         }
659 
660         @Override
661         public ChannelFuture write(Object msg, ChannelPromise promise) {
662             return pipeline().write(msg, promise);
663         }
664 
665         @Override
666         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
667             return pipeline().writeAndFlush(msg, promise);
668         }
669 
670         @Override
671         public ChannelFuture writeAndFlush(Object msg) {
672             return pipeline().writeAndFlush(msg);
673         }
674 
675         @Override
676         public ChannelPromise newPromise() {
677             return pipeline().newPromise();
678         }
679 
680         @Override
681         public ChannelProgressivePromise newProgressivePromise() {
682             return pipeline().newProgressivePromise();
683         }
684 
685         @Override
686         public ChannelFuture newSucceededFuture() {
687             return pipeline().newSucceededFuture();
688         }
689 
690         @Override
691         public ChannelFuture newFailedFuture(Throwable cause) {
692             return pipeline().newFailedFuture(cause);
693         }
694 
695         @Override
696         public ChannelPromise voidPromise() {
697             return pipeline().voidPromise();
698         }
699 
700         @Override
701         public int hashCode() {
702             return id().hashCode();
703         }
704 
705         @Override
706         public boolean equals(Object o) {
707             return this == o;
708         }
709 
710         @Override
711         public int compareTo(Channel o) {
712             if (this == o) {
713                 return 0;
714             }
715 
716             return id().compareTo(o.id());
717         }
718 
719         @Override
720         public String toString() {
721             return parent().toString() + "(H2 - " + stream + ')';
722         }
723 
724         void writabilityChanged(boolean writable) {
725             assert eventLoop().inEventLoop();
726             if (writable != this.writable && isActive()) {
727                 // Only notify if we received a state change.
728                 this.writable = writable;
729                 pipeline().fireChannelWritabilityChanged();
730             }
731         }
732 
733         /**
734          * Receive a read message. This does not notify handlers unless a read is in progress on the
735          * channel.
736          */
737         ReadState fireChildRead(Http2Frame frame) {
738             assert eventLoop().inEventLoop();
739             if (!isActive()) {
740                 ReferenceCountUtil.release(frame);
741                 return ReadState.READ_IGNORED_CHANNEL_INACTIVE;
742             }
743             if (readInProgress && (inboundBuffer == null || inboundBuffer.isEmpty())) {
744                 // Check for null because inboundBuffer doesn't support null; we want to be consistent
745                 // for what values are supported.
746                 RecvByteBufAllocator.ExtendedHandle allocHandle = unsafe.recvBufAllocHandle();
747                 unsafe.doRead0(frame, allocHandle);
748                 return allocHandle.continueReading() ?
749                         ReadState.READ_PROCESSED_OK_TO_PROCESS_MORE : ReadState.READ_PROCESSED_BUT_STOP_READING;
750             } else {
751                 if (inboundBuffer == null) {
752                     inboundBuffer = new ArrayDeque<Object>(4);
753                 }
754                 inboundBuffer.add(frame);
755                 return ReadState.READ_QUEUED;
756             }
757         }
758 
759         void fireChildReadComplete() {
760             assert eventLoop().inEventLoop();
761             try {
762                 if (readInProgress) {
763                     inFireChannelReadComplete = true;
764                     readInProgress = false;
765                     unsafe().recvBufAllocHandle().readComplete();
766                     pipeline().fireChannelReadComplete();
767                 }
768                 flushNeeded |= flushPending;
769             } finally {
770                 inFireChannelReadComplete = false;
771                 flushPending = false;
772             }
773         }
774 
775         private final class Http2ChannelUnsafe implements Unsafe {
776             private final VoidChannelPromise unsafeVoidPromise =
777                     new VoidChannelPromise(DefaultHttp2StreamChannel.this, false);
778             @SuppressWarnings("deprecation")
779             private RecvByteBufAllocator.ExtendedHandle recvHandle;
780             private boolean writeDoneAndNoFlush;
781             private ChannelPromise pendingClosePromise;
782 
783             @Override
784             public void connect(final SocketAddress remoteAddress,
785                                 SocketAddress localAddress, final ChannelPromise promise) {
786                 if (!promise.setUncancellable()) {
787                     return;
788                 }
789                 promise.setFailure(new UnsupportedOperationException());
790             }
791 
792             @Override
793             public RecvByteBufAllocator.ExtendedHandle recvBufAllocHandle() {
794                 if (recvHandle == null) {
795                     recvHandle = (RecvByteBufAllocator.ExtendedHandle) config().getRecvByteBufAllocator().newHandle();
796                 }
797                 return recvHandle;
798             }
799 
800             @Override
801             public SocketAddress localAddress() {
802                 return parent().unsafe().localAddress();
803             }
804 
805             @Override
806             public SocketAddress remoteAddress() {
807                 return parent().unsafe().remoteAddress();
808             }
809 
810             @Override
811             public void register(EventLoop eventLoop, ChannelPromise promise) {
812                 if (!promise.setUncancellable()) {
813                     return;
814                 }
815                 if (registered) {
816                     throw new UnsupportedOperationException("Re-register is not supported");
817                 }
818 
819                 registered = true;
820 
821                 if (!outbound) {
822                     // Add the handler to the pipeline now that we are registered.
823                     pipeline().addLast(inboundStreamHandler);
824                 }
825 
826                 promise.setSuccess();
827 
828                 pipeline().fireChannelRegistered();
829                 if (isActive()) {
830                     pipeline().fireChannelActive();
831                 }
832             }
833 
834             @Override
835             public void bind(SocketAddress localAddress, ChannelPromise promise) {
836                 if (!promise.setUncancellable()) {
837                     return;
838                 }
839                 promise.setFailure(new UnsupportedOperationException());
840             }
841 
842             @Override
843             public void disconnect(ChannelPromise promise) {
844                 if (!promise.setUncancellable()) {
845                     return;
846                 }
847                 close(promise);
848             }
849 
850             @Override
851             public void close(ChannelPromise promise) {
852                 if (!promise.setUncancellable()) {
853                     return;
854                 }
855                 if (closePromise.isDone()) {
856                     promise.setFailure(new ClosedChannelException());
857                     return;
858                 }
859                 if (pendingClosePromise != null) {
860                     pendingClosePromise.addListener(new DelegatingChannelPromiseNotifier(promise));
861                     return;
862                 }
863                 pendingClosePromise = promise;
864                 try {
865                     closePending = false;
866                     fireChannelReadPending = false;
867 
868                     // Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at
869                     // all anyway.
870                     if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) {
871                         Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
872                         write(resetFrame, unsafe().voidPromise());
873                         flush();
874                     }
875 
876                     if (inboundBuffer != null) {
877                         for (;;) {
878                             Object msg = inboundBuffer.poll();
879                             if (msg == null) {
880                                 break;
881                             }
882                             ReferenceCountUtil.release(msg);
883                         }
884                     }
885 
886                     pipeline().fireChannelInactive();
887                     if (isRegistered()) {
888                         deregister(unsafe().voidPromise());
889                     }
890                     promise.setSuccess();
891                     closePromise.setSuccess();
892                 } finally {
893                     pendingClosePromise = null;
894                 }
895             }
896 
897             @Override
898             public void closeForcibly() {
899                 close(unsafe().voidPromise());
900             }
901 
902             @Override
903             public void deregister(ChannelPromise promise) {
904                 if (!promise.setUncancellable()) {
905                     return;
906                 }
907                 if (registered) {
908                     registered = true;
909                     promise.setSuccess();
910                     pipeline().fireChannelUnregistered();
911                 } else {
912                     promise.setFailure(new IllegalStateException("Not registered"));
913                 }
914             }
915 
916             @Override
917             public void beginRead() {
918                 if (readInProgress || !isActive()) {
919                     return;
920                 }
921                 readInProgress = true;
922 
923                 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
924                 allocHandle.reset(config());
925                 if (inboundBuffer == null || inboundBuffer.isEmpty()) {
926                     if (closePending) {
927                         unsafe.closeForcibly();
928                     }
929                     return;
930                 }
931 
932                 // We have already checked that the queue is not empty, so before this value is used it will always be
933                 // set by allocHandle.continueReading().
934                 boolean continueReading;
935                 do {
936                     Object m = inboundBuffer.poll();
937                     if (m == null) {
938                         continueReading = false;
939                         break;
940                     }
941                     doRead0((Http2Frame) m, allocHandle);
942                 } while (continueReading = allocHandle.continueReading());
943 
944                 if (continueReading && parentReadInProgress) {
945                     // We don't know if more frames will be delivered in the parent channel's read loop, so add this
946                     // channel to the channelReadComplete queue to be notified later.
947                     addChildChannelToReadPendingQueue(DefaultHttp2StreamChannel.this);
948                 } else {
949                     // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
950                     // channel is not currently reading we need to force a flush at the child channel, because we cannot
951                     // rely upon flush occurring in channelReadComplete on the parent channel.
952                     readInProgress = false;
953                     allocHandle.readComplete();
954                     pipeline().fireChannelReadComplete();
955                     flush();
956                     if (closePending) {
957                         unsafe.closeForcibly();
958                     }
959                 }
960             }
961 
962             @SuppressWarnings("deprecation")
963             void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
964                 int numBytesToBeConsumed = 0;
965                 if (frame instanceof Http2DataFrame) {
966                     numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
967                     allocHandle.lastBytesRead(numBytesToBeConsumed);
968                 } else {
969                     allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
970                 }
971                 allocHandle.incMessagesRead(1);
972                 pipeline().fireChannelRead(frame);
973 
974                 if (numBytesToBeConsumed != 0) {
975                     try {
976                         writeDoneAndNoFlush |= onBytesConsumed(ctx, stream, numBytesToBeConsumed);
977                     } catch (Http2Exception e) {
978                         pipeline().fireExceptionCaught(e);
979                     }
980                 }
981             }
982 
983             @Override
984             public void write(Object msg, final ChannelPromise promise) {
985                 // After this point its not possible to cancel a write anymore.
986                 if (!promise.setUncancellable()) {
987                     ReferenceCountUtil.release(msg);
988                     return;
989                 }
990 
991                 if (!isActive()) {
992                     ReferenceCountUtil.release(msg);
993                     promise.setFailure(CLOSED_CHANNEL_EXCEPTION);
994                     return;
995                 }
996 
997                 try {
998                     if (msg instanceof Http2StreamFrame) {
999                         Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1000                         if (!firstFrameWritten && !isStreamIdValid(stream().id())) {
1001                             if (!(frame instanceof Http2HeadersFrame)) {
1002                                 ReferenceCountUtil.release(frame);
1003                                 promise.setFailure(
1004                                         new IllegalArgumentException("The first frame must be a headers frame. Was: "
1005                                         + frame.name()));
1006                                 return;
1007                             }
1008                             firstFrameWritten = true;
1009                             ChannelFuture future = write0(frame);
1010                             if (future.isDone()) {
1011                                 firstWriteComplete(future, promise);
1012                             } else {
1013                                 future.addListener(new ChannelFutureListener() {
1014                                     @Override
1015                                     public void operationComplete(ChannelFuture future) throws Exception {
1016                                         firstWriteComplete(future, promise);
1017                                     }
1018                                 });
1019                             }
1020                             return;
1021                         }
1022                     } else  {
1023                         String msgStr = msg.toString();
1024                         ReferenceCountUtil.release(msg);
1025                         promise.setFailure(new IllegalArgumentException(
1026                                 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1027                                         ": " + msgStr));
1028                         return;
1029                     }
1030 
1031                     ChannelFuture future = write0(msg);
1032                     if (future.isDone()) {
1033                         writeComplete(future, promise);
1034                     } else {
1035                         future.addListener(new ChannelFutureListener() {
1036                             @Override
1037                             public void operationComplete(ChannelFuture future) throws Exception {
1038                                 writeComplete(future, promise);
1039                             }
1040                         });
1041                     }
1042                 } catch (Throwable t) {
1043                     promise.tryFailure(t);
1044                 } finally {
1045                     writeDoneAndNoFlush = true;
1046                 }
1047             }
1048 
1049             private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1050                 Throwable cause = future.cause();
1051                 if (cause == null) {
1052                     // As we just finished our first write which made the stream-id valid we need to re-evaluate
1053                     // the writability of the channel.
1054                     writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
1055                     promise.setSuccess();
1056                 } else {
1057                     promise.setFailure(cause);
1058                     closeForcibly();
1059                 }
1060             }
1061 
1062             private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1063                 Throwable cause = future.cause();
1064                 if (cause == null) {
1065                     promise.setSuccess();
1066                 } else {
1067                     promise.setFailure(cause);
1068                 }
1069             }
1070 
1071             private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1072                 if (frame.stream() != null && frame.stream() != stream) {
1073                     String msgString = frame.toString();
1074                     ReferenceCountUtil.release(frame);
1075                     throw new IllegalArgumentException(
1076                             "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1077                 }
1078                 return frame;
1079             }
1080 
1081             private ChannelFuture write0(Object msg) {
1082                 ChannelPromise promise = ctx.newPromise();
1083                 Http2MultiplexCodec.this.write(ctx, msg, promise);
1084                 return promise;
1085             }
1086 
1087             @Override
1088             public void flush() {
1089                 if (!writeDoneAndNoFlush) {
1090                     // There is nothing to flush so this is a NOOP.
1091                     return;
1092                 }
1093                 try {
1094                     // If we are current channelReadComplete(...) call we should just mark this Channel with a flush
1095                     // pending. We will ensure we trigger ctx.flush() after we processed all Channels later on and
1096                     // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1097                     // write(...) or writev(...) operation on the socket.
1098                     if (inFireChannelReadComplete) {
1099                         flushPending = true;
1100                     } else {
1101                         flush0(ctx);
1102                     }
1103                 } finally {
1104                     writeDoneAndNoFlush = false;
1105                 }
1106             }
1107 
1108             @Override
1109             public ChannelPromise voidPromise() {
1110                 return unsafeVoidPromise;
1111             }
1112 
1113             @Override
1114             public ChannelOutboundBuffer outboundBuffer() {
1115                 // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1116                 return null;
1117             }
1118         }
1119 
1120         /**
1121          * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1122          * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1123          * changes.
1124          */
1125         private final class Http2StreamChannelConfig extends DefaultChannelConfig {
1126 
1127             Http2StreamChannelConfig(Channel channel) {
1128                 super(channel);
1129                 setRecvByteBufAllocator(new Http2StreamChannelRecvByteBufAllocator());
1130             }
1131 
1132             @Override
1133             public int getWriteBufferHighWaterMark() {
1134                 return min(parent().config().getWriteBufferHighWaterMark(), initialOutboundStreamWindow);
1135             }
1136 
1137             @Override
1138             public int getWriteBufferLowWaterMark() {
1139                 return min(parent().config().getWriteBufferLowWaterMark(), initialOutboundStreamWindow);
1140             }
1141 
1142             @Override
1143             public MessageSizeEstimator getMessageSizeEstimator() {
1144                 return FlowControlledFrameSizeEstimator.INSTANCE;
1145             }
1146 
1147             @Override
1148             public WriteBufferWaterMark getWriteBufferWaterMark() {
1149                 int mark = getWriteBufferHighWaterMark();
1150                 return new WriteBufferWaterMark(mark, mark);
1151             }
1152 
1153             @Override
1154             public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1155                 throw new UnsupportedOperationException();
1156             }
1157 
1158             @Override
1159             @Deprecated
1160             public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
1161                 throw new UnsupportedOperationException();
1162             }
1163 
1164             @Override
1165             @Deprecated
1166             public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
1167                 throw new UnsupportedOperationException();
1168             }
1169 
1170             @Override
1171             public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1172                 throw new UnsupportedOperationException();
1173             }
1174 
1175             @Override
1176             public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1177                 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1178                     throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1179                             RecvByteBufAllocator.ExtendedHandle.class);
1180                 }
1181                 super.setRecvByteBufAllocator(allocator);
1182                 return this;
1183             }
1184         }
1185     }
1186 }