1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.ServerChannel;
26 import io.netty.channel.unix.Buffer;
27 import io.netty.channel.unix.Errors;
28 import io.netty.util.internal.CleanableDirectBuffer;
29
30 import java.net.SocketAddress;
31 import java.nio.ByteBuffer;
32
33 import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
34 import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
35
36 abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
37 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
38
39 private static final class AcceptedAddressMemory {
40 private final CleanableDirectBuffer acceptedAddressMemoryCleanable;
41 private final ByteBuffer acceptedAddressMemory;
42 private final CleanableDirectBuffer acceptedAddressLengthMemoryCleanable;
43 private final ByteBuffer acceptedAddressLengthMemory;
44 private final long acceptedAddressMemoryAddress;
45 private final long acceptedAddressLengthMemoryAddress;
46
47 AcceptedAddressMemory() {
48 acceptedAddressMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49 acceptedAddressMemory = acceptedAddressMemoryCleanable.buffer();
50 acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
51 acceptedAddressLengthMemoryCleanable = Buffer.allocateDirectBufferWithNativeOrder(Long.BYTES);
52 acceptedAddressLengthMemory = acceptedAddressLengthMemoryCleanable.buffer();
53
54
55 acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
56 acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
57 }
58
59 void free() {
60 acceptedAddressMemoryCleanable.clean();
61 acceptedAddressLengthMemoryCleanable.clean();
62 }
63 }
64 private final AcceptedAddressMemory acceptedAddressMemory;
65 private long acceptId;
66
67 protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
68 super(null, socket, active);
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 if (IoUring.isAcceptMultishotEnabled()) {
86 acceptedAddressMemory = null;
87 } else {
88 acceptedAddressMemory = new AcceptedAddressMemory();
89 }
90 }
91
92 @Override
93 public final ChannelMetadata metadata() {
94 return METADATA;
95 }
96
97 @Override
98 protected final void doClose() throws Exception {
99 super.doClose();
100 }
101
102 @Override
103 protected final AbstractUringUnsafe newUnsafe() {
104 return new UringServerChannelUnsafe();
105 }
106
107 @Override
108 protected final void doWrite(ChannelOutboundBuffer in) {
109 throw new UnsupportedOperationException();
110 }
111
112 @Override
113 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
114 if (acceptId != 0) {
115 assert numOutstandingReads == 1 || numOutstandingReads == -1;
116 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, acceptId, Native.IORING_OP_ACCEPT);
117 registration.submit(ops);
118 acceptId = 0;
119 } else {
120 assert numOutstandingReads == 0 || numOutstandingReads == -1;
121 }
122 }
123
124 @Override
125 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
126 assert numOutstandingWrites == 0;
127 }
128
129 abstract Channel newChildChannel(
130 int fd, ByteBuffer acceptedAddressMemory) throws Exception;
131
132 private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
133
134 @Override
135 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
136 throw new UnsupportedOperationException();
137 }
138
139 @Override
140 protected int scheduleWriteSingle(Object msg) {
141 throw new UnsupportedOperationException();
142 }
143
144 @Override
145 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
146 throw new UnsupportedOperationException();
147 }
148
149 @Override
150 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
151 assert acceptId == 0;
152 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
153 allocHandle.attemptedBytesRead(1);
154
155 int fd = fd().intValue();
156 IoRegistration registration = registration();
157
158 final short ioPrio;
159
160 final long acceptedAddressMemoryAddress;
161 final long acceptedAddressLengthMemoryAddress;
162 if (IoUring.isAcceptMultishotEnabled()) {
163
164 ioPrio = Native.IORING_ACCEPT_MULTISHOT;
165 acceptedAddressMemoryAddress = 0;
166 acceptedAddressLengthMemoryAddress = 0;
167 } else {
168
169
170
171
172
173
174
175
176
177
178
179
180
181 if (IoUring.isAcceptNoWaitSupported()) {
182 if (first) {
183 ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
184 } else {
185 ioPrio = Native.IORING_ACCEPT_DONTWAIT;
186 }
187 } else {
188 ioPrio = 0;
189 }
190
191 assert acceptedAddressMemory != null;
192 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
193 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
194 }
195
196
197 IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0, ioPrio,
198 acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
199 acceptId = registration.submit(ops);
200 if (acceptId == 0) {
201 return 0;
202 }
203 if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
204
205 return -1;
206 }
207 return 1;
208 }
209
210 @Override
211 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
212 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
213 acceptId = 0;
214 return;
215 }
216 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
217 if (rearm) {
218
219 acceptId = 0;
220 }
221 final IoUringRecvByteAllocatorHandle allocHandle =
222 (IoUringRecvByteAllocatorHandle) unsafe()
223 .recvBufAllocHandle();
224 final ChannelPipeline pipeline = pipeline();
225 allocHandle.lastBytesRead(res);
226
227 if (res >= 0) {
228 allocHandle.incMessagesRead(1);
229 final ByteBuffer acceptedAddressBuffer;
230 final long acceptedAddressLengthMemoryAddress;
231 if (acceptedAddressMemory == null) {
232 acceptedAddressBuffer = null;
233 } else {
234 acceptedAddressBuffer = acceptedAddressMemory.acceptedAddressMemory;
235 }
236 try {
237 Channel channel = newChildChannel(res, acceptedAddressBuffer);
238 pipeline.fireChannelRead(channel);
239
240 if (allocHandle.continueReading() &&
241
242
243
244
245
246
247 !socketIsEmpty(flags)) {
248 if (rearm) {
249
250
251 scheduleRead(false);
252 }
253 } else {
254 allocHandle.readComplete();
255 pipeline.fireChannelReadComplete();
256 }
257 } catch (Throwable cause) {
258 allocHandle.readComplete();
259 pipeline.fireChannelReadComplete();
260 pipeline.fireExceptionCaught(cause);
261 }
262 } else {
263 allocHandle.readComplete();
264 pipeline.fireChannelReadComplete();
265
266 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
267
268 pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
269 }
270 }
271 }
272
273 @Override
274 public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
275 final ChannelPromise promise) {
276 promise.setFailure(new UnsupportedOperationException());
277 }
278
279 @Override
280 protected void freeResourcesNow(IoRegistration reg) {
281 super.freeResourcesNow(reg);
282 if (acceptedAddressMemory != null) {
283 acceptedAddressMemory.free();
284 }
285 }
286 }
287
288 @Override
289 protected boolean socketIsEmpty(int flags) {
290
291
292 return IoUring.isAcceptNoWaitSupported() &&
293 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
294 }
295
296 @Override
297 boolean isPollInFirst() {
298 return false;
299 }
300 }
301