1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.DuplicatedByteBuf;
20 import io.netty.buffer.SlicedByteBuf;
21 import io.netty.buffer.SwappedByteBuf;
22 import io.netty.buffer.WrappedByteBuf;
23 import io.netty.channel.unix.Buffer;
24
25 import java.lang.invoke.MethodHandles;
26 import java.lang.invoke.VarHandle;
27 import java.nio.ByteBuffer;
28 import java.nio.ByteOrder;
29 import java.util.Arrays;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 final class IoUringBufferRing {
33 private static final VarHandle SHORT_HANDLE =
34 MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
35 private final ByteBuffer ioUringBufRing;
36 private final int tailFieldPosition;
37 private final short entries;
38 private final int batchSize;
39 private final int maxUnreleasedBuffers;
40 private final short mask;
41 private final short bufferGroupId;
42 private final int ringFd;
43 private final IoUringBufferRingByteBuf[] buffers;
44 private final IoUringBufferRingAllocator allocator;
45 private final IoUringBufferRingExhaustedEvent exhaustedEvent;
46 private final boolean incremental;
47 private final AtomicInteger unreleasedBuffers = new AtomicInteger();
48 private volatile boolean usable;
49 private boolean corrupted;
50 private boolean closed;
51 private int numBuffers;
52 private boolean expanded;
53
54 IoUringBufferRing(int ringFd, ByteBuffer ioUringBufRing,
55 short entries, int batchSize, int maxUnreleasedBuffers, short bufferGroupId, boolean incremental,
56 IoUringBufferRingAllocator allocator) {
57 assert entries % 2 == 0;
58 this.ioUringBufRing = ioUringBufRing;
59 this.tailFieldPosition = Native.IO_URING_BUFFER_RING_TAIL;
60 this.entries = entries;
61 this.batchSize = batchSize;
62 this.maxUnreleasedBuffers = maxUnreleasedBuffers;
63 this.mask = (short) (entries - 1);
64 this.bufferGroupId = bufferGroupId;
65 this.ringFd = ringFd;
66 this.buffers = new IoUringBufferRingByteBuf[entries];
67 this.incremental = incremental;
68 this.allocator = allocator;
69 this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
70 }
71
72 boolean isUsable() {
73 return !corrupted && usable;
74 }
75
76 void initialize() {
77 fillBuffers();
78 usable = true;
79 }
80
81
82
83
84
85 void expand() {
86
87 if (!expanded) {
88
89
90 fillBuffers();
91 expanded = true;
92 }
93 }
94
95 private void fillBuffers() {
96 int num = Math.min(batchSize, entries - numBuffers);
97 for (short i = 0; i < num; i++) {
98 fillBuffer();
99 }
100 }
101
102
103
104
105
106 IoUringBufferRingExhaustedEvent getExhaustedEvent() {
107 return exhaustedEvent;
108 }
109
110 void fillBuffer() {
111 if (corrupted || closed) {
112 return;
113 }
114 short oldTail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
115 short ringIndex = (short) (oldTail & mask);
116 assert buffers[ringIndex] == null;
117 final ByteBuf byteBuf;
118 try {
119 byteBuf = allocator.allocate();
120 } catch (OutOfMemoryError e) {
121
122
123
124 corrupted = true;
125 throw e;
126 }
127 byteBuf.writerIndex(byteBuf.capacity());
128 buffers[ringIndex] = new IoUringBufferRingByteBuf(byteBuf);
129
130
131
132
133 int position = Native.SIZEOF_IOURING_BUF * ringIndex;
134 ioUringBufRing.putLong(position + Native.IOURING_BUFFER_OFFSETOF_ADDR,
135 IoUring.memoryAddress(byteBuf) + byteBuf.readerIndex());
136 ioUringBufRing.putInt(position + Native.IOURING_BUFFER_OFFSETOF_LEN, byteBuf.capacity());
137 ioUringBufRing.putShort(position + Native.IOURING_BUFFER_OFFSETOF_BID, ringIndex);
138
139
140 SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (oldTail + 1));
141 numBuffers++;
142
143
144 expanded = false;
145 }
146
147
148
149
150
151
152
153
154 int attemptedBytesRead(short bid) {
155 return buffers[bid].readableBytes();
156 }
157
158
159
160
161
162
163
164
165
166
167
168 ByteBuf useBuffer(short bid, int readableBytes, boolean more) {
169 assert readableBytes > 0;
170 IoUringBufferRingByteBuf byteBuf = buffers[bid];
171
172 allocator.lastBytesRead(byteBuf.readableBytes(), readableBytes);
173 if (incremental && more && byteBuf.readableBytes() > readableBytes) {
174
175 return byteBuf.readRetainedSlice(readableBytes);
176 }
177
178
179 buffers[bid] = null;
180 numBuffers--;
181 byteBuf.markUsed();
182 return byteBuf.writerIndex(byteBuf.readerIndex() +
183 Math.min(readableBytes, byteBuf.readableBytes()));
184 }
185
186 short nextBid(short bid) {
187 return (short) ((bid + 1) & mask);
188 }
189
190
191
192
193
194
195 short bufferGroupId() {
196 return bufferGroupId;
197 }
198
199
200
201
202 void close() {
203 if (closed) {
204 return;
205 }
206 closed = true;
207 Native.ioUringUnRegisterBufRing(ringFd, Buffer.memoryAddress(ioUringBufRing), entries, bufferGroupId);
208 for (ByteBuf byteBuf : buffers) {
209 if (byteBuf != null) {
210 byteBuf.release();
211 }
212 }
213 Arrays.fill(buffers, null);
214 }
215
216
217 final class IoUringBufferRingByteBuf extends WrappedByteBuf {
218 IoUringBufferRingByteBuf(ByteBuf buf) {
219 super(buf);
220 }
221
222 void markUsed() {
223 if (unreleasedBuffers.incrementAndGet() == maxUnreleasedBuffers) {
224 usable = false;
225 }
226 }
227
228 @SuppressWarnings("deprecation")
229 @Override
230 public ByteBuf order(ByteOrder endianness) {
231 if (endianness == order()) {
232 return this;
233 }
234 return new SwappedByteBuf(this);
235 }
236
237 @Override
238 public ByteBuf slice() {
239 return slice(readerIndex(), readableBytes());
240 }
241
242 @Override
243 public ByteBuf retainedSlice() {
244 return slice().retain();
245 }
246
247 @SuppressWarnings("deprecation")
248 @Override
249 public ByteBuf slice(int index, int length) {
250 return new SlicedByteBuf(this, index, length);
251 }
252
253 @Override
254 public ByteBuf retainedSlice(int index, int length) {
255 return slice(index, length).retain();
256 }
257
258 @Override
259 public ByteBuf readSlice(int length) {
260 ByteBuf slice = slice(readerIndex(), length);
261 skipBytes(length);
262 return slice;
263 }
264
265 @Override
266 public ByteBuf readRetainedSlice(int length) {
267 ByteBuf slice = retainedSlice(readerIndex(), length);
268 try {
269 skipBytes(length);
270 } catch (Throwable cause) {
271 slice.release();
272 throw cause;
273 }
274 return slice;
275 }
276
277 @SuppressWarnings("deprecation")
278 @Override
279 public ByteBuf duplicate() {
280 return new DuplicatedByteBuf(this);
281 }
282
283 @Override
284 public ByteBuf retainedDuplicate() {
285 return duplicate().retain();
286 }
287
288 @Override
289 public boolean release() {
290 if (super.release()) {
291 released();
292 return true;
293 }
294 return false;
295 }
296
297 @Override
298 public boolean release(int decrement) {
299 if (super.release(decrement)) {
300 released();
301 return true;
302 }
303 return false;
304 }
305
306 private void released() {
307 if (unreleasedBuffers.decrementAndGet() == maxUnreleasedBuffers / 2) {
308 usable = true;
309 }
310 }
311 }
312 }