1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SuppressJava6Requirement;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.ThreadExecutorMap;
32 import io.netty.util.internal.ThreadLocalRandom;
33 import io.netty.util.internal.UnstableApi;
34
35 import java.io.IOException;
36 import java.io.InputStream;
37 import java.io.OutputStream;
38 import java.nio.ByteBuffer;
39 import java.nio.ByteOrder;
40 import java.nio.channels.ClosedChannelException;
41 import java.nio.channels.FileChannel;
42 import java.nio.channels.GatheringByteChannel;
43 import java.nio.channels.ScatteringByteChannel;
44 import java.nio.charset.Charset;
45 import java.util.Arrays;
46 import java.util.Queue;
47 import java.util.Set;
48 import java.util.concurrent.ConcurrentLinkedQueue;
49 import java.util.concurrent.CopyOnWriteArraySet;
50 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
51 import java.util.concurrent.atomic.AtomicLong;
52 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
53 import java.util.concurrent.locks.StampedLock;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 @SuppressJava6Requirement(reason = "Guarded by version check")
82 @UnstableApi
83 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
84
/**
 * Selects for which threads, if any, the allocator keeps a dedicated thread-local magazine.
 */
enum MagazineCaching {
    /**
     * Thread-local magazines are only created for threads for which
     * {@code ThreadExecutorMap.currentExecutor()} is non-null.
     */
    EventLoopThreads,
    /**
     * Thread-local magazines are created without checking for a current executor. The allocation path
     * only consults the thread-local for {@code FastThreadLocalThread} instances.
     */
    FastThreadLocalThreads,
    /** No thread-local magazines are created; only the shared magazine array is used. */
    None
}
90
91
92
93
94
95
96
97
// Lower bound for preferred chunk sizes; new chunk sizes are rounded up to whole multiples of this value.
private static final int MIN_CHUNK_SIZE = 128 * 1024;
// Upper bound on the number of magazine-array expansion rounds a single allocation will go through.
private static final int EXPANSION_ATTEMPTS = 3;
// Number of shared magazines created by the constructor.
private static final int INITIAL_MAGAZINES = 4;
// Chunks with less remaining capacity than this are released instead of being kept as "next in line".
private static final int RETIRE_CAPACITY = 4 * 1024;
// The shared magazine array is not expanded any further once it has at least this many entries.
private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
// Multiplier applied to a buffer size when deriving the size of the chunk that should hold it.
private static final int BUFS_PER_CHUNK = 10;





// Largest request served through the magazines: BUFS_PER_CHUNK times the biggest histogram bucket (1 << 20).
// Larger requests go straight to allocateFallback(...).
private static final int MAX_CHUNK_SIZE =
        BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);








// Capacity of the shared queue of reusable chunks; configurable via system property, never below 2.
private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));




// Capacity of each shared magazine's queue of recycled buffer objects; validated (>= 2) in the static block.
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

// Sentinel stored in the thread-local to mean "this thread has no dedicated magazine".
private static final Object NO_MAGAZINE = Boolean.TRUE;

// Source of the backing memory for chunks.
private final ChunkAllocator chunkAllocator;
// Reusable chunks that are currently not attached to any magazine.
private final Queue<Chunk> centralQueue;
// Guards replacement of the magazines array (expansion and free()).
private final StampedLock magazineExpandLock;
// Shared magazines; the array is replaced wholesale when expanded. Length is used as a bit mask + 1.
private volatile Magazine[] magazines;
// Per-thread magazine (or NO_MAGAZINE); null when magazine caching is disabled.
private final FastThreadLocal<Object> threadLocalMagazine;
// Thread-local magazines currently alive, tracked for usedMemory(); null when caching is disabled.
private final Set<Magazine> liveCachedMagazines;
// Set once free() has started; stops chunks from being offered to the central queue.
private volatile boolean freed;
140
141 static {
142 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
143 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
144 + " (expected: >= " + 2 + ')');
145 }
146 }
147
/**
 * Creates an allocator backed by the given chunk source.
 *
 * @param chunkAllocator  supplies the memory for every chunk; must not be {@code null}
 * @param magazineCaching decides whether (and for which threads) thread-local magazines are created;
 *                        must not be {@code null}
 */
AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
    ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
    ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
    this.chunkAllocator = chunkAllocator;
    centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
    magazineExpandLock = new StampedLock();
    if (magazineCaching == MagazineCaching.None) {
        threadLocalMagazine = null;
        liveCachedMagazines = null;
    } else {
        assert magazineCaching == MagazineCaching.EventLoopThreads ||
                magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final boolean cacheWithoutExecutor = magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final Set<Magazine> trackedMagazines = new CopyOnWriteArraySet<Magazine>();
        threadLocalMagazine = new FastThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                boolean eligible = cacheWithoutExecutor || ThreadExecutorMap.currentExecutor() != null;
                // Threads whose FastThreadLocals are not cleaned up never get a magazine of their own.
                if (!eligible || !FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
                    return NO_MAGAZINE;
                }
                Magazine created = new Magazine(AdaptivePoolingAllocator.this, false);
                trackedMagazines.add(created);
                return created;
            }

            @Override
            protected void onRemoval(final Object value) throws Exception {
                if (value == NO_MAGAZINE) {
                    return;
                }
                trackedMagazines.remove(value);
            }
        };
        liveCachedMagazines = trackedMagazines;
    }
    // Populate the initial set of shared magazines before publishing the array.
    Magazine[] initial = new Magazine[INITIAL_MAGAZINES];
    int slot = 0;
    while (slot < initial.length) {
        initial[slot] = new Magazine(this);
        slot++;
    }
    magazines = initial;
}
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 private static Queue<Chunk> createSharedChunkQueue() {
215 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
216 }
217
218 @Override
219 public ByteBuf allocate(int size, int maxCapacity) {
220 return allocate(size, maxCapacity, Thread.currentThread(), null);
221 }
222
/**
 * Core allocation routine. Initialises {@code buf} with {@code size} bytes of chunk memory; when
 * {@code buf} is {@code null} a buffer object is first obtained from the magazine being tried.
 *
 * @param size          requested capacity
 * @param maxCapacity   maximum capacity passed on to the buffer
 * @param currentThread the calling thread, used to pick a thread-local magazine or a stripe
 * @param buf           buffer to (re-)initialise, or {@code null} to obtain a new one
 * @return the initialised buffer; the same instance as {@code buf} when that was non-null
 */
private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
    if (size <= MAX_CHUNK_SIZE) {
        // Computed up front so it does not have to be done while a magazine lock is held.
        int sizeBucket = AllocationStatistics.sizeBucket(size);
        FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
        if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
            Object mag = threadLocalMagazine.get();
            if (mag != NO_MAGAZINE) {
                // Fast path: this thread owns a magazine of its own.
                Magazine magazine = (Magazine) mag;
                if (buf == null) {
                    buf = magazine.newBuffer();
                }
                boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
                assert allocated : "Allocation of threadLocalMagazine must always succeed";
                return buf;
            }
        }
        // Striped path: start at the stripe selected by the thread id and probe neighbouring stripes.
        long threadId = currentThread.getId();
        Magazine[] mags;
        int expansions = 0;
        do {
            mags = magazines;
            int mask = mags.length - 1;
            int index = (int) (threadId & mask);
            // m is the number of trailing zero bits of ~mask, i.e. log2(mags.length) for power-of-two lengths.
            for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
                Magazine mag = mags[index + i & mask];
                if (buf == null) {
                    buf = mag.newBuffer();
                }
                if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
                    // Was able to allocate.
                    return buf;
                }
            }
            expansions++;
        } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
    }

    // The magazines could not serve the request, or the request is larger than MAX_CHUNK_SIZE.
    return allocateFallback(size, maxCapacity, currentThread, buf);
}
263
264 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
265
266 Magazine magazine;
267 if (buf != null) {
268 Chunk chunk = buf.chunk;
269 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
270 magazine = getFallbackMagazine(currentThread);
271 }
272 } else {
273 magazine = getFallbackMagazine(currentThread);
274 buf = magazine.newBuffer();
275 }
276
277 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
278 Chunk chunk = new Chunk(innerChunk, magazine, false);
279 try {
280 chunk.readInitInto(buf, size, maxCapacity);
281 } finally {
282
283
284
285 chunk.release();
286 }
287 return buf;
288 }
289
290 private Magazine getFallbackMagazine(Thread currentThread) {
291 Object tlMag;
292 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
293 if (threadLocalMagazine != null &&
294 currentThread instanceof FastThreadLocalThread &&
295 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
296 return (Magazine) tlMag;
297 }
298 Magazine[] mags = magazines;
299 return mags[(int) currentThread.getId() & mags.length - 1];
300 }
301
302
303
304
305 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
306 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
307 assert result == into: "Re-allocation created separate buffer instance";
308 }
309
310 @Override
311 public long usedMemory() {
312 long sum = 0;
313 for (Chunk chunk : centralQueue) {
314 sum += chunk.capacity();
315 }
316 for (Magazine magazine : magazines) {
317 sum += magazine.usedMemory.get();
318 }
319 if (liveCachedMagazines != null) {
320 for (Magazine magazine : liveCachedMagazines) {
321 sum += magazine.usedMemory.get();
322 }
323 }
324 return sum;
325 }
326
/**
 * Attempts to replace the shared magazine array with one of twice the length.
 *
 * @param currentLength the array length the caller observed before deciding to expand
 * @return {@code true} on every path, whether or not an expansion actually took place
 */
private boolean tryExpandMagazines(int currentLength) {
    if (currentLength >= MAX_STRIPES) {
        return true;
    }
    final Magazine[] mags;
    // Non-blocking: if another thread holds the lock, skip the expansion.
    long writeLock = magazineExpandLock.tryWriteLock();
    if (writeLock != 0) {
        try {
            mags = magazines;
            // Nothing to do when the limit is reached, the array already grew, or the allocator is freed.
            if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
                return true;
            }
            int preferredChunkSize = mags[0].sharedPrefChunkSize;
            Magazine[] expanded = new Magazine[mags.length * 2];
            // Every slot gets a brand-new magazine seeded with the previously observed preferred chunk size.
            for (int i = 0, l = expanded.length; i < l; i++) {
                Magazine m = new Magazine(this);
                m.localPrefChunkSize = preferredChunkSize;
                m.sharedPrefChunkSize = preferredChunkSize;
                expanded[i] = m;
            }
            magazines = expanded;
        } finally {
            magazineExpandLock.unlockWrite(writeLock);
        }
        // The replaced magazines are freed after the expand lock has been released.
        for (Magazine magazine : mags) {
            magazine.free();
        }
    }
    return true;
}
357
358 private boolean offerToQueue(Chunk buffer) {
359 if (freed) {
360 return false;
361 }
362
363 assert buffer.allocatedBytes == 0;
364 assert buffer.magazine == null;
365
366 boolean isAdded = centralQueue.offer(buffer);
367 if (freed && isAdded) {
368
369 freeCentralQueue();
370 }
371 return isAdded;
372 }
373
374
375
376
/**
 * Releases the allocator's pooled memory when the allocator object itself is finalized.
 */
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        // Runs even if super.finalize() throws.
        free();
    }
}
385
386 private void free() {
387 freed = true;
388 long stamp = magazineExpandLock.writeLock();
389 try {
390 Magazine[] mags = magazines;
391 for (Magazine magazine : mags) {
392 magazine.free();
393 }
394 } finally {
395 magazineExpandLock.unlockWrite(stamp);
396 }
397 freeCentralQueue();
398 }
399
400 private void freeCentralQueue() {
401 for (;;) {
402 Chunk chunk = centralQueue.poll();
403 if (chunk == null) {
404 break;
405 }
406 chunk.release();
407 }
408 }
409
410 static int sizeBucket(int size) {
411 return AllocationStatistics.sizeBucket(size);
412 }
413
414 @SuppressWarnings("checkstyle:finalclass")
415 private static class AllocationStatistics {
416 private static final int MIN_DATUM_TARGET = 1024;
417 private static final int MAX_DATUM_TARGET = 65534;
418 private static final int INIT_DATUM_TARGET = 9;
419 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
420 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
421 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
422 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
423 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
424
425 protected final AdaptivePoolingAllocator parent;
426 private final boolean shareable;
427 private final short[][] histos = {
428 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
429 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
430 };
431 private short[] histo = histos[0];
432 private final int[] sums = new int[HISTO_BUCKET_COUNT];
433
434 private int histoIndex;
435 private int datumCount;
436 private int datumTarget = INIT_DATUM_TARGET;
437 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
438 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
439
440 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
441 this.parent = parent;
442 this.shareable = shareable;
443 }
444
445 protected void recordAllocationSize(int bucket) {
446 histo[bucket]++;
447 if (datumCount++ == datumTarget) {
448 rotateHistograms();
449 }
450 }
451
452 static int sizeBucket(int size) {
453 if (size == 0) {
454 return 0;
455 }
456
457
458
459
460 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
461 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
462 }
463
464 private void rotateHistograms() {
465 short[][] hs = histos;
466 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
467 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
468 }
469 int sum = 0;
470 for (int count : sums) {
471 sum += count;
472 }
473 int targetPercentile = (int) (sum * 0.99);
474 int sizeBucket = 0;
475 for (; sizeBucket < sums.length; sizeBucket++) {
476 if (sums[sizeBucket] > targetPercentile) {
477 break;
478 }
479 targetPercentile -= sums[sizeBucket];
480 }
481 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
482 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
483 localPrefChunkSize = prefChunkSize;
484 if (shareable) {
485 for (Magazine mag : parent.magazines) {
486 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
487 }
488 }
489 if (sharedPrefChunkSize != prefChunkSize) {
490
491 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
492 sharedPrefChunkSize = prefChunkSize;
493 } else {
494
495 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
496 }
497
498 histoIndex = histoIndex + 1 & 3;
499 histo = histos[histoIndex];
500 datumCount = 0;
501 Arrays.fill(histo, (short) 0);
502 }
503
504
505
506
507
508
509
510
511
512 protected int preferredChunkSize() {
513 return sharedPrefChunkSize;
514 }
515 }
516
/**
 * An allocation stripe: holds the chunk currently being carved up ({@code current}), one spare chunk
 * ({@code nextInLine}) and, for shared magazines, a lock plus a queue of recycled buffer objects.
 */
@SuppressJava6Requirement(reason = "Guarded by version check")
private static final class Magazine extends AllocationStatistics {
    // Atomic access to the nextInLine field.
    private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
    static {
        NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
    }
    // Sentinel stored in nextInLine once the magazine has been freed.
    private static final Chunk MAGAZINE_FREED = new Chunk();

    // Buffer-object pool used by magazines that have no handle of their own (non-shareable magazines).
    private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
            new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
                @Override
                public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
                    return new AdaptiveByteBuf(handle);
                }
            });

    // Chunk allocations are currently served from; accessed by allocate(...) and free().
    private Chunk current;
    // Spare chunk, or MAGAZINE_FREED; only accessed through NEXT_IN_LINE.
    @SuppressWarnings("unused")
    private volatile Chunk nextInLine;
    // Sum of the capacities of the chunks currently attached to this magazine.
    private final AtomicLong usedMemory;
    // The next three fields are null for non-shareable magazines.
    private final StampedLock allocationLock;
    private final Queue<AdaptiveByteBuf> bufferQueue;
    private final ObjectPool.Handle<AdaptiveByteBuf> handle;

    /** Creates a shareable magazine. */
    Magazine(AdaptivePoolingAllocator parent) {
        this(parent, true);
    }

    /**
     * @param parent    the owning allocator
     * @param shareable whether the magazine gets a lock and its own buffer-object queue
     */
    Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
        super(parent, shareable);

        if (shareable) {
            // Shared magazines need locking and recycle buffer objects through a bounded queue.
            allocationLock = new StampedLock();
            bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
            handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
                @Override
                public void recycle(AdaptiveByteBuf self) {
                    // The result of offer(...) is ignored; a buffer that does not fit is simply not queued.
                    bufferQueue.offer(self);
                }
            };
        } else {
            allocationLock = null;
            bufferQueue = null;
            handle = null;
        }
        usedMemory = new AtomicLong();
    }

    /**
     * Tries to initialise {@code buf} with {@code size} bytes from this magazine.
     *
     * @return {@code true} if {@code buf} was initialised, {@code false} if this magazine could not serve it
     */
    public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
        if (allocationLock == null) {
            // No lock was created for this magazine, so allocate directly.
            return allocate(size, sizeBucket, maxCapacity, buf);
        }

        // Try to take the lock without blocking and allocate under it.
        long writeLock = allocationLock.tryWriteLock();
        if (writeLock != 0) {
            try {
                return allocate(size, sizeBucket, maxCapacity, buf);
            } finally {
                allocationLock.unlockWrite(writeLock);
            }
        }
        // Lock is held elsewhere: fall back to the path that only touches nextInLine and the central queue.
        return allocateWithoutLock(size, maxCapacity, buf);
    }

    /**
     * Allocation path that never touches {@code current}: uses the next-in-line chunk or a chunk polled
     * from the central queue.
     *
     * @return {@code true} if {@code buf} was initialised
     */
    private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
        Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr == MAGAZINE_FREED) {
            // The magazine was freed; put the marker back and give up.
            restoreMagazineFreed();
            return false;
        }
        if (curr == null) {
            curr = parent.centralQueue.poll();
            if (curr == null) {
                return false;
            }
            curr.attachToMagazine(this);
        }
        boolean allocated = false;
        if (curr.remainingCapacity() >= size) {
            curr.readInitInto(buf, size, maxCapacity);
            allocated = true;
        }
        try {
            // Keep the chunk around only if enough capacity remains to be worth it.
            if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
                transferToNextInLineOrRelease(curr);
                curr = null;
            }
        } finally {
            if (curr != null) {
                curr.release();
            }
        }
        return allocated;
    }

    /**
     * Main allocation path; also records the allocation in the size histogram.
     *
     * @return {@code true} if {@code buf} was initialised, {@code false} if the magazine turned out to be freed
     */
    private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
        recordAllocationSize(sizeBucket);
        Chunk curr = current;
        if (curr != null) {
            // Fast path: the current chunk has room to spare after this allocation.
            if (curr.remainingCapacity() > size) {
                curr.readInitInto(buf, size, maxCapacity);
                // Capacity remains for later allocations, so current stays in place.
                return true;
            }

            // From here on the current chunk is used at most one more time, so it is unset right away
            // and this method's reference to it is dropped on every path below.
            current = null;
            if (curr.remainingCapacity() == size) {
                try {
                    curr.readInitInto(buf, size, maxCapacity);
                    return true;
                } finally {
                    curr.release();
                }
            }

            // Too small for this request: release it, or park it as next-in-line if enough is left.
            if (curr.remainingCapacity() < RETIRE_CAPACITY) {
                curr.release();
            } else {
                // transferToNextInLineOrRelease(...) releases curr itself if the chunk cannot be parked.
                transferToNextInLineOrRelease(curr);
            }
        }

        assert current == null;
        // Slow path. Candidates are tried in this order: the next-in-line chunk, a chunk polled from the
        // central queue, and finally a newly allocated chunk. A chunk with capacity to spare after this
        // allocation becomes the new current.
        curr = NEXT_IN_LINE.getAndSet(this, null);
        if (curr != null) {
            if (curr == MAGAZINE_FREED) {
                // The magazine was freed; put the marker back and give up.
                restoreMagazineFreed();
                return false;
            }

            if (curr.remainingCapacity() > size) {
                // The spare chunk has room left over and becomes the current chunk.
                curr.readInitInto(buf, size, maxCapacity);
                current = curr;
                return true;
            }

            if (curr.remainingCapacity() == size) {
                // Exact fit: the chunk is used up by this allocation, so it is not stored as current.
                try {
                    curr.readInitInto(buf, size, maxCapacity);
                    return true;
                } finally {
                    // In a finally block so the reference is dropped even if readInitInto(...) throws.
                    curr.release();
                }
            } else {
                // Too small for this request.
                curr.release();
            }
        }

        // Next candidate: the central queue.
        curr = parent.centralQueue.poll();
        if (curr == null) {
            curr = newChunkAllocation(size);
        } else {
            curr.attachToMagazine(this);

            if (curr.remainingCapacity() < size) {
                // Polled chunk cannot hold the request: release it or park it, then allocate a new one.
                if (curr.remainingCapacity() < RETIRE_CAPACITY) {
                    curr.release();
                } else {
                    // transferToNextInLineOrRelease(...) releases curr itself if the chunk cannot be
                    // parked.
                    transferToNextInLineOrRelease(curr);
                }
                curr = newChunkAllocation(size);
            }
        }

        current = curr;
        try {
            assert current.remainingCapacity() >= size;
            if (curr.remainingCapacity() > size) {
                curr.readInitInto(buf, size, maxCapacity);
                // Success with capacity to spare: keep the chunk as current.
                curr = null;
            } else {
                curr.readInitInto(buf, size, maxCapacity);
            }
        } finally {
            if (curr != null) {
                // Reached on an exact fit, or when readInitInto(...) threw: the chunk is not kept as
                // current and this method's reference is dropped.
                curr.release();
                current = null;
            }
        }
        return true;
    }

    /**
     * Stores the {@code MAGAZINE_FREED} marker in {@code nextInLine}, releasing any real chunk that was
     * stored there.
     */
    private void restoreMagazineFreed() {
        Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
        if (next != null && next != MAGAZINE_FREED) {
            next.release();
        }
    }

    /**
     * Parks {@code chunk} as next-in-line if the slot is empty, or if it holds a chunk with less
     * remaining capacity (which is then released instead); otherwise releases {@code chunk}.
     */
    private void transferToNextInLineOrRelease(Chunk chunk) {
        if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
            return;
        }

        Chunk nextChunk = NEXT_IN_LINE.get(this);
        if (nextChunk != null && nextChunk != MAGAZINE_FREED
                && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
            if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
                nextChunk.release();
                return;
            }
        }

        // The slot is occupied (or the magazine is freed). The chunk is not offered to the central queue
        // here; Chunk.deallocate() does that once the chunk's reference count has dropped to zero.
        chunk.release();
    }

    /**
     * Allocates a new pooled chunk attached to this magazine.
     *
     * @param promptingSize the size of the allocation that triggered the chunk allocation
     */
    private Chunk newChunkAllocation(int promptingSize) {
        int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
        int minChunks = size / MIN_CHUNK_SIZE;
        if (MIN_CHUNK_SIZE * minChunks < size) {
            // Round up to the next whole multiple of MIN_CHUNK_SIZE.
            size = MIN_CHUNK_SIZE * (1 + minChunks);
        }
        ChunkAllocator chunkAllocator = parent.chunkAllocator;
        return new Chunk(chunkAllocator.allocate(size, size), this, true);
    }

    /**
     * @return {@code true} if {@code chunk} was stored as next-in-line because the slot was empty
     */
    boolean trySetNextInLine(Chunk chunk) {
        return NEXT_IN_LINE.compareAndSet(this, null, chunk);
    }

    /**
     * Marks the magazine as freed and releases its next-in-line and current chunks.
     * Uses {@code allocationLock} unconditionally.
     */
    void free() {
        // Mark the magazine as freed first (this also releases the next-in-line chunk) ...
        restoreMagazineFreed();
        // ... then release the current chunk under the allocation lock.
        long stamp = allocationLock.writeLock();
        try {
            if (current != null) {
                current.release();
                current = null;
            }
        } finally {
            allocationLock.unlockWrite(stamp);
        }
    }

    /**
     * Obtains a buffer object: from the static pool when this magazine has no handle, otherwise from
     * this magazine's queue, creating a new object when the queue is empty.
     *
     * @return a buffer object with its reference count and marks reset; not yet backed by memory
     */
    public AdaptiveByteBuf newBuffer() {
        AdaptiveByteBuf buf;
        if (handle == null) {
            buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
        } else {
            buf = bufferQueue.poll();
            if (buf == null) {
                buf = new AdaptiveByteBuf(handle);
            }
        }
        buf.resetRefCnt();
        buf.discardMarks();
        return buf;
    }
}
803
804 private static final class Chunk implements ReferenceCounted {
805
806 private final AbstractByteBuf delegate;
807 private Magazine magazine;
808 private final AdaptivePoolingAllocator allocator;
809 private final int capacity;
810 private final boolean pooled;
811 private int allocatedBytes;
812 private static final long REFCNT_FIELD_OFFSET =
813 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
814 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
815 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
816
817 private static final ReferenceCountUpdater<Chunk> updater =
818 new ReferenceCountUpdater<Chunk>() {
819 @Override
820 protected AtomicIntegerFieldUpdater<Chunk> updater() {
821 return AIF_UPDATER;
822 }
823 @Override
824 protected long unsafeOffset() {
825 return REFCNT_FIELD_OFFSET;
826 }
827 };
828
829
830 @SuppressWarnings({"unused", "FieldMayBeFinal"})
831 private volatile int refCnt;
832
833 Chunk() {
834
835 delegate = null;
836 magazine = null;
837 allocator = null;
838 capacity = 0;
839 pooled = false;
840 }
841
842 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
843 this.delegate = delegate;
844 this.pooled = pooled;
845 capacity = delegate.capacity();
846 updater.setInitialValue(this);
847 allocator = magazine.parent;
848 attachToMagazine(magazine);
849 }
850
851 Magazine currentMagazine() {
852 return magazine;
853 }
854
855 void detachFromMagazine() {
856 if (magazine != null) {
857 magazine.usedMemory.getAndAdd(-capacity);
858 magazine = null;
859 }
860 }
861
862 void attachToMagazine(Magazine magazine) {
863 assert this.magazine == null;
864 this.magazine = magazine;
865 magazine.usedMemory.getAndAdd(capacity);
866 }
867
868 @Override
869 public Chunk touch(Object hint) {
870 return this;
871 }
872
873 @Override
874 public int refCnt() {
875 return updater.refCnt(this);
876 }
877
878 @Override
879 public Chunk retain() {
880 return updater.retain(this);
881 }
882
883 @Override
884 public Chunk retain(int increment) {
885 return updater.retain(this, increment);
886 }
887
888 @Override
889 public Chunk touch() {
890 return this;
891 }
892
893 @Override
894 public boolean release() {
895 if (updater.release(this)) {
896 deallocate();
897 return true;
898 }
899 return false;
900 }
901
902 @Override
903 public boolean release(int decrement) {
904 if (updater.release(this, decrement)) {
905 deallocate();
906 return true;
907 }
908 return false;
909 }
910
911 private void deallocate() {
912 Magazine mag = magazine;
913 AdaptivePoolingAllocator parent = mag.parent;
914 int chunkSize = mag.preferredChunkSize();
915 int memSize = delegate.capacity();
916 if (!pooled || shouldReleaseSuboptimalChunkSize(memSize, chunkSize)) {
917
918
919 detachFromMagazine();
920 delegate.release();
921 } else {
922 updater.resetRefCnt(this);
923 delegate.setIndex(0, 0);
924 allocatedBytes = 0;
925 if (!mag.trySetNextInLine(this)) {
926
927 detachFromMagazine();
928 if (!parent.offerToQueue(this)) {
929
930
931 boolean released = updater.release(this);
932 delegate.release();
933 assert released;
934 }
935 }
936 }
937 }
938
939 private static boolean shouldReleaseSuboptimalChunkSize(int givenSize, int preferredSize) {
940 int givenChunks = givenSize / MIN_CHUNK_SIZE;
941 int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
942 int deviation = Math.abs(givenChunks - preferredChunks);
943
944
945 return deviation != 0 &&
946 PlatformDependent.threadLocalRandom().nextDouble() * 20.0 < deviation;
947 }
948
949 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
950 int startIndex = allocatedBytes;
951 allocatedBytes = startIndex + size;
952 Chunk chunk = this;
953 chunk.retain();
954 try {
955 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
956 chunk = null;
957 } finally {
958 if (chunk != null) {
959
960
961
962 allocatedBytes = startIndex;
963 chunk.release();
964 }
965 }
966 }
967
968 public int remainingCapacity() {
969 return capacity - allocatedBytes;
970 }
971
972 public int capacity() {
973 return capacity;
974 }
975 }
976
977 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
978
// Recycler handle this buffer object was created with.
private final ObjectPool.Handle<AdaptiveByteBuf> handle;

// Offset of this buffer's first byte within rootParent.
private int adjustment;
// The chunk's backing buffer that all accesses are delegated to.
private AbstractByteBuf rootParent;
// The chunk this buffer currently takes its memory from.
Chunk chunk;
// Current capacity of this buffer.
private int length;
// Lazily created NIO view of this buffer's region; reset by init(...).
private ByteBuffer tmpNioBuf;
// Cached capabilities of rootParent, captured in init(...).
private boolean hasArray;
private boolean hasMemoryAddress;
988
/**
 * Creates an uninitialised buffer object; {@code init(...)} must be called before use.
 *
 * @param recyclerHandle the handle used to recycle this object; must not be {@code null}
 */
AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
    super(0);
    ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
    this.handle = recyclerHandle;
}
993
994 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
995 int adjustment, int capacity, int maxCapacity) {
996 this.adjustment = adjustment;
997 chunk = wrapped;
998 length = capacity;
999 maxCapacity(maxCapacity);
1000 setIndex0(readerIndex, writerIndex);
1001 hasArray = unwrapped.hasArray();
1002 hasMemoryAddress = unwrapped.hasMemoryAddress();
1003 rootParent = unwrapped;
1004 tmpNioBuf = null;
1005 }
1006
1007 private AbstractByteBuf rootParent() {
1008 final AbstractByteBuf rootParent = this.rootParent;
1009 if (rootParent != null) {
1010 return rootParent;
1011 }
1012 throw new IllegalReferenceCountException();
1013 }
1014
1015 @Override
1016 public int capacity() {
1017 return length;
1018 }
1019
1020 @Override
1021 public ByteBuf capacity(int newCapacity) {
1022 if (newCapacity == capacity()) {
1023 ensureAccessible();
1024 return this;
1025 }
1026 checkNewCapacity(newCapacity);
1027 if (newCapacity < capacity()) {
1028 length = newCapacity;
1029 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1030 return this;
1031 }
1032
1033
1034 Chunk chunk = this.chunk;
1035 AdaptivePoolingAllocator allocator = chunk.allocator;
1036 int readerIndex = this.readerIndex;
1037 int writerIndex = this.writerIndex;
1038 int baseOldRootIndex = adjustment;
1039 int oldCapacity = length;
1040 AbstractByteBuf oldRoot = rootParent();
1041 allocator.allocate(newCapacity, maxCapacity(), this);
1042 oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
1043 chunk.release();
1044 this.readerIndex = readerIndex;
1045 this.writerIndex = writerIndex;
1046 return this;
1047 }
1048
1049 @Override
1050 public ByteBufAllocator alloc() {
1051 return rootParent().alloc();
1052 }
1053
1054 @Override
1055 public ByteOrder order() {
1056 return rootParent().order();
1057 }
1058
/**
 * @return always {@code null}; the backing buffer is not exposed
 */
@Override
public ByteBuf unwrap() {
    return null;
}
1063
1064 @Override
1065 public boolean isDirect() {
1066 return rootParent().isDirect();
1067 }
1068
1069 @Override
1070 public int arrayOffset() {
1071 return idx(rootParent().arrayOffset());
1072 }
1073
1074 @Override
1075 public boolean hasMemoryAddress() {
1076 return hasMemoryAddress;
1077 }
1078
1079 @Override
1080 public long memoryAddress() {
1081 ensureAccessible();
1082 return rootParent().memoryAddress() + adjustment;
1083 }
1084
1085 @Override
1086 public ByteBuffer nioBuffer(int index, int length) {
1087 checkIndex(index, length);
1088 return rootParent().nioBuffer(idx(index), length);
1089 }
1090
1091 @Override
1092 public ByteBuffer internalNioBuffer(int index, int length) {
1093 checkIndex(index, length);
1094 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1095 }
1096
1097 private ByteBuffer internalNioBuffer() {
1098 if (tmpNioBuf == null) {
1099 tmpNioBuf = rootParent().nioBuffer(adjustment, length);
1100 }
1101 return (ByteBuffer) tmpNioBuf.clear();
1102 }
1103
1104 @Override
1105 public ByteBuffer[] nioBuffers(int index, int length) {
1106 checkIndex(index, length);
1107 return rootParent().nioBuffers(idx(index), length);
1108 }
1109
1110 @Override
1111 public boolean hasArray() {
1112 return hasArray;
1113 }
1114
1115 @Override
1116 public byte[] array() {
1117 ensureAccessible();
1118 return rootParent().array();
1119 }
1120
1121 @Override
1122 public ByteBuf copy(int index, int length) {
1123 checkIndex(index, length);
1124 return rootParent().copy(idx(index), length);
1125 }
1126
1127 @Override
1128 public int nioBufferCount() {
1129 return rootParent().nioBufferCount();
1130 }
1131
1132 @Override
1133 protected byte _getByte(int index) {
1134 return rootParent()._getByte(idx(index));
1135 }
1136
1137 @Override
1138 protected short _getShort(int index) {
1139 return rootParent()._getShort(idx(index));
1140 }
1141
1142 @Override
1143 protected short _getShortLE(int index) {
1144 return rootParent()._getShortLE(idx(index));
1145 }
1146
1147 @Override
1148 protected int _getUnsignedMedium(int index) {
1149 return rootParent()._getUnsignedMedium(idx(index));
1150 }
1151
1152 @Override
1153 protected int _getUnsignedMediumLE(int index) {
1154 return rootParent()._getUnsignedMediumLE(idx(index));
1155 }
1156
1157 @Override
1158 protected int _getInt(int index) {
1159 return rootParent()._getInt(idx(index));
1160 }
1161
1162 @Override
1163 protected int _getIntLE(int index) {
1164 return rootParent()._getIntLE(idx(index));
1165 }
1166
1167 @Override
1168 protected long _getLong(int index) {
1169 return rootParent()._getLong(idx(index));
1170 }
1171
1172 @Override
1173 protected long _getLongLE(int index) {
1174 return rootParent()._getLongLE(idx(index));
1175 }
1176
1177 @Override
1178 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1179 checkIndex(index, length);
1180 rootParent().getBytes(idx(index), dst, dstIndex, length);
1181 return this;
1182 }
1183
1184 @Override
1185 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1186 checkIndex(index, length);
1187 rootParent().getBytes(idx(index), dst, dstIndex, length);
1188 return this;
1189 }
1190
1191 @Override
1192 public ByteBuf getBytes(int index, ByteBuffer dst) {
1193 checkIndex(index, dst.remaining());
1194 rootParent().getBytes(idx(index), dst);
1195 return this;
1196 }
1197
1198 @Override
1199 protected void _setByte(int index, int value) {
1200 rootParent()._setByte(idx(index), value);
1201 }
1202
1203 @Override
1204 protected void _setShort(int index, int value) {
1205 rootParent()._setShort(idx(index), value);
1206 }
1207
1208 @Override
1209 protected void _setShortLE(int index, int value) {
1210 rootParent()._setShortLE(idx(index), value);
1211 }
1212
1213 @Override
1214 protected void _setMedium(int index, int value) {
1215 rootParent()._setMedium(idx(index), value);
1216 }
1217
1218 @Override
1219 protected void _setMediumLE(int index, int value) {
1220 rootParent()._setMediumLE(idx(index), value);
1221 }
1222
1223 @Override
1224 protected void _setInt(int index, int value) {
1225 rootParent()._setInt(idx(index), value);
1226 }
1227
1228 @Override
1229 protected void _setIntLE(int index, int value) {
1230 rootParent()._setIntLE(idx(index), value);
1231 }
1232
1233 @Override
1234 protected void _setLong(int index, long value) {
1235 rootParent()._setLong(idx(index), value);
1236 }
1237
1238 @Override
1239 protected void _setLongLE(int index, long value) {
1240 rootParent().setLongLE(idx(index), value);
1241 }
1242
1243 @Override
1244 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1245 checkIndex(index, length);
1246 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1247 tmp.put(src, srcIndex, length);
1248 return this;
1249 }
1250
1251 @Override
1252 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1253 checkIndex(index, length);
1254 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1255 tmp.put(src.nioBuffer(srcIndex, length));
1256 return this;
1257 }
1258
1259 @Override
1260 public ByteBuf setBytes(int index, ByteBuffer src) {
1261 checkIndex(index, src.remaining());
1262 ByteBuffer tmp = (ByteBuffer) internalNioBuffer().clear().position(index);
1263 tmp.put(src);
1264 return this;
1265 }
1266
1267 @Override
1268 public ByteBuf getBytes(int index, OutputStream out, int length)
1269 throws IOException {
1270 checkIndex(index, length);
1271 if (length != 0) {
1272 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1273 }
1274 return this;
1275 }
1276
1277 @Override
1278 public int getBytes(int index, GatheringByteChannel out, int length)
1279 throws IOException {
1280 return out.write(internalNioBuffer(index, length).duplicate());
1281 }
1282
1283 @Override
1284 public int getBytes(int index, FileChannel out, long position, int length)
1285 throws IOException {
1286 return out.write(internalNioBuffer(index, length).duplicate(), position);
1287 }
1288
1289 @Override
1290 public int setBytes(int index, InputStream in, int length)
1291 throws IOException {
1292 checkIndex(index, length);
1293 final AbstractByteBuf rootParent = rootParent();
1294 if (rootParent.hasArray()) {
1295 return rootParent.setBytes(idx(index), in, length);
1296 }
1297 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1298 int readBytes = in.read(tmp, 0, length);
1299 if (readBytes <= 0) {
1300 return readBytes;
1301 }
1302 setBytes(index, tmp, 0, readBytes);
1303 return readBytes;
1304 }
1305
1306 @Override
1307 public int setBytes(int index, ScatteringByteChannel in, int length)
1308 throws IOException {
1309 try {
1310 return in.read(internalNioBuffer(index, length).duplicate());
1311 } catch (ClosedChannelException ignored) {
1312 return -1;
1313 }
1314 }
1315
1316 @Override
1317 public int setBytes(int index, FileChannel in, long position, int length)
1318 throws IOException {
1319 try {
1320 return in.read(internalNioBuffer(index, length).duplicate(), position);
1321 } catch (ClosedChannelException ignored) {
1322 return -1;
1323 }
1324 }
1325
1326 @Override
1327 public int setCharSequence(int index, CharSequence sequence, Charset charset) {
1328 checkIndex(index, sequence.length());
1329 return rootParent().setCharSequence(idx(index), sequence, charset);
1330 }
1331
1332 @Override
1333 public int forEachByte(int index, int length, ByteProcessor processor) {
1334 checkIndex(index, length);
1335 int ret = rootParent().forEachByte(idx(index), length, processor);
1336 return forEachResult(ret);
1337 }
1338
1339 @Override
1340 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1341 checkIndex(index, length);
1342 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1343 return forEachResult(ret);
1344 }
1345
1346 private int forEachResult(int ret) {
1347 if (ret < adjustment) {
1348 return -1;
1349 }
1350 return ret - adjustment;
1351 }
1352
1353 @Override
1354 public boolean isContiguous() {
1355 return rootParent().isContiguous();
1356 }
1357
1358 private int idx(int index) {
1359 return index + adjustment;
1360 }
1361
1362 @Override
1363 protected void deallocate() {
1364 if (chunk != null) {
1365 chunk.release();
1366 }
1367 tmpNioBuf = null;
1368 chunk = null;
1369 rootParent = null;
1370 if (handle instanceof EnhancedHandle) {
1371 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1372 enhancedHandle.unguardedRecycle(this);
1373 } else {
1374 handle.recycle(this);
1375 }
1376 }
1377 }
1378
1379
1380
1381
1382 interface ChunkAllocator {
1383
1384
1385
1386
1387
1388
1389 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1390 }
1391 }