View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.util.collection.IntCollections;
18  import io.netty.util.collection.IntObjectHashMap;
19  import io.netty.util.collection.IntObjectMap;
20  import io.netty.util.internal.DefaultPriorityQueue;
21  import io.netty.util.internal.EmptyPriorityQueue;
22  import io.netty.util.internal.MathUtil;
23  import io.netty.util.internal.PriorityQueue;
24  import io.netty.util.internal.PriorityQueueNode;
25  import io.netty.util.internal.SystemPropertyUtil;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.io.Serializable;
29  import java.util.ArrayList;
30  import java.util.Comparator;
31  import java.util.Iterator;
32  import java.util.List;
33  
34  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
35  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
36  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
37  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
38  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
39  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
40  import static java.lang.Integer.MAX_VALUE;
41  import static java.lang.Math.max;
42  import static java.lang.Math.min;
43  
44  /**
45   * A {@link StreamByteDistributor} that is sensitive to stream priority and uses
46   * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
47   * bytes.
48   * <p>
49   * Inspiration for this distributor was taken from Linux's
50   * <a href="https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt">Completely Fair Scheduler</a>
51   * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
52   * an "ideal multi-tasking NIC".
53   * <p>
54   * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
55   * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
56   */
57  @UnstableApi
58  public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
59      /**
60       * The initial size of the children map is chosen to be conservative on initial memory allocations under
61       * the assumption that most streams will have a small number of children. This choice may be
62       * sub-optimal if when children are present there are many children (i.e. a web page which has many
63       * dependencies to load).
64       *
65       * Visible only for testing!
66       */
67      static final int INITIAL_CHILDREN_MAP_SIZE =
68              max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 2));
69      /**
70       * FireFox currently uses 5 streams to establish QoS classes.
71       */
72      private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;
73  
74      private final Http2Connection.PropertyKey stateKey;
75      /**
76       * If there is no Http2Stream object, but we still persist priority information then this is where the state will
77       * reside.
78       */
79      private final IntObjectMap<State> stateOnlyMap;
80      /**
81       * This queue will hold streams that are not active and provides the capability to retain priority for streams which
82       * have no {@link Http2Stream} object. See {@link StateOnlyComparator} for the priority comparator.
83       */
84      private final PriorityQueue<State> stateOnlyRemovalQueue;
85      private final Http2Connection connection;
86      private final State connectionState;
87      /**
88       * The minimum number of bytes that we will attempt to allocate to a stream. This is to
89       * help improve goodput on a per-stream basis.
90       */
91      private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
92      private final int maxStateOnlySize;
93  
94      public WeightedFairQueueByteDistributor(Http2Connection connection) {
95          this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
96      }
97  
98      public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
99          if (maxStateOnlySize < 0) {
100             throw new IllegalArgumentException("maxStateOnlySize: " + maxStateOnlySize + " (expected: >0)");
101         } else if (maxStateOnlySize == 0) {
102             stateOnlyMap = IntCollections.emptyMap();
103             stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
104         } else {
105             stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
106             // +2 because we may exceed the limit by 2 if a new dependency has no associated Http2Stream object. We need
107             // to create the State objects to put them into the dependency tree, which then impacts priority.
108             stateOnlyRemovalQueue = new DefaultPriorityQueue<State>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
109         }
110         this.maxStateOnlySize = maxStateOnlySize;
111 
112         this.connection = connection;
113         stateKey = connection.newKey();
114         final Http2Stream connectionStream = connection.connectionStream();
115         connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));
116 
117         // Register for notification of new streams.
118         connection.addListener(new Http2ConnectionAdapter() {
119             @Override
120             public void onStreamAdded(Http2Stream stream) {
121                 State state = stateOnlyMap.remove(stream.id());
122                 if (state == null) {
123                     state = new State(stream);
124                     // Only the stream which was just added will change parents. So we only need an array of size 1.
125                     List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
126                     connectionState.takeChild(state, false, events);
127                     notifyParentChanged(events);
128                 } else {
129                     stateOnlyRemovalQueue.removeTyped(state);
130                     state.stream = stream;
131                 }
132                 switch (stream.state()) {
133                     case RESERVED_REMOTE:
134                     case RESERVED_LOCAL:
135                         state.setStreamReservedOrActivated();
136                         // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no
137                         // need to reprioritize here because it will not be in stateOnlyRemovalQueue.
138                         break;
139                     default:
140                         break;
141                 }
142                 stream.setProperty(stateKey, state);
143             }
144 
145             @Override
146             public void onStreamActive(Http2Stream stream) {
147                 state(stream).setStreamReservedOrActivated();
148                 // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no need to
149                 // reprioritize here because it will not be in stateOnlyRemovalQueue.
150             }
151 
152             @Override
153             public void onStreamClosed(Http2Stream stream) {
154                 state(stream).close();
155             }
156 
157             @Override
158             public void onStreamRemoved(Http2Stream stream) {
159                 // The stream has been removed from the connection. We can no longer rely on the stream's property
160                 // storage to track the State. If we have room, and the precedence of the stream is sufficient, we
161                 // should retain the State in the stateOnlyMap.
162                 State state = state(stream);
163 
164                 // Typically the stream is set to null when the stream is closed because it is no longer needed to write
165                 // data. However if the stream was not activated it may not be closed (reserved streams) so we ensure
166                 // the stream reference is set to null to avoid retaining a reference longer than necessary.
167                 state.stream = null;
168 
169                 if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
170                     state.parent.removeChild(state);
171                     return;
172                 }
173                 if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
174                     State stateToRemove = stateOnlyRemovalQueue.peek();
175                     if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
176                         // The "lowest priority" stream is a "higher priority" than the stream being removed, so we
177                         // just discard the state.
178                         state.parent.removeChild(state);
179                         return;
180                     }
181                     stateOnlyRemovalQueue.poll();
182                     stateToRemove.parent.removeChild(stateToRemove);
183                     stateOnlyMap.remove(stateToRemove.streamId);
184                 }
185                 stateOnlyRemovalQueue.add(state);
186                 stateOnlyMap.put(state.streamId, state);
187             }
188         });
189     }
190 
191     @Override
192     public void updateStreamableBytes(StreamState state) {
193         state(state.stream()).updateStreamableBytes(streamableBytes(state),
194                                                     state.hasFrame() && state.windowSize() >= 0);
195     }
196 
197     @Override
198     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
199         State state = state(childStreamId);
200         if (state == null) {
201             // If there is no State object that means there is no Http2Stream object and we would have to keep the
202             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
203             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
204             if (maxStateOnlySize == 0) {
205                 return;
206             }
207             state = new State(childStreamId);
208             stateOnlyRemovalQueue.add(state);
209             stateOnlyMap.put(childStreamId, state);
210         }
211 
212         State newParent = state(parentStreamId);
213         if (newParent == null) {
214             // If there is no State object that means there is no Http2Stream object and we would have to keep the
215             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
216             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
217             if (maxStateOnlySize == 0) {
218                 return;
219             }
220             newParent = new State(parentStreamId);
221             stateOnlyRemovalQueue.add(newParent);
222             stateOnlyMap.put(parentStreamId, newParent);
223             // Only the stream which was just added will change parents. So we only need an array of size 1.
224             List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
225             connectionState.takeChild(newParent, false, events);
226             notifyParentChanged(events);
227         }
228 
229         // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue and thus should not be counted
230         // toward parent.totalQueuedWeights.
231         if (state.activeCountForTree != 0 && state.parent != null) {
232             state.parent.totalQueuedWeights += weight - state.weight;
233         }
234         state.weight = weight;
235 
236         if (newParent != state.parent || (exclusive && newParent.children.size() != 1)) {
237             final List<ParentChangedEvent> events;
238             if (newParent.isDescendantOf(state)) {
239                 events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children.size() : 0));
240                 state.parent.takeChild(newParent, false, events);
241             } else {
242                 events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children.size() : 0));
243             }
244             newParent.takeChild(state, exclusive, events);
245             notifyParentChanged(events);
246         }
247 
248         // The location in the dependency tree impacts the priority in the stateOnlyRemovalQueue map. If we created new
249         // State objects we must check if we exceeded the limit after we insert into the dependency tree to ensure the
250         // stateOnlyRemovalQueue has been updated.
251         while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
252             State stateToRemove = stateOnlyRemovalQueue.poll();
253             stateToRemove.parent.removeChild(stateToRemove);
254             stateOnlyMap.remove(stateToRemove.streamId);
255         }
256     }
257 
258     @Override
259     public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
260         // As long as there is some active frame we should write at least 1 time.
261         if (connectionState.activeCountForTree == 0) {
262             return false;
263         }
264 
265         // The goal is to write until we write all the allocated bytes or are no longer making progress.
266         // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
267         // to be sent. Making progress means the active streams rooted at the connection stream has changed.
268         int oldIsActiveCountForTree;
269         do {
270             oldIsActiveCountForTree = connectionState.activeCountForTree;
271             // connectionState will never be active, so go right to its children.
272             maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
273         } while (connectionState.activeCountForTree != 0 &&
274                 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
275 
276         return connectionState.activeCountForTree != 0;
277     }
278 
279     /**
280      * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
281      * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be &gt; 0.
282      */
283     public void allocationQuantum(int allocationQuantum) {
284         if (allocationQuantum <= 0) {
285             throw new IllegalArgumentException("allocationQuantum must be > 0");
286         }
287         this.allocationQuantum = allocationQuantum;
288     }
289 
290     private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
291         if (state.isActive()) {
292             int nsent = min(maxBytes, state.streamableBytes);
293             state.write(nsent, writer);
294             if (nsent == 0 && maxBytes != 0) {
295                 // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
296                 // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
297                 // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
298                 // stream's flow control window being 0.
299                 state.updateStreamableBytes(state.streamableBytes, false);
300             }
301             return nsent;
302         }
303 
304         return distributeToChildren(maxBytes, writer, state);
305     }
306 
307     /**
308      * It is a pre-condition that {@code state.poll()} returns a non-{@code null} value. This is a result of the way
309      * the allocation algorithm is structured and can be explained in the following cases:
310      * <h3>For the recursive case</h3>
311      * If a stream has no children (in the allocation tree) than that node must be active or it will not be in the
312      * allocation tree. If a node is active then it will not delegate to children and recursion ends.
313      * <h3>For the initial case</h3>
314      * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
315      * has no active children we don't get into this method.
316      */
317     private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
318         long oldTotalQueuedWeights = state.totalQueuedWeights;
319         State childState = state.pollPseudoTimeQueue();
320         State nextChildState = state.peekPseudoTimeQueue();
321         childState.setDistributing();
322         try {
323             assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
324                 "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
325                 ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ")";
326             int nsent = distribute(nextChildState == null ? maxBytes :
327                             min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
328                                                childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
329                                ),
330                                writer,
331                                childState);
332             state.pseudoTime += nsent;
333             childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
334             return nsent;
335         } finally {
336             childState.unsetDistributing();
337             // Do in finally to ensure the internal flags is not corrupted if an exception is thrown.
338             // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
339             // the priority pseudoTimeQueue due to a write operation.
340             if (childState.activeCountForTree != 0) {
341                 state.offerPseudoTimeQueue(childState);
342             }
343         }
344     }
345 
346     private State state(Http2Stream stream) {
347         return stream.getProperty(stateKey);
348     }
349 
350     private State state(int streamId) {
351         Http2Stream stream = connection.stream(streamId);
352         return stream != null ? state(stream) : stateOnlyMap.get(streamId);
353     }
354 
355     /**
356      * For testing only!
357      */
358     int streamableBytes0(Http2Stream stream) {
359         return state(stream).streamableBytes;
360     }
361 
362     /**
363      * For testing only!
364      */
365     boolean isChild(int childId, int parentId, short weight) {
366         State parent = state(parentId);
367         State child;
368         return parent.children.containsKey(childId) &&
369                 (child = state(childId)).parent == parent && child.weight == weight;
370     }
371 
372     /**
373      * For testing only!
374      */
375     int numChildren(int streamId) {
376         State state = state(streamId);
377         return state == null ? 0 : state.children.size();
378     }
379 
380     /**
381      * Notify all listeners of the priority tree change events (in ascending order)
382      * @param events The events (top down order) which have changed
383      */
384     void notifyParentChanged(List<ParentChangedEvent> events) {
385         for (int i = 0; i < events.size(); ++i) {
386             ParentChangedEvent event = events.get(i);
387             stateOnlyRemovalQueue.priorityChanged(event.state);
388             if (event.state.parent != null && event.state.activeCountForTree != 0) {
389                 event.state.parent.offerAndInitializePseudoTime(event.state);
390                 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
391             }
392         }
393     }
394 
395     /**
396      * A comparator for {@link State} which has no associated {@link Http2Stream} object. The general precedence is:
397      * <ul>
398      *     <li>Was a stream activated or reserved (streams only used for priority are higher priority)</li>
399      *     <li>Depth in the priority tree (closer to root is higher priority></li>
400      *     <li>Stream ID (higher stream ID is higher priority - used for tie breaker)</li>
401      * </ul>
402      */
403     private static final class StateOnlyComparator implements Comparator<State>, Serializable {
404         private static final long serialVersionUID = -4806936913002105966L;
405 
406         static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
407 
408         private StateOnlyComparator() {
409         }
410 
411         @Override
412         public int compare(State o1, State o2) {
413             // "priority only streams" (which have not been activated) are higher priority than streams used for data.
414             boolean o1Actived = o1.wasStreamReservedOrActivated();
415             if (o1Actived != o2.wasStreamReservedOrActivated()) {
416                 return o1Actived ? -1 : 1;
417             }
418             // Numerically greater depth is higher priority.
419             int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
420 
421             // I also considered tracking the number of streams which are "activated" (eligible transfer data) at each
422             // subtree. This would require a traversal from each node to the root on dependency tree structural changes,
423             // and then it would require a re-prioritization at each of these nodes (instead of just the nodes where the
424             // direct parent changed). The costs of this are judged to be relatively high compared to the nominal
425             // benefit it provides to the heuristic. Instead folks should just increase maxStateOnlySize.
426 
427             // Last resort is to give larger stream ids more priority.
428             return x != 0 ? x : o1.streamId - o2.streamId;
429         }
430     }
431 
432     private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
433         private static final long serialVersionUID = -1437548640227161828L;
434 
435         static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
436 
437         private StatePseudoTimeComparator() {
438         }
439 
440         @Override
441         public int compare(State o1, State o2) {
442             return MathUtil.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
443         }
444     }
445 
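    /**
     * Per-stream bookkeeping for this distributor: the stream's position and weight in the priority dependency tree,
     * its streamable bytes, and the pseudo-time data used for weighted fair queueing.
     */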
449     private final class State implements PriorityQueueNode {
450         private static final byte STATE_IS_ACTIVE = 0x1;
451         private static final byte STATE_IS_DISTRIBUTING = 0x2;
452         private static final byte STATE_STREAM_ACTIVATED = 0x4;
453 
454         /**
455          * Maybe {@code null} if the stream if the stream is not active.
456          */
457         Http2Stream stream;
458         State parent;
459         IntObjectMap<State> children = IntCollections.emptyMap();
460         private final PriorityQueue<State> pseudoTimeQueue;
461         final int streamId;
462         int streamableBytes;
463         int dependencyTreeDepth;
464         /**
465          * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
466          */
467         int activeCountForTree;
468         private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
469         private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
470         /**
471          * An estimate of when this node should be given the opportunity to write data.
472          */
473         long pseudoTimeToWrite;
474         /**
475          * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
476          */
477         long pseudoTime;
478         long totalQueuedWeights;
479         private byte flags;
480         short weight = DEFAULT_PRIORITY_WEIGHT;
481 
482         State(int streamId) {
483             this(streamId, null, 0);
484         }
485 
486         State(Http2Stream stream) {
487             this(stream, 0);
488         }
489 
490         State(Http2Stream stream, int initialSize) {
491             this(stream.id(), stream, initialSize);
492         }
493 
494         State(int streamId, Http2Stream stream, int initialSize) {
495             this.stream = stream;
496             this.streamId = streamId;
497             pseudoTimeQueue = new DefaultPriorityQueue<State>(StatePseudoTimeComparator.INSTANCE, initialSize);
498         }
499 
500         boolean isDescendantOf(State state) {
501             State next = parent;
502             while (next != null) {
503                 if (next == state) {
504                     return true;
505                 }
506                 next = next.parent;
507             }
508             return false;
509         }
510 
511         void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
512             takeChild(null, child, exclusive, events);
513         }
514 
515         /**
516          * Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
517          * the child.
518          */
519         void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
520                        List<ParentChangedEvent> events) {
521             State oldParent = child.parent;
522 
523             if (oldParent != this) {
524                 events.add(new ParentChangedEvent(child, oldParent));
525                 child.setParent(this);
526                 // If the childItr is not null we are iterating over the oldParent.children collection and should
527                 // use the iterator to remove from the collection to avoid concurrent modification. Otherwise it is
528                 // assumed we are not iterating over this collection and it is safe to call remove directly.
529                 if (childItr != null) {
530                     childItr.remove();
531                 } else if (oldParent != null) {
532                     oldParent.children.remove(child.streamId);
533                 }
534 
535                 // Lazily initialize the children to save object allocations.
536                 initChildrenIfEmpty();
537 
538                 final State oldChild = children.put(child.streamId, child);
539                 assert oldChild == null : "A stream with the same stream ID was already in the child map.";
540             }
541 
542             if (exclusive && !children.isEmpty()) {
543                 // If it was requested that this child be the exclusive dependency of this node,
544                 // move any previous children to the child node, becoming grand children of this node.
545                 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
546                 while (itr.hasNext()) {
547                     child.takeChild(itr, itr.next().value(), false, events);
548                 }
549             }
550         }
551 
552         /**
553          * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
554          */
555         void removeChild(State child) {
556             if (children.remove(child.streamId) != null) {
557                 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
558                 events.add(new ParentChangedEvent(child, child.parent));
559                 child.setParent(null);
560 
561                 // Move up any grand children to be directly dependent on this node.
562                 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
563                 while (itr.hasNext()) {
564                     takeChild(itr, itr.next().value(), false, events);
565                 }
566 
567                 notifyParentChanged(events);
568             }
569         }
570 
571         /**
572          * Remove all children with the exception of {@code streamToRetain}.
573          * This method is intended to be used to support an exclusive priority dependency operation.
574          * @return The map of children prior to this operation, excluding {@code streamToRetain} if present.
575          */
576         private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
577             stateToRetain = children.remove(stateToRetain.streamId);
578             IntObjectMap<State> prevChildren = children;
579             // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
580             // It will either be added directly in this method, or after this method is called...but it will be added.
581             initChildren();
582             if (stateToRetain != null) {
583                 children.put(stateToRetain.streamId, stateToRetain);
584             }
585             return prevChildren;
586         }
587 
588         private void setParent(State newParent) {
589             // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue.
590             if (activeCountForTree != 0 && parent != null) {
591                 parent.removePseudoTimeQueue(this);
592                 parent.activeCountChangeForTree(-activeCountForTree);
593             }
594             parent = newParent;
595             // Use MAX_VALUE if no parent because lower depth is considered higher priority by StateOnlyComparator.
596             dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
597         }
598 
599         private void initChildrenIfEmpty() {
600             if (children == IntCollections.<State>emptyMap()) {
601                 initChildren();
602             }
603         }
604 
605         private void initChildren() {
606             children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
607         }
608 
609         void write(int numBytes, Writer writer) throws Http2Exception {
610             assert stream != null;
611             try {
612                 writer.write(stream, numBytes);
613             } catch (Throwable t) {
614                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
615             }
616         }
617 
618         void activeCountChangeForTree(int increment) {
619             assert activeCountForTree + increment >= 0;
620             activeCountForTree += increment;
621             if (parent != null) {
622                 assert activeCountForTree != increment ||
623                        pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
624                        parent.pseudoTimeQueue.containsTyped(this) :
625                      "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
626                      "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
627                 if (activeCountForTree == 0) {
628                     parent.removePseudoTimeQueue(this);
629                 } else if (activeCountForTree == increment && !isDistributing()) {
630                     // If frame count was 0 but is now not, and this node is not already in a pseudoTimeQueue (assumed
631                     // to be pState's pseudoTimeQueue) then enqueue it. If this State object is being processed the
632                     // pseudoTime for this node should not be adjusted, and the node will be added back to the
633                     // pseudoTimeQueue/tree structure after it is done being processed. This may happen if the
634                     // activeCountForTree == 0 (a node which can't stream anything and is blocked) is at/near root of
635                     // the tree, and is popped off the pseudoTimeQueue during processing, and then put back on the
636                     // pseudoTimeQueue because a child changes position in the priority tree (or is closed because it is
637                     // not blocked and finished writing all data).
638                     parent.offerAndInitializePseudoTime(this);
639                 }
640                 parent.activeCountChangeForTree(increment);
641             }
642         }
643 
644         void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
645             if (isActive() != isActive) {
646                 if (isActive) {
647                     activeCountChangeForTree(1);
648                     setActive();
649                 } else {
650                     activeCountChangeForTree(-1);
651                     unsetActive();
652                 }
653             }
654 
655             streamableBytes = newStreamableBytes;
656         }
657 
658         /**
659          * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
660          */
661         void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
662             assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
663             // If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
664             // and should use parentState.pseudoTime.
665             pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
666         }
667 
668         /**
669          * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
670          * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
671          * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offerPseudoTimeQueue(State)}.
672          */
673         void offerAndInitializePseudoTime(State state) {
674             state.pseudoTimeToWrite = pseudoTime;
675             offerPseudoTimeQueue(state);
676         }
677 
678         void offerPseudoTimeQueue(State state) {
679             pseudoTimeQueue.offer(state);
680             totalQueuedWeights += state.weight;
681         }
682 
683         /**
684          * Must only be called if the pseudoTimeQueue is non-empty!
685          */
686         State pollPseudoTimeQueue() {
687             State state = pseudoTimeQueue.poll();
688             // This method is only ever called if the pseudoTimeQueue is non-empty.
689             totalQueuedWeights -= state.weight;
690             return state;
691         }
692 
693         void removePseudoTimeQueue(State state) {
694             if (pseudoTimeQueue.removeTyped(state)) {
695                 totalQueuedWeights -= state.weight;
696             }
697         }
698 
699         State peekPseudoTimeQueue() {
700             return pseudoTimeQueue.peek();
701         }
702 
703         void close() {
704             updateStreamableBytes(0, false);
705             stream = null;
706         }
707 
708         boolean wasStreamReservedOrActivated() {
709             return (flags & STATE_STREAM_ACTIVATED) != 0;
710         }
711 
712         void setStreamReservedOrActivated() {
713             flags |= STATE_STREAM_ACTIVATED;
714         }
715 
716         boolean isActive() {
717             return (flags & STATE_IS_ACTIVE) != 0;
718         }
719 
720         private void setActive() {
721             flags |= STATE_IS_ACTIVE;
722         }
723 
724         private void unsetActive() {
725             flags &= ~STATE_IS_ACTIVE;
726         }
727 
728         boolean isDistributing() {
729             return (flags & STATE_IS_DISTRIBUTING) != 0;
730         }
731 
732         void setDistributing() {
733             flags |= STATE_IS_DISTRIBUTING;
734         }
735 
736         void unsetDistributing() {
737             flags &= ~STATE_IS_DISTRIBUTING;
738         }
739 
740         @Override
741         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
742             return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
743         }
744 
745         @Override
746         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
747             if (queue == stateOnlyRemovalQueue) {
748                 stateOnlyQueueIndex = i;
749             } else {
750                 pseudoTimeQueueIndex = i;
751             }
752         }
753 
754         @Override
755         public String toString() {
756             // Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
757             StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
758             toString(sb);
759             return sb.toString();
760         }
761 
762         private void toString(StringBuilder sb) {
763             sb.append("{streamId ").append(streamId)
764                     .append(" streamableBytes ").append(streamableBytes)
765                     .append(" activeCountForTree ").append(activeCountForTree)
766                     .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
767                     .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
768                     .append(" pseudoTime ").append(pseudoTime)
769                     .append(" flags ").append(flags)
770                     .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
771                     .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
772                     .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
773 
774             if (!pseudoTimeQueue.isEmpty()) {
775                 for (State s : pseudoTimeQueue) {
776                     s.toString(sb);
777                     sb.append(", ");
778                 }
779                 // Remove the last ", "
780                 sb.setLength(sb.length() - 2);
781             }
782             sb.append(']');
783         }
784     }
785 
786     /**
787      * Allows a correlation to be made between a stream and its old parent before a parent change occurs.
788      */
789     private static final class ParentChangedEvent {
790         final State state;
791         final State oldParent;
792 
793         /**
794          * Create a new instance.
795          * @param state The state who has had a parent change.
796          * @param oldParent The previous parent.
797          */
798         ParentChangedEvent(State state, State oldParent) {
799             this.state = state;
800             this.oldParent = oldParent;
801         }
802     }
803 }