View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.util.collection.IntCollections;
18  import io.netty.util.collection.IntObjectHashMap;
19  import io.netty.util.collection.IntObjectMap;
20  import io.netty.util.internal.DefaultPriorityQueue;
21  import io.netty.util.internal.EmptyPriorityQueue;
22  import io.netty.util.internal.PriorityQueue;
23  import io.netty.util.internal.PriorityQueueNode;
24  import io.netty.util.internal.SystemPropertyUtil;
25  
26  import java.io.Serializable;
27  import java.util.ArrayList;
28  import java.util.Comparator;
29  import java.util.Iterator;
30  import java.util.List;
31  
32  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
33  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
34  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
35  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
36  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
37  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
38  import static io.netty.util.internal.ObjectUtil.checkPositive;
39  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
40  import static java.lang.Integer.MAX_VALUE;
41  import static java.lang.Math.max;
42  import static java.lang.Math.min;
43  
44  /**
45   * A {@link StreamByteDistributor} that is sensitive to stream priority and uses
46   * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
47   * bytes.
48   * <p>
49   * Inspiration for this distributor was taken from Linux's
50   * <a href="https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt">Completely Fair Scheduler</a>
51   * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
52   * an "ideal multi-tasking NIC".
53   * <p>
54   * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
55   * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
56   */
57  public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
58      /**
59       * The initial size of the children map is chosen to be conservative on initial memory allocations under
60       * the assumption that most streams will have a small number of children. This choice may be
61       * sub-optimal if when children are present there are many children (i.e. a web page which has many
62       * dependencies to load).
63       *
64       * Visible only for testing!
65       */
66      static final int INITIAL_CHILDREN_MAP_SIZE =
67              max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 2));
68      /**
69       * FireFox currently uses 5 streams to establish QoS classes.
70       */
71      private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;
72  
73      private final Http2Connection.PropertyKey stateKey;
74      /**
75       * If there is no Http2Stream object, but we still persist priority information then this is where the state will
76       * reside.
77       */
78      private final IntObjectMap<State> stateOnlyMap;
79      /**
80       * This queue will hold streams that are not active and provides the capability to retain priority for streams which
81       * have no {@link Http2Stream} object. See {@link StateOnlyComparator} for the priority comparator.
82       */
83      private final PriorityQueue<State> stateOnlyRemovalQueue;
84      private final Http2Connection connection;
85      private final State connectionState;
86      /**
87       * The minimum number of bytes that we will attempt to allocate to a stream. This is to
88       * help improve goodput on a per-stream basis.
89       */
90      private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
91      private final int maxStateOnlySize;
92  
93      public WeightedFairQueueByteDistributor(Http2Connection connection) {
94          this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
95      }
96  
97      public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
98          checkPositiveOrZero(maxStateOnlySize, "maxStateOnlySize");
99          if (maxStateOnlySize == 0) {
100             stateOnlyMap = IntCollections.emptyMap();
101             stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
102         } else {
103             stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
104             // +2 because we may exceed the limit by 2 if a new dependency has no associated Http2Stream object. We need
105             // to create the State objects to put them into the dependency tree, which then impacts priority.
106             stateOnlyRemovalQueue = new DefaultPriorityQueue<State>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
107         }
108         this.maxStateOnlySize = maxStateOnlySize;
109 
110         this.connection = connection;
111         stateKey = connection.newKey();
112         final Http2Stream connectionStream = connection.connectionStream();
113         connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));
114 
115         // Register for notification of new streams.
116         connection.addListener(new Http2ConnectionAdapter() {
117             @Override
118             public void onStreamAdded(Http2Stream stream) {
119                 State state = stateOnlyMap.remove(stream.id());
120                 if (state == null) {
121                     state = new State(stream);
122                     // Only the stream which was just added will change parents. So we only need an array of size 1.
123                     List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
124                     connectionState.takeChild(state, false, events);
125                     notifyParentChanged(events);
126                 } else {
127                     stateOnlyRemovalQueue.removeTyped(state);
128                     state.stream = stream;
129                 }
130                 switch (stream.state()) {
131                     case RESERVED_REMOTE:
132                     case RESERVED_LOCAL:
133                         state.setStreamReservedOrActivated();
134                         // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no
135                         // need to reprioritize here because it will not be in stateOnlyRemovalQueue.
136                         break;
137                     default:
138                         break;
139                 }
140                 stream.setProperty(stateKey, state);
141             }
142 
143             @Override
144             public void onStreamActive(Http2Stream stream) {
145                 state(stream).setStreamReservedOrActivated();
146                 // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no need to
147                 // reprioritize here because it will not be in stateOnlyRemovalQueue.
148             }
149 
150             @Override
151             public void onStreamClosed(Http2Stream stream) {
152                 state(stream).close();
153             }
154 
155             @Override
156             public void onStreamRemoved(Http2Stream stream) {
157                 // The stream has been removed from the connection. We can no longer rely on the stream's property
158                 // storage to track the State. If we have room, and the precedence of the stream is sufficient, we
159                 // should retain the State in the stateOnlyMap.
160                 State state = state(stream);
161 
162                 // Typically the stream is set to null when the stream is closed because it is no longer needed to write
163                 // data. However if the stream was not activated it may not be closed (reserved streams) so we ensure
164                 // the stream reference is set to null to avoid retaining a reference longer than necessary.
165                 state.stream = null;
166 
167                 if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
168                     state.parent.removeChild(state);
169                     return;
170                 }
171                 if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
172                     State stateToRemove = stateOnlyRemovalQueue.peek();
173                     if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
174                         // The "lowest priority" stream is a "higher priority" than the stream being removed, so we
175                         // just discard the state.
176                         state.parent.removeChild(state);
177                         return;
178                     }
179                     stateOnlyRemovalQueue.poll();
180                     stateToRemove.parent.removeChild(stateToRemove);
181                     stateOnlyMap.remove(stateToRemove.streamId);
182                 }
183                 stateOnlyRemovalQueue.add(state);
184                 stateOnlyMap.put(state.streamId, state);
185             }
186         });
187     }
188 
189     @Override
190     public void updateStreamableBytes(StreamState state) {
191         state(state.stream()).updateStreamableBytes(streamableBytes(state),
192                                                     state.hasFrame() && state.windowSize() >= 0);
193     }
194 
195     @Override
196     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
197         State state = state(childStreamId);
198         if (state == null) {
199             // If there is no State object that means there is no Http2Stream object and we would have to keep the
200             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
201             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
202             if (maxStateOnlySize == 0) {
203                 return;
204             }
205             state = new State(childStreamId);
206             stateOnlyRemovalQueue.add(state);
207             stateOnlyMap.put(childStreamId, state);
208         }
209 
210         State newParent = state(parentStreamId);
211         if (newParent == null) {
212             // If there is no State object that means there is no Http2Stream object and we would have to keep the
213             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
214             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
215             if (maxStateOnlySize == 0) {
216                 return;
217             }
218             newParent = new State(parentStreamId);
219             stateOnlyRemovalQueue.add(newParent);
220             stateOnlyMap.put(parentStreamId, newParent);
221             // Only the stream which was just added will change parents. So we only need an array of size 1.
222             List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
223             connectionState.takeChild(newParent, false, events);
224             notifyParentChanged(events);
225         }
226 
227         // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue and thus should not be counted
228         // toward parent.totalQueuedWeights.
229         if (state.activeCountForTree != 0 && state.parent != null) {
230             state.parent.totalQueuedWeights += weight - state.weight;
231         }
232         state.weight = weight;
233 
234         if (newParent != state.parent || exclusive && newParent.children.size() != 1) {
235             final List<ParentChangedEvent> events;
236             if (newParent.isDescendantOf(state)) {
237                 events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children.size() : 0));
238                 state.parent.takeChild(newParent, false, events);
239             } else {
240                 events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children.size() : 0));
241             }
242             newParent.takeChild(state, exclusive, events);
243             notifyParentChanged(events);
244         }
245 
246         // The location in the dependency tree impacts the priority in the stateOnlyRemovalQueue map. If we created new
247         // State objects we must check if we exceeded the limit after we insert into the dependency tree to ensure the
248         // stateOnlyRemovalQueue has been updated.
249         while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
250             State stateToRemove = stateOnlyRemovalQueue.poll();
251             stateToRemove.parent.removeChild(stateToRemove);
252             stateOnlyMap.remove(stateToRemove.streamId);
253         }
254     }
255 
256     @Override
257     public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
258         // As long as there is some active frame we should write at least 1 time.
259         if (connectionState.activeCountForTree == 0) {
260             return false;
261         }
262 
263         // The goal is to write until we write all the allocated bytes or are no longer making progress.
264         // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
265         // to be sent. Making progress means the active streams rooted at the connection stream has changed.
266         int oldIsActiveCountForTree;
267         do {
268             oldIsActiveCountForTree = connectionState.activeCountForTree;
269             // connectionState will never be active, so go right to its children.
270             maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
271         } while (connectionState.activeCountForTree != 0 &&
272                 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
273 
274         return connectionState.activeCountForTree != 0;
275     }
276 
277     /**
278      * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
279      * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be &gt; 0.
280      */
281     public void allocationQuantum(int allocationQuantum) {
282         checkPositive(allocationQuantum, "allocationQuantum");
283         this.allocationQuantum = allocationQuantum;
284     }
285 
286     private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
287         if (state.isActive()) {
288             int nsent = min(maxBytes, state.streamableBytes);
289             state.write(nsent, writer);
290             if (nsent == 0 && maxBytes != 0) {
291                 // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
292                 // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
293                 // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
294                 // stream's flow control window being 0.
295                 state.updateStreamableBytes(state.streamableBytes, false);
296             }
297             return nsent;
298         }
299 
300         return distributeToChildren(maxBytes, writer, state);
301     }
302 
303     /**
304      * It is a pre-condition that {@code state.poll()} returns a non-{@code null} value. This is a result of the way
305      * the allocation algorithm is structured and can be explained in the following cases:
306      * <h3>For the recursive case</h3>
307      * If a stream has no children (in the allocation tree) than that node must be active or it will not be in the
308      * allocation tree. If a node is active then it will not delegate to children and recursion ends.
309      * <h3>For the initial case</h3>
310      * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
311      * has no active children we don't get into this method.
312      */
313     private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
314         long oldTotalQueuedWeights = state.totalQueuedWeights;
315         State childState = state.pollPseudoTimeQueue();
316         State nextChildState = state.peekPseudoTimeQueue();
317         childState.setDistributing();
318         try {
319             assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
320                 "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
321                 ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ')';
322             int nsent = distribute(nextChildState == null ? maxBytes :
323                             min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
324                                                childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
325                                ),
326                                writer,
327                                childState);
328             state.pseudoTime += nsent;
329             childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
330             return nsent;
331         } finally {
332             childState.unsetDistributing();
333             // Do in finally to ensure the internal flags is not corrupted if an exception is thrown.
334             // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
335             // the priority pseudoTimeQueue due to a write operation.
336             if (childState.activeCountForTree != 0) {
337                 state.offerPseudoTimeQueue(childState);
338             }
339         }
340     }
341 
342     private State state(Http2Stream stream) {
343         return stream.getProperty(stateKey);
344     }
345 
346     private State state(int streamId) {
347         Http2Stream stream = connection.stream(streamId);
348         return stream != null ? state(stream) : stateOnlyMap.get(streamId);
349     }
350 
351     /**
352      * For testing only!
353      */
354     boolean isChild(int childId, int parentId, short weight) {
355         State parent = state(parentId);
356         State child;
357         return parent.children.containsKey(childId) &&
358                 (child = state(childId)).parent == parent && child.weight == weight;
359     }
360 
361     /**
362      * For testing only!
363      */
364     int numChildren(int streamId) {
365         State state = state(streamId);
366         return state == null ? 0 : state.children.size();
367     }
368 
369     /**
370      * Notify all listeners of the priority tree change events (in ascending order)
371      * @param events The events (top down order) which have changed
372      */
373     void notifyParentChanged(List<ParentChangedEvent> events) {
374         for (int i = 0; i < events.size(); ++i) {
375             ParentChangedEvent event = events.get(i);
376             stateOnlyRemovalQueue.priorityChanged(event.state);
377             if (event.state.parent != null && event.state.activeCountForTree != 0) {
378                 event.state.parent.offerAndInitializePseudoTime(event.state);
379                 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
380             }
381         }
382     }
383 
384     /**
385      * A comparator for {@link State} which has no associated {@link Http2Stream} object. The general precedence is:
386      * <ul>
387      *     <li>Was a stream activated or reserved (streams only used for priority are higher priority)</li>
388      *     <li>Depth in the priority tree (closer to root is higher priority></li>
389      *     <li>Stream ID (higher stream ID is higher priority - used for tie breaker)</li>
390      * </ul>
391      */
392     private static final class StateOnlyComparator implements Comparator<State>, Serializable {
393         private static final long serialVersionUID = -4806936913002105966L;
394 
395         static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
396 
397         @Override
398         public int compare(State o1, State o2) {
399             // "priority only streams" (which have not been activated) are higher priority than streams used for data.
400             boolean o1Actived = o1.wasStreamReservedOrActivated();
401             if (o1Actived != o2.wasStreamReservedOrActivated()) {
402                 return o1Actived ? -1 : 1;
403             }
404             // Numerically greater depth is higher priority.
405             int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
406 
407             // I also considered tracking the number of streams which are "activated" (eligible transfer data) at each
408             // subtree. This would require a traversal from each node to the root on dependency tree structural changes,
409             // and then it would require a re-prioritization at each of these nodes (instead of just the nodes where the
410             // direct parent changed). The costs of this are judged to be relatively high compared to the nominal
411             // benefit it provides to the heuristic. Instead folks should just increase maxStateOnlySize.
412 
413             // Last resort is to give larger stream ids more priority.
414             return x != 0 ? x : o1.streamId - o2.streamId;
415         }
416     }
417 
418     private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
419         private static final long serialVersionUID = -1437548640227161828L;
420 
421         static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
422 
423         @Override
424         public int compare(State o1, State o2) {
425             return Long.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
426         }
427     }
428 
429     /**
     * The priority-tree and byte-distribution state for a single stream.
431      */
432     private final class State implements PriorityQueueNode {
433         private static final byte STATE_IS_ACTIVE = 0x1;
434         private static final byte STATE_IS_DISTRIBUTING = 0x2;
435         private static final byte STATE_STREAM_ACTIVATED = 0x4;
436 
437         /**
438          * Maybe {@code null} if the stream if the stream is not active.
439          */
440         Http2Stream stream;
441         State parent;
442         IntObjectMap<State> children = IntCollections.emptyMap();
443         private final PriorityQueue<State> pseudoTimeQueue;
444         final int streamId;
445         int streamableBytes;
446         int dependencyTreeDepth;
447         /**
448          * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
449          */
450         int activeCountForTree;
451         private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
452         private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
453         /**
454          * An estimate of when this node should be given the opportunity to write data.
455          */
456         long pseudoTimeToWrite;
457         /**
458          * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
459          */
460         long pseudoTime;
461         long totalQueuedWeights;
462         private byte flags;
463         short weight = DEFAULT_PRIORITY_WEIGHT;
464 
465         State(int streamId) {
466             this(streamId, null, 0);
467         }
468 
469         State(Http2Stream stream) {
470             this(stream, 0);
471         }
472 
473         State(Http2Stream stream, int initialSize) {
474             this(stream.id(), stream, initialSize);
475         }
476 
477         State(int streamId, Http2Stream stream, int initialSize) {
478             this.stream = stream;
479             this.streamId = streamId;
480             pseudoTimeQueue = new DefaultPriorityQueue<State>(StatePseudoTimeComparator.INSTANCE, initialSize);
481         }
482 
483         boolean isDescendantOf(State state) {
484             State next = parent;
485             while (next != null) {
486                 if (next == state) {
487                     return true;
488                 }
489                 next = next.parent;
490             }
491             return false;
492         }
493 
494         void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
495             takeChild(null, child, exclusive, events);
496         }
497 
498         /**
499          * Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
500          * the child.
501          */
502         void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
503                        List<ParentChangedEvent> events) {
504             State oldParent = child.parent;
505 
506             if (oldParent != this) {
507                 events.add(new ParentChangedEvent(child, oldParent));
508                 child.setParent(this);
509                 // If the childItr is not null we are iterating over the oldParent.children collection and should
510                 // use the iterator to remove from the collection to avoid concurrent modification. Otherwise it is
511                 // assumed we are not iterating over this collection and it is safe to call remove directly.
512                 if (childItr != null) {
513                     childItr.remove();
514                 } else if (oldParent != null) {
515                     oldParent.children.remove(child.streamId);
516                 }
517 
518                 // Lazily initialize the children to save object allocations.
519                 initChildrenIfEmpty();
520 
521                 final State oldChild = children.put(child.streamId, child);
522                 assert oldChild == null : "A stream with the same stream ID was already in the child map.";
523             }
524 
525             if (exclusive && !children.isEmpty()) {
526                 // If it was requested that this child be the exclusive dependency of this node,
527                 // move any previous children to the child node, becoming grand children of this node.
528                 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
529                 while (itr.hasNext()) {
530                     child.takeChild(itr, itr.next().value(), false, events);
531                 }
532             }
533         }
534 
535         /**
536          * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
537          */
538         void removeChild(State child) {
539             if (children.remove(child.streamId) != null) {
540                 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
541                 events.add(new ParentChangedEvent(child, child.parent));
542                 child.setParent(null);
543 
544                 if (!child.children.isEmpty()) {
545                     // Move up any grand children to be directly dependent on this node.
546                     Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
547                     long totalWeight = child.getTotalWeight();
548                     do {
549                         // Redistribute the weight of child to its dependency proportionally.
550                         State dependency = itr.next().value();
551                         dependency.weight = (short) max(1, dependency.weight * child.weight / totalWeight);
552                         takeChild(itr, dependency, false, events);
553                     } while (itr.hasNext());
554                 }
555 
556                 notifyParentChanged(events);
557             }
558         }
559 
560         private long getTotalWeight() {
561             long totalWeight = 0L;
562             for (State state : children.values()) {
563                 totalWeight += state.weight;
564             }
565             return totalWeight;
566         }
567 
568         /**
569          * Remove all children with the exception of {@code streamToRetain}.
570          * This method is intended to be used to support an exclusive priority dependency operation.
571          * @return The map of children prior to this operation, excluding {@code streamToRetain} if present.
572          */
573         private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
574             stateToRetain = children.remove(stateToRetain.streamId);
575             IntObjectMap<State> prevChildren = children;
576             // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
577             // It will either be added directly in this method, or after this method is called...but it will be added.
578             initChildren();
579             if (stateToRetain != null) {
580                 children.put(stateToRetain.streamId, stateToRetain);
581             }
582             return prevChildren;
583         }
584 
585         private void setParent(State newParent) {
586             // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue.
587             if (activeCountForTree != 0 && parent != null) {
588                 parent.removePseudoTimeQueue(this);
589                 parent.activeCountChangeForTree(-activeCountForTree);
590             }
591             parent = newParent;
592             // Use MAX_VALUE if no parent because lower depth is considered higher priority by StateOnlyComparator.
593             dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
594         }
595 
596         private void initChildrenIfEmpty() {
597             if (children == IntCollections.<State>emptyMap()) {
598                 initChildren();
599             }
600         }
601 
602         private void initChildren() {
603             children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
604         }
605 
606         void write(int numBytes, Writer writer) throws Http2Exception {
607             assert stream != null;
608             try {
609                 writer.write(stream, numBytes);
610             } catch (Throwable t) {
611                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
612             }
613         }
614 
        /**
         * Adjusts {@code activeCountForTree} of this node by {@code increment} and propagates the same change to
         * every ancestor.
         * <p>
         * When this node's count drops to 0, it is removed from its parent's pseudoTimeQueue. When the count rises
         * from 0, the node is offered to the parent's pseudoTimeQueue with its pseudoTimeToWrite reset to the
         * parent's pseudoTime, unless this node is currently distributing.
         *
         * @param increment the (possibly negative) change; the resulting count must not go below 0.
         */
        void activeCountChangeForTree(int increment) {
            assert activeCountForTree + increment >= 0;
            activeCountForTree += increment;
            if (parent != null) {
                // Sanity check: if the count just went from 0 to non-zero (new count == increment), this node may
                // only already be queued if it is queued in its parent's pseudoTimeQueue.
                assert activeCountForTree != increment ||
                       pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
                       parent.pseudoTimeQueue.containsTyped(this) :
                     "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
                     "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
                if (activeCountForTree == 0) {
                    parent.removePseudoTimeQueue(this);
                } else if (activeCountForTree == increment && !isDistributing()) {
                    // The active count was 0 but is now not. This node is expected not to be in a pseudoTimeQueue
                    // yet (if it were, it would be the parent's), so enqueue it in the parent's pseudoTimeQueue.
                    // If this State object is being processed (isDistributing()), its pseudoTime must not be
                    // adjusted here. The node will be added back to the pseudoTimeQueue/tree structure after it is
                    // done being processed.
                    // This may happen in the following sequence:
                    //  - a node with activeCountForTree == 0 (one which can't stream anything and is blocked) is
                    //    at or near the root of the tree;
                    //  - it is popped off the pseudoTimeQueue during processing;
                    //  - it is put back on the pseudoTimeQueue because a child changes position in the priority
                    //    tree, or because a child is closed after it finished writing all of its data while not
                    //    blocked.
                    parent.offerAndInitializePseudoTime(this);
                }
                // Propagate the same change up the tree; the recursion stops at the node without a parent.
                parent.activeCountChangeForTree(increment);
            }
        }
640 
641         void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
642             if (isActive() != isActive) {
643                 if (isActive) {
644                     activeCountChangeForTree(1);
645                     setActive();
646                 } else {
647                     activeCountChangeForTree(-1);
648                     unsetActive();
649                 }
650             }
651 
652             streamableBytes = newStreamableBytes;
653         }
654 
655         /**
656          * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
657          */
658         void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
659             assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
660             // If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
661             // and should use parentState.pseudoTime.
662             pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
663         }
664 
665         /**
666          * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
667          * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
668          * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offerPseudoTimeQueue(State)}.
669          */
670         void offerAndInitializePseudoTime(State state) {
671             state.pseudoTimeToWrite = pseudoTime;
672             offerPseudoTimeQueue(state);
673         }
674 
675         void offerPseudoTimeQueue(State state) {
676             pseudoTimeQueue.offer(state);
677             totalQueuedWeights += state.weight;
678         }
679 
680         /**
681          * Must only be called if the pseudoTimeQueue is non-empty!
682          */
683         State pollPseudoTimeQueue() {
684             State state = pseudoTimeQueue.poll();
685             // This method is only ever called if the pseudoTimeQueue is non-empty.
686             totalQueuedWeights -= state.weight;
687             return state;
688         }
689 
690         void removePseudoTimeQueue(State state) {
691             if (pseudoTimeQueue.removeTyped(state)) {
692                 totalQueuedWeights -= state.weight;
693             }
694         }
695 
696         State peekPseudoTimeQueue() {
697             return pseudoTimeQueue.peek();
698         }
699 
700         void close() {
701             updateStreamableBytes(0, false);
702             stream = null;
703         }
704 
705         boolean wasStreamReservedOrActivated() {
706             return (flags & STATE_STREAM_ACTIVATED) != 0;
707         }
708 
709         void setStreamReservedOrActivated() {
710             flags |= STATE_STREAM_ACTIVATED;
711         }
712 
713         boolean isActive() {
714             return (flags & STATE_IS_ACTIVE) != 0;
715         }
716 
717         private void setActive() {
718             flags |= STATE_IS_ACTIVE;
719         }
720 
721         private void unsetActive() {
722             flags &= ~STATE_IS_ACTIVE;
723         }
724 
725         boolean isDistributing() {
726             return (flags & STATE_IS_DISTRIBUTING) != 0;
727         }
728 
729         void setDistributing() {
730             flags |= STATE_IS_DISTRIBUTING;
731         }
732 
733         void unsetDistributing() {
734             flags &= ~STATE_IS_DISTRIBUTING;
735         }
736 
737         @Override
738         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
739             return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
740         }
741 
742         @Override
743         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
744             if (queue == stateOnlyRemovalQueue) {
745                 stateOnlyQueueIndex = i;
746             } else {
747                 pseudoTimeQueueIndex = i;
748             }
749         }
750 
751         @Override
752         public String toString() {
753             // Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
754             StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
755             toString(sb);
756             return sb.toString();
757         }
758 
759         private void toString(StringBuilder sb) {
760             sb.append("{streamId ").append(streamId)
761                     .append(" streamableBytes ").append(streamableBytes)
762                     .append(" activeCountForTree ").append(activeCountForTree)
763                     .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
764                     .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
765                     .append(" pseudoTime ").append(pseudoTime)
766                     .append(" flags ").append(flags)
767                     .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
768                     .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
769                     .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
770 
771             if (!pseudoTimeQueue.isEmpty()) {
772                 for (State s : pseudoTimeQueue) {
773                     s.toString(sb);
774                     sb.append(", ");
775                 }
776                 // Remove the last ", "
777                 sb.setLength(sb.length() - 2);
778             }
779             sb.append(']');
780         }
781     }
782 
783     /**
784      * Allows a correlation to be made between a stream and its old parent before a parent change occurs.
785      */
786     private static final class ParentChangedEvent {
787         final State state;
788         final State oldParent;
789 
790         /**
791          * Create a new instance.
792          * @param state The state who has had a parent change.
793          * @param oldParent The previous parent.
794          */
795         ParentChangedEvent(State state, State oldParent) {
796             this.state = state;
797             this.oldParent = oldParent;
798         }
799     }
800 }