View Javadoc
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty5.handler.codec.http2;
16  
17  import io.netty5.util.internal.UnstableApi;
18  
19  import java.util.ArrayDeque;
20  import java.util.Deque;
21  
22  import static io.netty5.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
23  import static io.netty5.handler.codec.http2.Http2CodecUtil.streamableBytes;
24  import static io.netty5.handler.codec.http2.Http2Error.INTERNAL_ERROR;
25  import static io.netty5.handler.codec.http2.Http2Exception.connectionError;
26  import static io.netty5.util.internal.ObjectUtil.checkPositive;
27  import static java.lang.Math.max;
28  import static java.lang.Math.min;
29  import static java.util.Objects.requireNonNull;
30  
31  /**
32   * A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
33   * streams. This class uses a minimum chunk size that will be allocated to each stream. While
34   * fewer streams may be written to in each call to {@link #distribute(int, Writer)}, doing this
35   * should improve the goodput on each written stream.
36   */
37  @UnstableApi
38  public final class UniformStreamByteDistributor implements StreamByteDistributor {
39      private final Http2Connection.PropertyKey stateKey;
40      private final Deque<State> queue = new ArrayDeque<>(4);
41  
42      /**
43       * The minimum number of bytes that we will attempt to allocate to a stream. This is to
44       * help improve goodput on a per-stream basis.
45       */
46      private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
47      private long totalStreamableBytes;
48  
49      public UniformStreamByteDistributor(Http2Connection connection) {
50          // Add a state for the connection.
51          stateKey = connection.newKey();
52          Http2Stream connectionStream = connection.connectionStream();
53          connectionStream.setProperty(stateKey, new State(connectionStream));
54  
55          // Register for notification of new streams.
56          connection.addListener(new Http2ConnectionAdapter() {
57              @Override
58              public void onStreamAdded(Http2Stream stream) {
59                  stream.setProperty(stateKey, new State(stream));
60              }
61  
62              @Override
63              public void onStreamClosed(Http2Stream stream) {
64                  state(stream).close();
65              }
66          });
67      }
68  
69      /**
70       * Sets the minimum allocation chunk that will be allocated to each stream. Defaults to 1KiB.
71       *
72       * @param minAllocationChunk the minimum number of bytes that will be allocated to each stream.
73       * Must be > 0.
74       */
75      public void minAllocationChunk(int minAllocationChunk) {
76          checkPositive(minAllocationChunk, "minAllocationChunk");
77          this.minAllocationChunk = minAllocationChunk;
78      }
79  
80      @Override
81      public void updateStreamableBytes(StreamState streamState) {
82          state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
83                                                            streamState.hasFrame(),
84                                                            streamState.windowSize());
85      }
86  
87      @Override
88      public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
89          // This class ignores priority and dependency!
90      }
91  
92      @Override
93      public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
94          final int size = queue.size();
95          if (size == 0) {
96              return totalStreamableBytes > 0;
97          }
98  
99          final int chunkSize = max(minAllocationChunk, maxBytes / size);
100 
101         State state = queue.pollFirst();
102         do {
103             state.enqueued = false;
104             if (state.windowNegative) {
105                 continue;
106             }
107             if (maxBytes == 0 && state.streamableBytes > 0) {
108                 // Stop at the first state that can't send. Add this state back to the head of the queue. Note
109                 // that empty frames at the head of the queue will always be written, assuming the stream window
110                 // is not negative.
111                 queue.addFirst(state);
112                 state.enqueued = true;
113                 break;
114             }
115 
116             // Allocate as much data as we can for this stream.
117             int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
118             maxBytes -= chunk;
119 
120             // Write the allocated bytes and enqueue as necessary.
121             state.write(chunk, writer);
122         } while ((state = queue.pollFirst()) != null);
123 
124         return totalStreamableBytes > 0;
125     }
126 
127     private State state(Http2Stream stream) {
128         return requireNonNull(stream, "stream").getProperty(stateKey);
129     }
130 
131     /**
132      * The remote flow control state for a single stream.
133      */
134     private final class State {
135         final Http2Stream stream;
136         int streamableBytes;
137         boolean windowNegative;
138         boolean enqueued;
139         boolean writing;
140 
141         State(Http2Stream stream) {
142             this.stream = stream;
143         }
144 
145         void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
146             assert hasFrame || newStreamableBytes == 0 :
147                 "hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes;
148 
149             int delta = newStreamableBytes - streamableBytes;
150             if (delta != 0) {
151                 streamableBytes = newStreamableBytes;
152                 totalStreamableBytes += delta;
153             }
154             // In addition to only enqueuing state when they have frames we enforce the following restrictions:
155             // 1. If the window has gone negative. We never want to queue a state. However we also don't want to
156             //    Immediately remove the item if it is already queued because removal from deque is O(n). So
157             //    we allow it to stay queued and rely on the distribution loop to remove this state.
158             // 2. If the window is zero we only want to queue if we are not writing. If we are writing that means
159             //    we gave the state a chance to write zero length frames. We wait until updateStreamableBytes is
160             //    called again before this state is allowed to write.
161             windowNegative = windowSize < 0;
162             if (hasFrame && (windowSize > 0 || windowSize == 0 && !writing)) {
163                 addToQueue();
164             }
165         }
166 
167         /**
168          * Write any allocated bytes for the given stream and updates the streamable bytes,
169          * assuming all of the bytes will be written.
170          */
171         void write(int numBytes, Writer writer) throws Http2Exception {
172             writing = true;
173             try {
174                 // Write the allocated bytes.
175                 writer.write(stream, numBytes);
176             } catch (Throwable t) {
177                 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
178             } finally {
179                 writing = false;
180             }
181         }
182 
183         void addToQueue() {
184             if (!enqueued) {
185                 enqueued = true;
186                 queue.addLast(this);
187             }
188         }
189 
190         void removeFromQueue() {
191             if (enqueued) {
192                 enqueued = false;
193                 queue.remove(this);
194             }
195         }
196 
197         void close() {
198             // Remove this state from the queue.
199             removeFromQueue();
200 
201             // Clear the streamable bytes.
202             updateStreamableBytes(0, false, 0);
203         }
204     }
205 }