1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.DuplicatedByteBuf;
20 import io.netty.buffer.SlicedByteBuf;
21 import io.netty.buffer.SwappedByteBuf;
22 import io.netty.buffer.WrappedByteBuf;
23 import io.netty.channel.unix.Buffer;
24
25 import java.lang.invoke.MethodHandles;
26 import java.lang.invoke.VarHandle;
27 import java.nio.ByteBuffer;
28 import java.nio.ByteOrder;
29 import java.util.Arrays;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 final class IoUringBufferRing {
33 private static final VarHandle SHORT_HANDLE =
34 MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
35 private final ByteBuffer ioUringBufRing;
36 private final int tailFieldPosition;
37 private final short entries;
38 private final int batchSize;
39 private final int maxUnreleasedBuffers;
40 private final short mask;
41 private final short bufferGroupId;
42 private final int ringFd;
43 private final IoUringBufferRingByteBuf[] buffers;
44 private final IoUringBufferRingAllocator allocator;
45 private final IoUringBufferRingExhaustedEvent exhaustedEvent;
46 private final boolean incremental;
47 private final AtomicInteger unreleasedBuffers = new AtomicInteger();
48 private volatile boolean usable;
49 private boolean corrupted;
50 private boolean closed;
51 private int numBuffers;
52 private boolean expanded;
53 private boolean needsLazyExpansion;
54 private short lastGeneratedBid;
55 private short lastAddedBid;
56
57 IoUringBufferRing(int ringFd, ByteBuffer ioUringBufRing,
58 short entries, int batchSize, int maxUnreleasedBuffers, short bufferGroupId, boolean incremental,
59 IoUringBufferRingAllocator allocator) {
60 assert entries % 2 == 0;
61 assert batchSize % 2 == 0;
62 this.ioUringBufRing = ioUringBufRing;
63 this.tailFieldPosition = Native.IO_URING_BUFFER_RING_TAIL;
64 this.entries = entries;
65 this.batchSize = batchSize;
66 this.maxUnreleasedBuffers = maxUnreleasedBuffers;
67 this.mask = (short) (entries - 1);
68 this.bufferGroupId = bufferGroupId;
69 this.ringFd = ringFd;
70 this.buffers = new IoUringBufferRingByteBuf[entries];
71 this.incremental = incremental;
72 this.allocator = allocator;
73 this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
74 }
75
76 boolean isUsable() {
77 return !corrupted && usable;
78 }
79
80 void initialize() {
81 for (short i = 0; i < batchSize; i++) {
82 addBuffer(i);
83 numBuffers++;
84 lastGeneratedBid = i;
85 }
86 assert numBuffers % 2 == 0;
87 usable = true;
88 }
89
90
91
92
93
94 void expand() {
95 if (!expanded) {
96
97
98 tryExpand();
99 }
100 }
101
102 private void tryExpand() {
103
104
105
106 if (lastAddedBid == lastGeneratedBid) {
107 needsLazyExpansion = false;
108
109 int num = Math.min(batchSize, entries - numBuffers);
110 assert num % 2 == 0;
111 for (short i = 0; i < num; i++) {
112 addBuffer(++lastGeneratedBid);
113 numBuffers++;
114 }
115 } else {
116 needsLazyExpansion = true;
117 }
118 }
119
120
121
122
123
124 IoUringBufferRingExhaustedEvent getExhaustedEvent() {
125 return exhaustedEvent;
126 }
127
128 private void addBuffer(short bid) {
129 if (corrupted || closed) {
130 return;
131 }
132 short oldTail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
133 short ringIndex = (short) (oldTail & mask);
134 assert buffers[bid] == null;
135 final ByteBuf byteBuf;
136 try {
137 byteBuf = allocator.allocate();
138 } catch (OutOfMemoryError e) {
139
140
141
142 corrupted = true;
143 throw e;
144 }
145
146 IoUringBufferRingByteBuf ioUringBuf = new IoUringBufferRingByteBuf(byteBuf.writerIndex(byteBuf.capacity()));
147
148
149
150
151 int position = Native.SIZEOF_IOURING_BUF * ringIndex;
152 ioUringBufRing.putLong(position + Native.IOURING_BUFFER_OFFSETOF_ADDR,
153 IoUring.memoryAddress(ioUringBuf) + ioUringBuf.readerIndex());
154 ioUringBufRing.putInt(position + Native.IOURING_BUFFER_OFFSETOF_LEN, ioUringBuf.readableBytes());
155 ioUringBufRing.putShort(position + Native.IOURING_BUFFER_OFFSETOF_BID, bid);
156
157 buffers[bid] = ioUringBuf;
158 lastAddedBid = bid;
159
160 SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (oldTail + 1));
161
162
163 expanded = false;
164 if (needsLazyExpansion) {
165 tryExpand();
166 }
167 }
168
169
170
171
172
173
174
175
176 int attemptedBytesRead(short bid) {
177 return buffers[bid].readableBytes();
178 }
179
180
181
182
183
184
185
186
187
188
189
190 ByteBuf useBuffer(short bid, int readableBytes, boolean more) {
191 assert readableBytes > 0;
192 IoUringBufferRingByteBuf byteBuf = buffers[bid];
193
194 allocator.lastBytesRead(byteBuf.readableBytes(), readableBytes);
195 if (incremental && more && byteBuf.readableBytes() > readableBytes) {
196
197 return byteBuf.readRetainedSlice(readableBytes);
198 }
199
200
201 buffers[bid] = null;
202 addBuffer(bid);
203 byteBuf.markUsed();
204 return byteBuf.writerIndex(byteBuf.readerIndex() +
205 Math.min(readableBytes, byteBuf.readableBytes()));
206 }
207
208 short nextBid(short bid) {
209 return (short) ((bid + 1) & numBuffers - 1);
210 }
211
212
213
214
215
216
217 short bufferGroupId() {
218 return bufferGroupId;
219 }
220
221
222
223
224 void close() {
225 if (closed) {
226 return;
227 }
228 closed = true;
229 Native.ioUringUnRegisterBufRing(ringFd, Buffer.memoryAddress(ioUringBufRing), entries, bufferGroupId);
230 for (ByteBuf byteBuf : buffers) {
231 if (byteBuf != null) {
232 byteBuf.release();
233 }
234 }
235 Arrays.fill(buffers, null);
236 }
237
238
239 final class IoUringBufferRingByteBuf extends WrappedByteBuf {
240 IoUringBufferRingByteBuf(ByteBuf buf) {
241 super(buf);
242 }
243
244 void markUsed() {
245 if (unreleasedBuffers.incrementAndGet() == maxUnreleasedBuffers) {
246 usable = false;
247 }
248 }
249
250 @SuppressWarnings("deprecation")
251 @Override
252 public ByteBuf order(ByteOrder endianness) {
253 if (endianness == order()) {
254 return this;
255 }
256 return new SwappedByteBuf(this);
257 }
258
259 @Override
260 public ByteBuf slice() {
261 return slice(readerIndex(), readableBytes());
262 }
263
264 @Override
265 public ByteBuf retainedSlice() {
266 return slice().retain();
267 }
268
269 @SuppressWarnings("deprecation")
270 @Override
271 public ByteBuf slice(int index, int length) {
272 return new SlicedByteBuf(this, index, length);
273 }
274
275 @Override
276 public ByteBuf retainedSlice(int index, int length) {
277 return slice(index, length).retain();
278 }
279
280 @Override
281 public ByteBuf readSlice(int length) {
282 ByteBuf slice = slice(readerIndex(), length);
283 skipBytes(length);
284 return slice;
285 }
286
287 @Override
288 public ByteBuf readRetainedSlice(int length) {
289 ByteBuf slice = retainedSlice(readerIndex(), length);
290 try {
291 skipBytes(length);
292 } catch (Throwable cause) {
293 slice.release();
294 throw cause;
295 }
296 return slice;
297 }
298
299 @SuppressWarnings("deprecation")
300 @Override
301 public ByteBuf duplicate() {
302 return new DuplicatedByteBuf(this);
303 }
304
305 @Override
306 public ByteBuf retainedDuplicate() {
307 return duplicate().retain();
308 }
309
310 @Override
311 public boolean release() {
312 if (super.release()) {
313 released();
314 return true;
315 }
316 return false;
317 }
318
319 @Override
320 public boolean release(int decrement) {
321 if (super.release(decrement)) {
322 released();
323 return true;
324 }
325 return false;
326 }
327
328 private void released() {
329 if (unreleasedBuffers.decrementAndGet() == maxUnreleasedBuffers / 2) {
330 usable = true;
331 }
332 }
333 }
334 }