1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.buffer.api.pool;
17
18 import io.netty5.buffer.api.AllocatorControl;
19 import io.netty5.buffer.api.Buffer;
20 import io.netty5.buffer.api.Drop;
21 import io.netty5.buffer.api.MemoryManager;
22 import io.netty5.buffer.api.internal.ArcDrop;
23 import io.netty5.buffer.api.internal.CleanerDrop;
24 import io.netty5.buffer.api.internal.DropCaptor;
25 import io.netty5.util.internal.LongLongHashMap;
26 import io.netty5.util.internal.LongPriorityQueue;
27 import io.netty5.util.internal.logging.InternalLogger;
28 import io.netty5.util.internal.logging.InternalLoggerFactory;
29
30 import java.util.PriorityQueue;
31 import java.util.concurrent.locks.ReentrantLock;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 final class PoolChunk implements PoolChunkMetric {
// Logger used when slicing the chunk memory for an allocation fails.
private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(PoolChunk.class);

// Bit widths of the fields packed into a 64-bit handle. Per the shift constants below, the layout from
// least to most significant bit is: bitmap index, is-subpage flag, is-used flag, run size in pages,
// and finally the run offset (in pages) in the remaining high bits.
private static final int SIZE_BIT_LENGTH = 15;
private static final int INUSED_BIT_LENGTH = 1;
private static final int SUBPAGE_BIT_LENGTH = 1;
private static final int BITMAP_IDX_BIT_LENGTH = 32;

// Allocator control passed to the memory manager when the base buffer is allocated.
// It throws if it is ever invoked.
private static final AllocatorControl CONTROL = () -> {
    throw new AssertionError("PoolChunk base allocations should never need to access their allocator.");
};

// Bit positions of the handle fields, each derived from the widths of the fields below it.
static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;

// Arena this chunk was created for; supplies the memory manager, size-class lookups and the lock
// taken by the metric accessors.
final PoolArena arena;
// Buffer of chunkSize bytes obtained from the arena's memory manager.
final Buffer base;
// Memory unwrapped from base; every allocation is a slice of it.
final Object memory;
// Drop captured while allocating base; destroy() invokes it on base.
final Drop<Buffer> baseDrop;

/**
 * Maps the page offset of the first and of the last page of every available run to that run's handle.
 * The map is created with -1 as its "absent" value.
 */
private final LongLongHashMap runsAvailMap;

/**
 * Queues of available run handles, indexed by the page index computed by the arena
 * for the run's page count.
 */
private final LongPriorityQueue[] runsAvail;

// Held by allocateRun and free while they update runsAvail, runsAvailMap and the byte counters.
private final ReentrantLock runsAvailLock;

/**
 * Subpages created from runs of this chunk, indexed by the run offset (in pages) of the backing run.
 */
private final PoolSubpage[] subpages;

// Page size in bytes, and the shift that converts between page counts and byte counts.
private final int pageSize;
private final int pageShifts;
// Total size of this chunk in bytes.
private final int chunkSize;

// Bytes of this chunk not currently handed out as runs.
int freeBytes;
// Bytes currently handed out as runs: increased in allocateRun, decreased in free.
int pinnedBytes;

// Not used inside this class. Presumably linkage maintained by the owning PoolChunkList - verify there.
PoolChunkList parent;
PoolChunk prev;
PoolChunk next;
189
/**
 * Creates a chunk of {@code chunkSize} bytes backed by memory from the arena's memory manager,
 * and registers the whole chunk as a single available run.
 *
 * @param arena      the arena this chunk belongs to
 * @param pageSize   the page size in bytes
 * @param pageShifts the shift used to convert between page counts and byte counts
 * @param chunkSize  the size of the chunk in bytes
 * @param maxPageIdx the number of run queues to create
 */
PoolChunk(PoolArena arena, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
    this.arena = arena;
    MemoryManager manager = arena.manager;

    // The drop handed out by the manager is wrapped in a CleanerDrop and then an ArcDrop;
    // the captor lets us keep a reference to the wrapped drop after the allocation call returns.
    DropCaptor<Buffer> dropCaptor = new DropCaptor<>();
    base = manager.allocateShared(CONTROL, chunkSize, drop ->
            dropCaptor.capture(ArcDrop.wrap(CleanerDrop.wrap(drop, manager))), arena.allocationType);
    baseDrop = dropCaptor.getDrop();
    memory = manager.unwrapRecoverableMemory(base);
    baseDrop.attach(base);
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.chunkSize = chunkSize;
    freeBytes = chunkSize;

    runsAvail = newRunsAvailqueueArray(maxPageIdx);
    runsAvailLock = new ReentrantLock();
    // -1 is the value the map reports for offsets that have no available run.
    runsAvailMap = new LongLongHashMap(-1);
    subpages = new PoolSubpage[chunkSize >> pageShifts];

    // Insert the initial run: offset 0, spanning every page of the chunk, with both flags cleared.
    int pages = chunkSize >> pageShifts;
    long initHandle = (long) pages << SIZE_SHIFT;
    insertAvailRun(0, pages, initHandle);
}
216
217 private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
218 LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
219 for (int i = 0; i < queueArray.length; i++) {
220 queueArray[i] = new LongPriorityQueue();
221 }
222 return queueArray;
223 }
224
225 private void insertAvailRun(int runOffset, int pages, long handle) {
226 int pageIdxFloor = arena.pages2pageIdxFloor(pages);
227 LongPriorityQueue queue = runsAvail[pageIdxFloor];
228 queue.offer(handle);
229
230
231 insertAvailRun0(runOffset, handle);
232 if (pages > 1) {
233
234 insertAvailRun0(lastPage(runOffset, pages), handle);
235 }
236 }
237
238 private void insertAvailRun0(int runOffset, long handle) {
239 long pre = runsAvailMap.put(runOffset, handle);
240 assert pre == -1;
241 }
242
243 private void removeAvailRun(long handle) {
244 int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
245 LongPriorityQueue queue = runsAvail[pageIdxFloor];
246 removeAvailRun(queue, handle);
247 }
248
249 private void removeAvailRun(LongPriorityQueue queue, long handle) {
250 queue.remove(handle);
251
252 int runOffset = runOffset(handle);
253 int pages = runPages(handle);
254
255 runsAvailMap.remove(runOffset);
256 if (pages > 1) {
257
258 runsAvailMap.remove(lastPage(runOffset, pages));
259 }
260 }
261
262 private static int lastPage(int runOffset, int pages) {
263 return runOffset + pages - 1;
264 }
265
266 private long getAvailRunByOffset(int runOffset) {
267 return runsAvailMap.get(runOffset);
268 }
269
270 @Override
271 public int usage() {
272 final int freeBytes;
273 arena.lock();
274 try {
275 freeBytes = this.freeBytes;
276 } finally {
277 arena.unlock();
278 }
279 return usage(freeBytes);
280 }
281
282 private int usage(int freeBytes) {
283 if (freeBytes == 0) {
284 return 100;
285 }
286
287 int freePercentage = (int) (freeBytes * 100L / chunkSize);
288 if (freePercentage == 0) {
289 return 99;
290 }
291 return 100 - freePercentage;
292 }
293
294 UntetheredMemory allocate(int size, int sizeIdx, PoolThreadCache cache) {
295 final long handle;
296 if (sizeIdx <= arena.smallMaxSizeIdx) {
297
298 handle = allocateSubpage(sizeIdx);
299 if (handle < 0) {
300 return null;
301 }
302 assert isSubpage(handle);
303 } else {
304
305
306 int runSize = arena.sizeIdx2size(sizeIdx);
307 handle = allocateRun(runSize);
308 if (handle < 0) {
309 return null;
310 }
311 }
312
313 return allocateBuffer(handle, size, cache);
314 }
315
316 private long allocateRun(int runSize) {
317 int pages = runSize >> pageShifts;
318 int pageIdx = arena.pages2pageIdx(pages);
319 runsAvailLock.lock();
320 try {
321
322 int queueIdx = runFirstBestFit(pageIdx);
323 if (queueIdx == -1) {
324 return -1;
325 }
326
327
328 LongPriorityQueue queue = runsAvail[queueIdx];
329 long handle = queue.poll();
330
331 assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;
332
333 removeAvailRun(queue, handle);
334
335 if (handle != -1) {
336 handle = splitLargeRun(handle, pages);
337 }
338
339 int pinnedSize = runSize(pageShifts, handle);
340 freeBytes -= pinnedSize;
341 pinnedBytes += pinnedSize;
342 return handle;
343 } finally {
344 runsAvailLock.unlock();
345 }
346 }
347
348 private int calculateRunSize(int sizeIdx) {
349 int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
350 int runSize = 0;
351 int nElements;
352
353 final int elemSize = arena.sizeIdx2size(sizeIdx);
354
355
356 do {
357 runSize += pageSize;
358 nElements = runSize / elemSize;
359 } while (nElements < maxElements && runSize != nElements * elemSize);
360
361 while (nElements > maxElements) {
362 runSize -= pageSize;
363 nElements = runSize / elemSize;
364 }
365
366 assert nElements > 0;
367 assert runSize <= chunkSize;
368 assert runSize >= elemSize;
369
370 return runSize;
371 }
372
373 private int runFirstBestFit(int pageIdx) {
374 if (freeBytes == chunkSize) {
375 return arena.nPSizes - 1;
376 }
377 for (int i = pageIdx; i < arena.nPSizes; i++) {
378 LongPriorityQueue queue = runsAvail[i];
379 if (queue != null && !queue.isEmpty()) {
380 return i;
381 }
382 }
383 return -1;
384 }
385
386 private long splitLargeRun(long handle, int needPages) {
387 assert needPages > 0;
388
389 int totalPages = runPages(handle);
390 assert needPages <= totalPages;
391
392 int remPages = totalPages - needPages;
393
394 if (remPages > 0) {
395 int runOffset = runOffset(handle);
396
397
398 int availOffset = runOffset + needPages;
399 long availRun = toRunHandle(availOffset, remPages, 0);
400 insertAvailRun(availOffset, remPages, availRun);
401
402
403 return toRunHandle(runOffset, needPages, 1);
404 }
405
406
407 handle |= 1L << IS_USED_SHIFT;
408 return handle;
409 }
410
411
412
413
414
415
416
417
418
419 private long allocateSubpage(int sizeIdx) {
420
421
422 PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
423 head.lock();
424 try {
425
426 int runSize = calculateRunSize(sizeIdx);
427
428 long runHandle = allocateRun(runSize);
429 if (runHandle < 0) {
430 return -1;
431 }
432
433 int runOffset = runOffset(runHandle);
434 assert subpages[runOffset] == null;
435 int elemSize = arena.sizeIdx2size(sizeIdx);
436
437 PoolSubpage subpage = new PoolSubpage(head, this, pageShifts, runOffset,
438 runSize(pageShifts, runHandle), elemSize);
439
440 subpages[runOffset] = subpage;
441 return subpage.allocate();
442 } finally {
443 head.unlock();
444 }
445 }
446
447
448
449
450
451
452
453
454 void free(long handle, int normCapacity) {
455 int runSize = runSize(pageShifts, handle);
456 pinnedBytes -= runSize;
457 if (isSubpage(handle)) {
458 int sizeIdx = arena.size2SizeIdx(normCapacity);
459 PoolSubpage head = arena.findSubpagePoolHead(sizeIdx);
460
461 int sIdx = runOffset(handle);
462 PoolSubpage subpage = subpages[sIdx];
463 assert subpage != null && subpage.doNotDestroy;
464
465
466
467 head.lock();
468 try {
469 if (subpage.free(head, bitmapIdx(handle))) {
470
471 return;
472 }
473 assert !subpage.doNotDestroy;
474
475 subpages[sIdx] = null;
476 } finally {
477 head.unlock();
478 }
479 }
480
481
482 runsAvailLock.lock();
483 try {
484
485
486 long finalRun = collapseRuns(handle);
487
488
489 finalRun &= ~(1L << IS_USED_SHIFT);
490
491 finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
492
493 insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
494 freeBytes += runSize;
495 } finally {
496 runsAvailLock.unlock();
497 }
498 }
499
500 private long collapseRuns(long handle) {
501 return collapseNext(collapsePast(handle));
502 }
503
504 private long collapsePast(long handle) {
505 for (;;) {
506 int runOffset = runOffset(handle);
507 int runPages = runPages(handle);
508
509 long pastRun = getAvailRunByOffset(runOffset - 1);
510 if (pastRun == -1) {
511 return handle;
512 }
513
514 int pastOffset = runOffset(pastRun);
515 int pastPages = runPages(pastRun);
516
517
518 if (pastRun != handle && pastOffset + pastPages == runOffset) {
519
520 removeAvailRun(pastRun);
521 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
522 } else {
523 return handle;
524 }
525 }
526 }
527
528 private long collapseNext(long handle) {
529 for (;;) {
530 int runOffset = runOffset(handle);
531 int runPages = runPages(handle);
532
533 long nextRun = getAvailRunByOffset(runOffset + runPages);
534 if (nextRun == -1) {
535 return handle;
536 }
537
538 int nextOffset = runOffset(nextRun);
539 int nextPages = runPages(nextRun);
540
541
542 if (nextRun != handle && runOffset + runPages == nextOffset) {
543
544 removeAvailRun(nextRun);
545 handle = toRunHandle(runOffset, runPages + nextPages, 0);
546 } else {
547 return handle;
548 }
549 }
550 }
551
552 private static long toRunHandle(int runOffset, int runPages, int inUsed) {
553 return (long) runOffset << RUN_OFFSET_SHIFT
554 | (long) runPages << SIZE_SHIFT
555 | (long) inUsed << IS_USED_SHIFT;
556 }
557
558 UntetheredMemory allocateBuffer(long handle, int size, PoolThreadCache threadCache) {
559 if (isSubpage(handle)) {
560 return allocateBufferWithSubpage(handle, size, threadCache);
561 } else {
562 int offset = runOffset(handle) << pageShifts;
563 int maxLength = runSize(pageShifts, handle);
564 PoolThreadCache poolThreadCache = arena.parent.threadCache();
565 return new UntetheredChunkAllocation(
566 memory, this, poolThreadCache, handle, maxLength, offset, size);
567 }
568 }
569
570 UntetheredMemory allocateBufferWithSubpage(long handle, int size, PoolThreadCache threadCache) {
571 int runOffset = runOffset(handle);
572 int bitmapIdx = bitmapIdx(handle);
573
574 PoolSubpage s = subpages[runOffset];
575 assert s.doNotDestroy;
576 assert size <= s.elemSize : size + "<=" + s.elemSize;
577
578 int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
579 return new UntetheredChunkAllocation(memory, this, threadCache, handle, s.elemSize, offset, size);
580 }
581
582 @SuppressWarnings("unchecked")
583 private static final class UntetheredChunkAllocation implements UntetheredMemory {
584 private final Object memory;
585 private final PoolChunk chunk;
586 private final PoolThreadCache threadCache;
587 private final long handle;
588 private final int maxLength;
589
590 private UntetheredChunkAllocation(
591 Object memory, PoolChunk chunk, PoolThreadCache threadCache,
592 long handle, int maxLength, int offset, int size) {
593 try {
594 this.memory = chunk.arena.manager.sliceMemory(memory, offset, size);
595 } catch (Exception e) {
596 LOGGER.error("Failed to create slice of pool chunk memory. Chunk layout: " +
597 chunk.toString() + ", memory: " + chunk.memory, e);
598 throw e;
599 }
600 this.chunk = chunk;
601 this.threadCache = threadCache;
602 this.handle = handle;
603 this.maxLength = maxLength;
604 }
605
606 @Override
607 public <Memory> Memory memory() {
608 return (Memory) memory;
609 }
610
611 @Override
612 public <BufferType extends Buffer> Drop<BufferType> drop() {
613 var pooledDrop = ArcDrop.wrap(new PooledDrop(chunk, threadCache, handle, maxLength));
614 return (Drop<BufferType>) CleanerDrop.wrap(pooledDrop, chunk.arena.manager);
615 }
616 }
617
618 @Override
619 public int chunkSize() {
620 return chunkSize;
621 }
622
623 @Override
624 public int freeBytes() {
625 arena.lock();
626 try {
627 return freeBytes;
628 } finally {
629 arena.unlock();
630 }
631 }
632
633 @Override
634 public int pinnedBytes() {
635 arena.lock();
636 try {
637 return pinnedBytes;
638 } finally {
639 arena.unlock();
640 }
641 }
642
643 @Override
644 public String toString() {
645 final int freeBytes;
646 arena.lock();
647 try {
648 freeBytes = this.freeBytes;
649 } finally {
650 arena.unlock();
651 }
652
653 return new StringBuilder()
654 .append("Chunk(")
655 .append(Integer.toHexString(System.identityHashCode(this)))
656 .append(": ")
657 .append(usage(freeBytes))
658 .append("%, ")
659 .append(chunkSize - freeBytes)
660 .append('/')
661 .append(chunkSize)
662 .append(')')
663 .toString();
664 }
665
666 void destroy() {
667 baseDrop.drop(base);
668 }
669
670 static int runOffset(long handle) {
671 return (int) (handle >> RUN_OFFSET_SHIFT);
672 }
673
674 static int runSize(int pageShifts, long handle) {
675 return runPages(handle) << pageShifts;
676 }
677
678 static int runPages(long handle) {
679 return (int) (handle >> SIZE_SHIFT & 0x7fff);
680 }
681
682 static boolean isUsed(long handle) {
683 return (handle >> IS_USED_SHIFT & 1) == 1L;
684 }
685
686 static boolean isSubpage(long handle) {
687 return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
688 }
689
690 static int bitmapIdx(long handle) {
691 return (int) handle;
692 }
693 }