View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.DuplicatedByteBuf;
20  import io.netty.buffer.SlicedByteBuf;
21  import io.netty.buffer.SwappedByteBuf;
22  import io.netty.buffer.WrappedByteBuf;
23  import io.netty.channel.unix.Buffer;
24  
25  import java.lang.invoke.MethodHandles;
26  import java.lang.invoke.VarHandle;
27  import java.nio.ByteBuffer;
28  import java.nio.ByteOrder;
29  import java.util.Arrays;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  final class IoUringBufferRing {
33      private static final VarHandle SHORT_HANDLE =
34              MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
35      private final ByteBuffer ioUringBufRing;
36      private final int tailFieldPosition;
37      private final short entries;
38      private final int batchSize;
39      private final int maxUnreleasedBuffers;
40      private final short mask;
41      private final short bufferGroupId;
42      private final int ringFd;
43      private final IoUringBufferRingByteBuf[] buffers;
44      private final IoUringBufferRingAllocator allocator;
45      private final IoUringBufferRingExhaustedEvent exhaustedEvent;
46      private final boolean incremental;
47      private final AtomicInteger unreleasedBuffers = new AtomicInteger();
48      private volatile boolean usable;
49      private boolean corrupted;
50      private boolean closed;
51      private int numBuffers;
52      private boolean expanded;
53  
54      IoUringBufferRing(int ringFd, ByteBuffer ioUringBufRing,
55                        short entries, int batchSize, int maxUnreleasedBuffers, short bufferGroupId, boolean incremental,
56                        IoUringBufferRingAllocator allocator) {
57          assert entries % 2 == 0;
58          this.ioUringBufRing = ioUringBufRing;
59          this.tailFieldPosition = Native.IO_URING_BUFFER_RING_TAIL;
60          this.entries = entries;
61          this.batchSize = batchSize;
62          this.maxUnreleasedBuffers = maxUnreleasedBuffers;
63          this.mask = (short) (entries - 1);
64          this.bufferGroupId = bufferGroupId;
65          this.ringFd = ringFd;
66          this.buffers = new IoUringBufferRingByteBuf[entries];
67          this.incremental = incremental;
68          this.allocator = allocator;
69          this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
70      }
71  
72      boolean isUsable() {
73          return !corrupted && usable;
74      }
75  
76      void initialize() {
77          fillBuffers();
78          usable = true;
79      }
80  
81      /**
82       * Try to expand by adding more buffers to the ring if there is any space left.
83       * This method might be called multiple times before we call {@link #fillBuffer()} again.
84       */
85      void expand() {
86          // TODO: We could also shrink the number of elements again if we find out we not use all of it frequently.
87          if (!expanded) {
88              // Only expand once before we reset expanded in fillBuffer() which is called once a buffer was completely
89              // used and moved out of the buffer ring.
90              fillBuffers();
91              expanded = true;
92          }
93      }
94  
95      private void fillBuffers() {
96          int num = Math.min(batchSize, entries - numBuffers);
97          for (short i = 0; i < num; i++) {
98              fillBuffer();
99          }
100     }
101 
102     /**
103      * @return the {@link IoUringBufferRingExhaustedEvent} that should be used to signal that there were no buffers
104      * left for this buffer ring.
105      */
106     IoUringBufferRingExhaustedEvent getExhaustedEvent() {
107         return exhaustedEvent;
108     }
109 
110     void fillBuffer() {
111         if (corrupted || closed) {
112             return;
113         }
114         short oldTail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
115         short ringIndex = (short) (oldTail & mask);
116         assert buffers[ringIndex] == null;
117         final ByteBuf byteBuf;
118         try {
119             byteBuf = allocator.allocate();
120         } catch (OutOfMemoryError e) {
121             // We did run out of memory, This buffer ring should be considered corrupted.
122             // TODO: In the future we could try to recover it later by trying to refill it after some time and so
123             //       bring it back to a non-corrupted state.
124             corrupted = true;
125             throw e;
126         }
127         byteBuf.writerIndex(byteBuf.capacity());
128         buffers[ringIndex] = new IoUringBufferRingByteBuf(byteBuf);
129 
130         //  see:
131         //  https://github.com/axboe/liburing/
132         //      blob/19134a8fffd406b22595a5813a3e319c19630ac9/src/include/liburing.h#L1561
133         int  position = Native.SIZEOF_IOURING_BUF * ringIndex;
134         ioUringBufRing.putLong(position + Native.IOURING_BUFFER_OFFSETOF_ADDR,
135                 IoUring.memoryAddress(byteBuf) + byteBuf.readerIndex());
136         ioUringBufRing.putInt(position + Native.IOURING_BUFFER_OFFSETOF_LEN, byteBuf.capacity());
137         ioUringBufRing.putShort(position + Native.IOURING_BUFFER_OFFSETOF_BID, ringIndex);
138 
139         // Now advanced the tail by the number of buffers that we just added.
140         SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (oldTail + 1));
141         numBuffers++;
142         // We added a buffer to the ring, let's reset the expanded variable so we can expand it if we receive
143         // ENOBUFS.
144         expanded = false;
145     }
146 
147     /**
148      * Return the amount of bytes that we attempted to read for the given id.
149      * This method must be called before {@link #useBuffer(short, int, boolean)}.
150      *
151      * @param bid   the id of the buffer.
152      * @return      the attempted bytes.
153      */
154     int attemptedBytesRead(short bid) {
155         return buffers[bid].readableBytes();
156     }
157 
158     /**
159      * Use the buffer for the given buffer id. The returned {@link ByteBuf} must be released once not used anymore.
160      *
161      * @param bid           the id of the buffer
162      * @param readableBytes the number of bytes that could be read. This value might be larger then what a single
163      *                      {@link ByteBuf} can hold. Because of this, the caller should call
164      *                      @link #useBuffer(short, int, boolean)} in a loop (obtaining the next bid to use by calling
165      *                      {@link #nextBid(short)}) until all buffers could be obtained.
166      * @return              the buffer.
167      */
168     ByteBuf useBuffer(short bid, int readableBytes, boolean more) {
169         assert readableBytes > 0;
170         IoUringBufferRingByteBuf byteBuf = buffers[bid];
171 
172         allocator.lastBytesRead(byteBuf.readableBytes(), readableBytes);
173         if (incremental && more && byteBuf.readableBytes() > readableBytes) {
174             // The buffer will be used later again, just slice out what we did read so far.
175             return byteBuf.readRetainedSlice(readableBytes);
176         }
177 
178         // The buffer is considered to be used, null out the slot.
179         buffers[bid] = null;
180         numBuffers--;
181         byteBuf.markUsed();
182         return byteBuf.writerIndex(byteBuf.readerIndex() +
183                 Math.min(readableBytes, byteBuf.readableBytes()));
184     }
185 
186     short nextBid(short bid) {
187         return (short) ((bid + 1) & mask);
188     }
189 
190     /**
191      * The group id that is assigned to this buffer ring.
192      *
193      * @return group id.
194      */
195     short bufferGroupId() {
196         return bufferGroupId;
197     }
198 
199     /**
200      * Close this {@link IoUringBufferRing}, using it after this method is called will lead to undefined behaviour.
201      */
202     void close() {
203         if (closed) {
204             return;
205         }
206         closed = true;
207         Native.ioUringUnRegisterBufRing(ringFd, Buffer.memoryAddress(ioUringBufRing), entries, bufferGroupId);
208         for (ByteBuf byteBuf : buffers) {
209             if (byteBuf != null) {
210                 byteBuf.release();
211             }
212         }
213         Arrays.fill(buffers, null);
214     }
215 
216     // Package-private for testing
217     final class IoUringBufferRingByteBuf extends WrappedByteBuf {
218         IoUringBufferRingByteBuf(ByteBuf buf) {
219             super(buf);
220         }
221 
222         void markUsed() {
223             if (unreleasedBuffers.incrementAndGet() == maxUnreleasedBuffers) {
224                 usable = false;
225             }
226         }
227 
228         @SuppressWarnings("deprecation")
229         @Override
230         public ByteBuf order(ByteOrder endianness) {
231             if (endianness == order()) {
232                 return this;
233             }
234             return new SwappedByteBuf(this);
235         }
236 
237         @Override
238         public ByteBuf slice() {
239             return slice(readerIndex(), readableBytes());
240         }
241 
242         @Override
243         public ByteBuf retainedSlice() {
244             return slice().retain();
245         }
246 
247         @SuppressWarnings("deprecation")
248         @Override
249         public ByteBuf slice(int index, int length) {
250             return new SlicedByteBuf(this, index, length);
251         }
252 
253         @Override
254         public ByteBuf retainedSlice(int index, int length) {
255             return slice(index, length).retain();
256         }
257 
258         @Override
259         public ByteBuf readSlice(int length) {
260             ByteBuf slice = slice(readerIndex(), length);
261             skipBytes(length);
262             return slice;
263         }
264 
265         @Override
266         public ByteBuf readRetainedSlice(int length) {
267             ByteBuf slice = retainedSlice(readerIndex(), length);
268             try {
269                 skipBytes(length);
270             } catch (Throwable cause) {
271                 slice.release();
272                 throw cause;
273             }
274             return slice;
275         }
276 
277         @SuppressWarnings("deprecation")
278         @Override
279         public ByteBuf duplicate() {
280             return new DuplicatedByteBuf(this);
281         }
282 
283         @Override
284         public ByteBuf retainedDuplicate() {
285             return duplicate().retain();
286         }
287 
288         @Override
289         public boolean release() {
290             if (super.release()) {
291                 released();
292                 return true;
293             }
294             return false;
295         }
296 
297         @Override
298         public boolean release(int decrement) {
299             if (super.release(decrement)) {
300                 released();
301                 return true;
302             }
303             return false;
304         }
305 
306         private void released() {
307             if (unreleasedBuffers.decrementAndGet() == maxUnreleasedBuffers / 2) {
308                 usable = true;
309             }
310         }
311     }
312 }