1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.buffer.api.pool;
17
18 import io.netty5.buffer.api.AllocationType;
19 import io.netty5.buffer.api.AllocatorControl;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.buffer.api.BufferAllocator;
22 import io.netty5.buffer.api.Drop;
23 import io.netty5.buffer.api.MemoryManager;
24 import io.netty5.buffer.api.StandardAllocationTypes;
25 import io.netty5.buffer.api.internal.ArcDrop;
26 import io.netty5.buffer.api.internal.Statics;
27 import io.netty5.util.NettyRuntime;
28 import io.netty5.util.concurrent.EventExecutor;
29 import io.netty5.util.concurrent.FastThreadLocal;
30 import io.netty5.util.concurrent.FastThreadLocalThread;
31 import io.netty5.util.internal.PlatformDependent;
32 import io.netty5.util.internal.StringUtil;
33 import io.netty5.util.internal.SystemPropertyUtil;
34 import io.netty5.util.internal.ThreadExecutorMap;
35 import io.netty5.util.internal.logging.InternalLogger;
36 import io.netty5.util.internal.logging.InternalLoggerFactory;
37
38 import java.util.ArrayList;
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.concurrent.TimeUnit;
42 import java.util.function.Supplier;
43
44 import static io.netty5.buffer.api.internal.Statics.allocatorClosedException;
45 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
46 import static java.util.Objects.requireNonNull;
47
48 public class PooledBufferAllocator implements BufferAllocator, BufferAllocatorMetricProvider {
49
// Logger used by the static initializer to report the effective defaults at debug level.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledBufferAllocator.class);
// Default arena counts, assigned in the static initializer from the
// "io.netty5.allocator.numArenas" and "io.netty5.allocator.numDirectArenas" system properties.
private static final int DEFAULT_NUM_HEAP_ARENA;
private static final int DEFAULT_NUM_DIRECT_ARENA;

// The remaining defaults are likewise assigned exactly once in the static initializer,
// each from its own "io.netty5.allocator.*" system property.
private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_MAX_ORDER;
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;
static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;

// Smallest page size accepted by validateAndCalculatePageShifts.
private static final int MIN_PAGE_SIZE = 4096;
// Largest chunk size accepted by validateAndCalculateChunkSize: 2^30 bytes.
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);

// Task handed to an EventExecutor by PoolThreadLocalCache.initialValue() to trim the calling thread's cache.
private final Runnable trimTask = this::trimCurrentThreadCache;
// AllocatorControl that always answers with this allocator; passed to the MemoryManager when buffers are created.
private final AllocatorControl pooledAllocatorControl = () -> this;
70
// Resolves every DEFAULT_* constant once, at class-initialisation time, from the
// "io.netty5.allocator.*" system properties, and logs the effective values at debug level.
static {
    int defaultAlignment = SystemPropertyUtil.getInt(
            "io.netty5.allocator.directMemoryCacheAlignment", 0);
    int defaultPageSize = SystemPropertyUtil.getInt("io.netty5.allocator.pageSize", 8192);
    Throwable pageSizeFallbackCause = null;
    try {
        validateAndCalculatePageShifts(defaultPageSize, defaultAlignment);
    } catch (Throwable t) {
        // The configured page size / alignment pair is unusable: remember why and use the built-in defaults.
        pageSizeFallbackCause = t;
        defaultPageSize = 8192;
        defaultAlignment = 0;
    }
    DEFAULT_PAGE_SIZE = defaultPageSize;
    DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = defaultAlignment;

    int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty5.allocator.maxOrder", 9);
    Throwable maxOrderFallbackCause = null;
    try {
        validateAndCalculateChunkSize(DEFAULT_PAGE_SIZE, defaultMaxOrder);
    } catch (Throwable t) {
        maxOrderFallbackCause = t;
        // Fall back to the same value that is used when the property is absent (was 11, which
        // disagreed with the declared default and produced a four times larger chunk).
        defaultMaxOrder = 9;
    }
    DEFAULT_MAX_ORDER = defaultMaxOrder;

    final Runtime runtime = Runtime.getRuntime();

    // The default arena count is capped both by 2 * available processors and by
    // (max memory / chunk size / 2 / 3), whichever is smaller, and is never negative.
    final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
    final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
    DEFAULT_NUM_HEAP_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty5.allocator.numArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            runtime.maxMemory() / defaultChunkSize / 2 / 3)));
    DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
            SystemPropertyUtil.getInt(
                    "io.netty5.allocator.numDirectArenas",
                    (int) Math.min(
                            defaultMinNumArena,
                            PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

    // Per-thread cache sizes.
    DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty5.allocator.smallCacheSize", 256);
    DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty5.allocator.normalCacheSize", 64);

    DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
            "io.netty5.allocator.maxCachedBufferCapacity", 32 * 1024);

    DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
            "io.netty5.allocator.cacheTrimInterval", 8192);

    DEFAULT_CACHE_TRIM_INTERVAL_MILLIS = SystemPropertyUtil.getLong(
            "io.netty5.allocator.cacheTrimIntervalMillis", 0);

    DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
            "io.netty5.allocator.useCacheForAllThreads", false);

    DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt(
            "io.netty5.allocator.maxCachedByteBuffersPerChunk", 1023);

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty5.allocator.numArenas: {}", DEFAULT_NUM_HEAP_ARENA);
        logger.debug("-Dio.netty5.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
        // When a fallback happened, the cause is logged together with the value that replaced the setting.
        if (pageSizeFallbackCause == null) {
            logger.debug("-Dio.netty5.allocator.pageSize: {}", DEFAULT_PAGE_SIZE);
        } else {
            logger.debug("-Dio.netty5.allocator.pageSize: {}", DEFAULT_PAGE_SIZE, pageSizeFallbackCause);
        }
        if (maxOrderFallbackCause == null) {
            logger.debug("-Dio.netty5.allocator.maxOrder: {}", DEFAULT_MAX_ORDER);
        } else {
            logger.debug("-Dio.netty5.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
        }
        logger.debug("-Dio.netty5.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
        logger.debug("-Dio.netty5.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
        logger.debug("-Dio.netty5.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
        logger.debug("-Dio.netty5.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
        logger.debug("-Dio.netty5.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
        logger.debug("-Dio.netty5.allocator.cacheTrimIntervalMillis: {}", DEFAULT_CACHE_TRIM_INTERVAL_MILLIS);
        logger.debug("-Dio.netty5.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
        logger.debug("-Dio.netty5.allocator.maxCachedByteBuffersPerChunk: {}",
                DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
    }
}
170
// Memory manager used to create buffers; never null (checked in the constructor).
private final MemoryManager manager;
// OFF_HEAP when constructed with direct == true, ON_HEAP otherwise.
private final AllocationType allocationType;
// The arenas of this allocator; null when constructed with numArenas == 0. Slots are set to null by close().
private final PoolArena[] arenas;
// Cache sizes handed to each PoolThreadCache created by PoolThreadLocalCache.
private final int smallCacheSize;
private final int normalCacheSize;
// Mutable list of arena metrics; cleared by close().
private final List<PoolArenaMetric> arenaMetrics;
// What arenaMetrics() returns: an unmodifiable view of arenaMetrics, or an empty list when there are no arenas.
private final List<PoolArenaMetric> arenaMetricsView;
// Thread-local holder of the per-thread PoolThreadCache.
private final PoolThreadLocalCache threadCache;
// Result of validateAndCalculateChunkSize(pageSize, maxOrder).
private final int chunkSize;
private final PooledBufferAllocatorMetric metric;
// Set to true by close(); checked by allocate(int) and constBufferSupplier(byte[]).
private volatile boolean closed;
182
/**
 * Creates a pooled allocator that uses the class-wide default for every tunable.
 *
 * @param manager the memory manager that backs the allocated buffers.
 * @param direct {@code true} to use the default direct arena count and off-heap memory,
 *               {@code false} to use the default heap arena count and on-heap memory.
 */
public PooledBufferAllocator(MemoryManager manager, boolean direct) {
    this(
            manager,
            direct,
            direct ? DEFAULT_NUM_DIRECT_ARENA : DEFAULT_NUM_HEAP_ARENA,
            DEFAULT_PAGE_SIZE,
            DEFAULT_MAX_ORDER,
            DEFAULT_SMALL_CACHE_SIZE,
            DEFAULT_NORMAL_CACHE_SIZE,
            DEFAULT_USE_CACHE_FOR_ALL_THREADS,
            DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
189
/**
 * Creates a pooled allocator with an explicit arena layout and default cache settings.
 *
 * @param manager the memory manager that backs the allocated buffers.
 * @param direct {@code true} for off-heap memory, {@code false} for on-heap memory.
 * @param numArenas the number of arenas; {@code 0} creates no arenas.
 * @param pageSize the page size in bytes.
 * @param maxOrder the order used to derive the chunk size from the page size.
 */
public PooledBufferAllocator(MemoryManager manager, boolean direct, int numArenas, int pageSize, int maxOrder) {
    this(
            manager,
            direct,
            numArenas,
            pageSize,
            maxOrder,
            DEFAULT_SMALL_CACHE_SIZE,
            DEFAULT_NORMAL_CACHE_SIZE,
            DEFAULT_USE_CACHE_FOR_ALL_THREADS,
            DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
195
/**
 * Creates a pooled allocator with explicit arena and cache settings and the default
 * direct memory cache alignment.
 *
 * @param manager the memory manager that backs the allocated buffers.
 * @param direct {@code true} for off-heap memory, {@code false} for on-heap memory.
 * @param numArenas the number of arenas; {@code 0} creates no arenas.
 * @param pageSize the page size in bytes.
 * @param maxOrder the order used to derive the chunk size from the page size.
 * @param smallCacheSize the small cache size given to each thread cache.
 * @param normalCacheSize the normal cache size given to each thread cache.
 * @param useCacheForAllThreads {@code true} to give every thread a sized cache.
 */
public PooledBufferAllocator(MemoryManager manager, boolean direct, int numArenas, int pageSize, int maxOrder,
                             int smallCacheSize, int normalCacheSize,
                             boolean useCacheForAllThreads) {
    this(
            manager,
            direct,
            numArenas,
            pageSize,
            maxOrder,
            smallCacheSize,
            normalCacheSize,
            useCacheForAllThreads,
            DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
}
203
204 public PooledBufferAllocator(MemoryManager manager, boolean direct, int numArenas, int pageSize, int maxOrder,
205 int smallCacheSize, int normalCacheSize,
206 boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
207 this.manager = requireNonNull(manager, "MemoryManager");
208 allocationType = direct? StandardAllocationTypes.OFF_HEAP : StandardAllocationTypes.ON_HEAP;
209 threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
210 this.smallCacheSize = smallCacheSize;
211 this.normalCacheSize = normalCacheSize;
212
213 if (directMemoryCacheAlignment != 0) {
214 if (!PlatformDependent.hasAlignDirectByteBuffer()) {
215 throw new UnsupportedOperationException("Buffer alignment is not supported. " +
216 "Either Unsafe or ByteBuffer.alignSlice() must be available.");
217 }
218
219
220 pageSize = (int) PlatformDependent.align(pageSize, directMemoryCacheAlignment);
221 }
222
223 chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
224
225 checkPositiveOrZero(numArenas, "numArenas");
226
227 checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
228 if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
229 throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
230 }
231
232 if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
233 throw new IllegalArgumentException("directMemoryCacheAlignment: "
234 + directMemoryCacheAlignment + " (expected: power of two)");
235 }
236
237 int pageShifts = validateAndCalculatePageShifts(pageSize, directMemoryCacheAlignment);
238
239 if (numArenas > 0) {
240 arenas = newArenaArray(numArenas);
241 List<PoolArenaMetric> metrics = new ArrayList<>(arenas.length);
242 for (int i = 0; i < arenas.length; i ++) {
243 PoolArena arena = new PoolArena(this, manager, allocationType,
244 pageSize, pageShifts, chunkSize,
245 directMemoryCacheAlignment);
246 arenas[i] = arena;
247 metrics.add(arena);
248 }
249 arenaMetrics = metrics;
250 arenaMetricsView = Collections.unmodifiableList(metrics);
251 } else {
252 arenas = null;
253 arenaMetrics = new ArrayList<>(1);
254 arenaMetricsView = Collections.emptyList();
255 }
256
257 metric = new PooledBufferAllocatorMetric(this);
258 }
259
260 final AllocatorControl getPooledAllocatorControl() {
261 return pooledAllocatorControl;
262 }
263
264 private static PoolArena[] newArenaArray(int size) {
265 return new PoolArena[size];
266 }
267
268 private static int validateAndCalculatePageShifts(int pageSize, int alignment) {
269 if (pageSize < MIN_PAGE_SIZE) {
270 throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ')');
271 }
272
273 if ((pageSize & pageSize - 1) != 0) {
274 throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
275 }
276
277 if (pageSize < alignment) {
278 throw new IllegalArgumentException("Alignment cannot be greater than page size. " +
279 "Alignment: " + alignment + ", page size: " + pageSize + '.');
280 }
281
282
283 return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize);
284 }
285
286 private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) {
287 if (maxOrder > 14) {
288 throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
289 }
290
291
292 int chunkSize = pageSize;
293 for (int i = maxOrder; i > 0; i--) {
294 if (chunkSize > MAX_CHUNK_SIZE / 2) {
295 throw new IllegalArgumentException(String.format(
296 "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE));
297 }
298 chunkSize <<= 1;
299 }
300 return chunkSize;
301 }
302
303 @Override
304 public boolean isPooling() {
305 return true;
306 }
307
308 @Override
309 public AllocationType getAllocationType() {
310 return allocationType;
311 }
312
313 @Override
314 public Buffer allocate(int size) {
315 if (closed) {
316 throw allocatorClosedException();
317 }
318 Statics.assertValidBufferSize(size);
319 UntetheredMemory memory = allocateUntethered(size);
320 Drop<Buffer> drop = memory.drop();
321 Buffer buffer = manager.recoverMemory(pooledAllocatorControl, memory.memory(), drop);
322 drop.attach(buffer);
323 return buffer;
324 }
325
326 @Override
327 public Supplier<Buffer> constBufferSupplier(byte[] bytes) {
328 if (closed) {
329 throw allocatorClosedException();
330 }
331 Buffer constantBuffer = manager.allocateShared(
332 pooledAllocatorControl, bytes.length, ArcDrop::wrap, allocationType);
333 constantBuffer.writeBytes(bytes).makeReadOnly();
334 return () -> manager.allocateConstChild(constantBuffer);
335 }
336
337 UntetheredMemory allocateUntethered(int size) {
338 PoolThreadCache cache = threadCache.get();
339 PoolArena arena = cache.getArena();
340
341 if (arena != null) {
342 return arena.allocate(cache, size);
343 }
344 return allocateUnpooled(size);
345 }
346
347 private UntetheredMemory allocateUnpooled(int size) {
348 return new UnpooledUntetheredMemory(this, manager, allocationType, size);
349 }
350
351 @Override
352 public void close() {
353 closed = true;
354 trimCurrentThreadCache();
355 threadCache.remove();
356 for (int i = 0, arenasLength = arenas.length; i < arenasLength; i++) {
357 PoolArena arena = arenas[i];
358 if (arena != null) {
359 arena.close();
360 arenas[i] = null;
361 }
362 }
363 arenaMetrics.clear();
364 }
365
366
367
368
369 public static int defaultNumHeapArena() {
370 return DEFAULT_NUM_HEAP_ARENA;
371 }
372
373
374
375
376 public static int defaultNumDirectArena() {
377 return DEFAULT_NUM_DIRECT_ARENA;
378 }
379
380
381
382
383 public static int defaultPageSize() {
384 return DEFAULT_PAGE_SIZE;
385 }
386
387
388
389
390 public static int defaultMaxOrder() {
391 return DEFAULT_MAX_ORDER;
392 }
393
394
395
396
397 public static boolean defaultUseCacheForAllThreads() {
398 return DEFAULT_USE_CACHE_FOR_ALL_THREADS;
399 }
400
401
402
403
404 public static boolean defaultPreferDirect() {
405 return PlatformDependent.directBufferPreferred();
406 }
407
408
409
410
411 public static int defaultSmallCacheSize() {
412 return DEFAULT_SMALL_CACHE_SIZE;
413 }
414
415
416
417
418 public static int defaultNormalCacheSize() {
419 return DEFAULT_NORMAL_CACHE_SIZE;
420 }
421
422
423
424
425 public static boolean isDirectMemoryCacheAlignmentSupported() {
426 return PlatformDependent.hasUnsafe();
427 }
428
429 public boolean isDirectBufferPooled() {
430 return allocationType == StandardAllocationTypes.OFF_HEAP;
431 }
432
433 public int numArenas() {
434 return arenas.length;
435 }
436
437 final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
438 private final boolean useCacheForAllThreads;
439
440 PoolThreadLocalCache(boolean useCacheForAllThreads) {
441 this.useCacheForAllThreads = useCacheForAllThreads;
442 }
443
444 @Override
445 protected synchronized PoolThreadCache initialValue() {
446 final PoolArena arena = leastUsedArena(arenas);
447
448 final Thread current = Thread.currentThread();
449 final EventExecutor executor = ThreadExecutorMap.currentExecutor();
450 if (useCacheForAllThreads ||
451
452 current instanceof FastThreadLocalThread ||
453
454
455 executor != null) {
456 final PoolThreadCache cache = new PoolThreadCache(
457 arena, smallCacheSize, normalCacheSize,
458 DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
459
460 if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
461 if (executor != null) {
462 executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
463 DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
464 }
465 }
466 return cache;
467 }
468
469 return new PoolThreadCache(arena, 0, 0, 0, 0);
470 }
471
472 @Override
473 protected void onRemoval(PoolThreadCache threadCache) {
474 threadCache.free();
475 }
476 }
477
478 static PoolArena leastUsedArena(PoolArena[] arenas) {
479 if (arenas == null || arenas.length == 0) {
480 return null;
481 }
482
483 PoolArena minArena = arenas[0];
484 for (int i = 1; i < arenas.length; i++) {
485 PoolArena arena = arenas[i];
486 if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
487 minArena = arena;
488 }
489 }
490
491 return minArena;
492 }
493
494 @Override
495 public BufferAllocatorMetric metric() {
496 return metric;
497 }
498
499
500
501
502 List<PoolArenaMetric> arenaMetrics() {
503 return arenaMetricsView;
504 }
505
506
507
508
509 int numThreadLocalCaches() {
510 if (arenas == null) {
511 return 0;
512 }
513
514 int total = 0;
515 for (PoolArena arena : arenas) {
516 total += arena.numThreadCaches.get();
517 }
518
519 return total;
520 }
521
522
523
524
525 int smallCacheSize() {
526 return smallCacheSize;
527 }
528
529
530
531
532 int normalCacheSize() {
533 return normalCacheSize;
534 }
535
536
537
538
539 final int chunkSize() {
540 return chunkSize;
541 }
542
543 final long usedMemory() {
544 return usedMemory(arenas);
545 }
546
547 private static long usedMemory(PoolArena[] arenas) {
548 if (arenas == null) {
549 return -1;
550 }
551 long used = 0;
552 for (PoolArena arena : arenas) {
553 used += arena.numActiveBytes();
554 if (used < 0) {
555 return Long.MAX_VALUE;
556 }
557 }
558 return used;
559 }
560
561 final long pinnedMemory() {
562 return pinnedMemory(arenas);
563 }
564
565 private static long pinnedMemory(PoolArena[] arenas) {
566 if (arenas == null) {
567 return -1;
568 }
569 long used = 0;
570 for (PoolArena arena : arenas) {
571 used += arena.numPinnedBytes();
572 if (used < 0) {
573 return Long.MAX_VALUE;
574 }
575 }
576 return used;
577 }
578
579 final PoolThreadCache threadCache() {
580 PoolThreadCache cache = threadCache.get();
581 assert cache != null;
582 return cache;
583 }
584
585
586
587
588
589
590
591 public boolean trimCurrentThreadCache() {
592 PoolThreadCache cache = threadCache.getIfExists();
593 if (cache != null) {
594 cache.trim();
595 return true;
596 }
597 return false;
598 }
599
600
601
602
603
604 public String dumpStats() {
605 int heapArenasLen = arenas == null ? 0 : arenas.length;
606 StringBuilder buf = new StringBuilder(512)
607 .append(heapArenasLen)
608 .append(" arena(s):")
609 .append(StringUtil.NEWLINE);
610 if (heapArenasLen > 0) {
611 for (PoolArena a: arenas) {
612 buf.append(a);
613 }
614 }
615
616 return buf.toString();
617 }
618 }