1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOutboundBuffer;
28 import io.netty.channel.ChannelPipeline;
29 import io.netty.channel.ChannelProgressivePromise;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.DefaultChannelConfig;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.MessageSizeEstimator;
35 import io.netty.channel.RecvByteBufAllocator;
36 import io.netty.channel.VoidChannelPromise;
37 import io.netty.channel.WriteBufferWaterMark;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.ChannelOutputShutdownEvent;
40 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
41 import io.netty.handler.ssl.SslCloseCompletionEvent;
42 import io.netty.util.DefaultAttributeMap;
43 import io.netty.util.ReferenceCountUtil;
44 import io.netty.util.internal.StringUtil;
45 import io.netty.util.internal.logging.InternalLogger;
46 import io.netty.util.internal.logging.InternalLoggerFactory;
47
48 import java.io.IOException;
49 import java.net.SocketAddress;
50 import java.nio.channels.ClosedChannelException;
51 import java.util.ArrayDeque;
52 import java.util.Queue;
53 import java.util.concurrent.RejectedExecutionException;
54 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
55 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
56
57 import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
58 import static io.netty.util.internal.ObjectUtil.checkNotNull;
59 import static java.lang.Math.min;
60
61 abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
62
63 static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
64 @Override
65 public boolean visit(Http2FrameStream stream) {
66 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
67 ((DefaultHttp2FrameStream) stream).attachment;
68 childChannel.trySetWritable();
69 return true;
70 }
71 };
72
73 static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
74 new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
75
76 static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
77 new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
78
79 static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
80 new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
81
82 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
83
84 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
85
86
87
88
89
90 private static final int MIN_HTTP2_FRAME_SIZE = 9;
91
92
93
94
95 private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
96
97 private final Object event;
98
99 UserEventStreamVisitor(Object event) {
100 this.event = checkNotNull(event, "event");
101 }
102
103 @Override
104 public boolean visit(Http2FrameStream stream) {
105 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
106 ((DefaultHttp2FrameStream) stream).attachment;
107 childChannel.pipeline().fireUserEventTriggered(event);
108 return true;
109 }
110 }
111
112
113
114
115 private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
116
117 static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
118
119 private static final Handle HANDLE_INSTANCE = new Handle() {
120 @Override
121 public int size(Object msg) {
122 return msg instanceof Http2DataFrame ?
123
124 (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
125 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
126 }
127 };
128
129 @Override
130 public Handle newHandle() {
131 return HANDLE_INSTANCE;
132 }
133 }
134
135 private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
136 AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
137
138 private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
139 AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
140
141 private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
142 Throwable cause = future.cause();
143 if (cause != null) {
144 Throwable unwrappedCause;
145
146 if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
147 cause = unwrappedCause;
148 }
149
150
151 streamChannel.pipeline().fireExceptionCaught(cause);
152 streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
153 }
154 }
155
156 private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
157 @Override
158 public void operationComplete(ChannelFuture future) {
159 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
160 }
161 };
162
163
164
165
166 private enum ReadStatus {
167
168
169
170 IDLE,
171
172
173
174
175 IN_PROGRESS,
176
177
178
179
180 REQUESTED
181 }
182
183 private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
184 private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
185 private final ChannelId channelId;
186 private final ChannelPipeline pipeline;
187 private final DefaultHttp2FrameStream stream;
188 private final ChannelPromise closePromise;
189
190 private volatile boolean registered;
191
192 private volatile long totalPendingSize;
193 private volatile int unwritable;
194
195
196 private Runnable fireChannelWritabilityChangedTask;
197
198 private boolean outboundClosed;
199 private int flowControlledBytes;
200
201
202
203
204
205
206
207 private ReadStatus readStatus = ReadStatus.IDLE;
208
209 private Queue<Object> inboundBuffer;
210
211
212 private boolean firstFrameWritten;
213 private boolean readCompletePending;
214
215 AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
216 this.stream = stream;
217 stream.attachment = this;
218 pipeline = new DefaultChannelPipeline(this) {
219 @Override
220 protected void incrementPendingOutboundBytes(long size) {
221 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
222 }
223
224 @Override
225 protected void decrementPendingOutboundBytes(long size) {
226 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
227 }
228
229 @Override
230 protected void onUnhandledInboundException(Throwable cause) {
231
232 if (cause instanceof Http2FrameStreamException) {
233 closeWithError(((Http2FrameStreamException) cause).error());
234 return;
235 } else {
236 Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
237 if (exception != null) {
238 closeWithError(exception.error());
239 return;
240 }
241 }
242 super.onUnhandledInboundException(cause);
243 }
244 };
245
246 closePromise = pipeline.newPromise();
247 channelId = new Http2StreamChannelId(parent().id(), id);
248
249 if (inboundHandler != null) {
250
251 pipeline.addLast(inboundHandler);
252 }
253 }
254
255 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
256 if (size == 0) {
257 return;
258 }
259
260 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
261 if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
262 setUnwritable(invokeLater);
263 }
264 }
265
266 private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
267 if (size == 0) {
268 return;
269 }
270
271 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
272
273
274
275
276
277 if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
278 setWritable(invokeLater);
279 }
280 }
281
282 final void trySetWritable() {
283
284
285
286
287 if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
288 setWritable(false);
289 }
290 }
291
292 private void setWritable(boolean invokeLater) {
293 for (;;) {
294 final int oldValue = unwritable;
295 final int newValue = oldValue & ~1;
296 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
297 if (oldValue != 0 && newValue == 0) {
298 fireChannelWritabilityChanged(invokeLater);
299 }
300 break;
301 }
302 }
303 }
304
305 private void setUnwritable(boolean invokeLater) {
306 for (;;) {
307 final int oldValue = unwritable;
308 final int newValue = oldValue | 1;
309 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
310 if (oldValue == 0) {
311 fireChannelWritabilityChanged(invokeLater);
312 }
313 break;
314 }
315 }
316 }
317
318 private void fireChannelWritabilityChanged(boolean invokeLater) {
319 final ChannelPipeline pipeline = pipeline();
320 if (invokeLater) {
321 Runnable task = fireChannelWritabilityChangedTask;
322 if (task == null) {
323 fireChannelWritabilityChangedTask = task = new Runnable() {
324 @Override
325 public void run() {
326 pipeline.fireChannelWritabilityChanged();
327 }
328 };
329 }
330 eventLoop().execute(task);
331 } else {
332 pipeline.fireChannelWritabilityChanged();
333 }
334 }
335 @Override
336 public Http2FrameStream stream() {
337 return stream;
338 }
339
340 void closeOutbound() {
341 outboundClosed = true;
342 }
343
344 void streamClosed() {
345 unsafe.readEOS();
346
347
348 unsafe.doBeginRead();
349 }
350
351 @Override
352 public ChannelMetadata metadata() {
353 return METADATA;
354 }
355
356 @Override
357 public ChannelConfig config() {
358 return config;
359 }
360
361 @Override
362 public boolean isOpen() {
363 return !closePromise.isDone();
364 }
365
366 @Override
367 public boolean isActive() {
368 return isOpen();
369 }
370
371 @Override
372 public boolean isWritable() {
373 return unwritable == 0;
374 }
375
376 @Override
377 public ChannelId id() {
378 return channelId;
379 }
380
381 @Override
382 public EventLoop eventLoop() {
383 return parent().eventLoop();
384 }
385
386 @Override
387 public Channel parent() {
388 return parentContext().channel();
389 }
390
391 @Override
392 public boolean isRegistered() {
393 return registered;
394 }
395
396 @Override
397 public SocketAddress localAddress() {
398 return parent().localAddress();
399 }
400
401 @Override
402 public SocketAddress remoteAddress() {
403 return parent().remoteAddress();
404 }
405
406 @Override
407 public ChannelFuture closeFuture() {
408 return closePromise;
409 }
410
411 @Override
412 public long bytesBeforeUnwritable() {
413
414 long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
415
416
417
418 return bytes > 0 && isWritable() ? bytes : 0;
419 }
420
421 @Override
422 public long bytesBeforeWritable() {
423
424 long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
425
426
427
428 return bytes <= 0 || isWritable() ? 0 : bytes;
429 }
430
431 @Override
432 public Unsafe unsafe() {
433 return unsafe;
434 }
435
436 @Override
437 public ChannelPipeline pipeline() {
438 return pipeline;
439 }
440
441 @Override
442 public ByteBufAllocator alloc() {
443 return config().getAllocator();
444 }
445
446 @Override
447 public Channel read() {
448 pipeline().read();
449 return this;
450 }
451
452 @Override
453 public Channel flush() {
454 pipeline().flush();
455 return this;
456 }
457
458 @Override
459 public ChannelFuture bind(SocketAddress localAddress) {
460 return pipeline().bind(localAddress);
461 }
462
463 @Override
464 public ChannelFuture connect(SocketAddress remoteAddress) {
465 return pipeline().connect(remoteAddress);
466 }
467
468 @Override
469 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
470 return pipeline().connect(remoteAddress, localAddress);
471 }
472
473 @Override
474 public ChannelFuture disconnect() {
475 return pipeline().disconnect();
476 }
477
478 @Override
479 public ChannelFuture close() {
480 return pipeline().close();
481 }
482
483 @Override
484 public ChannelFuture deregister() {
485 return pipeline().deregister();
486 }
487
488 @Override
489 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
490 return pipeline().bind(localAddress, promise);
491 }
492
493 @Override
494 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
495 return pipeline().connect(remoteAddress, promise);
496 }
497
498 @Override
499 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
500 return pipeline().connect(remoteAddress, localAddress, promise);
501 }
502
503 @Override
504 public ChannelFuture disconnect(ChannelPromise promise) {
505 return pipeline().disconnect(promise);
506 }
507
508 @Override
509 public ChannelFuture close(ChannelPromise promise) {
510 return pipeline().close(promise);
511 }
512
513 @Override
514 public ChannelFuture deregister(ChannelPromise promise) {
515 return pipeline().deregister(promise);
516 }
517
518 @Override
519 public ChannelFuture write(Object msg) {
520 return pipeline().write(msg);
521 }
522
523 @Override
524 public ChannelFuture write(Object msg, ChannelPromise promise) {
525 return pipeline().write(msg, promise);
526 }
527
528 @Override
529 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
530 return pipeline().writeAndFlush(msg, promise);
531 }
532
533 @Override
534 public ChannelFuture writeAndFlush(Object msg) {
535 return pipeline().writeAndFlush(msg);
536 }
537
538 @Override
539 public ChannelPromise newPromise() {
540 return pipeline().newPromise();
541 }
542
543 @Override
544 public ChannelProgressivePromise newProgressivePromise() {
545 return pipeline().newProgressivePromise();
546 }
547
548 @Override
549 public ChannelFuture newSucceededFuture() {
550 return pipeline().newSucceededFuture();
551 }
552
553 @Override
554 public ChannelFuture newFailedFuture(Throwable cause) {
555 return pipeline().newFailedFuture(cause);
556 }
557
558 @Override
559 public ChannelPromise voidPromise() {
560 return pipeline().voidPromise();
561 }
562
563 @Override
564 public int hashCode() {
565 return id().hashCode();
566 }
567
568 @Override
569 public boolean equals(Object o) {
570 return this == o;
571 }
572
573 @Override
574 public int compareTo(Channel o) {
575 if (this == o) {
576 return 0;
577 }
578
579 return id().compareTo(o.id());
580 }
581
582 @Override
583 public String toString() {
584 return parent().toString() + "(H2 - " + stream + ')';
585 }
586
587
588
589
590
591 void fireChildRead(Http2Frame frame) {
592 assert eventLoop().inEventLoop();
593 if (!isActive()) {
594 ReferenceCountUtil.release(frame);
595 } else if (readStatus != ReadStatus.IDLE) {
596
597
598 assert inboundBuffer == null || inboundBuffer.isEmpty();
599 final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
600
601 unsafe.doRead0(frame, allocHandle);
602
603
604
605
606 if (allocHandle.continueReading()) {
607 maybeAddChannelToReadCompletePendingQueue();
608 } else {
609 unsafe.notifyReadComplete(allocHandle, true, false);
610 }
611 } else {
612 if (inboundBuffer == null) {
613 inboundBuffer = new ArrayDeque<Object>(4);
614 }
615 inboundBuffer.add(frame);
616 }
617 }
618
619 void fireChildReadComplete() {
620 assert eventLoop().inEventLoop();
621 assert readStatus != ReadStatus.IDLE || !readCompletePending;
622 unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
623 }
624
625 final void closeWithError(Http2Error error) {
626 assert eventLoop().inEventLoop();
627 unsafe.close(unsafe.voidPromise(), error);
628 }
629
630 private final class Http2ChannelUnsafe implements Unsafe {
631 private final VoidChannelPromise unsafeVoidPromise =
632 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
633 @SuppressWarnings("deprecation")
634 private RecvByteBufAllocator.Handle recvHandle;
635 private boolean writeDoneAndNoFlush;
636 private boolean closeInitiated;
637 private boolean readEOS;
638
639 private boolean receivedEndOfStream;
640 private boolean sentEndOfStream;
641
642 @Override
643 public void connect(final SocketAddress remoteAddress,
644 SocketAddress localAddress, final ChannelPromise promise) {
645 if (!promise.setUncancellable()) {
646 return;
647 }
648 promise.setFailure(new UnsupportedOperationException());
649 }
650
651 @Override
652 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
653 if (recvHandle == null) {
654 recvHandle = config().getRecvByteBufAllocator().newHandle();
655 recvHandle.reset(config());
656 }
657 return recvHandle;
658 }
659
660 @Override
661 public SocketAddress localAddress() {
662 return parent().unsafe().localAddress();
663 }
664
665 @Override
666 public SocketAddress remoteAddress() {
667 return parent().unsafe().remoteAddress();
668 }
669
670 @Override
671 public void register(EventLoop eventLoop, ChannelPromise promise) {
672 if (!promise.setUncancellable()) {
673 return;
674 }
675 if (registered) {
676 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
677 return;
678 }
679
680 registered = true;
681
682 promise.setSuccess();
683
684 pipeline().fireChannelRegistered();
685 if (isActive()) {
686 pipeline().fireChannelActive();
687 }
688 }
689
690 @Override
691 public void bind(SocketAddress localAddress, ChannelPromise promise) {
692 if (!promise.setUncancellable()) {
693 return;
694 }
695 promise.setFailure(new UnsupportedOperationException());
696 }
697
698 @Override
699 public void disconnect(ChannelPromise promise) {
700 close(promise);
701 }
702
703 @Override
704 public void close(final ChannelPromise promise) {
705 close(promise, Http2Error.CANCEL);
706 }
707
708 void close(final ChannelPromise promise, Http2Error error) {
709 if (!promise.setUncancellable()) {
710 return;
711 }
712 if (closeInitiated) {
713 if (closePromise.isDone()) {
714
715 promise.setSuccess();
716 } else if (!(promise instanceof VoidChannelPromise)) {
717
718 closePromise.addListener(new ChannelFutureListener() {
719 @Override
720 public void operationComplete(ChannelFuture future) {
721 promise.setSuccess();
722 }
723 });
724 }
725 return;
726 }
727 closeInitiated = true;
728
729 readCompletePending = false;
730
731 final boolean wasActive = isActive();
732
733
734
735
736
737
738 if (parent().isActive() && isStreamIdValid(stream.id()) &&
739
740 !readEOS && !(receivedEndOfStream && sentEndOfStream)) {
741 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
742 write(resetFrame, unsafe().voidPromise());
743 flush();
744 }
745
746 if (inboundBuffer != null) {
747 for (;;) {
748 Object msg = inboundBuffer.poll();
749 if (msg == null) {
750 break;
751 }
752 ReferenceCountUtil.release(msg);
753 }
754 inboundBuffer = null;
755 }
756
757
758 outboundClosed = true;
759 closePromise.setSuccess();
760 promise.setSuccess();
761
762 fireChannelInactiveAndDeregister(voidPromise(), wasActive);
763 }
764
765 @Override
766 public void closeForcibly() {
767 close(unsafe().voidPromise());
768 }
769
770 @Override
771 public void deregister(ChannelPromise promise) {
772 fireChannelInactiveAndDeregister(promise, false);
773 }
774
775 private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
776 final boolean fireChannelInactive) {
777 if (!promise.setUncancellable()) {
778 return;
779 }
780
781 if (!registered) {
782 promise.setSuccess();
783 return;
784 }
785
786
787
788
789
790
791
792
793 invokeLater(promise.channel(), new Runnable() {
794 @Override
795 public void run() {
796 if (fireChannelInactive) {
797 pipeline.fireChannelInactive();
798 }
799
800
801 if (registered) {
802 registered = false;
803 pipeline.fireChannelUnregistered();
804 }
805 safeSetSuccess(promise);
806 }
807 });
808 }
809
810 private void safeSetSuccess(ChannelPromise promise) {
811 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
812 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
813 promise.channel(), promise);
814 }
815 }
816
817 private void invokeLater(Channel channel, Runnable task) {
818 try {
819
820
821
822
823
824
825
826
827
828
829
830 eventLoop().execute(task);
831 } catch (RejectedExecutionException e) {
832 logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
833 }
834 }
835
836 @Override
837 public void beginRead() {
838 if (!isActive()) {
839 return;
840 }
841 updateLocalWindowIfNeeded();
842
843 switch (readStatus) {
844 case IDLE:
845 readStatus = ReadStatus.IN_PROGRESS;
846 doBeginRead();
847 break;
848 case IN_PROGRESS:
849 readStatus = ReadStatus.REQUESTED;
850 break;
851 default:
852 break;
853 }
854 }
855
856 private Object pollQueuedMessage() {
857 return inboundBuffer == null ? null : inboundBuffer.poll();
858 }
859
860 void doBeginRead() {
861 if (readStatus == ReadStatus.IDLE) {
862
863 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
864
865 flush();
866 unsafe.closeForcibly();
867 }
868 } else {
869 do {
870 Object message = pollQueuedMessage();
871 if (message == null) {
872
873 flush();
874 if (readEOS) {
875 unsafe.closeForcibly();
876 }
877 break;
878 }
879 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
880 allocHandle.reset(config());
881 boolean continueReading = false;
882 do {
883 doRead0((Http2Frame) message, allocHandle);
884 } while ((readEOS || (continueReading = allocHandle.continueReading()))
885 && (message = pollQueuedMessage()) != null);
886
887 if (continueReading && isParentReadInProgress() && !readEOS) {
888
889
890
891
892 maybeAddChannelToReadCompletePendingQueue();
893 } else {
894 notifyReadComplete(allocHandle, true, true);
895
896
897
898
899 resetReadStatus();
900 }
901 } while (readStatus != ReadStatus.IDLE);
902 }
903 }
904
905 void readEOS() {
906 readEOS = true;
907 }
908
909 private void updateLocalWindowIfNeeded() {
910 if (flowControlledBytes != 0 && !parentContext().isRemoved()) {
911 int bytes = flowControlledBytes;
912 flowControlledBytes = 0;
913 ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
914
915
916
917 writeDoneAndNoFlush = true;
918
919
920
921
922 if (future.isDone()) {
923 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
924 } else {
925 future.addListener(windowUpdateFrameWriteListener);
926 }
927 }
928 }
929
930 private void resetReadStatus() {
931 readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
932 }
933
934 void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
935 boolean inReadLoop) {
936 if (!readCompletePending && !forceReadComplete) {
937 return;
938 }
939
940 readCompletePending = false;
941
942 if (!inReadLoop) {
943
944 resetReadStatus();
945 }
946
947 allocHandle.readComplete();
948 pipeline().fireChannelReadComplete();
949
950
951
952 flush();
953 if (readEOS) {
954 unsafe.closeForcibly();
955 }
956 }
957
958 @SuppressWarnings("deprecation")
959 void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
960 final int bytes;
961 if (frame instanceof Http2DataFrame) {
962 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
963
964
965
966
967
968 flowControlledBytes += bytes;
969 } else {
970 bytes = MIN_HTTP2_FRAME_SIZE;
971 }
972
973
974
975 receivedEndOfStream |= isEndOfStream(frame);
976
977
978 allocHandle.attemptedBytesRead(bytes);
979 allocHandle.lastBytesRead(bytes);
980 allocHandle.incMessagesRead(1);
981
982 pipeline().fireChannelRead(frame);
983 }
984
985 @Override
986 public void write(Object msg, final ChannelPromise promise) {
987
988 if (!promise.setUncancellable()) {
989 ReferenceCountUtil.release(msg);
990 return;
991 }
992
993 if (!isActive() ||
994
995 outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
996 ReferenceCountUtil.release(msg);
997 promise.setFailure(new ClosedChannelException());
998 return;
999 }
1000
1001 try {
1002 if (msg instanceof Http2StreamFrame) {
1003 Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1004 writeHttp2StreamFrame(frame, promise);
1005 } else {
1006 String msgStr = msg.toString();
1007 ReferenceCountUtil.release(msg);
1008 promise.setFailure(new IllegalArgumentException(
1009 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1010 ": " + msgStr));
1011 }
1012 } catch (Throwable t) {
1013 promise.tryFailure(t);
1014 }
1015 }
1016
1017 private boolean isEndOfStream(Http2Frame frame) {
1018 if (frame instanceof Http2HeadersFrame) {
1019 return ((Http2HeadersFrame) frame).isEndStream();
1020 }
1021 if (frame instanceof Http2DataFrame) {
1022 return ((Http2DataFrame) frame).isEndStream();
1023 }
1024 return false;
1025 }
1026
1027 private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1028 if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1029 ReferenceCountUtil.release(frame);
1030 promise.setFailure(
1031 new IllegalArgumentException("The first frame must be a headers frame. Was: "
1032 + frame.name()));
1033 return;
1034 }
1035
1036 final boolean firstWrite;
1037 if (firstFrameWritten) {
1038 firstWrite = false;
1039 } else {
1040 firstWrite = firstFrameWritten = true;
1041 }
1042
1043
1044
1045 sentEndOfStream |= isEndOfStream(frame);
1046 ChannelFuture f = write0(parentContext(), frame);
1047 if (f.isDone()) {
1048 if (firstWrite) {
1049 firstWriteComplete(f, promise);
1050 } else {
1051 writeComplete(f, promise);
1052 }
1053 } else {
1054 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1055 incrementPendingOutboundBytes(bytes, false);
1056 f.addListener(new ChannelFutureListener() {
1057 @Override
1058 public void operationComplete(ChannelFuture future) {
1059 if (firstWrite) {
1060 firstWriteComplete(future, promise);
1061 } else {
1062 writeComplete(future, promise);
1063 }
1064 decrementPendingOutboundBytes(bytes, false);
1065 }
1066 });
1067 writeDoneAndNoFlush = true;
1068 }
1069 }
1070
1071 private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1072 Throwable cause = future.cause();
1073 if (cause == null) {
1074 promise.setSuccess();
1075 } else {
1076
1077 closeForcibly();
1078 promise.setFailure(wrapStreamClosedError(cause));
1079 }
1080 }
1081
1082 private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1083 Throwable cause = future.cause();
1084 if (cause == null) {
1085 promise.setSuccess();
1086 } else {
1087 Throwable error = wrapStreamClosedError(cause);
1088
1089 if (error instanceof IOException) {
1090 if (config.isAutoClose()) {
1091
1092 closeForcibly();
1093 } else {
1094
1095 outboundClosed = true;
1096 }
1097 }
1098 promise.setFailure(error);
1099 }
1100 }
1101
1102 private Throwable wrapStreamClosedError(Throwable cause) {
1103
1104
1105 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1106 return new ClosedChannelException().initCause(cause);
1107 }
1108 return cause;
1109 }
1110
1111 private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1112 if (frame.stream() != null && frame.stream() != stream) {
1113 String msgString = frame.toString();
1114 ReferenceCountUtil.release(frame);
1115 throw new IllegalArgumentException(
1116 "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1117 }
1118 return frame;
1119 }
1120
1121 @Override
1122 public void flush() {
1123
1124
1125
1126
1127 if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1128
1129 return;
1130 }
1131
1132
1133 writeDoneAndNoFlush = false;
1134 flush0(parentContext());
1135 }
1136
1137 @Override
1138 public ChannelPromise voidPromise() {
1139 return unsafeVoidPromise;
1140 }
1141
1142 @Override
1143 public ChannelOutboundBuffer outboundBuffer() {
1144
1145 return null;
1146 }
1147 }
1148
1149
1150
1151
1152
1153
1154 private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1155 Http2StreamChannelConfig(Channel channel) {
1156 super(channel);
1157 }
1158
1159 @Override
1160 public MessageSizeEstimator getMessageSizeEstimator() {
1161 return FlowControlledFrameSizeEstimator.INSTANCE;
1162 }
1163
1164 @Override
1165 public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1166 throw new UnsupportedOperationException();
1167 }
1168
1169 @Override
1170 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1171 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1172 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1173 RecvByteBufAllocator.ExtendedHandle.class);
1174 }
1175 super.setRecvByteBufAllocator(allocator);
1176 return this;
1177 }
1178 }
1179
1180 private void maybeAddChannelToReadCompletePendingQueue() {
1181 if (!readCompletePending) {
1182 readCompletePending = true;
1183 addChannelToReadCompletePendingQueue();
1184 }
1185 }
1186
1187 protected void flush0(ChannelHandlerContext ctx) {
1188 ctx.flush();
1189 }
1190
1191 protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1192 ChannelPromise promise = ctx.newPromise();
1193 ctx.write(msg, promise);
1194 return promise;
1195 }
1196
1197 protected abstract boolean isParentReadInProgress();
1198 protected abstract void addChannelToReadCompletePendingQueue();
1199 protected abstract ChannelHandlerContext parentContext();
1200 }