1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.buffer.api.pool;
17  
18  import io.netty5.buffer.api.pool.PoolArena.SizeClass;
19  import io.netty5.util.internal.MathUtil;
20  import io.netty5.util.internal.ObjectPool;
21  import io.netty5.util.internal.ObjectPool.Handle;
22  import io.netty5.util.internal.PlatformDependent;
23  import io.netty5.util.internal.logging.InternalLogger;
24  import io.netty5.util.internal.logging.InternalLoggerFactory;
25  
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import static io.netty5.buffer.api.pool.PoolArena.SizeClass.Normal;
32  import static io.netty5.buffer.api.pool.PoolArena.SizeClass.Small;
33  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
34  
35  /**
36   * Acts as a thread cache for allocations. This implementation is modelled after
37   * <a href="https://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the described
38   * techniques of
39   * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
40   * Scalable memory allocation using jemalloc</a>.
41   */
42  final class PoolThreadCache {
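        /*
         * Illustrative lifecycle sketch (an assumption about typical use, not code taken from the allocator):
         * a pooled allocator would create one PoolThreadCache per allocating thread and release it when that
         * thread goes away. The sizes below are made-up example values, not defaults defined in this class.
         *
         *   PoolThreadCache cache = new PoolThreadCache(
         *           arena,       // PoolArena chosen for this thread
         *           256,         // smallCacheSize: entries per small sub-page cache
         *           64,          // normalCacheSize: entries per normal cache
         *           32 * 1024,   // maxCachedBufferCapacity: largest normal size worth caching
         *           8192);       // freeSweepAllocationThreshold: trim the caches after this many allocations
         *
         *   // Allocations are first tried against the cache via allocateSmall(...) / allocateNormal(...),
         *   // and freed memory may be handed back through add(...). Once the owning thread exits, free()
         *   // must be called so the cached chunks are returned to the arena.
         */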
43  
44      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
45      private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
46  
47      final AtomicInteger arenaReferenceCounter;
48  
49      private final PoolArena arena;
50      // Hold the caches for the different size classes, which are small and normal.
51      private final MemoryRegionCache[] smallSubPageCaches;
52      private final MemoryRegionCache[] normalCaches;
53  
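        // Number of allocations after which the caches are trimmed; see allocate(MemoryRegionCache, int).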
54      private final int freeSweepAllocationThreshold;
55  
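        // Allocations served through this cache since the last trim; reset once it reaches freeSweepAllocationThreshold.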
56      private int allocations;
57  
58      PoolThreadCache(PoolArena arena,
59                      int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
60                      int freeSweepAllocationThreshold) {
61          checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
62          this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
63          if (arena != null) {
64              // Create the caches for the small and normal allocations
65              MemoryRegionCache[] smallSubPageCaches = createSubPageCaches(
66                      smallCacheSize, arena.numSmallSubpagePools);
67  
68              MemoryRegionCache[] normalCaches = createNormalCaches(
69                      normalCacheSize, maxCachedBufferCapacity, arena);
70  
71              // Only validate the threshold if at least one cache is actually in use.
72              if ((smallSubPageCaches != null || normalCaches != null)
73                  && freeSweepAllocationThreshold < 1) {
74                  throw new IllegalArgumentException("freeSweepAllocationThreshold: "
75                                                     + freeSweepAllocationThreshold + " (expected: > 0)");
76              }
77  
78              this.arena = arena;
79              this.smallSubPageCaches = smallSubPageCaches;
80              this.normalCaches = normalCaches;
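                // Register this cache against the arena's thread-cache counter; it is decremented again in free().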
81              arenaReferenceCounter = arena.numThreadCaches;
82              arenaReferenceCounter.getAndIncrement();
83          } else {
84              // No arena is configured, so just null out all caches
85              this.arena = null;
86              smallSubPageCaches = null;
87              normalCaches = null;
88              arenaReferenceCounter = null;
89          }
90      }
91  
92      private static MemoryRegionCache[] createSubPageCaches(
93              int cacheSize, int numCaches) {
94          if (cacheSize > 0 && numCaches > 0) {
95              MemoryRegionCache[] cache = new MemoryRegionCache[numCaches];
96              for (int i = 0; i < cache.length; i++) {
97                  // TODO: maybe use cacheSize / cache.length
98                  cache[i] = new SubPageMemoryRegionCache(cacheSize);
99              }
100             return cache;
101         } else {
102             return null;
103         }
104     }
105 
106     private static MemoryRegionCache[] createNormalCaches(
107             int cacheSize, int maxCachedBufferCapacity, PoolArena area) {
108         if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
109             int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
110 
111             // Create as many normal caches as we support based on how many sizeIdx we have and what the upper
112             // bound is that we want to cache in general.
113             List<MemoryRegionCache> cache = new ArrayList<>();
114             for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max; idx++) {
115                 cache.add(new NormalMemoryRegionCache(cacheSize));
116             }
117             return cache.toArray(MemoryRegionCache[]::new);
118         } else {
119             return null;
120         }
121     }
122 
123     PoolArena getArena() {
124         return arena;
125     }
126 
127     // val must be > 0; returns the zero-based index of the highest set bit, i.e. floor(log2(val)).
128     static int log2(int val) {
129         return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
130     }
131 
132     /**
133      * Try to allocate a small buffer out of the cache. Returns the cached memory if successful, {@code null} otherwise.
134      */
135     UntetheredMemory allocateSmall(int size, int sizeIdx) {
136         return allocate(cacheForSmall(sizeIdx), size);
137     }
138 
139     /**
140      * Try to allocate a normal buffer out of the cache. Returns the cached memory if successful, {@code null} otherwise.
141      */
142     UntetheredMemory allocateNormal(PoolArena area, int size, int sizeIdx) {
143         return allocate(cacheForNormal(area, sizeIdx), size);
144     }
145 
146     private UntetheredMemory allocate(MemoryRegionCache cache, int size) {
147         if (cache == null) {
148             // no cache found so just return null here
149             return null;
150         }
151         UntetheredMemory allocated = cache.allocate(size, this);
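            // Trim the caches after every freeSweepAllocationThreshold allocations so rarely used entries are freed.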
152         if (++allocations >= freeSweepAllocationThreshold) {
153             allocations = 0;
154             trim();
155         }
156         return allocated;
157     }
158 
159     /**
160      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
161      * Returns {@code true} if it fitted into the cache, {@code false} otherwise.
162      */
163     boolean add(PoolArena area, PoolChunk chunk,
164                 long handle, int normCapacity, SizeClass sizeClass) {
165         int sizeIdx = area.size2SizeIdx(normCapacity);
166         MemoryRegionCache cache = cache(area, sizeIdx, sizeClass);
167         if (cache == null) {
168             return false;
169         }
170         return cache.add(chunk, handle, normCapacity);
171     }
172 
173     private MemoryRegionCache cache(PoolArena area, int sizeIdx, SizeClass sizeClass) {
174         if (sizeClass == Normal) {
175             return cacheForNormal(area, sizeIdx);
176         }
177         if (sizeClass == Small) {
178             return cacheForSmall(sizeIdx);
179         }
180         throw new AssertionError("Unexpected size class: " + sizeClass);
181     }
182 
183     /**
184      * Should be called when the thread that uses this cache is about to exit, in order to release resources held by the cache.
185      */
186     void free() {
187         if (arena != null) {
188             int numFreed = free(smallSubPageCaches) + free(normalCaches);
189 
190             if (numFreed > 0 && logger.isDebugEnabled()) {
191                 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
192                              Thread.currentThread().getName());
193             }
194         }
195 
196         if (arenaReferenceCounter != null) {
197             arenaReferenceCounter.getAndDecrement();
198         }
199     }
200 
201     private static int free(MemoryRegionCache[] caches) {
202         if (caches == null) {
203             return 0;
204         }
205 
206         int numFreed = 0;
207         for (MemoryRegionCache c: caches) {
208             numFreed += free(c);
209         }
210         return numFreed;
211     }
212 
213     private static int free(MemoryRegionCache cache) {
214         if (cache == null) {
215             return 0;
216         }
217         return cache.free();
218     }
219 
220     void trim() {
221         if (arena != null) {
222             trim(smallSubPageCaches);
223             trim(normalCaches);
224         }
225     }
226 
227     private static void trim(MemoryRegionCache[] caches) {
228         if (caches == null) {
229             return;
230         }
231         for (MemoryRegionCache c: caches) {
232             trim(c);
233         }
234     }
235 
236     private static void trim(MemoryRegionCache cache) {
237         if (cache == null) {
238             return;
239         }
240         cache.trim();
241     }
242 
243     private MemoryRegionCache cacheForSmall(int sizeIdx) {
244         if (arena != null) {
245             return cache(smallSubPageCaches, sizeIdx);
246         }
247         return null;
248     }
249 
250     private MemoryRegionCache cacheForNormal(PoolArena area, int sizeIdx) {
251         if (area != null) {
252             // We need to subtract area.numSmallSubpagePools, as sizeIdx is the overall index across all sizes.
253             int idx = sizeIdx - area.numSmallSubpagePools;
254             return cache(normalCaches, idx);
255         }
256         return null;
257     }
258 
259     private static MemoryRegionCache cache(MemoryRegionCache[] cache, int sizeIdx) {
260         if (cache == null || sizeIdx > cache.length - 1) {
261             return null;
262         }
263         return cache[sizeIdx];
264     }
265 
266     /**
267      * Cache used for buffers that are backed by the SMALL size class.
268      */
269     private static final class SubPageMemoryRegionCache extends MemoryRegionCache {
270         SubPageMemoryRegionCache(int size) {
271             super(size, Small);
272         }
273 
274         @Override
275         protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
276             return chunk.allocateBufferWithSubpage(handle, size, threadCache);
277         }
278     }
279 
280     /**
281      * Cache used for buffers that are backed by the NORMAL size class.
282      */
283     private static final class NormalMemoryRegionCache extends MemoryRegionCache {
284         NormalMemoryRegionCache(int size) {
285             super(size, Normal);
286         }
287 
288         @Override
289         protected UntetheredMemory allocBuf(PoolChunk chunk, long handle, int size, PoolThreadCache threadCache) {
290             return chunk.allocateBuffer(handle, size, threadCache);
291         }
292     }
293 
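        /**
         * A fixed-size cache of (chunk, handle) pairs for a single size index. Entries are added via
         * {@link #add} and consumed via {@link #allocate}; capacity that was not used since the last
         * sweep is released again by {@link #trim()}.
         */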
294     private abstract static class MemoryRegionCache {
295         private final int size;
296         private final Queue<Entry> queue;
297         private final SizeClass sizeClass;
298         private int allocations;
299 
300         MemoryRegionCache(int size, SizeClass sizeClass) {
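                // Normalise the requested size to the next power of two before sizing the backing MPSC queue.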
301             this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
302             queue = PlatformDependent.newFixedMpscQueue(this.size);
303             this.sizeClass = sizeClass;
304         }
305 
306         /**
307          * Allocate a new {@link UntetheredMemory} using the provided chunk and handle, respecting the given capacity.
308          */
309         protected abstract UntetheredMemory allocBuf(
310                 PoolChunk chunk, long handle, int size, PoolThreadCache threadCache);
311 
312         /**
313          * Add to cache if not already full.
314          */
315         public final boolean add(PoolChunk chunk, long handle, int normCapacity) {
316             Entry entry = newEntry(chunk, handle, normCapacity);
317             boolean queued = queue.offer(entry);
318             if (!queued) {
319                 // If it was not possible to cache the chunk, immediately recycle the entry
320                 entry.recycle();
321             }
322 
323             return queued;
324         }
325 
326         /**
327          * Allocate something out of the cache if possible and remove the entry from the cache.
328          */
329         public final UntetheredMemory allocate(int size, PoolThreadCache threadCache) {
330             Entry entry = queue.poll();
331             if (entry == null) {
332                 return null;
333             }
334             UntetheredMemory buffer = allocBuf(entry.chunk, entry.handle, size, threadCache);
335             entry.recycle();
336 
337             // Updating allocations is not thread-safe, which is fine as this is only ever called from the same thread.
338             allocations++;
339             return buffer;
340         }
341 
342         /**
343          * Clear out this cache and free up all previously cached {@link PoolChunk}s and {@code handle}s.
344          */
345         public final int free() {
346             return free(Integer.MAX_VALUE);
347         }
348 
349         private int free(int max) {
350             int numFreed = 0;
351             for (; numFreed < max; numFreed++) {
352                 Entry entry = queue.poll();
353                 if (entry != null) {
354                     freeEntry(entry);
355                 } else {
356                     // all cleared
357                     return numFreed;
358                 }
359             }
360             return numFreed;
361         }
362 
363         /**
364          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
365          */
366         public final void trim() {
367             int free = size - allocations;
368             allocations = 0;
369 
370             // Fewer allocations than the cache capacity happened since the last trim, so free the unused entries.
371             if (free > 0) {
372                 free(free);
373             }
374         }
375 
376         private void freeEntry(Entry entry) {
377             PoolChunk chunk = entry.chunk;
378             long handle = entry.handle;
379 
380             entry.recycle();
381             chunk.arena.freeChunk(chunk, handle, entry.normCapacity, sizeClass);
382         }
383 
384         static final class Entry {
385             final Handle<Entry> recyclerHandle;
386             PoolChunk chunk;
387             long handle = -1;
388             int normCapacity;
389 
390             Entry(Handle<Entry> recyclerHandle) {
391                 this.recyclerHandle = recyclerHandle;
392             }
393 
394             void recycle() {
395                 chunk = null;
396                 handle = -1;
397                 recyclerHandle.recycle(this);
398             }
399         }
400 
401         private static Entry newEntry(PoolChunk chunk, long handle, int normCapacity) {
402             Entry entry = RECYCLER.get();
403             entry.chunk = chunk;
404             entry.handle = handle;
405             entry.normCapacity = normCapacity;
406             return entry;
407         }
408 
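            // Entries are recycled through the ObjectPool so that caching a buffer does not allocate a new Entry each time.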
409         private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(Entry::new);
410     }
411 }