1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import io.netty.util.concurrent.FastThreadLocal;
19 import io.netty.util.concurrent.FastThreadLocalThread;
20 import io.netty.util.internal.ObjectPool;
21 import io.netty.util.internal.PlatformDependent;
22 import io.netty.util.internal.SystemPropertyUtil;
23 import io.netty.util.internal.UnstableApi;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26 import org.jctools.queues.MessagePassingQueue;
27 import org.jetbrains.annotations.VisibleForTesting;
28
29 import java.util.ArrayDeque;
30 import java.util.Queue;
31 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32
33 import static io.netty.util.internal.PlatformDependent.newMpscQueue;
34 import static java.lang.Math.max;
35 import static java.lang.Math.min;
36
37
38
39
40
41
42 public abstract class Recycler<T> {
43 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
44 private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
45 @Override
46 public void recycle(Object object) {
47
48 }
49
50 @Override
51 public void unguardedRecycle(final Object object) {
52
53 }
54
55 @Override
56 public String toString() {
57 return "NOOP_HANDLE";
58 }
59 };
/** Per-thread capacity used when no system property is set, or when the configured value is negative. */
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024;
/** Effective default per-thread capacity; {@code 0} means pooling is disabled by default. */
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
/** Default ratio handed to the primary constructor; never negative. */
private static final int RATIO;
/** Default chunk size handed to the primary constructor. */
private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
/** When {@code true}, each {@code LocalPool} is backed by a {@code BlockingMessageQueue}. */
private static final boolean BLOCKING_POOL;
/** When {@code true}, a {@code LocalPool} only records an owner thread if that thread has a fast thread-local. */
private static final boolean BATCH_FAST_TL_ONLY;

static {
    // "maxCapacity" is consulted first and only serves as the fallback for "maxCapacityPerThread".
    final int legacyCapacity =
            SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD);
    final int configuredCapacity =
            SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread", legacyCapacity);

    // A negative capacity is treated as "not configured".
    DEFAULT_MAX_CAPACITY_PER_THREAD =
            configuredCapacity < 0 ? DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD : configuredCapacity;
    DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);

    final int configuredRatio = SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8);
    RATIO = max(0, configuredRatio);

    BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
    BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);

    if (logger.isDebugEnabled()) {
        if (DEFAULT_MAX_CAPACITY_PER_THREAD != 0) {
            logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
            logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
            logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
            logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
            logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
        } else {
            // A default capacity of zero turns pooling off, so every setting is reported as disabled.
            logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
            logger.debug("-Dio.netty.recycler.ratio: disabled");
            logger.debug("-Dio.netty.recycler.chunkSize: disabled");
            logger.debug("-Dio.netty.recycler.blocking: disabled");
            logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
        }
    }
}
104
105 private final int maxCapacityPerThread;
106 private final int interval;
107 private final int chunkSize;
108 private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
109 @Override
110 protected LocalPool<T> initialValue() {
111 return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
112 }
113
114 @Override
115 protected void onRemoval(LocalPool<T> value) throws Exception {
116 super.onRemoval(value);
117 MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
118 value.pooledHandles = null;
119 value.owner = null;
120 handles.clear();
121 }
122 };
123
124 protected Recycler() {
125 this(DEFAULT_MAX_CAPACITY_PER_THREAD);
126 }
127
128 protected Recycler(int maxCapacityPerThread) {
129 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
130 }
131
132
133
134
135
136 @Deprecated
137 @SuppressWarnings("unused")
138 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
139 this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
140 }
141
142
143
144
145
146 @Deprecated
147 @SuppressWarnings("unused")
148 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
149 int ratio, int maxDelayedQueuesPerThread) {
150 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
151 }
152
153
154
155
156
157 @Deprecated
158 @SuppressWarnings("unused")
159 protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
160 int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
161 this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
162 }
163
164 protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
165 interval = max(0, ratio);
166 if (maxCapacityPerThread <= 0) {
167 this.maxCapacityPerThread = 0;
168 this.chunkSize = 0;
169 } else {
170 this.maxCapacityPerThread = max(4, maxCapacityPerThread);
171 this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
172 }
173 }
174
175 @SuppressWarnings("unchecked")
176 public final T get() {
177 if (maxCapacityPerThread == 0 ||
178 (PlatformDependent.isVirtualThread(Thread.currentThread()) &&
179 !FastThreadLocalThread.currentThreadHasFastThreadLocal())) {
180 return newObject((Handle<T>) NOOP_HANDLE);
181 }
182 LocalPool<T> localPool = threadLocal.get();
183 DefaultHandle<T> handle = localPool.claim();
184 T obj;
185 if (handle == null) {
186 handle = localPool.newHandle();
187 if (handle != null) {
188 obj = newObject(handle);
189 handle.set(obj);
190 } else {
191 obj = newObject((Handle<T>) NOOP_HANDLE);
192 }
193 } else {
194 obj = handle.get();
195 }
196
197 return obj;
198 }
199
200
201
202
203 @Deprecated
204 public final boolean recycle(T o, Handle<T> handle) {
205 if (handle == NOOP_HANDLE) {
206 return false;
207 }
208
209 handle.recycle(o);
210 return true;
211 }
212
213 @VisibleForTesting
214 final int threadLocalSize() {
215 if (PlatformDependent.isVirtualThread(Thread.currentThread()) &&
216 !FastThreadLocalThread.currentThreadHasFastThreadLocal()) {
217 return 0;
218 }
219 LocalPool<T> localPool = threadLocal.getIfExists();
220 return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size();
221 }
222
223
224
225
226 protected abstract T newObject(Handle<T> handle);
227
228 @SuppressWarnings("ClassNameSameAsAncestorName")
229 public interface Handle<T> extends ObjectPool.Handle<T> { }
230
231 @UnstableApi
232 public abstract static class EnhancedHandle<T> implements Handle<T> {
233
234 public abstract void unguardedRecycle(Object object);
235
236 private EnhancedHandle() {
237 }
238 }
239
240 private static final class DefaultHandle<T> extends EnhancedHandle<T> {
241 private static final int STATE_CLAIMED = 0;
242 private static final int STATE_AVAILABLE = 1;
243 private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
244 static {
245 AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
246
247 STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
248 }
249
250 private volatile int state;
251 private final LocalPool<T> localPool;
252 private T value;
253
254 DefaultHandle(LocalPool<T> localPool) {
255 this.localPool = localPool;
256 }
257
258 @Override
259 public void recycle(Object object) {
260 if (object != value) {
261 throw new IllegalArgumentException("object does not belong to handle");
262 }
263 localPool.release(this, true);
264 }
265
266 @Override
267 public void unguardedRecycle(Object object) {
268 if (object != value) {
269 throw new IllegalArgumentException("object does not belong to handle");
270 }
271 localPool.release(this, false);
272 }
273
274 T get() {
275 return value;
276 }
277
278 void set(T value) {
279 this.value = value;
280 }
281
282 void toClaimed() {
283 assert state == STATE_AVAILABLE;
284 STATE_UPDATER.lazySet(this, STATE_CLAIMED);
285 }
286
287 void toAvailable() {
288 int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
289 if (prev == STATE_AVAILABLE) {
290 throw new IllegalStateException("Object has been recycled already.");
291 }
292 }
293
294 void unguardedToAvailable() {
295 int prev = state;
296 if (prev == STATE_AVAILABLE) {
297 throw new IllegalStateException("Object has been recycled already.");
298 }
299 STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
300 }
301 }
302
303 private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
304 private final int ratioInterval;
305 private final int chunkSize;
306 private final ArrayDeque<DefaultHandle<T>> batch;
307 private volatile Thread owner;
308 private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
309 private int ratioCounter;
310
311 @SuppressWarnings("unchecked")
312 LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
313 this.ratioInterval = ratioInterval;
314 this.chunkSize = chunkSize;
315 batch = new ArrayDeque<DefaultHandle<T>>(chunkSize);
316 Thread currentThread = Thread.currentThread();
317 owner = !BATCH_FAST_TL_ONLY || FastThreadLocalThread.currentThreadHasFastThreadLocal()
318 ? currentThread : null;
319 if (BLOCKING_POOL) {
320 pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
321 } else {
322 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
323 }
324 ratioCounter = ratioInterval;
325 }
326
327 DefaultHandle<T> claim() {
328 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
329 if (handles == null) {
330 return null;
331 }
332 if (batch.isEmpty()) {
333 handles.drain(this, chunkSize);
334 }
335 DefaultHandle<T> handle = batch.pollLast();
336 if (null != handle) {
337 handle.toClaimed();
338 }
339 return handle;
340 }
341
342 void release(DefaultHandle<T> handle, boolean guarded) {
343 if (guarded) {
344 handle.toAvailable();
345 } else {
346 handle.unguardedToAvailable();
347 }
348 Thread owner = this.owner;
349 if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
350 accept(handle);
351 } else if (owner != null && isTerminated(owner)) {
352 this.owner = null;
353 pooledHandles = null;
354 } else {
355 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
356 if (handles != null) {
357 handles.relaxedOffer(handle);
358 }
359 }
360 }
361
362 private static boolean isTerminated(Thread owner) {
363
364
365 return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
366 }
367
368 DefaultHandle<T> newHandle() {
369 if (++ratioCounter >= ratioInterval) {
370 ratioCounter = 0;
371 return new DefaultHandle<T>(this);
372 }
373 return null;
374 }
375
376 @Override
377 public void accept(DefaultHandle<T> e) {
378 batch.addLast(e);
379 }
380 }
381
382
383
384
385
386
387
388 private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
389 private final Queue<T> deque;
390 private final int maxCapacity;
391
392 BlockingMessageQueue(int maxCapacity) {
393 this.maxCapacity = maxCapacity;
394
395
396
397
398
399
400
401
402
403 deque = new ArrayDeque<T>();
404 }
405
406 @Override
407 public synchronized boolean offer(T e) {
408 if (deque.size() == maxCapacity) {
409 return false;
410 }
411 return deque.offer(e);
412 }
413
414 @Override
415 public synchronized T poll() {
416 return deque.poll();
417 }
418
419 @Override
420 public synchronized T peek() {
421 return deque.peek();
422 }
423
424 @Override
425 public synchronized int size() {
426 return deque.size();
427 }
428
429 @Override
430 public synchronized void clear() {
431 deque.clear();
432 }
433
434 @Override
435 public synchronized boolean isEmpty() {
436 return deque.isEmpty();
437 }
438
439 @Override
440 public int capacity() {
441 return maxCapacity;
442 }
443
444 @Override
445 public boolean relaxedOffer(T e) {
446 return offer(e);
447 }
448
449 @Override
450 public T relaxedPoll() {
451 return poll();
452 }
453
454 @Override
455 public T relaxedPeek() {
456 return peek();
457 }
458
459 @Override
460 public int drain(Consumer<T> c, int limit) {
461 T obj;
462 int i = 0;
463 for (; i < limit && (obj = poll()) != null; i++) {
464 c.accept(obj);
465 }
466 return i;
467 }
468
469 @Override
470 public int fill(Supplier<T> s, int limit) {
471 throw new UnsupportedOperationException();
472 }
473
474 @Override
475 public int drain(Consumer<T> c) {
476 throw new UnsupportedOperationException();
477 }
478
479 @Override
480 public int fill(Supplier<T> s) {
481 throw new UnsupportedOperationException();
482 }
483
484 @Override
485 public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
486 throw new UnsupportedOperationException();
487 }
488
489 @Override
490 public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
491 throw new UnsupportedOperationException();
492 }
493 }
494 }