View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  
20  import io.netty.buffer.PoolArena.SizeClass;
21  import io.netty.util.Recycler;
22  import io.netty.util.Recycler.Handle;
23  import io.netty.util.ThreadDeathWatcher;
24  import io.netty.util.internal.MathUtil;
25  import io.netty.util.internal.PlatformDependent;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.nio.ByteBuffer;
30  import java.util.Queue;
31  
/**
 * Acts as a thread-local cache for allocations. This implementation is modeled after
 * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the described
 * techniques of
 * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
 * Scalable memory allocation using jemalloc</a>.
 */
39  final class PoolThreadCache {
40  
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    // Arenas this cache allocates from. Either may be null when that kind of
    // pooling (heap or direct) is disabled for this thread.
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // Hold the caches for the different size classes, which are tiny, small and normal.
    // An array is null when that cache type is disabled (see the constructor).
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bit shifting when calculating the index of the normal caches later.
    // Holds log2(pageSize) of the corresponding arena, or -1 when that arena is absent.
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    // After this many allocations through this cache, trim() is invoked to drop stale entries.
    private final int freeSweepAllocationThreshold;

    // The thread registered with ThreadDeathWatcher and the task that releases this
    // cache when that thread dies; both null when the watcher is not used.
    private final Thread deathWatchThread;
    private final Runnable freeTask;

    // Allocation counter since the last sweep; presumably only mutated by the owning
    // thread (no synchronization) — see the note in MemoryRegionCache.allocate().
    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
66  
    /**
     * Creates a new per-thread cache backed by the given arenas.
     *
     * @param heapArena                    arena for heap buffers, or {@code null} to disable heap caching
     * @param directArena                  arena for direct buffers, or {@code null} to disable direct caching
     * @param tinyCacheSize                capacity of each tiny sub-page cache; {@code 0} disables tiny caches
     * @param smallCacheSize               capacity of each small sub-page cache; {@code 0} disables small caches
     * @param normalCacheSize              capacity of each normal cache; {@code 0} disables normal caches
     * @param maxCachedBufferCapacity      largest buffer size worth caching; must be {@code >= 0}
     * @param freeSweepAllocationThreshold allocations between trim sweeps; must be {@code > 0}
     *                                     when any cache is enabled
     * @param useThreadDeathWatcher        whether to register with {@link ThreadDeathWatcher} so the
     *                                     cache is released when the owning thread dies
     */
    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold,
                    boolean useThreadDeathWatcher) {
        if (maxCachedBufferCapacity < 0) {
            throw new IllegalArgumentException("maxCachedBufferCapacity: "
                    + maxCachedBufferCapacity + " (expected: >= 0)");
        }
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            // log2(pageSize) is used later to map a normalized capacity to a normal-cache index.
            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArena is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArena is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use; a threshold is meaningless when
        // everything is disabled.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }

        if (useThreadDeathWatcher) {

            freeTask = new Runnable() {
                @Override
                public void run() {
                    free0();
                }
            };

            deathWatchThread = Thread.currentThread();

            // The thread-local cache will keep a list of pooled buffers which must be returned to
            // the pool when the thread is not alive anymore.
            ThreadDeathWatcher.watch(deathWatchThread, freeTask);
        } else {
            freeTask = null;
            deathWatchThread = null;
        }
    }
143 
144     private static <T> MemoryRegionCache<T>[] createSubPageCaches(
145             int cacheSize, int numCaches, SizeClass sizeClass) {
146         if (cacheSize > 0 && numCaches > 0) {
147             @SuppressWarnings("unchecked")
148             MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
149             for (int i = 0; i < cache.length; i++) {
150                 // TODO: maybe use cacheSize / cache.length
151                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
152             }
153             return cache;
154         } else {
155             return null;
156         }
157     }
158 
159     private static <T> MemoryRegionCache<T>[] createNormalCaches(
160             int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
161         if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
162             int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
163             int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
164 
165             @SuppressWarnings("unchecked")
166             MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
167             for (int i = 0; i < cache.length; i++) {
168                 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
169             }
170             return cache;
171         } else {
172             return null;
173         }
174     }
175 
176     private static int log2(int val) {
177         int res = 0;
178         while (val > 1) {
179             val >>= 1;
180             res++;
181         }
182         return res;
183     }
184 
185     /**
186      * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
187      */
188     boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
189         return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
190     }
191 
192     /**
193      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
194      */
195     boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
196         return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
197     }
198 
    /**
     * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
    }
205 
206     @SuppressWarnings({ "unchecked", "rawtypes" })
207     private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
208         if (cache == null) {
209             // no cache found so just return false here
210             return false;
211         }
212         boolean allocated = cache.allocate(buf, reqCapacity);
213         if (++ allocations >= freeSweepAllocationThreshold) {
214             allocations = 0;
215             trim();
216         }
217         return allocated;
218     }
219 
220     /**
221      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
222      * Returns {@code true} if it fit into the cache {@code false} otherwise.
223      */
224     @SuppressWarnings({ "unchecked", "rawtypes" })
225     boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
226         MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
227         if (cache == null) {
228             return false;
229         }
230         return cache.add(chunk, handle);
231     }
232 
233     private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
234         switch (sizeClass) {
235         case Normal:
236             return cacheForNormal(area, normCapacity);
237         case Small:
238             return cacheForSmall(area, normCapacity);
239         case Tiny:
240             return cacheForTiny(area, normCapacity);
241         default:
242             throw new Error();
243         }
244     }
245 
    /**
     *  Should be called if the Thread that uses this cache is about to exit to release resources out of the cache
     */
    void free() {
        if (freeTask != null) {
            // Unregister from the ThreadDeathWatcher first; otherwise the watcher
            // would run freeTask again when the watched thread dies.
            assert deathWatchThread != null;
            ThreadDeathWatcher.unwatch(deathWatchThread, freeTask);
        }
        free0();
    }
256 
257     private void free0() {
258         int numFreed = free(tinySubPageDirectCaches) +
259                 free(smallSubPageDirectCaches) +
260                 free(normalDirectCaches) +
261                 free(tinySubPageHeapCaches) +
262                 free(smallSubPageHeapCaches) +
263                 free(normalHeapCaches);
264 
265         if (numFreed > 0 && logger.isDebugEnabled()) {
266             logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, Thread.currentThread().getName());
267         }
268 
269         if (directArena != null) {
270             directArena.numThreadCaches.getAndDecrement();
271         }
272 
273         if (heapArena != null) {
274             heapArena.numThreadCaches.getAndDecrement();
275         }
276     }
277 
278     private static int free(MemoryRegionCache<?>[] caches) {
279         if (caches == null) {
280             return 0;
281         }
282 
283         int numFreed = 0;
284         for (MemoryRegionCache<?> c: caches) {
285             numFreed += free(c);
286         }
287         return numFreed;
288     }
289 
290     private static int free(MemoryRegionCache<?> cache) {
291         if (cache == null) {
292             return 0;
293         }
294         return cache.free();
295     }
296 
    /**
     * Trims all caches, freeing entries that were not allocated from frequently
     * enough since the last sweep (see {@link MemoryRegionCache#trim()}).
     */
    void trim() {
        trim(tinySubPageDirectCaches);
        trim(smallSubPageDirectCaches);
        trim(normalDirectCaches);
        trim(tinySubPageHeapCaches);
        trim(smallSubPageHeapCaches);
        trim(normalHeapCaches);
    }
305 
306     private static void trim(MemoryRegionCache<?>[] caches) {
307         if (caches == null) {
308             return;
309         }
310         for (MemoryRegionCache<?> c: caches) {
311             trim(c);
312         }
313     }
314 
315     private static void trim(MemoryRegionCache<?> cache) {
316         if (cache == null) {
317             return;
318         }
319         cache.trim();
320     }
321 
322     private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
323         int idx = PoolArena.tinyIdx(normCapacity);
324         if (area.isDirect()) {
325             return cache(tinySubPageDirectCaches, idx);
326         }
327         return cache(tinySubPageHeapCaches, idx);
328     }
329 
330     private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
331         int idx = PoolArena.smallIdx(normCapacity);
332         if (area.isDirect()) {
333             return cache(smallSubPageDirectCaches, idx);
334         }
335         return cache(smallSubPageHeapCaches, idx);
336     }
337 
338     private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
339         if (area.isDirect()) {
340             int idx = log2(normCapacity >> numShiftsNormalDirect);
341             return cache(normalDirectCaches, idx);
342         }
343         int idx = log2(normCapacity >> numShiftsNormalHeap);
344         return cache(normalHeapCaches, idx);
345     }
346 
347     private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
348         if (cache == null || idx > cache.length - 1) {
349             return null;
350         }
351         return cache[idx];
352     }
353 
    /**
     * Cache used for buffers which are backed by TINY or SMALL size.
     */
    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            // Sub-page sized buffers share a page, so the subpage-aware init is required.
            chunk.initBufWithSubpage(buf, handle, reqCapacity);
        }
    }
368 
    /**
     * Cache used for buffers which are backed by NORMAL size.
     */
    private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
        NormalMemoryRegionCache(int size) {
            super(size, SizeClass.Normal);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            // Normal sizes occupy whole pages, so the plain chunk init suffices.
            chunk.initBuf(buf, handle, reqCapacity);
        }
    }
383 
    /**
     * A fixed-capacity cache of (chunk, handle) pairs for one size class.
     * Entries are pooled via a {@link Recycler} so that caching a region does not
     * itself allocate. The backing queue is a fixed-size MPSC queue: many threads
     * may add (release buffers back), while only the owner thread polls.
     */
    private abstract static class MemoryRegionCache<T> {
        // Capacity of the queue, rounded up to a power of two.
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        // Allocations since the last trim(); see the note in allocate() about thread-safety.
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }

        /**
         * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
         */
        protected abstract void initBuf(PoolChunk<T> chunk, long handle,
                                        PooledByteBuf<T> buf, int reqCapacity);

        /**
         * Add to cache if not already full.
         */
        @SuppressWarnings("unchecked")
        public final boolean add(PoolChunk<T> chunk, long handle) {
            Entry<T> entry = newEntry(chunk, handle);
            boolean queued = queue.offer(entry);
            if (!queued) {
                // If it was not possible to cache the chunk, immediately recycle the entry
                entry.recycle();
            }

            return queued;
        }

        /**
         * Allocate something out of the cache if possible and remove the entry from the cache.
         */
        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            Entry<T> entry = queue.poll();
            if (entry == null) {
                return false;
            }
            initBuf(entry.chunk, entry.handle, buf, reqCapacity);
            // Return the Entry shell to the recycler; chunk/handle now live in buf.
            entry.recycle();

            // allocations is not thread-safe which is fine as this is only called from the same thread all time.
            ++ allocations;
            return true;
        }

        /**
         * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
         */
        public final int free() {
            return free(Integer.MAX_VALUE);
        }

        // Frees up to max entries; returns how many were actually freed.
        private int free(int max) {
            int numFreed = 0;
            for (; numFreed < max; numFreed++) {
                Entry<T> entry = queue.poll();
                if (entry != null) {
                    freeEntry(entry);
                } else {
                    // all cleared
                    return numFreed;
                }
            }
            return numFreed;
        }

        /**
         * Free up cached {@link PoolChunk}s if not allocated frequently enough.
         */
        public final void trim() {
            int free = size - allocations;
            allocations = 0;

            // Fewer allocations than the cache capacity since the last trim: free the surplus.
            if (free > 0) {
                free(free);
            }
        }

        // Returns the cached region to its arena and recycles the Entry shell.
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private void freeEntry(Entry entry) {
            PoolChunk chunk = entry.chunk;
            long handle = entry.handle;

            // recycle now so PoolChunk can be GC'ed.
            entry.recycle();

            chunk.arena.freeChunk(chunk, handle, sizeClass);
        }

        // Recyclable holder for one cached (chunk, handle) pair.
        static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            long handle = -1;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {
                // Drop the references before returning to the recycler so the
                // chunk is not pinned while the Entry sits in the object pool.
                chunk = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
        }

        @SuppressWarnings("rawtypes")
        private static Entry newEntry(PoolChunk<?> chunk, long handle) {
            Entry entry = RECYCLER.get();
            entry.chunk = chunk;
            entry.handle = handle;
            return entry;
        }

        @SuppressWarnings("rawtypes")
        private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
            @SuppressWarnings("unchecked")
            @Override
            protected Entry newObject(Handle<Entry> handle) {
                return new Entry(handle);
            }
        };
    }
511 }