View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.quic;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelId;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultChannelId;
29  import io.netty.channel.DefaultChannelPipeline;
30  import io.netty.channel.EventLoop;
31  import io.netty.channel.PendingWriteQueue;
32  import io.netty.channel.RecvByteBufAllocator;
33  import io.netty.channel.VoidChannelPromise;
34  import io.netty.channel.socket.ChannelInputShutdownEvent;
35  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36  import io.netty.channel.socket.ChannelOutputShutdownException;
37  import io.netty.util.DefaultAttributeMap;
38  import io.netty.util.ReferenceCountUtil;
39  import io.netty.util.concurrent.PromiseNotifier;
40  import io.netty.util.internal.StringUtil;
41  import io.netty.util.internal.logging.InternalLogger;
42  import io.netty.util.internal.logging.InternalLoggerFactory;
43  import org.jetbrains.annotations.Nullable;
44  
45  import java.net.SocketAddress;
46  import java.nio.channels.ClosedChannelException;
47  import java.util.concurrent.RejectedExecutionException;
48  
49  /**
50   * {@link QuicStreamChannel} implementation that uses <a href="https://github.com/cloudflare/quiche">quiche</a>.
51   */
52  final class QuicheQuicStreamChannel extends DefaultAttributeMap implements QuicStreamChannel {
53      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
54      private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(QuicheQuicStreamChannel.class);
55      private final QuicheQuicChannel parent;
56      private final ChannelId id;
57      private final ChannelPipeline pipeline;
58      private final QuicStreamChannelUnsafe unsafe;
59      private final ChannelPromise closePromise;
60      private final PendingWriteQueue queue;
61  
62      private final QuicStreamChannelConfig config;
63      private final QuicStreamAddress address;
64  
65      private boolean readable;
66      private boolean readPending;
67      private boolean inRecv;
68      private boolean inWriteQueued;
69      private boolean finReceived;
70      private boolean finSent;
71  
72      private volatile boolean registered;
73      private volatile boolean writable = true;
74      private volatile boolean active = true;
75      private volatile boolean inputShutdown;
76      private volatile boolean outputShutdown;
77      private volatile QuicStreamPriority priority;
78      private volatile int capacity;
79  
80      QuicheQuicStreamChannel(QuicheQuicChannel parent, long streamId) {
81          this.parent = parent;
82          this.id = DefaultChannelId.newInstance();
83          unsafe = new QuicStreamChannelUnsafe();
84          this.pipeline = new DefaultChannelPipeline(this) {
85              // TODO: add some overrides maybe ?
86          };
87          config = new QuicheQuicStreamChannelConfig(this);
88          this.address = new QuicStreamAddress(streamId);
89          this.closePromise = newPromise();
90          queue = new PendingWriteQueue(this);
91          // Local created unidirectional streams have the input shutdown by spec. There will never be any data for
92          // these to be read.
93          if (parent.streamType(streamId) == QuicStreamType.UNIDIRECTIONAL && parent.isStreamLocalCreated(streamId)) {
94              inputShutdown = true;
95          }
96      }
97  
98      @Override
99      public QuicStreamAddress localAddress() {
100         return address;
101     }
102 
103     @Override
104     public QuicStreamAddress remoteAddress() {
105         return address;
106     }
107 
108     @Override
109     public boolean isLocalCreated() {
110         return parent().isStreamLocalCreated(streamId());
111     }
112 
113     @Override
114     public QuicStreamType type() {
115         return parent().streamType(streamId());
116     }
117 
118     @Override
119     public long streamId() {
120         return address.streamId();
121     }
122 
123     @Override
124     public QuicStreamPriority priority() {
125         return priority;
126     }
127 
128     @Override
129     public ChannelFuture updatePriority(QuicStreamPriority priority, ChannelPromise promise) {
130         if (eventLoop().inEventLoop()) {
131             updatePriority0(priority, promise);
132         } else {
133             eventLoop().execute(() -> updatePriority0(priority, promise));
134         }
135         return promise;
136     }
137 
138     private void updatePriority0(QuicStreamPriority priority, ChannelPromise promise) {
139         assert eventLoop().inEventLoop();
140         if (!promise.setUncancellable()) {
141             return;
142         }
143         try {
144             parent().streamPriority(streamId(), (byte) priority.urgency(), priority.isIncremental());
145         } catch (Throwable cause) {
146             promise.setFailure(cause);
147             return;
148         }
149         this.priority = priority;
150         promise.setSuccess();
151     }
152 
153     @Override
154     public boolean isInputShutdown() {
155         return inputShutdown;
156     }
157 
158     @Override
159     public ChannelFuture shutdownOutput(ChannelPromise promise) {
160         if (eventLoop().inEventLoop()) {
161             shutdownOutput0(promise);
162         } else {
163             eventLoop().execute(() -> shutdownOutput0(promise));
164         }
165         return promise;
166     }
167 
168     private void shutdownOutput0(ChannelPromise promise) {
169         assert eventLoop().inEventLoop();
170         if (!promise.setUncancellable()) {
171             return;
172         }
173         outputShutdown = true;
174         unsafe.writeWithoutCheckChannelState(QuicStreamFrame.EMPTY_FIN, promise);
175         unsafe.flush();
176     }
177 
178     @Override
179     public ChannelFuture shutdownInput(int error, ChannelPromise promise) {
180         if (eventLoop().inEventLoop()) {
181             shutdownInput0(error, promise);
182         } else {
183             eventLoop().execute(() -> shutdownInput0(error, promise));
184         }
185         return promise;
186     }
187 
188     @Override
189     public ChannelFuture shutdownOutput(int error, ChannelPromise promise) {
190         if (eventLoop().inEventLoop()) {
191             shutdownOutput0(error, promise);
192         } else {
193             eventLoop().execute(() -> shutdownOutput0(error, promise));
194         }
195         return promise;
196     }
197 
198     @Override
199     public QuicheQuicChannel parent() {
200         return parent;
201     }
202 
203     private void shutdownInput0(int err, ChannelPromise channelPromise) {
204         assert eventLoop().inEventLoop();
205         if (!channelPromise.setUncancellable()) {
206             return;
207         }
208         inputShutdown = true;
209         parent().streamShutdown(streamId(), true, false, err, channelPromise);
210         closeIfDone();
211     }
212 
213     @Override
214     public boolean isOutputShutdown() {
215         return outputShutdown;
216     }
217 
218     private void shutdownOutput0(int error, ChannelPromise channelPromise) {
219         assert eventLoop().inEventLoop();
220         if (!channelPromise.setUncancellable()) {
221             return;
222         }
223         parent().streamShutdown(streamId(), false, true, error, channelPromise);
224         outputShutdown = true;
225         closeIfDone();
226     }
227 
228     @Override
229     public boolean isShutdown() {
230         return outputShutdown && inputShutdown;
231     }
232 
233     @Override
234     public ChannelFuture shutdown(ChannelPromise channelPromise) {
235         if (eventLoop().inEventLoop()) {
236             shutdown0(channelPromise);
237         } else {
238             eventLoop().execute(() -> shutdown0(channelPromise));
239         }
240         return channelPromise;
241     }
242 
243     private void shutdown0(ChannelPromise promise) {
244         assert eventLoop().inEventLoop();
245         if (!promise.setUncancellable()) {
246             return;
247         }
248         inputShutdown = true;
249         outputShutdown = true;
250         unsafe.writeWithoutCheckChannelState(QuicStreamFrame.EMPTY_FIN, unsafe.voidPromise());
251         unsafe.flush();
252         parent().streamShutdown(streamId(), true, false, 0, promise);
253         closeIfDone();
254     }
255 
256     @Override
257     public ChannelFuture shutdown(int error, ChannelPromise promise) {
258         if (eventLoop().inEventLoop()) {
259             shutdown0(error, promise);
260         } else {
261             eventLoop().execute(() -> shutdown0(error, promise));
262         }
263         return promise;
264     }
265 
266     private void shutdown0(int error, ChannelPromise channelPromise) {
267         assert eventLoop().inEventLoop();
268         if (!channelPromise.setUncancellable()) {
269             return;
270         }
271         inputShutdown = true;
272         outputShutdown = true;
273         parent().streamShutdown(streamId(), true, true, error, channelPromise);
274         closeIfDone();
275     }
276 
277     private void sendFinIfNeeded() throws Exception {
278         if (!finSent) {
279             finSent = true;
280             parent().streamSendFin(streamId());
281         }
282     }
283 
284     private void closeIfDone() {
285         if (finSent && (finReceived || type() == QuicStreamType.UNIDIRECTIONAL && isLocalCreated())) {
286             unsafe().close(unsafe().voidPromise());
287         }
288     }
289 
290     private void removeStreamFromParent() {
291         if (!active && finReceived) {
292             parent().streamClosed(streamId());
293             inputShutdown = true;
294             outputShutdown = true;
295         }
296     }
297 
298     @Override
299     public QuicStreamChannel flush() {
300         pipeline.flush();
301         return this;
302     }
303 
304     @Override
305     public QuicStreamChannel read() {
306         pipeline.read();
307         return this;
308     }
309 
310     @Override
311     public QuicStreamChannelConfig config() {
312         return config;
313     }
314 
315     @Override
316     public boolean isOpen() {
317         return active;
318     }
319 
320     @Override
321     public boolean isActive() {
322         return isOpen();
323     }
324 
325     @Override
326     public ChannelMetadata metadata() {
327         return METADATA;
328     }
329 
330     @Override
331     public ChannelId id() {
332         return id;
333     }
334 
335     @Override
336     public EventLoop eventLoop() {
337         return parent.eventLoop();
338     }
339 
340     @Override
341     public boolean isRegistered() {
342         return registered;
343     }
344 
345     @Override
346     public ChannelFuture closeFuture() {
347         return closePromise;
348     }
349 
350     @Override
351     public boolean isWritable() {
352         return writable;
353     }
354 
355     @Override
356     public long bytesBeforeUnwritable() {
357         // Capacity might be negative if the stream was closed.
358         return Math.max(capacity, 0);
359     }
360 
361     @Override
362     public long bytesBeforeWritable() {
363         if (writable) {
364             return 0;
365         }
366         // Just return something positive for now
367         return 8;
368     }
369 
370     @Override
371     public QuicStreamChannelUnsafe unsafe() {
372         return unsafe;
373     }
374 
375     @Override
376     public ChannelPipeline pipeline() {
377         return pipeline;
378     }
379 
380     @Override
381     public ByteBufAllocator alloc() {
382         return config.getAllocator();
383     }
384 
385     @Override
386     public int compareTo(Channel o) {
387         return id.compareTo(o.id());
388     }
389 
390     /**
391      * Returns the ID of this channel.
392      */
393     @Override
394     public int hashCode() {
395         return id.hashCode();
396     }
397 
398     /**
399      * Returns {@code true} if and only if the specified object is identical
400      * with this channel (i.e: {@code this == o}).
401      */
402     @Override
403     public boolean equals(Object o) {
404         return this == o;
405     }
406 
407     @Override
408     public String toString() {
409         return "[id: 0x" + id.asShortText() + ", " + address + "]";
410     }
411 
412     /**
413      * Stream writability changed.
414      */
415     boolean writable(int capacity) {
416         assert eventLoop().inEventLoop();
417         if (capacity < 0) {
418             // If the value is negative its a quiche error.
419             if (capacity != Quiche.QUICHE_ERR_DONE) {
420                 if (!queue.isEmpty()) {
421                     if (capacity == Quiche.QUICHE_ERR_STREAM_STOPPED) {
422                         queue.removeAndFailAll(new ChannelOutputShutdownException("STOP_SENDING frame received"));
423                         // If STOP_SENDING is received we should not close the channel but just fail all queued writes.
424                         return false;
425                     } else {
426                         queue.removeAndFailAll(Quiche.convertToException(capacity));
427                     }
428                 } else if (capacity == Quiche.QUICHE_ERR_STREAM_STOPPED) {
429                     // If STOP_SENDING is received we should not close the channel
430                     return false;
431                 }
432                 // IF this error was not QUICHE_ERR_STREAM_STOPPED we should close the channel.
433                 finSent = true;
434                 unsafe().close(unsafe().voidPromise());
435             }
436             return false;
437         }
438         this.capacity = capacity;
439         boolean mayNeedWrite = unsafe().writeQueued();
440         // we need to re-read this.capacity as writeQueued() may update the capacity.
441         updateWritabilityIfNeeded(this.capacity > 0);
442         return mayNeedWrite;
443     }
444 
445     private void updateWritabilityIfNeeded(boolean newWritable) {
446         if (writable != newWritable) {
447             writable = newWritable;
448             pipeline.fireChannelWritabilityChanged();
449         }
450     }
451 
452     /**
453      * Stream is readable.
454      */
455     void readable() {
456         assert eventLoop().inEventLoop();
457         // Mark as readable and if a read is pending execute it.
458         readable = true;
459         if (readPending) {
460             unsafe().recv();
461         }
462     }
463 
464     final class QuicStreamChannelUnsafe implements Unsafe {
465 
466         @SuppressWarnings("deprecation")
467         private RecvByteBufAllocator.Handle recvHandle;
468 
469         private final ChannelPromise voidPromise = new VoidChannelPromise(
470                 QuicheQuicStreamChannel.this, false);
471         @Override
472         public void connect(SocketAddress remote, SocketAddress local, ChannelPromise promise) {
473             assert eventLoop().inEventLoop();
474             promise.setFailure(new UnsupportedOperationException());
475         }
476 
477         @SuppressWarnings("deprecation")
478         @Override
479         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
480             if (recvHandle == null) {
481                 recvHandle = config.getRecvByteBufAllocator().newHandle();
482             }
483             return recvHandle;
484         }
485 
486         @Override
487         public SocketAddress localAddress() {
488             return address;
489         }
490 
491         @Override
492         public SocketAddress remoteAddress() {
493             return address;
494         }
495 
496         @Override
497         public void register(EventLoop eventLoop, ChannelPromise promise) {
498             assert eventLoop.inEventLoop();
499             if (!promise.setUncancellable()) {
500                 return;
501             }
502             if (registered) {
503                 promise.setFailure(new IllegalStateException());
504                 return;
505             }
506             if (eventLoop != parent.eventLoop()) {
507                 promise.setFailure(new IllegalArgumentException());
508                 return;
509             }
510             registered = true;
511             promise.setSuccess();
512             pipeline.fireChannelRegistered();
513             pipeline.fireChannelActive();
514         }
515 
516         @Override
517         public void bind(SocketAddress localAddress, ChannelPromise promise) {
518             assert eventLoop().inEventLoop();
519             if (!promise.setUncancellable()) {
520                 return;
521             }
522             promise.setFailure(new UnsupportedOperationException());
523         }
524 
525         @Override
526         public void disconnect(ChannelPromise promise) {
527             assert eventLoop().inEventLoop();
528             close(promise);
529         }
530 
531         @Override
532         public void close(ChannelPromise promise) {
533             close(null, promise);
534         }
535 
536         void close(@Nullable ClosedChannelException writeFailCause, ChannelPromise promise) {
537             assert eventLoop().inEventLoop();
538             if (!promise.setUncancellable()) {
539                 return;
540             }
541             if (!active || closePromise.isDone()) {
542                 if (promise.isVoid()) {
543                     return;
544                 }
545                 closePromise.addListener(new PromiseNotifier<>(promise));
546                 return;
547             }
548             active = false;
549             try {
550                 // Close the channel and fail the queued messages in all cases.
551                 sendFinIfNeeded();
552             } catch (Exception ignore) {
553                 // Just ignore
554             } finally {
555                 if (!queue.isEmpty()) {
556                     // Only fail if the queue is non-empty.
557                     if (writeFailCause == null) {
558                         writeFailCause = new ClosedChannelException();
559                     }
560                     queue.removeAndFailAll(writeFailCause);
561                 }
562 
563                 promise.trySuccess();
564                 closePromise.trySuccess();
565                 if (type() == QuicStreamType.UNIDIRECTIONAL && isLocalCreated()) {
566                     inputShutdown = true;
567                     outputShutdown = true;
568                     // If its an unidirectional stream and was created locally it is safe to close the stream now as
569                     // we will never receive data from the other side.
570                     parent().streamClosed(streamId());
571                 } else {
572                     removeStreamFromParent();
573                 }
574             }
575             if (inWriteQueued) {
576                 invokeLater(() -> deregister(voidPromise(), true));
577             } else {
578                 deregister(voidPromise(), true);
579             }
580         }
581 
582         private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
583             assert eventLoop().inEventLoop();
584             if (!promise.setUncancellable()) {
585                 return;
586             }
587 
588             if (!registered) {
589                 promise.trySuccess();
590                 return;
591             }
592 
593             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
594             // we need to ensure we do the actual deregister operation later. This is needed as for example,
595             // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
596             // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
597             // the deregister operation this could lead to have a handler invoked by different EventLoop and so
598             // threads.
599             //
600             // See:
601             // https://github.com/netty/netty/issues/4435
602             invokeLater(() -> {
603                 if (fireChannelInactive) {
604                     pipeline.fireChannelInactive();
605                 }
606                 // Some transports like local and AIO does not allow the deregistration of
607                 // an open channel.  Their doDeregister() calls close(). Consequently,
608                 // close() calls deregister() again - no need to fire channelUnregistered, so check
609                 // if it was registered.
610                 if (registered) {
611                     registered = false;
612                     pipeline.fireChannelUnregistered();
613                 }
614                 promise.setSuccess();
615             });
616         }
617 
618         private void invokeLater(Runnable task) {
619             try {
620                 // This method is used by outbound operation implementations to trigger an inbound event later.
621                 // They do not trigger an inbound event immediately because an outbound operation might have been
622                 // triggered by another inbound event handler method.  If fired immediately, the call stack
623                 // will look like this for example:
624                 //
625                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
626                 //   -> handlerA.ctx.close()
627                 //      -> channel.unsafe.close()
628                 //         -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
629                 //
630                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
631                 eventLoop().execute(task);
632             } catch (RejectedExecutionException e) {
633                 LOGGER.warn("Can't invoke task later as EventLoop rejected it", e);
634             }
635         }
636 
637         @Override
638         public void closeForcibly() {
639             assert eventLoop().inEventLoop();
640             close(unsafe().voidPromise());
641         }
642 
643         @Override
644         public void deregister(ChannelPromise promise) {
645             assert eventLoop().inEventLoop();
646             deregister(promise, false);
647         }
648 
649         @Override
650         public void beginRead() {
651             assert eventLoop().inEventLoop();
652             readPending = true;
653             if (readable) {
654                 unsafe().recv();
655 
656                 // As the stream was readable, and we called recv() ourselves we also need to call
657                 // connectionSendAndFlush(). This is needed as recv() might consume data and so a window update
658                 // frame might be produced. If we miss to call connectionSendAndFlush() we might never send the update
659                 // to the remote peer and so the remote peer might never attempt to send more data.
660                 // See also https://docs.rs/quiche/latest/quiche/struct.Connection.html#method.send.
661                 parent().connectionSendAndFlush();
662             }
663         }
664 
665         private void closeIfNeeded(boolean wasFinSent) {
666             // Let's check if we should close the channel now.
667             // If it's a unidirectional channel we can close it as there will be no fin that we can read
668             // from the remote peer. If its an bidirectional channel we should only close the channel if we
669             // also received the fin from the remote peer.
670             if (!wasFinSent && QuicheQuicStreamChannel.this.finSent
671                     && (type() == QuicStreamType.UNIDIRECTIONAL || finReceived)) {
672                 // close the channel now
673                 close(voidPromise());
674             }
675         }
676 
677         boolean writeQueued() {
678             assert eventLoop().inEventLoop();
679             boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
680             inWriteQueued = true;
681             try {
682                 if (queue.isEmpty()) {
683                     return false;
684                 }
685                 boolean written = false;
686                 for (;;) {
687                     Object msg = queue.current();
688                     if (msg == null) {
689                         break;
690                     }
691                     try {
692                         int res = write0(msg);
693                         if (res == 1) {
694                             queue.remove().setSuccess();
695                             written = true;
696                         } else if (res == 0 || res == Quiche.QUICHE_ERR_DONE) {
697                             break;
698                         } else if (res == Quiche.QUICHE_ERR_STREAM_STOPPED) {
699                             // Once its signaled that the stream is stopped we can just fail everything.
700                             // That said we should not close the channel yet as there might be some data that is
701                             // not read yet by the user.
702                             queue.removeAndFailAll(
703                                     new ChannelOutputShutdownException("STOP_SENDING frame received"));
704                             break;
705                         } else {
706                             queue.remove().setFailure(Quiche.convertToException(res));
707                         }
708                     } catch (Exception e) {
709                         queue.remove().setFailure(e);
710                     }
711                 }
712                 if (written) {
713                     updateWritabilityIfNeeded(true);
714                 }
715                 return written;
716             } finally {
717                 closeIfNeeded(wasFinSent);
718                 inWriteQueued = false;
719             }
720         }
721 
722         @Override
723         public void write(Object msg, ChannelPromise promise) {
724             assert eventLoop().inEventLoop();
725             if (!promise.setUncancellable()) {
726                 ReferenceCountUtil.release(msg);
727                 return;
728             }
729             // Check first if the Channel is in a state in which it will accept writes, if not fail everything
730             // with the right exception
731             if (!isOpen()) {
732                 queueAndFailAll(msg, promise, new ClosedChannelException());
733             } else if (finSent) {
734                 queueAndFailAll(msg, promise, new ChannelOutputShutdownException("Fin was sent already"));
735             } else if (!queue.isEmpty()) {
736                 // If the queue is not empty we should just add the message to the queue as we will drain
737                 // it later once the stream becomes writable again.
738                 try {
739                     msg = filterMsg(msg);
740                 } catch (UnsupportedOperationException e) {
741                     ReferenceCountUtil.release(msg);
742                     promise.setFailure(e);
743                     return;
744                 }
745 
746                 // Touch the message to make things easier in terms of debugging buffer leaks.
747                 ReferenceCountUtil.touch(msg);
748                 queue.add(msg, promise);
749 
750                 // Try again to write queued messages.
751                 writeQueued();
752             } else {
753                 assert queue.isEmpty();
754                 writeWithoutCheckChannelState(msg, promise);
755             }
756         }
757 
758         private void queueAndFailAll(Object msg, ChannelPromise promise, Throwable cause) {
759             // Touch the message to make things easier in terms of debugging buffer leaks.
760             ReferenceCountUtil.touch(msg);
761 
762             queue.add(msg, promise);
763             queue.removeAndFailAll(cause);
764         }
765 
766         private Object filterMsg(Object msg) {
767             if (msg instanceof ByteBuf) {
768                 ByteBuf buffer = (ByteBuf)  msg;
769                 if (!buffer.isDirect()) {
770                     ByteBuf tmpBuffer = alloc().directBuffer(buffer.readableBytes());
771                     tmpBuffer.writeBytes(buffer, buffer.readerIndex(), buffer.readableBytes());
772                     buffer.release();
773                     return tmpBuffer;
774                 }
775             } else if (msg instanceof QuicStreamFrame) {
776                 QuicStreamFrame frame = (QuicStreamFrame) msg;
777                 ByteBuf buffer = frame.content();
778                 if (!buffer.isDirect()) {
779                     ByteBuf tmpBuffer = alloc().directBuffer(buffer.readableBytes());
780                     tmpBuffer.writeBytes(buffer, buffer.readerIndex(), buffer.readableBytes());
781                     QuicStreamFrame tmpFrame = frame.replace(tmpBuffer);
782                     frame.release();
783                     return tmpFrame;
784                 }
785             } else {
786                 throw new UnsupportedOperationException(
787                         "unsupported message type: " + StringUtil.simpleClassName(msg));
788             }
789             return msg;
790         }
791 
792         void writeWithoutCheckChannelState(Object msg, ChannelPromise promise) {
793             try {
794                 msg = filterMsg(msg);
795             } catch (UnsupportedOperationException e) {
796                 ReferenceCountUtil.release(msg);
797                 promise.setFailure(e);
798             }
799 
800             boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
801             boolean mayNeedWritabilityUpdate = false;
802             try {
803                 int res = write0(msg);
804                 if (res > 0) {
805                     ReferenceCountUtil.release(msg);
806                     promise.setSuccess();
807                     mayNeedWritabilityUpdate = capacity == 0;
808                 } else if (res == 0 || res == Quiche.QUICHE_ERR_DONE) {
809                     // Touch the message to make things easier in terms of debugging buffer leaks.
810                     ReferenceCountUtil.touch(msg);
811                     queue.add(msg, promise);
812                     mayNeedWritabilityUpdate = true;
813                 } else if (res == Quiche.QUICHE_ERR_STREAM_STOPPED) {
814                     throw new ChannelOutputShutdownException("STOP_SENDING frame received");
815                 } else {
816                     throw Quiche.convertToException(res);
817                 }
818             } catch (Exception e) {
819                 ReferenceCountUtil.release(msg);
820                 promise.setFailure(e);
821                 mayNeedWritabilityUpdate = capacity == 0;
822             } finally {
823                 if (mayNeedWritabilityUpdate) {
824                     updateWritabilityIfNeeded(false);
825                 }
826                 closeIfNeeded(wasFinSent);
827             }
828         }
829 
830         private int write0(Object msg) throws Exception {
831             if (type() == QuicStreamType.UNIDIRECTIONAL && !isLocalCreated()) {
832                 throw new UnsupportedOperationException(
833                         "Writes on non-local created streams that are unidirectional are not supported");
834             }
835             if (finSent) {
836                 throw new ChannelOutputShutdownException("Fin was sent already");
837             }
838 
839             final boolean fin;
840             ByteBuf buffer;
841             if (msg instanceof ByteBuf) {
842                 fin = false;
843                 buffer = (ByteBuf) msg;
844             } else {
845                 QuicStreamFrame frame = (QuicStreamFrame) msg;
846                 fin = frame.hasFin();
847                 buffer = frame.content();
848             }
849 
850             boolean readable = buffer.isReadable();
851             if (!fin && !readable) {
852                 return 1;
853             }
854 
855             boolean sendSomething = false;
856             try {
857                 do {
858                     int res = parent().streamSend(streamId(), buffer, fin);
859 
860                     // Update the capacity as well.
861                     int cap = parent.streamCapacity(streamId());
862                     if (cap >= 0) {
863                         capacity = cap;
864                     }
865                     if (res < 0) {
866                         return res;
867                     }
868                     if (readable && res == 0) {
869                         return 0;
870                     }
871                     sendSomething = true;
872                     buffer.skipBytes(res);
873                 } while (buffer.isReadable());
874 
875                 if (fin) {
876                     finSent = true;
877                     outputShutdown = true;
878                 }
879                 return 1;
880             } finally {
881                 // As we called quiche_conn_stream_send(...) we need to ensure we will call quiche_conn_send(...) either
882                 // now or we will do so once we see the channelReadComplete event.
883                 //
884                 // See https://docs.rs/quiche/0.6.0/quiche/struct.Connection.html#method.send
885                 if (sendSomething) {
886                     parent.connectionSendAndFlush();
887                 }
888             }
889         }
890 
891         @Override
892         public void flush() {
893             assert eventLoop().inEventLoop();
894             // NOOP.
895         }
896 
897         @Override
898         public ChannelPromise voidPromise() {
899             assert eventLoop().inEventLoop();
900             return voidPromise;
901         }
902 
903         @Override
904         @Nullable
905         public ChannelOutboundBuffer outboundBuffer() {
906             return null;
907         }
908 
909         private void closeOnRead(ChannelPipeline pipeline, boolean readFrames) {
910             if (readFrames && finReceived && finSent) {
911                 close(voidPromise());
912             } else if (config.isAllowHalfClosure()) {
913                 if (finReceived) {
914                     // If we receive a fin there will be no more data to read so we need to fire both events
915                     // to be consistent with other transports.
916                     pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
917                     pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
918                     if (finSent) {
919                         // This was an unidirectional stream which means as soon as we received FIN and sent a FIN
920                         // we need close the connection.
921                         close(voidPromise());
922                     }
923                 }
924             } else {
925                 // This was an unidirectional stream which means as soon as we received FIN we need
926                 // close the connection.
927                 close(voidPromise());
928             }
929         }
930 
931         private void handleReadException(ChannelPipeline pipeline, @Nullable ByteBuf byteBuf, Throwable cause,
932                                          @SuppressWarnings("deprecation") RecvByteBufAllocator.Handle allocHandle,
933                                          boolean readFrames) {
934             if (byteBuf != null) {
935                 if (byteBuf.isReadable()) {
936                     pipeline.fireChannelRead(byteBuf);
937                 } else {
938                     byteBuf.release();
939                 }
940             }
941 
942             readComplete(allocHandle, pipeline);
943             pipeline.fireExceptionCaught(cause);
944             if (finReceived) {
945                 closeOnRead(pipeline, readFrames);
946             }
947         }
948 
949         void recv() {
950             assert eventLoop().inEventLoop();
951             if (inRecv) {
952                 // As the use may call read() we need to guard against reentrancy here as otherwise it could
953                 // be possible that we re-enter this method while still processing it.
954                 return;
955             }
956 
957             inRecv = true;
958             try {
959                 ChannelPipeline pipeline = pipeline();
960                 QuicheQuicStreamChannelConfig config = (QuicheQuicStreamChannelConfig) config();
961                 // Directly access the DirectIoByteBufAllocator as we need an direct buffer to read into in all cases
962                 // even if there is no Unsafe present and the direct buffer is not pooled.
963                 DirectIoByteBufAllocator allocator = config.allocator;
964                 @SuppressWarnings("deprecation")
965                 RecvByteBufAllocator.Handle allocHandle = this.recvBufAllocHandle();
966                 boolean readFrames = config.isReadFrames();
967 
968                 // We should loop as long as a read() was requested and there is anything left to read, which means the
969                 // stream was marked as readable before.
970                 while (active && readPending && readable) {
971                     allocHandle.reset(config);
972                     ByteBuf byteBuf = null;
973                     QuicheQuicChannel parent = parent();
974                     // It's possible that the stream was marked as finish while we iterated over the readable streams
975                     // or while we did have auto read disabled. If so we need to ensure we not try to read from it as it
976                     // would produce an error.
977                     boolean readCompleteNeeded = false;
978                     boolean continueReading = true;
979                     try {
980                         while (!finReceived && continueReading) {
981                             byteBuf = allocHandle.allocate(allocator);
982                             allocHandle.attemptedBytesRead(byteBuf.writableBytes());
983                             switch (parent.streamRecv(streamId(), byteBuf)) {
984                                 case DONE:
985                                     // Nothing left to read;
986                                     readable = false;
987                                     break;
988                                 case FIN:
989                                     // If we received a FIN we also should mark the channel as non readable as
990                                     // there is nothing left to read really.
991                                     readable = false;
992                                     finReceived = true;
993                                     inputShutdown = true;
994                                     break;
995                                 case OK:
996                                     break;
997                                 default:
998                                     throw new Error();
999                             }
1000                             allocHandle.lastBytesRead(byteBuf.readableBytes());
1001                             if (allocHandle.lastBytesRead() <= 0) {
1002                                 byteBuf.release();
1003                                 if (finReceived && readFrames) {
1004                                     // If we read QuicStreamFrames we should fire an frame through the pipeline
1005                                     // with an empty buffer but the fin flag set to true.
1006                                     byteBuf = Unpooled.EMPTY_BUFFER;
1007                                 } else {
1008                                     byteBuf = null;
1009                                     break;
1010                                 }
1011                             }
1012                             // We did read one message.
1013                             allocHandle.incMessagesRead(1);
1014                             readCompleteNeeded = true;
1015 
1016                             // It's important that we reset this to false before we call fireChannelRead(...)
1017                             // as the user may request another read() from channelRead(...) callback.
1018                             readPending = false;
1019 
1020                             if (readFrames) {
1021                                 pipeline.fireChannelRead(new DefaultQuicStreamFrame(byteBuf, finReceived));
1022                             } else {
1023                                 pipeline.fireChannelRead(byteBuf);
1024                             }
1025                             byteBuf = null;
1026                             continueReading = allocHandle.continueReading();
1027                         }
1028 
1029                         if (readCompleteNeeded) {
1030                             readComplete(allocHandle, pipeline);
1031                         }
1032                         if (finReceived) {
1033                             readable = false;
1034                             closeOnRead(pipeline, readFrames);
1035                         }
1036                     } catch (Throwable cause) {
1037                         readable = false;
1038                         handleReadException(pipeline, byteBuf, cause, allocHandle, readFrames);
1039                     }
1040                 }
1041             } finally {
1042                 // About to leave the method lets reset so we can enter it again.
1043                 inRecv = false;
1044                 removeStreamFromParent();
1045             }
1046         }
1047 
1048         // Read was complete and something was read, so we we need to reset the readPending flags, the allocHandle
1049         // and call fireChannelReadComplete(). The user may schedule another read now.
1050         private void readComplete(@SuppressWarnings("deprecation") RecvByteBufAllocator.Handle allocHandle,
1051                                   ChannelPipeline pipeline) {
1052             allocHandle.readComplete();
1053             pipeline.fireChannelReadComplete();
1054         }
1055     }
1056 }