/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
15  
16  package io.netty.handler.codec.http2;
17  
18  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
19  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
20  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
21  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
22  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
23  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
24  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
25  import static io.netty.handler.codec.http2.Http2Exception.streamError;
26  import static io.netty.util.internal.ObjectUtil.checkNotNull;
27  import static java.lang.Math.max;
28  import static java.lang.Math.min;
29  import io.netty.buffer.ByteBuf;
30  import io.netty.channel.ChannelHandlerContext;
31  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
32  import io.netty.handler.codec.http2.Http2Exception.StreamException;
33  import io.netty.util.internal.PlatformDependent;
34  import io.netty.util.internal.UnstableApi;
35  
36  /**
37   * Basic implementation of {@link Http2LocalFlowController}.
38   * <p>
39   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
40   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
41   */
42  @UnstableApi
43  public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
44      /**
45       * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
46       * is sent to expand the window.
47       */
48      public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
49  
50      private final Http2Connection connection;
51      private final Http2Connection.PropertyKey stateKey;
52      private Http2FrameWriter frameWriter;
53      private ChannelHandlerContext ctx;
54      private float windowUpdateRatio;
55      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
56  
/**
 * Constructs a controller for {@code connection} that uses {@link #DEFAULT_WINDOW_UPDATE_RATIO} and
 * does not automatically refill the connection window.
 *
 * @param connection the connection state.
 */
public DefaultHttp2LocalFlowController(Http2Connection connection) {
    this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
}
60  
/**
 * Constructs a controller with the given settings and registers it for stream lifecycle events
 * on {@code connection}.
 *
 * @param connection the connection state.
 * @param windowUpdateRatio the window percentage below which to send a {@code WINDOW_UPDATE}.
 * @param autoRefillConnectionWindow if {@code true}, the connection window is refilled as soon as
 * data is received instead of waiting for the application to consume the bytes, which effectively
 * disables the connection window in the flow control algorithm. When enabled, the maximum bytes
 * you must be prepared to queue is proportional to {@code maximum number of concurrent streams *
 * the initial window size per stream}
 * (<a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_CONCURRENT_STREAMS</a>
 * <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_INITIAL_WINDOW_SIZE</a>).
 * @throws IllegalArgumentException if {@code windowUpdateRatio} is outside the open interval (0, 1).
 */
public DefaultHttp2LocalFlowController(Http2Connection connection,
                                       float windowUpdateRatio,
                                       boolean autoRefillConnectionWindow) {
    this.connection = checkNotNull(connection, "connection");
    windowUpdateRatio(windowUpdateRatio);

    // The connection stream gets its own flow state, stored under a freshly allocated key.
    stateKey = connection.newKey();
    final FlowState connectionFlowState;
    if (autoRefillConnectionWindow) {
        connectionFlowState = new AutoRefillState(connection.connectionStream(), initialWindowSize);
    } else {
        connectionFlowState = new DefaultState(connection.connectionStream(), initialWindowSize);
    }
    connection.connectionStream().setProperty(stateKey, connectionFlowState);

    // Track every stream from creation to closure so its flow state follows its lifecycle.
    final Http2ConnectionAdapter streamLifecycleListener = new Http2ConnectionAdapter() {
        @Override
        public void onStreamAdded(Http2Stream stream) {
            // The shared reduced state costs no allocation; a real state is installed by
            // onStreamActive once the stream becomes active.
            stream.setProperty(stateKey, REDUCED_FLOW_STATE);
        }

        @Override
        public void onStreamActive(Http2Stream stream) {
            // Built from the current initialWindowSize so that SETTINGS frames exchanged while
            // the stream was IDLE are reflected in its window.
            final FlowState activeState = new DefaultState(stream, initialWindowSize);
            stream.setProperty(stateKey, activeState);
        }

        @Override
        public void onStreamClosed(Http2Stream stream) {
            try {
                // Bytes the application never consumed are handed back so that the connection
                // window recovers them.
                final FlowState closingState = state(stream);
                final int leftoverBytes = closingState.unconsumedBytes();
                if (ctx != null && leftoverBytes > 0) {
                    connectionState().consumeBytes(leftoverBytes);
                    closingState.consumeBytes(leftoverBytes);
                }
            } catch (Http2Exception e) {
                PlatformDependent.throwException(e);
            } finally {
                // A closed stream has no more local flow control state to track, so it always
                // falls back to the shared allocation-free state, even if the above failed.
                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
            }
        }
    };
    connection.addListener(streamLifecycleListener);
}
125 
126     @Override
127     public DefaultHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
128         this.frameWriter = checkNotNull(frameWriter, "frameWriter");
129         return this;
130     }
131 
132     @Override
133     public void channelHandlerContext(ChannelHandlerContext ctx) {
134         this.ctx = checkNotNull(ctx, "ctx");
135     }
136 
137     @Override
138     public void initialWindowSize(int newWindowSize) throws Http2Exception {
139         assert ctx == null || ctx.executor().inEventLoop();
140         int delta = newWindowSize - initialWindowSize;
141         initialWindowSize = newWindowSize;
142 
143         WindowUpdateVisitor visitor = new WindowUpdateVisitor(delta);
144         connection.forEachActiveStream(visitor);
145         visitor.throwIfError();
146     }
147 
148     @Override
149     public int initialWindowSize() {
150         return initialWindowSize;
151     }
152 
153     @Override
154     public int windowSize(Http2Stream stream) {
155         return state(stream).windowSize();
156     }
157 
158     @Override
159     public int initialWindowSize(Http2Stream stream) {
160         return state(stream).initialWindowSize();
161     }
162 
163     @Override
164     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
165         assert ctx != null && ctx.executor().inEventLoop();
166         FlowState state = state(stream);
167         // Just add the delta to the stream-specific initial window size so that the next time the window
168         // expands it will grow to the new initial size.
169         state.incrementInitialStreamWindow(delta);
170         state.writeWindowUpdateIfNeeded();
171     }
172 
173     @Override
174     public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
175         assert ctx != null && ctx.executor().inEventLoop();
176         if (numBytes < 0) {
177             throw new IllegalArgumentException("numBytes must not be negative");
178         }
179         if (numBytes == 0) {
180             return false;
181         }
182 
183         // Streams automatically consume all remaining bytes when they are closed, so just ignore
184         // if already closed.
185         if (stream != null && !isClosed(stream)) {
186             if (stream.id() == CONNECTION_STREAM_ID) {
187                 throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
188             }
189 
190             boolean windowUpdateSent = connectionState().consumeBytes(numBytes);
191             windowUpdateSent |= state(stream).consumeBytes(numBytes);
192             return windowUpdateSent;
193         }
194         return false;
195     }
196 
197     @Override
198     public int unconsumedBytes(Http2Stream stream) {
199         return state(stream).unconsumedBytes();
200     }
201 
202     private static void checkValidRatio(float ratio) {
203         if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
204             throw new IllegalArgumentException("Invalid ratio: " + ratio);
205         }
206     }
207 
208     /**
209      * The window update ratio is used to determine when a window update must be sent. If the ratio
210      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
211      * be sent. This is the global window update ratio that will be used for new streams.
212      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams.
213      * @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
214      */
215     public void windowUpdateRatio(float ratio) {
216         assert ctx == null || ctx.executor().inEventLoop();
217         checkValidRatio(ratio);
218         windowUpdateRatio = ratio;
219     }
220 
221     /**
222      * The window update ratio is used to determine when a window update must be sent. If the ratio
223      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
224      * be sent. This is the global window update ratio that will be used for new streams.
225      */
226     public float windowUpdateRatio() {
227         return windowUpdateRatio;
228     }
229 
230     /**
231      * The window update ratio is used to determine when a window update must be sent. If the ratio
232      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
233      * be sent. This window update ratio will only be applied to {@code streamId}.
234      * <p>
235      * Note it is the responsibly of the caller to ensure that the the
236      * initial {@code SETTINGS} frame is sent before this is called. It would
237      * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
238      * was generated by this method before the initial {@code SETTINGS} frame is sent.
239      * @param stream the stream for which {@code ratio} applies to.
240      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
241      * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
242      */
243     public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
244         assert ctx != null && ctx.executor().inEventLoop();
245         checkValidRatio(ratio);
246         FlowState state = state(stream);
247         state.windowUpdateRatio(ratio);
248         state.writeWindowUpdateIfNeeded();
249     }
250 
251     /**
252      * The window update ratio is used to determine when a window update must be sent. If the ratio
253      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
254      * be sent. This window update ratio will only be applied to {@code streamId}.
255      * @throws Http2Exception If no stream corresponding to {@code stream} could be found.
256      */
257     public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
258         return state(stream).windowUpdateRatio();
259     }
260 
261     @Override
262     public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
263             boolean endOfStream) throws Http2Exception {
264         assert ctx != null && ctx.executor().inEventLoop();
265         int dataLength = data.readableBytes() + padding;
266 
267         // Apply the connection-level flow control
268         FlowState connectionState = connectionState();
269         connectionState.receiveFlowControlledFrame(dataLength);
270 
271         if (stream != null && !isClosed(stream)) {
272             // Apply the stream-level flow control
273             FlowState state = state(stream);
274             state.endOfStream(endOfStream);
275             state.receiveFlowControlledFrame(dataLength);
276         } else if (dataLength > 0) {
277             // Immediately consume the bytes for the connection window.
278             connectionState.consumeBytes(dataLength);
279         }
280     }
281 
282     private FlowState connectionState() {
283         return connection.connectionStream().getProperty(stateKey);
284     }
285 
286     private FlowState state(Http2Stream stream) {
287         return stream.getProperty(stateKey);
288     }
289 
290     private static boolean isClosed(Http2Stream stream) {
291         return stream.state() == Http2Stream.State.CLOSED;
292     }
293 
294     /**
295      * Flow control state that does autorefill of the flow control window when the data is
296      * received.
297      */
298     private final class AutoRefillState extends DefaultState {
299         public AutoRefillState(Http2Stream stream, int initialWindowSize) {
300             super(stream, initialWindowSize);
301         }
302 
303         @Override
304         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
305             super.receiveFlowControlledFrame(dataLength);
306             // Need to call the super to consume the bytes, since this.consumeBytes does nothing.
307             super.consumeBytes(dataLength);
308         }
309 
310         @Override
311         public boolean consumeBytes(int numBytes) throws Http2Exception {
312             // Do nothing, since the bytes are already consumed upon receiving the data.
313             return false;
314         }
315     }
316 
317     /**
318      * Flow control window state for an individual stream.
319      */
320     private class DefaultState implements FlowState {
321         private final Http2Stream stream;
322 
323         /**
324          * The actual flow control window that is decremented as soon as {@code DATA} arrives.
325          */
326         private int window;
327 
328         /**
329          * A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE}
330          * frames. Decrementing this window for received {@code DATA} frames is delayed until the
331          * application has indicated that the data has been fully processed. This prevents sending
332          * a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold.
333          */
334         private int processedWindow;
335 
336         /**
337          * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
338          * Each stream has their own initial window size.
339          */
340         private int initialStreamWindowSize;
341 
342         /**
343          * This is used to determine when {@link #processedWindow} is sufficiently far away from
344          * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
345          * Each stream has their own window update ratio.
346          */
347         private float streamWindowUpdateRatio;
348 
349         private int lowerBound;
350         private boolean endOfStream;
351 
352         public DefaultState(Http2Stream stream, int initialWindowSize) {
353             this.stream = stream;
354             window(initialWindowSize);
355             streamWindowUpdateRatio = windowUpdateRatio;
356         }
357 
358         @Override
359         public void window(int initialWindowSize) {
360             assert ctx == null || ctx.executor().inEventLoop();
361             window = processedWindow = initialStreamWindowSize = initialWindowSize;
362         }
363 
364         @Override
365         public int windowSize() {
366             return window;
367         }
368 
369         @Override
370         public int initialWindowSize() {
371             return initialStreamWindowSize;
372         }
373 
374         @Override
375         public void endOfStream(boolean endOfStream) {
376             this.endOfStream = endOfStream;
377         }
378 
379         @Override
380         public float windowUpdateRatio() {
381             return streamWindowUpdateRatio;
382         }
383 
384         @Override
385         public void windowUpdateRatio(float ratio) {
386             assert ctx == null || ctx.executor().inEventLoop();
387             streamWindowUpdateRatio = ratio;
388         }
389 
390         @Override
391         public void incrementInitialStreamWindow(int delta) {
392             // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range.
393             int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
394                     max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
395             delta = newValue - initialStreamWindowSize;
396 
397             initialStreamWindowSize += delta;
398         }
399 
400         @Override
401         public void incrementFlowControlWindows(int delta) throws Http2Exception {
402             if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
403                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
404                         "Flow control window overflowed for stream: %d", stream.id());
405             }
406 
407             window += delta;
408             processedWindow += delta;
409             lowerBound = delta < 0 ? delta : 0;
410         }
411 
412         @Override
413         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
414             assert dataLength >= 0;
415 
416             // Apply the delta. Even if we throw an exception we want to have taken this delta into account.
417             window -= dataLength;
418 
419             // Window size can become negative if we sent a SETTINGS frame that reduces the
420             // size of the transfer window after the peer has written data frames.
421             // The value is bounded by the length that SETTINGS frame decrease the window.
422             // This difference is stored for the connection when writing the SETTINGS frame
423             // and is cleared once we send a WINDOW_UPDATE frame.
424             if (window < lowerBound) {
425                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
426                         "Flow control window exceeded for stream: %d", stream.id());
427             }
428         }
429 
430         private void returnProcessedBytes(int delta) throws Http2Exception {
431             if (processedWindow - delta < window) {
432                 throw streamError(stream.id(), INTERNAL_ERROR,
433                         "Attempting to return too many bytes for stream %d", stream.id());
434             }
435             processedWindow -= delta;
436         }
437 
438         @Override
439         public boolean consumeBytes(int numBytes) throws Http2Exception {
440             // Return the bytes processed and update the window.
441             returnProcessedBytes(numBytes);
442             return writeWindowUpdateIfNeeded();
443         }
444 
445         @Override
446         public int unconsumedBytes() {
447             return processedWindow - window;
448         }
449 
450         @Override
451         public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
452             if (endOfStream || initialStreamWindowSize <= 0) {
453                 return false;
454             }
455 
456             int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
457             if (processedWindow <= threshold) {
458                 writeWindowUpdate();
459                 return true;
460             }
461             return false;
462         }
463 
464         /**
465          * Called to perform a window update for this stream (or connection). Updates the window size back
466          * to the size of the initial window and sends a window update frame to the remote endpoint.
467          */
468         private void writeWindowUpdate() throws Http2Exception {
469             // Expand the window for this stream back to the size of the initial window.
470             int deltaWindowSize = initialStreamWindowSize - processedWindow;
471             try {
472                 incrementFlowControlWindows(deltaWindowSize);
473             } catch (Throwable t) {
474                 throw connectionError(INTERNAL_ERROR, t,
475                         "Attempting to return too many bytes for stream %d", stream.id());
476             }
477 
478             // Send a window update for the stream/connection.
479             frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise());
480         }
481     }
482 
483     /**
484      * The local flow control state for a single stream that is not in a state where flow controlled frames cannot
485      * be exchanged.
486      */
487     private static final FlowState REDUCED_FLOW_STATE = new FlowState() {
488 
489         @Override
490         public int windowSize() {
491             return 0;
492         }
493 
494         @Override
495         public int initialWindowSize() {
496             return 0;
497         }
498 
499         @Override
500         public void window(int initialWindowSize) {
501             throw new UnsupportedOperationException();
502         }
503 
504         @Override
505         public void incrementInitialStreamWindow(int delta) {
506             // This operation needs to be supported during the initial settings exchange when
507             // the peer has not yet acknowledged this peer being activated.
508         }
509 
510         @Override
511         public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
512             throw new UnsupportedOperationException();
513         }
514 
515         @Override
516         public boolean consumeBytes(int numBytes) throws Http2Exception {
517             return false;
518         }
519 
520         @Override
521         public int unconsumedBytes() {
522             return 0;
523         }
524 
525         @Override
526         public float windowUpdateRatio() {
527             throw new UnsupportedOperationException();
528         }
529 
530         @Override
531         public void windowUpdateRatio(float ratio) {
532             throw new UnsupportedOperationException();
533         }
534 
535         @Override
536         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
537             throw new UnsupportedOperationException();
538         }
539 
540         @Override
541         public void incrementFlowControlWindows(int delta) throws Http2Exception {
542             // This operation needs to be supported during the initial settings exchange when
543             // the peer has not yet acknowledged this peer being activated.
544         }
545 
546         @Override
547         public void endOfStream(boolean endOfStream) {
548             throw new UnsupportedOperationException();
549         }
550     };
551 
552     /**
553      * An abstraction which provides specific extensions used by local flow control.
554      */
555     private interface FlowState {
556 
557         int windowSize();
558 
559         int initialWindowSize();
560 
561         void window(int initialWindowSize);
562 
563         /**
564          * Increment the initial window size for this stream.
565          * @param delta The amount to increase the initial window size by.
566          */
567         void incrementInitialStreamWindow(int delta);
568 
569         /**
570          * Updates the flow control window for this stream if it is appropriate.
571          *
572          * @return true if {@code WINDOW_UPDATE} was written, false otherwise.
573          */
574         boolean writeWindowUpdateIfNeeded() throws Http2Exception;
575 
576         /**
577          * Indicates that the application has consumed {@code numBytes} from the connection or stream and is
578          * ready to receive more data.
579          *
580          * @param numBytes the number of bytes to be returned to the flow control window.
581          * @return true if {@code WINDOW_UPDATE} was written, false otherwise.
582          * @throws Http2Exception
583          */
584         boolean consumeBytes(int numBytes) throws Http2Exception;
585 
586         int unconsumedBytes();
587 
588         float windowUpdateRatio();
589 
590         void windowUpdateRatio(float ratio);
591 
592         /**
593          * A flow control event has occurred and we should decrement the amount of available bytes for this stream.
594          * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control.
595          * @throws Http2Exception If too much data is used relative to how much is available.
596          */
597         void receiveFlowControlledFrame(int dataLength) throws Http2Exception;
598 
599         /**
600          * Increment the windows which are used to determine many bytes have been processed.
601          * @param delta The amount to increment the window by.
602          * @throws Http2Exception if integer overflow occurs on the window.
603          */
604         void incrementFlowControlWindows(int delta) throws Http2Exception;
605 
606         void endOfStream(boolean endOfStream);
607     }
608 
609     /**
610      * Provides a means to iterate over all active streams and increment the flow control windows.
611      */
612     private final class WindowUpdateVisitor implements Http2StreamVisitor {
613         private CompositeStreamException compositeException;
614         private final int delta;
615 
616         public WindowUpdateVisitor(int delta) {
617             this.delta = delta;
618         }
619 
620         @Override
621         public boolean visit(Http2Stream stream) throws Http2Exception {
622             try {
623                 // Increment flow control window first so state will be consistent if overflow is detected.
624                 FlowState state = state(stream);
625                 state.incrementFlowControlWindows(delta);
626                 state.incrementInitialStreamWindow(delta);
627             } catch (StreamException e) {
628                 if (compositeException == null) {
629                     compositeException = new CompositeStreamException(e.error(), 4);
630                 }
631                 compositeException.add(e);
632             }
633             return true;
634         }
635 
636         public void throwIfError() throws CompositeStreamException {
637             if (compositeException != null) {
638                 throw compositeException;
639             }
640         }
641     }
642 }