1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SystemPropertyUtil;
30 import io.netty.util.internal.ThreadExecutorMap;
31 import io.netty.util.internal.UnstableApi;
32
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.nio.ByteOrder;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.FileChannel;
40 import java.nio.channels.GatheringByteChannel;
41 import java.nio.channels.ScatteringByteChannel;
42 import java.util.Arrays;
43 import java.util.Queue;
44 import java.util.Set;
45 import java.util.concurrent.ConcurrentLinkedQueue;
46 import java.util.concurrent.CopyOnWriteArraySet;
47 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50 import java.util.concurrent.locks.StampedLock;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @UnstableApi
79 final class AdaptivePoolingAllocator {
80
/**
 * Selects which threads, if any, are given their own thread-local {@link Magazine}.
 * <ul>
 *     <li>{@link #EventLoopThreads}: only threads for which {@code ThreadExecutorMap.currentExecutor()}
 *     is non-null get a thread-local magazine.</li>
 *     <li>{@link #FastThreadLocalThreads}: the executor check is skipped.</li>
 *     <li>{@link #None}: no thread-local magazines are created at all.</li>
 * </ul>
 * In the first two modes a thread must additionally be a {@code FastThreadLocalThread} that will clean up its
 * thread-locals, otherwise it falls back to the shared magazines.
 */
enum MagazineCaching {
    EventLoopThreads,
    FastThreadLocalThreads,
    None
}
86
// Number of times an allocation may try to expand the shared magazine array before using the fallback path.
private static final int EXPANSION_ATTEMPTS = 3;
// Length of the initial shared magazine array. Indexing uses (length - 1) as a bit mask.
private static final int INITIAL_MAGAZINES = 4;
// A chunk with less than this many bytes remaining is released instead of being kept as "next in line".
private static final int RETIRE_CAPACITY = 4 * 1024;
// Lower bound (and initial value) of the preferred chunk size: 128 KiB.
private static final int MIN_CHUNK_SIZE = 128 * 1024;
// The shared magazine array is not expanded once it has reached this length.
private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
// Multiplier applied to a buffer size to derive a chunk size, i.e. aim for about this many buffers per chunk.
private static final int BUFS_PER_CHUNK = 10;

/**
 * Requests larger than this are not served from magazines; they get a dedicated one-off chunk instead.
 * With the current constants this is 10 * (1 << 20) bytes, i.e. 10 MiB.
 */
private static final int MAX_CHUNK_SIZE =
        BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);

/**
 * Capacity of the central queue used to share reusable chunks across magazines.
 * Configurable through the {@code io.netty.allocator.centralQueueCapacity} system property; defaults to the
 * number of available processors and is never less than 2.
 */
private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));

/**
 * Capacity of the per-magazine queue of recycled {@link AdaptiveByteBuf} instances.
 * Configurable through the {@code io.netty.allocator.magazineBufferQueueCapacity} system property;
 * defaults to 1024. Values below 2 are rejected by the static initializer.
 */
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

// Sentinel stored in the thread-local to mean "this thread has no thread-local magazine".
private static final Object NO_MAGAZINE = Boolean.TRUE;

// Source of the backing memory for every chunk.
private final ChunkAllocator chunkAllocator;
// Reusable chunks that are currently not attached to any magazine.
private final Queue<Chunk> centralQueue;
// Guards replacement of the shared magazine array (expansion and free).
private final StampedLock magazineExpandLock;
// Shared magazines, indexed by thread id masked with (length - 1).
private volatile Magazine[] magazines;
// Holds either a Magazine or NO_MAGAZINE per thread; null when magazine caching is disabled.
private final FastThreadLocal<Object> threadLocalMagazine;
// Thread-local magazines that are currently alive; null when magazine caching is disabled.
private final Set<Magazine> liveCachedMagazines;
// Set once free() has run; chunks are no longer accepted by the central queue afterwards.
private volatile boolean freed;
129
static {
    // Both queues are created as fixed-capacity MPMC queues; capacities below two are rejected up front.
    requireQueueCapacity("CENTRAL_QUEUE_CAPACITY", CENTRAL_QUEUE_CAPACITY);
    requireQueueCapacity("MAGAZINE_BUFFER_QUEUE_CAPACITY", MAGAZINE_BUFFER_QUEUE_CAPACITY);
}

/**
 * Fails class initialization when a configured queue capacity is smaller than two.
 *
 * @param name     the constant name used in the error message.
 * @param capacity the configured capacity.
 * @throws IllegalArgumentException if {@code capacity < 2}.
 */
private static void requireQueueCapacity(String name, int capacity) {
    if (capacity >= 2) {
        return;
    }
    throw new IllegalArgumentException(name + ": " + capacity + " (expected: >= " + 2 + ')');
}
140
/**
 * Creates a new allocator.
 *
 * @param chunkAllocator  supplies the backing memory for chunks; must not be {@code null}.
 * @param magazineCaching decides which threads get a thread-local magazine; must not be {@code null}.
 */
AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
    ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
    ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
    this.chunkAllocator = chunkAllocator;
    centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
    magazineExpandLock = new StampedLock();

    if (magazineCaching == MagazineCaching.None) {
        // No per-thread caching: every allocation goes through the shared magazines.
        threadLocalMagazine = null;
        liveCachedMagazines = null;
    } else {
        assert magazineCaching == MagazineCaching.EventLoopThreads ||
                magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final boolean cacheWithoutExecutor = magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final Set<Magazine> tracked = new CopyOnWriteArraySet<Magazine>();
        threadLocalMagazine = new FastThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                boolean eligible = cacheWithoutExecutor || ThreadExecutorMap.currentExecutor() != null;
                // Threads that will not clean up their FastThreadLocals never get a magazine,
                // as onRemoval(...) would not run for them.
                if (!eligible || !FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
                    return NO_MAGAZINE;
                }
                Magazine created = new Magazine(AdaptivePoolingAllocator.this, false);
                tracked.add(created);
                return created;
            }

            @Override
            protected void onRemoval(final Object value) throws Exception {
                if (value == NO_MAGAZINE) {
                    return;
                }
                tracked.remove(value);
            }
        };
        liveCachedMagazines = tracked;
    }

    // Populate the initial set of shared magazines before publishing the array.
    final Magazine[] initial = new Magazine[INITIAL_MAGAZINES];
    int slot = 0;
    while (slot < initial.length) {
        initial[slot++] = new Magazine(this);
    }
    magazines = initial;
}
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 private static Queue<Chunk> createSharedChunkQueue() {
208 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
209 }
210
211 ByteBuf allocate(int size, int maxCapacity) {
212 return allocate(size, maxCapacity, Thread.currentThread(), null);
213 }
214
215 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
216 if (size <= MAX_CHUNK_SIZE) {
217 int sizeBucket = AllocationStatistics.sizeBucket(size);
218 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
219 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
220 Object mag = threadLocalMagazine.get();
221 if (mag != NO_MAGAZINE) {
222 Magazine magazine = (Magazine) mag;
223 if (buf == null) {
224 buf = magazine.newBuffer();
225 }
226 boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
227 assert allocated : "Allocation of threadLocalMagazine must always succeed";
228 return buf;
229 }
230 }
231 long threadId = currentThread.getId();
232 Magazine[] mags;
233 int expansions = 0;
234 do {
235 mags = magazines;
236 int mask = mags.length - 1;
237 int index = (int) (threadId & mask);
238 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
239 Magazine mag = mags[index + i & mask];
240 if (buf == null) {
241 buf = mag.newBuffer();
242 }
243 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
244
245 return buf;
246 }
247 }
248 expansions++;
249 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
250 }
251
252
253 return allocateFallback(size, maxCapacity, currentThread, buf);
254 }
255
256 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
257
258 Magazine magazine;
259 if (buf != null) {
260 Chunk chunk = buf.chunk;
261 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
262 magazine = getFallbackMagazine(currentThread);
263 }
264 } else {
265 magazine = getFallbackMagazine(currentThread);
266 buf = magazine.newBuffer();
267 }
268
269 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
270 Chunk chunk = new Chunk(innerChunk, magazine, false);
271 try {
272 chunk.readInitInto(buf, size, maxCapacity);
273 } finally {
274
275
276
277 chunk.release();
278 }
279 return buf;
280 }
281
282 private Magazine getFallbackMagazine(Thread currentThread) {
283 Object tlMag;
284 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
285 if (threadLocalMagazine != null &&
286 currentThread instanceof FastThreadLocalThread &&
287 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
288 return (Magazine) tlMag;
289 }
290 Magazine[] mags = magazines;
291 return mags[(int) currentThread.getId() & mags.length - 1];
292 }
293
294
295
296
297 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
298 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
299 assert result == into: "Re-allocation created separate buffer instance";
300 }
301
302 long usedMemory() {
303 long sum = 0;
304 for (Chunk chunk : centralQueue) {
305 sum += chunk.capacity();
306 }
307 for (Magazine magazine : magazines) {
308 sum += magazine.usedMemory.get();
309 }
310 if (liveCachedMagazines != null) {
311 for (Magazine magazine : liveCachedMagazines) {
312 sum += magazine.usedMemory.get();
313 }
314 }
315 return sum;
316 }
317
318 private boolean tryExpandMagazines(int currentLength) {
319 if (currentLength >= MAX_STRIPES) {
320 return true;
321 }
322 final Magazine[] mags;
323 long writeLock = magazineExpandLock.tryWriteLock();
324 if (writeLock != 0) {
325 try {
326 mags = magazines;
327 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
328 return true;
329 }
330 int preferredChunkSize = mags[0].sharedPrefChunkSize;
331 Magazine[] expanded = new Magazine[mags.length * 2];
332 for (int i = 0, l = expanded.length; i < l; i++) {
333 Magazine m = new Magazine(this);
334 m.localPrefChunkSize = preferredChunkSize;
335 m.sharedPrefChunkSize = preferredChunkSize;
336 expanded[i] = m;
337 }
338 magazines = expanded;
339 } finally {
340 magazineExpandLock.unlockWrite(writeLock);
341 }
342 for (Magazine magazine : mags) {
343 magazine.free();
344 }
345 }
346 return true;
347 }
348
349 private boolean offerToQueue(Chunk buffer) {
350 if (freed) {
351 return false;
352 }
353
354 assert buffer.allocatedBytes == 0;
355 assert buffer.magazine == null;
356
357 boolean isAdded = centralQueue.offer(buffer);
358 if (freed && isAdded) {
359
360 freeCentralQueue();
361 }
362 return isAdded;
363 }
364
365
366
367
// Releases the magazines and the central queue when this allocator instance is finalized.
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        // Runs even when super.finalize() throws.
        free();
    }
}
376
377 private void free() {
378 freed = true;
379 long stamp = magazineExpandLock.writeLock();
380 try {
381 Magazine[] mags = magazines;
382 for (Magazine magazine : mags) {
383 magazine.free();
384 }
385 } finally {
386 magazineExpandLock.unlockWrite(stamp);
387 }
388 freeCentralQueue();
389 }
390
391 private void freeCentralQueue() {
392 for (;;) {
393 Chunk chunk = centralQueue.poll();
394 if (chunk == null) {
395 break;
396 }
397 chunk.release();
398 }
399 }
400
401 static int sizeBucket(int size) {
402 return AllocationStatistics.sizeBucket(size);
403 }
404
405 @SuppressWarnings("checkstyle:finalclass")
406 private static class AllocationStatistics {
407 private static final int MIN_DATUM_TARGET = 1024;
408 private static final int MAX_DATUM_TARGET = 65534;
409 private static final int INIT_DATUM_TARGET = 9;
410 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
411 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
412 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
413 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
414 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
415
416 protected final AdaptivePoolingAllocator parent;
417 private final boolean shareable;
418 private final short[][] histos = {
419 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
420 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
421 };
422 private short[] histo = histos[0];
423 private final int[] sums = new int[HISTO_BUCKET_COUNT];
424
425 private int histoIndex;
426 private int datumCount;
427 private int datumTarget = INIT_DATUM_TARGET;
428 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
429 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
430
431 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
432 this.parent = parent;
433 this.shareable = shareable;
434 }
435
436 protected void recordAllocationSize(int bucket) {
437 histo[bucket]++;
438 if (datumCount++ == datumTarget) {
439 rotateHistograms();
440 }
441 }
442
443 static int sizeBucket(int size) {
444 if (size == 0) {
445 return 0;
446 }
447
448
449
450
451 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
452 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
453 }
454
455 private void rotateHistograms() {
456 short[][] hs = histos;
457 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
458 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
459 }
460 int sum = 0;
461 for (int count : sums) {
462 sum += count;
463 }
464 int targetPercentile = (int) (sum * 0.99);
465 int sizeBucket = 0;
466 for (; sizeBucket < sums.length; sizeBucket++) {
467 if (sums[sizeBucket] > targetPercentile) {
468 break;
469 }
470 targetPercentile -= sums[sizeBucket];
471 }
472 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
473 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
474 localPrefChunkSize = prefChunkSize;
475 if (shareable) {
476 for (Magazine mag : parent.magazines) {
477 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
478 }
479 }
480 if (sharedPrefChunkSize != prefChunkSize) {
481
482 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
483 sharedPrefChunkSize = prefChunkSize;
484 } else {
485
486 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
487 }
488
489 histoIndex = histoIndex + 1 & 3;
490 histo = histos[histoIndex];
491 datumCount = 0;
492 Arrays.fill(histo, (short) 0);
493 }
494
495
496
497
498
499
500
501
502
503 protected int preferredChunkSize() {
504 return sharedPrefChunkSize;
505 }
506 }
507
508 private static final class Magazine extends AllocationStatistics {
509 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
510 static {
511 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
512 }
513 private static final Chunk MAGAZINE_FREED = new Chunk();
514
515 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
516 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
517 @Override
518 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
519 return new AdaptiveByteBuf(handle);
520 }
521 });
522
523 private Chunk current;
524 @SuppressWarnings("unused")
525 private volatile Chunk nextInLine;
526 private final AtomicLong usedMemory;
527 private final StampedLock allocationLock;
528 private final Queue<AdaptiveByteBuf> bufferQueue;
529 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
530
531 Magazine(AdaptivePoolingAllocator parent) {
532 this(parent, true);
533 }
534
535 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
536 super(parent, shareable);
537
538 if (shareable) {
539
540 allocationLock = new StampedLock();
541 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
542 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
543 @Override
544 public void recycle(AdaptiveByteBuf self) {
545 bufferQueue.offer(self);
546 }
547 };
548 } else {
549 allocationLock = null;
550 bufferQueue = null;
551 handle = null;
552 }
553 usedMemory = new AtomicLong();
554 }
555
556 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
557 if (allocationLock == null) {
558
559 return allocate(size, sizeBucket, maxCapacity, buf);
560 }
561
562
563 long writeLock = allocationLock.tryWriteLock();
564 if (writeLock != 0) {
565 try {
566 return allocate(size, sizeBucket, maxCapacity, buf);
567 } finally {
568 allocationLock.unlockWrite(writeLock);
569 }
570 }
571 return false;
572 }
573
574 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
575 recordAllocationSize(sizeBucket);
576 Chunk curr = current;
577 if (curr != null) {
578
579 if (curr.remainingCapacity() > size) {
580 curr.readInitInto(buf, size, maxCapacity);
581
582 return true;
583 }
584
585
586
587 current = null;
588 if (curr.remainingCapacity() == size) {
589 try {
590 curr.readInitInto(buf, size, maxCapacity);
591 return true;
592 } finally {
593 curr.release();
594 }
595 }
596
597
598 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
599 curr.release();
600 } else {
601
602
603 transferToNextInLineOrRelease(curr);
604 }
605 }
606
607 assert current == null;
608
609
610
611
612
613
614
615 if (nextInLine != null) {
616 curr = NEXT_IN_LINE.getAndSet(this, null);
617 if (curr == MAGAZINE_FREED) {
618
619 restoreMagazineFreed();
620 return false;
621 }
622
623 if (curr.remainingCapacity() > size) {
624
625 curr.readInitInto(buf, size, maxCapacity);
626 current = curr;
627 return true;
628 }
629
630 if (curr.remainingCapacity() == size) {
631
632
633 try {
634 curr.readInitInto(buf, size, maxCapacity);
635 return true;
636 } finally {
637
638
639 curr.release();
640 }
641 } else {
642
643 curr.release();
644 }
645 }
646
647
648 curr = parent.centralQueue.poll();
649 if (curr == null) {
650 curr = newChunkAllocation(size);
651 } else {
652 curr.attachToMagazine(this);
653
654 if (curr.remainingCapacity() < size) {
655
656 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
657 curr.release();
658 } else {
659
660
661 transferToNextInLineOrRelease(curr);
662 }
663 curr = newChunkAllocation(size);
664 }
665 }
666
667 current = curr;
668 try {
669 assert current.remainingCapacity() >= size;
670 if (curr.remainingCapacity() > size) {
671 curr.readInitInto(buf, size, maxCapacity);
672 curr = null;
673 } else {
674 curr.readInitInto(buf, size, maxCapacity);
675 }
676 } finally {
677 if (curr != null) {
678
679
680 curr.release();
681 current = null;
682 }
683 }
684 return true;
685 }
686
687 private void restoreMagazineFreed() {
688 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
689 if (next != null && next != MAGAZINE_FREED) {
690 next.release();
691 }
692 }
693
694 private void transferToNextInLineOrRelease(Chunk chunk) {
695 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
696 return;
697 }
698
699 Chunk nextChunk = NEXT_IN_LINE.get(this);
700 if (nextChunk != null && nextChunk != MAGAZINE_FREED
701 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
702 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
703 nextChunk.release();
704 return;
705 }
706 }
707
708
709
710
711 chunk.release();
712 }
713
714 private Chunk newChunkAllocation(int promptingSize) {
715 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
716 ChunkAllocator chunkAllocator = parent.chunkAllocator;
717 return new Chunk(chunkAllocator.allocate(size, size), this, true);
718 }
719
720 boolean trySetNextInLine(Chunk chunk) {
721 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
722 }
723
724 void free() {
725
726 restoreMagazineFreed();
727 long stamp = allocationLock.writeLock();
728 try {
729 if (current != null) {
730 current.release();
731 current = null;
732 }
733 } finally {
734 allocationLock.unlockWrite(stamp);
735 }
736 }
737
738 public AdaptiveByteBuf newBuffer() {
739 AdaptiveByteBuf buf;
740 if (handle == null) {
741 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
742 } else {
743 buf = bufferQueue.poll();
744 if (buf == null) {
745 buf = new AdaptiveByteBuf(handle);
746 }
747 }
748 buf.resetRefCnt();
749 buf.discardMarks();
750 return buf;
751 }
752 }
753
754 private static final class Chunk implements ReferenceCounted {
755
756 private final AbstractByteBuf delegate;
757 private Magazine magazine;
758 private final AdaptivePoolingAllocator allocator;
759 private final int capacity;
760 private final boolean pooled;
761 private int allocatedBytes;
762 private static final long REFCNT_FIELD_OFFSET =
763 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
764 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
765 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
766
767 private static final ReferenceCountUpdater<Chunk> updater =
768 new ReferenceCountUpdater<Chunk>() {
769 @Override
770 protected AtomicIntegerFieldUpdater<Chunk> updater() {
771 return AIF_UPDATER;
772 }
773 @Override
774 protected long unsafeOffset() {
775 return REFCNT_FIELD_OFFSET;
776 }
777 };
778
779
780 @SuppressWarnings({"unused", "FieldMayBeFinal"})
781 private volatile int refCnt;
782
783 Chunk() {
784
785 delegate = null;
786 magazine = null;
787 allocator = null;
788 capacity = 0;
789 pooled = false;
790 }
791
792 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
793 this.delegate = delegate;
794 this.pooled = pooled;
795 capacity = delegate.capacity();
796 updater.setInitialValue(this);
797 allocator = magazine.parent;
798 attachToMagazine(magazine);
799 }
800
801 Magazine currentMagazine() {
802 return magazine;
803 }
804
805 void detachFromMagazine() {
806 if (magazine != null) {
807 magazine.usedMemory.getAndAdd(-capacity);
808 magazine = null;
809 }
810 }
811
812 void attachToMagazine(Magazine magazine) {
813 assert this.magazine == null;
814 this.magazine = magazine;
815 magazine.usedMemory.getAndAdd(capacity);
816 }
817
818 @Override
819 public Chunk touch(Object hint) {
820 return this;
821 }
822
823 @Override
824 public int refCnt() {
825 return updater.refCnt(this);
826 }
827
828 @Override
829 public Chunk retain() {
830 return updater.retain(this);
831 }
832
833 @Override
834 public Chunk retain(int increment) {
835 return updater.retain(this, increment);
836 }
837
838 @Override
839 public Chunk touch() {
840 return this;
841 }
842
843 @Override
844 public boolean release() {
845 if (updater.release(this)) {
846 deallocate();
847 return true;
848 }
849 return false;
850 }
851
852 @Override
853 public boolean release(int decrement) {
854 if (updater.release(this, decrement)) {
855 deallocate();
856 return true;
857 }
858 return false;
859 }
860
861 private void deallocate() {
862 Magazine mag = magazine;
863 AdaptivePoolingAllocator parent = mag.parent;
864 int chunkSize = mag.preferredChunkSize();
865 int memSize = delegate.capacity();
866 if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
867
868
869 detachFromMagazine();
870 delegate.release();
871 } else {
872 updater.resetRefCnt(this);
873 delegate.setIndex(0, 0);
874 allocatedBytes = 0;
875 if (!mag.trySetNextInLine(this)) {
876
877 detachFromMagazine();
878 if (!parent.offerToQueue(this)) {
879
880
881 boolean released = updater.release(this);
882 delegate.release();
883 assert released;
884 }
885 }
886 }
887 }
888
889 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
890 int startIndex = allocatedBytes;
891 allocatedBytes = startIndex + size;
892 Chunk chunk = this;
893 chunk.retain();
894 try {
895 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
896 chunk = null;
897 } finally {
898 if (chunk != null) {
899
900
901
902 allocatedBytes = startIndex;
903 chunk.release();
904 }
905 }
906 }
907
908 public int remainingCapacity() {
909 return capacity - allocatedBytes;
910 }
911
912 public int capacity() {
913 return capacity;
914 }
915 }
916
917 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
918
919 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
920
921 private int adjustment;
922 private AbstractByteBuf rootParent;
923 Chunk chunk;
924 private int length;
925 private ByteBuffer tmpNioBuf;
926 private boolean hasArray;
927 private boolean hasMemoryAddress;
928
929 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
930 super(0);
931 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
932 }
933
934 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
935 int adjustment, int capacity, int maxCapacity) {
936 this.adjustment = adjustment;
937 chunk = wrapped;
938 length = capacity;
939 maxCapacity(maxCapacity);
940 setIndex0(readerIndex, writerIndex);
941 hasArray = unwrapped.hasArray();
942 hasMemoryAddress = unwrapped.hasMemoryAddress();
943 rootParent = unwrapped;
944 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
945 }
946
947 private AbstractByteBuf rootParent() {
948 final AbstractByteBuf rootParent = this.rootParent;
949 if (rootParent != null) {
950 return rootParent;
951 }
952 throw new IllegalReferenceCountException();
953 }
954
955 @Override
956 public int capacity() {
957 return length;
958 }
959
960 @Override
961 public ByteBuf capacity(int newCapacity) {
962 if (newCapacity == capacity()) {
963 ensureAccessible();
964 return this;
965 }
966 checkNewCapacity(newCapacity);
967 if (newCapacity < capacity()) {
968 length = newCapacity;
969 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
970 return this;
971 }
972
973
974 ByteBuffer data = tmpNioBuf;
975 data.clear();
976 tmpNioBuf = null;
977 Chunk chunk = this.chunk;
978 AdaptivePoolingAllocator allocator = chunk.allocator;
979 int readerIndex = this.readerIndex;
980 int writerIndex = this.writerIndex;
981 allocator.allocate(newCapacity, maxCapacity(), this);
982 tmpNioBuf.put(data);
983 tmpNioBuf.clear();
984 chunk.release();
985 this.readerIndex = readerIndex;
986 this.writerIndex = writerIndex;
987 return this;
988 }
989
990 @Override
991 public ByteBufAllocator alloc() {
992 return rootParent().alloc();
993 }
994
995 @Override
996 public ByteOrder order() {
997 return rootParent().order();
998 }
999
1000 @Override
1001 public ByteBuf unwrap() {
1002 return null;
1003 }
1004
1005 @Override
1006 public boolean isDirect() {
1007 return rootParent().isDirect();
1008 }
1009
1010 @Override
1011 public int arrayOffset() {
1012 return idx(rootParent().arrayOffset());
1013 }
1014
1015 @Override
1016 public boolean hasMemoryAddress() {
1017 return hasMemoryAddress;
1018 }
1019
1020 @Override
1021 public long memoryAddress() {
1022 ensureAccessible();
1023 return rootParent().memoryAddress() + adjustment;
1024 }
1025
1026 @Override
1027 public ByteBuffer nioBuffer(int index, int length) {
1028 checkIndex(index, length);
1029 return rootParent().nioBuffer(idx(index), length);
1030 }
1031
1032 @Override
1033 public ByteBuffer internalNioBuffer(int index, int length) {
1034 checkIndex(index, length);
1035 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1036 }
1037
1038 private ByteBuffer internalNioBuffer() {
1039 return (ByteBuffer) tmpNioBuf.clear();
1040 }
1041
1042 @Override
1043 public ByteBuffer[] nioBuffers(int index, int length) {
1044 checkIndex(index, length);
1045 return rootParent().nioBuffers(idx(index), length);
1046 }
1047
1048 @Override
1049 public boolean hasArray() {
1050 return hasArray;
1051 }
1052
1053 @Override
1054 public byte[] array() {
1055 ensureAccessible();
1056 return rootParent().array();
1057 }
1058
1059 @Override
1060 public ByteBuf copy(int index, int length) {
1061 checkIndex(index, length);
1062 return rootParent().copy(idx(index), length);
1063 }
1064
1065 @Override
1066 public int nioBufferCount() {
1067 return rootParent().nioBufferCount();
1068 }
1069
1070 @Override
1071 protected byte _getByte(int index) {
1072 return rootParent()._getByte(idx(index));
1073 }
1074
1075 @Override
1076 protected short _getShort(int index) {
1077 return rootParent()._getShort(idx(index));
1078 }
1079
1080 @Override
1081 protected short _getShortLE(int index) {
1082 return rootParent()._getShortLE(idx(index));
1083 }
1084
1085 @Override
1086 protected int _getUnsignedMedium(int index) {
1087 return rootParent()._getUnsignedMedium(idx(index));
1088 }
1089
1090 @Override
1091 protected int _getUnsignedMediumLE(int index) {
1092 return rootParent()._getUnsignedMediumLE(idx(index));
1093 }
1094
1095 @Override
1096 protected int _getInt(int index) {
1097 return rootParent()._getInt(idx(index));
1098 }
1099
1100 @Override
1101 protected int _getIntLE(int index) {
1102 return rootParent()._getIntLE(idx(index));
1103 }
1104
1105 @Override
1106 protected long _getLong(int index) {
1107 return rootParent()._getLong(idx(index));
1108 }
1109
1110 @Override
1111 protected long _getLongLE(int index) {
1112 return rootParent()._getLongLE(idx(index));
1113 }
1114
1115 @Override
1116 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1117 checkIndex(index, length);
1118 rootParent().getBytes(idx(index), dst, dstIndex, length);
1119 return this;
1120 }
1121
1122 @Override
1123 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1124 checkIndex(index, length);
1125 rootParent().getBytes(idx(index), dst, dstIndex, length);
1126 return this;
1127 }
1128
1129 @Override
1130 public ByteBuf getBytes(int index, ByteBuffer dst) {
1131 checkIndex(index, dst.remaining());
1132 rootParent().getBytes(idx(index), dst);
1133 return this;
1134 }
1135
1136 @Override
1137 protected void _setByte(int index, int value) {
1138 rootParent()._setByte(idx(index), value);
1139 }
1140
1141 @Override
1142 protected void _setShort(int index, int value) {
1143 rootParent()._setShort(idx(index), value);
1144 }
1145
1146 @Override
1147 protected void _setShortLE(int index, int value) {
1148 rootParent()._setShortLE(idx(index), value);
1149 }
1150
1151 @Override
1152 protected void _setMedium(int index, int value) {
1153 rootParent()._setMedium(idx(index), value);
1154 }
1155
1156 @Override
1157 protected void _setMediumLE(int index, int value) {
1158 rootParent()._setMediumLE(idx(index), value);
1159 }
1160
1161 @Override
1162 protected void _setInt(int index, int value) {
1163 rootParent()._setInt(idx(index), value);
1164 }
1165
1166 @Override
1167 protected void _setIntLE(int index, int value) {
1168 rootParent()._setIntLE(idx(index), value);
1169 }
1170
1171 @Override
1172 protected void _setLong(int index, long value) {
1173 rootParent()._setLong(idx(index), value);
1174 }
1175
1176 @Override
1177 protected void _setLongLE(int index, long value) {
1178 rootParent().setLongLE(idx(index), value);
1179 }
1180
1181 @Override
1182 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1183 checkIndex(index, length);
1184 rootParent().setBytes(idx(index), src, srcIndex, length);
1185 return this;
1186 }
1187
1188 @Override
1189 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1190 checkIndex(index, length);
1191 rootParent().setBytes(idx(index), src, srcIndex, length);
1192 return this;
1193 }
1194
1195 @Override
1196 public ByteBuf setBytes(int index, ByteBuffer src) {
1197 checkIndex(index, src.remaining());
1198 rootParent().setBytes(idx(index), src);
1199 return this;
1200 }
1201
1202 @Override
1203 public ByteBuf getBytes(int index, OutputStream out, int length)
1204 throws IOException {
1205 checkIndex(index, length);
1206 if (length != 0) {
1207 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1208 }
1209 return this;
1210 }
1211
1212 @Override
1213 public int getBytes(int index, GatheringByteChannel out, int length)
1214 throws IOException {
1215 return out.write(internalNioBuffer(index, length).duplicate());
1216 }
1217
1218 @Override
1219 public int getBytes(int index, FileChannel out, long position, int length)
1220 throws IOException {
1221 return out.write(internalNioBuffer(index, length).duplicate(), position);
1222 }
1223
1224 @Override
1225 public int setBytes(int index, InputStream in, int length)
1226 throws IOException {
1227 checkIndex(index, length);
1228 final AbstractByteBuf rootParent = rootParent();
1229 if (rootParent.hasArray()) {
1230 return rootParent.setBytes(idx(index), in, length);
1231 }
1232 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1233 int readBytes = in.read(tmp, 0, length);
1234 if (readBytes <= 0) {
1235 return readBytes;
1236 }
1237 setBytes(index, tmp, 0, readBytes);
1238 return readBytes;
1239 }
1240
1241 @Override
1242 public int setBytes(int index, ScatteringByteChannel in, int length)
1243 throws IOException {
1244 try {
1245 return in.read(internalNioBuffer(index, length).duplicate());
1246 } catch (ClosedChannelException ignored) {
1247 return -1;
1248 }
1249 }
1250
1251 @Override
1252 public int setBytes(int index, FileChannel in, long position, int length)
1253 throws IOException {
1254 try {
1255 return in.read(internalNioBuffer(index, length).duplicate(), position);
1256 } catch (ClosedChannelException ignored) {
1257 return -1;
1258 }
1259 }
1260
1261 @Override
1262 public int forEachByte(int index, int length, ByteProcessor processor) {
1263 checkIndex(index, length);
1264 int ret = rootParent().forEachByte(idx(index), length, processor);
1265 return forEachResult(ret);
1266 }
1267
1268 @Override
1269 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1270 checkIndex(index, length);
1271 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1272 return forEachResult(ret);
1273 }
1274
1275 private int forEachResult(int ret) {
1276 if (ret < adjustment) {
1277 return -1;
1278 }
1279 return ret - adjustment;
1280 }
1281
1282 @Override
1283 public boolean isContiguous() {
1284 return rootParent().isContiguous();
1285 }
1286
1287 private int idx(int index) {
1288 return index + adjustment;
1289 }
1290
1291 @Override
1292 protected void deallocate() {
1293 if (chunk != null) {
1294 chunk.release();
1295 }
1296 tmpNioBuf = null;
1297 chunk = null;
1298 rootParent = null;
1299 if (handle instanceof EnhancedHandle) {
1300 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1301 enhancedHandle.unguardedRecycle(this);
1302 } else {
1303 handle.recycle(this);
1304 }
1305 }
1306 }
1307
1308
1309
1310
/**
 * The strategy used by {@link AdaptivePoolingAllocator} to obtain the backing memory for its chunks.
 */
interface ChunkAllocator {
    /**
     * Allocates a buffer to be used as the backing memory of a chunk.
     *
     * @param initialCapacity the initial capacity of the returned {@link AbstractByteBuf}.
     * @param maxCapacity the maximum capacity of the returned {@link AbstractByteBuf}.
     * @return the buffer that represents the chunk memory.
     */
    AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
}
1320 }