1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.quic;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelId;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultChannelId;
29 import io.netty.channel.DefaultChannelPipeline;
30 import io.netty.channel.EventLoop;
31 import io.netty.channel.PendingWriteQueue;
32 import io.netty.channel.RecvByteBufAllocator;
33 import io.netty.channel.VoidChannelPromise;
34 import io.netty.channel.socket.ChannelInputShutdownEvent;
35 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36 import io.netty.channel.socket.ChannelOutputShutdownException;
37 import io.netty.util.DefaultAttributeMap;
38 import io.netty.util.ReferenceCountUtil;
39 import io.netty.util.concurrent.PromiseNotifier;
40 import io.netty.util.internal.StringUtil;
41 import io.netty.util.internal.logging.InternalLogger;
42 import io.netty.util.internal.logging.InternalLoggerFactory;
43 import org.jetbrains.annotations.Nullable;
44
45 import java.net.SocketAddress;
46 import java.nio.channels.ClosedChannelException;
47 import java.util.concurrent.RejectedExecutionException;
48
49
50
51
52 final class QuicheQuicStreamChannel extends DefaultAttributeMap implements QuicStreamChannel {
53 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
54 private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(QuicheQuicStreamChannel.class);
55 private final QuicheQuicChannel parent;
56 private final ChannelId id;
57 private final ChannelPipeline pipeline;
58 private final QuicStreamChannelUnsafe unsafe;
59 private final ChannelPromise closePromise;
60 private final PendingWriteQueue queue;
61
62 private final QuicStreamChannelConfig config;
63 private final QuicStreamAddress address;
64
65 private boolean readable;
66 private boolean readPending;
67 private boolean inRecv;
68 private boolean inWriteQueued;
69 private boolean finReceived;
70 private boolean finSent;
71
72 private volatile boolean registered;
73 private volatile boolean writable = true;
74 private volatile boolean active = true;
75 private volatile boolean inputShutdown;
76 private volatile boolean outputShutdown;
77 private volatile QuicStreamPriority priority;
78 private volatile int capacity;
79
80 QuicheQuicStreamChannel(QuicheQuicChannel parent, long streamId) {
81 this.parent = parent;
82 this.id = DefaultChannelId.newInstance();
83 unsafe = new QuicStreamChannelUnsafe();
84 this.pipeline = new DefaultChannelPipeline(this) {
85
86 };
87 config = new QuicheQuicStreamChannelConfig(this);
88 this.address = new QuicStreamAddress(streamId);
89 this.closePromise = newPromise();
90 queue = new PendingWriteQueue(this);
91
92
93 if (parent.streamType(streamId) == QuicStreamType.UNIDIRECTIONAL && parent.isStreamLocalCreated(streamId)) {
94 inputShutdown = true;
95 }
96 }
97
98 @Override
99 public QuicStreamAddress localAddress() {
100 return address;
101 }
102
103 @Override
104 public QuicStreamAddress remoteAddress() {
105 return address;
106 }
107
108 @Override
109 public boolean isLocalCreated() {
110 return parent().isStreamLocalCreated(streamId());
111 }
112
113 @Override
114 public QuicStreamType type() {
115 return parent().streamType(streamId());
116 }
117
118 @Override
119 public long streamId() {
120 return address.streamId();
121 }
122
123 @Override
124 public QuicStreamPriority priority() {
125 return priority;
126 }
127
128 @Override
129 public ChannelFuture updatePriority(QuicStreamPriority priority, ChannelPromise promise) {
130 if (eventLoop().inEventLoop()) {
131 updatePriority0(priority, promise);
132 } else {
133 eventLoop().execute(() -> updatePriority0(priority, promise));
134 }
135 return promise;
136 }
137
138 private void updatePriority0(QuicStreamPriority priority, ChannelPromise promise) {
139 assert eventLoop().inEventLoop();
140 if (!promise.setUncancellable()) {
141 return;
142 }
143 try {
144 parent().streamPriority(streamId(), (byte) priority.urgency(), priority.isIncremental());
145 } catch (Throwable cause) {
146 promise.setFailure(cause);
147 return;
148 }
149 this.priority = priority;
150 promise.setSuccess();
151 }
152
153 @Override
154 public boolean isInputShutdown() {
155 return inputShutdown;
156 }
157
158 @Override
159 public ChannelFuture shutdownOutput(ChannelPromise promise) {
160 if (eventLoop().inEventLoop()) {
161 shutdownOutput0(promise);
162 } else {
163 eventLoop().execute(() -> shutdownOutput0(promise));
164 }
165 return promise;
166 }
167
168 private void shutdownOutput0(ChannelPromise promise) {
169 assert eventLoop().inEventLoop();
170 if (!promise.setUncancellable()) {
171 return;
172 }
173 outputShutdown = true;
174 unsafe.writeWithoutCheckChannelState(QuicStreamFrame.EMPTY_FIN, promise);
175 unsafe.flush();
176 }
177
178 @Override
179 public ChannelFuture shutdownInput(int error, ChannelPromise promise) {
180 if (eventLoop().inEventLoop()) {
181 shutdownInput0(error, promise);
182 } else {
183 eventLoop().execute(() -> shutdownInput0(error, promise));
184 }
185 return promise;
186 }
187
188 @Override
189 public ChannelFuture shutdownOutput(int error, ChannelPromise promise) {
190 if (eventLoop().inEventLoop()) {
191 shutdownOutput0(error, promise);
192 } else {
193 eventLoop().execute(() -> shutdownOutput0(error, promise));
194 }
195 return promise;
196 }
197
198 @Override
199 public QuicheQuicChannel parent() {
200 return parent;
201 }
202
203 private void shutdownInput0(int err, ChannelPromise channelPromise) {
204 assert eventLoop().inEventLoop();
205 if (!channelPromise.setUncancellable()) {
206 return;
207 }
208 inputShutdown = true;
209 parent().streamShutdown(streamId(), true, false, err, channelPromise);
210 closeIfDone();
211 }
212
213 @Override
214 public boolean isOutputShutdown() {
215 return outputShutdown;
216 }
217
218 private void shutdownOutput0(int error, ChannelPromise channelPromise) {
219 assert eventLoop().inEventLoop();
220 if (!channelPromise.setUncancellable()) {
221 return;
222 }
223 parent().streamShutdown(streamId(), false, true, error, channelPromise);
224 outputShutdown = true;
225 closeIfDone();
226 }
227
228 @Override
229 public boolean isShutdown() {
230 return outputShutdown && inputShutdown;
231 }
232
233 @Override
234 public ChannelFuture shutdown(ChannelPromise channelPromise) {
235 if (eventLoop().inEventLoop()) {
236 shutdown0(channelPromise);
237 } else {
238 eventLoop().execute(() -> shutdown0(channelPromise));
239 }
240 return channelPromise;
241 }
242
243 private void shutdown0(ChannelPromise promise) {
244 assert eventLoop().inEventLoop();
245 if (!promise.setUncancellable()) {
246 return;
247 }
248 inputShutdown = true;
249 outputShutdown = true;
250 unsafe.writeWithoutCheckChannelState(QuicStreamFrame.EMPTY_FIN, unsafe.voidPromise());
251 unsafe.flush();
252 parent().streamShutdown(streamId(), true, false, 0, promise);
253 closeIfDone();
254 }
255
256 @Override
257 public ChannelFuture shutdown(int error, ChannelPromise promise) {
258 if (eventLoop().inEventLoop()) {
259 shutdown0(error, promise);
260 } else {
261 eventLoop().execute(() -> shutdown0(error, promise));
262 }
263 return promise;
264 }
265
266 private void shutdown0(int error, ChannelPromise channelPromise) {
267 assert eventLoop().inEventLoop();
268 if (!channelPromise.setUncancellable()) {
269 return;
270 }
271 inputShutdown = true;
272 outputShutdown = true;
273 parent().streamShutdown(streamId(), true, true, error, channelPromise);
274 closeIfDone();
275 }
276
277 private void sendFinIfNeeded() throws Exception {
278 if (!finSent) {
279 finSent = true;
280 parent().streamSendFin(streamId());
281 }
282 }
283
284 private void closeIfDone() {
285 if (finSent && (finReceived || type() == QuicStreamType.UNIDIRECTIONAL && isLocalCreated())) {
286 unsafe().close(unsafe().voidPromise());
287 }
288 }
289
290 private void removeStreamFromParent() {
291 if (!active && finReceived) {
292 parent().streamClosed(streamId());
293 inputShutdown = true;
294 outputShutdown = true;
295 }
296 }
297
298 @Override
299 public QuicStreamChannel flush() {
300 pipeline.flush();
301 return this;
302 }
303
304 @Override
305 public QuicStreamChannel read() {
306 pipeline.read();
307 return this;
308 }
309
310 @Override
311 public QuicStreamChannelConfig config() {
312 return config;
313 }
314
315 @Override
316 public boolean isOpen() {
317 return active;
318 }
319
320 @Override
321 public boolean isActive() {
322 return isOpen();
323 }
324
325 @Override
326 public ChannelMetadata metadata() {
327 return METADATA;
328 }
329
330 @Override
331 public ChannelId id() {
332 return id;
333 }
334
335 @Override
336 public EventLoop eventLoop() {
337 return parent.eventLoop();
338 }
339
340 @Override
341 public boolean isRegistered() {
342 return registered;
343 }
344
345 @Override
346 public ChannelFuture closeFuture() {
347 return closePromise;
348 }
349
350 @Override
351 public boolean isWritable() {
352 return writable;
353 }
354
355 @Override
356 public long bytesBeforeUnwritable() {
357
358 return Math.max(capacity, 0);
359 }
360
361 @Override
362 public long bytesBeforeWritable() {
363 if (writable) {
364 return 0;
365 }
366
367 return 8;
368 }
369
370 @Override
371 public QuicStreamChannelUnsafe unsafe() {
372 return unsafe;
373 }
374
375 @Override
376 public ChannelPipeline pipeline() {
377 return pipeline;
378 }
379
380 @Override
381 public ByteBufAllocator alloc() {
382 return config.getAllocator();
383 }
384
385 @Override
386 public int compareTo(Channel o) {
387 return id.compareTo(o.id());
388 }
389
390
391
392
393 @Override
394 public int hashCode() {
395 return id.hashCode();
396 }
397
398
399
400
401
402 @Override
403 public boolean equals(Object o) {
404 return this == o;
405 }
406
407 @Override
408 public String toString() {
409 return "[id: 0x" + id.asShortText() + ", " + address + "]";
410 }
411
412
413
414
415 boolean writable(int capacity) {
416 assert eventLoop().inEventLoop();
417 if (capacity < 0) {
418
419 if (capacity != Quiche.QUICHE_ERR_DONE) {
420 if (!queue.isEmpty()) {
421 if (capacity == Quiche.QUICHE_ERR_STREAM_STOPPED) {
422 queue.removeAndFailAll(new ChannelOutputShutdownException("STOP_SENDING frame received"));
423
424 return false;
425 } else {
426 queue.removeAndFailAll(Quiche.convertToException(capacity));
427 }
428 } else if (capacity == Quiche.QUICHE_ERR_STREAM_STOPPED) {
429
430 return false;
431 }
432
433 finSent = true;
434 unsafe().close(unsafe().voidPromise());
435 }
436 return false;
437 }
438 this.capacity = capacity;
439 boolean mayNeedWrite = unsafe().writeQueued();
440
441 updateWritabilityIfNeeded(this.capacity > 0);
442 return mayNeedWrite;
443 }
444
445 private void updateWritabilityIfNeeded(boolean newWritable) {
446 if (writable != newWritable) {
447 writable = newWritable;
448 pipeline.fireChannelWritabilityChanged();
449 }
450 }
451
452
453
454
455 void readable() {
456 assert eventLoop().inEventLoop();
457
458 readable = true;
459 if (readPending) {
460 unsafe().recv();
461 }
462 }
463
464 final class QuicStreamChannelUnsafe implements Unsafe {
465
466 @SuppressWarnings("deprecation")
467 private RecvByteBufAllocator.Handle recvHandle;
468
469 private final ChannelPromise voidPromise = new VoidChannelPromise(
470 QuicheQuicStreamChannel.this, false);
471 @Override
472 public void connect(SocketAddress remote, SocketAddress local, ChannelPromise promise) {
473 assert eventLoop().inEventLoop();
474 promise.setFailure(new UnsupportedOperationException());
475 }
476
477 @SuppressWarnings("deprecation")
478 @Override
479 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
480 if (recvHandle == null) {
481 recvHandle = config.getRecvByteBufAllocator().newHandle();
482 }
483 return recvHandle;
484 }
485
486 @Override
487 public SocketAddress localAddress() {
488 return address;
489 }
490
491 @Override
492 public SocketAddress remoteAddress() {
493 return address;
494 }
495
496 @Override
497 public void register(EventLoop eventLoop, ChannelPromise promise) {
498 assert eventLoop.inEventLoop();
499 if (!promise.setUncancellable()) {
500 return;
501 }
502 if (registered) {
503 promise.setFailure(new IllegalStateException());
504 return;
505 }
506 if (eventLoop != parent.eventLoop()) {
507 promise.setFailure(new IllegalArgumentException());
508 return;
509 }
510 registered = true;
511 promise.setSuccess();
512 pipeline.fireChannelRegistered();
513 pipeline.fireChannelActive();
514 }
515
516 @Override
517 public void bind(SocketAddress localAddress, ChannelPromise promise) {
518 assert eventLoop().inEventLoop();
519 if (!promise.setUncancellable()) {
520 return;
521 }
522 promise.setFailure(new UnsupportedOperationException());
523 }
524
525 @Override
526 public void disconnect(ChannelPromise promise) {
527 assert eventLoop().inEventLoop();
528 close(promise);
529 }
530
531 @Override
532 public void close(ChannelPromise promise) {
533 close(null, promise);
534 }
535
536 void close(@Nullable ClosedChannelException writeFailCause, ChannelPromise promise) {
537 assert eventLoop().inEventLoop();
538 if (!promise.setUncancellable()) {
539 return;
540 }
541 if (!active || closePromise.isDone()) {
542 if (promise.isVoid()) {
543 return;
544 }
545 closePromise.addListener(new PromiseNotifier<>(promise));
546 return;
547 }
548 active = false;
549 try {
550
551 sendFinIfNeeded();
552 } catch (Exception ignore) {
553
554 } finally {
555 if (!queue.isEmpty()) {
556
557 if (writeFailCause == null) {
558 writeFailCause = new ClosedChannelException();
559 }
560 queue.removeAndFailAll(writeFailCause);
561 }
562
563 promise.trySuccess();
564 closePromise.trySuccess();
565 if (type() == QuicStreamType.UNIDIRECTIONAL && isLocalCreated()) {
566 inputShutdown = true;
567 outputShutdown = true;
568
569
570 parent().streamClosed(streamId());
571 } else {
572 removeStreamFromParent();
573 }
574 }
575 if (inWriteQueued) {
576 invokeLater(() -> deregister(voidPromise(), true));
577 } else {
578 deregister(voidPromise(), true);
579 }
580 }
581
582 private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
583 assert eventLoop().inEventLoop();
584 if (!promise.setUncancellable()) {
585 return;
586 }
587
588 if (!registered) {
589 promise.trySuccess();
590 return;
591 }
592
593
594
595
596
597
598
599
600
601
602 invokeLater(() -> {
603 if (fireChannelInactive) {
604 pipeline.fireChannelInactive();
605 }
606
607
608
609
610 if (registered) {
611 registered = false;
612 pipeline.fireChannelUnregistered();
613 }
614 promise.setSuccess();
615 });
616 }
617
618 private void invokeLater(Runnable task) {
619 try {
620
621
622
623
624
625
626
627
628
629
630
631 eventLoop().execute(task);
632 } catch (RejectedExecutionException e) {
633 LOGGER.warn("Can't invoke task later as EventLoop rejected it", e);
634 }
635 }
636
637 @Override
638 public void closeForcibly() {
639 assert eventLoop().inEventLoop();
640 close(unsafe().voidPromise());
641 }
642
643 @Override
644 public void deregister(ChannelPromise promise) {
645 assert eventLoop().inEventLoop();
646 deregister(promise, false);
647 }
648
649 @Override
650 public void beginRead() {
651 assert eventLoop().inEventLoop();
652 readPending = true;
653 if (readable) {
654 unsafe().recv();
655
656
657
658
659
660
661 parent().connectionSendAndFlush();
662 }
663 }
664
665 private void closeIfNeeded(boolean wasFinSent) {
666
667
668
669
670 if (!wasFinSent && QuicheQuicStreamChannel.this.finSent
671 && (type() == QuicStreamType.UNIDIRECTIONAL || finReceived)) {
672
673 close(voidPromise());
674 }
675 }
676
677 boolean writeQueued() {
678 assert eventLoop().inEventLoop();
679 boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
680 inWriteQueued = true;
681 try {
682 if (queue.isEmpty()) {
683 return false;
684 }
685 boolean written = false;
686 for (;;) {
687 Object msg = queue.current();
688 if (msg == null) {
689 break;
690 }
691 try {
692 int res = write0(msg);
693 if (res == 1) {
694 queue.remove().setSuccess();
695 written = true;
696 } else if (res == 0 || res == Quiche.QUICHE_ERR_DONE) {
697 break;
698 } else if (res == Quiche.QUICHE_ERR_STREAM_STOPPED) {
699
700
701
702 queue.removeAndFailAll(
703 new ChannelOutputShutdownException("STOP_SENDING frame received"));
704 break;
705 } else {
706 queue.remove().setFailure(Quiche.convertToException(res));
707 }
708 } catch (Exception e) {
709 queue.remove().setFailure(e);
710 }
711 }
712 if (written) {
713 updateWritabilityIfNeeded(true);
714 }
715 return written;
716 } finally {
717 closeIfNeeded(wasFinSent);
718 inWriteQueued = false;
719 }
720 }
721
722 @Override
723 public void write(Object msg, ChannelPromise promise) {
724 assert eventLoop().inEventLoop();
725 if (!promise.setUncancellable()) {
726 ReferenceCountUtil.release(msg);
727 return;
728 }
729
730
731 if (!isOpen()) {
732 queueAndFailAll(msg, promise, new ClosedChannelException());
733 } else if (finSent) {
734 queueAndFailAll(msg, promise, new ChannelOutputShutdownException("Fin was sent already"));
735 } else if (!queue.isEmpty()) {
736
737
738 try {
739 msg = filterMsg(msg);
740 } catch (UnsupportedOperationException e) {
741 ReferenceCountUtil.release(msg);
742 promise.setFailure(e);
743 return;
744 }
745
746
747 ReferenceCountUtil.touch(msg);
748 queue.add(msg, promise);
749
750
751 writeQueued();
752 } else {
753 assert queue.isEmpty();
754 writeWithoutCheckChannelState(msg, promise);
755 }
756 }
757
758 private void queueAndFailAll(Object msg, ChannelPromise promise, Throwable cause) {
759
760 ReferenceCountUtil.touch(msg);
761
762 queue.add(msg, promise);
763 queue.removeAndFailAll(cause);
764 }
765
766 private Object filterMsg(Object msg) {
767 if (msg instanceof ByteBuf) {
768 ByteBuf buffer = (ByteBuf) msg;
769 if (!buffer.isDirect()) {
770 ByteBuf tmpBuffer = alloc().directBuffer(buffer.readableBytes());
771 tmpBuffer.writeBytes(buffer, buffer.readerIndex(), buffer.readableBytes());
772 buffer.release();
773 return tmpBuffer;
774 }
775 } else if (msg instanceof QuicStreamFrame) {
776 QuicStreamFrame frame = (QuicStreamFrame) msg;
777 ByteBuf buffer = frame.content();
778 if (!buffer.isDirect()) {
779 ByteBuf tmpBuffer = alloc().directBuffer(buffer.readableBytes());
780 tmpBuffer.writeBytes(buffer, buffer.readerIndex(), buffer.readableBytes());
781 QuicStreamFrame tmpFrame = frame.replace(tmpBuffer);
782 frame.release();
783 return tmpFrame;
784 }
785 } else {
786 throw new UnsupportedOperationException(
787 "unsupported message type: " + StringUtil.simpleClassName(msg));
788 }
789 return msg;
790 }
791
792 void writeWithoutCheckChannelState(Object msg, ChannelPromise promise) {
793 try {
794 msg = filterMsg(msg);
795 } catch (UnsupportedOperationException e) {
796 ReferenceCountUtil.release(msg);
797 promise.setFailure(e);
798 }
799
800 boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
801 boolean mayNeedWritabilityUpdate = false;
802 try {
803 int res = write0(msg);
804 if (res > 0) {
805 ReferenceCountUtil.release(msg);
806 promise.setSuccess();
807 mayNeedWritabilityUpdate = capacity == 0;
808 } else if (res == 0 || res == Quiche.QUICHE_ERR_DONE) {
809
810 ReferenceCountUtil.touch(msg);
811 queue.add(msg, promise);
812 mayNeedWritabilityUpdate = true;
813 } else if (res == Quiche.QUICHE_ERR_STREAM_STOPPED) {
814 throw new ChannelOutputShutdownException("STOP_SENDING frame received");
815 } else {
816 throw Quiche.convertToException(res);
817 }
818 } catch (Exception e) {
819 ReferenceCountUtil.release(msg);
820 promise.setFailure(e);
821 mayNeedWritabilityUpdate = capacity == 0;
822 } finally {
823 if (mayNeedWritabilityUpdate) {
824 updateWritabilityIfNeeded(false);
825 }
826 closeIfNeeded(wasFinSent);
827 }
828 }
829
830 private int write0(Object msg) throws Exception {
831 if (type() == QuicStreamType.UNIDIRECTIONAL && !isLocalCreated()) {
832 throw new UnsupportedOperationException(
833 "Writes on non-local created streams that are unidirectional are not supported");
834 }
835 if (finSent) {
836 throw new ChannelOutputShutdownException("Fin was sent already");
837 }
838
839 final boolean fin;
840 ByteBuf buffer;
841 if (msg instanceof ByteBuf) {
842 fin = false;
843 buffer = (ByteBuf) msg;
844 } else {
845 QuicStreamFrame frame = (QuicStreamFrame) msg;
846 fin = frame.hasFin();
847 buffer = frame.content();
848 }
849
850 boolean readable = buffer.isReadable();
851 if (!fin && !readable) {
852 return 1;
853 }
854
855 boolean sendSomething = false;
856 try {
857 do {
858 int res = parent().streamSend(streamId(), buffer, fin);
859
860
861 int cap = parent.streamCapacity(streamId());
862 if (cap >= 0) {
863 capacity = cap;
864 }
865 if (res < 0) {
866 return res;
867 }
868 if (readable && res == 0) {
869 return 0;
870 }
871 sendSomething = true;
872 buffer.skipBytes(res);
873 } while (buffer.isReadable());
874
875 if (fin) {
876 finSent = true;
877 outputShutdown = true;
878 }
879 return 1;
880 } finally {
881
882
883
884
885 if (sendSomething) {
886 parent.connectionSendAndFlush();
887 }
888 }
889 }
890
891 @Override
892 public void flush() {
893 assert eventLoop().inEventLoop();
894
895 }
896
897 @Override
898 public ChannelPromise voidPromise() {
899 assert eventLoop().inEventLoop();
900 return voidPromise;
901 }
902
903 @Override
904 @Nullable
905 public ChannelOutboundBuffer outboundBuffer() {
906 return null;
907 }
908
909 private void closeOnRead(ChannelPipeline pipeline, boolean readFrames) {
910 if (readFrames && finReceived && finSent) {
911 close(voidPromise());
912 } else if (config.isAllowHalfClosure()) {
913 if (finReceived) {
914
915
916 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
917 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
918 if (finSent) {
919
920
921 close(voidPromise());
922 }
923 }
924 } else {
925
926
927 close(voidPromise());
928 }
929 }
930
931 private void handleReadException(ChannelPipeline pipeline, @Nullable ByteBuf byteBuf, Throwable cause,
932 @SuppressWarnings("deprecation") RecvByteBufAllocator.Handle allocHandle,
933 boolean readFrames) {
934 if (byteBuf != null) {
935 if (byteBuf.isReadable()) {
936 pipeline.fireChannelRead(byteBuf);
937 } else {
938 byteBuf.release();
939 }
940 }
941
942 readComplete(allocHandle, pipeline);
943 pipeline.fireExceptionCaught(cause);
944 if (finReceived) {
945 closeOnRead(pipeline, readFrames);
946 }
947 }
948
949 void recv() {
950 assert eventLoop().inEventLoop();
951 if (inRecv) {
952
953
954 return;
955 }
956
957 inRecv = true;
958 try {
959 ChannelPipeline pipeline = pipeline();
960 QuicheQuicStreamChannelConfig config = (QuicheQuicStreamChannelConfig) config();
961
962
963 DirectIoByteBufAllocator allocator = config.allocator;
964 @SuppressWarnings("deprecation")
965 RecvByteBufAllocator.Handle allocHandle = this.recvBufAllocHandle();
966 boolean readFrames = config.isReadFrames();
967
968
969
970 while (active && readPending && readable) {
971 allocHandle.reset(config);
972 ByteBuf byteBuf = null;
973 QuicheQuicChannel parent = parent();
974
975
976
977 boolean readCompleteNeeded = false;
978 boolean continueReading = true;
979 try {
980 while (!finReceived && continueReading) {
981 byteBuf = allocHandle.allocate(allocator);
982 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
983 switch (parent.streamRecv(streamId(), byteBuf)) {
984 case DONE:
985
986 readable = false;
987 break;
988 case FIN:
989
990
991 readable = false;
992 finReceived = true;
993 inputShutdown = true;
994 break;
995 case OK:
996 break;
997 default:
998 throw new Error();
999 }
1000 allocHandle.lastBytesRead(byteBuf.readableBytes());
1001 if (allocHandle.lastBytesRead() <= 0) {
1002 byteBuf.release();
1003 if (finReceived && readFrames) {
1004
1005
1006 byteBuf = Unpooled.EMPTY_BUFFER;
1007 } else {
1008 byteBuf = null;
1009 break;
1010 }
1011 }
1012
1013 allocHandle.incMessagesRead(1);
1014 readCompleteNeeded = true;
1015
1016
1017
1018 readPending = false;
1019
1020 if (readFrames) {
1021 pipeline.fireChannelRead(new DefaultQuicStreamFrame(byteBuf, finReceived));
1022 } else {
1023 pipeline.fireChannelRead(byteBuf);
1024 }
1025 byteBuf = null;
1026 continueReading = allocHandle.continueReading();
1027 }
1028
1029 if (readCompleteNeeded) {
1030 readComplete(allocHandle, pipeline);
1031 }
1032 if (finReceived) {
1033 readable = false;
1034 closeOnRead(pipeline, readFrames);
1035 }
1036 } catch (Throwable cause) {
1037 readable = false;
1038 handleReadException(pipeline, byteBuf, cause, allocHandle, readFrames);
1039 }
1040 }
1041 } finally {
1042
1043 inRecv = false;
1044 removeStreamFromParent();
1045 }
1046 }
1047
1048
1049
1050 private void readComplete(@SuppressWarnings("deprecation") RecvByteBufAllocator.Handle allocHandle,
1051 ChannelPipeline pipeline) {
1052 allocHandle.readComplete();
1053 pipeline.fireChannelReadComplete();
1054 }
1055 }
1056 }