View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.http2;
17  
18  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
19  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
20  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
21  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
22  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
23  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
24  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
25  import static io.netty.handler.codec.http2.Http2Exception.streamError;
26  import static io.netty.util.internal.ObjectUtil.checkNotNull;
27  import static java.lang.Math.max;
28  import static java.lang.Math.min;
29  import io.netty.buffer.ByteBuf;
30  import io.netty.channel.ChannelHandlerContext;
31  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
32  import io.netty.handler.codec.http2.Http2Exception.StreamException;
33  
34  /**
35   * Basic implementation of {@link Http2LocalFlowController}.
36   */
37  public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
38      private static final int DEFAULT_COMPOSITE_EXCEPTION_SIZE = 4;
39      /**
40       * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
41       * is sent to expand the window.
42       */
43      public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
44  
45      private final Http2Connection connection;
46      private final Http2FrameWriter frameWriter;
47      private volatile float windowUpdateRatio;
48      private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE;
49  
50      public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
51          this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO);
52      }
53  
54      public DefaultHttp2LocalFlowController(Http2Connection connection,
55              Http2FrameWriter frameWriter, float windowUpdateRatio) {
56          this.connection = checkNotNull(connection, "connection");
57          this.frameWriter = checkNotNull(frameWriter, "frameWriter");
58          windowUpdateRatio(windowUpdateRatio);
59  
60          // Add a flow state for the connection.
61          final Http2Stream connectionStream = connection.connectionStream();
62          connectionStream.setProperty(FlowState.class, new FlowState(connectionStream, initialWindowSize));
63  
64          // Register for notification of new streams.
65          connection.addListener(new Http2ConnectionAdapter() {
66              @Override
67              public void streamAdded(Http2Stream stream) {
68                  stream.setProperty(FlowState.class, new FlowState(stream, 0));
69              }
70  
71              @Override
72              public void streamActive(Http2Stream stream) {
73                  // Need to be sure the stream's initial window is adjusted for SETTINGS
74                  // frames which may have been exchanged while it was in IDLE
75                  state(stream).window(initialWindowSize);
76              }
77          });
78      }
79  
80      @Override
81      public void initialWindowSize(int newWindowSize) throws Http2Exception {
82          int delta = newWindowSize - initialWindowSize;
83          initialWindowSize = newWindowSize;
84  
85          CompositeStreamException compositeException = null;
86          for (Http2Stream stream : connection.activeStreams()) {
87              try {
88                  // Increment flow control window first so state will be consistent if overflow is detected
89                  FlowState state = state(stream);
90                  state.incrementFlowControlWindows(delta);
91                  state.incrementInitialStreamWindow(delta);
92              } catch (StreamException e) {
93                  if (compositeException == null) {
94                      compositeException = new CompositeStreamException(e.error(), DEFAULT_COMPOSITE_EXCEPTION_SIZE);
95                  }
96                  compositeException.add(e);
97              }
98          }
99          if (compositeException != null) {
100             throw compositeException;
101         }
102     }
103 
104     @Override
105     public int initialWindowSize() {
106         return initialWindowSize;
107     }
108 
109     @Override
110     public int windowSize(Http2Stream stream) {
111         return state(stream).window();
112     }
113 
114     @Override
115     public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, int delta) throws Http2Exception {
116         checkNotNull(ctx, "ctx");
117         FlowState state = state(stream);
118         // Just add the delta to the stream-specific initial window size so that the next time the window
119         // expands it will grow to the new initial size.
120         state.incrementInitialStreamWindow(delta);
121         state.writeWindowUpdateIfNeeded(ctx);
122     }
123 
124     @Override
125     public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
126             throws Http2Exception {
127         state(stream).consumeBytes(ctx, numBytes);
128     }
129 
130     @Override
131     public int unconsumedBytes(Http2Stream stream) {
132         return state(stream).unconsumedBytes();
133     }
134 
135     private static void checkValidRatio(float ratio) {
136         if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
137             throw new IllegalArgumentException("Invalid ratio: " + ratio);
138         }
139     }
140 
141     /**
142      * The window update ratio is used to determine when a window update must be sent. If the ratio
143      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
144      * be sent. This is the global window update ratio that will be used for new streams.
145      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams.
146      * @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
147      */
148     public void windowUpdateRatio(float ratio) {
149         checkValidRatio(ratio);
150         windowUpdateRatio = ratio;
151     }
152 
153     /**
154      * The window update ratio is used to determine when a window update must be sent. If the ratio
155      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
156      * be sent. This is the global window update ratio that will be used for new streams.
157      */
158     public float windowUpdateRatio() {
159         return windowUpdateRatio;
160     }
161 
162     /**
163      * The window update ratio is used to determine when a window update must be sent. If the ratio
164      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
165      * be sent. This window update ratio will only be applied to {@code streamId}.
166      * <p>
167      * Note it is the responsibly of the caller to ensure that the the
168      * initial {@code SETTINGS} frame is sent before this is called. It would
169      * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
170      * was generated by this method before the initial {@code SETTINGS} frame is sent.
171      * @param ctx the context to use if a {@code WINDOW_UPDATE} is determined necessary.
172      * @param stream the stream for which {@code ratio} applies to.
173      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
174      * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
175      */
176     public void windowUpdateRatio(ChannelHandlerContext ctx, Http2Stream stream, float ratio) throws Http2Exception {
177         checkValidRatio(ratio);
178         FlowState state = state(stream);
179         state.windowUpdateRatio(ratio);
180         state.writeWindowUpdateIfNeeded(ctx);
181     }
182 
183     /**
184      * The window update ratio is used to determine when a window update must be sent. If the ratio
185      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
186      * be sent. This window update ratio will only be applied to {@code streamId}.
187      * @throws Http2Exception If no stream corresponding to {@code stream} could be found.
188      */
189     public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
190         return state(stream).windowUpdateRatio();
191     }
192 
193     @Override
194     public void receiveFlowControlledFrame(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf data,
195             int padding, boolean endOfStream) throws Http2Exception {
196         int dataLength = data.readableBytes() + padding;
197 
198         // Apply the connection-level flow control
199         connectionState().receiveFlowControlledFrame(dataLength);
200 
201         // Apply the stream-level flow control
202         FlowState state = state(stream);
203         state.endOfStream(endOfStream);
204         state.receiveFlowControlledFrame(dataLength);
205     }
206 
207     private FlowState connectionState() {
208         return state(connection.connectionStream());
209     }
210 
211     private FlowState state(Http2Stream stream) {
212         checkNotNull(stream, "stream");
213         return stream.getProperty(FlowState.class);
214     }
215 
216     /**
217      * Flow control window state for an individual stream.
218      */
219     private final class FlowState {
220         private final Http2Stream stream;
221 
222         /**
223          * The actual flow control window that is decremented as soon as {@code DATA} arrives.
224          */
225         private int window;
226 
227         /**
228          * A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE}
229          * frames. Decrementing this window for received {@code DATA} frames is delayed until the
230          * application has indicated that the data has been fully processed. This prevents sending
231          * a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold.
232          */
233         private int processedWindow;
234 
235         /**
236          * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
237          * Each stream has their own initial window size.
238          */
239         private volatile int initialStreamWindowSize;
240 
241         /**
242          * This is used to determine when {@link #processedWindow} is sufficiently far away from
243          * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
244          * Each stream has their own window update ratio.
245          */
246         private volatile float streamWindowUpdateRatio;
247 
248         private int lowerBound;
249         private boolean endOfStream;
250 
251         FlowState(Http2Stream stream, int initialWindowSize) {
252             this.stream = stream;
253             window(initialWindowSize);
254             streamWindowUpdateRatio = windowUpdateRatio;
255         }
256 
257         int window() {
258             return window;
259         }
260 
261         void window(int initialWindowSize) {
262             window = processedWindow = initialStreamWindowSize = initialWindowSize;
263         }
264 
265         void endOfStream(boolean endOfStream) {
266             this.endOfStream = endOfStream;
267         }
268 
269         float windowUpdateRatio() {
270             return streamWindowUpdateRatio;
271         }
272 
273         void windowUpdateRatio(float ratio) {
274             streamWindowUpdateRatio = ratio;
275         }
276 
277         /**
278          * Increment the initial window size for this stream.
279          * @param delta The amount to increase the initial window size by.
280          */
281         void incrementInitialStreamWindow(int delta) {
282             // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range.
283             int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
284                     max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
285             delta = newValue - initialStreamWindowSize;
286 
287             initialStreamWindowSize += delta;
288         }
289 
290         /**
291          * Increment the windows which are used to determine many bytes have been processed.
292          * @param delta The amount to increment the window by.
293          * @throws Http2Exception if integer overflow occurs on the window.
294          */
295         void incrementFlowControlWindows(int delta) throws Http2Exception {
296             if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
297                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
298                         "Flow control window overflowed for stream: %d", stream.id());
299             }
300 
301             window += delta;
302             processedWindow += delta;
303             lowerBound = delta < 0 ? delta : 0;
304         }
305 
306         /**
307          * A flow control event has occurred and we should decrement the amount of available bytes for this stream.
308          * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control.
309          * @throws Http2Exception If too much data is used relative to how much is available.
310          */
311         void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
312             assert dataLength >= 0;
313 
314             // Apply the delta. Even if we throw an exception we want to have taken this delta into account.
315             window -= dataLength;
316 
317             // Window size can become negative if we sent a SETTINGS frame that reduces the
318             // size of the transfer window after the peer has written data frames.
319             // The value is bounded by the length that SETTINGS frame decrease the window.
320             // This difference is stored for the connection when writing the SETTINGS frame
321             // and is cleared once we send a WINDOW_UPDATE frame.
322             if (window < lowerBound) {
323                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
324                         "Flow control window exceeded for stream: %d", stream.id());
325             }
326         }
327 
328         /**
329          * Returns the processed bytes for this stream.
330          */
331         void returnProcessedBytes(int delta) throws Http2Exception {
332             if (processedWindow - delta < window) {
333                 throw streamError(stream.id(), INTERNAL_ERROR,
334                         "Attempting to return too many bytes for stream %d", stream.id());
335             }
336             processedWindow -= delta;
337         }
338 
339         void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
340             if (stream.id() == CONNECTION_STREAM_ID) {
341                 throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
342             }
343             if (numBytes <= 0) {
344                 throw new IllegalArgumentException("numBytes must be positive");
345             }
346 
347             // Return bytes to the connection window
348             FlowState connectionState = connectionState();
349             connectionState.returnProcessedBytes(numBytes);
350             connectionState.writeWindowUpdateIfNeeded(ctx);
351 
352             // Return the bytes processed and update the window.
353             returnProcessedBytes(numBytes);
354             writeWindowUpdateIfNeeded(ctx);
355         }
356 
357         int unconsumedBytes() {
358             return processedWindow - window;
359         }
360 
361         /**
362          * Updates the flow control window for this stream if it is appropriate.
363          */
364         void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
365             if (endOfStream || initialStreamWindowSize <= 0) {
366                 return;
367             }
368 
369             int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
370             if (processedWindow <= threshold) {
371                 writeWindowUpdate(ctx);
372             }
373         }
374 
375         /**
376          * Called to perform a window update for this stream (or connection). Updates the window size back
377          * to the size of the initial window and sends a window update frame to the remote endpoint.
378          */
379         void writeWindowUpdate(ChannelHandlerContext ctx) throws Http2Exception {
380             // Expand the window for this stream back to the size of the initial window.
381             int deltaWindowSize = initialStreamWindowSize - processedWindow;
382             try {
383                 incrementFlowControlWindows(deltaWindowSize);
384             } catch (Throwable t) {
385                 throw connectionError(INTERNAL_ERROR, t,
386                         "Attempting to return too many bytes for stream %d", stream.id());
387             }
388 
389             // Send a window update for the stream/connection.
390             frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise());
391             ctx.flush();
392         }
393     }
394 }