View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  
20  import io.netty.util.ThreadDeathWatcher;
21  import io.netty.util.internal.logging.InternalLogger;
22  import io.netty.util.internal.logging.InternalLoggerFactory;
23  
24  import java.nio.ByteBuffer;
25  
26  /**
27   * Acts as a thread-local cache for allocations. This implementation is modelled after
28   * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the
29   * techniques described in <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/
30   * 480222803919">Scalable memory allocation using jemalloc</a>.
31   */
32  final class PoolThreadCache {
33  
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
35  
36      final PoolArena<byte[]> heapArena;
37      final PoolArena<ByteBuffer> directArena;
38  
39      // Hold the caches for the different size classes, which are tiny, small and normal.
40      private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
41      private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
42      private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
43      private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
44      private final MemoryRegionCache<byte[]>[] normalHeapCaches;
45      private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
46  
47      // Used for bitshifting when calculate the index of normal caches later
48      private final int numShiftsNormalDirect;
49      private final int numShiftsNormalHeap;
50      private final int freeSweepAllocationThreshold;
51  
52      private int allocations;
53  
54      private final Thread thread = Thread.currentThread();
55      private final Runnable freeTask = new Runnable() {
56          @Override
57          public void run() {
58              free0();
59          }
60      };
61  
62      // TODO: Test if adding padding helps under contention
63      //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
64  
65      PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
66                      int tinyCacheSize, int smallCacheSize, int normalCacheSize,
67                      int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
68          if (maxCachedBufferCapacity < 0) {
69              throw new IllegalArgumentException("maxCachedBufferCapacity: "
70                      + maxCachedBufferCapacity + " (expected: >= 0)");
71          }
72          if (freeSweepAllocationThreshold < 1) {
73              throw new IllegalArgumentException("freeSweepAllocationThreshold: "
74                      + maxCachedBufferCapacity + " (expected: > 0)");
75          }
76          this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
77          this.heapArena = heapArena;
78          this.directArena = directArena;
79          if (directArena != null) {
80              tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
81              smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools);
82  
83              numShiftsNormalDirect = log2(directArena.pageSize);
84              normalDirectCaches = createNormalCaches(
85                      normalCacheSize, maxCachedBufferCapacity, directArena);
86          } else {
87              // No directArea is configured so just null out all caches
88              tinySubPageDirectCaches = null;
89              smallSubPageDirectCaches = null;
90              normalDirectCaches = null;
91              numShiftsNormalDirect = -1;
92          }
93          if (heapArena != null) {
94              // Create the caches for the heap allocations
95              tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
96              smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);
97  
98              numShiftsNormalHeap = log2(heapArena.pageSize);
99              normalHeapCaches = createNormalCaches(
100                     normalCacheSize, maxCachedBufferCapacity, heapArena);
101         } else {
102             // No heapArea is configured so just null out all caches
103             tinySubPageHeapCaches = null;
104             smallSubPageHeapCaches = null;
105             normalHeapCaches = null;
106             numShiftsNormalHeap = -1;
107         }
108 
109         // The thread-local cache will keep a list of pooled buffers which must be returned to
110         // the pool when the thread is not alive anymore.
111         ThreadDeathWatcher.watch(thread, freeTask);
112     }
113 
114     private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {
115         if (cacheSize > 0) {
116             @SuppressWarnings("unchecked")
117             SubPageMemoryRegionCache<T>[] cache = new SubPageMemoryRegionCache[numCaches];
118             for (int i = 0; i < cache.length; i++) {
119                 // TODO: maybe use cacheSize / cache.length
120                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
121             }
122             return cache;
123         } else {
124             return null;
125         }
126     }
127 
128     private static <T> NormalMemoryRegionCache<T>[] createNormalCaches(
129             int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
130         if (cacheSize > 0) {
131             int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
132             int arraySize = Math.max(1, max / area.pageSize);
133 
134             @SuppressWarnings("unchecked")
135             NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize];
136             for (int i = 0; i < cache.length; i++) {
137                 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
138             }
139             return cache;
140         } else {
141             return null;
142         }
143     }
144 
145     private static int log2(int val) {
146         int res = 0;
147         while (val > 1) {
148             val >>= 1;
149             res++;
150         }
151         return res;
152     }
153 
154     /**
155      * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
156      */
157     boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
158         return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
159     }
160 
161     /**
162      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
163      */
164     boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
165         return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
166     }
167 
168     /**
169      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
170      */
171     boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
172         return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
173     }
174 
175     @SuppressWarnings({ "unchecked", "rawtypes" })
176     private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
177         if (cache == null) {
178             // no cache found so just return false here
179             return false;
180         }
181         boolean allocated = cache.allocate(buf, reqCapacity);
182         if (++ allocations >= freeSweepAllocationThreshold) {
183             allocations = 0;
184             trim();
185         }
186         return allocated;
187     }
188 
189     /**
190      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
191      * Returns {@code true} if it fit into the cache {@code false} otherwise.
192      */
193     @SuppressWarnings({ "unchecked", "rawtypes" })
194     boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity) {
195         MemoryRegionCache<?> cache;
196         if (area.isTinyOrSmall(normCapacity)) {
197             if (PoolArena.isTiny(normCapacity)) {
198                 cache = cacheForTiny(area, normCapacity);
199             } else {
200                 cache = cacheForSmall(area, normCapacity);
201             }
202         } else {
203             cache = cacheForNormal(area, normCapacity);
204         }
205         if (cache == null) {
206             return false;
207         }
208         return cache.add(chunk, handle);
209     }
210 
211     /**
212      *  Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
213      */
214     void free() {
215         ThreadDeathWatcher.unwatch(thread, freeTask);
216         free0();
217     }
218 
219     private void free0() {
220         int numFreed = free(tinySubPageDirectCaches) +
221                 free(smallSubPageDirectCaches) +
222                 free(normalDirectCaches) +
223                 free(tinySubPageHeapCaches) +
224                 free(smallSubPageHeapCaches) +
225                 free(normalHeapCaches);
226 
227         if (numFreed > 0 && logger.isDebugEnabled()) {
228             logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName());
229         }
230     }
231 
232     private static int free(MemoryRegionCache<?>[] caches) {
233         if (caches == null) {
234             return 0;
235         }
236 
237         int numFreed = 0;
238         for (MemoryRegionCache<?> c: caches) {
239             numFreed += free(c);
240         }
241         return numFreed;
242     }
243 
244     private static int free(MemoryRegionCache<?> cache) {
245         if (cache == null) {
246             return 0;
247         }
248         return cache.free();
249     }
250 
251     void trim() {
252         trim(tinySubPageDirectCaches);
253         trim(smallSubPageDirectCaches);
254         trim(normalDirectCaches);
255         trim(tinySubPageHeapCaches);
256         trim(smallSubPageHeapCaches);
257         trim(normalHeapCaches);
258     }
259 
260     private static void trim(MemoryRegionCache<?>[] caches) {
261         if (caches == null) {
262             return;
263         }
264         for (MemoryRegionCache<?> c: caches) {
265             trim(c);
266         }
267     }
268 
269     private static void trim(MemoryRegionCache<?> cache) {
270         if (cache == null) {
271             return;
272         }
273         cache.trim();
274     }
275 
276     private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
277         int idx = PoolArena.tinyIdx(normCapacity);
278         if (area.isDirect()) {
279             return cache(tinySubPageDirectCaches, idx);
280         }
281         return cache(tinySubPageHeapCaches, idx);
282     }
283 
284     private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
285         int idx = PoolArena.smallIdx(normCapacity);
286         if (area.isDirect()) {
287             return cache(smallSubPageDirectCaches, idx);
288         }
289         return cache(smallSubPageHeapCaches, idx);
290     }
291 
292     private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
293         if (area.isDirect()) {
294             int idx = log2(normCapacity >> numShiftsNormalDirect);
295             return cache(normalDirectCaches, idx);
296         }
297         int idx = log2(normCapacity >> numShiftsNormalHeap);
298         return cache(normalHeapCaches, idx);
299     }
300 
301     private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
302         if (cache == null || idx > cache.length - 1) {
303             return null;
304         }
305         return cache[idx];
306     }
307 
308     /**
309      * Cache used for buffers which are backed by TINY or SMALL size.
310      */
311     private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
312         SubPageMemoryRegionCache(int size) {
313             super(size);
314         }
315 
316         @Override
317         protected void initBuf(
318                 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
319             chunk.initBufWithSubpage(buf, handle, reqCapacity);
320         }
321     }
322 
323     /**
324      * Cache used for buffers which are backed by NORMAL size.
325      */
326     private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
327         NormalMemoryRegionCache(int size) {
328             super(size);
329         }
330 
331         @Override
332         protected void initBuf(
333                 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
334             chunk.initBuf(buf, handle, reqCapacity);
335         }
336     }
337 
338     /**
339      * Cache of {@link PoolChunk} and handles which can be used to allocate a buffer without locking at all.
340      */
341     private abstract static class MemoryRegionCache<T> {
342         private final Entry<T>[] entries;
343         private final int maxUnusedCached;
344         private int head;
345         private int tail;
346         private int maxEntriesInUse;
347         private int entriesInUse;
348 
349         @SuppressWarnings("unchecked")
350         MemoryRegionCache(int size) {
351             entries = new Entry[powerOfTwo(size)];
352             for (int i = 0; i < entries.length; i++) {
353                 entries[i] = new Entry<T>();
354             }
355             maxUnusedCached = size / 2;
356         }
357 
358         private static int powerOfTwo(int res) {
359             if (res <= 2) {
360                 return 2;
361             }
362             res--;
363             res |= res >> 1;
364             res |= res >> 2;
365             res |= res >> 4;
366             res |= res >> 8;
367             res |= res >> 16;
368             res++;
369             return res;
370         }
371 
372         /**
373          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
374          */
375         protected abstract void initBuf(PoolChunk<T> chunk, long handle,
376                                         PooledByteBuf<T> buf, int reqCapacity);
377 
378         /**
379          * Add to cache if not already full.
380          */
381         public boolean add(PoolChunk<T> chunk, long handle) {
382             Entry<T> entry = entries[tail];
383             if (entry.chunk != null) {
384                 // cache is full
385                 return false;
386             }
387             entriesInUse --;
388 
389             entry.chunk = chunk;
390             entry.handle = handle;
391             tail = nextIdx(tail);
392             return true;
393         }
394 
395         /**
396          * Allocate something out of the cache if possible and remove the entry from the cache.
397          */
398         public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
399             Entry<T> entry = entries[head];
400             if (entry.chunk == null) {
401                 return false;
402             }
403 
404             entriesInUse ++;
405             if (maxEntriesInUse < entriesInUse) {
406                 maxEntriesInUse = entriesInUse;
407             }
408             initBuf(entry.chunk, entry.handle, buf, reqCapacity);
409             // only null out the chunk as we only use the chunk to check if the buffer is full or not.
410             entry.chunk = null;
411             head = nextIdx(head);
412             return true;
413         }
414 
415         /**
416          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
417          */
418         public int free() {
419             int numFreed = 0;
420             entriesInUse = 0;
421             maxEntriesInUse = 0;
422             for (int i = head;; i = nextIdx(i)) {
423                 if (freeEntry(entries[i])) {
424                     numFreed++;
425                 } else {
426                     // all cleared
427                     return numFreed;
428                 }
429             }
430         }
431 
432         /**
433          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
434          */
435         private void trim() {
436             int free = size() - maxEntriesInUse;
437             entriesInUse = 0;
438             maxEntriesInUse = 0;
439 
440             if (free <= maxUnusedCached) {
441                 return;
442             }
443 
444             int i = head;
445             for (; free > 0; free--) {
446                 if (!freeEntry(entries[i])) {
447                     // all freed
448                     break;
449                 }
450                 i = nextIdx(i);
451             }
452 
453             // Update head to point to te correct entry
454             // See https://github.com/netty/netty/issues/2924
455             head = i;
456         }
457 
458         @SuppressWarnings({ "unchecked", "rawtypes" })
459         private static boolean freeEntry(Entry entry) {
460             PoolChunk chunk = entry.chunk;
461             if (chunk == null) {
462                 return false;
463             }
464             // need to synchronize on the area from which it was allocated before.
465             synchronized (chunk.arena) {
466                 chunk.parent.free(chunk, entry.handle);
467             }
468             entry.chunk = null;
469             return true;
470         }
471 
472         /**
473          * Return the number of cached entries.
474          */
475         private int size()  {
476             return tail - head & entries.length - 1;
477         }
478 
479         private int nextIdx(int index) {
480             // use bitwise operation as this is faster as using modulo.
481             return index + 1 & entries.length - 1;
482         }
483 
484         private static final class Entry<T> {
485             PoolChunk<T> chunk;
486             long handle;
487         }
488     }
489 }