View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.IoRegistration;
29  import io.netty.channel.socket.DuplexChannel;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.channel.unix.Limits;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.io.IOException;
36  import java.net.SocketAddress;
37  
38  import static io.netty.channel.unix.Errors.ioResult;
39  
40  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
    /** Logger used by {@code shutdownDone(...)} to report a suppressed shutdown failure. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
    /** Metadata instance shared by all instances and returned from {@link #metadata()}. */
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

    // Store the opCode so we know if we used WRITE or WRITEV.
    byte writeOpCode;
    // Keep track of the ids used for write and read so we can cancel these when needed.
    // A value of 0 is treated as "no operation tracked" by the cancel* methods and the assertions in this class.
    long writeId;
    // The opcode of the tracked read; passed to the async cancel operation together with readId.
    byte readOpCode;
    long readId;

    // The configured buffer ring if any
    private IoUringBufferRing bufferRing;
53  
54      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
55          super(parent, socket, active);
56      }
57  
58      AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
59          super(parent, socket, remote);
60      }
61  
62      @Override
63      public ChannelMetadata metadata() {
64          return METADATA;
65      }
66  
67      @Override
68      protected AbstractUringUnsafe newUnsafe() {
69          return new IoUringStreamUnsafe();
70      }
71  
72      @Override
73      public final ChannelFuture shutdown() {
74          return shutdown(newPromise());
75      }
76  
77      @Override
78      public final ChannelFuture shutdown(final ChannelPromise promise) {
79          ChannelFuture shutdownOutputFuture = shutdownOutput();
80          if (shutdownOutputFuture.isDone()) {
81              shutdownOutputDone(shutdownOutputFuture, promise);
82          } else {
83              shutdownOutputFuture.addListener(new ChannelFutureListener() {
84                  @Override
85                  public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
86                      shutdownOutputDone(shutdownOutputFuture, promise);
87                  }
88              });
89          }
90          return promise;
91      }
92  
93      @Override
94      protected final void doShutdownOutput() throws Exception {
95          socket.shutdown(false, true);
96      }
97  
98      private void shutdownInput0(final ChannelPromise promise) {
99          try {
100             socket.shutdown(true, false);
101             promise.setSuccess();
102         } catch (Throwable cause) {
103             promise.setFailure(cause);
104         }
105     }
106 
107     @Override
108     public final boolean isOutputShutdown() {
109         return socket.isOutputShutdown();
110     }
111 
112     @Override
113     public final boolean isInputShutdown() {
114         return socket.isInputShutdown();
115     }
116 
117     @Override
118     public final boolean isShutdown() {
119         return socket.isShutdown();
120     }
121 
122     @Override
123     public final ChannelFuture shutdownOutput() {
124         return shutdownOutput(newPromise());
125     }
126 
127     @Override
128     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
129         EventLoop loop = eventLoop();
130         if (loop.inEventLoop()) {
131             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
132         } else {
133             loop.execute(new Runnable() {
134                 @Override
135                 public void run() {
136                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
137                 }
138             });
139         }
140 
141         return promise;
142     }
143 
144     @Override
145     public final ChannelFuture shutdownInput() {
146         return shutdownInput(newPromise());
147     }
148 
149     @Override
150     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
151         EventLoop loop = eventLoop();
152         if (loop.inEventLoop()) {
153             shutdownInput0(promise);
154         } else {
155             loop.execute(new Runnable() {
156                 @Override
157                 public void run() {
158                     shutdownInput0(promise);
159                 }
160             });
161         }
162         return promise;
163     }
164 
165     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
166         ChannelFuture shutdownInputFuture = shutdownInput();
167         if (shutdownInputFuture.isDone()) {
168             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
169         } else {
170             shutdownInputFuture.addListener(new ChannelFutureListener() {
171                 @Override
172                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
173                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
174                 }
175             });
176         }
177     }
178 
179     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
180                                      ChannelFuture shutdownInputFuture,
181                                      ChannelPromise promise) {
182         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
183         Throwable shutdownInputCause = shutdownInputFuture.cause();
184         if (shutdownOutputCause != null) {
185             if (shutdownInputCause != null) {
186                 logger.info("Exception suppressed because a previous exception occurred.",
187                              shutdownInputCause);
188             }
189             promise.setFailure(shutdownOutputCause);
190         } else if (shutdownInputCause != null) {
191             promise.setFailure(shutdownInputCause);
192         } else {
193             promise.setSuccess();
194         }
195     }
196 
197     @Override
198     protected final void doRegister(ChannelPromise promise) {
199         ChannelPromise registerPromise = this.newPromise();
200         // Ensure that the buffer group is properly set before channel::read
201         registerPromise.addListener(f -> {
202             if (f.isSuccess()) {
203                try {
204                    short bgid = ((IoUringStreamChannelConfig) config()).getBufferGroupId();
205                    if (bgid >= 0) {
206                        final IoUringIoHandler ioUringIoHandler = registration().attachment();
207                        bufferRing = ioUringIoHandler.findBufferRing(bgid);
208                    }
209                    if (active) {
210                        // Register for POLLRDHUP if this channel is already considered active.
211                        schedulePollRdHup();
212                    }
213                } finally {
214                    promise.setSuccess();
215                }
216             } else {
217                 promise.setFailure(f.cause());
218             }
219         });
220 
221         super.doRegister(registerPromise);
222     }
223 
224     @Override
225     protected Object filterOutboundMessage(Object msg) {
226         // Since we cannot use synchronous sendfile,
227         // the channel can only support DefaultFileRegion instead of FileRegion.
228         if (IoUring.isSpliceSupported() && msg instanceof DefaultFileRegion) {
229             return new IoUringFileRegion((DefaultFileRegion) msg);
230         }
231 
232         return super.filterOutboundMessage(msg);
233     }
234 
235     protected class IoUringStreamUnsafe extends AbstractUringUnsafe {
236 
        // Buffer of the currently scheduled recv that does not use a buffer ring. It is set by scheduleRead0(...)
        // after a successful submit and taken (and reset to null) again by readComplete0(...).
        private ByteBuf readBuffer;
238 
239         @Override
240         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
241             assert writeId == 0;
242             int numElements = Math.min(in.size(), Limits.IOV_MAX);
243             IoUringIoHandler handler = registration().attachment();
244             IovArray iovArray = handler.iovArray();
245             try {
246                 int offset = iovArray.count();
247                 in.forEachFlushedMessage(iovArray);
248 
249                 int fd = fd().intValue();
250                 IoRegistration registration = registration();
251                 IoUringIoOps ops = IoUringIoOps.newWritev(fd, (byte) 0, 0, iovArray.memoryAddress(offset),
252                         iovArray.count() - offset, nextOpsId());
253                 byte opCode = ops.opcode();
254                 writeId = registration.submit(ops);
255                 writeOpCode = opCode;
256                 if (writeId == 0) {
257                     return 0;
258                 }
259             } catch (Exception e) {
260                 // This should never happen, anyway fallback to single write.
261                 scheduleWriteSingle(in.current());
262             }
263             return 1;
264         }
265 
266         @Override
267         protected int scheduleWriteSingle(Object msg) {
268             assert writeId == 0;
269 
270             int fd = fd().intValue();
271             IoRegistration registration = registration();
272             final IoUringIoOps ops;
273             if (msg instanceof IoUringFileRegion) {
274                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
275                 try {
276                     fileRegion.open();
277                 } catch (IOException e) {
278                     this.handleWriteError(e);
279                     return 0;
280                 }
281                 ops = fileRegion.splice(fd);
282             } else {
283                 ByteBuf buf = (ByteBuf) msg;
284                 ops = IoUringIoOps.newWrite(fd, (byte) 0, 0,
285                         IoUring.memoryAddress(buf) + buf.readerIndex(), buf.readableBytes(), nextOpsId());
286             }
287 
288             byte opCode = ops.opcode();
289             writeId = registration.submit(ops);
290             writeOpCode = opCode;
291             if (writeId == 0) {
292                 return 0;
293             }
294             return 1;
295         }
296 
297         private int calculateRecvFlags(boolean first) {
298             // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
299             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
300             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
301             // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
302             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
303             // transports.
304             if (first) {
305                 return 0;
306             }
307             return Native.MSG_DONTWAIT;
308         }
309 
310         private short calculateRecvIoPrio(boolean first, boolean socketIsEmpty) {
311             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
312             // attempt.
313             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
314             if (first) {
315                 // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
316                 // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
317                 return socketIsEmpty && IoUring.isCqeFSockNonEmptySupported() ?
318                         Native.IORING_RECVSEND_POLL_FIRST : 0;
319             }
320             return 0;
321         }
322 
323         @Override
324         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
325             assert readBuffer == null;
326             assert readId == 0 : readId;
327             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
328 
329             if (bufferRing != null && bufferRing.isUsable()) {
330                 return scheduleReadProviderBuffer(bufferRing, first, socketIsEmpty);
331             }
332 
333             // We either have no buffer ring configured or we force a recv without using a buffer ring.
334             ByteBuf byteBuf = allocHandle.allocate(alloc());
335             try {
336                 int fd = fd().intValue();
337                 IoRegistration registration = registration();
338                 short ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
339                 int recvFlags = calculateRecvFlags(first);
340 
341                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, (byte) 0, ioPrio, recvFlags,
342                         IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
343                 readId = registration.submit(ops);
344                 readOpCode = Native.IORING_OP_RECV;
345                 if (readId == 0) {
346                     return 0;
347                 }
348                 readBuffer = byteBuf;
349                 byteBuf = null;
350                 return 1;
351             } finally {
352                 if (byteBuf != null) {
353                     byteBuf.release();
354                 }
355             }
356         }
357 
358         private int scheduleReadProviderBuffer(IoUringBufferRing bufferRing, boolean first, boolean socketIsEmpty) {
359             short bgId = bufferRing.bufferGroupId();
360             try {
361                 boolean multishot = IoUring.isRecvMultishotEnabled();
362                 byte flags = (byte) Native.IOSQE_BUFFER_SELECT;
363                 short ioPrio;
364                 final int recvFlags;
365                 if (multishot) {
366                     ioPrio = Native.IORING_RECV_MULTISHOT;
367                     recvFlags = 0;
368                 } else {
369                     // We should only use the calculate*() methods if this is not a multishot recv, as otherwise
370                     // the would be applied until the multishot will be re-armed.
371                     ioPrio = calculateRecvIoPrio(first, socketIsEmpty);
372                     recvFlags = calculateRecvFlags(first);
373                 }
374                 if (IoUring.isRecvsendBundleEnabled()) {
375                     // See https://github.com/axboe/liburing/wiki/
376                     // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
377                     ioPrio |= Native.IORING_RECVSEND_BUNDLE;
378                 }
379                 IoRegistration registration = registration();
380                 int fd = fd().intValue();
381                 IoUringIoOps ops = IoUringIoOps.newRecv(
382                         fd, flags, ioPrio, recvFlags, 0,
383                         0, nextOpsId(), bgId
384                 );
385                 readId = registration.submit(ops);
386                 readOpCode = Native.IORING_OP_RECV;
387                 if (readId == 0) {
388                     return 0;
389                 }
390                 if (multishot) {
391                     // Return -1 to signal we used multishot and so expect multiple recvComplete(...) calls.
392                     return -1;
393                 }
394                 return 1;
395             } catch (IllegalArgumentException illegalArgumentException) {
396                 this.handleReadException(pipeline(), null, illegalArgumentException, false, recvBufAllocHandle());
397                 return 0;
398             }
399         }
400 
401         @Override
402         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
403             ByteBuf byteBuf = readBuffer;
404             readBuffer = null;
405             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
406                 readId = 0;
407                 // In case of cancellation we should reset the last used buffer ring to null as we will select a new one
408                 // when calling scheduleRead(..)
409                 if (byteBuf != null) {
410                     //recv without buffer ring
411                     byteBuf.release();
412                 }
413                 return;
414             }
415             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
416             boolean useBufferRing = (flags & Native.IORING_CQE_F_BUFFER) != 0;
417             short bid = (short) (flags >> Native.IORING_CQE_BUFFER_SHIFT);
418             boolean more = (flags & Native.IORING_CQE_F_BUF_MORE) != 0;
419 
420             boolean empty = socketIsEmpty(flags);
421             if (rearm) {
422                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
423                 readId = 0;
424             }
425 
426             boolean allDataRead = false;
427 
428             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
429             final ChannelPipeline pipeline = pipeline();
430 
431             try {
432                 if (res < 0) {
433                     if (res == Native.ERRNO_NOBUFS_NEGATIVE) {
434                         // recv with provider buffer failed, Fire the BufferRingExhaustedEvent to notify users.
435                         // About the failure. If this happens to often the user should most likely increase the
436                         // buffer ring size.
437                         pipeline.fireUserEventTriggered(bufferRing.getExhaustedEvent());
438 
439                         // try to expand the buffer ring by adding more buffers to it if there is any space left.
440                         bufferRing.expand();
441 
442                         // Let's trigger a read again without consulting the RecvByteBufAllocator.Handle as
443                         // we can't count this as a "real" read operation.
444                         // Because of how our BufferRing works we should have it filled again.
445                         scheduleRead(allocHandle.isFirstRead());
446                         return;
447                     }
448 
449                     // If res is negative we should pass it to ioResult(...) which will either throw
450                     // or convert it to 0 if we could not read because the socket was not readable.
451                     allocHandle.lastBytesRead(ioResult("io_uring read", res));
452                 } else if (res > 0) {
453                     if (useBufferRing) {
454                         // If RECVSEND_BUNDLE is used we need to do a bit more work here.
455                         // In this case we might need to obtain multiple buffers out of the buffer ring as
456                         // multiple of them might have been filled for one recv operation.
457                         // See https://github.com/axboe/liburing/wiki/
458                         // What's-new-with-io_uring-in-6.10#add-support-for-sendrecv-bundles
459                         int read = res;
460                         for (;;) {
461                             int attemptedBytesRead = bufferRing.attemptedBytesRead(bid);
462                             byteBuf = bufferRing.useBuffer(bid, read, more);
463                             read -= byteBuf.readableBytes();
464                             allocHandle.attemptedBytesRead(attemptedBytesRead);
465                             allocHandle.lastBytesRead(byteBuf.readableBytes());
466 
467                             assert read >= 0;
468                             if (read == 0) {
469                                 // Just break here, we will handle the byteBuf below and also fill the bufferRing
470                                 // if needed later.
471                                 break;
472                             }
473                             allocHandle.incMessagesRead(1);
474                             pipeline.fireChannelRead(byteBuf);
475                             byteBuf = null;
476                             bid = bufferRing.nextBid(bid);
477                             if (!allocHandle.continueReading()) {
478                                 // We should call fireChannelReadComplete() to mimic a normal read loop.
479                                 allocHandle.readComplete();
480                                 pipeline.fireChannelReadComplete();
481                                 allocHandle.reset(config());
482                             }
483                         }
484                     } else {
485                         int attemptedBytesRead = byteBuf.writableBytes();
486                         byteBuf.writerIndex(byteBuf.writerIndex() + res);
487                         allocHandle.attemptedBytesRead(attemptedBytesRead);
488                         allocHandle.lastBytesRead(res);
489                     }
490                 } else {
491                     // EOF which we signal with -1.
492                     allocHandle.lastBytesRead(-1);
493                 }
494                 if (allocHandle.lastBytesRead() <= 0) {
495                     // byteBuf might be null if we used a buffer ring.
496                     if (byteBuf != null) {
497                         // nothing was read, release the buffer.
498                         byteBuf.release();
499                         byteBuf = null;
500                     }
501                     allDataRead = allocHandle.lastBytesRead() < 0;
502                     if (allDataRead) {
503                         // There is nothing left to read as we received an EOF.
504                         shutdownInput(true);
505                     }
506                     allocHandle.readComplete();
507                     pipeline.fireChannelReadComplete();
508                     return;
509                 }
510 
511                 allocHandle.incMessagesRead(1);
512                 pipeline.fireChannelRead(byteBuf);
513                 byteBuf = null;
514                 scheduleNextRead(pipeline, allocHandle, rearm, empty);
515             } catch (Throwable t) {
516                 handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
517             }
518         }
519 
520         private void scheduleNextRead(ChannelPipeline pipeline, IoUringRecvByteAllocatorHandle allocHandle,
521                                       boolean rearm, boolean empty) {
522             if (allocHandle.continueReading() && !empty) {
523                 if (rearm) {
524                     // We only should schedule another read if we need to rearm.
525                     // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
526                     scheduleRead(false);
527                 }
528             } else {
529                 // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
530                 allocHandle.readComplete();
531                 pipeline.fireChannelReadComplete();
532             }
533         }
534 
535         protected final void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
536                                          Throwable cause, boolean allDataRead,
537                                          IoUringRecvByteAllocatorHandle allocHandle) {
538             if (byteBuf != null) {
539                 if (byteBuf.isReadable()) {
540                     pipeline.fireChannelRead(byteBuf);
541                 } else {
542                     byteBuf.release();
543                 }
544             }
545             allocHandle.readComplete();
546             pipeline.fireChannelReadComplete();
547             pipeline.fireExceptionCaught(cause);
548             if (allDataRead || cause instanceof IOException) {
549                 shutdownInput(true);
550             }
551         }
552 
553         @Override
554         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
555             writeId = 0;
556             writeOpCode = 0;
557 
558             ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
559             Object current = channelOutboundBuffer.current();
560             if (current instanceof IoUringFileRegion) {
561                 IoUringFileRegion fileRegion = (IoUringFileRegion) current;
562                 try {
563                     int result = res >= 0 ? res : ioResult("io_uring splice", res);
564                     if (result == 0 && fileRegion.count() > 0) {
565                         validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
566                         return false;
567                     }
568                     int progress = fileRegion.handleResult(result, data);
569                     if (progress == -1) {
570                         // Done with writing
571                         channelOutboundBuffer.remove();
572                     } else if (progress > 0) {
573                         channelOutboundBuffer.progress(progress);
574                     }
575                 } catch (Throwable cause) {
576                     handleWriteError(cause);
577                 }
578                 return true;
579             }
580 
581             if (res >= 0) {
582                 channelOutboundBuffer.removeBytes(res);
583             } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
584                 return true;
585             } else {
586                 try {
587                     if (ioResult("io_uring write", res) == 0) {
588                         return false;
589                     }
590                 } catch (Throwable cause) {
591                     handleWriteError(cause);
592                 }
593             }
594             return true;
595         }
596 
597         @Override
598         protected void freeResourcesNow(IoRegistration reg) {
599             super.freeResourcesNow(reg);
600             assert readBuffer == null;
601         }
602     }
603 
604     @Override
605     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
606         if (readId != 0) {
607             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
608             assert numOutstandingReads == 1 || numOutstandingReads == -1;
609             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, readId, readOpCode);
610             long id = registration.submit(ops);
611             assert id != 0;
612             readId = 0;
613         }
614     }
615 
616     @Override
617     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
618         if (writeId != 0) {
619             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
620             // (via fastpoll).
621             assert numOutstandingWrites == 1;
622             assert writeOpCode != 0;
623             long id = registration.submit(IoUringIoOps.newAsyncCancel((byte) 0, writeId, writeOpCode));
624             assert id != 0;
625             writeId = 0;
626         }
627     }
628 
629     @Override
630     protected boolean socketIsEmpty(int flags) {
631         return IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
632     }
633 
634     @Override
635     protected void submitAndRunNow() {
636         if (writeId != 0) {
637             // Force a submit and processing of the completions to ensure we drain the outbound buffer and
638             // send the data to the remote peer.
639             ((IoUringIoHandler) registration().attachment()).submitAndRunNow(writeId);
640         }
641         super.submitAndRunNow();
642     }
643 
644     @Override
645     boolean isPollInFirst() {
646         return bufferRing == null || !bufferRing.isUsable();
647     }
648 }