1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelProgressivePromise;
31 import io.netty.channel.ChannelPromise;
32 import io.netty.channel.DefaultChannelConfig;
33 import io.netty.channel.DefaultChannelPipeline;
34 import io.netty.channel.EventLoop;
35 import io.netty.channel.MessageSizeEstimator;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.VoidChannelPromise;
38 import io.netty.channel.WriteBufferWaterMark;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.ChannelOutputShutdownEvent;
41 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
42 import io.netty.handler.ssl.SslCloseCompletionEvent;
43 import io.netty.util.DefaultAttributeMap;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.internal.ObjectUtil;
46 import io.netty.util.internal.StringUtil;
47 import io.netty.util.internal.logging.InternalLogger;
48 import io.netty.util.internal.logging.InternalLoggerFactory;
49
50 import java.io.IOException;
51 import java.net.SocketAddress;
52 import java.nio.channels.ClosedChannelException;
53 import java.util.ArrayDeque;
54 import java.util.Map;
55 import java.util.Queue;
56 import java.util.concurrent.RejectedExecutionException;
57 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
58 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
59
60 import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
61 import static io.netty.util.internal.ObjectUtil.checkNotNull;
62 import static java.lang.Math.min;
63
64 abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
65
66 static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
67 @Override
68 public boolean visit(Http2FrameStream stream) {
69 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
70 ((DefaultHttp2FrameStream) stream).attachment;
71 childChannel.trySetWritable();
72 return true;
73 }
74 };
75
76 static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
77 new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
78
79 static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
80 new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
81
82 static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
83 new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
84
85 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
86
87 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
88
89
90
91
92
93 private static final int MIN_HTTP2_FRAME_SIZE = 9;
94
95
96
97
98 private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
99
100 private final Object event;
101
102 UserEventStreamVisitor(Object event) {
103 this.event = checkNotNull(event, "event");
104 }
105
106 @Override
107 public boolean visit(Http2FrameStream stream) {
108 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
109 ((DefaultHttp2FrameStream) stream).attachment;
110 childChannel.pipeline().fireUserEventTriggered(event);
111 return true;
112 }
113 }
114
115
116
117
118 private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
119
120 static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
121
122 private static final Handle HANDLE_INSTANCE = new Handle() {
123 @Override
124 public int size(Object msg) {
125 return msg instanceof Http2DataFrame ?
126
127 (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
128 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
129 }
130 };
131
132 @Override
133 public Handle newHandle() {
134 return HANDLE_INSTANCE;
135 }
136 }
137
138 private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
139 AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
140
141 private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
142 AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
143
144 private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
145 Throwable cause = future.cause();
146 if (cause != null) {
147 Throwable unwrappedCause;
148
149 if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
150 cause = unwrappedCause;
151 }
152
153
154 streamChannel.pipeline().fireExceptionCaught(cause);
155 streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
156 }
157 }
158
159 private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
160 @Override
161 public void operationComplete(ChannelFuture future) {
162 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
163 }
164 };
165
166
167
168
169 private enum ReadStatus {
170
171
172
173 IDLE,
174
175
176
177
178 IN_PROGRESS,
179
180
181
182
183 REQUESTED
184 }
185
186 private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
187 private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
188 private final ChannelId channelId;
189 private final ChannelPipeline pipeline;
190 private final DefaultHttp2FrameStream stream;
191 private final ChannelPromise closePromise;
192
193 private volatile boolean registered;
194
195 private volatile long totalPendingSize;
196 private volatile int unwritable;
197
198
199 private Runnable fireChannelWritabilityChangedTask;
200
201 private boolean outboundClosed;
202 private int flowControlledBytes;
203
204
205
206
207
208
209
210 private ReadStatus readStatus = ReadStatus.IDLE;
211
212 private Queue<Object> inboundBuffer;
213
214
215 private boolean firstFrameWritten;
216 private boolean readCompletePending;
217
218 AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
219 this.stream = stream;
220 stream.attachment = this;
221 pipeline = new DefaultChannelPipeline(this) {
222 @Override
223 protected void incrementPendingOutboundBytes(long size) {
224 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
225 }
226
227 @Override
228 protected void decrementPendingOutboundBytes(long size) {
229 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
230 }
231
232 @Override
233 protected void onUnhandledInboundException(Throwable cause) {
234
235 if (cause instanceof Http2FrameStreamException) {
236 closeWithError(((Http2FrameStreamException) cause).error());
237 return;
238 } else {
239 Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
240 if (exception != null) {
241 closeWithError(exception.error());
242 return;
243 }
244 }
245 super.onUnhandledInboundException(cause);
246 }
247 };
248
249 closePromise = pipeline.newPromise();
250 channelId = new Http2StreamChannelId(parent().id(), id);
251
252 if (inboundHandler != null) {
253
254 pipeline.addLast(inboundHandler);
255 }
256 }
257
258 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
259 if (size == 0) {
260 return;
261 }
262
263 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
264 if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
265 setUnwritable(invokeLater);
266 }
267 }
268
269 private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
270 if (size == 0) {
271 return;
272 }
273
274 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
275
276
277
278
279
280 if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
281 setWritable(invokeLater);
282 }
283 }
284
285 final void trySetWritable() {
286
287
288
289
290 if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
291 setWritable(false);
292 }
293 }
294
295 private void setWritable(boolean invokeLater) {
296 for (;;) {
297 final int oldValue = unwritable;
298 final int newValue = oldValue & ~1;
299 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
300 if (oldValue != 0 && newValue == 0) {
301 fireChannelWritabilityChanged(invokeLater);
302 }
303 break;
304 }
305 }
306 }
307
308 private void setUnwritable(boolean invokeLater) {
309 for (;;) {
310 final int oldValue = unwritable;
311 final int newValue = oldValue | 1;
312 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
313 if (oldValue == 0) {
314 fireChannelWritabilityChanged(invokeLater);
315 }
316 break;
317 }
318 }
319 }
320
321 private void fireChannelWritabilityChanged(boolean invokeLater) {
322 final ChannelPipeline pipeline = pipeline();
323 if (invokeLater) {
324 Runnable task = fireChannelWritabilityChangedTask;
325 if (task == null) {
326 fireChannelWritabilityChangedTask = task = new Runnable() {
327 @Override
328 public void run() {
329 pipeline.fireChannelWritabilityChanged();
330 }
331 };
332 }
333 eventLoop().execute(task);
334 } else {
335 pipeline.fireChannelWritabilityChanged();
336 }
337 }
338 @Override
339 public Http2FrameStream stream() {
340 return stream;
341 }
342
343 void closeOutbound() {
344 outboundClosed = true;
345 }
346
347 void streamClosed() {
348 unsafe.readEOS();
349
350
351 unsafe.doBeginRead();
352 }
353
354 @Override
355 public ChannelMetadata metadata() {
356 return METADATA;
357 }
358
359 @Override
360 public ChannelConfig config() {
361 return config;
362 }
363
364 @Override
365 public boolean isOpen() {
366 return !closePromise.isDone();
367 }
368
369 @Override
370 public boolean isActive() {
371 return isOpen();
372 }
373
374 @Override
375 public boolean isWritable() {
376 return unwritable == 0;
377 }
378
379 @Override
380 public ChannelId id() {
381 return channelId;
382 }
383
384 @Override
385 public EventLoop eventLoop() {
386 return parent().eventLoop();
387 }
388
389 @Override
390 public Channel parent() {
391 return parentContext().channel();
392 }
393
394 @Override
395 public boolean isRegistered() {
396 return registered;
397 }
398
399 @Override
400 public SocketAddress localAddress() {
401 return parent().localAddress();
402 }
403
404 @Override
405 public SocketAddress remoteAddress() {
406 return parent().remoteAddress();
407 }
408
409 @Override
410 public ChannelFuture closeFuture() {
411 return closePromise;
412 }
413
414 @Override
415 public long bytesBeforeUnwritable() {
416
417 long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
418
419
420
421 return bytes > 0 && isWritable() ? bytes : 0;
422 }
423
424 @Override
425 public long bytesBeforeWritable() {
426
427 long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
428
429
430
431 return bytes <= 0 || isWritable() ? 0 : bytes;
432 }
433
434 @Override
435 public Unsafe unsafe() {
436 return unsafe;
437 }
438
439 @Override
440 public ChannelPipeline pipeline() {
441 return pipeline;
442 }
443
444 @Override
445 public ByteBufAllocator alloc() {
446 return config().getAllocator();
447 }
448
449 @Override
450 public Channel read() {
451 pipeline().read();
452 return this;
453 }
454
455 @Override
456 public Channel flush() {
457 pipeline().flush();
458 return this;
459 }
460
461 @Override
462 public ChannelFuture bind(SocketAddress localAddress) {
463 return pipeline().bind(localAddress);
464 }
465
466 @Override
467 public ChannelFuture connect(SocketAddress remoteAddress) {
468 return pipeline().connect(remoteAddress);
469 }
470
471 @Override
472 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
473 return pipeline().connect(remoteAddress, localAddress);
474 }
475
476 @Override
477 public ChannelFuture disconnect() {
478 return pipeline().disconnect();
479 }
480
481 @Override
482 public ChannelFuture close() {
483 return pipeline().close();
484 }
485
486 @Override
487 public ChannelFuture deregister() {
488 return pipeline().deregister();
489 }
490
491 @Override
492 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
493 return pipeline().bind(localAddress, promise);
494 }
495
496 @Override
497 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
498 return pipeline().connect(remoteAddress, promise);
499 }
500
501 @Override
502 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
503 return pipeline().connect(remoteAddress, localAddress, promise);
504 }
505
506 @Override
507 public ChannelFuture disconnect(ChannelPromise promise) {
508 return pipeline().disconnect(promise);
509 }
510
511 @Override
512 public ChannelFuture close(ChannelPromise promise) {
513 return pipeline().close(promise);
514 }
515
516 @Override
517 public ChannelFuture deregister(ChannelPromise promise) {
518 return pipeline().deregister(promise);
519 }
520
521 @Override
522 public ChannelFuture write(Object msg) {
523 return pipeline().write(msg);
524 }
525
526 @Override
527 public ChannelFuture write(Object msg, ChannelPromise promise) {
528 return pipeline().write(msg, promise);
529 }
530
531 @Override
532 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
533 return pipeline().writeAndFlush(msg, promise);
534 }
535
536 @Override
537 public ChannelFuture writeAndFlush(Object msg) {
538 return pipeline().writeAndFlush(msg);
539 }
540
541 @Override
542 public ChannelPromise newPromise() {
543 return pipeline().newPromise();
544 }
545
546 @Override
547 public ChannelProgressivePromise newProgressivePromise() {
548 return pipeline().newProgressivePromise();
549 }
550
551 @Override
552 public ChannelFuture newSucceededFuture() {
553 return pipeline().newSucceededFuture();
554 }
555
556 @Override
557 public ChannelFuture newFailedFuture(Throwable cause) {
558 return pipeline().newFailedFuture(cause);
559 }
560
561 @Override
562 public ChannelPromise voidPromise() {
563 return pipeline().voidPromise();
564 }
565
566 @Override
567 public int hashCode() {
568 return id().hashCode();
569 }
570
571 @Override
572 public boolean equals(Object o) {
573 return this == o;
574 }
575
576 @Override
577 public int compareTo(Channel o) {
578 if (this == o) {
579 return 0;
580 }
581
582 return id().compareTo(o.id());
583 }
584
585 @Override
586 public String toString() {
587 return parent().toString() + "(H2 - " + stream + ')';
588 }
589
590
591
592
593
594 void fireChildRead(Http2Frame frame) {
595 assert eventLoop().inEventLoop();
596 if (!isActive()) {
597 ReferenceCountUtil.release(frame);
598 } else if (readStatus != ReadStatus.IDLE) {
599
600
601 assert inboundBuffer == null || inboundBuffer.isEmpty();
602 final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
603
604 unsafe.doRead0(frame, allocHandle);
605
606
607
608
609 if (allocHandle.continueReading()) {
610 maybeAddChannelToReadCompletePendingQueue();
611 } else {
612 unsafe.notifyReadComplete(allocHandle, true, false);
613 }
614 } else {
615 if (inboundBuffer == null) {
616 inboundBuffer = new ArrayDeque<Object>(4);
617 }
618 inboundBuffer.add(frame);
619 }
620 }
621
622 void fireChildReadComplete() {
623 assert eventLoop().inEventLoop();
624 assert readStatus != ReadStatus.IDLE || !readCompletePending;
625 unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
626 }
627
628 final void closeWithError(Http2Error error) {
629 assert eventLoop().inEventLoop();
630 unsafe.close(unsafe.voidPromise(), error);
631 }
632
633 private final class Http2ChannelUnsafe implements Unsafe {
634 private final VoidChannelPromise unsafeVoidPromise =
635 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
636 @SuppressWarnings("deprecation")
637 private RecvByteBufAllocator.Handle recvHandle;
638 private boolean writeDoneAndNoFlush;
639 private boolean closeInitiated;
640 private boolean readEOS;
641
642 private boolean receivedEndOfStream;
643 private boolean sentEndOfStream;
644
645 @Override
646 public void connect(final SocketAddress remoteAddress,
647 SocketAddress localAddress, final ChannelPromise promise) {
648 if (!promise.setUncancellable()) {
649 return;
650 }
651 promise.setFailure(new UnsupportedOperationException());
652 }
653
654 @Override
655 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
656 if (recvHandle == null) {
657 recvHandle = config().getRecvByteBufAllocator().newHandle();
658 recvHandle.reset(config());
659 }
660 return recvHandle;
661 }
662
663 @Override
664 public SocketAddress localAddress() {
665 return parent().unsafe().localAddress();
666 }
667
668 @Override
669 public SocketAddress remoteAddress() {
670 return parent().unsafe().remoteAddress();
671 }
672
673 @Override
674 public void register(EventLoop eventLoop, ChannelPromise promise) {
675 if (!promise.setUncancellable()) {
676 return;
677 }
678 if (registered) {
679 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
680 return;
681 }
682
683 registered = true;
684
685 promise.setSuccess();
686
687 pipeline().fireChannelRegistered();
688 if (isActive()) {
689 pipeline().fireChannelActive();
690 }
691 }
692
693 @Override
694 public void bind(SocketAddress localAddress, ChannelPromise promise) {
695 if (!promise.setUncancellable()) {
696 return;
697 }
698 promise.setFailure(new UnsupportedOperationException());
699 }
700
701 @Override
702 public void disconnect(ChannelPromise promise) {
703 close(promise);
704 }
705
706 @Override
707 public void close(final ChannelPromise promise) {
708 close(promise, Http2Error.CANCEL);
709 }
710
711 void close(final ChannelPromise promise, Http2Error error) {
712 if (!promise.setUncancellable()) {
713 return;
714 }
715 if (closeInitiated) {
716 if (closePromise.isDone()) {
717
718 promise.setSuccess();
719 } else if (!(promise instanceof VoidChannelPromise)) {
720
721 closePromise.addListener(new ChannelFutureListener() {
722 @Override
723 public void operationComplete(ChannelFuture future) {
724 promise.setSuccess();
725 }
726 });
727 }
728 return;
729 }
730 closeInitiated = true;
731
732 readCompletePending = false;
733
734 final boolean wasActive = isActive();
735
736
737
738
739
740
741 if (parent().isActive() && isStreamIdValid(stream.id()) &&
742
743 !readEOS && !(receivedEndOfStream && sentEndOfStream)) {
744 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
745 write(resetFrame, unsafe().voidPromise());
746 flush();
747 }
748
749 if (inboundBuffer != null) {
750 for (;;) {
751 Object msg = inboundBuffer.poll();
752 if (msg == null) {
753 break;
754 }
755 ReferenceCountUtil.release(msg);
756 }
757 inboundBuffer = null;
758 }
759
760
761 outboundClosed = true;
762 closePromise.setSuccess();
763 promise.setSuccess();
764
765 fireChannelInactiveAndDeregister(voidPromise(), wasActive);
766 }
767
768 @Override
769 public void closeForcibly() {
770 close(unsafe().voidPromise());
771 }
772
773 @Override
774 public void deregister(ChannelPromise promise) {
775 fireChannelInactiveAndDeregister(promise, false);
776 }
777
778 private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
779 final boolean fireChannelInactive) {
780 if (!promise.setUncancellable()) {
781 return;
782 }
783
784 if (!registered) {
785 promise.setSuccess();
786 return;
787 }
788
789
790
791
792
793
794
795
796 invokeLater(promise.channel(), new Runnable() {
797 @Override
798 public void run() {
799 if (fireChannelInactive) {
800 pipeline.fireChannelInactive();
801 }
802
803
804 if (registered) {
805 registered = false;
806 pipeline.fireChannelUnregistered();
807 }
808 safeSetSuccess(promise);
809 }
810 });
811 }
812
813 private void safeSetSuccess(ChannelPromise promise) {
814 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
815 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
816 promise.channel(), promise);
817 }
818 }
819
820 private void invokeLater(Channel channel, Runnable task) {
821 try {
822
823
824
825
826
827
828
829
830
831
832
833 eventLoop().execute(task);
834 } catch (RejectedExecutionException e) {
835 logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
836 }
837 }
838
839 @Override
840 public void beginRead() {
841 if (!isActive()) {
842 return;
843 }
844 updateLocalWindowIfNeeded();
845
846 switch (readStatus) {
847 case IDLE:
848 readStatus = ReadStatus.IN_PROGRESS;
849 doBeginRead();
850 break;
851 case IN_PROGRESS:
852 readStatus = ReadStatus.REQUESTED;
853 break;
854 default:
855 break;
856 }
857 }
858
859 private Object pollQueuedMessage() {
860 return inboundBuffer == null ? null : inboundBuffer.poll();
861 }
862
863 void doBeginRead() {
864 if (readStatus == ReadStatus.IDLE) {
865
866 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
867
868 flush();
869 unsafe.closeForcibly();
870 }
871 } else {
872 do {
873 Object message = pollQueuedMessage();
874 if (message == null) {
875
876 flush();
877 if (readEOS) {
878 unsafe.closeForcibly();
879 }
880 break;
881 }
882 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
883 allocHandle.reset(config());
884 boolean continueReading = false;
885 do {
886 doRead0((Http2Frame) message, allocHandle);
887 } while ((readEOS || (continueReading = allocHandle.continueReading()))
888 && (message = pollQueuedMessage()) != null);
889
890 if (continueReading && isParentReadInProgress() && !readEOS) {
891
892
893
894
895 maybeAddChannelToReadCompletePendingQueue();
896 } else {
897 notifyReadComplete(allocHandle, true, true);
898
899
900
901
902 resetReadStatus();
903 }
904 } while (readStatus != ReadStatus.IDLE);
905 }
906 }
907
908 void readEOS() {
909 readEOS = true;
910 }
911
912 private boolean updateLocalWindowIfNeeded() {
913 if (flowControlledBytes != 0 && !parentContext().isRemoved() && config.autoStreamFlowControl) {
914 int bytes = flowControlledBytes;
915 flowControlledBytes = 0;
916 writeWindowUpdateFrame(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
917 return true;
918 }
919 return false;
920 }
921
922 void updateLocalWindowIfNeededAndFlush() {
923 if (updateLocalWindowIfNeeded()) {
924 flush();
925 }
926 }
927
928 private void resetReadStatus() {
929 readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
930 }
931
932 void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
933 boolean inReadLoop) {
934 if (!readCompletePending && !forceReadComplete) {
935 return;
936 }
937
938 readCompletePending = false;
939
940 if (!inReadLoop) {
941
942 resetReadStatus();
943 }
944
945 allocHandle.readComplete();
946 pipeline().fireChannelReadComplete();
947
948
949
950 flush();
951 if (readEOS) {
952 unsafe.closeForcibly();
953 }
954 }
955
956 @SuppressWarnings("deprecation")
957 void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
958 final int bytes;
959 if (frame instanceof Http2DataFrame) {
960 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
961
962
963
964
965
966 flowControlledBytes += bytes;
967 } else {
968 bytes = MIN_HTTP2_FRAME_SIZE;
969 }
970
971
972
973 receivedEndOfStream |= isEndOfStream(frame);
974
975
976 allocHandle.attemptedBytesRead(bytes);
977 allocHandle.lastBytesRead(bytes);
978 allocHandle.incMessagesRead(1);
979
980 pipeline().fireChannelRead(frame);
981 }
982
983 private ChannelFuture writeWindowUpdateFrame(Http2WindowUpdateFrame windowUpdateFrame) {
984 ChannelFuture future = write0(parentContext(), windowUpdateFrame);
985
986
987
988 writeDoneAndNoFlush = true;
989
990
991
992
993 if (future.isDone()) {
994 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
995 } else {
996 future.addListener(windowUpdateFrameWriteListener);
997 }
998 return future;
999 }
1000
1001 @Override
1002 public void write(Object msg, final ChannelPromise promise) {
1003
1004 if (!promise.setUncancellable()) {
1005 ReferenceCountUtil.release(msg);
1006 return;
1007 }
1008
1009 if (!isActive() ||
1010
1011 outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1012 ReferenceCountUtil.release(msg);
1013 promise.setFailure(new ClosedChannelException());
1014 return;
1015 }
1016
1017 try {
1018 if (msg instanceof Http2StreamFrame) {
1019 Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1020 if (msg instanceof Http2WindowUpdateFrame) {
1021 Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame) msg;
1022 if (config.autoStreamFlowControl) {
1023 ReferenceCountUtil.release(msg);
1024 promise.setFailure(new UnsupportedOperationException(
1025 Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL + " is set to false"));
1026 return;
1027 }
1028 try {
1029 ObjectUtil.checkInRange(updateFrame.windowSizeIncrement(), 0,
1030 flowControlledBytes, "windowSizeIncrement");
1031 } catch (RuntimeException e) {
1032 ReferenceCountUtil.release(updateFrame);
1033 promise.setFailure(e);
1034 return;
1035 }
1036 flowControlledBytes -= updateFrame.windowSizeIncrement();
1037 if (parentContext().isRemoved()) {
1038 ReferenceCountUtil.release(msg);
1039 promise.setFailure(new ClosedChannelException());
1040 return;
1041 }
1042 ChannelFuture f = writeWindowUpdateFrame(updateFrame);
1043 if (f.isDone()) {
1044 writeComplete(f, promise);
1045 } else {
1046 f.addListener(new ChannelFutureListener() {
1047 @Override
1048 public void operationComplete(ChannelFuture future) {
1049 writeComplete(future, promise);
1050 }
1051 });
1052 }
1053 } else {
1054 writeHttp2StreamFrame(frame, promise);
1055 }
1056 } else {
1057 String msgStr = msg.toString();
1058 ReferenceCountUtil.release(msg);
1059 promise.setFailure(new IllegalArgumentException(
1060 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1061 ": " + msgStr));
1062 }
1063 } catch (Throwable t) {
1064 promise.tryFailure(t);
1065 }
1066 }
1067
1068 private boolean isEndOfStream(Http2Frame frame) {
1069 if (frame instanceof Http2HeadersFrame) {
1070 return ((Http2HeadersFrame) frame).isEndStream();
1071 }
1072 if (frame instanceof Http2DataFrame) {
1073 return ((Http2DataFrame) frame).isEndStream();
1074 }
1075 return false;
1076 }
1077
1078 private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1079 if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1080 ReferenceCountUtil.release(frame);
1081 promise.setFailure(
1082 new IllegalArgumentException("The first frame must be a headers frame. Was: "
1083 + frame.name()));
1084 return;
1085 }
1086
1087 final boolean firstWrite;
1088 if (firstFrameWritten) {
1089 firstWrite = false;
1090 } else {
1091 firstWrite = firstFrameWritten = true;
1092 }
1093
1094
1095
1096 sentEndOfStream |= isEndOfStream(frame);
1097 ChannelFuture f = write0(parentContext(), frame);
1098 if (f.isDone()) {
1099 if (firstWrite) {
1100 firstWriteComplete(f, promise);
1101 } else {
1102 writeComplete(f, promise);
1103 }
1104 } else {
1105 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1106 incrementPendingOutboundBytes(bytes, false);
1107 f.addListener(new ChannelFutureListener() {
1108 @Override
1109 public void operationComplete(ChannelFuture future) {
1110 if (firstWrite) {
1111 firstWriteComplete(future, promise);
1112 } else {
1113 writeComplete(future, promise);
1114 }
1115 decrementPendingOutboundBytes(bytes, false);
1116 }
1117 });
1118 writeDoneAndNoFlush = true;
1119 }
1120 }
1121
1122 private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1123 Throwable cause = future.cause();
1124 if (cause == null) {
1125 promise.setSuccess();
1126 } else {
1127
1128 closeForcibly();
1129 promise.setFailure(wrapStreamClosedError(cause));
1130 }
1131 }
1132
1133 private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1134 Throwable cause = future.cause();
1135 if (cause == null) {
1136 promise.setSuccess();
1137 } else {
1138 Throwable error = wrapStreamClosedError(cause);
1139
1140 if (error instanceof IOException) {
1141 if (config.isAutoClose()) {
1142
1143 closeForcibly();
1144 } else {
1145
1146 outboundClosed = true;
1147 }
1148 }
1149 promise.setFailure(error);
1150 }
1151 }
1152
1153 private Throwable wrapStreamClosedError(Throwable cause) {
1154
1155
1156 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1157 return new ClosedChannelException().initCause(cause);
1158 }
1159 return cause;
1160 }
1161
1162 private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1163 if (frame.stream() != null && frame.stream() != stream) {
1164 String msgString = frame.toString();
1165 ReferenceCountUtil.release(frame);
1166 throw new IllegalArgumentException(
1167 "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1168 }
1169 return frame;
1170 }
1171
1172 @Override
1173 public void flush() {
1174
1175
1176
1177
1178 if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1179
1180 return;
1181 }
1182
1183
1184 writeDoneAndNoFlush = false;
1185 flush0(parentContext());
1186 }
1187
1188 @Override
1189 public ChannelPromise voidPromise() {
1190 return unsafeVoidPromise;
1191 }
1192
1193 @Override
1194 public ChannelOutboundBuffer outboundBuffer() {
1195
1196 return null;
1197 }
1198 }
1199
1200
1201
1202
1203
1204
1205 private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1206
1207 volatile boolean autoStreamFlowControl = true;
1208 Http2StreamChannelConfig(Channel channel) {
1209 super(channel);
1210 }
1211
1212 @Override
1213 public MessageSizeEstimator getMessageSizeEstimator() {
1214 return FlowControlledFrameSizeEstimator.INSTANCE;
1215 }
1216
1217 @Override
1218 public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1219 throw new UnsupportedOperationException();
1220 }
1221
1222 @Override
1223 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1224 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1225 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1226 RecvByteBufAllocator.ExtendedHandle.class);
1227 }
1228 super.setRecvByteBufAllocator(allocator);
1229 return this;
1230 }
1231
1232 @Override
1233 public Map<ChannelOption<?>, Object> getOptions() {
1234 return getOptions(
1235 super.getOptions(),
1236 Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL);
1237 }
1238
1239 @SuppressWarnings("unchecked")
1240 @Override
1241 public <T> T getOption(ChannelOption<T> option) {
1242 if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1243 return (T) Boolean.valueOf(autoStreamFlowControl);
1244 }
1245 return super.getOption(option);
1246 }
1247
1248 @Override
1249 public <T> boolean setOption(ChannelOption<T> option, T value) {
1250 validate(option, value);
1251 if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1252 boolean newValue = (Boolean) value;
1253 boolean changed = newValue && !autoStreamFlowControl;
1254 autoStreamFlowControl = (Boolean) value;
1255 if (changed) {
1256 if (channel.isRegistered()) {
1257 final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe) channel.unsafe();
1258 if (channel.eventLoop().inEventLoop()) {
1259 unsafe.updateLocalWindowIfNeededAndFlush();
1260 } else {
1261 channel.eventLoop().execute(new Runnable() {
1262 @Override
1263 public void run() {
1264 unsafe.updateLocalWindowIfNeededAndFlush();
1265 }
1266 });
1267 }
1268 }
1269 }
1270 return true;
1271 }
1272 return super.setOption(option, value);
1273 }
1274 }
1275
1276 private void maybeAddChannelToReadCompletePendingQueue() {
1277 if (!readCompletePending) {
1278 readCompletePending = true;
1279 addChannelToReadCompletePendingQueue();
1280 }
1281 }
1282
1283 protected void flush0(ChannelHandlerContext ctx) {
1284 ctx.flush();
1285 }
1286
1287 protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1288 ChannelPromise promise = ctx.newPromise();
1289 ctx.write(msg, promise);
1290 return promise;
1291 }
1292
1293 protected abstract boolean isParentReadInProgress();
1294 protected abstract void addChannelToReadCompletePendingQueue();
1295 protected abstract ChannelHandlerContext parentContext();
1296 }