View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SystemPropertyUtil;
30  import io.netty.util.internal.ThreadExecutorMap;
31  import io.netty.util.internal.UnstableApi;
32  
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.OutputStream;
36  import java.nio.ByteBuffer;
37  import java.nio.ByteOrder;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.FileChannel;
40  import java.nio.channels.GatheringByteChannel;
41  import java.nio.channels.ScatteringByteChannel;
42  import java.util.Arrays;
43  import java.util.Queue;
44  import java.util.Set;
45  import java.util.concurrent.ConcurrentLinkedQueue;
46  import java.util.concurrent.CopyOnWriteArraySet;
47  import java.util.concurrent.ThreadLocalRandom;
48  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49  import java.util.concurrent.atomic.AtomicLong;
50  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
51  import java.util.concurrent.locks.StampedLock;
52  
53  /**
54   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
55   * <p>
56   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
57   * from.
58   * <p>
59   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
60   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
61   * If contention is detected above a certain threshold, the number of magazines are increased in response to the
62   * contention.
63   * <p>
64   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
65   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
66   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
67   * <p>
68   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
69   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
70   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
71   * <p>
72   * This allows the allocator to quickly respond to changes in the application workload,
73   * without suffering undue overhead from maintaining its statistics.
74   * <p>
75   * Since magazines are "relatively thread-local", the allocator has a central queue that allow excess chunks from any
76   * magazine, to be shared with other magazines.
 * The queue is created by the {@link #createSharedChunkQueue()} method, which can be changed to customize this queue.
78   */
79  @UnstableApi
80  final class AdaptivePoolingAllocator {
81  
    /**
     * Selects which threads are given their own thread-local magazine by the allocator constructor.
     * <p>
     * Independent of the mode, a thread-local magazine is only created when
     * {@code FastThreadLocalThread.willCleanupFastThreadLocals(...)} returns {@code true} for the calling thread.
     */
    enum MagazineCaching {
        /** Create a thread-local magazine only when {@code ThreadExecutorMap.currentExecutor()} is non-null. */
        EventLoopThreads,
        /** Create a thread-local magazine without requiring a current executor. */
        FastThreadLocalThreads,
        /** Do not set up any thread-local magazine cache. */
        None
    }
87  
    /**
     * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
     * allocations. For instance, glibc will do this.
     * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
     * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
     * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
     */
    private static final int MIN_CHUNK_SIZE = 128 * 1024;
    // Upper bound on the number of magazine-array expansion rounds a single allocation will attempt.
    private static final int EXPANSION_ATTEMPTS = 3;
    // Number of shared magazines created by the constructor.
    private static final int INITIAL_MAGAZINES = 4;
    // Chunks with less remaining capacity than this are released instead of being kept for later use.
    private static final int RETIRE_CAPACITY = 4 * 1024;
    // Once the magazine array has reached this length it is no longer expanded.
    private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
    private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.

    /**
     * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
     * <p>
     * This number is 10 MiB, and is derived from the limitations of internal histograms.
     */
    private static final int MAX_CHUNK_SIZE =
            BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.

    /**
     * The capacity of the central queue that allows chunks to be shared across magazines.
     * The default size is {@link NettyRuntime#availableProcessors()},
     * and the maximum number of magazines is twice this.
     * The configured value is read from the {@code io.netty.allocator.centralQueueCapacity} system property
     * and is raised to 2 if it is smaller.
     * <p>
     * This means the maximum amount of memory that we can have allocated-but-not-in-use is
     * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
     */
    private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
            "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));

    /**
     * The capacity of the magazine-local buffer queue. This queue just pools the outer ByteBuf instance and not
     * the actual memory and so helps to reduce GC pressure.
     * The value is read from the {@code io.netty.allocator.magazineBufferQueueCapacity} system property
     * and defaults to 1024.
     */
    private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
            "io.netty.allocator.magazineBufferQueueCapacity", 1024);

    // Sentinel stored in the thread-local to mean "this thread has no thread-local magazine".
    private static final Object NO_MAGAZINE = Boolean.TRUE;

    // Allocates the backing memory for one-off chunks (and presumably for pooled chunks - confirm in Magazine).
    private final ChunkAllocator chunkAllocator;
    // Chunks shared between magazines; polled by magazines that have run out of local chunks.
    private final Queue<Chunk> centralQueue;
    // Write-locked while the magazine array is replaced or freed.
    private final StampedLock magazineExpandLock;
    // The shared magazines; replaced wholesale by a larger array on expansion.
    private volatile Magazine[] magazines;
    // Per-thread magazine (or NO_MAGAZINE); null when magazine caching is disabled.
    private final FastThreadLocal<Object> threadLocalMagazine;
    // The thread-local magazines that are currently alive; null when magazine caching is disabled.
    private final Set<Magazine> liveCachedMagazines;
    // Set once free() has started; checked before chunks are offered back to the central queue.
    private volatile boolean freed;
137 
138     static {
139         if (CENTRAL_QUEUE_CAPACITY < 2) {
140             throw new IllegalArgumentException("CENTRAL_QUEUE_CAPACITY: " + CENTRAL_QUEUE_CAPACITY
141                     + " (expected: >= " + 2 + ')');
142         }
143         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
144             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
145                     + " (expected: >= " + 2 + ')');
146         }
147     }
148 
    /**
     * Creates an allocator with {@link #INITIAL_MAGAZINES} shared magazines and, unless {@code magazineCaching}
     * is {@link MagazineCaching#None}, a thread-local magazine cache.
     *
     * @param chunkAllocator the allocator used for chunk memory; must not be {@code null}.
     * @param magazineCaching selects which threads get a thread-local magazine; must not be {@code null}.
     */
    AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
        ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
        ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
        this.chunkAllocator = chunkAllocator;
        centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
        magazineExpandLock = new StampedLock();
        if (magazineCaching != MagazineCaching.None) {
            assert magazineCaching == MagazineCaching.EventLoopThreads ||
                   magazineCaching == MagazineCaching.FastThreadLocalThreads;
            // True when threads without a current executor should get a cached magazine as well.
            final boolean cachedMagazinesNonEventLoopThreads =
                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
            // Tracks the thread-local magazines so usedMemory() can include them.
            final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
            threadLocalMagazine = new FastThreadLocal<Object>() {
                @Override
                protected Object initialValue() {
                    if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
                        if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
                            // To prevent potential leak, we will not use thread-local magazine.
                            return NO_MAGAZINE;
                        }
                        // Thread-local magazines are created as non-shareable.
                        Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
                        liveMagazines.add(mag);
                        return mag;
                    }
                    return NO_MAGAZINE;
                }

                @Override
                protected void onRemoval(final Object value) throws Exception {
                    // Only stops tracking the magazine here; the magazine itself is not freed by this callback.
                    if (value != NO_MAGAZINE) {
                        liveMagazines.remove(value);
                    }
                }
            };
            liveCachedMagazines = liveMagazines;
        } else {
            threadLocalMagazine = null;
            liveCachedMagazines = null;
        }
        // Populate the initial set of shared (shareable) magazines before publishing the array.
        Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
        for (int i = 0; i < mags.length; i++) {
            mags[i] = new Magazine(this);
        }
        magazines = mags;
    }
194 
195     /**
196      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
197      * internal Magazines.
198      * <p>
199      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
200      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
201      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
202      * allocate new chunks when their current and next-in-line chunks have both been used up.
203      * <p>
204      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
205      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
206      * queue.
207      * <p>
208      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
209      * queue to limit the maximum memory usage.
210      * <p>
211      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
212      *
213      * @return A new multi-producer, multi-consumer queue.
214      */
215     private static Queue<Chunk> createSharedChunkQueue() {
216         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
217     }
218 
219     ByteBuf allocate(int size, int maxCapacity) {
220         return allocate(size, maxCapacity, Thread.currentThread(), null);
221     }
222 
    /**
     * Core allocation routine. Tries, in order: the caller's thread-local magazine, the shared magazines
     * (expanding the magazine array between rounds), and finally a one-off chunk via
     * {@link #allocateFallback(int, int, Thread, AdaptiveByteBuf)}.
     *
     * @param size the requested size, in bytes. Sizes above {@link #MAX_CHUNK_SIZE} go straight to the fallback.
     * @param maxCapacity the maximum capacity of the buffer.
     * @param currentThread the calling thread; used to pick a magazine.
     * @param buf the buffer to (re-)initialize, or {@code null} to obtain a new one from a magazine.
     * @return the initialized buffer; this is {@code buf} when {@code buf} was non-null.
     */
    private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
        if (size <= MAX_CHUNK_SIZE) {
            int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
            FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
            if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
                Object mag = threadLocalMagazine.get();
                if (mag != NO_MAGAZINE) {
                    Magazine magazine = (Magazine) mag;
                    if (buf == null) {
                        buf = magazine.newBuffer();
                    }
                    // The result is only checked by the assert; buf is returned either way.
                    boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
                    assert allocated : "Allocation of threadLocalMagazine must always succeed";
                    return buf;
                }
            }
            long threadId = currentThread.getId();
            Magazine[] mags;
            int expansions = 0;
            do {
                // Re-read the array each round, since it may have been replaced by an expansion.
                mags = magazines;
                // The array length is a power of two (starts at INITIAL_MAGAZINES and is only ever doubled).
                int mask = mags.length - 1;
                int index = (int) (threadId & mask);
                // Probe log2(length) consecutive magazines, starting at the one selected by the thread id.
                for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
                    Magazine mag = mags[index + i & mask];
                    if (buf == null) {
                        buf = mag.newBuffer();
                    }
                    if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
                        // Was able to allocate.
                        return buf;
                    }
                }
                expansions++;
            } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
        }

        // The magazines failed us, or the buffer is too big to be pooled.
        return allocateFallback(size, maxCapacity, currentThread, buf);
    }
263 
    /**
     * Allocates a dedicated, one-off chunk for a request the magazines could not serve, and initializes
     * {@code buf} from it.
     *
     * @param size the requested size, in bytes.
     * @param maxCapacity the maximum capacity of the buffer.
     * @param currentThread the calling thread; used to pick a magazine when none can be derived from {@code buf}.
     * @param buf the buffer to initialize, or {@code null} to obtain a new one from the chosen magazine.
     * @return the initialized buffer.
     */
    private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
        // If we don't already have a buffer, obtain one from the most conveniently available magazine.
        Magazine magazine;
        if (buf != null) {
            Chunk chunk = buf.chunk;
            // Prefer the magazine of the buffer's existing chunk; otherwise pick one based on the thread.
            // Note that the last operand assigns 'magazine' as a side effect when it is evaluated.
            if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
                magazine = getFallbackMagazine(currentThread);
            }
        } else {
            magazine = getFallbackMagazine(currentThread);
            buf = magazine.newBuffer();
        }
        // Create a one-off chunk for this allocation.
        AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
        Chunk chunk = new Chunk(innerChunk, magazine, false);
        try {
            chunk.readInitInto(buf, size, maxCapacity);
        } finally {
            // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
            // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
            // completely release the Chunk and so the contained innerChunk.
            chunk.release();
        }
        return buf;
    }
289 
290     private Magazine getFallbackMagazine(Thread currentThread) {
291         Object tlMag;
292         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
293         if (threadLocalMagazine != null &&
294                 currentThread instanceof FastThreadLocalThread &&
295                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
296             return (Magazine) tlMag;
297         }
298         Magazine[] mags = magazines;
299         return mags[(int) currentThread.getId() & mags.length - 1];
300     }
301 
302     /**
303      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
304      */
305     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
306         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
307         assert result == into: "Re-allocation created separate buffer instance";
308     }
309 
310     long usedMemory() {
311         long sum = 0;
312         for (Chunk chunk : centralQueue) {
313             sum += chunk.capacity();
314         }
315         for (Magazine magazine : magazines) {
316             sum += magazine.usedMemory.get();
317         }
318         if (liveCachedMagazines != null) {
319             for (Magazine magazine : liveCachedMagazines) {
320                 sum += magazine.usedMemory.get();
321             }
322         }
323         return sum;
324     }
325 
    /**
     * Attempts to double the number of shared magazines in response to contention.
     * <p>
     * The expansion is skipped if the array is already at {@link #MAX_STRIPES}, if another thread holds the
     * expand lock, if the array has already grown beyond {@code currentLength}, or if the allocator was freed.
     *
     * @param currentLength the length of the magazine array the caller just observed.
     * @return always {@code true}, whether or not an expansion took place.
     */
    private boolean tryExpandMagazines(int currentLength) {
        if (currentLength >= MAX_STRIPES) {
            return true;
        }
        final Magazine[] mags;
        // Non-blocking: if someone else is expanding (or freeing) right now, just let the caller retry.
        long writeLock = magazineExpandLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                mags = magazines;
                // Re-check under the lock; the array may have changed since the caller read it.
                if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
                    return true;
                }
                // Seed the new magazines with the preferred chunk size of the first old magazine.
                int preferredChunkSize = mags[0].sharedPrefChunkSize;
                Magazine[] expanded = new Magazine[mags.length * 2];
                // All slots get brand-new magazines; the old magazines are not carried over.
                for (int i = 0, l = expanded.length; i < l; i++) {
                    Magazine m = new Magazine(this);
                    m.localPrefChunkSize = preferredChunkSize;
                    m.sharedPrefChunkSize = preferredChunkSize;
                    expanded[i] = m;
                }
                magazines = expanded;
            } finally {
                magazineExpandLock.unlockWrite(writeLock);
            }
            // Free the replaced magazines outside of the expand lock.
            for (Magazine magazine : mags) {
                magazine.free();
            }
        }
        return true;
    }
356 
    /**
     * Offers a chunk to the central queue so that other magazines can reuse it.
     *
     * @param buffer the chunk to share; expected to have no allocated bytes and no attached magazine.
     * @return {@code true} if the chunk was added to the queue, {@code false} if the allocator has been freed
     * or the offer was rejected by the queue.
     */
    private boolean offerToQueue(Chunk buffer) {
        if (freed) {
            return false;
        }
        // The Buffer should not be used anymore, let's add an assert to so we guard against bugs in the future.
        assert buffer.allocatedBytes == 0;
        assert buffer.magazine == null;

        boolean isAdded = centralQueue.offer(buffer);
        // 'freed' is read a second time because free() may have run concurrently with the offer above.
        if (freed && isAdded) {
            // Help to free the centralQueue.
            freeCentralQueue();
        }
        return isAdded;
    }
372 
    // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
    // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
    // very confusing for users.
    // free() runs in the finally block, so it is executed even if super.finalize() throws.
    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            free();
        }
    }
384 
385     private void free() {
386         freed = true;
387         long stamp = magazineExpandLock.writeLock();
388         try {
389             Magazine[] mags = magazines;
390             for (Magazine magazine : mags) {
391                 magazine.free();
392             }
393         } finally {
394             magazineExpandLock.unlockWrite(stamp);
395         }
396         freeCentralQueue();
397     }
398 
399     private void freeCentralQueue() {
400         for (;;) {
401             Chunk chunk = centralQueue.poll();
402             if (chunk == null) {
403                 break;
404             }
405             chunk.release();
406         }
407     }
408 
409     static int sizeBucket(int size) {
410         return AllocationStatistics.sizeBucket(size);
411     }
412 
    /**
     * Keeps histograms of recorded allocation sizes and derives a preferred chunk size from them.
     * <p>
     * Four histograms are kept and written to in rotation. On each rotation all four are summed, the bucket
     * holding the 99th percentile is located, and the preferred chunk size is set to {@code BUFS_PER_CHUNK} times
     * that bucket's size (but at least {@code MIN_CHUNK_SIZE}).
     * <p>
     * NOTE(review): the recording and rotation methods do no synchronization of their own; callers are presumably
     * expected to serialize them (e.g. via the magazine's lock) - confirm against the call sites.
     */
    @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
    private static class AllocationStatistics {
        // Lower and upper bounds, and initial value, for the number of recordings between histogram rotations.
        private static final int MIN_DATUM_TARGET = 1024;
        private static final int MAX_DATUM_TARGET = 65534;
        private static final int INIT_DATUM_TARGET = 9;
        private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
        private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
        private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
        // Index of the last bucket; sizeBucket(int) clamps its result to this value.
        private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
        private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;

        protected final AdaptivePoolingAllocator parent;
        // When true, rotation also takes the local preferences of the parent's shared magazines into account.
        private final boolean shareable;
        private final short[][] histos = {
                new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
                new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
        };
        // The histogram currently being written to; always histos[histoIndex].
        private short[] histo = histos[0];
        // Scratch array holding the per-bucket totals across all four histograms during a rotation.
        private final int[] sums = new int[HISTO_BUCKET_COUNT];

        private int histoIndex;
        // Number of recordings since the last rotation.
        private int datumCount;
        // Rotation happens when datumCount reaches this value; adapted on every rotation.
        private int datumTarget = INIT_DATUM_TARGET;
        // The preferred chunk size returned by preferredChunkSize().
        protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
        // The preferred chunk size computed from this instance's own histograms only.
        protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;

        private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
            this.parent = parent;
            this.shareable = shareable;
        }

        /**
         * Records one allocation in the given histogram bucket, and rotates the histograms once enough
         * allocations have been recorded.
         *
         * @param bucket the bucket index, as computed by {@link #sizeBucket(int)}.
         */
        protected void recordAllocationSize(int bucket) {
            histo[bucket]++;
            // Post-increment: the rotation is triggered by recording number datumTarget + 1.
            if (datumCount++ == datumTarget) {
                rotateHistograms();
            }
        }

        /**
         * Computes the histogram bucket for an allocation size.
         * Sizes up to 8 KiB map to bucket 0, each following bucket covers the next power-of-two size band,
         * and the result is clamped to the last bucket.
         *
         * @param size the allocation size, in bytes.
         * @return a bucket index between 0 and {@code HISTO_MAX_BUCKET_MASK}, inclusive.
         */
        static int sizeBucket(int size) {
            if (size == 0) {
                return 0;
            }
            // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
            // so we truncate and roll up the bottom part of the histogram to 8 KiB.
            // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
            // which is a magical number for JIT optimisations.
            // Operator precedence: ((size - 1) >> HISTO_MIN_BUCKET_SHIFT) & SIZE_MAX_MASK.
            int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
            return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
        }

        /**
         * Recomputes the preferred chunk sizes from the four histograms, adapts how often this is done,
         * and then moves on to (and clears) the next histogram.
         */
        private void rotateHistograms() {
            short[][] hs = histos;
            for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
                // The counters are shorts that are treated as unsigned 16-bit values.
                sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
            }
            int sum = 0;
            for (int count : sums) {
                sum  += count;
            }
            // Walk the buckets from the smallest until the one containing the 99th percentile is found.
            int targetPercentile = (int) (sum * 0.99);
            int sizeBucket = 0;
            for (; sizeBucket < sums.length; sizeBucket++) {
                if (sums[sizeBucket] > targetPercentile) {
                    break;
                }
                targetPercentile -= sums[sizeBucket];
            }
            // Operator precedence: 1 << (sizeBucket + HISTO_MIN_BUCKET_SHIFT), i.e. the upper bound of the bucket.
            int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
            int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
            localPrefChunkSize = prefChunkSize;
            if (shareable) {
                // Use the largest local preference among all of the parent's shared magazines.
                for (Magazine mag : parent.magazines) {
                    prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
                }
            }
            if (sharedPrefChunkSize != prefChunkSize) {
                // Preferred chunk size changed. Increase check frequency.
                datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
                sharedPrefChunkSize = prefChunkSize;
            } else {
                // Preferred chunk size did not change. Check less often.
                datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
            }

            // Advance to the next of the four histograms (wrapping around) and start it from zero.
            histoIndex = histoIndex + 1 & 3;
            histo = histos[histoIndex];
            datumCount = 0;
            Arrays.fill(histo, (short) 0);
        }

        /**
         * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
         * allocation sizes.
         * <p>
         * This method must be thread-safe.
         *
         * @return The currently preferred chunk allocation size.
         */
        protected int preferredChunkSize() {
            return sharedPrefChunkSize;
        }
    }
515 
516     private static final class Magazine extends AllocationStatistics {
        // Atomic accessor for the volatile 'nextInLine' field.
        private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
        static {
            NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
        }
        // Sentinel value for 'nextInLine': when an allocation observes it, the magazine has been freed.
        private static final Chunk MAGAZINE_FREED = new Chunk();

        // Pool of AdaptiveByteBuf instances.
        // NOTE(review): presumably used by newBuffer() for magazines without their own bufferQueue - confirm.
        private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
                new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
                    @Override
                    public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
                        return new AdaptiveByteBuf(handle);
                    }
                });

        // The chunk this magazine currently allocates from; accessed without atomics.
        private Chunk current;
        // The chunk that is used once 'current' has been used up; may also hold MAGAZINE_FREED.
        @SuppressWarnings("unused") // updated via NEXT_IN_LINE
        private volatile Chunk nextInLine;
        // Memory counter for this magazine; summed up by AdaptivePoolingAllocator.usedMemory().
        private final AtomicLong usedMemory;
        // Guards allocation in shareable magazines; null for non-shareable magazines.
        private final StampedLock allocationLock;
        // Recycled AdaptiveByteBuf instances of a shareable magazine; null for non-shareable magazines.
        private final Queue<AdaptiveByteBuf> bufferQueue;
        // Recycle handle that offers buffers back to bufferQueue; null for non-shareable magazines.
        private final ObjectPool.Handle<AdaptiveByteBuf> handle;
538 
        /**
         * Creates a shareable magazine, i.e. one that is safe to allocate from on multiple threads.
         *
         * @param parent the allocator that owns this magazine.
         */
        Magazine(AdaptivePoolingAllocator parent) {
            this(parent, true);
        }
542 
        /**
         * Creates a magazine.
         *
         * @param parent the allocator that owns this magazine.
         * @param shareable {@code true} if the magazine will be used by multiple threads, in which case it gets an
         * allocation lock and its own queue of recycled buffer instances; {@code false} leaves both {@code null}.
         */
        Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
            super(parent, shareable);

            if (shareable) {
                // We only need the StampedLock if this Magazine will be shared across threads.
                allocationLock = new StampedLock();
                bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
                handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
                    @Override
                    public void recycle(AdaptiveByteBuf self) {
                        // Best effort: the result of offer(...) is ignored, so the instance is dropped
                        // when the bounded queue is full.
                        bufferQueue.offer(self);
                    }
                };
            } else {
                allocationLock = null;
                bufferQueue = null;
                handle = null;
            }
            usedMemory = new AtomicLong();
        }
563 
564         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
565             if (allocationLock == null) {
566                 // This magazine is not shared across threads, just allocate directly.
567                 return allocate(size, sizeBucket, maxCapacity, buf);
568             }
569 
570             // Try to retrieve the lock and if successful allocate.
571             long writeLock = allocationLock.tryWriteLock();
572             if (writeLock != 0) {
573                 try {
574                     return allocate(size, sizeBucket, maxCapacity, buf);
575                 } finally {
576                     allocationLock.unlockWrite(writeLock);
577                 }
578             }
579             return allocateWithoutLock(size, maxCapacity, buf);
580         }
581 
582         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
583             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
584             if (curr == MAGAZINE_FREED) {
585                 // Allocation raced with a stripe-resize that freed this magazine.
586                 restoreMagazineFreed();
587                 return false;
588             }
589             if (curr == null) {
590                 curr = parent.centralQueue.poll();
591                 if (curr == null) {
592                     return false;
593                 }
594                 curr.attachToMagazine(this);
595             }
596             if (curr.remainingCapacity() >= size) {
597                 curr.readInitInto(buf, size, maxCapacity);
598             }
599             try {
600                 if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
601                     transferToNextInLineOrRelease(curr);
602                     curr = null;
603                 }
604             } finally {
605                 if (curr != null) {
606                     curr.release();
607                 }
608             }
609             return true;
610         }
611 
612         private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
613             recordAllocationSize(sizeBucket);
614             Chunk curr = current;
615             if (curr != null) {
616                 // We have a Chunk that has some space left.
617                 if (curr.remainingCapacity() > size) {
618                     curr.readInitInto(buf, size, maxCapacity);
619                     // We still have some bytes left that we can use for the next allocation, just early return.
620                     return true;
621                 }
622 
623                 // At this point we know that this will be the last time current will be used, so directly set it to
624                 // null and release it once we are done.
625                 current = null;
626                 if (curr.remainingCapacity() == size) {
627                     try {
628                         curr.readInitInto(buf, size, maxCapacity);
629                         return true;
630                     } finally {
631                         curr.release();
632                     }
633                 }
634 
635                 // Check if we either retain the chunk in the nextInLine cache or releasing it.
636                 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
637                     curr.release();
638                 } else {
639                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
640                     // This method will release curr if this is not the case
641                     transferToNextInLineOrRelease(curr);
642                 }
643             }
644 
645             assert current == null;
646             // The fast-path for allocations did not work.
647             //
648             // Try to fetch the next "Magazine local" Chunk first, if this this fails as we don't have
649             // one setup we will poll our centralQueue. If this fails as well we will just allocate a new Chunk.
650             //
651             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
652             // so be "reserved" by this Magazine for exclusive usage.
653             curr = NEXT_IN_LINE.getAndSet(this, null);
654             if (curr != null) {
655                 if (curr == MAGAZINE_FREED) {
656                     // Allocation raced with a stripe-resize that freed this magazine.
657                     restoreMagazineFreed();
658                     return false;
659                 }
660 
661                 if (curr.remainingCapacity() > size) {
662                     // We have a Chunk that has some space left.
663                     curr.readInitInto(buf, size, maxCapacity);
664                     current = curr;
665                     return true;
666                 }
667 
668                 if (curr.remainingCapacity() == size) {
669                     // At this point we know that this will be the last time curr will be used, so directly set it to
670                     // null and release it once we are done.
671                     try {
672                         curr.readInitInto(buf, size, maxCapacity);
673                         return true;
674                     } finally {
675                         // Release in a finally block so even if readInitInto(...) would throw we would still correctly
676                         // release the current chunk before null it out.
677                         curr.release();
678                     }
679                 } else {
680                     // Release it as it's too small.
681                     curr.release();
682                 }
683             }
684 
685             // Now try to poll from the central queue first
686             curr = parent.centralQueue.poll();
687             if (curr == null) {
688                 curr = newChunkAllocation(size);
689             } else {
690                 curr.attachToMagazine(this);
691 
692                 if (curr.remainingCapacity() < size) {
693                     // Check if we either retain the chunk in the nextInLine cache or releasing it.
694                     if (curr.remainingCapacity() < RETIRE_CAPACITY) {
695                         curr.release();
696                     } else {
697                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
698                         // This method will release curr if this is not the case
699                         transferToNextInLineOrRelease(curr);
700                     }
701                     curr = newChunkAllocation(size);
702                 }
703             }
704 
705             current = curr;
706             try {
707                 assert current.remainingCapacity() >= size;
708                 if (curr.remainingCapacity() > size) {
709                     curr.readInitInto(buf, size, maxCapacity);
710                     curr = null;
711                 } else {
712                     curr.readInitInto(buf, size, maxCapacity);
713                 }
714             } finally {
715                 if (curr != null) {
716                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
717                     // release the current chunk before null it out.
718                     curr.release();
719                     current = null;
720                 }
721             }
722             return true;
723         }
724 
725         private void restoreMagazineFreed() {
726             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
727             if (next != null && next != MAGAZINE_FREED) {
728                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
729             }
730         }
731 
732         private void transferToNextInLineOrRelease(Chunk chunk) {
733             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
734                 return;
735             }
736 
737             Chunk nextChunk = NEXT_IN_LINE.get(this);
738             if (nextChunk != null && nextChunk != MAGAZINE_FREED
739                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
740                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
741                     nextChunk.release();
742                     return;
743                 }
744             }
745             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
746             // by some buffers and so is attached to a Magazine.
747             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
748             // as last resort.
749             chunk.release();
750         }
751 
752         private Chunk newChunkAllocation(int promptingSize) {
753             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
754             int minChunks = size / MIN_CHUNK_SIZE;
755             if (MIN_CHUNK_SIZE * minChunks < size) {
756                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
757                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
758                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
759                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
760                 size = MIN_CHUNK_SIZE * (1 + minChunks);
761             }
762             ChunkAllocator chunkAllocator = parent.chunkAllocator;
763             return new Chunk(chunkAllocator.allocate(size, size), this, true);
764         }
765 
766         boolean trySetNextInLine(Chunk chunk) {
767             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
768         }
769 
770         void free() {
771             // Release the current Chunk and the next that was stored for later usage.
772             restoreMagazineFreed();
773             long stamp = allocationLock.writeLock();
774             try {
775                 if (current != null) {
776                     current.release();
777                     current = null;
778                 }
779             } finally {
780                 allocationLock.unlockWrite(stamp);
781             }
782         }
783 
784         public AdaptiveByteBuf newBuffer() {
785             AdaptiveByteBuf buf;
786             if (handle == null) {
787                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
788             } else {
789                 buf = bufferQueue.poll();
790                 if (buf == null) {
791                     buf = new AdaptiveByteBuf(handle);
792                 }
793             }
794             buf.resetRefCnt();
795             buf.discardMarks();
796             return buf;
797         }
798     }
799 
800     private static final class Chunk implements ReferenceCounted {
801 
802         private final AbstractByteBuf delegate;
803         private Magazine magazine;
804         private final AdaptivePoolingAllocator allocator;
805         private final int capacity;
806         private final boolean pooled;
807         private int allocatedBytes;
808         private static final long REFCNT_FIELD_OFFSET =
809                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
810         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
811                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
812 
813         private static final ReferenceCountUpdater<Chunk> updater =
814                 new ReferenceCountUpdater<Chunk>() {
815                     @Override
816                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
817                         return AIF_UPDATER;
818                     }
819                     @Override
820                     protected long unsafeOffset() {
821                         return REFCNT_FIELD_OFFSET;
822                     }
823                 };
824 
825         // Value might not equal "real" reference count, all access should be via the updater
826         @SuppressWarnings({"unused", "FieldMayBeFinal"})
827         private volatile int refCnt;
828 
829         Chunk() {
830             // Constructor only used by the MAGAZINE_FREED sentinel.
831             delegate = null;
832             magazine = null;
833             allocator = null;
834             capacity = 0;
835             pooled = false;
836         }
837 
838         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
839             this.delegate = delegate;
840             this.pooled = pooled;
841             capacity = delegate.capacity();
842             updater.setInitialValue(this);
843             allocator = magazine.parent;
844             attachToMagazine(magazine);
845         }
846 
847         Magazine currentMagazine()  {
848             return magazine;
849         }
850 
851         void detachFromMagazine() {
852             if (magazine != null) {
853                 magazine.usedMemory.getAndAdd(-capacity);
854                 magazine = null;
855             }
856         }
857 
858         void attachToMagazine(Magazine magazine) {
859             assert this.magazine == null;
860             this.magazine = magazine;
861             magazine.usedMemory.getAndAdd(capacity);
862         }
863 
864         @Override
865         public Chunk touch(Object hint) {
866             return this;
867         }
868 
869         @Override
870         public int refCnt() {
871             return updater.refCnt(this);
872         }
873 
874         @Override
875         public Chunk retain() {
876             return updater.retain(this);
877         }
878 
879         @Override
880         public Chunk retain(int increment) {
881             return updater.retain(this, increment);
882         }
883 
884         @Override
885         public Chunk touch() {
886             return this;
887         }
888 
889         @Override
890         public boolean release() {
891             if (updater.release(this)) {
892                 deallocate();
893                 return true;
894             }
895             return false;
896         }
897 
898         @Override
899         public boolean release(int decrement) {
900             if (updater.release(this, decrement)) {
901                 deallocate();
902                 return true;
903             }
904             return false;
905         }
906 
907         private void deallocate() {
908             Magazine mag = magazine;
909             AdaptivePoolingAllocator parent = mag.parent;
910             int chunkSize = mag.preferredChunkSize();
911             int memSize = delegate.capacity();
912             if (!pooled || shouldReleaseSuboptimalChunkSuze(memSize, chunkSize)) {
913                 // Drop the chunk if the parent allocator is closed,
914                 // or if the chunk deviates too much from the preferred chunk size.
915                 detachFromMagazine();
916                 delegate.release();
917             } else {
918                 updater.resetRefCnt(this);
919                 delegate.setIndex(0, 0);
920                 allocatedBytes = 0;
921                 if (!mag.trySetNextInLine(this)) {
922                     // As this Chunk does not belong to the mag anymore we need to decrease the used memory .
923                     detachFromMagazine();
924                     if (!parent.offerToQueue(this)) {
925                         // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
926                         // which did increase the reference count by 1.
927                         boolean released = updater.release(this);
928                         delegate.release();
929                         assert released;
930                     }
931                 }
932             }
933         }
934 
935         private static boolean shouldReleaseSuboptimalChunkSuze(int givenSize, int preferredSize) {
936             int givenChunks = givenSize / MIN_CHUNK_SIZE;
937             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
938             int deviation = Math.abs(givenChunks - preferredChunks);
939 
940             // Retire chunks with a 0.5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
941             return deviation != 0 &&
942                     ThreadLocalRandom.current().nextDouble() * 200.0 > deviation;
943         }
944 
945         public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
946             int startIndex = allocatedBytes;
947             allocatedBytes = startIndex + size;
948             Chunk chunk = this;
949             chunk.retain();
950             try {
951                 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
952                 chunk = null;
953             } finally {
954                 if (chunk != null) {
955                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
956                     // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
957                     // restore the old allocatedBytes value.
958                     allocatedBytes = startIndex;
959                     chunk.release();
960                 }
961             }
962         }
963 
964         public int remainingCapacity() {
965             return capacity - allocatedBytes;
966         }
967 
968         public int capacity() {
969             return capacity;
970         }
971     }
972 
973     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
974 
975         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
976 
977         private int adjustment;
978         private AbstractByteBuf rootParent;
979         Chunk chunk;
980         private int length;
981         private ByteBuffer tmpNioBuf;
982         private boolean hasArray;
983         private boolean hasMemoryAddress;
984 
985         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
986             super(0);
987             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
988         }
989 
990         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
991                   int adjustment, int capacity, int maxCapacity) {
992             this.adjustment = adjustment;
993             chunk = wrapped;
994             length = capacity;
995             maxCapacity(maxCapacity);
996             setIndex0(readerIndex, writerIndex);
997             hasArray = unwrapped.hasArray();
998             hasMemoryAddress = unwrapped.hasMemoryAddress();
999             rootParent = unwrapped;
1000             tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
1001         }
1002 
1003         private AbstractByteBuf rootParent() {
1004             final AbstractByteBuf rootParent = this.rootParent;
1005             if (rootParent != null) {
1006                 return rootParent;
1007             }
1008             throw new IllegalReferenceCountException();
1009         }
1010 
1011         @Override
1012         public int capacity() {
1013             return length;
1014         }
1015 
1016         @Override
1017         public ByteBuf capacity(int newCapacity) {
1018             if (newCapacity == capacity()) {
1019                 ensureAccessible();
1020                 return this;
1021             }
1022             checkNewCapacity(newCapacity);
1023             if (newCapacity < capacity()) {
1024                 length = newCapacity;
1025                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1026                 return this;
1027             }
1028 
1029             // Reallocation required.
1030             ByteBuffer data = tmpNioBuf;
1031             data.clear();
1032             tmpNioBuf = null;
1033             Chunk chunk = this.chunk;
1034             AdaptivePoolingAllocator allocator = chunk.allocator;
1035             int readerIndex = this.readerIndex;
1036             int writerIndex = this.writerIndex;
1037             allocator.allocate(newCapacity, maxCapacity(), this);
1038             tmpNioBuf.put(data);
1039             tmpNioBuf.clear();
1040             chunk.release();
1041             this.readerIndex = readerIndex;
1042             this.writerIndex = writerIndex;
1043             return this;
1044         }
1045 
1046         @Override
1047         public ByteBufAllocator alloc() {
1048             return rootParent().alloc();
1049         }
1050 
1051         @Override
1052         public ByteOrder order() {
1053             return rootParent().order();
1054         }
1055 
1056         @Override
1057         public ByteBuf unwrap() {
1058             return null;
1059         }
1060 
1061         @Override
1062         public boolean isDirect() {
1063             return rootParent().isDirect();
1064         }
1065 
1066         @Override
1067         public int arrayOffset() {
1068             return idx(rootParent().arrayOffset());
1069         }
1070 
1071         @Override
1072         public boolean hasMemoryAddress() {
1073             return hasMemoryAddress;
1074         }
1075 
1076         @Override
1077         public long memoryAddress() {
1078             ensureAccessible();
1079             return rootParent().memoryAddress() + adjustment;
1080         }
1081 
1082         @Override
1083         public ByteBuffer nioBuffer(int index, int length) {
1084             checkIndex(index, length);
1085             return rootParent().nioBuffer(idx(index), length);
1086         }
1087 
1088         @Override
1089         public ByteBuffer internalNioBuffer(int index, int length) {
1090             checkIndex(index, length);
1091             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1092         }
1093 
1094         private ByteBuffer internalNioBuffer() {
1095             return (ByteBuffer) tmpNioBuf.clear();
1096         }
1097 
1098         @Override
1099         public ByteBuffer[] nioBuffers(int index, int length) {
1100             checkIndex(index, length);
1101             return rootParent().nioBuffers(idx(index), length);
1102         }
1103 
1104         @Override
1105         public boolean hasArray() {
1106             return hasArray;
1107         }
1108 
1109         @Override
1110         public byte[] array() {
1111             ensureAccessible();
1112             return rootParent().array();
1113         }
1114 
1115         @Override
1116         public ByteBuf copy(int index, int length) {
1117             checkIndex(index, length);
1118             return rootParent().copy(idx(index), length);
1119         }
1120 
1121         @Override
1122         public int nioBufferCount() {
1123             return rootParent().nioBufferCount();
1124         }
1125 
1126         @Override
1127         protected byte _getByte(int index) {
1128             return rootParent()._getByte(idx(index));
1129         }
1130 
1131         @Override
1132         protected short _getShort(int index) {
1133             return rootParent()._getShort(idx(index));
1134         }
1135 
1136         @Override
1137         protected short _getShortLE(int index) {
1138             return rootParent()._getShortLE(idx(index));
1139         }
1140 
1141         @Override
1142         protected int _getUnsignedMedium(int index) {
1143             return rootParent()._getUnsignedMedium(idx(index));
1144         }
1145 
1146         @Override
1147         protected int _getUnsignedMediumLE(int index) {
1148             return rootParent()._getUnsignedMediumLE(idx(index));
1149         }
1150 
1151         @Override
1152         protected int _getInt(int index) {
1153             return rootParent()._getInt(idx(index));
1154         }
1155 
1156         @Override
1157         protected int _getIntLE(int index) {
1158             return rootParent()._getIntLE(idx(index));
1159         }
1160 
1161         @Override
1162         protected long _getLong(int index) {
1163             return rootParent()._getLong(idx(index));
1164         }
1165 
1166         @Override
1167         protected long _getLongLE(int index) {
1168             return rootParent()._getLongLE(idx(index));
1169         }
1170 
1171         @Override
1172         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1173             checkIndex(index, length);
1174             rootParent().getBytes(idx(index), dst, dstIndex, length);
1175             return this;
1176         }
1177 
1178         @Override
1179         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1180             checkIndex(index, length);
1181             rootParent().getBytes(idx(index), dst, dstIndex, length);
1182             return this;
1183         }
1184 
1185         @Override
1186         public ByteBuf getBytes(int index, ByteBuffer dst) {
1187             checkIndex(index, dst.remaining());
1188             rootParent().getBytes(idx(index), dst);
1189             return this;
1190         }
1191 
1192         @Override
1193         protected void _setByte(int index, int value) {
1194             rootParent()._setByte(idx(index), value);
1195         }
1196 
1197         @Override
1198         protected void _setShort(int index, int value) {
1199             rootParent()._setShort(idx(index), value);
1200         }
1201 
1202         @Override
1203         protected void _setShortLE(int index, int value) {
1204             rootParent()._setShortLE(idx(index), value);
1205         }
1206 
1207         @Override
1208         protected void _setMedium(int index, int value) {
1209             rootParent()._setMedium(idx(index), value);
1210         }
1211 
1212         @Override
1213         protected void _setMediumLE(int index, int value) {
1214             rootParent()._setMediumLE(idx(index), value);
1215         }
1216 
1217         @Override
1218         protected void _setInt(int index, int value) {
1219             rootParent()._setInt(idx(index), value);
1220         }
1221 
1222         @Override
1223         protected void _setIntLE(int index, int value) {
1224             rootParent()._setIntLE(idx(index), value);
1225         }
1226 
1227         @Override
1228         protected void _setLong(int index, long value) {
1229             rootParent()._setLong(idx(index), value);
1230         }
1231 
1232         @Override
1233         protected void _setLongLE(int index, long value) {
1234             rootParent().setLongLE(idx(index), value);
1235         }
1236 
1237         @Override
1238         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1239             checkIndex(index, length);
1240             rootParent().setBytes(idx(index), src, srcIndex, length);
1241             return this;
1242         }
1243 
1244         @Override
1245         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1246             checkIndex(index, length);
1247             rootParent().setBytes(idx(index), src, srcIndex, length);
1248             return this;
1249         }
1250 
1251         @Override
1252         public ByteBuf setBytes(int index, ByteBuffer src) {
1253             checkIndex(index, src.remaining());
1254             rootParent().setBytes(idx(index), src);
1255             return this;
1256         }
1257 
1258         @Override
1259         public ByteBuf getBytes(int index, OutputStream out, int length)
1260                 throws IOException {
1261             checkIndex(index, length);
1262             if (length != 0) {
1263                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1264             }
1265             return this;
1266         }
1267 
1268         @Override
1269         public int getBytes(int index, GatheringByteChannel out, int length)
1270                 throws IOException {
1271             return out.write(internalNioBuffer(index, length).duplicate());
1272         }
1273 
1274         @Override
1275         public int getBytes(int index, FileChannel out, long position, int length)
1276                 throws IOException {
1277             return out.write(internalNioBuffer(index, length).duplicate(), position);
1278         }
1279 
1280         @Override
1281         public int setBytes(int index, InputStream in, int length)
1282                 throws IOException {
1283             checkIndex(index, length);
1284             final AbstractByteBuf rootParent = rootParent();
1285             if (rootParent.hasArray()) {
1286                 return rootParent.setBytes(idx(index), in, length);
1287             }
1288             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1289             int readBytes = in.read(tmp, 0, length);
1290             if (readBytes <= 0) {
1291                 return readBytes;
1292             }
1293             setBytes(index, tmp, 0, readBytes);
1294             return readBytes;
1295         }
1296 
1297         @Override
1298         public int setBytes(int index, ScatteringByteChannel in, int length)
1299                 throws IOException {
1300             try {
1301                 return in.read(internalNioBuffer(index, length).duplicate());
1302             } catch (ClosedChannelException ignored) {
1303                 return -1;
1304             }
1305         }
1306 
1307         @Override
1308         public int setBytes(int index, FileChannel in, long position, int length)
1309                 throws IOException {
1310             try {
1311                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1312             } catch (ClosedChannelException ignored) {
1313                 return -1;
1314             }
1315         }
1316 
1317         @Override
1318         public int forEachByte(int index, int length, ByteProcessor processor) {
1319             checkIndex(index, length);
1320             int ret = rootParent().forEachByte(idx(index), length, processor);
1321             return forEachResult(ret);
1322         }
1323 
1324         @Override
1325         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1326             checkIndex(index, length);
1327             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1328             return forEachResult(ret);
1329         }
1330 
1331         private int forEachResult(int ret) {
1332             if (ret < adjustment) {
1333                 return -1;
1334             }
1335             return ret - adjustment;
1336         }
1337 
1338         @Override
1339         public boolean isContiguous() {
1340             return rootParent().isContiguous();
1341         }
1342 
1343         private int idx(int index) {
1344             return index + adjustment;
1345         }
1346 
1347         @Override
1348         protected void deallocate() {
1349             if (chunk != null) {
1350                 chunk.release();
1351             }
1352             tmpNioBuf = null;
1353             chunk = null;
1354             rootParent = null;
1355             if (handle instanceof EnhancedHandle) {
1356                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1357                 enhancedHandle.unguardedRecycle(this);
1358             } else {
1359                 handle.recycle(this);
1360             }
1361         }
1362     }
1363 
1364     /**
1365      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1366      */
1367     interface ChunkAllocator {
1368         /**
1369          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1370          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1371          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1372          * @return The buffer that represents the chunk memory.
1373          */
1374         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1375     }
1376 }