View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelDuplexHandler;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.ThrowableUtil;
25  
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
29  import static io.netty.handler.codec.spdy.SpdyCodecUtil.isServerId;
30  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
31  
32  /**
33   * Manages streams within a SPDY session.
34   */
35  public class SpdySessionHandler extends ChannelDuplexHandler {
36  
37      private static final SpdyProtocolException PROTOCOL_EXCEPTION = ThrowableUtil.unknownStackTrace(
38              SpdyProtocolException.newStatic(null), SpdySessionHandler.class, "handleOutboundMessage(...)");
39      private static final SpdyProtocolException STREAM_CLOSED = ThrowableUtil.unknownStackTrace(
40              SpdyProtocolException.newStatic("Stream closed"), SpdySessionHandler.class, "removeStream(...)");
41  
42      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
43      private int initialSendWindowSize    = DEFAULT_WINDOW_SIZE;
44      private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
45      private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
46  
47      private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
48      private int lastGoodStreamId;
49  
50      private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
51      private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
52      private int localConcurrentStreams  = DEFAULT_MAX_CONCURRENT_STREAMS;
53  
54      private final AtomicInteger pings = new AtomicInteger();
55  
56      private boolean sentGoAwayFrame;
57      private boolean receivedGoAwayFrame;
58  
59      private ChannelFutureListener closeSessionFutureListener;
60  
61      private final boolean server;
62      private final int minorVersion;
63  
64      /**
65       * Creates a new session handler.
66       *
67       * @param version the protocol version
68       * @param server  {@code true} if and only if this session handler should
69       *                handle the server endpoint of the connection.
70       *                {@code false} if and only if this session handler should
71       *                handle the client endpoint of the connection.
72       */
73      public SpdySessionHandler(SpdyVersion version, boolean server) {
74          this.minorVersion = ObjectUtil.checkNotNull(version, "version").getMinorVersion();
75          this.server = server;
76      }
77  
78      public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
79          checkPositiveOrZero(sessionReceiveWindowSize, "sessionReceiveWindowSize");
80          // This will not send a window update frame immediately.
81          // If this value increases the allowed receive window size,
82          // a WINDOW_UPDATE frame will be sent when only half of the
83          // session window size remains during data frame processing.
84          // If this value decreases the allowed receive window size,
85          // the window will be reduced as data frames are processed.
86          initialSessionReceiveWindowSize = sessionReceiveWindowSize;
87      }
88  
89      @Override
90      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
91          if (msg instanceof SpdyDataFrame) {
92  
93              /*
94               * SPDY Data frame processing requirements:
95               *
96               * If an endpoint receives a data frame for a Stream-ID which is not open
97               * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
98               * with the error code INVALID_STREAM for the Stream-ID.
99               *
100              * If an endpoint which created the stream receives a data frame before receiving
101              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
102              * issue a stream error with the getStatus code PROTOCOL_ERROR for the Stream-ID.
103              *
104              * If an endpoint receives multiple data frames for invalid Stream-IDs,
105              * it may close the session.
106              *
107              * If an endpoint refuses a stream it must ignore any data frames for that stream.
108              *
109              * If an endpoint receives a data frame after the stream is half-closed from the
110              * sender, it must send a RST_STREAM frame with the getStatus STREAM_ALREADY_CLOSED.
111              *
112              * If an endpoint receives a data frame after the stream is closed, it must send
113              * a RST_STREAM frame with the getStatus PROTOCOL_ERROR.
114              */
115             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
116             int streamId = spdyDataFrame.streamId();
117 
118             int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
119             int newSessionWindowSize =
120                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
121 
122             // Check if session window size is reduced beyond allowable lower bound
123             if (newSessionWindowSize < 0) {
124                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
125                 return;
126             }
127 
128             // Send a WINDOW_UPDATE frame if less than half the session window size remains
129             if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
130                 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
131                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
132                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
133                     new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
134                 ctx.writeAndFlush(spdyWindowUpdateFrame);
135             }
136 
137             // Check if we received a data frame for a Stream-ID which is not open
138 
139             if (!spdySession.isActiveStream(streamId)) {
140                 spdyDataFrame.release();
141                 if (streamId <= lastGoodStreamId) {
142                     issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
143                 } else if (!sentGoAwayFrame) {
144                     issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
145                 }
146                 return;
147             }
148 
149             // Check if we received a data frame for a stream which is half-closed
150 
151             if (spdySession.isRemoteSideClosed(streamId)) {
152                 spdyDataFrame.release();
153                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
154                 return;
155             }
156 
157             // Check if we received a data frame before receiving a SYN_REPLY
158             if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
159                 spdyDataFrame.release();
160                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
161                 return;
162             }
163 
164             /*
165              * SPDY Data frame flow control processing requirements:
166              *
167              * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
168              */
169 
170             // Update receive window size
171             int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
172 
173             // Window size can become negative if we sent a SETTINGS frame that reduces the
174             // size of the transfer window after the peer has written data frames.
175             // The value is bounded by the length that SETTINGS frame decrease the window.
176             // This difference is stored for the session when writing the SETTINGS frame
177             // and is cleared once we send a WINDOW_UPDATE frame.
178             if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
179                 spdyDataFrame.release();
180                 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
181                 return;
182             }
183 
184             // Window size became negative due to sender writing frame before receiving SETTINGS
185             // Send data frames upstream in initialReceiveWindowSize chunks
186             if (newWindowSize < 0) {
187                 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
188                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
189                             streamId, spdyDataFrame.content().readRetainedSlice(initialReceiveWindowSize));
190                     ctx.writeAndFlush(partialDataFrame);
191                 }
192             }
193 
194             // Send a WINDOW_UPDATE frame if less than half the stream window size remains
195             if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
196                 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
197                 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
198                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
199                         new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
200                 ctx.writeAndFlush(spdyWindowUpdateFrame);
201             }
202 
203             // Close the remote side of the stream if this is the last frame
204             if (spdyDataFrame.isLast()) {
205                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
206             }
207 
208         } else if (msg instanceof SpdySynStreamFrame) {
209 
210             /*
211              * SPDY SYN_STREAM frame processing requirements:
212              *
213              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
214              * any previously received SYN_STREAM, it must issue a session error with
215              * the getStatus PROTOCOL_ERROR.
216              *
217              * If an endpoint receives multiple SYN_STREAM frames with the same active
218              * Stream-ID, it must issue a stream error with the getStatus code PROTOCOL_ERROR.
219              *
220              * The recipient can reject a stream by sending a stream error with the
221              * getStatus code REFUSED_STREAM.
222              */
223 
224             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
225             int streamId = spdySynStreamFrame.streamId();
226 
227             // Check if we received a valid SYN_STREAM frame
228             if (spdySynStreamFrame.isInvalid() ||
229                 !isRemoteInitiatedId(streamId) ||
230                 spdySession.isActiveStream(streamId)) {
231                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
232                 return;
233             }
234 
235             // Stream-IDs must be monotonically increasing
236             if (streamId <= lastGoodStreamId) {
237                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
238                 return;
239             }
240 
241             // Try to accept the stream
242             byte priority = spdySynStreamFrame.priority();
243             boolean remoteSideClosed = spdySynStreamFrame.isLast();
244             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
245             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
246                 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
247                 return;
248             }
249 
250         } else if (msg instanceof SpdySynReplyFrame) {
251 
252             /*
253              * SPDY SYN_REPLY frame processing requirements:
254              *
255              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
256              * it must issue a stream error with the getStatus code STREAM_IN_USE.
257              */
258 
259             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
260             int streamId = spdySynReplyFrame.streamId();
261 
262             // Check if we received a valid SYN_REPLY frame
263             if (spdySynReplyFrame.isInvalid() ||
264                 isRemoteInitiatedId(streamId) ||
265                 spdySession.isRemoteSideClosed(streamId)) {
266                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
267                 return;
268             }
269 
270             // Check if we have received multiple frames for the same Stream-ID
271             if (spdySession.hasReceivedReply(streamId)) {
272                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
273                 return;
274             }
275 
276             spdySession.receivedReply(streamId);
277 
278             // Close the remote side of the stream if this is the last frame
279             if (spdySynReplyFrame.isLast()) {
280                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
281             }
282 
283         } else if (msg instanceof SpdyRstStreamFrame) {
284 
285             /*
286              * SPDY RST_STREAM frame processing requirements:
287              *
288              * After receiving a RST_STREAM on a stream, the receiver must not send
289              * additional frames on that stream.
290              *
291              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
292              */
293 
294             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
295             removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
296 
297         } else if (msg instanceof SpdySettingsFrame) {
298 
299             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
300 
301             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
302             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
303                 // Settings frame had the wrong minor version
304                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
305                 return;
306             }
307 
308             int newConcurrentStreams =
309                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
310             if (newConcurrentStreams >= 0) {
311                 remoteConcurrentStreams = newConcurrentStreams;
312             }
313 
314             // Persistence flag are inconsistent with the use of SETTINGS to communicate
315             // the initial window size. Remove flags from the sender requesting that the
316             // value be persisted. Remove values that the sender indicates are persisted.
317             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
318                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
319             }
320             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
321 
322             int newInitialWindowSize =
323                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
324             if (newInitialWindowSize >= 0) {
325                 updateInitialSendWindowSize(newInitialWindowSize);
326             }
327 
328         } else if (msg instanceof SpdyPingFrame) {
329 
330             /*
331              * SPDY PING frame processing requirements:
332              *
333              * Receivers of a PING frame should send an identical frame to the sender
334              * as soon as possible.
335              *
336              * Receivers of a PING frame must ignore frames that it did not initiate
337              */
338 
339             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
340 
341             if (isRemoteInitiatedId(spdyPingFrame.id())) {
342                 ctx.writeAndFlush(spdyPingFrame);
343                 return;
344             }
345 
346             // Note: only checks that there are outstanding pings since uniqueness is not enforced
347             if (pings.get() == 0) {
348                 return;
349             }
350             pings.getAndDecrement();
351 
352         } else if (msg instanceof SpdyGoAwayFrame) {
353 
354             receivedGoAwayFrame = true;
355 
356         } else if (msg instanceof SpdyHeadersFrame) {
357 
358             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
359             int streamId = spdyHeadersFrame.streamId();
360 
361             // Check if we received a valid HEADERS frame
362             if (spdyHeadersFrame.isInvalid()) {
363                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
364                 return;
365             }
366 
367             if (spdySession.isRemoteSideClosed(streamId)) {
368                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
369                 return;
370             }
371 
372             // Close the remote side of the stream if this is the last frame
373             if (spdyHeadersFrame.isLast()) {
374                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
375             }
376 
377         } else if (msg instanceof SpdyWindowUpdateFrame) {
378 
379             /*
380              * SPDY WINDOW_UPDATE frame processing requirements:
381              *
382              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
383              * must send a RST_STREAM with the getStatus code FLOW_CONTROL_ERROR.
384              *
385              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
386              * after sending the last frame for the stream.
387              */
388 
389             SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
390             int streamId = spdyWindowUpdateFrame.streamId();
391             int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
392 
393             // Ignore frames for half-closed streams
394             if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
395                 return;
396             }
397 
398             // Check for numerical overflow
399             if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
400                 if (streamId == SPDY_SESSION_STREAM_ID) {
401                     issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
402                 } else {
403                     issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
404                 }
405                 return;
406             }
407 
408             updateSendWindowSize(ctx, streamId, deltaWindowSize);
409         }
410 
411         ctx.fireChannelRead(msg);
412     }
413 
414     @Override
415     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
416         for (Integer streamId: spdySession.activeStreams().keySet()) {
417             removeStream(streamId, ctx.newSucceededFuture());
418         }
419         ctx.fireChannelInactive();
420     }
421 
422     @Override
423     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
424         if (cause instanceof SpdyProtocolException) {
425             issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
426         }
427 
428         ctx.fireExceptionCaught(cause);
429     }
430 
431     @Override
432     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
433         sendGoAwayFrame(ctx, promise);
434     }
435 
436     @Override
437     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
438         if (msg instanceof SpdyDataFrame ||
439             msg instanceof SpdySynStreamFrame ||
440             msg instanceof SpdySynReplyFrame ||
441             msg instanceof SpdyRstStreamFrame ||
442             msg instanceof SpdySettingsFrame ||
443             msg instanceof SpdyPingFrame ||
444             msg instanceof SpdyGoAwayFrame ||
445             msg instanceof SpdyHeadersFrame ||
446             msg instanceof SpdyWindowUpdateFrame) {
447 
448             handleOutboundMessage(ctx, msg, promise);
449         } else {
450             ctx.write(msg, promise);
451         }
452     }
453 
454     private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
455         if (msg instanceof SpdyDataFrame) {
456 
457             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
458             int streamId = spdyDataFrame.streamId();
459 
460             // Frames must not be sent on half-closed streams
461             if (spdySession.isLocalSideClosed(streamId)) {
462                 spdyDataFrame.release();
463                 promise.setFailure(PROTOCOL_EXCEPTION);
464                 return;
465             }
466 
467             /*
468              * SPDY Data frame flow control processing requirements:
469              *
470              * Sender must not send a data frame with data length greater
471              * than the transfer window size.
472              *
473              * After sending each data frame, the sender decrements its
474              * transfer window size by the amount of data transmitted.
475              *
476              * When the window size becomes less than or equal to 0, the
477              * sender must pause transmitting data frames.
478              */
479 
480             int dataLength = spdyDataFrame.content().readableBytes();
481             int sendWindowSize = spdySession.getSendWindowSize(streamId);
482             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
483             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
484 
485             if (sendWindowSize <= 0) {
486                 // Stream is stalled -- enqueue Data frame and return
487                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
488                 return;
489             } else if (sendWindowSize < dataLength) {
490                 // Stream is not stalled but we cannot send the entire frame
491                 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
492                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
493 
494                 // Create a partial data frame whose length is the current window size
495                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
496                         streamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
497 
498                 // Enqueue the remaining data (will be the first frame queued)
499                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
500 
501                 // The transfer window size is pre-decremented when sending a data frame downstream.
502                 // Close the session on write failures that leave the transfer window in a corrupt state.
503                 final ChannelHandlerContext context = ctx;
504                 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
505                     @Override
506                     public void operationComplete(ChannelFuture future) throws Exception {
507                         if (!future.isSuccess()) {
508                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
509                         }
510                     }
511                 });
512                 return;
513             } else {
514                 // Window size is large enough to send entire data frame
515                 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
516                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
517 
518                 // The transfer window size is pre-decremented when sending a data frame downstream.
519                 // Close the session on write failures that leave the transfer window in a corrupt state.
520                 final ChannelHandlerContext context = ctx;
521                 promise.addListener(new ChannelFutureListener() {
522                     @Override
523                     public void operationComplete(ChannelFuture future) throws Exception {
524                         if (!future.isSuccess()) {
525                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
526                         }
527                     }
528                 });
529             }
530 
531             // Close the local side of the stream if this is the last frame
532             if (spdyDataFrame.isLast()) {
533                 halfCloseStream(streamId, false, promise);
534             }
535 
536         } else if (msg instanceof SpdySynStreamFrame) {
537 
538             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
539             int streamId = spdySynStreamFrame.streamId();
540 
541             if (isRemoteInitiatedId(streamId)) {
542                 promise.setFailure(PROTOCOL_EXCEPTION);
543                 return;
544             }
545 
546             byte priority = spdySynStreamFrame.priority();
547             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
548             boolean localSideClosed = spdySynStreamFrame.isLast();
549             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
550                 promise.setFailure(PROTOCOL_EXCEPTION);
551                 return;
552             }
553 
554         } else if (msg instanceof SpdySynReplyFrame) {
555 
556             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
557             int streamId = spdySynReplyFrame.streamId();
558 
559             // Frames must not be sent on half-closed streams
560             if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
561                 promise.setFailure(PROTOCOL_EXCEPTION);
562                 return;
563             }
564 
565             // Close the local side of the stream if this is the last frame
566             if (spdySynReplyFrame.isLast()) {
567                 halfCloseStream(streamId, false, promise);
568             }
569 
570         } else if (msg instanceof SpdyRstStreamFrame) {
571 
572             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
573             removeStream(spdyRstStreamFrame.streamId(), promise);
574 
575         } else if (msg instanceof SpdySettingsFrame) {
576 
577             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
578 
579             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
580             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
581                 // Settings frame had the wrong minor version
582                 promise.setFailure(PROTOCOL_EXCEPTION);
583                 return;
584             }
585 
586             int newConcurrentStreams =
587                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
588             if (newConcurrentStreams >= 0) {
589                 localConcurrentStreams = newConcurrentStreams;
590             }
591 
592             // Persistence flag are inconsistent with the use of SETTINGS to communicate
593             // the initial window size. Remove flags from the sender requesting that the
594             // value be persisted. Remove values that the sender indicates are persisted.
595             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
596                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
597             }
598             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
599 
600             int newInitialWindowSize =
601                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
602             if (newInitialWindowSize >= 0) {
603                 updateInitialReceiveWindowSize(newInitialWindowSize);
604             }
605 
606         } else if (msg instanceof SpdyPingFrame) {
607 
608             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
609             if (isRemoteInitiatedId(spdyPingFrame.id())) {
610                 ctx.fireExceptionCaught(new IllegalArgumentException(
611                             "invalid PING ID: " + spdyPingFrame.id()));
612                 return;
613             }
614             pings.getAndIncrement();
615 
616         } else if (msg instanceof SpdyGoAwayFrame) {
617 
618             // Why is this being sent? Intercept it and fail the write.
619             // Should have sent a CLOSE ChannelStateEvent
620             promise.setFailure(PROTOCOL_EXCEPTION);
621             return;
622 
623         } else if (msg instanceof SpdyHeadersFrame) {
624 
625             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
626             int streamId = spdyHeadersFrame.streamId();
627 
628             // Frames must not be sent on half-closed streams
629             if (spdySession.isLocalSideClosed(streamId)) {
630                 promise.setFailure(PROTOCOL_EXCEPTION);
631                 return;
632             }
633 
634             // Close the local side of the stream if this is the last frame
635             if (spdyHeadersFrame.isLast()) {
636                 halfCloseStream(streamId, false, promise);
637             }
638 
639         } else if (msg instanceof SpdyWindowUpdateFrame) {
640 
641             // Why is this being sent? Intercept it and fail the write.
642             promise.setFailure(PROTOCOL_EXCEPTION);
643             return;
644         }
645 
646         ctx.write(msg, promise);
647     }
648 
649     /*
650      * SPDY Session Error Handling:
651      *
652      * When a session error occurs, the endpoint encountering the error must first
653      * send a GOAWAY frame with the Stream-ID of the most recently received stream
654      * from the remote endpoint, and the error code for why the session is terminating.
655      *
656      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
657      */
658     private void issueSessionError(
659             ChannelHandlerContext ctx, SpdySessionStatus status) {
660 
661         sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
662     }
663 
664     /*
665      * SPDY Stream Error Handling:
666      *
667      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
668      * the Stream-ID for the stream where the error occurred and the error getStatus which
669      * caused the error.
670      *
671      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
672      *
673      * Note: this is only called by the worker thread
674      */
675     private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
676         boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
677         ChannelPromise promise = ctx.newPromise();
678         removeStream(streamId, promise);
679 
680         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
681         ctx.writeAndFlush(spdyRstStreamFrame, promise);
682         if (fireChannelRead) {
683             ctx.fireChannelRead(spdyRstStreamFrame);
684         }
685     }
686 
687     /*
688      * Helper functions
689      */
690 
691     private boolean isRemoteInitiatedId(int id) {
692         boolean serverId = isServerId(id);
693         return server && !serverId || !server && serverId;
694     }
695 
696     // need to synchronize to prevent new streams from being created while updating active streams
697     private void updateInitialSendWindowSize(int newInitialWindowSize) {
698         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
699         initialSendWindowSize = newInitialWindowSize;
700         spdySession.updateAllSendWindowSizes(deltaWindowSize);
701     }
702 
703     // need to synchronize to prevent new streams from being created while updating active streams
704     private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
705         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
706         initialReceiveWindowSize = newInitialWindowSize;
707         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
708     }
709 
710     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamId, and initial window sizes
711     private boolean acceptStream(
712             int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
713         // Cannot initiate any new streams after receiving or sending GOAWAY
714         if (receivedGoAwayFrame || sentGoAwayFrame) {
715             return false;
716         }
717 
718         boolean remote = isRemoteInitiatedId(streamId);
719         int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
720         if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
721             return false;
722         }
723         spdySession.acceptStream(
724                 streamId, priority, remoteSideClosed, localSideClosed,
725                 initialSendWindowSize, initialReceiveWindowSize, remote);
726         if (remote) {
727             lastGoodStreamId = streamId;
728         }
729         return true;
730     }
731 
732     private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
733         if (remote) {
734             spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
735         } else {
736             spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
737         }
738         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
739             future.addListener(closeSessionFutureListener);
740         }
741     }
742 
743     private void removeStream(int streamId, ChannelFuture future) {
744         spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
745 
746         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
747             future.addListener(closeSessionFutureListener);
748         }
749     }
750 
751     private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
752         spdySession.updateSendWindowSize(streamId, deltaWindowSize);
753 
754         while (true) {
755             // Check if we have unblocked a stalled stream
756             SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
757             if (pendingWrite == null) {
758                 return;
759             }
760 
761             SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
762             int dataFrameSize = spdyDataFrame.content().readableBytes();
763             int writeStreamId = spdyDataFrame.streamId();
764             int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
765             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
766             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
767 
768             if (sendWindowSize <= 0) {
769                 return;
770             } else if (sendWindowSize < dataFrameSize) {
771                 // We can send a partial frame
772                 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
773                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
774 
775                 // Create a partial data frame whose length is the current window size
776                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
777                         writeStreamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
778 
779                 // The transfer window size is pre-decremented when sending a data frame downstream.
780                 // Close the session on write failures that leave the transfer window in a corrupt state.
781                 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
782                     @Override
783                     public void operationComplete(ChannelFuture future) throws Exception {
784                         if (!future.isSuccess()) {
785                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
786                         }
787                     }
788                 });
789             } else {
790                 // Window size is large enough to send entire data frame
791                 spdySession.removePendingWrite(writeStreamId);
792                 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
793                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
794 
795                 // Close the local side of the stream if this is the last frame
796                 if (spdyDataFrame.isLast()) {
797                     halfCloseStream(writeStreamId, false, pendingWrite.promise);
798                 }
799 
800                 // The transfer window size is pre-decremented when sending a data frame downstream.
801                 // Close the session on write failures that leave the transfer window in a corrupt state.
802                 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
803                     @Override
804                     public void operationComplete(ChannelFuture future) throws Exception {
805                         if (!future.isSuccess()) {
806                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
807                         }
808                     }
809                 });
810             }
811         }
812     }
813 
814     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
815         // Avoid NotYetConnectedException
816         if (!ctx.channel().isActive()) {
817             ctx.close(future);
818             return;
819         }
820 
821         ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
822         if (spdySession.noActiveStreams()) {
823             f.addListener(new ClosingChannelFutureListener(ctx, future));
824         } else {
825             closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
826         }
827         // FIXME: Close the connection forcibly after timeout.
828     }
829 
830     private ChannelFuture sendGoAwayFrame(
831             ChannelHandlerContext ctx, SpdySessionStatus status) {
832         if (!sentGoAwayFrame) {
833             sentGoAwayFrame = true;
834             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
835             return ctx.writeAndFlush(spdyGoAwayFrame);
836         } else {
837             return ctx.newSucceededFuture();
838         }
839     }
840 
841     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
842         private final ChannelHandlerContext ctx;
843         private final ChannelPromise promise;
844 
845         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
846             this.ctx = ctx;
847             this.promise = promise;
848         }
849 
850         @Override
851         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
852             ctx.close(promise);
853         }
854     }
855 }