1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.util.internal.PlatformDependent;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.StringJoiner;
23
24 import static java.lang.Math.max;
25 import static java.lang.Math.min;
26
27 final class SubmissionQueue {
28 private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
29
30 private static final long SQE_SIZE = 64;
31 private static final int INT_SIZE = Integer.BYTES;
32 private static final int KERNEL_TIMESPEC_SIZE = 16;
33
34
35
36 private static final int SQE_OP_CODE_FIELD = 0;
37 private static final int SQE_FLAGS_FIELD = 1;
38 private static final int SQE_IOPRIO_FIELD = 2;
39 private static final int SQE_FD_FIELD = 4;
40 private static final int SQE_UNION1_FIELD = 8;
41 private static final int SQE_UNION2_FIELD = 16;
42 private static final int SQE_LEN_FIELD = 24;
43 private static final int SQE_UNION3_FIELD = 28;
44 private static final int SQE_USER_DATA_FIELD = 32;
45 private static final int SQE_UNION4_FIELD = 40;
46 private static final int SQE_PERSONALITY_FIELD = 42;
47 private static final int SQE_UNION5_FIELD = 44;
48 private static final int SQE_UNION6_FIELD = 48;
49
50 private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
51 private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
52
53
54 private final long kHeadAddress;
55 private final long kTailAddress;
56 private final long kFlagsAddress;
57 private final long kDroppedAddress;
58 private final long kArrayAddress;
59 final long submissionQueueArrayAddress;
60
61 final int ringEntries;
62 private final int ringMask;
63
64 final int ringSize;
65 final long ringAddress;
66 final int ringFd;
67 private final long timeoutMemoryAddress;
68 private int head;
69 private int tail;
70
71 SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
72 long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
73 long submissionQueueArrayAddress, int ringSize, long ringAddress, int ringFd) {
74 this.kHeadAddress = kHeadAddress;
75 this.kTailAddress = kTailAddress;
76 this.kFlagsAddress = kFlagsAddress;
77 this.kDroppedAddress = kDroppedAddress;
78 this.kArrayAddress = kArrayAddress;
79 this.submissionQueueArrayAddress = submissionQueueArrayAddress;
80 this.ringSize = ringSize;
81 this.ringAddress = ringAddress;
82 this.ringFd = ringFd;
83 this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
84 this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
85 this.head = PlatformDependent.getIntVolatile(kHeadAddress);
86 this.tail = PlatformDependent.getIntVolatile(kTailAddress);
87
88 this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
89
90
91 PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
92
93
94 long address = kArrayAddress;
95 for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
96 PlatformDependent.putInt(address, i);
97 }
98 }
99
100 private long enqueueSqe0(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
101 int union3, long udata, short union4, short personality, int union5, long union6) {
102 int pending = tail - head;
103 if (pending == ringEntries) {
104 int submitted = submit();
105 if (submitted == 0) {
106
107 throw new RuntimeException("SQ ring full and no submissions accepted");
108 }
109 }
110 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
111 setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
112 union3, udata, union4, personality, union5, union6);
113 return udata;
114 }
115
116 void enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len, int union3,
117 long udata, short union4, short personality, int union5, long union6) {
118 int pending = tail - head;
119 if (pending == ringEntries) {
120 int submitted = submit();
121 if (submitted == 0) {
122
123 throw new RuntimeException("SQ ring full and no submissions accepted");
124 }
125 }
126 long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
127 setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
128 union3, udata, union4, personality, union5, union6);
129 }
130
131 private void setData(long sqe, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
132 int union3, long udata, short union4, short personality, int union5, long union6) {
133
134
135 PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, opcode);
136 PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, flags);
137
138 PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
139 PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
140 PlatformDependent.putLong(sqe + SQE_UNION1_FIELD, union1);
141 PlatformDependent.putLong(sqe + SQE_UNION2_FIELD, union2);
142 PlatformDependent.putInt(sqe + SQE_LEN_FIELD, len);
143 PlatformDependent.putInt(sqe + SQE_UNION3_FIELD, union3);
144 PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
145 PlatformDependent.putShort(sqe + SQE_UNION4_FIELD, union4);
146 PlatformDependent.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
147 PlatformDependent.putInt(sqe + SQE_UNION5_FIELD, union5);
148 PlatformDependent.putLong(sqe + SQE_UNION6_FIELD, union6);
149
150 if (logger.isTraceEnabled()) {
151 if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
152 logger.trace("add(ring {}): {}(fd={}, len={} ({} bytes), off={}, data={})",
153 ringFd, Native.opToStr(opcode), fd, len, Iov.sumSize(union2, len), union1, udata);
154 } else {
155 logger.trace("add(ring {}): {}(fd={}, len={}, off={}, data={})",
156 ringFd, Native.opToStr(opcode), fd, len, union1, udata);
157 }
158 }
159 }
160
161 @Override
162 public String toString() {
163 StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
164 int pending = tail - head;
165 for (int i = 0; i < pending; i++) {
166 long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
167 sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
168 "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
169 }
170 return sb.toString();
171 }
172
173 long addNop(int fd, byte flags, long udata) {
174 return enqueueSqe0(Native.IORING_OP_NOP, flags, (short) 0, fd, 0, 0, 0, 0, udata,
175 (short) 0, (short) 0, 0, 0);
176 }
177
178 long addTimeout(int fd, long nanoSeconds, long udata) {
179 setTimeout(nanoSeconds);
180 return enqueueSqe0(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
181 0, udata, (short) 0, (short) 0, 0, 0);
182 }
183
184 long addLinkTimeout(int fd, long nanoSeconds, long extraData) {
185 setTimeout(nanoSeconds);
186 return enqueueSqe0(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
187 0, extraData, (short) 0, (short) 0, 0, 0);
188 }
189
190 long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
191 return enqueueSqe0(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
192 0, udata, (short) 0, (short) 0, 0, 0);
193 }
194
195 long addCancel(int fd, long sqeToCancel, long udata) {
196 return enqueueSqe0(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, fd, 0, sqeToCancel, 0, 0,
197 udata, (short) 0, (short) 0, 0, 0);
198 }
199
200 int submit() {
201 int submit = tail - head;
202 return submit > 0 ? submit(submit, 0, 0) : 0;
203 }
204
205 int submitAndWait() {
206 int submit = tail - head;
207 if (submit > 0) {
208 return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
209 }
210 assert submit == 0;
211 int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
212 if (ret < 0) {
213 throw new RuntimeException("ioUringEnter syscall returned " + ret);
214 }
215 return ret;
216 }
217
218 private int submit(int toSubmit, int minComplete, int flags) {
219 if (logger.isTraceEnabled()) {
220 logger.trace("submit(ring {}): {}", ringFd, toString());
221 }
222 PlatformDependent.putIntOrdered(kTailAddress, tail);
223 int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
224 head = PlatformDependent.getIntVolatile(kHeadAddress);
225 if (ret != toSubmit) {
226 if (ret < 0) {
227 throw new RuntimeException("ioUringEnter syscall returned " + ret);
228 }
229
230 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
231 }
232 return ret;
233 }
234
235 private void setTimeout(long timeoutNanoSeconds) {
236 long seconds, nanoSeconds;
237
238 if (timeoutNanoSeconds == 0) {
239 seconds = 0;
240 nanoSeconds = 0;
241 } else {
242 seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
243 nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
244 }
245
246 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
247 PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
248 }
249
250 public int count() {
251 return tail - head;
252 }
253
254 public int remaining() {
255 return ringEntries - count();
256 }
257
258
259 public void release() {
260 PlatformDependent.freeMemory(timeoutMemoryAddress);
261 }
262 }