1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.buffer;
18
19
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import io.netty.buffer.PoolArena.SizeClass;
23 import io.netty.util.Recycler.EnhancedHandle;
24 import io.netty.util.internal.MathUtil;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectPool.Handle;
27 import io.netty.util.internal.ObjectPool.ObjectCreator;
28 import io.netty.util.internal.PlatformDependent;
29 import io.netty.util.internal.logging.InternalLogger;
30 import io.netty.util.internal.logging.InternalLoggerFactory;
31
32 import java.nio.ByteBuffer;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38
39
40
41
42
43
44
45 final class PoolThreadCache {
46
47 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
48 private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
49
50 final PoolArena<byte[]> heapArena;
51 final PoolArena<ByteBuffer> directArena;
52
53
54 private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
55 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
56 private final MemoryRegionCache<byte[]>[] normalHeapCaches;
57 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
58
59 private final int freeSweepAllocationThreshold;
60 private final AtomicBoolean freed = new AtomicBoolean();
61 @SuppressWarnings("unused")
62 private final FreeOnFinalize freeOnFinalize;
63
64 private int allocations;
65
66
67
68
69 PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
70 int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
71 int freeSweepAllocationThreshold, boolean useFinalizer) {
72 checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
73 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
74 this.heapArena = heapArena;
75 this.directArena = directArena;
76 if (directArena != null) {
77 smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);
78 normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
79 directArena.numThreadCaches.getAndIncrement();
80 } else {
81
82 smallSubPageDirectCaches = null;
83 normalDirectCaches = null;
84 }
85 if (heapArena != null) {
86
87 smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);
88 normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
89 heapArena.numThreadCaches.getAndIncrement();
90 } else {
91
92 smallSubPageHeapCaches = null;
93 normalHeapCaches = null;
94 }
95
96
97 if ((smallSubPageDirectCaches != null || normalDirectCaches != null
98 || smallSubPageHeapCaches != null || normalHeapCaches != null)
99 && freeSweepAllocationThreshold < 1) {
100 throw new IllegalArgumentException("freeSweepAllocationThreshold: "
101 + freeSweepAllocationThreshold + " (expected: > 0)");
102 }
103 freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;
104 }
105
106 private static <T> MemoryRegionCache<T>[] createSubPageCaches(
107 int cacheSize, int numCaches) {
108 if (cacheSize > 0 && numCaches > 0) {
109 @SuppressWarnings("unchecked")
110 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
111 for (int i = 0; i < cache.length; i++) {
112
113 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
114 }
115 return cache;
116 } else {
117 return null;
118 }
119 }
120
121 @SuppressWarnings("unchecked")
122 private static <T> MemoryRegionCache<T>[] createNormalCaches(
123 int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
124 if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
125 int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);
126
127
128 List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
129 for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&
130 area.sizeClass.sizeIdx2size(idx) <= max; idx++) {
131 cache.add(new NormalMemoryRegionCache<T>(cacheSize));
132 }
133 return cache.toArray(new MemoryRegionCache[0]);
134 } else {
135 return null;
136 }
137 }
138
139
140 static int log2(int val) {
141 return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
142 }
143
144
145
146
147 boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
148 return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
149 }
150
151
152
153
154 boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
155 return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
156 }
157
158 @SuppressWarnings({ "unchecked", "rawtypes" })
159 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
160 if (cache == null) {
161
162 return false;
163 }
164 boolean allocated = cache.allocate(buf, reqCapacity, this);
165 if (++ allocations >= freeSweepAllocationThreshold) {
166 allocations = 0;
167 trim();
168 }
169 return allocated;
170 }
171
172
173
174
175
176 @SuppressWarnings({ "unchecked", "rawtypes" })
177 boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
178 long handle, int normCapacity, SizeClass sizeClass) {
179 int sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);
180 MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
181 if (cache == null) {
182 return false;
183 }
184 if (freed.get()) {
185 return false;
186 }
187 return cache.add(chunk, nioBuffer, handle, normCapacity);
188 }
189
190 private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
191 switch (sizeClass) {
192 case Normal:
193 return cacheForNormal(area, sizeIdx);
194 case Small:
195 return cacheForSmall(area, sizeIdx);
196 default:
197 throw new Error();
198 }
199 }
200
201
202
203
204 void free(boolean finalizer) {
205
206
207 if (freed.compareAndSet(false, true)) {
208 if (freeOnFinalize != null) {
209
210 freeOnFinalize.cache = null;
211 }
212 int numFreed = free(smallSubPageDirectCaches, finalizer) +
213 free(normalDirectCaches, finalizer) +
214 free(smallSubPageHeapCaches, finalizer) +
215 free(normalHeapCaches, finalizer);
216
217 if (numFreed > 0 && logger.isDebugEnabled()) {
218 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
219 Thread.currentThread().getName());
220 }
221
222 if (directArena != null) {
223 directArena.numThreadCaches.getAndDecrement();
224 }
225
226 if (heapArena != null) {
227 heapArena.numThreadCaches.getAndDecrement();
228 }
229 }
230 }
231
232 private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
233 if (caches == null) {
234 return 0;
235 }
236
237 int numFreed = 0;
238 for (MemoryRegionCache<?> c: caches) {
239 numFreed += free(c, finalizer);
240 }
241 return numFreed;
242 }
243
244 private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
245 if (cache == null) {
246 return 0;
247 }
248 return cache.free(finalizer);
249 }
250
251 void trim() {
252 trim(smallSubPageDirectCaches);
253 trim(normalDirectCaches);
254 trim(smallSubPageHeapCaches);
255 trim(normalHeapCaches);
256 }
257
258 private static void trim(MemoryRegionCache<?>[] caches) {
259 if (caches == null) {
260 return;
261 }
262 for (MemoryRegionCache<?> c: caches) {
263 trim(c);
264 }
265 }
266
267 private static void trim(MemoryRegionCache<?> cache) {
268 if (cache == null) {
269 return;
270 }
271 cache.trim();
272 }
273
274 private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
275 if (area.isDirect()) {
276 return cache(smallSubPageDirectCaches, sizeIdx);
277 }
278 return cache(smallSubPageHeapCaches, sizeIdx);
279 }
280
281 private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
282
283 int idx = sizeIdx - area.sizeClass.nSubpages;
284 if (area.isDirect()) {
285 return cache(normalDirectCaches, idx);
286 }
287 return cache(normalHeapCaches, idx);
288 }
289
290 private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
291 if (cache == null || sizeIdx > cache.length - 1) {
292 return null;
293 }
294 return cache[sizeIdx];
295 }
296
297
298
299
300 private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
301 SubPageMemoryRegionCache(int size) {
302 super(size, SizeClass.Small);
303 }
304
305 @Override
306 protected void initBuf(
307 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
308 PoolThreadCache threadCache) {
309 chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
310 }
311 }
312
313
314
315
316 private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
317 NormalMemoryRegionCache(int size) {
318 super(size, SizeClass.Normal);
319 }
320
321 @Override
322 protected void initBuf(
323 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
324 PoolThreadCache threadCache) {
325 chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
326 }
327 }
328
329 private abstract static class MemoryRegionCache<T> {
330 private final int size;
331 private final Queue<Entry<T>> queue;
332 private final SizeClass sizeClass;
333 private int allocations;
334
335 MemoryRegionCache(int size, SizeClass sizeClass) {
336 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
337 queue = PlatformDependent.newFixedMpscUnpaddedQueue(this.size);
338 this.sizeClass = sizeClass;
339 }
340
341
342
343
344 protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
345 PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
346
347
348
349
350 @SuppressWarnings("unchecked")
351 public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
352 Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
353 boolean queued = queue.offer(entry);
354 if (!queued) {
355
356 entry.unguardedRecycle();
357 }
358
359 return queued;
360 }
361
362
363
364
365 public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
366 Entry<T> entry = queue.poll();
367 if (entry == null) {
368 return false;
369 }
370 initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
371 entry.unguardedRecycle();
372
373
374 ++ allocations;
375 return true;
376 }
377
378
379
380
381 public final int free(boolean finalizer) {
382 return free(Integer.MAX_VALUE, finalizer);
383 }
384
385 private int free(int max, boolean finalizer) {
386 int numFreed = 0;
387 for (; numFreed < max; numFreed++) {
388 Entry<T> entry = queue.poll();
389 if (entry != null) {
390 freeEntry(entry, finalizer);
391 } else {
392
393 return numFreed;
394 }
395 }
396 return numFreed;
397 }
398
399
400
401
402 public final void trim() {
403 int free = size - allocations;
404 allocations = 0;
405
406
407 if (free > 0) {
408 free(free, false);
409 }
410 }
411
412 @SuppressWarnings({ "unchecked", "rawtypes" })
413 private void freeEntry(Entry entry, boolean finalizer) {
414
415 PoolChunk chunk = entry.chunk;
416 long handle = entry.handle;
417 ByteBuffer nioBuffer = entry.nioBuffer;
418 int normCapacity = entry.normCapacity;
419
420 if (!finalizer) {
421
422
423 entry.recycle();
424 }
425
426 chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
427 }
428
429 static final class Entry<T> {
430 final EnhancedHandle<Entry<?>> recyclerHandle;
431 PoolChunk<T> chunk;
432 ByteBuffer nioBuffer;
433 long handle = -1;
434 int normCapacity;
435
436 Entry(Handle<Entry<?>> recyclerHandle) {
437 this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;
438 }
439
440 void recycle() {
441 chunk = null;
442 nioBuffer = null;
443 handle = -1;
444 recyclerHandle.recycle(this);
445 }
446
447 void unguardedRecycle() {
448 chunk = null;
449 nioBuffer = null;
450 handle = -1;
451 recyclerHandle.unguardedRecycle(this);
452 }
453 }
454
455 @SuppressWarnings("rawtypes")
456 private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
457 Entry entry = RECYCLER.get();
458 entry.chunk = chunk;
459 entry.nioBuffer = nioBuffer;
460 entry.handle = handle;
461 entry.normCapacity = normCapacity;
462 return entry;
463 }
464
465 @SuppressWarnings("rawtypes")
466 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
467 @SuppressWarnings("unchecked")
468 @Override
469 public Entry newObject(Handle<Entry> handle) {
470 return new Entry(handle);
471 }
472 });
473 }
474
475 private static final class FreeOnFinalize {
476
477 private volatile PoolThreadCache cache;
478
479 private FreeOnFinalize(PoolThreadCache cache) {
480 this.cache = cache;
481 }
482
483
484 @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
485 @Override
486 protected void finalize() throws Throwable {
487 try {
488 super.finalize();
489 } finally {
490 PoolThreadCache cache = this.cache;
491
492
493 this.cache = null;
494 if (cache != null) {
495 cache.free(true);
496 }
497 }
498 }
499 }
500 }