1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec.http2;
16
17 import io.netty5.buffer.api.Buffer;
18 import io.netty5.channel.ChannelHandlerContext;
19 import io.netty5.handler.codec.http2.Http2Exception.CompositeStreamException;
20 import io.netty5.handler.codec.http2.Http2Exception.StreamException;
21 import io.netty5.util.internal.PlatformDependent;
22 import io.netty5.util.internal.UnstableApi;
23
24 import static io.netty5.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
25 import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
26 import static io.netty5.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
27 import static io.netty5.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
28 import static io.netty5.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
29 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
30 import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
31 import static io.netty5.handler.codec.http2.Http2Exception.streamError;
32 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
33 import static java.lang.Math.max;
34 import static java.lang.Math.min;
35 import static java.util.Objects.requireNonNull;
36
37
38
39
40
41
42
43 @UnstableApi
44 public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
45
46
47
48
49 public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
50
51 private final Http2Connection connection;
52 private final Http2Connection.PropertyKey stateKey;
53 private Http2FrameWriter frameWriter;
54 private ChannelHandlerContext ctx;
55 private float windowUpdateRatio;
56 private int initialWindowSize = DEFAULT_WINDOW_SIZE;
57
58 public DefaultHttp2LocalFlowController(Http2Connection connection) {
59 this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
60 }
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public DefaultHttp2LocalFlowController(Http2Connection connection,
76 float windowUpdateRatio,
77 boolean autoRefillConnectionWindow) {
78 this.connection = requireNonNull(connection, "connection");
79 windowUpdateRatio(windowUpdateRatio);
80
81
82 stateKey = connection.newKey();
83 FlowState connectionState = autoRefillConnectionWindow ?
84 new AutoRefillState(connection.connectionStream(), initialWindowSize) :
85 new DefaultState(connection.connectionStream(), initialWindowSize);
86 connection.connectionStream().setProperty(stateKey, connectionState);
87
88
89 connection.addListener(new Http2ConnectionAdapter() {
90 @Override
91 public void onStreamAdded(Http2Stream stream) {
92
93
94 stream.setProperty(stateKey, REDUCED_FLOW_STATE);
95 }
96
97 @Override
98 public void onStreamActive(Http2Stream stream) {
99
100
101 stream.setProperty(stateKey, new DefaultState(stream, initialWindowSize));
102 }
103
104 @Override
105 public void onStreamClosed(Http2Stream stream) {
106 try {
107
108
109 FlowState state = state(stream);
110 int unconsumedBytes = state.unconsumedBytes();
111 if (ctx != null && unconsumedBytes > 0) {
112 if (consumeAllBytes(state, unconsumedBytes)) {
113
114
115 ctx.flush();
116 }
117 }
118 } catch (Http2Exception e) {
119 PlatformDependent.throwException(e);
120 } finally {
121
122
123
124 stream.setProperty(stateKey, REDUCED_FLOW_STATE);
125 }
126 }
127 });
128 }
129
130 @Override
131 public DefaultHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
132 this.frameWriter = requireNonNull(frameWriter, "frameWriter");
133 return this;
134 }
135
136 @Override
137 public void channelHandlerContext(ChannelHandlerContext ctx) {
138 this.ctx = requireNonNull(ctx, "ctx");
139 }
140
141 @Override
142 public void initialWindowSize(int newWindowSize) throws Http2Exception {
143 assert ctx == null || ctx.executor().inEventLoop();
144 int delta = newWindowSize - initialWindowSize;
145 initialWindowSize = newWindowSize;
146
147 WindowUpdateVisitor visitor = new WindowUpdateVisitor(delta);
148 connection.forEachActiveStream(visitor);
149 visitor.throwIfError();
150 }
151
152 @Override
153 public int initialWindowSize() {
154 return initialWindowSize;
155 }
156
157 @Override
158 public int windowSize(Http2Stream stream) {
159 return state(stream).windowSize();
160 }
161
162 @Override
163 public int initialWindowSize(Http2Stream stream) {
164 return state(stream).initialWindowSize();
165 }
166
167 @Override
168 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
169 assert ctx != null && ctx.executor().inEventLoop();
170 FlowState state = state(stream);
171
172
173 state.incrementInitialStreamWindow(delta);
174 state.writeWindowUpdateIfNeeded();
175 }
176
177 @Override
178 public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
179 assert ctx != null && ctx.executor().inEventLoop();
180 checkPositiveOrZero(numBytes, "numBytes");
181 if (numBytes == 0) {
182 return false;
183 }
184
185
186
187 if (stream != null && !isClosed(stream)) {
188 if (stream.id() == CONNECTION_STREAM_ID) {
189 throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
190 }
191
192 return consumeAllBytes(state(stream), numBytes);
193 }
194 return false;
195 }
196
197 private boolean consumeAllBytes(FlowState state, int numBytes) throws Http2Exception {
198 return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
199 }
200
201 @Override
202 public int unconsumedBytes(Http2Stream stream) {
203 return state(stream).unconsumedBytes();
204 }
205
206 private static void checkValidRatio(float ratio) {
207 if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
208 throw new IllegalArgumentException("Invalid ratio: " + ratio);
209 }
210 }
211
212
213
214
215
216
217
218
219 public void windowUpdateRatio(float ratio) {
220 assert ctx == null || ctx.executor().inEventLoop();
221 checkValidRatio(ratio);
222 windowUpdateRatio = ratio;
223 }
224
225
226
227
228
229
230 public float windowUpdateRatio() {
231 return windowUpdateRatio;
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
248 assert ctx != null && ctx.executor().inEventLoop();
249 checkValidRatio(ratio);
250 FlowState state = state(stream);
251 state.windowUpdateRatio(ratio);
252 state.writeWindowUpdateIfNeeded();
253 }
254
255
256
257
258
259
260
261 public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
262 return state(stream).windowUpdateRatio();
263 }
264
265 @Override
266 public void receiveFlowControlledFrame(Http2Stream stream, Buffer data, int padding,
267 boolean endOfStream) throws Http2Exception {
268 assert ctx != null && ctx.executor().inEventLoop();
269 int dataLength = data.readableBytes() + padding;
270
271
272 FlowState connectionState = connectionState();
273 connectionState.receiveFlowControlledFrame(dataLength);
274
275 if (stream != null && !isClosed(stream)) {
276
277 FlowState state = state(stream);
278 state.endOfStream(endOfStream);
279 state.receiveFlowControlledFrame(dataLength);
280 } else if (dataLength > 0) {
281
282 connectionState.consumeBytes(dataLength);
283 }
284 }
285
286 private FlowState connectionState() {
287 return connection.connectionStream().getProperty(stateKey);
288 }
289
290 private FlowState state(Http2Stream stream) {
291 return stream.getProperty(stateKey);
292 }
293
294 private static boolean isClosed(Http2Stream stream) {
295 return stream.state() == Http2Stream.State.CLOSED;
296 }
297
298
299
300
301
302 private final class AutoRefillState extends DefaultState {
303 AutoRefillState(Http2Stream stream, int initialWindowSize) {
304 super(stream, initialWindowSize);
305 }
306
307 @Override
308 public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
309 super.receiveFlowControlledFrame(dataLength);
310
311 super.consumeBytes(dataLength);
312 }
313
314 @Override
315 public boolean consumeBytes(int numBytes) throws Http2Exception {
316
317 return false;
318 }
319 }
320
321
322
323
324 private class DefaultState implements FlowState {
325 private final Http2Stream stream;
326
327
328
329
330 private int window;
331
332
333
334
335
336
337
338 private int processedWindow;
339
340
341
342
343
344 private int initialStreamWindowSize;
345
346
347
348
349
350
351 private float streamWindowUpdateRatio;
352
353 private int lowerBound;
354 private boolean endOfStream;
355
356 DefaultState(Http2Stream stream, int initialWindowSize) {
357 this.stream = stream;
358 window(initialWindowSize);
359 streamWindowUpdateRatio = windowUpdateRatio;
360 }
361
362 @Override
363 public void window(int initialWindowSize) {
364 assert ctx == null || ctx.executor().inEventLoop();
365 window = processedWindow = initialStreamWindowSize = initialWindowSize;
366 }
367
368 @Override
369 public int windowSize() {
370 return window;
371 }
372
373 @Override
374 public int initialWindowSize() {
375 return initialStreamWindowSize;
376 }
377
378 @Override
379 public void endOfStream(boolean endOfStream) {
380 this.endOfStream = endOfStream;
381 }
382
383 @Override
384 public float windowUpdateRatio() {
385 return streamWindowUpdateRatio;
386 }
387
388 @Override
389 public void windowUpdateRatio(float ratio) {
390 assert ctx == null || ctx.executor().inEventLoop();
391 streamWindowUpdateRatio = ratio;
392 }
393
394 @Override
395 public void incrementInitialStreamWindow(int delta) {
396
397 int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
398 max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
399 delta = newValue - initialStreamWindowSize;
400
401 initialStreamWindowSize += delta;
402 }
403
404 @Override
405 public void incrementFlowControlWindows(int delta) throws Http2Exception {
406 if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
407 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
408 "Flow control window overflowed for stream: %d", stream.id());
409 }
410
411 window += delta;
412 processedWindow += delta;
413 lowerBound = min(delta, 0);
414 }
415
416 @Override
417 public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
418 assert dataLength >= 0;
419
420
421 window -= dataLength;
422
423
424
425
426
427
428 if (window < lowerBound) {
429 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
430 "Flow control window exceeded for stream: %d", stream.id());
431 }
432 }
433
434 private void returnProcessedBytes(int delta) throws Http2Exception {
435 if (processedWindow - delta < window) {
436 throw streamError(stream.id(), INTERNAL_ERROR,
437 "Attempting to return too many bytes for stream %d", stream.id());
438 }
439 processedWindow -= delta;
440 }
441
442 @Override
443 public boolean consumeBytes(int numBytes) throws Http2Exception {
444
445 returnProcessedBytes(numBytes);
446 return writeWindowUpdateIfNeeded();
447 }
448
449 @Override
450 public int unconsumedBytes() {
451 return processedWindow - window;
452 }
453
454 @Override
455 public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
456 if (endOfStream || initialStreamWindowSize <= 0 ||
457
458 isClosed(stream)) {
459 return false;
460 }
461
462 int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
463 if (processedWindow <= threshold) {
464 writeWindowUpdate();
465 return true;
466 }
467 return false;
468 }
469
470
471
472
473
474 private void writeWindowUpdate() throws Http2Exception {
475
476 int deltaWindowSize = initialStreamWindowSize - processedWindow;
477 try {
478 incrementFlowControlWindows(deltaWindowSize);
479 } catch (Throwable t) {
480 throw connectionError(INTERNAL_ERROR, t,
481 "Attempting to return too many bytes for stream %d", stream.id());
482 }
483
484
485 frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize);
486 }
487 }
488
489
490
491
492
493 private static final FlowState REDUCED_FLOW_STATE = new FlowState() {
494
495 @Override
496 public int windowSize() {
497 return 0;
498 }
499
500 @Override
501 public int initialWindowSize() {
502 return 0;
503 }
504
505 @Override
506 public void window(int initialWindowSize) {
507 throw new UnsupportedOperationException();
508 }
509
510 @Override
511 public void incrementInitialStreamWindow(int delta) {
512
513
514 }
515
516 @Override
517 public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
518 throw new UnsupportedOperationException();
519 }
520
521 @Override
522 public boolean consumeBytes(int numBytes) throws Http2Exception {
523 return false;
524 }
525
526 @Override
527 public int unconsumedBytes() {
528 return 0;
529 }
530
531 @Override
532 public float windowUpdateRatio() {
533 throw new UnsupportedOperationException();
534 }
535
536 @Override
537 public void windowUpdateRatio(float ratio) {
538 throw new UnsupportedOperationException();
539 }
540
541 @Override
542 public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
543 throw new UnsupportedOperationException();
544 }
545
546 @Override
547 public void incrementFlowControlWindows(int delta) throws Http2Exception {
548
549
550 }
551
552 @Override
553 public void endOfStream(boolean endOfStream) {
554 throw new UnsupportedOperationException();
555 }
556 };
557
558
559
560
561 private interface FlowState {
562
563 int windowSize();
564
565 int initialWindowSize();
566
567 void window(int initialWindowSize);
568
569
570
571
572
573 void incrementInitialStreamWindow(int delta);
574
575
576
577
578
579
580 boolean writeWindowUpdateIfNeeded() throws Http2Exception;
581
582
583
584
585
586
587
588
589
590
591 boolean consumeBytes(int numBytes) throws Http2Exception;
592
593 int unconsumedBytes();
594
595 float windowUpdateRatio();
596
597 void windowUpdateRatio(float ratio);
598
599
600
601
602
603
604 void receiveFlowControlledFrame(int dataLength) throws Http2Exception;
605
606
607
608
609
610
611 void incrementFlowControlWindows(int delta) throws Http2Exception;
612
613 void endOfStream(boolean endOfStream);
614 }
615
616
617
618
619 private final class WindowUpdateVisitor implements Http2StreamVisitor {
620 private CompositeStreamException compositeException;
621 private final int delta;
622
623 WindowUpdateVisitor(int delta) {
624 this.delta = delta;
625 }
626
627 @Override
628 public boolean visit(Http2Stream stream) throws Http2Exception {
629 try {
630
631 FlowState state = state(stream);
632 state.incrementFlowControlWindows(delta);
633 state.incrementInitialStreamWindow(delta);
634 } catch (StreamException e) {
635 if (compositeException == null) {
636 compositeException = new CompositeStreamException(e.error(), 4);
637 }
638 compositeException.add(e);
639 }
640 return true;
641 }
642
643 public void throwIfError() throws CompositeStreamException {
644 if (compositeException != null) {
645 throw compositeException;
646 }
647 }
648 }
649 }