1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.internal.CleanableDirectBuffer;
19 import io.netty.util.internal.LongLongHashMap;
20 import io.netty.util.internal.SystemPropertyUtil;
21
22 import java.nio.ByteBuffer;
23 import java.util.ArrayDeque;
24 import java.util.Deque;
25 import java.util.PriorityQueue;
26 import java.util.concurrent.atomic.LongAdder;
27 import java.util.concurrent.locks.ReentrantLock;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 final class PoolChunk<T> implements PoolChunkMetric, ChunkInfo {
139 private static final int SIZE_BIT_LENGTH = 15;
140 private static final int INUSED_BIT_LENGTH = 1;
141 private static final int SUBPAGE_BIT_LENGTH = 1;
142 private static final int BITMAP_IDX_BIT_LENGTH = 32;
143
144 private static final boolean trackPinnedMemory =
145 SystemPropertyUtil.getBoolean("io.netty.trackPinnedMemory", true);
146
147 static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
148 static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
149 static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
150 static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
151
152 final PoolArena<T> arena;
153 final CleanableDirectBuffer cleanable;
154 final Object base;
155 final T memory;
156 final boolean unpooled;
157
158
159
160
161 private final LongLongHashMap runsAvailMap;
162
163
164
165
166 private final IntPriorityQueue[] runsAvail;
167
168 private final ReentrantLock runsAvailLock;
169
170
171
172
173 private final PoolSubpage<T>[] subpages;
174
175
176
177
178 private final LongAdder pinnedBytes;
179
180 final int pageSize;
181 final int pageShifts;
182 final int chunkSize;
183 final int maxPageIdx;
184
185
186
187
188
189
190 private final Deque<ByteBuffer> cachedNioBuffers;
191
192 int freeBytes;
193
194 PoolChunkList<T> parent;
195 PoolChunk<T> prev;
196 PoolChunk<T> next;
197
198 @SuppressWarnings("unchecked")
199 PoolChunk(PoolArena<T> arena, CleanableDirectBuffer cleanable, Object base, T memory, int pageSize, int pageShifts,
200 int chunkSize, int maxPageIdx) {
201 unpooled = false;
202 this.arena = arena;
203 this.cleanable = cleanable;
204 this.base = base;
205 this.memory = memory;
206 this.pageSize = pageSize;
207 this.pageShifts = pageShifts;
208 this.chunkSize = chunkSize;
209 this.maxPageIdx = maxPageIdx;
210 freeBytes = chunkSize;
211
212 runsAvail = newRunsAvailqueueArray(maxPageIdx);
213 runsAvailLock = new ReentrantLock();
214 runsAvailMap = new LongLongHashMap(-1);
215 subpages = new PoolSubpage[chunkSize >> pageShifts];
216
217
218 int pages = chunkSize >> pageShifts;
219 long initHandle = (long) pages << SIZE_SHIFT;
220 insertAvailRun(0, pages, initHandle);
221
222 cachedNioBuffers = new ArrayDeque<>(8);
223 this.pinnedBytes = trackPinnedMemory ? new LongAdder() : null;
224 }
225
226
/**
 * Creates an unpooled chunk: no pages, no run bookkeeping, no NIO buffer cache.
 *
 * @param arena     arena that owns this chunk
 * @param cleanable holder of the backing direct buffer, may be {@code null}
 * @param base      stored as-is in {@link #base}
 * @param memory    backing memory
 * @param size      size of the chunk in bytes, stored as {@link #chunkSize}
 */
PoolChunk(PoolArena<T> arena, CleanableDirectBuffer cleanable, Object base, T memory, int size) {
    this.unpooled = true;
    this.arena = arena;
    this.cleanable = cleanable;
    this.base = base;
    this.memory = memory;
    this.chunkSize = size;

    // Paging does not apply to an unpooled chunk.
    this.pageSize = 0;
    this.pageShifts = 0;
    this.maxPageIdx = 0;

    // Nor does any of the run/subpage bookkeeping.
    this.runsAvail = null;
    this.runsAvailMap = null;
    this.runsAvailLock = null;
    this.subpages = null;
    this.cachedNioBuffers = null;

    this.pinnedBytes = trackPinnedMemory ? new LongAdder() : null;
}
244
245 private static IntPriorityQueue[] newRunsAvailqueueArray(int size) {
246 IntPriorityQueue[] queueArray = new IntPriorityQueue[size];
247 for (int i = 0; i < queueArray.length; i++) {
248 queueArray[i] = new IntPriorityQueue();
249 }
250 return queueArray;
251 }
252
253 private void insertAvailRun(int runOffset, int pages, long handle) {
254 int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(pages);
255 IntPriorityQueue queue = runsAvail[pageIdxFloor];
256 assert isRun(handle);
257 queue.offer((int) (handle >> BITMAP_IDX_BIT_LENGTH));
258
259
260 insertAvailRun0(runOffset, handle);
261 if (pages > 1) {
262
263 insertAvailRun0(lastPage(runOffset, pages), handle);
264 }
265 }
266
267 private void insertAvailRun0(int runOffset, long handle) {
268 long pre = runsAvailMap.put(runOffset, handle);
269 assert pre == -1;
270 }
271
272 private void removeAvailRun(long handle) {
273 int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(runPages(handle));
274 runsAvail[pageIdxFloor].remove((int) (handle >> BITMAP_IDX_BIT_LENGTH));
275 removeAvailRun0(handle);
276 }
277
278 private void removeAvailRun0(long handle) {
279 int runOffset = runOffset(handle);
280 int pages = runPages(handle);
281
282 runsAvailMap.remove(runOffset);
283 if (pages > 1) {
284
285 runsAvailMap.remove(lastPage(runOffset, pages));
286 }
287 }
288
289 private static int lastPage(int runOffset, int pages) {
290 return runOffset + pages - 1;
291 }
292
293 private long getAvailRunByOffset(int runOffset) {
294 return runsAvailMap.get(runOffset);
295 }
296
297 @Override
298 public int usage() {
299 final int freeBytes;
300 if (this.unpooled) {
301 freeBytes = this.freeBytes;
302 } else {
303 runsAvailLock.lock();
304 try {
305 freeBytes = this.freeBytes;
306 } finally {
307 runsAvailLock.unlock();
308 }
309 }
310 return usage(freeBytes);
311 }
312
313 private int usage(int freeBytes) {
314 if (freeBytes == 0) {
315 return 100;
316 }
317
318 int freePercentage = (int) (freeBytes * 100L / chunkSize);
319 if (freePercentage == 0) {
320 return 99;
321 }
322 return 100 - freePercentage;
323 }
324
325 boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
326 final long handle;
327 if (sizeIdx <= arena.sizeClass.smallMaxSizeIdx) {
328 final PoolSubpage<T> nextSub;
329
330
331
332 PoolSubpage<T> head = arena.smallSubpagePools[sizeIdx];
333 head.lock();
334 try {
335 nextSub = head.next;
336 if (nextSub != head) {
337 assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeClass.sizeIdx2size(sizeIdx) :
338 "doNotDestroy=" + nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" +
339 sizeIdx;
340 handle = nextSub.allocate();
341 assert handle >= 0;
342 assert isSubpage(handle);
343 nextSub.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache, false);
344 return true;
345 }
346 handle = allocateSubpage(sizeIdx, head);
347 if (handle < 0) {
348 return false;
349 }
350 assert isSubpage(handle);
351 } finally {
352 head.unlock();
353 }
354 } else {
355
356
357 int runSize = arena.sizeClass.sizeIdx2size(sizeIdx);
358 handle = allocateRun(runSize);
359 if (handle < 0) {
360 return false;
361 }
362 assert !isSubpage(handle);
363 }
364
365 ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
366 initBuf(buf, nioBuffer, handle, reqCapacity, cache, false);
367 return true;
368 }
369
370 private long allocateRun(int runSize) {
371 int pages = runSize >> pageShifts;
372 int pageIdx = arena.sizeClass.pages2pageIdx(pages);
373
374 runsAvailLock.lock();
375 try {
376
377 int queueIdx = runFirstBestFit(pageIdx);
378 if (queueIdx == -1) {
379 return -1;
380 }
381
382
383 IntPriorityQueue queue = runsAvail[queueIdx];
384 long handle = queue.poll();
385 assert handle != IntPriorityQueue.NO_VALUE;
386 handle <<= BITMAP_IDX_BIT_LENGTH;
387 assert !isUsed(handle) : "invalid handle: " + handle;
388
389 removeAvailRun0(handle);
390
391 handle = splitLargeRun(handle, pages);
392
393 int pinnedSize = runSize(pageShifts, handle);
394 freeBytes -= pinnedSize;
395 return handle;
396 } finally {
397 runsAvailLock.unlock();
398 }
399 }
400
401 private int calculateRunSize(int sizeIdx) {
402 int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
403 int runSize = 0;
404 int nElements;
405
406 final int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
407
408
409 do {
410 runSize += pageSize;
411 nElements = runSize / elemSize;
412 } while (nElements < maxElements && runSize != nElements * elemSize);
413
414 while (nElements > maxElements) {
415 runSize -= pageSize;
416 nElements = runSize / elemSize;
417 }
418
419 assert nElements > 0;
420 assert runSize <= chunkSize;
421 assert runSize >= elemSize;
422
423 return runSize;
424 }
425
426 private int runFirstBestFit(int pageIdx) {
427 if (freeBytes == chunkSize) {
428 return arena.sizeClass.nPSizes - 1;
429 }
430 for (int i = pageIdx; i < arena.sizeClass.nPSizes; i++) {
431 IntPriorityQueue queue = runsAvail[i];
432 if (queue != null && !queue.isEmpty()) {
433 return i;
434 }
435 }
436 return -1;
437 }
438
439 private long splitLargeRun(long handle, int needPages) {
440 assert needPages > 0;
441
442 int totalPages = runPages(handle);
443 assert needPages <= totalPages;
444
445 int remPages = totalPages - needPages;
446
447 if (remPages > 0) {
448 int runOffset = runOffset(handle);
449
450
451 int availOffset = runOffset + needPages;
452 long availRun = toRunHandle(availOffset, remPages, 0);
453 insertAvailRun(availOffset, remPages, availRun);
454
455
456 return toRunHandle(runOffset, needPages, 1);
457 }
458
459
460 handle |= 1L << IS_USED_SHIFT;
461 return handle;
462 }
463
464
465
466
467
468
469
470
471
472
473 private long allocateSubpage(int sizeIdx, PoolSubpage<T> head) {
474
475 int runSize = calculateRunSize(sizeIdx);
476
477 long runHandle = allocateRun(runSize);
478 if (runHandle < 0) {
479 return -1;
480 }
481
482 int runOffset = runOffset(runHandle);
483 assert subpages[runOffset] == null;
484 int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
485
486 PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
487 runSize(pageShifts, runHandle), elemSize);
488
489 subpages[runOffset] = subpage;
490 return subpage.allocate();
491 }
492
493
494
495
496
497
498
499
500 void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
501 if (isSubpage(handle)) {
502 int sIdx = runOffset(handle);
503 PoolSubpage<T> subpage = subpages[sIdx];
504 assert subpage != null;
505 PoolSubpage<T> head = subpage.chunk.arena.smallSubpagePools[subpage.headIndex];
506
507
508 head.lock();
509 try {
510 assert subpage.doNotDestroy;
511 if (subpage.free(head, bitmapIdx(handle))) {
512
513 return;
514 }
515 assert !subpage.doNotDestroy;
516
517 subpages[sIdx] = null;
518 } finally {
519 head.unlock();
520 }
521 }
522
523 int runSize = runSize(pageShifts, handle);
524
525 runsAvailLock.lock();
526 try {
527
528
529 long finalRun = collapseRuns(handle);
530
531
532 finalRun &= ~(1L << IS_USED_SHIFT);
533
534 finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
535
536 insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
537 freeBytes += runSize;
538 } finally {
539 runsAvailLock.unlock();
540 }
541
542 if (nioBuffer != null && cachedNioBuffers != null &&
543 cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
544 cachedNioBuffers.offer(nioBuffer);
545 }
546 }
547
548 private long collapseRuns(long handle) {
549 return collapseNext(collapsePast(handle));
550 }
551
552 private long collapsePast(long handle) {
553 for (;;) {
554 int runOffset = runOffset(handle);
555 int runPages = runPages(handle);
556
557 long pastRun = getAvailRunByOffset(runOffset - 1);
558 if (pastRun == -1) {
559 return handle;
560 }
561
562 int pastOffset = runOffset(pastRun);
563 int pastPages = runPages(pastRun);
564
565
566 if (pastRun != handle && pastOffset + pastPages == runOffset) {
567
568 removeAvailRun(pastRun);
569 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
570 } else {
571 return handle;
572 }
573 }
574 }
575
576 private long collapseNext(long handle) {
577 for (;;) {
578 int runOffset = runOffset(handle);
579 int runPages = runPages(handle);
580
581 long nextRun = getAvailRunByOffset(runOffset + runPages);
582 if (nextRun == -1) {
583 return handle;
584 }
585
586 int nextOffset = runOffset(nextRun);
587 int nextPages = runPages(nextRun);
588
589
590 if (nextRun != handle && runOffset + runPages == nextOffset) {
591
592 removeAvailRun(nextRun);
593 handle = toRunHandle(runOffset, runPages + nextPages, 0);
594 } else {
595 return handle;
596 }
597 }
598 }
599
600 private static long toRunHandle(int runOffset, int runPages, int inUsed) {
601 return (long) runOffset << RUN_OFFSET_SHIFT
602 | (long) runPages << SIZE_SHIFT
603 | (long) inUsed << IS_USED_SHIFT;
604 }
605
606 void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
607 PoolThreadCache threadCache, boolean threadLocal) {
608 if (isSubpage(handle)) {
609 initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache, threadLocal);
610 } else {
611 int maxLength = runSize(pageShifts, handle);
612 buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
613 reqCapacity, maxLength, arena.parent.threadCache(), threadLocal);
614 }
615 }
616
617 void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
618 PoolThreadCache threadCache, boolean threadLocal) {
619 int runOffset = runOffset(handle);
620 int bitmapIdx = bitmapIdx(handle);
621
622 PoolSubpage<T> s = subpages[runOffset];
623 assert s.isDoNotDestroy();
624 assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;
625
626 int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
627 buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache, threadLocal);
628 }
629
630 void incrementPinnedMemory(int delta) {
631 assert delta > 0;
632 if (pinnedBytes != null) {
633 pinnedBytes.add(delta);
634 }
635 }
636
637 void decrementPinnedMemory(int delta) {
638 assert delta > 0;
639 if (pinnedBytes != null) {
640 pinnedBytes.add(-delta);
641 }
642 }
643
644 @Override
645 public int chunkSize() {
646 return chunkSize;
647 }
648
649 @Override
650 public int freeBytes() {
651 if (this.unpooled) {
652 return freeBytes;
653 }
654 runsAvailLock.lock();
655 try {
656 return freeBytes;
657 } finally {
658 runsAvailLock.unlock();
659 }
660 }
661
662 public int pinnedBytes() {
663 return pinnedBytes == null ? 0 : (int) pinnedBytes.sum();
664 }
665
666 @Override
667 public String toString() {
668 final int freeBytes;
669 if (this.unpooled) {
670 freeBytes = this.freeBytes;
671 } else {
672 runsAvailLock.lock();
673 try {
674 freeBytes = this.freeBytes;
675 } finally {
676 runsAvailLock.unlock();
677 }
678 }
679
680 return new StringBuilder()
681 .append("Chunk(")
682 .append(Integer.toHexString(System.identityHashCode(this)))
683 .append(": ")
684 .append(usage(freeBytes))
685 .append("%, ")
686 .append(chunkSize - freeBytes)
687 .append('/')
688 .append(chunkSize)
689 .append(')')
690 .toString();
691 }
692
693 void destroy() {
694 arena.destroyChunk(this);
695 }
696
697 static int runOffset(long handle) {
698 return (int) (handle >> RUN_OFFSET_SHIFT);
699 }
700
701 static int runSize(int pageShifts, long handle) {
702 return runPages(handle) << pageShifts;
703 }
704
705 static int runPages(long handle) {
706 return (int) (handle >> SIZE_SHIFT & 0x7fff);
707 }
708
709 static boolean isUsed(long handle) {
710 return (handle >> IS_USED_SHIFT & 1) == 1L;
711 }
712
713 static boolean isRun(long handle) {
714 return !isSubpage(handle);
715 }
716
717 static boolean isSubpage(long handle) {
718 return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
719 }
720
721 static int bitmapIdx(long handle) {
722 return (int) handle;
723 }
724
725 @Override
726 public int capacity() {
727 return chunkSize;
728 }
729
730 @Override
731 public boolean isDirect() {
732 return cleanable != null && cleanable.buffer().isDirect();
733 }
734
735 @Override
736 public long memoryAddress() {
737 return cleanable != null && cleanable.hasMemoryAddress() ? cleanable.memoryAddress() : 0L;
738 }
739 }