1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import java.util.Comparator;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.TreeSet;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.jboss.netty.channel.MessageEvent;
27
28 final class SpdySession {
29
30 private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
31
32 private final Map<Integer, StreamState> activeStreams =
33 new ConcurrentHashMap<Integer, StreamState>();
34
35 int numActiveStreams() {
36 return activeStreams.size();
37 }
38
39 boolean noActiveStreams() {
40 return activeStreams.isEmpty();
41 }
42
43 boolean isActiveStream(int streamID) {
44 return activeStreams.containsKey(streamID);
45 }
46
47
48 Set<Integer> getActiveStreams() {
49 TreeSet<Integer> StreamIDs = new TreeSet<Integer>(new PriorityComparator());
50 StreamIDs.addAll(activeStreams.keySet());
51 return StreamIDs;
52 }
53
54 void acceptStream(
55 int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed,
56 int sendWindowSize, int receiveWindowSize) {
57 if (!remoteSideClosed || !localSideClosed) {
58 activeStreams.put(
59 streamID,
60 new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
61 }
62 }
63
64 void removeStream(int streamID) {
65 Integer StreamID = streamID;
66 StreamState state = activeStreams.get(StreamID);
67 activeStreams.remove(StreamID);
68 if (state != null) {
69 MessageEvent e = state.removePendingWrite();
70 while (e != null) {
71 e.getFuture().setFailure(STREAM_CLOSED);
72 e = state.removePendingWrite();
73 }
74 }
75 }
76
77 boolean isRemoteSideClosed(int streamID) {
78 StreamState state = activeStreams.get(streamID);
79 return state == null || state.isRemoteSideClosed();
80 }
81
82 void closeRemoteSide(int streamID) {
83 Integer StreamID = streamID;
84 StreamState state = activeStreams.get(StreamID);
85 if (state != null) {
86 state.closeRemoteSide();
87 if (state.isLocalSideClosed()) {
88 activeStreams.remove(StreamID);
89 }
90 }
91 }
92
93 boolean isLocalSideClosed(int streamID) {
94 StreamState state = activeStreams.get(streamID);
95 return state == null || state.isLocalSideClosed();
96 }
97
98 void closeLocalSide(int streamID) {
99 Integer StreamID = streamID;
100 StreamState state = activeStreams.get(StreamID);
101 if (state != null) {
102 state.closeLocalSide();
103 if (state.isRemoteSideClosed()) {
104 activeStreams.remove(StreamID);
105 }
106 }
107 }
108
109
110
111
112
113
114 boolean hasReceivedReply(int streamID) {
115 StreamState state = activeStreams.get(streamID);
116 return state != null && state.hasReceivedReply();
117 }
118
119 void receivedReply(int streamID) {
120 StreamState state = activeStreams.get(streamID);
121 if (state != null) {
122 state.receivedReply();
123 }
124 }
125
126 int getSendWindowSize(int streamID) {
127 StreamState state = activeStreams.get(streamID);
128 return state != null ? state.getSendWindowSize() : -1;
129 }
130
131 int updateSendWindowSize(int streamID, int deltaWindowSize) {
132 StreamState state = activeStreams.get(streamID);
133 return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
134 }
135
136 int updateReceiveWindowSize(int streamID, int deltaWindowSize) {
137 StreamState state = activeStreams.get(streamID);
138 if (deltaWindowSize > 0) {
139 state.setReceiveWindowSizeLowerBound(0);
140 }
141 return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
142 }
143
144 int getReceiveWindowSizeLowerBound(int streamID) {
145 StreamState state = activeStreams.get(streamID);
146 return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
147 }
148
149 void updateAllReceiveWindowSizes(int deltaWindowSize) {
150 for (StreamState state: activeStreams.values()) {
151 state.updateReceiveWindowSize(deltaWindowSize);
152 if (deltaWindowSize < 0) {
153 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
154 }
155 }
156 }
157
158 boolean putPendingWrite(int streamID, MessageEvent evt) {
159 StreamState state = activeStreams.get(streamID);
160 return state != null && state.putPendingWrite(evt);
161 }
162
163 MessageEvent getPendingWrite(int streamID) {
164 StreamState state = activeStreams.get(streamID);
165 return state != null ? state.getPendingWrite() : null;
166 }
167
168 MessageEvent removePendingWrite(int streamID) {
169 StreamState state = activeStreams.get(streamID);
170 return state != null ? state.removePendingWrite() : null;
171 }
172
173 private static final class StreamState {
174
175 private final byte priority;
176 private volatile boolean remoteSideClosed;
177 private volatile boolean localSideClosed;
178 private boolean receivedReply;
179 private final AtomicInteger sendWindowSize;
180 private final AtomicInteger receiveWindowSize;
181 private volatile int receiveWindowSizeLowerBound;
182 private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
183 new ConcurrentLinkedQueue<MessageEvent>();
184
185 StreamState(
186 byte priority, boolean remoteSideClosed, boolean localSideClosed,
187 int sendWindowSize, int receiveWindowSize) {
188 this.priority = priority;
189 this.remoteSideClosed = remoteSideClosed;
190 this.localSideClosed = localSideClosed;
191 this.sendWindowSize = new AtomicInteger(sendWindowSize);
192 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
193 }
194
195 byte getPriority() {
196 return priority;
197 }
198
199 boolean isRemoteSideClosed() {
200 return remoteSideClosed;
201 }
202
203 void closeRemoteSide() {
204 remoteSideClosed = true;
205 }
206
207 boolean isLocalSideClosed() {
208 return localSideClosed;
209 }
210
211 void closeLocalSide() {
212 localSideClosed = true;
213 }
214
215 boolean hasReceivedReply() {
216 return receivedReply;
217 }
218
219 void receivedReply() {
220 receivedReply = true;
221 }
222
223 int getSendWindowSize() {
224 return sendWindowSize.get();
225 }
226
227 int updateSendWindowSize(int deltaWindowSize) {
228 return sendWindowSize.addAndGet(deltaWindowSize);
229 }
230
231 int updateReceiveWindowSize(int deltaWindowSize) {
232 return receiveWindowSize.addAndGet(deltaWindowSize);
233 }
234
235 int getReceiveWindowSizeLowerBound() {
236 return receiveWindowSizeLowerBound;
237 }
238
239 void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
240 this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
241 }
242
243 boolean putPendingWrite(MessageEvent evt) {
244 return pendingWriteQueue.offer(evt);
245 }
246
247 MessageEvent getPendingWrite() {
248 return pendingWriteQueue.peek();
249 }
250
251 MessageEvent removePendingWrite() {
252 return pendingWriteQueue.poll();
253 }
254 }
255
256 private final class PriorityComparator implements Comparator<Integer> {
257
258 PriorityComparator() {
259 }
260
261 public int compare(Integer id1, Integer id2) {
262 StreamState state1 = activeStreams.get(id1);
263 StreamState state2 = activeStreams.get(id2);
264 return state1.getPriority() - state2.getPriority();
265 }
266 }
267 }