/*
 * Copyright 2025 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.uring;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.DuplicatedByteBuf;
20  import io.netty.buffer.SlicedByteBuf;
21  import io.netty.buffer.SwappedByteBuf;
22  import io.netty.buffer.WrappedByteBuf;
23  import io.netty.util.internal.PlatformDependent;
24  
25  import java.nio.ByteOrder;
26  import java.util.Arrays;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  final class IoUringBufferRing {
30      private final long ioUringBufRingAddr;
31      private final long tailFieldAddress;
32      private final short entries;
33      private final int maxUnreleasedBuffers;
34      private final short mask;
35      private final short bufferGroupId;
36      private final int ringFd;
37      private final IoUringBufferRingByteBuf[] buffers;
38      private final IoUringBufferRingAllocator allocator;
39      private final IoUringBufferRingExhaustedEvent exhaustedEvent;
40      private final boolean incremental;
41      private final AtomicInteger unreleasedBuffers = new AtomicInteger();
42      private volatile boolean usable;
43      private boolean corrupted;
44  
45      IoUringBufferRing(int ringFd, long ioUringBufRingAddr,
46                        short entries, int maxUnreleasedBuffers, short bufferGroupId, boolean incremental,
47                        IoUringBufferRingAllocator allocator) {
48          assert entries % 2 == 0;
49          this.ioUringBufRingAddr = ioUringBufRingAddr;
50          this.tailFieldAddress = ioUringBufRingAddr + Native.IO_URING_BUFFER_RING_TAIL;
51          this.entries = entries;
52          this.maxUnreleasedBuffers = maxUnreleasedBuffers;
53          this.mask = (short) (entries - 1);
54          this.bufferGroupId = bufferGroupId;
55          this.ringFd = ringFd;
56          this.buffers = new IoUringBufferRingByteBuf[entries];
57          this.incremental = incremental;
58          this.allocator = allocator;
59          this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
60      }
61  
62      boolean isUsable() {
63          return !corrupted && usable;
64      }
65  
66      void fill() {
67          for (short i = 0; i < entries; i++) {
68              fillBuffer(i);
69          }
70          usable = true;
71      }
72  
73      /**
74       * @return the {@link IoUringBufferRingExhaustedEvent} that should be used to signal that there were no buffers
75       * left for this buffer ring.
76       */
77      IoUringBufferRingExhaustedEvent getExhaustedEvent() {
78          return exhaustedEvent;
79      }
80  
81      void fillBuffer(short bid) {
82          if (corrupted) {
83              return;
84          }
85          short oldTail = PlatformDependent.getShort(tailFieldAddress);
86          int ringIndex = oldTail & mask;
87          assert ringIndex == bid;
88          assert buffers[bid] == null;
89          final ByteBuf byteBuf;
90          try {
91              byteBuf = allocator.allocate();
92          } catch (OutOfMemoryError e) {
93              // We did run out of memory, This buffer ring should be considered corrupted.
94              // TODO: In the future we could try to recover it later by trying to refill it after some time and so
95              //       bring it back to a non-corrupted state.
96              corrupted = true;
97              throw e;
98          }
99          byteBuf.writerIndex(byteBuf.capacity());
100         buffers[bid] = new IoUringBufferRingByteBuf(byteBuf);
101 
102         //  see:
103         //  https://github.com/axboe/liburing/
104         //      blob/19134a8fffd406b22595a5813a3e319c19630ac9/src/include/liburing.h#L1561
105         long ioUringBufAddress = ioUringBufRingAddr + (long) Native.SIZEOF_IOURING_BUF * ringIndex;
106         PlatformDependent.putLong(ioUringBufAddress + Native.IOURING_BUFFER_OFFSETOF_ADDR,
107                 byteBuf.memoryAddress() + byteBuf.readerIndex());
108         PlatformDependent.putInt(ioUringBufAddress + Native.IOURING_BUFFER_OFFSETOF_LEN, byteBuf.capacity());
109         PlatformDependent.putShort(ioUringBufAddress + Native.IOURING_BUFFER_OFFSETOF_BID, bid);
110         // Now advanced the tail by the number of buffers that we just added.
111         PlatformDependent.putShortOrdered(tailFieldAddress, (short) (oldTail + 1));
112     }
113 
114     /**
115      * Return the amount of bytes that we attempted to read for the given id.
116      * This method must be called before {@link #useBuffer(short, int, boolean)}.
117      *
118      * @param bid   the id of the buffer.
119      * @return      the attempted bytes.
120      */
121     int attemptedBytesRead(short bid) {
122         return buffers[bid].readableBytes();
123     }
124 
125     /**
126      * Use the buffer for the given buffer id. The returned {@link ByteBuf} must be released once not used anymore.
127      *
128      * @param bid           the id of the buffer
129      * @param readableBytes the number of bytes that could be read. This value might be larger then what a single
130      *                      {@link ByteBuf} can hold. Because of this, the caller should call
131      *                      @link #useBuffer(short, int, boolean)} in a loop (obtaining the next bid to use by calling
132      *                      {@link #nextBid(short)}) until all buffers could be obtained.
133      * @return              the buffer.
134      */
135     ByteBuf useBuffer(short bid, int readableBytes, boolean more) {
136         assert readableBytes > 0;
137         IoUringBufferRingByteBuf byteBuf = buffers[bid];
138 
139         allocator.lastBytesRead(byteBuf.readableBytes(), readableBytes);
140         if (incremental && more && byteBuf.readableBytes() > readableBytes) {
141             // The buffer will be used later again, just slice out what we did read so far.
142             return byteBuf.readRetainedSlice(readableBytes);
143         }
144 
145         // The buffer is considered to be used, null out the slot.
146         buffers[bid] = null;
147         byteBuf.markUsed();
148         return byteBuf.writerIndex(byteBuf.readerIndex() +
149                 Math.min(readableBytes, byteBuf.readableBytes()));
150     }
151 
152     short nextBid(short bid) {
153         return (short) ((bid + 1) & mask);
154     }
155 
156     /**
157      * The group id that is assigned to this buffer ring.
158      *
159      * @return group id.
160      */
161     short bufferGroupId() {
162         return bufferGroupId;
163     }
164 
165     /**
166      * Close this {@link IoUringBufferRing}, using it after this method is called will lead to undefined behaviour.
167      */
168     void close() {
169         Native.ioUringUnRegisterBufRing(ringFd, ioUringBufRingAddr, entries, bufferGroupId);
170         for (ByteBuf byteBuf : buffers) {
171             if (byteBuf != null) {
172                 byteBuf.release();
173             }
174         }
175         Arrays.fill(buffers, null);
176     }
177 
178     // Package-private for testing
179     final class IoUringBufferRingByteBuf extends WrappedByteBuf {
180         IoUringBufferRingByteBuf(ByteBuf buf) {
181             super(buf);
182         }
183 
184         void markUsed() {
185             if (unreleasedBuffers.incrementAndGet() == maxUnreleasedBuffers) {
186                 usable = false;
187             }
188         }
189 
190         @SuppressWarnings("deprecation")
191         @Override
192         public ByteBuf order(ByteOrder endianness) {
193             if (endianness == order()) {
194                 return this;
195             }
196             return new SwappedByteBuf(this);
197         }
198 
199         @Override
200         public ByteBuf slice() {
201             return slice(readerIndex(), readableBytes());
202         }
203 
204         @Override
205         public ByteBuf retainedSlice() {
206             return slice().retain();
207         }
208 
209         @SuppressWarnings("deprecation")
210         @Override
211         public ByteBuf slice(int index, int length) {
212             return new SlicedByteBuf(this, index, length);
213         }
214 
215         @Override
216         public ByteBuf retainedSlice(int index, int length) {
217             return slice(index, length).retain();
218         }
219 
220         @Override
221         public ByteBuf readSlice(int length) {
222             ByteBuf slice = slice(readerIndex(), length);
223             skipBytes(length);
224             return slice;
225         }
226 
227         @Override
228         public ByteBuf readRetainedSlice(int length) {
229             ByteBuf slice = retainedSlice(readerIndex(), length);
230             try {
231                 skipBytes(length);
232             } catch (Throwable cause) {
233                 slice.release();
234                 throw cause;
235             }
236             return slice;
237         }
238 
239         @SuppressWarnings("deprecation")
240         @Override
241         public ByteBuf duplicate() {
242             return new DuplicatedByteBuf(this);
243         }
244 
245         @Override
246         public ByteBuf retainedDuplicate() {
247             return duplicate().retain();
248         }
249 
250         @Override
251         public boolean release() {
252             if (super.release()) {
253                 released();
254                 return true;
255             }
256             return false;
257         }
258 
259         @Override
260         public boolean release(int decrement) {
261             if (super.release(decrement)) {
262                 released();
263                 return true;
264             }
265             return false;
266         }
267 
268         private void released() {
269             if (unreleasedBuffers.decrementAndGet() == maxUnreleasedBuffers / 2) {
270                 usable = true;
271             }
272         }
273     }
274 }