View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPipeline;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.EventLoop;
28  import io.netty.channel.socket.DuplexChannel;
29  import io.netty.channel.unix.IovArray;
30  import io.netty.channel.unix.Limits;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.net.SocketAddress;
36  
37  import static io.netty.channel.unix.Errors.ioResult;
38  
39  abstract class AbstractIoUringStreamChannel extends AbstractIoUringChannel implements DuplexChannel {
    // Logger used by shutdownDone(...) to report a shutdown-input failure that is suppressed
    // because the shutdown-output failure is the one propagated to the promise.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIoUringStreamChannel.class);
    // Shared metadata instance returned by metadata().
    // NOTE(review): the arguments are presumably (hasDisconnect = false, defaultMaxMessagesPerRead = 16)
    // -- confirm against ChannelMetadata.
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);

    // Store the opCode so we know if we used WRITE or WRITEV.
    // It is recorded whenever a write operation is submitted and reset to 0 once the write completes.
    private byte writeOpCode;

    // Keep track of the ids used for write and read so we can cancel these when needed.
    // A value of 0 is treated throughout this class as "no operation of that kind is outstanding".
    private long writeId;
    private long readId;
49  
    /**
     * Creates a new stream channel; all arguments are handed to the superclass constructor unchanged.
     *
     * @param parent the parent {@link Channel}
     * @param socket the underlying {@link LinuxSocket}
     * @param active NOTE(review): presumably whether the socket is already connected/active -- confirm
     *               against {@link AbstractIoUringChannel}
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
        super(parent, socket, active);
    }
53  
    /**
     * Creates a new stream channel; all arguments are handed to the superclass constructor unchanged.
     *
     * @param parent the parent {@link Channel}
     * @param socket the underlying {@link LinuxSocket}
     * @param remote NOTE(review): presumably the address of the already connected peer -- confirm
     *               against {@link AbstractIoUringChannel}
     */
    AbstractIoUringStreamChannel(Channel parent, LinuxSocket socket, SocketAddress remote) {
        super(parent, socket, remote);
    }
57  
    /**
     * Returns the {@link ChannelMetadata} shared by all stream channels of this transport.
     *
     * @return the static {@code METADATA} instance
     */
    @Override
    public ChannelMetadata metadata() {
        return METADATA;
    }
62  
    /**
     * Creates the unsafe implementation used by this channel.
     *
     * @return a new {@code IoUringStreamUnsafe} bound to this channel
     */
    @Override
    protected final AbstractUringUnsafe newUnsafe() {
        return new IoUringStreamUnsafe();
    }
67  
68      @Override
69      public final ChannelFuture shutdown() {
70          return shutdown(newPromise());
71      }
72  
73      @Override
74      public final ChannelFuture shutdown(final ChannelPromise promise) {
75          ChannelFuture shutdownOutputFuture = shutdownOutput();
76          if (shutdownOutputFuture.isDone()) {
77              shutdownOutputDone(shutdownOutputFuture, promise);
78          } else {
79              shutdownOutputFuture.addListener(new ChannelFutureListener() {
80                  @Override
81                  public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
82                      shutdownOutputDone(shutdownOutputFuture, promise);
83                  }
84              });
85          }
86          return promise;
87      }
88  
    /**
     * Shuts down the write side of the underlying socket while leaving the read side untouched
     * (the call passes {@code read = false}, {@code write = true}).
     *
     * @throws Exception if the socket shutdown fails
     */
    @Override
    protected final void doShutdownOutput() throws Exception {
        socket.shutdown(false, true);
    }
93  
    /**
     * Shuts down the read side of the underlying socket and reports the outcome through the promise.
     *
     * @param promise completed successfully when the shutdown call returns, or failed with whatever
     *                {@link Throwable} was thrown inside the {@code try} block
     */
    private void shutdownInput0(final ChannelPromise promise) {
        try {
            // read = true, write = false: only the input direction is shut down.
            socket.shutdown(true, false);
            promise.setSuccess();
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
    }
102 
    /**
     * Reports whether the output side is shut down, as tracked by the underlying socket.
     *
     * @return the value of {@code socket.isOutputShutdown()}
     */
    @Override
    public final boolean isOutputShutdown() {
        return socket.isOutputShutdown();
    }
107 
    /**
     * Reports whether the input side is shut down, as tracked by the underlying socket.
     *
     * @return the value of {@code socket.isInputShutdown()}
     */
    @Override
    public final boolean isInputShutdown() {
        return socket.isInputShutdown();
    }
112 
    /**
     * Reports the overall shutdown state as tracked by the underlying socket.
     *
     * @return the value of {@code socket.isShutdown()}
     */
    @Override
    public final boolean isShutdown() {
        return socket.isShutdown();
    }
117 
118     @Override
119     public final ChannelFuture shutdownOutput() {
120         return shutdownOutput(newPromise());
121     }
122 
123     @Override
124     public final ChannelFuture shutdownOutput(final ChannelPromise promise) {
125         EventLoop loop = eventLoop();
126         if (loop.inEventLoop()) {
127             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
128         } else {
129             loop.execute(new Runnable() {
130                 @Override
131                 public void run() {
132                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
133                 }
134             });
135         }
136 
137         return promise;
138     }
139 
140     @Override
141     public final ChannelFuture shutdownInput() {
142         return shutdownInput(newPromise());
143     }
144 
145     @Override
146     public final ChannelFuture shutdownInput(final ChannelPromise promise) {
147         EventLoop loop = eventLoop();
148         if (loop.inEventLoop()) {
149             shutdownInput0(promise);
150         } else {
151             loop.execute(new Runnable() {
152                 @Override
153                 public void run() {
154                     shutdownInput0(promise);
155                 }
156             });
157         }
158         return promise;
159     }
160 
161     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
162         ChannelFuture shutdownInputFuture = shutdownInput();
163         if (shutdownInputFuture.isDone()) {
164             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
165         } else {
166             shutdownInputFuture.addListener(new ChannelFutureListener() {
167                 @Override
168                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
169                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
170                 }
171             });
172         }
173     }
174 
175     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
176                                      ChannelFuture shutdownInputFuture,
177                                      ChannelPromise promise) {
178         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
179         Throwable shutdownInputCause = shutdownInputFuture.cause();
180         if (shutdownOutputCause != null) {
181             if (shutdownInputCause != null) {
182                 logger.info("Exception suppressed because a previous exception occurred.",
183                              shutdownInputCause);
184             }
185             promise.setFailure(shutdownOutputCause);
186         } else if (shutdownInputCause != null) {
187             promise.setFailure(shutdownInputCause);
188         } else {
189             promise.setSuccess();
190         }
191     }
192 
193     @Override
194     protected final void doRegister(ChannelPromise promise) {
195         super.doRegister(promise);
196         promise.addListener(f -> {
197             if (f.isSuccess()) {
198                 if (active) {
199                     // Register for POLLRDHUP if this channel is already considered active.
200                     schedulePollRdHup();
201                 }
202             }
203         });
204     }
205 
206     @Override
207     protected Object filterOutboundMessage(Object msg) {
208         // Since we cannot use synchronous sendfile,
209         // the channel can only support DefaultFileRegion instead of FileRegion.
210         if (IoUring.isIOUringSpliceSupported() && msg instanceof DefaultFileRegion) {
211             return new IoUringFileRegion((DefaultFileRegion) msg);
212         }
213 
214         return super.filterOutboundMessage(msg);
215     }
216 
217     private final class IoUringStreamUnsafe extends AbstractUringUnsafe {
218 
        // Buffer handed to the currently outstanding RECV; set when a read is scheduled and cleared
        // again when its completion is processed.
        private ByteBuf readBuffer;
        // iovec array backing the currently outstanding WRITEV; null when no WRITEV is in flight.
        private IovArray iovArray;
221 
222         @Override
223         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
224             assert iovArray == null;
225             assert writeId == 0;
226             int numElements = Math.min(in.size(), Limits.IOV_MAX);
227             ByteBuf iovArrayBuffer = alloc().directBuffer(numElements * IovArray.IOV_SIZE);
228             iovArray = new IovArray(iovArrayBuffer);
229             try {
230                 int offset = iovArray.count();
231                 in.forEachFlushedMessage(iovArray);
232 
233                 int fd = fd().intValue();
234                 IoUringIoRegistration registration = registration();
235                 IoUringIoOps ops = IoUringIoOps.newWritev(fd, flags((byte) 0), 0, iovArray.memoryAddress(offset),
236                         iovArray.count() - offset, nextOpsId());
237                 byte opCode = ops.opcode();
238                 writeId = registration.submit(ops);
239                 writeOpCode = opCode;
240                 if (writeId == 0) {
241                     iovArray.release();
242                     iovArray = null;
243                     return 0;
244                 }
245             } catch (Exception e) {
246                 iovArray.release();
247                 iovArray = null;
248 
249                 // This should never happen, anyway fallback to single write.
250                 scheduleWriteSingle(in.current());
251             }
252             return 1;
253         }
254 
255         @Override
256         protected int scheduleWriteSingle(Object msg) {
257             assert iovArray == null;
258             assert writeId == 0;
259 
260             int fd = fd().intValue();
261             IoUringIoRegistration registration = registration();
262             final IoUringIoOps ops;
263             if (msg instanceof IoUringFileRegion) {
264                 IoUringFileRegion fileRegion = (IoUringFileRegion) msg;
265                 try {
266                     fileRegion.open();
267                 } catch (IOException e) {
268                     this.handleWriteError(e);
269                     return 0;
270                 }
271                 ops = fileRegion.splice(fd);
272             } else {
273                 ByteBuf buf = (ByteBuf) msg;
274                 ops = IoUringIoOps.newWrite(fd, flags((byte) 0), 0,
275                         buf.memoryAddress() + buf.readerIndex(), buf.readableBytes(), nextOpsId());
276             }
277 
278             byte opCode = ops.opcode();
279             writeId = registration.submit(ops);
280             writeOpCode = opCode;
281             if (writeId == 0) {
282                 return 0;
283             }
284             return 1;
285         }
286 
287         @Override
288         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
289             assert readBuffer == null;
290             assert readId == 0;
291 
292             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
293             ByteBuf byteBuf = allocHandle.allocate(alloc());
294             try {
295                 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
296                 int fd = fd().intValue();
297                 IoUringIoRegistration registration = registration();
298 
299                 // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
300                 // attempt.
301                 // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
302                 final short ioPrio;
303 
304                 // Depending on if this is the first read or not we will use Native.MSG_DONTWAIT.
305                 // The idea is that if the socket is blocking we can do the first read in a blocking fashion
306                 // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
307                 // be possible directly we schedule these with Native.MSG_DONTWAIT. This allows us to still be
308                 // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
309                 // transports.
310                 final int recvFlags;
311 
312                 if (first) {
313                     // IORING_RECVSEND_POLL_FIRST and IORING_CQE_F_SOCK_NONEMPTY were added in the same release (5.19).
314                     // We need to check if it's supported as otherwise providing these would result in an -EINVAL.
315                     ioPrio = socketIsEmpty && IoUring.isIOUringCqeFSockNonEmptySupported() ?
316                             Native.IORING_RECVSEND_POLL_FIRST : 0;
317                     recvFlags = 0;
318                 } else {
319                     ioPrio = 0;
320                     recvFlags = Native.MSG_DONTWAIT;
321                 }
322                 IoUringIoOps ops = IoUringIoOps.newRecv(fd, flags((byte) 0), ioPrio, recvFlags,
323                         byteBuf.memoryAddress() + byteBuf.writerIndex(), byteBuf.writableBytes(), nextOpsId());
324                 readId = registration.submit(ops);
325                 if (readId == 0) {
326                     return 0;
327                 }
328                 readBuffer = byteBuf;
329                 byteBuf = null;
330                 return 1;
331             } finally {
332                 if (byteBuf != null) {
333                     byteBuf.release();
334                 }
335             }
336         }
337 
        /**
         * Handles the completion of the outstanding {@code RECV}.
         * <p>
         * Clears {@code readId} and takes ownership of {@code readBuffer}. A cancelled read just releases
         * the buffer. Otherwise the result is recorded on the allocator handle, received bytes are fired
         * through the pipeline, an EOF shuts down the input, and either another read is scheduled or the
         * read loop is ended with {@code fireChannelReadComplete()}. Any {@link Throwable} raised along
         * the way is routed to {@code handleReadException(...)}.
         *
         * @param op          the opcode of the completed operation (not used here)
         * @param res         the completion result: bytes read, {@code 0} for EOF, or a negative errno
         * @param flags       the completion flags; passed to {@code socketIsEmpty(int)}
         * @param data        additional completion data (not used here)
         * @param outstanding number of outstanding operations (not used here)
         */
        @Override
        protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
            assert readId != 0;
            readId = 0;
            boolean allDataRead = false;

            final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            final ChannelPipeline pipeline = pipeline();
            // Take over the buffer from the field; from here on this method is responsible for it.
            ByteBuf byteBuf = this.readBuffer;
            this.readBuffer = null;
            assert byteBuf != null;

            try {
                if (res < 0) {
                    if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                        // The read was cancelled: drop the buffer without notifying the pipeline.
                        byteBuf.release();
                        return;
                    }
                    // If res is negative we should pass it to ioResult(...) which will either throw
                    // or convert it to 0 if we could not read because the socket was not readable.
                    allocHandle.lastBytesRead(ioResult("io_uring read", res));
                } else if (res > 0) {
                    // The kernel wrote res bytes directly into the buffer's memory, so only advance the index.
                    byteBuf.writerIndex(byteBuf.writerIndex() + res);
                    allocHandle.lastBytesRead(res);
                } else {
                    // EOF which we signal with -1.
                    allocHandle.lastBytesRead(-1);
                }
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read, release the buffer.
                    byteBuf.release();
                    // Null out the local so the exception handler below does not touch the released buffer.
                    byteBuf = null;
                    allDataRead = allocHandle.lastBytesRead() < 0;
                    if (allDataRead) {
                        // There is nothing left to read as we received an EOF.
                        shutdownInput(true);
                    }
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
                    return;
                }

                allocHandle.incMessagesRead(1);
                pipeline.fireChannelRead(byteBuf);
                // Ownership moved to the pipeline; make sure the exception handler does not use it again.
                byteBuf = null;
                if (allocHandle.continueReading() && !socketIsEmpty(flags)) {
                    // Let's schedule another read.
                    scheduleRead(false);
                } else {
                    // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, allDataRead, allocHandle);
            }
        }
395 
396         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
397                                          Throwable cause, boolean allDataRead,
398                                          IoUringRecvByteAllocatorHandle allocHandle) {
399             if (byteBuf != null) {
400                 if (byteBuf.isReadable()) {
401                     pipeline.fireChannelRead(byteBuf);
402                 } else {
403                     byteBuf.release();
404                 }
405             }
406             allocHandle.readComplete();
407             pipeline.fireChannelReadComplete();
408             pipeline.fireExceptionCaught(cause);
409             if (allDataRead || cause instanceof IOException) {
410                 shutdownInput(true);
411             }
412         }
413 
        /**
         * Handles the completion of the outstanding write operation.
         * <p>
         * Resets {@code writeId} and {@code writeOpCode}, then either advances/removes the current
         * {@code IoUringFileRegion} or removes the written bytes from the outbound buffer. Failures are
         * passed to {@code handleWriteError(...)}.
         *
         * @param op          the opcode of the completed operation (not used here)
         * @param res         the completion result: bytes written or a negative errno
         * @param flags       the completion flags (not used here)
         * @param data        additional completion data; forwarded to the file region's {@code handleResult}
         * @param outstanding number of outstanding operations (not used here)
         * @return {@code false} only when the operation made no progress (a zero-byte splice result while
         *         the region still has bytes to transfer, or a negative result that {@code ioResult(...)}
         *         maps to {@code 0}); {@code true} otherwise.
         *         NOTE(review): how the caller interprets this flag is defined by the superclass -- confirm.
         */
        @Override
        boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
            assert writeId != 0;
            writeId = 0;
            writeOpCode = 0;

            ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
            Object current = channelOutboundBuffer.current();
            if (current instanceof IoUringFileRegion) {
                IoUringFileRegion fileRegion = (IoUringFileRegion) current;
                try {
                    // A negative result is handed to ioResult(...), which either throws or returns a value.
                    int result = res >= 0 ? res : ioResult("io_uring splice", res);
                    if (result == 0 && fileRegion.count() > 0) {
                        // Nothing was transferred although bytes remain: validate the region before giving up
                        // on this attempt.
                        validateFileRegion(fileRegion.fileRegion, fileRegion.transfered());
                        return false;
                    }
                    int progress = fileRegion.handleResult(result, data);
                    if (progress == -1) {
                        // Done with writing
                        channelOutboundBuffer.remove();
                    } else if (progress > 0) {
                        channelOutboundBuffer.progress(progress);
                    }
                } catch (Throwable cause) {
                    handleWriteError(cause);
                }
                return true;
            }

            // Not a file region: if a WRITEV was used, its iovec array is no longer needed.
            IovArray iovArray = this.iovArray;
            if (iovArray != null) {
                this.iovArray = null;
                iovArray.release();
            }
            if (res >= 0) {
                channelOutboundBuffer.removeBytes(res);
            } else if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                // The write was cancelled; nothing to remove from the outbound buffer.
                return true;
            } else {
                try {
                    if (ioResult("io_uring write", res) == 0) {
                        return false;
                    }
                } catch (Throwable cause) {
                    handleWriteError(cause);
                }
            }
            return true;
        }
463 
        /**
         * Delegates to the superclass and then asserts that no read buffer is still held by this unsafe.
         *
         * @param reg the registration passed through to the superclass
         */
        @Override
        protected void freeResourcesNow(IoUringIoRegistration reg) {
            super.freeResourcesNow(reg);
            assert readBuffer == null;
        }
469     }
470 
471     @Override
472     protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
473         if (readId != 0) {
474             // Let's try to cancel outstanding reads as these might be submitted and waiting for data (via fastpoll).
475             assert numOutstandingReads == 1;
476             int fd = fd().intValue();
477             IoUringIoOps ops = IoUringIoOps.newAsyncCancel(fd, flags((byte) 0), readId, Native.IORING_OP_RECV);
478             registration.submit(ops);
479         } else {
480             assert numOutstandingReads == 0;
481         }
482     }
483 
484     @Override
485     protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
486         if (writeId != 0) {
487             // Let's try to cancel outstanding writes as these might be submitted and waiting to finish writing
488             // (via fastpoll).
489             assert numOutstandingWrites == 1;
490             assert writeOpCode != 0;
491             int fd = fd().intValue();
492             registration.submit(IoUringIoOps.newAsyncCancel(fd, flags((byte) 0), writeId, writeOpCode));
493         } else {
494             assert numOutstandingWrites == 0;
495         }
496     }
497 
498     @Override
499     protected boolean socketIsEmpty(int flags) {
500         return IoUring.isIOUringCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
501     }
502 }