1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.CharsetUtil;
20 import io.netty.util.IllegalReferenceCountException;
21 import io.netty.util.IntConsumer;
22 import io.netty.util.NettyRuntime;
23 import io.netty.util.Recycler;
24 import io.netty.util.Recycler.EnhancedHandle;
25 import io.netty.util.ReferenceCounted;
26 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap;
27 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap.IntEntry;
28 import io.netty.util.concurrent.FastThreadLocal;
29 import io.netty.util.concurrent.FastThreadLocalThread;
30 import io.netty.util.concurrent.MpscAtomicIntegerArrayQueue;
31 import io.netty.util.concurrent.MpscIntQueue;
32 import io.netty.util.internal.MathUtil;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.ReferenceCountUpdater;
36 import io.netty.util.internal.SuppressJava6Requirement;
37 import io.netty.util.internal.SystemPropertyUtil;
38 import io.netty.util.internal.ThreadExecutorMap;
39 import io.netty.util.internal.UnstableApi;
40
41 import java.io.IOException;
42 import java.io.InputStream;
43 import java.io.OutputStream;
44 import java.nio.ByteBuffer;
45 import java.nio.ByteOrder;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.FileChannel;
48 import java.nio.channels.GatheringByteChannel;
49 import java.nio.channels.ScatteringByteChannel;
50 import java.nio.charset.Charset;
51 import java.util.Arrays;
52 import java.util.Iterator;
53 import java.util.Queue;
54 import java.util.concurrent.ConcurrentLinkedQueue;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
57 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
58 import java.util.concurrent.atomic.LongAdder;
59 import java.util.concurrent.locks.StampedLock;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 @SuppressJava6Requirement(reason = "Guarded by version check")
88 @UnstableApi
89 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
/** Memory threshold (512 MiB) at or below which the JVM is treated as memory constrained. */
private static final int LOW_MEM_THRESHOLD = 512 * 1024 * 1024;
/** {@code true} when the maximum memory reported by the runtime does not exceed {@link #LOW_MEM_THRESHOLD}. */
private static final boolean IS_LOW_MEM = Runtime.getRuntime().maxMemory() <= LOW_MEM_THRESHOLD;

/**
 * Whether a low-memory JVM (see {@link #IS_LOW_MEM}) should also skip creating the thread-local magazine
 * groups. Read from the {@code io.netty.allocator.disableThreadLocalMagazinesOnLowMemory} system property;
 * defaults to {@code true}.
 */
private static final boolean DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM = SystemPropertyUtil.getBoolean(
        "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory", true);
99
100
101
102
103
104
105
106
/** Smallest chunk size, in bytes (128 KiB), used for size-classed chunks. */
static final int MIN_CHUNK_SIZE = 128 * 1024;
/** Upper bound on the retry rounds a shared magazine group performs after trying to grow its magazine array. */
private static final int EXPANSION_ATTEMPTS = 3;
/** Number of magazines a shared magazine group starts out with. */
private static final int INITIAL_MAGAZINES = 1;
/**
 * Chunks whose remaining capacity (in bytes) is below this value are handed back to the group instead of
 * being parked as a magazine's next-in-line chunk.
 */
private static final int RETIRE_CAPACITY = 256;
/**
 * Magazine arrays stop doubling once their length reaches this value: 1 on low-memory JVMs, otherwise twice
 * the number of available processors.
 */
private static final int MAX_STRIPES = IS_LOW_MEM ? 1 : NettyRuntime.availableProcessors() * 2;
/** Ratio between chunk size and buffer size; used for {@link #MAX_POOLED_BUF_SIZE} and buddy chunk sizing. */
private static final int BUFS_PER_CHUNK = 8;

/**
 * Largest chunk size, in bytes, that the buddy chunk controller will allocate: 2 MiB on low-memory JVMs and
 * 8 MiB otherwise.
 */
private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
        2 * 1024 * 1024 :
        8 * 1024 * 1024;
/** Largest request, in bytes, served from pooled chunks; bigger requests get a dedicated one-off chunk. */
private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;

/**
 * Capacity of the per-group chunk caches. Read from the {@code io.netty.allocator.chunkReuseQueueCapacity}
 * system property (default: twice the available processors) and never smaller than 2.
 */
private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));

/**
 * Capacity handed to the buffer recycler of every shareable magazine. Read from the
 * {@code io.netty.allocator.magazineBufferQueueCapacity} system property (default: 1024); the static
 * initializer rejects values below 2.
 */
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);
138
139
140
141
142
143
144
145
146
147
148
149
150
/**
 * Segment sizes, in bytes and in ascending order, of the size-classed magazine groups. Every entry must be a
 * multiple of 32 (asserted by the static initializer) because lookups work in 32-byte units.
 */
private static final int[] SIZE_CLASSES = {
        32,
        64,
        128,
        256,
        512,
        640,
        1024,
        1152,
        2048,
        2304,
        4096,
        4352,
        8192,
        8704,
        16384,
        16896,
};

/** Number of entries in {@link #SIZE_CLASSES}; also the index returned for sizes beyond the largest class. */
private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
/**
 * Lookup table indexed by {@code sizeIndexOf(size)} (the size rounded up to 32-byte units) that yields the
 * index in {@link #SIZE_CLASSES} of the smallest class able to hold that size. Filled by the static initializer.
 */
private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
172
173 static {
174 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
175 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
176 + " (expected: >= " + 2 + ')');
177 }
178 int lastIndex = 0;
179 for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
180 int sizeClass = SIZE_CLASSES[i];
181
182 assert (sizeClass & 31) == 0 : "Size class must be a multiple of 32";
183 int sizeIndex = sizeIndexOf(sizeClass);
184 Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
185 lastIndex = sizeIndex;
186 }
187 }
188
// Source of the backing memory for every chunk created by this allocator.
private final ChunkAllocator chunkAllocator;
// Tracks the summed capacity of all live chunks; backs usedMemory().
private final ChunkRegistry chunkRegistry;
// Shared (non-thread-local) groups, one per entry of SIZE_CLASSES.
private final MagazineGroup[] sizeClassedMagazineGroups;
// Shared group, managed with the buddy strategy, for pooled sizes beyond the largest size class.
private final MagazineGroup largeBufferMagazineGroup;
// Per-thread size-classed groups; null when thread-local groups are disabled on low-memory JVMs.
private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
194
195 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, final boolean useCacheForNonEventLoopThreads) {
196 this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
197 chunkRegistry = new ChunkRegistry();
198 sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
199 largeBufferMagazineGroup = new MagazineGroup(
200 this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
201
202 boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
203 threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal<MagazineGroup[]>() {
204 @Override
205 protected MagazineGroup[] initialValue() {
206 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
207 return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
208 }
209 return null;
210 }
211
212 @Override
213 protected void onRemoval(final MagazineGroup[] groups) throws Exception {
214 if (groups != null) {
215 for (MagazineGroup group : groups) {
216 group.free();
217 }
218 }
219 }
220 };
221 }
222
223 private static MagazineGroup[] createMagazineGroupSizeClasses(
224 AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
225 MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
226 for (int i = 0; i < SIZE_CLASSES.length; i++) {
227 int segmentSize = SIZE_CLASSES[i];
228 groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
229 new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
230 }
231 return groups;
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 private static Queue<SizeClassedChunk> createSharedChunkQueue() {
255 return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
256 }
257
258 @Override
259 public ByteBuf allocate(int size, int maxCapacity) {
260 return allocate(size, maxCapacity, Thread.currentThread(), null);
261 }
262
263 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
264 AdaptiveByteBuf allocated = null;
265 if (size <= MAX_POOLED_BUF_SIZE) {
266 final int index = sizeClassIndexOf(size);
267 MagazineGroup[] magazineGroups;
268 if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread()) ||
269 IS_LOW_MEM ||
270 (magazineGroups = threadLocalGroup.get()) == null) {
271 magazineGroups = sizeClassedMagazineGroups;
272 }
273 if (index < magazineGroups.length) {
274 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
275 } else if (!IS_LOW_MEM) {
276 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
277 }
278 }
279 if (allocated == null) {
280 allocated = allocateFallback(size, maxCapacity, currentThread, buf);
281 }
282 return allocated;
283 }
284
285 private static int sizeIndexOf(final int size) {
286
287 return size + 31 >> 5;
288 }
289
290 static int sizeClassIndexOf(int size) {
291 int sizeIndex = sizeIndexOf(size);
292 if (sizeIndex < SIZE_INDEXES.length) {
293 return SIZE_INDEXES[sizeIndex];
294 }
295 return SIZE_CLASSES_COUNT;
296 }
297
298 static int[] getSizeClasses() {
299 return SIZE_CLASSES.clone();
300 }
301
302 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
303
304 Magazine magazine;
305 if (buf != null) {
306 Chunk chunk = buf.chunk;
307 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
308 magazine = getFallbackMagazine(currentThread);
309 }
310 } else {
311 magazine = getFallbackMagazine(currentThread);
312 buf = magazine.newBuffer();
313 }
314
315 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
316 Chunk chunk = new Chunk(innerChunk, magazine);
317 chunkRegistry.add(chunk);
318 try {
319 boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
320 assert success: "Failed to initialize ByteBuf with dedicated chunk";
321 } finally {
322
323
324
325 chunk.release();
326 }
327 return buf;
328 }
329
330 private Magazine getFallbackMagazine(Thread currentThread) {
331 Magazine[] mags = largeBufferMagazineGroup.magazines;
332 return mags[(int) currentThread.getId() & mags.length - 1];
333 }
334
335
336
337
338 void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
339 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
340 assert result == into: "Re-allocation created separate buffer instance";
341 }
342
343 @Override
344 public long usedMemory() {
345 return chunkRegistry.totalCapacity();
346 }
347
348
349
350
/**
 * Finalizer hook that frees the allocator's pooled chunks once the allocator itself is being collected.
 */
@SuppressWarnings({"FinalizeDeclaration", "deprecation"})
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        // Run our own cleanup even when the superclass finalizer throws.
        free();
    }
}
360
361 private void free() {
362 largeBufferMagazineGroup.free();
363 }
364
/**
 * A set of {@link Magazine}s that share one {@link ChunkManagementStrategy} and one {@link ChunkCache}.
 * <p>
 * A thread-local group owns a single non-shareable magazine and uses no expansion lock. A shared group keeps
 * an array of shareable magazines that starts at {@code INITIAL_MAGAZINES} entries and is doubled under
 * {@code magazineExpandLock} when allocations keep failing.
 */
@SuppressJava6Requirement(reason = "Guarded by version check")
private static final class MagazineGroup {
    private final AdaptivePoolingAllocator allocator;
    private final ChunkAllocator chunkAllocator;
    private final ChunkManagementStrategy chunkManagementStrategy;
    // Holds chunks that magazines handed back, so they can be reused by later allocations.
    private final ChunkCache chunkCache;
    // Guards replacement of the magazines array; null for thread-local groups.
    private final StampedLock magazineExpandLock;
    // The single magazine of a thread-local group; null for shared groups.
    private final Magazine threadLocalMagazine;
    // Thread that created a thread-local group; null for shared groups and after free().
    private Thread ownerThread;
    // Magazines of a shared group; never assigned for thread-local groups.
    private volatile Magazine[] magazines;
    // Set once by free(); afterwards no more chunks are accepted into the cache.
    private volatile boolean freed;

    /**
     * @param allocator               the owning allocator
     * @param chunkAllocator          source of chunk memory
     * @param chunkManagementStrategy creates the chunk cache and the per-magazine chunk controllers
     * @param isThreadLocal           {@code true} to create a group owned by the calling thread
     */
    MagazineGroup(AdaptivePoolingAllocator allocator,
                  ChunkAllocator chunkAllocator,
                  ChunkManagementStrategy chunkManagementStrategy,
                  boolean isThreadLocal) {
        this.allocator = allocator;
        this.chunkAllocator = chunkAllocator;
        this.chunkManagementStrategy = chunkManagementStrategy;
        chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
        if (isThreadLocal) {
            ownerThread = Thread.currentThread();
            magazineExpandLock = null;
            threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
        } else {
            ownerThread = null;
            magazineExpandLock = new StampedLock();
            threadLocalMagazine = null;
            Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
            for (int i = 0; i < mags.length; i++) {
                mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
            }
            magazines = mags;
        }
    }

    /**
     * Allocates from this group's magazines.
     *
     * @param size          requested size in bytes
     * @param maxCapacity   maximum capacity of the buffer
     * @param currentThread thread whose id selects the first magazine to try
     * @param buf           buffer to re-initialise for a reallocation, or {@code null} to obtain a new one
     * @return the initialised buffer, or {@code null} when every magazine failed
     */
    public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
        boolean reallocate = buf != null;

        // Path for thread-local allocation.
        Magazine tlMag = threadLocalMagazine;
        if (tlMag != null) {
            if (buf == null) {
                buf = tlMag.newBuffer();
            }
            boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
            assert allocated : "Allocation of threadLocalMagazine must always succeed";
            return buf;
        }

        // Path for concurrent allocation.
        long threadId = currentThread.getId();
        Magazine[] mags;
        int expansions = 0;
        do {
            mags = magazines;
            int mask = mags.length - 1;
            int index = (int) (threadId & mask);
            // Visit every magazine twice, starting at the slot derived from the thread id.
            for (int i = 0, m = mags.length << 1; i < m; i++) {
                Magazine mag = mags[index + i & mask];
                if (buf == null) {
                    buf = mag.newBuffer();
                }
                if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
                    // Was able to allocate.
                    return buf;
                }
            }
            expansions++;
        } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));

        // All attempts failed. Only release the buffer if it was obtained here; a caller-supplied one
        // stays with the caller.
        if (!reallocate && buf != null) {
            buf.release();
        }
        return null;
    }

    /**
     * Tries to double the magazines array, unless it already reached {@code MAX_STRIPES}, was grown by
     * another thread in the meantime, the group is freed, or the expand lock is currently held.
     *
     * @param currentLength the array length the caller observed
     * @return always {@code true}; the caller's retry loop is bounded by its own attempt counter
     */
    private boolean tryExpandMagazines(int currentLength) {
        if (currentLength >= MAX_STRIPES) {
            return true;
        }
        final Magazine[] mags;
        long writeLock = magazineExpandLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                mags = magazines;
                if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
                    return true;
                }
                // The new array is filled with fresh magazines; the old ones are not carried over.
                Magazine[] expanded = new Magazine[mags.length * 2];
                for (int i = 0, l = expanded.length; i < l; i++) {
                    expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
                }
                magazines = expanded;
            } finally {
                magazineExpandLock.unlockWrite(writeLock);
            }
            // Free the replaced magazines after the expand lock has been released.
            for (Magazine magazine : mags) {
                magazine.free();
            }
        }
        return true;
    }

    /** Polls the group's chunk cache for a chunk for an allocation of {@code size} bytes; may return null. */
    Chunk pollChunk(int size) {
        return chunkCache.pollChunk(size);
    }

    /**
     * Offers a chunk to the group's chunk cache.
     *
     * @return {@code true} if the cache accepted the chunk, {@code false} if the group is freed or the cache
     *         rejected it
     */
    boolean offerChunk(Chunk chunk) {
        if (freed) {
            return false;
        }

        if (chunk.hasUnprocessedFreelistEntries()) {
            chunk.processFreelistEntries();
        }
        boolean isAdded = chunkCache.offerChunk(chunk);

        // free() may have run between the first check and the offer; re-check so the chunk just added
        // does not stay behind in the cache of a freed group.
        if (freed && isAdded) {
            // Help to free the reuse queue.
            freeChunkReuseQueue(ownerThread);
        }
        return isAdded;
    }

    /** Marks the group as freed, frees its magazine(s) and drains the chunk cache. */
    private void free() {
        freed = true;
        // Capture the owner before clearing the field; draining the cache below still needs it.
        Thread ownerThread = this.ownerThread;
        if (threadLocalMagazine != null) {
            this.ownerThread = null;
            threadLocalMagazine.free();
        } else {
            long stamp = magazineExpandLock.writeLock();
            try {
                Magazine[] mags = magazines;
                for (Magazine magazine : mags) {
                    magazine.free();
                }
            } finally {
                magazineExpandLock.unlockWrite(stamp);
            }
        }
        freeChunkReuseQueue(ownerThread);
    }

    /**
     * Drains the chunk cache and marks every drained chunk for deallocation.
     *
     * @param ownerThread the owner of a thread-local group, or {@code null}
     */
    private void freeChunkReuseQueue(Thread ownerThread) {
        Chunk chunk;
        while ((chunk = chunkCache.pollChunk(0)) != null) {
            if (ownerThread != null && chunk instanceof SizeClassedChunk) {
                SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
                assert ownerThread == threadLocalChunk.ownerThread;
                // Clear the chunk's owner before releasing it.
                // NOTE(review): presumably so later releases no longer treat the chunk as thread-owned; confirm
                // against SizeClassedChunk.
                threadLocalChunk.ownerThread = null;
            }
            chunk.markToDeallocate();
        }
    }
}
526
/**
 * Storage for chunks that are currently not attached to a magazine and may be reused.
 */
private interface ChunkCache {
    /**
     * Removes and returns a cached chunk for an allocation of {@code size} bytes.
     *
     * @return a chunk, or {@code null} if none is available
     */
    Chunk pollChunk(int size);

    /**
     * Offers a chunk for later reuse.
     *
     * @return {@code true} if the cache took the chunk, {@code false} if the caller still owns it
     */
    boolean offerChunk(Chunk chunk);
}
531
532 private static final class ConcurrentQueueChunkCache implements ChunkCache {
533 private final Queue<SizeClassedChunk> queue;
534
535 private ConcurrentQueueChunkCache() {
536 queue = createSharedChunkQueue();
537 }
538
539 @Override
540 public SizeClassedChunk pollChunk(int size) {
541
542
543 Queue<SizeClassedChunk> queue = this.queue;
544 for (int i = 0; i < CHUNK_REUSE_QUEUE; i++) {
545 SizeClassedChunk chunk = queue.poll();
546 if (chunk == null) {
547 return null;
548 }
549 if (chunk.hasRemainingCapacity()) {
550 return chunk;
551 }
552 queue.offer(chunk);
553 }
554 return null;
555 }
556
557 @Override
558 public boolean offerChunk(Chunk chunk) {
559 return queue.offer((SizeClassedChunk) chunk);
560 }
561 }
562
/**
 * {@link ChunkCache} that keeps chunks in a concurrent multimap keyed by the remaining capacity each chunk had
 * when it was inserted.
 */
private static final class ConcurrentSkipListChunkCache implements ChunkCache {
    private final ConcurrentSkipListIntObjMultimap<Chunk> chunks;

    private ConcurrentSkipListChunkCache() {
        // NOTE(review): -1 is presumably the multimap's "no key" sentinel; confirm against its constructor.
        chunks = new ConcurrentSkipListIntObjMultimap<Chunk>(-1);
    }

    /**
     * Returns a chunk able to serve {@code size} bytes, or {@code null} if none is cached.
     */
    @Override
    public Chunk pollChunk(int size) {
        if (chunks.isEmpty()) {
            return null;
        }
        // Fast path: take an entry whose recorded capacity is at least the requested size.
        // NOTE(review): assumes pollCeilingEntry removes and returns the least key >= size; confirm.
        IntEntry<Chunk> entry = chunks.pollCeilingEntry(size);
        if (entry != null) {
            Chunk chunk = entry.getValue();
            if (chunk.hasUnprocessedFreelistEntries()) {
                chunk.processFreelistEntries();
            }
            return chunk;
        }

        // Slow path: recorded capacities can be stale. Re-evaluate every chunk that has unprocessed
        // freelist entries and keep the one with the most remaining capacity that fits the request.
        Chunk bestChunk = null;
        int bestRemainingCapacity = 0;
        Iterator<IntEntry<Chunk>> itr = chunks.iterator();
        while (itr.hasNext()) {
            entry = itr.next();
            final Chunk chunk;
            if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
                // Take the chunk out of the map before touching it; skip it if the removal did not succeed.
                if (!chunks.remove(entry.getKey(), entry.getValue())) {
                    continue;
                }
                chunk.processFreelistEntries();
                int remainingCapacity = chunk.remainingCapacity();
                if (remainingCapacity >= size &&
                        (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
                    if (bestChunk != null) {
                        // The previous best is superseded; return it to the map under its current capacity.
                        chunks.put(bestRemainingCapacity, bestChunk);
                    }
                    bestChunk = chunk;
                    bestRemainingCapacity = remainingCapacity;
                } else {
                    // Not a better candidate: re-insert under the refreshed capacity.
                    chunks.put(remainingCapacity, chunk);
                }
            }
        }

        return bestChunk;
    }

    /**
     * Inserts the chunk and then evicts chunks while the map holds more than {@code CHUNK_REUSE_QUEUE}
     * entries.
     *
     * @return always {@code true}
     */
    @Override
    public boolean offerChunk(Chunk chunk) {
        chunks.put(chunk.remainingCapacity(), chunk);

        int size = chunks.size();
        while (size > CHUNK_REUSE_QUEUE) {
            // Eviction candidate: the chunk with the lowest reference count, ties broken by the smaller
            // total capacity.
            int key = -1;
            Chunk toDeallocate = null;
            for (IntEntry<Chunk> entry : chunks) {
                Chunk candidate = entry.getValue();
                if (candidate != null) {
                    if (toDeallocate == null) {
                        toDeallocate = candidate;
                        key = entry.getKey();
                    } else {
                        int candidateRefCnt = candidate.refCnt();
                        int toDeallocateRefCnt = toDeallocate.refCnt();
                        if (candidateRefCnt < toDeallocateRefCnt ||
                                candidateRefCnt == toDeallocateRefCnt &&
                                        candidate.capacity() < toDeallocate.capacity()) {
                            toDeallocate = candidate;
                            key = entry.getKey();
                        }
                    }
                }
            }
            if (toDeallocate == null) {
                break;
            }
            // Only the caller whose removal succeeds releases the chunk.
            if (chunks.remove(key, toDeallocate)) {
                toDeallocate.markToDeallocate();
            }
            size = chunks.size();
        }
        return true;
    }
}
650
/**
 * Factory for the two collaborators a {@link MagazineGroup} needs: the per-magazine {@link ChunkController}
 * and the group's {@link ChunkCache}.
 */
private interface ChunkManagementStrategy {
    /** Creates a controller for one magazine of the given group. */
    ChunkController createController(MagazineGroup group);

    /**
     * Creates the chunk cache of a group.
     *
     * @param isThreadLocal whether the group is thread-local
     */
    ChunkCache createChunkCache(boolean isThreadLocal);
}
656
/**
 * Decides how much of a chunk a buffer initially receives and creates new chunks for a magazine.
 */
private interface ChunkController {
    /**
     * Computes the starting capacity, in bytes, to reserve for a buffer.
     *
     * @param requestedSize  the size the caller asked for
     * @param maxCapacity    the maximum capacity of the buffer
     * @param isReallocation whether an existing buffer is being re-initialised
     */
    int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);

    /**
     * Allocates a new chunk attached to {@code magazine}.
     *
     * @param promptingSize the allocation size that made a new chunk necessary
     * @param magazine      the magazine the chunk is created for
     */
    Chunk newChunkAllocation(int promptingSize, Magazine magazine);
}
668
669 private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
670
671
672
673 private static final int MIN_SEGMENTS_PER_CHUNK = 32;
674 private final int segmentSize;
675 private final int chunkSize;
676
677 private SizeClassChunkManagementStrategy(int segmentSize) {
678 this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
679 chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
680 }
681
682 @Override
683 public ChunkController createController(MagazineGroup group) {
684 return new SizeClassChunkController(group, segmentSize, chunkSize);
685 }
686
687 @Override
688 public ChunkCache createChunkCache(boolean isThreadLocal) {
689 return new ConcurrentQueueChunkCache();
690 }
691 }
692
693 private static final class SizeClassChunkController implements ChunkController {
694
695 private final ChunkAllocator chunkAllocator;
696 private final int segmentSize;
697 private final int chunkSize;
698 private final ChunkRegistry chunkRegistry;
699
700 private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
701 chunkAllocator = group.chunkAllocator;
702 this.segmentSize = segmentSize;
703 this.chunkSize = chunkSize;
704 chunkRegistry = group.allocator.chunkRegistry;
705 }
706
707 private MpscAtomicIntegerArrayQueue createEmptyFreeList() {
708 return new MpscAtomicIntegerArrayQueue(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
709 }
710
711 private MpscAtomicIntegerArrayQueue createFreeList() {
712 final int segmentsCount = chunkSize / segmentSize;
713 final MpscAtomicIntegerArrayQueue freeList = new MpscAtomicIntegerArrayQueue(
714 segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
715 int segmentOffset = 0;
716 for (int i = 0; i < segmentsCount; i++) {
717 freeList.offer(segmentOffset);
718 segmentOffset += segmentSize;
719 }
720 return freeList;
721 }
722
723 private IntStack createLocalFreeList() {
724 final int segmentsCount = chunkSize / segmentSize;
725 int segmentOffset = chunkSize;
726 int[] offsets = new int[segmentsCount];
727 for (int i = 0; i < segmentsCount; i++) {
728 segmentOffset -= segmentSize;
729 offsets[i] = segmentOffset;
730 }
731 return new IntStack(offsets);
732 }
733
734 @Override
735 public int computeBufferCapacity(
736 int requestedSize, int maxCapacity, boolean isReallocation) {
737 return Math.min(segmentSize, maxCapacity);
738 }
739
740 @Override
741 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
742 AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
743 assert chunkBuffer.capacity() == chunkSize;
744 SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
745 chunkRegistry.add(chunk);
746 return chunk;
747 }
748 }
749
750 private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
751 private final AtomicInteger maxChunkSize = new AtomicInteger();
752
753 @Override
754 public ChunkController createController(MagazineGroup group) {
755 return new BuddyChunkController(group, maxChunkSize);
756 }
757
758 @Override
759 public ChunkCache createChunkCache(boolean isThreadLocal) {
760 return new ConcurrentSkipListChunkCache();
761 }
762 }
763
764 private static final class BuddyChunkController implements ChunkController {
765 private final ChunkAllocator chunkAllocator;
766 private final ChunkRegistry chunkRegistry;
767 private final AtomicInteger maxChunkSize;
768
769 BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
770 chunkAllocator = group.chunkAllocator;
771 chunkRegistry = group.allocator.chunkRegistry;
772 this.maxChunkSize = maxChunkSize;
773 }
774
775 @Override
776 public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
777 return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
778 }
779
780 @Override
781 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
782 int maxChunkSize = this.maxChunkSize.get();
783 int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
784 int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
785 if (chunkSize > maxChunkSize) {
786
787 this.maxChunkSize.set(chunkSize);
788 }
789 BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
790 chunkRegistry.add(chunk);
791 return chunk;
792 }
793 }
794
/**
 * An allocation stripe: holds the chunk currently being allocated from ({@code current}) plus one spare chunk
 * ({@code nextInLine}), and hands out {@link AdaptiveByteBuf} instances initialised from them.
 * <p>
 * A shareable magazine guards {@code current} with {@code allocationLock}; a non-shareable one has no lock.
 */
@SuppressJava6Requirement(reason = "Guarded by version check")
private static final class Magazine {
    private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
    static {
        NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
    }
    // Sentinel stored in nextInLine once the magazine has been freed.
    private static final Chunk MAGAZINE_FREED = new Chunk();

    /** Recycler of {@link AdaptiveByteBuf} instances. */
    private static final class AdaptiveRecycler extends Recycler<AdaptiveByteBuf> {

        private AdaptiveRecycler() {
        }

        private AdaptiveRecycler(int maxCapacity) {
            // NOTE(review): assumes this Recycler constructor bounds the pool at maxCapacity; confirm.
            super(maxCapacity);
        }

        @Override
        protected AdaptiveByteBuf newObject(final Handle<AdaptiveByteBuf> handle) {
            return new AdaptiveByteBuf((EnhancedHandle<AdaptiveByteBuf>) handle);
        }

        /** Creates a recycler using the default {@link Recycler} constructor. */
        public static AdaptiveRecycler threadLocal() {
            return new AdaptiveRecycler();
        }

        /** Creates a recycler with the given capacity. */
        public static AdaptiveRecycler sharedWith(int maxCapacity) {
            return new AdaptiveRecycler(maxCapacity);
        }
    }

    // Buffer pool used by all non-shareable magazines (those without a recycler of their own).
    private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();

    // Chunk currently allocated from; accessed under allocationLock when the magazine is shareable.
    private Chunk current;
    // Spare chunk, only accessed through NEXT_IN_LINE; holds MAGAZINE_FREED after free().
    @SuppressWarnings("unused")
    private volatile Chunk nextInLine;
    private final MagazineGroup group;
    private final ChunkController chunkController;
    // null for non-shareable magazines.
    private final StampedLock allocationLock;
    // null for non-shareable magazines, which use EVENT_LOOP_LOCAL_BUFFER_POOL instead.
    private final AdaptiveRecycler recycler;

    /**
     * @param group           the owning group
     * @param shareable       whether the magazine may be used by several threads
     * @param chunkController decides buffer capacities and creates chunks for this magazine
     */
    Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
        this.group = group;
        this.chunkController = chunkController;

        if (shareable) {
            // Shared magazines need a lock and get their own bounded buffer recycler.
            allocationLock = new StampedLock();
            recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
        } else {
            allocationLock = null;
            recycler = null;
        }
    }

    /**
     * Tries to initialise {@code buf} with memory from this magazine.
     *
     * @param size        requested size in bytes
     * @param maxCapacity maximum capacity of the buffer
     * @param buf         the buffer to initialise
     * @param reallocate  whether {@code buf} is an existing buffer being re-initialised
     * @return {@code true} if the buffer was initialised
     */
    public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        if (allocationLock == null) {
            // No lock, so no synchronisation is needed here.
            return allocate(size, maxCapacity, buf, reallocate);
        }

        // Try to take the lock without blocking and allocate under it.
        long writeLock = allocationLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                return allocate(size, maxCapacity, buf, reallocate);
            } finally {
                allocationLock.unlockWrite(writeLock);
            }
        }
        // Lock is contended: fall back to the path that only touches nextInLine and the group cache.
        return allocateWithoutLock(size, maxCapacity, buf);
    }

    /**
     * Allocation path that does not touch {@code current}: it uses the next-in-line chunk or a chunk
     * polled from the group cache.
     *
     * @return {@code true} if the buffer was initialised
     */
    private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
        Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr == MAGAZINE_FREED) {
            // The magazine was freed concurrently: put the sentinel back and give up.
            restoreMagazineFreed();
            return false;
        }
        if (curr == null) {
            curr = group.pollChunk(size);
            if (curr == null) {
                return false;
            }
            curr.attachToMagazine(this);
        }
        boolean allocated = false;
        int remainingCapacity = curr.remainingCapacity();
        int startingCapacity = chunkController.computeBufferCapacity(
                size, maxCapacity, true /* always passed as a reallocation on the lock-free path */);
        if (remainingCapacity >= size &&
                curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
            allocated = true;
            remainingCapacity = curr.remainingCapacity();
        }
        try {
            // Keep chunks with enough capacity left as next-in-line; release the rest from the magazine.
            if (remainingCapacity >= RETIRE_CAPACITY) {
                transferToNextInLineOrRelease(curr);
                curr = null;
            }
        } finally {
            if (curr != null) {
                curr.releaseFromMagazine();
            }
        }
        return allocated;
    }

    /**
     * Main allocation path. Tries, in order: the current chunk, the next-in-line chunk, a chunk polled from
     * the group cache, and finally a newly allocated chunk.
     *
     * @return {@code true} if the buffer was initialised
     */
    private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
        Chunk curr = current;
        if (curr != null) {
            boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
            int remainingCapacity = curr.remainingCapacity();
            if (!success && remainingCapacity > 0) {
                // Could not serve this request but still has room: park it for later use.
                current = null;
                transferToNextInLineOrRelease(curr);
            } else if (remainingCapacity == 0) {
                // Fully used up: release it from the magazine.
                current = null;
                curr.releaseFromMagazine();
            }
            if (success) {
                return true;
            }
        }

        assert current == null;

        // No usable current chunk: take over the next-in-line chunk, if there is one.
        curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr != null) {
            if (curr == MAGAZINE_FREED) {
                // The magazine was freed concurrently: put the sentinel back and give up.
                restoreMagazineFreed();
                return false;
            }

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity > startingCapacity &&
                    curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
                // Space is left after this allocation, so promote the chunk to current.
                current = curr;
                return true;
            }

            try {
                if (remainingCapacity >= size) {
                    // Last use of this chunk: hand all of its remaining capacity to the buffer.
                    return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
                }
            } finally {
                // Released in finally so the chunk is let go even if readInitInto throws.
                curr.releaseFromMagazine();
            }
        }

        // Next, try a chunk from the group's cache.
        curr = group.pollChunk(size);
        if (curr == null) {
            curr = chunkController.newChunkAllocation(size, this);
        } else {
            curr.attachToMagazine(this);

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity == 0 || remainingCapacity < size) {
                // The polled chunk cannot serve this request; either release it or park it.
                if (remainingCapacity < RETIRE_CAPACITY) {
                    curr.releaseFromMagazine();
                } else {
                    // Parks the chunk as next-in-line, or releases it if that slot is better occupied.
                    transferToNextInLineOrRelease(curr);
                }
                curr = chunkController.newChunkAllocation(size, this);
            }
        }

        current = curr;
        boolean success;
        try {
            int remainingCapacity = curr.remainingCapacity();
            assert remainingCapacity >= size;
            if (remainingCapacity > startingCapacity) {
                success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
                // Chunk stays current; clearing the local skips the release in the finally block.
                curr = null;
            } else {
                success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
            }
        } finally {
            if (curr != null) {
                // Chunk is used up (or readInitInto threw): release it and clear current.
                curr.releaseFromMagazine();
                current = null;
            }
        }
        return success;
    }

    /**
     * Stores the {@code MAGAZINE_FREED} sentinel in {@code nextInLine} and releases any real chunk that was
     * there.
     */
    private void restoreMagazineFreed() {
        Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
        if (next != null && next != MAGAZINE_FREED) {
            // A chunk was parked after the magazine was freed; release it.
            next.releaseFromMagazine();
        }
    }

    /**
     * Parks {@code chunk} as next-in-line if the slot is empty or holds a chunk with less remaining capacity
     * (which is then released); otherwise releases {@code chunk} from the magazine.
     */
    private void transferToNextInLineOrRelease(Chunk chunk) {
        if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
            return;
        }

        Chunk nextChunk = NEXT_IN_LINE.get(this);
        if (nextChunk != null && nextChunk != MAGAZINE_FREED
                && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
            if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
                nextChunk.releaseFromMagazine();
                return;
            }
        }

        // The slot holds a chunk that is at least as good, the sentinel, or the swap lost a race:
        // release the chunk from the magazine instead.
        chunk.releaseFromMagazine();
    }

    /** Marks the magazine as freed and releases its next-in-line and current chunks. */
    void free() {
        // Install the sentinel first so concurrent allocations notice the magazine is gone.
        restoreMagazineFreed();
        long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
        try {
            if (current != null) {
                current.releaseFromMagazine();
                current = null;
            }
        } finally {
            if (allocationLock != null) {
                allocationLock.unlockWrite(stamp);
            }
        }
    }

    /**
     * Obtains a buffer instance from this magazine's recycler, or from the shared static pool when the
     * magazine has none, and resets its reference count and marks.
     */
    public AdaptiveByteBuf newBuffer() {
        AdaptiveRecycler recycler = this.recycler;
        AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
        buf.resetRefCnt();
        buf.discardMarks();
        return buf;
    }

    /** Offers a chunk to the owning group's chunk cache; returns whether it was accepted. */
    boolean offerToQueue(Chunk chunk) {
        return group.offerChunk(chunk);
    }
}
1060
1061 @SuppressJava6Requirement(reason = "Guarded by version check")
1062 private static final class ChunkRegistry {
1063 private final LongAdder totalCapacity = new LongAdder();
1064
1065 public long totalCapacity() {
1066 return totalCapacity.sum();
1067 }
1068
1069 public void add(Chunk chunk) {
1070 totalCapacity.add(chunk.capacity());
1071 }
1072
1073 public void remove(Chunk chunk) {
1074 totalCapacity.add(-chunk.capacity());
1075 }
1076 }
1077
1078 private static class Chunk implements ReferenceCounted {
1079 private static final long REFCNT_FIELD_OFFSET =
1080 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
1081 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
1082 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
1083
1084 protected final AbstractByteBuf delegate;
1085 protected Magazine magazine;
1086 private final AdaptivePoolingAllocator allocator;
1087 private final int capacity;
1088 protected int allocatedBytes;
1089
1090 private static final ReferenceCountUpdater<Chunk> updater =
1091 new ReferenceCountUpdater<Chunk>() {
1092 @Override
1093 protected AtomicIntegerFieldUpdater<Chunk> updater() {
1094 return AIF_UPDATER;
1095 }
1096 @Override
1097 protected long unsafeOffset() {
1098
1099
1100 return PlatformDependent.hasUnsafe() ? REFCNT_FIELD_OFFSET : -1;
1101 }
1102 };
1103
1104
1105 @SuppressWarnings({"unused", "FieldMayBeFinal"})
1106 private volatile int refCnt;
1107
1108 Chunk() {
1109
1110 delegate = null;
1111 magazine = null;
1112 allocator = null;
1113 capacity = 0;
1114 }
1115
1116 Chunk(AbstractByteBuf delegate, Magazine magazine) {
1117 this.delegate = delegate;
1118 capacity = delegate.capacity();
1119 updater.setInitialValue(this);
1120 attachToMagazine(magazine);
1121
1122
1123 allocator = magazine.group.allocator;
1124 }
1125
1126 Magazine currentMagazine() {
1127 return magazine;
1128 }
1129
1130 void detachFromMagazine() {
1131 if (magazine != null) {
1132 magazine = null;
1133 }
1134 }
1135
1136 void attachToMagazine(Magazine magazine) {
1137 assert this.magazine == null;
1138 this.magazine = magazine;
1139 }
1140
1141 @Override
1142 public Chunk touch(Object hint) {
1143 return this;
1144 }
1145
1146 @Override
1147 public int refCnt() {
1148 return updater.refCnt(this);
1149 }
1150
1151 @Override
1152 public Chunk retain() {
1153 return updater.retain(this);
1154 }
1155
1156 @Override
1157 public Chunk retain(int increment) {
1158 return updater.retain(this, increment);
1159 }
1160
1161 @Override
1162 public Chunk touch() {
1163 return this;
1164 }
1165
1166 @Override
1167 public boolean release() {
1168 if (updater.release(this)) {
1169 deallocate();
1170 return true;
1171 }
1172 return false;
1173 }
1174
1175 @Override
1176 public boolean release(int decrement) {
1177 if (updater.release(this, decrement)) {
1178 deallocate();
1179 return true;
1180 }
1181 return false;
1182 }
1183
1184
1185
1186
1187 void releaseFromMagazine() {
1188
1189
1190 Magazine mag = magazine;
1191 detachFromMagazine();
1192 if (!mag.offerToQueue(this)) {
1193 markToDeallocate();
1194 }
1195 }
1196
1197
1198
1199
1200 void releaseSegment(int ignoredSegmentId, int size) {
1201 release();
1202 }
1203
1204 void markToDeallocate() {
1205 release();
1206 }
1207
1208 protected void deallocate() {
1209 allocator.chunkRegistry.remove(this);
1210 delegate.release();
1211 }
1212
1213 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1214 int startIndex = allocatedBytes;
1215 allocatedBytes = startIndex + startingCapacity;
1216 Chunk chunk = this;
1217 chunk.retain();
1218 try {
1219 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1220 chunk = null;
1221 } finally {
1222 if (chunk != null) {
1223
1224
1225
1226 allocatedBytes = startIndex;
1227 chunk.release();
1228 }
1229 }
1230 return true;
1231 }
1232
1233 public int remainingCapacity() {
1234 return capacity - allocatedBytes;
1235 }
1236
1237 public boolean hasUnprocessedFreelistEntries() {
1238 return false;
1239 }
1240
1241 public void processFreelistEntries() {
1242 }
1243
1244 public int capacity() {
1245 return capacity;
1246 }
1247 }
1248
1249 private static final class IntStack {
1250
1251 private final int[] stack;
1252 private int top;
1253
1254 IntStack(int[] initialValues) {
1255 stack = initialValues;
1256 top = initialValues.length - 1;
1257 }
1258
1259 public boolean isEmpty() {
1260 return top == -1;
1261 }
1262
1263 public int pop() {
1264 final int last = stack[top];
1265 top--;
1266 return last;
1267 }
1268
1269 public void push(int value) {
1270 stack[top + 1] = value;
1271 top++;
1272 }
1273
1274 public int size() {
1275 return top + 1;
1276 }
1277 }
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298 private static final class SizeClassedChunk extends Chunk {
1299 private static final int FREE_LIST_EMPTY = -1;
1300 private static final int AVAILABLE = -1;
1301
1302
1303 private static final int DEALLOCATED = Integer.MIN_VALUE;
1304 private static final AtomicIntegerFieldUpdater<SizeClassedChunk> STATE =
1305 AtomicIntegerFieldUpdater.newUpdater(SizeClassedChunk.class, "state");
1306 private volatile int state;
1307 private final int segments;
1308 private final int segmentSize;
1309 private final MpscIntQueue externalFreeList;
1310 private final IntStack localFreeList;
1311 private Thread ownerThread;
1312
1313 SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
1314 SizeClassChunkController controller) {
1315 super(delegate, magazine);
1316 segmentSize = controller.segmentSize;
1317 segments = controller.chunkSize / segmentSize;
1318 STATE.lazySet(this, AVAILABLE);
1319 ownerThread = magazine.group.ownerThread;
1320 if (ownerThread == null) {
1321 externalFreeList = controller.createFreeList();
1322 localFreeList = null;
1323 } else {
1324 externalFreeList = controller.createEmptyFreeList();
1325 localFreeList = controller.createLocalFreeList();
1326 }
1327 }
1328
1329 @Override
1330 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1331 assert state == AVAILABLE;
1332 final int startIndex = nextAvailableSegmentOffset();
1333 if (startIndex == FREE_LIST_EMPTY) {
1334 return false;
1335 }
1336 allocatedBytes += segmentSize;
1337 try {
1338 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1339 } catch (Throwable t) {
1340 allocatedBytes -= segmentSize;
1341 releaseSegmentOffsetIntoFreeList(startIndex);
1342 PlatformDependent.throwException(t);
1343 }
1344 return true;
1345 }
1346
1347 private int nextAvailableSegmentOffset() {
1348 final int startIndex;
1349 IntStack localFreeList = this.localFreeList;
1350 if (localFreeList != null) {
1351 assert Thread.currentThread() == ownerThread;
1352 if (localFreeList.isEmpty()) {
1353 startIndex = externalFreeList.poll();
1354 } else {
1355 startIndex = localFreeList.pop();
1356 }
1357 } else {
1358 startIndex = externalFreeList.poll();
1359 }
1360 return startIndex;
1361 }
1362
1363
1364
1365
1366 public boolean hasRemainingCapacity() {
1367 int remaining = super.remainingCapacity();
1368 if (remaining > 0) {
1369 return true;
1370 }
1371 if (localFreeList != null) {
1372 return !localFreeList.isEmpty();
1373 }
1374 return !externalFreeList.isEmpty();
1375 }
1376
1377 @Override
1378 public int remainingCapacity() {
1379 int remaining = super.remainingCapacity();
1380 return remaining > segmentSize ? remaining : updateRemainingCapacity(remaining);
1381 }
1382
1383 private int updateRemainingCapacity(int snapshotted) {
1384 int freeSegments = externalFreeList.size();
1385 IntStack localFreeList = this.localFreeList;
1386 if (localFreeList != null) {
1387 freeSegments += localFreeList.size();
1388 }
1389 int updated = freeSegments * segmentSize;
1390 if (updated != snapshotted) {
1391 allocatedBytes = capacity() - updated;
1392 }
1393 return updated;
1394 }
1395
1396 private void releaseSegmentOffsetIntoFreeList(int startIndex) {
1397 IntStack localFreeList = this.localFreeList;
1398 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1399 localFreeList.push(startIndex);
1400 } else {
1401 boolean segmentReturned = externalFreeList.offer(startIndex);
1402 assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
1403 }
1404 }
1405
1406 @Override
1407 void releaseSegment(int startIndex, int size) {
1408 IntStack localFreeList = this.localFreeList;
1409 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1410 localFreeList.push(startIndex);
1411 int state = this.state;
1412 if (state != AVAILABLE) {
1413 updateStateOnLocalReleaseSegment(state, localFreeList);
1414 }
1415 } else {
1416 boolean segmentReturned = externalFreeList.offer(startIndex);
1417 assert segmentReturned;
1418
1419 int state = this.state;
1420 if (state != AVAILABLE) {
1421 deallocateIfNeeded(state);
1422 }
1423 }
1424 }
1425
1426 private void updateStateOnLocalReleaseSegment(int previousLocalSize, IntStack localFreeList) {
1427 int newLocalSize = localFreeList.size();
1428 boolean alwaysTrue = STATE.compareAndSet(this, previousLocalSize, newLocalSize);
1429 assert alwaysTrue : "this shouldn't happen unless double release in the local free list";
1430 deallocateIfNeeded(newLocalSize);
1431 }
1432
1433 private void deallocateIfNeeded(int localSize) {
1434
1435 int totalFreeSegments = localSize + externalFreeList.size();
1436 if (totalFreeSegments == segments && STATE.compareAndSet(this, localSize, DEALLOCATED)) {
1437 deallocate();
1438 }
1439 }
1440
1441 @Override
1442 void markToDeallocate() {
1443 IntStack localFreeList = this.localFreeList;
1444 int localSize = localFreeList != null ? localFreeList.size() : 0;
1445 STATE.set(this, localSize);
1446 deallocateIfNeeded(localSize);
1447 }
1448 }
1449
1450 private static final class BuddyChunk extends Chunk implements IntConsumer {
1451 private static final int MIN_BUDDY_SIZE = 32768;
1452 private static final byte IS_CLAIMED = (byte) (1 << 7);
1453 private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
1454 private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
1455 private static final int PACK_OFFSET_MASK = 0xFFFF;
1456 private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
1457
1458 private final MpscAtomicIntegerArrayQueue freeList;
1459
1460 private final byte[] buddies;
1461 private final int freeListCapacity;
1462
1463 BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
1464 super(delegate, magazine);
1465 freeListCapacity = delegate.capacity() / MIN_BUDDY_SIZE;
1466 int maxShift = Integer.numberOfTrailingZeros(freeListCapacity);
1467 assert maxShift <= 30;
1468
1469 freeList = new MpscAtomicIntegerArrayQueue(freeListCapacity, -1);
1470 buddies = new byte[freeListCapacity << 1];
1471
1472
1473 int index = 1;
1474 int runLength = 1;
1475 int currentRun = 0;
1476 while (maxShift > 0) {
1477 buddies[index++] = (byte) maxShift;
1478 if (++currentRun == runLength) {
1479 currentRun = 0;
1480 runLength <<= 1;
1481 maxShift--;
1482 }
1483 }
1484 }
1485
1486 @Override
1487 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1488 if (!freeList.isEmpty()) {
1489 freeList.drain(freeListCapacity, this);
1490 }
1491 int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
1492 if (startIndex == -1) {
1493 return false;
1494 }
1495 Chunk chunk = this;
1496 chunk.retain();
1497 try {
1498 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1499 allocatedBytes += startingCapacity;
1500 chunk = null;
1501 } finally {
1502 if (chunk != null) {
1503 unreserveMatchingBuddy(1, startingCapacity, startIndex, 0);
1504
1505
1506 chunk.release();
1507 }
1508 }
1509 return true;
1510 }
1511
1512 @Override
1513 public void accept(int packed) {
1514
1515 int size = unpackSize(packed);
1516 int offset = unpackOffset(packed);
1517 unreserveMatchingBuddy(1, size, offset, 0);
1518 allocatedBytes -= size;
1519 }
1520
1521 private static int unpackSize(int packed) {
1522 return MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
1523 }
1524
1525 private static int unpackOffset(int packed) {
1526 return (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
1527 }
1528
1529 @Override
1530 void releaseSegment(int startingIndex, int size) {
1531 int packedOffset = startingIndex / MIN_BUDDY_SIZE;
1532 int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
1533 int packed = packedOffset | packedSize;
1534 freeList.offer(packed);
1535 release();
1536 }
1537
1538 @Override
1539 public int remainingCapacity() {
1540 int capacityInFreeList = 0;
1541 if (!freeList.isEmpty()) {
1542 capacityInFreeList = freeList.weakPeekReduce(freeListCapacity, 0,
1543 new MpscAtomicIntegerArrayQueue.IntBinaryOperator() {
1544 @Override
1545 public int applyAsInt(int sum, int entry) {
1546 return sum + unpackSize(entry);
1547 }
1548 });
1549 }
1550 return super.remainingCapacity() + capacityInFreeList;
1551 }
1552
1553 @Override
1554 public boolean hasUnprocessedFreelistEntries() {
1555 return !freeList.isEmpty();
1556 }
1557
1558 @Override
1559 public void processFreelistEntries() {
1560 freeList.drain(freeListCapacity, this);
1561 }
1562
1563
1564
1565
1566 private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
1567 byte[] buddies = this.buddies;
1568 while (index < buddies.length) {
1569 byte buddy = buddies[index];
1570 int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1571 if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
1572 return -1;
1573 }
1574 if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
1575 buddies[index] |= IS_CLAIMED;
1576 return currOffset;
1577 }
1578 int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
1579 if (found != -1) {
1580 buddies[index] |= HAS_CLAIMED_CHILDREN;
1581 return found;
1582 }
1583 index = (index << 1) + 1;
1584 currOffset += currValue >> 1;
1585 }
1586 return -1;
1587 }
1588
1589
1590
1591
1592 private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
1593 byte[] buddies = this.buddies;
1594 if (buddies.length <= index) {
1595 return false;
1596 }
1597 byte buddy = buddies[index];
1598 int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1599
1600 if (currSize == size) {
1601
1602 if (currOffset == offset) {
1603 buddies[index] &= SHIFT_MASK;
1604 return false;
1605 }
1606 throw new IllegalStateException("The intended segment was not found at index " +
1607 index + ", for size " + size + " and offset " + offset);
1608 }
1609
1610
1611 boolean claims;
1612 int siblingIndex;
1613 if (offset < currOffset + (currSize >> 1)) {
1614
1615 claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
1616 siblingIndex = (index << 1) + 1;
1617 } else {
1618
1619 claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
1620 siblingIndex = index << 1;
1621 }
1622 if (!claims) {
1623
1624 byte sibling = buddies[siblingIndex];
1625 if ((sibling & SHIFT_MASK) == sibling) {
1626
1627 buddies[index] &= SHIFT_MASK;
1628 return false;
1629 }
1630 }
1631 return true;
1632 }
1633
1634 @Override
1635 public String toString() {
1636 int capacity = delegate.capacity();
1637 int remaining = capacity - allocatedBytes;
1638 return "BuddyChunk[capacity: " + capacity +
1639 ", remaining: " + remaining +
1640 ", free list: " + freeList.size() + ']';
1641 }
1642 }
1643
1644 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1645
1646 private final EnhancedHandle<AdaptiveByteBuf> handle;
1647
1648
1649 private int startIndex;
1650 private AbstractByteBuf rootParent;
1651 Chunk chunk;
1652 private int length;
1653 private int maxFastCapacity;
1654 private ByteBuffer tmpNioBuf;
1655 private boolean hasArray;
1656 private boolean hasMemoryAddress;
1657
1658 AdaptiveByteBuf(EnhancedHandle<AdaptiveByteBuf> recyclerHandle) {
1659 super(0);
1660 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1661 }
1662
1663 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1664 int startIndex, int size, int capacity, int maxCapacity) {
1665 this.startIndex = startIndex;
1666 chunk = wrapped;
1667 length = size;
1668 maxFastCapacity = capacity;
1669 maxCapacity(maxCapacity);
1670 setIndex0(readerIndex, writerIndex);
1671 hasArray = unwrapped.hasArray();
1672 hasMemoryAddress = unwrapped.hasMemoryAddress();
1673 rootParent = unwrapped;
1674 tmpNioBuf = null;
1675 }
1676
1677 private AbstractByteBuf rootParent() {
1678 final AbstractByteBuf rootParent = this.rootParent;
1679 if (rootParent != null) {
1680 return rootParent;
1681 }
1682 throw new IllegalReferenceCountException();
1683 }
1684
1685 @Override
1686 public int capacity() {
1687 return length;
1688 }
1689
1690 @Override
1691 public int maxFastWritableBytes() {
1692 return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1693 }
1694
1695 @Override
1696 public ByteBuf capacity(int newCapacity) {
1697 checkNewCapacity(newCapacity);
1698 if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1699 length = newCapacity;
1700 return this;
1701 }
1702 if (newCapacity < capacity()) {
1703 length = newCapacity;
1704 trimIndicesToCapacity(newCapacity);
1705 return this;
1706 }
1707
1708
1709 Chunk chunk = this.chunk;
1710 AdaptivePoolingAllocator allocator = chunk.allocator;
1711 int readerIndex = this.readerIndex;
1712 int writerIndex = this.writerIndex;
1713 int baseOldRootIndex = startIndex;
1714 int oldLength = length;
1715 int oldCapacity = maxFastCapacity;
1716 AbstractByteBuf oldRoot = rootParent();
1717 allocator.reallocate(newCapacity, maxCapacity(), this);
1718 oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
1719 chunk.releaseSegment(baseOldRootIndex, oldCapacity);
1720 assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
1721 "Capacity increase failed";
1722 this.readerIndex = readerIndex;
1723 this.writerIndex = writerIndex;
1724 return this;
1725 }
1726
1727 @Override
1728 public ByteBufAllocator alloc() {
1729 return rootParent().alloc();
1730 }
1731
1732 @SuppressWarnings("deprecation")
1733 @Override
1734 public ByteOrder order() {
1735 return rootParent().order();
1736 }
1737
1738 @Override
1739 public ByteBuf unwrap() {
1740 return null;
1741 }
1742
1743 @Override
1744 public boolean isDirect() {
1745 return rootParent().isDirect();
1746 }
1747
1748 @Override
1749 public int arrayOffset() {
1750 return idx(rootParent().arrayOffset());
1751 }
1752
1753 @Override
1754 public boolean hasMemoryAddress() {
1755 return hasMemoryAddress;
1756 }
1757
1758 @Override
1759 public long memoryAddress() {
1760 ensureAccessible();
1761 return rootParent().memoryAddress() + startIndex;
1762 }
1763
1764 @Override
1765 public ByteBuffer nioBuffer(int index, int length) {
1766 checkIndex(index, length);
1767 return rootParent().nioBuffer(idx(index), length);
1768 }
1769
1770 @Override
1771 public ByteBuffer internalNioBuffer(int index, int length) {
1772 checkIndex(index, length);
1773 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1774 }
1775
1776 private ByteBuffer internalNioBuffer() {
1777 if (tmpNioBuf == null) {
1778 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1779 }
1780 return (ByteBuffer) tmpNioBuf.clear();
1781 }
1782
1783 @Override
1784 public ByteBuffer[] nioBuffers(int index, int length) {
1785 checkIndex(index, length);
1786 return rootParent().nioBuffers(idx(index), length);
1787 }
1788
1789 @Override
1790 public boolean hasArray() {
1791 return hasArray;
1792 }
1793
1794 @Override
1795 public byte[] array() {
1796 ensureAccessible();
1797 return rootParent().array();
1798 }
1799
1800 @Override
1801 public ByteBuf copy(int index, int length) {
1802 checkIndex(index, length);
1803 return rootParent().copy(idx(index), length);
1804 }
1805
1806 @Override
1807 public int nioBufferCount() {
1808 return rootParent().nioBufferCount();
1809 }
1810
1811 @Override
1812 protected byte _getByte(int index) {
1813 return rootParent()._getByte(idx(index));
1814 }
1815
1816 @Override
1817 protected short _getShort(int index) {
1818 return rootParent()._getShort(idx(index));
1819 }
1820
1821 @Override
1822 protected short _getShortLE(int index) {
1823 return rootParent()._getShortLE(idx(index));
1824 }
1825
1826 @Override
1827 protected int _getUnsignedMedium(int index) {
1828 return rootParent()._getUnsignedMedium(idx(index));
1829 }
1830
1831 @Override
1832 protected int _getUnsignedMediumLE(int index) {
1833 return rootParent()._getUnsignedMediumLE(idx(index));
1834 }
1835
1836 @Override
1837 protected int _getInt(int index) {
1838 return rootParent()._getInt(idx(index));
1839 }
1840
1841 @Override
1842 protected int _getIntLE(int index) {
1843 return rootParent()._getIntLE(idx(index));
1844 }
1845
1846 @Override
1847 protected long _getLong(int index) {
1848 return rootParent()._getLong(idx(index));
1849 }
1850
1851 @Override
1852 protected long _getLongLE(int index) {
1853 return rootParent()._getLongLE(idx(index));
1854 }
1855
1856 @Override
1857 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1858 checkIndex(index, length);
1859 rootParent().getBytes(idx(index), dst, dstIndex, length);
1860 return this;
1861 }
1862
1863 @Override
1864 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1865 checkIndex(index, length);
1866 rootParent().getBytes(idx(index), dst, dstIndex, length);
1867 return this;
1868 }
1869
1870 @Override
1871 public ByteBuf getBytes(int index, ByteBuffer dst) {
1872 checkIndex(index, dst.remaining());
1873 rootParent().getBytes(idx(index), dst);
1874 return this;
1875 }
1876
1877 @Override
1878 protected void _setByte(int index, int value) {
1879 rootParent()._setByte(idx(index), value);
1880 }
1881
1882 @Override
1883 protected void _setShort(int index, int value) {
1884 rootParent()._setShort(idx(index), value);
1885 }
1886
1887 @Override
1888 protected void _setShortLE(int index, int value) {
1889 rootParent()._setShortLE(idx(index), value);
1890 }
1891
1892 @Override
1893 protected void _setMedium(int index, int value) {
1894 rootParent()._setMedium(idx(index), value);
1895 }
1896
1897 @Override
1898 protected void _setMediumLE(int index, int value) {
1899 rootParent()._setMediumLE(idx(index), value);
1900 }
1901
1902 @Override
1903 protected void _setInt(int index, int value) {
1904 rootParent()._setInt(idx(index), value);
1905 }
1906
1907 @Override
1908 protected void _setIntLE(int index, int value) {
1909 rootParent()._setIntLE(idx(index), value);
1910 }
1911
1912 @Override
1913 protected void _setLong(int index, long value) {
1914 rootParent()._setLong(idx(index), value);
1915 }
1916
1917 @Override
1918 protected void _setLongLE(int index, long value) {
1919 rootParent().setLongLE(idx(index), value);
1920 }
1921
1922 @Override
1923 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1924 checkIndex(index, length);
1925 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1926 tmp.put(src, srcIndex, length);
1927 return this;
1928 }
1929
1930 @Override
1931 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1932 checkIndex(index, length);
1933 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1934 tmp.put(src.nioBuffer(srcIndex, length));
1935 return this;
1936 }
1937
1938 @Override
1939 public ByteBuf setBytes(int index, ByteBuffer src) {
1940 checkIndex(index, src.remaining());
1941 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1942 tmp.put(src);
1943 return this;
1944 }
1945
1946 @Override
1947 public ByteBuf getBytes(int index, OutputStream out, int length)
1948 throws IOException {
1949 checkIndex(index, length);
1950 if (length != 0) {
1951 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1952 }
1953 return this;
1954 }
1955
1956 @Override
1957 public int getBytes(int index, GatheringByteChannel out, int length)
1958 throws IOException {
1959 ByteBuffer buf = internalNioBuffer().duplicate();
1960 buf.clear().position(index).limit(index + length);
1961 return out.write(buf);
1962 }
1963
1964 @Override
1965 public int getBytes(int index, FileChannel out, long position, int length)
1966 throws IOException {
1967 ByteBuffer buf = internalNioBuffer().duplicate();
1968 buf.clear().position(index).limit(index + length);
1969 return out.write(buf, position);
1970 }
1971
1972 @Override
1973 public int setBytes(int index, InputStream in, int length)
1974 throws IOException {
1975 checkIndex(index, length);
1976 final AbstractByteBuf rootParent = rootParent();
1977 if (rootParent.hasArray()) {
1978 return rootParent.setBytes(idx(index), in, length);
1979 }
1980 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1981 int readBytes = in.read(tmp, 0, length);
1982 if (readBytes <= 0) {
1983 return readBytes;
1984 }
1985 setBytes(index, tmp, 0, readBytes);
1986 return readBytes;
1987 }
1988
1989 @Override
1990 public int setBytes(int index, ScatteringByteChannel in, int length)
1991 throws IOException {
1992 try {
1993 return in.read(internalNioBuffer(index, length));
1994 } catch (ClosedChannelException ignored) {
1995 return -1;
1996 }
1997 }
1998
1999 @Override
2000 public int setBytes(int index, FileChannel in, long position, int length)
2001 throws IOException {
2002 try {
2003 return in.read(internalNioBuffer(index, length), position);
2004 } catch (ClosedChannelException ignored) {
2005 return -1;
2006 }
2007 }
2008
2009 @Override
2010 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
2011 return setCharSequence0(index, sequence, charset, false);
2012 }
2013
2014 private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
2015 if (charset.equals(CharsetUtil.UTF_8)) {
2016 int length = ByteBufUtil.utf8MaxBytes(sequence);
2017 if (expand) {
2018 ensureWritable0(length);
2019 checkIndex0(index, length);
2020 } else {
2021 checkIndex(index, length);
2022 }
2023 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
2024 }
2025 if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
2026 int length = sequence.length();
2027 if (expand) {
2028 ensureWritable0(length);
2029 checkIndex0(index, length);
2030 } else {
2031 checkIndex(index, length);
2032 }
2033 return ByteBufUtil.writeAscii(this, index, sequence, length);
2034 }
2035 byte[] bytes = sequence.toString().getBytes(charset);
2036 if (expand) {
2037 ensureWritable0(bytes.length);
2038
2039 }
2040 setBytes(index, bytes);
2041 return bytes.length;
2042 }
2043
2044 @Override
2045 public int writeCharSequence(CharSequence sequence, Charset charset) {
2046 int written = setCharSequence0(writerIndex, sequence, charset, true);
2047 writerIndex += written;
2048 return written;
2049 }
2050
2051 @Override
2052 public int forEachByte(int index, int length, ByteProcessor processor) {
2053 checkIndex(index, length);
2054 int ret = rootParent().forEachByte(idx(index), length, processor);
2055 return forEachResult(ret);
2056 }
2057
2058 @Override
2059 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
2060 checkIndex(index, length);
2061 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
2062 return forEachResult(ret);
2063 }
2064
2065 @Override
2066 public ByteBuf setZero(int index, int length) {
2067 checkIndex(index, length);
2068 rootParent().setZero(idx(index), length);
2069 return this;
2070 }
2071
2072 @Override
2073 public ByteBuf writeZero(int length) {
2074 ensureWritable(length);
2075 rootParent().setZero(idx(writerIndex), length);
2076 writerIndex += length;
2077 return this;
2078 }
2079
2080 private int forEachResult(int ret) {
2081 if (ret < startIndex) {
2082 return -1;
2083 }
2084 return ret - startIndex;
2085 }
2086
2087 @Override
2088 public boolean isContiguous() {
2089 return rootParent().isContiguous();
2090 }
2091
2092 private int idx(int index) {
2093 return index + startIndex;
2094 }
2095
2096 @Override
2097 protected void deallocate() {
2098 if (chunk != null) {
2099 chunk.releaseSegment(startIndex, maxFastCapacity);
2100 }
2101 tmpNioBuf = null;
2102 chunk = null;
2103 rootParent = null;
2104 handle.unguardedRecycle(this);
2105 }
2106 }
2107
2108
2109
2110
2111 interface ChunkAllocator {
2112
2113
2114
2115
2116
2117
2118 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
2119 }
2120 }