View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.internal.LongCounter;
19  import io.netty.util.internal.PlatformDependent;
20  
21  import java.nio.ByteBuffer;
22  import java.util.ArrayDeque;
23  import java.util.Deque;
24  import java.util.PriorityQueue;
25  
26  /**
27   * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
28   *
29   * Notation: The following terms are important to understand the code
30   * > page  - a page is the smallest unit of memory chunk that can be allocated
31   * > run   - a run is a collection of pages
32   * > chunk - a chunk is a collection of runs
33   * > in this code chunkSize = maxPages * pageSize
34   *
35   * To begin we allocate a byte array of size = chunkSize
36   * Whenever a ByteBuf of given size needs to be created we search for the first position
37   * in the byte array that has enough empty space to accommodate the requested size and
38   * return a (long) handle that encodes this offset information, (this memory segment is then
39   * marked as reserved so it is always used by exactly one ByteBuf and no more)
40   *
41   * For simplicity all sizes are normalized according to {@link PoolArena#size2SizeIdx(int)} method.
42   * This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
43   * equals the next nearest size in {@link SizeClasses}.
44   *
45   *
46   *  A chunk has the following layout:
47   *
48   *     /-----------------\
49   *     | run             |
50   *     |                 |
51   *     |                 |
52   *     |-----------------|
53   *     | run             |
54   *     |                 |
55   *     |-----------------|
56   *     | unallocated     |
57   *     | (freed)         |
58   *     |                 |
59   *     |-----------------|
60   *     | subpage         |
61   *     |-----------------|
62   *     | unallocated     |
63   *     | (freed)         |
64   *     | ...             |
65   *     | ...             |
66   *     | ...             |
67   *     |                 |
68   *     |                 |
69   *     |                 |
70   *     \-----------------/
71   *
72   *
73   * handle:
74   * -------
75   * a handle is a long number, the bit layout of a run looks like:
76   *
77   * oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
78   *
79   * o: runOffset (page offset in the chunk), 15bit
80   * s: size (number of pages) of this run, 15bit
81   * u: isUsed?, 1bit
82   * e: isSubpage?, 1bit
83   * b: bitmapIdx of subpage, zero if it's not subpage, 32bit
84   *
85   * runsAvailMap:
86   * ------
87   * a map which manages all runs (both in use and free).
88   * For each run, the first runOffset and last runOffset are stored in runsAvailMap.
89   * key: runOffset
90   * value: handle
91   *
92   * runsAvail:
93   * ----------
94   * an array of {@link PriorityQueue}.
95   * Each queue manages same size of runs.
96   * Runs are sorted by offset, so that we always allocate runs with smaller offset.
97   *
98   *
99   * Algorithm:
100  * ----------
101  *
102  *   As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
103  *
104  * Initialization -
105  *  In the beginning we store the initial run which is the whole chunk.
106  *  The initial run:
107  *  runOffset = 0
108  *  size = chunkSize
109  *  isUsed = no
110  *  isSubpage = no
111  *  bitmapIdx = 0
112  *
113  *
114  * Algorithm: [allocateRun(size)]
115  * ----------
116  * 1) find the first avail run in runsAvail according to size
117  * 2) if the run has more pages than requested, split it and save the tailing run
118  *    for later use
119  *
120  * Algorithm: [allocateSubpage(size)]
121  * ----------
122  * 1) find a not full subpage according to size.
123  *    if it already exists just return, otherwise allocate a new PoolSubpage and call init()
124  *    note that this subpage object is added to subpagesPool in the PoolArena when we init() it
125  * 2) call subpage.allocate()
126  *
127  * Algorithm: [free(handle, length, nioBuffer)]
128  * ----------
129  * 1) if it is a subpage, return the slab back into this subpage
130  * 2) if the subpage is not used or it is a run, then start free this run
131  * 3) merge continuous avail runs
132  * 4) save the merged run
133  *
134  */
135 final class PoolChunk<T> implements PoolChunkMetric {
    // Bit-field widths of a run handle (see the bit-layout diagram in the class javadoc).
136     private static final int SIZE_BIT_LENGTH = 15;
137     private static final int INUSED_BIT_LENGTH = 1;
138     private static final int SUBPAGE_BIT_LENGTH = 1;
139     private static final int BITMAP_IDX_BIT_LENGTH = 32;
140 
    // Shift distances used to pack/unpack the fields of a run handle.
141     static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
142     static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
143     static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
144     static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
145 
    // Owning arena, backing memory, and whether this chunk bypasses pooling entirely.
146     final PoolArena<T> arena;
147     final Object base;
148     final T memory;
149     final boolean unpooled;
150 
151     /**
152      * store the first page and last page of each avail run
153      */
154     private final LongLongHashMap runsAvailMap;
155 
156     /**
157      * manage all avail runs
158      */
159     private final LongPriorityQueue[] runsAvail;
160 
161     /**
162      * manage all subpages in this chunk
163      */
164     private final PoolSubpage<T>[] subpages;
165 
166     /**
167      * Accounting of pinned memory – memory that is currently in use by ByteBuf instances.
168      */
169     private final LongCounter pinnedBytes = PlatformDependent.newLongCounter();
170 
171     private final int pageSize;
172     private final int pageShifts;
173     private final int chunkSize;
174 
175     // Use as cache for ByteBuffer created from the memory. These are just duplicates and so are only a container
176     // around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so
177     // may produce extra GC, which can be greatly reduced by caching the duplicates.
178     //
179     // This may be null if the PoolChunk is unpooled as pooling the ByteBuffer instances does not make any sense here.
180     private final Deque<ByteBuffer> cachedNioBuffers;
181 
    // Bytes not currently allocated; read under the arena lock (see usage()/freeBytes()).
182     int freeBytes;
183 
    // Linkage used by the owning PoolChunkList to keep chunks in a doubly-linked list.
184     PoolChunkList<T> parent;
185     PoolChunk<T> prev;
186     PoolChunk<T> next;
187 
188     // TODO: Test if adding padding helps under contention
189     //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
190 
    /**
     * Creates a pooled chunk of {@code chunkSize} bytes backed by {@code memory}.
     * Initially the whole chunk is one free run of {@code chunkSize >> pageShifts} pages.
     *
     * @param arena      the owning arena
     * @param base       the object backing {@code memory}
     * @param memory     the chunk's backing memory of {@code chunkSize} bytes
     * @param pageSize   size of one page in bytes
     * @param pageShifts log2 of {@code pageSize}
     * @param chunkSize  total capacity of this chunk in bytes
     * @param maxPageIdx number of page-size classes, i.e. the length of {@code runsAvail}
     */
191     @SuppressWarnings("unchecked")
192     PoolChunk(PoolArena<T> arena, Object base, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
193         unpooled = false;
194         this.arena = arena;
195         this.base = base;
196         this.memory = memory;
197         this.pageSize = pageSize;
198         this.pageShifts = pageShifts;
199         this.chunkSize = chunkSize;
200         freeBytes = chunkSize;
201 
202         runsAvail = newRunsAvailqueueArray(maxPageIdx);
        // -1 is the "no value" sentinel returned by runsAvailMap lookups.
203         runsAvailMap = new LongLongHashMap(-1);
204         subpages = new PoolSubpage[chunkSize >> pageShifts];
205 
206         //insert initial run, offset = 0, pages = chunkSize / pageSize
207         int pages = chunkSize >> pageShifts;
208         long initHandle = (long) pages << SIZE_SHIFT;
209         insertAvailRun(0, pages, initHandle);
210 
211         cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
212     }
213 
214     /** Creates a special chunk that is not pooled. */
215     PoolChunk(PoolArena<T> arena, Object base, T memory, int size) {
216         unpooled = true;
217         this.arena = arena;
218         this.base = base;
219         this.memory = memory;
        // Run/subpage bookkeeping and the ByteBuffer cache are not used for unpooled chunks.
220         pageSize = 0;
221         pageShifts = 0;
222         runsAvailMap = null;
223         runsAvail = null;
224         subpages = null;
225         chunkSize = size;
226         cachedNioBuffers = null;
227     }
228 
229     private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
230         LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
231         for (int i = 0; i < queueArray.length; i++) {
232             queueArray[i] = new LongPriorityQueue();
233         }
234         return queueArray;
235     }
236 
    /**
     * Registers an avail run: offers its handle to the queue of its page-size class and
     * records its first (and, for multi-page runs, last) page offset in {@code runsAvailMap}.
     */
237     private void insertAvailRun(int runOffset, int pages, long handle) {
238         int pageIdxFloor = arena.pages2pageIdxFloor(pages);
239         LongPriorityQueue queue = runsAvail[pageIdxFloor];
240         queue.offer(handle);
241 
242         //insert first page of run
243         insertAvailRun0(runOffset, handle);
244         if (pages > 1) {
245             //insert last page of run
246             insertAvailRun0(lastPage(runOffset, pages), handle);
247         }
248     }
249 
    /**
     * Maps one boundary page offset of an avail run to its handle; asserts the slot was
     * previously empty (-1 is the map's "no value" sentinel).
     */
250     private void insertAvailRun0(int runOffset, long handle) {
251         long pre = runsAvailMap.put(runOffset, handle);
252         assert pre == -1;
253     }
254 
255     private void removeAvailRun(long handle) {
256         int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
257         LongPriorityQueue queue = runsAvail[pageIdxFloor];
258         removeAvailRun(queue, handle);
259     }
260 
261     private void removeAvailRun(LongPriorityQueue queue, long handle) {
262         queue.remove(handle);
263 
264         int runOffset = runOffset(handle);
265         int pages = runPages(handle);
266         //remove first page of run
267         runsAvailMap.remove(runOffset);
268         if (pages > 1) {
269             //remove last page of run
270             runsAvailMap.remove(lastPage(runOffset, pages));
271         }
272     }
273 
274     private static int lastPage(int runOffset, int pages) {
275         return runOffset + pages - 1;
276     }
277 
    /**
     * Returns the handle of the avail run whose first or last page is {@code runOffset},
     * or -1 if no such run is registered.
     */
278     private long getAvailRunByOffset(int runOffset) {
279         return runsAvailMap.get(runOffset);
280     }
281 
282     @Override
283     public int usage() {
284         final int freeBytes;
285         synchronized (arena) {
286             freeBytes = this.freeBytes;
287         }
288         return usage(freeBytes);
289     }
290 
291     private int usage(int freeBytes) {
292         if (freeBytes == 0) {
293             return 100;
294         }
295 
296         int freePercentage = (int) (freeBytes * 100L / chunkSize);
297         if (freePercentage == 0) {
298             return 99;
299         }
300         return 100 - freePercentage;
301     }
302 
    /**
     * Allocates a memory region of the normalized size identified by {@code sizeIdx} and
     * initializes {@code buf} with it: small size classes go through a subpage, larger ones
     * through a whole run of pages.
     *
     * @param buf         the buffer to initialize on success
     * @param reqCapacity the caller-requested (un-normalized) capacity
     * @param sizeIdx     size-class index of the normalized capacity
     * @param cache       the thread cache to associate with the buffer
     * @return {@code true} on success, {@code false} if this chunk has no room
     */
303     boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
304         final long handle;
305         if (sizeIdx <= arena.smallMaxSizeIdx) {
306             // small
307             handle = allocateSubpage(sizeIdx);
308             if (handle < 0) {
309                 return false;
310             }
311             assert isSubpage(handle);
312         } else {
313             // normal
314             // runSize must be multiple of pageSize
315             int runSize = arena.sizeIdx2size(sizeIdx);
316             handle = allocateRun(runSize);
317             if (handle < 0) {
318                 return false;
319             }
320             assert !isSubpage(handle);
321         }
322 
        // Reuse a cached duplicate ByteBuffer when available to avoid extra GC.
323         ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
324         initBuf(buf, nioBuffer, handle, reqCapacity, cache);
325         return true;
326     }
327 
    /**
     * Allocates a run of {@code runSize} bytes (a multiple of the page size).
     *
     * @return the handle of the allocated run, or -1 if no sufficiently large run is available
     */
328     private long allocateRun(int runSize) {
329         int pages = runSize >> pageShifts;
330         int pageIdx = arena.pages2pageIdx(pages);
331 
332         synchronized (runsAvail) {
333             //find first queue which has at least one big enough run
334             int queueIdx = runFirstBestFit(pageIdx);
335             if (queueIdx == -1) {
336                 return -1;
337             }
338 
339             //get run with min offset in this queue
340             LongPriorityQueue queue = runsAvail[queueIdx];
341             long handle = queue.poll();
342 
343             assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;
344 
345             removeAvailRun(queue, handle);
346 
            // NOTE(review): guard for poll() returning a sentinel when assertions are disabled —
            // presumably LongPriorityQueue.NO_VALUE == -1; confirm against LongPriorityQueue.
347             if (handle != -1) {
348                 handle = splitLargeRun(handle, pages);
349             }
350 
            // Account the allocated run's bytes as no longer free.
351             int pinnedSize = runSize(pageShifts, handle);
352             freeBytes -= pinnedSize;
353             return handle;
354         }
355     }
356 
    /**
     * Computes the run size (a multiple of pageSize) used to back a subpage of the given size
     * class: conceptually the least common multiple of pageSize and elemSize, capped so the
     * run holds at most {@code maxElements} elements.
     */
357     private int calculateRunSize(int sizeIdx) {
        // Note operator precedence: the shift distance is (pageShifts - LOG2_QUANTUM).
358         int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
359         int runSize = 0;
360         int nElements;
361 
362         final int elemSize = arena.sizeIdx2size(sizeIdx);
363 
364         //find lowest common multiple of pageSize and elemSize
365         do {
366             runSize += pageSize;
367             nElements = runSize / elemSize;
368         } while (nElements < maxElements && runSize != nElements * elemSize);
369 
        // Shrink back if the element cap was overshot.
370         while (nElements > maxElements) {
371             runSize -= pageSize;
372             nElements = runSize / elemSize;
373         }
374 
375         assert nElements > 0;
376         assert runSize <= chunkSize;
377         assert runSize >= elemSize;
378 
379         return runSize;
380     }
381 
382     private int runFirstBestFit(int pageIdx) {
383         if (freeBytes == chunkSize) {
384             return arena.nPSizes - 1;
385         }
386         for (int i = pageIdx; i < arena.nPSizes; i++) {
387             LongPriorityQueue queue = runsAvail[i];
388             if (queue != null && !queue.isEmpty()) {
389                 return i;
390             }
391         }
392         return -1;
393     }
394 
    /**
     * Carves {@code needPages} pages off the front of the given avail run. Any remaining tail
     * pages are re-registered as a new avail run.
     *
     * @return the handle of the allocated run with its isUsed bit set
     */
395     private long splitLargeRun(long handle, int needPages) {
396         assert needPages > 0;
397 
398         int totalPages = runPages(handle);
399         assert needPages <= totalPages;
400 
401         int remPages = totalPages - needPages;
402 
403         if (remPages > 0) {
404             int runOffset = runOffset(handle);
405 
406             // keep track of trailing unused pages for later use
407             int availOffset = runOffset + needPages;
408             long availRun = toRunHandle(availOffset, remPages, 0);
409             insertAvailRun(availOffset, remPages, availRun);
410 
411             // not avail
412             return toRunHandle(runOffset, needPages, 1);
413         }
414 
415         //mark it as used
416         handle |= 1L << IS_USED_SHIFT;
417         return handle;
418     }
419 
420     /**
421      * Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
422      * subpage pool in the PoolArena that owns this PoolChunk
423      *
424      * @param sizeIdx sizeIdx of normalized size
425      *
426      * @return the subpage allocation handle, or -1 if no run could be allocated to back the subpage
427      */
428     private long allocateSubpage(int sizeIdx) {
429         // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
430         // This is needed as we may add it back and so alter the linked-list structure.
431         PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
432         synchronized (head) {
433             //allocate a new run
434             int runSize = calculateRunSize(sizeIdx);
435             //runSize must be multiples of pageSize
436             long runHandle = allocateRun(runSize);
437             if (runHandle < 0) {
438                 return -1;
439             }
440 
441             int runOffset = runOffset(runHandle);
442             assert subpages[runOffset] == null;
443             int elemSize = arena.sizeIdx2size(sizeIdx);
444 
445             PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
446                                runSize(pageShifts, runHandle), elemSize);
447 
            // Index by the first page of the backing run so free(...) can find the subpage again.
448             subpages[runOffset] = subpage;
449             return subpage.allocate();
450         }
451     }
452 
453     /**
454      * Free a subpage or a run of pages. When a subpage is freed from PoolSubpage, it might be added back to subpage pool
455      * of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize,
456      * we can completely free the owning Page so it is available for subsequent allocations
457      *
458      * @param handle handle to free
     * @param normCapacity the normalized capacity the handle was allocated with
     * @param nioBuffer a duplicate ByteBuffer to return to the cache, may be null
459      */
460     void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
461         int runSize = runSize(pageShifts, handle);
462         if (isSubpage(handle)) {
463             int sizeIdx = arena.size2SizeIdx(normCapacity);
464             PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
465 
466             int sIdx = runOffset(handle);
467             PoolSubpage<T> subpage = subpages[sIdx];
468             assert subpage != null && subpage.doNotDestroy;
469 
470             // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
471             // This is needed as we may add it back and so alter the linked-list structure.
472             synchronized (head) {
473                 if (subpage.free(head, bitmapIdx(handle))) {
474                     //the subpage is still used, do not free it
475                     return;
476                 }
477                 assert !subpage.doNotDestroy;
478                 // Null out slot in the array as it was freed and we should not use it anymore.
479                 subpages[sIdx] = null;
480             }
481         }
482 
483         //start free run
484         synchronized (runsAvail) {
485             // collapse continuous runs, successfully collapsed runs
486             // will be removed from runsAvail and runsAvailMap
487             long finalRun = collapseRuns(handle);
488 
489             //set run as not used
490             finalRun &= ~(1L << IS_USED_SHIFT);
491             //if it is a subpage, set it to run
492             finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
493 
494             insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
495             freeBytes += runSize;
496         }
497 
        // Return the duplicate ByteBuffer to the cache if there is still room for it.
498         if (nioBuffer != null && cachedNioBuffers != null &&
499             cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
500             cachedNioBuffers.offer(nioBuffer);
501         }
502     }
503 
    /**
     * Merges the freed run with adjacent avail runs on both sides and returns the merged handle.
     */
504     private long collapseRuns(long handle) {
505         return collapseNext(collapsePast(handle));
506     }
507 
    /**
     * Repeatedly merges the run with any distinct avail run that ends immediately before it,
     * returning the (possibly enlarged) run handle. Absorbed runs are removed from the
     * bookkeeping structures.
     */
508     private long collapsePast(long handle) {
509         for (;;) {
510             int runOffset = runOffset(handle);
511             int runPages = runPages(handle);
512 
            // Look up the run (if any) whose last page is just before this run's first page.
513             long pastRun = getAvailRunByOffset(runOffset - 1);
514             if (pastRun == -1) {
515                 return handle;
516             }
517 
518             int pastOffset = runOffset(pastRun);
519             int pastPages = runPages(pastRun);
520 
521             //is continuous
522             if (pastRun != handle && pastOffset + pastPages == runOffset) {
523                 //remove past run
524                 removeAvailRun(pastRun);
525                 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
526             } else {
527                 return handle;
528             }
529         }
530     }
531 
532     private long collapseNext(long handle) {
533         for (;;) {
534             int runOffset = runOffset(handle);
535             int runPages = runPages(handle);
536 
537             long nextRun = getAvailRunByOffset(runOffset + runPages);
538             if (nextRun == -1) {
539                 return handle;
540             }
541 
542             int nextOffset = runOffset(nextRun);
543             int nextPages = runPages(nextRun);
544 
545             //is continuous
546             if (nextRun != handle && runOffset + runPages == nextOffset) {
547                 //remove next run
548                 removeAvailRun(nextRun);
549                 handle = toRunHandle(runOffset, runPages + nextPages, 0);
550             } else {
551                 return handle;
552             }
553         }
554     }
555 
556     private static long toRunHandle(int runOffset, int runPages, int inUsed) {
557         return (long) runOffset << RUN_OFFSET_SHIFT
558                | (long) runPages << SIZE_SHIFT
559                | (long) inUsed << IS_USED_SHIFT;
560     }
561 
    /**
     * Initializes {@code buf} to point at the memory identified by {@code handle}, dispatching
     * to the subpage variant when the handle refers to a subpage allocation.
     */
562     void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
563                  PoolThreadCache threadCache) {
564         if (isSubpage(handle)) {
565             initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
566         } else {
567             int maxLength = runSize(pageShifts, handle);
            // NOTE(review): this branch ignores the threadCache parameter and uses
            // arena.parent.threadCache() instead — confirm this is intentional.
568             buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
569                     reqCapacity, maxLength, arena.parent.threadCache());
570         }
571     }
572 
    /**
     * Initializes {@code buf} over one element of the subpage identified by {@code handle}:
     * byte offset = (run start in pages << pageShifts) + bitmapIdx * elemSize.
     */
573     void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
574                             PoolThreadCache threadCache) {
575         int runOffset = runOffset(handle);
576         int bitmapIdx = bitmapIdx(handle);
577 
578         PoolSubpage<T> s = subpages[runOffset];
579         assert s.doNotDestroy;
580         assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;
581 
582         int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
583         buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
584     }
585 
    /** Records {@code delta} additional bytes as pinned (in use by a ByteBuf); {@code delta} must be positive. */
586     void incrementPinnedMemory(int delta) {
587         assert delta > 0;
588         pinnedBytes.add(delta);
589     }
590 
    /** Releases {@code delta} previously pinned bytes; {@code delta} must be positive. */
591     void decrementPinnedMemory(int delta) {
592         assert delta > 0;
593         pinnedBytes.add(-delta);
594     }
595 
    /** Total capacity of this chunk in bytes. */
596     @Override
597     public int chunkSize() {
598         return chunkSize;
599     }
600 
    /** Number of free bytes, read under the arena lock for a consistent snapshot. */
601     @Override
602     public int freeBytes() {
603         synchronized (arena) {
604             return freeBytes;
605         }
606     }
607 
    /**
     * Bytes currently pinned by ByteBuf instances. The narrowing cast assumes the pinned total
     * never exceeds Integer.MAX_VALUE — NOTE(review): not enforced here; confirm chunkSize bound.
     */
608     public int pinnedBytes() {
609         return (int) pinnedBytes.value();
610     }
611 
612     @Override
613     public String toString() {
614         final int freeBytes;
615         synchronized (arena) {
616             freeBytes = this.freeBytes;
617         }
618 
619         return new StringBuilder()
620                 .append("Chunk(")
621                 .append(Integer.toHexString(System.identityHashCode(this)))
622                 .append(": ")
623                 .append(usage(freeBytes))
624                 .append("%, ")
625                 .append(chunkSize - freeBytes)
626                 .append('/')
627                 .append(chunkSize)
628                 .append(')')
629                 .toString();
630     }
631 
    /** Releases this chunk's backing memory by delegating to the owning arena. */
632     void destroy() {
633         arena.destroyChunk(this);
634     }
635 
    /** Extracts the runOffset (index of the run's first page) from a handle. */
636     static int runOffset(long handle) {
637         return (int) (handle >> RUN_OFFSET_SHIFT);
638     }
639 
    /** Size in bytes of the run described by {@code handle}. */
640     static int runSize(int pageShifts, long handle) {
641         return runPages(handle) << pageShifts;
642     }
643 
    /** Extracts the run's page count from a handle (15-bit field, hence the 0x7fff mask). */
644     static int runPages(long handle) {
645         return (int) (handle >> SIZE_SHIFT & 0x7fff);
646     }
647 
648     static boolean isUsed(long handle) {
649         return (handle >> IS_USED_SHIFT & 1) == 1L;
650     }
651 
    /** True when the handle describes a whole-page run rather than a subpage allocation. */
652     static boolean isRun(long handle) {
653         return !isSubpage(handle);
654     }
655 
656     static boolean isSubpage(long handle) {
657         return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
658     }
659 
    /** Extracts the subpage bitmapIdx (low 32 bits of the handle); zero for non-subpage handles. */
660     static int bitmapIdx(long handle) {
661         return (int) handle;
662     }
663 }