View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.internal.LongCounter;
19  import io.netty.util.internal.PlatformDependent;
20  
21  import java.nio.ByteBuffer;
22  import java.util.ArrayDeque;
23  import java.util.Deque;
24  import java.util.PriorityQueue;
25  import java.util.concurrent.locks.ReentrantLock;
26  
27  /**
28   * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
29   *
30   * Notation: The following terms are important to understand the code
31   * > page  - a page is the smallest unit of memory chunk that can be allocated
32   * > run   - a run is a collection of pages
33   * > chunk - a chunk is a collection of runs
34   * > in this code chunkSize = maxPages * pageSize
35   *
36   * To begin we allocate a byte array of size = chunkSize
37   * Whenever a ByteBuf of given size needs to be created we search for the first position
38   * in the byte array that has enough empty space to accommodate the requested size and
39   * return a (long) handle that encodes this offset information, (this memory segment is then
40   * marked as reserved so it is always used by exactly one ByteBuf and no more)
41   *
 * For simplicity all sizes are normalized according to the {@link SizeClasses#size2SizeIdx(int)} method.
43   * This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
44   * equals the next nearest size in {@link SizeClasses}.
45   *
46   *
47   *  A chunk has the following layout:
48   *
49   *     /-----------------\
50   *     | run             |
51   *     |                 |
52   *     |                 |
53   *     |-----------------|
54   *     | run             |
55   *     |                 |
56   *     |-----------------|
 *     | unallocated     |
58   *     | (freed)         |
59   *     |                 |
60   *     |-----------------|
61   *     | subpage         |
62   *     |-----------------|
63   *     | unallocated     |
64   *     | (freed)         |
65   *     | ...             |
66   *     | ...             |
67   *     | ...             |
68   *     |                 |
69   *     |                 |
70   *     |                 |
71   *     \-----------------/
72   *
73   *
74   * handle:
75   * -------
76   * a handle is a long number, the bit layout of a run looks like:
77   *
78   * oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
79   *
80   * o: runOffset (page offset in the chunk), 15bit
81   * s: size (number of pages) of this run, 15bit
82   * u: isUsed?, 1bit
83   * e: isSubpage?, 1bit
84   * b: bitmapIdx of subpage, zero if it's not subpage, 32bit
85   *
86   * runsAvailMap:
87   * ------
 * a map which tracks the runs that are currently available (not in use).
89   * For each run, the first runOffset and last runOffset are stored in runsAvailMap.
90   * key: runOffset
91   * value: handle
92   *
93   * runsAvail:
94   * ----------
95   * an array of {@link PriorityQueue}.
96   * Each queue manages same size of runs.
97   * Runs are sorted by offset, so that we always allocate runs with smaller offset.
98   *
99   *
100  * Algorithm:
101  * ----------
102  *
103  *   As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
104  *
105  * Initialization -
106  *  In the beginning we store the initial run which is the whole chunk.
107  *  The initial run:
108  *  runOffset = 0
 *  size = chunkSize / pageSize (the size of a run is expressed in pages)
110  *  isUsed = no
111  *  isSubpage = no
112  *  bitmapIdx = 0
113  *
114  *
115  * Algorithm: [allocateRun(size)]
116  * ----------
 * 1) find the first available run in runsAvail that is large enough for the requested size
 * 2) if the run has more pages than requested, split it and save the trailing run
 *    for later use
120  *
121  * Algorithm: [allocateSubpage(size)]
122  * ----------
123  * 1) find a not full subpage according to size.
124  *    if it already exists just return, otherwise allocate a new PoolSubpage and call init()
125  *    note that this subpage object is added to subpagesPool in the PoolArena when we init() it
126  * 2) call subpage.allocate()
127  *
128  * Algorithm: [free(handle, length, nioBuffer)]
129  * ----------
130  * 1) if it is a subpage, return the slab back into this subpage
 * 2) if the subpage is no longer in use, or the handle is a run, start freeing this run
132  * 3) merge continuous avail runs
133  * 4) save the merged run
134  *
135  */
136 final class PoolChunk<T> implements PoolChunkMetric {
    // Bit widths of the fields packed into a 64-bit handle. From the least significant bit upwards the layout is:
    // bitmapIdx (32 bits) | isSubpage (1 bit) | isUsed (1 bit) | size in pages (15 bits) | runOffset (remaining bits).
    private static final int SIZE_BIT_LENGTH = 15;
    private static final int INUSED_BIT_LENGTH = 1;
    private static final int SUBPAGE_BIT_LENGTH = 1;
    private static final int BITMAP_IDX_BIT_LENGTH = 32;

    // Bit position at which each field starts inside a handle. Every shift is the previous shift plus the width
    // of the field that sits directly below it, so the resulting values are 32, 33, 34 and 49.
    static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
    static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
    static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
    static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
146 
    /** The arena that owns this chunk; it supplies the size classes and the small subpage pools used below. */
    final PoolArena<T> arena;
    // NOTE(review): base is only stored by this class and never read here - presumably the object that backs
    // or owns 'memory'; confirm against PoolArena.
    final Object base;
    /** The memory handed to the constructor; buffers are initialized with offsets into it. */
    final T memory;
    /** {@code true} only for chunks created through the "not pooled" constructor. */
    final boolean unpooled;

    /**
     * Stores the first page and last page of each available run (key: page offset, value: run handle).
     * {@code null} for unpooled chunks.
     */
    private final LongLongHashMap runsAvailMap;

    /**
     * Manages all available runs, one queue per page-size index. {@code null} for unpooled chunks.
     */
    private final IntPriorityQueue[] runsAvail;

    /** Held while the run bookkeeping or {@link #freeBytes} is read or modified. {@code null} for unpooled chunks. */
    private final ReentrantLock runsAvailLock;

    /**
     * Manages all subpages in this chunk, indexed by the run offset of the run that backs the subpage.
     * {@code null} for unpooled chunks.
     */
    private final PoolSubpage<T>[] subpages;

    /**
     * Accounting of pinned memory – memory that is currently in use by ByteBuf instances.
     */
    private final LongCounter pinnedBytes = PlatformDependent.newLongCounter();

    /** Size of one page in bytes ({@code 0} for unpooled chunks). */
    final int pageSize;
    /** Shift that converts between bytes and pages: {@code bytes >> pageShifts} yields pages. */
    final int pageShifts;
    /** Total size of this chunk in bytes. */
    final int chunkSize;
    /** Number of run queues created for a pooled chunk ({@code 0} for unpooled chunks). */
    final int maxPageIdx;

    // Use as cache for ByteBuffer created from the memory. These are just duplicates and so are only a container
    // around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so
    // may produce extra GC, which can be greatly reduced by caching the duplicates.
    //
    // This may be null if the PoolChunk is unpooled as pooling the ByteBuffer instances does not make any sense here.
    private final Deque<ByteBuffer> cachedNioBuffers;

    /** Bytes of this chunk not currently handed out as runs; starts at {@code chunkSize} for pooled chunks. */
    int freeBytes;

    // Links maintained by code outside this class (nothing in this class assigns them).
    PoolChunkList<T> parent;
    PoolChunk<T> prev;
    PoolChunk<T> next;
191 
192     // TODO: Test if adding padding helps under contention
193     //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
194 
195     @SuppressWarnings("unchecked")
196     PoolChunk(PoolArena<T> arena, Object base, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
197         unpooled = false;
198         this.arena = arena;
199         this.base = base;
200         this.memory = memory;
201         this.pageSize = pageSize;
202         this.pageShifts = pageShifts;
203         this.chunkSize = chunkSize;
204         this.maxPageIdx = maxPageIdx;
205         freeBytes = chunkSize;
206 
207         runsAvail = newRunsAvailqueueArray(maxPageIdx);
208         runsAvailLock = new ReentrantLock();
209         runsAvailMap = new LongLongHashMap(-1);
210         subpages = new PoolSubpage[chunkSize >> pageShifts];
211 
212         //insert initial run, offset = 0, pages = chunkSize / pageSize
213         int pages = chunkSize >> pageShifts;
214         long initHandle = (long) pages << SIZE_SHIFT;
215         insertAvailRun(0, pages, initHandle);
216 
217         cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
218     }
219 
220     /** Creates a special chunk that is not pooled. */
221     PoolChunk(PoolArena<T> arena, Object base, T memory, int size) {
222         unpooled = true;
223         this.arena = arena;
224         this.base = base;
225         this.memory = memory;
226         pageSize = 0;
227         pageShifts = 0;
228         maxPageIdx = 0;
229         runsAvailMap = null;
230         runsAvail = null;
231         runsAvailLock = null;
232         subpages = null;
233         chunkSize = size;
234         cachedNioBuffers = null;
235     }
236 
237     private static IntPriorityQueue[] newRunsAvailqueueArray(int size) {
238         IntPriorityQueue[] queueArray = new IntPriorityQueue[size];
239         for (int i = 0; i < queueArray.length; i++) {
240             queueArray[i] = new IntPriorityQueue();
241         }
242         return queueArray;
243     }
244 
245     private void insertAvailRun(int runOffset, int pages, long handle) {
246         int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(pages);
247         IntPriorityQueue queue = runsAvail[pageIdxFloor];
248         assert isRun(handle);
249         queue.offer((int) (handle >> BITMAP_IDX_BIT_LENGTH));
250 
251         //insert first page of run
252         insertAvailRun0(runOffset, handle);
253         if (pages > 1) {
254             //insert last page of run
255             insertAvailRun0(lastPage(runOffset, pages), handle);
256         }
257     }
258 
259     private void insertAvailRun0(int runOffset, long handle) {
260         long pre = runsAvailMap.put(runOffset, handle);
261         assert pre == -1;
262     }
263 
264     private void removeAvailRun(long handle) {
265         int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(runPages(handle));
266         runsAvail[pageIdxFloor].remove((int) (handle >> BITMAP_IDX_BIT_LENGTH));
267         removeAvailRun0(handle);
268     }
269 
270     private void removeAvailRun0(long handle) {
271         int runOffset = runOffset(handle);
272         int pages = runPages(handle);
273         //remove first page of run
274         runsAvailMap.remove(runOffset);
275         if (pages > 1) {
276             //remove last page of run
277             runsAvailMap.remove(lastPage(runOffset, pages));
278         }
279     }
280 
281     private static int lastPage(int runOffset, int pages) {
282         return runOffset + pages - 1;
283     }
284 
285     private long getAvailRunByOffset(int runOffset) {
286         return runsAvailMap.get(runOffset);
287     }
288 
289     @Override
290     public int usage() {
291         final int freeBytes;
292         if (this.unpooled) {
293             freeBytes = this.freeBytes;
294         } else {
295             runsAvailLock.lock();
296             try {
297                 freeBytes = this.freeBytes;
298             } finally {
299                 runsAvailLock.unlock();
300             }
301         }
302         return usage(freeBytes);
303     }
304 
305     private int usage(int freeBytes) {
306         if (freeBytes == 0) {
307             return 100;
308         }
309 
310         int freePercentage = (int) (freeBytes * 100L / chunkSize);
311         if (freePercentage == 0) {
312             return 99;
313         }
314         return 100 - freePercentage;
315     }
316 
    /**
     * Tries to allocate memory from this chunk and initialize {@code buf} with it.
     *
     * @param buf         the buffer to initialize on success
     * @param reqCapacity the capacity passed through to the buffer initialization
     * @param sizeIdx     size index of the request; values up to {@code arena.sizeClass.smallMaxSizeIdx} take
     *                    the subpage path, larger values allocate a run
     * @param cache       the thread cache passed through to the buffer initialization
     * @return {@code true} if {@code buf} was initialized, {@code false} if no handle could be allocated
     */
    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
        final long handle;
        if (sizeIdx <= arena.sizeClass.smallMaxSizeIdx) {
            final PoolSubpage<T> nextSub;
            // small
            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is needed as we may add it back and so alter the linked-list structure.
            PoolSubpage<T> head = arena.smallSubpagePools[sizeIdx];
            head.lock();
            try {
                nextSub = head.next;
                // head.next != head means the pool already holds a subpage for this size: allocate from it.
                if (nextSub != head) {
                    assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeClass.sizeIdx2size(sizeIdx) :
                            "doNotDestroy=" + nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" +
                                    sizeIdx;
                    handle = nextSub.allocate();
                    assert handle >= 0;
                    assert isSubpage(handle);
                    // The buffer is initialized through the subpage's own chunk (nextSub.chunk), and without a
                    // cached nio buffer.
                    nextSub.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
                    return true;
                }
                // The pool is empty: carve a new subpage out of this chunk.
                handle = allocateSubpage(sizeIdx, head);
                if (handle < 0) {
                    return false;
                }
                assert isSubpage(handle);
            } finally {
                head.unlock();
            }
        } else {
            // normal
            // runSize must be multiple of pageSize
            int runSize = arena.sizeClass.sizeIdx2size(sizeIdx);
            handle = allocateRun(runSize);
            if (handle < 0) {
                return false;
            }
            assert !isSubpage(handle);
        }

        // Reuse a cached ByteBuffer duplicate if one is available (the cache is null for unpooled chunks).
        ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
        initBuf(buf, nioBuffer, handle, reqCapacity, cache);
        return true;
    }
361 
362     private long allocateRun(int runSize) {
363         int pages = runSize >> pageShifts;
364         int pageIdx = arena.sizeClass.pages2pageIdx(pages);
365 
366         runsAvailLock.lock();
367         try {
368             //find first queue which has at least one big enough run
369             int queueIdx = runFirstBestFit(pageIdx);
370             if (queueIdx == -1) {
371                 return -1;
372             }
373 
374             //get run with min offset in this queue
375             IntPriorityQueue queue = runsAvail[queueIdx];
376             long handle = queue.poll();
377             assert handle != IntPriorityQueue.NO_VALUE;
378             handle <<= BITMAP_IDX_BIT_LENGTH;
379             assert !isUsed(handle) : "invalid handle: " + handle;
380 
381             removeAvailRun0(handle);
382 
383             handle = splitLargeRun(handle, pages);
384 
385             int pinnedSize = runSize(pageShifts, handle);
386             freeBytes -= pinnedSize;
387             return handle;
388         } finally {
389             runsAvailLock.unlock();
390         }
391     }
392 
393     private int calculateRunSize(int sizeIdx) {
394         int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
395         int runSize = 0;
396         int nElements;
397 
398         final int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
399 
400         //find lowest common multiple of pageSize and elemSize
401         do {
402             runSize += pageSize;
403             nElements = runSize / elemSize;
404         } while (nElements < maxElements && runSize != nElements * elemSize);
405 
406         while (nElements > maxElements) {
407             runSize -= pageSize;
408             nElements = runSize / elemSize;
409         }
410 
411         assert nElements > 0;
412         assert runSize <= chunkSize;
413         assert runSize >= elemSize;
414 
415         return runSize;
416     }
417 
418     private int runFirstBestFit(int pageIdx) {
419         if (freeBytes == chunkSize) {
420             return arena.sizeClass.nPSizes - 1;
421         }
422         for (int i = pageIdx; i < arena.sizeClass.nPSizes; i++) {
423             IntPriorityQueue queue = runsAvail[i];
424             if (queue != null && !queue.isEmpty()) {
425                 return i;
426             }
427         }
428         return -1;
429     }
430 
431     private long splitLargeRun(long handle, int needPages) {
432         assert needPages > 0;
433 
434         int totalPages = runPages(handle);
435         assert needPages <= totalPages;
436 
437         int remPages = totalPages - needPages;
438 
439         if (remPages > 0) {
440             int runOffset = runOffset(handle);
441 
442             // keep track of trailing unused pages for later use
443             int availOffset = runOffset + needPages;
444             long availRun = toRunHandle(availOffset, remPages, 0);
445             insertAvailRun(availOffset, remPages, availRun);
446 
447             // not avail
448             return toRunHandle(runOffset, needPages, 1);
449         }
450 
451         //mark it as used
452         handle |= 1L << IS_USED_SHIFT;
453         return handle;
454     }
455 
456     /**
457      * Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
458      * subpage pool in the PoolArena that owns this PoolChunk.
459      *
460      * @param sizeIdx sizeIdx of normalized size
461      * @param head head of subpages
462      *
463      * @return index in memoryMap
464      */
465     private long allocateSubpage(int sizeIdx, PoolSubpage<T> head) {
466         //allocate a new run
467         int runSize = calculateRunSize(sizeIdx);
468         //runSize must be multiples of pageSize
469         long runHandle = allocateRun(runSize);
470         if (runHandle < 0) {
471             return -1;
472         }
473 
474         int runOffset = runOffset(runHandle);
475         assert subpages[runOffset] == null;
476         int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
477 
478         PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
479                 runSize(pageShifts, runHandle), elemSize);
480 
481         subpages[runOffset] = subpage;
482         return subpage.allocate();
483     }
484 
    /**
     * Free a subpage or a run of pages. When a subpage element is freed, the subpage may stay in use, in which
     * case only the element is returned to it. Otherwise the run backing the subpage (or the run itself, for
     * a run handle) is merged with adjacent available runs and registered as available again.
     *
     * @param handle handle to free
     * @param normCapacity not read by this method
     *                     (NOTE(review): presumably kept for callers / signature compatibility - confirm)
     * @param nioBuffer a ByteBuffer to keep for reuse; it is cached only if it is non-null, this chunk has a
     *                  cache and the cache holds fewer than
     *                  {@code PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK} entries
     */
    void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
        if (isSubpage(handle)) {
            // Subpages are indexed by the run offset of the run that backs them.
            int sIdx = runOffset(handle);
            PoolSubpage<T> subpage = subpages[sIdx];
            assert subpage != null;
            PoolSubpage<T> head = subpage.chunk.arena.smallSubpagePools[subpage.headIndex];
            // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
            // This is needed as we may add it back and so alter the linked-list structure.
            head.lock();
            try {
                assert subpage.doNotDestroy;
                if (subpage.free(head, bitmapIdx(handle))) {
                    //the subpage is still used, do not free it
                    return;
                }
                assert !subpage.doNotDestroy;
                // Null out slot in the array as it was freed and we should not use it anymore.
                subpages[sIdx] = null;
            } finally {
                head.unlock();
            }
        }

        // Size of the run being returned, taken before the handle is merged with its neighbours.
        int runSize = runSize(pageShifts, handle);
        //start free run
        runsAvailLock.lock();
        try {
            // collapse continuous runs, successfully collapsed runs
            // will be removed from runsAvail and runsAvailMap
            long finalRun = collapseRuns(handle);

            //set run as not used
            finalRun &= ~(1L << IS_USED_SHIFT);
            //if it is a subpage, set it to run
            finalRun &= ~(1L << IS_SUBPAGE_SHIFT);

            insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
            // Only the bytes of the freed run are added; merged neighbours were already counted as free.
            freeBytes += runSize;
        } finally {
            runsAvailLock.unlock();
        }

        if (nioBuffer != null && cachedNioBuffers != null &&
            cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
            cachedNioBuffers.offer(nioBuffer);
        }
    }
539 
540     private long collapseRuns(long handle) {
541         return collapseNext(collapsePast(handle));
542     }
543 
544     private long collapsePast(long handle) {
545         for (;;) {
546             int runOffset = runOffset(handle);
547             int runPages = runPages(handle);
548 
549             long pastRun = getAvailRunByOffset(runOffset - 1);
550             if (pastRun == -1) {
551                 return handle;
552             }
553 
554             int pastOffset = runOffset(pastRun);
555             int pastPages = runPages(pastRun);
556 
557             //is continuous
558             if (pastRun != handle && pastOffset + pastPages == runOffset) {
559                 //remove past run
560                 removeAvailRun(pastRun);
561                 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
562             } else {
563                 return handle;
564             }
565         }
566     }
567 
568     private long collapseNext(long handle) {
569         for (;;) {
570             int runOffset = runOffset(handle);
571             int runPages = runPages(handle);
572 
573             long nextRun = getAvailRunByOffset(runOffset + runPages);
574             if (nextRun == -1) {
575                 return handle;
576             }
577 
578             int nextOffset = runOffset(nextRun);
579             int nextPages = runPages(nextRun);
580 
581             //is continuous
582             if (nextRun != handle && runOffset + runPages == nextOffset) {
583                 //remove next run
584                 removeAvailRun(nextRun);
585                 handle = toRunHandle(runOffset, runPages + nextPages, 0);
586             } else {
587                 return handle;
588             }
589         }
590     }
591 
592     private static long toRunHandle(int runOffset, int runPages, int inUsed) {
593         return (long) runOffset << RUN_OFFSET_SHIFT
594                | (long) runPages << SIZE_SHIFT
595                | (long) inUsed << IS_USED_SHIFT;
596     }
597 
598     void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
599                  PoolThreadCache threadCache) {
600         if (isSubpage(handle)) {
601             initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
602         } else {
603             int maxLength = runSize(pageShifts, handle);
604             buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
605                     reqCapacity, maxLength, arena.parent.threadCache());
606         }
607     }
608 
609     void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
610                             PoolThreadCache threadCache) {
611         int runOffset = runOffset(handle);
612         int bitmapIdx = bitmapIdx(handle);
613 
614         PoolSubpage<T> s = subpages[runOffset];
615         assert s.isDoNotDestroy();
616         assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;
617 
618         int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
619         buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
620     }
621 
622     void incrementPinnedMemory(int delta) {
623         assert delta > 0;
624         pinnedBytes.add(delta);
625     }
626 
627     void decrementPinnedMemory(int delta) {
628         assert delta > 0;
629         pinnedBytes.add(-delta);
630     }
631 
632     @Override
633     public int chunkSize() {
634         return chunkSize;
635     }
636 
637     @Override
638     public int freeBytes() {
639         if (this.unpooled) {
640             return freeBytes;
641         }
642         runsAvailLock.lock();
643         try {
644             return freeBytes;
645         } finally {
646             runsAvailLock.unlock();
647         }
648     }
649 
650     public int pinnedBytes() {
651         return (int) pinnedBytes.value();
652     }
653 
654     @Override
655     public String toString() {
656         final int freeBytes;
657         if (this.unpooled) {
658             freeBytes = this.freeBytes;
659         } else {
660             runsAvailLock.lock();
661             try {
662                 freeBytes = this.freeBytes;
663             } finally {
664                 runsAvailLock.unlock();
665             }
666         }
667 
668         return new StringBuilder()
669                 .append("Chunk(")
670                 .append(Integer.toHexString(System.identityHashCode(this)))
671                 .append(": ")
672                 .append(usage(freeBytes))
673                 .append("%, ")
674                 .append(chunkSize - freeBytes)
675                 .append('/')
676                 .append(chunkSize)
677                 .append(')')
678                 .toString();
679     }
680 
681     void destroy() {
682         arena.destroyChunk(this);
683     }
684 
685     static int runOffset(long handle) {
686         return (int) (handle >> RUN_OFFSET_SHIFT);
687     }
688 
689     static int runSize(int pageShifts, long handle) {
690         return runPages(handle) << pageShifts;
691     }
692 
693     static int runPages(long handle) {
694         return (int) (handle >> SIZE_SHIFT & 0x7fff);
695     }
696 
697     static boolean isUsed(long handle) {
698         return (handle >> IS_USED_SHIFT & 1) == 1L;
699     }
700 
701     static boolean isRun(long handle) {
702         return !isSubpage(handle);
703     }
704 
705     static boolean isSubpage(long handle) {
706         return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
707     }
708 
709     static int bitmapIdx(long handle) {
710         return (int) handle;
711     }
712 }