View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.util.collection.IntCollections;
18  import io.netty.util.collection.IntObjectHashMap;
19  import io.netty.util.collection.IntObjectMap;
20  import io.netty.util.internal.DefaultPriorityQueue;
21  import io.netty.util.internal.EmptyPriorityQueue;
22  import io.netty.util.internal.MathUtil;
23  import io.netty.util.internal.PriorityQueue;
24  import io.netty.util.internal.PriorityQueueNode;
25  import io.netty.util.internal.SystemPropertyUtil;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.io.Serializable;
29  import java.util.ArrayList;
30  import java.util.Comparator;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
35  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
36  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
37  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
38  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
39  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
40  import static io.netty.util.internal.ObjectUtil.checkPositive;
41  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
42  import static java.lang.Integer.MAX_VALUE;
43  import static java.lang.Math.max;
44  import static java.lang.Math.min;
45  
46  /**
47   * A {@link StreamByteDistributor} that is sensitive to stream priority and uses
48   * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
49   * bytes.
50   * <p>
51   * Inspiration for this distributor was taken from Linux's
52   * <a href="https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt">Completely Fair Scheduler</a>
53   * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
54   * an "ideal multi-tasking NIC".
55   * <p>
56   * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
57   * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
58   */
59  @UnstableApi
60  public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
    /**
     * The initial size of the children map is chosen to be conservative on initial memory allocations under
     * the assumption that most streams will have a small number of children. This choice may be
     * sub-optimal if, when children are present, there are many of them (e.g. a web page which has many
     * dependencies to load).
     * <p>
     * Read from the {@code io.netty.http2.childrenMapSize} system property (default 2); values below 1 are raised
     * to 1.
     * <p>
     * Visible only for testing!
     */
    static final int INITIAL_CHILDREN_MAP_SIZE =
            max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 2));
    /**
     * Default for {@code maxStateOnlySize}. FireFox currently uses 5 streams to establish QoS classes.
     */
    private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;

    /**
     * Key under which each {@link Http2Stream}'s {@link State} is stored in the stream's property map.
     */
    private final Http2Connection.PropertyKey stateKey;
    /**
     * If there is no Http2Stream object, but we still persist priority information, then this is where the state
     * resides.
     */
    private final IntObjectMap<State> stateOnlyMap;
    /**
     * Holds the same "state only" entries as {@link #stateOnlyMap}, ordered so that the head is the lowest priority
     * entry (the first to be evicted). This retains priority for streams which have no {@link Http2Stream} object.
     * See {@link StateOnlyComparator} for the priority comparator.
     */
    private final PriorityQueue<State> stateOnlyRemovalQueue;
    private final Http2Connection connection;
    /**
     * State of the connection stream; it adopts any state that has no other parent in the priority tree.
     */
    private final State connectionState;
    /**
     * The minimum number of bytes that we will attempt to allocate to a stream. This is to
     * help improve goodput on a per-stream basis.
     */
    private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
    /**
     * Maximum number of "state only" entries retained; {@code 0} disables retention entirely.
     */
    private final int maxStateOnlySize;
95  
    /**
     * Creates a distributor which retains priority information for up to {@code DEFAULT_MAX_STATE_ONLY_SIZE} (5)
     * streams that have no {@link Http2Stream} object.
     *
     * @param connection the connection whose streams bytes are distributed to.
     */
    public WeightedFairQueueByteDistributor(Http2Connection connection) {
        this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
    }

    /**
     * Creates a distributor.
     *
     * @param connection the connection whose streams bytes are distributed to. A listener is registered on it to
     *                   keep the priority tree in sync with stream additions, activations, closures and removals.
     * @param maxStateOnlySize the maximum number of "state only" entries (priority information for streams which
     *                         have no {@link Http2Stream} object) to retain. {@code 0} disables retention; negative
     *                         values are rejected by {@code checkPositiveOrZero}.
     */
    public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
        checkPositiveOrZero(maxStateOnlySize, "maxStateOnlySize");
        if (maxStateOnlySize == 0) {
            // Immutable empty collections: nothing is ever retained.
            stateOnlyMap = IntCollections.emptyMap();
            stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
        } else {
            stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
            // +2 because we may exceed the limit by 2 if a new dependency has no associated Http2Stream object. We need
            // to create the State objects to put them into the dependency tree, which then impacts priority.
            stateOnlyRemovalQueue = new DefaultPriorityQueue<State>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
        }
        this.maxStateOnlySize = maxStateOnlySize;

        this.connection = connection;
        stateKey = connection.newKey();
        final Http2Stream connectionStream = connection.connectionStream();
        connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));

        // Register for notification of new streams.
        connection.addListener(new Http2ConnectionAdapter() {
            @Override
            public void onStreamAdded(Http2Stream stream) {
                // Reuse retained priority information if this stream was referenced before it existed.
                State state = stateOnlyMap.remove(stream.id());
                if (state == null) {
                    state = new State(stream);
                    // Only the stream which was just added will change parents. So we only need an array of size 1.
                    List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
                    connectionState.takeChild(state, false, events);
                    notifyParentChanged(events);
                } else {
                    stateOnlyRemovalQueue.removeTyped(state);
                    state.stream = stream;
                }
                switch (stream.state()) {
                    case RESERVED_REMOTE:
                    case RESERVED_LOCAL:
                        state.setStreamReservedOrActivated();
                        // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue; there is
                        // no need to reprioritize here because it will not be in stateOnlyRemovalQueue.
                        break;
                    default:
                        break;
                }
                stream.setProperty(stateKey, state);
            }

            @Override
            public void onStreamActive(Http2Stream stream) {
                state(stream).setStreamReservedOrActivated();
                // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue; there is no need to
                // reprioritize here because it will not be in stateOnlyRemovalQueue.
            }

            @Override
            public void onStreamClosed(Http2Stream stream) {
                state(stream).close();
            }

            @Override
            public void onStreamRemoved(Http2Stream stream) {
                // The stream has been removed from the connection. We can no longer rely on the stream's property
                // storage to track the State. If we have room, and the precedence of the stream is sufficient, we
                // should retain the State in the stateOnlyMap.
                State state = state(stream);

                // Typically the stream is set to null when the stream is closed because it is no longer needed to write
                // data. However if the stream was not activated it may not be closed (reserved streams) so we ensure
                // the stream reference is set to null to avoid retaining a reference longer than necessary.
                state.stream = null;

                if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
                    // Retention disabled: drop the state from the tree immediately.
                    state.parent.removeChild(state);
                    return;
                }
                if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
                    // At capacity: either discard this state or evict the current lowest priority entry (the head).
                    State stateToRemove = stateOnlyRemovalQueue.peek();
                    if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
                        // The "lowest priority" stream is a "higher priority" than the stream being removed, so we
                        // just discard the state.
                        state.parent.removeChild(state);
                        return;
                    }
                    stateOnlyRemovalQueue.poll();
                    stateToRemove.parent.removeChild(stateToRemove);
                    stateOnlyMap.remove(stateToRemove.streamId);
                }
                stateOnlyRemovalQueue.add(state);
                stateOnlyMap.put(state.streamId, state);
            }
        });
    }
191 
192     @Override
193     public void updateStreamableBytes(StreamState state) {
194         state(state.stream()).updateStreamableBytes(streamableBytes(state),
195                                                     state.hasFrame() && state.windowSize() >= 0);
196     }
197 
    /**
     * Makes {@code childStreamId} depend on {@code parentStreamId} with the given {@code weight}, optionally as an
     * exclusive dependency. If either stream has no {@link State} yet, a "state only" entry is created for it, unless
     * {@code maxStateOnlySize} is {@code 0}, in which case the update is dropped. Afterwards, "state only" entries
     * beyond {@code maxStateOnlySize} are evicted from the head of {@code stateOnlyRemovalQueue}.
     */
    @Override
    public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
        State state = state(childStreamId);
        if (state == null) {
            // If there is no State object that means there is no Http2Stream object and we would have to keep the
            // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
            // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
            if (maxStateOnlySize == 0) {
                return;
            }
            state = new State(childStreamId);
            stateOnlyRemovalQueue.add(state);
            stateOnlyMap.put(childStreamId, state);
        }

        State newParent = state(parentStreamId);
        if (newParent == null) {
            // If there is no State object that means there is no Http2Stream object and we would have to keep the
            // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
            // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
            if (maxStateOnlySize == 0) {
                return;
            }
            newParent = new State(parentStreamId);
            stateOnlyRemovalQueue.add(newParent);
            stateOnlyMap.put(parentStreamId, newParent);
            // Only the stream which was just added will change parents. So we only need an array of size 1.
            List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
            connectionState.takeChild(newParent, false, events);
            notifyParentChanged(events);
        }

        // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue and thus should not be counted
        // toward parent.totalQueuedWeights.
        if (state.activeCountForTree != 0 && state.parent != null) {
            state.parent.totalQueuedWeights += weight - state.weight;
        }
        state.weight = weight;

        // Restructure only if the parent changes, or if exclusivity is requested and newParent has children other
        // than (or in addition to) state.
        if (newParent != state.parent || exclusive && newParent.children.size() != 1) {
            final List<ParentChangedEvent> events;
            if (newParent.isDescendantOf(state)) {
                // Making state depend on one of its own descendants would create a cycle: first move newParent up to
                // state's current parent (RFC 7540, Section 5.3.3).
                events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children.size() : 0));
                state.parent.takeChild(newParent, false, events);
            } else {
                events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children.size() : 0));
            }
            newParent.takeChild(state, exclusive, events);
            notifyParentChanged(events);
        }

        // The location in the dependency tree impacts the priority in the stateOnlyRemovalQueue map. If we created new
        // State objects we must check if we exceeded the limit after we insert into the dependency tree to ensure the
        // stateOnlyRemovalQueue has been updated.
        while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
            State stateToRemove = stateOnlyRemovalQueue.poll();
            stateToRemove.parent.removeChild(stateToRemove);
            stateOnlyMap.remove(stateToRemove.streamId);
        }
    }
258 
259     @Override
260     public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
261         // As long as there is some active frame we should write at least 1 time.
262         if (connectionState.activeCountForTree == 0) {
263             return false;
264         }
265 
266         // The goal is to write until we write all the allocated bytes or are no longer making progress.
267         // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
268         // to be sent. Making progress means the active streams rooted at the connection stream has changed.
269         int oldIsActiveCountForTree;
270         do {
271             oldIsActiveCountForTree = connectionState.activeCountForTree;
272             // connectionState will never be active, so go right to its children.
273             maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
274         } while (connectionState.activeCountForTree != 0 &&
275                 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
276 
277         return connectionState.activeCountForTree != 0;
278     }
279 
280     /**
281      * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
282      * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be &gt; 0.
283      */
284     public void allocationQuantum(int allocationQuantum) {
285         checkPositive(allocationQuantum, "allocationQuantum");
286         this.allocationQuantum = allocationQuantum;
287     }
288 
289     private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
290         if (state.isActive()) {
291             int nsent = min(maxBytes, state.streamableBytes);
292             state.write(nsent, writer);
293             if (nsent == 0 && maxBytes != 0) {
294                 // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
295                 // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
296                 // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
297                 // stream's flow control window being 0.
298                 state.updateStreamableBytes(state.streamableBytes, false);
299             }
300             return nsent;
301         }
302 
303         return distributeToChildren(maxBytes, writer, state);
304     }
305 
    /**
     * Gives the child of {@code state} with the smallest pseudo time a turn to write, and returns the number of bytes
     * consumed.
     * <p>
     * It is a pre-condition that {@code state.poll()} returns a non-{@code null} value. This is a result of the way
     * the allocation algorithm is structured and can be explained in the following cases:
     * <h3>For the recursive case</h3>
     * If a stream has no children (in the allocation tree) then that node must be active or it will not be in the
     * allocation tree. If a node is active then it will not delegate to children and recursion ends.
     * <h3>For the initial case</h3>
     * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
     * has no active children we don't get into this method.
     */
    private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
        // Captured before polling, presumably because pollPseudoTimeQueue() subtracts the polled child's weight from
        // totalQueuedWeights (not visible here) -- TODO confirm.
        long oldTotalQueuedWeights = state.totalQueuedWeights;
        State childState = state.pollPseudoTimeQueue();
        State nextChildState = state.peekPseudoTimeQueue();
        childState.setDistributing();
        try {
            assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
                "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
                ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ')';
            // With a sibling waiting, cap the budget at the pseudo time gap to that sibling scaled by this child's share
            // of the queued weight, plus allocationQuantum, clamped to the int range.
            int nsent = distribute(nextChildState == null ? maxBytes :
                            min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
                                               childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
                               ),
                               writer,
                               childState);
            state.pseudoTime += nsent;
            childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
            return nsent;
        } finally {
            childState.unsetDistributing();
            // Done in finally to ensure the internal flags are not corrupted if an exception is thrown.
            // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
            // the priority pseudoTimeQueue due to a write operation.
            if (childState.activeCountForTree != 0) {
                state.offerPseudoTimeQueue(childState);
            }
        }
    }
344 
345     private State state(Http2Stream stream) {
346         return stream.getProperty(stateKey);
347     }
348 
349     private State state(int streamId) {
350         Http2Stream stream = connection.stream(streamId);
351         return stream != null ? state(stream) : stateOnlyMap.get(streamId);
352     }
353 
354     /**
355      * For testing only!
356      */
357     boolean isChild(int childId, int parentId, short weight) {
358         State parent = state(parentId);
359         State child;
360         return parent.children.containsKey(childId) &&
361                 (child = state(childId)).parent == parent && child.weight == weight;
362     }
363 
364     /**
365      * For testing only!
366      */
367     int numChildren(int streamId) {
368         State state = state(streamId);
369         return state == null ? 0 : state.children.size();
370     }
371 
372     /**
373      * Notify all listeners of the priority tree change events (in ascending order)
374      * @param events The events (top down order) which have changed
375      */
376     void notifyParentChanged(List<ParentChangedEvent> events) {
377         for (int i = 0; i < events.size(); ++i) {
378             ParentChangedEvent event = events.get(i);
379             stateOnlyRemovalQueue.priorityChanged(event.state);
380             if (event.state.parent != null && event.state.activeCountForTree != 0) {
381                 event.state.parent.offerAndInitializePseudoTime(event.state);
382                 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
383             }
384         }
385     }
386 
387     /**
388      * A comparator for {@link State} which has no associated {@link Http2Stream} object. The general precedence is:
389      * <ul>
390      *     <li>Was a stream activated or reserved (streams only used for priority are higher priority)</li>
391      *     <li>Depth in the priority tree (closer to root is higher priority></li>
392      *     <li>Stream ID (higher stream ID is higher priority - used for tie breaker)</li>
393      * </ul>
394      */
395     private static final class StateOnlyComparator implements Comparator<State>, Serializable {
396         private static final long serialVersionUID = -4806936913002105966L;
397 
398         static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
399 
400         @Override
401         public int compare(State o1, State o2) {
402             // "priority only streams" (which have not been activated) are higher priority than streams used for data.
403             boolean o1Actived = o1.wasStreamReservedOrActivated();
404             if (o1Actived != o2.wasStreamReservedOrActivated()) {
405                 return o1Actived ? -1 : 1;
406             }
407             // Numerically greater depth is higher priority.
408             int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
409 
410             // I also considered tracking the number of streams which are "activated" (eligible transfer data) at each
411             // subtree. This would require a traversal from each node to the root on dependency tree structural changes,
412             // and then it would require a re-prioritization at each of these nodes (instead of just the nodes where the
413             // direct parent changed). The costs of this are judged to be relatively high compared to the nominal
414             // benefit it provides to the heuristic. Instead folks should just increase maxStateOnlySize.
415 
416             // Last resort is to give larger stream ids more priority.
417             return x != 0 ? x : o1.streamId - o2.streamId;
418         }
419     }
420 
421     private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
422         private static final long serialVersionUID = -1437548640227161828L;
423 
424         static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
425 
426         @Override
427         public int compare(State o1, State o2) {
428             return MathUtil.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
429         }
430     }
431 
432     /**
433      * The remote flow control state for a single stream.
434      */
435     private final class State implements PriorityQueueNode {
        // Bit masks stored in #flags; presumably backing isActive(), isDistributing() and
        // wasStreamReservedOrActivated() (those accessors are not shown here) -- TODO confirm.
        private static final byte STATE_IS_ACTIVE = 0x1;
        private static final byte STATE_IS_DISTRIBUTING = 0x2;
        private static final byte STATE_STREAM_ACTIVATED = 0x4;

        /**
         * May be {@code null}: states created only to hold priority information have no stream, and the reference
         * is cleared once the stream is removed from the connection.
         */
        Http2Stream stream;
        /**
         * Parent in the priority tree, or {@code null} if detached.
         */
        State parent;
        /**
         * Direct children in the priority tree. Starts as the shared empty map and is replaced lazily on first use.
         */
        IntObjectMap<State> children = IntCollections.emptyMap();
        /**
         * Children whose subtree has active streams ({@code activeCountForTree != 0}), ordered by pseudo time.
         */
        private final PriorityQueue<State> pseudoTimeQueue;
        final int streamId;
        int streamableBytes;
        /**
         * Depth in the priority tree; {@code MAX_VALUE} while detached (see {@code setParent}).
         */
        int dependencyTreeDepth;
        /**
         * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
         */
        int activeCountForTree;
        // Positions within the pseudo time queue and the "state only" removal queue, as tracked for PriorityQueueNode.
        private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
        private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
        /**
         * An estimate of when this node should be given the opportunity to write data.
         */
        long pseudoTimeToWrite;
        /**
         * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
         */
        long pseudoTime;
        /**
         * Sum of the weights of the children currently in {@link #pseudoTimeQueue}.
         */
        long totalQueuedWeights;
        private byte flags;
        /**
         * Weight relative to siblings.
         */
        short weight = DEFAULT_PRIORITY_WEIGHT;
467 
468         State(int streamId) {
469             this(streamId, null, 0);
470         }
471 
472         State(Http2Stream stream) {
473             this(stream, 0);
474         }
475 
476         State(Http2Stream stream, int initialSize) {
477             this(stream.id(), stream, initialSize);
478         }
479 
480         State(int streamId, Http2Stream stream, int initialSize) {
481             this.stream = stream;
482             this.streamId = streamId;
483             pseudoTimeQueue = new DefaultPriorityQueue<State>(StatePseudoTimeComparator.INSTANCE, initialSize);
484         }
485 
486         boolean isDescendantOf(State state) {
487             State next = parent;
488             while (next != null) {
489                 if (next == state) {
490                     return true;
491                 }
492                 next = next.parent;
493             }
494             return false;
495         }
496 
        /**
         * Convenience overload of {@link #takeChild(Iterator, State, boolean, List)} for callers that are not
         * iterating over {@code child}'s current parent's children.
         */
        void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
            takeChild(null, child, exclusive, events);
        }

        /**
         * Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
         * the child.
         *
         * @param childItr iterator over the old parent's children currently positioned at {@code child}, or
         *                 {@code null} if the caller is not iterating that collection.
         * @param child the node to adopt.
         * @param exclusive whether {@code child} should become the only child of this node.
         * @param events receives a {@link ParentChangedEvent} for every node whose parent changes; callers pass it to
         *               {@code notifyParentChanged} afterwards.
         */
        void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
                       List<ParentChangedEvent> events) {
            State oldParent = child.parent;

            if (oldParent != this) {
                events.add(new ParentChangedEvent(child, oldParent));
                child.setParent(this);
                // If the childItr is not null we are iterating over the oldParent.children collection and should
                // use the iterator to remove from the collection to avoid concurrent modification. Otherwise it is
                // assumed we are not iterating over this collection and it is safe to call remove directly.
                if (childItr != null) {
                    childItr.remove();
                } else if (oldParent != null) {
                    oldParent.children.remove(child.streamId);
                }

                // Lazily initialize the children to save object allocations.
                initChildrenIfEmpty();

                final State oldChild = children.put(child.streamId, child);
                assert oldChild == null : "A stream with the same stream ID was already in the child map.";
            }

            if (exclusive && !children.isEmpty()) {
                // If it was requested that this child be the exclusive dependency of this node,
                // move any previous children to the child node, becoming grand children of this node.
                Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
                while (itr.hasNext()) {
                    child.takeChild(itr, itr.next().value(), false, events);
                }
            }
        }
537 
538         /**
539          * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
540          */
541         void removeChild(State child) {
542             if (children.remove(child.streamId) != null) {
543                 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
544                 events.add(new ParentChangedEvent(child, child.parent));
545                 child.setParent(null);
546 
547                 if (!child.children.isEmpty()) {
548                     // Move up any grand children to be directly dependent on this node.
549                     Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
550                     long totalWeight = child.getTotalWeight();
551                     do {
552                         // Redistribute the weight of child to its dependency proportionally.
553                         State dependency = itr.next().value();
554                         dependency.weight = (short) max(1, dependency.weight * child.weight / totalWeight);
555                         takeChild(itr, dependency, false, events);
556                     } while (itr.hasNext());
557                 }
558 
559                 notifyParentChanged(events);
560             }
561         }
562 
563         private long getTotalWeight() {
564             long totalWeight = 0L;
565             for (State state : children.values()) {
566                 totalWeight += state.weight;
567             }
568             return totalWeight;
569         }
570 
571         /**
572          * Remove all children with the exception of {@code streamToRetain}.
573          * This method is intended to be used to support an exclusive priority dependency operation.
574          * @return The map of children prior to this operation, excluding {@code streamToRetain} if present.
575          */
576         private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
577             stateToRetain = children.remove(stateToRetain.streamId);
578             IntObjectMap<State> prevChildren = children;
579             // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
580             // It will either be added directly in this method, or after this method is called...but it will be added.
581             initChildren();
582             if (stateToRetain != null) {
583                 children.put(stateToRetain.streamId, stateToRetain);
584             }
585             return prevChildren;
586         }
587 
588         private void setParent(State newParent) {
589             // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue.
590             if (activeCountForTree != 0 && parent != null) {
591                 parent.removePseudoTimeQueue(this);
592                 parent.activeCountChangeForTree(-activeCountForTree);
593             }
594             parent = newParent;
595             // Use MAX_VALUE if no parent because lower depth is considered higher priority by StateOnlyComparator.
596             dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
597         }
598 
599         private void initChildrenIfEmpty() {
600             if (children == IntCollections.<State>emptyMap()) {
601                 initChildren();
602             }
603         }
604 
605         private void initChildren() {
606             children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
607         }
608 
609         void write(int numBytes, Writer writer) throws Http2Exception {
610             assert stream != null;
611             try {
612                 writer.write(stream, numBytes);
613             } catch (Throwable t) {
614                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
615             }
616         }
617 
        /**
         * Adjusts {@link #activeCountForTree} by {@code increment} and then applies the same adjustment to every
         * ancestor via the parent chain.
         * <p>
         * If this node's count drops to 0, it is removed from its parent's pseudoTimeQueue. If it rises from 0
         * (the post-update count equals {@code increment}) and this node is not currently distributing, it is
         * offered to the parent's pseudoTimeQueue. Before that, its pseudoTimeToWrite is reset to the parent's
         * pseudoTime.
         *
         * @param increment the change to apply; the resulting count must not be negative (asserted).
         */
        void activeCountChangeForTree(int increment) {
            assert activeCountForTree + increment >= 0;
            activeCountForTree += increment;
            if (parent != null) {
                // Sanity check: if the count just went from 0 to non-zero while this node is already in some
                // pseudoTimeQueue, that queue must be the parent's.
                assert activeCountForTree != increment ||
                       pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
                       parent.pseudoTimeQueue.containsTyped(this) :
                     "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
                     "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
                if (activeCountForTree == 0) {
                    // Nothing active below this node any more, so it must not be scheduled by the parent.
                    parent.removePseudoTimeQueue(this);
                } else if (activeCountForTree == increment && !isDistributing()) {
                    // If frame count was 0 but is now not, and this node is not already in a pseudoTimeQueue (assumed
                    // to be the parent's pseudoTimeQueue) then enqueue it. If this State object is being processed the
                    // pseudoTime for this node should not be adjusted, and the node will be added back to the
                    // pseudoTimeQueue/tree structure after it is done being processed. This may happen if the
                    // activeCountForTree == 0 (a node which can't stream anything and is blocked) is at/near root of
                    // the tree, and is popped off the pseudoTimeQueue during processing, and then put back on the
                    // pseudoTimeQueue because a child changes position in the priority tree (or is closed because it is
                    // not blocked and finished writing all data).
                    parent.offerAndInitializePseudoTime(this);
                }
                // Propagate the same delta up to the root.
                parent.activeCountChangeForTree(increment);
            }
        }
643 
644         void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
645             if (isActive() != isActive) {
646                 if (isActive) {
647                     activeCountChangeForTree(1);
648                     setActive();
649                 } else {
650                     activeCountChangeForTree(-1);
651                     unsetActive();
652                 }
653             }
654 
655             streamableBytes = newStreamableBytes;
656         }
657 
658         /**
659          * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
660          */
661         void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
662             assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
663             // If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
664             // and should use parentState.pseudoTime.
665             pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
666         }
667 
668         /**
669          * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
670          * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
671          * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offerPseudoTimeQueue(State)}.
672          */
673         void offerAndInitializePseudoTime(State state) {
674             state.pseudoTimeToWrite = pseudoTime;
675             offerPseudoTimeQueue(state);
676         }
677 
678         void offerPseudoTimeQueue(State state) {
679             pseudoTimeQueue.offer(state);
680             totalQueuedWeights += state.weight;
681         }
682 
683         /**
684          * Must only be called if the pseudoTimeQueue is non-empty!
685          */
686         State pollPseudoTimeQueue() {
687             State state = pseudoTimeQueue.poll();
688             // This method is only ever called if the pseudoTimeQueue is non-empty.
689             totalQueuedWeights -= state.weight;
690             return state;
691         }
692 
693         void removePseudoTimeQueue(State state) {
694             if (pseudoTimeQueue.removeTyped(state)) {
695                 totalQueuedWeights -= state.weight;
696             }
697         }
698 
699         State peekPseudoTimeQueue() {
700             return pseudoTimeQueue.peek();
701         }
702 
703         void close() {
704             updateStreamableBytes(0, false);
705             stream = null;
706         }
707 
        /**
         * Returns whether the {@code STATE_STREAM_ACTIVATED} bit is set in {@code flags}.
         */
        boolean wasStreamReservedOrActivated() {
            return (flags & STATE_STREAM_ACTIVATED) != 0;
        }

        /**
         * Sets the {@code STATE_STREAM_ACTIVATED} bit in {@code flags}.
         */
        void setStreamReservedOrActivated() {
            flags |= STATE_STREAM_ACTIVATED;
        }
715 
        /**
         * Returns whether the {@code STATE_IS_ACTIVE} bit is set in {@code flags}.
         * {@link #updateStreamableBytes(int, boolean)} compares this value with the requested state to decide
         * whether the tree's active counts must change.
         */
        boolean isActive() {
            return (flags & STATE_IS_ACTIVE) != 0;
        }

        /** Sets the {@code STATE_IS_ACTIVE} bit in {@code flags}. */
        private void setActive() {
            flags |= STATE_IS_ACTIVE;
        }

        /** Clears the {@code STATE_IS_ACTIVE} bit in {@code flags}. */
        private void unsetActive() {
            flags &= ~STATE_IS_ACTIVE;
        }
727 
        /**
         * Returns whether the {@code STATE_IS_DISTRIBUTING} bit is set in {@code flags}.
         * {@link #activeCountChangeForTree(int)} consults this and does not re-offer a distributing node to its
         * parent's pseudoTimeQueue.
         */
        boolean isDistributing() {
            return (flags & STATE_IS_DISTRIBUTING) != 0;
        }

        /** Sets the {@code STATE_IS_DISTRIBUTING} bit in {@code flags}. */
        void setDistributing() {
            flags |= STATE_IS_DISTRIBUTING;
        }

        /** Clears the {@code STATE_IS_DISTRIBUTING} bit in {@code flags}. */
        void unsetDistributing() {
            flags &= ~STATE_IS_DISTRIBUTING;
        }
739 
740         @Override
741         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
742             return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
743         }
744 
745         @Override
746         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
747             if (queue == stateOnlyRemovalQueue) {
748                 stateOnlyQueueIndex = i;
749             } else {
750                 pseudoTimeQueueIndex = i;
751             }
752         }
753 
754         @Override
755         public String toString() {
756             // Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
757             StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
758             toString(sb);
759             return sb.toString();
760         }
761 
762         private void toString(StringBuilder sb) {
763             sb.append("{streamId ").append(streamId)
764                     .append(" streamableBytes ").append(streamableBytes)
765                     .append(" activeCountForTree ").append(activeCountForTree)
766                     .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
767                     .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
768                     .append(" pseudoTime ").append(pseudoTime)
769                     .append(" flags ").append(flags)
770                     .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
771                     .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
772                     .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
773 
774             if (!pseudoTimeQueue.isEmpty()) {
775                 for (State s : pseudoTimeQueue) {
776                     s.toString(sb);
777                     sb.append(", ");
778                 }
779                 // Remove the last ", "
780                 sb.setLength(sb.length() - 2);
781             }
782             sb.append(']');
783         }
784     }
785 
786     /**
787      * Allows a correlation to be made between a stream and its old parent before a parent change occurs.
788      */
789     private static final class ParentChangedEvent {
790         final State state;
791         final State oldParent;
792 
793         /**
794          * Create a new instance.
795          * @param state The state who has had a parent change.
796          * @param oldParent The previous parent.
797          */
798         ParentChangedEvent(State state, State oldParent) {
799             this.state = state;
800             this.oldParent = oldParent;
801         }
802     }
803 }