1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec.http2;
16
17 import io.netty5.channel.ChannelHandlerContext;
18 import io.netty5.channel.ChannelOption;
19 import io.netty5.util.internal.UnstableApi;
20 import io.netty5.util.internal.logging.InternalLogger;
21 import io.netty5.util.internal.logging.InternalLoggerFactory;
22
23 import java.util.ArrayDeque;
24 import java.util.Deque;
25
26 import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
27 import static io.netty5.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
28 import static io.netty5.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
29 import static io.netty5.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
30 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
31 import static io.netty5.handler.codec.http2.Http2Error.STREAM_CLOSED;
32 import static io.netty5.handler.codec.http2.Http2Exception.streamError;
33 import static io.netty5.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
34 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
35 import static java.lang.Math.max;
36 import static java.lang.Math.min;
37 import static java.util.Objects.requireNonNull;
38
39
40
41
42
43
44
45 @UnstableApi
46 public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
47 private static final InternalLogger logger =
48 InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
49 private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
50 private final Http2Connection connection;
51 private final Http2Connection.PropertyKey stateKey;
52 private final StreamByteDistributor streamByteDistributor;
53 private final FlowState connectionState;
54 private int initialWindowSize = DEFAULT_WINDOW_SIZE;
55 private WritabilityMonitor monitor;
56 private ChannelHandlerContext ctx;
57
/**
 * Creates a flow controller that uses a {@link WeightedFairQueueByteDistributor} and has no writability
 * listener.
 *
 * @param connection the connection to flow control.
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection) {
    this(connection, new WeightedFairQueueByteDistributor(connection), null);
}
61
/**
 * Creates a flow controller with a caller supplied byte distributor and no writability listener.
 *
 * @param connection the connection to flow control.
 * @param streamByteDistributor decides how writable bytes are shared between streams.
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection,
                                        StreamByteDistributor streamByteDistributor) {
    this(connection, streamByteDistributor, (Listener) null);
}
66
/**
 * Creates a flow controller that uses a {@link WeightedFairQueueByteDistributor} and reports writability
 * changes to {@code listener}.
 *
 * @param connection the connection to flow control.
 * @param listener receives writability change notifications; may be {@code null} for none.
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
    this(connection, new WeightedFairQueueByteDistributor(connection), listener);
}
70
/**
 * Creates a flow controller with a caller supplied byte distributor and an optional writability listener.
 *
 * <p>A {@link FlowState} is attached to the connection stream right away and to every stream that is later
 * added to {@code connection}. A connection listener registered here keeps those states in step with the
 * stream life-cycle.
 *
 * @param connection the connection to flow control; must not be {@code null}.
 * @param streamByteDistributor decides how writable bytes are shared between streams; must not be
 *                              {@code null}.
 * @param listener receives writability change notifications; may be {@code null} for none.
 * @throws NullPointerException if {@code connection} or {@code streamByteDistributor} is {@code null}.
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection,
                                        StreamByteDistributor streamByteDistributor,
                                        final Listener listener) {
    this.connection = requireNonNull(connection, "connection");
    // The message names the real parameter so a failed null check points at the right argument.
    this.streamByteDistributor = requireNonNull(streamByteDistributor, "streamByteDistributor");

    // Register the flow state of the connection stream.
    stateKey = connection.newKey();
    connectionState = new FlowState(connection.connectionStream());
    connection.connectionStream().setProperty(stateKey, connectionState);

    // The monitor may read connectionState, so it is created only after connectionState exists.
    listener(listener);
    monitor.windowSize(connectionState, initialWindowSize);

    // Track stream life-cycle events so every stream carries an up-to-date FlowState.
    connection.addListener(new Http2ConnectionAdapter() {
        @Override
        public void onStreamAdded(Http2Stream stream) {
            // Every stream gets its own flow state as soon as the connection knows about it.
            stream.setProperty(stateKey, new FlowState(stream));
        }

        @Override
        public void onStreamActive(Http2Stream stream) {
            // The state may have been created earlier; apply the initial window that is current now.
            monitor.windowSize(state(stream), initialWindowSize);
        }

        @Override
        public void onStreamClosed(Http2Stream stream) {
            // Fail whatever is still queued for the closed stream.
            state(stream).cancel(STREAM_CLOSED, null);
        }

        @Override
        public void onStreamHalfClosed(Http2Stream stream) {
            if (HALF_CLOSED_LOCAL == stream.state()) {
                // Frames still queued once the stream is half-closed (local) are failed
                // rather than written.
                state(stream).cancel(STREAM_CLOSED, null);
            }
        }
    });
}
128
129
130
131
132
133
134 @Override
135 public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
136 this.ctx = requireNonNull(ctx, "ctx");
137
138
139
140 channelWritabilityChanged();
141
142
143
144
145
146 if (isChannelWritable()) {
147 writePendingBytes();
148 }
149 }
150
151 @Override
152 public ChannelHandlerContext channelHandlerContext() {
153 return ctx;
154 }
155
156 @Override
157 public void initialWindowSize(int newWindowSize) throws Http2Exception {
158 assert ctx == null || ctx.executor().inEventLoop();
159 monitor.initialWindowSize(newWindowSize);
160 }
161
162 @Override
163 public int initialWindowSize() {
164 return initialWindowSize;
165 }
166
167 @Override
168 public int windowSize(Http2Stream stream) {
169 return state(stream).windowSize();
170 }
171
172 @Override
173 public boolean isWritable(Http2Stream stream) {
174 return monitor.isWritable(state(stream));
175 }
176
177 @Override
178 public void channelWritabilityChanged() throws Http2Exception {
179 monitor.channelWritabilityChange();
180 }
181
182 @Override
183 public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
184
185 assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
186 assert childStreamId != parentStreamId : "A stream cannot depend on itself";
187 assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
188
189 streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
190 }
191
192 private boolean isChannelWritable() {
193 return ctx != null && isChannelWritable0();
194 }
195
196 private boolean isChannelWritable0() {
197 return ctx.channel().isWritable();
198 }
199
200 @Override
201 public void listener(Listener listener) {
202 monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
203 }
204
205 @Override
206 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
207 assert ctx == null || ctx.executor().inEventLoop();
208 monitor.incrementWindowSize(state(stream), delta);
209 }
210
211 @Override
212 public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
213
214 assert ctx == null || ctx.executor().inEventLoop();
215 requireNonNull(frame, "frame");
216 try {
217 monitor.enqueueFrame(state(stream), frame);
218 } catch (Throwable t) {
219 frame.error(ctx, t);
220 }
221 }
222
223 @Override
224 public boolean hasFlowControlled(Http2Stream stream) {
225 return state(stream).hasFrame();
226 }
227
228 private FlowState state(Http2Stream stream) {
229 return stream.getProperty(stateKey);
230 }
231
232
233
234
235 private int connectionWindowSize() {
236 return connectionState.windowSize();
237 }
238
239 private int minUsableChannelBytes() {
240
241
242
243
244
245
246 return max(ctx.channel().getOption(ChannelOption.WRITE_BUFFER_WATER_MARK).low(), MIN_WRITABLE_CHUNK);
247 }
248
249 private int maxUsableChannelBytes() {
250
251 int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().writableBytes());
252 int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
253
254
255 return min(connectionState.windowSize(), usableBytes);
256 }
257
258
259
260
261
262 private int writableBytes() {
263 return min(connectionWindowSize(), maxUsableChannelBytes());
264 }
265
266 @Override
267 public void writePendingBytes() throws Http2Exception {
268 monitor.writePendingBytes();
269 }
270
271
272
273
274 private final class FlowState implements StreamByteDistributor.StreamState {
275 private final Http2Stream stream;
276 private final Deque<FlowControlled> pendingWriteQueue;
277 private int window;
278 private long pendingBytes;
279 private boolean markedWritable;
280
281
282
283
284 private boolean writing;
285
286
287
288 private boolean cancelled;
289
290 FlowState(Http2Stream stream) {
291 this.stream = stream;
292 pendingWriteQueue = new ArrayDeque<>(2);
293 }
294
295
296
297
298
299 boolean isWritable() {
300 return windowSize() > pendingBytes() && !cancelled;
301 }
302
303
304
305
306 @Override
307 public Http2Stream stream() {
308 return stream;
309 }
310
311
312
313
314 boolean markedWritability() {
315 return markedWritable;
316 }
317
318
319
320
321 void markedWritability(boolean isWritable) {
322 markedWritable = isWritable;
323 }
324
325 @Override
326 public int windowSize() {
327 return window;
328 }
329
330
331
332
333 void windowSize(int initialWindowSize) {
334 window = initialWindowSize;
335 }
336
337
338
339
340
341 int writeAllocatedBytes(int allocated) {
342 final int initialAllocated = allocated;
343 int writtenBytes;
344
345 Throwable cause = null;
346 FlowControlled frame;
347 try {
348 assert !writing;
349 writing = true;
350
351
352 boolean writeOccurred = false;
353 while (!cancelled && (frame = peek()) != null) {
354 int maxBytes = min(allocated, writableWindow());
355 if (maxBytes <= 0 && frame.size() > 0) {
356
357
358 break;
359 }
360 writeOccurred = true;
361 int initialFrameSize = frame.size();
362 try {
363 frame.write(ctx, max(0, maxBytes));
364 if (frame.size() == 0) {
365
366
367
368 pendingWriteQueue.remove();
369 frame.writeComplete();
370 }
371 } finally {
372
373 allocated -= initialFrameSize - frame.size();
374 }
375 }
376
377 if (!writeOccurred) {
378
379 return -1;
380 }
381
382 } catch (Throwable t) {
383
384 cancelled = true;
385 cause = t;
386 } finally {
387 writing = false;
388
389
390 writtenBytes = initialAllocated - allocated;
391
392 decrementPendingBytes(writtenBytes, false);
393 decrementFlowControlWindow(writtenBytes);
394
395
396
397 if (cancelled) {
398 cancel(INTERNAL_ERROR, cause);
399 }
400 }
401 return writtenBytes;
402 }
403
404
405
406
407 int incrementStreamWindow(int delta) throws Http2Exception {
408 if (delta > 0 && Integer.MAX_VALUE - delta < window) {
409 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
410 "Window size overflow for stream: %d", stream.id());
411 }
412 window += delta;
413
414 streamByteDistributor.updateStreamableBytes(this);
415 return window;
416 }
417
418
419
420
421 private int writableWindow() {
422 return min(window, connectionWindowSize());
423 }
424
425 @Override
426 public long pendingBytes() {
427 return pendingBytes;
428 }
429
430
431
432
433 void enqueueFrame(FlowControlled frame) {
434 FlowControlled last = pendingWriteQueue.peekLast();
435 if (last == null) {
436 enqueueFrameWithoutMerge(frame);
437 return;
438 }
439
440 int lastSize = last.size();
441 if (last.merge(ctx, frame)) {
442 incrementPendingBytes(last.size() - lastSize, true);
443 return;
444 }
445 enqueueFrameWithoutMerge(frame);
446 }
447
448 private void enqueueFrameWithoutMerge(FlowControlled frame) {
449 pendingWriteQueue.offer(frame);
450
451
452 incrementPendingBytes(frame.size(), true);
453 }
454
455 @Override
456 public boolean hasFrame() {
457 return !pendingWriteQueue.isEmpty();
458 }
459
460
461
462
463 private FlowControlled peek() {
464 return pendingWriteQueue.peek();
465 }
466
467
468
469
470
471
472 void cancel(Http2Error error, Throwable cause) {
473 cancelled = true;
474
475 if (writing) {
476 return;
477 }
478
479 FlowControlled frame = pendingWriteQueue.poll();
480 if (frame != null) {
481
482 final Http2Exception exception = streamError(stream.id(), error, cause,
483 "Stream closed before write could take place");
484 do {
485 writeError(frame, exception);
486 frame = pendingWriteQueue.poll();
487 } while (frame != null);
488 }
489
490 streamByteDistributor.updateStreamableBytes(this);
491
492 monitor.stateCancelled(this);
493 }
494
495
496
497
498
499 private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
500 pendingBytes += numBytes;
501 monitor.incrementPendingBytes(numBytes);
502 if (updateStreamableBytes) {
503 streamByteDistributor.updateStreamableBytes(this);
504 }
505 }
506
507
508
509
510 private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
511 incrementPendingBytes(-bytes, updateStreamableBytes);
512 }
513
514
515
516
517 private void decrementFlowControlWindow(int bytes) {
518 try {
519 int negativeBytes = -bytes;
520 connectionState.incrementStreamWindow(negativeBytes);
521 incrementStreamWindow(negativeBytes);
522 } catch (Http2Exception e) {
523
524 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
525 }
526 }
527
528
529
530
531
532 private void writeError(FlowControlled frame, Http2Exception cause) {
533 assert ctx != null;
534 decrementPendingBytes(frame.size(), true);
535 frame.error(ctx, cause);
536 }
537 }
538
539
540
541
542 private class WritabilityMonitor implements StreamByteDistributor.Writer {
543 private boolean inWritePendingBytes;
544 private long totalPendingBytes;
545
546 @Override
547 public final void write(Http2Stream stream, int numBytes) {
548 state(stream).writeAllocatedBytes(numBytes);
549 }
550
551
552
553
554
555 void channelWritabilityChange() throws Http2Exception { }
556
557
558
559
560
561 void stateCancelled(FlowState state) { }
562
563
564
565
566
567
568 void windowSize(FlowState state, int initialWindowSize) {
569 state.windowSize(initialWindowSize);
570 }
571
572
573
574
575
576
577
578 void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
579 state.incrementStreamWindow(delta);
580 }
581
582
583
584
585
586
587
588 void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
589 state.enqueueFrame(frame);
590 }
591
592
593
594
595
596
597 final void incrementPendingBytes(int delta) {
598 totalPendingBytes += delta;
599
600
601
602 }
603
604
605
606
607
608
609 final boolean isWritable(FlowState state) {
610 return isWritableConnection() && state.isWritable();
611 }
612
613 final void writePendingBytes() throws Http2Exception {
614
615
616
617
618
619 if (inWritePendingBytes) {
620 return;
621 }
622 inWritePendingBytes = true;
623 try {
624 int bytesToWrite = writableBytes();
625
626
627 for (;;) {
628 if (!streamByteDistributor.distribute(bytesToWrite, this) ||
629 (bytesToWrite = writableBytes()) <= 0 ||
630 !isChannelWritable0()) {
631 break;
632 }
633 }
634 } finally {
635 inWritePendingBytes = false;
636 }
637 }
638
639 void initialWindowSize(int newWindowSize) throws Http2Exception {
640 checkPositiveOrZero(newWindowSize, "newWindowSize");
641
642 final int delta = newWindowSize - initialWindowSize;
643 initialWindowSize = newWindowSize;
644 connection.forEachActiveStream(stream -> {
645 state(stream).incrementStreamWindow(delta);
646 return true;
647 });
648
649 if (delta > 0 && isChannelWritable()) {
650
651 writePendingBytes();
652 }
653 }
654
655 final boolean isWritableConnection() {
656 return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
657 }
658 }
659
660
661
662
663
664
665
666
667
668 private final class ListenerWritabilityMonitor extends WritabilityMonitor implements Http2StreamVisitor {
669 private final Listener listener;
670
671 ListenerWritabilityMonitor(Listener listener) {
672 this.listener = listener;
673 }
674
675 @Override
676 public boolean visit(Http2Stream stream) throws Http2Exception {
677 FlowState state = state(stream);
678 if (isWritable(state) != state.markedWritability()) {
679 notifyWritabilityChanged(state);
680 }
681 return true;
682 }
683
684 @Override
685 void windowSize(FlowState state, int initialWindowSize) {
686 super.windowSize(state, initialWindowSize);
687 try {
688 checkStateWritability(state);
689 } catch (Http2Exception e) {
690 throw new RuntimeException("Caught unexpected exception from window", e);
691 }
692 }
693
694 @Override
695 void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
696 super.incrementWindowSize(state, delta);
697 checkStateWritability(state);
698 }
699
700 @Override
701 void initialWindowSize(int newWindowSize) throws Http2Exception {
702 super.initialWindowSize(newWindowSize);
703 if (isWritableConnection()) {
704
705
706 checkAllWritabilityChanged();
707 }
708 }
709
710 @Override
711 void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
712 super.enqueueFrame(state, frame);
713 checkConnectionThenStreamWritabilityChanged(state);
714 }
715
716 @Override
717 void stateCancelled(FlowState state) {
718 try {
719 checkConnectionThenStreamWritabilityChanged(state);
720 } catch (Http2Exception e) {
721 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
722 }
723 }
724
725 @Override
726 void channelWritabilityChange() throws Http2Exception {
727 if (connectionState.markedWritability() != isChannelWritable()) {
728 checkAllWritabilityChanged();
729 }
730 }
731
732 private void checkStateWritability(FlowState state) throws Http2Exception {
733 if (isWritable(state) != state.markedWritability()) {
734 if (state == connectionState) {
735 checkAllWritabilityChanged();
736 } else {
737 notifyWritabilityChanged(state);
738 }
739 }
740 }
741
742 private void notifyWritabilityChanged(FlowState state) {
743 state.markedWritability(!state.markedWritability());
744 try {
745 listener.writabilityChanged(state.stream);
746 } catch (Throwable cause) {
747 logger.error("Caught Throwable from listener.writabilityChanged", cause);
748 }
749 }
750
751 private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
752
753 if (isWritableConnection() != connectionState.markedWritability()) {
754 checkAllWritabilityChanged();
755 } else if (isWritable(state) != state.markedWritability()) {
756 notifyWritabilityChanged(state);
757 }
758 }
759
760 private void checkAllWritabilityChanged() throws Http2Exception {
761
762 connectionState.markedWritability(isWritableConnection());
763 connection.forEachActiveStream(this);
764 }
765 }
766 }