1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.StringJoiner;
23 import java.util.function.IntSupplier;
24
25 import static java.lang.Math.max;
26 import static java.lang.Math.min;
27
28 final class SubmissionQueue {
29 private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
30
31 private static final long SQE_SIZE = 64;
32 private static final int INT_SIZE = Integer.BYTES;
33 private static final int KERNEL_TIMESPEC_SIZE = 16;
34
35
36
37 private static final int SQE_OP_CODE_FIELD = 0;
38 private static final int SQE_FLAGS_FIELD = 1;
39 private static final int SQE_IOPRIO_FIELD = 2;
40 private static final int SQE_FD_FIELD = 4;
41 private static final int SQE_OFFSET_FIELD = 8;
42 private static final int SQE_ADDRESS_FIELD = 16;
43 private static final int SQE_LEN_FIELD = 24;
44 private static final int SQE_RW_FLAGS_FIELD = 28;
45 private static final int SQE_USER_DATA_FIELD = 32;
46 private static final int SQE_PAD_FIELD = 40;
47
48 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
49 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
50
51
52 private final long kHeadAddress;
53 private final long kTailAddress;
54 private final long kFlagsAddress;
55 private final long kDroppedAddress;
56 private final long kArrayAddress;
57 final long submissionQueueArrayAddress;
58
59 final int ringEntries;
60 private final int ringMask;
61
62 final int ringSize;
63 final long ringAddress;
64 final int ringFd;
65 private final long timeoutMemoryAddress;
66 private final IntSupplier completionCount;
67 private int numHandledFds;
68 private int head;
69 private int tail;
70
71 SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
72 long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
73 long submissionQueueArrayAddress, int ringSize, long ringAddress, int ringFd,
74 IntSupplier completionCount) {
75 this.kHeadAddress = kHeadAddress;
76 this.kTailAddress = kTailAddress;
77 this.kFlagsAddress = kFlagsAddress;
78 this.kDroppedAddress = kDroppedAddress;
79 this.kArrayAddress = kArrayAddress;
80 this.submissionQueueArrayAddress = submissionQueueArrayAddress;
81 this.ringSize = ringSize;
82 this.ringAddress = ringAddress;
83 this.ringFd = ringFd;
84 this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
85 this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
86 this.head = PlatformDependent.getIntVolatile(kHeadAddress);
87 this.tail = PlatformDependent.getIntVolatile(kTailAddress);
88
89 this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
90 this.completionCount = completionCount;
91
92
93 PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
94
95
96 long address = kArrayAddress;
97 for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
98 PlatformDependent.putInt(address, i);
99 }
100 }
101
102 void incrementHandledFds() {
103 numHandledFds++;
104 }
105
106 void decrementHandledFds() {
107 numHandledFds--;
108 assert numHandledFds >= 0;
109 }
110
111 long enqueueSqe(byte op, int flags, short ioPrio, int rwFlags, int fd,
112 long bufferAddress, int length, long offset, int id, short data) {
113 int pending = tail - head;
114 if (pending == ringEntries) {
115 int submitted = submit();
116 if (submitted == 0) {
117
118 throw new RuntimeException("SQ ring full and no submissions accepted");
119 }
120 }
121 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
122 long udata = UserData.encode(id, op, data);
123 setData(sqe, op, flags, ioPrio, rwFlags, fd,
124 bufferAddress, length, offset, udata);
125 return udata;
126 }
127
128 void enqueueSqe(byte op, int flags, short ioPrio, int rwFlags, int fd,
129 long bufferAddress, int length, long offset, long udata) {
130 int pending = tail - head;
131 if (pending == ringEntries) {
132 int submitted = submit();
133 if (submitted == 0) {
134
135 throw new RuntimeException("SQ ring full and no submissions accepted");
136 }
137 }
138 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
139 setData(sqe, op, flags, ioPrio, rwFlags, fd, bufferAddress, length, offset, udata);
140 }
141
142 private void setData(long sqe, byte op, int flags, short ioPrio, int rwFlags, int fd, long bufferAddress,
143 int length, long offset, long udata) {
144
145
146 PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op);
147 PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) flags);
148
149 PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
150 PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
151 PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset);
152 PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress);
153 PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
154 PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, rwFlags);
155 PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
156
157 if (logger.isTraceEnabled()) {
158 if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_READV) {
159 logger.trace("add(ring {}): {}(fd={}, len={} ({} bytes), off={}, data={})",
160 ringFd, Native.opToStr(op), fd, length, Iov.sumSize(bufferAddress, length), offset, udata);
161 } else {
162 logger.trace("add(ring {}): {}(fd={}, len={}, off={}, data={})",
163 ringFd, Native.opToStr(op), fd, length, offset, udata);
164 }
165 }
166 }
167
168 @Override
169 public String toString() {
170 StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
171 int pending = tail - head;
172 for (int i = 0; i < pending; i++) {
173 long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
174 sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
175 "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
176 }
177 return sb.toString();
178 }
179
180 long addNop(int fd, int flags, int id, short data) {
181 return enqueueSqe(Native.IORING_OP_NOP, flags, (short) 0, 0, fd, 0, 0, 0, id, data);
182 }
183
184 long addTimeout(int fd, long nanoSeconds, int id, short extraData) {
185 setTimeout(nanoSeconds);
186 return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, (short) 0, 0, fd,
187 timeoutMemoryAddress, 1, 0, id, extraData);
188 }
189
190 long addLinkTimeout(int fd, long nanoSeconds, int id, short extraData) {
191 setTimeout(nanoSeconds);
192 return enqueueSqe(Native.IORING_OP_LINK_TIMEOUT, 0, (short) 0, 0, fd,
193 timeoutMemoryAddress, 1, 0, id, extraData);
194 }
195
196 long addEventFdRead(int fd, long bufferAddress, int pos, int limit, int id, short extraData) {
197 return enqueueSqe(Native.IORING_OP_READ, 0, (short) 0, 0, fd,
198 bufferAddress + pos, limit - pos, 0, id, extraData);
199 }
200
201 long addCancel(int fd, long sqeToCancel, int id) {
202 return enqueueSqe(Native.IORING_OP_ASYNC_CANCEL, 0, (short) 0, 0, fd, sqeToCancel, 0, 0, id, (short) 0);
203 }
204
205 int submit() {
206 int submit = tail - head;
207 return submit > 0 ? submit(submit, 0, 0) : 0;
208 }
209
210 int submitAndWait() {
211 int submit = tail - head;
212 if (submit > 0) {
213 return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
214 }
215 assert submit == 0;
216 int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
217 if (ret < 0) {
218 throw new RuntimeException("ioUringEnter syscall returned " + ret);
219 }
220 return ret;
221 }
222
223 private int submit(int toSubmit, int minComplete, int flags) {
224 if (logger.isTraceEnabled()) {
225 logger.trace("submit(ring {}): {}", ringFd, toString());
226 }
227 PlatformDependent.putIntOrdered(kTailAddress, tail);
228 int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
229 head = PlatformDependent.getIntVolatile(kHeadAddress);
230 if (ret != toSubmit) {
231 if (ret < 0) {
232 throw new RuntimeException("ioUringEnter syscall returned " + ret);
233 }
234 logger.warn("Not all submissions succeeded. Only {} of {} SQEs were submitted, " +
235 "while there are {} pending completions.", ret, toSubmit, completionCount.getAsInt());
236 }
237 return ret;
238 }
239
240 private void setTimeout(long timeoutNanoSeconds) {
241 long seconds, nanoSeconds;
242
243 if (timeoutNanoSeconds == 0) {
244 seconds = 0;
245 nanoSeconds = 0;
246 } else {
247 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
248 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
249 }
250
251 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
252 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
253 }
254
255 public int count() {
256 return tail - head;
257 }
258
259 public int remaining() {
260 return ringEntries - count();
261 }
262
263
264 public void release() {
265 PlatformDependent.freeMemory(timeoutMemoryAddress);
266 }
267 }