View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.unix.Buffer;
20  
21  import java.lang.invoke.MethodHandles;
22  import java.lang.invoke.VarHandle;
23  import java.nio.ByteBuffer;
24  import java.nio.ByteOrder;
25  import java.util.Arrays;
26  import java.util.function.Consumer;
27  
28  final class IoUringBufferRing {
29      private static final VarHandle SHORT_HANDLE =
30              MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
31      private final ByteBuffer ioUringBufRing;
32      private final int tailFieldPosition;
33      private final short entries;
34      private final short mask;
35      private final short bufferGroupId;
36      private final int ringFd;
37      private final ByteBuf[] buffers;
38      private final IoUringBufferRingAllocator allocator;
39      private final boolean batchAllocation;
40      private final IoUringBufferRingExhaustedEvent exhaustedEvent;
41      private final RingConsumer ringConsumer;
42      private final boolean incremental;
43      private final int batchSize;
44      private boolean corrupted;
45      private boolean closed;
46      private int usableBuffers;
47      private int allocatedBuffers;
48      private boolean needExpand;
49      private short lastGeneratedBid;
50  
51      IoUringBufferRing(int ringFd, ByteBuffer ioUringBufRing,
52                        short entries, int batchSize, short bufferGroupId, boolean incremental,
53                        IoUringBufferRingAllocator allocator, boolean batchAllocation) {
54          assert entries % 2 == 0;
55          assert batchSize % 2 == 0;
56          this.batchSize = batchSize;
57          this.ioUringBufRing = ioUringBufRing;
58          this.tailFieldPosition = Native.IO_URING_BUFFER_RING_TAIL;
59          this.entries = entries;
60          this.mask = (short) (entries - 1);
61          this.bufferGroupId = bufferGroupId;
62          this.ringFd = ringFd;
63          this.buffers = new ByteBuf[entries];
64          this.incremental = incremental;
65          this.allocator = allocator;
66          this.batchAllocation = batchAllocation;
67          this.ringConsumer  = new RingConsumer();
68          this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
69      }
70  
71      boolean isUsable() {
72          return !closed && !corrupted;
73      }
74  
75      void initialize() {
76          // We already validated that batchSize is <= ring length.
77          fill((short) 0, batchSize);
78          allocatedBuffers = batchSize;
79      }
80  
81      private final class RingConsumer implements Consumer<ByteBuf> {
82          private int expectedBuffers;
83          private short num;
84          private short bid;
85          private short oldTail;
86  
87          short fill(short startBid, int numBuffers) {
88              // Fetch the tail once before allocate the batch.
89              oldTail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
90  
91              // At the moment we always start with bid 0 and so num and bid is the same. As this is more of an
92              // implementation detail it is better to still keep both separated.
93              this.num = 0;
94              this.bid = startBid;
95              this.expectedBuffers = numBuffers;
96              try {
97                  if (batchAllocation) {
98                      allocator.allocateBatch(this, numBuffers);
99                  } else {
100                     for (int i = 0; i < numBuffers; i++) {
101                         add(oldTail, bid++, num++, allocator.allocate());
102                     }
103                 }
104             } catch (Throwable t) {
105                 corrupted = true;
106                 for (int i = 0; i < buffers.length; i++) {
107                     ByteBuf buffer = buffers[i];
108                     if (buffer != null) {
109                         buffer.release();
110                         buffers[i] = null;
111                     }
112                 }
113                 throw t;
114             }
115             // Now advanced the tail by the number of buffers that we just added.
116             SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (oldTail + num));
117 
118             return (short) (bid - 1);
119         }
120 
121         void fill(short bid) {
122             short tail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
123             add(tail, bid, 0, allocator.allocate());
124             // Now advanced the tail by one
125             SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (tail + 1));
126         }
127 
128         @Override
129         public void accept(ByteBuf byteBuf) {
130             if (corrupted || closed) {
131                 byteBuf.release();
132                 throw new IllegalStateException("Already closed");
133             }
134             if (expectedBuffers == num) {
135                 byteBuf.release();
136                 throw new IllegalStateException("Produced too many buffers");
137             }
138             add(oldTail, bid++, num++, byteBuf);
139         }
140 
141         private void add(int tail, short bid, int offset, ByteBuf byteBuf) {
142             short ringIndex = (short) ((tail + offset) & mask);
143             assert buffers[bid] == null;
144 
145             long memoryAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
146             int writable = byteBuf.writableBytes();
147 
148             //  see:
149             //  https://github.com/axboe/liburing/
150             //      blob/19134a8fffd406b22595a5813a3e319c19630ac9/src/include/liburing.h#L1561
151             int position = Native.SIZEOF_IOURING_BUF * ringIndex;
152             ioUringBufRing.putLong(position + Native.IOURING_BUFFER_OFFSETOF_ADDR, memoryAddress);
153             ioUringBufRing.putInt(position + Native.IOURING_BUFFER_OFFSETOF_LEN, writable);
154             ioUringBufRing.putShort(position + Native.IOURING_BUFFER_OFFSETOF_BID, bid);
155 
156             buffers[bid] = byteBuf;
157         }
158     }
159 
160     /**
161      * Try to expand by adding more buffers to the ring if there is any space left, this will be done lazy.
162      *
163      * @return {@code true} if we can expand the number of buffers in the ring, {@code false} otherwise.
164      */
165     boolean expand() {
166         needExpand = true;
167         return allocatedBuffers < buffers.length;
168     }
169 
170     private void fill(short startBid, int buffers) {
171         if (corrupted || closed) {
172             return;
173         }
174         assert buffers % 2 == 0;
175         lastGeneratedBid = ringConsumer.fill(startBid, buffers);
176         usableBuffers += buffers;
177     }
178 
179     private void fill(short bid) {
180         if (corrupted || closed) {
181             return;
182         }
183         ringConsumer.fill(bid);
184         usableBuffers++;
185     }
186 
187     /**
188      * @return the {@link IoUringBufferRingExhaustedEvent} that should be used to signal that there were no buffers
189      * left for this buffer ring.
190      */
191     IoUringBufferRingExhaustedEvent getExhaustedEvent() {
192         return exhaustedEvent;
193     }
194 
195     /**
196      * Return the amount of bytes that we attempted to read for the given id.
197      * This method must be called before {@link #useBuffer(short, int, boolean)}.
198      *
199      * @param bid   the id of the buffer.
200      * @return      the attempted bytes.
201      */
202     int attemptedBytesRead(short bid) {
203         return buffers[bid].writableBytes();
204     }
205 
206     private int calculateNextBufferBatch() {
207         return Math.min(batchSize, entries - allocatedBuffers);
208     }
209 
210     /**
211      * Use the buffer for the given buffer id. The returned {@link ByteBuf} must be released once not used anymore.
212      *
213      * @param bid           the id of the buffer
214      * @param read          the number of bytes that could be read. This value might be larger then what a single
215      *                      {@link ByteBuf} can hold. Because of this, the caller should call
216      *                      @link #useBuffer(short, int, boolean)} in a loop (obtaining the next bid to use by calling
217      *                      {@link #nextBid(short)}) until all buffers could be obtained.
218      * @return              the buffer.
219      */
220     ByteBuf useBuffer(short bid, int read, boolean more) {
221         assert read > 0;
222         ByteBuf byteBuf = buffers[bid];
223 
224         allocator.lastBytesRead(byteBuf.writableBytes(), read);
225         // We always slice so the user will not mess up things later.
226         ByteBuf buffer = byteBuf.retainedSlice(byteBuf.writerIndex(), read);
227         byteBuf.writerIndex(byteBuf.writerIndex() + read);
228 
229         if (incremental && more && byteBuf.isWritable()) {
230             // The buffer will be used later again, just slice out what we did read so far.
231             return buffer;
232         }
233 
234         // The buffer is considered to be used, null out the slot.
235         buffers[bid] = null;
236         byteBuf.release();
237         if (--usableBuffers == 0) {
238             int numBuffers = allocatedBuffers;
239             if (needExpand) {
240                 // We did get a signal that our buffer ring did not have enough buffers, let's see if we
241                 // can grow it.
242                 needExpand = false;
243                 numBuffers += calculateNextBufferBatch();
244             }
245             fill((short) 0, numBuffers);
246             allocatedBuffers = numBuffers;
247             assert allocatedBuffers % 2 == 0;
248         } else if (!batchAllocation) {
249             // If we don'T do bulk allocations to refill the buffer ring we need to fill in the just used bid again
250             // if we didn't get a signal that we need expansion.
251             fill(bid);
252 
253             if (needExpand && lastGeneratedBid == bid) {
254                 // We did get a signal that our buffer ring did not have enough buffers and we just did add the last
255                 // generated bid at the tail of the ring. Now its safe to grow the buffer ring and still guarantee
256                 // sequential ordering which is needed for our RECVSEND_BUNDLE implementation.
257                 needExpand = false;
258                 int numBuffers = calculateNextBufferBatch();
259                 fill((short) (bid + 1), numBuffers);
260                 allocatedBuffers += numBuffers;
261                 assert allocatedBuffers % 2 == 0;
262             }
263         }
264         return buffer;
265     }
266 
267     short nextBid(short bid) {
268         return (short) ((bid + 1) & allocatedBuffers - 1);
269     }
270 
271     /**
272      * The group id that is assigned to this buffer ring.
273      *
274      * @return group id.
275      */
276     short bufferGroupId() {
277         return bufferGroupId;
278     }
279 
280     /**
281      * Close this {@link IoUringBufferRing}, using it after this method is called will lead to undefined behaviour.
282      */
283     void close() {
284         if (closed) {
285             return;
286         }
287         closed = true;
288         Native.ioUringUnRegisterBufRing(ringFd, Buffer.memoryAddress(ioUringBufRing), entries, bufferGroupId);
289         for (ByteBuf byteBuf : buffers) {
290             if (byteBuf != null) {
291                 byteBuf.release();
292             }
293         }
294         Arrays.fill(buffers, null);
295     }
296 }