1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import java.util.ArrayDeque;
18 import java.util.Deque;
19
20 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
21 import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
22 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
23 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
24 import static io.netty.util.internal.ObjectUtil.checkNotNull;
25 import static io.netty.util.internal.ObjectUtil.checkPositive;
26 import static java.lang.Math.max;
27 import static java.lang.Math.min;
28
29
30
31
32
33
34
35 public final class UniformStreamByteDistributor implements StreamByteDistributor {
36 private final Http2Connection.PropertyKey stateKey;
37 private final Deque<State> queue = new ArrayDeque<State>(4);
38
39
40
41
42
43 private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
44 private long totalStreamableBytes;
45
46 public UniformStreamByteDistributor(Http2Connection connection) {
47
48 stateKey = connection.newKey();
49 Http2Stream connectionStream = connection.connectionStream();
50 connectionStream.setProperty(stateKey, new State(connectionStream));
51
52
53 connection.addListener(new Http2ConnectionAdapter() {
54 @Override
55 public void onStreamAdded(Http2Stream stream) {
56 stream.setProperty(stateKey, new State(stream));
57 }
58
59 @Override
60 public void onStreamClosed(Http2Stream stream) {
61 state(stream).close();
62 }
63 });
64 }
65
66
67
68
69
70
71
72 public void minAllocationChunk(int minAllocationChunk) {
73 checkPositive(minAllocationChunk, "minAllocationChunk");
74 this.minAllocationChunk = minAllocationChunk;
75 }
76
77 @Override
78 public void updateStreamableBytes(StreamState streamState) {
79 state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
80 streamState.hasFrame(),
81 streamState.windowSize());
82 }
83
84 @Override
85 public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
86
87 }
88
89 @Override
90 public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
91 final int size = queue.size();
92 if (size == 0) {
93 return totalStreamableBytes > 0;
94 }
95
96 final int chunkSize = max(minAllocationChunk, maxBytes / size);
97
98 State state = queue.pollFirst();
99 do {
100 state.enqueued = false;
101 if (state.windowNegative) {
102 continue;
103 }
104 if (maxBytes == 0 && state.streamableBytes > 0) {
105
106
107
108 queue.addFirst(state);
109 state.enqueued = true;
110 break;
111 }
112
113
114 int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
115 maxBytes -= chunk;
116
117
118 state.write(chunk, writer);
119 } while ((state = queue.pollFirst()) != null);
120
121 return totalStreamableBytes > 0;
122 }
123
124 private State state(Http2Stream stream) {
125 return checkNotNull(stream, "stream").getProperty(stateKey);
126 }
127
128
129
130
131 private final class State {
132 final Http2Stream stream;
133 int streamableBytes;
134 boolean windowNegative;
135 boolean enqueued;
136 boolean writing;
137
138 State(Http2Stream stream) {
139 this.stream = stream;
140 }
141
142 void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
143 assert hasFrame || newStreamableBytes == 0 :
144 "hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes;
145
146 int delta = newStreamableBytes - streamableBytes;
147 if (delta != 0) {
148 streamableBytes = newStreamableBytes;
149 totalStreamableBytes += delta;
150 }
151
152
153
154
155
156
157
158 windowNegative = windowSize < 0;
159 if (hasFrame && (windowSize > 0 || windowSize == 0 && !writing)) {
160 addToQueue();
161 }
162 }
163
164
165
166
167
168 void write(int numBytes, Writer writer) throws Http2Exception {
169 writing = true;
170 try {
171
172 writer.write(stream, numBytes);
173 } catch (Throwable t) {
174 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
175 } finally {
176 writing = false;
177 }
178 }
179
180 void addToQueue() {
181 if (!enqueued) {
182 enqueued = true;
183 queue.addLast(this);
184 }
185 }
186
187 void removeFromQueue() {
188 if (enqueued) {
189 enqueued = false;
190 queue.remove(this);
191 }
192 }
193
194 void close() {
195
196 removeFromQueue();
197
198
199 updateStreamableBytes(0, false, 0);
200 }
201 }
202 }