1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec.http2;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.channel.Channel;
19 import io.netty5.channel.ChannelHandlerContext;
20 import io.netty5.channel.CoalescingBufferQueue;
21 import io.netty5.handler.codec.http.HttpStatusClass;
22 import io.netty5.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
23 import io.netty5.util.concurrent.Future;
24 import io.netty5.util.concurrent.FutureListener;
25 import io.netty5.util.concurrent.Promise;
26 import io.netty5.util.internal.UnstableApi;
27
28 import java.util.ArrayDeque;
29 import java.util.Queue;
30
31 import static io.netty5.handler.codec.http.HttpStatusClass.INFORMATIONAL;
32 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
33 import static io.netty5.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
34 import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
35 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
36 import static java.lang.Integer.MAX_VALUE;
37 import static java.lang.Math.min;
38 import static java.util.Objects.requireNonNull;
39
40
41
42
43 @UnstableApi
44 public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
45 private final Http2FrameWriter frameWriter;
46 private final Http2Connection connection;
47 private Http2LifecycleManager lifecycleManager;
48
49
50 private final Queue<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<>(4);
51 private Queue<Http2Settings> outstandingRemoteSettingsQueue;
52
53 public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
54 this.connection = requireNonNull(connection, "connection");
55 this.frameWriter = requireNonNull(frameWriter, "frameWriter");
56 if (connection.remote().flowController() == null) {
57 connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
58 }
59 }
60
61 @Override
62 public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
63 this.lifecycleManager = requireNonNull(lifecycleManager, "lifecycleManager");
64 }
65
66 @Override
67 public Http2FrameWriter frameWriter() {
68 return frameWriter;
69 }
70
71 @Override
72 public Http2Connection connection() {
73 return connection;
74 }
75
76 @Override
77 public final Http2RemoteFlowController flowController() {
78 return connection().remote().flowController();
79 }
80
81 @Override
82 public void remoteSettings(Http2Settings settings) throws Http2Exception {
83 Boolean pushEnabled = settings.pushEnabled();
84 Http2FrameWriter.Configuration config = configuration();
85 Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
86 Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
87 if (pushEnabled != null) {
88 if (!connection.isServer() && pushEnabled) {
89 throw connectionError(PROTOCOL_ERROR,
90 "Client received a value of ENABLE_PUSH specified to other than 0");
91 }
92 connection.remote().allowPushTo(pushEnabled);
93 }
94
95 Long maxConcurrentStreams = settings.maxConcurrentStreams();
96 if (maxConcurrentStreams != null) {
97 connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
98 }
99
100 Long headerTableSize = settings.headerTableSize();
101 if (headerTableSize != null) {
102 outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE));
103 }
104
105 Long maxHeaderListSize = settings.maxHeaderListSize();
106 if (maxHeaderListSize != null) {
107 outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
108 }
109
110 Integer maxFrameSize = settings.maxFrameSize();
111 if (maxFrameSize != null) {
112 outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
113 }
114
115 Integer initialWindowSize = settings.initialWindowSize();
116 if (initialWindowSize != null) {
117 flowController().initialWindowSize(initialWindowSize);
118 }
119 }
120
121 @Override
122 public Future<Void> writeData(final ChannelHandlerContext ctx, final int streamId, Buffer data, int padding,
123 final boolean endOfStream) {
124 final Http2Stream stream;
125 try {
126 stream = requireStream(streamId);
127
128
129 switch (stream.state()) {
130 case OPEN:
131 case HALF_CLOSED_REMOTE:
132
133 break;
134 default:
135 throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
136 }
137 } catch (Throwable e) {
138 data.close();
139 return ctx.newFailedFuture(e);
140 }
141
142 Promise<Void> promise = ctx.newPromise();
143
144 flowController().addFlowControlled(stream,
145 new FlowControlledData(stream, data, padding, endOfStream, promise, ctx.channel()));
146 return promise.asFuture();
147 }
148
149 @Override
150 public Future<Void> writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
151 boolean endStream) {
152 return writeHeaders0(ctx, streamId, headers, false, 0, (short) 0, false, padding, endStream);
153 }
154
155 private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
156 boolean endOfStream) {
157 boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
158 if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
159 throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
160 }
161 return isInformational;
162 }
163
164 @Override
165 public Future<Void> writeHeaders(final ChannelHandlerContext ctx, final int streamId,
166 final Http2Headers headers, final int streamDependency, final short weight,
167 final boolean exclusive, final int padding, final boolean endOfStream) {
168 return writeHeaders0(ctx, streamId, headers, true, streamDependency,
169 weight, exclusive, padding, endOfStream);
170 }
171
172
173
174
175
176 private static Future<Void> sendHeaders(Http2FrameWriter frameWriter, ChannelHandlerContext ctx, int streamId,
177 Http2Headers headers, final boolean hasPriority,
178 int streamDependency, final short weight,
179 boolean exclusive, final int padding,
180 boolean endOfStream) {
181 if (hasPriority) {
182 return frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
183 weight, exclusive, padding, endOfStream);
184 }
185 return frameWriter.writeHeaders(ctx, streamId, headers, padding, endOfStream);
186 }
187
188 private Future<Void> writeHeaders0(final ChannelHandlerContext ctx, final int streamId,
189 final Http2Headers headers, final boolean hasPriority,
190 final int streamDependency, final short weight,
191 final boolean exclusive, final int padding,
192 final boolean endOfStream) {
193 try {
194 Http2Stream stream = connection.stream(streamId);
195 if (stream == null) {
196 try {
197
198
199
200
201
202 stream = connection.local().createStream(streamId, false);
203 } catch (Http2Exception cause) {
204 if (connection.remote().mayHaveCreatedStream(streamId)) {
205 return ctx.newFailedFuture(
206 new IllegalStateException("Stream no longer exists: " + streamId, cause));
207 }
208 throw cause;
209 }
210 } else {
211 switch (stream.state()) {
212 case RESERVED_LOCAL:
213 stream.open(endOfStream);
214 break;
215 case OPEN:
216 case HALF_CLOSED_REMOTE:
217
218 break;
219 default:
220 throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
221 stream.state());
222 }
223 }
224
225
226
227 Http2RemoteFlowController flowController = flowController();
228 if (!endOfStream || !flowController.hasFlowControlled(stream)) {
229
230
231 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
232
233 Future<Void> future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
234 weight, exclusive, padding, endOfStream);
235
236
237
238 if (future.isSuccess() || !future.isDone()) {
239
240
241
242
243
244 stream.headersSent(isInformational);
245
246 if (!future.isSuccess()) {
247
248 notifyLifecycleManagerOnError(future, ctx);
249 }
250 } else {
251 Throwable failureCause = future.cause();
252 lifecycleManager.onError(ctx, true, failureCause);
253 }
254
255 if (endOfStream) {
256
257
258
259 lifecycleManager.closeStreamLocal(stream, future);
260 }
261
262 return future;
263 } else {
264 Promise<Void> promise = ctx.newPromise();
265
266 flowController.addFlowControlled(stream,
267 new FlowControlledHeaders(stream, headers, hasPriority, streamDependency,
268 weight, exclusive, padding, true, promise));
269 return promise.asFuture();
270 }
271 } catch (Throwable t) {
272 lifecycleManager.onError(ctx, true, t);
273 return ctx.newFailedFuture(t);
274 }
275 }
276
277 @Override
278 public Future<Void> writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
279 boolean exclusive) {
280 return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive);
281 }
282
283 @Override
284 public Future<Void> writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode) {
285
286 return lifecycleManager.resetStream(ctx, streamId, errorCode);
287 }
288
289 @Override
290 public Future<Void> writeSettings(ChannelHandlerContext ctx, Http2Settings settings) {
291 outstandingLocalSettingsQueue.add(settings);
292 try {
293 Boolean pushEnabled = settings.pushEnabled();
294 if (pushEnabled != null && connection.isServer()) {
295 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
296 }
297 } catch (Throwable e) {
298 return ctx.newFailedFuture(e);
299 }
300
301 return frameWriter.writeSettings(ctx, settings);
302 }
303
304 @Override
305 public Future<Void> writeSettingsAck(ChannelHandlerContext ctx) {
306 if (outstandingRemoteSettingsQueue == null) {
307 return frameWriter.writeSettingsAck(ctx);
308 }
309 Http2Settings settings = outstandingRemoteSettingsQueue.poll();
310 if (settings == null) {
311 return ctx.newFailedFuture(new Http2Exception(INTERNAL_ERROR, "attempted to write a SETTINGS ACK with no " +
312 " pending SETTINGS"));
313 }
314 SimpleChannelPromiseAggregator aggregator =
315 new SimpleChannelPromiseAggregator(ctx.newPromise(), ctx.executor());
316
317
318
319 frameWriter.writeSettingsAck(ctx).cascadeTo(aggregator.newPromise());
320
321
322
323 Promise<Void> applySettingsPromise = aggregator.newPromise();
324 try {
325 remoteSettings(settings);
326 applySettingsPromise.setSuccess(null);
327 } catch (Throwable e) {
328 applySettingsPromise.setFailure(e);
329 lifecycleManager.onError(ctx, true, e);
330 }
331 return aggregator.doneAllocatingPromises();
332 }
333
334 @Override
335 public Future<Void> writePing(ChannelHandlerContext ctx, boolean ack, long data) {
336 return frameWriter.writePing(ctx, ack, data);
337 }
338
339 @Override
340 public Future<Void> writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
341 Http2Headers headers, int padding) {
342 try {
343 if (connection.goAwayReceived()) {
344 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
345 }
346
347 Http2Stream stream = requireStream(streamId);
348
349 connection.local().reservePushStream(promisedStreamId, stream);
350
351 Future<Void> future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding);
352
353 if (future.isSuccess() || !future.isDone()) {
354
355
356 stream.pushPromiseSent();
357
358 if (!future.isSuccess()) {
359
360 notifyLifecycleManagerOnError(future, ctx);
361 }
362 } else {
363 Throwable failureCause = future.cause();
364 lifecycleManager.onError(ctx, true, failureCause);
365 }
366 return future;
367 } catch (Throwable t) {
368 lifecycleManager.onError(ctx, true, t);
369 return ctx.newFailedFuture(t);
370 }
371 }
372
373 @Override
374 public Future<Void> writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, Buffer debugData) {
375 return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData);
376 }
377
378 @Override
379 public Future<Void> writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
380 return ctx.newFailedFuture(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
381 " objects to control window sizes"));
382 }
383
384 @Override
385 public Future<Void> writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
386 Buffer payload) {
387 return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload);
388 }
389
390 @Override
391 public void close() {
392 frameWriter.close();
393 }
394
395 @Override
396 public Http2Settings pollSentSettings() {
397 return outstandingLocalSettingsQueue.poll();
398 }
399
400 @Override
401 public Configuration configuration() {
402 return frameWriter.configuration();
403 }
404
405 private Http2Stream requireStream(int streamId) {
406 Http2Stream stream = connection.stream(streamId);
407 if (stream == null) {
408 final String message;
409 if (connection.streamMayHaveExisted(streamId)) {
410 message = "Stream no longer exists: " + streamId;
411 } else {
412 message = "Stream does not exist: " + streamId;
413 }
414 throw new IllegalArgumentException(message);
415 }
416 return stream;
417 }
418
419 @Override
420 public void consumeReceivedSettings(Http2Settings settings) {
421 if (outstandingRemoteSettingsQueue == null) {
422 outstandingRemoteSettingsQueue = new ArrayDeque<>(2);
423 }
424 outstandingRemoteSettingsQueue.add(settings);
425 }
426
427
428
429
430
431
432
433
434
435
436 private final class FlowControlledData extends FlowControlledBase {
437 private final CoalescingBufferQueue queue;
438 private int dataSize;
439
440 FlowControlledData(Http2Stream stream, Buffer buf, int padding, boolean endOfStream,
441 Promise<Void> promise, Channel channel) {
442 super(stream, padding, endOfStream, promise);
443 queue = new CoalescingBufferQueue(channel);
444 queue.add(buf, promise);
445 dataSize = queue.readableBytes();
446 }
447
448 @Override
449 public int size() {
450 return dataSize + padding;
451 }
452
453 @Override
454 public void error(ChannelHandlerContext ctx, Throwable cause) {
455 queue.releaseAndFailAll(cause);
456
457
458
459
460
461 lifecycleManager.onError(ctx, true, cause);
462 }
463
464 @Override
465 public void write(ChannelHandlerContext ctx, int allowedBytes) {
466 int queuedData = queue.readableBytes();
467 if (!endOfStream) {
468 if (queuedData == 0) {
469 if (queue.isEmpty()) {
470
471
472
473
474
475
476 padding = dataSize = 0;
477 } else {
478
479
480
481 Promise<Void> writePromise = ctx.newPromise();
482 writePromise.asFuture().addListener(this);
483 ctx.write(queue.remove(0, writePromise)).cascadeTo(writePromise);
484 }
485 return;
486 }
487
488 if (allowedBytes == 0) {
489 return;
490 }
491 }
492
493
494 int writableData = min(queuedData, allowedBytes);
495 Promise<Void> writePromise = ctx.newPromise();
496 writePromise.asFuture().addListener(this);
497 Buffer toWrite = queue.remove(writableData, writePromise);
498 dataSize = queue.readableBytes();
499
500
501 int writablePadding = min(allowedBytes - writableData, padding);
502 padding -= writablePadding;
503
504
505 frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
506 endOfStream && size() == 0).cascadeTo(writePromise);
507 }
508
509 @Override
510 public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
511 FlowControlledData nextData;
512 if (FlowControlledData.class != next.getClass() ||
513 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
514 return false;
515 }
516 nextData.queue.copyTo(queue);
517 dataSize = queue.readableBytes();
518
519 padding = Math.max(padding, nextData.padding);
520 endOfStream = nextData.endOfStream;
521 return true;
522 }
523 }
524
525 private void notifyLifecycleManagerOnError(Future<Void> future, final ChannelHandlerContext ctx) {
526 future.addListener(future1 -> {
527 Throwable cause = future1.cause();
528 if (cause != null) {
529 lifecycleManager.onError(ctx, true, cause);
530 }
531 });
532 }
533
534
535
536
537
538
539 private final class FlowControlledHeaders extends FlowControlledBase {
540 private final Http2Headers headers;
541 private final boolean hasPriority;
542 private final int streamDependency;
543 private final short weight;
544 private final boolean exclusive;
545
546 FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
547 int streamDependency, short weight, boolean exclusive,
548 int padding, boolean endOfStream, Promise<Void> promise) {
549 super(stream, padding, endOfStream, promise);
550 this.headers = headers;
551 this.hasPriority = hasPriority;
552 this.streamDependency = streamDependency;
553 this.weight = weight;
554 this.exclusive = exclusive;
555 }
556
557 @Override
558 public int size() {
559 return 0;
560 }
561
562 @Override
563 public void error(ChannelHandlerContext ctx, Throwable cause) {
564 if (ctx != null) {
565 lifecycleManager.onError(ctx, true, cause);
566 }
567 promise.tryFailure(cause);
568 }
569
570 @Override
571 public void write(ChannelHandlerContext ctx, int allowedBytes) {
572 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
573
574
575 promise.asFuture().addListener(this);
576
577 Future<Void> f = sendHeaders(frameWriter, ctx, stream.id(), headers, hasPriority, streamDependency,
578 weight, exclusive, padding, endOfStream);
579 f.cascadeTo(promise);
580
581 if (!f.isFailed()) {
582
583
584 stream.headersSent(isInformational);
585 }
586 }
587
588 @Override
589 public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
590 return false;
591 }
592 }
593
594
595
596
597 public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled, FutureListener<Void> {
598 protected final Http2Stream stream;
599 protected Promise<Void> promise;
600 protected boolean endOfStream;
601 protected int padding;
602
603 FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
604 final Promise<Void> promise) {
605 checkPositiveOrZero(padding, "padding");
606 this.padding = padding;
607 this.endOfStream = endOfStream;
608 this.stream = stream;
609 this.promise = promise;
610 }
611
612 @Override
613 public void writeComplete() {
614 if (endOfStream) {
615 lifecycleManager.closeStreamLocal(stream, promise.asFuture());
616 }
617 }
618
619 @Override
620 public void operationComplete(Future<? extends Void> future) {
621 if (future.isFailed()) {
622 error(flowController().channelHandlerContext(), future.cause());
623 }
624 }
625 }
626 }