View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import java.util.ArrayDeque;
18  import java.util.Deque;
19  
20  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
21  import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
22  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
23  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
24  import static io.netty.util.internal.ObjectUtil.checkNotNull;
25  import static io.netty.util.internal.ObjectUtil.checkPositive;
26  import static java.lang.Math.max;
27  import static java.lang.Math.min;
28  
29  /**
30   * A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
31   * streams. This class uses a minimum chunk size that will be allocated to each stream. While
32   * fewer streams may be written to in each call to {@link #distribute(int, Writer)}, doing this
33   * should improve the goodput on each written stream.
34   */
35  public final class UniformStreamByteDistributor implements StreamByteDistributor {
36      private final Http2Connection.PropertyKey stateKey;
37      private final Deque<State> queue = new ArrayDeque<State>(4);
38  
39      /**
40       * The minimum number of bytes that we will attempt to allocate to a stream. This is to
41       * help improve goodput on a per-stream basis.
42       */
43      private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
44      private long totalStreamableBytes;
45  
46      public UniformStreamByteDistributor(Http2Connection connection) {
47          // Add a state for the connection.
48          stateKey = connection.newKey();
49          Http2Stream connectionStream = connection.connectionStream();
50          connectionStream.setProperty(stateKey, new State(connectionStream));
51  
52          // Register for notification of new streams.
53          connection.addListener(new Http2ConnectionAdapter() {
54              @Override
55              public void onStreamAdded(Http2Stream stream) {
56                  stream.setProperty(stateKey, new State(stream));
57              }
58  
59              @Override
60              public void onStreamClosed(Http2Stream stream) {
61                  state(stream).close();
62              }
63          });
64      }
65  
66      /**
67       * Sets the minimum allocation chunk that will be allocated to each stream. Defaults to 1KiB.
68       *
69       * @param minAllocationChunk the minimum number of bytes that will be allocated to each stream.
70       * Must be > 0.
71       */
72      public void minAllocationChunk(int minAllocationChunk) {
73          checkPositive(minAllocationChunk, "minAllocationChunk");
74          this.minAllocationChunk = minAllocationChunk;
75      }
76  
77      @Override
78      public void updateStreamableBytes(StreamState streamState) {
79          state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
80                                                            streamState.hasFrame(),
81                                                            streamState.windowSize());
82      }
83  
84      @Override
85      public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
86          // This class ignores priority and dependency!
87      }
88  
89      @Override
90      public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
91          final int size = queue.size();
92          if (size == 0) {
93              return totalStreamableBytes > 0;
94          }
95  
96          final int chunkSize = max(minAllocationChunk, maxBytes / size);
97  
98          State state = queue.pollFirst();
99          do {
100             state.enqueued = false;
101             if (state.windowNegative) {
102                 continue;
103             }
104             if (maxBytes == 0 && state.streamableBytes > 0) {
105                 // Stop at the first state that can't send. Add this state back to the head of the queue. Note
106                 // that empty frames at the head of the queue will always be written, assuming the stream window
107                 // is not negative.
108                 queue.addFirst(state);
109                 state.enqueued = true;
110                 break;
111             }
112 
113             // Allocate as much data as we can for this stream.
114             int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
115             maxBytes -= chunk;
116 
117             // Write the allocated bytes and enqueue as necessary.
118             state.write(chunk, writer);
119         } while ((state = queue.pollFirst()) != null);
120 
121         return totalStreamableBytes > 0;
122     }
123 
124     private State state(Http2Stream stream) {
125         return checkNotNull(stream, "stream").getProperty(stateKey);
126     }
127 
128     /**
129      * The remote flow control state for a single stream.
130      */
131     private final class State {
132         final Http2Stream stream;
133         int streamableBytes;
134         boolean windowNegative;
135         boolean enqueued;
136         boolean writing;
137 
138         State(Http2Stream stream) {
139             this.stream = stream;
140         }
141 
142         void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
143             assert hasFrame || newStreamableBytes == 0 :
144                 "hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes;
145 
146             int delta = newStreamableBytes - streamableBytes;
147             if (delta != 0) {
148                 streamableBytes = newStreamableBytes;
149                 totalStreamableBytes += delta;
150             }
151             // In addition to only enqueuing state when they have frames we enforce the following restrictions:
152             // 1. If the window has gone negative. We never want to queue a state. However we also don't want to
153             //    Immediately remove the item if it is already queued because removal from deque is O(n). So
154             //    we allow it to stay queued and rely on the distribution loop to remove this state.
155             // 2. If the window is zero we only want to queue if we are not writing. If we are writing that means
156             //    we gave the state a chance to write zero length frames. We wait until updateStreamableBytes is
157             //    called again before this state is allowed to write.
158             windowNegative = windowSize < 0;
159             if (hasFrame && (windowSize > 0 || windowSize == 0 && !writing)) {
160                 addToQueue();
161             }
162         }
163 
164         /**
165          * Write any allocated bytes for the given stream and updates the streamable bytes,
166          * assuming all of the bytes will be written.
167          */
168         void write(int numBytes, Writer writer) throws Http2Exception {
169             writing = true;
170             try {
171                 // Write the allocated bytes.
172                 writer.write(stream, numBytes);
173             } catch (Throwable t) {
174                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
175             } finally {
176                 writing = false;
177             }
178         }
179 
180         void addToQueue() {
181             if (!enqueued) {
182                 enqueued = true;
183                 queue.addLast(this);
184             }
185         }
186 
187         void removeFromQueue() {
188             if (enqueued) {
189                 enqueued = false;
190                 queue.remove(this);
191             }
192         }
193 
194         void close() {
195             // Remove this state from the queue.
196             removeFromQueue();
197 
198             // Clear the streamable bytes.
199             updateStreamableBytes(0, false, 0);
200         }
201     }
202 }