View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerAdapter;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.internal.EmptyArrays;
24  
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
28  
29  /**
30   * Manages streams within a SPDY session.
31   */
32  public class SpdySessionHandler extends ChannelHandlerAdapter {
33  
34      private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
35      private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
36  
37      static {
38          PROTOCOL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
39          STREAM_CLOSED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
40      }
41  
42      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
43      private int initialSendWindowSize    = DEFAULT_WINDOW_SIZE;
44      private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
45      private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
46  
47      private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
48      private int lastGoodStreamId;
49  
50      private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
51      private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
52      private int localConcurrentStreams  = DEFAULT_MAX_CONCURRENT_STREAMS;
53  
54      private final AtomicInteger pings = new AtomicInteger();
55  
56      private boolean sentGoAwayFrame;
57      private boolean receivedGoAwayFrame;
58  
59      private ChannelFutureListener closeSessionFutureListener;
60  
61      private final boolean server;
62      private final int minorVersion;
63  
64      /**
65       * Creates a new session handler.
66       *
67       * @param version the protocol version
68       * @param server  {@code true} if and only if this session handler should
69       *                handle the server endpoint of the connection.
70       *                {@code false} if and only if this session handler should
71       *                handle the client endpoint of the connection.
72       */
73      public SpdySessionHandler(SpdyVersion version, boolean server) {
74          if (version == null) {
75              throw new NullPointerException("version");
76          }
77          this.server = server;
78          minorVersion = version.getMinorVersion();
79      }
80  
81      public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
82        if (sessionReceiveWindowSize < 0) {
83          throw new IllegalArgumentException("sessionReceiveWindowSize");
84        }
85        // This will not send a window update frame immediately.
86        // If this value increases the allowed receive window size,
87        // a WINDOW_UPDATE frame will be sent when only half of the
88        // session window size remains during data frame processing.
89        // If this value decreases the allowed receive window size,
90        // the window will be reduced as data frames are processed.
91        initialSessionReceiveWindowSize = sessionReceiveWindowSize;
92      }
93  
94      @Override
95      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
96          if (msg instanceof SpdyDataFrame) {
97  
98              /*
99               * SPDY Data frame processing requirements:
100              *
101              * If an endpoint receives a data frame for a Stream-ID which is not open
102              * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
103              * with the error code INVALID_STREAM for the Stream-ID.
104              *
105              * If an endpoint which created the stream receives a data frame before receiving
106              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
107              * issue a stream error with the getStatus code PROTOCOL_ERROR for the Stream-ID.
108              *
109              * If an endpoint receives multiple data frames for invalid Stream-IDs,
110              * it may close the session.
111              *
112              * If an endpoint refuses a stream it must ignore any data frames for that stream.
113              *
114              * If an endpoint receives a data frame after the stream is half-closed from the
115              * sender, it must send a RST_STREAM frame with the getStatus STREAM_ALREADY_CLOSED.
116              *
117              * If an endpoint receives a data frame after the stream is closed, it must send
118              * a RST_STREAM frame with the getStatus PROTOCOL_ERROR.
119              */
120             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
121             int streamId = spdyDataFrame.streamId();
122 
123             int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
124             int newSessionWindowSize =
125                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
126 
127             // Check if session window size is reduced beyond allowable lower bound
128             if (newSessionWindowSize < 0) {
129                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
130                 return;
131             }
132 
133             // Send a WINDOW_UPDATE frame if less than half the session window size remains
134             if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
135                 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
136                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
137                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
138                     new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
139                 ctx.writeAndFlush(spdyWindowUpdateFrame);
140             }
141 
142             // Check if we received a data frame for a Stream-ID which is not open
143 
144             if (!spdySession.isActiveStream(streamId)) {
145                 spdyDataFrame.release();
146                 if (streamId <= lastGoodStreamId) {
147                     issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
148                 } else if (!sentGoAwayFrame) {
149                     issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
150                 }
151                 return;
152             }
153 
154             // Check if we received a data frame for a stream which is half-closed
155 
156             if (spdySession.isRemoteSideClosed(streamId)) {
157                 spdyDataFrame.release();
158                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
159                 return;
160             }
161 
162             // Check if we received a data frame before receiving a SYN_REPLY
163             if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
164                 spdyDataFrame.release();
165                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
166                 return;
167             }
168 
169             /*
170              * SPDY Data frame flow control processing requirements:
171              *
172              * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
173              */
174 
175             // Update receive window size
176             int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
177 
178             // Window size can become negative if we sent a SETTINGS frame that reduces the
179             // size of the transfer window after the peer has written data frames.
180             // The value is bounded by the length that SETTINGS frame decrease the window.
181             // This difference is stored for the session when writing the SETTINGS frame
182             // and is cleared once we send a WINDOW_UPDATE frame.
183             if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
184                 spdyDataFrame.release();
185                 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
186                 return;
187             }
188 
189             // Window size became negative due to sender writing frame before receiving SETTINGS
190             // Send data frames upstream in initialReceiveWindowSize chunks
191             if (newWindowSize < 0) {
192                 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
193                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
194                             spdyDataFrame.content().readSlice(initialReceiveWindowSize).retain());
195                     ctx.writeAndFlush(partialDataFrame);
196                 }
197             }
198 
199             // Send a WINDOW_UPDATE frame if less than half the stream window size remains
200             if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
201                 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
202                 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
203                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
204                         new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
205                 ctx.writeAndFlush(spdyWindowUpdateFrame);
206             }
207 
208             // Close the remote side of the stream if this is the last frame
209             if (spdyDataFrame.isLast()) {
210                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
211             }
212 
213         } else if (msg instanceof SpdySynStreamFrame) {
214 
215             /*
216              * SPDY SYN_STREAM frame processing requirements:
217              *
218              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
219              * any previously received SYN_STREAM, it must issue a session error with
220              * the getStatus PROTOCOL_ERROR.
221              *
222              * If an endpoint receives multiple SYN_STREAM frames with the same active
223              * Stream-ID, it must issue a stream error with the getStatus code PROTOCOL_ERROR.
224              *
225              * The recipient can reject a stream by sending a stream error with the
226              * getStatus code REFUSED_STREAM.
227              */
228 
229             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
230             int streamId = spdySynStreamFrame.streamId();
231 
232             // Check if we received a valid SYN_STREAM frame
233             if (spdySynStreamFrame.isInvalid() ||
234                 !isRemoteInitiatedId(streamId) ||
235                 spdySession.isActiveStream(streamId)) {
236                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
237                 return;
238             }
239 
240             // Stream-IDs must be monotonically increasing
241             if (streamId <= lastGoodStreamId) {
242                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
243                 return;
244             }
245 
246             // Try to accept the stream
247             byte priority = spdySynStreamFrame.priority();
248             boolean remoteSideClosed = spdySynStreamFrame.isLast();
249             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
250             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
251                 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
252                 return;
253             }
254 
255         } else if (msg instanceof SpdySynReplyFrame) {
256 
257             /*
258              * SPDY SYN_REPLY frame processing requirements:
259              *
260              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
261              * it must issue a stream error with the getStatus code STREAM_IN_USE.
262              */
263 
264             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
265             int streamId = spdySynReplyFrame.streamId();
266 
267             // Check if we received a valid SYN_REPLY frame
268             if (spdySynReplyFrame.isInvalid() ||
269                 isRemoteInitiatedId(streamId) ||
270                 spdySession.isRemoteSideClosed(streamId)) {
271                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
272                 return;
273             }
274 
275             // Check if we have received multiple frames for the same Stream-ID
276             if (spdySession.hasReceivedReply(streamId)) {
277                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
278                 return;
279             }
280 
281             spdySession.receivedReply(streamId);
282 
283             // Close the remote side of the stream if this is the last frame
284             if (spdySynReplyFrame.isLast()) {
285                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
286             }
287 
288         } else if (msg instanceof SpdyRstStreamFrame) {
289 
290             /*
291              * SPDY RST_STREAM frame processing requirements:
292              *
293              * After receiving a RST_STREAM on a stream, the receiver must not send
294              * additional frames on that stream.
295              *
296              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
297              */
298 
299             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
300             removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
301 
302         } else if (msg instanceof SpdySettingsFrame) {
303 
304             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
305 
306             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
307             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
308                 // Settings frame had the wrong minor version
309                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
310                 return;
311             }
312 
313             int newConcurrentStreams =
314                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
315             if (newConcurrentStreams >= 0) {
316                 remoteConcurrentStreams = newConcurrentStreams;
317             }
318 
319             // Persistence flag are inconsistent with the use of SETTINGS to communicate
320             // the initial window size. Remove flags from the sender requesting that the
321             // value be persisted. Remove values that the sender indicates are persisted.
322             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
323                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
324             }
325             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
326 
327             int newInitialWindowSize =
328                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
329             if (newInitialWindowSize >= 0) {
330                 updateInitialSendWindowSize(newInitialWindowSize);
331             }
332 
333         } else if (msg instanceof SpdyPingFrame) {
334 
335             /*
336              * SPDY PING frame processing requirements:
337              *
338              * Receivers of a PING frame should send an identical frame to the sender
339              * as soon as possible.
340              *
341              * Receivers of a PING frame must ignore frames that it did not initiate
342              */
343 
344             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
345 
346             if (isRemoteInitiatedId(spdyPingFrame.id())) {
347                 ctx.writeAndFlush(spdyPingFrame);
348                 return;
349             }
350 
351             // Note: only checks that there are outstanding pings since uniqueness is not enforced
352             if (pings.get() == 0) {
353                 return;
354             }
355             pings.getAndDecrement();
356 
357         } else if (msg instanceof SpdyGoAwayFrame) {
358 
359             receivedGoAwayFrame = true;
360 
361         } else if (msg instanceof SpdyHeadersFrame) {
362 
363             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
364             int streamId = spdyHeadersFrame.streamId();
365 
366             // Check if we received a valid HEADERS frame
367             if (spdyHeadersFrame.isInvalid()) {
368                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
369                 return;
370             }
371 
372             if (spdySession.isRemoteSideClosed(streamId)) {
373                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
374                 return;
375             }
376 
377             // Close the remote side of the stream if this is the last frame
378             if (spdyHeadersFrame.isLast()) {
379                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
380             }
381 
382         } else if (msg instanceof SpdyWindowUpdateFrame) {
383 
384             /*
385              * SPDY WINDOW_UPDATE frame processing requirements:
386              *
387              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
388              * must send a RST_STREAM with the getStatus code FLOW_CONTROL_ERROR.
389              *
390              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
391              * after sending the last frame for the stream.
392              */
393 
394             SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
395             int streamId = spdyWindowUpdateFrame.streamId();
396             int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
397 
398             // Ignore frames for half-closed streams
399             if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
400                 return;
401             }
402 
403             // Check for numerical overflow
404             if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
405                 if (streamId == SPDY_SESSION_STREAM_ID) {
406                     issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
407                 } else {
408                     issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
409                 }
410                 return;
411             }
412 
413             updateSendWindowSize(ctx, streamId, deltaWindowSize);
414         }
415 
416         ctx.fireChannelRead(msg);
417     }
418 
419     @Override
420     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
421         for (Integer streamId: spdySession.activeStreams().keySet()) {
422             removeStream(streamId, ctx.newSucceededFuture());
423         }
424         ctx.fireChannelInactive();
425     }
426 
427     @Override
428     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
429         if (cause instanceof SpdyProtocolException) {
430             issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
431         }
432 
433         ctx.fireExceptionCaught(cause);
434     }
435 
436     @Override
437     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
438         sendGoAwayFrame(ctx, promise);
439     }
440 
441     @Override
442     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
443         if (msg instanceof SpdyDataFrame ||
444             msg instanceof SpdySynStreamFrame ||
445             msg instanceof SpdySynReplyFrame ||
446             msg instanceof SpdyRstStreamFrame ||
447             msg instanceof SpdySettingsFrame ||
448             msg instanceof SpdyPingFrame ||
449             msg instanceof SpdyGoAwayFrame ||
450             msg instanceof SpdyHeadersFrame ||
451             msg instanceof SpdyWindowUpdateFrame) {
452 
453             handleOutboundMessage(ctx, msg, promise);
454         } else {
455             ctx.write(msg, promise);
456         }
457     }
458 
459     private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
460         if (msg instanceof SpdyDataFrame) {
461 
462             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
463             int streamId = spdyDataFrame.streamId();
464 
465             // Frames must not be sent on half-closed streams
466             if (spdySession.isLocalSideClosed(streamId)) {
467                 spdyDataFrame.release();
468                 promise.setFailure(PROTOCOL_EXCEPTION);
469                 return;
470             }
471 
472             /*
473              * SPDY Data frame flow control processing requirements:
474              *
475              * Sender must not send a data frame with data length greater
476              * than the transfer window size.
477              *
478              * After sending each data frame, the sender decrements its
479              * transfer window size by the amount of data transmitted.
480              *
481              * When the window size becomes less than or equal to 0, the
482              * sender must pause transmitting data frames.
483              */
484 
485             int dataLength = spdyDataFrame.content().readableBytes();
486             int sendWindowSize = spdySession.getSendWindowSize(streamId);
487             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
488             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
489 
490             if (sendWindowSize <= 0) {
491                 // Stream is stalled -- enqueue Data frame and return
492                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
493                 return;
494             } else if (sendWindowSize < dataLength) {
495                 // Stream is not stalled but we cannot send the entire frame
496                 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
497                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
498 
499                 // Create a partial data frame whose length is the current window size
500                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
501                         spdyDataFrame.content().readSlice(sendWindowSize).retain());
502 
503                 // Enqueue the remaining data (will be the first frame queued)
504                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
505 
506                 // The transfer window size is pre-decremented when sending a data frame downstream.
507                 // Close the session on write failures that leave the transfer window in a corrupt state.
508                 final ChannelHandlerContext context = ctx;
509                 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
510                     @Override
511                     public void operationComplete(ChannelFuture future) throws Exception {
512                         if (!future.isSuccess()) {
513                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
514                         }
515                     }
516                 });
517                 return;
518             } else {
519                 // Window size is large enough to send entire data frame
520                 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
521                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
522 
523                 // The transfer window size is pre-decremented when sending a data frame downstream.
524                 // Close the session on write failures that leave the transfer window in a corrupt state.
525                 final ChannelHandlerContext context = ctx;
526                 promise.addListener(new ChannelFutureListener() {
527                     @Override
528                     public void operationComplete(ChannelFuture future) throws Exception {
529                         if (!future.isSuccess()) {
530                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
531                         }
532                     }
533                 });
534             }
535 
536             // Close the local side of the stream if this is the last frame
537             if (spdyDataFrame.isLast()) {
538                 halfCloseStream(streamId, false, promise);
539             }
540 
541         } else if (msg instanceof SpdySynStreamFrame) {
542 
543             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
544             int streamId = spdySynStreamFrame.streamId();
545 
546             if (isRemoteInitiatedId(streamId)) {
547                 promise.setFailure(PROTOCOL_EXCEPTION);
548                 return;
549             }
550 
551             byte priority = spdySynStreamFrame.priority();
552             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
553             boolean localSideClosed = spdySynStreamFrame.isLast();
554             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
555                 promise.setFailure(PROTOCOL_EXCEPTION);
556                 return;
557             }
558 
559         } else if (msg instanceof SpdySynReplyFrame) {
560 
561             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
562             int streamId = spdySynReplyFrame.streamId();
563 
564             // Frames must not be sent on half-closed streams
565             if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
566                 promise.setFailure(PROTOCOL_EXCEPTION);
567                 return;
568             }
569 
570             // Close the local side of the stream if this is the last frame
571             if (spdySynReplyFrame.isLast()) {
572                 halfCloseStream(streamId, false, promise);
573             }
574 
575         } else if (msg instanceof SpdyRstStreamFrame) {
576 
577             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
578             removeStream(spdyRstStreamFrame.streamId(), promise);
579 
580         } else if (msg instanceof SpdySettingsFrame) {
581 
582             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
583 
584             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
585             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
586                 // Settings frame had the wrong minor version
587                 promise.setFailure(PROTOCOL_EXCEPTION);
588                 return;
589             }
590 
591             int newConcurrentStreams =
592                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
593             if (newConcurrentStreams >= 0) {
594                 localConcurrentStreams = newConcurrentStreams;
595             }
596 
597             // Persistence flag are inconsistent with the use of SETTINGS to communicate
598             // the initial window size. Remove flags from the sender requesting that the
599             // value be persisted. Remove values that the sender indicates are persisted.
600             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
601                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
602             }
603             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
604 
605             int newInitialWindowSize =
606                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
607             if (newInitialWindowSize >= 0) {
608                 updateInitialReceiveWindowSize(newInitialWindowSize);
609             }
610 
611         } else if (msg instanceof SpdyPingFrame) {
612 
613             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
614             if (isRemoteInitiatedId(spdyPingFrame.id())) {
615                 ctx.fireExceptionCaught(new IllegalArgumentException(
616                             "invalid PING ID: " + spdyPingFrame.id()));
617                 return;
618             }
619             pings.getAndIncrement();
620 
621         } else if (msg instanceof SpdyGoAwayFrame) {
622 
623             // Why is this being sent? Intercept it and fail the write.
624             // Should have sent a CLOSE ChannelStateEvent
625             promise.setFailure(PROTOCOL_EXCEPTION);
626             return;
627 
628         } else if (msg instanceof SpdyHeadersFrame) {
629 
630             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
631             int streamId = spdyHeadersFrame.streamId();
632 
633             // Frames must not be sent on half-closed streams
634             if (spdySession.isLocalSideClosed(streamId)) {
635                 promise.setFailure(PROTOCOL_EXCEPTION);
636                 return;
637             }
638 
639             // Close the local side of the stream if this is the last frame
640             if (spdyHeadersFrame.isLast()) {
641                 halfCloseStream(streamId, false, promise);
642             }
643 
644         } else if (msg instanceof SpdyWindowUpdateFrame) {
645 
646             // Why is this being sent? Intercept it and fail the write.
647             promise.setFailure(PROTOCOL_EXCEPTION);
648             return;
649         }
650 
651         ctx.write(msg, promise);
652     }
653 
654     /*
655      * SPDY Session Error Handling:
656      *
657      * When a session error occurs, the endpoint encountering the error must first
658      * send a GOAWAY frame with the Stream-ID of the most recently received stream
659      * from the remote endpoint, and the error code for why the session is terminating.
660      *
661      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
662      */
663     private void issueSessionError(
664             ChannelHandlerContext ctx, SpdySessionStatus status) {
665 
666         sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
667     }
668 
669     /*
670      * SPDY Stream Error Handling:
671      *
672      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
673      * the Stream-ID for the stream where the error occurred and the error status which
674      * caused the error.
675      *
676      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
677      *
678      * Note: this is only called by the worker thread
679      */
680     private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
681         boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
682         ChannelPromise promise = ctx.newPromise();
683         removeStream(streamId, promise);
684 
685         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
686         ctx.writeAndFlush(spdyRstStreamFrame, promise);
687         if (fireChannelRead) {
688             ctx.fireChannelRead(spdyRstStreamFrame);
689         }
690     }
691 
692     /*
693      * Helper functions
694      */
695 
696     private boolean isRemoteInitiatedId(int id) {
697         boolean serverId = isServerId(id);
698         return server && !serverId || !server && serverId;
699     }
700 
701     // need to synchronize to prevent new streams from being created while updating active streams
702     private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
703         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
704         initialSendWindowSize = newInitialWindowSize;
705         spdySession.updateAllSendWindowSizes(deltaWindowSize);
706     }
707 
708     // need to synchronize to prevent new streams from being created while updating active streams
709     private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
710         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
711         initialReceiveWindowSize = newInitialWindowSize;
712         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
713     }
714 
715     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamId, and initial window sizes
716     private synchronized boolean acceptStream(
717             int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
718         // Cannot initiate any new streams after receiving or sending GOAWAY
719         if (receivedGoAwayFrame || sentGoAwayFrame) {
720             return false;
721         }
722 
723         boolean remote = isRemoteInitiatedId(streamId);
724         int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
725         if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
726             return false;
727         }
728         spdySession.acceptStream(
729                 streamId, priority, remoteSideClosed, localSideClosed,
730                 initialSendWindowSize, initialReceiveWindowSize, remote);
731         if (remote) {
732             lastGoodStreamId = streamId;
733         }
734         return true;
735     }
736 
737     private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
738         if (remote) {
739             spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
740         } else {
741             spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
742         }
743         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
744             future.addListener(closeSessionFutureListener);
745         }
746     }
747 
748     private void removeStream(int streamId, ChannelFuture future) {
749         spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
750 
751         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
752             future.addListener(closeSessionFutureListener);
753         }
754     }
755 
756     private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
757         spdySession.updateSendWindowSize(streamId, deltaWindowSize);
758 
759         while (true) {
760             // Check if we have unblocked a stalled stream
761             SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
762             if (pendingWrite == null) {
763                 return;
764             }
765 
766             SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
767             int dataFrameSize = spdyDataFrame.content().readableBytes();
768             int writeStreamId = spdyDataFrame.streamId();
769             int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
770             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
771             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
772 
773             if (sendWindowSize <= 0) {
774                 return;
775             } else if (sendWindowSize < dataFrameSize) {
776                 // We can send a partial frame
777                 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
778                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
779 
780                 // Create a partial data frame whose length is the current window size
781                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
782                         spdyDataFrame.content().readSlice(sendWindowSize).retain());
783 
784                 // The transfer window size is pre-decremented when sending a data frame downstream.
785                 // Close the session on write failures that leave the transfer window in a corrupt state.
786                 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
787                     @Override
788                     public void operationComplete(ChannelFuture future) throws Exception {
789                         if (!future.isSuccess()) {
790                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
791                         }
792                     }
793                 });
794             } else {
795                 // Window size is large enough to send entire data frame
796                 spdySession.removePendingWrite(writeStreamId);
797                 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
798                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
799 
800                 // Close the local side of the stream if this is the last frame
801                 if (spdyDataFrame.isLast()) {
802                     halfCloseStream(writeStreamId, false, pendingWrite.promise);
803                 }
804 
805                 // The transfer window size is pre-decremented when sending a data frame downstream.
806                 // Close the session on write failures that leave the transfer window in a corrupt state.
807                 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
808                     @Override
809                     public void operationComplete(ChannelFuture future) throws Exception {
810                         if (!future.isSuccess()) {
811                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
812                         }
813                     }
814                 });
815             }
816         }
817     }
818 
819     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
820         // Avoid NotYetConnectedException
821         if (!ctx.channel().isActive()) {
822             ctx.close(future);
823             return;
824         }
825 
826         ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
827         if (spdySession.noActiveStreams()) {
828             f.addListener(new ClosingChannelFutureListener(ctx, future));
829         } else {
830             closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
831         }
832         // FIXME: Close the connection forcibly after timeout.
833     }
834 
835     private synchronized ChannelFuture sendGoAwayFrame(
836             ChannelHandlerContext ctx, SpdySessionStatus status) {
837         if (!sentGoAwayFrame) {
838             sentGoAwayFrame = true;
839             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
840             return ctx.writeAndFlush(spdyGoAwayFrame);
841         } else {
842             return ctx.newSucceededFuture();
843         }
844     }
845 
846     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
847         private final ChannelHandlerContext ctx;
848         private final ChannelPromise promise;
849 
850         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
851             this.ctx = ctx;
852             this.promise = promise;
853         }
854 
855         @Override
856         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
857             ctx.close(promise);
858         }
859     }
860 }