View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.NettyRuntime;
20  import io.netty.util.Recycler;
21  import io.netty.util.ReferenceCounted;
22  import io.netty.util.concurrent.FastThreadLocal;
23  import io.netty.util.concurrent.FastThreadLocalThread;
24  import io.netty.util.internal.ObjectPool;
25  import io.netty.util.internal.ObjectUtil;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.SuppressJava6Requirement;
28  import io.netty.util.internal.SystemPropertyUtil;
29  import io.netty.util.internal.ThreadExecutorMap;
30  import io.netty.util.internal.UnstableApi;
31  
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.io.OutputStream;
35  import java.nio.ByteBuffer;
36  import java.nio.ByteOrder;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.FileChannel;
39  import java.nio.channels.GatheringByteChannel;
40  import java.nio.channels.ScatteringByteChannel;
41  import java.util.Arrays;
42  import java.util.Queue;
43  import java.util.Set;
44  import java.util.concurrent.ConcurrentLinkedQueue;
45  import java.util.concurrent.CopyOnWriteArraySet;
46  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
47  import java.util.concurrent.atomic.AtomicLong;
48  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
49  import java.util.concurrent.locks.StampedLock;
50  
51  /**
52   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
53   * <p>
54   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
55   * from.
56   * <p>
57   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
58   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
59   * If contention is detected above a certain threshold, the number of magazines is increased in response to the
60   * contention.
61   * <p>
62   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
63   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
64   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
65   * <p>
66   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
67   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
68   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
69   * <p>
70   * This allows the allocator to quickly respond to changes in the application workload,
71   * without suffering undue overhead from maintaining its statistics.
72   * <p>
73   * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from any
74   * magazine, to be shared with other magazines.
75   * The {@link #createSharedChunkQueue()} method can be overridden to customize this queue.
76   */
77  @SuppressJava6Requirement(reason = "Guarded by version check")
78  @UnstableApi
79  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
80  
    /**
     * Selects which threads are given their own thread-local {@link Magazine} instead of competing for the
     * shared, lock-protected magazines.
     */
    enum MagazineCaching {
        /**
         * Cache a magazine only for {@link FastThreadLocalThread}s for which
         * {@link ThreadExecutorMap#currentExecutor()} is non-null at the time of first use.
         */
        EventLoopThreads,
        /**
         * Cache a magazine for every {@link FastThreadLocalThread}, whether or not it has a current executor.
         */
        FastThreadLocalThreads,
        /**
         * Do not create thread-local magazines; all threads go through the shared magazines.
         */
        None
    }
86  
    // How many times an allocation may try to grow the magazine array before falling back to an unpooled buffer.
    private static final int EXPANSION_ATTEMPTS = 3;
    // Number of shared magazines created by the constructor.
    private static final int INITIAL_MAGAZINES = 4;
    // A current chunk with fewer remaining bytes than this is released rather than kept when it cannot serve a request.
    private static final int RETIRE_CAPACITY = 4 * 1024;
    // Initial value, and lower bound, of the preferred chunk size.
    private static final int MIN_CHUNK_SIZE = 128 * 1024;
    // Upper bound on the length of the shared magazine array.
    private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
    private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.

    /**
     * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
     * <p>
     * This number is 10 MiB, and is derived from the limitations of internal histograms.
     */
    private static final int MAX_CHUNK_SIZE =
            BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.

    /**
     * The capacity of the central queue that allows chunks to be shared across magazines.
     * The default size is {@link NettyRuntime#availableProcessors()},
     * and the maximum number of magazines is twice this.
     * <p>
     * This means the maximum amount of memory that we can have allocated-but-not-in-use is
     * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
     */
    private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
            "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());

    // Sentinel stored in the thread-local for threads that do not get a cached magazine.
    private static final Object NO_MAGAZINE = Boolean.TRUE;

    // Source of the backing memory for chunks, and of unpooled fallback buffers.
    private final ChunkAllocator chunkAllocator;
    // Chunks that did not fit into a magazine's next-in-line slot, available to every magazine.
    private final Queue<Chunk> centralQueue;
    // Taken (try-lock only) while the magazine array is being doubled.
    private final StampedLock magazineExpandLock;
    // Shared magazines; replaced wholesale by a larger array on expansion.
    private volatile Magazine[] magazines;
    // Holds either a thread-local Magazine or NO_MAGAZINE; null when magazine caching is disabled.
    private final FastThreadLocal<Object> threadLocalMagazine;
    // The thread-local magazines currently alive, for usedMemory(); null when magazine caching is disabled.
    private final Set<Magazine> liveCachedMagazines;
121 
122     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
123         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
124         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
125         this.chunkAllocator = chunkAllocator;
126         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
127         magazineExpandLock = new StampedLock();
128         if (magazineCaching != MagazineCaching.None) {
129             assert magazineCaching == MagazineCaching.EventLoopThreads ||
130                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
131             final boolean cachedMagazinesNonEventLoopThreads =
132                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
133             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
134             threadLocalMagazine = new FastThreadLocal<Object>() {
135                 @Override
136                 protected Object initialValue() {
137                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
138                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
139                         liveMagazines.add(mag);
140                         return mag;
141                     }
142                     return NO_MAGAZINE;
143                 }
144 
145                 @Override
146                 protected void onRemoval(final Object value) throws Exception {
147                     if (value != NO_MAGAZINE) {
148                         liveMagazines.remove(value);
149                     }
150                 }
151             };
152             liveCachedMagazines = liveMagazines;
153         } else {
154             threadLocalMagazine = null;
155             liveCachedMagazines = null;
156         }
157         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
158         for (int i = 0; i < mags.length; i++) {
159             mags[i] = new Magazine(this);
160         }
161         magazines = mags;
162     }
163 
164     /**
165      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
166      * internal Magazines.
167      * <p>
168      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
169      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
170      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
171      * allocate new chunks when their current and next-in-line chunks have both been used up.
172      * <p>
173      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
174      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
175      * queue.
176      * <p>
177      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
178      * queue to limit the maximum memory usage.
179      * <p>
180      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
181      *
182      * @return A new multi-producer, multi-consumer queue.
183      */
184     private static Queue<Chunk> createSharedChunkQueue() {
185         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
186     }
187 
188     @Override
189     public ByteBuf allocate(int size, int maxCapacity) {
190         if (size <= MAX_CHUNK_SIZE) {
191             Thread currentThread = Thread.currentThread();
192             boolean willCleanupFastThreadLocals = FastThreadLocalThread.willCleanupFastThreadLocals(currentThread);
193             AdaptiveByteBuf buf = AdaptiveByteBuf.newInstance(willCleanupFastThreadLocals);
194             AdaptiveByteBuf result = allocate(size, maxCapacity, currentThread, buf);
195             if (result != null) {
196                 return result;
197             }
198             // Return the buffer we pulled from the recycler but didn't use.
199             buf.release();
200         }
201         // The magazines failed us, or the buffer is too big to be pooled.
202         return chunkAllocator.allocate(size, maxCapacity);
203     }
204 
205     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
206         int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
207         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
208         if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
209             Object mag = threadLocalMagazine.get();
210             if (mag != NO_MAGAZINE) {
211                 return ((Magazine) mag).allocate(size, sizeBucket, maxCapacity, buf);
212             }
213         }
214         long threadId = currentThread.getId();
215         Magazine[] mags;
216         int expansions = 0;
217         do {
218             mags = magazines;
219             int mask = mags.length - 1;
220             int index = (int) (threadId & mask);
221             for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
222                 Magazine mag = mags[index + i & mask];
223                 long writeLock = mag.tryWriteLock();
224                 if (writeLock != 0) {
225                     try {
226                         return mag.allocate(size, sizeBucket, maxCapacity, buf);
227                     } finally {
228                         mag.unlockWrite(writeLock);
229                     }
230                 }
231             }
232             expansions++;
233         } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
234         return null;
235     }
236 
237     /**
238      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
239      */
240     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
241         Magazine magazine = into.chunk.magazine;
242         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
243         if (result == null) {
244             // Create a one-off chunk for this allocation.
245             AbstractByteBuf innerChunk = (AbstractByteBuf) chunkAllocator.allocate(size, maxCapacity);
246             Chunk chunk = new Chunk(innerChunk, magazine, false);
247             chunk.readInitInto(into, size, maxCapacity);
248         }
249     }
250 
251     @Override
252     public long usedMemory() {
253         long sum = 0;
254         for (Chunk chunk : centralQueue) {
255             sum += chunk.capacity();
256         }
257         for (Magazine magazine : magazines) {
258             sum += magazine.usedMemory.get();
259         }
260         if (liveCachedMagazines != null) {
261             for (Magazine magazine : liveCachedMagazines) {
262                 sum += magazine.usedMemory.get();
263             }
264         }
265         return sum;
266     }
267 
268     private boolean tryExpandMagazines(int currentLength) {
269         if (currentLength >= MAX_STRIPES) {
270             return true;
271         }
272         long writeLock = magazineExpandLock.tryWriteLock();
273         if (writeLock != 0) {
274             try {
275                 Magazine[] mags = magazines;
276                 if (mags.length >= MAX_STRIPES || mags.length > currentLength) {
277                     return true;
278                 }
279                 Magazine[] expanded = Arrays.copyOf(mags, mags.length * 2);
280                 for (int i = mags.length, m = expanded.length; i < m; i++) {
281                     expanded[i] = new Magazine(this);
282                 }
283                 magazines = expanded;
284             } finally {
285                 magazineExpandLock.unlockWrite(writeLock);
286             }
287         }
288         return true;
289     }
290 
291     private boolean offerToQueue(Chunk buffer) {
292         return centralQueue.offer(buffer);
293     }
294 
295     @SuppressJava6Requirement(reason = "Guarded by version check")
296     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
297     private static class AllocationStatistics extends StampedLock {
298         private static final long serialVersionUID = -8319929980932269688L;
299         private static final int MIN_DATUM_TARGET = 1024;
300         private static final int MAX_DATUM_TARGET = 65534;
301         private static final int INIT_DATUM_TARGET = 8192;
302         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
303         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
304         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
305         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
306 
307         protected final AdaptivePoolingAllocator parent;
308         private final boolean shareable;
309         private final short[][] histos = {
310                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
311                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
312         };
313         private short[] histo = histos[0];
314         private final int[] sums = new int[HISTO_BUCKET_COUNT];
315 
316         private int histoIndex;
317         private int datumCount;
318         private int datumTarget = INIT_DATUM_TARGET;
319         private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
320         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
321 
322         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
323             this.parent = parent;
324             this.shareable = shareable;
325         }
326 
327         protected void recordAllocationSize(int bucket) {
328             histo[bucket]++;
329             if (datumCount++ == datumTarget) {
330                 rotateHistograms();
331             }
332         }
333 
334         static int sizeBucket(int size) {
335             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
336             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
337             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
338             // which is a magical number for JIT optimisations.
339             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & HISTO_MAX_BUCKET_MASK;
340             return Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize);
341         }
342 
343         private void rotateHistograms() {
344             short[][] hs = histos;
345             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
346                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
347             }
348             int sum = 0;
349             for (int count : sums) {
350                 sum  += count;
351             }
352             int targetPercentile = (int) (sum * 0.99);
353             int sizeBucket = 0;
354             for (; sizeBucket < sums.length; sizeBucket++) {
355                 if (sums[sizeBucket] > targetPercentile) {
356                     break;
357                 }
358                 targetPercentile -= sums[sizeBucket];
359             }
360             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
361             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
362             localPrefChunkSize = prefChunkSize;
363             if (shareable) {
364                 for (Magazine mag : parent.magazines) {
365                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
366                 }
367             }
368             if (sharedPrefChunkSize != prefChunkSize) {
369                 // Preferred chunk size changed. Increase check frequency.
370                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
371                 sharedPrefChunkSize = prefChunkSize;
372             } else {
373                 // Preferred chunk size did not change. Check less often.
374                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
375             }
376 
377             histoIndex = histoIndex + 1 & 3;
378             histo = histos[histoIndex];
379             datumCount = 0;
380             Arrays.fill(histo, (short) 0);
381         }
382 
383         /**
384          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
385          * allocation sizes.
386          * <p>
387          * This method must be thread-safe.
388          *
389          * @return The currently preferred chunk allocation size.
390          */
391         protected int preferredChunkSize() {
392             return sharedPrefChunkSize;
393         }
394     }
395 
396     private static final class Magazine extends AllocationStatistics {
397         private static final long serialVersionUID = -4068223712022528165L;
398         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
399 
400         static {
401             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
402         }
403         private Chunk current;
404         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
405         private volatile Chunk nextInLine;
406         private final AtomicLong usedMemory;
407 
408         Magazine(AdaptivePoolingAllocator parent) {
409             this(parent, true);
410         }
411 
412         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
413             super(parent, shareable);
414             usedMemory = new AtomicLong();
415         }
416 
417         public AdaptiveByteBuf allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
418             recordAllocationSize(sizeBucket);
419             Chunk curr = current;
420             if (curr != null && curr.remainingCapacity() >= size) {
421                 if (curr.remainingCapacity() == size) {
422                     current = null;
423                     try {
424                         return curr.readInitInto(buf, size, maxCapacity);
425                     } finally {
426                         curr.release();
427                     }
428                 }
429                 return curr.readInitInto(buf, size, maxCapacity);
430             }
431             if (curr != null) {
432                 curr.release();
433             }
434             if (nextInLine != null) {
435                 curr = NEXT_IN_LINE.getAndSet(this, null);
436             } else {
437                 curr = parent.centralQueue.poll();
438                 if (curr == null) {
439                     curr = newChunkAllocation(size);
440                 }
441             }
442             current = curr;
443             final AdaptiveByteBuf result;
444             if (curr.remainingCapacity() > size) {
445                 result = curr.readInitInto(buf, size, maxCapacity);
446             } else if (curr.remainingCapacity() == size) {
447                 result = curr.readInitInto(buf, size, maxCapacity);
448                 curr.release();
449                 current = null;
450             } else {
451                 Chunk newChunk = newChunkAllocation(size);
452                 result = newChunk.readInitInto(buf, size, maxCapacity);
453                 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
454                     curr.release();
455                     current = newChunk;
456                 } else if (!(boolean) NEXT_IN_LINE.compareAndSet(this, null, newChunk)) {
457                     if (!parent.offerToQueue(newChunk)) {
458                         // Next-in-line is occupied AND the central queue is full.
459                         // Rare that we should get here, but we'll only do one allocation out of this chunk, then.
460                         newChunk.release();
461                     }
462                 }
463             }
464             return result;
465         }
466 
467         private Chunk newChunkAllocation(int promptingSize) {
468             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
469             ChunkAllocator chunkAllocator = parent.chunkAllocator;
470             Chunk chunk = new Chunk((AbstractByteBuf) chunkAllocator.allocate(size, size), this, true);
471             return chunk;
472         }
473 
474         boolean trySetNextInLine(Chunk buffer) {
475             return NEXT_IN_LINE.compareAndSet(this, null, buffer);
476         }
477     }
478 
479     private static final class Chunk {
480 
481         /**
482          * We're using 2 separate counters for reference counting, one for the up-count and one for the down-count,
483          * in order to speed up the borrowing, which shouldn't need atomic operations, being single-threaded.
484          */
485         private static final AtomicIntegerFieldUpdater<Chunk> REF_CNT_UP_UPDATER =
486                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCntUp");
487         private static final AtomicIntegerFieldUpdater<Chunk> REF_CNT_DOWN_UPDATER =
488                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCntDown");
489 
490         private final AbstractByteBuf delegate;
491         private final Magazine magazine;
492         private final int capacity;
493         private final boolean pooled;
494         private int allocatedBytes;
495         private volatile int refCntUp;
496         private volatile int refCntDown;
497 
498         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
499             this.delegate = delegate;
500             this.magazine = magazine;
501             this.pooled = pooled;
502             this.capacity = delegate.capacity();
503             magazine.usedMemory.getAndAdd(capacity);
504             REF_CNT_UP_UPDATER.lazySet(this, 1);
505         }
506 
507         protected void deallocate() {
508             Magazine mag = magazine;
509             AdaptivePoolingAllocator parent = mag.parent;
510             int chunkSize = mag.preferredChunkSize();
511             int memSize = delegate.capacity();
512             if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
513                 // Drop the chunk if the parent allocator is closed, or if the chunk is smaller than the
514                 // preferred chunk size, or over 50% larger than the preferred chunk size.
515                 mag.usedMemory.getAndAdd(-capacity());
516                 delegate.release();
517             } else {
518                 REF_CNT_UP_UPDATER.lazySet(this, 1);
519                 REF_CNT_DOWN_UPDATER.lazySet(this, 0);
520                 delegate.setIndex(0, 0);
521                 allocatedBytes = 0;
522                 if (!mag.trySetNextInLine(this)) {
523                     if (!parent.offerToQueue(this)) {
524                         // The central queue is full. Drop the memory with the original Drop instance.
525                         delegate.release();
526                     }
527                 }
528             }
529         }
530 
531         public AdaptiveByteBuf readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
532             int startIndex = allocatedBytes;
533             allocatedBytes = startIndex + size;
534             unguardedRetain();
535             buf.init(delegate, this, 0, 0, startIndex, size, maxCapacity);
536             return buf;
537         }
538 
539         public int remainingCapacity() {
540             return capacity - allocatedBytes;
541         }
542 
543         public int capacity() {
544             return capacity;
545         }
546 
547         private void unguardedRetain() {
548             REF_CNT_UP_UPDATER.lazySet(this, refCntUp + 1);
549         }
550 
551         public void release() {
552             int refCntDown;
553             boolean deallocate;
554             do {
555                 int refCntUp = this.refCntUp;
556                 refCntDown = this.refCntDown;
557                 int remaining = refCntUp - refCntDown;
558                 if (remaining <= 0) {
559                     throw new IllegalStateException("RefCnt is already 0");
560                 }
561                 deallocate = remaining == 1;
562             } while (!REF_CNT_DOWN_UPDATER.compareAndSet(this, refCntDown, refCntDown + 1));
563             if (deallocate) {
564                 deallocate();
565             }
566         }
567     }
568 
569     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
        /**
         * Pool of reusable {@link AdaptiveByteBuf} objects. Each pooled instance is created with the pool
         * handle it was given.
         */
        static final ObjectPool<AdaptiveByteBuf> RECYCLER = ObjectPool.newPool(
                new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
                    @Override
                    public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
                        return new AdaptiveByteBuf(handle);
                    }
                });
577 
578         static AdaptiveByteBuf newInstance(boolean useThreadLocal) {
579             if (useThreadLocal) {
580                 AdaptiveByteBuf buf = RECYCLER.get();
581                 buf.resetRefCnt();
582                 buf.discardMarks();
583                 return buf;
584             }
585             return new AdaptiveByteBuf(null);
586         }
587 
        // The recycler handle this object was created with; null for objects created outside the recycler.
        private final ObjectPool.Handle<AdaptiveByteBuf> handle;

        // Offset of this buffer's first byte within rootParent.
        int adjustment;
        // The buffer that backs the chunk this buffer was carved out of.
        private AbstractByteBuf rootParent;
        // The chunk holding this buffer's memory.
        private Chunk chunk;
        // The current capacity of this buffer.
        private int length;
        // Slice of rootParent's internal NIO buffer, as created by init(...).
        private ByteBuffer tmpNioBuf;
595 
596         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
597             super(0);
598             handle = recyclerHandle;
599         }
600 
601         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
602                   int adjustment, int capacity, int maxCapacity) {
603             this.adjustment = adjustment;
604             chunk = wrapped;
605             length = capacity;
606             maxCapacity(maxCapacity);
607             setIndex0(readerIndex, writerIndex);
608             rootParent = unwrapped;
609             tmpNioBuf = rootParent.internalNioBuffer(adjustment, capacity).slice();
610         }
611 
        /**
         * Returns the current capacity of this buffer, which is the {@code length} set by {@code init(...)}
         * and updated by {@link #capacity(int)}.
         */
        @Override
        public int capacity() {
            return length;
        }
616 
617         @Override
618         public ByteBuf capacity(int newCapacity) {
619             if (newCapacity == capacity()) {
620                 ensureAccessible();
621                 return this;
622             }
623             checkNewCapacity(newCapacity);
624             if (newCapacity < capacity()) {
625                 length = newCapacity;
626                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
627                 return this;
628             }
629 
630             // Reallocation required.
631             ByteBuffer data = tmpNioBuf;
632             data.clear();
633             tmpNioBuf = null;
634             Chunk chunk = this.chunk;
635             Magazine magazine = chunk.magazine;
636             AdaptivePoolingAllocator allocator = magazine.parent;
637             int readerIndex = this.readerIndex;
638             int writerIndex = this.writerIndex;
639             allocator.allocate(newCapacity, maxCapacity(), this);
640             tmpNioBuf.put(data);
641             tmpNioBuf.clear();
642             chunk.release();
643             this.readerIndex = readerIndex;
644             this.writerIndex = writerIndex;
645             return this;
646         }
647 
648         @Override
649         public ByteBufAllocator alloc() {
650             return rootParent.alloc();
651         }
652 
653         @Override
654         public ByteOrder order() {
655             return rootParent.order();
656         }
657 
        /** Always returns {@code null}; the root parent buffer is not exposed through this method. */
        @Override
        public ByteBuf unwrap() {
            return null;
        }
662 
663         @Override
664         public boolean isDirect() {
665             return rootParent.isDirect();
666         }
667 
668         @Override
669         public int arrayOffset() {
670             return idx(rootParent.arrayOffset());
671         }
672 
673         @Override
674         public boolean hasMemoryAddress() {
675             return rootParent.hasMemoryAddress();
676         }
677 
678         @Override
679         public long memoryAddress() {
680             ensureAccessible();
681             return rootParent.memoryAddress() + adjustment;
682         }
683 
684         @Override
685         public ByteBuffer nioBuffer(int index, int length) {
686             checkIndex(index, length);
687             return rootParent.nioBuffer(idx(index), length);
688         }
689 
690         @Override
691         public ByteBuffer internalNioBuffer(int index, int length) {
692             checkIndex(index, length);
693             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
694         }
695 
696         private ByteBuffer internalNioBuffer() {
697             return (ByteBuffer) tmpNioBuf.clear();
698         }
699 
700         @Override
701         public ByteBuffer[] nioBuffers(int index, int length) {
702             checkIndex(index, length);
703             return rootParent.nioBuffers(idx(index), length);
704         }
705 
706         @Override
707         public boolean hasArray() {
708             return rootParent.hasArray();
709         }
710 
711         @Override
712         public byte[] array() {
713             ensureAccessible();
714             return rootParent.array();
715         }
716 
717         @Override
718         public ByteBuf copy(int index, int length) {
719             checkIndex(index, length);
720             return rootParent.copy(idx(index), length);
721         }
722 
723         @Override
724         public ByteBuf slice(int index, int length) {
725             checkIndex(index, length);
726             return new PooledNonRetainedSlicedByteBuf(this, rootParent, idx(index), length);
727         }
728 
729         @Override
730         public ByteBuf retainedSlice(int index, int length) {
731             return slice(index, length).retain();
732         }
733 
734         @Override
735         public ByteBuf duplicate() {
736             ensureAccessible();
737             return new PooledNonRetainedDuplicateByteBuf(this, this).setIndex(readerIndex(), writerIndex());
738         }
739 
740         @Override
741         public ByteBuf retainedDuplicate() {
742             return duplicate().retain();
743         }
744 
745         @Override
746         public int nioBufferCount() {
747             return rootParent.nioBufferCount();
748         }
749 
750         @Override
751         public byte getByte(int index) {
752             checkIndex(index, 1);
753             return rootParent.getByte(idx(index));
754         }
755 
756         @Override
757         protected byte _getByte(int index) {
758             return rootParent._getByte(idx(index));
759         }
760 
761         @Override
762         public short getShort(int index) {
763             checkIndex(index, 2);
764             return rootParent.getShort(idx(index));
765         }
766 
767         @Override
768         protected short _getShort(int index) {
769             return rootParent._getShort(idx(index));
770         }
771 
772         @Override
773         public short getShortLE(int index) {
774             checkIndex(index, 2);
775             return rootParent.getShortLE(idx(index));
776         }
777 
778         @Override
779         protected short _getShortLE(int index) {
780             return rootParent._getShortLE(idx(index));
781         }
782 
783         @Override
784         public int getUnsignedMedium(int index) {
785             checkIndex(index, 3);
786             return rootParent.getUnsignedMedium(idx(index));
787         }
788 
789         @Override
790         protected int _getUnsignedMedium(int index) {
791             return rootParent._getUnsignedMedium(idx(index));
792         }
793 
794         @Override
795         public int getUnsignedMediumLE(int index) {
796             checkIndex(index, 3);
797             return rootParent.getUnsignedMediumLE(idx(index));
798         }
799 
800         @Override
801         protected int _getUnsignedMediumLE(int index) {
802             return rootParent._getUnsignedMediumLE(idx(index));
803         }
804 
805         @Override
806         public int getInt(int index) {
807             checkIndex(index, 4);
808             return rootParent.getInt(idx(index));
809         }
810 
811         @Override
812         protected int _getInt(int index) {
813             return rootParent._getInt(idx(index));
814         }
815 
816         @Override
817         public int getIntLE(int index) {
818             checkIndex(index, 4);
819             return rootParent.getIntLE(idx(index));
820         }
821 
822         @Override
823         protected int _getIntLE(int index) {
824             return rootParent._getIntLE(idx(index));
825         }
826 
827         @Override
828         public long getLong(int index) {
829             checkIndex(index, 8);
830             return rootParent.getLong(idx(index));
831         }
832 
833         @Override
834         protected long _getLong(int index) {
835             return rootParent._getLong(idx(index));
836         }
837 
838         @Override
839         public long getLongLE(int index) {
840             checkIndex(index, 8);
841             return rootParent.getLongLE(idx(index));
842         }
843 
844         @Override
845         protected long _getLongLE(int index) {
846             return rootParent._getLongLE(idx(index));
847         }
848 
849         @Override
850         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
851             checkIndex(index, length);
852             rootParent.getBytes(idx(index), dst, dstIndex, length);
853             return this;
854         }
855 
856         @Override
857         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
858             checkIndex(index, length);
859             rootParent.getBytes(idx(index), dst, dstIndex, length);
860             return this;
861         }
862 
863         @Override
864         public ByteBuf getBytes(int index, ByteBuffer dst) {
865             checkIndex(index, dst.remaining());
866             rootParent.getBytes(idx(index), dst);
867             return this;
868         }
869 
870         @Override
871         public ByteBuf setByte(int index, int value) {
872             checkIndex(index, 1);
873             rootParent.setByte(idx(index), value);
874             return this;
875         }
876 
877         @Override
878         protected void _setByte(int index, int value) {
879             rootParent._setByte(idx(index), value);
880         }
881 
882         @Override
883         public ByteBuf setShort(int index, int value) {
884             checkIndex(index, 2);
885             rootParent.setShort(idx(index), value);
886             return this;
887         }
888 
889         @Override
890         protected void _setShort(int index, int value) {
891             rootParent._setShort(idx(index), value);
892         }
893 
894         @Override
895         public ByteBuf setShortLE(int index, int value) {
896             checkIndex(index, 2);
897             rootParent.setShortLE(idx(index), value);
898             return this;
899         }
900 
901         @Override
902         protected void _setShortLE(int index, int value) {
903             rootParent._setShortLE(idx(index), value);
904         }
905 
906         @Override
907         public ByteBuf setMedium(int index, int value) {
908             checkIndex(index, 3);
909             rootParent.setMedium(idx(index), value);
910             return this;
911         }
912 
913         @Override
914         protected void _setMedium(int index, int value) {
915             rootParent._setMedium(idx(index), value);
916         }
917 
918         @Override
919         public ByteBuf setMediumLE(int index, int value) {
920             checkIndex(index, 3);
921             rootParent.setMediumLE(idx(index), value);
922             return this;
923         }
924 
925         @Override
926         protected void _setMediumLE(int index, int value) {
927             rootParent._setMediumLE(idx(index), value);
928         }
929 
930         @Override
931         public ByteBuf setInt(int index, int value) {
932             checkIndex(index, 4);
933             rootParent.setInt(idx(index), value);
934             return this;
935         }
936 
937         @Override
938         protected void _setInt(int index, int value) {
939             rootParent._setInt(idx(index), value);
940         }
941 
942         @Override
943         public ByteBuf setIntLE(int index, int value) {
944             checkIndex(index, 4);
945             rootParent.setIntLE(idx(index), value);
946             return this;
947         }
948 
949         @Override
950         protected void _setIntLE(int index, int value) {
951             rootParent._setIntLE(idx(index), value);
952         }
953 
954         @Override
955         public ByteBuf setLong(int index, long value) {
956             checkIndex(index, 8);
957             rootParent.setLong(idx(index), value);
958             return this;
959         }
960 
961         @Override
962         protected void _setLong(int index, long value) {
963             rootParent._setLong(idx(index), value);
964         }
965 
966         @Override
967         public ByteBuf setLongLE(int index, long value) {
968             checkIndex(index, 8);
969             rootParent.setLongLE(idx(index), value);
970             return this;
971         }
972 
973         @Override
974         protected void _setLongLE(int index, long value) {
975             rootParent.setLongLE(idx(index), value);
976         }
977 
978         @Override
979         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
980             checkIndex(index, length);
981             rootParent.setBytes(idx(index), src, srcIndex, length);
982             return this;
983         }
984 
985         @Override
986         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
987             checkIndex(index, length);
988             rootParent.setBytes(idx(index), src, srcIndex, length);
989             return this;
990         }
991 
992         @Override
993         public ByteBuf setBytes(int index, ByteBuffer src) {
994             checkIndex(index, src.remaining());
995             rootParent.setBytes(idx(index), src);
996             return this;
997         }
998 
999         @Override
1000         public ByteBuf getBytes(int index, OutputStream out, int length)
1001                 throws IOException {
1002             checkIndex(index, length);
1003             if (length != 0) {
1004                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1005             }
1006             return this;
1007         }
1008 
1009         @Override
1010         public int getBytes(int index, GatheringByteChannel out, int length)
1011                 throws IOException {
1012             return out.write(internalNioBuffer(index, length).duplicate());
1013         }
1014 
1015         @Override
1016         public int getBytes(int index, FileChannel out, long position, int length)
1017                 throws IOException {
1018             return out.write(internalNioBuffer(index, length).duplicate(), position);
1019         }
1020 
1021         @Override
1022         public int setBytes(int index, InputStream in, int length)
1023                 throws IOException {
1024             checkIndex(index, length);
1025             if (rootParent.hasArray()) {
1026                 return rootParent.setBytes(idx(index), in, length);
1027             }
1028             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1029             int readBytes = in.read(tmp, 0, length);
1030             if (readBytes <= 0) {
1031                 return readBytes;
1032             }
1033             setBytes(index, tmp, 0, readBytes);
1034             return readBytes;
1035         }
1036 
1037         @Override
1038         public int setBytes(int index, ScatteringByteChannel in, int length)
1039                 throws IOException {
1040             try {
1041                 return in.read(internalNioBuffer(index, length).duplicate());
1042             } catch (ClosedChannelException ignored) {
1043                 return -1;
1044             }
1045         }
1046 
1047         @Override
1048         public int setBytes(int index, FileChannel in, long position, int length)
1049                 throws IOException {
1050             try {
1051                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1052             } catch (ClosedChannelException ignored) {
1053                 return -1;
1054             }
1055         }
1056 
1057         @Override
1058         public int forEachByte(int index, int length, ByteProcessor processor) {
1059             checkIndex(index, length);
1060             int ret = rootParent.forEachByte(idx(index), length, processor);
1061             if (ret < adjustment) {
1062                 return -1;
1063             }
1064             return ret - adjustment;
1065         }
1066 
1067         @Override
1068         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1069             checkIndex(index, length);
1070             int ret = rootParent.forEachByteDesc(idx(index), length, processor);
1071             if (ret < adjustment) {
1072                 return -1;
1073             }
1074             return ret - adjustment;
1075         }
1076 
1077         @Override
1078         public boolean isContiguous() {
1079             return rootParent.isContiguous();
1080         }
1081 
1082         private int idx(int index) {
1083             return index + adjustment;
1084         }
1085 
1086         @Override
1087         protected void deallocate() {
1088             tmpNioBuf = null;
1089             if (chunk != null) {
1090                 chunk.release();
1091             }
1092             if (handle instanceof Recycler.EnhancedHandle) {
1093                 ((Recycler.EnhancedHandle<?>) handle).unguardedRecycle(this);
1094             } else if (handle != null) {
1095                 handle.recycle(this);
1096             }
1097         }
1098     }
1099 
1100     private static final class PooledNonRetainedDuplicateByteBuf extends UnpooledDuplicatedByteBuf {
1101         private final ReferenceCounted referenceCountDelegate;
1102 
1103         PooledNonRetainedDuplicateByteBuf(ReferenceCounted referenceCountDelegate, AbstractByteBuf buffer) {
1104             super(buffer);
1105             this.referenceCountDelegate = referenceCountDelegate;
1106         }
1107 
1108         @Override
1109         boolean isAccessible0() {
1110             return referenceCountDelegate.refCnt() != 0;
1111         }
1112 
1113         @Override
1114         int refCnt0() {
1115             return referenceCountDelegate.refCnt();
1116         }
1117 
1118         @Override
1119         ByteBuf retain0() {
1120             referenceCountDelegate.retain();
1121             return this;
1122         }
1123 
1124         @Override
1125         ByteBuf retain0(int increment) {
1126             referenceCountDelegate.retain(increment);
1127             return this;
1128         }
1129 
1130         @Override
1131         ByteBuf touch0() {
1132             referenceCountDelegate.touch();
1133             return this;
1134         }
1135 
1136         @Override
1137         ByteBuf touch0(Object hint) {
1138             referenceCountDelegate.touch(hint);
1139             return this;
1140         }
1141 
1142         @Override
1143         boolean release0() {
1144             return referenceCountDelegate.release();
1145         }
1146 
1147         @Override
1148         boolean release0(int decrement) {
1149             return referenceCountDelegate.release(decrement);
1150         }
1151 
1152         @Override
1153         public ByteBuf duplicate() {
1154             ensureAccessible();
1155             return new PooledNonRetainedDuplicateByteBuf(referenceCountDelegate, unwrap());
1156         }
1157 
1158         @Override
1159         public ByteBuf retainedDuplicate() {
1160             return duplicate().retain();
1161         }
1162 
1163         @Override
1164         public ByteBuf slice(int index, int length) {
1165             checkIndex(index, length);
1166             return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), index, length);
1167         }
1168 
1169         @Override
1170         public ByteBuf retainedSlice() {
1171             // Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
1172             return retainedSlice(readerIndex(), capacity());
1173         }
1174 
1175         @Override
1176         public ByteBuf retainedSlice(int index, int length) {
1177             return slice(index, length).retain();
1178         }
1179     }
1180 
1181     private static final class PooledNonRetainedSlicedByteBuf extends UnpooledSlicedByteBuf {
1182         private final ReferenceCounted referenceCountDelegate;
1183 
1184         PooledNonRetainedSlicedByteBuf(ReferenceCounted referenceCountDelegate,
1185                                        AbstractByteBuf buffer, int index, int length) {
1186             super(buffer, index, length);
1187             this.referenceCountDelegate = referenceCountDelegate;
1188         }
1189 
1190         @Override
1191         boolean isAccessible0() {
1192             return referenceCountDelegate.refCnt() != 0;
1193         }
1194 
1195         @Override
1196         int refCnt0() {
1197             return referenceCountDelegate.refCnt();
1198         }
1199 
1200         @Override
1201         ByteBuf retain0() {
1202             referenceCountDelegate.retain();
1203             return this;
1204         }
1205 
1206         @Override
1207         ByteBuf retain0(int increment) {
1208             referenceCountDelegate.retain(increment);
1209             return this;
1210         }
1211 
1212         @Override
1213         ByteBuf touch0() {
1214             referenceCountDelegate.touch();
1215             return this;
1216         }
1217 
1218         @Override
1219         ByteBuf touch0(Object hint) {
1220             referenceCountDelegate.touch(hint);
1221             return this;
1222         }
1223 
1224         @Override
1225         boolean release0() {
1226             return referenceCountDelegate.release();
1227         }
1228 
1229         @Override
1230         boolean release0(int decrement) {
1231             return referenceCountDelegate.release(decrement);
1232         }
1233 
1234         @Override
1235         public ByteBuf duplicate() {
1236             ensureAccessible();
1237             return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), idx(0), capacity())
1238                     .setIndex(readerIndex(), writerIndex());
1239         }
1240 
1241         @Override
1242         public ByteBuf retainedDuplicate() {
1243             return duplicate().retain();
1244         }
1245 
1246         @Override
1247         public ByteBuf slice(int index, int length) {
1248             checkIndex(index, length);
1249             return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), idx(index), length);
1250         }
1251 
1252         @Override
1253         public ByteBuf retainedSlice() {
1254             // Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
1255             return retainedSlice(0, capacity());
1256         }
1257 
1258         @Override
1259         public ByteBuf retainedSlice(int index, int length) {
1260             return slice(index, length).retain();
1261         }
1262     }
1263 
1264     /**
1265      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1266      */
1267     public interface ChunkAllocator {
1268         /**
1269          * Allocate a buffer for a chunk. This can be any kind of {@link ByteBuf} implementation.
1270          * @param initialCapacity The initial capacity of the returned {@link ByteBuf}.
1271          * @param maxCapacity The maximum capacity of the returned {@link ByteBuf}.
1272          * @return The buffer that represents the chunk memory.
1273          */
1274         ByteBuf allocate(int initialCapacity, int maxCapacity);
1275     }
1276 }