View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.NettyRuntime;
20  import io.netty.util.Recycler;
21  import io.netty.util.ReferenceCounted;
22  import io.netty.util.concurrent.FastThreadLocal;
23  import io.netty.util.concurrent.FastThreadLocalThread;
24  import io.netty.util.internal.ObjectPool;
25  import io.netty.util.internal.ObjectUtil;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.SystemPropertyUtil;
28  import io.netty.util.internal.ThreadExecutorMap;
29  import io.netty.util.internal.UnstableApi;
30  
31  import java.io.IOException;
32  import java.io.InputStream;
33  import java.io.OutputStream;
34  import java.nio.ByteBuffer;
35  import java.nio.ByteOrder;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.FileChannel;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ScatteringByteChannel;
40  import java.util.Arrays;
41  import java.util.Queue;
42  import java.util.Set;
43  import java.util.concurrent.ConcurrentLinkedQueue;
44  import java.util.concurrent.CopyOnWriteArraySet;
45  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
48  import java.util.concurrent.locks.StampedLock;
49  
50  /**
51   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
52   * <p>
53   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
54   * from.
55   * <p>
56   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
57   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
58   * If contention is detected above a certain threshold, the number of magazines are increased in response to the
59   * contention.
60   * <p>
61   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
62   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
63   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
64   * <p>
65   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
66   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
67   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
68   * <p>
69   * This allows the allocator to quickly respond to changes in the application workload,
70   * without suffering undue overhead from maintaining its statistics.
71   * <p>
72   * Since magazines are "relatively thread-local", the allocator has a central queue that allow excess chunks from any
73   * magazine, to be shared with other magazines.
74   * The queue is created by {@link #createSharedChunkQueue()} and is bounded by {@link #CENTRAL_QUEUE_CAPACITY}.
75   */
76  @UnstableApi
77  final class AdaptivePoolingAllocator {
78  
/**
 * Selects which threads are given a dedicated, thread-local magazine by the allocator constructor.
 * The thread-local magazine is only consulted for threads that are {@link FastThreadLocalThread}s.
 */
enum MagazineCaching {
    /**
     * A thread-local magazine is created only when {@link ThreadExecutorMap#currentExecutor()}
     * is non-null for the calling thread.
     */
    EventLoopThreads,
    /**
     * A thread-local magazine is created without checking {@link ThreadExecutorMap}.
     */
    FastThreadLocalThreads,
    /**
     * No thread-local magazines are used; every allocation goes through the shared magazines.
     */
    None
}
84  
85      private static final int EXPANSION_ATTEMPTS = 3;
86      private static final int INITIAL_MAGAZINES = 4;
87      private static final int RETIRE_CAPACITY = 4 * 1024;
88      private static final int MIN_CHUNK_SIZE = 128 * 1024;
89      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
90      private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
91  
92      /**
93       * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
94       * <p>
95       * This number is 10 MiB, and is derived from the limitations of internal histograms.
96       */
97      private static final int MAX_CHUNK_SIZE =
98              BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
99  
100     /**
101      * The capacity if the central queue that allow chunks to be shared across magazines.
102      * The default size is {@link NettyRuntime#availableProcessors()},
103      * and the maximum number of magazines is twice this.
104      * <p>
105      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
106      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
107      */
108     private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
109             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());
110 
111     private static final Object NO_MAGAZINE = Boolean.TRUE;
112 
113     private final ChunkAllocator chunkAllocator;
114     private final Queue<Chunk> centralQueue;
115     private final StampedLock magazineExpandLock;
116     private volatile Magazine[] magazines;
117     private final FastThreadLocal<Object> threadLocalMagazine;
118     private final Set<Magazine> liveCachedMagazines;
119 
/**
 * Creates an allocator that carves buffers out of chunks obtained from the given chunk allocator.
 *
 * @param chunkAllocator  the source of chunk memory; must not be {@code null}.
 * @param magazineCaching which threads, if any, get a thread-local magazine; must not be {@code null}.
 */
AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
    ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
    ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
    this.chunkAllocator = chunkAllocator;
    this.centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
    this.magazineExpandLock = new StampedLock();

    if (magazineCaching == MagazineCaching.None) {
        // No per-thread magazines: every allocation goes through the shared, lock-protected ones.
        this.threadLocalMagazine = null;
        this.liveCachedMagazines = null;
    } else {
        assert magazineCaching == MagazineCaching.EventLoopThreads ||
               magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final boolean cacheWithoutExecutorCheck = magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final Set<Magazine> registered = new CopyOnWriteArraySet<Magazine>();
        this.threadLocalMagazine = new FastThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                boolean eligible = cacheWithoutExecutorCheck || ThreadExecutorMap.currentExecutor() != null;
                if (!eligible) {
                    return NO_MAGAZINE;
                }
                // Thread-local magazines are created as non-shareable.
                Magazine own = new Magazine(AdaptivePoolingAllocator.this, false);
                registered.add(own);
                return own;
            }

            @Override
            protected void onRemoval(final Object value) throws Exception {
                if (value == NO_MAGAZINE) {
                    return;
                }
                registered.remove(value);
            }
        };
        this.liveCachedMagazines = registered;
    }

    // Build the initial set of shared magazines before publishing the array through the volatile field.
    final Magazine[] initial = new Magazine[INITIAL_MAGAZINES];
    int slot = 0;
    while (slot < initial.length) {
        initial[slot] = new Magazine(this);
        slot++;
    }
    this.magazines = initial;
}
161 
162     /**
163      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
164      * internal Magazines.
165      * <p>
166      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
167      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
168      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
169      * allocate new chunks when their current and next-in-line chunks have both been used up.
170      * <p>
171      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
172      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
173      * queue.
174      * <p>
175      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
176      * queue to limit the maximum memory usage.
177      * <p>
178      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
179      *
180      * @return A new multi-producer, multi-consumer queue.
181      */
182     private static Queue<Chunk> createSharedChunkQueue() {
183         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
184     }
185 
186     ByteBuf allocate(int size, int maxCapacity) {
187         if (size <= MAX_CHUNK_SIZE) {
188             Thread currentThread = Thread.currentThread();
189             boolean willCleanupFastThreadLocals = FastThreadLocalThread.willCleanupFastThreadLocals(currentThread);
190             AdaptiveByteBuf buf = AdaptiveByteBuf.newInstance(willCleanupFastThreadLocals);
191             AdaptiveByteBuf result = allocate(size, maxCapacity, currentThread, buf);
192             if (result != null) {
193                 return result;
194             }
195             // Return the buffer we pulled from the recycler but didn't use.
196             buf.release();
197         }
198         // The magazines failed us, or the buffer is too big to be pooled.
199         return chunkAllocator.allocate(size, maxCapacity);
200     }
201 
202     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
203         int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
204         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
205         if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
206             Object mag = threadLocalMagazine.get();
207             if (mag != NO_MAGAZINE) {
208                 return ((Magazine) mag).allocate(size, sizeBucket, maxCapacity, buf);
209             }
210         }
211         long threadId = currentThread.getId();
212         Magazine[] mags;
213         int expansions = 0;
214         do {
215             mags = magazines;
216             int mask = mags.length - 1;
217             int index = (int) (threadId & mask);
218             for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
219                 Magazine mag = mags[index + i & mask];
220                 long writeLock = mag.tryWriteLock();
221                 if (writeLock != 0) {
222                     try {
223                         return mag.allocate(size, sizeBucket, maxCapacity, buf);
224                     } finally {
225                         mag.unlockWrite(writeLock);
226                     }
227                 }
228             }
229             expansions++;
230         } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
231         return null;
232     }
233 
234     /**
235      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
236      */
237     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
238         Magazine magazine = into.chunk.magazine;
239         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
240         if (result == null) {
241             // Create a one-off chunk for this allocation.
242             AbstractByteBuf innerChunk = (AbstractByteBuf) chunkAllocator.allocate(size, maxCapacity);
243             Chunk chunk = new Chunk(innerChunk, magazine, false);
244             chunk.readInitInto(into, size, maxCapacity);
245         }
246     }
247 
248     long usedMemory() {
249         long sum = 0;
250         for (Chunk chunk : centralQueue) {
251             sum += chunk.capacity();
252         }
253         for (Magazine magazine : magazines) {
254             sum += magazine.usedMemory.get();
255         }
256         if (liveCachedMagazines != null) {
257             for (Magazine magazine : liveCachedMagazines) {
258                 sum += magazine.usedMemory.get();
259             }
260         }
261         return sum;
262     }
263 
264     private boolean tryExpandMagazines(int currentLength) {
265         if (currentLength >= MAX_STRIPES) {
266             return true;
267         }
268         long writeLock = magazineExpandLock.tryWriteLock();
269         if (writeLock != 0) {
270             try {
271                 Magazine[] mags = magazines;
272                 if (mags.length >= MAX_STRIPES || mags.length > currentLength) {
273                     return true;
274                 }
275                 Magazine[] expanded = Arrays.copyOf(mags, mags.length * 2);
276                 for (int i = mags.length, m = expanded.length; i < m; i++) {
277                     expanded[i] = new Magazine(this);
278                 }
279                 magazines = expanded;
280             } finally {
281                 magazineExpandLock.unlockWrite(writeLock);
282             }
283         }
284         return true;
285     }
286 
287     private boolean offerToQueue(Chunk buffer) {
288         return centralQueue.offer(buffer);
289     }
290 
291     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
292     private static class AllocationStatistics extends StampedLock {
293         private static final long serialVersionUID = -8319929980932269688L;
294         private static final int MIN_DATUM_TARGET = 1024;
295         private static final int MAX_DATUM_TARGET = 65534;
296         private static final int INIT_DATUM_TARGET = 8192;
297         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
298         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
299         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
300         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
301 
302         protected final AdaptivePoolingAllocator parent;
303         private final boolean shareable;
304         private final short[][] histos = {
305                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
306                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
307         };
308         private short[] histo = histos[0];
309         private final int[] sums = new int[HISTO_BUCKET_COUNT];
310 
311         private int histoIndex;
312         private int datumCount;
313         private int datumTarget = INIT_DATUM_TARGET;
314         private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
315         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
316 
317         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
318             this.parent = parent;
319             this.shareable = shareable;
320         }
321 
322         protected void recordAllocationSize(int bucket) {
323             histo[bucket]++;
324             if (datumCount++ == datumTarget) {
325                 rotateHistograms();
326             }
327         }
328 
329         static int sizeBucket(int size) {
330             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
331             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
332             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
333             // which is a magical number for JIT optimisations.
334             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & HISTO_MAX_BUCKET_MASK;
335             return Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize);
336         }
337 
338         private void rotateHistograms() {
339             short[][] hs = histos;
340             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
341                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
342             }
343             int sum = 0;
344             for (int count : sums) {
345                 sum  += count;
346             }
347             int targetPercentile = (int) (sum * 0.99);
348             int sizeBucket = 0;
349             for (; sizeBucket < sums.length; sizeBucket++) {
350                 if (sums[sizeBucket] > targetPercentile) {
351                     break;
352                 }
353                 targetPercentile -= sums[sizeBucket];
354             }
355             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
356             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
357             localPrefChunkSize = prefChunkSize;
358             if (shareable) {
359                 for (Magazine mag : parent.magazines) {
360                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
361                 }
362             }
363             if (sharedPrefChunkSize != prefChunkSize) {
364                 // Preferred chunk size changed. Increase check frequency.
365                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
366                 sharedPrefChunkSize = prefChunkSize;
367             } else {
368                 // Preferred chunk size did not change. Check less often.
369                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
370             }
371 
372             histoIndex = histoIndex + 1 & 3;
373             histo = histos[histoIndex];
374             datumCount = 0;
375             Arrays.fill(histo, (short) 0);
376         }
377 
378         /**
379          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
380          * allocation sizes.
381          * <p>
382          * This method must be thread-safe.
383          *
384          * @return The currently preferred chunk allocation size.
385          */
386         protected int preferredChunkSize() {
387             return sharedPrefChunkSize;
388         }
389     }
390 
391     private static final class Magazine extends AllocationStatistics {
392         private static final long serialVersionUID = -4068223712022528165L;
393         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
394 
395         static {
396             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
397         }
398         private Chunk current;
399         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
400         private volatile Chunk nextInLine;
401         private final AtomicLong usedMemory;
402 
403         Magazine(AdaptivePoolingAllocator parent) {
404             this(parent, true);
405         }
406 
407         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
408             super(parent, shareable);
409             usedMemory = new AtomicLong();
410         }
411 
412         public AdaptiveByteBuf allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
413             recordAllocationSize(sizeBucket);
414             Chunk curr = current;
415             if (curr != null && curr.remainingCapacity() >= size) {
416                 if (curr.remainingCapacity() == size) {
417                     current = null;
418                     try {
419                         return curr.readInitInto(buf, size, maxCapacity);
420                     } finally {
421                         curr.release();
422                     }
423                 }
424                 return curr.readInitInto(buf, size, maxCapacity);
425             }
426             if (curr != null) {
427                 curr.release();
428             }
429             if (nextInLine != null) {
430                 curr = NEXT_IN_LINE.getAndSet(this, null);
431             } else {
432                 curr = parent.centralQueue.poll();
433                 if (curr == null) {
434                     curr = newChunkAllocation(size);
435                 }
436             }
437             current = curr;
438             final AdaptiveByteBuf result;
439             if (curr.remainingCapacity() > size) {
440                 result = curr.readInitInto(buf, size, maxCapacity);
441             } else if (curr.remainingCapacity() == size) {
442                 result = curr.readInitInto(buf, size, maxCapacity);
443                 curr.release();
444                 current = null;
445             } else {
446                 Chunk newChunk = newChunkAllocation(size);
447                 result = newChunk.readInitInto(buf, size, maxCapacity);
448                 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
449                     curr.release();
450                     current = newChunk;
451                 } else if (!(boolean) NEXT_IN_LINE.compareAndSet(this, null, newChunk)) {
452                     if (!parent.offerToQueue(newChunk)) {
453                         // Next-in-line is occupied AND the central queue is full.
454                         // Rare that we should get here, but we'll only do one allocation out of this chunk, then.
455                         newChunk.release();
456                     }
457                 }
458             }
459             return result;
460         }
461 
462         private Chunk newChunkAllocation(int promptingSize) {
463             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
464             ChunkAllocator chunkAllocator = parent.chunkAllocator;
465             Chunk chunk = new Chunk((AbstractByteBuf) chunkAllocator.allocate(size, size), this, true);
466             return chunk;
467         }
468 
469         boolean trySetNextInLine(Chunk buffer) {
470             return NEXT_IN_LINE.compareAndSet(this, null, buffer);
471         }
472     }
473 
474     private static final class Chunk {
475 
476         /**
477          * We're using 2 separate counters for reference counting, one for the up-count and one for the down-count,
478          * in order to speed up the borrowing, which shouldn't need atomic operations, being single-threaded.
479          */
480         private static final AtomicIntegerFieldUpdater<Chunk> REF_CNT_UP_UPDATER =
481                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCntUp");
482         private static final AtomicIntegerFieldUpdater<Chunk> REF_CNT_DOWN_UPDATER =
483                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCntDown");
484 
485         private final AbstractByteBuf delegate;
486         private final Magazine magazine;
487         private final int capacity;
488         private final boolean pooled;
489         private int allocatedBytes;
490         private volatile int refCntUp;
491         private volatile int refCntDown;
492 
493         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
494             this.delegate = delegate;
495             this.magazine = magazine;
496             this.pooled = pooled;
497             this.capacity = delegate.capacity();
498             magazine.usedMemory.getAndAdd(capacity);
499             REF_CNT_UP_UPDATER.lazySet(this, 1);
500         }
501 
502         protected void deallocate() {
503             Magazine mag = magazine;
504             AdaptivePoolingAllocator parent = mag.parent;
505             int chunkSize = mag.preferredChunkSize();
506             int memSize = delegate.capacity();
507             if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
508                 // Drop the chunk if the parent allocator is closed, or if the chunk is smaller than the
509                 // preferred chunk size, or over 50% larger than the preferred chunk size.
510                 mag.usedMemory.getAndAdd(-capacity());
511                 delegate.release();
512             } else {
513                 REF_CNT_UP_UPDATER.lazySet(this, 1);
514                 REF_CNT_DOWN_UPDATER.lazySet(this, 0);
515                 delegate.setIndex(0, 0);
516                 allocatedBytes = 0;
517                 if (!mag.trySetNextInLine(this)) {
518                     if (!parent.offerToQueue(this)) {
519                         // The central queue is full. Drop the memory with the original Drop instance.
520                         delegate.release();
521                     }
522                 }
523             }
524         }
525 
526         public AdaptiveByteBuf readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
527             int startIndex = allocatedBytes;
528             allocatedBytes = startIndex + size;
529             unguardedRetain();
530             buf.init(delegate, this, 0, 0, startIndex, size, maxCapacity);
531             return buf;
532         }
533 
534         public int remainingCapacity() {
535             return capacity - allocatedBytes;
536         }
537 
538         public int capacity() {
539             return capacity;
540         }
541 
542         private void unguardedRetain() {
543             REF_CNT_UP_UPDATER.lazySet(this, refCntUp + 1);
544         }
545 
546         public void release() {
547             int refCntDown;
548             boolean deallocate;
549             do {
550                 int refCntUp = this.refCntUp;
551                 refCntDown = this.refCntDown;
552                 int remaining = refCntUp - refCntDown;
553                 if (remaining <= 0) {
554                     throw new IllegalStateException("RefCnt is already 0");
555                 }
556                 deallocate = remaining == 1;
557             } while (!REF_CNT_DOWN_UPDATER.compareAndSet(this, refCntDown, refCntDown + 1));
558             if (deallocate) {
559                 deallocate();
560             }
561         }
562     }
563 
564     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
565         static final ObjectPool<AdaptiveByteBuf> RECYCLER = ObjectPool.newPool(
566                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
567                     @Override
568                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
569                         return new AdaptiveByteBuf(handle);
570                     }
571                 });
572 
573         static AdaptiveByteBuf newInstance(boolean useThreadLocal) {
574             if (useThreadLocal) {
575                 AdaptiveByteBuf buf = RECYCLER.get();
576                 buf.resetRefCnt();
577                 buf.discardMarks();
578                 return buf;
579             }
580             return new AdaptiveByteBuf(null);
581         }
582 
// Handle given to the constructor; null for instances created without the recycler.
private final ObjectPool.Handle<AdaptiveByteBuf> handle;

// Offset of this buffer's first byte within the root buffer.
int adjustment;
// The chunk's backing buffer that the accessors delegate to.
private AbstractByteBuf rootParent;
// The chunk this buffer's memory was taken from.
private Chunk chunk;
// Current capacity of this buffer, in bytes.
private int length;
// NIO view of this buffer's region, reused by internalNioBuffer.
private ByteBuffer tmpNioBuf;

/**
 * Creates a buffer object with no memory attached; {@code init} attaches the memory.
 *
 * @param recyclerHandle the recycler handle for this instance, or {@code null}.
 */
AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
    super(0);
    this.handle = recyclerHandle;
}
595 
596         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
597                   int adjustment, int capacity, int maxCapacity) {
598             this.adjustment = adjustment;
599             chunk = wrapped;
600             length = capacity;
601             maxCapacity(maxCapacity);
602             setIndex0(readerIndex, writerIndex);
603             rootParent = unwrapped;
604             tmpNioBuf = rootParent.internalNioBuffer(adjustment, capacity).slice();
605         }
606 
607         @Override
608         public int capacity() {
609             return length;
610         }
611 
612         @Override
613         public ByteBuf capacity(int newCapacity) {
614             if (newCapacity == capacity()) {
615                 ensureAccessible();
616                 return this;
617             }
618             checkNewCapacity(newCapacity);
619             if (newCapacity < capacity()) {
620                 length = newCapacity;
621                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
622                 return this;
623             }
624 
625             // Reallocation required.
626             ByteBuffer data = tmpNioBuf;
627             data.clear();
628             tmpNioBuf = null;
629             Chunk chunk = this.chunk;
630             Magazine magazine = chunk.magazine;
631             AdaptivePoolingAllocator allocator = magazine.parent;
632             int readerIndex = this.readerIndex;
633             int writerIndex = this.writerIndex;
634             allocator.allocate(newCapacity, maxCapacity(), this);
635             tmpNioBuf.put(data);
636             tmpNioBuf.clear();
637             chunk.release();
638             this.readerIndex = readerIndex;
639             this.writerIndex = writerIndex;
640             return this;
641         }
642 
643         @Override
644         public ByteBufAllocator alloc() {
645             return rootParent.alloc();
646         }
647 
648         @Override
649         public ByteOrder order() {
650             return rootParent.order();
651         }
652 
653         @Override
654         public ByteBuf unwrap() {
655             return null;
656         }
657 
658         @Override
659         public boolean isDirect() {
660             return rootParent.isDirect();
661         }
662 
663         @Override
664         public int arrayOffset() {
665             return idx(rootParent.arrayOffset());
666         }
667 
668         @Override
669         public boolean hasMemoryAddress() {
670             return rootParent.hasMemoryAddress();
671         }
672 
673         @Override
674         public long memoryAddress() {
675             ensureAccessible();
676             return rootParent.memoryAddress() + adjustment;
677         }
678 
679         @Override
680         public ByteBuffer nioBuffer(int index, int length) {
681             checkIndex(index, length);
682             return rootParent.nioBuffer(idx(index), length);
683         }
684 
685         @Override
686         public ByteBuffer internalNioBuffer(int index, int length) {
687             checkIndex(index, length);
688             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
689         }
690 
691         private ByteBuffer internalNioBuffer() {
692             return (ByteBuffer) tmpNioBuf.clear();
693         }
694 
695         @Override
696         public ByteBuffer[] nioBuffers(int index, int length) {
697             checkIndex(index, length);
698             return rootParent.nioBuffers(idx(index), length);
699         }
700 
701         @Override
702         public boolean hasArray() {
703             return rootParent.hasArray();
704         }
705 
706         @Override
707         public byte[] array() {
708             ensureAccessible();
709             return rootParent.array();
710         }
711 
712         @Override
713         public ByteBuf copy(int index, int length) {
714             checkIndex(index, length);
715             return rootParent.copy(idx(index), length);
716         }
717 
718         @Override
719         public ByteBuf slice(int index, int length) {
720             checkIndex(index, length);
721             return new PooledNonRetainedSlicedByteBuf(this, rootParent, idx(index), length);
722         }
723 
724         @Override
725         public ByteBuf retainedSlice(int index, int length) {
726             return slice(index, length).retain();
727         }
728 
729         @Override
730         public ByteBuf duplicate() {
731             ensureAccessible();
732             return new PooledNonRetainedDuplicateByteBuf(this, this).setIndex(readerIndex(), writerIndex());
733         }
734 
735         @Override
736         public ByteBuf retainedDuplicate() {
737             return duplicate().retain();
738         }
739 
740         @Override
741         public int nioBufferCount() {
742             return rootParent.nioBufferCount();
743         }
744 
745         @Override
746         public byte getByte(int index) {
747             checkIndex(index, 1);
748             return rootParent.getByte(idx(index));
749         }
750 
751         @Override
752         protected byte _getByte(int index) {
753             return rootParent._getByte(idx(index));
754         }
755 
756         @Override
757         public short getShort(int index) {
758             checkIndex(index, 2);
759             return rootParent.getShort(idx(index));
760         }
761 
762         @Override
763         protected short _getShort(int index) {
764             return rootParent._getShort(idx(index));
765         }
766 
767         @Override
768         public short getShortLE(int index) {
769             checkIndex(index, 2);
770             return rootParent.getShortLE(idx(index));
771         }
772 
773         @Override
774         protected short _getShortLE(int index) {
775             return rootParent._getShortLE(idx(index));
776         }
777 
778         @Override
779         public int getUnsignedMedium(int index) {
780             checkIndex(index, 3);
781             return rootParent.getUnsignedMedium(idx(index));
782         }
783 
784         @Override
785         protected int _getUnsignedMedium(int index) {
786             return rootParent._getUnsignedMedium(idx(index));
787         }
788 
789         @Override
790         public int getUnsignedMediumLE(int index) {
791             checkIndex(index, 3);
792             return rootParent.getUnsignedMediumLE(idx(index));
793         }
794 
795         @Override
796         protected int _getUnsignedMediumLE(int index) {
797             return rootParent._getUnsignedMediumLE(idx(index));
798         }
799 
800         @Override
801         public int getInt(int index) {
802             checkIndex(index, 4);
803             return rootParent.getInt(idx(index));
804         }
805 
806         @Override
807         protected int _getInt(int index) {
808             return rootParent._getInt(idx(index));
809         }
810 
811         @Override
812         public int getIntLE(int index) {
813             checkIndex(index, 4);
814             return rootParent.getIntLE(idx(index));
815         }
816 
817         @Override
818         protected int _getIntLE(int index) {
819             return rootParent._getIntLE(idx(index));
820         }
821 
822         @Override
823         public long getLong(int index) {
824             checkIndex(index, 8);
825             return rootParent.getLong(idx(index));
826         }
827 
828         @Override
829         protected long _getLong(int index) {
830             return rootParent._getLong(idx(index));
831         }
832 
833         @Override
834         public long getLongLE(int index) {
835             checkIndex(index, 8);
836             return rootParent.getLongLE(idx(index));
837         }
838 
839         @Override
840         protected long _getLongLE(int index) {
841             return rootParent._getLongLE(idx(index));
842         }
843 
844         @Override
845         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
846             checkIndex(index, length);
847             rootParent.getBytes(idx(index), dst, dstIndex, length);
848             return this;
849         }
850 
851         @Override
852         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
853             checkIndex(index, length);
854             rootParent.getBytes(idx(index), dst, dstIndex, length);
855             return this;
856         }
857 
858         @Override
859         public ByteBuf getBytes(int index, ByteBuffer dst) {
860             checkIndex(index, dst.remaining());
861             rootParent.getBytes(idx(index), dst);
862             return this;
863         }
864 
865         @Override
866         public ByteBuf setByte(int index, int value) {
867             checkIndex(index, 1);
868             rootParent.setByte(idx(index), value);
869             return this;
870         }
871 
872         @Override
873         protected void _setByte(int index, int value) {
874             rootParent._setByte(idx(index), value);
875         }
876 
877         @Override
878         public ByteBuf setShort(int index, int value) {
879             checkIndex(index, 2);
880             rootParent.setShort(idx(index), value);
881             return this;
882         }
883 
884         @Override
885         protected void _setShort(int index, int value) {
886             rootParent._setShort(idx(index), value);
887         }
888 
889         @Override
890         public ByteBuf setShortLE(int index, int value) {
891             checkIndex(index, 2);
892             rootParent.setShortLE(idx(index), value);
893             return this;
894         }
895 
896         @Override
897         protected void _setShortLE(int index, int value) {
898             rootParent._setShortLE(idx(index), value);
899         }
900 
901         @Override
902         public ByteBuf setMedium(int index, int value) {
903             checkIndex(index, 3);
904             rootParent.setMedium(idx(index), value);
905             return this;
906         }
907 
908         @Override
909         protected void _setMedium(int index, int value) {
910             rootParent._setMedium(idx(index), value);
911         }
912 
913         @Override
914         public ByteBuf setMediumLE(int index, int value) {
915             checkIndex(index, 3);
916             rootParent.setMediumLE(idx(index), value);
917             return this;
918         }
919 
920         @Override
921         protected void _setMediumLE(int index, int value) {
922             rootParent._setMediumLE(idx(index), value);
923         }
924 
925         @Override
926         public ByteBuf setInt(int index, int value) {
927             checkIndex(index, 4);
928             rootParent.setInt(idx(index), value);
929             return this;
930         }
931 
932         @Override
933         protected void _setInt(int index, int value) {
934             rootParent._setInt(idx(index), value);
935         }
936 
937         @Override
938         public ByteBuf setIntLE(int index, int value) {
939             checkIndex(index, 4);
940             rootParent.setIntLE(idx(index), value);
941             return this;
942         }
943 
944         @Override
945         protected void _setIntLE(int index, int value) {
946             rootParent._setIntLE(idx(index), value);
947         }
948 
949         @Override
950         public ByteBuf setLong(int index, long value) {
951             checkIndex(index, 8);
952             rootParent.setLong(idx(index), value);
953             return this;
954         }
955 
956         @Override
957         protected void _setLong(int index, long value) {
958             rootParent._setLong(idx(index), value);
959         }
960 
961         @Override
962         public ByteBuf setLongLE(int index, long value) {
963             checkIndex(index, 8);
964             rootParent.setLongLE(idx(index), value);
965             return this;
966         }
967 
968         @Override
969         protected void _setLongLE(int index, long value) {
970             rootParent.setLongLE(idx(index), value);
971         }
972 
973         @Override
974         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
975             checkIndex(index, length);
976             rootParent.setBytes(idx(index), src, srcIndex, length);
977             return this;
978         }
979 
980         @Override
981         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
982             checkIndex(index, length);
983             rootParent.setBytes(idx(index), src, srcIndex, length);
984             return this;
985         }
986 
987         @Override
988         public ByteBuf setBytes(int index, ByteBuffer src) {
989             checkIndex(index, src.remaining());
990             rootParent.setBytes(idx(index), src);
991             return this;
992         }
993 
994         @Override
995         public ByteBuf getBytes(int index, OutputStream out, int length)
996                 throws IOException {
997             checkIndex(index, length);
998             if (length != 0) {
999                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1000             }
1001             return this;
1002         }
1003 
1004         @Override
1005         public int getBytes(int index, GatheringByteChannel out, int length)
1006                 throws IOException {
1007             return out.write(internalNioBuffer(index, length).duplicate());
1008         }
1009 
1010         @Override
1011         public int getBytes(int index, FileChannel out, long position, int length)
1012                 throws IOException {
1013             return out.write(internalNioBuffer(index, length).duplicate(), position);
1014         }
1015 
1016         @Override
1017         public int setBytes(int index, InputStream in, int length)
1018                 throws IOException {
1019             checkIndex(index, length);
1020             if (rootParent.hasArray()) {
1021                 return rootParent.setBytes(idx(index), in, length);
1022             }
1023             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1024             int readBytes = in.read(tmp, 0, length);
1025             if (readBytes <= 0) {
1026                 return readBytes;
1027             }
1028             setBytes(index, tmp, 0, readBytes);
1029             return readBytes;
1030         }
1031 
1032         @Override
1033         public int setBytes(int index, ScatteringByteChannel in, int length)
1034                 throws IOException {
1035             try {
1036                 return in.read(internalNioBuffer(index, length).duplicate());
1037             } catch (ClosedChannelException ignored) {
1038                 return -1;
1039             }
1040         }
1041 
1042         @Override
1043         public int setBytes(int index, FileChannel in, long position, int length)
1044                 throws IOException {
1045             try {
1046                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1047             } catch (ClosedChannelException ignored) {
1048                 return -1;
1049             }
1050         }
1051 
1052         @Override
1053         public int forEachByte(int index, int length, ByteProcessor processor) {
1054             checkIndex(index, length);
1055             int ret = rootParent.forEachByte(idx(index), length, processor);
1056             if (ret < adjustment) {
1057                 return -1;
1058             }
1059             return ret - adjustment;
1060         }
1061 
1062         @Override
1063         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1064             checkIndex(index, length);
1065             int ret = rootParent.forEachByteDesc(idx(index), length, processor);
1066             if (ret < adjustment) {
1067                 return -1;
1068             }
1069             return ret - adjustment;
1070         }
1071 
1072         @Override
1073         public boolean isContiguous() {
1074             return rootParent.isContiguous();
1075         }
1076 
1077         private int idx(int index) {
1078             return index + adjustment;
1079         }
1080 
1081         @Override
1082         protected void deallocate() {
1083             tmpNioBuf = null;
1084             if (chunk != null) {
1085                 chunk.release();
1086             }
1087             if (handle instanceof Recycler.EnhancedHandle) {
1088                 ((Recycler.EnhancedHandle<?>) handle).unguardedRecycle(this);
1089             } else if (handle != null) {
1090                 handle.recycle(this);
1091             }
1092         }
1093     }
1094 
1095     private static final class PooledNonRetainedDuplicateByteBuf extends UnpooledDuplicatedByteBuf {
1096         private final ReferenceCounted referenceCountDelegate;
1097 
1098         PooledNonRetainedDuplicateByteBuf(ReferenceCounted referenceCountDelegate, AbstractByteBuf buffer) {
1099             super(buffer);
1100             this.referenceCountDelegate = referenceCountDelegate;
1101         }
1102 
1103         @Override
1104         boolean isAccessible0() {
1105             return referenceCountDelegate.refCnt() != 0;
1106         }
1107 
1108         @Override
1109         int refCnt0() {
1110             return referenceCountDelegate.refCnt();
1111         }
1112 
1113         @Override
1114         ByteBuf retain0() {
1115             referenceCountDelegate.retain();
1116             return this;
1117         }
1118 
1119         @Override
1120         ByteBuf retain0(int increment) {
1121             referenceCountDelegate.retain(increment);
1122             return this;
1123         }
1124 
1125         @Override
1126         ByteBuf touch0() {
1127             referenceCountDelegate.touch();
1128             return this;
1129         }
1130 
1131         @Override
1132         ByteBuf touch0(Object hint) {
1133             referenceCountDelegate.touch(hint);
1134             return this;
1135         }
1136 
1137         @Override
1138         boolean release0() {
1139             return referenceCountDelegate.release();
1140         }
1141 
1142         @Override
1143         boolean release0(int decrement) {
1144             return referenceCountDelegate.release(decrement);
1145         }
1146 
1147         @Override
1148         public ByteBuf duplicate() {
1149             ensureAccessible();
1150             return new PooledNonRetainedDuplicateByteBuf(referenceCountDelegate, unwrap());
1151         }
1152 
1153         @Override
1154         public ByteBuf retainedDuplicate() {
1155             return duplicate().retain();
1156         }
1157 
1158         @Override
1159         public ByteBuf slice(int index, int length) {
1160             checkIndex(index, length);
1161             return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), index, length);
1162         }
1163 
1164         @Override
1165         public ByteBuf retainedSlice() {
1166             // Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
1167             return retainedSlice(readerIndex(), capacity());
1168         }
1169 
1170         @Override
1171         public ByteBuf retainedSlice(int index, int length) {
1172             return slice(index, length).retain();
1173         }
1174     }
1175 
1176     private static final class PooledNonRetainedSlicedByteBuf extends UnpooledSlicedByteBuf {
1177         private final ReferenceCounted referenceCountDelegate;
1178 
1179         PooledNonRetainedSlicedByteBuf(ReferenceCounted referenceCountDelegate,
1180                                        AbstractByteBuf buffer, int index, int length) {
1181             super(buffer, index, length);
1182             this.referenceCountDelegate = referenceCountDelegate;
1183         }
1184 
1185         @Override
1186         boolean isAccessible0() {
1187             return referenceCountDelegate.refCnt() != 0;
1188         }
1189 
1190         @Override
1191         int refCnt0() {
1192             return referenceCountDelegate.refCnt();
1193         }
1194 
1195         @Override
1196         ByteBuf retain0() {
1197             referenceCountDelegate.retain();
1198             return this;
1199         }
1200 
1201         @Override
1202         ByteBuf retain0(int increment) {
1203             referenceCountDelegate.retain(increment);
1204             return this;
1205         }
1206 
1207         @Override
1208         ByteBuf touch0() {
1209             referenceCountDelegate.touch();
1210             return this;
1211         }
1212 
1213         @Override
1214         ByteBuf touch0(Object hint) {
1215             referenceCountDelegate.touch(hint);
1216             return this;
1217         }
1218 
1219         @Override
1220         boolean release0() {
1221             return referenceCountDelegate.release();
1222         }
1223 
1224         @Override
1225         boolean release0(int decrement) {
1226             return referenceCountDelegate.release(decrement);
1227         }
1228 
1229         @Override
1230         public ByteBuf duplicate() {
1231             ensureAccessible();
1232             return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), idx(0), capacity())
1233                     .setIndex(readerIndex(), writerIndex());
1234         }
1235 
1236         @Override
1237         public ByteBuf retainedDuplicate() {
1238             return duplicate().retain();
1239         }
1240 
1241         @Override
1242         public ByteBuf slice(int index, int length) {
1243             checkIndex(index, length);
1244             return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), idx(index), length);
1245         }
1246 
1247         @Override
1248         public ByteBuf retainedSlice() {
1249             // Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
1250             return retainedSlice(0, capacity());
1251         }
1252 
1253         @Override
1254         public ByteBuf retainedSlice(int index, int length) {
1255             return slice(index, length).retain();
1256         }
1257     }
1258 
    /**
     * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
     * <p>
     * The interface declares a single method, {@link #allocate(int, int)}.
     */
    public interface ChunkAllocator {
        /**
         * Allocate a buffer for a chunk. This can be any kind of {@link ByteBuf} implementation.
         *
         * @param initialCapacity The initial capacity of the returned {@link ByteBuf}.
         * @param maxCapacity The maximum capacity of the returned {@link ByteBuf}.
         * @return The buffer that represents the chunk memory.
         */
        ByteBuf allocate(int initialCapacity, int maxCapacity);
    }
1271 }