1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.spdy;
17
18 import io.netty.channel.ChannelDuplexHandler;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelFutureListener;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.util.internal.ObjectUtil;
24
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
28 import static io.netty.handler.codec.spdy.SpdyCodecUtil.isServerId;
29 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
30
31
32
33
34 public class SpdySessionHandler extends ChannelDuplexHandler {
35
36 private static final SpdyProtocolException PROTOCOL_EXCEPTION =
37 SpdyProtocolException.newStatic(null, SpdySessionHandler.class, "handleOutboundMessage(...)");
38 private static final SpdyProtocolException STREAM_CLOSED =
39 SpdyProtocolException.newStatic("Stream closed", SpdySessionHandler.class, "removeStream(...)");
40
41 private static final int DEFAULT_WINDOW_SIZE = 64 * 1024;
42 private int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
43 private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
44 private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
45
46 private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
47 private int lastGoodStreamId;
48
49 private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
50 private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
51 private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
52
53 private final AtomicInteger pings = new AtomicInteger();
54
55 private boolean sentGoAwayFrame;
56 private boolean receivedGoAwayFrame;
57
58 private ChannelFutureListener closeSessionFutureListener;
59
60 private final boolean server;
61 private final int minorVersion;
62
63
64
65
66
67
68
69
70
71
72 public SpdySessionHandler(SpdyVersion version, boolean server) {
73 this.minorVersion = ObjectUtil.checkNotNull(version, "version").minorVersion();
74 this.server = server;
75 }
76
77 public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
78 checkPositiveOrZero(sessionReceiveWindowSize, "sessionReceiveWindowSize");
79
80
81
82
83
84
85 initialSessionReceiveWindowSize = sessionReceiveWindowSize;
86 }
87
88 @Override
89 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
90 if (msg instanceof SpdyDataFrame) {
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
115 int streamId = spdyDataFrame.streamId();
116
117 int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
118 int newSessionWindowSize =
119 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
120
121
122 if (newSessionWindowSize < 0) {
123 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
124 return;
125 }
126
127
128 if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
129 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
130 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
131 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
132 new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
133 ctx.writeAndFlush(spdyWindowUpdateFrame);
134 }
135
136
137
138 if (!spdySession.isActiveStream(streamId)) {
139 spdyDataFrame.release();
140 if (streamId <= lastGoodStreamId) {
141 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
142 } else if (!sentGoAwayFrame) {
143 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
144 }
145 return;
146 }
147
148
149
150 if (spdySession.isRemoteSideClosed(streamId)) {
151 spdyDataFrame.release();
152 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
153 return;
154 }
155
156
157 if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
158 spdyDataFrame.release();
159 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
160 return;
161 }
162
163
164
165
166
167
168
169
170 int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
171
172
173
174
175
176
177 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
178 spdyDataFrame.release();
179 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
180 return;
181 }
182
183
184
185 if (newWindowSize < 0) {
186 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
187 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
188 streamId, spdyDataFrame.content().readRetainedSlice(initialReceiveWindowSize));
189 ctx.writeAndFlush(partialDataFrame);
190 }
191 }
192
193
194 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
195 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
196 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
197 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
198 new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
199 ctx.writeAndFlush(spdyWindowUpdateFrame);
200 }
201
202
203 if (spdyDataFrame.isLast()) {
204 halfCloseStream(streamId, true, ctx.newSucceededFuture());
205 }
206
207 } else if (msg instanceof SpdySynStreamFrame) {
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
224 int streamId = spdySynStreamFrame.streamId();
225
226
227 if (spdySynStreamFrame.isInvalid() ||
228 !isRemoteInitiatedId(streamId) ||
229 spdySession.isActiveStream(streamId)) {
230 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
231 return;
232 }
233
234
235 if (streamId <= lastGoodStreamId) {
236 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
237 return;
238 }
239
240
241 byte priority = spdySynStreamFrame.priority();
242 boolean remoteSideClosed = spdySynStreamFrame.isLast();
243 boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
244 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
245 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
246 return;
247 }
248
249 } else if (msg instanceof SpdySynReplyFrame) {
250
251
252
253
254
255
256
257
258 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
259 int streamId = spdySynReplyFrame.streamId();
260
261
262 if (spdySynReplyFrame.isInvalid() ||
263 isRemoteInitiatedId(streamId) ||
264 spdySession.isRemoteSideClosed(streamId)) {
265 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
266 return;
267 }
268
269
270 if (spdySession.hasReceivedReply(streamId)) {
271 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
272 return;
273 }
274
275 spdySession.receivedReply(streamId);
276
277
278 if (spdySynReplyFrame.isLast()) {
279 halfCloseStream(streamId, true, ctx.newSucceededFuture());
280 }
281
282 } else if (msg instanceof SpdyRstStreamFrame) {
283
284
285
286
287
288
289
290
291
292
293 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
294 removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
295
296 } else if (msg instanceof SpdySettingsFrame) {
297
298 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
299
300 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
301 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
302
303 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
304 return;
305 }
306
307 int newConcurrentStreams =
308 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
309 if (newConcurrentStreams >= 0) {
310 remoteConcurrentStreams = newConcurrentStreams;
311 }
312
313
314
315
316 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
317 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
318 }
319 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
320
321 int newInitialWindowSize =
322 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
323 if (newInitialWindowSize >= 0) {
324 updateInitialSendWindowSize(newInitialWindowSize);
325 }
326
327 } else if (msg instanceof SpdyPingFrame) {
328
329
330
331
332
333
334
335
336
337
338 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
339
340 if (isRemoteInitiatedId(spdyPingFrame.id())) {
341 ctx.writeAndFlush(spdyPingFrame);
342 return;
343 }
344
345
346 if (pings.get() == 0) {
347 return;
348 }
349 pings.getAndDecrement();
350
351 } else if (msg instanceof SpdyGoAwayFrame) {
352
353 receivedGoAwayFrame = true;
354
355 } else if (msg instanceof SpdyHeadersFrame) {
356
357 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
358 int streamId = spdyHeadersFrame.streamId();
359
360
361 if (spdyHeadersFrame.isInvalid()) {
362 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
363 return;
364 }
365
366 if (spdySession.isRemoteSideClosed(streamId)) {
367 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
368 return;
369 }
370
371
372 if (spdyHeadersFrame.isLast()) {
373 halfCloseStream(streamId, true, ctx.newSucceededFuture());
374 }
375
376 } else if (msg instanceof SpdyWindowUpdateFrame) {
377
378
379
380
381
382
383
384
385
386
387
388 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
389 int streamId = spdyWindowUpdateFrame.streamId();
390 int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
391
392
393 if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
394 return;
395 }
396
397
398 if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
399 if (streamId == SPDY_SESSION_STREAM_ID) {
400 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
401 } else {
402 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
403 }
404 return;
405 }
406
407 updateSendWindowSize(ctx, streamId, deltaWindowSize);
408 }
409
410 ctx.fireChannelRead(msg);
411 }
412
413 @Override
414 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
415 for (Integer streamId: spdySession.activeStreams().keySet()) {
416 removeStream(streamId, ctx.newSucceededFuture());
417 }
418 ctx.fireChannelInactive();
419 }
420
421 @Override
422 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
423 if (cause instanceof SpdyProtocolException) {
424 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
425 }
426
427 ctx.fireExceptionCaught(cause);
428 }
429
430 @Override
431 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
432 sendGoAwayFrame(ctx, promise);
433 }
434
435 @Override
436 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
437 if (msg instanceof SpdyDataFrame ||
438 msg instanceof SpdySynStreamFrame ||
439 msg instanceof SpdySynReplyFrame ||
440 msg instanceof SpdyRstStreamFrame ||
441 msg instanceof SpdySettingsFrame ||
442 msg instanceof SpdyPingFrame ||
443 msg instanceof SpdyGoAwayFrame ||
444 msg instanceof SpdyHeadersFrame ||
445 msg instanceof SpdyWindowUpdateFrame) {
446
447 handleOutboundMessage(ctx, msg, promise);
448 } else {
449 ctx.write(msg, promise);
450 }
451 }
452
453 private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
454 if (msg instanceof SpdyDataFrame) {
455
456 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
457 int streamId = spdyDataFrame.streamId();
458
459
460 if (spdySession.isLocalSideClosed(streamId)) {
461 spdyDataFrame.release();
462 promise.setFailure(PROTOCOL_EXCEPTION);
463 return;
464 }
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479 int dataLength = spdyDataFrame.content().readableBytes();
480 int sendWindowSize = spdySession.getSendWindowSize(streamId);
481 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
482 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
483
484 if (sendWindowSize <= 0) {
485
486 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
487 return;
488 } else if (sendWindowSize < dataLength) {
489
490 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
491 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
492
493
494 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
495 streamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
496
497
498 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
499
500
501
502 final ChannelHandlerContext context = ctx;
503 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
504 @Override
505 public void operationComplete(ChannelFuture future) throws Exception {
506 if (!future.isSuccess()) {
507 issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
508 }
509 }
510 });
511 return;
512 } else {
513
514 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
515 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
516
517
518
519 final ChannelHandlerContext context = ctx;
520 promise.addListener(new ChannelFutureListener() {
521 @Override
522 public void operationComplete(ChannelFuture future) throws Exception {
523 if (!future.isSuccess()) {
524 issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
525 }
526 }
527 });
528 }
529
530
531 if (spdyDataFrame.isLast()) {
532 halfCloseStream(streamId, false, promise);
533 }
534
535 } else if (msg instanceof SpdySynStreamFrame) {
536
537 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
538 int streamId = spdySynStreamFrame.streamId();
539
540 if (isRemoteInitiatedId(streamId)) {
541 promise.setFailure(PROTOCOL_EXCEPTION);
542 return;
543 }
544
545 byte priority = spdySynStreamFrame.priority();
546 boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
547 boolean localSideClosed = spdySynStreamFrame.isLast();
548 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
549 promise.setFailure(PROTOCOL_EXCEPTION);
550 return;
551 }
552
553 } else if (msg instanceof SpdySynReplyFrame) {
554
555 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
556 int streamId = spdySynReplyFrame.streamId();
557
558
559 if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
560 promise.setFailure(PROTOCOL_EXCEPTION);
561 return;
562 }
563
564
565 if (spdySynReplyFrame.isLast()) {
566 halfCloseStream(streamId, false, promise);
567 }
568
569 } else if (msg instanceof SpdyRstStreamFrame) {
570
571 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
572 removeStream(spdyRstStreamFrame.streamId(), promise);
573
574 } else if (msg instanceof SpdySettingsFrame) {
575
576 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
577
578 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
579 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
580
581 promise.setFailure(PROTOCOL_EXCEPTION);
582 return;
583 }
584
585 int newConcurrentStreams =
586 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
587 if (newConcurrentStreams >= 0) {
588 localConcurrentStreams = newConcurrentStreams;
589 }
590
591
592
593
594 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
595 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
596 }
597 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
598
599 int newInitialWindowSize =
600 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
601 if (newInitialWindowSize >= 0) {
602 updateInitialReceiveWindowSize(newInitialWindowSize);
603 }
604
605 } else if (msg instanceof SpdyPingFrame) {
606
607 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
608 if (isRemoteInitiatedId(spdyPingFrame.id())) {
609 ctx.fireExceptionCaught(new IllegalArgumentException(
610 "invalid PING ID: " + spdyPingFrame.id()));
611 return;
612 }
613 pings.getAndIncrement();
614
615 } else if (msg instanceof SpdyGoAwayFrame) {
616
617
618
619 promise.setFailure(PROTOCOL_EXCEPTION);
620 return;
621
622 } else if (msg instanceof SpdyHeadersFrame) {
623
624 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
625 int streamId = spdyHeadersFrame.streamId();
626
627
628 if (spdySession.isLocalSideClosed(streamId)) {
629 promise.setFailure(PROTOCOL_EXCEPTION);
630 return;
631 }
632
633
634 if (spdyHeadersFrame.isLast()) {
635 halfCloseStream(streamId, false, promise);
636 }
637
638 } else if (msg instanceof SpdyWindowUpdateFrame) {
639
640
641 promise.setFailure(PROTOCOL_EXCEPTION);
642 return;
643 }
644
645 ctx.write(msg, promise);
646 }
647
648
649
650
651
652
653
654
655
656
657 private void issueSessionError(
658 ChannelHandlerContext ctx, SpdySessionStatus status) {
659
660 sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
661 }
662
663
664
665
666
667
668
669
670
671
672
673
674 private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
675 boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
676 ChannelPromise promise = ctx.newPromise();
677 removeStream(streamId, promise);
678
679 SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
680 ctx.writeAndFlush(spdyRstStreamFrame, promise);
681 if (fireChannelRead) {
682 ctx.fireChannelRead(spdyRstStreamFrame);
683 }
684 }
685
686
687
688
689
690 private boolean isRemoteInitiatedId(int id) {
691 boolean serverId = isServerId(id);
692 return server && !serverId || !server && serverId;
693 }
694
695
696 private void updateInitialSendWindowSize(int newInitialWindowSize) {
697 int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
698 initialSendWindowSize = newInitialWindowSize;
699 spdySession.updateAllSendWindowSizes(deltaWindowSize);
700 }
701
702
703 private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
704 int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
705 initialReceiveWindowSize = newInitialWindowSize;
706 spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
707 }
708
709
710 private boolean acceptStream(
711 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
712
713 if (receivedGoAwayFrame || sentGoAwayFrame) {
714 return false;
715 }
716
717 boolean remote = isRemoteInitiatedId(streamId);
718 int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
719 if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
720 return false;
721 }
722 spdySession.acceptStream(
723 streamId, priority, remoteSideClosed, localSideClosed,
724 initialSendWindowSize, initialReceiveWindowSize, remote);
725 if (remote) {
726 lastGoodStreamId = streamId;
727 }
728 return true;
729 }
730
731 private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
732 if (remote) {
733 spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
734 } else {
735 spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
736 }
737 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
738 future.addListener(closeSessionFutureListener);
739 }
740 }
741
742 private void removeStream(int streamId, ChannelFuture future) {
743 spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
744
745 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
746 future.addListener(closeSessionFutureListener);
747 }
748 }
749
750 private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
751 spdySession.updateSendWindowSize(streamId, deltaWindowSize);
752
753 while (true) {
754
755 SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
756 if (pendingWrite == null) {
757 return;
758 }
759
760 SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
761 int dataFrameSize = spdyDataFrame.content().readableBytes();
762 int writeStreamId = spdyDataFrame.streamId();
763 int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
764 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
765 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
766
767 if (sendWindowSize <= 0) {
768 return;
769 } else if (sendWindowSize < dataFrameSize) {
770
771 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
772 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
773
774
775 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
776 writeStreamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
777
778
779
780 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
781 @Override
782 public void operationComplete(ChannelFuture future) throws Exception {
783 if (!future.isSuccess()) {
784 issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
785 }
786 }
787 });
788 } else {
789
790 spdySession.removePendingWrite(writeStreamId);
791 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
792 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
793
794
795 if (spdyDataFrame.isLast()) {
796 halfCloseStream(writeStreamId, false, pendingWrite.promise);
797 }
798
799
800
801 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
802 @Override
803 public void operationComplete(ChannelFuture future) throws Exception {
804 if (!future.isSuccess()) {
805 issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
806 }
807 }
808 });
809 }
810 }
811 }
812
813 private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
814
815 if (!ctx.channel().isActive()) {
816 ctx.close(future);
817 return;
818 }
819
820 ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
821 if (spdySession.noActiveStreams()) {
822 f.addListener(new ClosingChannelFutureListener(ctx, future));
823 } else {
824 closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
825 }
826
827 }
828
829 private ChannelFuture sendGoAwayFrame(
830 ChannelHandlerContext ctx, SpdySessionStatus status) {
831 if (!sentGoAwayFrame) {
832 sentGoAwayFrame = true;
833 SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
834 return ctx.writeAndFlush(spdyGoAwayFrame);
835 } else {
836 return ctx.newSucceededFuture();
837 }
838 }
839
840 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
841 private final ChannelHandlerContext ctx;
842 private final ChannelPromise promise;
843
844 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
845 this.ctx = ctx;
846 this.promise = promise;
847 }
848
849 @Override
850 public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
851 ctx.close(promise);
852 }
853 }
854 }