1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import io.netty.util.concurrent.FastThreadLocal;
19 import io.netty.util.concurrent.FastThreadLocalThread;
20 import io.netty.util.internal.ObjectPool;
21 import io.netty.util.internal.PlatformDependent;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26 import org.jctools.queues.MessagePassingQueue;
27 import org.jetbrains.annotations.VisibleForTesting;
28
29 import java.util.ArrayDeque;
30 import java.util.Objects;
31 import java.util.Queue;
32 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
33
34 import static io.netty.util.internal.PlatformDependent.newFixedMpmcQueue;
35 import static io.netty.util.internal.PlatformDependent.newMpscQueue;
36 import static java.lang.Math.max;
37 import static java.lang.Math.min;
38
39
40
41
42
43
44 public abstract class Recycler<T> {
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
46
47
48
49
50
51 private static final class LocalPoolHandle<T> extends EnhancedHandle<T> {
52 private final UnguardedLocalPool<T> pool;
53
54 private LocalPoolHandle(UnguardedLocalPool<T> pool) {
55 this.pool = pool;
56 }
57
58 @Override
59 public void recycle(T object) {
60 UnguardedLocalPool<T> pool = this.pool;
61 if (pool != null) {
62 pool.release(object);
63 }
64 }
65
66 @Override
67 public void unguardedRecycle(final Object object) {
68 UnguardedLocalPool<T> pool = this.pool;
69 if (pool != null) {
70 pool.release((T) object);
71 }
72 }
73 }
74
75 private static final EnhancedHandle<?> NOOP_HANDLE = new LocalPoolHandle<>(null);
76 private static final UnguardedLocalPool<?> NOOP_LOCAL_POOL = new UnguardedLocalPool<>(0);
77 private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
78 private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
79 private static final int RATIO;
80 private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
81 private static final boolean BLOCKING_POOL;
82 private static final boolean BATCH_FAST_TL_ONLY;
83
84 static {
85
86
87
88 int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
89 SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
90 if (maxCapacityPerThread < 0) {
91 maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
92 }
93
94 DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
95 DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
96
97
98
99
100 RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
101
102 BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
103 BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);
104
105 if (logger.isDebugEnabled()) {
106 if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
107 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
108 logger.debug("-Dio.netty.recycler.ratio: disabled");
109 logger.debug("-Dio.netty.recycler.chunkSize: disabled");
110 logger.debug("-Dio.netty.recycler.blocking: disabled");
111 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
112 } else {
113 logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
114 logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
115 logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
116 logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
117 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
118 }
119 }
120 }
121
122 private final LocalPool<?, T> localPool;
123 private final FastThreadLocal<LocalPool<?, T>> threadLocalPool;
124
125
126
127
128
129
130
131
132
133
134
135
136 protected Recycler(int maxCapacity, boolean unguarded) {
137 if (maxCapacity <= 0) {
138 maxCapacity = 0;
139 } else {
140 maxCapacity = max(4, maxCapacity);
141 }
142 threadLocalPool = null;
143 if (maxCapacity == 0) {
144 localPool = (LocalPool<?, T>) NOOP_LOCAL_POOL;
145 } else {
146 localPool = unguarded? new UnguardedLocalPool<>(maxCapacity) : new GuardedLocalPool<>(maxCapacity);
147 }
148 }
149
150
151
152
153
154
155
156
/**
 * Creates a thread-local recycler using the default capacity, ratio and chunk size.
 *
 * @param unguarded {@code true} to use unguarded pools (no per-object state checks on recycle)
 */
protected Recycler(boolean unguarded) {
    this(
            DEFAULT_MAX_CAPACITY_PER_THREAD,
            RATIO,
            DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD,
            unguarded);
}
160
161
162
163
164
165
166
167
168
169
170
/**
 * Creates a recycler with a single pool pinned to {@code owner}, using the default capacity, ratio and
 * chunk size. No thread-local storage is used.
 *
 * @param owner     thread that owns the pool; must not be {@code null} unless the default capacity is 0
 * @param unguarded {@code true} to use an unguarded pool (no per-object state checks on recycle)
 */
protected Recycler(Thread owner, boolean unguarded) {
    this(
            DEFAULT_MAX_CAPACITY_PER_THREAD,
            RATIO,
            DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD,
            owner,
            unguarded);
}
174
/**
 * Creates a guarded, thread-local recycler with the default ratio and chunk size.
 *
 * @param maxCapacityPerThread capacity of each thread's pool; values {@code <= 0} disable pooling
 */
protected Recycler(int maxCapacityPerThread) {
    this(
            maxCapacityPerThread,
            RATIO,
            DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
}
178
/**
 * Creates a guarded, thread-local recycler that uses the default capacity, ratio and chunk size.
 */
protected Recycler() {
    this(
            DEFAULT_MAX_CAPACITY_PER_THREAD);
}
182
183
184
185
186
187
/**
 * Creates a thread-local recycler with an explicit chunk size and capacity and the default ratio.
 * <p>
 * Note the parameter order: the chunk size comes <em>first</em> here, whereas the constructor this
 * delegates to takes the capacity first.
 *
 * @param chunksSize           chunk size of each thread's queue
 * @param maxCapacityPerThread capacity of each thread's pool; values {@code <= 0} disable pooling
 * @param unguarded            {@code true} to use unguarded pools (no per-object state checks on recycle)
 */
protected Recycler(int chunksSize, int maxCapacityPerThread, boolean unguarded) {
    this(
            maxCapacityPerThread,
            RATIO,
            chunksSize,
            unguarded);
}
191
192
193
194
195
196
197
198
199
/**
 * Creates a recycler with a single pool pinned to {@code owner}, with an explicit chunk size and capacity
 * and the default ratio. No thread-local storage is used.
 * <p>
 * Note the parameter order: the chunk size comes <em>first</em> here, whereas the constructor this
 * delegates to takes the capacity first.
 *
 * @param chunkSize            chunk size of the pool's queue
 * @param maxCapacityPerThread capacity of the pool; values {@code <= 0} disable pooling
 * @param owner                thread that owns the pool; must not be {@code null} when the capacity is positive
 * @param unguarded            {@code true} to use an unguarded pool (no per-object state checks on recycle)
 */
protected Recycler(int chunkSize, int maxCapacityPerThread, Thread owner, boolean unguarded) {
    this(
            maxCapacityPerThread,
            RATIO,
            chunkSize,
            owner,
            unguarded);
}
203
204
205
206
207
/**
 * Creates a guarded, thread-local recycler with the default ratio and chunk size.
 *
 * @param maxCapacityPerThread    capacity of each thread's pool; values {@code <= 0} disable pooling
 * @param maxSharedCapacityFactor ignored
 * @deprecated {@code maxSharedCapacityFactor} is not used; {@link #Recycler(int)} behaves identically.
 */
@Deprecated
@SuppressWarnings("unused")
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
    this(
            maxCapacityPerThread,
            RATIO,
            DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
}
213
214
215
216
217
/**
 * Creates a guarded, thread-local recycler with the given capacity and ratio and the default chunk size.
 *
 * @param maxCapacityPerThread      capacity of each thread's pool; values {@code <= 0} disable pooling
 * @param maxSharedCapacityFactor   ignored
 * @param ratio                     allocation ratio; negative values are treated as 0
 * @param maxDelayedQueuesPerThread ignored
 * @deprecated {@code maxSharedCapacityFactor} and {@code maxDelayedQueuesPerThread} are not used; use
 *             {@link #Recycler(int, int, int)} instead.
 */
@Deprecated
@SuppressWarnings("unused")
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                   int ratio, int maxDelayedQueuesPerThread) {
    this(
            maxCapacityPerThread,
            ratio,
            DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
}
224
225
226
227
228
/**
 * Creates a guarded, thread-local recycler with the given capacity and ratio and the default chunk size.
 *
 * @param maxCapacityPerThread      capacity of each thread's pool; values {@code <= 0} disable pooling
 * @param maxSharedCapacityFactor   ignored
 * @param ratio                     allocation ratio; negative values are treated as 0
 * @param maxDelayedQueuesPerThread ignored
 * @param delayedQueueRatio         ignored
 * @deprecated {@code maxSharedCapacityFactor}, {@code maxDelayedQueuesPerThread} and
 *             {@code delayedQueueRatio} are not used; use {@link #Recycler(int, int, int)} instead.
 */
@Deprecated
@SuppressWarnings("unused")
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
                   int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
    this(
            maxCapacityPerThread,
            ratio,
            DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
}
235
/**
 * Creates a guarded, thread-local recycler.
 *
 * @param maxCapacityPerThread capacity of each thread's pool; values {@code <= 0} disable pooling
 * @param interval             allocation ratio: with a value {@code n > 0}, one in every {@code n} newly
 *                             created objects gets a pooling handle; {@code 0} (or less) pools all of them
 * @param chunkSize            chunk size of each thread's queue
 */
protected Recycler(int maxCapacityPerThread, int interval, int chunkSize) {
    this(
            maxCapacityPerThread,
            interval,
            chunkSize,
            true,
            null,
            false);
}
239
240
241
242
243
244
/**
 * Creates a thread-local recycler, guarded or unguarded.
 *
 * @param maxCapacityPerThread capacity of each thread's pool; values {@code <= 0} disable pooling
 * @param interval             allocation ratio: with a value {@code n > 0}, one in every {@code n} newly
 *                             created objects gets a pooling handle; {@code 0} (or less) pools all of them
 * @param chunkSize            chunk size of each thread's queue
 * @param unguarded            {@code true} to use unguarded pools (no per-object state checks on recycle)
 */
protected Recycler(int maxCapacityPerThread, int interval, int chunkSize, boolean unguarded) {
    this(
            maxCapacityPerThread,
            interval,
            chunkSize,
            true,
            null,
            unguarded);
}
248
249
250
251
252
253
/**
 * Creates a recycler with a single pool pinned to {@code owner}; no thread-local storage is used.
 *
 * @param maxCapacityPerThread capacity of the pool; values {@code <= 0} disable pooling
 * @param interval             allocation ratio: with a value {@code n > 0}, one in every {@code n} newly
 *                             created objects gets a pooling handle; {@code 0} (or less) pools all of them
 * @param chunkSize            chunk size of the pool's queue
 * @param owner                thread that owns the pool; must not be {@code null} when the capacity is positive
 * @param unguarded            {@code true} to use an unguarded pool (no per-object state checks on recycle)
 */
protected Recycler(int maxCapacityPerThread, int interval, int chunkSize, Thread owner, boolean unguarded) {
    this(
            maxCapacityPerThread,
            interval,
            chunkSize,
            false,
            owner,
            unguarded);
}
257
/**
 * Canonical constructor behind the thread-local and owner-pinned public constructors.
 * <p>
 * Normalises its arguments (negative ratio becomes 0; a positive capacity is raised to at least 4; the chunk
 * size is limited to half the capacity but never below 2) and then selects one of three modes:
 * <ul>
 *   <li>capacity {@code <= 0}: pooling is disabled and the shared no-op pool is used;</li>
 *   <li>{@code useThreadLocalStorage}: a pool is created lazily for each calling thread;</li>
 *   <li>otherwise: one pool pinned to {@code owner} is created immediately.</li>
 * </ul>
 *
 * @throws NullPointerException if an owner-pinned pool is requested and {@code owner} is {@code null}
 */
@SuppressWarnings("unchecked")
private Recycler(int maxCapacityPerThread, int ratio, int chunkSize, boolean useThreadLocalStorage,
                 Thread owner, boolean unguarded) {
    final int interval = max(0, ratio);
    final int capacity;
    final int chunk;
    if (maxCapacityPerThread > 0) {
        capacity = max(4, maxCapacityPerThread);
        chunk = max(2, min(chunkSize, capacity >> 1));
    } else {
        capacity = 0;
        chunk = 0;
    }

    if (capacity == 0) {
        threadLocalPool = null;
        localPool = (LocalPool<?, T>) NOOP_LOCAL_POOL;
    } else if (useThreadLocalStorage) {
        threadLocalPool = new FastThreadLocal<LocalPool<?, T>>() {
            @Override
            protected LocalPool<?, T> initialValue() {
                if (unguarded) {
                    return new UnguardedLocalPool<>(capacity, interval, chunk);
                }
                return new GuardedLocalPool<>(capacity, interval, chunk);
            }

            @Override
            protected void onRemoval(LocalPool<?, T> removed) throws Exception {
                super.onRemoval(removed);
                // Detach the queue and the owner from the pool first, then empty the detached queue.
                final MessagePassingQueue<?> detached = removed.pooledHandles;
                removed.pooledHandles = null;
                removed.owner = null;
                if (detached != null) {
                    detached.clear();
                }
            }
        };
        localPool = null;
    } else {
        threadLocalPool = null;
        Objects.requireNonNull(owner, "owner");
        localPool = unguarded ? new UnguardedLocalPool<>(owner, capacity, interval, chunk)
                : new GuardedLocalPool<>(owner, capacity, interval, chunk);
    }
}
302
303 @SuppressWarnings("unchecked")
304 public final T get() {
305 if (localPool != null) {
306 return localPool.getWith(this);
307 } else {
308 if (PlatformDependent.isVirtualThread(Thread.currentThread()) &&
309 !FastThreadLocalThread.currentThreadHasFastThreadLocal()) {
310 return newObject((Handle<T>) NOOP_HANDLE);
311 }
312 return threadLocalPool.get().getWith(this);
313 }
314 }
315
316
317
318
319
320
321
322
323
324 public static void unpinOwner(Recycler<?> recycler) {
325 if (recycler.localPool != null) {
326 recycler.localPool.owner = null;
327 }
328 }
329
330
331
332
333 @Deprecated
334 public final boolean recycle(T o, Handle<T> handle) {
335 if (handle == NOOP_HANDLE) {
336 return false;
337 }
338
339 handle.recycle(o);
340 return true;
341 }
342
343 @VisibleForTesting
344 final int threadLocalSize() {
345 if (localPool != null) {
346 return localPool.size();
347 } else {
348 if (PlatformDependent.isVirtualThread(Thread.currentThread()) &&
349 !FastThreadLocalThread.currentThreadHasFastThreadLocal()) {
350 return 0;
351 }
352 final LocalPool<?, T> pool = threadLocalPool.getIfExists();
353 if (pool == null) {
354 return 0;
355 }
356 return pool.size();
357 }
358 }
359
360
361
362
363 protected abstract T newObject(Handle<T> handle);
364
365 @SuppressWarnings("ClassNameSameAsAncestorName")
366 public interface Handle<T> extends ObjectPool.Handle<T> { }
367
368 @UnstableApi
369 public abstract static class EnhancedHandle<T> implements Handle<T> {
370
371 public abstract void unguardedRecycle(Object object);
372
373 private EnhancedHandle() {
374 }
375 }
376
377 private static final class DefaultHandle<T> extends EnhancedHandle<T> {
378 private static final int STATE_CLAIMED = 0;
379 private static final int STATE_AVAILABLE = 1;
380 private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
381 static {
382 AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
383
384 STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
385 }
386
387 private volatile int state;
388 private final GuardedLocalPool<T> localPool;
389 private T value;
390
391 DefaultHandle(GuardedLocalPool<T> localPool) {
392 this.localPool = localPool;
393 }
394
395 @Override
396 public void recycle(Object object) {
397 if (object != value) {
398 throw new IllegalArgumentException("object does not belong to handle");
399 }
400 toAvailable();
401 localPool.release(this);
402 }
403
404 @Override
405 public void unguardedRecycle(Object object) {
406 if (object != value) {
407 throw new IllegalArgumentException("object does not belong to handle");
408 }
409 unguardedToAvailable();
410 localPool.release(this);
411 }
412
413 T claim() {
414 assert state == STATE_AVAILABLE;
415 STATE_UPDATER.lazySet(this, STATE_CLAIMED);
416 return value;
417 }
418
419 void set(T value) {
420 this.value = value;
421 }
422
423 private void toAvailable() {
424 int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
425 if (prev == STATE_AVAILABLE) {
426 throw new IllegalStateException("Object has been recycled already.");
427 }
428 }
429
430 private void unguardedToAvailable() {
431 int prev = state;
432 if (prev == STATE_AVAILABLE) {
433 throw new IllegalStateException("Object has been recycled already.");
434 }
435 STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
436 }
437 }
438
439 private static final class GuardedLocalPool<T> extends LocalPool<DefaultHandle<T>, T> {
440
441 GuardedLocalPool(int maxCapacity) {
442 super(maxCapacity);
443 }
444
445 GuardedLocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
446 super(owner, maxCapacity, ratioInterval, chunkSize);
447 }
448
449 GuardedLocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
450 super(maxCapacity, ratioInterval, chunkSize);
451 }
452
453 @Override
454 public T getWith(Recycler<T> recycler) {
455 DefaultHandle<T> handle = acquire();
456 T obj;
457 if (handle == null) {
458 handle = canAllocatePooled()? new DefaultHandle<>(this) : null;
459 if (handle != null) {
460 obj = recycler.newObject(handle);
461 handle.set(obj);
462 } else {
463 obj = recycler.newObject((Handle<T>) NOOP_HANDLE);
464 }
465 } else {
466 obj = handle.claim();
467 }
468 return obj;
469 }
470 }
471
472 private static final class UnguardedLocalPool<T> extends LocalPool<T, T> {
473 private final EnhancedHandle<T> handle;
474
475 UnguardedLocalPool(int maxCapacity) {
476 super(maxCapacity);
477 handle = maxCapacity == 0? null : new LocalPoolHandle<>(this);
478 }
479
480 UnguardedLocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
481 super(owner, maxCapacity, ratioInterval, chunkSize);
482 handle = new LocalPoolHandle<>(this);
483 }
484
485 UnguardedLocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
486 super(maxCapacity, ratioInterval, chunkSize);
487 handle = new LocalPoolHandle<>(this);
488 }
489
490 @Override
491 public T getWith(Recycler<T> recycler) {
492 T obj = acquire();
493 if (obj == null) {
494 obj = recycler.newObject(canAllocatePooled()? handle : (Handle<T>) NOOP_HANDLE);
495 }
496 return obj;
497 }
498 }
499
500 private abstract static class LocalPool<H, T> {
501 private final int ratioInterval;
502 private final H[] batch;
503 private int batchSize;
504 private Thread owner;
505 private MessagePassingQueue<H> pooledHandles;
506 private int ratioCounter;
507
508 LocalPool(int maxCapacity) {
509
510
511
512 this.ratioInterval = maxCapacity == 0? -1 : 0;
513 this.owner = null;
514 batch = null;
515 batchSize = 0;
516 pooledHandles = createExternalMcPool(maxCapacity);
517 ratioCounter = 0;
518 }
519
520 @SuppressWarnings("unchecked")
521 LocalPool(Thread owner, int maxCapacity, int ratioInterval, int chunkSize) {
522 this.ratioInterval = ratioInterval;
523 this.owner = owner;
524 batch = owner != null? (H[]) new Object[chunkSize] : null;
525 batchSize = 0;
526 pooledHandles = createExternalScPool(chunkSize, maxCapacity);
527 ratioCounter = ratioInterval;
528 }
529
530 private static <H> MessagePassingQueue<H> createExternalMcPool(int maxCapacity) {
531 if (maxCapacity == 0) {
532 return null;
533 }
534 if (BLOCKING_POOL) {
535 return new BlockingMessageQueue<>(maxCapacity);
536 }
537 return (MessagePassingQueue<H>) newFixedMpmcQueue(maxCapacity);
538 }
539
540 private static <H> MessagePassingQueue<H> createExternalScPool(int chunkSize, int maxCapacity) {
541 if (maxCapacity == 0) {
542 return null;
543 }
544 if (BLOCKING_POOL) {
545 return new BlockingMessageQueue<>(maxCapacity);
546 }
547 return (MessagePassingQueue<H>) newMpscQueue(chunkSize, maxCapacity);
548 }
549
550 LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
551 this(!BATCH_FAST_TL_ONLY || FastThreadLocalThread.currentThreadHasFastThreadLocal()
552 ? Thread.currentThread() : null, maxCapacity, ratioInterval, chunkSize);
553 }
554
555 protected final H acquire() {
556 int size = batchSize;
557 if (size == 0) {
558
559 final MessagePassingQueue<H> handles = pooledHandles;
560 if (handles == null) {
561 return null;
562 }
563 return handles.relaxedPoll();
564 }
565 int top = size - 1;
566 final H h = batch[top];
567 batchSize = top;
568 batch[top] = null;
569 return h;
570 }
571
572 protected final void release(H handle) {
573 Thread owner = this.owner;
574 if (owner != null && Thread.currentThread() == owner && batchSize < batch.length) {
575 batch[batchSize] = handle;
576 batchSize++;
577 } else if (owner != null && isTerminated(owner)) {
578 pooledHandles = null;
579 this.owner = null;
580 } else {
581 MessagePassingQueue<H> handles = pooledHandles;
582 if (handles != null) {
583 handles.relaxedOffer(handle);
584 }
585 }
586 }
587
588 private static boolean isTerminated(Thread owner) {
589
590
591 return PlatformDependent.isJ9Jvm()? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
592 }
593
594 boolean canAllocatePooled() {
595 if (ratioInterval < 0) {
596 return false;
597 }
598 if (ratioInterval == 0) {
599 return true;
600 }
601 if (++ratioCounter >= ratioInterval) {
602 ratioCounter = 0;
603 return true;
604 }
605 return false;
606 }
607
608 abstract T getWith(Recycler<T> recycler);
609
610 int size() {
611 MessagePassingQueue<H> handles = pooledHandles;
612 final int externalSize = handles != null? handles.size() : 0;
613 return externalSize + (batch != null? batchSize : 0);
614 }
615 }
616
617
618
619
620
621
622
623 private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
624 private final Queue<T> deque;
625 private final int maxCapacity;
626
627 BlockingMessageQueue(int maxCapacity) {
628 this.maxCapacity = maxCapacity;
629
630
631
632
633
634
635
636
637
638 deque = new ArrayDeque<T>();
639 }
640
641 @Override
642 public synchronized boolean offer(T e) {
643 if (deque.size() == maxCapacity) {
644 return false;
645 }
646 return deque.offer(e);
647 }
648
649 @Override
650 public synchronized T poll() {
651 return deque.poll();
652 }
653
654 @Override
655 public synchronized T peek() {
656 return deque.peek();
657 }
658
659 @Override
660 public synchronized int size() {
661 return deque.size();
662 }
663
664 @Override
665 public synchronized void clear() {
666 deque.clear();
667 }
668
669 @Override
670 public synchronized boolean isEmpty() {
671 return deque.isEmpty();
672 }
673
674 @Override
675 public int capacity() {
676 return maxCapacity;
677 }
678
679 @Override
680 public boolean relaxedOffer(T e) {
681 return offer(e);
682 }
683
684 @Override
685 public T relaxedPoll() {
686 return poll();
687 }
688
689 @Override
690 public T relaxedPeek() {
691 return peek();
692 }
693
694 @Override
695 public int drain(Consumer<T> c, int limit) {
696 T obj;
697 int i = 0;
698 for (; i < limit && (obj = poll()) != null; i++) {
699 c.accept(obj);
700 }
701 return i;
702 }
703
704 @Override
705 public int fill(Supplier<T> s, int limit) {
706 throw new UnsupportedOperationException();
707 }
708
709 @Override
710 public int drain(Consumer<T> c) {
711 throw new UnsupportedOperationException();
712 }
713
714 @Override
715 public int fill(Supplier<T> s) {
716 throw new UnsupportedOperationException();
717 }
718
719 @Override
720 public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
721 throw new UnsupportedOperationException();
722 }
723
724 @Override
725 public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
726 throw new UnsupportedOperationException();
727 }
728 }
729 }