View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.StringJoiner;
23  import java.util.function.IntSupplier;
24  
25  import static java.lang.Math.max;
26  import static java.lang.Math.min;
27  
28  final class SubmissionQueue {
29      private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
30  
31      private static final long SQE_SIZE = 64;
32      private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
33      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
34  
35      //these offsets are used to access specific properties
36      //SQE https://github.com/axboe/liburing/blob/liburing-2.6/src/include/liburing/io_uring.h#L30
37      private static final int SQE_OP_CODE_FIELD = 0;
38      private static final int SQE_FLAGS_FIELD = 1;
39      private static final int SQE_IOPRIO_FIELD = 2; // u16
40      private static final int SQE_FD_FIELD = 4; // s32
41      private static final int SQE_OFFSET_FIELD = 8;
42      private static final int SQE_ADDRESS_FIELD = 16;
43      private static final int SQE_LEN_FIELD = 24;
44      private static final int SQE_RW_FLAGS_FIELD = 28;
45      private static final int SQE_USER_DATA_FIELD = 32;
46      private static final int SQE_PAD_FIELD = 40;
47  
48      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
49      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
50  
51      //these unsigned integer pointers(shared with the kernel) will be changed by the kernel
52      private final long kHeadAddress;
53      private final long kTailAddress;
54      private final long kFlagsAddress;
55      private final long kDroppedAddress;
56      private final long kArrayAddress;
57      final long submissionQueueArrayAddress;
58  
59      final int ringEntries;
60      private final int ringMask; // = ringEntries - 1
61  
62      final int ringSize;
63      final long ringAddress;
64      final int ringFd;
65      private final long timeoutMemoryAddress;
66      private final IntSupplier completionCount;
67      private int numHandledFds;
68      private int head;
69      private int tail;
70  
71      SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
72                      long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
73                      long submissionQueueArrayAddress, int ringSize, long ringAddress, int ringFd,
74                      IntSupplier completionCount) {
75          this.kHeadAddress = kHeadAddress;
76          this.kTailAddress = kTailAddress;
77          this.kFlagsAddress = kFlagsAddress;
78          this.kDroppedAddress = kDroppedAddress;
79          this.kArrayAddress = kArrayAddress;
80          this.submissionQueueArrayAddress = submissionQueueArrayAddress;
81          this.ringSize = ringSize;
82          this.ringAddress = ringAddress;
83          this.ringFd = ringFd;
84          this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
85          this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
86          this.head = PlatformDependent.getIntVolatile(kHeadAddress);
87          this.tail = PlatformDependent.getIntVolatile(kTailAddress);
88  
89          this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
90          this.completionCount = completionCount;
91  
92          // Zero the whole SQE array first
93          PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
94  
95          // Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields
96          long address = kArrayAddress;
97          for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
98              PlatformDependent.putInt(address, i);
99          }
100     }
101 
102     void incrementHandledFds() {
103         numHandledFds++;
104     }
105 
106     void decrementHandledFds() {
107         numHandledFds--;
108         assert numHandledFds >= 0;
109     }
110 
111     long enqueueSqe(byte op, int flags, short ioPrio, int rwFlags, int fd,
112                                long bufferAddress, int length, long offset, int id, short data) {
113         int pending = tail - head;
114         if (pending == ringEntries) {
115             int submitted = submit();
116             if (submitted == 0) {
117                 // We have a problem, could not submit to make more room in the ring
118                 throw new RuntimeException("SQ ring full and no submissions accepted");
119             }
120         }
121         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
122         long udata = UserData.encode(id, op, data);
123         setData(sqe, op, flags, ioPrio, rwFlags, fd,
124                 bufferAddress, length, offset, udata);
125         return udata;
126     }
127 
128     void enqueueSqe(byte op, int flags, short ioPrio, int rwFlags, int fd,
129                     long bufferAddress, int length, long offset, long udata) {
130         int pending = tail - head;
131         if (pending == ringEntries) {
132             int submitted = submit();
133             if (submitted == 0) {
134                 // We have a problem, could not submit to make more room in the ring
135                 throw new RuntimeException("SQ ring full and no submissions accepted");
136             }
137         }
138         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
139         setData(sqe, op, flags, ioPrio, rwFlags, fd, bufferAddress, length, offset, udata);
140     }
141 
142     private void setData(long sqe, byte op, int flags, short ioPrio, int rwFlags, int fd, long bufferAddress,
143                          int length, long offset, long udata) {
144         //set sqe(submission queue) properties
145 
146         PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op);
147         PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) flags);
148         // This constant is set up-front
149         PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
150         PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
151         PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset);
152         PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress);
153         PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
154         PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, rwFlags);
155         PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
156 
157         if (logger.isTraceEnabled()) {
158             if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_READV) {
159                 logger.trace("add(ring {}): {}(fd={}, len={} ({} bytes), off={}, data={})",
160                         ringFd, Native.opToStr(op), fd, length, Iov.sumSize(bufferAddress, length), offset, udata);
161             } else {
162                 logger.trace("add(ring {}): {}(fd={}, len={}, off={}, data={})",
163                         ringFd, Native.opToStr(op), fd, length, offset, udata);
164             }
165         }
166     }
167 
168     @Override
169     public String toString() {
170         StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
171         int pending = tail - head;
172         for (int i = 0; i < pending; i++) {
173             long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
174             sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
175                     "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
176         }
177         return sb.toString();
178     }
179 
180     long addNop(int fd, int flags, int id, short data) {
181         return enqueueSqe(Native.IORING_OP_NOP, flags, (short) 0, 0, fd, 0, 0, 0, id, data);
182     }
183 
184     long addTimeout(int fd, long nanoSeconds, int id, short extraData) {
185         setTimeout(nanoSeconds);
186         return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, (short) 0, 0, fd,
187                 timeoutMemoryAddress, 1, 0, id, extraData);
188     }
189 
190     long addLinkTimeout(int fd, long nanoSeconds, int id, short extraData) {
191         setTimeout(nanoSeconds);
192         return enqueueSqe(Native.IORING_OP_LINK_TIMEOUT, 0, (short) 0, 0, fd,
193                 timeoutMemoryAddress, 1, 0, id, extraData);
194     }
195 
196     long addEventFdRead(int fd, long bufferAddress, int pos, int limit, int id, short extraData) {
197         return enqueueSqe(Native.IORING_OP_READ, 0, (short) 0, 0, fd,
198                 bufferAddress + pos, limit - pos, 0, id, extraData);
199     }
200 
201     long addCancel(int fd, long sqeToCancel, int id) {
202         return enqueueSqe(Native.IORING_OP_ASYNC_CANCEL, 0, (short) 0, 0, fd, sqeToCancel, 0, 0, id, (short) 0);
203     }
204 
205     int submit() {
206         int submit = tail - head;
207         return submit > 0 ? submit(submit, 0, 0) : 0;
208     }
209 
210     int submitAndWait() {
211         int submit = tail - head;
212         if (submit > 0) {
213             return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
214         }
215         assert submit == 0;
216         int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
217         if (ret < 0) {
218             throw new RuntimeException("ioUringEnter syscall returned " + ret);
219         }
220         return ret; // should be 0
221     }
222 
223     private int submit(int toSubmit, int minComplete, int flags) {
224         if (logger.isTraceEnabled()) {
225             logger.trace("submit(ring {}): {}", ringFd, toString());
226         }
227         PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
228         int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
229         head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
230         if (ret != toSubmit) {
231             if (ret < 0) {
232                 throw new RuntimeException("ioUringEnter syscall returned " + ret);
233             }
234             logger.warn("Not all submissions succeeded. Only {} of {} SQEs were submitted, " +
235                     "while there are {} pending completions.", ret, toSubmit, completionCount.getAsInt());
236         }
237         return ret;
238     }
239 
240     private void setTimeout(long timeoutNanoSeconds) {
241         long seconds, nanoSeconds;
242 
243         if (timeoutNanoSeconds == 0) {
244             seconds = 0;
245             nanoSeconds = 0;
246         } else {
247             seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
248             nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
249         }
250 
251         PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
252         PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
253     }
254 
255     public int count() {
256         return tail - head;
257     }
258 
259     public int remaining() {
260         return ringEntries - count();
261     }
262 
263     //delete memory
264     public void release() {
265         PlatformDependent.freeMemory(timeoutMemoryAddress);
266     }
267 }