1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import org.jboss.netty.channel.MessageEvent;
19
20 import java.io.Serializable;
21 import java.util.Comparator;
22 import java.util.Map;
23 import java.util.TreeMap;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
29
30 final class SpdySession {
31
32 private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
33
34 private final AtomicInteger activeLocalStreams = new AtomicInteger();
35 private final AtomicInteger activeRemoteStreams = new AtomicInteger();
36 private final Map<Integer, StreamState> activeStreams = new ConcurrentHashMap<Integer, StreamState>();
37 private final StreamComparator streamComparator = new StreamComparator();
38 private final AtomicInteger sendWindowSize;
39 private final AtomicInteger receiveWindowSize;
40
41 public SpdySession(int sendWindowSize, int receiveWindowSize) {
42 this.sendWindowSize = new AtomicInteger(sendWindowSize);
43 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
44 }
45
46 int numActiveStreams(boolean remote) {
47 if (remote) {
48 return activeRemoteStreams.get();
49 } else {
50 return activeLocalStreams.get();
51 }
52 }
53
54 boolean noActiveStreams() {
55 return activeStreams.isEmpty();
56 }
57
58 boolean isActiveStream(int streamId) {
59 return activeStreams.containsKey(streamId);
60 }
61
62
63 Map<Integer, StreamState> activeStreams() {
64 Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
65 streams.putAll(activeStreams);
66 return streams;
67 }
68
69 void acceptStream(
70 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
71 int sendWindowSize, int receiveWindowSize, boolean remote) {
72 if (!remoteSideClosed || !localSideClosed) {
73 StreamState state = activeStreams.put(
74 streamId,
75 new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
76 if (state == null) {
77 if (remote) {
78 activeRemoteStreams.incrementAndGet();
79 } else {
80 activeLocalStreams.incrementAndGet();
81 }
82 }
83 }
84 }
85
86 private StreamState removeActiveStream(int streamId, boolean remote) {
87 StreamState state = activeStreams.remove(streamId);
88 if (state != null) {
89 if (remote) {
90 activeRemoteStreams.decrementAndGet();
91 } else {
92 activeLocalStreams.decrementAndGet();
93 }
94 }
95 return state;
96 }
97
98 void removeStream(int streamId, boolean remote) {
99 StreamState state = removeActiveStream(streamId, remote);
100 if (state != null) {
101 MessageEvent e = state.removePendingWrite();
102 while (e != null) {
103 e.getFuture().setFailure(STREAM_CLOSED);
104 e = state.removePendingWrite();
105 }
106 }
107 }
108
109 boolean isRemoteSideClosed(int streamId) {
110 StreamState state = activeStreams.get(streamId);
111 return state == null || state.isRemoteSideClosed();
112 }
113
114 void closeRemoteSide(int streamId, boolean remote) {
115 StreamState state = activeStreams.get(streamId);
116 if (state != null) {
117 state.closeRemoteSide();
118 if (state.isLocalSideClosed()) {
119 removeActiveStream(streamId, remote);
120 }
121 }
122 }
123
124 boolean isLocalSideClosed(int streamId) {
125 StreamState state = activeStreams.get(streamId);
126 return state == null || state.isLocalSideClosed();
127 }
128
129 void closeLocalSide(int streamId, boolean remote) {
130 StreamState state = activeStreams.get(streamId);
131 if (state != null) {
132 state.closeLocalSide();
133 if (state.isRemoteSideClosed()) {
134 removeActiveStream(streamId, remote);
135 }
136 }
137 }
138
139
140
141
142
143
144 boolean hasReceivedReply(int streamId) {
145 StreamState state = activeStreams.get(streamId);
146 return state != null && state.hasReceivedReply();
147 }
148
149 void receivedReply(int streamId) {
150 StreamState state = activeStreams.get(streamId);
151 if (state != null) {
152 state.receivedReply();
153 }
154 }
155
156 int getSendWindowSize(int streamId) {
157 if (streamId == SPDY_SESSION_STREAM_ID) {
158 return sendWindowSize.get();
159 }
160
161 StreamState state = activeStreams.get(streamId);
162 return state != null ? state.getSendWindowSize() : -1;
163 }
164
165 int updateSendWindowSize(int streamId, int deltaWindowSize) {
166 if (streamId == SPDY_SESSION_STREAM_ID) {
167 return sendWindowSize.addAndGet(deltaWindowSize);
168 }
169
170 StreamState state = activeStreams.get(streamId);
171 return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
172 }
173
174 int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
175 if (streamId == SPDY_SESSION_STREAM_ID) {
176 return receiveWindowSize.addAndGet(deltaWindowSize);
177 }
178
179 StreamState state = activeStreams.get(streamId);
180 if (deltaWindowSize > 0) {
181 state.setReceiveWindowSizeLowerBound(0);
182 }
183 return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
184 }
185
186 int getReceiveWindowSizeLowerBound(int streamId) {
187 if (streamId == SPDY_SESSION_STREAM_ID) {
188 return 0;
189 }
190
191 StreamState state = activeStreams.get(streamId);
192 return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
193 }
194
195 void updateAllSendWindowSizes(int deltaWindowSize) {
196 for (StreamState state: activeStreams.values()) {
197 state.updateSendWindowSize(deltaWindowSize);
198 }
199 }
200
201 void updateAllReceiveWindowSizes(int deltaWindowSize) {
202 for (StreamState state: activeStreams.values()) {
203 state.updateReceiveWindowSize(deltaWindowSize);
204 if (deltaWindowSize < 0) {
205 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
206 }
207 }
208 }
209
210 boolean putPendingWrite(int streamId, MessageEvent evt) {
211 StreamState state = activeStreams.get(streamId);
212 return state != null && state.putPendingWrite(evt);
213 }
214
215 MessageEvent getPendingWrite(int streamId) {
216 if (streamId == SPDY_SESSION_STREAM_ID) {
217 for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
218 StreamState state = e.getValue();
219 if (state.getSendWindowSize() > 0) {
220 MessageEvent evt = state.getPendingWrite();
221 if (evt != null) {
222 return evt;
223 }
224 }
225 }
226 return null;
227 }
228
229 StreamState state = activeStreams.get(streamId);
230 return state != null ? state.getPendingWrite() : null;
231 }
232
233 MessageEvent removePendingWrite(int streamId) {
234 StreamState state = activeStreams.get(streamId);
235 return state != null ? state.removePendingWrite() : null;
236 }
237
238 private static final class StreamState {
239
240 private final byte priority;
241 private volatile boolean remoteSideClosed;
242 private volatile boolean localSideClosed;
243 private boolean receivedReply;
244 private final AtomicInteger sendWindowSize;
245 private final AtomicInteger receiveWindowSize;
246 private volatile int receiveWindowSizeLowerBound;
247 private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
248 new ConcurrentLinkedQueue<MessageEvent>();
249
250 StreamState(
251 byte priority, boolean remoteSideClosed, boolean localSideClosed,
252 int sendWindowSize, int receiveWindowSize) {
253 this.priority = priority;
254 this.remoteSideClosed = remoteSideClosed;
255 this.localSideClosed = localSideClosed;
256 this.sendWindowSize = new AtomicInteger(sendWindowSize);
257 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
258 }
259
260 byte getPriority() {
261 return priority;
262 }
263
264 boolean isRemoteSideClosed() {
265 return remoteSideClosed;
266 }
267
268 void closeRemoteSide() {
269 remoteSideClosed = true;
270 }
271
272 boolean isLocalSideClosed() {
273 return localSideClosed;
274 }
275
276 void closeLocalSide() {
277 localSideClosed = true;
278 }
279
280 boolean hasReceivedReply() {
281 return receivedReply;
282 }
283
284 void receivedReply() {
285 receivedReply = true;
286 }
287
288 int getSendWindowSize() {
289 return sendWindowSize.get();
290 }
291
292 int updateSendWindowSize(int deltaWindowSize) {
293 return sendWindowSize.addAndGet(deltaWindowSize);
294 }
295
296 int updateReceiveWindowSize(int deltaWindowSize) {
297 return receiveWindowSize.addAndGet(deltaWindowSize);
298 }
299
300 int getReceiveWindowSizeLowerBound() {
301 return receiveWindowSizeLowerBound;
302 }
303
304 void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
305 this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
306 }
307
308 boolean putPendingWrite(MessageEvent evt) {
309 return pendingWriteQueue.offer(evt);
310 }
311
312 MessageEvent getPendingWrite() {
313 return pendingWriteQueue.peek();
314 }
315
316 MessageEvent removePendingWrite() {
317 return pendingWriteQueue.poll();
318 }
319 }
320
321 private final class StreamComparator implements Comparator<Integer>, Serializable {
322
323 private static final long serialVersionUID = 1161471649740544848L;
324
325 StreamComparator() { }
326
327 public int compare(Integer id1, Integer id2) {
328 StreamState state1 = activeStreams.get(id1);
329 StreamState state2 = activeStreams.get(id2);
330
331 int result = state1.getPriority() - state2.getPriority();
332 if (result != 0) {
333 return result;
334 }
335
336 return id1 - id2;
337 }
338 }
339 }