1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.spdy;
17
18 import io.netty.channel.ChannelPromise;
19 import io.netty.util.internal.PlatformDependent;
20
21 import java.io.Serializable;
22 import java.util.Comparator;
23 import java.util.Map;
24 import java.util.Queue;
25 import java.util.TreeMap;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
30
31 final class SpdySession {
32
33 private final AtomicInteger activeLocalStreams = new AtomicInteger();
34 private final AtomicInteger activeRemoteStreams = new AtomicInteger();
35 private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();
36 private final StreamComparator streamComparator = new StreamComparator();
37 private final AtomicInteger sendWindowSize;
38 private final AtomicInteger receiveWindowSize;
39
40 SpdySession(int sendWindowSize, int receiveWindowSize) {
41 this.sendWindowSize = new AtomicInteger(sendWindowSize);
42 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
43 }
44
45 int numActiveStreams(boolean remote) {
46 if (remote) {
47 return activeRemoteStreams.get();
48 } else {
49 return activeLocalStreams.get();
50 }
51 }
52
53 boolean noActiveStreams() {
54 return activeStreams.isEmpty();
55 }
56
57 boolean isActiveStream(int streamId) {
58 return activeStreams.containsKey(streamId);
59 }
60
61
62 Map<Integer, StreamState> activeStreams() {
63 Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
64 streams.putAll(activeStreams);
65 return streams;
66 }
67
68 void acceptStream(
69 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
70 int sendWindowSize, int receiveWindowSize, boolean remote) {
71 if (!remoteSideClosed || !localSideClosed) {
72 StreamState state = activeStreams.put(streamId, new StreamState(
73 priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
74 if (state == null) {
75 if (remote) {
76 activeRemoteStreams.incrementAndGet();
77 } else {
78 activeLocalStreams.incrementAndGet();
79 }
80 }
81 }
82 }
83
84 private StreamState removeActiveStream(int streamId, boolean remote) {
85 StreamState state = activeStreams.remove(streamId);
86 if (state != null) {
87 if (remote) {
88 activeRemoteStreams.decrementAndGet();
89 } else {
90 activeLocalStreams.decrementAndGet();
91 }
92 }
93 return state;
94 }
95
96 void removeStream(int streamId, Throwable cause, boolean remote) {
97 StreamState state = removeActiveStream(streamId, remote);
98 if (state != null) {
99 state.clearPendingWrites(cause);
100 }
101 }
102
103 boolean isRemoteSideClosed(int streamId) {
104 StreamState state = activeStreams.get(streamId);
105 return state == null || state.isRemoteSideClosed();
106 }
107
108 void closeRemoteSide(int streamId, boolean remote) {
109 StreamState state = activeStreams.get(streamId);
110 if (state != null) {
111 state.closeRemoteSide();
112 if (state.isLocalSideClosed()) {
113 removeActiveStream(streamId, remote);
114 }
115 }
116 }
117
118 boolean isLocalSideClosed(int streamId) {
119 StreamState state = activeStreams.get(streamId);
120 return state == null || state.isLocalSideClosed();
121 }
122
123 void closeLocalSide(int streamId, boolean remote) {
124 StreamState state = activeStreams.get(streamId);
125 if (state != null) {
126 state.closeLocalSide();
127 if (state.isRemoteSideClosed()) {
128 removeActiveStream(streamId, remote);
129 }
130 }
131 }
132
133
134
135
136
137 boolean hasReceivedReply(int streamId) {
138 StreamState state = activeStreams.get(streamId);
139 return state != null && state.hasReceivedReply();
140 }
141
142 void receivedReply(int streamId) {
143 StreamState state = activeStreams.get(streamId);
144 if (state != null) {
145 state.receivedReply();
146 }
147 }
148
149 int getSendWindowSize(int streamId) {
150 if (streamId == SPDY_SESSION_STREAM_ID) {
151 return sendWindowSize.get();
152 }
153
154 StreamState state = activeStreams.get(streamId);
155 return state != null ? state.getSendWindowSize() : -1;
156 }
157
158 int updateSendWindowSize(int streamId, int deltaWindowSize) {
159 if (streamId == SPDY_SESSION_STREAM_ID) {
160 return sendWindowSize.addAndGet(deltaWindowSize);
161 }
162
163 StreamState state = activeStreams.get(streamId);
164 return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
165 }
166
167 int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
168 if (streamId == SPDY_SESSION_STREAM_ID) {
169 return receiveWindowSize.addAndGet(deltaWindowSize);
170 }
171
172 StreamState state = activeStreams.get(streamId);
173 if (state == null) {
174 return -1;
175 }
176 if (deltaWindowSize > 0) {
177 state.setReceiveWindowSizeLowerBound(0);
178 }
179 return state.updateReceiveWindowSize(deltaWindowSize);
180 }
181
182 int getReceiveWindowSizeLowerBound(int streamId) {
183 if (streamId == SPDY_SESSION_STREAM_ID) {
184 return 0;
185 }
186
187 StreamState state = activeStreams.get(streamId);
188 return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
189 }
190
191 void updateAllSendWindowSizes(int deltaWindowSize) {
192 for (StreamState state: activeStreams.values()) {
193 state.updateSendWindowSize(deltaWindowSize);
194 }
195 }
196
197 void updateAllReceiveWindowSizes(int deltaWindowSize) {
198 for (StreamState state: activeStreams.values()) {
199 state.updateReceiveWindowSize(deltaWindowSize);
200 if (deltaWindowSize < 0) {
201 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
202 }
203 }
204 }
205
206 boolean putPendingWrite(int streamId, PendingWrite pendingWrite) {
207 StreamState state = activeStreams.get(streamId);
208 return state != null && state.putPendingWrite(pendingWrite);
209 }
210
211 PendingWrite getPendingWrite(int streamId) {
212 if (streamId == SPDY_SESSION_STREAM_ID) {
213 for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
214 StreamState state = e.getValue();
215 if (state.getSendWindowSize() > 0) {
216 PendingWrite pendingWrite = state.getPendingWrite();
217 if (pendingWrite != null) {
218 return pendingWrite;
219 }
220 }
221 }
222 return null;
223 }
224
225 StreamState state = activeStreams.get(streamId);
226 return state != null ? state.getPendingWrite() : null;
227 }
228
229 PendingWrite removePendingWrite(int streamId) {
230 StreamState state = activeStreams.get(streamId);
231 return state != null ? state.removePendingWrite() : null;
232 }
233
234 private static final class StreamState {
235
236 private final byte priority;
237 private boolean remoteSideClosed;
238 private boolean localSideClosed;
239 private boolean receivedReply;
240 private final AtomicInteger sendWindowSize;
241 private final AtomicInteger receiveWindowSize;
242 private int receiveWindowSizeLowerBound;
243 private final Queue<PendingWrite> pendingWriteQueue = new ConcurrentLinkedQueue<PendingWrite>();
244
245 StreamState(
246 byte priority, boolean remoteSideClosed, boolean localSideClosed,
247 int sendWindowSize, int receiveWindowSize) {
248 this.priority = priority;
249 this.remoteSideClosed = remoteSideClosed;
250 this.localSideClosed = localSideClosed;
251 this.sendWindowSize = new AtomicInteger(sendWindowSize);
252 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
253 }
254
255 byte getPriority() {
256 return priority;
257 }
258
259 boolean isRemoteSideClosed() {
260 return remoteSideClosed;
261 }
262
263 void closeRemoteSide() {
264 remoteSideClosed = true;
265 }
266
267 boolean isLocalSideClosed() {
268 return localSideClosed;
269 }
270
271 void closeLocalSide() {
272 localSideClosed = true;
273 }
274
275 boolean hasReceivedReply() {
276 return receivedReply;
277 }
278
279 void receivedReply() {
280 receivedReply = true;
281 }
282
283 int getSendWindowSize() {
284 return sendWindowSize.get();
285 }
286
287 int updateSendWindowSize(int deltaWindowSize) {
288 return sendWindowSize.addAndGet(deltaWindowSize);
289 }
290
291 int updateReceiveWindowSize(int deltaWindowSize) {
292 return receiveWindowSize.addAndGet(deltaWindowSize);
293 }
294
295 int getReceiveWindowSizeLowerBound() {
296 return receiveWindowSizeLowerBound;
297 }
298
299 void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
300 this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
301 }
302
303 boolean putPendingWrite(PendingWrite msg) {
304 return pendingWriteQueue.offer(msg);
305 }
306
307 PendingWrite getPendingWrite() {
308 return pendingWriteQueue.peek();
309 }
310
311 PendingWrite removePendingWrite() {
312 return pendingWriteQueue.poll();
313 }
314
315 void clearPendingWrites(Throwable cause) {
316 for (;;) {
317 PendingWrite pendingWrite = pendingWriteQueue.poll();
318 if (pendingWrite == null) {
319 break;
320 }
321 pendingWrite.fail(cause);
322 }
323 }
324 }
325
326 private final class StreamComparator implements Comparator<Integer> {
327
328 StreamComparator() { }
329
330 @Override
331 public int compare(Integer id1, Integer id2) {
332 StreamState state1 = activeStreams.get(id1);
333 StreamState state2 = activeStreams.get(id2);
334
335 int result = state1.getPriority() - state2.getPriority();
336 if (result != 0) {
337 return result;
338 }
339
340 return id1 - id2;
341 }
342 }
343
344 public static final class PendingWrite {
345 final SpdyDataFrame spdyDataFrame;
346 final ChannelPromise promise;
347
348 PendingWrite(SpdyDataFrame spdyDataFrame, ChannelPromise promise) {
349 this.spdyDataFrame = spdyDataFrame;
350 this.promise = promise;
351 }
352
353 void fail(Throwable cause) {
354 spdyDataFrame.release();
355 promise.setFailure(cause);
356 }
357 }
358 }