1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SystemPropertyUtil;
30  import io.netty.util.internal.ThreadExecutorMap;
31  import io.netty.util.internal.UnstableApi;
32  
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.OutputStream;
36  import java.nio.ByteBuffer;
37  import java.nio.ByteOrder;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.FileChannel;
40  import java.nio.channels.GatheringByteChannel;
41  import java.nio.channels.ScatteringByteChannel;
42  import java.nio.charset.Charset;
43  import java.util.Arrays;
44  import java.util.Queue;
45  import java.util.Set;
46  import java.util.concurrent.ConcurrentLinkedQueue;
47  import java.util.concurrent.CopyOnWriteArraySet;
48  import java.util.concurrent.ThreadLocalRandom;
49  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
50  import java.util.concurrent.atomic.AtomicLong;
51  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
52  import java.util.concurrent.locks.StampedLock;
53  
54  /**
55   * An auto-tuning pooling allocator that follows an anti-generational hypothesis.
56   * <p>
57   * The allocator is organized into a list of Magazines, and each magazine has a chunk buffer that it allocates
58   * buffers from.
59   * <p>
60   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
61   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
62   * If contention is detected above a certain threshold, the number of magazines is increased in response to the
63   * contention.
64   * <p>
65   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
66   * preferred chunk size. The preferred chunk size is one that is big enough to service {@link #BUFS_PER_CHUNK}
67   * allocations of the 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
68   * <p>
69   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
70   * done, is also adapted to the allocation pattern. If a newly computed preferred chunk is the same as the previous
71   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
72   * <p>
73   * This allows the allocator to quickly respond to changes in the application workload,
74   * without suffering undue overhead from maintaining its statistics.
75   * <p>
76   * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from
77   * any magazine to be shared with other magazines.
78   * The {@link #createSharedChunkQueue()} method can be overridden to customize this queue.
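     * <p>
     * A minimal usage sketch (illustrative only; the heap-backed {@code ChunkAllocator} lambda below is an
     * assumption made for the example, not something this class prescribes):
     * <pre>{@code
     * ChunkAllocator chunks = (initialCapacity, maxCapacity) ->
     *         (AbstractByteBuf) UnpooledByteBufAllocator.DEFAULT.heapBuffer(initialCapacity, maxCapacity);
     * AdaptivePoolingAllocator allocator =
     *         new AdaptivePoolingAllocator(chunks, MagazineCaching.EventLoopThreads);
     * ByteBuf buf = allocator.allocate(256, Integer.MAX_VALUE);
     * // ... use the buffer, then release it back to the pool ...
     * buf.release();
     * }</pre>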
79   */
80  @UnstableApi
81  final class AdaptivePoolingAllocator {
82  
83      enum MagazineCaching {
84          EventLoopThreads,
85          FastThreadLocalThreads,
86          None
87      }
88  
89      /**
90       * The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
91       * allocations. For instance, glibc will do this.
92       * This pushes any fragmentation from chunk size deviations off physical memory, onto virtual memory,
93       * which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
94       * chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
95       */
96      private static final int MIN_CHUNK_SIZE = 128 * 1024;
97      private static final int EXPANSION_ATTEMPTS = 3;
98      private static final int INITIAL_MAGAZINES = 4;
99      private static final int RETIRE_CAPACITY = 256;
100     private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
101     private static final int BUFS_PER_CHUNK = 8; // For large buffers, aim to have about this many buffers per chunk.
102 
103     /**
104      * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
105      * <p>
106      * This number is 8 MiB, and is derived from the limitations of internal histograms.
107      */
108     private static final int MAX_CHUNK_SIZE = 1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT; // 8 MiB.
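        // With BUFS_PER_CHUNK = 8 this works out to 8 MiB / 8 = 1 MiB; larger allocations bypass the pool and go
        // through allocateFallback(...).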
109     private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
110 
111     /**
112      * The capacity of the central queue that allows chunks to be shared across magazines.
113      * The default size is {@link NettyRuntime#availableProcessors()},
114      * and the maximum number of magazines is twice this.
115      * <p>
116      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
117      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
118      */
119     private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
120             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
121 
122     /**
123      * The capacity of the magazine-local buffer queue. This queue only pools the outer ByteBuf instance, not
124      * the actual memory, and so helps to reduce GC pressure.
125      */
126     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
127             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
128 
129     private static final Object NO_MAGAZINE = Boolean.TRUE;
130 
131     private final ChunkAllocator chunkAllocator;
132     private final Queue<Chunk> centralQueue;
133     private final StampedLock magazineExpandLock;
134     private volatile Magazine[] magazines;
135     private final FastThreadLocal<Object> threadLocalMagazine;
136     private final Set<Magazine> liveCachedMagazines;
137     private volatile boolean freed;
138 
139     static {
140         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
141             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
142                     + " (expected: >= " + 2 + ')');
143         }
144     }
145 
146     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
147         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
148         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
149         this.chunkAllocator = chunkAllocator;
150         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
151         magazineExpandLock = new StampedLock();
152         if (magazineCaching != MagazineCaching.None) {
153             assert magazineCaching == MagazineCaching.EventLoopThreads ||
154                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
155             final boolean cachedMagazinesNonEventLoopThreads =
156                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
157             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
158             threadLocalMagazine = new FastThreadLocal<Object>() {
159                 @Override
160                 protected Object initialValue() {
161                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
162                         if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
163                             // To prevent a potential leak, we will not use thread-local magazine.
164                             return NO_MAGAZINE;
165                         }
166                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
167                         liveMagazines.add(mag);
168                         return mag;
169                     }
170                     return NO_MAGAZINE;
171                 }
172 
173                 @Override
174                 protected void onRemoval(final Object value) throws Exception {
175                     if (value != NO_MAGAZINE) {
176                         liveMagazines.remove(value);
177                     }
178                 }
179             };
180             liveCachedMagazines = liveMagazines;
181         } else {
182             threadLocalMagazine = null;
183             liveCachedMagazines = null;
184         }
185         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
186         for (int i = 0; i < mags.length; i++) {
187             mags[i] = new Magazine(this);
188         }
189         magazines = mags;
190     }
191 
192     /**
193      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
194      * internal Magazines.
195      * <p>
196      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
197      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
198      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
199      * allocate new chunks when their current and next-in-line chunks have both been used up.
200      * <p>
201      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
202      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
203      * queue.
204      * <p>
205      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
206      * queue to limit the maximum memory usage.
207      * <p>
208      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
209      *
210      * @return A new multi-producer, multi-consumer queue.
211      */
212     private static Queue<Chunk> createSharedChunkQueue() {
213         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
214     }
215 
216     ByteBuf allocate(int size, int maxCapacity) {
217         return allocate(size, maxCapacity, Thread.currentThread(), null);
218     }
219 
220     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
221         if (size <= MAX_POOLED_BUF_SIZE) {
222             FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
223             if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
224                 Object mag = threadLocalMagazine.get();
225                 if (mag != NO_MAGAZINE) {
226                     Magazine magazine = (Magazine) mag;
227                     if (buf == null) {
228                         buf = magazine.newBuffer();
229                     }
230                     boolean allocated = magazine.tryAllocate(size, maxCapacity, buf);
231                     assert allocated : "Allocation of threadLocalMagazine must always succeed";
232                     return buf;
233                 }
234             }
235             long threadId = currentThread.getId();
236             Magazine[] mags;
237             int expansions = 0;
238             do {
239                 mags = magazines;
240                 int mask = mags.length - 1;
241                 int index = (int) (threadId & mask);
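                    // Probe log2(mags.length) magazines, starting at this thread's home slot, before considering
                    // expanding the magazine array.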
242                 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
243                     Magazine mag = mags[index + i & mask];
244                     if (buf == null) {
245                         buf = mag.newBuffer();
246                     }
247                     if (mag.tryAllocate(size, maxCapacity, buf)) {
248                         // Was able to allocate.
249                         return buf;
250                     }
251                 }
252                 expansions++;
253             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
254         }
255 
256         // The magazines failed us, or the buffer is too big to be pooled.
257         return allocateFallback(size, maxCapacity, currentThread, buf);
258     }
259 
260     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
261         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
262         Magazine magazine;
263         if (buf != null) {
264             Chunk chunk = buf.chunk;
265             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
266                 magazine = getFallbackMagazine(currentThread);
267             }
268         } else {
269             magazine = getFallbackMagazine(currentThread);
270             buf = magazine.newBuffer();
271         }
272         // Create a one-off chunk for this allocation.
273         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
274         Chunk chunk = new Chunk(innerChunk, magazine, false);
275         try {
276             chunk.readInitInto(buf, size, size, maxCapacity);
277         } finally {
278             // As the chunk is a one-off we always need to call release explicitly, as readInitInto(...)
279             // only takes care of one retain when successful. Once the AdaptiveByteBuf is released it will
280             // completely release the Chunk and with it the contained innerChunk.
281             chunk.release();
282         }
283         return buf;
284     }
285 
286     private Magazine getFallbackMagazine(Thread currentThread) {
287         Object tlMag;
288         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
289         if (threadLocalMagazine != null &&
290                 currentThread instanceof FastThreadLocalThread &&
291                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
292             return (Magazine) tlMag;
293         }
294         Magazine[] mags = magazines;
295         return mags[(int) currentThread.getId() & mags.length - 1];
296     }
297 
298     /**
299      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
300      */
301     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
302         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
303         assert result == into: "Re-allocation created separate buffer instance";
304     }
305 
306     long usedMemory() {
307         long sum = 0;
308         for (Chunk chunk : centralQueue) {
309             sum += chunk.capacity();
310         }
311         for (Magazine magazine : magazines) {
312             sum += magazine.usedMemory.get();
313         }
314         if (liveCachedMagazines != null) {
315             for (Magazine magazine : liveCachedMagazines) {
316                 sum += magazine.usedMemory.get();
317             }
318         }
319         return sum;
320     }
321 
322     private boolean tryExpandMagazines(int currentLength) {
323         if (currentLength >= MAX_STRIPES) {
324             return true;
325         }
326         final Magazine[] mags;
327         long writeLock = magazineExpandLock.tryWriteLock();
328         if (writeLock != 0) {
329             try {
330                 mags = magazines;
331                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
332                     return true;
333                 }
334                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
335                 Magazine[] expanded = new Magazine[mags.length * 2];
336                 for (int i = 0, l = expanded.length; i < l; i++) {
337                     Magazine m = new Magazine(this);
338                     m.localPrefChunkSize = preferredChunkSize;
339                     m.sharedPrefChunkSize = preferredChunkSize;
340                     expanded[i] = m;
341                 }
342                 magazines = expanded;
343             } finally {
344                 magazineExpandLock.unlockWrite(writeLock);
345             }
346             for (Magazine magazine : mags) {
347                 magazine.free();
348             }
349         }
350         return true;
351     }
352 
353     boolean offerToQueue(Chunk buffer) {
354         if (freed) {
355             return false;
356         }
357         // The buffer should not be used anymore, so let's add asserts to guard against bugs in the future.
358         assert buffer.allocatedBytes == 0;
359         assert buffer.magazine == null;
360 
361         boolean isAdded = centralQueue.offer(buffer);
362         if (freed && isAdded) {
363             // Help to free the centralQueue.
364             freeCentralQueue();
365         }
366         return isAdded;
367     }
368 
369     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
370     // we might end up with leaks. While these leaks are usually harmless in practice, they would still at least be
371     // very confusing for users.
372     @Override
373     protected void finalize() throws Throwable {
374         try {
375             super.finalize();
376         } finally {
377             free();
378         }
379     }
380 
381     private void free() {
382         freed = true;
383         long stamp = magazineExpandLock.writeLock();
384         try {
385             Magazine[] mags = magazines;
386             for (Magazine magazine : mags) {
387                 magazine.free();
388             }
389         } finally {
390             magazineExpandLock.unlockWrite(stamp);
391         }
392         freeCentralQueue();
393     }
394 
395     private void freeCentralQueue() {
396         for (;;) {
397             Chunk chunk = centralQueue.poll();
398             if (chunk == null) {
399                 break;
400             }
401             chunk.release();
402         }
403     }
404 
405     static int sizeBucket(int size) {
406         return AllocationStatistics.sizeBucket(size);
407     }
408 
409     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
410     private static class AllocationStatistics {
411         private static final int MIN_DATUM_TARGET = 1024;
412         private static final int MAX_DATUM_TARGET = 65534;
413         private static final int INIT_DATUM_TARGET = 9;
414         private static final int HISTO_MIN_BUCKET_SHIFT = 8; // Smallest bucket is 1 << 8 = 256 bytes in size.
415         private static final int HISTO_MAX_BUCKET_SHIFT = 23; // Biggest bucket is 1 << 23 = 8 MiB in size.
416         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 16 buckets
417         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
418         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
419 
420         protected final AdaptivePoolingAllocator parent;
421         private final boolean shareable;
422         private final short[][] histos = {
423                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
424                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
425         };
426         private short[] histo = histos[0];
427         private final int[] sums = new int[HISTO_BUCKET_COUNT];
428 
429         private int histoIndex;
430         private int datumCount;
431         private int datumTarget = INIT_DATUM_TARGET;
432         protected boolean hasHadRotation;
433         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
434         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
435         protected volatile int localUpperBufSize;
436 
437         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
438             this.parent = parent;
439             this.shareable = shareable;
440         }
441 
442         protected void recordAllocationSize(int bufferSizeToRecord) {
443             // Use the preserved size from the reused AdaptiveByteBuf, if available.
444             // Otherwise, use the requested buffer size.
445             // This way, the statistics better reflect the buffer sizes that are actually used.
446             if (bufferSizeToRecord == 0) {
447                 return;
448             }
449             int bucket = sizeBucket(bufferSizeToRecord);
450             histo[bucket]++;
451             if (datumCount++ == datumTarget) {
452                 rotateHistograms();
453             }
454         }
455 
456         static int sizeBucket(int size) {
457             if (size == 0) {
458                 return 0;
459             }
460             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater.
461             // We truncate and roll up the bottom part of the histogram to 256 bytes.
462             // The upper size bound is 8 MiB, and that gives us exactly 16 size buckets,
463             // which is a magical number for JIT optimisations.
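                // For example, a 1024-byte allocation maps to bucket 2, while allocations of 256 bytes or less map to bucket 0.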
464             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
465             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
466         }
467 
468         private void rotateHistograms() {
469             short[][] hs = histos;
470             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
471                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
472             }
473             int sum = 0;
474             for (int count : sums) {
475                 sum  += count;
476             }
477             int targetPercentile = (int) (sum * 0.99);
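                // Walk the buckets until the cumulative count reaches the 99th percentile of recorded allocation sizes.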
478             int sizeBucket = 0;
479             for (; sizeBucket < sums.length; sizeBucket++) {
480                 if (sums[sizeBucket] > targetPercentile) {
481                     break;
482                 }
483                 targetPercentile -= sums[sizeBucket];
484             }
485             hasHadRotation = true;
486             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
487             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
488             localUpperBufSize = percentileSize;
489             localPrefChunkSize = prefChunkSize;
490             if (shareable) {
491                 for (Magazine mag : parent.magazines) {
492                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
493                 }
494             }
495             if (sharedPrefChunkSize != prefChunkSize) {
496                 // Preferred chunk size changed. Increase check frequency.
497                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
498                 sharedPrefChunkSize = prefChunkSize;
499             } else {
500                 // Preferred chunk size did not change. Check less often.
501                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
502             }
503 
504             histoIndex = histoIndex + 1 & 3;
505             histo = histos[histoIndex];
506             datumCount = 0;
507             Arrays.fill(histo, (short) 0);
508         }
509 
510         /**
511          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
512          * allocation sizes.
513          * <p>
514          * This method must be thread-safe.
515          *
516          * @return The currently preferred chunk allocation size.
517          */
518         protected int preferredChunkSize() {
519             return sharedPrefChunkSize;
520         }
521     }
522 
523     private static final class Magazine extends AllocationStatistics {
524         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
525         static {
526             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
527         }
528         private static final Chunk MAGAZINE_FREED = new Chunk();
529 
530         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
531                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
532                     @Override
533                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
534                         return new AdaptiveByteBuf(handle);
535                     }
536                 });
537 
538         private Chunk current;
539         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
540         private volatile Chunk nextInLine;
541         private final AtomicLong usedMemory;
542         private final StampedLock allocationLock;
543         private final Queue<AdaptiveByteBuf> bufferQueue;
544         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
545 
546         Magazine(AdaptivePoolingAllocator parent) {
547             this(parent, true);
548         }
549 
550         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
551             super(parent, shareable);
552 
553             if (shareable) {
554                 // We only need the StampedLock if this Magazine will be shared across threads.
555                 allocationLock = new StampedLock();
556                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
557                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
558                     @Override
559                     public void recycle(AdaptiveByteBuf self) {
560                         bufferQueue.offer(self);
561                     }
562                 };
563             } else {
564                 allocationLock = null;
565                 bufferQueue = null;
566                 handle = null;
567             }
568             usedMemory = new AtomicLong();
569         }
570 
571         public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf) {
572             if (allocationLock == null) {
573                 // This magazine is not shared across threads, just allocate directly.
574                 return allocate(size, maxCapacity, buf);
575             }
576 
577             // Try to retrieve the lock and if successful allocate.
578             long writeLock = allocationLock.tryWriteLock();
579             if (writeLock != 0) {
580                 try {
581                     return allocate(size, maxCapacity, buf);
582                 } finally {
583                     allocationLock.unlockWrite(writeLock);
584                 }
585             }
586             return allocateWithoutLock(size, maxCapacity, buf);
587         }
588 
589         private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
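                // The magazine lock is contended, so do a one-shot allocation from the next-in-line chunk or the
                // central queue, without touching the 'current' chunk.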
590             Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
591             if (curr == MAGAZINE_FREED) {
592                 // Allocation raced with a stripe-resize that freed this magazine.
593                 restoreMagazineFreed();
594                 return false;
595             }
596             if (curr == null) {
597                 curr = parent.centralQueue.poll();
598                 if (curr == null) {
599                     return false;
600                 }
601                 curr.attachToMagazine(this);
602             }
603             boolean allocated = false;
604             int remainingCapacity = curr.remainingCapacity();
605             int startingCapacity = getStartingCapacity(size, maxCapacity);
606             if (remainingCapacity >= size) {
607                 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
608                 allocated = true;
609             }
610             try {
611                 if (remainingCapacity >= RETIRE_CAPACITY) {
612                     transferToNextInLineOrRelease(curr);
613                     curr = null;
614                 }
615             } finally {
616                 if (curr != null) {
617                     curr.release();
618                 }
619             }
620             return allocated;
621         }
622 
623         private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf) {
624             recordAllocationSize(buf.length);
625             int startingCapacity = getStartingCapacity(size, maxCapacity);
626             Chunk curr = current;
627             if (curr != null) {
628                 // We have a Chunk that has some space left.
629                 int remainingCapacity = curr.remainingCapacity();
630                 if (remainingCapacity > startingCapacity) {
631                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
632                     // We still have some bytes left that we can use for the next allocation, just early return.
633                     return true;
634                 }
635 
636                 // At this point we know that this will be the last time current will be used, so directly set it to
637                 // null and release it once we are done.
638                 current = null;
639                 if (remainingCapacity >= size) {
640                     try {
641                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
642                         return true;
643                     } finally {
644                         curr.release();
645                     }
646                 }
647 
648                 // Check whether we should retain the chunk in the nextInLine cache or release it.
649                 if (remainingCapacity < RETIRE_CAPACITY) {
650                     curr.release();
651                 } else {
652                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
653                     // This method will release curr if this is not the case.
654                     transferToNextInLineOrRelease(curr);
655                 }
656             }
657 
658             assert current == null;
659             // The fast-path for allocations did not work.
660             //
661             // Try to fetch the next "Magazine local" Chunk first, if this fails because we don't have a
662             // next-in-line chunk available, we will poll our centralQueue.
663             // If this fails as well we will just allocate a new Chunk.
664             //
665             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
666             // thus be "reserved" by this Magazine for exclusive usage.
667             curr = NEXT_IN_LINE.getAndSet(this, null);
668             if (curr != null) {
669                 if (curr == MAGAZINE_FREED) {
670                     // Allocation raced with a stripe-resize that freed this magazine.
671                     restoreMagazineFreed();
672                     return false;
673                 }
674 
675                 int remainingCapacity = curr.remainingCapacity();
676                 if (remainingCapacity > startingCapacity) {
677                     // We have a Chunk that has some space left.
678                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
679                     current = curr;
680                     return true;
681                 }
682 
683                 if (remainingCapacity >= size) {
684                     // At this point we know that this will be the last time curr will be used, so directly set it to
685                     // null and release it once we are done.
686                     try {
687                         curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
688                         return true;
689                     } finally {
690                         // Release in a finally block so even if readInitInto(...) would throw we would still correctly
691                         // release the current chunk before nulling it out.
692                         curr.release();
693                     }
694                 } else {
695                     // Release it as it's too small.
696                     curr.release();
697                 }
698             }
699 
700             // Now try to poll from the central queue first
701             curr = parent.centralQueue.poll();
702             if (curr == null) {
703                 curr = newChunkAllocation(size);
704             } else {
705                 curr.attachToMagazine(this);
706 
707                 int remainingCapacity = curr.remainingCapacity();
708                 if (remainingCapacity < size) {
709                     // Check whether we should retain the chunk in the nextInLine cache or release it.
710                     if (remainingCapacity < RETIRE_CAPACITY) {
711                         curr.release();
712                     } else {
713                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
714                     // This method will release curr if this is not the case.
715                         transferToNextInLineOrRelease(curr);
716                     }
717                     curr = newChunkAllocation(size);
718                 }
719             }
720 
721             current = curr;
722             try {
723                 int remainingCapacity = curr.remainingCapacity();
724                 assert remainingCapacity >= size;
725                 if (remainingCapacity > startingCapacity) {
726                     curr.readInitInto(buf, size, startingCapacity, maxCapacity);
727                     curr = null;
728                 } else {
729                     curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
730                 }
731             } finally {
732                 if (curr != null) {
733                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
734                     // release the current chunk before nulling it out.
735                     curr.release();
736                     current = null;
737                 }
738             }
739             return true;
740         }
741 
742         private int getStartingCapacity(int size, int maxCapacity) {
743             // Predict starting capacity from localUpperBufSize, but place limits on the max starting capacity
744             // based on the requested size, because localUpperBufSize can potentially be quite large.
745             int startCapLimits;
746             if (size <= 2048) { // Less than or equal to 2 KiB.
747                 startCapLimits = 16384; // Use at most 16 KiB.
748             } else if (size <= 32768) { // Less than or equal to 32 KiB.
749                 startCapLimits = 65536; // Use at most 64 KiB, which is also the AdaptiveRecvByteBufAllocator max.
750             } else {
751                 startCapLimits = size * 2; // Otherwise use at most twice the requested memory.
752             }
753             int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
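                // Never go below the requested size, and otherwise stay within maxCapacity.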
754             startingCapacity = Math.max(size, Math.min(maxCapacity, startingCapacity));
755             return startingCapacity;
756         }
757 
758         private void restoreMagazineFreed() {
759             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
760             if (next != null && next != MAGAZINE_FREED) {
761                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
762             }
763         }
764 
765         private void transferToNextInLineOrRelease(Chunk chunk) {
766             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
767                 return;
768             }
769 
770             Chunk nextChunk = NEXT_IN_LINE.get(this);
771             if (nextChunk != null && nextChunk != MAGAZINE_FREED
772                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
773                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
774                     nextChunk.release();
775                     return;
776                 }
777             }
778             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
779             // by some buffers and so is attached to a Magazine.
780             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
781             // as last resort.
782             chunk.release();
783         }
784 
785         private Chunk newChunkAllocation(int promptingSize) {
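                // Size the chunk so that roughly BUFS_PER_CHUNK buffers of the prompting size fit, but never go below
                // the preferred chunk size derived from the allocation size histograms.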
786             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
787             int minChunks = size / MIN_CHUNK_SIZE;
788             if (MIN_CHUNK_SIZE * minChunks < size) {
789                 // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
790                 // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
791                 // to manage the memory in terms of whole pages. This reduces memory fragmentation,
792                 // but without the potentially high overhead that power-of-2 chunk sizes would bring.
793                 size = MIN_CHUNK_SIZE * (1 + minChunks);
794             }
795 
796             // Limit chunks to the max size, even if the histogram suggests to go above it.
797             size = Math.min(size, MAX_CHUNK_SIZE);
798 
799             // If we haven't rotated the histograms yet, optimistically record this chunk size as our preferred.
800             if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
801                 sharedPrefChunkSize = size;
802             }
803 
804             ChunkAllocator chunkAllocator = parent.chunkAllocator;
805             return new Chunk(chunkAllocator.allocate(size, size), this, true);
806         }
807 
808         boolean trySetNextInLine(Chunk chunk) {
809             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
810         }
811 
812         void free() {
813             // Release the current Chunk and the next that was stored for later usage.
814             restoreMagazineFreed();
815             long stamp = allocationLock.writeLock();
816             try {
817                 if (current != null) {
818                     current.release();
819                     current = null;
820                 }
821             } finally {
822                 allocationLock.unlockWrite(stamp);
823             }
824         }
825 
826         public AdaptiveByteBuf newBuffer() {
827             AdaptiveByteBuf buf;
828             if (handle == null) {
829                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
830             } else {
831                 buf = bufferQueue.poll();
832                 if (buf == null) {
833                     buf = new AdaptiveByteBuf(handle);
834                 }
835             }
836             buf.resetRefCnt();
837             buf.discardMarks();
838             return buf;
839         }
840     }
841 
842     private static final class Chunk implements ReferenceCounted {
843 
844         private final AbstractByteBuf delegate;
845         private Magazine magazine;
846         private final AdaptivePoolingAllocator allocator;
847         private final int capacity;
848         private final boolean pooled;
849         private int allocatedBytes;
850         private static final long REFCNT_FIELD_OFFSET =
851                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
852         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
853                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
854 
855         private static final ReferenceCountUpdater<Chunk> updater =
856                 new ReferenceCountUpdater<Chunk>() {
857                     @Override
858                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
859                         return AIF_UPDATER;
860                     }
861                     @Override
862                     protected long unsafeOffset() {
863                         return REFCNT_FIELD_OFFSET;
864                     }
865                 };
866 
867         // Value might not equal "real" reference count, all access should be via the updater
868         @SuppressWarnings({"unused", "FieldMayBeFinal"})
869         private volatile int refCnt;
870 
871         Chunk() {
872             // Constructor only used by the MAGAZINE_FREED sentinel.
873             delegate = null;
874             magazine = null;
875             allocator = null;
876             capacity = 0;
877             pooled = false;
878         }
879 
880         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
881             this.delegate = delegate;
882             this.pooled = pooled;
883             capacity = delegate.capacity();
884             updater.setInitialValue(this);
885             allocator = magazine.parent;
886             attachToMagazine(magazine);
887         }
888 
889         Magazine currentMagazine()  {
890             return magazine;
891         }
892 
893         void detachFromMagazine() {
894             if (magazine != null) {
895                 magazine.usedMemory.getAndAdd(-capacity);
896                 magazine = null;
897             }
898         }
899 
900         void attachToMagazine(Magazine magazine) {
901             assert this.magazine == null;
902             this.magazine = magazine;
903             magazine.usedMemory.getAndAdd(capacity);
904         }
905 
906         @Override
907         public Chunk touch(Object hint) {
908             return this;
909         }
910 
911         @Override
912         public int refCnt() {
913             return updater.refCnt(this);
914         }
915 
916         @Override
917         public Chunk retain() {
918             return updater.retain(this);
919         }
920 
921         @Override
922         public Chunk retain(int increment) {
923             return updater.retain(this, increment);
924         }
925 
926         @Override
927         public Chunk touch() {
928             return this;
929         }
930 
931         @Override
932         public boolean release() {
933             if (updater.release(this)) {
934                 deallocate();
935                 return true;
936             }
937             return false;
938         }
939 
940         @Override
941         public boolean release(int decrement) {
942             if (updater.release(this, decrement)) {
943                 deallocate();
944                 return true;
945             }
946             return false;
947         }
948 
949         private void deallocate() {
950             Magazine mag = magazine;
951             AdaptivePoolingAllocator parent = mag.parent;
952             int chunkSize = mag.preferredChunkSize();
953             int memSize = delegate.capacity();
954             if (!pooled || shouldReleaseSuboptimalChunkSize(memSize, chunkSize)) {
955                 // Drop the chunk if the parent allocator is closed,
956                 // or if the chunk deviates too much from the preferred chunk size.
957                 detachFromMagazine();
958                 delegate.release();
959             } else {
960                 updater.resetRefCnt(this);
961                 delegate.setIndex(0, 0);
962                 allocatedBytes = 0;
963                 if (!mag.trySetNextInLine(this)) {
964                     // As this Chunk does not belong to the magazine anymore we need to decrease its used memory.
965                     detachFromMagazine();
966                     if (!parent.offerToQueue(this)) {
967                         // The central queue is full. Release again, since the earlier resetRefCnt()
968                         // increased the reference count by 1.
969                         boolean released = updater.release(this);
970                         delegate.release();
971                         assert released;
972                     }
973                 }
974             }
975         }
976 
977         private static boolean shouldReleaseSuboptimalChunkSize(int givenSize, int preferredSize) {
978             int givenChunks = givenSize / MIN_CHUNK_SIZE;
979             int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
980             int deviation = Math.abs(givenChunks - preferredChunks);
981 
982             // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
983             return deviation != 0 &&
984                     ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
985         }
986 
987         public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
988             int startIndex = allocatedBytes;
989             allocatedBytes = startIndex + startingCapacity;
990             Chunk chunk = this;
991             chunk.retain();
992             try {
993                 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
994                 chunk = null;
995             } finally {
996                 if (chunk != null) {
997                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
998                     // the chunk again as we retained it before calling buf.init(...). Besides this we also need to
999                     // restore the old allocatedBytes value.
1000                     allocatedBytes = startIndex;
1001                     chunk.release();
1002                 }
1003             }
1004         }
1005 
1006         public int remainingCapacity() {
1007             return capacity - allocatedBytes;
1008         }
1009 
1010         public int capacity() {
1011             return capacity;
1012         }
1013     }
1014 
1015     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1016 
1017         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
1018 
1019         private int adjustment;
1020         private AbstractByteBuf rootParent;
1021         Chunk chunk;
1022         private int length;
1023         private int maxFastCapacity;
1024         private ByteBuffer tmpNioBuf;
1025         private boolean hasArray;
1026         private boolean hasMemoryAddress;
1027 
1028         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
1029             super(0);
1030             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1031         }
1032 
1033         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1034                   int adjustment, int size, int capacity, int maxCapacity) {
1035             this.adjustment = adjustment;
1036             chunk = wrapped;
1037             length = size;
1038             maxFastCapacity = capacity;
1039             maxCapacity(maxCapacity);
1040             setIndex0(readerIndex, writerIndex);
1041             hasArray = unwrapped.hasArray();
1042             hasMemoryAddress = unwrapped.hasMemoryAddress();
1043             rootParent = unwrapped;
1044             tmpNioBuf = null;
1045         }
1046 
1047         private AbstractByteBuf rootParent() {
1048             final AbstractByteBuf rootParent = this.rootParent;
1049             if (rootParent != null) {
1050                 return rootParent;
1051             }
1052             throw new IllegalReferenceCountException();
1053         }
1054 
1055         @Override
1056         public int capacity() {
1057             return length;
1058         }
1059 
1060         @Override
1061         public int maxFastWritableBytes() {
1062             return maxFastCapacity;
1063         }
1064 
1065         @Override
1066         public ByteBuf capacity(int newCapacity) {
1067             if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1068                 ensureAccessible();
1069                 length = newCapacity;
1070                 return this;
1071             }
1072             checkNewCapacity(newCapacity);
1073             if (newCapacity < capacity()) {
1074                 length = newCapacity;
1075                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1076                 return this;
1077             }
1078 
1079             // Reallocation required.
1080             Chunk chunk = this.chunk;
1081             AdaptivePoolingAllocator allocator = chunk.allocator;
1082             int readerIndex = this.readerIndex;
1083             int writerIndex = this.writerIndex;
1084             int baseOldRootIndex = adjustment;
1085             int oldCapacity = length;
1086             AbstractByteBuf oldRoot = rootParent();
1087             length = 0; // Don't record buffer size statistics for this allocation.
1088             allocator.allocate(newCapacity, maxCapacity(), this);
1089             oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1090             chunk.release();
1091             this.readerIndex = readerIndex;
1092             this.writerIndex = writerIndex;
1093             return this;
1094         }
1095 
1096         @Override
1097         public ByteBufAllocator alloc() {
1098             return rootParent().alloc();
1099         }
1100 
1101         @Override
1102         public ByteOrder order() {
1103             return rootParent().order();
1104         }
1105 
1106         @Override
1107         public ByteBuf unwrap() {
1108             return null;
1109         }
1110 
1111         @Override
1112         public boolean isDirect() {
1113             return rootParent().isDirect();
1114         }
1115 
1116         @Override
1117         public int arrayOffset() {
1118             return idx(rootParent().arrayOffset());
1119         }
1120 
1121         @Override
1122         public boolean hasMemoryAddress() {
1123             return hasMemoryAddress;
1124         }
1125 
1126         @Override
1127         public long memoryAddress() {
1128             ensureAccessible();
1129             return rootParent().memoryAddress() + adjustment;
1130         }
1131 
1132         @Override
1133         public ByteBuffer nioBuffer(int index, int length) {
1134             checkIndex(index, length);
1135             return rootParent().nioBuffer(idx(index), length);
1136         }
1137 
1138         @Override
1139         public ByteBuffer internalNioBuffer(int index, int length) {
1140             checkIndex(index, length);
1141             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1142         }
1143 
1144         private ByteBuffer internalNioBuffer() {
1145             if (tmpNioBuf == null) {
1146                 tmpNioBuf = rootParent().nioBuffer(adjustment, maxFastCapacity);
1147             }
1148             return (ByteBuffer) tmpNioBuf.clear();
1149         }
1150 
1151         @Override
1152         public ByteBuffer[] nioBuffers(int index, int length) {
1153             checkIndex(index, length);
1154             return rootParent().nioBuffers(idx(index), length);
1155         }
1156 
1157         @Override
1158         public boolean hasArray() {
1159             return hasArray;
1160         }
1161 
1162         @Override
1163         public byte[] array() {
1164             ensureAccessible();
1165             return rootParent().array();
1166         }
1167 
1168         @Override
1169         public ByteBuf copy(int index, int length) {
1170             checkIndex(index, length);
1171             return rootParent().copy(idx(index), length);
1172         }
1173 
1174         @Override
1175         public int nioBufferCount() {
1176             return rootParent().nioBufferCount();
1177         }
1178 
1179         @Override
1180         protected byte _getByte(int index) {
1181             return rootParent()._getByte(idx(index));
1182         }
1183 
1184         @Override
1185         protected short _getShort(int index) {
1186             return rootParent()._getShort(idx(index));
1187         }
1188 
1189         @Override
1190         protected short _getShortLE(int index) {
1191             return rootParent()._getShortLE(idx(index));
1192         }
1193 
1194         @Override
1195         protected int _getUnsignedMedium(int index) {
1196             return rootParent()._getUnsignedMedium(idx(index));
1197         }
1198 
1199         @Override
1200         protected int _getUnsignedMediumLE(int index) {
1201             return rootParent()._getUnsignedMediumLE(idx(index));
1202         }
1203 
1204         @Override
1205         protected int _getInt(int index) {
1206             return rootParent()._getInt(idx(index));
1207         }
1208 
1209         @Override
1210         protected int _getIntLE(int index) {
1211             return rootParent()._getIntLE(idx(index));
1212         }
1213 
1214         @Override
1215         protected long _getLong(int index) {
1216             return rootParent()._getLong(idx(index));
1217         }
1218 
1219         @Override
1220         protected long _getLongLE(int index) {
1221             return rootParent()._getLongLE(idx(index));
1222         }
1223 
1224         @Override
1225         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1226             checkIndex(index, length);
1227             rootParent().getBytes(idx(index), dst, dstIndex, length);
1228             return this;
1229         }
1230 
1231         @Override
1232         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1233             checkIndex(index, length);
1234             rootParent().getBytes(idx(index), dst, dstIndex, length);
1235             return this;
1236         }
1237 
1238         @Override
1239         public ByteBuf getBytes(int index, ByteBuffer dst) {
1240             checkIndex(index, dst.remaining());
1241             rootParent().getBytes(idx(index), dst);
1242             return this;
1243         }
1244 
1245         @Override
1246         protected void _setByte(int index, int value) {
1247             rootParent()._setByte(idx(index), value);
1248         }
1249 
1250         @Override
1251         protected void _setShort(int index, int value) {
1252             rootParent()._setShort(idx(index), value);
1253         }
1254 
1255         @Override
1256         protected void _setShortLE(int index, int value) {
1257             rootParent()._setShortLE(idx(index), value);
1258         }
1259 
1260         @Override
1261         protected void _setMedium(int index, int value) {
1262             rootParent()._setMedium(idx(index), value);
1263         }
1264 
1265         @Override
1266         protected void _setMediumLE(int index, int value) {
1267             rootParent()._setMediumLE(idx(index), value);
1268         }
1269 
1270         @Override
1271         protected void _setInt(int index, int value) {
1272             rootParent()._setInt(idx(index), value);
1273         }
1274 
1275         @Override
1276         protected void _setIntLE(int index, int value) {
1277             rootParent()._setIntLE(idx(index), value);
1278         }
1279 
1280         @Override
1281         protected void _setLong(int index, long value) {
1282             rootParent()._setLong(idx(index), value);
1283         }
1284 
1285         @Override
1286         protected void _setLongLE(int index, long value) {
1287             rootParent()._setLongLE(idx(index), value);
1288         }
1289 
1290         @Override
1291         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1292             checkIndex(index, length);
1293             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1294             tmp.put(src, srcIndex, length);
1295             return this;
1296         }
1297 
1298         @Override
1299         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1300             checkIndex(index, length);
1301             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1302             tmp.put(src.nioBuffer(srcIndex, length));
1303             return this;
1304         }
1305 
1306         @Override
1307         public ByteBuf setBytes(int index, ByteBuffer src) {
1308             checkIndex(index, src.remaining());
1309             ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1310             tmp.put(src);
1311             return this;
1312         }
1313 
1314         @Override
1315         public ByteBuf getBytes(int index, OutputStream out, int length)
1316                 throws IOException {
1317             checkIndex(index, length);
1318             if (length != 0) {
1319                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1320             }
1321             return this;
1322         }
1323 
1324         @Override
1325         public int getBytes(int index, GatheringByteChannel out, int length)
1326                 throws IOException {
1327             return out.write(internalNioBuffer(index, length).duplicate());
1328         }
1329 
1330         @Override
1331         public int getBytes(int index, FileChannel out, long position, int length)
1332                 throws IOException {
1333             return out.write(internalNioBuffer(index, length).duplicate(), position);
1334         }
1335 
1336         @Override
1337         public int setBytes(int index, InputStream in, int length)
1338                 throws IOException {
1339             checkIndex(index, length);
1340             final AbstractByteBuf rootParent = rootParent();
1341             if (rootParent.hasArray()) {
1342                 return rootParent.setBytes(idx(index), in, length);
1343             }
1344             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1345             int readBytes = in.read(tmp, 0, length);
1346             if (readBytes <= 0) {
1347                 return readBytes;
1348             }
1349             setBytes(index, tmp, 0, readBytes);
1350             return readBytes;
1351         }
1352 
1353         @Override
1354         public int setBytes(int index, ScatteringByteChannel in, int length)
1355                 throws IOException {
1356             try {
1357                 return in.read(internalNioBuffer(index, length).duplicate());
1358             } catch (ClosedChannelException ignored) {
1359                 return -1;
1360             }
1361         }
1362 
1363         @Override
1364         public int setBytes(int index, FileChannel in, long position, int length)
1365                 throws IOException {
1366             try {
1367                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1368             } catch (ClosedChannelException ignored) {
1369                 return -1;
1370             }
1371         }
1372 
1373         @Override
1374         public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1375             checkIndex(index, sequence.length());
1376             return rootParent().setCharSequence(idx(index), sequence, charset);
1377         }
1378 
1379         @Override
1380         public int forEachByte(int index, int length, ByteProcessor processor) {
1381             checkIndex(index, length);
1382             int ret = rootParent().forEachByte(idx(index), length, processor);
1383             return forEachResult(ret);
1384         }
1385 
1386         @Override
1387         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1388             checkIndex(index, length);
1389             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1390             return forEachResult(ret);
1391         }
1392 
1393         private int forEachResult(int ret) {
1394             if (ret < adjustment) {
1395                 return -1;
1396             }
1397             return ret - adjustment;
1398         }
1399 
1400         @Override
1401         public boolean isContiguous() {
1402             return rootParent().isContiguous();
1403         }
1404 
1405         private int idx(int index) {
1406             return index + adjustment;
1407         }
1408 
1409         @Override
1410         protected void deallocate() {
1411             if (chunk != null) {
1412                 chunk.release();
1413             }
1414             tmpNioBuf = null;
1415             chunk = null;
1416             rootParent = null;
1417             if (handle instanceof EnhancedHandle) {
1418                 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1419                 enhancedHandle.unguardedRecycle(this);
1420             } else {
1421                 handle.recycle(this);
1422             }
1423         }
1424     }
1425 
1426     /**
1427      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1428      */
1429     interface ChunkAllocator {
1430         /**
1431          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1432          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1433          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1434          * @return The buffer that represents the chunk memory.
1435          */
1436         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1437     }
1438 }