1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelDuplexHandler;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.internal.ObjectUtil;
24  
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
28  import static io.netty.handler.codec.spdy.SpdyCodecUtil.isServerId;
29  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
30  
31  
32  
33  
34  public class SpdySessionHandler extends ChannelDuplexHandler {
35  
36      private static final SpdyProtocolException PROTOCOL_EXCEPTION =
37              SpdyProtocolException.newStatic(null, SpdySessionHandler.class, "handleOutboundMessage(...)");
38      private static final SpdyProtocolException STREAM_CLOSED =
39              SpdyProtocolException.newStatic("Stream closed", SpdySessionHandler.class, "removeStream(...)");
40  
41      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; 
42      private int initialSendWindowSize    = DEFAULT_WINDOW_SIZE;
43      private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
44      private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
45  
46      private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
47      private int lastGoodStreamId;
48  
49      private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
50      private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
51      private int localConcurrentStreams  = DEFAULT_MAX_CONCURRENT_STREAMS;
52  
53      private final AtomicInteger pings = new AtomicInteger();
54  
55      private boolean sentGoAwayFrame;
56      private boolean receivedGoAwayFrame;
57  
58      private ChannelFutureListener closeSessionFutureListener;
59  
60      private final boolean server;
61      private final int minorVersion;
62  
63      
64  
65  
66  
67  
68  
69  
70  
71  
72      public SpdySessionHandler(SpdyVersion version, boolean server) {
73          this.minorVersion = ObjectUtil.checkNotNull(version, "version").minorVersion();
74          this.server = server;
75      }
76  
77      public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
78          checkPositiveOrZero(sessionReceiveWindowSize, "sessionReceiveWindowSize");
79          
80          
81          
82          
83          
84          
85          initialSessionReceiveWindowSize = sessionReceiveWindowSize;
86      }
87  
88      @Override
89      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
90          if (msg instanceof SpdyDataFrame) {
91  
92              
93  
94  
95  
96  
97  
98  
99  
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
115             int streamId = spdyDataFrame.streamId();
116 
117             int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
118             int newSessionWindowSize =
119                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
120 
121             
122             if (newSessionWindowSize < 0) {
123                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
124                 return;
125             }
126 
127             
128             if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
129                 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
130                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
131                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
132                     new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
133                 ctx.writeAndFlush(spdyWindowUpdateFrame);
134             }
135 
136             
137 
138             if (!spdySession.isActiveStream(streamId)) {
139                 spdyDataFrame.release();
140                 if (streamId <= lastGoodStreamId) {
141                     issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
142                 } else if (!sentGoAwayFrame) {
143                     issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
144                 }
145                 return;
146             }
147 
148             
149 
150             if (spdySession.isRemoteSideClosed(streamId)) {
151                 spdyDataFrame.release();
152                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
153                 return;
154             }
155 
156             
157             if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
158                 spdyDataFrame.release();
159                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
160                 return;
161             }
162 
163             
164 
165 
166 
167 
168 
169             
170             int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
171 
172             
173             
174             
175             
176             
177             if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
178                 spdyDataFrame.release();
179                 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
180                 return;
181             }
182 
183             
184             
185             if (newWindowSize < 0) {
186                 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
187                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
188                             streamId, spdyDataFrame.content().readRetainedSlice(initialReceiveWindowSize));
189                     ctx.writeAndFlush(partialDataFrame);
190                 }
191             }
192 
193             
194             if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
195                 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
196                 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
197                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
198                         new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
199                 ctx.writeAndFlush(spdyWindowUpdateFrame);
200             }
201 
202             
203             if (spdyDataFrame.isLast()) {
204                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
205             }
206 
207         } else if (msg instanceof SpdySynStreamFrame) {
208 
209             
210 
211 
212 
213 
214 
215 
216 
217 
218 
219 
220 
221 
222 
223             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
224             int streamId = spdySynStreamFrame.streamId();
225 
226             
227             if (spdySynStreamFrame.isInvalid() ||
228                 !isRemoteInitiatedId(streamId) ||
229                 spdySession.isActiveStream(streamId)) {
230                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
231                 return;
232             }
233 
234             
235             if (streamId <= lastGoodStreamId) {
236                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
237                 return;
238             }
239 
240             
241             byte priority = spdySynStreamFrame.priority();
242             boolean remoteSideClosed = spdySynStreamFrame.isLast();
243             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
244             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
245                 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
246                 return;
247             }
248 
249         } else if (msg instanceof SpdySynReplyFrame) {
250 
251             
252 
253 
254 
255 
256 
257 
258             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
259             int streamId = spdySynReplyFrame.streamId();
260 
261             
262             if (spdySynReplyFrame.isInvalid() ||
263                 isRemoteInitiatedId(streamId) ||
264                 spdySession.isRemoteSideClosed(streamId)) {
265                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
266                 return;
267             }
268 
269             
270             if (spdySession.hasReceivedReply(streamId)) {
271                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
272                 return;
273             }
274 
275             spdySession.receivedReply(streamId);
276 
277             
278             if (spdySynReplyFrame.isLast()) {
279                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
280             }
281 
282         } else if (msg instanceof SpdyRstStreamFrame) {
283 
284             
285 
286 
287 
288 
289 
290 
291 
292 
293             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
294             removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
295 
296         } else if (msg instanceof SpdySettingsFrame) {
297 
298             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
299 
300             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
301             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
302                 
303                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
304                 return;
305             }
306 
307             int newConcurrentStreams =
308                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
309             if (newConcurrentStreams >= 0) {
310                 remoteConcurrentStreams = newConcurrentStreams;
311             }
312 
313             
314             
315             
316             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
317                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
318             }
319             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
320 
321             int newInitialWindowSize =
322                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
323             if (newInitialWindowSize >= 0) {
324                 updateInitialSendWindowSize(newInitialWindowSize);
325             }
326 
327         } else if (msg instanceof SpdyPingFrame) {
328 
329             
330 
331 
332 
333 
334 
335 
336 
337 
338             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
339 
340             if (isRemoteInitiatedId(spdyPingFrame.id())) {
341                 ctx.writeAndFlush(spdyPingFrame);
342                 return;
343             }
344 
345             
346             if (pings.get() == 0) {
347                 return;
348             }
349             pings.getAndDecrement();
350 
351         } else if (msg instanceof SpdyGoAwayFrame) {
352 
353             receivedGoAwayFrame = true;
354 
355         } else if (msg instanceof SpdyHeadersFrame) {
356 
357             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
358             int streamId = spdyHeadersFrame.streamId();
359 
360             
361             if (spdyHeadersFrame.isInvalid()) {
362                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
363                 return;
364             }
365 
366             if (spdySession.isRemoteSideClosed(streamId)) {
367                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
368                 return;
369             }
370 
371             
372             if (spdyHeadersFrame.isLast()) {
373                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
374             }
375 
376         } else if (msg instanceof SpdyWindowUpdateFrame) {
377 
378             
379 
380 
381 
382 
383 
384 
385 
386 
387 
388             SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
389             int streamId = spdyWindowUpdateFrame.streamId();
390             int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
391 
392             
393             if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
394                 return;
395             }
396 
397             
398             if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
399                 if (streamId == SPDY_SESSION_STREAM_ID) {
400                     issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
401                 } else {
402                     issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
403                 }
404                 return;
405             }
406 
407             updateSendWindowSize(ctx, streamId, deltaWindowSize);
408         }
409 
410         ctx.fireChannelRead(msg);
411     }
412 
413     @Override
414     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
415         for (Integer streamId: spdySession.activeStreams().keySet()) {
416             removeStream(streamId, ctx.newSucceededFuture());
417         }
418         ctx.fireChannelInactive();
419     }
420 
421     @Override
422     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
423         if (cause instanceof SpdyProtocolException) {
424             issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
425         }
426 
427         ctx.fireExceptionCaught(cause);
428     }
429 
430     @Override
431     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
432         sendGoAwayFrame(ctx, promise);
433     }
434 
435     @Override
436     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
437         if (msg instanceof SpdyDataFrame ||
438             msg instanceof SpdySynStreamFrame ||
439             msg instanceof SpdySynReplyFrame ||
440             msg instanceof SpdyRstStreamFrame ||
441             msg instanceof SpdySettingsFrame ||
442             msg instanceof SpdyPingFrame ||
443             msg instanceof SpdyGoAwayFrame ||
444             msg instanceof SpdyHeadersFrame ||
445             msg instanceof SpdyWindowUpdateFrame) {
446 
447             handleOutboundMessage(ctx, msg, promise);
448         } else {
449             ctx.write(msg, promise);
450         }
451     }
452 
453     private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
454         if (msg instanceof SpdyDataFrame) {
455 
456             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
457             int streamId = spdyDataFrame.streamId();
458 
459             
460             if (spdySession.isLocalSideClosed(streamId)) {
461                 spdyDataFrame.release();
462                 promise.setFailure(PROTOCOL_EXCEPTION);
463                 return;
464             }
465 
466             
467 
468 
469 
470 
471 
472 
473 
474 
475 
476 
477 
478 
479             int dataLength = spdyDataFrame.content().readableBytes();
480             int sendWindowSize = spdySession.getSendWindowSize(streamId);
481             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
482             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
483 
484             if (sendWindowSize <= 0) {
485                 
486                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
487                 return;
488             } else if (sendWindowSize < dataLength) {
489                 
490                 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
491                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
492 
493                 
494                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
495                         streamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
496 
497                 
498                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
499 
500                 
501                 
502                 final ChannelHandlerContext context = ctx;
503                 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
504                     @Override
505                     public void operationComplete(ChannelFuture future) throws Exception {
506                         if (!future.isSuccess()) {
507                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
508                         }
509                     }
510                 });
511                 return;
512             } else {
513                 
514                 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
515                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
516 
517                 
518                 
519                 final ChannelHandlerContext context = ctx;
520                 promise.addListener(new ChannelFutureListener() {
521                     @Override
522                     public void operationComplete(ChannelFuture future) throws Exception {
523                         if (!future.isSuccess()) {
524                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
525                         }
526                     }
527                 });
528             }
529 
530             
531             if (spdyDataFrame.isLast()) {
532                 halfCloseStream(streamId, false, promise);
533             }
534 
535         } else if (msg instanceof SpdySynStreamFrame) {
536 
537             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
538             int streamId = spdySynStreamFrame.streamId();
539 
540             if (isRemoteInitiatedId(streamId)) {
541                 promise.setFailure(PROTOCOL_EXCEPTION);
542                 return;
543             }
544 
545             byte priority = spdySynStreamFrame.priority();
546             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
547             boolean localSideClosed = spdySynStreamFrame.isLast();
548             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
549                 promise.setFailure(PROTOCOL_EXCEPTION);
550                 return;
551             }
552 
553         } else if (msg instanceof SpdySynReplyFrame) {
554 
555             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
556             int streamId = spdySynReplyFrame.streamId();
557 
558             
559             if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
560                 promise.setFailure(PROTOCOL_EXCEPTION);
561                 return;
562             }
563 
564             
565             if (spdySynReplyFrame.isLast()) {
566                 halfCloseStream(streamId, false, promise);
567             }
568 
569         } else if (msg instanceof SpdyRstStreamFrame) {
570 
571             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
572             removeStream(spdyRstStreamFrame.streamId(), promise);
573 
574         } else if (msg instanceof SpdySettingsFrame) {
575 
576             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
577 
578             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
579             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
580                 
581                 promise.setFailure(PROTOCOL_EXCEPTION);
582                 return;
583             }
584 
585             int newConcurrentStreams =
586                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
587             if (newConcurrentStreams >= 0) {
588                 localConcurrentStreams = newConcurrentStreams;
589             }
590 
591             
592             
593             
594             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
595                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
596             }
597             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
598 
599             int newInitialWindowSize =
600                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
601             if (newInitialWindowSize >= 0) {
602                 updateInitialReceiveWindowSize(newInitialWindowSize);
603             }
604 
605         } else if (msg instanceof SpdyPingFrame) {
606 
607             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
608             if (isRemoteInitiatedId(spdyPingFrame.id())) {
609                 ctx.fireExceptionCaught(new IllegalArgumentException(
610                             "invalid PING ID: " + spdyPingFrame.id()));
611                 return;
612             }
613             pings.getAndIncrement();
614 
615         } else if (msg instanceof SpdyGoAwayFrame) {
616 
617             
618             
619             promise.setFailure(PROTOCOL_EXCEPTION);
620             return;
621 
622         } else if (msg instanceof SpdyHeadersFrame) {
623 
624             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
625             int streamId = spdyHeadersFrame.streamId();
626 
627             
628             if (spdySession.isLocalSideClosed(streamId)) {
629                 promise.setFailure(PROTOCOL_EXCEPTION);
630                 return;
631             }
632 
633             
634             if (spdyHeadersFrame.isLast()) {
635                 halfCloseStream(streamId, false, promise);
636             }
637 
638         } else if (msg instanceof SpdyWindowUpdateFrame) {
639 
640             
641             promise.setFailure(PROTOCOL_EXCEPTION);
642             return;
643         }
644 
645         ctx.write(msg, promise);
646     }
647 
648     
649 
650 
651 
652 
653 
654 
655 
656 
657     private void issueSessionError(
658             ChannelHandlerContext ctx, SpdySessionStatus status) {
659 
660         sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
661     }
662 
663     
664 
665 
666 
667 
668 
669 
670 
671 
672 
673 
674     private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
675         boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
676         ChannelPromise promise = ctx.newPromise();
677         removeStream(streamId, promise);
678 
679         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
680         ctx.writeAndFlush(spdyRstStreamFrame, promise);
681         if (fireChannelRead) {
682             ctx.fireChannelRead(spdyRstStreamFrame);
683         }
684     }
685 
686     
687 
688 
689 
690     private boolean isRemoteInitiatedId(int id) {
691         boolean serverId = isServerId(id);
692         return server && !serverId || !server && serverId;
693     }
694 
695     
696     private void updateInitialSendWindowSize(int newInitialWindowSize) {
697         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
698         initialSendWindowSize = newInitialWindowSize;
699         spdySession.updateAllSendWindowSizes(deltaWindowSize);
700     }
701 
702     
703     private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
704         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
705         initialReceiveWindowSize = newInitialWindowSize;
706         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
707     }
708 
709     
710     private boolean acceptStream(
711             int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
712         
713         if (receivedGoAwayFrame || sentGoAwayFrame) {
714             return false;
715         }
716 
717         boolean remote = isRemoteInitiatedId(streamId);
718         int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
719         if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
720             return false;
721         }
722         spdySession.acceptStream(
723                 streamId, priority, remoteSideClosed, localSideClosed,
724                 initialSendWindowSize, initialReceiveWindowSize, remote);
725         if (remote) {
726             lastGoodStreamId = streamId;
727         }
728         return true;
729     }
730 
731     private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
732         if (remote) {
733             spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
734         } else {
735             spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
736         }
737         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
738             future.addListener(closeSessionFutureListener);
739         }
740     }
741 
742     private void removeStream(int streamId, ChannelFuture future) {
743         spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
744 
745         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
746             future.addListener(closeSessionFutureListener);
747         }
748     }
749 
750     private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
751         spdySession.updateSendWindowSize(streamId, deltaWindowSize);
752 
753         while (true) {
754             
755             SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
756             if (pendingWrite == null) {
757                 return;
758             }
759 
760             SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
761             int dataFrameSize = spdyDataFrame.content().readableBytes();
762             int writeStreamId = spdyDataFrame.streamId();
763             int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
764             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
765             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
766 
767             if (sendWindowSize <= 0) {
768                 return;
769             } else if (sendWindowSize < dataFrameSize) {
770                 
771                 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
772                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
773 
774                 
775                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
776                         writeStreamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
777 
778                 
779                 
780                 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
781                     @Override
782                     public void operationComplete(ChannelFuture future) throws Exception {
783                         if (!future.isSuccess()) {
784                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
785                         }
786                     }
787                 });
788             } else {
789                 
790                 spdySession.removePendingWrite(writeStreamId);
791                 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
792                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
793 
794                 
795                 if (spdyDataFrame.isLast()) {
796                     halfCloseStream(writeStreamId, false, pendingWrite.promise);
797                 }
798 
799                 
800                 
801                 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
802                     @Override
803                     public void operationComplete(ChannelFuture future) throws Exception {
804                         if (!future.isSuccess()) {
805                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
806                         }
807                     }
808                 });
809             }
810         }
811     }
812 
813     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
814         
815         if (!ctx.channel().isActive()) {
816             ctx.close(future);
817             return;
818         }
819 
820         ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
821         if (spdySession.noActiveStreams()) {
822             f.addListener(new ClosingChannelFutureListener(ctx, future));
823         } else {
824             closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
825         }
826         
827     }
828 
829     private ChannelFuture sendGoAwayFrame(
830             ChannelHandlerContext ctx, SpdySessionStatus status) {
831         if (!sentGoAwayFrame) {
832             sentGoAwayFrame = true;
833             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
834             return ctx.writeAndFlush(spdyGoAwayFrame);
835         } else {
836             return ctx.newSucceededFuture();
837         }
838     }
839 
840     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
841         private final ChannelHandlerContext ctx;
842         private final ChannelPromise promise;
843 
844         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
845             this.ctx = ctx;
846             this.promise = promise;
847         }
848 
849         @Override
850         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
851             ctx.close(promise);
852         }
853     }
854 }