1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.CharsetUtil;
20 import io.netty.util.IllegalReferenceCountException;
21 import io.netty.util.IntConsumer;
22 import io.netty.util.NettyRuntime;
23 import io.netty.util.Recycler;
24 import io.netty.util.Recycler.EnhancedHandle;
25 import io.netty.util.ReferenceCounted;
26 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap;
27 import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap.IntEntry;
28 import io.netty.util.concurrent.FastThreadLocal;
29 import io.netty.util.concurrent.FastThreadLocalThread;
30 import io.netty.util.concurrent.MpscAtomicIntegerArrayQueue;
31 import io.netty.util.concurrent.MpscIntQueue;
32 import io.netty.util.internal.MathUtil;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.ReferenceCountUpdater;
36 import io.netty.util.internal.SuppressJava6Requirement;
37 import io.netty.util.internal.SystemPropertyUtil;
38 import io.netty.util.internal.ThreadExecutorMap;
39 import io.netty.util.internal.UnstableApi;
40
41 import java.io.IOException;
42 import java.io.InputStream;
43 import java.io.OutputStream;
44 import java.nio.ByteBuffer;
45 import java.nio.ByteOrder;
46 import java.nio.channels.ClosedChannelException;
47 import java.nio.channels.FileChannel;
48 import java.nio.channels.GatheringByteChannel;
49 import java.nio.channels.ScatteringByteChannel;
50 import java.nio.charset.Charset;
51 import java.util.Arrays;
52 import java.util.Iterator;
53 import java.util.Queue;
54 import java.util.concurrent.ConcurrentLinkedQueue;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
57 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
58 import java.util.concurrent.atomic.LongAdder;
59 import java.util.concurrent.locks.StampedLock;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 @SuppressJava6Requirement(reason = "Guarded by version check")
88 @UnstableApi
89 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
/** Maximum-memory threshold (512 MiB) used to decide whether the JVM counts as low on memory. */
private static final int LOW_MEM_THRESHOLD = 512 * 1024 * 1024;
/** {@code true} when {@code Runtime.maxMemory()} is at or below {@link #LOW_MEM_THRESHOLD}. */
private static final boolean IS_LOW_MEM = Runtime.getRuntime().maxMemory() <= LOW_MEM_THRESHOLD;

/**
 * Value of the {@code io.netty.allocator.disableThreadLocalMagazinesOnLowMemory} system property
 * (defaults to {@code true}). The constructor combines it with {@link #IS_LOW_MEM} to decide whether
 * the thread-local magazine groups are created at all.
 */
private static final boolean DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM = SystemPropertyUtil.getBoolean(
        "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory", true);

/** Lower bound (128 KiB) for the chunk size picked by {@code SizeClassChunkManagementStrategy}. */
static final int MIN_CHUNK_SIZE = 128 * 1024;
/** Upper bound on how many expansion rounds {@code MagazineGroup.allocate} performs before giving up. */
private static final int EXPANSION_ATTEMPTS = 3;
/** Number of magazines a shared {@code MagazineGroup} starts with. */
private static final int INITIAL_MAGAZINES = 1;
/**
 * Remaining-capacity threshold (in bytes) used by {@code Magazine}: a chunk with less than this left is
 * released from the magazine instead of being parked as the "next in line" chunk.
 */
private static final int RETIRE_CAPACITY = 256;
/** Magazine-array length at which {@code MagazineGroup.tryExpandMagazines} stops growing the array. */
private static final int MAX_STRIPES = IS_LOW_MEM ? 1 : NettyRuntime.availableProcessors() * 2;
/**
 * Buffers-per-chunk factor: {@code BuddyChunkController} proposes chunks of this many times the prompting
 * size, and {@link #MAX_POOLED_BUF_SIZE} is the maximum chunk size divided by it.
 */
private static final int BUFS_PER_CHUNK = 8;

/** Cap on the chunk size chosen by {@code BuddyChunkController}: 2 MiB when low on memory, 8 MiB otherwise. */
private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
        2 * 1024 * 1024 :
        8 * 1024 * 1024;
/** Largest request size served from the pooled magazine groups; larger requests use the fallback path. */
private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;

/**
 * Capacity of the chunk re-use caches, read from {@code io.netty.allocator.chunkReuseQueueCapacity}.
 * Defaults to twice the number of available processors and is never less than 2.
 */
private static final int CHUNK_REUSE_QUEUE = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.chunkReuseQueueCapacity", NettyRuntime.availableProcessors() * 2));

/**
 * Capacity handed to the recycler of each shareable {@code Magazine}, read from
 * {@code io.netty.allocator.magazineBufferQueueCapacity} (default 1024). The static initializer rejects
 * values below 2.
 */
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

/**
 * Segment sizes, in ascending order, of the size-classed magazine groups. Every entry is a multiple
 * of 32 (asserted by the static initializer).
 */
private static final int[] SIZE_CLASSES = {
        32,
        64,
        128,
        256,
        512,
        640, // 512 + 128
        1024,
        1152, // 1024 + 128
        2048,
        2304, // 2048 + 256
        4096,
        4352, // 4096 + 256
        8192,
        8704, // 8192 + 512
        16384,
        16896, // 16384 + 512
};

/** Number of entries in {@link #SIZE_CLASSES}. */
private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
/**
 * Lookup table indexed by {@code sizeIndexOf(size)} that yields the index into {@link #SIZE_CLASSES}.
 * It has one slot per 32-byte step up to the largest size class and is filled by the static initializer.
 */
private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
172
173 static {
174 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
175 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
176 + " (expected: >= " + 2 + ')');
177 }
178 int lastIndex = 0;
179 for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
180 int sizeClass = SIZE_CLASSES[i];
181
182 assert (sizeClass & 31) == 0 : "Size class must be a multiple of 32";
183 int sizeIndex = sizeIndexOf(sizeClass);
184 Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
185 lastIndex = sizeIndex;
186 }
187 }
188
// Source of the backing buffers for every chunk created by this allocator.
private final ChunkAllocator chunkAllocator;
// Tracks the summed capacity of all live chunks; backs usedMemory().
private final ChunkRegistry chunkRegistry;
// Shared (non-thread-local) magazine groups, one per entry of SIZE_CLASSES.
private final MagazineGroup[] sizeClassedMagazineGroups;
// Shared group for pooled sizes that are larger than the biggest size class.
private final MagazineGroup largeBufferMagazineGroup;
// Per-thread size-classed groups; null when thread-local groups are disabled on low-memory JVMs.
private final FastThreadLocal<MagazineGroup[]> threadLocalGroup;
194
195 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, final boolean useCacheForNonEventLoopThreads) {
196 this.chunkAllocator = ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
197 chunkRegistry = new ChunkRegistry();
198 sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
199 largeBufferMagazineGroup = new MagazineGroup(
200 this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
201
202 boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
203 threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal<MagazineGroup[]>() {
204 @Override
205 protected MagazineGroup[] initialValue() {
206 if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
207 return createMagazineGroupSizeClasses(AdaptivePoolingAllocator.this, true);
208 }
209 return null;
210 }
211
212 @Override
213 protected void onRemoval(final MagazineGroup[] groups) throws Exception {
214 if (groups != null) {
215 for (MagazineGroup group : groups) {
216 group.free();
217 }
218 }
219 }
220 };
221 }
222
223 private static MagazineGroup[] createMagazineGroupSizeClasses(
224 AdaptivePoolingAllocator allocator, boolean isThreadLocal) {
225 MagazineGroup[] groups = new MagazineGroup[SIZE_CLASSES.length];
226 for (int i = 0; i < SIZE_CLASSES.length; i++) {
227 int segmentSize = SIZE_CLASSES[i];
228 groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
229 new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
230 }
231 return groups;
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 private static Queue<SizeClassedChunk> createSharedChunkQueue() {
255 return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
256 }
257
258 @Override
259 public ByteBuf allocate(int size, int maxCapacity) {
260 return allocate(size, maxCapacity, Thread.currentThread(), null);
261 }
262
263 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
264 AdaptiveByteBuf allocated = null;
265 if (size <= MAX_POOLED_BUF_SIZE) {
266 final int index = sizeClassIndexOf(size);
267 MagazineGroup[] magazineGroups;
268 if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread()) ||
269 IS_LOW_MEM ||
270 (magazineGroups = threadLocalGroup.get()) == null) {
271 magazineGroups = sizeClassedMagazineGroups;
272 }
273 if (index < magazineGroups.length) {
274 allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
275 } else if (!IS_LOW_MEM) {
276 allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
277 }
278 }
279 if (allocated == null) {
280 allocated = allocateFallback(size, maxCapacity, currentThread, buf);
281 }
282 return allocated;
283 }
284
285 private static int sizeIndexOf(final int size) {
286
287 return size + 31 >> 5;
288 }
289
290 static int sizeClassIndexOf(int size) {
291 int sizeIndex = sizeIndexOf(size);
292 if (sizeIndex < SIZE_INDEXES.length) {
293 return SIZE_INDEXES[sizeIndex];
294 }
295 return SIZE_CLASSES_COUNT;
296 }
297
298 static int[] getSizeClasses() {
299 return SIZE_CLASSES.clone();
300 }
301
/**
 * Serves an allocation with a dedicated, one-off chunk that holds exactly this buffer.
 * Used when the magazine groups did not produce a buffer.
 *
 * @param size the requested size in bytes
 * @param maxCapacity the maximum capacity of the buffer; also passed to the chunk allocator
 * @param currentThread the thread used to pick a fallback magazine
 * @param buf an existing buffer to re-initialise, or {@code null} to obtain a new one
 * @return {@code buf}, or the newly obtained buffer when {@code buf} was {@code null}
 */
private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
    // Pick the magazine the one-off chunk is attached to.
    Magazine magazine;
    if (buf != null) {
        // Re-allocation: prefer the magazine of the buffer's current chunk, if it still has one.
        Chunk chunk = buf.chunk;
        if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
            magazine = getFallbackMagazine(currentThread);
        }
    } else {
        // Fresh allocation: take a magazine of the large-buffer group and obtain the buffer from it.
        magazine = getFallbackMagazine(currentThread);
        buf = magazine.newBuffer();
    }

    // Create the dedicated chunk and account for it in the registry.
    AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
    Chunk chunk = new Chunk(innerChunk, magazine);
    chunkRegistry.add(chunk);
    try {
        boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
        assert success: "Failed to initialize ByteBuf with dedicated chunk";
    } finally {
        // readInitInto(...) retains the chunk on behalf of the buffer, so the reference taken at
        // construction is always dropped here, whether or not initialisation succeeded.
        chunk.release();
    }
    return buf;
}
329
330 private Magazine getFallbackMagazine(Thread currentThread) {
331 Magazine[] mags = largeBufferMagazineGroup.magazines;
332 return mags[(int) currentThread.getId() & mags.length - 1];
333 }
334
335
336
337
338 void reallocate(int size, int maxCapacity, AdaptiveByteBuf into) {
339 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
340 assert result == into: "Re-allocation created separate buffer instance";
341 }
342
343 @Override
344 public long usedMemory() {
345 return chunkRegistry.totalCapacity();
346 }
347
348
349
350
/**
 * Finalizer hook: once the superclass finalizer has run (or thrown), releases the resources this
 * allocator still holds by calling {@link #free()}.
 *
 * @throws Throwable whatever {@code super.finalize()} or {@code free()} throws
 */
@SuppressWarnings({"FinalizeDeclaration", "deprecation"})
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        // Runs even if super.finalize() throws.
        free();
    }
}
360
361 private void free() {
362 largeBufferMagazineGroup.free();
363 }
364
/**
 * A set of {@link Magazine}s that share one {@link ChunkCache} and one {@link ChunkManagementStrategy}.
 * <p>
 * The {@code isThreadLocal} constructor flag selects one of two modes. A thread-local group owns a single
 * non-shareable magazine and has no expansion lock. A shared group owns an array of shareable magazines
 * that {@link #tryExpandMagazines(int)} may replace with an array twice as long.
 */
@SuppressJava6Requirement(reason = "Guarded by version check")
private static final class MagazineGroup {
    private final AdaptivePoolingAllocator allocator;
    private final ChunkAllocator chunkAllocator;
    private final ChunkManagementStrategy chunkManagementStrategy;
    // Chunks handed back by magazines; pollChunk(int) hands them out again.
    private final ChunkCache chunkCache;
    // Guards replacement of the magazines array; null for thread-local groups.
    private final StampedLock magazineExpandLock;
    // The single magazine of a thread-local group; null for shared groups.
    private final Magazine threadLocalMagazine;
    // Thread that constructed a thread-local group; null for shared groups and cleared by free().
    private Thread ownerThread;
    // Magazines of a shared group; never assigned for thread-local groups.
    private volatile Magazine[] magazines;
    // Set by free(); consulted by offerChunk(Chunk) and tryExpandMagazines(int).
    private volatile boolean freed;

    /**
     * @param allocator the owning allocator
     * @param chunkAllocator allocator of chunk backing buffers
     * @param chunkManagementStrategy factory for the chunk cache and the per-magazine controllers
     * @param isThreadLocal {@code true} to create a single-magazine group owned by the calling thread
     */
    MagazineGroup(AdaptivePoolingAllocator allocator,
                  ChunkAllocator chunkAllocator,
                  ChunkManagementStrategy chunkManagementStrategy,
                  boolean isThreadLocal) {
        this.allocator = allocator;
        this.chunkAllocator = chunkAllocator;
        this.chunkManagementStrategy = chunkManagementStrategy;
        chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
        if (isThreadLocal) {
            ownerThread = Thread.currentThread();
            magazineExpandLock = null;
            // Not shareable: the magazine is created without an allocation lock.
            threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
        } else {
            ownerThread = null;
            magazineExpandLock = new StampedLock();
            threadLocalMagazine = null;
            Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
            for (int i = 0; i < mags.length; i++) {
                mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
            }
            magazines = mags;
        }
    }

    /**
     * Tries to allocate from this group's magazines.
     *
     * @param size the requested size in bytes
     * @param maxCapacity the maximum capacity of the buffer
     * @param currentThread the thread whose id selects the first magazine to try (shared groups only)
     * @param buf an existing buffer to re-initialise, or {@code null} to obtain one from a magazine
     * @return the initialised buffer, or {@code null} if no magazine of a shared group could serve the request
     */
    public AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
        boolean reallocate = buf != null;

        // Thread-local groups: a single magazine, no contention handling needed.
        Magazine tlMag = threadLocalMagazine;
        if (tlMag != null) {
            if (buf == null) {
                buf = tlMag.newBuffer();
            }
            boolean allocated = tlMag.tryAllocate(size, maxCapacity, buf, reallocate);
            assert allocated : "Allocation of threadLocalMagazine must always succeed";
            return buf;
        }

        // Shared groups: probe the magazines, growing the array between rounds.
        long threadId = currentThread.getId();
        Magazine[] mags;
        int expansions = 0;
        do {
            mags = magazines;
            int mask = mags.length - 1;
            int index = (int) (threadId & mask);
            // Up to two passes over the array, starting at the slot derived from the thread id.
            for (int i = 0, m = mags.length << 1; i < m; i++) {
                Magazine mag = mags[index + i & mask];
                if (buf == null) {
                    buf = mag.newBuffer();
                }
                if (mag.tryAllocate(size, maxCapacity, buf, reallocate)) {
                    // Allocation succeeded.
                    return buf;
                }
            }
            expansions++;
        } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));

        // No magazine served the request. Give back a buffer obtained here; a caller-supplied buffer
        // (re-allocation) is left untouched.
        if (!reallocate && buf != null) {
            buf.release();
        }
        return null;
    }

    /**
     * Attempts to replace the magazine array with one twice as long, provided the array still has
     * {@code currentLength} entries, is below {@code MAX_STRIPES} and the group has not been freed.
     * The magazines of the replaced array are freed afterwards. Nothing happens if the expansion lock
     * cannot be acquired without blocking.
     *
     * @param currentLength the array length observed by the caller
     * @return always {@code true}
     */
    private boolean tryExpandMagazines(int currentLength) {
        if (currentLength >= MAX_STRIPES) {
            return true;
        }
        final Magazine[] mags;
        long writeLock = magazineExpandLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                mags = magazines;
                // Re-check under the lock: already at the limit, already expanded, or freed.
                if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
                    return true;
                }
                Magazine[] expanded = new Magazine[mags.length * 2];
                for (int i = 0, l = expanded.length; i < l; i++) {
                    expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
                }
                magazines = expanded;
            } finally {
                magazineExpandLock.unlockWrite(writeLock);
            }
            // Outside the lock: free the magazines of the array that was just replaced.
            for (Magazine magazine : mags) {
                magazine.free();
            }
        }
        return true;
    }

    /**
     * Takes a chunk out of this group's cache.
     *
     * @param size the size the caller intends to allocate
     * @return a cached chunk, or {@code null} if the cache returned none
     */
    Chunk pollChunk(int size) {
        return chunkCache.pollChunk(size);
    }

    /**
     * Offers a chunk to this group's cache for later re-use.
     *
     * @param chunk the chunk to cache
     * @return {@code true} if the cache accepted the chunk; {@code false} if the group was already freed
     *         or the cache rejected it
     */
    boolean offerChunk(Chunk chunk) {
        if (freed) {
            return false;
        }

        if (chunk.hasUnprocessedFreelistEntries()) {
            chunk.processFreelistEntries();
        }
        boolean isAdded = chunkCache.offerChunk(chunk);

        // free() may have run between the check above and the offer; drain the cache again so the
        // chunk that was just added is not left behind.
        if (freed && isAdded) {
            freeChunkReuseQueue(ownerThread);
        }
        return isAdded;
    }

    /**
     * Marks the group as freed, frees its magazine(s) and drains the chunk cache.
     */
    private void free() {
        freed = true;
        // Read the owner before clearing it; the drain below needs the original value.
        Thread ownerThread = this.ownerThread;
        if (threadLocalMagazine != null) {
            this.ownerThread = null;
            threadLocalMagazine.free();
        } else {
            // Blocks until any in-flight expansion has finished.
            long stamp = magazineExpandLock.writeLock();
            try {
                Magazine[] mags = magazines;
                for (Magazine magazine : mags) {
                    magazine.free();
                }
            } finally {
                magazineExpandLock.unlockWrite(stamp);
            }
        }
        freeChunkReuseQueue(ownerThread);
    }

    /**
     * Drains the chunk cache and marks every polled chunk for deallocation.
     *
     * @param ownerThread the owning thread of a thread-local group, or {@code null}
     */
    private void freeChunkReuseQueue(Thread ownerThread) {
        Chunk chunk;
        while ((chunk = chunkCache.pollChunk(0)) != null) {
            if (ownerThread != null && chunk instanceof SizeClassedChunk) {
                SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
                assert ownerThread == threadLocalChunk.ownerThread;
                // Clear the chunk's owner thread before it is marked for deallocation.
                threadLocalChunk.ownerThread = null;
            }
            chunk.markToDeallocate();
        }
    }
}
526
/**
 * Cache of chunks that magazines have handed back and that may be picked up again for allocation.
 */
private interface ChunkCache {
    /**
     * Removes and returns a chunk from the cache.
     *
     * @param size the size the caller intends to allocate; implementations may ignore it
     * @return a chunk, or {@code null} if none is available
     */
    Chunk pollChunk(int size);

    /**
     * Adds a chunk to the cache.
     *
     * @param chunk the chunk to add
     * @return {@code true} if the chunk was accepted
     */
    boolean offerChunk(Chunk chunk);
}
531
532 private static final class ConcurrentQueueChunkCache implements ChunkCache {
533 private final Queue<SizeClassedChunk> queue;
534
535 private ConcurrentQueueChunkCache() {
536 queue = createSharedChunkQueue();
537 }
538
539 @Override
540 public SizeClassedChunk pollChunk(int size) {
541
542
543 Queue<SizeClassedChunk> queue = this.queue;
544 for (int i = 0; i < CHUNK_REUSE_QUEUE; i++) {
545 SizeClassedChunk chunk = queue.poll();
546 if (chunk == null) {
547 return null;
548 }
549 if (chunk.hasRemainingCapacity()) {
550 return chunk;
551 }
552 queue.offer(chunk);
553 }
554 return null;
555 }
556
557 @Override
558 public boolean offerChunk(Chunk chunk) {
559 return queue.offer((SizeClassedChunk) chunk);
560 }
561 }
562
/**
 * {@link ChunkCache} that keeps chunks in a skip-list multimap keyed by the remaining capacity each
 * chunk had when it was inserted.
 */
private static final class ConcurrentSkipListChunkCache implements ChunkCache {
    private final ConcurrentSkipListIntObjMultimap<Chunk> chunks;

    private ConcurrentSkipListChunkCache() {
        chunks = new ConcurrentSkipListIntObjMultimap<Chunk>(-1);
    }

    /**
     * Returns a chunk whose remaining capacity is at least {@code size}.
     * <p>
     * First polls the ceiling entry for {@code size}. If there is none, scans the map for chunks with
     * unprocessed free-list entries, processes them and re-keys them by their updated remaining capacity;
     * among those that now fit, the one with the most remaining capacity is returned.
     *
     * @param size the minimum remaining capacity required
     * @return a suitable chunk, or {@code null} if none was found
     */
    @Override
    public Chunk pollChunk(int size) {
        if (chunks.isEmpty()) {
            return null;
        }
        IntEntry<Chunk> entry = chunks.pollCeilingEntry(size);
        if (entry != null) {
            Chunk chunk = entry.getValue();
            if (chunk.hasUnprocessedFreelistEntries()) {
                chunk.processFreelistEntries();
            }
            return chunk;
        }

        // No entry is keyed high enough; keys may be stale, so refresh chunks with pending free-list work.
        Chunk bestChunk = null;
        int bestRemainingCapacity = 0;
        Iterator<IntEntry<Chunk>> itr = chunks.iterator();
        while (itr.hasNext()) {
            entry = itr.next();
            final Chunk chunk;
            if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
                // Only continue with this chunk if this call is the one that removed it from the map.
                if (!chunks.remove(entry.getKey(), entry.getValue())) {
                    continue;
                }
                chunk.processFreelistEntries();
                int remainingCapacity = chunk.remainingCapacity();
                if (remainingCapacity >= size &&
                        (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
                    // New best candidate; the previous best goes back into the map.
                    if (bestChunk != null) {
                        chunks.put(bestRemainingCapacity, bestChunk);
                    }
                    bestChunk = chunk;
                    bestRemainingCapacity = remainingCapacity;
                } else {
                    // Not usable for this request: re-insert under its refreshed key.
                    chunks.put(remainingCapacity, chunk);
                }
            }
        }

        return bestChunk;
    }

    /**
     * Inserts the chunk keyed by its remaining capacity, then trims the map while it holds more than
     * {@code CHUNK_REUSE_QUEUE} entries. Each trim round removes the chunk with the lowest reference
     * count, ties broken by the smaller capacity, and marks it for deallocation.
     *
     * @param chunk the chunk to cache
     * @return always {@code true}
     */
    @Override
    public boolean offerChunk(Chunk chunk) {
        chunks.put(chunk.remainingCapacity(), chunk);

        int size = chunks.size();
        while (size > CHUNK_REUSE_QUEUE) {
            // Pick the eviction victim for this round.
            int key = -1;
            Chunk toDeallocate = null;
            for (IntEntry<Chunk> entry : chunks) {
                Chunk candidate = entry.getValue();
                if (candidate != null) {
                    if (toDeallocate == null) {
                        toDeallocate = candidate;
                        key = entry.getKey();
                    } else {
                        int candidateRefCnt = candidate.refCnt();
                        int toDeallocateRefCnt = toDeallocate.refCnt();
                        if (candidateRefCnt < toDeallocateRefCnt ||
                                candidateRefCnt == toDeallocateRefCnt &&
                                        candidate.capacity() < toDeallocate.capacity()) {
                            toDeallocate = candidate;
                            key = entry.getKey();
                        }
                    }
                }
            }
            if (toDeallocate == null) {
                break;
            }
            // Deallocate only if this call actually removed the entry.
            if (chunks.remove(key, toDeallocate)) {
                toDeallocate.markToDeallocate();
            }
            size = chunks.size();
        }
        return true;
    }
}
650
/**
 * Factory for the two pluggable parts of a {@link MagazineGroup}: the per-magazine
 * {@link ChunkController} and the group-wide {@link ChunkCache}.
 */
private interface ChunkManagementStrategy {
    /**
     * Creates a controller for a magazine of the given group.
     *
     * @param group the group the controller works for
     * @return a new controller
     */
    ChunkController createController(MagazineGroup group);

    /**
     * Creates the chunk cache of a group.
     *
     * @param isThreadLocal whether the cache belongs to a thread-local group
     * @return a new chunk cache
     */
    ChunkCache createChunkCache(boolean isThreadLocal);
}
656
/**
 * Decides how much capacity a buffer starts with and how new chunks are created for a magazine.
 */
private interface ChunkController {
    /**
     * Computes the starting capacity of a buffer for the given request.
     *
     * @param requestedSize the requested size in bytes
     * @param maxCapacity the maximum capacity of the buffer
     * @param isReallocation whether the request re-initialises an existing buffer
     * @return the starting capacity in bytes
     */
    int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);

    /**
     * Allocates a new chunk for the given magazine.
     *
     * @param promptingSize the size of the request that triggered the chunk allocation
     * @param magazine the magazine the chunk is created for
     * @return the new chunk
     */
    Chunk newChunkAllocation(int promptingSize, Magazine magazine);
}
668
669 private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
670
671
672
673 private static final int MIN_SEGMENTS_PER_CHUNK = 32;
674 private final int segmentSize;
675 private final int chunkSize;
676
677 private SizeClassChunkManagementStrategy(int segmentSize) {
678 this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
679 chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
680 }
681
682 @Override
683 public ChunkController createController(MagazineGroup group) {
684 return new SizeClassChunkController(group, segmentSize, chunkSize);
685 }
686
687 @Override
688 public ChunkCache createChunkCache(boolean isThreadLocal) {
689 return new ConcurrentQueueChunkCache();
690 }
691 }
692
693 private static final class SizeClassChunkController implements ChunkController {
694
695 private final ChunkAllocator chunkAllocator;
696 private final int segmentSize;
697 private final int chunkSize;
698 private final ChunkRegistry chunkRegistry;
699
700 private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
701 chunkAllocator = group.chunkAllocator;
702 this.segmentSize = segmentSize;
703 this.chunkSize = chunkSize;
704 chunkRegistry = group.allocator.chunkRegistry;
705 }
706
707 private MpscIntQueue createEmptyFreeList() {
708 return new MpscAtomicIntegerArrayQueue(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
709 }
710
711 private MpscIntQueue createFreeList() {
712 final int segmentsCount = chunkSize / segmentSize;
713 final MpscIntQueue freeList = new MpscAtomicIntegerArrayQueue(
714 segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
715 int segmentOffset = 0;
716 for (int i = 0; i < segmentsCount; i++) {
717 freeList.offer(segmentOffset);
718 segmentOffset += segmentSize;
719 }
720 return freeList;
721 }
722
723 private IntStack createLocalFreeList() {
724 final int segmentsCount = chunkSize / segmentSize;
725 int segmentOffset = chunkSize;
726 int[] offsets = new int[segmentsCount];
727 for (int i = 0; i < segmentsCount; i++) {
728 segmentOffset -= segmentSize;
729 offsets[i] = segmentOffset;
730 }
731 return new IntStack(offsets);
732 }
733
734 @Override
735 public int computeBufferCapacity(
736 int requestedSize, int maxCapacity, boolean isReallocation) {
737 return Math.min(segmentSize, maxCapacity);
738 }
739
740 @Override
741 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
742 AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
743 assert chunkBuffer.capacity() == chunkSize;
744 SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
745 chunkRegistry.add(chunk);
746 return chunk;
747 }
748 }
749
750 private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
751 private final AtomicInteger maxChunkSize = new AtomicInteger();
752
753 @Override
754 public ChunkController createController(MagazineGroup group) {
755 return new BuddyChunkController(group, maxChunkSize);
756 }
757
758 @Override
759 public ChunkCache createChunkCache(boolean isThreadLocal) {
760 return new ConcurrentSkipListChunkCache();
761 }
762 }
763
764 private static final class BuddyChunkController implements ChunkController {
765 private final ChunkAllocator chunkAllocator;
766 private final ChunkRegistry chunkRegistry;
767 private final AtomicInteger maxChunkSize;
768
769 BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
770 chunkAllocator = group.chunkAllocator;
771 chunkRegistry = group.allocator.chunkRegistry;
772 this.maxChunkSize = maxChunkSize;
773 }
774
775 @Override
776 public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
777 return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
778 }
779
780 @Override
781 public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
782 int maxChunkSize = this.maxChunkSize.get();
783 int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
784 int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
785 if (chunkSize > maxChunkSize) {
786
787 this.maxChunkSize.set(chunkSize);
788 }
789 BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
790 chunkRegistry.add(chunk);
791 return chunk;
792 }
793 }
794
/**
 * Hands out buffers carved from chunks. A magazine works on a {@code current} chunk and keeps at most one
 * spare chunk in {@code nextInLine}; chunks it is done with go back to the owning {@link MagazineGroup}.
 * <p>
 * A shareable magazine guards {@code current} with a {@link StampedLock} and falls back to a path that
 * only touches {@code nextInLine} and the group cache when the lock is busy. A non-shareable magazine has
 * no lock.
 */
@SuppressJava6Requirement(reason = "Guarded by version check")
private static final class Magazine {
    // Atomic accessor for the volatile nextInLine field.
    private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
    static {
        NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
    }
    // Sentinel stored in nextInLine once the magazine has been freed.
    private static final Chunk MAGAZINE_FREED = new Chunk();

    /**
     * Recycler of {@code AdaptiveByteBuf} instances, created either with the superclass defaults or with
     * an explicit maximum capacity.
     */
    private static final class AdaptiveRecycler extends Recycler<AdaptiveByteBuf> {

        private AdaptiveRecycler() {
        }

        private AdaptiveRecycler(int maxCapacity) {
            // NOTE(review): presumably selects the non-thread-local Recycler variant; confirm against Recycler.
            super(maxCapacity);
        }

        @Override
        protected AdaptiveByteBuf newObject(final Handle<AdaptiveByteBuf> handle) {
            return new AdaptiveByteBuf((EnhancedHandle<AdaptiveByteBuf>) handle);
        }

        /** Creates a recycler through the no-argument constructor. */
        public static AdaptiveRecycler threadLocal() {
            return new AdaptiveRecycler();
        }

        /** Creates a recycler with the given maximum capacity. */
        public static AdaptiveRecycler sharedWith(int maxCapacity) {
            return new AdaptiveRecycler(maxCapacity);
        }
    }

    // Buffer pool used by magazines that have no recycler of their own (non-shareable magazines).
    private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();

    // Chunk allocations are currently served from; accessed under allocationLock when one exists.
    private Chunk current;
    // Spare chunk; only accessed through NEXT_IN_LINE.
    @SuppressWarnings("unused")
    private volatile Chunk nextInLine;
    private final MagazineGroup group;
    private final ChunkController chunkController;
    // Null for non-shareable magazines.
    private final StampedLock allocationLock;
    // Null for non-shareable magazines, which use EVENT_LOOP_LOCAL_BUFFER_POOL instead.
    private final AdaptiveRecycler recycler;

    /**
     * @param group the owning group
     * @param shareable whether the magazine gets its own allocation lock and buffer recycler
     * @param chunkController controller that sizes buffers and creates chunks for this magazine
     */
    Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
        this.group = group;
        this.chunkController = chunkController;

        if (shareable) {
            allocationLock = new StampedLock();
            recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
        } else {
            allocationLock = null;
            recycler = null;
        }
    }

    /**
     * Tries to initialise {@code buf} with memory from this magazine.
     *
     * @param size the requested size in bytes
     * @param maxCapacity the maximum capacity of the buffer
     * @param buf the buffer to initialise
     * @param reallocate whether {@code buf} is an existing buffer being re-initialised
     * @return {@code true} if the buffer was initialised
     */
    public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        if (allocationLock == null) {
            // Non-shareable magazine: no locking needed.
            return allocate(size, maxCapacity, buf, reallocate);
        }

        // Take the lock only if it is free right now.
        long writeLock = allocationLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                return allocate(size, maxCapacity, buf, reallocate);
            } finally {
                allocationLock.unlockWrite(writeLock);
            }
        }
        // Lock is busy: use the path that never touches 'current'.
        return allocateWithoutLock(size, maxCapacity, buf);
    }

    /**
     * Allocation path used while the allocation lock is held elsewhere. It works on the chunk taken from
     * {@code nextInLine} or, failing that, from the group cache, and never reads or writes {@code current}.
     *
     * @return {@code true} if the buffer was initialised
     */
    private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
        Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr == MAGAZINE_FREED) {
            // The magazine was freed; put the sentinel back and report failure.
            restoreMagazineFreed();
            return false;
        }
        if (curr == null) {
            curr = group.pollChunk(size);
            if (curr == null) {
                return false;
            }
            curr.attachToMagazine(this);
        }
        boolean allocated = false;
        int remainingCapacity = curr.remainingCapacity();
        int startingCapacity = chunkController.computeBufferCapacity(
                size, maxCapacity, true /* this path always reports itself as a reallocation */);
        if (remainingCapacity >= size &&
                curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
            allocated = true;
            remainingCapacity = curr.remainingCapacity();
        }
        try {
            // Keep the chunk as spare if enough is left; otherwise it is released in the finally block.
            if (remainingCapacity >= RETIRE_CAPACITY) {
                transferToNextInLineOrRelease(curr);
                curr = null;
            }
        } finally {
            if (curr != null) {
                curr.releaseFromMagazine();
            }
        }
        return allocated;
    }

    /**
     * Main allocation path; the caller holds the allocation lock if the magazine has one.
     * Tries, in order: the {@code current} chunk, the {@code nextInLine} chunk, a chunk from the group
     * cache, and finally a newly allocated chunk.
     *
     * @return {@code true} if the buffer was initialised
     */
    private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
        int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
        Chunk curr = current;
        if (curr != null) {
            boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
            int remainingCapacity = curr.remainingCapacity();
            if (!success && remainingCapacity > 0) {
                // Could not serve this request but still has room: keep it as spare or release it.
                current = null;
                transferToNextInLineOrRelease(curr);
            } else if (remainingCapacity == 0) {
                // Fully used up.
                current = null;
                curr.releaseFromMagazine();
            }
            if (success) {
                return true;
            }
        }

        assert current == null;

        // Second attempt: the spare chunk.
        curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr != null) {
            if (curr == MAGAZINE_FREED) {
                // The magazine was freed; put the sentinel back and report failure.
                restoreMagazineFreed();
                return false;
            }

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity > startingCapacity &&
                    curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
                // Space is left after this allocation, so the chunk becomes the new 'current'.
                current = curr;
                return true;
            }

            try {
                if (remainingCapacity >= size) {
                    // Last allocation from this chunk: hand out everything that remains.
                    return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
                }
            } finally {
                // Released on every path out of the try block, including exceptions.
                curr.releaseFromMagazine();
            }
        }

        // Third attempt: a cached chunk from the group, else a brand new chunk.
        curr = group.pollChunk(size);
        if (curr == null) {
            curr = chunkController.newChunkAllocation(size, this);
        } else {
            curr.attachToMagazine(this);

            int remainingCapacity = curr.remainingCapacity();
            if (remainingCapacity == 0 || remainingCapacity < size) {
                // The cached chunk is too small for this request.
                if (remainingCapacity < RETIRE_CAPACITY) {
                    curr.releaseFromMagazine();
                } else {
                    // Worth keeping as spare; released by the callee if the spare slot is not won.
                    transferToNextInLineOrRelease(curr);
                }
                curr = chunkController.newChunkAllocation(size, this);
            }
        }

        current = curr;
        boolean success;
        try {
            int remainingCapacity = curr.remainingCapacity();
            assert remainingCapacity >= size;
            if (remainingCapacity > startingCapacity) {
                success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
                // Chunk stays as 'current'; nulling the local skips the release below.
                curr = null;
            } else {
                success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
            }
        } finally {
            if (curr != null) {
                // Either the chunk was used up entirely or readInitInto(...) threw.
                curr.releaseFromMagazine();
                current = null;
            }
        }
        return success;
    }

    /**
     * Stores the {@code MAGAZINE_FREED} sentinel in {@code nextInLine} and releases whatever real chunk
     * was there before.
     */
    private void restoreMagazineFreed() {
        Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
        if (next != null && next != MAGAZINE_FREED) {
            next.releaseFromMagazine();
        }
    }

    /**
     * Parks the chunk in {@code nextInLine} if the slot is empty, or if the chunk has more remaining
     * capacity than the one parked there (which is then released). Otherwise the chunk itself is released.
     *
     * @param chunk the chunk the magazine no longer uses as {@code current}
     */
    private void transferToNextInLineOrRelease(Chunk chunk) {
        if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
            return;
        }

        Chunk nextChunk = NEXT_IN_LINE.get(this);
        if (nextChunk != null && nextChunk != MAGAZINE_FREED
                && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
            if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
                nextChunk.releaseFromMagazine();
                return;
            }
        }

        // Slot is taken by a better chunk, changed concurrently, or holds the freed sentinel.
        chunk.releaseFromMagazine();
    }

    /**
     * Frees the magazine: marks {@code nextInLine} as freed, then releases {@code current} under the
     * allocation lock, if there is one.
     */
    void free() {
        restoreMagazineFreed();
        // Blocking acquire; a stamp of 0 is only used when there is no lock.
        long stamp = allocationLock != null ? allocationLock.writeLock() : 0;
        try {
            if (current != null) {
                current.releaseFromMagazine();
                current = null;
            }
        } finally {
            if (allocationLock != null) {
                allocationLock.unlockWrite(stamp);
            }
        }
    }

    /**
     * Obtains a buffer instance from this magazine's recycler, or from the shared static pool when the
     * magazine has no recycler, and resets its reference count and marks.
     *
     * @return a buffer instance that still has to be initialised with memory
     */
    public AdaptiveByteBuf newBuffer() {
        AdaptiveRecycler recycler = this.recycler;
        AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
        buf.resetRefCnt();
        buf.discardMarks();
        return buf;
    }

    /**
     * Offers a chunk to the owning group's cache.
     *
     * @return {@code true} if the group accepted the chunk
     */
    boolean offerToQueue(Chunk chunk) {
        return group.offerChunk(chunk);
    }
}
1060
1061 @SuppressJava6Requirement(reason = "Guarded by version check")
1062 private static final class ChunkRegistry {
1063 private final LongAdder totalCapacity = new LongAdder();
1064
1065 public long totalCapacity() {
1066 return totalCapacity.sum();
1067 }
1068
1069 public void add(Chunk chunk) {
1070 totalCapacity.add(chunk.capacity());
1071 }
1072
1073 public void remove(Chunk chunk) {
1074 totalCapacity.add(-chunk.capacity());
1075 }
1076 }
1077
/**
 * A reference-counted region of memory, backed by {@code delegate}, from which buffers are carved.
 * <p>
 * This base implementation hands out memory by advancing {@code allocatedBytes}. The free-list hooks
 * ({@link #hasUnprocessedFreelistEntries()}, {@link #processFreelistEntries()}) do nothing here.
 */
private static class Chunk implements ReferenceCounted {
    private static final long REFCNT_FIELD_OFFSET =
            ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
    private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");

    // Backing buffer; null only for the sentinel created by the no-argument constructor.
    protected final AbstractByteBuf delegate;
    // Magazine the chunk is currently attached to, or null while detached.
    protected Magazine magazine;
    private final AdaptivePoolingAllocator allocator;
    // Capacity of the delegate captured at construction.
    private final int capacity;
    // Number of bytes handed out so far; also the start index of the next buffer.
    protected int allocatedBytes;

    // All reference-count operations on 'refCnt' go through this updater.
    private static final ReferenceCountUpdater<Chunk> updater =
            new ReferenceCountUpdater<Chunk>() {
                @Override
                protected AtomicIntegerFieldUpdater<Chunk> updater() {
                    return AIF_UPDATER;
                }
                @Override
                protected long unsafeOffset() {
                    // -1 when Unsafe is not available.
                    return PlatformDependent.hasUnsafe() ? REFCNT_FIELD_OFFSET : -1;
                }
            };

    // Only accessed through 'updater'.
    @SuppressWarnings({"unused", "FieldMayBeFinal"})
    private volatile int refCnt;

    /**
     * Creates a chunk with no delegate, magazine or allocator and a capacity of 0.
     * Used for the {@code Magazine.MAGAZINE_FREED} sentinel.
     */
    Chunk() {
        delegate = null;
        magazine = null;
        allocator = null;
        capacity = 0;
    }

    /**
     * @param delegate the backing buffer; its capacity becomes the chunk capacity
     * @param magazine the magazine the chunk is initially attached to
     */
    Chunk(AbstractByteBuf delegate, Magazine magazine) {
        this.delegate = delegate;
        capacity = delegate.capacity();
        updater.setInitialValue(this);
        attachToMagazine(magazine);

        // Remember the top-level allocator; deallocate() needs its chunk registry.
        allocator = magazine.group.allocator;
    }

    /** @return the magazine this chunk is attached to, or {@code null} */
    Magazine currentMagazine() {
        return magazine;
    }

    /** Clears the magazine reference. */
    void detachFromMagazine() {
        if (magazine != null) {
            magazine = null;
        }
    }

    /** Attaches the chunk to a magazine; the chunk must currently be detached. */
    void attachToMagazine(Magazine magazine) {
        assert this.magazine == null;
        this.magazine = magazine;
    }

    @Override
    public Chunk touch(Object hint) {
        return this;
    }

    @Override
    public int refCnt() {
        return updater.refCnt(this);
    }

    @Override
    public Chunk retain() {
        return updater.retain(this);
    }

    @Override
    public Chunk retain(int increment) {
        return updater.retain(this, increment);
    }

    @Override
    public Chunk touch() {
        return this;
    }

    /** Drops one reference and deallocates the chunk when the updater reports the final release. */
    @Override
    public boolean release() {
        if (updater.release(this)) {
            deallocate();
            return true;
        }
        return false;
    }

    /** Drops {@code decrement} references and deallocates the chunk when the updater reports the final release. */
    @Override
    public boolean release(int decrement) {
        if (updater.release(this, decrement)) {
            deallocate();
            return true;
        }
        return false;
    }

    /**
     * Detaches the chunk from its magazine and offers it to the magazine's group for re-use.
     * If the group does not accept it, the chunk is marked for deallocation.
     */
    void releaseFromMagazine() {
        // Read the magazine before detaching; it is needed for the offer below.
        Magazine mag = magazine;
        detachFromMagazine();
        if (!mag.offerToQueue(this)) {
            markToDeallocate();
        }
    }

    /**
     * Called when a buffer carved from this chunk gives its memory back.
     * This base implementation ignores both arguments and drops one reference.
     */
    void releaseSegment(int ignoredSegmentId, int size) {
        release();
    }

    /** Drops one reference to the chunk. */
    void markToDeallocate() {
        release();
    }

    /** Removes the chunk from the allocator's registry and releases the backing buffer. */
    protected void deallocate() {
        allocator.chunkRegistry.remove(this);
        delegate.release();
    }

    /**
     * Initialises {@code buf} with the next {@code startingCapacity} bytes of this chunk and retains the
     * chunk on the buffer's behalf. If {@code buf.init(...)} throws, the retain and the advance of
     * {@code allocatedBytes} are undone before the exception propagates.
     *
     * @param buf the buffer to initialise
     * @param size the requested size in bytes
     * @param startingCapacity the number of bytes reserved for the buffer
     * @param maxCapacity the maximum capacity of the buffer
     * @return always {@code true} in this base implementation
     */
    public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
        int startIndex = allocatedBytes;
        allocatedBytes = startIndex + startingCapacity;
        Chunk chunk = this;
        chunk.retain();
        try {
            buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
            // Success: nulling the local skips the rollback below.
            chunk = null;
        } finally {
            if (chunk != null) {
                // buf.init(...) threw: roll back the reservation and the retain.
                allocatedBytes = startIndex;
                chunk.release();
            }
        }
        return true;
    }

    /** @return the number of bytes not yet handed out */
    public int remainingCapacity() {
        return capacity - allocatedBytes;
    }

    /** @return always {@code false} in this base implementation */
    public boolean hasUnprocessedFreelistEntries() {
        return false;
    }

    /** Does nothing in this base implementation. */
    public void processFreelistEntries() {
    }

    /** @return the capacity of the chunk in bytes */
    public int capacity() {
        return capacity;
    }
}
1248
1249 private static final class IntStack {
1250
1251 private final int[] stack;
1252 private int top;
1253
1254 IntStack(int[] initialValues) {
1255 stack = initialValues;
1256 top = initialValues.length - 1;
1257 }
1258
1259 public boolean isEmpty() {
1260 return top == -1;
1261 }
1262
1263 public int pop() {
1264 final int last = stack[top];
1265 top--;
1266 return last;
1267 }
1268
1269 public void push(int value) {
1270 stack[top + 1] = value;
1271 top++;
1272 }
1273
1274 public int size() {
1275 return top + 1;
1276 }
1277 }
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298 private static final class SizeClassedChunk extends Chunk {
1299 private static final int FREE_LIST_EMPTY = -1;
1300 private static final int AVAILABLE = -1;
1301
1302
1303 private static final int DEALLOCATED = Integer.MIN_VALUE;
1304 private static final AtomicIntegerFieldUpdater<SizeClassedChunk> STATE =
1305 AtomicIntegerFieldUpdater.newUpdater(SizeClassedChunk.class, "state");
1306 private volatile int state;
1307 private final int segments;
1308 private final int segmentSize;
1309 private final MpscIntQueue externalFreeList;
1310 private final IntStack localFreeList;
1311 private Thread ownerThread;
1312
1313 SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
1314 SizeClassChunkController controller) {
1315 super(delegate, magazine);
1316 segmentSize = controller.segmentSize;
1317 segments = controller.chunkSize / segmentSize;
1318 STATE.lazySet(this, AVAILABLE);
1319 ownerThread = magazine.group.ownerThread;
1320 if (ownerThread == null) {
1321 externalFreeList = controller.createFreeList();
1322 localFreeList = null;
1323 } else {
1324 externalFreeList = controller.createEmptyFreeList();
1325 localFreeList = controller.createLocalFreeList();
1326 }
1327 }
1328
1329 @Override
1330 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1331 assert state == AVAILABLE;
1332 final int startIndex = nextAvailableSegmentOffset();
1333 if (startIndex == FREE_LIST_EMPTY) {
1334 return false;
1335 }
1336 allocatedBytes += segmentSize;
1337 try {
1338 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1339 } catch (Throwable t) {
1340 allocatedBytes -= segmentSize;
1341 releaseSegmentOffsetIntoFreeList(startIndex);
1342 PlatformDependent.throwException(t);
1343 }
1344 return true;
1345 }
1346
1347 private int nextAvailableSegmentOffset() {
1348 final int startIndex;
1349 IntStack localFreeList = this.localFreeList;
1350 if (localFreeList != null) {
1351 assert Thread.currentThread() == ownerThread;
1352 if (localFreeList.isEmpty()) {
1353 startIndex = externalFreeList.poll();
1354 } else {
1355 startIndex = localFreeList.pop();
1356 }
1357 } else {
1358 startIndex = externalFreeList.poll();
1359 }
1360 return startIndex;
1361 }
1362
1363
1364
1365
1366 public boolean hasRemainingCapacity() {
1367 int remaining = super.remainingCapacity();
1368 if (remaining > 0) {
1369 return true;
1370 }
1371 if (localFreeList != null) {
1372 return !localFreeList.isEmpty();
1373 }
1374 return !externalFreeList.isEmpty();
1375 }
1376
1377 @Override
1378 public int remainingCapacity() {
1379 int remaining = super.remainingCapacity();
1380 return remaining > segmentSize ? remaining : updateRemainingCapacity(remaining);
1381 }
1382
1383 private int updateRemainingCapacity(int snapshotted) {
1384 int freeSegments = externalFreeList.size();
1385 IntStack localFreeList = this.localFreeList;
1386 if (localFreeList != null) {
1387 freeSegments += localFreeList.size();
1388 }
1389 int updated = freeSegments * segmentSize;
1390 if (updated != snapshotted) {
1391 allocatedBytes = capacity() - updated;
1392 }
1393 return updated;
1394 }
1395
1396 private void releaseSegmentOffsetIntoFreeList(int startIndex) {
1397 IntStack localFreeList = this.localFreeList;
1398 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1399 localFreeList.push(startIndex);
1400 } else {
1401 boolean segmentReturned = externalFreeList.offer(startIndex);
1402 assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
1403 }
1404 }
1405
1406 @Override
1407 void releaseSegment(int startIndex, int size) {
1408 IntStack localFreeList = this.localFreeList;
1409 if (localFreeList != null && Thread.currentThread() == ownerThread) {
1410 localFreeList.push(startIndex);
1411 int state = this.state;
1412 if (state != AVAILABLE) {
1413 updateStateOnLocalReleaseSegment(state, localFreeList);
1414 }
1415 } else {
1416 boolean segmentReturned = externalFreeList.offer(startIndex);
1417 assert segmentReturned;
1418
1419 int state = this.state;
1420 if (state != AVAILABLE) {
1421 deallocateIfNeeded(state);
1422 }
1423 }
1424 }
1425
1426 private void updateStateOnLocalReleaseSegment(int previousLocalSize, IntStack localFreeList) {
1427 int newLocalSize = localFreeList.size();
1428 boolean alwaysTrue = STATE.compareAndSet(this, previousLocalSize, newLocalSize);
1429 assert alwaysTrue : "this shouldn't happen unless double release in the local free list";
1430 deallocateIfNeeded(newLocalSize);
1431 }
1432
1433 private void deallocateIfNeeded(int localSize) {
1434
1435 int totalFreeSegments = localSize + externalFreeList.size();
1436 if (totalFreeSegments == segments && STATE.compareAndSet(this, localSize, DEALLOCATED)) {
1437 deallocate();
1438 }
1439 }
1440
1441 @Override
1442 void markToDeallocate() {
1443 IntStack localFreeList = this.localFreeList;
1444 int localSize = localFreeList != null ? localFreeList.size() : 0;
1445 STATE.set(this, localSize);
1446 deallocateIfNeeded(localSize);
1447 }
1448 }
1449
1450 private static final class BuddyChunk extends Chunk implements IntConsumer {
1451 private static final int MIN_BUDDY_SIZE = 32768;
1452 private static final byte IS_CLAIMED = (byte) (1 << 7);
1453 private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
1454 private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
1455 private static final int PACK_OFFSET_MASK = 0xFFFF;
1456 private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
1457
1458 private final MpscIntQueue freeList;
1459
1460 private final byte[] buddies;
1461 private final int freeListCapacity;
1462
1463 BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
1464 super(delegate, magazine);
1465 freeListCapacity = delegate.capacity() / MIN_BUDDY_SIZE;
1466 int maxShift = Integer.numberOfTrailingZeros(freeListCapacity);
1467 assert maxShift <= 30;
1468
1469 freeList = new MpscAtomicIntegerArrayQueue(freeListCapacity, -1);
1470 buddies = new byte[freeListCapacity << 1];
1471
1472
1473 int index = 1;
1474 int runLength = 1;
1475 int currentRun = 0;
1476 while (maxShift > 0) {
1477 buddies[index++] = (byte) maxShift;
1478 if (++currentRun == runLength) {
1479 currentRun = 0;
1480 runLength <<= 1;
1481 maxShift--;
1482 }
1483 }
1484 }
1485
1486 @Override
1487 public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
1488 if (!freeList.isEmpty()) {
1489 freeList.drain(freeListCapacity, this);
1490 }
1491 int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
1492 if (startIndex == -1) {
1493 return false;
1494 }
1495 Chunk chunk = this;
1496 chunk.retain();
1497 try {
1498 buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
1499 allocatedBytes += startingCapacity;
1500 chunk = null;
1501 } finally {
1502 if (chunk != null) {
1503 unreserveMatchingBuddy(1, startingCapacity, startIndex, 0);
1504
1505
1506 chunk.release();
1507 }
1508 }
1509 return true;
1510 }
1511
1512 @Override
1513 public void accept(int packed) {
1514
1515 int size = MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
1516 int offset = (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
1517 unreserveMatchingBuddy(1, size, offset, 0);
1518 allocatedBytes -= size;
1519 }
1520
1521 @Override
1522 void releaseSegment(int startingIndex, int size) {
1523 int packedOffset = startingIndex / MIN_BUDDY_SIZE;
1524 int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
1525 int packed = packedOffset | packedSize;
1526 freeList.offer(packed);
1527 release();
1528 }
1529
1530 @Override
1531 public int remainingCapacity() {
1532 if (!freeList.isEmpty()) {
1533 freeList.drain(freeListCapacity, this);
1534 }
1535 return super.remainingCapacity();
1536 }
1537
1538 @Override
1539 public boolean hasUnprocessedFreelistEntries() {
1540 return !freeList.isEmpty();
1541 }
1542
1543 @Override
1544 public void processFreelistEntries() {
1545 freeList.drain(freeListCapacity, this);
1546 }
1547
1548
1549
1550
1551 private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
1552 byte[] buddies = this.buddies;
1553 while (index < buddies.length) {
1554 byte buddy = buddies[index];
1555 int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1556 if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
1557 return -1;
1558 }
1559 if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
1560 buddies[index] |= IS_CLAIMED;
1561 return currOffset;
1562 }
1563 int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
1564 if (found != -1) {
1565 buddies[index] |= HAS_CLAIMED_CHILDREN;
1566 return found;
1567 }
1568 index = (index << 1) + 1;
1569 currOffset += currValue >> 1;
1570 }
1571 return -1;
1572 }
1573
1574
1575
1576
1577 private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
1578 byte[] buddies = this.buddies;
1579 if (buddies.length <= index) {
1580 return false;
1581 }
1582 byte buddy = buddies[index];
1583 int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
1584
1585 if (currSize == size) {
1586
1587 if (currOffset == offset) {
1588 buddies[index] &= SHIFT_MASK;
1589 return false;
1590 }
1591 throw new IllegalStateException("The intended segment was not found at index " +
1592 index + ", for size " + size + " and offset " + offset);
1593 }
1594
1595
1596 boolean claims;
1597 int siblingIndex;
1598 if (offset < currOffset + (currSize >> 1)) {
1599
1600 claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
1601 siblingIndex = (index << 1) + 1;
1602 } else {
1603
1604 claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
1605 siblingIndex = index << 1;
1606 }
1607 if (!claims) {
1608
1609 byte sibling = buddies[siblingIndex];
1610 if ((sibling & SHIFT_MASK) == sibling) {
1611
1612 buddies[index] &= SHIFT_MASK;
1613 return false;
1614 }
1615 }
1616 return true;
1617 }
1618
1619 @Override
1620 public String toString() {
1621 int capacity = delegate.capacity();
1622 int remaining = capacity - allocatedBytes;
1623 return "BuddyChunk[capacity: " + capacity +
1624 ", remaining: " + remaining +
1625 ", free list: " + freeList.size() + ']';
1626 }
1627 }
1628
1629 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1630
1631 private final EnhancedHandle<AdaptiveByteBuf> handle;
1632
1633
1634 private int startIndex;
1635 private AbstractByteBuf rootParent;
1636 Chunk chunk;
1637 private int length;
1638 private int maxFastCapacity;
1639 private ByteBuffer tmpNioBuf;
1640 private boolean hasArray;
1641 private boolean hasMemoryAddress;
1642
1643 AdaptiveByteBuf(EnhancedHandle<AdaptiveByteBuf> recyclerHandle) {
1644 super(0);
1645 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
1646 }
1647
1648 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1649 int startIndex, int size, int capacity, int maxCapacity) {
1650 this.startIndex = startIndex;
1651 chunk = wrapped;
1652 length = size;
1653 maxFastCapacity = capacity;
1654 maxCapacity(maxCapacity);
1655 setIndex0(readerIndex, writerIndex);
1656 hasArray = unwrapped.hasArray();
1657 hasMemoryAddress = unwrapped.hasMemoryAddress();
1658 rootParent = unwrapped;
1659 tmpNioBuf = null;
1660 }
1661
1662 private AbstractByteBuf rootParent() {
1663 final AbstractByteBuf rootParent = this.rootParent;
1664 if (rootParent != null) {
1665 return rootParent;
1666 }
1667 throw new IllegalReferenceCountException();
1668 }
1669
1670 @Override
1671 public int capacity() {
1672 return length;
1673 }
1674
1675 @Override
1676 public int maxFastWritableBytes() {
1677 return Math.min(maxFastCapacity, maxCapacity()) - writerIndex;
1678 }
1679
1680 @Override
1681 public ByteBuf capacity(int newCapacity) {
1682 checkNewCapacity(newCapacity);
1683 if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1684 length = newCapacity;
1685 return this;
1686 }
1687 if (newCapacity < capacity()) {
1688 length = newCapacity;
1689 trimIndicesToCapacity(newCapacity);
1690 return this;
1691 }
1692
1693
1694 Chunk chunk = this.chunk;
1695 AdaptivePoolingAllocator allocator = chunk.allocator;
1696 int readerIndex = this.readerIndex;
1697 int writerIndex = this.writerIndex;
1698 int baseOldRootIndex = startIndex;
1699 int oldLength = length;
1700 int oldCapacity = maxFastCapacity;
1701 AbstractByteBuf oldRoot = rootParent();
1702 allocator.reallocate(newCapacity, maxCapacity(), this);
1703 oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
1704 chunk.releaseSegment(baseOldRootIndex, oldCapacity);
1705 assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
1706 "Capacity increase failed";
1707 this.readerIndex = readerIndex;
1708 this.writerIndex = writerIndex;
1709 return this;
1710 }
1711
1712 @Override
1713 public ByteBufAllocator alloc() {
1714 return rootParent().alloc();
1715 }
1716
1717 @SuppressWarnings("deprecation")
1718 @Override
1719 public ByteOrder order() {
1720 return rootParent().order();
1721 }
1722
1723 @Override
1724 public ByteBuf unwrap() {
1725 return null;
1726 }
1727
1728 @Override
1729 public boolean isDirect() {
1730 return rootParent().isDirect();
1731 }
1732
1733 @Override
1734 public int arrayOffset() {
1735 return idx(rootParent().arrayOffset());
1736 }
1737
1738 @Override
1739 public boolean hasMemoryAddress() {
1740 return hasMemoryAddress;
1741 }
1742
1743 @Override
1744 public long memoryAddress() {
1745 ensureAccessible();
1746 return rootParent().memoryAddress() + startIndex;
1747 }
1748
1749 @Override
1750 public ByteBuffer nioBuffer(int index, int length) {
1751 checkIndex(index, length);
1752 return rootParent().nioBuffer(idx(index), length);
1753 }
1754
1755 @Override
1756 public ByteBuffer internalNioBuffer(int index, int length) {
1757 checkIndex(index, length);
1758 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1759 }
1760
1761 private ByteBuffer internalNioBuffer() {
1762 if (tmpNioBuf == null) {
1763 tmpNioBuf = rootParent().nioBuffer(startIndex, maxFastCapacity);
1764 }
1765 return (ByteBuffer) tmpNioBuf.clear();
1766 }
1767
1768 @Override
1769 public ByteBuffer[] nioBuffers(int index, int length) {
1770 checkIndex(index, length);
1771 return rootParent().nioBuffers(idx(index), length);
1772 }
1773
1774 @Override
1775 public boolean hasArray() {
1776 return hasArray;
1777 }
1778
1779 @Override
1780 public byte[] array() {
1781 ensureAccessible();
1782 return rootParent().array();
1783 }
1784
1785 @Override
1786 public ByteBuf copy(int index, int length) {
1787 checkIndex(index, length);
1788 return rootParent().copy(idx(index), length);
1789 }
1790
1791 @Override
1792 public int nioBufferCount() {
1793 return rootParent().nioBufferCount();
1794 }
1795
1796 @Override
1797 protected byte _getByte(int index) {
1798 return rootParent()._getByte(idx(index));
1799 }
1800
1801 @Override
1802 protected short _getShort(int index) {
1803 return rootParent()._getShort(idx(index));
1804 }
1805
1806 @Override
1807 protected short _getShortLE(int index) {
1808 return rootParent()._getShortLE(idx(index));
1809 }
1810
1811 @Override
1812 protected int _getUnsignedMedium(int index) {
1813 return rootParent()._getUnsignedMedium(idx(index));
1814 }
1815
1816 @Override
1817 protected int _getUnsignedMediumLE(int index) {
1818 return rootParent()._getUnsignedMediumLE(idx(index));
1819 }
1820
1821 @Override
1822 protected int _getInt(int index) {
1823 return rootParent()._getInt(idx(index));
1824 }
1825
1826 @Override
1827 protected int _getIntLE(int index) {
1828 return rootParent()._getIntLE(idx(index));
1829 }
1830
1831 @Override
1832 protected long _getLong(int index) {
1833 return rootParent()._getLong(idx(index));
1834 }
1835
1836 @Override
1837 protected long _getLongLE(int index) {
1838 return rootParent()._getLongLE(idx(index));
1839 }
1840
1841 @Override
1842 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1843 checkIndex(index, length);
1844 rootParent().getBytes(idx(index), dst, dstIndex, length);
1845 return this;
1846 }
1847
1848 @Override
1849 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1850 checkIndex(index, length);
1851 rootParent().getBytes(idx(index), dst, dstIndex, length);
1852 return this;
1853 }
1854
1855 @Override
1856 public ByteBuf getBytes(int index, ByteBuffer dst) {
1857 checkIndex(index, dst.remaining());
1858 rootParent().getBytes(idx(index), dst);
1859 return this;
1860 }
1861
1862 @Override
1863 protected void _setByte(int index, int value) {
1864 rootParent()._setByte(idx(index), value);
1865 }
1866
1867 @Override
1868 protected void _setShort(int index, int value) {
1869 rootParent()._setShort(idx(index), value);
1870 }
1871
1872 @Override
1873 protected void _setShortLE(int index, int value) {
1874 rootParent()._setShortLE(idx(index), value);
1875 }
1876
1877 @Override
1878 protected void _setMedium(int index, int value) {
1879 rootParent()._setMedium(idx(index), value);
1880 }
1881
1882 @Override
1883 protected void _setMediumLE(int index, int value) {
1884 rootParent()._setMediumLE(idx(index), value);
1885 }
1886
1887 @Override
1888 protected void _setInt(int index, int value) {
1889 rootParent()._setInt(idx(index), value);
1890 }
1891
1892 @Override
1893 protected void _setIntLE(int index, int value) {
1894 rootParent()._setIntLE(idx(index), value);
1895 }
1896
1897 @Override
1898 protected void _setLong(int index, long value) {
1899 rootParent()._setLong(idx(index), value);
1900 }
1901
1902 @Override
1903 protected void _setLongLE(int index, long value) {
1904 rootParent().setLongLE(idx(index), value);
1905 }
1906
1907 @Override
1908 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1909 checkIndex(index, length);
1910 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1911 tmp.put(src, srcIndex, length);
1912 return this;
1913 }
1914
1915 @Override
1916 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1917 checkIndex(index, length);
1918 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1919 tmp.put(src.nioBuffer(srcIndex, length));
1920 return this;
1921 }
1922
1923 @Override
1924 public ByteBuf setBytes(int index, ByteBuffer src) {
1925 checkIndex(index, src.remaining());
1926 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1927 tmp.put(src);
1928 return this;
1929 }
1930
1931 @Override
1932 public ByteBuf getBytes(int index, OutputStream out, int length)
1933 throws IOException {
1934 checkIndex(index, length);
1935 if (length != 0) {
1936 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1937 }
1938 return this;
1939 }
1940
1941 @Override
1942 public int getBytes(int index, GatheringByteChannel out, int length)
1943 throws IOException {
1944 ByteBuffer buf = internalNioBuffer().duplicate();
1945 buf.clear().position(index).limit(index + length);
1946 return out.write(buf);
1947 }
1948
1949 @Override
1950 public int getBytes(int index, FileChannel out, long position, int length)
1951 throws IOException {
1952 ByteBuffer buf = internalNioBuffer().duplicate();
1953 buf.clear().position(index).limit(index + length);
1954 return out.write(buf, position);
1955 }
1956
1957 @Override
1958 public int setBytes(int index, InputStream in, int length)
1959 throws IOException {
1960 checkIndex(index, length);
1961 final AbstractByteBuf rootParent = rootParent();
1962 if (rootParent.hasArray()) {
1963 return rootParent.setBytes(idx(index), in, length);
1964 }
1965 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1966 int readBytes = in.read(tmp, 0, length);
1967 if (readBytes <= 0) {
1968 return readBytes;
1969 }
1970 setBytes(index, tmp, 0, readBytes);
1971 return readBytes;
1972 }
1973
1974 @Override
1975 public int setBytes(int index, ScatteringByteChannel in, int length)
1976 throws IOException {
1977 try {
1978 return in.read(internalNioBuffer(index, length));
1979 } catch (ClosedChannelException ignored) {
1980 return -1;
1981 }
1982 }
1983
1984 @Override
1985 public int setBytes(int index, FileChannel in, long position, int length)
1986 throws IOException {
1987 try {
1988 return in.read(internalNioBuffer(index, length), position);
1989 } catch (ClosedChannelException ignored) {
1990 return -1;
1991 }
1992 }
1993
1994 @Override
1995 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1996 return setCharSequence0(index, sequence, charset, false);
1997 }
1998
1999 private int setCharSequence0(int index, CharSequence sequence, Charset charset, boolean expand) {
2000 if (charset.equals(CharsetUtil.UTF_8)) {
2001 int length = ByteBufUtil.utf8MaxBytes(sequence);
2002 if (expand) {
2003 ensureWritable0(length);
2004 checkIndex0(index, length);
2005 } else {
2006 checkIndex(index, length);
2007 }
2008 return ByteBufUtil.writeUtf8(this, index, length, sequence, sequence.length());
2009 }
2010 if (charset.equals(CharsetUtil.US_ASCII) || charset.equals(CharsetUtil.ISO_8859_1)) {
2011 int length = sequence.length();
2012 if (expand) {
2013 ensureWritable0(length);
2014 checkIndex0(index, length);
2015 } else {
2016 checkIndex(index, length);
2017 }
2018 return ByteBufUtil.writeAscii(this, index, sequence, length);
2019 }
2020 byte[] bytes = sequence.toString().getBytes(charset);
2021 if (expand) {
2022 ensureWritable0(bytes.length);
2023
2024 }
2025 setBytes(index, bytes);
2026 return bytes.length;
2027 }
2028
2029 @Override
2030 public int writeCharSequence(CharSequence sequence, Charset charset) {
2031 int written = setCharSequence0(writerIndex, sequence, charset, true);
2032 writerIndex += written;
2033 return written;
2034 }
2035
2036 @Override
2037 public int forEachByte(int index, int length, ByteProcessor processor) {
2038 checkIndex(index, length);
2039 int ret = rootParent().forEachByte(idx(index), length, processor);
2040 return forEachResult(ret);
2041 }
2042
2043 @Override
2044 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
2045 checkIndex(index, length);
2046 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
2047 return forEachResult(ret);
2048 }
2049
2050 @Override
2051 public ByteBuf setZero(int index, int length) {
2052 checkIndex(index, length);
2053 rootParent().setZero(idx(index), length);
2054 return this;
2055 }
2056
2057 @Override
2058 public ByteBuf writeZero(int length) {
2059 ensureWritable(length);
2060 rootParent().setZero(idx(writerIndex), length);
2061 writerIndex += length;
2062 return this;
2063 }
2064
2065 private int forEachResult(int ret) {
2066 if (ret < startIndex) {
2067 return -1;
2068 }
2069 return ret - startIndex;
2070 }
2071
2072 @Override
2073 public boolean isContiguous() {
2074 return rootParent().isContiguous();
2075 }
2076
2077 private int idx(int index) {
2078 return index + startIndex;
2079 }
2080
2081 @Override
2082 protected void deallocate() {
2083 if (chunk != null) {
2084 chunk.releaseSegment(startIndex, maxFastCapacity);
2085 }
2086 tmpNioBuf = null;
2087 chunk = null;
2088 rootParent = null;
2089 handle.unguardedRecycle(this);
2090 }
2091 }
2092
2093
2094
2095
/**
 * Strategy for allocating {@link AbstractByteBuf} instances.
 * NOTE(review): implementations and call sites are not visible in this part of the file; presumably
 * the returned buffers become the backing memory of chunks (the chunk constructors take an
 * {@link AbstractByteBuf} delegate) - confirm against the callers.
 */
interface ChunkAllocator {
    /**
     * Allocates a buffer.
     *
     * @param initialCapacity presumably the capacity the returned buffer starts with - confirm
     * @param maxCapacity     presumably the maximum capacity the returned buffer may grow to - confirm
     * @return the allocated buffer
     */
    AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
}
2105 }