1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.EventLoop;
26 import io.netty.util.ReferenceCounted;
27
28 import io.netty.util.internal.UnstableApi;
29
30 import java.util.ArrayDeque;
31 import java.util.Queue;
32
33 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
34 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
35 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @Deprecated
85 @UnstableApi
86 public class Http2MultiplexCodec extends Http2FrameCodec {
87
88 private final ChannelHandler inboundStreamHandler;
89 private final ChannelHandler upgradeStreamHandler;
90 private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
91 new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
92
93 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
94
95 private boolean parentReadInProgress;
96 private int idCount;
97
98
99 volatile ChannelHandlerContext ctx;
100
101 Http2MultiplexCodec(Http2ConnectionEncoder encoder,
102 Http2ConnectionDecoder decoder,
103 Http2Settings initialSettings,
104 ChannelHandler inboundStreamHandler,
105 ChannelHandler upgradeStreamHandler, boolean decoupleCloseAndGoAway, boolean flushPreface) {
106 super(encoder, decoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
107 this.inboundStreamHandler = inboundStreamHandler;
108 this.upgradeStreamHandler = upgradeStreamHandler;
109 }
110
111 @Override
112 public void onHttpClientUpgrade() throws Http2Exception {
113
114 if (upgradeStreamHandler == null) {
115 throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
116 }
117
118 super.onHttpClientUpgrade();
119 }
120
121 @Override
122 public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
123 if (ctx.executor() != ctx.channel().eventLoop()) {
124 throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
125 }
126 this.ctx = ctx;
127 }
128
129 @Override
130 public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
131 super.handlerRemoved0(ctx);
132
133 readCompletePendingQueue.clear();
134 }
135
136 @Override
137 final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
138 if (frame instanceof Http2StreamFrame) {
139 Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
140 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
141 ((DefaultHttp2FrameStream) streamFrame.stream()).attachment;
142 channel.fireChildRead(streamFrame);
143 return;
144 }
145 if (frame instanceof Http2GoAwayFrame) {
146 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
147 }
148
149 ctx.fireChannelRead(frame);
150 }
151
152 @Override
153 final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
154 switch (stream.state()) {
155 case HALF_CLOSED_LOCAL:
156 if (stream.id() != HTTP_UPGRADE_STREAM_ID) {
157
158 break;
159 }
160
161 case HALF_CLOSED_REMOTE:
162
163 case OPEN:
164 if (stream.attachment != null) {
165
166 break;
167 }
168 final Http2MultiplexCodecStreamChannel streamChannel;
169
170 if (stream.id() == HTTP_UPGRADE_STREAM_ID && !connection().isServer()) {
171
172
173 assert upgradeStreamHandler != null;
174 streamChannel = new Http2MultiplexCodecStreamChannel(stream, upgradeStreamHandler);
175 streamChannel.closeOutbound();
176 } else {
177 streamChannel = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler);
178 }
179 ChannelFuture future = ctx.channel().eventLoop().register(streamChannel);
180 if (future.isDone()) {
181 Http2MultiplexHandler.registerDone(future);
182 } else {
183 future.addListener(Http2MultiplexHandler.CHILD_CHANNEL_REGISTRATION_LISTENER);
184 }
185 break;
186 case CLOSED:
187 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
188 if (channel != null) {
189 channel.streamClosed();
190 }
191 break;
192 default:
193
194 break;
195 }
196 }
197
198
199 final Http2StreamChannel newOutboundStream() {
200 return new Http2MultiplexCodecStreamChannel(newStream(), null);
201 }
202
203 @Override
204 final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
205 Http2FrameStream stream = cause.stream();
206 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) ((DefaultHttp2FrameStream) stream).attachment;
207
208 try {
209 channel.pipeline().fireExceptionCaught(cause.getCause());
210 } finally {
211 channel.unsafe().closeForcibly();
212 }
213 }
214
215 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
216 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
217
218 return;
219 }
220
221 try {
222 forEachActiveStream(new Http2FrameStreamVisitor() {
223 @Override
224 public boolean visit(Http2FrameStream stream) {
225 final int streamId = stream.id();
226 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
227 ((DefaultHttp2FrameStream) stream).attachment;
228 if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
229 channel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
230 }
231 return true;
232 }
233 });
234 } catch (Http2Exception e) {
235 ctx.fireExceptionCaught(e);
236 ctx.close();
237 }
238 }
239
240
241
242
243 @Override
244 public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
245 processPendingReadCompleteQueue();
246 channelReadComplete0(ctx);
247 }
248
249 private void processPendingReadCompleteQueue() {
250 parentReadInProgress = true;
251 try {
252
253
254
255 for (;;) {
256 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
257 if (childChannel == null) {
258 break;
259 }
260 childChannel.fireChildReadComplete();
261 }
262 } finally {
263 parentReadInProgress = false;
264 readCompletePendingQueue.clear();
265
266 flush0(ctx);
267 }
268 }
269 @Override
270 public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
271 parentReadInProgress = true;
272 super.channelRead(ctx, msg);
273 }
274
275 @Override
276 public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
277 if (ctx.channel().isWritable()) {
278
279
280 forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
281 }
282
283 super.channelWritabilityChanged(ctx);
284 }
285
286 final void flush0(ChannelHandlerContext ctx) {
287 flush(ctx);
288 }
289
290 private final class Http2MultiplexCodecStreamChannel extends AbstractHttp2StreamChannel {
291
292 Http2MultiplexCodecStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
293 super(stream, ++idCount, inboundHandler);
294 }
295
296 @Override
297 protected boolean isParentReadInProgress() {
298 return parentReadInProgress;
299 }
300
301 @Override
302 protected void addChannelToReadCompletePendingQueue() {
303
304
305 while (!readCompletePendingQueue.offer(this)) {
306 processPendingReadCompleteQueue();
307 }
308 }
309
310 @Override
311 protected ChannelHandlerContext parentContext() {
312 return ctx;
313 }
314
315 @Override
316 protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
317 ChannelPromise promise = ctx.newPromise();
318 Http2MultiplexCodec.this.write(ctx, msg, promise);
319 return promise;
320 }
321
322 @Override
323 protected void flush0(ChannelHandlerContext ctx) {
324 Http2MultiplexCodec.this.flush0(ctx);
325 }
326 }
327 }