View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SuppressJava6Requirement;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.ThreadExecutorMap;
32  import io.netty.util.internal.UnstableApi;
33  
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.io.OutputStream;
37  import java.nio.ByteBuffer;
38  import java.nio.ByteOrder;
39  import java.nio.channels.ClosedChannelException;
40  import java.nio.channels.FileChannel;
41  import java.nio.channels.GatheringByteChannel;
42  import java.nio.channels.ScatteringByteChannel;
43  import java.util.Arrays;
44  import java.util.Queue;
45  import java.util.Set;
46  import java.util.concurrent.ConcurrentLinkedQueue;
47  import java.util.concurrent.CopyOnWriteArraySet;
48  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49  import java.util.concurrent.atomic.AtomicLong;
50  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
51  import java.util.concurrent.locks.StampedLock;
52  
53  /**
54   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
55   * <p>
56   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
57   * from.
58   * <p>
59   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
60   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
 * If contention is detected above a certain threshold, the number of magazines is increased in response to the
 * contention.
63   * <p>
64   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
65   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
66   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
67   * <p>
 * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
 * done is also adapted to the allocation pattern. If a newly computed preferred chunk size is the same as the
 * previous preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
71   * <p>
72   * This allows the allocator to quickly respond to changes in the application workload,
73   * without suffering undue overhead from maintaining its statistics.
74   * <p>
 * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from
 * any magazine to be shared with other magazines. This queue is created by {@link #createSharedChunkQueue()}, is
 * bounded, and its capacity can be configured with the {@code io.netty.allocator.centralQueueCapacity} system
 * property.
78   */
79  @SuppressJava6Requirement(reason = "Guarded by version check")
80  @UnstableApi
81  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
82  
83      enum MagazineCaching {
84          EventLoopThreads,
85          FastThreadLocalThreads,
86          None
87      }
88  
89      private static final int EXPANSION_ATTEMPTS = 3;
90      private static final int INITIAL_MAGAZINES = 4;
91      private static final int RETIRE_CAPACITY = 4 * 1024;
92      private static final int MIN_CHUNK_SIZE = 128 * 1024;
93      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
94      private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
95  
96      /**
97       * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
98       * <p>
99       * This number is 10 MiB, and is derived from the limitations of internal histograms.
100      */
101     private static final int MAX_CHUNK_SIZE =
102             BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
103 
104     /**
105      * The capacity if the central queue that allow chunks to be shared across magazines.
106      * The default size is {@link NettyRuntime#availableProcessors()},
107      * and the maximum number of magazines is twice this.
108      * <p>
109      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
110      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
111      */
112     private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
113             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());
114 
115     /**
116      * The capacity if the magazine local buffer queue. This queue just pools the outer ByteBuf instance and not
117      * the actual memory and so helps to reduce GC pressure.
118      */
119     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
120             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
121 
122     private static final Object NO_MAGAZINE = Boolean.TRUE;
123 
124     private final ChunkAllocator chunkAllocator;
125     private final Queue<Chunk> centralQueue;
126     private final StampedLock magazineExpandLock;
127     private volatile Magazine[] magazines;
128     private final FastThreadLocal<Object> threadLocalMagazine;
129     private final Set<Magazine> liveCachedMagazines;
130     private volatile boolean freed;
131 
132     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
133         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
134         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
135         this.chunkAllocator = chunkAllocator;
136         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
137         magazineExpandLock = new StampedLock();
138         if (magazineCaching != MagazineCaching.None) {
139             assert magazineCaching == MagazineCaching.EventLoopThreads ||
140                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
141             final boolean cachedMagazinesNonEventLoopThreads =
142                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
143             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
144             threadLocalMagazine = new FastThreadLocal<Object>() {
145                 @Override
146                 protected Object initialValue() {
147                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
148                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
149 
150                         if (FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
151                             // Only add it to the liveMagazines if we can guarantee that onRemoval(...) is called,
152                             // as otherwise we might end up holding the reference forever.
153                             liveMagazines.add(mag);
154                         }
155                         return mag;
156                     }
157                     return NO_MAGAZINE;
158                 }
159 
160                 @Override
161                 protected void onRemoval(final Object value) throws Exception {
162                     if (value != NO_MAGAZINE) {
163                         liveMagazines.remove(value);
164                     }
165                 }
166             };
167             liveCachedMagazines = liveMagazines;
168         } else {
169             threadLocalMagazine = null;
170             liveCachedMagazines = null;
171         }
172         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
173         for (int i = 0; i < mags.length; i++) {
174             mags[i] = new Magazine(this);
175         }
176         magazines = mags;
177     }
178 
179     /**
180      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
181      * internal Magazines.
182      * <p>
183      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
184      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
185      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
186      * allocate new chunks when their current and next-in-line chunks have both been used up.
187      * <p>
188      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
189      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
190      * queue.
191      * <p>
192      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
193      * queue to limit the maximum memory usage.
194      * <p>
195      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
196      *
197      * @return A new multi-producer, multi-consumer queue.
198      */
199     private static Queue<Chunk> createSharedChunkQueue() {
200         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
201     }
202 
203     @Override
204     public ByteBuf allocate(int size, int maxCapacity) {
205         return allocate(size, maxCapacity, Thread.currentThread(), null);
206     }
207 
208     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
209         if (size <= MAX_CHUNK_SIZE) {
210             int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
211             FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
212             if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
213                 Object mag = threadLocalMagazine.get();
214                 if (mag != NO_MAGAZINE) {
215                     Magazine magazine = (Magazine) mag;
216                     if (buf == null) {
217                         buf = magazine.newBuffer();
218                     }
219                     boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
220                     assert allocated : "Allocation of threadLocalMagazine must always succeed";
221                     return buf;
222                 }
223             }
224             long threadId = currentThread.getId();
225             Magazine[] mags;
226             int expansions = 0;
227             do {
228                 mags = magazines;
229                 int mask = mags.length - 1;
230                 int index = (int) (threadId & mask);
231                 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
232                     Magazine mag = mags[index + i & mask];
233                     if (buf == null) {
234                         buf = mag.newBuffer();
235                     }
236                     if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
237                         // Was able to allocate.
238                         return buf;
239                     }
240                 }
241                 expansions++;
242             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
243         }
244 
245         // The magazines failed us, or the buffer is too big to be pooled.
246         return allocateFallback(size, maxCapacity, currentThread, buf);
247     }
248 
249     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
250         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
251         Magazine magazine;
252         if (buf != null) {
253             Chunk chunk = buf.chunk;
254             magazine = chunk != null && chunk != Magazine.MAGAZINE_FREED?
255                     chunk.magazine : getFallbackMagazine(currentThread);
256         } else {
257             magazine = getFallbackMagazine(currentThread);
258             buf = magazine.newBuffer();
259         }
260         // Create a one-off chunk for this allocation.
261         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
262         Chunk chunk = new Chunk(innerChunk, magazine, false);
263         try {
264             chunk.readInitInto(buf, size, maxCapacity);
265         } finally {
266             // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
267             // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
268             // completely release the Chunk and so the contained innerChunk.
269             chunk.release();
270         }
271         return buf;
272     }
273 
274     private Magazine getFallbackMagazine(Thread currentThread) {
275         Object tlMag;
276         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
277         if (threadLocalMagazine != null &&
278                 currentThread instanceof FastThreadLocalThread &&
279                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
280             return (Magazine) tlMag;
281         }
282         Magazine[] mags = magazines;
283         return mags[(int) currentThread.getId() & mags.length - 1];
284     }
285 
286     /**
287      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
288      */
289     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
290         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
291         assert result == into: "Re-allocation created separate buffer instance";
292     }
293 
294     @Override
295     public long usedMemory() {
296         long sum = 0;
297         for (Chunk chunk : centralQueue) {
298             sum += chunk.capacity();
299         }
300         for (Magazine magazine : magazines) {
301             sum += magazine.usedMemory.get();
302         }
303         if (liveCachedMagazines != null) {
304             for (Magazine magazine : liveCachedMagazines) {
305                 sum += magazine.usedMemory.get();
306             }
307         }
308         return sum;
309     }
310 
311     private boolean tryExpandMagazines(int currentLength) {
312         if (currentLength >= MAX_STRIPES) {
313             return true;
314         }
315         long writeLock = magazineExpandLock.tryWriteLock();
316         if (writeLock != 0) {
317             try {
318                 Magazine[] mags = magazines;
319                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
320                     return true;
321                 }
322                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
323                 Magazine[] expanded = new Magazine[mags.length * 2];
324                 for (int i = 0, l = expanded.length; i < l; i++) {
325                     Magazine m = new Magazine(this);
326                     m.localPrefChunkSize = preferredChunkSize;
327                     m.sharedPrefChunkSize = preferredChunkSize;
328                     expanded[i] = m;
329                 }
330                 magazines = expanded;
331                 for (Magazine magazine : mags) {
332                     magazine.free();
333                 }
334             } finally {
335                 magazineExpandLock.unlockWrite(writeLock);
336             }
337         }
338         return true;
339     }
340 
341     private boolean offerToQueue(Chunk buffer) {
342         if (freed) {
343             return false;
344         }
345         return centralQueue.offer(buffer);
346     }
347 
348     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
349     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
350     // very confusing for users.
351     @Override
352     protected void finalize() throws Throwable {
353         try {
354             super.finalize();
355         } finally {
356             free();
357         }
358     }
359 
360     private void free() {
361         freed = true;
362         long stamp = magazineExpandLock.writeLock();
363         try {
364             Magazine[] mags = magazines;
365             for (Magazine magazine : mags) {
366                 magazine.free();
367             }
368         } finally {
369             magazineExpandLock.unlockWrite(stamp);
370         }
371         for (;;) {
372             Chunk chunk = centralQueue.poll();
373             if (chunk == null) {
374                 break;
375             }
376             chunk.release();
377         }
378     }
379 
380     static int sizeBucket(int size) {
381         return AllocationStatistics.sizeBucket(size);
382     }
383 
384     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
385     private static class AllocationStatistics {
386         private static final int MIN_DATUM_TARGET = 1024;
387         private static final int MAX_DATUM_TARGET = 65534;
388         private static final int INIT_DATUM_TARGET = 9;
389         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
390         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
391         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
392         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
393         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
394 
395         protected final AdaptivePoolingAllocator parent;
396         private final boolean shareable;
397         private final short[][] histos = {
398                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
399                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
400         };
401         private short[] histo = histos[0];
402         private final int[] sums = new int[HISTO_BUCKET_COUNT];
403 
404         private int histoIndex;
405         private int datumCount;
406         private int datumTarget = INIT_DATUM_TARGET;
407         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
408         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
409 
410         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
411             this.parent = parent;
412             this.shareable = shareable;
413         }
414 
415         protected void recordAllocationSize(int bucket) {
416             histo[bucket]++;
417             if (datumCount++ == datumTarget) {
418                 rotateHistograms();
419             }
420         }
421 
422         static int sizeBucket(int size) {
423             if (size == 0) {
424                 return 0;
425             }
426             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
427             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
428             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
429             // which is a magical number for JIT optimisations.
430             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
431             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
432         }
433 
434         private void rotateHistograms() {
435             short[][] hs = histos;
436             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
437                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
438             }
439             int sum = 0;
440             for (int count : sums) {
441                 sum  += count;
442             }
443             int targetPercentile = (int) (sum * 0.99);
444             int sizeBucket = 0;
445             for (; sizeBucket < sums.length; sizeBucket++) {
446                 if (sums[sizeBucket] > targetPercentile) {
447                     break;
448                 }
449                 targetPercentile -= sums[sizeBucket];
450             }
451             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
452             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
453             localPrefChunkSize = prefChunkSize;
454             if (shareable) {
455                 for (Magazine mag : parent.magazines) {
456                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
457                 }
458             }
459             if (sharedPrefChunkSize != prefChunkSize) {
460                 // Preferred chunk size changed. Increase check frequency.
461                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
462                 sharedPrefChunkSize = prefChunkSize;
463             } else {
464                 // Preferred chunk size did not change. Check less often.
465                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
466             }
467 
468             histoIndex = histoIndex + 1 & 3;
469             histo = histos[histoIndex];
470             datumCount = 0;
471             Arrays.fill(histo, (short) 0);
472         }
473 
474         /**
475          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
476          * allocation sizes.
477          * <p>
478          * This method must be thread-safe.
479          *
480          * @return The currently preferred chunk allocation size.
481          */
482         protected int preferredChunkSize() {
483             return sharedPrefChunkSize;
484         }
485     }
486 
487     @SuppressJava6Requirement(reason = "Guarded by version check")
488     private static final class Magazine extends AllocationStatistics {
489         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
490         static {
491             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
492         }
493         private static final Chunk MAGAZINE_FREED = new Chunk();
494 
495         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
496                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
497                     @Override
498                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
499                         return new AdaptiveByteBuf(handle);
500                     }
501                 });
502 
503         private Chunk current;
504         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
505         private volatile Chunk nextInLine;
506         private final AtomicLong usedMemory;
507         private final StampedLock allocationLock;
508         private final Queue<AdaptiveByteBuf> bufferQueue;
509         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
510 
511         Magazine(AdaptivePoolingAllocator parent) {
512             this(parent, true);
513         }
514 
515         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
516             super(parent, shareable);
517 
518             if (shareable) {
519                 // We only need the StampedLock if this Magazine will be shared across threads.
520                 allocationLock = new StampedLock();
521                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
522                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
523                     @Override
524                     public void recycle(AdaptiveByteBuf self) {
525                         bufferQueue.offer(self);
526                     }
527                 };
528             } else {
529                 allocationLock = null;
530                 bufferQueue = null;
531                 handle = null;
532             }
533             usedMemory = new AtomicLong();
534         }
535 
536         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
537             if (allocationLock == null) {
538                 // This magazine is not shared across threads, just allocate directly.
539                 return allocate(size, sizeBucket, maxCapacity, buf);
540             }
541 
542             // Try to retrieve the lock and if successful allocate.
543             long writeLock = allocationLock.tryWriteLock();
544             if (writeLock != 0) {
545                 try {
546                     return allocate(size, sizeBucket, maxCapacity, buf);
547                 } finally {
548                     allocationLock.unlockWrite(writeLock);
549                 }
550             }
551             return false;
552         }
553 
554         private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
555             recordAllocationSize(sizeBucket);
556             Chunk curr = current;
557             if (curr != null) {
558                 if (curr.remainingCapacity() > size) {
559                     curr.readInitInto(buf, size, maxCapacity);
560                     // We still have some bytes left that we can use for the next allocation, just early return.
561                     return true;
562                 }
563 
564                 // At this point we know that this will be the last time current will be used, so directly set it to
565                 // null and release it once we are done.
566                 current = null;
567                 if (curr.remainingCapacity() == size) {
568                     try {
569                         curr.readInitInto(buf, size, maxCapacity);
570                         return true;
571                     } finally {
572                         curr.release();
573                     }
574                 }
575             }
576             Chunk last = curr;
577             assert current == null;
578             // The fast-path for allocations did not work.
579             //
580             // Try to fetch the next "Magazine local" Chunk first, if this this fails as we don't have
581             // one setup we will poll our centralQueue. If this fails as well we will just allocate a new Chunk.
582             //
583             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
584             // so be "reserved" by this Magazine for exclusive usage.
585             if (nextInLine != null) {
586                 curr = NEXT_IN_LINE.getAndSet(this, null);
587                 if (curr == MAGAZINE_FREED) {
588                     // Allocation raced with a stripe-resize that freed this magazine.
589                     restoreMagazineFreed();
590                     return false;
591                 }
592             } else {
593                 curr = parent.centralQueue.poll();
594                 if (curr == null) {
595                     curr = newChunkAllocation(size);
596                 }
597             }
598             current = curr;
599             assert current != null;
600             if (last != null) {
601                 if (last.remainingCapacity() < RETIRE_CAPACITY) {
602                     last.release();
603                 } else {
604                     transferChunk(last);
605                 }
606             }
607             if (curr.remainingCapacity() > size) {
608                 curr.readInitInto(buf, size, maxCapacity);
609             } else if (curr.remainingCapacity() == size) {
610                 try {
611                     curr.readInitInto(buf, size, maxCapacity);
612                 } finally {
613                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
614                     // release the current chunk before null it out.
615                     curr.release();
616                     current = null;
617                 }
618             } else {
619                 Chunk newChunk = newChunkAllocation(size);
620                 try {
621                     newChunk.readInitInto(buf, size, maxCapacity);
622                     if (curr.remainingCapacity() < RETIRE_CAPACITY) {
623                         curr.release();
624                         current = newChunk;
625                     } else {
626                         transferChunk(newChunk);
627                     }
628                     newChunk = null;
629                 } finally {
630                     if (newChunk != null) {
631                         assert current == null;
632                         // Something went wrong, let's ensure we not leak the newChunk.
633                         newChunk.release();
634                     }
635                 }
636             }
637             return true;
638         }
639 
640         private void restoreMagazineFreed() {
641             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
642             if (next != null && next != MAGAZINE_FREED) {
643                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
644             }
645         }
646 
647         private void transferChunk(Chunk current) {
648             if (NEXT_IN_LINE.compareAndSet(this, null, current)
649                     || parent.offerToQueue(current)) {
650                 return;
651             }
652             Chunk nextChunk = NEXT_IN_LINE.get(this);
653             if (nextChunk != null && current.remainingCapacity() > nextChunk.remainingCapacity()) {
654                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, current)) {
655                     if (nextChunk != MAGAZINE_FREED) {
656                         nextChunk.release();
657                     }
658                     return;
659                 }
660             }
661             // Next-in-line is occupied AND the central queue is full.
662             // Rare that we should get here, but we'll only do one allocation out of this chunk, then.
663             current.release();
664         }
665 
666         private Chunk newChunkAllocation(int promptingSize) {
667             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
668             ChunkAllocator chunkAllocator = parent.chunkAllocator;
669             return new Chunk(chunkAllocator.allocate(size, size), this, true);
670         }
671 
672         boolean trySetNextInLine(Chunk chunk) {
673             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
674         }
675 
676         void free() {
677             // Release the current Chunk and the next that was stored for later usage.
678             restoreMagazineFreed();
679             long stamp = allocationLock.writeLock();
680             try {
681                 if (current != null) {
682                     current.release();
683                     current = null;
684                 }
685             } finally {
686                 allocationLock.unlockWrite(stamp);
687             }
688         }
689 
690         public AdaptiveByteBuf newBuffer() {
691             AdaptiveByteBuf buf;
692             if (handle == null) {
693                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
694             } else {
695                 buf = bufferQueue.poll();
696                 if (buf == null) {
697                     buf = new AdaptiveByteBuf(handle);
698                 }
699             }
700             buf.resetRefCnt();
701             buf.discardMarks();
702             return buf;
703         }
704     }
705 
706     private static final class Chunk implements ReferenceCounted {
707 
708         private final AbstractByteBuf delegate;
709         final Magazine magazine;
710         private final int capacity;
711         private final boolean pooled;
712         private int allocatedBytes;
713         private static final long REFCNT_FIELD_OFFSET =
714                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
715         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
716                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
717 
718         private static final ReferenceCountUpdater<Chunk> updater =
719                 new ReferenceCountUpdater<Chunk>() {
720                     @Override
721                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
722                         return AIF_UPDATER;
723                     }
724                     @Override
725                     protected long unsafeOffset() {
726                         return REFCNT_FIELD_OFFSET;
727                     }
728                 };
729 
730         // Value might not equal "real" reference count, all access should be via the updater
731         @SuppressWarnings({"unused", "FieldMayBeFinal"})
732         private volatile int refCnt;
733 
734         Chunk() {
735             // Constructor only used by the MAGAZINE_FREED sentinel.
736             delegate = null;
737             magazine = null;
738             capacity = 0;
739             pooled = false;
740         }
741 
742         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
743             this.delegate = delegate;
744             this.magazine = magazine;
745             this.pooled = pooled;
746             capacity = delegate.capacity();
747             magazine.usedMemory.getAndAdd(capacity);
748             updater.setInitialValue(this);
749         }
750 
751         @Override
752         public Chunk touch(Object hint) {
753             return this;
754         }
755 
756         @Override
757         public int refCnt() {
758             return updater.refCnt(this);
759         }
760 
761         @Override
762         public Chunk retain() {
763             return updater.retain(this);
764         }
765 
766         @Override
767         public Chunk retain(int increment) {
768             return updater.retain(this, increment);
769         }
770 
771         @Override
772         public Chunk touch() {
773             return this;
774         }
775 
776         @Override
777         public boolean release() {
778             if (updater.release(this)) {
779                 deallocate();
780                 return true;
781             }
782             return false;
783         }
784 
785         @Override
786         public boolean release(int decrement) {
787             if (updater.release(this, decrement)) {
788                 deallocate();
789                 return true;
790             }
791             return false;
792         }
793 
794         private void deallocate() {
795             Magazine mag = magazine;
796             AdaptivePoolingAllocator parent = mag.parent;
797             int chunkSize = mag.preferredChunkSize();
798             int memSize = delegate.capacity();
799             if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
800                 // Drop the chunk if the parent allocator is closed, or if the chunk is smaller than the
801                 // preferred chunk size, or over 50% larger than the preferred chunk size.
802                 mag.usedMemory.getAndAdd(-capacity());
803                 delegate.release();
804             } else {
805                 updater.resetRefCnt(this);
806                 delegate.setIndex(0, 0);
807                 allocatedBytes = 0;
808                 if (!mag.trySetNextInLine(this)) {
809                     if (!parent.offerToQueue(this)) {
810                         // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
811                         // which did increase the reference count by 1.
812                         boolean released = updater.release(this);
813                         delegate.release();
814                         assert released;
815                     }
816                 }
817             }
818         }
819 
820         public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
821             int startIndex = allocatedBytes;
822             allocatedBytes = startIndex + size;
823             Chunk chunk = this;
824             chunk.retain();
825             try {
826                 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
827                 chunk = null;
828             } finally {
829                 if (chunk != null) {
830                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
831                     // the chunk again as we retained it before calling buf.init(...).
832                     chunk.release();
833                 }
834             }
835         }
836 
837         public int remainingCapacity() {
838             return capacity - allocatedBytes;
839         }
840 
841         public int capacity() {
842             return capacity;
843         }
844     }
845 
846     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
847 
848         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
849 
850         private int adjustment;
851         private AbstractByteBuf rootParent;
852         Chunk chunk;
853         private int length;
854         private ByteBuffer tmpNioBuf;
855         private boolean hasArray;
856         private boolean hasMemoryAddress;
857 
858         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
859             super(0);
860             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
861         }
862 
863         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
864                   int adjustment, int capacity, int maxCapacity) {
865             this.adjustment = adjustment;
866             chunk = wrapped;
867             length = capacity;
868             maxCapacity(maxCapacity);
869             setIndex0(readerIndex, writerIndex);
870             hasArray = unwrapped.hasArray();
871             hasMemoryAddress = unwrapped.hasMemoryAddress();
872             rootParent = unwrapped;
873             tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
874         }
875 
876         private AbstractByteBuf rootParent() {
877             final AbstractByteBuf rootParent = this.rootParent;
878             if (rootParent != null) {
879                 return rootParent;
880             }
881             throw new IllegalReferenceCountException();
882         }
883 
884         @Override
885         public int capacity() {
886             return length;
887         }
888 
889         @Override
890         public ByteBuf capacity(int newCapacity) {
891             if (newCapacity == capacity()) {
892                 ensureAccessible();
893                 return this;
894             }
895             checkNewCapacity(newCapacity);
896             if (newCapacity < capacity()) {
897                 length = newCapacity;
898                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
899                 return this;
900             }
901 
902             // Reallocation required.
903             ByteBuffer data = tmpNioBuf;
904             data.clear();
905             tmpNioBuf = null;
906             Chunk chunk = this.chunk;
907             Magazine magazine = chunk.magazine;
908             AdaptivePoolingAllocator allocator = magazine.parent;
909             int readerIndex = this.readerIndex;
910             int writerIndex = this.writerIndex;
911             allocator.allocate(newCapacity, maxCapacity(), this);
912             tmpNioBuf.put(data);
913             tmpNioBuf.clear();
914             chunk.release();
915             this.readerIndex = readerIndex;
916             this.writerIndex = writerIndex;
917             return this;
918         }
919 
920         @Override
921         public ByteBufAllocator alloc() {
922             return rootParent().alloc();
923         }
924 
925         @Override
926         public ByteOrder order() {
927             return rootParent().order();
928         }
929 
930         @Override
931         public ByteBuf unwrap() {
932             return null;
933         }
934 
935         @Override
936         public boolean isDirect() {
937             return rootParent().isDirect();
938         }
939 
940         @Override
941         public int arrayOffset() {
942             return idx(rootParent().arrayOffset());
943         }
944 
945         @Override
946         public boolean hasMemoryAddress() {
947             return hasMemoryAddress;
948         }
949 
950         @Override
951         public long memoryAddress() {
952             ensureAccessible();
953             return rootParent().memoryAddress() + adjustment;
954         }
955 
956         @Override
957         public ByteBuffer nioBuffer(int index, int length) {
958             checkIndex(index, length);
959             return rootParent().nioBuffer(idx(index), length);
960         }
961 
962         @Override
963         public ByteBuffer internalNioBuffer(int index, int length) {
964             checkIndex(index, length);
965             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
966         }
967 
968         private ByteBuffer internalNioBuffer() {
969             return (ByteBuffer) tmpNioBuf.clear();
970         }
971 
972         @Override
973         public ByteBuffer[] nioBuffers(int index, int length) {
974             checkIndex(index, length);
975             return rootParent().nioBuffers(idx(index), length);
976         }
977 
978         @Override
979         public boolean hasArray() {
980             return hasArray;
981         }
982 
983         @Override
984         public byte[] array() {
985             ensureAccessible();
986             return rootParent().array();
987         }
988 
989         @Override
990         public ByteBuf copy(int index, int length) {
991             checkIndex(index, length);
992             return rootParent().copy(idx(index), length);
993         }
994 
995         @Override
996         public int nioBufferCount() {
997             return rootParent().nioBufferCount();
998         }
999 
1000         @Override
1001         protected byte _getByte(int index) {
1002             return rootParent()._getByte(idx(index));
1003         }
1004 
1005         @Override
1006         protected short _getShort(int index) {
1007             return rootParent()._getShort(idx(index));
1008         }
1009 
1010         @Override
1011         protected short _getShortLE(int index) {
1012             return rootParent()._getShortLE(idx(index));
1013         }
1014 
1015         @Override
1016         protected int _getUnsignedMedium(int index) {
1017             return rootParent()._getUnsignedMedium(idx(index));
1018         }
1019 
1020         @Override
1021         protected int _getUnsignedMediumLE(int index) {
1022             return rootParent()._getUnsignedMediumLE(idx(index));
1023         }
1024 
1025         @Override
1026         protected int _getInt(int index) {
1027             return rootParent()._getInt(idx(index));
1028         }
1029 
1030         @Override
1031         protected int _getIntLE(int index) {
1032             return rootParent()._getIntLE(idx(index));
1033         }
1034 
1035         @Override
1036         protected long _getLong(int index) {
1037             return rootParent()._getLong(idx(index));
1038         }
1039 
1040         @Override
1041         protected long _getLongLE(int index) {
1042             return rootParent()._getLongLE(idx(index));
1043         }
1044 
1045         @Override
1046         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1047             checkIndex(index, length);
1048             rootParent().getBytes(idx(index), dst, dstIndex, length);
1049             return this;
1050         }
1051 
1052         @Override
1053         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1054             checkIndex(index, length);
1055             rootParent().getBytes(idx(index), dst, dstIndex, length);
1056             return this;
1057         }
1058 
1059         @Override
1060         public ByteBuf getBytes(int index, ByteBuffer dst) {
1061             checkIndex(index, dst.remaining());
1062             rootParent().getBytes(idx(index), dst);
1063             return this;
1064         }
1065 
1066         @Override
1067         protected void _setByte(int index, int value) {
1068             rootParent()._setByte(idx(index), value);
1069         }
1070 
1071         @Override
1072         protected void _setShort(int index, int value) {
1073             rootParent()._setShort(idx(index), value);
1074         }
1075 
1076         @Override
1077         protected void _setShortLE(int index, int value) {
1078             rootParent()._setShortLE(idx(index), value);
1079         }
1080 
1081         @Override
1082         protected void _setMedium(int index, int value) {
1083             rootParent()._setMedium(idx(index), value);
1084         }
1085 
1086         @Override
1087         protected void _setMediumLE(int index, int value) {
1088             rootParent()._setMediumLE(idx(index), value);
1089         }
1090 
1091         @Override
1092         protected void _setInt(int index, int value) {
1093             rootParent()._setInt(idx(index), value);
1094         }
1095 
1096         @Override
1097         protected void _setIntLE(int index, int value) {
1098             rootParent()._setIntLE(idx(index), value);
1099         }
1100 
1101         @Override
1102         protected void _setLong(int index, long value) {
1103             rootParent()._setLong(idx(index), value);
1104         }
1105 
1106         @Override
1107         protected void _setLongLE(int index, long value) {
1108             rootParent().setLongLE(idx(index), value);
1109         }
1110 
1111         @Override
1112         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1113             checkIndex(index, length);
1114             rootParent().setBytes(idx(index), src, srcIndex, length);
1115             return this;
1116         }
1117 
1118         @Override
1119         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1120             checkIndex(index, length);
1121             rootParent().setBytes(idx(index), src, srcIndex, length);
1122             return this;
1123         }
1124 
1125         @Override
1126         public ByteBuf setBytes(int index, ByteBuffer src) {
1127             checkIndex(index, src.remaining());
1128             rootParent().setBytes(idx(index), src);
1129             return this;
1130         }
1131 
1132         @Override
1133         public ByteBuf getBytes(int index, OutputStream out, int length)
1134                 throws IOException {
1135             checkIndex(index, length);
1136             if (length != 0) {
1137                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1138             }
1139             return this;
1140         }
1141 
1142         @Override
1143         public int getBytes(int index, GatheringByteChannel out, int length)
1144                 throws IOException {
1145             return out.write(internalNioBuffer(index, length).duplicate());
1146         }
1147 
1148         @Override
1149         public int getBytes(int index, FileChannel out, long position, int length)
1150                 throws IOException {
1151             return out.write(internalNioBuffer(index, length).duplicate(), position);
1152         }
1153 
1154         @Override
1155         public int setBytes(int index, InputStream in, int length)
1156                 throws IOException {
1157             checkIndex(index, length);
1158             final AbstractByteBuf rootParent = rootParent();
1159             if (rootParent.hasArray()) {
1160                 return rootParent.setBytes(idx(index), in, length);
1161             }
1162             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1163             int readBytes = in.read(tmp, 0, length);
1164             if (readBytes <= 0) {
1165                 return readBytes;
1166             }
1167             setBytes(index, tmp, 0, readBytes);
1168             return readBytes;
1169         }
1170 
1171         @Override
1172         public int setBytes(int index, ScatteringByteChannel in, int length)
1173                 throws IOException {
1174             try {
1175                 return in.read(internalNioBuffer(index, length).duplicate());
1176             } catch (ClosedChannelException ignored) {
1177                 return -1;
1178             }
1179         }
1180 
1181         @Override
1182         public int setBytes(int index, FileChannel in, long position, int length)
1183                 throws IOException {
1184             try {
1185                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1186             } catch (ClosedChannelException ignored) {
1187                 return -1;
1188             }
1189         }
1190 
1191         @Override
1192         public int forEachByte(int index, int length, ByteProcessor processor) {
1193             checkIndex(index, length);
1194             int ret = rootParent().forEachByte(idx(index), length, processor);
1195             return forEachResult(ret);
1196         }
1197 
1198         @Override
1199         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1200             checkIndex(index, length);
1201             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1202             return forEachResult(ret);
1203         }
1204 
1205         private int forEachResult(int ret) {
1206             if (ret < adjustment) {
1207                 return -1;
1208             }
1209             return ret - adjustment;
1210         }
1211 
1212         @Override
1213         public boolean isContiguous() {
1214             return rootParent().isContiguous();
1215         }
1216 
1217         private int idx(int index) {
1218             return index + adjustment;
1219         }
1220 
1221         @Override
1222         protected void deallocate() {
1223             if (chunk != null) {
1224                 chunk.release();
1225             }
1226             tmpNioBuf = null;
1227             chunk = null;
1228             rootParent = null;
1229             if (handle instanceof EnhancedHandle) {
1230                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1231                 enhancedHandle.unguardedRecycle(this);
1232             } else {
1233                 handle.recycle(this);
1234             }
1235         }
1236     }
1237 
1238     /**
1239      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1240      */
1241     interface ChunkAllocator {
1242         /**
1243          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1244          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1245          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1246          * @return The buffer that represents the chunk memory.
1247          */
1248         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1249     }
1250 }