View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.IoRegistration;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.unix.Buffer;
27  import io.netty.channel.unix.Errors;
28  
29  import java.net.SocketAddress;
30  import java.nio.ByteBuffer;
31  
32  import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
33  import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
34  
35  abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
36      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
37  
38      private static final class AcceptedAddressMemory {
39          private final ByteBuffer acceptedAddressMemory;
40          private final ByteBuffer acceptedAddressLengthMemory;
41          private final long acceptedAddressMemoryAddress;
42          private final long acceptedAddressLengthMemoryAddress;
43  
44          AcceptedAddressMemory() {
45              acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
46              acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
47              acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
48              // Needs to be initialized to the size of acceptedAddressMemory.
49              // See https://man7.org/linux/man-pages/man2/accept.2.html
50              acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
51              acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
52          }
53  
54          void free() {
55              Buffer.free(acceptedAddressMemory);
56              Buffer.free(acceptedAddressLengthMemory);
57          }
58      }
59      private final AcceptedAddressMemory acceptedAddressMemory;
60      private long acceptId;
61  
62      protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
63          super(null, socket, active);
64  
65          // We can only depend on the accepted address if multi-shot is not used.
66          // From the manpage:
67          //       The multishot variants allow an application to issue a single
68          //       accept request, which will repeatedly trigger a CQE when a
69          //       connection request comes in. Like other multishot type requests,
70          //       the application should look at the CQE flags and see if
71          //       IORING_CQE_F_MORE is set on completion as an indication of whether
72          //       or not the accept request will generate further CQEs. Note that
73          //       for the multishot variants, setting addr and addrlen may not make
74          //       a lot of sense, as the same value would be used for every accepted
75          //       connection. This means that the data written to addr may be
76          //       overwritten by a new connection before the application has had
77          //       time to process a past connection. If the application knows that a
78          //       new connection cannot come in before a previous one has been
79          //       processed, it may be used as expected.
80          if (IoUring.isAcceptMultishotSupported()) {
81              acceptedAddressMemory = null;
82          } else {
83              acceptedAddressMemory = new AcceptedAddressMemory();
84          }
85      }
86  
87      @Override
88      public final ChannelMetadata metadata() {
89          return METADATA;
90      }
91  
92      @Override
93      protected final void doClose() throws Exception {
94          super.doClose();
95      }
96  
97      @Override
98      protected final AbstractUringUnsafe newUnsafe() {
99          return new UringServerChannelUnsafe();
100     }
101 
102     @Override
103     protected final void doWrite(ChannelOutboundBuffer in) {
104         throw new UnsupportedOperationException();
105     }
106 
107     @Override
108     protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
109         if (acceptId != 0) {
110             assert numOutstandingReads == 1 || numOutstandingReads == -1;
111             IoUringIoOps ops = IoUringIoOps.newAsyncCancel(flags((byte) 0), acceptId, Native.IORING_OP_ACCEPT);
112             registration.submit(ops);
113             acceptId = 0;
114         } else {
115             assert numOutstandingReads == 0 || numOutstandingReads == -1;
116         }
117     }
118 
119     @Override
120     protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
121         assert numOutstandingWrites == 0;
122     }
123 
124     abstract Channel newChildChannel(
125             int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
126 
127     private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
128 
129         @Override
130         protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
131             throw new UnsupportedOperationException();
132         }
133 
134         @Override
135         protected int scheduleWriteSingle(Object msg) {
136             throw new UnsupportedOperationException();
137         }
138 
139         @Override
140         boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
141             throw new UnsupportedOperationException();
142         }
143 
144         @Override
145         protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
146             assert acceptId == 0;
147             final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
148             allocHandle.attemptedBytesRead(1);
149 
150             int fd = fd().intValue();
151             IoRegistration registration = registration();
152 
153             // Depending on if socketIsEmpty is true we will arm the poll upfront and skip the initial transfer
154             // attempt.
155             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#socket-state
156             //
157             // Depending on if this is the first read or not we will use Native.IORING_ACCEPT_DONT_WAIT.
158             // The idea is that if the socket is blocking we can do the first read in a blocking fashion
159             // and so not need to also register POLLIN. As we can not 100 % sure if reads after the first will
160             // be possible directly we schedule these with Native.IORING_ACCEPT_DONT_WAIT. This allows us to still be
161             // able to signal the fireChannelReadComplete() in a timely manner and be consistent with other
162             // transports.
163             short ioPrio;
164 
165             // IORING_ACCEPT_POLL_FIRST and IORING_ACCEPT_DONTWAIT were added in the same release.
166             // We need to check if its supported as otherwise providing these would result in an -EINVAL.
167             if (IoUring.isAcceptNoWaitSupported()) {
168                 if (first) {
169                     ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
170                 } else {
171                     ioPrio = Native.IORING_ACCEPT_DONTWAIT;
172                 }
173             } else {
174                 ioPrio = 0;
175             }
176 
177             final long acceptedAddressMemoryAddress;
178             final long acceptedAddressLengthMemoryAddress;
179             if (IoUring.isAcceptMultishotSupported()) {
180                 // Let's use multi-shot accept to reduce overhead.
181                 ioPrio |= Native.IORING_ACCEPT_MULTISHOT;
182                 acceptedAddressMemoryAddress = 0;
183                 acceptedAddressLengthMemoryAddress = 0;
184             } else {
185                 assert acceptedAddressMemory != null;
186                 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
187                 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
188             }
189 
190             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10#improvements-for-accept
191             IoUringIoOps ops = IoUringIoOps.newAccept(fd, flags((byte) 0), 0, ioPrio,
192                     acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
193             acceptId = registration.submit(ops);
194             if (acceptId == 0) {
195                 return 0;
196             }
197             if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
198                 // Let's return -1 to signal that we used multi-shot.
199                 return -1;
200             }
201             return 1;
202         }
203 
204         @Override
205         protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
206             if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
207                 acceptId = 0;
208                 return;
209             }
210             assert acceptId != 0;
211             boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
212             if (rearm) {
213                 // Only reset if we don't use multi-shot or we need to re-arm because the multi-shot was cancelled.
214                 acceptId = 0;
215             }
216             final IoUringRecvByteAllocatorHandle allocHandle =
217                     (IoUringRecvByteAllocatorHandle) unsafe()
218                             .recvBufAllocHandle();
219             final ChannelPipeline pipeline = pipeline();
220             allocHandle.lastBytesRead(res);
221 
222             if (res >= 0) {
223                 allocHandle.incMessagesRead(1);
224                 final long acceptedAddressMemoryAddress;
225                 final long acceptedAddressLengthMemoryAddress;
226                 if (acceptedAddressMemory == null) {
227                     acceptedAddressMemoryAddress = 0;
228                     acceptedAddressLengthMemoryAddress = 0;
229                 } else {
230                     acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
231                     acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
232                 }
233                 try {
234                     Channel channel = newChildChannel(
235                             res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
236                     pipeline.fireChannelRead(channel);
237 
238                     if (allocHandle.continueReading() &&
239                             // If IORING_CQE_F_SOCK_NONEMPTY is supported we should check for it first before
240                             // trying to schedule a read. If it's supported and not part of the flags we know for sure
241                             // that the next read (which would be using Native.IORING_ACCEPT_DONTWAIT) will complete
242                             // without be able to read any data. This is useless work and we can skip it.
243                             //
244                             // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
245                             !socketIsEmpty(flags)) {
246                         if (rearm) {
247                             // We only should schedule another read if we need to rearm.
248                             // See https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023#multi-shot
249                             scheduleRead(false);
250                         }
251                     } else {
252                         allocHandle.readComplete();
253                         pipeline.fireChannelReadComplete();
254                     }
255                 } catch (Throwable cause) {
256                     allocHandle.readComplete();
257                     pipeline.fireChannelReadComplete();
258                     pipeline.fireExceptionCaught(cause);
259                 }
260             } else {
261                 allocHandle.readComplete();
262                 pipeline.fireChannelReadComplete();
263                 // Check if we did fail because there was nothing to accept atm.
264                 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
265                     // Something bad happened. Convert to an exception.
266                     pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
267                 }
268             }
269         }
270 
271         @Override
272         public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
273                             final ChannelPromise promise) {
274             promise.setFailure(new UnsupportedOperationException());
275         }
276 
277         @Override
278         protected void freeResourcesNow(IoRegistration reg) {
279             super.freeResourcesNow(reg);
280             if (acceptedAddressMemory != null) {
281                 acceptedAddressMemory.free();
282             }
283         }
284     }
285 
286     @Override
287     protected boolean socketIsEmpty(int flags) {
288         // IORING_CQE_F_SOCK_NONEMPTY is used for accept since IORING_ACCEPT_DONTWAIT was added.
289         // See https://github.com/axboe/liburing/wiki/What's-new-with-io_uring-in-6.10
290         return IoUring.isAcceptNoWaitSupported() &&
291                 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
292     }
293 
294     @Override
295     boolean isPollInFirst() {
296         return false;
297     }
298 }
299