1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.ServerChannel;
25 import io.netty.channel.unix.Buffer;
26 import io.netty.channel.unix.Errors;
27
28 import java.net.SocketAddress;
29 import java.nio.ByteBuffer;
30
31 import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
32 import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
33
34 abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
35 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
36
37 private final ByteBuffer acceptedAddressMemory;
38 private final ByteBuffer acceptedAddressLengthMemory;
39
40 private final long acceptedAddressMemoryAddress;
41 private final long acceptedAddressLengthMemoryAddress;
42
43 private long acceptId;
44
45 protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
46 super(null, socket, active);
47
48 acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49 acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
50 acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
51
52
53 acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
54 acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
55 }
56
57 @Override
58 public final ChannelMetadata metadata() {
59 return METADATA;
60 }
61
62 @Override
63 protected final void doClose() throws Exception {
64 super.doClose();
65 }
66
67 @Override
68 protected final AbstractUringUnsafe newUnsafe() {
69 return new UringServerChannelUnsafe();
70 }
71
72 @Override
73 protected final void doWrite(ChannelOutboundBuffer in) {
74 throw new UnsupportedOperationException();
75 }
76
77 @Override
78 protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
79 if (acceptId != 0) {
80 assert numOutstandingReads == 1;
81 int fd = fd().intValue();
82 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(
83 fd, (byte) 0, acceptId, Native.IORING_OP_ACCEPT);
84 registration.submit(ops);
85 } else {
86 assert numOutstandingReads == 0;
87 }
88 }
89
90 @Override
91 protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
92 assert numOutstandingWrites == 0;
93 }
94
95 abstract Channel newChildChannel(
96 int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
97
98 private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
99
100 @Override
101 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
102 throw new UnsupportedOperationException();
103 }
104
105 @Override
106 protected int scheduleWriteSingle(Object msg) {
107 throw new UnsupportedOperationException();
108 }
109
110 @Override
111 boolean writeComplete0(byte op, int res, int flags, short data, int outstanding) {
112 throw new UnsupportedOperationException();
113 }
114
115 @Override
116 protected int scheduleRead0(boolean first) {
117 assert acceptId == 0;
118 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
119 allocHandle.attemptedBytesRead(1);
120
121 int fd = fd().intValue();
122 IoUringIoRegistration registration = registration();
123 IoUringIoOps ops = IoUringIoOps.newAccept(fd, (byte) 0, 0,
124 acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
125 acceptId = registration.submit(ops);
126 if (acceptId == 0) {
127 return 0;
128 }
129 return 1;
130 }
131
132 @Override
133 protected void readComplete0(byte op, int res, int flags, short data, int outstanding) {
134 assert acceptId != 0;
135 acceptId = 0;
136 final IoUringRecvByteAllocatorHandle allocHandle =
137 (IoUringRecvByteAllocatorHandle) unsafe()
138 .recvBufAllocHandle();
139 final ChannelPipeline pipeline = pipeline();
140 allocHandle.lastBytesRead(res);
141
142 if (res >= 0) {
143 allocHandle.incMessagesRead(1);
144 try {
145 Channel channel = newChildChannel(
146 res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
147 pipeline.fireChannelRead(channel);
148 if (allocHandle.continueReading()) {
149 scheduleRead(false);
150 } else {
151 allocHandle.readComplete();
152 pipeline.fireChannelReadComplete();
153 }
154 } catch (Throwable cause) {
155 allocHandle.readComplete();
156 pipeline.fireChannelReadComplete();
157 pipeline.fireExceptionCaught(cause);
158 }
159 } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
160 allocHandle.readComplete();
161 pipeline.fireChannelReadComplete();
162
163 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
164
165 pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
166 }
167 }
168 }
169
170 @Override
171 public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
172 final ChannelPromise promise) {
173 promise.setFailure(new UnsupportedOperationException());
174 }
175
176 @Override
177 protected void freeResourcesNow(IoUringIoRegistration reg) {
178 super.freeResourcesNow(reg);
179 Buffer.free(acceptedAddressMemory);
180 Buffer.free(acceptedAddressLengthMemory);
181 }
182 }
183 }
184