/*
 * Copyright 2024 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.uring;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.StringJoiner;
23  
24  final class SubmissionQueue {
25      private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
26  
27      private static final long SQE_SIZE = 64;
28      private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
29  
30      //these offsets are used to access specific properties
31      //SQE https://github.com/axboe/liburing/blob/liburing-2.6/src/include/liburing/io_uring.h#L30
32      private static final int SQE_OP_CODE_FIELD = 0;
33      private static final int SQE_FLAGS_FIELD = 1;
34      private static final int SQE_IOPRIO_FIELD = 2; // u16
35      private static final int SQE_FD_FIELD = 4; // s32
36      private static final int SQE_UNION1_FIELD = 8;
37      private static final int SQE_UNION2_FIELD = 16;
38      private static final int SQE_LEN_FIELD = 24;
39      private static final int SQE_UNION3_FIELD = 28;
40      private static final int SQE_USER_DATA_FIELD = 32;
41      private static final int SQE_UNION4_FIELD = 40;
42      private static final int SQE_PERSONALITY_FIELD = 42;
43      private static final int SQE_UNION5_FIELD = 44;
44      private static final int SQE_UNION6_FIELD = 48;
45  
46      //these unsigned integer pointers(shared with the kernel) will be changed by the kernel
47      private final long kHeadAddress;
48      private final long kTailAddress;
49      private final long kFlagsAddress;
50      private final long kDroppedAddress;
51      private final long kArrayAddress;
52      final long submissionQueueArrayAddress;
53  
54      final int ringEntries;
55      private final int ringMask; // = ringEntries - 1
56  
57      final int ringSize;
58      final long ringAddress;
59      final int ringFd;
60      int enterRingFd;
61      private int enterFlags;
62      private int head;
63      private int tail;
64  
65      SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
66                      long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
67                      long submissionQueueArrayAddress, int ringSize, long ringAddress,
68                      int ringFd) {
69          this.kHeadAddress = kHeadAddress;
70          this.kTailAddress = kTailAddress;
71          this.kFlagsAddress = kFlagsAddress;
72          this.kDroppedAddress = kDroppedAddress;
73          this.kArrayAddress = kArrayAddress;
74          this.submissionQueueArrayAddress = submissionQueueArrayAddress;
75          this.ringSize = ringSize;
76          this.ringAddress = ringAddress;
77          this.ringFd = ringFd;
78          this.enterRingFd = ringFd;
79          this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
80          this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
81          this.head = PlatformDependent.getIntVolatile(kHeadAddress);
82          this.tail = PlatformDependent.getIntVolatile(kTailAddress);
83  
84          // Zero the whole SQE array first
85          PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
86  
87          // Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields
88          long address = kArrayAddress;
89          for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
90              PlatformDependent.putInt(address, i);
91          }
92      }
93  
94      void tryRegisterRingFd() {
95          // Try to use IORING_REGISTER_RING_FDS.
96          // See https://manpages.debian.org/unstable/liburing-dev/io_uring_register.2.en.html#IORING_REGISTER_RING_FDS
97          int enterRingFd = Native.ioUringRegisterRingFds(ringFd);
98          final int enterFlags;
99          if (enterRingFd < 0) {
100             // Use of IORING_REGISTER_RING_FDS failed, just use the ring fd directly.
101             enterRingFd = ringFd;
102             enterFlags = 0;
103         } else {
104             enterFlags = Native.IORING_ENTER_REGISTERED_RING;
105         }
106         this.enterRingFd = enterRingFd;
107         this.enterFlags = enterFlags;
108     }
109 
110     private long enqueueSqe0(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
111                              int union3, long udata, short union4, short personality, int union5, long union6) {
112         int pending = tail - head;
113         if (pending == ringEntries) {
114             int submitted = submit();
115             if (submitted == 0) {
116                 // We have a problem, could not submit to make more room in the ring
117                 throw new RuntimeException("SQ ring full and no submissions accepted");
118             }
119         }
120         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
121         setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
122                 union3, udata, union4, personality, union5, union6);
123         return udata;
124     }
125 
126     void enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len, int union3,
127                     long udata, short union4, short personality, int union5, long union6) {
128         int pending = tail - head;
129         if (pending == ringEntries) {
130             int submitted = submit();
131             if (submitted == 0) {
132                 // We have a problem, could not submit to make more room in the ring
133                 throw new RuntimeException("SQ ring full and no submissions accepted");
134             }
135         }
136         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
137         setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
138                 union3, udata, union4, personality, union5, union6);
139     }
140 
141     private void setData(long sqe, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
142                          int union3, long udata, short union4, short personality, int union5, long union6) {
143         //set sqe(submission queue) properties
144 
145         PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, opcode);
146         PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, flags);
147         // This constant is set up-front
148         PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
149         PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
150         PlatformDependent.putLong(sqe + SQE_UNION1_FIELD, union1);
151         PlatformDependent.putLong(sqe + SQE_UNION2_FIELD, union2);
152         PlatformDependent.putInt(sqe + SQE_LEN_FIELD, len);
153         PlatformDependent.putInt(sqe + SQE_UNION3_FIELD, union3);
154         PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
155         PlatformDependent.putShort(sqe + SQE_UNION4_FIELD, union4);
156         PlatformDependent.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
157         PlatformDependent.putInt(sqe + SQE_UNION5_FIELD, union5);
158         PlatformDependent.putLong(sqe + SQE_UNION6_FIELD, union6);
159 
160         if (logger.isTraceEnabled()) {
161             if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
162                 logger.trace("add(ring={}, enterRing:{} ): {}(fd={}, len={} ({} bytes), off={}, data={})",
163                         ringFd, enterRingFd, Native.opToStr(opcode), fd, len, Iov.sumSize(union2, len), union1, udata);
164             } else {
165                 logger.trace("add(ring={}, enterRing:{}): {}(fd={}, len={}, off={}, data={})",
166                         ringFd, enterRingFd, Native.opToStr(opcode), fd, len, union1, udata);
167             }
168         }
169     }
170 
171     @Override
172     public String toString() {
173         StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
174         int pending = tail - head;
175         for (int i = 0; i < pending; i++) {
176             long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
177             sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
178                     "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
179         }
180         return sb.toString();
181     }
182 
183     long addNop(byte flags, long udata) {
184         // Mimic what liburing does:
185         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L592
186         return enqueueSqe0(Native.IORING_OP_NOP, flags, (short) 0, -1, 0, 0, 0, 0, udata,
187                 (short) 0, (short) 0, 0, 0);
188     }
189 
190     long addTimeout(long timeoutMemoryAddress, long udata) {
191         // Mimic what liburing does. We want to use a count of 1:
192         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L599
193         return enqueueSqe0(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
194                 0, udata, (short) 0, (short) 0, 0, 0);
195     }
196 
197     long addLinkTimeout(long timeoutMemoryAddress, long extraData) {
198         // Mimic what liburing does:
199         // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L687
200         return enqueueSqe0(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, -1, 1, timeoutMemoryAddress, 1,
201                 0, extraData, (short) 0, (short) 0, 0, 0);
202     }
203 
204     long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
205         return enqueueSqe0(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
206                 0, udata, (short) 0, (short) 0, 0, 0);
207     }
208 
209     // Mimic what liburing does:
210     // https://github.com/axboe/liburing/blob/liburing-2.8/src/include/liburing.h#L673
211     long addCancel(long sqeToCancel, long udata) {
212         return enqueueSqe0(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, -1, 0, sqeToCancel, 0, 0,
213                 udata, (short) 0, (short) 0, 0, 0);
214     }
215 
216     int submit() {
217         int submit = tail - head;
218         return submit > 0 ? submit(submit, 0, 0) : 0;
219     }
220 
221     int submitAndWait() {
222         int submit = tail - head;
223         if (submit > 0) {
224             return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
225         }
226         assert submit == 0;
227         int ret = ioUringEnter(0, 1, Native.IORING_ENTER_GETEVENTS);
228         if (ret < 0) {
229             throw new RuntimeException("ioUringEnter syscall returned " + ret);
230         }
231         return ret; // should be 0
232     }
233 
234     private int submit(int toSubmit, int minComplete, int flags) {
235         PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
236         int ret = ioUringEnter(toSubmit, minComplete, flags);
237         head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
238         if (ret != toSubmit) {
239             if (ret < 0) {
240                 throw new RuntimeException("ioUringEnter syscall returned " + ret);
241             }
242         }
243         return ret;
244     }
245 
246     private int ioUringEnter(int toSubmit, int minComplete, int flags) {
247         int f = enterFlags | flags;
248 
249         if (IoUring.isSetupSubmitAllSupported()) {
250             return ioUringEnter0(toSubmit, minComplete, f);
251         }
252         // If IORING_SETUP_SUBMIT_ALL is not supported we need to loop until we submitted everything as
253         // io_uring_enter(...) will stop submitting once the first inline executed submission fails.
254         int submitted = 0;
255         for (;;) {
256             int ret = ioUringEnter0(toSubmit, minComplete, f);
257             if (ret < 0) {
258                 return ret;
259             }
260             submitted += ret;
261             if (ret == toSubmit) {
262                 return submitted;
263             }
264             if (logger.isTraceEnabled()) {
265                 // some submission might fail if these are done inline and failed.
266                 logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
267             }
268             toSubmit -= ret;
269         }
270     }
271 
272     private int ioUringEnter0(int toSubmit, int minComplete, int f) {
273         if (logger.isTraceEnabled()) {
274             logger.trace("io_uring_enter(ring={}, enterRing={}, toSubmit={}, minComplete={}, flags={}): {}",
275                     ringFd, enterRingFd, toSubmit, minComplete, f, toString());
276         }
277         return Native.ioUringEnter(enterRingFd, toSubmit, minComplete, f);
278     }
279 
280     public int count() {
281         return tail - head;
282     }
283 
284     public int remaining() {
285         return ringEntries - count();
286     }
287 }