1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.ServerChannel;
26 import io.netty.channel.unix.Buffer;
27 import io.netty.channel.unix.Errors;
28 import io.netty.util.internal.CleanableDirectBuffer;
29
30 import java.net.SocketAddress;
31 import java.nio.ByteBuffer;
32
33 import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
34 import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
35
36 abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
37 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
38
39 private static final class AcceptedAddressMemory {
40 private final CleanableDirectBuffer acceptedAddressMemoryCleanable;
41 private final ByteBuffer acceptedAddressMemory;
42 private final CleanableDirectBuffer acceptedAddressLengthMemoryCleanable;
43 private final ByteBuffer acceptedAddressLengthMemory;
44 private final long acceptedAddressMemoryAddress;
45 private final long acceptedAddressLengthMemoryAddress;
46
47 AcceptedAddressMemory() {
48 acceptedAddressMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49 acceptedAddressMemory = acceptedAddressMemoryCleanable.buffer();
50 acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
51 acceptedAddressLengthMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
52 acceptedAddressLengthMemory = acceptedAddressLengthMemoryCleanable.buffer();
53
54
55 acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
56 acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
57 }
58
59 void free() {
60 acceptedAddressMemoryCleanable.clean();
61 acceptedAddressLengthMemoryCleanable.clean();
62 }
63 }
64 private final AcceptedAddressMemory acceptedAddressMemory;
65 private long acceptId;
66
67 protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
68 super(null, socket, active);
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 if (IoUring.isAcceptMultishotEnabled()) {
86 acceptedAddressMemory = null;
87 } else {
88 acceptedAddressMemory = new AcceptedAddressMemory();
89 }
90 }
91
92 @Override
93 protected final boolean isStreamSocket() {
94 return true;
95 }
96
97 @Override
98 public final ChannelMetadata metadata() {
99 return METADATA;
100 }
101
102 @Override
103 protected final void doClose() throws Exception {
104 super.doClose();
105 }
106
107 @Override
108 protected final AbstractUringUnsafe newUnsafe() {
109 return new UringServerChannelUnsafe();
110 }
111
112 @Override
113 protected final void doWrite(ChannelOutboundBuffer in) {
114 throw new UnsupportedOperationException();
115 }
116
117 @Override
118 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
119 if (acceptId != 0) {
120 assert numOutstandingReads == 1 || numOutstandingReads == -1;
121 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, acceptId, Native.IORING_OP_ACCEPT);
122 registration.submit(ops);
123 acceptId = 0;
124 } else {
125 assert numOutstandingReads == 0 || numOutstandingReads == -1;
126 }
127 }
128
129 @Override
130 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
131 assert numOutstandingWrites == 0;
132 }
133
134 abstract Channel newChildChannel(
135 int fd, ByteBuffer acceptedAddressMemory) throws Exception;
136
137 private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
138
139 @Override
140 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
141 throw new UnsupportedOperationException();
142 }
143
144 @Override
145 protected int scheduleWriteSingle(Object msg) {
146 throw new UnsupportedOperationException();
147 }
148
149 @Override
150 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
151 throw new UnsupportedOperationException();
152 }
153
154 @Override
155 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
156 assert acceptId == 0;
157 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
158 allocHandle.attemptedBytesRead(1);
159
160 int fd = fd().intValue();
161 IoRegistration registration = registration();
162
163 final short ioPrio;
164
165 final long acceptedAddressMemoryAddress;
166 final long acceptedAddressLengthMemoryAddress;
167 if (IoUring.isAcceptMultishotEnabled()) {
168
169 ioPrio = Native.IORING_ACCEPT_MULTISHOT;
170 acceptedAddressMemoryAddress = 0;
171 acceptedAddressLengthMemoryAddress = 0;
172 } else {
173
174
175
176
177
178
179
180
181
182
183
184
185
186 if (IoUring.isAcceptNoWaitSupported()) {
187 if (first) {
188 ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
189 } else {
190 ioPrio = Native.IORING_ACCEPT_DONTWAIT;
191 }
192 } else {
193 ioPrio = 0;
194 }
195
196 assert acceptedAddressMemory != null;
197 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
198 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
199 }
200
201
202 IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0, ioPrio,
203 acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
204 acceptId = registration.submit(ops);
205 if (acceptId == 0) {
206 return 0;
207 }
208 if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
209
210 return -1;
211 }
212 return 1;
213 }
214
215 @Override
216 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
217 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
218 acceptId = 0;
219 return;
220 }
221 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
222 if (rearm) {
223
224 acceptId = 0;
225 }
226 final IoUringRecvByteAllocatorHandle allocHandle =
227 (IoUringRecvByteAllocatorHandle) unsafe()
228 .recvBufAllocHandle();
229 final ChannelPipeline pipeline = pipeline();
230 allocHandle.lastBytesRead(res);
231
232 if (res >= 0) {
233 allocHandle.incMessagesRead(1);
234 final ByteBuffer acceptedAddressBuffer;
235 final long acceptedAddressLengthMemoryAddress;
236 if (acceptedAddressMemory == null) {
237 acceptedAddressBuffer = null;
238 } else {
239 acceptedAddressBuffer = acceptedAddressMemory.acceptedAddressMemory;
240 }
241 try {
242 Channel channel = newChildChannel(res, acceptedAddressBuffer);
243 pipeline.fireChannelRead(channel);
244
245 if (allocHandle.continueReading() &&
246
247
248
249
250
251
252 !socketIsEmpty(flags)) {
253 if (rearm) {
254
255
256 scheduleRead(false);
257 }
258 } else {
259 allocHandle.readComplete();
260 pipeline.fireChannelReadComplete();
261 }
262 } catch (Throwable cause) {
263 allocHandle.readComplete();
264 pipeline.fireChannelReadComplete();
265 pipeline.fireExceptionCaught(cause);
266 }
267 } else {
268 allocHandle.readComplete();
269 pipeline.fireChannelReadComplete();
270
271 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
272
273 pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
274 }
275 }
276 }
277
278 @Override
279 public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
280 final ChannelPromise promise) {
281 promise.setFailure(new UnsupportedOperationException());
282 }
283
284 @Override
285 public void unregistered() {
286 super.unregistered();
287 if (acceptedAddressMemory != null) {
288 acceptedAddressMemory.free();
289 }
290 }
291 }
292
293 @Override
294 protected boolean socketIsEmpty(int flags) {
295
296
297 return IoUring.isAcceptNoWaitSupported() &&
298 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
299 }
300
301 @Override
302 boolean isPollInFirst() {
303 return false;
304 }
305 }
306