/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.buffer;


import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;

import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.ObjectPool.Handle;
import io.netty.util.internal.ObjectPool.ObjectCreator;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Acts as a thread cache for allocations. This implementation is modeled after
 * <a href="https://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the techniques
 * described in
 * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
 * Scalable memory allocation using jemalloc</a>.
 */
final class PoolThreadCache {
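    // Orientation note (based on usage elsewhere in io.netty.buffer, not enforced by this class): typically
    // PooledByteBufAllocator keeps one PoolThreadCache per thread via a FastThreadLocal; allocations consult this
    // cache first and fall back to the PoolArena on a miss, and free(boolean) is triggered from
    // FastThreadLocal.onRemoval(...) or, as a last resort, the finalizer below.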

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
    private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;

    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // Hold the caches for the different size classes, which are small and normal.
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

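    // After this many successful cache allocations, trim() is run to free cached entries that were not reused
    // since the last sweep (see allocate(...) and MemoryRegionCache.trim()).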
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
                    int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools);

            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArena is configured so just null out all caches
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools);

            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArena is configured so just null out all caches
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
        }

        // Only check if there are caches in use.
        if ((smallSubPageDirectCaches != null || normalDirectCaches != null
                || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches) {
        if (cacheSize > 0 && numCaches > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            // Create as many normal caches as we support based on how many sizeIdx we have and what the upper
            // bound is that we want to cache in general.
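            // Illustrative note: only size indices above the small subpage sizes whose size does not exceed
            // 'max' get a cache; larger normal allocations find no cache (cacheForNormal(...) returns null for
            // them) and go straight to the arena.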
            List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>();
            for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max; idx++) {
                cache.add(new NormalMemoryRegionCache<T>(cacheSize));
            }
            return cache.toArray(new MemoryRegionCache[0]);
        } else {
            return null;
        }
    }

    // val > 0
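    // Illustrative: returns the floor of the base-2 logarithm, e.g. log2(1) == 0, log2(8192) == 13,
    // log2(8193) == 13.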
    static int log2(int val) {
        return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
    }

    /**
     * Try to allocate a small buffer out of the cache. Returns {@code true} if successful, {@code false} otherwise.
     */
    boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
        return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
    }

    /**
     * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful, {@code false} otherwise.
     */
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
        return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        boolean allocated = cache.allocate(buf, reqCapacity, this);
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            trim();
        }
        return allocated;
    }

    /**
     * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
     * Returns {@code true} if it fit into the cache, {@code false} otherwise.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
                long handle, int normCapacity, SizeClass sizeClass) {
        int sizeIdx = area.size2SizeIdx(normCapacity);
        MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
        if (cache == null) {
            return false;
        }
        return cache.add(chunk, nioBuffer, handle, normCapacity);
    }

    private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
        switch (sizeClass) {
        case Normal:
            return cacheForNormal(area, sizeIdx);
        case Small:
            return cacheForSmall(area, sizeIdx);
        default:
            throw new Error();
        }
    }

    /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            free(true);
        }
    }

    /**
     * Should be called when the Thread that uses this cache is about to exit, to release the resources held by
     * the cache.
     */
    void free(boolean finalizer) {
        // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
        // we only call this one time.
        if (freed.compareAndSet(false, true)) {
            int numFreed = free(smallSubPageDirectCaches, finalizer) +
                    free(normalDirectCaches, finalizer) +
                    free(smallSubPageHeapCaches, finalizer) +
                    free(normalHeapCaches, finalizer);

            if (numFreed > 0 && logger.isDebugEnabled()) {
                logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
                        Thread.currentThread().getName());
            }

            if (directArena != null) {
                directArena.numThreadCaches.getAndDecrement();
            }

            if (heapArena != null) {
                heapArena.numThreadCaches.getAndDecrement();
            }
        }
    }

    private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
        if (caches == null) {
            return 0;
        }

        int numFreed = 0;
        for (MemoryRegionCache<?> c: caches) {
            numFreed += free(c, finalizer);
        }
        return numFreed;
    }

    private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
        if (cache == null) {
            return 0;
        }
        return cache.free(finalizer);
    }

    void trim() {
        trim(smallSubPageDirectCaches);
        trim(normalDirectCaches);
        trim(smallSubPageHeapCaches);
        trim(normalHeapCaches);
    }

    private static void trim(MemoryRegionCache<?>[] caches) {
        if (caches == null) {
            return;
        }
        for (MemoryRegionCache<?> c: caches) {
            trim(c);
        }
    }

    private static void trim(MemoryRegionCache<?> cache) {
        if (cache == null) {
            return;
        }
        cache.trim();
    }

    private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
        if (area.isDirect()) {
            return cache(smallSubPageDirectCaches, sizeIdx);
        }
        return cache(smallSubPageHeapCaches, sizeIdx);
    }

    private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
        // We need to subtract area.numSmallSubpagePools as sizeIdx is the overall index for all sizes.
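        // Illustrative example (hypothetical values): if numSmallSubpagePools is 39, the overall sizeIdx 39 maps
        // to normal cache index 0, sizeIdx 40 to index 1, and so on.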
        int idx = sizeIdx - area.numSmallSubpagePools;
        if (area.isDirect()) {
            return cache(normalDirectCaches, idx);
        }
        return cache(normalHeapCaches, idx);
    }

    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
        if (cache == null || sizeIdx > cache.length - 1) {
            return null;
        }
        return cache[sizeIdx];
    }

    /**
     * Cache used for buffers which are backed by SMALL size.
     */
    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size) {
            super(size, SizeClass.Small);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
                PoolThreadCache threadCache) {
            chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
        }
    }

    /**
     * Cache used for buffers which are backed by NORMAL size.
     */
    private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
        NormalMemoryRegionCache(int size) {
            super(size, SizeClass.Normal);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
                PoolThreadCache threadCache) {
            chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
        }
    }

    private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
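            // Note (based on how released buffers reach this cache, an assumption about the surrounding
            // allocator): entries may be added from a thread other than the cache owner when a buffer is
            // released elsewhere, while polling only happens on the owning thread, hence a fixed-size MPSC queue.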
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }

        /**
         * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
         */
        protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
                                        PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);

        /**
         * Add to cache if not already full.
         */
        @SuppressWarnings("unchecked")
        public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
            Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
            boolean queued = queue.offer(entry);
            if (!queued) {
                // If it was not possible to cache the chunk, immediately recycle the entry
                entry.recycle();
            }

            return queued;
        }

        /**
         * Allocate something out of the cache if possible and remove the entry from the cache.
         */
        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
            Entry<T> entry = queue.poll();
            if (entry == null) {
                return false;
            }
            initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
            entry.recycle();

            // allocations is not thread-safe which is fine as this is only called from the same thread all the time.
            ++ allocations;
            return true;
        }

        /**
         * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
         */
        public final int free(boolean finalizer) {
            return free(Integer.MAX_VALUE, finalizer);
        }

        private int free(int max, boolean finalizer) {
            int numFreed = 0;
            for (; numFreed < max; numFreed++) {
                Entry<T> entry = queue.poll();
                if (entry != null) {
                    freeEntry(entry, finalizer);
                } else {
                    // all cleared
                    return numFreed;
                }
            }
            return numFreed;
        }

        /**
         * Free up cached {@link PoolChunk}s if not allocated frequently enough.
         */
        public final void trim() {
            int free = size - allocations;
            allocations = 0;

            // We did not allocate the full cached capacity since the last trim, so free the entries that were
            // not reused.
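            // Illustrative example: with size == 64 and 20 allocations since the last trim, up to 44 cached
            // entries are freed back to the arena.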
            if (free > 0) {
                free(free, false);
            }
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        private void freeEntry(Entry entry, boolean finalizer) {
            // Capture entry state before we recycle the entry object.
            PoolChunk chunk = entry.chunk;
            long handle = entry.handle;
            ByteBuffer nioBuffer = entry.nioBuffer;
            int normCapacity = entry.normCapacity;

            if (!finalizer) {
                // Recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
                // a finalizer.
                entry.recycle();
            }

            chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
        }

        static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            ByteBuffer nioBuffer;
            long handle = -1;
            int normCapacity;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {
                chunk = null;
                nioBuffer = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
        }

        @SuppressWarnings("rawtypes")
        private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
            Entry entry = RECYCLER.get();
            entry.chunk = chunk;
            entry.nioBuffer = nioBuffer;
            entry.handle = handle;
            entry.normCapacity = normCapacity;
            return entry;
        }

        @SuppressWarnings("rawtypes")
        private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
            @SuppressWarnings("unchecked")
            @Override
            public Entry newObject(Handle<Entry> handle) {
                return new Entry(handle);
            }
        });
    }
}