View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelDuplexHandler;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.internal.ThrowableUtil;
24  
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
28  import static io.netty.handler.codec.spdy.SpdyCodecUtil.isServerId;
29  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
30  
31  /**
32   * Manages streams within a SPDY session.
33   */
34  public class SpdySessionHandler extends ChannelDuplexHandler {
35  
36      private static final SpdyProtocolException PROTOCOL_EXCEPTION = ThrowableUtil.unknownStackTrace(
37              SpdyProtocolException.newStatic(null), SpdySessionHandler.class, "handleOutboundMessage(...)");
38      private static final SpdyProtocolException STREAM_CLOSED = ThrowableUtil.unknownStackTrace(
39              SpdyProtocolException.newStatic("Stream closed"), SpdySessionHandler.class, "removeStream(...)");
40  
41      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
42      private int initialSendWindowSize    = DEFAULT_WINDOW_SIZE;
43      private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
44      private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
45  
46      private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
47      private int lastGoodStreamId;
48  
49      private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
50      private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
51      private int localConcurrentStreams  = DEFAULT_MAX_CONCURRENT_STREAMS;
52  
53      private final AtomicInteger pings = new AtomicInteger();
54  
55      private boolean sentGoAwayFrame;
56      private boolean receivedGoAwayFrame;
57  
58      private ChannelFutureListener closeSessionFutureListener;
59  
60      private final boolean server;
61      private final int minorVersion;
62  
63      /**
64       * Creates a new session handler.
65       *
66       * @param version the protocol version
67       * @param server  {@code true} if and only if this session handler should
68       *                handle the server endpoint of the connection.
69       *                {@code false} if and only if this session handler should
70       *                handle the client endpoint of the connection.
71       */
72      public SpdySessionHandler(SpdyVersion version, boolean server) {
73          if (version == null) {
74              throw new NullPointerException("version");
75          }
76          this.server = server;
77          minorVersion = version.getMinorVersion();
78      }
79  
80      public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
81          checkPositiveOrZero(sessionReceiveWindowSize, "sessionReceiveWindowSize");
82          // This will not send a window update frame immediately.
83          // If this value increases the allowed receive window size,
84          // a WINDOW_UPDATE frame will be sent when only half of the
85          // session window size remains during data frame processing.
86          // If this value decreases the allowed receive window size,
87          // the window will be reduced as data frames are processed.
88          initialSessionReceiveWindowSize = sessionReceiveWindowSize;
89      }
90  
91      @Override
92      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
93          if (msg instanceof SpdyDataFrame) {
94  
95              /*
96               * SPDY Data frame processing requirements:
97               *
98               * If an endpoint receives a data frame for a Stream-ID which is not open
99               * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
100              * with the error code INVALID_STREAM for the Stream-ID.
101              *
102              * If an endpoint which created the stream receives a data frame before receiving
103              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
104              * issue a stream error with the getStatus code PROTOCOL_ERROR for the Stream-ID.
105              *
106              * If an endpoint receives multiple data frames for invalid Stream-IDs,
107              * it may close the session.
108              *
109              * If an endpoint refuses a stream it must ignore any data frames for that stream.
110              *
111              * If an endpoint receives a data frame after the stream is half-closed from the
112              * sender, it must send a RST_STREAM frame with the getStatus STREAM_ALREADY_CLOSED.
113              *
114              * If an endpoint receives a data frame after the stream is closed, it must send
115              * a RST_STREAM frame with the getStatus PROTOCOL_ERROR.
116              */
117             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
118             int streamId = spdyDataFrame.streamId();
119 
120             int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
121             int newSessionWindowSize =
122                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
123 
124             // Check if session window size is reduced beyond allowable lower bound
125             if (newSessionWindowSize < 0) {
126                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
127                 return;
128             }
129 
130             // Send a WINDOW_UPDATE frame if less than half the session window size remains
131             if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
132                 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
133                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
134                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
135                     new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
136                 ctx.writeAndFlush(spdyWindowUpdateFrame);
137             }
138 
139             // Check if we received a data frame for a Stream-ID which is not open
140 
141             if (!spdySession.isActiveStream(streamId)) {
142                 spdyDataFrame.release();
143                 if (streamId <= lastGoodStreamId) {
144                     issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
145                 } else if (!sentGoAwayFrame) {
146                     issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
147                 }
148                 return;
149             }
150 
151             // Check if we received a data frame for a stream which is half-closed
152 
153             if (spdySession.isRemoteSideClosed(streamId)) {
154                 spdyDataFrame.release();
155                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
156                 return;
157             }
158 
159             // Check if we received a data frame before receiving a SYN_REPLY
160             if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
161                 spdyDataFrame.release();
162                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
163                 return;
164             }
165 
166             /*
167              * SPDY Data frame flow control processing requirements:
168              *
169              * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
170              */
171 
172             // Update receive window size
173             int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
174 
175             // Window size can become negative if we sent a SETTINGS frame that reduces the
176             // size of the transfer window after the peer has written data frames.
177             // The value is bounded by the length that SETTINGS frame decrease the window.
178             // This difference is stored for the session when writing the SETTINGS frame
179             // and is cleared once we send a WINDOW_UPDATE frame.
180             if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
181                 spdyDataFrame.release();
182                 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
183                 return;
184             }
185 
186             // Window size became negative due to sender writing frame before receiving SETTINGS
187             // Send data frames upstream in initialReceiveWindowSize chunks
188             if (newWindowSize < 0) {
189                 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
190                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
191                             streamId, spdyDataFrame.content().readRetainedSlice(initialReceiveWindowSize));
192                     ctx.writeAndFlush(partialDataFrame);
193                 }
194             }
195 
196             // Send a WINDOW_UPDATE frame if less than half the stream window size remains
197             if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
198                 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
199                 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
200                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
201                         new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
202                 ctx.writeAndFlush(spdyWindowUpdateFrame);
203             }
204 
205             // Close the remote side of the stream if this is the last frame
206             if (spdyDataFrame.isLast()) {
207                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
208             }
209 
210         } else if (msg instanceof SpdySynStreamFrame) {
211 
212             /*
213              * SPDY SYN_STREAM frame processing requirements:
214              *
215              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
216              * any previously received SYN_STREAM, it must issue a session error with
217              * the getStatus PROTOCOL_ERROR.
218              *
219              * If an endpoint receives multiple SYN_STREAM frames with the same active
220              * Stream-ID, it must issue a stream error with the getStatus code PROTOCOL_ERROR.
221              *
222              * The recipient can reject a stream by sending a stream error with the
223              * getStatus code REFUSED_STREAM.
224              */
225 
226             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
227             int streamId = spdySynStreamFrame.streamId();
228 
229             // Check if we received a valid SYN_STREAM frame
230             if (spdySynStreamFrame.isInvalid() ||
231                 !isRemoteInitiatedId(streamId) ||
232                 spdySession.isActiveStream(streamId)) {
233                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
234                 return;
235             }
236 
237             // Stream-IDs must be monotonically increasing
238             if (streamId <= lastGoodStreamId) {
239                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
240                 return;
241             }
242 
243             // Try to accept the stream
244             byte priority = spdySynStreamFrame.priority();
245             boolean remoteSideClosed = spdySynStreamFrame.isLast();
246             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
247             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
248                 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
249                 return;
250             }
251 
252         } else if (msg instanceof SpdySynReplyFrame) {
253 
254             /*
255              * SPDY SYN_REPLY frame processing requirements:
256              *
257              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
258              * it must issue a stream error with the getStatus code STREAM_IN_USE.
259              */
260 
261             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
262             int streamId = spdySynReplyFrame.streamId();
263 
264             // Check if we received a valid SYN_REPLY frame
265             if (spdySynReplyFrame.isInvalid() ||
266                 isRemoteInitiatedId(streamId) ||
267                 spdySession.isRemoteSideClosed(streamId)) {
268                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
269                 return;
270             }
271 
272             // Check if we have received multiple frames for the same Stream-ID
273             if (spdySession.hasReceivedReply(streamId)) {
274                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
275                 return;
276             }
277 
278             spdySession.receivedReply(streamId);
279 
280             // Close the remote side of the stream if this is the last frame
281             if (spdySynReplyFrame.isLast()) {
282                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
283             }
284 
285         } else if (msg instanceof SpdyRstStreamFrame) {
286 
287             /*
288              * SPDY RST_STREAM frame processing requirements:
289              *
290              * After receiving a RST_STREAM on a stream, the receiver must not send
291              * additional frames on that stream.
292              *
293              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
294              */
295 
296             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
297             removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
298 
299         } else if (msg instanceof SpdySettingsFrame) {
300 
301             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
302 
303             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
304             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
305                 // Settings frame had the wrong minor version
306                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
307                 return;
308             }
309 
310             int newConcurrentStreams =
311                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
312             if (newConcurrentStreams >= 0) {
313                 remoteConcurrentStreams = newConcurrentStreams;
314             }
315 
316             // Persistence flag are inconsistent with the use of SETTINGS to communicate
317             // the initial window size. Remove flags from the sender requesting that the
318             // value be persisted. Remove values that the sender indicates are persisted.
319             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
320                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
321             }
322             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
323 
324             int newInitialWindowSize =
325                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
326             if (newInitialWindowSize >= 0) {
327                 updateInitialSendWindowSize(newInitialWindowSize);
328             }
329 
330         } else if (msg instanceof SpdyPingFrame) {
331 
332             /*
333              * SPDY PING frame processing requirements:
334              *
335              * Receivers of a PING frame should send an identical frame to the sender
336              * as soon as possible.
337              *
338              * Receivers of a PING frame must ignore frames that it did not initiate
339              */
340 
341             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
342 
343             if (isRemoteInitiatedId(spdyPingFrame.id())) {
344                 ctx.writeAndFlush(spdyPingFrame);
345                 return;
346             }
347 
348             // Note: only checks that there are outstanding pings since uniqueness is not enforced
349             if (pings.get() == 0) {
350                 return;
351             }
352             pings.getAndDecrement();
353 
354         } else if (msg instanceof SpdyGoAwayFrame) {
355 
356             receivedGoAwayFrame = true;
357 
358         } else if (msg instanceof SpdyHeadersFrame) {
359 
360             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
361             int streamId = spdyHeadersFrame.streamId();
362 
363             // Check if we received a valid HEADERS frame
364             if (spdyHeadersFrame.isInvalid()) {
365                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
366                 return;
367             }
368 
369             if (spdySession.isRemoteSideClosed(streamId)) {
370                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
371                 return;
372             }
373 
374             // Close the remote side of the stream if this is the last frame
375             if (spdyHeadersFrame.isLast()) {
376                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
377             }
378 
379         } else if (msg instanceof SpdyWindowUpdateFrame) {
380 
381             /*
382              * SPDY WINDOW_UPDATE frame processing requirements:
383              *
384              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
385              * must send a RST_STREAM with the getStatus code FLOW_CONTROL_ERROR.
386              *
387              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
388              * after sending the last frame for the stream.
389              */
390 
391             SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
392             int streamId = spdyWindowUpdateFrame.streamId();
393             int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
394 
395             // Ignore frames for half-closed streams
396             if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
397                 return;
398             }
399 
400             // Check for numerical overflow
401             if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
402                 if (streamId == SPDY_SESSION_STREAM_ID) {
403                     issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
404                 } else {
405                     issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
406                 }
407                 return;
408             }
409 
410             updateSendWindowSize(ctx, streamId, deltaWindowSize);
411         }
412 
413         ctx.fireChannelRead(msg);
414     }
415 
416     @Override
417     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
418         for (Integer streamId: spdySession.activeStreams().keySet()) {
419             removeStream(streamId, ctx.newSucceededFuture());
420         }
421         ctx.fireChannelInactive();
422     }
423 
424     @Override
425     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
426         if (cause instanceof SpdyProtocolException) {
427             issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
428         }
429 
430         ctx.fireExceptionCaught(cause);
431     }
432 
433     @Override
434     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
435         sendGoAwayFrame(ctx, promise);
436     }
437 
438     @Override
439     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
440         if (msg instanceof SpdyDataFrame ||
441             msg instanceof SpdySynStreamFrame ||
442             msg instanceof SpdySynReplyFrame ||
443             msg instanceof SpdyRstStreamFrame ||
444             msg instanceof SpdySettingsFrame ||
445             msg instanceof SpdyPingFrame ||
446             msg instanceof SpdyGoAwayFrame ||
447             msg instanceof SpdyHeadersFrame ||
448             msg instanceof SpdyWindowUpdateFrame) {
449 
450             handleOutboundMessage(ctx, msg, promise);
451         } else {
452             ctx.write(msg, promise);
453         }
454     }
455 
456     private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
457         if (msg instanceof SpdyDataFrame) {
458 
459             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
460             int streamId = spdyDataFrame.streamId();
461 
462             // Frames must not be sent on half-closed streams
463             if (spdySession.isLocalSideClosed(streamId)) {
464                 spdyDataFrame.release();
465                 promise.setFailure(PROTOCOL_EXCEPTION);
466                 return;
467             }
468 
469             /*
470              * SPDY Data frame flow control processing requirements:
471              *
472              * Sender must not send a data frame with data length greater
473              * than the transfer window size.
474              *
475              * After sending each data frame, the sender decrements its
476              * transfer window size by the amount of data transmitted.
477              *
478              * When the window size becomes less than or equal to 0, the
479              * sender must pause transmitting data frames.
480              */
481 
482             int dataLength = spdyDataFrame.content().readableBytes();
483             int sendWindowSize = spdySession.getSendWindowSize(streamId);
484             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
485             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
486 
487             if (sendWindowSize <= 0) {
488                 // Stream is stalled -- enqueue Data frame and return
489                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
490                 return;
491             } else if (sendWindowSize < dataLength) {
492                 // Stream is not stalled but we cannot send the entire frame
493                 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
494                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
495 
496                 // Create a partial data frame whose length is the current window size
497                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
498                         streamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
499 
500                 // Enqueue the remaining data (will be the first frame queued)
501                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
502 
503                 // The transfer window size is pre-decremented when sending a data frame downstream.
504                 // Close the session on write failures that leave the transfer window in a corrupt state.
505                 final ChannelHandlerContext context = ctx;
506                 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
507                     @Override
508                     public void operationComplete(ChannelFuture future) throws Exception {
509                         if (!future.isSuccess()) {
510                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
511                         }
512                     }
513                 });
514                 return;
515             } else {
516                 // Window size is large enough to send entire data frame
517                 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
518                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
519 
520                 // The transfer window size is pre-decremented when sending a data frame downstream.
521                 // Close the session on write failures that leave the transfer window in a corrupt state.
522                 final ChannelHandlerContext context = ctx;
523                 promise.addListener(new ChannelFutureListener() {
524                     @Override
525                     public void operationComplete(ChannelFuture future) throws Exception {
526                         if (!future.isSuccess()) {
527                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
528                         }
529                     }
530                 });
531             }
532 
533             // Close the local side of the stream if this is the last frame
534             if (spdyDataFrame.isLast()) {
535                 halfCloseStream(streamId, false, promise);
536             }
537 
538         } else if (msg instanceof SpdySynStreamFrame) {
539 
540             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
541             int streamId = spdySynStreamFrame.streamId();
542 
543             if (isRemoteInitiatedId(streamId)) {
544                 promise.setFailure(PROTOCOL_EXCEPTION);
545                 return;
546             }
547 
548             byte priority = spdySynStreamFrame.priority();
549             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
550             boolean localSideClosed = spdySynStreamFrame.isLast();
551             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
552                 promise.setFailure(PROTOCOL_EXCEPTION);
553                 return;
554             }
555 
556         } else if (msg instanceof SpdySynReplyFrame) {
557 
558             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
559             int streamId = spdySynReplyFrame.streamId();
560 
561             // Frames must not be sent on half-closed streams
562             if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
563                 promise.setFailure(PROTOCOL_EXCEPTION);
564                 return;
565             }
566 
567             // Close the local side of the stream if this is the last frame
568             if (spdySynReplyFrame.isLast()) {
569                 halfCloseStream(streamId, false, promise);
570             }
571 
572         } else if (msg instanceof SpdyRstStreamFrame) {
573 
574             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
575             removeStream(spdyRstStreamFrame.streamId(), promise);
576 
577         } else if (msg instanceof SpdySettingsFrame) {
578 
579             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
580 
581             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
582             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
583                 // Settings frame had the wrong minor version
584                 promise.setFailure(PROTOCOL_EXCEPTION);
585                 return;
586             }
587 
588             int newConcurrentStreams =
589                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
590             if (newConcurrentStreams >= 0) {
591                 localConcurrentStreams = newConcurrentStreams;
592             }
593 
594             // Persistence flag are inconsistent with the use of SETTINGS to communicate
595             // the initial window size. Remove flags from the sender requesting that the
596             // value be persisted. Remove values that the sender indicates are persisted.
597             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
598                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
599             }
600             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
601 
602             int newInitialWindowSize =
603                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
604             if (newInitialWindowSize >= 0) {
605                 updateInitialReceiveWindowSize(newInitialWindowSize);
606             }
607 
608         } else if (msg instanceof SpdyPingFrame) {
609 
610             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
611             if (isRemoteInitiatedId(spdyPingFrame.id())) {
612                 ctx.fireExceptionCaught(new IllegalArgumentException(
613                             "invalid PING ID: " + spdyPingFrame.id()));
614                 return;
615             }
616             pings.getAndIncrement();
617 
618         } else if (msg instanceof SpdyGoAwayFrame) {
619 
620             // Why is this being sent? Intercept it and fail the write.
621             // Should have sent a CLOSE ChannelStateEvent
622             promise.setFailure(PROTOCOL_EXCEPTION);
623             return;
624 
625         } else if (msg instanceof SpdyHeadersFrame) {
626 
627             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
628             int streamId = spdyHeadersFrame.streamId();
629 
630             // Frames must not be sent on half-closed streams
631             if (spdySession.isLocalSideClosed(streamId)) {
632                 promise.setFailure(PROTOCOL_EXCEPTION);
633                 return;
634             }
635 
636             // Close the local side of the stream if this is the last frame
637             if (spdyHeadersFrame.isLast()) {
638                 halfCloseStream(streamId, false, promise);
639             }
640 
641         } else if (msg instanceof SpdyWindowUpdateFrame) {
642 
643             // Why is this being sent? Intercept it and fail the write.
644             promise.setFailure(PROTOCOL_EXCEPTION);
645             return;
646         }
647 
648         ctx.write(msg, promise);
649     }
650 
651     /*
652      * SPDY Session Error Handling:
653      *
654      * When a session error occurs, the endpoint encountering the error must first
655      * send a GOAWAY frame with the Stream-ID of the most recently received stream
656      * from the remote endpoint, and the error code for why the session is terminating.
657      *
658      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
659      */
660     private void issueSessionError(
661             ChannelHandlerContext ctx, SpdySessionStatus status) {
662 
663         sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
664     }
665 
666     /*
667      * SPDY Stream Error Handling:
668      *
669      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
670      * the Stream-ID for the stream where the error occurred and the error status which
671      * caused the error.
672      *
673      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
674      *
675      * Note: this is only called by the worker thread
676      */
677     private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
678         boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
679         ChannelPromise promise = ctx.newPromise();
680         removeStream(streamId, promise);
681 
682         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
683         ctx.writeAndFlush(spdyRstStreamFrame, promise);
684         if (fireChannelRead) {
685             ctx.fireChannelRead(spdyRstStreamFrame);
686         }
687     }
688 
689     /*
690      * Helper functions
691      */
692 
693     private boolean isRemoteInitiatedId(int id) {
694         boolean serverId = isServerId(id);
695         return server && !serverId || !server && serverId;
696     }
697 
698     // need to synchronize to prevent new streams from being created while updating active streams
699     private void updateInitialSendWindowSize(int newInitialWindowSize) {
700         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
701         initialSendWindowSize = newInitialWindowSize;
702         spdySession.updateAllSendWindowSizes(deltaWindowSize);
703     }
704 
705     // need to synchronize to prevent new streams from being created while updating active streams
706     private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
707         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
708         initialReceiveWindowSize = newInitialWindowSize;
709         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
710     }
711 
712     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamId, and initial window sizes
713     private boolean acceptStream(
714             int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
715         // Cannot initiate any new streams after receiving or sending GOAWAY
716         if (receivedGoAwayFrame || sentGoAwayFrame) {
717             return false;
718         }
719 
720         boolean remote = isRemoteInitiatedId(streamId);
721         int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
722         if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
723             return false;
724         }
725         spdySession.acceptStream(
726                 streamId, priority, remoteSideClosed, localSideClosed,
727                 initialSendWindowSize, initialReceiveWindowSize, remote);
728         if (remote) {
729             lastGoodStreamId = streamId;
730         }
731         return true;
732     }
733 
734     private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
735         if (remote) {
736             spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
737         } else {
738             spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
739         }
740         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
741             future.addListener(closeSessionFutureListener);
742         }
743     }
744 
745     private void removeStream(int streamId, ChannelFuture future) {
746         spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
747 
748         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
749             future.addListener(closeSessionFutureListener);
750         }
751     }
752 
753     private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
754         spdySession.updateSendWindowSize(streamId, deltaWindowSize);
755 
756         while (true) {
757             // Check if we have unblocked a stalled stream
758             SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
759             if (pendingWrite == null) {
760                 return;
761             }
762 
763             SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
764             int dataFrameSize = spdyDataFrame.content().readableBytes();
765             int writeStreamId = spdyDataFrame.streamId();
766             int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
767             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
768             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
769 
770             if (sendWindowSize <= 0) {
771                 return;
772             } else if (sendWindowSize < dataFrameSize) {
773                 // We can send a partial frame
774                 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
775                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
776 
777                 // Create a partial data frame whose length is the current window size
778                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
779                         writeStreamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
780 
781                 // The transfer window size is pre-decremented when sending a data frame downstream.
782                 // Close the session on write failures that leave the transfer window in a corrupt state.
783                 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
784                     @Override
785                     public void operationComplete(ChannelFuture future) throws Exception {
786                         if (!future.isSuccess()) {
787                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
788                         }
789                     }
790                 });
791             } else {
792                 // Window size is large enough to send entire data frame
793                 spdySession.removePendingWrite(writeStreamId);
794                 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
795                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
796 
797                 // Close the local side of the stream if this is the last frame
798                 if (spdyDataFrame.isLast()) {
799                     halfCloseStream(writeStreamId, false, pendingWrite.promise);
800                 }
801 
802                 // The transfer window size is pre-decremented when sending a data frame downstream.
803                 // Close the session on write failures that leave the transfer window in a corrupt state.
804                 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
805                     @Override
806                     public void operationComplete(ChannelFuture future) throws Exception {
807                         if (!future.isSuccess()) {
808                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
809                         }
810                     }
811                 });
812             }
813         }
814     }
815 
816     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
817         // Avoid NotYetConnectedException
818         if (!ctx.channel().isActive()) {
819             ctx.close(future);
820             return;
821         }
822 
823         ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
824         if (spdySession.noActiveStreams()) {
825             f.addListener(new ClosingChannelFutureListener(ctx, future));
826         } else {
827             closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
828         }
829         // FIXME: Close the connection forcibly after timeout.
830     }
831 
832     private ChannelFuture sendGoAwayFrame(
833             ChannelHandlerContext ctx, SpdySessionStatus status) {
834         if (!sentGoAwayFrame) {
835             sentGoAwayFrame = true;
836             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
837             return ctx.writeAndFlush(spdyGoAwayFrame);
838         } else {
839             return ctx.newSucceededFuture();
840         }
841     }
842 
843     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
844         private final ChannelHandlerContext ctx;
845         private final ChannelPromise promise;
846 
847         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
848             this.ctx = ctx;
849             this.promise = promise;
850         }
851 
852         @Override
853         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
854             ctx.close(promise);
855         }
856     }
857 }