View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelPromise;
19  import io.netty.util.internal.PlatformDependent;
20  
21  import java.io.Serializable;
22  import java.util.Comparator;
23  import java.util.Map;
24  import java.util.Queue;
25  import java.util.TreeMap;
26  import java.util.concurrent.ConcurrentLinkedQueue;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
30  
31  final class SpdySession {
32  
33      private final AtomicInteger activeLocalStreams  = new AtomicInteger();
34      private final AtomicInteger activeRemoteStreams = new AtomicInteger();
35      private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();
36      private final StreamComparator streamComparator = new StreamComparator();
37      private final AtomicInteger sendWindowSize;
38      private final AtomicInteger receiveWindowSize;
39  
40      SpdySession(int sendWindowSize, int receiveWindowSize) {
41          this.sendWindowSize = new AtomicInteger(sendWindowSize);
42          this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
43      }
44  
45      int numActiveStreams(boolean remote) {
46          if (remote) {
47              return activeRemoteStreams.get();
48          } else {
49              return activeLocalStreams.get();
50          }
51      }
52  
53      boolean noActiveStreams() {
54          return activeStreams.isEmpty();
55      }
56  
57      boolean isActiveStream(int streamId) {
58          return activeStreams.containsKey(streamId);
59      }
60  
61      // Stream-IDs should be iterated in priority order
62      Map<Integer, StreamState> activeStreams() {
63          Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
64          streams.putAll(activeStreams);
65          return streams;
66      }
67  
68      void acceptStream(
69              int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
70              int sendWindowSize, int receiveWindowSize, boolean remote) {
71          if (!remoteSideClosed || !localSideClosed) {
72              StreamState state = activeStreams.put(streamId, new StreamState(
73                      priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
74              if (state == null) {
75                  if (remote) {
76                      activeRemoteStreams.incrementAndGet();
77                  } else {
78                      activeLocalStreams.incrementAndGet();
79                  }
80              }
81          }
82      }
83  
84      private StreamState removeActiveStream(int streamId, boolean remote) {
85          StreamState state = activeStreams.remove(streamId);
86          if (state != null) {
87              if (remote) {
88                  activeRemoteStreams.decrementAndGet();
89              } else {
90                  activeLocalStreams.decrementAndGet();
91              }
92          }
93          return state;
94      }
95  
96      void removeStream(int streamId, Throwable cause, boolean remote) {
97          StreamState state = removeActiveStream(streamId, remote);
98          if (state != null) {
99              state.clearPendingWrites(cause);
100         }
101     }
102 
103     boolean isRemoteSideClosed(int streamId) {
104         StreamState state = activeStreams.get(streamId);
105         return state == null || state.isRemoteSideClosed();
106     }
107 
108     void closeRemoteSide(int streamId, boolean remote) {
109         StreamState state = activeStreams.get(streamId);
110         if (state != null) {
111             state.closeRemoteSide();
112             if (state.isLocalSideClosed()) {
113                 removeActiveStream(streamId, remote);
114             }
115         }
116     }
117 
118     boolean isLocalSideClosed(int streamId) {
119         StreamState state = activeStreams.get(streamId);
120         return state == null || state.isLocalSideClosed();
121     }
122 
123     void closeLocalSide(int streamId, boolean remote) {
124         StreamState state = activeStreams.get(streamId);
125         if (state != null) {
126             state.closeLocalSide();
127             if (state.isRemoteSideClosed()) {
128                 removeActiveStream(streamId, remote);
129             }
130         }
131     }
132 
133     /*
134      * hasReceivedReply and receivedReply are only called from channelRead()
135      * no need to synchronize access to the StreamState
136      */
137     boolean hasReceivedReply(int streamId) {
138         StreamState state = activeStreams.get(streamId);
139         return state != null && state.hasReceivedReply();
140     }
141 
142     void receivedReply(int streamId) {
143         StreamState state = activeStreams.get(streamId);
144         if (state != null) {
145             state.receivedReply();
146         }
147     }
148 
149     int getSendWindowSize(int streamId) {
150         if (streamId == SPDY_SESSION_STREAM_ID) {
151             return sendWindowSize.get();
152         }
153 
154         StreamState state = activeStreams.get(streamId);
155         return state != null ? state.getSendWindowSize() : -1;
156     }
157 
158     int updateSendWindowSize(int streamId, int deltaWindowSize) {
159         if (streamId == SPDY_SESSION_STREAM_ID) {
160             return sendWindowSize.addAndGet(deltaWindowSize);
161         }
162 
163         StreamState state = activeStreams.get(streamId);
164         return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
165     }
166 
167     int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
168         if (streamId == SPDY_SESSION_STREAM_ID) {
169             return receiveWindowSize.addAndGet(deltaWindowSize);
170         }
171 
172         StreamState state = activeStreams.get(streamId);
173         if (state == null) {
174             return -1;
175         }
176         if (deltaWindowSize > 0) {
177             state.setReceiveWindowSizeLowerBound(0);
178         }
179         return state.updateReceiveWindowSize(deltaWindowSize);
180     }
181 
182     int getReceiveWindowSizeLowerBound(int streamId) {
183         if (streamId == SPDY_SESSION_STREAM_ID) {
184             return 0;
185         }
186 
187         StreamState state = activeStreams.get(streamId);
188         return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
189     }
190 
191     void updateAllSendWindowSizes(int deltaWindowSize) {
192         for (StreamState state: activeStreams.values()) {
193             state.updateSendWindowSize(deltaWindowSize);
194         }
195     }
196 
197     void updateAllReceiveWindowSizes(int deltaWindowSize) {
198         for (StreamState state: activeStreams.values()) {
199             state.updateReceiveWindowSize(deltaWindowSize);
200             if (deltaWindowSize < 0) {
201                 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
202             }
203         }
204     }
205 
206     boolean putPendingWrite(int streamId, PendingWrite pendingWrite) {
207         StreamState state = activeStreams.get(streamId);
208         return state != null && state.putPendingWrite(pendingWrite);
209     }
210 
211     PendingWrite getPendingWrite(int streamId) {
212         if (streamId == SPDY_SESSION_STREAM_ID) {
213             for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
214                 StreamState state = e.getValue();
215                 if (state.getSendWindowSize() > 0) {
216                     PendingWrite pendingWrite = state.getPendingWrite();
217                     if (pendingWrite != null) {
218                         return pendingWrite;
219                     }
220                 }
221             }
222             return null;
223         }
224 
225         StreamState state = activeStreams.get(streamId);
226         return state != null ? state.getPendingWrite() : null;
227     }
228 
229     PendingWrite removePendingWrite(int streamId) {
230         StreamState state = activeStreams.get(streamId);
231         return state != null ? state.removePendingWrite() : null;
232     }
233 
234     private static final class StreamState {
235 
236         private final byte priority;
237         private boolean remoteSideClosed;
238         private boolean localSideClosed;
239         private boolean receivedReply;
240         private final AtomicInteger sendWindowSize;
241         private final AtomicInteger receiveWindowSize;
242         private int receiveWindowSizeLowerBound;
243         private final Queue<PendingWrite> pendingWriteQueue = new ConcurrentLinkedQueue<PendingWrite>();
244 
245         StreamState(
246                 byte priority, boolean remoteSideClosed, boolean localSideClosed,
247                 int sendWindowSize, int receiveWindowSize) {
248             this.priority = priority;
249             this.remoteSideClosed = remoteSideClosed;
250             this.localSideClosed = localSideClosed;
251             this.sendWindowSize = new AtomicInteger(sendWindowSize);
252             this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
253         }
254 
255         byte getPriority() {
256             return priority;
257         }
258 
259         boolean isRemoteSideClosed() {
260             return remoteSideClosed;
261         }
262 
263         void closeRemoteSide() {
264             remoteSideClosed = true;
265         }
266 
267         boolean isLocalSideClosed() {
268             return localSideClosed;
269         }
270 
271         void closeLocalSide() {
272             localSideClosed = true;
273         }
274 
275         boolean hasReceivedReply() {
276             return receivedReply;
277         }
278 
279         void receivedReply() {
280             receivedReply = true;
281         }
282 
283         int getSendWindowSize() {
284             return sendWindowSize.get();
285         }
286 
287         int updateSendWindowSize(int deltaWindowSize) {
288             return sendWindowSize.addAndGet(deltaWindowSize);
289         }
290 
291         int updateReceiveWindowSize(int deltaWindowSize) {
292             return receiveWindowSize.addAndGet(deltaWindowSize);
293         }
294 
295         int getReceiveWindowSizeLowerBound() {
296             return receiveWindowSizeLowerBound;
297         }
298 
299         void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
300             this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
301         }
302 
303         boolean putPendingWrite(PendingWrite msg) {
304             return pendingWriteQueue.offer(msg);
305         }
306 
307         PendingWrite getPendingWrite() {
308             return pendingWriteQueue.peek();
309         }
310 
311         PendingWrite removePendingWrite() {
312             return pendingWriteQueue.poll();
313         }
314 
315         void clearPendingWrites(Throwable cause) {
316             for (;;) {
317                 PendingWrite pendingWrite = pendingWriteQueue.poll();
318                 if (pendingWrite == null) {
319                     break;
320                 }
321                 pendingWrite.fail(cause);
322             }
323         }
324     }
325 
326     private final class StreamComparator implements Comparator<Integer> {
327 
328         StreamComparator() { }
329 
330         @Override
331         public int compare(Integer id1, Integer id2) {
332             StreamState state1 = activeStreams.get(id1);
333             StreamState state2 = activeStreams.get(id2);
334 
335             int result = state1.getPriority() - state2.getPriority();
336             if (result != 0) {
337                 return result;
338             }
339 
340             return id1 - id2;
341         }
342     }
343 
344     public static final class PendingWrite {
345         final SpdyDataFrame spdyDataFrame;
346         final ChannelPromise promise;
347 
348         PendingWrite(SpdyDataFrame spdyDataFrame, ChannelPromise promise) {
349             this.spdyDataFrame = spdyDataFrame;
350             this.promise = promise;
351         }
352 
353         void fail(Throwable cause) {
354             spdyDataFrame.release();
355             promise.setFailure(cause);
356         }
357     }
358 }