View Javadoc

1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelDownstreamHandler;
20  import org.jboss.netty.channel.ChannelEvent;
21  import org.jboss.netty.channel.ChannelFuture;
22  import org.jboss.netty.channel.ChannelFutureListener;
23  import org.jboss.netty.channel.ChannelHandlerContext;
24  import org.jboss.netty.channel.ChannelStateEvent;
25  import org.jboss.netty.channel.Channels;
26  import org.jboss.netty.channel.ExceptionEvent;
27  import org.jboss.netty.channel.MessageEvent;
28  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
29  
30  import java.net.SocketAddress;
31  import java.nio.channels.ClosedChannelException;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
35  
36  /**
37   * Manages streams within a SPDY session.
38   */
39  public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40          implements ChannelDownstreamHandler {
41  
42      private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
43  
44      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
45      private volatile int initialSendWindowSize    = DEFAULT_WINDOW_SIZE;
46      private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
47      private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
48  
49      private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
50      private volatile int lastGoodStreamId;
51  
52      private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
53      private volatile int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
54      private volatile int localConcurrentStreams  = DEFAULT_MAX_CONCURRENT_STREAMS;
55  
56      private final Object flowControlLock = new Object();
57  
58      private final AtomicInteger pings = new AtomicInteger();
59  
60      private volatile boolean sentGoAwayFrame;
61      private volatile boolean receivedGoAwayFrame;
62  
63      private volatile ChannelFutureListener closeSessionFutureListener;
64  
65      private final boolean server;
66      private final int minorVersion;
67  
68      /**
69       * Creates a new session handler.
70       *
71       * @param spdyVersion the protocol version
72       * @param server      {@code true} if and only if this session handler should
73       *                    handle the server endpoint of the connection.
74       *                    {@code false} if and only if this session handler should
75       *                    handle the client endpoint of the connection.
76       */
77      public SpdySessionHandler(SpdyVersion spdyVersion, boolean server) {
78          if (spdyVersion == null) {
79              throw new NullPointerException("spdyVersion");
80          }
81          this.server = server;
82          minorVersion = spdyVersion.getMinorVersion();
83      }
84  
85      public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
86        if (sessionReceiveWindowSize < 0) {
87          throw new IllegalArgumentException("sessionReceiveWindowSize");
88        }
89        // This will not send a window update frame immediately.
90        // If this value increases the allowed receive window size,
91        // a WINDOW_UPDATE frame will be sent when only half of the
92        // session window size remains during data frame processing.
93        // If this value decreases the allowed receive window size,
94        // the window will be reduced as data frames are processed.
95        initialSessionReceiveWindowSize = sessionReceiveWindowSize;
96      }
97  
98      @Override
99      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
100             throws Exception {
101 
102         Object msg = e.getMessage();
103         if (msg instanceof SpdyDataFrame) {
104 
105             /*
106              * SPDY Data frame processing requirements:
107              *
108              * If an endpoint receives a data frame for a Stream-ID which is not open
109              * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
110              * with the error code INVALID_STREAM for the Stream-ID.
111              *
112              * If an endpoint which created the stream receives a data frame before receiving
113              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
114              * issue a stream error with the status code PROTOCOL_ERROR for the Stream-ID.
115              *
116              * If an endpoint receives multiple data frames for invalid Stream-IDs,
117              * it may close the session.
118              *
119              * If an endpoint refuses a stream it must ignore any data frames for that stream.
120              *
121              * If an endpoint receives a data frame after the stream is half-closed from the
122              * sender, it must send a RST_STREAM frame with the status STREAM_ALREADY_CLOSED.
123              *
124              * If an endpoint receives a data frame after the stream is closed, it must send
125              * a RST_STREAM frame with the status PROTOCOL_ERROR.
126              */
127 
128             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
129             int streamId = spdyDataFrame.getStreamId();
130 
131             int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
132             int newSessionWindowSize =
133                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
134 
135             // Check if session window size is reduced beyond allowable lower bound
136             if (newSessionWindowSize < 0) {
137                 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
138                 return;
139             }
140 
141             // Send a WINDOW_UPDATE frame if less than half the session window size remains
142             if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
143                 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
144                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
145                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
146                     new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
147                 Channels.write(
148                         ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
149             }
150 
151             // Check if we received a data frame for a Stream-ID which is not open
152             if (!spdySession.isActiveStream(streamId)) {
153                 if (streamId <= lastGoodStreamId) {
154                     issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
155                 } else if (!sentGoAwayFrame) {
156                     issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM);
157                 }
158                 return;
159             }
160 
161             // Check if we received a data frame for a stream which is half-closed
162             if (spdySession.isRemoteSideClosed(streamId)) {
163                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
164                 return;
165             }
166 
167             // Check if we received a data frame before receiving a SYN_REPLY
168             if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
169                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
170                 return;
171             }
172 
173             /*
174              * SPDY Data frame flow control processing requirements:
175              *
176              * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
177              */
178 
179             // Update receive window size
180             int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
181 
182             // Window size can become negative if we sent a SETTINGS frame that reduces the
183             // size of the transfer window after the peer has written data frames.
184             // The value is bounded by the length that SETTINGS frame decrease the window.
185             // This difference is stored for the session when writing the SETTINGS frame
186             // and is cleared once we send a WINDOW_UPDATE frame.
187             if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
188                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
189                 return;
190             }
191 
192             // Window size became negative due to sender writing frame before receiving SETTINGS
193             // Send data frames upstream in initialReceiveWindowSize chunks
194             if (newWindowSize < 0) {
195                 while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
196                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId);
197                     partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
198                     Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
199                 }
200             }
201 
202             // Send a WINDOW_UPDATE frame if less than half the stream window size remains
203             if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
204                 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
205                 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
206                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
207                         new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
208                 Channels.write(
209                         ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
210             }
211 
212             // Close the remote side of the stream if this is the last frame
213             if (spdyDataFrame.isLast()) {
214                 halfCloseStream(streamId, true, e.getFuture());
215             }
216 
217         } else if (msg instanceof SpdySynStreamFrame) {
218 
219             /*
220              * SPDY SYN_STREAM frame processing requirements:
221              *
222              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
223              * any previously received SYN_STREAM, it must issue a session error with
224              * the status PROTOCOL_ERROR.
225              *
226              * If an endpoint receives multiple SYN_STREAM frames with the same active
227              * Stream-ID, it must issue a stream error with the status code PROTOCOL_ERROR.
228              *
229              * The recipient can reject a stream by sending a stream error with the
230              * status code REFUSED_STREAM.
231              */
232 
233             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
234             int streamId = spdySynStreamFrame.getStreamId();
235 
236             // Check if we received a valid SYN_STREAM frame
237             if (spdySynStreamFrame.isInvalid() ||
238                 !isRemoteInitiatedId(streamId) ||
239                 spdySession.isActiveStream(streamId)) {
240                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
241                 return;
242             }
243 
244             // Stream-IDs must be monotonically increasing
245             if (streamId <= lastGoodStreamId) {
246                 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
247                 return;
248             }
249 
250             // Try to accept the stream
251             byte priority = spdySynStreamFrame.getPriority();
252             boolean remoteSideClosed = spdySynStreamFrame.isLast();
253             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
254             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
255                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.REFUSED_STREAM);
256                 return;
257             }
258 
259         } else if (msg instanceof SpdySynReplyFrame) {
260 
261             /*
262              * SPDY SYN_REPLY frame processing requirements:
263              *
264              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
265              * it must issue a stream error with the status code STREAM_IN_USE.
266              */
267 
268             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
269             int streamId = spdySynReplyFrame.getStreamId();
270 
271             // Check if we received a valid SYN_REPLY frame
272             if (spdySynReplyFrame.isInvalid() ||
273                 isRemoteInitiatedId(streamId) ||
274                 spdySession.isRemoteSideClosed(streamId)) {
275                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM);
276                 return;
277             }
278 
279             // Check if we have received multiple frames for the same Stream-ID
280             if (spdySession.hasReceivedReply(streamId)) {
281                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.STREAM_IN_USE);
282                 return;
283             }
284 
285             spdySession.receivedReply(streamId);
286 
287             // Close the remote side of the stream if this is the last frame
288             if (spdySynReplyFrame.isLast()) {
289                 halfCloseStream(streamId, true, e.getFuture());
290             }
291 
292         } else if (msg instanceof SpdyRstStreamFrame) {
293 
294             /*
295              * SPDY RST_STREAM frame processing requirements:
296              *
297              * After receiving a RST_STREAM on a stream, the receiver must not send
298              * additional frames on that stream.
299              *
300              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
301              */
302 
303             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
304             removeStream(spdyRstStreamFrame.getStreamId(), e.getFuture());
305 
306         } else if (msg instanceof SpdySettingsFrame) {
307 
308             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
309 
310             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
311             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
312                 // Settings frame had the wrong minor version
313                 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
314                 return;
315             }
316 
317             int newConcurrentStreams =
318                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
319             if (newConcurrentStreams >= 0) {
320                 remoteConcurrentStreams = newConcurrentStreams;
321             }
322 
323             // Persistence flag are inconsistent with the use of SETTINGS to communicate
324             // the initial window size. Remove flags from the sender requesting that the
325             // value be persisted. Remove values that the sender indicates are persisted.
326             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
327                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
328             }
329             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
330 
331             int newInitialWindowSize =
332                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
333             if (newInitialWindowSize >= 0) {
334                 updateInitialSendWindowSize(newInitialWindowSize);
335             }
336 
337         } else if (msg instanceof SpdyPingFrame) {
338 
339             /*
340              * SPDY PING frame processing requirements:
341              *
342              * Receivers of a PING frame should send an identical frame to the sender
343              * as soon as possible.
344              *
345              * Receivers of a PING frame must ignore frames that it did not initiate
346              */
347 
348             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
349 
350             if (isRemoteInitiatedId(spdyPingFrame.getId())) {
351                 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
352                 return;
353             }
354 
355             // Note: only checks that there are outstanding pings since uniqueness is not enforced
356             if (pings.get() == 0) {
357                 return;
358             }
359             pings.getAndDecrement();
360 
361         } else if (msg instanceof SpdyGoAwayFrame) {
362 
363             receivedGoAwayFrame = true;
364 
365         } else if (msg instanceof SpdyHeadersFrame) {
366 
367             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
368             int streamId = spdyHeadersFrame.getStreamId();
369 
370             // Check if we received a valid HEADERS frame
371             if (spdyHeadersFrame.isInvalid()) {
372                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
373                 return;
374             }
375 
376             if (spdySession.isRemoteSideClosed(streamId)) {
377                 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM);
378                 return;
379             }
380 
381             // Close the remote side of the stream if this is the last frame
382             if (spdyHeadersFrame.isLast()) {
383                 halfCloseStream(streamId, true, e.getFuture());
384             }
385 
386         } else if (msg instanceof SpdyWindowUpdateFrame) {
387 
388             /*
389              * SPDY WINDOW_UPDATE frame processing requirements:
390              *
391              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
392              * must send a RST_STREAM with the status code FLOW_CONTROL_ERROR.
393              *
394              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
395              * after sending the last frame for the stream.
396              */
397 
398             SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
399             int streamId = spdyWindowUpdateFrame.getStreamId();
400             int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
401 
402             // Ignore frames for half-closed streams
403             if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
404                 return;
405             }
406 
407             // Check for numerical overflow
408             if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
409                 if (streamId == SPDY_SESSION_STREAM_ID) {
410                     issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
411                 } else {
412                     issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
413                 }
414                 return;
415             }
416 
417             updateSendWindowSize(ctx, streamId, deltaWindowSize);
418             return;
419         }
420 
421         super.messageReceived(ctx, e);
422     }
423 
424     @Override
425     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
426             throws Exception {
427 
428         Throwable cause = e.getCause();
429         if (cause instanceof SpdyProtocolException) {
430             issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
431         }
432 
433         super.exceptionCaught(ctx, e);
434     }
435 
436     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
437             throws Exception {
438         if (evt instanceof ChannelStateEvent) {
439             ChannelStateEvent e = (ChannelStateEvent) evt;
440             switch (e.getState()) {
441             case OPEN:
442             case CONNECTED:
443             case BOUND:
444 
445                 /*
446                  * SPDY connection requirements:
447                  *
448                  * When either endpoint closes the transport-level connection,
449                  * it must first send a GOAWAY frame.
450                  */
451                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
452                     sendGoAwayFrame(ctx, e);
453                     return;
454                 }
455             }
456         }
457         if (!(evt instanceof MessageEvent)) {
458             ctx.sendDownstream(evt);
459             return;
460         }
461 
462         MessageEvent e = (MessageEvent) evt;
463         Object msg = e.getMessage();
464 
465         if (msg instanceof SpdyDataFrame) {
466 
467             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
468             final int streamId = spdyDataFrame.getStreamId();
469 
470             // Frames must not be sent on half-closed streams
471             if (spdySession.isLocalSideClosed(streamId)) {
472                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
473                 return;
474             }
475 
476             /*
477              * SPDY Data frame flow control processing requirements:
478              *
479              * Sender must not send a data frame with data length greater
480              * than the transfer window size.
481              *
482              * After sending each data frame, the sender decrements its
483              * transfer window size by the amount of data transmitted.
484              *
485              * When the window size becomes less than or equal to 0, the
486              * sender must pause transmitting data frames.
487              */
488 
489             synchronized (flowControlLock) {
490                 int dataLength = spdyDataFrame.getData().readableBytes();
491                 int sendWindowSize = spdySession.getSendWindowSize(streamId);
492                 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
493                 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
494 
495                 if (sendWindowSize <= 0) {
496                     // Stream is stalled -- enqueue Data frame and return
497                     spdySession.putPendingWrite(streamId, e);
498                     return;
499                 } else if (sendWindowSize < dataLength) {
500                     // Stream is not stalled but we cannot send the entire frame
501                     spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
502                     spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
503 
504                     // Create a partial data frame whose length is the current window size
505                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId);
506                     partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
507 
508                     // Enqueue the remaining data (will be the first frame queued)
509                     spdySession.putPendingWrite(streamId, e);
510 
511                     ChannelFuture writeFuture = Channels.future(e.getChannel());
512 
513                     // The transfer window size is pre-decremented when sending a data frame downstream.
514                     // Close the session on write failures that leaves the transfer window in a corrupt state.
515                     final SocketAddress remoteAddress = e.getRemoteAddress();
516                     final ChannelHandlerContext context = ctx;
517                     e.getFuture().addListener(new ChannelFutureListener() {
518                         public void operationComplete(ChannelFuture future) throws Exception {
519                             if (!future.isSuccess()) {
520                                 Channel channel = future.getChannel();
521                                 issueSessionError(
522                                         context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
523                             }
524                         }
525                     });
526 
527                     Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
528                     return;
529                 } else {
530                     // Window size is large enough to send entire data frame
531                     spdySession.updateSendWindowSize(streamId, -1 * dataLength);
532                     spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
533 
534                     // The transfer window size is pre-decremented when sending a data frame downstream.
535                     // Close the session on write failures that leaves the transfer window in a corrupt state.
536                     final SocketAddress remoteAddress = e.getRemoteAddress();
537                     final ChannelHandlerContext context = ctx;
538                     e.getFuture().addListener(new ChannelFutureListener() {
539                         public void operationComplete(ChannelFuture future) throws Exception {
540                             if (!future.isSuccess()) {
541                                 Channel channel = future.getChannel();
542                                 issueSessionError(
543                                         context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
544                             }
545                         }
546                     });
547                 }
548             }
549 
550             // Close the local side of the stream if this is the last frame
551             if (spdyDataFrame.isLast()) {
552                 halfCloseStream(streamId, false, e.getFuture());
553             }
554 
555         } else if (msg instanceof SpdySynStreamFrame) {
556 
557             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
558             int streamId = spdySynStreamFrame.getStreamId();
559 
560             if (isRemoteInitiatedId(streamId)) {
561                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
562                 return;
563             }
564 
565             byte priority = spdySynStreamFrame.getPriority();
566             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
567             boolean localSideClosed = spdySynStreamFrame.isLast();
568             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
569                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
570                 return;
571             }
572 
573         } else if (msg instanceof SpdySynReplyFrame) {
574 
575             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
576             int streamId = spdySynReplyFrame.getStreamId();
577 
578             // Frames must not be sent on half-closed streams
579             if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
580                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
581                 return;
582             }
583 
584             // Close the local side of the stream if this is the last frame
585             if (spdySynReplyFrame.isLast()) {
586                 halfCloseStream(streamId, false, e.getFuture());
587             }
588 
589         } else if (msg instanceof SpdyRstStreamFrame) {
590 
591             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
592             removeStream(spdyRstStreamFrame.getStreamId(), e.getFuture());
593 
594         } else if (msg instanceof SpdySettingsFrame) {
595 
596             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
597 
598             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
599             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
600                 // Settings frame had the wrong minor version
601                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
602                 return;
603             }
604 
605             int newConcurrentStreams =
606                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
607             if (newConcurrentStreams >= 0) {
608                 localConcurrentStreams = newConcurrentStreams;
609             }
610 
611             // Persistence flag are inconsistent with the use of SETTINGS to communicate
612             // the initial window size. Remove flags from the sender requesting that the
613             // value be persisted. Remove values that the sender indicates are persisted.
614             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
615                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
616             }
617             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
618 
619             int newInitialWindowSize =
620                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
621             if (newInitialWindowSize >= 0) {
622                 updateInitialReceiveWindowSize(newInitialWindowSize);
623             }
624 
625         } else if (msg instanceof SpdyPingFrame) {
626 
627             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
628             if (isRemoteInitiatedId(spdyPingFrame.getId())) {
629                 e.getFuture().setFailure(new IllegalArgumentException(
630                             "invalid PING ID: " + spdyPingFrame.getId()));
631                 return;
632             }
633             pings.getAndIncrement();
634 
635         } else if (msg instanceof SpdyGoAwayFrame) {
636 
637             // Why is this being sent? Intercept it and fail the write.
638             // Should have sent a CLOSE ChannelStateEvent
639             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
640             return;
641 
642         } else if (msg instanceof SpdyHeadersFrame) {
643 
644             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
645             int streamId = spdyHeadersFrame.getStreamId();
646 
647             // Frames must not be sent on half-closed streams
648             if (spdySession.isLocalSideClosed(streamId)) {
649                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
650                 return;
651             }
652 
653             // Close the local side of the stream if this is the last frame
654             if (spdyHeadersFrame.isLast()) {
655                 halfCloseStream(streamId, false, e.getFuture());
656             }
657 
658         } else if (msg instanceof SpdyWindowUpdateFrame) {
659 
660             // Why is this being sent? Intercept it and fail the write.
661             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
662             return;
663         }
664 
665         ctx.sendDownstream(evt);
666     }
667 
668     /*
669      * SPDY Session Error Handling:
670      *
671      * When a session error occurs, the endpoint encountering the error must first
672      * send a GOAWAY frame with the Stream-ID of the most recently received stream
673      * from the remote endpoint, and the error code for why the session is terminating.
674      *
675      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
676      */
677     private void issueSessionError(
678             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
679 
680         ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
681         future.addListener(ChannelFutureListener.CLOSE);
682     }
683 
684     /*
685      * SPDY Stream Error Handling:
686      *
687      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
688      * the Stream-ID for the stream where the error occurred and the error status which
689      * caused the error.
690      *
691      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
692      *
693      * Note: this is only called by the worker thread
694      */
695     private void issueStreamError(
696             ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamId, SpdyStreamStatus status) {
697 
698         boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamId);
699         ChannelFuture future = Channels.future(ctx.getChannel());
700         removeStream(streamId, future);
701 
702         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
703         Channels.write(ctx, future, spdyRstStreamFrame, remoteAddress);
704         if (fireMessageReceived) {
705             Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
706         }
707     }
708 
709     /*
710      * Helper functions
711      */
712 
713     private boolean isRemoteInitiatedId(int id) {
714         boolean serverId = isServerId(id);
715         return server && !serverId || !server && serverId;
716     }
717 
718     // need to synchronize to prevent new streams from being created while updating active streams
719     private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
720         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
721         initialSendWindowSize = newInitialWindowSize;
722         spdySession.updateAllSendWindowSizes(deltaWindowSize);
723     }
724 
725     // need to synchronize to prevent new streams from being created while updating active streams
726     private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
727         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
728         initialReceiveWindowSize = newInitialWindowSize;
729         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
730     }
731 
732     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamId, and initial window sizes
733     private synchronized boolean acceptStream(
734             int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
735         // Cannot initiate any new streams after receiving or sending GOAWAY
736         if (receivedGoAwayFrame || sentGoAwayFrame) {
737             return false;
738         }
739 
740         boolean remote = isRemoteInitiatedId(streamId);
741         int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
742         if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
743             return false;
744         }
745         spdySession.acceptStream(
746                 streamId, priority, remoteSideClosed, localSideClosed,
747                 initialSendWindowSize, initialReceiveWindowSize, remote);
748         if (remote) {
749             lastGoodStreamId = streamId;
750         }
751         return true;
752     }
753 
754     private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
755         if (remote) {
756             spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
757         } else {
758             spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
759         }
760         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
761             future.addListener(closeSessionFutureListener);
762         }
763     }
764 
765     private void removeStream(int streamId, ChannelFuture future) {
766         spdySession.removeStream(streamId, isRemoteInitiatedId(streamId));
767         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
768             future.addListener(closeSessionFutureListener);
769         }
770     }
771 
772     private void updateSendWindowSize(ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
773         synchronized (flowControlLock) {
774             int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
775             if (streamId != SPDY_SESSION_STREAM_ID) {
776                 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
777                 newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
778             }
779 
780             while (newWindowSize > 0) {
781                 // Check if we have unblocked a stalled stream
782                 MessageEvent e = spdySession.getPendingWrite(streamId);
783                 if (e == null) {
784                     break;
785                 }
786 
787                 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
788                 int dataFrameSize = spdyDataFrame.getData().readableBytes();
789                 int writeStreamId = spdyDataFrame.getStreamId();
790                 if (streamId == SPDY_SESSION_STREAM_ID) {
791                     newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId));
792                 }
793 
794                 if (newWindowSize >= dataFrameSize) {
795                     // Window size is large enough to send entire data frame
796                     spdySession.removePendingWrite(writeStreamId);
797                     newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
798                     int sessionSendWindowSize =
799                             spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
800                     newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
801 
802                     // The transfer window size is pre-decremented when sending a data frame downstream.
803                     // Close the session on write failures that leaves the transfer window in a corrupt state.
804                     final SocketAddress remoteAddress = e.getRemoteAddress();
805                     final ChannelHandlerContext context = ctx;
806                     e.getFuture().addListener(new ChannelFutureListener() {
807                         public void operationComplete(ChannelFuture future) throws Exception {
808                             if (!future.isSuccess()) {
809                                 Channel channel = future.getChannel();
810                                 issueSessionError(context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
811                             }
812                         }
813                     });
814 
815                     // Close the local side of the stream if this is the last frame
816                     if (spdyDataFrame.isLast()) {
817                         halfCloseStream(writeStreamId, false, e.getFuture());
818                     }
819 
820                     Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
821                 } else {
822                     // We can send a partial frame
823                     spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize);
824                     spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize);
825 
826                     // Create a partial data frame whose length is the current window size
827                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId);
828                     partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
829 
830                     ChannelFuture writeFuture = Channels.future(e.getChannel());
831 
832                     // The transfer window size is pre-decremented when sending a data frame downstream.
833                     // Close the session on write failures that leaves the transfer window in a corrupt state.
834                     final SocketAddress remoteAddress = e.getRemoteAddress();
835                     final ChannelHandlerContext context = ctx;
836                     e.getFuture().addListener(new ChannelFutureListener() {
837                         public void operationComplete(ChannelFuture future) throws Exception {
838                             if (!future.isSuccess()) {
839                                 Channel channel = future.getChannel();
840                                 issueSessionError(context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
841                             }
842                         }
843                     });
844 
845                     Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
846 
847                     newWindowSize = 0;
848                 }
849             }
850         }
851     }
852 
853     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
854         // Avoid NotYetConnectedException
855         if (!e.getChannel().isConnected()) {
856             ctx.sendDownstream(e);
857             return;
858         }
859 
860         ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
861         if (spdySession.noActiveStreams()) {
862             future.addListener(new ClosingChannelFutureListener(ctx, e));
863         } else {
864             closeSessionFutureListener = new ClosingChannelFutureListener(ctx, e);
865         }
866     }
867 
868     private synchronized ChannelFuture sendGoAwayFrame(
869             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
870         if (!sentGoAwayFrame) {
871             sentGoAwayFrame = true;
872             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
873             ChannelFuture future = Channels.future(channel);
874             Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
875             return future;
876         }
877         return Channels.succeededFuture(channel);
878     }
879 
880     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
881         private final ChannelHandlerContext ctx;
882         private final ChannelStateEvent e;
883 
884         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
885             this.ctx = ctx;
886             this.e = e;
887         }
888 
889         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
890             if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
891                 Channels.close(ctx, e.getFuture());
892             } else {
893                 e.getFuture().setSuccess();
894             }
895         }
896     }
897 }