1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.CharsetUtil;
20 import io.netty.util.IllegalReferenceCountException;
21 import io.netty.util.NettyRuntime;
22 import io.netty.util.Recycler;
23 import io.netty.util.Recycler.EnhancedHandle;
24 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap;
25 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap.IntEntry;
26 import io.netty.util.concurrent.FastThreadLocal;
27 import io.netty.util.concurrent.FastThreadLocalThread;
28 import io.netty.util.concurrent.MpscIntQueue;
29 import io.netty.util.internal.MathUtil;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.RefCnt;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.ThreadExecutorMap;
35 import io.netty.util.internal.UnstableApi;
36
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.OutputStream;
40 import java.nio.ByteBuffer;
41 import java.nio.ByteOrder;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.FileChannel;
44 import java.nio.channels.GatheringByteChannel;
45 import java.nio.channels.ScatteringByteChannel;
46 import java.nio.charset.Charset;
47 import java.util.Arrays;
48 import java.util.Iterator;
49 import java.util.Queue;
50 import java.util.concurrent.ConcurrentLinkedQueue;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
53 import java.util.concurrent.atomic.LongAdder;
54 import java.util.concurrent.locks.StampedLock;
55 import java.util.function.IntConsumer;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @UnstableApi
84 final class AdaptivePoolingAllocator {
/** Heap size (512 MiB) at or below which the JVM is treated as memory constrained. */
private static final int LOW_MEM_THRESHOLD = 512 * 1024 * 1024;
/** {@code true} when {@link Runtime#maxMemory()} does not exceed {@link #LOW_MEM_THRESHOLD}. */
private static final boolean IS_LOW_MEM = Runtime.getRuntime().maxMemory() <= LOW_MEM_THRESHOLD;

/**
 * Read from the system property {@code io.netty.allocator.disableThreadLocalMagazinesOnLowMemory}
 * (default {@code true}). When this is {@code true} and {@link #IS_LOW_MEM} holds, the constructor
 * does not create the thread-local magazine groups at all.
 */
private static final boolean DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM = SystemPropertyUtil.getBoolean(
        "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory", true);

/** Smallest chunk size (128 KiB); used as the lower bound when sizing size-classed chunks. */
static final int MIN_CHUNK_SIZE = 128 * 1024;
/** Bounds the number of magazine-array expansion rounds a single shared allocation will go through. */
private static final int EXPANSION_ATTEMPTS = 3;
/** Number of magazines a shared (non thread-local) magazine group starts with. */
private static final int INITIAL_MAGAZINES = 1;
/** Remaining-capacity threshold (bytes) below which a chunk is no longer kept as a "next in line" chunk. */
private static final int RETIRE_CAPACITY = 256;
/** Upper bound for the length of a group's magazine array: 1 in low-memory mode, otherwise 2x the CPUs. */
private static final int MAX_STRIPES = IS_LOW_MEM ? 1 : NettyRuntime.availableProcessors() * 2;
/** Divisor/multiplier relating buffer size to chunk size (see MAX_POOLED_BUF_SIZE and the buddy controller). */
private static final int BUFS_PER_CHUNK = 8;

/** Largest chunk the buddy controller will allocate: 2 MiB in low-memory mode, otherwise 8 MiB. */
private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
        2 * 1024 * 1024 :
        8 * 1024 * 1024;
/** Requests larger than this bypass the magazines and get a dedicated one-off chunk. */
private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;

/**
 * Capacity of the shared chunk reuse queue and the size limit of the skip-list chunk cache.
 * Read from {@code io.netty.allocator.chunkReuseQueueCapacity} (default: 2x the CPUs), never below 2.
 */
private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));

/**
 * Capacity handed to the recycler of each shareable magazine. Read from
 * {@code io.netty.allocator.magazineBufferQueueCapacity} (default 1024); the static initializer
 * rejects values below 2.
 */
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

/**
 * Segment sizes (bytes) of the size-classed magazine groups, in ascending order.
 * Every entry is a multiple of 32, which the lookup table below relies on.
 */
private static final int[] SIZE_CLASSES = {
        32,
        64,
        128,
        256,
        512,
        640,
        1024,
        1152,
        2048,
        2304,
        4096,
        4352,
        8192,
        8704,
        16384,
        16896,
};

/** Number of entries in {@link #SIZE_CLASSES}; also returned as the "no size class fits" index. */
private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
/**
 * Lookup table filled by the static initializer: position {@code sizeIndexOf(size)} holds the index
 * into {@link #SIZE_CLASSES} of the size class used for that size. It has one slot per 32-byte step
 * up to the largest size class.
 */
private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
167
168 static {
169 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
170 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
171 + " (expected: >= " + 2 + ')');
172 }
173 int lastIndex = 0;
174 for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
175 int sizeClass = SIZE_CLASSES[i];
176
177 assert (sizeClass & 5) == 0 : "Size class must be a multiple of 32";
178 int sizeIndex = sizeIndexOf(sizeClass);
179 Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
180 lastIndex = sizeIndex;
181 }
182 }
183
/** Source of the backing memory for every chunk created by this allocator. */
private final ChunkAllocator chunkAllocator;
/** Tracks the summed capacity of all chunks created by this allocator (see usedMemory()). */
private final ChunkRegistry chunkRegistry;
/** Shared magazine groups, one per entry of SIZE_CLASSES, usable from any thread. */
private final MagazineGroup[] sizeClassedMagazineGroups;
/** Shared group for pooled sizes that exceed the largest size class. */
private final MagazineGroup largeBufferMagazineGroup;
/** Per-thread size-classed groups; {@code null} when thread-local groups are disabled in low-memory mode. */
private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
189
190 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, boolean useCacheForNonEventLoopThreads) {
191 this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
192 chunkRegistry = new ChunkRegistry();
193 sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
194 largeBufferMagazineGroup = new MagazineGroup(
195 this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
196
197 boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
198 threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal<MagazineGroup[]>() {
199 @Override
200 protected MagazineGroup[] initialValue() {
201 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
202 return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
203 }
204 return null;
205 }
206
207 @Override
208 protected void onRemoval(final MagazineGroup[] groups) throws Exception {
209 if (groups != null) {
210 for (MagazineGroup group : groups) {
211 group.free();
212 }
213 }
214 }
215 };
216 }
217
218 private static MagazineGroup[] createMagazineGroupSizeClasses(
219 AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
220 MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
221 for (int i = 0; i < SIZE_CLASSES.length; i++) {
222 int segmentSize = SIZE_CLASSES[i];
223 groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
224 new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
225 }
226 return groups;
227 }
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249 private static Queue<Chunk> createSharedChunkQueue() {
250 return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
251 }
252
253 ByteBuf allocate(int size, int maxCapacity) {
254 return allocate(size, maxCapacity, Thread.currentThread(), null);
255 }
256
257 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
258 AdaptiveByteBuf allocated = null;
259 if (size <= MAX_POOLED_BUF_SIZE) {
260 final int index = sizeClassIndexOf(size);
261 MagazineGroup[] magazineGroups;
262 if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals() ||
263 IS_LOW_MEM ||
264 (magazineGroups = threadLocalGroup.get()) == null) {
265 magazineGroups = sizeClassedMagazineGroups;
266 }
267 if (index < magazineGroups.length) {
268 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
269 } else if (!IS_LOW_MEM) {
270 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
271 }
272 }
273 if (allocated == null) {
274 allocated = allocateFallback(size, maxCapacity, currentThread, buf);
275 }
276 return allocated;
277 }
278
279 private static int sizeIndexOf(final int size) {
280
281 return size + 31 >> 5;
282 }
283
284 static int sizeClassIndexOf(int size) {
285 int sizeIndex = sizeIndexOf(size);
286 if (sizeIndex < SIZE_INDEXES.length) {
287 return SIZE_INDEXES[sizeIndex];
288 }
289 return SIZE_CLASSES_COUNT;
290 }
291
292 static int[] getSizeClasses() {
293 return SIZE_CLASSES.clone();
294 }
295
296 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
297
298 Magazine magazine;
299 if (buf != null) {
300 Chunk chunk = buf.chunk;
301 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
302 magazine = getFallbackMagazine(currentThread);
303 }
304 } else {
305 magazine = getFallbackMagazine(currentThread);
306 buf = magazine.newBuffer();
307 }
308
309 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
310 Chunk chunk = new Chunk(innerChunk, magazine, false);
311 chunkRegistry.add(chunk);
312 try {
313 boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
314 assert success: "Failed to initialize ByteBuf with dedicated chunk";
315 } finally {
316
317
318
319 chunk.release();
320 }
321 return buf;
322 }
323
324 private Magazine getFallbackMagazine(Thread currentThread) {
325 Magazine[] mags = largeBufferMagazineGroup.magazines;
326 return mags[(int) currentThread.getId() & mags.length - 1];
327 }
328
329
330
331
332 void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
333 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
334 assert result == into: "Re-allocation created separate buffer instance";
335 }
336
337 long usedMemory() {
338 return chunkRegistry.totalCapacity();
339 }
340
341
342
343
/**
 * Finalizer hook that frees this allocator's resources via {@code free()} when the allocator is
 * garbage collected. {@code super.finalize()} runs in the {@code finally} block, so it is invoked
 * even if {@code free()} throws.
 */
@SuppressWarnings({"FinalizeDeclaration", "deprecation"})
@Override
protected void finalize() throws Throwable {
    try {
        free();
    } finally {
        super.finalize();
    }
}
353
354 private void free() {
355 largeBufferMagazineGroup.free();
356 }
357
/**
 * A set of {@link Magazine}s that share one {@link ChunkCache} and one chunk management strategy.
 * <p>
 * A group is either thread-local (a single, unshared magazine, no expand lock, an owner thread) or
 * shared (an array of shareable magazines that is doubled under contention, guarded by
 * {@code magazineExpandLock}).
 */
private static final class MagazineGroup {
    private final AdaptivePoolingAllocator allocator;
    private final ChunkAllocator chunkAllocator;
    private final ChunkManagementStrategy chunkManagementStrategy;
    // Holds chunks that magazines are done with so they can be reused.
    private final ChunkCache chunkCache;
    // Null for thread-local groups.
    private final StampedLock magazineExpandLock;
    // Non-null only for thread-local groups.
    private final Magazine threadLocalMagazine;
    // The creating thread for thread-local groups, null otherwise; cleared by free().
    private Thread ownerThread;
    // Only assigned for shared groups; replaced wholesale when the group expands.
    private volatile Magazine[] magazines;
    private volatile boolean freed;

    /**
     * @param allocator               the owning allocator
     * @param chunkAllocator          provider of chunk memory
     * @param chunkManagementStrategy creates the chunk cache and the per-magazine controllers
     * @param isThreadLocal           {@code true} to create a group owned by the calling thread
     */
    MagazineGroup(AdaptivePoolingAllocator allocator,
                  ChunkAllocator chunkAllocator,
                  ChunkManagementStrategy chunkManagementStrategy,
                  boolean isThreadLocal) {
        this.allocator = allocator;
        this.chunkAllocator = chunkAllocator;
        this.chunkManagementStrategy = chunkManagementStrategy;
        chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
        if (isThreadLocal) {
            ownerThread = Thread.currentThread();
            magazineExpandLock = null;
            // A single magazine that is not shareable, so it is created without an allocation lock.
            threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
        } else {
            ownerThread = null;
            magazineExpandLock = new StampedLock();
            threadLocalMagazine = null;
            Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
            for (int i = 0; i < mags.length; i++) {
                mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
            }
            magazines = mags;
        }
    }

    /**
     * Tries to allocate from this group's magazines.
     *
     * @param size          the requested size in bytes
     * @param maxCapacity   the maximum capacity of the buffer
     * @param currentThread the calling thread; its id picks the starting magazine in shared groups
     * @param buf           a buffer to re-initialize, or {@code null} to take a new one from a magazine
     * @return the initialized buffer, or {@code null} if no magazine could serve the request
     */
    public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
        boolean reallocate = buf != null;

        // Thread-local path: a single magazine, no contention handling required.
        Magazine tlMag = threadLocalMagazine;
        if (tlMag != null) {
            if (buf == null) {
                buf = tlMag.newBuffer();
            }
            boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
            assert allocated : "Allocation of threadLocalMagazine must always succeed";
            return buf;
        }

        // Shared path: probe the magazines starting at the slot selected by the thread id.
        long threadId = currentThread.getId();
        Magazine[] mags;
        int expansions = 0;
        do {
            mags = magazines;
            int mask = mags.length - 1;
            int index = (int) (threadId & mask);
            // Up to two passes over the array before attempting an expansion.
            for (int i = 0, m = mags.length << 1; i < m; i++) {
                Magazine mag = mags[index + i & mask];
                if (buf == null) {
                    buf = mag.newBuffer();
                }
                if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
                    // Allocation succeeded.
                    return buf;
                }
            }
            expansions++;
        } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));

        // Every attempt failed. Give back a buffer claimed in this call; a caller-supplied one is left alone.
        if (!reallocate && buf != null) {
            buf.release();
        }
        return null;
    }

    /**
     * Doubles the magazine array if it is still {@code currentLength} long, below
     * {@code MAX_STRIPES}, the group is not freed and the expand lock can be taken without
     * blocking. The magazines of the replaced array are freed afterwards.
     *
     * @param currentLength the array length the caller observed
     * @return always {@code true}; the caller's loop is bounded by {@code EXPANSION_ATTEMPTS}
     */
    private boolean tryExpandMagazines(int currentLength) {
        if (currentLength >= MAX_STRIPES) {
            return true;
        }
        final Magazine[] mags;
        long writeLock = magazineExpandLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                mags = magazines;
                // Re-check under the lock: another thread may already have expanded, or the group was freed.
                if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
                    return true;
                }
                Magazine[] expanded = new Magazine[mags.length * 2];
                for (int i = 0, l = expanded.length; i < l; i++) {
                    expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
                }
                magazines = expanded;
            } finally {
                magazineExpandLock.unlockWrite(writeLock);
            }
            // The old magazines were replaced by entirely new ones; free them outside the lock.
            for (Magazine magazine : mags) {
                magazine.free();
            }
        }
        return true;
    }

    /**
     * Takes a chunk from this group's cache.
     *
     * @param size the capacity the caller needs
     * @return a cached chunk, or {@code null} if the cache has none to offer
     */
    Chunk pollChunk(int size) {
        return chunkCache.pollChunk(size);
    }

    /**
     * Offers a chunk to this group's cache for later reuse.
     *
     * @param chunk the chunk a magazine is done with
     * @return {@code true} if the cache accepted the chunk; {@code false} if the group is already
     *         freed or the cache rejected it
     */
    boolean offerChunk(Chunk chunk) {
        if (freed) {
            return false;
        }

        if (chunk.hasUnprocessedFreelistEntries()) {
            chunk.processFreelistEntries();
        }
        boolean isAdded = chunkCache.offerChunk(chunk);

        // free() may have run between the first check and the offer; drain the cache so the chunk
        // that was just added does not stay behind.
        if (freed && isAdded) {

            freeChunkReuseQueue(ownerThread);
        }
        return isAdded;
    }

    /**
     * Marks the group as freed, frees its magazine(s) and releases every chunk left in the cache.
     */
    private void free() {
        freed = true;
        // Capture the owner before the field is cleared below; it is needed when draining the cache.
        Thread ownerThread = this.ownerThread;
        if (threadLocalMagazine != null) {
            this.ownerThread = null;
            threadLocalMagazine.free();
        } else {
            // Hold the expand lock so the array cannot be swapped while its magazines are freed.
            long stamp = magazineExpandLock.writeLock();
            try {
                Magazine[] mags = magazines;
                for (Magazine magazine : mags) {
                    magazine.free();
                }
            } finally {
                magazineExpandLock.unlockWrite(stamp);
            }
        }
        freeChunkReuseQueue(ownerThread);
    }

    /**
     * Drains the chunk cache, releasing every chunk. For a thread-local group the owner reference
     * held by each size-classed chunk is cleared first.
     *
     * @param ownerThread the owning thread of a thread-local group, or {@code null}
     */
    private void freeChunkReuseQueue(Thread ownerThread) {
        Chunk chunk;
        while ((chunk = chunkCache.pollChunk(0)) != null) {
            if (ownerThread != null && chunk instanceof SizeClassedChunk) {
                SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
                assert ownerThread == threadLocalChunk.ownerThread;
                // Drop the chunk's reference to the owner thread.
                // NOTE(review): presumably so the thread can be collected while buffers still
                // reference the chunk - confirm.
                threadLocalChunk.ownerThread = null;
            }
            chunk.release();
        }
    }
}
518
/**
 * A store of chunks that magazines are done with but that may still have capacity to hand out.
 */
private interface ChunkCache {
    /**
     * Removes and returns a cached chunk for a request of the given size.
     *
     * @param size the capacity the caller needs, in bytes
     * @return a chunk, or {@code null} if the cache cannot provide one
     */
    Chunk pollChunk(int size);

    /**
     * Offers a chunk to the cache.
     *
     * @param chunk the chunk to store
     * @return {@code true} if the cache took the chunk
     */
    boolean offerChunk(Chunk chunk);
}
523
524 private static final class ConcurrentQueueChunkCache implements ChunkCache {
525 private final Queue<Chunk> queue;
526
527 private ConcurrentQueueChunkCache() {
528 queue = createSharedChunkQueue();
529 }
530
531 @Override
532 public Chunk pollChunk(int size) {
533 int attemps = queue.size();
534 for (int i = 0; i < attemps; i++) {
535 Chunk chunk = queue.poll();
536 if (chunk == null) {
537 return null;
538 }
539 if (chunk.hasUnprocessedFreelistEntries()) {
540 chunk.processFreelistEntries();
541 }
542 if (chunk.remainingCapacity() >= size) {
543 return chunk;
544 }
545 queue.offer(chunk);
546 }
547 return null;
548 }
549
550 @Override
551 public boolean offerChunk(Chunk chunk) {
552 return queue.offer(chunk);
553 }
554 }
555
/**
 * {@link ChunkCache} that keeps chunks in a concurrent multimap keyed by the remaining capacity
 * they had when inserted, so a request can be matched to a chunk with enough room.
 */
private static final class ConcurrentSkipListChunkCache implements ChunkCache {
    private final ConcurrentSkipListIntObjMultimap<Chunk> chunks;

    private ConcurrentSkipListChunkCache() {
        // NOTE(review): -1 is presumably the map's "no key" marker - confirm against the multimap's documentation.
        chunks = new ConcurrentSkipListIntObjMultimap<>(-1);
    }

    /**
     * Returns a chunk that can serve {@code size} bytes.
     * <p>
     * First asks the map for an entry at or above {@code size}. If there is none, the keys may be
     * stale: chunks with unprocessed free-list entries are taken out, brought up to date and
     * re-inserted under their current remaining capacity, while the one with the most room that
     * satisfies the request is kept out and returned.
     *
     * @param size the capacity the caller needs, in bytes
     * @return a suitable chunk, or {@code null} if none was found
     */
    @Override
    public Chunk pollChunk(int size) {
        if (chunks.isEmpty()) {
            return null;
        }
        // NOTE(review): presumably removes and returns the entry with the smallest key >= size - confirm.
        IntEntry<Chunk> entry = chunks.pollCeilingEntry(size);
        if (entry != null) {
            Chunk chunk = entry.getValue();
            if (chunk.hasUnprocessedFreelistEntries()) {
                chunk.processFreelistEntries();
            }
            return chunk;
        }

        Chunk bestChunk = null;
        int bestRemainingCapacity = 0;
        Iterator<IntEntry<Chunk>> itr = chunks.iterator();
        while (itr.hasNext()) {
            entry = itr.next();
            final Chunk chunk;
            if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
                // Only continue with this chunk if the removal succeeded here; otherwise another
                // thread took it first.
                if (!chunks.remove(entry.getKey(), entry.getValue())) {
                    continue;
                }
                chunk.processFreelistEntries();
                int remainingCapacity = chunk.remainingCapacity();
                if (remainingCapacity >= size &&
                        (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
                    // A better candidate was found: put the previous best back under its capacity.
                    if (bestChunk != null) {
                        chunks.put(bestRemainingCapacity, bestChunk);
                    }
                    bestChunk = chunk;
                    bestRemainingCapacity = remainingCapacity;
                } else {
                    // Not usable for this request; re-insert under its refreshed capacity.
                    chunks.put(remainingCapacity, chunk);
                }
            }
        }

        return bestChunk;
    }

    /**
     * Inserts the chunk and then trims the cache back to at most {@code CHUNK_REUSE_QUEUE}
     * entries, evicting and releasing the chunk with the lowest reference count each round (ties
     * go to the smaller capacity).
     *
     * @param chunk the chunk to cache
     * @return always {@code true}
     */
    @Override
    public boolean offerChunk(Chunk chunk) {
        chunks.put(chunk.remainingCapacity(), chunk);

        int size = chunks.size();
        while (size > CHUNK_REUSE_QUEUE) {
            // Find the eviction candidate: fewest references first, then smallest capacity.
            int key = -1;
            Chunk toDeallocate = null;
            for (IntEntry<Chunk> entry : chunks) {
                Chunk candidate = entry.getValue();
                if (candidate != null) {
                    if (toDeallocate == null) {
                        toDeallocate = candidate;
                        key = entry.getKey();
                    } else {
                        int candidateRefCnt = RefCnt.refCnt(candidate.refCnt);
                        int toDeallocateRefCnt = RefCnt.refCnt(toDeallocate.refCnt);
                        if (candidateRefCnt < toDeallocateRefCnt ||
                                candidateRefCnt == toDeallocateRefCnt &&
                                        candidate.capacity() < toDeallocate.capacity()) {
                            toDeallocate = candidate;
                            key = entry.getKey();
                        }
                    }
                }
            }
            if (toDeallocate == null) {
                break;
            }
            // Release only if this thread removed the entry; a concurrent poll may have taken it.
            if (chunks.remove(key, toDeallocate)) {
                toDeallocate.release();
            }
            size = chunks.size();
        }
        return true;
    }
}
643
/**
 * Factory for the two pluggable parts of a {@link MagazineGroup}: the per-magazine
 * {@link ChunkController} and the group-wide {@link ChunkCache}.
 */
private interface ChunkManagementStrategy {
    /**
     * Creates a controller for a magazine of the given group. The group's constructor calls this
     * once per magazine it creates.
     *
     * @param group the group the controller will serve
     * @return a new controller
     */
    ChunkController createController(MagazineGroup group);

    /**
     * Creates the chunk cache for a group.
     *
     * @param isThreadLocal whether the group is thread-local
     * @return a new chunk cache
     */
    ChunkCache createChunkCache(boolean isThreadLocal);
}
649
/**
 * Decides how much capacity a buffer is given and how new chunks are created for a magazine.
 */
private interface ChunkController {
    /**
     * Computes the capacity to reserve in a chunk for a buffer.
     *
     * @param requestedSize  the size requested by the caller, in bytes
     * @param maxCapacity    the maximum capacity of the buffer
     * @param isReallocation whether the request re-initializes an existing buffer
     * @return the capacity to reserve, in bytes
     */
    int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);

    /**
     * Allocates a new chunk for the given magazine.
     *
     * @param promptingSize the size of the allocation that made the new chunk necessary
     * @param magazine      the magazine the chunk is created for
     * @return the new chunk
     */
    Chunk newChunkAllocation(int promptingSize, Magazine magazine);
}
661
662 private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
663
664
665
666 private static final int MIN_SEGMENTS_PER_CHUNK = 32;
667 private final int segmentSize;
668 private final int chunkSize;
669
670 private SizeClassChunkManagementStrategy(int segmentSize) {
671 this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
672 chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
673 }
674
675 @Override
676 public ChunkController createController(MagazineGroup group) {
677 return new SizeClassChunkController(group, segmentSize, chunkSize);
678 }
679
680 @Override
681 public ChunkCache createChunkCache(boolean isThreadLocal) {
682 return new ConcurrentQueueChunkCache();
683 }
684 }
685
686 private static final class SizeClassChunkController implements ChunkController {
687
688 private final ChunkAllocator chunkAllocator;
689 private final int segmentSize;
690 private final int chunkSize;
691 private final ChunkRegistry chunkRegistry;
692
693 private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
694 chunkAllocator = group.chunkAllocator;
695 this.segmentSize = segmentSize;
696 this.chunkSize = chunkSize;
697 chunkRegistry = group.allocator.chunkRegistry;
698 }
699
700 private MpscIntQueue createEmptyFreeList() {
701 return MpscIntQueue.create(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
702 }
703
704 private MpscIntQueue createFreeList() {
705 final int segmentsCount = chunkSize / segmentSize;
706 final MpscIntQueue freeList = MpscIntQueue.create(segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
707 int segmentOffset = 0;
708 for (int i = 0; i < segmentsCount; i++) {
709 freeList.offer(segmentOffset);
710 segmentOffset += segmentSize;
711 }
712 return freeList;
713 }
714
715 private IntStack createLocalFreeList() {
716 final int segmentsCount = chunkSize / segmentSize;
717 int segmentOffset = chunkSize;
718 int[] offsets = new int[segmentsCount];
719 for (int i = 0; i < segmentsCount; i++) {
720 segmentOffset -= segmentSize;
721 offsets[i] = segmentOffset;
722 }
723 return new IntStack(offsets);
724 }
725
726 @Override
727 public int computeBufferCapacity(
728 int requestedSize, int maxCapacity, boolean isReallocation) {
729 return Math.min(segmentSize, maxCapacity);
730 }
731
732 @Override
733 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
734 AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
735 assert chunkBuffer.capacity() == chunkSize;
736 SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
737 chunkRegistry.add(chunk);
738 return chunk;
739 }
740 }
741
742 private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
743 private final AtomicInteger maxChunkSize = new AtomicInteger();
744
745 @Override
746 public ChunkController createController(MagazineGroup group) {
747 return new BuddyChunkController(group, maxChunkSize);
748 }
749
750 @Override
751 public ChunkCache createChunkCache(boolean isThreadLocal) {
752 return new ConcurrentSkipListChunkCache();
753 }
754 }
755
756 private static final class BuddyChunkController implements ChunkController {
757 private final ChunkAllocator chunkAllocator;
758 private final ChunkRegistry chunkRegistry;
759 private final AtomicInteger maxChunkSize;
760
761 BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
762 chunkAllocator = group.chunkAllocator;
763 chunkRegistry = group.allocator.chunkRegistry;
764 this.maxChunkSize = maxChunkSize;
765 }
766
767 @Override
768 public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
769 return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
770 }
771
772 @Override
773 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
774 int maxChunkSize = this.maxChunkSize.get();
775 int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
776 int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
777 if (chunkSize > maxChunkSize) {
778
779 this.maxChunkSize.set(chunkSize);
780 }
781 BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
782 chunkRegistry.add(chunk);
783 return chunk;
784 }
785 }
786
/**
 * Hands out buffers from a "current" chunk, with one spare chunk kept "next in line".
 * <p>
 * A shareable magazine guards {@code current} with {@code allocationLock}; threads that fail to
 * take the lock fall back to a lock-free path that works only with the atomically updated
 * {@code nextInLine} slot. A non-shareable magazine has no lock and no recycler of its own.
 */
private static final class Magazine {
    private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
    static {
        NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
    }
    // Sentinel stored in nextInLine once the magazine has been freed.
    private static final Chunk MAGAZINE_FREED = new Chunk();

    /**
     * Recycler producing {@link AdaptiveByteBuf} instances.
     */
    private static final class AdaptiveRecycler extends Recycler<AdaptiveByteBuf> {

        private AdaptiveRecycler(boolean unguarded) {
            // NOTE(review): the meaning of "unguarded" is defined by Recycler - confirm there.
            super(unguarded);
        }

        private AdaptiveRecycler(int maxCapacity, boolean unguarded) {
            // Variant with an explicit maximum capacity; used for the per-magazine recyclers.
            super(maxCapacity, unguarded);
        }

        @Override
        protected AdaptiveByteBuf newObject(final Handle<AdaptiveByteBuf> handle) {
            return new AdaptiveByteBuf((EnhancedHandle<AdaptiveByteBuf>) handle);
        }

        /** Creates the recycler used by magazines that have none of their own. */
        public static AdaptiveRecycler threadLocal() {
            return new AdaptiveRecycler(true);
        }

        /** Creates a recycler with the given maximum capacity for a shareable magazine. */
        public static AdaptiveRecycler sharedWith(int maxCapacity) {
            return new AdaptiveRecycler(maxCapacity, true);
        }
    }

    // Used by every magazine whose own recycler is null (i.e. non-shareable magazines).
    private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();

    // The chunk currently being carved up; accessed under allocationLock when there is one.
    private Chunk current;
    // Spare chunk; only accessed through NEXT_IN_LINE.
    @SuppressWarnings("unused")
    private volatile Chunk nextInLine;
    private final MagazineGroup group;
    private final ChunkController chunkController;
    // Null for non-shareable magazines.
    private final StampedLock allocationLock;
    // Null for non-shareable magazines.
    private final AdaptiveRecycler recycler;

    /**
     * @param group           the group this magazine belongs to
     * @param shareable       whether several threads may allocate from this magazine
     * @param chunkController sizing and chunk-creation policy
     */
    Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
        this.group = group;
        this.chunkController = chunkController;

        if (shareable) {
            // Shared magazines need a lock and get their own bounded recycler.
            allocationLock = new StampedLock();
            recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
        } else {
            allocationLock = null;
            recycler = null;
        }
    }

    /**
     * Tries to initialize {@code buf} from this magazine without blocking.
     *
     * @param size        the requested size in bytes
     * @param maxCapacity the maximum capacity of the buffer
     * @param buf         the buffer to initialize
     * @param reallocate  whether {@code buf} is being re-initialized
     * @return {@code true} if {@code buf} was initialized
     */
    public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        if (allocationLock == null) {
            // Non-shareable magazine: no lock to take.
            return allocate(size, maxCapacity, buf, reallocate);
        }

        // Shareable magazine: take the lock only if it is immediately available.
        long writeLock = allocationLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                return allocate(size, maxCapacity, buf, reallocate);
            } finally {
                allocationLock.unlockWrite(writeLock);
            }
        }
        // Lock is held by another thread: try the lock-free path.
        return allocateWithoutLock(size, maxCapacity, buf);
    }

    /**
     * Lock-free allocation path. Never touches {@code current}; works with the next-in-line chunk
     * or, failing that, a chunk polled from the group.
     */
    private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
        Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr == MAGAZINE_FREED) {
            // The magazine was freed; put the sentinel back and give up.
            restoreMagazineFreed();
            return false;
        }
        if (curr == null) {
            curr = group.pollChunk(size);
            if (curr == null) {
                return false;
            }
            curr.attachToMagazine(this);
        }
        boolean allocated = false;
        int remainingCapacity = curr.remainingCapacity();
        int startingCapacity = chunkController.computeBufferCapacity(
                size, maxCapacity, true /* this path always reports the request as a reallocation */);
        if (remainingCapacity >= size &&
                curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
            allocated = true;
            remainingCapacity = curr.remainingCapacity();
        }
        try {
            // Keep the chunk as next-in-line only if it still has a worthwhile amount of room.
            if (remainingCapacity >= RETIRE_CAPACITY) {
                transferToNextInLineOrRelease(curr);
                curr = null;
            }
        } finally {
            if (curr != null) {
                curr.releaseFromMagazine();
            }
        }
        return allocated;
    }

    /**
     * Allocation path used by the lock holder (or by a non-shareable magazine). Tries, in order:
     * the current chunk, the next-in-line chunk, a chunk polled from the group, a new chunk.
     */
    private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
        Chunk curr = current;
        if (curr != null) {
            boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
            int remainingCapacity = curr.remainingCapacity();
            if (!success && remainingCapacity > 0) {
                // Could not serve this request but still has room: park it as next-in-line.
                current = null;
                transferToNextInLineOrRelease(curr);
            } else if (remainingCapacity == 0) {
                // Fully used up: the magazine is done with it.
                current = null;
                curr.releaseFromMagazine();
            }
            if (success) {
                return true;
            }
        }

        assert current == null;

        // No current chunk from here on. Try the next-in-line chunk first.
        curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr != null) {
            if (curr == MAGAZINE_FREED) {
                // The magazine was freed; put the sentinel back and give up.
                restoreMagazineFreed();
                return false;
            }

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity > startingCapacity &&
                    curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
                // Room is left after this allocation, so keep the chunk as the current one.
                current = curr;
                return true;
            }

            try {
                if (remainingCapacity >= size) {
                    // This is the last allocation from this chunk: hand out all that remains.
                    return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
                }
            } finally {
                // Runs whether or not the allocation above happened, and even if it threw.
                curr.releaseFromMagazine();
            }
        }

        // Next, try a chunk from the group's cache; create a new chunk if that does not work out.
        curr = group.pollChunk(size);
        if (curr == null) {
            curr = chunkController.newChunkAllocation(size, this);
        } else {
            curr.attachToMagazine(this);

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity == 0 || remainingCapacity < size) {
                // The polled chunk is too small for this request.
                if (remainingCapacity < RETIRE_CAPACITY) {
                    curr.releaseFromMagazine();
                } else {
                    // Still has a worthwhile amount of room: keep it next-in-line if the slot allows,
                    // otherwise it is released by the callee.
                    transferToNextInLineOrRelease(curr);
                }
                curr = chunkController.newChunkAllocation(size, this);
            }
        }

        current = curr;
        boolean success;
        try {
            int remainingCapacity = curr.remainingCapacity();
            assert remainingCapacity >= size;
            if (remainingCapacity > startingCapacity) {
                success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
                // Clearing the local keeps the chunk installed as current (see finally).
                curr = null;
            } else {
                success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
            }
        } finally {
            if (curr != null) {
                // Either the chunk was used up by this allocation or readInitInto threw:
                // the magazine is done with it.
                curr.releaseFromMagazine();
                current = null;
            }
        }
        return success;
    }

    /**
     * Stores the freed sentinel in the next-in-line slot and releases whatever real chunk was
     * there.
     */
    private void restoreMagazineFreed() {
        Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
        if (next != null && next != MAGAZINE_FREED) {
            // A chunk slipped into the slot in the meantime; it must not be left behind.
            next.releaseFromMagazine();
        }
    }

    /**
     * Parks {@code chunk} in the next-in-line slot if the slot is empty, or if it holds a chunk
     * with less remaining capacity (which is then released instead). Otherwise {@code chunk}
     * itself is released from the magazine.
     */
    private void transferToNextInLineOrRelease(Chunk chunk) {
        if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
            return;
        }

        Chunk nextChunk = NEXT_IN_LINE.get(this);
        if (nextChunk != null && nextChunk != MAGAZINE_FREED
                && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
            if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
                nextChunk.releaseFromMagazine();
                return;
            }
        }

        // The slot is taken by a better chunk, by the freed sentinel, or the swap lost a race.
        chunk.releaseFromMagazine();
    }

    /**
     * Frees the magazine: marks the next-in-line slot as freed (releasing any chunk in it) and
     * releases the current chunk. For shareable magazines this waits for the allocation lock.
     */
    void free() {
        // Mark as freed first so concurrent allocations see the sentinel.
        restoreMagazineFreed();
        long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
        try {
            if (current != null) {
                current.releaseFromMagazine();
                current = null;
            }
        } finally {
            if (allocationLock != null) {
                allocationLock.unlockWrite(stamp);
            }
        }
    }

    /**
     * Obtains a buffer object from this magazine's recycler, or from the shared thread-local
     * recycler when the magazine has none, and resets its reference count and marks.
     *
     * @return a buffer that still has to be initialized from a chunk
     */
    public AdaptiveByteBuf newBuffer() {
        AdaptiveRecycler recycler = this.recycler;
        AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
        buf.resetRefCnt();
        buf.discardMarks();
        return buf;
    }

    /**
     * Offers a chunk to the group's cache.
     *
     * @return {@code true} if the group accepted the chunk
     */
    boolean offerToQueue(Chunk chunk) {
        return group.offerChunk(chunk);
    }
}
1053
1054 private static final class ChunkRegistry {
1055 private final LongAdder totalCapacity = new LongAdder();
1056
1057 public long totalCapacity() {
1058 return totalCapacity.sum();
1059 }
1060
1061 public void add(Chunk chunk) {
1062 totalCapacity.add(chunk.capacity());
1063 }
1064
1065 public void remove(Chunk chunk) {
1066 totalCapacity.add(-chunk.capacity());
1067 }
1068 }
1069
/**
 * A reference-counted region of memory from which buffers are carved.
 * <p>
 * This base implementation hands out memory linearly: each buffer starts at {@code allocatedBytes},
 * which only ever grows. A chunk is created holding one reference; every buffer initialized from it
 * retains another, and the backing memory is released when the count reaches zero.
 */
private static class Chunk implements ChunkInfo {
    protected final AbstractByteBuf delegate;
    // The magazine currently using this chunk, or null while detached.
    protected Magazine magazine;
    private final AdaptivePoolingAllocator allocator;

    private final RefCnt refCnt = new RefCnt();
    private final int capacity;
    private final boolean pooled;
    // Number of bytes handed out so far; also the start index of the next buffer.
    protected int allocatedBytes;

    /**
     * Creates an empty chunk without memory, magazine or allocator. Used for the
     * {@code MAGAZINE_FREED} sentinel.
     */
    Chunk() {

        delegate = null;
        magazine = null;
        allocator = null;
        capacity = 0;
        pooled = false;
    }

    /**
     * @param delegate the backing memory; its capacity becomes the chunk's capacity
     * @param magazine the magazine the chunk is initially attached to
     * @param pooled   whether the chunk is pooled; only reported in the JFR events
     */
    Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
        this.delegate = delegate;
        this.pooled = pooled;
        capacity = delegate.capacity();
        attachToMagazine(magazine);

        // Remember the top-level allocator; deallocate() uses it to update the chunk registry.
        allocator = magazine.group.allocator;

        if (PlatformDependent.isJfrEnabled() && AllocateChunkEvent.isEventEnabled()) {
            AllocateChunkEvent event = new AllocateChunkEvent();
            if (event.shouldCommit()) {
                event.fill(this, AdaptiveByteBufAllocator.class);
                event.pooled = pooled;
                // Magazines without an allocation lock are the non-shareable ones.
                event.threadLocal = magazine.allocationLock == null;
                event.commit();
            }
        }
    }

    /**
     * @return the magazine this chunk is attached to, or {@code null}
     */
    Magazine currentMagazine() {
        return magazine;
    }

    /** Clears the magazine reference, if any. */
    void detachFromMagazine() {
        if (magazine != null) {
            magazine = null;
        }
    }

    /** Attaches the chunk to a magazine; the chunk must currently be detached. */
    void attachToMagazine(Magazine magazine) {
        assert this.magazine == null;
        this.magazine = magazine;
    }

    /**
     * Called when the magazine is done with this chunk: detaches it and offers it to the
     * magazine's group for reuse. If the group does not take it, the chunk is released.
     *
     * @return {@code true} if the chunk was deallocated as a result
     */
    boolean releaseFromMagazine() {
        // Read the magazine before detaching; it is needed for the offer below.
        Magazine mag = magazine;
        detachFromMagazine();
        if (!mag.offerToQueue(this)) {
            return release();
        }
        return false;
    }

    /**
     * Called when a buffer carved from this chunk is released. The base implementation ignores
     * both arguments and simply drops one reference.
     */
    void releaseSegment(int ignoredSegmentId, int size) {
        release();
    }

    private void retain() {
        RefCnt.retain(refCnt);
    }

    /**
     * Drops one reference and deallocates the chunk if that was the last one.
     *
     * @return {@code true} if the chunk was deallocated
     */
    protected boolean release() {
        boolean deallocate = RefCnt.release(refCnt);
        if (deallocate) {
            deallocate();
        }
        return deallocate;
    }

    /** Emits the JFR free event, removes the chunk from the registry and releases the memory. */
    protected void deallocate() {
        onRelease();
        allocator.chunkRegistry.remove(this);
        delegate.release();
    }

    private void onRelease() {
        if (PlatformDependent.isJfrEnabled() && FreeChunkEvent.isEventEnabled()) {
            FreeChunkEvent event = new FreeChunkEvent();
            if (event.shouldCommit()) {
                event.fill(this, AdaptiveByteBufAllocator.class);
                event.pooled = pooled;
                event.commit();
            }
        }
    }

    /**
     * Reserves {@code startingCapacity} bytes at the current allocation position and initializes
     * {@code buf} to use them. This base implementation does not check the remaining capacity.
     *
     * @param buf              the buffer to initialize
     * @param size             the requested size in bytes
     * @param startingCapacity the number of bytes to reserve for the buffer
     * @param maxCapacity      the maximum capacity of the buffer
     * @return always {@code true} in this implementation
     */
    public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
        int startIndex = allocatedBytes;
        allocatedBytes = startIndex + startingCapacity;
        Chunk chunk = this;
        // The buffer gets its own reference to the chunk.
        chunk.retain();
        try {
            buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
            // Success marker: the finally block must not roll back.
            chunk = null;
        } finally {
            if (chunk != null) {
                // buf.init(...) threw: undo the reservation and drop the reference taken above.
                allocatedBytes = startIndex;
                chunk.release();
            }
        }
        return true;
    }

    /**
     * @return the number of bytes not yet handed out
     */
    public int remainingCapacity() {
        return capacity - allocatedBytes;
    }

    /**
     * @return always {@code false} here; this base class has no free list
     */
    public boolean hasUnprocessedFreelistEntries() {
        return false;
    }

    /** No-op here; this base class has no free list. */
    public void processFreelistEntries() {
    }

    @Override
    public int capacity() {
        return capacity;
    }

    @Override
    public boolean isDirect() {
        return delegate.isDirect();
    }

    @Override
    public long memoryAddress() {
        return delegate._memoryAddress();
    }
}
1221
1222 private static final class IntStack {
1223
1224 private final int[] stack;
1225 private int top;
1226
1227 IntStack(int[] initialValues) {
1228 stack = initialValues;
1229 top = initialValues.length - 1;
1230 }
1231
1232 public boolean isEmpty() {
1233 return top == -1;
1234 }
1235
1236 public int pop() {
1237 final int last = stack[top];
1238 top--;
1239 return last;
1240 }
1241
1242 public void push(int value) {
1243 stack[top + 1] = value;
1244 top++;
1245 }
1246
1247 public int size() {
1248 return top + 1;
1249 }
1250 }
1251
1252 private static final class SizeClassedChunk extends Chunk {
1253 private static final int FREE_LIST_EMPTY = -1;
1254 private final int segmentSize;
1255 private final MpscIntQueue externalFreeList;
1256 private final IntStack localFreeList;
1257 private Thread ownerThread;
1258
1259 SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
1260 SizeClassChunkController controller) {
1261 super(delegate, magazine, true);
1262 segmentSize = controller.segmentSize;
1263 ownerThread = magazine.group.ownerThread;
1264 if (ownerThread == null) {
1265 externalFreeList = controller.createFreeList();
1266 localFreeList = null;
1267 } else {
1268 externalFreeList = controller.createEmptyFreeList();
1269 localFreeList = controller.createLocalFreeList();
1270 }
1271 }
1272
1273 @Override
1274 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1275 final int startIndex = nextAvailableSegmentOffset();
1276 if (startIndex == FREE_LIST_EMPTY) {
1277 return false;
1278 }
1279 allocatedBytes += segmentSize;
1280 Chunk chunk = this;
1281 chunk.retain();
1282 try {
1283 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1284 chunk = null;
1285 } finally {
1286 if (chunk != null) {
1287
1288
1289
1290 allocatedBytes -= segmentSize;
1291 chunk.releaseSegment(startIndex, startingCapacity);
1292 }
1293 }
1294 return true;
1295 }
1296
1297 private int nextAvailableSegmentOffset() {
1298 final int startIndex;
1299 IntStack localFreeList = this.localFreeList;
1300 if (localFreeList != null) {
1301 assert Thread.currentThread() == ownerThread;
1302 if (localFreeList.isEmpty()) {
1303 startIndex = externalFreeList.poll();
1304 } else {
1305 startIndex = localFreeList.pop();
1306 }
1307 } else {
1308 startIndex = externalFreeList.poll();
1309 }
1310 return startIndex;
1311 }
1312
1313 private int remainingCapacityOnFreeList() {
1314 final int segmentSize = this.segmentSize;
1315 int remainingCapacity = externalFreeList.size() * segmentSize;
1316 IntStack localFreeList = this.localFreeList;
1317 if (localFreeList != null) {
1318 assert Thread.currentThread() == ownerThread;
1319 remainingCapacity += localFreeList.size() * segmentSize;
1320 }
1321 return remainingCapacity;
1322 }
1323
1324 @Override
1325 public int remainingCapacity() {
1326 int remainingCapacity = super.remainingCapacity();
1327 if (remainingCapacity > segmentSize) {
1328 return remainingCapacity;
1329 }
1330 int updatedRemainingCapacity = remainingCapacityOnFreeList();
1331 if (updatedRemainingCapacity == remainingCapacity) {
1332 return remainingCapacity;
1333 }
1334
1335 allocatedBytes = capacity() - updatedRemainingCapacity;
1336 return updatedRemainingCapacity;
1337 }
1338
1339 private void releaseSegmentOffsetIntoFreeList(int startIndex) {
1340 IntStack localFreeList = this.localFreeList;
1341 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1342 localFreeList.push(startIndex);
1343 } else {
1344 boolean segmentReturned = externalFreeList.offer(startIndex);
1345 assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
1346 }
1347 }
1348
1349 @Override
1350 void releaseSegment(int startIndex, int size) {
1351 release();
1352 releaseSegmentOffsetIntoFreeList(startIndex);
1353 }
1354 }
1355
1356 private static final class BuddyChunk extends Chunk implements IntConsumer {
1357 private static final int MIN_BUDDY_SIZE = 32768;
1358 private static final byte IS_CLAIMED = (byte) (1 << 7);
1359 private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
1360 private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
1361 private static final int PACK_OFFSET_MASK = 0xFFFF;
1362 private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
1363
1364 private final MpscIntQueue freeList;
1365
1366 private final byte[] buddies;
1367 private final int freeListCapacity;
1368
1369 BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
1370 super(delegate, magazine, true);
1371 int capacity = delegate.capacity();
1372 int capFactor = capacity / MIN_BUDDY_SIZE;
1373 int tree = (capFactor << 1) - 1;
1374 int maxShift = Integer.numberOfTrailingZeros(capFactor);
1375 assert maxShift <= 30;
1376 freeListCapacity = tree >> 1;
1377 freeList = MpscIntQueue.create(freeListCapacity, -1);
1378 buddies = new byte[1 + tree];
1379
1380
1381 int index = 1;
1382 int runLength = 1;
1383 int currentRun = 0;
1384 while (maxShift > 0) {
1385 buddies[index++] = (byte) maxShift;
1386 if (++currentRun == runLength) {
1387 currentRun = 0;
1388 runLength <<= 1;
1389 maxShift--;
1390 }
1391 }
1392 }
1393
1394 @Override
1395 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1396 if (!freeList.isEmpty()) {
1397 freeList.drain(freeListCapacity, this);
1398 }
1399 int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
1400 if (startIndex == -1) {
1401 return false;
1402 }
1403 Chunk chunk = this;
1404 chunk.retain();
1405 try {
1406 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1407 allocatedBytes += startingCapacity;
1408 chunk = null;
1409 } finally {
1410 if (chunk != null) {
1411
1412
1413 chunk.release();
1414 }
1415 }
1416 return true;
1417 }
1418
1419 @Override
1420 public void accept(int packed) {
1421
1422 int size = MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
1423 int offset = (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
1424 unreserveMatchingBuddy(1, size, offset, 0);
1425 allocatedBytes -= size;
1426 }
1427
1428 @Override
1429 void releaseSegment(int startingIndex, int size) {
1430 int packedOffset = startingIndex / MIN_BUDDY_SIZE;
1431 int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
1432 int packed = packedOffset | packedSize;
1433 freeList.offer(packed);
1434 release();
1435 }
1436
1437 @Override
1438 public int remainingCapacity() {
1439 if (!freeList.isEmpty()) {
1440 freeList.drain(freeListCapacity, this);
1441 }
1442 return super.remainingCapacity();
1443 }
1444
1445 @Override
1446 public boolean hasUnprocessedFreelistEntries() {
1447 return !freeList.isEmpty();
1448 }
1449
1450 @Override
1451 public void processFreelistEntries() {
1452 freeList.drain(freeListCapacity, this);
1453 }
1454
1455
1456
1457
1458 private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
1459 byte[] buddies = this.buddies;
1460 while (index < buddies.length) {
1461 byte buddy = buddies[index];
1462 int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1463 if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
1464 return -1;
1465 }
1466 if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
1467 buddies[index] |= IS_CLAIMED;
1468 return currOffset;
1469 }
1470 int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
1471 if (found != -1) {
1472 buddies[index] |= HAS_CLAIMED_CHILDREN;
1473 return found;
1474 }
1475 index = (index << 1) + 1;
1476 currOffset += currValue >> 1;
1477 }
1478 return -1;
1479 }
1480
1481
1482
1483
1484 private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
1485 byte[] buddies = this.buddies;
1486 if (buddies.length <= index) {
1487 return false;
1488 }
1489 byte buddy = buddies[index];
1490 int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1491
1492 if (currSize == size) {
1493
1494 if (currOffset == offset) {
1495 buddies[index] &= SHIFT_MASK;
1496 return false;
1497 }
1498 throw new IllegalStateException("The intended segment was not found at index " +
1499 index + ", for size " + size + " and offset " + offset);
1500 }
1501
1502
1503 boolean claims;
1504 int siblingIndex;
1505 if (offset < currOffset + (currSize >> 1)) {
1506
1507 claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
1508 siblingIndex = (index << 1) + 1;
1509 } else {
1510
1511 claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
1512 siblingIndex = index << 1;
1513 }
1514 if (!claims) {
1515
1516 byte sibling = buddies[siblingIndex];
1517 if ((sibling & SHIFT_MASK) == sibling) {
1518
1519 buddies[index] &= SHIFT_MASK;
1520 return false;
1521 }
1522 }
1523 return true;
1524 }
1525
1526 @Override
1527 public String toString() {
1528 int capacity = delegate.capacity();
1529 int remaining = capacity - allocatedBytes;
1530 return "BuddyChunk[capacity: " + capacity +
1531 ", remaining: " + remaining +
1532 ", free list: " + freeList.size() + ']';
1533 }
1534 }
1535
1536 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1537
1538 private final EnhancedHandle<AdaptiveByteBuf> handle;
1539
1540
1541 private int startIndex;
1542 private AbstractByteBuf rootParent;
1543 Chunk chunk;
1544 private int length;
1545 private int maxFastCapacity;
1546 private ByteBuffer tmpNioBuf;
1547 private boolean hasArray;
1548 private boolean hasMemoryAddress;
1549
1550 AdaptiveByteBuf(EnhancedHandle<AdaptiveByteBuf> recyclerHandle) {
1551 super(0);
1552 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1553 }
1554
1555 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1556 int startIndex, int size, int capacity, int maxCapacity) {
1557 this.startIndex = startIndex;
1558 chunk = wrapped;
1559 length = size;
1560 maxFastCapacity = capacity;
1561 maxCapacity(maxCapacity);
1562 setIndex0(readerIndex, writerIndex);
1563 hasArray = unwrapped.hasArray();
1564 hasMemoryAddress = unwrapped.hasMemoryAddress();
1565 rootParent = unwrapped;
1566 tmpNioBuf = null;
1567
1568 if (PlatformDependent.isJfrEnabled() && AllocateBufferEvent.isEventEnabled()) {
1569 AllocateBufferEvent event = new AllocateBufferEvent();
1570 if (event.shouldCommit()) {
1571 event.fill(this, AdaptiveByteBufAllocator.class);
1572 event.chunkPooled = wrapped.pooled;
1573 Magazine m = wrapped.magazine;
1574 event.chunkThreadLocal = m != null && m.allocationLock == null;
1575 event.commit();
1576 }
1577 }
1578 }
1579
1580 private AbstractByteBuf rootParent() {
1581 final AbstractByteBuf rootParent = this.rootParent;
1582 if (rootParent != null) {
1583 return rootParent;
1584 }
1585 throw new IllegalReferenceCountException();
1586 }
1587
1588 @Override
1589 public int capacity() {
1590 return length;
1591 }
1592
1593 @Override
1594 public int maxFastWritableBytes() {
1595 return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1596 }
1597
1598 @Override
1599 public ByteBuf capacity(int newCapacity) {
1600 if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1601 ensureAccessible();
1602 length = newCapacity;
1603 return this;
1604 }
1605 checkNewCapacity(newCapacity);
1606 if (newCapacity < capacity()) {
1607 length = newCapacity;
1608 trimIndicesToCapacity(newCapacity);
1609 return this;
1610 }
1611
1612 if (PlatformDependent.isJfrEnabled() && ReallocateBufferEvent.isEventEnabled()) {
1613 ReallocateBufferEvent event = new ReallocateBufferEvent();
1614 if (event.shouldCommit()) {
1615 event.fill(this, AdaptiveByteBufAllocator.class);
1616 event.newCapacity = newCapacity;
1617 event.commit();
1618 }
1619 }
1620
1621
1622 Chunk chunk = this.chunk;
1623 AdaptivePoolingAllocator allocator = chunk.allocator;
1624 int readerIndex = this.readerIndex;
1625 int writerIndex = this.writerIndex;
1626 int baseOldRootIndex = startIndex;
1627 int oldLength = length;
1628 int oldCapacity = maxFastCapacity;
1629 AbstractByteBuf oldRoot = rootParent();
1630 allocator.reallocate(newCapacity, maxCapacity(), this);
1631 oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
1632 chunk.releaseSegment(baseOldRootIndex, oldCapacity);
1633 assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
1634 "Capacity increase failed";
1635 this.readerIndex = readerIndex;
1636 this.writerIndex = writerIndex;
1637 return this;
1638 }
1639
1640 @Override
1641 public ByteBufAllocator alloc() {
1642 return rootParent().alloc();
1643 }
1644
1645 @SuppressWarnings("deprecation")
1646 @Override
1647 public ByteOrder order() {
1648 return rootParent().order();
1649 }
1650
1651 @Override
1652 public ByteBuf unwrap() {
1653 return null;
1654 }
1655
1656 @Override
1657 public boolean isDirect() {
1658 return rootParent().isDirect();
1659 }
1660
1661 @Override
1662 public int arrayOffset() {
1663 return idx(rootParent().arrayOffset());
1664 }
1665
1666 @Override
1667 public boolean hasMemoryAddress() {
1668 return hasMemoryAddress;
1669 }
1670
1671 @Override
1672 public long memoryAddress() {
1673 ensureAccessible();
1674 return _memoryAddress();
1675 }
1676
1677 @Override
1678 long _memoryAddress() {
1679 AbstractByteBuf root = rootParent;
1680 return root != null ? root._memoryAddress() + startIndex : 0L;
1681 }
1682
1683 @Override
1684 public ByteBuffer nioBuffer(int index, int length) {
1685 checkIndex(index, length);
1686 return rootParent().nioBuffer(idx(index), length);
1687 }
1688
1689 @Override
1690 public ByteBuffer internalNioBuffer(int index, int length) {
1691 checkIndex(index, length);
1692 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1693 }
1694
1695 private ByteBuffer internalNioBuffer() {
1696 if (tmpNioBuf == null) {
1697 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1698 }
1699 return (ByteBuffer) tmpNioBuf.clear();
1700 }
1701
1702 @Override
1703 public ByteBuffer[] nioBuffers(int index, int length) {
1704 checkIndex(index, length);
1705 return rootParent().nioBuffers(idx(index), length);
1706 }
1707
1708 @Override
1709 public boolean hasArray() {
1710 return hasArray;
1711 }
1712
1713 @Override
1714 public byte[] array() {
1715 ensureAccessible();
1716 return rootParent().array();
1717 }
1718
1719 @Override
1720 public ByteBuf copy(int index, int length) {
1721 checkIndex(index, length);
1722 return rootParent().copy(idx(index), length);
1723 }
1724
1725 @Override
1726 public int nioBufferCount() {
1727 return rootParent().nioBufferCount();
1728 }
1729
1730 @Override
1731 protected byte _getByte(int index) {
1732 return rootParent()._getByte(idx(index));
1733 }
1734
1735 @Override
1736 protected short _getShort(int index) {
1737 return rootParent()._getShort(idx(index));
1738 }
1739
1740 @Override
1741 protected short _getShortLE(int index) {
1742 return rootParent()._getShortLE(idx(index));
1743 }
1744
1745 @Override
1746 protected int _getUnsignedMedium(int index) {
1747 return rootParent()._getUnsignedMedium(idx(index));
1748 }
1749
1750 @Override
1751 protected int _getUnsignedMediumLE(int index) {
1752 return rootParent()._getUnsignedMediumLE(idx(index));
1753 }
1754
1755 @Override
1756 protected int _getInt(int index) {
1757 return rootParent()._getInt(idx(index));
1758 }
1759
1760 @Override
1761 protected int _getIntLE(int index) {
1762 return rootParent()._getIntLE(idx(index));
1763 }
1764
1765 @Override
1766 protected long _getLong(int index) {
1767 return rootParent()._getLong(idx(index));
1768 }
1769
1770 @Override
1771 protected long _getLongLE(int index) {
1772 return rootParent()._getLongLE(idx(index));
1773 }
1774
1775 @Override
1776 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1777 checkIndex(index, length);
1778 rootParent().getBytes(idx(index), dst, dstIndex, length);
1779 return this;
1780 }
1781
1782 @Override
1783 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1784 checkIndex(index, length);
1785 rootParent().getBytes(idx(index), dst, dstIndex, length);
1786 return this;
1787 }
1788
1789 @Override
1790 public ByteBuf getBytes(int index, ByteBuffer dst) {
1791 checkIndex(index, dst.remaining());
1792 rootParent().getBytes(idx(index), dst);
1793 return this;
1794 }
1795
1796 @Override
1797 protected void _setByte(int index, int value) {
1798 rootParent()._setByte(idx(index), value);
1799 }
1800
1801 @Override
1802 protected void _setShort(int index, int value) {
1803 rootParent()._setShort(idx(index), value);
1804 }
1805
1806 @Override
1807 protected void _setShortLE(int index, int value) {
1808 rootParent()._setShortLE(idx(index), value);
1809 }
1810
1811 @Override
1812 protected void _setMedium(int index, int value) {
1813 rootParent()._setMedium(idx(index), value);
1814 }
1815
1816 @Override
1817 protected void _setMediumLE(int index, int value) {
1818 rootParent()._setMediumLE(idx(index), value);
1819 }
1820
1821 @Override
1822 protected void _setInt(int index, int value) {
1823 rootParent()._setInt(idx(index), value);
1824 }
1825
1826 @Override
1827 protected void _setIntLE(int index, int value) {
1828 rootParent()._setIntLE(idx(index), value);
1829 }
1830
1831 @Override
1832 protected void _setLong(int index, long value) {
1833 rootParent()._setLong(idx(index), value);
1834 }
1835
1836 @Override
1837 protected void _setLongLE(int index, long value) {
1838 rootParent().setLongLE(idx(index), value);
1839 }
1840
1841 @Override
1842 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1843 checkIndex(index, length);
1844 if (tmpNioBuf == null && PlatformDependent.javaVersion() >= 13) {
1845 ByteBuffer dstBuffer = rootParent()._internalNioBuffer();
1846 PlatformDependent.absolutePut(dstBuffer, idx(index), src, srcIndex, length);
1847 } else {
1848 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1849 tmp.put(src, srcIndex, length);
1850 }
1851 return this;
1852 }
1853
1854 @Override
1855 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1856 checkIndex(index, length);
1857 if (src instanceof AdaptiveByteBuf && PlatformDependent.javaVersion() >= 16) {
1858 AdaptiveByteBuf srcBuf = (AdaptiveByteBuf) src;
1859 srcBuf.checkIndex(srcIndex, length);
1860 ByteBuffer dstBuffer = rootParent()._internalNioBuffer();
1861 ByteBuffer srcBuffer = srcBuf.rootParent()._internalNioBuffer();
1862 PlatformDependent.absolutePut(dstBuffer, idx(index), srcBuffer, srcBuf.idx(srcIndex), length);
1863 } else {
1864 ByteBuffer tmp = internalNioBuffer();
1865 tmp.position(index);
1866 tmp.put(src.nioBuffer(srcIndex, length));
1867 }
1868 return this;
1869 }
1870
1871 @Override
1872 public ByteBuf setBytes(int index, ByteBuffer src) {
1873 int length = src.remaining();
1874 checkIndex(index, length);
1875 ByteBuffer tmp = internalNioBuffer();
1876 if (PlatformDependent.javaVersion() >= 16) {
1877 int offset = src.position();
1878 PlatformDependent.absolutePut(tmp, index, src, offset, length);
1879 src.position(offset + length);
1880 } else {
1881 tmp.position(index);
1882 tmp.put(src);
1883 }
1884 return this;
1885 }
1886
1887 @Override
1888 public ByteBuf getBytes(int index, OutputStream out, int length)
1889 throws IOException {
1890 checkIndex(index, length);
1891 if (length != 0) {
1892 ByteBuffer tmp = internalNioBuffer();
1893 ByteBufUtil.readBytes(alloc(), tmp.hasArray() ? tmp : tmp.duplicate(), index, length, out);
1894 }
1895 return this;
1896 }
1897
1898 @Override
1899 public int getBytes(int index, GatheringByteChannel out, int length)
1900 throws IOException {
1901 ByteBuffer buf = internalNioBuffer().duplicate();
1902 buf.clear().position(index).limit(index + length);
1903 return out.write(buf);
1904 }
1905
1906 @Override
1907 public int getBytes(int index, FileChannel out, long position, int length)
1908 throws IOException {
1909 ByteBuffer buf = internalNioBuffer().duplicate();
1910 buf.clear().position(index).limit(index + length);
1911 return out.write(buf, position);
1912 }
1913
1914 @Override
1915 public int setBytes(int index, InputStream in, int length)
1916 throws IOException {
1917 checkIndex(index, length);
1918 final AbstractByteBuf rootParent = rootParent();
1919 if (rootParent.hasArray()) {
1920 return rootParent.setBytes(idx(index), in, length);
1921 }
1922 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1923 int readBytes = in.read(tmp, 0, length);
1924 if (readBytes <= 0) {
1925 return readBytes;
1926 }
1927 setBytes(index, tmp, 0, readBytes);
1928 return readBytes;
1929 }
1930
1931 @Override
1932 public int setBytes(int index, ScatteringByteChannel in, int length)
1933 throws IOException {
1934 try {
1935 return in.read(internalNioBuffer(index, length));
1936 } catch (ClosedChannelException ignored) {
1937 return -1;
1938 }
1939 }
1940
1941 @Override
1942 public int setBytes(int index, FileChannel in, long position, int length)
1943 throws IOException {
1944 try {
1945 return in.read(internalNioBuffer(index, length), position);
1946 } catch (ClosedChannelException ignored) {
1947 return -1;
1948 }
1949 }
1950
1951 @Override
1952 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1953 return setCharSequence0(index, sequence, charset, false);
1954 }
1955
1956 private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
1957 if (charset.equals(CharsetUtil.UTF_8)) {
1958 int length = ByteBufUtil.utf8MaxBytes(sequence);
1959 if (expand) {
1960 ensureWritable0(length);
1961 checkIndex0(index, length);
1962 } else {
1963 checkIndex(index, length);
1964 }
1965 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
1966 }
1967 if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
1968 int length = sequence.length();
1969 if (expand) {
1970 ensureWritable0(length);
1971 checkIndex0(index, length);
1972 } else {
1973 checkIndex(index, length);
1974 }
1975 return ByteBufUtil.writeAscii(this, index, sequence, length);
1976 }
1977 byte[] bytes = sequence.toString().getBytes(charset);
1978 if (expand) {
1979 ensureWritable0(bytes.length);
1980
1981 }
1982 setBytes(index, bytes);
1983 return bytes.length;
1984 }
1985
1986 @Override
1987 public int writeCharSequence(CharSequence sequence, Charset charset) {
1988 int written = setCharSequence0(writerIndex, sequence, charset, true);
1989 writerIndex += written;
1990 return written;
1991 }
1992
1993 @Override
1994 public int forEachByte(int index, int length, ByteProcessor processor) {
1995 checkIndex(index, length);
1996 int ret = rootParent().forEachByte(idx(index), length, processor);
1997 return forEachResult(ret);
1998 }
1999
2000 @Override
2001 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
2002 checkIndex(index, length);
2003 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
2004 return forEachResult(ret);
2005 }
2006
2007 @Override
2008 public ByteBuf setZero(int index, int length) {
2009 checkIndex(index, length);
2010 rootParent().setZero(idx(index), length);
2011 return this;
2012 }
2013
2014 @Override
2015 public ByteBuf writeZero(int length) {
2016 ensureWritable(length);
2017 rootParent().setZero(idx(writerIndex), length);
2018 writerIndex += length;
2019 return this;
2020 }
2021
2022 private int forEachResult(int ret) {
2023 if (ret < startIndex) {
2024 return -1;
2025 }
2026 return ret - startIndex;
2027 }
2028
2029 @Override
2030 public boolean isContiguous() {
2031 return rootParent().isContiguous();
2032 }
2033
2034 private int idx(int index) {
2035 return index + startIndex;
2036 }
2037
2038 @Override
2039 protected void deallocate() {
2040 if (PlatformDependent.isJfrEnabled() && FreeBufferEvent.isEventEnabled()) {
2041 FreeBufferEvent event = new FreeBufferEvent();
2042 if (event.shouldCommit()) {
2043 event.fill(this, AdaptiveByteBufAllocator.class);
2044 event.commit();
2045 }
2046 }
2047
2048 if (chunk != null) {
2049 chunk.releaseSegment(startIndex, maxFastCapacity);
2050 }
2051 tmpNioBuf = null;
2052 chunk = null;
2053 rootParent = null;
2054 handle.unguardedRecycle(this);
2055 }
2056 }
2057
2058
2059
2060
2061 interface ChunkAllocator {
2062
2063
2064
2065
2066
2067
2068 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
2069 }
2070 }