1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SuppressJava6Requirement;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.ThreadExecutorMap;
32 import io.netty.util.internal.ThreadLocalRandom;
33 import io.netty.util.internal.UnstableApi;
34
35 import java.io.IOException;
36 import java.io.InputStream;
37 import java.io.OutputStream;
38 import java.nio.ByteBuffer;
39 import java.nio.ByteOrder;
40 import java.nio.channels.ClosedChannelException;
41 import java.nio.channels.FileChannel;
42 import java.nio.channels.GatheringByteChannel;
43 import java.nio.channels.ScatteringByteChannel;
44 import java.util.Arrays;
45 import java.util.Queue;
46 import java.util.Set;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.CopyOnWriteArraySet;
49 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
50 import java.util.concurrent.atomic.AtomicLong;
51 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
52 import java.util.concurrent.locks.StampedLock;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 @SuppressJava6Requirement(reason = "Guarded by version check")
81 @UnstableApi
82 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
83
/**
 * Selects which threads may be given their own thread-local {@link Magazine} by the
 * {@code FastThreadLocal} that the allocator constructor sets up.
 */
enum MagazineCaching {
    // A thread-local magazine is only created when ThreadExecutorMap reports a current executor
    // for the calling thread (and the thread will clean up its FastThreadLocals).
    EventLoopThreads,
    // A thread-local magazine is created whether or not the thread has a current executor
    // (still subject to the FastThreadLocal clean-up check).
    FastThreadLocalThreads,
    // No thread-local magazines are created; only the shared magazine array is used.
    None
}
89
90
91
92
93
94
95
96
// Lower bound for the preferred chunk size (128 KiB); new chunk sizes are rounded up to a multiple of it.
private static final int MIN_CHUNK_SIZE = 128 * 1024;
// Bounds the number of retry rounds allocate(...) makes over the shared magazines before falling back.
private static final int EXPANSION_ATTEMPTS = 3;
// Length of the shared magazine array created by the constructor.
private static final int INITIAL_MAGAZINES = 4;
// Chunks with less remaining capacity than this are released rather than kept as next-in-line.
private static final int RETIRE_CAPACITY = 4 * 1024;
// Once the shared magazine array reaches this length, tryExpandMagazines(...) stops growing it.
private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
// Multiplier applied to a buffer size when deriving a chunk size.
private static final int BUFS_PER_CHUNK = 10;

// Largest request size that is served from the magazines; anything bigger goes to allocateFallback(...).
private static final int MAX_CHUNK_SIZE =
        BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);

// Capacity of the central chunk queue. Read from the system property
// "io.netty.allocator.centralQueueCapacity" (default: number of available processors), never below 2.
private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));

// Capacity of the per-magazine queue of recycled AdaptiveByteBuf instances. Read from the system property
// "io.netty.allocator.magazineBufferQueueCapacity" (default 1024); validated by the static initialiser.
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

// Sentinel stored in the thread-local slot for threads that do not get their own magazine.
private static final Object NO_MAGAZINE = Boolean.TRUE;

// Source of the backing memory for every chunk.
private final ChunkAllocator chunkAllocator;
// Fully released, detached chunks waiting to be picked up by any magazine.
private final Queue<Chunk> centralQueue;
// Guards replacement of the shared magazine array and freeing of its magazines.
private final StampedLock magazineExpandLock;
// Shared magazines; the array is replaced as a whole when it is expanded.
private volatile Magazine[] magazines;
// Thread-local magazine slot, or null when magazine caching is disabled.
private final FastThreadLocal<Object> threadLocalMagazine;
// Thread-local magazines currently alive (used by usedMemory()), or null when caching is disabled.
private final Set<Magazine> liveCachedMagazines;
// Set once free() has run; stops further expansion and further offers to the central queue.
private volatile boolean freed;
139
140 static {
141 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
142 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
143 + " (expected: >= " + 2 + ')');
144 }
145 }
146
/**
 * Creates an allocator with {@code INITIAL_MAGAZINES} shared magazines and, unless caching is
 * {@link MagazineCaching#None}, a thread-local slot that lazily creates per-thread magazines.
 *
 * @param chunkAllocator the source of backing memory for chunks; must not be {@code null}
 * @param magazineCaching which threads may receive a thread-local magazine; must not be {@code null}
 */
AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
    ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
    ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
    this.chunkAllocator = chunkAllocator;
    centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
    magazineExpandLock = new StampedLock();

    if (magazineCaching == MagazineCaching.None) {
        threadLocalMagazine = null;
        liveCachedMagazines = null;
    } else {
        assert magazineCaching == MagazineCaching.EventLoopThreads ||
                magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final boolean cacheWithoutExecutor = magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final Set<Magazine> tracked = new CopyOnWriteArraySet<Magazine>();
        threadLocalMagazine = new FastThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                boolean eligible = cacheWithoutExecutor || ThreadExecutorMap.currentExecutor() != null;
                // Threads that will not clean up their FastThreadLocals never get a magazine of their own.
                if (!eligible || !FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
                    return NO_MAGAZINE;
                }
                Magazine own = new Magazine(AdaptivePoolingAllocator.this, false);
                tracked.add(own);
                return own;
            }

            @Override
            protected void onRemoval(final Object value) throws Exception {
                if (value == NO_MAGAZINE) {
                    return;
                }
                tracked.remove(value);
            }
        };
        liveCachedMagazines = tracked;
    }

    Magazine[] initial = new Magazine[INITIAL_MAGAZINES];
    int slot = 0;
    while (slot < initial.length) {
        initial[slot] = new Magazine(this);
        slot++;
    }
    magazines = initial;
}
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213 private static Queue<Chunk> createSharedChunkQueue() {
214 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
215 }
216
217 @Override
218 public ByteBuf allocate(int size, int maxCapacity) {
219 return allocate(size, maxCapacity, Thread.currentThread(), null);
220 }
221
/**
 * Core allocation routine shared by fresh allocations and re-allocations.
 *
 * @param size the number of bytes requested
 * @param maxCapacity the maximum capacity handed on to the buffer
 * @param currentThread the calling thread, used to select a magazine
 * @param buf an existing buffer to re-initialise, or {@code null} to take one from a magazine
 * @return the initialised buffer; when {@code buf} was non-null this is that same instance
 */
private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
    // Requests above MAX_CHUNK_SIZE skip the magazines entirely.
    if (size <= MAX_CHUNK_SIZE) {
        int sizeBucket = AllocationStatistics.sizeBucket(size);
        FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
        // First choice: the calling thread's own magazine, if it has one.
        if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
            Object mag = threadLocalMagazine.get();
            if (mag != NO_MAGAZINE) {
                Magazine magazine = (Magazine) mag;
                if (buf == null) {
                    buf = magazine.newBuffer();
                }
                boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
                assert allocated : "Allocation of threadLocalMagazine must always succeed";
                return buf;
            }
        }
        long threadId = currentThread.getId();
        Magazine[] mags;
        int expansions = 0;
        do {
            mags = magazines;
            // The mask arithmetic assumes a power-of-two array length (4 initially, doubled on expansion).
            int mask = mags.length - 1;
            int index = (int) (threadId & mask);
            // Probe log2(length) neighbouring magazines, starting at the one selected by the thread id.
            for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
                Magazine mag = mags[index + i & mask];
                if (buf == null) {
                    buf = mag.newBuffer();
                }
                if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
                    // The magazine served the request.
                    return buf;
                }
            }
            expansions++;
            // Every probed magazine refused: try to grow the magazine array and go around again.
        } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
    }

    // Either the magazines could not serve the request or it is too large to be pooled.
    return allocateFallback(size, maxCapacity, currentThread, buf);
}
262
263 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
264
265 Magazine magazine;
266 if (buf != null) {
267 Chunk chunk = buf.chunk;
268 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
269 magazine = getFallbackMagazine(currentThread);
270 }
271 } else {
272 magazine = getFallbackMagazine(currentThread);
273 buf = magazine.newBuffer();
274 }
275
276 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
277 Chunk chunk = new Chunk(innerChunk, magazine, false);
278 try {
279 chunk.readInitInto(buf, size, maxCapacity);
280 } finally {
281
282
283
284 chunk.release();
285 }
286 return buf;
287 }
288
289 private Magazine getFallbackMagazine(Thread currentThread) {
290 Object tlMag;
291 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
292 if (threadLocalMagazine != null &&
293 currentThread instanceof FastThreadLocalThread &&
294 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
295 return (Magazine) tlMag;
296 }
297 Magazine[] mags = magazines;
298 return mags[(int) currentThread.getId() & mags.length - 1];
299 }
300
301
302
303
304 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
305 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
306 assert result == into: "Re-allocation created separate buffer instance";
307 }
308
309 @Override
310 public long usedMemory() {
311 long sum = 0;
312 for (Chunk chunk : centralQueue) {
313 sum += chunk.capacity();
314 }
315 for (Magazine magazine : magazines) {
316 sum += magazine.usedMemory.get();
317 }
318 if (liveCachedMagazines != null) {
319 for (Magazine magazine : liveCachedMagazines) {
320 sum += magazine.usedMemory.get();
321 }
322 }
323 return sum;
324 }
325
326 private boolean tryExpandMagazines(int currentLength) {
327 if (currentLength >= MAX_STRIPES) {
328 return true;
329 }
330 final Magazine[] mags;
331 long writeLock = magazineExpandLock.tryWriteLock();
332 if (writeLock != 0) {
333 try {
334 mags = magazines;
335 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
336 return true;
337 }
338 int preferredChunkSize = mags[0].sharedPrefChunkSize;
339 Magazine[] expanded = new Magazine[mags.length * 2];
340 for (int i = 0, l = expanded.length; i < l; i++) {
341 Magazine m = new Magazine(this);
342 m.localPrefChunkSize = preferredChunkSize;
343 m.sharedPrefChunkSize = preferredChunkSize;
344 expanded[i] = m;
345 }
346 magazines = expanded;
347 } finally {
348 magazineExpandLock.unlockWrite(writeLock);
349 }
350 for (Magazine magazine : mags) {
351 magazine.free();
352 }
353 }
354 return true;
355 }
356
357 private boolean offerToQueue(Chunk buffer) {
358 if (freed) {
359 return false;
360 }
361
362 assert buffer.allocatedBytes == 0;
363 assert buffer.magazine == null;
364
365 boolean isAdded = centralQueue.offer(buffer);
366 if (freed && isAdded) {
367
368 freeCentralQueue();
369 }
370 return isAdded;
371 }
372
373
374
375
/**
 * Frees the magazines and the central queue when this allocator is finalized.
 */
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        // Runs even when super.finalize() throws.
        free();
    }
}
384
385 private void free() {
386 freed = true;
387 long stamp = magazineExpandLock.writeLock();
388 try {
389 Magazine[] mags = magazines;
390 for (Magazine magazine : mags) {
391 magazine.free();
392 }
393 } finally {
394 magazineExpandLock.unlockWrite(stamp);
395 }
396 freeCentralQueue();
397 }
398
399 private void freeCentralQueue() {
400 for (;;) {
401 Chunk chunk = centralQueue.poll();
402 if (chunk == null) {
403 break;
404 }
405 chunk.release();
406 }
407 }
408
409 static int sizeBucket(int size) {
410 return AllocationStatistics.sizeBucket(size);
411 }
412
413 @SuppressWarnings("checkstyle:finalclass")
414 private static class AllocationStatistics {
415 private static final int MIN_DATUM_TARGET = 1024;
416 private static final int MAX_DATUM_TARGET = 65534;
417 private static final int INIT_DATUM_TARGET = 9;
418 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
419 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
420 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
421 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
422 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
423
424 protected final AdaptivePoolingAllocator parent;
425 private final boolean shareable;
426 private final short[][] histos = {
427 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
428 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
429 };
430 private short[] histo = histos[0];
431 private final int[] sums = new int[HISTO_BUCKET_COUNT];
432
433 private int histoIndex;
434 private int datumCount;
435 private int datumTarget = INIT_DATUM_TARGET;
436 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
437 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
438
439 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
440 this.parent = parent;
441 this.shareable = shareable;
442 }
443
444 protected void recordAllocationSize(int bucket) {
445 histo[bucket]++;
446 if (datumCount++ == datumTarget) {
447 rotateHistograms();
448 }
449 }
450
451 static int sizeBucket(int size) {
452 if (size == 0) {
453 return 0;
454 }
455
456
457
458
459 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
460 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
461 }
462
463 private void rotateHistograms() {
464 short[][] hs = histos;
465 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
466 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
467 }
468 int sum = 0;
469 for (int count : sums) {
470 sum += count;
471 }
472 int targetPercentile = (int) (sum * 0.99);
473 int sizeBucket = 0;
474 for (; sizeBucket < sums.length; sizeBucket++) {
475 if (sums[sizeBucket] > targetPercentile) {
476 break;
477 }
478 targetPercentile -= sums[sizeBucket];
479 }
480 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
481 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
482 localPrefChunkSize = prefChunkSize;
483 if (shareable) {
484 for (Magazine mag : parent.magazines) {
485 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
486 }
487 }
488 if (sharedPrefChunkSize != prefChunkSize) {
489
490 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
491 sharedPrefChunkSize = prefChunkSize;
492 } else {
493
494 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
495 }
496
497 histoIndex = histoIndex + 1 & 3;
498 histo = histos[histoIndex];
499 datumCount = 0;
500 Arrays.fill(histo, (short) 0);
501 }
502
503
504
505
506
507
508
509
510
511 protected int preferredChunkSize() {
512 return sharedPrefChunkSize;
513 }
514 }
515
516 @SuppressJava6Requirement(reason = "Guarded by version check")
517 private static final class Magazine extends AllocationStatistics {
518 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
519 static {
520 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
521 }
522 private static final Chunk MAGAZINE_FREED = new Chunk();
523
524 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
525 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
526 @Override
527 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
528 return new AdaptiveByteBuf(handle);
529 }
530 });
531
532 private Chunk current;
533 @SuppressWarnings("unused")
534 private volatile Chunk nextInLine;
535 private final AtomicLong usedMemory;
536 private final StampedLock allocationLock;
537 private final Queue<AdaptiveByteBuf> bufferQueue;
538 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
539
540 Magazine(AdaptivePoolingAllocator parent) {
541 this(parent, true);
542 }
543
544 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
545 super(parent, shareable);
546
547 if (shareable) {
548
549 allocationLock = new StampedLock();
550 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
551 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
552 @Override
553 public void recycle(AdaptiveByteBuf self) {
554 bufferQueue.offer(self);
555 }
556 };
557 } else {
558 allocationLock = null;
559 bufferQueue = null;
560 handle = null;
561 }
562 usedMemory = new AtomicLong();
563 }
564
565 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
566 if (allocationLock == null) {
567
568 return allocate(size, sizeBucket, maxCapacity, buf);
569 }
570
571
572 long writeLock = allocationLock.tryWriteLock();
573 if (writeLock != 0) {
574 try {
575 return allocate(size, sizeBucket, maxCapacity, buf);
576 } finally {
577 allocationLock.unlockWrite(writeLock);
578 }
579 }
580 return allocateWithoutLock(size, maxCapacity, buf);
581 }
582
583 private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
584 Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
585 if (curr == MAGAZINE_FREED) {
586
587 restoreMagazineFreed();
588 return false;
589 }
590 if (curr == null) {
591 curr = parent.centralQueue.poll();
592 if (curr == null) {
593 return false;
594 }
595 curr.attachToMagazine(this);
596 }
597 boolean allocated = false;
598 if (curr.remainingCapacity() >= size) {
599 curr.readInitInto(buf, size, maxCapacity);
600 allocated = true;
601 }
602 try {
603 if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
604 transferToNextInLineOrRelease(curr);
605 curr = null;
606 }
607 } finally {
608 if (curr != null) {
609 curr.release();
610 }
611 }
612 return allocated;
613 }
614
615 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
616 recordAllocationSize(sizeBucket);
617 Chunk curr = current;
618 if (curr != null) {
619
620 if (curr.remainingCapacity() > size) {
621 curr.readInitInto(buf, size, maxCapacity);
622
623 return true;
624 }
625
626
627
628 current = null;
629 if (curr.remainingCapacity() == size) {
630 try {
631 curr.readInitInto(buf, size, maxCapacity);
632 return true;
633 } finally {
634 curr.release();
635 }
636 }
637
638
639 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
640 curr.release();
641 } else {
642
643
644 transferToNextInLineOrRelease(curr);
645 }
646 }
647
648 assert current == null;
649
650
651
652
653
654
655
656 curr = NEXT_IN_LINE.getAndSet(this, null);
657 if (curr != null) {
658 if (curr == MAGAZINE_FREED) {
659
660 restoreMagazineFreed();
661 return false;
662 }
663
664 if (curr.remainingCapacity() > size) {
665
666 curr.readInitInto(buf, size, maxCapacity);
667 current = curr;
668 return true;
669 }
670
671 if (curr.remainingCapacity() == size) {
672
673
674 try {
675 curr.readInitInto(buf, size, maxCapacity);
676 return true;
677 } finally {
678
679
680 curr.release();
681 }
682 } else {
683
684 curr.release();
685 }
686 }
687
688
689 curr = parent.centralQueue.poll();
690 if (curr == null) {
691 curr = newChunkAllocation(size);
692 } else {
693 curr.attachToMagazine(this);
694
695 if (curr.remainingCapacity() < size) {
696
697 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
698 curr.release();
699 } else {
700
701
702 transferToNextInLineOrRelease(curr);
703 }
704 curr = newChunkAllocation(size);
705 }
706 }
707
708 current = curr;
709 try {
710 assert current.remainingCapacity() >= size;
711 if (curr.remainingCapacity() > size) {
712 curr.readInitInto(buf, size, maxCapacity);
713 curr = null;
714 } else {
715 curr.readInitInto(buf, size, maxCapacity);
716 }
717 } finally {
718 if (curr != null) {
719
720
721 curr.release();
722 current = null;
723 }
724 }
725 return true;
726 }
727
728 private void restoreMagazineFreed() {
729 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
730 if (next != null && next != MAGAZINE_FREED) {
731 next.release();
732 }
733 }
734
735 private void transferToNextInLineOrRelease(Chunk chunk) {
736 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
737 return;
738 }
739
740 Chunk nextChunk = NEXT_IN_LINE.get(this);
741 if (nextChunk != null && nextChunk != MAGAZINE_FREED
742 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
743 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
744 nextChunk.release();
745 return;
746 }
747 }
748
749
750
751
752 chunk.release();
753 }
754
755 private Chunk newChunkAllocation(int promptingSize) {
756 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
757 int minChunks = size / MIN_CHUNK_SIZE;
758 if (MIN_CHUNK_SIZE * minChunks < size) {
759
760
761
762
763 size = MIN_CHUNK_SIZE * (1 + minChunks);
764 }
765 ChunkAllocator chunkAllocator = parent.chunkAllocator;
766 return new Chunk(chunkAllocator.allocate(size, size), this, true);
767 }
768
769 boolean trySetNextInLine(Chunk chunk) {
770 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
771 }
772
773 void free() {
774
775 restoreMagazineFreed();
776 long stamp = allocationLock.writeLock();
777 try {
778 if (current != null) {
779 current.release();
780 current = null;
781 }
782 } finally {
783 allocationLock.unlockWrite(stamp);
784 }
785 }
786
787 public AdaptiveByteBuf newBuffer() {
788 AdaptiveByteBuf buf;
789 if (handle == null) {
790 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
791 } else {
792 buf = bufferQueue.poll();
793 if (buf == null) {
794 buf = new AdaptiveByteBuf(handle);
795 }
796 }
797 buf.resetRefCnt();
798 buf.discardMarks();
799 return buf;
800 }
801 }
802
803 private static final class Chunk implements ReferenceCounted {
804
805 private final AbstractByteBuf delegate;
806 private Magazine magazine;
807 private final AdaptivePoolingAllocator allocator;
808 private final int capacity;
809 private final boolean pooled;
810 private int allocatedBytes;
811 private static final long REFCNT_FIELD_OFFSET =
812 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
813 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
814 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
815
816 private static final ReferenceCountUpdater<Chunk> updater =
817 new ReferenceCountUpdater<Chunk>() {
818 @Override
819 protected AtomicIntegerFieldUpdater<Chunk> updater() {
820 return AIF_UPDATER;
821 }
822 @Override
823 protected long unsafeOffset() {
824 return REFCNT_FIELD_OFFSET;
825 }
826 };
827
828
829 @SuppressWarnings({"unused", "FieldMayBeFinal"})
830 private volatile int refCnt;
831
832 Chunk() {
833
834 delegate = null;
835 magazine = null;
836 allocator = null;
837 capacity = 0;
838 pooled = false;
839 }
840
841 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
842 this.delegate = delegate;
843 this.pooled = pooled;
844 capacity = delegate.capacity();
845 updater.setInitialValue(this);
846 allocator = magazine.parent;
847 attachToMagazine(magazine);
848 }
849
850 Magazine currentMagazine() {
851 return magazine;
852 }
853
854 void detachFromMagazine() {
855 if (magazine != null) {
856 magazine.usedMemory.getAndAdd(-capacity);
857 magazine = null;
858 }
859 }
860
861 void attachToMagazine(Magazine magazine) {
862 assert this.magazine == null;
863 this.magazine = magazine;
864 magazine.usedMemory.getAndAdd(capacity);
865 }
866
867 @Override
868 public Chunk touch(Object hint) {
869 return this;
870 }
871
872 @Override
873 public int refCnt() {
874 return updater.refCnt(this);
875 }
876
877 @Override
878 public Chunk retain() {
879 return updater.retain(this);
880 }
881
882 @Override
883 public Chunk retain(int increment) {
884 return updater.retain(this, increment);
885 }
886
887 @Override
888 public Chunk touch() {
889 return this;
890 }
891
892 @Override
893 public boolean release() {
894 if (updater.release(this)) {
895 deallocate();
896 return true;
897 }
898 return false;
899 }
900
901 @Override
902 public boolean release(int decrement) {
903 if (updater.release(this, decrement)) {
904 deallocate();
905 return true;
906 }
907 return false;
908 }
909
910 private void deallocate() {
911 Magazine mag = magazine;
912 AdaptivePoolingAllocator parent = mag.parent;
913 int chunkSize = mag.preferredChunkSize();
914 int memSize = delegate.capacity();
915 if (!pooled || shouldReleaseSuboptimalChunkSize(memSize, chunkSize)) {
916
917
918 detachFromMagazine();
919 delegate.release();
920 } else {
921 updater.resetRefCnt(this);
922 delegate.setIndex(0, 0);
923 allocatedBytes = 0;
924 if (!mag.trySetNextInLine(this)) {
925
926 detachFromMagazine();
927 if (!parent.offerToQueue(this)) {
928
929
930 boolean released = updater.release(this);
931 delegate.release();
932 assert released;
933 }
934 }
935 }
936 }
937
938 private static boolean shouldReleaseSuboptimalChunkSize(int givenSize, int preferredSize) {
939 int givenChunks = givenSize / MIN_CHUNK_SIZE;
940 int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
941 int deviation = Math.abs(givenChunks - preferredChunks);
942
943
944 return deviation != 0 &&
945 PlatformDependent.threadLocalRandom().nextDouble() * 20.0 < deviation;
946 }
947
948 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
949 int startIndex = allocatedBytes;
950 allocatedBytes = startIndex + size;
951 Chunk chunk = this;
952 chunk.retain();
953 try {
954 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
955 chunk = null;
956 } finally {
957 if (chunk != null) {
958
959
960
961 allocatedBytes = startIndex;
962 chunk.release();
963 }
964 }
965 }
966
967 public int remainingCapacity() {
968 return capacity - allocatedBytes;
969 }
970
971 public int capacity() {
972 return capacity;
973 }
974 }
975
976 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
977
978 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
979
980 private int adjustment;
981 private AbstractByteBuf rootParent;
982 Chunk chunk;
983 private int length;
984 private ByteBuffer tmpNioBuf;
985 private boolean hasArray;
986 private boolean hasMemoryAddress;
987
988 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
989 super(0);
990 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
991 }
992
993 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
994 int adjustment, int capacity, int maxCapacity) {
995 this.adjustment = adjustment;
996 chunk = wrapped;
997 length = capacity;
998 maxCapacity(maxCapacity);
999 setIndex0(readerIndex, writerIndex);
1000 hasArray = unwrapped.hasArray();
1001 hasMemoryAddress = unwrapped.hasMemoryAddress();
1002 rootParent = unwrapped;
1003 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
1004 }
1005
1006 private AbstractByteBuf rootParent() {
1007 final AbstractByteBuf rootParent = this.rootParent;
1008 if (rootParent != null) {
1009 return rootParent;
1010 }
1011 throw new IllegalReferenceCountException();
1012 }
1013
1014 @Override
1015 public int capacity() {
1016 return length;
1017 }
1018
1019 @Override
1020 public ByteBuf capacity(int newCapacity) {
1021 if (newCapacity == capacity()) {
1022 ensureAccessible();
1023 return this;
1024 }
1025 checkNewCapacity(newCapacity);
1026 if (newCapacity < capacity()) {
1027 length = newCapacity;
1028 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1029 return this;
1030 }
1031
1032
1033 ByteBuffer data = tmpNioBuf;
1034 data.clear();
1035 tmpNioBuf = null;
1036 Chunk chunk = this.chunk;
1037 AdaptivePoolingAllocator allocator = chunk.allocator;
1038 int readerIndex = this.readerIndex;
1039 int writerIndex = this.writerIndex;
1040 allocator.allocate(newCapacity, maxCapacity(), this);
1041 tmpNioBuf.put(data);
1042 tmpNioBuf.clear();
1043 chunk.release();
1044 this.readerIndex = readerIndex;
1045 this.writerIndex = writerIndex;
1046 return this;
1047 }
1048
1049 @Override
1050 public ByteBufAllocator alloc() {
1051 return rootParent().alloc();
1052 }
1053
1054 @Override
1055 public ByteOrder order() {
1056 return rootParent().order();
1057 }
1058
1059 @Override
1060 public ByteBuf unwrap() {
1061 return null;
1062 }
1063
1064 @Override
1065 public boolean isDirect() {
1066 return rootParent().isDirect();
1067 }
1068
1069 @Override
1070 public int arrayOffset() {
1071 return idx(rootParent().arrayOffset());
1072 }
1073
1074 @Override
1075 public boolean hasMemoryAddress() {
1076 return hasMemoryAddress;
1077 }
1078
1079 @Override
1080 public long memoryAddress() {
1081 ensureAccessible();
1082 return rootParent().memoryAddress() + adjustment;
1083 }
1084
1085 @Override
1086 public ByteBuffer nioBuffer(int index, int length) {
1087 checkIndex(index, length);
1088 return rootParent().nioBuffer(idx(index), length);
1089 }
1090
1091 @Override
1092 public ByteBuffer internalNioBuffer(int index, int length) {
1093 checkIndex(index, length);
1094 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1095 }
1096
1097 private ByteBuffer internalNioBuffer() {
1098 return (ByteBuffer) tmpNioBuf.clear();
1099 }
1100
1101 @Override
1102 public ByteBuffer[] nioBuffers(int index, int length) {
1103 checkIndex(index, length);
1104 return rootParent().nioBuffers(idx(index), length);
1105 }
1106
1107 @Override
1108 public boolean hasArray() {
1109 return hasArray;
1110 }
1111
1112 @Override
1113 public byte[] array() {
1114 ensureAccessible();
1115 return rootParent().array();
1116 }
1117
1118 @Override
1119 public ByteBuf copy(int index, int length) {
1120 checkIndex(index, length);
1121 return rootParent().copy(idx(index), length);
1122 }
1123
1124 @Override
1125 public int nioBufferCount() {
1126 return rootParent().nioBufferCount();
1127 }
1128
1129 @Override
1130 protected byte _getByte(int index) {
1131 return rootParent()._getByte(idx(index));
1132 }
1133
1134 @Override
1135 protected short _getShort(int index) {
1136 return rootParent()._getShort(idx(index));
1137 }
1138
1139 @Override
1140 protected short _getShortLE(int index) {
1141 return rootParent()._getShortLE(idx(index));
1142 }
1143
1144 @Override
1145 protected int _getUnsignedMedium(int index) {
1146 return rootParent()._getUnsignedMedium(idx(index));
1147 }
1148
1149 @Override
1150 protected int _getUnsignedMediumLE(int index) {
1151 return rootParent()._getUnsignedMediumLE(idx(index));
1152 }
1153
1154 @Override
1155 protected int _getInt(int index) {
1156 return rootParent()._getInt(idx(index));
1157 }
1158
1159 @Override
1160 protected int _getIntLE(int index) {
1161 return rootParent()._getIntLE(idx(index));
1162 }
1163
1164 @Override
1165 protected long _getLong(int index) {
1166 return rootParent()._getLong(idx(index));
1167 }
1168
1169 @Override
1170 protected long _getLongLE(int index) {
1171 return rootParent()._getLongLE(idx(index));
1172 }
1173
1174 @Override
1175 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1176 checkIndex(index, length);
1177 rootParent().getBytes(idx(index), dst, dstIndex, length);
1178 return this;
1179 }
1180
1181 @Override
1182 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1183 checkIndex(index, length);
1184 rootParent().getBytes(idx(index), dst, dstIndex, length);
1185 return this;
1186 }
1187
1188 @Override
1189 public ByteBuf getBytes(int index, ByteBuffer dst) {
1190 checkIndex(index, dst.remaining());
1191 rootParent().getBytes(idx(index), dst);
1192 return this;
1193 }
1194
1195 @Override
1196 protected void _setByte(int index, int value) {
1197 rootParent()._setByte(idx(index), value);
1198 }
1199
1200 @Override
1201 protected void _setShort(int index, int value) {
1202 rootParent()._setShort(idx(index), value);
1203 }
1204
1205 @Override
1206 protected void _setShortLE(int index, int value) {
1207 rootParent()._setShortLE(idx(index), value);
1208 }
1209
1210 @Override
1211 protected void _setMedium(int index, int value) {
1212 rootParent()._setMedium(idx(index), value);
1213 }
1214
1215 @Override
1216 protected void _setMediumLE(int index, int value) {
1217 rootParent()._setMediumLE(idx(index), value);
1218 }
1219
1220 @Override
1221 protected void _setInt(int index, int value) {
1222 rootParent()._setInt(idx(index), value);
1223 }
1224
1225 @Override
1226 protected void _setIntLE(int index, int value) {
1227 rootParent()._setIntLE(idx(index), value);
1228 }
1229
1230 @Override
1231 protected void _setLong(int index, long value) {
1232 rootParent()._setLong(idx(index), value);
1233 }
1234
1235 @Override
1236 protected void _setLongLE(int index, long value) {
1237 rootParent().setLongLE(idx(index), value);
1238 }
1239
1240 @Override
1241 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1242 checkIndex(index, length);
1243 rootParent().setBytes(idx(index), src, srcIndex, length);
1244 return this;
1245 }
1246
1247 @Override
1248 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1249 checkIndex(index, length);
1250 rootParent().setBytes(idx(index), src, srcIndex, length);
1251 return this;
1252 }
1253
1254 @Override
1255 public ByteBuf setBytes(int index, ByteBuffer src) {
1256 checkIndex(index, src.remaining());
1257 rootParent().setBytes(idx(index), src);
1258 return this;
1259 }
1260
1261 @Override
1262 public ByteBuf getBytes(int index, OutputStream out, int length)
1263 throws IOException {
1264 checkIndex(index, length);
1265 if (length != 0) {
1266 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1267 }
1268 return this;
1269 }
1270
1271 @Override
1272 public int getBytes(int index, GatheringByteChannel out, int length)
1273 throws IOException {
1274 return out.write(internalNioBuffer(index, length).duplicate());
1275 }
1276
1277 @Override
1278 public int getBytes(int index, FileChannel out, long position, int length)
1279 throws IOException {
1280 return out.write(internalNioBuffer(index, length).duplicate(), position);
1281 }
1282
1283 @Override
1284 public int setBytes(int index, InputStream in, int length)
1285 throws IOException {
1286 checkIndex(index, length);
1287 final AbstractByteBuf rootParent = rootParent();
1288 if (rootParent.hasArray()) {
1289 return rootParent.setBytes(idx(index), in, length);
1290 }
1291 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1292 int readBytes = in.read(tmp, 0, length);
1293 if (readBytes <= 0) {
1294 return readBytes;
1295 }
1296 setBytes(index, tmp, 0, readBytes);
1297 return readBytes;
1298 }
1299
1300 @Override
1301 public int setBytes(int index, ScatteringByteChannel in, int length)
1302 throws IOException {
1303 try {
1304 return in.read(internalNioBuffer(index, length).duplicate());
1305 } catch (ClosedChannelException ignored) {
1306 return -1;
1307 }
1308 }
1309
1310 @Override
1311 public int setBytes(int index, FileChannel in, long position, int length)
1312 throws IOException {
1313 try {
1314 return in.read(internalNioBuffer(index, length).duplicate(), position);
1315 } catch (ClosedChannelException ignored) {
1316 return -1;
1317 }
1318 }
1319
1320 @Override
1321 public int forEachByte(int index, int length, ByteProcessor processor) {
1322 checkIndex(index, length);
1323 int ret = rootParent().forEachByte(idx(index), length, processor);
1324 return forEachResult(ret);
1325 }
1326
1327 @Override
1328 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1329 checkIndex(index, length);
1330 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1331 return forEachResult(ret);
1332 }
1333
1334 private int forEachResult(int ret) {
1335 if (ret < adjustment) {
1336 return -1;
1337 }
1338 return ret - adjustment;
1339 }
1340
1341 @Override
1342 public boolean isContiguous() {
1343 return rootParent().isContiguous();
1344 }
1345
1346 private int idx(int index) {
1347 return index + adjustment;
1348 }
1349
1350 @Override
1351 protected void deallocate() {
1352 if (chunk != null) {
1353 chunk.release();
1354 }
1355 tmpNioBuf = null;
1356 chunk = null;
1357 rootParent = null;
1358 if (handle instanceof EnhancedHandle) {
1359 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1360 enhancedHandle.unguardedRecycle(this);
1361 } else {
1362 handle.recycle(this);
1363 }
1364 }
1365 }
1366
1367
1368
1369
/**
 * Supplies the backing memory from which this allocator builds its chunks.
 */
interface ChunkAllocator {
    /**
     * Allocates a buffer to be used as the backing memory of a chunk.
     *
     * @param initialCapacity the initial capacity of the buffer
     * @param maxCapacity the maximum capacity of the buffer
     * @return the allocated buffer
     */
    AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
}
1379 }