1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.IoRegistration;
25 import io.netty.channel.ServerChannel;
26 import io.netty.channel.unix.Buffer;
27 import io.netty.channel.unix.Errors;
28
29 import java.net.SocketAddress;
30 import java.nio.ByteBuffer;
31
32 import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
33 import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
34
35 abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
36 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
37
38 private static final class AcceptedAddressMemory {
39 private final ByteBuffer acceptedAddressMemory;
40 private final ByteBuffer acceptedAddressLengthMemory;
41 private final long acceptedAddressMemoryAddress;
42 private final long acceptedAddressLengthMemoryAddress;
43
44 AcceptedAddressMemory() {
45 acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
46 acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
47 acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
48
49
50 acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
51 acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
52 }
53
54 void free() {
55 Buffer.free(acceptedAddressMemory);
56 Buffer.free(acceptedAddressLengthMemory);
57 }
58 }
59 private final AcceptedAddressMemory acceptedAddressMemory;
60 private long acceptId;
61
62 protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
63 super(null, socket, active);
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 if (IoUring.isAcceptMultishotSupported()) {
81 acceptedAddressMemory = null;
82 } else {
83 acceptedAddressMemory = new AcceptedAddressMemory();
84 }
85 }
86
87 @Override
88 public final ChannelMetadata metadata() {
89 return METADATA;
90 }
91
92 @Override
93 protected final void doClose() throws Exception {
94 super.doClose();
95 }
96
97 @Override
98 protected final AbstractUringUnsafe newUnsafe() {
99 return new UringServerChannelUnsafe();
100 }
101
102 @Override
103 protected final void doWrite(ChannelOutboundBuffer in) {
104 throw new UnsupportedOperationException();
105 }
106
107 @Override
108 protected final void cancelOutstandingReads(IoRegistration registration, int numOutstandingReads) {
109 if (acceptId != 0) {
110 assert numOutstandingReads == 1 || numOutstandingReads == -1;
111 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(flags((byte) 0), acceptId, Native.IORING_OP_ACCEPT);
112 registration.submit(ops);
113 acceptId = 0;
114 } else {
115 assert numOutstandingReads == 0 || numOutstandingReads == -1;
116 }
117 }
118
119 @Override
120 protected final void cancelOutstandingWrites(IoRegistration registration, int numOutstandingWrites) {
121 assert numOutstandingWrites == 0;
122 }
123
124 abstract Channel newChildChannel(
125 int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
126
127 private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
128
129 @Override
130 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
131 throw new UnsupportedOperationException();
132 }
133
134 @Override
135 protected int scheduleWriteSingle(Object msg) {
136 throw new UnsupportedOperationException();
137 }
138
139 @Override
140 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
141 throw new UnsupportedOperationException();
142 }
143
144 @Override
145 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
146 assert acceptId == 0;
147 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
148 allocHandle.attemptedBytesRead(1);
149
150 int fd = fd().intValue();
151 IoRegistration registration = registration();
152
153
154
155
156
157
158
159
160
161
162
163 short ioPrio;
164
165
166
167 if (IoUring.isAcceptNoWaitSupported()) {
168 if (first) {
169 ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
170 } else {
171 ioPrio = Native.IORING_ACCEPT_DONTWAIT;
172 }
173 } else {
174 ioPrio = 0;
175 }
176
177 final long acceptedAddressMemoryAddress;
178 final long acceptedAddressLengthMemoryAddress;
179 if (IoUring.isAcceptMultishotSupported()) {
180
181 ioPrio |= Native.IORING_ACCEPT_MULTISHOT;
182 acceptedAddressMemoryAddress = 0;
183 acceptedAddressLengthMemoryAddress = 0;
184 } else {
185 assert acceptedAddressMemory != null;
186 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
187 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
188 }
189
190
191 IoUringIoOps ops = IoUringIoOps.newAccept(fd, flags((byte) 0), 0, ioPrio,
192 acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
193 acceptId = registration.submit(ops);
194 if (acceptId == 0) {
195 return 0;
196 }
197 if ((ioPrio & Native.IORING_ACCEPT_MULTISHOT) != 0) {
198
199 return -1;
200 }
201 return 1;
202 }
203
204 @Override
205 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
206 if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
207 acceptId = 0;
208 return;
209 }
210 assert acceptId != 0;
211 boolean rearm = (flags & Native.IORING_CQE_F_MORE) == 0;
212 if (rearm) {
213
214 acceptId = 0;
215 }
216 final IoUringRecvByteAllocatorHandle allocHandle =
217 (IoUringRecvByteAllocatorHandle) unsafe()
218 .recvBufAllocHandle();
219 final ChannelPipeline pipeline = pipeline();
220 allocHandle.lastBytesRead(res);
221
222 if (res >= 0) {
223 allocHandle.incMessagesRead(1);
224 final long acceptedAddressMemoryAddress;
225 final long acceptedAddressLengthMemoryAddress;
226 if (acceptedAddressMemory == null) {
227 acceptedAddressMemoryAddress = 0;
228 acceptedAddressLengthMemoryAddress = 0;
229 } else {
230 acceptedAddressMemoryAddress = acceptedAddressMemory.acceptedAddressMemoryAddress;
231 acceptedAddressLengthMemoryAddress = acceptedAddressMemory.acceptedAddressLengthMemoryAddress;
232 }
233 try {
234 Channel channel = newChildChannel(
235 res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
236 pipeline.fireChannelRead(channel);
237
238 if (allocHandle.continueReading() &&
239
240
241
242
243
244
245 !socketIsEmpty(flags)) {
246 if (rearm) {
247
248
249 scheduleRead(false);
250 }
251 } else {
252 allocHandle.readComplete();
253 pipeline.fireChannelReadComplete();
254 }
255 } catch (Throwable cause) {
256 allocHandle.readComplete();
257 pipeline.fireChannelReadComplete();
258 pipeline.fireExceptionCaught(cause);
259 }
260 } else {
261 allocHandle.readComplete();
262 pipeline.fireChannelReadComplete();
263
264 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
265
266 pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
267 }
268 }
269 }
270
271 @Override
272 public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
273 final ChannelPromise promise) {
274 promise.setFailure(new UnsupportedOperationException());
275 }
276
277 @Override
278 protected void freeResourcesNow(IoRegistration reg) {
279 super.freeResourcesNow(reg);
280 if (acceptedAddressMemory != null) {
281 acceptedAddressMemory.free();
282 }
283 }
284 }
285
286 @Override
287 protected boolean socketIsEmpty(int flags) {
288
289
290 return IoUring.isAcceptNoWaitSupported() &&
291 IoUring.isCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
292 }
293
294 @Override
295 boolean isPollInFirst() {
296 return false;
297 }
298 }
299