View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  
16  package io.netty.handler.codec.http2;
17  
18  import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
19  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
20  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
21  import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
22  import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
23  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
24  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
25  import static io.netty.handler.codec.http2.Http2Exception.streamError;
26  import static io.netty.util.internal.ObjectUtil.checkNotNull;
27  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
28  import static java.lang.Math.max;
29  import static java.lang.Math.min;
30  import io.netty.buffer.ByteBuf;
31  import io.netty.channel.ChannelHandlerContext;
32  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
33  import io.netty.handler.codec.http2.Http2Exception.StreamException;
34  import io.netty.util.internal.PlatformDependent;
35  
36  /**
37   * Basic implementation of {@link Http2LocalFlowController}.
38   * <p>
39   * This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
40   * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
41   */
42  public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
43      /**
44       * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
45       * is sent to expand the window.
46       */
47      public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
48  
49      private final Http2Connection connection;
50      private final Http2Connection.PropertyKey stateKey;
51      private Http2FrameWriter frameWriter;
52      private ChannelHandlerContext ctx;
53      private float windowUpdateRatio;
54      private int initialWindowSize = DEFAULT_WINDOW_SIZE;
55  
56      public DefaultHttp2LocalFlowController(Http2Connection connection) {
57          this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
58      }
59  
60      /**
61       * Constructs a controller with the given settings.
62       *
63       * @param connection the connection state.
64       * @param windowUpdateRatio the window percentage below which to send a {@code WINDOW_UPDATE}.
65       * @param autoRefillConnectionWindow if {@code true}, effectively disables the connection window
66       * in the flow control algorithm as they will always refill automatically without requiring the
67       * application to consume the bytes. When enabled, the maximum bytes you must be prepared to
68       * queue is proportional to {@code maximum number of concurrent streams * the initial window
69       * size per stream}
70       * (<a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_CONCURRENT_STREAMS</a>
71       * <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_INITIAL_WINDOW_SIZE</a>).
72       */
73      public DefaultHttp2LocalFlowController(Http2Connection connection,
74                                             float windowUpdateRatio,
75                                             boolean autoRefillConnectionWindow) {
76          this.connection = checkNotNull(connection, "connection");
77          windowUpdateRatio(windowUpdateRatio);
78  
79          // Add a flow state for the connection.
80          stateKey = connection.newKey();
81          FlowState connectionState = autoRefillConnectionWindow ?
82                  new AutoRefillState(connection.connectionStream(), initialWindowSize) :
83                  new DefaultState(connection.connectionStream(), initialWindowSize);
84          connection.connectionStream().setProperty(stateKey, connectionState);
85  
86          // Register for notification of new streams.
87          connection.addListener(new Http2ConnectionAdapter() {
88              @Override
89              public void onStreamAdded(Http2Stream stream) {
90                  // Unconditionally used the reduced flow control state because it requires no object allocation
91                  // and the DefaultFlowState will be allocated in onStreamActive.
92                  stream.setProperty(stateKey, REDUCED_FLOW_STATE);
93              }
94  
95              @Override
96              public void onStreamActive(Http2Stream stream) {
97                  // Need to be sure the stream's initial window is adjusted for SETTINGS
98                  // frames which may have been exchanged while it was in IDLE
99                  stream.setProperty(stateKey, new DefaultState(stream, initialWindowSize));
100             }
101 
102             @Override
103             public void onStreamClosed(Http2Stream stream) {
104                 try {
105                     // When a stream is closed, consume any remaining bytes so that they
106                     // are restored to the connection window.
107                     FlowState state = state(stream);
108                     int unconsumedBytes = state.unconsumedBytes();
109                     if (ctx != null && unconsumedBytes > 0) {
110                         if (consumeAllBytes(state, unconsumedBytes)) {
111                             // As the user has no real control on when this callback is used we should better
112                             // call flush() if we produced any window update to ensure we not stale.
113                             ctx.flush();
114                         }
115                     }
116                 } catch (Http2Exception e) {
117                     PlatformDependent.throwException(e);
118                 } finally {
119                     // Unconditionally reduce the amount of memory required for flow control because there is no
120                     // object allocation costs associated with doing so and the stream will not have any more
121                     // local flow control state to keep track of anymore.
122                     stream.setProperty(stateKey, REDUCED_FLOW_STATE);
123                 }
124             }
125         });
126     }
127 
128     @Override
129     public DefaultHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
130         this.frameWriter = checkNotNull(frameWriter, "frameWriter");
131         return this;
132     }
133 
134     @Override
135     public void channelHandlerContext(ChannelHandlerContext ctx) {
136         this.ctx = checkNotNull(ctx, "ctx");
137     }
138 
139     @Override
140     public void initialWindowSize(int newWindowSize) throws Http2Exception {
141         assert ctx == null || ctx.executor().inEventLoop();
142         int delta = newWindowSize - initialWindowSize;
143         initialWindowSize = newWindowSize;
144 
145         WindowUpdateVisitor visitor = new WindowUpdateVisitor(delta);
146         connection.forEachActiveStream(visitor);
147         visitor.throwIfError();
148     }
149 
150     @Override
151     public int initialWindowSize() {
152         return initialWindowSize;
153     }
154 
155     @Override
156     public int windowSize(Http2Stream stream) {
157         return state(stream).windowSize();
158     }
159 
160     @Override
161     public int initialWindowSize(Http2Stream stream) {
162         return state(stream).initialWindowSize();
163     }
164 
165     @Override
166     public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
167         assert ctx != null && ctx.executor().inEventLoop();
168         FlowState state = state(stream);
169         // Just add the delta to the stream-specific initial window size so that the next time the window
170         // expands it will grow to the new initial size.
171         state.incrementInitialStreamWindow(delta);
172         state.writeWindowUpdateIfNeeded();
173     }
174 
175     @Override
176     public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
177         assert ctx != null && ctx.executor().inEventLoop();
178         checkPositiveOrZero(numBytes, "numBytes");
179         if (numBytes == 0) {
180             return false;
181         }
182 
183         // Streams automatically consume all remaining bytes when they are closed, so just ignore
184         // if already closed.
185         if (stream != null && !isClosed(stream)) {
186             if (stream.id() == CONNECTION_STREAM_ID) {
187                 throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
188             }
189 
190             return consumeAllBytes(state(stream), numBytes);
191         }
192         return false;
193     }
194 
195     private boolean consumeAllBytes(FlowState state, int numBytes) throws Http2Exception {
196         return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
197     }
198 
199     @Override
200     public int unconsumedBytes(Http2Stream stream) {
201         return state(stream).unconsumedBytes();
202     }
203 
204     private static void checkValidRatio(float ratio) {
205         if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
206             throw new IllegalArgumentException("Invalid ratio: " + ratio);
207         }
208     }
209 
210     /**
211      * The window update ratio is used to determine when a window update must be sent. If the ratio
212      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
213      * be sent. This is the global window update ratio that will be used for new streams.
214      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams.
215      * @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
216      */
217     public void windowUpdateRatio(float ratio) {
218         assert ctx == null || ctx.executor().inEventLoop();
219         checkValidRatio(ratio);
220         windowUpdateRatio = ratio;
221     }
222 
223     /**
224      * The window update ratio is used to determine when a window update must be sent. If the ratio
225      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
226      * be sent. This is the global window update ratio that will be used for new streams.
227      */
228     public float windowUpdateRatio() {
229         return windowUpdateRatio;
230     }
231 
232     /**
233      * The window update ratio is used to determine when a window update must be sent. If the ratio
234      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
235      * be sent. This window update ratio will only be applied to {@code streamId}.
236      * <p>
237      * Note it is the responsibly of the caller to ensure that the
238      * initial {@code SETTINGS} frame is sent before this is called. It would
239      * be considered a {@link Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
240      * was generated by this method before the initial {@code SETTINGS} frame is sent.
241      * @param stream the stream for which {@code ratio} applies to.
242      * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
243      * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
244      */
245     public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
246         assert ctx != null && ctx.executor().inEventLoop();
247         checkValidRatio(ratio);
248         FlowState state = state(stream);
249         state.windowUpdateRatio(ratio);
250         state.writeWindowUpdateIfNeeded();
251     }
252 
253     /**
254      * The window update ratio is used to determine when a window update must be sent. If the ratio
255      * of bytes processed since the last update has meet or exceeded this ratio then a window update will
256      * be sent. This window update ratio will only be applied to {@code streamId}.
257      * @throws Http2Exception If no stream corresponding to {@code stream} could be found.
258      */
259     public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
260         return state(stream).windowUpdateRatio();
261     }
262 
263     @Override
264     public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
265             boolean endOfStream) throws Http2Exception {
266         assert ctx != null && ctx.executor().inEventLoop();
267         int dataLength = data.readableBytes() + padding;
268 
269         // Apply the connection-level flow control
270         FlowState connectionState = connectionState();
271         connectionState.receiveFlowControlledFrame(dataLength);
272 
273         if (stream != null && !isClosed(stream)) {
274             // Apply the stream-level flow control
275             FlowState state = state(stream);
276             state.endOfStream(endOfStream);
277             state.receiveFlowControlledFrame(dataLength);
278         } else if (dataLength > 0) {
279             // Immediately consume the bytes for the connection window.
280             connectionState.consumeBytes(dataLength);
281         }
282     }
283 
284     private FlowState connectionState() {
285         return connection.connectionStream().getProperty(stateKey);
286     }
287 
288     private FlowState state(Http2Stream stream) {
289         return stream.getProperty(stateKey);
290     }
291 
292     private static boolean isClosed(Http2Stream stream) {
293         return stream.state() == Http2Stream.State.CLOSED;
294     }
295 
296     /**
297      * Flow control state that does autorefill of the flow control window when the data is
298      * received.
299      */
300     private final class AutoRefillState extends DefaultState {
301         AutoRefillState(Http2Stream stream, int initialWindowSize) {
302             super(stream, initialWindowSize);
303         }
304 
305         @Override
306         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
307             super.receiveFlowControlledFrame(dataLength);
308             // Need to call the super to consume the bytes, since this.consumeBytes does nothing.
309             super.consumeBytes(dataLength);
310         }
311 
312         @Override
313         public boolean consumeBytes(int numBytes) throws Http2Exception {
314             // Do nothing, since the bytes are already consumed upon receiving the data.
315             return false;
316         }
317     }
318 
319     /**
320      * Flow control window state for an individual stream.
321      */
322     private class DefaultState implements FlowState {
323         private final Http2Stream stream;
324 
325         /**
326          * The actual flow control window that is decremented as soon as {@code DATA} arrives.
327          */
328         private int window;
329 
330         /**
331          * A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE}
332          * frames. Decrementing this window for received {@code DATA} frames is delayed until the
333          * application has indicated that the data has been fully processed. This prevents sending
334          * a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold.
335          */
336         private int processedWindow;
337 
338         /**
339          * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
340          * Each stream has their own initial window size.
341          */
342         private int initialStreamWindowSize;
343 
344         /**
345          * This is used to determine when {@link #processedWindow} is sufficiently far away from
346          * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
347          * Each stream has their own window update ratio.
348          */
349         private float streamWindowUpdateRatio;
350 
351         private int lowerBound;
352         private boolean endOfStream;
353 
354         DefaultState(Http2Stream stream, int initialWindowSize) {
355             this.stream = stream;
356             window(initialWindowSize);
357             streamWindowUpdateRatio = windowUpdateRatio;
358         }
359 
360         @Override
361         public void window(int initialWindowSize) {
362             assert ctx == null || ctx.executor().inEventLoop();
363             window = processedWindow = initialStreamWindowSize = initialWindowSize;
364         }
365 
366         @Override
367         public int windowSize() {
368             return window;
369         }
370 
371         @Override
372         public int initialWindowSize() {
373             return initialStreamWindowSize;
374         }
375 
376         @Override
377         public void endOfStream(boolean endOfStream) {
378             this.endOfStream = endOfStream;
379         }
380 
381         @Override
382         public float windowUpdateRatio() {
383             return streamWindowUpdateRatio;
384         }
385 
386         @Override
387         public void windowUpdateRatio(float ratio) {
388             assert ctx == null || ctx.executor().inEventLoop();
389             streamWindowUpdateRatio = ratio;
390         }
391 
392         @Override
393         public void incrementInitialStreamWindow(int delta) {
394             // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range.
395             int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
396                     max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
397             delta = newValue - initialStreamWindowSize;
398 
399             initialStreamWindowSize += delta;
400         }
401 
402         @Override
403         public void incrementFlowControlWindows(int delta) throws Http2Exception {
404             if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
405                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
406                         "Flow control window overflowed for stream: %d", stream.id());
407             }
408 
409             window += delta;
410             processedWindow += delta;
411             lowerBound = min(delta, 0);
412         }
413 
414         @Override
415         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
416             assert dataLength >= 0;
417 
418             // Apply the delta. Even if we throw an exception we want to have taken this delta into account.
419             window -= dataLength;
420 
421             // Window size can become negative if we sent a SETTINGS frame that reduces the
422             // size of the transfer window after the peer has written data frames.
423             // The value is bounded by the length that SETTINGS frame decrease the window.
424             // This difference is stored for the connection when writing the SETTINGS frame
425             // and is cleared once we send a WINDOW_UPDATE frame.
426             if (window < lowerBound) {
427                 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
428                         "Flow control window exceeded for stream: %d", stream.id());
429             }
430         }
431 
432         private void returnProcessedBytes(int delta) throws Http2Exception {
433             if (processedWindow - delta < window) {
434                 throw streamError(stream.id(), INTERNAL_ERROR,
435                         "Attempting to return too many bytes for stream %d", stream.id());
436             }
437             processedWindow -= delta;
438         }
439 
440         @Override
441         public boolean consumeBytes(int numBytes) throws Http2Exception {
442             // Return the bytes processed and update the window.
443             returnProcessedBytes(numBytes);
444             return writeWindowUpdateIfNeeded();
445         }
446 
447         @Override
448         public int unconsumedBytes() {
449             return processedWindow - window;
450         }
451 
452         @Override
453         public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
454             if (endOfStream || initialStreamWindowSize <= 0 ||
455                     // If the stream is already closed there is no need to try to write a window update for it.
456                     isClosed(stream)) {
457                 return false;
458             }
459 
460             int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
461             if (processedWindow <= threshold) {
462                 writeWindowUpdate();
463                 return true;
464             }
465             return false;
466         }
467 
468         /**
469          * Called to perform a window update for this stream (or connection). Updates the window size back
470          * to the size of the initial window and sends a window update frame to the remote endpoint.
471          */
472         private void writeWindowUpdate() throws Http2Exception {
473             // Expand the window for this stream back to the size of the initial window.
474             int deltaWindowSize = initialStreamWindowSize - processedWindow;
475             try {
476                 incrementFlowControlWindows(deltaWindowSize);
477             } catch (Throwable t) {
478                 throw connectionError(INTERNAL_ERROR, t,
479                         "Attempting to return too many bytes for stream %d", stream.id());
480             }
481 
482             // Send a window update for the stream/connection.
483             frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise());
484         }
485     }
486 
487     /**
488      * The local flow control state for a single stream that is not in a state where flow controlled frames cannot
489      * be exchanged.
490      */
491     private static final FlowState REDUCED_FLOW_STATE = new FlowState() {
492 
493         @Override
494         public int windowSize() {
495             return 0;
496         }
497 
498         @Override
499         public int initialWindowSize() {
500             return 0;
501         }
502 
503         @Override
504         public void window(int initialWindowSize) {
505             throw new UnsupportedOperationException();
506         }
507 
508         @Override
509         public void incrementInitialStreamWindow(int delta) {
510             // This operation needs to be supported during the initial settings exchange when
511             // the peer has not yet acknowledged this peer being activated.
512         }
513 
514         @Override
515         public boolean writeWindowUpdateIfNeeded() throws Http2Exception {
516             throw new UnsupportedOperationException();
517         }
518 
519         @Override
520         public boolean consumeBytes(int numBytes) throws Http2Exception {
521             return false;
522         }
523 
524         @Override
525         public int unconsumedBytes() {
526             return 0;
527         }
528 
529         @Override
530         public float windowUpdateRatio() {
531             throw new UnsupportedOperationException();
532         }
533 
534         @Override
535         public void windowUpdateRatio(float ratio) {
536             throw new UnsupportedOperationException();
537         }
538 
539         @Override
540         public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
541             throw new UnsupportedOperationException();
542         }
543 
544         @Override
545         public void incrementFlowControlWindows(int delta) throws Http2Exception {
546             // This operation needs to be supported during the initial settings exchange when
547             // the peer has not yet acknowledged this peer being activated.
548         }
549 
550         @Override
551         public void endOfStream(boolean endOfStream) {
552             throw new UnsupportedOperationException();
553         }
554     };
555 
556     /**
557      * An abstraction which provides specific extensions used by local flow control.
558      */
559     private interface FlowState {
560 
561         int windowSize();
562 
563         int initialWindowSize();
564 
565         void window(int initialWindowSize);
566 
567         /**
568          * Increment the initial window size for this stream.
569          * @param delta The amount to increase the initial window size by.
570          */
571         void incrementInitialStreamWindow(int delta);
572 
573         /**
574          * Updates the flow control window for this stream if it is appropriate.
575          *
576          * @return true if {@code WINDOW_UPDATE} was written, false otherwise.
577          */
578         boolean writeWindowUpdateIfNeeded() throws Http2Exception;
579 
580         /**
581          * Indicates that the application has consumed {@code numBytes} from the connection or stream and is
582          * ready to receive more data.
583          *
584          * @param numBytes the number of bytes to be returned to the flow control window.
585          * @return true if {@code WINDOW_UPDATE} was written, false otherwise.
586          * @throws Http2Exception
587          */
588         boolean consumeBytes(int numBytes) throws Http2Exception;
589 
590         int unconsumedBytes();
591 
592         float windowUpdateRatio();
593 
594         void windowUpdateRatio(float ratio);
595 
596         /**
597          * A flow control event has occurred and we should decrement the amount of available bytes for this stream.
598          * @param dataLength The amount of data to for which this stream is no longer eligible to use for flow control.
599          * @throws Http2Exception If too much data is used relative to how much is available.
600          */
601         void receiveFlowControlledFrame(int dataLength) throws Http2Exception;
602 
603         /**
604          * Increment the windows which are used to determine many bytes have been processed.
605          * @param delta The amount to increment the window by.
606          * @throws Http2Exception if integer overflow occurs on the window.
607          */
608         void incrementFlowControlWindows(int delta) throws Http2Exception;
609 
610         void endOfStream(boolean endOfStream);
611     }
612 
613     /**
614      * Provides a means to iterate over all active streams and increment the flow control windows.
615      */
616     private final class WindowUpdateVisitor implements Http2StreamVisitor {
617         private CompositeStreamException compositeException;
618         private final int delta;
619 
620         WindowUpdateVisitor(int delta) {
621             this.delta = delta;
622         }
623 
624         @Override
625         public boolean visit(Http2Stream stream) throws Http2Exception {
626             try {
627                 // Increment flow control window first so state will be consistent if overflow is detected.
628                 FlowState state = state(stream);
629                 state.incrementFlowControlWindows(delta);
630                 state.incrementInitialStreamWindow(delta);
631             } catch (StreamException e) {
632                 if (compositeException == null) {
633                     compositeException = new CompositeStreamException(e.error(), 4);
634                 }
635                 compositeException.add(e);
636             }
637             return true;
638         }
639 
640         public void throwIfError() throws CompositeStreamException {
641             if (compositeException != null) {
642                 throw compositeException;
643             }
644         }
645     }
646 }