View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelPromise;
19  import io.netty.util.internal.PlatformDependent;
20  
21  import java.util.Comparator;
22  import java.util.Map;
23  import java.util.Queue;
24  import java.util.TreeMap;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
29  
30  final class SpdySession {
31  
32      private final AtomicInteger activeLocalStreams  = new AtomicInteger();
33      private final AtomicInteger activeRemoteStreams = new AtomicInteger();
34      private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();
35      private final StreamComparator streamComparator = new StreamComparator();
36      private final AtomicInteger sendWindowSize;
37      private final AtomicInteger receiveWindowSize;
38  
39      SpdySession(int sendWindowSize, int receiveWindowSize) {
40          this.sendWindowSize = new AtomicInteger(sendWindowSize);
41          this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
42      }
43  
44      int numActiveStreams(boolean remote) {
45          if (remote) {
46              return activeRemoteStreams.get();
47          } else {
48              return activeLocalStreams.get();
49          }
50      }
51  
52      boolean noActiveStreams() {
53          return activeStreams.isEmpty();
54      }
55  
56      boolean isActiveStream(int streamId) {
57          return activeStreams.containsKey(streamId);
58      }
59  
60      // Stream-IDs should be iterated in priority order
61      Map<Integer, StreamState> activeStreams() {
62          Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
63          streams.putAll(activeStreams);
64          return streams;
65      }
66  
67      void acceptStream(
68              int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
69              int sendWindowSize, int receiveWindowSize, boolean remote) {
70          if (!remoteSideClosed || !localSideClosed) {
71              StreamState state = activeStreams.put(streamId, new StreamState(
72                      priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
73              if (state == null) {
74                  if (remote) {
75                      activeRemoteStreams.incrementAndGet();
76                  } else {
77                      activeLocalStreams.incrementAndGet();
78                  }
79              }
80          }
81      }
82  
83      private StreamState removeActiveStream(int streamId, boolean remote) {
84          StreamState state = activeStreams.remove(streamId);
85          if (state != null) {
86              if (remote) {
87                  activeRemoteStreams.decrementAndGet();
88              } else {
89                  activeLocalStreams.decrementAndGet();
90              }
91          }
92          return state;
93      }
94  
95      void removeStream(int streamId, Throwable cause, boolean remote) {
96          StreamState state = removeActiveStream(streamId, remote);
97          if (state != null) {
98              state.clearPendingWrites(cause);
99          }
100     }
101 
102     boolean isRemoteSideClosed(int streamId) {
103         StreamState state = activeStreams.get(streamId);
104         return state == null || state.isRemoteSideClosed();
105     }
106 
107     void closeRemoteSide(int streamId, boolean remote) {
108         StreamState state = activeStreams.get(streamId);
109         if (state != null) {
110             state.closeRemoteSide();
111             if (state.isLocalSideClosed()) {
112                 removeActiveStream(streamId, remote);
113             }
114         }
115     }
116 
117     boolean isLocalSideClosed(int streamId) {
118         StreamState state = activeStreams.get(streamId);
119         return state == null || state.isLocalSideClosed();
120     }
121 
122     void closeLocalSide(int streamId, boolean remote) {
123         StreamState state = activeStreams.get(streamId);
124         if (state != null) {
125             state.closeLocalSide();
126             if (state.isRemoteSideClosed()) {
127                 removeActiveStream(streamId, remote);
128             }
129         }
130     }
131 
132     /*
133      * hasReceivedReply and receivedReply are only called from channelRead()
134      * no need to synchronize access to the StreamState
135      */
136     boolean hasReceivedReply(int streamId) {
137         StreamState state = activeStreams.get(streamId);
138         return state != null && state.hasReceivedReply();
139     }
140 
141     void receivedReply(int streamId) {
142         StreamState state = activeStreams.get(streamId);
143         if (state != null) {
144             state.receivedReply();
145         }
146     }
147 
148     int getSendWindowSize(int streamId) {
149         if (streamId == SPDY_SESSION_STREAM_ID) {
150             return sendWindowSize.get();
151         }
152 
153         StreamState state = activeStreams.get(streamId);
154         return state != null ? state.getSendWindowSize() : -1;
155     }
156 
157     int updateSendWindowSize(int streamId, int deltaWindowSize) {
158         if (streamId == SPDY_SESSION_STREAM_ID) {
159             return sendWindowSize.addAndGet(deltaWindowSize);
160         }
161 
162         StreamState state = activeStreams.get(streamId);
163         return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
164     }
165 
166     int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
167         if (streamId == SPDY_SESSION_STREAM_ID) {
168             return receiveWindowSize.addAndGet(deltaWindowSize);
169         }
170 
171         StreamState state = activeStreams.get(streamId);
172         if (state == null) {
173             return -1;
174         }
175         if (deltaWindowSize > 0) {
176             state.setReceiveWindowSizeLowerBound(0);
177         }
178         return state.updateReceiveWindowSize(deltaWindowSize);
179     }
180 
181     int getReceiveWindowSizeLowerBound(int streamId) {
182         if (streamId == SPDY_SESSION_STREAM_ID) {
183             return 0;
184         }
185 
186         StreamState state = activeStreams.get(streamId);
187         return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
188     }
189 
190     void updateAllSendWindowSizes(int deltaWindowSize) {
191         for (StreamState state: activeStreams.values()) {
192             state.updateSendWindowSize(deltaWindowSize);
193         }
194     }
195 
196     void updateAllReceiveWindowSizes(int deltaWindowSize) {
197         for (StreamState state: activeStreams.values()) {
198             state.updateReceiveWindowSize(deltaWindowSize);
199             if (deltaWindowSize < 0) {
200                 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
201             }
202         }
203     }
204 
205     boolean putPendingWrite(int streamId, PendingWrite pendingWrite) {
206         StreamState state = activeStreams.get(streamId);
207         return state != null && state.putPendingWrite(pendingWrite);
208     }
209 
210     PendingWrite getPendingWrite(int streamId) {
211         if (streamId == SPDY_SESSION_STREAM_ID) {
212             for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
213                 StreamState state = e.getValue();
214                 if (state.getSendWindowSize() > 0) {
215                     PendingWrite pendingWrite = state.getPendingWrite();
216                     if (pendingWrite != null) {
217                         return pendingWrite;
218                     }
219                 }
220             }
221             return null;
222         }
223 
224         StreamState state = activeStreams.get(streamId);
225         return state != null ? state.getPendingWrite() : null;
226     }
227 
228     PendingWrite removePendingWrite(int streamId) {
229         StreamState state = activeStreams.get(streamId);
230         return state != null ? state.removePendingWrite() : null;
231     }
232 
233     private static final class StreamState {
234 
235         private final byte priority;
236         private boolean remoteSideClosed;
237         private boolean localSideClosed;
238         private boolean receivedReply;
239         private final AtomicInteger sendWindowSize;
240         private final AtomicInteger receiveWindowSize;
241         private int receiveWindowSizeLowerBound;
242         private final Queue<PendingWrite> pendingWriteQueue = new ConcurrentLinkedQueue<PendingWrite>();
243 
244         StreamState(
245                 byte priority, boolean remoteSideClosed, boolean localSideClosed,
246                 int sendWindowSize, int receiveWindowSize) {
247             this.priority = priority;
248             this.remoteSideClosed = remoteSideClosed;
249             this.localSideClosed = localSideClosed;
250             this.sendWindowSize = new AtomicInteger(sendWindowSize);
251             this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
252         }
253 
254         byte getPriority() {
255             return priority;
256         }
257 
258         boolean isRemoteSideClosed() {
259             return remoteSideClosed;
260         }
261 
262         void closeRemoteSide() {
263             remoteSideClosed = true;
264         }
265 
266         boolean isLocalSideClosed() {
267             return localSideClosed;
268         }
269 
270         void closeLocalSide() {
271             localSideClosed = true;
272         }
273 
274         boolean hasReceivedReply() {
275             return receivedReply;
276         }
277 
278         void receivedReply() {
279             receivedReply = true;
280         }
281 
282         int getSendWindowSize() {
283             return sendWindowSize.get();
284         }
285 
286         int updateSendWindowSize(int deltaWindowSize) {
287             return sendWindowSize.addAndGet(deltaWindowSize);
288         }
289 
290         int updateReceiveWindowSize(int deltaWindowSize) {
291             return receiveWindowSize.addAndGet(deltaWindowSize);
292         }
293 
294         int getReceiveWindowSizeLowerBound() {
295             return receiveWindowSizeLowerBound;
296         }
297 
298         void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
299             this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
300         }
301 
302         boolean putPendingWrite(PendingWrite msg) {
303             return pendingWriteQueue.offer(msg);
304         }
305 
306         PendingWrite getPendingWrite() {
307             return pendingWriteQueue.peek();
308         }
309 
310         PendingWrite removePendingWrite() {
311             return pendingWriteQueue.poll();
312         }
313 
314         void clearPendingWrites(Throwable cause) {
315             for (;;) {
316                 PendingWrite pendingWrite = pendingWriteQueue.poll();
317                 if (pendingWrite == null) {
318                     break;
319                 }
320                 pendingWrite.fail(cause);
321             }
322         }
323     }
324 
325     private final class StreamComparator implements Comparator<Integer> {
326 
327         StreamComparator() { }
328 
329         @Override
330         public int compare(Integer id1, Integer id2) {
331             StreamState state1 = activeStreams.get(id1);
332             StreamState state2 = activeStreams.get(id2);
333 
334             int result = state1.getPriority() - state2.getPriority();
335             if (result != 0) {
336                 return result;
337             }
338 
339             return id1 - id2;
340         }
341     }
342 
343     public static final class PendingWrite {
344         final SpdyDataFrame spdyDataFrame;
345         final ChannelPromise promise;
346 
347         PendingWrite(SpdyDataFrame spdyDataFrame, ChannelPromise promise) {
348             this.spdyDataFrame = spdyDataFrame;
349             this.promise = promise;
350         }
351 
352         void fail(Throwable cause) {
353             spdyDataFrame.release();
354             promise.setFailure(cause);
355         }
356     }
357 }