View Javadoc
1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SuppressJava6Requirement;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.ThreadExecutorMap;
32  import io.netty.util.internal.UnstableApi;
33  
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.io.OutputStream;
37  import java.nio.ByteBuffer;
38  import java.nio.ByteOrder;
39  import java.nio.channels.ClosedChannelException;
40  import java.nio.channels.FileChannel;
41  import java.nio.channels.GatheringByteChannel;
42  import java.nio.channels.ScatteringByteChannel;
43  import java.util.Arrays;
44  import java.util.Queue;
45  import java.util.Set;
46  import java.util.concurrent.ConcurrentLinkedQueue;
47  import java.util.concurrent.CopyOnWriteArraySet;
48  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49  import java.util.concurrent.atomic.AtomicLong;
50  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
51  import java.util.concurrent.locks.StampedLock;
52  
53  /**
54   * An auto-tuning pooling allocator, that follows an anti-generational hypothesis.
55   * <p>
56   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that they allocate buffers
57   * from.
58   * <p>
59   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
60   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
 * If contention is detected above a certain threshold, the number of magazines is increased in response to the
 * contention.
63   * <p>
64   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
65   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
66   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
67   * <p>
 * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
 * done is also adapted to the allocation pattern. If a newly computed preferred chunk size is the same as the
 * previous preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
71   * <p>
72   * This allows the allocator to quickly respond to changes in the application workload,
73   * without suffering undue overhead from maintaining its statistics.
74   * <p>
 * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from any
 * magazine to be shared with other magazines.
 * This bounded queue is created by the private {@link #createSharedChunkQueue()} method.
78   */
79  @SuppressJava6Requirement(reason = "Guarded by version check")
80  @UnstableApi
81  final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
82  
83      enum MagazineCaching {
84          EventLoopThreads,
85          FastThreadLocalThreads,
86          None
87      }
88  
89      private static final int EXPANSION_ATTEMPTS = 3;
90      private static final int INITIAL_MAGAZINES = 4;
91      private static final int RETIRE_CAPACITY = 4 * 1024;
92      private static final int MIN_CHUNK_SIZE = 128 * 1024;
93      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
94      private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
95  
96      /**
97       * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
98       * <p>
99       * This number is 10 MiB, and is derived from the limitations of internal histograms.
100      */
101     private static final int MAX_CHUNK_SIZE =
102             BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
103 
104     /**
105      * The capacity if the central queue that allow chunks to be shared across magazines.
106      * The default size is {@link NettyRuntime#availableProcessors()},
107      * and the maximum number of magazines is twice this.
108      * <p>
109      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
110      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes.
111      */
112     private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
113             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
114 
115     /**
116      * The capacity if the magazine local buffer queue. This queue just pools the outer ByteBuf instance and not
117      * the actual memory and so helps to reduce GC pressure.
118      */
119     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
120             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
121 
122     private static final Object NO_MAGAZINE = Boolean.TRUE;
123 
124     private final ChunkAllocator chunkAllocator;
125     private final Queue<Chunk> centralQueue;
126     private final StampedLock magazineExpandLock;
127     private volatile Magazine[] magazines;
128     private final FastThreadLocal<Object> threadLocalMagazine;
129     private final Set<Magazine> liveCachedMagazines;
130     private volatile boolean freed;
131 
132     static {
133         if (CENTRAL_QUEUE_CAPACITY < 2) {
134             throw new IllegalArgumentException("CENTRAL_QUEUE_CAPACITY: " + CENTRAL_QUEUE_CAPACITY
135                     + " (expected: >= " + 2 + ')');
136         }
137         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
138             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
139                     + " (expected: >= " + 2 + ')');
140         }
141     }
142 
143     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
144         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
145         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
146         this.chunkAllocator = chunkAllocator;
147         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
148         magazineExpandLock = new StampedLock();
149         if (magazineCaching != MagazineCaching.None) {
150             assert magazineCaching == MagazineCaching.EventLoopThreads ||
151                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
152             final boolean cachedMagazinesNonEventLoopThreads =
153                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
154             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
155             threadLocalMagazine = new FastThreadLocal<Object>() {
156                 @Override
157                 protected Object initialValue() {
158                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
159                         if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
160                             // To prevent potential leak, we will not use thread-local magazine.
161                             return NO_MAGAZINE;
162                         }
163                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
164                         liveMagazines.add(mag);
165                         return mag;
166                     }
167                     return NO_MAGAZINE;
168                 }
169 
170                 @Override
171                 protected void onRemoval(final Object value) throws Exception {
172                     if (value != NO_MAGAZINE) {
173                         liveMagazines.remove(value);
174                     }
175                 }
176             };
177             liveCachedMagazines = liveMagazines;
178         } else {
179             threadLocalMagazine = null;
180             liveCachedMagazines = null;
181         }
182         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
183         for (int i = 0; i < mags.length; i++) {
184             mags[i] = new Magazine(this);
185         }
186         magazines = mags;
187     }
188 
189     /**
190      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
191      * internal Magazines.
192      * <p>
193      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
194      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
195      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
196      * allocate new chunks when their current and next-in-line chunks have both been used up.
197      * <p>
198      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
199      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
200      * queue.
201      * <p>
202      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
203      * queue to limit the maximum memory usage.
204      * <p>
205      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
206      *
207      * @return A new multi-producer, multi-consumer queue.
208      */
209     private static Queue<Chunk> createSharedChunkQueue() {
210         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
211     }
212 
213     @Override
214     public ByteBuf allocate(int size, int maxCapacity) {
215         return allocate(size, maxCapacity, Thread.currentThread(), null);
216     }
217 
218     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
219         if (size <= MAX_CHUNK_SIZE) {
220             int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
221             FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
222             if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
223                 Object mag = threadLocalMagazine.get();
224                 if (mag != NO_MAGAZINE) {
225                     Magazine magazine = (Magazine) mag;
226                     if (buf == null) {
227                         buf = magazine.newBuffer();
228                     }
229                     boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
230                     assert allocated : "Allocation of threadLocalMagazine must always succeed";
231                     return buf;
232                 }
233             }
234             long threadId = currentThread.getId();
235             Magazine[] mags;
236             int expansions = 0;
237             do {
238                 mags = magazines;
239                 int mask = mags.length - 1;
240                 int index = (int) (threadId & mask);
241                 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
242                     Magazine mag = mags[index + i & mask];
243                     if (buf == null) {
244                         buf = mag.newBuffer();
245                     }
246                     if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
247                         // Was able to allocate.
248                         return buf;
249                     }
250                 }
251                 expansions++;
252             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
253         }
254 
255         // The magazines failed us, or the buffer is too big to be pooled.
256         return allocateFallback(size, maxCapacity, currentThread, buf);
257     }
258 
259     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
260         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
261         Magazine magazine;
262         if (buf != null) {
263             Chunk chunk = buf.chunk;
264             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
265                 magazine = getFallbackMagazine(currentThread);
266             }
267         } else {
268             magazine = getFallbackMagazine(currentThread);
269             buf = magazine.newBuffer();
270         }
271         // Create a one-off chunk for this allocation.
272         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
273         Chunk chunk = new Chunk(innerChunk, magazine, false);
274         try {
275             chunk.readInitInto(buf, size, maxCapacity);
276         } finally {
277             // As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
278             // will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
279             // completely release the Chunk and so the contained innerChunk.
280             chunk.release();
281         }
282         return buf;
283     }
284 
285     private Magazine getFallbackMagazine(Thread currentThread) {
286         Object tlMag;
287         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
288         if (threadLocalMagazine != null &&
289                 currentThread instanceof FastThreadLocalThread &&
290                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
291             return (Magazine) tlMag;
292         }
293         Magazine[] mags = magazines;
294         return mags[(int) currentThread.getId() & mags.length - 1];
295     }
296 
297     /**
298      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
299      */
300     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
301         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
302         assert result == into: "Re-allocation created separate buffer instance";
303     }
304 
305     @Override
306     public long usedMemory() {
307         long sum = 0;
308         for (Chunk chunk : centralQueue) {
309             sum += chunk.capacity();
310         }
311         for (Magazine magazine : magazines) {
312             sum += magazine.usedMemory.get();
313         }
314         if (liveCachedMagazines != null) {
315             for (Magazine magazine : liveCachedMagazines) {
316                 sum += magazine.usedMemory.get();
317             }
318         }
319         return sum;
320     }
321 
322     private boolean tryExpandMagazines(int currentLength) {
323         if (currentLength >= MAX_STRIPES) {
324             return true;
325         }
326         final Magazine[] mags;
327         long writeLock = magazineExpandLock.tryWriteLock();
328         if (writeLock != 0) {
329             try {
330                 mags = magazines;
331                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
332                     return true;
333                 }
334                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
335                 Magazine[] expanded = new Magazine[mags.length * 2];
336                 for (int i = 0, l = expanded.length; i < l; i++) {
337                     Magazine m = new Magazine(this);
338                     m.localPrefChunkSize = preferredChunkSize;
339                     m.sharedPrefChunkSize = preferredChunkSize;
340                     expanded[i] = m;
341                 }
342                 magazines = expanded;
343             } finally {
344                 magazineExpandLock.unlockWrite(writeLock);
345             }
346             for (Magazine magazine : mags) {
347                 magazine.free();
348             }
349         }
350         return true;
351     }
352 
353     private boolean offerToQueue(Chunk buffer) {
354         if (freed) {
355             return false;
356         }
357         // The Buffer should not be used anymore, let's add an assert to so we guard against bugs in the future.
358         assert buffer.allocatedBytes == 0;
359         assert buffer.magazine == null;
360 
361         boolean isAdded = centralQueue.offer(buffer);
362         if (freed && isAdded) {
363             // Help to free the centralQueue.
364             freeCentralQueue();
365         }
366         return isAdded;
367     }
368 
369     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
370     // we might end up with leaks. While these leaks are usually harmless in reality it would still at least be
371     // very confusing for users.
372     @Override
373     protected void finalize() throws Throwable {
374         try {
375             super.finalize();
376         } finally {
377             free();
378         }
379     }
380 
381     private void free() {
382         freed = true;
383         long stamp = magazineExpandLock.writeLock();
384         try {
385             Magazine[] mags = magazines;
386             for (Magazine magazine : mags) {
387                 magazine.free();
388             }
389         } finally {
390             magazineExpandLock.unlockWrite(stamp);
391         }
392         freeCentralQueue();
393     }
394 
395     private void freeCentralQueue() {
396         for (;;) {
397             Chunk chunk = centralQueue.poll();
398             if (chunk == null) {
399                 break;
400             }
401             chunk.release();
402         }
403     }
404 
405     static int sizeBucket(int size) {
406         return AllocationStatistics.sizeBucket(size);
407     }
408 
409     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
410     private static class AllocationStatistics {
411         private static final int MIN_DATUM_TARGET = 1024;
412         private static final int MAX_DATUM_TARGET = 65534;
413         private static final int INIT_DATUM_TARGET = 9;
414         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
415         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB bytes in size.
416         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
417         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
418         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
419 
420         protected final AdaptivePoolingAllocator parent;
421         private final boolean shareable;
422         private final short[][] histos = {
423                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
424                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
425         };
426         private short[] histo = histos[0];
427         private final int[] sums = new int[HISTO_BUCKET_COUNT];
428 
429         private int histoIndex;
430         private int datumCount;
431         private int datumTarget = INIT_DATUM_TARGET;
432         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
433         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
434 
435         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
436             this.parent = parent;
437             this.shareable = shareable;
438         }
439 
440         protected void recordAllocationSize(int bucket) {
441             histo[bucket]++;
442             if (datumCount++ == datumTarget) {
443                 rotateHistograms();
444             }
445         }
446 
447         static int sizeBucket(int size) {
448             if (size == 0) {
449                 return 0;
450             }
451             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
452             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
453             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
454             // which is a magical number for JIT optimisations.
455             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
456             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
457         }
458 
459         private void rotateHistograms() {
460             short[][] hs = histos;
461             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
462                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
463             }
464             int sum = 0;
465             for (int count : sums) {
466                 sum  += count;
467             }
468             int targetPercentile = (int) (sum * 0.99);
469             int sizeBucket = 0;
470             for (; sizeBucket < sums.length; sizeBucket++) {
471                 if (sums[sizeBucket] > targetPercentile) {
472                     break;
473                 }
474                 targetPercentile -= sums[sizeBucket];
475             }
476             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
477             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
478             localPrefChunkSize = prefChunkSize;
479             if (shareable) {
480                 for (Magazine mag : parent.magazines) {
481                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
482                 }
483             }
484             if (sharedPrefChunkSize != prefChunkSize) {
485                 // Preferred chunk size changed. Increase check frequency.
486                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
487                 sharedPrefChunkSize = prefChunkSize;
488             } else {
489                 // Preferred chunk size did not change. Check less often.
490                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
491             }
492 
493             histoIndex = histoIndex + 1 & 3;
494             histo = histos[histoIndex];
495             datumCount = 0;
496             Arrays.fill(histo, (short) 0);
497         }
498 
499         /**
500          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
501          * allocation sizes.
502          * <p>
503          * This method must be thread-safe.
504          *
505          * @return The currently preferred chunk allocation size.
506          */
507         protected int preferredChunkSize() {
508             return sharedPrefChunkSize;
509         }
510     }
511 
512     @SuppressJava6Requirement(reason = "Guarded by version check")
513     private static final class Magazine extends AllocationStatistics {
514         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
515         static {
516             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
517         }
518         private static final Chunk MAGAZINE_FREED = new Chunk();
519 
520         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
521                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
522                     @Override
523                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
524                         return new AdaptiveByteBuf(handle);
525                     }
526                 });
527 
528         private Chunk current;
529         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
530         private volatile Chunk nextInLine;
531         private final AtomicLong usedMemory;
532         private final StampedLock allocationLock;
533         private final Queue<AdaptiveByteBuf> bufferQueue;
534         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
535 
536         Magazine(AdaptivePoolingAllocator parent) {
537             this(parent, true);
538         }
539 
540         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
541             super(parent, shareable);
542 
543             if (shareable) {
544                 // We only need the StampedLock if this Magazine will be shared across threads.
545                 allocationLock = new StampedLock();
546                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
547                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
548                     @Override
549                     public void recycle(AdaptiveByteBuf self) {
550                         bufferQueue.offer(self);
551                     }
552                 };
553             } else {
554                 allocationLock = null;
555                 bufferQueue = null;
556                 handle = null;
557             }
558             usedMemory = new AtomicLong();
559         }
560 
561         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
562             if (allocationLock == null) {
563                 // This magazine is not shared across threads, just allocate directly.
564                 return allocate(size, sizeBucket, maxCapacity, buf);
565             }
566 
567             // Try to retrieve the lock and if successful allocate.
568             long writeLock = allocationLock.tryWriteLock();
569             if (writeLock != 0) {
570                 try {
571                     return allocate(size, sizeBucket, maxCapacity, buf);
572                 } finally {
573                     allocationLock.unlockWrite(writeLock);
574                 }
575             }
576             return false;
577         }
578 
        /**
         * Allocates {@code size} bytes into {@code buf} from this magazine's chunks. For shareable magazines,
         * {@code tryAllocate} calls this while holding {@code allocationLock}. For thread-local magazines it calls
         * this directly.
         * <p>
         * Chunk sources are tried in order: the current chunk, the next-in-line chunk, the parent's central queue,
         * and finally a newly allocated chunk.
         *
         * @param size the number of bytes requested.
         * @param sizeBucket the histogram bucket of {@code size}, recorded in the allocation statistics.
         * @param maxCapacity the maximum capacity of {@code buf}.
         * @param buf the buffer to initialize.
         * @return {@code true} on success. Returns {@code false} only when the next-in-line slot holds the
         *         {@code MAGAZINE_FREED} marker (this magazine was freed); {@code buf} is then not initialized.
         */
        private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
            recordAllocationSize(sizeBucket);
            Chunk curr = current;
            if (curr != null) {
                // We have a Chunk that has some space left.
                if (curr.remainingCapacity() > size) {
                    curr.readInitInto(buf, size, maxCapacity);
                    // We still have some bytes left that we can use for the next allocation, just early return.
                    return true;
                }

                // At this point we know that this will be the last time current will be used, so directly set it to
                // null and release it once we are done.
                current = null;
                if (curr.remainingCapacity() == size) {
                    try {
                        curr.readInitInto(buf, size, maxCapacity);
                        return true;
                    } finally {
                        curr.release();
                    }
                }

                // From here on curr is too small for this request (remaining capacity < size).
                // Check if we either retain the chunk in the nextInLine cache or release it.
                if (curr.remainingCapacity() < RETIRE_CAPACITY) {
                    curr.release();
                } else {
                    // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
                    // This method will release curr if this is not the case.
                    // NOTE(review): whenever curr ends up in nextInLine here, the nextInLine branch below takes it
                    // straight back out. Its remaining capacity is smaller than size, so that branch releases it. The
                    // retention therefore only outlives this call if nextInLine is changed concurrently in between.
                    // Confirm whether this is intended, or whether the nextInLine branch should retain too-small
                    // chunks the way the central-queue branch does.
                    transferToNextInLineOrRelease(curr);
                }
            }

            assert current == null;
            // The fast-path for allocations did not work.
            //
            // Try to fetch the next "Magazine local" Chunk first. If this fails because we don't have one set up,
            // we poll the centralQueue. If this fails as well, we allocate a new Chunk.
            //
            // In any case we will store the Chunk as the current so it will be used again for the next allocation and
            // so be "reserved" by this Magazine for exclusive usage.
            if (nextInLine != null) {
                curr = NEXT_IN_LINE.getAndSet(this, null);
                if (curr == MAGAZINE_FREED) {
                    // Allocation raced with a stripe-resize that freed this magazine.
                    restoreMagazineFreed();
                    return false;
                }

                if (curr.remainingCapacity() > size) {
                    // We have a Chunk that has some space left.
                    curr.readInitInto(buf, size, maxCapacity);
                    current = curr;
                    return true;
                }

                if (curr.remainingCapacity() == size) {
                    // This allocation uses up curr completely, so release it once we are done. curr is never
                    // installed as current on this path.
                    try {
                        curr.readInitInto(buf, size, maxCapacity);
                        return true;
                    } finally {
                        // Release in a finally block so even if readInitInto(...) would throw we would still correctly
                        // release the chunk.
                        curr.release();
                    }
                } else {
                    // Release it as it's too small.
                    curr.release();
                }
            }

            // Now try to poll from the central queue first
            curr = parent.centralQueue.poll();
            if (curr == null) {
                curr = newChunkAllocation(size);
            } else {
                curr.attachToMagazine(this);

                if (curr.remainingCapacity() < size) {
                    // Check if we either retain the chunk in the nextInLine cache or release it.
                    if (curr.remainingCapacity() < RETIRE_CAPACITY) {
                        curr.release();
                    } else {
                        // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
                        // This method will release curr if this is not the case.
                        transferToNextInLineOrRelease(curr);
                    }
                    curr = newChunkAllocation(size);
                }
            }

            current = curr;
            try {
                // Assumes newChunkAllocation(size) always yields at least size bytes; not visible here, TODO confirm.
                assert current.remainingCapacity() >= size;
                if (curr.remainingCapacity() > size) {
                    curr.readInitInto(buf, size, maxCapacity);
                    // curr stays installed as current for future allocations; do not release it below.
                    curr = null;
                } else {
                    curr.readInitInto(buf, size, maxCapacity);
                }
            } finally {
                if (curr != null) {
                    // Release in a finally block so even if readInitInto(...) would throw we would still correctly
                    // release the current chunk before null it out.
                    curr.release();
                    current = null;
                }
            }
            return true;
        }
691 
692         private void restoreMagazineFreed() {
693             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
694             if (next != null && next != MAGAZINE_FREED) {
695                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
696             }
697         }
698 
699         private void transferToNextInLineOrRelease(Chunk chunk) {
700             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
701                 return;
702             }
703 
704             Chunk nextChunk = NEXT_IN_LINE.get(this);
705             if (nextChunk != null && nextChunk != MAGAZINE_FREED
706                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
707                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
708                     nextChunk.release();
709                     return;
710                 }
711             }
712             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
713             // by some buffers and so is attached to a Magazine.
714             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
715             // as last resort.
716             chunk.release();
717         }
718 
719         private Chunk newChunkAllocation(int promptingSize) {
720             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
721             ChunkAllocator chunkAllocator = parent.chunkAllocator;
722             return new Chunk(chunkAllocator.allocate(size, size), this, true);
723         }
724 
725         boolean trySetNextInLine(Chunk chunk) {
726             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
727         }
728 
729         void free() {
730             // Release the current Chunk and the next that was stored for later usage.
731             restoreMagazineFreed();
732             long stamp = allocationLock.writeLock();
733             try {
734                 if (current != null) {
735                     current.release();
736                     current = null;
737                 }
738             } finally {
739                 allocationLock.unlockWrite(stamp);
740             }
741         }
742 
743         public AdaptiveByteBuf newBuffer() {
744             AdaptiveByteBuf buf;
745             if (handle == null) {
746                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
747             } else {
748                 buf = bufferQueue.poll();
749                 if (buf == null) {
750                     buf = new AdaptiveByteBuf(handle);
751                 }
752             }
753             buf.resetRefCnt();
754             buf.discardMarks();
755             return buf;
756         }
757     }
758 
    /**
     * A slab of memory (the {@code delegate} buffer) from which {@link AdaptiveByteBuf}s are carved sequentially.
     * <p>
     * {@link #readInitInto(AdaptiveByteBuf, int, int)} hands out the next {@code size} bytes and retains this chunk
     * once on behalf of each buffer. When the reference count drops to zero, {@link #deallocate()} either frees the
     * delegate, or resets this chunk for reuse through its magazine's next-in-line slot or the allocator's central
     * queue.
     * <p>
     * While attached to a {@link Magazine}, the chunk's capacity is added to that magazine's {@code usedMemory}.
     */
    private static final class Chunk implements ReferenceCounted {

        // Buffer holding this chunk's memory; null only for the MAGAZINE_FREED sentinel.
        private final AbstractByteBuf delegate;
        // Magazine currently accounting for this chunk, or null while detached.
        private Magazine magazine;
        // Allocator that created this chunk (the parent of the magazine it was first attached to).
        private final AdaptivePoolingAllocator allocator;
        // Capacity of the delegate, captured at construction.
        private final int capacity;
        // Non-pooled chunks are always freed in deallocate() rather than recycled.
        private final boolean pooled;
        // Number of bytes already handed out; the next buffer starts at this offset within the delegate.
        private int allocatedBytes;
        private static final long REFCNT_FIELD_OFFSET =
                ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
        private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");

        // Reference-count helper bound to the refCnt field below.
        private static final ReferenceCountUpdater<Chunk> updater =
                new ReferenceCountUpdater<Chunk>() {
                    @Override
                    protected AtomicIntegerFieldUpdater<Chunk> updater() {
                        return AIF_UPDATER;
                    }
                    @Override
                    protected long unsafeOffset() {
                        return REFCNT_FIELD_OFFSET;
                    }
                };

        // Value might not equal "real" reference count, all access should be via the updater
        @SuppressWarnings({"unused", "FieldMayBeFinal"})
        private volatile int refCnt;

        Chunk() {
            // Constructor only used by the MAGAZINE_FREED sentinel.
            delegate = null;
            magazine = null;
            allocator = null;
            capacity = 0;
            pooled = false;
        }

        /**
         * Creates a chunk over {@code delegate} and attaches it to {@code magazine}, which adds its capacity to the
         * magazine's used-memory counter.
         *
         * @param delegate the buffer providing this chunk's memory
         * @param magazine the magazine the chunk starts out attached to; its parent becomes {@link #allocator}
         * @param pooled   whether the chunk may be recycled once fully released
         */
        Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
            this.delegate = delegate;
            this.pooled = pooled;
            capacity = delegate.capacity();
            updater.setInitialValue(this);
            allocator = magazine.parent;
            attachToMagazine(magazine);
        }

        Magazine currentMagazine()  {
            return magazine;
        }

        /**
         * Removes this chunk's capacity from its magazine's used-memory counter and forgets the magazine.
         * No-op if already detached.
         */
        void detachFromMagazine() {
            if (magazine != null) {
                magazine.usedMemory.getAndAdd(-capacity);
                magazine = null;
            }
        }

        /**
         * Attaches this (currently detached) chunk to {@code magazine} and adds its capacity to the magazine's
         * used-memory counter.
         */
        void attachToMagazine(Magazine magazine) {
            assert this.magazine == null;
            this.magazine = magazine;
            magazine.usedMemory.getAndAdd(capacity);
        }

        @Override
        public Chunk touch(Object hint) {
            return this;
        }

        @Override
        public int refCnt() {
            return updater.refCnt(this);
        }

        @Override
        public Chunk retain() {
            return updater.retain(this);
        }

        @Override
        public Chunk retain(int increment) {
            return updater.retain(this, increment);
        }

        @Override
        public Chunk touch() {
            return this;
        }

        @Override
        public boolean release() {
            if (updater.release(this)) {
                deallocate();
                return true;
            }
            return false;
        }

        @Override
        public boolean release(int decrement) {
            if (updater.release(this, decrement)) {
                deallocate();
                return true;
            }
            return false;
        }

        /**
         * Invoked once the reference count reached zero.
         * <p>
         * Either frees the delegate, or resets this chunk (reference count, reader/writer index, allocation offset)
         * so it can be reused. Reuse is attempted first by parking the chunk in its magazine's next-in-line slot,
         * then by offering it to the allocator's central queue. If neither accepts it, the chunk is released for
         * good.
         * <p>
         * NOTE(review): reads {@code magazine.parent} unconditionally, so this assumes the chunk is still attached to
         * a magazine at this point. Confirm that detached chunks can never reach a zero reference count through
         * release().
         */
        private void deallocate() {
            Magazine mag = magazine;
            AdaptivePoolingAllocator parent = mag.parent;
            int chunkSize = mag.preferredChunkSize();
            int memSize = delegate.capacity();
            if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
                // Drop the chunk if it was created as non-pooled, or if the chunk is smaller than the
                // preferred chunk size, or over 50% larger than the preferred chunk size.
                detachFromMagazine();
                delegate.release();
            } else {
                // Recycle: revive the reference count and rewind both the delegate indexes and the allocation offset.
                updater.resetRefCnt(this);
                delegate.setIndex(0, 0);
                allocatedBytes = 0;
                if (!mag.trySetNextInLine(this)) {
                    // As this Chunk does not belong to the mag anymore we need to decrease the used memory.
                    detachFromMagazine();
                    if (!parent.offerToQueue(this)) {
                        // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
                        // which did increase the reference count by 1.
                        boolean released = updater.release(this);
                        delegate.release();
                        assert released;
                    }
                }
            }
        }

        /**
         * Carves the next {@code size} bytes out of this chunk and initializes {@code buf} as a window over them,
         * retaining this chunk on behalf of {@code buf}.
         * <p>
         * This method does not check {@link #remainingCapacity()}; callers must. If {@code buf.init(...)} throws,
         * the allocation offset is restored and the extra reference is released again.
         *
         * @param buf         the buffer to initialize
         * @param size        the number of bytes to hand out, which also becomes the buffer's capacity
         * @param maxCapacity the maximum capacity to give {@code buf}
         */
        public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
            int startIndex = allocatedBytes;
            allocatedBytes = startIndex + size;
            Chunk chunk = this;
            chunk.retain();
            try {
                buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
                chunk = null;
            } finally {
                if (chunk != null) {
                    // If chunk is not null we know that buf.init(...) failed and so we need to manually release
                    // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
                    // restore the old allocatedBytes value.
                    allocatedBytes = startIndex;
                    chunk.release();
                }
            }
        }

        /** Returns the number of bytes not yet handed out by {@link #readInitInto(AdaptiveByteBuf, int, int)}. */
        public int remainingCapacity() {
            return capacity - allocatedBytes;
        }

        /** Returns the total capacity of this chunk, as captured from the delegate at construction. */
        public int capacity() {
            return capacity;
        }
    }
921 
922     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
923 
924         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
925 
926         private int adjustment;
927         private AbstractByteBuf rootParent;
928         Chunk chunk;
929         private int length;
930         private ByteBuffer tmpNioBuf;
931         private boolean hasArray;
932         private boolean hasMemoryAddress;
933 
934         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
935             super(0);
936             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
937         }
938 
939         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
940                   int adjustment, int capacity, int maxCapacity) {
941             this.adjustment = adjustment;
942             chunk = wrapped;
943             length = capacity;
944             maxCapacity(maxCapacity);
945             setIndex0(readerIndex, writerIndex);
946             hasArray = unwrapped.hasArray();
947             hasMemoryAddress = unwrapped.hasMemoryAddress();
948             rootParent = unwrapped;
949             tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
950         }
951 
952         private AbstractByteBuf rootParent() {
953             final AbstractByteBuf rootParent = this.rootParent;
954             if (rootParent != null) {
955                 return rootParent;
956             }
957             throw new IllegalReferenceCountException();
958         }
959 
960         @Override
961         public int capacity() {
962             return length;
963         }
964 
965         @Override
966         public ByteBuf capacity(int newCapacity) {
967             if (newCapacity == capacity()) {
968                 ensureAccessible();
969                 return this;
970             }
971             checkNewCapacity(newCapacity);
972             if (newCapacity < capacity()) {
973                 length = newCapacity;
974                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
975                 return this;
976             }
977 
978             // Reallocation required.
979             ByteBuffer data = tmpNioBuf;
980             data.clear();
981             tmpNioBuf = null;
982             Chunk chunk = this.chunk;
983             AdaptivePoolingAllocator allocator = chunk.allocator;
984             int readerIndex = this.readerIndex;
985             int writerIndex = this.writerIndex;
986             allocator.allocate(newCapacity, maxCapacity(), this);
987             tmpNioBuf.put(data);
988             tmpNioBuf.clear();
989             chunk.release();
990             this.readerIndex = readerIndex;
991             this.writerIndex = writerIndex;
992             return this;
993         }
994 
995         @Override
996         public ByteBufAllocator alloc() {
997             return rootParent().alloc();
998         }
999 
1000         @Override
1001         public ByteOrder order() {
1002             return rootParent().order();
1003         }
1004 
1005         @Override
1006         public ByteBuf unwrap() {
1007             return null;
1008         }
1009 
1010         @Override
1011         public boolean isDirect() {
1012             return rootParent().isDirect();
1013         }
1014 
1015         @Override
1016         public int arrayOffset() {
1017             return idx(rootParent().arrayOffset());
1018         }
1019 
1020         @Override
1021         public boolean hasMemoryAddress() {
1022             return hasMemoryAddress;
1023         }
1024 
1025         @Override
1026         public long memoryAddress() {
1027             ensureAccessible();
1028             return rootParent().memoryAddress() + adjustment;
1029         }
1030 
1031         @Override
1032         public ByteBuffer nioBuffer(int index, int length) {
1033             checkIndex(index, length);
1034             return rootParent().nioBuffer(idx(index), length);
1035         }
1036 
1037         @Override
1038         public ByteBuffer internalNioBuffer(int index, int length) {
1039             checkIndex(index, length);
1040             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1041         }
1042 
1043         private ByteBuffer internalNioBuffer() {
1044             return (ByteBuffer) tmpNioBuf.clear();
1045         }
1046 
1047         @Override
1048         public ByteBuffer[] nioBuffers(int index, int length) {
1049             checkIndex(index, length);
1050             return rootParent().nioBuffers(idx(index), length);
1051         }
1052 
1053         @Override
1054         public boolean hasArray() {
1055             return hasArray;
1056         }
1057 
1058         @Override
1059         public byte[] array() {
1060             ensureAccessible();
1061             return rootParent().array();
1062         }
1063 
1064         @Override
1065         public ByteBuf copy(int index, int length) {
1066             checkIndex(index, length);
1067             return rootParent().copy(idx(index), length);
1068         }
1069 
1070         @Override
1071         public int nioBufferCount() {
1072             return rootParent().nioBufferCount();
1073         }
1074 
1075         @Override
1076         protected byte _getByte(int index) {
1077             return rootParent()._getByte(idx(index));
1078         }
1079 
1080         @Override
1081         protected short _getShort(int index) {
1082             return rootParent()._getShort(idx(index));
1083         }
1084 
1085         @Override
1086         protected short _getShortLE(int index) {
1087             return rootParent()._getShortLE(idx(index));
1088         }
1089 
1090         @Override
1091         protected int _getUnsignedMedium(int index) {
1092             return rootParent()._getUnsignedMedium(idx(index));
1093         }
1094 
1095         @Override
1096         protected int _getUnsignedMediumLE(int index) {
1097             return rootParent()._getUnsignedMediumLE(idx(index));
1098         }
1099 
1100         @Override
1101         protected int _getInt(int index) {
1102             return rootParent()._getInt(idx(index));
1103         }
1104 
1105         @Override
1106         protected int _getIntLE(int index) {
1107             return rootParent()._getIntLE(idx(index));
1108         }
1109 
1110         @Override
1111         protected long _getLong(int index) {
1112             return rootParent()._getLong(idx(index));
1113         }
1114 
1115         @Override
1116         protected long _getLongLE(int index) {
1117             return rootParent()._getLongLE(idx(index));
1118         }
1119 
1120         @Override
1121         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1122             checkIndex(index, length);
1123             rootParent().getBytes(idx(index), dst, dstIndex, length);
1124             return this;
1125         }
1126 
1127         @Override
1128         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1129             checkIndex(index, length);
1130             rootParent().getBytes(idx(index), dst, dstIndex, length);
1131             return this;
1132         }
1133 
1134         @Override
1135         public ByteBuf getBytes(int index, ByteBuffer dst) {
1136             checkIndex(index, dst.remaining());
1137             rootParent().getBytes(idx(index), dst);
1138             return this;
1139         }
1140 
1141         @Override
1142         protected void _setByte(int index, int value) {
1143             rootParent()._setByte(idx(index), value);
1144         }
1145 
1146         @Override
1147         protected void _setShort(int index, int value) {
1148             rootParent()._setShort(idx(index), value);
1149         }
1150 
1151         @Override
1152         protected void _setShortLE(int index, int value) {
1153             rootParent()._setShortLE(idx(index), value);
1154         }
1155 
1156         @Override
1157         protected void _setMedium(int index, int value) {
1158             rootParent()._setMedium(idx(index), value);
1159         }
1160 
1161         @Override
1162         protected void _setMediumLE(int index, int value) {
1163             rootParent()._setMediumLE(idx(index), value);
1164         }
1165 
1166         @Override
1167         protected void _setInt(int index, int value) {
1168             rootParent()._setInt(idx(index), value);
1169         }
1170 
1171         @Override
1172         protected void _setIntLE(int index, int value) {
1173             rootParent()._setIntLE(idx(index), value);
1174         }
1175 
1176         @Override
1177         protected void _setLong(int index, long value) {
1178             rootParent()._setLong(idx(index), value);
1179         }
1180 
1181         @Override
1182         protected void _setLongLE(int index, long value) {
1183             rootParent().setLongLE(idx(index), value);
1184         }
1185 
1186         @Override
1187         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1188             checkIndex(index, length);
1189             rootParent().setBytes(idx(index), src, srcIndex, length);
1190             return this;
1191         }
1192 
1193         @Override
1194         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1195             checkIndex(index, length);
1196             rootParent().setBytes(idx(index), src, srcIndex, length);
1197             return this;
1198         }
1199 
1200         @Override
1201         public ByteBuf setBytes(int index, ByteBuffer src) {
1202             checkIndex(index, src.remaining());
1203             rootParent().setBytes(idx(index), src);
1204             return this;
1205         }
1206 
1207         @Override
1208         public ByteBuf getBytes(int index, OutputStream out, int length)
1209                 throws IOException {
1210             checkIndex(index, length);
1211             if (length != 0) {
1212                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1213             }
1214             return this;
1215         }
1216 
1217         @Override
1218         public int getBytes(int index, GatheringByteChannel out, int length)
1219                 throws IOException {
1220             return out.write(internalNioBuffer(index, length).duplicate());
1221         }
1222 
1223         @Override
1224         public int getBytes(int index, FileChannel out, long position, int length)
1225                 throws IOException {
1226             return out.write(internalNioBuffer(index, length).duplicate(), position);
1227         }
1228 
1229         @Override
1230         public int setBytes(int index, InputStream in, int length)
1231                 throws IOException {
1232             checkIndex(index, length);
1233             final AbstractByteBuf rootParent = rootParent();
1234             if (rootParent.hasArray()) {
1235                 return rootParent.setBytes(idx(index), in, length);
1236             }
1237             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1238             int readBytes = in.read(tmp, 0, length);
1239             if (readBytes <= 0) {
1240                 return readBytes;
1241             }
1242             setBytes(index, tmp, 0, readBytes);
1243             return readBytes;
1244         }
1245 
1246         @Override
1247         public int setBytes(int index, ScatteringByteChannel in, int length)
1248                 throws IOException {
1249             try {
1250                 return in.read(internalNioBuffer(index, length).duplicate());
1251             } catch (ClosedChannelException ignored) {
1252                 return -1;
1253             }
1254         }
1255 
1256         @Override
1257         public int setBytes(int index, FileChannel in, long position, int length)
1258                 throws IOException {
1259             try {
1260                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1261             } catch (ClosedChannelException ignored) {
1262                 return -1;
1263             }
1264         }
1265 
1266         @Override
1267         public int forEachByte(int index, int length, ByteProcessor processor) {
1268             checkIndex(index, length);
1269             int ret = rootParent().forEachByte(idx(index), length, processor);
1270             return forEachResult(ret);
1271         }
1272 
1273         @Override
1274         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1275             checkIndex(index, length);
1276             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1277             return forEachResult(ret);
1278         }
1279 
1280         private int forEachResult(int ret) {
1281             if (ret < adjustment) {
1282                 return -1;
1283             }
1284             return ret - adjustment;
1285         }
1286 
1287         @Override
1288         public boolean isContiguous() {
1289             return rootParent().isContiguous();
1290         }
1291 
1292         private int idx(int index) {
1293             return index + adjustment;
1294         }
1295 
1296         @Override
1297         protected void deallocate() {
1298             if (chunk != null) {
1299                 chunk.release();
1300             }
1301             tmpNioBuf = null;
1302             chunk = null;
1303             rootParent = null;
1304             if (handle instanceof EnhancedHandle) {
1305                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1306                 enhancedHandle.unguardedRecycle(this);
1307             } else {
1308                 handle.recycle(this);
1309             }
1310         }
1311     }
1312 
1313     /**
1314      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1315      */
1316     interface ChunkAllocator {
1317         /**
1318          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1319          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1320          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1321          * @return The buffer that represents the chunk memory.
1322          */
1323         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1324     }
1325 }