View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import java.lang.invoke.MethodHandles;
19  import java.lang.invoke.VarHandle;
20  import java.nio.ByteBuffer;
21  import java.nio.ByteOrder;
22  import java.util.StringJoiner;
23  
24  /**
25   * Completion queue implementation for io_uring.
26   */
27  final class CompletionQueue {
28      private static final VarHandle INT_HANDLE =
29              MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
30  
31      //these offsets are used to access specific properties
32      //CQE (https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L162)
33      private static final int CQE_USER_DATA_FIELD = 0;
34      private static final int CQE_RES_FIELD = 8;
35      private static final int CQE_FLAGS_FIELD = 12;
36  
37      //these unsigned integer pointers(shared with the kernel) will be changed by the kernel and us
38      // using a VarHandle.
39      private final ByteBuffer khead;
40      private final ByteBuffer ktail;
41      private final ByteBuffer kflags;
42      private final ByteBuffer completionQueueArray;
43      private final ByteBuffer[] extraCqeData;
44  
45      final int ringSize;
46      final long ringAddress;
47      final int ringFd;
48      final int ringEntries;
49      final int ringCapacity;
50      private final int cqeLength;
51  
52      private final int ringMask;
53      private int ringHead;
54      private boolean closed;
55  
56      CompletionQueue(ByteBuffer kHead, ByteBuffer kTail, int ringMask, int ringEntries, ByteBuffer kflags,
57                      ByteBuffer completionQueueArray, int ringSize, long ringAddress,
58                      int ringFd, int ringCapacity, int cqeLength, boolean extraCqeDataNeeded) {
59          this.khead = kHead;
60          this.ktail = kTail;
61          this.completionQueueArray = completionQueueArray;
62          this.ringSize = ringSize;
63          this.ringAddress = ringAddress;
64          this.ringFd = ringFd;
65          this.ringCapacity = ringCapacity;
66          this.cqeLength = cqeLength;
67          this.ringEntries = ringEntries;
68          this.kflags = kflags;
69          this.ringMask = ringMask;
70          ringHead = (int) INT_HANDLE.getVolatile(kHead, 0);
71  
72          if (extraCqeDataNeeded) {
73              // Let's create the slices up front to reduce GC-pressure and also ensure that the user
74              // can not escape the memory range.
75              // We slice every Native.CQE_SIZE to support IORING_SETUP_CQE32 and IORING_SETUP_CQE_MIXED.
76              this.extraCqeData = new ByteBuffer[ringEntries];
77              for (int i = 0; i < ringEntries; i++) {
78                  int position = i * cqeLength;
79                  completionQueueArray.position(position).limit(position + Native.CQE_SIZE);
80                  extraCqeData[i] = completionQueueArray.slice();
81                  completionQueueArray.clear();
82              }
83          } else {
84              this.extraCqeData = null;
85          }
86      }
87  
88      void close() {
89          closed = true;
90      }
91  
92      int flags() {
93          if (closed) {
94              return 0;
95          }
96          // we only need memory_order_relaxed
97          return (int) INT_HANDLE.getOpaque(kflags, 0);
98      }
99  
100     /**
101      * Returns {@code true} if any completion event is ready to be processed by
102      * {@link #process(CompletionCallback)}, {@code false} otherwise.
103      */
104     boolean hasCompletions() {
105         return !closed && ringHead != (int) INT_HANDLE.getVolatile(ktail, 0);
106     }
107 
108     int count() {
109         if (closed) {
110             return 0;
111         }
112         return (int) INT_HANDLE.getVolatile(ktail, 0) - ringHead;
113     }
114 
115     /**
116      * Process the completion events in the {@link CompletionQueue} and return the number of processed
117      * events.
118      */
119     int process(CompletionCallback callback) {
120         if (closed) {
121             return 0;
122         }
123         int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
124         try {
125             int i = 0;
126             while (ringHead != tail) {
127                 int cqeIdx = cqeIdx(ringHead, ringMask);
128                 int cqePosition = cqeIdx * cqeLength;
129 
130                 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
131                 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
132                 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
133 
134                 ringHead++;
135                 final ByteBuffer extraCqeData;
136                 if ((flags & Native.IORING_CQE_F_32) != 0) {
137                     extraCqeData = extraCqeData(cqeIdx + 1);
138                     // We used mixed mode and this was a 32 byte CQE, let's increment the head once more.
139                     ringHead++;
140                 } else if (cqeLength == Native.CQE32_SIZE) {
141                     extraCqeData = extraCqeData(cqeIdx + 1);
142                 } else {
143                     extraCqeData = null;
144                 }
145                 // Check if we should just skip it.
146                 if ((flags & Native.IORING_CQE_F_SKIP) == 0) {
147                     i++;
148 
149                     callback.handle(res, flags, udata, extraCqeData);
150                 }
151 
152                 if (ringHead == tail) {
153                     // Let's fetch the tail one more time as it might have changed because a completion might have
154                     // triggered a submission (io_uring_enter). This can happen as we automatically submit once we
155                     // run out of space in the submission queue.
156                     tail = (int) INT_HANDLE.getVolatile(ktail, 0);
157                 }
158             }
159             return i;
160         } finally {
161             // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
162             INT_HANDLE.setRelease(khead, 0, ringHead);
163         }
164     }
165 
166     private ByteBuffer extraCqeData(int cqeIdx) {
167         if (extraCqeData == null) {
168             return null;
169         }
170         ByteBuffer buffer = extraCqeData[cqeIdx];
171         buffer.clear();
172         return buffer;
173     }
174 
175     @Override
176     public String toString() {
177         StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
178         if (closed) {
179             sb.add("closed");
180         } else {
181             int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
182             int head = ringHead;
183             while (head != tail) {
184                 int cqePosition = cqeIdx(head++, ringMask) * cqeLength;
185                 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
186                 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
187                 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
188                 if ((flags & Native.IORING_CQE_F_32) != 0) {
189                     // We used mixed mode and this was a 32 byte CQE, let's increment the head once more.
190                     head++;
191                 }
192                 sb.add("(res=" + res).add(", flags=" + flags).add(", udata=" + udata).add(")");
193             }
194         }
195         return sb.toString();
196     }
197 
198     private static int cqeIdx(int ringHead, int ringMask) {
199         return ringHead & ringMask;
200     }
201 }