View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.StringJoiner;
23  import java.util.function.IntSupplier;
24  
25  import static java.lang.Math.max;
26  import static java.lang.Math.min;
27  
28  final class SubmissionQueue {
29      private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
30  
31      private static final long SQE_SIZE = 64;
32      private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
33      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
34  
35      //these offsets are used to access specific properties
36      //SQE https://github.com/axboe/liburing/blob/liburing-2.6/src/include/liburing/io_uring.h#L30
37      private static final int SQE_OP_CODE_FIELD = 0;
38      private static final int SQE_FLAGS_FIELD = 1;
39      private static final int SQE_IOPRIO_FIELD = 2; // u16
40      private static final int SQE_FD_FIELD = 4; // s32
41      private static final int SQE_UNION1_FIELD = 8;
42      private static final int SQE_UNION2_FIELD = 16;
43      private static final int SQE_LEN_FIELD = 24;
44      private static final int SQE_UNION3_FIELD = 28;
45      private static final int SQE_USER_DATA_FIELD = 32;
46      private static final int SQE_UNION4_FIELD = 40;
47      private static final int SQE_PERSONALITY_FIELD = 42;
48      private static final int SQE_UNION5_FIELD = 44;
49      private static final int SQE_UNION6_FIELD = 48;
50  
51      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
52      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
53  
54      //these unsigned integer pointers(shared with the kernel) will be changed by the kernel
55      private final long kHeadAddress;
56      private final long kTailAddress;
57      private final long kFlagsAddress;
58      private final long kDroppedAddress;
59      private final long kArrayAddress;
60      final long submissionQueueArrayAddress;
61  
62      final int ringEntries;
63      private final int ringMask; // = ringEntries - 1
64  
65      final int ringSize;
66      final long ringAddress;
67      final int ringFd;
68      private final long timeoutMemoryAddress;
69      private final IntSupplier completionCount;
70      private int numHandledFds;
71      private int head;
72      private int tail;
73  
74      SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
75                      long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
76                      long submissionQueueArrayAddress, int ringSize, long ringAddress, int ringFd,
77                      IntSupplier completionCount) {
78          this.kHeadAddress = kHeadAddress;
79          this.kTailAddress = kTailAddress;
80          this.kFlagsAddress = kFlagsAddress;
81          this.kDroppedAddress = kDroppedAddress;
82          this.kArrayAddress = kArrayAddress;
83          this.submissionQueueArrayAddress = submissionQueueArrayAddress;
84          this.ringSize = ringSize;
85          this.ringAddress = ringAddress;
86          this.ringFd = ringFd;
87          this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
88          this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
89          this.head = PlatformDependent.getIntVolatile(kHeadAddress);
90          this.tail = PlatformDependent.getIntVolatile(kTailAddress);
91  
92          this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
93          this.completionCount = completionCount;
94  
95          // Zero the whole SQE array first
96          PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
97  
98          // Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields
99          long address = kArrayAddress;
100         for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
101             PlatformDependent.putInt(address, i);
102         }
103     }
104 
105     void incrementHandledFds() {
106         numHandledFds++;
107     }
108 
109     void decrementHandledFds() {
110         numHandledFds--;
111         assert numHandledFds >= 0;
112     }
113 
114     private long enqueueSqe0(int id, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
115                              int union3, short data, short union4, short personality, int union5, long union6) {
116         int pending = tail - head;
117         if (pending == ringEntries) {
118             int submitted = submit();
119             if (submitted == 0) {
120                 // We have a problem, could not submit to make more room in the ring
121                 throw new RuntimeException("SQ ring full and no submissions accepted");
122             }
123         }
124         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
125         long udata = UserData.encode(id, opcode, data);
126         setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
127                 union3, udata, union4, personality, union5, union6);
128         return udata;
129     }
130 
131     void enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len, int union3,
132                     long udata, short union4, short personality, int union5, long union6) {
133         int pending = tail - head;
134         if (pending == ringEntries) {
135             int submitted = submit();
136             if (submitted == 0) {
137                 // We have a problem, could not submit to make more room in the ring
138                 throw new RuntimeException("SQ ring full and no submissions accepted");
139             }
140         }
141         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
142         setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
143                 union3, udata, union4, personality, union5, union6);
144     }
145 
146     private void setData(long sqe, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
147                          int union3, long udata, short union4, short personality, int union5, long union6) {
148         //set sqe(submission queue) properties
149 
150         PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, opcode);
151         PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, flags);
152         // This constant is set up-front
153         PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
154         PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
155         PlatformDependent.putLong(sqe + SQE_UNION1_FIELD, union1);
156         PlatformDependent.putLong(sqe + SQE_UNION2_FIELD, union2);
157         PlatformDependent.putInt(sqe + SQE_LEN_FIELD, len);
158         PlatformDependent.putInt(sqe + SQE_UNION3_FIELD, union3);
159         PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
160         PlatformDependent.putShort(sqe + SQE_UNION4_FIELD, union4);
161         PlatformDependent.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
162         PlatformDependent.putInt(sqe + SQE_UNION5_FIELD, union5);
163         PlatformDependent.putLong(sqe + SQE_UNION6_FIELD, union6);
164 
165         if (logger.isTraceEnabled()) {
166             if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
167                 logger.trace("add(ring {}): {}(fd={}, len={} ({} bytes), off={}, data={})",
168                         ringFd, Native.opToStr(opcode), fd, len, Iov.sumSize(union2, len), union1, udata);
169             } else {
170                 logger.trace("add(ring {}): {}(fd={}, len={}, off={}, data={})",
171                         ringFd, Native.opToStr(opcode), fd, len, union1, udata);
172             }
173         }
174     }
175 
176     @Override
177     public String toString() {
178         StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
179         int pending = tail - head;
180         for (int i = 0; i < pending; i++) {
181             long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
182             sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
183                     "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
184         }
185         return sb.toString();
186     }
187 
188     long addNop(int fd, byte flags, int id, short data) {
189         return enqueueSqe0(id, Native.IORING_OP_NOP, flags, (short) 0, fd, 0, 0, 0, 0, data,
190                 (short) 0, (short) 0, 0, 0);
191     }
192 
193     long addTimeout(int fd, long nanoSeconds, int id, short extraData) {
194         setTimeout(nanoSeconds);
195         return enqueueSqe0(id, Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
196                 0, extraData, (short) 0, (short) 0, 0, 0);
197     }
198 
199     long addLinkTimeout(int fd, long nanoSeconds, int id, short extraData) {
200         setTimeout(nanoSeconds);
201         return enqueueSqe0(id, Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
202                 0, extraData, (short) 0, (short) 0, 0, 0);
203     }
204 
205     long addEventFdRead(int fd, long bufferAddress, int pos, int limit, int id, short extraData) {
206         return enqueueSqe0(id, Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
207                 0, extraData, (short) 0, (short) 0, 0, 0);
208     }
209 
210     long addCancel(int fd, long sqeToCancel, int id) {
211         return enqueueSqe0(id, Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, fd, 0, sqeToCancel, 0, 0,
212                 (short) 0, (short) 0, (short) 0, 0, 0);
213     }
214 
215     int submit() {
216         int submit = tail - head;
217         return submit > 0 ? submit(submit, 0, 0) : 0;
218     }
219 
220     int submitAndWait() {
221         int submit = tail - head;
222         if (submit > 0) {
223             return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
224         }
225         assert submit == 0;
226         int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
227         if (ret < 0) {
228             throw new RuntimeException("ioUringEnter syscall returned " + ret);
229         }
230         return ret; // should be 0
231     }
232 
233     private int submit(int toSubmit, int minComplete, int flags) {
234         if (logger.isTraceEnabled()) {
235             logger.trace("submit(ring {}): {}", ringFd, toString());
236         }
237         PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
238         int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
239         head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
240         if (ret != toSubmit) {
241             if (ret < 0) {
242                 throw new RuntimeException("ioUringEnter syscall returned " + ret);
243             }
244             logger.warn("Not all submissions succeeded. Only {} of {} SQEs were submitted, " +
245                     "while there are {} pending completions.", ret, toSubmit, completionCount.getAsInt());
246         }
247         return ret;
248     }
249 
250     private void setTimeout(long timeoutNanoSeconds) {
251         long seconds, nanoSeconds;
252 
253         if (timeoutNanoSeconds == 0) {
254             seconds = 0;
255             nanoSeconds = 0;
256         } else {
257             seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
258             nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
259         }
260 
261         PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
262         PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
263     }
264 
265     public int count() {
266         return tail - head;
267     }
268 
269     public int remaining() {
270         return ringEntries - count();
271     }
272 
273     //delete memory
274     public void release() {
275         PlatformDependent.freeMemory(timeoutMemoryAddress);
276     }
277 }