1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelDownstreamHandler;
20 import org.jboss.netty.channel.ChannelEvent;
21 import org.jboss.netty.channel.ChannelFuture;
22 import org.jboss.netty.channel.ChannelFutureListener;
23 import org.jboss.netty.channel.ChannelHandlerContext;
24 import org.jboss.netty.channel.ChannelStateEvent;
25 import org.jboss.netty.channel.Channels;
26 import org.jboss.netty.channel.ExceptionEvent;
27 import org.jboss.netty.channel.MessageEvent;
28 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
29
30 import java.net.SocketAddress;
31 import java.nio.channels.ClosedChannelException;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
35
36
37
38
39 public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40 implements ChannelDownstreamHandler {
41
// Shared exception instance used to fail downstream writes that this handler rejects.
private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();

// Per-stream bookkeeping (active streams, half-close state, window sizes, pending writes).
private final SpdySession spdySession = new SpdySession();
// ID of the last remote-initiated stream that was accepted; sent in outgoing GOAWAY frames.
private volatile int lastGoodStreamID;

// Limit taken from SETTINGS frames received from the peer.
private volatile int remoteConcurrentStreams;
// Limit taken from SETTINGS frames written by this endpoint.
private volatile int localConcurrentStreams;
// Effective limit derived from the two values above; 0 is treated as "no limit" by acceptStream().
private volatile int maxConcurrentStreams;

private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
// Updated from SETTINGS frames received from the peer.
private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
// Updated from SETTINGS frames written by this endpoint.
private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;

// Guards send-window accounting and the pending-write queue in handleDownstream()
// and updateSendWindowSize().
private final Object flowControlLock = new Object();

// Incremented for each PING written downstream, decremented when a non-remote PING is received.
private final AtomicInteger pings = new AtomicInteger();

private volatile boolean sentGoAwayFrame;
private volatile boolean receivedGoAwayFrame;

// Set when a close was requested while streams were still active; completed by
// halfCloseStream()/removeStream() once no active streams remain.
private volatile ChannelFuture closeSessionFuture;

// Whether this endpoint is the server; used to classify IDs as local or remote-initiated.
private final boolean server;
// True for protocol version 3 and above (see the constructor).
private final boolean flowControl;
66
67
68
69
70
71
72
73
74
/**
 * Creates a new session handler for SPDY protocol version 2.
 *
 * @param server {@code true} if and only if this session handler should
 *               handle the server endpoint of the connection.
 *               {@code false} if and only if this session handler should
 *               handle the client endpoint of the connection.
 * @deprecated Use {@link #SpdySessionHandler(int, boolean)} and pass the
 *             protocol version explicitly.
 */
@Deprecated
public SpdySessionHandler(boolean server) {
    this(2, server);
}
79
80
81
82
83
84
85
86
87
88
89 public SpdySessionHandler(int version, boolean server) {
90 if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) {
91 throw new IllegalArgumentException(
92 "unsupported version: " + version);
93 }
94 this.server = server;
95 flowControl = version >= 3;
96 }
97
98 @Override
99 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
100 throws Exception {
101
102 Object msg = e.getMessage();
103 if (msg instanceof SpdyDataFrame) {
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
129 int streamID = spdyDataFrame.getStreamId();
130
131
132 if (!spdySession.isActiveStream(streamID)) {
133 if (streamID <= lastGoodStreamID) {
134 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
135 } else if (!sentGoAwayFrame) {
136 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
137 }
138 return;
139 }
140
141
142 if (spdySession.isRemoteSideClosed(streamID)) {
143 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
144 return;
145 }
146
147
148 if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) {
149 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
150 return;
151 }
152
153
154
155
156
157
158
159 if (flowControl) {
160
161 int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
162 int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
163
164
165
166
167
168
169 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamID)) {
170 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
171 return;
172 }
173
174
175
176 if (newWindowSize < 0) {
177 while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
178 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
179 partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
180 Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
181 }
182 }
183
184
185 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
186 deltaWindowSize = initialReceiveWindowSize - newWindowSize;
187 spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
188 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
189 new DefaultSpdyWindowUpdateFrame(streamID, deltaWindowSize);
190 Channels.write(
191 ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
192 }
193 }
194
195
196 if (spdyDataFrame.isLast()) {
197 halfCloseStream(streamID, true);
198 }
199
200 } else if (msg instanceof SpdySynStreamFrame) {
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
217 int streamID = spdySynStreamFrame.getStreamId();
218
219
220 if (spdySynStreamFrame.isInvalid() ||
221 !isRemoteInitiatedID(streamID) ||
222 spdySession.isActiveStream(streamID)) {
223 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
224 return;
225 }
226
227
228 if (streamID <= lastGoodStreamID) {
229 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
230 return;
231 }
232
233
234 byte priority = spdySynStreamFrame.getPriority();
235 boolean remoteSideClosed = spdySynStreamFrame.isLast();
236 boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
237 if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
238 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.REFUSED_STREAM);
239 return;
240 }
241
242 } else if (msg instanceof SpdySynReplyFrame) {
243
244
245
246
247
248
249
250
251 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
252 int streamID = spdySynReplyFrame.getStreamId();
253
254
255 if (spdySynReplyFrame.isInvalid() ||
256 isRemoteInitiatedID(streamID) ||
257 spdySession.isRemoteSideClosed(streamID)) {
258 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
259 return;
260 }
261
262
263 if (spdySession.hasReceivedReply(streamID)) {
264 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_IN_USE);
265 return;
266 }
267
268 spdySession.receivedReply(streamID);
269
270
271 if (spdySynReplyFrame.isLast()) {
272 halfCloseStream(streamID, true);
273 }
274
275 } else if (msg instanceof SpdyRstStreamFrame) {
276
277
278
279
280
281
282
283
284
285
286 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
287 removeStream(spdyRstStreamFrame.getStreamId());
288
289 } else if (msg instanceof SpdySettingsFrame) {
290
291 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
292
293 int newConcurrentStreams =
294 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
295 if (newConcurrentStreams >= 0) {
296 updateConcurrentStreams(newConcurrentStreams, true);
297 }
298
299
300
301
302 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
303 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
304 }
305 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
306
307 if (flowControl) {
308 int newInitialWindowSize =
309 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
310 if (newInitialWindowSize >= 0) {
311 updateInitialSendWindowSize(newInitialWindowSize);
312 }
313 }
314
315 } else if (msg instanceof SpdyPingFrame) {
316
317
318
319
320
321
322
323
324
325
326 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
327
328 if (isRemoteInitiatedID(spdyPingFrame.getId())) {
329 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
330 return;
331 }
332
333
334 if (pings.get() == 0) {
335 return;
336 }
337 pings.getAndDecrement();
338
339 } else if (msg instanceof SpdyGoAwayFrame) {
340
341 receivedGoAwayFrame = true;
342
343 } else if (msg instanceof SpdyHeadersFrame) {
344
345 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
346 int streamID = spdyHeadersFrame.getStreamId();
347
348
349 if (spdyHeadersFrame.isInvalid()) {
350 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
351 return;
352 }
353
354 if (spdySession.isRemoteSideClosed(streamID)) {
355 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
356 return;
357 }
358
359
360 if (spdyHeadersFrame.isLast()) {
361 halfCloseStream(streamID, true);
362 }
363
364 } else if (msg instanceof SpdyWindowUpdateFrame) {
365
366
367
368
369
370
371
372
373
374
375
376 if (flowControl) {
377 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
378 int streamID = spdyWindowUpdateFrame.getStreamId();
379 int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
380
381
382 if (spdySession.isLocalSideClosed(streamID)) {
383 return;
384 }
385
386
387 if (spdySession.getSendWindowSize(streamID) > Integer.MAX_VALUE - deltaWindowSize) {
388 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
389 return;
390 }
391
392 updateSendWindowSize(ctx, streamID, deltaWindowSize);
393 }
394 return;
395 }
396
397 super.messageReceived(ctx, e);
398 }
399
400 @Override
401 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
402 throws Exception {
403
404 Throwable cause = e.getCause();
405 if (cause instanceof SpdyProtocolException) {
406 issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
407 }
408
409 super.exceptionCaught(ctx, e);
410 }
411
412 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
413 throws Exception {
414 if (evt instanceof ChannelStateEvent) {
415 ChannelStateEvent e = (ChannelStateEvent) evt;
416 switch (e.getState()) {
417 case OPEN:
418 case CONNECTED:
419 case BOUND:
420
421
422
423
424
425
426
427 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
428 sendGoAwayFrame(ctx, e);
429 return;
430 }
431 }
432 }
433 if (!(evt instanceof MessageEvent)) {
434 ctx.sendDownstream(evt);
435 return;
436 }
437
438 MessageEvent e = (MessageEvent) evt;
439 Object msg = e.getMessage();
440
441 if (msg instanceof SpdyDataFrame) {
442
443 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
444 final int streamID = spdyDataFrame.getStreamId();
445
446
447 if (spdySession.isLocalSideClosed(streamID)) {
448 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
449 return;
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465 if (flowControl) {
466 synchronized (flowControlLock) {
467 int dataLength = spdyDataFrame.getData().readableBytes();
468 int sendWindowSize = spdySession.getSendWindowSize(streamID);
469
470 if (sendWindowSize <= 0) {
471
472 spdySession.putPendingWrite(streamID, e);
473 return;
474 } else if (sendWindowSize < dataLength) {
475
476 spdySession.updateSendWindowSize(streamID, -1 * sendWindowSize);
477
478
479 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
480 partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
481
482
483 spdySession.putPendingWrite(streamID, e);
484
485 ChannelFuture writeFuture = Channels.future(e.getChannel());
486
487
488
489 final SocketAddress remoteAddress = e.getRemoteAddress();
490 final ChannelHandlerContext context = ctx;
491 e.getFuture().addListener(new ChannelFutureListener() {
492 public void operationComplete(ChannelFuture future) throws Exception {
493 if (!future.isSuccess()) {
494 issueStreamError(
495 context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
496 }
497 }
498 });
499
500 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
501 return;
502 } else {
503
504 spdySession.updateSendWindowSize(streamID, -1 * dataLength);
505
506
507
508 final SocketAddress remoteAddress = e.getRemoteAddress();
509 final ChannelHandlerContext context = ctx;
510 e.getFuture().addListener(new ChannelFutureListener() {
511 public void operationComplete(ChannelFuture future) throws Exception {
512 if (!future.isSuccess()) {
513 issueStreamError(
514 context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
515 }
516 }
517 });
518 }
519 }
520 }
521
522
523 if (spdyDataFrame.isLast()) {
524 halfCloseStream(streamID, false);
525 }
526
527 } else if (msg instanceof SpdySynStreamFrame) {
528
529 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
530 int streamID = spdySynStreamFrame.getStreamId();
531
532 if (isRemoteInitiatedID(streamID)) {
533 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
534 return;
535 }
536
537 byte priority = spdySynStreamFrame.getPriority();
538 boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
539 boolean localSideClosed = spdySynStreamFrame.isLast();
540 if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
541 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
542 return;
543 }
544
545 } else if (msg instanceof SpdySynReplyFrame) {
546
547 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
548 int streamID = spdySynReplyFrame.getStreamId();
549
550
551 if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) {
552 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
553 return;
554 }
555
556
557 if (spdySynReplyFrame.isLast()) {
558 halfCloseStream(streamID, false);
559 }
560
561 } else if (msg instanceof SpdyRstStreamFrame) {
562
563 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
564 removeStream(spdyRstStreamFrame.getStreamId());
565
566 } else if (msg instanceof SpdySettingsFrame) {
567
568 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
569
570 int newConcurrentStreams =
571 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
572 if (newConcurrentStreams >= 0) {
573 updateConcurrentStreams(newConcurrentStreams, false);
574 }
575
576
577
578
579 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
580 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
581 }
582 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
583
584 if (flowControl) {
585 int newInitialWindowSize =
586 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
587 if (newInitialWindowSize >= 0) {
588 updateInitialReceiveWindowSize(newInitialWindowSize);
589 }
590 }
591
592 } else if (msg instanceof SpdyPingFrame) {
593
594 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
595 if (isRemoteInitiatedID(spdyPingFrame.getId())) {
596 e.getFuture().setFailure(new IllegalArgumentException(
597 "invalid PING ID: " + spdyPingFrame.getId()));
598 return;
599 }
600 pings.getAndIncrement();
601
602 } else if (msg instanceof SpdyGoAwayFrame) {
603
604
605
606 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
607 return;
608
609 } else if (msg instanceof SpdyHeadersFrame) {
610
611 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
612 int streamID = spdyHeadersFrame.getStreamId();
613
614
615 if (spdySession.isLocalSideClosed(streamID)) {
616 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
617 return;
618 }
619
620
621 if (spdyHeadersFrame.isLast()) {
622 halfCloseStream(streamID, false);
623 }
624
625 } else if (msg instanceof SpdyWindowUpdateFrame) {
626
627
628 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
629 return;
630 }
631
632 ctx.sendDownstream(evt);
633 }
634
635
636
637
638
639
640
641
642
643
644 private void issueSessionError(
645 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
646
647 ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
648 future.addListener(ChannelFutureListener.CLOSE);
649 }
650
651
652
653
654
655
656
657
658
659
660
661
662 private void issueStreamError(
663 ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamID, SpdyStreamStatus status) {
664
665 boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamID);
666 removeStream(streamID);
667
668 SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
669 Channels.write(ctx, Channels.future(ctx.getChannel()), spdyRstStreamFrame, remoteAddress);
670 if (fireMessageReceived) {
671 Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
672 }
673 }
674
675
676
677
678
679 private boolean isRemoteInitiatedID(int id) {
680 boolean serverID = isServerId(id);
681 return server && !serverID || !server && serverID;
682 }
683
684 private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
685 if (remote) {
686 remoteConcurrentStreams = newConcurrentStreams;
687 } else {
688 localConcurrentStreams = newConcurrentStreams;
689 }
690 if (localConcurrentStreams == remoteConcurrentStreams) {
691 maxConcurrentStreams = localConcurrentStreams;
692 return;
693 }
694 if (localConcurrentStreams == 0) {
695 maxConcurrentStreams = remoteConcurrentStreams;
696 return;
697 }
698 if (remoteConcurrentStreams == 0) {
699 maxConcurrentStreams = localConcurrentStreams;
700 return;
701 }
702 if (localConcurrentStreams > remoteConcurrentStreams) {
703 maxConcurrentStreams = remoteConcurrentStreams;
704 } else {
705 maxConcurrentStreams = localConcurrentStreams;
706 }
707 }
708
709
710 private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
711 int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
712 initialSendWindowSize = newInitialWindowSize;
713 for (Integer StreamID: spdySession.getActiveStreams()) {
714 spdySession.updateSendWindowSize(StreamID.intValue(), deltaWindowSize);
715 }
716 }
717
718
719 private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
720 int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
721 initialReceiveWindowSize = newInitialWindowSize;
722 spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
723 }
724
725
726 private synchronized boolean acceptStream(
727 int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
728
729 if (receivedGoAwayFrame || sentGoAwayFrame) {
730 return false;
731 }
732
733 int maxConcurrentStreams = this.maxConcurrentStreams;
734 if (maxConcurrentStreams != 0 &&
735 spdySession.numActiveStreams() >= maxConcurrentStreams) {
736 return false;
737 }
738 spdySession.acceptStream(
739 streamID, priority, remoteSideClosed, localSideClosed,
740 initialSendWindowSize, initialReceiveWindowSize);
741 if (isRemoteInitiatedID(streamID)) {
742 lastGoodStreamID = streamID;
743 }
744 return true;
745 }
746
747 private void halfCloseStream(int streamID, boolean remote) {
748 if (remote) {
749 spdySession.closeRemoteSide(streamID);
750 } else {
751 spdySession.closeLocalSide(streamID);
752 }
753 if (closeSessionFuture != null && spdySession.noActiveStreams()) {
754 closeSessionFuture.setSuccess();
755 }
756 }
757
758 private void removeStream(int streamID) {
759 spdySession.removeStream(streamID);
760 if (closeSessionFuture != null && spdySession.noActiveStreams()) {
761 closeSessionFuture.setSuccess();
762 }
763 }
764
765 private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamID, int deltaWindowSize) {
766 synchronized (flowControlLock) {
767 int newWindowSize = spdySession.updateSendWindowSize(streamID, deltaWindowSize);
768
769 while (newWindowSize > 0) {
770
771 MessageEvent e = spdySession.getPendingWrite(streamID);
772 if (e == null) {
773 break;
774 }
775
776 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
777 int dataFrameSize = spdyDataFrame.getData().readableBytes();
778
779 if (newWindowSize >= dataFrameSize) {
780
781 spdySession.removePendingWrite(streamID);
782 newWindowSize = spdySession.updateSendWindowSize(streamID, -1 * dataFrameSize);
783
784
785
786 final SocketAddress remoteAddress = e.getRemoteAddress();
787 final ChannelHandlerContext context = ctx;
788 e.getFuture().addListener(new ChannelFutureListener() {
789 public void operationComplete(ChannelFuture future) throws Exception {
790 if (!future.isSuccess()) {
791 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
792 }
793 }
794 });
795
796
797 if (spdyDataFrame.isLast()) {
798 halfCloseStream(streamID, false);
799 }
800
801 Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
802 } else {
803
804 spdySession.updateSendWindowSize(streamID, -1 * newWindowSize);
805
806
807 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
808 partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
809
810 ChannelFuture writeFuture = Channels.future(e.getChannel());
811
812
813
814 final SocketAddress remoteAddress = e.getRemoteAddress();
815 final ChannelHandlerContext context = ctx;
816 e.getFuture().addListener(new ChannelFutureListener() {
817 public void operationComplete(ChannelFuture future) throws Exception {
818 if (!future.isSuccess()) {
819 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
820 }
821 }
822 });
823
824 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
825
826 newWindowSize = 0;
827 }
828 }
829 }
830 }
831
832 private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
833
834 if (!e.getChannel().isConnected()) {
835 ctx.sendDownstream(e);
836 return;
837 }
838
839 ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
840 if (spdySession.noActiveStreams()) {
841 future.addListener(new ClosingChannelFutureListener(ctx, e));
842 } else {
843 closeSessionFuture = Channels.future(e.getChannel());
844 closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
845 }
846 }
847
848 private synchronized ChannelFuture sendGoAwayFrame(
849 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
850 if (!sentGoAwayFrame) {
851 sentGoAwayFrame = true;
852 SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamID, status);
853 ChannelFuture future = Channels.future(channel);
854 Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
855 return future;
856 }
857 return Channels.succeededFuture(channel);
858 }
859
860 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
861 private final ChannelHandlerContext ctx;
862 private final ChannelStateEvent e;
863
864 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
865 this.ctx = ctx;
866 this.e = e;
867 }
868
869 public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
870 if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
871 Channels.close(ctx, e.getFuture());
872 } else {
873 e.getFuture().setSuccess();
874 }
875 }
876 }
877 }