1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
19 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
20 import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
21 import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
22 import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
23 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
24 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
25 import static io.netty.handler.codec.http2.Http2Exception.streamError;
26 import static io.netty.util.internal.ObjectUtil.checkNotNull;
27 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
28 import static java.lang.Math.max;
29 import static java.lang.Math.min;
30 import io.netty.buffer.ByteBuf;
31 import io.netty.channel.ChannelHandlerContext;
32 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
33 import io.netty.handler.codec.http2.Http2Exception.StreamException;
34 import io.netty.util.internal.PlatformDependent;
35
36
37
38
39
40
41
42 public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
43
44
45
46
47 public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
48
49 private final Http2Connection connection;
50 private final Http2Connection.PropertyKey stateKey;
51 private Http2FrameWriter frameWriter;
52 private ChannelHandlerContext ctx;
53 private float windowUpdateRatio;
54 private int initialWindowSize = DEFAULT_WINDOW_SIZE;
55
56 public DefaultHttp2LocalFlowController(Http2Connection connection) {
57 this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
58 }
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 public DefaultHttp2LocalFlowController(Http2Connection connection,
74 float windowUpdateRatio,
75 boolean autoRefillConnectionWindow) {
76 this.connection = checkNotNull(connection, "connection");
77 windowUpdateRatio(windowUpdateRatio);
78
79
80 stateKey = connection.newKey();
81 FlowState connectionState = autoRefillConnectionWindow ?
82 new AutoRefillState(connection.connectionStream(), initialWindowSize) :
83 new DefaultState(connection.connectionStream(), initialWindowSize);
84 connection.connectionStream().setProperty(stateKey, connectionState);
85
86
87 connection.addListener(new Http2ConnectionAdapter() {
88 @Override
89 public void onStreamAdded(Http2Stream stream) {
90
91
92 stream.setProperty(stateKey, REDUCED_FLOW_STATE);
93 }
94
95 @Override
96 public void onStreamActive(Http2Stream stream) {
97
98
99 stream.setProperty(stateKey, new DefaultState(stream, initialWindowSize));
100 }
101
102 @Override
103 public void onStreamClosed(Http2Stream stream) {
104 try {
105
106
107 FlowState state = state(stream);
108 int unconsumedBytes = state.unconsumedBytes();
109 if (ctx != null && unconsumedBytes > 0) {
110 if (consumeAllBytes(state, unconsumedBytes)) {
111
112
113 ctx.flush();
114 }
115 }
116 } catch (Http2Exception e) {
117 PlatformDependent.throwException(e);
118 } finally {
119
120
121
122 stream.setProperty(stateKey, REDUCED_FLOW_STATE);
123 }
124 }
125 });
126 }
127
128 @Override
129 public DefaultHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
130 this.frameWriter = checkNotNull(frameWriter, "frameWriter");
131 return this;
132 }
133
134 @Override
135 public void channelHandlerContext(ChannelHandlerContext ctx) {
136 this.ctx = checkNotNull(ctx, "ctx");
137 }
138
139 @Override
140 public void initialWindowSize(int newWindowSize) throws Http2Exception {
141 assert ctx == null || ctx.executor().inEventLoop();
142 int delta = newWindowSize - initialWindowSize;
143 initialWindowSize = newWindowSize;
144
145 WindowUpdateVisitor visitor = new WindowUpdateVisitor(delta);
146 connection.forEachActiveStream(visitor);
147 visitor.throwIfError();
148 }
149
150 @Override
151 public int initialWindowSize() {
152 return initialWindowSize;
153 }
154
155 @Override
156 public int windowSize(Http2Stream stream) {
157 return state(stream).windowSize();
158 }
159
160 @Override
161 public int initialWindowSize(Http2Stream stream) {
162 return state(stream).initialWindowSize();
163 }
164
165 @Override
166 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
167 assert ctx != null && ctx.executor().inEventLoop();
168 FlowState state = state(stream);
169
170
171 state.incrementInitialStreamWindow(delta);
172 state.writeWindowUpdateIfNeeded();
173 }
174
175 @Override
176 public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
177 assert ctx != null && ctx.executor().inEventLoop();
178 checkPositiveOrZero(numBytes, "numBytes");
179 if (numBytes == 0) {
180 return false;
181 }
182
183
184
185 if (stream != null && !isClosed(stream)) {
186 if (stream.id() == CONNECTION_STREAM_ID) {
187 throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
188 }
189
190 return consumeAllBytes(state(stream), numBytes);
191 }
192 return false;
193 }
194
195 private boolean consumeAllBytes(FlowState state, int numBytes) throws Http2Exception {
196 return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
197 }
198
199 @Override
200 public int unconsumedBytes(Http2Stream stream) {
201 return state(stream).unconsumedBytes();
202 }
203
204 private static void checkValidRatio(float ratio) {
205 if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
206 throw new IllegalArgumentException("Invalid ratio: " + ratio);
207 }
208 }
209
210
211
212
213
214
215
216
217 public void windowUpdateRatio(float ratio) {
218 assert ctx == null || ctx.executor().inEventLoop();
219 checkValidRatio(ratio);
220 windowUpdateRatio = ratio;
221 }
222
223
224
225
226
227
228 public float windowUpdateRatio() {
229 return windowUpdateRatio;
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245 public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
246 assert ctx != null && ctx.executor().inEventLoop();
247 checkValidRatio(ratio);
248 FlowState state = state(stream);
249 state.windowUpdateRatio(ratio);
250 state.writeWindowUpdateIfNeeded();
251 }
252
253
254
255
256
257
258
259 public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
260 return state(stream).windowUpdateRatio();
261 }
262
263 @Override
264 public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
265 boolean endOfStream) throws Http2Exception {
266 assert ctx != null && ctx.executor().inEventLoop();
267 int dataLength = data.readableBytes() + padding;
268
269
270 FlowState connectionState = connectionState();
271 connectionState.receiveFlowControlledFrame(dataLength);
272
273 if (stream != null && !isClosed(stream)) {
274
275 FlowState state = state(stream);
276 state.endOfStream(endOfStream);
277 state.receiveFlowControlledFrame(dataLength);
278 } else if (dataLength > 0) {
279
280 connectionState.consumeBytes(dataLength);
281 }
282 }
283
284 private FlowState connectionState() {
285 return connection.connectionStream().getProperty(stateKey);
286 }
287
288 private FlowState state(Http2Stream stream) {
289 return stream.getProperty(stateKey);
290 }
291
292 private static boolean isClosed(Http2Stream stream) {
293 return stream.state() == Http2Stream.State.CLOSED;
294 }
295
296
297
298
299
300 private final class AutoRefillState extends DefaultState {
301 AutoRefillState(Http2Stream stream, int initialWindowSize) {
302 super(stream, initialWindowSize);
303 }
304
305 @Override
306 public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
307 super.receiveFlowControlledFrame(dataLength);
308
309 super.consumeBytes(dataLength);
310 }
311
312 @Override
313 public boolean consumeBytes(int numBytes) throws Http2Exception {
314
315 return false;
316 }
317 }
318
319
320
321
322 private class DefaultState implements FlowState {
323 private final Http2Stream stream;
324
325
326
327
328 private int window;
329
330
331
332
333
334
335
336 private int processedWindow;
337
338
339
340
341
342 private int initialStreamWindowSize;
343
344
345
346
347
348
349 private float streamWindowUpdateRatio;
350
351 private int lowerBound;
352 private boolean endOfStream;
353
354 DefaultState(Http2Stream stream, int initialWindowSize) {
355 this.stream = stream;
356 window(initialWindowSize);
357 streamWindowUpdateRatio = windowUpdateRatio;
358 }
359
360 @Override
361 public void window(int initialWindowSize) {
362 assert ctx == null || ctx.executor().inEventLoop();
363 window = processedWindow = initialStreamWindowSize = initialWindowSize;
364 }
365
366 @Override
367 public int windowSize() {
368 return window;
369 }
370
371 @Override
372 public int initialWindowSize() {
373 return initialStreamWindowSize;
374 }
375
376 @Override
377 public void endOfStream(boolean endOfStream) {
378 this.endOfStream = endOfStream;
379 }
380
381 @Override
382 public float windowUpdateRatio() {
383 return streamWindowUpdateRatio;
384 }
385
386 @Override
387 public void windowUpdateRatio(float ratio) {
388 assert ctx == null || ctx.executor().inEventLoop();
389 streamWindowUpdateRatio = ratio;
390 }
391
392 @Override
393 public void incrementInitialStreamWindow(int delta) {
394
395 int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
396 max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
397 delta = newValue - initialStreamWindowSize;
398
399 initialStreamWindowSize += delta;
400 }
401
402 @Override
403 public void incrementFlowControlWindows(int delta) throws Http2Exception {
404 if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
405 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
406 "Flow control window overflowed for stream: %d", stream.id());
407 }
408
409 window += delta;
410 processedWindow += delta;
411 lowerBound = min(delta, 0);
412 }
413
414 @Override
415 public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
416 assert dataLength >= 0;
417
418
419 window -= dataLength;
420
421
422
423
424
425
426 if (window < lowerBound) {
427 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
428 "Flow control window exceeded for stream: %d", stream.id());
429 }
430 }
431
432 private void returnProcessedBytes(int delta) throws Http2Exception {
433 if (processedWindow - delta < window) {
434 throw streamError(stream.id(), INTERNAL_ERROR,
435 "Attempting to return too many bytes for stream %d", stream.id());
436 }
437 processedWindow -= delta;
438 }
439
440 @Override
441 public boolean consumeBytes(int numBytes) throws Http2Exception {
442
443 returnProcessedBytes(numBytes);
444 return writeWindowUpdateIfNeeded();
445 }
446
447 @Override
448 public int unconsumedBytes() {
449 return processedWindow - window;
450 }
451
452 @Override
453 public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
454 if (endOfStream || initialStreamWindowSize <= 0 ||
455
456 isClosed(stream)) {
457 return false;
458 }
459
460 int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
461 if (processedWindow <= threshold) {
462 writeWindowUpdate();
463 return true;
464 }
465 return false;
466 }
467
468
469
470
471
472 private void writeWindowUpdate() throws Http2Exception {
473
474 int deltaWindowSize = initialStreamWindowSize - processedWindow;
475 try {
476 incrementFlowControlWindows(deltaWindowSize);
477 } catch (Throwable t) {
478 throw connectionError(INTERNAL_ERROR, t,
479 "Attempting to return too many bytes for stream %d", stream.id());
480 }
481
482
483 frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise());
484 }
485 }
486
487
488
489
490
491 private static final FlowState REDUCED_FLOW_STATE = new FlowState() {
492
493 @Override
494 public int windowSize() {
495 return 0;
496 }
497
498 @Override
499 public int initialWindowSize() {
500 return 0;
501 }
502
503 @Override
504 public void window(int initialWindowSize) {
505 throw new UnsupportedOperationException();
506 }
507
508 @Override
509 public void incrementInitialStreamWindow(int delta) {
510
511
512 }
513
514 @Override
515 public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
516 throw new UnsupportedOperationException();
517 }
518
519 @Override
520 public boolean consumeBytes(int numBytes) throws Http2Exception {
521 return false;
522 }
523
524 @Override
525 public int unconsumedBytes() {
526 return 0;
527 }
528
529 @Override
530 public float windowUpdateRatio() {
531 throw new UnsupportedOperationException();
532 }
533
534 @Override
535 public void windowUpdateRatio(float ratio) {
536 throw new UnsupportedOperationException();
537 }
538
539 @Override
540 public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
541 throw new UnsupportedOperationException();
542 }
543
544 @Override
545 public void incrementFlowControlWindows(int delta) throws Http2Exception {
546
547
548 }
549
550 @Override
551 public void endOfStream(boolean endOfStream) {
552 throw new UnsupportedOperationException();
553 }
554 };
555
556
557
558
559 private interface FlowState {
560
561 int windowSize();
562
563 int initialWindowSize();
564
565 void window(int initialWindowSize);
566
567
568
569
570
571 void incrementInitialStreamWindow(int delta);
572
573
574
575
576
577
578 boolean writeWindowUpdateIfNeeded() throws Http2Exception;
579
580
581
582
583
584
585
586
587
588 boolean consumeBytes(int numBytes) throws Http2Exception;
589
590 int unconsumedBytes();
591
592 float windowUpdateRatio();
593
594 void windowUpdateRatio(float ratio);
595
596
597
598
599
600
601 void receiveFlowControlledFrame(int dataLength) throws Http2Exception;
602
603
604
605
606
607
608 void incrementFlowControlWindows(int delta) throws Http2Exception;
609
610 void endOfStream(boolean endOfStream);
611 }
612
613
614
615
616 private final class WindowUpdateVisitor implements Http2StreamVisitor {
617 private CompositeStreamException compositeException;
618 private final int delta;
619
620 WindowUpdateVisitor(int delta) {
621 this.delta = delta;
622 }
623
624 @Override
625 public boolean visit(Http2Stream stream) throws Http2Exception {
626 try {
627
628 FlowState state = state(stream);
629 state.incrementFlowControlWindows(delta);
630 state.incrementInitialStreamWindow(delta);
631 } catch (StreamException e) {
632 if (compositeException == null) {
633 compositeException = new CompositeStreamException(e.error(), 4);
634 }
635 compositeException.add(e);
636 }
637 return true;
638 }
639
640 public void throwIfError() throws CompositeStreamException {
641 if (compositeException != null) {
642 throw compositeException;
643 }
644 }
645 }
646 }