View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SuppressJava6Requirement;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.ThreadExecutorMap;
32  import io.netty.util.internal.ThreadLocalRandom;
33  import io.netty.util.internal.UnstableApi;
34  
35  import java.io.IOException;
36  import java.io.InputStream;
37  import java.io.OutputStream;
38  import java.nio.ByteBuffer;
39  import java.nio.ByteOrder;
40  import java.nio.channels.ClosedChannelException;
41  import java.nio.channels.FileChannel;
42  import java.nio.channels.GatheringByteChannel;
43  import java.nio.channels.ScatteringByteChannel;
44  import java.nio.charset.Charset;
45  import java.util.Arrays;
46  import java.util.Queue;
47  import java.util.Set;
48  import java.util.concurrent.ConcurrentLinkedQueue;
49  import java.util.concurrent.CopyOnWriteArraySet;
50  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
51  import java.util.concurrent.atomic.AtomicLong;
52  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
53  import java.util.concurrent.locks.StampedLock;
54  
55  /**
56   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
57   * <p>
58   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
59   * from.
60   * <p>
61   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
62   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
63   * If contention is detected above a certain threshold, the number of magazines are increased in response to the
64   * contention.
65   * <p>
66   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
67   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
68   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
69   * <p>
70   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
71   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
72   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
73   * <p>
74   * This allows the allocator to quickly respond to changes in the application workload,
75   * without suffering undue overhead from maintaining its statistics.
76   * <p>
77   * Since magazines are "relatively thread-local", the allocator has a central queue that allow excess chunks from any
78   * magazine, to be shared with other magazines.
79   * The {@link #createSharedChunkQueue()} method can be overridden to customize this queue.
80   */
81  @SuppressJava6Requirement(reason = "Guarded by version check")
82  @UnstableApi
83  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
84  
85      enum MagazineCaching {
86          EventLoopThreads,
87          FastThreadLocalThreads,
88          None
89      }
90  
91      /**
92       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
93       * allocations. For instance, glibc will do this.
94       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
95       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
96       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
97       */
98      private static final int MIN_CHUNK_SIZE = 128 * 1024;
99      private static final int EXPANSION_ATTEMPTS = 3;
100     private static final int INITIAL_MAGAZINES = 4;
101     private static final int RETIRE_CAPACITY = 4 * 1024;
102     private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
103     private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
104 
105     /**
106      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
107      * <p>
108      * This number is 10 MiB, and is derived from the limitations of internal histograms.
109      */
110     private static final int MAX_CHUNK_SIZE =
111             BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
112 
113     /**
114      * The capacity if the central queue that allow chunks to be shared across magazines.
115      * The default size is {@link NettyRuntime#availableProcessors()},
116      * and the maximum number of magazines is twice this.
117      * <p>
118      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
119      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
120      */
121     private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
122             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
123 
124     /**
125      * The capacity if the magazine local buffer queue. This queue just pools the outer ByteBuf instance and not
126      * the actual memory and so helps to reduce GC pressure.
127      */
128     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
129             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
130 
131     private static final Object NO_MAGAZINE = Boolean.TRUE;
132 
133     private final ChunkAllocator chunkAllocator;
134     private final Queue<Chunk> centralQueue;
135     private final StampedLock magazineExpandLock;
136     private volatile Magazine[] magazines;
137     private final FastThreadLocal<Object> threadLocalMagazine;
138     private final Set<Magazine> liveCachedMagazines;
139     private volatile boolean freed;
140 
141     static {
142         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
143             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
144                     + " (expected: >= " + 2 + ')');
145         }
146     }
147 
148     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
149         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
150         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
151         this.chunkAllocator = chunkAllocator;
152         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
153         magazineExpandLock = new StampedLock();
154         if (magazineCaching != MagazineCaching.None) {
155             assert magazineCaching == MagazineCaching.EventLoopThreads ||
156                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
157             final boolean cachedMagazinesNonEventLoopThreads =
158                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
159             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
160             threadLocalMagazine = new FastThreadLocal<Object>() {
161                 @Override
162                 protected Object initialValue() {
163                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
164                         if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
165                             // To prevent potential leak, we will not use thread-local magazine.
166                             return NO_MAGAZINE;
167                         }
168                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
169                         liveMagazines.add(mag);
170                         return mag;
171                     }
172                     return NO_MAGAZINE;
173                 }
174 
175                 @Override
176                 protected void onRemoval(final Object value) throws Exception {
177                     if (value != NO_MAGAZINE) {
178                         liveMagazines.remove(value);
179                     }
180                 }
181             };
182             liveCachedMagazines = liveMagazines;
183         } else {
184             threadLocalMagazine = null;
185             liveCachedMagazines = null;
186         }
187         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
188         for (int i = 0; i < mags.length; i++) {
189             mags[i] = new Magazine(this);
190         }
191         magazines = mags;
192     }
193 
194     /**
195      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
196      * internal Magazines.
197      * <p>
198      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
199      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
200      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
201      * allocate new chunks when their current and next-in-line chunks have both been used up.
202      * <p>
203      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
204      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
205      * queue.
206      * <p>
207      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
208      * queue to limit the maximum memory usage.
209      * <p>
210      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
211      *
212      * @return A new multi-producer, multi-consumer queue.
213      */
214     private static Queue<Chunk> createSharedChunkQueue() {
215         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
216     }
217 
218     @Override
219     public ByteBuf allocate(int size, int maxCapacity) {
220         return allocate(size, maxCapacity, Thread.currentThread(), null);
221     }
222 
223     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
224         if (size <= MAX_CHUNK_SIZE) {
225             int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
226             FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
227             if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
228                 Object mag = threadLocalMagazine.get();
229                 if (mag != NO_MAGAZINE) {
230                     Magazine magazine = (Magazine) mag;
231                     if (buf == null) {
232                         buf = magazine.newBuffer();
233                     }
234                     boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
235                     assert allocated : "Allocation of threadLocalMagazine must always succeed";
236                     return buf;
237                 }
238             }
239             long threadId = currentThread.getId();
240             Magazine[] mags;
241             int expansions = 0;
242             do {
243                 mags = magazines;
244                 int mask = mags.length - 1;
245                 int index = (int) (threadId & mask);
246                 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
247                     Magazine mag = mags[index + i & mask];
248                     if (buf == null) {
249                         buf = mag.newBuffer();
250                     }
251                     if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
252                         // Was able to allocate.
253                         return buf;
254                     }
255                 }
256                 expansions++;
257             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
258         }
259 
260         // The magazines failed us, or the buffer is too big to be pooled.
261         return allocateFallback(size, maxCapacity, currentThread, buf);
262     }
263 
264     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
265         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
266         Magazine magazine;
267         if (buf != null) {
268             Chunk chunk = buf.chunk;
269             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
270                 magazine = getFallbackMagazine(currentThread);
271             }
272         } else {
273             magazine = getFallbackMagazine(currentThread);
274             buf = magazine.newBuffer();
275         }
276         // Create a one-off chunk for this allocation.
277         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
278         Chunk chunk = new Chunk(innerChunk, magazine, false);
279         try {
280             chunk.readInitInto(buf, size, maxCapacity);
281         } finally {
282             // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
283             // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
284             // completely release the Chunk and so the contained innerChunk.
285             chunk.release();
286         }
287         return buf;
288     }
289 
290     private Magazine getFallbackMagazine(Thread currentThread) {
291         Object tlMag;
292         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
293         if (threadLocalMagazine != null &&
294                 currentThread instanceof FastThreadLocalThread &&
295                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
296             return (Magazine) tlMag;
297         }
298         Magazine[] mags = magazines;
299         return mags[(int) currentThread.getId() & mags.length - 1];
300     }
301 
302     /**
303      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
304      */
305     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
306         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
307         assert result == into: "Re-allocation created separate buffer instance";
308     }
309 
310     @Override
311     public long usedMemory() {
312         long sum = 0;
313         for (Chunk chunk : centralQueue) {
314             sum += chunk.capacity();
315         }
316         for (Magazine magazine : magazines) {
317             sum += magazine.usedMemory.get();
318         }
319         if (liveCachedMagazines != null) {
320             for (Magazine magazine : liveCachedMagazines) {
321                 sum += magazine.usedMemory.get();
322             }
323         }
324         return sum;
325     }
326 
327     private boolean tryExpandMagazines(int currentLength) {
328         if (currentLength >= MAX_STRIPES) {
329             return true;
330         }
331         final Magazine[] mags;
332         long writeLock = magazineExpandLock.tryWriteLock();
333         if (writeLock != 0) {
334             try {
335                 mags = magazines;
336                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
337                     return true;
338                 }
339                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
340                 Magazine[] expanded = new Magazine[mags.length * 2];
341                 for (int i = 0, l = expanded.length; i < l; i++) {
342                     Magazine m = new Magazine(this);
343                     m.localPrefChunkSize = preferredChunkSize;
344                     m.sharedPrefChunkSize = preferredChunkSize;
345                     expanded[i] = m;
346                 }
347                 magazines = expanded;
348             } finally {
349                 magazineExpandLock.unlockWrite(writeLock);
350             }
351             for (Magazine magazine : mags) {
352                 magazine.free();
353             }
354         }
355         return true;
356     }
357 
358     private boolean offerToQueue(Chunk buffer) {
359         if (freed) {
360             return false;
361         }
362         // The Buffer should not be used anymore, let's add an assert to so we guard against bugs in the future.
363         assert buffer.allocatedBytes == 0;
364         assert buffer.magazine == null;
365 
366         boolean isAdded = centralQueue.offer(buffer);
367         if (freed && isAdded) {
368             // Help to free the centralQueue.
369             freeCentralQueue();
370         }
371         return isAdded;
372     }
373 
374     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
375     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
376     // very confusing for users.
377     @Override
378     protected void finalize() throws Throwable {
379         try {
380             super.finalize();
381         } finally {
382             free();
383         }
384     }
385 
386     private void free() {
387         freed = true;
388         long stamp = magazineExpandLock.writeLock();
389         try {
390             Magazine[] mags = magazines;
391             for (Magazine magazine : mags) {
392                 magazine.free();
393             }
394         } finally {
395             magazineExpandLock.unlockWrite(stamp);
396         }
397         freeCentralQueue();
398     }
399 
400     private void freeCentralQueue() {
401         for (;;) {
402             Chunk chunk = centralQueue.poll();
403             if (chunk == null) {
404                 break;
405             }
406             chunk.release();
407         }
408     }
409 
410     static int sizeBucket(int size) {
411         return AllocationStatistics.sizeBucket(size);
412     }
413 
414     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
415     private static class AllocationStatistics {
416         private static final int MIN_DATUM_TARGET = 1024;
417         private static final int MAX_DATUM_TARGET = 65534;
418         private static final int INIT_DATUM_TARGET = 9;
419         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
420         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
421         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
422         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
423         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
424 
425         protected final AdaptivePoolingAllocator parent;
426         private final boolean shareable;
427         private final short[][] histos = {
428                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
429                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
430         };
431         private short[] histo = histos[0];
432         private final int[] sums = new int[HISTO_BUCKET_COUNT];
433 
434         private int histoIndex;
435         private int datumCount;
436         private int datumTarget = INIT_DATUM_TARGET;
437         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
438         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
439 
440         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
441             this.parent = parent;
442             this.shareable = shareable;
443         }
444 
445         protected void recordAllocationSize(int bucket) {
446             histo[bucket]++;
447             if (datumCount++ == datumTarget) {
448                 rotateHistograms();
449             }
450         }
451 
452         static int sizeBucket(int size) {
453             if (size == 0) {
454                 return 0;
455             }
456             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
457             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
458             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
459             // which is a magical number for JIT optimisations.
460             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
461             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
462         }
463 
464         private void rotateHistograms() {
465             short[][] hs = histos;
466             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
467                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
468             }
469             int sum = 0;
470             for (int count : sums) {
471                 sum  += count;
472             }
473             int targetPercentile = (int) (sum * 0.99);
474             int sizeBucket = 0;
475             for (; sizeBucket < sums.length; sizeBucket++) {
476                 if (sums[sizeBucket] > targetPercentile) {
477                     break;
478                 }
479                 targetPercentile -= sums[sizeBucket];
480             }
481             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
482             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
483             localPrefChunkSize = prefChunkSize;
484             if (shareable) {
485                 for (Magazine mag : parent.magazines) {
486                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
487                 }
488             }
489             if (sharedPrefChunkSize != prefChunkSize) {
490                 // Preferred chunk size changed. Increase check frequency.
491                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
492                 sharedPrefChunkSize = prefChunkSize;
493             } else {
494                 // Preferred chunk size did not change. Check less often.
495                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
496             }
497 
498             histoIndex = histoIndex + 1 & 3;
499             histo = histos[histoIndex];
500             datumCount = 0;
501             Arrays.fill(histo, (short) 0);
502         }
503 
504         /**
505          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
506          * allocation sizes.
507          * <p>
508          * This method must be thread-safe.
509          *
510          * @return The currently preferred chunk allocation size.
511          */
512         protected int preferredChunkSize() {
513             return sharedPrefChunkSize;
514         }
515     }
516 
517     @SuppressJava6Requirement(reason = "Guarded by version check")
518     private static final class Magazine extends AllocationStatistics {
519         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
520         static {
521             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
522         }
523         private static final Chunk MAGAZINE_FREED = new Chunk();
524 
525         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
526                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
527                     @Override
528                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
529                         return new AdaptiveByteBuf(handle);
530                     }
531                 });
532 
533         private Chunk current;
534         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
535         private volatile Chunk nextInLine;
536         private final AtomicLong usedMemory;
537         private final StampedLock allocationLock;
538         private final Queue<AdaptiveByteBuf> bufferQueue;
539         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
540 
541         Magazine(AdaptivePoolingAllocator parent) {
542             this(parent, true);
543         }
544 
545         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
546             super(parent, shareable);
547 
548             if (shareable) {
549                 // We only need the StampedLock if this Magazine will be shared across threads.
550                 allocationLock = new StampedLock();
551                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
552                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
553                     @Override
554                     public void recycle(AdaptiveByteBuf self) {
555                         bufferQueue.offer(self);
556                     }
557                 };
558             } else {
559                 allocationLock = null;
560                 bufferQueue = null;
561                 handle = null;
562             }
563             usedMemory = new AtomicLong();
564         }
565 
566         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
567             if (allocationLock == null) {
568                 // This magazine is not shared across threads, just allocate directly.
569                 return allocate(size, sizeBucket, maxCapacity, buf);
570             }
571 
572             // Try to retrieve the lock and if successful allocate.
573             long writeLock = allocationLock.tryWriteLock();
574             if (writeLock != 0) {
575                 try {
576                     return allocate(size, sizeBucket, maxCapacity, buf);
577                 } finally {
578                     allocationLock.unlockWrite(writeLock);
579                 }
580             }
581             return allocateWithoutLock(size, maxCapacity, buf);
582         }
583 
584         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
585             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
586             if (curr == MAGAZINE_FREED) {
587                 // Allocation raced with a stripe-resize that freed this magazine.
588                 restoreMagazineFreed();
589                 return false;
590             }
591             if (curr == null) {
592                 curr = parent.centralQueue.poll();
593                 if (curr == null) {
594                     return false;
595                 }
596                 curr.attachToMagazine(this);
597             }
598             boolean allocated = false;
599             if (curr.remainingCapacity() >= size) {
600                 curr.readInitInto(buf, size, maxCapacity);
601                 allocated = true;
602             }
603             try {
604                 if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
605                     transferToNextInLineOrRelease(curr);
606                     curr = null;
607                 }
608             } finally {
609                 if (curr != null) {
610                     curr.release();
611                 }
612             }
613             return allocated;
614         }
615 
616         private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
617             recordAllocationSize(sizeBucket);
618             Chunk curr = current;
619             if (curr != null) {
620                 // We have a Chunk that has some space left.
621                 if (curr.remainingCapacity() > size) {
622                     curr.readInitInto(buf, size, maxCapacity);
623                     // We still have some bytes left that we can use for the next allocation, just early return.
624                     return true;
625                 }
626 
627                 // At this point we know that this will be the last time current will be used, so directly set it to
628                 // null and release it once we are done.
629                 current = null;
630                 if (curr.remainingCapacity() == size) {
631                     try {
632                         curr.readInitInto(buf, size, maxCapacity);
633                         return true;
634                     } finally {
635                         curr.release();
636                     }
637                 }
638 
639                 // Check if we either retain the chunk in the nextInLine cache or releasing it.
640                 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
641                     curr.release();
642                 } else {
643                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
644                     // This method will release curr if this is not the case
645                     transferToNextInLineOrRelease(curr);
646                 }
647             }
648 
649             assert current == null;
650             // The fast-path for allocations did not work.
651             //
652             // Try to fetch the next "Magazine local" Chunk first, if this this fails as we don't have
653             // one setup we will poll our centralQueue. If this fails as well we will just allocate a new Chunk.
654             //
655             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
656             // so be "reserved" by this Magazine for exclusive usage.
657             curr = NEXT_IN_LINE.getAndSet(this, null);
658             if (curr != null) {
659                 if (curr == MAGAZINE_FREED) {
660                     // Allocation raced with a stripe-resize that freed this magazine.
661                     restoreMagazineFreed();
662                     return false;
663                 }
664 
665                 if (curr.remainingCapacity() > size) {
666                     // We have a Chunk that has some space left.
667                     curr.readInitInto(buf, size, maxCapacity);
668                     current = curr;
669                     return true;
670                 }
671 
672                 if (curr.remainingCapacity() == size) {
673                     // At this point we know that this will be the last time curr will be used, so directly set it to
674                     // null and release it once we are done.
675                     try {
676                         curr.readInitInto(buf, size, maxCapacity);
677                         return true;
678                     } finally {
679                         // Release in a finally block so even if readInitInto(...) would throw we would still correctly
680                         // release the current chunk before null it out.
681                         curr.release();
682                     }
683                 } else {
684                     // Release it as it's too small.
685                     curr.release();
686                 }
687             }
688 
689             // Now try to poll from the central queue first
690             curr = parent.centralQueue.poll();
691             if (curr == null) {
692                 curr = newChunkAllocation(size);
693             } else {
694                 curr.attachToMagazine(this);
695 
696                 if (curr.remainingCapacity() < size) {
697                     // Check if we either retain the chunk in the nextInLine cache or releasing it.
698                     if (curr.remainingCapacity() < RETIRE_CAPACITY) {
699                         curr.release();
700                     } else {
701                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
702                         // This method will release curr if this is not the case
703                         transferToNextInLineOrRelease(curr);
704                     }
705                     curr = newChunkAllocation(size);
706                 }
707             }
708 
709             current = curr;
710             try {
711                 assert current.remainingCapacity() >= size;
712                 if (curr.remainingCapacity() > size) {
713                     curr.readInitInto(buf, size, maxCapacity);
714                     curr = null;
715                 } else {
716                     curr.readInitInto(buf, size, maxCapacity);
717                 }
718             } finally {
719                 if (curr != null) {
720                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
721                     // release the current chunk before null it out.
722                     curr.release();
723                     current = null;
724                 }
725             }
726             return true;
727         }
728 
729         private void restoreMagazineFreed() {
730             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
731             if (next != null && next != MAGAZINE_FREED) {
732                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
733             }
734         }
735 
736         private void transferToNextInLineOrRelease(Chunk chunk) {
737             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
738                 return;
739             }
740 
741             Chunk nextChunk = NEXT_IN_LINE.get(this);
742             if (nextChunk != null && nextChunk != MAGAZINE_FREED
743                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
744                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
745                     nextChunk.release();
746                     return;
747                 }
748             }
749             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
750             // by some buffers and so is attached to a Magazine.
751             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
752             // as last resort.
753             chunk.release();
754         }
755 
756         private Chunk newChunkAllocation(int promptingSize) {
757             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
758             int minChunks = size / MIN_CHUNK_SIZE;
759             if (MIN_CHUNK_SIZE * minChunks < size) {
760                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
761                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
762                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
763                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
764                 size = MIN_CHUNK_SIZE * (1 + minChunks);
765             }
766             ChunkAllocator chunkAllocator = parent.chunkAllocator;
767             return new Chunk(chunkAllocator.allocate(size, size), this, true);
768         }
769 
770         boolean trySetNextInLine(Chunk chunk) {
771             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
772         }
773 
774         void free() {
775             // Release the current Chunk and the next that was stored for later usage.
776             restoreMagazineFreed();
777             long stamp = allocationLock.writeLock();
778             try {
779                 if (current != null) {
780                     current.release();
781                     current = null;
782                 }
783             } finally {
784                 allocationLock.unlockWrite(stamp);
785             }
786         }
787 
788         public AdaptiveByteBuf newBuffer() {
789             AdaptiveByteBuf buf;
790             if (handle == null) {
791                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
792             } else {
793                 buf = bufferQueue.poll();
794                 if (buf == null) {
795                     buf = new AdaptiveByteBuf(handle);
796                 }
797             }
798             buf.resetRefCnt();
799             buf.discardMarks();
800             return buf;
801         }
802     }
803 
804     private static final class Chunk implements ReferenceCounted {
805 
806         private final AbstractByteBuf delegate;
807         private Magazine magazine;
808         private final AdaptivePoolingAllocator allocator;
809         private final int capacity;
810         private final boolean pooled;
811         private int allocatedBytes;
812         private static final long REFCNT_FIELD_OFFSET =
813                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
814         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
815                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
816 
817         private static final ReferenceCountUpdater<Chunk> updater =
818                 new ReferenceCountUpdater<Chunk>() {
819                     @Override
820                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
821                         return AIF_UPDATER;
822                     }
823                     @Override
824                     protected long unsafeOffset() {
825                         return REFCNT_FIELD_OFFSET;
826                     }
827                 };
828 
829         // Value might not equal "real" reference count, all access should be via the updater
830         @SuppressWarnings({"unused", "FieldMayBeFinal"})
831         private volatile int refCnt;
832 
833         Chunk() {
834             // Constructor only used by the MAGAZINE_FREED sentinel.
835             delegate = null;
836             magazine = null;
837             allocator = null;
838             capacity = 0;
839             pooled = false;
840         }
841 
842         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
843             this.delegate = delegate;
844             this.pooled = pooled;
845             capacity = delegate.capacity();
846             updater.setInitialValue(this);
847             allocator = magazine.parent;
848             attachToMagazine(magazine);
849         }
850 
851         Magazine currentMagazine()  {
852             return magazine;
853         }
854 
855         void detachFromMagazine() {
856             if (magazine != null) {
857                 magazine.usedMemory.getAndAdd(-capacity);
858                 magazine = null;
859             }
860         }
861 
862         void attachToMagazine(Magazine magazine) {
863             assert this.magazine == null;
864             this.magazine = magazine;
865             magazine.usedMemory.getAndAdd(capacity);
866         }
867 
868         @Override
869         public Chunk touch(Object hint) {
870             return this;
871         }
872 
873         @Override
874         public int refCnt() {
875             return updater.refCnt(this);
876         }
877 
878         @Override
879         public Chunk retain() {
880             return updater.retain(this);
881         }
882 
883         @Override
884         public Chunk retain(int increment) {
885             return updater.retain(this, increment);
886         }
887 
888         @Override
889         public Chunk touch() {
890             return this;
891         }
892 
893         @Override
894         public boolean release() {
895             if (updater.release(this)) {
896                 deallocate();
897                 return true;
898             }
899             return false;
900         }
901 
902         @Override
903         public boolean release(int decrement) {
904             if (updater.release(this, decrement)) {
905                 deallocate();
906                 return true;
907             }
908             return false;
909         }
910 
911         private void deallocate() {
912             Magazine mag = magazine;
913             AdaptivePoolingAllocator parent = mag.parent;
914             int chunkSize = mag.preferredChunkSize();
915             int memSize = delegate.capacity();
916             if (!pooled || shouldReleaseSuboptimalChunkSize(memSize, chunkSize)) {
917                 // Drop the chunk if the parent allocator is closed,
918                 // or if the chunk deviates too much from the preferred chunk size.
919                 detachFromMagazine();
920                 delegate.release();
921             } else {
922                 updater.resetRefCnt(this);
923                 delegate.setIndex(0, 0);
924                 allocatedBytes = 0;
925                 if (!mag.trySetNextInLine(this)) {
926                     // As this Chunk does not belong to the mag anymore we need to decrease the used memory .
927                     detachFromMagazine();
928                     if (!parent.offerToQueue(this)) {
929                         // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
930                         // which did increase the reference count by 1.
931                         boolean released = updater.release(this);
932                         delegate.release();
933                         assert released;
934                     }
935                 }
936             }
937         }
938 
939         private static boolean shouldReleaseSuboptimalChunkSize(int givenSize, int preferredSize) {
940             int givenChunks = givenSize / MIN_CHUNK_SIZE;
941             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
942             int deviation = Math.abs(givenChunks - preferredChunks);
943 
944             // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
945             return deviation != 0 &&
946                     PlatformDependent.threadLocalRandom().nextDouble() * 20.0 < deviation;
947         }
948 
949         public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
950             int startIndex = allocatedBytes;
951             allocatedBytes = startIndex + size;
952             Chunk chunk = this;
953             chunk.retain();
954             try {
955                 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
956                 chunk = null;
957             } finally {
958                 if (chunk != null) {
959                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
960                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
961                     // restore the old allocatedBytes value.
962                     allocatedBytes = startIndex;
963                     chunk.release();
964                 }
965             }
966         }
967 
968         public int remainingCapacity() {
969             return capacity - allocatedBytes;
970         }
971 
972         public int capacity() {
973             return capacity;
974         }
975     }
976 
977     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
978 
979         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
980 
981         private int adjustment;
982         private AbstractByteBuf rootParent;
983         Chunk chunk;
984         private int length;
985         private ByteBuffer tmpNioBuf;
986         private boolean hasArray;
987         private boolean hasMemoryAddress;
988 
989         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
990             super(0);
991             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
992         }
993 
994         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
995                   int adjustment, int capacity, int maxCapacity) {
996             this.adjustment = adjustment;
997             chunk = wrapped;
998             length = capacity;
999             maxCapacity(maxCapacity);
1000             setIndex0(readerIndex, writerIndex);
1001             hasArray = unwrapped.hasArray();
1002             hasMemoryAddress = unwrapped.hasMemoryAddress();
1003             rootParent = unwrapped;
1004             tmpNioBuf = null;
1005         }
1006 
1007         private AbstractByteBuf rootParent() {
1008             final AbstractByteBuf rootParent = this.rootParent;
1009             if (rootParent != null) {
1010                 return rootParent;
1011             }
1012             throw new IllegalReferenceCountException();
1013         }
1014 
1015         @Override
1016         public int capacity() {
1017             return length;
1018         }
1019 
1020         @Override
1021         public ByteBuf capacity(int newCapacity) {
1022             if (newCapacity == capacity()) {
1023                 ensureAccessible();
1024                 return this;
1025             }
1026             checkNewCapacity(newCapacity);
1027             if (newCapacity < capacity()) {
1028                 length = newCapacity;
1029                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1030                 return this;
1031             }
1032 
1033             // Reallocation required.
1034             Chunk chunk = this.chunk;
1035             AdaptivePoolingAllocator allocator = chunk.allocator;
1036             int readerIndex = this.readerIndex;
1037             int writerIndex = this.writerIndex;
1038             int baseOldRootIndex = adjustment;
1039             int oldCapacity = length;
1040             AbstractByteBuf oldRoot = rootParent();
1041             allocator.allocate(newCapacity, maxCapacity(), this);
1042             oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1043             chunk.release();
1044             this.readerIndex = readerIndex;
1045             this.writerIndex = writerIndex;
1046             return this;
1047         }
1048 
1049         @Override
1050         public ByteBufAllocator alloc() {
1051             return rootParent().alloc();
1052         }
1053 
1054         @Override
1055         public ByteOrder order() {
1056             return rootParent().order();
1057         }
1058 
1059         @Override
1060         public ByteBuf unwrap() {
1061             return null;
1062         }
1063 
1064         @Override
1065         public boolean isDirect() {
1066             return rootParent().isDirect();
1067         }
1068 
1069         @Override
1070         public int arrayOffset() {
1071             return idx(rootParent().arrayOffset());
1072         }
1073 
1074         @Override
1075         public boolean hasMemoryAddress() {
1076             return hasMemoryAddress;
1077         }
1078 
1079         @Override
1080         public long memoryAddress() {
1081             ensureAccessible();
1082             return rootParent().memoryAddress() + adjustment;
1083         }
1084 
1085         @Override
1086         public ByteBuffer nioBuffer(int index, int length) {
1087             checkIndex(index, length);
1088             return rootParent().nioBuffer(idx(index), length);
1089         }
1090 
1091         @Override
1092         public ByteBuffer internalNioBuffer(int index, int length) {
1093             checkIndex(index, length);
1094             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1095         }
1096 
1097         private ByteBuffer internalNioBuffer() {
1098             if (tmpNioBuf == null) {
1099                 tmpNioBuf = rootParent().nioBuffer(adjustment, length);
1100             }
1101             return (ByteBuffer) tmpNioBuf.clear();
1102         }
1103 
1104         @Override
1105         public ByteBuffer[] nioBuffers(int index, int length) {
1106             checkIndex(index, length);
1107             return rootParent().nioBuffers(idx(index), length);
1108         }
1109 
1110         @Override
1111         public boolean hasArray() {
1112             return hasArray;
1113         }
1114 
1115         @Override
1116         public byte[] array() {
1117             ensureAccessible();
1118             return rootParent().array();
1119         }
1120 
1121         @Override
1122         public ByteBuf copy(int index, int length) {
1123             checkIndex(index, length);
1124             return rootParent().copy(idx(index), length);
1125         }
1126 
1127         @Override
1128         public int nioBufferCount() {
1129             return rootParent().nioBufferCount();
1130         }
1131 
1132         @Override
1133         protected byte _getByte(int index) {
1134             return rootParent()._getByte(idx(index));
1135         }
1136 
1137         @Override
1138         protected short _getShort(int index) {
1139             return rootParent()._getShort(idx(index));
1140         }
1141 
1142         @Override
1143         protected short _getShortLE(int index) {
1144             return rootParent()._getShortLE(idx(index));
1145         }
1146 
1147         @Override
1148         protected int _getUnsignedMedium(int index) {
1149             return rootParent()._getUnsignedMedium(idx(index));
1150         }
1151 
1152         @Override
1153         protected int _getUnsignedMediumLE(int index) {
1154             return rootParent()._getUnsignedMediumLE(idx(index));
1155         }
1156 
1157         @Override
1158         protected int _getInt(int index) {
1159             return rootParent()._getInt(idx(index));
1160         }
1161 
1162         @Override
1163         protected int _getIntLE(int index) {
1164             return rootParent()._getIntLE(idx(index));
1165         }
1166 
1167         @Override
1168         protected long _getLong(int index) {
1169             return rootParent()._getLong(idx(index));
1170         }
1171 
1172         @Override
1173         protected long _getLongLE(int index) {
1174             return rootParent()._getLongLE(idx(index));
1175         }
1176 
1177         @Override
1178         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1179             checkIndex(index, length);
1180             rootParent().getBytes(idx(index), dst, dstIndex, length);
1181             return this;
1182         }
1183 
1184         @Override
1185         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1186             checkIndex(index, length);
1187             rootParent().getBytes(idx(index), dst, dstIndex, length);
1188             return this;
1189         }
1190 
1191         @Override
1192         public ByteBuf getBytes(int index, ByteBuffer dst) {
1193             checkIndex(index, dst.remaining());
1194             rootParent().getBytes(idx(index), dst);
1195             return this;
1196         }
1197 
1198         @Override
1199         protected void _setByte(int index, int value) {
1200             rootParent()._setByte(idx(index), value);
1201         }
1202 
1203         @Override
1204         protected void _setShort(int index, int value) {
1205             rootParent()._setShort(idx(index), value);
1206         }
1207 
1208         @Override
1209         protected void _setShortLE(int index, int value) {
1210             rootParent()._setShortLE(idx(index), value);
1211         }
1212 
1213         @Override
1214         protected void _setMedium(int index, int value) {
1215             rootParent()._setMedium(idx(index), value);
1216         }
1217 
1218         @Override
1219         protected void _setMediumLE(int index, int value) {
1220             rootParent()._setMediumLE(idx(index), value);
1221         }
1222 
1223         @Override
1224         protected void _setInt(int index, int value) {
1225             rootParent()._setInt(idx(index), value);
1226         }
1227 
1228         @Override
1229         protected void _setIntLE(int index, int value) {
1230             rootParent()._setIntLE(idx(index), value);
1231         }
1232 
1233         @Override
1234         protected void _setLong(int index, long value) {
1235             rootParent()._setLong(idx(index), value);
1236         }
1237 
1238         @Override
1239         protected void _setLongLE(int index, long value) {
1240             rootParent().setLongLE(idx(index), value);
1241         }
1242 
1243         @Override
1244         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1245             checkIndex(index, length);
1246             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1247             tmp.put(src, srcIndex, length);
1248             return this;
1249         }
1250 
1251         @Override
1252         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1253             checkIndex(index, length);
1254             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1255             tmp.put(src.nioBuffer(srcIndex, length));
1256             return this;
1257         }
1258 
1259         @Override
1260         public ByteBuf setBytes(int index, ByteBuffer src) {
1261             checkIndex(index, src.remaining());
1262             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1263             tmp.put(src);
1264             return this;
1265         }
1266 
1267         @Override
1268         public ByteBuf getBytes(int index, OutputStream out, int length)
1269                 throws IOException {
1270             checkIndex(index, length);
1271             if (length != 0) {
1272                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1273             }
1274             return this;
1275         }
1276 
1277         @Override
1278         public int getBytes(int index, GatheringByteChannel out, int length)
1279                 throws IOException {
1280             return out.write(internalNioBuffer(index, length).duplicate());
1281         }
1282 
1283         @Override
1284         public int getBytes(int index, FileChannel out, long position, int length)
1285                 throws IOException {
1286             return out.write(internalNioBuffer(index, length).duplicate(), position);
1287         }
1288 
1289         @Override
1290         public int setBytes(int index, InputStream in, int length)
1291                 throws IOException {
1292             checkIndex(index, length);
1293             final AbstractByteBuf rootParent = rootParent();
1294             if (rootParent.hasArray()) {
1295                 return rootParent.setBytes(idx(index), in, length);
1296             }
1297             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1298             int readBytes = in.read(tmp, 0, length);
1299             if (readBytes <= 0) {
1300                 return readBytes;
1301             }
1302             setBytes(index, tmp, 0, readBytes);
1303             return readBytes;
1304         }
1305 
1306         @Override
1307         public int setBytes(int index, ScatteringByteChannel in, int length)
1308                 throws IOException {
1309             try {
1310                 return in.read(internalNioBuffer(index, length).duplicate());
1311             } catch (ClosedChannelException ignored) {
1312                 return -1;
1313             }
1314         }
1315 
1316         @Override
1317         public int setBytes(int index, FileChannel in, long position, int length)
1318                 throws IOException {
1319             try {
1320                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1321             } catch (ClosedChannelException ignored) {
1322                 return -1;
1323             }
1324         }
1325 
1326         @Override
1327         public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1328             checkIndex(index, sequence.length());
1329             return rootParent().setCharSequence(idx(index), sequence, charset);
1330         }
1331 
1332         @Override
1333         public int forEachByte(int index, int length, ByteProcessor processor) {
1334             checkIndex(index, length);
1335             int ret = rootParent().forEachByte(idx(index), length, processor);
1336             return forEachResult(ret);
1337         }
1338 
1339         @Override
1340         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1341             checkIndex(index, length);
1342             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1343             return forEachResult(ret);
1344         }
1345 
1346         private int forEachResult(int ret) {
1347             if (ret < adjustment) {
1348                 return -1;
1349             }
1350             return ret - adjustment;
1351         }
1352 
1353         @Override
1354         public boolean isContiguous() {
1355             return rootParent().isContiguous();
1356         }
1357 
1358         private int idx(int index) {
1359             return index + adjustment;
1360         }
1361 
1362         @Override
1363         protected void deallocate() {
1364             if (chunk != null) {
1365                 chunk.release();
1366             }
1367             tmpNioBuf = null;
1368             chunk = null;
1369             rootParent = null;
1370             if (handle instanceof EnhancedHandle) {
1371                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1372                 enhancedHandle.unguardedRecycle(this);
1373             } else {
1374                 handle.recycle(this);
1375             }
1376         }
1377     }
1378 
1379     /**
1380      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1381      */
1382     interface ChunkAllocator {
1383         /**
1384          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1385          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1386          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1387          * @return The buffer that represents the chunk memory.
1388          */
1389         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1390     }
1391 }