1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SystemPropertyUtil;
30 import io.netty.util.internal.ThreadExecutorMap;
31 import io.netty.util.internal.UnstableApi;
32
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.nio.ByteOrder;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.FileChannel;
40 import java.nio.channels.GatheringByteChannel;
41 import java.nio.channels.ScatteringByteChannel;
42 import java.util.Arrays;
43 import java.util.Queue;
44 import java.util.Set;
45 import java.util.concurrent.ConcurrentLinkedQueue;
46 import java.util.concurrent.CopyOnWriteArraySet;
47 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50 import java.util.concurrent.locks.StampedLock;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @UnstableApi
79 final class AdaptivePoolingAllocator {
80
/**
 * Selects which threads, if any, are given their own thread-local magazine.
 */
enum MagazineCaching {
    /** Cache a magazine only for threads for which {@code ThreadExecutorMap.currentExecutor()} is non-null. */
    EventLoopThreads,
    /** Cache a magazine for every {@code FastThreadLocalThread}, with or without a current executor. */
    FastThreadLocalThreads,
    /** Do not create thread-local magazines at all; only the shared magazine stripes are used. */
    None
}
86
// Upper bound on how many rounds of magazine expansion a single allocation may go through before giving up.
private static final int EXPANSION_ATTEMPTS = 3;
// Length of the shared magazine array created by the constructor.
private static final int INITIAL_MAGAZINES = 4;
// A chunk with less remaining capacity than this is released rather than handed on for reuse.
private static final int RETIRE_CAPACITY = 4 * 1024;
// Lower bound for the preferred chunk size (128 KiB).
private static final int MIN_CHUNK_SIZE = 128 * 1024;
// The shared magazine array stops doubling once it has at least this many entries.
private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
// New chunks are sized to hold about this many buffers of the size that prompted their allocation.
private static final int BUFS_PER_CHUNK = 10;

/**
 * Largest request size, in bytes, that is served from pooled chunks; {@link #allocate(int, int)} sends anything
 * bigger directly to the {@link ChunkAllocator}.
 * <p>
 * The value is {@code BUFS_PER_CHUNK} times the largest histogram bucket size, i.e. 10 * 1 MiB = 10 MiB.
 */
private static final int MAX_CHUNK_SIZE =
        BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);

/**
 * Capacity of the central queue through which chunks are shared between magazines.
 * <p>
 * Read from the {@code io.netty.allocator.centralQueueCapacity} system property; defaults to
 * {@link NettyRuntime#availableProcessors()}.
 */
private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());

// Sentinel stored in the thread-local when the current thread does not get a magazine of its own.
private static final Object NO_MAGAZINE = Boolean.TRUE;

// Source of the backing memory for chunks, and for allocations that bypass pooling.
private final ChunkAllocator chunkAllocator;
// Reusable chunks that are not currently held by any magazine.
private final Queue<Chunk> centralQueue;
// Guards replacement of the magazines array (expansion and free()).
private final StampedLock magazineExpandLock;
// Shared magazine stripes; the length is always a power of two because it starts at 4 and only doubles.
private volatile Magazine[] magazines;
// Per-thread magazine cache; null when magazine caching is MagazineCaching.None.
private final FastThreadLocal<Object> threadLocalMagazine;
// Thread-local magazines that are still registered; null when magazine caching is MagazineCaching.None.
private final Set<Magazine> liveCachedMagazines;
// Set once free() has started; stops further offers to the central queue and further expansion.
private volatile boolean freed;
122
/**
 * Creates an allocator that carves buffers out of chunks obtained from {@code chunkAllocator}.
 *
 * @param chunkAllocator  source of backing memory; must not be {@code null}
 * @param magazineCaching which threads get a thread-local magazine; must not be {@code null}
 */
AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
    ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
    ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
    this.chunkAllocator = chunkAllocator;
    centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
    magazineExpandLock = new StampedLock();

    if (magazineCaching == MagazineCaching.None) {
        // No per-thread caching requested: allocations always go through the shared stripes.
        threadLocalMagazine = null;
        liveCachedMagazines = null;
    } else {
        assert magazineCaching == MagazineCaching.EventLoopThreads ||
                magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final boolean cacheWithoutExecutor = magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final Set<Magazine> tracked = new CopyOnWriteArraySet<Magazine>();
        threadLocalMagazine = new FastThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                if (!cacheWithoutExecutor && ThreadExecutorMap.currentExecutor() == null) {
                    return NO_MAGAZINE;
                }
                Magazine created = new Magazine(AdaptivePoolingAllocator.this, false);
                // Track the magazine only when the thread reports that it will clean up its
                // FastThreadLocals, i.e. when onRemoval(...) is expected to untrack it again.
                if (FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
                    tracked.add(created);
                }
                return created;
            }

            @Override
            protected void onRemoval(final Object value) throws Exception {
                if (value == NO_MAGAZINE) {
                    return;
                }
                tracked.remove(value);
            }
        };
        liveCachedMagazines = tracked;
    }

    // Start with a small, fixed number of shared (lock-protected) magazines.
    final Magazine[] initial = new Magazine[INITIAL_MAGAZINES];
    for (int slot = 0; slot < initial.length; slot++) {
        initial[slot] = new Magazine(this);
    }
    magazines = initial;
}
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/**
 * Creates the queue used to share chunks between magazines.
 *
 * @return a fixed-capacity multi-producer/multi-consumer queue holding up to {@code CENTRAL_QUEUE_CAPACITY} chunks
 */
private static Queue<Chunk> createSharedChunkQueue() {
    return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
}
193
194 ByteBuf allocate(int size, int maxCapacity) {
195 if (size <= MAX_CHUNK_SIZE) {
196 Thread currentThread = Thread.currentThread();
197 boolean willCleanupFastThreadLocals = FastThreadLocalThread.willCleanupFastThreadLocals(currentThread);
198 AdaptiveByteBuf buf = AdaptiveByteBuf.newInstance(willCleanupFastThreadLocals);
199 try {
200 if (allocate(size, maxCapacity, currentThread, buf)) {
201
202 AdaptiveByteBuf result = buf;
203 buf = null;
204 return result;
205 }
206 } finally {
207 if (buf != null) {
208
209 buf.release();
210 }
211 }
212 }
213
214 return chunkAllocator.allocate(size, maxCapacity);
215 }
216
217 private boolean allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
218 int sizeBucket = AllocationStatistics.sizeBucket(size);
219 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
220 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
221 Object mag = threadLocalMagazine.get();
222 if (mag != NO_MAGAZINE) {
223 boolean allocated = ((Magazine) mag).tryAllocate(size, sizeBucket, maxCapacity, buf);
224 assert allocated : "Allocation of threadLocalMagazine must always succeed";
225 return true;
226 }
227 }
228 long threadId = currentThread.getId();
229 Magazine[] mags;
230 int expansions = 0;
231 do {
232 mags = magazines;
233 int mask = mags.length - 1;
234 int index = (int) (threadId & mask);
235 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
236 Magazine mag = mags[index + i & mask];
237 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
238
239 return true;
240 }
241 }
242 expansions++;
243 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
244 return false;
245 }
246
247
248
249
250 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
251 Magazine magazine = into.chunk.magazine;
252 if (!allocate(size, maxCapacity, Thread.currentThread(), into)) {
253
254 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
255 Chunk chunk = new Chunk(innerChunk, magazine, false);
256 try {
257 chunk.readInitInto(into, size, maxCapacity);
258 } finally {
259
260
261
262 chunk.release();
263 }
264 }
265 }
266
267 long usedMemory() {
268 long sum = 0;
269 for (Chunk chunk : centralQueue) {
270 sum += chunk.capacity();
271 }
272 for (Magazine magazine : magazines) {
273 sum += magazine.usedMemory.get();
274 }
275 if (liveCachedMagazines != null) {
276 for (Magazine magazine : liveCachedMagazines) {
277 sum += magazine.usedMemory.get();
278 }
279 }
280 return sum;
281 }
282
283 private boolean tryExpandMagazines(int currentLength) {
284 if (currentLength >= MAX_STRIPES) {
285 return true;
286 }
287 long writeLock = magazineExpandLock.tryWriteLock();
288 if (writeLock != 0) {
289 try {
290 Magazine[] mags = magazines;
291 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
292 return true;
293 }
294 int preferredChunkSize = mags[0].sharedPrefChunkSize;
295 Magazine[] expanded = new Magazine[mags.length * 2];
296 for (int i = 0, l = expanded.length; i < l; i++) {
297 Magazine m = new Magazine(this);
298 m.localPrefChunkSize = preferredChunkSize;
299 m.sharedPrefChunkSize = preferredChunkSize;
300 expanded[i] = m;
301 }
302 magazines = expanded;
303 for (Magazine magazine : mags) {
304 magazine.free();
305 }
306 } finally {
307 magazineExpandLock.unlockWrite(writeLock);
308 }
309 }
310 return true;
311 }
312
313 private boolean offerToQueue(Chunk buffer) {
314 if (freed) {
315 return false;
316 }
317 return centralQueue.offer(buffer);
318 }
319
320
321
322
/**
 * Frees the allocator's magazines and queued chunks when the allocator is finalized; {@link #free()} runs even
 * if {@code super.finalize()} throws.
 */
@Override
protected void finalize() throws Throwable {
    try {
        super.finalize();
    } finally {
        free();
    }
}
331
332 private void free() {
333 freed = true;
334 long stamp = magazineExpandLock.writeLock();
335 try {
336 Magazine[] mags = magazines;
337 for (Magazine magazine : mags) {
338 magazine.free();
339 }
340 } finally {
341 magazineExpandLock.unlockWrite(stamp);
342 }
343 for (;;) {
344 Chunk chunk = centralQueue.poll();
345 if (chunk == null) {
346 break;
347 }
348 chunk.release();
349 }
350 }
351
/**
 * Returns the histogram bucket index for an allocation of {@code size} bytes; delegates to
 * {@code AllocationStatistics.sizeBucket(int)}.
 */
static int sizeBucket(int size) {
    return AllocationStatistics.sizeBucket(size);
}
355
356 @SuppressWarnings("checkstyle:finalclass")
357 private static class AllocationStatistics {
358 private static final int MIN_DATUM_TARGET = 1024;
359 private static final int MAX_DATUM_TARGET = 65534;
360 private static final int INIT_DATUM_TARGET = 9;
361 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
362 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
363 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
364 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
365 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
366
367 protected final AdaptivePoolingAllocator parent;
368 private final boolean shareable;
369 private final short[][] histos = {
370 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
371 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
372 };
373 private short[] histo = histos[0];
374 private final int[] sums = new int[HISTO_BUCKET_COUNT];
375
376 private int histoIndex;
377 private int datumCount;
378 private int datumTarget = INIT_DATUM_TARGET;
379 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
380 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
381
382 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
383 this.parent = parent;
384 this.shareable = shareable;
385 }
386
387 protected void recordAllocationSize(int bucket) {
388 histo[bucket]++;
389 if (datumCount++ == datumTarget) {
390 rotateHistograms();
391 }
392 }
393
394 static int sizeBucket(int size) {
395 if (size == 0) {
396 return 0;
397 }
398
399
400
401
402 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
403 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
404 }
405
406 private void rotateHistograms() {
407 short[][] hs = histos;
408 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
409 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
410 }
411 int sum = 0;
412 for (int count : sums) {
413 sum += count;
414 }
415 int targetPercentile = (int) (sum * 0.99);
416 int sizeBucket = 0;
417 for (; sizeBucket < sums.length; sizeBucket++) {
418 if (sums[sizeBucket] > targetPercentile) {
419 break;
420 }
421 targetPercentile -= sums[sizeBucket];
422 }
423 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
424 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
425 localPrefChunkSize = prefChunkSize;
426 if (shareable) {
427 for (Magazine mag : parent.magazines) {
428 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
429 }
430 }
431 if (sharedPrefChunkSize != prefChunkSize) {
432
433 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
434 sharedPrefChunkSize = prefChunkSize;
435 } else {
436
437 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
438 }
439
440 histoIndex = histoIndex + 1 & 3;
441 histo = histos[histoIndex];
442 datumCount = 0;
443 Arrays.fill(histo, (short) 0);
444 }
445
446
447
448
449
450
451
452
453
454 protected int preferredChunkSize() {
455 return sharedPrefChunkSize;
456 }
457 }
458
459 private static final class Magazine extends AllocationStatistics {
460 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
461 static {
462 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
463 }
464 private static final Chunk MAGAZINE_FREED = new Chunk();
465
466 private Chunk current;
467 @SuppressWarnings("unused")
468 private volatile Chunk nextInLine;
469 private final AtomicLong usedMemory;
470 private final StampedLock allocationLock;
471
472 Magazine(AdaptivePoolingAllocator parent) {
473 this(parent, true);
474 }
475
476 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
477 super(parent, shareable);
478
479
480 if (shareable) {
481 allocationLock = new StampedLock();
482 } else {
483 allocationLock = null;
484 }
485 usedMemory = new AtomicLong();
486 }
487
488 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
489 if (allocationLock == null) {
490
491 return allocate(size, sizeBucket, maxCapacity, buf);
492 }
493
494
495 long writeLock = allocationLock.tryWriteLock();
496 if (writeLock != 0) {
497 try {
498 return allocate(size, sizeBucket, maxCapacity, buf);
499 } finally {
500 allocationLock.unlockWrite(writeLock);
501 }
502 }
503 return false;
504 }
505
506 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
507 recordAllocationSize(sizeBucket);
508 Chunk curr = current;
509 if (curr != null) {
510 if (curr.remainingCapacity() > size) {
511 curr.readInitInto(buf, size, maxCapacity);
512
513 return true;
514 }
515
516
517
518 current = null;
519 if (curr.remainingCapacity() == size) {
520 try {
521 curr.readInitInto(buf, size, maxCapacity);
522 return true;
523 } finally {
524 curr.release();
525 }
526 }
527 }
528 Chunk last = curr;
529 assert current == null;
530
531
532
533
534
535
536
537 if (nextInLine != null) {
538 curr = NEXT_IN_LINE.getAndSet(this, null);
539 if (curr == MAGAZINE_FREED) {
540
541 restoreMagazineFreed();
542 return false;
543 }
544 } else {
545 curr = parent.centralQueue.poll();
546 if (curr == null) {
547 curr = newChunkAllocation(size);
548 }
549 }
550 current = curr;
551 assert current != null;
552 if (last != null) {
553 if (last.remainingCapacity() < RETIRE_CAPACITY) {
554 last.release();
555 } else {
556 transferChunk(last);
557 }
558 }
559 if (curr.remainingCapacity() > size) {
560 curr.readInitInto(buf, size, maxCapacity);
561 } else if (curr.remainingCapacity() == size) {
562 try {
563 curr.readInitInto(buf, size, maxCapacity);
564 } finally {
565
566
567 curr.release();
568 current = null;
569 }
570 } else {
571 Chunk newChunk = newChunkAllocation(size);
572 try {
573 newChunk.readInitInto(buf, size, maxCapacity);
574 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
575 curr.release();
576 current = newChunk;
577 } else {
578 transferChunk(newChunk);
579 }
580 newChunk = null;
581 } finally {
582 if (newChunk != null) {
583 assert current == null;
584
585 newChunk.release();
586 }
587 }
588 }
589 return true;
590 }
591
592 private void restoreMagazineFreed() {
593 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
594 if (next != null && next != MAGAZINE_FREED) {
595 next.release();
596 }
597 }
598
599 private void transferChunk(Chunk current) {
600 if (NEXT_IN_LINE.compareAndSet(this, null, current)
601 || parent.offerToQueue(current)) {
602 return;
603 }
604 Chunk nextChunk = NEXT_IN_LINE.get(this);
605 if (nextChunk != null && current.remainingCapacity() > nextChunk.remainingCapacity()) {
606 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, current)) {
607 if (nextChunk != MAGAZINE_FREED) {
608 nextChunk.release();
609 }
610 return;
611 }
612 }
613
614
615 current.release();
616 }
617
618 private Chunk newChunkAllocation(int promptingSize) {
619 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
620 ChunkAllocator chunkAllocator = parent.chunkAllocator;
621 return new Chunk(chunkAllocator.allocate(size, size), this, true);
622 }
623
624 boolean trySetNextInLine(Chunk chunk) {
625 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
626 }
627
628 void free() {
629
630 restoreMagazineFreed();
631 long stamp = allocationLock.writeLock();
632 try {
633 if (current != null) {
634 current.release();
635 current = null;
636 }
637 } finally {
638 allocationLock.unlockWrite(stamp);
639 }
640 }
641 }
642
643 private static final class Chunk implements ReferenceCounted {
644
645 private final AbstractByteBuf delegate;
646 private final Magazine magazine;
647 private final int capacity;
648 private final boolean pooled;
649 private int allocatedBytes;
650 private static final long REFCNT_FIELD_OFFSET =
651 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
652 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
653 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
654
655 private static final ReferenceCountUpdater<Chunk> updater =
656 new ReferenceCountUpdater<Chunk>() {
657 @Override
658 protected AtomicIntegerFieldUpdater<Chunk> updater() {
659 return AIF_UPDATER;
660 }
661 @Override
662 protected long unsafeOffset() {
663 return REFCNT_FIELD_OFFSET;
664 }
665 };
666
667
668 @SuppressWarnings({"unused", "FieldMayBeFinal"})
669 private volatile int refCnt;
670
671 Chunk() {
672
673 delegate = null;
674 magazine = null;
675 capacity = 0;
676 pooled = false;
677 }
678
679 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
680 this.delegate = delegate;
681 this.magazine = magazine;
682 this.pooled = pooled;
683 capacity = delegate.capacity();
684 magazine.usedMemory.getAndAdd(capacity);
685 updater.setInitialValue(this);
686 }
687
688 @Override
689 public Chunk touch(Object hint) {
690 return this;
691 }
692
693 @Override
694 public int refCnt() {
695 return updater.refCnt(this);
696 }
697
698 @Override
699 public Chunk retain() {
700 return updater.retain(this);
701 }
702
703 @Override
704 public Chunk retain(int increment) {
705 return updater.retain(this, increment);
706 }
707
708 @Override
709 public Chunk touch() {
710 return this;
711 }
712
713 @Override
714 public boolean release() {
715 if (updater.release(this)) {
716 deallocate();
717 return true;
718 }
719 return false;
720 }
721
722 @Override
723 public boolean release(int decrement) {
724 if (updater.release(this, decrement)) {
725 deallocate();
726 return true;
727 }
728 return false;
729 }
730
731 private void deallocate() {
732 Magazine mag = magazine;
733 AdaptivePoolingAllocator parent = mag.parent;
734 int chunkSize = mag.preferredChunkSize();
735 int memSize = delegate.capacity();
736 if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
737
738
739 mag.usedMemory.getAndAdd(-capacity());
740 delegate.release();
741 } else {
742 updater.resetRefCnt(this);
743 delegate.setIndex(0, 0);
744 allocatedBytes = 0;
745 if (!mag.trySetNextInLine(this)) {
746 if (!parent.offerToQueue(this)) {
747
748
749 boolean released = updater.release(this);
750 delegate.release();
751 assert released;
752 }
753 }
754 }
755 }
756
757 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
758 int startIndex = allocatedBytes;
759 allocatedBytes = startIndex + size;
760 Chunk chunk = this;
761 chunk.retain();
762 try {
763 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
764 chunk = null;
765 } finally {
766 if (chunk != null) {
767
768
769 chunk.release();
770 }
771 }
772 }
773
774 public int remainingCapacity() {
775 return capacity - allocatedBytes;
776 }
777
778 public int capacity() {
779 return capacity;
780 }
781 }
782
783 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
784 static final ObjectPool<AdaptiveByteBuf> RECYCLER = ObjectPool.newPool(
785 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
786 @Override
787 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
788 return new AdaptiveByteBuf(handle);
789 }
790 });
791
792 static AdaptiveByteBuf newInstance(boolean useThreadLocal) {
793 if (useThreadLocal) {
794 AdaptiveByteBuf buf = RECYCLER.get();
795 buf.resetRefCnt();
796 buf.discardMarks();
797 return buf;
798 }
799 return new AdaptiveByteBuf(null);
800 }
801
802 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
803
804 private int adjustment;
805 private AbstractByteBuf rootParent;
806 private Chunk chunk;
807 private int length;
808 private ByteBuffer tmpNioBuf;
809 private boolean hasArray;
810 private boolean hasMemoryAddress;
811
812 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
813 super(0);
814 handle = recyclerHandle;
815 }
816
817 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
818 int adjustment, int capacity, int maxCapacity) {
819 this.adjustment = adjustment;
820 chunk = wrapped;
821 length = capacity;
822 maxCapacity(maxCapacity);
823 setIndex0(readerIndex, writerIndex);
824 hasArray = unwrapped.hasArray();
825 hasMemoryAddress = unwrapped.hasMemoryAddress();
826 rootParent = unwrapped;
827 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
828 }
829
830 private AbstractByteBuf rootParent() {
831 final AbstractByteBuf rootParent = this.rootParent;
832 if (rootParent != null) {
833 return rootParent;
834 }
835 throw new IllegalReferenceCountException();
836 }
837
838 @Override
839 public ByteBuf retainedSlice() {
840 if (handle == null) {
841 return super.retainedSlice();
842 }
843
844 ensureAccessible();
845 return PooledSlicedByteBuf.newInstance(this, this, readerIndex(), readableBytes());
846 }
847
848 @Override
849 public ByteBuf retainedSlice(int index, int length) {
850 if (handle == null) {
851 return super.retainedSlice(index, length);
852 }
853
854 ensureAccessible();
855 return PooledSlicedByteBuf.newInstance(this, this, index, length);
856 }
857
858 @Override
859 public ByteBuf retainedDuplicate() {
860 if (handle == null) {
861 return super.retainedDuplicate();
862 }
863
864 ensureAccessible();
865 return PooledDuplicatedByteBuf.newInstance(this, this, readerIndex(), writerIndex());
866 }
867
868 @Override
869 public int capacity() {
870 return length;
871 }
872
873 @Override
874 public ByteBuf capacity(int newCapacity) {
875 if (newCapacity == capacity()) {
876 ensureAccessible();
877 return this;
878 }
879 checkNewCapacity(newCapacity);
880 if (newCapacity < capacity()) {
881 length = newCapacity;
882 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
883 return this;
884 }
885
886
887 ByteBuffer data = tmpNioBuf;
888 data.clear();
889 tmpNioBuf = null;
890 Chunk chunk = this.chunk;
891 Magazine magazine = chunk.magazine;
892 AdaptivePoolingAllocator allocator = magazine.parent;
893 int readerIndex = this.readerIndex;
894 int writerIndex = this.writerIndex;
895 allocator.allocate(newCapacity, maxCapacity(), this);
896 tmpNioBuf.put(data);
897 tmpNioBuf.clear();
898 chunk.release();
899 this.readerIndex = readerIndex;
900 this.writerIndex = writerIndex;
901 return this;
902 }
903
904 @Override
905 public ByteBufAllocator alloc() {
906 return rootParent().alloc();
907 }
908
909 @Override
910 public ByteOrder order() {
911 return rootParent().order();
912 }
913
914 @Override
915 public ByteBuf unwrap() {
916 return null;
917 }
918
919 @Override
920 public boolean isDirect() {
921 return rootParent().isDirect();
922 }
923
924 @Override
925 public int arrayOffset() {
926 return idx(rootParent().arrayOffset());
927 }
928
929 @Override
930 public boolean hasMemoryAddress() {
931 return hasMemoryAddress;
932 }
933
934 @Override
935 public long memoryAddress() {
936 ensureAccessible();
937 return rootParent().memoryAddress() + adjustment;
938 }
939
940 @Override
941 public ByteBuffer nioBuffer(int index, int length) {
942 checkIndex(index, length);
943 return rootParent().nioBuffer(idx(index), length);
944 }
945
946 @Override
947 public ByteBuffer internalNioBuffer(int index, int length) {
948 checkIndex(index, length);
949 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
950 }
951
952 private ByteBuffer internalNioBuffer() {
953 return (ByteBuffer) tmpNioBuf.clear();
954 }
955
956 @Override
957 public ByteBuffer[] nioBuffers(int index, int length) {
958 checkIndex(index, length);
959 return rootParent().nioBuffers(idx(index), length);
960 }
961
962 @Override
963 public boolean hasArray() {
964 return hasArray;
965 }
966
967 @Override
968 public byte[] array() {
969 ensureAccessible();
970 return rootParent().array();
971 }
972
973 @Override
974 public ByteBuf copy(int index, int length) {
975 checkIndex(index, length);
976 return rootParent().copy(idx(index), length);
977 }
978
979 @Override
980 public int nioBufferCount() {
981 return rootParent().nioBufferCount();
982 }
983
984 @Override
985 protected byte _getByte(int index) {
986 return rootParent()._getByte(idx(index));
987 }
988
989 @Override
990 protected short _getShort(int index) {
991 return rootParent()._getShort(idx(index));
992 }
993
994 @Override
995 protected short _getShortLE(int index) {
996 return rootParent()._getShortLE(idx(index));
997 }
998
999 @Override
1000 protected int _getUnsignedMedium(int index) {
1001 return rootParent()._getUnsignedMedium(idx(index));
1002 }
1003
1004 @Override
1005 protected int _getUnsignedMediumLE(int index) {
1006 return rootParent()._getUnsignedMediumLE(idx(index));
1007 }
1008
1009 @Override
1010 protected int _getInt(int index) {
1011 return rootParent()._getInt(idx(index));
1012 }
1013
1014 @Override
1015 protected int _getIntLE(int index) {
1016 return rootParent()._getIntLE(idx(index));
1017 }
1018
1019 @Override
1020 protected long _getLong(int index) {
1021 return rootParent()._getLong(idx(index));
1022 }
1023
1024 @Override
1025 protected long _getLongLE(int index) {
1026 return rootParent()._getLongLE(idx(index));
1027 }
1028
1029 @Override
1030 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1031 checkIndex(index, length);
1032 rootParent().getBytes(idx(index), dst, dstIndex, length);
1033 return this;
1034 }
1035
1036 @Override
1037 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1038 checkIndex(index, length);
1039 rootParent().getBytes(idx(index), dst, dstIndex, length);
1040 return this;
1041 }
1042
1043 @Override
1044 public ByteBuf getBytes(int index, ByteBuffer dst) {
1045 checkIndex(index, dst.remaining());
1046 rootParent().getBytes(idx(index), dst);
1047 return this;
1048 }
1049
1050 @Override
1051 protected void _setByte(int index, int value) {
1052 rootParent()._setByte(idx(index), value);
1053 }
1054
1055 @Override
1056 protected void _setShort(int index, int value) {
1057 rootParent()._setShort(idx(index), value);
1058 }
1059
1060 @Override
1061 protected void _setShortLE(int index, int value) {
1062 rootParent()._setShortLE(idx(index), value);
1063 }
1064
1065 @Override
1066 protected void _setMedium(int index, int value) {
1067 rootParent()._setMedium(idx(index), value);
1068 }
1069
1070 @Override
1071 protected void _setMediumLE(int index, int value) {
1072 rootParent()._setMediumLE(idx(index), value);
1073 }
1074
1075 @Override
1076 protected void _setInt(int index, int value) {
1077 rootParent()._setInt(idx(index), value);
1078 }
1079
1080 @Override
1081 protected void _setIntLE(int index, int value) {
1082 rootParent()._setIntLE(idx(index), value);
1083 }
1084
1085 @Override
1086 protected void _setLong(int index, long value) {
1087 rootParent()._setLong(idx(index), value);
1088 }
1089
1090 @Override
1091 protected void _setLongLE(int index, long value) {
1092 rootParent().setLongLE(idx(index), value);
1093 }
1094
1095 @Override
1096 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1097 checkIndex(index, length);
1098 rootParent().setBytes(idx(index), src, srcIndex, length);
1099 return this;
1100 }
1101
1102 @Override
1103 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1104 checkIndex(index, length);
1105 rootParent().setBytes(idx(index), src, srcIndex, length);
1106 return this;
1107 }
1108
1109 @Override
1110 public ByteBuf setBytes(int index, ByteBuffer src) {
1111 checkIndex(index, src.remaining());
1112 rootParent().setBytes(idx(index), src);
1113 return this;
1114 }
1115
1116 @Override
1117 public ByteBuf getBytes(int index, OutputStream out, int length)
1118 throws IOException {
1119 checkIndex(index, length);
1120 if (length != 0) {
1121 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1122 }
1123 return this;
1124 }
1125
1126 @Override
1127 public int getBytes(int index, GatheringByteChannel out, int length)
1128 throws IOException {
1129 return out.write(internalNioBuffer(index, length).duplicate());
1130 }
1131
1132 @Override
1133 public int getBytes(int index, FileChannel out, long position, int length)
1134 throws IOException {
1135 return out.write(internalNioBuffer(index, length).duplicate(), position);
1136 }
1137
1138 @Override
1139 public int setBytes(int index, InputStream in, int length)
1140 throws IOException {
1141 checkIndex(index, length);
1142 final AbstractByteBuf rootParent = rootParent();
1143 if (rootParent.hasArray()) {
1144 return rootParent.setBytes(idx(index), in, length);
1145 }
1146 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1147 int readBytes = in.read(tmp, 0, length);
1148 if (readBytes <= 0) {
1149 return readBytes;
1150 }
1151 setBytes(index, tmp, 0, readBytes);
1152 return readBytes;
1153 }
1154
1155 @Override
1156 public int setBytes(int index, ScatteringByteChannel in, int length)
1157 throws IOException {
1158 try {
1159 return in.read(internalNioBuffer(index, length).duplicate());
1160 } catch (ClosedChannelException ignored) {
1161 return -1;
1162 }
1163 }
1164
1165 @Override
1166 public int setBytes(int index, FileChannel in, long position, int length)
1167 throws IOException {
1168 try {
1169 return in.read(internalNioBuffer(index, length).duplicate(), position);
1170 } catch (ClosedChannelException ignored) {
1171 return -1;
1172 }
1173 }
1174
1175 @Override
1176 public int forEachByte(int index, int length, ByteProcessor processor) {
1177 checkIndex(index, length);
1178 int ret = rootParent().forEachByte(idx(index), length, processor);
1179 return forEachResult(ret);
1180 }
1181
1182 @Override
1183 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1184 checkIndex(index, length);
1185 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1186 return forEachResult(ret);
1187 }
1188
1189 private int forEachResult(int ret) {
1190 if (ret < adjustment) {
1191 return -1;
1192 }
1193 return ret - adjustment;
1194 }
1195
1196 @Override
1197 public boolean isContiguous() {
1198 return rootParent().isContiguous();
1199 }
1200
1201 private int idx(int index) {
1202 return index + adjustment;
1203 }
1204
1205 @Override
1206 protected void deallocate() {
1207 if (chunk != null) {
1208 chunk.release();
1209 }
1210 tmpNioBuf = null;
1211 chunk = null;
1212 rootParent = null;
1213 if (handle instanceof Recycler.EnhancedHandle) {
1214 ((Recycler.EnhancedHandle<?>) handle).unguardedRecycle(this);
1215 } else if (handle != null) {
1216 handle.recycle(this);
1217 }
1218 }
1219 }
1220
1221
1222
1223
/**
 * The source of backing memory for this allocator: it provides the buffers that chunks wrap, as well as the
 * buffers returned directly for requests that are not served from the pool.
 */
interface ChunkAllocator {
    /**
     * Allocates a buffer.
     *
     * @param initialCapacity the initial capacity of the buffer, in bytes
     * @param maxCapacity     the maximum capacity of the buffer, in bytes
     * @return the allocated buffer
     */
    AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
}
1233 }