1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SuppressJava6Requirement;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.ThreadExecutorMap;
32 import io.netty.util.internal.UnstableApi;
33
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.nio.ByteBuffer;
38 import java.nio.ByteOrder;
39 import java.nio.channels.ClosedChannelException;
40 import java.nio.channels.FileChannel;
41 import java.nio.channels.GatheringByteChannel;
42 import java.nio.channels.ScatteringByteChannel;
43 import java.util.Arrays;
44 import java.util.Queue;
45 import java.util.Set;
46 import java.util.concurrent.ConcurrentLinkedQueue;
47 import java.util.concurrent.CopyOnWriteArraySet;
48 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49 import java.util.concurrent.atomic.AtomicLong;
50 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
51 import java.util.concurrent.locks.StampedLock;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 @SuppressJava6Requirement(reason = "Guarded by version check")
80 @UnstableApi
81 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
82
/**
 * Selects which threads receive a dedicated, thread-local magazine instead of
 * competing for the shared, striped magazines.
 */
enum MagazineCaching {
    /**
     * Only threads for which {@code ThreadExecutorMap.currentExecutor()} is non-null get a
     * thread-local magazine.
     */
    EventLoopThreads,
    /**
     * Threads get a thread-local magazine regardless of whether an executor is associated with them.
     */
    FastThreadLocalThreads,
    /**
     * No thread-local magazines are created; every thread uses the shared magazines.
     */
    None
}
88
/** How many times a single allocation may ask for the shared magazine array to be expanded. */
private static final int EXPANSION_ATTEMPTS = 3;
/** Number of shared magazines created up front. Indices are masked, so this must be a power of two. */
private static final int INITIAL_MAGAZINES = 4;
/** A retired chunk with fewer remaining bytes than this is released instead of being kept as next-in-line. */
private static final int RETIRE_CAPACITY = 4 * 1024;
/** Lower bound (128 KiB) for the preferred chunk size. */
private static final int MIN_CHUNK_SIZE = 128 * 1024;
/** The shared magazine array is no longer expanded once it has at least this many entries. */
private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
/** Number of buffers of the observed percentile size that a chunk is sized to hold. */
private static final int BUFS_PER_CHUNK = 10;

/**
 * Largest request size served from the magazines (10 MiB). Bigger requests get a one-off chunk
 * through the fallback path.
 */
private static final int MAX_CHUNK_SIZE =
        BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);

/**
 * Capacity of the central chunk queue. Read from the {@code io.netty.allocator.centralQueueCapacity}
 * system property, defaulting to the number of available processors, and clamped to at least 2.
 */
private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
        "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));

/**
 * Capacity of the per-magazine queue of recycled buffer objects. Read from the
 * {@code io.netty.allocator.magazineBufferQueueCapacity} system property, defaulting to 1024.
 * Values below 2 are rejected by the static initializer.
 */
private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
        "io.netty.allocator.magazineBufferQueueCapacity", 1024);

/** Sentinel stored in the thread-local to mean "this thread has no dedicated magazine". */
private static final Object NO_MAGAZINE = Boolean.TRUE;

private final ChunkAllocator chunkAllocator;
/** Reusable chunks that are not attached to any magazine. */
private final Queue<Chunk> centralQueue;
/** Guards replacement of the {@link #magazines} array. */
private final StampedLock magazineExpandLock;
/** Shared magazines; the array length is always a power of two. */
private volatile Magazine[] magazines;
/** Thread-local magazine holder, or {@code null} when magazine caching is disabled. */
private final FastThreadLocal<Object> threadLocalMagazine;
/** Thread-local magazines currently alive, or {@code null} when magazine caching is disabled. */
private final Set<Magazine> liveCachedMagazines;
/** Set once {@link #free()} has run; no chunk is parked in the central queue afterwards. */
private volatile boolean freed;

static {
    // CENTRAL_QUEUE_CAPACITY is clamped to >= 2 by Math.max(...) at its definition, so only the
    // magazine buffer queue capacity can actually be misconfigured.
    if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
        throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
                + " (expected: >= " + 2 + ')');
    }
}
142
/**
 * Creates an allocator that carves buffers out of chunks obtained from {@code chunkAllocator}.
 *
 * @param chunkAllocator  source of the backing memory of every chunk; must not be {@code null}
 * @param magazineCaching which threads, if any, get a thread-local magazine; must not be {@code null}
 */
AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
    ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
    ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
    this.chunkAllocator = chunkAllocator;
    centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
    magazineExpandLock = new StampedLock();

    if (magazineCaching == MagazineCaching.None) {
        threadLocalMagazine = null;
        liveCachedMagazines = null;
    } else {
        assert magazineCaching == MagazineCaching.EventLoopThreads ||
                magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final boolean cacheOutsideEventLoops = magazineCaching == MagazineCaching.FastThreadLocalThreads;
        final Set<Magazine> tracked = new CopyOnWriteArraySet<Magazine>();
        threadLocalMagazine = new FastThreadLocal<Object>() {
            @Override
            protected Object initialValue() {
                boolean eligible = cacheOutsideEventLoops || ThreadExecutorMap.currentExecutor() != null;
                // Threads whose FastThreadLocals will not be cleaned up never get a magazine
                // (presumably so a magazine cannot be left behind when such a thread exits).
                if (!eligible || !FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
                    return NO_MAGAZINE;
                }
                Magazine dedicated = new Magazine(AdaptivePoolingAllocator.this, false);
                tracked.add(dedicated);
                return dedicated;
            }

            @Override
            protected void onRemoval(final Object value) throws Exception {
                if (value == NO_MAGAZINE) {
                    return;
                }
                tracked.remove(value);
            }
        };
        liveCachedMagazines = tracked;
    }

    // Build the initial shared stripes before publishing them through the volatile field.
    Magazine[] initial = new Magazine[INITIAL_MAGAZINES];
    for (int slot = 0; slot < initial.length; slot++) {
        initial[slot] = new Magazine(this);
    }
    magazines = initial;
}
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209 private static Queue<Chunk> createSharedChunkQueue() {
210 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
211 }
212
213 @Override
214 public ByteBuf allocate(int size, int maxCapacity) {
215 return allocate(size, maxCapacity, Thread.currentThread(), null);
216 }
217
218 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
219 if (size <= MAX_CHUNK_SIZE) {
220 int sizeBucket = AllocationStatistics.sizeBucket(size);
221 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
222 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
223 Object mag = threadLocalMagazine.get();
224 if (mag != NO_MAGAZINE) {
225 Magazine magazine = (Magazine) mag;
226 if (buf == null) {
227 buf = magazine.newBuffer();
228 }
229 boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
230 assert allocated : "Allocation of threadLocalMagazine must always succeed";
231 return buf;
232 }
233 }
234 long threadId = currentThread.getId();
235 Magazine[] mags;
236 int expansions = 0;
237 do {
238 mags = magazines;
239 int mask = mags.length - 1;
240 int index = (int) (threadId & mask);
241 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
242 Magazine mag = mags[index + i & mask];
243 if (buf == null) {
244 buf = mag.newBuffer();
245 }
246 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
247
248 return buf;
249 }
250 }
251 expansions++;
252 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
253 }
254
255
256 return allocateFallback(size, maxCapacity, currentThread, buf);
257 }
258
259 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
260
261 Magazine magazine;
262 if (buf != null) {
263 Chunk chunk = buf.chunk;
264 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
265 magazine = getFallbackMagazine(currentThread);
266 }
267 } else {
268 magazine = getFallbackMagazine(currentThread);
269 buf = magazine.newBuffer();
270 }
271
272 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
273 Chunk chunk = new Chunk(innerChunk, magazine, false);
274 try {
275 chunk.readInitInto(buf, size, maxCapacity);
276 } finally {
277
278
279
280 chunk.release();
281 }
282 return buf;
283 }
284
285 private Magazine getFallbackMagazine(Thread currentThread) {
286 Object tlMag;
287 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
288 if (threadLocalMagazine != null &&
289 currentThread instanceof FastThreadLocalThread &&
290 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
291 return (Magazine) tlMag;
292 }
293 Magazine[] mags = magazines;
294 return mags[(int) currentThread.getId() & mags.length - 1];
295 }
296
297
298
299
300 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
301 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
302 assert result == into: "Re-allocation created separate buffer instance";
303 }
304
305 @Override
306 public long usedMemory() {
307 long sum = 0;
308 for (Chunk chunk : centralQueue) {
309 sum += chunk.capacity();
310 }
311 for (Magazine magazine : magazines) {
312 sum += magazine.usedMemory.get();
313 }
314 if (liveCachedMagazines != null) {
315 for (Magazine magazine : liveCachedMagazines) {
316 sum += magazine.usedMemory.get();
317 }
318 }
319 return sum;
320 }
321
322 private boolean tryExpandMagazines(int currentLength) {
323 if (currentLength >= MAX_STRIPES) {
324 return true;
325 }
326 final Magazine[] mags;
327 long writeLock = magazineExpandLock.tryWriteLock();
328 if (writeLock != 0) {
329 try {
330 mags = magazines;
331 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
332 return true;
333 }
334 int preferredChunkSize = mags[0].sharedPrefChunkSize;
335 Magazine[] expanded = new Magazine[mags.length * 2];
336 for (int i = 0, l = expanded.length; i < l; i++) {
337 Magazine m = new Magazine(this);
338 m.localPrefChunkSize = preferredChunkSize;
339 m.sharedPrefChunkSize = preferredChunkSize;
340 expanded[i] = m;
341 }
342 magazines = expanded;
343 } finally {
344 magazineExpandLock.unlockWrite(writeLock);
345 }
346 for (Magazine magazine : mags) {
347 magazine.free();
348 }
349 }
350 return true;
351 }
352
353 private boolean offerToQueue(Chunk buffer) {
354 if (freed) {
355 return false;
356 }
357
358 assert buffer.allocatedBytes == 0;
359 assert buffer.magazine == null;
360
361 boolean isAdded = centralQueue.offer(buffer);
362 if (freed && isAdded) {
363
364 freeCentralQueue();
365 }
366 return isAdded;
367 }
368
369
370
371
372 @Override
373 protected void finalize() throws Throwable {
374 try {
375 super.finalize();
376 } finally {
377 free();
378 }
379 }
380
381 private void free() {
382 freed = true;
383 long stamp = magazineExpandLock.writeLock();
384 try {
385 Magazine[] mags = magazines;
386 for (Magazine magazine : mags) {
387 magazine.free();
388 }
389 } finally {
390 magazineExpandLock.unlockWrite(stamp);
391 }
392 freeCentralQueue();
393 }
394
395 private void freeCentralQueue() {
396 for (;;) {
397 Chunk chunk = centralQueue.poll();
398 if (chunk == null) {
399 break;
400 }
401 chunk.release();
402 }
403 }
404
405 static int sizeBucket(int size) {
406 return AllocationStatistics.sizeBucket(size);
407 }
408
409 @SuppressWarnings("checkstyle:finalclass")
410 private static class AllocationStatistics {
411 private static final int MIN_DATUM_TARGET = 1024;
412 private static final int MAX_DATUM_TARGET = 65534;
413 private static final int INIT_DATUM_TARGET = 9;
414 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
415 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
416 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
417 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
418 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
419
420 protected final AdaptivePoolingAllocator parent;
421 private final boolean shareable;
422 private final short[][] histos = {
423 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
424 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
425 };
426 private short[] histo = histos[0];
427 private final int[] sums = new int[HISTO_BUCKET_COUNT];
428
429 private int histoIndex;
430 private int datumCount;
431 private int datumTarget = INIT_DATUM_TARGET;
432 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
433 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
434
435 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
436 this.parent = parent;
437 this.shareable = shareable;
438 }
439
440 protected void recordAllocationSize(int bucket) {
441 histo[bucket]++;
442 if (datumCount++ == datumTarget) {
443 rotateHistograms();
444 }
445 }
446
447 static int sizeBucket(int size) {
448 if (size == 0) {
449 return 0;
450 }
451
452
453
454
455 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
456 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
457 }
458
459 private void rotateHistograms() {
460 short[][] hs = histos;
461 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
462 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
463 }
464 int sum = 0;
465 for (int count : sums) {
466 sum += count;
467 }
468 int targetPercentile = (int) (sum * 0.99);
469 int sizeBucket = 0;
470 for (; sizeBucket < sums.length; sizeBucket++) {
471 if (sums[sizeBucket] > targetPercentile) {
472 break;
473 }
474 targetPercentile -= sums[sizeBucket];
475 }
476 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
477 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
478 localPrefChunkSize = prefChunkSize;
479 if (shareable) {
480 for (Magazine mag : parent.magazines) {
481 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
482 }
483 }
484 if (sharedPrefChunkSize != prefChunkSize) {
485
486 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
487 sharedPrefChunkSize = prefChunkSize;
488 } else {
489
490 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
491 }
492
493 histoIndex = histoIndex + 1 & 3;
494 histo = histos[histoIndex];
495 datumCount = 0;
496 Arrays.fill(histo, (short) 0);
497 }
498
499
500
501
502
503
504
505
506
507 protected int preferredChunkSize() {
508 return sharedPrefChunkSize;
509 }
510 }
511
512 @SuppressJava6Requirement(reason = "Guarded by version check")
513 private static final class Magazine extends AllocationStatistics {
514 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
515 static {
516 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
517 }
518 private static final Chunk MAGAZINE_FREED = new Chunk();
519
520 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
521 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
522 @Override
523 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
524 return new AdaptiveByteBuf(handle);
525 }
526 });
527
528 private Chunk current;
529 @SuppressWarnings("unused")
530 private volatile Chunk nextInLine;
531 private final AtomicLong usedMemory;
532 private final StampedLock allocationLock;
533 private final Queue<AdaptiveByteBuf> bufferQueue;
534 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
535
536 Magazine(AdaptivePoolingAllocator parent) {
537 this(parent, true);
538 }
539
540 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
541 super(parent, shareable);
542
543 if (shareable) {
544
545 allocationLock = new StampedLock();
546 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
547 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
548 @Override
549 public void recycle(AdaptiveByteBuf self) {
550 bufferQueue.offer(self);
551 }
552 };
553 } else {
554 allocationLock = null;
555 bufferQueue = null;
556 handle = null;
557 }
558 usedMemory = new AtomicLong();
559 }
560
561 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
562 if (allocationLock == null) {
563
564 return allocate(size, sizeBucket, maxCapacity, buf);
565 }
566
567
568 long writeLock = allocationLock.tryWriteLock();
569 if (writeLock != 0) {
570 try {
571 return allocate(size, sizeBucket, maxCapacity, buf);
572 } finally {
573 allocationLock.unlockWrite(writeLock);
574 }
575 }
576 return false;
577 }
578
579 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
580 recordAllocationSize(sizeBucket);
581 Chunk curr = current;
582 if (curr != null) {
583
584 if (curr.remainingCapacity() > size) {
585 curr.readInitInto(buf, size, maxCapacity);
586
587 return true;
588 }
589
590
591
592 current = null;
593 if (curr.remainingCapacity() == size) {
594 try {
595 curr.readInitInto(buf, size, maxCapacity);
596 return true;
597 } finally {
598 curr.release();
599 }
600 }
601
602
603 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
604 curr.release();
605 } else {
606
607
608 transferToNextInLineOrRelease(curr);
609 }
610 }
611
612 assert current == null;
613
614
615
616
617
618
619
620 if (nextInLine != null) {
621 curr = NEXT_IN_LINE.getAndSet(this, null);
622 if (curr == MAGAZINE_FREED) {
623
624 restoreMagazineFreed();
625 return false;
626 }
627
628 if (curr.remainingCapacity() > size) {
629
630 curr.readInitInto(buf, size, maxCapacity);
631 current = curr;
632 return true;
633 }
634
635 if (curr.remainingCapacity() == size) {
636
637
638 try {
639 curr.readInitInto(buf, size, maxCapacity);
640 return true;
641 } finally {
642
643
644 curr.release();
645 }
646 } else {
647
648 curr.release();
649 }
650 }
651
652
653 curr = parent.centralQueue.poll();
654 if (curr == null) {
655 curr = newChunkAllocation(size);
656 } else {
657 curr.attachToMagazine(this);
658
659 if (curr.remainingCapacity() < size) {
660
661 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
662 curr.release();
663 } else {
664
665
666 transferToNextInLineOrRelease(curr);
667 }
668 curr = newChunkAllocation(size);
669 }
670 }
671
672 current = curr;
673 try {
674 assert current.remainingCapacity() >= size;
675 if (curr.remainingCapacity() > size) {
676 curr.readInitInto(buf, size, maxCapacity);
677 curr = null;
678 } else {
679 curr.readInitInto(buf, size, maxCapacity);
680 }
681 } finally {
682 if (curr != null) {
683
684
685 curr.release();
686 current = null;
687 }
688 }
689 return true;
690 }
691
692 private void restoreMagazineFreed() {
693 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
694 if (next != null && next != MAGAZINE_FREED) {
695 next.release();
696 }
697 }
698
699 private void transferToNextInLineOrRelease(Chunk chunk) {
700 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
701 return;
702 }
703
704 Chunk nextChunk = NEXT_IN_LINE.get(this);
705 if (nextChunk != null && nextChunk != MAGAZINE_FREED
706 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
707 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
708 nextChunk.release();
709 return;
710 }
711 }
712
713
714
715
716 chunk.release();
717 }
718
719 private Chunk newChunkAllocation(int promptingSize) {
720 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
721 ChunkAllocator chunkAllocator = parent.chunkAllocator;
722 return new Chunk(chunkAllocator.allocate(size, size), this, true);
723 }
724
725 boolean trySetNextInLine(Chunk chunk) {
726 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
727 }
728
729 void free() {
730
731 restoreMagazineFreed();
732 long stamp = allocationLock.writeLock();
733 try {
734 if (current != null) {
735 current.release();
736 current = null;
737 }
738 } finally {
739 allocationLock.unlockWrite(stamp);
740 }
741 }
742
743 public AdaptiveByteBuf newBuffer() {
744 AdaptiveByteBuf buf;
745 if (handle == null) {
746 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
747 } else {
748 buf = bufferQueue.poll();
749 if (buf == null) {
750 buf = new AdaptiveByteBuf(handle);
751 }
752 }
753 buf.resetRefCnt();
754 buf.discardMarks();
755 return buf;
756 }
757 }
758
759 private static final class Chunk implements ReferenceCounted {
760
761 private final AbstractByteBuf delegate;
762 private Magazine magazine;
763 private final AdaptivePoolingAllocator allocator;
764 private final int capacity;
765 private final boolean pooled;
766 private int allocatedBytes;
767 private static final long REFCNT_FIELD_OFFSET =
768 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
769 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
770 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
771
772 private static final ReferenceCountUpdater<Chunk> updater =
773 new ReferenceCountUpdater<Chunk>() {
774 @Override
775 protected AtomicIntegerFieldUpdater<Chunk> updater() {
776 return AIF_UPDATER;
777 }
778 @Override
779 protected long unsafeOffset() {
780 return REFCNT_FIELD_OFFSET;
781 }
782 };
783
784
785 @SuppressWarnings({"unused", "FieldMayBeFinal"})
786 private volatile int refCnt;
787
788 Chunk() {
789
790 delegate = null;
791 magazine = null;
792 allocator = null;
793 capacity = 0;
794 pooled = false;
795 }
796
797 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
798 this.delegate = delegate;
799 this.pooled = pooled;
800 capacity = delegate.capacity();
801 updater.setInitialValue(this);
802 allocator = magazine.parent;
803 attachToMagazine(magazine);
804 }
805
806 Magazine currentMagazine() {
807 return magazine;
808 }
809
810 void detachFromMagazine() {
811 if (magazine != null) {
812 magazine.usedMemory.getAndAdd(-capacity);
813 magazine = null;
814 }
815 }
816
817 void attachToMagazine(Magazine magazine) {
818 assert this.magazine == null;
819 this.magazine = magazine;
820 magazine.usedMemory.getAndAdd(capacity);
821 }
822
823 @Override
824 public Chunk touch(Object hint) {
825 return this;
826 }
827
828 @Override
829 public int refCnt() {
830 return updater.refCnt(this);
831 }
832
833 @Override
834 public Chunk retain() {
835 return updater.retain(this);
836 }
837
838 @Override
839 public Chunk retain(int increment) {
840 return updater.retain(this, increment);
841 }
842
843 @Override
844 public Chunk touch() {
845 return this;
846 }
847
848 @Override
849 public boolean release() {
850 if (updater.release(this)) {
851 deallocate();
852 return true;
853 }
854 return false;
855 }
856
857 @Override
858 public boolean release(int decrement) {
859 if (updater.release(this, decrement)) {
860 deallocate();
861 return true;
862 }
863 return false;
864 }
865
866 private void deallocate() {
867 Magazine mag = magazine;
868 AdaptivePoolingAllocator parent = mag.parent;
869 int chunkSize = mag.preferredChunkSize();
870 int memSize = delegate.capacity();
871 if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
872
873
874 detachFromMagazine();
875 delegate.release();
876 } else {
877 updater.resetRefCnt(this);
878 delegate.setIndex(0, 0);
879 allocatedBytes = 0;
880 if (!mag.trySetNextInLine(this)) {
881
882 detachFromMagazine();
883 if (!parent.offerToQueue(this)) {
884
885
886 boolean released = updater.release(this);
887 delegate.release();
888 assert released;
889 }
890 }
891 }
892 }
893
894 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
895 int startIndex = allocatedBytes;
896 allocatedBytes = startIndex + size;
897 Chunk chunk = this;
898 chunk.retain();
899 try {
900 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
901 chunk = null;
902 } finally {
903 if (chunk != null) {
904
905
906
907 allocatedBytes = startIndex;
908 chunk.release();
909 }
910 }
911 }
912
913 public int remainingCapacity() {
914 return capacity - allocatedBytes;
915 }
916
917 public int capacity() {
918 return capacity;
919 }
920 }
921
922 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
923
924 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
925
926 private int adjustment;
927 private AbstractByteBuf rootParent;
928 Chunk chunk;
929 private int length;
930 private ByteBuffer tmpNioBuf;
931 private boolean hasArray;
932 private boolean hasMemoryAddress;
933
934 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
935 super(0);
936 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
937 }
938
939 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
940 int adjustment, int capacity, int maxCapacity) {
941 this.adjustment = adjustment;
942 chunk = wrapped;
943 length = capacity;
944 maxCapacity(maxCapacity);
945 setIndex0(readerIndex, writerIndex);
946 hasArray = unwrapped.hasArray();
947 hasMemoryAddress = unwrapped.hasMemoryAddress();
948 rootParent = unwrapped;
949 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
950 }
951
952 private AbstractByteBuf rootParent() {
953 final AbstractByteBuf rootParent = this.rootParent;
954 if (rootParent != null) {
955 return rootParent;
956 }
957 throw new IllegalReferenceCountException();
958 }
959
960 @Override
961 public int capacity() {
962 return length;
963 }
964
965 @Override
966 public ByteBuf capacity(int newCapacity) {
967 if (newCapacity == capacity()) {
968 ensureAccessible();
969 return this;
970 }
971 checkNewCapacity(newCapacity);
972 if (newCapacity < capacity()) {
973 length = newCapacity;
974 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
975 return this;
976 }
977
978
979 ByteBuffer data = tmpNioBuf;
980 data.clear();
981 tmpNioBuf = null;
982 Chunk chunk = this.chunk;
983 AdaptivePoolingAllocator allocator = chunk.allocator;
984 int readerIndex = this.readerIndex;
985 int writerIndex = this.writerIndex;
986 allocator.allocate(newCapacity, maxCapacity(), this);
987 tmpNioBuf.put(data);
988 tmpNioBuf.clear();
989 chunk.release();
990 this.readerIndex = readerIndex;
991 this.writerIndex = writerIndex;
992 return this;
993 }
994
995 @Override
996 public ByteBufAllocator alloc() {
997 return rootParent().alloc();
998 }
999
1000 @Override
1001 public ByteOrder order() {
1002 return rootParent().order();
1003 }
1004
1005 @Override
1006 public ByteBuf unwrap() {
1007 return null;
1008 }
1009
1010 @Override
1011 public boolean isDirect() {
1012 return rootParent().isDirect();
1013 }
1014
1015 @Override
1016 public int arrayOffset() {
1017 return idx(rootParent().arrayOffset());
1018 }
1019
1020 @Override
1021 public boolean hasMemoryAddress() {
1022 return hasMemoryAddress;
1023 }
1024
1025 @Override
1026 public long memoryAddress() {
1027 ensureAccessible();
1028 return rootParent().memoryAddress() + adjustment;
1029 }
1030
1031 @Override
1032 public ByteBuffer nioBuffer(int index, int length) {
1033 checkIndex(index, length);
1034 return rootParent().nioBuffer(idx(index), length);
1035 }
1036
1037 @Override
1038 public ByteBuffer internalNioBuffer(int index, int length) {
1039 checkIndex(index, length);
1040 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1041 }
1042
1043 private ByteBuffer internalNioBuffer() {
1044 return (ByteBuffer) tmpNioBuf.clear();
1045 }
1046
1047 @Override
1048 public ByteBuffer[] nioBuffers(int index, int length) {
1049 checkIndex(index, length);
1050 return rootParent().nioBuffers(idx(index), length);
1051 }
1052
1053 @Override
1054 public boolean hasArray() {
1055 return hasArray;
1056 }
1057
1058 @Override
1059 public byte[] array() {
1060 ensureAccessible();
1061 return rootParent().array();
1062 }
1063
1064 @Override
1065 public ByteBuf copy(int index, int length) {
1066 checkIndex(index, length);
1067 return rootParent().copy(idx(index), length);
1068 }
1069
1070 @Override
1071 public int nioBufferCount() {
1072 return rootParent().nioBufferCount();
1073 }
1074
1075 @Override
1076 protected byte _getByte(int index) {
1077 return rootParent()._getByte(idx(index));
1078 }
1079
1080 @Override
1081 protected short _getShort(int index) {
1082 return rootParent()._getShort(idx(index));
1083 }
1084
1085 @Override
1086 protected short _getShortLE(int index) {
1087 return rootParent()._getShortLE(idx(index));
1088 }
1089
1090 @Override
1091 protected int _getUnsignedMedium(int index) {
1092 return rootParent()._getUnsignedMedium(idx(index));
1093 }
1094
1095 @Override
1096 protected int _getUnsignedMediumLE(int index) {
1097 return rootParent()._getUnsignedMediumLE(idx(index));
1098 }
1099
1100 @Override
1101 protected int _getInt(int index) {
1102 return rootParent()._getInt(idx(index));
1103 }
1104
1105 @Override
1106 protected int _getIntLE(int index) {
1107 return rootParent()._getIntLE(idx(index));
1108 }
1109
1110 @Override
1111 protected long _getLong(int index) {
1112 return rootParent()._getLong(idx(index));
1113 }
1114
1115 @Override
1116 protected long _getLongLE(int index) {
1117 return rootParent()._getLongLE(idx(index));
1118 }
1119
1120 @Override
1121 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1122 checkIndex(index, length);
1123 rootParent().getBytes(idx(index), dst, dstIndex, length);
1124 return this;
1125 }
1126
1127 @Override
1128 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1129 checkIndex(index, length);
1130 rootParent().getBytes(idx(index), dst, dstIndex, length);
1131 return this;
1132 }
1133
1134 @Override
1135 public ByteBuf getBytes(int index, ByteBuffer dst) {
1136 checkIndex(index, dst.remaining());
1137 rootParent().getBytes(idx(index), dst);
1138 return this;
1139 }
1140
1141 @Override
1142 protected void _setByte(int index, int value) {
1143 rootParent()._setByte(idx(index), value);
1144 }
1145
1146 @Override
1147 protected void _setShort(int index, int value) {
1148 rootParent()._setShort(idx(index), value);
1149 }
1150
1151 @Override
1152 protected void _setShortLE(int index, int value) {
1153 rootParent()._setShortLE(idx(index), value);
1154 }
1155
1156 @Override
1157 protected void _setMedium(int index, int value) {
1158 rootParent()._setMedium(idx(index), value);
1159 }
1160
1161 @Override
1162 protected void _setMediumLE(int index, int value) {
1163 rootParent()._setMediumLE(idx(index), value);
1164 }
1165
1166 @Override
1167 protected void _setInt(int index, int value) {
1168 rootParent()._setInt(idx(index), value);
1169 }
1170
1171 @Override
1172 protected void _setIntLE(int index, int value) {
1173 rootParent()._setIntLE(idx(index), value);
1174 }
1175
1176 @Override
1177 protected void _setLong(int index, long value) {
1178 rootParent()._setLong(idx(index), value);
1179 }
1180
1181 @Override
1182 protected void _setLongLE(int index, long value) {
1183 rootParent().setLongLE(idx(index), value);
1184 }
1185
1186 @Override
1187 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1188 checkIndex(index, length);
1189 rootParent().setBytes(idx(index), src, srcIndex, length);
1190 return this;
1191 }
1192
1193 @Override
1194 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1195 checkIndex(index, length);
1196 rootParent().setBytes(idx(index), src, srcIndex, length);
1197 return this;
1198 }
1199
1200 @Override
1201 public ByteBuf setBytes(int index, ByteBuffer src) {
1202 checkIndex(index, src.remaining());
1203 rootParent().setBytes(idx(index), src);
1204 return this;
1205 }
1206
1207 @Override
1208 public ByteBuf getBytes(int index, OutputStream out, int length)
1209 throws IOException {
1210 checkIndex(index, length);
1211 if (length != 0) {
1212 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1213 }
1214 return this;
1215 }
1216
1217 @Override
1218 public int getBytes(int index, GatheringByteChannel out, int length)
1219 throws IOException {
1220 return out.write(internalNioBuffer(index, length).duplicate());
1221 }
1222
1223 @Override
1224 public int getBytes(int index, FileChannel out, long position, int length)
1225 throws IOException {
1226 return out.write(internalNioBuffer(index, length).duplicate(), position);
1227 }
1228
1229 @Override
1230 public int setBytes(int index, InputStream in, int length)
1231 throws IOException {
1232 checkIndex(index, length);
1233 final AbstractByteBuf rootParent = rootParent();
1234 if (rootParent.hasArray()) {
1235 return rootParent.setBytes(idx(index), in, length);
1236 }
1237 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1238 int readBytes = in.read(tmp, 0, length);
1239 if (readBytes <= 0) {
1240 return readBytes;
1241 }
1242 setBytes(index, tmp, 0, readBytes);
1243 return readBytes;
1244 }
1245
1246 @Override
1247 public int setBytes(int index, ScatteringByteChannel in, int length)
1248 throws IOException {
1249 try {
1250 return in.read(internalNioBuffer(index, length).duplicate());
1251 } catch (ClosedChannelException ignored) {
1252 return -1;
1253 }
1254 }
1255
1256 @Override
1257 public int setBytes(int index, FileChannel in, long position, int length)
1258 throws IOException {
1259 try {
1260 return in.read(internalNioBuffer(index, length).duplicate(), position);
1261 } catch (ClosedChannelException ignored) {
1262 return -1;
1263 }
1264 }
1265
1266 @Override
1267 public int forEachByte(int index, int length, ByteProcessor processor) {
1268 checkIndex(index, length);
1269 int ret = rootParent().forEachByte(idx(index), length, processor);
1270 return forEachResult(ret);
1271 }
1272
1273 @Override
1274 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1275 checkIndex(index, length);
1276 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1277 return forEachResult(ret);
1278 }
1279
1280 private int forEachResult(int ret) {
1281 if (ret < adjustment) {
1282 return -1;
1283 }
1284 return ret - adjustment;
1285 }
1286
1287 @Override
1288 public boolean isContiguous() {
1289 return rootParent().isContiguous();
1290 }
1291
1292 private int idx(int index) {
1293 return index + adjustment;
1294 }
1295
1296 @Override
1297 protected void deallocate() {
1298 if (chunk != null) {
1299 chunk.release();
1300 }
1301 tmpNioBuf = null;
1302 chunk = null;
1303 rootParent = null;
1304 if (handle instanceof EnhancedHandle) {
1305 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1306 enhancedHandle.unguardedRecycle(this);
1307 } else {
1308 handle.recycle(this);
1309 }
1310 }
1311 }
1312
1313
1314
1315
1316 interface ChunkAllocator {
1317
1318
1319
1320
1321
1322
1323 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1324 }
1325 }