1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.StringJoiner;
23
24 final class SubmissionQueue {
25 private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
26
27 private static final long SQE_SIZE = 64;
28 private static final int INT_SIZE = Integer.BYTES;
29
30
31
32 private static final int SQE_OP_CODE_FIELD = 0;
33 private static final int SQE_FLAGS_FIELD = 1;
34 private static final int SQE_IOPRIO_FIELD = 2;
35 private static final int SQE_FD_FIELD = 4;
36 private static final int SQE_UNION1_FIELD = 8;
37 private static final int SQE_UNION2_FIELD = 16;
38 private static final int SQE_LEN_FIELD = 24;
39 private static final int SQE_UNION3_FIELD = 28;
40 private static final int SQE_USER_DATA_FIELD = 32;
41 private static final int SQE_UNION4_FIELD = 40;
42 private static final int SQE_PERSONALITY_FIELD = 42;
43 private static final int SQE_UNION5_FIELD = 44;
44 private static final int SQE_UNION6_FIELD = 48;
45
46
47 private final long kHeadAddress;
48 private final long kTailAddress;
49 private final long kFlagsAddress;
50 private final long kDroppedAddress;
51 private final long kArrayAddress;
52 final long submissionQueueArrayAddress;
53
54 final int ringEntries;
55 private final int ringMask;
56
57 final int ringSize;
58 final long ringAddress;
59 final int ringFd;
60 int enterRingFd;
61 private int enterFlags;
62 private int head;
63 private int tail;
64
65 SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
66 long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
67 long submissionQueueArrayAddress, int ringSize, long ringAddress,
68 int ringFd) {
69 this.kHeadAddress = kHeadAddress;
70 this.kTailAddress = kTailAddress;
71 this.kFlagsAddress = kFlagsAddress;
72 this.kDroppedAddress = kDroppedAddress;
73 this.kArrayAddress = kArrayAddress;
74 this.submissionQueueArrayAddress = submissionQueueArrayAddress;
75 this.ringSize = ringSize;
76 this.ringAddress = ringAddress;
77 this.ringFd = ringFd;
78 this.enterRingFd = ringFd;
79 this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
80 this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
81 this.head = PlatformDependent.getIntVolatile(kHeadAddress);
82 this.tail = PlatformDependent.getIntVolatile(kTailAddress);
83
84
85 PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
86
87
88 long address = kArrayAddress;
89 for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
90 PlatformDependent.putInt(address, i);
91 }
92 }
93
94 void tryRegisterRingFd() {
95
96
97 int enterRingFd = Native.ioUringRegisterRingFds(ringFd);
98 final int enterFlags;
99 if (enterRingFd < 0) {
100
101 enterRingFd = ringFd;
102 enterFlags = 0;
103 } else {
104 enterFlags = Native.IORING_ENTER_REGISTERED_RING;
105 }
106 this.enterRingFd = enterRingFd;
107 this.enterFlags = enterFlags;
108 }
109
110 private long enqueueSqe0(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
111 int union3, long udata, short union4, short personality, int union5, long union6) {
112 int pending = tail - head;
113 if (pending == ringEntries) {
114 int submitted = submit();
115 if (submitted == 0) {
116
117 throw new RuntimeException("SQ ring full and no submissions accepted");
118 }
119 }
120 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
121 setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
122 union3, udata, union4, personality, union5, union6);
123 return udata;
124 }
125
126 void enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len, int union3,
127 long udata, short union4, short personality, int union5, long union6) {
128 int pending = tail - head;
129 if (pending == ringEntries) {
130 int submitted = submit();
131 if (submitted == 0) {
132
133 throw new RuntimeException("SQ ring full and no submissions accepted");
134 }
135 }
136 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
137 setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
138 union3, udata, union4, personality, union5, union6);
139 }
140
141 private void setData(long sqe, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
142 int union3, long udata, short union4, short personality, int union5, long union6) {
143
144
145 PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, opcode);
146 PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, flags);
147
148 PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
149 PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
150 PlatformDependent.putLong(sqe + SQE_UNION1_FIELD, union1);
151 PlatformDependent.putLong(sqe + SQE_UNION2_FIELD, union2);
152 PlatformDependent.putInt(sqe + SQE_LEN_FIELD, len);
153 PlatformDependent.putInt(sqe + SQE_UNION3_FIELD, union3);
154 PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
155 PlatformDependent.putShort(sqe + SQE_UNION4_FIELD, union4);
156 PlatformDependent.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
157 PlatformDependent.putInt(sqe + SQE_UNION5_FIELD, union5);
158 PlatformDependent.putLong(sqe + SQE_UNION6_FIELD, union6);
159
160 if (logger.isTraceEnabled()) {
161 if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
162 logger.trace("add(ring={}, enterRing:{} ): {}(fd={}, len={} ({} bytes), off={}, data={})",
163 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, Iov.sumSize(union2, len), union1, udata);
164 } else {
165 logger.trace("add(ring={}, enterRing:{}): {}(fd={}, len={}, off={}, data={})",
166 ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
167 }
168 }
169 }
170
171 @Override
172 public String toString() {
173 StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
174 int pending = tail - head;
175 for (int i = 0; i < pending; i++) {
176 long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
177 sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
178 "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
179 }
180 return sb.toString();
181 }
182
183 long addNop(byte flags, long udata) {
184
185
186 return enqueueSqe0(Native.IORING_OP_NOP, flags, (short) 0, -1, 0, 0, 0, 0, udata,
187 (short) 0, (short) 0, 0, 0);
188 }
189
190 long addTimeout(long timeoutMemoryAddress, long udata) {
191
192
193 return enqueueSqe0(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
194 0, udata, (short) 0, (short) 0, 0, 0);
195 }
196
197 long addLinkTimeout(long timeoutMemoryAddress, long extraData) {
198
199
200 return enqueueSqe0(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
201 0, extraData, (short) 0, (short) 0, 0, 0);
202 }
203
204 long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
205 return enqueueSqe0(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
206 0, udata, (short) 0, (short) 0, 0, 0);
207 }
208
209
210
211 long addCancel(long sqeToCancel, long udata) {
212 return enqueueSqe0(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, -1, 0, sqeToCancel, 0, 0,
213 udata, (short) 0, (short) 0, 0, 0);
214 }
215
216 int submit() {
217 int submit = tail - head;
218 return submit > 0 ? submit(submit, 0, 0) : 0;
219 }
220
221 int submitAndWait() {
222 int submit = tail - head;
223 if (submit > 0) {
224 return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
225 }
226 assert submit == 0;
227 int ret = ioUringEnter(0, 1, Native.IORING_ENTER_GETEVENTS);
228 if (ret < 0) {
229 throw new RuntimeException("ioUringEnter syscall returned " + ret);
230 }
231 return ret;
232 }
233
234 private int submit(int toSubmit, int minComplete, int flags) {
235 PlatformDependent.putIntOrdered(kTailAddress, tail);
236 int ret = ioUringEnter(toSubmit, minComplete, flags);
237 head = PlatformDependent.getIntVolatile(kHeadAddress);
238 if (ret != toSubmit) {
239 if (ret < 0) {
240 throw new RuntimeException("ioUringEnter syscall returned " + ret);
241 }
242 }
243 return ret;
244 }
245
246 private int ioUringEnter(int toSubmit, int minComplete, int flags) {
247 int f = enterFlags | flags;
248
249 if (IoUring.isSetupSubmitAllSupported()) {
250 return ioUringEnter0(toSubmit, minComplete, f);
251 }
252
253
254 int submitted = 0;
255 for (;;) {
256 int ret = ioUringEnter0(toSubmit, minComplete, f);
257 if (ret < 0) {
258 return ret;
259 }
260 submitted += ret;
261 if (ret == toSubmit) {
262 return submitted;
263 }
264 if (logger.isTraceEnabled()) {
265
266 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
267 }
268 toSubmit -= ret;
269 }
270 }
271
272 private int ioUringEnter0(int toSubmit, int minComplete, int f) {
273 if (logger.isTraceEnabled()) {
274 logger.trace("io_uring_enter(ring={}, enterRing={}, toSubmit={}, minComplete={}, flags={}): {}",
275 ringFd, enterRingFd, toSubmit, minComplete, f, toString());
276 }
277 return Native.ioUringEnter(enterRingFd, toSubmit, minComplete, f);
278 }
279
280 public int count() {
281 return tail - head;
282 }
283
284 public int remaining() {
285 return ringEntries - count();
286 }
287 }