View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.DuplicatedByteBuf;
20  import io.netty.buffer.SlicedByteBuf;
21  import io.netty.buffer.SwappedByteBuf;
22  import io.netty.buffer.WrappedByteBuf;
23  import io.netty.channel.unix.Buffer;
24  
25  import java.lang.invoke.MethodHandles;
26  import java.lang.invoke.VarHandle;
27  import java.nio.ByteBuffer;
28  import java.nio.ByteOrder;
29  import java.util.Arrays;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  final class IoUringBufferRing {
33      private static final VarHandle SHORT_HANDLE =
34              MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
35      private final ByteBuffer ioUringBufRing;
36      private final int tailFieldPosition;
37      private final short entries;
38      private final int batchSize;
39      private final int maxUnreleasedBuffers;
40      private final short mask;
41      private final short bufferGroupId;
42      private final int ringFd;
43      private final IoUringBufferRingByteBuf[] buffers;
44      private final IoUringBufferRingAllocator allocator;
45      private final IoUringBufferRingExhaustedEvent exhaustedEvent;
46      private final boolean incremental;
47      private final AtomicInteger unreleasedBuffers = new AtomicInteger();
48      private volatile boolean usable;
49      private boolean corrupted;
50      private boolean closed;
51      private int numBuffers;
52      private boolean expanded;
53      private boolean needsLazyExpansion;
54      private short lastGeneratedBid;
55      private short lastAddedBid;
56  
57      IoUringBufferRing(int ringFd, ByteBuffer ioUringBufRing,
58                        short entries, int batchSize, int maxUnreleasedBuffers, short bufferGroupId, boolean incremental,
59                        IoUringBufferRingAllocator allocator) {
60          assert entries % 2 == 0;
61          assert batchSize % 2 == 0;
62          this.ioUringBufRing = ioUringBufRing;
63          this.tailFieldPosition = Native.IO_URING_BUFFER_RING_TAIL;
64          this.entries = entries;
65          this.batchSize = batchSize;
66          this.maxUnreleasedBuffers = maxUnreleasedBuffers;
67          this.mask = (short) (entries - 1);
68          this.bufferGroupId = bufferGroupId;
69          this.ringFd = ringFd;
70          this.buffers = new IoUringBufferRingByteBuf[entries];
71          this.incremental = incremental;
72          this.allocator = allocator;
73          this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
74      }
75  
76      boolean isUsable() {
77          return !corrupted && usable;
78      }
79  
80      void initialize() {
81          for (short i = 0; i < batchSize; i++) {
82              addBuffer(i);
83              numBuffers++;
84              lastGeneratedBid = i;
85          }
86          assert numBuffers % 2 == 0;
87          usable = true;
88      }
89  
90      /**
91       * Try to expand by adding more buffers to the ring if there is any space left.
92       * This method might be called multiple times before we call {@link #useBuffer(short, int, boolean)} again.
93       */
94      void expand() {
95          if (!expanded) {
96              // Only expand once before we reset expanded in addBuffer() which is called once a buffer was completely
97              // used and moved out of the buffer ring.
98              tryExpand();
99          }
100     }
101 
102     private void tryExpand() {
103         // We only expand if the last added BID is the last generated BID. The reason for this is as we want to
104         // ensure we have a sequential ordering of BIDs as this is required for our nextBid(...) to work correctly when
105         // RECVSEND_BUNDLE is used.
106         if (lastAddedBid == lastGeneratedBid) {
107             needsLazyExpansion = false;
108             // TODO: We could also shrink the number of elements again if we find out we not use all of it frequently.
109             int num = Math.min(batchSize, entries - numBuffers);
110             assert num % 2 == 0;
111             for (short i = 0; i < num; i++) {
112                 addBuffer(++lastGeneratedBid);
113                 numBuffers++;
114             }
115         } else {
116             needsLazyExpansion = true;
117         }
118     }
119 
120     /**
121      * @return the {@link IoUringBufferRingExhaustedEvent} that should be used to signal that there were no buffers
122      * left for this buffer ring.
123      */
124     IoUringBufferRingExhaustedEvent getExhaustedEvent() {
125         return exhaustedEvent;
126     }
127 
128     private void addBuffer(short bid) {
129         if (corrupted || closed) {
130             return;
131         }
132         short oldTail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
133         short ringIndex = (short) (oldTail & mask);
134         assert buffers[bid] == null;
135         final ByteBuf byteBuf;
136         try {
137             byteBuf = allocator.allocate();
138         } catch (OutOfMemoryError e) {
139             // We did run out of memory, This buffer ring should be considered corrupted.
140             // TODO: In the future we could try to recover it later by trying to refill it after some time and so
141             //       bring it back to a non-corrupted state.
142             corrupted = true;
143             throw e;
144         }
145 
146         IoUringBufferRingByteBuf ioUringBuf = new IoUringBufferRingByteBuf(byteBuf.writerIndex(byteBuf.capacity()));
147 
148         //  see:
149         //  https://github.com/axboe/liburing/
150         //      blob/19134a8fffd406b22595a5813a3e319c19630ac9/src/include/liburing.h#L1561
151         int position = Native.SIZEOF_IOURING_BUF * ringIndex;
152         ioUringBufRing.putLong(position + Native.IOURING_BUFFER_OFFSETOF_ADDR,
153                 IoUring.memoryAddress(ioUringBuf) + ioUringBuf.readerIndex());
154         ioUringBufRing.putInt(position + Native.IOURING_BUFFER_OFFSETOF_LEN, ioUringBuf.readableBytes());
155         ioUringBufRing.putShort(position + Native.IOURING_BUFFER_OFFSETOF_BID, bid);
156 
157         buffers[bid] = ioUringBuf;
158         lastAddedBid = bid;
159         // Now advanced the tail by the number of buffers that we just added.
160         SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (oldTail + 1));
161         // We added a buffer to the ring, let's reset the expanded variable so we can expand it if we receive
162         // ENOBUFS.
163         expanded = false;
164         if (needsLazyExpansion) {
165             tryExpand();
166         }
167     }
168 
169     /**
170      * Return the amount of bytes that we attempted to read for the given id.
171      * This method must be called before {@link #useBuffer(short, int, boolean)}.
172      *
173      * @param bid   the id of the buffer.
174      * @return      the attempted bytes.
175      */
176     int attemptedBytesRead(short bid) {
177         return buffers[bid].readableBytes();
178     }
179 
180     /**
181      * Use the buffer for the given buffer id. The returned {@link ByteBuf} must be released once not used anymore.
182      *
183      * @param bid           the id of the buffer
184      * @param readableBytes the number of bytes that could be read. This value might be larger then what a single
185      *                      {@link ByteBuf} can hold. Because of this, the caller should call
186      *                      @link #useBuffer(short, int, boolean)} in a loop (obtaining the next bid to use by calling
187      *                      {@link #nextBid(short)}) until all buffers could be obtained.
188      * @return              the buffer.
189      */
190     ByteBuf useBuffer(short bid, int readableBytes, boolean more) {
191         assert readableBytes > 0;
192         IoUringBufferRingByteBuf byteBuf = buffers[bid];
193 
194         allocator.lastBytesRead(byteBuf.readableBytes(), readableBytes);
195         if (incremental && more && byteBuf.readableBytes() > readableBytes) {
196             // The buffer will be used later again, just slice out what we did read so far.
197             return byteBuf.readRetainedSlice(readableBytes);
198         }
199 
200         // The buffer is considered to be used, null out the slot.
201         buffers[bid] = null;
202         addBuffer(bid);
203         byteBuf.markUsed();
204         return byteBuf.writerIndex(byteBuf.readerIndex() +
205                 Math.min(readableBytes, byteBuf.readableBytes()));
206     }
207 
208     short nextBid(short bid) {
209         return (short) ((bid + 1) & numBuffers - 1);
210     }
211 
212     /**
213      * The group id that is assigned to this buffer ring.
214      *
215      * @return group id.
216      */
217     short bufferGroupId() {
218         return bufferGroupId;
219     }
220 
221     /**
222      * Close this {@link IoUringBufferRing}, using it after this method is called will lead to undefined behaviour.
223      */
224     void close() {
225         if (closed) {
226             return;
227         }
228         closed = true;
229         Native.ioUringUnRegisterBufRing(ringFd, Buffer.memoryAddress(ioUringBufRing), entries, bufferGroupId);
230         for (ByteBuf byteBuf : buffers) {
231             if (byteBuf != null) {
232                 byteBuf.release();
233             }
234         }
235         Arrays.fill(buffers, null);
236     }
237 
238     // Package-private for testing
239     final class IoUringBufferRingByteBuf extends WrappedByteBuf {
240         IoUringBufferRingByteBuf(ByteBuf buf) {
241             super(buf);
242         }
243 
244         void markUsed() {
245             if (unreleasedBuffers.incrementAndGet() == maxUnreleasedBuffers) {
246                 usable = false;
247             }
248         }
249 
250         @SuppressWarnings("deprecation")
251         @Override
252         public ByteBuf order(ByteOrder endianness) {
253             if (endianness == order()) {
254                 return this;
255             }
256             return new SwappedByteBuf(this);
257         }
258 
259         @Override
260         public ByteBuf slice() {
261             return slice(readerIndex(), readableBytes());
262         }
263 
264         @Override
265         public ByteBuf retainedSlice() {
266             return slice().retain();
267         }
268 
269         @SuppressWarnings("deprecation")
270         @Override
271         public ByteBuf slice(int index, int length) {
272             return new SlicedByteBuf(this, index, length);
273         }
274 
275         @Override
276         public ByteBuf retainedSlice(int index, int length) {
277             return slice(index, length).retain();
278         }
279 
280         @Override
281         public ByteBuf readSlice(int length) {
282             ByteBuf slice = slice(readerIndex(), length);
283             skipBytes(length);
284             return slice;
285         }
286 
287         @Override
288         public ByteBuf readRetainedSlice(int length) {
289             ByteBuf slice = retainedSlice(readerIndex(), length);
290             try {
291                 skipBytes(length);
292             } catch (Throwable cause) {
293                 slice.release();
294                 throw cause;
295             }
296             return slice;
297         }
298 
299         @SuppressWarnings("deprecation")
300         @Override
301         public ByteBuf duplicate() {
302             return new DuplicatedByteBuf(this);
303         }
304 
305         @Override
306         public ByteBuf retainedDuplicate() {
307             return duplicate().retain();
308         }
309 
310         @Override
311         public boolean release() {
312             if (super.release()) {
313                 released();
314                 return true;
315             }
316             return false;
317         }
318 
319         @Override
320         public boolean release(int decrement) {
321             if (super.release(decrement)) {
322                 released();
323                 return true;
324             }
325             return false;
326         }
327 
328         private void released() {
329             if (unreleasedBuffers.decrementAndGet() == maxUnreleasedBuffers / 2) {
330                 usable = true;
331             }
332         }
333     }
334 }