1   /*
2    * Copyright 2022 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.ByteProcessor;
19  import io.netty.util.IllegalReferenceCountException;
20  import io.netty.util.NettyRuntime;
21  import io.netty.util.Recycler.EnhancedHandle;
22  import io.netty.util.ReferenceCounted;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.concurrent.FastThreadLocalThread;
25  import io.netty.util.internal.ObjectPool;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReferenceCountUpdater;
29  import io.netty.util.internal.SystemPropertyUtil;
30  import io.netty.util.internal.ThreadExecutorMap;
31  import io.netty.util.internal.UnstableApi;
32  
33  import java.io.IOException;
34  import java.io.InputStream;
35  import java.io.OutputStream;
36  import java.nio.ByteBuffer;
37  import java.nio.ByteOrder;
38  import java.nio.channels.ClosedChannelException;
39  import java.nio.channels.FileChannel;
40  import java.nio.channels.GatheringByteChannel;
41  import java.nio.channels.ScatteringByteChannel;
42  import java.util.Arrays;
43  import java.util.Queue;
44  import java.util.Set;
45  import java.util.concurrent.ConcurrentLinkedQueue;
46  import java.util.concurrent.CopyOnWriteArraySet;
47  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
48  import java.util.concurrent.atomic.AtomicLong;
49  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50  import java.util.concurrent.locks.StampedLock;
51  
52  /**
53   * An auto-tuning pooling allocator that follows an anti-generational hypothesis.
54   * <p>
55   * The allocator is organized into a list of Magazines, and each magazine has a chunk-buffer that it allocates buffers
56   * from.
57   * <p>
58   * The magazines hold the mutexes that ensure the thread-safety of the allocator, and each thread picks a magazine
59   * based on the id of the thread. This spreads the contention of multi-threaded access across the magazines.
60   * If contention is detected above a certain threshold, the number of magazines is increased in response to the
61   * contention.
62   * <p>
63   * The magazines maintain histograms of the sizes of the allocations they do. The histograms are used to compute the
64   * preferred chunk size. The preferred chunk size is one that is big enough to service 10 allocations of the
65   * 99-percentile size. This way, the chunk size is adapted to the allocation patterns.
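     * For example, if the 99-percentile allocation size falls in the 64 KiB histogram bucket, the preferred chunk
     * size becomes 10 * 64 KiB = 640 KiB; the 128 KiB minimum chunk size and the 1 MiB top bucket bound the result
     * between 128 KiB and 10 MiB.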
66   * <p>
67   * Computing the preferred chunk size is a somewhat expensive operation. Therefore, the frequency with which this is
68   * done is also adapted to the allocation pattern. If a newly computed preferred chunk size is the same as the previous
69   * preferred chunk size, then the frequency is reduced. Otherwise, the frequency is increased.
70   * <p>
71   * This allows the allocator to quickly respond to changes in the application workload,
72   * without suffering undue overhead from maintaining its statistics.
73   * <p>
74   * Since magazines are "relatively thread-local", the allocator has a central queue that allows excess chunks from any
75   * magazine to be shared with other magazines.
76   * The {@link #createSharedChunkQueue()} method encapsulates how this queue is created.
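     * <p>
     * A minimal usage sketch (illustrative only; the {@code heapChunks} allocator below is a hypothetical example
     * implementation of {@link ChunkAllocator}, not something provided by this class):
     * <pre>{@code
     * ChunkAllocator heapChunks = new ChunkAllocator() {
     *     @Override
     *     public AbstractByteBuf allocate(int initialCapacity, int maxCapacity) {
     *         // Back each chunk with plain unpooled heap memory.
     *         return new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, initialCapacity, maxCapacity);
     *     }
     * };
     * AdaptivePoolingAllocator pool =
     *         new AdaptivePoolingAllocator(heapChunks, MagazineCaching.EventLoopThreads);
     * ByteBuf buf = pool.allocate(256, Integer.MAX_VALUE); // a chunk-backed, pooled buffer
     * buf.release(); // drops this buffer's reference to its backing chunk
     * }</pre>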
77   */
78  @UnstableApi
79  final class AdaptivePoolingAllocator {
80  
81      enum MagazineCaching {
82          EventLoopThreads,
83          FastThreadLocalThreads,
84          None
85      }
86  
87      private static final int EXPANSION_ATTEMPTS = 3;
88      private static final int INITIAL_MAGAZINES = 4;
89      private static final int RETIRE_CAPACITY = 4 * 1024;
90      private static final int MIN_CHUNK_SIZE = 128 * 1024;
91      private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
92      private static final int BUFS_PER_CHUNK = 10; // For large buffers, aim to have about this many buffers per chunk.
93  
94      /**
95       * The maximum size of a pooled chunk, in bytes. Allocations bigger than this will never be pooled.
96       * <p>
97       * This number is 10 MiB, and is derived from the limitations of internal histograms.
98       */
99      private static final int MAX_CHUNK_SIZE =
100             BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT); // 10 MiB.
101 
102     /**
103      * The capacity of the central queue that allows chunks to be shared across magazines.
104      * The default size is {@link NettyRuntime#availableProcessors()},
105      * and the maximum number of magazines is twice this.
106      * <p>
107      * This means the maximum amount of memory that we can have allocated-but-not-in-use is
108      * 5 * {@link NettyRuntime#availableProcessors()} * {@link #MAX_CHUNK_SIZE} bytes: the central queue can hold up to
     * {@link NettyRuntime#availableProcessors()} chunks, and each of the (at most) twice-as-many magazines can hold
     * a current and a next-in-line chunk.
109      */
110     private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
111             "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
112 
113     /**
114      * The capacity of the magazine-local buffer queue. This queue only pools the outer ByteBuf instances, not
115      * the actual memory, and so helps to reduce GC pressure.
116      */
117     private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
118             "io.netty.allocator.magazineBufferQueueCapacity", 1024);
119 
120     private static final Object NO_MAGAZINE = Boolean.TRUE;
121 
122     private final ChunkAllocator chunkAllocator;
123     private final Queue<Chunk> centralQueue;
124     private final StampedLock magazineExpandLock;
125     private volatile Magazine[] magazines;
126     private final FastThreadLocal<Object> threadLocalMagazine;
127     private final Set<Magazine> liveCachedMagazines;
128     private volatile boolean freed;
129 
130     static {
131         if (CENTRAL_QUEUE_CAPACITY < 2) {
132             throw new IllegalArgumentException("CENTRAL_QUEUE_CAPACITY: " + CENTRAL_QUEUE_CAPACITY
133                     + " (expected: >= " + 2 + ')');
134         }
135         if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
136             throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
137                     + " (expected: >= " + 2 + ')');
138         }
139     }
140 
141     AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
142         ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
143         ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
144         this.chunkAllocator = chunkAllocator;
145         centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
146         magazineExpandLock = new StampedLock();
147         if (magazineCaching != MagazineCaching.None) {
148             assert magazineCaching == MagazineCaching.EventLoopThreads ||
149                    magazineCaching == MagazineCaching.FastThreadLocalThreads;
150             final boolean cachedMagazinesNonEventLoopThreads =
151                     magazineCaching == MagazineCaching.FastThreadLocalThreads;
152             final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
153             threadLocalMagazine = new FastThreadLocal<Object>() {
154                 @Override
155                 protected Object initialValue() {
156                     if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
157                         if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
158                         // To prevent a potential leak, we will not use a thread-local magazine.
159                             return NO_MAGAZINE;
160                         }
161                         Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
162                         liveMagazines.add(mag);
163                         return mag;
164                     }
165                     return NO_MAGAZINE;
166                 }
167 
168                 @Override
169                 protected void onRemoval(final Object value) throws Exception {
170                     if (value != NO_MAGAZINE) {
171                         liveMagazines.remove(value);
172                     }
173                 }
174             };
175             liveCachedMagazines = liveMagazines;
176         } else {
177             threadLocalMagazine = null;
178             liveCachedMagazines = null;
179         }
180         Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
181         for (int i = 0; i < mags.length; i++) {
182             mags[i] = new Magazine(this);
183         }
184         magazines = mags;
185     }
186 
187     /**
188      * Create a thread-safe multi-producer, multi-consumer queue to hold chunks that spill over from the
189      * internal Magazines.
190      * <p>
191      * Each Magazine can only hold two chunks at any one time: the chunk it currently allocates from,
192      * and the next-in-line chunk which will be used for allocation once the current one has been used up.
193      * This queue will be used by magazines to share any excess chunks they allocate, so that they don't need to
194      * allocate new chunks when their current and next-in-line chunks have both been used up.
195      * <p>
196      * The simplest implementation of this method is to return a new {@link ConcurrentLinkedQueue}.
197      * However, the {@code CLQ} is unbounded, and this means there's no limit to how many chunks can be cached in this
198      * queue.
199      * <p>
200      * Each chunk in this queue can be up to {@link #MAX_CHUNK_SIZE} in size, so it is recommended to use a bounded
201      * queue to limit the maximum memory usage.
202      * <p>
203      * The default implementation will create a bounded queue with a capacity of {@link #CENTRAL_QUEUE_CAPACITY}.
204      *
205      * @return A new multi-producer, multi-consumer queue.
206      */
207     private static Queue<Chunk> createSharedChunkQueue() {
208         return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
209     }
210 
211     ByteBuf allocate(int size, int maxCapacity) {
212         return allocate(size, maxCapacity, Thread.currentThread(), null);
213     }
214 
215     private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
216         if (size <= MAX_CHUNK_SIZE) {
217             int sizeBucket = AllocationStatistics.sizeBucket(size); // Compute outside of Magazine lock for better ILP.
218             FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
219             if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
220                 Object mag = threadLocalMagazine.get();
221                 if (mag != NO_MAGAZINE) {
222                     Magazine magazine = (Magazine) mag;
223                     if (buf == null) {
224                         buf = magazine.newBuffer();
225                     }
226                     boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
227                     assert allocated : "Allocation of threadLocalMagazine must always succeed";
228                     return buf;
229                 }
230             }
231             long threadId = currentThread.getId();
232             Magazine[] mags;
233             int expansions = 0;
234             do {
235                 mags = magazines;
236                 int mask = mags.length - 1;
237                 int index = (int) (threadId & mask);
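                    // Probe log2(mags.length) magazines, starting at the stripe this thread's id maps to,
                    // before trying to expand the number of magazines.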
238                 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
239                     Magazine mag = mags[index + i & mask];
240                     if (buf == null) {
241                         buf = mag.newBuffer();
242                     }
243                     if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
244                         // Was able to allocate.
245                         return buf;
246                     }
247                 }
248                 expansions++;
249             } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
250         }
251 
252         // The magazines failed us, or the buffer is too big to be pooled.
253         return allocateFallback(size, maxCapacity, currentThread, buf);
254     }
255 
256     private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
257         // If we don't already have a buffer, obtain one from the most conveniently available magazine.
258         Magazine magazine;
259         if (buf != null) {
260             Chunk chunk = buf.chunk;
261             if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
262                 magazine = getFallbackMagazine(currentThread);
263             }
264         } else {
265             magazine = getFallbackMagazine(currentThread);
266             buf = magazine.newBuffer();
267         }
268         // Create a one-off chunk for this allocation.
269         AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
270         Chunk chunk = new Chunk(innerChunk, magazine, false);
271         try {
272             chunk.readInitInto(buf, size, maxCapacity);
273         } finally {
274                 // As the chunk is a one-off we always need to call release explicitly, as readInitInto(...)
275                 // will take care of retaining it once when successful. Once the AdaptiveByteBuf is released it will
276                 // completely release the Chunk and thus the contained innerChunk.
277             chunk.release();
278         }
279         return buf;
280     }
281 
282     private Magazine getFallbackMagazine(Thread currentThread) {
283         Object tlMag;
284         FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
285         if (threadLocalMagazine != null &&
286                 currentThread instanceof FastThreadLocalThread &&
287                 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
288             return (Magazine) tlMag;
289         }
290         Magazine[] mags = magazines;
291         return mags[(int) currentThread.getId() & mags.length - 1];
292     }
293 
294     /**
295      * Allocate into the given buffer. Used by {@link AdaptiveByteBuf#capacity(int)}.
296      */
297     void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
298         AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
299         assert result == into: "Re-allocation created separate buffer instance";
300     }
301 
302     long usedMemory() {
303         long sum = 0;
304         for (Chunk chunk : centralQueue) {
305             sum += chunk.capacity();
306         }
307         for (Magazine magazine : magazines) {
308             sum += magazine.usedMemory.get();
309         }
310         if (liveCachedMagazines != null) {
311             for (Magazine magazine : liveCachedMagazines) {
312                 sum += magazine.usedMemory.get();
313             }
314         }
315         return sum;
316     }
317 
318     private boolean tryExpandMagazines(int currentLength) {
319         if (currentLength >= MAX_STRIPES) {
320             return true;
321         }
322         final Magazine[] mags;
323         long writeLock = magazineExpandLock.tryWriteLock();
324         if (writeLock != 0) {
325             try {
326                 mags = magazines;
327                 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
328                     return true;
329                 }
330                 int preferredChunkSize = mags[0].sharedPrefChunkSize;
331                 Magazine[] expanded = new Magazine[mags.length * 2];
332                 for (int i = 0, l = expanded.length; i < l; i++) {
333                     Magazine m = new Magazine(this);
334                     m.localPrefChunkSize = preferredChunkSize;
335                     m.sharedPrefChunkSize = preferredChunkSize;
336                     expanded[i] = m;
337                 }
338                 magazines = expanded;
339             } finally {
340                 magazineExpandLock.unlockWrite(writeLock);
341             }
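                // Free the old magazines outside the expand lock; their cached chunks are released or spilled to
                // the central queue.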
342             for (Magazine magazine : mags) {
343                 magazine.free();
344             }
345         }
346         return true;
347     }
348 
349     private boolean offerToQueue(Chunk buffer) {
350         if (freed) {
351             return false;
352         }
353         // The Buffer should not be used anymore; assert this so we guard against bugs in the future.
354         assert buffer.allocatedBytes == 0;
355         assert buffer.magazine == null;
356 
357         boolean isAdded = centralQueue.offer(buffer);
358         if (freed && isAdded) {
359             // Help to free the centralQueue.
360             freeCentralQueue();
361         }
362         return isAdded;
363     }
364 
365     // Ensure that we release all previous pooled resources when this object is finalized. This is needed as otherwise
366     // we might end up with leaks. While these leaks are usually harmless in practice, they would still be
367     // very confusing for users.
368     @Override
369     protected void finalize() throws Throwable {
370         try {
371             super.finalize();
372         } finally {
373             free();
374         }
375     }
376 
377     private void free() {
378         freed = true;
379         long stamp = magazineExpandLock.writeLock();
380         try {
381             Magazine[] mags = magazines;
382             for (Magazine magazine : mags) {
383                 magazine.free();
384             }
385         } finally {
386             magazineExpandLock.unlockWrite(stamp);
387         }
388         freeCentralQueue();
389     }
390 
391     private void freeCentralQueue() {
392         for (;;) {
393             Chunk chunk = centralQueue.poll();
394             if (chunk == null) {
395                 break;
396             }
397             chunk.release();
398         }
399     }
400 
401     static int sizeBucket(int size) {
402         return AllocationStatistics.sizeBucket(size);
403     }
404 
405     @SuppressWarnings("checkstyle:finalclass") // Checkstyle mistakenly believes this class should be final.
406     private static class AllocationStatistics {
407         private static final int MIN_DATUM_TARGET = 1024;
408         private static final int MAX_DATUM_TARGET = 65534;
409         private static final int INIT_DATUM_TARGET = 9;
410         private static final int HISTO_MIN_BUCKET_SHIFT = 13; // Smallest bucket is 1 << 13 = 8192 bytes in size.
411         private static final int HISTO_MAX_BUCKET_SHIFT = 20; // Biggest bucket is 1 << 20 = 1 MiB in size.
412         private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT; // 8 buckets.
413         private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
414         private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
415 
416         protected final AdaptivePoolingAllocator parent;
417         private final boolean shareable;
418         private final short[][] histos = {
419                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
420                 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
421         };
422         private short[] histo = histos[0];
423         private final int[] sums = new int[HISTO_BUCKET_COUNT];
424 
425         private int histoIndex;
426         private int datumCount;
427         private int datumTarget = INIT_DATUM_TARGET;
428         protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
429         protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
430 
431         private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
432             this.parent = parent;
433             this.shareable = shareable;
434         }
435 
436         protected void recordAllocationSize(int bucket) {
437             histo[bucket]++;
438             if (datumCount++ == datumTarget) {
439                 rotateHistograms();
440             }
441         }
442 
443         static int sizeBucket(int size) {
444             if (size == 0) {
445                 return 0;
446             }
447             // Minimum chunk size is 128 KiB. We'll only make bigger chunks if the 99-percentile is 16 KiB or greater,
448             // so we truncate and roll up the bottom part of the histogram to 8 KiB.
449             // The upper size band is 1 MiB, and that gives us exactly 8 size buckets,
450             // which is a magical number for JIT optimisations.
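                // For example, sizes up to 8 KiB map to bucket 0, (8 KiB, 16 KiB] to bucket 1, and anything of
                // 1 MiB or more is capped at bucket 7.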
451             int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
452             return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
453         }
454 
455         private void rotateHistograms() {
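                // Sum the four most recent histogram windows so the percentile estimate is smoothed over
                // recent history.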
456             short[][] hs = histos;
457             for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
458                 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
459             }
460             int sum = 0;
461             for (int count : sums) {
462                 sum  += count;
463             }
464             int targetPercentile = (int) (sum * 0.99);
465             int sizeBucket = 0;
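                // Walk the buckets to find the one that contains the 99th-percentile allocation size.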
466             for (; sizeBucket < sums.length; sizeBucket++) {
467                 if (sums[sizeBucket] > targetPercentile) {
468                     break;
469                 }
470                 targetPercentile -= sums[sizeBucket];
471             }
472             int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
473             int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
474             localPrefChunkSize = prefChunkSize;
475             if (shareable) {
476                 for (Magazine mag : parent.magazines) {
477                     prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
478                 }
479             }
480             if (sharedPrefChunkSize != prefChunkSize) {
481                 // Preferred chunk size changed. Increase check frequency.
482                 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
483                 sharedPrefChunkSize = prefChunkSize;
484             } else {
485                 // Preferred chunk size did not change. Check less often.
486                 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
487             }
488 
489             histoIndex = histoIndex + 1 & 3;
490             histo = histos[histoIndex];
491             datumCount = 0;
492             Arrays.fill(histo, (short) 0);
493         }
494 
495         /**
496          * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
497          * allocation sizes.
498          * <p>
499          * This method must be thread-safe.
500          *
501          * @return The currently preferred chunk allocation size.
502          */
503         protected int preferredChunkSize() {
504             return sharedPrefChunkSize;
505         }
506     }
507 
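        /**
         * A mostly thread-local allocation stripe. A magazine records allocation size statistics, owns the chunk it
         * is currently allocating buffers from, and caches a single next-in-line chunk for future allocations.
         */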
508     private static final class Magazine extends AllocationStatistics {
509         private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
510         static {
511             NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
512         }
513         private static final Chunk MAGAZINE_FREED = new Chunk();
514 
515         private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
516                 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
517                     @Override
518                     public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
519                         return new AdaptiveByteBuf(handle);
520                     }
521                 });
522 
523         private Chunk current;
524         @SuppressWarnings("unused") // updated via NEXT_IN_LINE
525         private volatile Chunk nextInLine;
526         private final AtomicLong usedMemory;
527         private final StampedLock allocationLock;
528         private final Queue<AdaptiveByteBuf> bufferQueue;
529         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
530 
531         Magazine(AdaptivePoolingAllocator parent) {
532             this(parent, true);
533         }
534 
535         Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
536             super(parent, shareable);
537 
538             if (shareable) {
539                 // We only need the StampedLock if this Magazine will be shared across threads.
540                 allocationLock = new StampedLock();
541                 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
542                 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
543                     @Override
544                     public void recycle(AdaptiveByteBuf self) {
545                         bufferQueue.offer(self);
546                     }
547                 };
548             } else {
549                 allocationLock = null;
550                 bufferQueue = null;
551                 handle = null;
552             }
553             usedMemory = new AtomicLong();
554         }
555 
556         public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
557             if (allocationLock == null) {
558                 // This magazine is not shared across threads, just allocate directly.
559                 return allocate(size, sizeBucket, maxCapacity, buf);
560             }
561 
562             // Try to acquire the lock without blocking; if successful, allocate.
            // Otherwise fail so that the caller can try another magazine.
563             long writeLock = allocationLock.tryWriteLock();
564             if (writeLock != 0) {
565                 try {
566                     return allocate(size, sizeBucket, maxCapacity, buf);
567                 } finally {
568                     allocationLock.unlockWrite(writeLock);
569                 }
570             }
571             return false;
572         }
573 
574         private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
575             recordAllocationSize(sizeBucket);
576             Chunk curr = current;
577             if (curr != null) {
578                 // We have a Chunk that has some space left.
579                 if (curr.remainingCapacity() > size) {
580                     curr.readInitInto(buf, size, maxCapacity);
581                     // We still have some bytes left that we can use for the next allocation, just early return.
582                     return true;
583                 }
584 
585                 // At this point we know that this will be the last time current will be used, so directly set it to
586                 // null and release it once we are done.
587                 current = null;
588                 if (curr.remainingCapacity() == size) {
589                     try {
590                         curr.readInitInto(buf, size, maxCapacity);
591                         return true;
592                     } finally {
593                         curr.release();
594                     }
595                 }
596 
597                 // Check whether we retain the chunk in the nextInLine cache or release it.
598                 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
599                     curr.release();
600                 } else {
601                     // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
602                     // This method will release curr if that is not the case.
603                     transferToNextInLineOrRelease(curr);
604                 }
605             }
606 
607             assert current == null;
608             // The fast-path for allocations did not work.
609             //
610             // Try to fetch the next "Magazine local" Chunk first. If this fails because we don't have
611             // one set up, we will poll our centralQueue. If this fails as well we will just allocate a new Chunk.
612             //
613             // In any case we will store the Chunk as the current so it will be used again for the next allocation and
614             // so be "reserved" by this Magazine for exclusive usage.
615             if (nextInLine != null) {
616                 curr = NEXT_IN_LINE.getAndSet(this, null);
617                 if (curr == MAGAZINE_FREED) {
618                     // Allocation raced with a stripe-resize that freed this magazine.
619                     restoreMagazineFreed();
620                     return false;
621                 }
622 
623                 if (curr.remainingCapacity() > size) {
624                     // We have a Chunk that has some space left.
625                     curr.readInitInto(buf, size, maxCapacity);
626                     current = curr;
627                     return true;
628                 }
629 
630                 if (curr.remainingCapacity() == size) {
631                     // At this point we know that this will be the last time curr will be used, so directly set it to
632                     // null and release it once we are done.
633                     try {
634                         curr.readInitInto(buf, size, maxCapacity);
635                         return true;
636                     } finally {
637                         // Release in a finally block so that even if readInitInto(...) throws, we still correctly
638                         // release the current chunk before nulling it out.
639                         curr.release();
640                     }
641                 } else {
642                     // Release it as it's too small.
643                     curr.release();
644                 }
645             }
646 
647             // Now try to poll from the central queue first
648             curr = parent.centralQueue.poll();
649             if (curr == null) {
650                 curr = newChunkAllocation(size);
651             } else {
652                 curr.attachToMagazine(this);
653 
654                 if (curr.remainingCapacity() < size) {
655                     // Check whether we retain the chunk in the nextInLine cache or release it.
656                     if (curr.remainingCapacity() < RETIRE_CAPACITY) {
657                         curr.release();
658                     } else {
659                         // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
660                         // This method will release curr if that is not the case.
661                         transferToNextInLineOrRelease(curr);
662                     }
663                     curr = newChunkAllocation(size);
664                 }
665             }
666 
667             current = curr;
668             try {
669                 assert current.remainingCapacity() >= size;
670                 if (curr.remainingCapacity() > size) {
671                     curr.readInitInto(buf, size, maxCapacity);
672                     curr = null;
673                 } else {
674                     curr.readInitInto(buf, size, maxCapacity);
675                 }
676             } finally {
677                 if (curr != null) {
678                     // Release in a finally block so that even if readInitInto(...) throws, we still correctly
679                     // release the current chunk before nulling it out.
680                     curr.release();
681                     current = null;
682                 }
683             }
684             return true;
685         }
686 
687         private void restoreMagazineFreed() {
688             Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
689             if (next != null && next != MAGAZINE_FREED) {
690                 next.release(); // A chunk snuck in through a race. Release it after restoring MAGAZINE_FREED state.
691             }
692         }
693 
694         private void transferToNextInLineOrRelease(Chunk chunk) {
695             if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
696                 return;
697             }
698 
699             Chunk nextChunk = NEXT_IN_LINE.get(this);
700             if (nextChunk != null && nextChunk != MAGAZINE_FREED
701                     && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
702                 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
703                     nextChunk.release();
704                     return;
705                 }
706             }
707             // Next-in-line is occupied. We don't try to add it to the central queue yet as it might still be used
708             // by some buffers and so is attached to a Magazine.
709             // Once a Chunk is completely released by Chunk.release() it will try to move itself to the queue
710             // as last resort.
711             chunk.release();
712         }
713 
714         private Chunk newChunkAllocation(int promptingSize) {
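                // Make the new chunk big enough for roughly BUFS_PER_CHUNK buffers of the prompting size, or the
                // preferred chunk size learned from the allocation histograms, whichever is larger.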
715             int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
716             ChunkAllocator chunkAllocator = parent.chunkAllocator;
717             return new Chunk(chunkAllocator.allocate(size, size), this, true);
718         }
719 
720         boolean trySetNextInLine(Chunk chunk) {
721             return NEXT_IN_LINE.compareAndSet(this, null, chunk);
722         }
723 
724         void free() {
725             // Release the current Chunk and the next that was stored for later usage.
726             restoreMagazineFreed();
727             long stamp = allocationLock.writeLock();
728             try {
729                 if (current != null) {
730                     current.release();
731                     current = null;
732                 }
733             } finally {
734                 allocationLock.unlockWrite(stamp);
735             }
736         }
737 
738         public AdaptiveByteBuf newBuffer() {
739             AdaptiveByteBuf buf;
740             if (handle == null) {
741                 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
742             } else {
743                 buf = bufferQueue.poll();
744                 if (buf == null) {
745                     buf = new AdaptiveByteBuf(handle);
746                 }
747             }
748             buf.resetRefCnt();
749             buf.discardMarks();
750             return buf;
751         }
752     }
753 
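        /**
         * A reference-counted slab of memory that {@link AdaptiveByteBuf}s are carved out of. Once fully released,
         * a pooled chunk is either reset and reused (kept as its magazine's next-in-line chunk or offered to the
         * central queue) or freed.
         */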
754     private static final class Chunk implements ReferenceCounted {
755 
756         private final AbstractByteBuf delegate;
757         private Magazine magazine;
758         private final AdaptivePoolingAllocator allocator;
759         private final int capacity;
760         private final boolean pooled;
761         private int allocatedBytes;
762         private static final long REFCNT_FIELD_OFFSET =
763                 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
764         private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
765                 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
766 
767         private static final ReferenceCountUpdater<Chunk> updater =
768                 new ReferenceCountUpdater<Chunk>() {
769                     @Override
770                     protected AtomicIntegerFieldUpdater<Chunk> updater() {
771                         return AIF_UPDATER;
772                     }
773                     @Override
774                     protected long unsafeOffset() {
775                         return REFCNT_FIELD_OFFSET;
776                     }
777                 };
778 
779         // Value might not equal "real" reference count, all access should be via the updater
780         @SuppressWarnings({"unused", "FieldMayBeFinal"})
781         private volatile int refCnt;
782 
783         Chunk() {
784             // Constructor only used by the MAGAZINE_FREED sentinel.
785             delegate = null;
786             magazine = null;
787             allocator = null;
788             capacity = 0;
789             pooled = false;
790         }
791 
792         Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
793             this.delegate = delegate;
794             this.pooled = pooled;
795             capacity = delegate.capacity();
796             updater.setInitialValue(this);
797             allocator = magazine.parent;
798             attachToMagazine(magazine);
799         }
800 
801         Magazine currentMagazine()  {
802             return magazine;
803         }
804 
805         void detachFromMagazine() {
806             if (magazine != null) {
807                 magazine.usedMemory.getAndAdd(-capacity);
808                 magazine = null;
809             }
810         }
811 
812         void attachToMagazine(Magazine magazine) {
813             assert this.magazine == null;
814             this.magazine = magazine;
815             magazine.usedMemory.getAndAdd(capacity);
816         }
817 
818         @Override
819         public Chunk touch(Object hint) {
820             return this;
821         }
822 
823         @Override
824         public int refCnt() {
825             return updater.refCnt(this);
826         }
827 
828         @Override
829         public Chunk retain() {
830             return updater.retain(this);
831         }
832 
833         @Override
834         public Chunk retain(int increment) {
835             return updater.retain(this, increment);
836         }
837 
838         @Override
839         public Chunk touch() {
840             return this;
841         }
842 
843         @Override
844         public boolean release() {
845             if (updater.release(this)) {
846                 deallocate();
847                 return true;
848             }
849             return false;
850         }
851 
852         @Override
853         public boolean release(int decrement) {
854             if (updater.release(this, decrement)) {
855                 deallocate();
856                 return true;
857             }
858             return false;
859         }
860 
861         private void deallocate() {
862             Magazine mag = magazine;
863             AdaptivePoolingAllocator parent = mag.parent;
864             int chunkSize = mag.preferredChunkSize();
865             int memSize = delegate.capacity();
866             if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
867                 // Drop the chunk if it is a non-pooled one-off chunk, or if it is smaller than the
868                 // preferred chunk size, or over 50% larger than the preferred chunk size.
869                 detachFromMagazine();
870                 delegate.release();
871             } else {
872                 updater.resetRefCnt(this);
873                 delegate.setIndex(0, 0);
874                 allocatedBytes = 0;
875                 if (!mag.trySetNextInLine(this)) {
876                     // As this Chunk does not belong to the magazine anymore, we need to decrease the used memory.
877                     detachFromMagazine();
878                     if (!parent.offerToQueue(this)) {
879                         // The central queue is full. Ensure we release again, as we previously used resetRefCnt(),
880                         // which increased the reference count by 1.
881                         boolean released = updater.release(this);
882                         delegate.release();
883                         assert released;
884                     }
885                 }
886             }
887         }
888 
889         public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
890             int startIndex = allocatedBytes;
891             allocatedBytes = startIndex + size;
892             Chunk chunk = this;
893             chunk.retain();
894             try {
895                 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
896                 chunk = null;
897             } finally {
898                 if (chunk != null) {
899                     // If chunk is not null we know that buf.init(...) failed and so we need to manually release
900                     // the chunk again as we retained it before calling buf.init(...). Besides this, we also need to
901                     // restore the old allocatedBytes value.
902                     allocatedBytes = startIndex;
903                     chunk.release();
904                 }
905             }
906         }
907 
908         public int remainingCapacity() {
909             return capacity - allocatedBytes;
910         }
911 
912         public int capacity() {
913             return capacity;
914         }
915     }
916 
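        /**
         * The buffer implementation handed out by this allocator: a view onto a region of a {@link Chunk}'s delegate
         * buffer, translated by {@code adjustment}. Instances are recycled via their handle when deallocated.
         */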
917     static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
918 
919         private final ObjectPool.Handle<AdaptiveByteBuf> handle;
920 
921         private int adjustment;
922         private AbstractByteBuf rootParent;
923         Chunk chunk;
924         private int length;
925         private ByteBuffer tmpNioBuf;
926         private boolean hasArray;
927         private boolean hasMemoryAddress;
928 
929         AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
930             super(0);
931             handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
932         }
933 
934         void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
935                   int adjustment, int capacity, int maxCapacity) {
936             this.adjustment = adjustment;
937             chunk = wrapped;
938             length = capacity;
939             maxCapacity(maxCapacity);
940             setIndex0(readerIndex, writerIndex);
941             hasArray = unwrapped.hasArray();
942             hasMemoryAddress = unwrapped.hasMemoryAddress();
943             rootParent = unwrapped;
944             tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
945         }
946 
947         private AbstractByteBuf rootParent() {
948             final AbstractByteBuf rootParent = this.rootParent;
949             if (rootParent != null) {
950                 return rootParent;
951             }
952             throw new IllegalReferenceCountException();
953         }
954 
955         @Override
956         public int capacity() {
957             return length;
958         }
959 
960         @Override
961         public ByteBuf capacity(int newCapacity) {
962             if (newCapacity == capacity()) {
963                 ensureAccessible();
964                 return this;
965             }
966             checkNewCapacity(newCapacity);
967             if (newCapacity < capacity()) {
968                 length = newCapacity;
969                 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
970                 return this;
971             }
972 
973             // Reallocation required.
974             ByteBuffer data = tmpNioBuf;
975             data.clear();
976             tmpNioBuf = null;
977             Chunk chunk = this.chunk;
978             AdaptivePoolingAllocator allocator = chunk.allocator;
979             int readerIndex = this.readerIndex;
980             int writerIndex = this.writerIndex;
981             allocator.allocate(newCapacity, maxCapacity(), this);
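                // Copy the old contents into the freshly allocated backing memory, then release the old chunk.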
982             tmpNioBuf.put(data);
983             tmpNioBuf.clear();
984             chunk.release();
985             this.readerIndex = readerIndex;
986             this.writerIndex = writerIndex;
987             return this;
988         }
989 
990         @Override
991         public ByteBufAllocator alloc() {
992             return rootParent().alloc();
993         }
994 
995         @Override
996         public ByteOrder order() {
997             return rootParent().order();
998         }
999 
1000         @Override
1001         public ByteBuf unwrap() {
1002             return null;
1003         }
1004 
1005         @Override
1006         public boolean isDirect() {
1007             return rootParent().isDirect();
1008         }
1009 
1010         @Override
1011         public int arrayOffset() {
1012             return idx(rootParent().arrayOffset());
1013         }
1014 
1015         @Override
1016         public boolean hasMemoryAddress() {
1017             return hasMemoryAddress;
1018         }
1019 
1020         @Override
1021         public long memoryAddress() {
1022             ensureAccessible();
1023             return rootParent().memoryAddress() + adjustment;
1024         }
1025 
1026         @Override
1027         public ByteBuffer nioBuffer(int index, int length) {
1028             checkIndex(index, length);
1029             return rootParent().nioBuffer(idx(index), length);
1030         }
1031 
1032         @Override
1033         public ByteBuffer internalNioBuffer(int index, int length) {
1034             checkIndex(index, length);
1035             return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1036         }
1037 
1038         private ByteBuffer internalNioBuffer() {
1039             return (ByteBuffer) tmpNioBuf.clear();
1040         }
1041 
1042         @Override
1043         public ByteBuffer[] nioBuffers(int index, int length) {
1044             checkIndex(index, length);
1045             return rootParent().nioBuffers(idx(index), length);
1046         }
1047 
1048         @Override
1049         public boolean hasArray() {
1050             return hasArray;
1051         }
1052 
1053         @Override
1054         public byte[] array() {
1055             ensureAccessible();
1056             return rootParent().array();
1057         }
1058 
1059         @Override
1060         public ByteBuf copy(int index, int length) {
1061             checkIndex(index, length);
1062             return rootParent().copy(idx(index), length);
1063         }
1064 
1065         @Override
1066         public int nioBufferCount() {
1067             return rootParent().nioBufferCount();
1068         }
1069 
1070         @Override
1071         protected byte _getByte(int index) {
1072             return rootParent()._getByte(idx(index));
1073         }
1074 
1075         @Override
1076         protected short _getShort(int index) {
1077             return rootParent()._getShort(idx(index));
1078         }
1079 
1080         @Override
1081         protected short _getShortLE(int index) {
1082             return rootParent()._getShortLE(idx(index));
1083         }
1084 
1085         @Override
1086         protected int _getUnsignedMedium(int index) {
1087             return rootParent()._getUnsignedMedium(idx(index));
1088         }
1089 
1090         @Override
1091         protected int _getUnsignedMediumLE(int index) {
1092             return rootParent()._getUnsignedMediumLE(idx(index));
1093         }
1094 
1095         @Override
1096         protected int _getInt(int index) {
1097             return rootParent()._getInt(idx(index));
1098         }
1099 
1100         @Override
1101         protected int _getIntLE(int index) {
1102             return rootParent()._getIntLE(idx(index));
1103         }
1104 
1105         @Override
1106         protected long _getLong(int index) {
1107             return rootParent()._getLong(idx(index));
1108         }
1109 
1110         @Override
1111         protected long _getLongLE(int index) {
1112             return rootParent()._getLongLE(idx(index));
1113         }
1114 
1115         @Override
1116         public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1117             checkIndex(index, length);
1118             rootParent().getBytes(idx(index), dst, dstIndex, length);
1119             return this;
1120         }
1121 
1122         @Override
1123         public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1124             checkIndex(index, length);
1125             rootParent().getBytes(idx(index), dst, dstIndex, length);
1126             return this;
1127         }
1128 
1129         @Override
1130         public ByteBuf getBytes(int index, ByteBuffer dst) {
1131             checkIndex(index, dst.remaining());
1132             rootParent().getBytes(idx(index), dst);
1133             return this;
1134         }
1135 
1136         @Override
1137         protected void _setByte(int index, int value) {
1138             rootParent()._setByte(idx(index), value);
1139         }
1140 
1141         @Override
1142         protected void _setShort(int index, int value) {
1143             rootParent()._setShort(idx(index), value);
1144         }
1145 
1146         @Override
1147         protected void _setShortLE(int index, int value) {
1148             rootParent()._setShortLE(idx(index), value);
1149         }
1150 
1151         @Override
1152         protected void _setMedium(int index, int value) {
1153             rootParent()._setMedium(idx(index), value);
1154         }
1155 
1156         @Override
1157         protected void _setMediumLE(int index, int value) {
1158             rootParent()._setMediumLE(idx(index), value);
1159         }
1160 
1161         @Override
1162         protected void _setInt(int index, int value) {
1163             rootParent()._setInt(idx(index), value);
1164         }
1165 
1166         @Override
1167         protected void _setIntLE(int index, int value) {
1168             rootParent()._setIntLE(idx(index), value);
1169         }
1170 
1171         @Override
1172         protected void _setLong(int index, long value) {
1173             rootParent()._setLong(idx(index), value);
1174         }
1175 
1176         @Override
1177         protected void _setLongLE(int index, long value) {
1178             rootParent()._setLongLE(idx(index), value);
1179         }
1180 
1181         @Override
1182         public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1183             checkIndex(index, length);
1184             rootParent().setBytes(idx(index), src, srcIndex, length);
1185             return this;
1186         }
1187 
1188         @Override
1189         public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1190             checkIndex(index, length);
1191             rootParent().setBytes(idx(index), src, srcIndex, length);
1192             return this;
1193         }
1194 
1195         @Override
1196         public ByteBuf setBytes(int index, ByteBuffer src) {
1197             checkIndex(index, src.remaining());
1198             rootParent().setBytes(idx(index), src);
1199             return this;
1200         }
1201 
1202         @Override
1203         public ByteBuf getBytes(int index, OutputStream out, int length)
1204                 throws IOException {
1205             checkIndex(index, length);
1206             if (length != 0) {
1207                 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1208             }
1209             return this;
1210         }
1211 
1212         @Override
1213         public int getBytes(int index, GatheringByteChannel out, int length)
1214                 throws IOException {
1215             return out.write(internalNioBuffer(index, length).duplicate());
1216         }
1217 
1218         @Override
1219         public int getBytes(int index, FileChannel out, long position, int length)
1220                 throws IOException {
1221             return out.write(internalNioBuffer(index, length).duplicate(), position);
1222         }
1223 
1224         @Override
1225         public int setBytes(int index, InputStream in, int length)
1226                 throws IOException {
1227             checkIndex(index, length);
1228             final AbstractByteBuf rootParent = rootParent();
1229             if (rootParent.hasArray()) {
1230                 return rootParent.setBytes(idx(index), in, length);
1231             }
1232             byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1233             int readBytes = in.read(tmp, 0, length);
1234             if (readBytes <= 0) {
1235                 return readBytes;
1236             }
1237             setBytes(index, tmp, 0, readBytes);
1238             return readBytes;
1239         }
1240 
1241         @Override
1242         public int setBytes(int index, ScatteringByteChannel in, int length)
1243                 throws IOException {
1244             try {
1245                 return in.read(internalNioBuffer(index, length).duplicate());
1246             } catch (ClosedChannelException ignored) {
1247                 return -1;
1248             }
1249         }
1250 
1251         @Override
1252         public int setBytes(int index, FileChannel in, long position, int length)
1253                 throws IOException {
1254             try {
1255                 return in.read(internalNioBuffer(index, length).duplicate(), position);
1256             } catch (ClosedChannelException ignored) {
1257                 return -1;
1258             }
1259         }
1260 
1261         @Override
1262         public int forEachByte(int index, int length, ByteProcessor processor) {
1263             checkIndex(index, length);
1264             int ret = rootParent().forEachByte(idx(index), length, processor);
1265             return forEachResult(ret);
1266         }
1267 
1268         @Override
1269         public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1270             checkIndex(index, length);
1271             int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1272             return forEachResult(ret);
1273         }
1274 
1275         private int forEachResult(int ret) {
1276             if (ret < adjustment) {
1277                 return -1;
1278             }
1279             return ret - adjustment;
1280         }
1281 
1282         @Override
1283         public boolean isContiguous() {
1284             return rootParent().isContiguous();
1285         }
1286 
1287         private int idx(int index) {
1288             return index + adjustment;
1289         }
1290 
1291         @Override
1292         protected void deallocate() {
1293             if (chunk != null) {
1294                 chunk.release();
1295             }
1296             tmpNioBuf = null;
1297             chunk = null;
1298             rootParent = null;
1299             if (handle instanceof EnhancedHandle) {
1300                 EnhancedHandle<AdaptiveByteBuf>  enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1301                 enhancedHandle.unguardedRecycle(this);
1302             } else {
1303                 handle.recycle(this);
1304             }
1305         }
1306     }
1307 
1308     /**
1309      * The strategy for how {@link AdaptivePoolingAllocator} should allocate chunk buffers.
1310      */
1311     interface ChunkAllocator {
1312         /**
1313          * Allocate a buffer for a chunk. This can be any kind of {@link AbstractByteBuf} implementation.
1314          * @param initialCapacity The initial capacity of the returned {@link AbstractByteBuf}.
1315          * @param maxCapacity The maximum capacity of the returned {@link AbstractByteBuf}.
1316          * @return The buffer that represents the chunk memory.
1317          */
1318         AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1319     }
1320 }