View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  
20  import io.netty.buffer.PoolArena.SizeClass;
21  import io.netty.util.Recycler;
22  import io.netty.util.Recycler.Handle;
23  import io.netty.util.internal.MathUtil;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.nio.ByteBuffer;
29  import java.util.Queue;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  
32  /**
33   * Acts as a thread cache for allocations. This implementation is modelled after
34   * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the techniques
35   * described in
36   * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
37   * Scalable memory allocation using jemalloc</a>.
38   */
39  final class PoolThreadCache {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
42  
43      final PoolArena<byte[]> heapArena;
44      final PoolArena<ByteBuffer> directArena;
45  
46      // Hold the caches for the different size classes, which are tiny, small and normal.
47      private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
48      private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
49      private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
50      private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
51      private final MemoryRegionCache<byte[]>[] normalHeapCaches;
52      private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
53  
54      // Used for bit-shifting when calculating the index of the normal caches later
55      private final int numShiftsNormalDirect;
56      private final int numShiftsNormalHeap;
57      private final int freeSweepAllocationThreshold;
58      private final AtomicBoolean freed = new AtomicBoolean();
59  
60      private int allocations;
61  
62      // TODO: Test if adding padding helps under contention
63      //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
64  
65      PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
66                      int tinyCacheSize, int smallCacheSize, int normalCacheSize,
67                      int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
68          if (maxCachedBufferCapacity < 0) {
69              throw new IllegalArgumentException("maxCachedBufferCapacity: "
70                      + maxCachedBufferCapacity + " (expected: >= 0)");
71          }
72          this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
73          this.heapArena = heapArena;
74          this.directArena = directArena;
75          if (directArena != null) {
76              tinySubPageDirectCaches = createSubPageCaches(
77                      tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
78              smallSubPageDirectCaches = createSubPageCaches(
79                      smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
80  
81              numShiftsNormalDirect = log2(directArena.pageSize);
82              normalDirectCaches = createNormalCaches(
83                      normalCacheSize, maxCachedBufferCapacity, directArena);
84  
85              directArena.numThreadCaches.getAndIncrement();
86          } else {
87              // No directArea is configured so just null out all caches
88              tinySubPageDirectCaches = null;
89              smallSubPageDirectCaches = null;
90              normalDirectCaches = null;
91              numShiftsNormalDirect = -1;
92          }
93          if (heapArena != null) {
94              // Create the caches for the heap allocations
95              tinySubPageHeapCaches = createSubPageCaches(
96                      tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
97              smallSubPageHeapCaches = createSubPageCaches(
98                      smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
99  
100             numShiftsNormalHeap = log2(heapArena.pageSize);
101             normalHeapCaches = createNormalCaches(
102                     normalCacheSize, maxCachedBufferCapacity, heapArena);
103 
104             heapArena.numThreadCaches.getAndIncrement();
105         } else {
106             // No heapArea is configured so just null out all caches
107             tinySubPageHeapCaches = null;
108             smallSubPageHeapCaches = null;
109             normalHeapCaches = null;
110             numShiftsNormalHeap = -1;
111         }
112 
113         // Only check if there are caches in use.
114         if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
115                 || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
116                 && freeSweepAllocationThreshold < 1) {
117             throw new IllegalArgumentException("freeSweepAllocationThreshold: "
118                     + freeSweepAllocationThreshold + " (expected: > 0)");
119         }
120     }
121 
122     private static <T> MemoryRegionCache<T>[] createSubPageCaches(
123             int cacheSize, int numCaches, SizeClass sizeClass) {
124         if (cacheSize > 0 && numCaches > 0) {
125             @SuppressWarnings("unchecked")
126             MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
127             for (int i = 0; i < cache.length; i++) {
128                 // TODO: maybe use cacheSize / cache.length
129                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
130             }
131             return cache;
132         } else {
133             return null;
134         }
135     }
136 
137     private static <T> MemoryRegionCache<T>[] createNormalCaches(
138             int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
139         if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
140             int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
141             int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
142 
143             @SuppressWarnings("unchecked")
144             MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
145             for (int i = 0; i < cache.length; i++) {
146                 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
147             }
148             return cache;
149         } else {
150             return null;
151         }
152     }
153 
154     private static int log2(int val) {
155         int res = 0;
156         while (val > 1) {
157             val >>= 1;
158             res++;
159         }
160         return res;
161     }
162 
163     /**
164      * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
165      */
166     boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
167         return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
168     }
169 
170     /**
171      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
172      */
173     boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
174         return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
175     }
176 
177     /**
178      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
179      */
180     boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
181         return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
182     }
183 
184     @SuppressWarnings({ "unchecked", "rawtypes" })
185     private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
186         if (cache == null) {
187             // no cache found so just return false here
188             return false;
189         }
190         boolean allocated = cache.allocate(buf, reqCapacity);
191         if (++ allocations >= freeSweepAllocationThreshold) {
192             allocations = 0;
193             trim();
194         }
195         return allocated;
196     }
197 
198     /**
199      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
200      * Returns {@code true} if it fit into the cache {@code false} otherwise.
201      */
202     @SuppressWarnings({ "unchecked", "rawtypes" })
203     boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
204         MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
205         if (cache == null) {
206             return false;
207         }
208         return cache.add(chunk, handle);
209     }
210 
211     private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
212         switch (sizeClass) {
213         case Normal:
214             return cacheForNormal(area, normCapacity);
215         case Small:
216             return cacheForSmall(area, normCapacity);
217         case Tiny:
218             return cacheForTiny(area, normCapacity);
219         default:
220             throw new Error();
221         }
222     }
223 
224     /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
225     @Override
226     protected void finalize() throws Throwable {
227         try {
228             super.finalize();
229         } finally {
230             free();
231         }
232     }
233 
234     /**
235      *  Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
236      */
237     void free() {
238         // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
239         // we only call this one time.
240         if (freed.compareAndSet(false, true)) {
241             int numFreed = free(tinySubPageDirectCaches) +
242                     free(smallSubPageDirectCaches) +
243                     free(normalDirectCaches) +
244                     free(tinySubPageHeapCaches) +
245                     free(smallSubPageHeapCaches) +
246                     free(normalHeapCaches);
247 
248             if (numFreed > 0 && logger.isDebugEnabled()) {
249                 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
250                         Thread.currentThread().getName());
251             }
252 
253             if (directArena != null) {
254                 directArena.numThreadCaches.getAndDecrement();
255             }
256 
257             if (heapArena != null) {
258                 heapArena.numThreadCaches.getAndDecrement();
259             }
260         }
261     }
262 
263     private static int free(MemoryRegionCache<?>[] caches) {
264         if (caches == null) {
265             return 0;
266         }
267 
268         int numFreed = 0;
269         for (MemoryRegionCache<?> c: caches) {
270             numFreed += free(c);
271         }
272         return numFreed;
273     }
274 
275     private static int free(MemoryRegionCache<?> cache) {
276         if (cache == null) {
277             return 0;
278         }
279         return cache.free();
280     }
281 
282     void trim() {
283         trim(tinySubPageDirectCaches);
284         trim(smallSubPageDirectCaches);
285         trim(normalDirectCaches);
286         trim(tinySubPageHeapCaches);
287         trim(smallSubPageHeapCaches);
288         trim(normalHeapCaches);
289     }
290 
291     private static void trim(MemoryRegionCache<?>[] caches) {
292         if (caches == null) {
293             return;
294         }
295         for (MemoryRegionCache<?> c: caches) {
296             trim(c);
297         }
298     }
299 
300     private static void trim(MemoryRegionCache<?> cache) {
301         if (cache == null) {
302             return;
303         }
304         cache.trim();
305     }
306 
307     private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
308         int idx = PoolArena.tinyIdx(normCapacity);
309         if (area.isDirect()) {
310             return cache(tinySubPageDirectCaches, idx);
311         }
312         return cache(tinySubPageHeapCaches, idx);
313     }
314 
315     private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
316         int idx = PoolArena.smallIdx(normCapacity);
317         if (area.isDirect()) {
318             return cache(smallSubPageDirectCaches, idx);
319         }
320         return cache(smallSubPageHeapCaches, idx);
321     }
322 
323     private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
324         if (area.isDirect()) {
325             int idx = log2(normCapacity >> numShiftsNormalDirect);
326             return cache(normalDirectCaches, idx);
327         }
328         int idx = log2(normCapacity >> numShiftsNormalHeap);
329         return cache(normalHeapCaches, idx);
330     }
331 
332     private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
333         if (cache == null || idx > cache.length - 1) {
334             return null;
335         }
336         return cache[idx];
337     }
338 
339     /**
340      * Cache used for buffers which are backed by TINY or SMALL size.
341      */
342     private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
343         SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
344             super(size, sizeClass);
345         }
346 
347         @Override
348         protected void initBuf(
349                 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
350             chunk.initBufWithSubpage(buf, handle, reqCapacity);
351         }
352     }
353 
354     /**
355      * Cache used for buffers which are backed by NORMAL size.
356      */
357     private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
358         NormalMemoryRegionCache(int size) {
359             super(size, SizeClass.Normal);
360         }
361 
362         @Override
363         protected void initBuf(
364                 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
365             chunk.initBuf(buf, handle, reqCapacity);
366         }
367     }
368 
369     private abstract static class MemoryRegionCache<T> {
370         private final int size;
371         private final Queue<Entry<T>> queue;
372         private final SizeClass sizeClass;
373         private int allocations;
374 
375         MemoryRegionCache(int size, SizeClass sizeClass) {
376             this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
377             queue = PlatformDependent.newFixedMpscQueue(this.size);
378             this.sizeClass = sizeClass;
379         }
380 
381         /**
382          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
383          */
384         protected abstract void initBuf(PoolChunk<T> chunk, long handle,
385                                         PooledByteBuf<T> buf, int reqCapacity);
386 
387         /**
388          * Add to cache if not already full.
389          */
390         @SuppressWarnings("unchecked")
391         public final boolean add(PoolChunk<T> chunk, long handle) {
392             Entry<T> entry = newEntry(chunk, handle);
393             boolean queued = queue.offer(entry);
394             if (!queued) {
395                 // If it was not possible to cache the chunk, immediately recycle the entry
396                 entry.recycle();
397             }
398 
399             return queued;
400         }
401 
402         /**
403          * Allocate something out of the cache if possible and remove the entry from the cache.
404          */
405         public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
406             Entry<T> entry = queue.poll();
407             if (entry == null) {
408                 return false;
409             }
410             initBuf(entry.chunk, entry.handle, buf, reqCapacity);
411             entry.recycle();
412 
413             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
414             ++ allocations;
415             return true;
416         }
417 
418         /**
419          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
420          */
421         public final int free() {
422             return free(Integer.MAX_VALUE);
423         }
424 
425         private int free(int max) {
426             int numFreed = 0;
427             for (; numFreed < max; numFreed++) {
428                 Entry<T> entry = queue.poll();
429                 if (entry != null) {
430                     freeEntry(entry);
431                 } else {
432                     // all cleared
433                     return numFreed;
434                 }
435             }
436             return numFreed;
437         }
438 
439         /**
440          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
441          */
442         public final void trim() {
443             int free = size - allocations;
444             allocations = 0;
445 
446             // We not even allocated all the number that are
447             if (free > 0) {
448                 free(free);
449             }
450         }
451 
452         @SuppressWarnings({ "unchecked", "rawtypes" })
453         private  void freeEntry(Entry entry) {
454             PoolChunk chunk = entry.chunk;
455             long handle = entry.handle;
456 
457             // recycle now so PoolChunk can be GC'ed.
458             entry.recycle();
459 
460             chunk.arena.freeChunk(chunk, handle, sizeClass);
461         }
462 
463         static final class Entry<T> {
464             final Handle<Entry<?>> recyclerHandle;
465             PoolChunk<T> chunk;
466             long handle = -1;
467 
468             Entry(Handle<Entry<?>> recyclerHandle) {
469                 this.recyclerHandle = recyclerHandle;
470             }
471 
472             void recycle() {
473                 chunk = null;
474                 handle = -1;
475                 recyclerHandle.recycle(this);
476             }
477         }
478 
479         @SuppressWarnings("rawtypes")
480         private static Entry newEntry(PoolChunk<?> chunk, long handle) {
481             Entry entry = RECYCLER.get();
482             entry.chunk = chunk;
483             entry.handle = handle;
484             return entry;
485         }
486 
487         @SuppressWarnings("rawtypes")
488         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
489             @SuppressWarnings("unchecked")
490             @Override
491             protected Entry newObject(Handle<Entry> handle) {
492                 return new Entry(handle);
493             }
494         };
495     }
496 }