View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.buffer.api.Buffer;
18  import io.netty5.channel.ChannelHandlerContext;
19  import io.netty5.handler.codec.http2.Http2Exception.CompositeStreamException;
20  import io.netty5.handler.codec.http2.Http2Exception.StreamException;
21  import io.netty5.util.internal.PlatformDependent;
22  import io.netty5.util.internal.UnstableApi;
23  
24  import static io.netty5.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
25  import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
26  import static io.netty5.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
27  import static io.netty5.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
28  import static io.netty5.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
29  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
30  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
31  import static io.netty5.handler.codec.http2.Http2Exception.streamError;
32  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
33  import static java.lang.Math.max;
34  import static java.lang.Math.min;
35  import static java.util.Objects.requireNonNull;
36  
37  /**
38   * Basic implementation of {@link Http2LocalFlowController}.
39   * <p>
40   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
41   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
42   */
43  @UnstableApi
44  public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
45      /**
46       * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
47       * is sent to expand the window.
48       */
49      public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
50  
51      private final Http2Connection connection;
52      private final Http2Connection.PropertyKey stateKey;
53      private Http2FrameWriter frameWriter;
54      private ChannelHandlerContext ctx;
55      private float windowUpdateRatio;
56      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
57  
58      public DefaultHttp2LocalFlowController(Http2Connection connection) {
59          this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
60      }
61  
62      /**
63       * Constructs a controller with the given settings.
64       *
65       * @param connection the connection state.
66       * @param windowUpdateRatio the window percentage below which to send a {@code WINDOW_UPDATE}.
67       * @param autoRefillConnectionWindow if {@code true}, effectively disables the connection window
68       * in the flow control algorithm as they will always refill automatically without requiring the
69       * application to consume the bytes. When enabled, the maximum bytes you must be prepared to
70       * queue is proportional to {@code maximum number of concurrent streams * the initial window
71       * size per stream}
72       * (<a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_CONCURRENT_STREAMS</a>
73       * <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_INITIAL_WINDOW_SIZE</a>).
74       */
75      public DefaultHttp2LocalFlowController(Http2Connection connection,
76                                             float windowUpdateRatio,
77                                             boolean autoRefillConnectionWindow) {
78          this.connection = requireNonNull(connection, "connection");
79          windowUpdateRatio(windowUpdateRatio);
80  
81          // Add a flow state for the connection.
82          stateKey = connection.newKey();
83          FlowState connectionState = autoRefillConnectionWindow ?
84                  new AutoRefillState(connection.connectionStream(), initialWindowSize) :
85                  new DefaultState(connection.connectionStream(), initialWindowSize);
86          connection.connectionStream().setProperty(stateKey, connectionState);
87  
88          // Register for notification of new streams.
89          connection.addListener(new Http2ConnectionAdapter() {
90              @Override
91              public void onStreamAdded(Http2Stream stream) {
92                  // Unconditionally used the reduced flow control state because it requires no object allocation
93                  // and the DefaultFlowState will be allocated in onStreamActive.
94                  stream.setProperty(stateKey, REDUCED_FLOW_STATE);
95              }
96  
97              @Override
98              public void onStreamActive(Http2Stream stream) {
99                  // Need to be sure the stream's initial window is adjusted for SETTINGS
100                 // frames which may have been exchanged while it was in IDLE
101                 stream.setProperty(stateKey, new DefaultState(stream, initialWindowSize));
102             }
103 
104             @Override
105             public void onStreamClosed(Http2Stream stream) {
106                 try {
107                     // When a stream is closed, consume any remaining bytes so that they
108                     // are restored to the connection window.
109                     FlowState state = state(stream);
110                     int unconsumedBytes = state.unconsumedBytes();
111                     if (ctx != null && unconsumedBytes > 0) {
112                         if (consumeAllBytes(state, unconsumedBytes)) {
113                             // As the user has no real control on when this callback is used we should better
114                             // call flush() if we produced any window update to ensure we not stale.
115                             ctx.flush();
116                         }
117                     }
118                 } catch (Http2Exception e) {
119                     PlatformDependent.throwException(e);
120                 } finally {
121                     // Unconditionally reduce the amount of memory required for flow control because there is no
122                     // object allocation costs associated with doing so and the stream will not have any more
123                     // local flow control state to keep track of anymore.
124                     stream.setProperty(stateKey, REDUCED_FLOW_STATE);
125                 }
126             }
127         });
128     }
129 
130     @Override
131     public DefaultHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
132         this.frameWriter = requireNonNull(frameWriter, "frameWriter");
133         return this;
134     }
135 
136     @Override
137     public void channelHandlerContext(ChannelHandlerContext ctx) {
138         this.ctx = requireNonNull(ctx, "ctx");
139     }
140 
141     @Override
142     public void initialWindowSize(int newWindowSize) throws Http2Exception {
143         assert ctx == null || ctx.executor().inEventLoop();
144         int delta = newWindowSize - initialWindowSize;
145         initialWindowSize = newWindowSize;
146 
147         WindowUpdateVisitor visitor = new WindowUpdateVisitor(delta);
148         connection.forEachActiveStream(visitor);
149         visitor.throwIfError();
150     }
151 
152     @Override
153     public int initialWindowSize() {
154         return initialWindowSize;
155     }
156 
157     @Override
158     public int windowSize(Http2Stream stream) {
159         return state(stream).windowSize();
160     }
161 
162     @Override
163     public int initialWindowSize(Http2Stream stream) {
164         return state(stream).initialWindowSize();
165     }
166 
167     @Override
168     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
169         assert ctx != null && ctx.executor().inEventLoop();
170         FlowState state = state(stream);
171         // Just add the delta to the stream-specific initial window size so that the next time the window
172         // expands it will grow to the new initial size.
173         state.incrementInitialStreamWindow(delta);
174         state.writeWindowUpdateIfNeeded();
175     }
176 
177     @Override
178     public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
179         assert ctx != null && ctx.executor().inEventLoop();
180         checkPositiveOrZero(numBytes, "numBytes");
181         if (numBytes == 0) {
182             return false;
183         }
184 
185         // Streams automatically consume all remaining bytes when they are closed, so just ignore
186         // if already closed.
187         if (stream != null && !isClosed(stream)) {
188             if (stream.id() == CONNECTION_STREAM_ID) {
189                 throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
190             }
191 
192             return consumeAllBytes(state(stream), numBytes);
193         }
194         return false;
195     }
196 
197     private boolean consumeAllBytes(FlowState state, int numBytes) throws Http2Exception {
198         return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
199     }
200 
201     @Override
202     public int unconsumedBytes(Http2Stream stream) {
203         return state(stream).unconsumedBytes();
204     }
205 
206     private static void checkValidRatio(float ratio) {
207         if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
208             throw new IllegalArgumentException("Invalid ratio: " + ratio);
209         }
210     }
211 
212     /**
213      * The window update ratio is used to determine when a window update must be sent. If the ratio
214      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
215      * be sent. This is the global window update ratio that will be used for new streams.
216      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams.
217      * @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
218      */
219     public void windowUpdateRatio(float ratio) {
220         assert ctx == null || ctx.executor().inEventLoop();
221         checkValidRatio(ratio);
222         windowUpdateRatio = ratio;
223     }
224 
225     /**
226      * The window update ratio is used to determine when a window update must be sent. If the ratio
227      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
228      * be sent. This is the global window update ratio that will be used for new streams.
229      */
230     public float windowUpdateRatio() {
231         return windowUpdateRatio;
232     }
233 
234     /**
235      * The window update ratio is used to determine when a window update must be sent. If the ratio
236      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
237      * be sent. This window update ratio will only be applied to {@code streamId}.
238      * <p>
239      * Note it is the responsibly of the caller to ensure that the
240      * initial {@code SETTINGS} frame is sent before this is called. It would
241      * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
242      * was generated by this method before the initial {@code SETTINGS} frame is sent.
243      * @param stream the stream for which {@code ratio} applies to.
244      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
245      * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
246      */
247     public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
248         assert ctx != null && ctx.executor().inEventLoop();
249         checkValidRatio(ratio);
250         FlowState state = state(stream);
251         state.windowUpdateRatio(ratio);
252         state.writeWindowUpdateIfNeeded();
253     }
254 
255     /**
256      * The window update ratio is used to determine when a window update must be sent. If the ratio
257      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
258      * be sent. This window update ratio will only be applied to {@code streamId}.
259      * @throws Http2Exception If no stream corresponding to {@code stream} could be found.
260      */
261     public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
262         return state(stream).windowUpdateRatio();
263     }
264 
265     @Override
266     public void receiveFlowControlledFrame(Http2Stream stream, Buffer data, int padding,
267                                            boolean endOfStream) throws Http2Exception {
268         assert ctx != null && ctx.executor().inEventLoop();
269         int dataLength = data.readableBytes() + padding;
270 
271         // Apply the connection-level flow control
272         FlowState connectionState = connectionState();
273         connectionState.receiveFlowControlledFrame(dataLength);
274 
275         if (stream != null && !isClosed(stream)) {
276             // Apply the stream-level flow control
277             FlowState state = state(stream);
278             state.endOfStream(endOfStream);
279             state.receiveFlowControlledFrame(dataLength);
280         } else if (dataLength > 0) {
281             // Immediately consume the bytes for the connection window.
282             connectionState.consumeBytes(dataLength);
283         }
284     }
285 
286     private FlowState connectionState() {
287         return connection.connectionStream().getProperty(stateKey);
288     }
289 
290     private FlowState state(Http2Stream stream) {
291         return stream.getProperty(stateKey);
292     }
293 
294     private static boolean isClosed(Http2Stream stream) {
295         return stream.state() == Http2Stream.State.CLOSED;
296     }
297 
298     /**
299      * Flow control state that does autorefill of the flow control window when the data is
300      * received.
301      */
302     private final class AutoRefillState extends DefaultState {
303         AutoRefillState(Http2Stream stream, int initialWindowSize) {
304             super(stream, initialWindowSize);
305         }
306 
307         @Override
308         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
309             super.receiveFlowControlledFrame(dataLength);
310             // Need to call the super to consume the bytes, since this.consumeBytes does nothing.
311             super.consumeBytes(dataLength);
312         }
313 
314         @Override
315         public boolean consumeBytes(int numBytes) throws Http2Exception {
316             // Do nothing, since the bytes are already consumed upon receiving the data.
317             return false;
318         }
319     }
320 
321     /**
322      * Flow control window state for an individual stream.
323      */
324     private class DefaultState implements FlowState {
325         private final Http2Stream stream;
326 
327         /**
328          * The actual flow control window that is decremented as soon as {@code DATA} arrives.
329          */
330         private int window;
331 
332         /**
333          * A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE}
334          * frames. Decrementing this window for received {@code DATA} frames is delayed until the
335          * application has indicated that the data has been fully processed. This prevents sending
336          * a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold.
337          */
338         private int processedWindow;
339 
340         /**
341          * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
342          * Each stream has their own initial window size.
343          */
344         private int initialStreamWindowSize;
345 
346         /**
347          * This is used to determine when {@link #processedWindow} is sufficiently far away from
348          * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
349          * Each stream has their own window update ratio.
350          */
351         private float streamWindowUpdateRatio;
352 
353         private int lowerBound;
354         private boolean endOfStream;
355 
356         DefaultState(Http2Stream stream, int initialWindowSize) {
357             this.stream = stream;
358             window(initialWindowSize);
359             streamWindowUpdateRatio = windowUpdateRatio;
360         }
361 
362         @Override
363         public void window(int initialWindowSize) {
364             assert ctx == null || ctx.executor().inEventLoop();
365             window = processedWindow = initialStreamWindowSize = initialWindowSize;
366         }
367 
368         @Override
369         public int windowSize() {
370             return window;
371         }
372 
373         @Override
374         public int initialWindowSize() {
375             return initialStreamWindowSize;
376         }
377 
378         @Override
379         public void endOfStream(boolean endOfStream) {
380             this.endOfStream = endOfStream;
381         }
382 
383         @Override
384         public float windowUpdateRatio() {
385             return streamWindowUpdateRatio;
386         }
387 
388         @Override
389         public void windowUpdateRatio(float ratio) {
390             assert ctx == null || ctx.executor().inEventLoop();
391             streamWindowUpdateRatio = ratio;
392         }
393 
394         @Override
395         public void incrementInitialStreamWindow(int delta) {
396             // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range.
397             int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
398                     max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
399             delta = newValue - initialStreamWindowSize;
400 
401             initialStreamWindowSize += delta;
402         }
403 
404         @Override
405         public void incrementFlowControlWindows(int delta) throws Http2Exception {
406             if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
407                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
408                         "Flow control window overflowed for stream: %d", stream.id());
409             }
410 
411             window += delta;
412             processedWindow += delta;
413             lowerBound = min(delta, 0);
414         }
415 
416         @Override
417         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
418             assert dataLength >= 0;
419 
420             // Apply the delta. Even if we throw an exception we want to have taken this delta into account.
421             window -= dataLength;
422 
423             // Window size can become negative if we sent a SETTINGS frame that reduces the
424             // size of the transfer window after the peer has written data frames.
425             // The value is bounded by the length that SETTINGS frame decrease the window.
426             // This difference is stored for the connection when writing the SETTINGS frame
427             // and is cleared once we send a WINDOW_UPDATE frame.
428             if (window < lowerBound) {
429                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
430                         "Flow control window exceeded for stream: %d", stream.id());
431             }
432         }
433 
434         private void returnProcessedBytes(int delta) throws Http2Exception {
435             if (processedWindow - delta < window) {
436                 throw streamError(stream.id(), INTERNAL_ERROR,
437                         "Attempting to return too many bytes for stream %d", stream.id());
438             }
439             processedWindow -= delta;
440         }
441 
442         @Override
443         public boolean consumeBytes(int numBytes) throws Http2Exception {
444             // Return the bytes processed and update the window.
445             returnProcessedBytes(numBytes);
446             return writeWindowUpdateIfNeeded();
447         }
448 
449         @Override
450         public int unconsumedBytes() {
451             return processedWindow - window;
452         }
453 
454         @Override
455         public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
456             if (endOfStream || initialStreamWindowSize <= 0 ||
457                     // If the stream is already closed there is no need to try to write a window update for it.
458                     isClosed(stream)) {
459                 return false;
460             }
461 
462             int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
463             if (processedWindow <= threshold) {
464                 writeWindowUpdate();
465                 return true;
466             }
467             return false;
468         }
469 
470         /**
471          * Called to perform a window update for this stream (or connection). Updates the window size back
472          * to the size of the initial window and sends a window update frame to the remote endpoint.
473          */
474         private void writeWindowUpdate() throws Http2Exception {
475             // Expand the window for this stream back to the size of the initial window.
476             int deltaWindowSize = initialStreamWindowSize - processedWindow;
477             try {
478                 incrementFlowControlWindows(deltaWindowSize);
479             } catch (Throwable t) {
480                 throw connectionError(INTERNAL_ERROR, t,
481                         "Attempting to return too many bytes for stream %d", stream.id());
482             }
483 
484             // Send a window update for the stream/connection.
485             frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize);
486         }
487     }
488 
489     /**
490      * The local flow control state for a single stream that is not in a state where flow controlled frames cannot
491      * be exchanged.
492      */
493     private static final FlowState REDUCED_FLOW_STATE = new FlowState() {
494 
495         @Override
496         public int windowSize() {
497             return 0;
498         }
499 
500         @Override
501         public int initialWindowSize() {
502             return 0;
503         }
504 
505         @Override
506         public void window(int initialWindowSize) {
507             throw new UnsupportedOperationException();
508         }
509 
510         @Override
511         public void incrementInitialStreamWindow(int delta) {
512             // This operation needs to be supported during the initial settings exchange when
513             // the peer has not yet acknowledged this peer being activated.
514         }
515 
516         @Override
517         public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
518             throw new UnsupportedOperationException();
519         }
520 
521         @Override
522         public boolean consumeBytes(int numBytes) throws Http2Exception {
523             return false;
524         }
525 
526         @Override
527         public int unconsumedBytes() {
528             return 0;
529         }
530 
531         @Override
532         public float windowUpdateRatio() {
533             throw new UnsupportedOperationException();
534         }
535 
536         @Override
537         public void windowUpdateRatio(float ratio) {
538             throw new UnsupportedOperationException();
539         }
540 
541         @Override
542         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
543             throw new UnsupportedOperationException();
544         }
545 
546         @Override
547         public void incrementFlowControlWindows(int delta) throws Http2Exception {
548             // This operation needs to be supported during the initial settings exchange when
549             // the peer has not yet acknowledged this peer being activated.
550         }
551 
552         @Override
553         public void endOfStream(boolean endOfStream) {
554             throw new UnsupportedOperationException();
555         }
556     };
557 
558     /**
559      * An abstraction which provides specific extensions used by local flow control.
560      */
561     private interface FlowState {
562 
563         int windowSize();
564 
565         int initialWindowSize();
566 
567         void window(int initialWindowSize);
568 
569         /**
570          * Increment the initial window size for this stream.
571          * @param delta The amount to increase the initial window size by.
572          */
573         void incrementInitialStreamWindow(int delta);
574 
575         /**
576          * Updates the flow control window for this stream if it is appropriate.
577          *
578          * @return true if {@code WINDOW_UPDATE} was written, false otherwise.
579          */
580         boolean writeWindowUpdateIfNeeded() throws Http2Exception;
581 
582         /**
583          * Indicates that the application has consumed {@code numBytes} from the connection or stream and is
584          * ready to receive more data.
585          *
586          * @param numBytes the number of bytes to be returned to the flow control window.
587          * @return true if {@code WINDOW_UPDATE} was written, false otherwise.
588          * @throws Http2Exception If the number of bytes is too great for the current window,
589          * or an internal error occurs.
590          */
591         boolean consumeBytes(int numBytes) throws Http2Exception;
592 
593         int unconsumedBytes();
594 
595         float windowUpdateRatio();
596 
597         void windowUpdateRatio(float ratio);
598 
599         /**
600          * A flow control event has occurred and we should decrement the amount of available bytes for this stream.
601          * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control.
602          * @throws Http2Exception If too much data is used relative to how much is available.
603          */
604         void receiveFlowControlledFrame(int dataLength) throws Http2Exception;
605 
606         /**
607          * Increment the windows which are used to determine many bytes have been processed.
608          * @param delta The amount to increment the window by.
609          * @throws Http2Exception if integer overflow occurs on the window.
610          */
611         void incrementFlowControlWindows(int delta) throws Http2Exception;
612 
613         void endOfStream(boolean endOfStream);
614     }
615 
616     /**
617      * Provides a means to iterate over all active streams and increment the flow control windows.
618      */
619     private final class WindowUpdateVisitor implements Http2StreamVisitor {
620         private CompositeStreamException compositeException;
621         private final int delta;
622 
623         WindowUpdateVisitor(int delta) {
624             this.delta = delta;
625         }
626 
627         @Override
628         public boolean visit(Http2Stream stream) throws Http2Exception {
629             try {
630                 // Increment flow control window first so state will be consistent if overflow is detected.
631                 FlowState state = state(stream);
632                 state.incrementFlowControlWindows(delta);
633                 state.incrementInitialStreamWindow(delta);
634             } catch (StreamException e) {
635                 if (compositeException == null) {
636                     compositeException = new CompositeStreamException(e.error(), 4);
637                 }
638                 compositeException.add(e);
639             }
640             return true;
641         }
642 
643         public void throwIfError() throws CompositeStreamException {
644             if (compositeException != null) {
645                 throw compositeException;
646             }
647         }
648     }
649 }