View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.unix.Buffer;
27  import io.netty.channel.unix.Errors;
28  
29  import java.net.SocketAddress;
30  import java.nio.ByteBuffer;
31  
32  import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
33  import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
34  
35  abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
36      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
37  
38      private static final class AcceptedAddressMemory {
39          private final ByteBuffer acceptedAddressMemory;
40          private final ByteBuffer acceptedAddressLengthMemory;
41          private final long acceptedAddressMemoryAddress;
42          private final long acceptedAddressLengthMemoryAddress;
43  
44          AcceptedAddressMemory() {
45              acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
46              acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
47              acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
48              // Needs to be initialized to the size of acceptedAddressMemory.
49              // See https://man7.org/linux/man-pages/man2/accept.2.html
50              acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
51              acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
52          }
53  
54          void free() {
55              Buffer.free(acceptedAddressMemory);
56              Buffer.free(acceptedAddressLengthMemory);
57          }
58      }
59      private final AcceptedAddressMemory acceptedAddressMemory;
60      private long acceptId;
61  
62      protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
63          super(null, socket, active);
64  
65          // We can only depend on the accepted address if multi-shot is not used.
66          // From the manpage:
67          //       The multishot variants allow an application to issue a single
68          //       accept request, which will repeatedly trigger a CQE when a
69          //       connection request comes in. Like other multishot type requests,
70          //       the application should look at the CQE flags and see if
71          //       IORING_CQE_F_MORE is set on completion as an indication of whether
72          //       or not the accept request will generate further CQEs. Note that
73          //       for the multishot variants, setting addr and addrlen may not make
74          //       a lot of sense, as the same value would be used for every accepted
75          //       connection. This means that the data written to addr may be
76          //       overwritten by a new connection before the application has had
77          //       time to process a past connection. If the application knows that a
78          //       new connection cannot come in before a previous one has been
79          //       processed, it may be used as expected.
80          if (IoUring.isAcceptMultishotEnabled()) {
81              acceptedAddressMemory = null;
82          } else {
83              acceptedAddressMemory = new AcceptedAddressMemory();
84          }
85      }
86  
87      @Override
88      public final ChannelMetadata metadata() {
89          return METADATA;
90      }
91  
92      @Override
93      protected final void doClose() throws Exception {
94          super.doClose();
95      }
96  
97      @Override
98      protected final AbstractUringUnsafe newUnsafe() {
99          return new UringServerChannelUnsafe();
100     }
101 
102     @Override
103     protected final void doWrite(ChannelOutboundBuffer in) {
104         throw new UnsupportedOperationException();
105     }
106 
107     @Override
108     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
109         if (acceptId != 0) {
110             assert numOutstandingReads == 1 || numOutstandingReads == -1;
111             IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, acceptId, Native.IORING_OP_ACCEPT);
112             registration.submit(ops);
113             acceptId = 0;
114         } else {
115             assert numOutstandingReads == 0 || numOutstandingReads == -1;
116         }
117     }
118 
119     @Override
120     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
121         assert numOutstandingWrites == 0;
122     }
123 
124     abstract Channel newChildChannel(
125             int fd, ByteBuffer acceptedAddressMemory) throws Exception;
126 
127     private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
128 
129         @Override
130         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
131             throw new UnsupportedOperationException();
132         }
133 
134         @Override
135         protected int scheduleWriteSingle(Object msg) {
136             throw new UnsupportedOperationException();
137         }
138 
139         @Override
140         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
141             throw new UnsupportedOperationException();
142         }
143 
144         @Override
145         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
146             assert acceptId == 0;
147             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
148             allocHandle.attemptedBytesRead(1);
149 
150             int fd = fd().intValue();
151             IoRegistration registration = registration();
152 
153             final short ioPrio;
154 
155             final long acceptedAddressMemoryAddress;
156             final long acceptedAddressLengthMemoryAddress;
157             if (IoUring.isAcceptMultishotEnabled()) {
158                 // Let's use multi-shot accept to reduce overhead.
159                 ioPrio = Native.IORING_ACCEPT_MULTISHOT;
160                 acceptedAddressMemoryAddress = 0;
161                 acceptedAddressLengthMemoryAddress = 0;
162             } else {
163                 // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
164                 // attempt.
165                 // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
166                 //
167                 // Depending on if this is the first read or not we will use Native.IORING_ACCEPT_DONT_WAIT.
168                 // The idea is that if the socket is blocking we can do the first read in a blocking fashion
169                 // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
170                 // be possible directly we schedule these with Native.IORING_ACCEPT_DONT_WAIT. This allows us to still
171                 // be able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
172                 // transports.
173                 //
174                 // IORING_ACCEPT_POLL_FIRST and IORING_ACCEPT_DONTWAIT were added in the same release.
175                 // We need to check if its supported as otherwise providing these would result in an -EINVAL.
176                 if (IoUring.isAcceptNoWaitSupported()) {
177                     if (first) {
178                         ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
179                     } else {
180                         ioPrio = Native.IORING_ACCEPT_DONTWAIT;
181                     }
182                 } else {
183                     ioPrio = 0;
184                 }
185 
186                 assert acceptedAddressMemory != null;
187                 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
188                 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
189             }
190 
191             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10#improvements-for-accept
192             IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0, ioPrio,
193                     acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
194             acceptId = registration.submit(ops);
195             if (acceptId == 0) {
196                 return 0;
197             }
198             if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
199                 // Let's return -1 to signal that we used multi-shot.
200                 return -1;
201             }
202             return 1;
203         }
204 
205         @Override
206         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
207             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
208                 acceptId = 0;
209                 return;
210             }
211             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
212             if (rearm) {
213                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
214                 acceptId = 0;
215             }
216             final IoUringRecvByteAllocatorHandle allocHandle =
217                     (IoUringRecvByteAllocatorHandle) unsafe()
218                             .recvBufAllocHandle();
219             final ChannelPipeline pipeline = pipeline();
220             allocHandle.lastBytesRead(res);
221 
222             if (res >= 0) {
223                 allocHandle.incMessagesRead(1);
224                 final ByteBuffer acceptedAddressBuffer;
225                 final long acceptedAddressLengthMemoryAddress;
226                 if (acceptedAddressMemory == null) {
227                     acceptedAddressBuffer = null;
228                 } else {
229                     acceptedAddressBuffer = acceptedAddressMemory.acceptedAddressMemory;
230                 }
231                 try {
232                     Channel channel = newChildChannel(res, acceptedAddressBuffer);
233                     pipeline.fireChannelRead(channel);
234 
235                     if (allocHandle.continueReading() &&
236                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
237                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
238                             // that the next read (which would be using Native.IORING_ACCEPT_DONTWAIT) will complete
239                             // without be able to read any data. This is useless work and we can skip it.
240                             //
241                             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
242                             !socketIsEmpty(flags)) {
243                         if (rearm) {
244                             // We only should schedule another read if we need to rearm.
245                             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
246                             scheduleRead(false);
247                         }
248                     } else {
249                         allocHandle.readComplete();
250                         pipeline.fireChannelReadComplete();
251                     }
252                 } catch (Throwable cause) {
253                     allocHandle.readComplete();
254                     pipeline.fireChannelReadComplete();
255                     pipeline.fireExceptionCaught(cause);
256                 }
257             } else {
258                 allocHandle.readComplete();
259                 pipeline.fireChannelReadComplete();
260                 // Check if we did fail because there was nothing to accept atm.
261                 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
262                     // Something bad happened. Convert to an exception.
263                     pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
264                 }
265             }
266         }
267 
268         @Override
269         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
270                             final ChannelPromise promise) {
271             promise.setFailure(new UnsupportedOperationException());
272         }
273 
274         @Override
275         protected void freeResourcesNow(IoRegistration reg) {
276             super.freeResourcesNow(reg);
277             if (acceptedAddressMemory != null) {
278                 acceptedAddressMemory.free();
279             }
280         }
281     }
282 
283     @Override
284     protected boolean socketIsEmpty(int flags) {
285         // IORING_CQE_F_SOCK_NONEMPTY is used for accept since IORING_ACCEPT_DONTWAIT was added.
286         // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
287         return IoUring.isAcceptNoWaitSupported() &&
288                 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
289     }
290 
291     @Override
292     boolean isPollInFirst() {
293         return false;
294     }
295 }
296