View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.StringJoiner;
23  
24  import static java.lang.Math.max;
25  import static java.lang.Math.min;
26  
27  final class SubmissionQueue {
28      private static final InternalLogger logger = InternalLoggerFactory.getInstance(SubmissionQueue.class);
29  
30      private static final long SQE_SIZE = 64;
31      private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
32      private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
33  
34      //these offsets are used to access specific properties
35      //SQE https://github.com/axboe/liburing/blob/liburing-2.6/src/include/liburing/io_uring.h#L30
36      private static final int SQE_OP_CODE_FIELD = 0;
37      private static final int SQE_FLAGS_FIELD = 1;
38      private static final int SQE_IOPRIO_FIELD = 2; // u16
39      private static final int SQE_FD_FIELD = 4; // s32
40      private static final int SQE_UNION1_FIELD = 8;
41      private static final int SQE_UNION2_FIELD = 16;
42      private static final int SQE_LEN_FIELD = 24;
43      private static final int SQE_UNION3_FIELD = 28;
44      private static final int SQE_USER_DATA_FIELD = 32;
45      private static final int SQE_UNION4_FIELD = 40;
46      private static final int SQE_PERSONALITY_FIELD = 42;
47      private static final int SQE_UNION5_FIELD = 44;
48      private static final int SQE_UNION6_FIELD = 48;
49  
50      private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
51      private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
52  
53      //these unsigned integer pointers(shared with the kernel) will be changed by the kernel
54      private final long kHeadAddress;
55      private final long kTailAddress;
56      private final long kFlagsAddress;
57      private final long kDroppedAddress;
58      private final long kArrayAddress;
59      final long submissionQueueArrayAddress;
60  
61      final int ringEntries;
62      private final int ringMask; // = ringEntries - 1
63  
64      final int ringSize;
65      final long ringAddress;
66      final int ringFd;
67      private final long timeoutMemoryAddress;
68      private int head;
69      private int tail;
70  
71      SubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
72                      long kFlagsAddress, long kDroppedAddress, long kArrayAddress,
73                      long submissionQueueArrayAddress, int ringSize, long ringAddress, int ringFd) {
74          this.kHeadAddress = kHeadAddress;
75          this.kTailAddress = kTailAddress;
76          this.kFlagsAddress = kFlagsAddress;
77          this.kDroppedAddress = kDroppedAddress;
78          this.kArrayAddress = kArrayAddress;
79          this.submissionQueueArrayAddress = submissionQueueArrayAddress;
80          this.ringSize = ringSize;
81          this.ringAddress = ringAddress;
82          this.ringFd = ringFd;
83          this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
84          this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
85          this.head = PlatformDependent.getIntVolatile(kHeadAddress);
86          this.tail = PlatformDependent.getIntVolatile(kTailAddress);
87  
88          this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
89  
90          // Zero the whole SQE array first
91          PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
92  
93          // Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields
94          long address = kArrayAddress;
95          for (int i = 0; i < ringEntries; i++, address += INT_SIZE) {
96              PlatformDependent.putInt(address, i);
97          }
98      }
99  
100     private long enqueueSqe0(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
101                              int union3, long udata, short union4, short personality, int union5, long union6) {
102         int pending = tail - head;
103         if (pending == ringEntries) {
104             int submitted = submit();
105             if (submitted == 0) {
106                 // We have a problem, could not submit to make more room in the ring
107                 throw new RuntimeException("SQ ring full and no submissions accepted");
108             }
109         }
110         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
111         setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
112                 union3, udata, union4, personality, union5, union6);
113         return udata;
114     }
115 
116     void enqueueSqe(byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len, int union3,
117                     long udata, short union4, short personality, int union5, long union6) {
118         int pending = tail - head;
119         if (pending == ringEntries) {
120             int submitted = submit();
121             if (submitted == 0) {
122                 // We have a problem, could not submit to make more room in the ring
123                 throw new RuntimeException("SQ ring full and no submissions accepted");
124             }
125         }
126         long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
127         setData(sqe, opcode, flags, ioPrio, fd, union1, union2, len,
128                 union3, udata, union4, personality, union5, union6);
129     }
130 
131     private void setData(long sqe, byte opcode, byte flags, short ioPrio, int fd, long union1, long union2, int len,
132                          int union3, long udata, short union4, short personality, int union5, long union6) {
133         //set sqe(submission queue) properties
134 
135         PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, opcode);
136         PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, flags);
137         // This constant is set up-front
138         PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, ioPrio);
139         PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
140         PlatformDependent.putLong(sqe + SQE_UNION1_FIELD, union1);
141         PlatformDependent.putLong(sqe + SQE_UNION2_FIELD, union2);
142         PlatformDependent.putInt(sqe + SQE_LEN_FIELD, len);
143         PlatformDependent.putInt(sqe + SQE_UNION3_FIELD, union3);
144         PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, udata);
145         PlatformDependent.putShort(sqe + SQE_UNION4_FIELD, union4);
146         PlatformDependent.putShort(sqe + SQE_PERSONALITY_FIELD, personality);
147         PlatformDependent.putInt(sqe + SQE_UNION5_FIELD, union5);
148         PlatformDependent.putLong(sqe + SQE_UNION6_FIELD, union6);
149 
150         if (logger.isTraceEnabled()) {
151             if (opcode == Native.IORING_OP_WRITEV || opcode == Native.IORING_OP_READV) {
152                 logger.trace("add(ring {}): {}(fd={}, len={} ({} bytes), off={}, data={})",
153                         ringFd, Native.opToStr(opcode), fd, len, Iov.sumSize(union2, len), union1, udata);
154             } else {
155                 logger.trace("add(ring {}): {}(fd={}, len={}, off={}, data={})",
156                         ringFd, Native.opToStr(opcode), fd, len, union1, udata);
157             }
158         }
159     }
160 
161     @Override
162     public String toString() {
163         StringJoiner sb = new StringJoiner(", ", "SubmissionQueue [", "]");
164         int pending = tail - head;
165         for (int i = 0; i < pending; i++) {
166             long sqe = submissionQueueArrayAddress + (head + i & ringMask) * SQE_SIZE;
167             sb.add(Native.opToStr(PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD)) +
168                     "(fd=" + PlatformDependent.getInt(sqe + SQE_FD_FIELD) + ')');
169         }
170         return sb.toString();
171     }
172 
173     long addNop(int fd, byte flags, long udata) {
174         return enqueueSqe0(Native.IORING_OP_NOP, flags, (short) 0, fd, 0, 0, 0, 0, udata,
175                 (short) 0, (short) 0, 0, 0);
176     }
177 
178     long addTimeout(int fd, long nanoSeconds, long udata) {
179         setTimeout(nanoSeconds);
180         return enqueueSqe0(Native.IORING_OP_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
181                 0, udata, (short) 0, (short) 0, 0, 0);
182     }
183 
184     long addLinkTimeout(int fd, long nanoSeconds, long extraData) {
185         setTimeout(nanoSeconds);
186         return enqueueSqe0(Native.IORING_OP_LINK_TIMEOUT, (byte) 0, (short) 0, fd, 0, timeoutMemoryAddress, 1,
187                 0, extraData, (short) 0, (short) 0, 0, 0);
188     }
189 
190     long addEventFdRead(int fd, long bufferAddress, int pos, int limit, long udata) {
191         return enqueueSqe0(Native.IORING_OP_READ, (byte) 0, (short) 0, fd, 0, bufferAddress + pos, limit - pos,
192                 0, udata, (short) 0, (short) 0, 0, 0);
193     }
194 
195     long addCancel(int fd, long sqeToCancel, long udata) {
196         return enqueueSqe0(Native.IORING_OP_ASYNC_CANCEL, (byte) 0, (short) 0, fd, 0, sqeToCancel, 0, 0,
197                 udata, (short) 0, (short) 0, 0, 0);
198     }
199 
200     int submit() {
201         int submit = tail - head;
202         return submit > 0 ? submit(submit, 0, 0) : 0;
203     }
204 
205     int submitAndWait() {
206         int submit = tail - head;
207         if (submit > 0) {
208             return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
209         }
210         assert submit == 0;
211         int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
212         if (ret < 0) {
213             throw new RuntimeException("ioUringEnter syscall returned " + ret);
214         }
215         return ret; // should be 0
216     }
217 
218     private int submit(int toSubmit, int minComplete, int flags) {
219         if (logger.isTraceEnabled()) {
220             logger.trace("submit(ring {}): {}", ringFd, toString());
221         }
222         PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
223         int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
224         head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
225         if (ret != toSubmit) {
226             if (ret < 0) {
227                 throw new RuntimeException("ioUringEnter syscall returned " + ret);
228             }
229             // some submission might fail if these are done inline and failed.
230             logger.trace("Not all submissions succeeded. Only {} of {} SQEs were submitted.", ret, toSubmit);
231         }
232         return ret;
233     }
234 
235     private void setTimeout(long timeoutNanoSeconds) {
236         long seconds, nanoSeconds;
237 
238         if (timeoutNanoSeconds == 0) {
239             seconds = 0;
240             nanoSeconds = 0;
241         } else {
242             seconds = (int) min(timeoutNanoSeconds / 1000000000L, Integer.MAX_VALUE);
243             nanoSeconds = (int) max(timeoutNanoSeconds - seconds * 1000000000L, 0);
244         }
245 
246         PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
247         PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
248     }
249 
250     public int count() {
251         return tail - head;
252     }
253 
254     public int remaining() {
255         return ringEntries - count();
256     }
257 
258     //delete memory
259     public void release() {
260         PlatformDependent.freeMemory(timeoutMemoryAddress);
261     }
262 }