View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SystemPropertyUtil;
30  import io.netty.util.internal.ThreadExecutorMap;
31  import io.netty.util.internal.UnstableApi;
32  
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.OutputStream;
36  import java.nio.ByteBuffer;
37  import java.nio.ByteOrder;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.FileChannel;
40  import java.nio.channels.GatheringByteChannel;
41  import java.nio.channels.ScatteringByteChannel;
42  import java.util.Arrays;
43  import java.util.Queue;
44  import java.util.Set;
45  import java.util.concurrent.ConcurrentLinkedQueue;
46  import java.util.concurrent.CopyOnWriteArraySet;
47  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
48  import java.util.concurrent.atomic.AtomicLong;
49  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50  import java.util.concurrent.locks.StampedLock;
51  
52  /**
53   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
54   * <p>
55   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
56   * from.
57   * <p>
58   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
59   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
 * If contention is detected above a certain threshold, the number of magazines is increased in response to the
 * contention.
62   * <p>
63   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
64   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
65   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
66   * <p>
 * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
 * done is also adapted to the allocation pattern. If a newly computed preferred chunk size is the same as the
 * previous one, then the frequency is reduced. Otherwise, the frequency is increased.
70   * <p>
71   * This allows the allocator to quickly respond to changes in the application workload,
72   * without suffering undue overhead from maintaining its statistics.
73   * <p>
 * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from
 * any magazine to be shared with other magazines.
 * The queue is created by {@link #createSharedChunkQueue()} and is bounded; its capacity can be configured with the
 * {@code io.netty.allocator.centralQueueCapacity} system property.
77   */
78  @UnstableApi
79  final class AdaptivePoolingAllocator {
80  
81      enum MagazineCaching {
82          EventLoopThreads,
83          FastThreadLocalThreads,
84          None
85      }
86  
87      private static final int EXPANSION_ATTEMPTS = 3;
88      private static final int INITIAL_MAGAZINES = 4;
89      private static final int RETIRE_CAPACITY = 4 * 1024;
90      private static final int MIN_CHUNK_SIZE = 128 * 1024;
91      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
92      private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
93  
94      /**
95       * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
96       * <p>
97       * This number is 10 MiB, and is derived from the limitations of internal histograms.
98       */
99      private static final int MAX_CHUNK_SIZE =
100             BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
101 
102     /**
103      * The capacity if the central queue that allow chunks to be shared across magazines.
104      * The default size is {@link NettyRuntime#availableProcessors()},
105      * and the maximum number of magazines is twice this.
106      * <p>
107      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
108      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
109      */
110     private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
111             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());
112 
113     private static final Object NO_MAGAZINE = Boolean.TRUE;
114 
115     private final ChunkAllocator chunkAllocator;
116     private final Queue<Chunk> centralQueue;
117     private final StampedLock magazineExpandLock;
118     private volatile Magazine[] magazines;
119     private final FastThreadLocal<Object> threadLocalMagazine;
120     private final Set<Magazine> liveCachedMagazines;
121     private volatile boolean freed;
122 
123     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
124         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
125         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
126         this.chunkAllocator = chunkAllocator;
127         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
128         magazineExpandLock = new StampedLock();
129         if (magazineCaching != MagazineCaching.None) {
130             assert magazineCaching == MagazineCaching.EventLoopThreads ||
131                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
132             final boolean cachedMagazinesNonEventLoopThreads =
133                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
134             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
135             threadLocalMagazine = new FastThreadLocal<Object>() {
136                 @Override
137                 protected Object initialValue() {
138                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
139                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
140 
141                         if (FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
142                             // Only add it to the liveMagazines if we can guarantee that onRemoval(...) is called,
143                             // as otherwise we might end up holding the reference forever.
144                             liveMagazines.add(mag);
145                         }
146                         return mag;
147                     }
148                     return NO_MAGAZINE;
149                 }
150 
151                 @Override
152                 protected void onRemoval(final Object value) throws Exception {
153                     if (value != NO_MAGAZINE) {
154                         liveMagazines.remove(value);
155                     }
156                 }
157             };
158             liveCachedMagazines = liveMagazines;
159         } else {
160             threadLocalMagazine = null;
161             liveCachedMagazines = null;
162         }
163         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
164         for (int i = 0; i < mags.length; i++) {
165             mags[i] = new Magazine(this);
166         }
167         magazines = mags;
168     }
169 
170     /**
171      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
172      * internal Magazines.
173      * <p>
174      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
175      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
176      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
177      * allocate new chunks when their current and next-in-line chunks have both been used up.
178      * <p>
179      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
180      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
181      * queue.
182      * <p>
183      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
184      * queue to limit the maximum memory usage.
185      * <p>
186      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
187      *
188      * @return A new multi-producer, multi-consumer queue.
189      */
190     private static Queue<Chunk> createSharedChunkQueue() {
191         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
192     }
193 
194     ByteBuf allocate(int size, int maxCapacity) {
195         if (size <= MAX_CHUNK_SIZE) {
196             Thread currentThread = Thread.currentThread();
197             boolean willCleanupFastThreadLocals = FastThreadLocalThread.willCleanupFastThreadLocals(currentThread);
198             AdaptiveByteBuf buf = AdaptiveByteBuf.newInstance(willCleanupFastThreadLocals);
199             try {
200                 if (allocate(size, maxCapacity, currentThread, buf)) {
201                     // Allocation did work, the ownership will is transferred to the caller.
202                     AdaptiveByteBuf result = buf;
203                     buf = null;
204                     return result;
205                 }
206             } finally {
207                 if (buf != null) {
208                     // We didn't handover ownership of the buf, release it now so we don't leak.
209                     buf.release();
210                 }
211             }
212         }
213         // The magazines failed us, or the buffer is too big to be pooled.
214         return chunkAllocator.allocate(size, maxCapacity);
215     }
216 
217     private boolean allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
218         int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
219         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
220         if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
221             Object mag = threadLocalMagazine.get();
222             if (mag != NO_MAGAZINE) {
223                 boolean allocated = ((Magazine) mag).tryAllocate(size, sizeBucket, maxCapacity, buf);
224                 assert allocated : "Allocation of threadLocalMagazine must always succeed";
225                 return true;
226             }
227         }
228         long threadId = currentThread.getId();
229         Magazine[] mags;
230         int expansions = 0;
231         do {
232             mags = magazines;
233             int mask = mags.length - 1;
234             int index = (int) (threadId & mask);
235             for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
236                 Magazine mag = mags[index + i & mask];
237                 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
238                     // Was able to allocate.
239                     return true;
240                 }
241             }
242             expansions++;
243         } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
244         return false;
245     }
246 
247     /**
248      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
249      */
250     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
251         Magazine magazine = into.chunk.magazine;
252         if (!allocate(size, maxCapacity, Thread.currentThread(), into)) {
253             // Create a one-off chunk for this allocation as the previous allocate call did not work out.
254             AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
255             Chunk chunk = new Chunk(innerChunk, magazine, false);
256             try {
257                 chunk.readInitInto(into, size, maxCapacity);
258             } finally {
259                 // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
260                 // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
261                 // completely release the Chunk and so the contained innerChunk.
262                 chunk.release();
263             }
264         }
265     }
266 
267     long usedMemory() {
268         long sum = 0;
269         for (Chunk chunk : centralQueue) {
270             sum += chunk.capacity();
271         }
272         for (Magazine magazine : magazines) {
273             sum += magazine.usedMemory.get();
274         }
275         if (liveCachedMagazines != null) {
276             for (Magazine magazine : liveCachedMagazines) {
277                 sum += magazine.usedMemory.get();
278             }
279         }
280         return sum;
281     }
282 
283     private boolean tryExpandMagazines(int currentLength) {
284         if (currentLength >= MAX_STRIPES) {
285             return true;
286         }
287         long writeLock = magazineExpandLock.tryWriteLock();
288         if (writeLock != 0) {
289             try {
290                 Magazine[] mags = magazines;
291                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
292                     return true;
293                 }
294                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
295                 Magazine[] expanded = new Magazine[mags.length * 2];
296                 for (int i = 0, l = expanded.length; i < l; i++) {
297                     Magazine m = new Magazine(this);
298                     m.localPrefChunkSize = preferredChunkSize;
299                     m.sharedPrefChunkSize = preferredChunkSize;
300                     expanded[i] = m;
301                 }
302                 magazines = expanded;
303                 for (Magazine magazine : mags) {
304                     magazine.free();
305                 }
306             } finally {
307                 magazineExpandLock.unlockWrite(writeLock);
308             }
309         }
310         return true;
311     }
312 
313     private boolean offerToQueue(Chunk buffer) {
314         if (freed) {
315             return false;
316         }
317         return centralQueue.offer(buffer);
318     }
319 
320     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
321     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
322     // very confusing for users.
323     @Override
324     protected void finalize() throws Throwable {
325         try {
326             super.finalize();
327         } finally {
328             free();
329         }
330     }
331 
332     private void free() {
333         freed = true;
334         long stamp = magazineExpandLock.writeLock();
335         try {
336             Magazine[] mags = magazines;
337             for (Magazine magazine : mags) {
338                 magazine.free();
339             }
340         } finally {
341             magazineExpandLock.unlockWrite(stamp);
342         }
343         for (;;) {
344             Chunk chunk = centralQueue.poll();
345             if (chunk == null) {
346                 break;
347             }
348             chunk.release();
349         }
350     }
351 
352     static int sizeBucket(int size) {
353         return AllocationStatistics.sizeBucket(size);
354     }
355 
356     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
357     private static class AllocationStatistics {
358         private static final int MIN_DATUM_TARGET = 1024;
359         private static final int MAX_DATUM_TARGET = 65534;
360         private static final int INIT_DATUM_TARGET = 9;
361         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
362         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
363         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
364         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
365         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
366 
367         protected final AdaptivePoolingAllocator parent;
368         private final boolean shareable;
369         private final short[][] histos = {
370                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
371                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
372         };
373         private short[] histo = histos[0];
374         private final int[] sums = new int[HISTO_BUCKET_COUNT];
375 
376         private int histoIndex;
377         private int datumCount;
378         private int datumTarget = INIT_DATUM_TARGET;
379         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
380         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
381 
382         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
383             this.parent = parent;
384             this.shareable = shareable;
385         }
386 
387         protected void recordAllocationSize(int bucket) {
388             histo[bucket]++;
389             if (datumCount++ == datumTarget) {
390                 rotateHistograms();
391             }
392         }
393 
394         static int sizeBucket(int size) {
395             if (size == 0) {
396                 return 0;
397             }
398             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
399             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
400             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
401             // which is a magical number for JIT optimisations.
402             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
403             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
404         }
405 
406         private void rotateHistograms() {
407             short[][] hs = histos;
408             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
409                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
410             }
411             int sum = 0;
412             for (int count : sums) {
413                 sum  += count;
414             }
415             int targetPercentile = (int) (sum * 0.99);
416             int sizeBucket = 0;
417             for (; sizeBucket < sums.length; sizeBucket++) {
418                 if (sums[sizeBucket] > targetPercentile) {
419                     break;
420                 }
421                 targetPercentile -= sums[sizeBucket];
422             }
423             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
424             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
425             localPrefChunkSize = prefChunkSize;
426             if (shareable) {
427                 for (Magazine mag : parent.magazines) {
428                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
429                 }
430             }
431             if (sharedPrefChunkSize != prefChunkSize) {
432                 // Preferred chunk size changed. Increase check frequency.
433                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
434                 sharedPrefChunkSize = prefChunkSize;
435             } else {
436                 // Preferred chunk size did not change. Check less often.
437                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
438             }
439 
440             histoIndex = histoIndex + 1 & 3;
441             histo = histos[histoIndex];
442             datumCount = 0;
443             Arrays.fill(histo, (short) 0);
444         }
445 
446         /**
447          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
448          * allocation sizes.
449          * <p>
450          * This method must be thread-safe.
451          *
452          * @return The currently preferred chunk allocation size.
453          */
454         protected int preferredChunkSize() {
455             return sharedPrefChunkSize;
456         }
457     }
458 
459     private static final class Magazine extends AllocationStatistics {
460         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
461         static {
462             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
463         }
464         private static final Chunk MAGAZINE_FREED = new Chunk();
465 
466         private Chunk current;
467         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
468         private volatile Chunk nextInLine;
469         private final AtomicLong usedMemory;
470         private final StampedLock allocationLock;
471 
472         Magazine(AdaptivePoolingAllocator parent) {
473             this(parent, true);
474         }
475 
476         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
477             super(parent, shareable);
478 
479             // We only need the StampedLock if this Magazine will be shared across threads.
480             if (shareable) {
481                 allocationLock = new StampedLock();
482             } else {
483                 allocationLock = null;
484             }
485             usedMemory = new AtomicLong();
486         }
487 
488         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
489             if (allocationLock == null) {
490                 // This magazine is not shared across threads, just allocate directly.
491                 return allocate(size, sizeBucket, maxCapacity, buf);
492             }
493 
494             // Try to retrieve the lock and if successful allocate.
495             long writeLock = allocationLock.tryWriteLock();
496             if (writeLock != 0) {
497                 try {
498                     return allocate(size, sizeBucket, maxCapacity, buf);
499                 } finally {
500                     allocationLock.unlockWrite(writeLock);
501                 }
502             }
503             return false;
504         }
505 
506         private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
507             recordAllocationSize(sizeBucket);
508             Chunk curr = current;
509             if (curr != null) {
510                 if (curr.remainingCapacity() > size) {
511                     curr.readInitInto(buf, size, maxCapacity);
512                     // We still have some bytes left that we can use for the next allocation, just early return.
513                     return true;
514                 }
515 
516                 // At this point we know that this will be the last time current will be used, so directly set it to
517                 // null and release it once we are done.
518                 current = null;
519                 if (curr.remainingCapacity() == size) {
520                     try {
521                         curr.readInitInto(buf, size, maxCapacity);
522                         return true;
523                     } finally {
524                         curr.release();
525                     }
526                 }
527             }
528             Chunk last = curr;
529             assert current == null;
530             // The fast-path for allocations did not work.
531             //
532             // Try to fetch the next "Magazine local" Chunk first, if this this fails as we don't have
533             // one setup we will poll our centralQueue. If this fails as well we will just allocate a new Chunk.
534             //
535             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
536             // so be "reserved" by this Magazine for exclusive usage.
537             if (nextInLine != null) {
538                 curr = NEXT_IN_LINE.getAndSet(this, null);
539                 if (curr == MAGAZINE_FREED) {
540                     // Allocation raced with a stripe-resize that freed this magazine.
541                     restoreMagazineFreed();
542                     return false;
543                 }
544             } else {
545                 curr = parent.centralQueue.poll();
546                 if (curr == null) {
547                     curr = newChunkAllocation(size);
548                 }
549             }
550             current = curr;
551             assert current != null;
552             if (last != null) {
553                 if (last.remainingCapacity() < RETIRE_CAPACITY) {
554                     last.release();
555                 } else {
556                     transferChunk(last);
557                 }
558             }
559             if (curr.remainingCapacity() > size) {
560                 curr.readInitInto(buf, size, maxCapacity);
561             } else if (curr.remainingCapacity() == size) {
562                 try {
563                     curr.readInitInto(buf, size, maxCapacity);
564                 } finally {
565                     // Release in a finally block so even if readInitInto(...) would throw we would still correctly
566                     // release the current chunk before null it out.
567                     curr.release();
568                     current = null;
569                 }
570             } else {
571                 Chunk newChunk = newChunkAllocation(size);
572                 try {
573                     newChunk.readInitInto(buf, size, maxCapacity);
574                     if (curr.remainingCapacity() < RETIRE_CAPACITY) {
575                         curr.release();
576                         current = newChunk;
577                     } else {
578                         transferChunk(newChunk);
579                     }
580                     newChunk = null;
581                 } finally {
582                     if (newChunk != null) {
583                         assert current == null;
584                         // Something went wrong, let's ensure we not leak the newChunk.
585                         newChunk.release();
586                     }
587                 }
588             }
589             return true;
590         }
591 
592         private void restoreMagazineFreed() {
593             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
594             if (next != null && next != MAGAZINE_FREED) {
595                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
596             }
597         }
598 
599         private void transferChunk(Chunk current) {
600             if (NEXT_IN_LINE.compareAndSet(this, null, current)
601                     || parent.offerToQueue(current)) {
602                 return;
603             }
604             Chunk nextChunk = NEXT_IN_LINE.get(this);
605             if (nextChunk != null && current.remainingCapacity() > nextChunk.remainingCapacity()) {
606                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, current)) {
607                     if (nextChunk != MAGAZINE_FREED) {
608                         nextChunk.release();
609                     }
610                     return;
611                 }
612             }
613             // Next-in-line is occupied AND the central queue is full.
614             // Rare that we should get here, but we'll only do one allocation out of this chunk, then.
615             current.release();
616         }
617 
618         private Chunk newChunkAllocation(int promptingSize) {
619             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
620             ChunkAllocator chunkAllocator = parent.chunkAllocator;
621             return new Chunk(chunkAllocator.allocate(size, size), this, true);
622         }
623 
624         boolean trySetNextInLine(Chunk chunk) {
625             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
626         }
627 
628         void free() {
629             // Release the current Chunk and the next that was stored for later usage.
630             restoreMagazineFreed();
631             long stamp = allocationLock.writeLock();
632             try {
633                 if (current != null) {
634                     current.release();
635                     current = null;
636                 }
637             } finally {
638                 allocationLock.unlockWrite(stamp);
639             }
640         }
641     }
642 
643     private static final class Chunk implements ReferenceCounted {
644 
645         private final AbstractByteBuf delegate;
646         private final Magazine magazine;
647         private final int capacity;
648         private final boolean pooled;
649         private int allocatedBytes;
650         private static final long REFCNT_FIELD_OFFSET =
651                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
652         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
653                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
654 
655         private static final ReferenceCountUpdater<Chunk> updater =
656                 new ReferenceCountUpdater<Chunk>() {
657                     @Override
658                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
659                         return AIF_UPDATER;
660                     }
661                     @Override
662                     protected long unsafeOffset() {
663                         return REFCNT_FIELD_OFFSET;
664                     }
665                 };
666 
667         // Value might not equal "real" reference count, all access should be via the updater
668         @SuppressWarnings({"unused", "FieldMayBeFinal"})
669         private volatile int refCnt;
670 
671         Chunk() {
672             // Constructor only used by the MAGAZINE_FREED sentinel.
673             delegate = null;
674             magazine = null;
675             capacity = 0;
676             pooled = false;
677         }
678 
679         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
680             this.delegate = delegate;
681             this.magazine = magazine;
682             this.pooled = pooled;
683             capacity = delegate.capacity();
684             magazine.usedMemory.getAndAdd(capacity);
685             updater.setInitialValue(this);
686         }
687 
688         @Override
689         public Chunk touch(Object hint) {
690             return this;
691         }
692 
693         @Override
694         public int refCnt() {
695             return updater.refCnt(this);
696         }
697 
698         @Override
699         public Chunk retain() {
700             return updater.retain(this);
701         }
702 
703         @Override
704         public Chunk retain(int increment) {
705             return updater.retain(this, increment);
706         }
707 
708         @Override
709         public Chunk touch() {
710             return this;
711         }
712 
713         @Override
714         public boolean release() {
715             if (updater.release(this)) {
716                 deallocate();
717                 return true;
718             }
719             return false;
720         }
721 
722         @Override
723         public boolean release(int decrement) {
724             if (updater.release(this, decrement)) {
725                 deallocate();
726                 return true;
727             }
728             return false;
729         }
730 
731         private void deallocate() {
732             Magazine mag = magazine;
733             AdaptivePoolingAllocator parent = mag.parent;
734             int chunkSize = mag.preferredChunkSize();
735             int memSize = delegate.capacity();
736             if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
737                 // Drop the chunk if the parent allocator is closed, or if the chunk is smaller than the
738                 // preferred chunk size, or over 50% larger than the preferred chunk size.
739                 mag.usedMemory.getAndAdd(-capacity());
740                 delegate.release();
741             } else {
742                 updater.resetRefCnt(this);
743                 delegate.setIndex(0, 0);
744                 allocatedBytes = 0;
745                 if (!mag.trySetNextInLine(this)) {
746                     if (!parent.offerToQueue(this)) {
747                         // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
748                         // which did increase the reference count by 1.
749                         boolean released = updater.release(this);
750                         delegate.release();
751                         assert released;
752                     }
753                 }
754             }
755         }
756 
757         public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
758             int startIndex = allocatedBytes;
759             allocatedBytes = startIndex + size;
760             Chunk chunk = this;
761             chunk.retain();
762             try {
763                 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
764                 chunk = null;
765             } finally {
766                 if (chunk != null) {
767                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
768                     // the chunk again as we retained it before calling buf.init(...).
769                     chunk.release();
770                 }
771             }
772         }
773 
774         public int remainingCapacity() {
775             return capacity - allocatedBytes;
776         }
777 
778         public int capacity() {
779             return capacity;
780         }
781     }
782 
783     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
784         static final ObjectPool<AdaptiveByteBuf> RECYCLER = ObjectPool.newPool(
785                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
786                     @Override
787                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
788                         return new AdaptiveByteBuf(handle);
789                     }
790                 });
791 
792         static AdaptiveByteBuf newInstance(boolean useThreadLocal) {
793             if (useThreadLocal) {
794                 AdaptiveByteBuf buf = RECYCLER.get();
795                 buf.resetRefCnt();
796                 buf.discardMarks();
797                 return buf;
798             }
799             return new AdaptiveByteBuf(null);
800         }
801 
802         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
803 
804         private int adjustment;
805         private AbstractByteBuf rootParent;
806         private Chunk chunk;
807         private int length;
808         private ByteBuffer tmpNioBuf;
809         private boolean hasArray;
810         private boolean hasMemoryAddress;
811 
812         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
813             super(0);
814             handle = recyclerHandle;
815         }
816 
817         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
818                   int adjustment, int capacity, int maxCapacity) {
819             this.adjustment = adjustment;
820             chunk = wrapped;
821             length = capacity;
822             maxCapacity(maxCapacity);
823             setIndex0(readerIndex, writerIndex);
824             hasArray = unwrapped.hasArray();
825             hasMemoryAddress = unwrapped.hasMemoryAddress();
826             rootParent = unwrapped;
827             tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
828         }
829 
830         private AbstractByteBuf rootParent() {
831             final AbstractByteBuf rootParent = this.rootParent;
832             if (rootParent != null) {
833                 return rootParent;
834             }
835             throw new IllegalReferenceCountException();
836         }
837 
838         @Override
839         public ByteBuf retainedSlice() {
840             if (handle == null) {
841                 return super.retainedSlice();
842             }
843             // If the handle is not null we can use the Pooled variants that use a Recycler.
844             ensureAccessible();
845             return PooledSlicedByteBuf.newInstance(this, this, readerIndex(), readableBytes());
846         }
847 
848         @Override
849         public ByteBuf retainedSlice(int index, int length) {
850             if (handle == null) {
851                 return super.retainedSlice(index, length);
852             }
853             // If the handle is not null we can use the Pooled variants that use a Recycler.
854             ensureAccessible();
855             return PooledSlicedByteBuf.newInstance(this, this, index, length);
856         }
857 
858         @Override
859         public ByteBuf retainedDuplicate() {
860             if (handle == null) {
861                 return super.retainedDuplicate();
862             }
863             // If the handle is not null we can use the Pooled variants that use a Recycler.
864             ensureAccessible();
865             return PooledDuplicatedByteBuf.newInstance(this, this, readerIndex(), writerIndex());
866         }
867 
868         @Override
869         public int capacity() {
870             return length;
871         }
872 
873         @Override
874         public ByteBuf capacity(int newCapacity) {
875             if (newCapacity == capacity()) {
876                 ensureAccessible();
877                 return this;
878             }
879             checkNewCapacity(newCapacity);
880             if (newCapacity < capacity()) {
881                 length = newCapacity;
882                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
883                 return this;
884             }
885 
886             // Reallocation required.
887             ByteBuffer data = tmpNioBuf;
888             data.clear();
889             tmpNioBuf = null;
890             Chunk chunk = this.chunk;
891             Magazine magazine = chunk.magazine;
892             AdaptivePoolingAllocator allocator = magazine.parent;
893             int readerIndex = this.readerIndex;
894             int writerIndex = this.writerIndex;
895             allocator.allocate(newCapacity, maxCapacity(), this);
896             tmpNioBuf.put(data);
897             tmpNioBuf.clear();
898             chunk.release();
899             this.readerIndex = readerIndex;
900             this.writerIndex = writerIndex;
901             return this;
902         }
903 
904         @Override
905         public ByteBufAllocator alloc() {
906             return rootParent().alloc();
907         }
908 
909         @Override
910         public ByteOrder order() {
911             return rootParent().order();
912         }
913 
914         @Override
915         public ByteBuf unwrap() {
916             return null;
917         }
918 
919         @Override
920         public boolean isDirect() {
921             return rootParent().isDirect();
922         }
923 
924         @Override
925         public int arrayOffset() {
926             return idx(rootParent().arrayOffset());
927         }
928 
929         @Override
930         public boolean hasMemoryAddress() {
931             return hasMemoryAddress;
932         }
933 
934         @Override
935         public long memoryAddress() {
936             ensureAccessible();
937             return rootParent().memoryAddress() + adjustment;
938         }
939 
940         @Override
941         public ByteBuffer nioBuffer(int index, int length) {
942             checkIndex(index, length);
943             return rootParent().nioBuffer(idx(index), length);
944         }
945 
946         @Override
947         public ByteBuffer internalNioBuffer(int index, int length) {
948             checkIndex(index, length);
949             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
950         }
951 
952         private ByteBuffer internalNioBuffer() {
953             return (ByteBuffer) tmpNioBuf.clear();
954         }
955 
956         @Override
957         public ByteBuffer[] nioBuffers(int index, int length) {
958             checkIndex(index, length);
959             return rootParent().nioBuffers(idx(index), length);
960         }
961 
962         @Override
963         public boolean hasArray() {
964             return hasArray;
965         }
966 
967         @Override
968         public byte[] array() {
969             ensureAccessible();
970             return rootParent().array();
971         }
972 
973         @Override
974         public ByteBuf copy(int index, int length) {
975             checkIndex(index, length);
976             return rootParent().copy(idx(index), length);
977         }
978 
979         @Override
980         public int nioBufferCount() {
981             return rootParent().nioBufferCount();
982         }
983 
984         @Override
985         protected byte _getByte(int index) {
986             return rootParent()._getByte(idx(index));
987         }
988 
989         @Override
990         protected short _getShort(int index) {
991             return rootParent()._getShort(idx(index));
992         }
993 
994         @Override
995         protected short _getShortLE(int index) {
996             return rootParent()._getShortLE(idx(index));
997         }
998 
999         @Override
1000         protected int _getUnsignedMedium(int index) {
1001             return rootParent()._getUnsignedMedium(idx(index));
1002         }
1003 
1004         @Override
1005         protected int _getUnsignedMediumLE(int index) {
1006             return rootParent()._getUnsignedMediumLE(idx(index));
1007         }
1008 
1009         @Override
1010         protected int _getInt(int index) {
1011             return rootParent()._getInt(idx(index));
1012         }
1013 
1014         @Override
1015         protected int _getIntLE(int index) {
1016             return rootParent()._getIntLE(idx(index));
1017         }
1018 
1019         @Override
1020         protected long _getLong(int index) {
1021             return rootParent()._getLong(idx(index));
1022         }
1023 
1024         @Override
1025         protected long _getLongLE(int index) {
1026             return rootParent()._getLongLE(idx(index));
1027         }
1028 
1029         @Override
1030         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1031             checkIndex(index, length);
1032             rootParent().getBytes(idx(index), dst, dstIndex, length);
1033             return this;
1034         }
1035 
1036         @Override
1037         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1038             checkIndex(index, length);
1039             rootParent().getBytes(idx(index), dst, dstIndex, length);
1040             return this;
1041         }
1042 
1043         @Override
1044         public ByteBuf getBytes(int index, ByteBuffer dst) {
1045             checkIndex(index, dst.remaining());
1046             rootParent().getBytes(idx(index), dst);
1047             return this;
1048         }
1049 
1050         @Override
1051         protected void _setByte(int index, int value) {
1052             rootParent()._setByte(idx(index), value);
1053         }
1054 
1055         @Override
1056         protected void _setShort(int index, int value) {
1057             rootParent()._setShort(idx(index), value);
1058         }
1059 
1060         @Override
1061         protected void _setShortLE(int index, int value) {
1062             rootParent()._setShortLE(idx(index), value);
1063         }
1064 
1065         @Override
1066         protected void _setMedium(int index, int value) {
1067             rootParent()._setMedium(idx(index), value);
1068         }
1069 
1070         @Override
1071         protected void _setMediumLE(int index, int value) {
1072             rootParent()._setMediumLE(idx(index), value);
1073         }
1074 
1075         @Override
1076         protected void _setInt(int index, int value) {
1077             rootParent()._setInt(idx(index), value);
1078         }
1079 
1080         @Override
1081         protected void _setIntLE(int index, int value) {
1082             rootParent()._setIntLE(idx(index), value);
1083         }
1084 
1085         @Override
1086         protected void _setLong(int index, long value) {
1087             rootParent()._setLong(idx(index), value);
1088         }
1089 
1090         @Override
1091         protected void _setLongLE(int index, long value) {
1092             rootParent().setLongLE(idx(index), value);
1093         }
1094 
1095         @Override
1096         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1097             checkIndex(index, length);
1098             rootParent().setBytes(idx(index), src, srcIndex, length);
1099             return this;
1100         }
1101 
1102         @Override
1103         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1104             checkIndex(index, length);
1105             rootParent().setBytes(idx(index), src, srcIndex, length);
1106             return this;
1107         }
1108 
1109         @Override
1110         public ByteBuf setBytes(int index, ByteBuffer src) {
1111             checkIndex(index, src.remaining());
1112             rootParent().setBytes(idx(index), src);
1113             return this;
1114         }
1115 
1116         @Override
1117         public ByteBuf getBytes(int index, OutputStream out, int length)
1118                 throws IOException {
1119             checkIndex(index, length);
1120             if (length != 0) {
1121                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1122             }
1123             return this;
1124         }
1125 
1126         @Override
1127         public int getBytes(int index, GatheringByteChannel out, int length)
1128                 throws IOException {
1129             return out.write(internalNioBuffer(index, length).duplicate());
1130         }
1131 
1132         @Override
1133         public int getBytes(int index, FileChannel out, long position, int length)
1134                 throws IOException {
1135             return out.write(internalNioBuffer(index, length).duplicate(), position);
1136         }
1137 
1138         @Override
1139         public int setBytes(int index, InputStream in, int length)
1140                 throws IOException {
1141             checkIndex(index, length);
1142             final AbstractByteBuf rootParent = rootParent();
1143             if (rootParent.hasArray()) {
1144                 return rootParent.setBytes(idx(index), in, length);
1145             }
1146             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1147             int readBytes = in.read(tmp, 0, length);
1148             if (readBytes <= 0) {
1149                 return readBytes;
1150             }
1151             setBytes(index, tmp, 0, readBytes);
1152             return readBytes;
1153         }
1154 
1155         @Override
1156         public int setBytes(int index, ScatteringByteChannel in, int length)
1157                 throws IOException {
1158             try {
1159                 return in.read(internalNioBuffer(index, length).duplicate());
1160             } catch (ClosedChannelException ignored) {
1161                 return -1;
1162             }
1163         }
1164 
1165         @Override
1166         public int setBytes(int index, FileChannel in, long position, int length)
1167                 throws IOException {
1168             try {
1169                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1170             } catch (ClosedChannelException ignored) {
1171                 return -1;
1172             }
1173         }
1174 
1175         @Override
1176         public int forEachByte(int index, int length, ByteProcessor processor) {
1177             checkIndex(index, length);
1178             int ret = rootParent().forEachByte(idx(index), length, processor);
1179             return forEachResult(ret);
1180         }
1181 
1182         @Override
1183         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1184             checkIndex(index, length);
1185             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1186             return forEachResult(ret);
1187         }
1188 
1189         private int forEachResult(int ret) {
1190             if (ret < adjustment) {
1191                 return -1;
1192             }
1193             return ret - adjustment;
1194         }
1195 
1196         @Override
1197         public boolean isContiguous() {
1198             return rootParent().isContiguous();
1199         }
1200 
1201         private int idx(int index) {
1202             return index + adjustment;
1203         }
1204 
1205         @Override
1206         protected void deallocate() {
1207             if (chunk != null) {
1208                 chunk.release();
1209             }
1210             tmpNioBuf = null;
1211             chunk = null;
1212             rootParent = null;
1213             if (handle instanceof Recycler.EnhancedHandle) {
1214                 ((Recycler.EnhancedHandle<?>) handle).unguardedRecycle(this);
1215             } else if (handle != null) {
1216                 handle.recycle(this);
1217             }
1218         }
1219     }
1220 
1221     /**
1222      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1223      */
1224     interface ChunkAllocator {
1225         /**
1226          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1227          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1228          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1229          * @return The buffer that represents the chunk memory.
1230          */
1231         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1232     }
1233 }