View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.unix.Buffer;
19  import io.netty.channel.unix.Errors;
20  import io.netty.util.internal.logging.InternalLogger;
21  import io.netty.util.internal.logging.InternalLoggerFactory;
22  
23  import java.io.UncheckedIOException;
24  import java.lang.invoke.MethodHandles;
25  import java.lang.invoke.VarHandle;
26  import java.nio.ByteBuffer;
27  import java.nio.ByteOrder;
28  import java.util.StringJoiner;
29  
30  final class SubmissionQueue {
31      private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
32  
33      static final int SQE_SIZE = 64;
34  
35      //these offsets are used to access specific properties
36      //SQE https://github.com/axboe/liburing/blob/liburing-2.6/src/include/liburing/io_uring.h#L30
37      private static final int SQE_OP_CODE_FIELD = 0;
38      private static final int SQE_FLAGS_FIELD = 1;
39      private static final int SQE_IOPRIO_FIELD = 2; // u16
40      private static final int SQE_FD_FIELD = 4; // s32
41      private static final int SQE_UNION1_FIELD = 8;
42      private static final int SQE_UNION2_FIELD = 16;
43      private static final int SQE_LEN_FIELD = 24;
44      private static final int SQE_UNION3_FIELD = 28;
45      private static final int SQE_USER_DATA_FIELD = 32;
46      private static final int SQE_UNION4_FIELD = 40;
47      private static final int SQE_PERSONALITY_FIELD = 42;
48      private static final int SQE_UNION5_FIELD = 44;
49      private static final int SQE_UNION6_FIELD = 48;
50  
51      // These unsigned integer pointers(shared with the kernel) will be changed by the kernel and us
52      // using a VarHandle.
53      private static final VarHandle INT_HANDLE =
54              MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
55      private final ByteBuffer kHead;
56      private final ByteBuffer kTail;
57      private final ByteBuffer kflags;
58      private final ByteBuffer submissionQueueArray;
59  
60      final int ringEntries;
61      private final int ringMask; // = ringEntries - 1
62  
63      final int ringSize;
64      final long ringAddress;
65      final int ringFd;
66      int enterRingFd;
67      private int enterFlags;
68      private int head;
69      private int tail;
70  
71      private boolean closed;
72  
73      SubmissionQueue(ByteBuffer khead, ByteBuffer ktail, int ringMask, int ringEntries, ByteBuffer kflags,
74                      ByteBuffer submissionQueueArray,
75                      int ringSize, long ringAddress,
76                      int ringFd) {
77          this.kHead = khead;
78          this.kTail = ktail;
79          this.submissionQueueArray = submissionQueueArray;
80          this.ringSize = ringSize;
81          this.ringAddress = ringAddress;
82          this.ringFd = ringFd;
83          this.enterRingFd = ringFd;
84          this.ringEntries = ringEntries;
85          this.kflags = kflags;
86          this.ringMask = ringMask;
87          this.head = (int) INT_HANDLE.getVolatile(khead, 0);
88          this.tail = (int) INT_HANDLE.getVolatile(ktail, 0);
89      }
90  
91      int flags() {
92          if (closed) {
93              return 0;
94          }
95          // we only need memory_order_relaxed
96          return (int) INT_HANDLE.getOpaque(kflags, 0);
97      }
98  
99      long submissionQueueArrayAddress() {
100         return Buffer.memoryAddress(submissionQueueArray);
101     }
102 
103     void close() {
104         closed = true;
105     }
106 
107     private void checkClosed() {
108         if (closed) {
109             throw new IllegalStateException();
110         }
111     }
112 
113     void tryRegisterRingFd() {
114         checkClosed();
115         // Try to use IORING_REGISTER_RING_FDS.
116         // See https://manpages.debian.org/unstable/liburing-dev/io_uring_register.2.en.html#IORING_REGISTER_RING_FDS
117         int enterRingFd = Native.ioUringRegisterRingFds(ringFd);
118         final int enterFlags;
119         if (enterRingFd < 0) {
120             // Use of IORING_REGISTER_RING_FDS failed, just use the ring fd directly.
121             enterRingFd = ringFd;
122             enterFlags = 0;
123         } else {
124             enterFlags = Native.IORING_ENTER_REGISTERED_RING;
125         }
126         this.enterRingFd = enterRingFd;
127         this.enterFlags = enterFlags;
128     }
129 
130     long enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
131                              int union3, long udata, short union4, short personality, int union5, long union6) {
132         checkClosed();
133         int pending = tail - head;
134         if (pending == ringEntries) {
135             int submitted = submit();
136             if (submitted == 0) {
137                 // We have a problem, could not submit to make more room in the ring
138                 throw new RuntimeException("SQ ring full and no submissions accepted");
139             }
140         }
141         int sqe = sqeIndex(tail++, ringMask);
142 
143         //set sqe(submission queue) properties
144         submissionQueueArray.put(sqe + SQE_OP_CODE_FIELD, opcode);
145         submissionQueueArray.put(sqe + SQE_FLAGS_FIELD, flags);
146         // This constant is set up-front
147         submissionQueueArray.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
148         submissionQueueArray.putInt(sqe + SQE_FD_FIELD, fd);
149         submissionQueueArray.putLong(sqe + SQE_UNION1_FIELD, union1);
150         submissionQueueArray.putLong(sqe + SQE_UNION2_FIELD, union2);
151         submissionQueueArray.putInt(sqe + SQE_LEN_FIELD, len);
152         submissionQueueArray.putInt(sqe + SQE_UNION3_FIELD, union3);
153         submissionQueueArray.putLong(sqe + SQE_USER_DATA_FIELD, udata);
154         submissionQueueArray.putShort(sqe + SQE_UNION4_FIELD, union4);
155         submissionQueueArray.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
156         submissionQueueArray.putInt(sqe + SQE_UNION5_FIELD, union5);
157         submissionQueueArray.putLong(sqe + SQE_UNION6_FIELD, union6);
158 
159         if (logger.isTraceEnabled()) {
160             if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
161                 logger.trace("add(ring={}, enterRing:{} ): {}(fd={}, len={}, off={}, data={})",
162                         ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
163             } else {
164                 logger.trace("add(ring={}, enterRing:{}): {}(fd={}, len={}, off={}, data={})",
165                         ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
166             }
167         }
168         return udata;
169     }
170 
171     @Override
172     public String toString() {
173         StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
174         if (closed) {
175             sb.add("closed");
176         } else {
177             int pending = tail - head;
178             int idx = tail;
179             for (int i = 0; i < pending; i++) {
180                 int sqe = sqeIndex(idx++, ringMask);
181                 sb.add(Native.opToStr(submissionQueueArray.get(sqe + SQE_OP_CODE_FIELD)) +
182                         "(fd=" + submissionQueueArray.getInt(sqe + SQE_FD_FIELD) + ')');
183             }
184         }
185         return sb.toString();
186     }
187 
188     private static int sqeIndex(int tail, int ringMask) {
189         return (tail & ringMask) * SQE_SIZE;
190     }
191 
192     long addNop(byte flags, long udata) {
193         // Mimic what liburing does:
194         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L592
195         return enqueueSqe(Native.IORING_OP_NOP, flags, (short) 0, -1, 0, 0, 0, 0, udata,
196                 (short) 0, (short) 0, 0, 0);
197     }
198 
199     long addTimeout(long timeoutMemoryAddress, long udata) {
200         // Mimic what liburing does. We want to use a count of 1:
201         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L599
202         return enqueueSqe(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
203                 0, udata, (short) 0, (short) 0, 0, 0);
204     }
205 
206     long addLinkTimeout(long timeoutMemoryAddress, long extraData) {
207         // Mimic what liburing does:
208         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L687
209         return enqueueSqe(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
210                 0, extraData, (short) 0, (short) 0, 0, 0);
211     }
212 
213     long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
214         return enqueueSqe(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
215                 0, udata, (short) 0, (short) 0, 0, 0);
216     }
217 
218     // Mimic what liburing does:
219     // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L673
220     long addCancel(long sqeToCancel, long udata) {
221         return enqueueSqe(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, -1, 0, sqeToCancel, 0, 0,
222                 udata, (short) 0, (short) 0, 0, 0);
223     }
224 
225     /**
226      * Submit entries.
227      *
228      * @return  the number of submitted entries.
229      */
230     int submit() {
231         checkClosed();
232         int submit = tail - head;
233         return submit > 0 ? submit(submit, 0, 0) : 0;
234     }
235 
236     /**
237      * Submit entries and fetch completions. This method will block until there is at least one completion ready to be
238      * processed.
239      *
240      * @return  the number of submitted entries.
241      */
242     int submitAndGet() {
243         return submitAndGet0(1);
244     }
245 
246     /**
247      * Submit entries and fetch completions.
248      *
249      * @return  the number of submitted entries.
250      */
251     int submitAndGetNow() {
252         return submitAndGet0(0);
253     }
254 
255     private int submitAndGet0(int minComplete) {
256         checkClosed();
257         int submit = tail - head;
258         if (submit > 0) {
259             return submit(submit, minComplete, Native.IORING_ENTER_GETEVENTS);
260         }
261         assert submit == 0;
262         int ret = ioUringEnter(0, minComplete, Native.IORING_ENTER_GETEVENTS);
263         if (ret < 0) {
264             throw new UncheckedIOException(Errors.newIOException("io_uring_enter", ret));
265         }
266         return ret; // should be 0
267     }
268 
269     private int submit(int toSubmit, int minComplete, int flags) {
270         INT_HANDLE.setRelease(kTail, 0, tail);
271         int ret = ioUringEnter(toSubmit, minComplete, flags);
272         head = (int) INT_HANDLE.getVolatile(kHead, 0); // acquire memory barrier
273         if (ret != toSubmit) {
274             if (ret < 0) {
275                 throw new UncheckedIOException(Errors.newIOException("io_uring_enter", ret));
276             }
277         }
278         return ret;
279     }
280 
281     private int ioUringEnter(int toSubmit, int minComplete, int flags) {
282         int f = enterFlags | flags;
283 
284         if (IoUring.isSetupSubmitAllSupported()) {
285             return ioUringEnter0(toSubmit, minComplete, f);
286         }
287         // If IORING_SETUP_SUBMIT_ALL is not supported we need to loop until we submitted everything as
288         // io_uring_enter(...) will stop submitting once the first inline executed submission fails.
289         int submitted = 0;
290         for (;;) {
291             int ret = ioUringEnter0(toSubmit, minComplete, f);
292             if (ret < 0) {
293                 return ret;
294             }
295             submitted += ret;
296             if (ret == toSubmit) {
297                 return submitted;
298             }
299             if (logger.isTraceEnabled()) {
300                 // some submission might fail if these are done inline and failed.
301                 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
302             }
303             toSubmit -= ret;
304         }
305     }
306 
307     private int ioUringEnter0(int toSubmit, int minComplete, int f) {
308         if (logger.isTraceEnabled()) {
309             logger.trace("io_uring_enter(ring={}, enterRing={}, toSubmit={}, minComplete={}, flags={}): {}",
310                     ringFd, enterRingFd, toSubmit, minComplete, f, toString());
311         }
312         return Native.ioUringEnter(enterRingFd, toSubmit, minComplete, f);
313     }
314 
315     public int count() {
316         return tail - head;
317     }
318 
319     public int remaining() {
320         return ringEntries - count();
321     }
322 }