View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SuppressJava6Requirement;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.ThreadExecutorMap;
32  import io.netty.util.internal.ThreadLocalRandom;
33  import io.netty.util.internal.UnstableApi;
34  
35  import java.io.IOException;
36  import java.io.InputStream;
37  import java.io.OutputStream;
38  import java.nio.ByteBuffer;
39  import java.nio.ByteOrder;
40  import java.nio.channels.ClosedChannelException;
41  import java.nio.channels.FileChannel;
42  import java.nio.channels.GatheringByteChannel;
43  import java.nio.channels.ScatteringByteChannel;
44  import java.util.Arrays;
45  import java.util.Queue;
46  import java.util.Set;
47  import java.util.concurrent.ConcurrentLinkedQueue;
48  import java.util.concurrent.CopyOnWriteArraySet;
49  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
50  import java.util.concurrent.atomic.AtomicLong;
51  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
52  import java.util.concurrent.locks.StampedLock;
53  
54  /**
55   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
56   * <p>
57   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
58   * from.
59   * <p>
60   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
61   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
62   * If contention is detected above a certain threshold, the number of magazines are increased in response to the
63   * contention.
64   * <p>
65   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
66   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
67   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
68   * <p>
69   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
70   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
71   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
72   * <p>
73   * This allows the allocator to quickly respond to changes in the application workload,
74   * without suffering undue overhead from maintaining its statistics.
75   * <p>
76   * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from
77   * any magazine to be shared with other magazines.
78   * The queue itself is created by the {@link #createSharedChunkQueue()} method.
79   */
80  @SuppressJava6Requirement(reason = "Guarded by version check")
81  @UnstableApi
82  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
83  
84      enum MagazineCaching {
85          EventLoopThreads,
86          FastThreadLocalThreads,
87          None
88      }
89  
90      /**
91       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
92       * allocations. For instance, glibc will do this.
93       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
94       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
95       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
96       */
97      private static final int MIN_CHUNK_SIZE = 128 * 1024;
98      private static final int EXPANSION_ATTEMPTS = 3;
99      private static final int INITIAL_MAGAZINES = 4;
100     private static final int RETIRE_CAPACITY = 4 * 1024;
101     private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
102     private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
103 
104     /**
105      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
106      * <p>
107      * This number is 10 MiB, and is derived from the limitations of internal histograms.
108      */
109     private static final int MAX_CHUNK_SIZE =
110             BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
111 
112     /**
113      * The capacity of the central queue that allows chunks to be shared across magazines.
114      * The default size is {@link NettyRuntime#availableProcessors()},
115      * and the maximum number of magazines is twice this.
116      * <p>
117      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
118      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
119      */
120     private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
121             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
122 
123     /**
124      * The capacity of the magazine local buffer queue. This queue just pools the outer ByteBuf instance and not
125      * the actual memory and so helps to reduce GC pressure.
126      */
127     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
128             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
129 
130     private static final Object NO_MAGAZINE = Boolean.TRUE;
131 
132     private final ChunkAllocator chunkAllocator;
133     private final Queue<Chunk> centralQueue;
134     private final StampedLock magazineExpandLock;
135     private volatile Magazine[] magazines;
136     private final FastThreadLocal<Object> threadLocalMagazine;
137     private final Set<Magazine> liveCachedMagazines;
138     private volatile boolean freed;
139 
140     static {
141         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
142             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
143                     + " (expected: >= " + 2 + ')');
144         }
145     }
146 
147     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
148         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
149         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
150         this.chunkAllocator = chunkAllocator;
151         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
152         magazineExpandLock = new StampedLock();
153         if (magazineCaching != MagazineCaching.None) {
154             assert magazineCaching == MagazineCaching.EventLoopThreads ||
155                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
156             final boolean cachedMagazinesNonEventLoopThreads =
157                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
158             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
159             threadLocalMagazine = new FastThreadLocal<Object>() {
160                 @Override
161                 protected Object initialValue() {
162                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
163                         if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
164                             // To prevent potential leak, we will not use thread-local magazine.
165                             return NO_MAGAZINE;
166                         }
167                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
168                         liveMagazines.add(mag);
169                         return mag;
170                     }
171                     return NO_MAGAZINE;
172                 }
173 
174                 @Override
175                 protected void onRemoval(final Object value) throws Exception {
176                     if (value != NO_MAGAZINE) {
177                         liveMagazines.remove(value);
178                     }
179                 }
180             };
181             liveCachedMagazines = liveMagazines;
182         } else {
183             threadLocalMagazine = null;
184             liveCachedMagazines = null;
185         }
186         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
187         for (int i = 0; i < mags.length; i++) {
188             mags[i] = new Magazine(this);
189         }
190         magazines = mags;
191     }
192 
193     /**
194      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
195      * internal Magazines.
196      * <p>
197      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
198      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
199      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
200      * allocate new chunks when their current and next-in-line chunks have both been used up.
201      * <p>
202      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
203      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
204      * queue.
205      * <p>
206      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
207      * queue to limit the maximum memory usage.
208      * <p>
209      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
210      *
211      * @return A new multi-producer, multi-consumer queue.
212      */
213     private static Queue<Chunk> createSharedChunkQueue() {
214         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
215     }
216 
217     @Override
218     public ByteBuf allocate(int size, int maxCapacity) {
219         return allocate(size, maxCapacity, Thread.currentThread(), null);
220     }
221 
222     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
223         if (size <= MAX_CHUNK_SIZE) {
224             int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
225             FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
226             if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
227                 Object mag = threadLocalMagazine.get();
228                 if (mag != NO_MAGAZINE) {
229                     Magazine magazine = (Magazine) mag;
230                     if (buf == null) {
231                         buf = magazine.newBuffer();
232                     }
233                     boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
234                     assert allocated : "Allocation of threadLocalMagazine must always succeed";
235                     return buf;
236                 }
237             }
238             long threadId = currentThread.getId();
239             Magazine[] mags;
240             int expansions = 0;
241             do {
242                 mags = magazines;
243                 int mask = mags.length - 1;
244                 int index = (int) (threadId & mask);
245                 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
246                     Magazine mag = mags[index + i & mask];
247                     if (buf == null) {
248                         buf = mag.newBuffer();
249                     }
250                     if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
251                         // Was able to allocate.
252                         return buf;
253                     }
254                 }
255                 expansions++;
256             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
257         }
258 
259         // The magazines failed us, or the buffer is too big to be pooled.
260         return allocateFallback(size, maxCapacity, currentThread, buf);
261     }
262 
263     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
264         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
265         Magazine magazine;
266         if (buf != null) {
267             Chunk chunk = buf.chunk;
268             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
269                 magazine = getFallbackMagazine(currentThread);
270             }
271         } else {
272             magazine = getFallbackMagazine(currentThread);
273             buf = magazine.newBuffer();
274         }
275         // Create a one-off chunk for this allocation.
276         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
277         Chunk chunk = new Chunk(innerChunk, magazine, false);
278         try {
279             chunk.readInitInto(buf, size, maxCapacity);
280         } finally {
281             // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
282             // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
283             // completely release the Chunk and so the contained innerChunk.
284             chunk.release();
285         }
286         return buf;
287     }
288 
289     private Magazine getFallbackMagazine(Thread currentThread) {
290         Object tlMag;
291         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
292         if (threadLocalMagazine != null &&
293                 currentThread instanceof FastThreadLocalThread &&
294                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
295             return (Magazine) tlMag;
296         }
297         Magazine[] mags = magazines;
298         return mags[(int) currentThread.getId() & mags.length - 1];
299     }
300 
301     /**
302      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
303      */
304     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
305         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
306         assert result == into: "Re-allocation created separate buffer instance";
307     }
308 
309     @Override
310     public long usedMemory() {
311         long sum = 0;
312         for (Chunk chunk : centralQueue) {
313             sum += chunk.capacity();
314         }
315         for (Magazine magazine : magazines) {
316             sum += magazine.usedMemory.get();
317         }
318         if (liveCachedMagazines != null) {
319             for (Magazine magazine : liveCachedMagazines) {
320                 sum += magazine.usedMemory.get();
321             }
322         }
323         return sum;
324     }
325 
326     private boolean tryExpandMagazines(int currentLength) {
327         if (currentLength >= MAX_STRIPES) {
328             return true;
329         }
330         final Magazine[] mags;
331         long writeLock = magazineExpandLock.tryWriteLock();
332         if (writeLock != 0) {
333             try {
334                 mags = magazines;
335                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
336                     return true;
337                 }
338                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
339                 Magazine[] expanded = new Magazine[mags.length * 2];
340                 for (int i = 0, l = expanded.length; i < l; i++) {
341                     Magazine m = new Magazine(this);
342                     m.localPrefChunkSize = preferredChunkSize;
343                     m.sharedPrefChunkSize = preferredChunkSize;
344                     expanded[i] = m;
345                 }
346                 magazines = expanded;
347             } finally {
348                 magazineExpandLock.unlockWrite(writeLock);
349             }
350             for (Magazine magazine : mags) {
351                 magazine.free();
352             }
353         }
354         return true;
355     }
356 
357     private boolean offerToQueue(Chunk buffer) {
358         if (freed) {
359             return false;
360         }
361         // The Buffer should not be used anymore, let's add an assert to so we guard against bugs in the future.
362         assert buffer.allocatedBytes == 0;
363         assert buffer.magazine == null;
364 
365         boolean isAdded = centralQueue.offer(buffer);
366         if (freed && isAdded) {
367             // Help to free the centralQueue.
368             freeCentralQueue();
369         }
370         return isAdded;
371     }
372 
373     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
374     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
375     // very confusing for users.
376     @Override
377     protected void finalize() throws Throwable {
378         try {
379             super.finalize();
380         } finally {
381             free();
382         }
383     }
384 
385     private void free() {
386         freed = true;
387         long stamp = magazineExpandLock.writeLock();
388         try {
389             Magazine[] mags = magazines;
390             for (Magazine magazine : mags) {
391                 magazine.free();
392             }
393         } finally {
394             magazineExpandLock.unlockWrite(stamp);
395         }
396         freeCentralQueue();
397     }
398 
399     private void freeCentralQueue() {
400         for (;;) {
401             Chunk chunk = centralQueue.poll();
402             if (chunk == null) {
403                 break;
404             }
405             chunk.release();
406         }
407     }
408 
409     static int sizeBucket(int size) {
410         return AllocationStatistics.sizeBucket(size);
411     }
412 
413     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
414     private static class AllocationStatistics {
415         private static final int MIN_DATUM_TARGET = 1024;
416         private static final int MAX_DATUM_TARGET = 65534;
417         private static final int INIT_DATUM_TARGET = 9;
418         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
419         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
420         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
421         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
422         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
423 
424         protected final AdaptivePoolingAllocator parent;
425         private final boolean shareable;
426         private final short[][] histos = {
427                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
428                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
429         };
430         private short[] histo = histos[0];
431         private final int[] sums = new int[HISTO_BUCKET_COUNT];
432 
433         private int histoIndex;
434         private int datumCount;
435         private int datumTarget = INIT_DATUM_TARGET;
436         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
437         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
438 
439         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
440             this.parent = parent;
441             this.shareable = shareable;
442         }
443 
444         protected void recordAllocationSize(int bucket) {
445             histo[bucket]++;
446             if (datumCount++ == datumTarget) {
447                 rotateHistograms();
448             }
449         }
450 
451         static int sizeBucket(int size) {
452             if (size == 0) {
453                 return 0;
454             }
455             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
456             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
457             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
458             // which is a magical number for JIT optimisations.
459             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
460             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
461         }
462 
463         private void rotateHistograms() {
464             short[][] hs = histos;
465             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
466                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
467             }
468             int sum = 0;
469             for (int count : sums) {
470                 sum  += count;
471             }
472             int targetPercentile = (int) (sum * 0.99);
473             int sizeBucket = 0;
474             for (; sizeBucket < sums.length; sizeBucket++) {
475                 if (sums[sizeBucket] > targetPercentile) {
476                     break;
477                 }
478                 targetPercentile -= sums[sizeBucket];
479             }
480             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
481             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
482             localPrefChunkSize = prefChunkSize;
483             if (shareable) {
484                 for (Magazine mag : parent.magazines) {
485                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
486                 }
487             }
488             if (sharedPrefChunkSize != prefChunkSize) {
489                 // Preferred chunk size changed. Increase check frequency.
490                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
491                 sharedPrefChunkSize = prefChunkSize;
492             } else {
493                 // Preferred chunk size did not change. Check less often.
494                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
495             }
496 
497             histoIndex = histoIndex + 1 & 3;
498             histo = histos[histoIndex];
499             datumCount = 0;
500             Arrays.fill(histo, (short) 0);
501         }
502 
503         /**
504          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
505          * allocation sizes.
506          * <p>
507          * This method must be thread-safe.
508          *
509          * @return The currently preferred chunk allocation size.
510          */
511         protected int preferredChunkSize() {
512             return sharedPrefChunkSize;
513         }
514     }
515 
516     @SuppressJava6Requirement(reason = "Guarded by version check")
517     private static final class Magazine extends AllocationStatistics {
518         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
519         static {
520             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
521         }
522         private static final Chunk MAGAZINE_FREED = new Chunk();
523 
524         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
525                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
526                     @Override
527                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
528                         return new AdaptiveByteBuf(handle);
529                     }
530                 });
531 
532         private Chunk current;
533         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
534         private volatile Chunk nextInLine;
535         private final AtomicLong usedMemory;
536         private final StampedLock allocationLock;
537         private final Queue<AdaptiveByteBuf> bufferQueue;
538         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
539 
540         Magazine(AdaptivePoolingAllocator parent) {
541             this(parent, true);
542         }
543 
544         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
545             super(parent, shareable);
546 
547             if (shareable) {
548                 // We only need the StampedLock if this Magazine will be shared across threads.
549                 allocationLock = new StampedLock();
550                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
551                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
552                     @Override
553                     public void recycle(AdaptiveByteBuf self) {
554                         bufferQueue.offer(self);
555                     }
556                 };
557             } else {
558                 allocationLock = null;
559                 bufferQueue = null;
560                 handle = null;
561             }
562             usedMemory = new AtomicLong();
563         }
564 
565         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
566             if (allocationLock == null) {
567                 // This magazine is not shared across threads, just allocate directly.
568                 return allocate(size, sizeBucket, maxCapacity, buf);
569             }
570 
571             // Try to retrieve the lock and if successful allocate.
572             long writeLock = allocationLock.tryWriteLock();
573             if (writeLock != 0) {
574                 try {
575                     return allocate(size, sizeBucket, maxCapacity, buf);
576                 } finally {
577                     allocationLock.unlockWrite(writeLock);
578                 }
579             }
580             return allocateWithoutLock(size, maxCapacity, buf);
581         }
582 
583         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
584             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
585             if (curr == MAGAZINE_FREED) {
586                 // Allocation raced with a stripe-resize that freed this magazine.
587                 restoreMagazineFreed();
588                 return false;
589             }
590             if (curr == null) {
591                 curr = parent.centralQueue.poll();
592                 if (curr == null) {
593                     return false;
594                 }
595                 curr.attachToMagazine(this);
596             }
597             boolean allocated = false;
598             if (curr.remainingCapacity() >= size) {
599                 curr.readInitInto(buf, size, maxCapacity);
600                 allocated = true;
601             }
602             try {
603                 if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
604                     transferToNextInLineOrRelease(curr);
605                     curr = null;
606                 }
607             } finally {
608                 if (curr != null) {
609                     curr.release();
610                 }
611             }
612             return allocated;
613         }
614 
615         private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
616             recordAllocationSize(sizeBucket);
617             Chunk curr = current;
618             if (curr != null) {
619                 // We have a Chunk that has some space left.
620                 if (curr.remainingCapacity() > size) {
621                     curr.readInitInto(buf, size, maxCapacity);
622                     // We still have some bytes left that we can use for the next allocation, just early return.
623                     return true;
624                 }
625 
626                 // At this point we know that this will be the last time current will be used, so directly set it to
627                 // null and release it once we are done.
628                 current = null;
629                 if (curr.remainingCapacity() == size) {
630                     try {
631                         curr.readInitInto(buf, size, maxCapacity);
632                         return true;
633                     } finally {
634                         curr.release();
635                     }
636                 }
637 
638                 // Check if we either retain the chunk in the nextInLine cache or releasing it.
639                 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
640                     curr.release();
641                 } else {
642                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
643                     // This method will release curr if this is not the case
644                     transferToNextInLineOrRelease(curr);
645                 }
646             }
647 
648             assert current == null;
649             // The fast-path for allocations did not work.
650             //
651             // Try to fetch the next "Magazine local" Chunk first, if this this fails as we don't have
652             // one setup we will poll our centralQueue. If this fails as well we will just allocate a new Chunk.
653             //
654             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
655             // so be "reserved" by this Magazine for exclusive usage.
656             curr = NEXT_IN_LINE.getAndSet(this, null);
657             if (curr != null) {
658                 if (curr == MAGAZINE_FREED) {
659                     // Allocation raced with a stripe-resize that freed this magazine.
660                     restoreMagazineFreed();
661                     return false;
662                 }
663 
664                 if (curr.remainingCapacity() > size) {
665                     // We have a Chunk that has some space left.
666                     curr.readInitInto(buf, size, maxCapacity);
667                     current = curr;
668                     return true;
669                 }
670 
671                 if (curr.remainingCapacity() == size) {
672                     // At this point we know that this will be the last time curr will be used, so directly set it to
673                     // null and release it once we are done.
674                     try {
675                         curr.readInitInto(buf, size, maxCapacity);
676                         return true;
677                     } finally {
678                         // Release in a finally block so even if readInitInto(...) would throw we would still correctly
679                         // release the current chunk before null it out.
680                         curr.release();
681                     }
682                 } else {
683                     // Release it as it's too small.
684                     curr.release();
685                 }
686             }
687 
688             // Now try to poll from the central queue first
689             curr = parent.centralQueue.poll();
690             if (curr == null) {
691                 curr = newChunkAllocation(size);
692             } else {
693                 curr.attachToMagazine(this);
694 
695                 if (curr.remainingCapacity() < size) {
696                     // Check if we either retain the chunk in the nextInLine cache or releasing it.
697                     if (curr.remainingCapacity() < RETIRE_CAPACITY) {
698                         curr.release();
699                     } else {
700                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
701                         // This method will release curr if this is not the case
702                         transferToNextInLineOrRelease(curr);
703                     }
704                     curr = newChunkAllocation(size);
705                 }
706             }
707 
708             current = curr;
709             try {
710                 assert current.remainingCapacity() >= size;
711                 if (curr.remainingCapacity() > size) {
712                     curr.readInitInto(buf, size, maxCapacity);
713                     curr = null;
714                 } else {
715                     curr.readInitInto(buf, size, maxCapacity);
716                 }
717             } finally {
718                 if (curr != null) {
719                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
720                     // release the current chunk before null it out.
721                     curr.release();
722                     current = null;
723                 }
724             }
725             return true;
726         }
727 
728         private void restoreMagazineFreed() {
                // Unconditionally stores the MAGAZINE_FREED sentinel in the next-in-line slot and takes over
                // responsibility for whatever value the slot held before the swap.
729             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
                // null (empty slot) and the sentinel itself are not real chunks and must not be released.
730             if (next != null && next != MAGAZINE_FREED) {
731                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
732             }
733         }
734 
735         private void transferToNextInLineOrRelease(Chunk chunk) {
                // Tries to park the given chunk in the next-in-line slot. On every path the reference passed in
                // is either handed over to the slot or released before this method returns.
                //
                // Fast path: the slot is empty, so the chunk simply takes it.
736             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
737                 return;
738             }
739 
                // The slot is occupied. Replace the occupant only when it is a real chunk (not the
                // MAGAZINE_FREED sentinel) that has less remaining capacity than the given chunk.
740             Chunk nextChunk = NEXT_IN_LINE.get(this);
741             if (nextChunk != null && nextChunk != MAGAZINE_FREED
742                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
743                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
                        // The displaced chunk is no longer referenced by the slot.
744                     nextChunk.release();
745                     return;
746                 }
                    // The slot changed between the read and the CAS. There is no retry; fall through and
                    // release the given chunk.
747             }
748             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
749             // by some buffers and so is attached to a Magazine.
750             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
751             // as last resort.
752             chunk.release();
753         }
754 
755         private Chunk newChunkAllocation(int promptingSize) {
                // Allocates a new pooled Chunk for this magazine, sized for the allocation that prompted it.
                // promptingSize is the buffer size (in bytes) of the request that could not be served.
                //
                // Start from the larger of BUFS_PER_CHUNK times the requested size and the currently
                // preferred chunk size.
756             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
757             int minChunks = size / MIN_CHUNK_SIZE;
758             if (MIN_CHUNK_SIZE * minChunks < size) {
759                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
760                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
761                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
762                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
763                 size = MIN_CHUNK_SIZE * (1 + minChunks);
764             }
765             ChunkAllocator chunkAllocator = parent.chunkAllocator;
                // Initial and maximum capacity are both `size`. The Chunk constructor attaches the new chunk
                // to this magazine, and `true` marks it as pooled.
766             return new Chunk(chunkAllocator.allocate(size, size), this, true);
767         }
768 
769         boolean trySetNextInLine(Chunk chunk) {
                // Stores the chunk in the next-in-line slot only if that slot currently holds null.
                // Returns true when the chunk was stored, and false when the slot already held a value
                // (another chunk or the MAGAZINE_FREED sentinel), in which case the slot is left untouched.
770             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
771         }
772 
773         void free() {
774             // Release the current Chunk and the next that was stored for later usage.
                // First mark the next-in-line slot as freed (releasing any chunk parked there), then drop
                // the `current` chunk.
775             restoreMagazineFreed();
                // NOTE(review): the write lock presumably keeps a concurrent allocation from using `current`
                // while it is released here - confirm against the allocation path.
776             long stamp = allocationLock.writeLock();
777             try {
778                 if (current != null) {
779                     current.release();
780                     current = null;
781                 }
782             } finally {
783                 allocationLock.unlockWrite(stamp);
784             }
785         }
786 
787         public AdaptiveByteBuf newBuffer() {
                // Returns an AdaptiveByteBuf wrapper with its reference count and marks reset. This method does
                // not bind the wrapper to any chunk memory; that is done by AdaptiveByteBuf.init(...).
788             AdaptiveByteBuf buf;
789             if (handle == null) {
                    // No magazine-local handle: take the wrapper from EVENT_LOOP_LOCAL_BUFFER_POOL.
790                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
791             } else {
                    // Reuse a wrapper from this magazine's bufferQueue, or create one bound to `handle` when
                    // the queue is empty.
792                 buf = bufferQueue.poll();
793                 if (buf == null) {
794                     buf = new AdaptiveByteBuf(handle);
795                 }
796             }
797             buf.resetRefCnt();
798             buf.discardMarks();
799             return buf;
800         }
801     }
802 
803     private static final class Chunk implements ReferenceCounted {
            // A reference-counted block of memory (the delegate buffer) that is handed out piece by piece:
            // every readInitInto(...) call assigns the next `size` bytes to an AdaptiveByteBuf and retains
            // this chunk once. When the last reference is released, deallocate() either frees the memory or
            // resets the chunk so that it can be reused.
804 
            // The buffer that provides the memory of this chunk.
805         private final AbstractByteBuf delegate;
            // The magazine whose usedMemory counter currently accounts for this chunk; null while detached.
806         private Magazine magazine;
            // The allocator of the magazine this chunk was created for (magazine.parent at construction time).
807         private final AdaptivePoolingAllocator allocator;
            // Capacity of the delegate at construction time, in bytes.
808         private final int capacity;
            // When false, deallocate() always releases the delegate instead of recycling the chunk.
809         private final boolean pooled;
            // Number of bytes already handed out from the start of the delegate; reset to 0 on recycle.
810         private int allocatedBytes;
811         private static final long REFCNT_FIELD_OFFSET =
812                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
813         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
814                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
815 
            // All reference-count operations of this class go through this updater.
816         private static final ReferenceCountUpdater<Chunk> updater =
817                 new ReferenceCountUpdater<Chunk>() {
818                     @Override
819                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
820                         return AIF_UPDATER;
821                     }
822                     @Override
823                     protected long unsafeOffset() {
824                         return REFCNT_FIELD_OFFSET;
825                     }
826                 };
827 
828         // Value might not equal "real" reference count, all access should be via the updater
829         @SuppressWarnings({"unused", "FieldMayBeFinal"})
830         private volatile int refCnt;
831 
832         Chunk() {
833             // Constructor only used by the MAGAZINE_FREED sentinel.
                // The sentinel has no memory and no owner, and its reference count is never initialised here.
834             delegate = null;
835             magazine = null;
836             allocator = null;
837             capacity = 0;
838             pooled = false;
839         }
840 
841         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
                // Wraps the given buffer, initialises the reference count and attaches the chunk to the
                // given magazine (which adds the chunk's capacity to that magazine's usedMemory).
842             this.delegate = delegate;
843             this.pooled = pooled;
844             capacity = delegate.capacity();
845             updater.setInitialValue(this);
846             allocator = magazine.parent;
847             attachToMagazine(magazine);
848         }
849 
850         Magazine currentMagazine()  {
                // May be null while the chunk is detached.
851             return magazine;
852         }
853 
854         void detachFromMagazine() {
                // Removes this chunk's capacity from the owning magazine's usedMemory and forgets the owner.
                // Does nothing when the chunk is already detached.
855             if (magazine != null) {
856                 magazine.usedMemory.getAndAdd(-capacity);
857                 magazine = null;
858             }
859         }
860 
861         void attachToMagazine(Magazine magazine) {
                // Makes the given magazine the owner and adds this chunk's capacity to its usedMemory.
                // The chunk must currently be detached (checked by the assertion only).
862             assert this.magazine == null;
863             this.magazine = magazine;
864             magazine.usedMemory.getAndAdd(capacity);
865         }
866 
867         @Override
868         public Chunk touch(Object hint) {
                // No-op: the hint is ignored.
869             return this;
870         }
871 
872         @Override
873         public int refCnt() {
874             return updater.refCnt(this);
875         }
876 
877         @Override
878         public Chunk retain() {
879             return updater.retain(this);
880         }
881 
882         @Override
883         public Chunk retain(int increment) {
884             return updater.retain(this, increment);
885         }
886 
887         @Override
888         public Chunk touch() {
                // No-op.
889             return this;
890         }
891 
892         @Override
893         public boolean release() {
                // Runs deallocate() when the updater reports that this call released the last reference.
894             if (updater.release(this)) {
895                 deallocate();
896                 return true;
897             }
898             return false;
899         }
900 
901         @Override
902         public boolean release(int decrement) {
                // Same as release(), but drops `decrement` references at once.
903             if (updater.release(this, decrement)) {
904                 deallocate();
905                 return true;
906             }
907             return false;
908         }
909 
910         private void deallocate() {
                // Called once the last reference is gone. Either frees the memory, or resets the chunk and
                // tries to hand it out again: first to the owning magazine's next-in-line slot, then to the
                // allocator's central queue.
911             Magazine mag = magazine;
912             AdaptivePoolingAllocator parent = mag.parent;
913             int chunkSize = mag.preferredChunkSize();
914             int memSize = delegate.capacity();
915             if (!pooled || shouldReleaseSuboptimalChunkSize(memSize, chunkSize)) {
916                 // Drop the chunk if it is not pooled,
917                 // or if the chunk deviates too much from the preferred chunk size.
918                 detachFromMagazine();
919                 delegate.release();
920             } else {
                    // Make the chunk usable again: restore the reference count, reset the delegate's indices
                    // and start handing out memory from offset 0.
921                 updater.resetRefCnt(this);
922                 delegate.setIndex(0, 0);
923                 allocatedBytes = 0;
924                 if (!mag.trySetNextInLine(this)) {
925                     // As this Chunk does not belong to the mag anymore we need to decrease the used memory.
926                     detachFromMagazine();
927                     if (!parent.offerToQueue(this)) {
928                         // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
929                         // which did increase the reference count by 1.
930                         boolean released = updater.release(this);
931                         delegate.release();
932                         assert released;
933                     }
934                 }
935             }
936         }
937 
938         private static boolean shouldReleaseSuboptimalChunkSize(int givenSize, int preferredSize) {
                // Both sizes are compared in whole MIN_CHUNK_SIZE units (integer division).
939             int givenChunks = givenSize / MIN_CHUNK_SIZE;
940             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
941             int deviation = Math.abs(givenChunks - preferredChunks);
942 
943             // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
                // A random value in [0, 20) is compared against the deviation, so a deviation of 20 or more
                // units always retires the chunk and a deviation of 0 never does.
944             return deviation != 0 &&
945                     PlatformDependent.threadLocalRandom().nextDouble() * 20.0 < deviation;
946         }
947 
948         public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
                // Assigns the next `size` bytes of this chunk to `buf` and retains this chunk on behalf of
                // that buffer. This method does not check that `size` fits into remainingCapacity().
949             int startIndex = allocatedBytes;
950             allocatedBytes = startIndex + size;
                // `chunk` doubles as a "needs rollback" flag: it is set to null only after init succeeded.
951             Chunk chunk = this;
952             chunk.retain();
953             try {
954                 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
955                 chunk = null;
956             } finally {
957                 if (chunk != null) {
958                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
959                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
960                     // restore the old allocatedBytes value.
961                     allocatedBytes = startIndex;
962                     chunk.release();
963                 }
964             }
965         }
966 
967         public int remainingCapacity() {
                // Bytes that have not been handed out yet.
968             return capacity - allocatedBytes;
969         }
970 
971         public int capacity() {
972             return capacity;
973         }
974     }
975 
976     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
            // A recyclable ByteBuf view over one contiguous region of a Chunk's backing buffer.
            // Index-based accessors translate this buffer's indices into the root buffer's index space by
            // adding `adjustment`. NIO and channel transfers go through `tmpNioBuf`, a slice of that region.
977 
            // Recycler handle used to hand this instance back for reuse in deallocate().
978         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
979 
            // Offset of this buffer's index 0 inside rootParent.
980         private int adjustment;
            // The chunk's backing buffer; null while this instance is deallocated.
981         private AbstractByteBuf rootParent;
            // The chunk that owns the memory; one reference is held per live buffer and dropped in deallocate().
982         Chunk chunk;
            // Current capacity of this buffer in bytes.
983         private int length;
            // Slice of the root's internal NIO buffer that covers the region assigned by init(...).
            // It is not re-sliced when the capacity is reduced.
984         private ByteBuffer tmpNioBuf;
            // Answers of rootParent.hasArray() / hasMemoryAddress(), captured in init(...).
985         private boolean hasArray;
986         private boolean hasMemoryAddress;
987 
988         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
                // Starts with maxCapacity 0; the real limits are set by init(...). The handle must not be null.
989             super(0);
990             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
991         }
992 
993         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
994                   int adjustment, int capacity, int maxCapacity) {
                // Binds this instance to `capacity` bytes of `unwrapped`, starting at offset `adjustment`.
                // `wrapped` is the chunk that owns `unwrapped`; the caller is responsible for retaining it.
995             this.adjustment = adjustment;
996             chunk = wrapped;
997             length = capacity;
998             maxCapacity(maxCapacity);
999             setIndex0(readerIndex, writerIndex);
1000             hasArray = unwrapped.hasArray();
1001             hasMemoryAddress = unwrapped.hasMemoryAddress();
1002             rootParent = unwrapped;
1003             tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
1004         }
1005 
1006         private AbstractByteBuf rootParent() {
                 // Returns the backing buffer, or fails with IllegalReferenceCountException when this instance
                 // has been deallocated (rootParent is nulled there).
1007             final AbstractByteBuf rootParent = this.rootParent;
1008             if (rootParent != null) {
1009                 return rootParent;
1010             }
1011             throw new IllegalReferenceCountException();
1012         }
1013 
1014         @Override
1015         public int capacity() {
1016             return length;
1017         }
1018 
1019         @Override
1020         public ByteBuf capacity(int newCapacity) {
                 // Shrinking only lowers `length` and clamps the indices; the memory region stays the same.
                 // Growing moves this buffer onto newly allocated memory and copies the old content over.
1021             if (newCapacity == capacity()) {
1022                 ensureAccessible();
1023                 return this;
1024             }
1025             checkNewCapacity(newCapacity);
1026             if (newCapacity < capacity()) {
1027                 length = newCapacity;
1028                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1029                 return this;
1030             }
1031 
1032             // Reallocation required.
1033             ByteBuffer data = tmpNioBuf;
                 // Copy only the bytes of the current capacity. After an earlier shrink the slice still spans
                 // the originally assigned region, which can be larger than the replacement buffer and would
                 // make put(...) below throw BufferOverflowException.
1034             data.clear().limit(length);
1035             tmpNioBuf = null;
1036             Chunk chunk = this.chunk;
1037             AdaptivePoolingAllocator allocator = chunk.allocator;
1038             int readerIndex = this.readerIndex;
1039             int writerIndex = this.writerIndex;
                 // Expected to re-initialise this instance (including tmpNioBuf) on new memory of newCapacity bytes.
1040             allocator.allocate(newCapacity, maxCapacity(), this);
1041             tmpNioBuf.put(data);
1042             tmpNioBuf.clear();
                 // Drop the reference this buffer held on the previous chunk.
1043             chunk.release();
                 // Restore the indices captured before the reallocation.
1044             this.readerIndex = readerIndex;
1045             this.writerIndex = writerIndex;
1046             return this;
1047         }
1048 
1049         @Override
1050         public ByteBufAllocator alloc() {
1051             return rootParent().alloc();
1052         }
1053 
1054         @Override
1055         public ByteOrder order() {
1056             return rootParent().order();
1057         }
1058 
1059         @Override
1060         public ByteBuf unwrap() {
                 // The backing buffer is deliberately not exposed.
1061             return null;
1062         }
1063 
1064         @Override
1065         public boolean isDirect() {
1066             return rootParent().isDirect();
1067         }
1068 
1069         @Override
1070         public int arrayOffset() {
                 // The root's own array offset shifted by this buffer's position inside the root.
1071             return idx(rootParent().arrayOffset());
1072         }
1073 
1074         @Override
1075         public boolean hasMemoryAddress() {
1076             return hasMemoryAddress;
1077         }
1078 
1079         @Override
1080         public long memoryAddress() {
1081             ensureAccessible();
1082             return rootParent().memoryAddress() + adjustment;
1083         }
1084 
1085         @Override
1086         public ByteBuffer nioBuffer(int index, int length) {
1087             checkIndex(index, length);
1088             return rootParent().nioBuffer(idx(index), length);
1089         }
1090 
1091         @Override
1092         public ByteBuffer internalNioBuffer(int index, int length) {
                 // Returns the shared tmpNioBuf itself, positioned at [index, index + length); not a copy.
1093             checkIndex(index, length);
1094             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1095         }
1096 
1097         private ByteBuffer internalNioBuffer() {
                 // Resets position and limit of the shared slice before every use.
1098             return (ByteBuffer) tmpNioBuf.clear();
1099         }
1100 
1101         @Override
1102         public ByteBuffer[] nioBuffers(int index, int length) {
1103             checkIndex(index, length);
1104             return rootParent().nioBuffers(idx(index), length);
1105         }
1106 
1107         @Override
1108         public boolean hasArray() {
1109             return hasArray;
1110         }
1111 
1112         @Override
1113         public byte[] array() {
1114             ensureAccessible();
1115             return rootParent().array();
1116         }
1117 
1118         @Override
1119         public ByteBuf copy(int index, int length) {
1120             checkIndex(index, length);
1121             return rootParent().copy(idx(index), length);
1122         }
1123 
1124         @Override
1125         public int nioBufferCount() {
1126             return rootParent().nioBufferCount();
1127         }
1128 
             // The _getXxx/_setXxx accessors below forward to the root's unchecked accessor of the same name
             // with the index shifted by `adjustment`; they perform no bounds check of their own.
1129         @Override
1130         protected byte _getByte(int index) {
1131             return rootParent()._getByte(idx(index));
1132         }
1133 
1134         @Override
1135         protected short _getShort(int index) {
1136             return rootParent()._getShort(idx(index));
1137         }
1138 
1139         @Override
1140         protected short _getShortLE(int index) {
1141             return rootParent()._getShortLE(idx(index));
1142         }
1143 
1144         @Override
1145         protected int _getUnsignedMedium(int index) {
1146             return rootParent()._getUnsignedMedium(idx(index));
1147         }
1148 
1149         @Override
1150         protected int _getUnsignedMediumLE(int index) {
1151             return rootParent()._getUnsignedMediumLE(idx(index));
1152         }
1153 
1154         @Override
1155         protected int _getInt(int index) {
1156             return rootParent()._getInt(idx(index));
1157         }
1158 
1159         @Override
1160         protected int _getIntLE(int index) {
1161             return rootParent()._getIntLE(idx(index));
1162         }
1163 
1164         @Override
1165         protected long _getLong(int index) {
1166             return rootParent()._getLong(idx(index));
1167         }
1168 
1169         @Override
1170         protected long _getLongLE(int index) {
1171             return rootParent()._getLongLE(idx(index));
1172         }
1173 
1174         @Override
1175         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1176             checkIndex(index, length);
1177             rootParent().getBytes(idx(index), dst, dstIndex, length);
1178             return this;
1179         }
1180 
1181         @Override
1182         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1183             checkIndex(index, length);
1184             rootParent().getBytes(idx(index), dst, dstIndex, length);
1185             return this;
1186         }
1187 
1188         @Override
1189         public ByteBuf getBytes(int index, ByteBuffer dst) {
1190             checkIndex(index, dst.remaining());
1191             rootParent().getBytes(idx(index), dst);
1192             return this;
1193         }
1194 
1195         @Override
1196         protected void _setByte(int index, int value) {
1197             rootParent()._setByte(idx(index), value);
1198         }
1199 
1200         @Override
1201         protected void _setShort(int index, int value) {
1202             rootParent()._setShort(idx(index), value);
1203         }
1204 
1205         @Override
1206         protected void _setShortLE(int index, int value) {
1207             rootParent()._setShortLE(idx(index), value);
1208         }
1209 
1210         @Override
1211         protected void _setMedium(int index, int value) {
1212             rootParent()._setMedium(idx(index), value);
1213         }
1214 
1215         @Override
1216         protected void _setMediumLE(int index, int value) {
1217             rootParent()._setMediumLE(idx(index), value);
1218         }
1219 
1220         @Override
1221         protected void _setInt(int index, int value) {
1222             rootParent()._setInt(idx(index), value);
1223         }
1224 
1225         @Override
1226         protected void _setIntLE(int index, int value) {
1227             rootParent()._setIntLE(idx(index), value);
1228         }
1229 
1230         @Override
1231         protected void _setLong(int index, long value) {
1232             rootParent()._setLong(idx(index), value);
1233         }
1234 
1235         @Override
1236         protected void _setLongLE(int index, long value) {
                 // Uses the unchecked accessor like every other _setXxx here; the caller has already
                 // validated the index against this buffer.
1237             rootParent()._setLongLE(idx(index), value);
1238         }
1239 
1240         @Override
1241         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1242             checkIndex(index, length);
1243             rootParent().setBytes(idx(index), src, srcIndex, length);
1244             return this;
1245         }
1246 
1247         @Override
1248         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1249             checkIndex(index, length);
1250             rootParent().setBytes(idx(index), src, srcIndex, length);
1251             return this;
1252         }
1253 
1254         @Override
1255         public ByteBuf setBytes(int index, ByteBuffer src) {
1256             checkIndex(index, src.remaining());
1257             rootParent().setBytes(idx(index), src);
1258             return this;
1259         }
1260 
1261         @Override
1262         public ByteBuf getBytes(int index, OutputStream out, int length)
1263                 throws IOException {
1264             checkIndex(index, length);
1265             if (length != 0) {
                     // `index` is passed unshifted because tmpNioBuf is already a slice of this buffer's region.
1266                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1267             }
1268             return this;
1269         }
1270 
             // The channel transfers below work on a duplicate of the internal NIO slice, so the position and
             // limit of the shared tmpNioBuf are not moved by the channel. Bounds are checked inside
             // internalNioBuffer(index, length).
1271         @Override
1272         public int getBytes(int index, GatheringByteChannel out, int length)
1273                 throws IOException {
1274             return out.write(internalNioBuffer(index, length).duplicate());
1275         }
1276 
1277         @Override
1278         public int getBytes(int index, FileChannel out, long position, int length)
1279                 throws IOException {
1280             return out.write(internalNioBuffer(index, length).duplicate(), position);
1281         }
1282 
1283         @Override
1284         public int setBytes(int index, InputStream in, int length)
1285                 throws IOException {
1286             checkIndex(index, length);
1287             final AbstractByteBuf rootParent = rootParent();
1288             if (rootParent.hasArray()) {
                     // Array-backed root: let it read from the stream directly.
1289                 return rootParent.setBytes(idx(index), in, length);
1290             }
                 // Otherwise read into a temporary array first and copy what was actually read.
1291             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1292             int readBytes = in.read(tmp, 0, length);
1293             if (readBytes <= 0) {
                     // Nothing read or end of stream: pass the stream's result through unchanged.
1294                 return readBytes;
1295             }
1296             setBytes(index, tmp, 0, readBytes);
1297             return readBytes;
1298         }
1299 
1300         @Override
1301         public int setBytes(int index, ScatteringByteChannel in, int length)
1302                 throws IOException {
1303             try {
1304                 return in.read(internalNioBuffer(index, length).duplicate());
1305             } catch (ClosedChannelException ignored) {
                     // A closed channel is reported as -1 instead of propagating the exception.
1306                 return -1;
1307             }
1308         }
1309 
1310         @Override
1311         public int setBytes(int index, FileChannel in, long position, int length)
1312                 throws IOException {
1313             try {
1314                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1315             } catch (ClosedChannelException ignored) {
                     // A closed channel is reported as -1 instead of propagating the exception.
1316                 return -1;
1317             }
1318         }
1319 
1320         @Override
1321         public int forEachByte(int index, int length, ByteProcessor processor) {
1322             checkIndex(index, length);
1323             int ret = rootParent().forEachByte(idx(index), length, processor);
1324             return forEachResult(ret);
1325         }
1326 
1327         @Override
1328         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1329             checkIndex(index, length);
1330             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1331             return forEachResult(ret);
1332         }
1333 
1334         private int forEachResult(int ret) {
                 // Translates an index reported by the root buffer back into this buffer's index space.
                 // Any value below `adjustment` (presumably the root's negative "no match" result) becomes -1.
1335             if (ret < adjustment) {
1336                 return -1;
1337             }
1338             return ret - adjustment;
1339         }
1340 
1341         @Override
1342         public boolean isContiguous() {
1343             return rootParent().isContiguous();
1344         }
1345 
1346         private int idx(int index) {
                 // Maps an index of this buffer to the corresponding index in rootParent.
1347             return index + adjustment;
1348         }
1349 
1350         @Override
1351         protected void deallocate() {
                 // Gives back the chunk reference held by this buffer, clears all references to the chunk
                 // memory and returns this instance to its recycler handle.
1352             if (chunk != null) {
1353                 chunk.release();
1354             }
1355             tmpNioBuf = null;
1356             chunk = null;
1357             rootParent = null;
1358             if (handle instanceof EnhancedHandle) {
1359                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1360                 enhancedHandle.unguardedRecycle(this);
1361             } else {
1362                 handle.recycle(this);
1363             }
1364         }
1365     }
1366 
1367     /**
1368      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
          * <p>
          * The magazine code that creates new chunks calls {@link #allocate(int, int)} with the same value
          * for both arguments, and wraps the returned buffer in a chunk whose capacity is the returned
          * buffer's {@code capacity()}.
1369      */
1370     interface ChunkAllocator {
1371         /**
1372          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1373          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1374          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1375          * @return The buffer that represents the chunk memory.
1376          */
1377         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1378     }
1379 }