1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty5.handler.codec.http2;
16
17 import io.netty5.util.collection.IntCollections;
18 import io.netty5.util.collection.IntObjectHashMap;
19 import io.netty5.util.collection.IntObjectMap;
20 import io.netty5.util.internal.DefaultPriorityQueue;
21 import io.netty5.util.internal.EmptyPriorityQueue;
22 import io.netty5.util.internal.PriorityQueue;
23 import io.netty5.util.internal.PriorityQueueNode;
24 import io.netty5.util.internal.SystemPropertyUtil;
25 import io.netty5.util.internal.UnstableApi;
26
27 import java.io.Serializable;
28 import java.util.ArrayList;
29 import java.util.Comparator;
30 import java.util.Iterator;
31 import java.util.List;
32
33 import static io.netty5.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
34 import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
35 import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
36 import static io.netty5.handler.codec.http2.Http2CodecUtil.streamableBytes;
37 import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
38 import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
39 import static io.netty5.util.internal.ObjectUtil.checkPositive;
40 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
41 import static java.lang.Integer.MAX_VALUE;
42 import static java.lang.Math.max;
43 import static java.lang.Math.min;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 @UnstableApi
59 public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
60
61
62
63
64
65
66
67
68 static final int INITIAL_CHILDREN_MAP_SIZE =
69 max(1, SystemPropertyUtil.getInt("io.netty5.http2.childrenMapSize", 2));
70
71
72
73 private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;
74
75 private final Http2Connection.PropertyKey stateKey;
76
77
78
79
80 private final IntObjectMap<State> stateOnlyMap;
81
82
83
84
85 private final PriorityQueue<State> stateOnlyRemovalQueue;
86 private final Http2Connection connection;
87 private final State connectionState;
88
89
90
91
92 private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
93 private final int maxStateOnlySize;
94
/**
 * Creates a distributor for {@code connection} that retains at most {@code DEFAULT_MAX_STATE_ONLY_SIZE}
 * priority states for stream ids which have no stream in the connection.
 *
 * @param connection the connection whose streams are prioritized.
 */
public WeightedFairQueueByteDistributor(Http2Connection connection) {
    this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
}

/**
 * Creates a distributor for {@code connection} and registers a listener on it which keeps exactly one
 * {@link State} per stream, attached to the priority tree rooted at the connection stream.
 *
 * @param connection the connection whose streams are prioritized.
 * @param maxStateOnlySize the maximum number of {@link State}s retained for stream ids that have no stream in
 *        {@code connection}. Validated with {@code checkPositiveOrZero}; {@code 0} disables retention.
 */
public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
    checkPositiveOrZero(maxStateOnlySize, "maxStateOnlySize");
    final boolean retainStateOnly = maxStateOnlySize != 0;
    if (retainStateOnly) {
        this.stateOnlyMap = new IntObjectHashMap<>(maxStateOnlySize);
        // updateDependencyTree(..) may add up to two new entries before it trims the queue back to the limit,
        // hence the head room of 2.
        this.stateOnlyRemovalQueue =
                new DefaultPriorityQueue<>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
    } else {
        this.stateOnlyMap = IntCollections.emptyMap();
        this.stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
    }
    this.maxStateOnlySize = maxStateOnlySize;
    this.connection = connection;
    this.stateKey = connection.newKey();

    final Http2Stream connectionStream = connection.connectionStream();
    this.connectionState = new State(connectionStream, 16);
    connectionStream.setProperty(stateKey, connectionState);

    // Keep the priority tree in sync with the life cycle of the connection's streams.
    connection.addListener(new Http2ConnectionAdapter() {
        @Override
        public void onStreamAdded(Http2Stream stream) {
            State added = stateOnlyMap.remove(stream.id());
            if (added != null) {
                // Priority information arrived before the stream did: re-use it and bind the stream to it.
                stateOnlyRemovalQueue.removeTyped(added);
                added.stream = stream;
            } else {
                // Unknown stream: create a fresh node directly below the connection.
                added = new State(stream);
                final List<ParentChangedEvent> events = new ArrayList<>(1);
                connectionState.takeChild(added, false, events);
                notifyParentChanged(events);
            }

            switch (stream.state()) {
                case RESERVED_REMOTE:
                case RESERVED_LOCAL:
                    // Reserved streams are flagged right away; other streams are flagged in onStreamActive.
                    added.setStreamReservedOrActivated();
                    break;
                default:
                    break;
            }
            stream.setProperty(stateKey, added);
        }

        @Override
        public void onStreamActive(Http2Stream stream) {
            state(stream).setStreamReservedOrActivated();
        }

        @Override
        public void onStreamClosed(Http2Stream stream) {
            state(stream).close();
        }

        @Override
        public void onStreamRemoved(Http2Stream stream) {
            final State removed = state(stream);
            // Drop the stream reference unconditionally; close() only does so for streams that were closed.
            removed.stream = null;

            final int limit = WeightedFairQueueByteDistributor.this.maxStateOnlySize;
            if (limit == 0) {
                // Retention is disabled: detach the node from the tree immediately.
                removed.parent.removeChild(removed);
                return;
            }

            if (stateOnlyRemovalQueue.size() == limit) {
                // The retention set is full: either the queue head or the removed state has to go.
                final State evictionCandidate = stateOnlyRemovalQueue.peek();
                if (StateOnlyComparator.INSTANCE.compare(evictionCandidate, removed) >= 0) {
                    // The head does not order before the removed state, so the removed state is discarded.
                    removed.parent.removeChild(removed);
                    return;
                }
                stateOnlyRemovalQueue.poll();
                evictionCandidate.parent.removeChild(evictionCandidate);
                stateOnlyMap.remove(evictionCandidate.streamId);
            }

            stateOnlyRemovalQueue.add(removed);
            stateOnlyMap.put(removed.streamId, removed);
        }
    });
}
190
191 @Override
192 public void updateStreamableBytes(StreamState state) {
193 state(state.stream()).updateStreamableBytes(streamableBytes(state),
194 state.hasFrame() && state.windowSize() >= 0);
195 }
196
197 @Override
198 public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
199 State state = state(childStreamId);
200 if (state == null) {
201
202
203
204 if (maxStateOnlySize == 0) {
205 return;
206 }
207 state = new State(childStreamId);
208 stateOnlyRemovalQueue.add(state);
209 stateOnlyMap.put(childStreamId, state);
210 }
211
212 State newParent = state(parentStreamId);
213 if (newParent == null) {
214
215
216
217 if (maxStateOnlySize == 0) {
218 return;
219 }
220 newParent = new State(parentStreamId);
221 stateOnlyRemovalQueue.add(newParent);
222 stateOnlyMap.put(parentStreamId, newParent);
223
224 List<ParentChangedEvent> events = new ArrayList<>(1);
225 connectionState.takeChild(newParent, false, events);
226 notifyParentChanged(events);
227 }
228
229
230
231 if (state.activeCountForTree != 0 && state.parent != null) {
232 state.parent.totalQueuedWeights += weight - state.weight;
233 }
234 state.weight = weight;
235
236 if (newParent != state.parent || exclusive && newParent.children.size() != 1) {
237 final List<ParentChangedEvent> events;
238 if (newParent.isDescendantOf(state)) {
239 events = new ArrayList<>(2 + (exclusive ? newParent.children.size() : 0));
240 state.parent.takeChild(newParent, false, events);
241 } else {
242 events = new ArrayList<>(1 + (exclusive ? newParent.children.size() : 0));
243 }
244 newParent.takeChild(state, exclusive, events);
245 notifyParentChanged(events);
246 }
247
248
249
250
251 while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
252 State stateToRemove = stateOnlyRemovalQueue.poll();
253 stateToRemove.parent.removeChild(stateToRemove);
254 stateOnlyMap.remove(stateToRemove.streamId);
255 }
256 }
257
258 @Override
259 public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
260
261 if (connectionState.activeCountForTree == 0) {
262 return false;
263 }
264
265
266
267
268 int oldIsActiveCountForTree;
269 do {
270 oldIsActiveCountForTree = connectionState.activeCountForTree;
271
272 maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
273 } while (connectionState.activeCountForTree != 0 &&
274 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
275
276 return connectionState.activeCountForTree != 0;
277 }
278
279
280
281
282
283 public void allocationQuantum(int allocationQuantum) {
284 checkPositive(allocationQuantum, "allocationQuantum");
285 this.allocationQuantum = allocationQuantum;
286 }
287
288 private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
289 if (state.isActive()) {
290 int nsent = min(maxBytes, state.streamableBytes);
291 state.write(nsent, writer);
292 if (nsent == 0 && maxBytes != 0) {
293
294
295
296
297 state.updateStreamableBytes(state.streamableBytes, false);
298 }
299 return nsent;
300 }
301
302 return distributeToChildren(maxBytes, writer, state);
303 }
304
305
306
307
308
309
310
311
312
313
314
315 private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
316 long oldTotalQueuedWeights = state.totalQueuedWeights;
317 State childState = state.pollPseudoTimeQueue();
318 State nextChildState = state.peekPseudoTimeQueue();
319 childState.setDistributing();
320 try {
321 assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
322 "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
323 ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ')';
324 int nsent = distribute(nextChildState == null ? maxBytes :
325 min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
326 childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
327 ),
328 writer,
329 childState);
330 state.pseudoTime += nsent;
331 childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
332 return nsent;
333 } finally {
334 childState.unsetDistributing();
335
336
337
338 if (childState.activeCountForTree != 0) {
339 state.offerPseudoTimeQueue(childState);
340 }
341 }
342 }
343
344 private State state(Http2Stream stream) {
345 return stream.getProperty(stateKey);
346 }
347
348 private State state(int streamId) {
349 Http2Stream stream = connection.stream(streamId);
350 return stream != null ? state(stream) : stateOnlyMap.get(streamId);
351 }
352
353
354
355
356 boolean isChild(int childId, int parentId, short weight) {
357 State parent = state(parentId);
358 State child;
359 return parent.children.containsKey(childId) &&
360 (child = state(childId)).parent == parent && child.weight == weight;
361 }
362
363
364
365
366 int numChildren(int streamId) {
367 State state = state(streamId);
368 return state == null ? 0 : state.children.size();
369 }
370
371
372
373
374
375 void notifyParentChanged(List<ParentChangedEvent> events) {
376 for (int i = 0; i < events.size(); ++i) {
377 ParentChangedEvent event = events.get(i);
378 stateOnlyRemovalQueue.priorityChanged(event.state);
379 if (event.state.parent != null && event.state.activeCountForTree != 0) {
380 event.state.parent.offerAndInitializePseudoTime(event.state);
381 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
382 }
383 }
384 }
385
386
387
388
389
390
391
392
393
394 private static final class StateOnlyComparator implements Comparator<State>, Serializable {
395 private static final long serialVersionUID = -4806936913002105966L;
396
397 static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
398
399 @Override
400 public int compare(State o1, State o2) {
401
402 boolean o1Actived = o1.wasStreamReservedOrActivated();
403 if (o1Actived != o2.wasStreamReservedOrActivated()) {
404 return o1Actived ? -1 : 1;
405 }
406
407 int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
408
409
410
411
412
413
414
415
416 return x != 0 ? x : o1.streamId - o2.streamId;
417 }
418 }
419
420 private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
421 private static final long serialVersionUID = -1437548640227161828L;
422
423 static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
424
425 @Override
426 public int compare(State o1, State o2) {
427 return Long.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
428 }
429 }
430
431
432
433
434 private final class State implements PriorityQueueNode {
435 private static final byte STATE_IS_ACTIVE = 0x1;
436 private static final byte STATE_IS_DISTRIBUTING = 0x2;
437 private static final byte STATE_STREAM_ACTIVATED = 0x4;
438
439
440
441
442 Http2Stream stream;
443 State parent;
444 IntObjectMap<State> children = IntCollections.emptyMap();
445 private final PriorityQueue<State> pseudoTimeQueue;
446 final int streamId;
447 int streamableBytes;
448 int dependencyTreeDepth;
449
450
451
452 int activeCountForTree;
453 private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
454 private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
455
456
457
458 long pseudoTimeToWrite;
459
460
461
462 long pseudoTime;
463 long totalQueuedWeights;
464 private byte flags;
465 short weight = DEFAULT_PRIORITY_WEIGHT;
466
467 State(int streamId) {
468 this(streamId, null, 0);
469 }
470
471 State(Http2Stream stream) {
472 this(stream, 0);
473 }
474
475 State(Http2Stream stream, int initialSize) {
476 this(stream.id(), stream, initialSize);
477 }
478
479 State(int streamId, Http2Stream stream, int initialSize) {
480 this.stream = stream;
481 this.streamId = streamId;
482 pseudoTimeQueue = new DefaultPriorityQueue<>(StatePseudoTimeComparator.INSTANCE, initialSize);
483 }
484
485 boolean isDescendantOf(State state) {
486 State next = parent;
487 while (next != null) {
488 if (next == state) {
489 return true;
490 }
491 next = next.parent;
492 }
493 return false;
494 }
495
496 void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
497 takeChild(null, child, exclusive, events);
498 }
499
500
501
502
503
504 void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
505 List<ParentChangedEvent> events) {
506 State oldParent = child.parent;
507
508 if (oldParent != this) {
509 events.add(new ParentChangedEvent(child, oldParent));
510 child.setParent(this);
511
512
513
514 if (childItr != null) {
515 childItr.remove();
516 } else if (oldParent != null) {
517 oldParent.children.remove(child.streamId);
518 }
519
520
521 initChildrenIfEmpty();
522
523 final State oldChild = children.put(child.streamId, child);
524 assert oldChild == null : "A stream with the same stream ID was already in the child map.";
525 }
526
527 if (exclusive && !children.isEmpty()) {
528
529
530 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
531 while (itr.hasNext()) {
532 child.takeChild(itr, itr.next().value(), false, events);
533 }
534 }
535 }
536
537
538
539
540 void removeChild(State child) {
541 if (children.remove(child.streamId) != null) {
542 List<ParentChangedEvent> events = new ArrayList<>(1 + child.children.size());
543 events.add(new ParentChangedEvent(child, child.parent));
544 child.setParent(null);
545
546 if (!child.children.isEmpty()) {
547
548 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
549 long totalWeight = child.getTotalWeight();
550 do {
551
552 State dependency = itr.next().value();
553 dependency.weight = (short) max(1, dependency.weight * child.weight / totalWeight);
554 takeChild(itr, dependency, false, events);
555 } while (itr.hasNext());
556 }
557
558 notifyParentChanged(events);
559 }
560 }
561
562 private long getTotalWeight() {
563 long totalWeight = 0L;
564 for (State state : children.values()) {
565 totalWeight += state.weight;
566 }
567 return totalWeight;
568 }
569
570
571
572
573
574
575 private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
576 stateToRetain = children.remove(stateToRetain.streamId);
577 IntObjectMap<State> prevChildren = children;
578
579
580 initChildren();
581 if (stateToRetain != null) {
582 children.put(stateToRetain.streamId, stateToRetain);
583 }
584 return prevChildren;
585 }
586
587 private void setParent(State newParent) {
588
589 if (activeCountForTree != 0 && parent != null) {
590 parent.removePseudoTimeQueue(this);
591 parent.activeCountChangeForTree(-activeCountForTree);
592 }
593 parent = newParent;
594
595 dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
596 }
597
598 private void initChildrenIfEmpty() {
599 if (children == IntCollections.<State>emptyMap()) {
600 initChildren();
601 }
602 }
603
604 private void initChildren() {
605 children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
606 }
607
608 void write(int numBytes, Writer writer) throws Http2Exception {
609 assert stream != null;
610 try {
611 writer.write(stream, numBytes);
612 } catch (Throwable t) {
613 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
614 }
615 }
616
617 void activeCountChangeForTree(int increment) {
618 assert activeCountForTree + increment >= 0;
619 activeCountForTree += increment;
620 if (parent != null) {
621 assert activeCountForTree != increment ||
622 pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
623 parent.pseudoTimeQueue.containsTyped(this) :
624 "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
625 "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
626 if (activeCountForTree == 0) {
627 parent.removePseudoTimeQueue(this);
628 } else if (activeCountForTree == increment && !isDistributing()) {
629
630
631
632
633
634
635
636
637 parent.offerAndInitializePseudoTime(this);
638 }
639 parent.activeCountChangeForTree(increment);
640 }
641 }
642
643 void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
644 if (isActive() != isActive) {
645 if (isActive) {
646 activeCountChangeForTree(1);
647 setActive();
648 } else {
649 activeCountChangeForTree(-1);
650 unsetActive();
651 }
652 }
653
654 streamableBytes = newStreamableBytes;
655 }
656
657
658
659
660 void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
661 assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
662
663
664 pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
665 }
666
667
668
669
670
671
672 void offerAndInitializePseudoTime(State state) {
673 state.pseudoTimeToWrite = pseudoTime;
674 offerPseudoTimeQueue(state);
675 }
676
677 void offerPseudoTimeQueue(State state) {
678 pseudoTimeQueue.offer(state);
679 totalQueuedWeights += state.weight;
680 }
681
682
683
684
685 State pollPseudoTimeQueue() {
686 State state = pseudoTimeQueue.poll();
687
688 totalQueuedWeights -= state.weight;
689 return state;
690 }
691
692 void removePseudoTimeQueue(State state) {
693 if (pseudoTimeQueue.removeTyped(state)) {
694 totalQueuedWeights -= state.weight;
695 }
696 }
697
698 State peekPseudoTimeQueue() {
699 return pseudoTimeQueue.peek();
700 }
701
702 void close() {
703 updateStreamableBytes(0, false);
704 stream = null;
705 }
706
707 boolean wasStreamReservedOrActivated() {
708 return (flags & STATE_STREAM_ACTIVATED) != 0;
709 }
710
711 void setStreamReservedOrActivated() {
712 flags |= STATE_STREAM_ACTIVATED;
713 }
714
715 boolean isActive() {
716 return (flags & STATE_IS_ACTIVE) != 0;
717 }
718
719 private void setActive() {
720 flags |= STATE_IS_ACTIVE;
721 }
722
723 private void unsetActive() {
724 flags &= ~STATE_IS_ACTIVE;
725 }
726
727 boolean isDistributing() {
728 return (flags & STATE_IS_DISTRIBUTING) != 0;
729 }
730
731 void setDistributing() {
732 flags |= STATE_IS_DISTRIBUTING;
733 }
734
735 void unsetDistributing() {
736 flags &= ~STATE_IS_DISTRIBUTING;
737 }
738
739 @Override
740 public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
741 return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
742 }
743
744 @Override
745 public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
746 if (queue == stateOnlyRemovalQueue) {
747 stateOnlyQueueIndex = i;
748 } else {
749 pseudoTimeQueueIndex = i;
750 }
751 }
752
753 @Override
754 public String toString() {
755
756 StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
757 toString(sb);
758 return sb.toString();
759 }
760
761 private void toString(StringBuilder sb) {
762 sb.append("{streamId ").append(streamId)
763 .append(" streamableBytes ").append(streamableBytes)
764 .append(" activeCountForTree ").append(activeCountForTree)
765 .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
766 .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
767 .append(" pseudoTime ").append(pseudoTime)
768 .append(" flags ").append(flags)
769 .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
770 .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
771 .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
772
773 if (!pseudoTimeQueue.isEmpty()) {
774 for (State s : pseudoTimeQueue) {
775 s.toString(sb);
776 sb.append(", ");
777 }
778
779 sb.setLength(sb.length() - 2);
780 }
781 sb.append(']');
782 }
783 }
784
785
786
787
788 private static final class ParentChangedEvent {
789 final State state;
790 final State oldParent;
791
792
793
794
795
796
797 ParentChangedEvent(State state, State oldParent) {
798 this.state = state;
799 this.oldParent = oldParent;
800 }
801 }
802 }