1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.CoalescingBufferQueue;
23 import io.netty.handler.codec.http.HttpStatusClass;
24 import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
25
26 import java.util.ArrayDeque;
27 import java.util.Queue;
28
29 import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
30 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
31 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
32 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
33 import static io.netty.util.internal.ObjectUtil.checkNotNull;
34 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
35 import static java.lang.Integer.MAX_VALUE;
36 import static java.lang.Math.min;
37
38
39
40
41 public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
42 private final Http2FrameWriter frameWriter;
43 private final Http2Connection connection;
44 private Http2LifecycleManager lifecycleManager;
45
46
47 private final Queue<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
48 private Queue<Http2Settings> outstandingRemoteSettingsQueue;
49
50 public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
51 this.connection = checkNotNull(connection, "connection");
52 this.frameWriter = checkNotNull(frameWriter, "frameWriter");
53 if (connection.remote().flowController() == null) {
54 connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
55 }
56 }
57
58 @Override
59 public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
60 this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
61 }
62
63 @Override
64 public Http2FrameWriter frameWriter() {
65 return frameWriter;
66 }
67
68 @Override
69 public Http2Connection connection() {
70 return connection;
71 }
72
73 @Override
74 public final Http2RemoteFlowController flowController() {
75 return connection().remote().flowController();
76 }
77
78 @Override
79 public void remoteSettings(Http2Settings settings) throws Http2Exception {
80 Boolean pushEnabled = settings.pushEnabled();
81 Http2FrameWriter.Configuration config = configuration();
82 Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
83 Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
84 if (pushEnabled != null) {
85 if (!connection.isServer() && pushEnabled) {
86 throw connectionError(PROTOCOL_ERROR,
87 "Client received a value of ENABLE_PUSH specified to other than 0");
88 }
89 connection.remote().allowPushTo(pushEnabled);
90 }
91
92 Long maxConcurrentStreams = settings.maxConcurrentStreams();
93 if (maxConcurrentStreams != null) {
94 connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
95 }
96
97 Long headerTableSize = settings.headerTableSize();
98 if (headerTableSize != null) {
99 outboundHeaderConfig.maxHeaderTableSize(headerTableSize);
100 }
101
102 Long maxHeaderListSize = settings.maxHeaderListSize();
103 if (maxHeaderListSize != null) {
104 outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
105 }
106
107 Integer maxFrameSize = settings.maxFrameSize();
108 if (maxFrameSize != null) {
109 outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
110 }
111
112 Integer initialWindowSize = settings.initialWindowSize();
113 if (initialWindowSize != null) {
114 flowController().initialWindowSize(initialWindowSize);
115 }
116 }
117
118 @Override
119 public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
120 final boolean endOfStream, ChannelPromise promise) {
121 promise = promise.unvoid();
122 final Http2Stream stream;
123 try {
124 stream = requireStream(streamId);
125
126
127 switch (stream.state()) {
128 case OPEN:
129 case HALF_CLOSED_REMOTE:
130
131 break;
132 default:
133 throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
134 }
135 } catch (Throwable e) {
136 data.release();
137 return promise.setFailure(e);
138 }
139
140
141 flowController().addFlowControlled(stream,
142 new FlowControlledData(stream, data, padding, endOfStream, promise));
143 return promise;
144 }
145
146 @Override
147 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
148 boolean endStream, ChannelPromise promise) {
149 return writeHeaders0(ctx, streamId, headers, false, 0, (short) 0, false, padding, endStream, promise);
150 }
151
152 private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
153 boolean endOfStream) {
154 boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
155 if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
156 throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
157 }
158 return isInformational;
159 }
160
161 @Override
162 public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
163 final Http2Headers headers, final int streamDependency, final short weight,
164 final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
165 return writeHeaders0(ctx, streamId, headers, true, streamDependency,
166 weight, exclusive, padding, endOfStream, promise);
167 }
168
169
170
171
172
173 private static ChannelFuture sendHeaders(Http2FrameWriter frameWriter, ChannelHandlerContext ctx, int streamId,
174 Http2Headers headers, final boolean hasPriority,
175 int streamDependency, final short weight,
176 boolean exclusive, final int padding,
177 boolean endOfStream, ChannelPromise promise) {
178 if (hasPriority) {
179 return frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
180 weight, exclusive, padding, endOfStream, promise);
181 }
182 return frameWriter.writeHeaders(ctx, streamId, headers, padding, endOfStream, promise);
183 }
184
185 private ChannelFuture writeHeaders0(final ChannelHandlerContext ctx, final int streamId,
186 final Http2Headers headers, final boolean hasPriority,
187 final int streamDependency, final short weight,
188 final boolean exclusive, final int padding,
189 final boolean endOfStream, ChannelPromise promise) {
190 try {
191 Http2Stream stream = connection.stream(streamId);
192 if (stream == null) {
193 try {
194
195
196
197
198
199 stream = connection.local().createStream(streamId, false);
200 } catch (Http2Exception cause) {
201 if (connection.remote().mayHaveCreatedStream(streamId)) {
202 promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
203 return promise;
204 }
205 throw cause;
206 }
207 } else {
208 switch (stream.state()) {
209 case RESERVED_LOCAL:
210 stream.open(endOfStream);
211 break;
212 case OPEN:
213 case HALF_CLOSED_REMOTE:
214
215 break;
216 default:
217 throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
218 stream.state());
219 }
220 }
221
222
223
224 Http2RemoteFlowController flowController = flowController();
225 if (!endOfStream || !flowController.hasFlowControlled(stream)) {
226
227
228 promise = promise.unvoid();
229 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
230
231 ChannelFuture future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
232 weight, exclusive, padding, endOfStream, promise);
233
234
235 Throwable failureCause = future.cause();
236 if (failureCause == null) {
237
238
239
240
241
242 stream.headersSent(isInformational);
243
244 if (!future.isSuccess()) {
245
246 notifyLifecycleManagerOnError(future, ctx);
247 }
248 } else {
249 lifecycleManager.onError(ctx, true, failureCause);
250 }
251
252 if (endOfStream) {
253
254
255
256 lifecycleManager.closeStreamLocal(stream, future);
257 }
258
259 return future;
260 } else {
261
262 flowController.addFlowControlled(stream,
263 new FlowControlledHeaders(stream, headers, hasPriority, streamDependency,
264 weight, exclusive, padding, true, promise));
265 return promise;
266 }
267 } catch (Throwable t) {
268 lifecycleManager.onError(ctx, true, t);
269 promise.tryFailure(t);
270 return promise;
271 }
272 }
273
274 @Override
275 public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
276 boolean exclusive, ChannelPromise promise) {
277 return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
278 }
279
280 @Override
281 public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
282 ChannelPromise promise) {
283
284 return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
285 }
286
287 @Override
288 public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
289 ChannelPromise promise) {
290 outstandingLocalSettingsQueue.add(settings);
291 try {
292 Boolean pushEnabled = settings.pushEnabled();
293 if (pushEnabled != null && connection.isServer()) {
294 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
295 }
296 } catch (Throwable e) {
297 return promise.setFailure(e);
298 }
299
300 return frameWriter.writeSettings(ctx, settings, promise);
301 }
302
303 @Override
304 public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
305 if (outstandingRemoteSettingsQueue == null) {
306 return frameWriter.writeSettingsAck(ctx, promise);
307 }
308 Http2Settings settings = outstandingRemoteSettingsQueue.poll();
309 if (settings == null) {
310 return promise.setFailure(new Http2Exception(INTERNAL_ERROR, "attempted to write a SETTINGS ACK with no " +
311 " pending SETTINGS"));
312 }
313 SimpleChannelPromiseAggregator aggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(),
314 ctx.executor());
315
316
317
318 frameWriter.writeSettingsAck(ctx, aggregator.newPromise());
319
320
321
322 ChannelPromise applySettingsPromise = aggregator.newPromise();
323 try {
324 remoteSettings(settings);
325 applySettingsPromise.setSuccess();
326 } catch (Throwable e) {
327 applySettingsPromise.setFailure(e);
328 lifecycleManager.onError(ctx, true, e);
329 }
330 return aggregator.doneAllocatingPromises();
331 }
332
333 @Override
334 public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
335 return frameWriter.writePing(ctx, ack, data, promise);
336 }
337
338 @Override
339 public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
340 Http2Headers headers, int padding, ChannelPromise promise) {
341 try {
342 if (connection.goAwayReceived()) {
343 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
344 }
345
346 Http2Stream stream = requireStream(streamId);
347
348 connection.local().reservePushStream(promisedStreamId, stream);
349
350 promise = promise.unvoid();
351 ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
352 promise);
353
354 Throwable failureCause = future.cause();
355 if (failureCause == null) {
356
357
358 stream.pushPromiseSent();
359
360 if (!future.isSuccess()) {
361
362 notifyLifecycleManagerOnError(future, ctx);
363 }
364 } else {
365 lifecycleManager.onError(ctx, true, failureCause);
366 }
367 return future;
368 } catch (Throwable t) {
369 lifecycleManager.onError(ctx, true, t);
370 promise.tryFailure(t);
371 return promise;
372 }
373 }
374
375 @Override
376 public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
377 ChannelPromise promise) {
378 return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
379 }
380
381 @Override
382 public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
383 ChannelPromise promise) {
384 return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
385 " objects to control window sizes"));
386 }
387
388 @Override
389 public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
390 ByteBuf payload, ChannelPromise promise) {
391 return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
392 }
393
394 @Override
395 public void close() {
396 frameWriter.close();
397 }
398
399 @Override
400 public Http2Settings pollSentSettings() {
401 return outstandingLocalSettingsQueue.poll();
402 }
403
404 @Override
405 public Configuration configuration() {
406 return frameWriter.configuration();
407 }
408
409 private Http2Stream requireStream(int streamId) {
410 Http2Stream stream = connection.stream(streamId);
411 if (stream == null) {
412 final String message;
413 if (connection.streamMayHaveExisted(streamId)) {
414 message = "Stream no longer exists: " + streamId;
415 } else {
416 message = "Stream does not exist: " + streamId;
417 }
418 throw new IllegalArgumentException(message);
419 }
420 return stream;
421 }
422
423 @Override
424 public void consumeReceivedSettings(Http2Settings settings) {
425 if (outstandingRemoteSettingsQueue == null) {
426 outstandingRemoteSettingsQueue = new ArrayDeque<Http2Settings>(2);
427 }
428 outstandingRemoteSettingsQueue.add(settings);
429 }
430
431
432
433
434
435
436
437
438
439
440 private final class FlowControlledData extends FlowControlledBase {
441 private final CoalescingBufferQueue queue;
442 private int dataSize;
443
444 FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
445 ChannelPromise promise) {
446 super(stream, padding, endOfStream, promise);
447 queue = new CoalescingBufferQueue(promise.channel());
448 queue.add(buf, promise);
449 dataSize = queue.readableBytes();
450 }
451
452 @Override
453 public int size() {
454 return dataSize + padding;
455 }
456
457 @Override
458 public void error(ChannelHandlerContext ctx, Throwable cause) {
459 queue.releaseAndFailAll(cause);
460
461
462
463
464
465 lifecycleManager.onError(ctx, true, cause);
466 }
467
468 @Override
469 public void write(ChannelHandlerContext ctx, int allowedBytes) {
470 int queuedData = queue.readableBytes();
471 if (!endOfStream) {
472 if (queuedData == 0) {
473 if (queue.isEmpty()) {
474
475
476
477
478
479
480 padding = dataSize = 0;
481 } else {
482
483
484
485 ChannelPromise writePromise = ctx.newPromise().addListener(this);
486 ctx.write(queue.remove(0, writePromise), writePromise);
487 }
488 return;
489 }
490
491 if (allowedBytes == 0) {
492 return;
493 }
494 }
495
496
497 int writableData = min(queuedData, allowedBytes);
498 ChannelPromise writePromise = ctx.newPromise().addListener(this);
499 ByteBuf toWrite = queue.remove(writableData, writePromise);
500 dataSize = queue.readableBytes();
501
502
503 int writablePadding = min(allowedBytes - writableData, padding);
504 padding -= writablePadding;
505
506
507 frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
508 endOfStream && size() == 0, writePromise);
509 }
510
511 @Override
512 public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
513 FlowControlledData nextData;
514 if (FlowControlledData.class != next.getClass() ||
515 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
516 return false;
517 }
518 nextData.queue.copyTo(queue);
519 dataSize = queue.readableBytes();
520
521 padding = Math.max(padding, nextData.padding);
522 endOfStream = nextData.endOfStream;
523 return true;
524 }
525 }
526
527 private void notifyLifecycleManagerOnError(ChannelFuture future, final ChannelHandlerContext ctx) {
528 future.addListener(new ChannelFutureListener() {
529 @Override
530 public void operationComplete(ChannelFuture future) throws Exception {
531 Throwable cause = future.cause();
532 if (cause != null) {
533 lifecycleManager.onError(ctx, true, cause);
534 }
535 }
536 });
537 }
538
539
540
541
542
543
544 private final class FlowControlledHeaders extends FlowControlledBase {
545 private final Http2Headers headers;
546 private final boolean hasPriority;
547 private final int streamDependency;
548 private final short weight;
549 private final boolean exclusive;
550
551 FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
552 int streamDependency, short weight, boolean exclusive,
553 int padding, boolean endOfStream, ChannelPromise promise) {
554 super(stream, padding, endOfStream, promise.unvoid());
555 this.headers = headers;
556 this.hasPriority = hasPriority;
557 this.streamDependency = streamDependency;
558 this.weight = weight;
559 this.exclusive = exclusive;
560 }
561
562 @Override
563 public int size() {
564 return 0;
565 }
566
567 @Override
568 public void error(ChannelHandlerContext ctx, Throwable cause) {
569 if (ctx != null) {
570 lifecycleManager.onError(ctx, true, cause);
571 }
572 promise.tryFailure(cause);
573 }
574
575 @Override
576 public void write(ChannelHandlerContext ctx, int allowedBytes) {
577 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
578
579
580 promise.addListener(this);
581
582 ChannelFuture f = sendHeaders(frameWriter, ctx, stream.id(), headers, hasPriority, streamDependency,
583 weight, exclusive, padding, endOfStream, promise);
584
585 Throwable failureCause = f.cause();
586 if (failureCause == null) {
587
588
589 stream.headersSent(isInformational);
590 }
591 }
592
593 @Override
594 public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
595 return false;
596 }
597 }
598
599
600
601
602 public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
603 ChannelFutureListener {
604 protected final Http2Stream stream;
605 protected ChannelPromise promise;
606 protected boolean endOfStream;
607 protected int padding;
608
609 FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
610 final ChannelPromise promise) {
611 checkPositiveOrZero(padding, "padding");
612 this.padding = padding;
613 this.endOfStream = endOfStream;
614 this.stream = stream;
615 this.promise = promise;
616 }
617
618 @Override
619 public void writeComplete() {
620 if (endOfStream) {
621 lifecycleManager.closeStreamLocal(stream, promise);
622 }
623 }
624
625 @Override
626 public void operationComplete(ChannelFuture future) throws Exception {
627 if (!future.isSuccess()) {
628 error(flowController().channelHandlerContext(), future.cause());
629 }
630 }
631 }
632 }