1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.ServerChannel;
25 import io.netty.channel.unix.Buffer;
26 import io.netty.channel.unix.Errors;
27
28 import java.net.SocketAddress;
29 import java.nio.ByteBuffer;
30
31 import static io.netty.channel.unix.Errors.ERRNO_EAGAIN_NEGATIVE;
32 import static io.netty.channel.unix.Errors.ERRNO_EWOULDBLOCK_NEGATIVE;
33
34 abstract class AbstractIoUringServerChannel extends AbstractIoUringChannel implements ServerChannel {
35 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
36
37 private final ByteBuffer acceptedAddressMemory;
38 private final ByteBuffer acceptedAddressLengthMemory;
39
40 private final long acceptedAddressMemoryAddress;
41 private final long acceptedAddressLengthMemoryAddress;
42
43 private long acceptId;
44
45 protected AbstractIoUringServerChannel(LinuxSocket socket, boolean active) {
46 super(null, socket, active);
47
48 acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
49 acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
50 acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
51
52
53 acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
54 acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
55 }
56
57 @Override
58 public final ChannelMetadata metadata() {
59 return METADATA;
60 }
61
62 @Override
63 protected final void doClose() throws Exception {
64 super.doClose();
65 Buffer.free(acceptedAddressMemory);
66 Buffer.free(acceptedAddressLengthMemory);
67 }
68
69 @Override
70 protected final AbstractUringUnsafe newUnsafe() {
71 return new UringServerChannelUnsafe();
72 }
73
74 @Override
75 protected final void doWrite(ChannelOutboundBuffer in) {
76 throw new UnsupportedOperationException();
77 }
78
79 @Override
80 protected final void cancelOutstandingReads(IoUringIoRegistration registration, int numOutstandingReads) {
81 if (acceptId != 0) {
82 assert numOutstandingReads == 1;
83 int fd = fd().intValue();
84 IoUringIoOps ops = IoUringIoOps.newAsyncCancel(
85 fd, 0, acceptId, Native.IORING_OP_ACCEPT);
86 registration.submit(ops);
87 }
88 assert numOutstandingReads == 0;
89 }
90
91 @Override
92 protected final void cancelOutstandingWrites(IoUringIoRegistration registration, int numOutstandingWrites) {
93 assert numOutstandingWrites == 0;
94 }
95
96 abstract Channel newChildChannel(
97 int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) throws Exception;
98
99 private final class UringServerChannelUnsafe extends AbstractIoUringChannel.AbstractUringUnsafe {
100
101 @Override
102 protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
103 throw new UnsupportedOperationException();
104 }
105
106 @Override
107 protected int scheduleWriteSingle(Object msg) {
108 throw new UnsupportedOperationException();
109 }
110
111 @Override
112 boolean writeComplete0(int res, int flags, int data, int outstanding) {
113 throw new UnsupportedOperationException();
114 }
115
116 @Override
117 protected int scheduleRead0(boolean first) {
118 assert acceptId == 0;
119 final IoUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
120 allocHandle.attemptedBytesRead(1);
121
122 int fd = fd().intValue();
123 IoUringIoRegistration registration = registration();
124 IoUringIoOps ops = IoUringIoOps.newAccept(fd, 0, 0,
125 acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, nextOpsId());
126 acceptId = registration.submit(ops);
127 return 1;
128 }
129
130 @Override
131 protected void readComplete0(int res, int flags, int data, int outstanding) {
132 assert acceptId != 0;
133 acceptId = 0;
134 final IoUringRecvByteAllocatorHandle allocHandle =
135 (IoUringRecvByteAllocatorHandle) unsafe()
136 .recvBufAllocHandle();
137 final ChannelPipeline pipeline = pipeline();
138 allocHandle.lastBytesRead(res);
139
140 if (res >= 0) {
141 allocHandle.incMessagesRead(1);
142 try {
143 Channel channel = newChildChannel(
144 res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
145 pipeline.fireChannelRead(channel);
146 if (allocHandle.continueReading()) {
147 scheduleRead(false);
148 } else {
149 allocHandle.readComplete();
150 pipeline.fireChannelReadComplete();
151 }
152 } catch (Throwable cause) {
153 allocHandle.readComplete();
154 pipeline.fireChannelReadComplete();
155 pipeline.fireExceptionCaught(cause);
156 }
157 } else if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
158 allocHandle.readComplete();
159 pipeline.fireChannelReadComplete();
160
161 if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
162
163 pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
164 }
165 }
166 }
167
168 @Override
169 public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
170 final ChannelPromise promise) {
171 promise.setFailure(new UnsupportedOperationException());
172 }
173 }
174 }
175