1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.unix.Buffer;
19 import io.netty.channel.unix.Errors;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23 import java.io.UncheckedIOException;
24 import java.lang.invoke.MethodHandles;
25 import java.lang.invoke.VarHandle;
26 import java.nio.ByteBuffer;
27 import java.nio.ByteOrder;
28 import java.util.StringJoiner;
29
30 final class SubmissionQueue {
31 private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
32
33 static final int SQE_SIZE = 64;
34
35
36
37 private static final int SQE_OP_CODE_FIELD = 0;
38 private static final int SQE_FLAGS_FIELD = 1;
39 private static final int SQE_IOPRIO_FIELD = 2;
40 private static final int SQE_FD_FIELD = 4;
41 private static final int SQE_UNION1_FIELD = 8;
42 private static final int SQE_UNION2_FIELD = 16;
43 private static final int SQE_LEN_FIELD = 24;
44 private static final int SQE_UNION3_FIELD = 28;
45 private static final int SQE_USER_DATA_FIELD = 32;
46 private static final int SQE_UNION4_FIELD = 40;
47 private static final int SQE_PERSONALITY_FIELD = 42;
48 private static final int SQE_UNION5_FIELD = 44;
49 private static final int SQE_UNION6_FIELD = 48;
50
51
52
53 private static final VarHandle INT_HANDLE =
54 MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
55 private final ByteBuffer kHead;
56 private final ByteBuffer kTail;
57 private final ByteBuffer kflags;
58 private final ByteBuffer submissionQueueArray;
59
60 final int ringEntries;
61 private final int ringMask;
62
63 final int ringSize;
64 final long ringAddress;
65 final int ringFd;
66 int enterRingFd;
67 private int enterFlags;
68 private int head;
69 private int tail;
70
71 private boolean closed;
72
73 SubmissionQueue(ByteBuffer khead, ByteBuffer ktail, int ringMask, int ringEntries, ByteBuffer kflags,
74 ByteBuffer submissionQueueArray,
75 int ringSize, long ringAddress,
76 int ringFd) {
77 this.kHead = khead;
78 this.kTail = ktail;
79 this.submissionQueueArray = submissionQueueArray;
80 this.ringSize = ringSize;
81 this.ringAddress = ringAddress;
82 this.ringFd = ringFd;
83 this.enterRingFd = ringFd;
84 this.ringEntries = ringEntries;
85 this.kflags = kflags;
86 this.ringMask = ringMask;
87 this.head = (int) INT_HANDLE.getVolatile(khead, 0);
88 this.tail = (int) INT_HANDLE.getVolatile(ktail, 0);
89 }
90
91 int flags() {
92 if (closed) {
93 return 0;
94 }
95
96 return (int) INT_HANDLE.getOpaque(kflags, 0);
97 }
98
99 long submissionQueueArrayAddress() {
100 return Buffer.memoryAddress(submissionQueueArray);
101 }
102
103 void close() {
104 closed = true;
105 }
106
107 private void checkClosed() {
108 if (closed) {
109 throw new IllegalStateException();
110 }
111 }
112
113 void tryRegisterRingFd() {
114 checkClosed();
115
116
117 int enterRingFd = Native.ioUringRegisterRingFds(ringFd);
118 final int enterFlags;
119 if (enterRingFd < 0) {
120
121 enterRingFd = ringFd;
122 enterFlags = 0;
123 } else {
124 enterFlags = Native.IORING_ENTER_REGISTERED_RING;
125 }
126 this.enterRingFd = enterRingFd;
127 this.enterFlags = enterFlags;
128 }
129
130 long enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
131 int union3, long udata, short union4, short personality, int union5, long union6) {
132 checkClosed();
133 int pending = tail - head;
134 if (pending == ringEntries) {
135 int submitted = submit();
136 if (submitted == 0) {
137
138 throw new RuntimeException("SQ ring full and no submissions accepted");
139 }
140 }
141 int sqe = sqeIndex(tail++, ringMask);
142
143
144 submissionQueueArray.put(sqe + SQE_OP_CODE_FIELD, opcode);
145 submissionQueueArray.put(sqe + SQE_FLAGS_FIELD, flags);
146
147 submissionQueueArray.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
148 submissionQueueArray.putInt(sqe + SQE_FD_FIELD, fd);
149 submissionQueueArray.putLong(sqe + SQE_UNION1_FIELD, union1);
150 submissionQueueArray.putLong(sqe + SQE_UNION2_FIELD, union2);
151 submissionQueueArray.putInt(sqe + SQE_LEN_FIELD, len);
152 submissionQueueArray.putInt(sqe + SQE_UNION3_FIELD, union3);
153 submissionQueueArray.putLong(sqe + SQE_USER_DATA_FIELD, udata);
154 submissionQueueArray.putShort(sqe + SQE_UNION4_FIELD, union4);
155 submissionQueueArray.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
156 submissionQueueArray.putInt(sqe + SQE_UNION5_FIELD, union5);
157 submissionQueueArray.putLong(sqe + SQE_UNION6_FIELD, union6);
158
159 if (logger.isTraceEnabled()) {
160 if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
161 logger.trace("add(ring={}, enterRing:{} ): {}(fd={}, len={}, off={}, data={})",
162 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
163 } else {
164 logger.trace("add(ring={}, enterRing:{}): {}(fd={}, len={}, off={}, data={})",
165 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
166 }
167 }
168 return udata;
169 }
170
171 @Override
172 public String toString() {
173 StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
174 if (closed) {
175 sb.add("closed");
176 } else {
177 int pending = tail - head;
178 int idx = head;
179 for (int i = 0; i < pending; i++) {
180 int sqe = sqeIndex(idx++, ringMask);
181 sb.add(Native.opToStr(submissionQueueArray.get(sqe + SQE_OP_CODE_FIELD)) +
182 "(fd=" + submissionQueueArray.getInt(sqe + SQE_FD_FIELD) + ')');
183 }
184 }
185 return sb.toString();
186 }
187
188 private static int sqeIndex(int tail, int ringMask) {
189 return (tail & ringMask) * SQE_SIZE;
190 }
191
192 long addNop(byte flags, long udata) {
193 return addNop(flags, 0, udata);
194 }
195
196 long addNop(byte flags, int nopFlags, long udata) {
197
198
199 return enqueueSqe(Native.IORING_OP_NOP, flags, (short) 0, -1, 0, 0, 0, nopFlags, udata,
200 (short) 0, (short) 0, 0, 0);
201 }
202
203 long addTimeout(long timeoutMemoryAddress, long udata) {
204
205
206 return enqueueSqe(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
207 0, udata, (short) 0, (short) 0, 0, 0);
208 }
209
210 long addLinkTimeout(long timeoutMemoryAddress, long extraData) {
211
212
213 return enqueueSqe(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
214 0, extraData, (short) 0, (short) 0, 0, 0);
215 }
216
217 long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
218 return enqueueSqe(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
219 0, udata, (short) 0, (short) 0, 0, 0);
220 }
221
222
223
224 long addCancel(long sqeToCancel, long udata) {
225 return enqueueSqe(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, -1, 0, sqeToCancel, 0, 0,
226 udata, (short) 0, (short) 0, 0, 0);
227 }
228
229
230
231
232
233
234 int submit() {
235 checkClosed();
236 int submit = tail - head;
237 return submit > 0 ? submit(submit, 0, 0) : 0;
238 }
239
240
241
242
243
244
245
246 int submitAndGet() {
247 return submitAndGet0(1);
248 }
249
250
251
252
253
254
255 int submitAndGetNow() {
256 return submitAndGet0(0);
257 }
258
259 private int submitAndGet0(int minComplete) {
260 checkClosed();
261 int submit = tail - head;
262 if (submit > 0) {
263 return submit(submit, minComplete, Native.IORING_ENTER_GETEVENTS);
264 }
265 assert submit == 0;
266 int ret = ioUringEnter(0, minComplete, Native.IORING_ENTER_GETEVENTS);
267 if (ret < 0) {
268 throw new UncheckedIOException(Errors.newIOException("io_uring_enter", ret));
269 }
270 return ret;
271 }
272
273 private int submit(int toSubmit, int minComplete, int flags) {
274 INT_HANDLE.setRelease(kTail, 0, tail);
275 int ret = ioUringEnter(toSubmit, minComplete, flags);
276 head = (int) INT_HANDLE.getVolatile(kHead, 0);
277 if (ret != toSubmit) {
278 if (ret < 0) {
279 throw new UncheckedIOException(Errors.newIOException("io_uring_enter", ret));
280 }
281 }
282 return ret;
283 }
284
285 private int ioUringEnter(int toSubmit, int minComplete, int flags) {
286 int f = enterFlags | flags;
287
288 if (IoUring.isSetupSubmitAllSupported()) {
289 return ioUringEnter0(toSubmit, minComplete, f);
290 }
291
292
293 int submitted = 0;
294 for (;;) {
295 int ret = ioUringEnter0(toSubmit, minComplete, f);
296 if (ret < 0) {
297 return ret;
298 }
299 submitted += ret;
300 if (ret == toSubmit) {
301 return submitted;
302 }
303 if (logger.isTraceEnabled()) {
304
305 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
306 }
307 toSubmit -= ret;
308 }
309 }
310
311 private int ioUringEnter0(int toSubmit, int minComplete, int f) {
312 if (logger.isTraceEnabled()) {
313 logger.trace("io_uring_enter(ring={}, enterRing={}, toSubmit={}, minComplete={}, flags={}): {}",
314 ringFd, enterRingFd, toSubmit, minComplete, f, toString());
315 }
316 return Native.ioUringEnter(enterRingFd, toSubmit, minComplete, f);
317 }
318
319 public int count() {
320 return tail - head;
321 }
322
323 public int remaining() {
324 return ringEntries - count();
325 }
326 }