View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.unix.Buffer;
27  import io.netty.channel.unix.Errors;
28  import io.netty.util.internal.CleanableDirectBuffer;
29  
30  import java.net.SocketAddress;
31  import java.nio.ByteBuffer;
32  
33  import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
34  import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
35  
36  abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
37      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
38  
39      private static final class AcceptedAddressMemory {
40          private final CleanableDirectBuffer acceptedAddressMemoryCleanable;
41          private final ByteBuffer acceptedAddressMemory;
42          private final CleanableDirectBuffer acceptedAddressLengthMemoryCleanable;
43          private final ByteBuffer acceptedAddressLengthMemory;
44          private final long acceptedAddressMemoryAddress;
45          private final long acceptedAddressLengthMemoryAddress;
46  
47          AcceptedAddressMemory() {
48              acceptedAddressMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49              acceptedAddressMemory = acceptedAddressMemoryCleanable.buffer();
50              acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
51              acceptedAddressLengthMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
52              acceptedAddressLengthMemory = acceptedAddressLengthMemoryCleanable.buffer();
53              // Needs to be initialized to the size of acceptedAddressMemory.
54              // See https://man7.org/linux/man-pages/man2/accept.2.html
55              acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
56              acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
57          }
58  
59          void free() {
60              acceptedAddressMemoryCleanable.clean();
61              acceptedAddressLengthMemoryCleanable.clean();
62          }
63      }
64      private final AcceptedAddressMemory acceptedAddressMemory;
65      private long acceptId;
66  
67      protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
68          super(null, socket, active);
69  
70          // We can only depend on the accepted address if multi-shot is not used.
71          // From the manpage:
72          //       The multishot variants allow an application to issue a single
73          //       accept request, which will repeatedly trigger a CQE when a
74          //       connection request comes in. Like other multishot type requests,
75          //       the application should look at the CQE flags and see if
76          //       IORING_CQE_F_MORE is set on completion as an indication of whether
77          //       or not the accept request will generate further CQEs. Note that
78          //       for the multishot variants, setting addr and addrlen may not make
79          //       a lot of sense, as the same value would be used for every accepted
80          //       connection. This means that the data written to addr may be
81          //       overwritten by a new connection before the application has had
82          //       time to process a past connection. If the application knows that a
83          //       new connection cannot come in before a previous one has been
84          //       processed, it may be used as expected.
85          if (IoUring.isAcceptMultishotEnabled()) {
86              acceptedAddressMemory = null;
87          } else {
88              acceptedAddressMemory = new AcceptedAddressMemory();
89          }
90      }
91  
92      @Override
93      public final ChannelMetadata metadata() {
94          return METADATA;
95      }
96  
97      @Override
98      protected final void doClose() throws Exception {
99          super.doClose();
100     }
101 
102     @Override
103     protected final AbstractUringUnsafe newUnsafe() {
104         return new UringServerChannelUnsafe();
105     }
106 
107     @Override
108     protected final void doWrite(ChannelOutboundBuffer in) {
109         throw new UnsupportedOperationException();
110     }
111 
112     @Override
113     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
114         if (acceptId != 0) {
115             assert numOutstandingReads == 1 || numOutstandingReads == -1;
116             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, acceptId, Native.IORING_OP_ACCEPT);
117             registration.submit(ops);
118             acceptId = 0;
119         } else {
120             assert numOutstandingReads == 0 || numOutstandingReads == -1;
121         }
122     }
123 
124     @Override
125     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
126         assert numOutstandingWrites == 0;
127     }
128 
129     abstract Channel newChildChannel(
130             int fd, ByteBuffer acceptedAddressMemory) throws Exception;
131 
132     private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
133 
134         @Override
135         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
136             throw new UnsupportedOperationException();
137         }
138 
139         @Override
140         protected int scheduleWriteSingle(Object msg) {
141             throw new UnsupportedOperationException();
142         }
143 
144         @Override
145         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
146             throw new UnsupportedOperationException();
147         }
148 
149         @Override
150         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
151             assert acceptId == 0;
152             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
153             allocHandle.attemptedBytesRead(1);
154 
155             int fd = fd().intValue();
156             IoRegistration registration = registration();
157 
158             final short ioPrio;
159 
160             final long acceptedAddressMemoryAddress;
161             final long acceptedAddressLengthMemoryAddress;
162             if (IoUring.isAcceptMultishotEnabled()) {
163                 // Let's use multi-shot accept to reduce overhead.
164                 ioPrio = Native.IORING_ACCEPT_MULTISHOT;
165                 acceptedAddressMemoryAddress = 0;
166                 acceptedAddressLengthMemoryAddress = 0;
167             } else {
168                 // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
169                 // attempt.
170                 // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
171                 //
172                 // Depending on if this is the first read or not we will use Native.IORING_ACCEPT_DONT_WAIT.
173                 // The idea is that if the socket is blocking we can do the first read in a blocking fashion
174                 // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
175                 // be possible directly we schedule these with Native.IORING_ACCEPT_DONT_WAIT. This allows us to still
176                 // be able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
177                 // transports.
178                 //
179                 // IORING_ACCEPT_POLL_FIRST and IORING_ACCEPT_DONTWAIT were added in the same release.
180                 // We need to check if its supported as otherwise providing these would result in an -EINVAL.
181                 if (IoUring.isAcceptNoWaitSupported()) {
182                     if (first) {
183                         ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
184                     } else {
185                         ioPrio = Native.IORING_ACCEPT_DONTWAIT;
186                     }
187                 } else {
188                     ioPrio = 0;
189                 }
190 
191                 assert acceptedAddressMemory != null;
192                 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
193                 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
194             }
195 
196             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10#improvements-for-accept
197             IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0, ioPrio,
198                     acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
199             acceptId = registration.submit(ops);
200             if (acceptId == 0) {
201                 return 0;
202             }
203             if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
204                 // Let's return -1 to signal that we used multi-shot.
205                 return -1;
206             }
207             return 1;
208         }
209 
210         @Override
211         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
212             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
213                 acceptId = 0;
214                 return;
215             }
216             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
217             if (rearm) {
218                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
219                 acceptId = 0;
220             }
221             final IoUringRecvByteAllocatorHandle allocHandle =
222                     (IoUringRecvByteAllocatorHandle) unsafe()
223                             .recvBufAllocHandle();
224             final ChannelPipeline pipeline = pipeline();
225             allocHandle.lastBytesRead(res);
226 
227             if (res >= 0) {
228                 allocHandle.incMessagesRead(1);
229                 final ByteBuffer acceptedAddressBuffer;
230                 final long acceptedAddressLengthMemoryAddress;
231                 if (acceptedAddressMemory == null) {
232                     acceptedAddressBuffer = null;
233                 } else {
234                     acceptedAddressBuffer = acceptedAddressMemory.acceptedAddressMemory;
235                 }
236                 try {
237                     Channel channel = newChildChannel(res, acceptedAddressBuffer);
238                     pipeline.fireChannelRead(channel);
239 
240                     if (allocHandle.continueReading() &&
241                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
242                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
243                             // that the next read (which would be using Native.IORING_ACCEPT_DONTWAIT) will complete
244                             // without be able to read any data. This is useless work and we can skip it.
245                             //
246                             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
247                             !socketIsEmpty(flags)) {
248                         if (rearm) {
249                             // We only should schedule another read if we need to rearm.
250                             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
251                             scheduleRead(false);
252                         }
253                     } else {
254                         allocHandle.readComplete();
255                         pipeline.fireChannelReadComplete();
256                     }
257                 } catch (Throwable cause) {
258                     allocHandle.readComplete();
259                     pipeline.fireChannelReadComplete();
260                     pipeline.fireExceptionCaught(cause);
261                 }
262             } else {
263                 allocHandle.readComplete();
264                 pipeline.fireChannelReadComplete();
265                 // Check if we did fail because there was nothing to accept atm.
266                 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
267                     // Something bad happened. Convert to an exception.
268                     pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
269                 }
270             }
271         }
272 
273         @Override
274         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
275                             final ChannelPromise promise) {
276             promise.setFailure(new UnsupportedOperationException());
277         }
278 
279         @Override
280         protected void freeResourcesNow(IoRegistration reg) {
281             super.freeResourcesNow(reg);
282             if (acceptedAddressMemory != null) {
283                 acceptedAddressMemory.free();
284             }
285         }
286     }
287 
288     @Override
289     protected boolean socketIsEmpty(int flags) {
290         // IORING_CQE_F_SOCK_NONEMPTY is used for accept since IORING_ACCEPT_DONTWAIT was added.
291         // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
292         return IoUring.isAcceptNoWaitSupported() &&
293                 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
294     }
295 
296     @Override
297     boolean isPollInFirst() {
298         return false;
299     }
300 }
301