View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.util.internal.UnstableApi;
18  
19  import java.util.ArrayDeque;
20  import java.util.Deque;
21  
22  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
23  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
24  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
25  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
26  import static io.netty.util.internal.ObjectUtil.checkNotNull;
27  import static java.lang.Math.max;
28  import static java.lang.Math.min;
29  
30  /**
31   * A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
32   * streams. This class uses a minimum chunk size that will be allocated to each stream. While
33   * fewer streams may be written to in each call to {@link #distribute(int, Writer)}, doing this
34   * should improve the goodput on each written stream.
35   */
36  @UnstableApi
37  public final class UniformStreamByteDistributor implements StreamByteDistributor {
38      private final Http2Connection.PropertyKey stateKey;
39      private final Deque<State> queue = new ArrayDeque<State>(4);
40  
41      /**
42       * The minimum number of bytes that we will attempt to allocate to a stream. This is to
43       * help improve goodput on a per-stream basis.
44       */
45      private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
46      private long totalStreamableBytes;
47  
48      public UniformStreamByteDistributor(Http2Connection connection) {
49          // Add a state for the connection.
50          stateKey = connection.newKey();
51          Http2Stream connectionStream = connection.connectionStream();
52          connectionStream.setProperty(stateKey, new State(connectionStream));
53  
54          // Register for notification of new streams.
55          connection.addListener(new Http2ConnectionAdapter() {
56              @Override
57              public void onStreamAdded(Http2Stream stream) {
58                  stream.setProperty(stateKey, new State(stream));
59              }
60  
61              @Override
62              public void onStreamClosed(Http2Stream stream) {
63                  state(stream).close();
64              }
65          });
66      }
67  
68      /**
69       * Sets the minimum allocation chunk that will be allocated to each stream. Defaults to 1KiB.
70       *
71       * @param minAllocationChunk the minimum number of bytes that will be allocated to each stream.
72       * Must be > 0.
73       */
74      public void minAllocationChunk(int minAllocationChunk) {
75          if (minAllocationChunk <= 0) {
76              throw new IllegalArgumentException("minAllocationChunk must be > 0");
77          }
78          this.minAllocationChunk = minAllocationChunk;
79      }
80  
81      @Override
82      public void updateStreamableBytes(StreamState streamState) {
83          state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
84                                                            streamState.hasFrame(),
85                                                            streamState.windowSize());
86      }
87  
88      @Override
89      public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
90          // This class ignores priority and dependency!
91      }
92  
93      @Override
94      public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
95          final int size = queue.size();
96          if (size == 0) {
97              return totalStreamableBytes > 0;
98          }
99  
100         final int chunkSize = max(minAllocationChunk, maxBytes / size);
101 
102         State state = queue.pollFirst();
103         do {
104             state.enqueued = false;
105             if (state.windowNegative) {
106                 continue;
107             }
108             if (maxBytes == 0 && state.streamableBytes > 0) {
109                 // Stop at the first state that can't send. Add this state back to the head of the queue. Note
110                 // that empty frames at the head of the queue will always be written, assuming the stream window
111                 // is not negative.
112                 queue.addFirst(state);
113                 state.enqueued = true;
114                 break;
115             }
116 
117             // Allocate as much data as we can for this stream.
118             int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
119             maxBytes -= chunk;
120 
121             // Write the allocated bytes and enqueue as necessary.
122             state.write(chunk, writer);
123         } while ((state = queue.pollFirst()) != null);
124 
125         return totalStreamableBytes > 0;
126     }
127 
128     private State state(Http2Stream stream) {
129         return checkNotNull(stream, "stream").getProperty(stateKey);
130     }
131 
132     /**
133      * The remote flow control state for a single stream.
134      */
135     private final class State {
136         final Http2Stream stream;
137         int streamableBytes;
138         boolean windowNegative;
139         boolean enqueued;
140         boolean writing;
141 
142         State(Http2Stream stream) {
143             this.stream = stream;
144         }
145 
146         void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
147             assert hasFrame || newStreamableBytes == 0 :
148                 "hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes;
149 
150             int delta = newStreamableBytes - streamableBytes;
151             if (delta != 0) {
152                 streamableBytes = newStreamableBytes;
153                 totalStreamableBytes += delta;
154             }
155             // In addition to only enqueuing state when they have frames we enforce the following restrictions:
156             // 1. If the window has gone negative. We never want to queue a state. However we also don't want to
157             //    Immediately remove the item if it is already queued because removal from deque is O(n). So
158             //    we allow it to stay queued and rely on the distribution loop to remove this state.
159             // 2. If the window is zero we only want to queue if we are not writing. If we are writing that means
160             //    we gave the state a chance to write zero length frames. We wait until updateStreamableBytes is
161             //    called again before this state is allowed to write.
162             windowNegative = windowSize < 0;
163             if (hasFrame && (windowSize > 0 || (windowSize == 0 && !writing))) {
164                 addToQueue();
165             }
166         }
167 
168         /**
169          * Write any allocated bytes for the given stream and updates the streamable bytes,
170          * assuming all of the bytes will be written.
171          */
172         void write(int numBytes, Writer writer) throws Http2Exception {
173             writing = true;
174             try {
175                 // Write the allocated bytes.
176                 writer.write(stream, numBytes);
177             } catch (Throwable t) {
178                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
179             } finally {
180                 writing = false;
181             }
182         }
183 
184         void addToQueue() {
185             if (!enqueued) {
186                 enqueued = true;
187                 queue.addLast(this);
188             }
189         }
190 
191         void removeFromQueue() {
192             if (enqueued) {
193                 enqueued = false;
194                 queue.remove(this);
195             }
196         }
197 
198         void close() {
199             // Remove this state from the queue.
200             removeFromQueue();
201 
202             // Clear the streamable bytes.
203             updateStreamableBytes(0, false, 0);
204         }
205     }
206 }