1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SuppressJava6Requirement;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.ThreadExecutorMap;
32 import io.netty.util.internal.UnstableApi;
33
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.nio.ByteBuffer;
38 import java.nio.ByteOrder;
39 import java.nio.channels.ClosedChannelException;
40 import java.nio.channels.FileChannel;
41 import java.nio.channels.GatheringByteChannel;
42 import java.nio.channels.ScatteringByteChannel;
43 import java.util.Arrays;
44 import java.util.Queue;
45 import java.util.Set;
46 import java.util.concurrent.ConcurrentLinkedQueue;
47 import java.util.concurrent.CopyOnWriteArraySet;
48 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49 import java.util.concurrent.atomic.AtomicLong;
50 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
51 import java.util.concurrent.locks.StampedLock;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 @SuppressJava6Requirement(reason = "Guarded by version check")
80 @UnstableApi
81 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
82
83 enum MagazineCaching {
84 EventLoopThreads,
85 FastThreadLocalThreads,
86 None
87 }
88
89 private static final int EXPANSION_ATTEMPTS = 3;
90 private static final int INITIAL_MAGAZINES = 4;
91 private static final int RETIRE_CAPACITY = 4 * 1024;
92 private static final int MIN_CHUNK_SIZE = 128 * 1024;
93 private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
94 private static final int BUFS_PER_CHUNK = 10;
95
96
97
98
99
100
101 private static final int MAX_CHUNK_SIZE =
102 BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);
103
104
105
106
107
108
109
110
111
112 private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
113 "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors());
114
115
116
117
118
119 private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
120 "io.netty.allocator.magazineBufferQueueCapacity", 1024);
121
122 private static final Object NO_MAGAZINE = Boolean.TRUE;
123
124 private final ChunkAllocator chunkAllocator;
125 private final Queue<Chunk> centralQueue;
126 private final StampedLock magazineExpandLock;
127 private volatile Magazine[] magazines;
128 private final FastThreadLocal<Object> threadLocalMagazine;
129 private final Set<Magazine> liveCachedMagazines;
130 private volatile boolean freed;
131
132 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
133 ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
134 ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
135 this.chunkAllocator = chunkAllocator;
136 centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
137 magazineExpandLock = new StampedLock();
138 if (magazineCaching != MagazineCaching.None) {
139 assert magazineCaching == MagazineCaching.EventLoopThreads ||
140 magazineCaching == MagazineCaching.FastThreadLocalThreads;
141 final boolean cachedMagazinesNonEventLoopThreads =
142 magazineCaching == MagazineCaching.FastThreadLocalThreads;
143 final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
144 threadLocalMagazine = new FastThreadLocal<Object>() {
145 @Override
146 protected Object initialValue() {
147 if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
148 Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
149
150 if (FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
151
152
153 liveMagazines.add(mag);
154 }
155 return mag;
156 }
157 return NO_MAGAZINE;
158 }
159
160 @Override
161 protected void onRemoval(final Object value) throws Exception {
162 if (value != NO_MAGAZINE) {
163 liveMagazines.remove(value);
164 }
165 }
166 };
167 liveCachedMagazines = liveMagazines;
168 } else {
169 threadLocalMagazine = null;
170 liveCachedMagazines = null;
171 }
172 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
173 for (int i = 0; i < mags.length; i++) {
174 mags[i] = new Magazine(this);
175 }
176 magazines = mags;
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199 private static Queue<Chunk> createSharedChunkQueue() {
200 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
201 }
202
203 @Override
204 public ByteBuf allocate(int size, int maxCapacity) {
205 return allocate(size, maxCapacity, Thread.currentThread(), null);
206 }
207
208 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
209 if (size <= MAX_CHUNK_SIZE) {
210 int sizeBucket = AllocationStatistics.sizeBucket(size);
211 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
212 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
213 Object mag = threadLocalMagazine.get();
214 if (mag != NO_MAGAZINE) {
215 Magazine magazine = (Magazine) mag;
216 if (buf == null) {
217 buf = magazine.newBuffer();
218 }
219 boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
220 assert allocated : "Allocation of threadLocalMagazine must always succeed";
221 return buf;
222 }
223 }
224 long threadId = currentThread.getId();
225 Magazine[] mags;
226 int expansions = 0;
227 do {
228 mags = magazines;
229 int mask = mags.length - 1;
230 int index = (int) (threadId & mask);
231 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
232 Magazine mag = mags[index + i & mask];
233 if (buf == null) {
234 buf = mag.newBuffer();
235 }
236 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
237
238 return buf;
239 }
240 }
241 expansions++;
242 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
243 }
244
245
246 return allocateFallback(size, maxCapacity, currentThread, buf);
247 }
248
249 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
250
251 Magazine magazine;
252 if (buf != null) {
253 Chunk chunk = buf.chunk;
254 magazine = chunk != null && chunk != Magazine.MAGAZINE_FREED?
255 chunk.magazine : getFallbackMagazine(currentThread);
256 } else {
257 magazine = getFallbackMagazine(currentThread);
258 buf = magazine.newBuffer();
259 }
260
261 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
262 Chunk chunk = new Chunk(innerChunk, magazine, false);
263 try {
264 chunk.readInitInto(buf, size, maxCapacity);
265 } finally {
266
267
268
269 chunk.release();
270 }
271 return buf;
272 }
273
274 private Magazine getFallbackMagazine(Thread currentThread) {
275 Object tlMag;
276 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
277 if (threadLocalMagazine != null &&
278 currentThread instanceof FastThreadLocalThread &&
279 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
280 return (Magazine) tlMag;
281 }
282 Magazine[] mags = magazines;
283 return mags[(int) currentThread.getId() & mags.length - 1];
284 }
285
286
287
288
289 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
290 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
291 assert result == into: "Re-allocation created separate buffer instance";
292 }
293
294 @Override
295 public long usedMemory() {
296 long sum = 0;
297 for (Chunk chunk : centralQueue) {
298 sum += chunk.capacity();
299 }
300 for (Magazine magazine : magazines) {
301 sum += magazine.usedMemory.get();
302 }
303 if (liveCachedMagazines != null) {
304 for (Magazine magazine : liveCachedMagazines) {
305 sum += magazine.usedMemory.get();
306 }
307 }
308 return sum;
309 }
310
311 private boolean tryExpandMagazines(int currentLength) {
312 if (currentLength >= MAX_STRIPES) {
313 return true;
314 }
315 long writeLock = magazineExpandLock.tryWriteLock();
316 if (writeLock != 0) {
317 try {
318 Magazine[] mags = magazines;
319 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
320 return true;
321 }
322 int preferredChunkSize = mags[0].sharedPrefChunkSize;
323 Magazine[] expanded = new Magazine[mags.length * 2];
324 for (int i = 0, l = expanded.length; i < l; i++) {
325 Magazine m = new Magazine(this);
326 m.localPrefChunkSize = preferredChunkSize;
327 m.sharedPrefChunkSize = preferredChunkSize;
328 expanded[i] = m;
329 }
330 magazines = expanded;
331 for (Magazine magazine : mags) {
332 magazine.free();
333 }
334 } finally {
335 magazineExpandLock.unlockWrite(writeLock);
336 }
337 }
338 return true;
339 }
340
341 private boolean offerToQueue(Chunk buffer) {
342 if (freed) {
343 return false;
344 }
345 return centralQueue.offer(buffer);
346 }
347
348
349
350
351 @Override
352 protected void finalize() throws Throwable {
353 try {
354 super.finalize();
355 } finally {
356 free();
357 }
358 }
359
360 private void free() {
361 freed = true;
362 long stamp = magazineExpandLock.writeLock();
363 try {
364 Magazine[] mags = magazines;
365 for (Magazine magazine : mags) {
366 magazine.free();
367 }
368 } finally {
369 magazineExpandLock.unlockWrite(stamp);
370 }
371 for (;;) {
372 Chunk chunk = centralQueue.poll();
373 if (chunk == null) {
374 break;
375 }
376 chunk.release();
377 }
378 }
379
380 static int sizeBucket(int size) {
381 return AllocationStatistics.sizeBucket(size);
382 }
383
384 @SuppressWarnings("checkstyle:finalclass")
385 private static class AllocationStatistics {
386 private static final int MIN_DATUM_TARGET = 1024;
387 private static final int MAX_DATUM_TARGET = 65534;
388 private static final int INIT_DATUM_TARGET = 9;
389 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
390 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
391 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
392 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
393 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
394
395 protected final AdaptivePoolingAllocator parent;
396 private final boolean shareable;
397 private final short[][] histos = {
398 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
399 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
400 };
401 private short[] histo = histos[0];
402 private final int[] sums = new int[HISTO_BUCKET_COUNT];
403
404 private int histoIndex;
405 private int datumCount;
406 private int datumTarget = INIT_DATUM_TARGET;
407 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
408 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
409
410 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
411 this.parent = parent;
412 this.shareable = shareable;
413 }
414
415 protected void recordAllocationSize(int bucket) {
416 histo[bucket]++;
417 if (datumCount++ == datumTarget) {
418 rotateHistograms();
419 }
420 }
421
422 static int sizeBucket(int size) {
423 if (size == 0) {
424 return 0;
425 }
426
427
428
429
430 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
431 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
432 }
433
434 private void rotateHistograms() {
435 short[][] hs = histos;
436 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
437 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
438 }
439 int sum = 0;
440 for (int count : sums) {
441 sum += count;
442 }
443 int targetPercentile = (int) (sum * 0.99);
444 int sizeBucket = 0;
445 for (; sizeBucket < sums.length; sizeBucket++) {
446 if (sums[sizeBucket] > targetPercentile) {
447 break;
448 }
449 targetPercentile -= sums[sizeBucket];
450 }
451 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
452 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
453 localPrefChunkSize = prefChunkSize;
454 if (shareable) {
455 for (Magazine mag : parent.magazines) {
456 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
457 }
458 }
459 if (sharedPrefChunkSize != prefChunkSize) {
460
461 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
462 sharedPrefChunkSize = prefChunkSize;
463 } else {
464
465 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
466 }
467
468 histoIndex = histoIndex + 1 & 3;
469 histo = histos[histoIndex];
470 datumCount = 0;
471 Arrays.fill(histo, (short) 0);
472 }
473
474
475
476
477
478
479
480
481
482 protected int preferredChunkSize() {
483 return sharedPrefChunkSize;
484 }
485 }
486
487 @SuppressJava6Requirement(reason = "Guarded by version check")
488 private static final class Magazine extends AllocationStatistics {
489 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
490 static {
491 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
492 }
493 private static final Chunk MAGAZINE_FREED = new Chunk();
494
495 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
496 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
497 @Override
498 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
499 return new AdaptiveByteBuf(handle);
500 }
501 });
502
503 private Chunk current;
504 @SuppressWarnings("unused")
505 private volatile Chunk nextInLine;
506 private final AtomicLong usedMemory;
507 private final StampedLock allocationLock;
508 private final Queue<AdaptiveByteBuf> bufferQueue;
509 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
510
511 Magazine(AdaptivePoolingAllocator parent) {
512 this(parent, true);
513 }
514
515 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
516 super(parent, shareable);
517
518 if (shareable) {
519
520 allocationLock = new StampedLock();
521 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
522 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
523 @Override
524 public void recycle(AdaptiveByteBuf self) {
525 bufferQueue.offer(self);
526 }
527 };
528 } else {
529 allocationLock = null;
530 bufferQueue = null;
531 handle = null;
532 }
533 usedMemory = new AtomicLong();
534 }
535
536 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
537 if (allocationLock == null) {
538
539 return allocate(size, sizeBucket, maxCapacity, buf);
540 }
541
542
543 long writeLock = allocationLock.tryWriteLock();
544 if (writeLock != 0) {
545 try {
546 return allocate(size, sizeBucket, maxCapacity, buf);
547 } finally {
548 allocationLock.unlockWrite(writeLock);
549 }
550 }
551 return false;
552 }
553
554 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
555 recordAllocationSize(sizeBucket);
556 Chunk curr = current;
557 if (curr != null) {
558 if (curr.remainingCapacity() > size) {
559 curr.readInitInto(buf, size, maxCapacity);
560
561 return true;
562 }
563
564
565
566 current = null;
567 if (curr.remainingCapacity() == size) {
568 try {
569 curr.readInitInto(buf, size, maxCapacity);
570 return true;
571 } finally {
572 curr.release();
573 }
574 }
575 }
576 Chunk last = curr;
577 assert current == null;
578
579
580
581
582
583
584
585 if (nextInLine != null) {
586 curr = NEXT_IN_LINE.getAndSet(this, null);
587 if (curr == MAGAZINE_FREED) {
588
589 restoreMagazineFreed();
590 return false;
591 }
592 } else {
593 curr = parent.centralQueue.poll();
594 if (curr == null) {
595 curr = newChunkAllocation(size);
596 }
597 }
598 current = curr;
599 assert current != null;
600 if (last != null) {
601 if (last.remainingCapacity() < RETIRE_CAPACITY) {
602 last.release();
603 } else {
604 transferChunk(last);
605 }
606 }
607 if (curr.remainingCapacity() > size) {
608 curr.readInitInto(buf, size, maxCapacity);
609 } else if (curr.remainingCapacity() == size) {
610 try {
611 curr.readInitInto(buf, size, maxCapacity);
612 } finally {
613
614
615 curr.release();
616 current = null;
617 }
618 } else {
619 Chunk newChunk = newChunkAllocation(size);
620 try {
621 newChunk.readInitInto(buf, size, maxCapacity);
622 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
623 curr.release();
624 current = newChunk;
625 } else {
626 transferChunk(newChunk);
627 }
628 newChunk = null;
629 } finally {
630 if (newChunk != null) {
631 assert current == null;
632
633 newChunk.release();
634 }
635 }
636 }
637 return true;
638 }
639
640 private void restoreMagazineFreed() {
641 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
642 if (next != null && next != MAGAZINE_FREED) {
643 next.release();
644 }
645 }
646
647 private void transferChunk(Chunk current) {
648 if (NEXT_IN_LINE.compareAndSet(this, null, current)
649 || parent.offerToQueue(current)) {
650 return;
651 }
652 Chunk nextChunk = NEXT_IN_LINE.get(this);
653 if (nextChunk != null && current.remainingCapacity() > nextChunk.remainingCapacity()) {
654 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, current)) {
655 if (nextChunk != MAGAZINE_FREED) {
656 nextChunk.release();
657 }
658 return;
659 }
660 }
661
662
663 current.release();
664 }
665
666 private Chunk newChunkAllocation(int promptingSize) {
667 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
668 ChunkAllocator chunkAllocator = parent.chunkAllocator;
669 return new Chunk(chunkAllocator.allocate(size, size), this, true);
670 }
671
672 boolean trySetNextInLine(Chunk chunk) {
673 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
674 }
675
676 void free() {
677
678 restoreMagazineFreed();
679 long stamp = allocationLock.writeLock();
680 try {
681 if (current != null) {
682 current.release();
683 current = null;
684 }
685 } finally {
686 allocationLock.unlockWrite(stamp);
687 }
688 }
689
690 public AdaptiveByteBuf newBuffer() {
691 AdaptiveByteBuf buf;
692 if (handle == null) {
693 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
694 } else {
695 buf = bufferQueue.poll();
696 if (buf == null) {
697 buf = new AdaptiveByteBuf(handle);
698 }
699 }
700 buf.resetRefCnt();
701 buf.discardMarks();
702 return buf;
703 }
704 }
705
706 private static final class Chunk implements ReferenceCounted {
707
708 private final AbstractByteBuf delegate;
709 final Magazine magazine;
710 private final int capacity;
711 private final boolean pooled;
712 private int allocatedBytes;
713 private static final long REFCNT_FIELD_OFFSET =
714 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
715 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
716 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
717
718 private static final ReferenceCountUpdater<Chunk> updater =
719 new ReferenceCountUpdater<Chunk>() {
720 @Override
721 protected AtomicIntegerFieldUpdater<Chunk> updater() {
722 return AIF_UPDATER;
723 }
724 @Override
725 protected long unsafeOffset() {
726 return REFCNT_FIELD_OFFSET;
727 }
728 };
729
730
731 @SuppressWarnings({"unused", "FieldMayBeFinal"})
732 private volatile int refCnt;
733
734 Chunk() {
735
736 delegate = null;
737 magazine = null;
738 capacity = 0;
739 pooled = false;
740 }
741
742 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
743 this.delegate = delegate;
744 this.magazine = magazine;
745 this.pooled = pooled;
746 capacity = delegate.capacity();
747 magazine.usedMemory.getAndAdd(capacity);
748 updater.setInitialValue(this);
749 }
750
751 @Override
752 public Chunk touch(Object hint) {
753 return this;
754 }
755
756 @Override
757 public int refCnt() {
758 return updater.refCnt(this);
759 }
760
761 @Override
762 public Chunk retain() {
763 return updater.retain(this);
764 }
765
766 @Override
767 public Chunk retain(int increment) {
768 return updater.retain(this, increment);
769 }
770
771 @Override
772 public Chunk touch() {
773 return this;
774 }
775
776 @Override
777 public boolean release() {
778 if (updater.release(this)) {
779 deallocate();
780 return true;
781 }
782 return false;
783 }
784
785 @Override
786 public boolean release(int decrement) {
787 if (updater.release(this, decrement)) {
788 deallocate();
789 return true;
790 }
791 return false;
792 }
793
794 private void deallocate() {
795 Magazine mag = magazine;
796 AdaptivePoolingAllocator parent = mag.parent;
797 int chunkSize = mag.preferredChunkSize();
798 int memSize = delegate.capacity();
799 if (!pooled || memSize < chunkSize || memSize > chunkSize + (chunkSize >> 1)) {
800
801
802 mag.usedMemory.getAndAdd(-capacity());
803 delegate.release();
804 } else {
805 updater.resetRefCnt(this);
806 delegate.setIndex(0, 0);
807 allocatedBytes = 0;
808 if (!mag.trySetNextInLine(this)) {
809 if (!parent.offerToQueue(this)) {
810
811
812 boolean released = updater.release(this);
813 delegate.release();
814 assert released;
815 }
816 }
817 }
818 }
819
820 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
821 int startIndex = allocatedBytes;
822 allocatedBytes = startIndex + size;
823 Chunk chunk = this;
824 chunk.retain();
825 try {
826 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
827 chunk = null;
828 } finally {
829 if (chunk != null) {
830
831
832 chunk.release();
833 }
834 }
835 }
836
837 public int remainingCapacity() {
838 return capacity - allocatedBytes;
839 }
840
841 public int capacity() {
842 return capacity;
843 }
844 }
845
846 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
847
848 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
849
850 private int adjustment;
851 private AbstractByteBuf rootParent;
852 Chunk chunk;
853 private int length;
854 private ByteBuffer tmpNioBuf;
855 private boolean hasArray;
856 private boolean hasMemoryAddress;
857
858 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
859 super(0);
860 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
861 }
862
863 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
864 int adjustment, int capacity, int maxCapacity) {
865 this.adjustment = adjustment;
866 chunk = wrapped;
867 length = capacity;
868 maxCapacity(maxCapacity);
869 setIndex0(readerIndex, writerIndex);
870 hasArray = unwrapped.hasArray();
871 hasMemoryAddress = unwrapped.hasMemoryAddress();
872 rootParent = unwrapped;
873 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
874 }
875
876 private AbstractByteBuf rootParent() {
877 final AbstractByteBuf rootParent = this.rootParent;
878 if (rootParent != null) {
879 return rootParent;
880 }
881 throw new IllegalReferenceCountException();
882 }
883
884 @Override
885 public int capacity() {
886 return length;
887 }
888
889 @Override
890 public ByteBuf capacity(int newCapacity) {
891 if (newCapacity == capacity()) {
892 ensureAccessible();
893 return this;
894 }
895 checkNewCapacity(newCapacity);
896 if (newCapacity < capacity()) {
897 length = newCapacity;
898 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
899 return this;
900 }
901
902
903 ByteBuffer data = tmpNioBuf;
904 data.clear();
905 tmpNioBuf = null;
906 Chunk chunk = this.chunk;
907 Magazine magazine = chunk.magazine;
908 AdaptivePoolingAllocator allocator = magazine.parent;
909 int readerIndex = this.readerIndex;
910 int writerIndex = this.writerIndex;
911 allocator.allocate(newCapacity, maxCapacity(), this);
912 tmpNioBuf.put(data);
913 tmpNioBuf.clear();
914 chunk.release();
915 this.readerIndex = readerIndex;
916 this.writerIndex = writerIndex;
917 return this;
918 }
919
920 @Override
921 public ByteBufAllocator alloc() {
922 return rootParent().alloc();
923 }
924
925 @Override
926 public ByteOrder order() {
927 return rootParent().order();
928 }
929
930 @Override
931 public ByteBuf unwrap() {
932 return null;
933 }
934
935 @Override
936 public boolean isDirect() {
937 return rootParent().isDirect();
938 }
939
940 @Override
941 public int arrayOffset() {
942 return idx(rootParent().arrayOffset());
943 }
944
945 @Override
946 public boolean hasMemoryAddress() {
947 return hasMemoryAddress;
948 }
949
950 @Override
951 public long memoryAddress() {
952 ensureAccessible();
953 return rootParent().memoryAddress() + adjustment;
954 }
955
956 @Override
957 public ByteBuffer nioBuffer(int index, int length) {
958 checkIndex(index, length);
959 return rootParent().nioBuffer(idx(index), length);
960 }
961
962 @Override
963 public ByteBuffer internalNioBuffer(int index, int length) {
964 checkIndex(index, length);
965 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
966 }
967
968 private ByteBuffer internalNioBuffer() {
969 return (ByteBuffer) tmpNioBuf.clear();
970 }
971
972 @Override
973 public ByteBuffer[] nioBuffers(int index, int length) {
974 checkIndex(index, length);
975 return rootParent().nioBuffers(idx(index), length);
976 }
977
978 @Override
979 public boolean hasArray() {
980 return hasArray;
981 }
982
983 @Override
984 public byte[] array() {
985 ensureAccessible();
986 return rootParent().array();
987 }
988
989 @Override
990 public ByteBuf copy(int index, int length) {
991 checkIndex(index, length);
992 return rootParent().copy(idx(index), length);
993 }
994
995 @Override
996 public int nioBufferCount() {
997 return rootParent().nioBufferCount();
998 }
999
1000 @Override
1001 protected byte _getByte(int index) {
1002 return rootParent()._getByte(idx(index));
1003 }
1004
1005 @Override
1006 protected short _getShort(int index) {
1007 return rootParent()._getShort(idx(index));
1008 }
1009
1010 @Override
1011 protected short _getShortLE(int index) {
1012 return rootParent()._getShortLE(idx(index));
1013 }
1014
1015 @Override
1016 protected int _getUnsignedMedium(int index) {
1017 return rootParent()._getUnsignedMedium(idx(index));
1018 }
1019
1020 @Override
1021 protected int _getUnsignedMediumLE(int index) {
1022 return rootParent()._getUnsignedMediumLE(idx(index));
1023 }
1024
1025 @Override
1026 protected int _getInt(int index) {
1027 return rootParent()._getInt(idx(index));
1028 }
1029
1030 @Override
1031 protected int _getIntLE(int index) {
1032 return rootParent()._getIntLE(idx(index));
1033 }
1034
1035 @Override
1036 protected long _getLong(int index) {
1037 return rootParent()._getLong(idx(index));
1038 }
1039
1040 @Override
1041 protected long _getLongLE(int index) {
1042 return rootParent()._getLongLE(idx(index));
1043 }
1044
1045 @Override
1046 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1047 checkIndex(index, length);
1048 rootParent().getBytes(idx(index), dst, dstIndex, length);
1049 return this;
1050 }
1051
1052 @Override
1053 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1054 checkIndex(index, length);
1055 rootParent().getBytes(idx(index), dst, dstIndex, length);
1056 return this;
1057 }
1058
1059 @Override
1060 public ByteBuf getBytes(int index, ByteBuffer dst) {
1061 checkIndex(index, dst.remaining());
1062 rootParent().getBytes(idx(index), dst);
1063 return this;
1064 }
1065
1066 @Override
1067 protected void _setByte(int index, int value) {
1068 rootParent()._setByte(idx(index), value);
1069 }
1070
1071 @Override
1072 protected void _setShort(int index, int value) {
1073 rootParent()._setShort(idx(index), value);
1074 }
1075
1076 @Override
1077 protected void _setShortLE(int index, int value) {
1078 rootParent()._setShortLE(idx(index), value);
1079 }
1080
1081 @Override
1082 protected void _setMedium(int index, int value) {
1083 rootParent()._setMedium(idx(index), value);
1084 }
1085
1086 @Override
1087 protected void _setMediumLE(int index, int value) {
1088 rootParent()._setMediumLE(idx(index), value);
1089 }
1090
1091 @Override
1092 protected void _setInt(int index, int value) {
1093 rootParent()._setInt(idx(index), value);
1094 }
1095
1096 @Override
1097 protected void _setIntLE(int index, int value) {
1098 rootParent()._setIntLE(idx(index), value);
1099 }
1100
1101 @Override
1102 protected void _setLong(int index, long value) {
1103 rootParent()._setLong(idx(index), value);
1104 }
1105
1106 @Override
1107 protected void _setLongLE(int index, long value) {
1108 rootParent().setLongLE(idx(index), value);
1109 }
1110
1111 @Override
1112 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1113 checkIndex(index, length);
1114 rootParent().setBytes(idx(index), src, srcIndex, length);
1115 return this;
1116 }
1117
1118 @Override
1119 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1120 checkIndex(index, length);
1121 rootParent().setBytes(idx(index), src, srcIndex, length);
1122 return this;
1123 }
1124
1125 @Override
1126 public ByteBuf setBytes(int index, ByteBuffer src) {
1127 checkIndex(index, src.remaining());
1128 rootParent().setBytes(idx(index), src);
1129 return this;
1130 }
1131
1132 @Override
1133 public ByteBuf getBytes(int index, OutputStream out, int length)
1134 throws IOException {
1135 checkIndex(index, length);
1136 if (length != 0) {
1137 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1138 }
1139 return this;
1140 }
1141
1142 @Override
1143 public int getBytes(int index, GatheringByteChannel out, int length)
1144 throws IOException {
1145 return out.write(internalNioBuffer(index, length).duplicate());
1146 }
1147
1148 @Override
1149 public int getBytes(int index, FileChannel out, long position, int length)
1150 throws IOException {
1151 return out.write(internalNioBuffer(index, length).duplicate(), position);
1152 }
1153
1154 @Override
1155 public int setBytes(int index, InputStream in, int length)
1156 throws IOException {
1157 checkIndex(index, length);
1158 final AbstractByteBuf rootParent = rootParent();
1159 if (rootParent.hasArray()) {
1160 return rootParent.setBytes(idx(index), in, length);
1161 }
1162 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1163 int readBytes = in.read(tmp, 0, length);
1164 if (readBytes <= 0) {
1165 return readBytes;
1166 }
1167 setBytes(index, tmp, 0, readBytes);
1168 return readBytes;
1169 }
1170
1171 @Override
1172 public int setBytes(int index, ScatteringByteChannel in, int length)
1173 throws IOException {
1174 try {
1175 return in.read(internalNioBuffer(index, length).duplicate());
1176 } catch (ClosedChannelException ignored) {
1177 return -1;
1178 }
1179 }
1180
1181 @Override
1182 public int setBytes(int index, FileChannel in, long position, int length)
1183 throws IOException {
1184 try {
1185 return in.read(internalNioBuffer(index, length).duplicate(), position);
1186 } catch (ClosedChannelException ignored) {
1187 return -1;
1188 }
1189 }
1190
1191 @Override
1192 public int forEachByte(int index, int length, ByteProcessor processor) {
1193 checkIndex(index, length);
1194 int ret = rootParent().forEachByte(idx(index), length, processor);
1195 return forEachResult(ret);
1196 }
1197
1198 @Override
1199 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1200 checkIndex(index, length);
1201 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1202 return forEachResult(ret);
1203 }
1204
1205 private int forEachResult(int ret) {
1206 if (ret < adjustment) {
1207 return -1;
1208 }
1209 return ret - adjustment;
1210 }
1211
1212 @Override
1213 public boolean isContiguous() {
1214 return rootParent().isContiguous();
1215 }
1216
1217 private int idx(int index) {
1218 return index + adjustment;
1219 }
1220
1221 @Override
1222 protected void deallocate() {
1223 if (chunk != null) {
1224 chunk.release();
1225 }
1226 tmpNioBuf = null;
1227 chunk = null;
1228 rootParent = null;
1229 if (handle instanceof EnhancedHandle) {
1230 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1231 enhancedHandle.unguardedRecycle(this);
1232 } else {
1233 handle.recycle(this);
1234 }
1235 }
1236 }
1237
1238
1239
1240
1241 interface ChunkAllocator {
1242
1243
1244
1245
1246
1247
1248 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1249 }
1250 }