View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  
20  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21  
22  import io.netty.buffer.PoolArena.SizeClass;
23  import io.netty.util.Recycler.EnhancedHandle;
24  import io.netty.util.internal.MathUtil;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectPool.Handle;
27  import io.netty.util.internal.ObjectPool.ObjectCreator;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.nio.ByteBuffer;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  /**
39   * Acts as a thread cache for allocations. This implementation is modeled after
40   * <a href="https://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the techniques
41   * described in
42   * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
43   * Scalable memory allocation using jemalloc</a>.
44   */
45  final class PoolThreadCache {
46  
47      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
48      private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
49  
50      final PoolArena<byte[]> heapArena;
51      final PoolArena<ByteBuffer> directArena;
52  
53      // Hold the caches for the different size classes, which are small and normal.
54      private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
55      private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
56      private final MemoryRegionCache<byte[]>[] normalHeapCaches;
57      private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
58  
59      private final int freeSweepAllocationThreshold;
60      private final AtomicBoolean freed = new AtomicBoolean();
61      @SuppressWarnings("unused") // Field is only here for the finalizer.
62      private final FreeOnFinalize freeOnFinalize;
63  
64      private int allocations;
65  
66      // TODO: Test if adding padding helps under contention
67      //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
68  
69      PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
70                      int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
71                      int freeSweepAllocationThreshold, boolean useFinalizer) {
72          checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
73          this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
74          this.heapArena = heapArena;
75          this.directArena = directArena;
76          if (directArena != null) {
77              smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);
78              normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
79              directArena.numThreadCaches.getAndIncrement();
80          } else {
81              // No directArea is configured so just null out all caches
82              smallSubPageDirectCaches = null;
83              normalDirectCaches = null;
84          }
85          if (heapArena != null) {
86              // Create the caches for the heap allocations
87              smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);
88              normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
89              heapArena.numThreadCaches.getAndIncrement();
90          } else {
91              // No heapArea is configured so just null out all caches
92              smallSubPageHeapCaches = null;
93              normalHeapCaches = null;
94          }
95  
96          // Only check if there are caches in use.
97          if ((smallSubPageDirectCaches != null || normalDirectCaches != null
98                  || smallSubPageHeapCaches != null || normalHeapCaches != null)
99                  && freeSweepAllocationThreshold < 1) {
100             throw new IllegalArgumentException("freeSweepAllocationThreshold: "
101                     + freeSweepAllocationThreshold + " (expected: > 0)");
102         }
103         freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;
104     }
105 
106     private static <T> MemoryRegionCache<T>[] createSubPageCaches(
107             int cacheSize, int numCaches) {
108         if (cacheSize > 0 && numCaches > 0) {
109             @SuppressWarnings("unchecked")
110             MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
111             for (int i = 0; i < cache.length; i++) {
112                 // TODO: maybe use cacheSize / cache.length
113                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
114             }
115             return cache;
116         } else {
117             return null;
118         }
119     }
120 
121     @SuppressWarnings("unchecked")
122     private static <T> MemoryRegionCache<T>[] createNormalCaches(
123             int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
124         if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
125             int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);
126             // Create as many normal caches as we support based on how many sizeIdx we have and what the upper
127             // bound is that we want to cache in general.
128             List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
129             for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&
130                     area.sizeClass.sizeIdx2size(idx) <= max; idx++) {
131                 cache.add(new NormalMemoryRegionCache<T>(cacheSize));
132             }
133             return cache.toArray(new MemoryRegionCache[0]);
134         } else {
135             return null;
136         }
137     }
138 
139     // val > 0
140     static int log2(int val) {
141         return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
142     }
143 
144     /**
145      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
146      */
147     boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
148         return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
149     }
150 
151     /**
152      * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
153      */
154     boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
155         return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
156     }
157 
158     @SuppressWarnings({ "unchecked", "rawtypes" })
159     private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
160         if (cache == null) {
161             // no cache found so just return false here
162             return false;
163         }
164         boolean allocated = cache.allocate(buf, reqCapacity, this);
165         if (++ allocations >= freeSweepAllocationThreshold) {
166             allocations = 0;
167             trim();
168         }
169         return allocated;
170     }
171 
172     /**
173      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
174      * Returns {@code true} if it fit into the cache {@code false} otherwise.
175      */
176     @SuppressWarnings({ "unchecked", "rawtypes" })
177     boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
178                 long handle, int normCapacity, SizeClass sizeClass) {
179         int sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);
180         MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
181         if (cache == null) {
182             return false;
183         }
184         if (freed.get()) {
185             return false;
186         }
187         return cache.add(chunk, nioBuffer, handle, normCapacity);
188     }
189 
190     private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
191         switch (sizeClass) {
192         case Normal:
193             return cacheForNormal(area, sizeIdx);
194         case Small:
195             return cacheForSmall(area, sizeIdx);
196         default:
197             throw new Error();
198         }
199     }
200 
201     /**
202      *  Should be called if the Thread that uses this cache is about to exit to release resources out of the cache
203      */
204     void free(boolean finalizer) {
205         // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
206         // we only call this one time.
207         if (freed.compareAndSet(false, true)) {
208             if (freeOnFinalize != null) {
209                 // Help GC: this can race with a finalizer thread, but will be null out regardless
210                 freeOnFinalize.cache = null;
211             }
212             int numFreed = free(smallSubPageDirectCaches, finalizer) +
213                            free(normalDirectCaches, finalizer) +
214                            free(smallSubPageHeapCaches, finalizer) +
215                            free(normalHeapCaches, finalizer);
216 
217             if (numFreed > 0 && logger.isDebugEnabled()) {
218                 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
219                              Thread.currentThread().getName());
220             }
221 
222             if (directArena != null) {
223                 directArena.numThreadCaches.getAndDecrement();
224             }
225 
226             if (heapArena != null) {
227                 heapArena.numThreadCaches.getAndDecrement();
228             }
229         }
230     }
231 
232     private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
233         if (caches == null) {
234             return 0;
235         }
236 
237         int numFreed = 0;
238         for (MemoryRegionCache<?> c: caches) {
239             numFreed += free(c, finalizer);
240         }
241         return numFreed;
242     }
243 
244     private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
245         if (cache == null) {
246             return 0;
247         }
248         return cache.free(finalizer);
249     }
250 
251     void trim() {
252         trim(smallSubPageDirectCaches);
253         trim(normalDirectCaches);
254         trim(smallSubPageHeapCaches);
255         trim(normalHeapCaches);
256     }
257 
258     private static void trim(MemoryRegionCache<?>[] caches) {
259         if (caches == null) {
260             return;
261         }
262         for (MemoryRegionCache<?> c: caches) {
263             trim(c);
264         }
265     }
266 
267     private static void trim(MemoryRegionCache<?> cache) {
268         if (cache == null) {
269             return;
270         }
271         cache.trim();
272     }
273 
274     private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
275         if (area.isDirect()) {
276             return cache(smallSubPageDirectCaches, sizeIdx);
277         }
278         return cache(smallSubPageHeapCaches, sizeIdx);
279     }
280 
281     private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
282         // We need to subtract area.sizeClass.nSubpages as sizeIdx is the overall index for all sizes.
283         int idx = sizeIdx - area.sizeClass.nSubpages;
284         if (area.isDirect()) {
285             return cache(normalDirectCaches, idx);
286         }
287         return cache(normalHeapCaches, idx);
288     }
289 
290     private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
291         if (cache == null || sizeIdx > cache.length - 1) {
292             return null;
293         }
294         return cache[sizeIdx];
295     }
296 
297     /**
298      * Cache used for buffers which are backed by TINY or SMALL size.
299      */
300     private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
301         SubPageMemoryRegionCache(int size) {
302             super(size, SizeClass.Small);
303         }
304 
305         @Override
306         protected void initBuf(
307                 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
308                 PoolThreadCache threadCache) {
309             chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
310         }
311     }
312 
313     /**
314      * Cache used for buffers which are backed by NORMAL size.
315      */
316     private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
317         NormalMemoryRegionCache(int size) {
318             super(size, SizeClass.Normal);
319         }
320 
321         @Override
322         protected void initBuf(
323                 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
324                 PoolThreadCache threadCache) {
325             chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
326         }
327     }
328 
329     private abstract static class MemoryRegionCache<T> {
330         private final int size;
331         private final Queue<Entry<T>> queue;
332         private final SizeClass sizeClass;
333         private int allocations;
334 
335         MemoryRegionCache(int size, SizeClass sizeClass) {
336             this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
337             queue = PlatformDependent.newFixedMpscUnpaddedQueue(this.size);
338             this.sizeClass = sizeClass;
339         }
340 
341         /**
342          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
343          */
344         protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
345                                         PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
346 
347         /**
348          * Add to cache if not already full.
349          */
350         @SuppressWarnings("unchecked")
351         public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
352             Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
353             boolean queued = queue.offer(entry);
354             if (!queued) {
355                 // If it was not possible to cache the chunk, immediately recycle the entry
356                 entry.unguardedRecycle();
357             }
358 
359             return queued;
360         }
361 
362         /**
363          * Allocate something out of the cache if possible and remove the entry from the cache.
364          */
365         public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
366             Entry<T> entry = queue.poll();
367             if (entry == null) {
368                 return false;
369             }
370             initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
371             entry.unguardedRecycle();
372 
373             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
374             ++ allocations;
375             return true;
376         }
377 
378         /**
379          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
380          */
381         public final int free(boolean finalizer) {
382             return free(Integer.MAX_VALUE, finalizer);
383         }
384 
385         private int free(int max, boolean finalizer) {
386             int numFreed = 0;
387             for (; numFreed < max; numFreed++) {
388                 Entry<T> entry = queue.poll();
389                 if (entry != null) {
390                     freeEntry(entry, finalizer);
391                 } else {
392                     // all cleared
393                     return numFreed;
394                 }
395             }
396             return numFreed;
397         }
398 
399         /**
400          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
401          */
402         public final void trim() {
403             int free = size - allocations;
404             allocations = 0;
405 
406             // We not even allocated all the number that are
407             if (free > 0) {
408                 free(free, false);
409             }
410         }
411 
412         @SuppressWarnings({ "unchecked", "rawtypes" })
413         private  void freeEntry(Entry entry, boolean finalizer) {
414             // Capture entry state before we recycle the entry object.
415             PoolChunk chunk = entry.chunk;
416             long handle = entry.handle;
417             ByteBuffer nioBuffer = entry.nioBuffer;
418             int normCapacity = entry.normCapacity;
419 
420             if (!finalizer) {
421                 // recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
422                 // a finalizer.
423                 entry.recycle();
424             }
425 
426             chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
427         }
428 
429         static final class Entry<T> {
430             final EnhancedHandle<Entry<?>> recyclerHandle;
431             PoolChunk<T> chunk;
432             ByteBuffer nioBuffer;
433             long handle = -1;
434             int normCapacity;
435 
436             Entry(Handle<Entry<?>> recyclerHandle) {
437                 this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;
438             }
439 
440             void recycle() {
441                 chunk = null;
442                 nioBuffer = null;
443                 handle = -1;
444                 recyclerHandle.recycle(this);
445             }
446 
447             void unguardedRecycle() {
448                 chunk = null;
449                 nioBuffer = null;
450                 handle = -1;
451                 recyclerHandle.unguardedRecycle(this);
452             }
453         }
454 
455         @SuppressWarnings("rawtypes")
456         private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
457             Entry entry = RECYCLER.get();
458             entry.chunk = chunk;
459             entry.nioBuffer = nioBuffer;
460             entry.handle = handle;
461             entry.normCapacity = normCapacity;
462             return entry;
463         }
464 
465         @SuppressWarnings("rawtypes")
466         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
467             @SuppressWarnings("unchecked")
468             @Override
469             public Entry newObject(Handle<Entry> handle) {
470                 return new Entry(handle);
471             }
472         });
473     }
474 
475     private static final class FreeOnFinalize {
476 
477         private volatile PoolThreadCache cache;
478 
479         private FreeOnFinalize(PoolThreadCache cache) {
480             this.cache = cache;
481         }
482 
483         /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
484         @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
485         @Override
486         protected void finalize() throws Throwable {
487             try {
488                 super.finalize();
489             } finally {
490                 PoolThreadCache cache = this.cache;
491                 // this can race with a non-finalizer thread calling free: regardless who wins, the cache will be
492                 // null out
493                 this.cache = null;
494                 if (cache != null) {
495                     cache.free(true);
496                 }
497             }
498         }
499     }
500 }