1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOption;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelProgressivePromise;
31 import io.netty.channel.ChannelPromise;
32 import io.netty.channel.DefaultChannelConfig;
33 import io.netty.channel.DefaultChannelPipeline;
34 import io.netty.channel.EventLoop;
35 import io.netty.channel.MessageSizeEstimator;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.VoidChannelPromise;
38 import io.netty.channel.WriteBufferWaterMark;
39 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
40 import io.netty.channel.socket.ChannelOutputShutdownEvent;
41 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
42 import io.netty.handler.ssl.SslCloseCompletionEvent;
43 import io.netty.util.DefaultAttributeMap;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.internal.ObjectUtil;
46 import io.netty.util.internal.StringUtil;
47 import io.netty.util.internal.logging.InternalLogger;
48 import io.netty.util.internal.logging.InternalLoggerFactory;
49
50 import java.io.IOException;
51 import java.net.SocketAddress;
52 import java.nio.channels.ClosedChannelException;
53 import java.util.ArrayDeque;
54 import java.util.Map;
55 import java.util.Queue;
56 import java.util.concurrent.RejectedExecutionException;
57 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
58 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
59
60 import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
61 import static io.netty.util.internal.ObjectUtil.checkNotNull;
62 import static java.lang.Math.min;
63
64 abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
65
66 static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
67 @Override
68 public boolean visit(Http2FrameStream stream) {
69 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
70 ((DefaultHttp2FrameStream) stream).attachment;
71 childChannel.trySetWritable();
72 return true;
73 }
74 };
75
/** Visitor that fires {@link ChannelInputShutdownReadComplete#INSTANCE} as a user event on each stream channel. */
static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
        new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);

/** Visitor that fires {@link ChannelOutputShutdownEvent#INSTANCE} as a user event on each stream channel. */
static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
        new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);

/** Visitor that fires {@link SslCloseCompletionEvent#SUCCESS} as a user event on each stream channel. */
static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
        new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
84
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);

// Metadata returned by metadata(). NOTE(review): the arguments are presumably (hasDisconnect = false,
// defaultMaxMessagesPerRead = 16) -- confirm against the ChannelMetadata constructor.
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

/**
 * Number of bytes accounted for every frame that is not a DATA frame, and added on top of the flow-controlled
 * bytes of a DATA frame when estimating its size. Being non-zero ensures every frame contributes to the
 * read and pending-write accounting done in this class.
 */
private static final int MIN_HTTP2_FRAME_SIZE = 9;
94
95
96
97
98 private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
99
100 private final Object event;
101
102 UserEventStreamVisitor(Object event) {
103 this.event = checkNotNull(event, "event");
104 }
105
106 @Override
107 public boolean visit(Http2FrameStream stream) {
108 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
109 ((DefaultHttp2FrameStream) stream).attachment;
110 childChannel.pipeline().fireUserEventTriggered(event);
111 return true;
112 }
113 }
114
115
116
117
118 private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
119
120 static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
121
122 private static final Handle HANDLE_INSTANCE = new Handle() {
123 @Override
124 public int size(Object msg) {
125 return msg instanceof Http2DataFrame ?
126
127 (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
128 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
129 }
130 };
131
132 @Override
133 public Handle newHandle() {
134 return HANDLE_INSTANCE;
135 }
136 }
137
// Atomic updater for the volatile totalPendingSize field (bytes of writes handed to the parent and not yet done).
private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");

// Atomic updater for the volatile unwritable field; setWritable()/setUnwritable() toggle bit 0 through it.
private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
143
144 private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
145 Throwable cause = future.cause();
146 if (cause != null) {
147 Throwable unwrappedCause;
148
149 if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
150 cause = unwrappedCause;
151 }
152
153
154 streamChannel.pipeline().fireExceptionCaught(cause);
155 streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
156 }
157 }
158
// Per-channel listener attached to WINDOW_UPDATE writes that have not completed yet; it forwards the
// outcome to windowUpdateFrameWriteComplete(...) for this channel.
private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
    }
};
165
166
167
168
/**
 * The current state of read processing for this channel.
 */
private enum ReadStatus {
    /**
     * No read is in progress and none has been requested; inbound frames are queued in the inbound buffer.
     */
    IDLE,

    /**
     * A read is in progress (entered from {@code IDLE} by {@code beginRead()}).
     */
    IN_PROGRESS,

    /**
     * Another read was requested while one was already in progress; resetting the status moves this back
     * to {@code IN_PROGRESS} instead of {@code IDLE}.
     */
    REQUESTED
}
185
private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
private final ChannelId channelId;
private final ChannelPipeline pipeline;
private final DefaultHttp2FrameStream stream;
// Completed when the channel is closed; isOpen()/isActive() are derived from it.
private final ChannelPromise closePromise;

// Set by Unsafe.register(...) and cleared by the deferred deregistration task.
private volatile boolean registered;

// Bytes of writes handed to the parent that have not completed yet; updated via TOTAL_PENDING_SIZE_UPDATER.
private volatile long totalPendingSize;
// Bit 0 is set while the channel is considered unwritable; updated via UNWRITABLE_UPDATER.
private volatile int unwritable;

// Lazily created task that fires channelWritabilityChanged on the event loop.
private Runnable fireChannelWritabilityChangedTask;

// Once true, Unsafe.write(...) fails HEADERS and DATA frames with a ClosedChannelException.
private boolean outboundClosed;
// Flow-controlled bytes of DATA frames that were read but not yet returned to the peer via WINDOW_UPDATE.
private int flowControlledBytes;

/**
 * This variable represents if a read is in progress for the current channel or was requested.
 * It is read and written from fireChildRead(...), Unsafe.beginRead() and the read-complete handling.
 */
private ReadStatus readStatus = ReadStatus.IDLE;

// Frames received while readStatus is IDLE; created lazily and released/cleared when the channel closes.
private Queue<Object> inboundBuffer;

/** {@code true} after the first HTTP/2 stream frame has been written through this channel. */
private boolean firstFrameWritten;
// True while this channel is waiting in the parent's read-complete-pending queue.
private boolean readCompletePending;
217
218 AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
219 this.stream = stream;
220 stream.attachment = this;
221 pipeline = new DefaultChannelPipeline(this) {
222 @Override
223 protected void incrementPendingOutboundBytes(long size) {
224 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
225 }
226
227 @Override
228 protected void decrementPendingOutboundBytes(long size) {
229 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
230 }
231
232 @Override
233 protected void onUnhandledInboundException(Throwable cause) {
234
235 if (cause instanceof Http2FrameStreamException) {
236 closeWithError(((Http2FrameStreamException) cause).error());
237 return;
238 } else {
239 Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
240 if (exception != null) {
241 closeWithError(exception.error());
242 return;
243 }
244 }
245 super.onUnhandledInboundException(cause);
246 }
247 };
248
249 closePromise = pipeline.newPromise();
250 channelId = new Http2StreamChannelId(parent().id(), id);
251
252 if (inboundHandler != null) {
253
254 pipeline.addLast(inboundHandler);
255 }
256 }
257
258 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
259 if (size == 0) {
260 return;
261 }
262
263 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
264 if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
265 setUnwritable(invokeLater);
266 }
267 }
268
269 private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
270 if (size == 0) {
271 return;
272 }
273
274 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
275
276
277
278
279
280 if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
281 setWritable(invokeLater);
282 }
283 }
284
285 final void trySetWritable() {
286
287
288
289
290 if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
291 setWritable(false);
292 }
293 }
294
295 private void setWritable(boolean invokeLater) {
296 for (;;) {
297 final int oldValue = unwritable;
298 final int newValue = oldValue & ~1;
299 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
300 if (oldValue != 0 && newValue == 0) {
301 fireChannelWritabilityChanged(invokeLater);
302 }
303 break;
304 }
305 }
306 }
307
308 private void setUnwritable(boolean invokeLater) {
309 for (;;) {
310 final int oldValue = unwritable;
311 final int newValue = oldValue | 1;
312 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
313 if (oldValue == 0) {
314 fireChannelWritabilityChanged(invokeLater);
315 }
316 break;
317 }
318 }
319 }
320
321 private void fireChannelWritabilityChanged(boolean invokeLater) {
322 final ChannelPipeline pipeline = pipeline();
323 if (invokeLater) {
324 Runnable task = fireChannelWritabilityChangedTask;
325 if (task == null) {
326 fireChannelWritabilityChangedTask = task = new Runnable() {
327 @Override
328 public void run() {
329 pipeline.fireChannelWritabilityChanged();
330 }
331 };
332 }
333 eventLoop().execute(task);
334 } else {
335 pipeline.fireChannelWritabilityChanged();
336 }
337 }
/** Returns the HTTP/2 frame stream this channel was created for. */
@Override
public Http2FrameStream stream() {
    return stream;
}

/**
 * Marks the outbound side as closed. Afterwards the unsafe's {@code write(...)} fails HEADERS and DATA
 * frames with a {@link ClosedChannelException}.
 */
void closeOutbound() {
    outboundClosed = true;
}

/**
 * Records that the end of the stream was reached and then runs the unsafe's read logic.
 */
void streamClosed() {
    unsafe.readEOS();
    // doBeginRead() delivers frames that are still queued when a read is pending, and closes the channel
    // once end-of-stream was recorded and nothing is left in the inbound buffer.
    unsafe.doBeginRead();
}
353
/** Returns the shared {@link ChannelMetadata} constant of this channel type. */
@Override
public ChannelMetadata metadata() {
    return METADATA;
}

/** Returns this stream channel's own configuration. */
@Override
public ChannelConfig config() {
    return config;
}

/** The channel is open until its close promise has completed. */
@Override
public boolean isOpen() {
    return !closePromise.isDone();
}

/** A stream channel is active exactly as long as it is open. */
@Override
public boolean isActive() {
    return isOpen();
}

/** The channel is writable while no bit of {@code unwritable} is set. */
@Override
public boolean isWritable() {
    return unwritable == 0;
}

/** Returns the id created in the constructor from the parent's id and the given numeric id. */
@Override
public ChannelId id() {
    return channelId;
}

/** Stream channels use the event loop of their parent channel. */
@Override
public EventLoop eventLoop() {
    return parent().eventLoop();
}

/** Returns the channel of the parent {@link ChannelHandlerContext} supplied by the subclass. */
@Override
public Channel parent() {
    return parentContext().channel();
}

/** Whether {@code Unsafe.register(...)} has run and no deregistration has completed since. */
@Override
public boolean isRegistered() {
    return registered;
}

/** Returns the local address of the parent channel. */
@Override
public SocketAddress localAddress() {
    return parent().localAddress();
}

/** Returns the remote address of the parent channel. */
@Override
public SocketAddress remoteAddress() {
    return parent().remoteAddress();
}

/** Returns the future that completes when this channel is closed. */
@Override
public ChannelFuture closeFuture() {
    return closePromise;
}
413
414 @Override
415 public long bytesBeforeUnwritable() {
416
417 long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
418
419
420
421 return bytes > 0 && isWritable() ? bytes : 0;
422 }
423
424 @Override
425 public long bytesBeforeWritable() {
426
427 long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
428
429
430
431 return bytes <= 0 || isWritable() ? 0 : bytes;
432 }
433
/** Returns this channel's {@link Unsafe} implementation. */
@Override
public Unsafe unsafe() {
    return unsafe;
}

/** Returns the pipeline created for this stream channel in the constructor. */
@Override
public ChannelPipeline pipeline() {
    return pipeline;
}

/** Returns the allocator configured on this channel's config. */
@Override
public ByteBufAllocator alloc() {
    return config().getAllocator();
}

// All operations below simply delegate to the identically named method of this channel's pipeline.

/** Requests a read through the pipeline and returns this channel. */
@Override
public Channel read() {
    pipeline().read();
    return this;
}

/** Requests a flush through the pipeline and returns this channel. */
@Override
public Channel flush() {
    pipeline().flush();
    return this;
}

/** Delegates to the pipeline; the unsafe fails bind requests with {@link UnsupportedOperationException}. */
@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline().bind(localAddress);
}

/** Delegates to the pipeline; the unsafe fails connect requests with {@link UnsupportedOperationException}. */
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return pipeline().connect(remoteAddress);
}

/** Delegates to the pipeline; the unsafe fails connect requests with {@link UnsupportedOperationException}. */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return pipeline().connect(remoteAddress, localAddress);
}

/** Delegates to the pipeline; the unsafe treats a disconnect like a close. */
@Override
public ChannelFuture disconnect() {
    return pipeline().disconnect();
}

/** Requests a close of this stream channel through the pipeline. */
@Override
public ChannelFuture close() {
    return pipeline().close();
}

/** Requests deregistration through the pipeline. */
@Override
public ChannelFuture deregister() {
    return pipeline().deregister();
}

/** Delegates to the pipeline; the unsafe fails bind requests with {@link UnsupportedOperationException}. */
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline().bind(localAddress, promise);
}

/** Delegates to the pipeline; the unsafe fails connect requests with {@link UnsupportedOperationException}. */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline().connect(remoteAddress, promise);
}

/** Delegates to the pipeline; the unsafe fails connect requests with {@link UnsupportedOperationException}. */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return pipeline().connect(remoteAddress, localAddress, promise);
}

/** Delegates to the pipeline; the unsafe treats a disconnect like a close. */
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
    return pipeline().disconnect(promise);
}

/** Requests a close of this stream channel through the pipeline, notifying {@code promise}. */
@Override
public ChannelFuture close(ChannelPromise promise) {
    return pipeline().close(promise);
}

/** Requests deregistration through the pipeline, notifying {@code promise}. */
@Override
public ChannelFuture deregister(ChannelPromise promise) {
    return pipeline().deregister(promise);
}

/** Writes {@code msg} through the pipeline without flushing. */
@Override
public ChannelFuture write(Object msg) {
    return pipeline().write(msg);
}

/** Writes {@code msg} through the pipeline without flushing, notifying {@code promise}. */
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
    return pipeline().write(msg, promise);
}

/** Writes {@code msg} through the pipeline and flushes, notifying {@code promise}. */
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return pipeline().writeAndFlush(msg, promise);
}

/** Writes {@code msg} through the pipeline and flushes. */
@Override
public ChannelFuture writeAndFlush(Object msg) {
    return pipeline().writeAndFlush(msg);
}

/** Creates a new promise via the pipeline. */
@Override
public ChannelPromise newPromise() {
    return pipeline().newPromise();
}

/** Creates a new progressive promise via the pipeline. */
@Override
public ChannelProgressivePromise newProgressivePromise() {
    return pipeline().newProgressivePromise();
}

/** Creates an already succeeded future via the pipeline. */
@Override
public ChannelFuture newSucceededFuture() {
    return pipeline().newSucceededFuture();
}

/** Creates an already failed future via the pipeline. */
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
    return pipeline().newFailedFuture(cause);
}

/** Returns the pipeline's void promise. */
@Override
public ChannelPromise voidPromise() {
    return pipeline().voidPromise();
}
565
/** The hash code is that of the channel id. */
@Override
public int hashCode() {
    return id().hashCode();
}

/** Channels are only equal to themselves (identity comparison). */
@Override
public boolean equals(Object o) {
    return this == o;
}

/** Orders channels by their ids; comparing a channel with itself short-circuits to {@code 0}. */
@Override
public int compareTo(Channel o) {
    if (this == o) {
        return 0;
    }

    return id().compareTo(o.id());
}

/** Returns the parent's string form followed by {@code (H2 - <stream>)}. */
@Override
public String toString() {
    return parent().toString() + "(H2 - " + stream + ')';
}
589
590
591
592
593
/**
 * Receives a frame for this stream channel. Must be called on the channel's event loop (asserted).
 * <ul>
 *   <li>If the channel is no longer active the frame is released.</li>
 *   <li>If a read is in progress or requested the frame is dispatched through the pipeline right away.</li>
 *   <li>Otherwise the frame is queued in the inbound buffer until a read is requested.</li>
 * </ul>
 *
 * @param frame the frame to deliver; ownership is taken over by this method
 */
void fireChildRead(Http2Frame frame) {
    assert eventLoop().inEventLoop();
    if (!isActive()) {
        ReferenceCountUtil.release(frame);
    } else if (readStatus != ReadStatus.IDLE) {
        // While a read is in progress or requested nothing may be left in the queue: the read loop in
        // doBeginRead() drains it before frames arrive through this path.
        assert inboundBuffer == null || inboundBuffer.isEmpty();
        final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();

        unsafe.doRead0(frame, allocHandle);

        // If the allocator handle allows more reading, postpone channelReadComplete by registering this
        // channel in the parent's read-complete-pending queue; otherwise complete the read right now.
        if (allocHandle.continueReading()) {
            maybeAddChannelToReadCompletePendingQueue();
        } else {
            unsafe.notifyReadComplete(allocHandle, true, false);
        }
    } else {
        // No read pending: buffer the frame (the queue is created lazily).
        if (inboundBuffer == null) {
            inboundBuffer = new ArrayDeque<Object>(4);
        }
        inboundBuffer.add(frame);
    }
}
621
/**
 * Completes a read that was postponed via the read-complete-pending queue. Must be called on the channel's
 * event loop (asserted). It is a no-op when no read completion is pending, because the notification is not
 * forced.
 */
void fireChildReadComplete() {
    assert eventLoop().inEventLoop();
    // A pending read completion implies that a read was in progress or requested.
    assert readStatus != ReadStatus.IDLE || !readCompletePending;
    unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
}
627
/**
 * Closes this channel with the given HTTP/2 error. Must be called on the channel's event loop (asserted).
 * The unsafe's close sends a reset frame carrying {@code error} when the parent is still active and the
 * stream id is valid.
 *
 * @param error the error to put into the reset frame
 */
final void closeWithError(Http2Error error) {
    assert eventLoop().inEventLoop();
    unsafe.close(unsafe.voidPromise(), error);
}
632
633 private final class Http2ChannelUnsafe implements Unsafe {
// Promise handed out by voidPromise().
private final VoidChannelPromise unsafeVoidPromise =
        new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
// Created lazily by recvBufAllocHandle().
@SuppressWarnings("deprecation")
private RecvByteBufAllocator.Handle recvHandle;
// True when a frame was handed to the parent context and may still need a flush; cleared by flush().
private boolean writeDoneAndNoFlush;
// Set by the first close; later close calls only complete (or chain) their promise.
private boolean closeInitiated;
// Set by readEOS(); once true the channel is closed as soon as the queued frames have been delivered.
private boolean readEOS;

// Whether a HEADERS/DATA frame flagged as end-of-stream was read (doRead0) ...
private boolean receivedEndOfStream;
// ... or written (writeHttp2StreamFrame). When both are set, close() sends no CANCEL reset frame.
private boolean sentEndOfStream;
644
645 @Override
646 public void connect(final SocketAddress remoteAddress,
647 SocketAddress localAddress, final ChannelPromise promise) {
648 if (!promise.setUncancellable()) {
649 return;
650 }
651 promise.setFailure(new UnsupportedOperationException());
652 }
653
654 @Override
655 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
656 if (recvHandle == null) {
657 recvHandle = config().getRecvByteBufAllocator().newHandle();
658 recvHandle.reset(config());
659 }
660 return recvHandle;
661 }
662
/** Returns the local address reported by the parent channel's unsafe. */
@Override
public SocketAddress localAddress() {
    return parent().unsafe().localAddress();
}

/** Returns the remote address reported by the parent channel's unsafe. */
@Override
public SocketAddress remoteAddress() {
    return parent().unsafe().remoteAddress();
}
672
673 @Override
674 public void register(EventLoop eventLoop, ChannelPromise promise) {
675 if (!promise.setUncancellable()) {
676 return;
677 }
678 if (registered) {
679 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
680 return;
681 }
682
683 registered = true;
684
685 promise.setSuccess();
686
687 pipeline().fireChannelRegistered();
688 if (isActive()) {
689 pipeline().fireChannelActive();
690 }
691 }
692
693 @Override
694 public void bind(SocketAddress localAddress, ChannelPromise promise) {
695 if (!promise.setUncancellable()) {
696 return;
697 }
698 promise.setFailure(new UnsupportedOperationException());
699 }
700
/** A disconnect is handled exactly like a close. */
@Override
public void disconnect(ChannelPromise promise) {
    close(promise);
}

/** Closes the channel without an explicit HTTP/2 error (see {@code close(ChannelPromise, Http2Error)}). */
@Override
public void close(final ChannelPromise promise) {
    close(promise, null);
}
710
/**
 * Closes the stream channel.
 *
 * <p>The first call sends a reset frame when appropriate, releases all queued inbound frames, completes the
 * close promise and {@code promise}, and finally schedules {@code channelInactive}/{@code channelUnregistered}.
 * Later calls only complete {@code promise}, either immediately or once the close promise completes.
 *
 * @param promise notified when the channel is closed
 * @param error   the error for the reset frame, or {@code null} to send {@link Http2Error#CANCEL} only if the
 *                stream has not already ended
 */
private void close(final ChannelPromise promise, Http2Error error) {
    if (!promise.setUncancellable()) {
        return;
    }
    if (closeInitiated) {
        if (closePromise.isDone()) {
            // Closed already.
            promise.setSuccess();
        } else if (!(promise instanceof VoidChannelPromise)) {
            // A close is already running: complete this promise when it finishes. A void promise needs
            // no notification, so no listener is registered for it.
            closePromise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    promise.setSuccess();
                }
            });
        }
        return;
    }
    closeInitiated = true;
    // Only the flag is cleared; the channel is not removed from the parent's pending queue here.
    readCompletePending = false;

    // Captured before the close promise completes, because isActive() is derived from it.
    final boolean wasActive = isActive();

    // Only send a reset frame while the parent connection is still active and the stream has a valid id.
    if (parent().isActive() && isStreamIdValid(stream.id())) {
        if (error == null) {
            // Plain close: cancel the stream unless it already ended (EOS recorded, or end-of-stream
            // was both received and sent).
            if (!readEOS && !(receivedEndOfStream && sentEndOfStream)) {
                Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
                write(resetFrame, unsafe().voidPromise());
                flush();
            }
        } else {
            // Close with an explicit error: always send the reset frame carrying that error.
            Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
            write(resetFrame, unsafe().voidPromise());
            flush();
        }
    }

    // Release every frame that was queued but never delivered.
    if (inboundBuffer != null) {
        for (;;) {
            Object msg = inboundBuffer.poll();
            if (msg == null) {
                break;
            }
            ReferenceCountUtil.release(msg);
        }
        inboundBuffer = null;
    }

    // The promises are notified before channelInactive is fired.
    outboundClosed = true;
    closePromise.setSuccess();
    promise.setSuccess();

    fireChannelInactiveAndDeregister(voidPromise(), wasActive);
}
776
/** Closes the channel using the unsafe's void promise, so nobody is notified of the outcome. */
@Override
public void closeForcibly() {
    close(unsafe().voidPromise());
}

/** Deregisters the channel without firing {@code channelInactive}. */
@Override
public void deregister(ChannelPromise promise) {
    fireChannelInactiveAndDeregister(promise, false);
}
786
/**
 * Schedules the deregistration of the channel on the event loop, optionally firing
 * {@code channelInactive} first. If the channel is not registered the promise is completed immediately.
 *
 * @param promise             completed after the pipeline events were fired
 * @param fireChannelInactive whether {@code channelInactive} is fired before {@code channelUnregistered}
 */
private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
                                              final boolean fireChannelInactive) {
    if (!promise.setUncancellable()) {
        return;
    }

    if (!registered) {
        promise.setSuccess();
        return;
    }

    // The events are fired from a separate event-loop task rather than inline -- presumably so they do
    // not fire in the middle of whichever pipeline callback triggered the close/deregister.
    invokeLater(promise.channel(), new Runnable() {
        @Override
        public void run() {
            if (fireChannelInactive) {
                pipeline.fireChannelInactive();
            }
            // Deregistration may be requested several times; channelUnregistered is only fired while the
            // channel is still marked registered.
            if (registered) {
                registered = false;
                pipeline.fireChannelUnregistered();
            }
            safeSetSuccess(promise);
        }
    });
}
821
822 private void safeSetSuccess(ChannelPromise promise) {
823 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
824 logger.warn("{} Failed to mark a promise as success because it is done already: {}",
825 promise.channel(), promise);
826 }
827 }
828
/**
 * Submits {@code task} to the event loop instead of running it inline. If the event loop rejects the task,
 * a warning is logged and the task is dropped.
 *
 * @param channel only used for the log message
 * @param task    the work to run later on the event loop
 */
private void invokeLater(Channel channel, Runnable task) {
    try {
        // NOTE(review): deferring presumably keeps inbound events triggered by an outbound operation
        // (e.g. close -> channelInactive) from overlapping with the handler callback that issued the
        // operation -- confirm against the equivalent logic in AbstractChannel.
        eventLoop().execute(task);
    } catch (RejectedExecutionException e) {
        logger.warn("{} Can't invoke task later as EventLoop rejected it", channel, e);
    }
}
847
848 @Override
849 public void beginRead() {
850 if (!isActive()) {
851 return;
852 }
853 updateLocalWindowIfNeeded();
854
855 switch (readStatus) {
856 case IDLE:
857 readStatus = ReadStatus.IN_PROGRESS;
858 doBeginRead();
859 break;
860 case IN_PROGRESS:
861 readStatus = ReadStatus.REQUESTED;
862 break;
863 default:
864 break;
865 }
866 }
867
868 private Object pollQueuedMessage() {
869 return inboundBuffer == null ? null : inboundBuffer.poll();
870 }
871
/**
 * Runs the read loop: delivers queued inbound frames through the pipeline while a read is in progress or
 * requested, and closes the channel once end-of-stream was recorded and nothing is left to deliver.
 */
void doBeginRead() {
    if (readStatus == ReadStatus.IDLE) {
        // No read was requested: the channel closure is still carried out once EOS was recorded and the
        // queue is empty, without waiting for the user to request a read.
        if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
            // Flush anything still pending (such as a window update frame) before closing.
            flush();
            unsafe.closeForcibly();
        }
    } else {
        // Process messages until none are left or the read status falls back to IDLE.
        do {
            Object message = pollQueuedMessage();
            if (message == null) {
                // Nothing queued: flush anything still pending (such as a window update frame).
                flush();
                if (readEOS) {
                    unsafe.closeForcibly();
                }
                break;
            }
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config());
            boolean continueReading = false;
            // After EOS the queue is drained completely; otherwise the allocator handle decides.
            do {
                doRead0((Http2Frame) message, allocHandle);
            } while ((readEOS || (continueReading = allocHandle.continueReading()))
                    && (message = pollQueuedMessage()) != null);

            if (continueReading && isParentReadInProgress() && !readEOS) {
                // The parent is still reading and more may arrive: postpone channelReadComplete until the
                // parent completes its read by registering in its pending queue.
                maybeAddChannelToReadCompletePendingQueue();
            } else {
                notifyReadComplete(allocHandle, true, true);

                // The read status is reset only AFTER the pipeline callbacks ran, so a read() issued from
                // channelReadComplete does not re-enter this loop recursively.
                resetReadStatus();
            }
        } while (readStatus != ReadStatus.IDLE);
    }
}
916
/** Records that the end of the stream was reached; the read logic closes the channel once it has drained. */
void readEOS() {
    readEOS = true;
}
920
921 private boolean updateLocalWindowIfNeeded() {
922 if (flowControlledBytes != 0 && !parentContext().isRemoved() && config.autoStreamFlowControl) {
923 int bytes = flowControlledBytes;
924 flowControlledBytes = 0;
925 writeWindowUpdateFrame(new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
926 return true;
927 }
928 return false;
929 }
930
/**
 * Writes a WINDOW_UPDATE frame if one is due and flushes it. Called by the channel config when automatic
 * stream flow control is switched on.
 */
void updateLocalWindowIfNeededAndFlush() {
    if (updateLocalWindowIfNeeded()) {
        flush();
    }
}
936
937 private void resetReadStatus() {
938 readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
939 }
940
/**
 * Completes the current read: notifies the allocator handle, fires {@code channelReadComplete}, flushes
 * and closes the channel if end-of-stream was recorded.
 *
 * @param allocHandle       the handle used for the read
 * @param forceReadComplete if {@code false}, nothing happens unless a read completion is pending
 * @param inReadLoop        {@code true} when called from the read loop, which resets the read status itself
 */
void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
                        boolean inReadLoop) {
    if (!readCompletePending && !forceReadComplete) {
        return;
    }
    // Cleared unconditionally, also when the completion was forced.
    readCompletePending = false;

    if (!inReadLoop) {
        // Inside the read loop the status is reset by the loop, after the pipeline callbacks have run.
        resetReadStatus();
    }

    allocHandle.readComplete();
    pipeline().fireChannelReadComplete();
    // Handlers may have written frames while reading; flush them now. flush() itself is a no-op while the
    // parent is in its read loop or when nothing was written.
    flush();
    if (readEOS) {
        unsafe.closeForcibly();
    }
}
964
/**
 * Delivers a single frame through the pipeline after updating the flow-control and allocator accounting.
 *
 * @param frame       the frame to deliver
 * @param allocHandle the allocator handle to record the read on
 */
@SuppressWarnings("deprecation")
void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
    final int bytes;
    if (frame instanceof Http2DataFrame) {
        bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
        // The counter is incremented BEFORE fireChannelRead(...): a handler may trigger a read() from
        // there, which calls updateLocalWindowIfNeeded() and has to see these bytes already.
        flowControlledBytes += bytes;
    } else {
        // Frames other than DATA are accounted with a fixed, non-zero size.
        bytes = MIN_HTTP2_FRAME_SIZE;
    }

    // Track end-of-stream locally so close() can tell whether the stream already ended.
    receivedEndOfStream |= isEndOfStream(frame);

    // Update the handle before the frame is fired through the pipeline.
    allocHandle.attemptedBytesRead(bytes);
    allocHandle.lastBytesRead(bytes);
    allocHandle.incMessagesRead(1);

    pipeline().fireChannelRead(frame);
}
991
/**
 * Writes a WINDOW_UPDATE frame to the parent context without flushing it and arranges for a failed write to
 * be reported through {@code windowUpdateFrameWriteComplete(...)}.
 *
 * @param windowUpdateFrame the frame to write
 * @return the future of the write
 */
private ChannelFuture writeWindowUpdateFrame(Http2WindowUpdateFrame windowUpdateFrame) {
    ChannelFuture future = write0(parentContext(), windowUpdateFrame);
    // Mark that there is something to flush: window update frames are written here without a flush and
    // rely on a later flush() call.
    writeDoneAndNoFlush = true;

    // Handle the outcome directly when the write already completed, which avoids registering a listener.
    if (future.isDone()) {
        windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
    } else {
        future.addListener(windowUpdateFrameWriteListener);
    }
    return future;
}
1009
1010 @Override
1011 public void write(Object msg, final ChannelPromise promise) {
1012
1013 if (!promise.setUncancellable()) {
1014 ReferenceCountUtil.release(msg);
1015 return;
1016 }
1017
1018 if (!isActive() ||
1019
1020 outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
1021 ReferenceCountUtil.release(msg);
1022 promise.setFailure(new ClosedChannelException());
1023 return;
1024 }
1025
1026 try {
1027 if (msg instanceof Http2StreamFrame) {
1028 Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1029 if (msg instanceof Http2WindowUpdateFrame) {
1030 Http2WindowUpdateFrame updateFrame = (Http2WindowUpdateFrame) msg;
1031 if (config.autoStreamFlowControl) {
1032 ReferenceCountUtil.release(msg);
1033 promise.setFailure(new UnsupportedOperationException(
1034 Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL + " is set to false"));
1035 return;
1036 }
1037 try {
1038 ObjectUtil.checkInRange(updateFrame.windowSizeIncrement(), 0,
1039 flowControlledBytes, "windowSizeIncrement");
1040 } catch (RuntimeException e) {
1041 ReferenceCountUtil.release(updateFrame);
1042 promise.setFailure(e);
1043 return;
1044 }
1045 flowControlledBytes -= updateFrame.windowSizeIncrement();
1046 if (parentContext().isRemoved()) {
1047 ReferenceCountUtil.release(msg);
1048 promise.setFailure(new ClosedChannelException());
1049 return;
1050 }
1051 ChannelFuture f = writeWindowUpdateFrame(updateFrame);
1052 if (f.isDone()) {
1053 writeComplete(f, promise);
1054 } else {
1055 f.addListener(new ChannelFutureListener() {
1056 @Override
1057 public void operationComplete(ChannelFuture future) {
1058 writeComplete(future, promise);
1059 }
1060 });
1061 }
1062 } else {
1063 writeHttp2StreamFrame(frame, promise);
1064 }
1065 } else {
1066 String msgStr = msg.toString();
1067 ReferenceCountUtil.release(msg);
1068 promise.setFailure(new IllegalArgumentException(
1069 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1070 ": " + msgStr));
1071 }
1072 } catch (Throwable t) {
1073 promise.tryFailure(t);
1074 }
1075 }
1076
1077 private boolean isEndOfStream(Http2Frame frame) {
1078 if (frame instanceof Http2HeadersFrame) {
1079 return ((Http2HeadersFrame) frame).isEndStream();
1080 }
1081 if (frame instanceof Http2DataFrame) {
1082 return ((Http2DataFrame) frame).isEndStream();
1083 }
1084 return false;
1085 }
1086
/**
 * Writes a (non WINDOW_UPDATE) stream frame to the parent context and wires up completion handling and
 * pending-byte accounting.
 *
 * @param frame   the frame to write, already bound to this channel's stream
 * @param promise notified about the outcome of the write
 */
private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
    // A stream that has no valid id yet must be opened with a HEADERS frame.
    if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
        ReferenceCountUtil.release(frame);
        promise.setFailure(
                new IllegalArgumentException("The first frame must be a headers frame. Was: "
                        + frame.name()));
        return;
    }

    final boolean firstWrite;
    if (firstFrameWritten) {
        firstWrite = false;
    } else {
        firstWrite = firstFrameWritten = true;
    }

    // Track end-of-stream locally so close() can tell whether the stream already ended.
    sentEndOfStream |= isEndOfStream(frame);
    ChannelFuture f = write0(parentContext(), frame);
    if (f.isDone()) {
        // Completed synchronously: no listener and no pending-byte accounting needed.
        if (firstWrite) {
            firstWriteComplete(f, promise);
        } else {
            writeComplete(f, promise);
        }
    } else {
        // Account for the frame while it is pending; the listener gives the bytes back on completion.
        final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
        incrementPendingOutboundBytes(bytes, false);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (firstWrite) {
                    firstWriteComplete(future, promise);
                } else {
                    writeComplete(future, promise);
                }
                decrementPendingOutboundBytes(bytes, false);
            }
        });
        // The write is still pending, so a later flush() has work to do.
        writeDoneAndNoFlush = true;
    }
}
1130
1131 private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1132 Throwable cause = future.cause();
1133 if (cause == null) {
1134 promise.setSuccess();
1135 } else {
1136
1137 closeForcibly();
1138 promise.setFailure(wrapStreamClosedError(cause));
1139 }
1140 }
1141
1142 private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1143 Throwable cause = future.cause();
1144 if (cause == null) {
1145 promise.setSuccess();
1146 } else {
1147 Throwable error = wrapStreamClosedError(cause);
1148
1149 if (error instanceof IOException) {
1150 if (config.isAutoClose()) {
1151
1152 closeForcibly();
1153 } else {
1154
1155 outboundClosed = true;
1156 }
1157 }
1158 promise.setFailure(error);
1159 }
1160 }
1161
1162 private Throwable wrapStreamClosedError(Throwable cause) {
1163
1164
1165 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1166 return new ClosedChannelException().initCause(cause);
1167 }
1168 return cause;
1169 }
1170
1171 private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1172 if (frame.stream() != null && frame.stream() != stream) {
1173 String msgString = frame.toString();
1174 ReferenceCountUtil.release(frame);
1175 throw new IllegalArgumentException(
1176 "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1177 }
1178 return frame;
1179 }
1180
1181 @Override
1182 public void flush() {
1183
1184
1185
1186
1187 if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1188
1189 return;
1190 }
1191
1192
1193 writeDoneAndNoFlush = false;
1194 flush0(parentContext());
1195 }
1196
/** Returns the void promise owned by this unsafe. */
@Override
public ChannelPromise voidPromise() {
    return unsafeVoidPromise;
}

/** Always {@code null}: this unsafe does not maintain a {@link ChannelOutboundBuffer}. */
@Override
public ChannelOutboundBuffer outboundBuffer() {
    // Frames are handed straight to the parent context; see write0(...).
    return null;
}
1207 }
1208
1209
1210
1211
1212
1213
1214 private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1215
1216 volatile boolean autoStreamFlowControl = true;
1217 Http2StreamChannelConfig(Channel channel) {
1218 super(channel);
1219 }
1220
1221 @Override
1222 public MessageSizeEstimator getMessageSizeEstimator() {
1223 return FlowControlledFrameSizeEstimator.INSTANCE;
1224 }
1225
1226 @Override
1227 public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1228 throw new UnsupportedOperationException();
1229 }
1230
1231 @Override
1232 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1233 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1234 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1235 RecvByteBufAllocator.ExtendedHandle.class);
1236 }
1237 super.setRecvByteBufAllocator(allocator);
1238 return this;
1239 }
1240
1241 @Override
1242 public Map<ChannelOption<?>, Object> getOptions() {
1243 return getOptions(
1244 super.getOptions(),
1245 Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL);
1246 }
1247
1248 @SuppressWarnings("unchecked")
1249 @Override
1250 public <T> T getOption(ChannelOption<T> option) {
1251 if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1252 return (T) Boolean.valueOf(autoStreamFlowControl);
1253 }
1254 return super.getOption(option);
1255 }
1256
1257 @Override
1258 public <T> boolean setOption(ChannelOption<T> option, T value) {
1259 validate(option, value);
1260 if (option == Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL) {
1261 boolean newValue = (Boolean) value;
1262 boolean changed = newValue && !autoStreamFlowControl;
1263 autoStreamFlowControl = (Boolean) value;
1264 if (changed) {
1265 if (channel.isRegistered()) {
1266 final Http2ChannelUnsafe unsafe = (Http2ChannelUnsafe) channel.unsafe();
1267 if (channel.eventLoop().inEventLoop()) {
1268 unsafe.updateLocalWindowIfNeededAndFlush();
1269 } else {
1270 channel.eventLoop().execute(new Runnable() {
1271 @Override
1272 public void run() {
1273 unsafe.updateLocalWindowIfNeededAndFlush();
1274 }
1275 });
1276 }
1277 }
1278 }
1279 return true;
1280 }
1281 return super.setOption(option, value);
1282 }
1283 }
1284
1285 private void maybeAddChannelToReadCompletePendingQueue() {
1286 if (!readCompletePending) {
1287 readCompletePending = true;
1288 addChannelToReadCompletePendingQueue();
1289 }
1290 }
1291
/**
 * Flushes the given (parent) context. Subclasses may override this to customize flushing.
 *
 * @param ctx the parent context to flush
 */
protected void flush0(ChannelHandlerContext ctx) {
    ctx.flush();
}

/**
 * Writes {@code msg} to the given (parent) context using a fresh promise. Subclasses may override this to
 * customize writing.
 *
 * @param ctx the parent context to write to
 * @param msg the message to write
 * @return the future of the write
 */
protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
    ChannelPromise promise = ctx.newPromise();
    ctx.write(msg, promise);
    return promise;
}

/** Whether the parent channel is currently inside its read loop; flushes are skipped while it is. */
protected abstract boolean isParentReadInProgress();
/** Adds this channel to the parent's queue of channels awaiting a postponed {@code channelReadComplete}. */
protected abstract void addChannelToReadCompletePendingQueue();
/** Returns the parent's {@link ChannelHandlerContext} that is used for writing, flushing and as the parent channel source. */
protected abstract ChannelHandlerContext parentContext();
1305 }