1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.uring;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.DuplicatedByteBuf;
20 import io.netty.buffer.SlicedByteBuf;
21 import io.netty.buffer.SwappedByteBuf;
22 import io.netty.buffer.WrappedByteBuf;
23 import io.netty.util.internal.PlatformDependent;
24
25 import java.nio.ByteOrder;
26 import java.util.Arrays;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 final class IoUringBufferRing {
30 private final long ioUringBufRingAddr;
31 private final long tailFieldAddress;
32 private final short entries;
33 private final int maxUnreleasedBuffers;
34 private final short mask;
35 private final short bufferGroupId;
36 private final int ringFd;
37 private final IoUringBufferRingByteBuf[] buffers;
38 private final IoUringBufferRingAllocator allocator;
39 private final IoUringBufferRingExhaustedEvent exhaustedEvent;
40 private final boolean incremental;
41 private final AtomicInteger unreleasedBuffers = new AtomicInteger();
42 private volatile boolean usable;
43 private boolean corrupted;
44
45 IoUringBufferRing(int ringFd, long ioUringBufRingAddr,
46 short entries, int maxUnreleasedBuffers, short bufferGroupId, boolean incremental,
47 IoUringBufferRingAllocator allocator) {
48 assert entries % 2 == 0;
49 this.ioUringBufRingAddr = ioUringBufRingAddr;
50 this.tailFieldAddress = ioUringBufRingAddr + Native.IO_URING_BUFFER_RING_TAIL;
51 this.entries = entries;
52 this.maxUnreleasedBuffers = maxUnreleasedBuffers;
53 this.mask = (short) (entries - 1);
54 this.bufferGroupId = bufferGroupId;
55 this.ringFd = ringFd;
56 this.buffers = new IoUringBufferRingByteBuf[entries];
57 this.incremental = incremental;
58 this.allocator = allocator;
59 this.exhaustedEvent = new IoUringBufferRingExhaustedEvent(bufferGroupId);
60 }
61
62 boolean isUsable() {
63 return !corrupted && usable;
64 }
65
66 void fill() {
67 for (short i = 0; i < entries; i++) {
68 fillBuffer(i);
69 }
70 usable = true;
71 }
72
73
74
75
76
77 IoUringBufferRingExhaustedEvent getExhaustedEvent() {
78 return exhaustedEvent;
79 }
80
81 void fillBuffer(short bid) {
82 if (corrupted) {
83 return;
84 }
85 short oldTail = PlatformDependent.getShort(tailFieldAddress);
86 int ringIndex = oldTail & mask;
87 assert ringIndex == bid;
88 assert buffers[bid] == null;
89 final ByteBuf byteBuf;
90 try {
91 byteBuf = allocator.allocate();
92 } catch (OutOfMemoryError e) {
93
94
95
96 corrupted = true;
97 throw e;
98 }
99 byteBuf.writerIndex(byteBuf.capacity());
100 buffers[bid] = new IoUringBufferRingByteBuf(byteBuf);
101
102
103
104
105 long ioUringBufAddress = ioUringBufRingAddr + (long) Native.SIZEOF_IOURING_BUF * ringIndex;
106 PlatformDependent.putLong(ioUringBufAddress + Native.IOURING_BUFFER_OFFSETOF_ADDR,
107 byteBuf.memoryAddress() + byteBuf.readerIndex());
108 PlatformDependent.putInt(ioUringBufAddress + Native.IOURING_BUFFER_OFFSETOF_LEN, byteBuf.capacity());
109 PlatformDependent.putShort(ioUringBufAddress + Native.IOURING_BUFFER_OFFSETOF_BID, bid);
110
111 PlatformDependent.putShortOrdered(tailFieldAddress, (short) (oldTail + 1));
112 }
113
114
115
116
117
118
119
120
121 int attemptedBytesRead(short bid) {
122 return buffers[bid].readableBytes();
123 }
124
125
126
127
128
129
130
131
132
133
134
135 ByteBuf useBuffer(short bid, int readableBytes, boolean more) {
136 assert readableBytes > 0;
137 IoUringBufferRingByteBuf byteBuf = buffers[bid];
138
139 allocator.lastBytesRead(byteBuf.readableBytes(), readableBytes);
140 if (incremental && more && byteBuf.readableBytes() > readableBytes) {
141
142 return byteBuf.readRetainedSlice(readableBytes);
143 }
144
145
146 buffers[bid] = null;
147 byteBuf.markUsed();
148 return byteBuf.writerIndex(byteBuf.readerIndex() +
149 Math.min(readableBytes, byteBuf.readableBytes()));
150 }
151
152 short nextBid(short bid) {
153 return (short) ((bid + 1) & mask);
154 }
155
156
157
158
159
160
161 short bufferGroupId() {
162 return bufferGroupId;
163 }
164
165
166
167
168 void close() {
169 Native.ioUringUnRegisterBufRing(ringFd, ioUringBufRingAddr, entries, bufferGroupId);
170 for (ByteBuf byteBuf : buffers) {
171 if (byteBuf != null) {
172 byteBuf.release();
173 }
174 }
175 Arrays.fill(buffers, null);
176 }
177
178
179 final class IoUringBufferRingByteBuf extends WrappedByteBuf {
180 IoUringBufferRingByteBuf(ByteBuf buf) {
181 super(buf);
182 }
183
184 void markUsed() {
185 if (unreleasedBuffers.incrementAndGet() == maxUnreleasedBuffers) {
186 usable = false;
187 }
188 }
189
190 @SuppressWarnings("deprecation")
191 @Override
192 public ByteBuf order(ByteOrder endianness) {
193 if (endianness == order()) {
194 return this;
195 }
196 return new SwappedByteBuf(this);
197 }
198
199 @Override
200 public ByteBuf slice() {
201 return slice(readerIndex(), readableBytes());
202 }
203
204 @Override
205 public ByteBuf retainedSlice() {
206 return slice().retain();
207 }
208
209 @SuppressWarnings("deprecation")
210 @Override
211 public ByteBuf slice(int index, int length) {
212 return new SlicedByteBuf(this, index, length);
213 }
214
215 @Override
216 public ByteBuf retainedSlice(int index, int length) {
217 return slice(index, length).retain();
218 }
219
220 @Override
221 public ByteBuf readSlice(int length) {
222 ByteBuf slice = slice(readerIndex(), length);
223 skipBytes(length);
224 return slice;
225 }
226
227 @Override
228 public ByteBuf readRetainedSlice(int length) {
229 ByteBuf slice = retainedSlice(readerIndex(), length);
230 try {
231 skipBytes(length);
232 } catch (Throwable cause) {
233 slice.release();
234 throw cause;
235 }
236 return slice;
237 }
238
239 @SuppressWarnings("deprecation")
240 @Override
241 public ByteBuf duplicate() {
242 return new DuplicatedByteBuf(this);
243 }
244
245 @Override
246 public ByteBuf retainedDuplicate() {
247 return duplicate().retain();
248 }
249
250 @Override
251 public boolean release() {
252 if (super.release()) {
253 released();
254 return true;
255 }
256 return false;
257 }
258
259 @Override
260 public boolean release(int decrement) {
261 if (super.release(decrement)) {
262 released();
263 return true;
264 }
265 return false;
266 }
267
268 private void released() {
269 if (unreleasedBuffers.decrementAndGet() == maxUnreleasedBuffers / 2) {
270 usable = true;
271 }
272 }
273 }
274 }