1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.buffer;
17
18 import io.netty.util.internal.LongCounter;
19 import io.netty.util.internal.PlatformDependent;
20
21 import java.nio.ByteBuffer;
22 import java.util.ArrayDeque;
23 import java.util.Deque;
24 import java.util.PriorityQueue;
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135 final class PoolChunk<T> implements PoolChunkMetric {
136 private static final int SIZE_BIT_LENGTH = 15;
137 private static final int INUSED_BIT_LENGTH = 1;
138 private static final int SUBPAGE_BIT_LENGTH = 1;
139 private static final int BITMAP_IDX_BIT_LENGTH = 32;
140
141 static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
142 static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
143 static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
144 static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
145
146 final PoolArena<T> arena;
147 final Object base;
148 final T memory;
149 final boolean unpooled;
150
151
152
153
154 private final LongLongHashMap runsAvailMap;
155
156
157
158
159 private final LongPriorityQueue[] runsAvail;
160
161
162
163
164 private final PoolSubpage<T>[] subpages;
165
166
167
168
169 private final LongCounter pinnedBytes = PlatformDependent.newLongCounter();
170
171 private final int pageSize;
172 private final int pageShifts;
173 private final int chunkSize;
174
175
176
177
178
179
180 private final Deque<ByteBuffer> cachedNioBuffers;
181
182 int freeBytes;
183
184 PoolChunkList<T> parent;
185 PoolChunk<T> prev;
186 PoolChunk<T> next;
187
188
189
190
191 @SuppressWarnings("unchecked")
192 PoolChunk(PoolArena<T> arena, Object base, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
193 unpooled = false;
194 this.arena = arena;
195 this.base = base;
196 this.memory = memory;
197 this.pageSize = pageSize;
198 this.pageShifts = pageShifts;
199 this.chunkSize = chunkSize;
200 freeBytes = chunkSize;
201
202 runsAvail = newRunsAvailqueueArray(maxPageIdx);
203 runsAvailMap = new LongLongHashMap(-1);
204 subpages = new PoolSubpage[chunkSize >> pageShifts];
205
206
207 int pages = chunkSize >> pageShifts;
208 long initHandle = (long) pages << SIZE_SHIFT;
209 insertAvailRun(0, pages, initHandle);
210
211 cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
212 }
213
214
215 PoolChunk(PoolArena<T> arena, Object base, T memory, int size) {
216 unpooled = true;
217 this.arena = arena;
218 this.base = base;
219 this.memory = memory;
220 pageSize = 0;
221 pageShifts = 0;
222 runsAvailMap = null;
223 runsAvail = null;
224 subpages = null;
225 chunkSize = size;
226 cachedNioBuffers = null;
227 }
228
229 private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
230 LongPriorityQueue[] queueArray = new LongPriorityQueue[size];
231 for (int i = 0; i < queueArray.length; i++) {
232 queueArray[i] = new LongPriorityQueue();
233 }
234 return queueArray;
235 }
236
237 private void insertAvailRun(int runOffset, int pages, long handle) {
238 int pageIdxFloor = arena.pages2pageIdxFloor(pages);
239 LongPriorityQueue queue = runsAvail[pageIdxFloor];
240 queue.offer(handle);
241
242
243 insertAvailRun0(runOffset, handle);
244 if (pages > 1) {
245
246 insertAvailRun0(lastPage(runOffset, pages), handle);
247 }
248 }
249
250 private void insertAvailRun0(int runOffset, long handle) {
251 long pre = runsAvailMap.put(runOffset, handle);
252 assert pre == -1;
253 }
254
255 private void removeAvailRun(long handle) {
256 int pageIdxFloor = arena.pages2pageIdxFloor(runPages(handle));
257 LongPriorityQueue queue = runsAvail[pageIdxFloor];
258 removeAvailRun(queue, handle);
259 }
260
261 private void removeAvailRun(LongPriorityQueue queue, long handle) {
262 queue.remove(handle);
263
264 int runOffset = runOffset(handle);
265 int pages = runPages(handle);
266
267 runsAvailMap.remove(runOffset);
268 if (pages > 1) {
269
270 runsAvailMap.remove(lastPage(runOffset, pages));
271 }
272 }
273
274 private static int lastPage(int runOffset, int pages) {
275 return runOffset + pages - 1;
276 }
277
278 private long getAvailRunByOffset(int runOffset) {
279 return runsAvailMap.get(runOffset);
280 }
281
282 @Override
283 public int usage() {
284 final int freeBytes;
285 synchronized (arena) {
286 freeBytes = this.freeBytes;
287 }
288 return usage(freeBytes);
289 }
290
291 private int usage(int freeBytes) {
292 if (freeBytes == 0) {
293 return 100;
294 }
295
296 int freePercentage = (int) (freeBytes * 100L / chunkSize);
297 if (freePercentage == 0) {
298 return 99;
299 }
300 return 100 - freePercentage;
301 }
302
303 boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
304 final long handle;
305 if (sizeIdx <= arena.smallMaxSizeIdx) {
306
307 handle = allocateSubpage(sizeIdx);
308 if (handle < 0) {
309 return false;
310 }
311 assert isSubpage(handle);
312 } else {
313
314
315 int runSize = arena.sizeIdx2size(sizeIdx);
316 handle = allocateRun(runSize);
317 if (handle < 0) {
318 return false;
319 }
320 assert !isSubpage(handle);
321 }
322
323 ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
324 initBuf(buf, nioBuffer, handle, reqCapacity, cache);
325 return true;
326 }
327
328 private long allocateRun(int runSize) {
329 int pages = runSize >> pageShifts;
330 int pageIdx = arena.pages2pageIdx(pages);
331
332 synchronized (runsAvail) {
333
334 int queueIdx = runFirstBestFit(pageIdx);
335 if (queueIdx == -1) {
336 return -1;
337 }
338
339
340 LongPriorityQueue queue = runsAvail[queueIdx];
341 long handle = queue.poll();
342
343 assert handle != LongPriorityQueue.NO_VALUE && !isUsed(handle) : "invalid handle: " + handle;
344
345 removeAvailRun(queue, handle);
346
347 if (handle != -1) {
348 handle = splitLargeRun(handle, pages);
349 }
350
351 int pinnedSize = runSize(pageShifts, handle);
352 freeBytes -= pinnedSize;
353 return handle;
354 }
355 }
356
357 private int calculateRunSize(int sizeIdx) {
358 int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
359 int runSize = 0;
360 int nElements;
361
362 final int elemSize = arena.sizeIdx2size(sizeIdx);
363
364
365 do {
366 runSize += pageSize;
367 nElements = runSize / elemSize;
368 } while (nElements < maxElements && runSize != nElements * elemSize);
369
370 while (nElements > maxElements) {
371 runSize -= pageSize;
372 nElements = runSize / elemSize;
373 }
374
375 assert nElements > 0;
376 assert runSize <= chunkSize;
377 assert runSize >= elemSize;
378
379 return runSize;
380 }
381
382 private int runFirstBestFit(int pageIdx) {
383 if (freeBytes == chunkSize) {
384 return arena.nPSizes - 1;
385 }
386 for (int i = pageIdx; i < arena.nPSizes; i++) {
387 LongPriorityQueue queue = runsAvail[i];
388 if (queue != null && !queue.isEmpty()) {
389 return i;
390 }
391 }
392 return -1;
393 }
394
395 private long splitLargeRun(long handle, int needPages) {
396 assert needPages > 0;
397
398 int totalPages = runPages(handle);
399 assert needPages <= totalPages;
400
401 int remPages = totalPages - needPages;
402
403 if (remPages > 0) {
404 int runOffset = runOffset(handle);
405
406
407 int availOffset = runOffset + needPages;
408 long availRun = toRunHandle(availOffset, remPages, 0);
409 insertAvailRun(availOffset, remPages, availRun);
410
411
412 return toRunHandle(runOffset, needPages, 1);
413 }
414
415
416 handle |= 1L << IS_USED_SHIFT;
417 return handle;
418 }
419
420
421
422
423
424
425
426
427
428 private long allocateSubpage(int sizeIdx) {
429
430
431 PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
432 synchronized (head) {
433
434 int runSize = calculateRunSize(sizeIdx);
435
436 long runHandle = allocateRun(runSize);
437 if (runHandle < 0) {
438 return -1;
439 }
440
441 int runOffset = runOffset(runHandle);
442 assert subpages[runOffset] == null;
443 int elemSize = arena.sizeIdx2size(sizeIdx);
444
445 PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
446 runSize(pageShifts, runHandle), elemSize);
447
448 subpages[runOffset] = subpage;
449 return subpage.allocate();
450 }
451 }
452
453
454
455
456
457
458
459
460 void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
461 int runSize = runSize(pageShifts, handle);
462 if (isSubpage(handle)) {
463 int sizeIdx = arena.size2SizeIdx(normCapacity);
464 PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
465
466 int sIdx = runOffset(handle);
467 PoolSubpage<T> subpage = subpages[sIdx];
468 assert subpage != null && subpage.doNotDestroy;
469
470
471
472 synchronized (head) {
473 if (subpage.free(head, bitmapIdx(handle))) {
474
475 return;
476 }
477 assert !subpage.doNotDestroy;
478
479 subpages[sIdx] = null;
480 }
481 }
482
483
484 synchronized (runsAvail) {
485
486
487 long finalRun = collapseRuns(handle);
488
489
490 finalRun &= ~(1L << IS_USED_SHIFT);
491
492 finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
493
494 insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
495 freeBytes += runSize;
496 }
497
498 if (nioBuffer != null && cachedNioBuffers != null &&
499 cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
500 cachedNioBuffers.offer(nioBuffer);
501 }
502 }
503
504 private long collapseRuns(long handle) {
505 return collapseNext(collapsePast(handle));
506 }
507
508 private long collapsePast(long handle) {
509 for (;;) {
510 int runOffset = runOffset(handle);
511 int runPages = runPages(handle);
512
513 long pastRun = getAvailRunByOffset(runOffset - 1);
514 if (pastRun == -1) {
515 return handle;
516 }
517
518 int pastOffset = runOffset(pastRun);
519 int pastPages = runPages(pastRun);
520
521
522 if (pastRun != handle && pastOffset + pastPages == runOffset) {
523
524 removeAvailRun(pastRun);
525 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
526 } else {
527 return handle;
528 }
529 }
530 }
531
532 private long collapseNext(long handle) {
533 for (;;) {
534 int runOffset = runOffset(handle);
535 int runPages = runPages(handle);
536
537 long nextRun = getAvailRunByOffset(runOffset + runPages);
538 if (nextRun == -1) {
539 return handle;
540 }
541
542 int nextOffset = runOffset(nextRun);
543 int nextPages = runPages(nextRun);
544
545
546 if (nextRun != handle && runOffset + runPages == nextOffset) {
547
548 removeAvailRun(nextRun);
549 handle = toRunHandle(runOffset, runPages + nextPages, 0);
550 } else {
551 return handle;
552 }
553 }
554 }
555
556 private static long toRunHandle(int runOffset, int runPages, int inUsed) {
557 return (long) runOffset << RUN_OFFSET_SHIFT
558 | (long) runPages << SIZE_SHIFT
559 | (long) inUsed << IS_USED_SHIFT;
560 }
561
562 void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
563 PoolThreadCache threadCache) {
564 if (isSubpage(handle)) {
565 initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
566 } else {
567 int maxLength = runSize(pageShifts, handle);
568 buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
569 reqCapacity, maxLength, arena.parent.threadCache());
570 }
571 }
572
573 void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
574 PoolThreadCache threadCache) {
575 int runOffset = runOffset(handle);
576 int bitmapIdx = bitmapIdx(handle);
577
578 PoolSubpage<T> s = subpages[runOffset];
579 assert s.doNotDestroy;
580 assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;
581
582 int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
583 buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
584 }
585
586 void incrementPinnedMemory(int delta) {
587 assert delta > 0;
588 pinnedBytes.add(delta);
589 }
590
591 void decrementPinnedMemory(int delta) {
592 assert delta > 0;
593 pinnedBytes.add(-delta);
594 }
595
596 @Override
597 public int chunkSize() {
598 return chunkSize;
599 }
600
601 @Override
602 public int freeBytes() {
603 synchronized (arena) {
604 return freeBytes;
605 }
606 }
607
608 public int pinnedBytes() {
609 return (int) pinnedBytes.value();
610 }
611
612 @Override
613 public String toString() {
614 final int freeBytes;
615 synchronized (arena) {
616 freeBytes = this.freeBytes;
617 }
618
619 return new StringBuilder()
620 .append("Chunk(")
621 .append(Integer.toHexString(System.identityHashCode(this)))
622 .append(": ")
623 .append(usage(freeBytes))
624 .append("%, ")
625 .append(chunkSize - freeBytes)
626 .append('/')
627 .append(chunkSize)
628 .append(')')
629 .toString();
630 }
631
632 void destroy() {
633 arena.destroyChunk(this);
634 }
635
636 static int runOffset(long handle) {
637 return (int) (handle >> RUN_OFFSET_SHIFT);
638 }
639
640 static int runSize(int pageShifts, long handle) {
641 return runPages(handle) << pageShifts;
642 }
643
644 static int runPages(long handle) {
645 return (int) (handle >> SIZE_SHIFT & 0x7fff);
646 }
647
648 static boolean isUsed(long handle) {
649 return (handle >> IS_USED_SHIFT & 1) == 1L;
650 }
651
652 static boolean isRun(long handle) {
653 return !isSubpage(handle);
654 }
655
656 static boolean isSubpage(long handle) {
657 return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
658 }
659
660 static int bitmapIdx(long handle) {
661 return (int) handle;
662 }
663 }