1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.ByteProcessor;
19 import io.netty.util.IllegalReferenceCountException;
20 import io.netty.util.NettyRuntime;
21 import io.netty.util.Recycler.EnhancedHandle;
22 import io.netty.util.ReferenceCounted;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.concurrent.FastThreadLocalThread;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.PlatformDependent;
28 import io.netty.util.internal.ReferenceCountUpdater;
29 import io.netty.util.internal.SuppressJava6Requirement;
30 import io.netty.util.internal.SystemPropertyUtil;
31 import io.netty.util.internal.ThreadExecutorMap;
32 import io.netty.util.internal.ThreadLocalRandom;
33 import io.netty.util.internal.UnstableApi;
34
35 import java.io.IOException;
36 import java.io.InputStream;
37 import java.io.OutputStream;
38 import java.nio.ByteBuffer;
39 import java.nio.ByteOrder;
40 import java.nio.channels.ClosedChannelException;
41 import java.nio.channels.FileChannel;
42 import java.nio.channels.GatheringByteChannel;
43 import java.nio.channels.ScatteringByteChannel;
44 import java.util.Arrays;
45 import java.util.Queue;
46 import java.util.Set;
47 import java.util.concurrent.ConcurrentLinkedQueue;
48 import java.util.concurrent.CopyOnWriteArraySet;
49 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
50 import java.util.concurrent.atomic.AtomicLong;
51 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
52 import java.util.concurrent.locks.StampedLock;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 @SuppressJava6Requirement(reason = "Guarded by version check")
81 @UnstableApi
82 final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
83
84 enum MagazineCaching {
85 EventLoopThreads,
86 FastThreadLocalThreads,
87 None
88 }
89
90
91
92
93
94
95
96
97 private static final int MIN_CHUNK_SIZE = 128 * 1024;
98 private static final int EXPANSION_ATTEMPTS = 3;
99 private static final int INITIAL_MAGAZINES = 4;
100 private static final int RETIRE_CAPACITY = 4 * 1024;
101 private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
102 private static final int BUFS_PER_CHUNK = 10;
103
104
105
106
107
108
109 private static final int MAX_CHUNK_SIZE =
110 BUFS_PER_CHUNK * (1 << AllocationStatistics.HISTO_MAX_BUCKET_SHIFT);
111
112
113
114
115
116
117
118
119
120 private static final int CENTRAL_QUEUE_CAPACITY = Math.max(2, SystemPropertyUtil.getInt(
121 "io.netty.allocator.centralQueueCapacity", NettyRuntime.availableProcessors()));
122
123
124
125
126
127 private static final int MAGAZINE_BUFFER_QUEUE_CAPACITY = SystemPropertyUtil.getInt(
128 "io.netty.allocator.magazineBufferQueueCapacity", 1024);
129
130 private static final Object NO_MAGAZINE = Boolean.TRUE;
131
132 private final ChunkAllocator chunkAllocator;
133 private final Queue<Chunk> centralQueue;
134 private final StampedLock magazineExpandLock;
135 private volatile Magazine[] magazines;
136 private final FastThreadLocal<Object> threadLocalMagazine;
137 private final Set<Magazine> liveCachedMagazines;
138 private volatile boolean freed;
139
140 static {
141 if (CENTRAL_QUEUE_CAPACITY < 2) {
142 throw new IllegalArgumentException("CENTRAL_QUEUE_CAPACITY: " + CENTRAL_QUEUE_CAPACITY
143 + " (expected: >= " + 2 + ')');
144 }
145 if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
146 throw new IllegalArgumentException("MAGAZINE_BUFFER_QUEUE_CAPACITY: " + MAGAZINE_BUFFER_QUEUE_CAPACITY
147 + " (expected: >= " + 2 + ')');
148 }
149 }
150
151 AdaptivePoolingAllocator(ChunkAllocator chunkAllocator, MagazineCaching magazineCaching) {
152 ObjectUtil.checkNotNull(chunkAllocator, "chunkAllocator");
153 ObjectUtil.checkNotNull(magazineCaching, "magazineCaching");
154 this.chunkAllocator = chunkAllocator;
155 centralQueue = ObjectUtil.checkNotNull(createSharedChunkQueue(), "centralQueue");
156 magazineExpandLock = new StampedLock();
157 if (magazineCaching != MagazineCaching.None) {
158 assert magazineCaching == MagazineCaching.EventLoopThreads ||
159 magazineCaching == MagazineCaching.FastThreadLocalThreads;
160 final boolean cachedMagazinesNonEventLoopThreads =
161 magazineCaching == MagazineCaching.FastThreadLocalThreads;
162 final Set<Magazine> liveMagazines = new CopyOnWriteArraySet<Magazine>();
163 threadLocalMagazine = new FastThreadLocal<Object>() {
164 @Override
165 protected Object initialValue() {
166 if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
167 if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread())) {
168
169 return NO_MAGAZINE;
170 }
171 Magazine mag = new Magazine(AdaptivePoolingAllocator.this, false);
172 liveMagazines.add(mag);
173 return mag;
174 }
175 return NO_MAGAZINE;
176 }
177
178 @Override
179 protected void onRemoval(final Object value) throws Exception {
180 if (value != NO_MAGAZINE) {
181 liveMagazines.remove(value);
182 }
183 }
184 };
185 liveCachedMagazines = liveMagazines;
186 } else {
187 threadLocalMagazine = null;
188 liveCachedMagazines = null;
189 }
190 Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
191 for (int i = 0; i < mags.length; i++) {
192 mags[i] = new Magazine(this);
193 }
194 magazines = mags;
195 }
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217 private static Queue<Chunk> createSharedChunkQueue() {
218 return PlatformDependent.newFixedMpmcQueue(CENTRAL_QUEUE_CAPACITY);
219 }
220
221 @Override
222 public ByteBuf allocate(int size, int maxCapacity) {
223 return allocate(size, maxCapacity, Thread.currentThread(), null);
224 }
225
226 private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
227 if (size <= MAX_CHUNK_SIZE) {
228 int sizeBucket = AllocationStatistics.sizeBucket(size);
229 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
230 if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread) {
231 Object mag = threadLocalMagazine.get();
232 if (mag != NO_MAGAZINE) {
233 Magazine magazine = (Magazine) mag;
234 if (buf == null) {
235 buf = magazine.newBuffer();
236 }
237 boolean allocated = magazine.tryAllocate(size, sizeBucket, maxCapacity, buf);
238 assert allocated : "Allocation of threadLocalMagazine must always succeed";
239 return buf;
240 }
241 }
242 long threadId = currentThread.getId();
243 Magazine[] mags;
244 int expansions = 0;
245 do {
246 mags = magazines;
247 int mask = mags.length - 1;
248 int index = (int) (threadId & mask);
249 for (int i = 0, m = Integer.numberOfTrailingZeros(~mask); i < m; i++) {
250 Magazine mag = mags[index + i & mask];
251 if (buf == null) {
252 buf = mag.newBuffer();
253 }
254 if (mag.tryAllocate(size, sizeBucket, maxCapacity, buf)) {
255
256 return buf;
257 }
258 }
259 expansions++;
260 } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines(mags.length));
261 }
262
263
264 return allocateFallback(size, maxCapacity, currentThread, buf);
265 }
266
267 private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
268
269 Magazine magazine;
270 if (buf != null) {
271 Chunk chunk = buf.chunk;
272 if (chunk == null || chunk == Magazine.MAGAZINE_FREED || (magazine = chunk.currentMagazine()) == null) {
273 magazine = getFallbackMagazine(currentThread);
274 }
275 } else {
276 magazine = getFallbackMagazine(currentThread);
277 buf = magazine.newBuffer();
278 }
279
280 AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
281 Chunk chunk = new Chunk(innerChunk, magazine, false);
282 try {
283 chunk.readInitInto(buf, size, maxCapacity);
284 } finally {
285
286
287
288 chunk.release();
289 }
290 return buf;
291 }
292
293 private Magazine getFallbackMagazine(Thread currentThread) {
294 Object tlMag;
295 FastThreadLocal<Object> threadLocalMagazine = this.threadLocalMagazine;
296 if (threadLocalMagazine != null &&
297 currentThread instanceof FastThreadLocalThread &&
298 (tlMag = threadLocalMagazine.get()) != NO_MAGAZINE) {
299 return (Magazine) tlMag;
300 }
301 Magazine[] mags = magazines;
302 return mags[(int) currentThread.getId() & mags.length - 1];
303 }
304
305
306
307
308 void allocate(int size, int maxCapacity, AdaptiveByteBuf into) {
309 AdaptiveByteBuf result = allocate(size, maxCapacity, Thread.currentThread(), into);
310 assert result == into: "Re-allocation created separate buffer instance";
311 }
312
313 @Override
314 public long usedMemory() {
315 long sum = 0;
316 for (Chunk chunk : centralQueue) {
317 sum += chunk.capacity();
318 }
319 for (Magazine magazine : magazines) {
320 sum += magazine.usedMemory.get();
321 }
322 if (liveCachedMagazines != null) {
323 for (Magazine magazine : liveCachedMagazines) {
324 sum += magazine.usedMemory.get();
325 }
326 }
327 return sum;
328 }
329
330 private boolean tryExpandMagazines(int currentLength) {
331 if (currentLength >= MAX_STRIPES) {
332 return true;
333 }
334 final Magazine[] mags;
335 long writeLock = magazineExpandLock.tryWriteLock();
336 if (writeLock != 0) {
337 try {
338 mags = magazines;
339 if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
340 return true;
341 }
342 int preferredChunkSize = mags[0].sharedPrefChunkSize;
343 Magazine[] expanded = new Magazine[mags.length * 2];
344 for (int i = 0, l = expanded.length; i < l; i++) {
345 Magazine m = new Magazine(this);
346 m.localPrefChunkSize = preferredChunkSize;
347 m.sharedPrefChunkSize = preferredChunkSize;
348 expanded[i] = m;
349 }
350 magazines = expanded;
351 } finally {
352 magazineExpandLock.unlockWrite(writeLock);
353 }
354 for (Magazine magazine : mags) {
355 magazine.free();
356 }
357 }
358 return true;
359 }
360
361 private boolean offerToQueue(Chunk buffer) {
362 if (freed) {
363 return false;
364 }
365
366 assert buffer.allocatedBytes == 0;
367 assert buffer.magazine == null;
368
369 boolean isAdded = centralQueue.offer(buffer);
370 if (freed && isAdded) {
371
372 freeCentralQueue();
373 }
374 return isAdded;
375 }
376
377
378
379
380 @Override
381 protected void finalize() throws Throwable {
382 try {
383 super.finalize();
384 } finally {
385 free();
386 }
387 }
388
389 private void free() {
390 freed = true;
391 long stamp = magazineExpandLock.writeLock();
392 try {
393 Magazine[] mags = magazines;
394 for (Magazine magazine : mags) {
395 magazine.free();
396 }
397 } finally {
398 magazineExpandLock.unlockWrite(stamp);
399 }
400 freeCentralQueue();
401 }
402
403 private void freeCentralQueue() {
404 for (;;) {
405 Chunk chunk = centralQueue.poll();
406 if (chunk == null) {
407 break;
408 }
409 chunk.release();
410 }
411 }
412
413 static int sizeBucket(int size) {
414 return AllocationStatistics.sizeBucket(size);
415 }
416
417 @SuppressWarnings("checkstyle:finalclass")
418 private static class AllocationStatistics {
419 private static final int MIN_DATUM_TARGET = 1024;
420 private static final int MAX_DATUM_TARGET = 65534;
421 private static final int INIT_DATUM_TARGET = 9;
422 private static final int HISTO_MIN_BUCKET_SHIFT = 13;
423 private static final int HISTO_MAX_BUCKET_SHIFT = 20;
424 private static final int HISTO_BUCKET_COUNT = 1 + HISTO_MAX_BUCKET_SHIFT - HISTO_MIN_BUCKET_SHIFT;
425 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1;
426 private static final int SIZE_MAX_MASK = MAX_CHUNK_SIZE - 1;
427
428 protected final AdaptivePoolingAllocator parent;
429 private final boolean shareable;
430 private final short[][] histos = {
431 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
432 new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
433 };
434 private short[] histo = histos[0];
435 private final int[] sums = new int[HISTO_BUCKET_COUNT];
436
437 private int histoIndex;
438 private int datumCount;
439 private int datumTarget = INIT_DATUM_TARGET;
440 protected volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
441 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
442
443 private AllocationStatistics(AdaptivePoolingAllocator parent, boolean shareable) {
444 this.parent = parent;
445 this.shareable = shareable;
446 }
447
448 protected void recordAllocationSize(int bucket) {
449 histo[bucket]++;
450 if (datumCount++ == datumTarget) {
451 rotateHistograms();
452 }
453 }
454
455 static int sizeBucket(int size) {
456 if (size == 0) {
457 return 0;
458 }
459
460
461
462
463 int normalizedSize = size - 1 >> HISTO_MIN_BUCKET_SHIFT & SIZE_MAX_MASK;
464 return Math.min(Integer.SIZE - Integer.numberOfLeadingZeros(normalizedSize), HISTO_MAX_BUCKET_MASK);
465 }
466
467 private void rotateHistograms() {
468 short[][] hs = histos;
469 for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
470 sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
471 }
472 int sum = 0;
473 for (int count : sums) {
474 sum += count;
475 }
476 int targetPercentile = (int) (sum * 0.99);
477 int sizeBucket = 0;
478 for (; sizeBucket < sums.length; sizeBucket++) {
479 if (sums[sizeBucket] > targetPercentile) {
480 break;
481 }
482 targetPercentile -= sums[sizeBucket];
483 }
484 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT;
485 int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
486 localPrefChunkSize = prefChunkSize;
487 if (shareable) {
488 for (Magazine mag : parent.magazines) {
489 prefChunkSize = Math.max(prefChunkSize, mag.localPrefChunkSize);
490 }
491 }
492 if (sharedPrefChunkSize != prefChunkSize) {
493
494 datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
495 sharedPrefChunkSize = prefChunkSize;
496 } else {
497
498 datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
499 }
500
501 histoIndex = histoIndex + 1 & 3;
502 histo = histos[histoIndex];
503 datumCount = 0;
504 Arrays.fill(histo, (short) 0);
505 }
506
507
508
509
510
511
512
513
514
515 protected int preferredChunkSize() {
516 return sharedPrefChunkSize;
517 }
518 }
519
520 @SuppressJava6Requirement(reason = "Guarded by version check")
521 private static final class Magazine extends AllocationStatistics {
522 private static final AtomicReferenceFieldUpdater<Magazine, Chunk> NEXT_IN_LINE;
523 static {
524 NEXT_IN_LINE = AtomicReferenceFieldUpdater.newUpdater(Magazine.class, Chunk.class, "nextInLine");
525 }
526 private static final Chunk MAGAZINE_FREED = new Chunk();
527
528 private static final ObjectPool<AdaptiveByteBuf> EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
529 new ObjectPool.ObjectCreator<AdaptiveByteBuf>() {
530 @Override
531 public AdaptiveByteBuf newObject(ObjectPool.Handle<AdaptiveByteBuf> handle) {
532 return new AdaptiveByteBuf(handle);
533 }
534 });
535
536 private Chunk current;
537 @SuppressWarnings("unused")
538 private volatile Chunk nextInLine;
539 private final AtomicLong usedMemory;
540 private final StampedLock allocationLock;
541 private final Queue<AdaptiveByteBuf> bufferQueue;
542 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
543
544 Magazine(AdaptivePoolingAllocator parent) {
545 this(parent, true);
546 }
547
548 Magazine(AdaptivePoolingAllocator parent, boolean shareable) {
549 super(parent, shareable);
550
551 if (shareable) {
552
553 allocationLock = new StampedLock();
554 bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
555 handle = new ObjectPool.Handle<AdaptiveByteBuf>() {
556 @Override
557 public void recycle(AdaptiveByteBuf self) {
558 bufferQueue.offer(self);
559 }
560 };
561 } else {
562 allocationLock = null;
563 bufferQueue = null;
564 handle = null;
565 }
566 usedMemory = new AtomicLong();
567 }
568
569 public boolean tryAllocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
570 if (allocationLock == null) {
571
572 return allocate(size, sizeBucket, maxCapacity, buf);
573 }
574
575
576 long writeLock = allocationLock.tryWriteLock();
577 if (writeLock != 0) {
578 try {
579 return allocate(size, sizeBucket, maxCapacity, buf);
580 } finally {
581 allocationLock.unlockWrite(writeLock);
582 }
583 }
584 return allocateWithoutLock(size, maxCapacity, buf);
585 }
586
587 private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf buf) {
588 Chunk curr = NEXT_IN_LINE.getAndSet(this, null);
589 if (curr == MAGAZINE_FREED) {
590
591 restoreMagazineFreed();
592 return false;
593 }
594 if (curr == null) {
595 curr = parent.centralQueue.poll();
596 if (curr == null) {
597 return false;
598 }
599 curr.attachToMagazine(this);
600 }
601 if (curr.remainingCapacity() >= size) {
602 curr.readInitInto(buf, size, maxCapacity);
603 }
604 try {
605 if (curr.remainingCapacity() >= RETIRE_CAPACITY) {
606 transferToNextInLineOrRelease(curr);
607 curr = null;
608 }
609 } finally {
610 if (curr != null) {
611 curr.release();
612 }
613 }
614 return true;
615 }
616
617 private boolean allocate(int size, int sizeBucket, int maxCapacity, AdaptiveByteBuf buf) {
618 recordAllocationSize(sizeBucket);
619 Chunk curr = current;
620 if (curr != null) {
621
622 if (curr.remainingCapacity() > size) {
623 curr.readInitInto(buf, size, maxCapacity);
624
625 return true;
626 }
627
628
629
630 current = null;
631 if (curr.remainingCapacity() == size) {
632 try {
633 curr.readInitInto(buf, size, maxCapacity);
634 return true;
635 } finally {
636 curr.release();
637 }
638 }
639
640
641 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
642 curr.release();
643 } else {
644
645
646 transferToNextInLineOrRelease(curr);
647 }
648 }
649
650 assert current == null;
651
652
653
654
655
656
657
658 curr = NEXT_IN_LINE.getAndSet(this, null);
659 if (curr != null) {
660 if (curr == MAGAZINE_FREED) {
661
662 restoreMagazineFreed();
663 return false;
664 }
665
666 if (curr.remainingCapacity() > size) {
667
668 curr.readInitInto(buf, size, maxCapacity);
669 current = curr;
670 return true;
671 }
672
673 if (curr.remainingCapacity() == size) {
674
675
676 try {
677 curr.readInitInto(buf, size, maxCapacity);
678 return true;
679 } finally {
680
681
682 curr.release();
683 }
684 } else {
685
686 curr.release();
687 }
688 }
689
690
691 curr = parent.centralQueue.poll();
692 if (curr == null) {
693 curr = newChunkAllocation(size);
694 } else {
695 curr.attachToMagazine(this);
696
697 if (curr.remainingCapacity() < size) {
698
699 if (curr.remainingCapacity() < RETIRE_CAPACITY) {
700 curr.release();
701 } else {
702
703
704 transferToNextInLineOrRelease(curr);
705 }
706 curr = newChunkAllocation(size);
707 }
708 }
709
710 current = curr;
711 try {
712 assert current.remainingCapacity() >= size;
713 if (curr.remainingCapacity() > size) {
714 curr.readInitInto(buf, size, maxCapacity);
715 curr = null;
716 } else {
717 curr.readInitInto(buf, size, maxCapacity);
718 }
719 } finally {
720 if (curr != null) {
721
722
723 curr.release();
724 current = null;
725 }
726 }
727 return true;
728 }
729
730 private void restoreMagazineFreed() {
731 Chunk next = NEXT_IN_LINE.getAndSet(this, MAGAZINE_FREED);
732 if (next != null && next != MAGAZINE_FREED) {
733 next.release();
734 }
735 }
736
737 private void transferToNextInLineOrRelease(Chunk chunk) {
738 if (NEXT_IN_LINE.compareAndSet(this, null, chunk)) {
739 return;
740 }
741
742 Chunk nextChunk = NEXT_IN_LINE.get(this);
743 if (nextChunk != null && nextChunk != MAGAZINE_FREED
744 && chunk.remainingCapacity() > nextChunk.remainingCapacity()) {
745 if (NEXT_IN_LINE.compareAndSet(this, nextChunk, chunk)) {
746 nextChunk.release();
747 return;
748 }
749 }
750
751
752
753
754 chunk.release();
755 }
756
757 private Chunk newChunkAllocation(int promptingSize) {
758 int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
759 int minChunks = size / MIN_CHUNK_SIZE;
760 if (MIN_CHUNK_SIZE * minChunks < size) {
761
762
763
764
765 size = MIN_CHUNK_SIZE * (1 + minChunks);
766 }
767 ChunkAllocator chunkAllocator = parent.chunkAllocator;
768 return new Chunk(chunkAllocator.allocate(size, size), this, true);
769 }
770
771 boolean trySetNextInLine(Chunk chunk) {
772 return NEXT_IN_LINE.compareAndSet(this, null, chunk);
773 }
774
775 void free() {
776
777 restoreMagazineFreed();
778 long stamp = allocationLock.writeLock();
779 try {
780 if (current != null) {
781 current.release();
782 current = null;
783 }
784 } finally {
785 allocationLock.unlockWrite(stamp);
786 }
787 }
788
789 public AdaptiveByteBuf newBuffer() {
790 AdaptiveByteBuf buf;
791 if (handle == null) {
792 buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
793 } else {
794 buf = bufferQueue.poll();
795 if (buf == null) {
796 buf = new AdaptiveByteBuf(handle);
797 }
798 }
799 buf.resetRefCnt();
800 buf.discardMarks();
801 return buf;
802 }
803 }
804
805 private static final class Chunk implements ReferenceCounted {
806
807 private final AbstractByteBuf delegate;
808 private Magazine magazine;
809 private final AdaptivePoolingAllocator allocator;
810 private final int capacity;
811 private final boolean pooled;
812 private int allocatedBytes;
813 private static final long REFCNT_FIELD_OFFSET =
814 ReferenceCountUpdater.getUnsafeOffset(Chunk.class, "refCnt");
815 private static final AtomicIntegerFieldUpdater<Chunk> AIF_UPDATER =
816 AtomicIntegerFieldUpdater.newUpdater(Chunk.class, "refCnt");
817
818 private static final ReferenceCountUpdater<Chunk> updater =
819 new ReferenceCountUpdater<Chunk>() {
820 @Override
821 protected AtomicIntegerFieldUpdater<Chunk> updater() {
822 return AIF_UPDATER;
823 }
824 @Override
825 protected long unsafeOffset() {
826 return REFCNT_FIELD_OFFSET;
827 }
828 };
829
830
831 @SuppressWarnings({"unused", "FieldMayBeFinal"})
832 private volatile int refCnt;
833
834 Chunk() {
835
836 delegate = null;
837 magazine = null;
838 allocator = null;
839 capacity = 0;
840 pooled = false;
841 }
842
843 Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled) {
844 this.delegate = delegate;
845 this.pooled = pooled;
846 capacity = delegate.capacity();
847 updater.setInitialValue(this);
848 allocator = magazine.parent;
849 attachToMagazine(magazine);
850 }
851
852 Magazine currentMagazine() {
853 return magazine;
854 }
855
856 void detachFromMagazine() {
857 if (magazine != null) {
858 magazine.usedMemory.getAndAdd(-capacity);
859 magazine = null;
860 }
861 }
862
863 void attachToMagazine(Magazine magazine) {
864 assert this.magazine == null;
865 this.magazine = magazine;
866 magazine.usedMemory.getAndAdd(capacity);
867 }
868
869 @Override
870 public Chunk touch(Object hint) {
871 return this;
872 }
873
874 @Override
875 public int refCnt() {
876 return updater.refCnt(this);
877 }
878
879 @Override
880 public Chunk retain() {
881 return updater.retain(this);
882 }
883
884 @Override
885 public Chunk retain(int increment) {
886 return updater.retain(this, increment);
887 }
888
889 @Override
890 public Chunk touch() {
891 return this;
892 }
893
894 @Override
895 public boolean release() {
896 if (updater.release(this)) {
897 deallocate();
898 return true;
899 }
900 return false;
901 }
902
903 @Override
904 public boolean release(int decrement) {
905 if (updater.release(this, decrement)) {
906 deallocate();
907 return true;
908 }
909 return false;
910 }
911
912 private void deallocate() {
913 Magazine mag = magazine;
914 AdaptivePoolingAllocator parent = mag.parent;
915 int chunkSize = mag.preferredChunkSize();
916 int memSize = delegate.capacity();
917 if (!pooled || shouldReleaseSuboptimalChunkSuze(memSize, chunkSize)) {
918
919
920 detachFromMagazine();
921 delegate.release();
922 } else {
923 updater.resetRefCnt(this);
924 delegate.setIndex(0, 0);
925 allocatedBytes = 0;
926 if (!mag.trySetNextInLine(this)) {
927
928 detachFromMagazine();
929 if (!parent.offerToQueue(this)) {
930
931
932 boolean released = updater.release(this);
933 delegate.release();
934 assert released;
935 }
936 }
937 }
938 }
939
940 private static boolean shouldReleaseSuboptimalChunkSuze(int givenSize, int preferredSize) {
941 int givenChunks = givenSize / MIN_CHUNK_SIZE;
942 int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
943 int deviation = Math.abs(givenChunks - preferredChunks);
944
945
946 return deviation != 0 &&
947 PlatformDependent.threadLocalRandom().nextDouble() * 200.0 > deviation;
948 }
949
950 public void readInitInto(AdaptiveByteBuf buf, int size, int maxCapacity) {
951 int startIndex = allocatedBytes;
952 allocatedBytes = startIndex + size;
953 Chunk chunk = this;
954 chunk.retain();
955 try {
956 buf.init(delegate, chunk, 0, 0, startIndex, size, maxCapacity);
957 chunk = null;
958 } finally {
959 if (chunk != null) {
960
961
962
963 allocatedBytes = startIndex;
964 chunk.release();
965 }
966 }
967 }
968
969 public int remainingCapacity() {
970 return capacity - allocatedBytes;
971 }
972
973 public int capacity() {
974 return capacity;
975 }
976 }
977
978 static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
979
980 private final ObjectPool.Handle<AdaptiveByteBuf> handle;
981
982 private int adjustment;
983 private AbstractByteBuf rootParent;
984 Chunk chunk;
985 private int length;
986 private ByteBuffer tmpNioBuf;
987 private boolean hasArray;
988 private boolean hasMemoryAddress;
989
990 AdaptiveByteBuf(ObjectPool.Handle<AdaptiveByteBuf> recyclerHandle) {
991 super(0);
992 handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
993 }
994
995 void init(AbstractByteBuf unwrapped, Chunk wrapped, int readerIndex, int writerIndex,
996 int adjustment, int capacity, int maxCapacity) {
997 this.adjustment = adjustment;
998 chunk = wrapped;
999 length = capacity;
1000 maxCapacity(maxCapacity);
1001 setIndex0(readerIndex, writerIndex);
1002 hasArray = unwrapped.hasArray();
1003 hasMemoryAddress = unwrapped.hasMemoryAddress();
1004 rootParent = unwrapped;
1005 tmpNioBuf = unwrapped.internalNioBuffer(adjustment, capacity).slice();
1006 }
1007
1008 private AbstractByteBuf rootParent() {
1009 final AbstractByteBuf rootParent = this.rootParent;
1010 if (rootParent != null) {
1011 return rootParent;
1012 }
1013 throw new IllegalReferenceCountException();
1014 }
1015
1016 @Override
1017 public int capacity() {
1018 return length;
1019 }
1020
1021 @Override
1022 public ByteBuf capacity(int newCapacity) {
1023 if (newCapacity == capacity()) {
1024 ensureAccessible();
1025 return this;
1026 }
1027 checkNewCapacity(newCapacity);
1028 if (newCapacity < capacity()) {
1029 length = newCapacity;
1030 setIndex0(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
1031 return this;
1032 }
1033
1034
1035 ByteBuffer data = tmpNioBuf;
1036 data.clear();
1037 tmpNioBuf = null;
1038 Chunk chunk = this.chunk;
1039 AdaptivePoolingAllocator allocator = chunk.allocator;
1040 int readerIndex = this.readerIndex;
1041 int writerIndex = this.writerIndex;
1042 allocator.allocate(newCapacity, maxCapacity(), this);
1043 tmpNioBuf.put(data);
1044 tmpNioBuf.clear();
1045 chunk.release();
1046 this.readerIndex = readerIndex;
1047 this.writerIndex = writerIndex;
1048 return this;
1049 }
1050
1051 @Override
1052 public ByteBufAllocator alloc() {
1053 return rootParent().alloc();
1054 }
1055
1056 @Override
1057 public ByteOrder order() {
1058 return rootParent().order();
1059 }
1060
1061 @Override
1062 public ByteBuf unwrap() {
1063 return null;
1064 }
1065
1066 @Override
1067 public boolean isDirect() {
1068 return rootParent().isDirect();
1069 }
1070
1071 @Override
1072 public int arrayOffset() {
1073 return idx(rootParent().arrayOffset());
1074 }
1075
1076 @Override
1077 public boolean hasMemoryAddress() {
1078 return hasMemoryAddress;
1079 }
1080
1081 @Override
1082 public long memoryAddress() {
1083 ensureAccessible();
1084 return rootParent().memoryAddress() + adjustment;
1085 }
1086
1087 @Override
1088 public ByteBuffer nioBuffer(int index, int length) {
1089 checkIndex(index, length);
1090 return rootParent().nioBuffer(idx(index), length);
1091 }
1092
1093 @Override
1094 public ByteBuffer internalNioBuffer(int index, int length) {
1095 checkIndex(index, length);
1096 return (ByteBuffer) internalNioBuffer().position(index).limit(index + length);
1097 }
1098
1099 private ByteBuffer internalNioBuffer() {
1100 return (ByteBuffer) tmpNioBuf.clear();
1101 }
1102
1103 @Override
1104 public ByteBuffer[] nioBuffers(int index, int length) {
1105 checkIndex(index, length);
1106 return rootParent().nioBuffers(idx(index), length);
1107 }
1108
1109 @Override
1110 public boolean hasArray() {
1111 return hasArray;
1112 }
1113
1114 @Override
1115 public byte[] array() {
1116 ensureAccessible();
1117 return rootParent().array();
1118 }
1119
1120 @Override
1121 public ByteBuf copy(int index, int length) {
1122 checkIndex(index, length);
1123 return rootParent().copy(idx(index), length);
1124 }
1125
1126 @Override
1127 public int nioBufferCount() {
1128 return rootParent().nioBufferCount();
1129 }
1130
1131 @Override
1132 protected byte _getByte(int index) {
1133 return rootParent()._getByte(idx(index));
1134 }
1135
1136 @Override
1137 protected short _getShort(int index) {
1138 return rootParent()._getShort(idx(index));
1139 }
1140
1141 @Override
1142 protected short _getShortLE(int index) {
1143 return rootParent()._getShortLE(idx(index));
1144 }
1145
1146 @Override
1147 protected int _getUnsignedMedium(int index) {
1148 return rootParent()._getUnsignedMedium(idx(index));
1149 }
1150
1151 @Override
1152 protected int _getUnsignedMediumLE(int index) {
1153 return rootParent()._getUnsignedMediumLE(idx(index));
1154 }
1155
1156 @Override
1157 protected int _getInt(int index) {
1158 return rootParent()._getInt(idx(index));
1159 }
1160
1161 @Override
1162 protected int _getIntLE(int index) {
1163 return rootParent()._getIntLE(idx(index));
1164 }
1165
1166 @Override
1167 protected long _getLong(int index) {
1168 return rootParent()._getLong(idx(index));
1169 }
1170
1171 @Override
1172 protected long _getLongLE(int index) {
1173 return rootParent()._getLongLE(idx(index));
1174 }
1175
1176 @Override
1177 public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
1178 checkIndex(index, length);
1179 rootParent().getBytes(idx(index), dst, dstIndex, length);
1180 return this;
1181 }
1182
1183 @Override
1184 public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
1185 checkIndex(index, length);
1186 rootParent().getBytes(idx(index), dst, dstIndex, length);
1187 return this;
1188 }
1189
1190 @Override
1191 public ByteBuf getBytes(int index, ByteBuffer dst) {
1192 checkIndex(index, dst.remaining());
1193 rootParent().getBytes(idx(index), dst);
1194 return this;
1195 }
1196
1197 @Override
1198 protected void _setByte(int index, int value) {
1199 rootParent()._setByte(idx(index), value);
1200 }
1201
1202 @Override
1203 protected void _setShort(int index, int value) {
1204 rootParent()._setShort(idx(index), value);
1205 }
1206
1207 @Override
1208 protected void _setShortLE(int index, int value) {
1209 rootParent()._setShortLE(idx(index), value);
1210 }
1211
1212 @Override
1213 protected void _setMedium(int index, int value) {
1214 rootParent()._setMedium(idx(index), value);
1215 }
1216
1217 @Override
1218 protected void _setMediumLE(int index, int value) {
1219 rootParent()._setMediumLE(idx(index), value);
1220 }
1221
1222 @Override
1223 protected void _setInt(int index, int value) {
1224 rootParent()._setInt(idx(index), value);
1225 }
1226
1227 @Override
1228 protected void _setIntLE(int index, int value) {
1229 rootParent()._setIntLE(idx(index), value);
1230 }
1231
1232 @Override
1233 protected void _setLong(int index, long value) {
1234 rootParent()._setLong(idx(index), value);
1235 }
1236
1237 @Override
1238 protected void _setLongLE(int index, long value) {
1239 rootParent().setLongLE(idx(index), value);
1240 }
1241
1242 @Override
1243 public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
1244 checkIndex(index, length);
1245 rootParent().setBytes(idx(index), src, srcIndex, length);
1246 return this;
1247 }
1248
1249 @Override
1250 public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
1251 checkIndex(index, length);
1252 rootParent().setBytes(idx(index), src, srcIndex, length);
1253 return this;
1254 }
1255
1256 @Override
1257 public ByteBuf setBytes(int index, ByteBuffer src) {
1258 checkIndex(index, src.remaining());
1259 rootParent().setBytes(idx(index), src);
1260 return this;
1261 }
1262
1263 @Override
1264 public ByteBuf getBytes(int index, OutputStream out, int length)
1265 throws IOException {
1266 checkIndex(index, length);
1267 if (length != 0) {
1268 ByteBufUtil.readBytes(alloc(), internalNioBuffer().duplicate(), index, length, out);
1269 }
1270 return this;
1271 }
1272
1273 @Override
1274 public int getBytes(int index, GatheringByteChannel out, int length)
1275 throws IOException {
1276 return out.write(internalNioBuffer(index, length).duplicate());
1277 }
1278
1279 @Override
1280 public int getBytes(int index, FileChannel out, long position, int length)
1281 throws IOException {
1282 return out.write(internalNioBuffer(index, length).duplicate(), position);
1283 }
1284
1285 @Override
1286 public int setBytes(int index, InputStream in, int length)
1287 throws IOException {
1288 checkIndex(index, length);
1289 final AbstractByteBuf rootParent = rootParent();
1290 if (rootParent.hasArray()) {
1291 return rootParent.setBytes(idx(index), in, length);
1292 }
1293 byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
1294 int readBytes = in.read(tmp, 0, length);
1295 if (readBytes <= 0) {
1296 return readBytes;
1297 }
1298 setBytes(index, tmp, 0, readBytes);
1299 return readBytes;
1300 }
1301
1302 @Override
1303 public int setBytes(int index, ScatteringByteChannel in, int length)
1304 throws IOException {
1305 try {
1306 return in.read(internalNioBuffer(index, length).duplicate());
1307 } catch (ClosedChannelException ignored) {
1308 return -1;
1309 }
1310 }
1311
1312 @Override
1313 public int setBytes(int index, FileChannel in, long position, int length)
1314 throws IOException {
1315 try {
1316 return in.read(internalNioBuffer(index, length).duplicate(), position);
1317 } catch (ClosedChannelException ignored) {
1318 return -1;
1319 }
1320 }
1321
1322 @Override
1323 public int forEachByte(int index, int length, ByteProcessor processor) {
1324 checkIndex(index, length);
1325 int ret = rootParent().forEachByte(idx(index), length, processor);
1326 return forEachResult(ret);
1327 }
1328
1329 @Override
1330 public int forEachByteDesc(int index, int length, ByteProcessor processor) {
1331 checkIndex(index, length);
1332 int ret = rootParent().forEachByteDesc(idx(index), length, processor);
1333 return forEachResult(ret);
1334 }
1335
1336 private int forEachResult(int ret) {
1337 if (ret < adjustment) {
1338 return -1;
1339 }
1340 return ret - adjustment;
1341 }
1342
1343 @Override
1344 public boolean isContiguous() {
1345 return rootParent().isContiguous();
1346 }
1347
1348 private int idx(int index) {
1349 return index + adjustment;
1350 }
1351
1352 @Override
1353 protected void deallocate() {
1354 if (chunk != null) {
1355 chunk.release();
1356 }
1357 tmpNioBuf = null;
1358 chunk = null;
1359 rootParent = null;
1360 if (handle instanceof EnhancedHandle) {
1361 EnhancedHandle<AdaptiveByteBuf> enhancedHandle = (EnhancedHandle<AdaptiveByteBuf>) handle;
1362 enhancedHandle.unguardedRecycle(this);
1363 } else {
1364 handle.recycle(this);
1365 }
1366 }
1367 }
1368
1369
1370
1371
1372 interface ChunkAllocator {
1373
1374
1375
1376
1377
1378
1379 AbstractByteBuf allocate(int initialCapacity, int maxCapacity);
1380 }
1381 }