View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import org.jboss.netty.channel.MessageEvent;
19  
20  import java.io.Serializable;
21  import java.util.Comparator;
22  import java.util.Map;
23  import java.util.TreeMap;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
29  
30  final class SpdySession {
31  
32      private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
33  
34      private final AtomicInteger activeLocalStreams  = new AtomicInteger();
35      private final AtomicInteger activeRemoteStreams = new AtomicInteger();
36      private final Map<Integer, StreamState> activeStreams = new ConcurrentHashMap<Integer, StreamState>();
37      private final StreamComparator streamComparator = new StreamComparator();
38      private final AtomicInteger sendWindowSize;
39      private final AtomicInteger receiveWindowSize;
40  
41      public SpdySession(int sendWindowSize, int receiveWindowSize) {
42          this.sendWindowSize = new AtomicInteger(sendWindowSize);
43          this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
44      }
45  
46      int numActiveStreams(boolean remote) {
47          if (remote) {
48              return activeRemoteStreams.get();
49          } else {
50              return activeLocalStreams.get();
51          }
52      }
53  
54      boolean noActiveStreams() {
55          return activeStreams.isEmpty();
56      }
57  
58      boolean isActiveStream(int streamId) {
59          return activeStreams.containsKey(streamId);
60      }
61  
62      // Stream-IDs should be iterated in priority order
63      Map<Integer, StreamState> activeStreams() {
64          Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
65          streams.putAll(activeStreams);
66          return streams;
67      }
68  
69      void acceptStream(
70              int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
71              int sendWindowSize, int receiveWindowSize, boolean remote) {
72          if (!remoteSideClosed || !localSideClosed) {
73              StreamState state = activeStreams.put(
74                      streamId,
75                      new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
76              if (state == null) {
77                  if (remote) {
78                      activeRemoteStreams.incrementAndGet();
79                  } else {
80                      activeLocalStreams.incrementAndGet();
81                  }
82              }
83          }
84      }
85  
86      private StreamState removeActiveStream(int streamId, boolean remote) {
87          StreamState state = activeStreams.remove(streamId);
88          if (state != null) {
89              if (remote) {
90                  activeRemoteStreams.decrementAndGet();
91              } else {
92                  activeLocalStreams.decrementAndGet();
93              }
94          }
95          return state;
96      }
97  
98      void removeStream(int streamId, boolean remote) {
99          StreamState state = removeActiveStream(streamId, remote);
100         if (state != null) {
101             MessageEvent e = state.removePendingWrite();
102             while (e != null) {
103                 e.getFuture().setFailure(STREAM_CLOSED);
104                 e = state.removePendingWrite();
105             }
106         }
107     }
108 
109     boolean isRemoteSideClosed(int streamId) {
110         StreamState state = activeStreams.get(streamId);
111         return state == null || state.isRemoteSideClosed();
112     }
113 
114     void closeRemoteSide(int streamId, boolean remote) {
115         StreamState state = activeStreams.get(streamId);
116         if (state != null) {
117             state.closeRemoteSide();
118             if (state.isLocalSideClosed()) {
119                 removeActiveStream(streamId, remote);
120             }
121         }
122     }
123 
124     boolean isLocalSideClosed(int streamId) {
125         StreamState state = activeStreams.get(streamId);
126         return state == null || state.isLocalSideClosed();
127     }
128 
129     void closeLocalSide(int streamId, boolean remote) {
130         StreamState state = activeStreams.get(streamId);
131         if (state != null) {
132             state.closeLocalSide();
133             if (state.isRemoteSideClosed()) {
134                 removeActiveStream(streamId, remote);
135             }
136         }
137     }
138 
139     /*
140      * hasReceivedReply and receivedReply are only called from messageReceived
141      * no need to synchronize access to the StreamState
142      */
143 
144     boolean hasReceivedReply(int streamId) {
145         StreamState state = activeStreams.get(streamId);
146         return state != null && state.hasReceivedReply();
147     }
148 
149     void receivedReply(int streamId) {
150         StreamState state = activeStreams.get(streamId);
151         if (state != null) {
152             state.receivedReply();
153         }
154     }
155 
156     int getSendWindowSize(int streamId) {
157         if (streamId == SPDY_SESSION_STREAM_ID) {
158             return sendWindowSize.get();
159         }
160 
161         StreamState state = activeStreams.get(streamId);
162         return state != null ? state.getSendWindowSize() : -1;
163     }
164 
165     int updateSendWindowSize(int streamId, int deltaWindowSize) {
166         if (streamId == SPDY_SESSION_STREAM_ID) {
167             return sendWindowSize.addAndGet(deltaWindowSize);
168         }
169 
170         StreamState state = activeStreams.get(streamId);
171         return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
172     }
173 
174     int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
175         if (streamId == SPDY_SESSION_STREAM_ID) {
176             return receiveWindowSize.addAndGet(deltaWindowSize);
177         }
178 
179         StreamState state = activeStreams.get(streamId);
180         if (deltaWindowSize > 0) {
181             state.setReceiveWindowSizeLowerBound(0);
182         }
183         return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
184     }
185 
186     int getReceiveWindowSizeLowerBound(int streamId) {
187         if (streamId == SPDY_SESSION_STREAM_ID) {
188             return 0;
189         }
190 
191         StreamState state = activeStreams.get(streamId);
192         return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
193     }
194 
195     void updateAllSendWindowSizes(int deltaWindowSize) {
196         for (StreamState state: activeStreams.values()) {
197             state.updateSendWindowSize(deltaWindowSize);
198         }
199     }
200 
201     void updateAllReceiveWindowSizes(int deltaWindowSize) {
202         for (StreamState state: activeStreams.values()) {
203             state.updateReceiveWindowSize(deltaWindowSize);
204             if (deltaWindowSize < 0) {
205                 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
206             }
207         }
208     }
209 
210     boolean putPendingWrite(int streamId, MessageEvent evt) {
211         StreamState state = activeStreams.get(streamId);
212         return state != null && state.putPendingWrite(evt);
213     }
214 
215     MessageEvent getPendingWrite(int streamId) {
216         if (streamId == SPDY_SESSION_STREAM_ID) {
217             for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
218                 StreamState state = e.getValue();
219                 if (state.getSendWindowSize() > 0) {
220                     MessageEvent evt = state.getPendingWrite();
221                     if (evt != null) {
222                         return evt;
223                     }
224                 }
225             }
226             return null;
227         }
228 
229         StreamState state = activeStreams.get(streamId);
230         return state != null ? state.getPendingWrite() : null;
231     }
232 
233     MessageEvent removePendingWrite(int streamId) {
234         StreamState state = activeStreams.get(streamId);
235         return state != null ? state.removePendingWrite() : null;
236     }
237 
238     private static final class StreamState {
239 
240         private final byte priority;
241         private volatile boolean remoteSideClosed;
242         private volatile boolean localSideClosed;
243         private boolean receivedReply;
244         private final AtomicInteger sendWindowSize;
245         private final AtomicInteger receiveWindowSize;
246         private volatile int receiveWindowSizeLowerBound;
247         private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
248                 new ConcurrentLinkedQueue<MessageEvent>();
249 
250         StreamState(
251                 byte priority, boolean remoteSideClosed, boolean localSideClosed,
252                 int sendWindowSize, int receiveWindowSize) {
253             this.priority = priority;
254             this.remoteSideClosed = remoteSideClosed;
255             this.localSideClosed = localSideClosed;
256             this.sendWindowSize = new AtomicInteger(sendWindowSize);
257             this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
258         }
259 
260         byte getPriority() {
261             return priority;
262         }
263 
264         boolean isRemoteSideClosed() {
265             return remoteSideClosed;
266         }
267 
268         void closeRemoteSide() {
269             remoteSideClosed = true;
270         }
271 
272         boolean isLocalSideClosed() {
273             return localSideClosed;
274         }
275 
276         void closeLocalSide() {
277             localSideClosed = true;
278         }
279 
280         boolean hasReceivedReply() {
281             return receivedReply;
282         }
283 
284         void receivedReply() {
285             receivedReply = true;
286         }
287 
288         int getSendWindowSize() {
289             return sendWindowSize.get();
290         }
291 
292         int updateSendWindowSize(int deltaWindowSize) {
293             return sendWindowSize.addAndGet(deltaWindowSize);
294         }
295 
296         int updateReceiveWindowSize(int deltaWindowSize) {
297             return receiveWindowSize.addAndGet(deltaWindowSize);
298         }
299 
300         int getReceiveWindowSizeLowerBound() {
301             return receiveWindowSizeLowerBound;
302         }
303 
304         void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
305             this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
306         }
307 
308         boolean putPendingWrite(MessageEvent evt) {
309             return pendingWriteQueue.offer(evt);
310         }
311 
312         MessageEvent getPendingWrite() {
313             return pendingWriteQueue.peek();
314         }
315 
316         MessageEvent removePendingWrite() {
317             return pendingWriteQueue.poll();
318         }
319     }
320 
321     private final class StreamComparator implements Comparator<Integer>, Serializable {
322 
323         private static final long serialVersionUID = 1161471649740544848L;
324 
325         StreamComparator() { }
326 
327         public int compare(Integer id1, Integer id2) {
328             StreamState state1 = activeStreams.get(id1);
329             StreamState state2 = activeStreams.get(id2);
330 
331             int result = state1.getPriority() - state2.getPriority();
332             if (result != 0) {
333                 return result;
334             }
335 
336             return id1 - id2;
337         }
338     }
339 }