1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.spdy;
17
18 import io.netty.channel.ChannelDuplexHandler;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelFutureListener;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.util.internal.ThrowableUtil;
24
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
28
29
30
31
32 public class SpdySessionHandler
33 extends ChannelDuplexHandler {
34
35 private static final SpdyProtocolException PROTOCOL_EXCEPTION = ThrowableUtil.unknownStackTrace(
36 new SpdyProtocolException(), SpdySessionHandler.class, "handleOutboundMessage(...)");
37 private static final SpdyProtocolException STREAM_CLOSED = ThrowableUtil.unknownStackTrace(
38 new SpdyProtocolException("Stream closed"), SpdySessionHandler.class, "removeStream(...)");
39
40 private static final int DEFAULT_WINDOW_SIZE = 64 * 1024;
41 private int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
42 private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
43 private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
44
45 private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
46 private int lastGoodStreamId;
47
48 private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
49 private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
50 private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
51
52 private final AtomicInteger pings = new AtomicInteger();
53
54 private boolean sentGoAwayFrame;
55 private boolean receivedGoAwayFrame;
56
57 private ChannelFutureListener closeSessionFutureListener;
58
59 private final boolean server;
60 private final int minorVersion;
61
62
63
64
65
66
67
68
69
70
71 public SpdySessionHandler(SpdyVersion version, boolean server) {
72 if (version == null) {
73 throw new NullPointerException("version");
74 }
75 this.server = server;
76 minorVersion = version.getMinorVersion();
77 }
78
79 public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
80 if (sessionReceiveWindowSize < 0) {
81 throw new IllegalArgumentException("sessionReceiveWindowSize");
82 }
83
84
85
86
87
88
89 initialSessionReceiveWindowSize = sessionReceiveWindowSize;
90 }
91
92 @Override
93 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
94 if (msg instanceof SpdyDataFrame) {
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
119 int streamId = spdyDataFrame.streamId();
120
121 int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
122 int newSessionWindowSize =
123 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
124
125
126 if (newSessionWindowSize < 0) {
127 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
128 return;
129 }
130
131
132 if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
133 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
134 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
135 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
136 new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
137 ctx.writeAndFlush(spdyWindowUpdateFrame);
138 }
139
140
141
142 if (!spdySession.isActiveStream(streamId)) {
143 spdyDataFrame.release();
144 if (streamId <= lastGoodStreamId) {
145 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
146 } else if (!sentGoAwayFrame) {
147 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
148 }
149 return;
150 }
151
152
153
154 if (spdySession.isRemoteSideClosed(streamId)) {
155 spdyDataFrame.release();
156 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
157 return;
158 }
159
160
161 if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
162 spdyDataFrame.release();
163 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
164 return;
165 }
166
167
168
169
170
171
172
173
174 int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
175
176
177
178
179
180
181 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
182 spdyDataFrame.release();
183 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
184 return;
185 }
186
187
188
189 if (newWindowSize < 0) {
190 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
191 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
192 spdyDataFrame.content().readSlice(initialReceiveWindowSize).retain());
193 ctx.writeAndFlush(partialDataFrame);
194 }
195 }
196
197
198 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
199 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
200 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
201 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
202 new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
203 ctx.writeAndFlush(spdyWindowUpdateFrame);
204 }
205
206
207 if (spdyDataFrame.isLast()) {
208 halfCloseStream(streamId, true, ctx.newSucceededFuture());
209 }
210
211 } else if (msg instanceof SpdySynStreamFrame) {
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
228 int streamId = spdySynStreamFrame.streamId();
229
230
231 if (spdySynStreamFrame.isInvalid() ||
232 !isRemoteInitiatedId(streamId) ||
233 spdySession.isActiveStream(streamId)) {
234 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
235 return;
236 }
237
238
239 if (streamId <= lastGoodStreamId) {
240 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
241 return;
242 }
243
244
245 byte priority = spdySynStreamFrame.priority();
246 boolean remoteSideClosed = spdySynStreamFrame.isLast();
247 boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
248 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
249 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
250 return;
251 }
252
253 } else if (msg instanceof SpdySynReplyFrame) {
254
255
256
257
258
259
260
261
262 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
263 int streamId = spdySynReplyFrame.streamId();
264
265
266 if (spdySynReplyFrame.isInvalid() ||
267 isRemoteInitiatedId(streamId) ||
268 spdySession.isRemoteSideClosed(streamId)) {
269 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
270 return;
271 }
272
273
274 if (spdySession.hasReceivedReply(streamId)) {
275 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
276 return;
277 }
278
279 spdySession.receivedReply(streamId);
280
281
282 if (spdySynReplyFrame.isLast()) {
283 halfCloseStream(streamId, true, ctx.newSucceededFuture());
284 }
285
286 } else if (msg instanceof SpdyRstStreamFrame) {
287
288
289
290
291
292
293
294
295
296
297 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
298 removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
299
300 } else if (msg instanceof SpdySettingsFrame) {
301
302 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
303
304 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
305 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
306
307 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
308 return;
309 }
310
311 int newConcurrentStreams =
312 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
313 if (newConcurrentStreams >= 0) {
314 remoteConcurrentStreams = newConcurrentStreams;
315 }
316
317
318
319
320 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
321 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
322 }
323 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
324
325 int newInitialWindowSize =
326 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
327 if (newInitialWindowSize >= 0) {
328 updateInitialSendWindowSize(newInitialWindowSize);
329 }
330
331 } else if (msg instanceof SpdyPingFrame) {
332
333
334
335
336
337
338
339
340
341
342 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
343
344 if (isRemoteInitiatedId(spdyPingFrame.id())) {
345 ctx.writeAndFlush(spdyPingFrame);
346 return;
347 }
348
349
350 if (pings.get() == 0) {
351 return;
352 }
353 pings.getAndDecrement();
354
355 } else if (msg instanceof SpdyGoAwayFrame) {
356
357 receivedGoAwayFrame = true;
358
359 } else if (msg instanceof SpdyHeadersFrame) {
360
361 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
362 int streamId = spdyHeadersFrame.streamId();
363
364
365 if (spdyHeadersFrame.isInvalid()) {
366 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
367 return;
368 }
369
370 if (spdySession.isRemoteSideClosed(streamId)) {
371 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
372 return;
373 }
374
375
376 if (spdyHeadersFrame.isLast()) {
377 halfCloseStream(streamId, true, ctx.newSucceededFuture());
378 }
379
380 } else if (msg instanceof SpdyWindowUpdateFrame) {
381
382
383
384
385
386
387
388
389
390
391
392 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
393 int streamId = spdyWindowUpdateFrame.streamId();
394 int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
395
396
397 if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
398 return;
399 }
400
401
402 if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
403 if (streamId == SPDY_SESSION_STREAM_ID) {
404 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
405 } else {
406 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
407 }
408 return;
409 }
410
411 updateSendWindowSize(ctx, streamId, deltaWindowSize);
412 }
413
414 ctx.fireChannelRead(msg);
415 }
416
417 @Override
418 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
419 for (Integer streamId: spdySession.activeStreams().keySet()) {
420 removeStream(streamId, ctx.newSucceededFuture());
421 }
422 ctx.fireChannelInactive();
423 }
424
425 @Override
426 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
427 if (cause instanceof SpdyProtocolException) {
428 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
429 }
430
431 ctx.fireExceptionCaught(cause);
432 }
433
434 @Override
435 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
436 sendGoAwayFrame(ctx, promise);
437 }
438
439 @Override
440 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
441 if (msg instanceof SpdyDataFrame ||
442 msg instanceof SpdySynStreamFrame ||
443 msg instanceof SpdySynReplyFrame ||
444 msg instanceof SpdyRstStreamFrame ||
445 msg instanceof SpdySettingsFrame ||
446 msg instanceof SpdyPingFrame ||
447 msg instanceof SpdyGoAwayFrame ||
448 msg instanceof SpdyHeadersFrame ||
449 msg instanceof SpdyWindowUpdateFrame) {
450
451 handleOutboundMessage(ctx, msg, promise);
452 } else {
453 ctx.write(msg, promise);
454 }
455 }
456
457 private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
458 if (msg instanceof SpdyDataFrame) {
459
460 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
461 int streamId = spdyDataFrame.streamId();
462
463
464 if (spdySession.isLocalSideClosed(streamId)) {
465 spdyDataFrame.release();
466 promise.setFailure(PROTOCOL_EXCEPTION);
467 return;
468 }
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483 int dataLength = spdyDataFrame.content().readableBytes();
484 int sendWindowSize = spdySession.getSendWindowSize(streamId);
485 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
486 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
487
488 if (sendWindowSize <= 0) {
489
490 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
491 return;
492 } else if (sendWindowSize < dataLength) {
493
494 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
495 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
496
497
498 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(streamId,
499 spdyDataFrame.content().readSlice(sendWindowSize).retain());
500
501
502 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
503
504
505
506 final ChannelHandlerContext context = ctx;
507 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
508 @Override
509 public void operationComplete(ChannelFuture future) throws Exception {
510 if (!future.isSuccess()) {
511 issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
512 }
513 }
514 });
515 return;
516 } else {
517
518 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
519 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
520
521
522
523 final ChannelHandlerContext context = ctx;
524 promise.addListener(new ChannelFutureListener() {
525 @Override
526 public void operationComplete(ChannelFuture future) throws Exception {
527 if (!future.isSuccess()) {
528 issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
529 }
530 }
531 });
532 }
533
534
535 if (spdyDataFrame.isLast()) {
536 halfCloseStream(streamId, false, promise);
537 }
538
539 } else if (msg instanceof SpdySynStreamFrame) {
540
541 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
542 int streamId = spdySynStreamFrame.streamId();
543
544 if (isRemoteInitiatedId(streamId)) {
545 promise.setFailure(PROTOCOL_EXCEPTION);
546 return;
547 }
548
549 byte priority = spdySynStreamFrame.priority();
550 boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
551 boolean localSideClosed = spdySynStreamFrame.isLast();
552 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
553 promise.setFailure(PROTOCOL_EXCEPTION);
554 return;
555 }
556
557 } else if (msg instanceof SpdySynReplyFrame) {
558
559 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
560 int streamId = spdySynReplyFrame.streamId();
561
562
563 if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
564 promise.setFailure(PROTOCOL_EXCEPTION);
565 return;
566 }
567
568
569 if (spdySynReplyFrame.isLast()) {
570 halfCloseStream(streamId, false, promise);
571 }
572
573 } else if (msg instanceof SpdyRstStreamFrame) {
574
575 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
576 removeStream(spdyRstStreamFrame.streamId(), promise);
577
578 } else if (msg instanceof SpdySettingsFrame) {
579
580 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
581
582 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
583 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
584
585 promise.setFailure(PROTOCOL_EXCEPTION);
586 return;
587 }
588
589 int newConcurrentStreams =
590 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
591 if (newConcurrentStreams >= 0) {
592 localConcurrentStreams = newConcurrentStreams;
593 }
594
595
596
597
598 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
599 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
600 }
601 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
602
603 int newInitialWindowSize =
604 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
605 if (newInitialWindowSize >= 0) {
606 updateInitialReceiveWindowSize(newInitialWindowSize);
607 }
608
609 } else if (msg instanceof SpdyPingFrame) {
610
611 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
612 if (isRemoteInitiatedId(spdyPingFrame.id())) {
613 ctx.fireExceptionCaught(new IllegalArgumentException(
614 "invalid PING ID: " + spdyPingFrame.id()));
615 return;
616 }
617 pings.getAndIncrement();
618
619 } else if (msg instanceof SpdyGoAwayFrame) {
620
621
622
623 promise.setFailure(PROTOCOL_EXCEPTION);
624 return;
625
626 } else if (msg instanceof SpdyHeadersFrame) {
627
628 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
629 int streamId = spdyHeadersFrame.streamId();
630
631
632 if (spdySession.isLocalSideClosed(streamId)) {
633 promise.setFailure(PROTOCOL_EXCEPTION);
634 return;
635 }
636
637
638 if (spdyHeadersFrame.isLast()) {
639 halfCloseStream(streamId, false, promise);
640 }
641
642 } else if (msg instanceof SpdyWindowUpdateFrame) {
643
644
645 promise.setFailure(PROTOCOL_EXCEPTION);
646 return;
647 }
648
649 ctx.write(msg, promise);
650 }
651
652
653
654
655
656
657
658
659
660
661 private void issueSessionError(
662 ChannelHandlerContext ctx, SpdySessionStatus status) {
663
664 sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
665 }
666
667
668
669
670
671
672
673
674
675
676
677
678 private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
679 boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
680 ChannelPromise promise = ctx.newPromise();
681 removeStream(streamId, promise);
682
683 SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
684 ctx.writeAndFlush(spdyRstStreamFrame, promise);
685 if (fireChannelRead) {
686 ctx.fireChannelRead(spdyRstStreamFrame);
687 }
688 }
689
690
691
692
693
694 private boolean isRemoteInitiatedId(int id) {
695 boolean serverId = isServerId(id);
696 return server && !serverId || !server && serverId;
697 }
698
699
700 private void updateInitialSendWindowSize(int newInitialWindowSize) {
701 int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
702 initialSendWindowSize = newInitialWindowSize;
703 spdySession.updateAllSendWindowSizes(deltaWindowSize);
704 }
705
706
707 private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
708 int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
709 initialReceiveWindowSize = newInitialWindowSize;
710 spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
711 }
712
713
714 private boolean acceptStream(
715 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
716
717 if (receivedGoAwayFrame || sentGoAwayFrame) {
718 return false;
719 }
720
721 boolean remote = isRemoteInitiatedId(streamId);
722 int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
723 if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
724 return false;
725 }
726 spdySession.acceptStream(
727 streamId, priority, remoteSideClosed, localSideClosed,
728 initialSendWindowSize, initialReceiveWindowSize, remote);
729 if (remote) {
730 lastGoodStreamId = streamId;
731 }
732 return true;
733 }
734
735 private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
736 if (remote) {
737 spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
738 } else {
739 spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
740 }
741 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
742 future.addListener(closeSessionFutureListener);
743 }
744 }
745
746 private void removeStream(int streamId, ChannelFuture future) {
747 spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
748
749 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
750 future.addListener(closeSessionFutureListener);
751 }
752 }
753
754 private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
755 spdySession.updateSendWindowSize(streamId, deltaWindowSize);
756
757 while (true) {
758
759 SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
760 if (pendingWrite == null) {
761 return;
762 }
763
764 SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
765 int dataFrameSize = spdyDataFrame.content().readableBytes();
766 int writeStreamId = spdyDataFrame.streamId();
767 int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
768 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
769 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
770
771 if (sendWindowSize <= 0) {
772 return;
773 } else if (sendWindowSize < dataFrameSize) {
774
775 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
776 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
777
778
779 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(writeStreamId,
780 spdyDataFrame.content().readSlice(sendWindowSize).retain());
781
782
783
784 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
785 @Override
786 public void operationComplete(ChannelFuture future) throws Exception {
787 if (!future.isSuccess()) {
788 issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
789 }
790 }
791 });
792 } else {
793
794 spdySession.removePendingWrite(writeStreamId);
795 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
796 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
797
798
799 if (spdyDataFrame.isLast()) {
800 halfCloseStream(writeStreamId, false, pendingWrite.promise);
801 }
802
803
804
805 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
806 @Override
807 public void operationComplete(ChannelFuture future) throws Exception {
808 if (!future.isSuccess()) {
809 issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
810 }
811 }
812 });
813 }
814 }
815 }
816
817 private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
818
819 if (!ctx.channel().isActive()) {
820 ctx.close(future);
821 return;
822 }
823
824 ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
825 if (spdySession.noActiveStreams()) {
826 f.addListener(new ClosingChannelFutureListener(ctx, future));
827 } else {
828 closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
829 }
830
831 }
832
833 private ChannelFuture sendGoAwayFrame(
834 ChannelHandlerContext ctx, SpdySessionStatus status) {
835 if (!sentGoAwayFrame) {
836 sentGoAwayFrame = true;
837 SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
838 return ctx.writeAndFlush(spdyGoAwayFrame);
839 } else {
840 return ctx.newSucceededFuture();
841 }
842 }
843
844 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
845 private final ChannelHandlerContext ctx;
846 private final ChannelPromise promise;
847
848 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
849 this.ctx = ctx;
850 this.promise = promise;
851 }
852
853 @Override
854 public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
855 ctx.close(promise);
856 }
857 }
858 }