View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.util.collection.IntCollections;
18  import io.netty5.util.collection.IntObjectHashMap;
19  import io.netty5.util.collection.IntObjectMap;
20  import io.netty5.util.internal.DefaultPriorityQueue;
21  import io.netty5.util.internal.EmptyPriorityQueue;
22  import io.netty5.util.internal.PriorityQueue;
23  import io.netty5.util.internal.PriorityQueueNode;
24  import io.netty5.util.internal.SystemPropertyUtil;
25  import io.netty5.util.internal.UnstableApi;
26  
27  import java.io.Serializable;
28  import java.util.ArrayList;
29  import java.util.Comparator;
30  import java.util.Iterator;
31  import java.util.List;
32  
33  import static io.netty5.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
34  import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
35  import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
36  import static io.netty5.handler.codec.http2.Http2CodecUtil.streamableBytes;
37  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
38  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
39  import static io.netty5.util.internal.ObjectUtil.checkPositive;
40  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
41  import static java.lang.Integer.MAX_VALUE;
42  import static java.lang.Math.max;
43  import static java.lang.Math.min;
44  
45  /**
46   * A {@link StreamByteDistributor} that is sensitive to stream priority and uses
47   * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
48   * bytes.
49   * <p>
50   * Inspiration for this distributor was taken from Linux's
51   * <a href="https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt">Completely Fair Scheduler</a>
52   * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
53   * an "ideal multi-tasking NIC".
54   * <p>
55   * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
56   * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
57   */
58  @UnstableApi
59  public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
    /**
     * The initial size of the children map is chosen to be conservative on initial memory allocations under
     * the assumption that most streams will have a small number of children. This choice may be
     * sub-optimal when a stream does have many children (e.g. a web page which has many
     * dependencies to load).
     * <p>
     * Read from the {@code io.netty5.http2.childrenMapSize} system property (default {@code 2}) and never less
     * than {@code 1}.
     * <p>
     * Visible only for testing!
     */
    static final int INITIAL_CHILDREN_MAP_SIZE =
            max(1, SystemPropertyUtil.getInt("io.netty5.http2.childrenMapSize", 2));
    /**
     * Default maximum number of state-only entries retained (see {@link #stateOnlyMap}).
     * FireFox currently uses 5 streams to establish QoS classes.
     */
    private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;

    /**
     * Key under which each {@link Http2Stream}'s {@link State} is stored as a stream property.
     */
    private final Http2Connection.PropertyKey stateKey;
    /**
     * If there is no Http2Stream object, but we still persist priority information then this is where the state will
     * reside.
     */
    private final IntObjectMap<State> stateOnlyMap;
    /**
     * This queue will hold streams that are not active and provides the capability to retain priority for streams which
     * have no {@link Http2Stream} object. See {@link StateOnlyComparator} for the priority comparator. The head of the
     * queue is the entry evicted first when the number of retained entries exceeds {@link #maxStateOnlySize}.
     */
    private final PriorityQueue<State> stateOnlyRemovalQueue;
    private final Http2Connection connection;
    /**
     * State of the connection stream; the root of the priority tree.
     */
    private final State connectionState;
    /**
     * The minimum number of bytes that we will attempt to allocate to a stream. This is to
     * help improve goodput on a per-stream basis. Changed via {@link #allocationQuantum(int)}.
     */
    private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
    /**
     * Maximum number of entries retained in {@link #stateOnlyMap} (it may be exceeded transiently while a dependency
     * update is in progress). {@code 0} disables state-only retention entirely.
     */
    private final int maxStateOnlySize;
94  
95      public WeightedFairQueueByteDistributor(Http2Connection connection) {
96          this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
97      }
98  
    /**
     * Creates a distributor for {@code connection}.
     *
     * @param connection the connection whose streams are scheduled. A new property key is allocated on it to attach a
     *                   {@link State} to each stream, and a listener is registered to track the stream lifecycle.
     * @param maxStateOnlySize the maximum number of {@link State} objects retained for streams which have no
     *                         {@link Http2Stream} object (priority-only streams, or streams already removed from the
     *                         connection). Must not be negative; {@code 0} disables this retention.
     */
    public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
        checkPositiveOrZero(maxStateOnlySize, "maxStateOnlySize");
        if (maxStateOnlySize == 0) {
            // Retention disabled: these empty collections cannot be modified, so the code paths below that would
            // insert into them check maxStateOnlySize == 0 first.
            stateOnlyMap = IntCollections.emptyMap();
            stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
        } else {
            stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
            // +2 because we may exceed the limit by 2 if a new dependency has no associated Http2Stream object. We need
            // to create the State objects to put them into the dependency tree, which then impacts priority.
            stateOnlyRemovalQueue = new DefaultPriorityQueue<>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
        }
        this.maxStateOnlySize = maxStateOnlySize;

        this.connection = connection;
        stateKey = connection.newKey();
        final Http2Stream connectionStream = connection.connectionStream();
        // The connection stream's State is the root of the priority tree. 16 is the initial capacity of its
        // pseudoTimeQueue (presumably larger than for other streams because many streams depend on the root).
        connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));

        // Register for notification of new streams.
        connection.addListener(new Http2ConnectionAdapter() {
            // Attaches a State to the new stream. It reuses any state retained in stateOnlyMap (created by
            // updateDependencyTree or kept after an earlier removal). Otherwise it creates a fresh State as a child of
            // the connection (root) state.
            @Override
            public void onStreamAdded(Http2Stream stream) {
                State state = stateOnlyMap.remove(stream.id());
                if (state == null) {
                    state = new State(stream);
                    // Only the stream which was just added will change parents. So we only need an array of size 1.
                    List<ParentChangedEvent> events = new ArrayList<>(1);
                    connectionState.takeChild(state, false, events);
                    notifyParentChanged(events);
                } else {
                    // The state now belongs to a live stream, so it is no longer a candidate for state-only eviction.
                    stateOnlyRemovalQueue.removeTyped(state);
                    state.stream = stream;
                }
                switch (stream.state()) {
                    case RESERVED_REMOTE:
                    case RESERVED_LOCAL:
                        state.setStreamReservedOrActivated();
                        // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue, but there
                        // is no need to reprioritize here because this state is not in stateOnlyRemovalQueue.
                        break;
                    default:
                        break;
                }
                stream.setProperty(stateKey, state);
            }

            @Override
            public void onStreamActive(Http2Stream stream) {
                state(stream).setStreamReservedOrActivated();
                // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue, but there is no
                // need to reprioritize here because the state of a live stream is not in stateOnlyRemovalQueue.
            }

            @Override
            public void onStreamClosed(Http2Stream stream) {
                state(stream).close();
            }

            @Override
            public void onStreamRemoved(Http2Stream stream) {
                // The stream has been removed from the connection. We can no longer rely on the stream's property
                // storage to track the State. If we have room, and the precedence of the stream is sufficient, we
                // should retain the State in the stateOnlyMap.
                State state = state(stream);

                // Typically the stream is set to null when the stream is closed because it is no longer needed to write
                // data. However if the stream was not activated it may not be closed (reserved streams) so we ensure
                // the stream reference is set to null to avoid retaining a reference longer than necessary.
                state.stream = null;

                if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
                    // Retention disabled: drop the state from the tree (removeChild moves its children up to its
                    // parent).
                    state.parent.removeChild(state);
                    return;
                }
                if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
                    // The retention set is full. Either evict the current head or discard this state, whichever
                    // orders first.
                    State stateToRemove = stateOnlyRemovalQueue.peek();
                    if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
                        // The state being removed orders at or before the current eviction candidate, i.e. it is the
                        // least worth retaining, so we just discard the state.
                        state.parent.removeChild(state);
                        return;
                    }
                    stateOnlyRemovalQueue.poll();
                    stateToRemove.parent.removeChild(stateToRemove);
                    stateOnlyMap.remove(stateToRemove.streamId);
                }
                stateOnlyRemovalQueue.add(state);
                stateOnlyMap.put(state.streamId, state);
            }
        });
    }
190 
191     @Override
192     public void updateStreamableBytes(StreamState state) {
193         state(state.stream()).updateStreamableBytes(streamableBytes(state),
194                                                     state.hasFrame() && state.windowSize() >= 0);
195     }
196 
    /**
     * {@inheritDoc}
     * <p>
     * Either stream may lack an {@link Http2Stream} object. In that case a state-only {@link State} is created for it;
     * a newly created parent is first attached under the connection root. If {@link #maxStateOnlySize} is {@code 0},
     * such updates are dropped instead. After restructuring, state-only entries beyond {@link #maxStateOnlySize} are
     * evicted.
     */
    @Override
    public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
        State state = state(childStreamId);
        if (state == null) {
            // If there is no State object that means there is no Http2Stream object and we would have to keep the
            // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
            // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
            if (maxStateOnlySize == 0) {
                return;
            }
            state = new State(childStreamId);
            stateOnlyRemovalQueue.add(state);
            stateOnlyMap.put(childStreamId, state);
        }

        State newParent = state(parentStreamId);
        if (newParent == null) {
            // If there is no State object that means there is no Http2Stream object and we would have to keep the
            // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
            // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
            if (maxStateOnlySize == 0) {
                return;
            }
            newParent = new State(parentStreamId);
            stateOnlyRemovalQueue.add(newParent);
            stateOnlyMap.put(parentStreamId, newParent);
            // Only the stream which was just added will change parents. So we only need an array of size 1.
            List<ParentChangedEvent> events = new ArrayList<>(1);
            connectionState.takeChild(newParent, false, events);
            notifyParentChanged(events);
        }

        // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue and thus should not be counted
        // toward parent.totalQueuedWeights.
        if (state.activeCountForTree != 0 && state.parent != null) {
            state.parent.totalQueuedWeights += weight - state.weight;
        }
        state.weight = weight;

        // Restructure only if the parent actually changes, or if an exclusive dependency must adopt its siblings. When
        // the parent is unchanged and has exactly one child, that child is `state` itself and nothing needs to move.
        if (newParent != state.parent || exclusive && newParent.children.size() != 1) {
            final List<ParentChangedEvent> events;
            if (newParent.isDescendantOf(state)) {
                // The new parent currently depends on the child. To avoid a cycle, first move the new parent up to the
                // child's current parent (see RFC 7540, section 5.3.3).
                events = new ArrayList<>(2 + (exclusive ? newParent.children.size() : 0));
                state.parent.takeChild(newParent, false, events);
            } else {
                events = new ArrayList<>(1 + (exclusive ? newParent.children.size() : 0));
            }
            newParent.takeChild(state, exclusive, events);
            notifyParentChanged(events);
        }

        // The location in the dependency tree impacts the priority in the stateOnlyRemovalQueue map. If we created new
        // State objects we must check if we exceeded the limit after we insert into the dependency tree to ensure the
        // stateOnlyRemovalQueue has been updated.
        while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
            State stateToRemove = stateOnlyRemovalQueue.poll();
            stateToRemove.parent.removeChild(stateToRemove);
            stateOnlyMap.remove(stateToRemove.streamId);
        }
    }
257 
258     @Override
259     public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
260         // As long as there is some active frame we should write at least 1 time.
261         if (connectionState.activeCountForTree == 0) {
262             return false;
263         }
264 
265         // The goal is to write until we write all the allocated bytes or are no longer making progress.
266         // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
267         // to be sent. Making progress means the active streams rooted at the connection stream has changed.
268         int oldIsActiveCountForTree;
269         do {
270             oldIsActiveCountForTree = connectionState.activeCountForTree;
271             // connectionState will never be active, so go right to its children.
272             maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
273         } while (connectionState.activeCountForTree != 0 &&
274                 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
275 
276         return connectionState.activeCountForTree != 0;
277     }
278 
279     /**
280      * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
281      * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be &gt; 0.
282      */
283     public void allocationQuantum(int allocationQuantum) {
284         checkPositive(allocationQuantum, "allocationQuantum");
285         this.allocationQuantum = allocationQuantum;
286     }
287 
288     private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
289         if (state.isActive()) {
290             int nsent = min(maxBytes, state.streamableBytes);
291             state.write(nsent, writer);
292             if (nsent == 0 && maxBytes != 0) {
293                 // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
294                 // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
295                 // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
296                 // stream's flow control window being 0.
297                 state.updateStreamableBytes(state.streamableBytes, false);
298             }
299             return nsent;
300         }
301 
302         return distributeToChildren(maxBytes, writer, state);
303     }
304 
    /**
     * Gives the child at the head of {@code state}'s pseudoTimeQueue (the smallest {@code pseudoTimeToWrite}) a
     * chance to write. Afterwards it advances the pseudo times and re-queues the child if its subtree is still active.
     * <p>
     * If no other child is queued, the child's allowance is {@code maxBytes}. Otherwise it is
     * {@code min(maxBytes, (next.pseudoTimeToWrite - child.pseudoTimeToWrite) * child.weight / totalQueuedWeights
     * + allocationQuantum)}, capped at {@link Integer#MAX_VALUE}.
     * <p>
     * It is a pre-condition that {@code state.pollPseudoTimeQueue()} returns a non-{@code null} value. This is a
     * result of the way the allocation algorithm is structured and can be explained in the following cases:
     * <h3>For the recursive case</h3>
     * If a stream has no children (in the allocation tree) then that node must be active or it will not be in the
     * allocation tree. If a node is active then it will not delegate to children and recursion ends.
     * <h3>For the initial case</h3>
     * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
     * has no active children we don't get into this method.
     *
     * @return the number of bytes written by the selected child's subtree.
     */
    private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
        // Snapshot before polling: the allowance and pseudo-time update use the weight total from before this child
        // was removed from the queue.
        long oldTotalQueuedWeights = state.totalQueuedWeights;
        State childState = state.pollPseudoTimeQueue();
        State nextChildState = state.peekPseudoTimeQueue();
        // Marks the child as in-progress so that activeCountChangeForTree does not re-offer it to the queue while it
        // is being processed; it is re-offered in the finally block below.
        childState.setDistributing();
        try {
            assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
                "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
                ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ')';
            int nsent = distribute(nextChildState == null ? maxBytes :
                            min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
                                               childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
                               ),
                               writer,
                               childState);
            state.pseudoTime += nsent;
            childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
            return nsent;
        } finally {
            childState.unsetDistributing();
            // Do in finally to ensure the internal flags are not corrupted if an exception is thrown.
            // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
            // the priority pseudoTimeQueue due to a write operation.
            if (childState.activeCountForTree != 0) {
                state.offerPseudoTimeQueue(childState);
            }
        }
    }
343 
344     private State state(Http2Stream stream) {
345         return stream.getProperty(stateKey);
346     }
347 
348     private State state(int streamId) {
349         Http2Stream stream = connection.stream(streamId);
350         return stream != null ? state(stream) : stateOnlyMap.get(streamId);
351     }
352 
353     /**
354      * For testing only!
355      */
356     boolean isChild(int childId, int parentId, short weight) {
357         State parent = state(parentId);
358         State child;
359         return parent.children.containsKey(childId) &&
360                 (child = state(childId)).parent == parent && child.weight == weight;
361     }
362 
363     /**
364      * For testing only!
365      */
366     int numChildren(int streamId) {
367         State state = state(streamId);
368         return state == null ? 0 : state.children.size();
369     }
370 
371     /**
372      * Notify all listeners of the priority tree change events (in ascending order)
373      * @param events The events (top down order) which have changed
374      */
375     void notifyParentChanged(List<ParentChangedEvent> events) {
376         for (int i = 0; i < events.size(); ++i) {
377             ParentChangedEvent event = events.get(i);
378             stateOnlyRemovalQueue.priorityChanged(event.state);
379             if (event.state.parent != null && event.state.activeCountForTree != 0) {
380                 event.state.parent.offerAndInitializePseudoTime(event.state);
381                 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
382             }
383         }
384     }
385 
386     /**
387      * A comparator for {@link State} which has no associated {@link Http2Stream} object. The general precedence is:
388      * <ul>
389      *     <li>Was a stream activated or reserved (streams only used for priority are higher priority)</li>
390      *     <li>Depth in the priority tree (closer to root is higher priority></li>
391      *     <li>Stream ID (higher stream ID is higher priority - used for tie breaker)</li>
392      * </ul>
393      */
394     private static final class StateOnlyComparator implements Comparator<State>, Serializable {
395         private static final long serialVersionUID = -4806936913002105966L;
396 
397         static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
398 
399         @Override
400         public int compare(State o1, State o2) {
401             // "priority only streams" (which have not been activated) are higher priority than streams used for data.
402             boolean o1Actived = o1.wasStreamReservedOrActivated();
403             if (o1Actived != o2.wasStreamReservedOrActivated()) {
404                 return o1Actived ? -1 : 1;
405             }
406             // Numerically greater depth is higher priority.
407             int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
408 
409             // I also considered tracking the number of streams which are "activated" (eligible transfer data) at each
410             // subtree. This would require a traversal from each node to the root on dependency tree structural changes,
411             // and then it would require a re-prioritization at each of these nodes (instead of just the nodes where the
412             // direct parent changed). The costs of this are judged to be relatively high compared to the nominal
413             // benefit it provides to the heuristic. Instead folks should just increase maxStateOnlySize.
414 
415             // Last resort is to give larger stream ids more priority.
416             return x != 0 ? x : o1.streamId - o2.streamId;
417         }
418     }
419 
420     private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
421         private static final long serialVersionUID = -1437548640227161828L;
422 
423         static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
424 
425         @Override
426         public int compare(State o1, State o2) {
427             return Long.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
428         }
429     }
430 
431     /**
432      * The remote flow control state for a single stream.
433      */
434     private final class State implements PriorityQueueNode {
        // Bit masks for #flags. NOTE(review): the accessors that use them (isActive(), isDistributing(),
        // wasStreamReservedOrActivated() and their setters) are outside this view; the names suggest a 1:1 mapping.
        private static final byte STATE_IS_ACTIVE = 0x1;
        private static final byte STATE_IS_DISTRIBUTING = 0x2;
        private static final byte STATE_STREAM_ACTIVATED = 0x4;

        /**
         * May be {@code null}. State-only entries are created without a stream, and the reference is cleared when the
         * stream is removed from the connection.
         */
        Http2Stream stream;
        /**
         * Parent in the priority tree; {@code null} for the root and for states not (or no longer) attached to the
         * tree.
         */
        State parent;
        /**
         * Direct children in the priority tree, keyed by stream ID. Starts as the shared, unmodifiable empty map and is
         * replaced with a real map lazily before the first insertion.
         */
        IntObjectMap<State> children = IntCollections.emptyMap();
        /**
         * Children whose subtree has at least one active node, ordered by {@code StatePseudoTimeComparator} (ascending
         * {@link #pseudoTimeToWrite}).
         */
        private final PriorityQueue<State> pseudoTimeQueue;
        final int streamId;
        /**
         * Number of bytes this stream currently has available to write.
         */
        int streamableBytes;
        /**
         * Distance from the root of the priority tree as of the last parent change; {@link Integer#MAX_VALUE} after the
         * state is detached. It is {@code 0} for the root and for a state not yet attached.
         */
        int dependencyTreeDepth;
        /**
         * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
         */
        int activeCountForTree;
        // Positions of this node in its parent's pseudoTimeQueue and in stateOnlyRemovalQueue respectively.
        // NOTE(review): presumably maintained through the PriorityQueueNode index callbacks outside this view.
        private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
        private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
        /**
         * An estimate of when this node should be given the opportunity to write data.
         */
        long pseudoTimeToWrite;
        /**
         * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
         */
        long pseudoTime;
        /**
         * Sum of the weights of the children currently held in {@link #pseudoTimeQueue}.
         */
        long totalQueuedWeights;
        /**
         * Bit set of the STATE_* masks above.
         */
        private byte flags;
        /**
         * Weight of this node relative to its siblings.
         */
        short weight = DEFAULT_PRIORITY_WEIGHT;
466 
467         State(int streamId) {
468             this(streamId, null, 0);
469         }
470 
471         State(Http2Stream stream) {
472             this(stream, 0);
473         }
474 
475         State(Http2Stream stream, int initialSize) {
476             this(stream.id(), stream, initialSize);
477         }
478 
479         State(int streamId, Http2Stream stream, int initialSize) {
480             this.stream = stream;
481             this.streamId = streamId;
482             pseudoTimeQueue = new DefaultPriorityQueue<>(StatePseudoTimeComparator.INSTANCE, initialSize);
483         }
484 
485         boolean isDescendantOf(State state) {
486             State next = parent;
487             while (next != null) {
488                 if (next == state) {
489                     return true;
490                 }
491                 next = next.parent;
492             }
493             return false;
494         }
495 
496         void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
497             takeChild(null, child, exclusive, events);
498         }
499 
        /**
         * Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
         * the child.
         *
         * @param childItr if non-{@code null}, an iterator over the old parent's {@code children} currently positioned
         *                 at {@code child}. It is used to remove {@code child} from that map without a concurrent
         *                 modification.
         * @param child the state to attach under this node.
         * @param exclusive whether {@code child} becomes the only child of this node, adopting all other children.
         * @param events receives a {@link ParentChangedEvent} for every state whose parent changed. Callers pass them to
         *               {@code notifyParentChanged} afterwards.
         */
        void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
                       List<ParentChangedEvent> events) {
            State oldParent = child.parent;

            if (oldParent != this) {
                events.add(new ParentChangedEvent(child, oldParent));
                // setParent unlinks the child from the old parent's pseudoTimeQueue and active count, if it was there.
                child.setParent(this);
                // If the childItr is not null we are iterating over the oldParent.children collection and should
                // use the iterator to remove from the collection to avoid concurrent modification. Otherwise it is
                // assumed we are not iterating over this collection and it is safe to call remove directly.
                if (childItr != null) {
                    childItr.remove();
                } else if (oldParent != null) {
                    oldParent.children.remove(child.streamId);
                }

                // Lazily initialize the children to save object allocations.
                initChildrenIfEmpty();

                final State oldChild = children.put(child.streamId, child);
                assert oldChild == null : "A stream with the same stream ID was already in the child map.";
            }

            if (exclusive && !children.isEmpty()) {
                // If it was requested that this child be the exclusive dependency of this node,
                // move any previous children to the child node, becoming grand children of this node.
                // removeAllChildrenExcept leaves only `child` here and returns the old map. Passing its iterator lets
                // the recursive call remove each entry from that old map safely.
                Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
                while (itr.hasNext()) {
                    child.takeChild(itr, itr.next().value(), false, events);
                }
            }
        }
536 
        /**
         * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
         * <p>
         * Each moved dependency gets the weight {@code max(1, dependency.weight * child.weight / sum)}, using integer
         * division, where {@code sum} is the total weight of the child's children. The child is left detached (no
         * parent). Does nothing if {@code child} is not a direct child of this node.
         */
        void removeChild(State child) {
            if (children.remove(child.streamId) != null) {
                List<ParentChangedEvent> events = new ArrayList<>(1 + child.children.size());
                events.add(new ParentChangedEvent(child, child.parent));
                // child.parent is still this node here, so setParent(null) unlinks it from our pseudoTimeQueue and
                // active count.
                child.setParent(null);

                if (!child.children.isEmpty()) {
                    // Move up any grand children to be directly dependent on this node.
                    Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
                    long totalWeight = child.getTotalWeight();
                    do {
                        // Redistribute the weight of child to its dependency proportionally.
                        State dependency = itr.next().value();
                        dependency.weight = (short) max(1, dependency.weight * child.weight / totalWeight);
                        takeChild(itr, dependency, false, events);
                    } while (itr.hasNext());
                }

                notifyParentChanged(events);
            }
        }
561 
562         private long getTotalWeight() {
563             long totalWeight = 0L;
564             for (State state : children.values()) {
565                 totalWeight += state.weight;
566             }
567             return totalWeight;
568         }
569 
570         /**
571          * Remove all children with the exception of {@code streamToRetain}.
572          * This method is intended to be used to support an exclusive priority dependency operation.
573          * @return The map of children prior to this operation, excluding {@code streamToRetain} if present.
574          */
575         private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
576             stateToRetain = children.remove(stateToRetain.streamId);
577             IntObjectMap<State> prevChildren = children;
578             // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
579             // It will either be added directly in this method, or after this method is called...but it will be added.
580             initChildren();
581             if (stateToRetain != null) {
582                 children.put(stateToRetain.streamId, stateToRetain);
583             }
584             return prevChildren;
585         }
586 
587         private void setParent(State newParent) {
588             // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue.
589             if (activeCountForTree != 0 && parent != null) {
590                 parent.removePseudoTimeQueue(this);
591                 parent.activeCountChangeForTree(-activeCountForTree);
592             }
593             parent = newParent;
594             // Use MAX_VALUE if no parent because lower depth is considered higher priority by StateOnlyComparator.
595             dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
596         }
597 
598         private void initChildrenIfEmpty() {
599             if (children == IntCollections.<State>emptyMap()) {
600                 initChildren();
601             }
602         }
603 
604         private void initChildren() {
605             children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
606         }
607 
608         void write(int numBytes, Writer writer) throws Http2Exception {
609             assert stream != null;
610             try {
611                 writer.write(stream, numBytes);
612             } catch (Throwable t) {
613                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
614             }
615         }
616 
        /**
         * Adjusts {@link #activeCountForTree} by {@code increment} and propagates the same change to every ancestor.
         * <p>
         * When this subtree drops to zero active nodes, it is removed from the parent's pseudoTimeQueue. When it rises
         * from zero, it is offered to the parent's pseudoTimeQueue, unless it is currently being distributed to. In that
         * case {@code distributeToChildren} re-offers it once it is done.
         *
         * @param increment the (possibly negative) change in the number of active nodes in this subtree.
         */
        void activeCountChangeForTree(int increment) {
            assert activeCountForTree + increment >= 0;
            activeCountForTree += increment;
            if (parent != null) {
                assert activeCountForTree != increment ||
                       pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
                       parent.pseudoTimeQueue.containsTyped(this) :
                     "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
                     "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
                if (activeCountForTree == 0) {
                    parent.removePseudoTimeQueue(this);
                } else if (activeCountForTree == increment && !isDistributing()) {
                    // If frame count was 0 but is now not, and this node is not already in a pseudoTimeQueue (assumed
                    // to be the parent's pseudoTimeQueue) then enqueue it. If this State object is being processed the
                    // pseudoTime for this node should not be adjusted, and the node will be added back to the
                    // pseudoTimeQueue/tree structure after it is done being processed. This may happen if the
                    // activeCountForTree == 0 (a node which can't stream anything and is blocked) is at/near root of
                    // the tree, and is popped off the pseudoTimeQueue during processing, and then put back on the
                    // pseudoTimeQueue because a child changes position in the priority tree (or is closed because it is
                    // not blocked and finished writing all data).
                    parent.offerAndInitializePseudoTime(this);
                }
                // Recurse so that every ancestor's count reflects the change.
                parent.activeCountChangeForTree(increment);
            }
        }
642 
643         void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
644             if (isActive() != isActive) {
645                 if (isActive) {
646                     activeCountChangeForTree(1);
647                     setActive();
648                 } else {
649                     activeCountChangeForTree(-1);
650                     unsetActive();
651                 }
652             }
653 
654             streamableBytes = newStreamableBytes;
655         }
656 
657         /**
658          * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
659          */
660         void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
661             assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
662             // If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
663             // and should use parentState.pseudoTime.
664             pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
665         }
666 
667         /**
668          * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
669          * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
670          * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offerPseudoTimeQueue(State)}.
671          */
672         void offerAndInitializePseudoTime(State state) {
673             state.pseudoTimeToWrite = pseudoTime;
674             offerPseudoTimeQueue(state);
675         }
676 
677         void offerPseudoTimeQueue(State state) {
678             pseudoTimeQueue.offer(state);
679             totalQueuedWeights += state.weight;
680         }
681 
682         /**
683          * Must only be called if the pseudoTimeQueue is non-empty!
684          */
685         State pollPseudoTimeQueue() {
686             State state = pseudoTimeQueue.poll();
687             // This method is only ever called if the pseudoTimeQueue is non-empty.
688             totalQueuedWeights -= state.weight;
689             return state;
690         }
691 
692         void removePseudoTimeQueue(State state) {
693             if (pseudoTimeQueue.removeTyped(state)) {
694                 totalQueuedWeights -= state.weight;
695             }
696         }
697 
698         State peekPseudoTimeQueue() {
699             return pseudoTimeQueue.peek();
700         }
701 
702         void close() {
703             updateStreamableBytes(0, false);
704             stream = null;
705         }
706 
707         boolean wasStreamReservedOrActivated() {
708             return (flags & STATE_STREAM_ACTIVATED) != 0;
709         }
710 
711         void setStreamReservedOrActivated() {
712             flags |= STATE_STREAM_ACTIVATED;
713         }
714 
715         boolean isActive() {
716             return (flags & STATE_IS_ACTIVE) != 0;
717         }
718 
719         private void setActive() {
720             flags |= STATE_IS_ACTIVE;
721         }
722 
723         private void unsetActive() {
724             flags &= ~STATE_IS_ACTIVE;
725         }
726 
727         boolean isDistributing() {
728             return (flags & STATE_IS_DISTRIBUTING) != 0;
729         }
730 
731         void setDistributing() {
732             flags |= STATE_IS_DISTRIBUTING;
733         }
734 
735         void unsetDistributing() {
736             flags &= ~STATE_IS_DISTRIBUTING;
737         }
738 
739         @Override
740         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
741             return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
742         }
743 
744         @Override
745         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
746             if (queue == stateOnlyRemovalQueue) {
747                 stateOnlyQueueIndex = i;
748             } else {
749                 pseudoTimeQueueIndex = i;
750             }
751         }
752 
753         @Override
754         public String toString() {
755             // Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
756             StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
757             toString(sb);
758             return sb.toString();
759         }
760 
761         private void toString(StringBuilder sb) {
762             sb.append("{streamId ").append(streamId)
763                     .append(" streamableBytes ").append(streamableBytes)
764                     .append(" activeCountForTree ").append(activeCountForTree)
765                     .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
766                     .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
767                     .append(" pseudoTime ").append(pseudoTime)
768                     .append(" flags ").append(flags)
769                     .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
770                     .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
771                     .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
772 
773             if (!pseudoTimeQueue.isEmpty()) {
774                 for (State s : pseudoTimeQueue) {
775                     s.toString(sb);
776                     sb.append(", ");
777                 }
778                 // Remove the last ", "
779                 sb.setLength(sb.length() - 2);
780             }
781             sb.append(']');
782         }
783     }
784 
785     /**
786      * Allows a correlation to be made between a stream and its old parent before a parent change occurs.
787      */
788     private static final class ParentChangedEvent {
789         final State state;
790         final State oldParent;
791 
792         /**
793          * Create a new instance.
794          * @param state The state who has had a parent change.
795          * @param oldParent The previous parent.
796          */
797         ParentChangedEvent(State state, State oldParent) {
798             this.state = state;
799             this.oldParent = oldParent;
800         }
801     }
802 }