1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.spdy;
17
18 import static org.jboss.netty.handler.codec.spdy.SpdyCodecUtil.*;
19
20 import java.net.SocketAddress;
21 import java.nio.channels.ClosedChannelException;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelDownstreamHandler;
26 import org.jboss.netty.channel.ChannelEvent;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelFutureListener;
29 import org.jboss.netty.channel.ChannelHandlerContext;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.Channels;
32 import org.jboss.netty.channel.ExceptionEvent;
33 import org.jboss.netty.channel.MessageEvent;
34 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
35
36
37
38
39 public class SpdySessionHandler extends SimpleChannelUpstreamHandler
40 implements ChannelDownstreamHandler {
41
42 private static final SpdyProtocolException PROTOCOL_EXCEPTION = new SpdyProtocolException();
43
44 private final SpdySession spdySession = new SpdySession();
45 private volatile int lastGoodStreamID;
46
47 private volatile int remoteConcurrentStreams;
48 private volatile int localConcurrentStreams;
49 private volatile int maxConcurrentStreams;
50
51 private static final int DEFAULT_WINDOW_SIZE = 64 * 1024;
52 private volatile int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
53 private volatile int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
54
55 private final Object flowControlLock = new Object();
56
57 private final AtomicInteger pings = new AtomicInteger();
58
59 private volatile boolean sentGoAwayFrame;
60 private volatile boolean receivedGoAwayFrame;
61
62 private volatile ChannelFuture closeSessionFuture;
63
64 private final boolean server;
65 private final boolean flowControl;
66
67
68
69
70
71
72
73
74
/**
 * Creates a new session handler that speaks SPDY protocol version 2.
 *
 * @param server {@code true} if this handler runs on the server side of the
 *               session; this decides which stream and ping IDs are treated
 *               as remote-initiated
 *
 * @deprecated use {@link #SpdySessionHandler(int, boolean)} and pass the
 *             protocol version explicitly.
 */
@Deprecated
public SpdySessionHandler(boolean server) {
    this(2, server);
}
79
80
81
82
83
84
85
86
87
88
/**
 * Creates a new session handler for the given SPDY protocol version.
 *
 * @param version the protocol version; must lie within
 *                {@code SPDY_MIN_VERSION}..{@code SPDY_MAX_VERSION}.
 *                Flow control is enabled for version 3 and above.
 * @param server  {@code true} if this handler runs on the server side of the
 *                session; this decides which stream and ping IDs are treated
 *                as remote-initiated
 *
 * @throws IllegalArgumentException if {@code version} is outside the supported range
 */
public SpdySessionHandler(int version, boolean server) {
    boolean supportedVersion = version >= SPDY_MIN_VERSION && version <= SPDY_MAX_VERSION;
    if (!supportedVersion) {
        throw new IllegalArgumentException("unsupported version: " + version);
    }
    flowControl = version >= 3;
    this.server = server;
}
97
98 @Override
99 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
100 throws Exception {
101
102 Object msg = e.getMessage();
103 if (msg instanceof SpdyDataFrame) {
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
129 int streamID = spdyDataFrame.getStreamId();
130
131
132 if (!spdySession.isActiveStream(streamID)) {
133 if (streamID <= lastGoodStreamID) {
134 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
135 } else if (!sentGoAwayFrame) {
136 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
137 }
138 return;
139 }
140
141
142 if (spdySession.isRemoteSideClosed(streamID)) {
143 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
144 return;
145 }
146
147
148 if (!isRemoteInitiatedID(streamID) && !spdySession.hasReceivedReply(streamID)) {
149 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
150 return;
151 }
152
153
154
155
156
157
158
159 if (flowControl) {
160
161 int deltaWindowSize = -1 * spdyDataFrame.getData().readableBytes();
162 int newWindowSize = spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
163
164
165
166
167
168
169 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamID)) {
170 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
171 return;
172 }
173
174
175
176 if (newWindowSize < 0) {
177 while (spdyDataFrame.getData().readableBytes() > initialReceiveWindowSize) {
178 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
179 partialDataFrame.setData(spdyDataFrame.getData().readSlice(initialReceiveWindowSize));
180 Channels.fireMessageReceived(ctx, partialDataFrame, e.getRemoteAddress());
181 }
182 }
183
184
185 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
186 deltaWindowSize = initialReceiveWindowSize - newWindowSize;
187 spdySession.updateReceiveWindowSize(streamID, deltaWindowSize);
188 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
189 new DefaultSpdyWindowUpdateFrame(streamID, deltaWindowSize);
190 Channels.write(
191 ctx, Channels.future(e.getChannel()), spdyWindowUpdateFrame, e.getRemoteAddress());
192 }
193 }
194
195
196 if (spdyDataFrame.isLast()) {
197 halfCloseStream(streamID, true);
198 }
199
200 } else if (msg instanceof SpdySynStreamFrame) {
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
217 int streamID = spdySynStreamFrame.getStreamId();
218
219
220 if (spdySynStreamFrame.isInvalid() ||
221 !isRemoteInitiatedID(streamID) ||
222 spdySession.isActiveStream(streamID)) {
223 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
224 return;
225 }
226
227
228 if (streamID <= lastGoodStreamID) {
229 issueSessionError(ctx, e.getChannel(), e.getRemoteAddress(), SpdySessionStatus.PROTOCOL_ERROR);
230 return;
231 }
232
233
234 byte priority = spdySynStreamFrame.getPriority();
235 boolean remoteSideClosed = spdySynStreamFrame.isLast();
236 boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
237 if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
238 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.REFUSED_STREAM);
239 return;
240 }
241
242 } else if (msg instanceof SpdySynReplyFrame) {
243
244
245
246
247
248
249
250
251 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
252 int streamID = spdySynReplyFrame.getStreamId();
253
254
255 if (spdySynReplyFrame.isInvalid() ||
256 isRemoteInitiatedID(streamID) ||
257 spdySession.isRemoteSideClosed(streamID)) {
258 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
259 return;
260 }
261
262
263 if (spdySession.hasReceivedReply(streamID)) {
264 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.STREAM_IN_USE);
265 return;
266 }
267
268 spdySession.receivedReply(streamID);
269
270
271 if (spdySynReplyFrame.isLast()) {
272 halfCloseStream(streamID, true);
273 }
274
275 } else if (msg instanceof SpdyRstStreamFrame) {
276
277
278
279
280
281
282
283
284
285
286 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
287 removeStream(spdyRstStreamFrame.getStreamId());
288
289 } else if (msg instanceof SpdySettingsFrame) {
290
291 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
292
293 int newConcurrentStreams =
294 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
295 if (newConcurrentStreams >= 0) {
296 updateConcurrentStreams(newConcurrentStreams, true);
297 }
298
299
300
301
302 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
303 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
304 }
305 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
306
307 if (flowControl) {
308 int newInitialWindowSize =
309 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
310 if (newInitialWindowSize >= 0) {
311 updateInitialSendWindowSize(newInitialWindowSize);
312 }
313 }
314
315 } else if (msg instanceof SpdyPingFrame) {
316
317
318
319
320
321
322
323
324
325
326 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
327
328 if (isRemoteInitiatedID(spdyPingFrame.getId())) {
329 Channels.write(ctx, Channels.future(e.getChannel()), spdyPingFrame, e.getRemoteAddress());
330 return;
331 }
332
333
334 if (pings.get() == 0) {
335 return;
336 }
337 pings.getAndDecrement();
338
339 } else if (msg instanceof SpdyGoAwayFrame) {
340
341 receivedGoAwayFrame = true;
342
343 } else if (msg instanceof SpdyHeadersFrame) {
344
345 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
346 int streamID = spdyHeadersFrame.getStreamId();
347
348
349 if (spdyHeadersFrame.isInvalid()) {
350 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.PROTOCOL_ERROR);
351 return;
352 }
353
354 if (spdySession.isRemoteSideClosed(streamID)) {
355 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.INVALID_STREAM);
356 return;
357 }
358
359
360 if (spdyHeadersFrame.isLast()) {
361 halfCloseStream(streamID, true);
362 }
363
364 } else if (msg instanceof SpdyWindowUpdateFrame) {
365
366
367
368
369
370
371
372
373
374
375
376 if (flowControl) {
377 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
378 int streamID = spdyWindowUpdateFrame.getStreamId();
379 int deltaWindowSize = spdyWindowUpdateFrame.getDeltaWindowSize();
380
381
382 if (spdySession.isLocalSideClosed(streamID)) {
383 return;
384 }
385
386
387 if (spdySession.getSendWindowSize(streamID) > Integer.MAX_VALUE - deltaWindowSize) {
388 issueStreamError(ctx, e.getRemoteAddress(), streamID, SpdyStreamStatus.FLOW_CONTROL_ERROR);
389 return;
390 }
391
392 updateSendWindowSize(ctx, streamID, deltaWindowSize);
393 }
394 return;
395 }
396
397 super.messageReceived(ctx, e);
398 }
399
400 @Override
401 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
402 throws Exception {
403
404 Throwable cause = e.getCause();
405 if (cause instanceof SpdyProtocolException) {
406 issueSessionError(ctx, e.getChannel(), null, SpdySessionStatus.PROTOCOL_ERROR);
407 }
408
409 super.exceptionCaught(ctx, e);
410 }
411
412 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
413 throws Exception {
414 if (evt instanceof ChannelStateEvent) {
415 ChannelStateEvent e = (ChannelStateEvent) evt;
416 switch (e.getState()) {
417 case OPEN:
418 case CONNECTED:
419 case BOUND:
420
421
422
423
424
425
426
427 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
428 sendGoAwayFrame(ctx, e);
429 return;
430 }
431 }
432 }
433 if (!(evt instanceof MessageEvent)) {
434 ctx.sendDownstream(evt);
435 return;
436 }
437
438 MessageEvent e = (MessageEvent) evt;
439 Object msg = e.getMessage();
440
441 if (msg instanceof SpdyDataFrame) {
442
443 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
444 final int streamID = spdyDataFrame.getStreamId();
445
446
447 if (spdySession.isLocalSideClosed(streamID)) {
448 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
449 return;
450 }
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465 if (flowControl) {
466 synchronized (flowControlLock) {
467 int dataLength = spdyDataFrame.getData().readableBytes();
468 int sendWindowSize = spdySession.getSendWindowSize(streamID);
469
470 if (sendWindowSize >= dataLength) {
471
472 spdySession.updateSendWindowSize(streamID, -1 * dataLength);
473
474
475
476 final SocketAddress remoteAddress = e.getRemoteAddress();
477 final ChannelHandlerContext context = ctx;
478 e.getFuture().addListener(new ChannelFutureListener() {
479 public void operationComplete(ChannelFuture future) throws Exception {
480 if (!future.isSuccess()) {
481 issueStreamError(
482 context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
483 }
484 }
485 });
486
487 } else if (sendWindowSize > 0) {
488
489 spdySession.updateSendWindowSize(streamID, -1 * sendWindowSize);
490
491
492 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
493 partialDataFrame.setData(spdyDataFrame.getData().readSlice(sendWindowSize));
494
495
496 spdySession.putPendingWrite(streamID, e);
497
498 ChannelFuture writeFuture = Channels.future(e.getChannel());
499
500
501
502 final SocketAddress remoteAddress = e.getRemoteAddress();
503 final ChannelHandlerContext context = ctx;
504 e.getFuture().addListener(new ChannelFutureListener() {
505 public void operationComplete(ChannelFuture future) throws Exception {
506 if (!future.isSuccess()) {
507 issueStreamError(
508 context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
509 }
510 }
511 });
512
513 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
514 return;
515
516 } else {
517
518 spdySession.putPendingWrite(streamID, e);
519 return;
520 }
521 }
522 }
523
524
525 if (spdyDataFrame.isLast()) {
526 halfCloseStream(streamID, false);
527 }
528
529 } else if (msg instanceof SpdySynStreamFrame) {
530
531 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
532 int streamID = spdySynStreamFrame.getStreamId();
533
534 if (isRemoteInitiatedID(streamID)) {
535 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
536 return;
537 }
538
539 byte priority = spdySynStreamFrame.getPriority();
540 boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
541 boolean localSideClosed = spdySynStreamFrame.isLast();
542 if (!acceptStream(streamID, priority, remoteSideClosed, localSideClosed)) {
543 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
544 return;
545 }
546
547 } else if (msg instanceof SpdySynReplyFrame) {
548
549 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
550 int streamID = spdySynReplyFrame.getStreamId();
551
552
553 if (!isRemoteInitiatedID(streamID) || spdySession.isLocalSideClosed(streamID)) {
554 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
555 return;
556 }
557
558
559 if (spdySynReplyFrame.isLast()) {
560 halfCloseStream(streamID, false);
561 }
562
563 } else if (msg instanceof SpdyRstStreamFrame) {
564
565 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
566 removeStream(spdyRstStreamFrame.getStreamId());
567
568 } else if (msg instanceof SpdySettingsFrame) {
569
570 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
571
572 int newConcurrentStreams =
573 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
574 if (newConcurrentStreams >= 0) {
575 updateConcurrentStreams(newConcurrentStreams, false);
576 }
577
578
579
580
581 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
582 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
583 }
584 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
585
586 if (flowControl) {
587 int newInitialWindowSize =
588 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
589 if (newInitialWindowSize >= 0) {
590 updateInitialReceiveWindowSize(newInitialWindowSize);
591 }
592 }
593
594 } else if (msg instanceof SpdyPingFrame) {
595
596 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
597 if (isRemoteInitiatedID(spdyPingFrame.getId())) {
598 e.getFuture().setFailure(new IllegalArgumentException(
599 "invalid PING ID: " + spdyPingFrame.getId()));
600 return;
601 }
602 pings.getAndIncrement();
603
604 } else if (msg instanceof SpdyGoAwayFrame) {
605
606
607
608 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
609 return;
610
611 } else if (msg instanceof SpdyHeadersFrame) {
612
613 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
614 int streamID = spdyHeadersFrame.getStreamId();
615
616
617 if (spdySession.isLocalSideClosed(streamID)) {
618 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
619 return;
620 }
621
622
623 if (spdyHeadersFrame.isLast()) {
624 halfCloseStream(streamID, false);
625 }
626
627 } else if (msg instanceof SpdyWindowUpdateFrame) {
628
629
630 e.getFuture().setFailure(PROTOCOL_EXCEPTION);
631 return;
632 }
633
634 ctx.sendDownstream(evt);
635 }
636
637
638
639
640
641
642
643
644
645
646 private void issueSessionError(
647 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
648
649 ChannelFuture future = sendGoAwayFrame(ctx, channel, remoteAddress, status);
650 future.addListener(ChannelFutureListener.CLOSE);
651 }
652
653
654
655
656
657
658
659
660
661
662
663
664 private void issueStreamError(
665 ChannelHandlerContext ctx, SocketAddress remoteAddress, int streamID, SpdyStreamStatus status) {
666
667 boolean fireMessageReceived = !spdySession.isRemoteSideClosed(streamID);
668 removeStream(streamID);
669
670 SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamID, status);
671 Channels.write(ctx, Channels.future(ctx.getChannel()), spdyRstStreamFrame, remoteAddress);
672 if (fireMessageReceived) {
673 Channels.fireMessageReceived(ctx, spdyRstStreamFrame, remoteAddress);
674 }
675 }
676
677
678
679
680
681 private boolean isRemoteInitiatedID(int id) {
682 boolean serverID = isServerId(id);
683 return server && !serverID || !server && serverID;
684 }
685
686 private void updateConcurrentStreams(int newConcurrentStreams, boolean remote) {
687 if (remote) {
688 remoteConcurrentStreams = newConcurrentStreams;
689 } else {
690 localConcurrentStreams = newConcurrentStreams;
691 }
692 if (localConcurrentStreams == remoteConcurrentStreams) {
693 maxConcurrentStreams = localConcurrentStreams;
694 return;
695 }
696 if (localConcurrentStreams == 0) {
697 maxConcurrentStreams = remoteConcurrentStreams;
698 return;
699 }
700 if (remoteConcurrentStreams == 0) {
701 maxConcurrentStreams = localConcurrentStreams;
702 return;
703 }
704 if (localConcurrentStreams > remoteConcurrentStreams) {
705 maxConcurrentStreams = remoteConcurrentStreams;
706 } else {
707 maxConcurrentStreams = localConcurrentStreams;
708 }
709 }
710
711
712 private synchronized void updateInitialSendWindowSize(int newInitialWindowSize) {
713 int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
714 initialSendWindowSize = newInitialWindowSize;
715 for (Integer StreamID: spdySession.getActiveStreams()) {
716 spdySession.updateSendWindowSize(StreamID.intValue(), deltaWindowSize);
717 }
718 }
719
720
721 private synchronized void updateInitialReceiveWindowSize(int newInitialWindowSize) {
722 int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
723 initialReceiveWindowSize = newInitialWindowSize;
724 spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
725 }
726
727
728 private synchronized boolean acceptStream(
729 int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
730
731 if (receivedGoAwayFrame || sentGoAwayFrame) {
732 return false;
733 }
734
735 int maxConcurrentStreams = this.maxConcurrentStreams;
736 if (maxConcurrentStreams != 0 &&
737 spdySession.numActiveStreams() >= maxConcurrentStreams) {
738 return false;
739 }
740 spdySession.acceptStream(
741 streamID, priority, remoteSideClosed, localSideClosed,
742 initialSendWindowSize, initialReceiveWindowSize);
743 if (isRemoteInitiatedID(streamID)) {
744 lastGoodStreamID = streamID;
745 }
746 return true;
747 }
748
749 private void halfCloseStream(int streamID, boolean remote) {
750 if (remote) {
751 spdySession.closeRemoteSide(streamID);
752 } else {
753 spdySession.closeLocalSide(streamID);
754 }
755 if (closeSessionFuture != null && spdySession.noActiveStreams()) {
756 closeSessionFuture.setSuccess();
757 }
758 }
759
760 private void removeStream(int streamID) {
761 spdySession.removeStream(streamID);
762 if (closeSessionFuture != null && spdySession.noActiveStreams()) {
763 closeSessionFuture.setSuccess();
764 }
765 }
766
767 private void updateSendWindowSize(ChannelHandlerContext ctx, final int streamID, int deltaWindowSize) {
768 synchronized (flowControlLock) {
769 int newWindowSize = spdySession.updateSendWindowSize(streamID, deltaWindowSize);
770
771 while (newWindowSize > 0) {
772
773 MessageEvent e = spdySession.getPendingWrite(streamID);
774 if (e == null) {
775 break;
776 }
777
778 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) e.getMessage();
779 int dataFrameSize = spdyDataFrame.getData().readableBytes();
780
781 if (newWindowSize >= dataFrameSize) {
782
783 spdySession.removePendingWrite(streamID);
784 newWindowSize = spdySession.updateSendWindowSize(streamID, -1 * dataFrameSize);
785
786
787
788 final SocketAddress remoteAddress = e.getRemoteAddress();
789 final ChannelHandlerContext context = ctx;
790 e.getFuture().addListener(new ChannelFutureListener() {
791 public void operationComplete(ChannelFuture future) throws Exception {
792 if (!future.isSuccess()) {
793 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
794 }
795 }
796 });
797
798
799 if (spdyDataFrame.isLast()) {
800 halfCloseStream(streamID, false);
801 }
802
803 Channels.write(ctx, e.getFuture(), spdyDataFrame, e.getRemoteAddress());
804
805 } else {
806
807 spdySession.updateSendWindowSize(streamID, -1 * newWindowSize);
808
809
810 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamID);
811 partialDataFrame.setData(spdyDataFrame.getData().readSlice(newWindowSize));
812
813 ChannelFuture writeFuture = Channels.future(e.getChannel());
814
815
816
817 final SocketAddress remoteAddress = e.getRemoteAddress();
818 final ChannelHandlerContext context = ctx;
819 e.getFuture().addListener(new ChannelFutureListener() {
820 public void operationComplete(ChannelFuture future) throws Exception {
821 if (!future.isSuccess()) {
822 issueStreamError(context, remoteAddress, streamID, SpdyStreamStatus.INTERNAL_ERROR);
823 }
824 }
825 });
826
827 Channels.write(ctx, writeFuture, partialDataFrame, remoteAddress);
828
829 newWindowSize = 0;
830 }
831 }
832 }
833 }
834
835 private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelStateEvent e) {
836
837 if (!e.getChannel().isConnected()) {
838 ctx.sendDownstream(e);
839 return;
840 }
841
842 ChannelFuture future = sendGoAwayFrame(ctx, e.getChannel(), null, SpdySessionStatus.OK);
843 if (spdySession.noActiveStreams()) {
844 future.addListener(new ClosingChannelFutureListener(ctx, e));
845 } else {
846 closeSessionFuture = Channels.future(e.getChannel());
847 closeSessionFuture.addListener(new ClosingChannelFutureListener(ctx, e));
848 }
849 }
850
851 private synchronized ChannelFuture sendGoAwayFrame(
852 ChannelHandlerContext ctx, Channel channel, SocketAddress remoteAddress, SpdySessionStatus status) {
853 if (!sentGoAwayFrame) {
854 sentGoAwayFrame = true;
855 SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamID, status);
856 ChannelFuture future = Channels.future(channel);
857 Channels.write(ctx, future, spdyGoAwayFrame, remoteAddress);
858 return future;
859 }
860 return Channels.succeededFuture(channel);
861 }
862
863 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
864 private final ChannelHandlerContext ctx;
865 private final ChannelStateEvent e;
866
867 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelStateEvent e) {
868 this.ctx = ctx;
869 this.e = e;
870 }
871
872 public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
873 if (!(sentGoAwayFuture.getCause() instanceof ClosedChannelException)) {
874 Channels.close(ctx, e.getFuture());
875 } else {
876 e.getFuture().setSuccess();
877 }
878 }
879 }
880 }