1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.spdy;
17
18 import io.netty.channel.ChannelDuplexHandler;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelFutureListener;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.util.internal.ObjectUtil;
24
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
28 import static io.netty.handler.codec.spdy.SpdyCodecUtil.isServerId;
29 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
30
31
32
33
34 public class SpdySessionHandler extends ChannelDuplexHandler {
35
// Shared exception instance used to fail outbound writes that this handler rejects.
private static final SpdyProtocolException PROTOCOL_EXCEPTION =
        SpdyProtocolException.newStatic(null, SpdySessionHandler.class, "handleOutboundMessage(...)");
// Shared exception instance handed to the session whenever a stream is removed.
private static final SpdyProtocolException STREAM_CLOSED =
        SpdyProtocolException.newStatic("Stream closed", SpdySessionHandler.class, "removeStream(...)");

// Default initial flow-control window: 64 KiB.
private static final int DEFAULT_WINDOW_SIZE = 64 * 1024;
// Initial per-stream send window; replaced when the peer's SETTINGS frame carries a new value.
private int initialSendWindowSize = DEFAULT_WINDOW_SIZE;
// Initial per-stream receive window; replaced when an outbound SETTINGS frame carries a new value.
private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
// Target size of the session receive window; written by setSessionReceiveWindowSize(int)
// and read while inbound data frames are processed.
private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;

// NOTE: this initializer reads the two window fields above, so it must stay below them.
private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
// Id of the most recently accepted remote-initiated stream; reported in GOAWAY frames.
private int lastGoodStreamId;

private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
// Limit taken from the peer's SETTINGS frame; applied to streams initiated locally.
private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
// Limit taken from our own outbound SETTINGS frame; applied to streams initiated by the peer.
private int localConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;

// Number of locally sent PING frames that have not been answered yet.
private final AtomicInteger pings = new AtomicInteger();

// Set once a GOAWAY frame has been written by this handler.
private boolean sentGoAwayFrame;
// Set once a GOAWAY frame has been read from the peer.
private boolean receivedGoAwayFrame;

// Non-null while a close is deferred until the last active stream is gone.
private ChannelFutureListener closeSessionFutureListener;

// true when this endpoint acts as the server side of the session.
private final boolean server;
// Minor protocol version; SETTINGS frames that announce a different one are rejected.
private final int minorVersion;
62
63
64
65
66
67
68
69
70
71
72 public SpdySessionHandler(SpdyVersion version, boolean server) {
73 this.minorVersion = ObjectUtil.checkNotNull(version, "version").minorVersion();
74 this.server = server;
75 }
76
77 public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
78 checkPositiveOrZero(sessionReceiveWindowSize, "sessionReceiveWindowSize");
79
80
81
82
83
84
85 initialSessionReceiveWindowSize = sessionReceiveWindowSize;
86 }
87
88 @Override
89 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
90 if (msg instanceof SpdyDataFrame) {
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
115 int streamId = spdyDataFrame.streamId();
116
117 int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
118 int newSessionWindowSize =
119 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
120
121
122 if (newSessionWindowSize < 0) {
123 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
124 return;
125 }
126
127
128 if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
129 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
130 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
131 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
132 new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
133 ctx.writeAndFlush(spdyWindowUpdateFrame);
134 }
135
136
137
138 if (!spdySession.isActiveStream(streamId)) {
139 spdyDataFrame.release();
140 if (streamId <= lastGoodStreamId) {
141 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
142 } else if (!sentGoAwayFrame) {
143 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
144 }
145 return;
146 }
147
148
149
150 if (spdySession.isRemoteSideClosed(streamId)) {
151 spdyDataFrame.release();
152 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
153 return;
154 }
155
156
157 if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
158 spdyDataFrame.release();
159 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
160 return;
161 }
162
163
164
165
166
167
168
169
170 int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
171
172
173
174
175
176
177 if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
178 spdyDataFrame.release();
179 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
180 return;
181 }
182
183
184
185 if (newWindowSize < 0) {
186 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
187 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
188 streamId, spdyDataFrame.content().readRetainedSlice(initialReceiveWindowSize));
189 ctx.writeAndFlush(partialDataFrame);
190 }
191 }
192
193
194 if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
195 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
196 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
197 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
198 new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
199 ctx.writeAndFlush(spdyWindowUpdateFrame);
200 }
201
202
203 if (spdyDataFrame.isLast()) {
204 halfCloseStream(streamId, true, ctx.newSucceededFuture());
205 }
206
207 } else if (msg instanceof SpdySynStreamFrame) {
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
224 int streamId = spdySynStreamFrame.streamId();
225
226
227 if (spdySynStreamFrame.isInvalid() ||
228 !isRemoteInitiatedId(streamId) ||
229 spdySession.isActiveStream(streamId)) {
230 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
231 return;
232 }
233
234
235 if (streamId <= lastGoodStreamId) {
236 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
237 return;
238 }
239
240
241 byte priority = spdySynStreamFrame.priority();
242 boolean remoteSideClosed = spdySynStreamFrame.isLast();
243 boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
244 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
245 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
246 return;
247 }
248
249 } else if (msg instanceof SpdySynReplyFrame) {
250
251
252
253
254
255
256
257
258 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
259 int streamId = spdySynReplyFrame.streamId();
260
261
262 if (spdySynReplyFrame.isInvalid() ||
263 isRemoteInitiatedId(streamId) ||
264 spdySession.isRemoteSideClosed(streamId)) {
265 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
266 return;
267 }
268
269
270 if (spdySession.hasReceivedReply(streamId)) {
271 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
272 return;
273 }
274
275 spdySession.receivedReply(streamId);
276
277
278 if (spdySynReplyFrame.isLast()) {
279 halfCloseStream(streamId, true, ctx.newSucceededFuture());
280 }
281
282 } else if (msg instanceof SpdyRstStreamFrame) {
283
284
285
286
287
288
289
290
291
292
293 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
294 removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
295
296 } else if (msg instanceof SpdySettingsFrame) {
297
298 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
299
300 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
301 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
302
303 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
304 return;
305 }
306
307 int newConcurrentStreams =
308 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
309 if (newConcurrentStreams >= 0) {
310 remoteConcurrentStreams = newConcurrentStreams;
311 }
312
313
314
315
316 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
317 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
318 }
319 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
320
321 int newInitialWindowSize =
322 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
323 if (newInitialWindowSize >= 0) {
324 updateInitialSendWindowSize(newInitialWindowSize);
325 }
326
327 } else if (msg instanceof SpdyPingFrame) {
328
329
330
331
332
333
334
335
336
337
338 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
339
340 if (isRemoteInitiatedId(spdyPingFrame.id())) {
341 ctx.writeAndFlush(spdyPingFrame);
342 return;
343 }
344
345
346 if (pings.get() == 0) {
347 return;
348 }
349 pings.getAndDecrement();
350
351 } else if (msg instanceof SpdyGoAwayFrame) {
352
353 receivedGoAwayFrame = true;
354
355 } else if (msg instanceof SpdyHeadersFrame) {
356
357 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
358 int streamId = spdyHeadersFrame.streamId();
359
360
361 if (spdyHeadersFrame.isInvalid()) {
362 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
363 return;
364 }
365
366 if (spdySession.isRemoteSideClosed(streamId)) {
367 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
368 return;
369 }
370
371
372 if (spdyHeadersFrame.isLast()) {
373 halfCloseStream(streamId, true, ctx.newSucceededFuture());
374 }
375
376 } else if (msg instanceof SpdyWindowUpdateFrame) {
377
378
379
380
381
382
383
384
385
386
387
388 SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
389 int streamId = spdyWindowUpdateFrame.streamId();
390 int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
391
392
393 if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
394 return;
395 }
396
397
398 if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
399 if (streamId == SPDY_SESSION_STREAM_ID) {
400 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
401 } else {
402 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
403 }
404 return;
405 }
406
407 updateSendWindowSize(ctx, streamId, deltaWindowSize);
408 }
409
410 ctx.fireChannelRead(msg);
411 }
412
413 @Override
414 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
415 for (Integer streamId: spdySession.activeStreams().keySet()) {
416 removeStream(streamId, ctx.newSucceededFuture());
417 }
418 ctx.fireChannelInactive();
419 }
420
421 @Override
422 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
423 if (cause instanceof SpdyProtocolException) {
424 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
425 }
426
427 ctx.fireExceptionCaught(cause);
428 }
429
430 @Override
431 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
432 sendGoAwayFrame(ctx, promise);
433 }
434
435 @Override
436 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
437 if (msg instanceof SpdyDataFrame ||
438 msg instanceof SpdySynStreamFrame ||
439 msg instanceof SpdySynReplyFrame ||
440 msg instanceof SpdyRstStreamFrame ||
441 msg instanceof SpdySettingsFrame ||
442 msg instanceof SpdyPingFrame ||
443 msg instanceof SpdyGoAwayFrame ||
444 msg instanceof SpdyHeadersFrame ||
445 msg instanceof SpdyWindowUpdateFrame) {
446
447 handleOutboundMessage(ctx, msg, promise);
448 } else {
449 ctx.write(msg, promise);
450 }
451 }
452
453 private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
454 if (msg instanceof SpdyDataFrame) {
455
456 SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
457 int streamId = spdyDataFrame.streamId();
458
459
460 if (spdySession.isLocalSideClosed(streamId)) {
461 spdyDataFrame.release();
462 promise.setFailure(PROTOCOL_EXCEPTION);
463 return;
464 }
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479 int dataLength = spdyDataFrame.content().readableBytes();
480 int sendWindowSize = spdySession.getSendWindowSize(streamId);
481 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
482 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
483
484 if (sendWindowSize <= 0) {
485
486 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
487 return;
488 } else if (sendWindowSize < dataLength) {
489
490 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
491 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
492
493
494 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
495 streamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
496
497
498 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
499
500
501
502 final ChannelHandlerContext context = ctx;
503 ctx.write(partialDataFrame).addListener(future -> {
504 if (!future.isSuccess()) {
505 issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
506 }
507 });
508 return;
509 } else {
510
511 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
512 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
513
514
515
516 final ChannelHandlerContext context = ctx;
517 promise.addListener(future -> {
518 if (!future.isSuccess()) {
519 issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
520 }
521 });
522 }
523
524
525 if (spdyDataFrame.isLast()) {
526 halfCloseStream(streamId, false, promise);
527 }
528
529 } else if (msg instanceof SpdySynStreamFrame) {
530
531 SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
532 int streamId = spdySynStreamFrame.streamId();
533
534 if (isRemoteInitiatedId(streamId)) {
535 promise.setFailure(PROTOCOL_EXCEPTION);
536 return;
537 }
538
539 byte priority = spdySynStreamFrame.priority();
540 boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
541 boolean localSideClosed = spdySynStreamFrame.isLast();
542 if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
543 promise.setFailure(PROTOCOL_EXCEPTION);
544 return;
545 }
546
547 } else if (msg instanceof SpdySynReplyFrame) {
548
549 SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
550 int streamId = spdySynReplyFrame.streamId();
551
552
553 if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
554 promise.setFailure(PROTOCOL_EXCEPTION);
555 return;
556 }
557
558
559 if (spdySynReplyFrame.isLast()) {
560 halfCloseStream(streamId, false, promise);
561 }
562
563 } else if (msg instanceof SpdyRstStreamFrame) {
564
565 SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
566 removeStream(spdyRstStreamFrame.streamId(), promise);
567
568 } else if (msg instanceof SpdySettingsFrame) {
569
570 SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
571
572 int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
573 if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
574
575 promise.setFailure(PROTOCOL_EXCEPTION);
576 return;
577 }
578
579 int newConcurrentStreams =
580 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
581 if (newConcurrentStreams >= 0) {
582 localConcurrentStreams = newConcurrentStreams;
583 }
584
585
586
587
588 if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
589 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
590 }
591 spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
592
593 int newInitialWindowSize =
594 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
595 if (newInitialWindowSize >= 0) {
596 updateInitialReceiveWindowSize(newInitialWindowSize);
597 }
598
599 } else if (msg instanceof SpdyPingFrame) {
600
601 SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
602 if (isRemoteInitiatedId(spdyPingFrame.id())) {
603 ctx.fireExceptionCaught(new IllegalArgumentException(
604 "invalid PING ID: " + spdyPingFrame.id()));
605 return;
606 }
607 pings.getAndIncrement();
608
609 } else if (msg instanceof SpdyGoAwayFrame) {
610
611
612
613 promise.setFailure(PROTOCOL_EXCEPTION);
614 return;
615
616 } else if (msg instanceof SpdyHeadersFrame) {
617
618 SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
619 int streamId = spdyHeadersFrame.streamId();
620
621
622 if (spdySession.isLocalSideClosed(streamId)) {
623 promise.setFailure(PROTOCOL_EXCEPTION);
624 return;
625 }
626
627
628 if (spdyHeadersFrame.isLast()) {
629 halfCloseStream(streamId, false, promise);
630 }
631
632 } else if (msg instanceof SpdyWindowUpdateFrame) {
633
634
635 promise.setFailure(PROTOCOL_EXCEPTION);
636 return;
637 }
638
639 ctx.write(msg, promise);
640 }
641
642
643
644
645
646
647
648
649
650
651 private void issueSessionError(
652 ChannelHandlerContext ctx, SpdySessionStatus status) {
653
654 sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
655 }
656
657
658
659
660
661
662
663
664
665
666
667
668 private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
669 boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
670 ChannelPromise promise = ctx.newPromise();
671 removeStream(streamId, promise);
672
673 SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
674 ctx.writeAndFlush(spdyRstStreamFrame, promise);
675 if (fireChannelRead) {
676 ctx.fireChannelRead(spdyRstStreamFrame);
677 }
678 }
679
680
681
682
683
684 private boolean isRemoteInitiatedId(int id) {
685 boolean serverId = isServerId(id);
686 return server && !serverId || !server && serverId;
687 }
688
689
690 private void updateInitialSendWindowSize(int newInitialWindowSize) {
691 int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
692 initialSendWindowSize = newInitialWindowSize;
693 spdySession.updateAllSendWindowSizes(deltaWindowSize);
694 }
695
696
697 private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
698 int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
699 initialReceiveWindowSize = newInitialWindowSize;
700 spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
701 }
702
703
704 private boolean acceptStream(
705 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
706
707 if (receivedGoAwayFrame || sentGoAwayFrame) {
708 return false;
709 }
710
711 boolean remote = isRemoteInitiatedId(streamId);
712 int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
713 if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
714 return false;
715 }
716 spdySession.acceptStream(
717 streamId, priority, remoteSideClosed, localSideClosed,
718 initialSendWindowSize, initialReceiveWindowSize, remote);
719 if (remote) {
720 lastGoodStreamId = streamId;
721 }
722 return true;
723 }
724
725 private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
726 if (remote) {
727 spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
728 } else {
729 spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
730 }
731 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
732 future.addListener(closeSessionFutureListener);
733 }
734 }
735
736 private void removeStream(int streamId, ChannelFuture future) {
737 spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
738
739 if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
740 future.addListener(closeSessionFutureListener);
741 }
742 }
743
744 private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
745 spdySession.updateSendWindowSize(streamId, deltaWindowSize);
746
747 while (true) {
748
749 SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
750 if (pendingWrite == null) {
751 return;
752 }
753
754 SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
755 int dataFrameSize = spdyDataFrame.content().readableBytes();
756 int writeStreamId = spdyDataFrame.streamId();
757 int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
758 int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
759 sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
760
761 if (sendWindowSize <= 0) {
762 return;
763 } else if (sendWindowSize < dataFrameSize) {
764
765 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
766 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
767
768
769 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
770 writeStreamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
771
772
773 ctx.writeAndFlush(partialDataFrame).addListener(future -> {
774 if (!future.isSuccess()) {
775 issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
776 }
777 });
778 } else {
779
780 spdySession.removePendingWrite(writeStreamId);
781 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
782 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
783
784
785 if (spdyDataFrame.isLast()) {
786 halfCloseStream(writeStreamId, false, pendingWrite.promise);
787 }
788
789
790
791 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(future -> {
792 if (!future.isSuccess()) {
793 issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
794 }
795 });
796 }
797 }
798 }
799
800 private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
801
802 if (!ctx.channel().isActive()) {
803 ctx.close(future);
804 return;
805 }
806
807 ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
808 if (spdySession.noActiveStreams()) {
809 f.addListener(new ClosingChannelFutureListener(ctx, future));
810 } else {
811 closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
812 }
813
814 }
815
816 private ChannelFuture sendGoAwayFrame(
817 ChannelHandlerContext ctx, SpdySessionStatus status) {
818 if (!sentGoAwayFrame) {
819 sentGoAwayFrame = true;
820 SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
821 return ctx.writeAndFlush(spdyGoAwayFrame);
822 } else {
823 return ctx.newSucceededFuture();
824 }
825 }
826
827 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
828 private final ChannelHandlerContext ctx;
829 private final ChannelPromise promise;
830
831 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
832 this.ctx = ctx;
833 this.promise = promise;
834 }
835
836 @Override
837 public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
838 ctx.close(promise);
839 }
840 }
841 }