View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import java.util.Comparator;
19  import java.util.Map;
20  import java.util.Set;
21  import java.util.TreeSet;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.jboss.netty.channel.MessageEvent;
27  
28  final class SpdySession {
29  
30      private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
31  
32      private final Map<Integer, StreamState> activeStreams =
33          new ConcurrentHashMap<Integer, StreamState>();
34  
35      int numActiveStreams() {
36          return activeStreams.size();
37      }
38  
39      boolean noActiveStreams() {
40          return activeStreams.isEmpty();
41      }
42  
43      boolean isActiveStream(int streamID) {
44          return activeStreams.containsKey(streamID);
45      }
46  
47      // Stream-IDs should be iterated in priority order
48      Set<Integer> getActiveStreams() {
49          TreeSet<Integer> StreamIDs = new TreeSet<Integer>(new PriorityComparator());
50          StreamIDs.addAll(activeStreams.keySet());
51          return StreamIDs;
52      }
53  
54      void acceptStream(
55              int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed,
56              int sendWindowSize, int receiveWindowSize) {
57          if (!remoteSideClosed || !localSideClosed) {
58              activeStreams.put(
59                      streamID,
60                      new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
61          }
62      }
63  
64      void removeStream(int streamID) {
65          Integer StreamID = streamID;
66          StreamState state = activeStreams.get(StreamID);
67          activeStreams.remove(StreamID);
68          if (state != null) {
69              MessageEvent e = state.removePendingWrite();
70              while (e != null) {
71                  e.getFuture().setFailure(STREAM_CLOSED);
72                  e = state.removePendingWrite();
73              }
74          }
75      }
76  
77      boolean isRemoteSideClosed(int streamID) {
78          StreamState state = activeStreams.get(streamID);
79          return state == null || state.isRemoteSideClosed();
80      }
81  
82      void closeRemoteSide(int streamID) {
83          Integer StreamID = streamID;
84          StreamState state = activeStreams.get(StreamID);
85          if (state != null) {
86              state.closeRemoteSide();
87              if (state.isLocalSideClosed()) {
88                  activeStreams.remove(StreamID);
89              }
90          }
91      }
92  
93      boolean isLocalSideClosed(int streamID) {
94          StreamState state = activeStreams.get(streamID);
95          return state == null || state.isLocalSideClosed();
96      }
97  
98      void closeLocalSide(int streamID) {
99          Integer StreamID = streamID;
100         StreamState state = activeStreams.get(StreamID);
101         if (state != null) {
102             state.closeLocalSide();
103             if (state.isRemoteSideClosed()) {
104                 activeStreams.remove(StreamID);
105             }
106         }
107     }
108 
109     /*
110      * hasReceivedReply and receivedReply are only called from messageReceived
111      * no need to synchronize access to the StreamState
112      */
113 
114     boolean hasReceivedReply(int streamID) {
115         StreamState state = activeStreams.get(streamID);
116         return state != null && state.hasReceivedReply();
117     }
118 
119     void receivedReply(int streamID) {
120         StreamState state = activeStreams.get(streamID);
121         if (state != null) {
122             state.receivedReply();
123         }
124     }
125 
126     int getSendWindowSize(int streamID) {
127         StreamState state = activeStreams.get(streamID);
128         return state != null ? state.getSendWindowSize() : -1;
129     }
130 
131     int updateSendWindowSize(int streamID, int deltaWindowSize) {
132         StreamState state = activeStreams.get(streamID);
133         return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
134     }
135 
136     int updateReceiveWindowSize(int streamID, int deltaWindowSize) {
137         StreamState state = activeStreams.get(streamID);
138         if (deltaWindowSize > 0) {
139             state.setReceiveWindowSizeLowerBound(0);
140         }
141         return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
142     }
143 
144     int getReceiveWindowSizeLowerBound(int streamID) {
145         StreamState state = activeStreams.get(streamID);
146         return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
147     }
148 
149     void updateAllReceiveWindowSizes(int deltaWindowSize) {
150         for (StreamState state: activeStreams.values()) {
151             state.updateReceiveWindowSize(deltaWindowSize);
152             if (deltaWindowSize < 0) {
153                 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
154             }
155         }
156     }
157 
158     boolean putPendingWrite(int streamID, MessageEvent evt) {
159         StreamState state = activeStreams.get(streamID);
160         return state != null && state.putPendingWrite(evt);
161     }
162 
163     MessageEvent getPendingWrite(int streamID) {
164         StreamState state = activeStreams.get(streamID);
165         return state != null ? state.getPendingWrite() : null;
166     }
167 
168     MessageEvent removePendingWrite(int streamID) {
169         StreamState state = activeStreams.get(streamID);
170         return state != null ? state.removePendingWrite() : null;
171     }
172 
173     private static final class StreamState {
174 
175         private final byte priority;
176         private volatile boolean remoteSideClosed;
177         private volatile boolean localSideClosed;
178         private boolean receivedReply;
179         private final AtomicInteger sendWindowSize;
180         private final AtomicInteger receiveWindowSize;
181         private volatile int receiveWindowSizeLowerBound;
182         private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
183                 new ConcurrentLinkedQueue<MessageEvent>();
184 
185         StreamState(
186                 byte priority, boolean remoteSideClosed, boolean localSideClosed,
187                 int sendWindowSize, int receiveWindowSize) {
188             this.priority = priority;
189             this.remoteSideClosed = remoteSideClosed;
190             this.localSideClosed = localSideClosed;
191             this.sendWindowSize = new AtomicInteger(sendWindowSize);
192             this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
193         }
194 
195         byte getPriority() {
196             return priority;
197         }
198 
199         boolean isRemoteSideClosed() {
200             return remoteSideClosed;
201         }
202 
203         void closeRemoteSide() {
204             remoteSideClosed = true;
205         }
206 
207         boolean isLocalSideClosed() {
208             return localSideClosed;
209         }
210 
211         void closeLocalSide() {
212             localSideClosed = true;
213         }
214 
215         boolean hasReceivedReply() {
216             return receivedReply;
217         }
218 
219         void receivedReply() {
220             receivedReply = true;
221         }
222 
223         int getSendWindowSize() {
224             return sendWindowSize.get();
225         }
226 
227         int updateSendWindowSize(int deltaWindowSize) {
228             return sendWindowSize.addAndGet(deltaWindowSize);
229         }
230 
231         int updateReceiveWindowSize(int deltaWindowSize) {
232             return receiveWindowSize.addAndGet(deltaWindowSize);
233         }
234 
235         int getReceiveWindowSizeLowerBound() {
236             return receiveWindowSizeLowerBound;
237         }
238 
239         void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
240             this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
241         }
242 
243         boolean putPendingWrite(MessageEvent evt) {
244             return pendingWriteQueue.offer(evt);
245         }
246 
247         MessageEvent getPendingWrite() {
248             return pendingWriteQueue.peek();
249         }
250 
251         MessageEvent removePendingWrite() {
252             return pendingWriteQueue.poll();
253         }
254     }
255 
256     private final class PriorityComparator implements Comparator<Integer> {
257 
258         PriorityComparator() {
259         }
260 
261         public int compare(Integer id1, Integer id2) {
262             StreamState state1 = activeStreams.get(id1);
263             StreamState state2 = activeStreams.get(id2);
264             return state1.getPriority() - state2.getPriority();
265         }
266     }
267 }