1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.channel.unix.Buffer;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.lang.invoke.MethodHandles;
23 import java.lang.invoke.VarHandle;
24 import java.nio.ByteBuffer;
25 import java.nio.ByteOrder;
26 import java.util.StringJoiner;
27
28 final class SubmissionQueue {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);

// Byte stride between consecutive submission queue entries (SQEs) inside submissionQueueArray;
// see sqeIndex(...), which multiplies the masked ring index by this value.
static final int SQE_SIZE = 64;

// Byte offsets of the individual fields inside one SQE. enqueueSqe(...) writes each of its
// arguments at "entry base + offset" using these constants.
// NOTE(review): presumably mirrors the kernel's struct io_uring_sqe layout - confirm against the uapi header.
private static final int SQE_OP_CODE_FIELD = 0;
private static final int SQE_FLAGS_FIELD = 1;
private static final int SQE_IOPRIO_FIELD = 2;
private static final int SQE_FD_FIELD = 4;
private static final int SQE_UNION1_FIELD = 8;
private static final int SQE_UNION2_FIELD = 16;
private static final int SQE_LEN_FIELD = 24;
private static final int SQE_UNION3_FIELD = 28;
private static final int SQE_USER_DATA_FIELD = 32;
private static final int SQE_UNION4_FIELD = 40;
private static final int SQE_PERSONALITY_FIELD = 42;
private static final int SQE_UNION5_FIELD = 44;
private static final int SQE_UNION6_FIELD = 48;

// Accesses a ByteBuffer as native-order ints; used for the volatile reads of kHead / kTail
// and the release write of kTail.
private static final VarHandle INT_HANDLE =
        MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
// Buffer whose int at offset 0 is the ring's head index; re-read after every submit.
private final ByteBuffer kHead;
// Buffer whose int at offset 0 is the ring's tail index; written with the local tail before entering the ring.
private final ByteBuffer kTail;
// Backing storage of the SQEs, indexed by (ring index & ringMask) * SQE_SIZE.
private final ByteBuffer submissionQueueArray;

// Number of entries the ring can hold; the queue is treated as full when tail - head equals this value.
final int ringEntries;
// Mask applied to head / tail values to obtain a slot index.
// NOTE(review): presumably ringEntries - 1 with ringEntries a power of two - confirm with the caller.
private final int ringMask;

// Stored as passed to the constructor; not read by any method in this class.
final int ringSize;
final long ringAddress;
// File descriptor of the ring as passed to the constructor.
final int ringFd;
// File descriptor handed to Native.ioUringEnter(...). Starts out equal to ringFd and may be replaced
// by tryRegisterRingFd().
int enterRingFd;
// Flags OR'ed into every ioUringEnter call; set by tryRegisterRingFd().
private int enterFlags;
// Locally cached head index, refreshed from kHead after each submit.
private int head;
// Locally maintained tail index, advanced by enqueueSqe(...) and published to kTail on submit.
private int tail;

// Set by close(); once true, checkClosed() rejects further use.
private boolean closed;
69
/**
 * Creates a submission queue view over an already set-up ring.
 *
 * @param khead                 buffer whose first int holds the ring's head index
 * @param ktail                 buffer whose first int holds the ring's tail index
 * @param ringMask              mask applied to head / tail to obtain a slot index
 * @param ringEntries           number of entries the ring can hold
 * @param submissionQueueArray  buffer that stores the submission queue entries
 * @param ringSize              stored as-is in {@link #ringSize}
 * @param ringAddress           stored as-is in {@link #ringAddress}
 * @param ringFd                file descriptor of the ring; also the initial {@link #enterRingFd}
 */
SubmissionQueue(ByteBuffer khead, ByteBuffer ktail, int ringMask, int ringEntries, ByteBuffer submissionQueueArray,
        int ringSize, long ringAddress,
        int ringFd) {
    // Ring geometry.
    this.ringEntries = ringEntries;
    this.ringMask = ringMask;
    this.ringSize = ringSize;
    this.ringAddress = ringAddress;

    // Until a ring fd gets registered, io_uring_enter is issued against the ring fd itself.
    this.ringFd = ringFd;
    this.enterRingFd = ringFd;

    // Backing buffers.
    this.submissionQueueArray = submissionQueueArray;
    this.kHead = khead;
    this.kTail = ktail;

    // Seed the local indices from the current ring state (head first, then tail).
    this.head = (int) INT_HANDLE.getVolatile(this.kHead, 0);
    this.tail = (int) INT_HANDLE.getVolatile(this.kTail, 0);
}
85
86 long submissionQueueArrayAddress() {
87 return Buffer.memoryAddress(submissionQueueArray);
88 }
89
90 void close() {
91 closed = true;
92 }
93
94 private void checkClosed() {
95 if (closed) {
96 throw new IllegalStateException();
97 }
98 }
99
100 void tryRegisterRingFd() {
101 checkClosed();
102
103
104 int enterRingFd = Native.ioUringRegisterRingFds(ringFd);
105 final int enterFlags;
106 if (enterRingFd < 0) {
107
108 enterRingFd = ringFd;
109 enterFlags = 0;
110 } else {
111 enterFlags = Native.IORING_ENTER_REGISTERED_RING;
112 }
113 this.enterRingFd = enterRingFd;
114 this.enterFlags = enterFlags;
115 }
116
117 long enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
118 int union3, long udata, short union4, short personality, int union5, long union6) {
119 checkClosed();
120 int pending = tail - head;
121 if (pending == ringEntries) {
122 int submitted = submit();
123 if (submitted == 0) {
124
125 throw new RuntimeException("SQ ring full and no submissions accepted");
126 }
127 }
128 int sqe = sqeIndex(tail++, ringMask);
129
130
131 submissionQueueArray.put(sqe + SQE_OP_CODE_FIELD, opcode);
132 submissionQueueArray.put(sqe + SQE_FLAGS_FIELD, flags);
133
134 submissionQueueArray.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
135 submissionQueueArray.putInt(sqe + SQE_FD_FIELD, fd);
136 submissionQueueArray.putLong(sqe + SQE_UNION1_FIELD, union1);
137 submissionQueueArray.putLong(sqe + SQE_UNION2_FIELD, union2);
138 submissionQueueArray.putInt(sqe + SQE_LEN_FIELD, len);
139 submissionQueueArray.putInt(sqe + SQE_UNION3_FIELD, union3);
140 submissionQueueArray.putLong(sqe + SQE_USER_DATA_FIELD, udata);
141 submissionQueueArray.putShort(sqe + SQE_UNION4_FIELD, union4);
142 submissionQueueArray.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
143 submissionQueueArray.putInt(sqe + SQE_UNION5_FIELD, union5);
144 submissionQueueArray.putLong(sqe + SQE_UNION6_FIELD, union6);
145
146 if (logger.isTraceEnabled()) {
147 if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
148 logger.trace("add(ring={}, enterRing:{} ): {}(fd={}, len={}, off={}, data={})",
149 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
150 } else {
151 logger.trace("add(ring={}, enterRing:{}): {}(fd={}, len={}, off={}, data={})",
152 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
153 }
154 }
155 return udata;
156 }
157
158 @Override
159 public String toString() {
160 StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
161 if (closed) {
162 sb.add("closed");
163 } else {
164 int pending = tail - head;
165 int idx = tail;
166 for (int i = 0; i < pending; i++) {
167 int sqe = sqeIndex(idx++, ringMask);
168 sb.add(Native.opToStr(submissionQueueArray.get(sqe + SQE_OP_CODE_FIELD)) +
169 "(fd=" + submissionQueueArray.getInt(sqe + SQE_FD_FIELD) + ')');
170 }
171 }
172 return sb.toString();
173 }
174
175 private static int sqeIndex(int tail, int ringMask) {
176 return (tail & ringMask) * SQE_SIZE;
177 }
178
179 long addNop(byte flags, long udata) {
180
181
182 return enqueueSqe(Native.IORING_OP_NOP, flags, (short) 0, -1, 0, 0, 0, 0, udata,
183 (short) 0, (short) 0, 0, 0);
184 }
185
186 long addTimeout(long timeoutMemoryAddress, long udata) {
187
188
189 return enqueueSqe(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
190 0, udata, (short) 0, (short) 0, 0, 0);
191 }
192
193 long addLinkTimeout(long timeoutMemoryAddress, long extraData) {
194
195
196 return enqueueSqe(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
197 0, extraData, (short) 0, (short) 0, 0, 0);
198 }
199
200 long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
201 return enqueueSqe(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
202 0, udata, (short) 0, (short) 0, 0, 0);
203 }
204
205
206
207 long addCancel(long sqeToCancel, long udata) {
208 return enqueueSqe(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, -1, 0, sqeToCancel, 0, 0,
209 udata, (short) 0, (short) 0, 0, 0);
210 }
211
212 int submit() {
213 checkClosed();
214 int submit = tail - head;
215 return submit > 0 ? submit(submit, 0, 0) : 0;
216 }
217
218 int submitAndWait() {
219 checkClosed();
220 int submit = tail - head;
221 if (submit > 0) {
222 return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
223 }
224 assert submit == 0;
225 int ret = ioUringEnter(0, 1, Native.IORING_ENTER_GETEVENTS);
226 if (ret < 0) {
227 throw new RuntimeException("ioUringEnter syscall returned " + ret);
228 }
229 return ret;
230 }
231
232 private int submit(int toSubmit, int minComplete, int flags) {
233 INT_HANDLE.setRelease(kTail, 0, tail);
234 int ret = ioUringEnter(toSubmit, minComplete, flags);
235 head = (int) INT_HANDLE.getVolatile(kHead, 0);
236 if (ret != toSubmit) {
237 if (ret < 0) {
238 throw new RuntimeException("ioUringEnter syscall returned " + ret);
239 }
240 }
241 return ret;
242 }
243
244 private int ioUringEnter(int toSubmit, int minComplete, int flags) {
245 int f = enterFlags | flags;
246
247 if (IoUring.isSetupSubmitAllSupported()) {
248 return ioUringEnter0(toSubmit, minComplete, f);
249 }
250
251
252 int submitted = 0;
253 for (;;) {
254 int ret = ioUringEnter0(toSubmit, minComplete, f);
255 if (ret < 0) {
256 return ret;
257 }
258 submitted += ret;
259 if (ret == toSubmit) {
260 return submitted;
261 }
262 if (logger.isTraceEnabled()) {
263
264 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
265 }
266 toSubmit -= ret;
267 }
268 }
269
270 private int ioUringEnter0(int toSubmit, int minComplete, int f) {
271 if (logger.isTraceEnabled()) {
272 logger.trace("io_uring_enter(ring={}, enterRing={}, toSubmit={}, minComplete={}, flags={}): {}",
273 ringFd, enterRingFd, toSubmit, minComplete, f, toString());
274 }
275 return Native.ioUringEnter(enterRingFd, toSubmit, minComplete, f);
276 }
277
278 public int count() {
279 return tail - head;
280 }
281
282 public int remaining() {
283 return ringEntries - count();
284 }
285 }