1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.buffer;
18
19
20 import io.netty.buffer.PoolArena.SizeClass;
21 import io.netty.util.Recycler;
22 import io.netty.util.Recycler.EnhancedHandle;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.ObjectPool.Handle;
25 import io.netty.util.internal.PlatformDependent;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.nio.ByteBuffer;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Queue;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
36
37
38
39
40
41
42
43
44 final class PoolThreadCache {
45
46 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
47 private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
48
49 final PoolArena<byte[]> heapArena;
50 final PoolArena<ByteBuffer> directArena;
51
52
53 private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
54 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
55 private final MemoryRegionCache<byte[]>[] normalHeapCaches;
56 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
57
58 private final int freeSweepAllocationThreshold;
59 private final AtomicBoolean freed = new AtomicBoolean();
60 @SuppressWarnings("unused")
61 private final FreeOnFinalize freeOnFinalize;
62
63 private int allocations;
64
65
66
67
/**
 * Creates the per-thread cache for the given arenas.
 *
 * @param heapArena                    arena used for heap buffers, or {@code null} to disable heap caching
 * @param directArena                  arena used for direct buffers, or {@code null} to disable direct caching
 * @param smallCacheSize               capacity of each small (sub-page) cache; values {@code <= 0} disable them
 * @param normalCacheSize              capacity of each normal cache; values {@code <= 0} disable them
 * @param maxCachedBufferCapacity      upper bound on the buffer size that gets a normal cache; must be {@code >= 0}
 * @param freeSweepAllocationThreshold number of cache lookups between two sweeps; must be {@code > 0}
 *                                     whenever at least one cache array is created
 * @param useFinalizer                 whether a {@link FreeOnFinalize} guard should be created for this cache
 * @throws IllegalArgumentException if {@code maxCachedBufferCapacity} is negative, or if caches are in use
 *                                  and {@code freeSweepAllocationThreshold < 1}
 */
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
                int freeSweepAllocationThreshold, boolean useFinalizer) {
    checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
        smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);
        normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
    } else {
        // No direct arena is configured, so there is nothing to cache for it.
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
    }
    if (heapArena != null) {
        smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);
        normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
    } else {
        // No heap arena is configured, so there is nothing to cache for it.
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
    }

    // The threshold only matters when at least one cache array exists.
    if ((smallSubPageDirectCaches != null || normalDirectCaches != null
            || smallSubPageHeapCaches != null || normalHeapCaches != null)
            && freeSweepAllocationThreshold < 1) {
        throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                + freeSweepAllocationThreshold + " (expected: > 0)");
    }

    // Register with the arenas only after validation has passed. free() is the only place that
    // decrements these counters, and it can never run for an instance whose constructor threw,
    // so incrementing earlier would leave the counters permanently too high on failure.
    if (directArena != null) {
        directArena.numThreadCaches.getAndIncrement();
    }
    if (heapArena != null) {
        heapArena.numThreadCaches.getAndIncrement();
    }
    freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;
}
104
105 private static <T> MemoryRegionCache<T>[] createSubPageCaches(
106 int cacheSize, int numCaches) {
107 if (cacheSize > 0 && numCaches > 0) {
108 @SuppressWarnings("unchecked")
109 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
110 for (int i = 0; i < cache.length; i++) {
111
112 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
113 }
114 return cache;
115 } else {
116 return null;
117 }
118 }
119
120 @SuppressWarnings("unchecked")
121 private static <T> MemoryRegionCache<T>[] createNormalCaches(
122 int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
123 if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
124 int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);
125
126
127 List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
128 for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&
129 area.sizeClass.sizeIdx2size(idx) <= max; idx++) {
130 cache.add(new NormalMemoryRegionCache<T>(cacheSize));
131 }
132 return cache.toArray(new MemoryRegionCache[0]);
133 } else {
134 return null;
135 }
136 }
137
138
139 static int log2(int val) {
140 return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
141 }
142
143
144
145
146 boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
147 return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
148 }
149
150
151
152
153 boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
154 return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
155 }
156
157 @SuppressWarnings({ "unchecked", "rawtypes" })
158 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
159 if (cache == null) {
160
161 return false;
162 }
163 boolean allocated = cache.allocate(buf, reqCapacity, this);
164 if (++ allocations >= freeSweepAllocationThreshold) {
165 allocations = 0;
166 trim();
167 }
168 return allocated;
169 }
170
171
172
173
174
175 @SuppressWarnings({ "unchecked", "rawtypes" })
176 boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
177 long handle, int normCapacity, SizeClass sizeClass) {
178 int sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);
179 MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
180 if (cache == null) {
181 return false;
182 }
183 if (freed.get()) {
184 return false;
185 }
186 return cache.add(chunk, nioBuffer, handle, normCapacity);
187 }
188
189 private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
190 switch (sizeClass) {
191 case Normal:
192 return cacheForNormal(area, sizeIdx);
193 case Small:
194 return cacheForSmall(area, sizeIdx);
195 default:
196 throw new Error();
197 }
198 }
199
200
201
202
203 void free(boolean finalizer) {
204
205
206 if (freed.compareAndSet(false, true)) {
207 if (freeOnFinalize != null) {
208
209 freeOnFinalize.cache = null;
210 }
211 int numFreed = free(smallSubPageDirectCaches, finalizer) +
212 free(normalDirectCaches, finalizer) +
213 free(smallSubPageHeapCaches, finalizer) +
214 free(normalHeapCaches, finalizer);
215
216 if (numFreed > 0 && logger.isDebugEnabled()) {
217 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
218 Thread.currentThread().getName());
219 }
220
221 if (directArena != null) {
222 directArena.numThreadCaches.getAndDecrement();
223 }
224
225 if (heapArena != null) {
226 heapArena.numThreadCaches.getAndDecrement();
227 }
228 }
229 }
230
231 private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
232 if (caches == null) {
233 return 0;
234 }
235
236 int numFreed = 0;
237 for (MemoryRegionCache<?> c: caches) {
238 numFreed += free(c, finalizer);
239 }
240 return numFreed;
241 }
242
243 private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
244 if (cache == null) {
245 return 0;
246 }
247 return cache.free(finalizer);
248 }
249
250 void trim() {
251 trim(smallSubPageDirectCaches);
252 trim(normalDirectCaches);
253 trim(smallSubPageHeapCaches);
254 trim(normalHeapCaches);
255 }
256
257 private static void trim(MemoryRegionCache<?>[] caches) {
258 if (caches == null) {
259 return;
260 }
261 for (MemoryRegionCache<?> c: caches) {
262 trim(c);
263 }
264 }
265
266 private static void trim(MemoryRegionCache<?> cache) {
267 if (cache == null) {
268 return;
269 }
270 cache.trim();
271 }
272
273 private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
274 if (area.isDirect()) {
275 return cache(smallSubPageDirectCaches, sizeIdx);
276 }
277 return cache(smallSubPageHeapCaches, sizeIdx);
278 }
279
280 private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
281
282 int idx = sizeIdx - area.sizeClass.nSubpages;
283 if (area.isDirect()) {
284 return cache(normalDirectCaches, idx);
285 }
286 return cache(normalHeapCaches, idx);
287 }
288
289 private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
290 if (cache == null || sizeIdx >= cache.length) {
291 return null;
292 }
293 return cache[sizeIdx];
294 }
295
296
297
298
299 private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
300 SubPageMemoryRegionCache(int size) {
301 super(size, SizeClass.Small);
302 }
303
304 @Override
305 protected void initBuf(
306 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
307 PoolThreadCache threadCache) {
308 chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache, true);
309 }
310 }
311
312
313
314
315 private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
316 NormalMemoryRegionCache(int size) {
317 super(size, SizeClass.Normal);
318 }
319
320 @Override
321 protected void initBuf(
322 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
323 PoolThreadCache threadCache) {
324 chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache, true);
325 }
326 }
327
328 private abstract static class MemoryRegionCache<T> {
/** Queue capacity: the requested size as rounded by {@code MathUtil.safeFindNextPositivePowerOfTwo}. */
private final int size;
/** Cached entries, held in a fixed-capacity queue obtained from {@code PlatformDependent}. */
private final Queue<Entry<T>> queue;
/** Size class handed to the arena when an entry is released. */
private final SizeClass sizeClass;
/** Successful {@code allocate} calls since the last {@code trim()}. */
private int allocations;

/**
 * @param size      requested number of entries this cache may hold
 * @param sizeClass size class of the regions stored in this cache
 */
MemoryRegionCache(int size, SizeClass sizeClass) {
    final int capacity = MathUtil.safeFindNextPositivePowerOfTwo(size);
    this.size = capacity;
    this.sizeClass = sizeClass;
    this.queue = PlatformDependent.newFixedMpscUnpaddedQueue(capacity);
}
339
340
341
342
343 protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
344 PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
345
346
347
348
349 @SuppressWarnings("unchecked")
350 public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
351 Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
352 boolean queued = queue.offer(entry);
353 if (!queued) {
354
355 entry.unguardedRecycle();
356 }
357
358 return queued;
359 }
360
361
362
363
364 public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
365 Entry<T> entry = queue.poll();
366 if (entry == null) {
367 return false;
368 }
369 initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
370 entry.unguardedRecycle();
371
372
373 ++ allocations;
374 return true;
375 }
376
377
378
379
380 public final int free(boolean finalizer) {
381 return free(Integer.MAX_VALUE, finalizer);
382 }
383
384 private int free(int max, boolean finalizer) {
385 int numFreed = 0;
386 for (; numFreed < max; numFreed++) {
387 Entry<T> entry = queue.poll();
388 if (entry != null) {
389 freeEntry(entry, finalizer);
390 } else {
391
392 return numFreed;
393 }
394 }
395 return numFreed;
396 }
397
398
399
400
401 public final void trim() {
402 int free = size - allocations;
403 allocations = 0;
404
405
406 if (free > 0) {
407 free(free, false);
408 }
409 }
410
411 @SuppressWarnings({ "unchecked", "rawtypes" })
412 private void freeEntry(Entry entry, boolean finalizer) {
413
414 PoolChunk chunk = entry.chunk;
415 long handle = entry.handle;
416 ByteBuffer nioBuffer = entry.nioBuffer;
417 int normCapacity = entry.normCapacity;
418
419 if (!finalizer) {
420
421
422 entry.recycle();
423 }
424
425 chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
426 }
427
428 static final class Entry<T> {
429 final EnhancedHandle<Entry<?>> recyclerHandle;
430 PoolChunk<T> chunk;
431 ByteBuffer nioBuffer;
432 long handle = -1;
433 int normCapacity;
434
435 Entry(Handle<Entry<?>> recyclerHandle) {
436 this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;
437 }
438
439 void recycle() {
440 chunk = null;
441 nioBuffer = null;
442 handle = -1;
443 recyclerHandle.recycle(this);
444 }
445
446 void unguardedRecycle() {
447 chunk = null;
448 nioBuffer = null;
449 handle = -1;
450 recyclerHandle.unguardedRecycle(this);
451 }
452 }
453
454 @SuppressWarnings("rawtypes")
455 private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
456 Entry entry = RECYCLER.get();
457 entry.chunk = chunk;
458 entry.nioBuffer = nioBuffer;
459 entry.handle = handle;
460 entry.normCapacity = normCapacity;
461 return entry;
462 }
463
464 @SuppressWarnings("rawtypes")
465 private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
466 @Override
467 @SuppressWarnings("unchecked")
468 protected Entry newObject(Handle<Entry> handle) {
469 return new Entry(handle);
470 }
471 };
472 }
473
474 private static final class FreeOnFinalize {
475
476 private volatile PoolThreadCache cache;
477
478 private FreeOnFinalize(PoolThreadCache cache) {
479 this.cache = cache;
480 }
481
482
483 @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
484 @Override
485 protected void finalize() throws Throwable {
486 try {
487 super.finalize();
488 } finally {
489 PoolThreadCache cache = this.cache;
490
491
492 this.cache = null;
493 if (cache != null) {
494 cache.free(true);
495 }
496 }
497 }
498 }
499 }