1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.channel.ChannelHandlerContext;
18 import io.netty.util.internal.logging.InternalLogger;
19 import io.netty.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.ArrayDeque;
22 import java.util.Deque;
23
24 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
25 import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
26 import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
27 import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
28 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
29 import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
30 import static io.netty.handler.codec.http2.Http2Exception.streamError;
31 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
32 import static io.netty.util.internal.ObjectUtil.checkNotNull;
33 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
34 import static java.lang.Math.max;
35 import static java.lang.Math.min;
36
37
38
39
40
41
42
43 public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
44 private static final InternalLogger logger =
45 InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
46 private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
47 private final Http2Connection connection;
48 private final Http2Connection.PropertyKey stateKey;
49 private final StreamByteDistributor streamByteDistributor;
50 private final FlowState connectionState;
51 private int initialWindowSize = DEFAULT_WINDOW_SIZE;
52 private WritabilityMonitor monitor;
53 private ChannelHandlerContext ctx;
54
/**
 * Creates a controller for {@code connection} without a writability listener.
 * Delegates to the {@code (connection, listener)} constructor with a {@code null} listener.
 *
 * @param connection the connection to flow control
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection) {
    // The cast selects the Listener overload; a bare null would be ambiguous.
    this(connection, (Listener) null);
}
58
/**
 * Creates a controller that uses the given byte distributor and no writability listener.
 *
 * @param connection            the connection to flow control
 * @param streamByteDistributor the distributor that allocates writable bytes to streams
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection,
                                        StreamByteDistributor streamByteDistributor) {
    this(connection, streamByteDistributor, null);
}
63
/**
 * Creates a controller that distributes bytes with a new {@link WeightedFairQueueByteDistributor}
 * built for {@code connection}.
 *
 * @param connection the connection to flow control
 * @param listener   receiver of writability changes; may be {@code null}
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
    this(connection, new WeightedFairQueueByteDistributor(connection), listener);
}
67
/**
 * Creates a controller with an explicit byte distributor and an optional writability listener.
 * <p>
 * The constructor attaches a {@code FlowState} to the connection stream, installs the monitor
 * and registers a connection listener that keeps per-stream flow state in step with the stream
 * lifecycle.
 *
 * @param connection            the connection to flow control; must not be {@code null}
 * @param streamByteDistributor the distributor that allocates writable bytes; must not be {@code null}
 * @param listener              receiver of writability changes; may be {@code null}
 */
public DefaultHttp2RemoteFlowController(Http2Connection connection,
                                        StreamByteDistributor streamByteDistributor,
                                        final Listener listener) {
    this.connection = checkNotNull(connection, "connection");
    // Use the actual parameter name so a failed null check points at the right argument.
    this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamByteDistributor");

    // Attach the flow state to the connection stream.
    stateKey = connection.newKey();
    connectionState = new FlowState(connection.connectionStream());
    connection.connectionStream().setProperty(stateKey, connectionState);

    // The monitor must exist before the window is set, because the update is routed through it.
    listener(listener);
    monitor.windowSize(connectionState, initialWindowSize);

    // Keep per-stream flow state aligned with stream lifecycle events.
    connection.addListener(new Http2ConnectionAdapter() {
        @Override
        public void onStreamAdded(Http2Stream stream) {
            // Every stream gets its own state as soon as it is added.
            stream.setProperty(stateKey, new FlowState(stream));
        }

        @Override
        public void onStreamActive(Http2Stream stream) {
            // The window is only assigned once the stream becomes active.
            monitor.windowSize(state(stream), initialWindowSize);
        }

        @Override
        public void onStreamClosed(Http2Stream stream) {
            // Fail whatever is still queued for the closed stream.
            state(stream).cancel(STREAM_CLOSED, null);
        }

        @Override
        public void onStreamHalfClosed(Http2Stream stream) {
            if (HALF_CLOSED_LOCAL == stream.state()) {
                // Only the locally half-closed state cancels the frames that remain queued.
                state(stream).cancel(STREAM_CLOSED, null);
            }
        }
    });
}
125
126
127
128
129
130
131 @Override
132 public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
133 this.ctx = checkNotNull(ctx, "ctx");
134
135
136
137 channelWritabilityChanged();
138
139
140
141
142
143 if (isChannelWritable()) {
144 writePendingBytes();
145 }
146 }
147
148 @Override
149 public ChannelHandlerContext channelHandlerContext() {
150 return ctx;
151 }
152
153 @Override
154 public void initialWindowSize(int newWindowSize) throws Http2Exception {
155 assert ctx == null || ctx.executor().inEventLoop();
156 monitor.initialWindowSize(newWindowSize);
157 }
158
159 @Override
160 public int initialWindowSize() {
161 return initialWindowSize;
162 }
163
164 @Override
165 public int windowSize(Http2Stream stream) {
166 return state(stream).windowSize();
167 }
168
169 @Override
170 public boolean isWritable(Http2Stream stream) {
171 return monitor.isWritable(state(stream));
172 }
173
174 @Override
175 public void channelWritabilityChanged() throws Http2Exception {
176 monitor.channelWritabilityChange();
177 }
178
179 @Override
180 public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
181
182 assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
183 assert childStreamId != parentStreamId : "A stream cannot depend on itself";
184 assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
185
186 streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
187 }
188
189 private boolean isChannelWritable() {
190 return ctx != null && isChannelWritable0();
191 }
192
193 private boolean isChannelWritable0() {
194 return ctx.channel().isWritable();
195 }
196
197 @Override
198 public void listener(Listener listener) {
199 monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
200 }
201
202 @Override
203 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
204 assert ctx == null || ctx.executor().inEventLoop();
205 monitor.incrementWindowSize(state(stream), delta);
206 }
207
208 @Override
209 public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
210
211 assert ctx == null || ctx.executor().inEventLoop();
212 checkNotNull(frame, "frame");
213 try {
214 monitor.enqueueFrame(state(stream), frame);
215 } catch (Throwable t) {
216 frame.error(ctx, t);
217 }
218 }
219
220 @Override
221 public boolean hasFlowControlled(Http2Stream stream) {
222 return state(stream).hasFrame();
223 }
224
225 private FlowState state(Http2Stream stream) {
226 return (FlowState) stream.getProperty(stateKey);
227 }
228
229
230
231
232 private int connectionWindowSize() {
233 return connectionState.windowSize();
234 }
235
236 private int minUsableChannelBytes() {
237
238
239
240
241
242
243 return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
244 }
245
246 private int maxUsableChannelBytes() {
247
248 int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
249 int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
250
251
252 return min(connectionState.windowSize(), usableBytes);
253 }
254
255
256
257
258
259 private int writableBytes() {
260 return min(connectionWindowSize(), maxUsableChannelBytes());
261 }
262
263 @Override
264 public void writePendingBytes() throws Http2Exception {
265 monitor.writePendingBytes();
266 }
267
268
269
270
271 private final class FlowState implements StreamByteDistributor.StreamState {
272 private final Http2Stream stream;
273 private final Deque<FlowControlled> pendingWriteQueue;
274 private int window;
275 private long pendingBytes;
276 private boolean markedWritable;
277
278
279
280
281 private boolean writing;
282
283
284
285 private boolean cancelled;
286
287 FlowState(Http2Stream stream) {
288 this.stream = stream;
289 pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
290 }
291
292
293
294
295
296 boolean isWritable() {
297 return windowSize() > pendingBytes() && !cancelled;
298 }
299
300
301
302
303 @Override
304 public Http2Stream stream() {
305 return stream;
306 }
307
308
309
310
311 boolean markedWritability() {
312 return markedWritable;
313 }
314
315
316
317
318 void markedWritability(boolean isWritable) {
319 this.markedWritable = isWritable;
320 }
321
322 @Override
323 public int windowSize() {
324 return window;
325 }
326
327
328
329
330 void windowSize(int initialWindowSize) {
331 window = initialWindowSize;
332 }
333
334
335
336
337
338 int writeAllocatedBytes(int allocated) {
339 final int initialAllocated = allocated;
340 int writtenBytes;
341
342 Throwable cause = null;
343 FlowControlled frame;
344 try {
345 assert !writing;
346 writing = true;
347
348
349 boolean writeOccurred = false;
350 while (!cancelled && (frame = peek()) != null) {
351 int maxBytes = min(allocated, writableWindow());
352 if (maxBytes <= 0 && frame.size() > 0) {
353
354
355 break;
356 }
357 writeOccurred = true;
358 int initialFrameSize = frame.size();
359 try {
360 frame.write(ctx, max(0, maxBytes));
361 if (frame.size() == 0) {
362
363
364
365 pendingWriteQueue.remove();
366 frame.writeComplete();
367 }
368 } finally {
369
370 allocated -= initialFrameSize - frame.size();
371 }
372 }
373
374 if (!writeOccurred) {
375
376 return -1;
377 }
378
379 } catch (Throwable t) {
380
381 cancelled = true;
382 cause = t;
383 } finally {
384 writing = false;
385
386
387 writtenBytes = initialAllocated - allocated;
388
389 decrementPendingBytes(writtenBytes, false);
390 decrementFlowControlWindow(writtenBytes);
391
392
393
394 if (cancelled) {
395 cancel(INTERNAL_ERROR, cause);
396 }
397 }
398 return writtenBytes;
399 }
400
401
402
403
404 int incrementStreamWindow(int delta) throws Http2Exception {
405 if (delta > 0 && Integer.MAX_VALUE - delta < window) {
406 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
407 "Window size overflow for stream: %d", stream.id());
408 }
409 window += delta;
410
411 streamByteDistributor.updateStreamableBytes(this);
412 return window;
413 }
414
415
416
417
418 private int writableWindow() {
419 return min(window, connectionWindowSize());
420 }
421
422 @Override
423 public long pendingBytes() {
424 return pendingBytes;
425 }
426
427
428
429
430 void enqueueFrame(FlowControlled frame) {
431 FlowControlled last = pendingWriteQueue.peekLast();
432 if (last == null) {
433 enqueueFrameWithoutMerge(frame);
434 return;
435 }
436
437 int lastSize = last.size();
438 if (last.merge(ctx, frame)) {
439 incrementPendingBytes(last.size() - lastSize, true);
440 return;
441 }
442 enqueueFrameWithoutMerge(frame);
443 }
444
445 private void enqueueFrameWithoutMerge(FlowControlled frame) {
446 pendingWriteQueue.offer(frame);
447
448
449 incrementPendingBytes(frame.size(), true);
450 }
451
452 @Override
453 public boolean hasFrame() {
454 return !pendingWriteQueue.isEmpty();
455 }
456
457
458
459
460 private FlowControlled peek() {
461 return pendingWriteQueue.peek();
462 }
463
464
465
466
467
468
469 void cancel(Http2Error error, Throwable cause) {
470 cancelled = true;
471
472 if (writing) {
473 return;
474 }
475
476 FlowControlled frame = pendingWriteQueue.poll();
477 if (frame != null) {
478
479 final Http2Exception exception = streamError(stream.id(), error, cause,
480 "Stream closed before write could take place");
481 do {
482 writeError(frame, exception);
483 frame = pendingWriteQueue.poll();
484 } while (frame != null);
485 }
486
487 streamByteDistributor.updateStreamableBytes(this);
488
489 monitor.stateCancelled(this);
490 }
491
492
493
494
495
496 private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
497 pendingBytes += numBytes;
498 monitor.incrementPendingBytes(numBytes);
499 if (updateStreamableBytes) {
500 streamByteDistributor.updateStreamableBytes(this);
501 }
502 }
503
504
505
506
507 private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
508 incrementPendingBytes(-bytes, updateStreamableBytes);
509 }
510
511
512
513
514 private void decrementFlowControlWindow(int bytes) {
515 try {
516 int negativeBytes = -bytes;
517 connectionState.incrementStreamWindow(negativeBytes);
518 incrementStreamWindow(negativeBytes);
519 } catch (Http2Exception e) {
520
521 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
522 }
523 }
524
525
526
527
528
529 private void writeError(FlowControlled frame, Http2Exception cause) {
530 assert ctx != null;
531 decrementPendingBytes(frame.size(), true);
532 frame.error(ctx, cause);
533 }
534 }
535
536
537
538
539 private class WritabilityMonitor implements StreamByteDistributor.Writer {
540 private boolean inWritePendingBytes;
541 private long totalPendingBytes;
542
543 @Override
544 public final void write(Http2Stream stream, int numBytes) {
545 state(stream).writeAllocatedBytes(numBytes);
546 }
547
548
549
550
551
552 void channelWritabilityChange() throws Http2Exception { }
553
554
555
556
557
558 void stateCancelled(FlowState state) { }
559
560
561
562
563
564
565 void windowSize(FlowState state, int initialWindowSize) {
566 state.windowSize(initialWindowSize);
567 }
568
569
570
571
572
573
574
575 void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
576 state.incrementStreamWindow(delta);
577 }
578
579
580
581
582
583
584
585 void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
586 state.enqueueFrame(frame);
587 }
588
589
590
591
592
593
594 final void incrementPendingBytes(int delta) {
595 totalPendingBytes += delta;
596
597
598
599 }
600
601
602
603
604
605
606 final boolean isWritable(FlowState state) {
607 return isWritableConnection() && state.isWritable();
608 }
609
610 final void writePendingBytes() throws Http2Exception {
611
612
613
614
615
616 if (inWritePendingBytes) {
617 return;
618 }
619 inWritePendingBytes = true;
620 try {
621 int bytesToWrite = writableBytes();
622
623
624 for (;;) {
625 if (!streamByteDistributor.distribute(bytesToWrite, this) ||
626 (bytesToWrite = writableBytes()) <= 0 ||
627 !isChannelWritable0()) {
628 break;
629 }
630 }
631 } finally {
632 inWritePendingBytes = false;
633 }
634 }
635
636 void initialWindowSize(int newWindowSize) throws Http2Exception {
637 checkPositiveOrZero(newWindowSize, "newWindowSize");
638
639 final int delta = newWindowSize - initialWindowSize;
640 initialWindowSize = newWindowSize;
641 connection.forEachActiveStream(new Http2StreamVisitor() {
642 @Override
643 public boolean visit(Http2Stream stream) throws Http2Exception {
644 state(stream).incrementStreamWindow(delta);
645 return true;
646 }
647 });
648
649 if (delta > 0 && isChannelWritable()) {
650
651 writePendingBytes();
652 }
653 }
654
655 final boolean isWritableConnection() {
656 return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
657 }
658 }
659
660
661
662
663
664
665
666
667
668 private final class ListenerWritabilityMonitor extends WritabilityMonitor implements Http2StreamVisitor {
669 private final Listener listener;
670
671 ListenerWritabilityMonitor(Listener listener) {
672 this.listener = listener;
673 }
674
675 @Override
676 public boolean visit(Http2Stream stream) throws Http2Exception {
677 FlowState state = state(stream);
678 if (isWritable(state) != state.markedWritability()) {
679 notifyWritabilityChanged(state);
680 }
681 return true;
682 }
683
684 @Override
685 void windowSize(FlowState state, int initialWindowSize) {
686 super.windowSize(state, initialWindowSize);
687 try {
688 checkStateWritability(state);
689 } catch (Http2Exception e) {
690 throw new RuntimeException("Caught unexpected exception from window", e);
691 }
692 }
693
694 @Override
695 void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
696 super.incrementWindowSize(state, delta);
697 checkStateWritability(state);
698 }
699
700 @Override
701 void initialWindowSize(int newWindowSize) throws Http2Exception {
702 super.initialWindowSize(newWindowSize);
703 if (isWritableConnection()) {
704
705
706 checkAllWritabilityChanged();
707 }
708 }
709
710 @Override
711 void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
712 super.enqueueFrame(state, frame);
713 checkConnectionThenStreamWritabilityChanged(state);
714 }
715
716 @Override
717 void stateCancelled(FlowState state) {
718 try {
719 checkConnectionThenStreamWritabilityChanged(state);
720 } catch (Http2Exception e) {
721 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
722 }
723 }
724
725 @Override
726 void channelWritabilityChange() throws Http2Exception {
727 if (connectionState.markedWritability() != isChannelWritable()) {
728 checkAllWritabilityChanged();
729 }
730 }
731
732 private void checkStateWritability(FlowState state) throws Http2Exception {
733 if (isWritable(state) != state.markedWritability()) {
734 if (state == connectionState) {
735 checkAllWritabilityChanged();
736 } else {
737 notifyWritabilityChanged(state);
738 }
739 }
740 }
741
742 private void notifyWritabilityChanged(FlowState state) {
743 state.markedWritability(!state.markedWritability());
744 try {
745 listener.writabilityChanged(state.stream);
746 } catch (Throwable cause) {
747 logger.error("Caught Throwable from listener.writabilityChanged", cause);
748 }
749 }
750
751 private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
752
753 if (isWritableConnection() != connectionState.markedWritability()) {
754 checkAllWritabilityChanged();
755 } else if (isWritable(state) != state.markedWritability()) {
756 notifyWritabilityChanged(state);
757 }
758 }
759
760 private void checkAllWritabilityChanged() throws Http2Exception {
761
762 connectionState.markedWritability(isWritableConnection());
763 connection.forEachActiveStream(this);
764 }
765 }
766 }