View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.buffer.api.pool;
17  
18  import io.netty5.buffer.api.AllocationType;
19  import io.netty5.buffer.api.MemoryManager;
20  import io.netty5.util.internal.StringUtil;
21  
22  import java.lang.invoke.MethodHandles;
23  import java.lang.invoke.VarHandle;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.LongAdder;
28  import java.util.concurrent.locks.ReentrantLock;
29  
30  import static io.netty5.buffer.api.pool.PoolChunk.isSubpage;
31  import static java.lang.Math.max;
32  
33  class PoolArena extends SizeClasses implements PoolArenaMetric {
34      private static final VarHandle SUBPAGE_ARRAY = MethodHandles.arrayElementVarHandle(PoolSubpage[].class);
/**
 * Size classes used by this arena when freeing memory; a handle is classified as
 * {@link #Small} when it refers to a subpage and as {@link #Normal} otherwise.
 */
enum SizeClass {
    Small, Normal
}
39  
40      final PooledBufferAllocator parent;
41      final MemoryManager manager;
42      final AllocationType allocationType;
43  
44      final int numSmallSubpagePools;
45      final int directMemoryCacheAlignment;
46      private final PoolSubpage[] smallSubpagePools;
47  
48      private final PoolChunkList q050;
49      private final PoolChunkList q025;
50      private final PoolChunkList q000;
51      private final PoolChunkList qInit;
52      private final PoolChunkList q075;
53      private final PoolChunkList q100;
54  
55      private final List<PoolChunkListMetric> chunkListMetrics;
56  
57      // Metrics for allocations and deallocations
58      private long allocationsNormal;
59  
60      // We need to use the LongAdder here as this is not guarded via synchronized block.
61      private final LongAdder allocationsSmall = new LongAdder();
62      private final LongAdder allocationsHuge = new LongAdder();
63      private final LongAdder activeBytesHuge = new LongAdder();
64  
65      private long deallocationsSmall;
66      private long deallocationsNormal;
67  
68      // We need to use the LongAdder here as this is not guarded via synchronized block.
69      private final LongAdder deallocationsHuge = new LongAdder();
70  
71      // Number of thread caches backed by this arena.
72      final AtomicInteger numThreadCaches = new AtomicInteger();
73  
74      private final ReentrantLock lock = new ReentrantLock();
75  
/**
 * Creates an arena and wires up its chain of chunk lists.
 *
 * @param parent         the allocator this arena belongs to
 * @param manager        the memory manager passed on to huge (unpooled) allocations
 * @param allocationType the allocation type passed on to huge (unpooled) allocations
 * @param pageSize       forwarded to the {@code SizeClasses} super constructor
 * @param pageShifts     forwarded to the {@code SizeClasses} super constructor
 * @param chunkSize      forwarded to the super constructor and to every chunk list
 * @param cacheAlignment forwarded to the super constructor and kept as {@code directMemoryCacheAlignment}
 */
protected PoolArena(PooledBufferAllocator parent, MemoryManager manager, AllocationType allocationType,
                    int pageSize, int pageShifts, int chunkSize, int cacheAlignment) {
    super(pageSize, pageShifts, chunkSize, cacheAlignment);
    this.parent = parent;
    this.manager = manager;
    this.allocationType = allocationType;
    this.directMemoryCacheAlignment = cacheAlignment;

    this.numSmallSubpagePools = nSubpages;
    this.smallSubpagePools = newSubpagePoolArray(this.numSmallSubpagePools);

    // Each list needs its successor, so build from the fullest bracket downwards.
    this.q100 = new PoolChunkList(this, null, 100, Integer.MAX_VALUE, chunkSize);
    this.q075 = new PoolChunkList(this, this.q100, 75, 100, chunkSize);
    this.q050 = new PoolChunkList(this, this.q075, 50, 100, chunkSize);
    this.q025 = new PoolChunkList(this, this.q050, 25, 75, chunkSize);
    this.q000 = new PoolChunkList(this, this.q025, 1, 50, chunkSize);
    this.qInit = new PoolChunkList(this, this.q000, Integer.MIN_VALUE, 25, chunkSize);

    // Back links: qInit points at itself and q000 has no predecessor.
    this.qInit.prevList(this.qInit);
    this.q000.prevList(null);
    this.q025.prevList(this.q000);
    this.q050.prevList(this.q025);
    this.q075.prevList(this.q050);
    this.q100.prevList(this.q075);

    this.chunkListMetrics = List.of(this.qInit, this.q000, this.q025, this.q050, this.q075, this.q100);
}
103 
104     private static PoolSubpage newSubpagePoolHead() {
105         PoolSubpage head = new PoolSubpage();
106         head.prev = head;
107         head.next = head;
108         return head;
109     }
110 
111     private static PoolSubpage[] newSubpagePoolArray(int size) {
112         return new PoolSubpage[size];
113     }
114 
115     UntetheredMemory allocate(PoolThreadCache cache, int size) {
116         final int sizeIdx = size2SizeIdx(size);
117 
118         if (sizeIdx <= smallMaxSizeIdx) {
119             return tcacheAllocateSmall(cache, size, sizeIdx);
120         } else if (sizeIdx < nSizes) {
121             return tcacheAllocateNormal(cache, size, sizeIdx);
122         } else {
123             int normCapacity = directMemoryCacheAlignment > 0
124                     ? normalizeSize(size) : size;
125             // Huge allocations are never served via the cache so just call allocateHuge
126             return allocateHuge(normCapacity);
127         }
128     }
129 
130     private UntetheredMemory tcacheAllocateSmall(PoolThreadCache cache, final int size, final int sizeIdx) {
131         UntetheredMemory memory = cache.allocateSmall(size, sizeIdx);
132         if (memory != null) {
133             // was able to allocate out of the cache so move on
134             return memory;
135         }
136 
137         /*
138          * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
139          * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
140          */
141         PoolSubpage head = findSubpagePoolHead(sizeIdx);
142         final boolean needsNormalAllocation;
143         head.lock();
144         try {
145             final PoolSubpage s = head.next;
146             needsNormalAllocation = s == head;
147             if (!needsNormalAllocation) {
148                 assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx) :
149                         "doNotDestroy=" + s.doNotDestroy + ", elemSize=" + s.elemSize + ", sizeIdx=" + sizeIdx;
150                 long handle = s.allocate();
151                 assert handle >= 0;
152                 memory = s.chunk.allocateBufferWithSubpage(handle, size, cache);
153             }
154         } finally {
155             head.unlock();
156         }
157 
158         if (needsNormalAllocation) {
159             lock();
160             try {
161                 memory = allocateNormal(size, sizeIdx, cache);
162             } finally {
163                 unlock();
164             }
165         }
166 
167         incSmallAllocation();
168         return memory;
169     }
170 
171     private UntetheredMemory tcacheAllocateNormal(
172             PoolThreadCache cache, int size, int sizeIdx) {
173         UntetheredMemory memory = cache.allocateNormal(this, size, sizeIdx);
174         if (memory != null) {
175             // was able to allocate out of the cache so move on
176             return memory;
177         }
178         lock();
179         try {
180             memory = allocateNormal(size, sizeIdx, cache);
181             allocationsNormal++;
182         } finally {
183             unlock();
184         }
185         return memory;
186     }
187 
188     private UntetheredMemory allocateNormal(int size, int sizeIdx, PoolThreadCache threadCache) {
189         assert lock.isHeldByCurrentThread();
190         UntetheredMemory memory = q050.allocate(size, sizeIdx, threadCache);
191         if (memory != null) {
192             return memory;
193         }
194         memory = q025.allocate(size, sizeIdx, threadCache);
195         if (memory != null) {
196             return memory;
197         }
198         memory = q000.allocate(size, sizeIdx, threadCache);
199         if (memory != null) {
200             return memory;
201         }
202         memory = qInit.allocate(size, sizeIdx, threadCache);
203         if (memory != null) {
204             return memory;
205         }
206         memory = q075.allocate(size, sizeIdx, threadCache);
207         if (memory != null) {
208             return memory;
209         }
210 
211         // Add a new chunk.
212         PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize);
213         memory = c.allocate(size, sizeIdx, threadCache);
214         assert memory != null;
215         qInit.add(c);
216         return memory;
217     }
218 
219     private void incSmallAllocation() {
220         allocationsSmall.increment();
221     }
222 
223     private UntetheredMemory allocateHuge(int size) {
224         activeBytesHuge.add(size);
225         allocationsHuge.increment();
226         return new UnpooledUntetheredMemory(parent, manager, allocationType, size);
227     }
228 
229     void free(PoolChunk chunk, long handle, int normCapacity, PoolThreadCache cache) {
230         SizeClass sizeClass = sizeClass(handle);
231         if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
232             // cached so not free it.
233             return;
234         }
235         freeChunk(chunk, handle, normCapacity, sizeClass);
236     }
237 
238     private static SizeClass sizeClass(long handle) {
239         return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
240     }
241 
242     void freeChunk(PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
243         final boolean destroyChunk;
244         lock();
245         try {
246             if (sizeClass == SizeClass.Normal) {
247                 ++deallocationsNormal;
248             } else if (sizeClass == SizeClass.Small) {
249                 ++deallocationsSmall;
250             } else {
251                 throw new AssertionError("Unexpected size class: " + sizeClass);
252             }
253             destroyChunk = !chunk.parent.free(chunk, handle, normCapacity);
254         } finally {
255             unlock();
256         }
257         if (destroyChunk) {
258             // destroyChunk not need to be called while holding the synchronized lock.
259             chunk.destroy();
260         }
261     }
262 
263     PoolSubpage findSubpagePoolHead(int sizeIdx) {
264         PoolSubpage head = (PoolSubpage) SUBPAGE_ARRAY.getVolatile(smallSubpagePools, sizeIdx);
265         if (head == null) {
266             head = newSubpagePoolHead();
267             if (!SUBPAGE_ARRAY.compareAndSet(smallSubpagePools, sizeIdx, null, head)) {
268                 // We lost the race. Read the winning value.
269                 head = (PoolSubpage) SUBPAGE_ARRAY.getVolatile(smallSubpagePools, sizeIdx);
270             }
271         }
272         return head;
273     }
274 
275     @Override
276     public int numThreadCaches() {
277         return numThreadCaches.get();
278     }
279 
280     @Override
281     public int numSmallSubpages() {
282         return smallSubpagePools.length;
283     }
284 
285     @Override
286     public int numChunkLists() {
287         return chunkListMetrics.size();
288     }
289 
290     @Override
291     public List<PoolSubpageMetric> smallSubpages() {
292         return subPageMetricList(smallSubpagePools);
293     }
294 
295     @Override
296     public List<PoolChunkListMetric> chunkLists() {
297         return chunkListMetrics;
298     }
299 
300     private static List<PoolSubpageMetric> subPageMetricList(PoolSubpage[] pages) {
301         List<PoolSubpageMetric> metrics = new ArrayList<>();
302         for (int i = 0, len = pages.length; i < len; i++) {
303             PoolSubpage head = (PoolSubpage) SUBPAGE_ARRAY.getVolatile(pages, i);
304             if (head == null || head.next == head) {
305                 continue;
306             }
307             PoolSubpage s = head.next;
308             do {
309                 metrics.add(s);
310                 s = s.next;
311             } while (s != head);
312         }
313         return metrics;
314     }
315 
316     @Override
317     public long numAllocations() {
318         final long allocsNormal;
319         lock();
320         try {
321             allocsNormal = allocationsNormal;
322         } finally {
323             unlock();
324         }
325 
326         return allocationsSmall.longValue() + allocsNormal + allocationsHuge.longValue();
327     }
328 
329     @Override
330     public long numSmallAllocations() {
331         return allocationsSmall.longValue();
332     }
333 
334     @Override
335     public long numNormalAllocations() {
336         lock();
337         try {
338             return allocationsNormal;
339         } finally {
340             unlock();
341         }
342     }
343 
344     @Override
345     public long numDeallocations() {
346         final long deallocs;
347         lock();
348         try {
349             deallocs = deallocationsSmall + deallocationsNormal;
350         } finally {
351             unlock();
352         }
353         return deallocs + deallocationsHuge.longValue();
354     }
355 
356     @Override
357     public long numSmallDeallocations() {
358         lock();
359         try {
360             return deallocationsSmall;
361         } finally {
362             unlock();
363         }
364     }
365 
366     @Override
367     public long numNormalDeallocations() {
368         lock();
369         try {
370             return deallocationsNormal;
371         } finally {
372             unlock();
373         }
374     }
375 
376     @Override
377     public long numHugeAllocations() {
378         return allocationsHuge.longValue();
379     }
380 
381     @Override
382     public long numHugeDeallocations() {
383         return deallocationsHuge.longValue();
384     }
385 
386     @Override
387     public  long numActiveAllocations() {
388         long val = allocationsSmall.longValue() + allocationsHuge.longValue()
389                 - deallocationsHuge.longValue();
390         lock();
391         try {
392             val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
393         } finally {
394             unlock();
395         }
396         return max(val, 0);
397     }
398 
399     @Override
400     public long numActiveSmallAllocations() {
401         return max(numSmallAllocations() - numSmallDeallocations(), 0);
402     }
403 
404     @Override
405     public long numActiveNormalAllocations() {
406         final long val;
407         lock();
408         try {
409             val = allocationsNormal - deallocationsNormal;
410         } finally {
411             unlock();
412         }
413         return max(val, 0);
414     }
415 
416     @Override
417     public long numActiveHugeAllocations() {
418         return max(numHugeAllocations() - numHugeDeallocations(), 0);
419     }
420 
421     @Override
422     public long numActiveBytes() {
423         long val = activeBytesHuge.longValue();
424         lock();
425         try {
426             for (int i = 0; i < chunkListMetrics.size(); i++) {
427                 for (PoolChunkMetric m: chunkListMetrics.get(i)) {
428                     val += m.chunkSize();
429                 }
430             }
431         } finally {
432             unlock();
433         }
434         return max(0, val);
435     }
436 
437     @Override
438     public long numPinnedBytes() {
439         long val = activeBytesHuge.longValue();
440         lock();
441         try {
442             for (int i = 0; i < chunkListMetrics.size(); i++) {
443                 for (PoolChunkMetric m: chunkListMetrics.get(i)) {
444                     val += m.pinnedBytes();
445                 }
446             }
447         } finally {
448             unlock();
449         }
450         return max(0, val);
451     }
452 
453     protected final PoolChunk newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
454         return new PoolChunk(this, pageSize, pageShifts, chunkSize, maxPageIdx);
455     }
456 
457     @Override
458     public String toString() {
459         lock();
460         try {
461             StringBuilder buf = new StringBuilder()
462                     .append("Chunk(s) at 0~25%:")
463                     .append(StringUtil.NEWLINE)
464                     .append(qInit)
465                     .append(StringUtil.NEWLINE)
466                     .append("Chunk(s) at 0~50%:")
467                     .append(StringUtil.NEWLINE)
468                     .append(q000)
469                     .append(StringUtil.NEWLINE)
470                     .append("Chunk(s) at 25~75%:")
471                     .append(StringUtil.NEWLINE)
472                     .append(q025)
473                     .append(StringUtil.NEWLINE)
474                     .append("Chunk(s) at 50~100%:")
475                     .append(StringUtil.NEWLINE)
476                     .append(q050)
477                     .append(StringUtil.NEWLINE)
478                     .append("Chunk(s) at 75~100%:")
479                     .append(StringUtil.NEWLINE)
480                     .append(q075)
481                     .append(StringUtil.NEWLINE)
482                     .append("Chunk(s) at 100%:")
483                     .append(StringUtil.NEWLINE)
484                     .append(q100)
485                     .append(StringUtil.NEWLINE)
486                     .append("small subpages:");
487             appendPoolSubPages(buf, smallSubpagePools);
488             buf.append(StringUtil.NEWLINE);
489 
490             return buf.toString();
491         } finally {
492             unlock();
493         }
494     }
495 
496     private static void appendPoolSubPages(StringBuilder buf, PoolSubpage[] subpages) {
497         for (int i = 0; i < subpages.length; i ++) {
498             PoolSubpage head = (PoolSubpage) SUBPAGE_ARRAY.getVolatile(subpages, i);
499             if (head == null || head.next == head) {
500                 continue;
501             }
502 
503             buf.append(StringUtil.NEWLINE)
504                     .append(i)
505                     .append(": ");
506             PoolSubpage s = head.next;
507             do {
508                 buf.append(s);
509                 s = s.next;
510             } while (s != head);
511         }
512     }
513 
514     public void close() {
515         for (int i = 0, len = smallSubpagePools.length; i < len; i++) {
516             PoolSubpage page = (PoolSubpage) SUBPAGE_ARRAY.getVolatile(smallSubpagePools, i);
517             if (page != null) {
518                 page.destroy();
519                 SUBPAGE_ARRAY.setVolatile(smallSubpagePools, i, null);
520             }
521         }
522         for (PoolChunkList list : new PoolChunkList[] {qInit, q000, q025, q050, q100}) {
523             list.destroy();
524         }
525     }
526 
527     void lock() {
528         lock.lock();
529     }
530 
531     void unlock() {
532         lock.unlock();
533     }
534 }