1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.CharsetUtil;
20 import io.netty.util.IllegalReferenceCountException;
21 import io.netty.util.NettyRuntime;
22 import io.netty.util.Recycler;
23 import io.netty.util.Recycler.EnhancedHandle;
24 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap;
25 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap.IntEntry;
26 import io.netty.util.concurrent.FastThreadLocal;
27 import io.netty.util.concurrent.FastThreadLocalThread;
28 import io.netty.util.concurrent.MpscIntQueue;
29 import io.netty.util.internal.MathUtil;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.RefCnt;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.ThreadExecutorMap;
35 import io.netty.util.internal.UnstableApi;
36
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.OutputStream;
40 import java.nio.ByteBuffer;
41 import java.nio.ByteOrder;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.FileChannel;
44 import java.nio.channels.GatheringByteChannel;
45 import java.nio.channels.ScatteringByteChannel;
46 import java.nio.charset.Charset;
47 import java.util.Arrays;
48 import java.util.Iterator;
49 import java.util.Queue;
50 import java.util.concurrent.ConcurrentLinkedQueue;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
54 import java.util.concurrent.atomic.LongAdder;
55 import java.util.concurrent.locks.StampedLock;
56 import java.util.function.IntConsumer;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @UnstableApi
85 final class AdaptivePoolingAllocator {
// Threshold (512 MiB) against which Runtime.maxMemory() is compared to classify the JVM as memory constrained.
private static final int LOW_MEM_THRESHOLD = 512 * 1024 * 1024;
// True when Runtime.maxMemory() is at or below LOW_MEM_THRESHOLD.
private static final boolean IS_LOW_MEM = Runtime.getRuntime().maxMemory() <= LOW_MEM_THRESHOLD;

// Read from "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory" (default: true).
// The constructor skips creating the thread-local magazine groups only when both this flag and
// IS_LOW_MEM are true.
private static final boolean DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM = SystemPropertyUtil.getBoolean(
        "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory", true);

// Lower bound (128 KiB) for the chunk size computed by SizeClassChunkManagementStrategy.
static final int MIN_CHUNK_SIZE = 128 * 1024;
// Bounds the number of retry rounds (each followed by tryExpandMagazines) in MagazineGroup.allocate(...).
private static final int EXPANSION_ATTEMPTS = 3;
// Number of magazines a shared (non thread-local) MagazineGroup starts with.
private static final int INITIAL_MAGAZINES = 1;
// Remaining-capacity threshold used by Magazine: a chunk with at least this much left is offered to the
// next-in-line slot, otherwise it is released from the magazine.
private static final int RETIRE_CAPACITY = 256;
// Upper bound for the length of a MagazineGroup's magazines array (see tryExpandMagazines).
private static final int MAX_STRIPES = IS_LOW_MEM ? 1 : NettyRuntime.availableProcessors() * 2;
// Ratio between MAX_CHUNK_SIZE and MAX_POOLED_BUF_SIZE; also the multiplier BuddyChunkController applies
// to the prompting size when proposing a chunk size.
private static final int BUFS_PER_CHUNK = 8;

// Cap for chunk sizes chosen by BuddyChunkController: 2 MiB when IS_LOW_MEM, 8 MiB otherwise.
private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
        2 * 1024 * 1024 :
        8 * 1024 * 1024;
// Requests larger than this bypass the magazines and go straight to allocateFallback(...).
private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;

// Read from "io.netty.allocator.chunkReuseQueueCapacity" (default: 2 x available processors, minimum 2).
// Used as the capacity of the shared chunk queue, as the poll-attempt bound in ConcurrentQueueChunkCache
// and as the entry limit enforced by ConcurrentSkipListChunkCache.offerChunk(...).
private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));

// Read from "io.netty.allocator.magazineBufferQueueCapacity" (default: 1024). Passed as the max capacity
// of each shareable Magazine's recycler; the static initializer rejects values below 2.
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

// Segment sizes, in ascending order; one MagazineGroup is created per entry.
// Every entry must be a multiple of 32 (asserted in the static initializer).
private static final int[] SIZE_CLASSES = {
        32,
        64,
        128,
        256,
        512,
        640,
        1024,
        1152,
        2048,
        2304,
        4096,
        4352,
        8192,
        8704,
        16384,
        16896,
};

private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
// Lookup table from sizeIndexOf(size) to an index into SIZE_CLASSES; filled by the static initializer.
private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
168
169 static {
170 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
171 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
172 + " (expected: >= " + 2 + ')');
173 }
174 int lastIndex = 0;
175 for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
176 int sizeClass = SIZE_CLASSES[i];
177
178 assert (sizeClass & 31) == 0 : "Size class must be a multiple of 32";
179 int sizeIndex = sizeIndexOf(sizeClass);
180 Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
181 lastIndex = sizeIndex;
182 }
183 }
184
// Source of the backing memory for every chunk created by this allocator.
private final ChunkAllocator chunkAllocator;
// Running total of the capacity of all live chunks; backs usedMemory().
private final ChunkRegistry chunkRegistry;
// Shared magazine groups, one per entry of SIZE_CLASSES.
private final MagazineGroup[] sizeClassedMagazineGroups;
// Shared magazine group for pooled requests that do not fit any size class; also provides the
// magazines used by allocateFallback(...).
private final MagazineGroup largeBufferMagazineGroup;
// Per-thread size-classed magazine groups; null when IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM.
private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
190
191 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, boolean useCacheForNonEventLoopThreads) {
192 this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
193 chunkRegistry = new ChunkRegistry();
194 sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
195 largeBufferMagazineGroup = new MagazineGroup(
196 this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
197
198 boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
199 threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal<MagazineGroup[]>() {
200 @Override
201 protected MagazineGroup[] initialValue() {
202 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
203 return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
204 }
205 return null;
206 }
207
208 @Override
209 protected void onRemoval(final MagazineGroup[] groups) throws Exception {
210 if (groups != null) {
211 for (MagazineGroup group : groups) {
212 group.free();
213 }
214 }
215 }
216 };
217 }
218
219 private static MagazineGroup[] createMagazineGroupSizeClasses(
220 AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
221 MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
222 for (int i = 0; i < SIZE_CLASSES.length; i++) {
223 int segmentSize = SIZE_CLASSES[i];
224 groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
225 new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
226 }
227 return groups;
228 }
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250 private static Queue<SizeClassedChunk> createSharedChunkQueue() {
251 return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
252 }
253
254 ByteBuf allocate(int size, int maxCapacity) {
255 return allocate(size, maxCapacity, Thread.currentThread(), null);
256 }
257
258 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
259 AdaptiveByteBuf allocated = null;
260 if (size <= MAX_POOLED_BUF_SIZE) {
261 final int index = sizeClassIndexOf(size);
262 MagazineGroup[] magazineGroups;
263 if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals() ||
264 IS_LOW_MEM ||
265 (magazineGroups = threadLocalGroup.get()) == null) {
266 magazineGroups = sizeClassedMagazineGroups;
267 }
268 if (index < magazineGroups.length) {
269 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
270 } else if (!IS_LOW_MEM) {
271 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
272 }
273 }
274 if (allocated == null) {
275 allocated = allocateFallback(size, maxCapacity, currentThread, buf);
276 }
277 return allocated;
278 }
279
280 private static int sizeIndexOf(final int size) {
281
282 return size + 31 >> 5;
283 }
284
285 static int sizeClassIndexOf(int size) {
286 int sizeIndex = sizeIndexOf(size);
287 if (sizeIndex < SIZE_INDEXES.length) {
288 return SIZE_INDEXES[sizeIndex];
289 }
290 return SIZE_CLASSES_COUNT;
291 }
292
293 static int[] getSizeClasses() {
294 return SIZE_CLASSES.clone();
295 }
296
297 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
298
299 Magazine magazine;
300 if (buf != null) {
301 Chunk chunk = buf.chunk;
302 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
303 magazine = getFallbackMagazine(currentThread);
304 }
305 } else {
306 magazine = getFallbackMagazine(currentThread);
307 buf = magazine.newBuffer();
308 }
309
310 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
311 Chunk chunk = new Chunk(innerChunk, magazine, false);
312 chunkRegistry.add(chunk);
313 try {
314 boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
315 assert success: "Failed to initialize ByteBuf with dedicated chunk";
316 } finally {
317
318
319
320 chunk.release();
321 }
322 return buf;
323 }
324
325 private Magazine getFallbackMagazine(Thread currentThread) {
326 Magazine[] mags = largeBufferMagazineGroup.magazines;
327 return mags[(int) currentThread.getId() & mags.length - 1];
328 }
329
330
331
332
333 void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
334 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
335 assert result == into: "Re-allocation created separate buffer instance";
336 }
337
338 long usedMemory() {
339 return chunkRegistry.totalCapacity();
340 }
341
342
343
344
/**
 * Finalizer that frees the allocator's shared large-buffer magazine group (via {@code free()})
 * when the allocator itself is collected. {@code super.finalize()} always runs, even if
 * {@code free()} throws.
 */
@SuppressWarnings({"FinalizeDeclaration", "deprecation"})
@Override
protected void finalize() throws Throwable {
    try {
        free();
    } finally {
        super.finalize();
    }
}
354
355 private void free() {
356 largeBufferMagazineGroup.free();
357 }
358
/**
 * A set of {@link Magazine}s that share one {@link ChunkCache} and one {@link ChunkManagementStrategy}.
 * <p>
 * A group is created in one of two modes. A thread-local group has exactly one non-shareable magazine and
 * no expansion lock. A shared group starts with {@code INITIAL_MAGAZINES} shareable magazines and may double
 * that array, up to {@code MAX_STRIPES}, when allocations keep failing.
 */
private static final class MagazineGroup {
    private final AdaptivePoolingAllocator allocator;
    private final ChunkAllocator chunkAllocator;
    private final ChunkManagementStrategy chunkManagementStrategy;
    // Holds chunks released from this group's magazines so they can be polled again later.
    private final ChunkCache chunkCache;
    // Guards replacement of the magazines array; null for thread-local groups.
    private final StampedLock magazineExpandLock;
    // The single magazine of a thread-local group; null for shared groups.
    private final Magazine threadLocalMagazine;
    // The thread that constructed a thread-local group; null for shared groups and after free().
    private Thread ownerThread;
    // Magazines of a shared group; only populated for shared groups.
    private volatile Magazine[] magazines;
    // Set by free(); once set, offerChunk(...) rejects further chunks.
    private volatile boolean freed;

    /**
     * @param allocator               the owning allocator
     * @param chunkAllocator          source of the memory backing new chunks
     * @param chunkManagementStrategy creates the chunk cache and the per-magazine controllers
     * @param isThreadLocal           {@code true} for a single-magazine group owned by the constructing thread
     */
    MagazineGroup(AdaptivePoolingAllocator allocator,
                  ChunkAllocator chunkAllocator,
                  ChunkManagementStrategy chunkManagementStrategy,
                  boolean isThreadLocal) {
        this.allocator = allocator;
        this.chunkAllocator = chunkAllocator;
        this.chunkManagementStrategy = chunkManagementStrategy;
        chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
        if (isThreadLocal) {
            ownerThread = Thread.currentThread();
            magazineExpandLock = null;
            threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
        } else {
            ownerThread = null;
            magazineExpandLock = new StampedLock();
            threadLocalMagazine = null;
            Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
            for (int i = 0; i < mags.length; i++) {
                mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
            }
            magazines = mags;
        }
    }

    /**
     * Tries to allocate {@code size} bytes from one of this group's magazines.
     *
     * @param size          the requested size in bytes
     * @param maxCapacity   the maximum capacity of the buffer
     * @param currentThread the calling thread; its id selects the first magazine to try
     * @param buf           an existing buffer to re-initialise, or {@code null} to obtain a new one
     * @return the initialised buffer, or {@code null} when no magazine of a shared group could serve
     *         the request within the retry budget
     */
    public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
        boolean reallocate = buf != null;

        // Thread-local path: a single magazine, no contention to handle.
        Magazine tlMag = threadLocalMagazine;
        if (tlMag != null) {
            if (buf == null) {
                buf = tlMag.newBuffer();
            }
            boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
            assert allocated : "Allocation of threadLocalMagazine must always succeed";
            return buf;
        }

        // Shared path: start at the magazine selected by the thread id and probe the others.
        long threadId = currentThread.getId();
        Magazine[] mags;
        int expansions = 0;
        do {
            mags = magazines;
            // The array length starts at INITIAL_MAGAZINES (1) and only doubles, so length - 1 is a bit mask.
            int mask = mags.length - 1;
            int index = (int) (threadId & mask);
            // Twice the array length: every magazine is tried up to two times per round.
            for (int i = 0, m = mags.length << 1; i < m; i++) {
                Magazine mag = mags[index + i & mask];
                if (buf == null) {
                    buf = mag.newBuffer();
                }
                if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {

                    return buf;
                }
            }
            expansions++;
        } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));

        // Giving up: release the buffer claimed here, but never one supplied by the caller.
        if (!reallocate && buf != null) {
            buf.release();
        }
        return null;
    }

    /**
     * Doubles the magazines array if it is still {@code currentLength} long, below {@code MAX_STRIPES},
     * the group has not been freed and the expansion lock can be taken without blocking. The replaced
     * magazines are freed after the lock is released.
     *
     * @return always {@code true}; the caller's retry counter is what bounds the loop
     */
    private boolean tryExpandMagazines(int currentLength) {
        if (currentLength >= MAX_STRIPES) {
            return true;
        }
        final Magazine[] mags;
        long writeLock = magazineExpandLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                mags = magazines;
                // Re-check under the lock: another thread may already have expanded, or the group was freed.
                if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
                    return true;
                }
                // The expanded array is filled with new magazines; the old ones are not carried over.
                Magazine[] expanded = new Magazine[mags.length * 2];
                for (int i = 0, l = expanded.length; i < l; i++) {
                    expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
                }
                magazines = expanded;
            } finally {
                magazineExpandLock.unlockWrite(writeLock);
            }
            for (Magazine magazine : mags) {
                magazine.free();
            }
        }
        return true;
    }

    /** Polls a chunk from the group's chunk cache; see {@link ChunkCache#pollChunk(int)}. */
    Chunk pollChunk(int size) {
        return chunkCache.pollChunk(size);
    }

    /**
     * Offers a chunk to the group's chunk cache for later reuse.
     *
     * @return {@code false} when the group is already freed or the cache rejected the chunk
     */
    boolean offerChunk(Chunk chunk) {
        if (freed) {
            return false;
        }

        if (chunk.hasUnprocessedFreelistEntries()) {
            chunk.processFreelistEntries();
        }
        boolean isAdded = chunkCache.offerChunk(chunk);

        // free() may have run between the first check and the offer; drain the cache again so the
        // chunk that was just added does not stay behind.
        if (freed && isAdded) {

            freeChunkReuseQueue(ownerThread);
        }
        return isAdded;
    }

    /**
     * Marks the group as freed, frees all of its magazines and drains the chunk cache.
     */
    private void free() {
        freed = true;
        // Capture the owner before it is cleared; the drain below still needs it.
        Thread ownerThread = this.ownerThread;
        if (threadLocalMagazine != null) {
            this.ownerThread = null;
            threadLocalMagazine.free();
        } else {
            long stamp = magazineExpandLock.writeLock();
            try {
                Magazine[] mags = magazines;
                for (Magazine magazine : mags) {
                    magazine.free();
                }
            } finally {
                magazineExpandLock.unlockWrite(stamp);
            }
        }
        freeChunkReuseQueue(ownerThread);
    }

    /**
     * Polls the chunk cache until it is empty and marks every polled chunk for deallocation.
     *
     * @param ownerThread the owner of a thread-local group, or {@code null}; when non-null, the owner
     *                    recorded on each {@code SizeClassedChunk} is cleared first
     */
    private void freeChunkReuseQueue(Thread ownerThread) {
        Chunk chunk;
        while ((chunk = chunkCache.pollChunk(0)) != null) {
            if (ownerThread != null && chunk instanceof SizeClassedChunk) {
                SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
                assert ownerThread == threadLocalChunk.ownerThread;

                threadLocalChunk.ownerThread = null;
            }
            chunk.markToDeallocate();
        }
    }
}
519
/**
 * A cache of chunks that were released from a magazine and may be reused by another allocation.
 */
private interface ChunkCache {
    /**
     * Removes and returns a cached chunk, or {@code null} when none is available.
     *
     * @param size the size of the allocation that prompted the poll; implementations may ignore it
     */
    Chunk pollChunk(int size);

    /**
     * Offers a chunk to the cache.
     *
     * @return {@code true} if the cache accepted the chunk
     */
    boolean offerChunk(Chunk chunk);
}
524
525 private static final class ConcurrentQueueChunkCache implements ChunkCache {
526 private final Queue<SizeClassedChunk> queue;
527
528 private ConcurrentQueueChunkCache() {
529 queue = createSharedChunkQueue();
530 }
531
532 @Override
533 public SizeClassedChunk pollChunk(int size) {
534
535
536 Queue<SizeClassedChunk> queue = this.queue;
537 for (int i = 0; i < CHUNK_REUSE_QUEUE; i++) {
538 SizeClassedChunk chunk = queue.poll();
539 if (chunk == null) {
540 return null;
541 }
542 if (chunk.hasRemainingCapacity()) {
543 return chunk;
544 }
545 queue.offer(chunk);
546 }
547 return null;
548 }
549
550 @Override
551 public boolean offerChunk(Chunk chunk) {
552 return queue.offer((SizeClassedChunk) chunk);
553 }
554 }
555
/**
 * {@link ChunkCache} that keeps chunks in a concurrent multimap keyed by the remaining capacity each
 * chunk had when it was inserted.
 */
private static final class ConcurrentSkipListChunkCache implements ChunkCache {
    private final ConcurrentSkipListIntObjMultimap<Chunk> chunks;

    private ConcurrentSkipListChunkCache() {
        // NOTE(review): -1 is presumably the multimap's "no key" sentinel — confirm against its constructor.
        chunks = new ConcurrentSkipListIntObjMultimap<>(-1);
    }

    /**
     * Returns a cached chunk able to hold {@code size} bytes, or {@code null} if none is found.
     * <p>
     * The fast path asks the multimap for a ceiling entry of {@code size}. If there is none, the
     * slow path re-examines chunks with unprocessed free-list entries, since their recorded key may
     * understate the capacity they have once those entries are processed.
     */
    @Override
    public Chunk pollChunk(int size) {
        if (chunks.isEmpty()) {
            return null;
        }
        // Presumably removes and returns the entry with the smallest key >= size (per the method name).
        IntEntry<Chunk> entry = chunks.pollCeilingEntry(size);
        if (entry != null) {
            Chunk chunk = entry.getValue();
            if (chunk.hasUnprocessedFreelistEntries()) {
                chunk.processFreelistEntries();
            }
            return chunk;
        }

        Chunk bestChunk = null;
        int bestRemainingCapacity = 0;
        Iterator<IntEntry<Chunk>> itr = chunks.iterator();
        while (itr.hasNext()) {
            entry = itr.next();
            final Chunk chunk;
            if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
                // Take the entry out first; if the removal fails the entry is simply skipped.
                if (!chunks.remove(entry.getKey(), entry.getValue())) {
                    continue;
                }
                chunk.processFreelistEntries();
                int remainingCapacity = chunk.remainingCapacity();
                if (remainingCapacity >= size &&
                        (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
                    // A better candidate was found; put the previous best back under its up-to-date key.
                    if (bestChunk != null) {
                        chunks.put(bestRemainingCapacity, bestChunk);
                    }
                    bestChunk = chunk;
                    bestRemainingCapacity = remainingCapacity;
                } else {
                    // Not usable for this request; re-insert it under its refreshed remaining capacity.
                    chunks.put(remainingCapacity, chunk);
                }
            }
        }

        return bestChunk;
    }

    /**
     * Inserts the chunk keyed by its remaining capacity, then evicts chunks while the multimap holds
     * more than {@code CHUNK_REUSE_QUEUE} entries.
     *
     * @return always {@code true}
     */
    @Override
    public boolean offerChunk(Chunk chunk) {
        chunks.put(chunk.remainingCapacity(), chunk);

        int size = chunks.size();
        while (size > CHUNK_REUSE_QUEUE) {
            // Eviction victim: the chunk with the lowest reference count, ties broken by smaller capacity.
            int key = -1;
            Chunk toDeallocate = null;
            for (IntEntry<Chunk> entry : chunks) {
                Chunk candidate = entry.getValue();
                if (candidate != null) {
                    if (toDeallocate == null) {
                        toDeallocate = candidate;
                        key = entry.getKey();
                    } else {
                        int candidateRefCnt = RefCnt.refCnt(candidate.refCnt);
                        int toDeallocateRefCnt = RefCnt.refCnt(toDeallocate.refCnt);
                        if (candidateRefCnt < toDeallocateRefCnt ||
                                candidateRefCnt == toDeallocateRefCnt &&
                                        candidate.capacity() < toDeallocate.capacity()) {
                            toDeallocate = candidate;
                            key = entry.getKey();
                        }
                    }
                }
            }
            if (toDeallocate == null) {
                break;
            }
            // Only the thread whose removal succeeds marks the chunk for deallocation.
            if (chunks.remove(key, toDeallocate)) {
                toDeallocate.markToDeallocate();
            }
            size = chunks.size();
        }
        return true;
    }
}
643
/**
 * Factory for the two collaborators a {@link MagazineGroup} needs: a {@link ChunkController} for each
 * magazine and a {@link ChunkCache} for the group.
 */
private interface ChunkManagementStrategy {
    /** Creates a controller for one magazine of the given group. */
    ChunkController createController(MagazineGroup group);

    /**
     * Creates the chunk cache for a group.
     *
     * @param isThreadLocal whether the group is created in thread-local mode
     */
    ChunkCache createChunkCache(boolean isThreadLocal);
}
649
/**
 * Per-magazine policy deciding how much capacity a buffer is given and how new chunks are created.
 */
private interface ChunkController {

    /**
     * Computes the capacity a buffer should start with.
     *
     * @param requestedSize  the size requested by the caller
     * @param maxCapacity    the maximum capacity of the buffer
     * @param isReallocation whether the request re-initialises an existing buffer
     */
    int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);

    /**
     * Allocates a new chunk for the given magazine.
     *
     * @param promptingSize the size of the allocation that made a new chunk necessary
     * @param magazine      the magazine the new chunk is attached to
     */
    Chunk newChunkAllocation(int promptingSize, Magazine magazine);
}
661
662 private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
663
664
665
666 private static final int MIN_SEGMENTS_PER_CHUNK = 32;
667 private final int segmentSize;
668 private final int chunkSize;
669
670 private SizeClassChunkManagementStrategy(int segmentSize) {
671 this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
672 chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
673 }
674
675 @Override
676 public ChunkController createController(MagazineGroup group) {
677 return new SizeClassChunkController(group, segmentSize, chunkSize);
678 }
679
680 @Override
681 public ChunkCache createChunkCache(boolean isThreadLocal) {
682 return new ConcurrentQueueChunkCache();
683 }
684 }
685
686 private static final class SizeClassChunkController implements ChunkController {
687
688 private final ChunkAllocator chunkAllocator;
689 private final int segmentSize;
690 private final int chunkSize;
691 private final ChunkRegistry chunkRegistry;
692
693 private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
694 chunkAllocator = group.chunkAllocator;
695 this.segmentSize = segmentSize;
696 this.chunkSize = chunkSize;
697 chunkRegistry = group.allocator.chunkRegistry;
698 }
699
700 private MpscIntQueue createEmptyFreeList() {
701 return MpscIntQueue.create(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
702 }
703
704 private MpscIntQueue createFreeList() {
705 final int segmentsCount = chunkSize / segmentSize;
706 final MpscIntQueue freeList = MpscIntQueue.create(segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
707 int segmentOffset = 0;
708 for (int i = 0; i < segmentsCount; i++) {
709 freeList.offer(segmentOffset);
710 segmentOffset += segmentSize;
711 }
712 return freeList;
713 }
714
715 private IntStack createLocalFreeList() {
716 final int segmentsCount = chunkSize / segmentSize;
717 int segmentOffset = chunkSize;
718 int[] offsets = new int[segmentsCount];
719 for (int i = 0; i < segmentsCount; i++) {
720 segmentOffset -= segmentSize;
721 offsets[i] = segmentOffset;
722 }
723 return new IntStack(offsets);
724 }
725
726 @Override
727 public int computeBufferCapacity(
728 int requestedSize, int maxCapacity, boolean isReallocation) {
729 return Math.min(segmentSize, maxCapacity);
730 }
731
732 @Override
733 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
734 AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
735 assert chunkBuffer.capacity() == chunkSize;
736 SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
737 chunkRegistry.add(chunk);
738 return chunk;
739 }
740 }
741
742 private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
743 private final AtomicInteger maxChunkSize = new AtomicInteger();
744
745 @Override
746 public ChunkController createController(MagazineGroup group) {
747 return new BuddyChunkController(group, maxChunkSize);
748 }
749
750 @Override
751 public ChunkCache createChunkCache(boolean isThreadLocal) {
752 return new ConcurrentSkipListChunkCache();
753 }
754 }
755
756 private static final class BuddyChunkController implements ChunkController {
757 private final ChunkAllocator chunkAllocator;
758 private final ChunkRegistry chunkRegistry;
759 private final AtomicInteger maxChunkSize;
760
761 BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
762 chunkAllocator = group.chunkAllocator;
763 chunkRegistry = group.allocator.chunkRegistry;
764 this.maxChunkSize = maxChunkSize;
765 }
766
767 @Override
768 public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
769 return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
770 }
771
772 @Override
773 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
774 int maxChunkSize = this.maxChunkSize.get();
775 int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
776 int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
777 if (chunkSize > maxChunkSize) {
778
779 this.maxChunkSize.set(chunkSize);
780 }
781 BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
782 chunkRegistry.add(chunk);
783 return chunk;
784 }
785 }
786
/**
 * Allocates buffers out of a "current" chunk, with one spare chunk parked in a lock-free
 * "next in line" slot.
 * <p>
 * A shareable magazine guards {@code current} with a {@link StampedLock} and has its own buffer recycler.
 * A non-shareable magazine has neither, and takes buffer instances from the static
 * {@code EVENT_LOOP_LOCAL_BUFFER_POOL}.
 */
private static final class Magazine {
    // Atomic access to the nextInLine field.
    private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
    static {
        NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
    }
    // Sentinel stored in nextInLine once the magazine has been freed.
    private static final Chunk MAGAZINE_FREED = new Chunk();

    /**
     * Recycler producing {@link AdaptiveByteBuf} instances.
     * NOTE(review): the meaning of the {@code unguarded} flag is defined by {@code Recycler} and is not
     * visible here — confirm against its documentation.
     */
    private static final class AdaptiveRecycler extends Recycler<AdaptiveByteBuf> {

        private AdaptiveRecycler(boolean unguarded) {

            super(unguarded);
        }

        private AdaptiveRecycler(int maxCapacity, boolean unguarded) {

            super(maxCapacity, unguarded);
        }

        @Override
        protected AdaptiveByteBuf newObject(final Handle<AdaptiveByteBuf> handle) {
            return new AdaptiveByteBuf((EnhancedHandle<AdaptiveByteBuf>) handle);
        }

        /** Creates a recycler with the default capacity and {@code unguarded == true}. */
        public static AdaptiveRecycler threadLocal() {
            return new AdaptiveRecycler(true);
        }

        /** Creates a recycler with the given max capacity and {@code unguarded == true}. */
        public static AdaptiveRecycler sharedWith(int maxCapacity) {
            return new AdaptiveRecycler(maxCapacity, true);
        }
    }

    // Buffer pool used by every magazine that has no recycler of its own (non-shareable magazines).
    private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();

    // The chunk allocations are currently served from; accessed under allocationLock when one exists.
    private Chunk current;
    // Spare chunk slot, accessed only through NEXT_IN_LINE; holds MAGAZINE_FREED after free().
    @SuppressWarnings("unused")
    private volatile Chunk nextInLine;
    private final MagazineGroup group;
    private final ChunkController chunkController;
    // Null for non-shareable magazines.
    private final StampedLock allocationLock;
    // Null for non-shareable magazines.
    private final AdaptiveRecycler recycler;

    /**
     * @param group           the group this magazine belongs to
     * @param shareable       whether the magazine may be used by several threads; decides whether a lock
     *                        and a dedicated recycler are created
     * @param chunkController policy for buffer capacities and new chunks
     */
    Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
        this.group = group;
        this.chunkController = chunkController;

        if (shareable) {

            allocationLock = new StampedLock();
            recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
        } else {
            allocationLock = null;
            recycler = null;
        }
    }

    /**
     * Tries to initialise {@code buf} with {@code size} bytes from this magazine.
     * <p>
     * Without a lock the allocation runs directly. With a lock, the full allocation path runs only if
     * the write lock can be taken without blocking; otherwise a reduced path that only touches the
     * next-in-line slot and the group's chunk cache is used.
     *
     * @return {@code true} if {@code buf} was initialised
     */
    public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        if (allocationLock == null) {

            return allocate(size, maxCapacity, buf, reallocate);
        }

        long writeLock = allocationLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                return allocate(size, maxCapacity, buf, reallocate);
            } finally {
                allocationLock.unlockWrite(writeLock);
            }
        }
        return allocateWithoutLock(size, maxCapacity, buf);
    }

    /**
     * Allocation path used when the allocation lock is held by someone else. It never reads or writes
     * {@code current} and never creates a new chunk.
     *
     * @return {@code true} if {@code buf} was initialised
     */
    private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
        // Claim whatever is parked in the next-in-line slot.
        Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr == MAGAZINE_FREED) {
            // The magazine was freed; put the sentinel back and give up.
            restoreMagazineFreed();
            return false;
        }
        if (curr == null) {
            curr = group.pollChunk(size);
            if (curr == null) {
                return false;
            }
            curr.attachToMagazine(this);
        }
        boolean allocated = false;
        int remainingCapacity = curr.remainingCapacity();
        // Always passes isReallocation == true on this path.
        int startingCapacity = chunkController.computeBufferCapacity(
                size, maxCapacity, true );
        if (remainingCapacity >= size &&
                curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
            allocated = true;
            remainingCapacity = curr.remainingCapacity();
        }
        try {
            // Chunks with enough room left go back to the next-in-line slot (or are released by the
            // transfer method); nearly exhausted chunks are released from the magazine in the finally block.
            if (remainingCapacity >= RETIRE_CAPACITY) {
                transferToNextInLineOrRelease(curr);
                curr = null;
            }
        } finally {
            if (curr != null) {
                curr.releaseFromMagazine();
            }
        }
        return allocated;
    }

    /**
     * Full allocation path; runs either without contention (no lock) or with the write lock held.
     * Tries, in order: the current chunk, the next-in-line chunk, a chunk polled from the group's
     * cache, and finally a newly allocated chunk.
     *
     * @return {@code true} if {@code buf} was initialised
     */
    private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
        Chunk curr = current;
        if (curr != null) {
            // NOTE(review): Chunk.readInitInto in the base class always returns true; the failure
            // branches presumably serve subclass overrides that are not visible here.
            boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
            int remainingCapacity = curr.remainingCapacity();
            if (!success && remainingCapacity > 0) {
                // Could not serve this request but still has room: park it or release it.
                current = null;
                transferToNextInLineOrRelease(curr);
            } else if (remainingCapacity == 0) {
                // Exhausted: this magazine is done with the chunk.
                current = null;
                curr.releaseFromMagazine();
            }
            if (success) {
                return true;
            }
        }

        assert current == null;

        // Second choice: the chunk parked in the next-in-line slot.
        curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr != null) {
            if (curr == MAGAZINE_FREED) {
                // The magazine was freed; put the sentinel back and give up.
                restoreMagazineFreed();
                return false;
            }

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity > startingCapacity &&
                    curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
                // Room is left after this allocation, so the chunk becomes the current one.
                current = curr;
                return true;
            }

            try {
                if (remainingCapacity >= size) {
                    // Hand out everything that is left; the chunk is released in the finally block.
                    return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
                }
            } finally {
                // Runs on every exit from the try, including when readInitInto(...) throws.
                curr.releaseFromMagazine();
            }
        }

        // Third choice: a chunk from the group's cache; last resort: a brand-new chunk.
        curr = group.pollChunk(size);
        if (curr == null) {
            curr = chunkController.newChunkAllocation(size, this);
        } else {
            curr.attachToMagazine(this);

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity == 0 || remainingCapacity < size) {
                // The polled chunk cannot hold this request: release or park it, then allocate a new one.
                if (remainingCapacity < RETIRE_CAPACITY) {
                    curr.releaseFromMagazine();
                } else {

                    transferToNextInLineOrRelease(curr);
                }
                curr = chunkController.newChunkAllocation(size, this);
            }
        }

        current = curr;
        boolean success;
        try {
            int remainingCapacity = curr.remainingCapacity();
            assert remainingCapacity >= size;
            if (remainingCapacity > startingCapacity) {
                success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
                // Null marks "keep as current": the finally block must not release the chunk.
                curr = null;
            } else {
                success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
            }
        } finally {
            if (curr != null) {
                // Either the chunk was fully consumed or readInitInto(...) threw: release it and
                // clear the current slot.
                curr.releaseFromMagazine();
                current = null;
            }
        }
        return success;
    }

    /**
     * Writes the {@code MAGAZINE_FREED} sentinel into the next-in-line slot and releases whatever
     * real chunk was parked there.
     */
    private void restoreMagazineFreed() {
        Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
        if (next != null && next != MAGAZINE_FREED) {

            next.releaseFromMagazine();
        }
    }

    /**
     * Parks {@code chunk} in the next-in-line slot if the slot is empty, or if the chunk has more
     * remaining capacity than the occupant (which is then released). Otherwise {@code chunk} itself is
     * released from the magazine.
     */
    private void transferToNextInLineOrRelease(Chunk chunk) {
        if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
            return;
        }

        Chunk nextChunk = NEXT_IN_LINE.get(this);
        if (nextChunk != null && nextChunk != MAGAZINE_FREED
                && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
            if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
                nextChunk.releaseFromMagazine();
                return;
            }
        }

        // The slot is occupied by a better chunk, holds the freed sentinel, or changed concurrently.
        chunk.releaseFromMagazine();
    }

    /**
     * Frees the magazine: marks the next-in-line slot as freed, then releases the current chunk
     * while holding the write lock (when the magazine has one).
     */
    void free() {

        restoreMagazineFreed();
        long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
        try {
            if (current != null) {
                current.releaseFromMagazine();
                current = null;
            }
        } finally {
            if (allocationLock != null) {
                allocationLock.unlockWrite(stamp);
            }
        }
    }

    /**
     * Obtains a buffer instance from this magazine's recycler, or from the static pool when the
     * magazine has none, and resets its reference count and marks.
     */
    public AdaptiveByteBuf newBuffer() {
        AdaptiveRecycler recycler = this.recycler;
        AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
        buf.resetRefCnt();
        buf.discardMarks();
        return buf;
    }

    /** Offers the chunk to the owning group's chunk cache; see {@code MagazineGroup.offerChunk}. */
    boolean offerToQueue(Chunk chunk) {
        return group.offerChunk(chunk);
    }
}
1053
1054 private static final class ChunkRegistry {
1055 private final LongAdder totalCapacity = new LongAdder();
1056
1057 public long totalCapacity() {
1058 return totalCapacity.sum();
1059 }
1060
1061 public void add(Chunk chunk) {
1062 totalCapacity.add(chunk.capacity());
1063 }
1064
1065 public void remove(Chunk chunk) {
1066 totalCapacity.add(-chunk.capacity());
1067 }
1068 }
1069
1070 private static class Chunk implements ChunkInfo {
1071 protected final AbstractByteBuf delegate;
1072 protected Magazine magazine;
1073 private final AdaptivePoolingAllocator allocator;
1074
1075
1076 private final RefCnt refCnt = new RefCnt();
1077 private final int capacity;
1078 private final boolean pooled;
1079 protected int allocatedBytes;
1080
1081 Chunk() {
1082
1083 delegate = null;
1084 magazine = null;
1085 allocator = null;
1086 capacity = 0;
1087 pooled = false;
1088 }
1089
1090 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
1091 this.delegate = delegate;
1092 this.pooled = pooled;
1093 capacity = delegate.capacity();
1094 attachToMagazine(magazine);
1095
1096
1097 allocator = magazine.group.allocator;
1098
1099 if (PlatformDependent.isJfrEnabled() && AllocateChunkEvent.isEventEnabled()) {
1100 AllocateChunkEvent event = new AllocateChunkEvent();
1101 if (event.shouldCommit()) {
1102 event.fill(this, AdaptiveByteBufAllocator.class);
1103 event.pooled = pooled;
1104 event.threadLocal = magazine.allocationLock == null;
1105 event.commit();
1106 }
1107 }
1108 }
1109
1110 Magazine currentMagazine() {
1111 return magazine;
1112 }
1113
1114 void detachFromMagazine() {
1115 if (magazine != null) {
1116 magazine = null;
1117 }
1118 }
1119
1120 void attachToMagazine(Magazine magazine) {
1121 assert this.magazine == null;
1122 this.magazine = magazine;
1123 }
1124
1125
1126
1127
1128 void releaseFromMagazine() {
1129
1130
1131 Magazine mag = magazine;
1132 detachFromMagazine();
1133 if (!mag.offerToQueue(this)) {
1134 markToDeallocate();
1135 }
1136 }
1137
1138
1139
1140
1141 void releaseSegment(int ignoredSegmentId, int size) {
1142 release();
1143 }
1144
1145 void markToDeallocate() {
1146 release();
1147 }
1148
1149 private void retain() {
1150 RefCnt.retain(refCnt);
1151 }
1152
1153 protected boolean release() {
1154 boolean deallocate = RefCnt.release(refCnt);
1155 if (deallocate) {
1156 deallocate();
1157 }
1158 return deallocate;
1159 }
1160
1161 protected void deallocate() {
1162 onRelease();
1163 allocator.chunkRegistry.remove(this);
1164 delegate.release();
1165 }
1166
1167 private void onRelease() {
1168 if (PlatformDependent.isJfrEnabled() && FreeChunkEvent.isEventEnabled()) {
1169 FreeChunkEvent event = new FreeChunkEvent();
1170 if (event.shouldCommit()) {
1171 event.fill(this, AdaptiveByteBufAllocator.class);
1172 event.pooled = pooled;
1173 event.commit();
1174 }
1175 }
1176 }
1177
1178 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1179 int startIndex = allocatedBytes;
1180 allocatedBytes = startIndex + startingCapacity;
1181 Chunk chunk = this;
1182 chunk.retain();
1183 try {
1184 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1185 chunk = null;
1186 } finally {
1187 if (chunk != null) {
1188
1189
1190
1191 allocatedBytes = startIndex;
1192 chunk.release();
1193 }
1194 }
1195 return true;
1196 }
1197
/**
 * Returns the number of bytes not yet accounted as allocated, i.e. {@code capacity - allocatedBytes}.
 */
public int remainingCapacity() {
    return capacity - allocatedBytes;
}
1201
/**
 * Tells whether released segments are waiting to be processed. The base implementation keeps no
 * free list and therefore always returns {@code false}.
 */
public boolean hasUnprocessedFreelistEntries() {
    return false;
}
1205
/**
 * Processes pending free-list entries. This is a no-op in the base implementation.
 */
public void processFreelistEntries() {
}
1208
/**
 * Returns the value of the {@code capacity} field.
 */
@Override
public int capacity() {
    return capacity;
}
1213
/**
 * Returns whether the backing buffer reports itself as direct.
 */
@Override
public boolean isDirect() {
    return delegate.isDirect();
}
1218
/**
 * Returns the memory address reported by the backing buffer's {@code _memoryAddress()}.
 */
@Override
public long memoryAddress() {
    return delegate._memoryAddress();
}
1223 }
1224
1225 private static final class IntStack {
1226
1227 private final int[] stack;
1228 private int top;
1229
1230 IntStack(int[] initialValues) {
1231 stack = initialValues;
1232 top = initialValues.length - 1;
1233 }
1234
1235 public boolean isEmpty() {
1236 return top == -1;
1237 }
1238
1239 public int pop() {
1240 final int last = stack[top];
1241 top--;
1242 return last;
1243 }
1244
1245 public void push(int value) {
1246 stack[top + 1] = value;
1247 top++;
1248 }
1249
1250 public int size() {
1251 return top + 1;
1252 }
1253 }
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
/**
 * A chunk that is divided into {@code segments} equally sized segments of {@code segmentSize} bytes.
 * <p>
 * Offsets of free segments are tracked in two places: {@code externalFreeList}, a queue that threads
 * other than the owner offer released offsets to, and, only when the magazine group has an owner
 * thread, {@code localFreeList}, a plain stack that is pushed and popped by the owner thread alone.
 * <p>
 * {@code state} stays {@code AVAILABLE} until {@link #markToDeallocate()} is called. From then on it
 * holds the number of segments in the local free list, and it becomes {@code DEALLOCATED} once the
 * chunk has been freed.
 */
private static final class SizeClassedChunk extends Chunk {
    // Sentinel compared against the offset obtained from the free lists; presumably the "empty"
    // value the controller configures the queue with - TODO confirm against SizeClassChunkController.
    private static final int FREE_LIST_EMPTY = -1;
    // State while the chunk can still be used for allocations.
    private static final int AVAILABLE = -1;
    // Terminal state. Being Integer.MIN_VALUE, adding the external free-list size to it in
    // deallocateIfNeeded(...) can never yield `segments`, so a freed chunk is never freed twice.
    private static final int DEALLOCATED = Integer.MIN_VALUE;
    private static final AtomicIntegerFieldUpdater<SizeClassedChunk> STATE =
            AtomicIntegerFieldUpdater.newUpdater(SizeClassedChunk.class, "state");
    private volatile int state;
    private final int segments;
    private final int segmentSize;
    private final MpscIntQueue externalFreeList;
    private final IntStack localFreeList;
    private Thread ownerThread;

    /**
     * Creates the chunk and its free lists. Segment size and count are derived from the controller.
     * A local free list is only created when the magazine group has an owner thread.
     */
    SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
                     SizeClassChunkController controller) {
        super(delegate, magazine, true);
        segmentSize = controller.segmentSize;
        segments = controller.chunkSize / segmentSize;
        STATE.lazySet(this, AVAILABLE);
        ownerThread = magazine.group.ownerThread;
        if (ownerThread == null) {
            // No owner thread: everything goes through the shared queue.
            externalFreeList = controller.createFreeList();
            localFreeList = null;
        } else {
            // Owner thread present: presumably the segments start in the local stack and the
            // shared queue starts empty - verify against the controller's factory methods.
            externalFreeList = controller.createEmptyFreeList();
            localFreeList = controller.createLocalFreeList();
        }
    }

    /**
     * Takes one free segment and initialises {@code buf} on top of it.
     *
     * @return {@code false} if no free segment is available, {@code true} otherwise
     */
    @Override
    public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
        assert state == AVAILABLE;
        final int startIndex = nextAvailableSegmentOffset();
        if (startIndex == FREE_LIST_EMPTY) {
            return false;
        }
        allocatedBytes += segmentSize;
        try {
            buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
        } catch (Throwable t) {
            // init failed: undo the accounting and hand the segment back before rethrowing.
            allocatedBytes -= segmentSize;
            releaseSegmentOffsetIntoFreeList(startIndex);
            throw t;
        }
        return true;
    }

    /**
     * Returns the offset of a free segment. The local free list is preferred when it exists and is
     * non-empty; otherwise the external free list is polled.
     */
    private int nextAvailableSegmentOffset() {
        final int startIndex;
        IntStack localFreeList = this.localFreeList;
        if (localFreeList != null) {
            // The local stack is not thread-safe; only the owner thread may get here.
            assert Thread.currentThread() == ownerThread;
            if (localFreeList.isEmpty()) {
                startIndex = externalFreeList.poll();
            } else {
                startIndex = localFreeList.pop();
            }
        } else {
            startIndex = externalFreeList.poll();
        }
        return startIndex;
    }

    /**
     * Cheap check for whether this chunk may still serve an allocation. Unlike
     * {@link #remainingCapacity()} it never updates {@code allocatedBytes}.
     */
    // NOTE(review): when a local free list exists only that list is consulted here, although
    // nextAvailableSegmentOffset() falls back to externalFreeList - confirm this is intended.
    public boolean hasRemainingCapacity() {
        int remaining = super.remainingCapacity();
        if (remaining > 0) {
            return true;
        }
        if (localFreeList != null) {
            return !localFreeList.isEmpty();
        }
        return !externalFreeList.isEmpty();
    }

    /**
     * Returns the remaining capacity. While the accounted value exceeds one segment it is returned
     * as is; otherwise the value is recomputed from the free lists.
     */
    @Override
    public int remainingCapacity() {
        int remaining = super.remainingCapacity();
        return remaining > segmentSize ? remaining : updateRemainingCapacity(remaining);
    }

    /**
     * Recomputes the remaining capacity from the number of free segments in both free lists and,
     * when it differs from {@code snapshotted}, rewrites {@code allocatedBytes} to match.
     */
    private int updateRemainingCapacity(int snapshotted) {
        int freeSegments = externalFreeList.size();
        IntStack localFreeList = this.localFreeList;
        if (localFreeList != null) {
            freeSegments += localFreeList.size();
        }
        int updated = freeSegments * segmentSize;
        if (updated != snapshotted) {
            allocatedBytes = capacity() - updated;
        }
        return updated;
    }

    /**
     * Returns a segment offset to a free list without touching {@code state}. The owner thread uses
     * the local stack, every other thread the external queue.
     */
    private void releaseSegmentOffsetIntoFreeList(int startIndex) {
        IntStack localFreeList = this.localFreeList;
        if (localFreeList != null && Thread.currentThread() == ownerThread) {
            localFreeList.push(startIndex);
        } else {
            boolean segmentReturned = externalFreeList.offer(startIndex);
            assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
        }
    }

    /**
     * Returns the segment at {@code startIndex} to a free list and, if the chunk has already been
     * marked for deallocation, checks whether it can now be freed.
     */
    @Override
    void releaseSegment(int startIndex, int size) {
        IntStack localFreeList = this.localFreeList;
        if (localFreeList != null && Thread.currentThread() == ownerThread) {
            localFreeList.push(startIndex);
            int state = this.state;
            if (state != AVAILABLE) {
                // Marked for deallocation: state mirrors the local free-list size and must follow it.
                updateStateOnLocalReleaseSegment(state, localFreeList);
            }
        } else {
            boolean segmentReturned = externalFreeList.offer(startIndex);
            assert segmentReturned;
            // The state is deliberately read after the offer. NOTE(review): this presumably relies
            // on the ordering guarantees of the queue's offer() - confirm.
            int state = this.state;
            if (state != AVAILABLE) {
                deallocateIfNeeded(state);
            }
        }
    }

    /**
     * Publishes the new local free-list size into {@code state}, which is expected to still hold
     * {@code previousLocalSize}, and then checks whether the chunk can be freed.
     */
    private void updateStateOnLocalReleaseSegment(int previousLocalSize, IntStack localFreeList) {
        int newLocalSize = localFreeList.size();
        boolean alwaysTrue = STATE.compareAndSet(this, previousLocalSize, newLocalSize);
        assert alwaysTrue : "this shouldn't happen unless double release in the local free list";
        deallocateIfNeeded(newLocalSize);
    }

    /**
     * Deallocates the chunk when every segment is free, i.e. {@code localSize} plus the external
     * free-list size equals {@code segments}. The compare-and-set to {@code DEALLOCATED} ensures that
     * only one caller performs the deallocation.
     */
    private void deallocateIfNeeded(int localSize) {
        // localSize is the state value seen by the caller; it may be DEALLOCATED, in which case the
        // sum below cannot match `segments`.
        int totalFreeSegments = localSize + externalFreeList.size();
        if (totalFreeSegments == segments && STATE.compareAndSet(this, localSize, DEALLOCATED)) {
            deallocate();
        }
    }

    /**
     * Moves the chunk out of the {@code AVAILABLE} state by storing the current local free-list size
     * (0 if there is no local list) into {@code state}, then frees the chunk right away if all
     * segments are already free.
     */
    @Override
    void markToDeallocate() {
        IntStack localFreeList = this.localFreeList;
        int localSize = localFreeList != null ? localFreeList.size() : 0;
        STATE.set(this, localSize);
        deallocateIfNeeded(localSize);
    }
}
1425
1426 private static final class BuddyChunk extends Chunk implements IntConsumer {
1427 private static final int MIN_BUDDY_SIZE = 32768;
1428 private static final byte IS_CLAIMED = (byte) (1 << 7);
1429 private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
1430 private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
1431 private static final int PACK_OFFSET_MASK = 0xFFFF;
1432 private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
1433
1434 private final MpscIntQueue freeList;
1435
1436 private final byte[] buddies;
1437 private final int freeListCapacity;
1438
1439 BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
1440 super(delegate, magazine, true);
1441 freeListCapacity = delegate.capacity() / MIN_BUDDY_SIZE;
1442 int maxShift = Integer.numberOfTrailingZeros(freeListCapacity);
1443 assert maxShift <= 30;
1444 freeList = MpscIntQueue.create(freeListCapacity, -1);
1445 buddies = new byte[freeListCapacity << 1];
1446
1447
1448 int index = 1;
1449 int runLength = 1;
1450 int currentRun = 0;
1451 while (maxShift > 0) {
1452 buddies[index++] = (byte) maxShift;
1453 if (++currentRun == runLength) {
1454 currentRun = 0;
1455 runLength <<= 1;
1456 maxShift--;
1457 }
1458 }
1459 }
1460
1461 @Override
1462 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1463 if (!freeList.isEmpty()) {
1464 freeList.drain(freeListCapacity, this);
1465 }
1466 int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
1467 if (startIndex == -1) {
1468 return false;
1469 }
1470 Chunk chunk = this;
1471 chunk.retain();
1472 try {
1473 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1474 allocatedBytes += startingCapacity;
1475 chunk = null;
1476 } finally {
1477 if (chunk != null) {
1478 unreserveMatchingBuddy(1, startingCapacity, startIndex, 0);
1479
1480
1481 chunk.release();
1482 }
1483 }
1484 return true;
1485 }
1486
1487 @Override
1488 public void accept(int packed) {
1489
1490 int size = MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
1491 int offset = (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
1492 unreserveMatchingBuddy(1, size, offset, 0);
1493 allocatedBytes -= size;
1494 }
1495
1496 @Override
1497 void releaseSegment(int startingIndex, int size) {
1498 int packedOffset = startingIndex / MIN_BUDDY_SIZE;
1499 int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
1500 int packed = packedOffset | packedSize;
1501 freeList.offer(packed);
1502 release();
1503 }
1504
1505 @Override
1506 public int remainingCapacity() {
1507 if (!freeList.isEmpty()) {
1508 freeList.drain(freeListCapacity, this);
1509 }
1510 return super.remainingCapacity();
1511 }
1512
1513 @Override
1514 public boolean hasUnprocessedFreelistEntries() {
1515 return !freeList.isEmpty();
1516 }
1517
1518 @Override
1519 public void processFreelistEntries() {
1520 freeList.drain(freeListCapacity, this);
1521 }
1522
1523
1524
1525
1526 private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
1527 byte[] buddies = this.buddies;
1528 while (index < buddies.length) {
1529 byte buddy = buddies[index];
1530 int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1531 if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
1532 return -1;
1533 }
1534 if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
1535 buddies[index] |= IS_CLAIMED;
1536 return currOffset;
1537 }
1538 int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
1539 if (found != -1) {
1540 buddies[index] |= HAS_CLAIMED_CHILDREN;
1541 return found;
1542 }
1543 index = (index << 1) + 1;
1544 currOffset += currValue >> 1;
1545 }
1546 return -1;
1547 }
1548
1549
1550
1551
1552 private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
1553 byte[] buddies = this.buddies;
1554 if (buddies.length <= index) {
1555 return false;
1556 }
1557 byte buddy = buddies[index];
1558 int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1559
1560 if (currSize == size) {
1561
1562 if (currOffset == offset) {
1563 buddies[index] &= SHIFT_MASK;
1564 return false;
1565 }
1566 throw new IllegalStateException("The intended segment was not found at index " +
1567 index + ", for size " + size + " and offset " + offset);
1568 }
1569
1570
1571 boolean claims;
1572 int siblingIndex;
1573 if (offset < currOffset + (currSize >> 1)) {
1574
1575 claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
1576 siblingIndex = (index << 1) + 1;
1577 } else {
1578
1579 claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
1580 siblingIndex = index << 1;
1581 }
1582 if (!claims) {
1583
1584 byte sibling = buddies[siblingIndex];
1585 if ((sibling & SHIFT_MASK) == sibling) {
1586
1587 buddies[index] &= SHIFT_MASK;
1588 return false;
1589 }
1590 }
1591 return true;
1592 }
1593
1594 @Override
1595 public String toString() {
1596 int capacity = delegate.capacity();
1597 int remaining = capacity - allocatedBytes;
1598 return "BuddyChunk[capacity: " + capacity +
1599 ", remaining: " + remaining +
1600 ", free list: " + freeList.size() + ']';
1601 }
1602 }
1603
1604 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1605
1606 private final EnhancedHandle<AdaptiveByteBuf> handle;
1607
1608
1609 private int startIndex;
1610 private AbstractByteBuf rootParent;
1611 Chunk chunk;
1612 private int length;
1613 private int maxFastCapacity;
1614 private ByteBuffer tmpNioBuf;
1615 private boolean hasArray;
1616 private boolean hasMemoryAddress;
1617
1618 AdaptiveByteBuf(EnhancedHandle<AdaptiveByteBuf> recyclerHandle) {
1619 super(0);
1620 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1621 }
1622
1623 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1624 int startIndex, int size, int capacity, int maxCapacity) {
1625 this.startIndex = startIndex;
1626 chunk = wrapped;
1627 length = size;
1628 maxFastCapacity = capacity;
1629 maxCapacity(maxCapacity);
1630 setIndex0(readerIndex, writerIndex);
1631 hasArray = unwrapped.hasArray();
1632 hasMemoryAddress = unwrapped.hasMemoryAddress();
1633 rootParent = unwrapped;
1634 tmpNioBuf = null;
1635
1636 if (PlatformDependent.isJfrEnabled() && AllocateBufferEvent.isEventEnabled()) {
1637 AllocateBufferEvent event = new AllocateBufferEvent();
1638 if (event.shouldCommit()) {
1639 event.fill(this, AdaptiveByteBufAllocator.class);
1640 event.chunkPooled = wrapped.pooled;
1641 Magazine m = wrapped.magazine;
1642 event.chunkThreadLocal = m != null && m.allocationLock == null;
1643 event.commit();
1644 }
1645 }
1646 }
1647
1648 private AbstractByteBuf rootParent() {
1649 final AbstractByteBuf rootParent = this.rootParent;
1650 if (rootParent != null) {
1651 return rootParent;
1652 }
1653 throw new IllegalReferenceCountException();
1654 }
1655
1656 @Override
1657 public int capacity() {
1658 return length;
1659 }
1660
1661 @Override
1662 public int maxFastWritableBytes() {
1663 return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1664 }
1665
1666 @Override
1667 public ByteBuf capacity(int newCapacity) {
1668 checkNewCapacity(newCapacity);
1669 if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1670 length = newCapacity;
1671 return this;
1672 }
1673 if (newCapacity < capacity()) {
1674 length = newCapacity;
1675 trimIndicesToCapacity(newCapacity);
1676 return this;
1677 }
1678
1679 if (PlatformDependent.isJfrEnabled() && ReallocateBufferEvent.isEventEnabled()) {
1680 ReallocateBufferEvent event = new ReallocateBufferEvent();
1681 if (event.shouldCommit()) {
1682 event.fill(this, AdaptiveByteBufAllocator.class);
1683 event.newCapacity = newCapacity;
1684 event.commit();
1685 }
1686 }
1687
1688
1689 Chunk chunk = this.chunk;
1690 AdaptivePoolingAllocator allocator = chunk.allocator;
1691 int readerIndex = this.readerIndex;
1692 int writerIndex = this.writerIndex;
1693 int baseOldRootIndex = startIndex;
1694 int oldLength = length;
1695 int oldCapacity = maxFastCapacity;
1696 AbstractByteBuf oldRoot = rootParent();
1697 allocator.reallocate(newCapacity, maxCapacity(), this);
1698 oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
1699 chunk.releaseSegment(baseOldRootIndex, oldCapacity);
1700 assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
1701 "Capacity increase failed";
1702 this.readerIndex = readerIndex;
1703 this.writerIndex = writerIndex;
1704 return this;
1705 }
1706
1707 @Override
1708 public ByteBufAllocator alloc() {
1709 return rootParent().alloc();
1710 }
1711
1712 @SuppressWarnings("deprecation")
1713 @Override
1714 public ByteOrder order() {
1715 return rootParent().order();
1716 }
1717
1718 @Override
1719 public ByteBuf unwrap() {
1720 return null;
1721 }
1722
1723 @Override
1724 public boolean isDirect() {
1725 return rootParent().isDirect();
1726 }
1727
1728 @Override
1729 public int arrayOffset() {
1730 return idx(rootParent().arrayOffset());
1731 }
1732
1733 @Override
1734 public boolean hasMemoryAddress() {
1735 return hasMemoryAddress;
1736 }
1737
1738 @Override
1739 public long memoryAddress() {
1740 ensureAccessible();
1741 return _memoryAddress();
1742 }
1743
1744 @Override
1745 long _memoryAddress() {
1746 AbstractByteBuf root = rootParent;
1747 return root != null ? root._memoryAddress() + startIndex : 0L;
1748 }
1749
1750 @Override
1751 boolean _isDirect() {
1752 AbstractByteBuf root = rootParent;
1753 return root != null && root.isDirect();
1754 }
1755
1756 @Override
1757 public ByteBuffer nioBuffer(int index, int length) {
1758 checkIndex(index, length);
1759 return rootParent().nioBuffer(idx(index), length);
1760 }
1761
1762 @Override
1763 public ByteBuffer internalNioBuffer(int index, int length) {
1764 checkIndex(index, length);
1765 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1766 }
1767
1768 private ByteBuffer internalNioBuffer() {
1769 if (tmpNioBuf == null) {
1770 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1771 }
1772 return (ByteBuffer) tmpNioBuf.clear();
1773 }
1774
1775 @Override
1776 public ByteBuffer[] nioBuffers(int index, int length) {
1777 checkIndex(index, length);
1778 return rootParent().nioBuffers(idx(index), length);
1779 }
1780
1781 @Override
1782 public boolean hasArray() {
1783 return hasArray;
1784 }
1785
1786 @Override
1787 public byte[] array() {
1788 ensureAccessible();
1789 return rootParent().array();
1790 }
1791
1792 @Override
1793 public ByteBuf copy(int index, int length) {
1794 checkIndex(index, length);
1795 return rootParent().copy(idx(index), length);
1796 }
1797
1798 @Override
1799 public int nioBufferCount() {
1800 return rootParent().nioBufferCount();
1801 }
1802
1803 @Override
1804 protected byte _getByte(int index) {
1805 return rootParent()._getByte(idx(index));
1806 }
1807
1808 @Override
1809 protected short _getShort(int index) {
1810 return rootParent()._getShort(idx(index));
1811 }
1812
1813 @Override
1814 protected short _getShortLE(int index) {
1815 return rootParent()._getShortLE(idx(index));
1816 }
1817
1818 @Override
1819 protected int _getUnsignedMedium(int index) {
1820 return rootParent()._getUnsignedMedium(idx(index));
1821 }
1822
1823 @Override
1824 protected int _getUnsignedMediumLE(int index) {
1825 return rootParent()._getUnsignedMediumLE(idx(index));
1826 }
1827
1828 @Override
1829 protected int _getInt(int index) {
1830 return rootParent()._getInt(idx(index));
1831 }
1832
1833 @Override
1834 protected int _getIntLE(int index) {
1835 return rootParent()._getIntLE(idx(index));
1836 }
1837
1838 @Override
1839 protected long _getLong(int index) {
1840 return rootParent()._getLong(idx(index));
1841 }
1842
1843 @Override
1844 protected long _getLongLE(int index) {
1845 return rootParent()._getLongLE(idx(index));
1846 }
1847
1848 @Override
1849 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1850 checkIndex(index, length);
1851 rootParent().getBytes(idx(index), dst, dstIndex, length);
1852 return this;
1853 }
1854
1855 @Override
1856 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1857 checkIndex(index, length);
1858 rootParent().getBytes(idx(index), dst, dstIndex, length);
1859 return this;
1860 }
1861
1862 @Override
1863 public ByteBuf getBytes(int index, ByteBuffer dst) {
1864 checkIndex(index, dst.remaining());
1865 rootParent().getBytes(idx(index), dst);
1866 return this;
1867 }
1868
1869 @Override
1870 protected void _setByte(int index, int value) {
1871 rootParent()._setByte(idx(index), value);
1872 }
1873
1874 @Override
1875 protected void _setShort(int index, int value) {
1876 rootParent()._setShort(idx(index), value);
1877 }
1878
1879 @Override
1880 protected void _setShortLE(int index, int value) {
1881 rootParent()._setShortLE(idx(index), value);
1882 }
1883
1884 @Override
1885 protected void _setMedium(int index, int value) {
1886 rootParent()._setMedium(idx(index), value);
1887 }
1888
1889 @Override
1890 protected void _setMediumLE(int index, int value) {
1891 rootParent()._setMediumLE(idx(index), value);
1892 }
1893
1894 @Override
1895 protected void _setInt(int index, int value) {
1896 rootParent()._setInt(idx(index), value);
1897 }
1898
1899 @Override
1900 protected void _setIntLE(int index, int value) {
1901 rootParent()._setIntLE(idx(index), value);
1902 }
1903
1904 @Override
1905 protected void _setLong(int index, long value) {
1906 rootParent()._setLong(idx(index), value);
1907 }
1908
1909 @Override
1910 protected void _setLongLE(int index, long value) {
1911 rootParent().setLongLE(idx(index), value);
1912 }
1913
1914 @Override
1915 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1916 checkIndex(index, length);
1917 if (tmpNioBuf == null && PlatformDependent.javaVersion() >= 13) {
1918 ByteBuffer dstBuffer = rootParent()._internalNioBuffer();
1919 PlatformDependent.absolutePut(dstBuffer, idx(index), src, srcIndex, length);
1920 } else {
1921 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1922 tmp.put(src, srcIndex, length);
1923 }
1924 return this;
1925 }
1926
1927 @Override
1928 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1929 checkIndex(index, length);
1930 if (src instanceof AdaptiveByteBuf && PlatformDependent.javaVersion() >= 16) {
1931 AdaptiveByteBuf srcBuf = (AdaptiveByteBuf) src;
1932 srcBuf.checkIndex(srcIndex, length);
1933 ByteBuffer dstBuffer = rootParent()._internalNioBuffer();
1934 ByteBuffer srcBuffer = srcBuf.rootParent()._internalNioBuffer();
1935 PlatformDependent.absolutePut(dstBuffer, idx(index), srcBuffer, srcBuf.idx(srcIndex), length);
1936 } else {
1937 ByteBuffer tmp = internalNioBuffer();
1938 tmp.position(index);
1939 tmp.put(src.nioBuffer(srcIndex, length));
1940 }
1941 return this;
1942 }
1943
1944 @Override
1945 public ByteBuf setBytes(int index, ByteBuffer src) {
1946 int length = src.remaining();
1947 checkIndex(index, length);
1948 ByteBuffer tmp = internalNioBuffer();
1949 if (PlatformDependent.javaVersion() >= 16) {
1950 int offset = src.position();
1951 PlatformDependent.absolutePut(tmp, index, src, offset, length);
1952 src.position(offset + length);
1953 } else {
1954 tmp.position(index);
1955 tmp.put(src);
1956 }
1957 return this;
1958 }
1959
1960 @Override
1961 public ByteBuf getBytes(int index, OutputStream out, int length)
1962 throws IOException {
1963 checkIndex(index, length);
1964 if (length != 0) {
1965 ByteBuffer tmp = internalNioBuffer();
1966 ByteBufUtil.readBytes(alloc(), tmp.hasArray() ? tmp : tmp.duplicate(), index, length, out);
1967 }
1968 return this;
1969 }
1970
1971 @Override
1972 public int getBytes(int index, GatheringByteChannel out, int length)
1973 throws IOException {
1974 ByteBuffer buf = internalNioBuffer().duplicate();
1975 buf.clear().position(index).limit(index + length);
1976 return out.write(buf);
1977 }
1978
1979 @Override
1980 public int getBytes(int index, FileChannel out, long position, int length)
1981 throws IOException {
1982 ByteBuffer buf = internalNioBuffer().duplicate();
1983 buf.clear().position(index).limit(index + length);
1984 return out.write(buf, position);
1985 }
1986
1987 @Override
1988 public int setBytes(int index, InputStream in, int length)
1989 throws IOException {
1990 checkIndex(index, length);
1991 final AbstractByteBuf rootParent = rootParent();
1992 if (rootParent.hasArray()) {
1993 return rootParent.setBytes(idx(index), in, length);
1994 }
1995 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1996 int readBytes = in.read(tmp, 0, length);
1997 if (readBytes <= 0) {
1998 return readBytes;
1999 }
2000 setBytes(index, tmp, 0, readBytes);
2001 return readBytes;
2002 }
2003
2004 @Override
2005 public int setBytes(int index, ScatteringByteChannel in, int length)
2006 throws IOException {
2007 try {
2008 return in.read(internalNioBuffer(index, length));
2009 } catch (ClosedChannelException ignored) {
2010 return -1;
2011 }
2012 }
2013
2014 @Override
2015 public int setBytes(int index, FileChannel in, long position, int length)
2016 throws IOException {
2017 try {
2018 return in.read(internalNioBuffer(index, length), position);
2019 } catch (ClosedChannelException ignored) {
2020 return -1;
2021 }
2022 }
2023
2024 @Override
2025 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
2026 return setCharSequence0(index, sequence, charset, false);
2027 }
2028
2029 private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
2030 if (charset.equals(CharsetUtil.UTF_8)) {
2031 int length = ByteBufUtil.utf8MaxBytes(sequence);
2032 if (expand) {
2033 ensureWritable0(length);
2034 checkIndex0(index, length);
2035 } else {
2036 checkIndex(index, length);
2037 }
2038 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
2039 }
2040 if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
2041 int length = sequence.length();
2042 if (expand) {
2043 ensureWritable0(length);
2044 checkIndex0(index, length);
2045 } else {
2046 checkIndex(index, length);
2047 }
2048 return ByteBufUtil.writeAscii(this, index, sequence, length);
2049 }
2050 byte[] bytes = sequence.toString().getBytes(charset);
2051 if (expand) {
2052 ensureWritable0(bytes.length);
2053
2054 }
2055 setBytes(index, bytes);
2056 return bytes.length;
2057 }
2058
2059 @Override
2060 public int writeCharSequence(CharSequence sequence, Charset charset) {
2061 int written = setCharSequence0(writerIndex, sequence, charset, true);
2062 writerIndex += written;
2063 return written;
2064 }
2065
2066 @Override
2067 public int forEachByte(int index, int length, ByteProcessor processor) {
2068 checkIndex(index, length);
2069 int ret = rootParent().forEachByte(idx(index), length, processor);
2070 return forEachResult(ret);
2071 }
2072
2073 @Override
2074 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
2075 checkIndex(index, length);
2076 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
2077 return forEachResult(ret);
2078 }
2079
2080 @Override
2081 public ByteBuf setZero(int index, int length) {
2082 checkIndex(index, length);
2083 rootParent().setZero(idx(index), length);
2084 return this;
2085 }
2086
2087 @Override
2088 public ByteBuf writeZero(int length) {
2089 ensureWritable(length);
2090 rootParent().setZero(idx(writerIndex), length);
2091 writerIndex += length;
2092 return this;
2093 }
2094
2095 private int forEachResult(int ret) {
2096 if (ret < startIndex) {
2097 return -1;
2098 }
2099 return ret - startIndex;
2100 }
2101
2102 @Override
2103 public boolean isContiguous() {
2104 return rootParent().isContiguous();
2105 }
2106
2107 private int idx(int index) {
2108 return index + startIndex;
2109 }
2110
2111 @Override
2112 protected void deallocate() {
2113 if (PlatformDependent.isJfrEnabled() && FreeBufferEvent.isEventEnabled()) {
2114 FreeBufferEvent event = new FreeBufferEvent();
2115 if (event.shouldCommit()) {
2116 event.fill(this, AdaptiveByteBufAllocator.class);
2117 event.commit();
2118 }
2119 }
2120
2121 if (chunk != null) {
2122 chunk.releaseSegment(startIndex, maxFastCapacity);
2123 }
2124 tmpNioBuf = null;
2125 chunk = null;
2126 rootParent = null;
2127 handle.unguardedRecycle(this);
2128 }
2129 }
2130
2131
2132
2133
/**
 * Strategy for allocating the buffers that back chunks.
 */
interface ChunkAllocator {
    /**
     * Allocates a buffer for a chunk.
     *
     * @param initialCapacity the initial capacity of the returned buffer
     * @param maxCapacity the maximum capacity of the returned buffer
     * @return the buffer that provides the chunk's memory
     */
    AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
}
2143 }