1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.util.collection.IntCollections;
18 import io.netty.util.collection.IntObjectHashMap;
19 import io.netty.util.collection.IntObjectMap;
20 import io.netty.util.internal.DefaultPriorityQueue;
21 import io.netty.util.internal.EmptyPriorityQueue;
22 import io.netty.util.internal.PriorityQueue;
23 import io.netty.util.internal.PriorityQueueNode;
24 import io.netty.util.internal.SystemPropertyUtil;
25
26 import java.io.Serializable;
27 import java.util.ArrayList;
28 import java.util.Comparator;
29 import java.util.Iterator;
30 import java.util.List;
31
32 import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
33 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
34 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
35 import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
36 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
37 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
38 import static io.netty.util.internal.ObjectUtil.checkPositive;
39 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
40 import static java.lang.Integer.MAX_VALUE;
41 import static java.lang.Math.max;
42 import static java.lang.Math.min;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
58
59
60
61
62
63
64
65
66 static final int INITIAL_CHILDREN_MAP_SIZE =
67 max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 2));
68
69
70
71 private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;
72
73 private final Http2Connection.PropertyKey stateKey;
74
75
76
77
78 private final IntObjectMap<State> stateOnlyMap;
79
80
81
82
83 private final PriorityQueue<State> stateOnlyRemovalQueue;
84 private final Http2Connection connection;
85 private final State connectionState;
86
87
88
89
90 private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
91 private final int maxStateOnlySize;
92
93 public WeightedFairQueueByteDistributor(Http2Connection connection) {
94 this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
95 }
96
97 public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
98 checkPositiveOrZero(maxStateOnlySize, "maxStateOnlySize");
99 if (maxStateOnlySize == 0) {
100 stateOnlyMap = IntCollections.emptyMap();
101 stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
102 } else {
103 stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
104
105
106 stateOnlyRemovalQueue = new DefaultPriorityQueue<State>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
107 }
108 this.maxStateOnlySize = maxStateOnlySize;
109
110 this.connection = connection;
111 stateKey = connection.newKey();
112 final Http2Stream connectionStream = connection.connectionStream();
113 connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));
114
115
116 connection.addListener(new Http2ConnectionAdapter() {
117 @Override
118 public void onStreamAdded(Http2Stream stream) {
119 State state = stateOnlyMap.remove(stream.id());
120 if (state == null) {
121 state = new State(stream);
122
123 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
124 connectionState.takeChild(state, false, events);
125 notifyParentChanged(events);
126 } else {
127 stateOnlyRemovalQueue.removeTyped(state);
128 state.stream = stream;
129 }
130 switch (stream.state()) {
131 case RESERVED_REMOTE:
132 case RESERVED_LOCAL:
133 state.setStreamReservedOrActivated();
134
135
136 break;
137 default:
138 break;
139 }
140 stream.setProperty(stateKey, state);
141 }
142
143 @Override
144 public void onStreamActive(Http2Stream stream) {
145 state(stream).setStreamReservedOrActivated();
146
147
148 }
149
150 @Override
151 public void onStreamClosed(Http2Stream stream) {
152 state(stream).close();
153 }
154
155 @Override
156 public void onStreamRemoved(Http2Stream stream) {
157
158
159
160 State state = state(stream);
161
162
163
164
165 state.stream = null;
166
167 if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
168 state.parent.removeChild(state);
169 return;
170 }
171 if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
172 State stateToRemove = stateOnlyRemovalQueue.peek();
173 if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
174
175
176 state.parent.removeChild(state);
177 return;
178 }
179 stateOnlyRemovalQueue.poll();
180 stateToRemove.parent.removeChild(stateToRemove);
181 stateOnlyMap.remove(stateToRemove.streamId);
182 }
183 stateOnlyRemovalQueue.add(state);
184 stateOnlyMap.put(state.streamId, state);
185 }
186 });
187 }
188
189 @Override
190 public void updateStreamableBytes(StreamState state) {
191 state(state.stream()).updateStreamableBytes(streamableBytes(state),
192 state.hasFrame() && state.windowSize() >= 0);
193 }
194
195 @Override
196 public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
197 State state = state(childStreamId);
198 if (state == null) {
199
200
201
202 if (maxStateOnlySize == 0) {
203 return;
204 }
205 state = new State(childStreamId);
206 stateOnlyRemovalQueue.add(state);
207 stateOnlyMap.put(childStreamId, state);
208 }
209
210 State newParent = state(parentStreamId);
211 if (newParent == null) {
212
213
214
215 if (maxStateOnlySize == 0) {
216 return;
217 }
218 newParent = new State(parentStreamId);
219 stateOnlyRemovalQueue.add(newParent);
220 stateOnlyMap.put(parentStreamId, newParent);
221
222 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
223 connectionState.takeChild(newParent, false, events);
224 notifyParentChanged(events);
225 }
226
227
228
229 if (state.activeCountForTree != 0 && state.parent != null) {
230 state.parent.totalQueuedWeights += weight - state.weight;
231 }
232 state.weight = weight;
233
234 if (newParent != state.parent || exclusive && newParent.children.size() != 1) {
235 final List<ParentChangedEvent> events;
236 if (newParent.isDescendantOf(state)) {
237 events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children.size() : 0));
238 state.parent.takeChild(newParent, false, events);
239 } else {
240 events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children.size() : 0));
241 }
242 newParent.takeChild(state, exclusive, events);
243 notifyParentChanged(events);
244 }
245
246
247
248
249 while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
250 State stateToRemove = stateOnlyRemovalQueue.poll();
251 stateToRemove.parent.removeChild(stateToRemove);
252 stateOnlyMap.remove(stateToRemove.streamId);
253 }
254 }
255
256 @Override
257 public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
258
259 if (connectionState.activeCountForTree == 0) {
260 return false;
261 }
262
263
264
265
266 int oldIsActiveCountForTree;
267 do {
268 oldIsActiveCountForTree = connectionState.activeCountForTree;
269
270 maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
271 } while (connectionState.activeCountForTree != 0 &&
272 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
273
274 return connectionState.activeCountForTree != 0;
275 }
276
277
278
279
280
281 public void allocationQuantum(int allocationQuantum) {
282 checkPositive(allocationQuantum, "allocationQuantum");
283 this.allocationQuantum = allocationQuantum;
284 }
285
286 private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
287 if (state.isActive()) {
288 int nsent = min(maxBytes, state.streamableBytes);
289 state.write(nsent, writer);
290 if (nsent == 0 && maxBytes != 0) {
291
292
293
294
295 state.updateStreamableBytes(state.streamableBytes, false);
296 }
297 return nsent;
298 }
299
300 return distributeToChildren(maxBytes, writer, state);
301 }
302
303
304
305
306
307
308
309
310
311
312
313 private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
314 long oldTotalQueuedWeights = state.totalQueuedWeights;
315 State childState = state.pollPseudoTimeQueue();
316 State nextChildState = state.peekPseudoTimeQueue();
317 childState.setDistributing();
318 try {
319 assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
320 "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
321 ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ')';
322 int nsent = distribute(nextChildState == null ? maxBytes :
323 min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
324 childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
325 ),
326 writer,
327 childState);
328 state.pseudoTime += nsent;
329 childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
330 return nsent;
331 } finally {
332 childState.unsetDistributing();
333
334
335
336 if (childState.activeCountForTree != 0) {
337 state.offerPseudoTimeQueue(childState);
338 }
339 }
340 }
341
342 private State state(Http2Stream stream) {
343 return stream.getProperty(stateKey);
344 }
345
346 private State state(int streamId) {
347 Http2Stream stream = connection.stream(streamId);
348 return stream != null ? state(stream) : stateOnlyMap.get(streamId);
349 }
350
351
352
353
354 boolean isChild(int childId, int parentId, short weight) {
355 State parent = state(parentId);
356 State child;
357 return parent.children.containsKey(childId) &&
358 (child = state(childId)).parent == parent && child.weight == weight;
359 }
360
361
362
363
364 int numChildren(int streamId) {
365 State state = state(streamId);
366 return state == null ? 0 : state.children.size();
367 }
368
369
370
371
372
373 void notifyParentChanged(List<ParentChangedEvent> events) {
374 for (int i = 0; i < events.size(); ++i) {
375 ParentChangedEvent event = events.get(i);
376 stateOnlyRemovalQueue.priorityChanged(event.state);
377 if (event.state.parent != null && event.state.activeCountForTree != 0) {
378 event.state.parent.offerAndInitializePseudoTime(event.state);
379 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
380 }
381 }
382 }
383
384
385
386
387
388
389
390
391
392 private static final class StateOnlyComparator implements Comparator<State>, Serializable {
393 private static final long serialVersionUID = -4806936913002105966L;
394
395 static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
396
397 @Override
398 public int compare(State o1, State o2) {
399
400 boolean o1Actived = o1.wasStreamReservedOrActivated();
401 if (o1Actived != o2.wasStreamReservedOrActivated()) {
402 return o1Actived ? -1 : 1;
403 }
404
405 int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
406
407
408
409
410
411
412
413
414 return x != 0 ? x : o1.streamId - o2.streamId;
415 }
416 }
417
418 private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
419 private static final long serialVersionUID = -1437548640227161828L;
420
421 static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
422
423 @Override
424 public int compare(State o1, State o2) {
425 return Long.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
426 }
427 }
428
429
430
431
432 private final class State implements PriorityQueueNode {
433 private static final byte STATE_IS_ACTIVE = 0x1;
434 private static final byte STATE_IS_DISTRIBUTING = 0x2;
435 private static final byte STATE_STREAM_ACTIVATED = 0x4;
436
437
438
439
440 Http2Stream stream;
441 State parent;
442 IntObjectMap<State> children = IntCollections.emptyMap();
443 private final PriorityQueue<State> pseudoTimeQueue;
444 final int streamId;
445 int streamableBytes;
446 int dependencyTreeDepth;
447
448
449
450 int activeCountForTree;
451 private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
452 private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
453
454
455
456 long pseudoTimeToWrite;
457
458
459
460 long pseudoTime;
461 long totalQueuedWeights;
462 private byte flags;
463 short weight = DEFAULT_PRIORITY_WEIGHT;
464
465 State(int streamId) {
466 this(streamId, null, 0);
467 }
468
469 State(Http2Stream stream) {
470 this(stream, 0);
471 }
472
473 State(Http2Stream stream, int initialSize) {
474 this(stream.id(), stream, initialSize);
475 }
476
477 State(int streamId, Http2Stream stream, int initialSize) {
478 this.stream = stream;
479 this.streamId = streamId;
480 pseudoTimeQueue = new DefaultPriorityQueue<State>(StatePseudoTimeComparator.INSTANCE, initialSize);
481 }
482
483 boolean isDescendantOf(State state) {
484 State next = parent;
485 while (next != null) {
486 if (next == state) {
487 return true;
488 }
489 next = next.parent;
490 }
491 return false;
492 }
493
494 void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
495 takeChild(null, child, exclusive, events);
496 }
497
498
499
500
501
502 void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
503 List<ParentChangedEvent> events) {
504 State oldParent = child.parent;
505
506 if (oldParent != this) {
507 events.add(new ParentChangedEvent(child, oldParent));
508 child.setParent(this);
509
510
511
512 if (childItr != null) {
513 childItr.remove();
514 } else if (oldParent != null) {
515 oldParent.children.remove(child.streamId);
516 }
517
518
519 initChildrenIfEmpty();
520
521 final State oldChild = children.put(child.streamId, child);
522 assert oldChild == null : "A stream with the same stream ID was already in the child map.";
523 }
524
525 if (exclusive && !children.isEmpty()) {
526
527
528 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
529 while (itr.hasNext()) {
530 child.takeChild(itr, itr.next().value(), false, events);
531 }
532 }
533 }
534
535
536
537
538 void removeChild(State child) {
539 if (children.remove(child.streamId) != null) {
540 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
541 events.add(new ParentChangedEvent(child, child.parent));
542 child.setParent(null);
543
544 if (!child.children.isEmpty()) {
545
546 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
547 long totalWeight = child.getTotalWeight();
548 do {
549
550 State dependency = itr.next().value();
551 dependency.weight = (short) max(1, dependency.weight * child.weight / totalWeight);
552 takeChild(itr, dependency, false, events);
553 } while (itr.hasNext());
554 }
555
556 notifyParentChanged(events);
557 }
558 }
559
560 private long getTotalWeight() {
561 long totalWeight = 0L;
562 for (State state : children.values()) {
563 totalWeight += state.weight;
564 }
565 return totalWeight;
566 }
567
568
569
570
571
572
573 private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
574 stateToRetain = children.remove(stateToRetain.streamId);
575 IntObjectMap<State> prevChildren = children;
576
577
578 initChildren();
579 if (stateToRetain != null) {
580 children.put(stateToRetain.streamId, stateToRetain);
581 }
582 return prevChildren;
583 }
584
585 private void setParent(State newParent) {
586
587 if (activeCountForTree != 0 && parent != null) {
588 parent.removePseudoTimeQueue(this);
589 parent.activeCountChangeForTree(-activeCountForTree);
590 }
591 parent = newParent;
592
593 dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
594 }
595
596 private void initChildrenIfEmpty() {
597 if (children == IntCollections.<State>emptyMap()) {
598 initChildren();
599 }
600 }
601
602 private void initChildren() {
603 children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
604 }
605
606 void write(int numBytes, Writer writer) throws Http2Exception {
607 assert stream != null;
608 try {
609 writer.write(stream, numBytes);
610 } catch (Throwable t) {
611 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
612 }
613 }
614
615 void activeCountChangeForTree(int increment) {
616 assert activeCountForTree + increment >= 0;
617 activeCountForTree += increment;
618 if (parent != null) {
619 assert activeCountForTree != increment ||
620 pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
621 parent.pseudoTimeQueue.containsTyped(this) :
622 "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
623 "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
624 if (activeCountForTree == 0) {
625 parent.removePseudoTimeQueue(this);
626 } else if (activeCountForTree == increment && !isDistributing()) {
627
628
629
630
631
632
633
634
635 parent.offerAndInitializePseudoTime(this);
636 }
637 parent.activeCountChangeForTree(increment);
638 }
639 }
640
641 void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
642 if (isActive() != isActive) {
643 if (isActive) {
644 activeCountChangeForTree(1);
645 setActive();
646 } else {
647 activeCountChangeForTree(-1);
648 unsetActive();
649 }
650 }
651
652 streamableBytes = newStreamableBytes;
653 }
654
655
656
657
658 void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
659 assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
660
661
662 pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
663 }
664
665
666
667
668
669
670 void offerAndInitializePseudoTime(State state) {
671 state.pseudoTimeToWrite = pseudoTime;
672 offerPseudoTimeQueue(state);
673 }
674
675 void offerPseudoTimeQueue(State state) {
676 pseudoTimeQueue.offer(state);
677 totalQueuedWeights += state.weight;
678 }
679
680
681
682
683 State pollPseudoTimeQueue() {
684 State state = pseudoTimeQueue.poll();
685
686 totalQueuedWeights -= state.weight;
687 return state;
688 }
689
690 void removePseudoTimeQueue(State state) {
691 if (pseudoTimeQueue.removeTyped(state)) {
692 totalQueuedWeights -= state.weight;
693 }
694 }
695
696 State peekPseudoTimeQueue() {
697 return pseudoTimeQueue.peek();
698 }
699
700 void close() {
701 updateStreamableBytes(0, false);
702 stream = null;
703 }
704
705 boolean wasStreamReservedOrActivated() {
706 return (flags & STATE_STREAM_ACTIVATED) != 0;
707 }
708
709 void setStreamReservedOrActivated() {
710 flags |= STATE_STREAM_ACTIVATED;
711 }
712
713 boolean isActive() {
714 return (flags & STATE_IS_ACTIVE) != 0;
715 }
716
717 private void setActive() {
718 flags |= STATE_IS_ACTIVE;
719 }
720
721 private void unsetActive() {
722 flags &= ~STATE_IS_ACTIVE;
723 }
724
725 boolean isDistributing() {
726 return (flags & STATE_IS_DISTRIBUTING) != 0;
727 }
728
729 void setDistributing() {
730 flags |= STATE_IS_DISTRIBUTING;
731 }
732
733 void unsetDistributing() {
734 flags &= ~STATE_IS_DISTRIBUTING;
735 }
736
737 @Override
738 public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
739 return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
740 }
741
742 @Override
743 public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
744 if (queue == stateOnlyRemovalQueue) {
745 stateOnlyQueueIndex = i;
746 } else {
747 pseudoTimeQueueIndex = i;
748 }
749 }
750
751 @Override
752 public String toString() {
753
754 StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
755 toString(sb);
756 return sb.toString();
757 }
758
759 private void toString(StringBuilder sb) {
760 sb.append("{streamId ").append(streamId)
761 .append(" streamableBytes ").append(streamableBytes)
762 .append(" activeCountForTree ").append(activeCountForTree)
763 .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
764 .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
765 .append(" pseudoTime ").append(pseudoTime)
766 .append(" flags ").append(flags)
767 .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
768 .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
769 .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
770
771 if (!pseudoTimeQueue.isEmpty()) {
772 for (State s : pseudoTimeQueue) {
773 s.toString(sb);
774 sb.append(", ");
775 }
776
777 sb.setLength(sb.length() - 2);
778 }
779 sb.append(']');
780 }
781 }
782
783
784
785
786 private static final class ParentChangedEvent {
787 final State state;
788 final State oldParent;
789
790
791
792
793
794
795 ParentChangedEvent(State state, State oldParent) {
796 this.state = state;
797 this.oldParent = oldParent;
798 }
799 }
800 }