1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.ServerChannel;
28 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
29 import io.netty.channel.socket.ChannelOutputShutdownEvent;
30 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
31 import io.netty.handler.ssl.SslCloseCompletionEvent;
32 import io.netty.util.ReferenceCounted;
33 import io.netty.util.internal.ObjectUtil;
34
35 import java.util.ArrayDeque;
36 import java.util.Queue;
37 import javax.net.ssl.SSLException;
38
39 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
40 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
41 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
42 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
43 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
101
102 static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
103 @Override
104 public void operationComplete(ChannelFuture future) {
105 registerDone(future);
106 }
107 };
108
109 private final ChannelHandler inboundStreamHandler;
110 private final ChannelHandler upgradeStreamHandler;
111 private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
112 new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
113
114 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
115
116 private boolean parentReadInProgress;
117 private int idCount;
118
119
120 private volatile ChannelHandlerContext ctx;
121
122
123
124
125
126
127
128 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
129 this(inboundStreamHandler, null);
130 }
131
132
133
134
135
136
137
138
139
140 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
141 this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
142 this.upgradeStreamHandler = upgradeStreamHandler;
143 }
144
145 static void registerDone(ChannelFuture future) {
146
147
148
149 if (!future.isSuccess()) {
150 Channel childChannel = future.channel();
151 if (childChannel.isRegistered()) {
152 childChannel.close();
153 } else {
154 childChannel.unsafe().closeForcibly();
155 }
156 }
157 }
158
159 @Override
160 protected void handlerAdded0(ChannelHandlerContext ctx) {
161 if (ctx.executor() != ctx.channel().eventLoop()) {
162 throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
163 }
164 this.ctx = ctx;
165 }
166
167 @Override
168 protected void handlerRemoved0(ChannelHandlerContext ctx) {
169 readCompletePendingQueue.clear();
170 }
171
172 @Override
173 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
174 parentReadInProgress = true;
175 if (msg instanceof Http2StreamFrame) {
176 if (msg instanceof Http2WindowUpdateFrame) {
177
178 return;
179 }
180 Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
181 DefaultHttp2FrameStream s =
182 (DefaultHttp2FrameStream) streamFrame.stream();
183
184 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
185 if (msg instanceof Http2ResetFrame || msg instanceof Http2PriorityFrame) {
186
187
188 channel.pipeline().fireUserEventTriggered(msg);
189
190
191
192 } else {
193 channel.fireChildRead(streamFrame);
194 }
195 return;
196 }
197
198 if (msg instanceof Http2GoAwayFrame) {
199
200
201 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
202 }
203
204
205 ctx.fireChannelRead(msg);
206 }
207
208 @Override
209 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
210 if (ctx.channel().isWritable()) {
211
212
213 forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
214 }
215
216 ctx.fireChannelWritabilityChanged();
217 }
218
219 @Override
220 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
221 if (evt instanceof Http2FrameStreamEvent) {
222 Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
223 DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
224 if (event.type() == Http2FrameStreamEvent.Type.State) {
225 switch (stream.state()) {
226 case HALF_CLOSED_LOCAL:
227 if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
228
229 break;
230 }
231
232 case HALF_CLOSED_REMOTE:
233
234 case OPEN:
235 if (stream.attachment != null) {
236
237 break;
238 }
239 final AbstractHttp2StreamChannel ch;
240
241 if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
242
243 if (upgradeStreamHandler == null) {
244 throw connectionError(INTERNAL_ERROR,
245 "Client is misconfigured for upgrade requests");
246 }
247 ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
248 ch.closeOutbound();
249 } else {
250 ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
251 }
252 ChannelFuture future = ctx.channel().eventLoop().register(ch);
253 if (future.isDone()) {
254 registerDone(future);
255 } else {
256 future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
257 }
258 break;
259 case CLOSED:
260 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
261 if (channel != null) {
262 channel.streamClosed();
263 }
264 break;
265 default:
266
267 break;
268 }
269 }
270 return;
271 }
272 if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
273 forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
274 } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
275 forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
276 } else if (evt == SslCloseCompletionEvent.SUCCESS) {
277 forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
278 }
279 ctx.fireUserEventTriggered(evt);
280 }
281
282
283 Http2StreamChannel newOutboundStream() {
284 return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
285 }
286
287 @Override
288 public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
289 if (cause instanceof Http2FrameStreamException) {
290 Http2FrameStreamException exception = (Http2FrameStreamException) cause;
291 Http2FrameStream stream = exception.stream();
292 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
293 ((DefaultHttp2FrameStream) stream).attachment;
294 try {
295 childChannel.pipeline().fireExceptionCaught(cause.getCause());
296 } finally {
297
298
299 childChannel.closeWithError(exception.error());
300 }
301 return;
302 }
303 if (cause instanceof Http2MultiplexActiveStreamsException) {
304
305 fireExceptionCaughtForActiveStream(cause.getCause());
306 return;
307 }
308
309 if (cause.getCause() instanceof SSLException) {
310 fireExceptionCaughtForActiveStream(cause);
311 }
312 ctx.fireExceptionCaught(cause);
313 }
314
315 private void fireExceptionCaughtForActiveStream(final Throwable cause) throws Http2Exception {
316 forEachActiveStream(new Http2FrameStreamVisitor() {
317 @Override
318 public boolean visit(Http2FrameStream stream) {
319 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
320 ((DefaultHttp2FrameStream) stream).attachment;
321 childChannel.pipeline().fireExceptionCaught(cause);
322 return true;
323 }
324 });
325 }
326
327 private static boolean isServer(ChannelHandlerContext ctx) {
328 return ctx.channel().parent() instanceof ServerChannel;
329 }
330
331 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
332 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
333
334 return;
335 }
336
337 try {
338 final boolean server = isServer(ctx);
339 forEachActiveStream(new Http2FrameStreamVisitor() {
340 @Override
341 public boolean visit(Http2FrameStream stream) {
342 final int streamId = stream.id();
343 if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
344 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
345 ((DefaultHttp2FrameStream) stream).attachment;
346 childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
347 }
348 return true;
349 }
350 });
351 } catch (Http2Exception e) {
352 ctx.fireExceptionCaught(e);
353 ctx.close();
354 }
355 }
356
357
358
359
360 @Override
361 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
362 processPendingReadCompleteQueue();
363 ctx.fireChannelReadComplete();
364 }
365
366 private void processPendingReadCompleteQueue() {
367 parentReadInProgress = true;
368
369
370
371 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
372 if (childChannel != null) {
373 try {
374 do {
375 childChannel.fireChildReadComplete();
376 childChannel = readCompletePendingQueue.poll();
377 } while (childChannel != null);
378 } finally {
379 parentReadInProgress = false;
380 readCompletePendingQueue.clear();
381 ctx.flush();
382 }
383 } else {
384 parentReadInProgress = false;
385 }
386 }
387
388 private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
389
390 Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
391 super(stream, ++idCount, inboundHandler);
392 }
393
394 @Override
395 protected boolean isParentReadInProgress() {
396 return parentReadInProgress;
397 }
398
399 @Override
400 protected void addChannelToReadCompletePendingQueue() {
401
402
403 while (!readCompletePendingQueue.offer(this)) {
404 processPendingReadCompleteQueue();
405 }
406 }
407
408 @Override
409 protected ChannelHandlerContext parentContext() {
410 return ctx;
411 }
412 }
413 }