1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.StringJoiner;
23 import java.util.function.IntSupplier;
24
25 import static java.lang.Math.max;
26 import static java.lang.Math.min;
27
28 final class SubmissionQueue {
29 private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
30
31 private static final long SQE_SIZE = 64;
32 private static final int INT_SIZE = Integer.BYTES;
33 private static final int KERNEL_TIMESPEC_SIZE = 16;
34
35
36
37 private static final int SQE_OP_CODE_FIELD = 0;
38 private static final int SQE_FLAGS_FIELD = 1;
39 private static final int SQE_IOPRIO_FIELD = 2;
40 private static final int SQE_FD_FIELD = 4;
41 private static final int SQE_UNION1_FIELD = 8;
42 private static final int SQE_UNION2_FIELD = 16;
43 private static final int SQE_LEN_FIELD = 24;
44 private static final int SQE_UNION3_FIELD = 28;
45 private static final int SQE_USER_DATA_FIELD = 32;
46 private static final int SQE_UNION4_FIELD = 40;
47 private static final int SQE_PERSONALITY_FIELD = 42;
48 private static final int SQE_UNION5_FIELD = 44;
49 private static final int SQE_UNION6_FIELD = 48;
50
51 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
52 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
53
54
55 private final long kHeadAddress;
56 private final long kTailAddress;
57 private final long kFlagsAddress;
58 private final long kDroppedAddress;
59 private final long kArrayAddress;
60 final long submissionQueueArrayAddress;
61
62 final int ringEntries;
63 private final int ringMask;
64
65 final int ringSize;
66 final long ringAddress;
67 final int ringFd;
68 private final long timeoutMemoryAddress;
69 private final IntSupplier completionCount;
70 private int numHandledFds;
71 private int head;
72 private int tail;
73
74 SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
75 long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
76 long submissionQueueArrayAddress, int ringSize, long ringAddress, int ringFd,
77 IntSupplier completionCount) {
78 this.kHeadAddress = kHeadAddress;
79 this.kTailAddress = kTailAddress;
80 this.kFlagsAddress = kFlagsAddress;
81 this.kDroppedAddress = kDroppedAddress;
82 this.kArrayAddress = kArrayAddress;
83 this.submissionQueueArrayAddress = submissionQueueArrayAddress;
84 this.ringSize = ringSize;
85 this.ringAddress = ringAddress;
86 this.ringFd = ringFd;
87 this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
88 this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
89 this.head = PlatformDependent.getIntVolatile(kHeadAddress);
90 this.tail = PlatformDependent.getIntVolatile(kTailAddress);
91
92 this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
93 this.completionCount = completionCount;
94
95
96 PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
97
98
99 long address = kArrayAddress;
100 for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
101 PlatformDependent.putInt(address, i);
102 }
103 }
104
105 void incrementHandledFds() {
106 numHandledFds++;
107 }
108
109 void decrementHandledFds() {
110 numHandledFds--;
111 assert numHandledFds >= 0;
112 }
113
114 private long enqueueSqe0(int id, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
115 int union3, short data, short union4, short personality, int union5, long union6) {
116 int pending = tail - head;
117 if (pending == ringEntries) {
118 int submitted = submit();
119 if (submitted == 0) {
120
121 throw new RuntimeException("SQ ring full and no submissions accepted");
122 }
123 }
124 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
125 long udata = UserData.encode(id, opcode, data);
126 setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
127 union3, udata, union4, personality, union5, union6);
128 return udata;
129 }
130
131 void enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len, int union3,
132 long udata, short union4, short personality, int union5, long union6) {
133 int pending = tail - head;
134 if (pending == ringEntries) {
135 int submitted = submit();
136 if (submitted == 0) {
137
138 throw new RuntimeException("SQ ring full and no submissions accepted");
139 }
140 }
141 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
142 setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
143 union3, udata, union4, personality, union5, union6);
144 }
145
146 private void setData(long sqe, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
147 int union3, long udata, short union4, short personality, int union5, long union6) {
148
149
150 PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, opcode);
151 PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, flags);
152
153 PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
154 PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
155 PlatformDependent.putLong(sqe + SQE_UNION1_FIELD, union1);
156 PlatformDependent.putLong(sqe + SQE_UNION2_FIELD, union2);
157 PlatformDependent.putInt(sqe + SQE_LEN_FIELD, len);
158 PlatformDependent.putInt(sqe + SQE_UNION3_FIELD, union3);
159 PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
160 PlatformDependent.putShort(sqe + SQE_UNION4_FIELD, union4);
161 PlatformDependent.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
162 PlatformDependent.putInt(sqe + SQE_UNION5_FIELD, union5);
163 PlatformDependent.putLong(sqe + SQE_UNION6_FIELD, union6);
164
165 if (logger.isTraceEnabled()) {
166 if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
167 logger.trace("add(ring {}): {}(fd={}, len={} ({} bytes), off={}, data={})",
168 ringFd, Native.opToStr(opcode), fd, len, Iov.sumSize(union2, len), union1, udata);
169 } else {
170 logger.trace("add(ring {}): {}(fd={}, len={}, off={}, data={})",
171 ringFd, Native.opToStr(opcode), fd, len, union1, udata);
172 }
173 }
174 }
175
176 @Override
177 public String toString() {
178 StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
179 int pending = tail - head;
180 for (int i = 0; i < pending; i++) {
181 long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
182 sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
183 "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
184 }
185 return sb.toString();
186 }
187
188 long addNop(int fd, byte flags, int id, short data) {
189 return enqueueSqe0(id, Native.IORING_OP_NOP, flags, (short) 0, fd, 0, 0, 0, 0, data,
190 (short) 0, (short) 0, 0, 0);
191 }
192
193 long addTimeout(int fd, long nanoSeconds, int id, short extraData) {
194 setTimeout(nanoSeconds);
195 return enqueueSqe0(id, Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
196 0, extraData, (short) 0, (short) 0, 0, 0);
197 }
198
199 long addLinkTimeout(int fd, long nanoSeconds, int id, short extraData) {
200 setTimeout(nanoSeconds);
201 return enqueueSqe0(id, Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
202 0, extraData, (short) 0, (short) 0, 0, 0);
203 }
204
205 long addEventFdRead(int fd, long bufferAddress, int pos, int limit, int id, short extraData) {
206 return enqueueSqe0(id, Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
207 0, extraData, (short) 0, (short) 0, 0, 0);
208 }
209
210 long addCancel(int fd, long sqeToCancel, int id) {
211 return enqueueSqe0(id, Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, fd, 0, sqeToCancel, 0, 0,
212 (short) 0, (short) 0, (short) 0, 0, 0);
213 }
214
215 int submit() {
216 int submit = tail - head;
217 return submit > 0 ? submit(submit, 0, 0) : 0;
218 }
219
220 int submitAndWait() {
221 int submit = tail - head;
222 if (submit > 0) {
223 return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
224 }
225 assert submit == 0;
226 int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
227 if (ret < 0) {
228 throw new RuntimeException("ioUringEnter syscall returned " + ret);
229 }
230 return ret;
231 }
232
233 private int submit(int toSubmit, int minComplete, int flags) {
234 if (logger.isTraceEnabled()) {
235 logger.trace("submit(ring {}): {}", ringFd, toString());
236 }
237 PlatformDependent.putIntOrdered(kTailAddress, tail);
238 int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
239 head = PlatformDependent.getIntVolatile(kHeadAddress);
240 if (ret != toSubmit) {
241 if (ret < 0) {
242 throw new RuntimeException("ioUringEnter syscall returned " + ret);
243 }
244 logger.warn("Not all submissions succeeded. Only {} of {} SQEs were submitted, " +
245 "while there are {} pending completions.", ret, toSubmit, completionCount.getAsInt());
246 }
247 return ret;
248 }
249
250 private void setTimeout(long timeoutNanoSeconds) {
251 long seconds, nanoSeconds;
252
253 if (timeoutNanoSeconds == 0) {
254 seconds = 0;
255 nanoSeconds = 0;
256 } else {
257 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
258 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
259 }
260
261 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
262 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
263 }
264
265 public int count() {
266 return tail - head;
267 }
268
269 public int remaining() {
270 return ringEntries - count();
271 }
272
273
274 public void release() {
275 PlatformDependent.freeMemory(timeoutMemoryAddress);
276 }
277 }