View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.util.collection.IntCollections;
18  import io.netty.util.collection.IntObjectHashMap;
19  import io.netty.util.collection.IntObjectMap;
20  import io.netty.util.internal.DefaultPriorityQueue;
21  import io.netty.util.internal.EmptyPriorityQueue;
22  import io.netty.util.internal.MathUtil;
23  import io.netty.util.internal.PriorityQueue;
24  import io.netty.util.internal.PriorityQueueNode;
25  import io.netty.util.internal.SystemPropertyUtil;
26  
27  import java.io.Serializable;
28  import java.util.ArrayList;
29  import java.util.Comparator;
30  import java.util.Iterator;
31  import java.util.List;
32  
33  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
34  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
35  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
36  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
37  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
38  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
39  import static io.netty.util.internal.ObjectUtil.checkPositive;
40  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
41  import static java.lang.Integer.MAX_VALUE;
42  import static java.lang.Math.max;
43  import static java.lang.Math.min;
44  
45  /**
46   * A {@link StreamByteDistributor} that is sensitive to stream priority and uses
47   * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for distributing
48   * bytes.
49   * <p>
50   * Inspiration for this distributor was taken from Linux's
51   * <a href="https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt">Completely Fair Scheduler</a>
52   * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
53   * an "ideal multi-tasking NIC".
54   * <p>
55   * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
56   * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
57   */
58  public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
59      /**
60       * The initial size of the children map is chosen to be conservative on initial memory allocations under
61       * the assumption that most streams will have a small number of children. This choice may be
62       * sub-optimal if when children are present there are many children (i.e. a web page which has many
63       * dependencies to load).
64       *
65       * Visible only for testing!
66       */
67      static final int INITIAL_CHILDREN_MAP_SIZE =
68              max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 2));
69      /**
70       * FireFox currently uses 5 streams to establish QoS classes.
71       */
72      private static final int DEFAULT_MAX_STATE_ONLY_SIZE = 5;
73  
74      private final Http2Connection.PropertyKey stateKey;
75      /**
76       * If there is no Http2Stream object, but we still persist priority information then this is where the state will
77       * reside.
78       */
79      private final IntObjectMap<State> stateOnlyMap;
80      /**
81       * This queue will hold streams that are not active and provides the capability to retain priority for streams which
82       * have no {@link Http2Stream} object. See {@link StateOnlyComparator} for the priority comparator.
83       */
84      private final PriorityQueue<State> stateOnlyRemovalQueue;
85      private final Http2Connection connection;
86      private final State connectionState;
87      /**
88       * The minimum number of bytes that we will attempt to allocate to a stream. This is to
89       * help improve goodput on a per-stream basis.
90       */
91      private int allocationQuantum = DEFAULT_MIN_ALLOCATION_CHUNK;
92      private final int maxStateOnlySize;
93  
94      public WeightedFairQueueByteDistributor(Http2Connection connection) {
95          this(connection, DEFAULT_MAX_STATE_ONLY_SIZE);
96      }
97  
98      public WeightedFairQueueByteDistributor(Http2Connection connection, int maxStateOnlySize) {
99          checkPositiveOrZero(maxStateOnlySize, "maxStateOnlySize");
100         if (maxStateOnlySize == 0) {
101             stateOnlyMap = IntCollections.emptyMap();
102             stateOnlyRemovalQueue = EmptyPriorityQueue.instance();
103         } else {
104             stateOnlyMap = new IntObjectHashMap<State>(maxStateOnlySize);
105             // +2 because we may exceed the limit by 2 if a new dependency has no associated Http2Stream object. We need
106             // to create the State objects to put them into the dependency tree, which then impacts priority.
107             stateOnlyRemovalQueue = new DefaultPriorityQueue<State>(StateOnlyComparator.INSTANCE, maxStateOnlySize + 2);
108         }
109         this.maxStateOnlySize = maxStateOnlySize;
110 
111         this.connection = connection;
112         stateKey = connection.newKey();
113         final Http2Stream connectionStream = connection.connectionStream();
114         connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));
115 
116         // Register for notification of new streams.
117         connection.addListener(new Http2ConnectionAdapter() {
118             @Override
119             public void onStreamAdded(Http2Stream stream) {
120                 State state = stateOnlyMap.remove(stream.id());
121                 if (state == null) {
122                     state = new State(stream);
123                     // Only the stream which was just added will change parents. So we only need an array of size 1.
124                     List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
125                     connectionState.takeChild(state, false, events);
126                     notifyParentChanged(events);
127                 } else {
128                     stateOnlyRemovalQueue.removeTyped(state);
129                     state.stream = stream;
130                 }
131                 switch (stream.state()) {
132                     case RESERVED_REMOTE:
133                     case RESERVED_LOCAL:
134                         state.setStreamReservedOrActivated();
135                         // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no
136                         // need to reprioritize here because it will not be in stateOnlyRemovalQueue.
137                         break;
138                     default:
139                         break;
140                 }
141                 stream.setProperty(stateKey, state);
142             }
143 
144             @Override
145             public void onStreamActive(Http2Stream stream) {
146                 state(stream).setStreamReservedOrActivated();
147                 // wasStreamReservedOrActivated is part of the comparator for stateOnlyRemovalQueue there is no need to
148                 // reprioritize here because it will not be in stateOnlyRemovalQueue.
149             }
150 
151             @Override
152             public void onStreamClosed(Http2Stream stream) {
153                 state(stream).close();
154             }
155 
156             @Override
157             public void onStreamRemoved(Http2Stream stream) {
158                 // The stream has been removed from the connection. We can no longer rely on the stream's property
159                 // storage to track the State. If we have room, and the precedence of the stream is sufficient, we
160                 // should retain the State in the stateOnlyMap.
161                 State state = state(stream);
162 
163                 // Typically the stream is set to null when the stream is closed because it is no longer needed to write
164                 // data. However if the stream was not activated it may not be closed (reserved streams) so we ensure
165                 // the stream reference is set to null to avoid retaining a reference longer than necessary.
166                 state.stream = null;
167 
168                 if (WeightedFairQueueByteDistributor.this.maxStateOnlySize == 0) {
169                     state.parent.removeChild(state);
170                     return;
171                 }
172                 if (stateOnlyRemovalQueue.size() == WeightedFairQueueByteDistributor.this.maxStateOnlySize) {
173                     State stateToRemove = stateOnlyRemovalQueue.peek();
174                     if (StateOnlyComparator.INSTANCE.compare(stateToRemove, state) >= 0) {
175                         // The "lowest priority" stream is a "higher priority" than the stream being removed, so we
176                         // just discard the state.
177                         state.parent.removeChild(state);
178                         return;
179                     }
180                     stateOnlyRemovalQueue.poll();
181                     stateToRemove.parent.removeChild(stateToRemove);
182                     stateOnlyMap.remove(stateToRemove.streamId);
183                 }
184                 stateOnlyRemovalQueue.add(state);
185                 stateOnlyMap.put(state.streamId, state);
186             }
187         });
188     }
189 
190     @Override
191     public void updateStreamableBytes(StreamState state) {
192         state(state.stream()).updateStreamableBytes(streamableBytes(state),
193                                                     state.hasFrame() && state.windowSize() >= 0);
194     }
195 
196     @Override
197     public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
198         State state = state(childStreamId);
199         if (state == null) {
200             // If there is no State object that means there is no Http2Stream object and we would have to keep the
201             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
202             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
203             if (maxStateOnlySize == 0) {
204                 return;
205             }
206             state = new State(childStreamId);
207             stateOnlyRemovalQueue.add(state);
208             stateOnlyMap.put(childStreamId, state);
209         }
210 
211         State newParent = state(parentStreamId);
212         if (newParent == null) {
213             // If there is no State object that means there is no Http2Stream object and we would have to keep the
214             // State object in the stateOnlyMap and stateOnlyRemovalQueue. However if maxStateOnlySize is 0 this means
215             // stateOnlyMap and stateOnlyRemovalQueue are empty collections and cannot be modified so we drop the State.
216             if (maxStateOnlySize == 0) {
217                 return;
218             }
219             newParent = new State(parentStreamId);
220             stateOnlyRemovalQueue.add(newParent);
221             stateOnlyMap.put(parentStreamId, newParent);
222             // Only the stream which was just added will change parents. So we only need an array of size 1.
223             List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1);
224             connectionState.takeChild(newParent, false, events);
225             notifyParentChanged(events);
226         }
227 
228         // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue and thus should not be counted
229         // toward parent.totalQueuedWeights.
230         if (state.activeCountForTree != 0 && state.parent != null) {
231             state.parent.totalQueuedWeights += weight - state.weight;
232         }
233         state.weight = weight;
234 
235         if (newParent != state.parent || exclusive && newParent.children.size() != 1) {
236             final List<ParentChangedEvent> events;
237             if (newParent.isDescendantOf(state)) {
238                 events = new ArrayList<ParentChangedEvent>(2 + (exclusive ? newParent.children.size() : 0));
239                 state.parent.takeChild(newParent, false, events);
240             } else {
241                 events = new ArrayList<ParentChangedEvent>(1 + (exclusive ? newParent.children.size() : 0));
242             }
243             newParent.takeChild(state, exclusive, events);
244             notifyParentChanged(events);
245         }
246 
247         // The location in the dependency tree impacts the priority in the stateOnlyRemovalQueue map. If we created new
248         // State objects we must check if we exceeded the limit after we insert into the dependency tree to ensure the
249         // stateOnlyRemovalQueue has been updated.
250         while (stateOnlyRemovalQueue.size() > maxStateOnlySize) {
251             State stateToRemove = stateOnlyRemovalQueue.poll();
252             stateToRemove.parent.removeChild(stateToRemove);
253             stateOnlyMap.remove(stateToRemove.streamId);
254         }
255     }
256 
257     @Override
258     public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
259         // As long as there is some active frame we should write at least 1 time.
260         if (connectionState.activeCountForTree == 0) {
261             return false;
262         }
263 
264         // The goal is to write until we write all the allocated bytes or are no longer making progress.
265         // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
266         // to be sent. Making progress means the active streams rooted at the connection stream has changed.
267         int oldIsActiveCountForTree;
268         do {
269             oldIsActiveCountForTree = connectionState.activeCountForTree;
270             // connectionState will never be active, so go right to its children.
271             maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
272         } while (connectionState.activeCountForTree != 0 &&
273                 (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));
274 
275         return connectionState.activeCountForTree != 0;
276     }
277 
278     /**
279      * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
280      * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be &gt; 0.
281      */
282     public void allocationQuantum(int allocationQuantum) {
283         checkPositive(allocationQuantum, "allocationQuantum");
284         this.allocationQuantum = allocationQuantum;
285     }
286 
287     private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
288         if (state.isActive()) {
289             int nsent = min(maxBytes, state.streamableBytes);
290             state.write(nsent, writer);
291             if (nsent == 0 && maxBytes != 0) {
292                 // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
293                 // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
294                 // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
295                 // stream's flow control window being 0.
296                 state.updateStreamableBytes(state.streamableBytes, false);
297             }
298             return nsent;
299         }
300 
301         return distributeToChildren(maxBytes, writer, state);
302     }
303 
304     /**
305      * It is a pre-condition that {@code state.poll()} returns a non-{@code null} value. This is a result of the way
306      * the allocation algorithm is structured and can be explained in the following cases:
307      * <h3>For the recursive case</h3>
308      * If a stream has no children (in the allocation tree) than that node must be active or it will not be in the
309      * allocation tree. If a node is active then it will not delegate to children and recursion ends.
310      * <h3>For the initial case</h3>
311      * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
312      * has no active children we don't get into this method.
313      */
314     private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
315         long oldTotalQueuedWeights = state.totalQueuedWeights;
316         State childState = state.pollPseudoTimeQueue();
317         State nextChildState = state.peekPseudoTimeQueue();
318         childState.setDistributing();
319         try {
320             assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
321                 "nextChildState[" + nextChildState.streamId + "].pseudoTime(" + nextChildState.pseudoTimeToWrite +
322                 ") < " + " childState[" + childState.streamId + "].pseudoTime(" + childState.pseudoTimeToWrite + ')';
323             int nsent = distribute(nextChildState == null ? maxBytes :
324                             min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
325                                                childState.weight / oldTotalQueuedWeights + allocationQuantum, MAX_VALUE)
326                                ),
327                                writer,
328                                childState);
329             state.pseudoTime += nsent;
330             childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
331             return nsent;
332         } finally {
333             childState.unsetDistributing();
334             // Do in finally to ensure the internal flags is not corrupted if an exception is thrown.
335             // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
336             // the priority pseudoTimeQueue due to a write operation.
337             if (childState.activeCountForTree != 0) {
338                 state.offerPseudoTimeQueue(childState);
339             }
340         }
341     }
342 
343     private State state(Http2Stream stream) {
344         return stream.getProperty(stateKey);
345     }
346 
347     private State state(int streamId) {
348         Http2Stream stream = connection.stream(streamId);
349         return stream != null ? state(stream) : stateOnlyMap.get(streamId);
350     }
351 
352     /**
353      * For testing only!
354      */
355     boolean isChild(int childId, int parentId, short weight) {
356         State parent = state(parentId);
357         State child;
358         return parent.children.containsKey(childId) &&
359                 (child = state(childId)).parent == parent && child.weight == weight;
360     }
361 
362     /**
363      * For testing only!
364      */
365     int numChildren(int streamId) {
366         State state = state(streamId);
367         return state == null ? 0 : state.children.size();
368     }
369 
370     /**
371      * Notify all listeners of the priority tree change events (in ascending order)
372      * @param events The events (top down order) which have changed
373      */
374     void notifyParentChanged(List<ParentChangedEvent> events) {
375         for (int i = 0; i < events.size(); ++i) {
376             ParentChangedEvent event = events.get(i);
377             stateOnlyRemovalQueue.priorityChanged(event.state);
378             if (event.state.parent != null && event.state.activeCountForTree != 0) {
379                 event.state.parent.offerAndInitializePseudoTime(event.state);
380                 event.state.parent.activeCountChangeForTree(event.state.activeCountForTree);
381             }
382         }
383     }
384 
385     /**
386      * A comparator for {@link State} which has no associated {@link Http2Stream} object. The general precedence is:
387      * <ul>
388      *     <li>Was a stream activated or reserved (streams only used for priority are higher priority)</li>
389      *     <li>Depth in the priority tree (closer to root is higher priority></li>
390      *     <li>Stream ID (higher stream ID is higher priority - used for tie breaker)</li>
391      * </ul>
392      */
393     private static final class StateOnlyComparator implements Comparator<State>, Serializable {
394         private static final long serialVersionUID = -4806936913002105966L;
395 
396         static final StateOnlyComparator INSTANCE = new StateOnlyComparator();
397 
398         @Override
399         public int compare(State o1, State o2) {
400             // "priority only streams" (which have not been activated) are higher priority than streams used for data.
401             boolean o1Actived = o1.wasStreamReservedOrActivated();
402             if (o1Actived != o2.wasStreamReservedOrActivated()) {
403                 return o1Actived ? -1 : 1;
404             }
405             // Numerically greater depth is higher priority.
406             int x = o2.dependencyTreeDepth - o1.dependencyTreeDepth;
407 
408             // I also considered tracking the number of streams which are "activated" (eligible transfer data) at each
409             // subtree. This would require a traversal from each node to the root on dependency tree structural changes,
410             // and then it would require a re-prioritization at each of these nodes (instead of just the nodes where the
411             // direct parent changed). The costs of this are judged to be relatively high compared to the nominal
412             // benefit it provides to the heuristic. Instead folks should just increase maxStateOnlySize.
413 
414             // Last resort is to give larger stream ids more priority.
415             return x != 0 ? x : o1.streamId - o2.streamId;
416         }
417     }
418 
419     private static final class StatePseudoTimeComparator implements Comparator<State>, Serializable {
420         private static final long serialVersionUID = -1437548640227161828L;
421 
422         static final StatePseudoTimeComparator INSTANCE = new StatePseudoTimeComparator();
423 
424         @Override
425         public int compare(State o1, State o2) {
426             return MathUtil.compare(o1.pseudoTimeToWrite, o2.pseudoTimeToWrite);
427         }
428     }
429 
430     /**
431      * The remote flow control state for a single stream.
432      */
433     private final class State implements PriorityQueueNode {
434         private static final byte STATE_IS_ACTIVE = 0x1;
435         private static final byte STATE_IS_DISTRIBUTING = 0x2;
436         private static final byte STATE_STREAM_ACTIVATED = 0x4;
437 
438         /**
439          * Maybe {@code null} if the stream if the stream is not active.
440          */
441         Http2Stream stream;
442         State parent;
443         IntObjectMap<State> children = IntCollections.emptyMap();
444         private final PriorityQueue<State> pseudoTimeQueue;
445         final int streamId;
446         int streamableBytes;
447         int dependencyTreeDepth;
448         /**
449          * Count of nodes rooted at this sub tree with {@link #isActive()} equal to {@code true}.
450          */
451         int activeCountForTree;
452         private int pseudoTimeQueueIndex = INDEX_NOT_IN_QUEUE;
453         private int stateOnlyQueueIndex = INDEX_NOT_IN_QUEUE;
454         /**
455          * An estimate of when this node should be given the opportunity to write data.
456          */
457         long pseudoTimeToWrite;
458         /**
459          * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
460          */
461         long pseudoTime;
462         long totalQueuedWeights;
463         private byte flags;
464         short weight = DEFAULT_PRIORITY_WEIGHT;
465 
466         State(int streamId) {
467             this(streamId, null, 0);
468         }
469 
470         State(Http2Stream stream) {
471             this(stream, 0);
472         }
473 
474         State(Http2Stream stream, int initialSize) {
475             this(stream.id(), stream, initialSize);
476         }
477 
478         State(int streamId, Http2Stream stream, int initialSize) {
479             this.stream = stream;
480             this.streamId = streamId;
481             pseudoTimeQueue = new DefaultPriorityQueue<State>(StatePseudoTimeComparator.INSTANCE, initialSize);
482         }
483 
484         boolean isDescendantOf(State state) {
485             State next = parent;
486             while (next != null) {
487                 if (next == state) {
488                     return true;
489                 }
490                 next = next.parent;
491             }
492             return false;
493         }
494 
495         void takeChild(State child, boolean exclusive, List<ParentChangedEvent> events) {
496             takeChild(null, child, exclusive, events);
497         }
498 
499         /**
500          * Adds a child to this priority. If exclusive is set, any children of this node are moved to being dependent on
501          * the child.
502          */
503         void takeChild(Iterator<IntObjectMap.PrimitiveEntry<State>> childItr, State child, boolean exclusive,
504                        List<ParentChangedEvent> events) {
505             State oldParent = child.parent;
506 
507             if (oldParent != this) {
508                 events.add(new ParentChangedEvent(child, oldParent));
509                 child.setParent(this);
510                 // If the childItr is not null we are iterating over the oldParent.children collection and should
511                 // use the iterator to remove from the collection to avoid concurrent modification. Otherwise it is
512                 // assumed we are not iterating over this collection and it is safe to call remove directly.
513                 if (childItr != null) {
514                     childItr.remove();
515                 } else if (oldParent != null) {
516                     oldParent.children.remove(child.streamId);
517                 }
518 
519                 // Lazily initialize the children to save object allocations.
520                 initChildrenIfEmpty();
521 
522                 final State oldChild = children.put(child.streamId, child);
523                 assert oldChild == null : "A stream with the same stream ID was already in the child map.";
524             }
525 
526             if (exclusive && !children.isEmpty()) {
527                 // If it was requested that this child be the exclusive dependency of this node,
528                 // move any previous children to the child node, becoming grand children of this node.
529                 Iterator<IntObjectMap.PrimitiveEntry<State>> itr = removeAllChildrenExcept(child).entries().iterator();
530                 while (itr.hasNext()) {
531                     child.takeChild(itr, itr.next().value(), false, events);
532                 }
533             }
534         }
535 
536         /**
537          * Removes the child priority and moves any of its dependencies to being direct dependencies on this node.
538          */
539         void removeChild(State child) {
540             if (children.remove(child.streamId) != null) {
541                 List<ParentChangedEvent> events = new ArrayList<ParentChangedEvent>(1 + child.children.size());
542                 events.add(new ParentChangedEvent(child, child.parent));
543                 child.setParent(null);
544 
545                 if (!child.children.isEmpty()) {
546                     // Move up any grand children to be directly dependent on this node.
547                     Iterator<IntObjectMap.PrimitiveEntry<State>> itr = child.children.entries().iterator();
548                     long totalWeight = child.getTotalWeight();
549                     do {
550                         // Redistribute the weight of child to its dependency proportionally.
551                         State dependency = itr.next().value();
552                         dependency.weight = (short) max(1, dependency.weight * child.weight / totalWeight);
553                         takeChild(itr, dependency, false, events);
554                     } while (itr.hasNext());
555                 }
556 
557                 notifyParentChanged(events);
558             }
559         }
560 
561         private long getTotalWeight() {
562             long totalWeight = 0L;
563             for (State state : children.values()) {
564                 totalWeight += state.weight;
565             }
566             return totalWeight;
567         }
568 
569         /**
570          * Remove all children with the exception of {@code streamToRetain}.
571          * This method is intended to be used to support an exclusive priority dependency operation.
572          * @return The map of children prior to this operation, excluding {@code streamToRetain} if present.
573          */
574         private IntObjectMap<State> removeAllChildrenExcept(State stateToRetain) {
575             stateToRetain = children.remove(stateToRetain.streamId);
576             IntObjectMap<State> prevChildren = children;
577             // This map should be re-initialized in anticipation for the 1 exclusive child which will be added.
578             // It will either be added directly in this method, or after this method is called...but it will be added.
579             initChildren();
580             if (stateToRetain != null) {
581                 children.put(stateToRetain.streamId, stateToRetain);
582             }
583             return prevChildren;
584         }
585 
586         private void setParent(State newParent) {
587             // if activeCountForTree == 0 then it will not be in its parent's pseudoTimeQueue.
588             if (activeCountForTree != 0 && parent != null) {
589                 parent.removePseudoTimeQueue(this);
590                 parent.activeCountChangeForTree(-activeCountForTree);
591             }
592             parent = newParent;
593             // Use MAX_VALUE if no parent because lower depth is considered higher priority by StateOnlyComparator.
594             dependencyTreeDepth = newParent == null ? MAX_VALUE : newParent.dependencyTreeDepth + 1;
595         }
596 
597         private void initChildrenIfEmpty() {
598             if (children == IntCollections.<State>emptyMap()) {
599                 initChildren();
600             }
601         }
602 
603         private void initChildren() {
604             children = new IntObjectHashMap<State>(INITIAL_CHILDREN_MAP_SIZE);
605         }
606 
607         void write(int numBytes, Writer writer) throws Http2Exception {
608             assert stream != null;
609             try {
610                 writer.write(stream, numBytes);
611             } catch (Throwable t) {
612                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
613             }
614         }
615 
616         void activeCountChangeForTree(int increment) {
617             assert activeCountForTree + increment >= 0;
618             activeCountForTree += increment;
619             if (parent != null) {
620                 assert activeCountForTree != increment ||
621                        pseudoTimeQueueIndex == INDEX_NOT_IN_QUEUE ||
622                        parent.pseudoTimeQueue.containsTyped(this) :
623                      "State[" + streamId + "].activeCountForTree changed from 0 to " + increment + " is in a " +
624                      "pseudoTimeQueue, but not in parent[ " + parent.streamId + "]'s pseudoTimeQueue";
625                 if (activeCountForTree == 0) {
626                     parent.removePseudoTimeQueue(this);
627                 } else if (activeCountForTree == increment && !isDistributing()) {
628                     // If frame count was 0 but is now not, and this node is not already in a pseudoTimeQueue (assumed
629                     // to be pState's pseudoTimeQueue) then enqueue it. If this State object is being processed the
630                     // pseudoTime for this node should not be adjusted, and the node will be added back to the
631                     // pseudoTimeQueue/tree structure after it is done being processed. This may happen if the
632                     // activeCountForTree == 0 (a node which can't stream anything and is blocked) is at/near root of
633                     // the tree, and is popped off the pseudoTimeQueue during processing, and then put back on the
634                     // pseudoTimeQueue because a child changes position in the priority tree (or is closed because it is
635                     // not blocked and finished writing all data).
636                     parent.offerAndInitializePseudoTime(this);
637                 }
638                 parent.activeCountChangeForTree(increment);
639             }
640         }
641 
642         void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
643             if (isActive() != isActive) {
644                 if (isActive) {
645                     activeCountChangeForTree(1);
646                     setActive();
647                 } else {
648                     activeCountChangeForTree(-1);
649                     unsetActive();
650                 }
651             }
652 
653             streamableBytes = newStreamableBytes;
654         }
655 
656         /**
657          * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
658          */
659         void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
660             assert streamId != CONNECTION_STREAM_ID && nsent >= 0;
661             // If the current pseudoTimeToSend is greater than parentState.pseudoTime then we previously over accounted
662             // and should use parentState.pseudoTime.
663             pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) + nsent * totalQueuedWeights / weight;
664         }
665 
666         /**
667          * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
668          * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
669          * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offerPseudoTimeQueue(State)}.
670          */
671         void offerAndInitializePseudoTime(State state) {
672             state.pseudoTimeToWrite = pseudoTime;
673             offerPseudoTimeQueue(state);
674         }
675 
676         void offerPseudoTimeQueue(State state) {
677             pseudoTimeQueue.offer(state);
678             totalQueuedWeights += state.weight;
679         }
680 
681         /**
682          * Must only be called if the pseudoTimeQueue is non-empty!
683          */
684         State pollPseudoTimeQueue() {
685             State state = pseudoTimeQueue.poll();
686             // This method is only ever called if the pseudoTimeQueue is non-empty.
687             totalQueuedWeights -= state.weight;
688             return state;
689         }
690 
691         void removePseudoTimeQueue(State state) {
692             if (pseudoTimeQueue.removeTyped(state)) {
693                 totalQueuedWeights -= state.weight;
694             }
695         }
696 
697         State peekPseudoTimeQueue() {
698             return pseudoTimeQueue.peek();
699         }
700 
701         void close() {
702             updateStreamableBytes(0, false);
703             stream = null;
704         }
705 
706         boolean wasStreamReservedOrActivated() {
707             return (flags & STATE_STREAM_ACTIVATED) != 0;
708         }
709 
710         void setStreamReservedOrActivated() {
711             flags |= STATE_STREAM_ACTIVATED;
712         }
713 
714         boolean isActive() {
715             return (flags & STATE_IS_ACTIVE) != 0;
716         }
717 
718         private void setActive() {
719             flags |= STATE_IS_ACTIVE;
720         }
721 
722         private void unsetActive() {
723             flags &= ~STATE_IS_ACTIVE;
724         }
725 
726         boolean isDistributing() {
727             return (flags & STATE_IS_DISTRIBUTING) != 0;
728         }
729 
730         void setDistributing() {
731             flags |= STATE_IS_DISTRIBUTING;
732         }
733 
734         void unsetDistributing() {
735             flags &= ~STATE_IS_DISTRIBUTING;
736         }
737 
738         @Override
739         public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
740             return queue == stateOnlyRemovalQueue ? stateOnlyQueueIndex : pseudoTimeQueueIndex;
741         }
742 
743         @Override
744         public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
745             if (queue == stateOnlyRemovalQueue) {
746                 stateOnlyQueueIndex = i;
747             } else {
748                 pseudoTimeQueueIndex = i;
749             }
750         }
751 
752         @Override
753         public String toString() {
754             // Use activeCountForTree as a rough estimate for how many nodes are in this subtree.
755             StringBuilder sb = new StringBuilder(256 * (activeCountForTree > 0 ? activeCountForTree : 1));
756             toString(sb);
757             return sb.toString();
758         }
759 
760         private void toString(StringBuilder sb) {
761             sb.append("{streamId ").append(streamId)
762                     .append(" streamableBytes ").append(streamableBytes)
763                     .append(" activeCountForTree ").append(activeCountForTree)
764                     .append(" pseudoTimeQueueIndex ").append(pseudoTimeQueueIndex)
765                     .append(" pseudoTimeToWrite ").append(pseudoTimeToWrite)
766                     .append(" pseudoTime ").append(pseudoTime)
767                     .append(" flags ").append(flags)
768                     .append(" pseudoTimeQueue.size() ").append(pseudoTimeQueue.size())
769                     .append(" stateOnlyQueueIndex ").append(stateOnlyQueueIndex)
770                     .append(" parent.streamId ").append(parent == null ? -1 : parent.streamId).append("} [");
771 
772             if (!pseudoTimeQueue.isEmpty()) {
773                 for (State s : pseudoTimeQueue) {
774                     s.toString(sb);
775                     sb.append(", ");
776                 }
777                 // Remove the last ", "
778                 sb.setLength(sb.length() - 2);
779             }
780             sb.append(']');
781         }
782     }
783 
784     /**
785      * Allows a correlation to be made between a stream and its old parent before a parent change occurs.
786      */
787     private static final class ParentChangedEvent {
788         final State state;
789         final State oldParent;
790 
791         /**
792          * Create a new instance.
793          * @param state The state who has had a parent change.
794          * @param oldParent The previous parent.
795          */
796         ParentChangedEvent(State state, State oldParent) {
797             this.state = state;
798             this.oldParent = oldParent;
799         }
800     }
801 }