1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.CharsetUtil;
20 import io.netty.util.IllegalReferenceCountException;
21 import io.netty.util.NettyRuntime;
22 import io.netty.util.Recycler;
23 import io.netty.util.Recycler.EnhancedHandle;
24 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap;
25 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap.IntEntry;
26 import io.netty.util.concurrent.FastThreadLocal;
27 import io.netty.util.concurrent.FastThreadLocalThread;
28 import io.netty.util.concurrent.MpscIntQueue;
29 import io.netty.util.internal.MathUtil;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PlatformDependent;
32 import io.netty.util.internal.RefCnt;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.ThreadExecutorMap;
35 import io.netty.util.internal.UnstableApi;
36
37 import java.io.IOException;
38 import java.io.InputStream;
39 import java.io.OutputStream;
40 import java.nio.ByteBuffer;
41 import java.nio.ByteOrder;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.FileChannel;
44 import java.nio.channels.GatheringByteChannel;
45 import java.nio.channels.ScatteringByteChannel;
46 import java.nio.charset.Charset;
47 import java.util.Arrays;
48 import java.util.Iterator;
49 import java.util.Queue;
50 import java.util.concurrent.ConcurrentLinkedQueue;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
54 import java.util.concurrent.atomic.LongAdder;
55 import java.util.concurrent.locks.StampedLock;
56 import java.util.function.IntConsumer;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @UnstableApi
85 final class AdaptivePoolingAllocator {
86 private static final int LOW_MEM_THRESHOLD = 512 * 1024 * 1024;
87 private static final boolean IS_LOW_MEM = Runtime.getRuntime().maxMemory() <= LOW_MEM_THRESHOLD;
88
89
90
91
92
93 private static final boolean DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM = SystemPropertyUtil.getBoolean(
94 "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory", true);
95
96
97
98
99
100
101
102
103 static final int MIN_CHUNK_SIZE = 128 * 1024;
104 private static final int EXPANSION_ATTEMPTS = 3;
105 private static final int INITIAL_MAGAZINES = 1;
106 private static final int RETIRE_CAPACITY = 256;
107 private static final int MAX_STRIPES = IS_LOW_MEM ? 1 : NettyRuntime.availableProcessors() * 2;
108 private static final int BUFS_PER_CHUNK = 8;
109
110
111
112
113
114
115 private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
116 2 * 1024 * 1024 :
117 8 * 1024 * 1024;
118 private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
119
120
121
122
123
124
125 private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
126 "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));
127
128
129
130
131
132 private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
133 "io.netty.allocator.magazineBufferQueueCapacity", 1024);
134
135
136
137
138
139
140
141
142
143
144
145
146
147 private static final int[] SIZE_CLASSES = {
148 32,
149 64,
150 128,
151 256,
152 512,
153 640,
154 1024,
155 1152,
156 2048,
157 2304,
158 4096,
159 4352,
160 8192,
161 8704,
162 16384,
163 16896,
164 };
165
166 private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
167 private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
168
169 static {
170 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
171 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
172 + " (expected: >= " + 2 + ')');
173 }
174 int lastIndex = 0;
175 for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
176 int sizeClass = SIZE_CLASSES[i];
177
178 assert (sizeClass & 31) == 0 : "Size class must be a multiple of 32";
179 int sizeIndex = sizeIndexOf(sizeClass);
180 Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
181 lastIndex = sizeIndex;
182 }
183 }
184
185 private final ChunkAllocator chunkAllocator;
186 private final ChunkRegistry chunkRegistry;
187 private final MagazineGroup[] sizeClassedMagazineGroups;
188 private final MagazineGroup largeBufferMagazineGroup;
189 private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
190
191 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, boolean useCacheForNonEventLoopThreads) {
192 this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
193 chunkRegistry = new ChunkRegistry();
194 sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
195 largeBufferMagazineGroup = new MagazineGroup(
196 this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
197
198 boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
199 threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal<MagazineGroup[]>() {
200 @Override
201 protected MagazineGroup[] initialValue() {
202 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
203 return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
204 }
205 return null;
206 }
207
208 @Override
209 protected void onRemoval(final MagazineGroup[] groups) throws Exception {
210 if (groups != null) {
211 for (MagazineGroup group : groups) {
212 group.free();
213 }
214 }
215 }
216 };
217 }
218
219 private static MagazineGroup[] createMagazineGroupSizeClasses(
220 AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
221 MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
222 for (int i = 0; i < SIZE_CLASSES.length; i++) {
223 int segmentSize = SIZE_CLASSES[i];
224 groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
225 new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
226 }
227 return groups;
228 }
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250 private static Queue<SizeClassedChunk> createSharedChunkQueue() {
251 return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
252 }
253
254 ByteBuf allocate(int size, int maxCapacity) {
255 return allocate(size, maxCapacity, Thread.currentThread(), null);
256 }
257
258 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
259 AdaptiveByteBuf allocated = null;
260 if (size <= MAX_POOLED_BUF_SIZE) {
261 final int index = sizeClassIndexOf(size);
262 MagazineGroup[] magazineGroups;
263 if (!FastThreadLocalThread.currentThreadWillCleanupFastThreadLocals() ||
264 IS_LOW_MEM ||
265 (magazineGroups = threadLocalGroup.get()) == null) {
266 magazineGroups = sizeClassedMagazineGroups;
267 }
268 if (index < magazineGroups.length) {
269 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
270 } else if (!IS_LOW_MEM) {
271 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
272 }
273 }
274 if (allocated == null) {
275 allocated = allocateFallback(size, maxCapacity, currentThread, buf);
276 }
277 return allocated;
278 }
279
280 private static int sizeIndexOf(final int size) {
281
282 return size + 31 >> 5;
283 }
284
285 static int sizeClassIndexOf(int size) {
286 int sizeIndex = sizeIndexOf(size);
287 if (sizeIndex < SIZE_INDEXES.length) {
288 return SIZE_INDEXES[sizeIndex];
289 }
290 return SIZE_CLASSES_COUNT;
291 }
292
293 static int[] getSizeClasses() {
294 return SIZE_CLASSES.clone();
295 }
296
297 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
298
299 Magazine magazine;
300 if (buf != null) {
301 Chunk chunk = buf.chunk;
302 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
303 magazine = getFallbackMagazine(currentThread);
304 }
305 } else {
306 magazine = getFallbackMagazine(currentThread);
307 buf = magazine.newBuffer();
308 }
309
310 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
311 Chunk chunk = new Chunk(innerChunk, magazine, false);
312 chunkRegistry.add(chunk);
313 try {
314 boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
315 assert success: "Failed to initialize ByteBuf with dedicated chunk";
316 } finally {
317
318
319
320 chunk.release();
321 }
322 return buf;
323 }
324
325 private Magazine getFallbackMagazine(Thread currentThread) {
326 Magazine[] mags = largeBufferMagazineGroup.magazines;
327 return mags[(int) currentThread.getId() & mags.length - 1];
328 }
329
330
331
332
333 void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
334 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
335 assert result == into: "Re-allocation created separate buffer instance";
336 }
337
338 long usedMemory() {
339 return chunkRegistry.totalCapacity();
340 }
341
342
343
344
345 @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
346 @Override
347 protected void finalize() throws Throwable {
348 try {
349 free();
350 } finally {
351 super.finalize();
352 }
353 }
354
355 private void free() {
356 largeBufferMagazineGroup.free();
357 }
358
359 private static final class MagazineGroup {
360 private final AdaptivePoolingAllocator allocator;
361 private final ChunkAllocator chunkAllocator;
362 private final ChunkManagementStrategy chunkManagementStrategy;
363 private final ChunkCache chunkCache;
364 private final StampedLock magazineExpandLock;
365 private final Magazine threadLocalMagazine;
366 private Thread ownerThread;
367 private volatile Magazine[] magazines;
368 private volatile boolean freed;
369
370 MagazineGroup(AdaptivePoolingAllocator allocator,
371 ChunkAllocator chunkAllocator,
372 ChunkManagementStrategy chunkManagementStrategy,
373 boolean isThreadLocal) {
374 this.allocator = allocator;
375 this.chunkAllocator = chunkAllocator;
376 this.chunkManagementStrategy = chunkManagementStrategy;
377 chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
378 if (isThreadLocal) {
379 ownerThread = Thread.currentThread();
380 magazineExpandLock = null;
381 threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
382 } else {
383 ownerThread = null;
384 magazineExpandLock = new StampedLock();
385 threadLocalMagazine = null;
386 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
387 for (int i = 0; i < mags.length; i++) {
388 mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
389 }
390 magazines = mags;
391 }
392 }
393
394 public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
395 boolean reallocate = buf != null;
396
397
398 Magazine tlMag = threadLocalMagazine;
399 if (tlMag != null) {
400 if (buf == null) {
401 buf = tlMag.newBuffer();
402 }
403 boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
404 assert allocated : "Allocation of threadLocalMagazine must always succeed";
405 return buf;
406 }
407
408
409 long threadId = currentThread.getId();
410 Magazine[] mags;
411 int expansions = 0;
412 do {
413 mags = magazines;
414 int mask = mags.length - 1;
415 int index = (int) (threadId & mask);
416 for (int i = 0, m = mags.length << 1; i < m; i++) {
417 Magazine mag = mags[index + i & mask];
418 if (buf == null) {
419 buf = mag.newBuffer();
420 }
421 if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
422
423 return buf;
424 }
425 }
426 expansions++;
427 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
428
429
430 if (!reallocate && buf != null) {
431 buf.release();
432 }
433 return null;
434 }
435
436 private boolean tryExpandMagazines(int currentLength) {
437 if (currentLength >= MAX_STRIPES) {
438 return true;
439 }
440 final Magazine[] mags;
441 long writeLock = magazineExpandLock.tryWriteLock();
442 if (writeLock != 0) {
443 try {
444 mags = magazines;
445 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
446 return true;
447 }
448 Magazine[] expanded = new Magazine[mags.length * 2];
449 for (int i = 0, l = expanded.length; i < l; i++) {
450 expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
451 }
452 magazines = expanded;
453 } finally {
454 magazineExpandLock.unlockWrite(writeLock);
455 }
456 for (Magazine magazine : mags) {
457 magazine.free();
458 }
459 }
460 return true;
461 }
462
463 Chunk pollChunk(int size) {
464 return chunkCache.pollChunk(size);
465 }
466
467 boolean offerChunk(Chunk chunk) {
468 if (freed) {
469 return false;
470 }
471
472 if (chunk.hasUnprocessedFreelistEntries()) {
473 chunk.processFreelistEntries();
474 }
475 boolean isAdded = chunkCache.offerChunk(chunk);
476
477 if (freed && isAdded) {
478
479 freeChunkReuseQueue(ownerThread);
480 }
481 return isAdded;
482 }
483
484 private void free() {
485 freed = true;
486 Thread ownerThread = this.ownerThread;
487 if (threadLocalMagazine != null) {
488 this.ownerThread = null;
489 threadLocalMagazine.free();
490 } else {
491 long stamp = magazineExpandLock.writeLock();
492 try {
493 Magazine[] mags = magazines;
494 for (Magazine magazine : mags) {
495 magazine.free();
496 }
497 } finally {
498 magazineExpandLock.unlockWrite(stamp);
499 }
500 }
501 freeChunkReuseQueue(ownerThread);
502 }
503
504 private void freeChunkReuseQueue(Thread ownerThread) {
505 Chunk chunk;
506 while ((chunk = chunkCache.pollChunk(0)) != null) {
507 if (ownerThread != null && chunk instanceof SizeClassedChunk) {
508 SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
509 assert ownerThread == threadLocalChunk.ownerThread;
510
511
512
513 threadLocalChunk.ownerThread = null;
514 }
515 chunk.markToDeallocate();
516 }
517 }
518 }
519
520 private interface ChunkCache {
521 Chunk pollChunk(int size);
522 boolean offerChunk(Chunk chunk);
523 }
524
525 private static final class ConcurrentQueueChunkCache implements ChunkCache {
526 private final Queue<SizeClassedChunk> queue;
527
528 private ConcurrentQueueChunkCache() {
529 queue = createSharedChunkQueue();
530 }
531
532 @Override
533 public SizeClassedChunk pollChunk(int size) {
534
535
536 Queue<SizeClassedChunk> queue = this.queue;
537 for (int i = 0; i < CHUNK_REUSE_QUEUE; i++) {
538 SizeClassedChunk chunk = queue.poll();
539 if (chunk == null) {
540 return null;
541 }
542 if (chunk.hasRemainingCapacity()) {
543 return chunk;
544 }
545 queue.offer(chunk);
546 }
547 return null;
548 }
549
550 @Override
551 public boolean offerChunk(Chunk chunk) {
552 return queue.offer((SizeClassedChunk) chunk);
553 }
554 }
555
556 private static final class ConcurrentSkipListChunkCache implements ChunkCache {
557 private final ConcurrentSkipListIntObjMultimap<Chunk> chunks;
558
559 private ConcurrentSkipListChunkCache() {
560 chunks = new ConcurrentSkipListIntObjMultimap<>(-1);
561 }
562
563 @Override
564 public Chunk pollChunk(int size) {
565 if (chunks.isEmpty()) {
566 return null;
567 }
568 IntEntry<Chunk> entry = chunks.pollCeilingEntry(size);
569 if (entry != null) {
570 Chunk chunk = entry.getValue();
571 if (chunk.hasUnprocessedFreelistEntries()) {
572 chunk.processFreelistEntries();
573 }
574 return chunk;
575 }
576
577 Chunk bestChunk = null;
578 int bestRemainingCapacity = 0;
579 Iterator<IntEntry<Chunk>> itr = chunks.iterator();
580 while (itr.hasNext()) {
581 entry = itr.next();
582 final Chunk chunk;
583 if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
584 if (!chunks.remove(entry.getKey(), entry.getValue())) {
585 continue;
586 }
587 chunk.processFreelistEntries();
588 int remainingCapacity = chunk.remainingCapacity();
589 if (remainingCapacity >= size &&
590 (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
591 if (bestChunk != null) {
592 chunks.put(bestRemainingCapacity, bestChunk);
593 }
594 bestChunk = chunk;
595 bestRemainingCapacity = remainingCapacity;
596 } else {
597 chunks.put(remainingCapacity, chunk);
598 }
599 }
600 }
601
602 return bestChunk;
603 }
604
605 @Override
606 public boolean offerChunk(Chunk chunk) {
607 chunks.put(chunk.remainingCapacity(), chunk);
608
609 int size = chunks.size();
610 while (size > CHUNK_REUSE_QUEUE) {
611
612 int key = -1;
613 Chunk toDeallocate = null;
614 for (IntEntry<Chunk> entry : chunks) {
615 Chunk candidate = entry.getValue();
616 if (candidate != null) {
617 if (toDeallocate == null) {
618 toDeallocate = candidate;
619 key = entry.getKey();
620 } else {
621 int candidateRefCnt = RefCnt.refCnt(candidate.refCnt);
622 int toDeallocateRefCnt = RefCnt.refCnt(toDeallocate.refCnt);
623 if (candidateRefCnt < toDeallocateRefCnt ||
624 candidateRefCnt == toDeallocateRefCnt &&
625 candidate.capacity() < toDeallocate.capacity()) {
626 toDeallocate = candidate;
627 key = entry.getKey();
628 }
629 }
630 }
631 }
632 if (toDeallocate == null) {
633 break;
634 }
635 if (chunks.remove(key, toDeallocate)) {
636 toDeallocate.markToDeallocate();
637 }
638 size = chunks.size();
639 }
640 return true;
641 }
642 }
643
644 private interface ChunkManagementStrategy {
645 ChunkController createController(MagazineGroup group);
646
647 ChunkCache createChunkCache(boolean isThreadLocal);
648 }
649
650 private interface ChunkController {
651
652
653
654 int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
655
656
657
658
659 Chunk newChunkAllocation(int promptingSize, Magazine magazine);
660 }
661
662 private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
663
664
665
666 private static final int MIN_SEGMENTS_PER_CHUNK = 32;
667 private final int segmentSize;
668 private final int chunkSize;
669
670 private SizeClassChunkManagementStrategy(int segmentSize) {
671 this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
672 chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
673 }
674
675 @Override
676 public ChunkController createController(MagazineGroup group) {
677 return new SizeClassChunkController(group, segmentSize, chunkSize);
678 }
679
680 @Override
681 public ChunkCache createChunkCache(boolean isThreadLocal) {
682 return new ConcurrentQueueChunkCache();
683 }
684 }
685
686 private static final class SizeClassChunkController implements ChunkController {
687
688 private final ChunkAllocator chunkAllocator;
689 private final int segmentSize;
690 private final int chunkSize;
691 private final ChunkRegistry chunkRegistry;
692
693 private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
694 chunkAllocator = group.chunkAllocator;
695 this.segmentSize = segmentSize;
696 this.chunkSize = chunkSize;
697 chunkRegistry = group.allocator.chunkRegistry;
698 }
699
700 private MpscIntQueue createEmptyFreeList() {
701 return MpscIntQueue.create(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
702 }
703
704 private MpscIntQueue createFreeList() {
705 final int segmentsCount = chunkSize / segmentSize;
706 final MpscIntQueue freeList = MpscIntQueue.create(segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
707 int segmentOffset = 0;
708 for (int i = 0; i < segmentsCount; i++) {
709 freeList.offer(segmentOffset);
710 segmentOffset += segmentSize;
711 }
712 return freeList;
713 }
714
715 private IntStack createLocalFreeList() {
716 final int segmentsCount = chunkSize / segmentSize;
717 int segmentOffset = chunkSize;
718 int[] offsets = new int[segmentsCount];
719 for (int i = 0; i < segmentsCount; i++) {
720 segmentOffset -= segmentSize;
721 offsets[i] = segmentOffset;
722 }
723 return new IntStack(offsets);
724 }
725
726 @Override
727 public int computeBufferCapacity(
728 int requestedSize, int maxCapacity, boolean isReallocation) {
729 return Math.min(segmentSize, maxCapacity);
730 }
731
732 @Override
733 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
734 AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
735 assert chunkBuffer.capacity() == chunkSize;
736 SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
737 chunkRegistry.add(chunk);
738 return chunk;
739 }
740 }
741
742 private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
743 private final AtomicInteger maxChunkSize = new AtomicInteger();
744
745 @Override
746 public ChunkController createController(MagazineGroup group) {
747 return new BuddyChunkController(group, maxChunkSize);
748 }
749
750 @Override
751 public ChunkCache createChunkCache(boolean isThreadLocal) {
752 return new ConcurrentSkipListChunkCache();
753 }
754 }
755
756 private static final class BuddyChunkController implements ChunkController {
757 private final ChunkAllocator chunkAllocator;
758 private final ChunkRegistry chunkRegistry;
759 private final AtomicInteger maxChunkSize;
760
761 BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
762 chunkAllocator = group.chunkAllocator;
763 chunkRegistry = group.allocator.chunkRegistry;
764 this.maxChunkSize = maxChunkSize;
765 }
766
767 @Override
768 public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
769 return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
770 }
771
772 @Override
773 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
774 int maxChunkSize = this.maxChunkSize.get();
775 int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
776 int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
777 if (chunkSize > maxChunkSize) {
778
779 this.maxChunkSize.set(chunkSize);
780 }
781 BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
782 chunkRegistry.add(chunk);
783 return chunk;
784 }
785 }
786
787 private static final class Magazine {
788 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
789 static {
790 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
791 }
792 private static final Chunk MAGAZINE_FREED = new Chunk();
793
794 private static final class AdaptiveRecycler extends Recycler<AdaptiveByteBuf> {
795
796 private AdaptiveRecycler(boolean unguarded) {
797
798 super(unguarded);
799 }
800
801 private AdaptiveRecycler(int maxCapacity, boolean unguarded) {
802
803 super(maxCapacity, unguarded);
804 }
805
806 @Override
807 protected AdaptiveByteBuf newObject(final Handle<AdaptiveByteBuf> handle) {
808 return new AdaptiveByteBuf((EnhancedHandle<AdaptiveByteBuf>) handle);
809 }
810
811 public static AdaptiveRecycler threadLocal() {
812 return new AdaptiveRecycler(true);
813 }
814
815 public static AdaptiveRecycler sharedWith(int maxCapacity) {
816 return new AdaptiveRecycler(maxCapacity, true);
817 }
818 }
819
820 private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();
821
822 private Chunk current;
823 @SuppressWarnings("unused")
824 private volatile Chunk nextInLine;
825 private final MagazineGroup group;
826 private final ChunkController chunkController;
827 private final StampedLock allocationLock;
828 private final AdaptiveRecycler recycler;
829
830 Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
831 this.group = group;
832 this.chunkController = chunkController;
833
834 if (shareable) {
835
836 allocationLock = new StampedLock();
837 recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
838 } else {
839 allocationLock = null;
840 recycler = null;
841 }
842 }
843
844 public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
845 if (allocationLock == null) {
846
847 return allocate(size, maxCapacity, buf, reallocate);
848 }
849
850
851 long writeLock = allocationLock.tryWriteLock();
852 if (writeLock != 0) {
853 try {
854 return allocate(size, maxCapacity, buf, reallocate);
855 } finally {
856 allocationLock.unlockWrite(writeLock);
857 }
858 }
859 return allocateWithoutLock(size, maxCapacity, buf);
860 }
861
862 private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
863 Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
864 if (curr == MAGAZINE_FREED) {
865
866 restoreMagazineFreed();
867 return false;
868 }
869 if (curr == null) {
870 curr = group.pollChunk(size);
871 if (curr == null) {
872 return false;
873 }
874 curr.attachToMagazine(this);
875 }
876 boolean allocated = false;
877 int remainingCapacity = curr.remainingCapacity();
878 int startingCapacity = chunkController.computeBufferCapacity(
879 size, maxCapacity, true );
880 if (remainingCapacity >= size &&
881 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
882 allocated = true;
883 remainingCapacity = curr.remainingCapacity();
884 }
885 try {
886 if (remainingCapacity >= RETIRE_CAPACITY) {
887 transferToNextInLineOrRelease(curr);
888 curr = null;
889 }
890 } finally {
891 if (curr != null) {
892 curr.releaseFromMagazine();
893 }
894 }
895 return allocated;
896 }
897
898 private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
899 int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
900 Chunk curr = current;
901 if (curr != null) {
902 boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
903 int remainingCapacity = curr.remainingCapacity();
904 if (!success && remainingCapacity > 0) {
905 current = null;
906 transferToNextInLineOrRelease(curr);
907 } else if (remainingCapacity == 0) {
908 current = null;
909 curr.releaseFromMagazine();
910 }
911 if (success) {
912 return true;
913 }
914 }
915
916 assert current == null;
917
918
919
920
921
922
923
924
925 curr = NEXT_IN_LINE.getAndSet(this, null);
926 if (curr != null) {
927 if (curr == MAGAZINE_FREED) {
928
929 restoreMagazineFreed();
930 return false;
931 }
932
933 int remainingCapacity = curr.remainingCapacity();
934 if (remainingCapacity > startingCapacity &&
935 curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
936
937 current = curr;
938 return true;
939 }
940
941 try {
942 if (remainingCapacity >= size) {
943
944
945 return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
946 }
947 } finally {
948
949
950 curr.releaseFromMagazine();
951 }
952 }
953
954
955 curr = group.pollChunk(size);
956 if (curr == null) {
957 curr = chunkController.newChunkAllocation(size, this);
958 } else {
959 curr.attachToMagazine(this);
960
961 int remainingCapacity = curr.remainingCapacity();
962 if (remainingCapacity == 0 || remainingCapacity < size) {
963
964 if (remainingCapacity < RETIRE_CAPACITY) {
965 curr.releaseFromMagazine();
966 } else {
967
968
969 transferToNextInLineOrRelease(curr);
970 }
971 curr = chunkController.newChunkAllocation(size, this);
972 }
973 }
974
975 current = curr;
976 boolean success;
977 try {
978 int remainingCapacity = curr.remainingCapacity();
979 assert remainingCapacity >= size;
980 if (remainingCapacity > startingCapacity) {
981 success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
982 curr = null;
983 } else {
984 success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
985 }
986 } finally {
987 if (curr != null) {
988
989
990 curr.releaseFromMagazine();
991 current = null;
992 }
993 }
994 return success;
995 }
996
997 private void restoreMagazineFreed() {
998 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
999 if (next != null && next != MAGAZINE_FREED) {
1000
1001 next.releaseFromMagazine();
1002 }
1003 }
1004
1005 private void transferToNextInLineOrRelease(Chunk chunk) {
1006 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
1007 return;
1008 }
1009
1010 Chunk nextChunk = NEXT_IN_LINE.get(this);
1011 if (nextChunk != null && nextChunk != MAGAZINE_FREED
1012 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
1013 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
1014 nextChunk.releaseFromMagazine();
1015 return;
1016 }
1017 }
1018
1019
1020
1021
1022 chunk.releaseFromMagazine();
1023 }
1024
1025 void free() {
1026
1027 restoreMagazineFreed();
1028 long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
1029 try {
1030 if (current != null) {
1031 current.releaseFromMagazine();
1032 current = null;
1033 }
1034 } finally {
1035 if (allocationLock != null) {
1036 allocationLock.unlockWrite(stamp);
1037 }
1038 }
1039 }
1040
1041 public AdaptiveByteBuf newBuffer() {
1042 AdaptiveRecycler recycler = this.recycler;
1043 AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
1044 buf.resetRefCnt();
1045 buf.discardMarks();
1046 return buf;
1047 }
1048
1049 boolean offerToQueue(Chunk chunk) {
1050 return group.offerChunk(chunk);
1051 }
1052 }
1053
1054 private static final class ChunkRegistry {
1055 private final LongAdder totalCapacity = new LongAdder();
1056
1057 public long totalCapacity() {
1058 return totalCapacity.sum();
1059 }
1060
1061 public void add(Chunk chunk) {
1062 totalCapacity.add(chunk.capacity());
1063 }
1064
1065 public void remove(Chunk chunk) {
1066 totalCapacity.add(-chunk.capacity());
1067 }
1068 }
1069
1070 private static class Chunk implements ChunkInfo {
1071 protected final AbstractByteBuf delegate;
1072 protected Magazine magazine;
1073 private final AdaptivePoolingAllocator allocator;
1074
1075
1076 private final RefCnt refCnt = new RefCnt();
1077 private final int capacity;
1078 private final boolean pooled;
1079 protected int allocatedBytes;
1080
1081 Chunk() {
1082
1083 delegate = null;
1084 magazine = null;
1085 allocator = null;
1086 capacity = 0;
1087 pooled = false;
1088 }
1089
1090 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
1091 this.delegate = delegate;
1092 this.pooled = pooled;
1093 capacity = delegate.capacity();
1094 attachToMagazine(magazine);
1095
1096
1097 allocator = magazine.group.allocator;
1098
1099 if (PlatformDependent.isJfrEnabled() && AllocateChunkEvent.isEventEnabled()) {
1100 AllocateChunkEvent event = new AllocateChunkEvent();
1101 if (event.shouldCommit()) {
1102 event.fill(this, AdaptiveByteBufAllocator.class);
1103 event.pooled = pooled;
1104 event.threadLocal = magazine.allocationLock == null;
1105 event.commit();
1106 }
1107 }
1108 }
1109
1110 Magazine currentMagazine() {
1111 return magazine;
1112 }
1113
1114 void detachFromMagazine() {
1115 if (magazine != null) {
1116 magazine = null;
1117 }
1118 }
1119
1120 void attachToMagazine(Magazine magazine) {
1121 assert this.magazine == null;
1122 this.magazine = magazine;
1123 }
1124
1125
1126
1127
1128 void releaseFromMagazine() {
1129
1130
1131 Magazine mag = magazine;
1132 detachFromMagazine();
1133 if (!mag.offerToQueue(this)) {
1134 markToDeallocate();
1135 }
1136 }
1137
1138
1139
1140
1141 void releaseSegment(int ignoredSegmentId, int size) {
1142 release();
1143 }
1144
1145 void markToDeallocate() {
1146 release();
1147 }
1148
1149 private void retain() {
1150 RefCnt.retain(refCnt);
1151 }
1152
1153 protected boolean release() {
1154 boolean deallocate = RefCnt.release(refCnt);
1155 if (deallocate) {
1156 deallocate();
1157 }
1158 return deallocate;
1159 }
1160
1161 protected void deallocate() {
1162 onRelease();
1163 allocator.chunkRegistry.remove(this);
1164 delegate.release();
1165 }
1166
1167 private void onRelease() {
1168 if (PlatformDependent.isJfrEnabled() && FreeChunkEvent.isEventEnabled()) {
1169 FreeChunkEvent event = new FreeChunkEvent();
1170 if (event.shouldCommit()) {
1171 event.fill(this, AdaptiveByteBufAllocator.class);
1172 event.pooled = pooled;
1173 event.commit();
1174 }
1175 }
1176 }
1177
1178 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1179 int startIndex = allocatedBytes;
1180 allocatedBytes = startIndex + startingCapacity;
1181 Chunk chunk = this;
1182 chunk.retain();
1183 try {
1184 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1185 chunk = null;
1186 } finally {
1187 if (chunk != null) {
1188
1189
1190
1191 allocatedBytes = startIndex;
1192 chunk.release();
1193 }
1194 }
1195 return true;
1196 }
1197
1198 public int remainingCapacity() {
1199 return capacity - allocatedBytes;
1200 }
1201
1202 public boolean hasUnprocessedFreelistEntries() {
1203 return false;
1204 }
1205
1206 public void processFreelistEntries() {
1207 }
1208
1209 @Override
1210 public int capacity() {
1211 return capacity;
1212 }
1213
1214 @Override
1215 public boolean isDirect() {
1216 return delegate.isDirect();
1217 }
1218
1219 @Override
1220 public long memoryAddress() {
1221 return delegate._memoryAddress();
1222 }
1223 }
1224
1225 private static final class IntStack {
1226
1227 private final int[] stack;
1228 private int top;
1229
1230 IntStack(int[] initialValues) {
1231 stack = initialValues;
1232 top = initialValues.length - 1;
1233 }
1234
1235 public boolean isEmpty() {
1236 return top == -1;
1237 }
1238
1239 public int pop() {
1240 final int last = stack[top];
1241 top--;
1242 return last;
1243 }
1244
1245 public void push(int value) {
1246 stack[top + 1] = value;
1247 top++;
1248 }
1249
1250 public int size() {
1251 return top + 1;
1252 }
1253 }
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274 private static final class SizeClassedChunk extends Chunk {
1275 private static final int FREE_LIST_EMPTY = -1;
1276 private static final int AVAILABLE = -1;
1277
1278
1279 private static final int DEALLOCATED = Integer.MIN_VALUE;
1280 private static final AtomicIntegerFieldUpdater<SizeClassedChunk> STATE =
1281 AtomicIntegerFieldUpdater.newUpdater(SizeClassedChunk.class, "state");
1282 private volatile int state;
1283 private final int segments;
1284 private final int segmentSize;
1285 private final MpscIntQueue externalFreeList;
1286 private final IntStack localFreeList;
1287 private Thread ownerThread;
1288
1289 SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
1290 SizeClassChunkController controller) {
1291 super(delegate, magazine, true);
1292 segmentSize = controller.segmentSize;
1293 segments = controller.chunkSize / segmentSize;
1294 STATE.lazySet(this, AVAILABLE);
1295 ownerThread = magazine.group.ownerThread;
1296 if (ownerThread == null) {
1297 externalFreeList = controller.createFreeList();
1298 localFreeList = null;
1299 } else {
1300 externalFreeList = controller.createEmptyFreeList();
1301 localFreeList = controller.createLocalFreeList();
1302 }
1303 }
1304
1305 @Override
1306 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1307 assert state == AVAILABLE;
1308 final int startIndex = nextAvailableSegmentOffset();
1309 if (startIndex == FREE_LIST_EMPTY) {
1310 return false;
1311 }
1312 allocatedBytes += segmentSize;
1313 try {
1314 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1315 } catch (Throwable t) {
1316 allocatedBytes -= segmentSize;
1317 releaseSegmentOffsetIntoFreeList(startIndex);
1318 throw t;
1319 }
1320 return true;
1321 }
1322
1323 private int nextAvailableSegmentOffset() {
1324 final int startIndex;
1325 IntStack localFreeList = this.localFreeList;
1326 if (localFreeList != null) {
1327 assert Thread.currentThread() == ownerThread;
1328 if (localFreeList.isEmpty()) {
1329 startIndex = externalFreeList.poll();
1330 } else {
1331 startIndex = localFreeList.pop();
1332 }
1333 } else {
1334 startIndex = externalFreeList.poll();
1335 }
1336 return startIndex;
1337 }
1338
1339
1340
1341
1342 public boolean hasRemainingCapacity() {
1343 int remaining = super.remainingCapacity();
1344 if (remaining > 0) {
1345 return true;
1346 }
1347 if (localFreeList != null) {
1348 return !localFreeList.isEmpty();
1349 }
1350 return !externalFreeList.isEmpty();
1351 }
1352
1353 @Override
1354 public int remainingCapacity() {
1355 int remaining = super.remainingCapacity();
1356 return remaining > segmentSize ? remaining : updateRemainingCapacity(remaining);
1357 }
1358
1359 private int updateRemainingCapacity(int snapshotted) {
1360 int freeSegments = externalFreeList.size();
1361 IntStack localFreeList = this.localFreeList;
1362 if (localFreeList != null) {
1363 freeSegments += localFreeList.size();
1364 }
1365 int updated = freeSegments * segmentSize;
1366 if (updated != snapshotted) {
1367 allocatedBytes = capacity() - updated;
1368 }
1369 return updated;
1370 }
1371
1372 private void releaseSegmentOffsetIntoFreeList(int startIndex) {
1373 IntStack localFreeList = this.localFreeList;
1374 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1375 localFreeList.push(startIndex);
1376 } else {
1377 boolean segmentReturned = externalFreeList.offer(startIndex);
1378 assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
1379 }
1380 }
1381
1382 @Override
1383 void releaseSegment(int startIndex, int size) {
1384 IntStack localFreeList = this.localFreeList;
1385 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1386 localFreeList.push(startIndex);
1387 int state = this.state;
1388 if (state != AVAILABLE) {
1389 updateStateOnLocalReleaseSegment(state, localFreeList);
1390 }
1391 } else {
1392 boolean segmentReturned = externalFreeList.offer(startIndex);
1393 assert segmentReturned;
1394
1395 int state = this.state;
1396 if (state != AVAILABLE) {
1397 deallocateIfNeeded(state);
1398 }
1399 }
1400 }
1401
1402 private void updateStateOnLocalReleaseSegment(int previousLocalSize, IntStack localFreeList) {
1403 int newLocalSize = localFreeList.size();
1404 boolean alwaysTrue = STATE.compareAndSet(this, previousLocalSize, newLocalSize);
1405 assert alwaysTrue : "this shouldn't happen unless double release in the local free list";
1406 deallocateIfNeeded(newLocalSize);
1407 }
1408
1409 private void deallocateIfNeeded(int localSize) {
1410
1411 int totalFreeSegments = localSize + externalFreeList.size();
1412 if (totalFreeSegments == segments && STATE.compareAndSet(this, localSize, DEALLOCATED)) {
1413 deallocate();
1414 }
1415 }
1416
1417 @Override
1418 void markToDeallocate() {
1419 IntStack localFreeList = this.localFreeList;
1420 int localSize = localFreeList != null ? localFreeList.size() : 0;
1421 STATE.set(this, localSize);
1422 deallocateIfNeeded(localSize);
1423 }
1424 }
1425
1426 private static final class BuddyChunk extends Chunk implements IntConsumer {
1427 private static final int MIN_BUDDY_SIZE = 32768;
1428 private static final byte IS_CLAIMED = (byte) (1 << 7);
1429 private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
1430 private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
1431 private static final int PACK_OFFSET_MASK = 0xFFFF;
1432 private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
1433
1434 private final MpscIntQueue freeList;
1435
1436 private final byte[] buddies;
1437 private final int freeListCapacity;
1438
1439 BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
1440 super(delegate, magazine, true);
1441 freeListCapacity = delegate.capacity() / MIN_BUDDY_SIZE;
1442 int maxShift = Integer.numberOfTrailingZeros(freeListCapacity);
1443 assert maxShift <= 30;
1444 freeList = MpscIntQueue.create(freeListCapacity, -1);
1445 buddies = new byte[freeListCapacity << 1];
1446
1447
1448 int index = 1;
1449 int runLength = 1;
1450 int currentRun = 0;
1451 while (maxShift > 0) {
1452 buddies[index++] = (byte) maxShift;
1453 if (++currentRun == runLength) {
1454 currentRun = 0;
1455 runLength <<= 1;
1456 maxShift--;
1457 }
1458 }
1459 }
1460
1461 @Override
1462 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1463 if (!freeList.isEmpty()) {
1464 freeList.drain(freeListCapacity, this);
1465 }
1466 int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
1467 if (startIndex == -1) {
1468 return false;
1469 }
1470 Chunk chunk = this;
1471 chunk.retain();
1472 try {
1473 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1474 allocatedBytes += startingCapacity;
1475 chunk = null;
1476 } finally {
1477 if (chunk != null) {
1478 unreserveMatchingBuddy(1, startingCapacity, startIndex, 0);
1479
1480
1481 chunk.release();
1482 }
1483 }
1484 return true;
1485 }
1486
1487 @Override
1488 public void accept(int packed) {
1489
1490 int size = MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
1491 int offset = (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
1492 unreserveMatchingBuddy(1, size, offset, 0);
1493 allocatedBytes -= size;
1494 }
1495
1496 @Override
1497 void releaseSegment(int startingIndex, int size) {
1498 int packedOffset = startingIndex / MIN_BUDDY_SIZE;
1499 int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
1500 int packed = packedOffset | packedSize;
1501 freeList.offer(packed);
1502 release();
1503 }
1504
1505 @Override
1506 public int remainingCapacity() {
1507 if (!freeList.isEmpty()) {
1508 freeList.drain(freeListCapacity, this);
1509 }
1510 return super.remainingCapacity();
1511 }
1512
1513 @Override
1514 public boolean hasUnprocessedFreelistEntries() {
1515 return !freeList.isEmpty();
1516 }
1517
1518 @Override
1519 public void processFreelistEntries() {
1520 freeList.drain(freeListCapacity, this);
1521 }
1522
1523
1524
1525
1526 private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
1527 byte[] buddies = this.buddies;
1528 while (index < buddies.length) {
1529 byte buddy = buddies[index];
1530 int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1531 if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
1532 return -1;
1533 }
1534 if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
1535 buddies[index] |= IS_CLAIMED;
1536 return currOffset;
1537 }
1538 int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
1539 if (found != -1) {
1540 buddies[index] |= HAS_CLAIMED_CHILDREN;
1541 return found;
1542 }
1543 index = (index << 1) + 1;
1544 currOffset += currValue >> 1;
1545 }
1546 return -1;
1547 }
1548
1549
1550
1551
1552 private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
1553 byte[] buddies = this.buddies;
1554 if (buddies.length <= index) {
1555 return false;
1556 }
1557 byte buddy = buddies[index];
1558 int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1559
1560 if (currSize == size) {
1561
1562 if (currOffset == offset) {
1563 buddies[index] &= SHIFT_MASK;
1564 return false;
1565 }
1566 throw new IllegalStateException("The intended segment was not found at index " +
1567 index + ", for size " + size + " and offset " + offset);
1568 }
1569
1570
1571 boolean claims;
1572 int siblingIndex;
1573 if (offset < currOffset + (currSize >> 1)) {
1574
1575 claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
1576 siblingIndex = (index << 1) + 1;
1577 } else {
1578
1579 claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
1580 siblingIndex = index << 1;
1581 }
1582 if (!claims) {
1583
1584 byte sibling = buddies[siblingIndex];
1585 if ((sibling & SHIFT_MASK) == sibling) {
1586
1587 buddies[index] &= SHIFT_MASK;
1588 return false;
1589 }
1590 }
1591 return true;
1592 }
1593
1594 @Override
1595 public String toString() {
1596 int capacity = delegate.capacity();
1597 int remaining = capacity - allocatedBytes;
1598 return "BuddyChunk[capacity: " + capacity +
1599 ", remaining: " + remaining +
1600 ", free list: " + freeList.size() + ']';
1601 }
1602 }
1603
1604 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1605
1606 private final EnhancedHandle<AdaptiveByteBuf> handle;
1607
1608
1609 private int startIndex;
1610 private AbstractByteBuf rootParent;
1611 Chunk chunk;
1612 private int length;
1613 private int maxFastCapacity;
1614 private ByteBuffer tmpNioBuf;
1615 private boolean hasArray;
1616 private boolean hasMemoryAddress;
1617
1618 AdaptiveByteBuf(EnhancedHandle<AdaptiveByteBuf> recyclerHandle) {
1619 super(0);
1620 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1621 }
1622
1623 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1624 int startIndex, int size, int capacity, int maxCapacity) {
1625 this.startIndex = startIndex;
1626 chunk = wrapped;
1627 length = size;
1628 maxFastCapacity = capacity;
1629 maxCapacity(maxCapacity);
1630 setIndex0(readerIndex, writerIndex);
1631 hasArray = unwrapped.hasArray();
1632 hasMemoryAddress = unwrapped.hasMemoryAddress();
1633 rootParent = unwrapped;
1634 tmpNioBuf = null;
1635
1636 if (PlatformDependent.isJfrEnabled() && AllocateBufferEvent.isEventEnabled()) {
1637 AllocateBufferEvent event = new AllocateBufferEvent();
1638 if (event.shouldCommit()) {
1639 event.fill(this, AdaptiveByteBufAllocator.class);
1640 event.chunkPooled = wrapped.pooled;
1641 Magazine m = wrapped.magazine;
1642 event.chunkThreadLocal = m != null && m.allocationLock == null;
1643 event.commit();
1644 }
1645 }
1646 }
1647
1648 private AbstractByteBuf rootParent() {
1649 final AbstractByteBuf rootParent = this.rootParent;
1650 if (rootParent != null) {
1651 return rootParent;
1652 }
1653 throw new IllegalReferenceCountException();
1654 }
1655
1656 @Override
1657 public int capacity() {
1658 return length;
1659 }
1660
1661 @Override
1662 public int maxFastWritableBytes() {
1663 return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1664 }
1665
1666 @Override
1667 public ByteBuf capacity(int newCapacity) {
1668 checkNewCapacity(newCapacity);
1669 if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1670 length = newCapacity;
1671 return this;
1672 }
1673 if (newCapacity < capacity()) {
1674 length = newCapacity;
1675 trimIndicesToCapacity(newCapacity);
1676 return this;
1677 }
1678
1679 if (PlatformDependent.isJfrEnabled() && ReallocateBufferEvent.isEventEnabled()) {
1680 ReallocateBufferEvent event = new ReallocateBufferEvent();
1681 if (event.shouldCommit()) {
1682 event.fill(this, AdaptiveByteBufAllocator.class);
1683 event.newCapacity = newCapacity;
1684 event.commit();
1685 }
1686 }
1687
1688
1689 Chunk chunk = this.chunk;
1690 AdaptivePoolingAllocator allocator = chunk.allocator;
1691 int readerIndex = this.readerIndex;
1692 int writerIndex = this.writerIndex;
1693 int baseOldRootIndex = startIndex;
1694 int oldLength = length;
1695 int oldCapacity = maxFastCapacity;
1696 AbstractByteBuf oldRoot = rootParent();
1697 allocator.reallocate(newCapacity, maxCapacity(), this);
1698 oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
1699 chunk.releaseSegment(baseOldRootIndex, oldCapacity);
1700 assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
1701 "Capacity increase failed";
1702 this.readerIndex = readerIndex;
1703 this.writerIndex = writerIndex;
1704 return this;
1705 }
1706
1707 @Override
1708 public ByteBufAllocator alloc() {
1709 return rootParent().alloc();
1710 }
1711
1712 @SuppressWarnings("deprecation")
1713 @Override
1714 public ByteOrder order() {
1715 return rootParent().order();
1716 }
1717
1718 @Override
1719 public ByteBuf unwrap() {
1720 return null;
1721 }
1722
1723 @Override
1724 public boolean isDirect() {
1725 return rootParent().isDirect();
1726 }
1727
1728 @Override
1729 public int arrayOffset() {
1730 return idx(rootParent().arrayOffset());
1731 }
1732
1733 @Override
1734 public boolean hasMemoryAddress() {
1735 return hasMemoryAddress;
1736 }
1737
1738 @Override
1739 public long memoryAddress() {
1740 ensureAccessible();
1741 return _memoryAddress();
1742 }
1743
1744 @Override
1745 long _memoryAddress() {
1746 AbstractByteBuf root = rootParent;
1747 return root != null ? root._memoryAddress() + startIndex : 0L;
1748 }
1749
1750 @Override
1751 public ByteBuffer nioBuffer(int index, int length) {
1752 checkIndex(index, length);
1753 return rootParent().nioBuffer(idx(index), length);
1754 }
1755
1756 @Override
1757 public ByteBuffer internalNioBuffer(int index, int length) {
1758 checkIndex(index, length);
1759 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1760 }
1761
1762 private ByteBuffer internalNioBuffer() {
1763 if (tmpNioBuf == null) {
1764 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1765 }
1766 return (ByteBuffer) tmpNioBuf.clear();
1767 }
1768
1769 @Override
1770 public ByteBuffer[] nioBuffers(int index, int length) {
1771 checkIndex(index, length);
1772 return rootParent().nioBuffers(idx(index), length);
1773 }
1774
1775 @Override
1776 public boolean hasArray() {
1777 return hasArray;
1778 }
1779
1780 @Override
1781 public byte[] array() {
1782 ensureAccessible();
1783 return rootParent().array();
1784 }
1785
1786 @Override
1787 public ByteBuf copy(int index, int length) {
1788 checkIndex(index, length);
1789 return rootParent().copy(idx(index), length);
1790 }
1791
1792 @Override
1793 public int nioBufferCount() {
1794 return rootParent().nioBufferCount();
1795 }
1796
1797 @Override
1798 protected byte _getByte(int index) {
1799 return rootParent()._getByte(idx(index));
1800 }
1801
1802 @Override
1803 protected short _getShort(int index) {
1804 return rootParent()._getShort(idx(index));
1805 }
1806
1807 @Override
1808 protected short _getShortLE(int index) {
1809 return rootParent()._getShortLE(idx(index));
1810 }
1811
1812 @Override
1813 protected int _getUnsignedMedium(int index) {
1814 return rootParent()._getUnsignedMedium(idx(index));
1815 }
1816
1817 @Override
1818 protected int _getUnsignedMediumLE(int index) {
1819 return rootParent()._getUnsignedMediumLE(idx(index));
1820 }
1821
1822 @Override
1823 protected int _getInt(int index) {
1824 return rootParent()._getInt(idx(index));
1825 }
1826
1827 @Override
1828 protected int _getIntLE(int index) {
1829 return rootParent()._getIntLE(idx(index));
1830 }
1831
1832 @Override
1833 protected long _getLong(int index) {
1834 return rootParent()._getLong(idx(index));
1835 }
1836
1837 @Override
1838 protected long _getLongLE(int index) {
1839 return rootParent()._getLongLE(idx(index));
1840 }
1841
1842 @Override
1843 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1844 checkIndex(index, length);
1845 rootParent().getBytes(idx(index), dst, dstIndex, length);
1846 return this;
1847 }
1848
1849 @Override
1850 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1851 checkIndex(index, length);
1852 rootParent().getBytes(idx(index), dst, dstIndex, length);
1853 return this;
1854 }
1855
1856 @Override
1857 public ByteBuf getBytes(int index, ByteBuffer dst) {
1858 checkIndex(index, dst.remaining());
1859 rootParent().getBytes(idx(index), dst);
1860 return this;
1861 }
1862
1863 @Override
1864 protected void _setByte(int index, int value) {
1865 rootParent()._setByte(idx(index), value);
1866 }
1867
1868 @Override
1869 protected void _setShort(int index, int value) {
1870 rootParent()._setShort(idx(index), value);
1871 }
1872
1873 @Override
1874 protected void _setShortLE(int index, int value) {
1875 rootParent()._setShortLE(idx(index), value);
1876 }
1877
1878 @Override
1879 protected void _setMedium(int index, int value) {
1880 rootParent()._setMedium(idx(index), value);
1881 }
1882
1883 @Override
1884 protected void _setMediumLE(int index, int value) {
1885 rootParent()._setMediumLE(idx(index), value);
1886 }
1887
1888 @Override
1889 protected void _setInt(int index, int value) {
1890 rootParent()._setInt(idx(index), value);
1891 }
1892
1893 @Override
1894 protected void _setIntLE(int index, int value) {
1895 rootParent()._setIntLE(idx(index), value);
1896 }
1897
1898 @Override
1899 protected void _setLong(int index, long value) {
1900 rootParent()._setLong(idx(index), value);
1901 }
1902
1903 @Override
1904 protected void _setLongLE(int index, long value) {
1905 rootParent().setLongLE(idx(index), value);
1906 }
1907
1908 @Override
1909 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1910 checkIndex(index, length);
1911 if (tmpNioBuf == null && PlatformDependent.javaVersion() >= 13) {
1912 ByteBuffer dstBuffer = rootParent()._internalNioBuffer();
1913 PlatformDependent.absolutePut(dstBuffer, idx(index), src, srcIndex, length);
1914 } else {
1915 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1916 tmp.put(src, srcIndex, length);
1917 }
1918 return this;
1919 }
1920
1921 @Override
1922 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1923 checkIndex(index, length);
1924 if (src instanceof AdaptiveByteBuf && PlatformDependent.javaVersion() >= 16) {
1925 AdaptiveByteBuf srcBuf = (AdaptiveByteBuf) src;
1926 srcBuf.checkIndex(srcIndex, length);
1927 ByteBuffer dstBuffer = rootParent()._internalNioBuffer();
1928 ByteBuffer srcBuffer = srcBuf.rootParent()._internalNioBuffer();
1929 PlatformDependent.absolutePut(dstBuffer, idx(index), srcBuffer, srcBuf.idx(srcIndex), length);
1930 } else {
1931 ByteBuffer tmp = internalNioBuffer();
1932 tmp.position(index);
1933 tmp.put(src.nioBuffer(srcIndex, length));
1934 }
1935 return this;
1936 }
1937
1938 @Override
1939 public ByteBuf setBytes(int index, ByteBuffer src) {
1940 int length = src.remaining();
1941 checkIndex(index, length);
1942 ByteBuffer tmp = internalNioBuffer();
1943 if (PlatformDependent.javaVersion() >= 16) {
1944 int offset = src.position();
1945 PlatformDependent.absolutePut(tmp, index, src, offset, length);
1946 src.position(offset + length);
1947 } else {
1948 tmp.position(index);
1949 tmp.put(src);
1950 }
1951 return this;
1952 }
1953
1954 @Override
1955 public ByteBuf getBytes(int index, OutputStream out, int length)
1956 throws IOException {
1957 checkIndex(index, length);
1958 if (length != 0) {
1959 ByteBuffer tmp = internalNioBuffer();
1960 ByteBufUtil.readBytes(alloc(), tmp.hasArray() ? tmp : tmp.duplicate(), index, length, out);
1961 }
1962 return this;
1963 }
1964
1965 @Override
1966 public int getBytes(int index, GatheringByteChannel out, int length)
1967 throws IOException {
1968 ByteBuffer buf = internalNioBuffer().duplicate();
1969 buf.clear().position(index).limit(index + length);
1970 return out.write(buf);
1971 }
1972
1973 @Override
1974 public int getBytes(int index, FileChannel out, long position, int length)
1975 throws IOException {
1976 ByteBuffer buf = internalNioBuffer().duplicate();
1977 buf.clear().position(index).limit(index + length);
1978 return out.write(buf, position);
1979 }
1980
1981 @Override
1982 public int setBytes(int index, InputStream in, int length)
1983 throws IOException {
1984 checkIndex(index, length);
1985 final AbstractByteBuf rootParent = rootParent();
1986 if (rootParent.hasArray()) {
1987 return rootParent.setBytes(idx(index), in, length);
1988 }
1989 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1990 int readBytes = in.read(tmp, 0, length);
1991 if (readBytes <= 0) {
1992 return readBytes;
1993 }
1994 setBytes(index, tmp, 0, readBytes);
1995 return readBytes;
1996 }
1997
1998 @Override
1999 public int setBytes(int index, ScatteringByteChannel in, int length)
2000 throws IOException {
2001 try {
2002 return in.read(internalNioBuffer(index, length));
2003 } catch (ClosedChannelException ignored) {
2004 return -1;
2005 }
2006 }
2007
2008 @Override
2009 public int setBytes(int index, FileChannel in, long position, int length)
2010 throws IOException {
2011 try {
2012 return in.read(internalNioBuffer(index, length), position);
2013 } catch (ClosedChannelException ignored) {
2014 return -1;
2015 }
2016 }
2017
2018 @Override
2019 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
2020 return setCharSequence0(index, sequence, charset, false);
2021 }
2022
2023 private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
2024 if (charset.equals(CharsetUtil.UTF_8)) {
2025 int length = ByteBufUtil.utf8MaxBytes(sequence);
2026 if (expand) {
2027 ensureWritable0(length);
2028 checkIndex0(index, length);
2029 } else {
2030 checkIndex(index, length);
2031 }
2032 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
2033 }
2034 if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
2035 int length = sequence.length();
2036 if (expand) {
2037 ensureWritable0(length);
2038 checkIndex0(index, length);
2039 } else {
2040 checkIndex(index, length);
2041 }
2042 return ByteBufUtil.writeAscii(this, index, sequence, length);
2043 }
2044 byte[] bytes = sequence.toString().getBytes(charset);
2045 if (expand) {
2046 ensureWritable0(bytes.length);
2047
2048 }
2049 setBytes(index, bytes);
2050 return bytes.length;
2051 }
2052
2053 @Override
2054 public int writeCharSequence(CharSequence sequence, Charset charset) {
2055 int written = setCharSequence0(writerIndex, sequence, charset, true);
2056 writerIndex += written;
2057 return written;
2058 }
2059
2060 @Override
2061 public int forEachByte(int index, int length, ByteProcessor processor) {
2062 checkIndex(index, length);
2063 int ret = rootParent().forEachByte(idx(index), length, processor);
2064 return forEachResult(ret);
2065 }
2066
2067 @Override
2068 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
2069 checkIndex(index, length);
2070 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
2071 return forEachResult(ret);
2072 }
2073
2074 @Override
2075 public ByteBuf setZero(int index, int length) {
2076 checkIndex(index, length);
2077 rootParent().setZero(idx(index), length);
2078 return this;
2079 }
2080
2081 @Override
2082 public ByteBuf writeZero(int length) {
2083 ensureWritable(length);
2084 rootParent().setZero(idx(writerIndex), length);
2085 writerIndex += length;
2086 return this;
2087 }
2088
2089 private int forEachResult(int ret) {
2090 if (ret < startIndex) {
2091 return -1;
2092 }
2093 return ret - startIndex;
2094 }
2095
2096 @Override
2097 public boolean isContiguous() {
2098 return rootParent().isContiguous();
2099 }
2100
2101 private int idx(int index) {
2102 return index + startIndex;
2103 }
2104
2105 @Override
2106 protected void deallocate() {
2107 if (PlatformDependent.isJfrEnabled() && FreeBufferEvent.isEventEnabled()) {
2108 FreeBufferEvent event = new FreeBufferEvent();
2109 if (event.shouldCommit()) {
2110 event.fill(this, AdaptiveByteBufAllocator.class);
2111 event.commit();
2112 }
2113 }
2114
2115 if (chunk != null) {
2116 chunk.releaseSegment(startIndex, maxFastCapacity);
2117 }
2118 tmpNioBuf = null;
2119 chunk = null;
2120 rootParent = null;
2121 handle.unguardedRecycle(this);
2122 }
2123 }
2124
2125
2126
2127
2128 interface ChunkAllocator {
2129
2130
2131
2132
2133
2134
2135 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
2136 }
2137 }