View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.util.collection.IntCollections;
18  import io.netty.util.collection.IntObjectHashMap;
19  import io.netty.util.collection.IntObjectMap;
20  import io.netty.util.internal.DefaultPriorityQueue;
21  import io.netty.util.internal.EmptyPriorityQueue;
22  import io.netty.util.internal.MathUtil;
23  import io.netty.util.internal.PriorityQueue;
24  import io.netty.util.internal.PriorityQueueNode;
25  import io.netty.util.internal.SystemPropertyUtil;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.io.Serializable;
29  import java.util.ArrayList;
30  import java.util.Comparator;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
35  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
36  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
37  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
38  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
39  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
40  import static io.netty.util.internal.ObjectUtil.checkPositive;
41  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
42  import static java.lang.Integer.MAX_VALUE;
43  import static java.lang.Math.max;
44  import static java.lang.Math.min;
45  
46  /**
47   * A {@link StreamByteDistributor} that is sensitive to stream priority and uses
48   * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
49   * bytes.
50   * <p>
51   * Inspiration for this distributor was taken from Linux's
52   * <a href="https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt">Completely Fair Scheduler</a>
53   * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
54   * an "ideal multi-tasking NIC".
55   * <p>
56   * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
57   * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
58   */
59  @UnstableApi
60  public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
    /**
     * The initial size of the children map is chosen to be conservative on initial memory allocations under
     * the assumption that most streams will have a small number of children. This choice may be
     * sub-optimal when a stream does have children and there are many of them (e.g. a web page which has many
     * dependencies to load).
     * <p>
     * The value is read from the {@code io.netty.http2.childrenMapSize} system property (default {@code 2}) and is
     * never smaller than {@code 1}.
     *
     * Visible only for testing!
     */
    static final int INITIAL_CHILDREN_MAP_SIZE =
            max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 2));
    /**
     * Default for {@link #maxStateOnlySize} used by the single argument constructor.
     * FireFox currently uses 5 streams to establish QoS classes.
     */
    private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;

    /**
     * The property key under which the {@link State} of a stream is stored on its {@link Http2Stream}.
     */
    private final Http2Connection.PropertyKey stateKey;
    /**
     * If there is no Http2Stream object, but we still persist priority information then this is where the state will
     * reside.
     */
    private final IntObjectMap<State> stateOnlyMap;
    /**
     * This queue will hold streams that are not active and provides the capability to retain priority for streams which
     * have no {@link Http2Stream} object. See {@link StateOnlyComparator} for the priority comparator.
     */
    private final PriorityQueue<State> stateOnlyRemovalQueue;
    /**
     * The connection whose streams are looked up by stream ID and whose stream life cycle events are observed.
     */
    private final Http2Connection connection;
    /**
     * The {@link State} of the connection stream. Streams with no other known parent are attached to it.
     */
    private final State connectionState;
    /**
     * The minimum number of bytes that we will attempt to allocate to a stream. This is to
     * help improve goodput on a per-stream basis.
     */
    private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
    /**
     * The maximum number of {@link State} objects retained in {@link #stateOnlyMap}. {@code 0} disables retention.
     */
    private final int maxStateOnlySize;
95  
    /**
     * Creates a distributor for {@code connection} which retains priority state for up to
     * {@code DEFAULT_MAX_STATE_ONLY_SIZE} streams that have no {@link Http2Stream} object.
     *
     * @param connection the connection whose streams will be distributed to.
     */
    public WeightedFairQueueByteDistributor(Http2Connection connection) {
        this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
    }
99  
100     public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
101         checkPositiveOrZero(maxStateOnlySize, "maxStateOnlySize");
102         if (maxStateOnlySize == 0) {
103             stateOnlyMap = IntCollections.emptyMap();
104             stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
105         } else {
106             stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
107             // +2 because we may exceed the limit by 2 if a new dependency has no associated Http2Stream object. We need
108             // to create the State objects to put them into the dependency tree, which then impacts priority.
109             stateOnlyRemovalQueue = new DefaultPriorityQueue<State>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
110         }
111         this.maxStateOnlySize = maxStateOnlySize;
112 
113         this.connection = connection;
114         stateKey = connection.newKey();
115         final Http2Stream connectionStream = connection.connectionStream();
116         connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));
117 
118         // Register for notification of new streams.
119         connection.addListener(new Http2ConnectionAdapter() {
120             @Override
121             public void onStreamAdded(Http2Stream stream) {
122                 State state = stateOnlyMap.remove(stream.id());
123                 if (state == null) {
124                     state = new State(stream);
125                     // Only the stream which was just added will change parents. So we only need an array of size 1.
126                     List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
127                     connectionState.takeChild(state, false, events);
128                     notifyParentChanged(events);
129                 } else {
130                     stateOnlyRemovalQueue.removeTyped(state);
131                     state.stream = stream;
132                 }
133                 switch (stream.state()) {
134                     case RESERVED_REMOTE:
135                     case RESERVED_LOCAL:
136                         state.setStreamReservedOrActivated();
137                         // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no
138                         // need to reprioritize here because it will not be in stateOnlyRemovalQueue.
139                         break;
140                     default:
141                         break;
142                 }
143                 stream.setProperty(stateKey, state);
144             }
145 
146             @Override
147             public void onStreamActive(Http2Stream stream) {
148                 state(stream).setStreamReservedOrActivated();
149                 // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no need to
150                 // reprioritize here because it will not be in stateOnlyRemovalQueue.
151             }
152 
153             @Override
154             public void onStreamClosed(Http2Stream stream) {
155                 state(stream).close();
156             }
157 
158             @Override
159             public void onStreamRemoved(Http2Stream stream) {
160                 // The stream has been removed from the connection. We can no longer rely on the stream's property
161                 // storage to track the State. If we have room, and the precedence of the stream is sufficient, we
162                 // should retain the State in the stateOnlyMap.
163                 State state = state(stream);
164 
165                 // Typically the stream is set to null when the stream is closed because it is no longer needed to write
166                 // data. However if the stream was not activated it may not be closed (reserved streams) so we ensure
167                 // the stream reference is set to null to avoid retaining a reference longer than necessary.
168                 state.stream = null;
169 
170                 if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
171                     state.parent.removeChild(state);
172                     return;
173                 }
174                 if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
175                     State stateToRemove = stateOnlyRemovalQueue.peek();
176                     if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
177                         // The "lowest priority" stream is a "higher priority" than the stream being removed, so we
178                         // just discard the state.
179                         state.parent.removeChild(state);
180                         return;
181                     }
182                     stateOnlyRemovalQueue.poll();
183                     stateToRemove.parent.removeChild(stateToRemove);
184                     stateOnlyMap.remove(stateToRemove.streamId);
185                 }
186                 stateOnlyRemovalQueue.add(state);
187                 stateOnlyMap.put(state.streamId, state);
188             }
189         });
190     }
191 
192     @Override
193     public void updateStreamableBytes(StreamState state) {
194         state(state.stream()).updateStreamableBytes(streamableBytes(state),
195                                                     state.hasFrame() && state.windowSize() >= 0);
196     }
197 
198     @Override
199     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
200         State state = state(childStreamId);
201         if (state == null) {
202             // If there is no State object that means there is no Http2Stream object and we would have to keep the
203             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
204             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
205             if (maxStateOnlySize == 0) {
206                 return;
207             }
208             state = new State(childStreamId);
209             stateOnlyRemovalQueue.add(state);
210             stateOnlyMap.put(childStreamId, state);
211         }
212 
213         State newParent = state(parentStreamId);
214         if (newParent == null) {
215             // If there is no State object that means there is no Http2Stream object and we would have to keep the
216             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
217             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
218             if (maxStateOnlySize == 0) {
219                 return;
220             }
221             newParent = new State(parentStreamId);
222             stateOnlyRemovalQueue.add(newParent);
223             stateOnlyMap.put(parentStreamId, newParent);
224             // Only the stream which was just added will change parents. So we only need an array of size 1.
225             List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
226             connectionState.takeChild(newParent, false, events);
227             notifyParentChanged(events);
228         }
229 
230         // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue and thus should not be counted
231         // toward parent.totalQueuedWeights.
232         if (state.activeCountForTree != 0 && state.parent != null) {
233             state.parent.totalQueuedWeights += weight - state.weight;
234         }
235         state.weight = weight;
236 
237         if (newParent != state.parent || (exclusive && newParent.children.size() != 1)) {
238             final List<ParentChangedEvent> events;
239             if (newParent.isDescendantOf(state)) {
240                 events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children.size() : 0));
241                 state.parent.takeChild(newParent, false, events);
242             } else {
243                 events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children.size() : 0));
244             }
245             newParent.takeChild(state, exclusive, events);
246             notifyParentChanged(events);
247         }
248 
249         // The location in the dependency tree impacts the priority in the stateOnlyRemovalQueue map. If we created new
250         // State objects we must check if we exceeded the limit after we insert into the dependency tree to ensure the
251         // stateOnlyRemovalQueue has been updated.
252         while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
253             State stateToRemove = stateOnlyRemovalQueue.poll();
254             stateToRemove.parent.removeChild(stateToRemove);
255             stateOnlyMap.remove(stateToRemove.streamId);
256         }
257     }
258 
259     @Override
260     public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
261         // As long as there is some active frame we should write at least 1 time.
262         if (connectionState.activeCountForTree == 0) {
263             return false;
264         }
265 
266         // The goal is to write until we write all the allocated bytes or are no longer making progress.
267         // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
268         // to be sent. Making progress means the active streams rooted at the connection stream has changed.
269         int oldIsActiveCountForTree;
270         do {
271             oldIsActiveCountForTree = connectionState.activeCountForTree;
272             // connectionState will never be active, so go right to its children.
273             maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
274         } while (connectionState.activeCountForTree != 0 &&
275                 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
276 
277         return connectionState.activeCountForTree != 0;
278     }
279 
280     /**
281      * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
282      * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be &gt; 0.
283      */
    public void allocationQuantum(int allocationQuantum) {
        // Validate first so that the field is left untouched if the argument is rejected.
        checkPositive(allocationQuantum, "allocationQuantum");
        this.allocationQuantum = allocationQuantum;
    }
288 
289     private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
290         if (state.isActive()) {
291             int nsent = min(maxBytes, state.streamableBytes);
292             state.write(nsent, writer);
293             if (nsent == 0 && maxBytes != 0) {
294                 // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
295                 // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
296                 // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
297                 // stream's flow control window being 0.
298                 state.updateStreamableBytes(state.streamableBytes, false);
299             }
300             return nsent;
301         }
302 
303         return distributeToChildren(maxBytes, writer, state);
304     }
305 
306     /**
     * It is a pre-condition that {@code state.pollPseudoTimeQueue()} returns a non-{@code null} value. This is a
     * result of the way the allocation algorithm is structured and can be explained in the following cases:
     * <h3>For the recursive case</h3>
     * If a stream has no children (in the allocation tree) then that node must be active or it will not be in the
311      * allocation tree. If a node is active then it will not delegate to children and recursion ends.
312      * <h3>For the initial case</h3>
313      * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
314      * has no active children we don't get into this method.
315      */
316     private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
317         long oldTotalQueuedWeights = state.totalQueuedWeights;
318         State childState = state.pollPseudoTimeQueue();
319         State nextChildState = state.peekPseudoTimeQueue();
320         childState.setDistributing();
321         try {
322             assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
323                 "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
324                 ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ")";
325             int nsent = distribute(nextChildState == null ? maxBytes :
326                             min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
327                                                childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
328                                ),
329                                writer,
330                                childState);
331             state.pseudoTime += nsent;
332             childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
333             return nsent;
334         } finally {
335             childState.unsetDistributing();
336             // Do in finally to ensure the internal flags is not corrupted if an exception is thrown.
337             // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
338             // the priority pseudoTimeQueue due to a write operation.
339             if (childState.activeCountForTree != 0) {
340                 state.offerPseudoTimeQueue(childState);
341             }
342         }
343     }
344 
    /**
     * Returns the {@link State} stored on {@code stream} under {@link #stateKey}.
     */
    private State state(Http2Stream stream) {
        return stream.getProperty(stateKey);
    }
348 
349     private State state(int streamId) {
350         Http2Stream stream = connection.stream(streamId);
351         return stream != null ? state(stream) : stateOnlyMap.get(streamId);
352     }
353 
354     /**
355      * For testing only!
356      */
357     boolean isChild(int childId, int parentId, short weight) {
358         State parent = state(parentId);
359         State child;
360         return parent.children.containsKey(childId) &&
361                 (child = state(childId)).parent == parent && child.weight == weight;
362     }
363 
364     /**
365      * For testing only!
366      */
367     int numChildren(int streamId) {
368         State state = state(streamId);
369         return state == null ? 0 : state.children.size();
370     }
371 
372     /**
373      * Notify all listeners of the priority tree change events (in ascending order)
374      * @param events The events (top down order) which have changed
375      */
376     void notifyParentChanged(List<ParentChangedEvent> events) {
377         for (int i = 0; i < events.size(); ++i) {
378             ParentChangedEvent event = events.get(i);
379             stateOnlyRemovalQueue.priorityChanged(event.state);
380             if (event.state.parent != null && event.state.activeCountForTree != 0) {
381                 event.state.parent.offerAndInitializePseudoTime(event.state);
382                 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
383             }
384         }
385     }
386 
387     /**
388      * A comparator for {@link State} which has no associated {@link Http2Stream} object. The general precedence is:
389      * <ul>
390      *     <li>Was a stream activated or reserved (streams only used for priority are higher priority)</li>
     *     <li>Depth in the priority tree (closer to root is higher priority)</li>
392      *     <li>Stream ID (higher stream ID is higher priority - used for tie breaker)</li>
393      * </ul>
394      */
395     private static final class StateOnlyComparator implements Comparator<State>, Serializable {
396         private static final long serialVersionUID = -4806936913002105966L;
397 
398         static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
399 
400         private StateOnlyComparator() {
401         }
402 
403         @Override
404         public int compare(State o1, State o2) {
405             // "priority only streams" (which have not been activated) are higher priority than streams used for data.
406             boolean o1Actived = o1.wasStreamReservedOrActivated();
407             if (o1Actived != o2.wasStreamReservedOrActivated()) {
408                 return o1Actived ? -1 : 1;
409             }
410             // Numerically greater depth is higher priority.
411             int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
412 
413             // I also considered tracking the number of streams which are "activated" (eligible transfer data) at each
414             // subtree. This would require a traversal from each node to the root on dependency tree structural changes,
415             // and then it would require a re-prioritization at each of these nodes (instead of just the nodes where the
416             // direct parent changed). The costs of this are judged to be relatively high compared to the nominal
417             // benefit it provides to the heuristic. Instead folks should just increase maxStateOnlySize.
418 
419             // Last resort is to give larger stream ids more priority.
420             return x != 0 ? x : o1.streamId - o2.streamId;
421         }
422     }
423 
    /**
     * A comparator for {@link State} which orders by {@code pseudoTimeToWrite}; the state with the smaller value
     * compares as less. It is the comparator each {@link State} uses for its {@code pseudoTimeQueue}.
     */
    private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
        private static final long serialVersionUID = -1437548640227161828L;

        static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();

        private StatePseudoTimeComparator() {
        }

        @Override
        public int compare(State o1, State o2) {
            return MathUtil.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
        }
    }
437 
438     /**
439      * The remote flow control state for a single stream.
440      */
441     private final class State implements PriorityQueueNode {
        // NOTE(review): presumably bit masks stored in {@code flags}; the accessors are outside this view - confirm.
        private static final byte STATE_IS_ACTIVE = 0x1;
        private static final byte STATE_IS_DISTRIBUTING = 0x2;
        private static final byte STATE_STREAM_ACTIVATED = 0x4;

        /**
         * May be {@code null} if there is no {@link Http2Stream} object for this state, e.g. because the state was
         * created from a stream ID only or because the stream was removed from the connection.
         */
        Http2Stream stream;
        /**
         * The node this node depends on, or {@code null} if it is not attached to any parent.
         */
        State parent;
        /**
         * The direct dependents keyed by stream ID. Starts as the empty map and is replaced lazily, see
         * {@code initChildrenIfEmpty()}.
         */
        IntObjectMap<State> children = IntCollections.emptyMap();
        /**
         * Queue of {@link State} objects ordered by {@link StatePseudoTimeComparator}.
         */
        private final PriorityQueue<State> pseudoTimeQueue;
        final int streamId;
        int streamableBytes;
        /**
         * The parent's depth plus one, or {@code Integer.MAX_VALUE} while there is no parent.
         */
        int dependencyTreeDepth;
        /**
         * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
         */
        int activeCountForTree;
        // NOTE(review): presumably this node's position in the parent's pseudoTimeQueue and in the
        // stateOnlyRemovalQueue respectively; the PriorityQueueNode accessors are outside this view - confirm.
        private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
        private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
        /**
         * An estimate of when this node should be given the opportunity to write data.
         */
        long pseudoTimeToWrite;
        /**
         * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
         */
        long pseudoTime;
        /**
         * Sum of weights of queued children. A child with {@code activeCountForTree == 0} is not counted.
         */
        long totalQueuedWeights;
        private byte flags;
        short weight = DEFAULT_PRIORITY_WEIGHT;
473 
        /**
         * Creates a state for {@code streamId} which has no {@link Http2Stream} object.
         */
        State(int streamId) {
            this(streamId, null, 0);
        }
477 
        /**
         * Creates a state for {@code stream} using an initial size of {@code 0} for the {@code pseudoTimeQueue}.
         */
        State(Http2Stream stream) {
            this(stream, 0);
        }
481 
        /**
         * Creates a state for {@code stream}, taking the stream ID from {@code stream.id()}.
         *
         * @param stream the stream this state belongs to. Must not be {@code null}.
         * @param initialSize the initial size passed to the {@code pseudoTimeQueue}.
         */
        State(Http2Stream stream, int initialSize) {
            this(stream.id(), stream, initialSize);
        }
485 
        /**
         * Creates a state and its {@code pseudoTimeQueue}.
         *
         * @param streamId the stream ID this state is tracked under.
         * @param stream the stream this state belongs to, or {@code null} if there is no {@link Http2Stream} object.
         * @param initialSize the initial size passed to the {@code pseudoTimeQueue}.
         */
        State(int streamId, Http2Stream stream, int initialSize) {
            this.stream = stream;
            this.streamId = streamId;
            pseudoTimeQueue = new DefaultPriorityQueue<State>(StatePseudoTimeComparator.INSTANCE, initialSize);
        }
491 
492         boolean isDescendantOf(State state) {
493             State next = parent;
494             while (next != null) {
495                 if (next == state) {
496                     return true;
497                 }
498                 next = next.parent;
499             }
500             return false;
501         }
502 
        /**
         * Adds {@code child} to this node for the case where the caller is not iterating over the children of the
         * child's current parent (no iterator is passed on).
         */
        void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
            takeChild(null, child, exclusive, events);
        }
506 
507         /**
508          * Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
509          * the child.
510          */
511         void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
512                        List<ParentChangedEvent> events) {
513             State oldParent = child.parent;
514 
515             if (oldParent != this) {
516                 events.add(new ParentChangedEvent(child, oldParent));
517                 child.setParent(this);
518                 // If the childItr is not null we are iterating over the oldParent.children collection and should
519                 // use the iterator to remove from the collection to avoid concurrent modification. Otherwise it is
520                 // assumed we are not iterating over this collection and it is safe to call remove directly.
521                 if (childItr != null) {
522                     childItr.remove();
523                 } else if (oldParent != null) {
524                     oldParent.children.remove(child.streamId);
525                 }
526 
527                 // Lazily initialize the children to save object allocations.
528                 initChildrenIfEmpty();
529 
530                 final State oldChild = children.put(child.streamId, child);
531                 assert oldChild == null : "A stream with the same stream ID was already in the child map.";
532             }
533 
534             if (exclusive && !children.isEmpty()) {
535                 // If it was requested that this child be the exclusive dependency of this node,
536                 // move any previous children to the child node, becoming grand children of this node.
537                 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
538                 while (itr.hasNext()) {
539                     child.takeChild(itr, itr.next().value(), false, events);
540                 }
541             }
542         }
543 
544         /**
545          * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
546          */
547         void removeChild(State child) {
548             if (children.remove(child.streamId) != null) {
549                 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
550                 events.add(new ParentChangedEvent(child, child.parent));
551                 child.setParent(null);
552 
553                 // Move up any grand children to be directly dependent on this node.
554                 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
555                 while (itr.hasNext()) {
556                     takeChild(itr, itr.next().value(), false, events);
557                 }
558 
559                 notifyParentChanged(events);
560             }
561         }
562 
563         /**
         * Remove all children with the exception of {@code stateToRetain}.
         * This method is intended to be used to support an exclusive priority dependency operation.
         * @return The map of children prior to this operation, excluding {@code stateToRetain} if present.
567          */
568         private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
569             stateToRetain = children.remove(stateToRetain.streamId);
570             IntObjectMap<State> prevChildren = children;
571             // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
572             // It will either be added directly in this method, or after this method is called...but it will be added.
573             initChildren();
574             if (stateToRetain != null) {
575                 children.put(stateToRetain.streamId, stateToRetain);
576             }
577             return prevChildren;
578         }
579 
580         private void setParent(State newParent) {
581             // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue.
582             if (activeCountForTree != 0 && parent != null) {
583                 parent.removePseudoTimeQueue(this);
584                 parent.activeCountChangeForTree(-activeCountForTree);
585             }
586             parent = newParent;
587             // Use MAX_VALUE if no parent because lower depth is considered higher priority by StateOnlyComparator.
588             dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
589         }
590 
        /**
         * Replaces {@code children} with a new map if it is still the instance returned by
         * {@code IntCollections.emptyMap()}, which is the value the field is initialized with. The check is an
         * identity comparison, so an already created map which happens to be empty is kept.
         */
        private void initChildrenIfEmpty() {
            if (children == IntCollections.<State>emptyMap()) {
                initChildren();
            }
        }
596 
        /**
         * Unconditionally replaces {@code children} with a new map created with
         * {@code INITIAL_CHILDREN_MAP_SIZE} as its initial size.
         */
        private void initChildren() {
            children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
        }
600 
        /**
         * Asks {@code writer} to write {@code numBytes} for the stream of this state.
         *
         * @param numBytes the number of bytes which were allocated to the stream.
         * @param writer the writer to invoke.
         * @throws Http2Exception a connection error with {@code INTERNAL_ERROR} wrapping anything thrown by
         * {@code writer}.
         */
        void write(int numBytes, Writer writer) throws Http2Exception {
            // Writing requires a Http2Stream object.
            assert stream != null;
            try {
                writer.write(stream, numBytes);
            } catch (Throwable t) {
                throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
            }
        }
609 
610         void activeCountChangeForTree(int increment) {
611             assert activeCountForTree + increment >= 0;
612             activeCountForTree += increment;
613             if (parent != null) {
614                 assert activeCountForTree != increment ||
615                        pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
616                        parent.pseudoTimeQueue.containsTyped(this) :
617                      "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
618                      "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
619                 if (activeCountForTree == 0) {
620                     parent.removePseudoTimeQueue(this);
621                 } else if (activeCountForTree == increment && !isDistributing()) {
622                     // If frame count was 0 but is now not, and this node is not already in a pseudoTimeQueue (assumed
623                     // to be pState's pseudoTimeQueue) then enqueue it. If this State object is being processed the
624                     // pseudoTime for this node should not be adjusted, and the node will be added back to the
625                     // pseudoTimeQueue/tree structure after it is done being processed. This may happen if the
626                     // activeCountForTree == 0 (a node which can't stream anything and is blocked) is at/near root of
627                     // the tree, and is popped off the pseudoTimeQueue during processing, and then put back on the
628                     // pseudoTimeQueue because a child changes position in the priority tree (or is closed because it is
629                     // not blocked and finished writing all data).
630                     parent.offerAndInitializePseudoTime(this);
631                 }
632                 parent.activeCountChangeForTree(increment);
633             }
634         }
635 
636         void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
637             if (isActive() != isActive) {
638                 if (isActive) {
639                     activeCountChangeForTree(1);
640                     setActive();
641                 } else {
642                     activeCountChangeForTree(-1);
643                     unsetActive();
644                 }
645             }
646 
647             streamableBytes = newStreamableBytes;
648         }
649 
650         /**
651          * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
652          */
653         void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
654             assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
655             // If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
656             // and should use parentState.pseudoTime.
657             pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
658         }
659 
660         /**
661          * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
662          * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
663          * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offerPseudoTimeQueue(State)}.
664          */
665         void offerAndInitializePseudoTime(State state) {
666             state.pseudoTimeToWrite = pseudoTime;
667             offerPseudoTimeQueue(state);
668         }
669 
670         void offerPseudoTimeQueue(State state) {
671             pseudoTimeQueue.offer(state);
672             totalQueuedWeights += state.weight;
673         }
674 
675         /**
676          * Must only be called if the pseudoTimeQueue is non-empty!
677          */
678         State pollPseudoTimeQueue() {
679             State state = pseudoTimeQueue.poll();
680             // This method is only ever called if the pseudoTimeQueue is non-empty.
681             totalQueuedWeights -= state.weight;
682             return state;
683         }
684 
685         void removePseudoTimeQueue(State state) {
686             if (pseudoTimeQueue.removeTyped(state)) {
687                 totalQueuedWeights -= state.weight;
688             }
689         }
690 
691         State peekPseudoTimeQueue() {
692             return pseudoTimeQueue.peek();
693         }
694 
695         void close() {
696             updateStreamableBytes(0, false);
697             stream = null;
698         }
699 
700         boolean wasStreamReservedOrActivated() {
701             return (flags & STATE_STREAM_ACTIVATED) != 0;
702         }
703 
704         void setStreamReservedOrActivated() {
705             flags |= STATE_STREAM_ACTIVATED;
706         }
707 
708         boolean isActive() {
709             return (flags & STATE_IS_ACTIVE) != 0;
710         }
711 
712         private void setActive() {
713             flags |= STATE_IS_ACTIVE;
714         }
715 
716         private void unsetActive() {
717             flags &= ~STATE_IS_ACTIVE;
718         }
719 
720         boolean isDistributing() {
721             return (flags & STATE_IS_DISTRIBUTING) != 0;
722         }
723 
724         void setDistributing() {
725             flags |= STATE_IS_DISTRIBUTING;
726         }
727 
728         void unsetDistributing() {
729             flags &= ~STATE_IS_DISTRIBUTING;
730         }
731 
732         @Override
733         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
734             return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
735         }
736 
737         @Override
738         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
739             if (queue == stateOnlyRemovalQueue) {
740                 stateOnlyQueueIndex = i;
741             } else {
742                 pseudoTimeQueueIndex = i;
743             }
744         }
745 
746         @Override
747         public String toString() {
748             // Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
749             StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
750             toString(sb);
751             return sb.toString();
752         }
753 
754         private void toString(StringBuilder sb) {
755             sb.append("{streamId ").append(streamId)
756                     .append(" streamableBytes ").append(streamableBytes)
757                     .append(" activeCountForTree ").append(activeCountForTree)
758                     .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
759                     .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
760                     .append(" pseudoTime ").append(pseudoTime)
761                     .append(" flags ").append(flags)
762                     .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
763                     .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
764                     .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
765 
766             if (!pseudoTimeQueue.isEmpty()) {
767                 for (State s : pseudoTimeQueue) {
768                     s.toString(sb);
769                     sb.append(", ");
770                 }
771                 // Remove the last ", "
772                 sb.setLength(sb.length() - 2);
773             }
774             sb.append(']');
775         }
776     }
777 
778     /**
779      * Allows a correlation to be made between a stream and its old parent before a parent change occurs.
780      */
781     private static final class ParentChangedEvent {
782         final State state;
783         final State oldParent;
784 
785         /**
786          * Create a new instance.
787          * @param state The state who has had a parent change.
788          * @param oldParent The previous parent.
789          */
790         ParentChangedEvent(State state, State oldParent) {
791             this.state = state;
792             this.oldParent = oldParent;
793         }
794     }
795 }