1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SystemPropertyUtil;
30 import io.netty.util.internal.ThreadExecutorMap;
31 import io.netty.util.internal.UnstableApi;
32
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.nio.ByteOrder;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.FileChannel;
40 import java.nio.channels.GatheringByteChannel;
41 import java.nio.channels.ScatteringByteChannel;
42 import java.nio.charset.Charset;
43 import java.util.Arrays;
44 import java.util.Queue;
45 import java.util.Set;
46 import java.util.concurrent.ConcurrentLinkedQueue;
47 import java.util.concurrent.CopyOnWriteArraySet;
48 import java.util.concurrent.ThreadLocalRandom;
49 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
50 import java.util.concurrent.atomic.AtomicLong;
51 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
52 import java.util.concurrent.locks.StampedLock;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 @UnstableApi
81 final class AdaptivePoolingAllocator {
82
/**
 * Selects which threads, if any, are given their own thread-local {@link Magazine}
 * by the allocator's constructor.
 */
enum MagazineCaching {
    /** A thread-local magazine is only created when {@code ThreadExecutorMap.currentExecutor()} is non-null. */
    EventLoopThreads,
    /** A thread-local magazine is also created when the thread has no current executor registered. */
    FastThreadLocalThreads,
    /** No thread-local magazines are created; only the shared magazine array is used. */
    None
}
88
89
90
91
92
93
94
95
// Smallest chunk size in bytes (128 KiB); also the rounding unit used when sizing new chunks.
private static final int MIN_CHUNK_SIZE = 128 * 1024;
// Upper bound on the number of magazine-array expansions a single allocate(...) call will request.
private static final int EXPANSION_ATTEMPTS = 3;
// Number of shared magazines created by the constructor.
private static final int INITIAL_MAGAZINES = 4;
// Chunks with fewer remaining bytes than this are released instead of being kept as next-in-line.
private static final int RETIRE_CAPACITY = 256;
// The shared magazine array is not expanded once its length reaches this value.
private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
// Multiplier applied to a buffer size when deriving a chunk size from it.
private static final int BUFS_PER_CHUNK = 8;

// Largest chunk size in bytes: 1 << 23 (8 MiB), tied to the largest histogram bucket shift.
private static final int MAX_CHUNK_SIZE = 1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT;
// Requests larger than this bypass the magazines and are served from a one-off, unpooled chunk.
private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;

// Capacity of the central chunk queue. Read from the system property below, defaulting to the
// number of available processors, and never smaller than 2.
private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));

// Capacity of the queue each shareable magazine uses to hold recycled AdaptiveByteBuf instances.
// Read from the system property below (default 1024); the static initializer rejects values < 2.
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

// Sentinel stored in the thread-local to signal that the thread has no dedicated magazine.
private static final Object NO_MAGAZINE = Boolean.TRUE;

// Source of the backing memory for every chunk.
private final ChunkAllocator chunkAllocator;
// Chunks that are not attached to any magazine and can be picked up by any of them.
private final Queue<Chunk> centralQueue;
// Guards replacement of the shared magazine array (expansion and free()).
private final StampedLock magazineExpandLock;
// Shared magazines; the array length is always doubled on expansion, so it stays a power of two.
private volatile Magazine[] magazines;
// Holds either a thread's private Magazine or NO_MAGAZINE; null when caching is disabled.
private final FastThreadLocal<Object> threadLocalMagazine;
// Thread-local magazines that are currently alive, used by usedMemory(); null when caching is disabled.
private final Set<Magazine> liveCachedMagazines;
// Set once free() has run; stops magazine expansion and offers to the central queue.
private volatile boolean freed;
138
139 static {
140 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
141 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
142 + " (expected: >= " + 2 + ')');
143 }
144 }
145
/**
 * Creates the allocator with {@code INITIAL_MAGAZINES} shared magazines and, unless
 * {@code magazineCaching} is {@link MagazineCaching#None}, a thread-local that lazily hands
 * out private magazines to eligible threads.
 *
 * @param chunkAllocator  source of backing memory for chunks; must not be {@code null}
 * @param magazineCaching which threads receive a private magazine; must not be {@code null}
 */
AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
    ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
    ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
    this.chunkAllocator = chunkAllocator;
    centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
    magazineExpandLock = new StampedLock();

    if (magazineCaching == MagazineCaching.None) {
        threadLocalMagazine = null;
        liveCachedMagazines = null;
    } else {
        assert magazineCaching == MagazineCaching.EventLoopThreads ||
                magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final boolean cacheWithoutExecutor = magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final Set<Magazine> trackedMagazines = new CopyOnWriteArraySet<Magazine>();
        threadLocalMagazine = new FastThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                final boolean eligible = cacheWithoutExecutor || ThreadExecutorMap.currentExecutor() != null;
                if (!eligible) {
                    return NO_MAGAZINE;
                }
                // Without guaranteed thread-local cleanup, onRemoval(...) may never run, so skip caching.
                if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
                    return NO_MAGAZINE;
                }
                final Magazine created = new Magazine(AdaptivePoolingAllocator.this, false);
                trackedMagazines.add(created);
                return created;
            }

            @Override
            protected void onRemoval(final Object value) throws Exception {
                if (value == NO_MAGAZINE) {
                    return;
                }
                trackedMagazines.remove(value);
            }
        };
        liveCachedMagazines = trackedMagazines;
    }

    final Magazine[] shared = new Magazine[INITIAL_MAGAZINES];
    int slot = 0;
    while (slot < shared.length) {
        shared[slot] = new Magazine(this);
        slot++;
    }
    magazines = shared;
}
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212 private static Queue<Chunk> createSharedChunkQueue() {
213 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
214 }
215
216 ByteBuf allocate(int size, int maxCapacity) {
217 return allocate(size, maxCapacity, Thread.currentThread(), null);
218 }
219
220 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
221 if (size <= MAX_POOLED_BUF_SIZE) {
222 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
223 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
224 Object mag = threadLocalMagazine.get();
225 if (mag != NO_MAGAZINE) {
226 Magazine magazine = (Magazine) mag;
227 if (buf == null) {
228 buf = magazine.newBuffer();
229 }
230 boolean allocated = magazine.tryAllocate(size, maxCapacity, buf);
231 assert allocated : "Allocation of threadLocalMagazine must always succeed";
232 return buf;
233 }
234 }
235 long threadId = currentThread.getId();
236 Magazine[] mags;
237 int expansions = 0;
238 do {
239 mags = magazines;
240 int mask = mags.length - 1;
241 int index = (int) (threadId & mask);
242 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
243 Magazine mag = mags[index + i & mask];
244 if (buf == null) {
245 buf = mag.newBuffer();
246 }
247 if (mag.tryAllocate(size, maxCapacity, buf)) {
248
249 return buf;
250 }
251 }
252 expansions++;
253 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
254 }
255
256
257 return allocateFallback(size, maxCapacity, currentThread, buf);
258 }
259
260 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
261
262 Magazine magazine;
263 if (buf != null) {
264 Chunk chunk = buf.chunk;
265 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
266 magazine = getFallbackMagazine(currentThread);
267 }
268 } else {
269 magazine = getFallbackMagazine(currentThread);
270 buf = magazine.newBuffer();
271 }
272
273 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
274 Chunk chunk = new Chunk(innerChunk, magazine, false);
275 try {
276 chunk.readInitInto(buf, size, size, maxCapacity);
277 } finally {
278
279
280
281 chunk.release();
282 }
283 return buf;
284 }
285
286 private Magazine getFallbackMagazine(Thread currentThread) {
287 Object tlMag;
288 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
289 if (threadLocalMagazine != null &&
290 currentThread instanceof FastThreadLocalThread &&
291 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
292 return (Magazine) tlMag;
293 }
294 Magazine[] mags = magazines;
295 return mags[(int) currentThread.getId() & mags.length - 1];
296 }
297
298
299
300
301 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
302 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
303 assert result == into: "Re-allocation created separate buffer instance";
304 }
305
306 long usedMemory() {
307 long sum = 0;
308 for (Chunk chunk : centralQueue) {
309 sum += chunk.capacity();
310 }
311 for (Magazine magazine : magazines) {
312 sum += magazine.usedMemory.get();
313 }
314 if (liveCachedMagazines != null) {
315 for (Magazine magazine : liveCachedMagazines) {
316 sum += magazine.usedMemory.get();
317 }
318 }
319 return sum;
320 }
321
322 private boolean tryExpandMagazines(int currentLength) {
323 if (currentLength >= MAX_STRIPES) {
324 return true;
325 }
326 final Magazine[] mags;
327 long writeLock = magazineExpandLock.tryWriteLock();
328 if (writeLock != 0) {
329 try {
330 mags = magazines;
331 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
332 return true;
333 }
334 int preferredChunkSize = mags[0].sharedPrefChunkSize;
335 Magazine[] expanded = new Magazine[mags.length * 2];
336 for (int i = 0, l = expanded.length; i < l; i++) {
337 Magazine m = new Magazine(this);
338 m.localPrefChunkSize = preferredChunkSize;
339 m.sharedPrefChunkSize = preferredChunkSize;
340 expanded[i] = m;
341 }
342 magazines = expanded;
343 } finally {
344 magazineExpandLock.unlockWrite(writeLock);
345 }
346 for (Magazine magazine : mags) {
347 magazine.free();
348 }
349 }
350 return true;
351 }
352
353 boolean offerToQueue(Chunk buffer) {
354 if (freed) {
355 return false;
356 }
357
358 assert buffer.allocatedBytes == 0;
359 assert buffer.magazine == null;
360
361 boolean isAdded = centralQueue.offer(buffer);
362 if (freed && isAdded) {
363
364 freeCentralQueue();
365 }
366 return isAdded;
367 }
368
369
370
371
/**
 * Releases the allocator's pooled memory when the allocator is finalized.
 * {@code free()} runs even if {@code super.finalize()} throws.
 */
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        free();
    }
}
380
381 private void free() {
382 freed = true;
383 long stamp = magazineExpandLock.writeLock();
384 try {
385 Magazine[] mags = magazines;
386 for (Magazine magazine : mags) {
387 magazine.free();
388 }
389 } finally {
390 magazineExpandLock.unlockWrite(stamp);
391 }
392 freeCentralQueue();
393 }
394
395 private void freeCentralQueue() {
396 for (;;) {
397 Chunk chunk = centralQueue.poll();
398 if (chunk == null) {
399 break;
400 }
401 chunk.release();
402 }
403 }
404
405 static int sizeBucket(int size) {
406 return AllocationStatistics.sizeBucket(size);
407 }
408
409 @SuppressWarnings("checkstyle:finalclass")
410 private static class AllocationStatistics {
411 private static final int MIN_DATUM_TARGET = 1024;
412 private static final int MAX_DATUM_TARGET = 65534;
413 private static final int INIT_DATUM_TARGET = 9;
414 private static final int HISTO_MIN_BUCKET_SHIFT = 8;
415 private static final int HISTO_MAX_BUCKET_SHIFT = 23;
416 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
417 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
418 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
419
420 protected final AdaptivePoolingAllocator parent;
421 private final boolean shareable;
422 private final short[][] histos = {
423 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
424 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
425 };
426 private short[] histo = histos[0];
427 private final int[] sums = new int[HISTO_BUCKET_COUNT];
428
429 private int histoIndex;
430 private int datumCount;
431 private int datumTarget = INIT_DATUM_TARGET;
432 protected boolean hasHadRotation;
433 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
434 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
435 protected volatile int localUpperBufSize;
436
437 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
438 this.parent = parent;
439 this.shareable = shareable;
440 }
441
442 protected void recordAllocationSize(int bufferSizeToRecord) {
443
444
445
446 if (bufferSizeToRecord == 0) {
447 return;
448 }
449 int bucket = sizeBucket(bufferSizeToRecord);
450 histo[bucket]++;
451 if (datumCount++ == datumTarget) {
452 rotateHistograms();
453 }
454 }
455
456 static int sizeBucket(int size) {
457 if (size == 0) {
458 return 0;
459 }
460
461
462
463
464 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
465 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
466 }
467
468 private void rotateHistograms() {
469 short[][] hs = histos;
470 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
471 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
472 }
473 int sum = 0;
474 for (int count : sums) {
475 sum += count;
476 }
477 int targetPercentile = (int) (sum * 0.99);
478 int sizeBucket = 0;
479 for (; sizeBucket < sums.length; sizeBucket++) {
480 if (sums[sizeBucket] > targetPercentile) {
481 break;
482 }
483 targetPercentile -= sums[sizeBucket];
484 }
485 hasHadRotation = true;
486 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
487 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
488 localUpperBufSize = percentileSize;
489 localPrefChunkSize = prefChunkSize;
490 if (shareable) {
491 for (Magazine mag : parent.magazines) {
492 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
493 }
494 }
495 if (sharedPrefChunkSize != prefChunkSize) {
496
497 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
498 sharedPrefChunkSize = prefChunkSize;
499 } else {
500
501 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
502 }
503
504 histoIndex = histoIndex + 1 & 3;
505 histo = histos[histoIndex];
506 datumCount = 0;
507 Arrays.fill(histo, (short) 0);
508 }
509
510
511
512
513
514
515
516
517
518 protected int preferredChunkSize() {
519 return sharedPrefChunkSize;
520 }
521 }
522
523 private static final class Magazine extends AllocationStatistics {
524 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
525 static {
526 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
527 }
528 private static final Chunk MAGAZINE_FREED = new Chunk();
529
530 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
531 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
532 @Override
533 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
534 return new AdaptiveByteBuf(handle);
535 }
536 });
537
538 private Chunk current;
539 @SuppressWarnings("unused")
540 private volatile Chunk nextInLine;
541 private final AtomicLong usedMemory;
542 private final StampedLock allocationLock;
543 private final Queue<AdaptiveByteBuf> bufferQueue;
544 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
545
546 Magazine(AdaptivePoolingAllocator parent) {
547 this(parent, true);
548 }
549
550 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
551 super(parent, shareable);
552
553 if (shareable) {
554
555 allocationLock = new StampedLock();
556 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
557 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
558 @Override
559 public void recycle(AdaptiveByteBuf self) {
560 bufferQueue.offer(self);
561 }
562 };
563 } else {
564 allocationLock = null;
565 bufferQueue = null;
566 handle = null;
567 }
568 usedMemory = new AtomicLong();
569 }
570
571 public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf) {
572 if (allocationLock == null) {
573
574 return allocate(size, maxCapacity, buf);
575 }
576
577
578 long writeLock = allocationLock.tryWriteLock();
579 if (writeLock != 0) {
580 try {
581 return allocate(size, maxCapacity, buf);
582 } finally {
583 allocationLock.unlockWrite(writeLock);
584 }
585 }
586 return allocateWithoutLock(size, maxCapacity, buf);
587 }
588
589 private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
590 Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
591 if (curr == MAGAZINE_FREED) {
592
593 restoreMagazineFreed();
594 return false;
595 }
596 if (curr == null) {
597 curr = parent.centralQueue.poll();
598 if (curr == null) {
599 return false;
600 }
601 curr.attachToMagazine(this);
602 }
603 boolean allocated = false;
604 int remainingCapacity = curr.remainingCapacity();
605 int startingCapacity = getStartingCapacity(size, maxCapacity);
606 if (remainingCapacity >= size) {
607 curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
608 allocated = true;
609 }
610 try {
611 if (remainingCapacity >= RETIRE_CAPACITY) {
612 transferToNextInLineOrRelease(curr);
613 curr = null;
614 }
615 } finally {
616 if (curr != null) {
617 curr.release();
618 }
619 }
620 return allocated;
621 }
622
623 private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf) {
624 recordAllocationSize(buf.length);
625 int startingCapacity = getStartingCapacity(size, maxCapacity);
626 Chunk curr = current;
627 if (curr != null) {
628
629 int remainingCapacity = curr.remainingCapacity();
630 if (remainingCapacity > startingCapacity) {
631 curr.readInitInto(buf, size, startingCapacity, maxCapacity);
632
633 return true;
634 }
635
636
637
638 current = null;
639 if (remainingCapacity >= size) {
640 try {
641 curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
642 return true;
643 } finally {
644 curr.release();
645 }
646 }
647
648
649 if (remainingCapacity < RETIRE_CAPACITY) {
650 curr.release();
651 } else {
652
653
654 transferToNextInLineOrRelease(curr);
655 }
656 }
657
658 assert current == null;
659
660
661
662
663
664
665
666
667 curr = NEXT_IN_LINE.getAndSet(this, null);
668 if (curr != null) {
669 if (curr == MAGAZINE_FREED) {
670
671 restoreMagazineFreed();
672 return false;
673 }
674
675 int remainingCapacity = curr.remainingCapacity();
676 if (remainingCapacity > startingCapacity) {
677
678 curr.readInitInto(buf, size, startingCapacity, maxCapacity);
679 current = curr;
680 return true;
681 }
682
683 if (remainingCapacity >= size) {
684
685
686 try {
687 curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
688 return true;
689 } finally {
690
691
692 curr.release();
693 }
694 } else {
695
696 curr.release();
697 }
698 }
699
700
701 curr = parent.centralQueue.poll();
702 if (curr == null) {
703 curr = newChunkAllocation(size);
704 } else {
705 curr.attachToMagazine(this);
706
707 int remainingCapacity = curr.remainingCapacity();
708 if (remainingCapacity < size) {
709
710 if (remainingCapacity < RETIRE_CAPACITY) {
711 curr.release();
712 } else {
713
714
715 transferToNextInLineOrRelease(curr);
716 }
717 curr = newChunkAllocation(size);
718 }
719 }
720
721 current = curr;
722 try {
723 int remainingCapacity = curr.remainingCapacity();
724 assert remainingCapacity >= size;
725 if (remainingCapacity > startingCapacity) {
726 curr.readInitInto(buf, size, startingCapacity, maxCapacity);
727 curr = null;
728 } else {
729 curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
730 }
731 } finally {
732 if (curr != null) {
733
734
735 curr.release();
736 current = null;
737 }
738 }
739 return true;
740 }
741
742 private int getStartingCapacity(int size, int maxCapacity) {
743
744
745 int startCapLimits;
746 if (size <= 2048) {
747 startCapLimits = 16384;
748 } else if (size <= 32768) {
749 startCapLimits = 65536;
750 } else {
751 startCapLimits = size * 2;
752 }
753 int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
754 startingCapacity = Math.max(size, Math.min(maxCapacity, startingCapacity));
755 return startingCapacity;
756 }
757
758 private void restoreMagazineFreed() {
759 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
760 if (next != null && next != MAGAZINE_FREED) {
761 next.release();
762 }
763 }
764
765 private void transferToNextInLineOrRelease(Chunk chunk) {
766 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
767 return;
768 }
769
770 Chunk nextChunk = NEXT_IN_LINE.get(this);
771 if (nextChunk != null && nextChunk != MAGAZINE_FREED
772 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
773 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
774 nextChunk.release();
775 return;
776 }
777 }
778
779
780
781
782 chunk.release();
783 }
784
785 private Chunk newChunkAllocation(int promptingSize) {
786 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
787 int minChunks = size / MIN_CHUNK_SIZE;
788 if (MIN_CHUNK_SIZE * minChunks < size) {
789
790
791
792
793 size = MIN_CHUNK_SIZE * (1 + minChunks);
794 }
795
796
797 size = Math.min(size, MAX_CHUNK_SIZE);
798
799
800 if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
801 sharedPrefChunkSize = size;
802 }
803
804 ChunkAllocator chunkAllocator = parent.chunkAllocator;
805 return new Chunk(chunkAllocator.allocate(size, size), this, true);
806 }
807
808 boolean trySetNextInLine(Chunk chunk) {
809 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
810 }
811
812 void free() {
813
814 restoreMagazineFreed();
815 long stamp = allocationLock.writeLock();
816 try {
817 if (current != null) {
818 current.release();
819 current = null;
820 }
821 } finally {
822 allocationLock.unlockWrite(stamp);
823 }
824 }
825
826 public AdaptiveByteBuf newBuffer() {
827 AdaptiveByteBuf buf;
828 if (handle == null) {
829 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
830 } else {
831 buf = bufferQueue.poll();
832 if (buf == null) {
833 buf = new AdaptiveByteBuf(handle);
834 }
835 }
836 buf.resetRefCnt();
837 buf.discardMarks();
838 return buf;
839 }
840 }
841
842 private static final class Chunk implements ReferenceCounted {
843
844 private final AbstractByteBuf delegate;
845 private Magazine magazine;
846 private final AdaptivePoolingAllocator allocator;
847 private final int capacity;
848 private final boolean pooled;
849 private int allocatedBytes;
850 private static final long REFCNT_FIELD_OFFSET =
851 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
852 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
853 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
854
855 private static final ReferenceCountUpdater<Chunk> updater =
856 new ReferenceCountUpdater<Chunk>() {
857 @Override
858 protected AtomicIntegerFieldUpdater<Chunk> updater() {
859 return AIF_UPDATER;
860 }
861 @Override
862 protected long unsafeOffset() {
863 return REFCNT_FIELD_OFFSET;
864 }
865 };
866
867
868 @SuppressWarnings({"unused", "FieldMayBeFinal"})
869 private volatile int refCnt;
870
871 Chunk() {
872
873 delegate = null;
874 magazine = null;
875 allocator = null;
876 capacity = 0;
877 pooled = false;
878 }
879
880 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
881 this.delegate = delegate;
882 this.pooled = pooled;
883 capacity = delegate.capacity();
884 updater.setInitialValue(this);
885 allocator = magazine.parent;
886 attachToMagazine(magazine);
887 }
888
889 Magazine currentMagazine() {
890 return magazine;
891 }
892
893 void detachFromMagazine() {
894 if (magazine != null) {
895 magazine.usedMemory.getAndAdd(-capacity);
896 magazine = null;
897 }
898 }
899
900 void attachToMagazine(Magazine magazine) {
901 assert this.magazine == null;
902 this.magazine = magazine;
903 magazine.usedMemory.getAndAdd(capacity);
904 }
905
906 @Override
907 public Chunk touch(Object hint) {
908 return this;
909 }
910
911 @Override
912 public int refCnt() {
913 return updater.refCnt(this);
914 }
915
916 @Override
917 public Chunk retain() {
918 return updater.retain(this);
919 }
920
921 @Override
922 public Chunk retain(int increment) {
923 return updater.retain(this, increment);
924 }
925
926 @Override
927 public Chunk touch() {
928 return this;
929 }
930
931 @Override
932 public boolean release() {
933 if (updater.release(this)) {
934 deallocate();
935 return true;
936 }
937 return false;
938 }
939
940 @Override
941 public boolean release(int decrement) {
942 if (updater.release(this, decrement)) {
943 deallocate();
944 return true;
945 }
946 return false;
947 }
948
949 private void deallocate() {
950 Magazine mag = magazine;
951 AdaptivePoolingAllocator parent = mag.parent;
952 int chunkSize = mag.preferredChunkSize();
953 int memSize = delegate.capacity();
954 if (!pooled || shouldReleaseSuboptimalChunkSize(memSize, chunkSize)) {
955
956
957 detachFromMagazine();
958 delegate.release();
959 } else {
960 updater.resetRefCnt(this);
961 delegate.setIndex(0, 0);
962 allocatedBytes = 0;
963 if (!mag.trySetNextInLine(this)) {
964
965 detachFromMagazine();
966 if (!parent.offerToQueue(this)) {
967
968
969 boolean released = updater.release(this);
970 delegate.release();
971 assert released;
972 }
973 }
974 }
975 }
976
977 private static boolean shouldReleaseSuboptimalChunkSize(int givenSize, int preferredSize) {
978 int givenChunks = givenSize / MIN_CHUNK_SIZE;
979 int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
980 int deviation = Math.abs(givenChunks - preferredChunks);
981
982
983 return deviation != 0 &&
984 ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
985 }
986
987 public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
988 int startIndex = allocatedBytes;
989 allocatedBytes = startIndex + startingCapacity;
990 Chunk chunk = this;
991 chunk.retain();
992 try {
993 buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
994 chunk = null;
995 } finally {
996 if (chunk != null) {
997
998
999
1000 allocatedBytes = startIndex;
1001 chunk.release();
1002 }
1003 }
1004 }
1005
1006 public int remainingCapacity() {
1007 return capacity - allocatedBytes;
1008 }
1009
1010 public int capacity() {
1011 return capacity;
1012 }
1013 }
1014
1015 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
1016
// Recycle handle supplied at construction time; never null.
private final ObjectPool.Handle<AdaptiveByteBuf> handle;

// Offset of this buffer's first byte within rootParent.
private int adjustment;
// The chunk's backing buffer that all reads and writes are forwarded to.
private AbstractByteBuf rootParent;
// The chunk this buffer was carved from; the buffer holds one reference to it.
Chunk chunk;
// Current capacity of this buffer, as reported by capacity().
private int length;
// Number of bytes reserved in the chunk; capacity can grow up to this without reallocating.
private int maxFastCapacity;
// Lazily created NIO view over the reserved region; reset to null by init(...).
private ByteBuffer tmpNioBuf;
// Cached hasArray() / hasMemoryAddress() answers of rootParent, captured in init(...).
private boolean hasArray;
private boolean hasMemoryAddress;
1027
/**
 * Creates an uninitialised buffer instance; {@code init(...)} must be called before use.
 *
 * @param recyclerHandle the handle associated with this instance; must not be {@code null}
 */
AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
    super(0);
    ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
    this.handle = recyclerHandle;
}
1032
1033 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
1034 int adjustment, int size, int capacity, int maxCapacity) {
1035 this.adjustment = adjustment;
1036 chunk = wrapped;
1037 length = size;
1038 maxFastCapacity = capacity;
1039 maxCapacity(maxCapacity);
1040 setIndex0(readerIndex, writerIndex);
1041 hasArray = unwrapped.hasArray();
1042 hasMemoryAddress = unwrapped.hasMemoryAddress();
1043 rootParent = unwrapped;
1044 tmpNioBuf = null;
1045 }
1046
1047 private AbstractByteBuf rootParent() {
1048 final AbstractByteBuf rootParent = this.rootParent;
1049 if (rootParent != null) {
1050 return rootParent;
1051 }
1052 throw new IllegalReferenceCountException();
1053 }
1054
/**
 * @return the current capacity of this buffer, i.e. the {@code length} field
 */
@Override
public int capacity() {
    return length;
}
1059
1060 @Override
1061 public int maxFastWritableBytes() {
1062 return maxFastCapacity;
1063 }
1064
1065 @Override
1066 public ByteBuf capacity(int newCapacity) {
1067 if (length <= newCapacity && newCapacity <= maxFastCapacity) {
1068 ensureAccessible();
1069 length = newCapacity;
1070 return this;
1071 }
1072 checkNewCapacity(newCapacity);
1073 if (newCapacity < capacity()) {
1074 length = newCapacity;
1075 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1076 return this;
1077 }
1078
1079
1080 Chunk chunk = this.chunk;
1081 AdaptivePoolingAllocator allocator = chunk.allocator;
1082 int readerIndex = this.readerIndex;
1083 int writerIndex = this.writerIndex;
1084 int baseOldRootIndex = adjustment;
1085 int oldCapacity = length;
1086 AbstractByteBuf oldRoot = rootParent();
1087 length = 0;
1088 allocator.allocate(newCapacity, maxCapacity(), this);
1089 oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1090 chunk.release();
1091 this.readerIndex = readerIndex;
1092 this.writerIndex = writerIndex;
1093 return this;
1094 }
1095
1096 @Override
1097 public ByteBufAllocator alloc() {
1098 return rootParent().alloc();
1099 }
1100
1101 @Override
1102 public ByteOrder order() {
1103 return rootParent().order();
1104 }
1105
/** @return always {@code null}; the backing buffer is not exposed */
@Override
public ByteBuf unwrap() {
    return null;
}
1110
1111 @Override
1112 public boolean isDirect() {
1113 return rootParent().isDirect();
1114 }
1115
1116 @Override
1117 public int arrayOffset() {
1118 return idx(rootParent().arrayOffset());
1119 }
1120
/** @return the value captured from the backing buffer when {@code init(...)} was called */
@Override
public boolean hasMemoryAddress() {
    return hasMemoryAddress;
}
1125
1126 @Override
1127 public long memoryAddress() {
1128 ensureAccessible();
1129 return rootParent().memoryAddress() + adjustment;
1130 }
1131
1132 @Override
1133 public ByteBuffer nioBuffer(int index, int length) {
1134 checkIndex(index, length);
1135 return rootParent().nioBuffer(idx(index), length);
1136 }
1137
1138 @Override
1139 public ByteBuffer internalNioBuffer(int index, int length) {
1140 checkIndex(index, length);
1141 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1142 }
1143
1144 private ByteBuffer internalNioBuffer() {
1145 if (tmpNioBuf == null) {
1146 tmpNioBuf = rootParent().nioBuffer(adjustment, maxFastCapacity);
1147 }
1148 return (ByteBuffer) tmpNioBuf.clear();
1149 }
1150
1151 @Override
1152 public ByteBuffer[] nioBuffers(int index, int length) {
1153 checkIndex(index, length);
1154 return rootParent().nioBuffers(idx(index), length);
1155 }
1156
/** @return the value captured from the backing buffer when {@code init(...)} was called */
@Override
public boolean hasArray() {
    return hasArray;
}
1161
1162 @Override
1163 public byte[] array() {
1164 ensureAccessible();
1165 return rootParent().array();
1166 }
1167
1168 @Override
1169 public ByteBuf copy(int index, int length) {
1170 checkIndex(index, length);
1171 return rootParent().copy(idx(index), length);
1172 }
1173
1174 @Override
1175 public int nioBufferCount() {
1176 return rootParent().nioBufferCount();
1177 }
1178
1179 @Override
1180 protected byte _getByte(int index) {
1181 return rootParent()._getByte(idx(index));
1182 }
1183
1184 @Override
1185 protected short _getShort(int index) {
1186 return rootParent()._getShort(idx(index));
1187 }
1188
1189 @Override
1190 protected short _getShortLE(int index) {
1191 return rootParent()._getShortLE(idx(index));
1192 }
1193
1194 @Override
1195 protected int _getUnsignedMedium(int index) {
1196 return rootParent()._getUnsignedMedium(idx(index));
1197 }
1198
1199 @Override
1200 protected int _getUnsignedMediumLE(int index) {
1201 return rootParent()._getUnsignedMediumLE(idx(index));
1202 }
1203
1204 @Override
1205 protected int _getInt(int index) {
1206 return rootParent()._getInt(idx(index));
1207 }
1208
1209 @Override
1210 protected int _getIntLE(int index) {
1211 return rootParent()._getIntLE(idx(index));
1212 }
1213
1214 @Override
1215 protected long _getLong(int index) {
1216 return rootParent()._getLong(idx(index));
1217 }
1218
1219 @Override
1220 protected long _getLongLE(int index) {
1221 return rootParent()._getLongLE(idx(index));
1222 }
1223
1224 @Override
1225 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1226 checkIndex(index, length);
1227 rootParent().getBytes(idx(index), dst, dstIndex, length);
1228 return this;
1229 }
1230
1231 @Override
1232 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1233 checkIndex(index, length);
1234 rootParent().getBytes(idx(index), dst, dstIndex, length);
1235 return this;
1236 }
1237
1238 @Override
1239 public ByteBuf getBytes(int index, ByteBuffer dst) {
1240 checkIndex(index, dst.remaining());
1241 rootParent().getBytes(idx(index), dst);
1242 return this;
1243 }
1244
1245 @Override
1246 protected void _setByte(int index, int value) {
1247 rootParent()._setByte(idx(index), value);
1248 }
1249
1250 @Override
1251 protected void _setShort(int index, int value) {
1252 rootParent()._setShort(idx(index), value);
1253 }
1254
1255 @Override
1256 protected void _setShortLE(int index, int value) {
1257 rootParent()._setShortLE(idx(index), value);
1258 }
1259
1260 @Override
1261 protected void _setMedium(int index, int value) {
1262 rootParent()._setMedium(idx(index), value);
1263 }
1264
1265 @Override
1266 protected void _setMediumLE(int index, int value) {
1267 rootParent()._setMediumLE(idx(index), value);
1268 }
1269
1270 @Override
1271 protected void _setInt(int index, int value) {
1272 rootParent()._setInt(idx(index), value);
1273 }
1274
1275 @Override
1276 protected void _setIntLE(int index, int value) {
1277 rootParent()._setIntLE(idx(index), value);
1278 }
1279
1280 @Override
1281 protected void _setLong(int index, long value) {
1282 rootParent()._setLong(idx(index), value);
1283 }
1284
1285 @Override
1286 protected void _setLongLE(int index, long value) {
1287 rootParent().setLongLE(idx(index), value);
1288 }
1289
1290 @Override
1291 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1292 checkIndex(index, length);
1293 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1294 tmp.put(src, srcIndex, length);
1295 return this;
1296 }
1297
1298 @Override
1299 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1300 checkIndex(index, length);
1301 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1302 tmp.put(src.nioBuffer(srcIndex, length));
1303 return this;
1304 }
1305
1306 @Override
1307 public ByteBuf setBytes(int index, ByteBuffer src) {
1308 checkIndex(index, src.remaining());
1309 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1310 tmp.put(src);
1311 return this;
1312 }
1313
1314 @Override
1315 public ByteBuf getBytes(int index, OutputStream out, int length)
1316 throws IOException {
1317 checkIndex(index, length);
1318 if (length != 0) {
1319 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1320 }
1321 return this;
1322 }
1323
1324 @Override
1325 public int getBytes(int index, GatheringByteChannel out, int length)
1326 throws IOException {
1327 return out.write(internalNioBuffer(index, length).duplicate());
1328 }
1329
1330 @Override
1331 public int getBytes(int index, FileChannel out, long position, int length)
1332 throws IOException {
1333 return out.write(internalNioBuffer(index, length).duplicate(), position);
1334 }
1335
1336 @Override
1337 public int setBytes(int index, InputStream in, int length)
1338 throws IOException {
1339 checkIndex(index, length);
1340 final AbstractByteBuf rootParent = rootParent();
1341 if (rootParent.hasArray()) {
1342 return rootParent.setBytes(idx(index), in, length);
1343 }
1344 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1345 int readBytes = in.read(tmp, 0, length);
1346 if (readBytes <= 0) {
1347 return readBytes;
1348 }
1349 setBytes(index, tmp, 0, readBytes);
1350 return readBytes;
1351 }
1352
1353 @Override
1354 public int setBytes(int index, ScatteringByteChannel in, int length)
1355 throws IOException {
1356 try {
1357 return in.read(internalNioBuffer(index, length).duplicate());
1358 } catch (ClosedChannelException ignored) {
1359 return -1;
1360 }
1361 }
1362
1363 @Override
1364 public int setBytes(int index, FileChannel in, long position, int length)
1365 throws IOException {
1366 try {
1367 return in.read(internalNioBuffer(index, length).duplicate(), position);
1368 } catch (ClosedChannelException ignored) {
1369 return -1;
1370 }
1371 }
1372
1373 @Override
1374 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1375 checkIndex(index, sequence.length());
1376 return rootParent().setCharSequence(idx(index), sequence, charset);
1377 }
1378
1379 @Override
1380 public int forEachByte(int index, int length, ByteProcessor processor) {
1381 checkIndex(index, length);
1382 int ret = rootParent().forEachByte(idx(index), length, processor);
1383 return forEachResult(ret);
1384 }
1385
1386 @Override
1387 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1388 checkIndex(index, length);
1389 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1390 return forEachResult(ret);
1391 }
1392
1393 private int forEachResult(int ret) {
1394 if (ret < adjustment) {
1395 return -1;
1396 }
1397 return ret - adjustment;
1398 }
1399
1400 @Override
1401 public boolean isContiguous() {
1402 return rootParent().isContiguous();
1403 }
1404
1405 private int idx(int index) {
1406 return index + adjustment;
1407 }
1408
1409 @Override
1410 protected void deallocate() {
1411 if (chunk != null) {
1412 chunk.release();
1413 }
1414 tmpNioBuf = null;
1415 chunk = null;
1416 rootParent = null;
1417 if (handle instanceof EnhancedHandle) {
1418 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1419 enhancedHandle.unguardedRecycle(this);
1420 } else {
1421 handle.recycle(this);
1422 }
1423 }
1424 }
1425
1426
1427
1428
1429 interface ChunkAllocator {
1430
1431
1432
1433
1434
1435
1436 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1437 }
1438 }