View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  
20  import io.netty.buffer.PoolArena.SizeClass;
21  import io.netty.util.Recycler;
22  import io.netty.util.Recycler.EnhancedHandle;
23  import io.netty.util.internal.MathUtil;
24  import io.netty.util.internal.ObjectPool.Handle;
25  import io.netty.util.internal.PlatformDependent;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.List;
32  import java.util.Queue;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
36  
/**
 * Acts as a thread-local cache for allocations. This implementation is modeled after
 * <a href="https://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the described
 * techniques of
 * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
 * Scalable memory allocation using jemalloc</a>.
 */
44  final class PoolThreadCache {
45  
46      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
47      private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
48  
49      final PoolArena<byte[]> heapArena;
50      final PoolArena<ByteBuffer> directArena;
51  
52      // Hold the caches for the different size classes, which are small and normal.
53      private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
54      private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
55      private final MemoryRegionCache<byte[]>[] normalHeapCaches;
56      private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
57  
58      private final int freeSweepAllocationThreshold;
59      private final AtomicBoolean freed = new AtomicBoolean();
60      @SuppressWarnings("unused") // Field is only here for the finalizer.
61      private final FreeOnFinalize freeOnFinalize;
62  
63      private int allocations;
64  
65      // TODO: Test if adding padding helps under contention
66      //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
67  
68      PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
69                      int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
70                      int freeSweepAllocationThreshold, boolean useFinalizer) {
71          checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
72          this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
73          this.heapArena = heapArena;
74          this.directArena = directArena;
75          if (directArena != null) {
76              smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);
77              normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
78              directArena.numThreadCaches.getAndIncrement();
79          } else {
80              // No directArea is configured so just null out all caches
81              smallSubPageDirectCaches = null;
82              normalDirectCaches = null;
83          }
84          if (heapArena != null) {
85              // Create the caches for the heap allocations
86              smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);
87              normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
88              heapArena.numThreadCaches.getAndIncrement();
89          } else {
90              // No heapArea is configured so just null out all caches
91              smallSubPageHeapCaches = null;
92              normalHeapCaches = null;
93          }
94  
95          // Only check if there are caches in use.
96          if ((smallSubPageDirectCaches != null || normalDirectCaches != null
97                  || smallSubPageHeapCaches != null || normalHeapCaches != null)
98                  && freeSweepAllocationThreshold < 1) {
99              throw new IllegalArgumentException("freeSweepAllocationThreshold: "
100                     + freeSweepAllocationThreshold + " (expected: > 0)");
101         }
102         freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;
103     }
104 
105     private static <T> MemoryRegionCache<T>[] createSubPageCaches(
106             int cacheSize, int numCaches) {
107         if (cacheSize > 0 && numCaches > 0) {
108             @SuppressWarnings("unchecked")
109             MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
110             for (int i = 0; i < cache.length; i++) {
111                 // TODO: maybe use cacheSize / cache.length
112                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
113             }
114             return cache;
115         } else {
116             return null;
117         }
118     }
119 
120     @SuppressWarnings("unchecked")
121     private static <T> MemoryRegionCache<T>[] createNormalCaches(
122             int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
123         if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
124             int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);
125             // Create as many normal caches as we support based on how many sizeIdx we have and what the upper
126             // bound is that we want to cache in general.
127             List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
128             for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&
129                     area.sizeClass.sizeIdx2size(idx) <= max; idx++) {
130                 cache.add(new NormalMemoryRegionCache<T>(cacheSize));
131             }
132             return cache.toArray(new MemoryRegionCache[0]);
133         } else {
134             return null;
135         }
136     }
137 
138     // val > 0
139     static int log2(int val) {
140         return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
141     }
142 
143     /**
144      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
145      */
146     boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
147         return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
148     }
149 
150     /**
151      * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
152      */
153     boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
154         return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
155     }
156 
157     @SuppressWarnings({ "unchecked", "rawtypes" })
158     private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
159         if (cache == null) {
160             // no cache found so just return false here
161             return false;
162         }
163         boolean allocated = cache.allocate(buf, reqCapacity, this);
164         if (++ allocations >= freeSweepAllocationThreshold) {
165             allocations = 0;
166             trim();
167         }
168         return allocated;
169     }
170 
171     /**
172      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
173      * Returns {@code true} if it fit into the cache {@code false} otherwise.
174      */
175     @SuppressWarnings({ "unchecked", "rawtypes" })
176     boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
177                 long handle, int normCapacity, SizeClass sizeClass) {
178         int sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);
179         MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
180         if (cache == null) {
181             return false;
182         }
183         if (freed.get()) {
184             return false;
185         }
186         return cache.add(chunk, nioBuffer, handle, normCapacity);
187     }
188 
189     private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
190         switch (sizeClass) {
191         case Normal:
192             return cacheForNormal(area, sizeIdx);
193         case Small:
194             return cacheForSmall(area, sizeIdx);
195         default:
196             throw new Error();
197         }
198     }
199 
200     /**
201      *  Should be called if the Thread that uses this cache is about to exit to release resources out of the cache
202      */
203     void free(boolean finalizer) {
204         // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
205         // we only call this one time.
206         if (freed.compareAndSet(false, true)) {
207             if (freeOnFinalize != null) {
208                 // Help GC: this can race with a finalizer thread, but will be null out regardless
209                 freeOnFinalize.cache = null;
210             }
211             int numFreed = free(smallSubPageDirectCaches, finalizer) +
212                            free(normalDirectCaches, finalizer) +
213                            free(smallSubPageHeapCaches, finalizer) +
214                            free(normalHeapCaches, finalizer);
215 
216             if (numFreed > 0 && logger.isDebugEnabled()) {
217                 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
218                              Thread.currentThread().getName());
219             }
220 
221             if (directArena != null) {
222                 directArena.numThreadCaches.getAndDecrement();
223             }
224 
225             if (heapArena != null) {
226                 heapArena.numThreadCaches.getAndDecrement();
227             }
228         }
229     }
230 
231     private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
232         if (caches == null) {
233             return 0;
234         }
235 
236         int numFreed = 0;
237         for (MemoryRegionCache<?> c: caches) {
238             numFreed += free(c, finalizer);
239         }
240         return numFreed;
241     }
242 
243     private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
244         if (cache == null) {
245             return 0;
246         }
247         return cache.free(finalizer);
248     }
249 
250     void trim() {
251         trim(smallSubPageDirectCaches);
252         trim(normalDirectCaches);
253         trim(smallSubPageHeapCaches);
254         trim(normalHeapCaches);
255     }
256 
257     private static void trim(MemoryRegionCache<?>[] caches) {
258         if (caches == null) {
259             return;
260         }
261         for (MemoryRegionCache<?> c: caches) {
262             trim(c);
263         }
264     }
265 
266     private static void trim(MemoryRegionCache<?> cache) {
267         if (cache == null) {
268             return;
269         }
270         cache.trim();
271     }
272 
273     private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
274         if (area.isDirect()) {
275             return cache(smallSubPageDirectCaches, sizeIdx);
276         }
277         return cache(smallSubPageHeapCaches, sizeIdx);
278     }
279 
280     private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
281         // We need to subtract area.sizeClass.nSubpages as sizeIdx is the overall index for all sizes.
282         int idx = sizeIdx - area.sizeClass.nSubpages;
283         if (area.isDirect()) {
284             return cache(normalDirectCaches, idx);
285         }
286         return cache(normalHeapCaches, idx);
287     }
288 
289     private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
290         if (cache == null || sizeIdx >= cache.length) {
291             return null;
292         }
293         return cache[sizeIdx];
294     }
295 
296     /**
297      * Cache used for buffers which are backed by TINY or SMALL size.
298      */
299     private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
300         SubPageMemoryRegionCache(int size) {
301             super(size, SizeClass.Small);
302         }
303 
304         @Override
305         protected void initBuf(
306                 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
307                 PoolThreadCache threadCache) {
308             chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache, true);
309         }
310     }
311 
312     /**
313      * Cache used for buffers which are backed by NORMAL size.
314      */
315     private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
316         NormalMemoryRegionCache(int size) {
317             super(size, SizeClass.Normal);
318         }
319 
320         @Override
321         protected void initBuf(
322                 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
323                 PoolThreadCache threadCache) {
324             chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache, true);
325         }
326     }
327 
328     private abstract static class MemoryRegionCache<T> {
329         private final int size;
330         private final Queue<Entry<T>> queue;
331         private final SizeClass sizeClass;
332         private int allocations;
333 
334         MemoryRegionCache(int size, SizeClass sizeClass) {
335             this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
336             queue = PlatformDependent.newFixedMpscUnpaddedQueue(this.size);
337             this.sizeClass = sizeClass;
338         }
339 
340         /**
341          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
342          */
343         protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
344                                         PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
345 
346         /**
347          * Add to cache if not already full.
348          */
349         @SuppressWarnings("unchecked")
350         public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
351             Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
352             boolean queued = queue.offer(entry);
353             if (!queued) {
354                 // If it was not possible to cache the chunk, immediately recycle the entry
355                 entry.unguardedRecycle();
356             }
357 
358             return queued;
359         }
360 
361         /**
362          * Allocate something out of the cache if possible and remove the entry from the cache.
363          */
364         public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
365             Entry<T> entry = queue.poll();
366             if (entry == null) {
367                 return false;
368             }
369             initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
370             entry.unguardedRecycle();
371 
372             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
373             ++ allocations;
374             return true;
375         }
376 
377         /**
378          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
379          */
380         public final int free(boolean finalizer) {
381             return free(Integer.MAX_VALUE, finalizer);
382         }
383 
384         private int free(int max, boolean finalizer) {
385             int numFreed = 0;
386             for (; numFreed < max; numFreed++) {
387                 Entry<T> entry = queue.poll();
388                 if (entry != null) {
389                     freeEntry(entry, finalizer);
390                 } else {
391                     // all cleared
392                     return numFreed;
393                 }
394             }
395             return numFreed;
396         }
397 
398         /**
399          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
400          */
401         public final void trim() {
402             int free = size - allocations;
403             allocations = 0;
404 
405             // We not even allocated all the number that are
406             if (free > 0) {
407                 free(free, false);
408             }
409         }
410 
411         @SuppressWarnings({ "unchecked", "rawtypes" })
412         private  void freeEntry(Entry entry, boolean finalizer) {
413             // Capture entry state before we recycle the entry object.
414             PoolChunk chunk = entry.chunk;
415             long handle = entry.handle;
416             ByteBuffer nioBuffer = entry.nioBuffer;
417             int normCapacity = entry.normCapacity;
418 
419             if (!finalizer) {
420                 // recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
421                 // a finalizer.
422                 entry.recycle();
423             }
424 
425             chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
426         }
427 
428         static final class Entry<T> {
429             final EnhancedHandle<Entry<?>> recyclerHandle;
430             PoolChunk<T> chunk;
431             ByteBuffer nioBuffer;
432             long handle = -1;
433             int normCapacity;
434 
435             Entry(Handle<Entry<?>> recyclerHandle) {
436                 this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;
437             }
438 
439             void recycle() {
440                 chunk = null;
441                 nioBuffer = null;
442                 handle = -1;
443                 recyclerHandle.recycle(this);
444             }
445 
446             void unguardedRecycle() {
447                 chunk = null;
448                 nioBuffer = null;
449                 handle = -1;
450                 recyclerHandle.unguardedRecycle(this);
451             }
452         }
453 
454         @SuppressWarnings("rawtypes")
455         private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
456             Entry entry = RECYCLER.get();
457             entry.chunk = chunk;
458             entry.nioBuffer = nioBuffer;
459             entry.handle = handle;
460             entry.normCapacity = normCapacity;
461             return entry;
462         }
463 
464         @SuppressWarnings("rawtypes")
465         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
466             @Override
467             @SuppressWarnings("unchecked")
468             protected Entry newObject(Handle<Entry> handle) {
469                 return new Entry(handle);
470             }
471         };
472     }
473 
474     private static final class FreeOnFinalize {
475 
476         private volatile PoolThreadCache cache;
477 
478         private FreeOnFinalize(PoolThreadCache cache) {
479             this.cache = cache;
480         }
481 
482         /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
483         @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
484         @Override
485         protected void finalize() throws Throwable {
486             try {
487                 super.finalize();
488             } finally {
489                 PoolThreadCache cache = this.cache;
490                 // this can race with a non-finalizer thread calling free: regardless who wins, the cache will be
491                 // null out
492                 this.cache = null;
493                 if (cache != null) {
494                     cache.free(true);
495                 }
496             }
497         }
498     }
499 }