View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.CharsetUtil;
20  import io.netty.util.IllegalReferenceCountException;
21  import io.netty.util.NettyRuntime;
22  import io.netty.util.Recycler;
23  import io.netty.util.Recycler.EnhancedHandle;
24  import io.netty.util.ReferenceCounted;
25  import io.netty.util.concurrent.FastThreadLocal;
26  import io.netty.util.concurrent.FastThreadLocalThread;
27  import io.netty.util.concurrent.MpscIntQueue;
28  import io.netty.util.internal.ObjectPool;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.PlatformDependent;
31  import io.netty.util.internal.ReferenceCountUpdater;
32  import io.netty.util.internal.SystemPropertyUtil;
33  import io.netty.util.internal.ThreadExecutorMap;
34  import io.netty.util.internal.UnstableApi;
35  
36  import java.io.IOException;
37  import java.io.InputStream;
38  import java.io.OutputStream;
39  import java.nio.ByteBuffer;
40  import java.nio.ByteOrder;
41  import java.nio.channels.ClosedChannelException;
42  import java.nio.channels.FileChannel;
43  import java.nio.channels.GatheringByteChannel;
44  import java.nio.channels.ScatteringByteChannel;
45  import java.nio.charset.Charset;
46  import java.util.Arrays;
47  import java.util.Queue;
48  import java.util.Set;
49  import java.util.concurrent.ConcurrentHashMap;
50  import java.util.concurrent.ConcurrentLinkedQueue;
51  import java.util.concurrent.ThreadLocalRandom;
52  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53  import java.util.concurrent.atomic.AtomicLong;
54  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
55  import java.util.concurrent.locks.StampedLock;
56  import java.util.function.IntSupplier;
57  
58  /**
59   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
60   * <p>
61   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
62   * from.
63   * <p>
64   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
65   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
66   * If contention is detected above a certain threshold, the number of magazines is increased in response to the
67   * contention.
68   * <p>
69   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
70   * preferred chunk size. The preferred chunk size is one that is big enough to service {@link #BUFS_PER_CHUNK} (8)
71   * allocations of the 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
72   * <p>
73   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
74   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
75   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
76   * <p>
77   * This allows the allocator to quickly respond to changes in the application workload,
78   * without suffering undue overhead from maintaining its statistics.
79   * <p>
80   * Since magazines are "relatively thread-local", each magazine group has a shared queue that allows excess chunks
81   * from any of its magazines to be reused by the other magazines of that group.
82   * The queue is created by {@link #createSharedChunkQueue()} and is bounded to limit memory usage.
83   */
84  @UnstableApi
85  final class AdaptivePoolingAllocator {
86      /**
87       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
88       * allocations. For instance, glibc will do this.
89       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
90       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
91       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
92       */
93      static final int MIN_CHUNK_SIZE = 128 * 1024;
94      private static final int EXPANSION_ATTEMPTS = 3;
95      private static final int INITIAL_MAGAZINES = 1;
96      private static final int RETIRE_CAPACITY = 256;
97      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
98      private static final int BUFS_PER_CHUNK = 8; // For large buffers, aim to have about this many buffers per chunk.
99  
100     /**
101      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
102      * <p>
103      * This number is 8 MiB, and is derived from the limitations of internal histograms.
104      */
105     private static final int MAX_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MiB.
106     private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
107 
108     /**
109      * The capacity if the chunk reuse queues, that allow chunks to be shared across magazines in a group.
110      * The default size is twice {@link NettyRuntime#availableProcessors()},
111      * same as the maximum number of magazines per magazine group.
112      */
113     private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
114             "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));
115 
116     /**
117      * The capacity if the magazine local buffer queue. This queue just pools the outer ByteBuf instance and not
118      * the actual memory and so helps to reduce GC pressure.
119      */
120     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
121             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
122 
123     /**
124      * The size classes are chosen based on the following observation:
125      * <p>
126      * Most allocations, particularly ones above 256 bytes, aim to be a power-of-2. However, many use cases, such
127      * as framing protocols, are themselves operating or moving power-of-2 sized payloads, to which they add a
128      * small amount of overhead, such as headers or checksums.
129      * This means we seem to get a lot of mileage out of having both power-of-2 sizes, and power-of-2-plus-a-bit.
130      * <p>
131      * On the conflicting requirements of both having as few chunks as possible, and having as little wasted
132      * memory within each chunk as possible, this seems to strike a surprisingly good balance for the use cases
133      * tested so far.
134      */
135     private static final int[] SIZE_CLASSES = {
136             32,
137             64,
138             128,
139             256,
140             512,
141             640, // 512 + 128
142             1024,
143             1152, // 1024 + 128
144             2048,
145             2304, // 2048 + 256
146             4096,
147             4352, // 4096 + 256
148             8192,
149             8704, // 8192 + 512
150             16384,
151             16896, // 16384 + 512
152     };
153 
154     private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
155     private static final byte[] SIZE_INDEXES = new byte[(SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32) + 1];
156 
157     static {
158         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
159             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
160                     + " (expected: >= " + 2 + ')');
161         }
162         int lastIndex = 0;
163         for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
164             int sizeClass = SIZE_CLASSES[i];
165             //noinspection ConstantValue
166             assert (sizeClass & 5) == 0 : "Size class must be a multiple of 32";
167             int sizeIndex = sizeIndexOf(sizeClass);
168             Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
169             lastIndex = sizeIndex;
170         }
171     }
172 
173     private final ChunkAllocator chunkAllocator;
174     private final Set<Chunk> chunkRegistry;
175     private final MagazineGroup[] sizeClassedMagazineGroups;
176     private final MagazineGroup largeBufferMagazineGroup;
177     private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
178 
179     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, boolean useCacheForNonEventLoopThreads) {
180         this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
181         chunkRegistry = ConcurrentHashMap.newKeySet();
182         sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
183         largeBufferMagazineGroup = new MagazineGroup(
184                 this, chunkAllocator, new HistogramChunkControllerFactory(true), false);
185 
186         threadLocalGroup = new FastThreadLocal<MagazineGroup[]>() {
187             @Override
188             protected MagazineGroup[] initialValue() {
189                 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
190                     return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
191                 }
192                 return null;
193             }
194 
195             @Override
196             protected void onRemoval(final MagazineGroup[] groups) throws Exception {
197                 if (groups != null) {
198                     for (MagazineGroup group : groups) {
199                         group.free();
200                     }
201                 }
202             }
203         };
204     }
205 
206     private static MagazineGroup[] createMagazineGroupSizeClasses(
207             AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
208         MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
209         for (int i = 0; i < SIZE_CLASSES.length; i++) {
210             int segmentSize = SIZE_CLASSES[i];
211             groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
212                     new SizeClassChunkControllerFactory(segmentSize), isThreadLocal);
213         }
214         return groups;
215     }
216 
217     /**
218      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
219      * internal Magazines.
220      * <p>
221      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
222      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
223      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
224      * allocate new chunks when their current and next-in-line chunks have both been used up.
225      * <p>
226      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
227      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
228      * queue.
229      * <p>
230      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
231      * queue to limit the maximum memory usage.
232      * <p>
233      * The default implementation will create a bounded queue with a capacity of {@link #CHUNK_REUSE_QUEUE}.
234      *
235      * @return A new multi-producer, multi-consumer queue.
236      */
237     private static Queue<Chunk> createSharedChunkQueue() {
238         return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
239     }
240 
241     ByteBuf allocate(int size, int maxCapacity) {
242         return allocate(size, maxCapacity, Thread.currentThread(), null);
243     }
244 
245     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
246         AdaptiveByteBuf allocated = null;
247         if (size <= MAX_POOLED_BUF_SIZE) {
248             final int index = sizeClassIndexOf(size);
249             MagazineGroup[] magazineGroups;
250             if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals() ||
251                     (magazineGroups = threadLocalGroup.get()) == null) {
252                 magazineGroups =  sizeClassedMagazineGroups;
253             }
254             if (index < magazineGroups.length) {
255                 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
256             } else {
257                 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
258             }
259         }
260         if (allocated == null) {
261             allocated = allocateFallback(size, maxCapacity, currentThread, buf);
262         }
263         return allocated;
264     }
265 
266     private static int sizeIndexOf(final int size) {
267         // this is aligning the size to the next multiple of 32 and dividing by 32 to get the size index.
268         return size + 31 >> 5;
269     }
270 
271     static int sizeClassIndexOf(int size) {
272         int sizeIndex = sizeIndexOf(size);
273         if (sizeIndex < SIZE_INDEXES.length) {
274             return SIZE_INDEXES[sizeIndex];
275         }
276         return SIZE_CLASSES_COUNT;
277     }
278 
279     static int[] getSizeClasses() {
280         return SIZE_CLASSES.clone();
281     }
282 
283     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread,
284                                              AdaptiveByteBuf buf) {
285         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
286         Magazine magazine;
287         if (buf != null) {
288             Chunk chunk = buf.chunk;
289             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
290                 magazine = getFallbackMagazine(currentThread);
291             }
292         } else {
293             magazine = getFallbackMagazine(currentThread);
294             buf = magazine.newBuffer();
295         }
296         // Create a one-off chunk for this allocation.
297         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
298         Chunk chunk = new Chunk(innerChunk, magazine, false, chunkSize -> true);
299         try {
300             chunk.readInitInto(buf, size, size, maxCapacity);
301         } finally {
302             // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
303             // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
304             // completely release the Chunk and so the contained innerChunk.
305             chunk.release();
306         }
307         return buf;
308     }
309 
310     private Magazine getFallbackMagazine(Thread currentThread) {
311         Magazine[] mags = largeBufferMagazineGroup.magazines;
312         return mags[(int) currentThread.getId() & mags.length - 1];
313     }
314 
315     /**
316      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
317      */
318     void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
319         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
320         assert result == into: "Re-allocation created separate buffer instance";
321     }
322 
323     long usedMemory() {
324         long sum = 0;
325         for (Chunk chunk : chunkRegistry) {
326             sum += chunk.capacity();
327         }
328         return sum;
329     }
330 
331     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
332     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
333     // very confusing for users.
334     @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
335     @Override
336     protected void finalize() throws Throwable {
337         try {
338             super.finalize();
339         } finally {
340             free();
341         }
342     }
343 
344     private void free() {
345         largeBufferMagazineGroup.free();
346     }
347 
348     static int sizeToBucket(int size) {
349         return HistogramChunkController.sizeToBucket(size);
350     }
351 
352     private static final class MagazineGroup {
353         private final AdaptivePoolingAllocator allocator;
354         private final ChunkAllocator chunkAllocator;
355         private final ChunkControllerFactory chunkControllerFactory;
356         private final Queue<Chunk> chunkReuseQueue;
357         private final StampedLock magazineExpandLock;
358         private final Magazine threadLocalMagazine;
359         private volatile Magazine[] magazines;
360         private volatile boolean freed;
361 
362         MagazineGroup(AdaptivePoolingAllocator allocator,
363                       ChunkAllocator chunkAllocator,
364                       ChunkControllerFactory chunkControllerFactory,
365                       boolean isThreadLocal) {
366             this.allocator = allocator;
367             this.chunkAllocator = chunkAllocator;
368             this.chunkControllerFactory = chunkControllerFactory;
369             chunkReuseQueue = createSharedChunkQueue();
370             if (isThreadLocal) {
371                 magazineExpandLock = null;
372                 threadLocalMagazine = new Magazine(this, false, chunkReuseQueue, chunkControllerFactory.create(this));
373             } else {
374                 magazineExpandLock = new StampedLock();
375                 threadLocalMagazine = null;
376                 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
377                 for (int i = 0; i < mags.length; i++) {
378                     mags[i] = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
379                 }
380                 magazines = mags;
381             }
382         }
383 
384         public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
385             boolean reallocate = buf != null;
386 
387             // Path for thread-local allocation.
388             Magazine tlMag = threadLocalMagazine;
389             if (tlMag != null) {
390                 if (buf == null) {
391                     buf = tlMag.newBuffer();
392                 }
393                 boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
394                 assert allocated : "Allocation of threadLocalMagazine must always succeed";
395                 return buf;
396             }
397 
398             // Path for concurrent allocation.
399             long threadId = currentThread.getId();
400             Magazine[] mags;
401             int expansions = 0;
402             do {
403                 mags = magazines;
404                 int mask = mags.length - 1;
405                 int index = (int) (threadId & mask);
406                 for (int i = 0, m = mags.length << 1; i < m; i++) {
407                     Magazine mag = mags[index + i & mask];
408                     if (buf == null) {
409                         buf = mag.newBuffer();
410                     }
411                     if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
412                         // Was able to allocate.
413                         return buf;
414                     }
415                 }
416                 expansions++;
417             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
418 
419             // The magazines failed us; contention too high and we don't want to spend more effort expanding the array.
420             if (!reallocate && buf != null) {
421                 buf.release(); // Release the previously claimed buffer before we return.
422             }
423             return null;
424         }
425 
426         private boolean tryExpandMagazines(int currentLength) {
427             if (currentLength >= MAX_STRIPES) {
428                 return true;
429             }
430             final Magazine[] mags;
431             long writeLock = magazineExpandLock.tryWriteLock();
432             if (writeLock != 0) {
433                 try {
434                     mags = magazines;
435                     if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
436                         return true;
437                     }
438                     Magazine firstMagazine = mags[0];
439                     Magazine[] expanded = new Magazine[mags.length * 2];
440                     for (int i = 0, l = expanded.length; i < l; i++) {
441                         Magazine m = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
442                         firstMagazine.initializeSharedStateIn(m);
443                         expanded[i] = m;
444                     }
445                     magazines = expanded;
446                 } finally {
447                     magazineExpandLock.unlockWrite(writeLock);
448                 }
449                 for (Magazine magazine : mags) {
450                     magazine.free();
451                 }
452             }
453             return true;
454         }
455 
456         boolean offerToQueue(Chunk buffer) {
457             if (freed) {
458                 return false;
459             }
460 
461             boolean isAdded = chunkReuseQueue.offer(buffer);
462             if (freed && isAdded) {
463                 // Help to free the reuse queue.
464                 freeChunkReuseQueue();
465             }
466             return isAdded;
467         }
468 
469         private void free() {
470             freed = true;
471             if (threadLocalMagazine != null) {
472                 threadLocalMagazine.free();
473             } else {
474                 long stamp = magazineExpandLock.writeLock();
475                 try {
476                     Magazine[] mags = magazines;
477                     for (Magazine magazine : mags) {
478                         magazine.free();
479                     }
480                 } finally {
481                     magazineExpandLock.unlockWrite(stamp);
482                 }
483             }
484             freeChunkReuseQueue();
485         }
486 
487         private void freeChunkReuseQueue() {
488             for (;;) {
489                 Chunk chunk = chunkReuseQueue.poll();
490                 if (chunk == null) {
491                     break;
492                 }
493                 chunk.release();
494             }
495         }
496     }
497 
498     private interface ChunkControllerFactory {
499         ChunkController create(MagazineGroup group);
500     }
501 
502     private interface ChunkController {
503         /**
504          * Compute the "fast max capacity" value for the buffer.
505          */
506         int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
507 
508         /**
509          * Initialize the given chunk factory with shared statistics state (if any) from this factory.
510          */
511         void initializeSharedStateIn(ChunkController chunkController);
512 
513         /**
514          * Allocate a new {@link Chunk} for the given {@link Magazine}.
515          */
516         Chunk newChunkAllocation(int promptingSize, Magazine magazine);
517     }
518 
519     private interface ChunkReleasePredicate {
520         boolean shouldReleaseChunk(int chunkSize);
521     }
522 
523     private static final class SizeClassChunkControllerFactory implements ChunkControllerFactory {
524         private final int segmentSize;
525 
526         private SizeClassChunkControllerFactory(int segmentSize) {
527             this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
528         }
529 
530         @Override
531         public ChunkController create(MagazineGroup group) {
532             return new SizeClassChunkController(group, segmentSize);
533         }
534     }
535 
536     private static final class SizeClassChunkController implements ChunkController {
537         // To amortize activation/deactivation of chunks, we should have a minimum number of segments per chunk.
538         // We choose 32 because it seems neither too small nor too big.
539         // For segments of 16 KiB, the chunks will be half a megabyte.
540         private static final int MIN_SEGMENTS_PER_CHUNK = 32;
541         private final ChunkAllocator chunkAllocator;
542         private final int segmentSize;
543         private final int chunkSize;
544         private final Set<Chunk> chunkRegistry;
545 
546         private SizeClassChunkController(MagazineGroup group, int segmentSize) {
547             chunkAllocator = group.chunkAllocator;
548             this.segmentSize = segmentSize;
549             chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
550             chunkRegistry = group.allocator.chunkRegistry;
551         }
552 
553         @Override
554         public int computeBufferCapacity(
555                 int requestedSize, int maxCapacity, boolean isReallocation) {
556             return Math.min(segmentSize, maxCapacity);
557         }
558 
559         @Override
560         public void initializeSharedStateIn(ChunkController chunkController) {
561             // NOOP
562         }
563 
564         @Override
565         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
566             SizeClassedChunk chunk = new SizeClassedChunk(chunkAllocator.allocate(chunkSize, chunkSize),
567                     magazine, true, segmentSize, size -> false);
568             chunkRegistry.add(chunk);
569             return chunk;
570         }
571     }
572 
573     private static final class HistogramChunkControllerFactory implements ChunkControllerFactory {
574         private final boolean shareable;
575 
576         private HistogramChunkControllerFactory(boolean shareable) {
577             this.shareable = shareable;
578         }
579 
580         @Override
581         public ChunkController create(MagazineGroup group) {
582             return new HistogramChunkController(group, shareable);
583         }
584     }
585 
586     private static final class HistogramChunkController implements ChunkController, ChunkReleasePredicate {
587         private static final int MIN_DATUM_TARGET = 1024;
588         private static final int MAX_DATUM_TARGET = 65534;
589         private static final int INIT_DATUM_TARGET = 9;
590         private static final int HISTO_BUCKET_COUNT = 16;
591         private static final int[] HISTO_BUCKETS = {
592                 16 * 1024,
593                 24 * 1024,
594                 32 * 1024,
595                 48 * 1024,
596                 64 * 1024,
597                 96 * 1024,
598                 128 * 1024,
599                 192 * 1024,
600                 256 * 1024,
601                 384 * 1024,
602                 512 * 1024,
603                 768 * 1024,
604                 1024 * 1024,
605                 1792 * 1024,
606                 2048 * 1024,
607                 3072 * 1024
608         };
609 
610         private final MagazineGroup group;
611         private final boolean shareable;
612         private final short[][] histos = {
613                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
614                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
615         };
616         private final Set<Chunk> chunkRegistry;
617         private short[] histo = histos[0];
618         private final int[] sums = new int[HISTO_BUCKET_COUNT];
619 
620         private int histoIndex;
621         private int datumCount;
622         private int datumTarget = INIT_DATUM_TARGET;
623         private boolean hasHadRotation;
624         private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
625         private volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
626         private volatile int localUpperBufSize;
627 
628         private HistogramChunkController(MagazineGroup group, boolean shareable) {
629             this.group = group;
630             this.shareable = shareable;
631             chunkRegistry = group.allocator.chunkRegistry;
632         }
633 
634         @Override
635         public int computeBufferCapacity(
636                 int requestedSize, int maxCapacity, boolean isReallocation) {
637             if (!isReallocation) {
638                 // Only record allocation size if it's not caused by a reallocation that was triggered by capacity
639                 // change of the buffer.
640                 recordAllocationSize(requestedSize);
641             }
642 
643             // Predict starting capacity from localUpperBufSize, but place limits on the max starting capacity
644             // based on the requested size, because localUpperBufSize can potentially be quite large.
645             int startCapLimits;
646             if (requestedSize <= 32768) { // Less than or equal to 32 KiB.
647                 startCapLimits = 65536; // Use at most 64 KiB, which is also the AdaptiveRecvByteBufAllocator max.
648             } else {
649                 startCapLimits = requestedSize * 2; // Otherwise use at most twice the requested memory.
650             }
651             int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
652             startingCapacity = Math.max(requestedSize, Math.min(maxCapacity, startingCapacity));
653             return startingCapacity;
654         }
655 
656         private void recordAllocationSize(int bufferSizeToRecord) {
657             // Use the preserved size from the reused AdaptiveByteBuf, if available.
658             // Otherwise, use the requested buffer size.
659             // This way, we better take into account
660             if (bufferSizeToRecord == 0) {
661                 return;
662             }
663             int bucket = sizeToBucket(bufferSizeToRecord);
664             histo[bucket]++;
665             if (datumCount++ == datumTarget) {
666                 rotateHistograms();
667             }
668         }
669 
670         static int sizeToBucket(int size) {
671             int index = binarySearchInsertionPoint(Arrays.binarySearch(HISTO_BUCKETS, size));
672             return index >= HISTO_BUCKETS.length ? HISTO_BUCKETS.length - 1 : index;
673         }
674 
675         private static int binarySearchInsertionPoint(int index) {
676             if (index < 0) {
677                 index = -(index + 1);
678             }
679             return index;
680         }
681 
682         static int bucketToSize(int sizeBucket) {
683             return HISTO_BUCKETS[sizeBucket];
684         }
685 
        /**
         * Recomputes the preferred sizes from the recorded statistics, then switches recording to the next of the
         * four histograms.
         * <p>
         * The bucket holding the 99th percentile of the combined counts of all four histograms determines the
         * local upper buffer size. The local preferred chunk size is that bucket size times {@code BUFS_PER_CHUNK},
         * but never less than {@code MIN_CHUNK_SIZE}. When this controller is shareable, the shared preferred chunk
         * size is the maximum local preference over all magazines of the group.
         * <p>
         * If the shared preferred chunk size changed, {@code datumTarget} is halved (down to {@code MIN_DATUM_TARGET})
         * so the next rotation happens sooner. Otherwise it is doubled (up to {@code MAX_DATUM_TARGET}).
         */
        private void rotateHistograms() {
            short[][] hs = histos;
            for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
                // Counts are stored as shorts and read back as unsigned 16-bit values.
                sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
            }
            int sum = 0;
            for (int count : sums) {
                sum  += count;
            }
            // Find the first bucket at which the cumulative count exceeds 99% of all recorded allocations.
            // NOTE(review): if every count were zero this loop would end with sizeBucket == sums.length; the only
            // caller visible here (recordAllocationSize) increments a bucket before calling, so that should not occur.
            int targetPercentile = (int) (sum * 0.99);
            int sizeBucket = 0;
            for (; sizeBucket < sums.length; sizeBucket++) {
                if (sums[sizeBucket] > targetPercentile) {
                    break;
                }
                targetPercentile -= sums[sizeBucket];
            }
            hasHadRotation = true;
            int percentileSize = bucketToSize(sizeBucket);
            int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
            localUpperBufSize = percentileSize;
            localPrefChunkSize = prefChunkSize;
            if (shareable) {
                // The shared preference is the largest local preference across all magazines of the group.
                for (Magazine mag : group.magazines) {
                    HistogramChunkController statistics = (HistogramChunkController) mag.chunkController;
                    prefChunkSize = Math.max(prefChunkSize, statistics.localPrefChunkSize);
                }
            }
            if (sharedPrefChunkSize != prefChunkSize) {
                // Preferred chunk size changed. Increase check frequency.
                datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
                sharedPrefChunkSize = prefChunkSize;
            } else {
                // Preferred chunk size did not change. Check less often.
                datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
            }

            // Advance to the next histogram ((histoIndex + 1) & 3 cycles through 0..3) and clear it, discarding the
            // oldest period's counts.
            histoIndex = histoIndex + 1 & 3;
            histo = histos[histoIndex];
            datumCount = 0;
            Arrays.fill(histo, (short) 0);
        }
728 
729         /**
730          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
731          * allocation sizes.
732          * <p>
733          * This method must be thread-safe.
734          *
735          * @return The currently preferred chunk allocation size.
736          */
737         int preferredChunkSize() {
738             return sharedPrefChunkSize;
739         }
740 
741         @Override
742         public void initializeSharedStateIn(ChunkController chunkController) {
743             HistogramChunkController statistics = (HistogramChunkController) chunkController;
744             int sharedPrefChunkSize = this.sharedPrefChunkSize;
745             statistics.localPrefChunkSize = sharedPrefChunkSize;
746             statistics.sharedPrefChunkSize = sharedPrefChunkSize;
747         }
748 
749         @Override
750         public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
751             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
752             int minChunks = size / MIN_CHUNK_SIZE;
753             if (MIN_CHUNK_SIZE * minChunks < size) {
754                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
755                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
756                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
757                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
758                 size = MIN_CHUNK_SIZE * (1 + minChunks);
759             }
760 
761             // Limit chunks to the max size, even if the histogram suggests to go above it.
762             size = Math.min(size, MAX_CHUNK_SIZE);
763 
764             // If we haven't rotated the histogram yet, optimisticly record this chunk size as our preferred.
765             if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
766                 sharedPrefChunkSize = size;
767             }
768 
769             ChunkAllocator chunkAllocator = group.chunkAllocator;
770             Chunk chunk = new Chunk(chunkAllocator.allocate(size, size), magazine, true, this);
771             chunkRegistry.add(chunk);
772             return chunk;
773         }
774 
775         @Override
776         public boolean shouldReleaseChunk(int chunkSize) {
777             int preferredSize = preferredChunkSize();
778             int givenChunks = chunkSize / MIN_CHUNK_SIZE;
779             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
780             int deviation = Math.abs(givenChunks - preferredChunks);
781 
782             // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
783             return deviation != 0 &&
784                     ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
785         }
786     }
787 
788     private static final class Magazine {
789         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
790         static {
791             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
792         }
793         private static final Chunk MAGAZINE_FREED = new Chunk();
794 
795         private static final Recycler<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = new Recycler<AdaptiveByteBuf>() {
796             @Override
797             protected AdaptiveByteBuf newObject(Handle<AdaptiveByteBuf> handle) {
798                 return new AdaptiveByteBuf(handle);
799             }
800         };
801 
802         private Chunk current;
803         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
804         private volatile Chunk nextInLine;
805         private final MagazineGroup group;
806         private final ChunkController chunkController;
807         private final AtomicLong usedMemory;
808         private final StampedLock allocationLock;
809         private final Queue<AdaptiveByteBuf> bufferQueue;
810         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
811         private final Queue<Chunk> sharedChunkQueue;
812 
813         Magazine(MagazineGroup group, boolean shareable, Queue<Chunk> sharedChunkQueue,
814                  ChunkController chunkController) {
815             this.group = group;
816             this.chunkController = chunkController;
817 
818             if (shareable) {
819                 // We only need the StampedLock if this Magazine will be shared across threads.
820                 allocationLock = new StampedLock();
821                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
822                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
823                     @Override
824                     public void recycle(AdaptiveByteBuf self) {
825                         bufferQueue.offer(self);
826                     }
827                 };
828             } else {
829                 allocationLock = null;
830                 bufferQueue = null;
831                 handle = null;
832             }
833             usedMemory = new AtomicLong();
834             this.sharedChunkQueue = sharedChunkQueue;
835         }
836 
837         public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
838             if (allocationLock == null) {
839                 // This magazine is not shared across threads, just allocate directly.
840                 return allocate(size, maxCapacity, buf, reallocate);
841             }
842 
843             // Try to retrieve the lock and if successful allocate.
844             long writeLock = allocationLock.tryWriteLock();
845             if (writeLock != 0) {
846                 try {
847                     return allocate(size, maxCapacity, buf, reallocate);
848                 } finally {
849                     allocationLock.unlockWrite(writeLock);
850                 }
851             }
852             return allocateWithoutLock(size, maxCapacity, buf);
853         }
854 
855         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
856             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
857             if (curr == MAGAZINE_FREED) {
858                 // Allocation raced with a stripe-resize that freed this magazine.
859                 restoreMagazineFreed();
860                 return false;
861             }
862             if (curr == null) {
863                 curr = sharedChunkQueue.poll();
864                 if (curr == null) {
865                     return false;
866                 }
867                 curr.attachToMagazine(this);
868             }
869             boolean allocated = false;
870             int remainingCapacity = curr.remainingCapacity();
871             int startingCapacity = chunkController.computeBufferCapacity(
872                     size, maxCapacity, true /* never update stats as we don't hold the magazine lock */);
873             if (remainingCapacity >= size) {
874                 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
875                 allocated = true;
876             }
877             try {
878                 if (remainingCapacity >= RETIRE_CAPACITY) {
879                     transferToNextInLineOrRelease(curr);
880                     curr = null;
881                 }
882             } finally {
883                 if (curr != null) {
884                     curr.releaseFromMagazine();
885                 }
886             }
887             return allocated;
888         }
889 
890         private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
891             int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
892             Chunk curr = current;
893             if (curr != null) {
894                 // We have a Chunk that has some space left.
895                 int remainingCapacity = curr.remainingCapacity();
896                 if (remainingCapacity > startingCapacity) {
897                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
898                     // We still have some bytes left that we can use for the next allocation, just early return.
899                     return true;
900                 }
901 
902                 // At this point we know that this will be the last time current will be used, so directly set it to
903                 // null and release it once we are done.
904                 current = null;
905                 if (remainingCapacity >= size) {
906                     try {
907                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
908                         return true;
909                     } finally {
910                         curr.releaseFromMagazine();
911                     }
912                 }
913 
914                 // Check if we either retain the chunk in the nextInLine cache or releasing it.
915                 if (remainingCapacity < RETIRE_CAPACITY) {
916                     curr.releaseFromMagazine();
917                 } else {
918                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
919                     // This method will release curr if this is not the case
920                     transferToNextInLineOrRelease(curr);
921                 }
922             }
923 
924             assert current == null;
925             // The fast-path for allocations did not work.
926             //
927             // Try to fetch the next "Magazine local" Chunk first, if this fails because we don't have a
928             // next-in-line chunk available, we will poll our centralQueue.
929             // If this fails as well we will just allocate a new Chunk.
930             //
931             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
932             // thus be "reserved" by this Magazine for exclusive usage.
933             curr = NEXT_IN_LINE.getAndSet(this, null);
934             if (curr != null) {
935                 if (curr == MAGAZINE_FREED) {
936                     // Allocation raced with a stripe-resize that freed this magazine.
937                     restoreMagazineFreed();
938                     return false;
939                 }
940 
941                 int remainingCapacity = curr.remainingCapacity();
942                 if (remainingCapacity > startingCapacity) {
943                     // We have a Chunk that has some space left.
944                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
945                     current = curr;
946                     return true;
947                 }
948 
949                 if (remainingCapacity >= size) {
950                     // At this point we know that this will be the last time curr will be used, so directly set it to
951                     // null and release it once we are done.
952                     try {
953                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
954                         return true;
955                     } finally {
956                         // Release in a finally block so even if readInitInto(...) would throw we would still correctly
957                         // release the current chunk before null it out.
958                         curr.releaseFromMagazine();
959                     }
960                 } else {
961                     // Release it as it's too small.
962                     curr.releaseFromMagazine();
963                 }
964             }
965 
966             // Now try to poll from the central queue first
967             curr = sharedChunkQueue.poll();
968             if (curr == null) {
969                 curr = chunkController.newChunkAllocation(size, this);
970             } else {
971                 curr.attachToMagazine(this);
972 
973                 int remainingCapacity = curr.remainingCapacity();
974                 if (remainingCapacity < size) {
975                     // Check if we either retain the chunk in the nextInLine cache or releasing it.
976                     if (remainingCapacity < RETIRE_CAPACITY) {
977                         curr.releaseFromMagazine();
978                     } else {
979                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
980                         // This method will release curr if this is not the case
981                         transferToNextInLineOrRelease(curr);
982                     }
983                     curr = chunkController.newChunkAllocation(size, this);
984                 }
985             }
986 
987             current = curr;
988             try {
989                 int remainingCapacity = curr.remainingCapacity();
990                 assert remainingCapacity >= size;
991                 if (remainingCapacity > startingCapacity) {
992                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
993                     curr = null;
994                 } else {
995                     curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
996                 }
997             } finally {
998                 if (curr != null) {
999                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
1000                     // release the current chunk before null it out.
1001                     curr.releaseFromMagazine();
1002                     current = null;
1003                 }
1004             }
1005             return true;
1006         }
1007 
1008         private void restoreMagazineFreed() {
1009             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
1010             if (next != null && next != MAGAZINE_FREED) {
1011                 // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
1012                 next.releaseFromMagazine();
1013             }
1014         }
1015 
1016         private void transferToNextInLineOrRelease(Chunk chunk) {
1017             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
1018                 return;
1019             }
1020 
1021             Chunk nextChunk = NEXT_IN_LINE.get(this);
1022             if (nextChunk != null && nextChunk != MAGAZINE_FREED
1023                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
1024                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
1025                     nextChunk.releaseFromMagazine();
1026                     return;
1027                 }
1028             }
1029             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
1030             // by some buffers and so is attached to a Magazine.
1031             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
1032             // as last resort.
1033             chunk.releaseFromMagazine();
1034         }
1035 
1036         boolean trySetNextInLine(Chunk chunk) {
1037             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
1038         }
1039 
1040         void free() {
1041             // Release the current Chunk and the next that was stored for later usage.
1042             restoreMagazineFreed();
1043             long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
1044             try {
1045                 if (current != null) {
1046                     current.releaseFromMagazine();
1047                     current = null;
1048                 }
1049             } finally {
1050                 if (allocationLock != null) {
1051                     allocationLock.unlockWrite(stamp);
1052                 }
1053             }
1054         }
1055 
1056         public AdaptiveByteBuf newBuffer() {
1057             AdaptiveByteBuf buf;
1058             if (handle == null) {
1059                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
1060             } else {
1061                 buf = bufferQueue.poll();
1062                 if (buf == null) {
1063                     buf = new AdaptiveByteBuf(handle);
1064                 }
1065             }
1066             buf.resetRefCnt();
1067             buf.discardMarks();
1068             return buf;
1069         }
1070 
1071         boolean offerToQueue(Chunk chunk) {
1072             return group.offerToQueue(chunk);
1073         }
1074 
1075         public void initializeSharedStateIn(Magazine other) {
1076             chunkController.initializeSharedStateIn(other.chunkController);
1077         }
1078     }
1079 
    /**
     * A region of memory ({@code delegate}) from which buffers are handed out by advancing {@code allocatedBytes}.
     * <p>
     * The chunk is reference counted:
     * <ul>
     *   <li>Its initial reference belongs to the magazine it is attached to and is given up through
     *   {@link #releaseFromMagazine()}.</li>
     *   <li>Every buffer initialized by {@link #readInitInto} retains one more reference and gives it back through
     *   {@link #releaseSegment(int)}.</li>
     * </ul>
     * When the count drops to zero, {@code deallocate()} either frees the memory or resets the chunk for reuse.
     */
    private static class Chunk implements ReferenceCounted, ChunkInfo {
        private static final long REFCNT_FIELD_OFFSET =
                ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
        private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");

        // The buffer that provides this chunk's memory.
        protected final AbstractByteBuf delegate;
        // The magazine this chunk is attached to, or null while detached.
        protected Magazine magazine;
        private final AdaptivePoolingAllocator allocator;
        // Consulted on deallocation (only for pooled chunks) to decide whether to release instead of reuse.
        private final ChunkReleasePredicate chunkReleasePredicate;
        private final int capacity;
        // Whether the chunk may be reused once its reference count drops to zero.
        private final boolean pooled;
        // Offset of the first byte not yet handed out to a buffer.
        protected int allocatedBytes;

        private static final ReferenceCountUpdater<Chunk> updater =
                new ReferenceCountUpdater<Chunk>() {
                    @Override
                    protected AtomicIntegerFieldUpdater<Chunk> updater() {
                        return AIF_UPDATER;
                    }
                    @Override
                    protected long unsafeOffset() {
                        // on native image, REFCNT_FIELD_OFFSET can be recomputed even with Unsafe unavailable, so we
                        // need to guard here
                        return PlatformDependent.hasUnsafe() ? REFCNT_FIELD_OFFSET : -1;
                    }
                };

        // Value might not equal "real" reference count, all access should be via the updater
        @SuppressWarnings({"unused", "FieldMayBeFinal"})
        private volatile int refCnt;

        Chunk() {
            // Constructor only used by the MAGAZINE_FREED sentinel.
            delegate = null;
            magazine = null;
            allocator = null;
            chunkReleasePredicate = null;
            capacity = 0;
            pooled = false;
        }

        /**
         * Creates a chunk backed by {@code delegate} and attaches it to {@code magazine}.
         *
         * @param delegate              the buffer providing the memory; its capacity becomes the chunk capacity.
         * @param magazine              the magazine the chunk is initially attached to.
         * @param pooled                whether the chunk may be reused once its reference count drops to zero.
         * @param chunkReleasePredicate decides on deallocation whether a pooled chunk is released instead.
         */
        Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled,
              ChunkReleasePredicate chunkReleasePredicate) {
            this.delegate = delegate;
            this.pooled = pooled;
            capacity = delegate.capacity();
            updater.setInitialValue(this);
            attachToMagazine(magazine);

            // We need the top-level allocator so ByteBuf.capacity(int) can call reallocate()
            allocator = magazine.group.allocator;

            this.chunkReleasePredicate = chunkReleasePredicate;

            // Emit a JFR allocation event when JFR and this event type are enabled.
            if (PlatformDependent.isJfrEnabled() && AllocateChunkEvent.isEventEnabled()) {
                AllocateChunkEvent event = new AllocateChunkEvent();
                if (event.shouldCommit()) {
                    event.fill(this, AllocatorType.adaptive);
                    event.pooled = pooled;
                    event.threadLocal = magazine.allocationLock == null;
                    event.commit();
                }
            }
        }

        Magazine currentMagazine()  {
            return magazine;
        }

        /**
         * Detaches this chunk from its magazine, if any, and subtracts the chunk capacity from that magazine's
         * {@code usedMemory}.
         */
        void detachFromMagazine() {
            if (magazine != null) {
                magazine.usedMemory.getAndAdd(-capacity);
                magazine = null;
            }
        }

        /**
         * Attaches this (currently detached) chunk to {@code magazine} and adds the chunk capacity to that magazine's
         * {@code usedMemory}.
         */
        void attachToMagazine(Magazine magazine) {
            assert this.magazine == null;
            this.magazine = magazine;
            magazine.usedMemory.getAndAdd(capacity);
        }

        @Override
        public Chunk touch(Object hint) {
            return this;
        }

        @Override
        public int refCnt() {
            return updater.refCnt(this);
        }

        @Override
        public Chunk retain() {
            return updater.retain(this);
        }

        @Override
        public Chunk retain(int increment) {
            return updater.retain(this, increment);
        }

        @Override
        public Chunk touch() {
            return this;
        }

        @Override
        public boolean release() {
            if (updater.release(this)) {
                deallocate();
                return true;
            }
            return false;
        }

        @Override
        public boolean release(int decrement) {
            if (updater.release(this, decrement)) {
                deallocate();
                return true;
            }
            return false;
        }

        /**
         * Called when a magazine is done using this chunk, probably because it was emptied.
         */
        boolean releaseFromMagazine() {
            return release();
        }

        /**
         * Called when a ByteBuf is done using its allocation in this chunk.
         */
        boolean releaseSegment(int ignoredSegmentId) {
            return release();
        }

        /**
         * Runs once the reference count has dropped to zero. It either frees the chunk's memory, or resets the chunk
         * and hands it back for reuse: first to its magazine's next-in-line slot, otherwise to the shared queue.
         */
        private void deallocate() {
            Magazine mag = magazine;
            int chunkSize = delegate.capacity();
            if (!pooled || chunkReleasePredicate.shouldReleaseChunk(chunkSize) || mag == null) {
                // Drop the chunk if it is not pooled, if it deviates too much from the preferred chunk size, or if
                // it is no longer attached to a magazine.
                detachFromMagazine();
                onRelease();
                allocator.chunkRegistry.remove(this);
                delegate.release();
            } else {
                // Reuse: revive the reference count and make the whole capacity available again.
                updater.resetRefCnt(this);
                delegate.setIndex(0, 0);
                allocatedBytes = 0;
                if (!mag.trySetNextInLine(this)) {
                    // As this Chunk does not belong to the mag anymore we need to decrease the used memory.
                    detachFromMagazine();
                    if (!mag.offerToQueue(this)) {
                        // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
                        // which did increase the reference count by 1.
                        boolean released = updater.release(this);
                        onRelease();
                        allocator.chunkRegistry.remove(this);
                        delegate.release();
                        assert released;
                    } else {
                        onReturn(false);
                    }
                } else {
                    onReturn(true);
                }
            }
        }

        /**
         * Emits a JFR {@code ReturnChunkEvent} when JFR and this event type are enabled.
         *
         * @param returnedToMagazine {@code true} if the chunk went to a magazine's next-in-line slot,
         *                           {@code false} if it went to the shared queue.
         */
        private void onReturn(boolean returnedToMagazine) {
            if (PlatformDependent.isJfrEnabled() && ReturnChunkEvent.isEventEnabled()) {
                ReturnChunkEvent event = new ReturnChunkEvent();
                if (event.shouldCommit()) {
                    event.fill(this, AllocatorType.adaptive);
                    event.returnedToMagazine = returnedToMagazine;
                    event.commit();
                }
            }
        }

        /**
         * Emits a JFR {@code FreeChunkEvent} when JFR and this event type are enabled.
         */
        private void onRelease() {
            if (PlatformDependent.isJfrEnabled() && FreeChunkEvent.isEventEnabled()) {
                FreeChunkEvent event = new FreeChunkEvent();
                if (event.shouldCommit()) {
                    event.fill(this, AllocatorType.adaptive);
                    event.pooled = pooled;
                    event.commit();
                }
            }
        }

        /**
         * Initializes {@code buf} with the next {@code startingCapacity} bytes of this chunk, retaining the chunk on
         * behalf of the buffer. If {@code buf.init(...)} throws, the retain and the advance of
         * {@code allocatedBytes} are undone.
         */
        public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
            int startIndex = allocatedBytes;
            allocatedBytes = startIndex + startingCapacity;
            Chunk chunk = this;
            chunk.retain();
            try {
                buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity, 0);
                chunk = null;
            } finally {
                if (chunk != null) {
                    // If chunk is not null we know that buf.init(...) failed and so we need to manually release
                    // the chunk again as we retained it before calling buf.init(...). Besides this we also need to
                    // restore the old allocatedBytes value.
                    allocatedBytes = startIndex;
                    chunk.release();
                }
            }
        }

        /**
         * Returns the number of bytes that have not been handed out yet.
         */
        public int remainingCapacity() {
            return capacity - allocatedBytes;
        }

        @Override
        public int capacity() {
            return capacity;
        }

        @Override
        public boolean isDirect() {
            return delegate.isDirect();
        }

        @Override
        public long memoryAddress() {
            return delegate._memoryAddress();
        }
    }
1314 
1315     private static final class SizeClassedChunk extends Chunk {
1316         private static final int FREE_LIST_EMPTY = -1;
1317         private final int segmentSize;
1318         private final MpscIntQueue freeList;
1319 
1320         SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled, int segmentSize,
1321                          ChunkReleasePredicate shouldReleaseChunk) {
1322             super(delegate, magazine, pooled, shouldReleaseChunk);
1323             int capacity = delegate.capacity();
1324             this.segmentSize = segmentSize;
1325             int segmentCount = capacity / segmentSize;
1326             assert segmentCount > 0: "Chunk must have a positive number of segments";
1327             freeList = MpscIntQueue.create(segmentCount, FREE_LIST_EMPTY);
1328             freeList.fill(segmentCount, new IntSupplier() {
1329                 int counter;
1330                 @Override
1331                 public int getAsInt() {
1332                     return counter++;
1333                 }
1334             });
1335         }
1336 
1337         @Override
1338         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1339             int segmentId = freeList.poll();
1340             if (segmentId == FREE_LIST_EMPTY) {
1341                 throw new IllegalStateException("Free list is empty");
1342             }
1343 
1344             int startIndex = segmentId * segmentSize;
1345             allocatedBytes += segmentSize;
1346             Chunk chunk = this;
1347             chunk.retain();
1348             try {
1349                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity, segmentId);
1350                 chunk = null;
1351             } finally {
1352                 if (chunk != null) {
1353                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
1354                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
1355                     // restore the old allocatedBytes value.
1356                     allocatedBytes -= segmentSize;
1357                     chunk.releaseSegment(segmentId);
1358                 }
1359             }
1360         }
1361 
1362         @Override
1363         public int remainingCapacity() {
1364             int remainingCapacity = super.remainingCapacity();
1365             if (remainingCapacity > segmentSize) {
1366                 return remainingCapacity;
1367             }
1368             int updatedRemainingCapacity = freeList.size() * segmentSize;
1369             if (updatedRemainingCapacity == remainingCapacity) {
1370                 return remainingCapacity;
1371             }
1372             // update allocatedBytes based on what's available in the free list
1373             allocatedBytes = capacity() - updatedRemainingCapacity;
1374             return updatedRemainingCapacity;
1375         }
1376 
1377         @Override
1378         boolean releaseFromMagazine() {
1379             // Size-classed chunks can be reused before they become empty.
1380             // We can therefor put them in the shared queue as soon as the magazine is done with this chunk.
1381             Magazine mag = magazine;
1382             detachFromMagazine();
1383             if (!mag.offerToQueue(this)) {
1384                 return super.releaseFromMagazine();
1385             }
1386             return false;
1387         }
1388 
1389         @Override
1390         boolean releaseSegment(int segmentId) {
1391             boolean released = release();
1392             boolean segmentReturned = freeList.offer(segmentId);
1393             assert segmentReturned: "Unable to return segment " + segmentId + " to free list";
1394             return released;
1395         }
1396     }
1397 
1398     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1399 
1400         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
1401 
1402         private int adjustment;
1403         private AbstractByteBuf rootParent;
1404         Chunk chunk;
1405         private int length;
1406         private int maxFastCapacity;
1407         private int segmentId;
1408         private ByteBuffer tmpNioBuf;
1409         private boolean hasArray;
1410         private boolean hasMemoryAddress;
1411 
1412         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
1413             super(0);
1414             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1415         }
1416 
1417         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1418                   int adjustment, int size, int capacity, int maxCapacity, int segmentId) {
1419             this.adjustment = adjustment;
1420             chunk = wrapped;
1421             length = size;
1422             maxFastCapacity = capacity;
1423             maxCapacity(maxCapacity);
1424             setIndex0(readerIndex, writerIndex);
1425             this.segmentId = segmentId;
1426             hasArray = unwrapped.hasArray();
1427             hasMemoryAddress = unwrapped.hasMemoryAddress();
1428             rootParent = unwrapped;
1429             tmpNioBuf = null;
1430 
1431             if (PlatformDependent.isJfrEnabled() && AllocateBufferEvent.isEventEnabled()) {
1432                 AllocateBufferEvent event = new AllocateBufferEvent();
1433                 if (event.shouldCommit()) {
1434                     event.fill(this, AllocatorType.adaptive);
1435                     event.chunkPooled = wrapped.pooled;
1436                     Magazine m = wrapped.magazine;
1437                     event.chunkThreadLocal = m != null && m.allocationLock == null;
1438                     event.commit();
1439                 }
1440             }
1441         }
1442 
1443         private AbstractByteBuf rootParent() {
1444             final AbstractByteBuf rootParent = this.rootParent;
1445             if (rootParent != null) {
1446                 return rootParent;
1447             }
1448             throw new IllegalReferenceCountException();
1449         }
1450 
1451         @Override
1452         public int capacity() {
1453             return length;
1454         }
1455 
1456         @Override
1457         public int maxFastWritableBytes() {
1458             return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1459         }
1460 
1461         @Override
1462         public ByteBuf capacity(int newCapacity) {
1463             if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1464                 ensureAccessible();
1465                 length = newCapacity;
1466                 return this;
1467             }
1468             checkNewCapacity(newCapacity);
1469             if (newCapacity < capacity()) {
1470                 length = newCapacity;
1471                 trimIndicesToCapacity(newCapacity);
1472                 return this;
1473             }
1474 
1475             if (PlatformDependent.isJfrEnabled() && ReallocateBufferEvent.isEventEnabled()) {
1476                 ReallocateBufferEvent event = new ReallocateBufferEvent();
1477                 if (event.shouldCommit()) {
1478                     event.fill(this, AllocatorType.adaptive);
1479                     event.newCapacity = newCapacity;
1480                     event.commit();
1481                 }
1482             }
1483 
1484             // Reallocation required.
1485             Chunk chunk = this.chunk;
1486             AdaptivePoolingAllocator allocator = chunk.allocator;
1487             int readerIndex = this.readerIndex;
1488             int writerIndex = this.writerIndex;
1489             int baseOldRootIndex = adjustment;
1490             int oldCapacity = length;
1491             int oldSegmentId = segmentId;
1492             AbstractByteBuf oldRoot = rootParent();
1493             allocator.reallocate(newCapacity, maxCapacity(), this);
1494             oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1495             chunk.releaseSegment(oldSegmentId);
1496             this.readerIndex = readerIndex;
1497             this.writerIndex = writerIndex;
1498             return this;
1499         }
1500 
1501         @Override
1502         public ByteBufAllocator alloc() {
1503             return rootParent().alloc();
1504         }
1505 
1506         @Override
1507         public ByteOrder order() {
1508             return rootParent().order();
1509         }
1510 
1511         @Override
1512         public ByteBuf unwrap() {
1513             return null;
1514         }
1515 
1516         @Override
1517         public boolean isDirect() {
1518             return rootParent().isDirect();
1519         }
1520 
1521         @Override
1522         public int arrayOffset() {
1523             return idx(rootParent().arrayOffset());
1524         }
1525 
1526         @Override
1527         public boolean hasMemoryAddress() {
1528             return hasMemoryAddress;
1529         }
1530 
1531         @Override
1532         public long memoryAddress() {
1533             ensureAccessible();
1534             return _memoryAddress();
1535         }
1536 
1537         @Override
1538         long _memoryAddress() {
1539             AbstractByteBuf root = rootParent;
1540             return root != null ? root._memoryAddress() + adjustment : 0L;
1541         }
1542 
1543         @Override
1544         public ByteBuffer nioBuffer(int index, int length) {
1545             checkIndex(index, length);
1546             return rootParent().nioBuffer(idx(index), length);
1547         }
1548 
1549         @Override
1550         public ByteBuffer internalNioBuffer(int index, int length) {
1551             checkIndex(index, length);
1552             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1553         }
1554 
1555         private ByteBuffer internalNioBuffer() {
1556             if (tmpNioBuf == null) {
1557                 tmpNioBuf = rootParent().nioBuffer(adjustment, maxFastCapacity);
1558             }
1559             return (ByteBuffer) tmpNioBuf.clear();
1560         }
1561 
1562         @Override
1563         public ByteBuffer[] nioBuffers(int index, int length) {
1564             checkIndex(index, length);
1565             return rootParent().nioBuffers(idx(index), length);
1566         }
1567 
1568         @Override
1569         public boolean hasArray() {
1570             return hasArray;
1571         }
1572 
1573         @Override
1574         public byte[] array() {
1575             ensureAccessible();
1576             return rootParent().array();
1577         }
1578 
1579         @Override
1580         public ByteBuf copy(int index, int length) {
1581             checkIndex(index, length);
1582             return rootParent().copy(idx(index), length);
1583         }
1584 
1585         @Override
1586         public int nioBufferCount() {
1587             return rootParent().nioBufferCount();
1588         }
1589 
1590         @Override
1591         protected byte _getByte(int index) {
1592             return rootParent()._getByte(idx(index));
1593         }
1594 
1595         @Override
1596         protected short _getShort(int index) {
1597             return rootParent()._getShort(idx(index));
1598         }
1599 
1600         @Override
1601         protected short _getShortLE(int index) {
1602             return rootParent()._getShortLE(idx(index));
1603         }
1604 
1605         @Override
1606         protected int _getUnsignedMedium(int index) {
1607             return rootParent()._getUnsignedMedium(idx(index));
1608         }
1609 
1610         @Override
1611         protected int _getUnsignedMediumLE(int index) {
1612             return rootParent()._getUnsignedMediumLE(idx(index));
1613         }
1614 
1615         @Override
1616         protected int _getInt(int index) {
1617             return rootParent()._getInt(idx(index));
1618         }
1619 
1620         @Override
1621         protected int _getIntLE(int index) {
1622             return rootParent()._getIntLE(idx(index));
1623         }
1624 
1625         @Override
1626         protected long _getLong(int index) {
1627             return rootParent()._getLong(idx(index));
1628         }
1629 
1630         @Override
1631         protected long _getLongLE(int index) {
1632             return rootParent()._getLongLE(idx(index));
1633         }
1634 
1635         @Override
1636         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1637             checkIndex(index, length);
1638             rootParent().getBytes(idx(index), dst, dstIndex, length);
1639             return this;
1640         }
1641 
1642         @Override
1643         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1644             checkIndex(index, length);
1645             rootParent().getBytes(idx(index), dst, dstIndex, length);
1646             return this;
1647         }
1648 
1649         @Override
1650         public ByteBuf getBytes(int index, ByteBuffer dst) {
1651             checkIndex(index, dst.remaining());
1652             rootParent().getBytes(idx(index), dst);
1653             return this;
1654         }
1655 
1656         @Override
1657         protected void _setByte(int index, int value) {
1658             rootParent()._setByte(idx(index), value);
1659         }
1660 
1661         @Override
1662         protected void _setShort(int index, int value) {
1663             rootParent()._setShort(idx(index), value);
1664         }
1665 
1666         @Override
1667         protected void _setShortLE(int index, int value) {
1668             rootParent()._setShortLE(idx(index), value);
1669         }
1670 
1671         @Override
1672         protected void _setMedium(int index, int value) {
1673             rootParent()._setMedium(idx(index), value);
1674         }
1675 
1676         @Override
1677         protected void _setMediumLE(int index, int value) {
1678             rootParent()._setMediumLE(idx(index), value);
1679         }
1680 
1681         @Override
1682         protected void _setInt(int index, int value) {
1683             rootParent()._setInt(idx(index), value);
1684         }
1685 
1686         @Override
1687         protected void _setIntLE(int index, int value) {
1688             rootParent()._setIntLE(idx(index), value);
1689         }
1690 
1691         @Override
1692         protected void _setLong(int index, long value) {
1693             rootParent()._setLong(idx(index), value);
1694         }
1695 
1696         @Override
1697         protected void _setLongLE(int index, long value) {
1698             rootParent().setLongLE(idx(index), value);
1699         }
1700 
1701         @Override
1702         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1703             checkIndex(index, length);
1704             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1705             tmp.put(src, srcIndex, length);
1706             return this;
1707         }
1708 
1709         @Override
1710         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1711             checkIndex(index, length);
1712             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1713             tmp.put(src.nioBuffer(srcIndex, length));
1714             return this;
1715         }
1716 
1717         @Override
1718         public ByteBuf setBytes(int index, ByteBuffer src) {
1719             checkIndex(index, src.remaining());
1720             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1721             tmp.put(src);
1722             return this;
1723         }
1724 
1725         @Override
1726         public ByteBuf getBytes(int index, OutputStream out, int length)
1727                 throws IOException {
1728             checkIndex(index, length);
1729             if (length != 0) {
1730                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1731             }
1732             return this;
1733         }
1734 
1735         @Override
1736         public int getBytes(int index, GatheringByteChannel out, int length)
1737                 throws IOException {
1738             ByteBuffer buf = internalNioBuffer().duplicate();
1739             buf.clear().position(index).limit(index + length);
1740             return out.write(buf);
1741         }
1742 
1743         @Override
1744         public int getBytes(int index, FileChannel out, long position, int length)
1745                 throws IOException {
1746             ByteBuffer buf = internalNioBuffer().duplicate();
1747             buf.clear().position(index).limit(index + length);
1748             return out.write(buf, position);
1749         }
1750 
1751         @Override
1752         public int setBytes(int index, InputStream in, int length)
1753                 throws IOException {
1754             checkIndex(index, length);
1755             final AbstractByteBuf rootParent = rootParent();
1756             if (rootParent.hasArray()) {
1757                 return rootParent.setBytes(idx(index), in, length);
1758             }
1759             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1760             int readBytes = in.read(tmp, 0, length);
1761             if (readBytes <= 0) {
1762                 return readBytes;
1763             }
1764             setBytes(index, tmp, 0, readBytes);
1765             return readBytes;
1766         }
1767 
1768         @Override
1769         public int setBytes(int index, ScatteringByteChannel in, int length)
1770                 throws IOException {
1771             try {
1772                 return in.read(internalNioBuffer(index, length).duplicate());
1773             } catch (ClosedChannelException ignored) {
1774                 return -1;
1775             }
1776         }
1777 
1778         @Override
1779         public int setBytes(int index, FileChannel in, long position, int length)
1780                 throws IOException {
1781             try {
1782                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1783             } catch (ClosedChannelException ignored) {
1784                 return -1;
1785             }
1786         }
1787 
1788         @Override
1789         public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1790             return setCharSequence0(index, sequence, charset, false);
1791         }
1792 
1793         private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
1794             if (charset.equals(CharsetUtil.UTF_8)) {
1795                 int length = ByteBufUtil.utf8MaxBytes(sequence);
1796                 if (expand) {
1797                     ensureWritable0(length);
1798                     checkIndex0(index, length);
1799                 } else {
1800                     checkIndex(index, length);
1801                 }
1802                 // Directly pass in the rootParent() with the adjusted index
1803                 return ByteBufUtil.writeUtf8(rootParent(), idx(index), length, sequence, sequence.length());
1804             }
1805             if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
1806                 int length = sequence.length();
1807                 if (expand) {
1808                     ensureWritable0(length);
1809                     checkIndex0(index, length);
1810                 } else {
1811                     checkIndex(index, length);
1812                 }
1813                 // Directly pass in the rootParent() with the adjusted index
1814                 return ByteBufUtil.writeAscii(rootParent(), idx(index), sequence, length);
1815             }
1816             byte[] bytes = sequence.toString().getBytes(charset);
1817             if (expand) {
1818                 ensureWritable0(bytes.length);
1819                 // setBytes(...) will take care of checking the indices.
1820             }
1821             setBytes(index, bytes);
1822             return bytes.length;
1823         }
1824 
1825         @Override
1826         public int writeCharSequence(CharSequence sequence, Charset charset) {
1827             int written = setCharSequence0(writerIndex, sequence, charset, true);
1828             writerIndex += written;
1829             return written;
1830         }
1831 
1832         @Override
1833         public int forEachByte(int index, int length, ByteProcessor processor) {
1834             checkIndex(index, length);
1835             int ret = rootParent().forEachByte(idx(index), length, processor);
1836             return forEachResult(ret);
1837         }
1838 
1839         @Override
1840         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1841             checkIndex(index, length);
1842             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1843             return forEachResult(ret);
1844         }
1845 
1846         @Override
1847         public ByteBuf setZero(int index, int length) {
1848             checkIndex(index, length);
1849             rootParent().setZero(idx(index), length);
1850             return this;
1851         }
1852 
1853         @Override
1854         public ByteBuf writeZero(int length) {
1855             ensureWritable(length);
1856             rootParent().setZero(idx(writerIndex), length);
1857             writerIndex += length;
1858             return this;
1859         }
1860 
1861         private int forEachResult(int ret) {
1862             if (ret < adjustment) {
1863                 return -1;
1864             }
1865             return ret - adjustment;
1866         }
1867 
1868         @Override
1869         public boolean isContiguous() {
1870             return rootParent().isContiguous();
1871         }
1872 
1873         private int idx(int index) {
1874             return index + adjustment;
1875         }
1876 
1877         @Override
1878         protected void deallocate() {
1879             if (PlatformDependent.isJfrEnabled() && FreeBufferEvent.isEventEnabled()) {
1880                 FreeBufferEvent event = new FreeBufferEvent();
1881                 if (event.shouldCommit()) {
1882                     event.fill(this, AllocatorType.adaptive);
1883                     event.commit();
1884                 }
1885             }
1886 
1887             if (chunk != null) {
1888                 chunk.releaseSegment(segmentId);
1889             }
1890             tmpNioBuf = null;
1891             chunk = null;
1892             rootParent = null;
1893             if (handle instanceof EnhancedHandle) {
1894                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1895                 enhancedHandle.unguardedRecycle(this);
1896             } else {
1897                 handle.recycle(this);
1898             }
1899         }
1900     }
1901 
1902     /**
1903      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1904      */
1905     interface ChunkAllocator {
1906         /**
1907          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1908          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1909          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1910          * @return The buffer that represents the chunk memory.
1911          */
1912         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1913     }
1914 }