1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.codec.http2;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelHandler;
21 import io.netty5.channel.ChannelHandlerContext;
22 import io.netty5.channel.ChannelOption;
23 import io.netty5.channel.ChannelPipeline;
24 import io.netty5.channel.EventLoop;
25 import io.netty5.channel.ServerChannel;
26 import io.netty5.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
27 import io.netty5.util.Resource;
28 import io.netty5.util.concurrent.Future;
29 import io.netty5.util.concurrent.FutureContextListener;
30 import io.netty5.util.internal.UnstableApi;
31
32 import javax.net.ssl.SSLException;
33 import java.util.ArrayDeque;
34 import java.util.Objects;
35 import java.util.Queue;
36
37 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
38 import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @UnstableApi
85 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
86
87 private static final FutureContextListener<Channel, Void> CHILD_CHANNEL_REGISTRATION_LISTENER =
88 Http2MultiplexHandler::registerDone;
89
90 private final ChannelHandler inboundStreamHandler;
91 private final ChannelHandler upgradeStreamHandler;
92 private final Queue<DefaultHttp2StreamChannel> readCompletePendingQueue =
93 new MaxCapacityQueue<>(new ArrayDeque<>(8),
94
95 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
96
97 private boolean parentReadInProgress;
98 private int idCount;
99
100
101 private volatile ChannelHandlerContext ctx;
102
103
104
105
106
107
108
109 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
110 this(inboundStreamHandler, null);
111 }
112
113
114
115
116
117
118
119
120
121 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
122 this.inboundStreamHandler = Objects.requireNonNull(inboundStreamHandler, "inboundStreamHandler");
123 this.upgradeStreamHandler = upgradeStreamHandler;
124 }
125
126 private static void registerDone(Channel childChannel, Future<?> future) {
127
128
129
130 if (future.isFailed()) {
131 if (childChannel.isRegistered()) {
132 childChannel.close();
133 } else {
134 ((DefaultHttp2StreamChannel) childChannel).closeForcibly();
135 }
136 }
137 }
138
139 @Override
140 protected void handlerAdded0(ChannelHandlerContext ctx) {
141 if (ctx.executor().inEventLoop() != ctx.channel().executor().inEventLoop()) {
142 throw new IllegalStateException("EventExecutor must be on the same thread as the EventLoop of the Channel");
143 }
144 this.ctx = ctx;
145 }
146
147 @Override
148 protected void handlerRemoved0(ChannelHandlerContext ctx) {
149 readCompletePendingQueue.clear();
150 }
151
152 @Override
153 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
154 parentReadInProgress = true;
155 if (msg instanceof Http2StreamFrame) {
156 if (msg instanceof Http2WindowUpdateFrame) {
157
158 return;
159 }
160 Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
161 DefaultHttp2FrameStream s =
162 (DefaultHttp2FrameStream) streamFrame.stream();
163
164 DefaultHttp2StreamChannel channel = (DefaultHttp2StreamChannel) s.attachment;
165 if (msg instanceof Http2ResetFrame) {
166
167
168 channel.pipeline().fireChannelInboundEvent(msg);
169
170
171
172 } else {
173 channel.fireChildRead(streamFrame);
174 }
175 return;
176 }
177
178 if (msg instanceof Http2GoAwayFrame) {
179
180
181 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
182 }
183
184
185 ctx.fireChannelRead(msg);
186 }
187
188 @Override
189 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
190 if (ctx.channel().isWritable()) {
191
192
193 forEachActiveStream(DefaultHttp2StreamChannel.WRITABLE_VISITOR);
194 }
195
196 ctx.fireChannelWritabilityChanged();
197 }
198
199 @Override
200 public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) throws Exception {
201 if (evt instanceof Http2FrameStreamEvent) {
202 Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
203 DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
204 if (event.type() == Http2FrameStreamEvent.Type.State) {
205 switch (stream.state()) {
206 case HALF_CLOSED_LOCAL:
207
208 if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
209 break;
210 }
211
212 case HALF_CLOSED_REMOTE:
213
214 case OPEN:
215 createStreamChannelIfNeeded(stream);
216 break;
217 case CLOSED:
218 DefaultHttp2StreamChannel channel = (DefaultHttp2StreamChannel) stream.attachment;
219 if (channel != null) {
220 channel.streamClosed();
221 }
222 break;
223 default:
224
225 break;
226 }
227 }
228 return;
229 }
230 ctx.fireChannelInboundEvent(evt);
231 }
232
233 private void createStreamChannelIfNeeded(DefaultHttp2FrameStream stream)
234 throws Http2Exception {
235 if (stream.attachment != null) {
236
237 return;
238 }
239 final DefaultHttp2StreamChannel ch;
240
241 if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
242
243 if (upgradeStreamHandler == null) {
244 throw connectionError(INTERNAL_ERROR,
245 "Client is misconfigured for upgrade requests");
246 }
247 ch = new DefaultHttp2StreamChannel(this, stream, ++idCount, upgradeStreamHandler);
248 ch.closeOutbound();
249 } else {
250 ch = new DefaultHttp2StreamChannel(this, stream, ++idCount, inboundStreamHandler);
251 }
252 Future<Void> future = ch.register();
253 if (future.isDone()) {
254 registerDone(ch, future);
255 } else {
256 future.addListener(ch, CHILD_CHANNEL_REGISTRATION_LISTENER);
257 }
258 }
259
260
261 Http2StreamChannel newOutboundStream() {
262 return new DefaultHttp2StreamChannel(this, (DefaultHttp2FrameStream) newStream(), ++idCount, null);
263 }
264
265 @Override
266 public void channelExceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
267 if (cause instanceof Http2FrameStreamException) {
268 Http2FrameStreamException exception = (Http2FrameStreamException) cause;
269 Http2FrameStream stream = exception.stream();
270 DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
271 ((DefaultHttp2FrameStream) stream).attachment;
272 try {
273 childChannel.pipeline().fireChannelExceptionCaught(cause.getCause());
274 } finally {
275 childChannel.closeForcibly();
276 }
277 return;
278 }
279 if (cause.getCause() instanceof SSLException) {
280 forEachActiveStream(stream -> {
281 DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
282 ((DefaultHttp2FrameStream) stream).attachment;
283 childChannel.pipeline().fireChannelExceptionCaught(cause);
284 return true;
285 });
286 }
287 ctx.fireChannelExceptionCaught(cause);
288 }
289
290 private static boolean isServer(ChannelHandlerContext ctx) {
291 return ctx.channel().parent() instanceof ServerChannel;
292 }
293
294 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
295 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
296
297 return;
298 }
299
300 try {
301 final boolean server = isServer(ctx);
302 forEachActiveStream(stream -> {
303 final int streamId = stream.id();
304 if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
305 final DefaultHttp2StreamChannel childChannel = (DefaultHttp2StreamChannel)
306 ((DefaultHttp2FrameStream) stream).attachment;
307 childChannel.pipeline().fireChannelInboundEvent(goAwayFrame.copy());
308 }
309 return true;
310 });
311 } catch (Http2Exception e) {
312 ctx.fireChannelExceptionCaught(e);
313 ctx.close();
314 }
315 }
316
317
318
319
320 @Override
321 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
322 processPendingReadCompleteQueue();
323 ctx.fireChannelReadComplete();
324 }
325
326 private void processPendingReadCompleteQueue() {
327 parentReadInProgress = true;
328
329
330
331 DefaultHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
332 if (childChannel != null) {
333 try {
334 do {
335 childChannel.fireChildReadComplete();
336 childChannel = readCompletePendingQueue.poll();
337 } while (childChannel != null);
338 } finally {
339 parentReadInProgress = false;
340 readCompletePendingQueue.clear();
341 ctx.flush();
342 }
343 } else {
344 parentReadInProgress = false;
345 }
346 }
347
348 boolean isParentReadInProgress() {
349 return parentReadInProgress;
350 }
351
352 void addChannelToReadCompletePendingQueue(DefaultHttp2StreamChannel channel) {
353
354
355 while (!readCompletePendingQueue.offer(channel)) {
356 processPendingReadCompleteQueue();
357 }
358 }
359
360 ChannelHandlerContext parentContext() {
361 return ctx;
362 }
363 }