1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.ServerChannel;
25 import io.netty.channel.unix.Buffer;
26 import io.netty.channel.unix.Errors;
27
28 import java.net.SocketAddress;
29 import java.nio.ByteBuffer;
30
31 import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
32 import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
33
34 abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
35 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
36
37 private final ByteBuffer acceptedAddressMemory;
38 private final ByteBuffer acceptedAddressLengthMemory;
39
40 private final long acceptedAddressMemoryAddress;
41 private final long acceptedAddressLengthMemoryAddress;
42
43 private long acceptId;
44
45 protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
46 super(null, socket, active);
47
48 acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49 acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
50 acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
51
52
53 acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
54 acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
55 }
56
57 @Override
58 public final ChannelMetadata metadata() {
59 return METADATA;
60 }
61
62 @Override
63 protected final void doClose() throws Exception {
64 super.doClose();
65 }
66
67 @Override
68 protected final AbstractUringUnsafe newUnsafe() {
69 return new UringServerChannelUnsafe();
70 }
71
72 @Override
73 protected final void doWrite(ChannelOutboundBuffer in) {
74 throw new UnsupportedOperationException();
75 }
76
77 @Override
78 protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
79 if (acceptId != 0) {
80 assert numOutstandingReads == 1;
81 int fd = fd().intValue();
82 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(
83 fd, flags((byte) 0), acceptId, Native.IORING_OP_ACCEPT);
84 registration.submit(ops);
85 } else {
86 assert numOutstandingReads == 0;
87 }
88 }
89
90 @Override
91 protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
92 assert numOutstandingWrites == 0;
93 }
94
95 abstract Channel newChildChannel(
96 int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
97
98 private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
99
100 @Override
101 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
102 throw new UnsupportedOperationException();
103 }
104
105 @Override
106 protected int scheduleWriteSingle(Object msg) {
107 throw new UnsupportedOperationException();
108 }
109
110 @Override
111 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
112 throw new UnsupportedOperationException();
113 }
114
115 @Override
116 protected int scheduleRead0(boolean first, boolean socketIsEmpty) {
117 assert acceptId == 0;
118 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
119 allocHandle.attemptedBytesRead(1);
120
121 int fd = fd().intValue();
122 IoUringIoRegistration registration = registration();
123
124
125
126
127
128
129
130
131
132
133
134 final short ioPrio;
135
136
137
138 if (IoUring.isIOUringAcceptNoWaitSupported()) {
139 if (first) {
140 ioPrio = socketIsEmpty ? Native.IORING_ACCEPT_POLL_FIRST : 0;
141 } else {
142 ioPrio = Native.IORING_ACCEPT_DONTWAIT;
143 }
144 } else {
145 ioPrio = 0;
146 }
147
148 IoUringIoOps ops = IoUringIoOps.newAccept(fd, flags((byte) 0), 0, ioPrio,
149 acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
150 acceptId = registration.submit(ops);
151 if (acceptId == 0) {
152 return 0;
153 }
154 return 1;
155 }
156
157 @Override
158 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
159 assert acceptId != 0;
160 acceptId = 0;
161 final IoUringRecvByteAllocatorHandle allocHandle =
162 (IoUringRecvByteAllocatorHandle) unsafe()
163 .recvBufAllocHandle();
164 final ChannelPipeline pipeline = pipeline();
165 allocHandle.lastBytesRead(res);
166
167 if (res >= 0) {
168 allocHandle.incMessagesRead(1);
169 try {
170 Channel channel = newChildChannel(
171 res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
172 pipeline.fireChannelRead(channel);
173
174 if (allocHandle.continueReading() &&
175
176
177
178
179
180
181 !socketIsEmpty(flags)) {
182 scheduleRead(false);
183 } else {
184 allocHandle.readComplete();
185 pipeline.fireChannelReadComplete();
186 }
187 } catch (Throwable cause) {
188 allocHandle.readComplete();
189 pipeline.fireChannelReadComplete();
190 pipeline.fireExceptionCaught(cause);
191 }
192 } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
193 allocHandle.readComplete();
194 pipeline.fireChannelReadComplete();
195
196 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
197
198 pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
199 }
200 }
201 }
202
203 @Override
204 public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
205 final ChannelPromise promise) {
206 promise.setFailure(new UnsupportedOperationException());
207 }
208
209 @Override
210 protected void freeResourcesNow(IoUringIoRegistration reg) {
211 super.freeResourcesNow(reg);
212 Buffer.free(acceptedAddressMemory);
213 Buffer.free(acceptedAddressLengthMemory);
214 }
215 }
216
217 @Override
218 protected boolean socketIsEmpty(int flags) {
219
220
221 return IoUring.isIOUringAcceptNoWaitSupported() &&
222 IoUring.isIOUringCqeFSockNonEmptySupported() && (flags & Native.IORING_CQE_F_SOCK_NONEMPTY) == 0;
223 }
224 }
225