1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelDownstreamHandler;
20 import org.jboss.netty.channel.ChannelEvent;
21 import org.jboss.netty.channel.ChannelFuture;
22 import org.jboss.netty.channel.ChannelFutureListener;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelStateEvent;
25 import org.jboss.netty.channel.Channels;
26 import org.jboss.netty.channel.ExceptionEvent;
27 import org.jboss.netty.channel.MessageEvent;
28 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
29
30 import java.net.SocketAddress;
31 import java.nio.channels.ClosedChannelException;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
35
36
37
38
39 public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40 implements ChannelDownstreamHandler {
41
42 private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
43
44 private static final int DEFAULT_WINDOW_SIZE = 64 * 1024;
45 private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
46 private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
47 private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
48
49 private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
50 private volatile int lastGoodStreamId;
51
52 private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
53 private volatile int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
54 private volatile int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
55
56 private final Object flowControlLock = new Object();
57
58 private final AtomicInteger pings = new AtomicInteger();
59
60 private volatile boolean sentGoAwayFrame;
61 private volatile boolean receivedGoAwayFrame;
62
63 private volatile ChannelFutureListener closeSessionFutureListener;
64
65 private final boolean server;
66 private final int minorVersion;
67
68
69
70
71
72
73
74
75
76
/**
 * Creates a new session handler.
 *
 * @param spdyVersion the protocol version; its minor version is stored and later compared
 *                    against the SETTINGS_MINOR_VERSION value of SETTINGS frames
 * @param server      {@code true} if this handler is on the server side of the connection;
 *                    used to decide which stream and ping IDs are remote-initiated
 * @throws NullPointerException if {@code spdyVersion} is {@code null}
 */
public SpdySessionHandler(SpdyVersion spdyVersion, boolean server) {
    if (null == spdyVersion) {
        throw new NullPointerException("spdyVersion");
    }
    minorVersion = spdyVersion.getMinorVersion();
    this.server = server;
}
84
85 public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
86 if (sessionReceiveWindowSize < 0) {
87 throw new IllegalArgumentException("sessionReceiveWindowSize");
88 }
89
90
91
92
93
94
95 initialSessionReceiveWindowSize = sessionReceiveWindowSize;
96 }
97
98 @Override
99 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
100 throws Exception {
101
102 Object msg = e.getMessage();
103 if (msg instanceof SpdyDataFrame) {
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
129 int streamId = spdyDataFrame.getStreamId();
130
131 int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
132 int newSessionWindowSize =
133 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
134
135
136 if (newSessionWindowSize < 0) {
137 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
138 return;
139 }
140
141
142 if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
143 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
144 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
145 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
146 new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
147 Channels.write(
148 ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
149 }
150
151
152 if (!spdySession.isActiveStream(streamId)) {
153 if (streamId <= lastGoodStreamId) {
154 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
155 } else if (!sentGoAwayFrame) {
156 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM);
157 }
158 return;
159 }
160
161
162 if (spdySession.isRemoteSideClosed(streamId)) {
163 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
164 return;
165 }
166
167
168 if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
169 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
170 return;
171 }
172
173
174
175
176
177
178
179
180 int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
181
182
183
184
185
186
187 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
188 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
189 return;
190 }
191
192
193
194 if (newWindowSize < 0) {
195 while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
196 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId);
197 partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
198 Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
199 }
200 }
201
202
203 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
204 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
205 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
206 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
207 new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
208 Channels.write(
209 ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
210 }
211
212
213 if (spdyDataFrame.isLast()) {
214 halfCloseStream(streamId, true, e.getFuture());
215 }
216
217 } else if (msg instanceof SpdySynStreamFrame) {
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
234 int streamId = spdySynStreamFrame.getStreamId();
235
236
237 if (spdySynStreamFrame.isInvalid() ||
238 !isRemoteInitiatedId(streamId) ||
239 spdySession.isActiveStream(streamId)) {
240 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
241 return;
242 }
243
244
245 if (streamId <= lastGoodStreamId) {
246 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
247 return;
248 }
249
250
251 byte priority = spdySynStreamFrame.getPriority();
252 boolean remoteSideClosed = spdySynStreamFrame.isLast();
253 boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
254 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
255 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.REFUSED_STREAM);
256 return;
257 }
258
259 } else if (msg instanceof SpdySynReplyFrame) {
260
261
262
263
264
265
266
267
268 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
269 int streamId = spdySynReplyFrame.getStreamId();
270
271
272 if (spdySynReplyFrame.isInvalid() ||
273 isRemoteInitiatedId(streamId) ||
274 spdySession.isRemoteSideClosed(streamId)) {
275 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM);
276 return;
277 }
278
279
280 if (spdySession.hasReceivedReply(streamId)) {
281 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.STREAM_IN_USE);
282 return;
283 }
284
285 spdySession.receivedReply(streamId);
286
287
288 if (spdySynReplyFrame.isLast()) {
289 halfCloseStream(streamId, true, e.getFuture());
290 }
291
292 } else if (msg instanceof SpdyRstStreamFrame) {
293
294
295
296
297
298
299
300
301
302
303 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
304 removeStream(spdyRstStreamFrame.getStreamId(), e.getFuture());
305
306 } else if (msg instanceof SpdySettingsFrame) {
307
308 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
309
310 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
311 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
312
313 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
314 return;
315 }
316
317 int newConcurrentStreams =
318 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
319 if (newConcurrentStreams >= 0) {
320 remoteConcurrentStreams = newConcurrentStreams;
321 }
322
323
324
325
326 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
327 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
328 }
329 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
330
331 int newInitialWindowSize =
332 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
333 if (newInitialWindowSize >= 0) {
334 updateInitialSendWindowSize(newInitialWindowSize);
335 }
336
337 } else if (msg instanceof SpdyPingFrame) {
338
339
340
341
342
343
344
345
346
347
348 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
349
350 if (isRemoteInitiatedId(spdyPingFrame.getId())) {
351 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
352 return;
353 }
354
355
356 if (pings.get() == 0) {
357 return;
358 }
359 pings.getAndDecrement();
360
361 } else if (msg instanceof SpdyGoAwayFrame) {
362
363 receivedGoAwayFrame = true;
364
365 } else if (msg instanceof SpdyHeadersFrame) {
366
367 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
368 int streamId = spdyHeadersFrame.getStreamId();
369
370
371 if (spdyHeadersFrame.isInvalid()) {
372 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.PROTOCOL_ERROR);
373 return;
374 }
375
376 if (spdySession.isRemoteSideClosed(streamId)) {
377 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.INVALID_STREAM);
378 return;
379 }
380
381
382 if (spdyHeadersFrame.isLast()) {
383 halfCloseStream(streamId, true, e.getFuture());
384 }
385
386 } else if (msg instanceof SpdyWindowUpdateFrame) {
387
388
389
390
391
392
393
394
395
396
397
398 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
399 int streamId = spdyWindowUpdateFrame.getStreamId();
400 int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
401
402
403 if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
404 return;
405 }
406
407
408 if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
409 if (streamId == SPDY_SESSION_STREAM_ID) {
410 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
411 } else {
412 issueStreamError(ctx, e.getRemoteAddress(), streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
413 }
414 return;
415 }
416
417 updateSendWindowSize(ctx, streamId, deltaWindowSize);
418 return;
419 }
420
421 super.messageReceived(ctx, e);
422 }
423
424 @Override
425 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
426 throws Exception {
427
428 Throwable cause = e.getCause();
429 if (cause instanceof SpdyProtocolException) {
430 issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
431 }
432
433 super.exceptionCaught(ctx, e);
434 }
435
436 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
437 throws Exception {
438 if (evt instanceof ChannelStateEvent) {
439 ChannelStateEvent e = (ChannelStateEvent) evt;
440 switch (e.getState()) {
441 case OPEN:
442 case CONNECTED:
443 case BOUND:
444
445
446
447
448
449
450
451 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
452 sendGoAwayFrame(ctx, e);
453 return;
454 }
455 }
456 }
457 if (!(evt instanceof MessageEvent)) {
458 ctx.sendDownstream(evt);
459 return;
460 }
461
462 MessageEvent e = (MessageEvent) evt;
463 Object msg = e.getMessage();
464
465 if (msg instanceof SpdyDataFrame) {
466
467 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
468 final int streamId = spdyDataFrame.getStreamId();
469
470
471 if (spdySession.isLocalSideClosed(streamId)) {
472 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
473 return;
474 }
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489 synchronized (flowControlLock) {
490 int dataLength = spdyDataFrame.getData().readableBytes();
491 int sendWindowSize = spdySession.getSendWindowSize(streamId);
492 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
493 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
494
495 if (sendWindowSize <= 0) {
496
497 spdySession.putPendingWrite(streamId, e);
498 return;
499 } else if (sendWindowSize < dataLength) {
500
501 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
502 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
503
504
505 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId);
506 partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
507
508
509 spdySession.putPendingWrite(streamId, e);
510
511 ChannelFuture writeFuture = Channels.future(e.getChannel());
512
513
514
515 final SocketAddress remoteAddress = e.getRemoteAddress();
516 final ChannelHandlerContext context = ctx;
517 e.getFuture().addListener(new ChannelFutureListener() {
518 public void operationComplete(ChannelFuture future) throws Exception {
519 if (!future.isSuccess()) {
520 Channel channel = future.getChannel();
521 issueSessionError(
522 context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
523 }
524 }
525 });
526
527 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
528 return;
529 } else {
530
531 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
532 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
533
534
535
536 final SocketAddress remoteAddress = e.getRemoteAddress();
537 final ChannelHandlerContext context = ctx;
538 e.getFuture().addListener(new ChannelFutureListener() {
539 public void operationComplete(ChannelFuture future) throws Exception {
540 if (!future.isSuccess()) {
541 Channel channel = future.getChannel();
542 issueSessionError(
543 context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
544 }
545 }
546 });
547 }
548 }
549
550
551 if (spdyDataFrame.isLast()) {
552 halfCloseStream(streamId, false, e.getFuture());
553 }
554
555 } else if (msg instanceof SpdySynStreamFrame) {
556
557 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
558 int streamId = spdySynStreamFrame.getStreamId();
559
560 if (isRemoteInitiatedId(streamId)) {
561 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
562 return;
563 }
564
565 byte priority = spdySynStreamFrame.getPriority();
566 boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
567 boolean localSideClosed = spdySynStreamFrame.isLast();
568 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
569 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
570 return;
571 }
572
573 } else if (msg instanceof SpdySynReplyFrame) {
574
575 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
576 int streamId = spdySynReplyFrame.getStreamId();
577
578
579 if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
580 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
581 return;
582 }
583
584
585 if (spdySynReplyFrame.isLast()) {
586 halfCloseStream(streamId, false, e.getFuture());
587 }
588
589 } else if (msg instanceof SpdyRstStreamFrame) {
590
591 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
592 removeStream(spdyRstStreamFrame.getStreamId(), e.getFuture());
593
594 } else if (msg instanceof SpdySettingsFrame) {
595
596 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
597
598 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
599 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
600
601 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
602 return;
603 }
604
605 int newConcurrentStreams =
606 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
607 if (newConcurrentStreams >= 0) {
608 localConcurrentStreams = newConcurrentStreams;
609 }
610
611
612
613
614 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
615 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
616 }
617 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
618
619 int newInitialWindowSize =
620 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
621 if (newInitialWindowSize >= 0) {
622 updateInitialReceiveWindowSize(newInitialWindowSize);
623 }
624
625 } else if (msg instanceof SpdyPingFrame) {
626
627 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
628 if (isRemoteInitiatedId(spdyPingFrame.getId())) {
629 e.getFuture().setFailure(new IllegalArgumentException(
630 "invalid PING ID: " + spdyPingFrame.getId()));
631 return;
632 }
633 pings.getAndIncrement();
634
635 } else if (msg instanceof SpdyGoAwayFrame) {
636
637
638
639 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
640 return;
641
642 } else if (msg instanceof SpdyHeadersFrame) {
643
644 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
645 int streamId = spdyHeadersFrame.getStreamId();
646
647
648 if (spdySession.isLocalSideClosed(streamId)) {
649 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
650 return;
651 }
652
653
654 if (spdyHeadersFrame.isLast()) {
655 halfCloseStream(streamId, false, e.getFuture());
656 }
657
658 } else if (msg instanceof SpdyWindowUpdateFrame) {
659
660
661 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
662 return;
663 }
664
665 ctx.sendDownstream(evt);
666 }
667
668
669
670
671
672
673
674
675
676
677 private void issueSessionError(
678 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
679
680 ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
681 future.addListener(ChannelFutureListener.CLOSE);
682 }
683
684
685
686
687
688
689
690
691
692
693
694
695 private void issueStreamError(
696 ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamId, SpdyStreamStatus status) {
697
698 boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamId);
699 ChannelFuture future = Channels.future(ctx.getChannel());
700 removeStream(streamId, future);
701
702 SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
703 Channels.write(ctx, future, spdyRstStreamFrame, remoteAddress);
704 if (fireMessageReceived) {
705 Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
706 }
707 }
708
709
710
711
712
713 private boolean isRemoteInitiatedId(int id) {
714 boolean serverId = isServerId(id);
715 return server && !serverId || !server && serverId;
716 }
717
718
719 private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
720 int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
721 initialSendWindowSize = newInitialWindowSize;
722 spdySession.updateAllSendWindowSizes(deltaWindowSize);
723 }
724
725
726 private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
727 int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
728 initialReceiveWindowSize = newInitialWindowSize;
729 spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
730 }
731
732
733 private synchronized boolean acceptStream(
734 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
735
736 if (receivedGoAwayFrame || sentGoAwayFrame) {
737 return false;
738 }
739
740 boolean remote = isRemoteInitiatedId(streamId);
741 int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
742 if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
743 return false;
744 }
745 spdySession.acceptStream(
746 streamId, priority, remoteSideClosed, localSideClosed,
747 initialSendWindowSize, initialReceiveWindowSize, remote);
748 if (remote) {
749 lastGoodStreamId = streamId;
750 }
751 return true;
752 }
753
754 private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
755 if (remote) {
756 spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
757 } else {
758 spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
759 }
760 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
761 future.addListener(closeSessionFutureListener);
762 }
763 }
764
765 private void removeStream(int streamId, ChannelFuture future) {
766 spdySession.removeStream(streamId, isRemoteInitiatedId(streamId));
767 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
768 future.addListener(closeSessionFutureListener);
769 }
770 }
771
772 private void updateSendWindowSize(ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
773 synchronized (flowControlLock) {
774 int newWindowSize = spdySession.updateSendWindowSize(streamId, deltaWindowSize);
775 if (streamId != SPDY_SESSION_STREAM_ID) {
776 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
777 newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
778 }
779
780 while (newWindowSize > 0) {
781
782 MessageEvent e = spdySession.getPendingWrite(streamId);
783 if (e == null) {
784 break;
785 }
786
787 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
788 int dataFrameSize = spdyDataFrame.getData().readableBytes();
789 int writeStreamId = spdyDataFrame.getStreamId();
790 if (streamId == SPDY_SESSION_STREAM_ID) {
791 newWindowSize = Math.min(newWindowSize, spdySession.getSendWindowSize(writeStreamId));
792 }
793
794 if (newWindowSize >= dataFrameSize) {
795
796 spdySession.removePendingWrite(writeStreamId);
797 newWindowSize = spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
798 int sessionSendWindowSize =
799 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
800 newWindowSize = Math.min(newWindowSize, sessionSendWindowSize);
801
802
803
804 final SocketAddress remoteAddress = e.getRemoteAddress();
805 final ChannelHandlerContext context = ctx;
806 e.getFuture().addListener(new ChannelFutureListener() {
807 public void operationComplete(ChannelFuture future) throws Exception {
808 if (!future.isSuccess()) {
809 Channel channel = future.getChannel();
810 issueSessionError(context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
811 }
812 }
813 });
814
815
816 if (spdyDataFrame.isLast()) {
817 halfCloseStream(writeStreamId, false, e.getFuture());
818 }
819
820 Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
821 } else {
822
823 spdySession.updateSendWindowSize(writeStreamId, -1 * newWindowSize);
824 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * newWindowSize);
825
826
827 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId);
828 partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
829
830 ChannelFuture writeFuture = Channels.future(e.getChannel());
831
832
833
834 final SocketAddress remoteAddress = e.getRemoteAddress();
835 final ChannelHandlerContext context = ctx;
836 e.getFuture().addListener(new ChannelFutureListener() {
837 public void operationComplete(ChannelFuture future) throws Exception {
838 if (!future.isSuccess()) {
839 Channel channel = future.getChannel();
840 issueSessionError(context, channel, remoteAddress, SpdySessionStatus.INTERNAL_ERROR);
841 }
842 }
843 });
844
845 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
846
847 newWindowSize = 0;
848 }
849 }
850 }
851 }
852
853 private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
854
855 if (!e.getChannel().isConnected()) {
856 ctx.sendDownstream(e);
857 return;
858 }
859
860 ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
861 if (spdySession.noActiveStreams()) {
862 future.addListener(new ClosingChannelFutureListener(ctx, e));
863 } else {
864 closeSessionFutureListener = new ClosingChannelFutureListener(ctx, e);
865 }
866 }
867
868 private synchronized ChannelFuture sendGoAwayFrame(
869 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
870 if (!sentGoAwayFrame) {
871 sentGoAwayFrame = true;
872 SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
873 ChannelFuture future = Channels.future(channel);
874 Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
875 return future;
876 }
877 return Channels.succeededFuture(channel);
878 }
879
880 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
881 private final ChannelHandlerContext ctx;
882 private final ChannelStateEvent e;
883
884 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
885 this.ctx = ctx;
886 this.e = e;
887 }
888
889 public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
890 if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
891 Channels.close(ctx, e.getFuture());
892 } else {
893 e.getFuture().setSuccess();
894 }
895 }
896 }
897 }