1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.buffer;
18
19
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import io.netty.buffer.PoolArena.SizeClass;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.ObjectPool;
25 import io.netty.util.internal.ObjectPool.Handle;
26 import io.netty.util.internal.ObjectPool.ObjectCreator;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.logging.InternalLogger;
29 import io.netty.util.internal.logging.InternalLoggerFactory;
30
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Queue;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37
38
39
40
41
42
43
44 final class PoolThreadCache {
45
46 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
47 private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
48
49 final PoolArena<byte[]> heapArena;
50 final PoolArena<ByteBuffer> directArena;
51
52
53 private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
54 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
55 private final MemoryRegionCache<byte[]>[] normalHeapCaches;
56 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
57
58 private final int freeSweepAllocationThreshold;
59 private final AtomicBoolean freed = new AtomicBoolean();
60
61 private int allocations;
62
63
64
65
66 PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
67 int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
68 int freeSweepAllocationThreshold) {
69 checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
70 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
71 this.heapArena = heapArena;
72 this.directArena = directArena;
73 if (directArena != null) {
74 smallSubPageDirectCaches = createSubPageCaches(
75 smallCacheSize, directArena.numSmallSubpagePools);
76
77 normalDirectCaches = createNormalCaches(
78 normalCacheSize, maxCachedBufferCapacity, directArena);
79
80 directArena.numThreadCaches.getAndIncrement();
81 } else {
82
83 smallSubPageDirectCaches = null;
84 normalDirectCaches = null;
85 }
86 if (heapArena != null) {
87
88 smallSubPageHeapCaches = createSubPageCaches(
89 smallCacheSize, heapArena.numSmallSubpagePools);
90
91 normalHeapCaches = createNormalCaches(
92 normalCacheSize, maxCachedBufferCapacity, heapArena);
93
94 heapArena.numThreadCaches.getAndIncrement();
95 } else {
96
97 smallSubPageHeapCaches = null;
98 normalHeapCaches = null;
99 }
100
101
102 if ((smallSubPageDirectCaches != null || normalDirectCaches != null
103 || smallSubPageHeapCaches != null || normalHeapCaches != null)
104 && freeSweepAllocationThreshold < 1) {
105 throw new IllegalArgumentException("freeSweepAllocationThreshold: "
106 + freeSweepAllocationThreshold + " (expected: > 0)");
107 }
108 }
109
110 private static <T> MemoryRegionCache<T>[] createSubPageCaches(
111 int cacheSize, int numCaches) {
112 if (cacheSize > 0 && numCaches > 0) {
113 @SuppressWarnings("unchecked")
114 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
115 for (int i = 0; i < cache.length; i++) {
116
117 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
118 }
119 return cache;
120 } else {
121 return null;
122 }
123 }
124
125 @SuppressWarnings("unchecked")
126 private static <T> MemoryRegionCache<T>[] createNormalCaches(
127 int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
128 if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
129 int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
130
131
132 List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
133 for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max ; idx++) {
134 cache.add(new NormalMemoryRegionCache<T>(cacheSize));
135 }
136 return cache.toArray(new MemoryRegionCache[0]);
137 } else {
138 return null;
139 }
140 }
141
142
143 static int log2(int val) {
144 return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
145 }
146
147
148
149
150 boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
151 return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
152 }
153
154
155
156
157 boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
158 return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
159 }
160
161 @SuppressWarnings({ "unchecked", "rawtypes" })
162 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
163 if (cache == null) {
164
165 return false;
166 }
167 boolean allocated = cache.allocate(buf, reqCapacity, this);
168 if (++ allocations >= freeSweepAllocationThreshold) {
169 allocations = 0;
170 trim();
171 }
172 return allocated;
173 }
174
175
176
177
178
179 @SuppressWarnings({ "unchecked", "rawtypes" })
180 boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
181 long handle, int normCapacity, SizeClass sizeClass) {
182 int sizeIdx = area.size2SizeIdx(normCapacity);
183 MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
184 if (cache == null) {
185 return false;
186 }
187 return cache.add(chunk, nioBuffer, handle, normCapacity);
188 }
189
190 private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
191 switch (sizeClass) {
192 case Normal:
193 return cacheForNormal(area, sizeIdx);
194 case Small:
195 return cacheForSmall(area, sizeIdx);
196 default:
197 throw new Error();
198 }
199 }
200
201
202 @Override
203 protected void finalize() throws Throwable {
204 try {
205 super.finalize();
206 } finally {
207 free(true);
208 }
209 }
210
211
212
213
214 void free(boolean finalizer) {
215
216
217 if (freed.compareAndSet(false, true)) {
218 int numFreed = free(smallSubPageDirectCaches, finalizer) +
219 free(normalDirectCaches, finalizer) +
220 free(smallSubPageHeapCaches, finalizer) +
221 free(normalHeapCaches, finalizer);
222
223 if (numFreed > 0 && logger.isDebugEnabled()) {
224 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
225 Thread.currentThread().getName());
226 }
227
228 if (directArena != null) {
229 directArena.numThreadCaches.getAndDecrement();
230 }
231
232 if (heapArena != null) {
233 heapArena.numThreadCaches.getAndDecrement();
234 }
235 }
236 }
237
238 private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
239 if (caches == null) {
240 return 0;
241 }
242
243 int numFreed = 0;
244 for (MemoryRegionCache<?> c: caches) {
245 numFreed += free(c, finalizer);
246 }
247 return numFreed;
248 }
249
250 private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
251 if (cache == null) {
252 return 0;
253 }
254 return cache.free(finalizer);
255 }
256
257 void trim() {
258 trim(smallSubPageDirectCaches);
259 trim(normalDirectCaches);
260 trim(smallSubPageHeapCaches);
261 trim(normalHeapCaches);
262 }
263
264 private static void trim(MemoryRegionCache<?>[] caches) {
265 if (caches == null) {
266 return;
267 }
268 for (MemoryRegionCache<?> c: caches) {
269 trim(c);
270 }
271 }
272
273 private static void trim(MemoryRegionCache<?> cache) {
274 if (cache == null) {
275 return;
276 }
277 cache.trim();
278 }
279
280 private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
281 if (area.isDirect()) {
282 return cache(smallSubPageDirectCaches, sizeIdx);
283 }
284 return cache(smallSubPageHeapCaches, sizeIdx);
285 }
286
287 private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
288
289 int idx = sizeIdx - area.numSmallSubpagePools;
290 if (area.isDirect()) {
291 return cache(normalDirectCaches, idx);
292 }
293 return cache(normalHeapCaches, idx);
294 }
295
296 private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
297 if (cache == null || sizeIdx > cache.length - 1) {
298 return null;
299 }
300 return cache[sizeIdx];
301 }
302
303
304
305
306 private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
307 SubPageMemoryRegionCache(int size) {
308 super(size, SizeClass.Small);
309 }
310
311 @Override
312 protected void initBuf(
313 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
314 PoolThreadCache threadCache) {
315 chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
316 }
317 }
318
319
320
321
322 private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
323 NormalMemoryRegionCache(int size) {
324 super(size, SizeClass.Normal);
325 }
326
327 @Override
328 protected void initBuf(
329 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
330 PoolThreadCache threadCache) {
331 chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
332 }
333 }
334
335 private abstract static class MemoryRegionCache<T> {
336 private final int size;
337 private final Queue<Entry<T>> queue;
338 private final SizeClass sizeClass;
339 private int allocations;
340
341 MemoryRegionCache(int size, SizeClass sizeClass) {
342 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
343 queue = PlatformDependent.newFixedMpscQueue(this.size);
344 this.sizeClass = sizeClass;
345 }
346
347
348
349
350 protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
351 PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
352
353
354
355
356 @SuppressWarnings("unchecked")
357 public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
358 Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
359 boolean queued = queue.offer(entry);
360 if (!queued) {
361
362 entry.recycle();
363 }
364
365 return queued;
366 }
367
368
369
370
371 public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
372 Entry<T> entry = queue.poll();
373 if (entry == null) {
374 return false;
375 }
376 initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
377 entry.recycle();
378
379
380 ++ allocations;
381 return true;
382 }
383
384
385
386
387 public final int free(boolean finalizer) {
388 return free(Integer.MAX_VALUE, finalizer);
389 }
390
391 private int free(int max, boolean finalizer) {
392 int numFreed = 0;
393 for (; numFreed < max; numFreed++) {
394 Entry<T> entry = queue.poll();
395 if (entry != null) {
396 freeEntry(entry, finalizer);
397 } else {
398
399 return numFreed;
400 }
401 }
402 return numFreed;
403 }
404
405
406
407
408 public final void trim() {
409 int free = size - allocations;
410 allocations = 0;
411
412
413 if (free > 0) {
414 free(free, false);
415 }
416 }
417
418 @SuppressWarnings({ "unchecked", "rawtypes" })
419 private void freeEntry(Entry entry, boolean finalizer) {
420
421 PoolChunk chunk = entry.chunk;
422 long handle = entry.handle;
423 ByteBuffer nioBuffer = entry.nioBuffer;
424 int normCapacity = entry.normCapacity;
425
426 if (!finalizer) {
427
428
429 entry.recycle();
430 }
431
432 chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
433 }
434
435 static final class Entry<T> {
436 final Handle<Entry<?>> recyclerHandle;
437 PoolChunk<T> chunk;
438 ByteBuffer nioBuffer;
439 long handle = -1;
440 int normCapacity;
441
442 Entry(Handle<Entry<?>> recyclerHandle) {
443 this.recyclerHandle = recyclerHandle;
444 }
445
446 void recycle() {
447 chunk = null;
448 nioBuffer = null;
449 handle = -1;
450 recyclerHandle.recycle(this);
451 }
452 }
453
454 @SuppressWarnings("rawtypes")
455 private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
456 Entry entry = RECYCLER.get();
457 entry.chunk = chunk;
458 entry.nioBuffer = nioBuffer;
459 entry.handle = handle;
460 entry.normCapacity = normCapacity;
461 return entry;
462 }
463
464 @SuppressWarnings("rawtypes")
465 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
466 @SuppressWarnings("unchecked")
467 @Override
468 public Entry newObject(Handle<Entry> handle) {
469 return new Entry(handle);
470 }
471 });
472 }
473 }