1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.buffer.api.pool;
17
18 import io.netty5.buffer.api.pool.PoolArena.SizeClass;
19 import io.netty5.util.internal.MathUtil;
20 import io.netty5.util.internal.ObjectPool;
21 import io.netty5.util.internal.ObjectPool.Handle;
22 import io.netty5.util.internal.PlatformDependent;
23 import io.netty5.util.internal.logging.InternalLogger;
24 import io.netty5.util.internal.logging.InternalLoggerFactory;
25
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import static io.netty5.buffer.api.pool.PoolArena.SizeClass.Normal;
32 import static io.netty5.buffer.api.pool.PoolArena.SizeClass.Small;
33 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
34
35
36
37
38
39
40
41
42 final class PoolThreadCache {
43
44 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
45 private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
46
47 final AtomicInteger arenaReferenceCounter;
48
49 private final PoolArena arena;
50
51 private final MemoryRegionCache[] smallSubPageCaches;
52 private final MemoryRegionCache[] normalCaches;
53
54 private final int freeSweepAllocationThreshold;
55
56 private int allocations;
57
58 PoolThreadCache(PoolArena arena,
59 int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
60 int freeSweepAllocationThreshold) {
61 checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
62 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
63 if (arena != null) {
64
65 MemoryRegionCache[] smallSubPageCaches = createSubPageCaches(
66 smallCacheSize, arena.numSmallSubpagePools);
67
68 MemoryRegionCache[] normalCaches = createNormalCaches(
69 normalCacheSize, maxCachedBufferCapacity, arena);
70
71
72 if ((smallSubPageCaches != null || normalCaches != null)
73 && freeSweepAllocationThreshold < 1) {
74 throw new IllegalArgumentException("freeSweepAllocationThreshold: "
75 + freeSweepAllocationThreshold + " (expected: > 0)");
76 }
77
78 this.arena = arena;
79 this.smallSubPageCaches = smallSubPageCaches;
80 this.normalCaches = normalCaches;
81 arenaReferenceCounter = arena.numThreadCaches;
82 arenaReferenceCounter.getAndIncrement();
83 } else {
84
85 this.arena = null;
86 smallSubPageCaches = null;
87 normalCaches = null;
88 arenaReferenceCounter = null;
89 }
90 }
91
92 private static MemoryRegionCache[] createSubPageCaches(
93 int cacheSize, int numCaches) {
94 if (cacheSize > 0 && numCaches > 0) {
95 MemoryRegionCache[] cache = new MemoryRegionCache[numCaches];
96 for (int i = 0; i < cache.length; i++) {
97
98 cache[i] = new SubPageMemoryRegionCache(cacheSize);
99 }
100 return cache;
101 } else {
102 return null;
103 }
104 }
105
106 private static MemoryRegionCache[] createNormalCaches(
107 int cacheSize, int maxCachedBufferCapacity, PoolArena area) {
108 if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
109 int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
110
111
112
113 List<MemoryRegionCache> cache = new ArrayList<>() ;
114 for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max ; idx++) {
115 cache.add(new NormalMemoryRegionCache(cacheSize));
116 }
117 return cache.toArray(MemoryRegionCache[]::new);
118 } else {
119 return null;
120 }
121 }
122
123 PoolArena getArena() {
124 return arena;
125 }
126
127
128 static int log2(int val) {
129 return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
130 }
131
132
133
134
135 UntetheredMemory allocateSmall(int size, int sizeIdx) {
136 return allocate(cacheForSmall(sizeIdx), size);
137 }
138
139
140
141
142 UntetheredMemory allocateNormal(PoolArena area, int size, int sizeIdx) {
143 return allocate(cacheForNormal(area, sizeIdx), size);
144 }
145
146 private UntetheredMemory allocate(MemoryRegionCache cache, int size) {
147 if (cache == null) {
148
149 return null;
150 }
151 UntetheredMemory allocated = cache.allocate(size, this);
152 if (++allocations >= freeSweepAllocationThreshold) {
153 allocations = 0;
154 trim();
155 }
156 return allocated;
157 }
158
159
160
161
162
163 boolean add(PoolArena area, PoolChunk chunk,
164 long handle, int normCapacity, SizeClass sizeClass) {
165 int sizeIdx = area.size2SizeIdx(normCapacity);
166 MemoryRegionCache cache = cache(area, sizeIdx, sizeClass);
167 if (cache == null) {
168 return false;
169 }
170 return cache.add(chunk, handle, normCapacity);
171 }
172
173 private MemoryRegionCache cache(PoolArena area, int sizeIdx, SizeClass sizeClass) {
174 if (sizeClass == Normal) {
175 return cacheForNormal(area, sizeIdx);
176 }
177 if (sizeClass == Small) {
178 return cacheForSmall(sizeIdx);
179 }
180 throw new AssertionError("Unexpected size class: " + sizeClass);
181 }
182
183
184
185
186 void free() {
187 if (arena != null) {
188 int numFreed = free(smallSubPageCaches) + free(normalCaches);
189
190 if (numFreed > 0 && logger.isDebugEnabled()) {
191 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
192 Thread.currentThread().getName());
193 }
194 }
195
196 if (arenaReferenceCounter != null) {
197 arenaReferenceCounter.getAndDecrement();
198 }
199 }
200
201 private static int free(MemoryRegionCache[] caches) {
202 if (caches == null) {
203 return 0;
204 }
205
206 int numFreed = 0;
207 for (MemoryRegionCache c: caches) {
208 numFreed += free(c);
209 }
210 return numFreed;
211 }
212
213 private static int free(MemoryRegionCache cache) {
214 if (cache == null) {
215 return 0;
216 }
217 return cache.free();
218 }
219
220 void trim() {
221 if (arena != null) {
222 trim(smallSubPageCaches);
223 trim(normalCaches);
224 }
225 }
226
227 private static void trim(MemoryRegionCache[] caches) {
228 if (caches == null) {
229 return;
230 }
231 for (MemoryRegionCache c: caches) {
232 trim(c);
233 }
234 }
235
236 private static void trim(MemoryRegionCache cache) {
237 if (cache == null) {
238 return;
239 }
240 cache.trim();
241 }
242
243 private MemoryRegionCache cacheForSmall(int sizeIdx) {
244 if (arena != null) {
245 return cache(smallSubPageCaches, sizeIdx);
246 }
247 return null;
248 }
249
250 private MemoryRegionCache cacheForNormal(PoolArena area, int sizeIdx) {
251 if (area != null) {
252
253 int idx = sizeIdx - area.numSmallSubpagePools;
254 return cache(normalCaches, idx);
255 }
256 return null;
257 }
258
259 private static MemoryRegionCache cache(MemoryRegionCache[] cache, int sizeIdx) {
260 if (cache == null || sizeIdx > cache.length - 1) {
261 return null;
262 }
263 return cache[sizeIdx];
264 }
265
266
267
268
269 private static final class SubPageMemoryRegionCache extends MemoryRegionCache {
270 SubPageMemoryRegionCache(int size) {
271 super(size, Small);
272 }
273
274 @Override
275 protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
276 return chunk.allocateBufferWithSubpage(handle, size, threadCache);
277 }
278 }
279
280
281
282
283 private static final class NormalMemoryRegionCache extends MemoryRegionCache {
284 NormalMemoryRegionCache(int size) {
285 super(size, Normal);
286 }
287
288 @Override
289 protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
290 return chunk.allocateBuffer(handle, size, threadCache);
291 }
292 }
293
294 private abstract static class MemoryRegionCache {
295 private final int size;
296 private final Queue<Entry> queue;
297 private final SizeClass sizeClass;
298 private int allocations;
299
300 MemoryRegionCache(int size, SizeClass sizeClass) {
301 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
302 queue = PlatformDependent.newFixedMpscQueue(this.size);
303 this.sizeClass = sizeClass;
304 }
305
306
307
308
309 protected abstract UntetheredMemory allocBuf(
310 PoolChunk chunk, long handle, int size, PoolThreadCache threadCache);
311
312
313
314
315 public final boolean add(PoolChunk chunk, long handle, int normCapacity) {
316 Entry entry = newEntry(chunk, handle, normCapacity);
317 boolean queued = queue.offer(entry);
318 if (!queued) {
319
320 entry.recycle();
321 }
322
323 return queued;
324 }
325
326
327
328
329 public final UntetheredMemory allocate(int size, PoolThreadCache threadCache) {
330 Entry entry = queue.poll();
331 if (entry == null) {
332 return null;
333 }
334 UntetheredMemory buffer = allocBuf(entry.chunk, entry.handle, size, threadCache);
335 entry.recycle();
336
337
338 allocations++;
339 return buffer;
340 }
341
342
343
344
345 public final int free() {
346 return free(Integer.MAX_VALUE);
347 }
348
349 private int free(int max) {
350 int numFreed = 0;
351 for (; numFreed < max; numFreed++) {
352 Entry entry = queue.poll();
353 if (entry != null) {
354 freeEntry(entry);
355 } else {
356
357 return numFreed;
358 }
359 }
360 return numFreed;
361 }
362
363
364
365
366 public final void trim() {
367 int free = size - allocations;
368 allocations = 0;
369
370
371 if (free > 0) {
372 free(free);
373 }
374 }
375
376 private void freeEntry(Entry entry) {
377 PoolChunk chunk = entry.chunk;
378 long handle = entry.handle;
379
380 entry.recycle();
381 chunk.arena.freeChunk(chunk, handle, entry.normCapacity, sizeClass);
382 }
383
384 static final class Entry {
385 final Handle<Entry> recyclerHandle;
386 PoolChunk chunk;
387 long handle = -1;
388 int normCapacity;
389
390 Entry(Handle<Entry> recyclerHandle) {
391 this.recyclerHandle = recyclerHandle;
392 }
393
394 void recycle() {
395 chunk = null;
396 handle = -1;
397 recyclerHandle.recycle(this);
398 }
399 }
400
401 private static Entry newEntry(PoolChunk chunk, long handle, int normCapacity) {
402 Entry entry = RECYCLER.get();
403 entry.chunk = chunk;
404 entry.handle = handle;
405 entry.normCapacity = normCapacity;
406 return entry;
407 }
408
409 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(Entry::new);
410 }
411 }