1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.unix.Buffer;
19 import io.netty.channel.unix.Errors;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23 import java.io.UncheckedIOException;
24 import java.lang.invoke.MethodHandles;
25 import java.lang.invoke.VarHandle;
26 import java.nio.ByteBuffer;
27 import java.nio.ByteOrder;
28 import java.util.StringJoiner;
29
30 final class SubmissionQueue {
31 private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
32
33 static final int SQE_SIZE = 64;
34
35
36
37 private static final int SQE_OP_CODE_FIELD = 0;
38 private static final int SQE_FLAGS_FIELD = 1;
39 private static final int SQE_IOPRIO_FIELD = 2;
40 private static final int SQE_FD_FIELD = 4;
41 private static final int SQE_UNION1_FIELD = 8;
42 private static final int SQE_UNION2_FIELD = 16;
43 private static final int SQE_LEN_FIELD = 24;
44 private static final int SQE_UNION3_FIELD = 28;
45 private static final int SQE_USER_DATA_FIELD = 32;
46 private static final int SQE_UNION4_FIELD = 40;
47 private static final int SQE_PERSONALITY_FIELD = 42;
48 private static final int SQE_UNION5_FIELD = 44;
49 private static final int SQE_UNION6_FIELD = 48;
50
51
52
53 private static final VarHandle INT_HANDLE =
54 MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
55 private final ByteBuffer kHead;
56 private final ByteBuffer kTail;
57 private final ByteBuffer kflags;
58 private final ByteBuffer submissionQueueArray;
59
60 final int ringEntries;
61 private final int ringMask;
62
63 final int ringSize;
64 final long ringAddress;
65 final int ringFd;
66 int enterRingFd;
67 private int enterFlags;
68 private int head;
69 private int tail;
70
71 private boolean closed;
72
73 SubmissionQueue(ByteBuffer khead, ByteBuffer ktail, int ringMask, int ringEntries, ByteBuffer kflags,
74 ByteBuffer submissionQueueArray,
75 int ringSize, long ringAddress,
76 int ringFd) {
77 this.kHead = khead;
78 this.kTail = ktail;
79 this.submissionQueueArray = submissionQueueArray;
80 this.ringSize = ringSize;
81 this.ringAddress = ringAddress;
82 this.ringFd = ringFd;
83 this.enterRingFd = ringFd;
84 this.ringEntries = ringEntries;
85 this.kflags = kflags;
86 this.ringMask = ringMask;
87 this.head = (int) INT_HANDLE.getVolatile(khead, 0);
88 this.tail = (int) INT_HANDLE.getVolatile(ktail, 0);
89 }
90
91 int flags() {
92 if (closed) {
93 return 0;
94 }
95
96 return (int) INT_HANDLE.getOpaque(kflags, 0);
97 }
98
99 long submissionQueueArrayAddress() {
100 return Buffer.memoryAddress(submissionQueueArray);
101 }
102
103 void close() {
104 closed = true;
105 }
106
107 private void checkClosed() {
108 if (closed) {
109 throw new IllegalStateException();
110 }
111 }
112
113 void tryRegisterRingFd() {
114 checkClosed();
115
116
117 int enterRingFd = Native.ioUringRegisterRingFds(ringFd);
118 final int enterFlags;
119 if (enterRingFd < 0) {
120
121 enterRingFd = ringFd;
122 enterFlags = 0;
123 } else {
124 enterFlags = Native.IORING_ENTER_REGISTERED_RING;
125 }
126 this.enterRingFd = enterRingFd;
127 this.enterFlags = enterFlags;
128 }
129
130 long enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
131 int union3, long udata, short union4, short personality, int union5, long union6) {
132 checkClosed();
133 int pending = tail - head;
134 if (pending == ringEntries) {
135 int submitted = submit();
136 if (submitted == 0) {
137
138 throw new RuntimeException("SQ ring full and no submissions accepted");
139 }
140 }
141 int sqe = sqeIndex(tail++, ringMask);
142
143
144 submissionQueueArray.put(sqe + SQE_OP_CODE_FIELD, opcode);
145 submissionQueueArray.put(sqe + SQE_FLAGS_FIELD, flags);
146
147 submissionQueueArray.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
148 submissionQueueArray.putInt(sqe + SQE_FD_FIELD, fd);
149 submissionQueueArray.putLong(sqe + SQE_UNION1_FIELD, union1);
150 submissionQueueArray.putLong(sqe + SQE_UNION2_FIELD, union2);
151 submissionQueueArray.putInt(sqe + SQE_LEN_FIELD, len);
152 submissionQueueArray.putInt(sqe + SQE_UNION3_FIELD, union3);
153 submissionQueueArray.putLong(sqe + SQE_USER_DATA_FIELD, udata);
154 submissionQueueArray.putShort(sqe + SQE_UNION4_FIELD, union4);
155 submissionQueueArray.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
156 submissionQueueArray.putInt(sqe + SQE_UNION5_FIELD, union5);
157 submissionQueueArray.putLong(sqe + SQE_UNION6_FIELD, union6);
158
159 if (logger.isTraceEnabled()) {
160 if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
161 logger.trace("add(ring={}, enterRing:{} ): {}(fd={}, len={}, off={}, data={})",
162 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
163 } else {
164 logger.trace("add(ring={}, enterRing:{}): {}(fd={}, len={}, off={}, data={})",
165 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
166 }
167 }
168 return udata;
169 }
170
171 @Override
172 public String toString() {
173 StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
174 if (closed) {
175 sb.add("closed");
176 } else {
177 int pending = tail - head;
178 int idx = tail;
179 for (int i = 0; i < pending; i++) {
180 int sqe = sqeIndex(idx++, ringMask);
181 sb.add(Native.opToStr(submissionQueueArray.get(sqe + SQE_OP_CODE_FIELD)) +
182 "(fd=" + submissionQueueArray.getInt(sqe + SQE_FD_FIELD) + ')');
183 }
184 }
185 return sb.toString();
186 }
187
188 private static int sqeIndex(int tail, int ringMask) {
189 return (tail & ringMask) * SQE_SIZE;
190 }
191
192 long addNop(byte flags, long udata) {
193
194
195 return enqueueSqe(Native.IORING_OP_NOP, flags, (short) 0, -1, 0, 0, 0, 0, udata,
196 (short) 0, (short) 0, 0, 0);
197 }
198
199 long addTimeout(long timeoutMemoryAddress, long udata) {
200
201
202 return enqueueSqe(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
203 0, udata, (short) 0, (short) 0, 0, 0);
204 }
205
206 long addLinkTimeout(long timeoutMemoryAddress, long extraData) {
207
208
209 return enqueueSqe(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
210 0, extraData, (short) 0, (short) 0, 0, 0);
211 }
212
213 long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
214 return enqueueSqe(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
215 0, udata, (short) 0, (short) 0, 0, 0);
216 }
217
218
219
220 long addCancel(long sqeToCancel, long udata) {
221 return enqueueSqe(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, -1, 0, sqeToCancel, 0, 0,
222 udata, (short) 0, (short) 0, 0, 0);
223 }
224
225
226
227
228
229
230 int submit() {
231 checkClosed();
232 int submit = tail - head;
233 return submit > 0 ? submit(submit, 0, 0) : 0;
234 }
235
236
237
238
239
240
241
242 int submitAndGet() {
243 return submitAndGet0(1);
244 }
245
246
247
248
249
250
251 int submitAndGetNow() {
252 return submitAndGet0(0);
253 }
254
255 private int submitAndGet0(int minComplete) {
256 checkClosed();
257 int submit = tail - head;
258 if (submit > 0) {
259 return submit(submit, minComplete, Native.IORING_ENTER_GETEVENTS);
260 }
261 assert submit == 0;
262 int ret = ioUringEnter(0, minComplete, Native.IORING_ENTER_GETEVENTS);
263 if (ret < 0) {
264 throw new UncheckedIOException(Errors.newIOException("io_uring_enter", ret));
265 }
266 return ret;
267 }
268
269 private int submit(int toSubmit, int minComplete, int flags) {
270 INT_HANDLE.setRelease(kTail, 0, tail);
271 int ret = ioUringEnter(toSubmit, minComplete, flags);
272 head = (int) INT_HANDLE.getVolatile(kHead, 0);
273 if (ret != toSubmit) {
274 if (ret < 0) {
275 throw new UncheckedIOException(Errors.newIOException("io_uring_enter", ret));
276 }
277 }
278 return ret;
279 }
280
281 private int ioUringEnter(int toSubmit, int minComplete, int flags) {
282 int f = enterFlags | flags;
283
284 if (IoUring.isSetupSubmitAllSupported()) {
285 return ioUringEnter0(toSubmit, minComplete, f);
286 }
287
288
289 int submitted = 0;
290 for (;;) {
291 int ret = ioUringEnter0(toSubmit, minComplete, f);
292 if (ret < 0) {
293 return ret;
294 }
295 submitted += ret;
296 if (ret == toSubmit) {
297 return submitted;
298 }
299 if (logger.isTraceEnabled()) {
300
301 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
302 }
303 toSubmit -= ret;
304 }
305 }
306
307 private int ioUringEnter0(int toSubmit, int minComplete, int f) {
308 if (logger.isTraceEnabled()) {
309 logger.trace("io_uring_enter(ring={}, enterRing={}, toSubmit={}, minComplete={}, flags={}): {}",
310 ringFd, enterRingFd, toSubmit, minComplete, f, toString());
311 }
312 return Native.ioUringEnter(enterRingFd, toSubmit, minComplete, f);
313 }
314
315 public int count() {
316 return tail - head;
317 }
318
319 public int remaining() {
320 return ringEntries - count();
321 }
322 }