View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.channel.unix.Buffer;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.lang.invoke.MethodHandles;
23  import java.lang.invoke.VarHandle;
24  import java.nio.ByteBuffer;
25  import java.nio.ByteOrder;
26  import java.util.StringJoiner;
27  
28  final class SubmissionQueue {
29      private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
30  
31      static final int SQE_SIZE = 64;
32  
33      //these offsets are used to access specific properties
34      //SQE https://github.com/axboe/liburing/blob/liburing-2.6/src/include/liburing/io_uring.h#L30
35      private static final int SQE_OP_CODE_FIELD = 0;
36      private static final int SQE_FLAGS_FIELD = 1;
37      private static final int SQE_IOPRIO_FIELD = 2; // u16
38      private static final int SQE_FD_FIELD = 4; // s32
39      private static final int SQE_UNION1_FIELD = 8;
40      private static final int SQE_UNION2_FIELD = 16;
41      private static final int SQE_LEN_FIELD = 24;
42      private static final int SQE_UNION3_FIELD = 28;
43      private static final int SQE_USER_DATA_FIELD = 32;
44      private static final int SQE_UNION4_FIELD = 40;
45      private static final int SQE_PERSONALITY_FIELD = 42;
46      private static final int SQE_UNION5_FIELD = 44;
47      private static final int SQE_UNION6_FIELD = 48;
48  
49      // These unsigned integer pointers(shared with the kernel) will be changed by the kernel and us
50      // using a VarHandle.
51      private static final VarHandle INT_HANDLE =
52              MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
53      private final ByteBuffer kHead;
54      private final ByteBuffer kTail;
55      private final ByteBuffer submissionQueueArray;
56  
57      final int ringEntries;
58      private final int ringMask; // = ringEntries - 1
59  
60      final int ringSize;
61      final long ringAddress;
62      final int ringFd;
63      int enterRingFd;
64      private int enterFlags;
65      private int head;
66      private int tail;
67  
68      private boolean closed;
69  
70      SubmissionQueue(ByteBuffer khead, ByteBuffer ktail, int ringMask, int ringEntries, ByteBuffer submissionQueueArray,
71                      int ringSize, long ringAddress,
72                      int ringFd) {
73          this.kHead = khead;
74          this.kTail = ktail;
75          this.submissionQueueArray = submissionQueueArray;
76          this.ringSize = ringSize;
77          this.ringAddress = ringAddress;
78          this.ringFd = ringFd;
79          this.enterRingFd = ringFd;
80          this.ringEntries = ringEntries;
81          this.ringMask = ringMask;
82          this.head = (int) INT_HANDLE.getVolatile(khead, 0);
83          this.tail = (int) INT_HANDLE.getVolatile(ktail, 0);
84      }
85  
86      long submissionQueueArrayAddress() {
87          return Buffer.memoryAddress(submissionQueueArray);
88      }
89  
90      void close() {
91          closed = true;
92      }
93  
94      private void checkClosed() {
95          if (closed) {
96              throw new IllegalStateException();
97          }
98      }
99  
100     void tryRegisterRingFd() {
101         checkClosed();
102         // Try to use IORING_REGISTER_RING_FDS.
103         // See https://manpages.debian.org/unstable/liburing-dev/io_uring_register.2.en.html#IORING_REGISTER_RING_FDS
104         int enterRingFd = Native.ioUringRegisterRingFds(ringFd);
105         final int enterFlags;
106         if (enterRingFd < 0) {
107             // Use of IORING_REGISTER_RING_FDS failed, just use the ring fd directly.
108             enterRingFd = ringFd;
109             enterFlags = 0;
110         } else {
111             enterFlags = Native.IORING_ENTER_REGISTERED_RING;
112         }
113         this.enterRingFd = enterRingFd;
114         this.enterFlags = enterFlags;
115     }
116 
117     long enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
118                              int union3, long udata, short union4, short personality, int union5, long union6) {
119         checkClosed();
120         int pending = tail - head;
121         if (pending == ringEntries) {
122             int submitted = submit();
123             if (submitted == 0) {
124                 // We have a problem, could not submit to make more room in the ring
125                 throw new RuntimeException("SQ ring full and no submissions accepted");
126             }
127         }
128         int sqe = sqeIndex(tail++, ringMask);
129 
130         //set sqe(submission queue) properties
131         submissionQueueArray.put(sqe + SQE_OP_CODE_FIELD, opcode);
132         submissionQueueArray.put(sqe + SQE_FLAGS_FIELD, flags);
133         // This constant is set up-front
134         submissionQueueArray.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
135         submissionQueueArray.putInt(sqe + SQE_FD_FIELD, fd);
136         submissionQueueArray.putLong(sqe + SQE_UNION1_FIELD, union1);
137         submissionQueueArray.putLong(sqe + SQE_UNION2_FIELD, union2);
138         submissionQueueArray.putInt(sqe + SQE_LEN_FIELD, len);
139         submissionQueueArray.putInt(sqe + SQE_UNION3_FIELD, union3);
140         submissionQueueArray.putLong(sqe + SQE_USER_DATA_FIELD, udata);
141         submissionQueueArray.putShort(sqe + SQE_UNION4_FIELD, union4);
142         submissionQueueArray.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
143         submissionQueueArray.putInt(sqe + SQE_UNION5_FIELD, union5);
144         submissionQueueArray.putLong(sqe + SQE_UNION6_FIELD, union6);
145 
146         if (logger.isTraceEnabled()) {
147             if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
148                 logger.trace("add(ring={}, enterRing:{} ): {}(fd={}, len={}, off={}, data={})",
149                         ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
150             } else {
151                 logger.trace("add(ring={}, enterRing:{}): {}(fd={}, len={}, off={}, data={})",
152                         ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
153             }
154         }
155         return udata;
156     }
157 
158     @Override
159     public String toString() {
160         StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
161         if (closed) {
162             sb.add("closed");
163         } else {
164             int pending = tail - head;
165             int idx = tail;
166             for (int i = 0; i < pending; i++) {
167                 int sqe = sqeIndex(idx++, ringMask);
168                 sb.add(Native.opToStr(submissionQueueArray.get(sqe + SQE_OP_CODE_FIELD)) +
169                         "(fd=" + submissionQueueArray.getInt(sqe + SQE_FD_FIELD) + ')');
170             }
171         }
172         return sb.toString();
173     }
174 
175     private static int sqeIndex(int tail, int ringMask) {
176         return (tail & ringMask) * SQE_SIZE;
177     }
178 
179     long addNop(byte flags, long udata) {
180         // Mimic what liburing does:
181         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L592
182         return enqueueSqe(Native.IORING_OP_NOP, flags, (short) 0, -1, 0, 0, 0, 0, udata,
183                 (short) 0, (short) 0, 0, 0);
184     }
185 
186     long addTimeout(long timeoutMemoryAddress, long udata) {
187         // Mimic what liburing does. We want to use a count of 1:
188         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L599
189         return enqueueSqe(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
190                 0, udata, (short) 0, (short) 0, 0, 0);
191     }
192 
193     long addLinkTimeout(long timeoutMemoryAddress, long extraData) {
194         // Mimic what liburing does:
195         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L687
196         return enqueueSqe(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
197                 0, extraData, (short) 0, (short) 0, 0, 0);
198     }
199 
200     long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
201         return enqueueSqe(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
202                 0, udata, (short) 0, (short) 0, 0, 0);
203     }
204 
205     // Mimic what liburing does:
206     // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L673
207     long addCancel(long sqeToCancel, long udata) {
208         return enqueueSqe(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, -1, 0, sqeToCancel, 0, 0,
209                 udata, (short) 0, (short) 0, 0, 0);
210     }
211 
212     int submit() {
213         checkClosed();
214         int submit = tail - head;
215         return submit > 0 ? submit(submit, 0, 0) : 0;
216     }
217 
218     int submitAndWait() {
219         checkClosed();
220         int submit = tail - head;
221         if (submit > 0) {
222             return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
223         }
224         assert submit == 0;
225         int ret = ioUringEnter(0, 1, Native.IORING_ENTER_GETEVENTS);
226         if (ret < 0) {
227             throw new RuntimeException("ioUringEnter syscall returned " + ret);
228         }
229         return ret; // should be 0
230     }
231 
232     private int submit(int toSubmit, int minComplete, int flags) {
233         INT_HANDLE.setRelease(kTail, 0, tail);
234         int ret = ioUringEnter(toSubmit, minComplete, flags);
235         head = (int) INT_HANDLE.getVolatile(kHead, 0); // acquire memory barrier
236         if (ret != toSubmit) {
237             if (ret < 0) {
238                 throw new RuntimeException("ioUringEnter syscall returned " + ret);
239             }
240         }
241         return ret;
242     }
243 
244     private int ioUringEnter(int toSubmit, int minComplete, int flags) {
245         int f = enterFlags | flags;
246 
247         if (IoUring.isSetupSubmitAllSupported()) {
248             return ioUringEnter0(toSubmit, minComplete, f);
249         }
250         // If IORING_SETUP_SUBMIT_ALL is not supported we need to loop until we submitted everything as
251         // io_uring_enter(...) will stop submitting once the first inline executed submission fails.
252         int submitted = 0;
253         for (;;) {
254             int ret = ioUringEnter0(toSubmit, minComplete, f);
255             if (ret < 0) {
256                 return ret;
257             }
258             submitted += ret;
259             if (ret == toSubmit) {
260                 return submitted;
261             }
262             if (logger.isTraceEnabled()) {
263                 // some submission might fail if these are done inline and failed.
264                 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
265             }
266             toSubmit -= ret;
267         }
268     }
269 
270     private int ioUringEnter0(int toSubmit, int minComplete, int f) {
271         if (logger.isTraceEnabled()) {
272             logger.trace("io_uring_enter(ring={}, enterRing={}, toSubmit={}, minComplete={}, flags={}): {}",
273                     ringFd, enterRingFd, toSubmit, minComplete, f, toString());
274         }
275         return Native.ioUringEnter(enterRingFd, toSubmit, minComplete, f);
276     }
277 
278     public int count() {
279         return tail - head;
280     }
281 
282     public int remaining() {
283         return ringEntries - count();
284     }
285 }