1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.internal.LongCounter;
19 import io.netty.util.internal.PlatformDependent;
20
21 import java.nio.ByteBuffer;
22 import java.util.ArrayDeque;
23 import java.util.Deque;
24 import java.util.PriorityQueue;
25 import java.util.concurrent.locks.ReentrantLock;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136 final class PoolChunk<T> implements PoolChunkMetric {
// Bit widths of the individual fields packed into a 64-bit handle.
private static final int SIZE_BIT_LENGTH = 15;
private static final int INUSED_BIT_LENGTH = 1;
private static final int SUBPAGE_BIT_LENGTH = 1;
private static final int BITMAP_IDX_BIT_LENGTH = 32;

// Shift amounts derived from the widths above. From the least significant bit upwards a
// handle holds: bitmapIdx (bits 0-31), isSubpage (bit 32), isUsed (bit 33),
// run size in pages (bits 34-48) and run offset in pages (bits 49 and up).
static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
146
// Arena this chunk was created for; supplies the size-class tables and the subpage pool heads.
final PoolArena<T> arena;
// Stored by both constructors but not read in this class.
// NOTE(review): presumably the object backing `memory` -- confirm against the callers.
final Object base;
// The memory region handed to the constructor.
final T memory;
// true when created through the 4-argument constructor, false for the 7-argument one.
final boolean unpooled;

// Maps a page offset to the handle of the available run that starts or ends at that page.
// Created with -1 as its "no entry" value; lookups in this class compare against -1.
// null for unpooled chunks.
private final LongLongHashMap runsAvailMap;

// One queue of available runs per page-size index (indexed via pages2pageIdxFloor).
// Each queue stores the upper 32 bits of the run handle. null for unpooled chunks.
private final IntPriorityQueue[] runsAvail;

// Held while runs are allocated or freed and while freeBytes is read for metrics.
// null for unpooled chunks.
private final ReentrantLock runsAvailLock;

// Subpages created in this chunk, indexed by the run offset (first page) of their run.
// null for unpooled chunks.
private final PoolSubpage<T>[] subpages;

// Counter adjusted by incrementPinnedMemory / decrementPinnedMemory.
private final LongCounter pinnedBytes = PlatformDependent.newLongCounter();

final int pageSize;
final int pageShifts;
final int chunkSize;
final int maxPageIdx;

// ByteBuffer instances handed back through free(...) and reused by allocate(...).
// Bounded by PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK.
// null for unpooled chunks.
private final Deque<ByteBuffer> cachedNioBuffers;

// Bytes currently not allocated; decreased in allocateRun and increased in free.
// Never assigned for unpooled chunks, so it stays 0 there.
int freeBytes;

// Not read or written in this class.
// NOTE(review): presumably the owning chunk list and its linked-list neighbours -- confirm.
PoolChunkList<T> parent;
PoolChunk<T> prev;
PoolChunk<T> next;
191
192
193
194
195 @SuppressWarnings("unchecked")
196 PoolChunk(PoolArena<T> arena, Object base, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
197 unpooled = false;
198 this.arena = arena;
199 this.base = base;
200 this.memory = memory;
201 this.pageSize = pageSize;
202 this.pageShifts = pageShifts;
203 this.chunkSize = chunkSize;
204 this.maxPageIdx = maxPageIdx;
205 freeBytes = chunkSize;
206
207 runsAvail = newRunsAvailqueueArray(maxPageIdx);
208 runsAvailLock = new ReentrantLock();
209 runsAvailMap = new LongLongHashMap(-1);
210 subpages = new PoolSubpage[chunkSize >> pageShifts];
211
212
213 int pages = chunkSize >> pageShifts;
214 long initHandle = (long) pages << SIZE_SHIFT;
215 insertAvailRun(0, pages, initHandle);
216
217 cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
218 }
219
220
/**
 * Creates an unpooled chunk. No run, subpage or buffer-cache bookkeeping is set up:
 * the corresponding fields are {@code null} and the page parameters are zero.
 * {@code freeBytes} is not assigned and therefore stays at 0.
 *
 * @param arena  arena the chunk belongs to
 * @param base   stored as-is in {@link #base}
 * @param memory memory region wrapped by this chunk
 * @param size   size of the chunk in bytes
 */
PoolChunk(PoolArena<T> arena, Object base, T memory, int size) {
    this.unpooled = true;
    this.arena = arena;
    this.base = base;
    this.memory = memory;
    this.chunkSize = size;

    // Paging is not used for unpooled chunks.
    this.pageSize = 0;
    this.pageShifts = 0;
    this.maxPageIdx = 0;

    // Neither is any of the pooled bookkeeping.
    this.runsAvailMap = null;
    this.runsAvail = null;
    this.runsAvailLock = null;
    this.subpages = null;
    this.cachedNioBuffers = null;
}
236
237 private static IntPriorityQueue[] newRunsAvailqueueArray(int size) {
238 IntPriorityQueue[] queueArray = new IntPriorityQueue[size];
239 for (int i = 0; i < queueArray.length; i++) {
240 queueArray[i] = new IntPriorityQueue();
241 }
242 return queueArray;
243 }
244
245 private void insertAvailRun(int runOffset, int pages, long handle) {
246 int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(pages);
247 IntPriorityQueue queue = runsAvail[pageIdxFloor];
248 assert isRun(handle);
249 queue.offer((int) (handle >> BITMAP_IDX_BIT_LENGTH));
250
251
252 insertAvailRun0(runOffset, handle);
253 if (pages > 1) {
254
255 insertAvailRun0(lastPage(runOffset, pages), handle);
256 }
257 }
258
259 private void insertAvailRun0(int runOffset, long handle) {
260 long pre = runsAvailMap.put(runOffset, handle);
261 assert pre == -1;
262 }
263
264 private void removeAvailRun(long handle) {
265 int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(runPages(handle));
266 runsAvail[pageIdxFloor].remove((int) (handle >> BITMAP_IDX_BIT_LENGTH));
267 removeAvailRun0(handle);
268 }
269
270 private void removeAvailRun0(long handle) {
271 int runOffset = runOffset(handle);
272 int pages = runPages(handle);
273
274 runsAvailMap.remove(runOffset);
275 if (pages > 1) {
276
277 runsAvailMap.remove(lastPage(runOffset, pages));
278 }
279 }
280
281 private static int lastPage(int runOffset, int pages) {
282 return runOffset + pages - 1;
283 }
284
285 private long getAvailRunByOffset(int runOffset) {
286 return runsAvailMap.get(runOffset);
287 }
288
289 @Override
290 public int usage() {
291 final int freeBytes;
292 if (this.unpooled) {
293 freeBytes = this.freeBytes;
294 } else {
295 runsAvailLock.lock();
296 try {
297 freeBytes = this.freeBytes;
298 } finally {
299 runsAvailLock.unlock();
300 }
301 }
302 return usage(freeBytes);
303 }
304
305 private int usage(int freeBytes) {
306 if (freeBytes == 0) {
307 return 100;
308 }
309
310 int freePercentage = (int) (freeBytes * 100L / chunkSize);
311 if (freePercentage == 0) {
312 return 99;
313 }
314 return 100 - freePercentage;
315 }
316
/**
 * Tries to allocate memory for the given size index from this chunk and, on success,
 * initialises {@code buf} with it.
 *
 * @param buf         buffer to initialise
 * @param reqCapacity capacity requested by the caller
 * @param sizeIdx     size-class index of the allocation
 * @param cache       thread cache forwarded to the buffer initialisation
 * @return {@code true} if {@code buf} was initialised, {@code false} if no run could be allocated
 */
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
    final long handle;
    if (sizeIdx <= arena.sizeClass.smallMaxSizeIdx) {
        final PoolSubpage<T> nextSub;
        // Small allocation: served from a subpage. The head of the arena's subpage pool
        // for this size index is locked for the whole lookup / creation.
        PoolSubpage<T> head = arena.smallSubpagePools[sizeIdx];
        head.lock();
        try {
            nextSub = head.next;
            if (nextSub != head) {
                // The pool already holds a subpage for this size: allocate from it.
                assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeClass.sizeIdx2size(sizeIdx) :
                        "doNotDestroy=" + nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" +
                                sizeIdx;
                handle = nextSub.allocate();
                assert handle >= 0;
                assert isSubpage(handle);
                // Initialise through the subpage's own chunk; no cached ByteBuffer is passed
                // on this path.
                nextSub.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
                return true;
            }
            // Pool is empty: carve a new subpage out of this chunk.
            handle = allocateSubpage(sizeIdx, head);
            if (handle < 0) {
                return false;
            }
            assert isSubpage(handle);
        } finally {
            head.unlock();
        }
    } else {
        // Larger allocation: served directly from a run of whole pages.
        int runSize = arena.sizeClass.sizeIdx2size(sizeIdx);
        handle = allocateRun(runSize);
        if (handle < 0) {
            return false;
        }
        assert !isSubpage(handle);
    }

    // Reuse a previously cached ByteBuffer if one is available (the cache is null for
    // unpooled chunks).
    ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
    initBuf(buf, nioBuffer, handle, reqCapacity, cache);
    return true;
}
361
362 private long allocateRun(int runSize) {
363 int pages = runSize >> pageShifts;
364 int pageIdx = arena.sizeClass.pages2pageIdx(pages);
365
366 runsAvailLock.lock();
367 try {
368
369 int queueIdx = runFirstBestFit(pageIdx);
370 if (queueIdx == -1) {
371 return -1;
372 }
373
374
375 IntPriorityQueue queue = runsAvail[queueIdx];
376 long handle = queue.poll();
377 assert handle != IntPriorityQueue.NO_VALUE;
378 handle <<= BITMAP_IDX_BIT_LENGTH;
379 assert !isUsed(handle) : "invalid handle: " + handle;
380
381 removeAvailRun0(handle);
382
383 handle = splitLargeRun(handle, pages);
384
385 int pinnedSize = runSize(pageShifts, handle);
386 freeBytes -= pinnedSize;
387 return handle;
388 } finally {
389 runsAvailLock.unlock();
390 }
391 }
392
393 private int calculateRunSize(int sizeIdx) {
394 int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
395 int runSize = 0;
396 int nElements;
397
398 final int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
399
400
401 do {
402 runSize += pageSize;
403 nElements = runSize / elemSize;
404 } while (nElements < maxElements && runSize != nElements * elemSize);
405
406 while (nElements > maxElements) {
407 runSize -= pageSize;
408 nElements = runSize / elemSize;
409 }
410
411 assert nElements > 0;
412 assert runSize <= chunkSize;
413 assert runSize >= elemSize;
414
415 return runSize;
416 }
417
418 private int runFirstBestFit(int pageIdx) {
419 if (freeBytes == chunkSize) {
420 return arena.sizeClass.nPSizes - 1;
421 }
422 for (int i = pageIdx; i < arena.sizeClass.nPSizes; i++) {
423 IntPriorityQueue queue = runsAvail[i];
424 if (queue != null && !queue.isEmpty()) {
425 return i;
426 }
427 }
428 return -1;
429 }
430
431 private long splitLargeRun(long handle, int needPages) {
432 assert needPages > 0;
433
434 int totalPages = runPages(handle);
435 assert needPages <= totalPages;
436
437 int remPages = totalPages - needPages;
438
439 if (remPages > 0) {
440 int runOffset = runOffset(handle);
441
442
443 int availOffset = runOffset + needPages;
444 long availRun = toRunHandle(availOffset, remPages, 0);
445 insertAvailRun(availOffset, remPages, availRun);
446
447
448 return toRunHandle(runOffset, needPages, 1);
449 }
450
451
452 handle |= 1L << IS_USED_SHIFT;
453 return handle;
454 }
455
456
457
458
459
460
461
462
463
464
465 private long allocateSubpage(int sizeIdx, PoolSubpage<T> head) {
466
467 int runSize = calculateRunSize(sizeIdx);
468
469 long runHandle = allocateRun(runSize);
470 if (runHandle < 0) {
471 return -1;
472 }
473
474 int runOffset = runOffset(runHandle);
475 assert subpages[runOffset] == null;
476 int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
477
478 PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
479 runSize(pageShifts, runHandle), elemSize);
480
481 subpages[runOffset] = subpage;
482 return subpage.allocate();
483 }
484
485
486
487
488
489
490
491
/**
 * Releases the memory identified by {@code handle}. For a subpage handle the element is
 * first returned to its subpage; the underlying run is only released when the subpage
 * reports that it is no longer in use. Released runs are merged with adjacent available
 * runs before being registered as available again.
 *
 * @param handle       handle of the run or subpage element to free
 * @param normCapacity not used by this method
 * @param nioBuffer    optional ByteBuffer to keep for reuse; may be {@code null}
 */
void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
    if (isSubpage(handle)) {
        int sIdx = runOffset(handle);
        PoolSubpage<T> subpage = subpages[sIdx];
        assert subpage != null;
        PoolSubpage<T> head = subpage.chunk.arena.smallSubpagePools[subpage.headIndex];
        // Lock the head of the subpage pool while the element is handed back to the subpage.
        head.lock();
        try {
            assert subpage.doNotDestroy;
            if (subpage.free(head, bitmapIdx(handle))) {
                // free(...) returned true: the run is kept, nothing more to do.
                return;
            }
            assert !subpage.doNotDestroy;
            // The subpage is gone; clear its slot and fall through to release its run.
            subpages[sIdx] = null;
        } finally {
            head.unlock();
        }
    }

    // Size is taken from the original handle, before any merging with neighbours.
    int runSize = runSize(pageShifts, handle);
    // Release the run under the run lock.
    runsAvailLock.lock();
    try {
        // Merge with adjacent available runs; merged neighbours are removed from
        // runsAvail and runsAvailMap by collapseRuns.
        long finalRun = collapseRuns(handle);

        // Clear the "used" flag.
        finalRun &= ~(1L << IS_USED_SHIFT);
        // Clear the "subpage" flag so the handle describes a plain run.
        finalRun &= ~(1L << IS_SUBPAGE_SHIFT);

        insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
        freeBytes += runSize;
    } finally {
        runsAvailLock.unlock();
    }

    // Keep the ByteBuffer for reuse as long as the cache is below its limit.
    // NOTE(review): cachedNioBuffers is accessed outside runsAvailLock here -- presumably
    // callers serialise access; confirm.
    if (nioBuffer != null && cachedNioBuffers != null &&
        cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
        cachedNioBuffers.offer(nioBuffer);
    }
}
539
540 private long collapseRuns(long handle) {
541 return collapseNext(collapsePast(handle));
542 }
543
544 private long collapsePast(long handle) {
545 for (;;) {
546 int runOffset = runOffset(handle);
547 int runPages = runPages(handle);
548
549 long pastRun = getAvailRunByOffset(runOffset - 1);
550 if (pastRun == -1) {
551 return handle;
552 }
553
554 int pastOffset = runOffset(pastRun);
555 int pastPages = runPages(pastRun);
556
557
558 if (pastRun != handle && pastOffset + pastPages == runOffset) {
559
560 removeAvailRun(pastRun);
561 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
562 } else {
563 return handle;
564 }
565 }
566 }
567
568 private long collapseNext(long handle) {
569 for (;;) {
570 int runOffset = runOffset(handle);
571 int runPages = runPages(handle);
572
573 long nextRun = getAvailRunByOffset(runOffset + runPages);
574 if (nextRun == -1) {
575 return handle;
576 }
577
578 int nextOffset = runOffset(nextRun);
579 int nextPages = runPages(nextRun);
580
581
582 if (nextRun != handle && runOffset + runPages == nextOffset) {
583
584 removeAvailRun(nextRun);
585 handle = toRunHandle(runOffset, runPages + nextPages, 0);
586 } else {
587 return handle;
588 }
589 }
590 }
591
592 private static long toRunHandle(int runOffset, int runPages, int inUsed) {
593 return (long) runOffset << RUN_OFFSET_SHIFT
594 | (long) runPages << SIZE_SHIFT
595 | (long) inUsed << IS_USED_SHIFT;
596 }
597
598 void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
599 PoolThreadCache threadCache) {
600 if (isSubpage(handle)) {
601 initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
602 } else {
603 int maxLength = runSize(pageShifts, handle);
604 buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
605 reqCapacity, maxLength, arena.parent.threadCache());
606 }
607 }
608
609 void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
610 PoolThreadCache threadCache) {
611 int runOffset = runOffset(handle);
612 int bitmapIdx = bitmapIdx(handle);
613
614 PoolSubpage<T> s = subpages[runOffset];
615 assert s.isDoNotDestroy();
616 assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;
617
618 int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
619 buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
620 }
621
622 void incrementPinnedMemory(int delta) {
623 assert delta > 0;
624 pinnedBytes.add(delta);
625 }
626
627 void decrementPinnedMemory(int delta) {
628 assert delta > 0;
629 pinnedBytes.add(-delta);
630 }
631
632 @Override
633 public int chunkSize() {
634 return chunkSize;
635 }
636
637 @Override
638 public int freeBytes() {
639 if (this.unpooled) {
640 return freeBytes;
641 }
642 runsAvailLock.lock();
643 try {
644 return freeBytes;
645 } finally {
646 runsAvailLock.unlock();
647 }
648 }
649
650 public int pinnedBytes() {
651 return (int) pinnedBytes.value();
652 }
653
654 @Override
655 public String toString() {
656 final int freeBytes;
657 if (this.unpooled) {
658 freeBytes = this.freeBytes;
659 } else {
660 runsAvailLock.lock();
661 try {
662 freeBytes = this.freeBytes;
663 } finally {
664 runsAvailLock.unlock();
665 }
666 }
667
668 return new StringBuilder()
669 .append("Chunk(")
670 .append(Integer.toHexString(System.identityHashCode(this)))
671 .append(": ")
672 .append(usage(freeBytes))
673 .append("%, ")
674 .append(chunkSize - freeBytes)
675 .append('/')
676 .append(chunkSize)
677 .append(')')
678 .toString();
679 }
680
681 void destroy() {
682 arena.destroyChunk(this);
683 }
684
685 static int runOffset(long handle) {
686 return (int) (handle >> RUN_OFFSET_SHIFT);
687 }
688
689 static int runSize(int pageShifts, long handle) {
690 return runPages(handle) << pageShifts;
691 }
692
693 static int runPages(long handle) {
694 return (int) (handle >> SIZE_SHIFT & 0x7fff);
695 }
696
697 static boolean isUsed(long handle) {
698 return (handle >> IS_USED_SHIFT & 1) == 1L;
699 }
700
701 static boolean isRun(long handle) {
702 return !isSubpage(handle);
703 }
704
705 static boolean isSubpage(long handle) {
706 return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
707 }
708
709 static int bitmapIdx(long handle) {
710 return (int) handle;
711 }
712 }