View Javadoc

1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelDuplexHandler;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.internal.ThrowableUtil;
24  
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
28  
29  /**
30   * Manages streams within a SPDY session.
31   */
32  public class SpdySessionHandler
33          extends ChannelDuplexHandler {
34  
35      private static final SpdyProtocolException PROTOCOL_EXCEPTION = ThrowableUtil.unknownStackTrace(
36              new SpdyProtocolException(), SpdySessionHandler.class, "handleOutboundMessage(...)");
37      private static final SpdyProtocolException STREAM_CLOSED = ThrowableUtil.unknownStackTrace(
38              new SpdyProtocolException("Stream closed"), SpdySessionHandler.class, "removeStream(...)");
39  
40      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
41      private int initialSendWindowSize    = DEFAULT_WINDOW_SIZE;
42      private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
43      private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
44  
45      private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
46      private int lastGoodStreamId;
47  
48      private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
49      private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
50      private int localConcurrentStreams  = DEFAULT_MAX_CONCURRENT_STREAMS;
51  
52      private final AtomicInteger pings = new AtomicInteger();
53  
54      private boolean sentGoAwayFrame;
55      private boolean receivedGoAwayFrame;
56  
57      private ChannelFutureListener closeSessionFutureListener;
58  
59      private final boolean server;
60      private final int minorVersion;
61  
62      /**
63       * Creates a new session handler.
64       *
65       * @param version the protocol version
66       * @param server  {@code true} if and only if this session handler should
67       *                handle the server endpoint of the connection.
68       *                {@code false} if and only if this session handler should
69       *                handle the client endpoint of the connection.
70       */
71      public SpdySessionHandler(SpdyVersion version, boolean server) {
72          if (version == null) {
73              throw new NullPointerException("version");
74          }
75          this.server = server;
76          minorVersion = version.getMinorVersion();
77      }
78  
79      public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
80        if (sessionReceiveWindowSize < 0) {
81          throw new IllegalArgumentException("sessionReceiveWindowSize");
82        }
83        // This will not send a window update frame immediately.
84        // If this value increases the allowed receive window size,
85        // a WINDOW_UPDATE frame will be sent when only half of the
86        // session window size remains during data frame processing.
87        // If this value decreases the allowed receive window size,
88        // the window will be reduced as data frames are processed.
89        initialSessionReceiveWindowSize = sessionReceiveWindowSize;
90      }
91  
92      @Override
93      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
94          if (msg instanceof SpdyDataFrame) {
95  
96              /*
97               * SPDY Data frame processing requirements:
98               *
99               * If an endpoint receives a data frame for a Stream-ID which is not open
100              * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
101              * with the error code INVALID_STREAM for the Stream-ID.
102              *
103              * If an endpoint which created the stream receives a data frame before receiving
104              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
105              * issue a stream error with the getStatus code PROTOCOL_ERROR for the Stream-ID.
106              *
107              * If an endpoint receives multiple data frames for invalid Stream-IDs,
108              * it may close the session.
109              *
110              * If an endpoint refuses a stream it must ignore any data frames for that stream.
111              *
112              * If an endpoint receives a data frame after the stream is half-closed from the
113              * sender, it must send a RST_STREAM frame with the getStatus STREAM_ALREADY_CLOSED.
114              *
115              * If an endpoint receives a data frame after the stream is closed, it must send
116              * a RST_STREAM frame with the getStatus PROTOCOL_ERROR.
117              */
118             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
119             int streamId = spdyDataFrame.streamId();
120 
121             int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
122             int newSessionWindowSize =
123                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
124 
125             // Check if session window size is reduced beyond allowable lower bound
126             if (newSessionWindowSize < 0) {
127                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
128                 return;
129             }
130 
131             // Send a WINDOW_UPDATE frame if less than half the session window size remains
132             if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
133                 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
134                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
135                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
136                     new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
137                 ctx.writeAndFlush(spdyWindowUpdateFrame);
138             }
139 
140             // Check if we received a data frame for a Stream-ID which is not open
141 
142             if (!spdySession.isActiveStream(streamId)) {
143                 spdyDataFrame.release();
144                 if (streamId <= lastGoodStreamId) {
145                     issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
146                 } else if (!sentGoAwayFrame) {
147                     issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
148                 }
149                 return;
150             }
151 
152             // Check if we received a data frame for a stream which is half-closed
153 
154             if (spdySession.isRemoteSideClosed(streamId)) {
155                 spdyDataFrame.release();
156                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
157                 return;
158             }
159 
160             // Check if we received a data frame before receiving a SYN_REPLY
161             if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
162                 spdyDataFrame.release();
163                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
164                 return;
165             }
166 
167             /*
168              * SPDY Data frame flow control processing requirements:
169              *
170              * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
171              */
172 
173             // Update receive window size
174             int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
175 
176             // Window size can become negative if we sent a SETTINGS frame that reduces the
177             // size of the transfer window after the peer has written data frames.
178             // The value is bounded by the length that SETTINGS frame decrease the window.
179             // This difference is stored for the session when writing the SETTINGS frame
180             // and is cleared once we send a WINDOW_UPDATE frame.
181             if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
182                 spdyDataFrame.release();
183                 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
184                 return;
185             }
186 
187             // Window size became negative due to sender writing frame before receiving SETTINGS
188             // Send data frames upstream in initialReceiveWindowSize chunks
189             if (newWindowSize < 0) {
190                 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
191                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
192                             spdyDataFrame.content().readSlice(initialReceiveWindowSize).retain());
193                     ctx.writeAndFlush(partialDataFrame);
194                 }
195             }
196 
197             // Send a WINDOW_UPDATE frame if less than half the stream window size remains
198             if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
199                 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
200                 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
201                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
202                         new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
203                 ctx.writeAndFlush(spdyWindowUpdateFrame);
204             }
205 
206             // Close the remote side of the stream if this is the last frame
207             if (spdyDataFrame.isLast()) {
208                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
209             }
210 
211         } else if (msg instanceof SpdySynStreamFrame) {
212 
213             /*
214              * SPDY SYN_STREAM frame processing requirements:
215              *
216              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
217              * any previously received SYN_STREAM, it must issue a session error with
218              * the getStatus PROTOCOL_ERROR.
219              *
220              * If an endpoint receives multiple SYN_STREAM frames with the same active
221              * Stream-ID, it must issue a stream error with the getStatus code PROTOCOL_ERROR.
222              *
223              * The recipient can reject a stream by sending a stream error with the
224              * getStatus code REFUSED_STREAM.
225              */
226 
227             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
228             int streamId = spdySynStreamFrame.streamId();
229 
230             // Check if we received a valid SYN_STREAM frame
231             if (spdySynStreamFrame.isInvalid() ||
232                 !isRemoteInitiatedId(streamId) ||
233                 spdySession.isActiveStream(streamId)) {
234                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
235                 return;
236             }
237 
238             // Stream-IDs must be monotonically increasing
239             if (streamId <= lastGoodStreamId) {
240                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
241                 return;
242             }
243 
244             // Try to accept the stream
245             byte priority = spdySynStreamFrame.priority();
246             boolean remoteSideClosed = spdySynStreamFrame.isLast();
247             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
248             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
249                 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
250                 return;
251             }
252 
253         } else if (msg instanceof SpdySynReplyFrame) {
254 
255             /*
256              * SPDY SYN_REPLY frame processing requirements:
257              *
258              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
259              * it must issue a stream error with the getStatus code STREAM_IN_USE.
260              */
261 
262             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
263             int streamId = spdySynReplyFrame.streamId();
264 
265             // Check if we received a valid SYN_REPLY frame
266             if (spdySynReplyFrame.isInvalid() ||
267                 isRemoteInitiatedId(streamId) ||
268                 spdySession.isRemoteSideClosed(streamId)) {
269                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
270                 return;
271             }
272 
273             // Check if we have received multiple frames for the same Stream-ID
274             if (spdySession.hasReceivedReply(streamId)) {
275                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
276                 return;
277             }
278 
279             spdySession.receivedReply(streamId);
280 
281             // Close the remote side of the stream if this is the last frame
282             if (spdySynReplyFrame.isLast()) {
283                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
284             }
285 
286         } else if (msg instanceof SpdyRstStreamFrame) {
287 
288             /*
289              * SPDY RST_STREAM frame processing requirements:
290              *
291              * After receiving a RST_STREAM on a stream, the receiver must not send
292              * additional frames on that stream.
293              *
294              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
295              */
296 
297             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
298             removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
299 
300         } else if (msg instanceof SpdySettingsFrame) {
301 
302             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
303 
304             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
305             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
306                 // Settings frame had the wrong minor version
307                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
308                 return;
309             }
310 
311             int newConcurrentStreams =
312                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
313             if (newConcurrentStreams >= 0) {
314                 remoteConcurrentStreams = newConcurrentStreams;
315             }
316 
317             // Persistence flag are inconsistent with the use of SETTINGS to communicate
318             // the initial window size. Remove flags from the sender requesting that the
319             // value be persisted. Remove values that the sender indicates are persisted.
320             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
321                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
322             }
323             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
324 
325             int newInitialWindowSize =
326                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
327             if (newInitialWindowSize >= 0) {
328                 updateInitialSendWindowSize(newInitialWindowSize);
329             }
330 
331         } else if (msg instanceof SpdyPingFrame) {
332 
333             /*
334              * SPDY PING frame processing requirements:
335              *
336              * Receivers of a PING frame should send an identical frame to the sender
337              * as soon as possible.
338              *
339              * Receivers of a PING frame must ignore frames that it did not initiate
340              */
341 
342             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
343 
344             if (isRemoteInitiatedId(spdyPingFrame.id())) {
345                 ctx.writeAndFlush(spdyPingFrame);
346                 return;
347             }
348 
349             // Note: only checks that there are outstanding pings since uniqueness is not enforced
350             if (pings.get() == 0) {
351                 return;
352             }
353             pings.getAndDecrement();
354 
355         } else if (msg instanceof SpdyGoAwayFrame) {
356 
357             receivedGoAwayFrame = true;
358 
359         } else if (msg instanceof SpdyHeadersFrame) {
360 
361             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
362             int streamId = spdyHeadersFrame.streamId();
363 
364             // Check if we received a valid HEADERS frame
365             if (spdyHeadersFrame.isInvalid()) {
366                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
367                 return;
368             }
369 
370             if (spdySession.isRemoteSideClosed(streamId)) {
371                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
372                 return;
373             }
374 
375             // Close the remote side of the stream if this is the last frame
376             if (spdyHeadersFrame.isLast()) {
377                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
378             }
379 
380         } else if (msg instanceof SpdyWindowUpdateFrame) {
381 
382             /*
383              * SPDY WINDOW_UPDATE frame processing requirements:
384              *
385              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
386              * must send a RST_STREAM with the getStatus code FLOW_CONTROL_ERROR.
387              *
388              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
389              * after sending the last frame for the stream.
390              */
391 
392             SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
393             int streamId = spdyWindowUpdateFrame.streamId();
394             int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
395 
396             // Ignore frames for half-closed streams
397             if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
398                 return;
399             }
400 
401             // Check for numerical overflow
402             if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
403                 if (streamId == SPDY_SESSION_STREAM_ID) {
404                     issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
405                 } else {
406                     issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
407                 }
408                 return;
409             }
410 
411             updateSendWindowSize(ctx, streamId, deltaWindowSize);
412         }
413 
414         ctx.fireChannelRead(msg);
415     }
416 
417     @Override
418     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
419         for (Integer streamId: spdySession.activeStreams().keySet()) {
420             removeStream(streamId, ctx.newSucceededFuture());
421         }
422         ctx.fireChannelInactive();
423     }
424 
425     @Override
426     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
427         if (cause instanceof SpdyProtocolException) {
428             issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
429         }
430 
431         ctx.fireExceptionCaught(cause);
432     }
433 
434     @Override
435     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
436         sendGoAwayFrame(ctx, promise);
437     }
438 
439     @Override
440     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
441         if (msg instanceof SpdyDataFrame ||
442             msg instanceof SpdySynStreamFrame ||
443             msg instanceof SpdySynReplyFrame ||
444             msg instanceof SpdyRstStreamFrame ||
445             msg instanceof SpdySettingsFrame ||
446             msg instanceof SpdyPingFrame ||
447             msg instanceof SpdyGoAwayFrame ||
448             msg instanceof SpdyHeadersFrame ||
449             msg instanceof SpdyWindowUpdateFrame) {
450 
451             handleOutboundMessage(ctx, msg, promise);
452         } else {
453             ctx.write(msg, promise);
454         }
455     }
456 
457     private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
458         if (msg instanceof SpdyDataFrame) {
459 
460             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
461             int streamId = spdyDataFrame.streamId();
462 
463             // Frames must not be sent on half-closed streams
464             if (spdySession.isLocalSideClosed(streamId)) {
465                 spdyDataFrame.release();
466                 promise.setFailure(PROTOCOL_EXCEPTION);
467                 return;
468             }
469 
470             /*
471              * SPDY Data frame flow control processing requirements:
472              *
473              * Sender must not send a data frame with data length greater
474              * than the transfer window size.
475              *
476              * After sending each data frame, the sender decrements its
477              * transfer window size by the amount of data transmitted.
478              *
479              * When the window size becomes less than or equal to 0, the
480              * sender must pause transmitting data frames.
481              */
482 
483             int dataLength = spdyDataFrame.content().readableBytes();
484             int sendWindowSize = spdySession.getSendWindowSize(streamId);
485             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
486             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
487 
488             if (sendWindowSize <= 0) {
489                 // Stream is stalled -- enqueue Data frame and return
490                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
491                 return;
492             } else if (sendWindowSize < dataLength) {
493                 // Stream is not stalled but we cannot send the entire frame
494                 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
495                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
496 
497                 // Create a partial data frame whose length is the current window size
498                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
499                         spdyDataFrame.content().readSlice(sendWindowSize).retain());
500 
501                 // Enqueue the remaining data (will be the first frame queued)
502                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
503 
504                 // The transfer window size is pre-decremented when sending a data frame downstream.
505                 // Close the session on write failures that leave the transfer window in a corrupt state.
506                 final ChannelHandlerContext context = ctx;
507                 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
508                     @Override
509                     public void operationComplete(ChannelFuture future) throws Exception {
510                         if (!future.isSuccess()) {
511                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
512                         }
513                     }
514                 });
515                 return;
516             } else {
517                 // Window size is large enough to send entire data frame
518                 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
519                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
520 
521                 // The transfer window size is pre-decremented when sending a data frame downstream.
522                 // Close the session on write failures that leave the transfer window in a corrupt state.
523                 final ChannelHandlerContext context = ctx;
524                 promise.addListener(new ChannelFutureListener() {
525                     @Override
526                     public void operationComplete(ChannelFuture future) throws Exception {
527                         if (!future.isSuccess()) {
528                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
529                         }
530                     }
531                 });
532             }
533 
534             // Close the local side of the stream if this is the last frame
535             if (spdyDataFrame.isLast()) {
536                 halfCloseStream(streamId, false, promise);
537             }
538 
539         } else if (msg instanceof SpdySynStreamFrame) {
540 
541             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
542             int streamId = spdySynStreamFrame.streamId();
543 
544             if (isRemoteInitiatedId(streamId)) {
545                 promise.setFailure(PROTOCOL_EXCEPTION);
546                 return;
547             }
548 
549             byte priority = spdySynStreamFrame.priority();
550             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
551             boolean localSideClosed = spdySynStreamFrame.isLast();
552             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
553                 promise.setFailure(PROTOCOL_EXCEPTION);
554                 return;
555             }
556 
557         } else if (msg instanceof SpdySynReplyFrame) {
558 
559             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
560             int streamId = spdySynReplyFrame.streamId();
561 
562             // Frames must not be sent on half-closed streams
563             if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
564                 promise.setFailure(PROTOCOL_EXCEPTION);
565                 return;
566             }
567 
568             // Close the local side of the stream if this is the last frame
569             if (spdySynReplyFrame.isLast()) {
570                 halfCloseStream(streamId, false, promise);
571             }
572 
573         } else if (msg instanceof SpdyRstStreamFrame) {
574 
575             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
576             removeStream(spdyRstStreamFrame.streamId(), promise);
577 
578         } else if (msg instanceof SpdySettingsFrame) {
579 
580             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
581 
582             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
583             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
584                 // Settings frame had the wrong minor version
585                 promise.setFailure(PROTOCOL_EXCEPTION);
586                 return;
587             }
588 
589             int newConcurrentStreams =
590                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
591             if (newConcurrentStreams >= 0) {
592                 localConcurrentStreams = newConcurrentStreams;
593             }
594 
595             // Persistence flag are inconsistent with the use of SETTINGS to communicate
596             // the initial window size. Remove flags from the sender requesting that the
597             // value be persisted. Remove values that the sender indicates are persisted.
598             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
599                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
600             }
601             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
602 
603             int newInitialWindowSize =
604                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
605             if (newInitialWindowSize >= 0) {
606                 updateInitialReceiveWindowSize(newInitialWindowSize);
607             }
608 
609         } else if (msg instanceof SpdyPingFrame) {
610 
611             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
612             if (isRemoteInitiatedId(spdyPingFrame.id())) {
613                 ctx.fireExceptionCaught(new IllegalArgumentException(
614                             "invalid PING ID: " + spdyPingFrame.id()));
615                 return;
616             }
617             pings.getAndIncrement();
618 
619         } else if (msg instanceof SpdyGoAwayFrame) {
620 
621             // Why is this being sent? Intercept it and fail the write.
622             // Should have sent a CLOSE ChannelStateEvent
623             promise.setFailure(PROTOCOL_EXCEPTION);
624             return;
625 
626         } else if (msg instanceof SpdyHeadersFrame) {
627 
628             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
629             int streamId = spdyHeadersFrame.streamId();
630 
631             // Frames must not be sent on half-closed streams
632             if (spdySession.isLocalSideClosed(streamId)) {
633                 promise.setFailure(PROTOCOL_EXCEPTION);
634                 return;
635             }
636 
637             // Close the local side of the stream if this is the last frame
638             if (spdyHeadersFrame.isLast()) {
639                 halfCloseStream(streamId, false, promise);
640             }
641 
642         } else if (msg instanceof SpdyWindowUpdateFrame) {
643 
644             // Why is this being sent? Intercept it and fail the write.
645             promise.setFailure(PROTOCOL_EXCEPTION);
646             return;
647         }
648 
649         ctx.write(msg, promise);
650     }
651 
652     /*
653      * SPDY Session Error Handling:
654      *
655      * When a session error occurs, the endpoint encountering the error must first
656      * send a GOAWAY frame with the Stream-ID of the most recently received stream
657      * from the remote endpoint, and the error code for why the session is terminating.
658      *
659      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
660      */
661     private void issueSessionError(
662             ChannelHandlerContext ctx, SpdySessionStatus status) {
663 
664         sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
665     }
666 
667     /*
668      * SPDY Stream Error Handling:
669      *
670      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
     * the Stream-ID for the stream where the error occurred and the error status that
     * caused the error.
673      *
674      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
675      *
676      * Note: this is only called by the worker thread
677      */
678     private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
679         boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
680         ChannelPromise promise = ctx.newPromise();
681         removeStream(streamId, promise);
682 
683         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
684         ctx.writeAndFlush(spdyRstStreamFrame, promise);
685         if (fireChannelRead) {
686             ctx.fireChannelRead(spdyRstStreamFrame);
687         }
688     }
689 
690     /*
691      * Helper functions
692      */
693 
694     private boolean isRemoteInitiatedId(int id) {
695         boolean serverId = isServerId(id);
696         return server && !serverId || !server && serverId;
697     }
698 
699     // need to synchronize to prevent new streams from being created while updating active streams
700     private void updateInitialSendWindowSize(int newInitialWindowSize) {
701         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
702         initialSendWindowSize = newInitialWindowSize;
703         spdySession.updateAllSendWindowSizes(deltaWindowSize);
704     }
705 
706     // need to synchronize to prevent new streams from being created while updating active streams
707     private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
708         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
709         initialReceiveWindowSize = newInitialWindowSize;
710         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
711     }
712 
713     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamId, and initial window sizes
714     private boolean acceptStream(
715             int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
716         // Cannot initiate any new streams after receiving or sending GOAWAY
717         if (receivedGoAwayFrame || sentGoAwayFrame) {
718             return false;
719         }
720 
721         boolean remote = isRemoteInitiatedId(streamId);
722         int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
723         if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
724             return false;
725         }
726         spdySession.acceptStream(
727                 streamId, priority, remoteSideClosed, localSideClosed,
728                 initialSendWindowSize, initialReceiveWindowSize, remote);
729         if (remote) {
730             lastGoodStreamId = streamId;
731         }
732         return true;
733     }
734 
735     private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
736         if (remote) {
737             spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
738         } else {
739             spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
740         }
741         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
742             future.addListener(closeSessionFutureListener);
743         }
744     }
745 
746     private void removeStream(int streamId, ChannelFuture future) {
747         spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
748 
749         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
750             future.addListener(closeSessionFutureListener);
751         }
752     }
753 
754     private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
755         spdySession.updateSendWindowSize(streamId, deltaWindowSize);
756 
757         while (true) {
758             // Check if we have unblocked a stalled stream
759             SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
760             if (pendingWrite == null) {
761                 return;
762             }
763 
764             SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
765             int dataFrameSize = spdyDataFrame.content().readableBytes();
766             int writeStreamId = spdyDataFrame.streamId();
767             int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
768             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
769             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
770 
771             if (sendWindowSize <= 0) {
772                 return;
773             } else if (sendWindowSize < dataFrameSize) {
774                 // We can send a partial frame
775                 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
776                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
777 
778                 // Create a partial data frame whose length is the current window size
779                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
780                         spdyDataFrame.content().readSlice(sendWindowSize).retain());
781 
782                 // The transfer window size is pre-decremented when sending a data frame downstream.
783                 // Close the session on write failures that leave the transfer window in a corrupt state.
784                 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
785                     @Override
786                     public void operationComplete(ChannelFuture future) throws Exception {
787                         if (!future.isSuccess()) {
788                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
789                         }
790                     }
791                 });
792             } else {
793                 // Window size is large enough to send entire data frame
794                 spdySession.removePendingWrite(writeStreamId);
795                 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
796                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
797 
798                 // Close the local side of the stream if this is the last frame
799                 if (spdyDataFrame.isLast()) {
800                     halfCloseStream(writeStreamId, false, pendingWrite.promise);
801                 }
802 
803                 // The transfer window size is pre-decremented when sending a data frame downstream.
804                 // Close the session on write failures that leave the transfer window in a corrupt state.
805                 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
806                     @Override
807                     public void operationComplete(ChannelFuture future) throws Exception {
808                         if (!future.isSuccess()) {
809                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
810                         }
811                     }
812                 });
813             }
814         }
815     }
816 
817     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
818         // Avoid NotYetConnectedException
819         if (!ctx.channel().isActive()) {
820             ctx.close(future);
821             return;
822         }
823 
824         ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
825         if (spdySession.noActiveStreams()) {
826             f.addListener(new ClosingChannelFutureListener(ctx, future));
827         } else {
828             closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
829         }
830         // FIXME: Close the connection forcibly after timeout.
831     }
832 
833     private ChannelFuture sendGoAwayFrame(
834             ChannelHandlerContext ctx, SpdySessionStatus status) {
835         if (!sentGoAwayFrame) {
836             sentGoAwayFrame = true;
837             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
838             return ctx.writeAndFlush(spdyGoAwayFrame);
839         } else {
840             return ctx.newSucceededFuture();
841         }
842     }
843 
844     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
845         private final ChannelHandlerContext ctx;
846         private final ChannelPromise promise;
847 
848         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
849             this.ctx = ctx;
850             this.promise = promise;
851         }
852 
853         @Override
854         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
855             ctx.close(promise);
856         }
857     }
858 }