View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.CharsetUtil;
20  import io.netty.util.IllegalReferenceCountException;
21  import io.netty.util.IntSupplier;
22  import io.netty.util.NettyRuntime;
23  import io.netty.util.Recycler.EnhancedHandle;
24  import io.netty.util.ReferenceCounted;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.concurrent.FastThreadLocalThread;
27  import io.netty.util.concurrent.MpscAtomicIntegerArrayQueue;
28  import io.netty.util.concurrent.MpscIntQueue;
29  import io.netty.util.internal.ObjectPool;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PlatformDependent;
32  import io.netty.util.internal.ReferenceCountUpdater;
33  import io.netty.util.internal.SuppressJava6Requirement;
34  import io.netty.util.internal.SystemPropertyUtil;
35  import io.netty.util.internal.ThreadExecutorMap;
36  import io.netty.util.internal.ThreadLocalRandom;
37  import io.netty.util.internal.UnstableApi;
38  
39  import java.io.IOException;
40  import java.io.InputStream;
41  import java.io.OutputStream;
42  import java.nio.ByteBuffer;
43  import java.nio.ByteOrder;
44  import java.nio.channels.ClosedChannelException;
45  import java.nio.channels.FileChannel;
46  import java.nio.channels.GatheringByteChannel;
47  import java.nio.channels.ScatteringByteChannel;
48  import java.nio.charset.Charset;
49  import java.util.Arrays;
50  import java.util.Queue;
51  import java.util.concurrent.ConcurrentLinkedQueue;
52  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
54  import java.util.concurrent.atomic.LongAdder;
55  import java.util.concurrent.locks.StampedLock;
56  
57  /**
58   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
59   * <p>
60   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
61   * from.
62   * <p>
63   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
64   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
65   * If contention is detected above a certain threshold, the number of magazines are increased in response to the
66   * contention.
67   * <p>
68   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
69   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
70   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
71   * <p>
72   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
73   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
74   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
75   * <p>
76   * This allows the allocator to quickly respond to changes in the application workload,
77   * without suffering undue overhead from maintaining its statistics.
78   * <p>
79   * Since magazines are "relatively thread-local", the allocator has a central queue that allow excess chunks from any
80   * magazine, to be shared with other magazines.
81   * The queue is created by the {@link #createSharedChunkQueue()} method.
82   */
83  @SuppressJava6Requirement(reason = "Guarded by version check")
84  @UnstableApi
85  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
86      /**
87       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
88       * allocations. For instance, glibc will do this.
89       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
90       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
91       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
92       */
93      private static final int MIN_CHUNK_SIZE = 128 * 1024;
        // Bounds the number of probe-and-expand rounds a shared MagazineGroup performs for one allocation.
94      private static final int EXPANSION_ATTEMPTS = 3;
        // Number of magazines a shared (non-thread-local) MagazineGroup starts out with.
95      private static final int INITIAL_MAGAZINES = 1;
        // NOTE(review): not referenced in the visible part of this file; meaning to be confirmed at its use site.
96      private static final int RETIRE_CAPACITY = 256;
        // Once a group's magazine array has at least this many entries it is no longer expanded.
97      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
98      private static final int BUFS_PER_CHUNK = 8; // For large buffers, aim to have about this many buffers per chunk.
99  
100     /**
101      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
102      * <p>
103      * This number is 8 MiB, and is derived from the limitations of internal histograms.
104      */
105     private static final int MAX_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MiB.
        // Requests larger than this bypass the magazine groups and are served by a one-off chunk.
106     private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
107 
108     /**
109      * The capacity of the chunk reuse queues, that allow chunks to be shared across magazines in a group.
110      * The default size is twice {@link NettyRuntime#availableProcessors()},
111      * same as the maximum number of magazines per magazine group.
112      */
113     private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
114             "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));
115 
116     /**
117      * The capacity of the magazine local buffer queue. This queue just pools the outer ByteBuf instance and not
118      * the actual memory and so helps to reduce GC pressure.
119      */
120     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
121             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
122 
123     /**
124      * The size classes are chosen based on the following observation:
125      * <p>
126      * Most allocations, particularly ones above 256 bytes, aim to be a power-of-2. However, many use cases, such
127      * as framing protocols, are themselves operating or moving power-of-2 sized payloads, to which they add a
128      * small amount of overhead, such as headers or checksums.
129      * This means we seem to get a lot of mileage out of having both power-of-2 sizes, and power-of-2-plus-a-bit.
130      * <p>
131      * On the conflicting requirements of both having as few chunks as possible, and having as little wasted
132      * memory within each chunk as possible, this seems to strike a surprisingly good balance for the use cases
133      * tested so far.
134      */
135     private static final int[] SIZE_CLASSES = {
136             32,
137             64,
138             128,
139             256,
140             512,
141             640, // 512 + 128
142             1024,
143             1152, // 1024 + 128
144             2048,
145             2304, // 2048 + 256
146             4096,
147             4352, // 4096 + 256
148             8192,
149             8704, // 8192 + 512
150             16384,
151             16896, // 16384 + 512
152     };
        // Predicate that answers true for every chunk size; used for the one-off chunks of the fallback path.
153     private static final ChunkReleasePredicate CHUNK_RELEASE_ALWAYS = new ChunkReleasePredicate() {
154         @Override
155         public boolean shouldReleaseChunk(int chunkSize) {
156             return true;
157         }
158     };
        // Predicate that answers false for every chunk size.
159     private static final ChunkReleasePredicate CHUNK_RELEASE_NEVER = new ChunkReleasePredicate() {
160         @Override
161         public boolean shouldReleaseChunk(int chunkSize) {
162             return false;
163         }
164     };
165 
166     private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
        // Lookup table from size index (size rounded up to a multiple of 32, then divided by 32) to the index of
        // the matching entry in SIZE_CLASSES. It is filled in by the static initializer.
167     private static final byte[] SIZE_INDEXES = new byte[(SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32) + 1];
168 
169     static {
170         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
171             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
172                     + " (expected: >= " + 2 + ')');
173         }
174         int lastIndex = 0;
175         for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
176             int sizeClass = SIZE_CLASSES[i];
177             //noinspection ConstantValue
178             assert (sizeClass & 5) == 0 : "Size class must be a multiple of 32";
179             int sizeIndex = sizeIndexOf(sizeClass);
180             Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
181             lastIndex = sizeIndex;
182         }
183     }
184 
        // Source of the backing memory for every chunk created by this allocator.
185     private final ChunkAllocator chunkAllocator;
        // Registry that every created chunk is added to; usedMemory() reports its total capacity.
186     private final ChunkRegistry chunkRegistry;
        // Shared groups, one per entry of SIZE_CLASSES, used when no thread-local groups are available.
187     private final MagazineGroup[] sizeClassedMagazineGroups;
        // Shared group for pooled requests that do not fit any size class.
188     private final MagazineGroup largeBufferMagazineGroup;
        // Per-thread size-classed groups; the initial value may be null when the cache is not used for the thread.
189     private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
190 
191     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, final boolean useCacheForNonEventLoopThreads) {
192         this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
193         chunkRegistry = new ChunkRegistry();
194         sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
195         largeBufferMagazineGroup = new MagazineGroup(
196                 this, chunkAllocator, new HistogramChunkControllerFactory(true), false);
197         threadLocalGroup = new FastThreadLocal<MagazineGroup[]>() {
198             @Override
199             protected MagazineGroup[] initialValue() {
200                 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
201                     return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
202                 }
203                 return null;
204             }
205 
206             @Override
207             protected void onRemoval(final MagazineGroup[] groups) throws Exception {
208                 if (groups != null) {
209                     for (MagazineGroup group : groups) {
210                         group.free();
211                     }
212                 }
213             }
214         };
215     }
216 
217     private static MagazineGroup[] createMagazineGroupSizeClasses(
218             AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
219         MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
220         for (int i = 0; i < SIZE_CLASSES.length; i++) {
221             int segmentSize = SIZE_CLASSES[i];
222             groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
223                     new SizeClassChunkControllerFactory(segmentSize), isThreadLocal);
224         }
225         return groups;
226     }
227 
228     /**
229      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
230      * internal Magazines.
231      * <p>
232      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
233      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
234      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
235      * allocate new chunks when their current and next-in-line chunks have both been used up.
236      * <p>
237      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
238      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
239      * queue.
240      * <p>
241      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
242      * queue to limit the maximum memory usage.
243      * <p>
244      * The default implementation will create a bounded queue with a capacity of {@link #CHUNK_REUSE_QUEUE}.
245      *
246      * @return A new multi-producer, multi-consumer queue.
247      */
248     private static Queue<Chunk> createSharedChunkQueue() {
249         return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
250     }
251 
252     @Override
253     public ByteBuf allocate(int size, int maxCapacity) {
254         return allocate(size, maxCapacity, Thread.currentThread(), null);
255     }
256 
257     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
258         AdaptiveByteBuf allocated = null;
259         if (size <= MAX_POOLED_BUF_SIZE) {
260             final int index = sizeClassIndexOf(size);
261             MagazineGroup[] magazineGroups;
262             if (!FastThreadLocalThread.willCleanupFastThreadLocals(currentThread) ||
263                     (magazineGroups = threadLocalGroup.get()) == null) {
264                 magazineGroups =  sizeClassedMagazineGroups;
265             }
266             if (index < magazineGroups.length) {
267                 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
268             } else {
269                 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
270             }
271         }
272         if (allocated == null) {
273             allocated = allocateFallback(size, maxCapacity, currentThread, buf);
274         }
275         return allocated;
276     }
277 
278     private static int sizeIndexOf(final int size) {
279         // this is aligning the size to the next multiple of 32 and dividing by 32 to get the size index.
280         return size + 31 >> 5;
281     }
282 
283     static int sizeClassIndexOf(int size) {
284         int sizeIndex = sizeIndexOf(size);
285         if (sizeIndex < SIZE_INDEXES.length) {
286             return SIZE_INDEXES[sizeIndex];
287         }
288         return SIZE_CLASSES_COUNT;
289     }
290 
291     static int[] getSizeClasses() {
292         return SIZE_CLASSES.clone();
293     }
294 
295     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread,
296                                              AdaptiveByteBuf buf) {
297         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
298         Magazine magazine;
299         if (buf != null) {
300             Chunk chunk = buf.chunk;
301             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
302                 magazine = getFallbackMagazine(currentThread);
303             }
304         } else {
305             magazine = getFallbackMagazine(currentThread);
306             buf = magazine.newBuffer();
307         }
308         // Create a one-off chunk for this allocation.
309         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
310         Chunk chunk = new Chunk(innerChunk, magazine, false, CHUNK_RELEASE_ALWAYS);
311         chunkRegistry.add(chunk);
312         try {
313             chunk.readInitInto(buf, size, size, maxCapacity);
314         } finally {
315             // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
316             // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
317             // completely release the Chunk and so the contained innerChunk.
318             chunk.release();
319         }
320         return buf;
321     }
322 
323     private Magazine getFallbackMagazine(Thread currentThread) {
324         Magazine[] mags = largeBufferMagazineGroup.magazines;
325         return mags[(int) currentThread.getId() & mags.length - 1];
326     }
327 
328     /**
329      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
330      */
331     void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
332         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
333         assert result == into: "Re-allocation created separate buffer instance";
334     }
335 
336     @Override
337     public long usedMemory() {
338         return chunkRegistry.totalCapacity();
339     }
340 
341     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
342     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
343     // very confusing for users.
344     @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
345     @Override
346     protected void finalize() throws Throwable {
347         try {
348             super.finalize();
349         } finally {
                // Run the allocator's own cleanup even when super.finalize() throws.
350             free();
351         }
352     }
353 
354     private void free() {
355         largeBufferMagazineGroup.free();
356     }
357 
358     static int sizeToBucket(int size) {
359         return HistogramChunkController.sizeToBucket(size);
360     }
361 
362     @SuppressJava6Requirement(reason = "Guarded by version check")
363     private static final class MagazineGroup {
364         private final AdaptivePoolingAllocator allocator;
365         private final ChunkAllocator chunkAllocator;
366         private final ChunkControllerFactory chunkControllerFactory;
367         private final Queue<Chunk> chunkReuseQueue;
368         private final StampedLock magazineExpandLock;
369         private final Magazine threadLocalMagazine;
370         private volatile Magazine[] magazines;
371         private volatile boolean freed;
372 
373         MagazineGroup(AdaptivePoolingAllocator allocator,
374                       ChunkAllocator chunkAllocator,
375                       ChunkControllerFactory chunkControllerFactory,
376                       boolean isThreadLocal) {
377             this.allocator = allocator;
378             this.chunkAllocator = chunkAllocator;
379             this.chunkControllerFactory = chunkControllerFactory;
380             chunkReuseQueue = createSharedChunkQueue();
381             if (isThreadLocal) {
382                 magazineExpandLock = null;
383                 threadLocalMagazine = new Magazine(this, false, chunkReuseQueue, chunkControllerFactory.create(this));
384             } else {
385                 magazineExpandLock = new StampedLock();
386                 threadLocalMagazine = null;
387                 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
388                 for (int i = 0; i < mags.length; i++) {
389                     mags[i] = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
390                 }
391                 magazines = mags;
392             }
393         }
394 
395         public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
396             boolean reallocate = buf != null;
397 
398             // Path for thread-local allocation.
399             Magazine tlMag = threadLocalMagazine;
400             if (tlMag != null) {
401                 if (buf == null) {
402                     buf = tlMag.newBuffer();
403                 }
404                 boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
405                 assert allocated : "Allocation of threadLocalMagazine must always succeed";
406                 return buf;
407             }
408 
409             // Path for concurrent allocation.
410             long threadId = currentThread.getId();
411             Magazine[] mags;
412             int expansions = 0;
413             do {
414                 mags = magazines;
415                 int mask = mags.length - 1;
416                 int index = (int) (threadId & mask);
417                 for (int i = 0, m = mags.length << 1; i < m; i++) {
418                     Magazine mag = mags[index + i & mask];
419                     if (buf == null) {
420                         buf = mag.newBuffer();
421                     }
422                     if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
423                         // Was able to allocate.
424                         return buf;
425                     }
426                 }
427                 expansions++;
428             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
429 
430             // The magazines failed us; contention too high and we don't want to spend more effort expanding the array.
431             if (!reallocate && buf != null) {
432                 buf.release(); // Release the previously claimed buffer before we return.
433             }
434             return null;
435         }
436 
437         private boolean tryExpandMagazines(int currentLength) {
438             if (currentLength >= MAX_STRIPES) {
439                 return true;
440             }
441             final Magazine[] mags;
442             long writeLock = magazineExpandLock.tryWriteLock();
443             if (writeLock != 0) {
444                 try {
445                     mags = magazines;
446                     if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
447                         return true;
448                     }
449                     Magazine firstMagazine = mags[0];
450                     Magazine[] expanded = new Magazine[mags.length * 2];
451                     for (int i = 0, l = expanded.length; i < l; i++) {
452                         Magazine m = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
453                         firstMagazine.initializeSharedStateIn(m);
454                         expanded[i] = m;
455                     }
456                     magazines = expanded;
457                 } finally {
458                     magazineExpandLock.unlockWrite(writeLock);
459                 }
460                 for (Magazine magazine : mags) {
461                     magazine.free();
462                 }
463             }
464             return true;
465         }
466 
467         boolean offerToQueue(Chunk buffer) {
468             if (freed) {
469                 return false;
470             }
471 
472             boolean isAdded = chunkReuseQueue.offer(buffer);
473             if (freed && isAdded) {
474                 // Help to free the reuse queue.
475                 freeChunkReuseQueue();
476             }
477             return isAdded;
478         }
479 
480         private void free() {
481             freed = true;
482             if (threadLocalMagazine != null) {
483                 threadLocalMagazine.free();
484             } else {
485                 long stamp = magazineExpandLock.writeLock();
486                 try {
487                     Magazine[] mags = magazines;
488                     for (Magazine magazine : mags) {
489                         magazine.free();
490                     }
491                 } finally {
492                     magazineExpandLock.unlockWrite(stamp);
493                 }
494             }
495             freeChunkReuseQueue();
496         }
497 
498         private void freeChunkReuseQueue() {
499             for (;;) {
500                 Chunk chunk = chunkReuseQueue.poll();
501                 if (chunk == null) {
502                     break;
503                 }
504                 chunk.release();
505             }
506         }
507     }
508 
        /**
         * Creates the {@link ChunkController} instances used by the magazines of a {@link MagazineGroup}.
         */
509     private interface ChunkControllerFactory {
            /**
             * Create a new controller for the given group.
             */
510         ChunkController create(MagazineGroup group);
511     }
512 
        /**
         * Decides the capacity of allocated buffers and creates new chunks for a {@link Magazine}.
         */
513     private interface ChunkController {
514         /**
515          * Compute the "fast max capacity" value for the buffer.
516          */
517         int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
518 
519         /**
520          * Initialize the given chunk controller with shared statistics state (if any) from this controller.
521          */
522         void initializeSharedStateIn(ChunkController chunkController);
523 
524         /**
525          * Allocate a new {@link Chunk} for the given {@link Magazine}.
526          */
527         Chunk newChunkAllocation(int promptingSize, Magazine magazine);
528     }
529 
        /**
         * Answers whether a chunk of a given size should be released.
         */
530     private interface ChunkReleasePredicate {
            /**
             * @param chunkSize the size of the chunk in question.
             * @return {@code true} if the chunk should be released.
             */
531         boolean shouldReleaseChunk(int chunkSize);
532     }
533 
534     private static final class SizeClassChunkControllerFactory implements ChunkControllerFactory {
535         // To amortize activation/deactivation of chunks, we should have a minimum number of segments per chunk.
536         // We choose 32 because it seems neither too small nor too big.
537         // For segments of 16 KiB, the chunks will be half a megabyte.
538         private static final int MIN_SEGMENTS_PER_CHUNK = 32;
539         private final int segmentSize;
540         private final int chunkSize;
541         private final int[] segmentOffsets;
542 
543         private SizeClassChunkControllerFactory(int segmentSize) {
544             this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
545             chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
546             int segmentsCount = chunkSize / segmentSize;
547             segmentOffsets = new int[segmentsCount];
548             for (int i = 0; i < segmentsCount; i++) {
549                 segmentOffsets[i] = i * segmentSize;
550             }
551         }
552 
553         @Override
554         public ChunkController create(MagazineGroup group) {
555             return new SizeClassChunkController(group, segmentSize, chunkSize, segmentOffsets);
556         }
557     }
558 
559     private static final class SizeClassChunkController implements ChunkController {
560 
561         private static final ChunkReleasePredicate FALSE_PREDICATE = new ChunkReleasePredicate() {
562             @Override
563             public boolean shouldReleaseChunk(int chunkSize) {
564                 return false;
565             }
566         };
567         private final ChunkAllocator chunkAllocator;
568         private final int segmentSize;
569         private final int chunkSize;
570         private final ChunkRegistry chunkRegistry;
571         private final int[] segmentOffsets;
572 
573         private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize, int[] segmentOffsets) {
574             chunkAllocator = group.chunkAllocator;
575             this.segmentSize = segmentSize;
576             this.chunkSize = chunkSize;
577             chunkRegistry = group.allocator.chunkRegistry;
578             this.segmentOffsets = segmentOffsets;
579         }
580 
581         @Override
582         public int computeBufferCapacity(
583                 int requestedSize, int maxCapacity, boolean isReallocation) {
584             return Math.min(segmentSize, maxCapacity);
585         }
586 
587         @Override
588         public void initializeSharedStateIn(ChunkController chunkController) {
589             // NOOP
590         }
591 
592         @Override
593         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
594             AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
595             assert chunkBuffer.capacity() == chunkSize;
596             SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, true,
597                     segmentSize, segmentOffsets, FALSE_PREDICATE);
598             chunkRegistry.add(chunk);
599             return chunk;
600         }
601     }
602 
603     private static final class HistogramChunkControllerFactory implements ChunkControllerFactory {
604         private final boolean shareable;
605 
606         private HistogramChunkControllerFactory(boolean shareable) {
607             this.shareable = shareable;
608         }
609 
610         @Override
611         public ChunkController create(MagazineGroup group) {
612             return new HistogramChunkController(group, shareable);
613         }
614     }
615 
616     private static final class HistogramChunkController implements ChunkController, ChunkReleasePredicate {
            // NOTE(review): MIN_DATUM_TARGET and MAX_DATUM_TARGET are not referenced in the visible part of this
            // class; presumably they bound datumTarget. Confirm against the rest of rotateHistograms().
617         private static final int MIN_DATUM_TARGET = 1024;
618         private static final int MAX_DATUM_TARGET = 65534;
            // Initial value of datumTarget.
619         private static final int INIT_DATUM_TARGET = 9;
620         private static final int HISTO_BUCKET_COUNT = 16;
            // Bucket boundaries in bytes, in ascending order; sizeToBucket(int) binary-searches this array.
621         private static final int[] HISTO_BUCKETS = {
622                 16 * 1024,
623                 24 * 1024,
624                 32 * 1024,
625                 48 * 1024,
626                 64 * 1024,
627                 96 * 1024,
628                 128 * 1024,
629                 192 * 1024,
630                 256 * 1024,
631                 384 * 1024,
632                 512 * 1024,
633                 768 * 1024,
634                 1024 * 1024,
635                 1792 * 1024,
636                 2048 * 1024,
637                 3072 * 1024
638         };
639 
640         private final MagazineGroup group;
641         private final boolean shareable;
            // Four histograms of HISTO_BUCKET_COUNT counters each; rotateHistograms() adds all four together,
            // reading every counter as an unsigned 16-bit value.
642         private final short[][] histos = {
643                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
644                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
645         };
646         private final ChunkRegistry chunkRegistry;
            // The histogram that recordAllocationSize(int) currently increments.
647         private short[] histo = histos[0];
            // Scratch array for the per-bucket totals computed by rotateHistograms().
648         private final int[] sums = new int[HISTO_BUCKET_COUNT];
649 
650         private int histoIndex;
            // Incremented for every recorded allocation; rotateHistograms() runs when it equals datumTarget.
651         private int datumCount;
652         private int datumTarget = INIT_DATUM_TARGET;
653         private boolean hasHadRotation;
654         private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
655         private volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
            // Read by computeBufferCapacity(...) to predict the starting capacity of a buffer.
656         private volatile int localUpperBufSize;
656         private volatile int localUpperBufSize;
657 
658         private HistogramChunkController(MagazineGroup group, boolean shareable) {
659             this.group = group;
660             this.shareable = shareable;
661             chunkRegistry = group.allocator.chunkRegistry;
662         }
663 
664         @Override
665         public int computeBufferCapacity(
666                 int requestedSize, int maxCapacity, boolean isReallocation) {
667             if (!isReallocation) {
668                 // Only record allocation size if it's not caused by a reallocation that was triggered by capacity
669                 // change of the buffer.
670                 recordAllocationSize(requestedSize);
671             }
672 
673             // Predict starting capacity from localUpperBufSize, but place limits on the max starting capacity
674             // based on the requested size, because localUpperBufSize can potentially be quite large.
675             int startCapLimits;
676             if (requestedSize <= 32768) { // Less than or equal to 32 KiB.
677                 startCapLimits = 65536; // Use at most 64 KiB, which is also the AdaptiveRecvByteBufAllocator max.
678             } else {
679                 startCapLimits = requestedSize * 2; // Otherwise use at most twice the requested memory.
680             }
681             int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
682             startingCapacity = Math.max(requestedSize, Math.min(maxCapacity, startingCapacity));
683             return startingCapacity;
684         }
685 
686         private void recordAllocationSize(int bufferSizeToRecord) {
687             // Use the preserved size from the reused AdaptiveByteBuf, if available.
688             // Otherwise, use the requested buffer size.
689             // This way, we better take into account
690             if (bufferSizeToRecord == 0) {
691                 return;
692             }
693             int bucket = sizeToBucket(bufferSizeToRecord);
694             histo[bucket]++;
695             if (datumCount++ == datumTarget) {
696                 rotateHistograms();
697             }
698         }
699 
700         static int sizeToBucket(int size) {
701             int index = binarySearchInsertionPoint(Arrays.binarySearch(HISTO_BUCKETS, size));
702             return index >= HISTO_BUCKETS.length ? HISTO_BUCKETS.length - 1 : index;
703         }
704 
705         private static int binarySearchInsertionPoint(int index) {
706             if (index < 0) {
707                 index = -(index + 1);
708             }
709             return index;
710         }
711 
        /**
         * Looks up the buffer size stored in {@code HISTO_BUCKETS} for a bucket index.
         *
         * @param sizeBucket index into {@code HISTO_BUCKETS}; no range check is done here.
         * @return the entry of {@code HISTO_BUCKETS} at that index.
         */
        static int bucketToSize(int sizeBucket) {
            return HISTO_BUCKETS[sizeBucket];
        }
715 
716         private void rotateHistograms() {
717             short[][] hs = histos;
718             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
719                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
720             }
721             int sum = 0;
722             for (int count : sums) {
723                 sum  += count;
724             }
725             int targetPercentile = (int) (sum * 0.99);
726             int sizeBucket = 0;
727             for (; sizeBucket < sums.length; sizeBucket++) {
728                 if (sums[sizeBucket] > targetPercentile) {
729                     break;
730                 }
731                 targetPercentile -= sums[sizeBucket];
732             }
733             hasHadRotation = true;
734             int percentileSize = bucketToSize(sizeBucket);
735             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
736             localUpperBufSize = percentileSize;
737             localPrefChunkSize = prefChunkSize;
738             if (shareable) {
739                 for (Magazine mag : group.magazines) {
740                     HistogramChunkController statistics = (HistogramChunkController) mag.chunkController;
741                     prefChunkSize = Math.max(prefChunkSize, statistics.localPrefChunkSize);
742                 }
743             }
744             if (sharedPrefChunkSize != prefChunkSize) {
745                 // Preferred chunk size changed. Increase check frequency.
746                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
747                 sharedPrefChunkSize = prefChunkSize;
748             } else {
749                 // Preferred chunk size did not change. Check less often.
750                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
751             }
752 
753             histoIndex = histoIndex + 1 & 3;
754             histo = histos[histoIndex];
755             datumCount = 0;
756             Arrays.fill(histo, (short) 0);
757         }
758 
        /**
         * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
         * allocation sizes.
         * <p>
         * This method must be thread-safe. It consists of a single read of the volatile
         * {@code sharedPrefChunkSize} field.
         *
         * @return The currently preferred chunk allocation size.
         */
        int preferredChunkSize() {
            return sharedPrefChunkSize;
        }
770 
771         @Override
772         public void initializeSharedStateIn(ChunkController chunkController) {
773             HistogramChunkController statistics = (HistogramChunkController) chunkController;
774             int sharedPrefChunkSize = this.sharedPrefChunkSize;
775             statistics.localPrefChunkSize = sharedPrefChunkSize;
776             statistics.sharedPrefChunkSize = sharedPrefChunkSize;
777         }
778 
779         @Override
780         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
781             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
782             int minChunks = size / MIN_CHUNK_SIZE;
783             if (MIN_CHUNK_SIZE * minChunks < size) {
784                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
785                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
786                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
787                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
788                 size = MIN_CHUNK_SIZE * (1 + minChunks);
789             }
790 
791             // Limit chunks to the max size, even if the histogram suggests to go above it.
792             size = Math.min(size, MAX_CHUNK_SIZE);
793 
794             // If we haven't rotated the histogram yet, optimisticly record this chunk size as our preferred.
795             if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
796                 sharedPrefChunkSize = size;
797             }
798 
799             ChunkAllocator chunkAllocator = group.chunkAllocator;
800             Chunk chunk = new Chunk(chunkAllocator.allocate(size, size), magazine, true, this);
801             chunkRegistry.add(chunk);
802             return chunk;
803         }
804 
805         @Override
806         public boolean shouldReleaseChunk(int chunkSize) {
807             int preferredSize = preferredChunkSize();
808             int givenChunks = chunkSize / MIN_CHUNK_SIZE;
809             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
810             int deviation = Math.abs(givenChunks - preferredChunks);
811 
812             // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
813             return deviation != 0 &&
814                     ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
815         }
816     }
817 
818     @SuppressJava6Requirement(reason = "Guarded by version check")
819     private static final class Magazine {
        // Atomic access to the nextInLine field.
        private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
        static {
            NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
        }
        // Sentinel stored in nextInLine once the magazine has been freed; it is only ever compared by identity.
        private static final Chunk MAGAZINE_FREED = new Chunk();

        // Source of AdaptiveByteBuf instances for magazines that have no handle of their own (see newBuffer()).
        private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
                new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
                    @Override
                    public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
                        return new AdaptiveByteBuf(handle);
                    }
                });

        // The chunk allocations are currently served from. Accessed from allocate(...) and free().
        private Chunk current;
        // A spare chunk kept for later use, or MAGAZINE_FREED.
        @SuppressWarnings("unused") // updated via NEXT_IN_LINE
        private volatile Chunk nextInLine;
        private final MagazineGroup group;
        private final ChunkController chunkController;
        // The three fields below are null when the magazine is not shared across threads.
        private final StampedLock allocationLock;
        private final Queue<AdaptiveByteBuf> bufferQueue;
        private final ObjectPool.Handle<AdaptiveByteBuf> handle;
        // Queue that chunks are polled from when neither current nor nextInLine can serve an allocation.
        private final Queue<Chunk> sharedChunkQueue;
843 
844         Magazine(MagazineGroup group, boolean shareable, Queue<Chunk> sharedChunkQueue,
845                  ChunkController chunkController) {
846             this.group = group;
847             this.chunkController = chunkController;
848 
849             if (shareable) {
850                 // We only need the StampedLock if this Magazine will be shared across threads.
851                 allocationLock = new StampedLock();
852                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
853                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
854                     @Override
855                     public void recycle(AdaptiveByteBuf self) {
856                         bufferQueue.offer(self);
857                     }
858                 };
859             } else {
860                 allocationLock = null;
861                 bufferQueue = null;
862                 handle = null;
863             }
864             this.sharedChunkQueue = sharedChunkQueue;
865         }
866 
867         public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
868             if (allocationLock == null) {
869                 // This magazine is not shared across threads, just allocate directly.
870                 return allocate(size, maxCapacity, buf, reallocate);
871             }
872 
873             // Try to retrieve the lock and if successful allocate.
874             long writeLock = allocationLock.tryWriteLock();
875             if (writeLock != 0) {
876                 try {
877                     return allocate(size, maxCapacity, buf, reallocate);
878                 } finally {
879                     allocationLock.unlockWrite(writeLock);
880                 }
881             }
882             return allocateWithoutLock(size, maxCapacity, buf);
883         }
884 
885         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
886             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
887             if (curr == MAGAZINE_FREED) {
888                 // Allocation raced with a stripe-resize that freed this magazine.
889                 restoreMagazineFreed();
890                 return false;
891             }
892             if (curr == null) {
893                 curr = sharedChunkQueue.poll();
894                 if (curr == null) {
895                     return false;
896                 }
897                 curr.attachToMagazine(this);
898             }
899             boolean allocated = false;
900             int remainingCapacity = curr.remainingCapacity();
901             int startingCapacity = chunkController.computeBufferCapacity(
902                     size, maxCapacity, true /* never update stats as we don't hold the magazine lock */);
903             if (remainingCapacity >= size) {
904                 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
905                 allocated = true;
906             }
907             try {
908                 if (remainingCapacity >= RETIRE_CAPACITY) {
909                     transferToNextInLineOrRelease(curr);
910                     curr = null;
911                 }
912             } finally {
913                 if (curr != null) {
914                     curr.releaseFromMagazine();
915                 }
916             }
917             return allocated;
918         }
919 
920         private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
921             int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
922             Chunk curr = current;
923             if (curr != null) {
924                 // We have a Chunk that has some space left.
925                 int remainingCapacity = curr.remainingCapacity();
926                 if (remainingCapacity > startingCapacity) {
927                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
928                     // We still have some bytes left that we can use for the next allocation, just early return.
929                     return true;
930                 }
931 
932                 // At this point we know that this will be the last time current will be used, so directly set it to
933                 // null and release it once we are done.
934                 current = null;
935                 if (remainingCapacity >= size) {
936                     try {
937                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
938                         return true;
939                     } finally {
940                         curr.releaseFromMagazine();
941                     }
942                 }
943 
944                 // Check if we either retain the chunk in the nextInLine cache or releasing it.
945                 if (remainingCapacity < RETIRE_CAPACITY) {
946                     curr.releaseFromMagazine();
947                 } else {
948                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
949                     // This method will release curr if this is not the case
950                     transferToNextInLineOrRelease(curr);
951                 }
952             }
953 
954             assert current == null;
955             // The fast-path for allocations did not work.
956             //
957             // Try to fetch the next "Magazine local" Chunk first, if this fails because we don't have a
958             // next-in-line chunk available, we will poll our centralQueue.
959             // If this fails as well we will just allocate a new Chunk.
960             //
961             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
962             // thus be "reserved" by this Magazine for exclusive usage.
963             curr = NEXT_IN_LINE.getAndSet(this, null);
964             if (curr != null) {
965                 if (curr == MAGAZINE_FREED) {
966                     // Allocation raced with a stripe-resize that freed this magazine.
967                     restoreMagazineFreed();
968                     return false;
969                 }
970 
971                 int remainingCapacity = curr.remainingCapacity();
972                 if (remainingCapacity > startingCapacity) {
973                     // We have a Chunk that has some space left.
974                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
975                     current = curr;
976                     return true;
977                 }
978 
979                 if (remainingCapacity >= size) {
980                     // At this point we know that this will be the last time curr will be used, so directly set it to
981                     // null and release it once we are done.
982                     try {
983                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
984                         return true;
985                     } finally {
986                         // Release in a finally block so even if readInitInto(...) would throw we would still correctly
987                         // release the current chunk before null it out.
988                         curr.releaseFromMagazine();
989                     }
990                 } else {
991                     // Release it as it's too small.
992                     curr.releaseFromMagazine();
993                 }
994             }
995 
996             // Now try to poll from the central queue first
997             curr = sharedChunkQueue.poll();
998             if (curr == null) {
999                 curr = chunkController.newChunkAllocation(size, this);
1000             } else {
1001                 curr.attachToMagazine(this);
1002 
1003                 int remainingCapacity = curr.remainingCapacity();
1004                 if (remainingCapacity == 0 || remainingCapacity < size) {
1005                     // Check if we either retain the chunk in the nextInLine cache or releasing it.
1006                     if (remainingCapacity < RETIRE_CAPACITY) {
1007                         curr.releaseFromMagazine();
1008                     } else {
1009                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
1010                         // This method will release curr if this is not the case
1011                         transferToNextInLineOrRelease(curr);
1012                     }
1013                     curr = chunkController.newChunkAllocation(size, this);
1014                 }
1015             }
1016 
1017             current = curr;
1018             try {
1019                 int remainingCapacity = curr.remainingCapacity();
1020                 assert remainingCapacity >= size;
1021                 if (remainingCapacity > startingCapacity) {
1022                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
1023                     curr = null;
1024                 } else {
1025                     curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
1026                 }
1027             } finally {
1028                 if (curr != null) {
1029                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
1030                     // release the current chunk before null it out.
1031                     curr.releaseFromMagazine();
1032                     current = null;
1033                 }
1034             }
1035             return true;
1036         }
1037 
1038         private void restoreMagazineFreed() {
1039             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
1040             if (next != null && next != MAGAZINE_FREED) {
1041                 // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
1042                 next.releaseFromMagazine();
1043             }
1044         }
1045 
        /**
         * Tries to keep the given chunk as the next-in-line chunk of this magazine, and releases it from the
         * magazine if that is not possible.
         *
         * @param chunk the chunk to store or release.
         */
        private void transferToNextInLineOrRelease(Chunk chunk) {
            // Fast path: the slot is empty.
            if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
                return;
            }

            // The slot is taken. Replace its occupant only if the given chunk has more capacity remaining,
            // and only if the occupant is still the same when the swap is attempted.
            Chunk nextChunk = NEXT_IN_LINE.get(this);
            if (nextChunk != null && nextChunk != MAGAZINE_FREED
                    && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
                if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
                    nextChunk.releaseFromMagazine();
                    return;
                }
            }
            // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
            // by some buffers and so is attached to a Magazine.
            // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
            // as last resort.
            chunk.releaseFromMagazine();
        }
1065 
        /**
         * Stores the given chunk in the next-in-line slot, but only if the slot is currently empty.
         *
         * @param chunk the chunk to store.
         * @return {@code true} if the chunk was stored, {@code false} if the slot held a chunk or the
         *         {@code MAGAZINE_FREED} marker.
         */
        boolean trySetNextInLine(Chunk chunk) {
            return NEXT_IN_LINE.compareAndSet(this, null, chunk);
        }
1069 
1070         void free() {
1071             // Release the current Chunk and the next that was stored for later usage.
1072             restoreMagazineFreed();
1073             long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
1074             try {
1075                 if (current != null) {
1076                     current.releaseFromMagazine();
1077                     current = null;
1078                 }
1079             } finally {
1080                 if (allocationLock != null) {
1081                     allocationLock.unlockWrite(stamp);
1082                 }
1083             }
1084         }
1085 
1086         public AdaptiveByteBuf newBuffer() {
1087             AdaptiveByteBuf buf;
1088             if (handle == null) {
1089                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
1090             } else {
1091                 buf = bufferQueue.poll();
1092                 if (buf == null) {
1093                     buf = new AdaptiveByteBuf(handle);
1094                 }
1095             }
1096             buf.resetRefCnt();
1097             buf.discardMarks();
1098             return buf;
1099         }
1100 
        /**
         * Offers a chunk to the queue of the group this magazine belongs to.
         *
         * @param chunk the chunk to offer.
         * @return whatever the group's {@code offerToQueue} returns; callers treat {@code false} as "not accepted".
         */
        boolean offerToQueue(Chunk chunk) {
            return group.offerToQueue(chunk);
        }
1104 
        /**
         * Lets this magazine's chunk controller initialize the shared state of another magazine's controller.
         *
         * @param other the magazine whose chunk controller is to be initialized.
         */
        public void initializeSharedStateIn(Magazine other) {
            chunkController.initializeSharedStateIn(other.chunkController);
        }
1108     }
1109 
1110     @SuppressJava6Requirement(reason = "Guarded by version check")
1111     private static final class ChunkRegistry {
1112         private final LongAdder totalCapacity = new LongAdder();
1113 
1114         public long totalCapacity() {
1115             return totalCapacity.sum();
1116         }
1117 
1118         public void add(Chunk chunk) {
1119             totalCapacity.add(chunk.capacity());
1120         }
1121 
1122         public void remove(Chunk chunk) {
1123             totalCapacity.add(-chunk.capacity());
1124         }
1125     }
1126 
1127     private static class Chunk implements ReferenceCounted {
        // Two ways of reaching the refCnt field, both handed to the ReferenceCountUpdater below.
        private static final long REFCNT_FIELD_OFFSET =
                ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
        private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");

        // The buffer that provides the memory of this chunk; null only for the MAGAZINE_FREED sentinel.
        protected final AbstractByteBuf delegate;
        // The magazine this chunk is currently attached to, or null when detached.
        protected Magazine magazine;
        private final AdaptivePoolingAllocator allocator;
        // Consulted in deallocate() to decide whether the chunk is dropped or reused.
        private final ChunkReleasePredicate chunkReleasePredicate;
        // Capacity of the delegate, captured at construction time.
        private final int capacity;
        // Chunks that are not pooled are always dropped when deallocated.
        private final boolean pooled;
        // Number of bytes handed out so far; remainingCapacity() is capacity minus this value.
        protected int allocatedBytes;

        private static final ReferenceCountUpdater<Chunk> updater =
                new ReferenceCountUpdater<Chunk>() {
                    @Override
                    protected AtomicIntegerFieldUpdater<Chunk> updater() {
                        return AIF_UPDATER;
                    }
                    @Override
                    protected long unsafeOffset() {
                        // on native image, REFCNT_FIELD_OFFSET can be recomputed even with Unsafe unavailable, so we
                        // need to guard here
                        return PlatformDependent.hasUnsafe() ? REFCNT_FIELD_OFFSET : -1;
                    }
                };

        // Value might not equal "real" reference count, all access should be via the updater
        @SuppressWarnings({"unused", "FieldMayBeFinal"})
        private volatile int refCnt;
1158 
1159         Chunk() {
1160             // Constructor only used by the MAGAZINE_FREED sentinel.
1161             delegate = null;
1162             magazine = null;
1163             allocator = null;
1164             chunkReleasePredicate = null;
1165             capacity = 0;
1166             pooled = false;
1167         }
1168 
1169         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled,
1170               ChunkReleasePredicate chunkReleasePredicate) {
1171             this.delegate = delegate;
1172             this.pooled = pooled;
1173             capacity = delegate.capacity();
1174             updater.setInitialValue(this);
1175             attachToMagazine(magazine);
1176 
1177             // We need the top-level allocator so ByteBuf.capacity(int) can call reallocate()
1178             allocator = magazine.group.allocator;
1179 
1180             this.chunkReleasePredicate = chunkReleasePredicate;
1181         }
1182 
        /**
         * @return the magazine this chunk is currently attached to, or {@code null} if it is detached.
         */
        Magazine currentMagazine()  {
            return magazine;
        }
1186 
1187         void detachFromMagazine() {
1188             if (magazine != null) {
1189                 magazine = null;
1190             }
1191         }
1192 
        /**
         * Attaches this chunk to the given magazine. The chunk is expected to be detached at this point, which
         * is checked by an assertion only.
         *
         * @param magazine the magazine to attach to.
         */
        void attachToMagazine(Magazine magazine) {
            assert this.magazine == null;
            this.magazine = magazine;
        }
1197 
        /**
         * Does nothing; the hint is ignored.
         *
         * @return this chunk.
         */
        @Override
        public Chunk touch(Object hint) {
            return this;
        }
1202 
        /**
         * @return the reference count as reported by the updater; the raw {@code refCnt} field is never read
         *         directly.
         */
        @Override
        public int refCnt() {
            return updater.refCnt(this);
        }
1207 
        /**
         * Retains this chunk once, through the updater.
         *
         * @return the result of the updater call, typed as {@code Chunk}.
         */
        @Override
        public Chunk retain() {
            return updater.retain(this);
        }
1212 
        /**
         * Retains this chunk by the given increment, through the updater.
         *
         * @param increment the amount passed to the updater.
         * @return the result of the updater call, typed as {@code Chunk}.
         */
        @Override
        public Chunk retain(int increment) {
            return updater.retain(this, increment);
        }
1217 
        /**
         * Does nothing.
         *
         * @return this chunk.
         */
        @Override
        public Chunk touch() {
            return this;
        }
1222 
1223         @Override
1224         public boolean release() {
1225             if (updater.release(this)) {
1226                 deallocate();
1227                 return true;
1228             }
1229             return false;
1230         }
1231 
1232         @Override
1233         public boolean release(int decrement) {
1234             if (updater.release(this, decrement)) {
1235                 deallocate();
1236                 return true;
1237             }
1238             return false;
1239         }
1240 
        /**
         * Called when a magazine is done using this chunk, probably because it was emptied.
         * In this class it is a plain {@link #release()}.
         *
         * @return the result of {@link #release()}.
         */
        boolean releaseFromMagazine() {
            return release();
        }
1247 
        /**
         * Called when a ByteBuf is done using its allocation in this chunk.
         * In this class it is a plain {@link #release()}.
         *
         * @param ignoredSegmentId not used by this implementation.
         * @return the result of {@link #release()}.
         */
        boolean releaseSegment(int ignoredSegmentId) {
            return release();
        }
1254 
        /**
         * Runs once the chunk has been fully released: the chunk is either dropped for good, or reset and
         * offered for reuse (first to the next-in-line slot of its magazine, then to the magazine's queue).
         */
        private void deallocate() {
            Magazine mag = magazine;
            int chunkSize = delegate.capacity();
            if (!pooled || chunkReleasePredicate.shouldReleaseChunk(chunkSize) || mag == null) {
                // Drop the chunk if it is not pooled, if the release predicate asks for it (for instance because
                // the chunk deviates too much from the preferred chunk size), or if it has no magazine anymore.
                detachFromMagazine();
                allocator.chunkRegistry.remove(this);
                delegate.release();
            } else {
                // Make the chunk usable again: fresh reference count, nothing allocated.
                updater.resetRefCnt(this);
                delegate.setIndex(0, 0);
                allocatedBytes = 0;
                if (!mag.trySetNextInLine(this)) {
                    // The next-in-line slot is taken, so this Chunk does not belong to the mag anymore.
                    // Detach it before offering it to the queue.
                    detachFromMagazine();
                    if (!mag.offerToQueue(this)) {
                        // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
                        // which did increase the reference count by 1.
                        boolean released = updater.release(this);
                        allocator.chunkRegistry.remove(this);
                        delegate.release();
                        assert released;
                    }
                }
            }
        }
1282 
1283         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1284             int startIndex = allocatedBytes;
1285             allocatedBytes = startIndex + startingCapacity;
1286             Chunk chunk = this;
1287             chunk.retain();
1288             try {
1289                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1290                 chunk = null;
1291             } finally {
1292                 if (chunk != null) {
1293                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
1294                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
1295                     // restore the old allocatedBytes value.
1296                     allocatedBytes = startIndex;
1297                     chunk.release();
1298                 }
1299             }
1300         }
1301 
        /**
         * @return the number of bytes of this chunk that have not been handed out yet, that is the capacity
         *         minus {@code allocatedBytes}.
         */
        public int remainingCapacity() {
            return capacity - allocatedBytes;
        }
1305 
        /**
         * @return the capacity recorded when the chunk was constructed.
         */
        public int capacity() {
            return capacity;
        }
1309     }
1310 
1311     private static final class SizeClassedChunk extends Chunk {
        // Value passed to the free list as its "empty" marker; readInitInto(...) treats a poll that returns it
        // as "no segment available".
        private static final int FREE_LIST_EMPTY = -1;
        // Size of each segment handed out by this chunk.
        private final int segmentSize;
        // Start offsets of the segments that are currently free.
        private final MpscIntQueue freeList;
1315 
1316         SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled, int segmentSize,
1317                          final int[] segmentOffsets, ChunkReleasePredicate shouldReleaseChunk) {
1318             super(delegate, magazine, pooled, shouldReleaseChunk);
1319             this.segmentSize = segmentSize;
1320             int segmentCount = segmentOffsets.length;
1321             assert delegate.capacity() / segmentSize == segmentCount;
1322             assert segmentCount > 0: "Chunk must have a positive number of segments";
1323             freeList = new MpscAtomicIntegerArrayQueue(segmentCount, FREE_LIST_EMPTY);
1324             freeList.fill(segmentCount, new IntSupplier() {
1325                 int counter;
1326                 @Override
1327                 public int get() {
1328                     return segmentOffsets[counter++];
1329                 }
1330             });
1331         }
1332 
1333         @Override
1334         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1335             int startIndex = freeList.poll();
1336             if (startIndex == FREE_LIST_EMPTY) {
1337                 throw new IllegalStateException("Free list is empty");
1338             }
1339             allocatedBytes += segmentSize;
1340             Chunk chunk = this;
1341             chunk.retain();
1342             try {
1343                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1344                 chunk = null;
1345             } finally {
1346                 if (chunk != null) {
1347                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
1348                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
1349                     // restore the old allocatedBytes value.
1350                     allocatedBytes -= segmentSize;
1351                     chunk.releaseSegment(startIndex);
1352                 }
1353             }
1354         }
1355 
1356         @Override
1357         public int remainingCapacity() {
1358             int remainingCapacity = super.remainingCapacity();
1359             if (remainingCapacity > segmentSize) {
1360                 return remainingCapacity;
1361             }
1362             int updatedRemainingCapacity = freeList.size() * segmentSize;
1363             if (updatedRemainingCapacity == remainingCapacity) {
1364                 return remainingCapacity;
1365             }
1366             // update allocatedBytes based on what's available in the free list
1367             allocatedBytes = capacity() - updatedRemainingCapacity;
1368             return updatedRemainingCapacity;
1369         }
1370 
1371         @Override
1372         boolean releaseFromMagazine() {
1373             // Size-classed chunks can be reused before they become empty.
1374             // We can therefor put them in the shared queue as soon as the magazine is done with this chunk.
1375             Magazine mag = magazine;
1376             detachFromMagazine();
1377             if (!mag.offerToQueue(this)) {
1378                 return super.releaseFromMagazine();
1379             }
1380             return false;
1381         }
1382 
1383         @Override
1384         boolean releaseSegment(int startIndex) {
1385             boolean released = release();
1386             boolean segmentReturned = freeList.offer(startIndex);
1387             assert segmentReturned: "Unable to return segment " + startIndex + " to free list";
1388             return released;
1389         }
1390     }
1391 
1392     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1393 
1394         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
1395 
1396         // this both act as adjustment and the start index for a free list segment allocation
1397         private int startIndex;
1398         private AbstractByteBuf rootParent;
1399         Chunk chunk;
1400         private int length;
1401         private int maxFastCapacity;
1402         private ByteBuffer tmpNioBuf;
1403         private boolean hasArray;
1404         private boolean hasMemoryAddress;
1405 
1406         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
1407             super(0);
1408             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1409         }
1410 
1411         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1412                   int startIndex, int size, int capacity, int maxCapacity) {
1413             this.startIndex = startIndex;
1414             chunk = wrapped;
1415             length = size;
1416             maxFastCapacity = capacity;
1417             maxCapacity(maxCapacity);
1418             setIndex0(readerIndex, writerIndex);
1419             hasArray = unwrapped.hasArray();
1420             hasMemoryAddress = unwrapped.hasMemoryAddress();
1421             rootParent = unwrapped;
1422             tmpNioBuf = null;
1423         }
1424 
1425         private AbstractByteBuf rootParent() {
1426             final AbstractByteBuf rootParent = this.rootParent;
1427             if (rootParent != null) {
1428                 return rootParent;
1429             }
1430             throw new IllegalReferenceCountException();
1431         }
1432 
1433         @Override
1434         public int capacity() {
1435             return length;
1436         }
1437 
1438         @Override
1439         public int maxFastWritableBytes() {
1440             return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1441         }
1442 
1443         @Override
1444         public ByteBuf capacity(int newCapacity) {
1445             if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1446                 ensureAccessible();
1447                 length = newCapacity;
1448                 return this;
1449             }
1450             checkNewCapacity(newCapacity);
1451             if (newCapacity < capacity()) {
1452                 length = newCapacity;
1453                 trimIndicesToCapacity(newCapacity);
1454                 return this;
1455             }
1456 
1457             // Reallocation required.
1458             Chunk chunk = this.chunk;
1459             AdaptivePoolingAllocator allocator = chunk.allocator;
1460             int readerIndex = this.readerIndex;
1461             int writerIndex = this.writerIndex;
1462             int baseOldRootIndex = startIndex;
1463             int oldCapacity = length;
1464             AbstractByteBuf oldRoot = rootParent();
1465             allocator.reallocate(newCapacity, maxCapacity(), this);
1466             oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1467             chunk.releaseSegment(baseOldRootIndex);
1468             this.readerIndex = readerIndex;
1469             this.writerIndex = writerIndex;
1470             return this;
1471         }
1472 
1473         @Override
1474         public ByteBufAllocator alloc() {
1475             return rootParent().alloc();
1476         }
1477 
1478         @Override
1479         public ByteOrder order() {
1480             return rootParent().order();
1481         }
1482 
1483         @Override
1484         public ByteBuf unwrap() {
1485             return null;
1486         }
1487 
1488         @Override
1489         public boolean isDirect() {
1490             return rootParent().isDirect();
1491         }
1492 
1493         @Override
1494         public int arrayOffset() {
1495             return idx(rootParent().arrayOffset());
1496         }
1497 
1498         @Override
1499         public boolean hasMemoryAddress() {
1500             return hasMemoryAddress;
1501         }
1502 
1503         @Override
1504         public long memoryAddress() {
1505             ensureAccessible();
1506             return rootParent().memoryAddress() + startIndex;
1507         }
1508 
1509         @Override
1510         public ByteBuffer nioBuffer(int index, int length) {
1511             checkIndex(index, length);
1512             return rootParent().nioBuffer(idx(index), length);
1513         }
1514 
1515         @Override
1516         public ByteBuffer internalNioBuffer(int index, int length) {
1517             checkIndex(index, length);
1518             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1519         }
1520 
1521         private ByteBuffer internalNioBuffer() {
1522             if (tmpNioBuf == null) {
1523                 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1524             }
1525             return (ByteBuffer) tmpNioBuf.clear();
1526         }
1527 
1528         @Override
1529         public ByteBuffer[] nioBuffers(int index, int length) {
1530             checkIndex(index, length);
1531             return rootParent().nioBuffers(idx(index), length);
1532         }
1533 
1534         @Override
1535         public boolean hasArray() {
1536             return hasArray;
1537         }
1538 
1539         @Override
1540         public byte[] array() {
1541             ensureAccessible();
1542             return rootParent().array();
1543         }
1544 
1545         @Override
1546         public ByteBuf copy(int index, int length) {
1547             checkIndex(index, length);
1548             return rootParent().copy(idx(index), length);
1549         }
1550 
1551         @Override
1552         public int nioBufferCount() {
1553             return rootParent().nioBufferCount();
1554         }
1555 
1556         @Override
1557         protected byte _getByte(int index) {
1558             return rootParent()._getByte(idx(index));
1559         }
1560 
1561         @Override
1562         protected short _getShort(int index) {
1563             return rootParent()._getShort(idx(index));
1564         }
1565 
1566         @Override
1567         protected short _getShortLE(int index) {
1568             return rootParent()._getShortLE(idx(index));
1569         }
1570 
1571         @Override
1572         protected int _getUnsignedMedium(int index) {
1573             return rootParent()._getUnsignedMedium(idx(index));
1574         }
1575 
1576         @Override
1577         protected int _getUnsignedMediumLE(int index) {
1578             return rootParent()._getUnsignedMediumLE(idx(index));
1579         }
1580 
1581         @Override
1582         protected int _getInt(int index) {
1583             return rootParent()._getInt(idx(index));
1584         }
1585 
1586         @Override
1587         protected int _getIntLE(int index) {
1588             return rootParent()._getIntLE(idx(index));
1589         }
1590 
1591         @Override
1592         protected long _getLong(int index) {
1593             return rootParent()._getLong(idx(index));
1594         }
1595 
1596         @Override
1597         protected long _getLongLE(int index) {
1598             return rootParent()._getLongLE(idx(index));
1599         }
1600 
1601         @Override
1602         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1603             checkIndex(index, length);
1604             rootParent().getBytes(idx(index), dst, dstIndex, length);
1605             return this;
1606         }
1607 
1608         @Override
1609         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1610             checkIndex(index, length);
1611             rootParent().getBytes(idx(index), dst, dstIndex, length);
1612             return this;
1613         }
1614 
1615         @Override
1616         public ByteBuf getBytes(int index, ByteBuffer dst) {
1617             checkIndex(index, dst.remaining());
1618             rootParent().getBytes(idx(index), dst);
1619             return this;
1620         }
1621 
1622         @Override
1623         protected void _setByte(int index, int value) {
1624             rootParent()._setByte(idx(index), value);
1625         }
1626 
1627         @Override
1628         protected void _setShort(int index, int value) {
1629             rootParent()._setShort(idx(index), value);
1630         }
1631 
1632         @Override
1633         protected void _setShortLE(int index, int value) {
1634             rootParent()._setShortLE(idx(index), value);
1635         }
1636 
1637         @Override
1638         protected void _setMedium(int index, int value) {
1639             rootParent()._setMedium(idx(index), value);
1640         }
1641 
1642         @Override
1643         protected void _setMediumLE(int index, int value) {
1644             rootParent()._setMediumLE(idx(index), value);
1645         }
1646 
1647         @Override
1648         protected void _setInt(int index, int value) {
1649             rootParent()._setInt(idx(index), value);
1650         }
1651 
1652         @Override
1653         protected void _setIntLE(int index, int value) {
1654             rootParent()._setIntLE(idx(index), value);
1655         }
1656 
1657         @Override
1658         protected void _setLong(int index, long value) {
1659             rootParent()._setLong(idx(index), value);
1660         }
1661 
1662         @Override
1663         protected void _setLongLE(int index, long value) {
1664             rootParent().setLongLE(idx(index), value);
1665         }
1666 
1667         @Override
1668         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1669             checkIndex(index, length);
1670             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1671             tmp.put(src, srcIndex, length);
1672             return this;
1673         }
1674 
1675         @Override
1676         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1677             checkIndex(index, length);
1678             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1679             tmp.put(src.nioBuffer(srcIndex, length));
1680             return this;
1681         }
1682 
1683         @Override
1684         public ByteBuf setBytes(int index, ByteBuffer src) {
1685             checkIndex(index, src.remaining());
1686             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1687             tmp.put(src);
1688             return this;
1689         }
1690 
1691         @Override
1692         public ByteBuf getBytes(int index, OutputStream out, int length)
1693                 throws IOException {
1694             checkIndex(index, length);
1695             if (length != 0) {
1696                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1697             }
1698             return this;
1699         }
1700 
1701         @Override
1702         public int getBytes(int index, GatheringByteChannel out, int length)
1703                 throws IOException {
1704             ByteBuffer buf = internalNioBuffer().duplicate();
1705             buf.clear().position(index).limit(index + length);
1706             return out.write(buf);
1707         }
1708 
1709         @Override
1710         public int getBytes(int index, FileChannel out, long position, int length)
1711                 throws IOException {
1712             ByteBuffer buf = internalNioBuffer().duplicate();
1713             buf.clear().position(index).limit(index + length);
1714             return out.write(buf, position);
1715         }
1716 
1717         @Override
1718         public int setBytes(int index, InputStream in, int length)
1719                 throws IOException {
1720             checkIndex(index, length);
1721             final AbstractByteBuf rootParent = rootParent();
1722             if (rootParent.hasArray()) {
1723                 return rootParent.setBytes(idx(index), in, length);
1724             }
1725             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1726             int readBytes = in.read(tmp, 0, length);
1727             if (readBytes <= 0) {
1728                 return readBytes;
1729             }
1730             setBytes(index, tmp, 0, readBytes);
1731             return readBytes;
1732         }
1733 
1734         @Override
1735         public int setBytes(int index, ScatteringByteChannel in, int length)
1736                 throws IOException {
1737             try {
1738                 return in.read(internalNioBuffer(index, length));
1739             } catch (ClosedChannelException ignored) {
1740                 return -1;
1741             }
1742         }
1743 
1744         @Override
1745         public int setBytes(int index, FileChannel in, long position, int length)
1746                 throws IOException {
1747             try {
1748                 return in.read(internalNioBuffer(index, length), position);
1749             } catch (ClosedChannelException ignored) {
1750                 return -1;
1751             }
1752         }
1753 
1754         @Override
1755         public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1756             return setCharSequence0(index, sequence, charset, false);
1757         }
1758 
1759         private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
1760             if (charset.equals(CharsetUtil.UTF_8)) {
1761                 int length = ByteBufUtil.utf8MaxBytes(sequence);
1762                 if (expand) {
1763                     ensureWritable0(length);
1764                     checkIndex0(index, length);
1765                 } else {
1766                     checkIndex(index, length);
1767                 }
1768                 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
1769             }
1770             if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
1771                 int length = sequence.length();
1772                 if (expand) {
1773                     ensureWritable0(length);
1774                     checkIndex0(index, length);
1775                 } else {
1776                     checkIndex(index, length);
1777                 }
1778                 return ByteBufUtil.writeAscii(this, index, sequence, length);
1779             }
1780             byte[] bytes = sequence.toString().getBytes(charset);
1781             if (expand) {
1782                 ensureWritable0(bytes.length);
1783                 // setBytes(...) will take care of checking the indices.
1784             }
1785             setBytes(index, bytes);
1786             return bytes.length;
1787         }
1788 
1789         @Override
1790         public int writeCharSequence(CharSequence sequence, Charset charset) {
1791             int written = setCharSequence0(writerIndex, sequence, charset, true);
1792             writerIndex += written;
1793             return written;
1794         }
1795 
1796         @Override
1797         public int forEachByte(int index, int length, ByteProcessor processor) {
1798             checkIndex(index, length);
1799             int ret = rootParent().forEachByte(idx(index), length, processor);
1800             return forEachResult(ret);
1801         }
1802 
1803         @Override
1804         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1805             checkIndex(index, length);
1806             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1807             return forEachResult(ret);
1808         }
1809 
1810         @Override
1811         public ByteBuf setZero(int index, int length) {
1812             checkIndex(index, length);
1813             rootParent().setZero(idx(index), length);
1814             return this;
1815         }
1816 
1817         @Override
1818         public ByteBuf writeZero(int length) {
1819             ensureWritable(length);
1820             rootParent().setZero(idx(writerIndex), length);
1821             writerIndex += length;
1822             return this;
1823         }
1824 
1825         private int forEachResult(int ret) {
1826             if (ret < startIndex) {
1827                 return -1;
1828             }
1829             return ret - startIndex;
1830         }
1831 
1832         @Override
1833         public boolean isContiguous() {
1834             return rootParent().isContiguous();
1835         }
1836 
1837         private int idx(int index) {
1838             return index + startIndex;
1839         }
1840 
1841         @Override
1842         protected void deallocate() {
1843             if (chunk != null) {
1844                 chunk.releaseSegment(startIndex);
1845             }
1846             tmpNioBuf = null;
1847             chunk = null;
1848             rootParent = null;
1849             if (handle instanceof EnhancedHandle) {
1850                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1851                 enhancedHandle.unguardedRecycle(this);
1852             } else {
1853                 handle.recycle(this);
1854             }
1855         }
1856     }
1857 
1858     /**
1859      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1860      */
1861     interface ChunkAllocator {
1862         /**
1863          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1864          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1865          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1866          * @return The buffer that represents the chunk memory.
1867          */
1868         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1869     }
1870 }