1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.internal.LongCounter;
19 import io.netty.util.internal.PlatformDependent;
20
21 import java.nio.ByteBuffer;
22 import java.util.ArrayDeque;
23 import java.util.Deque;
24 import java.util.PriorityQueue;
25 import java.util.concurrent.atomic.LongAdder;
26 import java.util.concurrent.locks.ReentrantLock;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 final class PoolChunk<T> implements PoolChunkMetric {
138 private static final int SIZE_BIT_LENGTH = 15;
139 private static final int INUSED_BIT_LENGTH = 1;
140 private static final int SUBPAGE_BIT_LENGTH = 1;
141 private static final int BITMAP_IDX_BIT_LENGTH = 32;
142
143 static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
144 static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
145 static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
146 static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
147
148 final PoolArena<T> arena;
149 final Object base;
150 final T memory;
151 final boolean unpooled;
152
153
154
155
156 private final LongLongHashMap runsAvailMap;
157
158
159
160
161 private final IntPriorityQueue[] runsAvail;
162
163 private final ReentrantLock runsAvailLock;
164
165
166
167
168 private final PoolSubpage<T>[] subpages;
169
170
171
172
173 private final LongAdder pinnedBytes = new LongAdder();
174
175 final int pageSize;
176 final int pageShifts;
177 final int chunkSize;
178 final int maxPageIdx;
179
180
181
182
183
184
185 private final Deque<ByteBuffer> cachedNioBuffers;
186
187 int freeBytes;
188
189 PoolChunkList<T> parent;
190 PoolChunk<T> prev;
191 PoolChunk<T> next;
192
193
194
195
196 @SuppressWarnings("unchecked")
197 PoolChunk(PoolArena<T> arena, Object base, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
198 unpooled = false;
199 this.arena = arena;
200 this.base = base;
201 this.memory = memory;
202 this.pageSize = pageSize;
203 this.pageShifts = pageShifts;
204 this.chunkSize = chunkSize;
205 this.maxPageIdx = maxPageIdx;
206 freeBytes = chunkSize;
207
208 runsAvail = newRunsAvailqueueArray(maxPageIdx);
209 runsAvailLock = new ReentrantLock();
210 runsAvailMap = new LongLongHashMap(-1);
211 subpages = new PoolSubpage[chunkSize >> pageShifts];
212
213
214 int pages = chunkSize >> pageShifts;
215 long initHandle = (long) pages << SIZE_SHIFT;
216 insertAvailRun(0, pages, initHandle);
217
218 cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
219 }
220
221
222 PoolChunk(PoolArena<T> arena, Object base, T memory, int size) {
223 unpooled = true;
224 this.arena = arena;
225 this.base = base;
226 this.memory = memory;
227 pageSize = 0;
228 pageShifts = 0;
229 maxPageIdx = 0;
230 runsAvailMap = null;
231 runsAvail = null;
232 runsAvailLock = null;
233 subpages = null;
234 chunkSize = size;
235 cachedNioBuffers = null;
236 }
237
238 private static IntPriorityQueue[] newRunsAvailqueueArray(int size) {
239 IntPriorityQueue[] queueArray = new IntPriorityQueue[size];
240 for (int i = 0; i < queueArray.length; i++) {
241 queueArray[i] = new IntPriorityQueue();
242 }
243 return queueArray;
244 }
245
246 private void insertAvailRun(int runOffset, int pages, long handle) {
247 int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(pages);
248 IntPriorityQueue queue = runsAvail[pageIdxFloor];
249 assert isRun(handle);
250 queue.offer((int) (handle >> BITMAP_IDX_BIT_LENGTH));
251
252
253 insertAvailRun0(runOffset, handle);
254 if (pages > 1) {
255
256 insertAvailRun0(lastPage(runOffset, pages), handle);
257 }
258 }
259
260 private void insertAvailRun0(int runOffset, long handle) {
261 long pre = runsAvailMap.put(runOffset, handle);
262 assert pre == -1;
263 }
264
265 private void removeAvailRun(long handle) {
266 int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(runPages(handle));
267 runsAvail[pageIdxFloor].remove((int) (handle >> BITMAP_IDX_BIT_LENGTH));
268 removeAvailRun0(handle);
269 }
270
271 private void removeAvailRun0(long handle) {
272 int runOffset = runOffset(handle);
273 int pages = runPages(handle);
274
275 runsAvailMap.remove(runOffset);
276 if (pages > 1) {
277
278 runsAvailMap.remove(lastPage(runOffset, pages));
279 }
280 }
281
282 private static int lastPage(int runOffset, int pages) {
283 return runOffset + pages - 1;
284 }
285
286 private long getAvailRunByOffset(int runOffset) {
287 return runsAvailMap.get(runOffset);
288 }
289
290 @Override
291 public int usage() {
292 final int freeBytes;
293 if (this.unpooled) {
294 freeBytes = this.freeBytes;
295 } else {
296 runsAvailLock.lock();
297 try {
298 freeBytes = this.freeBytes;
299 } finally {
300 runsAvailLock.unlock();
301 }
302 }
303 return usage(freeBytes);
304 }
305
306 private int usage(int freeBytes) {
307 if (freeBytes == 0) {
308 return 100;
309 }
310
311 int freePercentage = (int) (freeBytes * 100L / chunkSize);
312 if (freePercentage == 0) {
313 return 99;
314 }
315 return 100 - freePercentage;
316 }
317
318 boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
319 final long handle;
320 if (sizeIdx <= arena.sizeClass.smallMaxSizeIdx) {
321 final PoolSubpage<T> nextSub;
322
323
324
325 PoolSubpage<T> head = arena.smallSubpagePools[sizeIdx];
326 head.lock();
327 try {
328 nextSub = head.next;
329 if (nextSub != head) {
330 assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeClass.sizeIdx2size(sizeIdx) :
331 "doNotDestroy=" + nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" +
332 sizeIdx;
333 handle = nextSub.allocate();
334 assert handle >= 0;
335 assert isSubpage(handle);
336 nextSub.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
337 return true;
338 }
339 handle = allocateSubpage(sizeIdx, head);
340 if (handle < 0) {
341 return false;
342 }
343 assert isSubpage(handle);
344 } finally {
345 head.unlock();
346 }
347 } else {
348
349
350 int runSize = arena.sizeClass.sizeIdx2size(sizeIdx);
351 handle = allocateRun(runSize);
352 if (handle < 0) {
353 return false;
354 }
355 assert !isSubpage(handle);
356 }
357
358 ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
359 initBuf(buf, nioBuffer, handle, reqCapacity, cache);
360 return true;
361 }
362
363 private long allocateRun(int runSize) {
364 int pages = runSize >> pageShifts;
365 int pageIdx = arena.sizeClass.pages2pageIdx(pages);
366
367 runsAvailLock.lock();
368 try {
369
370 int queueIdx = runFirstBestFit(pageIdx);
371 if (queueIdx == -1) {
372 return -1;
373 }
374
375
376 IntPriorityQueue queue = runsAvail[queueIdx];
377 long handle = queue.poll();
378 assert handle != IntPriorityQueue.NO_VALUE;
379 handle <<= BITMAP_IDX_BIT_LENGTH;
380 assert !isUsed(handle) : "invalid handle: " + handle;
381
382 removeAvailRun0(handle);
383
384 handle = splitLargeRun(handle, pages);
385
386 int pinnedSize = runSize(pageShifts, handle);
387 freeBytes -= pinnedSize;
388 return handle;
389 } finally {
390 runsAvailLock.unlock();
391 }
392 }
393
394 private int calculateRunSize(int sizeIdx) {
395 int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
396 int runSize = 0;
397 int nElements;
398
399 final int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
400
401
402 do {
403 runSize += pageSize;
404 nElements = runSize / elemSize;
405 } while (nElements < maxElements && runSize != nElements * elemSize);
406
407 while (nElements > maxElements) {
408 runSize -= pageSize;
409 nElements = runSize / elemSize;
410 }
411
412 assert nElements > 0;
413 assert runSize <= chunkSize;
414 assert runSize >= elemSize;
415
416 return runSize;
417 }
418
419 private int runFirstBestFit(int pageIdx) {
420 if (freeBytes == chunkSize) {
421 return arena.sizeClass.nPSizes - 1;
422 }
423 for (int i = pageIdx; i < arena.sizeClass.nPSizes; i++) {
424 IntPriorityQueue queue = runsAvail[i];
425 if (queue != null && !queue.isEmpty()) {
426 return i;
427 }
428 }
429 return -1;
430 }
431
432 private long splitLargeRun(long handle, int needPages) {
433 assert needPages > 0;
434
435 int totalPages = runPages(handle);
436 assert needPages <= totalPages;
437
438 int remPages = totalPages - needPages;
439
440 if (remPages > 0) {
441 int runOffset = runOffset(handle);
442
443
444 int availOffset = runOffset + needPages;
445 long availRun = toRunHandle(availOffset, remPages, 0);
446 insertAvailRun(availOffset, remPages, availRun);
447
448
449 return toRunHandle(runOffset, needPages, 1);
450 }
451
452
453 handle |= 1L << IS_USED_SHIFT;
454 return handle;
455 }
456
457
458
459
460
461
462
463
464
465
466 private long allocateSubpage(int sizeIdx, PoolSubpage<T> head) {
467
468 int runSize = calculateRunSize(sizeIdx);
469
470 long runHandle = allocateRun(runSize);
471 if (runHandle < 0) {
472 return -1;
473 }
474
475 int runOffset = runOffset(runHandle);
476 assert subpages[runOffset] == null;
477 int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
478
479 PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
480 runSize(pageShifts, runHandle), elemSize);
481
482 subpages[runOffset] = subpage;
483 return subpage.allocate();
484 }
485
486
487
488
489
490
491
492
493 void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
494 if (isSubpage(handle)) {
495 int sIdx = runOffset(handle);
496 PoolSubpage<T> subpage = subpages[sIdx];
497 assert subpage != null;
498 PoolSubpage<T> head = subpage.chunk.arena.smallSubpagePools[subpage.headIndex];
499
500
501 head.lock();
502 try {
503 assert subpage.doNotDestroy;
504 if (subpage.free(head, bitmapIdx(handle))) {
505
506 return;
507 }
508 assert !subpage.doNotDestroy;
509
510 subpages[sIdx] = null;
511 } finally {
512 head.unlock();
513 }
514 }
515
516 int runSize = runSize(pageShifts, handle);
517
518 runsAvailLock.lock();
519 try {
520
521
522 long finalRun = collapseRuns(handle);
523
524
525 finalRun &= ~(1L << IS_USED_SHIFT);
526
527 finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
528
529 insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
530 freeBytes += runSize;
531 } finally {
532 runsAvailLock.unlock();
533 }
534
535 if (nioBuffer != null && cachedNioBuffers != null &&
536 cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
537 cachedNioBuffers.offer(nioBuffer);
538 }
539 }
540
541 private long collapseRuns(long handle) {
542 return collapseNext(collapsePast(handle));
543 }
544
545 private long collapsePast(long handle) {
546 for (;;) {
547 int runOffset = runOffset(handle);
548 int runPages = runPages(handle);
549
550 long pastRun = getAvailRunByOffset(runOffset - 1);
551 if (pastRun == -1) {
552 return handle;
553 }
554
555 int pastOffset = runOffset(pastRun);
556 int pastPages = runPages(pastRun);
557
558
559 if (pastRun != handle && pastOffset + pastPages == runOffset) {
560
561 removeAvailRun(pastRun);
562 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
563 } else {
564 return handle;
565 }
566 }
567 }
568
569 private long collapseNext(long handle) {
570 for (;;) {
571 int runOffset = runOffset(handle);
572 int runPages = runPages(handle);
573
574 long nextRun = getAvailRunByOffset(runOffset + runPages);
575 if (nextRun == -1) {
576 return handle;
577 }
578
579 int nextOffset = runOffset(nextRun);
580 int nextPages = runPages(nextRun);
581
582
583 if (nextRun != handle && runOffset + runPages == nextOffset) {
584
585 removeAvailRun(nextRun);
586 handle = toRunHandle(runOffset, runPages + nextPages, 0);
587 } else {
588 return handle;
589 }
590 }
591 }
592
593 private static long toRunHandle(int runOffset, int runPages, int inUsed) {
594 return (long) runOffset << RUN_OFFSET_SHIFT
595 | (long) runPages << SIZE_SHIFT
596 | (long) inUsed << IS_USED_SHIFT;
597 }
598
599 void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
600 PoolThreadCache threadCache) {
601 if (isSubpage(handle)) {
602 initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
603 } else {
604 int maxLength = runSize(pageShifts, handle);
605 buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
606 reqCapacity, maxLength, arena.parent.threadCache());
607 }
608 }
609
610 void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
611 PoolThreadCache threadCache) {
612 int runOffset = runOffset(handle);
613 int bitmapIdx = bitmapIdx(handle);
614
615 PoolSubpage<T> s = subpages[runOffset];
616 assert s.isDoNotDestroy();
617 assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;
618
619 int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
620 buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
621 }
622
623 void incrementPinnedMemory(int delta) {
624 assert delta > 0;
625 pinnedBytes.add(delta);
626 }
627
628 void decrementPinnedMemory(int delta) {
629 assert delta > 0;
630 pinnedBytes.add(-delta);
631 }
632
633 @Override
634 public int chunkSize() {
635 return chunkSize;
636 }
637
638 @Override
639 public int freeBytes() {
640 if (this.unpooled) {
641 return freeBytes;
642 }
643 runsAvailLock.lock();
644 try {
645 return freeBytes;
646 } finally {
647 runsAvailLock.unlock();
648 }
649 }
650
651 public int pinnedBytes() {
652 return (int) pinnedBytes.sum();
653 }
654
655 @Override
656 public String toString() {
657 final int freeBytes;
658 if (this.unpooled) {
659 freeBytes = this.freeBytes;
660 } else {
661 runsAvailLock.lock();
662 try {
663 freeBytes = this.freeBytes;
664 } finally {
665 runsAvailLock.unlock();
666 }
667 }
668
669 return new StringBuilder()
670 .append("Chunk(")
671 .append(Integer.toHexString(System.identityHashCode(this)))
672 .append(": ")
673 .append(usage(freeBytes))
674 .append("%, ")
675 .append(chunkSize - freeBytes)
676 .append('/')
677 .append(chunkSize)
678 .append(')')
679 .toString();
680 }
681
682 void destroy() {
683 arena.destroyChunk(this);
684 }
685
686 static int runOffset(long handle) {
687 return (int) (handle >> RUN_OFFSET_SHIFT);
688 }
689
690 static int runSize(int pageShifts, long handle) {
691 return runPages(handle) << pageShifts;
692 }
693
694 static int runPages(long handle) {
695 return (int) (handle >> SIZE_SHIFT & 0x7fff);
696 }
697
698 static boolean isUsed(long handle) {
699 return (handle >> IS_USED_SHIFT & 1) == 1L;
700 }
701
702 static boolean isRun(long handle) {
703 return !isSubpage(handle);
704 }
705
706 static boolean isSubpage(long handle) {
707 return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
708 }
709
710 static int bitmapIdx(long handle) {
711 return (int) handle;
712 }
713 }