1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelProgressivePromise;
31 import io.netty.channel.ChannelPromise;
32 import io.netty.channel.DefaultChannelConfig;
33 import io.netty.channel.DefaultChannelPipeline;
34 import io.netty.channel.EventLoop;
35 import io.netty.channel.MessageSizeEstimator;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.VoidChannelPromise;
38 import io.netty.channel.WriteBufferWaterMark;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.ChannelOutputShutdownEvent;
41 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
42 import io.netty.handler.ssl.SslCloseCompletionEvent;
43 import io.netty.util.DefaultAttributeMap;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.concurrent.Future;
46 import io.netty.util.internal.ObjectUtil;
47 import io.netty.util.internal.StringUtil;
48 import io.netty.util.internal.logging.InternalLogger;
49 import io.netty.util.internal.logging.InternalLoggerFactory;
50
51 import java.io.IOException;
52 import java.net.SocketAddress;
53 import java.nio.channels.ClosedChannelException;
54 import java.util.ArrayDeque;
55 import java.util.Map;
56 import java.util.Queue;
57 import java.util.concurrent.RejectedExecutionException;
58 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
59 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
60
61 import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
62 import static io.netty.util.internal.ObjectUtil.checkNotNull;
63 import static java.lang.Math.min;
64
65 abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
66
67 static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
68 @Override
69 public boolean visit(Http2FrameStream stream) {
70 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
71 ((DefaultHttp2FrameStream) stream).attachment;
72 childChannel.trySetWritable();
73 return true;
74 }
75 };
76
77 static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
78 new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
79
80 static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
81 new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
82
83 static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
84 new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
85
86 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
87
88 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
89
90
91
92
93
94 private static final int MIN_HTTP2_FRAME_SIZE = 9;
95
96
97
98
99 private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
100
101 private final Object event;
102
103 UserEventStreamVisitor(Object event) {
104 this.event = checkNotNull(event, "event");
105 }
106
107 @Override
108 public boolean visit(Http2FrameStream stream) {
109 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
110 ((DefaultHttp2FrameStream) stream).attachment;
111 childChannel.pipeline().fireUserEventTriggered(event);
112 return true;
113 }
114 }
115
116
117
118
119 private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
120
121 static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
122
123 private static final Handle HANDLE_INSTANCE = new Handle() {
124 @Override
125 public int size(Object msg) {
126 return msg instanceof Http2DataFrame ?
127
128 (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
129 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
130 }
131 };
132
133 @Override
134 public Handle newHandle() {
135 return HANDLE_INSTANCE;
136 }
137 }
138
139 private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
140 AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
141
142 private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
143 AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
144
145 private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
146 Throwable cause = future.cause();
147 if (cause != null) {
148 Throwable unwrappedCause;
149
150 if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
151 cause = unwrappedCause;
152 }
153
154
155 streamChannel.pipeline().fireExceptionCaught(cause);
156 streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
157 }
158 }
159
160 private final ChannelFutureListener windowUpdateFrameWriteListener = future ->
161 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
162
163
164
165
166 private enum ReadStatus {
167
168
169
170 IDLE,
171
172
173
174
175 IN_PROGRESS,
176
177
178
179
180 REQUESTED
181 }
182
183 private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
184 private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
185 private final Http2StreamChannelId channelId;
186 private final ChannelPipeline pipeline;
187 private final DefaultHttp2FrameStream stream;
188 private final ChannelPromise closePromise;
189
190 private volatile boolean registered;
191
192 private volatile long totalPendingSize;
193 private volatile int unwritable;
194
195
196 private Runnable fireChannelWritabilityChangedTask;
197
198 private boolean outboundClosed;
199 private int flowControlledBytes;
200
201
202
203
204
205
206
207 private ReadStatus readStatus = ReadStatus.IDLE;
208
209 private Queue<Object> inboundBuffer;
210
211
212 private boolean firstFrameWritten;
213 private boolean readCompletePending;
214
215 AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
216 this.stream = stream;
217 stream.attachment = this;
218 pipeline = new DefaultChannelPipeline(this) {
219 @Override
220 protected void incrementPendingOutboundBytes(long size) {
221 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
222 }
223
224 @Override
225 protected void decrementPendingOutboundBytes(long size) {
226 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
227 }
228
229 @Override
230 protected void onUnhandledInboundException(Throwable cause) {
231
232 if (cause instanceof Http2FrameStreamException) {
233 closeWithError(((Http2FrameStreamException) cause).error());
234 return;
235 } else {
236 Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
237 if (exception != null) {
238 closeWithError(exception.error());
239 return;
240 }
241 }
242 super.onUnhandledInboundException(cause);
243 }
244 };
245
246 closePromise = pipeline.newPromise();
247 channelId = new Http2StreamChannelId(parent().id(), id);
248
249 if (inboundHandler != null) {
250
251 pipeline.addLast(inboundHandler);
252 }
253 }
254
255 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
256 if (size == 0) {
257 return;
258 }
259
260 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
261 if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
262 setUnwritable(invokeLater);
263 }
264 }
265
266 private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
267 if (size == 0) {
268 return;
269 }
270
271 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
272
273
274
275
276
277 if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
278 setWritable(invokeLater);
279 }
280 }
281
282 final void trySetWritable() {
283
284
285
286
287 if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
288 setWritable(false);
289 }
290 }
291
292 private void setWritable(boolean invokeLater) {
293 for (;;) {
294 final int oldValue = unwritable;
295 final int newValue = oldValue & ~1;
296 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
297 if (oldValue != 0 && newValue == 0) {
298 fireChannelWritabilityChanged(invokeLater);
299 }
300 break;
301 }
302 }
303 }
304
305 private void setUnwritable(boolean invokeLater) {
306 for (;;) {
307 final int oldValue = unwritable;
308 final int newValue = oldValue | 1;
309 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
310 if (oldValue == 0) {
311 fireChannelWritabilityChanged(invokeLater);
312 }
313 break;
314 }
315 }
316 }
317
318 private void fireChannelWritabilityChanged(boolean invokeLater) {
319 final ChannelPipeline pipeline = pipeline();
320 if (invokeLater) {
321 Runnable task = fireChannelWritabilityChangedTask;
322 if (task == null) {
323 fireChannelWritabilityChangedTask = task = new Runnable() {
324 @Override
325 public void run() {
326 pipeline.fireChannelWritabilityChanged();
327 }
328 };
329 }
330 eventLoop().execute(task);
331 } else {
332 pipeline.fireChannelWritabilityChanged();
333 }
334 }
335 @Override
336 public Http2FrameStream stream() {
337 return stream;
338 }
339
340 void closeOutbound() {
341 outboundClosed = true;
342 }
343
344 void streamClosed() {
345 unsafe.readEOS();
346
347
348 unsafe.doBeginRead();
349 }
350
351 @Override
352 public ChannelMetadata metadata() {
353 return METADATA;
354 }
355
356 @Override
357 public ChannelConfig config() {
358 return config;
359 }
360
361 @Override
362 public boolean isOpen() {
363 return !closePromise.isDone();
364 }
365
366 @Override
367 public boolean isActive() {
368 return isOpen();
369 }
370
371 @Override
372 public boolean isWritable() {
373 return unwritable == 0;
374 }
375
376 @Override
377 public ChannelId id() {
378 return channelId;
379 }
380
381 @Override
382 public EventLoop eventLoop() {
383 return parent().eventLoop();
384 }
385
386 @Override
387 public Channel parent() {
388 return parentContext().channel();
389 }
390
391 @Override
392 public boolean isRegistered() {
393 return registered;
394 }
395
396 @Override
397 public SocketAddress localAddress() {
398 return parent().localAddress();
399 }
400
401 @Override
402 public SocketAddress remoteAddress() {
403 return parent().remoteAddress();
404 }
405
406 @Override
407 public ChannelFuture closeFuture() {
408 return closePromise;
409 }
410
411 @Override
412 public long bytesBeforeUnwritable() {
413
414 long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
415
416
417
418 return bytes > 0 && isWritable() ? bytes : 0;
419 }
420
421 @Override
422 public long bytesBeforeWritable() {
423
424 long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
425
426
427
428 return bytes <= 0 || isWritable() ? 0 : bytes;
429 }
430
431 @Override
432 public Unsafe unsafe() {
433 return unsafe;
434 }
435
436 @Override
437 public ChannelPipeline pipeline() {
438 return pipeline;
439 }
440
441 @Override
442 public ByteBufAllocator alloc() {
443 return config().getAllocator();
444 }
445
446 @Override
447 public Channel read() {
448 pipeline().read();
449 return this;
450 }
451
452 @Override
453 public Channel flush() {
454 pipeline().flush();
455 return this;
456 }
457
458 @Override
459 public ChannelFuture bind(SocketAddress localAddress) {
460 return pipeline().bind(localAddress);
461 }
462
463 @Override
464 public ChannelFuture connect(SocketAddress remoteAddress) {
465 return pipeline().connect(remoteAddress);
466 }
467
468 @Override
469 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
470 return pipeline().connect(remoteAddress, localAddress);
471 }
472
473 @Override
474 public ChannelFuture disconnect() {
475 return pipeline().disconnect();
476 }
477
478 @Override
479 public ChannelFuture close() {
480 return pipeline().close();
481 }
482
483 @Override
484 public ChannelFuture deregister() {
485 return pipeline().deregister();
486 }
487
488 @Override
489 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
490 return pipeline().bind(localAddress, promise);
491 }
492
493 @Override
494 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
495 return pipeline().connect(remoteAddress, promise);
496 }
497
498 @Override
499 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
500 return pipeline().connect(remoteAddress, localAddress, promise);
501 }
502
503 @Override
504 public ChannelFuture disconnect(ChannelPromise promise) {
505 return pipeline().disconnect(promise);
506 }
507
508 @Override
509 public ChannelFuture close(ChannelPromise promise) {
510 return pipeline().close(promise);
511 }
512
513 @Override
514 public ChannelFuture deregister(ChannelPromise promise) {
515 return pipeline().deregister(promise);
516 }
517
518 @Override
519 public ChannelFuture write(Object msg) {
520 return pipeline().write(msg);
521 }
522
523 @Override
524 public ChannelFuture write(Object msg, ChannelPromise promise) {
525 return pipeline().write(msg, promise);
526 }
527
528 @Override
529 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
530 return pipeline().writeAndFlush(msg, promise);
531 }
532
533 @Override
534 public ChannelFuture writeAndFlush(Object msg) {
535 return pipeline().writeAndFlush(msg);
536 }
537
538 @Override
539 public ChannelPromise newPromise() {
540 return pipeline().newPromise();
541 }
542
543 @Override
544 public ChannelProgressivePromise newProgressivePromise() {
545 return pipeline().newProgressivePromise();
546 }
547
548 @Override
549 public ChannelFuture newSucceededFuture() {
550 return pipeline().newSucceededFuture();
551 }
552
553 @Override
554 public ChannelFuture newFailedFuture(Throwable cause) {
555 return pipeline().newFailedFuture(cause);
556 }
557
558 @Override
559 public ChannelPromise voidPromise() {
560 return pipeline().voidPromise();
561 }
562
563 @Override
564 public int hashCode() {
565 return id().hashCode();
566 }
567
568 @Override
569 public boolean equals(Object o) {
570 return this == o;
571 }
572
573 @Override
574 public int compareTo(Channel o) {
575 if (this == o) {
576 return 0;
577 }
578
579 return id().compareTo(o.id());
580 }
581
582 @Override
583 public String toString() {
584 return parent().toString() + '/' + channelId.getSequenceId() + " (H2 - " + stream + ')';
585 }
586
587
588
589
590
591 void fireChildRead(Http2Frame frame) {
592 assert eventLoop().inEventLoop();
593 if (!isActive()) {
594 ReferenceCountUtil.release(frame);
595 } else if (readStatus != ReadStatus.IDLE) {
596
597
598 assert inboundBuffer == null || inboundBuffer.isEmpty();
599 final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
600
601 unsafe.doRead0(frame, allocHandle);
602
603
604
605
606 if (allocHandle.continueReading()) {
607 maybeAddChannelToReadCompletePendingQueue();
608 } else {
609 unsafe.notifyReadComplete(allocHandle, true, false);
610 }
611 } else {
612 if (inboundBuffer == null) {
613 inboundBuffer = new ArrayDeque<Object>(4);
614 }
615 inboundBuffer.add(frame);
616 }
617 }
618
619 void fireChildReadComplete() {
620 assert eventLoop().inEventLoop();
621 assert readStatus != ReadStatus.IDLE || !readCompletePending;
622 unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
623 }
624
625 final void closeWithError(Http2Error error) {
626 assert eventLoop().inEventLoop();
627 unsafe.close(unsafe.voidPromise(), error);
628 }
629
630 private final class Http2ChannelUnsafe implements Unsafe {
631 private final VoidChannelPromise unsafeVoidPromise =
632 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
633 @SuppressWarnings("deprecation")
634 private RecvByteBufAllocator.Handle recvHandle;
635 private boolean writeDoneAndNoFlush;
636 private boolean closeInitiated;
637 private boolean readEOS;
638
639 private boolean receivedEndOfStream;
640 private boolean sentEndOfStream;
641
642 @Override
643 public void connect(final SocketAddress remoteAddress,
644 SocketAddress localAddress, final ChannelPromise promise) {
645 if (!promise.setUncancellable()) {
646 return;
647 }
648 promise.setFailure(new UnsupportedOperationException());
649 }
650
651 @Override
652 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
653 if (recvHandle == null) {
654 recvHandle = config().getRecvByteBufAllocator().newHandle();
655 recvHandle.reset(config());
656 }
657 return recvHandle;
658 }
659
660 @Override
661 public SocketAddress localAddress() {
662 return parent().unsafe().localAddress();
663 }
664
665 @Override
666 public SocketAddress remoteAddress() {
667 return parent().unsafe().remoteAddress();
668 }
669
670 @Override
671 public void register(EventLoop eventLoop, ChannelPromise promise) {
672 if (!promise.setUncancellable()) {
673 return;
674 }
675 if (registered) {
676 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
677 return;
678 }
679
680 registered = true;
681
682 promise.setSuccess();
683
684 pipeline().fireChannelRegistered();
685 if (isActive()) {
686 pipeline().fireChannelActive();
687 }
688 }
689
690 @Override
691 public void bind(SocketAddress localAddress, ChannelPromise promise) {
692 if (!promise.setUncancellable()) {
693 return;
694 }
695 promise.setFailure(new UnsupportedOperationException());
696 }
697
698 @Override
699 public void disconnect(ChannelPromise promise) {
700 close(promise);
701 }
702
703 @Override
704 public void close(final ChannelPromise promise) {
705 close(promise, null);
706 }
707
708 private void close(final ChannelPromise promise, Http2Error error) {
709 if (!promise.setUncancellable()) {
710 return;
711 }
712 if (closeInitiated) {
713 if (closePromise.isDone()) {
714
715 promise.setSuccess();
716 } else if (!(promise instanceof VoidChannelPromise)) {
717
718 closePromise.addListener(future -> promise.setSuccess());
719 }
720 return;
721 }
722 closeInitiated = true;
723
724 readCompletePending = false;
725
726 final boolean wasActive = isActive();
727
728
729
730
731
732
733 if (parent().isActive() && isStreamIdValid(stream.id())) {
734
735
736 if (error == null) {
737 if (!readEOS && !(receivedEndOfStream && sentEndOfStream)) {
738 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
739 write(resetFrame, unsafe().voidPromise());
740 flush();
741 }
742 } else {
743
744 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
745 write(resetFrame, unsafe().voidPromise());
746 flush();
747 }
748 }
749
750 if (inboundBuffer != null) {
751 for (;;) {
752 Object msg = inboundBuffer.poll();
753 if (msg == null) {
754 break;
755 }
756 ReferenceCountUtil.release(msg);
757 }
758 inboundBuffer = null;
759 }
760
761
762 outboundClosed = true;
763 closePromise.setSuccess();
764 promise.setSuccess();
765
766 fireChannelInactiveAndDeregister(voidPromise(), wasActive);
767 }
768
769 @Override
770 public void closeForcibly() {
771 close(unsafe().voidPromise());
772 }
773
774 @Override
775 public void deregister(ChannelPromise promise) {
776 fireChannelInactiveAndDeregister(promise, false);
777 }
778
779 private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
780 final boolean fireChannelInactive) {
781 if (!promise.setUncancellable()) {
782 return;
783 }
784
785 if (!registered) {
786 promise.setSuccess();
787 return;
788 }
789
790
791
792
793
794
795
796
797 invokeLater(promise.channel(), new Runnable() {
798 @Override
799 public void run() {
800 if (fireChannelInactive) {
801 pipeline.fireChannelInactive();
802 }
803
804
805 if (registered) {
806 registered = false;
807 pipeline.fireChannelUnregistered();
808 }
809 safeSetSuccess(promise);
810 }
811 });
812 }
813
814 private void safeSetSuccess(ChannelPromise promise) {
815 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
816 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
817 promise.channel(), promise);
818 }
819 }
820
821 private void invokeLater(Channel channel, Runnable task) {
822 try {
823
824
825
826
827
828
829
830
831
832
833
834 eventLoop().execute(task);
835 } catch (RejectedExecutionException e) {
836 logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
837 }
838 }
839
840 @Override
841 public void beginRead() {
842 if (!isActive()) {
843 return;
844 }
845 updateLocalWindowIfNeeded();
846
847 switch (readStatus) {
848 case IDLE:
849 readStatus = ReadStatus.IN_PROGRESS;
850 doBeginRead();
851 break;
852 case IN_PROGRESS:
853 readStatus = ReadStatus.REQUESTED;
854 break;
855 default:
856 break;
857 }
858 }
859
860 private Object pollQueuedMessage() {
861 return inboundBuffer == null ? null : inboundBuffer.poll();
862 }
863
864 void doBeginRead() {
865 if (readStatus == ReadStatus.IDLE) {
866
867 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
868
869 flush();
870 unsafe.closeForcibly();
871 }
872 } else {
873 do {
874 Object message = pollQueuedMessage();
875 if (message == null) {
876
877 flush();
878 if (readEOS) {
879 unsafe.closeForcibly();
880 }
881 break;
882 }
883 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
884 allocHandle.reset(config());
885 boolean continueReading = false;
886 do {
887 doRead0((Http2Frame) message, allocHandle);
888 } while ((readEOS || (continueReading = allocHandle.continueReading()))
889 && (message = pollQueuedMessage()) != null);
890
891 if (continueReading && isParentReadInProgress() && !readEOS) {
892
893
894
895
896 maybeAddChannelToReadCompletePendingQueue();
897 } else {
898 notifyReadComplete(allocHandle, true, true);
899
900
901
902
903 resetReadStatus();
904 }
905 } while (readStatus != ReadStatus.IDLE);
906 }
907 }
908
909 void readEOS() {
910 readEOS = true;
911 }
912
913 private boolean updateLocalWindowIfNeeded() {
914 if (flowControlledBytes != 0 && !parentContext().isRemoved() && config.autoStreamFlowControl) {
915 int bytes = flowControlledBytes;
916 flowControlledBytes = 0;
917 writeWindowUpdateFrame(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
918 return true;
919 }
920 return false;
921 }
922
923 void updateLocalWindowIfNeededAndFlush() {
924 if (updateLocalWindowIfNeeded()) {
925 flush();
926 }
927 }
928
929 private void resetReadStatus() {
930 readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
931 }
932
933 void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
934 boolean inReadLoop) {
935 if (!readCompletePending && !forceReadComplete) {
936 return;
937 }
938
939 readCompletePending = false;
940
941 if (!inReadLoop) {
942
943 resetReadStatus();
944 }
945
946 allocHandle.readComplete();
947 pipeline().fireChannelReadComplete();
948
949
950
951 flush();
952 if (readEOS) {
953 unsafe.closeForcibly();
954 }
955 }
956
957 @SuppressWarnings("deprecation")
958 void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
959 final int bytes;
960 if (frame instanceof Http2DataFrame) {
961 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
962
963
964
965
966
967 flowControlledBytes += bytes;
968 } else {
969 bytes = MIN_HTTP2_FRAME_SIZE;
970 }
971
972
973
974 receivedEndOfStream |= isEndOfStream(frame);
975
976
977 allocHandle.attemptedBytesRead(bytes);
978 allocHandle.lastBytesRead(bytes);
979 allocHandle.incMessagesRead(1);
980
981 pipeline().fireChannelRead(frame);
982 }
983
984 private ChannelFuture writeWindowUpdateFrame(Http2WindowUpdateFrame windowUpdateFrame) {
985 ChannelFuture future = write0(parentContext(), windowUpdateFrame);
986
987
988
989 writeDoneAndNoFlush = true;
990
991
992
993
994 if (future.isDone()) {
995 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
996 } else {
997 future.addListener(windowUpdateFrameWriteListener);
998 }
999 return future;
1000 }
1001
1002 @Override
1003 public void write(Object msg, final ChannelPromise promise) {
1004
1005 if (!promise.setUncancellable()) {
1006 ReferenceCountUtil.release(msg);
1007 return;
1008 }
1009
1010 if (!isActive() ||
1011
1012 outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1013 ReferenceCountUtil.release(msg);
1014 promise.setFailure(new ClosedChannelException());
1015 return;
1016 }
1017
1018 try {
1019 if (msg instanceof Http2StreamFrame) {
1020 Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1021 if (msg instanceof Http2WindowUpdateFrame) {
1022 Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame) msg;
1023 if (config.autoStreamFlowControl) {
1024 ReferenceCountUtil.release(msg);
1025 promise.setFailure(new UnsupportedOperationException(
1026 Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL + " is set to false"));
1027 return;
1028 }
1029 try {
1030 ObjectUtil.checkInRange(updateFrame.windowSizeIncrement(), 0,
1031 flowControlledBytes, "windowSizeIncrement");
1032 } catch (RuntimeException e) {
1033 ReferenceCountUtil.release(updateFrame);
1034 promise.setFailure(e);
1035 return;
1036 }
1037 flowControlledBytes -= updateFrame.windowSizeIncrement();
1038 if (parentContext().isRemoved()) {
1039 ReferenceCountUtil.release(msg);
1040 promise.setFailure(new ClosedChannelException());
1041 return;
1042 }
1043 ChannelFuture f = writeWindowUpdateFrame(updateFrame);
1044 if (f.isDone()) {
1045 writeComplete(f, promise);
1046 } else {
1047 f.addListener(future -> writeComplete(future, promise));
1048 }
1049 } else {
1050 writeHttp2StreamFrame(frame, promise);
1051 }
1052 } else {
1053 String msgStr = msg.toString();
1054 ReferenceCountUtil.release(msg);
1055 promise.setFailure(new IllegalArgumentException(
1056 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1057 ": " + msgStr));
1058 }
1059 } catch (Throwable t) {
1060 promise.tryFailure(t);
1061 }
1062 }
1063
1064 private boolean isEndOfStream(Http2Frame frame) {
1065 if (frame instanceof Http2HeadersFrame) {
1066 return ((Http2HeadersFrame) frame).isEndStream();
1067 }
1068 if (frame instanceof Http2DataFrame) {
1069 return ((Http2DataFrame) frame).isEndStream();
1070 }
1071 return false;
1072 }
1073
1074 private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1075 if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1076 ReferenceCountUtil.release(frame);
1077 promise.setFailure(
1078 new IllegalArgumentException("The first frame must be a headers frame. Was: "
1079 + frame.name()));
1080 return;
1081 }
1082
1083 final boolean firstWrite;
1084 if (firstFrameWritten) {
1085 firstWrite = false;
1086 } else {
1087 firstWrite = firstFrameWritten = true;
1088 }
1089
1090
1091
1092 sentEndOfStream |= isEndOfStream(frame);
1093 ChannelFuture f = write0(parentContext(), frame);
1094 if (f.isDone()) {
1095 if (firstWrite) {
1096 firstWriteComplete(f, promise);
1097 } else {
1098 writeComplete(f, promise);
1099 }
1100 } else {
1101 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1102 incrementPendingOutboundBytes(bytes, false);
1103 f.addListener(future -> {
1104 if (firstWrite) {
1105 firstWriteComplete(future, promise);
1106 } else {
1107 writeComplete(future, promise);
1108 }
1109 decrementPendingOutboundBytes(bytes, false);
1110 });
1111 writeDoneAndNoFlush = true;
1112 }
1113 }
1114
1115 private void firstWriteComplete(Future<?> future, ChannelPromise promise) {
1116 Throwable cause = future.cause();
1117 if (cause == null) {
1118 promise.setSuccess();
1119 } else {
1120
1121 closeForcibly();
1122 promise.setFailure(wrapStreamClosedError(cause));
1123 }
1124 }
1125
1126 private void writeComplete(Future<?> future, ChannelPromise promise) {
1127 Throwable cause = future.cause();
1128 if (cause == null) {
1129 promise.setSuccess();
1130 } else {
1131 Throwable error = wrapStreamClosedError(cause);
1132
1133 if (error instanceof IOException) {
1134 if (config.isAutoClose()) {
1135
1136 closeForcibly();
1137 } else {
1138
1139 outboundClosed = true;
1140 }
1141 }
1142 promise.setFailure(error);
1143 }
1144 }
1145
1146 private Throwable wrapStreamClosedError(Throwable cause) {
1147
1148
1149 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1150 return new ClosedChannelException().initCause(cause);
1151 }
1152 return cause;
1153 }
1154
1155 private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1156 if (frame.stream() != null && frame.stream() != stream) {
1157 String msgString = frame.toString();
1158 ReferenceCountUtil.release(frame);
1159 throw new IllegalArgumentException(
1160 "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1161 }
1162 return frame;
1163 }
1164
1165 @Override
1166 public void flush() {
1167
1168
1169
1170
1171 if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1172
1173 return;
1174 }
1175
1176
1177 writeDoneAndNoFlush = false;
1178 flush0(parentContext());
1179 }
1180
1181 @Override
1182 public ChannelPromise voidPromise() {
1183 return unsafeVoidPromise;
1184 }
1185
1186 @Override
1187 public ChannelOutboundBuffer outboundBuffer() {
1188
1189 return null;
1190 }
1191 }
1192
1193
1194
1195
1196
1197
1198 private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1199
1200 volatile boolean autoStreamFlowControl = true;
1201 Http2StreamChannelConfig(Channel channel) {
1202 super(channel);
1203 }
1204
1205 @Override
1206 public MessageSizeEstimator getMessageSizeEstimator() {
1207 return FlowControlledFrameSizeEstimator.INSTANCE;
1208 }
1209
1210 @Override
1211 public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1212 throw new UnsupportedOperationException();
1213 }
1214
1215 @Override
1216 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1217 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1218 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1219 RecvByteBufAllocator.ExtendedHandle.class);
1220 }
1221 super.setRecvByteBufAllocator(allocator);
1222 return this;
1223 }
1224
1225 @Override
1226 public Map<ChannelOption<?>, Object> getOptions() {
1227 return getOptions(
1228 super.getOptions(),
1229 Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL);
1230 }
1231
1232 @SuppressWarnings("unchecked")
1233 @Override
1234 public <T> T getOption(ChannelOption<T> option) {
1235 if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1236 return (T) Boolean.valueOf(autoStreamFlowControl);
1237 }
1238 return super.getOption(option);
1239 }
1240
1241 @Override
1242 public <T> boolean setOption(ChannelOption<T> option, T value) {
1243 validate(option, value);
1244 if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1245 boolean newValue = (Boolean) value;
1246 boolean changed = newValue && !autoStreamFlowControl;
1247 autoStreamFlowControl = (Boolean) value;
1248 if (changed) {
1249 if (channel.isRegistered()) {
1250 final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe) channel.unsafe();
1251 if (channel.eventLoop().inEventLoop()) {
1252 unsafe.updateLocalWindowIfNeededAndFlush();
1253 } else {
1254 channel.eventLoop().execute(new Runnable() {
1255 @Override
1256 public void run() {
1257 unsafe.updateLocalWindowIfNeededAndFlush();
1258 }
1259 });
1260 }
1261 }
1262 }
1263 return true;
1264 }
1265 return super.setOption(option, value);
1266 }
1267 }
1268
1269 private void maybeAddChannelToReadCompletePendingQueue() {
1270 if (!readCompletePending) {
1271 readCompletePending = true;
1272 addChannelToReadCompletePendingQueue();
1273 }
1274 }
1275
1276 protected void flush0(ChannelHandlerContext ctx) {
1277 ctx.flush();
1278 }
1279
1280 protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1281 ChannelPromise promise = ctx.newPromise();
1282 ctx.write(msg, promise);
1283 return promise;
1284 }
1285
1286 protected abstract boolean isParentReadInProgress();
1287 protected abstract void addChannelToReadCompletePendingQueue();
1288 protected abstract ChannelHandlerContext parentContext();
1289 }