View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.unix.Buffer;
27  import io.netty.channel.unix.Errors;
28  import io.netty.util.internal.CleanableDirectBuffer;
29  
30  import java.net.SocketAddress;
31  import java.nio.ByteBuffer;
32  
33  import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
34  import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
35  
36  abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
37      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
38  
39      private static final class AcceptedAddressMemory {
40          private final CleanableDirectBuffer acceptedAddressMemoryCleanable;
41          private final ByteBuffer acceptedAddressMemory;
42          private final CleanableDirectBuffer acceptedAddressLengthMemoryCleanable;
43          private final ByteBuffer acceptedAddressLengthMemory;
44          private final long acceptedAddressMemoryAddress;
45          private final long acceptedAddressLengthMemoryAddress;
46  
47          AcceptedAddressMemory() {
48              acceptedAddressMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49              acceptedAddressMemory = acceptedAddressMemoryCleanable.buffer();
50              acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
51              acceptedAddressLengthMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
52              acceptedAddressLengthMemory = acceptedAddressLengthMemoryCleanable.buffer();
53              // Needs to be initialized to the size of acceptedAddressMemory.
54              // See https://man7.org/linux/man-pages/man2/accept.2.html
55              acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
56              acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
57          }
58  
59          void free() {
60              acceptedAddressMemoryCleanable.clean();
61              acceptedAddressLengthMemoryCleanable.clean();
62          }
63      }
64      private final AcceptedAddressMemory acceptedAddressMemory;
65      private long acceptId;
66  
67      protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
68          super(null, socket, active);
69  
70          // We can only depend on the accepted address if multi-shot is not used.
71          // From the manpage:
72          //       The multishot variants allow an application to issue a single
73          //       accept request, which will repeatedly trigger a CQE when a
74          //       connection request comes in. Like other multishot type requests,
75          //       the application should look at the CQE flags and see if
76          //       IORING_CQE_F_MORE is set on completion as an indication of whether
77          //       or not the accept request will generate further CQEs. Note that
78          //       for the multishot variants, setting addr and addrlen may not make
79          //       a lot of sense, as the same value would be used for every accepted
80          //       connection. This means that the data written to addr may be
81          //       overwritten by a new connection before the application has had
82          //       time to process a past connection. If the application knows that a
83          //       new connection cannot come in before a previous one has been
84          //       processed, it may be used as expected.
85          if (IoUring.isAcceptMultishotEnabled()) {
86              acceptedAddressMemory = null;
87          } else {
88              acceptedAddressMemory = new AcceptedAddressMemory();
89          }
90      }
91  
92      @Override
93      protected final boolean isStreamSocket() {
94          return true;
95      }
96  
97      @Override
98      public final ChannelMetadata metadata() {
99          return METADATA;
100     }
101 
102     @Override
103     protected final void doClose() throws Exception {
104         super.doClose();
105     }
106 
107     @Override
108     protected final AbstractUringUnsafe newUnsafe() {
109         return new UringServerChannelUnsafe();
110     }
111 
112     @Override
113     protected final void doWrite(ChannelOutboundBuffer in) {
114         throw new UnsupportedOperationException();
115     }
116 
117     @Override
118     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
119         if (acceptId != 0) {
120             assert numOutstandingReads == 1 || numOutstandingReads == -1;
121             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, acceptId, Native.IORING_OP_ACCEPT);
122             registration.submit(ops);
123             acceptId = 0;
124         } else {
125             assert numOutstandingReads == 0 || numOutstandingReads == -1;
126         }
127     }
128 
129     @Override
130     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
131         assert numOutstandingWrites == 0;
132     }
133 
134     abstract Channel newChildChannel(
135             int fd, ByteBuffer acceptedAddressMemory) throws Exception;
136 
137     private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
138 
139         @Override
140         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
141             throw new UnsupportedOperationException();
142         }
143 
144         @Override
145         protected int scheduleWriteSingle(Object msg) {
146             throw new UnsupportedOperationException();
147         }
148 
149         @Override
150         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
151             throw new UnsupportedOperationException();
152         }
153 
154         @Override
155         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
156             assert acceptId == 0;
157             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
158             allocHandle.attemptedBytesRead(1);
159 
160             int fd = fd().intValue();
161             IoRegistration registration = registration();
162 
163             final short ioPrio;
164 
165             final long acceptedAddressMemoryAddress;
166             final long acceptedAddressLengthMemoryAddress;
167             if (IoUring.isAcceptMultishotEnabled()) {
168                 // Let's use multi-shot accept to reduce overhead.
169                 ioPrio = Native.IORING_ACCEPT_MULTISHOT;
170                 acceptedAddressMemoryAddress = 0;
171                 acceptedAddressLengthMemoryAddress = 0;
172             } else {
173                 // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
174                 // attempt.
175                 // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
176                 //
177                 // Depending on if this is the first read or not we will use Native.IORING_ACCEPT_DONT_WAIT.
178                 // The idea is that if the socket is blocking we can do the first read in a blocking fashion
179                 // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
180                 // be possible directly we schedule these with Native.IORING_ACCEPT_DONT_WAIT. This allows us to still
181                 // be able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
182                 // transports.
183                 //
184                 // IORING_ACCEPT_POLL_FIRST and IORING_ACCEPT_DONTWAIT were added in the same release.
185                 // We need to check if its supported as otherwise providing these would result in an -EINVAL.
186                 if (IoUring.isAcceptNoWaitSupported()) {
187                     if (first) {
188                         ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
189                     } else {
190                         ioPrio = Native.IORING_ACCEPT_DONTWAIT;
191                     }
192                 } else {
193                     ioPrio = 0;
194                 }
195 
196                 assert acceptedAddressMemory != null;
197                 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
198                 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
199             }
200 
201             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10#improvements-for-accept
202             IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0, ioPrio,
203                     acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
204             acceptId = registration.submit(ops);
205             if (acceptId == 0) {
206                 return 0;
207             }
208             if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
209                 // Let's return -1 to signal that we used multi-shot.
210                 return -1;
211             }
212             return 1;
213         }
214 
215         @Override
216         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
217             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
218                 acceptId = 0;
219                 return;
220             }
221             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
222             if (rearm) {
223                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
224                 acceptId = 0;
225             }
226             final IoUringRecvByteAllocatorHandle allocHandle =
227                     (IoUringRecvByteAllocatorHandle) unsafe()
228                             .recvBufAllocHandle();
229             final ChannelPipeline pipeline = pipeline();
230             allocHandle.lastBytesRead(res);
231 
232             if (res >= 0) {
233                 allocHandle.incMessagesRead(1);
234                 final ByteBuffer acceptedAddressBuffer;
235                 final long acceptedAddressLengthMemoryAddress;
236                 if (acceptedAddressMemory == null) {
237                     acceptedAddressBuffer = null;
238                 } else {
239                     acceptedAddressBuffer = acceptedAddressMemory.acceptedAddressMemory;
240                 }
241                 try {
242                     Channel channel = newChildChannel(res, acceptedAddressBuffer);
243                     pipeline.fireChannelRead(channel);
244 
245                     if (allocHandle.continueReading() &&
246                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
247                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
248                             // that the next read (which would be using Native.IORING_ACCEPT_DONTWAIT) will complete
249                             // without be able to read any data. This is useless work and we can skip it.
250                             //
251                             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
252                             !socketIsEmpty(flags)) {
253                         if (rearm) {
254                             // We only should schedule another read if we need to rearm.
255                             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
256                             scheduleRead(false);
257                         }
258                     } else {
259                         allocHandle.readComplete();
260                         pipeline.fireChannelReadComplete();
261                     }
262                 } catch (Throwable cause) {
263                     allocHandle.readComplete();
264                     pipeline.fireChannelReadComplete();
265                     pipeline.fireExceptionCaught(cause);
266                 }
267             } else {
268                 allocHandle.readComplete();
269                 pipeline.fireChannelReadComplete();
270                 // Check if we did fail because there was nothing to accept atm.
271                 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
272                     // Something bad happened. Convert to an exception.
273                     pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
274                 }
275             }
276         }
277 
278         @Override
279         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
280                             final ChannelPromise promise) {
281             promise.setFailure(new UnsupportedOperationException());
282         }
283 
284         @Override
285         public void unregistered() {
286             super.unregistered();
287             if (acceptedAddressMemory != null) {
288                 acceptedAddressMemory.free();
289             }
290         }
291     }
292 
293     @Override
294     protected boolean socketIsEmpty(int flags) {
295         // IORING_CQE_F_SOCK_NONEMPTY is used for accept since IORING_ACCEPT_DONTWAIT was added.
296         // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
297         return IoUring.isAcceptNoWaitSupported() &&
298                 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
299     }
300 
301     @Override
302     boolean isPollInFirst() {
303         return false;
304     }
305 }
306