View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.CharsetUtil;
20  import io.netty.util.IllegalReferenceCountException;
21  import io.netty.util.IntSupplier;
22  import io.netty.util.NettyRuntime;
23  import io.netty.util.Recycler.EnhancedHandle;
24  import io.netty.util.ReferenceCounted;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.concurrent.FastThreadLocalThread;
27  import io.netty.util.concurrent.MpscAtomicIntegerArrayQueue;
28  import io.netty.util.concurrent.MpscIntQueue;
29  import io.netty.util.internal.ObjectPool;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PlatformDependent;
32  import io.netty.util.internal.ReferenceCountUpdater;
33  import io.netty.util.internal.SuppressJava6Requirement;
34  import io.netty.util.internal.SystemPropertyUtil;
35  import io.netty.util.internal.ThreadExecutorMap;
36  import io.netty.util.internal.ThreadLocalRandom;
37  import io.netty.util.internal.UnstableApi;
38  
39  import java.io.IOException;
40  import java.io.InputStream;
41  import java.io.OutputStream;
42  import java.nio.ByteBuffer;
43  import java.nio.ByteOrder;
44  import java.nio.channels.ClosedChannelException;
45  import java.nio.channels.FileChannel;
46  import java.nio.channels.GatheringByteChannel;
47  import java.nio.channels.ScatteringByteChannel;
48  import java.nio.charset.Charset;
49  import java.util.Arrays;
50  import java.util.Queue;
51  import java.util.concurrent.ConcurrentLinkedQueue;
52  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
54  import java.util.concurrent.atomic.LongAdder;
55  import java.util.concurrent.locks.StampedLock;
56  
57  /**
58   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
59   * <p>
60   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
61   * from.
62   * <p>
63   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
64   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
 * If contention is detected above a certain threshold, the number of magazines is increased in response to the
 * contention.
67   * <p>
68   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
69   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
70   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
71   * <p>
 * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
 * done is also adapted to the allocation pattern. If a newly computed preferred chunk size is the same as the
 * previous preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
75   * <p>
76   * This allows the allocator to quickly respond to changes in the application workload,
77   * without suffering undue overhead from maintaining its statistics.
78   * <p>
 * Since magazines are "relatively thread-local", each magazine group also has a shared queue that allows excess
 * chunks from any of its magazines to be shared with the other magazines in that group.
 * The queue is created by {@link #createSharedChunkQueue()}, which returns a bounded queue.
82   */
83  @SuppressJava6Requirement(reason = "Guarded by version check")
84  @UnstableApi
85  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
86      /**
87       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
88       * allocations. For instance, glibc will do this.
89       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
90       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
91       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
92       */
93      private static final int MIN_CHUNK_SIZE = 128 * 1024;
94      private static final int EXPANSION_ATTEMPTS = 3;
95      private static final int INITIAL_MAGAZINES = 1;
96      private static final int RETIRE_CAPACITY = 256;
97      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
98      private static final int BUFS_PER_CHUNK = 8; // For large buffers, aim to have about this many buffers per chunk.
99  
100     /**
101      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
102      * <p>
103      * This number is 8 MiB, and is derived from the limitations of internal histograms.
104      */
105     private static final int MAX_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MiB.
106     private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
107 
108     /**
109      * The capacity if the chunk reuse queues, that allow chunks to be shared across magazines in a group.
110      * The default size is twice {@link NettyRuntime#availableProcessors()},
111      * same as the maximum number of magazines per magazine group.
112      */
113     private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
114             "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));
115 
116     /**
117      * The capacity if the magazine local buffer queue. This queue just pools the outer ByteBuf instance and not
118      * the actual memory and so helps to reduce GC pressure.
119      */
120     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
121             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
122 
123     /**
124      * The size classes are chosen based on the following observation:
125      * <p>
126      * Most allocations, particularly ones above 256 bytes, aim to be a power-of-2. However, many use cases, such
127      * as framing protocols, are themselves operating or moving power-of-2 sized payloads, to which they add a
128      * small amount of overhead, such as headers or checksums.
129      * This means we seem to get a lot of mileage out of having both power-of-2 sizes, and power-of-2-plus-a-bit.
130      * <p>
131      * On the conflicting requirements of both having as few chunks as possible, and having as little wasted
132      * memory within each chunk as possible, this seems to strike a surprisingly good balance for the use cases
133      * tested so far.
134      */
135     private static final int[] SIZE_CLASSES = {
136             32,
137             64,
138             128,
139             256,
140             512,
141             640, // 512 + 128
142             1024,
143             1152, // 1024 + 128
144             2048,
145             2304, // 2048 + 256
146             4096,
147             4352, // 4096 + 256
148             8192,
149             8704, // 8192 + 512
150             16384,
151             16896, // 16384 + 512
152             32768,
153             65536,
154     };
155     private static final ChunkReleasePredicate CHUNK_RELEASE_ALWAYS = new ChunkReleasePredicate() {
156         @Override
157         public boolean shouldReleaseChunk(int chunkSize) {
158             return true;
159         }
160     };
161     private static final ChunkReleasePredicate CHUNK_RELEASE_NEVER = new ChunkReleasePredicate() {
162         @Override
163         public boolean shouldReleaseChunk(int chunkSize) {
164             return false;
165         }
166     };
167 
168     private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
169     private static final byte[] SIZE_INDEXES = new byte[(SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32) + 1];
170 
171     static {
172         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
173             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
174                     + " (expected: >= " + 2 + ')');
175         }
176         int lastIndex = 0;
177         for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
178             int sizeClass = SIZE_CLASSES[i];
179             //noinspection ConstantValue
180             assert (sizeClass & 5) == 0 : "Size class must be a multiple of 32";
181             int sizeIndex = sizeIndexOf(sizeClass);
182             Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
183             lastIndex = sizeIndex;
184         }
185     }
186 
187     private final ChunkAllocator chunkAllocator;
188     private final ChunkRegistry chunkRegistry;
189     private final MagazineGroup[] sizeClassedMagazineGroups;
190     private final MagazineGroup largeBufferMagazineGroup;
191     private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
192 
193     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, final boolean useCacheForNonEventLoopThreads) {
194         this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
195         chunkRegistry = new ChunkRegistry();
196         sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
197         largeBufferMagazineGroup = new MagazineGroup(
198                 this, chunkAllocator, new HistogramChunkControllerFactory(true), false);
199         threadLocalGroup = new FastThreadLocal<MagazineGroup[]>() {
200             @Override
201             protected MagazineGroup[] initialValue() {
202                 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
203                     return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
204                 }
205                 return null;
206             }
207 
208             @Override
209             protected void onRemoval(final MagazineGroup[] groups) throws Exception {
210                 if (groups != null) {
211                     for (MagazineGroup group : groups) {
212                         group.free();
213                     }
214                 }
215             }
216         };
217     }
218 
219     private static MagazineGroup[] createMagazineGroupSizeClasses(
220             AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
221         MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
222         for (int i = 0; i < SIZE_CLASSES.length; i++) {
223             int segmentSize = SIZE_CLASSES[i];
224             groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
225                     new SizeClassChunkControllerFactory(segmentSize), isThreadLocal);
226         }
227         return groups;
228     }
229 
230     /**
231      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
232      * internal Magazines.
233      * <p>
234      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
235      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
236      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
237      * allocate new chunks when their current and next-in-line chunks have both been used up.
238      * <p>
239      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
240      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
241      * queue.
242      * <p>
243      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
244      * queue to limit the maximum memory usage.
245      * <p>
246      * The default implementation will create a bounded queue with a capacity of {@link #CHUNK_REUSE_QUEUE}.
247      *
248      * @return A new multi-producer, multi-consumer queue.
249      */
250     private static Queue<Chunk> createSharedChunkQueue() {
251         return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
252     }
253 
254     @Override
255     public ByteBuf allocate(int size, int maxCapacity) {
256         return allocate(size, maxCapacity, Thread.currentThread(), null);
257     }
258 
259     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
260         AdaptiveByteBuf allocated = null;
261         if (size <= MAX_POOLED_BUF_SIZE) {
262             final int index = sizeClassIndexOf(size);
263             MagazineGroup[] magazineGroups;
264             if (!FastThreadLocalThread.willCleanupFastThreadLocals(currentThread) ||
265                     (magazineGroups = threadLocalGroup.get()) == null) {
266                 magazineGroups =  sizeClassedMagazineGroups;
267             }
268             if (index < magazineGroups.length) {
269                 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
270             } else {
271                 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
272             }
273         }
274         if (allocated == null) {
275             allocated = allocateFallback(size, maxCapacity, currentThread, buf);
276         }
277         return allocated;
278     }
279 
280     private static int sizeIndexOf(final int size) {
281         // this is aligning the size to the next multiple of 32 and dividing by 32 to get the size index.
282         return size + 31 >> 5;
283     }
284 
285     static int sizeClassIndexOf(int size) {
286         int sizeIndex = sizeIndexOf(size);
287         if (sizeIndex < SIZE_INDEXES.length) {
288             return SIZE_INDEXES[sizeIndex];
289         }
290         return SIZE_CLASSES_COUNT;
291     }
292 
293     static int[] getSizeClasses() {
294         return SIZE_CLASSES.clone();
295     }
296 
297     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread,
298                                              AdaptiveByteBuf buf) {
299         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
300         Magazine magazine;
301         if (buf != null) {
302             Chunk chunk = buf.chunk;
303             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
304                 magazine = getFallbackMagazine(currentThread);
305             }
306         } else {
307             magazine = getFallbackMagazine(currentThread);
308             buf = magazine.newBuffer();
309         }
310         // Create a one-off chunk for this allocation.
311         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
312         Chunk chunk = new Chunk(innerChunk, magazine, false, CHUNK_RELEASE_ALWAYS);
313         chunkRegistry.add(chunk);
314         try {
315             chunk.readInitInto(buf, size, size, maxCapacity);
316         } finally {
317             // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
318             // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
319             // completely release the Chunk and so the contained innerChunk.
320             chunk.release();
321         }
322         return buf;
323     }
324 
325     private Magazine getFallbackMagazine(Thread currentThread) {
326         Magazine[] mags = largeBufferMagazineGroup.magazines;
327         return mags[(int) currentThread.getId() & mags.length - 1];
328     }
329 
330     /**
331      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
332      */
333     void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
334         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
335         assert result == into: "Re-allocation created separate buffer instance";
336     }
337 
338     @Override
339     public long usedMemory() {
340         return chunkRegistry.totalCapacity();
341     }
342 
343     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
344     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
345     // very confusing for users.
346     @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
347     @Override
348     protected void finalize() throws Throwable {
349         try {
350             super.finalize();
351         } finally {
352             free();
353         }
354     }
355 
356     private void free() {
357         largeBufferMagazineGroup.free();
358     }
359 
360     static int sizeToBucket(int size) {
361         return HistogramChunkController.sizeToBucket(size);
362     }
363 
364     @SuppressJava6Requirement(reason = "Guarded by version check")
365     private static final class MagazineGroup {
366         private final AdaptivePoolingAllocator allocator;
367         private final ChunkAllocator chunkAllocator;
368         private final ChunkControllerFactory chunkControllerFactory;
369         private final Queue<Chunk> chunkReuseQueue;
370         private final StampedLock magazineExpandLock;
371         private final Magazine threadLocalMagazine;
372         private volatile Magazine[] magazines;
373         private volatile boolean freed;
374 
375         MagazineGroup(AdaptivePoolingAllocator allocator,
376                       ChunkAllocator chunkAllocator,
377                       ChunkControllerFactory chunkControllerFactory,
378                       boolean isThreadLocal) {
379             this.allocator = allocator;
380             this.chunkAllocator = chunkAllocator;
381             this.chunkControllerFactory = chunkControllerFactory;
382             chunkReuseQueue = createSharedChunkQueue();
383             if (isThreadLocal) {
384                 magazineExpandLock = null;
385                 threadLocalMagazine = new Magazine(this, false, chunkReuseQueue, chunkControllerFactory.create(this));
386             } else {
387                 magazineExpandLock = new StampedLock();
388                 threadLocalMagazine = null;
389                 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
390                 for (int i = 0; i < mags.length; i++) {
391                     mags[i] = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
392                 }
393                 magazines = mags;
394             }
395         }
396 
397         public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
398             boolean reallocate = buf != null;
399 
400             // Path for thread-local allocation.
401             Magazine tlMag = threadLocalMagazine;
402             if (tlMag != null) {
403                 if (buf == null) {
404                     buf = tlMag.newBuffer();
405                 }
406                 boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
407                 assert allocated : "Allocation of threadLocalMagazine must always succeed";
408                 return buf;
409             }
410 
411             // Path for concurrent allocation.
412             long threadId = currentThread.getId();
413             Magazine[] mags;
414             int expansions = 0;
415             do {
416                 mags = magazines;
417                 int mask = mags.length - 1;
418                 int index = (int) (threadId & mask);
419                 for (int i = 0, m = mags.length << 1; i < m; i++) {
420                     Magazine mag = mags[index + i & mask];
421                     if (buf == null) {
422                         buf = mag.newBuffer();
423                     }
424                     if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
425                         // Was able to allocate.
426                         return buf;
427                     }
428                 }
429                 expansions++;
430             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
431 
432             // The magazines failed us; contention too high and we don't want to spend more effort expanding the array.
433             if (!reallocate && buf != null) {
434                 buf.release(); // Release the previously claimed buffer before we return.
435             }
436             return null;
437         }
438 
439         private boolean tryExpandMagazines(int currentLength) {
440             if (currentLength >= MAX_STRIPES) {
441                 return true;
442             }
443             final Magazine[] mags;
444             long writeLock = magazineExpandLock.tryWriteLock();
445             if (writeLock != 0) {
446                 try {
447                     mags = magazines;
448                     if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
449                         return true;
450                     }
451                     Magazine firstMagazine = mags[0];
452                     Magazine[] expanded = new Magazine[mags.length * 2];
453                     for (int i = 0, l = expanded.length; i < l; i++) {
454                         Magazine m = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
455                         firstMagazine.initializeSharedStateIn(m);
456                         expanded[i] = m;
457                     }
458                     magazines = expanded;
459                 } finally {
460                     magazineExpandLock.unlockWrite(writeLock);
461                 }
462                 for (Magazine magazine : mags) {
463                     magazine.free();
464                 }
465             }
466             return true;
467         }
468 
469         boolean offerToQueue(Chunk buffer) {
470             if (freed) {
471                 return false;
472             }
473 
474             boolean isAdded = chunkReuseQueue.offer(buffer);
475             if (freed && isAdded) {
476                 // Help to free the reuse queue.
477                 freeChunkReuseQueue();
478             }
479             return isAdded;
480         }
481 
482         private void free() {
483             freed = true;
484             if (threadLocalMagazine != null) {
485                 threadLocalMagazine.free();
486             } else {
487                 long stamp = magazineExpandLock.writeLock();
488                 try {
489                     Magazine[] mags = magazines;
490                     for (Magazine magazine : mags) {
491                         magazine.free();
492                     }
493                 } finally {
494                     magazineExpandLock.unlockWrite(stamp);
495                 }
496             }
497             freeChunkReuseQueue();
498         }
499 
500         private void freeChunkReuseQueue() {
501             for (;;) {
502                 Chunk chunk = chunkReuseQueue.poll();
503                 if (chunk == null) {
504                     break;
505                 }
506                 chunk.release();
507             }
508         }
509     }
510 
511     private interface ChunkControllerFactory {
512         ChunkController create(MagazineGroup group);
513     }
514 
515     private interface ChunkController {
516         /**
517          * Compute the "fast max capacity" value for the buffer.
518          */
519         int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
520 
521         /**
522          * Initialize the given chunk factory with shared statistics state (if any) from this factory.
523          */
524         void initializeSharedStateIn(ChunkController chunkController);
525 
526         /**
527          * Allocate a new {@link Chunk} for the given {@link Magazine}.
528          */
529         Chunk newChunkAllocation(int promptingSize, Magazine magazine);
530     }
531 
532     private interface ChunkReleasePredicate {
533         boolean shouldReleaseChunk(int chunkSize);
534     }
535 
536     private static final class SizeClassChunkControllerFactory implements ChunkControllerFactory {
537         // To amortize activation/deactivation of chunks, we should have a minimum number of segments per chunk.
538         // We choose 32 because it seems neither too small nor too big.
539         // For segments of 16 KiB, the chunks will be half a megabyte.
540         private static final int MIN_SEGMENTS_PER_CHUNK = 32;
541         private final int segmentSize;
542         private final int chunkSize;
543         private final int[] segmentOffsets;
544 
545         private SizeClassChunkControllerFactory(int segmentSize) {
546             this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
547             chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
548             int segmentsCount = chunkSize / segmentSize;
549             segmentOffsets = new int[segmentsCount];
550             for (int i = 0; i < segmentsCount; i++) {
551                 segmentOffsets[i] = i * segmentSize;
552             }
553         }
554 
555         @Override
556         public ChunkController create(MagazineGroup group) {
557             return new SizeClassChunkController(group, segmentSize, chunkSize, segmentOffsets);
558         }
559     }
560 
561     private static final class SizeClassChunkController implements ChunkController {
562 
563         private static final ChunkReleasePredicate FALSE_PREDICATE = new ChunkReleasePredicate() {
564             @Override
565             public boolean shouldReleaseChunk(int chunkSize) {
566                 return false;
567             }
568         };
569         private final ChunkAllocator chunkAllocator;
570         private final int segmentSize;
571         private final int chunkSize;
572         private final ChunkRegistry chunkRegistry;
573         private final int[] segmentOffsets;
574 
575         private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize, int[] segmentOffsets) {
576             chunkAllocator = group.chunkAllocator;
577             this.segmentSize = segmentSize;
578             this.chunkSize = chunkSize;
579             chunkRegistry = group.allocator.chunkRegistry;
580             this.segmentOffsets = segmentOffsets;
581         }
582 
583         @Override
584         public int computeBufferCapacity(
585                 int requestedSize, int maxCapacity, boolean isReallocation) {
586             return Math.min(segmentSize, maxCapacity);
587         }
588 
589         @Override
590         public void initializeSharedStateIn(ChunkController chunkController) {
591             // NOOP
592         }
593 
594         @Override
595         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
596             AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
597             assert chunkBuffer.capacity() == chunkSize;
598             SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, true,
599                     segmentSize, segmentOffsets, FALSE_PREDICATE);
600             chunkRegistry.add(chunk);
601             return chunk;
602         }
603     }
604 
605     private static final class HistogramChunkControllerFactory implements ChunkControllerFactory {
606         private final boolean shareable;
607 
608         private HistogramChunkControllerFactory(boolean shareable) {
609             this.shareable = shareable;
610         }
611 
612         @Override
613         public ChunkController create(MagazineGroup group) {
614             return new HistogramChunkController(group, shareable);
615         }
616     }
617 
618     private static final class HistogramChunkController implements ChunkController, ChunkReleasePredicate {
619         private static final int MIN_DATUM_TARGET = 1024;
620         private static final int MAX_DATUM_TARGET = 65534;
621         private static final int INIT_DATUM_TARGET = 9;
622         private static final int HISTO_BUCKET_COUNT = 16;
623         private static final int[] HISTO_BUCKETS = {
624                 16 * 1024,
625                 24 * 1024,
626                 32 * 1024,
627                 48 * 1024,
628                 64 * 1024,
629                 96 * 1024,
630                 128 * 1024,
631                 192 * 1024,
632                 256 * 1024,
633                 384 * 1024,
634                 512 * 1024,
635                 768 * 1024,
636                 1024 * 1024,
637                 1792 * 1024,
638                 2048 * 1024,
639                 3072 * 1024
640         };
641 
642         private final MagazineGroup group;
643         private final boolean shareable;
644         private final short[][] histos = {
645                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
646                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
647         };
648         private final ChunkRegistry chunkRegistry;
649         private short[] histo = histos[0];
650         private final int[] sums = new int[HISTO_BUCKET_COUNT];
651 
652         private int histoIndex;
653         private int datumCount;
654         private int datumTarget = INIT_DATUM_TARGET;
655         private boolean hasHadRotation;
656         private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
657         private volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
658         private volatile int localUpperBufSize;
659 
660         private HistogramChunkController(MagazineGroup group, boolean shareable) {
661             this.group = group;
662             this.shareable = shareable;
663             chunkRegistry = group.allocator.chunkRegistry;
664         }
665 
666         @Override
667         public int computeBufferCapacity(
668                 int requestedSize, int maxCapacity, boolean isReallocation) {
669             if (!isReallocation) {
670                 // Only record allocation size if it's not caused by a reallocation that was triggered by capacity
671                 // change of the buffer.
672                 recordAllocationSize(requestedSize);
673             }
674 
675             // Predict starting capacity from localUpperBufSize, but place limits on the max starting capacity
676             // based on the requested size, because localUpperBufSize can potentially be quite large.
677             int startCapLimits;
678             if (requestedSize <= 32768) { // Less than or equal to 32 KiB.
679                 startCapLimits = 65536; // Use at most 64 KiB, which is also the AdaptiveRecvByteBufAllocator max.
680             } else {
681                 startCapLimits = requestedSize * 2; // Otherwise use at most twice the requested memory.
682             }
683             int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
684             startingCapacity = Math.max(requestedSize, Math.min(maxCapacity, startingCapacity));
685             return startingCapacity;
686         }
687 
688         private void recordAllocationSize(int bufferSizeToRecord) {
689             // Use the preserved size from the reused AdaptiveByteBuf, if available.
690             // Otherwise, use the requested buffer size.
691             // This way, we better take into account
692             if (bufferSizeToRecord == 0) {
693                 return;
694             }
695             int bucket = sizeToBucket(bufferSizeToRecord);
696             histo[bucket]++;
697             if (datumCount++ == datumTarget) {
698                 rotateHistograms();
699             }
700         }
701 
702         static int sizeToBucket(int size) {
703             int index = binarySearchInsertionPoint(Arrays.binarySearch(HISTO_BUCKETS, size));
704             return index >= HISTO_BUCKETS.length ? HISTO_BUCKETS.length - 1 : index;
705         }
706 
707         private static int binarySearchInsertionPoint(int index) {
708             if (index < 0) {
709                 index = -(index + 1);
710             }
711             return index;
712         }
713 
714         static int bucketToSize(int sizeBucket) {
715             return HISTO_BUCKETS[sizeBucket];
716         }
717 
718         private void rotateHistograms() {
719             short[][] hs = histos;
720             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
721                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
722             }
723             int sum = 0;
724             for (int count : sums) {
725                 sum  += count;
726             }
727             int targetPercentile = (int) (sum * 0.99);
728             int sizeBucket = 0;
729             for (; sizeBucket < sums.length; sizeBucket++) {
730                 if (sums[sizeBucket] > targetPercentile) {
731                     break;
732                 }
733                 targetPercentile -= sums[sizeBucket];
734             }
735             hasHadRotation = true;
736             int percentileSize = bucketToSize(sizeBucket);
737             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
738             localUpperBufSize = percentileSize;
739             localPrefChunkSize = prefChunkSize;
740             if (shareable) {
741                 for (Magazine mag : group.magazines) {
742                     HistogramChunkController statistics = (HistogramChunkController) mag.chunkController;
743                     prefChunkSize = Math.max(prefChunkSize, statistics.localPrefChunkSize);
744                 }
745             }
746             if (sharedPrefChunkSize != prefChunkSize) {
747                 // Preferred chunk size changed. Increase check frequency.
748                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
749                 sharedPrefChunkSize = prefChunkSize;
750             } else {
751                 // Preferred chunk size did not change. Check less often.
752                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
753             }
754 
755             histoIndex = histoIndex + 1 & 3;
756             histo = histos[histoIndex];
757             datumCount = 0;
758             Arrays.fill(histo, (short) 0);
759         }
760 
761         /**
762          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
763          * allocation sizes.
764          * <p>
765          * This method must be thread-safe.
766          *
767          * @return The currently preferred chunk allocation size.
768          */
769         int preferredChunkSize() {
770             return sharedPrefChunkSize;
771         }
772 
773         @Override
774         public void initializeSharedStateIn(ChunkController chunkController) {
775             HistogramChunkController statistics = (HistogramChunkController) chunkController;
776             int sharedPrefChunkSize = this.sharedPrefChunkSize;
777             statistics.localPrefChunkSize = sharedPrefChunkSize;
778             statistics.sharedPrefChunkSize = sharedPrefChunkSize;
779         }
780 
781         @Override
782         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
783             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
784             int minChunks = size / MIN_CHUNK_SIZE;
785             if (MIN_CHUNK_SIZE * minChunks < size) {
786                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
787                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
788                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
789                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
790                 size = MIN_CHUNK_SIZE * (1 + minChunks);
791             }
792 
793             // Limit chunks to the max size, even if the histogram suggests to go above it.
794             size = Math.min(size, MAX_CHUNK_SIZE);
795 
796             // If we haven't rotated the histogram yet, optimisticly record this chunk size as our preferred.
797             if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
798                 sharedPrefChunkSize = size;
799             }
800 
801             ChunkAllocator chunkAllocator = group.chunkAllocator;
802             Chunk chunk = new Chunk(chunkAllocator.allocate(size, size), magazine, true, this);
803             chunkRegistry.add(chunk);
804             return chunk;
805         }
806 
807         @Override
808         public boolean shouldReleaseChunk(int chunkSize) {
809             int preferredSize = preferredChunkSize();
810             int givenChunks = chunkSize / MIN_CHUNK_SIZE;
811             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
812             int deviation = Math.abs(givenChunks - preferredChunks);
813 
814             // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
815             return deviation != 0 &&
816                     ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
817         }
818     }
819 
820     @SuppressJava6Requirement(reason = "Guarded by version check")
821     private static final class Magazine {
822         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
823         static {
824             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
825         }
826         private static final Chunk MAGAZINE_FREED = new Chunk();
827 
828         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
829                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
830                     @Override
831                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
832                         return new AdaptiveByteBuf(handle);
833                     }
834                 });
835 
836         private Chunk current;
837         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
838         private volatile Chunk nextInLine;
839         private final MagazineGroup group;
840         private final ChunkController chunkController;
841         private final StampedLock allocationLock;
842         private final Queue<AdaptiveByteBuf> bufferQueue;
843         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
844         private final Queue<Chunk> sharedChunkQueue;
845 
846         Magazine(MagazineGroup group, boolean shareable, Queue<Chunk> sharedChunkQueue,
847                  ChunkController chunkController) {
848             this.group = group;
849             this.chunkController = chunkController;
850 
851             if (shareable) {
852                 // We only need the StampedLock if this Magazine will be shared across threads.
853                 allocationLock = new StampedLock();
854                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
855                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
856                     @Override
857                     public void recycle(AdaptiveByteBuf self) {
858                         bufferQueue.offer(self);
859                     }
860                 };
861             } else {
862                 allocationLock = null;
863                 bufferQueue = null;
864                 handle = null;
865             }
866             this.sharedChunkQueue = sharedChunkQueue;
867         }
868 
869         public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
870             if (allocationLock == null) {
871                 // This magazine is not shared across threads, just allocate directly.
872                 return allocate(size, maxCapacity, buf, reallocate);
873             }
874 
875             // Try to retrieve the lock and if successful allocate.
876             long writeLock = allocationLock.tryWriteLock();
877             if (writeLock != 0) {
878                 try {
879                     return allocate(size, maxCapacity, buf, reallocate);
880                 } finally {
881                     allocationLock.unlockWrite(writeLock);
882                 }
883             }
884             return allocateWithoutLock(size, maxCapacity, buf);
885         }
886 
887         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
888             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
889             if (curr == MAGAZINE_FREED) {
890                 // Allocation raced with a stripe-resize that freed this magazine.
891                 restoreMagazineFreed();
892                 return false;
893             }
894             if (curr == null) {
895                 curr = sharedChunkQueue.poll();
896                 if (curr == null) {
897                     return false;
898                 }
899                 curr.attachToMagazine(this);
900             }
901             boolean allocated = false;
902             int remainingCapacity = curr.remainingCapacity();
903             int startingCapacity = chunkController.computeBufferCapacity(
904                     size, maxCapacity, true /* never update stats as we don't hold the magazine lock */);
905             if (remainingCapacity >= size) {
906                 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
907                 allocated = true;
908             }
909             try {
910                 if (remainingCapacity >= RETIRE_CAPACITY) {
911                     transferToNextInLineOrRelease(curr);
912                     curr = null;
913                 }
914             } finally {
915                 if (curr != null) {
916                     curr.releaseFromMagazine();
917                 }
918             }
919             return allocated;
920         }
921 
922         private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
923             int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
924             Chunk curr = current;
925             if (curr != null) {
926                 // We have a Chunk that has some space left.
927                 int remainingCapacity = curr.remainingCapacity();
928                 if (remainingCapacity > startingCapacity) {
929                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
930                     // We still have some bytes left that we can use for the next allocation, just early return.
931                     return true;
932                 }
933 
934                 // At this point we know that this will be the last time current will be used, so directly set it to
935                 // null and release it once we are done.
936                 current = null;
937                 if (remainingCapacity >= size) {
938                     try {
939                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
940                         return true;
941                     } finally {
942                         curr.releaseFromMagazine();
943                     }
944                 }
945 
946                 // Check if we either retain the chunk in the nextInLine cache or releasing it.
947                 if (remainingCapacity < RETIRE_CAPACITY) {
948                     curr.releaseFromMagazine();
949                 } else {
950                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
951                     // This method will release curr if this is not the case
952                     transferToNextInLineOrRelease(curr);
953                 }
954             }
955 
956             assert current == null;
957             // The fast-path for allocations did not work.
958             //
959             // Try to fetch the next "Magazine local" Chunk first, if this fails because we don't have a
960             // next-in-line chunk available, we will poll our centralQueue.
961             // If this fails as well we will just allocate a new Chunk.
962             //
963             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
964             // thus be "reserved" by this Magazine for exclusive usage.
965             curr = NEXT_IN_LINE.getAndSet(this, null);
966             if (curr != null) {
967                 if (curr == MAGAZINE_FREED) {
968                     // Allocation raced with a stripe-resize that freed this magazine.
969                     restoreMagazineFreed();
970                     return false;
971                 }
972 
973                 int remainingCapacity = curr.remainingCapacity();
974                 if (remainingCapacity > startingCapacity) {
975                     // We have a Chunk that has some space left.
976                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
977                     current = curr;
978                     return true;
979                 }
980 
981                 if (remainingCapacity >= size) {
982                     // At this point we know that this will be the last time curr will be used, so directly set it to
983                     // null and release it once we are done.
984                     try {
985                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
986                         return true;
987                     } finally {
988                         // Release in a finally block so even if readInitInto(...) would throw we would still correctly
989                         // release the current chunk before null it out.
990                         curr.releaseFromMagazine();
991                     }
992                 } else {
993                     // Release it as it's too small.
994                     curr.releaseFromMagazine();
995                 }
996             }
997 
998             // Now try to poll from the central queue first
999             curr = sharedChunkQueue.poll();
1000             if (curr == null) {
1001                 curr = chunkController.newChunkAllocation(size, this);
1002             } else {
1003                 curr.attachToMagazine(this);
1004 
1005                 int remainingCapacity = curr.remainingCapacity();
1006                 if (remainingCapacity == 0 || remainingCapacity < size) {
1007                     // Check if we either retain the chunk in the nextInLine cache or releasing it.
1008                     if (remainingCapacity < RETIRE_CAPACITY) {
1009                         curr.releaseFromMagazine();
1010                     } else {
1011                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
1012                         // This method will release curr if this is not the case
1013                         transferToNextInLineOrRelease(curr);
1014                     }
1015                     curr = chunkController.newChunkAllocation(size, this);
1016                 }
1017             }
1018 
1019             current = curr;
1020             try {
1021                 int remainingCapacity = curr.remainingCapacity();
1022                 assert remainingCapacity >= size;
1023                 if (remainingCapacity > startingCapacity) {
1024                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
1025                     curr = null;
1026                 } else {
1027                     curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
1028                 }
1029             } finally {
1030                 if (curr != null) {
1031                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
1032                     // release the current chunk before null it out.
1033                     curr.releaseFromMagazine();
1034                     current = null;
1035                 }
1036             }
1037             return true;
1038         }
1039 
1040         private void restoreMagazineFreed() {
1041             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
1042             if (next != null && next != MAGAZINE_FREED) {
1043                 // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
1044                 next.releaseFromMagazine();
1045             }
1046         }
1047 
1048         private void transferToNextInLineOrRelease(Chunk chunk) {
1049             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
1050                 return;
1051             }
1052 
1053             Chunk nextChunk = NEXT_IN_LINE.get(this);
1054             if (nextChunk != null && nextChunk != MAGAZINE_FREED
1055                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
1056                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
1057                     nextChunk.releaseFromMagazine();
1058                     return;
1059                 }
1060             }
1061             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
1062             // by some buffers and so is attached to a Magazine.
1063             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
1064             // as last resort.
1065             chunk.releaseFromMagazine();
1066         }
1067 
1068         boolean trySetNextInLine(Chunk chunk) {
1069             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
1070         }
1071 
1072         void free() {
1073             // Release the current Chunk and the next that was stored for later usage.
1074             restoreMagazineFreed();
1075             long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
1076             try {
1077                 if (current != null) {
1078                     current.releaseFromMagazine();
1079                     current = null;
1080                 }
1081             } finally {
1082                 if (allocationLock != null) {
1083                     allocationLock.unlockWrite(stamp);
1084                 }
1085             }
1086         }
1087 
1088         public AdaptiveByteBuf newBuffer() {
1089             AdaptiveByteBuf buf;
1090             if (handle == null) {
1091                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
1092             } else {
1093                 buf = bufferQueue.poll();
1094                 if (buf == null) {
1095                     buf = new AdaptiveByteBuf(handle);
1096                 }
1097             }
1098             buf.resetRefCnt();
1099             buf.discardMarks();
1100             return buf;
1101         }
1102 
1103         boolean offerToQueue(Chunk chunk) {
1104             return group.offerToQueue(chunk);
1105         }
1106 
1107         public void initializeSharedStateIn(Magazine other) {
1108             chunkController.initializeSharedStateIn(other.chunkController);
1109         }
1110     }
1111 
1112     @SuppressJava6Requirement(reason = "Guarded by version check")
1113     private static final class ChunkRegistry {
1114         private final LongAdder totalCapacity = new LongAdder();
1115 
1116         public long totalCapacity() {
1117             return totalCapacity.sum();
1118         }
1119 
1120         public void add(Chunk chunk) {
1121             totalCapacity.add(chunk.capacity());
1122         }
1123 
1124         public void remove(Chunk chunk) {
1125             totalCapacity.add(-chunk.capacity());
1126         }
1127     }
1128 
1129     private static class Chunk implements ReferenceCounted {
1130         private static final long REFCNT_FIELD_OFFSET =
1131                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
1132         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
1133                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
1134 
1135         protected final AbstractByteBuf delegate;
1136         protected Magazine magazine;
1137         private final AdaptivePoolingAllocator allocator;
1138         private final ChunkReleasePredicate chunkReleasePredicate;
1139         private final int capacity;
1140         private final boolean pooled;
1141         protected int allocatedBytes;
1142 
1143         private static final ReferenceCountUpdater<Chunk> updater =
1144                 new ReferenceCountUpdater<Chunk>() {
1145                     @Override
1146                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
1147                         return AIF_UPDATER;
1148                     }
1149                     @Override
1150                     protected long unsafeOffset() {
1151                         // on native image, REFCNT_FIELD_OFFSET can be recomputed even with Unsafe unavailable, so we
1152                         // need to guard here
1153                         return PlatformDependent.hasUnsafe() ? REFCNT_FIELD_OFFSET : -1;
1154                     }
1155                 };
1156 
1157         // Value might not equal "real" reference count, all access should be via the updater
1158         @SuppressWarnings({"unused", "FieldMayBeFinal"})
1159         private volatile int refCnt;
1160 
1161         Chunk() {
1162             // Constructor only used by the MAGAZINE_FREED sentinel.
1163             delegate = null;
1164             magazine = null;
1165             allocator = null;
1166             chunkReleasePredicate = null;
1167             capacity = 0;
1168             pooled = false;
1169         }
1170 
1171         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled,
1172               ChunkReleasePredicate chunkReleasePredicate) {
1173             this.delegate = delegate;
1174             this.pooled = pooled;
1175             capacity = delegate.capacity();
1176             updater.setInitialValue(this);
1177             attachToMagazine(magazine);
1178 
1179             // We need the top-level allocator so ByteBuf.capacity(int) can call reallocate()
1180             allocator = magazine.group.allocator;
1181 
1182             this.chunkReleasePredicate = chunkReleasePredicate;
1183         }
1184 
1185         Magazine currentMagazine()  {
1186             return magazine;
1187         }
1188 
1189         void detachFromMagazine() {
1190             if (magazine != null) {
1191                 magazine = null;
1192             }
1193         }
1194 
1195         void attachToMagazine(Magazine magazine) {
1196             assert this.magazine == null;
1197             this.magazine = magazine;
1198         }
1199 
1200         @Override
1201         public Chunk touch(Object hint) {
1202             return this;
1203         }
1204 
1205         @Override
1206         public int refCnt() {
1207             return updater.refCnt(this);
1208         }
1209 
1210         @Override
1211         public Chunk retain() {
1212             return updater.retain(this);
1213         }
1214 
1215         @Override
1216         public Chunk retain(int increment) {
1217             return updater.retain(this, increment);
1218         }
1219 
1220         @Override
1221         public Chunk touch() {
1222             return this;
1223         }
1224 
1225         @Override
1226         public boolean release() {
1227             if (updater.release(this)) {
1228                 deallocate();
1229                 return true;
1230             }
1231             return false;
1232         }
1233 
1234         @Override
1235         public boolean release(int decrement) {
1236             if (updater.release(this, decrement)) {
1237                 deallocate();
1238                 return true;
1239             }
1240             return false;
1241         }
1242 
1243         /**
1244          * Called when a magazine is done using this chunk, probably because it was emptied.
1245          */
1246         boolean releaseFromMagazine() {
1247             return release();
1248         }
1249 
1250         /**
1251          * Called when a ByteBuf is done using its allocation in this chunk.
1252          */
1253         boolean releaseSegment(int ignoredSegmentId) {
1254             return release();
1255         }
1256 
1257         private void deallocate() {
1258             Magazine mag = magazine;
1259             int chunkSize = delegate.capacity();
1260             if (!pooled || chunkReleasePredicate.shouldReleaseChunk(chunkSize) || mag == null) {
1261                 // Drop the chunk if the parent allocator is closed,
1262                 // or if the chunk deviates too much from the preferred chunk size.
1263                 detachFromMagazine();
1264                 allocator.chunkRegistry.remove(this);
1265                 delegate.release();
1266             } else {
1267                 updater.resetRefCnt(this);
1268                 delegate.setIndex(0, 0);
1269                 allocatedBytes = 0;
1270                 if (!mag.trySetNextInLine(this)) {
1271                     // As this Chunk does not belong to the mag anymore we need to decrease the used memory .
1272                     detachFromMagazine();
1273                     if (!mag.offerToQueue(this)) {
1274                         // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
1275                         // which did increase the reference count by 1.
1276                         boolean released = updater.release(this);
1277                         allocator.chunkRegistry.remove(this);
1278                         delegate.release();
1279                         assert released;
1280                     }
1281                 }
1282             }
1283         }
1284 
1285         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1286             int startIndex = allocatedBytes;
1287             allocatedBytes = startIndex + startingCapacity;
1288             Chunk chunk = this;
1289             chunk.retain();
1290             try {
1291                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1292                 chunk = null;
1293             } finally {
1294                 if (chunk != null) {
1295                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
1296                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
1297                     // restore the old allocatedBytes value.
1298                     allocatedBytes = startIndex;
1299                     chunk.release();
1300                 }
1301             }
1302         }
1303 
1304         public int remainingCapacity() {
1305             return capacity - allocatedBytes;
1306         }
1307 
1308         public int capacity() {
1309             return capacity;
1310         }
1311     }
1312 
1313     private static final class SizeClassedChunk extends Chunk {
1314         private static final int FREE_LIST_EMPTY = -1;
1315         private final int segmentSize;
1316         private final MpscIntQueue freeList;
1317 
1318         SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled, int segmentSize,
1319                          final int[] segmentOffsets, ChunkReleasePredicate shouldReleaseChunk) {
1320             super(delegate, magazine, pooled, shouldReleaseChunk);
1321             this.segmentSize = segmentSize;
1322             int segmentCount = segmentOffsets.length;
1323             assert delegate.capacity() / segmentSize == segmentCount;
1324             assert segmentCount > 0: "Chunk must have a positive number of segments";
1325             freeList = new MpscAtomicIntegerArrayQueue(segmentCount, FREE_LIST_EMPTY);
1326             freeList.fill(segmentCount, new IntSupplier() {
1327                 int counter;
1328                 @Override
1329                 public int get() {
1330                     return segmentOffsets[counter++];
1331                 }
1332             });
1333         }
1334 
1335         @Override
1336         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1337             int startIndex = freeList.poll();
1338             if (startIndex == FREE_LIST_EMPTY) {
1339                 throw new IllegalStateException("Free list is empty");
1340             }
1341             allocatedBytes += segmentSize;
1342             Chunk chunk = this;
1343             chunk.retain();
1344             try {
1345                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1346                 chunk = null;
1347             } finally {
1348                 if (chunk != null) {
1349                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
1350                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
1351                     // restore the old allocatedBytes value.
1352                     allocatedBytes -= segmentSize;
1353                     chunk.releaseSegment(startIndex);
1354                 }
1355             }
1356         }
1357 
1358         @Override
1359         public int remainingCapacity() {
1360             int remainingCapacity = super.remainingCapacity();
1361             if (remainingCapacity > segmentSize) {
1362                 return remainingCapacity;
1363             }
1364             int updatedRemainingCapacity = freeList.size() * segmentSize;
1365             if (updatedRemainingCapacity == remainingCapacity) {
1366                 return remainingCapacity;
1367             }
1368             // update allocatedBytes based on what's available in the free list
1369             allocatedBytes = capacity() - updatedRemainingCapacity;
1370             return updatedRemainingCapacity;
1371         }
1372 
1373         @Override
1374         boolean releaseFromMagazine() {
1375             // Size-classed chunks can be reused before they become empty.
1376             // We can therefor put them in the shared queue as soon as the magazine is done with this chunk.
1377             Magazine mag = magazine;
1378             detachFromMagazine();
1379             if (!mag.offerToQueue(this)) {
1380                 return super.releaseFromMagazine();
1381             }
1382             return false;
1383         }
1384 
1385         @Override
1386         boolean releaseSegment(int startIndex) {
1387             boolean released = release();
1388             boolean segmentReturned = freeList.offer(startIndex);
1389             assert segmentReturned: "Unable to return segment " + startIndex + " to free list";
1390             return released;
1391         }
1392     }
1393 
1394     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1395 
1396         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
1397 
1398         // this both act as adjustment and the start index for a free list segment allocation
1399         private int startIndex;
1400         private AbstractByteBuf rootParent;
1401         Chunk chunk;
1402         private int length;
1403         private int maxFastCapacity;
1404         private ByteBuffer tmpNioBuf;
1405         private boolean hasArray;
1406         private boolean hasMemoryAddress;
1407 
1408         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
1409             super(0);
1410             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1411         }
1412 
1413         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1414                   int startIndex, int size, int capacity, int maxCapacity) {
1415             this.startIndex = startIndex;
1416             chunk = wrapped;
1417             length = size;
1418             maxFastCapacity = capacity;
1419             maxCapacity(maxCapacity);
1420             setIndex0(readerIndex, writerIndex);
1421             hasArray = unwrapped.hasArray();
1422             hasMemoryAddress = unwrapped.hasMemoryAddress();
1423             rootParent = unwrapped;
1424             tmpNioBuf = null;
1425         }
1426 
1427         private AbstractByteBuf rootParent() {
1428             final AbstractByteBuf rootParent = this.rootParent;
1429             if (rootParent != null) {
1430                 return rootParent;
1431             }
1432             throw new IllegalReferenceCountException();
1433         }
1434 
1435         @Override
1436         public int capacity() {
1437             return length;
1438         }
1439 
1440         @Override
1441         public int maxFastWritableBytes() {
1442             return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1443         }
1444 
1445         @Override
1446         public ByteBuf capacity(int newCapacity) {
1447             if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1448                 ensureAccessible();
1449                 length = newCapacity;
1450                 return this;
1451             }
1452             checkNewCapacity(newCapacity);
1453             if (newCapacity < capacity()) {
1454                 length = newCapacity;
1455                 trimIndicesToCapacity(newCapacity);
1456                 return this;
1457             }
1458 
1459             // Reallocation required.
1460             Chunk chunk = this.chunk;
1461             AdaptivePoolingAllocator allocator = chunk.allocator;
1462             int readerIndex = this.readerIndex;
1463             int writerIndex = this.writerIndex;
1464             int baseOldRootIndex = startIndex;
1465             int oldCapacity = length;
1466             AbstractByteBuf oldRoot = rootParent();
1467             allocator.reallocate(newCapacity, maxCapacity(), this);
1468             oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1469             chunk.releaseSegment(baseOldRootIndex);
1470             this.readerIndex = readerIndex;
1471             this.writerIndex = writerIndex;
1472             return this;
1473         }
1474 
1475         @Override
1476         public ByteBufAllocator alloc() {
1477             return rootParent().alloc();
1478         }
1479 
1480         @Override
1481         public ByteOrder order() {
1482             return rootParent().order();
1483         }
1484 
1485         @Override
1486         public ByteBuf unwrap() {
1487             return null;
1488         }
1489 
1490         @Override
1491         public boolean isDirect() {
1492             return rootParent().isDirect();
1493         }
1494 
1495         @Override
1496         public int arrayOffset() {
1497             return idx(rootParent().arrayOffset());
1498         }
1499 
1500         @Override
1501         public boolean hasMemoryAddress() {
1502             return hasMemoryAddress;
1503         }
1504 
1505         @Override
1506         public long memoryAddress() {
1507             ensureAccessible();
1508             return rootParent().memoryAddress() + startIndex;
1509         }
1510 
1511         @Override
1512         public ByteBuffer nioBuffer(int index, int length) {
1513             checkIndex(index, length);
1514             return rootParent().nioBuffer(idx(index), length);
1515         }
1516 
1517         @Override
1518         public ByteBuffer internalNioBuffer(int index, int length) {
1519             checkIndex(index, length);
1520             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1521         }
1522 
1523         private ByteBuffer internalNioBuffer() {
1524             if (tmpNioBuf == null) {
1525                 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1526             }
1527             return (ByteBuffer) tmpNioBuf.clear();
1528         }
1529 
1530         @Override
1531         public ByteBuffer[] nioBuffers(int index, int length) {
1532             checkIndex(index, length);
1533             return rootParent().nioBuffers(idx(index), length);
1534         }
1535 
1536         @Override
1537         public boolean hasArray() {
1538             return hasArray;
1539         }
1540 
1541         @Override
1542         public byte[] array() {
1543             ensureAccessible();
1544             return rootParent().array();
1545         }
1546 
1547         @Override
1548         public ByteBuf copy(int index, int length) {
1549             checkIndex(index, length);
1550             return rootParent().copy(idx(index), length);
1551         }
1552 
1553         @Override
1554         public int nioBufferCount() {
1555             return rootParent().nioBufferCount();
1556         }
1557 
1558         @Override
1559         protected byte _getByte(int index) {
1560             return rootParent()._getByte(idx(index));
1561         }
1562 
1563         @Override
1564         protected short _getShort(int index) {
1565             return rootParent()._getShort(idx(index));
1566         }
1567 
1568         @Override
1569         protected short _getShortLE(int index) {
1570             return rootParent()._getShortLE(idx(index));
1571         }
1572 
1573         @Override
1574         protected int _getUnsignedMedium(int index) {
1575             return rootParent()._getUnsignedMedium(idx(index));
1576         }
1577 
1578         @Override
1579         protected int _getUnsignedMediumLE(int index) {
1580             return rootParent()._getUnsignedMediumLE(idx(index));
1581         }
1582 
1583         @Override
1584         protected int _getInt(int index) {
1585             return rootParent()._getInt(idx(index));
1586         }
1587 
1588         @Override
1589         protected int _getIntLE(int index) {
1590             return rootParent()._getIntLE(idx(index));
1591         }
1592 
1593         @Override
1594         protected long _getLong(int index) {
1595             return rootParent()._getLong(idx(index));
1596         }
1597 
1598         @Override
1599         protected long _getLongLE(int index) {
1600             return rootParent()._getLongLE(idx(index));
1601         }
1602 
1603         @Override
1604         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1605             checkIndex(index, length);
1606             rootParent().getBytes(idx(index), dst, dstIndex, length);
1607             return this;
1608         }
1609 
1610         @Override
1611         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1612             checkIndex(index, length);
1613             rootParent().getBytes(idx(index), dst, dstIndex, length);
1614             return this;
1615         }
1616 
1617         @Override
1618         public ByteBuf getBytes(int index, ByteBuffer dst) {
1619             checkIndex(index, dst.remaining());
1620             rootParent().getBytes(idx(index), dst);
1621             return this;
1622         }
1623 
1624         @Override
1625         protected void _setByte(int index, int value) {
1626             rootParent()._setByte(idx(index), value);
1627         }
1628 
1629         @Override
1630         protected void _setShort(int index, int value) {
1631             rootParent()._setShort(idx(index), value);
1632         }
1633 
1634         @Override
1635         protected void _setShortLE(int index, int value) {
1636             rootParent()._setShortLE(idx(index), value);
1637         }
1638 
1639         @Override
1640         protected void _setMedium(int index, int value) {
1641             rootParent()._setMedium(idx(index), value);
1642         }
1643 
1644         @Override
1645         protected void _setMediumLE(int index, int value) {
1646             rootParent()._setMediumLE(idx(index), value);
1647         }
1648 
1649         @Override
1650         protected void _setInt(int index, int value) {
1651             rootParent()._setInt(idx(index), value);
1652         }
1653 
1654         @Override
1655         protected void _setIntLE(int index, int value) {
1656             rootParent()._setIntLE(idx(index), value);
1657         }
1658 
1659         @Override
1660         protected void _setLong(int index, long value) {
1661             rootParent()._setLong(idx(index), value);
1662         }
1663 
1664         @Override
1665         protected void _setLongLE(int index, long value) {
1666             rootParent().setLongLE(idx(index), value);
1667         }
1668 
1669         @Override
1670         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1671             checkIndex(index, length);
1672             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1673             tmp.put(src, srcIndex, length);
1674             return this;
1675         }
1676 
1677         @Override
1678         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1679             checkIndex(index, length);
1680             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1681             tmp.put(src.nioBuffer(srcIndex, length));
1682             return this;
1683         }
1684 
1685         @Override
1686         public ByteBuf setBytes(int index, ByteBuffer src) {
1687             checkIndex(index, src.remaining());
1688             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1689             tmp.put(src);
1690             return this;
1691         }
1692 
1693         @Override
1694         public ByteBuf getBytes(int index, OutputStream out, int length)
1695                 throws IOException {
1696             checkIndex(index, length);
1697             if (length != 0) {
1698                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1699             }
1700             return this;
1701         }
1702 
1703         @Override
1704         public int getBytes(int index, GatheringByteChannel out, int length)
1705                 throws IOException {
1706             ByteBuffer buf = internalNioBuffer().duplicate();
1707             buf.clear().position(index).limit(index + length);
1708             return out.write(buf);
1709         }
1710 
1711         @Override
1712         public int getBytes(int index, FileChannel out, long position, int length)
1713                 throws IOException {
1714             ByteBuffer buf = internalNioBuffer().duplicate();
1715             buf.clear().position(index).limit(index + length);
1716             return out.write(buf, position);
1717         }
1718 
1719         @Override
1720         public int setBytes(int index, InputStream in, int length)
1721                 throws IOException {
1722             checkIndex(index, length);
1723             final AbstractByteBuf rootParent = rootParent();
1724             if (rootParent.hasArray()) {
1725                 return rootParent.setBytes(idx(index), in, length);
1726             }
1727             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1728             int readBytes = in.read(tmp, 0, length);
1729             if (readBytes <= 0) {
1730                 return readBytes;
1731             }
1732             setBytes(index, tmp, 0, readBytes);
1733             return readBytes;
1734         }
1735 
1736         @Override
1737         public int setBytes(int index, ScatteringByteChannel in, int length)
1738                 throws IOException {
1739             try {
1740                 return in.read(internalNioBuffer(index, length));
1741             } catch (ClosedChannelException ignored) {
1742                 return -1;
1743             }
1744         }
1745 
1746         @Override
1747         public int setBytes(int index, FileChannel in, long position, int length)
1748                 throws IOException {
1749             try {
1750                 return in.read(internalNioBuffer(index, length), position);
1751             } catch (ClosedChannelException ignored) {
1752                 return -1;
1753             }
1754         }
1755 
1756         @Override
1757         public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1758             return setCharSequence0(index, sequence, charset, false);
1759         }
1760 
1761         private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
1762             if (charset.equals(CharsetUtil.UTF_8)) {
1763                 int length = ByteBufUtil.utf8MaxBytes(sequence);
1764                 if (expand) {
1765                     ensureWritable0(length);
1766                     checkIndex0(index, length);
1767                 } else {
1768                     checkIndex(index, length);
1769                 }
1770                 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
1771             }
1772             if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
1773                 int length = sequence.length();
1774                 if (expand) {
1775                     ensureWritable0(length);
1776                     checkIndex0(index, length);
1777                 } else {
1778                     checkIndex(index, length);
1779                 }
1780                 return ByteBufUtil.writeAscii(this, index, sequence, length);
1781             }
1782             byte[] bytes = sequence.toString().getBytes(charset);
1783             if (expand) {
1784                 ensureWritable0(bytes.length);
1785                 // setBytes(...) will take care of checking the indices.
1786             }
1787             setBytes(index, bytes);
1788             return bytes.length;
1789         }
1790 
1791         @Override
1792         public int writeCharSequence(CharSequence sequence, Charset charset) {
1793             int written = setCharSequence0(writerIndex, sequence, charset, true);
1794             writerIndex += written;
1795             return written;
1796         }
1797 
1798         @Override
1799         public int forEachByte(int index, int length, ByteProcessor processor) {
1800             checkIndex(index, length);
1801             int ret = rootParent().forEachByte(idx(index), length, processor);
1802             return forEachResult(ret);
1803         }
1804 
1805         @Override
1806         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1807             checkIndex(index, length);
1808             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1809             return forEachResult(ret);
1810         }
1811 
1812         @Override
1813         public ByteBuf setZero(int index, int length) {
1814             checkIndex(index, length);
1815             rootParent().setZero(idx(index), length);
1816             return this;
1817         }
1818 
1819         @Override
1820         public ByteBuf writeZero(int length) {
1821             ensureWritable(length);
1822             rootParent().setZero(idx(writerIndex), length);
1823             writerIndex += length;
1824             return this;
1825         }
1826 
1827         private int forEachResult(int ret) {
1828             if (ret < startIndex) {
1829                 return -1;
1830             }
1831             return ret - startIndex;
1832         }
1833 
1834         @Override
1835         public boolean isContiguous() {
1836             return rootParent().isContiguous();
1837         }
1838 
1839         private int idx(int index) {
1840             return index + startIndex;
1841         }
1842 
1843         @Override
1844         protected void deallocate() {
1845             if (chunk != null) {
1846                 chunk.releaseSegment(startIndex);
1847             }
1848             tmpNioBuf = null;
1849             chunk = null;
1850             rootParent = null;
1851             if (handle instanceof EnhancedHandle) {
1852                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1853                 enhancedHandle.unguardedRecycle(this);
1854             } else {
1855                 handle.recycle(this);
1856             }
1857         }
1858     }
1859 
1860     /**
1861      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1862      */
1863     interface ChunkAllocator {
1864         /**
1865          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1866          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1867          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1868          * @return The buffer that represents the chunk memory.
1869          */
1870         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1871     }
1872 }