View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelDownstreamHandler;
20  import org.jboss.netty.channel.ChannelEvent;
21  import org.jboss.netty.channel.ChannelFuture;
22  import org.jboss.netty.channel.ChannelFutureListener;
23  import org.jboss.netty.channel.ChannelHandlerContext;
24  import org.jboss.netty.channel.ChannelStateEvent;
25  import org.jboss.netty.channel.Channels;
26  import org.jboss.netty.channel.ExceptionEvent;
27  import org.jboss.netty.channel.MessageEvent;
28  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
29  
30  import java.net.SocketAddress;
31  import java.nio.channels.ClosedChannelException;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
35  
36  /**
37   * Manages streams within a SPDY session.
38   */
39  public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40          implements ChannelDownstreamHandler {
41  
42      private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
43  
44      private final SpdySession spdySession = new SpdySession();
45      private volatile int lastGoodStreamID;
46  
47      private volatile int remoteConcurrentStreams;
48      private volatile int localConcurrentStreams;
49      private volatile int maxConcurrentStreams;
50  
51      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
52      private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
53      private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
54  
55      private final Object flowControlLock = new Object();
56  
57      private final AtomicInteger pings = new AtomicInteger();
58  
59      private volatile boolean sentGoAwayFrame;
60      private volatile boolean receivedGoAwayFrame;
61  
62      private volatile ChannelFuture closeSessionFuture;
63  
64      private final boolean server;
65      private final boolean flowControl;
66  
67      /**
68       * Creates a new SPDY/2 session handler.
69       *
70       * @param server {@code true} if and only if this session handler should
71       *               handle the server endpoint of the connection.
72       *               {@code false} if and only if this session handler should
73       *               handle the client endpoint of the connection.
74       */
75      @Deprecated
76      public SpdySessionHandler(boolean server) {
77          this(2, server);
78      }
79  
80      /**
81       * Creates a new session handler.
82       *
83       * @param version the protocol version
84       * @param server  {@code true} if and only if this session handler should
85       *                handle the server endpoint of the connection.
86       *                {@code false} if and only if this session handler should
87       *                handle the client endpoint of the connection.
88       */
89      public SpdySessionHandler(int version, boolean server) {
90          if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) {
91              throw new IllegalArgumentException(
92                      "unsupported version: " + version);
93          }
94          this.server = server;
95          flowControl = version >= 3;
96      }
97  
98      @Override
99      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
100             throws Exception {
101 
102         Object msg = e.getMessage();
103         if (msg instanceof SpdyDataFrame) {
104 
105             /*
106              * SPDY Data frame processing requirements:
107              *
108              * If an endpoint receives a data frame for a Stream-ID which is not open
109              * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
110              * with the error code INVALID_STREAM for the Stream-ID.
111              *
112              * If an endpoint which created the stream receives a data frame before receiving
113              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
114              * issue a stream error with the status code PROTOCOL_ERROR for the Stream-ID.
115              *
116              * If an endpoint receives multiple data frames for invalid Stream-IDs,
117              * it may close the session.
118              *
119              * If an endpoint refuses a stream it must ignore any data frames for that stream.
120              *
121              * If an endpoint receives a data frame after the stream is half-closed from the
122              * sender, it must send a RST_STREAM frame with the status STREAM_ALREADY_CLOSED.
123              *
124              * If an endpoint receives a data frame after the stream is closed, it must send
125              * a RST_STREAM frame with the status PROTOCOL_ERROR.
126              */
127 
128             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
129             int streamID = spdyDataFrame.getStreamId();
130 
131             // Check if we received a data frame for a Stream-ID which is not open
132             if (!spdySession.isActiveStream(streamID)) {
133                 if (streamID <= lastGoodStreamID) {
134                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
135                 } else if (!sentGoAwayFrame) {
136                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
137                 }
138                 return;
139             }
140 
141             // Check if we received a data frame for a stream which is half-closed
142             if (spdySession.isRemoteSideClosed(streamID)) {
143                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
144                 return;
145             }
146 
147             // Check if we received a data frame before receiving a SYN_REPLY
148             if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) {
149                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
150                 return;
151             }
152 
153             /*
154             * SPDY Data frame flow control processing requirements:
155             *
156             * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
157             */
158 
159             if (flowControl) {
160                 // Update receive window size
161                 int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
162                 int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
163 
164                 // Window size can become negative if we sent a SETTINGS frame that reduces the
165                 // size of the transfer window after the peer has written data frames.
166                 // The value is bounded by the length that SETTINGS frame decrease the window.
167                 // This difference is stored for the session when writing the SETTINGS frame
168                 // and is cleared once we send a WINDOW_UPDATE frame.
169                 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamID)) {
170                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
171                     return;
172                 }
173 
174                 // Window size became negative due to sender writing frame before receiving SETTINGS
175                 // Send data frames upstream in initialReceiveWindowSize chunks
176                 if (newWindowSize < 0) {
177                     while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
178                         SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
179                         partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
180                         Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
181                     }
182                 }
183 
184                 // Send a WINDOW_UPDATE frame if less than half the window size remains
185                 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
186                     deltaWindowSize = initialReceiveWindowSize - newWindowSize;
187                     spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
188                     SpdyWindowUpdateFrame spdyWindowUpdateFrame =
189                             new DefaultSpdyWindowUpdateFrame(streamID, deltaWindowSize);
190                     Channels.write(
191                             ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
192                 }
193             }
194 
195             // Close the remote side of the stream if this is the last frame
196             if (spdyDataFrame.isLast()) {
197                 halfCloseStream(streamID, true);
198             }
199 
200         } else if (msg instanceof SpdySynStreamFrame) {
201 
202             /*
203              * SPDY SYN_STREAM frame processing requirements:
204              *
205              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
206              * any previously received SYN_STREAM, it must issue a session error with
207              * the status PROTOCOL_ERROR.
208              *
209              * If an endpoint receives multiple SYN_STREAM frames with the same active
210              * Stream-ID, it must issue a stream error with the status code PROTOCOL_ERROR.
211              *
212              * The recipient can reject a stream by sending a stream error with the
213              * status code REFUSED_STREAM.
214              */
215 
216             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
217             int streamID = spdySynStreamFrame.getStreamId();
218 
219             // Check if we received a valid SYN_STREAM frame
220             if (spdySynStreamFrame.isInvalid() ||
221                 !isRemoteInitiatedID(streamID) ||
222                 spdySession.isActiveStream(streamID)) {
223                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
224                 return;
225             }
226 
227             // Stream-IDs must be monotonically increasing
228             if (streamID <= lastGoodStreamID) {
229                 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
230                 return;
231             }
232 
233             // Try to accept the stream
234             byte priority = spdySynStreamFrame.getPriority();
235             boolean remoteSideClosed = spdySynStreamFrame.isLast();
236             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
237             if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
238                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.REFUSED_STREAM);
239                 return;
240             }
241 
242         } else if (msg instanceof SpdySynReplyFrame) {
243 
244             /*
245              * SPDY SYN_REPLY frame processing requirements:
246              *
247              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
248              * it must issue a stream error with the status code STREAM_IN_USE.
249              */
250 
251             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
252             int streamID = spdySynReplyFrame.getStreamId();
253 
254             // Check if we received a valid SYN_REPLY frame
255             if (spdySynReplyFrame.isInvalid() ||
256                 isRemoteInitiatedID(streamID) ||
257                 spdySession.isRemoteSideClosed(streamID)) {
258                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
259                 return;
260             }
261 
262             // Check if we have received multiple frames for the same Stream-ID
263             if (spdySession.hasReceivedReply(streamID)) {
264                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_IN_USE);
265                 return;
266             }
267 
268             spdySession.receivedReply(streamID);
269 
270             // Close the remote side of the stream if this is the last frame
271             if (spdySynReplyFrame.isLast()) {
272                 halfCloseStream(streamID, true);
273             }
274 
275         } else if (msg instanceof SpdyRstStreamFrame) {
276 
277             /*
278              * SPDY RST_STREAM frame processing requirements:
279              *
280              * After receiving a RST_STREAM on a stream, the receiver must not send
281              * additional frames on that stream.
282              *
283              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
284              */
285 
286             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
287             removeStream(spdyRstStreamFrame.getStreamId());
288 
289         } else if (msg instanceof SpdySettingsFrame) {
290 
291             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
292 
293             int newConcurrentStreams =
294                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
295             if (newConcurrentStreams >= 0) {
296                 updateConcurrentStreams(newConcurrentStreams, true);
297             }
298 
299             // Persistence flag are inconsistent with the use of SETTINGS to communicate
300             // the initial window size. Remove flags from the sender requesting that the
301             // value be persisted. Remove values that the sender indicates are persisted.
302             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
303                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
304             }
305             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
306 
307             if (flowControl) {
308                 int newInitialWindowSize =
309                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
310                 if (newInitialWindowSize >= 0) {
311                     updateInitialSendWindowSize(newInitialWindowSize);
312                 }
313             }
314 
315         } else if (msg instanceof SpdyPingFrame) {
316 
317             /*
318              * SPDY PING frame processing requirements:
319              *
320              * Receivers of a PING frame should send an identical frame to the sender
321              * as soon as possible.
322              *
323              * Receivers of a PING frame must ignore frames that it did not initiate
324              */
325 
326             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
327 
328             if (isRemoteInitiatedID(spdyPingFrame.getId())) {
329                 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
330                 return;
331             }
332 
333             // Note: only checks that there are outstanding pings since uniqueness is not enforced
334             if (pings.get() == 0) {
335                 return;
336             }
337             pings.getAndDecrement();
338 
339         } else if (msg instanceof SpdyGoAwayFrame) {
340 
341             receivedGoAwayFrame = true;
342 
343         } else if (msg instanceof SpdyHeadersFrame) {
344 
345             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
346             int streamID = spdyHeadersFrame.getStreamId();
347 
348             // Check if we received a valid HEADERS frame
349             if (spdyHeadersFrame.isInvalid()) {
350                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
351                 return;
352             }
353 
354             if (spdySession.isRemoteSideClosed(streamID)) {
355                 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
356                 return;
357             }
358 
359             // Close the remote side of the stream if this is the last frame
360             if (spdyHeadersFrame.isLast()) {
361                 halfCloseStream(streamID, true);
362             }
363 
364         } else if (msg instanceof SpdyWindowUpdateFrame) {
365 
366             /*
367              * SPDY WINDOW_UPDATE frame processing requirements:
368              *
369              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
370              * must send a RST_STREAM with the status code FLOW_CONTROL_ERROR.
371              *
372              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
373              * after sending the last frame for the stream.
374              */
375 
376             if (flowControl) {
377                 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
378                 int streamID = spdyWindowUpdateFrame.getStreamId();
379                 int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
380 
381                 // Ignore frames for half-closed streams
382                 if (spdySession.isLocalSideClosed(streamID)) {
383                     return;
384                 }
385 
386                 // Check for numerical overflow
387                 if (spdySession.getSendWindowSize(streamID) > Integer.MAX_VALUE - deltaWindowSize) {
388                     issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
389                     return;
390                 }
391 
392                 updateSendWindowSize(ctx, streamID, deltaWindowSize);
393             }
394             return;
395         }
396 
397         super.messageReceived(ctx, e);
398     }
399 
400     @Override
401     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
402             throws Exception {
403 
404         Throwable cause = e.getCause();
405         if (cause instanceof SpdyProtocolException) {
406             issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
407         }
408 
409         super.exceptionCaught(ctx, e);
410     }
411 
412     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
413             throws Exception {
414         if (evt instanceof ChannelStateEvent) {
415             ChannelStateEvent e = (ChannelStateEvent) evt;
416             switch (e.getState()) {
417             case OPEN:
418             case CONNECTED:
419             case BOUND:
420 
421                 /*
422                  * SPDY connection requirements:
423                  *
424                  * When either endpoint closes the transport-level connection,
425                  * it must first send a GOAWAY frame.
426                  */
427                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
428                     sendGoAwayFrame(ctx, e);
429                     return;
430                 }
431             }
432         }
433         if (!(evt instanceof MessageEvent)) {
434             ctx.sendDownstream(evt);
435             return;
436         }
437 
438         MessageEvent e = (MessageEvent) evt;
439         Object msg = e.getMessage();
440 
441         if (msg instanceof SpdyDataFrame) {
442 
443             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
444             final int streamID = spdyDataFrame.getStreamId();
445 
446             // Frames must not be sent on half-closed streams
447             if (spdySession.isLocalSideClosed(streamID)) {
448                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
449                 return;
450             }
451 
452             /*
453              * SPDY Data frame flow control processing requirements:
454              *
455              * Sender must not send a data frame with data length greater
456              * than the transfer window size.
457              *
458              * After sending each data frame, the sender decrements its
459              * transfer window size by the amount of data transmitted.
460              *
461              * When the window size becomes less than or equal to 0, the
462              * sender must pause transmitting data frames.
463              */
464 
465             if (flowControl) {
466                 synchronized (flowControlLock) {
467                     int dataLength = spdyDataFrame.getData().readableBytes();
468                     int sendWindowSize = spdySession.getSendWindowSize(streamID);
469 
470                     if (sendWindowSize <= 0) {
471                         // Stream is stalled -- enqueue Data frame and return
472                         spdySession.putPendingWrite(streamID, e);
473                         return;
474                     } else if (sendWindowSize < dataLength) {
475                         // Stream is not stalled but we cannot send the entire frame
476                         spdySession.updateSendWindowSize(streamID, -1 * sendWindowSize);
477 
478                         // Create a partial data frame whose length is the current window size
479                         SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
480                         partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
481 
482                         // Enqueue the remaining data (will be the first frame queued)
483                         spdySession.putPendingWrite(streamID, e);
484 
485                         ChannelFuture writeFuture = Channels.future(e.getChannel());
486 
487                         // The transfer window size is pre-decremented when sending a data frame downstream.
488                         // Close the stream on write failures that leaves the transfer window in a corrupt state.
489                         final SocketAddress remoteAddress = e.getRemoteAddress();
490                         final ChannelHandlerContext context = ctx;
491                         e.getFuture().addListener(new ChannelFutureListener() {
492                             public void operationComplete(ChannelFuture future) throws Exception {
493                                 if (!future.isSuccess()) {
494                                     issueStreamError(
495                                             context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
496                                 }
497                             }
498                         });
499 
500                         Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
501                         return;
502                     } else {
503                         // Window size is large enough to send entire data frame
504                         spdySession.updateSendWindowSize(streamID, -1 * dataLength);
505 
506                         // The transfer window size is pre-decremented when sending a data frame downstream.
507                         // Close the stream on write failures that leaves the transfer window in a corrupt state.
508                         final SocketAddress remoteAddress = e.getRemoteAddress();
509                         final ChannelHandlerContext context = ctx;
510                         e.getFuture().addListener(new ChannelFutureListener() {
511                             public void operationComplete(ChannelFuture future) throws Exception {
512                                 if (!future.isSuccess()) {
513                                     issueStreamError(
514                                             context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
515                                 }
516                             }
517                         });
518                     }
519                 }
520             }
521 
522             // Close the local side of the stream if this is the last frame
523             if (spdyDataFrame.isLast()) {
524                 halfCloseStream(streamID, false);
525             }
526 
527         } else if (msg instanceof SpdySynStreamFrame) {
528 
529             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
530             int streamID = spdySynStreamFrame.getStreamId();
531 
532             if (isRemoteInitiatedID(streamID)) {
533                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
534                 return;
535             }
536 
537             byte priority = spdySynStreamFrame.getPriority();
538             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
539             boolean localSideClosed = spdySynStreamFrame.isLast();
540             if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
541                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
542                 return;
543             }
544 
545         } else if (msg instanceof SpdySynReplyFrame) {
546 
547             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
548             int streamID = spdySynReplyFrame.getStreamId();
549 
550             // Frames must not be sent on half-closed streams
551             if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) {
552                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
553                 return;
554             }
555 
556             // Close the local side of the stream if this is the last frame
557             if (spdySynReplyFrame.isLast()) {
558                 halfCloseStream(streamID, false);
559             }
560 
561         } else if (msg instanceof SpdyRstStreamFrame) {
562 
563             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
564             removeStream(spdyRstStreamFrame.getStreamId());
565 
566         } else if (msg instanceof SpdySettingsFrame) {
567 
568             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
569 
570             int newConcurrentStreams =
571                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
572             if (newConcurrentStreams >= 0) {
573                 updateConcurrentStreams(newConcurrentStreams, false);
574             }
575 
576             // Persistence flag are inconsistent with the use of SETTINGS to communicate
577             // the initial window size. Remove flags from the sender requesting that the
578             // value be persisted. Remove values that the sender indicates are persisted.
579             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
580                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
581             }
582             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
583 
584             if (flowControl) {
585                 int newInitialWindowSize =
586                         spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
587                 if (newInitialWindowSize >= 0) {
588                     updateInitialReceiveWindowSize(newInitialWindowSize);
589                 }
590             }
591 
592         } else if (msg instanceof SpdyPingFrame) {
593 
594             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
595             if (isRemoteInitiatedID(spdyPingFrame.getId())) {
596                 e.getFuture().setFailure(new IllegalArgumentException(
597                             "invalid PING ID: " + spdyPingFrame.getId()));
598                 return;
599             }
600             pings.getAndIncrement();
601 
602         } else if (msg instanceof SpdyGoAwayFrame) {
603 
604             // Why is this being sent? Intercept it and fail the write.
605             // Should have sent a CLOSE ChannelStateEvent
606             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
607             return;
608 
609         } else if (msg instanceof SpdyHeadersFrame) {
610 
611             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
612             int streamID = spdyHeadersFrame.getStreamId();
613 
614             // Frames must not be sent on half-closed streams
615             if (spdySession.isLocalSideClosed(streamID)) {
616                 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
617                 return;
618             }
619 
620             // Close the local side of the stream if this is the last frame
621             if (spdyHeadersFrame.isLast()) {
622                 halfCloseStream(streamID, false);
623             }
624 
625         } else if (msg instanceof SpdyWindowUpdateFrame) {
626 
627             // Why is this being sent? Intercept it and fail the write.
628             e.getFuture().setFailure(PROTOCOL_EXCEPTION);
629             return;
630         }
631 
632         ctx.sendDownstream(evt);
633     }
634 
635     /*
636      * SPDY Session Error Handling:
637      *
638      * When a session error occurs, the endpoint encountering the error must first
639      * send a GOAWAY frame with the Stream-ID of the most recently received stream
640      * from the remote endpoint, and the error code for why the session is terminating.
641      *
642      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
643      */
644     private void issueSessionError(
645             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
646 
647         ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
648         future.addListener(ChannelFutureListener.CLOSE);
649     }
650 
651     /*
652      * SPDY Stream Error Handling:
653      *
654      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
655      * the Stream-ID for the stream where the error occurred and the error status which
656      * caused the error.
657      *
658      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
659      *
660      * Note: this is only called by the worker thread
661      */
662     private void issueStreamError(
663             ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamID, SpdyStreamStatus status) {
664 
665         boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamID);
666         removeStream(streamID);
667 
668         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
669         Channels.write(ctx, Channels.future(ctx.getChannel()), spdyRstStreamFrame, remoteAddress);
670         if (fireMessageReceived) {
671             Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
672         }
673     }
674 
675     /*
676      * Helper functions
677      */
678 
679     private boolean isRemoteInitiatedID(int id) {
680         boolean serverID = isServerId(id);
681         return server && !serverID || !server && serverID;
682     }
683 
684     private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
685         if (remote) {
686             remoteConcurrentStreams = newConcurrentStreams;
687         } else {
688             localConcurrentStreams = newConcurrentStreams;
689         }
690         if (localConcurrentStreams == remoteConcurrentStreams) {
691             maxConcurrentStreams = localConcurrentStreams;
692             return;
693         }
694         if (localConcurrentStreams == 0) {
695             maxConcurrentStreams = remoteConcurrentStreams;
696             return;
697         }
698         if (remoteConcurrentStreams == 0) {
699             maxConcurrentStreams = localConcurrentStreams;
700             return;
701         }
702         if (localConcurrentStreams > remoteConcurrentStreams) {
703             maxConcurrentStreams = remoteConcurrentStreams;
704         } else {
705             maxConcurrentStreams = localConcurrentStreams;
706         }
707     }
708 
709     // need to synchronize to prevent new streams from being created while updating active streams
710     private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
711         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
712         initialSendWindowSize = newInitialWindowSize;
713         for (Integer StreamID: spdySession.getActiveStreams()) {
714             spdySession.updateSendWindowSize(StreamID.intValue(), deltaWindowSize);
715         }
716     }
717 
718     // need to synchronize to prevent new streams from being created while updating active streams
719     private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
720         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
721         initialReceiveWindowSize = newInitialWindowSize;
722         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
723     }
724 
725     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamID, and initial window sizes
726     private synchronized boolean acceptStream(
727             int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
728         // Cannot initiate any new streams after receiving or sending GOAWAY
729         if (receivedGoAwayFrame || sentGoAwayFrame) {
730             return false;
731         }
732 
733         int maxConcurrentStreams = this.maxConcurrentStreams; // read volatile once
734         if (maxConcurrentStreams != 0 &&
735            spdySession.numActiveStreams() >= maxConcurrentStreams) {
736             return false;
737         }
738         spdySession.acceptStream(
739                 streamID, priority, remoteSideClosed, localSideClosed,
740                 initialSendWindowSize, initialReceiveWindowSize);
741         if (isRemoteInitiatedID(streamID)) {
742             lastGoodStreamID = streamID;
743         }
744         return true;
745     }
746 
747     private void halfCloseStream(int streamID, boolean remote) {
748         if (remote) {
749             spdySession.closeRemoteSide(streamID);
750         } else {
751             spdySession.closeLocalSide(streamID);
752         }
753         if (closeSessionFuture != null && spdySession.noActiveStreams()) {
754             closeSessionFuture.setSuccess();
755         }
756     }
757 
758     private void removeStream(int streamID) {
759         spdySession.removeStream(streamID);
760         if (closeSessionFuture != null && spdySession.noActiveStreams()) {
761             closeSessionFuture.setSuccess();
762         }
763     }
764 
765     private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamID, int deltaWindowSize) {
766         synchronized (flowControlLock) {
767             int newWindowSize = spdySession.updateSendWindowSize(streamID, deltaWindowSize);
768 
769             while (newWindowSize > 0) {
770                 // Check if we have unblocked a stalled stream
771                 MessageEvent e = spdySession.getPendingWrite(streamID);
772                 if (e == null) {
773                     break;
774                 }
775 
776                 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
777                 int dataFrameSize = spdyDataFrame.getData().readableBytes();
778 
779                 if (newWindowSize >= dataFrameSize) {
780                     // Window size is large enough to send entire data frame
781                     spdySession.removePendingWrite(streamID);
782                     newWindowSize = spdySession.updateSendWindowSize(streamID, -1 * dataFrameSize);
783 
784                     // The transfer window size is pre-decremented when sending a data frame downstream.
785                     // Close the stream on write failures that leaves the transfer window in a corrupt state.
786                     final SocketAddress remoteAddress = e.getRemoteAddress();
787                     final ChannelHandlerContext context = ctx;
788                     e.getFuture().addListener(new ChannelFutureListener() {
789                         public void operationComplete(ChannelFuture future) throws Exception {
790                             if (!future.isSuccess()) {
791                                 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
792                             }
793                         }
794                     });
795 
796                     // Close the local side of the stream if this is the last frame
797                     if (spdyDataFrame.isLast()) {
798                         halfCloseStream(streamID, false);
799                     }
800 
801                     Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
802                 } else {
803                     // We can send a partial frame
804                     spdySession.updateSendWindowSize(streamID, -1 * newWindowSize);
805 
806                     // Create a partial data frame whose length is the current window size
807                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
808                     partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
809 
810                     ChannelFuture writeFuture = Channels.future(e.getChannel());
811 
812                     // The transfer window size is pre-decremented when sending a data frame downstream.
813                     // Close the stream on write failures that leaves the transfer window in a corrupt state.
814                     final SocketAddress remoteAddress = e.getRemoteAddress();
815                     final ChannelHandlerContext context = ctx;
816                     e.getFuture().addListener(new ChannelFutureListener() {
817                         public void operationComplete(ChannelFuture future) throws Exception {
818                             if (!future.isSuccess()) {
819                                 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
820                             }
821                         }
822                     });
823 
824                     Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
825 
826                     newWindowSize = 0;
827                 }
828             }
829         }
830     }
831 
832     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
833         // Avoid NotYetConnectedException
834         if (!e.getChannel().isConnected()) {
835             ctx.sendDownstream(e);
836             return;
837         }
838 
839         ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
840         if (spdySession.noActiveStreams()) {
841             future.addListener(new ClosingChannelFutureListener(ctx, e));
842         } else {
843             closeSessionFuture = Channels.future(e.getChannel());
844             closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
845         }
846     }
847 
848     private synchronized ChannelFuture sendGoAwayFrame(
849             ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
850         if (!sentGoAwayFrame) {
851             sentGoAwayFrame = true;
852             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamID, status);
853             ChannelFuture future = Channels.future(channel);
854             Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
855             return future;
856         }
857         return Channels.succeededFuture(channel);
858     }
859 
860     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
861         private final ChannelHandlerContext ctx;
862         private final ChannelStateEvent e;
863 
864         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
865             this.ctx = ctx;
866             this.e = e;
867         }
868 
869         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
870             if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
871                 Channels.close(ctx, e.getFuture());
872             } else {
873                 e.getFuture().setSuccess();
874             }
875         }
876     }
877 }