1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SystemPropertyUtil;
30 import io.netty.util.internal.ThreadExecutorMap;
31 import io.netty.util.internal.UnstableApi;
32
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.nio.ByteOrder;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.FileChannel;
40 import java.nio.channels.GatheringByteChannel;
41 import java.nio.channels.ScatteringByteChannel;
42 import java.util.Arrays;
43 import java.util.Queue;
44 import java.util.Set;
45 import java.util.concurrent.ConcurrentLinkedQueue;
46 import java.util.concurrent.CopyOnWriteArraySet;
47 import java.util.concurrent.ThreadLocalRandom;
48 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49 import java.util.concurrent.atomic.AtomicLong;
50 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
51 import java.util.concurrent.locks.StampedLock;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 @UnstableApi
80 final class AdaptivePoolingAllocator {
81
82 enum MagazineCaching {
83 EventLoopThreads,
84 FastThreadLocalThreads,
85 None
86 }
87
88
89
90
91
92
93
94
95 private static final int MIN_CHUNK_SIZE = 128 * 1024;
96 private static final int EXPANSION_ATTEMPTS = 3;
97 private static final int INITIAL_MAGAZINES = 4;
98 private static final int RETIRE_CAPACITY = 4 * 1024;
99 private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
100 private static final int BUFS_PER_CHUNK = 10;
101
102
103
104
105
106
107 private static final int MAX_CHUNK_SIZE =
108 BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);
109
110
111
112
113
114
115
116
117
118 private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
119 "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
120
121
122
123
124
125 private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
126 "io.netty.allocator.magazineBufferQueueCapacity", 1024);
127
128 private static final Object NO_MAGAZINE = Boolean.TRUE;
129
130 private final ChunkAllocator chunkAllocator;
131 private final Queue<Chunk> centralQueue;
132 private final StampedLock magazineExpandLock;
133 private volatile Magazine[] magazines;
134 private final FastThreadLocal<Object> threadLocalMagazine;
135 private final Set<Magazine> liveCachedMagazines;
136 private volatile boolean freed;
137
138 static {
139 if (CENTRAL_QUEUE_CAPACITY < 2) {
140 throw new IllegalArgumentException("CENTRAL_QUEUE_CAPACITY: " + CENTRAL_QUEUE_CAPACITY
141 + " (expected: >= " + 2 + ')');
142 }
143 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
144 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
145 + " (expected: >= " + 2 + ')');
146 }
147 }
148
149 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
150 ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
151 ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
152 this.chunkAllocator = chunkAllocator;
153 centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
154 magazineExpandLock = new StampedLock();
155 if (magazineCaching != MagazineCaching.None) {
156 assert magazineCaching == MagazineCaching.EventLoopThreads ||
157 magazineCaching == MagazineCaching.FastThreadLocalThreads;
158 final boolean cachedMagazinesNonEventLoopThreads =
159 magazineCaching == MagazineCaching.FastThreadLocalThreads;
160 final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
161 threadLocalMagazine = new FastThreadLocal<Object>() {
162 @Override
163 protected Object initialValue() {
164 if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
165 if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
166
167 return NO_MAGAZINE;
168 }
169 Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
170 liveMagazines.add(mag);
171 return mag;
172 }
173 return NO_MAGAZINE;
174 }
175
176 @Override
177 protected void onRemoval(final Object value) throws Exception {
178 if (value != NO_MAGAZINE) {
179 liveMagazines.remove(value);
180 }
181 }
182 };
183 liveCachedMagazines = liveMagazines;
184 } else {
185 threadLocalMagazine = null;
186 liveCachedMagazines = null;
187 }
188 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
189 for (int i = 0; i < mags.length; i++) {
190 mags[i] = new Magazine(this);
191 }
192 magazines = mags;
193 }
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215 private static Queue<Chunk> createSharedChunkQueue() {
216 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
217 }
218
219 ByteBuf allocate(int size, int maxCapacity) {
220 return allocate(size, maxCapacity, Thread.currentThread(), null);
221 }
222
223 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
224 if (size <= MAX_CHUNK_SIZE) {
225 int sizeBucket = AllocationStatistics.sizeBucket(size);
226 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
227 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
228 Object mag = threadLocalMagazine.get();
229 if (mag != NO_MAGAZINE) {
230 Magazine magazine = (Magazine) mag;
231 if (buf == null) {
232 buf = magazine.newBuffer();
233 }
234 boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
235 assert allocated : "Allocation of threadLocalMagazine must always succeed";
236 return buf;
237 }
238 }
239 long threadId = currentThread.getId();
240 Magazine[] mags;
241 int expansions = 0;
242 do {
243 mags = magazines;
244 int mask = mags.length - 1;
245 int index = (int) (threadId & mask);
246 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
247 Magazine mag = mags[index + i & mask];
248 if (buf == null) {
249 buf = mag.newBuffer();
250 }
251 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
252
253 return buf;
254 }
255 }
256 expansions++;
257 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
258 }
259
260
261 return allocateFallback(size, maxCapacity, currentThread, buf);
262 }
263
264 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
265
266 Magazine magazine;
267 if (buf != null) {
268 Chunk chunk = buf.chunk;
269 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
270 magazine = getFallbackMagazine(currentThread);
271 }
272 } else {
273 magazine = getFallbackMagazine(currentThread);
274 buf = magazine.newBuffer();
275 }
276
277 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
278 Chunk chunk = new Chunk(innerChunk, magazine, false);
279 try {
280 chunk.readInitInto(buf, size, maxCapacity);
281 } finally {
282
283
284
285 chunk.release();
286 }
287 return buf;
288 }
289
290 private Magazine getFallbackMagazine(Thread currentThread) {
291 Object tlMag;
292 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
293 if (threadLocalMagazine != null &&
294 currentThread instanceof FastThreadLocalThread &&
295 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
296 return (Magazine) tlMag;
297 }
298 Magazine[] mags = magazines;
299 return mags[(int) currentThread.getId() & mags.length - 1];
300 }
301
302
303
304
305 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
306 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
307 assert result == into: "Re-allocation created separate buffer instance";
308 }
309
310 long usedMemory() {
311 long sum = 0;
312 for (Chunk chunk : centralQueue) {
313 sum += chunk.capacity();
314 }
315 for (Magazine magazine : magazines) {
316 sum += magazine.usedMemory.get();
317 }
318 if (liveCachedMagazines != null) {
319 for (Magazine magazine : liveCachedMagazines) {
320 sum += magazine.usedMemory.get();
321 }
322 }
323 return sum;
324 }
325
326 private boolean tryExpandMagazines(int currentLength) {
327 if (currentLength >= MAX_STRIPES) {
328 return true;
329 }
330 final Magazine[] mags;
331 long writeLock = magazineExpandLock.tryWriteLock();
332 if (writeLock != 0) {
333 try {
334 mags = magazines;
335 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
336 return true;
337 }
338 int preferredChunkSize = mags[0].sharedPrefChunkSize;
339 Magazine[] expanded = new Magazine[mags.length * 2];
340 for (int i = 0, l = expanded.length; i < l; i++) {
341 Magazine m = new Magazine(this);
342 m.localPrefChunkSize = preferredChunkSize;
343 m.sharedPrefChunkSize = preferredChunkSize;
344 expanded[i] = m;
345 }
346 magazines = expanded;
347 } finally {
348 magazineExpandLock.unlockWrite(writeLock);
349 }
350 for (Magazine magazine : mags) {
351 magazine.free();
352 }
353 }
354 return true;
355 }
356
357 private boolean offerToQueue(Chunk buffer) {
358 if (freed) {
359 return false;
360 }
361
362 assert buffer.allocatedBytes == 0;
363 assert buffer.magazine == null;
364
365 boolean isAdded = centralQueue.offer(buffer);
366 if (freed && isAdded) {
367
368 freeCentralQueue();
369 }
370 return isAdded;
371 }
372
373
374
375
376 @Override
377 protected void finalize() throws Throwable {
378 try {
379 super.finalize();
380 } finally {
381 free();
382 }
383 }
384
385 private void free() {
386 freed = true;
387 long stamp = magazineExpandLock.writeLock();
388 try {
389 Magazine[] mags = magazines;
390 for (Magazine magazine : mags) {
391 magazine.free();
392 }
393 } finally {
394 magazineExpandLock.unlockWrite(stamp);
395 }
396 freeCentralQueue();
397 }
398
399 private void freeCentralQueue() {
400 for (;;) {
401 Chunk chunk = centralQueue.poll();
402 if (chunk == null) {
403 break;
404 }
405 chunk.release();
406 }
407 }
408
409 static int sizeBucket(int size) {
410 return AllocationStatistics.sizeBucket(size);
411 }
412
413 @SuppressWarnings("checkstyle:finalclass")
414 private static class AllocationStatistics {
415 private static final int MIN_DATUM_TARGET = 1024;
416 private static final int MAX_DATUM_TARGET = 65534;
417 private static final int INIT_DATUM_TARGET = 9;
418 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
419 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
420 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
421 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
422 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
423
424 protected final AdaptivePoolingAllocator parent;
425 private final boolean shareable;
426 private final short[][] histos = {
427 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
428 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
429 };
430 private short[] histo = histos[0];
431 private final int[] sums = new int[HISTO_BUCKET_COUNT];
432
433 private int histoIndex;
434 private int datumCount;
435 private int datumTarget = INIT_DATUM_TARGET;
436 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
437 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
438
439 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
440 this.parent = parent;
441 this.shareable = shareable;
442 }
443
444 protected void recordAllocationSize(int bucket) {
445 histo[bucket]++;
446 if (datumCount++ == datumTarget) {
447 rotateHistograms();
448 }
449 }
450
451 static int sizeBucket(int size) {
452 if (size == 0) {
453 return 0;
454 }
455
456
457
458
459 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
460 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
461 }
462
463 private void rotateHistograms() {
464 short[][] hs = histos;
465 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
466 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
467 }
468 int sum = 0;
469 for (int count : sums) {
470 sum += count;
471 }
472 int targetPercentile = (int) (sum * 0.99);
473 int sizeBucket = 0;
474 for (; sizeBucket < sums.length; sizeBucket++) {
475 if (sums[sizeBucket] > targetPercentile) {
476 break;
477 }
478 targetPercentile -= sums[sizeBucket];
479 }
480 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
481 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
482 localPrefChunkSize = prefChunkSize;
483 if (shareable) {
484 for (Magazine mag : parent.magazines) {
485 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
486 }
487 }
488 if (sharedPrefChunkSize != prefChunkSize) {
489
490 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
491 sharedPrefChunkSize = prefChunkSize;
492 } else {
493
494 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
495 }
496
497 histoIndex = histoIndex + 1 & 3;
498 histo = histos[histoIndex];
499 datumCount = 0;
500 Arrays.fill(histo, (short) 0);
501 }
502
503
504
505
506
507
508
509
510
511 protected int preferredChunkSize() {
512 return sharedPrefChunkSize;
513 }
514 }
515
516 private static final class Magazine extends AllocationStatistics {
517 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
518 static {
519 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
520 }
521 private static final Chunk MAGAZINE_FREED = new Chunk();
522
523 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
524 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
525 @Override
526 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
527 return new AdaptiveByteBuf(handle);
528 }
529 });
530
531 private Chunk current;
532 @SuppressWarnings("unused")
533 private volatile Chunk nextInLine;
534 private final AtomicLong usedMemory;
535 private final StampedLock allocationLock;
536 private final Queue<AdaptiveByteBuf> bufferQueue;
537 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
538
539 Magazine(AdaptivePoolingAllocator parent) {
540 this(parent, true);
541 }
542
543 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
544 super(parent, shareable);
545
546 if (shareable) {
547
548 allocationLock = new StampedLock();
549 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
550 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
551 @Override
552 public void recycle(AdaptiveByteBuf self) {
553 bufferQueue.offer(self);
554 }
555 };
556 } else {
557 allocationLock = null;
558 bufferQueue = null;
559 handle = null;
560 }
561 usedMemory = new AtomicLong();
562 }
563
564 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
565 if (allocationLock == null) {
566
567 return allocate(size, sizeBucket, maxCapacity, buf);
568 }
569
570
571 long writeLock = allocationLock.tryWriteLock();
572 if (writeLock != 0) {
573 try {
574 return allocate(size, sizeBucket, maxCapacity, buf);
575 } finally {
576 allocationLock.unlockWrite(writeLock);
577 }
578 }
579 return allocateWithoutLock(size, maxCapacity, buf);
580 }
581
582 private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
583 Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
584 if (curr == MAGAZINE_FREED) {
585
586 restoreMagazineFreed();
587 return false;
588 }
589 if (curr == null) {
590 curr = parent.centralQueue.poll();
591 if (curr == null) {
592 return false;
593 }
594 curr.attachToMagazine(this);
595 }
596 if (curr.remainingCapacity() >= size) {
597 curr.readInitInto(buf, size, maxCapacity);
598 }
599 try {
600 if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
601 transferToNextInLineOrRelease(curr);
602 curr = null;
603 }
604 } finally {
605 if (curr != null) {
606 curr.release();
607 }
608 }
609 return true;
610 }
611
612 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
613 recordAllocationSize(sizeBucket);
614 Chunk curr = current;
615 if (curr != null) {
616
617 if (curr.remainingCapacity() > size) {
618 curr.readInitInto(buf, size, maxCapacity);
619
620 return true;
621 }
622
623
624
625 current = null;
626 if (curr.remainingCapacity() == size) {
627 try {
628 curr.readInitInto(buf, size, maxCapacity);
629 return true;
630 } finally {
631 curr.release();
632 }
633 }
634
635
636 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
637 curr.release();
638 } else {
639
640
641 transferToNextInLineOrRelease(curr);
642 }
643 }
644
645 assert current == null;
646
647
648
649
650
651
652
653 curr = NEXT_IN_LINE.getAndSet(this, null);
654 if (curr != null) {
655 if (curr == MAGAZINE_FREED) {
656
657 restoreMagazineFreed();
658 return false;
659 }
660
661 if (curr.remainingCapacity() > size) {
662
663 curr.readInitInto(buf, size, maxCapacity);
664 current = curr;
665 return true;
666 }
667
668 if (curr.remainingCapacity() == size) {
669
670
671 try {
672 curr.readInitInto(buf, size, maxCapacity);
673 return true;
674 } finally {
675
676
677 curr.release();
678 }
679 } else {
680
681 curr.release();
682 }
683 }
684
685
686 curr = parent.centralQueue.poll();
687 if (curr == null) {
688 curr = newChunkAllocation(size);
689 } else {
690 curr.attachToMagazine(this);
691
692 if (curr.remainingCapacity() < size) {
693
694 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
695 curr.release();
696 } else {
697
698
699 transferToNextInLineOrRelease(curr);
700 }
701 curr = newChunkAllocation(size);
702 }
703 }
704
705 current = curr;
706 try {
707 assert current.remainingCapacity() >= size;
708 if (curr.remainingCapacity() > size) {
709 curr.readInitInto(buf, size, maxCapacity);
710 curr = null;
711 } else {
712 curr.readInitInto(buf, size, maxCapacity);
713 }
714 } finally {
715 if (curr != null) {
716
717
718 curr.release();
719 current = null;
720 }
721 }
722 return true;
723 }
724
725 private void restoreMagazineFreed() {
726 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
727 if (next != null && next != MAGAZINE_FREED) {
728 next.release();
729 }
730 }
731
732 private void transferToNextInLineOrRelease(Chunk chunk) {
733 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
734 return;
735 }
736
737 Chunk nextChunk = NEXT_IN_LINE.get(this);
738 if (nextChunk != null && nextChunk != MAGAZINE_FREED
739 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
740 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
741 nextChunk.release();
742 return;
743 }
744 }
745
746
747
748
749 chunk.release();
750 }
751
752 private Chunk newChunkAllocation(int promptingSize) {
753 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
754 int minChunks = size / MIN_CHUNK_SIZE;
755 if (MIN_CHUNK_SIZE * minChunks < size) {
756
757
758
759
760 size = MIN_CHUNK_SIZE * (1 + minChunks);
761 }
762 ChunkAllocator chunkAllocator = parent.chunkAllocator;
763 return new Chunk(chunkAllocator.allocate(size, size), this, true);
764 }
765
766 boolean trySetNextInLine(Chunk chunk) {
767 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
768 }
769
770 void free() {
771
772 restoreMagazineFreed();
773 long stamp = allocationLock.writeLock();
774 try {
775 if (current != null) {
776 current.release();
777 current = null;
778 }
779 } finally {
780 allocationLock.unlockWrite(stamp);
781 }
782 }
783
784 public AdaptiveByteBuf newBuffer() {
785 AdaptiveByteBuf buf;
786 if (handle == null) {
787 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
788 } else {
789 buf = bufferQueue.poll();
790 if (buf == null) {
791 buf = new AdaptiveByteBuf(handle);
792 }
793 }
794 buf.resetRefCnt();
795 buf.discardMarks();
796 return buf;
797 }
798 }
799
800 private static final class Chunk implements ReferenceCounted {
801
802 private final AbstractByteBuf delegate;
803 private Magazine magazine;
804 private final AdaptivePoolingAllocator allocator;
805 private final int capacity;
806 private final boolean pooled;
807 private int allocatedBytes;
808 private static final long REFCNT_FIELD_OFFSET =
809 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
810 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
811 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
812
813 private static final ReferenceCountUpdater<Chunk> updater =
814 new ReferenceCountUpdater<Chunk>() {
815 @Override
816 protected AtomicIntegerFieldUpdater<Chunk> updater() {
817 return AIF_UPDATER;
818 }
819 @Override
820 protected long unsafeOffset() {
821 return REFCNT_FIELD_OFFSET;
822 }
823 };
824
825
826 @SuppressWarnings({"unused", "FieldMayBeFinal"})
827 private volatile int refCnt;
828
829 Chunk() {
830
831 delegate = null;
832 magazine = null;
833 allocator = null;
834 capacity = 0;
835 pooled = false;
836 }
837
838 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
839 this.delegate = delegate;
840 this.pooled = pooled;
841 capacity = delegate.capacity();
842 updater.setInitialValue(this);
843 allocator = magazine.parent;
844 attachToMagazine(magazine);
845 }
846
847 Magazine currentMagazine() {
848 return magazine;
849 }
850
851 void detachFromMagazine() {
852 if (magazine != null) {
853 magazine.usedMemory.getAndAdd(-capacity);
854 magazine = null;
855 }
856 }
857
858 void attachToMagazine(Magazine magazine) {
859 assert this.magazine == null;
860 this.magazine = magazine;
861 magazine.usedMemory.getAndAdd(capacity);
862 }
863
864 @Override
865 public Chunk touch(Object hint) {
866 return this;
867 }
868
869 @Override
870 public int refCnt() {
871 return updater.refCnt(this);
872 }
873
874 @Override
875 public Chunk retain() {
876 return updater.retain(this);
877 }
878
879 @Override
880 public Chunk retain(int increment) {
881 return updater.retain(this, increment);
882 }
883
884 @Override
885 public Chunk touch() {
886 return this;
887 }
888
889 @Override
890 public boolean release() {
891 if (updater.release(this)) {
892 deallocate();
893 return true;
894 }
895 return false;
896 }
897
898 @Override
899 public boolean release(int decrement) {
900 if (updater.release(this, decrement)) {
901 deallocate();
902 return true;
903 }
904 return false;
905 }
906
907 private void deallocate() {
908 Magazine mag = magazine;
909 AdaptivePoolingAllocator parent = mag.parent;
910 int chunkSize = mag.preferredChunkSize();
911 int memSize = delegate.capacity();
912 if (!pooled || shouldReleaseSuboptimalChunkSuze(memSize, chunkSize)) {
913
914
915 detachFromMagazine();
916 delegate.release();
917 } else {
918 updater.resetRefCnt(this);
919 delegate.setIndex(0, 0);
920 allocatedBytes = 0;
921 if (!mag.trySetNextInLine(this)) {
922
923 detachFromMagazine();
924 if (!parent.offerToQueue(this)) {
925
926
927 boolean released = updater.release(this);
928 delegate.release();
929 assert released;
930 }
931 }
932 }
933 }
934
935 private static boolean shouldReleaseSuboptimalChunkSuze(int givenSize, int preferredSize) {
936 int givenChunks = givenSize / MIN_CHUNK_SIZE;
937 int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
938 int deviation = Math.abs(givenChunks - preferredChunks);
939
940
941 return deviation != 0 &&
942 ThreadLocalRandom.current().nextDouble() * 200.0 > deviation;
943 }
944
945 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
946 int startIndex = allocatedBytes;
947 allocatedBytes = startIndex + size;
948 Chunk chunk = this;
949 chunk.retain();
950 try {
951 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
952 chunk = null;
953 } finally {
954 if (chunk != null) {
955
956
957
958 allocatedBytes = startIndex;
959 chunk.release();
960 }
961 }
962 }
963
964 public int remainingCapacity() {
965 return capacity - allocatedBytes;
966 }
967
968 public int capacity() {
969 return capacity;
970 }
971 }
972
973 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
974
975 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
976
977 private int adjustment;
978 private AbstractByteBuf rootParent;
979 Chunk chunk;
980 private int length;
981 private ByteBuffer tmpNioBuf;
982 private boolean hasArray;
983 private boolean hasMemoryAddress;
984
985 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
986 super(0);
987 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
988 }
989
990 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
991 int adjustment, int capacity, int maxCapacity) {
992 this.adjustment = adjustment;
993 chunk = wrapped;
994 length = capacity;
995 maxCapacity(maxCapacity);
996 setIndex0(readerIndex, writerIndex);
997 hasArray = unwrapped.hasArray();
998 hasMemoryAddress = unwrapped.hasMemoryAddress();
999 rootParent = unwrapped;
1000 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
1001 }
1002
1003 private AbstractByteBuf rootParent() {
1004 final AbstractByteBuf rootParent = this.rootParent;
1005 if (rootParent != null) {
1006 return rootParent;
1007 }
1008 throw new IllegalReferenceCountException();
1009 }
1010
1011 @Override
1012 public int capacity() {
1013 return length;
1014 }
1015
1016 @Override
1017 public ByteBuf capacity(int newCapacity) {
1018 if (newCapacity == capacity()) {
1019 ensureAccessible();
1020 return this;
1021 }
1022 checkNewCapacity(newCapacity);
1023 if (newCapacity < capacity()) {
1024 length = newCapacity;
1025 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1026 return this;
1027 }
1028
1029
1030 ByteBuffer data = tmpNioBuf;
1031 data.clear();
1032 tmpNioBuf = null;
1033 Chunk chunk = this.chunk;
1034 AdaptivePoolingAllocator allocator = chunk.allocator;
1035 int readerIndex = this.readerIndex;
1036 int writerIndex = this.writerIndex;
1037 allocator.allocate(newCapacity, maxCapacity(), this);
1038 tmpNioBuf.put(data);
1039 tmpNioBuf.clear();
1040 chunk.release();
1041 this.readerIndex = readerIndex;
1042 this.writerIndex = writerIndex;
1043 return this;
1044 }
1045
1046 @Override
1047 public ByteBufAllocator alloc() {
1048 return rootParent().alloc();
1049 }
1050
1051 @Override
1052 public ByteOrder order() {
1053 return rootParent().order();
1054 }
1055
1056 @Override
1057 public ByteBuf unwrap() {
1058 return null;
1059 }
1060
1061 @Override
1062 public boolean isDirect() {
1063 return rootParent().isDirect();
1064 }
1065
1066 @Override
1067 public int arrayOffset() {
1068 return idx(rootParent().arrayOffset());
1069 }
1070
1071 @Override
1072 public boolean hasMemoryAddress() {
1073 return hasMemoryAddress;
1074 }
1075
1076 @Override
1077 public long memoryAddress() {
1078 ensureAccessible();
1079 return rootParent().memoryAddress() + adjustment;
1080 }
1081
1082 @Override
1083 public ByteBuffer nioBuffer(int index, int length) {
1084 checkIndex(index, length);
1085 return rootParent().nioBuffer(idx(index), length);
1086 }
1087
1088 @Override
1089 public ByteBuffer internalNioBuffer(int index, int length) {
1090 checkIndex(index, length);
1091 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1092 }
1093
1094 private ByteBuffer internalNioBuffer() {
1095 return (ByteBuffer) tmpNioBuf.clear();
1096 }
1097
1098 @Override
1099 public ByteBuffer[] nioBuffers(int index, int length) {
1100 checkIndex(index, length);
1101 return rootParent().nioBuffers(idx(index), length);
1102 }
1103
1104 @Override
1105 public boolean hasArray() {
1106 return hasArray;
1107 }
1108
1109 @Override
1110 public byte[] array() {
1111 ensureAccessible();
1112 return rootParent().array();
1113 }
1114
1115 @Override
1116 public ByteBuf copy(int index, int length) {
1117 checkIndex(index, length);
1118 return rootParent().copy(idx(index), length);
1119 }
1120
1121 @Override
1122 public int nioBufferCount() {
1123 return rootParent().nioBufferCount();
1124 }
1125
1126 @Override
1127 protected byte _getByte(int index) {
1128 return rootParent()._getByte(idx(index));
1129 }
1130
1131 @Override
1132 protected short _getShort(int index) {
1133 return rootParent()._getShort(idx(index));
1134 }
1135
1136 @Override
1137 protected short _getShortLE(int index) {
1138 return rootParent()._getShortLE(idx(index));
1139 }
1140
1141 @Override
1142 protected int _getUnsignedMedium(int index) {
1143 return rootParent()._getUnsignedMedium(idx(index));
1144 }
1145
1146 @Override
1147 protected int _getUnsignedMediumLE(int index) {
1148 return rootParent()._getUnsignedMediumLE(idx(index));
1149 }
1150
1151 @Override
1152 protected int _getInt(int index) {
1153 return rootParent()._getInt(idx(index));
1154 }
1155
1156 @Override
1157 protected int _getIntLE(int index) {
1158 return rootParent()._getIntLE(idx(index));
1159 }
1160
1161 @Override
1162 protected long _getLong(int index) {
1163 return rootParent()._getLong(idx(index));
1164 }
1165
1166 @Override
1167 protected long _getLongLE(int index) {
1168 return rootParent()._getLongLE(idx(index));
1169 }
1170
1171 @Override
1172 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1173 checkIndex(index, length);
1174 rootParent().getBytes(idx(index), dst, dstIndex, length);
1175 return this;
1176 }
1177
1178 @Override
1179 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1180 checkIndex(index, length);
1181 rootParent().getBytes(idx(index), dst, dstIndex, length);
1182 return this;
1183 }
1184
1185 @Override
1186 public ByteBuf getBytes(int index, ByteBuffer dst) {
1187 checkIndex(index, dst.remaining());
1188 rootParent().getBytes(idx(index), dst);
1189 return this;
1190 }
1191
1192 @Override
1193 protected void _setByte(int index, int value) {
1194 rootParent()._setByte(idx(index), value);
1195 }
1196
1197 @Override
1198 protected void _setShort(int index, int value) {
1199 rootParent()._setShort(idx(index), value);
1200 }
1201
1202 @Override
1203 protected void _setShortLE(int index, int value) {
1204 rootParent()._setShortLE(idx(index), value);
1205 }
1206
1207 @Override
1208 protected void _setMedium(int index, int value) {
1209 rootParent()._setMedium(idx(index), value);
1210 }
1211
1212 @Override
1213 protected void _setMediumLE(int index, int value) {
1214 rootParent()._setMediumLE(idx(index), value);
1215 }
1216
1217 @Override
1218 protected void _setInt(int index, int value) {
1219 rootParent()._setInt(idx(index), value);
1220 }
1221
1222 @Override
1223 protected void _setIntLE(int index, int value) {
1224 rootParent()._setIntLE(idx(index), value);
1225 }
1226
1227 @Override
1228 protected void _setLong(int index, long value) {
1229 rootParent()._setLong(idx(index), value);
1230 }
1231
1232 @Override
1233 protected void _setLongLE(int index, long value) {
1234 rootParent().setLongLE(idx(index), value);
1235 }
1236
1237 @Override
1238 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1239 checkIndex(index, length);
1240 rootParent().setBytes(idx(index), src, srcIndex, length);
1241 return this;
1242 }
1243
1244 @Override
1245 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1246 checkIndex(index, length);
1247 rootParent().setBytes(idx(index), src, srcIndex, length);
1248 return this;
1249 }
1250
1251 @Override
1252 public ByteBuf setBytes(int index, ByteBuffer src) {
1253 checkIndex(index, src.remaining());
1254 rootParent().setBytes(idx(index), src);
1255 return this;
1256 }
1257
1258 @Override
1259 public ByteBuf getBytes(int index, OutputStream out, int length)
1260 throws IOException {
1261 checkIndex(index, length);
1262 if (length != 0) {
1263 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1264 }
1265 return this;
1266 }
1267
1268 @Override
1269 public int getBytes(int index, GatheringByteChannel out, int length)
1270 throws IOException {
1271 return out.write(internalNioBuffer(index, length).duplicate());
1272 }
1273
1274 @Override
1275 public int getBytes(int index, FileChannel out, long position, int length)
1276 throws IOException {
1277 return out.write(internalNioBuffer(index, length).duplicate(), position);
1278 }
1279
1280 @Override
1281 public int setBytes(int index, InputStream in, int length)
1282 throws IOException {
1283 checkIndex(index, length);
1284 final AbstractByteBuf rootParent = rootParent();
1285 if (rootParent.hasArray()) {
1286 return rootParent.setBytes(idx(index), in, length);
1287 }
1288 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1289 int readBytes = in.read(tmp, 0, length);
1290 if (readBytes <= 0) {
1291 return readBytes;
1292 }
1293 setBytes(index, tmp, 0, readBytes);
1294 return readBytes;
1295 }
1296
1297 @Override
1298 public int setBytes(int index, ScatteringByteChannel in, int length)
1299 throws IOException {
1300 try {
1301 return in.read(internalNioBuffer(index, length).duplicate());
1302 } catch (ClosedChannelException ignored) {
1303 return -1;
1304 }
1305 }
1306
1307 @Override
1308 public int setBytes(int index, FileChannel in, long position, int length)
1309 throws IOException {
1310 try {
1311 return in.read(internalNioBuffer(index, length).duplicate(), position);
1312 } catch (ClosedChannelException ignored) {
1313 return -1;
1314 }
1315 }
1316
1317 @Override
1318 public int forEachByte(int index, int length, ByteProcessor processor) {
1319 checkIndex(index, length);
1320 int ret = rootParent().forEachByte(idx(index), length, processor);
1321 return forEachResult(ret);
1322 }
1323
1324 @Override
1325 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1326 checkIndex(index, length);
1327 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1328 return forEachResult(ret);
1329 }
1330
1331 private int forEachResult(int ret) {
1332 if (ret < adjustment) {
1333 return -1;
1334 }
1335 return ret - adjustment;
1336 }
1337
1338 @Override
1339 public boolean isContiguous() {
1340 return rootParent().isContiguous();
1341 }
1342
1343 private int idx(int index) {
1344 return index + adjustment;
1345 }
1346
1347 @Override
1348 protected void deallocate() {
1349 if (chunk != null) {
1350 chunk.release();
1351 }
1352 tmpNioBuf = null;
1353 chunk = null;
1354 rootParent = null;
1355 if (handle instanceof EnhancedHandle) {
1356 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1357 enhancedHandle.unguardedRecycle(this);
1358 } else {
1359 handle.recycle(this);
1360 }
1361 }
1362 }
1363
1364
1365
1366
1367 interface ChunkAllocator {
1368
1369
1370
1371
1372
1373
1374 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1375 }
1376 }