View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
19  
20  import java.net.SocketAddress;
21  import java.nio.channels.ClosedChannelException;
22  import java.util.concurrent.atomic.AtomicInteger;
23  
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelDownstreamHandler;
26  import org.jboss.netty.channel.ChannelEvent;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelFutureListener;
29  import org.jboss.netty.channel.ChannelHandlerContext;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.Channels;
32  import org.jboss.netty.channel.ExceptionEvent;
33  import org.jboss.netty.channel.MessageEvent;
34  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
35  
36  /**
37   * Manages streams within a SPDY session.
38   */
39  public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40          implements ChannelDownstreamHandler {
41  
42      private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
43  
44      private final SpdySession spdySession = new SpdySession();
45      private volatile int lastGoodStreamID;
46  
47      private volatile int remoteConcurrentStreams;
48      private volatile int localConcurrentStreams;
49      private volatile int maxConcurrentStreams;
50  
51      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
52      private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
53      private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
54  
55      private final Object flowControlLock = new Object();
56  
57      private final AtomicInteger pings = new AtomicInteger();
58  
59      private volatile boolean sentGoAwayFrame;
60      private volatile boolean receivedGoAwayFrame;
61  
62      private volatile ChannelFuture closeSessionFuture;
63  
64      private final boolean server;
65      private final boolean flowControl;
66  
67      /**
68       * Creates a new SPDY/2 session handler.
69       *
70       * @param server {@code true} if and only if this session handler should
71       *               handle the server endpoint of the connection.
72       *               {@code false} if and only if this session handler should
73       *               handle the client endpoint of the connection.
74       */
75      @Deprecated
76      public SpdySessionHandler(boolean server) {
77          this(2, server);
78      }
79  
80      /**
81       * Creates a new session handler.
82       *
83       * @param version the protocol version
84       * @param server  {@code true} if and only if this session handler should
85       *                handle the server endpoint of the connection.
86       *                {@code false} if and only if this session handler should
87       *                handle the client endpoint of the connection.
88       */
89      public SpdySessionHandler(int version, boolean server) {
90          if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) {
91              throw new IllegalArgumentException(
92                      "unsupported version: " + version);
93          }
94          this.server = server;
95          flowControl = version >= 3;
96      }
97  
98      @Override
99      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
100             throws Exception {
101 
102         Object msg = e.getMessage();
103         if (msg instanceof SpdyDataFrame) {
104 
105             /*
106              * SPDY Data frame processing requirements:
107              *
108              * If an endpoint receives a data frame for a Stream-ID which is not open
109              * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
110              * with the error code INVALID_STREAM for the Stream-ID.
111              *
112              * If an endpoint which created the stream receives a data frame before receiving
113              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
114              * issue a stream error with the status code PROTOCOL_ERROR for the Stream-ID.
115              *
116              * If an endpoint receives multiple data frames for invalid Stream-IDs,
117              * it may close the session.
118              *
119              * If an endpoint refuses a stream it must ignore any data frames for that stream.
120              *
121              * If an endpoint receives a data frame after the stream is half-closed from the
122              * sender, it must send a RST_STREAM frame with the status STREAM_ALREADY_CLOSED.
123              *
124              * If an endpoint receives a data frame after the stream is closed, it must send
125              * a RST_STREAM frame with the status PROTOCOL_ERROR.
126              */
127 
128             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
129             int streamID = spdyDataFrame.getStreamId();
130 
131             // Check if we received a data frame for a Stream-ID which is not open
132             if (!spdySession.isActiveStream(streamID)) {
133                 if (streamID <= lastGoodStreamID) {
134                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
135                 } else if (!sentGoAwayFrame) {
136                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
137                 }
138                 return;
139             }
140 
141             // Check if we received a data frame for a stream which is half-closed
142             if (spdySession.isRemoteSideClosed(streamID)) {
143                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
144                 return;
145             }
146 
147             // Check if we received a data frame before receiving a SYN_REPLY
148             if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) {
149                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
150                 return;
151             }
152 
153             /*
154             * SPDY Data frame flow control processing requirements:
155             *
156             * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
157             */
158 
159             if (flowControl) {
160                 // Update receive window size
161                 int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
162                 int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
163 
164                 // Window size can become negative if we sent a SETTINGS frame that reduces the
165                 // size of the transfer window after the peer has written data frames.
166                 // The value is bounded by the length that SETTINGS frame decrease the window.
167                 // This difference is stored for the session when writing the SETTINGS frame
168                 // and is cleared once we send a WINDOW_UPDATE frame.
169                 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamID)) {
170                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
171                     return;
172                 }
173 
174                 // Window size became negative due to sender writing frame before receiving SETTINGS
175                 // Send data frames upstream in initialReceiveWindowSize chunks
176                 if (newWindowSize < 0) {
177                     while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
178                         SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
179                         partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
180                         Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
181                     }
182                 }
183 
184                 // Send a WINDOW_UPDATE frame if less than half the window size remains
185                 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
186                     deltaWindowSize = initialReceiveWindowSize - newWindowSize;
187                     spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
188                     SpdyWindowUpdateFrame spdyWindowUpdateFrame =
189                             new DefaultSpdyWindowUpdateFrame(streamID, deltaWindowSize);
190                     Channels.write(
191                             ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
192                 }
193             }
194 
195             // Close the remote side of the stream if this is the last frame
196             if (spdyDataFrame.isLast()) {
197                 halfCloseStream(streamID, true);
198             }
199 
200         } else if (msg instanceof SpdySynStreamFrame) {
201 
202             /*
203              * SPDY SYN_STREAM frame processing requirements:
204              *
205              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
206              * any previously received SYN_STREAM, it must issue a session error with
207              * the status PROTOCOL_ERROR.
208              *
209              * If an endpoint receives multiple SYN_STREAM frames with the same active
210              * Stream-ID, it must issue a stream error with the status code PROTOCOL_ERROR.
211              *
212              * The recipient can reject a stream by sending a stream error with the
213              * status code REFUSED_STREAM.
214              */
215 
216             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
217             int streamID = spdySynStreamFrame.getStreamId();
218 
219             // Check if we received a valid SYN_STREAM frame
220             if (spdySynStreamFrame.isInvalid() ||
221                 !isRemoteInitiatedID(streamID) ||
222                 spdySession.isActiveStream(streamID)) {
223                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
224                 return;
225             }
226 
227             // Stream-IDs must be monotonically increasing
228             if (streamID <= lastGoodStreamID) {
229                 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
230                 return;
231             }
232 
233             // Try to accept the stream
234             byte priority = spdySynStreamFrame.getPriority();
235             boolean remoteSideClosed = spdySynStreamFrame.isLast();
236             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
237             if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
238                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.REFUSED_STREAM);
239                 return;
240             }
241 
242         } else if (msg instanceof SpdySynReplyFrame) {
243 
244             /*
245              * SPDY SYN_REPLY frame processing requirements:
246              *
247              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
248              * it must issue a stream error with the status code STREAM_IN_USE.
249              */
250 
251             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
252             int streamID = spdySynReplyFrame.getStreamId();
253 
254             // Check if we received a valid SYN_REPLY frame
255             if (spdySynReplyFrame.isInvalid() ||
256                 isRemoteInitiatedID(streamID) ||
257                 spdySession.isRemoteSideClosed(streamID)) {
258                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
259                 return;
260             }
261 
262             // Check if we have received multiple frames for the same Stream-ID
263             if (spdySession.hasReceivedReply(streamID)) {
264                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_IN_USE);
265                 return;
266             }
267 
268             spdySession.receivedReply(streamID);
269 
270             // Close the remote side of the stream if this is the last frame
271             if (spdySynReplyFrame.isLast()) {
272                 halfCloseStream(streamID, true);
273             }
274 
275         } else if (msg instanceof SpdyRstStreamFrame) {
276 
277             /*
278              * SPDY RST_STREAM frame processing requirements:
279              *
280              * After receiving a RST_STREAM on a stream, the receiver must not send
281              * additional frames on that stream.
282              *
283              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
284              */
285 
286             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
287             removeStream(spdyRstStreamFrame.getStreamId());
288 
289         } else if (msg instanceof SpdySettingsFrame) {
290 
291             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
292 
293             int newConcurrentStreams =
294                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
295             if (newConcurrentStreams >= 0) {
296                 updateConcurrentStreams(newConcurrentStreams, true);
297             }
298 
299             // Persistence flag are inconsistent with the use of SETTINGS to communicate
300             // the initial window size. Remove flags from the sender requesting that the
301             // value be persisted. Remove values that the sender indicates are persisted.
302             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
303                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
304             }
305             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
306 
307             if (flowControl) {
308                 int newInitialWindowSize =
309                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
310                 if (newInitialWindowSize >= 0) {
311                     updateInitialSendWindowSize(newInitialWindowSize);
312                 }
313             }
314 
315         } else if (msg instanceof SpdyPingFrame) {
316 
317             /*
318              * SPDY PING frame processing requirements:
319              *
320              * Receivers of a PING frame should send an identical frame to the sender
321              * as soon as possible.
322              *
323              * Receivers of a PING frame must ignore frames that it did not initiate
324              */
325 
326             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
327 
328             if (isRemoteInitiatedID(spdyPingFrame.getId())) {
329                 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
330                 return;
331             }
332 
333             // Note: only checks that there are outstanding pings since uniqueness is not enforced
334             if (pings.get() == 0) {
335                 return;
336             }
337             pings.getAndDecrement();
338 
339         } else if (msg instanceof SpdyGoAwayFrame) {
340 
341             receivedGoAwayFrame = true;
342 
343         } else if (msg instanceof SpdyHeadersFrame) {
344 
345             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
346             int streamID = spdyHeadersFrame.getStreamId();
347 
348             // Check if we received a valid HEADERS frame
349             if (spdyHeadersFrame.isInvalid()) {
350                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
351                 return;
352             }
353 
354             if (spdySession.isRemoteSideClosed(streamID)) {
355                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
356                 return;
357             }
358 
359             // Close the remote side of the stream if this is the last frame
360             if (spdyHeadersFrame.isLast()) {
361                 halfCloseStream(streamID, true);
362             }
363 
364         } else if (msg instanceof SpdyWindowUpdateFrame) {
365 
366             /*
367              * SPDY WINDOW_UPDATE frame processing requirements:
368              *
369              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
370              * must send a RST_STREAM with the status code FLOW_CONTROL_ERROR.
371              *
372              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
373              * after sending the last frame for the stream.
374              */
375 
376             if (flowControl) {
377                 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
378                 int streamID = spdyWindowUpdateFrame.getStreamId();
379                 int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
380 
381                 // Ignore frames for half-closed streams
382                 if (spdySession.isLocalSideClosed(streamID)) {
383                     return;
384                 }
385 
386                 // Check for numerical overflow
387                 if (spdySession.getSendWindowSize(streamID) > Integer.MAX_VALUE - deltaWindowSize) {
388                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
389                     return;
390                 }
391 
392                 updateSendWindowSize(ctx, streamID, deltaWindowSize);
393             }
394             return;
395         }
396 
397         super.messageReceived(ctx, e);
398     }
399 
400     @Override
401     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
402             throws Exception {
403 
404         Throwable cause = e.getCause();
405         if (cause instanceof SpdyProtocolException) {
406             issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
407         }
408 
409         super.exceptionCaught(ctx, e);
410     }
411 
412     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
413             throws Exception {
414         if (evt instanceof ChannelStateEvent) {
415             ChannelStateEvent e = (ChannelStateEvent) evt;
416             switch (e.getState()) {
417             case OPEN:
418             case CONNECTED:
419             case BOUND:
420 
421                 /*
422                  * SPDY connection requirements:
423                  *
424                  * When either endpoint closes the transport-level connection,
425                  * it must first send a GOAWAY frame.
426                  */
427                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
428                     sendGoAwayFrame(ctx, e);
429                     return;
430                 }
431             }
432         }
433         if (!(evt instanceof MessageEvent)) {
434             ctx.sendDownstream(evt);
435             return;
436         }
437 
438         MessageEvent e = (MessageEvent) evt;
439         Object msg = e.getMessage();
440 
441         if (msg instanceof SpdyDataFrame) {
442 
443             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
444             final int streamID = spdyDataFrame.getStreamId();
445 
446             // Frames must not be sent on half-closed streams
447             if (spdySession.isLocalSideClosed(streamID)) {
448                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
449                 return;
450             }
451 
452             /*
453              * SPDY Data frame flow control processing requirements:
454              *
455              * Sender must not send a data frame with data length greater
456              * than the transfer window size.
457              *
458              * After sending each data frame, the sender decrements its
459              * transfer window size by the amount of data transmitted.
460              *
461              * When the window size becomes less than or equal to 0, the
462              * sender must pause transmitting data frames.
463              */
464 
465             if (flowControl) {
466                 synchronized (flowControlLock) {
467                     int dataLength = spdyDataFrame.getData().readableBytes();
468                     int sendWindowSize = spdySession.getSendWindowSize(streamID);
469 
470                     if (sendWindowSize >= dataLength) {
471                         // Window size is large enough to send entire data frame
472                         spdySession.updateSendWindowSize(streamID, -1 * dataLength);
473 
474                         // The transfer window size is pre-decremented when sending a data frame downstream.
475                         // Close the stream on write failures that leaves the transfer window in a corrupt state.
476                         final SocketAddress remoteAddress = e.getRemoteAddress();
477                         final ChannelHandlerContext context = ctx;
478                         e.getFuture().addListener(new ChannelFutureListener() {
479                             public void operationComplete(ChannelFuture future) throws Exception {
480                                 if (!future.isSuccess()) {
481                                     issueStreamError(
482                                             context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
483                                 }
484                             }
485                         });
486 
487                     } else if (sendWindowSize > 0) {
488                         // Stream is not stalled but we cannot send the entire frame
489                         spdySession.updateSendWindowSize(streamID, -1 * sendWindowSize);
490 
491                         // Create a partial data frame whose length is the current window size
492                         SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
493                         partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
494 
495                         // Enqueue the remaining data (will be the first frame queued)
496                         spdySession.putPendingWrite(streamID, e);
497 
498                         ChannelFuture writeFuture = Channels.future(e.getChannel());
499 
500                         // The transfer window size is pre-decremented when sending a data frame downstream.
501                         // Close the stream on write failures that leaves the transfer window in a corrupt state.
502                         final SocketAddress remoteAddress = e.getRemoteAddress();
503                         final ChannelHandlerContext context = ctx;
504                         e.getFuture().addListener(new ChannelFutureListener() {
505                             public void operationComplete(ChannelFuture future) throws Exception {
506                                 if (!future.isSuccess()) {
507                                     issueStreamError(
508                                             context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
509                                 }
510                             }
511                         });
512 
513                         Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
514                         return;
515 
516                     } else {
517                         // Stream is stalled -- enqueue Data frame and return
518                         spdySession.putPendingWrite(streamID, e);
519                         return;
520                     }
521                 }
522             }
523 
524             // Close the local side of the stream if this is the last frame
525             if (spdyDataFrame.isLast()) {
526                 halfCloseStream(streamID, false);
527             }
528 
529         } else if (msg instanceof SpdySynStreamFrame) {
530 
531             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
532             int streamID = spdySynStreamFrame.getStreamId();
533 
534             if (isRemoteInitiatedID(streamID)) {
535                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
536                 return;
537             }
538 
539             byte priority = spdySynStreamFrame.getPriority();
540             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
541             boolean localSideClosed = spdySynStreamFrame.isLast();
542             if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
543                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
544                 return;
545             }
546 
547         } else if (msg instanceof SpdySynReplyFrame) {
548 
549             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
550             int streamID = spdySynReplyFrame.getStreamId();
551 
552             // Frames must not be sent on half-closed streams
553             if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) {
554                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
555                 return;
556             }
557 
558             // Close the local side of the stream if this is the last frame
559             if (spdySynReplyFrame.isLast()) {
560                 halfCloseStream(streamID, false);
561             }
562 
563         } else if (msg instanceof SpdyRstStreamFrame) {
564 
565             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
566             removeStream(spdyRstStreamFrame.getStreamId());
567 
568         } else if (msg instanceof SpdySettingsFrame) {
569 
570             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
571 
572             int newConcurrentStreams =
573                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
574             if (newConcurrentStreams >= 0) {
575                 updateConcurrentStreams(newConcurrentStreams, false);
576             }
577 
578             // Persistence flag are inconsistent with the use of SETTINGS to communicate
579             // the initial window size. Remove flags from the sender requesting that the
580             // value be persisted. Remove values that the sender indicates are persisted.
581             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
582                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
583             }
584             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
585 
586             if (flowControl) {
587                 int newInitialWindowSize =
588                         spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
589                 if (newInitialWindowSize >= 0) {
590                     updateInitialReceiveWindowSize(newInitialWindowSize);
591                 }
592             }
593 
594         } else if (msg instanceof SpdyPingFrame) {
595 
596             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
597             if (isRemoteInitiatedID(spdyPingFrame.getId())) {
598                 e.getFuture().setFailure(new IllegalArgumentException(
599                             "invalid PING ID: " + spdyPingFrame.getId()));
600                 return;
601             }
602             pings.getAndIncrement();
603 
604         } else if (msg instanceof SpdyGoAwayFrame) {
605 
606             // Why is this being sent? Intercept it and fail the write.
607             // Should have sent a CLOSE ChannelStateEvent
608             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
609             return;
610 
611         } else if (msg instanceof SpdyHeadersFrame) {
612 
613             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
614             int streamID = spdyHeadersFrame.getStreamId();
615 
616             // Frames must not be sent on half-closed streams
617             if (spdySession.isLocalSideClosed(streamID)) {
618                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
619                 return;
620             }
621 
622             // Close the local side of the stream if this is the last frame
623             if (spdyHeadersFrame.isLast()) {
624                 halfCloseStream(streamID, false);
625             }
626 
627         } else if (msg instanceof SpdyWindowUpdateFrame) {
628 
629             // Why is this being sent? Intercept it and fail the write.
630             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
631             return;
632         }
633 
634         ctx.sendDownstream(evt);
635     }
636 
637     /*
638      * SPDY Session Error Handling:
639      *
640      * When a session error occurs, the endpoint encountering the error must first
641      * send a GOAWAY frame with the Stream-ID of the most recently received stream
642      * from the remote endpoint, and the error code for why the session is terminating.
643      *
644      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
645      */
646     private void issueSessionError(
647             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
648 
649         ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
650         future.addListener(ChannelFutureListener.CLOSE);
651     }
652 
653     /*
654      * SPDY Stream Error Handling:
655      *
656      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
657      * the Stream-ID for the stream where the error occurred and the error status which
658      * caused the error.
659      *
660      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
661      *
662      * Note: this is only called by the worker thread
663      */
664     private void issueStreamError(
665             ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamID, SpdyStreamStatus status) {
666 
667         boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamID);
668         removeStream(streamID);
669 
670         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
671         Channels.write(ctx, Channels.future(ctx.getChannel()), spdyRstStreamFrame, remoteAddress);
672         if (fireMessageReceived) {
673             Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
674         }
675     }
676 
677     /*
678      * Helper functions
679      */
680 
681     private boolean isRemoteInitiatedID(int id) {
682         boolean serverID = isServerId(id);
683         return server && !serverID || !server && serverID;
684     }
685 
686     private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
687         if (remote) {
688             remoteConcurrentStreams = newConcurrentStreams;
689         } else {
690             localConcurrentStreams = newConcurrentStreams;
691         }
692         if (localConcurrentStreams == remoteConcurrentStreams) {
693             maxConcurrentStreams = localConcurrentStreams;
694             return;
695         }
696         if (localConcurrentStreams == 0) {
697             maxConcurrentStreams = remoteConcurrentStreams;
698             return;
699         }
700         if (remoteConcurrentStreams == 0) {
701             maxConcurrentStreams = localConcurrentStreams;
702             return;
703         }
704         if (localConcurrentStreams > remoteConcurrentStreams) {
705             maxConcurrentStreams = remoteConcurrentStreams;
706         } else {
707             maxConcurrentStreams = localConcurrentStreams;
708         }
709     }
710 
711     // need to synchronize to prevent new streams from being created while updating active streams
712     private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
713         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
714         initialSendWindowSize = newInitialWindowSize;
715         for (Integer StreamID: spdySession.getActiveStreams()) {
716             spdySession.updateSendWindowSize(StreamID.intValue(), deltaWindowSize);
717         }
718     }
719 
720     // need to synchronize to prevent new streams from being created while updating active streams
721     private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
722         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
723         initialReceiveWindowSize = newInitialWindowSize;
724         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
725     }
726 
727     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamID, and initial window sizes
728     private synchronized boolean acceptStream(
729             int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
730         // Cannot initiate any new streams after receiving or sending GOAWAY
731         if (receivedGoAwayFrame || sentGoAwayFrame) {
732             return false;
733         }
734 
735         int maxConcurrentStreams = this.maxConcurrentStreams; // read volatile once
736         if (maxConcurrentStreams != 0 &&
737            spdySession.numActiveStreams() >= maxConcurrentStreams) {
738             return false;
739         }
740         spdySession.acceptStream(
741                 streamID, priority, remoteSideClosed, localSideClosed,
742                 initialSendWindowSize, initialReceiveWindowSize);
743         if (isRemoteInitiatedID(streamID)) {
744             lastGoodStreamID = streamID;
745         }
746         return true;
747     }
748 
749     private void halfCloseStream(int streamID, boolean remote) {
750         if (remote) {
751             spdySession.closeRemoteSide(streamID);
752         } else {
753             spdySession.closeLocalSide(streamID);
754         }
755         if (closeSessionFuture != null && spdySession.noActiveStreams()) {
756             closeSessionFuture.setSuccess();
757         }
758     }
759 
760     private void removeStream(int streamID) {
761         spdySession.removeStream(streamID);
762         if (closeSessionFuture != null && spdySession.noActiveStreams()) {
763             closeSessionFuture.setSuccess();
764         }
765     }
766 
767     private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamID, int deltaWindowSize) {
768         synchronized (flowControlLock) {
769             int newWindowSize = spdySession.updateSendWindowSize(streamID, deltaWindowSize);
770 
771             while (newWindowSize > 0) {
772                 // Check if we have unblocked a stalled stream
773                 MessageEvent e = spdySession.getPendingWrite(streamID);
774                 if (e == null) {
775                     break;
776                 }
777 
778                 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
779                 int dataFrameSize = spdyDataFrame.getData().readableBytes();
780 
781                 if (newWindowSize >= dataFrameSize) {
782                     // Window size is large enough to send entire data frame
783                     spdySession.removePendingWrite(streamID);
784                     newWindowSize = spdySession.updateSendWindowSize(streamID, -1 * dataFrameSize);
785 
786                     // The transfer window size is pre-decremented when sending a data frame downstream.
787                     // Close the stream on write failures that leaves the transfer window in a corrupt state.
788                     final SocketAddress remoteAddress = e.getRemoteAddress();
789                     final ChannelHandlerContext context = ctx;
790                     e.getFuture().addListener(new ChannelFutureListener() {
791                         public void operationComplete(ChannelFuture future) throws Exception {
792                             if (!future.isSuccess()) {
793                                 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
794                             }
795                         }
796                     });
797 
798                     // Close the local side of the stream if this is the last frame
799                     if (spdyDataFrame.isLast()) {
800                         halfCloseStream(streamID, false);
801                     }
802 
803                     Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
804 
805                 } else {
806                     // We can send a partial frame
807                     spdySession.updateSendWindowSize(streamID, -1 * newWindowSize);
808 
809                     // Create a partial data frame whose length is the current window size
810                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
811                     partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
812 
813                     ChannelFuture writeFuture = Channels.future(e.getChannel());
814 
815                     // The transfer window size is pre-decremented when sending a data frame downstream.
816                     // Close the stream on write failures that leaves the transfer window in a corrupt state.
817                     final SocketAddress remoteAddress = e.getRemoteAddress();
818                     final ChannelHandlerContext context = ctx;
819                     e.getFuture().addListener(new ChannelFutureListener() {
820                         public void operationComplete(ChannelFuture future) throws Exception {
821                             if (!future.isSuccess()) {
822                                 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
823                             }
824                         }
825                     });
826 
827                     Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
828 
829                     newWindowSize = 0;
830                 }
831             }
832         }
833     }
834 
835     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
836         // Avoid NotYetConnectedException
837         if (!e.getChannel().isConnected()) {
838             ctx.sendDownstream(e);
839             return;
840         }
841 
842         ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
843         if (spdySession.noActiveStreams()) {
844             future.addListener(new ClosingChannelFutureListener(ctx, e));
845         } else {
846             closeSessionFuture = Channels.future(e.getChannel());
847             closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
848         }
849     }
850 
851     private synchronized ChannelFuture sendGoAwayFrame(
852             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
853         if (!sentGoAwayFrame) {
854             sentGoAwayFrame = true;
855             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamID, status);
856             ChannelFuture future = Channels.future(channel);
857             Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
858             return future;
859         }
860         return Channels.succeededFuture(channel);
861     }
862 
863     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
864         private final ChannelHandlerContext ctx;
865         private final ChannelStateEvent e;
866 
867         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
868             this.ctx = ctx;
869             this.e = e;
870         }
871 
872         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
873             if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
874                 Channels.close(ctx, e.getFuture());
875             } else {
876                 e.getFuture().setSuccess();
877             }
878         }
879     }
880 }