View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import java.lang.invoke.MethodHandles;
19  import java.lang.invoke.VarHandle;
20  import java.nio.ByteBuffer;
21  import java.nio.ByteOrder;
22  import java.util.StringJoiner;
23  
24  /**
25   * Completion queue implementation for io_uring.
26   */
27  final class CompletionQueue {
28      private static final VarHandle INT_HANDLE =
29              MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());
30  
31      //these offsets are used to access specific properties
32      //CQE (https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L162)
33      private static final int CQE_USER_DATA_FIELD = 0;
34      private static final int CQE_RES_FIELD = 8;
35      private static final int CQE_FLAGS_FIELD = 12;
36  
37      //these unsigned integer pointers(shared with the kernel) will be changed by the kernel and us
38      // using a VarHandle.
39      private final ByteBuffer khead;
40      private final ByteBuffer ktail;
41      private final ByteBuffer kflags;
42      private final ByteBuffer completionQueueArray;
43      private final ByteBuffer[] extraCqeData;
44  
45      final int ringSize;
46      final long ringAddress;
47      final int ringFd;
48      final int ringEntries;
49      final int ringCapacity;
50      private final int cqeLength;
51  
52      private final int ringMask;
53      private int ringHead;
54      private boolean closed;
55  
56      CompletionQueue(ByteBuffer kHead, ByteBuffer kTail, int ringMask, int ringEntries, ByteBuffer kflags,
57                      ByteBuffer completionQueueArray, int ringSize, long ringAddress,
58                      int ringFd, int ringCapacity, int cqeLength) {
59          this.khead = kHead;
60          this.ktail = kTail;
61          this.completionQueueArray = completionQueueArray;
62          this.ringSize = ringSize;
63          this.ringAddress = ringAddress;
64          this.ringFd = ringFd;
65          this.ringCapacity = ringCapacity;
66          this.cqeLength = cqeLength;
67          this.ringEntries = ringEntries;
68          this.kflags = kflags;
69          this.ringMask = ringMask;
70          ringHead = (int) INT_HANDLE.getVolatile(kHead, 0);
71  
72          if (cqeLength == Native.CQE32_SIZE) {
73              // Let's create the slices up front to reduce GC-pressure and also ensure that the user
74              // can not escape the memory range.
75              this.extraCqeData = new ByteBuffer[ringEntries];
76              for (int i = 0; i < ringEntries; i++) {
77                  int position = i * cqeLength + Native.CQE_SIZE;
78                  completionQueueArray.position(position).limit(position + Native.CQE_SIZE);
79                  extraCqeData[i] = completionQueueArray.slice();
80                  completionQueueArray.clear();
81              }
82              completionQueueArray.clear();
83          } else {
84              assert cqeLength == Native.CQE_SIZE;
85              this.extraCqeData = null;
86          }
87      }
88  
89      void close() {
90          closed = true;
91      }
92  
93      int flags() {
94          if (closed) {
95              return 0;
96          }
97          // we only need memory_order_relaxed
98          return (int) INT_HANDLE.getOpaque(kflags, 0);
99      }
100 
101     /**
102      * Returns {@code true} if any completion event is ready to be processed by
103      * {@link #process(CompletionCallback)}, {@code false} otherwise.
104      */
105     boolean hasCompletions() {
106         return !closed && ringHead != (int) INT_HANDLE.getVolatile(ktail, 0);
107     }
108 
109     int count() {
110         if (closed) {
111             return 0;
112         }
113         return (int) INT_HANDLE.getVolatile(ktail, 0) - ringHead;
114     }
115 
116     /**
117      * Process the completion events in the {@link CompletionQueue} and return the number of processed
118      * events.
119      */
120     int process(CompletionCallback callback) {
121         if (closed) {
122             return 0;
123         }
124         int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
125         try {
126             int i = 0;
127             while (ringHead != tail) {
128                 int cqeIdx = cqeIdx(ringHead, ringMask);
129                 int cqePosition = cqeIdx *  cqeLength;
130 
131                 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
132                 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
133                 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
134 
135                 ringHead++;
136 
137                 i++;
138                 if (!callback.handle(res, flags, udata, extraCqeData(cqeIdx))) {
139                     // Stop processing. as the callback can not handle any more completions for now,
140                     break;
141                 }
142                 if (ringHead == tail) {
143                     // Let's fetch the tail one more time as it might have changed because a completion might have
144                     // triggered a submission (io_uring_enter). This can happen as we automatically submit once we
145                     // run out of space in the submission queue.
146                     tail = (int) INT_HANDLE.getVolatile(ktail, 0);
147                 }
148             }
149             return i;
150         } finally {
151             // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
152             INT_HANDLE.setRelease(khead, 0, ringHead);
153         }
154     }
155 
156     private ByteBuffer extraCqeData(int cqeIdx) {
157         if (extraCqeData == null) {
158             return null;
159         }
160         ByteBuffer buffer = extraCqeData[cqeIdx];
161         buffer.clear();
162         return buffer;
163     }
164 
165     @Override
166     public String toString() {
167         StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
168         if (closed) {
169             sb.add("closed");
170         } else {
171             int tail = (int) INT_HANDLE.getVolatile(ktail, 0);
172             int head = ringHead;
173             while (head != tail) {
174                 int cqePosition = cqeIdx(head++, ringMask) * cqeLength;
175                 long udata = completionQueueArray.getLong(cqePosition + CQE_USER_DATA_FIELD);
176                 int res = completionQueueArray.getInt(cqePosition + CQE_RES_FIELD);
177                 int flags = completionQueueArray.getInt(cqePosition + CQE_FLAGS_FIELD);
178 
179                 sb.add("(res=" + res).add(", flags=" + flags).add(", udata=" + udata).add(")");
180             }
181         }
182         return sb.toString();
183     }
184 
185     private static int cqeIdx(int ringHead, int ringMask) {
186         return ringHead & ringMask;
187     }
188 }