1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.CharsetUtil;
20 import io.netty.util.IllegalReferenceCountException;
21 import io.netty.util.IntConsumer;
22 import io.netty.util.NettyRuntime;
23 import io.netty.util.Recycler;
24 import io.netty.util.Recycler.EnhancedHandle;
25 import io.netty.util.ReferenceCounted;
26 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap;
27 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap.IntEntry;
28 import io.netty.util.concurrent.FastThreadLocal;
29 import io.netty.util.concurrent.FastThreadLocalThread;
30 import io.netty.util.concurrent.MpscAtomicIntegerArrayQueue;
31 import io.netty.util.concurrent.MpscIntQueue;
32 import io.netty.util.internal.MathUtil;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.ReferenceCountUpdater;
36 import io.netty.util.internal.SuppressJava6Requirement;
37 import io.netty.util.internal.SystemPropertyUtil;
38 import io.netty.util.internal.ThreadExecutorMap;
39 import io.netty.util.internal.UnstableApi;
40
41 import java.io.IOException;
42 import java.io.InputStream;
43 import java.io.OutputStream;
44 import java.nio.ByteBuffer;
45 import java.nio.ByteOrder;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.FileChannel;
48 import java.nio.channels.GatheringByteChannel;
49 import java.nio.channels.ScatteringByteChannel;
50 import java.nio.charset.Charset;
51 import java.util.Arrays;
52 import java.util.Iterator;
53 import java.util.Queue;
54 import java.util.concurrent.ConcurrentLinkedQueue;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
57 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
58 import java.util.concurrent.atomic.LongAdder;
59 import java.util.concurrent.locks.StampedLock;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 @SuppressJava6Requirement(reason = "Guarded by version check")
88 @UnstableApi
89 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
90 private static final int LOW_MEM_THRESHOLD = 512 * 1024 * 1024;
91 private static final boolean IS_LOW_MEM = Runtime.getRuntime().maxMemory() <= LOW_MEM_THRESHOLD;
92
93
94
95
96
97 private static final boolean DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM = SystemPropertyUtil.getBoolean(
98 "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory", true);
99
100
101
102
103
104
105
106
107 static final int MIN_CHUNK_SIZE = 128 * 1024;
108 private static final int EXPANSION_ATTEMPTS = 3;
109 private static final int INITIAL_MAGAZINES = 1;
110 private static final int RETIRE_CAPACITY = 256;
111 private static final int MAX_STRIPES = IS_LOW_MEM ? 1 : NettyRuntime.availableProcessors() * 2;
112 private static final int BUFS_PER_CHUNK = 8;
113
114
115
116
117
118
119 private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
120 2 * 1024 * 1024 :
121 8 * 1024 * 1024;
122 private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
123
124
125
126
127
128
129 private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
130 "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));
131
132
133
134
135
136 private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
137 "io.netty.allocator.magazineBufferQueueCapacity", 1024);
138
139
140
141
142
143
144
145
146
147
148
149
150
151 private static final int[] SIZE_CLASSES = {
152 32,
153 64,
154 128,
155 256,
156 512,
157 640,
158 1024,
159 1152,
160 2048,
161 2304,
162 4096,
163 4352,
164 8192,
165 8704,
166 16384,
167 16896,
168 };
169
170 private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
171 private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
172
173 static {
174 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
175 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
176 + " (expected: >= " + 2 + ')');
177 }
178 int lastIndex = 0;
179 for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
180 int sizeClass = SIZE_CLASSES[i];
181
182 assert (sizeClass & 5) == 0 : "Size class must be a multiple of 32";
183 int sizeIndex = sizeIndexOf(sizeClass);
184 Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
185 lastIndex = sizeIndex;
186 }
187 }
188
189 private final ChunkAllocator chunkAllocator;
190 private final ChunkRegistry chunkRegistry;
191 private final MagazineGroup[] sizeClassedMagazineGroups;
192 private final MagazineGroup largeBufferMagazineGroup;
193 private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
194
195 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, final boolean useCacheForNonEventLoopThreads) {
196 this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
197 chunkRegistry = new ChunkRegistry();
198 sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
199 largeBufferMagazineGroup = new MagazineGroup(
200 this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
201
202 boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
203 threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal<MagazineGroup[]>() {
204 @Override
205 protected MagazineGroup[] initialValue() {
206 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
207 return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
208 }
209 return null;
210 }
211
212 @Override
213 protected void onRemoval(final MagazineGroup[] groups) throws Exception {
214 if (groups != null) {
215 for (MagazineGroup group : groups) {
216 group.free();
217 }
218 }
219 }
220 };
221 }
222
223 private static MagazineGroup[] createMagazineGroupSizeClasses(
224 AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
225 MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
226 for (int i = 0; i < SIZE_CLASSES.length; i++) {
227 int segmentSize = SIZE_CLASSES[i];
228 groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
229 new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
230 }
231 return groups;
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 private static Queue<Chunk> createSharedChunkQueue() {
255 return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
256 }
257
258 @Override
259 public ByteBuf allocate(int size, int maxCapacity) {
260 return allocate(size, maxCapacity, Thread.currentThread(), null);
261 }
262
263 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
264 AdaptiveByteBuf allocated = null;
265 if (size <= MAX_POOLED_BUF_SIZE) {
266 final int index = sizeClassIndexOf(size);
267 MagazineGroup[] magazineGroups;
268 if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread()) ||
269 IS_LOW_MEM ||
270 (magazineGroups = threadLocalGroup.get()) == null) {
271 magazineGroups = sizeClassedMagazineGroups;
272 }
273 if (index < magazineGroups.length) {
274 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
275 } else if (!IS_LOW_MEM) {
276 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
277 }
278 }
279 if (allocated == null) {
280 allocated = allocateFallback(size, maxCapacity, currentThread, buf);
281 }
282 return allocated;
283 }
284
285 private static int sizeIndexOf(final int size) {
286
287 return size + 31 >> 5;
288 }
289
290 static int sizeClassIndexOf(int size) {
291 int sizeIndex = sizeIndexOf(size);
292 if (sizeIndex < SIZE_INDEXES.length) {
293 return SIZE_INDEXES[sizeIndex];
294 }
295 return SIZE_CLASSES_COUNT;
296 }
297
298 static int[] getSizeClasses() {
299 return SIZE_CLASSES.clone();
300 }
301
302 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
303
304 Magazine magazine;
305 if (buf != null) {
306 Chunk chunk = buf.chunk;
307 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
308 magazine = getFallbackMagazine(currentThread);
309 }
310 } else {
311 magazine = getFallbackMagazine(currentThread);
312 buf = magazine.newBuffer();
313 }
314
315 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
316 Chunk chunk = new Chunk(innerChunk, magazine);
317 chunkRegistry.add(chunk);
318 try {
319 boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
320 assert success: "Failed to initialize ByteBuf with dedicated chunk";
321 } finally {
322
323
324
325 chunk.release();
326 }
327 return buf;
328 }
329
330 private Magazine getFallbackMagazine(Thread currentThread) {
331 Magazine[] mags = largeBufferMagazineGroup.magazines;
332 return mags[(int) currentThread.getId() & mags.length - 1];
333 }
334
335
336
337
338 void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
339 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
340 assert result == into: "Re-allocation created separate buffer instance";
341 }
342
343 @Override
344 public long usedMemory() {
345 return chunkRegistry.totalCapacity();
346 }
347
348
349
350
351 @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
352 @Override
353 protected void finalize() throws Throwable {
354 try {
355 super.finalize();
356 } finally {
357 free();
358 }
359 }
360
361 private void free() {
362 largeBufferMagazineGroup.free();
363 }
364
365 @SuppressJava6Requirement(reason = "Guarded by version check")
366 private static final class MagazineGroup {
367 private final AdaptivePoolingAllocator allocator;
368 private final ChunkAllocator chunkAllocator;
369 private final ChunkManagementStrategy chunkManagementStrategy;
370 private final ChunkCache chunkCache;
371 private final StampedLock magazineExpandLock;
372 private final Magazine threadLocalMagazine;
373 private Thread ownerThread;
374 private volatile Magazine[] magazines;
375 private volatile boolean freed;
376
377 MagazineGroup(AdaptivePoolingAllocator allocator,
378 ChunkAllocator chunkAllocator,
379 ChunkManagementStrategy chunkManagementStrategy,
380 boolean isThreadLocal) {
381 this.allocator = allocator;
382 this.chunkAllocator = chunkAllocator;
383 this.chunkManagementStrategy = chunkManagementStrategy;
384 chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
385 if (isThreadLocal) {
386 ownerThread = Thread.currentThread();
387 magazineExpandLock = null;
388 threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
389 } else {
390 ownerThread = null;
391 magazineExpandLock = new StampedLock();
392 threadLocalMagazine = null;
393 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
394 for (int i = 0; i < mags.length; i++) {
395 mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
396 }
397 magazines = mags;
398 }
399 }
400
401 public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
402 boolean reallocate = buf != null;
403
404
405 Magazine tlMag = threadLocalMagazine;
406 if (tlMag != null) {
407 if (buf == null) {
408 buf = tlMag.newBuffer();
409 }
410 boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
411 assert allocated : "Allocation of threadLocalMagazine must always succeed";
412 return buf;
413 }
414
415
416 long threadId = currentThread.getId();
417 Magazine[] mags;
418 int expansions = 0;
419 do {
420 mags = magazines;
421 int mask = mags.length - 1;
422 int index = (int) (threadId & mask);
423 for (int i = 0, m = mags.length << 1; i < m; i++) {
424 Magazine mag = mags[index + i & mask];
425 if (buf == null) {
426 buf = mag.newBuffer();
427 }
428 if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
429
430 return buf;
431 }
432 }
433 expansions++;
434 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
435
436
437 if (!reallocate && buf != null) {
438 buf.release();
439 }
440 return null;
441 }
442
443 private boolean tryExpandMagazines(int currentLength) {
444 if (currentLength >= MAX_STRIPES) {
445 return true;
446 }
447 final Magazine[] mags;
448 long writeLock = magazineExpandLock.tryWriteLock();
449 if (writeLock != 0) {
450 try {
451 mags = magazines;
452 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
453 return true;
454 }
455 Magazine[] expanded = new Magazine[mags.length * 2];
456 for (int i = 0, l = expanded.length; i < l; i++) {
457 expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
458 }
459 magazines = expanded;
460 } finally {
461 magazineExpandLock.unlockWrite(writeLock);
462 }
463 for (Magazine magazine : mags) {
464 magazine.free();
465 }
466 }
467 return true;
468 }
469
470 Chunk pollChunk(int size) {
471 return chunkCache.pollChunk(size);
472 }
473
474 boolean offerChunk(Chunk chunk) {
475 if (freed) {
476 return false;
477 }
478
479 if (chunk.hasUnprocessedFreelistEntries()) {
480 chunk.processFreelistEntries();
481 }
482 boolean isAdded = chunkCache.offerChunk(chunk);
483
484 if (freed && isAdded) {
485
486 freeChunkReuseQueue(ownerThread);
487 }
488 return isAdded;
489 }
490
491 private void free() {
492 freed = true;
493 Thread ownerThread = this.ownerThread;
494 if (threadLocalMagazine != null) {
495 this.ownerThread = null;
496 threadLocalMagazine.free();
497 } else {
498 long stamp = magazineExpandLock.writeLock();
499 try {
500 Magazine[] mags = magazines;
501 for (Magazine magazine : mags) {
502 magazine.free();
503 }
504 } finally {
505 magazineExpandLock.unlockWrite(stamp);
506 }
507 }
508 freeChunkReuseQueue(ownerThread);
509 }
510
511 private void freeChunkReuseQueue(Thread ownerThread) {
512 Chunk chunk;
513 while ((chunk = chunkCache.pollChunk(0)) != null) {
514 if (ownerThread != null && chunk instanceof SizeClassedChunk) {
515 SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
516 assert ownerThread == threadLocalChunk.ownerThread;
517
518
519
520 threadLocalChunk.ownerThread = null;
521 }
522 chunk.release();
523 }
524 }
525 }
526
527 private interface ChunkCache {
528 Chunk pollChunk(int size);
529 boolean offerChunk(Chunk chunk);
530 }
531
532 private static final class ConcurrentQueueChunkCache implements ChunkCache {
533 private final Queue<Chunk> queue;
534
535 private ConcurrentQueueChunkCache() {
536 queue = createSharedChunkQueue();
537 }
538
539 @Override
540 public Chunk pollChunk(int size) {
541 int attemps = queue.size();
542 for (int i = 0; i < attemps; i++) {
543 Chunk chunk = queue.poll();
544 if (chunk == null) {
545 return null;
546 }
547 if (chunk.hasUnprocessedFreelistEntries()) {
548 chunk.processFreelistEntries();
549 }
550 if (chunk.remainingCapacity() >= size) {
551 return chunk;
552 }
553 queue.offer(chunk);
554 }
555 return null;
556 }
557
558 @Override
559 public boolean offerChunk(Chunk chunk) {
560 return queue.offer(chunk);
561 }
562 }
563
564 private static final class ConcurrentSkipListChunkCache implements ChunkCache {
565 private final ConcurrentSkipListIntObjMultimap<Chunk> chunks;
566
567 private ConcurrentSkipListChunkCache() {
568 chunks = new ConcurrentSkipListIntObjMultimap<Chunk>(-1);
569 }
570
571 @Override
572 public Chunk pollChunk(int size) {
573 if (chunks.isEmpty()) {
574 return null;
575 }
576 IntEntry<Chunk> entry = chunks.pollCeilingEntry(size);
577 if (entry != null) {
578 Chunk chunk = entry.getValue();
579 if (chunk.hasUnprocessedFreelistEntries()) {
580 chunk.processFreelistEntries();
581 }
582 return chunk;
583 }
584
585 Chunk bestChunk = null;
586 int bestRemainingCapacity = 0;
587 Iterator<IntEntry<Chunk>> itr = chunks.iterator();
588 while (itr.hasNext()) {
589 entry = itr.next();
590 final Chunk chunk;
591 if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
592 if (!chunks.remove(entry.getKey(), entry.getValue())) {
593 continue;
594 }
595 chunk.processFreelistEntries();
596 int remainingCapacity = chunk.remainingCapacity();
597 if (remainingCapacity >= size &&
598 (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
599 if (bestChunk != null) {
600 chunks.put(bestRemainingCapacity, bestChunk);
601 }
602 bestChunk = chunk;
603 bestRemainingCapacity = remainingCapacity;
604 } else {
605 chunks.put(remainingCapacity, chunk);
606 }
607 }
608 }
609
610 return bestChunk;
611 }
612
613 @Override
614 public boolean offerChunk(Chunk chunk) {
615 chunks.put(chunk.remainingCapacity(), chunk);
616
617 int size = chunks.size();
618 while (size > CHUNK_REUSE_QUEUE) {
619
620 int key = -1;
621 Chunk toDeallocate = null;
622 for (IntEntry<Chunk> entry : chunks) {
623 Chunk candidate = entry.getValue();
624 if (candidate != null) {
625 if (toDeallocate == null) {
626 toDeallocate = candidate;
627 key = entry.getKey();
628 } else {
629 int candidateRefCnt = candidate.refCnt();
630 int toDeallocateRefCnt = toDeallocate.refCnt();
631 if (candidateRefCnt < toDeallocateRefCnt ||
632 candidateRefCnt == toDeallocateRefCnt &&
633 candidate.capacity() < toDeallocate.capacity()) {
634 toDeallocate = candidate;
635 key = entry.getKey();
636 }
637 }
638 }
639 }
640 if (toDeallocate == null) {
641 break;
642 }
643 if (chunks.remove(key, toDeallocate)) {
644 toDeallocate.release();
645 }
646 size = chunks.size();
647 }
648 return true;
649 }
650 }
651
652 private interface ChunkManagementStrategy {
653 ChunkController createController(MagazineGroup group);
654
655 ChunkCache createChunkCache(boolean isThreadLocal);
656 }
657
658 private interface ChunkController {
659
660
661
662 int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
663
664
665
666
667 Chunk newChunkAllocation(int promptingSize, Magazine magazine);
668 }
669
670 private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
671
672
673
674 private static final int MIN_SEGMENTS_PER_CHUNK = 32;
675 private final int segmentSize;
676 private final int chunkSize;
677
678 private SizeClassChunkManagementStrategy(int segmentSize) {
679 this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
680 chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
681 }
682
683 @Override
684 public ChunkController createController(MagazineGroup group) {
685 return new SizeClassChunkController(group, segmentSize, chunkSize);
686 }
687
688 @Override
689 public ChunkCache createChunkCache(boolean isThreadLocal) {
690 return new ConcurrentQueueChunkCache();
691 }
692 }
693
694 private static final class SizeClassChunkController implements ChunkController {
695
696 private final ChunkAllocator chunkAllocator;
697 private final int segmentSize;
698 private final int chunkSize;
699 private final ChunkRegistry chunkRegistry;
700
701 private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
702 chunkAllocator = group.chunkAllocator;
703 this.segmentSize = segmentSize;
704 this.chunkSize = chunkSize;
705 chunkRegistry = group.allocator.chunkRegistry;
706 }
707
708 private MpscIntQueue createEmptyFreeList() {
709 return new MpscAtomicIntegerArrayQueue(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
710 }
711
712 private MpscIntQueue createFreeList() {
713 final int segmentsCount = chunkSize / segmentSize;
714 final MpscIntQueue freeList = new MpscAtomicIntegerArrayQueue(
715 segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
716 int segmentOffset = 0;
717 for (int i = 0; i < segmentsCount; i++) {
718 freeList.offer(segmentOffset);
719 segmentOffset += segmentSize;
720 }
721 return freeList;
722 }
723
724 private IntStack createLocalFreeList() {
725 final int segmentsCount = chunkSize / segmentSize;
726 int segmentOffset = chunkSize;
727 int[] offsets = new int[segmentsCount];
728 for (int i = 0; i < segmentsCount; i++) {
729 segmentOffset -= segmentSize;
730 offsets[i] = segmentOffset;
731 }
732 return new IntStack(offsets);
733 }
734
735 @Override
736 public int computeBufferCapacity(
737 int requestedSize, int maxCapacity, boolean isReallocation) {
738 return Math.min(segmentSize, maxCapacity);
739 }
740
741 @Override
742 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
743 AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
744 assert chunkBuffer.capacity() == chunkSize;
745 SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
746 chunkRegistry.add(chunk);
747 return chunk;
748 }
749 }
750
751 private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
752 private final AtomicInteger maxChunkSize = new AtomicInteger();
753
754 @Override
755 public ChunkController createController(MagazineGroup group) {
756 return new BuddyChunkController(group, maxChunkSize);
757 }
758
759 @Override
760 public ChunkCache createChunkCache(boolean isThreadLocal) {
761 return new ConcurrentSkipListChunkCache();
762 }
763 }
764
765 private static final class BuddyChunkController implements ChunkController {
766 private final ChunkAllocator chunkAllocator;
767 private final ChunkRegistry chunkRegistry;
768 private final AtomicInteger maxChunkSize;
769
770 BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
771 chunkAllocator = group.chunkAllocator;
772 chunkRegistry = group.allocator.chunkRegistry;
773 this.maxChunkSize = maxChunkSize;
774 }
775
776 @Override
777 public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
778 return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
779 }
780
781 @Override
782 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
783 int maxChunkSize = this.maxChunkSize.get();
784 int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
785 int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
786 if (chunkSize > maxChunkSize) {
787
788 this.maxChunkSize.set(chunkSize);
789 }
790 BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
791 chunkRegistry.add(chunk);
792 return chunk;
793 }
794 }
795
796 @SuppressJava6Requirement(reason = "Guarded by version check")
797 private static final class Magazine {
798 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
799 static {
800 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
801 }
802 private static final Chunk MAGAZINE_FREED = new Chunk();
803
804 private static final class AdaptiveRecycler extends Recycler<AdaptiveByteBuf> {
805
806 private AdaptiveRecycler() {
807 }
808
809 private AdaptiveRecycler(int maxCapacity) {
810
811 super(maxCapacity);
812 }
813
814 @Override
815 protected AdaptiveByteBuf newObject(final Handle<AdaptiveByteBuf> handle) {
816 return new AdaptiveByteBuf((EnhancedHandle<AdaptiveByteBuf>) handle);
817 }
818
819 public static AdaptiveRecycler threadLocal() {
820 return new AdaptiveRecycler();
821 }
822
823 public static AdaptiveRecycler sharedWith(int maxCapacity) {
824 return new AdaptiveRecycler(maxCapacity);
825 }
826 }
827
828 private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();
829
830 private Chunk current;
831 @SuppressWarnings("unused")
832 private volatile Chunk nextInLine;
833 private final MagazineGroup group;
834 private final ChunkController chunkController;
835 private final StampedLock allocationLock;
836 private final AdaptiveRecycler recycler;
837
838 Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
839 this.group = group;
840 this.chunkController = chunkController;
841
842 if (shareable) {
843
844 allocationLock = new StampedLock();
845 recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
846 } else {
847 allocationLock = null;
848 recycler = null;
849 }
850 }
851
852 public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
853 if (allocationLock == null) {
854
855 return allocate(size, maxCapacity, buf, reallocate);
856 }
857
858
859 long writeLock = allocationLock.tryWriteLock();
860 if (writeLock != 0) {
861 try {
862 return allocate(size, maxCapacity, buf, reallocate);
863 } finally {
864 allocationLock.unlockWrite(writeLock);
865 }
866 }
867 return allocateWithoutLock(size, maxCapacity, buf);
868 }
869
870 private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
871 Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
872 if (curr == MAGAZINE_FREED) {
873
874 restoreMagazineFreed();
875 return false;
876 }
877 if (curr == null) {
878 curr = group.pollChunk(size);
879 if (curr == null) {
880 return false;
881 }
882 curr.attachToMagazine(this);
883 }
884 boolean allocated = false;
885 int remainingCapacity = curr.remainingCapacity();
886 int startingCapacity = chunkController.computeBufferCapacity(
887 size, maxCapacity, true );
888 if (remainingCapacity >= size &&
889 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
890 allocated = true;
891 remainingCapacity = curr.remainingCapacity();
892 }
893 try {
894 if (remainingCapacity >= RETIRE_CAPACITY) {
895 transferToNextInLineOrRelease(curr);
896 curr = null;
897 }
898 } finally {
899 if (curr != null) {
900 curr.releaseFromMagazine();
901 }
902 }
903 return allocated;
904 }
905
906 private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
907 int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
908 Chunk curr = current;
909 if (curr != null) {
910 boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
911 int remainingCapacity = curr.remainingCapacity();
912 if (!success && remainingCapacity > 0) {
913 current = null;
914 transferToNextInLineOrRelease(curr);
915 } else if (remainingCapacity == 0) {
916 current = null;
917 curr.releaseFromMagazine();
918 }
919 if (success) {
920 return true;
921 }
922 }
923
924 assert current == null;
925
926
927
928
929
930
931
932
933 curr = NEXT_IN_LINE.getAndSet(this, null);
934 if (curr != null) {
935 if (curr == MAGAZINE_FREED) {
936
937 restoreMagazineFreed();
938 return false;
939 }
940
941 int remainingCapacity = curr.remainingCapacity();
942 if (remainingCapacity > startingCapacity &&
943 curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
944
945 current = curr;
946 return true;
947 }
948
949 try {
950 if (remainingCapacity >= size) {
951
952
953 return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
954 }
955 } finally {
956
957
958 curr.releaseFromMagazine();
959 }
960 }
961
962
963 curr = group.pollChunk(size);
964 if (curr == null) {
965 curr = chunkController.newChunkAllocation(size, this);
966 } else {
967 curr.attachToMagazine(this);
968
969 int remainingCapacity = curr.remainingCapacity();
970 if (remainingCapacity == 0 || remainingCapacity < size) {
971
972 if (remainingCapacity < RETIRE_CAPACITY) {
973 curr.releaseFromMagazine();
974 } else {
975
976
977 transferToNextInLineOrRelease(curr);
978 }
979 curr = chunkController.newChunkAllocation(size, this);
980 }
981 }
982
983 current = curr;
984 boolean success;
985 try {
986 int remainingCapacity = curr.remainingCapacity();
987 assert remainingCapacity >= size;
988 if (remainingCapacity > startingCapacity) {
989 success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
990 curr = null;
991 } else {
992 success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
993 }
994 } finally {
995 if (curr != null) {
996
997
998 curr.releaseFromMagazine();
999 current = null;
1000 }
1001 }
1002 return success;
1003 }
1004
1005 private void restoreMagazineFreed() {
1006 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
1007 if (next != null && next != MAGAZINE_FREED) {
1008
1009 next.releaseFromMagazine();
1010 }
1011 }
1012
1013 private void transferToNextInLineOrRelease(Chunk chunk) {
1014 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
1015 return;
1016 }
1017
1018 Chunk nextChunk = NEXT_IN_LINE.get(this);
1019 if (nextChunk != null && nextChunk != MAGAZINE_FREED
1020 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
1021 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
1022 nextChunk.releaseFromMagazine();
1023 return;
1024 }
1025 }
1026
1027
1028
1029
1030 chunk.releaseFromMagazine();
1031 }
1032
1033 void free() {
1034
1035 restoreMagazineFreed();
1036 long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
1037 try {
1038 if (current != null) {
1039 current.releaseFromMagazine();
1040 current = null;
1041 }
1042 } finally {
1043 if (allocationLock != null) {
1044 allocationLock.unlockWrite(stamp);
1045 }
1046 }
1047 }
1048
1049 public AdaptiveByteBuf newBuffer() {
1050 AdaptiveRecycler recycler = this.recycler;
1051 AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
1052 buf.resetRefCnt();
1053 buf.discardMarks();
1054 return buf;
1055 }
1056
1057 boolean offerToQueue(Chunk chunk) {
1058 return group.offerChunk(chunk);
1059 }
1060 }
1061
1062 @SuppressJava6Requirement(reason = "Guarded by version check")
1063 private static final class ChunkRegistry {
1064 private final LongAdder totalCapacity = new LongAdder();
1065
1066 public long totalCapacity() {
1067 return totalCapacity.sum();
1068 }
1069
1070 public void add(Chunk chunk) {
1071 totalCapacity.add(chunk.capacity());
1072 }
1073
1074 public void remove(Chunk chunk) {
1075 totalCapacity.add(-chunk.capacity());
1076 }
1077 }
1078
1079 private static class Chunk implements ReferenceCounted {
1080 private static final long REFCNT_FIELD_OFFSET =
1081 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
1082 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
1083 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
1084
1085 protected final AbstractByteBuf delegate;
1086 protected Magazine magazine;
1087 private final AdaptivePoolingAllocator allocator;
1088 private final int capacity;
1089 protected int allocatedBytes;
1090
1091 private static final ReferenceCountUpdater<Chunk> updater =
1092 new ReferenceCountUpdater<Chunk>() {
1093 @Override
1094 protected AtomicIntegerFieldUpdater<Chunk> updater() {
1095 return AIF_UPDATER;
1096 }
1097 @Override
1098 protected long unsafeOffset() {
1099
1100
1101 return PlatformDependent.hasUnsafe() ? REFCNT_FIELD_OFFSET : -1;
1102 }
1103 };
1104
1105
1106 @SuppressWarnings({"unused", "FieldMayBeFinal"})
1107 private volatile int refCnt;
1108
1109 Chunk() {
1110
1111 delegate = null;
1112 magazine = null;
1113 allocator = null;
1114 capacity = 0;
1115 }
1116
1117 Chunk(AbstractByteBuf delegate, Magazine magazine) {
1118 this.delegate = delegate;
1119 capacity = delegate.capacity();
1120 updater.setInitialValue(this);
1121 attachToMagazine(magazine);
1122
1123
1124 allocator = magazine.group.allocator;
1125 }
1126
1127 Magazine currentMagazine() {
1128 return magazine;
1129 }
1130
1131 void detachFromMagazine() {
1132 if (magazine != null) {
1133 magazine = null;
1134 }
1135 }
1136
1137 void attachToMagazine(Magazine magazine) {
1138 assert this.magazine == null;
1139 this.magazine = magazine;
1140 }
1141
1142 @Override
1143 public Chunk touch(Object hint) {
1144 return this;
1145 }
1146
1147 @Override
1148 public int refCnt() {
1149 return updater.refCnt(this);
1150 }
1151
1152 @Override
1153 public Chunk retain() {
1154 return updater.retain(this);
1155 }
1156
1157 @Override
1158 public Chunk retain(int increment) {
1159 return updater.retain(this, increment);
1160 }
1161
1162 @Override
1163 public Chunk touch() {
1164 return this;
1165 }
1166
1167 @Override
1168 public boolean release() {
1169 if (updater.release(this)) {
1170 deallocate();
1171 return true;
1172 }
1173 return false;
1174 }
1175
1176 @Override
1177 public boolean release(int decrement) {
1178 if (updater.release(this, decrement)) {
1179 deallocate();
1180 return true;
1181 }
1182 return false;
1183 }
1184
1185
1186
1187
1188 boolean releaseFromMagazine() {
1189
1190
1191 Magazine mag = magazine;
1192 detachFromMagazine();
1193 if (!mag.offerToQueue(this)) {
1194 return release();
1195 }
1196 return false;
1197 }
1198
1199
1200
1201
1202 void releaseSegment(int ignoredSegmentId, int size) {
1203 release();
1204 }
1205
1206 protected void deallocate() {
1207 allocator.chunkRegistry.remove(this);
1208 delegate.release();
1209 }
1210
1211 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1212 int startIndex = allocatedBytes;
1213 allocatedBytes = startIndex + startingCapacity;
1214 Chunk chunk = this;
1215 chunk.retain();
1216 try {
1217 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1218 chunk = null;
1219 } finally {
1220 if (chunk != null) {
1221
1222
1223
1224 allocatedBytes = startIndex;
1225 chunk.release();
1226 }
1227 }
1228 return true;
1229 }
1230
1231 public int remainingCapacity() {
1232 return capacity - allocatedBytes;
1233 }
1234
1235 public boolean hasUnprocessedFreelistEntries() {
1236 return false;
1237 }
1238
1239 public void processFreelistEntries() {
1240 }
1241
1242 public int capacity() {
1243 return capacity;
1244 }
1245 }
1246
1247 private static final class IntStack {
1248
1249 private final int[] stack;
1250 private int top;
1251
1252 IntStack(int[] initialValues) {
1253 stack = initialValues;
1254 top = initialValues.length - 1;
1255 }
1256
1257 public boolean isEmpty() {
1258 return top == -1;
1259 }
1260
1261 public int pop() {
1262 final int last = stack[top];
1263 top--;
1264 return last;
1265 }
1266
1267 public void push(int value) {
1268 stack[top + 1] = value;
1269 top++;
1270 }
1271
1272 public int size() {
1273 return top + 1;
1274 }
1275 }
1276
1277 private static final class SizeClassedChunk extends Chunk {
1278 private static final int FREE_LIST_EMPTY = -1;
1279 private final int segmentSize;
1280 private final MpscIntQueue externalFreeList;
1281 private final IntStack localFreeList;
1282 private Thread ownerThread;
1283
1284 SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
1285 SizeClassChunkController controller) {
1286 super(delegate, magazine);
1287 segmentSize = controller.segmentSize;
1288 ownerThread = magazine.group.ownerThread;
1289 if (ownerThread == null) {
1290 externalFreeList = controller.createFreeList();
1291 localFreeList = null;
1292 } else {
1293 externalFreeList = controller.createEmptyFreeList();
1294 localFreeList = controller.createLocalFreeList();
1295 }
1296 }
1297
1298 @Override
1299 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1300 final int startIndex = nextAvailableSegmentOffset();
1301 if (startIndex == FREE_LIST_EMPTY) {
1302 return false;
1303 }
1304 allocatedBytes += segmentSize;
1305 Chunk chunk = this;
1306 chunk.retain();
1307 try {
1308 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1309 chunk = null;
1310 } finally {
1311 if (chunk != null) {
1312
1313
1314
1315 allocatedBytes -= segmentSize;
1316 chunk.releaseSegment(startIndex, startingCapacity);
1317 }
1318 }
1319 return true;
1320 }
1321
1322 private int nextAvailableSegmentOffset() {
1323 final int startIndex;
1324 IntStack localFreeList = this.localFreeList;
1325 if (localFreeList != null) {
1326 assert Thread.currentThread() == ownerThread;
1327 if (localFreeList.isEmpty()) {
1328 startIndex = externalFreeList.poll();
1329 } else {
1330 startIndex = localFreeList.pop();
1331 }
1332 } else {
1333 startIndex = externalFreeList.poll();
1334 }
1335 return startIndex;
1336 }
1337
1338 private int remainingCapacityOnFreeList() {
1339 final int segmentSize = this.segmentSize;
1340 int remainingCapacity = externalFreeList.size() * segmentSize;
1341 IntStack localFreeList = this.localFreeList;
1342 if (localFreeList != null) {
1343 assert Thread.currentThread() == ownerThread;
1344 remainingCapacity += localFreeList.size() * segmentSize;
1345 }
1346 return remainingCapacity;
1347 }
1348
1349 @Override
1350 public int remainingCapacity() {
1351 int remainingCapacity = super.remainingCapacity();
1352 if (remainingCapacity > segmentSize) {
1353 return remainingCapacity;
1354 }
1355 int updatedRemainingCapacity = remainingCapacityOnFreeList();
1356 if (updatedRemainingCapacity == remainingCapacity) {
1357 return remainingCapacity;
1358 }
1359
1360 allocatedBytes = capacity() - updatedRemainingCapacity;
1361 return updatedRemainingCapacity;
1362 }
1363
1364 private void releaseSegmentOffsetIntoFreeList(int startIndex) {
1365 IntStack localFreeList = this.localFreeList;
1366 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1367 localFreeList.push(startIndex);
1368 } else {
1369 boolean segmentReturned = externalFreeList.offer(startIndex);
1370 assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
1371 }
1372 }
1373
1374 @Override
1375 void releaseSegment(int startIndex, int size) {
1376 release();
1377 releaseSegmentOffsetIntoFreeList(startIndex);
1378 }
1379 }
1380
1381 private static final class BuddyChunk extends Chunk implements IntConsumer {
1382 private static final int MIN_BUDDY_SIZE = 32768;
1383 private static final byte IS_CLAIMED = (byte) (1 << 7);
1384 private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
1385 private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
1386 private static final int PACK_OFFSET_MASK = 0xFFFF;
1387 private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
1388
1389 private final MpscIntQueue freeList;
1390
1391 private final byte[] buddies;
1392 private final int freeListCapacity;
1393
1394 BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
1395 super(delegate, magazine);
1396 int capacity = delegate.capacity();
1397 int capFactor = capacity / MIN_BUDDY_SIZE;
1398 int tree = (capFactor << 1) - 1;
1399 int maxShift = Integer.numberOfTrailingZeros(capFactor);
1400 assert maxShift <= 30;
1401 freeListCapacity = tree >> 1;
1402 freeList = new MpscAtomicIntegerArrayQueue(freeListCapacity, -1);
1403 buddies = new byte[1 + tree];
1404
1405
1406 int index = 1;
1407 int runLength = 1;
1408 int currentRun = 0;
1409 while (maxShift > 0) {
1410 buddies[index++] = (byte) maxShift;
1411 if (++currentRun == runLength) {
1412 currentRun = 0;
1413 runLength <<= 1;
1414 maxShift--;
1415 }
1416 }
1417 }
1418
1419 @Override
1420 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1421 if (!freeList.isEmpty()) {
1422 freeList.drain(freeListCapacity, this);
1423 }
1424 int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
1425 if (startIndex == -1) {
1426 return false;
1427 }
1428 Chunk chunk = this;
1429 chunk.retain();
1430 try {
1431 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1432 allocatedBytes += startingCapacity;
1433 chunk = null;
1434 } finally {
1435 if (chunk != null) {
1436
1437
1438 chunk.release();
1439 }
1440 }
1441 return true;
1442 }
1443
1444 @Override
1445 public void accept(int packed) {
1446
1447 int size = MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
1448 int offset = (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
1449 unreserveMatchingBuddy(1, size, offset, 0);
1450 allocatedBytes -= size;
1451 }
1452
1453 @Override
1454 void releaseSegment(int startingIndex, int size) {
1455 int packedOffset = startingIndex / MIN_BUDDY_SIZE;
1456 int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
1457 int packed = packedOffset | packedSize;
1458 freeList.offer(packed);
1459 release();
1460 }
1461
1462 @Override
1463 public int remainingCapacity() {
1464 if (!freeList.isEmpty()) {
1465 freeList.drain(freeListCapacity, this);
1466 }
1467 return super.remainingCapacity();
1468 }
1469
1470 @Override
1471 public boolean hasUnprocessedFreelistEntries() {
1472 return !freeList.isEmpty();
1473 }
1474
1475 @Override
1476 public void processFreelistEntries() {
1477 freeList.drain(freeListCapacity, this);
1478 }
1479
1480
1481
1482
1483 private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
1484 byte[] buddies = this.buddies;
1485 while (index < buddies.length) {
1486 byte buddy = buddies[index];
1487 int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1488 if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
1489 return -1;
1490 }
1491 if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
1492 buddies[index] |= IS_CLAIMED;
1493 return currOffset;
1494 }
1495 int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
1496 if (found != -1) {
1497 buddies[index] |= HAS_CLAIMED_CHILDREN;
1498 return found;
1499 }
1500 index = (index << 1) + 1;
1501 currOffset += currValue >> 1;
1502 }
1503 return -1;
1504 }
1505
1506
1507
1508
1509 private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
1510 byte[] buddies = this.buddies;
1511 if (buddies.length <= index) {
1512 return false;
1513 }
1514 byte buddy = buddies[index];
1515 int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1516
1517 if (currSize == size) {
1518
1519 if (currOffset == offset) {
1520 buddies[index] &= SHIFT_MASK;
1521 return false;
1522 }
1523 throw new IllegalStateException("The intended segment was not found at index " +
1524 index + ", for size " + size + " and offset " + offset);
1525 }
1526
1527
1528 boolean claims;
1529 int siblingIndex;
1530 if (offset < currOffset + (currSize >> 1)) {
1531
1532 claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
1533 siblingIndex = (index << 1) + 1;
1534 } else {
1535
1536 claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
1537 siblingIndex = index << 1;
1538 }
1539 if (!claims) {
1540
1541 byte sibling = buddies[siblingIndex];
1542 if ((sibling & SHIFT_MASK) == sibling) {
1543
1544 buddies[index] &= SHIFT_MASK;
1545 return false;
1546 }
1547 }
1548 return true;
1549 }
1550
1551 @Override
1552 public String toString() {
1553 int capacity = delegate.capacity();
1554 int remaining = capacity - allocatedBytes;
1555 return "BuddyChunk[capacity: " + capacity +
1556 ", remaining: " + remaining +
1557 ", free list: " + freeList.size() + ']';
1558 }
1559 }
1560
1561 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1562
1563 private final EnhancedHandle<AdaptiveByteBuf> handle;
1564
1565
1566 private int startIndex;
1567 private AbstractByteBuf rootParent;
1568 Chunk chunk;
1569 private int length;
1570 private int maxFastCapacity;
1571 private ByteBuffer tmpNioBuf;
1572 private boolean hasArray;
1573 private boolean hasMemoryAddress;
1574
1575 AdaptiveByteBuf(EnhancedHandle<AdaptiveByteBuf> recyclerHandle) {
1576 super(0);
1577 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1578 }
1579
1580 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1581 int startIndex, int size, int capacity, int maxCapacity) {
1582 this.startIndex = startIndex;
1583 chunk = wrapped;
1584 length = size;
1585 maxFastCapacity = capacity;
1586 maxCapacity(maxCapacity);
1587 setIndex0(readerIndex, writerIndex);
1588 hasArray = unwrapped.hasArray();
1589 hasMemoryAddress = unwrapped.hasMemoryAddress();
1590 rootParent = unwrapped;
1591 tmpNioBuf = null;
1592 }
1593
1594 private AbstractByteBuf rootParent() {
1595 final AbstractByteBuf rootParent = this.rootParent;
1596 if (rootParent != null) {
1597 return rootParent;
1598 }
1599 throw new IllegalReferenceCountException();
1600 }
1601
1602 @Override
1603 public int capacity() {
1604 return length;
1605 }
1606
1607 @Override
1608 public int maxFastWritableBytes() {
1609 return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1610 }
1611
1612 @Override
1613 public ByteBuf capacity(int newCapacity) {
1614 if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1615 ensureAccessible();
1616 length = newCapacity;
1617 return this;
1618 }
1619 checkNewCapacity(newCapacity);
1620 if (newCapacity < capacity()) {
1621 length = newCapacity;
1622 trimIndicesToCapacity(newCapacity);
1623 return this;
1624 }
1625
1626
1627 Chunk chunk = this.chunk;
1628 AdaptivePoolingAllocator allocator = chunk.allocator;
1629 int readerIndex = this.readerIndex;
1630 int writerIndex = this.writerIndex;
1631 int baseOldRootIndex = startIndex;
1632 int oldLength = length;
1633 int oldCapacity = maxFastCapacity;
1634 AbstractByteBuf oldRoot = rootParent();
1635 allocator.reallocate(newCapacity, maxCapacity(), this);
1636 oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
1637 chunk.releaseSegment(baseOldRootIndex, oldCapacity);
1638 assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
1639 "Capacity increase failed";
1640 this.readerIndex = readerIndex;
1641 this.writerIndex = writerIndex;
1642 return this;
1643 }
1644
1645 @Override
1646 public ByteBufAllocator alloc() {
1647 return rootParent().alloc();
1648 }
1649
1650 @SuppressWarnings("deprecation")
1651 @Override
1652 public ByteOrder order() {
1653 return rootParent().order();
1654 }
1655
1656 @Override
1657 public ByteBuf unwrap() {
1658 return null;
1659 }
1660
1661 @Override
1662 public boolean isDirect() {
1663 return rootParent().isDirect();
1664 }
1665
1666 @Override
1667 public int arrayOffset() {
1668 return idx(rootParent().arrayOffset());
1669 }
1670
1671 @Override
1672 public boolean hasMemoryAddress() {
1673 return hasMemoryAddress;
1674 }
1675
1676 @Override
1677 public long memoryAddress() {
1678 ensureAccessible();
1679 return rootParent().memoryAddress() + startIndex;
1680 }
1681
1682 @Override
1683 public ByteBuffer nioBuffer(int index, int length) {
1684 checkIndex(index, length);
1685 return rootParent().nioBuffer(idx(index), length);
1686 }
1687
1688 @Override
1689 public ByteBuffer internalNioBuffer(int index, int length) {
1690 checkIndex(index, length);
1691 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1692 }
1693
1694 private ByteBuffer internalNioBuffer() {
1695 if (tmpNioBuf == null) {
1696 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1697 }
1698 return (ByteBuffer) tmpNioBuf.clear();
1699 }
1700
1701 @Override
1702 public ByteBuffer[] nioBuffers(int index, int length) {
1703 checkIndex(index, length);
1704 return rootParent().nioBuffers(idx(index), length);
1705 }
1706
1707 @Override
1708 public boolean hasArray() {
1709 return hasArray;
1710 }
1711
1712 @Override
1713 public byte[] array() {
1714 ensureAccessible();
1715 return rootParent().array();
1716 }
1717
1718 @Override
1719 public ByteBuf copy(int index, int length) {
1720 checkIndex(index, length);
1721 return rootParent().copy(idx(index), length);
1722 }
1723
1724 @Override
1725 public int nioBufferCount() {
1726 return rootParent().nioBufferCount();
1727 }
1728
1729 @Override
1730 protected byte _getByte(int index) {
1731 return rootParent()._getByte(idx(index));
1732 }
1733
1734 @Override
1735 protected short _getShort(int index) {
1736 return rootParent()._getShort(idx(index));
1737 }
1738
1739 @Override
1740 protected short _getShortLE(int index) {
1741 return rootParent()._getShortLE(idx(index));
1742 }
1743
1744 @Override
1745 protected int _getUnsignedMedium(int index) {
1746 return rootParent()._getUnsignedMedium(idx(index));
1747 }
1748
1749 @Override
1750 protected int _getUnsignedMediumLE(int index) {
1751 return rootParent()._getUnsignedMediumLE(idx(index));
1752 }
1753
1754 @Override
1755 protected int _getInt(int index) {
1756 return rootParent()._getInt(idx(index));
1757 }
1758
1759 @Override
1760 protected int _getIntLE(int index) {
1761 return rootParent()._getIntLE(idx(index));
1762 }
1763
1764 @Override
1765 protected long _getLong(int index) {
1766 return rootParent()._getLong(idx(index));
1767 }
1768
1769 @Override
1770 protected long _getLongLE(int index) {
1771 return rootParent()._getLongLE(idx(index));
1772 }
1773
1774 @Override
1775 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1776 checkIndex(index, length);
1777 rootParent().getBytes(idx(index), dst, dstIndex, length);
1778 return this;
1779 }
1780
1781 @Override
1782 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1783 checkIndex(index, length);
1784 rootParent().getBytes(idx(index), dst, dstIndex, length);
1785 return this;
1786 }
1787
1788 @Override
1789 public ByteBuf getBytes(int index, ByteBuffer dst) {
1790 checkIndex(index, dst.remaining());
1791 rootParent().getBytes(idx(index), dst);
1792 return this;
1793 }
1794
1795 @Override
1796 protected void _setByte(int index, int value) {
1797 rootParent()._setByte(idx(index), value);
1798 }
1799
1800 @Override
1801 protected void _setShort(int index, int value) {
1802 rootParent()._setShort(idx(index), value);
1803 }
1804
1805 @Override
1806 protected void _setShortLE(int index, int value) {
1807 rootParent()._setShortLE(idx(index), value);
1808 }
1809
1810 @Override
1811 protected void _setMedium(int index, int value) {
1812 rootParent()._setMedium(idx(index), value);
1813 }
1814
1815 @Override
1816 protected void _setMediumLE(int index, int value) {
1817 rootParent()._setMediumLE(idx(index), value);
1818 }
1819
1820 @Override
1821 protected void _setInt(int index, int value) {
1822 rootParent()._setInt(idx(index), value);
1823 }
1824
1825 @Override
1826 protected void _setIntLE(int index, int value) {
1827 rootParent()._setIntLE(idx(index), value);
1828 }
1829
1830 @Override
1831 protected void _setLong(int index, long value) {
1832 rootParent()._setLong(idx(index), value);
1833 }
1834
1835 @Override
1836 protected void _setLongLE(int index, long value) {
1837 rootParent().setLongLE(idx(index), value);
1838 }
1839
1840 @Override
1841 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1842 checkIndex(index, length);
1843 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1844 tmp.put(src, srcIndex, length);
1845 return this;
1846 }
1847
1848 @Override
1849 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1850 checkIndex(index, length);
1851 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1852 tmp.put(src.nioBuffer(srcIndex, length));
1853 return this;
1854 }
1855
1856 @Override
1857 public ByteBuf setBytes(int index, ByteBuffer src) {
1858 checkIndex(index, src.remaining());
1859 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1860 tmp.put(src);
1861 return this;
1862 }
1863
1864 @Override
1865 public ByteBuf getBytes(int index, OutputStream out, int length)
1866 throws IOException {
1867 checkIndex(index, length);
1868 if (length != 0) {
1869 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1870 }
1871 return this;
1872 }
1873
1874 @Override
1875 public int getBytes(int index, GatheringByteChannel out, int length)
1876 throws IOException {
1877 ByteBuffer buf = internalNioBuffer().duplicate();
1878 buf.clear().position(index).limit(index + length);
1879 return out.write(buf);
1880 }
1881
1882 @Override
1883 public int getBytes(int index, FileChannel out, long position, int length)
1884 throws IOException {
1885 ByteBuffer buf = internalNioBuffer().duplicate();
1886 buf.clear().position(index).limit(index + length);
1887 return out.write(buf, position);
1888 }
1889
1890 @Override
1891 public int setBytes(int index, InputStream in, int length)
1892 throws IOException {
1893 checkIndex(index, length);
1894 final AbstractByteBuf rootParent = rootParent();
1895 if (rootParent.hasArray()) {
1896 return rootParent.setBytes(idx(index), in, length);
1897 }
1898 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1899 int readBytes = in.read(tmp, 0, length);
1900 if (readBytes <= 0) {
1901 return readBytes;
1902 }
1903 setBytes(index, tmp, 0, readBytes);
1904 return readBytes;
1905 }
1906
1907 @Override
1908 public int setBytes(int index, ScatteringByteChannel in, int length)
1909 throws IOException {
1910 try {
1911 return in.read(internalNioBuffer(index, length));
1912 } catch (ClosedChannelException ignored) {
1913 return -1;
1914 }
1915 }
1916
1917 @Override
1918 public int setBytes(int index, FileChannel in, long position, int length)
1919 throws IOException {
1920 try {
1921 return in.read(internalNioBuffer(index, length), position);
1922 } catch (ClosedChannelException ignored) {
1923 return -1;
1924 }
1925 }
1926
1927 @Override
1928 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1929 return setCharSequence0(index, sequence, charset, false);
1930 }
1931
1932 private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
1933 if (charset.equals(CharsetUtil.UTF_8)) {
1934 int length = ByteBufUtil.utf8MaxBytes(sequence);
1935 if (expand) {
1936 ensureWritable0(length);
1937 checkIndex0(index, length);
1938 } else {
1939 checkIndex(index, length);
1940 }
1941 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
1942 }
1943 if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
1944 int length = sequence.length();
1945 if (expand) {
1946 ensureWritable0(length);
1947 checkIndex0(index, length);
1948 } else {
1949 checkIndex(index, length);
1950 }
1951 return ByteBufUtil.writeAscii(this, index, sequence, length);
1952 }
1953 byte[] bytes = sequence.toString().getBytes(charset);
1954 if (expand) {
1955 ensureWritable0(bytes.length);
1956
1957 }
1958 setBytes(index, bytes);
1959 return bytes.length;
1960 }
1961
1962 @Override
1963 public int writeCharSequence(CharSequence sequence, Charset charset) {
1964 int written = setCharSequence0(writerIndex, sequence, charset, true);
1965 writerIndex += written;
1966 return written;
1967 }
1968
1969 @Override
1970 public int forEachByte(int index, int length, ByteProcessor processor) {
1971 checkIndex(index, length);
1972 int ret = rootParent().forEachByte(idx(index), length, processor);
1973 return forEachResult(ret);
1974 }
1975
1976 @Override
1977 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1978 checkIndex(index, length);
1979 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1980 return forEachResult(ret);
1981 }
1982
1983 @Override
1984 public ByteBuf setZero(int index, int length) {
1985 checkIndex(index, length);
1986 rootParent().setZero(idx(index), length);
1987 return this;
1988 }
1989
1990 @Override
1991 public ByteBuf writeZero(int length) {
1992 ensureWritable(length);
1993 rootParent().setZero(idx(writerIndex), length);
1994 writerIndex += length;
1995 return this;
1996 }
1997
1998 private int forEachResult(int ret) {
1999 if (ret < startIndex) {
2000 return -1;
2001 }
2002 return ret - startIndex;
2003 }
2004
2005 @Override
2006 public boolean isContiguous() {
2007 return rootParent().isContiguous();
2008 }
2009
2010 private int idx(int index) {
2011 return index + startIndex;
2012 }
2013
2014 @Override
2015 protected void deallocate() {
2016 if (chunk != null) {
2017 chunk.releaseSegment(startIndex, maxFastCapacity);
2018 }
2019 tmpNioBuf = null;
2020 chunk = null;
2021 rootParent = null;
2022 handle.unguardedRecycle(this);
2023 }
2024 }
2025
2026
2027
2028
2029 interface ChunkAllocator {
2030
2031
2032
2033
2034
2035
2036 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
2037 }
2038 }