View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.unix.Buffer;
26  import io.netty.channel.unix.Errors;
27  
28  import java.net.SocketAddress;
29  import java.nio.ByteBuffer;
30  
31  import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
32  import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
33  
34  abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
35      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
36  
37      private final ByteBuffer acceptedAddressMemory;
38      private final ByteBuffer acceptedAddressLengthMemory;
39  
40      private final long acceptedAddressMemoryAddress;
41      private final long acceptedAddressLengthMemoryAddress;
42  
43      private long acceptId;
44  
45      protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
46          super(null, socket, active);
47  
48          acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49          acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
50          acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
51          // Needs to be initialized to the size of acceptedAddressMemory.
52          // See https://man7.org/linux/man-pages/man2/accept.2.html
53          acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
54          acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
55      }
56  
57      @Override
58      public final ChannelMetadata metadata() {
59          return METADATA;
60      }
61  
62      @Override
63      protected final void doClose() throws Exception {
64          super.doClose();
65      }
66  
67      @Override
68      protected final AbstractUringUnsafe newUnsafe() {
69          return new UringServerChannelUnsafe();
70      }
71  
72      @Override
73      protected final void doWrite(ChannelOutboundBuffer in) {
74          throw new UnsupportedOperationException();
75      }
76  
77      @Override
78      protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
79          if (acceptId != 0) {
80              assert numOutstandingReads == 1;
81              int fd = fd().intValue();
82              IoUringIoOps ops = IoUringIoOps.newAsyncCancel(
83                      fd, flags((byte) 0), acceptId, Native.IORING_OP_ACCEPT);
84              registration.submit(ops);
85          } else {
86              assert numOutstandingReads == 0;
87          }
88      }
89  
90      @Override
91      protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
92          assert numOutstandingWrites == 0;
93      }
94  
95      abstract Channel newChildChannel(
96              int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
97  
98      private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
99  
100         @Override
101         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
102             throw new UnsupportedOperationException();
103         }
104 
105         @Override
106         protected int scheduleWriteSingle(Object msg) {
107             throw new UnsupportedOperationException();
108         }
109 
110         @Override
111         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
112             throw new UnsupportedOperationException();
113         }
114 
115         @Override
116         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
117             assert acceptId == 0;
118             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
119             allocHandle.attemptedBytesRead(1);
120 
121             int fd = fd().intValue();
122             IoUringIoRegistration registration = registration();
123 
124             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
125             // attempt.
126             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
127             //
128             // Depending on if this is the first read or not we will use Native.IORING_ACCEPT_DONT_WAIT.
129             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
130             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
131             // be possible directly we schedule these with Native.IORING_ACCEPT_DONT_WAIT. This allows us to still be
132             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
133             // transports.
134             final short ioPrio;
135 
136             // IORING_ACCEPT_POLL_FIRST and IORING_ACCEPT_DONTWAIT were added in the same release.
137             // We need to check if its supported as otherwise providing these would result in an -EINVAL.
138             if (IoUring.isIOUringAcceptNoWaitSupported()) {
139                 if (first) {
140                     ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
141                 } else {
142                     ioPrio = Native.IORING_ACCEPT_DONTWAIT;
143                 }
144             } else {
145                 ioPrio = 0;
146             }
147             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10#improvements-for-accept
148             IoUringIoOps ops = IoUringIoOps.newAccept(fd, flags((byte) 0), 0, ioPrio,
149                     acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
150             acceptId = registration.submit(ops);
151             if (acceptId == 0) {
152                 return 0;
153             }
154             return 1;
155         }
156 
157         @Override
158         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
159             assert acceptId != 0;
160             acceptId = 0;
161             final IoUringRecvByteAllocatorHandle allocHandle =
162                     (IoUringRecvByteAllocatorHandle) unsafe()
163                             .recvBufAllocHandle();
164             final ChannelPipeline pipeline = pipeline();
165             allocHandle.lastBytesRead(res);
166 
167             if (res >= 0) {
168                 allocHandle.incMessagesRead(1);
169                 try {
170                     Channel channel = newChildChannel(
171                             res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
172                     pipeline.fireChannelRead(channel);
173 
174                     if (allocHandle.continueReading() &&
175                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
176                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
177                             // that the next read (which would be using Native.IORING_ACCEPT_DONTWAIT) will complete
178                             // without be able to read any data. This is useless work and we can skip it.
179                             //
180                             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
181                             !socketIsEmpty(flags)) {
182                         scheduleRead(false);
183                     } else {
184                         allocHandle.readComplete();
185                         pipeline.fireChannelReadComplete();
186                     }
187                 } catch (Throwable cause) {
188                     allocHandle.readComplete();
189                     pipeline.fireChannelReadComplete();
190                     pipeline.fireExceptionCaught(cause);
191                 }
192             } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
193                 allocHandle.readComplete();
194                 pipeline.fireChannelReadComplete();
195                 // Check if we did fail because there was nothing to accept atm.
196                 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
197                     // Something bad happened. Convert to an exception.
198                     pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
199                 }
200             }
201         }
202 
203         @Override
204         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
205                             final ChannelPromise promise) {
206             promise.setFailure(new UnsupportedOperationException());
207         }
208 
209         @Override
210         protected void freeResourcesNow(IoUringIoRegistration reg) {
211             super.freeResourcesNow(reg);
212             Buffer.free(acceptedAddressMemory);
213             Buffer.free(acceptedAddressLengthMemory);
214         }
215     }
216 
217     @Override
218     protected boolean socketIsEmpty(int flags) {
219         // IORING_CQE_F_SOCK_NONEMPTY is used for accept since IORING_ACCEPT_DONTWAIT was added.
220         // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
221         return IoUring.isIOUringAcceptNoWaitSupported() &&
222                 IoUring.isIOUringCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
223     }
224 }
225