1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SystemPropertyUtil;
30 import io.netty.util.internal.ThreadExecutorMap;
31 import io.netty.util.internal.UnstableApi;
32
33 import java.io.IOException;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.nio.ByteOrder;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.FileChannel;
40 import java.nio.channels.GatheringByteChannel;
41 import java.nio.channels.ScatteringByteChannel;
42 import java.util.Arrays;
43 import java.util.Queue;
44 import java.util.Set;
45 import java.util.concurrent.ConcurrentLinkedQueue;
46 import java.util.concurrent.CopyOnWriteArraySet;
47 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50 import java.util.concurrent.locks.StampedLock;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @UnstableApi
79 final class AdaptivePoolingAllocator {
80
81 enum MagazineCaching {
82 EventLoopThreads,
83 FastThreadLocalThreads,
84 None
85 }
86
87 private static final int EXPANSION_ATTEMPTS = 3;
88 private static final int INITIAL_MAGAZINES = 4;
89 private static final int RETIRE_CAPACITY = 4 * 1024;
90 private static final int MIN_CHUNK_SIZE = 128 * 1024;
91 private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
92 private static final int BUFS_PER_CHUNK = 10;
93
94
95
96
97
98
99 private static final int MAX_CHUNK_SIZE =
100 BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);
101
102
103
104
105
106
107
108
109
110 private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
111 "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());
112
113
114
115
116
117 private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
118 "io.netty.allocator.magazineBufferQueueCapacity", 1024);
119
120 private static final Object NO_MAGAZINE = Boolean.TRUE;
121
122 private final ChunkAllocator chunkAllocator;
123 private final Queue<Chunk> centralQueue;
124 private final StampedLock magazineExpandLock;
125 private volatile Magazine[] magazines;
126 private final FastThreadLocal<Object> threadLocalMagazine;
127 private final Set<Magazine> liveCachedMagazines;
128 private volatile boolean freed;
129
130 static {
131 if (CENTRAL_QUEUE_CAPACITY < 2) {
132 throw new IllegalArgumentException("CENTRAL_QUEUE_CAPACITY: " + CENTRAL_QUEUE_CAPACITY
133 + " (expected: >= " + 2 + ')');
134 }
135 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
136 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
137 + " (expected: >= " + 2 + ')');
138 }
139 }
140
141 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
142 ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
143 ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
144 this.chunkAllocator = chunkAllocator;
145 centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
146 magazineExpandLock = new StampedLock();
147 if (magazineCaching != MagazineCaching.None) {
148 assert magazineCaching == MagazineCaching.EventLoopThreads ||
149 magazineCaching == MagazineCaching.FastThreadLocalThreads;
150 final boolean cachedMagazinesNonEventLoopThreads =
151 magazineCaching == MagazineCaching.FastThreadLocalThreads;
152 final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
153 threadLocalMagazine = new FastThreadLocal<Object>() {
154 @Override
155 protected Object initialValue() {
156 if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
157 if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
158
159 return NO_MAGAZINE;
160 }
161 Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
162 liveMagazines.add(mag);
163 return mag;
164 }
165 return NO_MAGAZINE;
166 }
167
168 @Override
169 protected void onRemoval(final Object value) throws Exception {
170 if (value != NO_MAGAZINE) {
171 liveMagazines.remove(value);
172 }
173 }
174 };
175 liveCachedMagazines = liveMagazines;
176 } else {
177 threadLocalMagazine = null;
178 liveCachedMagazines = null;
179 }
180 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
181 for (int i = 0; i < mags.length; i++) {
182 mags[i] = new Magazine(this);
183 }
184 magazines = mags;
185 }
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207 private static Queue<Chunk> createSharedChunkQueue() {
208 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
209 }
210
211 ByteBuf allocate(int size, int maxCapacity) {
212 return allocate(size, maxCapacity, Thread.currentThread(), null);
213 }
214
215 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
216 if (size <= MAX_CHUNK_SIZE) {
217 int sizeBucket = AllocationStatistics.sizeBucket(size);
218 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
219 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
220 Object mag = threadLocalMagazine.get();
221 if (mag != NO_MAGAZINE) {
222 Magazine magazine = (Magazine) mag;
223 if (buf == null) {
224 buf = magazine.newBuffer();
225 }
226 boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
227 assert allocated : "Allocation of threadLocalMagazine must always succeed";
228 return buf;
229 }
230 }
231 long threadId = currentThread.getId();
232 Magazine[] mags;
233 int expansions = 0;
234 do {
235 mags = magazines;
236 int mask = mags.length - 1;
237 int index = (int) (threadId & mask);
238 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
239 Magazine mag = mags[index + i & mask];
240 if (buf == null) {
241 buf = mag.newBuffer();
242 }
243 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
244
245 return buf;
246 }
247 }
248 expansions++;
249 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
250 }
251
252
253 return allocateFallback(size, maxCapacity, currentThread, buf);
254 }
255
256 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
257
258 Magazine magazine;
259 if (buf != null) {
260 Chunk chunk = buf.chunk;
261 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
262 magazine = getFallbackMagazine(currentThread);
263 }
264 } else {
265 magazine = getFallbackMagazine(currentThread);
266 buf = magazine.newBuffer();
267 }
268
269 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
270 Chunk chunk = new Chunk(innerChunk, magazine, false);
271 try {
272 chunk.readInitInto(buf, size, maxCapacity);
273 } finally {
274
275
276
277 chunk.release();
278 }
279 return buf;
280 }
281
282 private Magazine getFallbackMagazine(Thread currentThread) {
283 Object tlMag;
284 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
285 if (threadLocalMagazine != null &&
286 currentThread instanceof FastThreadLocalThread &&
287 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
288 return (Magazine) tlMag;
289 }
290 Magazine[] mags = magazines;
291 return mags[(int) currentThread.getId() & mags.length - 1];
292 }
293
294
295
296
297 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
298 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
299 assert result == into: "Re-allocation created separate buffer instance";
300 }
301
302 long usedMemory() {
303 long sum = 0;
304 for (Chunk chunk : centralQueue) {
305 sum += chunk.capacity();
306 }
307 for (Magazine magazine : magazines) {
308 sum += magazine.usedMemory.get();
309 }
310 if (liveCachedMagazines != null) {
311 for (Magazine magazine : liveCachedMagazines) {
312 sum += magazine.usedMemory.get();
313 }
314 }
315 return sum;
316 }
317
318 private boolean tryExpandMagazines(int currentLength) {
319 if (currentLength >= MAX_STRIPES) {
320 return true;
321 }
322 final Magazine[] mags;
323 long writeLock = magazineExpandLock.tryWriteLock();
324 if (writeLock != 0) {
325 try {
326 mags = magazines;
327 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
328 return true;
329 }
330 int preferredChunkSize = mags[0].sharedPrefChunkSize;
331 Magazine[] expanded = new Magazine[mags.length * 2];
332 for (int i = 0, l = expanded.length; i < l; i++) {
333 Magazine m = new Magazine(this);
334 m.localPrefChunkSize = preferredChunkSize;
335 m.sharedPrefChunkSize = preferredChunkSize;
336 expanded[i] = m;
337 }
338 magazines = expanded;
339 } finally {
340 magazineExpandLock.unlockWrite(writeLock);
341 }
342 for (Magazine magazine : mags) {
343 magazine.free();
344 }
345 }
346 return true;
347 }
348
349 private boolean offerToQueue(Chunk buffer) {
350 if (freed) {
351 return false;
352 }
353 boolean isAdded = centralQueue.offer(buffer);
354 if (freed && isAdded) {
355
356 freeCentralQueue();
357 }
358 return isAdded;
359 }
360
361
362
363
364 @Override
365 protected void finalize() throws Throwable {
366 try {
367 super.finalize();
368 } finally {
369 free();
370 }
371 }
372
373 private void free() {
374 freed = true;
375 long stamp = magazineExpandLock.writeLock();
376 try {
377 Magazine[] mags = magazines;
378 for (Magazine magazine : mags) {
379 magazine.free();
380 }
381 } finally {
382 magazineExpandLock.unlockWrite(stamp);
383 }
384 freeCentralQueue();
385 }
386
387 private void freeCentralQueue() {
388 for (;;) {
389 Chunk chunk = centralQueue.poll();
390 if (chunk == null) {
391 break;
392 }
393 chunk.release();
394 }
395 }
396
397 static int sizeBucket(int size) {
398 return AllocationStatistics.sizeBucket(size);
399 }
400
401 @SuppressWarnings("checkstyle:finalclass")
402 private static class AllocationStatistics {
403 private static final int MIN_DATUM_TARGET = 1024;
404 private static final int MAX_DATUM_TARGET = 65534;
405 private static final int INIT_DATUM_TARGET = 9;
406 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
407 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
408 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
409 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
410 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
411
412 protected final AdaptivePoolingAllocator parent;
413 private final boolean shareable;
414 private final short[][] histos = {
415 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
416 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
417 };
418 private short[] histo = histos[0];
419 private final int[] sums = new int[HISTO_BUCKET_COUNT];
420
421 private int histoIndex;
422 private int datumCount;
423 private int datumTarget = INIT_DATUM_TARGET;
424 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
425 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
426
427 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
428 this.parent = parent;
429 this.shareable = shareable;
430 }
431
432 protected void recordAllocationSize(int bucket) {
433 histo[bucket]++;
434 if (datumCount++ == datumTarget) {
435 rotateHistograms();
436 }
437 }
438
439 static int sizeBucket(int size) {
440 if (size == 0) {
441 return 0;
442 }
443
444
445
446
447 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
448 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
449 }
450
451 private void rotateHistograms() {
452 short[][] hs = histos;
453 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
454 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
455 }
456 int sum = 0;
457 for (int count : sums) {
458 sum += count;
459 }
460 int targetPercentile = (int) (sum * 0.99);
461 int sizeBucket = 0;
462 for (; sizeBucket < sums.length; sizeBucket++) {
463 if (sums[sizeBucket] > targetPercentile) {
464 break;
465 }
466 targetPercentile -= sums[sizeBucket];
467 }
468 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
469 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
470 localPrefChunkSize = prefChunkSize;
471 if (shareable) {
472 for (Magazine mag : parent.magazines) {
473 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
474 }
475 }
476 if (sharedPrefChunkSize != prefChunkSize) {
477
478 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
479 sharedPrefChunkSize = prefChunkSize;
480 } else {
481
482 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
483 }
484
485 histoIndex = histoIndex + 1 & 3;
486 histo = histos[histoIndex];
487 datumCount = 0;
488 Arrays.fill(histo, (short) 0);
489 }
490
491
492
493
494
495
496
497
498
499 protected int preferredChunkSize() {
500 return sharedPrefChunkSize;
501 }
502 }
503
504 private static final class Magazine extends AllocationStatistics {
505 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
506 static {
507 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
508 }
509 private static final Chunk MAGAZINE_FREED = new Chunk();
510
511 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
512 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
513 @Override
514 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
515 return new AdaptiveByteBuf(handle);
516 }
517 });
518
519 private Chunk current;
520 @SuppressWarnings("unused")
521 private volatile Chunk nextInLine;
522 private final AtomicLong usedMemory;
523 private final StampedLock allocationLock;
524 private final Queue<AdaptiveByteBuf> bufferQueue;
525 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
526
527 Magazine(AdaptivePoolingAllocator parent) {
528 this(parent, true);
529 }
530
531 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
532 super(parent, shareable);
533
534 if (shareable) {
535
536 allocationLock = new StampedLock();
537 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
538 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
539 @Override
540 public void recycle(AdaptiveByteBuf self) {
541 bufferQueue.offer(self);
542 }
543 };
544 } else {
545 allocationLock = null;
546 bufferQueue = null;
547 handle = null;
548 }
549 usedMemory = new AtomicLong();
550 }
551
552 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
553 if (allocationLock == null) {
554
555 return allocate(size, sizeBucket, maxCapacity, buf);
556 }
557
558
559 long writeLock = allocationLock.tryWriteLock();
560 if (writeLock != 0) {
561 try {
562 return allocate(size, sizeBucket, maxCapacity, buf);
563 } finally {
564 allocationLock.unlockWrite(writeLock);
565 }
566 }
567 return false;
568 }
569
570 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
571 recordAllocationSize(sizeBucket);
572 Chunk curr = current;
573 if (curr != null) {
574 if (curr.remainingCapacity() > size) {
575 curr.readInitInto(buf, size, maxCapacity);
576
577 return true;
578 }
579
580
581
582 current = null;
583 if (curr.remainingCapacity() == size) {
584 try {
585 curr.readInitInto(buf, size, maxCapacity);
586 return true;
587 } finally {
588 curr.release();
589 }
590 }
591 }
592 Chunk last = curr;
593 assert current == null;
594
595
596
597
598
599
600
601 if (nextInLine != null) {
602 curr = NEXT_IN_LINE.getAndSet(this, null);
603 if (curr == MAGAZINE_FREED) {
604
605 restoreMagazineFreed();
606 return false;
607 }
608 } else {
609 curr = parent.centralQueue.poll();
610 if (curr == null) {
611 curr = newChunkAllocation(size);
612 } else {
613 curr.attachToMagazine(this);
614 }
615 }
616 current = curr;
617 assert current != null;
618 if (last != null) {
619 if (last.remainingCapacity() < RETIRE_CAPACITY) {
620 last.release();
621 } else {
622 transferChunk(last);
623 }
624 }
625 if (curr.remainingCapacity() > size) {
626 curr.readInitInto(buf, size, maxCapacity);
627 } else if (curr.remainingCapacity() == size) {
628 try {
629 curr.readInitInto(buf, size, maxCapacity);
630 } finally {
631
632
633 curr.release();
634 current = null;
635 }
636 } else {
637 Chunk newChunk = newChunkAllocation(size);
638 try {
639 newChunk.readInitInto(buf, size, maxCapacity);
640 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
641 curr.release();
642 current = newChunk;
643 } else {
644 transferChunk(newChunk);
645 }
646 newChunk = null;
647 } finally {
648 if (newChunk != null) {
649 assert current == null;
650
651 newChunk.release();
652 }
653 }
654 }
655 return true;
656 }
657
658 private void restoreMagazineFreed() {
659 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
660 if (next != null && next != MAGAZINE_FREED) {
661 next.release();
662 }
663 }
664
665 private void transferChunk(Chunk current) {
666 if (NEXT_IN_LINE.compareAndSet(this, null, current)) {
667 return;
668 }
669
670
671 current.detachFromMagazine();
672 if (parent.offerToQueue(current)) {
673 return;
674 }
675
676 current.attachToMagazine(this);
677 Chunk nextChunk = NEXT_IN_LINE.get(this);
678 if (nextChunk != null && nextChunk != MAGAZINE_FREED
679 && current.remainingCapacity() > nextChunk.remainingCapacity()) {
680 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, current)) {
681 nextChunk.release();
682 return;
683 }
684 }
685
686
687 current.release();
688 }
689
690 private Chunk newChunkAllocation(int promptingSize) {
691 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
692 ChunkAllocator chunkAllocator = parent.chunkAllocator;
693 return new Chunk(chunkAllocator.allocate(size, size), this, true);
694 }
695
696 boolean trySetNextInLine(Chunk chunk) {
697 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
698 }
699
700 void free() {
701
702 restoreMagazineFreed();
703 long stamp = allocationLock.writeLock();
704 try {
705 if (current != null) {
706 current.release();
707 current = null;
708 }
709 } finally {
710 allocationLock.unlockWrite(stamp);
711 }
712 }
713
714 public AdaptiveByteBuf newBuffer() {
715 AdaptiveByteBuf buf;
716 if (handle == null) {
717 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
718 } else {
719 buf = bufferQueue.poll();
720 if (buf == null) {
721 buf = new AdaptiveByteBuf(handle);
722 }
723 }
724 buf.resetRefCnt();
725 buf.discardMarks();
726 return buf;
727 }
728 }
729
730 private static final class Chunk implements ReferenceCounted {
731
732 private final AbstractByteBuf delegate;
733 private Magazine magazine;
734 private final AdaptivePoolingAllocator allocator;
735 private final int capacity;
736 private final boolean pooled;
737 private int allocatedBytes;
738 private static final long REFCNT_FIELD_OFFSET =
739 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
740 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
741 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
742
743 private static final ReferenceCountUpdater<Chunk> updater =
744 new ReferenceCountUpdater<Chunk>() {
745 @Override
746 protected AtomicIntegerFieldUpdater<Chunk> updater() {
747 return AIF_UPDATER;
748 }
749 @Override
750 protected long unsafeOffset() {
751 return REFCNT_FIELD_OFFSET;
752 }
753 };
754
755
756 @SuppressWarnings({"unused", "FieldMayBeFinal"})
757 private volatile int refCnt;
758
759 Chunk() {
760
761 delegate = null;
762 magazine = null;
763 allocator = null;
764 capacity = 0;
765 pooled = false;
766 }
767
768 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
769 this.delegate = delegate;
770 this.pooled = pooled;
771 capacity = delegate.capacity();
772 updater.setInitialValue(this);
773 allocator = magazine.parent;
774 attachToMagazine(magazine);
775 }
776
777 Magazine currentMagazine() {
778 return magazine;
779 }
780
781 void detachFromMagazine() {
782 if (magazine != null) {
783 magazine.usedMemory.getAndAdd(-capacity);
784 magazine = null;
785 }
786 }
787
788 void attachToMagazine(Magazine magazine) {
789 assert this.magazine == null;
790 this.magazine = magazine;
791 magazine.usedMemory.getAndAdd(capacity);
792 }
793
794 @Override
795 public Chunk touch(Object hint) {
796 return this;
797 }
798
799 @Override
800 public int refCnt() {
801 return updater.refCnt(this);
802 }
803
804 @Override
805 public Chunk retain() {
806 return updater.retain(this);
807 }
808
809 @Override
810 public Chunk retain(int increment) {
811 return updater.retain(this, increment);
812 }
813
814 @Override
815 public Chunk touch() {
816 return this;
817 }
818
819 @Override
820 public boolean release() {
821 if (updater.release(this)) {
822 deallocate();
823 return true;
824 }
825 return false;
826 }
827
828 @Override
829 public boolean release(int decrement) {
830 if (updater.release(this, decrement)) {
831 deallocate();
832 return true;
833 }
834 return false;
835 }
836
837 private void deallocate() {
838 Magazine mag = magazine;
839 AdaptivePoolingAllocator parent = mag.parent;
840 int chunkSize = mag.preferredChunkSize();
841 int memSize = delegate.capacity();
842 if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
843
844
845 detachFromMagazine();
846 delegate.release();
847 } else {
848 updater.resetRefCnt(this);
849 delegate.setIndex(0, 0);
850 allocatedBytes = 0;
851 if (!mag.trySetNextInLine(this)) {
852
853 detachFromMagazine();
854 if (!parent.offerToQueue(this)) {
855
856
857 boolean released = updater.release(this);
858 delegate.release();
859 assert released;
860 }
861 }
862 }
863 }
864
865 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
866 int startIndex = allocatedBytes;
867 allocatedBytes = startIndex + size;
868 Chunk chunk = this;
869 chunk.retain();
870 try {
871 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
872 chunk = null;
873 } finally {
874 if (chunk != null) {
875
876
877 chunk.release();
878 }
879 }
880 }
881
882 public int remainingCapacity() {
883 return capacity - allocatedBytes;
884 }
885
886 public int capacity() {
887 return capacity;
888 }
889 }
890
891 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
892
893 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
894
895 private int adjustment;
896 private AbstractByteBuf rootParent;
897 Chunk chunk;
898 private int length;
899 private ByteBuffer tmpNioBuf;
900 private boolean hasArray;
901 private boolean hasMemoryAddress;
902
903 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
904 super(0);
905 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
906 }
907
908 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
909 int adjustment, int capacity, int maxCapacity) {
910 this.adjustment = adjustment;
911 chunk = wrapped;
912 length = capacity;
913 maxCapacity(maxCapacity);
914 setIndex0(readerIndex, writerIndex);
915 hasArray = unwrapped.hasArray();
916 hasMemoryAddress = unwrapped.hasMemoryAddress();
917 rootParent = unwrapped;
918 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
919 }
920
921 private AbstractByteBuf rootParent() {
922 final AbstractByteBuf rootParent = this.rootParent;
923 if (rootParent != null) {
924 return rootParent;
925 }
926 throw new IllegalReferenceCountException();
927 }
928
929 @Override
930 public int capacity() {
931 return length;
932 }
933
934 @Override
935 public ByteBuf capacity(int newCapacity) {
936 if (newCapacity == capacity()) {
937 ensureAccessible();
938 return this;
939 }
940 checkNewCapacity(newCapacity);
941 if (newCapacity < capacity()) {
942 length = newCapacity;
943 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
944 return this;
945 }
946
947
948 ByteBuffer data = tmpNioBuf;
949 data.clear();
950 tmpNioBuf = null;
951 Chunk chunk = this.chunk;
952 AdaptivePoolingAllocator allocator = chunk.allocator;
953 int readerIndex = this.readerIndex;
954 int writerIndex = this.writerIndex;
955 allocator.allocate(newCapacity, maxCapacity(), this);
956 tmpNioBuf.put(data);
957 tmpNioBuf.clear();
958 chunk.release();
959 this.readerIndex = readerIndex;
960 this.writerIndex = writerIndex;
961 return this;
962 }
963
964 @Override
965 public ByteBufAllocator alloc() {
966 return rootParent().alloc();
967 }
968
969 @Override
970 public ByteOrder order() {
971 return rootParent().order();
972 }
973
974 @Override
975 public ByteBuf unwrap() {
976 return null;
977 }
978
979 @Override
980 public boolean isDirect() {
981 return rootParent().isDirect();
982 }
983
984 @Override
985 public int arrayOffset() {
986 return idx(rootParent().arrayOffset());
987 }
988
989 @Override
990 public boolean hasMemoryAddress() {
991 return hasMemoryAddress;
992 }
993
994 @Override
995 public long memoryAddress() {
996 ensureAccessible();
997 return rootParent().memoryAddress() + adjustment;
998 }
999
1000 @Override
1001 public ByteBuffer nioBuffer(int index, int length) {
1002 checkIndex(index, length);
1003 return rootParent().nioBuffer(idx(index), length);
1004 }
1005
1006 @Override
1007 public ByteBuffer internalNioBuffer(int index, int length) {
1008 checkIndex(index, length);
1009 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1010 }
1011
1012 private ByteBuffer internalNioBuffer() {
1013 return (ByteBuffer) tmpNioBuf.clear();
1014 }
1015
1016 @Override
1017 public ByteBuffer[] nioBuffers(int index, int length) {
1018 checkIndex(index, length);
1019 return rootParent().nioBuffers(idx(index), length);
1020 }
1021
1022 @Override
1023 public boolean hasArray() {
1024 return hasArray;
1025 }
1026
1027 @Override
1028 public byte[] array() {
1029 ensureAccessible();
1030 return rootParent().array();
1031 }
1032
1033 @Override
1034 public ByteBuf copy(int index, int length) {
1035 checkIndex(index, length);
1036 return rootParent().copy(idx(index), length);
1037 }
1038
1039 @Override
1040 public int nioBufferCount() {
1041 return rootParent().nioBufferCount();
1042 }
1043
1044 @Override
1045 protected byte _getByte(int index) {
1046 return rootParent()._getByte(idx(index));
1047 }
1048
1049 @Override
1050 protected short _getShort(int index) {
1051 return rootParent()._getShort(idx(index));
1052 }
1053
1054 @Override
1055 protected short _getShortLE(int index) {
1056 return rootParent()._getShortLE(idx(index));
1057 }
1058
1059 @Override
1060 protected int _getUnsignedMedium(int index) {
1061 return rootParent()._getUnsignedMedium(idx(index));
1062 }
1063
1064 @Override
1065 protected int _getUnsignedMediumLE(int index) {
1066 return rootParent()._getUnsignedMediumLE(idx(index));
1067 }
1068
1069 @Override
1070 protected int _getInt(int index) {
1071 return rootParent()._getInt(idx(index));
1072 }
1073
1074 @Override
1075 protected int _getIntLE(int index) {
1076 return rootParent()._getIntLE(idx(index));
1077 }
1078
1079 @Override
1080 protected long _getLong(int index) {
1081 return rootParent()._getLong(idx(index));
1082 }
1083
1084 @Override
1085 protected long _getLongLE(int index) {
1086 return rootParent()._getLongLE(idx(index));
1087 }
1088
1089 @Override
1090 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1091 checkIndex(index, length);
1092 rootParent().getBytes(idx(index), dst, dstIndex, length);
1093 return this;
1094 }
1095
1096 @Override
1097 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1098 checkIndex(index, length);
1099 rootParent().getBytes(idx(index), dst, dstIndex, length);
1100 return this;
1101 }
1102
1103 @Override
1104 public ByteBuf getBytes(int index, ByteBuffer dst) {
1105 checkIndex(index, dst.remaining());
1106 rootParent().getBytes(idx(index), dst);
1107 return this;
1108 }
1109
1110 @Override
1111 protected void _setByte(int index, int value) {
1112 rootParent()._setByte(idx(index), value);
1113 }
1114
1115 @Override
1116 protected void _setShort(int index, int value) {
1117 rootParent()._setShort(idx(index), value);
1118 }
1119
1120 @Override
1121 protected void _setShortLE(int index, int value) {
1122 rootParent()._setShortLE(idx(index), value);
1123 }
1124
1125 @Override
1126 protected void _setMedium(int index, int value) {
1127 rootParent()._setMedium(idx(index), value);
1128 }
1129
1130 @Override
1131 protected void _setMediumLE(int index, int value) {
1132 rootParent()._setMediumLE(idx(index), value);
1133 }
1134
1135 @Override
1136 protected void _setInt(int index, int value) {
1137 rootParent()._setInt(idx(index), value);
1138 }
1139
1140 @Override
1141 protected void _setIntLE(int index, int value) {
1142 rootParent()._setIntLE(idx(index), value);
1143 }
1144
1145 @Override
1146 protected void _setLong(int index, long value) {
1147 rootParent()._setLong(idx(index), value);
1148 }
1149
1150 @Override
1151 protected void _setLongLE(int index, long value) {
1152 rootParent().setLongLE(idx(index), value);
1153 }
1154
1155 @Override
1156 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1157 checkIndex(index, length);
1158 rootParent().setBytes(idx(index), src, srcIndex, length);
1159 return this;
1160 }
1161
1162 @Override
1163 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1164 checkIndex(index, length);
1165 rootParent().setBytes(idx(index), src, srcIndex, length);
1166 return this;
1167 }
1168
1169 @Override
1170 public ByteBuf setBytes(int index, ByteBuffer src) {
1171 checkIndex(index, src.remaining());
1172 rootParent().setBytes(idx(index), src);
1173 return this;
1174 }
1175
1176 @Override
1177 public ByteBuf getBytes(int index, OutputStream out, int length)
1178 throws IOException {
1179 checkIndex(index, length);
1180 if (length != 0) {
1181 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1182 }
1183 return this;
1184 }
1185
1186 @Override
1187 public int getBytes(int index, GatheringByteChannel out, int length)
1188 throws IOException {
1189 return out.write(internalNioBuffer(index, length).duplicate());
1190 }
1191
1192 @Override
1193 public int getBytes(int index, FileChannel out, long position, int length)
1194 throws IOException {
1195 return out.write(internalNioBuffer(index, length).duplicate(), position);
1196 }
1197
1198 @Override
1199 public int setBytes(int index, InputStream in, int length)
1200 throws IOException {
1201 checkIndex(index, length);
1202 final AbstractByteBuf rootParent = rootParent();
1203 if (rootParent.hasArray()) {
1204 return rootParent.setBytes(idx(index), in, length);
1205 }
1206 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1207 int readBytes = in.read(tmp, 0, length);
1208 if (readBytes <= 0) {
1209 return readBytes;
1210 }
1211 setBytes(index, tmp, 0, readBytes);
1212 return readBytes;
1213 }
1214
1215 @Override
1216 public int setBytes(int index, ScatteringByteChannel in, int length)
1217 throws IOException {
1218 try {
1219 return in.read(internalNioBuffer(index, length).duplicate());
1220 } catch (ClosedChannelException ignored) {
1221 return -1;
1222 }
1223 }
1224
1225 @Override
1226 public int setBytes(int index, FileChannel in, long position, int length)
1227 throws IOException {
1228 try {
1229 return in.read(internalNioBuffer(index, length).duplicate(), position);
1230 } catch (ClosedChannelException ignored) {
1231 return -1;
1232 }
1233 }
1234
1235 @Override
1236 public int forEachByte(int index, int length, ByteProcessor processor) {
1237 checkIndex(index, length);
1238 int ret = rootParent().forEachByte(idx(index), length, processor);
1239 return forEachResult(ret);
1240 }
1241
1242 @Override
1243 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1244 checkIndex(index, length);
1245 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1246 return forEachResult(ret);
1247 }
1248
1249 private int forEachResult(int ret) {
1250 if (ret < adjustment) {
1251 return -1;
1252 }
1253 return ret - adjustment;
1254 }
1255
1256 @Override
1257 public boolean isContiguous() {
1258 return rootParent().isContiguous();
1259 }
1260
1261 private int idx(int index) {
1262 return index + adjustment;
1263 }
1264
1265 @Override
1266 protected void deallocate() {
1267 if (chunk != null) {
1268 chunk.release();
1269 }
1270 tmpNioBuf = null;
1271 chunk = null;
1272 rootParent = null;
1273 if (handle instanceof EnhancedHandle) {
1274 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1275 enhancedHandle.unguardedRecycle(this);
1276 } else {
1277 handle.recycle(this);
1278 }
1279 }
1280 }
1281
1282
1283
1284
1285 interface ChunkAllocator {
1286
1287
1288
1289
1290
1291
1292 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1293 }
1294 }