1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.EventLoop;
26 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
27 import io.netty.channel.socket.ChannelOutputShutdownEvent;
28 import io.netty.handler.ssl.SslCloseCompletionEvent;
29 import io.netty.util.ReferenceCounted;
30
31 import java.util.ArrayDeque;
32 import java.util.Queue;
33
34 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
35 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
36 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
37 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
38 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
39 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 @Deprecated
89 public class Http2MultiplexCodec extends Http2FrameCodec {
90
91 private final ChannelHandler inboundStreamHandler;
92 private final ChannelHandler upgradeStreamHandler;
93 private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
94 new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
95
96 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
97
98 private boolean parentReadInProgress;
99 private int idCount;
100
101
102 volatile ChannelHandlerContext ctx;
103
104 Http2MultiplexCodec(Http2ConnectionEncoder encoder,
105 Http2ConnectionDecoder decoder,
106 Http2Settings initialSettings,
107 ChannelHandler inboundStreamHandler,
108 ChannelHandler upgradeStreamHandler, boolean decoupleCloseAndGoAway, boolean flushPreface) {
109 super(encoder, decoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
110 this.inboundStreamHandler = inboundStreamHandler;
111 this.upgradeStreamHandler = upgradeStreamHandler;
112 }
113
114 @Override
115 public void onHttpClientUpgrade() throws Http2Exception {
116
117 if (upgradeStreamHandler == null) {
118 throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
119 }
120
121 super.onHttpClientUpgrade();
122 }
123
124 @Override
125 public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
126 if (ctx.executor() != ctx.channel().eventLoop()) {
127 throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
128 }
129 this.ctx = ctx;
130 }
131
132 @Override
133 public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
134 super.handlerRemoved0(ctx);
135
136 readCompletePendingQueue.clear();
137 }
138
139 @Override
140 final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
141 if (frame instanceof Http2StreamFrame) {
142 Http2StreamFrame msg = (Http2StreamFrame) frame;
143 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
144 ((DefaultHttp2FrameStream) msg.stream()).attachment;
145 channel.fireChildRead(msg);
146 return;
147 }
148 if (frame instanceof Http2GoAwayFrame) {
149 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
150 }
151
152 ctx.fireChannelRead(frame);
153 }
154
155 @Override
156 final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
157 switch (stream.state()) {
158 case HALF_CLOSED_LOCAL:
159 if (stream.id() != HTTP_UPGRADE_STREAM_ID) {
160
161 break;
162 }
163
164 case HALF_CLOSED_REMOTE:
165
166 case OPEN:
167 if (stream.attachment != null) {
168
169 break;
170 }
171 final Http2MultiplexCodecStreamChannel streamChannel;
172
173 if (stream.id() == HTTP_UPGRADE_STREAM_ID && !connection().isServer()) {
174
175
176 assert upgradeStreamHandler != null;
177 streamChannel = new Http2MultiplexCodecStreamChannel(stream, upgradeStreamHandler);
178 streamChannel.closeOutbound();
179 } else {
180 streamChannel = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler);
181 }
182 ChannelFuture future = ctx.channel().eventLoop().register(streamChannel);
183 if (future.isDone()) {
184 Http2MultiplexHandler.registerDone(future);
185 } else {
186 future.addListener(Http2MultiplexHandler.CHILD_CHANNEL_REGISTRATION_LISTENER);
187 }
188 break;
189 case CLOSED:
190 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
191 if (channel != null) {
192 channel.streamClosed();
193 }
194 break;
195 default:
196
197 break;
198 }
199 }
200
201
202 final Http2StreamChannel newOutboundStream() {
203 return new Http2MultiplexCodecStreamChannel(newStream(), null);
204 }
205
206 @Override
207 final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
208 Http2FrameStream stream = cause.stream();
209 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) ((DefaultHttp2FrameStream) stream).attachment;
210
211 try {
212 channel.pipeline().fireExceptionCaught(cause.getCause());
213 } finally {
214
215
216 channel.closeWithError(cause.error());
217 }
218 }
219
220 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
221 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
222
223 return;
224 }
225
226 try {
227 forEachActiveStream(new Http2FrameStreamVisitor() {
228 @Override
229 public boolean visit(Http2FrameStream stream) {
230 final int streamId = stream.id();
231 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
232 ((DefaultHttp2FrameStream) stream).attachment;
233 if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
234 channel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
235 }
236 return true;
237 }
238 });
239 } catch (Http2Exception e) {
240 ctx.fireExceptionCaught(e);
241 ctx.close();
242 }
243 }
244
245
246
247
248 @Override
249 public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
250 processPendingReadCompleteQueue();
251 channelReadComplete0(ctx);
252 }
253
254 private void processPendingReadCompleteQueue() {
255 parentReadInProgress = true;
256 try {
257
258
259
260 for (;;) {
261 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
262 if (childChannel == null) {
263 break;
264 }
265 childChannel.fireChildReadComplete();
266 }
267 } finally {
268 parentReadInProgress = false;
269 readCompletePendingQueue.clear();
270
271 flush0(ctx);
272 }
273 }
274 @Override
275 public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
276 parentReadInProgress = true;
277 super.channelRead(ctx, msg);
278 }
279
280 @Override
281 public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
282 if (ctx.channel().isWritable()) {
283
284
285 forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
286 }
287
288 super.channelWritabilityChanged(ctx);
289 }
290
291 @Override
292 final void onUserEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
293 if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
294 forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
295 } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
296 forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
297 } else if (evt == SslCloseCompletionEvent.SUCCESS) {
298 forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
299 }
300 super.onUserEventTriggered(ctx, evt);
301 }
302
303 final void flush0(ChannelHandlerContext ctx) {
304 flush(ctx);
305 }
306
307 private final class Http2MultiplexCodecStreamChannel extends AbstractHttp2StreamChannel {
308
309 Http2MultiplexCodecStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
310 super(stream, ++idCount, inboundHandler);
311 }
312
313 @Override
314 protected boolean isParentReadInProgress() {
315 return parentReadInProgress;
316 }
317
318 @Override
319 protected void addChannelToReadCompletePendingQueue() {
320
321
322 while (!readCompletePendingQueue.offer(this)) {
323 processPendingReadCompleteQueue();
324 }
325 }
326
327 @Override
328 protected ChannelHandlerContext parentContext() {
329 return ctx;
330 }
331
332 @Override
333 protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
334 ChannelPromise promise = ctx.newPromise();
335 Http2MultiplexCodec.this.write(ctx, msg, promise);
336 return promise;
337 }
338
339 @Override
340 protected void flush0(ChannelHandlerContext ctx) {
341 Http2MultiplexCodec.this.flush0(ctx);
342 }
343 }
344 }