1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.ServerChannel;
26 import io.netty.channel.unix.Buffer;
27 import io.netty.channel.unix.Errors;
28
29 import java.net.SocketAddress;
30 import java.nio.ByteBuffer;
31
32 import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
33 import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
34
35 abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
36 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
37
38 private static final class AcceptedAddressMemory {
39 private final ByteBuffer acceptedAddressMemory;
40 private final ByteBuffer acceptedAddressLengthMemory;
41 private final long acceptedAddressMemoryAddress;
42 private final long acceptedAddressLengthMemoryAddress;
43
44 AcceptedAddressMemory() {
45 acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
46 acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
47 acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
48
49
50 acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
51 acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
52 }
53
54 void free() {
55 Buffer.free(acceptedAddressMemory);
56 Buffer.free(acceptedAddressLengthMemory);
57 }
58 }
/**
 * Scratch memory for the accepted peer address, or {@code null} when multishot accept is enabled
 * (in that mode no address memory is passed along with the accept operation).
 */
private final AcceptedAddressMemory acceptedAddressMemory;

/**
 * Id returned when the currently outstanding accept was submitted, or {@code 0} if none is outstanding.
 */
private long acceptId;

/**
 * Creates a new server channel on top of the given socket.
 *
 * @param socket the listening socket
 * @param active passed through to the super constructor
 */
protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
    super(null, socket, active);

    // Address memory is only allocated for single-shot accepts; with multishot accept enabled the
    // operation is submitted without any address memory, so there is nothing to allocate.
    acceptedAddressMemory = IoUring.isAcceptMultishotEnabled() ? null : new AcceptedAddressMemory();
}
86
87 @Override
88 public final ChannelMetadata metadata() {
89 return METADATA;
90 }
91
92 @Override
93 protected final void doClose() throws Exception {
94 super.doClose();
95 }
96
97 @Override
98 protected final AbstractUringUnsafe newUnsafe() {
99 return new UringServerChannelUnsafe();
100 }
101
102 @Override
103 protected final void doWrite(ChannelOutboundBuffer in) {
104 throw new UnsupportedOperationException();
105 }
106
107 @Override
108 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
109 if (acceptId != 0) {
110 assert numOutstandingReads == 1 || numOutstandingReads == -1;
111 IoUringIoOps ops = IoUringIoOps.newAsyncCancel((byte) 0, acceptId, Native.IORING_OP_ACCEPT);
112 registration.submit(ops);
113 acceptId = 0;
114 } else {
115 assert numOutstandingReads == 0 || numOutstandingReads == -1;
116 }
117 }
118
119 @Override
120 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
121 assert numOutstandingWrites == 0;
122 }
123
124 abstract Channel newChildChannel(
125 int fd, ByteBuffer acceptedAddressMemory) throws Exception;
126
127 private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
128
129 @Override
130 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
131 throw new UnsupportedOperationException();
132 }
133
134 @Override
135 protected int scheduleWriteSingle(Object msg) {
136 throw new UnsupportedOperationException();
137 }
138
139 @Override
140 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
141 throw new UnsupportedOperationException();
142 }
143
144 @Override
145 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
146 assert acceptId == 0;
147 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
148 allocHandle.attemptedBytesRead(1);
149
150 int fd = fd().intValue();
151 IoRegistration registration = registration();
152
153 final short ioPrio;
154
155 final long acceptedAddressMemoryAddress;
156 final long acceptedAddressLengthMemoryAddress;
157 if (IoUring.isAcceptMultishotEnabled()) {
158
159 ioPrio = Native.IORING_ACCEPT_MULTISHOT;
160 acceptedAddressMemoryAddress = 0;
161 acceptedAddressLengthMemoryAddress = 0;
162 } else {
163
164
165
166
167
168
169
170
171
172
173
174
175
176 if (IoUring.isAcceptNoWaitSupported()) {
177 if (first) {
178 ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
179 } else {
180 ioPrio = Native.IORING_ACCEPT_DONTWAIT;
181 }
182 } else {
183 ioPrio = 0;
184 }
185
186 assert acceptedAddressMemory != null;
187 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
188 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
189 }
190
191
192 IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0, ioPrio,
193 acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
194 acceptId = registration.submit(ops);
195 if (acceptId == 0) {
196 return 0;
197 }
198 if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
199
200 return -1;
201 }
202 return 1;
203 }
204
205 @Override
206 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
207 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
208 acceptId = 0;
209 return;
210 }
211 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
212 if (rearm) {
213
214 acceptId = 0;
215 }
216 final IoUringRecvByteAllocatorHandle allocHandle =
217 (IoUringRecvByteAllocatorHandle) unsafe()
218 .recvBufAllocHandle();
219 final ChannelPipeline pipeline = pipeline();
220 allocHandle.lastBytesRead(res);
221
222 if (res >= 0) {
223 allocHandle.incMessagesRead(1);
224 final ByteBuffer acceptedAddressBuffer;
225 final long acceptedAddressLengthMemoryAddress;
226 if (acceptedAddressMemory == null) {
227 acceptedAddressBuffer = null;
228 } else {
229 acceptedAddressBuffer = acceptedAddressMemory.acceptedAddressMemory;
230 }
231 try {
232 Channel channel = newChildChannel(res, acceptedAddressBuffer);
233 pipeline.fireChannelRead(channel);
234
235 if (allocHandle.continueReading() &&
236
237
238
239
240
241
242 !socketIsEmpty(flags)) {
243 if (rearm) {
244
245
246 scheduleRead(false);
247 }
248 } else {
249 allocHandle.readComplete();
250 pipeline.fireChannelReadComplete();
251 }
252 } catch (Throwable cause) {
253 allocHandle.readComplete();
254 pipeline.fireChannelReadComplete();
255 pipeline.fireExceptionCaught(cause);
256 }
257 } else {
258 allocHandle.readComplete();
259 pipeline.fireChannelReadComplete();
260
261 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
262
263 pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
264 }
265 }
266 }
267
268 @Override
269 public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
270 final ChannelPromise promise) {
271 promise.setFailure(new UnsupportedOperationException());
272 }
273
274 @Override
275 protected void freeResourcesNow(IoRegistration reg) {
276 super.freeResourcesNow(reg);
277 if (acceptedAddressMemory != null) {
278 acceptedAddressMemory.free();
279 }
280 }
281 }
282
283 @Override
284 protected boolean socketIsEmpty(int flags) {
285
286
287 return IoUring.isAcceptNoWaitSupported() &&
288 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
289 }
290
291 @Override
292 boolean isPollInFirst() {
293 return false;
294 }
295 }
296