1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.CharsetUtil;
20  import io.netty.util.IllegalReferenceCountException;
21  import io.netty.util.IntSupplier;
22  import io.netty.util.NettyRuntime;
23  import io.netty.util.Recycler.EnhancedHandle;
24  import io.netty.util.ReferenceCounted;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.concurrent.FastThreadLocalThread;
27  import io.netty.util.concurrent.MpscAtomicIntegerArrayQueue;
28  import io.netty.util.concurrent.MpscIntQueue;
29  import io.netty.util.internal.ObjectPool;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PlatformDependent;
32  import io.netty.util.internal.ReferenceCountUpdater;
33  import io.netty.util.internal.SuppressJava6Requirement;
34  import io.netty.util.internal.SystemPropertyUtil;
35  import io.netty.util.internal.ThreadExecutorMap;
36  import io.netty.util.internal.ThreadLocalRandom;
37  import io.netty.util.internal.UnstableApi;
38  
39  import java.io.IOException;
40  import java.io.InputStream;
41  import java.io.OutputStream;
42  import java.nio.ByteBuffer;
43  import java.nio.ByteOrder;
44  import java.nio.channels.ClosedChannelException;
45  import java.nio.channels.FileChannel;
46  import java.nio.channels.GatheringByteChannel;
47  import java.nio.channels.ScatteringByteChannel;
48  import java.nio.charset.Charset;
49  import java.util.Arrays;
50  import java.util.Collections;
51  import java.util.Queue;
52  import java.util.Set;
53  import java.util.concurrent.ConcurrentHashMap;
54  import java.util.concurrent.ConcurrentLinkedQueue;
55  import java.util.concurrent.CopyOnWriteArraySet;
56  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
57  import java.util.concurrent.atomic.AtomicLong;
58  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
59  import java.util.concurrent.locks.StampedLock;
60  
61  /**
62   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
63   * <p>
64   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that it allocates
65   * buffers from.
66   * <p>
67   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
68   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
69   * If contention is detected above a certain threshold, the number of magazines is increased in response to the
70   * contention.
71   * <p>
72   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
73   * preferred chunk size. The preferred chunk size is one that is big enough to service {@code BUFS_PER_CHUNK}
74   * allocations of the 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
75   * <p>
76   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
77   * done is also adapted to the allocation pattern. If a newly computed preferred chunk size is the same as the
78   * previous one, then the frequency is reduced. Otherwise, the frequency is increased.
79   * <p>
80   * This allows the allocator to quickly respond to changes in the application workload,
81   * without suffering undue overhead from maintaining its statistics.
82   * <p>
83   * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from any
84   * magazine to be shared with other magazines.
85   * See {@link #createSharedChunkQueue()} for how this queue is created.
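     * <p>
     * For illustration, a minimal usage sketch via the public {@link AdaptiveByteBufAllocator} wrapper
     * (the sizes and calls below are arbitrary and only meant as an example):
     * <pre>{@code
     * ByteBufAllocator allocator = new AdaptiveByteBufAllocator();
     * ByteBuf buf = allocator.buffer(256); // served by a size-classed magazine group
     * try {
     *     buf.writeBytes(new byte[200]);
     * } finally {
     *     buf.release(); // returns the memory to its chunk for reuse
     * }
     * }</pre>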
86   */
87  @SuppressJava6Requirement(reason = "Guarded by version check")
88  @UnstableApi
89  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
90      /**
91       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
92       * allocations. For instance, glibc will do this.
93       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
94       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
95       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
96       */
97      private static final int MIN_CHUNK_SIZE = 128 * 1024;
98      private static final int EXPANSION_ATTEMPTS = 3;
99      private static final int INITIAL_MAGAZINES = 1;
100     private static final int RETIRE_CAPACITY = 256;
101     private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
102     private static final int BUFS_PER_CHUNK = 8; // For large buffers, aim to have about this many buffers per chunk.
103 
104     /**
105      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
106      * <p>
107      * This number is 8 MiB, and is derived from the limitations of internal histograms.
108      */
109     private static final int MAX_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MiB.
110     private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
111 
112     /**
113      * The capacity of the chunk reuse queues, which allow chunks to be shared across magazines in a group.
114      * The default size is twice {@link NettyRuntime#availableProcessors()},
115      * same as the maximum number of magazines per magazine group.
116      */
117     private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
118             "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));
119 
120     /**
121      * The capacity of the magazine-local buffer queue. This queue only pools the outer ByteBuf instances and not
122      * the actual memory, and so helps to reduce GC pressure.
123      */
124     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
125             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
126 
127     /**
128      * The size classes are chosen based on the following observation:
129      * <p>
130      * Most allocations, particularly ones above 256 bytes, tend to be a power of 2. However, many use cases, such
131      * as framing protocols, operate on or move power-of-2 sized payloads themselves, to which they add a
132      * small amount of overhead, such as headers or checksums.
133      * This means we seem to get a lot of mileage out of having both power-of-2 sizes, and power-of-2-plus-a-bit.
134      * <p>
135      * Given the conflicting requirements of having as few chunks as possible, and as little wasted
136      * memory within each chunk as possible, this seems to strike a surprisingly good balance for the use cases
137      * tested so far.
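         * <p>
         * For illustration, a few mappings produced by {@link #sizeClassIndexOf(int)}:
         * <pre>{@code
         * sizeClassIndexOf(600)   == 5;                  // the 640-byte class (512 + 128)
         * sizeClassIndexOf(700)   == 6;                  // rounds up past 640 to the 1024-byte class
         * sizeClassIndexOf(20000) == SIZE_CLASSES_COUNT; // beyond 16896 bytes; handled by the large-buffer group
         * }</pre>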
138      */
139     private static final int[] SIZE_CLASSES = {
140             32,
141             64,
142             128,
143             256,
144             512,
145             640, // 512 + 128
146             1024,
147             1152, // 1024 + 128
148             2048,
149             2304, // 2048 + 256
150             4096,
151             4352, // 4096 + 256
152             8192,
153             8704, // 8192 + 512
154             16384,
155             16896, // 16384 + 512
156     };
157     private static final ChunkReleasePredicate CHUNK_RELEASE_ALWAYS = new ChunkReleasePredicate() {
158         @Override
159         public boolean shouldReleaseChunk(int chunkSize) {
160             return true;
161         }
162     };
163     private static final ChunkReleasePredicate CHUNK_RELEASE_NEVER = new ChunkReleasePredicate() {
164         @Override
165         public boolean shouldReleaseChunk(int chunkSize) {
166             return false;
167         }
168     };
169 
170     private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
171     private static final byte[] SIZE_INDEXES = new byte[(SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32) + 1];
172 
173     static {
174         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
175             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
176                     + " (expected: >= " + 2 + ')');
177         }
178         int lastIndex = 0;
179         for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
180             int sizeClass = SIZE_CLASSES[i];
181             //noinspection ConstantValue
182             assert (sizeClass & 31) == 0 : "Size class must be a multiple of 32";
183             int sizeIndex = sizeIndexOf(sizeClass);
184             Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
185             lastIndex = sizeIndex;
186         }
187     }
188 
189     private final ChunkAllocator chunkAllocator;
190     private final Set<Chunk> chunkRegistry;
191     private final MagazineGroup[] sizeClassedMagazineGroups;
192     private final MagazineGroup largeBufferMagazineGroup;
193     private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
194 
195     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, final boolean useCacheForNonEventLoopThreads) {
196         this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
197         chunkRegistry = Collections.<Chunk>newSetFromMap(PlatformDependent.<Chunk, Boolean>newConcurrentHashMap());
198         sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
199         largeBufferMagazineGroup = new MagazineGroup(
200                 this, chunkAllocator, new HistogramChunkControllerFactory(true), false);
201         threadLocalGroup = new FastThreadLocal<MagazineGroup[]>() {
202             @Override
203             protected MagazineGroup[] initialValue() {
204                 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
205                     return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
206                 }
207                 return null;
208             }
209 
210             @Override
211             protected void onRemoval(final MagazineGroup[] groups) throws Exception {
212                 if (groups != null) {
213                     for (MagazineGroup group : groups) {
214                         group.free();
215                     }
216                 }
217             }
218         };
219     }
220 
221     private static MagazineGroup[] createMagazineGroupSizeClasses(
222             AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
223         MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
224         for (int i = 0; i < SIZE_CLASSES.length; i++) {
225             int segmentSize = SIZE_CLASSES[i];
226             groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
227                     new SizeClassChunkControllerFactory(segmentSize), isThreadLocal);
228         }
229         return groups;
230     }
231 
232     /**
233      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
234      * internal Magazines.
235      * <p>
236      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
237      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
238      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
239      * allocate new chunks when their current and next-in-line chunks have both been used up.
240      * <p>
241      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
242      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
243      * queue.
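         * For illustration, the simplest (unbounded) variant described above would just be:
         * <pre>{@code
         * return new ConcurrentLinkedQueue<Chunk>();
         * }</pre>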
244      * <p>
245      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
246      * queue to limit the maximum memory usage.
247      * <p>
248      * The default implementation will create a bounded queue with a capacity of {@link #CHUNK_REUSE_QUEUE}.
249      *
250      * @return A new multi-producer, multi-consumer queue.
251      */
252     private static Queue<Chunk> createSharedChunkQueue() {
253         return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
254     }
255 
256     @Override
257     public ByteBuf allocate(int size, int maxCapacity) {
258         return allocate(size, maxCapacity, Thread.currentThread(), null);
259     }
260 
261     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
262         AdaptiveByteBuf allocated = null;
263         if (size <= MAX_POOLED_BUF_SIZE) {
264             final int index = sizeClassIndexOf(size);
265             MagazineGroup[] magazineGroups;
266             if (!FastThreadLocalThread.willCleanupFastThreadLocals(currentThread) ||
267                     (magazineGroups = threadLocalGroup.get()) == null) {
268                 magazineGroups = sizeClassedMagazineGroups;
269             }
270             if (index < magazineGroups.length) {
271                 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
272             } else {
273                 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
274             }
275         }
276         if (allocated == null) {
277             allocated = allocateFallback(size, maxCapacity, currentThread, buf);
278         }
279         return allocated;
280     }
281 
282     private static int sizeIndexOf(final int size) {
283         // Align the size to the next multiple of 32, then divide by 32 to get the size index.
284         return (size + 31) >> 5;
285     }
286 
287     static int sizeClassIndexOf(int size) {
288         int sizeIndex = sizeIndexOf(size);
289         if (sizeIndex < SIZE_INDEXES.length) {
290             return SIZE_INDEXES[sizeIndex];
291         }
292         return SIZE_CLASSES_COUNT;
293     }
294 
295     static int[] getSizeClasses() {
296         return SIZE_CLASSES.clone();
297     }
298 
299     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread,
300                                              AdaptiveByteBuf buf) {
301         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
302         Magazine magazine;
303         if (buf != null) {
304             Chunk chunk = buf.chunk;
305             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
306                 magazine = getFallbackMagazine(currentThread);
307             }
308         } else {
309             magazine = getFallbackMagazine(currentThread);
310             buf = magazine.newBuffer();
311         }
312         // Create a one-off chunk for this allocation.
313         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
314         Chunk chunk = new Chunk(innerChunk, magazine, false, CHUNK_RELEASE_ALWAYS);
315         try {
316             chunk.readInitInto(buf, size, size, maxCapacity);
317         } finally {
318             // As the chunk is a one-off, we always need to call release explicitly, as readInitInto(...)
319             // will retain it once when successful. Once the AdaptiveByteBuf is released, it will
320             // completely release the Chunk and thus the contained innerChunk.
321             chunk.release();
322         }
323         return buf;
324     }
325 
326     private Magazine getFallbackMagazine(Thread currentThread) {
327         Magazine[] mags = largeBufferMagazineGroup.magazines;
328         return mags[(int) currentThread.getId() & mags.length - 1];
329     }
330 
331     /**
332      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
333      */
334     void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
335         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
336         assert result == into: "Re-allocation created separate buffer instance";
337     }
338 
339     @Override
340     public long usedMemory() {
341         long sum = 0;
342         for (Chunk chunk : chunkRegistry) {
343             sum += chunk.capacity();
344         }
345         return sum;
346     }
347 
348     // Ensure that we release all previously pooled resources when this object is finalized. This is needed as otherwise
349     // we might end up with leaks. While these leaks are usually harmless in practice, they would still be
350     // very confusing for users.
351     @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
352     @Override
353     protected void finalize() throws Throwable {
354         try {
355             super.finalize();
356         } finally {
357             free();
358         }
359     }
360 
361     private void free() {
362         largeBufferMagazineGroup.free();
363     }
364 
365     static int sizeToBucket(int size) {
366         return HistogramChunkController.sizeToBucket(size);
367     }
368 
369     @SuppressJava6Requirement(reason = "Guarded by version check")
370     private static final class MagazineGroup {
371         private final AdaptivePoolingAllocator allocator;
372         private final ChunkAllocator chunkAllocator;
373         private final ChunkControllerFactory chunkControllerFactory;
374         private final Queue<Chunk> chunkReuseQueue;
375         private final StampedLock magazineExpandLock;
376         private final Magazine threadLocalMagazine;
377         private volatile Magazine[] magazines;
378         private volatile boolean freed;
379 
380         MagazineGroup(AdaptivePoolingAllocator allocator,
381                       ChunkAllocator chunkAllocator,
382                       ChunkControllerFactory chunkControllerFactory,
383                       boolean isThreadLocal) {
384             this.allocator = allocator;
385             this.chunkAllocator = chunkAllocator;
386             this.chunkControllerFactory = chunkControllerFactory;
387             chunkReuseQueue = createSharedChunkQueue();
388             if (isThreadLocal) {
389                 magazineExpandLock = null;
390                 threadLocalMagazine = new Magazine(this, false, chunkReuseQueue, chunkControllerFactory.create(this));
391             } else {
392                 magazineExpandLock = new StampedLock();
393                 threadLocalMagazine = null;
394                 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
395                 for (int i = 0; i < mags.length; i++) {
396                     mags[i] = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
397                 }
398                 magazines = mags;
399             }
400         }
401 
402         public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
403             boolean reallocate = buf != null;
404 
405             // Path for thread-local allocation.
406             Magazine tlMag = threadLocalMagazine;
407             if (tlMag != null) {
408                 if (buf == null) {
409                     buf = tlMag.newBuffer();
410                 }
411                 boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
412                 assert allocated : "Allocation of threadLocalMagazine must always succeed";
413                 return buf;
414             }
415 
416             // Path for concurrent allocation.
417             long threadId = currentThread.getId();
418             Magazine[] mags;
419             int expansions = 0;
420             do {
421                 mags = magazines;
422                 int mask = mags.length - 1;
423                 int index = (int) (threadId & mask);
424                 for (int i = 0, m = mags.length << 1; i < m; i++) {
425                     Magazine mag = mags[index + i & mask];
426                     if (buf == null) {
427                         buf = mag.newBuffer();
428                     }
429                     if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
430                         // Was able to allocate.
431                         return buf;
432                     }
433                 }
434                 expansions++;
435             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
436 
437             // The magazines failed us; contention is too high and we don't want to spend more effort expanding the array.
438             if (!reallocate && buf != null) {
439                 buf.release(); // Release the previously claimed buffer before we return.
440             }
441             return null;
442         }
443 
444         private boolean tryExpandMagazines(int currentLength) {
445             if (currentLength >= MAX_STRIPES) {
446                 return true;
447             }
448             final Magazine[] mags;
449             long writeLock = magazineExpandLock.tryWriteLock();
450             if (writeLock != 0) {
451                 try {
452                     mags = magazines;
453                     if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
454                         return true;
455                     }
456                     Magazine firstMagazine = mags[0];
457                     Magazine[] expanded = new Magazine[mags.length * 2];
458                     for (int i = 0, l = expanded.length; i < l; i++) {
459                         Magazine m = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
460                         firstMagazine.initializeSharedStateIn(m);
461                         expanded[i] = m;
462                     }
463                     magazines = expanded;
464                 } finally {
465                     magazineExpandLock.unlockWrite(writeLock);
466                 }
467                 for (Magazine magazine : mags) {
468                     magazine.free();
469                 }
470             }
471             return true;
472         }
473 
474         boolean offerToQueue(Chunk buffer) {
475             if (freed) {
476                 return false;
477             }
478 
479             boolean isAdded = chunkReuseQueue.offer(buffer);
480             if (freed && isAdded) {
481                 // Help to free the reuse queue.
482                 freeChunkReuseQueue();
483             }
484             return isAdded;
485         }
486 
487         private void free() {
488             freed = true;
489             if (threadLocalMagazine != null) {
490                 threadLocalMagazine.free();
491             } else {
492                 long stamp = magazineExpandLock.writeLock();
493                 try {
494                     Magazine[] mags = magazines;
495                     for (Magazine magazine : mags) {
496                         magazine.free();
497                     }
498                 } finally {
499                     magazineExpandLock.unlockWrite(stamp);
500                 }
501             }
502             freeChunkReuseQueue();
503         }
504 
505         private void freeChunkReuseQueue() {
506             for (;;) {
507                 Chunk chunk = chunkReuseQueue.poll();
508                 if (chunk == null) {
509                     break;
510                 }
511                 chunk.release();
512             }
513         }
514     }
515 
516     private interface ChunkControllerFactory {
517         ChunkController create(MagazineGroup group);
518     }
519 
520     private interface ChunkController {
521         /**
522          * Compute the "fast max capacity" value for the buffer.
523          */
524         int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
525 
526         /**
527          * Initialize the given chunk controller with shared statistics state (if any) from this controller.
528          */
529         void initializeSharedStateIn(ChunkController chunkController);
530 
531         /**
532          * Allocate a new {@link Chunk} for the given {@link Magazine}.
533          */
534         Chunk newChunkAllocation(int promptingSize, Magazine magazine);
535     }
536 
537     private interface ChunkReleasePredicate {
538         boolean shouldReleaseChunk(int chunkSize);
539     }
540 
541     private static final class SizeClassChunkControllerFactory implements ChunkControllerFactory {
542         private final int segmentSize;
543 
544         private SizeClassChunkControllerFactory(int segmentSize) {
545             this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
546         }
547 
548         @Override
549         public ChunkController create(MagazineGroup group) {
550             return new SizeClassChunkController(group, segmentSize);
551         }
552     }
553 
554     private static final class SizeClassChunkController implements ChunkController {
555         // To amortize activation/deactivation of chunks, we should have a minimum number of segments per chunk.
556         // We choose 32 because it seems neither too small nor too big.
557         // For segments of 16 KiB, the chunks will be half a megabyte.
558         private static final int MIN_SEGMENTS_PER_CHUNK = 32;
559         private final ChunkAllocator chunkAllocator;
560         private final int segmentSize;
561         private final int chunkSize;
562         private final Set<Chunk> chunkRegistry;
563 
564         private SizeClassChunkController(MagazineGroup group, int segmentSize) {
565             chunkAllocator = group.chunkAllocator;
566             this.segmentSize = segmentSize;
567             chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
568             chunkRegistry = group.allocator.chunkRegistry;
569         }
570 
571         @Override
572         public int computeBufferCapacity(
573                 int requestedSize, int maxCapacity, boolean isReallocation) {
574             return Math.min(segmentSize, maxCapacity);
575         }
576 
577         @Override
578         public void initializeSharedStateIn(ChunkController chunkController) {
579             // NOOP
580         }
581 
582         @Override
583         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
584             SizeClassedChunk chunk = new SizeClassedChunk(chunkAllocator.allocate(chunkSize, chunkSize),
585                     magazine, true, segmentSize, CHUNK_RELEASE_NEVER);
586             chunkRegistry.add(chunk);
587             return chunk;
588         }
589     }
590 
591     private static final class HistogramChunkControllerFactory implements ChunkControllerFactory {
592         private final boolean shareable;
593 
594         private HistogramChunkControllerFactory(boolean shareable) {
595             this.shareable = shareable;
596         }
597 
598         @Override
599         public ChunkController create(MagazineGroup group) {
600             return new HistogramChunkController(group, shareable);
601         }
602     }
603 
604     private static final class HistogramChunkController implements ChunkController, ChunkReleasePredicate {
605         private static final int MIN_DATUM_TARGET = 1024;
606         private static final int MAX_DATUM_TARGET = 65534;
607         private static final int INIT_DATUM_TARGET = 9;
608         private static final int HISTO_BUCKET_COUNT = 16;
609         private static final int[] HISTO_BUCKETS = {
610                 16 * 1024,
611                 24 * 1024,
612                 32 * 1024,
613                 48 * 1024,
614                 64 * 1024,
615                 96 * 1024,
616                 128 * 1024,
617                 192 * 1024,
618                 256 * 1024,
619                 384 * 1024,
620                 512 * 1024,
621                 768 * 1024,
622                 1024 * 1024,
623                 1792 * 1024,
624                 2048 * 1024,
625                 3072 * 1024
626         };
627 
628         private final MagazineGroup group;
629         private final boolean shareable;
630         private final short[][] histos = {
631                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
632                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
633         };
634         private final Set<Chunk> chunkRegistry;
635         private short[] histo = histos[0];
636         private final int[] sums = new int[HISTO_BUCKET_COUNT];
637 
638         private int histoIndex;
639         private int datumCount;
640         private int datumTarget = INIT_DATUM_TARGET;
641         private boolean hasHadRotation;
642         private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
643         private volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
644         private volatile int localUpperBufSize;
645 
646         private HistogramChunkController(MagazineGroup group, boolean shareable) {
647             this.group = group;
648             this.shareable = shareable;
649             chunkRegistry = group.allocator.chunkRegistry;
650         }
651 
652         @Override
653         public int computeBufferCapacity(
654                 int requestedSize, int maxCapacity, boolean isReallocation) {
655             if (!isReallocation) {
656                 // Only record allocation size if it's not caused by a reallocation that was triggered by capacity
657                 // change of the buffer.
658                 recordAllocationSize(requestedSize);
659             }
660 
661             // Predict starting capacity from localUpperBufSize, but place limits on the max starting capacity
662             // based on the requested size, because localUpperBufSize can potentially be quite large.
663             int startCapLimits;
664             if (requestedSize <= 32768) { // Less than or equal to 32 KiB.
665                 startCapLimits = 65536; // Use at most 64 KiB, which is also the AdaptiveRecvByteBufAllocator max.
666             } else {
667                 startCapLimits = requestedSize * 2; // Otherwise use at most twice the requested memory.
668             }
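                // Worked example (illustrative numbers): a 1 KiB request with localUpperBufSize at 16 KiB gets
                // min(64 KiB, 16 KiB) = 16 KiB, then clamped to at least the requested size and at most maxCapacity.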
669             int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
670             startingCapacity = Math.max(requestedSize, Math.min(maxCapacity, startingCapacity));
671             return startingCapacity;
672         }
673 
674         private void recordAllocationSize(int bufferSizeToRecord) {
675             // Use the preserved size from the reused AdaptiveByteBuf, if available.
676             // Otherwise, use the requested buffer size.
677             // This way, we better take into account the capacity that buffers actually end up using.
678             if (bufferSizeToRecord == 0) {
679                 return;
680             }
681             int bucket = sizeToBucket(bufferSizeToRecord);
682             histo[bucket]++;
683             if (datumCount++ == datumTarget) {
684                 rotateHistograms();
685             }
686         }
687 
688         static int sizeToBucket(int size) {
689             int index = binarySearchInsertionPoint(Arrays.binarySearch(HISTO_BUCKETS, size));
690             return index >= HISTO_BUCKETS.length ? HISTO_BUCKETS.length - 1 : index;
691         }
692 
693         private static int binarySearchInsertionPoint(int index) {
694             if (index < 0) {
695                 index = -(index + 1);
696             }
697             return index;
698         }
699 
700         static int bucketToSize(int sizeBucket) {
701             return HISTO_BUCKETS[sizeBucket];
702         }
703 
704         private void rotateHistograms() {
705             short[][] hs = histos;
706             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
707                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
708             }
709             int sum = 0;
710             for (int count : sums) {
711                 sum += count;
712             }
713             int targetPercentile = (int) (sum * 0.99);
714             int sizeBucket = 0;
715             for (; sizeBucket < sums.length; sizeBucket++) {
716                 if (sums[sizeBucket] > targetPercentile) {
717                     break;
718                 }
719                 targetPercentile -= sums[sizeBucket];
720             }
721             hasHadRotation = true;
722             int percentileSize = bucketToSize(sizeBucket);
723             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
724             localUpperBufSize = percentileSize;
725             localPrefChunkSize = prefChunkSize;
726             if (shareable) {
727                 for (Magazine mag : group.magazines) {
728                     HistogramChunkController statistics = (HistogramChunkController) mag.chunkController;
729                     prefChunkSize = Math.max(prefChunkSize, statistics.localPrefChunkSize);
730                 }
731             }
732             if (sharedPrefChunkSize != prefChunkSize) {
733                 // Preferred chunk size changed. Increase check frequency.
734                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
735                 sharedPrefChunkSize = prefChunkSize;
736             } else {
737                 // Preferred chunk size did not change. Check less often.
738                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
739             }
740 
741             histoIndex = histoIndex + 1 & 3;
742             histo = histos[histoIndex];
743             datumCount = 0;
744             Arrays.fill(histo, (short) 0);
745         }
746 
747         /**
748          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
749          * allocation sizes.
750          * <p>
751          * This method must be thread-safe.
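             * <p>
             * For illustration (using the constants above): if a histogram rotation finds the 99th percentile of
             * recorded sizes in the 64 KiB bucket, the preferred chunk size becomes
             * {@code max(64 KiB * BUFS_PER_CHUNK, MIN_CHUNK_SIZE)}, i.e. {@code max(512 KiB, 128 KiB)} = 512 KiB.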
752          *
753          * @return The currently preferred chunk allocation size.
754          */
755         int preferredChunkSize() {
756             return sharedPrefChunkSize;
757         }
758 
759         @Override
760         public void initializeSharedStateIn(ChunkController chunkController) {
761             HistogramChunkController statistics = (HistogramChunkController) chunkController;
762             int sharedPrefChunkSize = this.sharedPrefChunkSize;
763             statistics.localPrefChunkSize = sharedPrefChunkSize;
764             statistics.sharedPrefChunkSize = sharedPrefChunkSize;
765         }
766 
767         @Override
768         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
769             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
770             int minChunks = size / MIN_CHUNK_SIZE;
771             if (MIN_CHUNK_SIZE * minChunks < size) {
772                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
773                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
774                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
775                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
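                    // For example, a preferred size of 300 KiB spans two whole MIN_CHUNK_SIZE units plus a
                    // remainder, so it is rounded up to 3 * 128 KiB = 384 KiB.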
776                 size = MIN_CHUNK_SIZE * (1 + minChunks);
777             }
778 
779             // Limit chunks to the max size, even if the histogram suggests to go above it.
780             size = Math.min(size, MAX_CHUNK_SIZE);
781 
782             // If we haven't rotated the histogram yet, optimistically record this chunk size as our preferred size.
783             if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
784                 sharedPrefChunkSize = size;
785             }
786 
787             ChunkAllocator chunkAllocator = group.chunkAllocator;
788             Chunk chunk = new Chunk(chunkAllocator.allocate(size, size), magazine, true, this);
789             chunkRegistry.add(chunk);
790             return chunk;
791         }
792 
793         @Override
794         public boolean shouldReleaseChunk(int chunkSize) {
795             int preferredSize = preferredChunkSize();
796             int givenChunks = chunkSize / MIN_CHUNK_SIZE;
797             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
798             int deviation = Math.abs(givenChunks - preferredChunks);
799 
800             // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
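                // For example, with a preferred size of 512 KiB (4 units) and a 1 MiB chunk (8 units), the
                // deviation is 4 units, so the chunk is released with probability 4 / 20 = 20%.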
801             return deviation != 0 &&
802                     ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
803         }
804     }
805 
806     @SuppressJava6Requirement(reason = "Guarded by version check")
807     private static final class Magazine {
808         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
809         static {
810             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
811         }
812         private static final Chunk MAGAZINE_FREED = new Chunk();
813 
814         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
815                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
816                     @Override
817                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
818                         return new AdaptiveByteBuf(handle);
819                     }
820                 });
821 
822         private Chunk current;
823         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
824         private volatile Chunk nextInLine;
825         private final MagazineGroup group;
826         private final ChunkController chunkController;
827         private final AtomicLong usedMemory;
828         private final StampedLock allocationLock;
829         private final Queue<AdaptiveByteBuf> bufferQueue;
830         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
831         private final Queue<Chunk> sharedChunkQueue;
832 
833         Magazine(MagazineGroup group, boolean shareable, Queue<Chunk> sharedChunkQueue,
834                  ChunkController chunkController) {
835             this.group = group;
836             this.chunkController = chunkController;
837 
838             if (shareable) {
839                 // We only need the StampedLock if this Magazine will be shared across threads.
840                 allocationLock = new StampedLock();
841                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
842                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
843                     @Override
844                     public void recycle(AdaptiveByteBuf self) {
845                         bufferQueue.offer(self);
846                     }
847                 };
848             } else {
849                 allocationLock = null;
850                 bufferQueue = null;
851                 handle = null;
852             }
853             usedMemory = new AtomicLong();
854             this.sharedChunkQueue = sharedChunkQueue;
855         }
856 
857         public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
858             if (allocationLock == null) {
859                 // This magazine is not shared across threads, just allocate directly.
860                 return allocate(size, maxCapacity, buf, reallocate);
861             }
862 
863             // Try to retrieve the lock and if successful allocate.
864             long writeLock = allocationLock.tryWriteLock();
865             if (writeLock != 0) {
866                 try {
867                     return allocate(size, maxCapacity, buf, reallocate);
868                 } finally {
869                     allocationLock.unlockWrite(writeLock);
870                 }
871             }
872             return allocateWithoutLock(size, maxCapacity, buf);
873         }
874 
875         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
876             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
877             if (curr == MAGAZINE_FREED) {
878                 // Allocation raced with a stripe-resize that freed this magazine.
879                 restoreMagazineFreed();
880                 return false;
881             }
882             if (curr == null) {
883                 curr = sharedChunkQueue.poll();
884                 if (curr == null) {
885                     return false;
886                 }
887                 curr.attachToMagazine(this);
888             }
889             boolean allocated = false;
890             int remainingCapacity = curr.remainingCapacity();
891             int startingCapacity = chunkController.computeBufferCapacity(
892                     size, maxCapacity, true /* never update stats as we don't hold the magazine lock */);
893             if (remainingCapacity >= size) {
894                 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
895                 allocated = true;
896             }
897             try {
898                 if (remainingCapacity >= RETIRE_CAPACITY) {
899                     transferToNextInLineOrRelease(curr);
900                     curr = null;
901                 }
902             } finally {
903                 if (curr != null) {
904                     curr.releaseFromMagazine();
905                 }
906             }
907             return allocated;
908         }
909 
910         private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
911             int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
912             Chunk curr = current;
913             if (curr != null) {
914                 // We have a Chunk that has some space left.
915                 int remainingCapacity = curr.remainingCapacity();
916                 if (remainingCapacity > startingCapacity) {
917                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
918                     // We still have some bytes left that we can use for the next allocation, so just return early.
919                     return true;
920                 }
921 
922                 // At this point we know that this will be the last time current will be used, so directly set it to
923                 // null and release it once we are done.
924                 current = null;
925                 if (remainingCapacity >= size) {
926                     try {
927                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
928                         return true;
929                     } finally {
930                         curr.releaseFromMagazine();
931                     }
932                 }
933 
934                 // Decide whether we retain the chunk in the nextInLine cache or release it.
935                 if (remainingCapacity < RETIRE_CAPACITY) {
936                     curr.releaseFromMagazine();
937                 } else {
938                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
939                     // This method will release curr if that is not the case.
940                     transferToNextInLineOrRelease(curr);
941                 }
942             }
943 
944             assert current == null;
945             // The fast-path for allocations did not work.
946             //
947             // Try to fetch the next "Magazine local" Chunk first; if this fails because we don't have a
948             // next-in-line chunk available, we will poll our central queue.
949             // If this fails as well we will just allocate a new Chunk.
950             //
951             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
952             // thus be "reserved" by this Magazine for exclusive usage.
953             curr = NEXT_IN_LINE.getAndSet(this, null);
954             if (curr != null) {
955                 if (curr == MAGAZINE_FREED) {
956                     // Allocation raced with a stripe-resize that freed this magazine.
957                     restoreMagazineFreed();
958                     return false;
959                 }
960 
961                 int remainingCapacity = curr.remainingCapacity();
962                 if (remainingCapacity > startingCapacity) {
963                     // We have a Chunk that has some space left.
964                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
965                     current = curr;
966                     return true;
967                 }
968 
969                 if (remainingCapacity >= size) {
970                     // At this point we know that this will be the last time curr will be used, so directly set it to
971                     // null and release it once we are done.
972                     try {
973                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
974                         return true;
975                     } finally {
976                         // Release in a finally block so that even if readInitInto(...) throws, we still correctly
977                         // release the current chunk before nulling it out.
978                         curr.releaseFromMagazine();
979                     }
980                 } else {
981                     // Release it as it's too small.
982                     curr.releaseFromMagazine();
983                 }
984             }
985 
986             // Now try to poll from the central queue first
987             curr = sharedChunkQueue.poll();
988             if (curr == null) {
989                 curr = chunkController.newChunkAllocation(size, this);
990             } else {
991                 curr.attachToMagazine(this);
992 
993                 int remainingCapacity = curr.remainingCapacity();
994                 if (remainingCapacity < size) {
995                     // Decide whether we retain the chunk in the nextInLine cache or release it.
996                     if (remainingCapacity < RETIRE_CAPACITY) {
997                         curr.releaseFromMagazine();
998                     } else {
999                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
1000                        // This method will release curr if that is not the case.
1001                         transferToNextInLineOrRelease(curr);
1002                     }
1003                     curr = chunkController.newChunkAllocation(size, this);
1004                 }
1005             }
1006 
1007             current = curr;
1008             try {
1009                 int remainingCapacity = curr.remainingCapacity();
1010                 assert remainingCapacity >= size;
1011                 if (remainingCapacity > startingCapacity) {
1012                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
1013                     curr = null;
1014                 } else {
1015                     curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
1016                 }
1017             } finally {
1018                 if (curr != null) {
1019                    // Release in a finally block so that even if readInitInto(...) throws, we still correctly
1020                    // release the current chunk before nulling it out.
1021                     curr.releaseFromMagazine();
1022                     current = null;
1023                 }
1024             }
1025             return true;
1026         }
1027 
1028         private void restoreMagazineFreed() {
1029             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
1030             if (next != null && next != MAGAZINE_FREED) {
1031                 // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
1032                 next.releaseFromMagazine();
1033             }
1034         }
1035 
1036         private void transferToNextInLineOrRelease(Chunk chunk) {
1037             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
1038                 return;
1039             }
1040 
1041             Chunk nextChunk = NEXT_IN_LINE.get(this);
1042             if (nextChunk != null && nextChunk != MAGAZINE_FREED
1043                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
1044                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
1045                     nextChunk.releaseFromMagazine();
1046                     return;
1047                 }
1048             }
1049             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
1050             // by some buffers and so is attached to a Magazine.
1051             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
1052            // as a last resort.
1053             chunk.releaseFromMagazine();
1054         }
1055 
1056         boolean trySetNextInLine(Chunk chunk) {
1057             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
1058         }
1059 
1060         void free() {
1061             // Release the current Chunk and the next that was stored for later usage.
1062             restoreMagazineFreed();
1063             long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
1064             try {
1065                 if (current != null) {
1066                     current.releaseFromMagazine();
1067                     current = null;
1068                 }
1069             } finally {
1070                 if (allocationLock != null) {
1071                     allocationLock.unlockWrite(stamp);
1072                 }
1073             }
1074         }
1075 
1076         public AdaptiveByteBuf newBuffer() {
1077             AdaptiveByteBuf buf;
1078             if (handle == null) {
1079                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
1080             } else {
1081                 buf = bufferQueue.poll();
1082                 if (buf == null) {
1083                     buf = new AdaptiveByteBuf(handle);
1084                 }
1085             }
1086             buf.resetRefCnt();
1087             buf.discardMarks();
1088             return buf;
1089         }
1090 
1091         boolean offerToQueue(Chunk chunk) {
1092             return group.offerToQueue(chunk);
1093         }
1094 
1095         public void initializeSharedStateIn(Magazine other) {
1096             chunkController.initializeSharedStateIn(other.chunkController);
1097         }
1098     }
1099 
1100     private static class Chunk implements ReferenceCounted {
1101         private static final long REFCNT_FIELD_OFFSET =
1102                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
1103         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
1104                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
1105 
1106         protected final AbstractByteBuf delegate;
1107         protected Magazine magazine;
1108         private final AdaptivePoolingAllocator allocator;
1109         private final ChunkReleasePredicate chunkReleasePredicate;
1110         private final int capacity;
1111         private final boolean pooled;
1112         protected int allocatedBytes;
1113 
1114         private static final ReferenceCountUpdater<Chunk> updater =
1115                 new ReferenceCountUpdater<Chunk>() {
1116                     @Override
1117                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
1118                         return AIF_UPDATER;
1119                     }
1120                     @Override
1121                     protected long unsafeOffset() {
1122                         // on native image, REFCNT_FIELD_OFFSET can be recomputed even with Unsafe unavailable, so we
1123                         // need to guard here
1124                         return PlatformDependent.hasUnsafe() ? REFCNT_FIELD_OFFSET : -1;
1125                     }
1126                 };
1127 
1128        // Value might not equal the "real" reference count; all access should be via the updater.
1129         @SuppressWarnings({"unused", "FieldMayBeFinal"})
1130         private volatile int refCnt;
1131 
1132         Chunk() {
1133             // Constructor only used by the MAGAZINE_FREED sentinel.
1134             delegate = null;
1135             magazine = null;
1136             allocator = null;
1137             chunkReleasePredicate = null;
1138             capacity = 0;
1139             pooled = false;
1140         }
1141 
1142         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled,
1143               ChunkReleasePredicate chunkReleasePredicate) {
1144             this.delegate = delegate;
1145             this.pooled = pooled;
1146             capacity = delegate.capacity();
1147             updater.setInitialValue(this);
1148             attachToMagazine(magazine);
1149 
1150             // We need the top-level allocator so ByteBuf.capacity(int) can call reallocate()
1151             allocator = magazine.group.allocator;
1152 
1153             this.chunkReleasePredicate = chunkReleasePredicate;
1154         }
1155 
1156         Magazine currentMagazine()  {
1157             return magazine;
1158         }
1159 
1160         void detachFromMagazine() {
1161             if (magazine != null) {
1162                 magazine.usedMemory.getAndAdd(-capacity);
1163                 magazine = null;
1164             }
1165         }
1166 
1167         void attachToMagazine(Magazine magazine) {
1168             assert this.magazine == null;
1169             this.magazine = magazine;
1170             magazine.usedMemory.getAndAdd(capacity);
1171         }
1172 
1173         @Override
1174         public Chunk touch(Object hint) {
1175             return this;
1176         }
1177 
1178         @Override
1179         public int refCnt() {
1180             return updater.refCnt(this);
1181         }
1182 
1183         @Override
1184         public Chunk retain() {
1185             return updater.retain(this);
1186         }
1187 
1188         @Override
1189         public Chunk retain(int increment) {
1190             return updater.retain(this, increment);
1191         }
1192 
1193         @Override
1194         public Chunk touch() {
1195             return this;
1196         }
1197 
1198         @Override
1199         public boolean release() {
1200             if (updater.release(this)) {
1201                 deallocate();
1202                 return true;
1203             }
1204             return false;
1205         }
1206 
1207         @Override
1208         public boolean release(int decrement) {
1209             if (updater.release(this, decrement)) {
1210                 deallocate();
1211                 return true;
1212             }
1213             return false;
1214         }
1215 
1216         /**
1217          * Called when a magazine is done using this chunk, probably because it was emptied.
1218          */
1219         boolean releaseFromMagazine() {
1220             return release();
1221         }
1222 
1223         /**
1224          * Called when a ByteBuf is done using its allocation in this chunk.
1225          */
1226         boolean releaseSegment(int ignoredSegmentId) {
1227             return release();
1228         }
1229 
1230         private void deallocate() {
1231             Magazine mag = magazine;
1232             int chunkSize = delegate.capacity();
1233             if (!pooled || chunkReleasePredicate.shouldReleaseChunk(chunkSize) || mag == null) {
1234                 // Drop the chunk if the parent allocator is closed,
1235                 // or if the chunk deviates too much from the preferred chunk size.
1236                 detachFromMagazine();
1237                 allocator.chunkRegistry.remove(this);
1238                 delegate.release();
1239             } else {
1240                 updater.resetRefCnt(this);
1241                 delegate.setIndex(0, 0);
1242                 allocatedBytes = 0;
1243                 if (!mag.trySetNextInLine(this)) {
1244                     // As this Chunk no longer belongs to the magazine, we need to decrease the used memory.
1245                     detachFromMagazine();
1246                     if (!mag.offerToQueue(this)) {
1247                         // The central queue is full. Release the chunk again, as the earlier resetRefCnt()
1248                         // call restored the reference count to 1.
1249                         boolean released = updater.release(this);
1250                         allocator.chunkRegistry.remove(this);
1251                         delegate.release();
1252                         assert released;
1253                     }
1254                 }
1255             }
1256         }
1257 
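        /**
         * Initialize the given buffer with a region of this chunk starting at the current allocation offset.
         * The chunk is retained on behalf of the buffer; if the initialization fails, both the retain and the
         * allocatedBytes increase are rolled back.
         */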
1258         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1259             int startIndex = allocatedBytes;
1260             allocatedBytes = startIndex + startingCapacity;
1261             Chunk chunk = this;
1262             chunk.retain();
1263             try {
1264                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity, 0);
1265                 chunk = null;
1266             } finally {
1267                 if (chunk != null) {
1268                     // If chunk is not null, we know that buf.init(...) failed, so we need to manually release
1269                     // the chunk again, as we retained it before calling buf.init(...). Besides that, we also need
1270                     // to restore the old allocatedBytes value.
1271                     allocatedBytes = startIndex;
1272                     chunk.release();
1273                 }
1274             }
1275         }
1276 
1277         public int remainingCapacity() {
1278             return capacity - allocatedBytes;
1279         }
1280 
1281         public int capacity() {
1282             return capacity;
1283         }
1284     }
1285 
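    /**
     * A chunk that is divided into equally sized segments. Free segments are tracked by their segment id in
     * an MPSC free list, which allows individual segments to be reused before the whole chunk becomes empty.
     */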
1286     private static final class SizeClassedChunk extends Chunk {
1287         private static final int FREE_LIST_EMPTY = -1;
1288         private final int segmentSize;
1289         private final MpscIntQueue freeList;
1290 
1291         SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled, int segmentSize,
1292                          ChunkReleasePredicate shouldReleaseChunk) {
1293             super(delegate, magazine, pooled, shouldReleaseChunk);
1294             int capacity = delegate.capacity();
1295             this.segmentSize = segmentSize;
1296             int segmentCount = capacity / segmentSize;
1297             assert segmentCount > 0 : "Chunk must have a positive number of segments";
1298             freeList = new MpscAtomicIntegerArrayQueue(segmentCount, FREE_LIST_EMPTY);
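            // Pre-populate the free list with every segment id in the range [0, segmentCount).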
1299             freeList.fill(segmentCount, new IntSupplier() {
1300                 int counter;
1301                 @Override
1302                 public int get() {
1303                     return counter++;
1304                 }
1305             });
1306         }
1307 
1308         @Override
1309         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1310             int segmentId = freeList.poll();
1311             if (segmentId == FREE_LIST_EMPTY) {
1312                 throw new IllegalStateException("Free list is empty");
1313             }
1314 
1315             int startIndex = segmentId * segmentSize;
1316             allocatedBytes += segmentSize;
1317             Chunk chunk = this;
1318             chunk.retain();
1319             try {
1320                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity, segmentId);
1321                 chunk = null;
1322             } finally {
1323                 if (chunk != null) {
1324                     // If chunk is not null, we know that buf.init(...) failed, so we need to manually release
1325                     // the chunk again, as we retained it before calling buf.init(...). Besides that, we also need
1326                     // to restore the old allocatedBytes value.
1327                     allocatedBytes -= segmentSize;
1328                     chunk.releaseSegment(segmentId);
1329                 }
1330             }
1331         }
1332 
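        // Once allocatedBytes suggests the chunk is nearly exhausted, recompute the remaining capacity from
        // the free list, because segments returned via releaseSegment(...) are only reflected there.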
1333         @Override
1334         public int remainingCapacity() {
1335             int remainingCapacity = super.remainingCapacity();
1336             if (remainingCapacity > segmentSize) {
1337                 return remainingCapacity;
1338             }
1339             int updatedRemainingCapacity = freeList.size() * segmentSize;
1340             if (updatedRemainingCapacity == remainingCapacity) {
1341                 return remainingCapacity;
1342             }
1343             // update allocatedBytes based on what's available in the free list
1344             allocatedBytes = capacity() - updatedRemainingCapacity;
1345             return updatedRemainingCapacity;
1346         }
1347 
1348         @Override
1349         boolean releaseFromMagazine() {
1350             // Size-classed chunks can be reused before they become empty.
1351             // We can therefore put them in the shared queue as soon as the magazine is done with this chunk.
1352             Magazine mag = magazine;
1353             detachFromMagazine();
1354             if (!mag.offerToQueue(this)) {
1355                 return super.releaseFromMagazine();
1356             }
1357             return false;
1358         }
1359 
1360         @Override
1361         boolean releaseSegment(int segmentId) {
1362             boolean released = release();
1363             boolean segmentReturned = freeList.offer(segmentId);
1364             assert segmentReturned : "Unable to return segment " + segmentId + " to free list";
1365             return released;
1366         }
1367     }
1368 
1369     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1370 
1371         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
1372 
1373         private int adjustment;
1374         private AbstractByteBuf rootParent;
1375         Chunk chunk;
1376         private int length;
1377         private int maxFastCapacity;
1378         private int segmentId;
1379         private ByteBuffer tmpNioBuf;
1380         private boolean hasArray;
1381         private boolean hasMemoryAddress;
1382 
1383         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
1384             super(0);
1385             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1386         }
1387 
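        // Wires this recycled wrapper up to a region of the given chunk; 'adjustment' is the offset of that
        // region within the chunk's backing buffer.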
1388         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1389                   int adjustment, int size, int capacity, int maxCapacity, int segmentId) {
1390             this.adjustment = adjustment;
1391             chunk = wrapped;
1392             length = size;
1393             maxFastCapacity = capacity;
1394             maxCapacity(maxCapacity);
1395             setIndex0(readerIndex, writerIndex);
1396             this.segmentId = segmentId;
1397             hasArray = unwrapped.hasArray();
1398             hasMemoryAddress = unwrapped.hasMemoryAddress();
1399             rootParent = unwrapped;
1400             tmpNioBuf = null;
1401         }
1402 
1403         private AbstractByteBuf rootParent() {
1404             final AbstractByteBuf rootParent = this.rootParent;
1405             if (rootParent != null) {
1406                 return rootParent;
1407             }
1408             throw new IllegalReferenceCountException();
1409         }
1410 
1411         @Override
1412         public int capacity() {
1413             return length;
1414         }
1415 
1416         @Override
1417         public int maxFastWritableBytes() {
1418             return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1419         }
1420 
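        // Growing within maxFastCapacity (or shrinking) only adjusts the length; growing beyond it requires
        // reallocating into a new chunk segment, copying the old contents and releasing the old segment.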
1421         @Override
1422         public ByteBuf capacity(int newCapacity) {
1423             if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1424                 ensureAccessible();
1425                 length = newCapacity;
1426                 return this;
1427             }
1428             checkNewCapacity(newCapacity);
1429             if (newCapacity < capacity()) {
1430                 length = newCapacity;
1431                 trimIndicesToCapacity(newCapacity);
1432                 return this;
1433             }
1434 
1435             // Reallocation required.
1436             Chunk chunk = this.chunk;
1437             AdaptivePoolingAllocator allocator = chunk.allocator;
1438             int readerIndex = this.readerIndex;
1439             int writerIndex = this.writerIndex;
1440             int baseOldRootIndex = adjustment;
1441             int oldCapacity = length;
1442             int oldSegmentId = segmentId;
1443             AbstractByteBuf oldRoot = rootParent();
1444             allocator.reallocate(newCapacity, maxCapacity(), this);
1445             oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1446             chunk.releaseSegment(oldSegmentId);
1447             this.readerIndex = readerIndex;
1448             this.writerIndex = writerIndex;
1449             return this;
1450         }
1451 
1452         @Override
1453         public ByteBufAllocator alloc() {
1454             return rootParent().alloc();
1455         }
1456 
1457         @Override
1458         public ByteOrder order() {
1459             return rootParent().order();
1460         }
1461 
1462         @Override
1463         public ByteBuf unwrap() {
1464             return null;
1465         }
1466 
1467         @Override
1468         public boolean isDirect() {
1469             return rootParent().isDirect();
1470         }
1471 
1472         @Override
1473         public int arrayOffset() {
1474             return idx(rootParent().arrayOffset());
1475         }
1476 
1477         @Override
1478         public boolean hasMemoryAddress() {
1479             return hasMemoryAddress;
1480         }
1481 
1482         @Override
1483         public long memoryAddress() {
1484             ensureAccessible();
1485             return rootParent().memoryAddress() + adjustment;
1486         }
1487 
1488         @Override
1489         public ByteBuffer nioBuffer(int index, int length) {
1490             checkIndex(index, length);
1491             return rootParent().nioBuffer(idx(index), length);
1492         }
1493 
1494         @Override
1495         public ByteBuffer internalNioBuffer(int index, int length) {
1496             checkIndex(index, length);
1497             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1498         }
1499 
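        // Lazily creates and caches an NIO view over this buffer's region of the chunk; clear() resets the
        // position and limit before each use.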
1500         private ByteBuffer internalNioBuffer() {
1501             if (tmpNioBuf == null) {
1502                 tmpNioBuf = rootParent().nioBuffer(adjustment, maxFastCapacity);
1503             }
1504             return (ByteBuffer) tmpNioBuf.clear();
1505         }
1506 
1507         @Override
1508         public ByteBuffer[] nioBuffers(int index, int length) {
1509             checkIndex(index, length);
1510             return rootParent().nioBuffers(idx(index), length);
1511         }
1512 
1513         @Override
1514         public boolean hasArray() {
1515             return hasArray;
1516         }
1517 
1518         @Override
1519         public byte[] array() {
1520             ensureAccessible();
1521             return rootParent().array();
1522         }
1523 
1524         @Override
1525         public ByteBuf copy(int index, int length) {
1526             checkIndex(index, length);
1527             return rootParent().copy(idx(index), length);
1528         }
1529 
1530         @Override
1531         public int nioBufferCount() {
1532             return rootParent().nioBufferCount();
1533         }
1534 
1535         @Override
1536         protected byte _getByte(int index) {
1537             return rootParent()._getByte(idx(index));
1538         }
1539 
1540         @Override
1541         protected short _getShort(int index) {
1542             return rootParent()._getShort(idx(index));
1543         }
1544 
1545         @Override
1546         protected short _getShortLE(int index) {
1547             return rootParent()._getShortLE(idx(index));
1548         }
1549 
1550         @Override
1551         protected int _getUnsignedMedium(int index) {
1552             return rootParent()._getUnsignedMedium(idx(index));
1553         }
1554 
1555         @Override
1556         protected int _getUnsignedMediumLE(int index) {
1557             return rootParent()._getUnsignedMediumLE(idx(index));
1558         }
1559 
1560         @Override
1561         protected int _getInt(int index) {
1562             return rootParent()._getInt(idx(index));
1563         }
1564 
1565         @Override
1566         protected int _getIntLE(int index) {
1567             return rootParent()._getIntLE(idx(index));
1568         }
1569 
1570         @Override
1571         protected long _getLong(int index) {
1572             return rootParent()._getLong(idx(index));
1573         }
1574 
1575         @Override
1576         protected long _getLongLE(int index) {
1577             return rootParent()._getLongLE(idx(index));
1578         }
1579 
1580         @Override
1581         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1582             checkIndex(index, length);
1583             rootParent().getBytes(idx(index), dst, dstIndex, length);
1584             return this;
1585         }
1586 
1587         @Override
1588         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1589             checkIndex(index, length);
1590             rootParent().getBytes(idx(index), dst, dstIndex, length);
1591             return this;
1592         }
1593 
1594         @Override
1595         public ByteBuf getBytes(int index, ByteBuffer dst) {
1596             checkIndex(index, dst.remaining());
1597             rootParent().getBytes(idx(index), dst);
1598             return this;
1599         }
1600 
1601         @Override
1602         protected void _setByte(int index, int value) {
1603             rootParent()._setByte(idx(index), value);
1604         }
1605 
1606         @Override
1607         protected void _setShort(int index, int value) {
1608             rootParent()._setShort(idx(index), value);
1609         }
1610 
1611         @Override
1612         protected void _setShortLE(int index, int value) {
1613             rootParent()._setShortLE(idx(index), value);
1614         }
1615 
1616         @Override
1617         protected void _setMedium(int index, int value) {
1618             rootParent()._setMedium(idx(index), value);
1619         }
1620 
1621         @Override
1622         protected void _setMediumLE(int index, int value) {
1623             rootParent()._setMediumLE(idx(index), value);
1624         }
1625 
1626         @Override
1627         protected void _setInt(int index, int value) {
1628             rootParent()._setInt(idx(index), value);
1629         }
1630 
1631         @Override
1632         protected void _setIntLE(int index, int value) {
1633             rootParent()._setIntLE(idx(index), value);
1634         }
1635 
1636         @Override
1637         protected void _setLong(int index, long value) {
1638             rootParent()._setLong(idx(index), value);
1639         }
1640 
1641         @Override
1642         protected void _setLongLE(int index, long value) {
1643             rootParent()._setLongLE(idx(index), value);
1644         }
1645 
1646         @Override
1647         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1648             checkIndex(index, length);
1649             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1650             tmp.put(src, srcIndex, length);
1651             return this;
1652         }
1653 
1654         @Override
1655         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1656             checkIndex(index, length);
1657             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1658             tmp.put(src.nioBuffer(srcIndex, length));
1659             return this;
1660         }
1661 
1662         @Override
1663         public ByteBuf setBytes(int index, ByteBuffer src) {
1664             checkIndex(index, src.remaining());
1665             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1666             tmp.put(src);
1667             return this;
1668         }
1669 
1670         @Override
1671         public ByteBuf getBytes(int index, OutputStream out, int length)
1672                 throws IOException {
1673             checkIndex(index, length);
1674             if (length != 0) {
1675                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1676             }
1677             return this;
1678         }
1679 
1680         @Override
1681         public int getBytes(int index, GatheringByteChannel out, int length)
1682                 throws IOException {
1683             ByteBuffer buf = internalNioBuffer().duplicate();
1684             buf.clear().position(index).limit(index + length);
1685             return out.write(buf);
1686         }
1687 
1688         @Override
1689         public int getBytes(int index, FileChannel out, long position, int length)
1690                 throws IOException {
1691             ByteBuffer buf = internalNioBuffer().duplicate();
1692             buf.clear().position(index).limit(index + length);
1693             return out.write(buf, position);
1694         }
1695 
1696         @Override
1697         public int setBytes(int index, InputStream in, int length)
1698                 throws IOException {
1699             checkIndex(index, length);
1700             final AbstractByteBuf rootParent = rootParent();
1701             if (rootParent.hasArray()) {
1702                 return rootParent.setBytes(idx(index), in, length);
1703             }
1704             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1705             int readBytes = in.read(tmp, 0, length);
1706             if (readBytes <= 0) {
1707                 return readBytes;
1708             }
1709             setBytes(index, tmp, 0, readBytes);
1710             return readBytes;
1711         }
1712 
1713         @Override
1714         public int setBytes(int index, ScatteringByteChannel in, int length)
1715                 throws IOException {
1716             try {
1717                 return in.read(internalNioBuffer(index, length).duplicate());
1718             } catch (ClosedChannelException ignored) {
1719                 return -1;
1720             }
1721         }
1722 
1723         @Override
1724         public int setBytes(int index, FileChannel in, long position, int length)
1725                 throws IOException {
1726             try {
1727                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1728             } catch (ClosedChannelException ignored) {
1729                 return -1;
1730             }
1731         }
1732 
1733         @Override
1734         public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1735             return setCharSequence0(index, sequence, charset, false);
1736         }
1737 
1738         private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
1739             if (charset.equals(CharsetUtil.UTF_8)) {
1740                 int length = ByteBufUtil.utf8MaxBytes(sequence);
1741                 if (expand) {
1742                     ensureWritable0(length);
1743                     checkIndex0(index, length);
1744                 } else {
1745                     checkIndex(index, length);
1746                 }
1747                 // Directly pass in the rootParent() with the adjusted index
1748                 return ByteBufUtil.writeUtf8(rootParent(), idx(index), length, sequence, sequence.length());
1749             }
1750             if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
1751                 int length = sequence.length();
1752                 if (expand) {
1753                     ensureWritable0(length);
1754                     checkIndex0(index, length);
1755                 } else {
1756                     checkIndex(index, length);
1757                 }
1758                 // Directly pass in the rootParent() with the adjusted index
1759                 return ByteBufUtil.writeAscii(rootParent(), idx(index), sequence, length);
1760             }
1761             byte[] bytes = sequence.toString().getBytes(charset);
1762             if (expand) {
1763                 ensureWritable0(bytes.length);
1764                 // setBytes(...) will take care of checking the indices.
1765             }
1766             setBytes(index, bytes);
1767             return bytes.length;
1768         }
1769 
1770         @Override
1771         public int writeCharSequence(CharSequence sequence, Charset charset) {
1772             int written = setCharSequence0(writerIndex, sequence, charset, true);
1773             writerIndex += written;
1774             return written;
1775         }
1776 
1777         @Override
1778         public int forEachByte(int index, int length, ByteProcessor processor) {
1779             checkIndex(index, length);
1780             int ret = rootParent().forEachByte(idx(index), length, processor);
1781             return forEachResult(ret);
1782         }
1783 
1784         @Override
1785         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1786             checkIndex(index, length);
1787             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1788             return forEachResult(ret);
1789         }
1790 
1791         @Override
1792         public ByteBuf setZero(int index, int length) {
1793             checkIndex(index, length);
1794             rootParent().setZero(idx(index), length);
1795             return this;
1796         }
1797 
1798         @Override
1799         public ByteBuf writeZero(int length) {
1800             ensureWritable(length);
1801             rootParent().setZero(idx(writerIndex), length);
1802             writerIndex += length;
1803             return this;
1804         }
1805 
1806         private int forEachResult(int ret) {
1807             if (ret < adjustment) {
1808                 return -1;
1809             }
1810             return ret - adjustment;
1811         }
1812 
1813         @Override
1814         public boolean isContiguous() {
1815             return rootParent().isContiguous();
1816         }
1817 
1818         private int idx(int index) {
1819             return index + adjustment;
1820         }
1821 
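        // Returns this buffer's segment to the chunk and recycles the wrapper through the object pool.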
1822         @Override
1823         protected void deallocate() {
1824             if (chunk != null) {
1825                 chunk.releaseSegment(segmentId);
1826             }
1827             tmpNioBuf = null;
1828             chunk = null;
1829             rootParent = null;
1830             if (handle instanceof EnhancedHandle) {
1831                 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1832                 enhancedHandle.unguardedRecycle(this);
1833             } else {
1834                 handle.recycle(this);
1835             }
1836         }
1837     }
1838 
1839     /**
1840      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
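     * <p>
     * For illustration only, a heap-backed chunk allocator could be sketched as a lambda over unpooled
     * heap buffers (any {@link AbstractByteBuf} source will do):
     * <pre>{@code
     * ChunkAllocator heapChunkAllocator = (initialCapacity, maxCapacity) ->
     *         new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, initialCapacity, maxCapacity);
     * }</pre>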
1841      */
1842     interface ChunkAllocator {
1843         /**
1844          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1845          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1846          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1847          * @return The buffer that represents the chunk memory.
1848          */
1849         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1850     }
1851 }