1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SuppressJava6Requirement;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.ThreadExecutorMap;
32  import io.netty.util.internal.ThreadLocalRandom;
33  import io.netty.util.internal.UnstableApi;
34  
35  import java.io.IOException;
36  import java.io.InputStream;
37  import java.io.OutputStream;
38  import java.nio.ByteBuffer;
39  import java.nio.ByteOrder;
40  import java.nio.channels.ClosedChannelException;
41  import java.nio.channels.FileChannel;
42  import java.nio.channels.GatheringByteChannel;
43  import java.nio.channels.ScatteringByteChannel;
44  import java.util.Arrays;
45  import java.util.Queue;
46  import java.util.Set;
47  import java.util.concurrent.ConcurrentLinkedQueue;
48  import java.util.concurrent.CopyOnWriteArraySet;
49  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
50  import java.util.concurrent.atomic.AtomicLong;
51  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
52  import java.util.concurrent.locks.StampedLock;
53  
54  /**
55   * An auto-tuning pooling allocator that follows an anti-generational hypothesis.
56   * <p>
57   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that it allocates buffers
58   * from.
59   * <p>
60   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
61   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
62   * If contention is detected above a certain threshold, the number of magazines is increased in response to the
63   * contention.
64   * <p>
65   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
66   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
67   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
68   * <p>
69   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
70   * done is also adapted to the allocation pattern. If a newly computed preferred chunk size is the same as the previous
71   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
72   * <p>
73   * This allows the allocator to quickly respond to changes in the application workload,
74   * without suffering undue overhead from maintaining its statistics.
75   * <p>
76   * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from any
77   * magazine to be shared with other magazines.
78   * The {@link #createSharedChunkQueue()} method can be overridden to customize this queue.
79   */
80  @SuppressJava6Requirement(reason = "Guarded by version check")
81  @UnstableApi
82  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
83  
84      enum MagazineCaching {
85          EventLoopThreads,
86          FastThreadLocalThreads,
87          None
88      }
89  
90      /**
91       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
92       * allocations. For instance, glibc will do this.
93       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
94       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
95       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
96       */
97      private static final int MIN_CHUNK_SIZE = 128 * 1024;
98      private static final int EXPANSION_ATTEMPTS = 3;
99      private static final int INITIAL_MAGAZINES = 4;
100     private static final int RETIRE_CAPACITY = 4 * 1024;
101     private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
102     private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
103 
104     /**
105      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
106      * <p>
107      * This number is 10 MiB, and is derived from the limitations of internal histograms.
108      */
109     private static final int MAX_CHUNK_SIZE =
110             BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
111 
112     /**
113      * The capacity of the central queue that allows chunks to be shared across magazines.
114      * The default size is {@link NettyRuntime#availableProcessors()},
115      * and the maximum number of magazines is twice this.
116      * <p>
117      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
118      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
119      */
120     private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
121             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
122 
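        /*
         * Worked derivation of the bound above: with the default queue capacity N = availableProcessors()
         * and at most 2 * N magazines (see MAX_STRIPES), each magazine holding a "current" chunk plus a
         * "nextInLine" chunk, the idle chunk count is bounded by:
         *
         *     N (central queue) + 2 * N (magazines) * 2 (chunks each) = 5 * N chunks
         *     idle bytes <= 5 * N * MAX_CHUNK_SIZE
         */
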
123     /**
124      * The capacity of the magazine-local buffer queue. This queue only pools the outer ByteBuf instance and not
125      * the actual memory, and so helps to reduce GC pressure.
126      */
127     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
128             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
129 
130     private static final Object NO_MAGAZINE = Boolean.TRUE;
131 
132     private final ChunkAllocator chunkAllocator;
133     private final Queue<Chunk> centralQueue;
134     private final StampedLock magazineExpandLock;
135     private volatile Magazine[] magazines;
136     private final FastThreadLocal<Object> threadLocalMagazine;
137     private final Set<Magazine> liveCachedMagazines;
138     private volatile boolean freed;
139 
140     static {
141         if (CENTRAL_QUEUE_CAPACITY < 2) {
142             throw new IllegalArgumentException("CENTRAL_QUEUE_CAPACITY: " + CENTRAL_QUEUE_CAPACITY
143                     + " (expected: >= " + 2 + ')');
144         }
145         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
146             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
147                     + " (expected: >= " + 2 + ')');
148         }
149     }
150 
151     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
152         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
153         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
154         this.chunkAllocator = chunkAllocator;
155         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
156         magazineExpandLock = new StampedLock();
157         if (magazineCaching != MagazineCaching.None) {
158             assert magazineCaching == MagazineCaching.EventLoopThreads ||
159                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
160             final boolean cachedMagazinesNonEventLoopThreads =
161                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
162             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
163             threadLocalMagazine = new FastThreadLocal<Object>() {
164                 @Override
165                 protected Object initialValue() {
166                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
167                         if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
168                             // To prevent potential leak, we will not use thread-local magazine.
169                             return NO_MAGAZINE;
170                         }
171                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
172                         liveMagazines.add(mag);
173                         return mag;
174                     }
175                     return NO_MAGAZINE;
176                 }
177 
178                 @Override
179                 protected void onRemoval(final Object value) throws Exception {
180                     if (value != NO_MAGAZINE) {
181                         liveMagazines.remove(value);
182                     }
183                 }
184             };
185             liveCachedMagazines = liveMagazines;
186         } else {
187             threadLocalMagazine = null;
188             liveCachedMagazines = null;
189         }
190         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
191         for (int i = 0; i < mags.length; i++) {
192             mags[i] = new Magazine(this);
193         }
194         magazines = mags;
195     }
196 
197     /**
198      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
199      * internal Magazines.
200      * <p>
201      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
202      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
203      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
204      * allocate new chunks when their current and next-in-line chunks have both been used up.
205      * <p>
206      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
207      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
208      * queue.
209      * <p>
210      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
211      * queue to limit the maximum memory usage.
212      * <p>
213      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
214      *
215      * @return A new multi-producer, multi-consumer queue.
216      */
217     private static Queue<Chunk> createSharedChunkQueue() {
218         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
219     }
220 
221     @Override
222     public ByteBuf allocate(int size, int maxCapacity) {
223         return allocate(size, maxCapacity, Thread.currentThread(), null);
224     }
225 
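        /*
         * Illustrative usage sketch, not part of this class: this internal allocator is normally reached
         * through the public AdaptiveByteBufAllocator facade, roughly like so.
         *
         *     ByteBufAllocator alloc = new AdaptiveByteBufAllocator();
         *     ByteBuf buf = alloc.directBuffer(256);   // routed into allocate(size, maxCapacity) above
         *     try {
         *         buf.writeLong(42L);
         *     } finally {
         *         buf.release();                       // returns the chunk space to its Magazine
         *     }
         */
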
226     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
227         if (size <= MAX_CHUNK_SIZE) {
228             int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
229             FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
230             if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
231                 Object mag = threadLocalMagazine.get();
232                 if (mag != NO_MAGAZINE) {
233                     Magazine magazine = (Magazine) mag;
234                     if (buf == null) {
235                         buf = magazine.newBuffer();
236                     }
237                     boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
238                     assert allocated : "Allocation of threadLocalMagazine must always succeed";
239                     return buf;
240                 }
241             }
242             long threadId = currentThread.getId();
243             Magazine[] mags;
244             int expansions = 0;
245             do {
246                 mags = magazines;
247                 int mask = mags.length - 1;
248                 int index = (int) (threadId & mask);
249                 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
250                     Magazine mag = mags[index + i & mask];
251                     if (buf == null) {
252                         buf = mag.newBuffer();
253                     }
254                     if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
255                         // Was able to allocate.
256                         return buf;
257                     }
258                 }
259                 expansions++;
260             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
261         }
262 
263         // The magazines failed us, or the buffer is too big to be pooled.
264         return allocateFallback(size, maxCapacity, currentThread, buf);
265     }
266 
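        /*
         * Worked example of the striping above, assuming 8 magazines and a hypothetical thread id of 42:
         * mask = 7 and Integer.numberOfTrailingZeros(~mask) = 3, so a thread probes log2(mags.length)
         * magazines, starting from its home slot, before considering an expansion.
         *
         *     int mask = 8 - 1;
         *     int home = (int) (42L & mask);   // 2
         *     // probes magazines[2], magazines[3] and magazines[4], then tryExpandMagazines(8)
         */
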
267     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
268         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
269         Magazine magazine;
270         if (buf != null) {
271             Chunk chunk = buf.chunk;
272             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
273                 magazine = getFallbackMagazine(currentThread);
274             }
275         } else {
276             magazine = getFallbackMagazine(currentThread);
277             buf = magazine.newBuffer();
278         }
279         // Create a one-off chunk for this allocation.
280         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
281         Chunk chunk = new Chunk(innerChunk, magazine, false);
282         try {
283             chunk.readInitInto(buf, size, maxCapacity);
284         } finally {
285             // As the chunk is a one-off, we always need to call release explicitly, as readInitInto(...)
286             // takes care of retaining once when successful. Once the AdaptiveByteBuf is released it will
287             // completely release the Chunk and so the contained innerChunk.
288             chunk.release();
289         }
290         return buf;
291     }
292 
293     private Magazine getFallbackMagazine(Thread currentThread) {
294         Object tlMag;
295         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
296         if (threadLocalMagazine != null &&
297                 currentThread instanceof FastThreadLocalThread &&
298                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
299             return (Magazine) tlMag;
300         }
301         Magazine[] mags = magazines;
302         return mags[(int) currentThread.getId() & mags.length - 1];
303     }
304 
305     /**
306      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
307      */
308     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
309         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
310         assert result == into: "Re-allocation created separate buffer instance";
311     }
312 
313     @Override
314     public long usedMemory() {
315         long sum = 0;
316         for (Chunk chunk : centralQueue) {
317             sum += chunk.capacity();
318         }
319         for (Magazine magazine : magazines) {
320             sum += magazine.usedMemory.get();
321         }
322         if (liveCachedMagazines != null) {
323             for (Magazine magazine : liveCachedMagazines) {
324                 sum += magazine.usedMemory.get();
325             }
326         }
327         return sum;
328     }
329 
330     private boolean tryExpandMagazines(int currentLength) {
331         if (currentLength >= MAX_STRIPES) {
332             return true;
333         }
334         final Magazine[] mags;
335         long writeLock = magazineExpandLock.tryWriteLock();
336         if (writeLock != 0) {
337             try {
338                 mags = magazines;
339                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
340                     return true;
341                 }
342                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
343                 Magazine[] expanded = new Magazine[mags.length * 2];
344                 for (int i = 0, l = expanded.length; i < l; i++) {
345                     Magazine m = new Magazine(this);
346                     m.localPrefChunkSize = preferredChunkSize;
347                     m.sharedPrefChunkSize = preferredChunkSize;
348                     expanded[i] = m;
349                 }
350                 magazines = expanded;
351             } finally {
352                 magazineExpandLock.unlockWrite(writeLock);
353             }
354             for (Magazine magazine : mags) {
355                 magazine.free();
356             }
357         }
358         return true;
359     }
360 
361     private boolean offerToQueue(Chunk buffer) {
362         if (freed) {
363             return false;
364         }
365         // The buffer should not be in use anymore; assert this so we guard against bugs in the future.
366         assert buffer.allocatedBytes == 0;
367         assert buffer.magazine == null;
368 
369         boolean isAdded = centralQueue.offer(buffer);
370         if (freed && isAdded) {
371             // Help to free the centralQueue.
372             freeCentralQueue();
373         }
374         return isAdded;
375     }
376 
377     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
378     // we might end up with leaks. While these leaks are usually harmless in practice, they would still at least be
379     // very confusing for users.
380     @Override
381     protected void finalize() throws Throwable {
382         try {
383             super.finalize();
384         } finally {
385             free();
386         }
387     }
388 
389     private void free() {
390         freed = true;
391         long stamp = magazineExpandLock.writeLock();
392         try {
393             Magazine[] mags = magazines;
394             for (Magazine magazine : mags) {
395                 magazine.free();
396             }
397         } finally {
398             magazineExpandLock.unlockWrite(stamp);
399         }
400         freeCentralQueue();
401     }
402 
403     private void freeCentralQueue() {
404         for (;;) {
405             Chunk chunk = centralQueue.poll();
406             if (chunk == null) {
407                 break;
408             }
409             chunk.release();
410         }
411     }
412 
413     static int sizeBucket(int size) {
414         return AllocationStatistics.sizeBucket(size);
415     }
416 
417     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
418     private static class AllocationStatistics {
419         private static final int MIN_DATUM_TARGET = 1024;
420         private static final int MAX_DATUM_TARGET = 65534;
421         private static final int INIT_DATUM_TARGET = 9;
422         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
423         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB in size.
424         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
425         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
426         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
427 
428         protected final AdaptivePoolingAllocator parent;
429         private final boolean shareable;
430         private final short[][] histos = {
431                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
432                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
433         };
434         private short[] histo = histos[0];
435         private final int[] sums = new int[HISTO_BUCKET_COUNT];
436 
437         private int histoIndex;
438         private int datumCount;
439         private int datumTarget = INIT_DATUM_TARGET;
440         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
441         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
442 
443         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
444             this.parent = parent;
445             this.shareable = shareable;
446         }
447 
448         protected void recordAllocationSize(int bucket) {
449             histo[bucket]++;
450             if (datumCount++ == datumTarget) {
451                 rotateHistograms();
452             }
453         }
454 
455         static int sizeBucket(int size) {
456             if (size == 0) {
457                 return 0;
458             }
459             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
460             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
461             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
462             // which is a magical number for JIT optimisations.
463             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
464             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
465         }
466 
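            /*
             * Worked examples for sizeBucket(), derived from the shifts and masks above:
             *
             *     sizeBucket(8 * 1024)        == 0   // everything up to 8 KiB rolls into bucket 0
             *     sizeBucket(8 * 1024 + 1)    == 1   // (8 KiB, 16 KiB] maps to bucket 1
             *     sizeBucket(64 * 1024)       == 3   // (32 KiB, 64 KiB] maps to bucket 3
             *     sizeBucket(1024 * 1024)     == 7   // 1 MiB lands in the top bucket
             *     sizeBucket(9 * 1024 * 1024) == 7   // anything larger is clamped to bucket 7
             */
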
467         private void rotateHistograms() {
468             short[][] hs = histos;
469             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
470                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
471             }
472             int sum = 0;
473             for (int count : sums) {
474                 sum += count;
475             }
476             int targetPercentile = (int) (sum * 0.99);
477             int sizeBucket = 0;
478             for (; sizeBucket < sums.length; sizeBucket++) {
479                 if (sums[sizeBucket] > targetPercentile) {
480                     break;
481                 }
482                 targetPercentile -= sums[sizeBucket];
483             }
484             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
485             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
486             localPrefChunkSize = prefChunkSize;
487             if (shareable) {
488                 for (Magazine mag : parent.magazines) {
489                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
490                 }
491             }
492             if (sharedPrefChunkSize != prefChunkSize) {
493                 // Preferred chunk size changed. Increase check frequency.
494                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
495                 sharedPrefChunkSize = prefChunkSize;
496             } else {
497                 // Preferred chunk size did not change. Check less often.
498                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
499             }
500 
501             histoIndex = histoIndex + 1 & 3;
502             histo = histos[histoIndex];
503             datumCount = 0;
504             Arrays.fill(histo, (short) 0);
505         }
506 
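            /*
             * Worked example of the computation above: if the 99th-percentile sample falls in bucket 2,
             * i.e. allocation sizes in the (16 KiB, 32 KiB] range, then:
             *
             *     int percentileSize = 1 << (2 + HISTO_MIN_BUCKET_SHIFT);                         // 32 KiB
             *     int prefChunkSize  = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE); // 320 KiB
             *
             * A workload whose 99th percentile stays at or below 8 KiB bottoms out at MIN_CHUNK_SIZE (128 KiB).
             */
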
507         /**
508          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
509          * allocation sizes.
510          * <p>
511          * This method must be thread-safe.
512          *
513          * @return The currently preferred chunk allocation size.
514          */
515         protected int preferredChunkSize() {
516             return sharedPrefChunkSize;
517         }
518     }
519 
520     @SuppressJava6Requirement(reason = "Guarded by version check")
521     private static final class Magazine extends AllocationStatistics {
522         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
523         static {
524             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
525         }
526         private static final Chunk MAGAZINE_FREED = new Chunk();
527 
528         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
529                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
530                     @Override
531                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
532                         return new AdaptiveByteBuf(handle);
533                     }
534                 });
535 
536         private Chunk current;
537         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
538         private volatile Chunk nextInLine;
539         private final AtomicLong usedMemory;
540         private final StampedLock allocationLock;
541         private final Queue<AdaptiveByteBuf> bufferQueue;
542         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
543 
544         Magazine(AdaptivePoolingAllocator parent) {
545             this(parent, true);
546         }
547 
548         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
549             super(parent, shareable);
550 
551             if (shareable) {
552                 // We only need the StampedLock if this Magazine will be shared across threads.
553                 allocationLock = new StampedLock();
554                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
555                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
556                     @Override
557                     public void recycle(AdaptiveByteBuf self) {
558                         bufferQueue.offer(self);
559                     }
560                 };
561             } else {
562                 allocationLock = null;
563                 bufferQueue = null;
564                 handle = null;
565             }
566             usedMemory = new AtomicLong();
567         }
568 
569         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
570             if (allocationLock == null) {
571                 // This magazine is not shared across threads, just allocate directly.
572                 return allocate(size, sizeBucket, maxCapacity, buf);
573             }
574 
575             // Try to retrieve the lock and if successful allocate.
576             long writeLock = allocationLock.tryWriteLock();
577             if (writeLock != 0) {
578                 try {
579                     return allocate(size, sizeBucket, maxCapacity, buf);
580                 } finally {
581                     allocationLock.unlockWrite(writeLock);
582                 }
583             }
584             return allocateWithoutLock(size, maxCapacity, buf);
585         }
586 
587         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
588             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
589             if (curr == MAGAZINE_FREED) {
590                 // Allocation raced with a stripe-resize that freed this magazine.
591                 restoreMagazineFreed();
592                 return false;
593             }
594             if (curr == null) {
595                 curr = parent.centralQueue.poll();
596                 if (curr == null) {
597                     return false;
598                 }
599                 curr.attachToMagazine(this);
600             }
601             boolean allocated = curr.remainingCapacity() >= size;
602             if (allocated) {
603                 curr.readInitInto(buf, size, maxCapacity);
                }
604             try {
605                 if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
606                     transferToNextInLineOrRelease(curr);
607                     curr = null;
608                 }
609             } finally {
610                 if (curr != null) {
611                     curr.release();
612                 }
613             }
614             return allocated;
615         }
616 
617         private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
618             recordAllocationSize(sizeBucket);
619             Chunk curr = current;
620             if (curr != null) {
621                 // We have a Chunk that has some space left.
622                 if (curr.remainingCapacity() > size) {
623                     curr.readInitInto(buf, size, maxCapacity);
624                     // We still have some bytes left that we can use for the next allocation, just early return.
625                     return true;
626                 }
627 
628                 // At this point we know that this will be the last time current will be used, so directly set it to
629                 // null and release it once we are done.
630                 current = null;
631                 if (curr.remainingCapacity() == size) {
632                     try {
633                         curr.readInitInto(buf, size, maxCapacity);
634                         return true;
635                     } finally {
636                         curr.release();
637                     }
638                 }
639 
640                 // Check whether we should retain the chunk in the nextInLine cache or release it.
641                 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
642                     curr.release();
643                 } else {
644                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
645                     // This method will release curr if this is not the case
646                     transferToNextInLineOrRelease(curr);
647                 }
648             }
649 
650             assert current == null;
651             // The fast-path for allocations did not work.
652             //
653             // Try to fetch the next "Magazine local" Chunk first. If this fails because we don't have
654             // one set up, we will poll our centralQueue. If this fails as well, we will just allocate a new Chunk.
655             //
656             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
657             // so be "reserved" by this Magazine for exclusive usage.
658             curr = NEXT_IN_LINE.getAndSet(this, null);
659             if (curr != null) {
660                 if (curr == MAGAZINE_FREED) {
661                     // Allocation raced with a stripe-resize that freed this magazine.
662                     restoreMagazineFreed();
663                     return false;
664                 }
665 
666                 if (curr.remainingCapacity() > size) {
667                     // We have a Chunk that has some space left.
668                     curr.readInitInto(buf, size, maxCapacity);
669                     current = curr;
670                     return true;
671                 }
672 
673                 if (curr.remainingCapacity() == size) {
674                     // At this point we know that this will be the last time curr will be used, so directly set it to
675                     // null and release it once we are done.
676                     try {
677                         curr.readInitInto(buf, size, maxCapacity);
678                         return true;
679                     } finally {
680                         // Release in a finally block so that even if readInitInto(...) throws we still correctly
681                         // release the current chunk before nulling it out.
682                         curr.release();
683                     }
684                 } else {
685                     // Release it as it's too small.
686                     curr.release();
687                 }
688             }
689 
690             // Now try to poll from the central queue first
691             curr = parent.centralQueue.poll();
692             if (curr == null) {
693                 curr = newChunkAllocation(size);
694             } else {
695                 curr.attachToMagazine(this);
696 
697                 if (curr.remainingCapacity() < size) {
698                     // Check whether we should retain the chunk in the nextInLine cache or release it.
699                     if (curr.remainingCapacity() < RETIRE_CAPACITY) {
700                         curr.release();
701                     } else {
702                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
703                         // This method will release curr if this is not the case
704                         transferToNextInLineOrRelease(curr);
705                     }
706                     curr = newChunkAllocation(size);
707                 }
708             }
709 
710             current = curr;
711             try {
712                 assert current.remainingCapacity() >= size;
713                 if (curr.remainingCapacity() > size) {
714                     curr.readInitInto(buf, size, maxCapacity);
715                     curr = null;
716                 } else {
717                     curr.readInitInto(buf, size, maxCapacity);
718                 }
719             } finally {
720                 if (curr != null) {
721                     // Release in a finally block so that even if readInitInto(...) throws we still correctly
722                     // release the current chunk before nulling it out.
723                     curr.release();
724                     current = null;
725                 }
726             }
727             return true;
728         }
729 
730         private void restoreMagazineFreed() {
731             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
732             if (next != null && next != MAGAZINE_FREED) {
733                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
734             }
735         }
736 
737         private void transferToNextInLineOrRelease(Chunk chunk) {
738             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
739                 return;
740             }
741 
742             Chunk nextChunk = NEXT_IN_LINE.get(this);
743             if (nextChunk != null && nextChunk != MAGAZINE_FREED
744                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
745                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
746                     nextChunk.release();
747                     return;
748                 }
749             }
750             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
751             // by some buffers and so is attached to a Magazine.
752             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
753             // as last resort.
754             chunk.release();
755         }
756 
757         private Chunk newChunkAllocation(int promptingSize) {
758             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
759             int minChunks = size / MIN_CHUNK_SIZE;
760             if (MIN_CHUNK_SIZE * minChunks < size) {
761                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
762                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
763                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
764                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
765                 size = MIN_CHUNK_SIZE * (1 + minChunks);
766             }
767             ChunkAllocator chunkAllocator = parent.chunkAllocator;
768             return new Chunk(chunkAllocator.allocate(size, size), this, true);
769         }
770 
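            /*
             * Worked example of the rounding above: a 200 KiB prompting allocation with the default
             * preferred chunk size asks for 200 KiB * BUFS_PER_CHUNK = 2000 KiB. That is not a whole
             * multiple of MIN_CHUNK_SIZE (128 KiB): 2000 KiB / 128 KiB = 15 whole units and
             * 15 * 128 KiB < 2000 KiB, so the chunk is rounded up to 16 * 128 KiB = 2 MiB before being
             * handed to the ChunkAllocator.
             */
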
771         boolean trySetNextInLine(Chunk chunk) {
772             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
773         }
774 
775         void free() {
776             // Release the current Chunk and the next that was stored for later usage.
777             restoreMagazineFreed();
778             long stamp = allocationLock.writeLock();
779             try {
780                 if (current != null) {
781                     current.release();
782                     current = null;
783                 }
784             } finally {
785                 allocationLock.unlockWrite(stamp);
786             }
787         }
788 
789         public AdaptiveByteBuf newBuffer() {
790             AdaptiveByteBuf buf;
791             if (handle == null) {
792                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
793             } else {
794                 buf = bufferQueue.poll();
795                 if (buf == null) {
796                     buf = new AdaptiveByteBuf(handle);
797                 }
798             }
799             buf.resetRefCnt();
800             buf.discardMarks();
801             return buf;
802         }
803     }
804 
805     private static final class Chunk implements ReferenceCounted {
806 
807         private final AbstractByteBuf delegate;
808         private Magazine magazine;
809         private final AdaptivePoolingAllocator allocator;
810         private final int capacity;
811         private final boolean pooled;
812         private int allocatedBytes;
813         private static final long REFCNT_FIELD_OFFSET =
814                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
815         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
816                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
817 
818         private static final ReferenceCountUpdater<Chunk> updater =
819                 new ReferenceCountUpdater<Chunk>() {
820                     @Override
821                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
822                         return AIF_UPDATER;
823                     }
824                     @Override
825                     protected long unsafeOffset() {
826                         return REFCNT_FIELD_OFFSET;
827                     }
828                 };
829 
830         // Value might not equal "real" reference count, all access should be via the updater
831         @SuppressWarnings({"unused", "FieldMayBeFinal"})
832         private volatile int refCnt;
833 
834         Chunk() {
835             // Constructor only used by the MAGAZINE_FREED sentinel.
836             delegate = null;
837             magazine = null;
838             allocator = null;
839             capacity = 0;
840             pooled = false;
841         }
842 
843         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
844             this.delegate = delegate;
845             this.pooled = pooled;
846             capacity = delegate.capacity();
847             updater.setInitialValue(this);
848             allocator = magazine.parent;
849             attachToMagazine(magazine);
850         }
851 
852         Magazine currentMagazine()  {
853             return magazine;
854         }
855 
856         void detachFromMagazine() {
857             if (magazine != null) {
858                 magazine.usedMemory.getAndAdd(-capacity);
859                 magazine = null;
860             }
861         }
862 
863         void attachToMagazine(Magazine magazine) {
864             assert this.magazine == null;
865             this.magazine = magazine;
866             magazine.usedMemory.getAndAdd(capacity);
867         }
868 
869         @Override
870         public Chunk touch(Object hint) {
871             return this;
872         }
873 
874         @Override
875         public int refCnt() {
876             return updater.refCnt(this);
877         }
878 
879         @Override
880         public Chunk retain() {
881             return updater.retain(this);
882         }
883 
884         @Override
885         public Chunk retain(int increment) {
886             return updater.retain(this, increment);
887         }
888 
889         @Override
890         public Chunk touch() {
891             return this;
892         }
893 
894         @Override
895         public boolean release() {
896             if (updater.release(this)) {
897                 deallocate();
898                 return true;
899             }
900             return false;
901         }
902 
903         @Override
904         public boolean release(int decrement) {
905             if (updater.release(this, decrement)) {
906                 deallocate();
907                 return true;
908             }
909             return false;
910         }
911 
912         private void deallocate() {
913             Magazine mag = magazine;
914             AdaptivePoolingAllocator parent = mag.parent;
915             int chunkSize = mag.preferredChunkSize();
916             int memSize = delegate.capacity();
917             if (!pooled || shouldReleaseSuboptimalChunkSize(memSize, chunkSize)) {
918                 // Drop the chunk if it is a one-off (non-pooled) chunk,
919                 // or if the chunk deviates too much from the preferred chunk size.
920                 detachFromMagazine();
921                 delegate.release();
922             } else {
923                 updater.resetRefCnt(this);
924                 delegate.setIndex(0, 0);
925                 allocatedBytes = 0;
926                 if (!mag.trySetNextInLine(this)) {
927                     // As this Chunk does not belong to the mag anymore we need to decrease the used memory.
928                     detachFromMagazine();
929                     if (!parent.offerToQueue(this)) {
930                         // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
931                         // which did increase the reference count by 1.
932                         boolean released = updater.release(this);
933                         delegate.release();
934                         assert released;
935                     }
936                 }
937             }
938         }
939 
940         private static boolean shouldReleaseSuboptimalChunkSize(int givenSize, int preferredSize) {
941             int givenChunks = givenSize / MIN_CHUNK_SIZE;
942             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
943             int deviation = Math.abs(givenChunks - preferredChunks);
944 
945             // Retire chunks with a 0.5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
946             return deviation != 0 &&
947                     PlatformDependent.threadLocalRandom().nextDouble() * 200.0 < deviation;
948         }
949 
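            /*
             * Worked example of the release probability above: with a preferred chunk size of 512 KiB
             * (4 units of MIN_CHUNK_SIZE) and a 1 MiB chunk (8 units), the deviation is 4, so the chunk is
             * dropped with probability 4 / 200 = 2% each time it is fully released. A chunk that matches
             * the preferred size exactly (deviation 0) is always kept.
             */
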
950         public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
951             int startIndex = allocatedBytes;
952             allocatedBytes = startIndex + size;
953             Chunk chunk = this;
954             chunk.retain();
955             try {
956                 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
957                 chunk = null;
958             } finally {
959                 if (chunk != null) {
960                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
961                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
962                     // restore the old allocatedBytes value.
963                     allocatedBytes = startIndex;
964                     chunk.release();
965                 }
966             }
967         }
968 
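            /*
             * Illustration of the bump allocation above: a fresh 128 KiB chunk serving three 16 KiB buffers
             * hands out startIndex 0, 16384 and 32768 in turn, while remainingCapacity() drops from 131072
             * to 114688, 98304 and 81920. Each served buffer retains the chunk once, so the chunk is only
             * recycled or released after all of those buffers (and the owning magazine) have released it.
             */
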
969         public int remainingCapacity() {
970             return capacity - allocatedBytes;
971         }
972 
973         public int capacity() {
974             return capacity;
975         }
976     }
977 
978     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
979 
980         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
981 
982         private int adjustment;
983         private AbstractByteBuf rootParent;
984         Chunk chunk;
985         private int length;
986         private ByteBuffer tmpNioBuf;
987         private boolean hasArray;
988         private boolean hasMemoryAddress;
989 
990         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
991             super(0);
992             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
993         }
994 
995         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
996                   int adjustment, int capacity, int maxCapacity) {
997             this.adjustment = adjustment;
998             chunk = wrapped;
999             length = capacity;
1000             maxCapacity(maxCapacity);
1001             setIndex0(readerIndex, writerIndex);
1002             hasArray = unwrapped.hasArray();
1003             hasMemoryAddress = unwrapped.hasMemoryAddress();
1004             rootParent = unwrapped;
1005             tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
1006         }
1007 
1008         private AbstractByteBuf rootParent() {
1009             final AbstractByteBuf rootParent = this.rootParent;
1010             if (rootParent != null) {
1011                 return rootParent;
1012             }
1013             throw new IllegalReferenceCountException();
1014         }
1015 
1016         @Override
1017         public int capacity() {
1018             return length;
1019         }
1020 
1021         @Override
1022         public ByteBuf capacity(int newCapacity) {
1023             if (newCapacity == capacity()) {
1024                 ensureAccessible();
1025                 return this;
1026             }
1027             checkNewCapacity(newCapacity);
1028             if (newCapacity < capacity()) {
1029                 length = newCapacity;
1030                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1031                 return this;
1032             }
1033 
1034             // Reallocation required.
1035             ByteBuffer data = tmpNioBuf;
1036             data.clear();
1037             tmpNioBuf = null;
1038             Chunk chunk = this.chunk;
1039             AdaptivePoolingAllocator allocator = chunk.allocator;
1040             int readerIndex = this.readerIndex;
1041             int writerIndex = this.writerIndex;
1042             allocator.allocate(newCapacity, maxCapacity(), this);
1043             tmpNioBuf.put(data);
1044             tmpNioBuf.clear();
1045             chunk.release();
1046             this.readerIndex = readerIndex;
1047             this.writerIndex = writerIndex;
1048             return this;
1049         }
1050 
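            /*
             * Illustrative sketch of the reallocation path above (standard ByteBuf API, not code from this
             * class): growing a pooled buffer re-allocates it from a chunk and copies the old contents.
             *
             *     ByteBuf buf = alloc.buffer(16);    // served from some chunk
             *     buf.writeLong(1L).writeLong(2L);   // writerIndex = 16
             *     buf.capacity(64);                  // new space is carved from a chunk, the old bytes are
             *                                        // copied over, indexes are preserved, and the old
             *                                        // chunk reference is released
             */
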
1051         @Override
1052         public ByteBufAllocator alloc() {
1053             return rootParent().alloc();
1054         }
1055 
1056         @Override
1057         public ByteOrder order() {
1058             return rootParent().order();
1059         }
1060 
1061         @Override
1062         public ByteBuf unwrap() {
1063             return null;
1064         }
1065 
1066         @Override
1067         public boolean isDirect() {
1068             return rootParent().isDirect();
1069         }
1070 
1071         @Override
1072         public int arrayOffset() {
1073             return idx(rootParent().arrayOffset());
1074         }
1075 
1076         @Override
1077         public boolean hasMemoryAddress() {
1078             return hasMemoryAddress;
1079         }
1080 
1081         @Override
1082         public long memoryAddress() {
1083             ensureAccessible();
1084             return rootParent().memoryAddress() + adjustment;
1085         }
1086 
1087         @Override
1088         public ByteBuffer nioBuffer(int index, int length) {
1089             checkIndex(index, length);
1090             return rootParent().nioBuffer(idx(index), length);
1091         }
1092 
1093         @Override
1094         public ByteBuffer internalNioBuffer(int index, int length) {
1095             checkIndex(index, length);
1096             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1097         }
1098 
1099         private ByteBuffer internalNioBuffer() {
1100             return (ByteBuffer) tmpNioBuf.clear();
1101         }
1102 
1103         @Override
1104         public ByteBuffer[] nioBuffers(int index, int length) {
1105             checkIndex(index, length);
1106             return rootParent().nioBuffers(idx(index), length);
1107         }
1108 
1109         @Override
1110         public boolean hasArray() {
1111             return hasArray;
1112         }
1113 
1114         @Override
1115         public byte[] array() {
1116             ensureAccessible();
1117             return rootParent().array();
1118         }
1119 
1120         @Override
1121         public ByteBuf copy(int index, int length) {
1122             checkIndex(index, length);
1123             return rootParent().copy(idx(index), length);
1124         }
1125 
1126         @Override
1127         public int nioBufferCount() {
1128             return rootParent().nioBufferCount();
1129         }
1130 
1131         @Override
1132         protected byte _getByte(int index) {
1133             return rootParent()._getByte(idx(index));
1134         }
1135 
1136         @Override
1137         protected short _getShort(int index) {
1138             return rootParent()._getShort(idx(index));
1139         }
1140 
1141         @Override
1142         protected short _getShortLE(int index) {
1143             return rootParent()._getShortLE(idx(index));
1144         }
1145 
1146         @Override
1147         protected int _getUnsignedMedium(int index) {
1148             return rootParent()._getUnsignedMedium(idx(index));
1149         }
1150 
1151         @Override
1152         protected int _getUnsignedMediumLE(int index) {
1153             return rootParent()._getUnsignedMediumLE(idx(index));
1154         }
1155 
1156         @Override
1157         protected int _getInt(int index) {
1158             return rootParent()._getInt(idx(index));
1159         }
1160 
1161         @Override
1162         protected int _getIntLE(int index) {
1163             return rootParent()._getIntLE(idx(index));
1164         }
1165 
1166         @Override
1167         protected long _getLong(int index) {
1168             return rootParent()._getLong(idx(index));
1169         }
1170 
1171         @Override
1172         protected long _getLongLE(int index) {
1173             return rootParent()._getLongLE(idx(index));
1174         }
1175 
1176         @Override
1177         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1178             checkIndex(index, length);
1179             rootParent().getBytes(idx(index), dst, dstIndex, length);
1180             return this;
1181         }
1182 
1183         @Override
1184         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1185             checkIndex(index, length);
1186             rootParent().getBytes(idx(index), dst, dstIndex, length);
1187             return this;
1188         }
1189 
1190         @Override
1191         public ByteBuf getBytes(int index, ByteBuffer dst) {
1192             checkIndex(index, dst.remaining());
1193             rootParent().getBytes(idx(index), dst);
1194             return this;
1195         }
1196 
1197         @Override
1198         protected void _setByte(int index, int value) {
1199             rootParent()._setByte(idx(index), value);
1200         }
1201 
1202         @Override
1203         protected void _setShort(int index, int value) {
1204             rootParent()._setShort(idx(index), value);
1205         }
1206 
1207         @Override
1208         protected void _setShortLE(int index, int value) {
1209             rootParent()._setShortLE(idx(index), value);
1210         }
1211 
1212         @Override
1213         protected void _setMedium(int index, int value) {
1214             rootParent()._setMedium(idx(index), value);
1215         }
1216 
1217         @Override
1218         protected void _setMediumLE(int index, int value) {
1219             rootParent()._setMediumLE(idx(index), value);
1220         }
1221 
1222         @Override
1223         protected void _setInt(int index, int value) {
1224             rootParent()._setInt(idx(index), value);
1225         }
1226 
1227         @Override
1228         protected void _setIntLE(int index, int value) {
1229             rootParent()._setIntLE(idx(index), value);
1230         }
1231 
1232         @Override
1233         protected void _setLong(int index, long value) {
1234             rootParent()._setLong(idx(index), value);
1235         }
1236 
1237         @Override
1238         protected void _setLongLE(int index, long value) {
1239             rootParent()._setLongLE(idx(index), value);
1240         }
1241 
1242         @Override
1243         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1244             checkIndex(index, length);
1245             rootParent().setBytes(idx(index), src, srcIndex, length);
1246             return this;
1247         }
1248 
1249         @Override
1250         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1251             checkIndex(index, length);
1252             rootParent().setBytes(idx(index), src, srcIndex, length);
1253             return this;
1254         }
1255 
1256         @Override
1257         public ByteBuf setBytes(int index, ByteBuffer src) {
1258             checkIndex(index, src.remaining());
1259             rootParent().setBytes(idx(index), src);
1260             return this;
1261         }
1262 
1263         @Override
1264         public ByteBuf getBytes(int index, OutputStream out, int length)
1265                 throws IOException {
1266             checkIndex(index, length);
1267             if (length != 0) {
1268                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1269             }
1270             return this;
1271         }
1272 
1273         @Override
1274         public int getBytes(int index, GatheringByteChannel out, int length)
1275                 throws IOException {
1276             return out.write(internalNioBuffer(index, length).duplicate());
1277         }
1278 
1279         @Override
1280         public int getBytes(int index, FileChannel out, long position, int length)
1281                 throws IOException {
1282             return out.write(internalNioBuffer(index, length).duplicate(), position);
1283         }
1284 
1285         @Override
1286         public int setBytes(int index, InputStream in, int length)
1287                 throws IOException {
1288             checkIndex(index, length);
1289             final AbstractByteBuf rootParent = rootParent();
1290             if (rootParent.hasArray()) {
1291                 return rootParent.setBytes(idx(index), in, length);
1292             }
1293             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1294             int readBytes = in.read(tmp, 0, length);
1295             if (readBytes <= 0) {
1296                 return readBytes;
1297             }
1298             setBytes(index, tmp, 0, readBytes);
1299             return readBytes;
1300         }
1301 
1302         @Override
1303         public int setBytes(int index, ScatteringByteChannel in, int length)
1304                 throws IOException {
1305             try {
1306                 return in.read(internalNioBuffer(index, length).duplicate());
1307             } catch (ClosedChannelException ignored) {
1308                 return -1;
1309             }
1310         }
1311 
1312         @Override
1313         public int setBytes(int index, FileChannel in, long position, int length)
1314                 throws IOException {
1315             try {
1316                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1317             } catch (ClosedChannelException ignored) {
1318                 return -1;
1319             }
1320         }
1321 
1322         @Override
1323         public int forEachByte(int index, int length, ByteProcessor processor) {
1324             checkIndex(index, length);
1325             int ret = rootParent().forEachByte(idx(index), length, processor);
1326             return forEachResult(ret);
1327         }
1328 
1329         @Override
1330         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1331             checkIndex(index, length);
1332             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1333             return forEachResult(ret);
1334         }
1335 
1336         private int forEachResult(int ret) {
1337             if (ret < adjustment) {
1338                 return -1;
1339             }
1340             return ret - adjustment;
1341         }
1342 
1343         @Override
1344         public boolean isContiguous() {
1345             return rootParent().isContiguous();
1346         }
1347 
1348         private int idx(int index) {
1349             return index + adjustment;
1350         }
1351 
1352         @Override
1353         protected void deallocate() {
1354             if (chunk != null) {
1355                 chunk.release();
1356             }
1357             tmpNioBuf = null;
1358             chunk = null;
1359             rootParent = null;
1360             if (handle instanceof EnhancedHandle) {
1361                 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1362                 enhancedHandle.unguardedRecycle(this);
1363             } else {
1364                 handle.recycle(this);
1365             }
1366         }
1367     }
1368 
1369     /**
1370      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1371      */
1372     interface ChunkAllocator {
1373         /**
1374          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1375          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1376          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1377          * @return The buffer that represents the chunk memory.
1378          */
1379         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1380     }
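
        /*
         * A minimal ChunkAllocator sketch, assuming plain unpooled heap chunks are acceptable; the real
         * implementations are supplied by AdaptiveByteBufAllocator:
         *
         *     ChunkAllocator heapChunks = new ChunkAllocator() {
         *         @Override
         *         public AbstractByteBuf allocate(int initialCapacity, int maxCapacity) {
         *             // Pooling happens at the chunk level above this call, so an unpooled buffer is fine here.
         *             return new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, initialCapacity, maxCapacity);
         *         }
         *     };
         */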
1381 }