View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
18  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
19  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
20  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
21  import static io.netty.handler.codec.http2.Http2Exception.streamError;
22  import static io.netty.util.internal.ObjectUtil.checkNotNull;
23  import static java.lang.Math.max;
24  import static java.lang.Math.min;
25  import io.netty.channel.ChannelHandlerContext;
26  
27  import java.util.ArrayDeque;
28  import java.util.Arrays;
29  import java.util.Comparator;
30  import java.util.Queue;
31  
32  /**
33   * Basic implementation of {@link Http2RemoteFlowController}.
34   */
35  public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
36  
37      /**
38       * A {@link Comparator} that sorts streams in ascending order the amount of streamable data.
39       */
40      private static final Comparator<Http2Stream> WEIGHT_ORDER = new Comparator<Http2Stream>() {
41          @Override
42          public int compare(Http2Stream o1, Http2Stream o2) {
43              return o2.weight() - o1.weight();
44          }
45      };
46  
47      private final Http2Connection connection;
48      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
49      private ChannelHandlerContext ctx;
50      private boolean needFlush;
51  
52      public DefaultHttp2RemoteFlowController(Http2Connection connection) {
53          this.connection = checkNotNull(connection, "connection");
54  
55          // Add a flow state for the connection.
56          connection.connectionStream().setProperty(FlowState.class,
57                  new FlowState(connection.connectionStream(), initialWindowSize));
58  
59          // Register for notification of new streams.
60          connection.addListener(new Http2ConnectionAdapter() {
61              @Override
62              public void streamAdded(Http2Stream stream) {
63                  // Just add a new flow state to the stream.
64                  stream.setProperty(FlowState.class, new FlowState(stream, 0));
65              }
66  
67              @Override
68              public void streamActive(Http2Stream stream) {
69                  // Need to be sure the stream's initial window is adjusted for SETTINGS
70                  // frames which may have been exchanged while it was in IDLE
71                  state(stream).window(initialWindowSize);
72              }
73  
74              @Override
75              public void streamInactive(Http2Stream stream) {
76                  // Any pending frames can never be written, clear and
77                  // write errors for any pending frames.
78                  state(stream).clear();
79              }
80  
81              @Override
82              public void priorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
83                  Http2Stream parent = stream.parent();
84                  if (parent != null) {
85                      int delta = state(stream).streamableBytesForTree();
86                      if (delta != 0) {
87                          state(parent).incrementStreamableBytesForTree(delta);
88                      }
89                  }
90              }
91  
92              @Override
93              public void priorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
94                  Http2Stream parent = stream.parent();
95                  if (parent != null) {
96                      int delta = -state(stream).streamableBytesForTree();
97                      if (delta != 0) {
98                          state(parent).incrementStreamableBytesForTree(delta);
99                      }
100                 }
101             }
102         });
103     }
104 
105     @Override
106     public void initialWindowSize(int newWindowSize) throws Http2Exception {
107         if (newWindowSize < 0) {
108             throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
109         }
110 
111         int delta = newWindowSize - initialWindowSize;
112         initialWindowSize = newWindowSize;
113         for (Http2Stream stream : connection.activeStreams()) {
114             // Verify that the maximum value is not exceeded by this change.
115             state(stream).incrementStreamWindow(delta);
116         }
117 
118         if (delta > 0) {
119             // The window size increased, send any pending frames for all streams.
120             writePendingBytes();
121         }
122     }
123 
124     @Override
125     public int initialWindowSize() {
126         return initialWindowSize;
127     }
128 
129     @Override
130     public int windowSize(Http2Stream stream) {
131         return state(stream).window();
132     }
133 
134     @Override
135     public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
136         if (stream.id() == CONNECTION_STREAM_ID) {
137             // Update the connection window and write any pending frames for all streams.
138             connectionState().incrementStreamWindow(delta);
139             writePendingBytes();
140         } else {
141             // Update the stream window and write any pending frames for the stream.
142             FlowState state = state(stream);
143             state.incrementStreamWindow(delta);
144             state.writeBytes(state.writableWindow());
145             flush();
146         }
147     }
148 
149     @Override
150     public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream,
151                                    FlowControlled payload) {
152         checkNotNull(ctx, "ctx");
153         checkNotNull(payload, "payload");
154         if (this.ctx != null && this.ctx != ctx) {
155             throw new IllegalArgumentException("Writing data from multiple ChannelHandlerContexts is not supported");
156         }
157         // Save the context. We'll use this later when we write pending bytes.
158         this.ctx = ctx;
159         try {
160             FlowState state = state(stream);
161             state.newFrame(payload);
162             state.writeBytes(state.writableWindow());
163             flush();
164         } catch (Throwable e) {
165             payload.error(e);
166         }
167     }
168 
169     /**
170      * For testing purposes only. Exposes the number of streamable bytes for the tree rooted at
171      * the given stream.
172      */
173     int streamableBytesForTree(Http2Stream stream) {
174         return state(stream).streamableBytesForTree();
175     }
176 
177     private static FlowState state(Http2Stream stream) {
178         checkNotNull(stream, "stream");
179         return stream.getProperty(FlowState.class);
180     }
181 
182     private FlowState connectionState() {
183         return state(connection.connectionStream());
184     }
185 
186     /**
187      * Returns the flow control window for the entire connection.
188      */
189     private int connectionWindow() {
190         return connectionState().window();
191     }
192 
193     /**
194      * Flushes the {@link ChannelHandlerContext} if we've received any data frames.
195      */
196     private void flush() {
197         if (needFlush) {
198             ctx.flush();
199             needFlush = false;
200         }
201     }
202 
203     /**
204      * Writes as many pending bytes as possible, according to stream priority.
205      */
206     private void writePendingBytes() {
207         Http2Stream connectionStream = connection.connectionStream();
208         int connectionWindow = state(connectionStream).window();
209 
210         if (connectionWindow > 0) {
211             writeChildren(connectionStream, connectionWindow);
212             for (Http2Stream stream : connection.activeStreams()) {
213                 writeChildNode(state(stream));
214             }
215             flush();
216         }
217     }
218 
219     /**
220      * Write the children of {@code parent} in the priority tree. This will allocate bytes by stream weight.
221      * @param parent The parent of the nodes which will be written.
222      * @param connectionWindow The connection window this is available for use at this point in the tree.
223      * @return An object summarizing the write and allocation results.
224      */
225     private int writeChildren(Http2Stream parent, int connectionWindow) {
226         FlowState state = state(parent);
227         if (state.streamableBytesForTree() <= 0) {
228             return 0;
229         }
230         int bytesAllocated = 0;
231 
232         // If the number of streamable bytes for this tree will fit in the connection window
233         // then there is no need to prioritize the bytes...everyone sends what they have
234         if (state.streamableBytesForTree() <= connectionWindow) {
235             for (Http2Stream child : parent.children()) {
236                 state = state(child);
237                 int bytesForChild = state.streamableBytes();
238 
239                 if (bytesForChild > 0 || state.hasFrame()) {
240                     state.allocate(bytesForChild);
241                     writeChildNode(state);
242                     bytesAllocated += bytesForChild;
243                     connectionWindow -= bytesForChild;
244                 }
245                 int childBytesAllocated = writeChildren(child, connectionWindow);
246                 bytesAllocated += childBytesAllocated;
247                 connectionWindow -= childBytesAllocated;
248             }
249             return bytesAllocated;
250         }
251 
252         // This is the priority algorithm which will divide the available bytes based
253         // upon stream weight relative to its peers
254         Http2Stream[] children = parent.children().toArray(new Http2Stream[parent.numChildren()]);
255         Arrays.sort(children, WEIGHT_ORDER);
256         int totalWeight = parent.totalChildWeights();
257         for (int tail = children.length; tail > 0;) {
258             int head = 0;
259             int nextTail = 0;
260             int nextTotalWeight = 0;
261             int nextConnectionWindow = connectionWindow;
262             for (; head < tail && nextConnectionWindow > 0; ++head) {
263                 Http2Stream child = children[head];
264                 state = state(child);
265                 int weight = child.weight();
266                 double weightRatio = weight / (double) totalWeight;
267 
268                 int bytesForTree = Math.min(nextConnectionWindow, (int) Math.ceil(connectionWindow * weightRatio));
269                 int bytesForChild = Math.min(state.streamableBytes(), bytesForTree);
270 
271                 if (bytesForChild > 0 || state.hasFrame()) {
272                     state.allocate(bytesForChild);
273                     bytesAllocated += bytesForChild;
274                     nextConnectionWindow -= bytesForChild;
275                     bytesForTree -= bytesForChild;
276                     // If this subtree still wants to send then re-insert into children list and re-consider for next
277                     // iteration. This is needed because we don't yet know if all the peers will be able to use
278                     // all of their "fair share" of the connection window, and if they don't use it then we should
279                     // divide their unused shared up for the peers who still want to send.
280                     if (state.streamableBytesForTree() - bytesForChild > 0) {
281                         children[nextTail++] = child;
282                         nextTotalWeight += weight;
283                     }
284                     if (state.streamableBytes() - bytesForChild == 0) {
285                         writeChildNode(state);
286                     }
287                 }
288 
289                 if (bytesForTree > 0) {
290                     int childBytesAllocated = writeChildren(child, bytesForTree);
291                     bytesAllocated += childBytesAllocated;
292                     nextConnectionWindow -= childBytesAllocated;
293                 }
294             }
295             connectionWindow = nextConnectionWindow;
296             totalWeight = nextTotalWeight;
297             tail = nextTail;
298         }
299 
300         return bytesAllocated;
301     }
302 
303     /**
304      * Write bytes allocated to {@code state}
305      */
306     private static void writeChildNode(FlowState state) {
307         state.writeBytes(state.allocated());
308         state.resetAllocated();
309     }
310 
311     /**
312      * The outbound flow control state for a single stream.
313      */
314     final class FlowState {
315         private final Queue<Frame> pendingWriteQueue;
316         private final Http2Stream stream;
317         private int window;
318         private int pendingBytes;
319         private int streamableBytesForTree;
320         private int allocated;
321 
322         FlowState(Http2Stream stream, int initialWindowSize) {
323             this.stream = stream;
324             window(initialWindowSize);
325             pendingWriteQueue = new ArrayDeque<Frame>(2);
326         }
327 
328         int window() {
329             return window;
330         }
331 
332         void window(int initialWindowSize) {
333             window = initialWindowSize;
334         }
335 
336         /**
337          * Increment the number of bytes allocated to this stream by the priority algorithm
338          */
339         void allocate(int bytes) {
340             allocated += bytes;
341         }
342 
343         /**
344          * Gets the number of bytes that have been allocated to this stream by the priority algorithm.
345          */
346         int allocated() {
347             return allocated;
348         }
349 
350         /**
351          * Reset the number of bytes that have been allocated to this stream by the priority algorithm.
352          */
353         void resetAllocated() {
354             allocated = 0;
355         }
356 
357         /**
358          * Increments the flow control window for this stream by the given delta and returns the new value.
359          */
360         int incrementStreamWindow(int delta) throws Http2Exception {
361             if (delta > 0 && Integer.MAX_VALUE - delta < window) {
362                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
363                         "Window size overflow for stream: %d", stream.id());
364             }
365             int previouslyStreamable = streamableBytes();
366             window += delta;
367 
368             // Update this branch of the priority tree if the streamable bytes have changed for this node.
369             int streamableDelta = streamableBytes() - previouslyStreamable;
370             if (streamableDelta != 0) {
371                 incrementStreamableBytesForTree(streamableDelta);
372             }
373             return window;
374         }
375 
376         /**
377          * Returns the maximum writable window (minimum of the stream and connection windows).
378          */
379         int writableWindow() {
380             return min(window, connectionWindow());
381         }
382 
383         /**
384          * Returns the number of pending bytes for this node that will fit within the
385          * {@link #window}. This is used for the priority algorithm to determine the aggregate
386          * number of bytes that can be written at each node. Each node only takes into account its
387          * stream window so that when a change occurs to the connection window, these values need
388          * not change (i.e. no tree traversal is required).
389          */
390         int streamableBytes() {
391             return max(0, min(pendingBytes, window));
392         }
393 
394         int streamableBytesForTree() {
395             return streamableBytesForTree;
396         }
397 
398         /**
399          * Creates a new payload with the given values and immediately enqueues it.
400          */
401         Frame newFrame(FlowControlled payload) {
402             // Store this as the future for the most recent write attempt.
403             Frame frame = new Frame(payload);
404             pendingWriteQueue.offer(frame);
405             return frame;
406         }
407 
408         /**
409          * Indicates whether or not there are frames in the pending queue.
410          */
411         boolean hasFrame() {
412             return !pendingWriteQueue.isEmpty();
413         }
414 
415         /**
416          * Returns the the head of the pending queue, or {@code null} if empty.
417          */
418         Frame peek() {
419             return pendingWriteQueue.peek();
420         }
421 
422         /**
423          * Clears the pending queue and writes errors for each remaining frame.
424          */
425         void clear() {
426             for (;;) {
427                 Frame frame = pendingWriteQueue.poll();
428                 if (frame == null) {
429                     break;
430                 }
431                 frame.writeError(streamError(stream.id(), INTERNAL_ERROR,
432                         "Stream closed before write could take place"));
433             }
434         }
435 
436         /**
437          * Writes up to the number of bytes from the pending queue. May write less if limited by the writable window, by
438          * the number of pending writes available, or because a frame does not support splitting on arbitrary
439          * boundaries.
440          */
441         int writeBytes(int bytes) {
442             int bytesAttempted = 0;
443             while (hasFrame()) {
444                 int maxBytes = min(bytes - bytesAttempted, writableWindow());
445                 bytesAttempted += peek().write(maxBytes);
446                 if (bytes - bytesAttempted <= 0) {
447                   break;
448                 }
449             }
450             return bytesAttempted;
451         }
452 
453         /**
454          * Recursively increments the streamable bytes for this branch in the priority tree starting at the current
455          * node.
456          */
457         void incrementStreamableBytesForTree(int numBytes) {
458             streamableBytesForTree += numBytes;
459             if (!stream.isRoot()) {
460                 state(stream.parent()).incrementStreamableBytesForTree(numBytes);
461             }
462         }
463 
464         /**
465          * A wrapper class around the content of a data frame.
466          */
467         private final class Frame {
468             final FlowControlled payload;
469 
470             Frame(FlowControlled payload) {
471                 this.payload = payload;
472                 // Increment the number of pending bytes for this stream.
473                 incrementPendingBytes(payload.size());
474             }
475 
476             /**
477              * Increments the number of pending bytes for this node. If there was any change to the number of bytes that
478              * fit into the stream window, then {@link #incrementStreamableBytesForTree} to recursively update this
479              * branch of the priority tree.
480              */
481             private void incrementPendingBytes(int numBytes) {
482                 int previouslyStreamable = streamableBytes();
483                 pendingBytes += numBytes;
484 
485                 int delta = streamableBytes() - previouslyStreamable;
486                 if (delta != 0) {
487                     incrementStreamableBytesForTree(delta);
488                 }
489             }
490 
491             /**
492              * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
493              * queue, the written bytes are removed from this branch of the priority tree.
494              * <p>
495              * Note: this does not flush the {@link ChannelHandlerContext}.
496              */
497             int write(int allowedBytes) {
498                 int before = payload.size();
499                 needFlush |= payload.write(Math.max(0, allowedBytes));
500                 int writtenBytes = before - payload.size();
501                 try {
502                     connectionState().incrementStreamWindow(-writtenBytes);
503                     incrementStreamWindow(-writtenBytes);
504                 } catch (Http2Exception e) { // Should never get here since we're decrementing.
505                     throw new RuntimeException("Invalid window state when writing frame: " + e.getMessage(), e);
506                 }
507                 decrementPendingBytes(writtenBytes);
508                 if (payload.size() == 0) {
509                     pendingWriteQueue.remove();
510                 }
511                 return writtenBytes;
512             }
513 
514             /**
515              * Discards this frame, writing an error. If this frame is in the pending queue, the unwritten bytes are
516              * removed from this branch of the priority tree.
517              */
518             void writeError(Http2Exception cause) {
519                 decrementPendingBytes(payload.size());
520                 payload.error(cause);
521             }
522 
523             /**
524              * If this frame is in the pending queue, decrements the number of pending bytes for the stream.
525              */
526             void decrementPendingBytes(int bytes) {
527                 incrementPendingBytes(-bytes);
528             }
529         }
530     }
531 }