View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.util.collection.IntCollections;
18  import io.netty.util.collection.IntObjectHashMap;
19  import io.netty.util.collection.IntObjectMap;
20  import io.netty.util.internal.DefaultPriorityQueue;
21  import io.netty.util.internal.EmptyPriorityQueue;
22  import io.netty.util.internal.MathUtil;
23  import io.netty.util.internal.PriorityQueue;
24  import io.netty.util.internal.PriorityQueueNode;
25  import io.netty.util.internal.SystemPropertyUtil;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.io.Serializable;
29  import java.util.ArrayList;
30  import java.util.Comparator;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
35  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
36  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
37  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
38  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
39  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
40  import static java.lang.Integer.MAX_VALUE;
41  import static java.lang.Math.max;
42  import static java.lang.Math.min;
43  
44  /**
45   * A {@link StreamByteDistributor} that is sensitive to stream priority and uses
46   * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
47   * bytes.
48   * <p>
49   * Inspiration for this distributor was taken from Linux's
50   * <a href="https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt">Completely Fair Scheduler</a>
51   * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
52   * an "ideal multi-tasking NIC".
53   * <p>
54   * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
55   * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
56   */
57  @UnstableApi
58  public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
    /**
     * The initial size of the children map is chosen to be conservative on initial memory allocations under
     * the assumption that most streams will have a small number of children. This choice may be
     * sub-optimal when a stream does have many children (e.g. a web page which has many
     * dependencies to load). It can be overridden with the {@code io.netty.http2.childrenMapSize} system property;
     * values below 1 are raised to 1.
     * <p>
     * Visible only for testing!
     */
    static final int INITIAL_CHILDREN_MAP_SIZE =
            max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 2));
    /**
     * Default value for {@link #maxStateOnlySize}. Firefox currently uses 5 streams to establish QoS classes.
     */
    private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;

    /**
     * Key under which each {@link Http2Stream}'s {@link State} is stored as a stream property.
     */
    private final Http2Connection.PropertyKey stateKey;
    /**
     * If there is no Http2Stream object, but we still persist priority information then this is where the state will
     * reside. Keyed by stream ID.
     */
    private final IntObjectMap<State> stateOnlyMap;
    /**
     * Holds the same {@link State} objects as {@link #stateOnlyMap}: streams which have no {@link Http2Stream} object
     * but whose priority is being retained. The head of the queue is the lowest priority entry and is the first to be
     * evicted when the limit is exceeded. See {@link StateOnlyComparator} for the priority comparator.
     */
    private final PriorityQueue<State> stateOnlyRemovalQueue;
    private final Http2Connection connection;
    /**
     * State of the connection stream. This is the root of the priority tree and is never itself written to.
     */
    private final State connectionState;
    /**
     * The minimum number of bytes that we will attempt to allocate to a stream. This is to
     * help improve goodput on a per-stream basis.
     */
    private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
    /**
     * Maximum number of entries retained in {@link #stateOnlyMap}. {@code 0} disables retention entirely.
     */
    private final int maxStateOnlySize;
93  
94      public WeightedFairQueueByteDistributor(Http2Connection connection) {
95          this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
96      }
97  
98      public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
99          if (maxStateOnlySize < 0) {
100             throw new IllegalArgumentException("maxStateOnlySize: " + maxStateOnlySize + " (expected: >0)");
101         } else if (maxStateOnlySize == 0) {
102             stateOnlyMap = IntCollections.emptyMap();
103             stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
104         } else {
105             stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
106             // +2 because we may exceed the limit by 2 if a new dependency has no associated Http2Stream object. We need
107             // to create the State objects to put them into the dependency tree, which then impacts priority.
108             stateOnlyRemovalQueue = new DefaultPriorityQueue<State>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
109         }
110         this.maxStateOnlySize = maxStateOnlySize;
111 
112         this.connection = connection;
113         stateKey = connection.newKey();
114         final Http2Stream connectionStream = connection.connectionStream();
115         connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));
116 
117         // Register for notification of new streams.
118         connection.addListener(new Http2ConnectionAdapter() {
119             @Override
120             public void onStreamAdded(Http2Stream stream) {
121                 State state = stateOnlyMap.remove(stream.id());
122                 if (state == null) {
123                     state = new State(stream);
124                     // Only the stream which was just added will change parents. So we only need an array of size 1.
125                     List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
126                     connectionState.takeChild(state, false, events);
127                     notifyParentChanged(events);
128                 } else {
129                     stateOnlyRemovalQueue.removeTyped(state);
130                     state.stream = stream;
131                 }
132                 switch (stream.state()) {
133                     case RESERVED_REMOTE:
134                     case RESERVED_LOCAL:
135                         state.setStreamReservedOrActivated();
136                         // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no
137                         // need to reprioritize here because it will not be in stateOnlyRemovalQueue.
138                         break;
139                     default:
140                         break;
141                 }
142                 stream.setProperty(stateKey, state);
143             }
144 
145             @Override
146             public void onStreamActive(Http2Stream stream) {
147                 state(stream).setStreamReservedOrActivated();
148                 // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no need to
149                 // reprioritize here because it will not be in stateOnlyRemovalQueue.
150             }
151 
152             @Override
153             public void onStreamClosed(Http2Stream stream) {
154                 state(stream).close();
155             }
156 
157             @Override
158             public void onStreamRemoved(Http2Stream stream) {
159                 // The stream has been removed from the connection. We can no longer rely on the stream's property
160                 // storage to track the State. If we have room, and the precedence of the stream is sufficient, we
161                 // should retain the State in the stateOnlyMap.
162                 State state = state(stream);
163 
164                 // Typically the stream is set to null when the stream is closed because it is no longer needed to write
165                 // data. However if the stream was not activated it may not be closed (reserved streams) so we ensure
166                 // the stream reference is set to null to avoid retaining a reference longer than necessary.
167                 state.stream = null;
168 
169                 if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
170                     state.parent.removeChild(state);
171                     return;
172                 }
173                 if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
174                     State stateToRemove = stateOnlyRemovalQueue.peek();
175                     if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
176                         // The "lowest priority" stream is a "higher priority" than the stream being removed, so we
177                         // just discard the state.
178                         state.parent.removeChild(state);
179                         return;
180                     }
181                     stateOnlyRemovalQueue.poll();
182                     stateToRemove.parent.removeChild(stateToRemove);
183                     stateOnlyMap.remove(stateToRemove.streamId);
184                 }
185                 stateOnlyRemovalQueue.add(state);
186                 stateOnlyMap.put(state.streamId, state);
187             }
188         });
189     }
190 
191     @Override
192     public void updateStreamableBytes(StreamState state) {
193         state(state.stream()).updateStreamableBytes(streamableBytes(state),
194                                                     state.hasFrame() && state.windowSize() >= 0);
195     }
196 
    /**
     * Makes {@code childStreamId} depend on {@code parentStreamId} with the given {@code weight}.
     * <p>
     * If either stream has no {@link State} yet (no {@link Http2Stream} object and nothing in
     * {@link #stateOnlyMap}), a "state only" {@link State} is created for it so the priority information can be
     * retained. A newly created parent is attached to the connection stream. If {@link #maxStateOnlySize} is
     * {@code 0} such updates are dropped instead. After the tree is updated, the lowest priority "state only" entries
     * are evicted until at most {@link #maxStateOnlySize} remain.
     *
     * @param childStreamId the stream whose dependency is being updated.
     * @param parentStreamId the stream {@code childStreamId} will depend on.
     * @param weight the new weight of {@code childStreamId}.
     * @param exclusive if {@code true}, the existing children of the parent are moved to depend on
     *                  {@code childStreamId}.
     */
    @Override
    public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
        State state = state(childStreamId);
        if (state == null) {
            // If there is no State object that means there is no Http2Stream object and we would have to keep the
            // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
            // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
            if (maxStateOnlySize == 0) {
                return;
            }
            state = new State(childStreamId);
            stateOnlyRemovalQueue.add(state);
            stateOnlyMap.put(childStreamId, state);
        }

        State newParent = state(parentStreamId);
        if (newParent == null) {
            // If there is no State object that means there is no Http2Stream object and we would have to keep the
            // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
            // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
            if (maxStateOnlySize == 0) {
                return;
            }
            newParent = new State(parentStreamId);
            stateOnlyRemovalQueue.add(newParent);
            stateOnlyMap.put(parentStreamId, newParent);
            // Only the stream which was just added will change parents, so a list of capacity 1 suffices.
            List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
            connectionState.takeChild(newParent, false, events);
            notifyParentChanged(events);
        }

        // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue and thus should not be counted
        // toward parent.totalQueuedWeights.
        if (state.activeCountForTree != 0 && state.parent != null) {
            state.parent.totalQueuedWeights += weight - state.weight;
        }
        state.weight = weight;

        // Restructure only if the parent actually changes, or if an exclusive dependency was requested and the new
        // parent has children other than this one that must be moved.
        if (newParent != state.parent || (exclusive && newParent.children.size() != 1)) {
            final List<ParentChangedEvent> events;
            if (newParent.isDescendantOf(state)) {
                // Placing state under its own descendant would create a cycle, so first move newParent up to
                // state's current parent.
                events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children.size() : 0));
                state.parent.takeChild(newParent, false, events);
            } else {
                events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children.size() : 0));
            }
            newParent.takeChild(state, exclusive, events);
            notifyParentChanged(events);
        }

        // The location in the dependency tree impacts the priority in the stateOnlyRemovalQueue map. If we created new
        // State objects we must check if we exceeded the limit after we insert into the dependency tree to ensure the
        // stateOnlyRemovalQueue has been updated.
        while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
            State stateToRemove = stateOnlyRemovalQueue.poll();
            stateToRemove.parent.removeChild(stateToRemove);
            stateOnlyMap.remove(stateToRemove.streamId);
        }
    }
257 
258     @Override
259     public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
260         // As long as there is some active frame we should write at least 1 time.
261         if (connectionState.activeCountForTree == 0) {
262             return false;
263         }
264 
265         // The goal is to write until we write all the allocated bytes or are no longer making progress.
266         // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
267         // to be sent. Making progress means the active streams rooted at the connection stream has changed.
268         int oldIsActiveCountForTree;
269         do {
270             oldIsActiveCountForTree = connectionState.activeCountForTree;
271             // connectionState will never be active, so go right to its children.
272             maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
273         } while (connectionState.activeCountForTree != 0 &&
274                 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
275 
276         return connectionState.activeCountForTree != 0;
277     }
278 
279     /**
280      * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
281      * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be &gt; 0.
282      */
283     public void allocationQuantum(int allocationQuantum) {
284         if (allocationQuantum <= 0) {
285             throw new IllegalArgumentException("allocationQuantum must be > 0");
286         }
287         this.allocationQuantum = allocationQuantum;
288     }
289 
290     private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
291         if (state.isActive()) {
292             int nsent = min(maxBytes, state.streamableBytes);
293             state.write(nsent, writer);
294             if (nsent == 0 && maxBytes != 0) {
295                 // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
296                 // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
297                 // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
298                 // stream's flow control window being 0.
299                 state.updateStreamableBytes(state.streamableBytes, false);
300             }
301             return nsent;
302         }
303 
304         return distributeToChildren(maxBytes, writer, state);
305     }
306 
307     /**
308      * It is a pre-condition that {@code state.poll()} returns a non-{@code null} value. This is a result of the way
309      * the allocation algorithm is structured and can be explained in the following cases:
310      * <h3>For the recursive case</h3>
311      * If a stream has no children (in the allocation tree) than that node must be active or it will not be in the
312      * allocation tree. If a node is active then it will not delegate to children and recursion ends.
313      * <h3>For the initial case</h3>
314      * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
315      * has no active children we don't get into this method.
316      */
317     private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
318         long oldTotalQueuedWeights = state.totalQueuedWeights;
319         State childState = state.pollPseudoTimeQueue();
320         State nextChildState = state.peekPseudoTimeQueue();
321         childState.setDistributing();
322         try {
323             assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
324                 "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
325                 ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ")";
326             int nsent = distribute(nextChildState == null ? maxBytes :
327                             min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
328                                                childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
329                                ),
330                                writer,
331                                childState);
332             state.pseudoTime += nsent;
333             childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
334             return nsent;
335         } finally {
336             childState.unsetDistributing();
337             // Do in finally to ensure the internal flags is not corrupted if an exception is thrown.
338             // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
339             // the priority pseudoTimeQueue due to a write operation.
340             if (childState.activeCountForTree != 0) {
341                 state.offerPseudoTimeQueue(childState);
342             }
343         }
344     }
345 
346     private State state(Http2Stream stream) {
347         return stream.getProperty(stateKey);
348     }
349 
350     private State state(int streamId) {
351         Http2Stream stream = connection.stream(streamId);
352         return stream != null ? state(stream) : stateOnlyMap.get(streamId);
353     }
354 
355     /**
356      * For testing only!
357      */
358     boolean isChild(int childId, int parentId, short weight) {
359         State parent = state(parentId);
360         State child;
361         return parent.children.containsKey(childId) &&
362                 (child = state(childId)).parent == parent && child.weight == weight;
363     }
364 
365     /**
366      * For testing only!
367      */
368     int numChildren(int streamId) {
369         State state = state(streamId);
370         return state == null ? 0 : state.children.size();
371     }
372 
373     /**
374      * Notify all listeners of the priority tree change events (in ascending order)
375      * @param events The events (top down order) which have changed
376      */
377     void notifyParentChanged(List<ParentChangedEvent> events) {
378         for (int i = 0; i < events.size(); ++i) {
379             ParentChangedEvent event = events.get(i);
380             stateOnlyRemovalQueue.priorityChanged(event.state);
381             if (event.state.parent != null && event.state.activeCountForTree != 0) {
382                 event.state.parent.offerAndInitializePseudoTime(event.state);
383                 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
384             }
385         }
386     }
387 
388     /**
389      * A comparator for {@link State} which has no associated {@link Http2Stream} object. The general precedence is:
390      * <ul>
391      *     <li>Was a stream activated or reserved (streams only used for priority are higher priority)</li>
392      *     <li>Depth in the priority tree (closer to root is higher priority></li>
393      *     <li>Stream ID (higher stream ID is higher priority - used for tie breaker)</li>
394      * </ul>
395      */
396     private static final class StateOnlyComparator implements Comparator<State>, Serializable {
397         private static final long serialVersionUID = -4806936913002105966L;
398 
399         static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
400 
401         private StateOnlyComparator() {
402         }
403 
404         @Override
405         public int compare(State o1, State o2) {
406             // "priority only streams" (which have not been activated) are higher priority than streams used for data.
407             boolean o1Actived = o1.wasStreamReservedOrActivated();
408             if (o1Actived != o2.wasStreamReservedOrActivated()) {
409                 return o1Actived ? -1 : 1;
410             }
411             // Numerically greater depth is higher priority.
412             int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
413 
414             // I also considered tracking the number of streams which are "activated" (eligible transfer data) at each
415             // subtree. This would require a traversal from each node to the root on dependency tree structural changes,
416             // and then it would require a re-prioritization at each of these nodes (instead of just the nodes where the
417             // direct parent changed). The costs of this are judged to be relatively high compared to the nominal
418             // benefit it provides to the heuristic. Instead folks should just increase maxStateOnlySize.
419 
420             // Last resort is to give larger stream ids more priority.
421             return x != 0 ? x : o1.streamId - o2.streamId;
422         }
423     }
424 
425     private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
426         private static final long serialVersionUID = -1437548640227161828L;
427 
428         static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
429 
430         private StatePseudoTimeComparator() {
431         }
432 
433         @Override
434         public int compare(State o1, State o2) {
435             return MathUtil.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
436         }
437     }
438 
439     /**
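     * Priority tree and byte distribution bookkeeping for a single stream ID. The associated {@link Http2Stream}
     * may be {@code null} when only priority information is being retained for that stream ID.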
441      */
442     private final class State implements PriorityQueueNode {
        // Bit flags stored in {@link #flags} (presumably backing isActive(), isDistributing() and
        // wasStreamReservedOrActivated() respectively - confirm against the accessors below).
        private static final byte STATE_IS_ACTIVE = 0x1;
        private static final byte STATE_IS_DISTRIBUTING = 0x2;
        private static final byte STATE_STREAM_ACTIVATED = 0x4;

        /**
         * May be {@code null}. It is {@code null} for "state only" entries created from a bare stream ID. It is
         * cleared when the stream is removed from the connection and is also expected to be cleared when the stream
         * is closed.
         */
        Http2Stream stream;
        /**
         * Parent in the priority tree, or {@code null} if this node is detached (or is the connection stream).
         */
        State parent;
        /**
         * Direct children keyed by stream ID. Shares the empty map until the first child is added.
         */
        IntObjectMap<State> children = IntCollections.emptyMap();
        /**
         * Children whose subtree is active ({@link #activeCountForTree} != 0), ordered by {@link #pseudoTimeToWrite}.
         */
        private final PriorityQueue<State> pseudoTimeQueue;
        final int streamId;
        /**
         * Bytes this stream can currently write, as last reported through updateStreamableBytes.
         */
        int streamableBytes;
        /**
         * Distance from the root of the priority tree. It is {@link Integer#MAX_VALUE} once detached from a parent.
         * Used by {@link StateOnlyComparator}.
         */
        int dependencyTreeDepth;
        /**
         * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
         */
        int activeCountForTree;
        // Position bookkeeping for the PriorityQueueNode contract: one index for the parent's pseudoTimeQueue and one
        // for stateOnlyRemovalQueue (presumably; the accessors are outside this view).
        private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
        private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
        /**
         * An estimate of when this node should be given the opportunity to write data.
         */
        long pseudoTimeToWrite;
        /**
         * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of. It is
         * advanced by the number of bytes written by any child during distribution.
         */
        long pseudoTime;
        /**
         * Sum of the weights of the children currently in {@link #pseudoTimeQueue}.
         */
        long totalQueuedWeights;
        private byte flags;
        short weight = DEFAULT_PRIORITY_WEIGHT;
474 
475         State(int streamId) {
476             this(streamId, null, 0);
477         }
478 
479         State(Http2Stream stream) {
480             this(stream, 0);
481         }
482 
483         State(Http2Stream stream, int initialSize) {
484             this(stream.id(), stream, initialSize);
485         }
486 
487         State(int streamId, Http2Stream stream, int initialSize) {
488             this.stream = stream;
489             this.streamId = streamId;
490             pseudoTimeQueue = new DefaultPriorityQueue<State>(StatePseudoTimeComparator.INSTANCE, initialSize);
491         }
492 
493         boolean isDescendantOf(State state) {
494             State next = parent;
495             while (next != null) {
496                 if (next == state) {
497                     return true;
498                 }
499                 next = next.parent;
500             }
501             return false;
502         }
503 
        /**
         * Adds {@code child} to this node when the caller is not iterating over the child's old parent's children.
         * See the four-argument overload for details.
         */
        void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
            takeChild(null, child, exclusive, events);
        }

        /**
         * Adds a child to this node. If exclusive is set, any other children of this node are moved to being
         * dependent on the child.
         * <p>
         * A {@link ParentChangedEvent} is appended to {@code events} for every node whose parent changes. The events
         * are not dispatched here; the callers in this file pass the list to {@code notifyParentChanged} afterwards.
         *
         * @param childItr if non-{@code null}, the iterator the caller is using over the child's old parent's
         *                 children; it is used to remove the child so the iteration stays valid.
         * @param child the node to attach under this node.
         * @param exclusive whether {@code child} should become the only child of this node.
         * @param events accumulator for parent change events.
         */
        void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
                       List<ParentChangedEvent> events) {
            State oldParent = child.parent;

            if (oldParent != this) {
                events.add(new ParentChangedEvent(child, oldParent));
                child.setParent(this);
                // If the childItr is not null we are iterating over the oldParent.children collection and should
                // use the iterator to remove from the collection to avoid concurrent modification. Otherwise it is
                // assumed we are not iterating over this collection and it is safe to call remove directly.
                if (childItr != null) {
                    childItr.remove();
                } else if (oldParent != null) {
                    oldParent.children.remove(child.streamId);
                }

                // Lazily initialize the children to save object allocations.
                initChildrenIfEmpty();

                final State oldChild = children.put(child.streamId, child);
                assert oldChild == null : "A stream with the same stream ID was already in the child map.";
            }

            if (exclusive && !children.isEmpty()) {
                // If it was requested that this child be the exclusive dependency of this node,
                // move any previous children to the child node, becoming grand children of this node.
                Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
                while (itr.hasNext()) {
                    child.takeChild(itr, itr.next().value(), false, events);
                }
            }
        }
544 
        /**
         * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
         * Does nothing if {@code child} is not a direct child of this node. The detached child ends up with no parent
         * and no children, and the resulting parent change events are dispatched immediately.
         */
        void removeChild(State child) {
            if (children.remove(child.streamId) != null) {
                List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
                events.add(new ParentChangedEvent(child, child.parent));
                child.setParent(null);

                // Move up any grand children to be directly dependent on this node. The iterator is passed along so
                // takeChild can remove each entry from child.children without a concurrent modification.
                Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
                while (itr.hasNext()) {
                    takeChild(itr, itr.next().value(), false, events);
                }

                notifyParentChanged(events);
            }
        }
563 
564         /**
565          * Remove all children with the exception of {@code streamToRetain}.
566          * This method is intended to be used to support an exclusive priority dependency operation.
567          * @return The map of children prior to this operation, excluding {@code streamToRetain} if present.
568          */
569         private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
570             stateToRetain = children.remove(stateToRetain.streamId);
571             IntObjectMap<State> prevChildren = children;
572             // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
573             // It will either be added directly in this method, or after this method is called...but it will be added.
574             initChildren();
575             if (stateToRetain != null) {
576                 children.put(stateToRetain.streamId, stateToRetain);
577             }
578             return prevChildren;
579         }
580 
581         private void setParent(State newParent) {
582             // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue.
583             if (activeCountForTree != 0 && parent != null) {
584                 parent.removePseudoTimeQueue(this);
585                 parent.activeCountChangeForTree(-activeCountForTree);
586             }
587             parent = newParent;
588             // Use MAX_VALUE if no parent because lower depth is considered higher priority by StateOnlyComparator.
589             dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
590         }
591 
592         private void initChildrenIfEmpty() {
593             if (children == IntCollections.<State>emptyMap()) {
594                 initChildren();
595             }
596         }
597 
598         private void initChildren() {
599             children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
600         }
601 
602         void write(int numBytes, Writer writer) throws Http2Exception {
603             assert stream != null;
604             try {
605                 writer.write(stream, numBytes);
606             } catch (Throwable t) {
607                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
608             }
609         }
610 
        /**
         * Adds {@code increment} to {@link #activeCountForTree} and propagates the same delta to every ancestor.
         * <p>
         * When the count drops to {@code 0}, this node is removed from its parent's pseudoTimeQueue. When it rises
         * from {@code 0} and this node is not currently being distributed to, it is offered to the parent's
         * pseudoTimeQueue.
         *
         * @param increment the change in the number of active nodes in this subtree (may be negative).
         */
        void activeCountChangeForTree(int increment) {
            assert activeCountForTree + increment >= 0;
            activeCountForTree += increment;
            if (parent != null) {
                assert activeCountForTree != increment ||
                       pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
                       parent.pseudoTimeQueue.containsTyped(this) :
                     "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
                     "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
                if (activeCountForTree == 0) {
                    parent.removePseudoTimeQueue(this);
                } else if (activeCountForTree == increment && !isDistributing()) {
                    // If the active count was 0 but is now not, and this node is not already in a pseudoTimeQueue
                    // (assumed to be the parent's pseudoTimeQueue) then enqueue it. If this State object is being
                    // processed the pseudoTime for this node should not be adjusted, and the node will be added back to
                    // the pseudoTimeQueue/tree structure after it is done being processed. This may happen if the
                    // activeCountForTree == 0 (a node which can't stream anything and is blocked) is at/near root of
                    // the tree, and is popped off the pseudoTimeQueue during processing, and then put back on the
                    // pseudoTimeQueue because a child changes position in the priority tree (or is closed because it is
                    // not blocked and finished writing all data).
                    parent.offerAndInitializePseudoTime(this);
                }
                parent.activeCountChangeForTree(increment);
            }
        }
636 
637         void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
638             if (isActive() != isActive) {
639                 if (isActive) {
640                     activeCountChangeForTree(1);
641                     setActive();
642                 } else {
643                     activeCountChangeForTree(-1);
644                     unsetActive();
645                 }
646             }
647 
648             streamableBytes = newStreamableBytes;
649         }
650 
651         /**
652          * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
653          */
654         void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
655             assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
656             // If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
657             // and should use parentState.pseudoTime.
658             pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
659         }
660 
661         /**
662          * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
663          * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
664          * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offerPseudoTimeQueue(State)}.
665          */
666         void offerAndInitializePseudoTime(State state) {
667             state.pseudoTimeToWrite = pseudoTime;
668             offerPseudoTimeQueue(state);
669         }
670 
671         void offerPseudoTimeQueue(State state) {
672             pseudoTimeQueue.offer(state);
673             totalQueuedWeights += state.weight;
674         }
675 
676         /**
677          * Must only be called if the pseudoTimeQueue is non-empty!
678          */
679         State pollPseudoTimeQueue() {
680             State state = pseudoTimeQueue.poll();
681             // This method is only ever called if the pseudoTimeQueue is non-empty.
682             totalQueuedWeights -= state.weight;
683             return state;
684         }
685 
686         void removePseudoTimeQueue(State state) {
687             if (pseudoTimeQueue.removeTyped(state)) {
688                 totalQueuedWeights -= state.weight;
689             }
690         }
691 
692         State peekPseudoTimeQueue() {
693             return pseudoTimeQueue.peek();
694         }
695 
696         void close() {
697             updateStreamableBytes(0, false);
698             stream = null;
699         }
700 
701         boolean wasStreamReservedOrActivated() {
702             return (flags & STATE_STREAM_ACTIVATED) != 0;
703         }
704 
705         void setStreamReservedOrActivated() {
706             flags |= STATE_STREAM_ACTIVATED;
707         }
708 
709         boolean isActive() {
710             return (flags & STATE_IS_ACTIVE) != 0;
711         }
712 
713         private void setActive() {
714             flags |= STATE_IS_ACTIVE;
715         }
716 
717         private void unsetActive() {
718             flags &= ~STATE_IS_ACTIVE;
719         }
720 
721         boolean isDistributing() {
722             return (flags & STATE_IS_DISTRIBUTING) != 0;
723         }
724 
725         void setDistributing() {
726             flags |= STATE_IS_DISTRIBUTING;
727         }
728 
729         void unsetDistributing() {
730             flags &= ~STATE_IS_DISTRIBUTING;
731         }
732 
733         @Override
734         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
735             return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
736         }
737 
738         @Override
739         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
740             if (queue == stateOnlyRemovalQueue) {
741                 stateOnlyQueueIndex = i;
742             } else {
743                 pseudoTimeQueueIndex = i;
744             }
745         }
746 
747         @Override
748         public String toString() {
749             // Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
750             StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
751             toString(sb);
752             return sb.toString();
753         }
754 
755         private void toString(StringBuilder sb) {
756             sb.append("{streamId ").append(streamId)
757                     .append(" streamableBytes ").append(streamableBytes)
758                     .append(" activeCountForTree ").append(activeCountForTree)
759                     .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
760                     .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
761                     .append(" pseudoTime ").append(pseudoTime)
762                     .append(" flags ").append(flags)
763                     .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
764                     .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
765                     .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
766 
767             if (!pseudoTimeQueue.isEmpty()) {
768                 for (State s : pseudoTimeQueue) {
769                     s.toString(sb);
770                     sb.append(", ");
771                 }
772                 // Remove the last ", "
773                 sb.setLength(sb.length() - 2);
774             }
775             sb.append(']');
776         }
777     }
778 
779     /**
780      * Allows a correlation to be made between a stream and its old parent before a parent change occurs.
781      */
782     private static final class ParentChangedEvent {
783         final State state;
784         final State oldParent;
785 
786         /**
787          * Create a new instance.
788          * @param state The state who has had a parent change.
789          * @param oldParent The previous parent.
790          */
791         ParentChangedEvent(State state, State oldParent) {
792             this.state = state;
793             this.oldParent = oldParent;
794         }
795     }
796 }