package io.netty.channel.uring;

import io.netty.buffer.ByteBuf;
import io.netty.channel.unix.Buffer;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.function.Consumer;

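/**
 * An io_uring provided buffer ring (registered via {@code IORING_REGISTER_PBUF_RING}): a block of
 * pre-allocated {@link ByteBuf}s is published to the kernel so completions can carry received data
 * without a dedicated read buffer per operation. Buffers are addressed by their buffer id
 * ({@code bid}) and grouped under a buffer group id.
 */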
final class IoUringBufferRing {
    private static final VarHandle SHORT_HANDLE =
            MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.nativeOrder());
    private final ByteBuffer ioUringBufRing;
    private final int tailFieldPosition;
    private final short entries;
    private final short mask;
    private final short bufferGroupId;
    private final int ringFd;
    private final ByteBuf[] buffers;
    private final IoUringBufferRingAllocator allocator;
    private final boolean batchAllocation;
    private final IoUringBufferRingExhaustedEvent exhaustedEvent;
    private final RingConsumer ringConsumer;
    private final boolean incremental;
    private final int batchSize;
    private boolean corrupted;
    private boolean closed;
    private int usableBuffers;
    private int allocatedBuffers;
    private boolean needExpand;
    private short lastGeneratedBid;

    IoUringBufferRing(int ringFd, ByteBuffer ioUringBufRing,
                      short entries, int batchSize, short bufferGroupId, boolean incremental,
                      IoUringBufferRingAllocator allocator, boolean batchAllocation) {
        // The kernel requires the ring size to be a power of two; the mask below depends on it.
        assert entries > 0 && (entries & entries - 1) == 0;
        assert batchSize % 2 == 0;
        this.batchSize = batchSize;
        this.ioUringBufRing = ioUringBufRing;
        this.tailFieldPosition = Native.IO_URING_BUFFER_RING_TAIL;
        this.entries = entries;
        this.mask = (short) (entries - 1);
        this.bufferGroupId = bufferGroupId;
        this.ringFd = ringFd;
        this.buffers = new ByteBuf[entries];
        this.incremental = incremental;
        this.allocator = allocator;
        this.batchAllocation = batchAllocation;
        this.ringConsumer = new RingConsumer();
        this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
    }

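    /**
     * Returns {@code true} if this buffer ring can still be used, i.e. it was neither closed nor
     * marked as corrupted after a failed refill.
     */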
    boolean isUsable() {
        return !closed && !corrupted;
    }

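    /**
     * Allocates and publishes the first batch of buffers into the ring.
     */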
    void initialize() {
        // Start with a single batch; the ring grows on demand, see expand().
        fill((short) 0, batchSize);
        allocatedBuffers = batchSize;
    }

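    /**
     * Bridges the {@link IoUringBufferRingAllocator} and the kernel-visible ring: buffers handed to
     * {@code accept(ByteBuf)} are staged into ring slots and published via a single tail update.
     */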
    private final class RingConsumer implements Consumer<ByteBuf> {
        private int expectedBuffers;
        private short num;
        private short bid;
        private short oldTail;

        short fill(short startBid, int numBuffers) {
            // Remember the tail as it is now; buffers are staged relative to it and only become
            // visible to the kernel once the updated tail is store-released below.
            oldTail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
            this.num = 0;
            this.bid = startBid;
            this.expectedBuffers = numBuffers;
            try {
                if (batchAllocation) {
                    // The allocator will call accept(...) once for every buffer of the batch.
                    allocator.allocateBatch(this, numBuffers);
                } else {
                    for (int i = 0; i < numBuffers; i++) {
                        add(oldTail, bid++, num++, allocator.allocate());
                    }
                }
            } catch (Throwable t) {
                // Allocation failed part-way through; release everything that was added so far
                // and mark the whole ring as corrupted so it won't be used anymore.
                corrupted = true;
                for (int i = 0; i < buffers.length; i++) {
                    ByteBuf buffer = buffers[i];
                    if (buffer != null) {
                        buffer.release();
                        buffers[i] = null;
                    }
                }
                throw t;
            }
            // Publish all staged buffers to the kernel with a single tail update.
            SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (oldTail + num));

            return (short) (bid - 1);
        }

        void fill(short bid) {
            short tail = (short) SHORT_HANDLE.get(ioUringBufRing, tailFieldPosition);
            add(tail, bid, 0, allocator.allocate());
            // Publish the single buffer to the kernel.
            SHORT_HANDLE.setRelease(ioUringBufRing, tailFieldPosition, (short) (tail + 1));
        }

        @Override
        public void accept(ByteBuf byteBuf) {
            if (corrupted || closed) {
                byteBuf.release();
                throw new IllegalStateException("Buffer ring was already closed or is corrupted");
            }
            if (expectedBuffers == num) {
                byteBuf.release();
                throw new IllegalStateException("Produced too many buffers");
            }
            add(oldTail, bid++, num++, byteBuf);
        }

        private void add(int tail, short bid, int offset, ByteBuf byteBuf) {
            short ringIndex = (short) ((tail + offset) & mask);
            assert buffers[bid] == null;

            long memoryAddress = IoUring.memoryAddress(byteBuf) + byteBuf.writerIndex();
            int writable = byteBuf.writableBytes();

            // Fill in the struct io_uring_buf entry for this ring slot: the address and length of
            // the writable region and the buffer id the kernel will echo back in the completion.
            int position = Native.SIZEOF_IOURING_BUF * ringIndex;
            ioUringBufRing.putLong(position + Native.IOURING_BUFFER_OFFSETOF_ADDR, memoryAddress);
            ioUringBufRing.putInt(position + Native.IOURING_BUFFER_OFFSETOF_LEN, writable);
            ioUringBufRing.putShort(position + Native.IOURING_BUFFER_OFFSETOF_BID, bid);

            buffers[bid] = byteBuf;
        }
    }

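    /**
     * Requests that the ring grows by another batch of buffers the next time a refill happens.
     *
     * @return {@code true} if the ring can still grow, {@code false} if all {@code entries}
     *         slots are already allocated.
     */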
    boolean expand() {
        needExpand = true;
        return allocatedBuffers < buffers.length;
    }

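    /**
     * Allocates {@code numBuffers} new buffers and publishes them, starting at {@code startBid}.
     */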
    private void fill(short startBid, int numBuffers) {
        if (corrupted || closed) {
            return;
        }
        assert numBuffers % 2 == 0;
        lastGeneratedBid = ringConsumer.fill(startBid, numBuffers);
        usableBuffers += numBuffers;
    }

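    /**
     * Re-allocates and publishes a single buffer for the given buffer id.
     */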
    private void fill(short bid) {
        if (corrupted || closed) {
            return;
        }
        ringConsumer.fill(bid);
        usableBuffers++;
    }

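    /**
     * Returns the {@link IoUringBufferRingExhaustedEvent} to fire once this ring has no buffers
     * left. The same instance is reused, as it only carries the buffer group id.
     */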
    IoUringBufferRingExhaustedEvent getExhaustedEvent() {
        return exhaustedEvent;
    }

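    /**
     * Returns the number of bytes a read into the buffer with the given id could have produced at
     * most, i.e. the writable bytes it was registered with.
     */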
    int attemptedBytesRead(short bid) {
        return buffers[bid].writableBytes();
    }

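    /**
     * Returns the size of the next expansion batch, capped so the ring never exceeds
     * {@code entries} buffers.
     */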
    private int calculateNextBufferBatch() {
        return Math.min(batchSize, entries - allocatedBuffers);
    }

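    /**
     * Takes ownership of {@code read} bytes from the buffer with the given id and returns them as
     * a retained slice. Depending on whether the kernel keeps consuming the buffer incrementally,
     * the buffer is either left in the ring or dropped and replaced by a fresh one.
     *
     * @param bid  the buffer id as reported by the completion.
     * @param read the number of bytes the kernel wrote into the buffer.
     * @param more {@code true} if the kernel signalled that it will keep using this buffer
     *             ({@code IORING_CQE_F_BUF_MORE}).
     * @return the retained slice holding the read bytes.
     */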
    ByteBuf useBuffer(short bid, int read, boolean more) {
        assert read > 0;
        ByteBuf byteBuf = buffers[bid];

        // Give the allocator feedback so it can size future buffers accordingly.
        allocator.lastBytesRead(byteBuf.writableBytes(), read);

        ByteBuf buffer = byteBuf.retainedSlice(byteBuf.writerIndex(), read);
        byteBuf.writerIndex(byteBuf.writerIndex() + read);

        if (incremental && more && byteBuf.isWritable()) {
            // The kernel will keep consuming this buffer incrementally; leave it in the ring.
            return buffer;
        }

        // The kernel is done with this buffer; drop it from our table and release our reference.
        buffers[bid] = null;
        byteBuf.release();
        if (--usableBuffers == 0) {
            int numBuffers = allocatedBuffers;
            if (needExpand) {
                // An expansion was requested; now that the whole ring is empty and the bids can
                // be regenerated from 0, refill with one extra batch.
                needExpand = false;
                numBuffers += calculateNextBufferBatch();
            }
            fill((short) 0, numBuffers);
            allocatedBuffers = numBuffers;
            assert allocatedBuffers % 2 == 0;
        } else if (!batchAllocation) {
            // Without batch allocation we can refill the freed slot right away.
            fill(bid);

            if (needExpand && lastGeneratedBid == bid) {
                // We just refilled the highest bid handed out so far, so the extra batch can be
                // appended contiguously right behind it.
                needExpand = false;
                int numBuffers = calculateNextBufferBatch();
                fill((short) (bid + 1), numBuffers);
                allocatedBuffers += numBuffers;
                assert allocatedBuffers % 2 == 0;
            }
        }
        return buffer;
    }

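    /**
     * Returns the buffer id that follows {@code bid}, wrapping at the number of buffers that are
     * currently allocated.
     */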
    short nextBid(short bid) {
        return (short) ((bid + 1) & (allocatedBuffers - 1));
    }

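    /**
     * Returns the buffer group id this ring is registered under.
     */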
    short bufferGroupId() {
        return bufferGroupId;
    }

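    /**
     * Unregisters this buffer ring from the io_uring instance and releases all buffers it still
     * owns. Subsequent calls are no-ops.
     */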
    void close() {
        if (closed) {
            return;
        }
        closed = true;
        Native.ioUringUnRegisterBufRing(ringFd, Buffer.memoryAddress(ioUringBufRing), entries, bufferGroupId);
        for (ByteBuf byteBuf : buffers) {
            if (byteBuf != null) {
                byteBuf.release();
            }
        }
        Arrays.fill(buffers, null);
    }
}