View Javadoc
1   /*
2    * Copyright 2021 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.buffer.api.pool;
17  
18  import io.netty5.buffer.api.AllocatorControl;
19  import io.netty5.buffer.api.Buffer;
20  import io.netty5.buffer.api.Drop;
21  import io.netty5.buffer.api.MemoryManager;
22  import io.netty5.buffer.api.internal.ArcDrop;
23  import io.netty5.buffer.api.internal.CleanerDrop;
24  import io.netty5.buffer.api.internal.DropCaptor;
25  import io.netty5.util.internal.LongLongHashMap;
26  import io.netty5.util.internal.LongPriorityQueue;
27  import io.netty5.util.internal.logging.InternalLogger;
28  import io.netty5.util.internal.logging.InternalLoggerFactory;
29  
30  import java.util.PriorityQueue;
31  import java.util.concurrent.locks.ReentrantLock;
32  
33  /**
34   * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
35   *
36   * Notation: The following terms are important to understand the code
37   * > page  - a page is the smallest unit of memory chunk that can be allocated
38   * > run   - a run is a collection of pages
39   * > chunk - a chunk is a collection of runs
40   * > in this code chunkSize = maxPages * pageSize
41   *
42   * To begin we allocate a byte array of size = chunkSize
43   * Whenever a ByteBuf of given size needs to be created we search for the first position
44   * in the byte array that has enough empty space to accommodate the requested size and
45   * return a (long) handle that encodes this offset information, (this memory segment is then
46   * marked as reserved, so it is always used by exactly one ByteBuf and no more)
47   *
48   * For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
49   * This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
50   * equals the next nearest size in {@link SizeClasses}.
51   *
52   *
53   *  A chunk has the following layout:
54   *
55   *     /-----------------\
56   *     | run             |
57   *     |                 |
58   *     |                 |
59   *     |-----------------|
60   *     | run             |
61   *     |                 |
62   *     |-----------------|
63   *     | unallocated     |
64   *     | (freed)         |
65   *     |                 |
66   *     |-----------------|
67   *     | subpage         |
68   *     |-----------------|
69   *     | unallocated     |
70   *     | (freed)         |
71   *     | ...             |
72   *     | ...             |
73   *     | ...             |
74   *     |                 |
75   *     |                 |
76   *     |                 |
77   *     \-----------------/
78   *
79   *
80   * handle:
81   * -------
82   * a handle is a long number, the bit layout of a run looks like:
83   *
84   * oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
85   *
86   * o: runOffset (page offset in the chunk), 15bit
87   * s: size (number of pages) of this run, 15bit
88   * u: isUsed?, 1bit
89   * e: isSubpage?, 1bit
90   * b: bitmapIdx of subpage, zero if it's not subpage, 32bit
91   *
92   * runsAvailMap:
93   * ------
94   * a map which tracks all available (not in use) runs.
95   * For each available run, the first runOffset and last runOffset are stored in runsAvailMap.
96   * key: runOffset
97   * value: handle
98   *
99   * runsAvail:
100  * ----------
101  * an array of {@link PriorityQueue}.
102  * Each queue manages same size of runs.
103  * Runs are sorted by offset, so that we always allocate runs with smaller offset.
104  *
105  *
106  * Algorithm:
107  * ----------
108  *
109  *   As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
110  *
111  * Initialization -
112  *  In the beginning we store the initial run which is the whole chunk.
113  *  The initial run:
114  *  runOffset = 0
115  *  size = chunkSize
116  *  isUsed = no
117  *  isSubpage = no
118  *  bitmapIdx = 0
119  *
120  *
121  * Algorithm: [allocateRun(size)]
122  * ----------
123  * 1) find the first available run in runsAvail that is large enough for the requested size
124  * 2) if the run has more pages than requested, split it and save the trailing run
125  *    for later use
126  *
127  * Algorithm: [allocateSubpage(size)]
128  * ----------
129  * 1) find a not full subpage according to size.
130  *    if it already exists just return, otherwise allocate a new PoolSubpage and call init()
131  *    note that this subpage object is added to subpagesPool in the PoolArena when we init() it
132  * 2) call subpage.allocate()
133  *
134  * Algorithm: [free(handle, normCapacity)]
135  * ----------
136  * 1) if it is a subpage, return the slab to that subpage
137  * 2) if the subpage is no longer used, or the handle is a run, start freeing the run
138  * 3) merge contiguous available runs
139  * 4) save the merged run
140  *
141  */
142 final class PoolChunk implements PoolChunkMetric {
143     private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(PoolChunk.class);
144     private static final int SIZE_BIT_LENGTH = 15;
145     private static final int INUSED_BIT_LENGTH = 1;
146     private static final int SUBPAGE_BIT_LENGTH = 1;
147     private static final int BITMAP_IDX_BIT_LENGTH = 32;
148     private static final AllocatorControl CONTROL = () -> {
149         throw new AssertionError("PoolChunk base allocations should never need to access their allocator.");
150     };
151 
152     static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
153     static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
154     static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
155     static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
156 
157     final PoolArena arena;
158     final Buffer base; // The buffer that is the source of the memory. Closing it will free the memory.
159     final Object memory;
160     final Drop<Buffer> baseDrop; // An ArcDrop that manages references to the base Buffer.
161 
162     /**
163      * store the first page and last page of each avail run
164      */
165     private final LongLongHashMap runsAvailMap;
166 
167     /**
168      * manage all avail runs
169      */
170     private final LongPriorityQueue[] runsAvail;
171 
172     private final ReentrantLock runsAvailLock;
173 
174     /**
175      * manage all subpages in this chunk
176      */
177     private final PoolSubpage[] subpages;
178 
179     private final int pageSize;
180     private final int pageShifts;
181     private final int chunkSize;
182 
183     int freeBytes;
184     int pinnedBytes;
185 
186     PoolChunkList parent;
187     PoolChunk prev;
188     PoolChunk next;
189 
190     PoolChunk(PoolArena arena, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
191         this.arena = arena;
192         MemoryManager manager = arena.manager;
193         // Unlike a standard wrapping, the CleanerDrop needs to be inside the ArcDrop here, because it can only drop
194         // once. And we need the ArcDrop for the reference counting by every buffer allocated from this chunk.
195         DropCaptor<Buffer> dropCaptor = new DropCaptor<>();
196         base = manager.allocateShared(CONTROL, chunkSize, drop ->
197             dropCaptor.capture(ArcDrop.wrap(CleanerDrop.wrap(drop, manager))), arena.allocationType);
198         baseDrop = dropCaptor.getDrop();
199         memory = manager.unwrapRecoverableMemory(base);
200         baseDrop.attach(base);
201         this.pageSize = pageSize;
202         this.pageShifts = pageShifts;
203         this.chunkSize = chunkSize;
204         freeBytes = chunkSize;
205 
206         runsAvail = newRunsAvailqueueArray(maxPageIdx);
207         runsAvailLock = new ReentrantLock();
208         runsAvailMap = new LongLongHashMap(-1);
209         subpages = new PoolSubpage[chunkSize >> pageShifts];
210 
211         //insert initial run, offset = 0, pages = chunkSize / pageSize
212         int pages = chunkSize >> pageShifts;
213         long initHandle = (long) pages << SIZE_SHIFT;
214         insertAvailRun(0, pages, initHandle);
215     }
216 
217     private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
218         LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
219         for (int i = 0; i < queueArray.length; i++) {
220             queueArray[i] = new LongPriorityQueue();
221         }
222         return queueArray;
223     }
224 
225     private void insertAvailRun(int runOffset, int pages, long handle) {
226         int pageIdxFloor = arena.pages2pageIdxFloor(pages);
227         LongPriorityQueue queue = runsAvail[pageIdxFloor];
228         queue.offer(handle);
229 
230         //insert first page of run
231         insertAvailRun0(runOffset, handle);
232         if (pages > 1) {
233             //insert last page of run
234             insertAvailRun0(lastPage(runOffset, pages), handle);
235         }
236     }
237 
238     private void insertAvailRun0(int runOffset, long handle) {
239         long pre = runsAvailMap.put(runOffset, handle);
240         assert pre == -1;
241     }
242 
243     private void removeAvailRun(long handle) {
244         int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
245         LongPriorityQueue queue = runsAvail[pageIdxFloor];
246         removeAvailRun(queue, handle);
247     }
248 
249     private void removeAvailRun(LongPriorityQueue queue, long handle) {
250         queue.remove(handle);
251 
252         int runOffset = runOffset(handle);
253         int pages = runPages(handle);
254         //remove first page of run
255         runsAvailMap.remove(runOffset);
256         if (pages > 1) {
257             //remove last page of run
258             runsAvailMap.remove(lastPage(runOffset, pages));
259         }
260     }
261 
262     private static int lastPage(int runOffset, int pages) {
263         return runOffset + pages - 1;
264     }
265 
266     private long getAvailRunByOffset(int runOffset) {
267         return runsAvailMap.get(runOffset);
268     }
269 
270     @Override
271     public int usage() {
272         final int freeBytes;
273         arena.lock();
274         try {
275             freeBytes = this.freeBytes;
276         } finally {
277             arena.unlock();
278         }
279         return usage(freeBytes);
280     }
281 
282     private int usage(int freeBytes) {
283         if (freeBytes == 0) {
284             return 100;
285         }
286 
287         int freePercentage = (int) (freeBytes * 100L / chunkSize);
288         if (freePercentage == 0) {
289             return 99;
290         }
291         return 100 - freePercentage;
292     }
293 
294     UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache cache) {
295         final long handle;
296         if (sizeIdx <= arena.smallMaxSizeIdx) {
297             // small
298             handle = allocateSubpage(sizeIdx);
299             if (handle < 0) {
300                 return null;
301             }
302             assert isSubpage(handle);
303         } else {
304             // normal
305             // runSize must be multiple of pageSize
306             int runSize = arena.sizeIdx2size(sizeIdx);
307             handle = allocateRun(runSize);
308             if (handle < 0) {
309                 return null;
310             }
311         }
312 
313         return allocateBuffer(handle, size, cache);
314     }
315 
316     private long allocateRun(int runSize) {
317         int pages = runSize >> pageShifts;
318         int pageIdx = arena.pages2pageIdx(pages);
319         runsAvailLock.lock();
320         try {
321             //find first queue which has at least one big enough run
322             int queueIdx = runFirstBestFit(pageIdx);
323             if (queueIdx == -1) {
324                 return -1;
325             }
326 
327             //get run with min offset in this queue
328             LongPriorityQueue queue = runsAvail[queueIdx];
329             long handle = queue.poll();
330 
331             assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;
332 
333             removeAvailRun(queue, handle);
334 
335             if (handle != -1) {
336                 handle = splitLargeRun(handle, pages);
337             }
338 
339             int pinnedSize = runSize(pageShifts, handle);
340             freeBytes -= pinnedSize;
341             pinnedBytes += pinnedSize;
342             return handle;
343         } finally {
344             runsAvailLock.unlock();
345         }
346     }
347 
348     private int calculateRunSize(int sizeIdx) {
349         int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
350         int runSize = 0;
351         int nElements;
352 
353         final int elemSize = arena.sizeIdx2size(sizeIdx);
354 
355         // Find the lowest common multiple of pageSize and elemSize
356         do {
357             runSize += pageSize;
358             nElements = runSize / elemSize;
359         } while (nElements < maxElements && runSize != nElements * elemSize);
360 
361         while (nElements > maxElements) {
362             runSize -= pageSize;
363             nElements = runSize / elemSize;
364         }
365 
366         assert nElements > 0;
367         assert runSize <= chunkSize;
368         assert runSize >= elemSize;
369 
370         return runSize;
371     }
372 
373     private int runFirstBestFit(int pageIdx) {
374         if (freeBytes == chunkSize) {
375             return arena.nPSizes - 1;
376         }
377         for (int i = pageIdx; i < arena.nPSizes; i++) {
378             LongPriorityQueue queue = runsAvail[i];
379             if (queue != null && !queue.isEmpty()) {
380                 return i;
381             }
382         }
383         return -1;
384     }
385 
386     private long splitLargeRun(long handle, int needPages) {
387         assert needPages > 0;
388 
389         int totalPages = runPages(handle);
390         assert needPages <= totalPages;
391 
392         int remPages = totalPages - needPages;
393 
394         if (remPages > 0) {
395             int runOffset = runOffset(handle);
396 
397             // keep track of trailing unused pages for later use
398             int availOffset = runOffset + needPages;
399             long availRun = toRunHandle(availOffset, remPages, 0);
400             insertAvailRun(availOffset, remPages, availRun);
401 
402             // not avail
403             return toRunHandle(runOffset, needPages, 1);
404         }
405 
406         //mark it as used
407         handle |= 1L << IS_USED_SHIFT;
408         return handle;
409     }
410 
411     /**
412      * Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
413      * subpage pool in the PoolArena that owns this PoolChunk
414      *
415      * @param sizeIdx sizeIdx of normalized size
416      *
417      * @return index in memoryMap
418      */
419     private long allocateSubpage(int sizeIdx) {
420         // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
421         // This is need as we may add it back and so alter the linked-list structure.
422         PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
423         head.lock();
424         try {
425             //allocate a new run
426             int runSize = calculateRunSize(sizeIdx);
427             //runSize must be multiples of pageSize
428             long runHandle = allocateRun(runSize);
429             if (runHandle < 0) {
430                 return -1;
431             }
432 
433             int runOffset = runOffset(runHandle);
434             assert subpages[runOffset] == null;
435             int elemSize = arena.sizeIdx2size(sizeIdx);
436 
437             PoolSubpage subpage = new PoolSubpage(head, this, pageShifts, runOffset,
438                                runSize(pageShifts, runHandle), elemSize);
439 
440             subpages[runOffset] = subpage;
441             return subpage.allocate();
442         } finally {
443             head.unlock();
444         }
445     }
446 
447     /**
448      * Free a subpage, or a run of pages When a subpage is freed from PoolSubpage, it might be added back to subpage
449      * pool of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given
450      * elemSize, we can completely free the owning Page, so it is available for subsequent allocations.
451      *
452      * @param handle handle to free
453      */
454     void free(long handle, int normCapacity) {
455         int runSize = runSize(pageShifts, handle);
456         pinnedBytes -= runSize;
457         if (isSubpage(handle)) {
458             int sizeIdx = arena.size2SizeIdx(normCapacity);
459             PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
460 
461             int sIdx = runOffset(handle);
462             PoolSubpage subpage = subpages[sIdx];
463             assert subpage != null && subpage.doNotDestroy;
464 
465             // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
466             // This is need as we may add it back and so alter the linked-list structure.
467             head.lock();
468             try {
469                 if (subpage.free(head, bitmapIdx(handle))) {
470                     //the subpage is still used, do not free it
471                     return;
472                 }
473                 assert !subpage.doNotDestroy;
474                 // Null out slot in the array as it was freed, and we should not use it anymore.
475                 subpages[sIdx] = null;
476             } finally {
477                 head.unlock();
478             }
479         }
480 
481         //start free run
482         runsAvailLock.lock();
483         try {
484             // collapse continuous runs, successfully collapsed runs
485             // will be removed from runsAvail and runsAvailMap
486             long finalRun = collapseRuns(handle);
487 
488             //set run as not used
489             finalRun &= ~(1L << IS_USED_SHIFT);
490             //if it is a subpage, set it to run
491             finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
492 
493             insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
494             freeBytes += runSize;
495         } finally {
496             runsAvailLock.unlock();
497         }
498     }
499 
500     private long collapseRuns(long handle) {
501         return collapseNext(collapsePast(handle));
502     }
503 
504     private long collapsePast(long handle) {
505         for (;;) {
506             int runOffset = runOffset(handle);
507             int runPages = runPages(handle);
508 
509             long pastRun = getAvailRunByOffset(runOffset - 1);
510             if (pastRun == -1) {
511                 return handle;
512             }
513 
514             int pastOffset = runOffset(pastRun);
515             int pastPages = runPages(pastRun);
516 
517             //is continuous
518             if (pastRun != handle && pastOffset + pastPages == runOffset) {
519                 //remove past run
520                 removeAvailRun(pastRun);
521                 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
522             } else {
523                 return handle;
524             }
525         }
526     }
527 
528     private long collapseNext(long handle) {
529         for (;;) {
530             int runOffset = runOffset(handle);
531             int runPages = runPages(handle);
532 
533             long nextRun = getAvailRunByOffset(runOffset + runPages);
534             if (nextRun == -1) {
535                 return handle;
536             }
537 
538             int nextOffset = runOffset(nextRun);
539             int nextPages = runPages(nextRun);
540 
541             //is continuous
542             if (nextRun != handle && runOffset + runPages == nextOffset) {
543                 //remove next run
544                 removeAvailRun(nextRun);
545                 handle = toRunHandle(runOffset, runPages + nextPages, 0);
546             } else {
547                 return handle;
548             }
549         }
550     }
551 
552     private static long toRunHandle(int runOffset, int runPages, int inUsed) {
553         return (long) runOffset << RUN_OFFSET_SHIFT
554                | (long) runPages << SIZE_SHIFT
555                | (long) inUsed << IS_USED_SHIFT;
556     }
557 
558     UntetheredMemory allocateBuffer(long handle, int size, PoolThreadCache threadCache) {
559         if (isSubpage(handle)) {
560             return allocateBufferWithSubpage(handle, size, threadCache);
561         } else {
562             int offset = runOffset(handle) << pageShifts;
563             int maxLength = runSize(pageShifts, handle);
564             PoolThreadCache poolThreadCache = arena.parent.threadCache();
565             return new UntetheredChunkAllocation(
566                     memory, this, poolThreadCache, handle, maxLength, offset, size);
567         }
568     }
569 
570     UntetheredMemory allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache) {
571         int runOffset = runOffset(handle);
572         int bitmapIdx = bitmapIdx(handle);
573 
574         PoolSubpage s = subpages[runOffset];
575         assert s.doNotDestroy;
576         assert size <= s.elemSize : size + "<=" + s.elemSize;
577 
578         int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
579         return new UntetheredChunkAllocation(memory, this, threadCache, handle, s.elemSize, offset, size);
580     }
581 
582     @SuppressWarnings("unchecked")
583     private static final class UntetheredChunkAllocation implements UntetheredMemory {
584         private final Object memory;
585         private final PoolChunk chunk;
586         private final PoolThreadCache threadCache;
587         private final long handle;
588         private final int maxLength;
589 
590         private UntetheredChunkAllocation(
591                 Object memory, PoolChunk chunk, PoolThreadCache threadCache,
592                 long handle, int maxLength, int offset, int size) {
593             try {
594                 this.memory = chunk.arena.manager.sliceMemory(memory, offset, size);
595             } catch (Exception e) {
596                 LOGGER.error("Failed to create slice of pool chunk memory. Chunk layout: " +
597                              chunk.toString() + ", memory: " + chunk.memory, e);
598                 throw e;
599             }
600             this.chunk = chunk;
601             this.threadCache = threadCache;
602             this.handle = handle;
603             this.maxLength = maxLength;
604         }
605 
606         @Override
607         public <Memory> Memory memory() {
608             return (Memory) memory;
609         }
610 
611         @Override
612         public <BufferType extends Buffer> Drop<BufferType> drop() {
613             var pooledDrop = ArcDrop.wrap(new PooledDrop(chunk, threadCache, handle, maxLength));
614             return (Drop<BufferType>) CleanerDrop.wrap(pooledDrop, chunk.arena.manager);
615         }
616     }
617 
618     @Override
619     public int chunkSize() {
620         return chunkSize;
621     }
622 
623     @Override
624     public int freeBytes() {
625         arena.lock();
626         try {
627             return freeBytes;
628         } finally {
629             arena.unlock();
630         }
631     }
632 
633     @Override
634     public int pinnedBytes() {
635         arena.lock();
636         try {
637             return pinnedBytes;
638         } finally {
639             arena.unlock();
640         }
641     }
642 
643     @Override
644     public String toString() {
645         final int freeBytes;
646         arena.lock();
647         try {
648             freeBytes = this.freeBytes;
649         } finally {
650             arena.unlock();
651         }
652 
653         return new StringBuilder()
654                 .append("Chunk(")
655                 .append(Integer.toHexString(System.identityHashCode(this)))
656                 .append(": ")
657                 .append(usage(freeBytes))
658                 .append("%, ")
659                 .append(chunkSize - freeBytes)
660                 .append('/')
661                 .append(chunkSize)
662                 .append(')')
663                 .toString();
664     }
665 
666     void destroy() {
667         baseDrop.drop(base); // Decrement reference count from the chunk (allocated buffers may keep the base alive)
668     }
669 
670     static int runOffset(long handle) {
671         return (int) (handle >> RUN_OFFSET_SHIFT);
672     }
673 
674     static int runSize(int pageShifts, long handle) {
675         return runPages(handle) << pageShifts;
676     }
677 
678     static int runPages(long handle) {
679         return (int) (handle >> SIZE_SHIFT & 0x7fff);
680     }
681 
682     static boolean isUsed(long handle) {
683         return (handle >> IS_USED_SHIFT & 1) == 1L;
684     }
685 
686     static boolean isSubpage(long handle) {
687         return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
688     }
689 
690     static int bitmapIdx(long handle) {
691         return (int) handle;
692     }
693 }